Scala
- [Spark] Scala DataFrame 특정 컬럼으로 정렬하기 (+소스코드) 2020.02.28
- [Spark] Scala Chaining 을 이용해보자 (+예제코드) Transform, UDF, userDefineFunction 까지 코드를 보기좋게! 2020.02.28 1
- [Spark] spark-testing-base에서 DataFrameSuiteBase 사용 2020.02.28
- [Spark] None, null? DataFrame 생성시에 java.lang.ClassNotFoundException: scala.Any 2020.02.28
- [Spark] SchemaField nullable state 변경하는 방법 2020.02.28
- [Spark] scala DataFrame 생성하기 for 예제 2020.02.27
- [Spark] UserDefinedFunction (udf) 구현하는 방법 (+예제코드) 2020.02.27
- [Spark] Scala joda Datetime 사용하는 방법 (+예제코드) - Days, DateTimeFormat, DateTime 2020.02.27
[Spark] Scala DataFrame 특정 컬럼으로 정렬하기 (+소스코드)
2020. 2. 28. 20:21
[Spark] Scala Chaining 을 이용해보자 (+예제코드) Transform, UDF, userDefineFunction 까지 코드를 보기좋게!
2020. 2. 28. 20:18
Chaining
작업을 chain의 형태로 진행한다고 해서 chaining 이라고 한다chaining
으로 package를 생성하고 관리하니 코드도 한결 보기 좋아졌다.
Transform
MyTransform
두개의 테이블을 Merge하는 Transform
Udf와 함께 Parameter를 받는 Transform
사용하는 방법
DataFrame
의transform
을 이용하면 다음과 같이 사용이 가능하다.withColumn
의 성격이 비슷한 녀석끼리 모아주니까 확실히 편하다 (UDF, transform이 한 object에 있다고 생각해보면 잡)- 이 방법 외에도
chain
을 먼저 설정하고chain
에 함수들을 추가해서 한번에 실행도 가능하다. chained-transformations 여기를 참고!
참고
'우리는 개발자 > Data Engineering' 카테고리의 다른 글
[elasticsearch] 클러스터 관리에 필요한 내용 + plugin에 대한 짧막한 설명. (0) | 2020.03.03 |
---|---|
[Spark] Scala DataFrame 특정 컬럼으로 정렬하기 (+소스코드) (0) | 2020.02.28 |
[Spark] Scala Style Guide, (Vi/Vim)에서 편집할때 indentation/Highlight Style Plugin (0) | 2020.02.28 |
[Spark] spark-testing-base에서 DataFrameSuiteBase 사용 (0) | 2020.02.28 |
[Spark] None, null? DataFrame 생성시에 java.lang.ClassNotFoundException: scala.Any (0) | 2020.02.28 |
[Spark] spark-testing-base에서 DataFrameSuiteBase 사용
2020. 2. 28. 16:44
DataFrameSuiteBase
DataFrameSuiteBase
를 이용해서 주어진 두개의DataFrame
이 일치하는지 확인이 가능하다.- 간단하게 사용방법은 두개의 결과
df1
,df2
를 생성하고assertDataFrameEquals
를 사용하면 된다. - 사용하면서 까다로웠던것은
DataFrame
의Schema
도 확인을 한다. 심지어Nullable
을 체크...- Schema에서 Nullable의 상태를 true, false로 변경시키기 위해서는 다음 Schema Nullable 변경 포스트 확인
- 만약 조인했던 결과에
null
이 들어있고, 기대하는 값에null
이 없었다면 두개의 Schema는 다르고 결과적으로 Fail이 난다. - expectedDf에 만약 Null의 값을 포함시키기 위해서는 다음 Null값이 포함되는 DataFrame 생성하기 포스트 를 확인
- 참고로 아이템의 순서도 동일해야 한다! 아이템 정렬하는 방법은 아래 DataFrame 정렬하기 포스트를 확인
참고
'우리는 개발자 > Data Engineering' 카테고리의 다른 글
ExpectedDf에 null
을 포함
- Scala에서는
null
의 값을 넣기 위해서는 다음과 같이 해야 한다. - 여기서 문제는
Double
의 경우null
을 넣어주면 에러가 발생하는데
Double값 대신에 null을 넣으면 발생하는 에러
val account = sc.parallelize(Seq(
(1, null, 2,"F", null),
(2, new Integer(2), 4, "F", 1.2),
(3, new Integer(3), 6, "N", 1.0),
(4, null,8,"F", 1.0))).toDF()
account.show()
java.lang.ClassNotFoundException: scala.Any
at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at scala.reflect.runtime.JavaMirrors$JavaMirror.javaClass(JavaMirrors.scala:555)
at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1211)
at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1203)
at scala.reflect.runtime.TwoWayCaches$TwoWayCache$$anonfun$toJava$1.apply(TwoWayCaches.scala:49)
at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
at scala.reflect.runtime.TwoWayCaches$TwoWayCache.toJava(TwoWayCaches.scala:44)
at scala.reflect.runtime.JavaMirrors$JavaMirror.classToJava(JavaMirrors.scala:1203)
at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:194)
at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:54)
at org.apache.spark.sql.catalyst.ScalaReflection$.getClassFromType(ScalaReflection.scala:700)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor$1.apply(ScalaReflection.scala:84)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor$1.apply(ScalaReflection.scala:65)
at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor(ScalaReflection.scala:64)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:632)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:625)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:344)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:625)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:445)
at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:445)
at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:434)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:248)
at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:34)
... 53 elided
- 위 에러를 잘보면
scala.Any
라는 클래스를 찾을수 없다고 하는데 Any
는 너무 general type이여서 Spark에서는 어떻게 serialize를 해야하는지 모른다 scala-unified-types- 그러니 Integer를 사용할때도
null.asInstanceOf[Integer]
다음과 같이 명시를 해야한다.
createDataFrame으로 생성
RDD
를 사용하지 않고, 아래와 같이createDataFrame
을 사용하며 문제를 해결할 수 있다.- 아래와 같이 하면
Double
값을Some(1.5)
로 사용해도 정상적으로 생성이 된다. RDD
로 생성하는 방법은 찾지 못함..
'우리는 개발자 > Data Engineering' 카테고리의 다른 글
[Spark] Scala Style Guide, (Vi/Vim)에서 편집할때 indentation/Highlight Style Plugin (0) | 2020.02.28 |
---|---|
[Spark] spark-testing-base에서 DataFrameSuiteBase 사용 (0) | 2020.02.28 |
[Spark] SchemaField nullable state 변경하는 방법 (0) | 2020.02.28 |
[Spark] scala DataFrame 생성하기 for 예제 (0) | 2020.02.27 |
[Spark] UserDefinedFunction (udf) 구현하는 방법 (+예제코드) (0) | 2020.02.27 |
[Spark] SchemaField nullable state 변경하는 방법
2020. 2. 28. 12:03
spark-testing-base를 사용해서 DataFrame의 일치를 확인하는 과정에서
DataFrame의 SchemaField의 nullable
의 값이 틀려도 assert
가 발생한다.
해결하는 방법은 아래와 같이 setNullableStateOfColumnn
을 SparkUtil.scala
에 작성한 뒤에
특정 컬럼의 NullableState를 변경하면 끝
'우리는 개발자 > Data Engineering' 카테고리의 다른 글
[Spark] spark-testing-base에서 DataFrameSuiteBase 사용 (0) | 2020.02.28 |
---|---|
[Spark] None, null? DataFrame 생성시에 java.lang.ClassNotFoundException: scala.Any (0) | 2020.02.28 |
[Spark] scala DataFrame 생성하기 for 예제 (0) | 2020.02.27 |
[Spark] UserDefinedFunction (udf) 구현하는 방법 (+예제코드) (0) | 2020.02.27 |
[Spark] Scala joda Datetime 사용하는 방법 (+예제코드) - Days, DateTimeFormat, DateTime (0) | 2020.02.27 |
[Spark] scala DataFrame 생성하기 for 예제
2020. 2. 27. 18:36
scala에서 dataframe을 생성하는 코드
코드
'우리는 개발자 > Data Engineering' 카테고리의 다른 글
[Spark] None, null? DataFrame 생성시에 java.lang.ClassNotFoundException: scala.Any (0) | 2020.02.28 |
---|---|
[Spark] SchemaField nullable state 변경하는 방법 (0) | 2020.02.28 |
[Spark] UserDefinedFunction (udf) 구현하는 방법 (+예제코드) (0) | 2020.02.27 |
[Spark] Scala joda Datetime 사용하는 방법 (+예제코드) - Days, DateTimeFormat, DateTime (0) | 2020.02.27 |
[Spark] 시작할때 살펴보면 좋은 유용한 사이트 모음 (0) | 2020.02.27 |
[Spark] UserDefinedFunction (udf) 구현하는 방법 (+예제코드)
2020. 2. 27. 18:33
spark scala dataframe을 withColumn
을 통해 한줄한줄 iteration을 돌려 처리가 가능하다.
내가 원하는 함수에 한개의 row를 넘기고, 그 처리된 결과를 새로운 column으로 생성한다.
예제코드
sampleFunc
을 생성하고UserDefinedFunction
,udf
를 통해sampleUdf
를 생성- 생성된
sampleUdf
는DataFrame
의withColumn
을 이용- 첫번째 파라미터는
withColumn
을 통해 생성될 신규 컬럼명 - 두번째 파라미터는
udfFunction
+ parameters - 여기서
lit
은 constant 값을 넘길때 사용한다.
- 첫번째 파라미터는
'우리는 개발자 > Data Engineering' 카테고리의 다른 글
[Spark] SchemaField nullable state 변경하는 방법 (0) | 2020.02.28 |
---|---|
[Spark] scala DataFrame 생성하기 for 예제 (0) | 2020.02.27 |
[Spark] Scala joda Datetime 사용하는 방법 (+예제코드) - Days, DateTimeFormat, DateTime (0) | 2020.02.27 |
[Spark] 시작할때 살펴보면 좋은 유용한 사이트 모음 (0) | 2020.02.27 |
[Spark] Scala Test (Library, Sbt Test) (0) | 2020.02.26 |
[Spark] Scala joda Datetime 사용하는 방법 (+예제코드) - Days, DateTimeFormat, DateTime
2020. 2. 27. 18:22
spark scala 코딩을 할때 datetime에 관련된 코딩을 한다면 아래 내용은 필수!!!
sbt build를 사용하기 때문에 libraryDependencies
가 필요하다.
build.sbt에 아래와 같이 추가
libraryDependencies ++= Seq(
(...)
"joda-time" % "joda-time" % "2.10.5",
"org.joda" % "joda-convert" % "1.8.1"
),
joda-time
만 설치하면 아마도 아래와 같이 WARN이 발생하니 org.joda
도 함께 라이브러리에 추가
class org.joda.convert.fromstring not found - continuing with a stub
소스코드
- 소스코드를 보면
DateTime
,Days
,DateTimeFormat
을 이용 - string을
DateTimeFormat.forPattern
을 이용해 DateTime으로 변환 Days.daysBetween
을 이용해 두개의 DateTime간의 날짜 차이를 계산- 오늘 날짜 DateTime 가져오기
- 30일전 DateTime 생성하기
- 두개의
DateTime
의 차이 milliseconds 계산
'우리는 개발자 > Data Engineering' 카테고리의 다른 글
[Spark] scala DataFrame 생성하기 for 예제 (0) | 2020.02.27 |
---|---|
[Spark] UserDefinedFunction (udf) 구현하는 방법 (+예제코드) (0) | 2020.02.27 |
[Spark] 시작할때 살펴보면 좋은 유용한 사이트 모음 (0) | 2020.02.27 |
[Spark] Scala Test (Library, Sbt Test) (0) | 2020.02.26 |
[Spark] SparkSessionWrapper를 구현하자 (0) | 2020.02.26 |