ExpectedDf에 null을 포함

  • Scala에서는 null의 값을 넣기 위해서는 다음과 같이 해야 한다.
  • 여기서 문제는 Double의 경우 null을 넣어주면 에러가 발생하는데

Double값 대신에 null을 넣으면 발생하는 에러

val account = sc.parallelize(Seq(
                                 (1, null, 2,"F", null), 
                                 (2, new Integer(2), 4, "F",  1.2),
                                 (3, new Integer(3), 6, "N", 1.0),
                                 (4, null,8,"F", 1.0))).toDF()
account.show()
java.lang.ClassNotFoundException: scala.Any
  at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:348)
  at scala.reflect.runtime.JavaMirrors$JavaMirror.javaClass(JavaMirrors.scala:555)
  at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1211)
  at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1203)
  at scala.reflect.runtime.TwoWayCaches$TwoWayCache$$anonfun$toJava$1.apply(TwoWayCaches.scala:49)
  at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
  at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
  at scala.reflect.runtime.TwoWayCaches$TwoWayCache.toJava(TwoWayCaches.scala:44)
  at scala.reflect.runtime.JavaMirrors$JavaMirror.classToJava(JavaMirrors.scala:1203)
  at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:194)
  at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:54)
  at org.apache.spark.sql.catalyst.ScalaReflection$.getClassFromType(ScalaReflection.scala:700)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor$1.apply(ScalaReflection.scala:84)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor$1.apply(ScalaReflection.scala:65)
  at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
  at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
  at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor(ScalaReflection.scala:64)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:632)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:625)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
  at scala.collection.immutable.List.flatMap(List.scala:344)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:625)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:445)
  at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
  at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
  at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:445)
  at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:434)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
  at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
  at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:248)
  at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:34)
  ... 53 elided 
  • 위 에러를 잘보면 scala.Any라는 클래스를 찾을수 없다고 하는데
  • Any는 너무 general type이여서 Spark에서는 어떻게 serialize를 해야하는지 모른다 scala-unified-types
  • 그러니 Integer를 사용할때도 null.asInstanceOf[Integer] 다음과 같이 명시를 해야한다.

createDataFrame으로 생성

  • RDD를 사용하지 않고, 아래와 같이 createDataFrame을 사용하며 문제를 해결할 수 있다.
  • 아래와 같이 하면 Double값을 Some(1.5)로 사용해도 정상적으로 생성이 된다.
  • RDD로 생성하는 방법은 찾지 못함..

spark-testing-base를 사용해서 DataFrame의 일치를 확인하는 과정에서
DataFrame의 SchemaField의 nullable의 값이 틀려도 assert가 발생한다.
해결하는 방법은 아래와 같이 setNullableStateOfColumnnSparkUtil.scala에 작성한 뒤에
특정 컬럼의 NullableState를 변경하면 끝

scala에서 dataframe을 생성하는 코드

코드

spark scala dataframe을 withColumn을 통해 한줄한줄 iteration을 돌려 처리가 가능하다.
내가 원하는 함수에 한개의 row를 넘기고, 그 처리된 결과를 새로운 column으로 생성한다.

예제코드

  • sampleFunc을 생성하고 UserDefinedFunction, udf를 통해 sampleUdf를 생성
  • 생성된 sampleUdfDataFramewithColumn을 이용
    • 첫번째 파라미터는 withColumn을 통해 생성될 신규 컬럼명
    • 두번째 파라미터는 udfFunction + parameters
    • 여기서 lit은 constant 값을 넘길때 사용한다.

spark scala 코딩을 할때 datetime에 관련된 코딩을 한다면 아래 내용은 필수!!!
sbt build를 사용하기 때문에 libraryDependencies가 필요하다.

build.sbt에 아래와 같이 추가 
    libraryDependencies ++= Seq( 
      (...)
      "joda-time" % "joda-time" % "2.10.5",
      "org.joda" % "joda-convert" % "1.8.1"
    ),

joda-time만 설치하면 아마도 아래와 같이 WARN이 발생하니 org.joda도 함께 라이브러리에 추가

class org.joda.convert.fromstring not found - continuing with a stub

소스코드

  • 소스코드를 보면 DateTime, Days, DateTimeFormat을 이용
  • string을 DateTimeFormat.forPattern을 이용해 DateTime으로 변환
  • Days.daysBetween을 이용해 두개의 DateTime간의 날짜 차이를 계산
  • 오늘 날짜 DateTime 가져오기
  • 30일전 DateTime 생성하기
  • 두개의 DateTime의 차이 milliseconds 계산
"org.scalatest" %% "scalatest" % "3.0.1" % "test",
"org.scalacheck" %% "scalacheck" % "1.13.4" % "test",
"com.holdenkarau" %% "spark-testing-base" % "2.3.0_0.9.0" % "test"

sbt 테스트

  • 아래 명령어를 입력하면 Test 전체를 수행한다
sbt assembly
  • 아래 명령도 전체를 Test를 수행
sbt
> test
  • build하지 않고 테스트만
sbt test
  • 특정 테스트만 실행하기 (build를 따로 할필요가 없음)
 sbt "testOnly package.TestClass"

test를 진행할때마다 잘보면 아래와 같이 뜨는데... 테스트 마다 Session을 생성하는듯 (매우느림)

20/02/28 12:18:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20/02/28 12:18:34 WARN SparkContext: Using an existing SparkContext; some configuration may not take effect.

이때 Spark의 경우 Session을 생성을 해야하기 때문에 sbt tool에 들어가서 test를 하면 매번 Session을 생성해서 불필요한 시간을 사용하게 된다.

Spark Test Sample

Project Main Modules:

참고

SparkSessionWrapper

  • SparkSessionWrapper를 생성하는 이유는 SparkSession을 시작/중지하는 비용이 크기 때문에
  • SparkSession을 하나 생성해서 빠르게 코드 수행이 가능하다.
  • trait은 java에서 interface를 생각하면 된다. 사용하기 위해서는 with를 통해 확장이 가능하다.
  • 여기서 lazy를 사용하는 이유는 정의된 spark의 변수가 처음 사용될때 코드가 실행되도록
    • that a val is executed when it is defined
    • a lazy val is executed when it is accessed the first time.
  • getOrCreate의 메소드는 SparkSession이 있으면 가져오고, 없으면 새로 생성하는 메소드

Lazy Example

+ Recent posts