'IT 관련/Hadoop The Definitive Guide (4th)'에 해당되는 글 3건

본 포스팅은 개인적인 공부를 위해 책의 내용을 요약하여 정리하는 글입니다.

작성자는 분산 데이터베이스, 하둡에 대해서 처음 공부해보는 것이니 참고가 된다면 기쁘겠지만 틀린 정보가 있을 수 있습니다.


개발환경 : 우분투 18.04, 하둡 3.0.3


데이터셋의 크기가 한개의 물리적 머신의 저장용량보다 커지면 여러개의 머신에 데이터셋을 분할시킬 필요가 있다.


분산 파일 시스템(Distributed File System)

머신들의 네트워크의 저장소를 관리하는 파일 시스템

(Filesystems that manage the storage across a network of machines are called distributed filesystem)


하둡은 HDFS라는 분산 파일 시스템을 가진다. HDFS는 하둡의 주력 파일 시스템이지만, 사실 하둡은 일반목적 파일시스템에 대한 추상화도 가지고 있다. 이번 장에서는 하둡이 어떻게 다른 저장소 시스템(Storage system)(로컬 파일 시스템, 아마존 S3 등)과 통합되는 지 볼 것이다.


1. HDFS의 디자인(The Design of HDFS)

HDFS는 아주 큰 파일들을 스트리밍 데이터 엑세스 패턴으로 저장하도록 디자인 된 파일시스템으로, 상용 하드웨어들의 클러스터 상에서 돌아간다.

다음 용어들을 보도록 하자.


Very large

"Very large"라는 말은 수백 메가바이트, 기가바이트, 테라바이트의 크기를 말한다. 최근에는 페타바이트 단위의 데이터를 저장하는 하둡 클러스터도 있다.


Streaming data access

HDFS는 write-once, read-many-time(한번 쓰고 여러번 읽기)가 가장 효율적인 데이터 처리 패턴이라는 아이디어를 기반으로 설계되었다.  데이터셋은 생성되거나 복제 된 것이고, 그 데이서 셋에 대한 다양한 분석 작업이 이루어 질 것이다. 각각의 분석작업은 데이터셋의 많은 부분들을 필요로 할 것이므로 전체 데이터셋을 읽는 시간이 첫 번째 레코드를 읽는 대기시간보다 중요하다.


