Chaining
It's called chaining because the operations proceed in the form of a chain. Creating and managing a package around chaining made the code noticeably cleaner.
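Every object below extends SparkSessionWrapper, which the post does not show. As an assumption, a minimal sketch of that trait (following the usual wrapper pattern; the appName is made up) could look like:

package com.naver.ad.ranking

import org.apache.spark.sql.SparkSession

// Exposes a lazily created SparkSession so that extending objects
// can write `import spark.implicits._`
trait SparkSessionWrapper {
  lazy val spark: SparkSession = SparkSession
    .builder()
    .appName("chaining-example") // assumed name
    .getOrCreate()
}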
Transform
MyTransform
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import com.naver.ad.ranking.SparkSessionWrapper

object MyTransform extends SparkSessionWrapper {
  import spark.implicits._

  // Replace nulls in the given column with an empty string
  def withHandleNull(cn: String)(df: DataFrame): DataFrame = {
    df.withColumn(cn, when(col(cn).isNull, "").otherwise(col(cn)))
  }
}
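Since the DataFrame sits in the last parameter list, withHandleNull plugs straight into transform. A quick usage sketch (the "name" column is hypothetical):

val cleaned = df.transform(MyTransform.withHandleNull("name"))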
A Transform that merges two tables
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import com.naver.ad.ranking.SparkSessionWrapper

object MergeTransform extends SparkSessionWrapper {
  import spark.implicits._

  // Join the old and new DataFrames on their id columns
  def mergeDfs(oldDf: DataFrame, newDf: DataFrame, joinType: String): DataFrame = {
    oldDf.join(newDf, oldDf("Id") === newDf("newId"), joinType)
  }

  // Keep the old score where no new row matched, otherwise take the new score
  def withMerge(df: DataFrame): DataFrame = {
    df.withColumn("mergedMCf", when($"newId".isNull, $"score").otherwise($"newScore"))
  }
}
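A hedged usage sketch for the merge (oldDf, newDf, and the "left_outer" join type are assumptions consistent with the columns above):

val merged = MergeTransform
  .mergeDfs(oldDf, newDf, "left_outer")
  .transform(MergeTransform.withMerge)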
A Transform that takes parameters, combined with a UDF
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.UserDefinedFunction
import com.naver.ad.ranking.SparkSessionWrapper

object SampleTransform extends SparkSessionWrapper {
  import spark.implicits._

  // Plain Scala function: combines the old and new values with two external parameters
  val sampleFunc: (Double, Double, Int, Int) => Double = {
    (oldValue: Double, newValue: Double, score: Int, count: Int) =>
      oldValue + newValue + score + count
  }

  val sampleUdf: UserDefinedFunction = udf(sampleFunc)

  // Curried: extra parameters first, DataFrame last, so it composes with transform;
  // lit() turns the Scala values into Columns for the UDF call
  def withSample(score: Int, count: Int)(df: DataFrame): DataFrame = {
    df.withColumn("sampleScore", sampleUdf($"old", $"new", lit(score), lit(count)))
  }
}
How to use it
df.transform(SampleTransform.withSample(1, 2))
  .transform(MergeTransform.withMerge)
Using DataFrame's transform, the whole pipeline can be expressed as above. Grouping withColumn logic of a similar nature together is definitely convenient (imagine the UDF and its transform sitting in one object). Besides this approach, you can also define a chain first, add functions to the chain, and execute everything at once; see chained-transformations for that.
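A minimal sketch of that chain-first style, assuming the objects defined above (scala.Function.chain composes a Seq of DataFrame => DataFrame functions into a single function):

import org.apache.spark.sql.DataFrame

// Build the list of transforms first ...
val chain: Seq[DataFrame => DataFrame] = Seq(
  SampleTransform.withSample(1, 2),
  MergeTransform.withMerge
)

// ... then run them all in one go
val result = Function.chain(chain)(df)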