Chaining

작업을 chain의 형태로 진행한다고 해서 chaining 이라고 한다
chaining으로 package를 생성하고 관리하니 코드도 한결 보기 좋아졌다.

Transform

MyTransform

두개의 테이블을 Merge하는 Transform

Udf와 함께 Parameter를 받는 Transform

사용하는 방법

  • DataFrametransform을 이용하면 다음과 같이 사용이 가능하다.
  • withColumn의 성격이 비슷한 녀석끼리 모아주니까 확실히 편하다 (UDF, transform이 한 object에 있다고 생각해보면 잡)
  • 이 방법 외에도 chain을 먼저 설정하고 chain에 함수들을 추가해서 한번에 실행도 가능하다. chained-transformations 여기를 참고!

참고

spark scala dataframe을 withColumn을 통해 한줄한줄 iteration을 돌려 처리가 가능하다.
내가 원하는 함수에 한개의 row를 넘기고, 그 처리된 결과를 새로운 column으로 생성한다.

예제코드

  • sampleFunc을 생성하고 UserDefinedFunction, udf를 통해 sampleUdf를 생성
  • 생성된 sampleUdfDataFramewithColumn을 이용
    • 첫번째 파라미터는 withColumn을 통해 생성될 신규 컬럼명
    • 두번째 파라미터는 udfFunction + parameters
    • 여기서 lit은 constant 값을 넘길때 사용한다.

DataFrame GroupBy를 하고 apply를 통해 내가 지정한 함수로 계산이 가능하도록 구현이 가능하다. 보통은 groupby('columnname').sum()을 통해 특정 컬럼에 대해서 그룹별로 합, 평균, 편차 등을 계산하게 된다. 조금더 복잡한 계산을 할 수 있을까?

여기서 복잡하다는 말은 특정 컬럼은 그룹별로 string의 리스트로 반환될 수 있다. 숫자의 경우 단순히 합을 계산하거나 평균을 계산하면 되지만, string은 concat을 하거나, dict, list의 형태로 반환을 해야하는 경우가 있다.

def f(x):
  x = x.C.unique()[0] # C컬럼에서 unique한 값을 빼내고 첫번째 값으로 (groupby로 포함해도 상관없음)
  return pd.Series(dict(A = x['A'].sum(), 
                    B = x['B'].sum(), 
                    C = "{%s}" % ', '.join(x['C'])))

df.groupby('A').apply(f)
"""
A         B               C
A                             
1  2  1.615586  {This, string}
2  4  0.421821         {is, !}
3  3  0.463468             {a}
4  4  0.643961        {random}
"""

pandas를 사용하다보면 여러개의 컬럼의 결과를 하나의 값으로 계산할때도 있지만, 여러개의 값으로 여러개의 값을 계산하고 싶을때가 있다. 이때는 아래와 같이 하면 multiple columns의 결과를 받을 수 있다.

udf에서 두개의 값을 반환한다면, df에서 각각의 컬럼에 대해서 반환값을 넣어주고, zip(*df.apply)를 해줘야 두개의 컬럼으로 각각 값이 들어간다

def preprocessing_udf(x):  
  keyword = preprocessing(x['keyword'])
  context = preprocessing(x['context'])
  return keyword, context

def parallel_preprocessing(df): 
  # df['pre_context'] = df.progress_apply(preprocessing_udf, axis=1)
  df['pre_keyword'], df['pre_context'] = zip(*df.apply(preprocessing_udf, axis=1))
  return df

parallelize_dataframe(result_sample_df, parallel_preprocessing,n_cores=2)

+ Recent posts