Including null in ExpectedDf
- In Scala, you have to write the values as follows to include `null`.
- The problem is that putting `null` in the `Double` column raises an error (shown further below).
```scala
val account = sc.parallelize(Seq(
  (1, null.asInstanceOf[Integer], 2, "F", 2.0),
  (2, new Integer(2), 4, "F", 1.2),
  (3, new Integer(3), 6, "N", 1.0),
  (4, null.asInstanceOf[Integer], 8, "F", 1.0))).toDF()
account.show()
```
The error raised when `null` is passed in place of a `Double` value:
```scala
val account = sc.parallelize(Seq(
  (1, null, 2, "F", null),
  (2, new Integer(2), 4, "F", 1.2),
  (3, new Integer(3), 6, "N", 1.0),
  (4, null, 8, "F", 1.0))).toDF()
account.show()
```
```
java.lang.ClassNotFoundException: scala.Any
at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at scala.reflect.runtime.JavaMirrors$JavaMirror.javaClass(JavaMirrors.scala:555)
at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1211)
at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1203)
at scala.reflect.runtime.TwoWayCaches$TwoWayCache$$anonfun$toJava$1.apply(TwoWayCaches.scala:49)
at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
at scala.reflect.runtime.TwoWayCaches$TwoWayCache.toJava(TwoWayCaches.scala:44)
at scala.reflect.runtime.JavaMirrors$JavaMirror.classToJava(JavaMirrors.scala:1203)
at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:194)
at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:54)
at org.apache.spark.sql.catalyst.ScalaReflection$.getClassFromType(ScalaReflection.scala:700)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor$1.apply(ScalaReflection.scala:84)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor$1.apply(ScalaReflection.scala:65)
at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor(ScalaReflection.scala:64)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:632)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:625)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:344)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:625)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:445)
at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:445)
at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:434)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:248)
at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:34)
... 53 elided
```
- Looking closely at the error above, it says the class `scala.Any` cannot be found: mixing `null` and `Double` values makes the tuple element type widen to `Any`, which is too general a type for Spark to know how to serialize (scala-unified-types).
- So even for `Integer` the null must be annotated explicitly, as in `null.asInstanceOf[Integer]`, and the same idea can be tried for the `Double` column; see the sketch below.
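The post stops at the `Integer` case, so the following is only a minimal, untested sketch of the analogous fix for the `Double` column. It assumes Spark's product encoder accepts the boxed `java.lang.Double`; every value is boxed so the tuple element type stays concrete instead of widening to `Any`:

```scala
val account = sc.parallelize(Seq(
  // Box every Double so the nulls don't force the column type up to Any.
  (1, null.asInstanceOf[Integer], 2, "F", null.asInstanceOf[java.lang.Double]),
  (2, new Integer(2), 4, "F", java.lang.Double.valueOf(1.2)),
  (3, new Integer(3), 6, "N", java.lang.Double.valueOf(1.0)),
  (4, null.asInstanceOf[Integer], 8, "F", java.lang.Double.valueOf(1.0)))).toDF()
account.show()
```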
Creating with createDataFrame
- The problem can be solved without using an `RDD`, by calling `createDataFrame` as shown below.
- Done this way, a `Double` value can even be supplied as `Some(1.5)` and the DataFrame is created normally (a sketch of this follows the example).
- I could not find a way to create it from an `RDD`.. (though see the `RDD[Row]` sketch at the end.)
```scala
import spark.implicits._
val df = spark.createDataFrame(Seq(
  (1, None, 2, "F"),
  (2, Some(2), 4, "F"),
  (3, Some(3), 6, "N"),
  (4, None, 8, "F")
)).toDF("ACCT_ID", "M_CD", "C_CD", "IND")
df.show
+-------+----+----+---+
|ACCT_ID|M_CD|C_CD|IND|
+-------+----+----+---+
|      1|null|   2|  F|
|      2|   2|   4|  F|
|      3|   3|   6|  N|
|      4|null|   8|  F|
+-------+----+----+---+
df.printSchema
root
 |-- ACCT_ID: integer (nullable = false)
 |-- M_CD: integer (nullable = true)
 |-- C_CD: integer (nullable = false)
 |-- IND: string (nullable = true)
```
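The example above only exercises an `Option[Int]` column, so here is a minimal sketch (not from the original post, column names hypothetical) of the `Some(1.5)` claim; mixing `None` and `Some(1.5)` should make the column infer as `Option[Double]`:

```scala
val rates = spark.createDataFrame(Seq(
  (1, None),        // becomes null in the DataFrame
  (2, Some(1.5)),
  (3, Some(1.0))
)).toDF("ACCT_ID", "RATE")
rates.show()        // RATE is null for ACCT_ID 1
rates.printSchema   // RATE: double (nullable = true)
```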
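As for building the same thing from an `RDD`: one route that I believe works (untested here, and not something the post verified) is to pair an `RDD[Row]` with an explicit `StructType` schema, since `Row` fields are untyped and nullability comes from the schema rather than from Scala type inference:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Nullability is declared in the schema, so nulls inside the Rows are fine.
val schema = StructType(Seq(
  StructField("ACCT_ID", IntegerType, nullable = false),
  StructField("M_CD",    IntegerType, nullable = true),
  StructField("RATE",    DoubleType,  nullable = true)
))
val rowRdd = sc.parallelize(Seq(
  Row(1, null, null),
  Row(2, 2, 1.2)
))
val fromRdd = spark.createDataFrame(rowRdd, schema)
fromRdd.show()
```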