datetime을 이용해서 날짜를 더하고 뺄수가 있다.

예를 들어서 8월 24일에서 90일 전의 날짜를 알고싶으면 datetime.timedelta를 사용하면 된다.

import datetime
t0 = datetime.datetime(2019,8,24)
delta = datetime.timedelta(days=90)
t0-delta

데이터 다운로드 받는 웹서버를 띄워서 사용하기 좋음
보통 외부 호스트에 있는 데이터를 가져올때
SimpleHTTPServer, http.server를 이용하면 좋음

  • python2에서는 python -m SimpleHTTPServer
  • python3에서는 python3 -m http.server

Hive에 테이블이 있는 경우, 파일로 저장하고 싶을때 ,, , NULL처리를 쉘에서 처리하는게 좋다.
아래 sql에 내가 원하고자 하는 쿼리를 입력하면 테이블을 로컬로 가져올 수 있음


import sys
import time
import datetime
import os
import logging

try:
    from subprocess import getstatusoutput
except ImportError:
    from commands import getstatusoutput

import argparse

logger = None 
def get_data(args):

    if not os.path.exists(args.output_path): 
        os.makedirs(args.output_path)

    output_path = os.path.join(args.output_path, 'data.txt') 
        cmd1 = """<sql> sed 's/,/ /g; s/NULL//g' > %s""" % (args.input_table, output_path)

    print (cmd1)
    print ("start getting train_data at %s" % datetime.datetime.now())
    status, output = getstatusoutput(cmd1)
    print ("status %s output %s" % (status, output))
    if status == 0:
        print("finished!")
    print("finish getting train_data at %s" % datetime.datetime.now())

def parse_args():
    parser = argparse.ArgumentParser()

    parser.add_argument('--input_table', required=True)
    parser.add_argument('--output_path', required=True)
    # parser.add_argument('--mode', required=True, choices=["train", "pred"])

    args, _ = parser.parse_known_args()
    return args, parser

if __name__ == '__main__':
    args, parser = parse_args()
    print (args.output_path)

    try:
        get_data(args)
    except Exception as e:
        print (e)

DataFrame GroupBy를 하고 apply를 통해 내가 지정한 함수로 계산이 가능하도록 구현이 가능하다. 보통은 groupby('columnname').sum()을 통해 특정 컬럼에 대해서 그룹별로 합, 평균, 편차 등을 계산하게 된다. 조금더 복잡한 계산을 할 수 있을까?

여기서 복잡하다는 말은 특정 컬럼은 그룹별로 string의 리스트로 반환될 수 있다. 숫자의 경우 단순히 합을 계산하거나 평균을 계산하면 되지만, string은 concat을 하거나, dict, list의 형태로 반환을 해야하는 경우가 있다.

def f(x):
  x = x.C.unique()[0] # C컬럼에서 unique한 값을 빼내고 첫번째 값으로 (groupby로 포함해도 상관없음)
  return pd.Series(dict(A = x['A'].sum(), 
                    B = x['B'].sum(), 
                    C = "{%s}" % ', '.join(x['C'])))

df.groupby('A').apply(f)
"""
A         B               C
A                             
1  2  1.615586  {This, string}
2  4  0.421821         {is, !}
3  3  0.463468             {a}
4  4  0.643961        {random}
"""

pandas를 사용하다보면 여러개의 컬럼의 결과를 하나의 값으로 계산할때도 있지만, 여러개의 값으로 여러개의 값을 계산하고 싶을때가 있다. 이때는 아래와 같이 하면 multiple columns의 결과를 받을 수 있다.

udf에서 두개의 값을 반환한다면, df에서 각각의 컬럼에 대해서 반환값을 넣어주고, zip(*df.apply)를 해줘야 두개의 컬럼으로 각각 값이 들어간다

def preprocessing_udf(x):  
  keyword = preprocessing(x['keyword'])
  context = preprocessing(x['context'])
  return keyword, context

