• RDD(Resilient Distributed Datasets)

    2018. 2. 27. 16:43

    by. 위지원

    http://weejw.tistory.com/46 에서 이론적으로 공부를 했다. 이때당시 스파크를 아예 처음봐서 지금 보니 너무 대충해놨었다. 다시한번 보자


    1. RDD(Resilient Distributed Datasets) * Resilient : 탄력있는


    1) 다수의 서버에 분산되어 저장되어있는 데이터 집합 요소

    2) 장애 발생시 스스로 복구 가능한 내성(tolerance) 보유

    3) RDD는 "Partition" 이라는 더 작은 단위로 나뉘며 작업 수행시 파티션 단위로 나누어서 병렬 처리 수행

    4) RAM에 기록된 RDD생성 내역을 Read-Only


    2. 리니지(Lineage) : RDD를 생성하기 위해 수행한 연산 과정을 기록하여 복구시 사용


    3. Transformation과 Action


    3-1. Transformation

    - RDD형태를 변형하는 연산으로 RDD를 생성하지만 리니지만 늘어나며 실제 연산이 이루어지지 않음

    -Transformations are lazy 최적의 수행방법을 찾아 처리할 수 있는 장점을 지님


    ⓐ 변환의 종류

    - 좁은(Narrow) 변환 : map(),filter()연산등이 해당되며 연산에 필요한 파티션이 무조건 부모 RDD의 단일 파티션에 존재

    - 넓은(Wide) 변환 : groupbyKey(),reducebyKey()연산등이  해당되며 연산에 필요한 파티션이 상위 RDD 어디에든 존재 할 수 있음


                

    그림1. 좁은 변환                                 그림2. 넓은 변환


    ⓑ 연산의 종류

    *연산의 종류는 우측 링크에서 자세하게 확인할 수 있음. *https://data-flair.training/blogs/spark-rdd-operations-transformations-actions/


    Map과 관련된 연산

    요소간의 mapping을 정의한 함수를 RDD에 속하는 모든 요소에 적용

     그룹과 관련된 연산

    특정 조건에 따라 요소를 그룹화하거나 특정 함수 적용

     집합과 관련된 연산

     RDD에 포함된 요소를 하나의 집합으로 간주할 때 서로 다른 RDD간에 합,교집합 등 계산

     파티션과 관련된 연산

     RDD의 파티션수 조정

     필터와 정렬과 관련된 연산

     특정 조건만을 만족하는 요소를 선태갛거나 각 요소를 기준에따라 정렬



    3-2 . Action : 비로소 실제 작업이 수행되며 결과는 RDD가 아닌 Int,Long등 다른 타입을 통칭해서 부르는 용어


    4. 브로드 캐스트 변수

    - 스파크 잡이 실행 되는 동안 클러스트 내의 모든 서버에서 공유할 수 있는 읽기 전용 자원을 설정할 수 있는 변수

    - 액션 연산을 수행할 때 동일한 스테이지 내에서 실행되는 태스크 간에는 내부적으로 브로드캐스트 변수 생성후 전달


    val broadcastUser = sc.broadcast(Set("u1","u2"))

    val rdd = sc.parallelize(List("u1","u3","u4"))

    val result = rdd.filter(broadcastUser.value.contains(_))


    [result]

      u1



    5. 어큐물레이터

    - 스파크 잡이 실행 되는 동안 클러스트 내의 모든 서버에서 공유할 수 있는 쓰기 전용 자원을 설정할 수 있는 변수

    - RDD의 트랜스포메이션,액션 연산 내부에서는 어큐물레이터의 값을 증가만 시킬수 있고 참조사용은 불가능


    val acc1 = sc.longAccumulato("invalidFormat")

    val acc2 = sc.collectionAccumulator[String]("invalidFormat2")

    val data = List("U1:Addr1","U2","U3","U4;Addr2","U5:Addr3")


    sc.parallelize(data,3).foreach{v=>

    if(v.split(":".length !=2){

    acc1.add(1L)

    acc2.add(v)}

    }


    println("잘못된 데이터 수: "+acc1.value)

    printfln("잘못된 데이터:"+acc2.value)


    [value]

      잘못된 데이터 수 : 3

      잘못된 데이터 : U2,U3,U4;Addr2




    '2018년 > spark' 카테고리의 다른 글

    GraphX ~그래프 연산  (0) 2018.04.03
    GraphX ~그래프 생성까지  (0) 2018.04.02
    spark info좀 꺼보자  (0) 2018.02.28
    Spark Streaming,Structured Streaming  (0) 2018.02.27
    [Spark] :: 구조 및 동작 과정 [이론]  (0) 2018.01.17

    대화의 장 💬