Chaining

작업을 chain의 형태로 진행한다고 해서 chaining 이라고 한다
chaining으로 package를 생성하고 관리하니 코드도 한결 보기 좋아졌다.

Transform

MyTransform

두개의 테이블을 Merge하는 Transform

Udf와 함께 Parameter를 받는 Transform

사용하는 방법

  • DataFrametransform을 이용하면 다음과 같이 사용이 가능하다.
  • withColumn의 성격이 비슷한 녀석끼리 모아주니까 확실히 편하다 (UDF, transform이 한 object에 있다고 생각해보면 잡)
  • 이 방법 외에도 chain을 먼저 설정하고 chain에 함수들을 추가해서 한번에 실행도 가능하다. chained-transformations 여기를 참고!

참고

DataFrameSuiteBase

  • DataFrameSuiteBase를 이용해서 주어진 두개의 DataFrame이 일치하는지 확인이 가능하다.
  • 간단하게 사용방법은 두개의 결과 df1, df2를 생성하고 assertDataFrameEquals를 사용하면 된다.
  • 사용하면서 까다로웠던것은 DataFrameSchema도 확인을 한다. 심지어 Nullable을 체크...
  • 만약 조인했던 결과에 null이 들어있고, 기대하는 값에 null이 없었다면 두개의 Schema는 다르고 결과적으로 Fail이 난다.
  • expectedDf에 만약 Null의 값을 포함시키기 위해서는 다음 Null값이 포함되는 DataFrame 생성하기 포스트 를 확인
  • 참고로 아이템의 순서도 동일해야 한다! 아이템 정렬하는 방법은 아래 DataFrame 정렬하기 포스트를 확인

참고

ExpectedDf에 null을 포함

  • Scala에서는 null의 값을 넣기 위해서는 다음과 같이 해야 한다.
  • 여기서 문제는 Double의 경우 null을 넣어주면 에러가 발생하는데

Double값 대신에 null을 넣으면 발생하는 에러

val account = sc.parallelize(Seq(
                                 (1, null, 2,"F", null), 
                                 (2, new Integer(2), 4, "F",  1.2),
                                 (3, new Integer(3), 6, "N", 1.0),
                                 (4, null,8,"F", 1.0))).toDF()
account.show()
java.lang.ClassNotFoundException: scala.Any
  at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:348)
  at scala.reflect.runtime.JavaMirrors$JavaMirror.javaClass(JavaMirrors.scala:555)
  at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1211)
  at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1203)
  at scala.reflect.runtime.TwoWayCaches$TwoWayCache$$anonfun$toJava$1.apply(TwoWayCaches.scala:49)
  at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
  at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
  at scala.reflect.runtime.TwoWayCaches$TwoWayCache.toJava(TwoWayCaches.scala:44)
  at scala.reflect.runtime.JavaMirrors$JavaMirror.classToJava(JavaMirrors.scala:1203)
  at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:194)
  at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:54)
  at org.apache.spark.sql.catalyst.ScalaReflection$.getClassFromType(ScalaReflection.scala:700)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor$1.apply(ScalaReflection.scala:84)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor$1.apply(ScalaReflection.scala:65)
  at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
  at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
  at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor(ScalaReflection.scala:64)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:632)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:625)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
  at scala.collection.immutable.List.flatMap(List.scala:344)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:625)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:445)
  at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
  at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
  at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:445)
  at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:434)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
  at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
  at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:248)
  at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:34)
  ... 53 elided 
  • 위 에러를 잘보면 scala.Any라는 클래스를 찾을수 없다고 하는데
  • Any는 너무 general type이여서 Spark에서는 어떻게 serialize를 해야하는지 모른다 scala-unified-types
  • 그러니 Integer를 사용할때도 null.asInstanceOf[Integer] 다음과 같이 명시를 해야한다.

createDataFrame으로 생성

  • RDD를 사용하지 않고, 아래와 같이 createDataFrame을 사용하며 문제를 해결할 수 있다.
  • 아래와 같이 하면 Double값을 Some(1.5)로 사용해도 정상적으로 생성이 된다.
  • RDD로 생성하는 방법은 찾지 못함..

spark-testing-base를 사용해서 DataFrame의 일치를 확인하는 과정에서
DataFrame의 SchemaField의 nullable의 값이 틀려도 assert가 발생한다.
해결하는 방법은 아래와 같이 setNullableStateOfColumnnSparkUtil.scala에 작성한 뒤에
특정 컬럼의 NullableState를 변경하면 끝

scala에서 dataframe을 생성하는 코드

코드

spark scala dataframe을 withColumn을 통해 한줄한줄 iteration을 돌려 처리가 가능하다.
내가 원하는 함수에 한개의 row를 넘기고, 그 처리된 결과를 새로운 column으로 생성한다.

예제코드

  • sampleFunc을 생성하고 UserDefinedFunction, udf를 통해 sampleUdf를 생성
  • 생성된 sampleUdfDataFramewithColumn을 이용
    • 첫번째 파라미터는 withColumn을 통해 생성될 신규 컬럼명
    • 두번째 파라미터는 udfFunction + parameters
    • 여기서 lit은 constant 값을 넘길때 사용한다.

spark scala 코딩을 할때 datetime에 관련된 코딩을 한다면 아래 내용은 필수!!!
sbt build를 사용하기 때문에 libraryDependencies가 필요하다.

build.sbt에 아래와 같이 추가 
    libraryDependencies ++= Seq( 
      (...)
      "joda-time" % "joda-time" % "2.10.5",
      "org.joda" % "joda-convert" % "1.8.1"
    ),

joda-time만 설치하면 아마도 아래와 같이 WARN이 발생하니 org.joda도 함께 라이브러리에 추가

class org.joda.convert.fromstring not found - continuing with a stub

소스코드

  • 소스코드를 보면 DateTime, Days, DateTimeFormat을 이용
  • string을 DateTimeFormat.forPattern을 이용해 DateTime으로 변환
  • Days.daysBetween을 이용해 두개의 DateTime간의 날짜 차이를 계산
  • 오늘 날짜 DateTime 가져오기
  • 30일전 DateTime 생성하기
  • 두개의 DateTime의 차이 milliseconds 계산

+ Recent posts