def parallel_preprocessing(df): 
  # df['pre_context'] = df.progress_apply(preprocessing_udf, axis=1)
  df['pre_keyword'], df['pre_context'] = zip(*df.apply(preprocessing_udf, axis=1))
  return df

parallelize_dataframe(result_sample_df, parallel_preprocessing,n_cores=2)
  • dataframe에서 apply의 수행시간이 매우 느리다면 여러개의 프로세스를 사용해 처리하는 방법을 고려해보는게 좋다.
  • multiprocessing.cpu_count()을 통해 cpu 코어의 수를 가져올 수 있다.
  • 코어수를 가져오고 udf를 생성했다면 아래와 같이 구현하면 된다.
def text_preprocessing(df): 
  # df['pre_context'] = df.progress_apply(preprocessing_udf, axis=1) # 다음과같이 진행상황을 출력하고 싶었으나 화면에 이상하게 출력됨
  df['p_context'] = df.apply(preprocessing_udf, axis=1)
  return df
from multiprocessing import  Pool
import multiprocessing
print ('cpu counts: %d' % multiprocessing.cpu_count())

def parallelize_dataframe(df, func, n_cores=8):
    df_split = np.array_split(df, n_cores) # core의 개수로 df를 나눈다. 
    pool = Pool(n_cores) # pool을 cpu 코어 개수 만큼 생성하고
    df = pd.concat(pool.map(func, df_split)) # 나눠진 df를 func으로 넘겨서 수행한다.
    pool.close()
    pool.join() # 모두가 완료될때까지 대기
    return df

result_sample = parallelize_dataframe(result_df, text_preprocessing)
  • pandas에서 apply함수를 통해 하나의 row를 처리할때 시간이 오래 걸리는 경우, 어느정도 얼마나 처리가 되었는지 확인이 어렵다
  • apply의 진행상황을 가져오기 위해서 tqdm을 사용해보자
  • tqdm을 사용하면 아래와 같이 진행상황을 알려준다.
 0%|          | 39/10000 [00:13<58:15,  2.85it/s]  
  • tqdm을 사용하면 수행해야 하는 row의 개수와 row 하나를 처리하는데 걸리는 수행시간을 함께 알려주기 때문에 속도개선을 하는데도 도움을 줄 수 있다. (row한번 작업하는데 얼마나 수행시간이 걸리는지 확인하면서 최적화)
import pandas as pd
import numpy as np
from tqdm import tqdm
# from tqdm.auto import tqdm  # for notebooks

df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))

# Create and register a new `tqdm` instance with `pandas`
# (can use tqdm_gui, optional kwargs, etc.)
tqdm.pandas()

# Now you can use `progress_apply` instead of `apply`
df.groupby(0).progress_apply(lambda x: x**2)

df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)

from tqdm import tqdm
tqdm.pandas()
df_users.groupby(['userID', 'requestDate']).progress_apply(feature_rollup)

Note: tqdm <= v4.8 에서는 tqdm.pandas() 대신에 아래와 같이 사용해야 한다.

from tqdm import tqdm, tqdm_pandas
tqdm_pandas(tqdm())

pandas에서 데이터를 읽을때 특정조건을 필터할필요가 있다. 전체를 읽고 필터링하기 보다는 내가 필요한 데이터만 읽고 필터링을 하자! 읽으면서 filtering을 하기 위해서는 chunksize를 사용해야 한다.

import pandas as pd
iter_csv = pd.read_csv('file.csv', iterator=True, chunksize=1000)
df = pd.concat([chunk[chunk['field'] > constant] for chunk in iter_csv])
def read_result():
  lines = open('/tmp/query_result.tsv', 'r').readlines()
  data = []
  cols = lines[0][:-1].split("\t")
  len_cols = len(cols)

  for line in lines[1:]:
    vals = line[:-1].split("\t")  
    if len(vals) != len_cols:
      # print (line[:-1])
      continue
    data.append(vals)  
  return pd.DataFrame(data, columns=cols)

+ Recent posts