Chaining

작업을 chain의 형태로 진행한다고 해서 chaining 이라고 한다
chaining으로 package를 생성하고 관리하니 코드도 한결 보기 좋아졌다.

Transform

MyTransform

두개의 테이블을 Merge하는 Transform

Udf와 함께 Parameter를 받는 Transform

사용하는 방법

  • DataFrametransform을 이용하면 다음과 같이 사용이 가능하다.
  • withColumn의 성격이 비슷한 녀석끼리 모아주니까 확실히 편하다 (UDF, transform이 한 object에 있다고 생각해보면 잡)
  • 이 방법 외에도 chain을 먼저 설정하고 chain에 함수들을 추가해서 한번에 실행도 가능하다. chained-transformations 여기를 참고!

참고

* elasticsearch에서 백업/복원을 위한 snapshot적용을 위해서는 shared file system이 필요하여 nfs 환경을 구성하였다.

1. nfs-server 구성.

sudo yum install nfs-utils nfs-utils-lib
sudo mkdir -p /home/data

-- mount할 path와 열어줄 client ip작성
sudo vim /etc/exports
ex) /home/data xx.xxx.xxx.xxx(rw,sync,no_root_squash)
sudo /sbin/service rpcbind start

sudo systemctl start nfs-server
sudo systemctl enable nfs-server.service

2. nfs-client 구성.

--xx.xxx.xxx.xxx는 위에 설정한 nfs-server의 ip주소

sudo mkdir -p /home/data
sudo yum -y install showmount
sudo /sbin/service rpcbind start
sudo /sbin/showmount -e xx.xxx.xxx.xxx
sudo mount -t nfs xx.xxx.xxx.xxx:/home/data/ /home/data

* 설치 완료후에서로 동기화 및 읽고쓰기 가능함을 확인할 수 있음.

보통 vi/vim 에서 소스코드를 편집하게 되는데, 이때 indentation이 내가 원하는 포맷에 맞지 않을때가 있다.예를 들어서 들여쓰기가 2가 아닌 4가 될수도 있고, multiple line으로 작성하는 방식도 통일되어 있지 않다면 협업능력이 떨어져 보인다... 나만 보는게 아니니까! 나만 보기에는 편할수 있어도 일반적인 코드스타일을 맞추는게 중요하다고 최근에 생각... 이전에는 빨리 신속하게 기능을 구현하는게 중요하다고 생각했지만 이제는 협업도 중요하게 생각하면서! (잡소리 이상)

Scala Style Guide Indentation

vim-scala

  • vim-scala 에서 plugin vim plugin 다운로드
:SortScalaImports

scala-style

project/plugins.sbt
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")

config를 생성하면 현재 폴더에 scalastyle-config.xml이 생성

$ sbt scalastyleGenerateConfig

scalastyle을 통해서 현재 코드들의 sytle 체크를 하고 target/scalastyle-result.xml을 확인하면 된다.

$ sbt scalastyle # src/main
$ sbt test:scalastyle # src/test

scalastyle 의 룰이 약 60개정도 정해져있다. 이룰을 어기면 warn을 주는데, 각각 코드는 내가 수동으로 고쳐야한다.? 일단 그렇게 하고 있음

참고

DataFrameSuiteBase

  • DataFrameSuiteBase를 이용해서 주어진 두개의 DataFrame이 일치하는지 확인이 가능하다.
  • 간단하게 사용방법은 두개의 결과 df1, df2를 생성하고 assertDataFrameEquals를 사용하면 된다.
  • 사용하면서 까다로웠던것은 DataFrameSchema도 확인을 한다. 심지어 Nullable을 체크...
  • 만약 조인했던 결과에 null이 들어있고, 기대하는 값에 null이 없었다면 두개의 Schema는 다르고 결과적으로 Fail이 난다.
  • expectedDf에 만약 Null의 값을 포함시키기 위해서는 다음 Null값이 포함되는 DataFrame 생성하기 포스트 를 확인
  • 참고로 아이템의 순서도 동일해야 한다! 아이템 정렬하는 방법은 아래 DataFrame 정렬하기 포스트를 확인

참고

ExpectedDf에 null을 포함

  • Scala에서는 null의 값을 넣기 위해서는 다음과 같이 해야 한다.
  • 여기서 문제는 Double의 경우 null을 넣어주면 에러가 발생하는데

Double값 대신에 null을 넣으면 발생하는 에러

val account = sc.parallelize(Seq(
                                 (1, null, 2,"F", null), 
                                 (2, new Integer(2), 4, "F",  1.2),
                                 (3, new Integer(3), 6, "N", 1.0),
                                 (4, null,8,"F", 1.0))).toDF()
account.show()
java.lang.ClassNotFoundException: scala.Any
  at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:348)
  at scala.reflect.runtime.JavaMirrors$JavaMirror.javaClass(JavaMirrors.scala:555)
  at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1211)
  at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1203)
  at scala.reflect.runtime.TwoWayCaches$TwoWayCache$$anonfun$toJava$1.apply(TwoWayCaches.scala:49)
  at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
  at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
  at scala.reflect.runtime.TwoWayCaches$TwoWayCache.toJava(TwoWayCaches.scala:44)
  at scala.reflect.runtime.JavaMirrors$JavaMirror.classToJava(JavaMirrors.scala:1203)
  at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:194)
  at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:54)
  at org.apache.spark.sql.catalyst.ScalaReflection$.getClassFromType(ScalaReflection.scala:700)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor$1.apply(ScalaReflection.scala:84)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor$1.apply(ScalaReflection.scala:65)
  at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
  at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
  at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor(ScalaReflection.scala:64)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:632)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:625)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
  at scala.collection.immutable.List.flatMap(List.scala:344)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:625)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:445)
  at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
  at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
  at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:445)
  at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:434)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
  at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
  at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:248)
  at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:34)
  ... 53 elided 
  • 위 에러를 잘보면 scala.Any라는 클래스를 찾을수 없다고 하는데
  • Any는 너무 general type이여서 Spark에서는 어떻게 serialize를 해야하는지 모른다 scala-unified-types
  • 그러니 Integer를 사용할때도 null.asInstanceOf[Integer] 다음과 같이 명시를 해야한다.

createDataFrame으로 생성

  • RDD를 사용하지 않고, 아래와 같이 createDataFrame을 사용하며 문제를 해결할 수 있다.
  • 아래와 같이 하면 Double값을 Some(1.5)로 사용해도 정상적으로 생성이 된다.
  • RDD로 생성하는 방법은 찾지 못함..

spark-testing-base를 사용해서 DataFrame의 일치를 확인하는 과정에서
DataFrame의 SchemaField의 nullable의 값이 틀려도 assert가 발생한다.
해결하는 방법은 아래와 같이 setNullableStateOfColumnnSparkUtil.scala에 작성한 뒤에
특정 컬럼의 NullableState를 변경하면 끝

scala에서 dataframe을 생성하는 코드

코드

spark scala dataframe을 withColumn을 통해 한줄한줄 iteration을 돌려 처리가 가능하다.
내가 원하는 함수에 한개의 row를 넘기고, 그 처리된 결과를 새로운 column으로 생성한다.

예제코드

  • sampleFunc을 생성하고 UserDefinedFunction, udf를 통해 sampleUdf를 생성
  • 생성된 sampleUdfDataFramewithColumn을 이용
    • 첫번째 파라미터는 withColumn을 통해 생성될 신규 컬럼명
    • 두번째 파라미터는 udfFunction + parameters
    • 여기서 lit은 constant 값을 넘길때 사용한다.

+ Recent posts