• [Spark] Data Analytics with Spark using Python - Basic Functions

    2021. 4. 6. 20:53

    by. 위지원

     

    The more I read this book, the less I want to recommend it to anyone. But I already bought it, so I'm grinding through it anyway ㅠ

    Various ways to create an RDD

    - Using files under a directory (1): sc.textFile()

          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /__ / .__/\_,_/_/ /_/\_\   version 3.1.1
          /_/
    
    Using Python version 3.8.1 (v3.8.1:1b293b6006, Dec 18 2019 14:08:53)
    Spark context Web UI available at http://jiwon.local:4040
    Spark context available as 'sc' (master = spark://jiwon.local:7077, app id = app-20210406133738-0000).
    SparkSession available as 'spark'.
    >>> import os
    >>> spark_home = os.environ['SPARK_HOME']
    >>> licensefiles = sc.textFile("file:///%s/licenses/" % spark_home)
    
    # Returns a list containing the first line of the first file in the directory
    >>> licensefiles.take(1)
    ['JTransforms']
    
    # One partition is created for each file in the directory
    >>> licensefiles.getNumPartitions()
    57
    
    # Number of lines across all of the files
    >>> licensefiles.count()
    2973
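    
    As a side note, sc.textFile() also takes a single file path and an optional minPartitions hint. A minimal sketch, reusing the LICENSE-JTransforms.txt file that shows up in this distribution's licenses directory:
    
    # textFile() works on a single file too; minPartitions is a hint for the
    # minimum number of partitions to create.
    one_file = sc.textFile(
        "file:///%s/licenses/LICENSE-JTransforms.txt" % spark_home,
        minPartitions=4)
    one_file.getNumPartitions()   # typically at least 4
    one_file.take(3)              # first three lines of that one file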

     

    - Using files under a directory (2): sc.wholeTextFiles()

    >>> licensefiles = sc.wholeTextFiles("file:///%s/licenses/" % spark_home)
    >>> licensefiles
    org.apache.spark.api.java.JavaPairRDD@36689925
    
    # key: file path, value: file contents
    >>> licensefiles.take(1)
    [('file:/Users/jiwonwee/SPARK_BOOK/spark/spark-3.1.1-bin-hadoop2.7/licenses/LICENSE-J
    Transforms.txt', 'JTransforms\nCopyright (c) 2007 onward, Piotr Wendykier\nAll rights
    reserved.\n\nRedistribution and use in source and binary forms, with or without\nmodif
    ication, are permitted provided that the following conditions are met:\n\n1. Redistrib
    utions of source code must retain the above copyright notice, this\n   list of condit
    ions and the following disclaimer. \n2. Redistributions in binary form must reproduce 
    the above copyright notice,\n   this list of conditions and the following disclaimer 
    in the documentation\n   and/or other materials provided with the distribution.\n\nTH
    IS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND\nANY EX
    PRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED\nWARRANTIES O
    F MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE\nDISCLAIMED. IN NO EVENT 
    SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR\nANY DIRECT, INDIRECT, INCIDE
    NTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES\n(INCLUDING, BUT NOT LIMITED TO, 
    PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;\nLOSS OF USE, DATA, OR PROFITS; OR BUSIN
    ESS INTERRUPTION) HOWEVER CAUSED AND\nON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT
    , STRICT LIABILITY, OR TORT\n(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 
    OUT OF THE USE OF THIS\nSOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.')]
    
    # Unlike textFile(), which made one partition per file, here the key/value pairs for all the files are packed into just a couple of partitions
    >>> licensefiles.getNumPartitions()
    2
    
    # Counts the number of files, i.e. the number of key/value pairs
    >>> licensefiles.count()
    57
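    
    Since each record is a (path, contents) pair, the usual transformations apply directly; a quick sketch of turning them into (file name, line count) pairs:
    
    import os
    
    # Map each (full path, file contents) pair to (file name, number of lines).
    lines_per_file = licensefiles.map(
        lambda kv: (os.path.basename(kv[0]), len(kv[1].splitlines())))
    lines_per_file.take(3)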

     

    - With the connector below, Spark can also connect to a database. There's a SQL chapter later in the book, so let's dig into it properly then... ⍢ (a rough sketch follows right under the link)

    dev.mysql.com/downloads/connector/j/

     

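    For the record, a rough sketch of what reading from MySQL over JDBC tends to look like. The URL, table name, and credentials here are all made up, and the connector jar has to be on the classpath (e.g. by starting pyspark with --jars):
    
    # Hypothetical connection details; replace with your own server/table/credentials.
    mysql_df = (spark.read
        .format("jdbc")
        .option("url", "jdbc:mysql://localhost:3306/mydb")
        .option("driver", "com.mysql.cj.jdbc.Driver")
        .option("dbtable", "mytable")
        .option("user", "myuser")
        .option("password", "mypassword")
        .load())
    mysql_df.show(5)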

     

    - Creating a DataFrame from JSON: spark.read.json()

    >>> people =  spark.read.json("file://%s/examples/src/main/resources/people.json"%spark_home)
    >>> people
    DataFrame[age: bigint, name: string]
    >>> people.dtypes
    [('age', 'bigint'), ('name', 'string')]
    >>> people.show()
    +----+-------+
    | age|   name|
    +----+-------+
    |null|Michael|
    |  30|   Andy|
    |  19| Justin|
    +----+-------+
    
    >>> sqlContext.registerDataFrameAsTable(people, "people")
    >>> df2 = spark.sql("select name, age from people where age>20")
    >>> df2.show()
    +----+---+
    |name|age|
    +----+---+
    |Andy| 30|
    +----+---+
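    
    registerDataFrameAsTable() comes from the old SQLContext API and is deprecated; the same query can be run through the SparkSession with a temp view, roughly like this:
    
    # Register the DataFrame as a temporary view and query it via spark.sql().
    people.createOrReplaceTempView("people")
    adults = spark.sql("select name, age from people where age > 20")
    adults.show()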

     

    - Creating an RDD programmatically (1): the parallelize() method

    >>> parallelrdd = sc.parallelize(list(range(9)))
    >>> parallelrdd
    ParallelCollectionRDD[12] at readRDDFromFile at PythonRDD.scala:274
    >>> parallelrdd.count()
    9
    >>> parallelrdd.collect()
    [0, 1, 2, 3, 4, 5, 6, 7, 8]
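    
    parallelize() also takes a numSlices argument to control the partition count, and glom() can be used to peek at how the elements get distributed; a small sketch:
    
    # Split the same 9 elements across 3 partitions and inspect each one.
    three_part = sc.parallelize(range(9), 3)
    three_part.getNumPartitions()   # 3
    three_part.glom().collect()     # one list per partition, e.g. [[0, 1, 2], [3, 4, 5], [6, 7, 8]]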

     

    - Creating an RDD programmatically (2): the range() method

    >>> range_rdd = sc.range(0, 1000, 1, 2)
    # An RDD counting from 0 up to (but not including) 1000 in steps of 1, split across 2 partitions
    >>> range_rdd.getNumPartitions()
    2
    >>> range_rdd.min()
    0
    >>> range_rdd.max()
    999
    >>> range_rdd.take(10)
    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
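    
    Like Python's built-in range(), sc.range() excludes the end value, which is why max() came back as 999. The same RDD can be built with parallelize(), and the numeric helpers work on it as well; a sketch:
    
    # Equivalent construction via parallelize(); range() here is Python's, end-exclusive.
    same_rdd = sc.parallelize(range(0, 1000), 2)
    same_rdd.count()     # 1000 elements: 0 .. 999
    range_rdd.stats()    # count, mean, stdev, max and min in a single pass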

     

    persist()

    Calling persist() marks the RDD to be kept in storage (memory, by default), so it doesn't have to be recomputed for every subsequent action.

    >>> numbers = sc.range(0, 1000000, 1, 2)
    >>> evens = numbers.filter(lambda x: x % 2)
    >>> noelemnts = evens.count()
    >>> print("There are %s elements in the collections" % noelemnts)
    There are 500000 elements in the collections
    >>> listfelements = evens.collect()
    >>> print("The first five elements include", str(listfelements[:5])
    ... )
    The first five elements include [1, 3, 5, 7, 9]
    >>> evens.persist()
    PythonRDD[2] at collect at <stdin>:1
    >>> evens.count()
    500000
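    
    Small caveat: lambda x: x % 2 is truthy only for odd numbers, so despite the name evens the RDD actually holds the odds (hence [1, 3, 5, 7, 9]). As for persist(), the default storage level for an RDD is MEMORY_ONLY; other levels can be passed explicitly, and unpersist() releases the storage again. A sketch with a hypothetical odds_rdd:
    
    from pyspark import StorageLevel
    
    # Build the same filtered RDD under a more honest name and persist it to
    # memory, spilling to disk if it doesn't fit.
    odds_rdd = sc.range(0, 1000000, 1, 2).filter(lambda x: x % 2)
    odds_rdd.persist(StorageLevel.MEMORY_AND_DISK)
    odds_rdd.is_cached    # True once the RDD is marked for persistence
    odds_rdd.unpersist()  # drop it from storage when it's no longer needed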

     

    And apparently there are all kinds of other methods... map, filter, groupBy, and so on. I'll look them up later when I actually need them (a small taste below).
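    
    Just so the names ring a bell later, a tiny sketch of those three on the parallelrdd ([0, 1, ..., 8]) from above:
    
    # map: transform every element; filter: keep a subset; groupBy: group by a key function.
    squares = parallelrdd.map(lambda x: x * x)
    big_squares = squares.filter(lambda x: x > 10)
    big_squares.collect()                             # [16, 25, 36, 49, 64]
    by_parity = parallelrdd.groupBy(lambda x: x % 2)  # (0, evens) and (1, odds)
    sorted((k, sorted(v)) for k, v in by_parity.collect())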