SparkSessionWrapper

  • SparkSessionWrapper를 생성하는 이유는 SparkSession을 시작/중지하는 비용이 크기 때문에
  • SparkSession을 하나 생성해서 빠르게 코드 수행이 가능하다.
  • trait은 java에서 interface를 생각하면 된다. 사용하기 위해서는 with를 통해 확장이 가능하다.
  • 여기서 lazy를 사용하는 이유는 정의된 spark의 변수가 처음 사용될때 코드가 실행되도록
    • that a val is executed when it is defined
    • a lazy val is executed when it is accessed the first time.
  • getOrCreate의 메소드는 SparkSession이 있으면 가져오고, 없으면 새로 생성하는 메소드

Lazy Example

spark로 프로젝트할때 python? scala? 고민을 하게되는데, 이때 가장 큰 고민은
scala의 경우에는 build하는 sbt, maven을 사용을 해야하는 점이다.
python을 사용하는 사람은 보통 build하는데 익숙하지 않다. 그래서 package의 dependencies를 관리하는게 매우 번거로움
그래서 찾다가보니 proejct를 처음 시작할때 도움을 주는 툴이 있었다.

http://www.foundweekends.org/giter8/

$ mkdir <project_root>
$ sbt new scala/scala-seed.g8

위 명령을 실행하고 나면 해당 디렉토리에 아래와 같은 파일들이 생긴다

build.sbt  project  src

이렇게 제공하면 툴도아니지... 이미 여러개의 template이 존재한다.

Template

$ sbt new scala/scala-seed.g8 --branch myBranch 
$ sbt new holdenk/sparkProjectTemplate.g8
  • sparkProjectTemplate
  • 위 템플릿을 입력하면 아래와 같이 프로젝트 이름, organization, package, library 설정이 가능하다
  • 참고로 project의 이름은 package 이름에 organization + project_name이 되기 때문에 -을 쓰면 에러 발생
This is a g8 template for building a skeleton Spark project.

name [sparkProject]:

build 테스트

  • inputFile.txt에 단어 몇개를 입력하고 아래와 같이 명령어를 입력하면
sbt "run inputFile.txt outputFile.txt"
  • outputFile.txt의 디렉토리에 결과가 생긴다

build

  • sbt assembly를 하면 test가지 포함해서 실행해준다.

큰 데이터를 만지다보면, Spark의 DataFrame과 Pandas의 DataFrame의 서로 변환이 꼭 필수다.
예를들어서 Pandas의 DataFrame을 Spark SQL의 테이블로 등록하고, Spark에서 작업을 하기도 한다.
만약 이 방법을 모른다면, 어떻게 테이블로 변환을 할것인가?

Pandas의 DataFrame을 Spark의 DataFrame으로 변환하기 위해서는
spark.createDataFrame(df)를 하면된다. (너무 간단함...)
spark2 이상에서 사용했으니 1.x에서는 알아서 바꿔서 하면 될듯!

보통 spark와 pandas에서의 dataframe의 구분이 어렵기 때문에
pdf (=pandas data frame)으로 적는다. (아님 말고)

import pandas as pd
## Create Pandas Frame
pd_df = pd.DataFrame({u'2017-01-01': 1, u'2017-01-02': 2}.items())
## Convert into Spark DataFrame
spark_df = spark.createDataFrame(pd_df)
## Write Frame out as Table
spark_df.write.mode("overwrite").saveAsTable("db.table_name")

+ Recent posts