Chaining

작업을 chain의 형태로 진행한다고 해서 chaining 이라고 한다
chaining으로 package를 생성하고 관리하니 코드도 한결 보기 좋아졌다.

Transform

MyTransform

두개의 테이블을 Merge하는 Transform

Udf와 함께 Parameter를 받는 Transform

사용하는 방법

  • DataFrametransform을 이용하면 다음과 같이 사용이 가능하다.
  • withColumn의 성격이 비슷한 녀석끼리 모아주니까 확실히 편하다 (UDF, transform이 한 object에 있다고 생각해보면 잡)
  • 이 방법 외에도 chain을 먼저 설정하고 chain에 함수들을 추가해서 한번에 실행도 가능하다. chained-transformations 여기를 참고!

참고

DataFrameSuiteBase

  • DataFrameSuiteBase를 이용해서 주어진 두개의 DataFrame이 일치하는지 확인이 가능하다.
  • 간단하게 사용방법은 두개의 결과 df1, df2를 생성하고 assertDataFrameEquals를 사용하면 된다.
  • 사용하면서 까다로웠던것은 DataFrameSchema도 확인을 한다. 심지어 Nullable을 체크...
  • 만약 조인했던 결과에 null이 들어있고, 기대하는 값에 null이 없었다면 두개의 Schema는 다르고 결과적으로 Fail이 난다.
  • expectedDf에 만약 Null의 값을 포함시키기 위해서는 다음 Null값이 포함되는 DataFrame 생성하기 포스트 를 확인
  • 참고로 아이템의 순서도 동일해야 한다! 아이템 정렬하는 방법은 아래 DataFrame 정렬하기 포스트를 확인

참고

ExpectedDf에 null을 포함

  • Scala에서는 null의 값을 넣기 위해서는 다음과 같이 해야 한다.
  • 여기서 문제는 Double의 경우 null을 넣어주면 에러가 발생하는데

Double값 대신에 null을 넣으면 발생하는 에러

val account = sc.parallelize(Seq(
                                 (1, null, 2,"F", null), 
                                 (2, new Integer(2), 4, "F",  1.2),
                                 (3, new Integer(3), 6, "N", 1.0),
                                 (4, null,8,"F", 1.0))).toDF()
account.show()
java.lang.ClassNotFoundException: scala.Any
  at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:348)
  at scala.reflect.runtime.JavaMirrors$JavaMirror.javaClass(JavaMirrors.scala:555)
  at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1211)
  at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1203)
  at scala.reflect.runtime.TwoWayCaches$TwoWayCache$$anonfun$toJava$1.apply(TwoWayCaches.scala:49)
  at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
  at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
  at scala.reflect.runtime.TwoWayCaches$TwoWayCache.toJava(TwoWayCaches.scala:44)
  at scala.reflect.runtime.JavaMirrors$JavaMirror.classToJava(JavaMirrors.scala:1203)
  at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:194)
  at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:54)
  at org.apache.spark.sql.catalyst.ScalaReflection$.getClassFromType(ScalaReflection.scala:700)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor$1.apply(ScalaReflection.scala:84)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor$1.apply(ScalaReflection.scala:65)
  at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
  at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
  at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor(ScalaReflection.scala:64)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:632)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:625)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
  at scala.collection.immutable.List.flatMap(List.scala:344)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:625)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:445)
  at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
  at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
  at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:445)
  at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:434)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
  at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
  at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:248)
  at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:34)
  ... 53 elided 
  • 위 에러를 잘보면 scala.Any라는 클래스를 찾을수 없다고 하는데
  • Any는 너무 general type이여서 Spark에서는 어떻게 serialize를 해야하는지 모른다 scala-unified-types
  • 그러니 Integer를 사용할때도 null.asInstanceOf[Integer] 다음과 같이 명시를 해야한다.

createDataFrame으로 생성

  • RDD를 사용하지 않고, 아래와 같이 createDataFrame을 사용하며 문제를 해결할 수 있다.
  • 아래와 같이 하면 Double값을 Some(1.5)로 사용해도 정상적으로 생성이 된다.
  • RDD로 생성하는 방법은 찾지 못함..

spark-testing-base를 사용해서 DataFrame의 일치를 확인하는 과정에서
DataFrame의 SchemaField의 nullable의 값이 틀려도 assert가 발생한다.
해결하는 방법은 아래와 같이 setNullableStateOfColumnnSparkUtil.scala에 작성한 뒤에
특정 컬럼의 NullableState를 변경하면 끝

scala에서 dataframe을 생성하는 코드

코드

spark scala dataframe을 withColumn을 통해 한줄한줄 iteration을 돌려 처리가 가능하다.
내가 원하는 함수에 한개의 row를 넘기고, 그 처리된 결과를 새로운 column으로 생성한다.

예제코드

  • sampleFunc을 생성하고 UserDefinedFunction, udf를 통해 sampleUdf를 생성
  • 생성된 sampleUdfDataFramewithColumn을 이용
    • 첫번째 파라미터는 withColumn을 통해 생성될 신규 컬럼명
    • 두번째 파라미터는 udfFunction + parameters
    • 여기서 lit은 constant 값을 넘길때 사용한다.

spark scala 코딩을 할때 datetime에 관련된 코딩을 한다면 아래 내용은 필수!!!
sbt build를 사용하기 때문에 libraryDependencies가 필요하다.

build.sbt에 아래와 같이 추가 
    libraryDependencies ++= Seq( 
      (...)
      "joda-time" % "joda-time" % "2.10.5",
      "org.joda" % "joda-convert" % "1.8.1"
    ),

joda-time만 설치하면 아마도 아래와 같이 WARN이 발생하니 org.joda도 함께 라이브러리에 추가

class org.joda.convert.fromstring not found - continuing with a stub

소스코드

  • 소스코드를 보면 DateTime, Days, DateTimeFormat을 이용
  • string을 DateTimeFormat.forPattern을 이용해 DateTime으로 변환
  • Days.daysBetween을 이용해 두개의 DateTime간의 날짜 차이를 계산
  • 오늘 날짜 DateTime 가져오기
  • 30일전 DateTime 생성하기
  • 두개의 DateTime의 차이 milliseconds 계산
"org.scalatest" %% "scalatest" % "3.0.1" % "test",
"org.scalacheck" %% "scalacheck" % "1.13.4" % "test",
"com.holdenkarau" %% "spark-testing-base" % "2.3.0_0.9.0" % "test"

sbt 테스트

  • 아래 명령어를 입력하면 Test 전체를 수행한다
sbt assembly
  • 아래 명령도 전체를 Test를 수행
sbt
> test
  • build하지 않고 테스트만
sbt test
  • 특정 테스트만 실행하기 (build를 따로 할필요가 없음)
 sbt "testOnly package.TestClass"

test를 진행할때마다 잘보면 아래와 같이 뜨는데... 테스트 마다 Session을 생성하는듯 (매우느림)

20/02/28 12:18:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20/02/28 12:18:34 WARN SparkContext: Using an existing SparkContext; some configuration may not take effect.

이때 Spark의 경우 Session을 생성을 해야하기 때문에 sbt tool에 들어가서 test를 하면 매번 Session을 생성해서 불필요한 시간을 사용하게 된다.

Spark Test Sample

Project Main Modules:

참고

+ Recent posts