RDD(Resilient Distributed Datasets)
http://weejw.tistory.com/46 에서 이론적으로 공부를 했다. 이때당시 스파크를 아예 처음봐서 지금 보니 너무 대충해놨었다. 다시한번 보자
1. RDD(Resilient Distributed Datasets) * Resilient : 탄력있는
1) 다수의 서버에 분산되어 저장되어있는 데이터 집합 요소
2) 장애 발생시 스스로 복구 가능한 내성(tolerance) 보유
3) RDD는 "Partition" 이라는 더 작은 단위로 나뉘며 작업 수행시 파티션 단위로 나누어서 병렬 처리 수행
4) RAM에 기록된 RDD생성 내역을 Read-Only
2. 리니지(Lineage) : RDD를 생성하기 위해 수행한 연산 과정을 기록하여 복구시 사용
3-1. Transformation
- RDD형태를 변형하는 연산으로 RDD를 생성하지만 리니지만 늘어나며 실제 연산이 이루어지지 않음
-Transformations are lazy 최적의 수행방법을 찾아 처리할 수 있는 장점을 지님
ⓐ 변환의 종류
- 좁은(Narrow) 변환 : map(),filter()연산등이 해당되며 연산에 필요한 파티션이 무조건 부모 RDD의 단일 파티션에 존재
- 넓은(Wide) 변환 : groupbyKey(),reducebyKey()연산등이 해당되며 연산에 필요한 파티션이 상위 RDD 어디에든 존재 할 수 있음
그림1. 좁은 변환 그림2. 넓은 변환
ⓑ 연산의 종류
Map과 관련된 연산 |
요소간의 mapping을 정의한 함수를 RDD에 속하는 모든 요소에 적용 |
그룹과 관련된 연산 |
특정 조건에 따라 요소를 그룹화하거나 특정 함수 적용 |
집합과 관련된 연산 |
RDD에 포함된 요소를 하나의 집합으로 간주할 때 서로 다른 RDD간에 합,교집합 등 계산 |
파티션과 관련된 연산 |
RDD의 파티션수 조정 |
필터와 정렬과 관련된 연산 |
특정 조건만을 만족하는 요소를 선태갛거나 각 요소를 기준에따라 정렬 |
3-2 . Action : 비로소 실제 작업이 수행되며 결과는 RDD가 아닌 Int,Long등 다른 타입을 통칭해서 부르는 용어
4. 브로드 캐스트 변수
- 스파크 잡이 실행 되는 동안 클러스트 내의 모든 서버에서 공유할 수 있는 읽기 전용 자원을 설정할 수 있는 변수
- 액션 연산을 수행할 때 동일한 스테이지 내에서 실행되는 태스크 간에는 내부적으로 브로드캐스트 변수 생성후 전달
val broadcastUser = sc.broadcast(Set("u1","u2"))
val rdd = sc.parallelize(List("u1","u3","u4"))
val result = rdd.filter(broadcastUser.value.contains(_))
[result]
u1
5. 어큐물레이터
- 스파크 잡이 실행 되는 동안 클러스트 내의 모든 서버에서 공유할 수 있는 쓰기 전용 자원을 설정할 수 있는 변수
- RDD의 트랜스포메이션,액션 연산 내부에서는 어큐물레이터의 값을 증가만 시킬수 있고 참조사용은 불가능
val acc1 = sc.longAccumulato("invalidFormat")
val acc2 = sc.collectionAccumulator[String]("invalidFormat2")
val data = List("U1:Addr1","U2","U3","U4;Addr2","U5:Addr3")
sc.parallelize(data,3).foreach{v=>
if(v.split(":".length !=2){
acc1.add(1L)
acc2.add(v)}
}
println("잘못된 데이터 수: "+acc1.value)
printfln("잘못된 데이터:"+acc2.value)
[value]
잘못된 데이터 수 : 3
잘못된 데이터 : U2,U3,U4;Addr2