Chaining

작업을 chain의 형태로 진행한다고 해서 chaining 이라고 한다
chaining으로 package를 생성하고 관리하니 코드도 한결 보기 좋아졌다.

Transform

MyTransform

import org.apache.spark.sql.functions._
import com.naver.ad.ranking.SparkSessionWrapper
object MyTransform extends SparkSessionWrapper {
import spark.implicits._
def withHandleNull(cn: String)(df: DataFrame): DataFrame = {
df.withColumn(cn, when(col(cn)isNull, "").otherwise(col(cn)))
}
}

두개의 테이블을 Merge하는 Transform

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.UserDefinedFunction
import com.naver.ad.ranking.SparkSessionWrapper
object MergeTransform extends SparkSessionWrapper {
import spark.implicits._
def mergeDfs(oldDf: DataFrame, newDf: DataFrame, joinType: String ): DataFrame = {
val mergedCfDf = oldDf
.join(newDf,oldDf("Id") === newDf("newId"), joinType)
mergedCfDf
}
def withMerge(df: DataFrame): DataFrame = {
df.withColumn("mergedMCf", when($"newId"isNull, $"score").otherwise(lit($"newScore")))
}
}
~

Udf와 함께 Parameter를 받는 Transform

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.UserDefinedFunction
object SampleTransform extends SparkSessionWrapper {
import spark.implicits._
val sampleFunc: (Double, Double, Int, Int) => Double = {
(oldValue: Double, newValue: Double, score: Int, count: Int) =>
oldValue + newValue + score + count
}
val sampleUdf: UserDefinedFunction = udf(sampleFunc)
def withSample(score: Int, count: Int)(df: DataFrame): DataFrame = {
df.withColumn("sampleScore", sampleUdf($"old", $"new", lit(score), lit(count)))
}
}

사용하는 방법

df.transform(SampleTransform.withSample(1,2))
.transform(MergeTransform.withMerge)
  • DataFrametransform을 이용하면 다음과 같이 사용이 가능하다.
  • withColumn의 성격이 비슷한 녀석끼리 모아주니까 확실히 편하다 (UDF, transform이 한 object에 있다고 생각해보면 잡)
  • 이 방법 외에도 chain을 먼저 설정하고 chain에 함수들을 추가해서 한번에 실행도 가능하다. chained-transformations 여기를 참고!

참고

+ Recent posts