*(Streaming Data Access란 랜덤 탐색 없이 파일의 시작점으로 부터 sequential하게 쭉 읽어나가는 것을 말한다.

https://stackoverflow.com/questions/16260535/streaming-data-access-and-latency-in-hadoop-applications)


Commodity hardware

하둡은 비싸고 신뢰할 수 있는 하드웨어를 필요로하지 않는다.

하둡은 상용 하드웨어(Commodity hardware)(여러 상점에서 구할 수 있는 일반적인 하드웨어들)의 클러스터 상에서 돌아가도록 디자인되었기 때문에 클러스터 상의 노드에 오류가 있을 가능성이 높다. HDFS는 그런 오류에도 크게 방해되지 않고 작동되도록 디자인되었다.



HDFS를 사용하는게 부적합 한 경우도 있다. 이 문제는 미래에는 바뀔 수 있지만, 지금은 HDFS가 취약한 부분이다.

다음의 특성을 가지는 어플리케이션에는 HDFS를 사용하는 것이 부적합 할 수 있다.


짧은 반응시간(Low-latency data access)

수십 밀리세컨드 정도로 데이터 접근에 대해 짧은 응답시간(latency)을 필요로하는 어플리케이션은 HDFS와는 잘 맞지 않다. HDFS는 처리량(throughput)이 높지만 응답시간은 느릴 수 있다. 데이터 접근에 짧은 응답시간을 위해서는 HBase(20장에서 다룬다.)이 더 적합하다.


많은 작은 파일(Lots of small files)

메모리 상에 존재하는 네임노드(namenode)가 파일 시스템의 메타데이터를 가지고 있기 때문에, 파일 시스템 상에 존재 할 수 있는 파일의 수는 네임노드의 크기에 의해 제한된다.

일반적으로(As a rule of thumb) 각 파일, 디렉토리, 블록의 크기는 150바이트다. 그러므로 예를 덜어, 각각 1블록을 차지하는 100만개의 파일이 있다고 하면 적어도 300MB의 메모리를 필요하게 된다. 수백만가지의 파일을 저장하는 것이 가능하긴 해도, 수억개의 파일을 저장하는 것은 현재의 하드웨어 기술력으로는 무리다.


다수의 작성자, 무작위적인 변경(Multiple writers, arbitrary file modifications)

HDFS 상의 파일들은 한 명의 writer에 의해 쓰여지는 것이 좋다. Writer는 항상 append-only 방법으로 파일의 맨 끝에 만들어진다. 그래서 다수의 writer나 파일의 무작위적인 오프셋 변경은 지원되지 않는다.


2. HDFS 컨셉


2.1 Blocks

블록은 데이터를 읽고 쓰는 최소한의 크기단위다. 파일 시스템의 블록은 몇 키로바이트 정도이고, 디스크의 블록은 보통 512 바이트다.

HDFS도 블록의 컨셉을 가지는데, 블록의 크기가 기본 128MB로 아주 크다.

다른 블록 개념들과 같이 HDFS도 블록의 크기보다 큰 파일은 블록의 크기만큼 나눠서 저장하는데(예를 들어 블록의 크기가 128MB이고, 200MB의 파일을 저장해야 한다면 한 블록에 128MB, 다른 블록에 나머지를 저장) 다른 블록 개념들과 다르게 블록의 크기보다 작은 파일은 블록 하나를 완전히 소유하지 않는다.(예를 들어 블록의 크기가 128MB이고 파일의 크기가 1MB이면 파일은 128MB를 다 차지하는 것이 아니라 1MB만 차지한다.)


이제부터 Block이라는 단어가 나오면 HDFS의 블록을 얘기하는 것이다.


HDFS의 블록 크기가 큰 이유

HDFS의 블록이 큰 것은 탐색의 코스트(seek time)를 줄이기 위해서다.

디스크에서 한 블록을 읽는데 걸리는 시간은 seek time + transfer time 이다.(seek time : 데이터의 시작점을 찾는 시간. transfer time : 블록의 크기 / transfer rate). 여기서 transfer rate는 디스크의 물리적인 한계가 있으므로 더 빠르게 만들기는 어려우니 블록의 크기를 크게 만들면 transfer time이 크게 늘어나 한 블록을 읽는 시간에 seek time은 무시해도 될 정도로 작은 값으로 취급할 수 있다.

그러므로 블록을 크게 만들면 대용량 데이터를 읽을 때 seek time의 영향이 줄어들고 디스크의 물리적인 속도인 transfer rate과 데이터를 읽는 속도가 거의 비슷해진다.


분산 파일 시스템에서 블록 개념을 도입하는 이점

- 한개의 파일의 크기가 네트워크 내의 디스크 한개의 크기보다 커도 된다.

- 파일 대신 블록으로 추상화하면 저장소의 서브시스템을 간단하게 할 수 있다.

- Fault tolerance와 availability를 위한 복제(replication)에 블록이 더 용이하다.


같은 파일의 블록들이 반드시 한 디스크 내에 존재해야하는 것은 아니다. 그래서 블록 개념을 도입하면 아주 큰 파일이더라도 네트워크상의 여러 디스크에 나눠서 저장 할 수 있다.


분산 시스템의 경우 고장나는 경우의 수가 다양하기 때문에 간단함(simplicity)이 아주 중요하다. 블록은 크기가 고정되어 있기 때문에 저장소 관리를 쉽게 하고,  블록은 데이터 덩어리일 뿐이니 블록에 대한 접근 권한 등 메타데이터가 불필요하다.

그래서 블록 개념을 도입하는것이 간단함을 유지하는데 도움이 된다.


각 블록은 몇개의 머신들에 중복되어 저장된다.(보통 3개의 머신에 나눠서 저장한다.) 한 블록이 이용불가능한 상황이 되면 다른 머신에 있는 중복 데이터를 가져 올 수 있는데, 이 과정은 클라이언트에게 투명하다(transparent).(클라이언트가 이런 과정은 신경쓰지 않아도 원하는 블록을 얻어 올 수 있다.)

그리고 자주 사용되는 파일의 블록들의 경우 좀 더 여러 곳에 복사 해 두어 클러스터 내의 읽기 부하량(read load)를 분산 시킬 수도 있다.



* HDFS의 fsck 명령어는 블록 개념을 이해하고 있는 명령어다. 다음의 명령어를 입력하면 파일시스템 내의 각 파일을 구성하는 블록들을 리스트화 해 줄 것이다.

# hdfs fsck / -files -blocks


2.2 네임노드와 데이터노드














의문점

HDFS에서 다수의 작성자, 무작위적인 변경이 지원되지 않는다는 것의 의미. 아직 이해하지 못함.



레퍼런스

스택 오버플로우. streaming data access에 관한 내용. - https://stackoverflow.com/questions/16260535/streaming-data-access-and-latency-in-hadoop-applications


스택 오버플로우. HDFS의 블록이 큰 이유. - https://stackoverflow.com/questions/22353122/why-is-a-block-in-hdfs-so-large

'IT 관련 > Hadoop The Definitive Guide (4th)' 카테고리의 다른 글

Chapter2. Map Reduce  (0) 2019.01.23
Chapter1. Meet Hadoop  (0) 2019.01.17
블로그 이미지

서기리보이

,

본 포스팅은 개인적인 공부를 위해 책의 내용을 요약하여 정리하는 글입니다.

작성자는 분산 데이터베이스, 하둡에 대해서 처음 공부해보는 것이니 참고가 된다면 기쁘겠지만 틀린 정보가 있을 수 있습니다.


개발환경 : 우분투 18.04, 하둡 3.0.3


개발환경 구축은 Ssup2 블로그를 참조하였습니다.

(https://ssup2.github.io/record/Hadoop_%EC%84%A4%EC%B9%98_%EC%84%A4%EC%A0%95_Ubuntu_18.04/)


하둡은 여러 언어로 쓰여진 프로그램을 돌릴 수 있다. 이 장에서는 자바, 루비, 파이썬으로 표현된 프로그램을 보도록 하자.


1. A Weather Dataset

예제로 날씨 데이터를 분석하여 각 년도별로 최대 온도가 몇도인지 분석하는 프로그램을 만들어 보자.


1.1 DataFormat

예제 프로그램 작성을 위해 NCDC의 데이터를이용한다.

그리고 문제를 단순화하기 위해 기온 등 기본적인 요소만 사용한다.


* NCDC 데이터 받는 법

이 깃허브 링크로 들어가 전체 프로젝트를 다운받는다.

https://github.com/tomwhite/hadoop-book/


input/ncdc/ 디렉토리의 sample.txt 파일을 이용하면 된다.


2. Unix Tool을 이용한 데이터 분석

Shell Script를 이용한 데이터 분석에 대한 내용.

사실상 하둡과는 무관하므로 생략.


3. Hadoop을 이용한 데이터 분석

하둡의 병렬 처리기능을 이용하려면 쿼리문을 맵리듀스용으로 표현해야한다. 로컬로 소규모 테스트를 마치고 나면 클러스터 머신에서 돌려 볼 수 있다.


3.1 Map and Reduce

맵리듀스 작업은 맵/리듀스 2개의 phase로 구분하여 작동한다. 각 페이즈는 key-value pair를 입출력으로 가지고 그 타입은 프로그래머가 결정한다.


그리고 맵함수와 리듀스함수도 프로그래머가 결정한다.


맵 페이즈의 입력값은 raw NCDC 데이터다. 맵 페이즈에서는 텍스트 입력 포멧을 선택하여 raw 데이터의 각 라인을 텍스트값으로 변형하는데 사용 할 수 있다.


맵 함수는 리듀스 함수가 작업할 수 있게 데이터를 준비하는 단계로, 이번 예제에서 맵 함수는 년도와 온도를 뽑아오는 간단한 함수다.


또한 맵 함수는 빠져있거나, 의심스럽거나, 에러가 있는 베드 레코드(Bad Record)를 걸러내기도 한다.


그리고 리듀스 함수의 역할은 각 년도별로 최대 온도를 찾는 것이다.


데이터 처리 과정


먼저 맵함수는 다음의 입력을 받는다.


0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999

0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999 0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999 0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999 0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999


맵 함수에서 처리할 때, 데이터는 다음의 key-value pair로 표현된다. 

(*참고로 여기서 key 값은 파일의 시작점으로부터의 offset이다.)


(0, 0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999)

(106, 0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999) (212, 0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999) (318, 0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999) (424, 0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999)


여기서 우리에게 필요한 데이터는 진하게 칠한 년도와 온도 데이터다.


여기서 key 값은 필요없으니 무시하고, 맵 함수는 년도와 기온을 추출하여 결과값으로 내보낸다.

(1950, 0)

(1950, 22)

(1950, -11)

(1949, 111)

(1949, 78)


이제 이 결과값을 리듀스함수로 보내기 전에 맵리듀스 프레임워크가 이 결과값을 가공한다.

맵리듀스 프레임워크의 가공 과정에서는 데이터를 key 값을 기준으로 정렬하고 그룹화한다.


(1949, [111, 78])

(1950, [0, 22, -11])


각 연도별 기온 리스트는 완성되었으니 리듀스함수에서 이 리스트들을 돌며 최대값을 고르면 완료된다.


(1949, 111)

(1950, 22)


3.2 Java MapReduce


이클립스 설정

우선 맵 리듀스 라이브러리 JAR 파일들을 프로젝트에 추가해야한다.

(* 라이브러리가 root 디렉토리에 있는 경우 이클립스를 관리자 권한으로 실행해야 JAR 파일들이 있는 디렉토리에 접근 가능합니다.)


생선한 프로젝트 우클릭-properties-Java Build Path-Libraries 에서 Add External JARs를 클릭하여 필요한 라이브러리들을 모두 추가한다.


Add External JARs....




이제 이번 예제 구현에 사용된 클래스 코드들을 살펴보도록 하자.


Mapper

import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class TestMapClass extends Mapper<LongWritable, Text, Text, IntWritable>{ private static final int MISSING = 9999; @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String year = line.substring(15,19); int airTemperature; if(line.charAt(87) == '+') { airTemperature = Integer.parseInt(line.substring(88,92)); }else{ airTemperature = Integer.parseInt(line.substring(87,92)); }

//퀄리티 코드 String quality = line.substring(92,93);


//퀄리티 코드 == 01459 : 올바르게 읽어졌음.

//올바르게 읽어진 데이터만 결과에 적용한다. if(airTemperature != MISSING && quality.matches("[01459]")) {

//map() 메소드는 context를 통해 결과값을 전달한다.

//결과값은 Hadoop 데이터 타입으로 변환하여 입력한다. context.write(new Text(year), new IntWritable(airTemperature)); } } }


Mapper 클래스는 제네릭 타입이라서 <input key, input value, output key, output value> 4개의 타입 패러미터를 갖는다.

(쉽게 말해서 템플릿을 사용하기 때문에 이 4개의 타입을 결정해줘야 한다.)


여기서 각 타입은 다음과 같다.

input key = Long int = LongWritable

input value = Text Line = Text

output key = Year = Text

output value = Air Temperature = IntWritable


하둡에서는 자바의 기본 타입을 쓰지 않고 새로운 타입을 제공한다.

LongWritable, Text, IntWritable이 각각 Long, String, Integer에 해당되는 하둡의 클래스 타입들이다.

이 하둡 클래스 타입들은 Serialization에 최적화된 클래스 타입들이다.


Reducer

import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{ int maxValue = Integer.MIN_VALUE; for(IntWritable value : values) { maxValue = Math.max(maxValue, value.get()); } context.write(key, new IntWritable(maxValue)); } }


리듀서 클래스도 제네릭 타입이다.

<input key, input value, output key, output value>를 설정해 줘야하는데, 리듀서 함수의 input 타입은 맵 함수의 output 타입과 같아야 하므로 input key는 Text, input value는 IntWritable이다.

리듀서 함수의 output key는 Text, output value는 연도별 최대 온도값을 IntWritable로 내보낸다.


Main Class

import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class MaxTemperature { public static void main(String[] args)throws Exception{ if(args.length != 2) { System.err.println("Usage: MaxTemperature <input path> <output path>"); System.exit(-1); } //버전 업으로 사용법이 바뀜 //Job job = new org.apache.hadoop.mapreduce.Job();

Job job = Job.getInstance();


job.setJarByClass(MaxTemperature.class); job.setJobName("Max Temperature");

//입력 경로, 출력 경로 설정 FileInputFormat.addInputPath(job, new Path(args[0]));; FileOutputFormat.setOutputPath(job,new Path(args[1]));

//매퍼, 리듀서 클래스 설정 job.setMapperClass(TestMapClass.class); job.setReducerClass(MaxTemperatureReducer.class);

//output key, value 타입 설정 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); System.exit(job.waitForCompletion(true) ? 0 : 1); } }


Job 객체는 작업의 세부사항을 정의하고 일이 어떻게 처리되는지에 대한 통제권을 제공하는 객체다.


하둡 클러스터에서 작업을 수행할 때, main 클래스 코드를 JAR 파일로 만들면 하둡이 클러스터 내에서 이 JAR 파일을 분배한다.


Job 클래스의 setJarByClass() 메소드를 사용하여 클래스를 전달하면 하둡이 해당 클래스를 포함한 JAR 파일을 찾아준다.


Job 객체를 생성한 후에는 FileInputFormat 클래스의 addInputPath() 메소드로 input 경로를 정하는데, input 경로는 파일, 디렉토리, 파일 패턴, 무엇이든 될 수  있고, addInputPath() 메소드를 여러번 불러 다양한 경로를 포함시키는 것도 가능하다.


output 경로는 FileOutputFormat 클래스의 setOutputPath() 메소드로 결정한다. 리듀서 함수의 결과가 이 경로에 쓰여지며, 해당 디렉토리는 비어있어야한다.


3.3 A test Run

이제 구현한 코드로 실제 데이터를 처리해 보도록 하자.


JAR 파일로 export

우선 구현한 프로젝트를 JAR 파일로 export 해야한다.

File - Export에서  JAR file을 선택하고 파일 이름을 정한 후 JAR 파일을 만든다.


finish


HDFS에 파일 등록

하둡을 이용해 데이터를 처리하기 위해서는 우선 HDFS(Hadoop Distributed File System)에 데이터 파일을 등록해야한다.


다음 명령어들을 이용하여 이를 수행한다.

# hadoop fs -mkdir '디렉토리 명'

# hadoop fs -put '파일 명' 'HDFS상 디렉토리'

# hadoop fs -ls '디렉토리 명'

mkdir : 디렉토리 생성

put : HDFS에 파일 등록

ls : 디렉토리 내용 출력


HDFS에 입력 파일 디렉토리 생성 및 파일 등록


출력 파일용 디렉토리 생성


분석하기

이제 yarn 명령어를 이용하여 분석을 실행한다.

# yarn '실행시킬 타입' '실행시킬 파일 이름' '실행시킬 클래스 경로' [-옵션]

여기서 우리는 JAR 파일을 실행 할 것이므로 실행시킬 타입은 jar 이다.

실행시킬 파일 이름은 HDFS에 등록된 파일 들 중 입력 파일로 사용할 파일의 이름이다.

실행시킬 클래스 경로는 main 함수를 포함하는 클래스를 (패키지명.클래스명)으로 입력하면 된다.


분석 시작


분석 과정 출력


분석 결과


분석 결과 1949년도 최고 온도는 11.1도, 1950년도 최고 온도는 2.2도로 측정되었다고 한다.


output 디렉토리 입력시 output 디렉토리는 존재하지 않는 디렉토리여야한다.

분석 결과를 출력 할 때 하둡이 입력한 output 디렉토리 이름으로 새 폴더를 만들 것이다.



4. Scaling out


스케일 아웃 작업을 하기 위해서 데이터는 HDFS에 저장되어야 한다.


4.1 데이터 플로우(Data Flow)


* 맵 리듀스 Job 이란 : 클라이언트가 수행되길 원하는 작업들의 묶음이다. 입력 데이터, 맵리듀스 프로그램, 설정 정보로 구성된다.

(A MapReduce job is a unit of work that the client wants to be performed;)


하둡은 맵리듀스 job을 task로 나누어 처리한다.

이 task는 map task와 reduce task로 나뉘고, task들은 YARN에 의해 스케쥴링되고 클러스터상의 노드에서 실행된다.


하둡은 맵 리듀스 job의 input을 input split, 또는 split이라고 불리는 고정크기의 조각들로 나눈다.

하둡은 각 스플릿(split)마다 하나의 태스크를 만들고 각각의 테스크는 사용자 정의 맵함수를 실행시킨다.


이 스플릿의 크기는 너무 작으면 스플릿과 map task를 관리하는 오버헤드가 너무 커지고, 너무 크면 처리하는데 시간이 오래 걸린다.

그래서 적절한 크기로 스플릿을 나누어야 하는데, 일반적으로 HDFS block의 크기인 128MB가 가장 효율적이다.


Data Locality Optimization

하둡은 HDFS 내의 입력 데이터가 있는 노드에서 태스크를 수행할 때 가장 빠르게 작동한다.

이러한 속성을 데이터 지역성 최적화(Data Locality Optimization) 라고 한다.


*Reduce task는 상관없고 Map task에만 영향이 있는 속성이다.



HDFS block의 크기로 스플릿을 나누는 것이 효율적인 이유.

간혹 HDFS 블록 복제본을 호스팅하는 모든 노드들이 이미 다른 맵 태스크를 수행하고 있는 경우 잡 스캐줄러가 같은 랙 장비 내에서 비어있는 맵 슬롯(map slot)을 가진 노드를 찾으려 하게 된다.

더욱 잘 일어나지 않지만, 다른 랙 장비에 있는 노드를 사용해야하는 경우도 있다. 이런 경우는 렉 장비간 네트워크 통신이 필요하게 된다.

이런 상황들에선 성능이 저하된다.

Data Locality Optimization라는 특성도 네트워크 통신으로 인한 성능저하가 발생하기 때문에 생기는 특성이라고 볼 수 있을 것이다.


이제, HDFS Block의 크기(128MB)로 스플릿을 나누는 것이 효율적인 이유에 대해 얘기해보자면, 한개의 노드에 저장 될 수 있는 가장 큰 크기의 데이터가 128MB이기 떄문이다.

128MB보다 큰 데이터를 저장하려고 하면 두개의 블록에 저장하게 될 것이고, 스플릿들 중 일부는 네트워크를 통해 다른 렉 장비에 있는 노드에 저장될 것이다. 네트워크 작업이 늘어나기 때문에 성능이 저하된다.


(사진 출처 : Hadoop the definitive Guide)

Map Task가 HDFS Block에 접근하는 경우의 수

(a) data-local (같은 노드 내에서 처리함. 가장 효율 적인 상황.)

(b) rack-local (같은 랙 장비 내에서, 다른 노드에 있는 맵 슬롯에  데이터를 전송)

(c) off-lack (다른 랙장비에 있는 맵 슬롯에 네트워크를 통해 데이터를 전송)


Map Task는 output을 HDFS에 저장하지 않고 로컬 디스크에 저장한다.

Map Output은 최종결과가 아닌 reduce task에서 입력 데이터로 사용할 중간 결과다. 모든 작업이 끝나면 불필요하기 때문에 HDFS에 저장하지 않는다.



Reduce Task에는 Data Locality 속성의 이점이 없다.

일반적으로 한개의 Reduce Task의 입력은 모든 매퍼들로부터 오기 때문에 Reduce Task가 입력 데이터를 받는 데에는 네트워크 통신이 불가피하기 때문이다.


Reduce ouput의 첫 번째 복사본만 로컬로 저장되고 나머지는 다른 랙 장비에 저장되어 네트워크 대역폭을 소모하는 작업이 된다. 하지만 이는 HDFS 파이프라인이 소모하는 양 만큼만이다.


Reduce Task의 수는 input size와 상관 없이 독립적으로 명시해줘야 한다. 이것에 관해서는 추후에 다루도록 한다.


Reduce Task가 다수일 때에는 Map Task가 output을 partition으로 나누고, 한 파티션당 한 개의 reduce task에 할당된다.

각 파티션에는 다수의 키 값이 존재할 수 있고, 각 키 값들에 대한 레코드는 각 키와 같은 파티션 내에 포함되어야 한다.


(사진 출처 : Hadoop the definitive Guide)

reduce가 한개일 경우 Map Reduce 데이터 플로우.


(사진 출처 : Hadoop the definitive Guide)

reduce가 여러개일 경우 Map Reduce 데이터 플로우.



Reduce Task가 0개 필요한 경우도 있다. 이는 프로세스가 완전히 평행하게 수행 될 수 있어서 Shuffle 작업이 필요 없는 경우에 유용하다. 이 경우 노드를 벗어나서 데이터를 전송하는 경우는 Map Task가 HDFS에 결과값을 보낼 때 뿐이므로 성능도 괜찮게 나올 수 있다.


(사진 출처 : Hadoop the definitive Guide)

Reducer Task가 없는 경우 Data Flow


4.2 Combiner Function

많은 맵리듀스 job들이 클러스터의 이용가능한 네트워크 대역폭 한계 때문에 성능이 저하된다. 그래서 맵과 리듀스 태스크 사이의 데이터 전송을 최소화하는것이 중요하다.


하둡에서는 개발자가 Combiner function을 정의하게 할 수 있다.

Combiner function은 map의 output을 입력으로 받아서 새로운 output으로 만들고, 그 output을 reduce 함수의 input으로 전달하는 역할을 하는 함수다.

Combiner function은 최적화 함수이므로 몇 번 호출되든 상관 없이 reducer 함수의 output은 동일해야한다.


Combiner function의 contract는 사용될 함수의 타입을 제한한다.


Combiner function에 대한 이해를 돕기 위해, 위에서 봤던 연도별 최고 온도 예제를 생각해 보자.

1950년도의 온도 데이터를 2개의 Mapper가 추출해냈다고 하자.


첫 번째 Mapper 결과:

(1950, 0)

(1950, 20)

(1950, 10)


두 번째 Mapper 결과:

(1950, 25)

(1950, 15)


Combiner function을 따로 정의하지 않으면 Reducer Task는 다음의 데이터를 받는다.


(1950, [0, 20 , 10, 25, 15])


그리고 Reducer Task의 최종 output은 


(1950, 25)


일 것이다.


여기서 Mapper Task의 Output 중 최고 온도만 선별하도록 Combiner Function을 정의하면 Reduce Task에는 다음의 데이터만 전송하면 된다.


(1950, [20, 25])


이렇게 하면 맵과 리듀스 태스크 사이의 데이터 전송량이 최소화 되고, Combiner Function을 몇 번 적용하든지 Reduce Task의 결과값이 변하지 않는다.


MAX(0, 20, 10, 25, 15) = 25


MAX(MAX(0, 20, 10), MAX(25, 15)) =MAX(20, 25) = 25


그리고 이 경우에는 Combiner Function을 몇 번 적용하든지 Reduce Task의 결과값이 변하지 않기 때문에 적용해도 되는데, 그렇지 않은 경우에는 주의해야한다.

예를 들어, 최고 온도가 아닌 평균 온도를 구하는 경우를 생각 해보자.

결과는 다음과 같이 변하게 된다.


mean(0, 20, 10, 25, 15) = 14


mean(mean(0, 20, 10), mean(25, 15)) =mean(10, 20) = 15


Combiner Function 적용

Combiner Function은 Reducer 클래스를 사용하여 정의된다.(Reducer 클래스를 상속하여 구현)

최고 온도 예제에서는 Reducer 함수와 Combiner 함수의 역할이 사실상 똑같기 때문에 MaxTemperatureReducer 클래스를 Combiner로 그대로 사용하면 된다.


Main Class (Combiner 포함)

import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class MaxTemperature { public static void main(String[] args)throws Exception{ if(args.length != 2) { System.err.println("Usage: MaxTemperature <input path> <output path>"); System.exit(-1); } //버전 업으로 사용법이 바뀜 //Job job = new org.apache.hadoop.mapreduce.Job();

Job job = Job.getInstance();


job.setJarByClass(MaxTemperature.class); job.setJobName("Max Temperature");

//입력 경로, 출력 경로 설정 FileInputFormat.addInputPath(job, new Path(args[0]));; FileOutputFormat.setOutputPath(job,new Path(args[1]));

//매퍼, 리듀서, Combiner 클래스 설정 job.setMapperClass(TestMapClass.class);

job.setCombinerClass(MaxTemperatureReducer.class); job.setReducerClass(MaxTemperatureReducer.class);

//output key, value 타입 설정 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); System.exit(job.waitForCompletion(true) ? 0 : 1); } }



Running a Distributed MapReduce Job

전체 데이터셋에 대해서 동일한 프로그램을 변환하지 않고 사용해서 처리가능하다.

이것에 관해서는 6장에서 다룬다.


5. 하둡 스트리밍

하둡은 자바 이외의 언어로도 맵, 리듀스 함수를 작성 할 수 있는 API를 제공한다. 그러니 표준 입출력을 사용할 수 있는 어떤 언어든 MapReduce 프로그램 개발에 사용할 수 있다.


스트리밍은 원래 텍스트 프로세싱에 가장 적합하다.

맵 입력 데이터는 표준 입력으로 줄별로 전달되고, 줄별로 출력된다.


맵함수의 Output인 key-value pair는 tab으로 구분되고, 이는 리듀스 함수의 입력에도 동일하게 적용된다.



레퍼런스

Hadoop The Definitive Guide 4th Edition(O'REILLY. Tom White)


하둡 설치

(https://ssup2.github.io/record/Hadoop_%EC%84%A4%EC%B9%98_%EC%84%A4%EC%A0%95_Ubuntu_18.04/)


NCDC 예제 데이터

(https://github.com/tomwhite/hadoop-book/)


Kamang's IT Blog

(https://kamang-it.tistory.com/entry/Hadoop-02%EA%B8%B0%EC%B4%88-%EC%98%88%EC%A0%9C-%EB%A7%8C%EB%93%A4%EA%B8%B0SingleFileWirteRead)

'IT 관련 > Hadoop The Definitive Guide (4th)' 카테고리의 다른 글

Chapter3. The Hadoop Distributed Filesystem  (0) 2019.02.20
Chapter1. Meet Hadoop  (0) 2019.01.17
블로그 이미지

서기리보이

,

본 포스팅은 개인적인 공부를 위해 책의 내용을 요약하여 정리하는 글입니다.

작성자는 분산 데이터베이스, 하둡에 대해서 처음 공부해보는 것이니 참고가 된다면 기쁘겠지만 틀린 정보가 있을 수 있습니다.


사전 지식

- 맵 리듀스(Map Reduce) : 대용량 데이터 처리를 분산 병렬 컴퓨팅에서 처리하기 위해서 제작한 프레임워크



1. Data!


IDC의 예측에 따르면 "디지털 세상"은 4.4 제타바이트에 달한다.(1제타파비트 = 1억테라바이트).

현재는 데이터시데이며 IoT로 인해 생성되는 데이터는 더욱 많아질 것이다.

일부 분야에서는 알고리즘의 퀄리티 보다는 테이터의 양이 더 중요하다.

이런 상황에서 대용량의 데이터를 관리하고, 처리하는 방법에 대한 개발이 매우 중요해졌다.


2. Data Storage and Analysis


기술 발전으로 인해 하드 디스크 등 저장장치의 용량(capacity)은 매년 매우 커지는 중이지만, 드라이브에서 데이터를 읽고 쓰는 접근 속도(access rate)는 크게 발전하지 못하고 있다.

접근 속도를 높이는 방법으로 여러개의 디스크에 데이터를 나눠서 저장해두고 한번에 병렬적으로 읽는 방법이 있다.


다수의 디스크에 병렬적으로 데이터를 저장하는 것의 문제점(어려움)

- 하드웨어 장치가 많으니 그 중에 고장나는 장치가 발생할 가능성이 높다.

- 여러 디스크에 분산 저장된 데이터를 하나로 모아야하는 경우가 발생할 수 있다.


하드디스크 1개에 데이터를 저장할 때 하드디스크가 고장날 확률보다 하드디스크 100개에 데이터를 저장할 때 고장나는 하드디스크가 생길 확률이 더 높다. 장치가 하나라도 고장나면 데이터 전체를 쓸 수없게 될 수도 있는데, 이 문제는 하나의 데이터를 여러 디스크에 중복(replication)되게 저장하여 어느정도 해결 할 수 있다.


그리고 여러 디스크에 분산 저장된 데이터를 하나로 모으는 작업은 매우 어렵다. 맵 리듀스를 사용하면 디스크 읽고 쓰기의 문제를 키값과 value값에 대한 계산문제로 문제를 추상화기 때문에 이러한 어려움이 해결된다.



3. Querying All Your Data


맵 리듀스는 일괄 쿼리 프로세서(batch query processor)이기 때문에 전체 데이터 집합에 대해 비정형 쿼리(ad hoc query)를 보내는 능력과 그 결과를 빠르게 가져오는 능력이 있다.


하지만 반응형 분석에는 부적합하다.


4. Beyond Batch(일괄처리를 넘어서)


맵 리듀스는 일괄처리 시스템이라서 반응형 분석에는 부적합하다. 

하지만 하둡은 반응형 SQL, 반응형 프로세싱, 스트림 프로세싱, 검색 기능과도 잘 동작한다. 맵 리듀스를 활용하여 개발되었지만, 하둡은 일괄처리 프로세싱의 한계를 뛰어넘은 것이다.


YARN의 등장으로 하둡은 일괄처리 프로세싱 모델 이외에 다른 프로세싱 모델들을 적용할 수 있다.

*YARN : 어떤 분산처리 프로그램이라도 하둡 클러스터의 데이터로 실행가능하게 만드는 클러스터 리소스 관리 프로그램


5. Comparison with Other Systems(다른 시스템과의 비교)


5.1 Relational Database Management Systems(RDBMS)

 

RDBMS 

MapReduce 

데이터 크기 

기가바이트(Gigabytes) 단위 처리에 효율적

페타바이트(Petabytes) 단위 처리에 효율적

접근 방식 

반응형 및 일괄처리

(Interactive and batch) 

일괄처리(Batch)

 업데이트

업데이트가 자주 있어도 효율적 

업데이트가 자주 이뤄지지 않고, 읽기가 많을 때 효율적 

트랜잭션 특성 

ACID 

없음 

구조 

Schema-on-write 

Schema-on-read 

Scaling 

비선형(Non-linear) 

선형(Linear) 


* schema on read : 데이터의 스키마를 데이터를 읽는 시점에 확인한다.(schema on write는 반대로 쓰는 시점에 확인)

(참조 : http://datacookbook.kr/90)


맵 리듀스 데이터 병렬 처리

맵 리듀스는 데이터를 분할해 병렬적으로 처리하는 것이 가능하다.


Unstructured/Semi-Structured Data

비정형 데이터(Unstructured Data)는 미리 정의된 방식으로 정리되지 않은 데이터, 반정형 데이터(Semi-Structured Data)는 정형구조를 준수하지 않는 정형 데이터를 말한다.


RDBMS는 미리 정의된 형식을 따르는 정형 데이터(Structured)만 처리 할 수 있지만, 하둡은 Scheam-on-read, 즉 데이터를 읽는 시점에 데이터의 스키마를 확인하기 때문에 비정형 데이터, 반정형 데이터도 처리 할 수 있다.

따라서 유연성이 높다고 할 수 있고, RDBMS에서의 데이터 로딩 단계가 필요하지 않다.


Normalization 

관계형 데이터는 무결성(Integrity) 유지와 중복제거를 위해 자주 정규화(normalization)된다.

하둡에서 정규화를 적용하면 특정 레코드를 읽는 작업이 local opeartion이 아닌 non-local opeartion이 되어 문제가 발생 할 수 있다.

그리고 하둡은 빠른 속도로 스트리밍 읽기/쓰기를 수행할 수 있어야 하기 때문에 그 점도 정규화 적용 시 문제가 된다.


로그 파일이 정규화 되지 않은 대표적인 예시이다. (로그파일이 정규화되지 않는 예시로, 로그파일에는 고객의 이름이 여러 로그에 중복되어 나타나는 경우가 많다.) 그래서 로그파일 분석에는 하둡이 적합하다.



풀리지 않은 의문점

5.1에서 하둡에는 RDBMS에서의 데이터 로딩 단계가 필요하지 않은 이유



레퍼런스

Schema-on-read의 이해 - http://datacookbook.kr/90

비정형 데이터 

https://ko.wikipedia.org/wiki/%EB%B9%84%EC%A0%95%ED%98%95_%EB%8D%B0%EC%9D%B4%ED%84%B0

반정형 데이터 

https://ko.wikipedia.org/wiki/%EB%B0%98%EC%A0%95%ED%98%95_%EB%8D%B0%EC%9D%B4%ED%84%B0


'IT 관련 > Hadoop The Definitive Guide (4th)' 카테고리의 다른 글

Chapter3. The Hadoop Distributed Filesystem  (0) 2019.02.20
Chapter2. Map Reduce  (0) 2019.01.23
블로그 이미지

서기리보이

,