Spark Summary
Spark
1. Spark
- 하이브는 MapReduce 연산을 통해 데이터에 접근함.
- 허나, 시간이 지날수록 데이터는 방대해지고 MapReduce의 배치처리 연산에 따른 DISK I/O가 빈번하게 발생하기 때문에 latency가 높아져서 처리 성능이 낮아짐.
- 반면 Spark는 memory에서 작동하기 때문에, 저장 및 연산을 더욱 효율적으로 할 수 있음.
1) Spark 주요 내용
- 스파크의 클러스터 모드 종류 :
- Standalone, Apache Mesos, Yarn
- Spark RDD(Resilient Distributed Dataset) :
- 스파크 프로그래밍의 기초 데이터셋 모델
- Spark DataFrame :
- Spark RDD에서 데이터를 더 효율적으로 다루기 위해 개발. 자료들을 RDBMS의 테이블과 같이 다룰 수 있으며, 이 테이블을 다루기 위해 SparkSQL이란 것을 사용 가능
- Spark DataSets :
- Spark DataFrame에서 더욱 발전. DataSets은 DataFrame과 달리 스키마 및 데이터타입에 매우 안정적인 특징을 가지기 때문에 조금의 오차도 허용하지 않음.
- 때문에, 그만큼 데이터 변조 위험이 적지만 대신 flexible한 변형이 불가능함.
- Scala 기준으로 DataSets은 DataSets[(case class를 이용한 사용자 지정 스키마] / DataFrame은 DataSets[Row]라는 데이터 타입을 가짐
- 스파크 세션(SparkSession)
- 데이터셋을 다루기 위해 SparkSession을 알아야 한다.
- RDD를 생성하기 위해 SparkContext가 필요했던 것처럼 데이터프레임을 생성하기 위해서는 SparkSession을 이용해야 한다.
- SparkSession은 인스턴스 생성을 위한 build() 메서드를 제공하고, 이 메서드를 이용하면 기존 인스턴스를 재사용하거나 새로운 인스턴스를 생성할 수 있다.
- 만약 스파크셸을 사용하다면 스파크 셸이 자동으로 spark라는 이름으로 SparkSession 인스턴스를 생성해주므로 별도의 생성 단계를 거치지 않고 spark라는 변수를 통해 접근할 수 있다.
- SparkSession 이전에는 Hive 사용 여부에 따라 SQLContext와 HiveContext라는 별도의 API를사용했지만 SparkSession에서는 Hive 지원이 기본 사항으로 바뀌면서 기존 API는 더이상 사용하지 않게 되었다.
- RDD를 생성하려면 SparkContext 객체를 먼저 생성하고, DataFrame 또는 DataSet을 생성하려면 SparkSession 객체를 먼저 생성해야 한다. 하지만 정확하게 말하면 스파크세션 객체 안에는 스파크컨텍스트 객체가 포함돼 있기 때문에 RDD를 만들때나 데이터프레임을 만들때나 상관없이 스파크세션 객체를 먼저 생성하면 된다.
- SparkSession = SparkContext + Session정보를 담은 내용
1 2 3 4 5 6 7 8 9 10 11 12 13 14
from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession\ .builder\ .appName("Test")\ .master("local[*]")\ .getOrCreate() spark.stop()
2) Spark 지원 서비스
Spark SQL : SQL 방식으로 스파크 RDD 프로그래밍 지원
Spark Streaming : 스트리밍 데이터를 마이크로타임의 배치로 나누어 실시간 처리
Spark MLib : 스파크에서 머신러닝 프로그래밍(군집, 분류, 추천) 지원
Spark GraphX : 다양한 유형의 네트워크 구조 분석을 지원
2. Spark Driver / Executor
Driver는 RDD 프로그램을 분산 노드에서 실행하기 위한 Task의 구성, 할당, 계획 등을 수립.
Executor는 Task를 직접 실행한다.
Driver : 드라이버란 프로그램의 main() 메소드가 실행되는 프로세스를 말한다.
- 드라이버는 SparkContext를 생성하고 RDD를 만들고 트랜스포메이션과 액션을 실행하는 사용자 코드를 실행하는 프로세스이다.
스파크 셸을 실행할 때 이미 드라이버 프로그램을 만든 것이다(스파크 셸은 미리 로딩된 sc라고 불리는 SparkContext 객체를 갖고 시작한다) 드라이버가 종료되면 애플리케이션도 종료된다.
- 출처: https://12bme.tistory.com/437?category=682904
Executor : 스파크 각각의 애플리케이션을 위해 익스큐터를 생성한다. 클러스터 워커노드마다 생성되며, 익스큐터 프로세스는 애플리케이션 코드를 실행하고, 필요할 때마다 데이터를 메모리나 디스크에 캐시.
Task : 익스큐터에서 실행할 수 있는 작업 단위. 워커 노드의 익스큐터는 많은 스레드를 갖고 있는데 각 태스크는 이들 중 하나의 스레드에서 실행된다. 태스크는 계산을 하고, 그 결과를 반환하거나 셔플(Shuffle) 오퍼레이션과 같은 오퍼레이션 실행
Job과 Stage : 잡은 태스크들의 집합으로 병렬 계산 가능. 잡당 할당되는 태스크 수는 데이터 파티션의 숫자에 좌우됨. 스테이지는 job을 의존성이 있는 작은 태스크 세트로 나눈 것
3. Spark Cluster Manager :
- 스파크 실행 환경을 구성하는 클러스터 관리자
- Spark Standalone
- YARN
- Mesos
- Mesos 또한 아파치 재단의 오픈소스로서 분산 환경 클러스터를 제어 담당하는 역할을 한다.
4. Spark Join의 원리 :
Shuffle 데이터가 한 노드에서 다른 노드로 이동하는 것을 말한다
- Narrow Transformation :
- filter(), map(), flatMap()과 같이 이미 키로 파티셔닝 되어있고, 키의 값만 변화 시키려는 경우
- Wide Transformation :
- groupByKey(), reduceByKey() 처럼 서로 다른 파티션들로부터 특정한 값을 추출하고 싶은 경우
- Shuffle은 주로 Wide Transformation이다
- Wide Transformation의 종류
1) Shuffled Hash Join : 두 테이블을 서로 Join할 때 발생하며, 서로 공통되는 키 값을 가질 경우 동일한 파티션으로 계속 이동하게 된다.
2) Broadcast Hash Join : 예를 들어 A와 B를 조인 시킬 때, B의 크기가 극도로 작은 경우에는 B의 파티션이 모든 Executor에 복사가 된다. 그 후, 공통되는 부분에 대해서만 값을 찾아 내면 된다. (Shuffle이 일어나지 않음)