• sparkR을 공부해보자 [Scaling R Programs with Spark]

    2017. 8. 12. 12:28

    by. 위지원

    논문  http://getliner.com/webpdf/web/viewer.html?file=b4b0049c03bf34257d7779ff722732b4f926b11c.pdf를 보고 공부한 내용



    Scaling R Programs with Spark


    R은 데이터 처리,머신런닝 학습 작업을 지원하는 통계 프로그래밍 언어로 데이터프레임을 사용하여 구조화 된 

    데이터 처리를 지원하고 통계 분석 및 시각화를 위한 여러 패키지를 포함하고 있음


    그러나


    R의 대화형 데이터 분석은 일반적으로 단일스레드형식으로 동작하여 제한적이며 대용량 데이터 세트에 실용적이지않고 단일 기계메모리에 맞는 데이터 세트만 처리할 수 있음


    그래서


    더 나은 i/o지원 hadoop과의 통합 및 dbms엔진과 통합 될 수 있는 runtimes 설계해서 일부를 해결


    라이브러리 지원

    spark에서는 sql쿼리실행,분산기계학습,그래프분석이 포함된 프로젝트가 있으며 sparkR은이러한 도메인에 대해잘 테스트되고 분산 된 구현을 재사용 할 수 있게한다.

    Performance Improvements
    : As opposed to a new distributed en-
    gine, SparkR can inherit all of the optimizations made to the Spark
    computation engine in terms of task scheduling, code generation,
    memory management [3], etc.
    Performance Improvements
    : As opposed to a new distributed en-
    gine, SparkR can inherit all of the optimizations made to the Spark
    computation engine in terms of task scheduling, code generation,
    memory management [3], etc.
    Performance Improvements
    : As opposed to a new distributed en-

    gine, SparkR can inherit all of the optimizations made to the Spark
    computatio

    n engine in terms of task scheduling, code generation,

    memory management [3], etc



    성능향상

    sparkR은 작업 스케쥴링,코드생성,메모리 관리sparkcomputation 엔진의 모든 최적화를 상속받을 수 있다.



    SparkR is built as an R package and requires no changes to
    R. The central component of SparkR is a distributed data frame
    that enables structured data processing with a syntax familiar to R
    users [31](Figure 1). To improve performance over large datasets,
    SparkR performs lazy evaluation on data frame operations and uses
    Spark’s relational query optimizer [10] to optimize execution.

    spartR?

    -sparkR은 AMPLab에서 개발

    -R패키지로 제작되었으며 R을 변경할 필요는 없음

    -sparkR의 중심 구성요소는 Rusers에 익숙한 구문으로 구조화된 데이터 처리를 가능하게 하는 분산 데이터 '프레임'

    -대규모 데이터 세트에 대한 성능을 향상시키기 위해서 

    -sparkR은 데이터프레임 작업에 대한 지연 평가를 수행

    - spark의 관계쿼리 최적화 프로그램을 사용해 실행을 최적화



    모 데이터 처리를 위해 R프로그래머는 어떻게 애플리케이션을 사용할까?

    우선 spark R을 개발하는데 사용되는 두 가지 주요 시스템 spark와 R 2가지를 알아보자


    spark?

    -대규모 데이터 처리를 위한 범용 엔진

    -클러스터 컴퓨팅 환경에서 내결함성 계산을 위한 Resilient DistributedDatasets api를 처음으로 도입

    -최근에는 많은 api가 개발

    -ml-lib : 대규모 기계 학습용 라이브러리

    -graphX : 큰 그래프를 처리하기 위한 라이브러리

    -sparkSQL : 분석 쿼리용 sql 

    -주어진 쿼리에 대해 최적의 물리 계획을 생성하여 성능을 향상해 분산 쿼리를 최적화 시킴

    -dataframes에 대한 쿼리가 sparksql 쿼리옵티마이저를 사용하여 실행되기때문에 rdd와 비교하여 더 나은 유용성과성능 제공


    data frames in R

    -데이터프레임은 각 열의 특정 유형의 요소를 지니고있으며 필터링을 위한 쉬운 구문을 제공

    -dplyr은 데이터 조작을 위한 소수의 베타를 제공하며 선택,프로젝션,집계및 조인과 같은 관계연산을 포함



    application patterns


    BigData,Small learning

    -일반적으로 json,csv로 저장되는 대규모 데이터셋부터 시작

    -사용자는 데이터 정리 작업을 수행하여 데이터셋에서 불필요한 행,열 제거(데이터 집계,샘플링)하여 데이터 크기를 줄임


    Partition Aggregate

    -파티션에서 입력데이터 셋에 대해 병렬로 실행해야하는 특정 기능을 가지고있으며 각 파티션별로 결과를 얻어 집계함수를 이용


    Large  Scale  Machine  Learning

    -일부 사용자는 대규모 데이터 셋에서 기계학습 알고리즘을 실행 ( 알고리즘을 적용하기전엔 데이터 특징을 생성하기위해 전처리됨)

    -예측을 위해 알고리즘학습으로 생성된 모델을 사용



    design involvedin  building  SparkR


    sparkR dataFrames API

    -sparkR의 중심구성요소는 spark위에 구현된 분산 데이터 프레임

    -sparkR 데이터 프레임에는 

    -dplyr

    -local R dataframe과 동일한 api


    -하지만 spark의 실행 엔진과 관계형 쿼리 최적화 프로그램을 사용하여 대규모 데이터 셋으로 확장



    -data frames operator

    -sparkR의 dataFrame은 입력을 읽고 구조하된 데이터분석을 수행하는 여러가지 방법을 지원

    -SparkR'read.dfmethod는 spark의 데이터소스api와 통합되어 사용자가 다른 시스템에서 데이터를 로드할 수 있음:

    df <- read.df(sqlContext, "./nycflights13.csv",3"com.databricks.spark.csv")

    -집계,조인 : count = n(jfk_flights$dest))

    -문자열 조작 방법,통계함수,날짜시간작업을 포함해 미리정의된 함수가 100여개 이상

    -sql 명령을 사용하여 sql쿼리를 실행할 수도있음 : 

    training <- sql(sqlContext,"SELECT distance, depDelay, arrDelay FROM table")

    -기존의 R라이브러리도 이용가능

    dest_flights <- filter(df, df$origin == "JFK") %>%groupBy(df$dest) %>%summarize(count = n(df$dest))


    -colletoroperator를 사용하여 localR데이터 프레임으로 변환할수 있음 --> BigData,Small learning에 유용

    -최적화

    -R api를 spark의 최적화된 sql 실행 엔진과 긴밀하게통합하여 자신의 코드를 R에 쓰더라도 해석된 R코드의 오버헤드

    가 발생하지 않으며 스칼라,sql 사용하는것과 동일한형식을 얻을 수 있음

    -Figure 4는 R,python,scala가 포함된 spark를 사용하는 단일 시스템에서 1천만개의 teger 쌍에 대해 실행중인 그룹화 집계 성능을

    비교하면 sparkR으 ㅣ성능이 scala/python 성능과 비슷하다는것을 알 수있음 물리적 실행에서 R의 논리적 사양을 

    분리했을때의 이점을 보여줌 



    아키텍쳐

    드라이버와 worker 두개의 주 구성요소 (Figure3 참조)

    -driver에서 R에서 jvm으로 바인딩되어 R프로그램이 spark 클러스터에 작업을 제출하고 spark 실행 프로그램 실행을 지원할 수 있음

    *JVM:Java Virtual Machine 의 줄임말 이며, Java Byte Code를 OS에 맞게 해석 해주는 역할 함


    -바인딩

    sparkR의 구현의 핵심중 하나는 R의 jvm에서 spark기능을 호출하는것을 지원하는것

    요구사항

    -yarn과 같은 클러스터 관리자를 통해 jvm 드라이버 프로세스를 독립적으로 실행할 수 있는 유연한 접근

    -크로스 플랫폼 지원 onWindows,Linux등 

    -sparkR을 설치하기가 번거롭지 않은 경량 솔루션


    그런데


    in process jvm 시작을 지원하는 기존 패키지가 있지만 이러한 메서드가 우리의 모든 요구사항을 충족시키지 못한다는

    사실을 논문에서 발견함


    그래서


    R에서 jvm에 대한 함수를 호출하는 데 사용할 수 있는 새로운 소켓 기반 sparkR 내부 api를 개발

    -netty기반 소켓서버

    -소켓을 쓰는 이유는 java,R 플랫폼 모두 지원 ( 외부 라이브러리 사용 안해도됨 )

    -전달되는 메시지 대부분이 컨트롤 메시지이므로 프로세스내 통신에 관한 다른방법에 비해 비용이 그리 높지않음


    sparkR jvm 백엔드에서 지원하는 두가지 rpc

    *rpc(한 프로그램이 네트웍 상의 다른 컴퓨터에 위치하고 있는 프로그램에 서비스를 요청하는데 사용되는 프로토콜)

    -메서드 호출

    innocations 메서드는 기존 java개체(static은 class명),메서드에 전달할 인수 목록을 사용하여 호출

    -새 객체 생성

    nameinit 메서드를 사용하여 아규먼트를 제공하면 알아서 적절한 생성자를 호출


    java객체를 참조하는 새로운 R Class jobj를 사용 : java에서 추적되어 R측에서 범위를 벗어날때 자동으로 가비지 수집 <-이해가안딘다.. 

    Finally, we use a new R class ’jobj’ that refers to a Javaobject existing in the backend. These references are tracked on the Java side and are automatically garbage collected when they go outof scope on the R side



    -workers

    executor machine에서 process를 시작하는것을 지원

    초기접근방법은 R함수 실행시마다 R프로세스를 fork하는것있으나 이는 고정된 오버헤드를 발생시켜 비용문제가 생김


    그래서


    1.필요한 수많은 R 함수를 결합할 수 있도록 R 연산을 통합하는 지원

    2.spark작업의 수명시간동안 살아있는 deamon R process를 지원하고 작업자 r process를 관리하는데 필요한 추가기능 추가


    오버헤드를 줄이고 end to end 시간을 낮춤


    In summary,  SparkR provides an R frontend to Apache Sparkand allows users to run large scale data analysis using Spark’s dis-tributed computation engine. 



    대화의 장 💬