Chaining
작업을 chain의 형태로 진행한다고 해서 chaining 이라고 한다chaining
으로 package를 생성하고 관리하니 코드도 한결 보기 좋아졌다.
Transform
MyTransform
import org.apache.spark.sql.functions._ | |
import com.naver.ad.ranking.SparkSessionWrapper | |
object MyTransform extends SparkSessionWrapper { | |
import spark.implicits._ | |
def withHandleNull(cn: String)(df: DataFrame): DataFrame = { | |
df.withColumn(cn, when(col(cn)isNull, "").otherwise(col(cn))) | |
} | |
} |
두개의 테이블을 Merge하는 Transform
import org.apache.spark.sql.DataFrame | |
import org.apache.spark.sql.functions._ | |
import org.apache.spark.sql.expressions.UserDefinedFunction | |
import com.naver.ad.ranking.SparkSessionWrapper | |
object MergeTransform extends SparkSessionWrapper { | |
import spark.implicits._ | |
def mergeDfs(oldDf: DataFrame, newDf: DataFrame, joinType: String ): DataFrame = { | |
val mergedCfDf = oldDf | |
.join(newDf,oldDf("Id") === newDf("newId"), joinType) | |
mergedCfDf | |
} | |
def withMerge(df: DataFrame): DataFrame = { | |
df.withColumn("mergedMCf", when($"newId"isNull, $"score").otherwise(lit($"newScore"))) | |
} | |
} | |
~ |
Udf와 함께 Parameter를 받는 Transform
import org.apache.spark.sql.DataFrame | |
import org.apache.spark.sql.functions._ | |
import org.apache.spark.sql.expressions.UserDefinedFunction | |
object SampleTransform extends SparkSessionWrapper { | |
import spark.implicits._ | |
val sampleFunc: (Double, Double, Int, Int) => Double = { | |
(oldValue: Double, newValue: Double, score: Int, count: Int) => | |
oldValue + newValue + score + count | |
} | |
val sampleUdf: UserDefinedFunction = udf(sampleFunc) | |
def withSample(score: Int, count: Int)(df: DataFrame): DataFrame = { | |
df.withColumn("sampleScore", sampleUdf($"old", $"new", lit(score), lit(count))) | |
} | |
} |
사용하는 방법
df.transform(SampleTransform.withSample(1,2)) | |
.transform(MergeTransform.withMerge) |
DataFrame
의transform
을 이용하면 다음과 같이 사용이 가능하다.withColumn
의 성격이 비슷한 녀석끼리 모아주니까 확실히 편하다 (UDF, transform이 한 object에 있다고 생각해보면 잡)- 이 방법 외에도
chain
을 먼저 설정하고chain
에 함수들을 추가해서 한번에 실행도 가능하다. chained-transformations 여기를 참고!
참고
'우리는 개발자 > Data Engineering' 카테고리의 다른 글
[elasticsearch] 클러스터 관리에 필요한 내용 + plugin에 대한 짧막한 설명. (0) | 2020.03.03 |
---|---|
[Spark] Scala DataFrame 특정 컬럼으로 정렬하기 (+소스코드) (0) | 2020.02.28 |
[Spark] Scala Style Guide, (Vi/Vim)에서 편집할때 indentation/Highlight Style Plugin (0) | 2020.02.28 |
[Spark] spark-testing-base에서 DataFrameSuiteBase 사용 (0) | 2020.02.28 |
[Spark] None, null? DataFrame 생성시에 java.lang.ClassNotFoundException: scala.Any (0) | 2020.02.28 |