Hive에 테이블이 있는 경우, 파일로 저장하고 싶을때 ,, , NULL처리를 쉘에서 처리하는게 좋다.
아래 sql에 내가 원하고자 하는 쿼리를 입력하면 테이블을 로컬로 가져올 수 있음


import sys
import time
import datetime
import os
import logging

try:
    from subprocess import getstatusoutput
except ImportError:
    from commands import getstatusoutput

import argparse

logger = None 
def get_data(args):

    if not os.path.exists(args.output_path): 
        os.makedirs(args.output_path)

    output_path = os.path.join(args.output_path, 'data.txt') 
        cmd1 = """<sql> sed 's/,/ /g; s/NULL//g' > %s""" % (args.input_table, output_path)

    print (cmd1)
    print ("start getting train_data at %s" % datetime.datetime.now())
    status, output = getstatusoutput(cmd1)
    print ("status %s output %s" % (status, output))
    if status == 0:
        print("finished!")
    print("finish getting train_data at %s" % datetime.datetime.now())

def parse_args():
    parser = argparse.ArgumentParser()

    parser.add_argument('--input_table', required=True)
    parser.add_argument('--output_path', required=True)
    # parser.add_argument('--mode', required=True, choices=["train", "pred"])

    args, _ = parser.parse_known_args()
    return args, parser

if __name__ == '__main__':
    args, parser = parse_args()
    print (args.output_path)

    try:
        get_data(args)
    except Exception as e:
        print (e)

DataFrame GroupBy를 하고 apply를 통해 내가 지정한 함수로 계산이 가능하도록 구현이 가능하다. 보통은 groupby('columnname').sum()을 통해 특정 컬럼에 대해서 그룹별로 합, 평균, 편차 등을 계산하게 된다. 조금더 복잡한 계산을 할 수 있을까?

여기서 복잡하다는 말은 특정 컬럼은 그룹별로 string의 리스트로 반환될 수 있다. 숫자의 경우 단순히 합을 계산하거나 평균을 계산하면 되지만, string은 concat을 하거나, dict, list의 형태로 반환을 해야하는 경우가 있다.

def f(x):
  x = x.C.unique()[0] # C컬럼에서 unique한 값을 빼내고 첫번째 값으로 (groupby로 포함해도 상관없음)
  return pd.Series(dict(A = x['A'].sum(), 
                    B = x['B'].sum(), 
                    C = "{%s}" % ', '.join(x['C'])))

df.groupby('A').apply(f)
"""
A         B               C
A                             
1  2  1.615586  {This, string}
2  4  0.421821         {is, !}
3  3  0.463468             {a}
4  4  0.643961        {random}
"""

pandas를 사용하다보면 여러개의 컬럼의 결과를 하나의 값으로 계산할때도 있지만, 여러개의 값으로 여러개의 값을 계산하고 싶을때가 있다. 이때는 아래와 같이 하면 multiple columns의 결과를 받을 수 있다.

udf에서 두개의 값을 반환한다면, df에서 각각의 컬럼에 대해서 반환값을 넣어주고, zip(*df.apply)를 해줘야 두개의 컬럼으로 각각 값이 들어간다

def preprocessing_udf(x):  
  keyword = preprocessing(x['keyword'])
  context = preprocessing(x['context'])
  return keyword, context

def parallel_preprocessing(df): 
  # df['pre_context'] = df.progress_apply(preprocessing_udf, axis=1)
  df['pre_keyword'], df['pre_context'] = zip(*df.apply(preprocessing_udf, axis=1))
  return df

parallelize_dataframe(result_sample_df, parallel_preprocessing,n_cores=2)
  • dataframe에서 apply의 수행시간이 매우 느리다면 여러개의 프로세스를 사용해 처리하는 방법을 고려해보는게 좋다.
  • multiprocessing.cpu_count()을 통해 cpu 코어의 수를 가져올 수 있다.
  • 코어수를 가져오고 udf를 생성했다면 아래와 같이 구현하면 된다.
def text_preprocessing(df): 
  # df['pre_context'] = df.progress_apply(preprocessing_udf, axis=1) # 다음과같이 진행상황을 출력하고 싶었으나 화면에 이상하게 출력됨
  df['p_context'] = df.apply(preprocessing_udf, axis=1)
  return df
from multiprocessing import  Pool
import multiprocessing
print ('cpu counts: %d' % multiprocessing.cpu_count())

def parallelize_dataframe(df, func, n_cores=8):
    df_split = np.array_split(df, n_cores) # core의 개수로 df를 나눈다. 
    pool = Pool(n_cores) # pool을 cpu 코어 개수 만큼 생성하고
    df = pd.concat(pool.map(func, df_split)) # 나눠진 df를 func으로 넘겨서 수행한다.
    pool.close()
    pool.join() # 모두가 완료될때까지 대기
    return df

result_sample = parallelize_dataframe(result_df, text_preprocessing)
  • pandas에서 apply함수를 통해 하나의 row를 처리할때 시간이 오래 걸리는 경우, 어느정도 얼마나 처리가 되었는지 확인이 어렵다
  • apply의 진행상황을 가져오기 위해서 tqdm을 사용해보자
  • tqdm을 사용하면 아래와 같이 진행상황을 알려준다.
 0%|          | 39/10000 [00:13<58:15,  2.85it/s]  
  • tqdm을 사용하면 수행해야 하는 row의 개수와 row 하나를 처리하는데 걸리는 수행시간을 함께 알려주기 때문에 속도개선을 하는데도 도움을 줄 수 있다. (row한번 작업하는데 얼마나 수행시간이 걸리는지 확인하면서 최적화)
