하둡 완벽 가이드 - 19. Spark
본문 바로가기


Programmer/hadoop

하둡 완벽 가이드 - 19. Spark

사용자의 입장으로만 하둡을 바라보게 되어 깊이가 부족하다는 생각을 하게 되었다.
하둡 완벽 가이드를 읽고 이해한대로 정리한다.


아파치 스파크는 대용량 데이터 처리를 위한 클러스터 컴퓨팅 프레임워크다.
스파크는 맵리듀스를 사용하지 않고 클러스터 기반으로 작업을 실행하는 자체 분산 런타임 엔진이 있다. YARN기반으로 실행할 수 있고, 하둡 파일 포맷과 HDFS 같은 기반 저장소를 지원한다.

잡 사이의 대용량 작업 데이터셋을 메모리상에 유지하는 특성으로, 매번 디스크에서 데이터셋을 읽는 맵리듀스 워크플로우에 비해 10배 이상 더 빠르다. 반복적 알고리즘이나 대화형 분석과 같은 애플리케이션에 적합하다.

인메모리 캐싱 뿐 아니라 DAG 엔진과 사용자 경험을 제공하여 사용자에게 큰 매력이 있다. (https://www.databricks.com/blog/2015/06/22/understanding-your-spark-application-through-visualization.html)
맵리듀스와 달리 스파크의 DAG엔진은 연산자 중심의 파이프라인을 처리해서 사용자를 위한 단일 잡으로 변환한다.

19.1 스파크 설치

바이너리 배포판의 stable 버전을 설치한다! 압축 풀고 PATH를 추가한다.

19.2 예제

예제에서는 RDD를 중점적으로 고려하면 된다.

RDD
Resilient Distributed dataset (탄력적인 분산 데이터셋)으로, 클러스터에 있는 다수의 머신에 분할되어 저장된 읽기 전용 컬렉션이다. 
전형적 스파크 프로그램은 하나 또는 그 이상의 RDD를 입력받고, 일련의 변환 작업을 거쳐 목표 RDD 집합으로 변형된다.
RDD의 Resilient는 유실된 파티션이 있을 때 스파크가 RDD의 처리 과정을 다시 계산하여 자동으로 복구할 수 있다는 의미다.

19.2.1 스파크 애플리케이션, 잡, 스테이지, 태스크

애플리케이션 > 잡 > 스테이지 > 태스크
1: N : N : N
  • 태스크: 가장 작은 작업 단위로, 각 태스크는 맵리듀스의 태스크와 같이 클러스터에 분산된 RDD 파티션에서 병렬로 실행
  • 스테이지: 임의의 방향성 비순환 그래프(directed acyclic graph, DAG)
  • 잡: 맵리듀스와 유사한 개념으로, 맵리듀스보다 더 일반적이다. 하나의 맵과 하나의 리듀스로 구성된 단일 맵리듀스의 잡과 달리 스파크의 잡은 Stage로 구성된다. 항상 RDD 및 공유변수를 제공하는 애플리케이션의 콘텍스트 내에서 실행
  • 애플리케이션: 하나 이상의 잡을 수행할 수 있고, 직렬 또는 병렬로 실행되며, 동일한 애플리케이션에서 수행된 이전 잡에서 캐싱된 RDD에 접근할 수 있는 메커니즘을 제공한다.

19.2.2 스칼라 독립 애플리케이션

스칼라 독립 애플리케이션은 스칼라로 원하는 프로그램을 구현하고 아래와 같은 방식으로 제출하면 된다.

% spark-submit --class 클래스명 --master local jar파일명 arg0 arg1 ...

--class : 스파크에 애플리케이션의 클래스 이름을 전달하는 옵션
--master: 잡이 실행되는 위치 (여기서는 local, 로컬 머신에 있는 단일 JVM에서 잡을 실행)

스칼라 뿐만 아니라 자바나 파이썬으로도 구현하여 동일하게 실행시킬 수 있다.

19.3 탄력적인 분산 데이터셋 RDD

19.3.1 생성

RDD를 만드는 세가지 방법이 있다.

Parallelizing (병렬) 컬렉션

val params = sc.parallelize(1 to 10)
val result = params.map(performExpensiveComputation)

parallelize의 두 번째 인자로 병렬 처리 수준을 재정의할 수 있다.

HDFS 같은 기존 외부 저장소의 데이터 셋 사용

val text: RDD[String] = sc.textFile(inputPath)

textFile은 내부적으로 구버전 맵리듀스 API의 TextInputFormat을 이용한다.
따라서, HDFS의 경우 HDFS 블록당 스파크 파티션도 하나다. 물론 두 번째 인자로 파일을 스플릿하는 값을 지정할 수 있다.

text 파일 뿐 만 아니라 시퀀스파일도 가능하다.

기존의 RDD 변환

다음 절에서 설명한다.

 

19.3.2 트랜스포메이션과 액션

map 메서드는 transformation, foreach의 println은 action

val text = sc.textFile(inputPath)
val lower: RDD[String] = text.map(_.toLowerCase())
lower.foreach(println(_))
  • 트랜스포메이션
    • 기존의 RDD에서 새로운 RDD를 생성
    • 호출되면 바로 작동되는 것이 아니라 lazy하게 작동되어 action이 발생해야 작동됨
    • Return type이 RDD
    • Spark의 라이브러리에 RDD와 관련된 트랜스포메이션으로 mapping, grouping, aggregating, repartitioning, sampling, joining RDD, set으로 RDD 다루기 등의 기능이 있음
  • 액션
    • RDD에 적용된 작업들을 트리거하고, 사용자에게 특정 결과를 돌려주거나 외부 저장소에 저장
    • 호출되면 바로 작동됨
    • Return type이 RDD가 아님
    • Spark의 라이브러리에 RDD와 관련된 액션으로 RDD를 컬렉션으로 materializing(구체화), RDD의 통계적 계산, RDD에서 몇개의 데이터 샘플링, RDD를 외부 저장소에 저장 등의 기능이 있음
MapReduce in Spark
Spark의 map()과 redude() 기능은 하둡의 맵리듀스와 완전히 같지는 않다. 하둡의 작동 방식은 아래와 같다.
map: (K1, V1) → list(K2, V2)
reduce: (K2, list(V2)) → list(K3, V3)

두개 모두 다수의 출력 페어를 반환한다. 이것은 Spark의 flatMap()과 동일하다. Spark의 Map()과 flatMap()은 다음과 같은 결과를 낸다.

scala> val l = List(1, 2, 3) l: List[Int] = List(1, 2, 3)
scala> l.map(a => List(a))
res0: List[List[Int]] = List(List(1), List(2), List(3))
scala> l.flatMap(a => List(a)) res1: List[Int] = List(1, 2, 3)

 

완전히 하둡의 맵리듀스와 동일하게 구현하려면 다음과 같이 구현해야한다.
val input: RDD[(K1, V1)] = ...
val mapOutput: RDD[(K2, V2)] = input.flatMap(mapFn)
val shuffled: RDD[(K2, Iterable[V2])] = mapOutput.groupByKey().sortByKey()
val output: RDD[(K3, V3)] = shuffled.flatMap(reduceFn)

꼭 위와같이 개발하기 보다는 적절하게 구현하는게 더 좋다.

Aggregation transformations

key로 RDD를 집계하는 대표적인 transformation 함수는 reduceByKey(), foldByKey(), aggregateByKey() 세가지가 있다. 약간 다르지만, 세개 모두 키를 기반으로 집계된다. (동일한 역할을 하는 action함수는 reduce(), fold(), aggregate()가 있다)

  • reduceByKey()
val pairs: RDD[(String, Int)] = sc.parallelize(Array(("a", 3), ("a", 1), ("b", 7), ("a", 5)))
val sums: RDD[(String, Int)] = pairs.reduceByKey(_+_)
assert(sums.collect().toSet === Set(("a", 9), ("b", 7)))

parallelize로 생성된 array는 다른 파티션에서 다른 task로 분산, 작동되기 때문에 어느 값이 먼저 계산 되던지 간에 같은 결과를 내야한다.

  • foldByKey():
val sums: RDD[(String, Int)] = pairs.foldByKey(0)(_+_)
assert(sums.collect().toSet === Set(("a", 9), ("b", 7)))

foldByKey는 값을 제공해야하는게 다르다. 첫번째 제공한 값이 제일 먼저 연산되지만, 그 뒤 순서는 바뀔 수 있다. reduceByKey랑 비슷하다.

  • aggregateByKey()
val sets: RDD[(String, HashSet[Int])] = pairs.aggregateByKey(new HashSet[Int])(_+=_, _++=_)
assert(sets.collect().toSet === Set(("a", Set(1, 3, 5)), ("b", Set(7))))

aggregateByKey는 초기 값을 주고 (여기서는 빈 HashSet[Int]) 두개의 함수를 전달한다. 첫번째 함수는 HashSet[Int] 에 Int가 적재되는 방식이고, 두번째 함수는 HashSet[Int] 자체를 어떻게 결합할지에 대한 정보이다.

19.3.3 영속성

cache() 함수로 데이터를 메모리에 임시로 쌓아둘 수 있다. cache()를 부른다고 해서 바로 RDD를 메모리에 캐시하지는 않고, RDD에 Spark job이 작동하면 캐시되어야한다고 플래그를 달아둔다.

scala> tuples.cache()
res1: tuples.type = MappedRDD[4] at map at <console>:18
scala> tuples.reduceByKey((a, b) => Math.max(a, b)).foreach(println(_)) 
INFO BlockManagerInfo: Added rdd_4_0 in memory on 192.168.1.90:64640
INFO BlockManagerInfo: Added rdd_4_1 in memory on 192.168.1.90:64640
(1950,22)
(1949,111)

위 코드를 보면 BlockManagerInfo의 info를 통해 0,1 파티션을 가진 4번 RDD가 캐시되었음을 확인할 수 있다.

영속성 레벨

cache()를 부르는건 executor의 메모리에 RDD의 각각의 파티션을 유지 시킨다는 것이다. 이때 executor가 RDD 파티션을 저장할 메모리가 없으면 계산이 실패하고 처음부터 다시 계산해야하게 된다. 반드시 cache가 되어야 프로그램의 성능이 좋아지는 경우들이 종종 있는데, 이런 경우를 위해 Spark는 StorageLevel 설정으로 영속성 방식을 설정할 수 있는 persist() 함수를 제공한다.

기본적으로는 MEMORY_ONLY이지만, MEMORY_ONLY_SER, MEMORY_AND_DISK, MEMORY_AND_DISK_SER 도 가능하다. 그 외에 더 복잡한 설정도 가능하다. 그건 Spark 도큐먼트를 보자.

  • MEMORY_ONLY_SER: CPU를 조금 더 사용해서(시리얼라이징을 위해) 시리얼라이징 하여 압축해 저장할 수 있고
    1byte array로 저장되 가비지 컬렉션의 부담을 줄일 수 있다.
  • MEMORY_AND_DISK: 데이터가 메모리보다 넘치면 disk에 spill
  • MEMORY_AND_DISK_SER: 시리얼라이징 한 데이터가 메모리보다 넘치면 disk에 spill

19.3.4 직렬화

Spark은 크게 두가지 serialization 이 있다. data와 function(or closures)

Data

기본적으로 Spark는 executor 사이에 데이터를 네트워크로 전송할 때 나 cache()를 persist(MEMORY_ONLY_SER)로 호출하면 Java serialization을 사용한다. 이건 프로그래머들이 쉽게 이해할 수 있지만 성능이나 사이즈 측변에서는 별로다.

거의 모든 Spark 프로그램에게 더 나은 선택지는 Kryo serialization이다. Java를 위한 더 효과적인 범용 목적의 serialization 라이브러리이다. SparkConf에서 아래와 같이 설정하면  Kryo serialization를 사용할 수 있다.

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

추가 구현도 필요 없다. 그리고 클래스를 등록하는것도 더쉽다. serialization을 사용하기 전에 크라이오를 이용한 클래스를 등록하는 것이 더 효율적이다. (?)

Functions

흔히, 함수의 serialization 은 그냥 작업 자체이다. Spark는 심지어 로컬모드에서 함수가 실행될때도 serialize 되어서 실수로 함수를 non-serializable하게 만들어도 개발 중간에 금방 발견할 수 있을 것이다.

19.4 공유 변수

스파크 프로그램은 RDD의 일부가 아닌 데이터에 접근할 일이 빈번하게 발생한다. 예를들어.. 아래 프로그램은 lookup이라는 Map을 사용해야한다. 이걸 더 효과적인 방법으로, broadcast 변수를 사용해서 해결할 수 있다.

val lookup = Map(1->"a",2->"e",3->"i",4->"o",5->"u")
val result = sc.parallelize(Array(2, 1, 3)).map(lookup(_))
assert(result.collect().toSet === Set("a", "e", "i"))

19.4.1 Broadcast 변수

Broadcast 변수는 serialize 되어 각각의 executor에 전송되어 캐싱된다. 일반 변수와 다르게 closure의 부분으로 serialize되어 task마다 한번씩 네트워크를 통해 전송된다. Broadcast 변수는 맵리듀스에서의 분산캐시와 비슷한 역할을 한다. Spark는 메모리가 넘쳐서 disk에 spilling 될때만 disk를 쓴다는 것만 빼고.

SparkContext에 broadcast() 메서드로 Broadcast 변수를 만들 수 있다.

val lookup: Broadcast[Map[Int, String]] = sc.broadcast(Map(1 -> "a", 2 -> "e", 3 -> "i", 4 -> "o", 5 -> "u"))
val result = sc.parallelize(Array(2, 1, 3)).map(lookup.value(_))
assert(result.collect().toSet === Set("a", "e", "i"))

이름에서 알 수 있듯이 Broadcast 변수는 driver에서 task로 전송만 가능하다. 반대 방향으로 값을 더하기를 원한다면 accumulator를 사용해야한다.

19.4.2 Accumulators

Accumulator는 맵리듀스의 counter와 같이 더할수만 있는 공유 변수이다. Job이 종료되면 accumulator의 최종 값은 driver 프로그램이 검색할 수 있다.

val count: Accumulator[Int] = sc.accumulator(0)
val result = sc.parallelize(Array(1, 2, 3)).map(i=>{count+=1;i}).reduce((x, y) => x + y)
assert(count.value === 3)
assert(result === 6)

accumulator 변수인 count는 accumulator() 매서드로 생성되고, value로 값에 접근할 수 있다. 여기서는 Int만 사용했지만, 어떤 type이든 가능하다. accumulate 변수를 mutable collections로 쓰는것도 가능하다 (accumulableCollection()으로!)

19.5 Spark Job Run 뜯어보기

Spark Job을 한번 잘 살펴보자. 가장 상위에는 두개의 독립적인 개체가 있다.

  • Driver: application의 host와 job의 task를 스케쥴링, 보통 cluster 매니저가 관리하지 않는 클라이언트로 동작
  • Executors:  application에 종속되어 application 실행 중에 application의 task를 실행하는 주체, 보통 cluster의 머신 위에서 동작.

19.5.1 Job 제출

Spark job은 count() 같은 액션 RDD에 실행되면 자동적으로 제출된다. 내부적으로 SparkContext에 runJob()이 호출되고, driver에서 작동되는 스케쥴러를 호출한다(step 2). 스케쥴러는 job을 stage의 DAG로 나누는 DAG 스케쥴러와 cluster의 각각의 stage에 task를 제출하는 task 스케쥴러 두 부분으로 나눠진다.

19.5.2 DAG Construction

job이 어떻게 stage로 나누어지는지 이해하려면 stage에서 실행될 수 있는 task의 유형을 살펴봐야한다. 두개의 타입이 존재한다.

  • shuffle map tasks
    • 맵리듀스의 맵 부분에서 일어나는 셔플과 비슷하다. 각각의 shuffle map task는 하나의 RDD 파티션에서 계산되고, 새 파티션에 출력값이 쓰여지며 이후 스테이지에서 실행된다.
    • 최종 stage를 제외하고 어느 stage던지 실행될 수 있다.
    • 간단한 Spark job들은 해당 task가 필요하지 않는 경우도 많다.
  • result tasks
    • 사용자의 프로그램에서 결과를 반환하는 최종 stage에서 실행된다.
    • 각각의 result tasksms RDD 파티션에서 계산이 실행되고, 결과를 driver에 반환하고, driver가 각각의 partition에서 최종 결과를 모은다.   
val hist: Map[Int, Long] = sc.textFile(inputPath)
                             .map(word => (word.toLowerCase(), 1))
                             .reduceByKey((a, b) => a + b)
                             .map(_.swap)
                             .countByKey()

위와 같은 코드를 통해 나오는 스파크 잡의 스테이지는 다음 그림과 같다

여기서 reduceByKey() 트랜스포메이션이 두 개의 스테이지에 걸쳐있다. 셔플로 구현되어있고, 이 리듀스 함수는 맵리듀스와 유사하게 맵 부분(스테이지1)에서는 컴바이너로, 리듀스 부분(스테이지 2)에서는 리듀서로 동작하기 때문이다.

19.5.3 Task Scheduling

task 스케쥴러가 task들을 받으면 application을 위해 작동하고 있는 executor의 목록을 찾고, 배치 우선권이 있는 익스큐터에 각 태스크를 매핑한다. 그 다음에 빈 코어가 있는 익스큐터에 태스크를 할당한다(이때, 동일한 애플리케이션에서 실행되는 다른 잡이 있으면 안할 수도 있다). 모든 태스크가 완료될 때까지 반복한다.

프로세스 -> 노드 -> 랙 순으로 지역성을 고려하고, 후보가 없으면 임의 태스크를 할당하거나 투기적 태스크를 시도한다.

태스크가 할당되면 스케쥴러의 백엔드에서 구동을 위한 작업이 시작된다 (4단계). 스케줄러 백엔드는 익스큐터 백엔드에 태스크를 구동하라는 원격 메시지를 보내고(5단계) 익스큐터는 해당 태스크를 실행한다(6단계).

익스큐터는 태스크가 완료되거나 실패하면 드라이버에 상태 변경 메시지를 전송한다. 실패하면 태스크 스케쥴러가 다른 익스큐터에 해당 태스크를 다시 보낸다.

19.5.4 Task 수행

익스큐터는 다음 순서로 태스크를 실행한다(7단계).

  1. 태스크의 JAR 및 파일 의존성을 검사해 최신인지 확인한다. 변경되면 다시 내려받는다
  2. 태스크 구동 메시지의 일부로 전송된 직렬화 바이트를 사용자 함수가 포함된 태스크 코드로 역직렬화한다.
  3. 태스크 코드를 수행한다. 익스큐터와 동일한 JVM에서 실행되므로 태스크 실행에 따른 프로세스 오버헤드는 없다

태스크가 완료되면 드라이버에 결과를 반환하고, 그 결과는 serialize 되어 executor 백엔드로 전송된다. 그리고 상태변경 메시지를 드라이버에 돌려준다. 셔플 맵 태스크는 다음 스테이지를 위해 출력 파티션에 대한 정보를 반환하고, 결과 태스크는 처리한 파티션의 결괏값을 반환한다. 드라이버는 반환된 값을 취합하여 최종 결과를 사용자 프로그램에 돌려준다.

19.6 Executor와 Cluster Manager

executor의 생명주기를 관리하는 것은 클러스터 매니저이다. 다양한 클러스터 매니저가 있다.

  • 로컬 모드 매니저
    • 드라이버와 동일한 JVM에서 실행되는 단일 익스큐터.
    • 마스터 URL: local (단일 스레드 사용), local[n](n개의 스레드 사용) 또는 local(*)(컴퓨터 전체의 코어를 사용, 코어당 하나의 스레드)
  • 독립 클러스터 매니저
    • 단일 스파크 마스터와 하나 이상의 워커로 실행되는 간단한 분산 방식
    • 마스터는 애플리케이션을 대신해 모든 워커에 익스큐터 프로세스를 생성하도록 요청
    • 마스터 URL: spark://host:port
  • 아파치 메소스
    • 범용 클러스터 자원 관리자로, 조직의 정책에 따라 다수의 애플리케이션이 세밀하게 자원을 공유
    • 기본 설정인 fine-grained(미세 단위) 모드에서 각 스파크 태스크는 메소스 태스크로 실행 (클러스터의 자원 관리는 효율적이나, 프로세스 구동 시 오버헤드. 스파크 애플리케이션이 실행되는 동안 익스큐터 프로세스가 클러스터의 자원을 계속 유지하기 때문)
    • 마스터 URL: mesos://host:port
  • YARN
    • 하둡에서 사용하는 리소스 매니저
    • 실행되는 각 스파크 애플리케이션은 YARN 애플리케이션의 인스턴스며, 각 익스큐터는 자체의 YARN 컨테이너에서 실행된다.
    • 마스터 URL: yarn-client 혹은 yarn-cluster

19.6.1 YARN에서 스파크 실행

기존에 하둡 클러스터가 있고, 스파크를 추가로 사용할 때 가장 편리한 방법이다. 두가지 배포 모드가 있다.

YARN 클라이언트 모드

spark-shell이나 pyspark와 같은 대화형 컴포넌트가 필요하다. 해당 모드에서 스파크 익스큐터를 구동하는 방법은 다음과 같다.

  1. 드라이버 프로그램이 새로운 SparkContext 인스턴스를 생성할 때 YARN과 연결된다.
  2. SparkContext는 YARN 애플리케이션을 YARN 리소스 매니저에 제출하고.
  3. 클러스터의 노드 매니저에서 YARN 컨테이너를 시작하고 스파크 ExecutorLauncher 애플리케이션 마스터를 실행한다.
  4. ExecutorLauncher 잡은 YARN 컨테이너에서 익스큐터를 시작하고, 필요한 자원을 리소스 매니저에 요청한다.
  5. 할당받은 컨테이너에서 ExecutorBackend 프로세스를 구동한다.

각 익스큐터가 시작되면 SparkContext가 태스크를 실행할 수 있는 가용 익스큐터 수와 위치에 대한 정보를 알 수 있기 때문에 반대로 SparkContext와 연결하고 자신을 등록한다.

실행될 익스큐터 수를 옵션으로 지정할 수도 있다.

% spark-shell --master yarn-client --num-executors 4 --executor-cores 1 --executor-memory 2g

YARN 클러스터 모드

전체 애플리케이션이 클러스터에서 실행되어 운영 잡에 적합하다. 사용자의 드라이버 프로그램은 YARN 애플리케이션 마스터 프로세스의 내부에서 실행된다. 아래와 같은 명령어로 실행할 수 있다.

% spark-submit --master yarn-cluster ...

해당 모드에서 스파크 익스큐터를 구동하는 방법은 다음과 같다.

1. spark-submit 클라이언트는 YARN 애플리케이션을 구동한다. (사용자 코드는 실행하지 않는다)
3b. 애플리케이션 마스터가 드라이버 프로그램을 시작하고
4. 익스큐터에 자원을 할당한다.

그외는 클라이언트모드와 동일하다.

두개의 YARN 모드에서 익스큐터는 데이터 지역성 정보가 없는 상태에서 구동된다. 따라서 스파크 잡이 접근할 파일을 관리하는 데이터 노드에 함께 배치되지 않는다. 대화형 세션에서는 이러한 방식을 허용할 수 밖에 없지만, 운영 잡에는 적합 하지 않으므로, 스파크는 YARN 클러스터에서 익스큐터를 실행할 때 데이터 지역성을 높이기 위해 배치 정책에 대한 정보를 미리 알려주는 방식을 추가로 제공한다.

SparkContext 생성자는 InputFormatInfo 헬퍼 클래스를 이용하여 입력 포맷과 경로로부터 계산한 선호 위치를 두 번째 인수로 취할 수 있다. 이렇게 얻은 선호 위치는 애플리케이션 마스터가 리소스 매니저에게 자원 할당을 요청할 때 사용된다.