본 포스팅은 개인적인 공부를 위해 책의 내용을 요약하여 정리하는 글입니다.

작성자는 분산 데이터베이스, 하둡에 대해서 처음 공부해보는 것이니 참고가 된다면 기쁘겠지만 틀린 정보가 있을 수 있습니다.


개발환경 : 우분투 18.04, 하둡 3.0.3


개발환경 구축은 Ssup2 블로그를 참조하였습니다.

(https://ssup2.github.io/record/Hadoop_%EC%84%A4%EC%B9%98_%EC%84%A4%EC%A0%95_Ubuntu_18.04/)


하둡은 여러 언어로 쓰여진 프로그램을 돌릴 수 있다. 이 장에서는 자바, 루비, 파이썬으로 표현된 프로그램을 보도록 하자.


1. A Weather Dataset

예제로 날씨 데이터를 분석하여 각 년도별로 최대 온도가 몇도인지 분석하는 프로그램을 만들어 보자.


1.1 DataFormat

예제 프로그램 작성을 위해 NCDC의 데이터를이용한다.

그리고 문제를 단순화하기 위해 기온 등 기본적인 요소만 사용한다.


* NCDC 데이터 받는 법

이 깃허브 링크로 들어가 전체 프로젝트를 다운받는다.

https://github.com/tomwhite/hadoop-book/


input/ncdc/ 디렉토리의 sample.txt 파일을 이용하면 된다.


2. Unix Tool을 이용한 데이터 분석

Shell Script를 이용한 데이터 분석에 대한 내용.

사실상 하둡과는 무관하므로 생략.


3. Hadoop을 이용한 데이터 분석

하둡의 병렬 처리기능을 이용하려면 쿼리문을 맵리듀스용으로 표현해야한다. 로컬로 소규모 테스트를 마치고 나면 클러스터 머신에서 돌려 볼 수 있다.


3.1 Map and Reduce

맵리듀스 작업은 맵/리듀스 2개의 phase로 구분하여 작동한다. 각 페이즈는 key-value pair를 입출력으로 가지고 그 타입은 프로그래머가 결정한다.


그리고 맵함수와 리듀스함수도 프로그래머가 결정한다.


맵 페이즈의 입력값은 raw NCDC 데이터다. 맵 페이즈에서는 텍스트 입력 포멧을 선택하여 raw 데이터의 각 라인을 텍스트값으로 변형하는데 사용 할 수 있다.


맵 함수는 리듀스 함수가 작업할 수 있게 데이터를 준비하는 단계로, 이번 예제에서 맵 함수는 년도와 온도를 뽑아오는 간단한 함수다.


또한 맵 함수는 빠져있거나, 의심스럽거나, 에러가 있는 베드 레코드(Bad Record)를 걸러내기도 한다.


그리고 리듀스 함수의 역할은 각 년도별로 최대 온도를 찾는 것이다.


데이터 처리 과정


먼저 맵함수는 다음의 입력을 받는다.


0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999

0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999 0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999 0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999 0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999


맵 함수에서 처리할 때, 데이터는 다음의 key-value pair로 표현된다. 

(*참고로 여기서 key 값은 파일의 시작점으로부터의 offset이다.)


(0, 0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999)

(106, 0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999) (212, 0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999) (318, 0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999) (424, 0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999)


여기서 우리에게 필요한 데이터는 진하게 칠한 년도와 온도 데이터다.


여기서 key 값은 필요없으니 무시하고, 맵 함수는 년도와 기온을 추출하여 결과값으로 내보낸다.

(1950, 0)

(1950, 22)

(1950, -11)

(1949, 111)

(1949, 78)


이제 이 결과값을 리듀스함수로 보내기 전에 맵리듀스 프레임워크가 이 결과값을 가공한다.

맵리듀스 프레임워크의 가공 과정에서는 데이터를 key 값을 기준으로 정렬하고 그룹화한다.


(1949, [111, 78])

(1950, [0, 22, -11])


각 연도별 기온 리스트는 완성되었으니 리듀스함수에서 이 리스트들을 돌며 최대값을 고르면 완료된다.


(1949, 111)

(1950, 22)


3.2 Java MapReduce


이클립스 설정

우선 맵 리듀스 라이브러리 JAR 파일들을 프로젝트에 추가해야한다.

(* 라이브러리가 root 디렉토리에 있는 경우 이클립스를 관리자 권한으로 실행해야 JAR 파일들이 있는 디렉토리에 접근 가능합니다.)


생선한 프로젝트 우클릭-properties-Java Build Path-Libraries 에서 Add External JARs를 클릭하여 필요한 라이브러리들을 모두 추가한다.


Add External JARs....




이제 이번 예제 구현에 사용된 클래스 코드들을 살펴보도록 하자.


Mapper

import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class TestMapClass extends Mapper<LongWritable, Text, Text, IntWritable>{ private static final int MISSING = 9999; @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String year = line.substring(15,19); int airTemperature; if(line.charAt(87) == '+') { airTemperature = Integer.parseInt(line.substring(88,92)); }else{ airTemperature = Integer.parseInt(line.substring(87,92)); }

//퀄리티 코드 String quality = line.substring(92,93);


//퀄리티 코드 == 01459 : 올바르게 읽어졌음.

//올바르게 읽어진 데이터만 결과에 적용한다. if(airTemperature != MISSING && quality.matches("[01459]")) {

//map() 메소드는 context를 통해 결과값을 전달한다.

//결과값은 Hadoop 데이터 타입으로 변환하여 입력한다. context.write(new Text(year), new IntWritable(airTemperature)); } } }


Mapper 클래스는 제네릭 타입이라서 <input key, input value, output key, output value> 4개의 타입 패러미터를 갖는다.

(쉽게 말해서 템플릿을 사용하기 때문에 이 4개의 타입을 결정해줘야 한다.)


여기서 각 타입은 다음과 같다.

input key = Long int = LongWritable

input value = Text Line = Text

output key = Year = Text

output value = Air Temperature = IntWritable


하둡에서는 자바의 기본 타입을 쓰지 않고 새로운 타입을 제공한다.

LongWritable, Text, IntWritable이 각각 Long, String, Integer에 해당되는 하둡의 클래스 타입들이다.

이 하둡 클래스 타입들은 Serialization에 최적화된 클래스 타입들이다.


Reducer

import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{ int maxValue = Integer.MIN_VALUE; for(IntWritable value : values) { maxValue = Math.max(maxValue, value.get()); } context.write(key, new IntWritable(maxValue)); } }


리듀서 클래스도 제네릭 타입이다.

<input key, input value, output key, output value>를 설정해 줘야하는데, 리듀서 함수의 input 타입은 맵 함수의 output 타입과 같아야 하므로 input key는 Text, input value는 IntWritable이다.

리듀서 함수의 output key는 Text, output value는 연도별 최대 온도값을 IntWritable로 내보낸다.


Main Class

import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class MaxTemperature { public static void main(String[] args)throws Exception{ if(args.length != 2) { System.err.println("Usage: MaxTemperature <input path> <output path>"); System.exit(-1); } //버전 업으로 사용법이 바뀜 //Job job = new org.apache.hadoop.mapreduce.Job();

Job job = Job.getInstance();


job.setJarByClass(MaxTemperature.class); job.setJobName("Max Temperature");

//입력 경로, 출력 경로 설정 FileInputFormat.addInputPath(job, new Path(args[0]));; FileOutputFormat.setOutputPath(job,new Path(args[1]));

//매퍼, 리듀서 클래스 설정 job.setMapperClass(TestMapClass.class); job.setReducerClass(MaxTemperatureReducer.class);

//output key, value 타입 설정 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); System.exit(job.waitForCompletion(true) ? 0 : 1); } }


Job 객체는 작업의 세부사항을 정의하고 일이 어떻게 처리되는지에 대한 통제권을 제공하는 객체다.


하둡 클러스터에서 작업을 수행할 때, main 클래스 코드를 JAR 파일로 만들면 하둡이 클러스터 내에서 이 JAR 파일을 분배한다.


Job 클래스의 setJarByClass() 메소드를 사용하여 클래스를 전달하면 하둡이 해당 클래스를 포함한 JAR 파일을 찾아준다.


Job 객체를 생성한 후에는 FileInputFormat 클래스의 addInputPath() 메소드로 input 경로를 정하는데, input 경로는 파일, 디렉토리, 파일 패턴, 무엇이든 될 수  있고, addInputPath() 메소드를 여러번 불러 다양한 경로를 포함시키는 것도 가능하다.


output 경로는 FileOutputFormat 클래스의 setOutputPath() 메소드로 결정한다. 리듀서 함수의 결과가 이 경로에 쓰여지며, 해당 디렉토리는 비어있어야한다.


3.3 A test Run

이제 구현한 코드로 실제 데이터를 처리해 보도록 하자.


JAR 파일로 export

우선 구현한 프로젝트를 JAR 파일로 export 해야한다.

File - Export에서  JAR file을 선택하고 파일 이름을 정한 후 JAR 파일을 만든다.


finish


HDFS에 파일 등록

하둡을 이용해 데이터를 처리하기 위해서는 우선 HDFS(Hadoop Distributed File System)에 데이터 파일을 등록해야한다.


다음 명령어들을 이용하여 이를 수행한다.

# hadoop fs -mkdir '디렉토리 명'

# hadoop fs -put '파일 명' 'HDFS상 디렉토리'

# hadoop fs -ls '디렉토리 명'

mkdir : 디렉토리 생성

put : HDFS에 파일 등록

ls : 디렉토리 내용 출력


HDFS에 입력 파일 디렉토리 생성 및 파일 등록


출력 파일용 디렉토리 생성


분석하기

이제 yarn 명령어를 이용하여 분석을 실행한다.

# yarn '실행시킬 타입' '실행시킬 파일 이름' '실행시킬 클래스 경로' [-옵션]

여기서 우리는 JAR 파일을 실행 할 것이므로 실행시킬 타입은 jar 이다.

실행시킬 파일 이름은 HDFS에 등록된 파일 들 중 입력 파일로 사용할 파일의 이름이다.

실행시킬 클래스 경로는 main 함수를 포함하는 클래스를 (패키지명.클래스명)으로 입력하면 된다.


분석 시작


분석 과정 출력


분석 결과


분석 결과 1949년도 최고 온도는 11.1도, 1950년도 최고 온도는 2.2도로 측정되었다고 한다.


output 디렉토리 입력시 output 디렉토리는 존재하지 않는 디렉토리여야한다.

분석 결과를 출력 할 때 하둡이 입력한 output 디렉토리 이름으로 새 폴더를 만들 것이다.



4. Scaling out


스케일 아웃 작업을 하기 위해서 데이터는 HDFS에 저장되어야 한다.


4.1 데이터 플로우(Data Flow)


* 맵 리듀스 Job 이란 : 클라이언트가 수행되길 원하는 작업들의 묶음이다. 입력 데이터, 맵리듀스 프로그램, 설정 정보로 구성된다.

(A MapReduce job is a unit of work that the client wants to be performed;)


하둡은 맵리듀스 job을 task로 나누어 처리한다.

이 task는 map task와 reduce task로 나뉘고, task들은 YARN에 의해 스케쥴링되고 클러스터상의 노드에서 실행된다.


하둡은 맵 리듀스 job의 input을 input split, 또는 split이라고 불리는 고정크기의 조각들로 나눈다.

하둡은 각 스플릿(split)마다 하나의 태스크를 만들고 각각의 테스크는 사용자 정의 맵함수를 실행시킨다.


이 스플릿의 크기는 너무 작으면 스플릿과 map task를 관리하는 오버헤드가 너무 커지고, 너무 크면 처리하는데 시간이 오래 걸린다.

그래서 적절한 크기로 스플릿을 나누어야 하는데, 일반적으로 HDFS block의 크기인 128MB가 가장 효율적이다.


Data Locality Optimization

하둡은 HDFS 내의 입력 데이터가 있는 노드에서 태스크를 수행할 때 가장 빠르게 작동한다.

이러한 속성을 데이터 지역성 최적화(Data Locality Optimization) 라고 한다.


*Reduce task는 상관없고 Map task에만 영향이 있는 속성이다.



HDFS block의 크기로 스플릿을 나누는 것이 효율적인 이유.

간혹 HDFS 블록 복제본을 호스팅하는 모든 노드들이 이미 다른 맵 태스크를 수행하고 있는 경우 잡 스캐줄러가 같은 랙 장비 내에서 비어있는 맵 슬롯(map slot)을 가진 노드를 찾으려 하게 된다.

더욱 잘 일어나지 않지만, 다른 랙 장비에 있는 노드를 사용해야하는 경우도 있다. 이런 경우는 렉 장비간 네트워크 통신이 필요하게 된다.

이런 상황들에선 성능이 저하된다.

Data Locality Optimization라는 특성도 네트워크 통신으로 인한 성능저하가 발생하기 때문에 생기는 특성이라고 볼 수 있을 것이다.


이제, HDFS Block의 크기(128MB)로 스플릿을 나누는 것이 효율적인 이유에 대해 얘기해보자면, 한개의 노드에 저장 될 수 있는 가장 큰 크기의 데이터가 128MB이기 떄문이다.

128MB보다 큰 데이터를 저장하려고 하면 두개의 블록에 저장하게 될 것이고, 스플릿들 중 일부는 네트워크를 통해 다른 렉 장비에 있는 노드에 저장될 것이다. 네트워크 작업이 늘어나기 때문에 성능이 저하된다.


(사진 출처 : Hadoop the definitive Guide)

Map Task가 HDFS Block에 접근하는 경우의 수

(a) data-local (같은 노드 내에서 처리함. 가장 효율 적인 상황.)

(b) rack-local (같은 랙 장비 내에서, 다른 노드에 있는 맵 슬롯에  데이터를 전송)

(c) off-lack (다른 랙장비에 있는 맵 슬롯에 네트워크를 통해 데이터를 전송)


Map Task는 output을 HDFS에 저장하지 않고 로컬 디스크에 저장한다.

Map Output은 최종결과가 아닌 reduce task에서 입력 데이터로 사용할 중간 결과다. 모든 작업이 끝나면 불필요하기 때문에 HDFS에 저장하지 않는다.



Reduce Task에는 Data Locality 속성의 이점이 없다.

일반적으로 한개의 Reduce Task의 입력은 모든 매퍼들로부터 오기 때문에 Reduce Task가 입력 데이터를 받는 데에는 네트워크 통신이 불가피하기 때문이다.


Reduce ouput의 첫 번째 복사본만 로컬로 저장되고 나머지는 다른 랙 장비에 저장되어 네트워크 대역폭을 소모하는 작업이 된다. 하지만 이는 HDFS 파이프라인이 소모하는 양 만큼만이다.


Reduce Task의 수는 input size와 상관 없이 독립적으로 명시해줘야 한다. 이것에 관해서는 추후에 다루도록 한다.


Reduce Task가 다수일 때에는 Map Task가 output을 partition으로 나누고, 한 파티션당 한 개의 reduce task에 할당된다.

각 파티션에는 다수의 키 값이 존재할 수 있고, 각 키 값들에 대한 레코드는 각 키와 같은 파티션 내에 포함되어야 한다.


(사진 출처 : Hadoop the definitive Guide)

reduce가 한개일 경우 Map Reduce 데이터 플로우.


(사진 출처 : Hadoop the definitive Guide)

reduce가 여러개일 경우 Map Reduce 데이터 플로우.



Reduce Task가 0개 필요한 경우도 있다. 이는 프로세스가 완전히 평행하게 수행 될 수 있어서 Shuffle 작업이 필요 없는 경우에 유용하다. 이 경우 노드를 벗어나서 데이터를 전송하는 경우는 Map Task가 HDFS에 결과값을 보낼 때 뿐이므로 성능도 괜찮게 나올 수 있다.


(사진 출처 : Hadoop the definitive Guide)

Reducer Task가 없는 경우 Data Flow


4.2 Combiner Function

많은 맵리듀스 job들이 클러스터의 이용가능한 네트워크 대역폭 한계 때문에 성능이 저하된다. 그래서 맵과 리듀스 태스크 사이의 데이터 전송을 최소화하는것이 중요하다.


하둡에서는 개발자가 Combiner function을 정의하게 할 수 있다.

Combiner function은 map의 output을 입력으로 받아서 새로운 output으로 만들고, 그 output을 reduce 함수의 input으로 전달하는 역할을 하는 함수다.

Combiner function은 최적화 함수이므로 몇 번 호출되든 상관 없이 reducer 함수의 output은 동일해야한다.


Combiner function의 contract는 사용될 함수의 타입을 제한한다.


Combiner function에 대한 이해를 돕기 위해, 위에서 봤던 연도별 최고 온도 예제를 생각해 보자.

1950년도의 온도 데이터를 2개의 Mapper가 추출해냈다고 하자.


첫 번째 Mapper 결과:

(1950, 0)

(1950, 20)

(1950, 10)


두 번째 Mapper 결과:

(1950, 25)

(1950, 15)


Combiner function을 따로 정의하지 않으면 Reducer Task는 다음의 데이터를 받는다.


(1950, [0, 20 , 10, 25, 15])


그리고 Reducer Task의 최종 output은 


(1950, 25)


일 것이다.


여기서 Mapper Task의 Output 중 최고 온도만 선별하도록 Combiner Function을 정의하면 Reduce Task에는 다음의 데이터만 전송하면 된다.


(1950, [20, 25])


이렇게 하면 맵과 리듀스 태스크 사이의 데이터 전송량이 최소화 되고, Combiner Function을 몇 번 적용하든지 Reduce Task의 결과값이 변하지 않는다.


MAX(0, 20, 10, 25, 15) = 25


MAX(MAX(0, 20, 10), MAX(25, 15)) =MAX(20, 25) = 25


그리고 이 경우에는 Combiner Function을 몇 번 적용하든지 Reduce Task의 결과값이 변하지 않기 때문에 적용해도 되는데, 그렇지 않은 경우에는 주의해야한다.

예를 들어, 최고 온도가 아닌 평균 온도를 구하는 경우를 생각 해보자.

결과는 다음과 같이 변하게 된다.


mean(0, 20, 10, 25, 15) = 14


mean(mean(0, 20, 10), mean(25, 15)) =mean(10, 20) = 15


Combiner Function 적용

Combiner Function은 Reducer 클래스를 사용하여 정의된다.(Reducer 클래스를 상속하여 구현)

최고 온도 예제에서는 Reducer 함수와 Combiner 함수의 역할이 사실상 똑같기 때문에 MaxTemperatureReducer 클래스를 Combiner로 그대로 사용하면 된다.


Main Class (Combiner 포함)

import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class MaxTemperature { public static void main(String[] args)throws Exception{ if(args.length != 2) { System.err.println("Usage: MaxTemperature <input path> <output path>"); System.exit(-1); } //버전 업으로 사용법이 바뀜 //Job job = new org.apache.hadoop.mapreduce.Job();

Job job = Job.getInstance();


job.setJarByClass(MaxTemperature.class); job.setJobName("Max Temperature");

//입력 경로, 출력 경로 설정 FileInputFormat.addInputPath(job, new Path(args[0]));; FileOutputFormat.setOutputPath(job,new Path(args[1]));

//매퍼, 리듀서, Combiner 클래스 설정 job.setMapperClass(TestMapClass.class);

job.setCombinerClass(MaxTemperatureReducer.class); job.setReducerClass(MaxTemperatureReducer.class);

//output key, value 타입 설정 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); System.exit(job.waitForCompletion(true) ? 0 : 1); } }



Running a Distributed MapReduce Job

전체 데이터셋에 대해서 동일한 프로그램을 변환하지 않고 사용해서 처리가능하다.

이것에 관해서는 6장에서 다룬다.


5. 하둡 스트리밍

하둡은 자바 이외의 언어로도 맵, 리듀스 함수를 작성 할 수 있는 API를 제공한다. 그러니 표준 입출력을 사용할 수 있는 어떤 언어든 MapReduce 프로그램 개발에 사용할 수 있다.


스트리밍은 원래 텍스트 프로세싱에 가장 적합하다.

맵 입력 데이터는 표준 입력으로 줄별로 전달되고, 줄별로 출력된다.


맵함수의 Output인 key-value pair는 tab으로 구분되고, 이는 리듀스 함수의 입력에도 동일하게 적용된다.



레퍼런스

Hadoop The Definitive Guide 4th Edition(O'REILLY. Tom White)


하둡 설치

(https://ssup2.github.io/record/Hadoop_%EC%84%A4%EC%B9%98_%EC%84%A4%EC%A0%95_Ubuntu_18.04/)


NCDC 예제 데이터

(https://github.com/tomwhite/hadoop-book/)


Kamang's IT Blog

(https://kamang-it.tistory.com/entry/Hadoop-02%EA%B8%B0%EC%B4%88-%EC%98%88%EC%A0%9C-%EB%A7%8C%EB%93%A4%EA%B8%B0SingleFileWirteRead)

'IT 관련 > Hadoop The Definitive Guide (4th)' 카테고리의 다른 글

Chapter3. The Hadoop Distributed Filesystem  (0) 2019.02.20
Chapter1. Meet Hadoop  (0) 2019.01.17
블로그 이미지

서기리보이

,