import pandas as pd
import numpy as np
from tqdm import tqdm
# from tqdm.auto import tqdm  # for notebooks

df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))

# Create and register a new `tqdm` instance with `pandas`
# (can use tqdm_gui, optional kwargs, etc.)
tqdm.pandas()

# Now you can use `progress_apply` instead of `apply`
df.groupby(0).progress_apply(lambda x: x**2)

df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)

from tqdm import tqdm
tqdm.pandas()
df_users.groupby(['userID', 'requestDate']).progress_apply(feature_rollup)

Note: tqdm <= v4.8 에서는 tqdm.pandas() 대신에 아래와 같이 사용해야 한다.

from tqdm import tqdm, tqdm_pandas
tqdm_pandas(tqdm())

요즘은 java 프로젝트를 gradle로 많이 하지만 나는 maven을 사용 중이다.
맨날 쓰는 maven command line이 있지만, 정확히 어떤 의미인지를 파악해봐야겠다.
인텔리제이 로컬 환경에서 package명령어는 성공하나 deploy가 안되는데 그 이유가 궁금하여 확인해보게되었다.

아파치 공식문서

https://maven.apache.org/guides/introduction/introduction-to-the-lifecycle.html

For the person building a project, this means that it is only necessary to learn a small set of commands to build any Maven project, and the POM will ensure they get the results they desired.

maven이란.

maven은 pom.xml에 description된 정보를 기반으로 프로젝트를 build한다.
goal이란 maven이 행할 수 있는 여러가지 동작을 수행하는 명령어를 의미.

어떤 goal이 있을까?

clean

  • 컴파일 결과물인 target 디렉토리 삭제.

compile

  • compile the source code of the project
  • 모든 소스코드를 컴파일하고 리소스파일은 target/classes 디렉토리에 복사.

package

  • take the compiled code and package it in its distributable format, such as a JAR.
  • compile 수행 후 pom에 있는 정보에 따라 패키징을 수행.
  • description example.
    <executions>
                   <execution>
                       <id>package</id>
                       <phase>package</phase>
                       <goals>
                           <goal>run</goal>
                       </goals>
                       <configuration>
                           <tasks>
                               <copy file="${project.build.directory}/${project.build.finalName}.jar.original"
                                     tofile="./deploy/my-project/${project.build.finalName}.jar"/>
                               <copy todir="./deploy/my-project/bin">
                                   <fileset dir="bin"/>
                               </copy>
                               <copy todir="./deploy/my-project/conf">
                                   <fileset dir="conf"/>
                               </copy>
                               <copy todir="./deploy/my-project/lib">
                                   <fileset dir="${project.build.directory}/dependency/"/>
                               </copy>
                           </tasks>
                       </configuration>
                   </execution>
                   ....
                 <executions>
                   <execution>
                       <phase>package</phase>
                       <goals>
                           <goal>copy-dependencies</goal>
                       </goals>
                       <configuration>
                           <outputDirectory>${project.build.directory}/dependency/</outputDirectory>
                       </configuration>
                   </execution>
               </executions>

install

  • install the package into the local repository, for use as a dependency in other projects locally
  • package 수행 후 local repo에 pakage를 설치. 로컬 다른 프로젝트에서 사용가능함.

validate

  • validate the project is correct and all necessary information is available
  • 프로젝트가 사용 가능한지 확인.

test

  • test the compiled source code using a suitable unit testing framework. These tests should not require the code be packaged or deployed
  • unit test 수행

deploy

  • done in the build environment, copies the final package to the remote repository for sharing with other developers and projects.
  • 환경구성을 마치고 remote repository에 package들을 copy. 실제 릴리즈할때의 배포.

실제로 내가 배포할때 사용하는 명령어.

mvn -U clean --update-snapshots dependency:copy-dependencies package -Dmaven.test.skip=true -Dmaven.test.skip=true

pandas에서 데이터를 읽을때 특정조건을 필터할필요가 있다. 전체를 읽고 필터링하기 보다는 내가 필요한 데이터만 읽고 필터링을 하자! 읽으면서 filtering을 하기 위해서는 chunksize를 사용해야 한다.

import pandas as pd
iter_csv = pd.read_csv('file.csv', iterator=True, chunksize=1000)
df = pd.concat([chunk[chunk['field'] > constant] for chunk in iter_csv])
def read_result():
  lines = open('/tmp/query_result.tsv', 'r').readlines()
  data = []
  cols = lines[0][:-1].split("\t")
  len_cols = len(cols)

  for line in lines[1:]:
    vals = line[:-1].split("\t")  
    if len(vals) != len_cols:
      # print (line[:-1])
      continue
    data.append(vals)  
  return pd.DataFrame(data, columns=cols)

 의 ㄸdata lake / dataware house / data mart

data lake 는 비정형화된 로우 로그 수준의 모든 데이터를 저장.
datawarehouse 는 모델링되고 구조화된 데이터를 저장.
data mart 는 datawarehouse에서 특정 목적이 뚜렷한 성격의 데이터를 따로 가져가는 것으로 datawarehouse에 일부분이 될 수 있다.

data engineering 포지션에 4-6년차 경력직 면접이라면 나올 수 있는 기본적인 정의들이다.
뿐만 아니라 기본적인 용어 정리는 잘 정립해놓는 것이 커뮤니케이션에 좋다.

+ Recent posts