Various ways to create an RDD
- Using the files under a directory (1): sc.textFile()
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.1.1
      /_/

Using Python version 3.8.1 (v3.8.1:1b293b6006, Dec 18 2019 14:08:53)
Spark context Web UI available at http://jiwon.local:4040
Spark context available as 'sc' (master = spark://jiwon.local:7077, app id = app-20210406133738-0000).
SparkSession available as 'spark'.
>>> import os
>>> spark_home = os.environ['SPARK_HOME']
>>> licensefiles = sc.textFile("file:///%s/licenses/" % spark_home)
# take(1) returns a list containing the first line of the first file in the directory
>>> licensefiles.take(1)
['JTransforms']
# one partition is created for each file in the directory
>>> licensefiles.getNumPartitions()
57
# total number of lines across all the files
>>> licensefiles.count()
2973
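As a side note, textFile() also accepts a minPartitions argument, so the partitioning is not forced to stay at one partition per file. A minimal sketch reusing the spark_home variable from above; the 120 is just an illustrative number, and the actual count depends on file sizes since minPartitions is only a hint:

# ask Spark for roughly 120 partitions instead of the default of one per file
licensefiles = sc.textFile("file:///%s/licenses/" % spark_home, minPartitions=120)
licensefiles.getNumPartitions()   # depends on file sizes; minPartitions is a suggested minimum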
- Using the files under a directory (2): wholeTextFiles()
>>> licensefiles = sc.wholeTextFiles("file:///%s/licenses/" % spark_home)
>>> licensefiles
org.apache.spark.api.java.JavaPairRDD@36689925
# key: file name, value: file contents
>>> licensefiles.take(1)
[('file:/Users/jiwonwee/SPARK_BOOK/spark/spark-3.1.1-bin-hadoop2.7/licenses/LICENSE-JTransforms.txt', 'JTransforms\nCopyright (c) 2007 onward, Piotr Wendykier\nAll rights reserved.\n\nRedistribution and use in source and binary forms, with or without\nmodification, are permitted provided that the following conditions are met:\n\n1. Redistributions of source code must retain the above copyright notice, this\n list of conditions and the following disclaimer. \n2. Redistributions in binary form must reproduce the above copyright notice,\n this list of conditions and the following disclaimer in the documentation\n and/or other materials provided with the distribution.\n\nTHIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND\nANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED\nWARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE\nDISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR\nANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES\n(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;\nLOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND\nON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS\nSOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.')]
# creates partitions holding one key/value pair per file in the directory
>>> licensefiles.getNumPartitions()
2
# count() here counts files, i.e. key/value pairs, not lines
>>> licensefiles.count()
57
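Because wholeTextFiles() returns (file name, file contents) pairs, the usual pair-RDD operations apply to it directly. A small sketch of my own (not from the book) that counts the lines in each license file:

# keep the file name as the key and map each file's contents to its line count
line_counts = licensefiles.mapValues(lambda contents: len(contents.splitlines()))
line_counts.take(3)   # a few (file name, line count) pairs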
- Using the connector below, you can also hook Spark up to a database; a quick preview sketch follows after the link. There's an SQL chapter later on, so we'll look at it in detail then..⍢
dev.mysql.com/downloads/connector/j/
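Just as a rough preview before that SQL chapter, reading a table over JDBC looks something like the sketch below. The connection URL, table name, and credentials are hypothetical placeholders, and the Connector/J jar has to be on the classpath when launching the shell (e.g. pyspark --jars mysql-connector-java-8.0.23.jar, with whatever jar version you downloaded):

# hypothetical MySQL connection details -- replace with your own
df = spark.read.jdbc(
    url="jdbc:mysql://localhost:3306/testdb",
    table="employees",
    properties={"user": "spark",
                "password": "secret",
                "driver": "com.mysql.cj.jdbc.Driver"})
df.show(5)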
- Creating from a JSON file: spark.read.json()
>>> people = spark.read.json("file://%s/examples/src/main/resources/people.json" % spark_home)
>>> people
DataFrame[age: bigint, name: string]
>>> people.dtypes
[('age', 'bigint'), ('name', 'string')]
>>> people.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
>>> sqlContext.registerDataFrameAsTable(people, "people")
>>> df2 = spark.sql("select name, age from people where age>20")
>>> df2.show()
+----+---+
|name|age|
+----+---+
|Andy| 30|
+----+---+
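For reference, since Spark 2.x the same query can be run without touching sqlContext: createOrReplaceTempView() on the DataFrame registers it as a temporary view for spark.sql():

# register the DataFrame as a temporary view and query it with SQL
people.createOrReplaceTempView("people")
spark.sql("select name, age from people where age > 20").show()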
- Creating an RDD programmatically (1): the parallelize() method
>>> parallelrdd = sc.parallelize(list(range(9)))
>>> parallelrdd
ParallelCollectionRDD[12] at readRDDFromFile at PythonRDD.scala:274
>>> parallelrdd.count()
9
>>> parallelrdd.collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8]
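parallelize() also takes a second argument, numSlices, to control how many partitions the collection is split into, and glom() is a handy way to peek at what ended up in each partition. A quick sketch:

# split the same nine elements across three partitions
parallelrdd = sc.parallelize(list(range(9)), 3)
parallelrdd.getNumPartitions()   # 3
parallelrdd.glom().collect()     # [[0, 1, 2], [3, 4, 5], [6, 7, 8]]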
- Creating an RDD programmatically (2): the range() method
>>> range_rdd = sc.range(0, 1000, 1, 2)
# an RDD of values from 0 up to (but not including) 1000, step 1, split across 2 partitions
>>> range_rdd.getNumPartitions()
2
>>> range_rdd.min()
0
>>> range_rdd.max()
999
>>> range_rdd.take(10)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
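Note that the end value of sc.range() is exclusive, which is why max() returns 999 rather than 1000. Numeric RDDs also expose simple aggregates directly:

range_rdd.count()   # 1000
range_rdd.sum()     # 499500 (0 + 1 + ... + 999)
range_rdd.mean()    # 499.5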
persist()
Calling persist() keeps an RDD in storage (memory, by default) so that later actions can reuse it instead of recomputing it.
>>> numbers = sc.range(0, 1000000, 1, 2)
>>> evens = numbers.filter(lambda x: x % 2)   # note: x % 2 is truthy for odd values, so this actually keeps the odd numbers
>>> noelemnts = evens.count()
>>> print("There are %s elements in the collections" % noelemnts)
There are 500000 elements in the collections
>>> listfelements = evens.collect()
>>> print("The first five elements include", str(listfelements[:5]))
The first five elements include [1, 3, 5, 7, 9]
>>> evens.persist()
PythonRDD[2] at collect at <stdin>:1
>>> evens.count()
500000
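With no arguments, persist() uses the MEMORY_ONLY storage level for an RDD; a different StorageLevel can be passed in, and unpersist() releases the storage again. A minimal sketch along those lines, on a fresh RDD since the storage level of an already-persisted RDD cannot be changed:

from pyspark import StorageLevel

odds = sc.range(0, 1000000, 1, 2).filter(lambda x: x % 2)
# cache in memory, spilling to disk when it does not fit
odds.persist(StorageLevel.MEMORY_AND_DISK)
odds.count()       # the first action materializes and caches the data
odds.is_cached     # True
odds.unpersist()   # release the storage when it is no longer needed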
And there are lots of other methods too: map, filter, groupBy, and so on. Let's look them up later when we actually need them; a tiny taste is below.
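A toy example of my own showing those three transformations:

words = sc.parallelize(["spark", "rdd", "partition", "persist", "python"])
words.map(lambda w: w.upper()).collect()              # ['SPARK', 'RDD', 'PARTITION', 'PERSIST', 'PYTHON']
words.filter(lambda w: w.startswith("p")).collect()   # ['partition', 'persist', 'python']
# groupBy yields (key, iterable) pairs; turn the iterables into lists to inspect them
words.groupBy(lambda w: w[0]).mapValues(list).collect()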