Hive에서의 파티션의 결과는 ymd=201807/hh24=03의 형태로 값이 넘어온다.
하나의 컬럼에 다음과 같이 들어오기 때문에 로우를 파싱해야한다.
내가 원하는 결과는 ymd=201807, hh24=03의 두개의 컬럼으로 나누고,
나눈 결과를 다시 한번더 처리해서 ymd의 컬럼에 201807, hh24의 컬럼에 03이 들어 가도록 처리하고 싶다.

str.split(delimiter', expand=True)를 통해서 하나의 컬럼을 두개의 컬럼으로 나눌 수 있다.

df[['First','Last']] = df.Name.str.split("_",expand=True) 

def parse_partition(df):
  df[['ymd', 'hh24']] = df['partition'].str.split("/", expand=True)
  df[['ymd', 'ymd_v']] = df['ymd'].str.split("=", expand=True)
  df[['hh24', 'hh24_v']] = df['hh24'].str.split("=", expand=True)
  df = df[['ymd_v','hh24_v']]
  df.columns = ['ymd', 'hh24']
  return df

DataFrame의 결과를 Elasticsearch의 Index로 넣어야 했다.
물론? python에서도 elasticsearch의 패키지가 있다.

아래와 같이 es_client를 정의 할때, 내가 넣고자 하는 ES_HOST를 파라미터로 넘겨주면 된다.

예: Elasticsearch('localhost:9200')

use_these_keys에는 dataframe의 여러 컬럼중에서 내가 es에 넣을 필드의 리스트를 넣어 주면 된다.
아래 helpers.bulk를 이용하여 doc_generator에 정의한 index, type, _id, _source 의 형태로 값이 들어간다.

from elasticsearch import Elasticsearch
from elasticsearch import helpers

es_client = Elasticsearch(http_compress=True)
def doc_generator(df):
    df_iter = df.iterrows()
    for index, document in df_iter:
        yield {
                "_index": 'your_index',
                "_type": "_doc",
                "_id" : f"{document['id']}",
                "_source": filterKeys(document),
            }
    raise StopIteration

use_these_keys = ['id', 'value', 'value1']

def filterKeys(document):
    return {key: document[key] for key in use_these_keys }    


helpers.bulk(es_client, doc_generator(your_dataframe))
  • 전체 컬럼에서 mean()을 계산하는데 계속 inf의 값이 나왔다.
  • 분명히 NaN의 값을 fillna(0.0)으로 했지만 계속 문제가 나옴.
  • head(100).tail(50).head(25) 이런식으로 원식적으로... 접근해보니 inf의 값이 있었다.
  • 아래 방법을 통해서 np.inf로 찾아내고, nan으로 변경하고 fillna(0.0)을 하자
import numpy as np

df.replace([np.inf, -np.inf], np.nan).dropna(subset=["col1", "col2"], how="all")

큰 데이터를 만지다보면, Spark의 DataFrame과 Pandas의 DataFrame의 서로 변환이 꼭 필수다.
예를들어서 Pandas의 DataFrame을 Spark SQL의 테이블로 등록하고, Spark에서 작업을 하기도 한다.
만약 이 방법을 모른다면, 어떻게 테이블로 변환을 할것인가?

Pandas의 DataFrame을 Spark의 DataFrame으로 변환하기 위해서는
spark.createDataFrame(df)를 하면된다. (너무 간단함...)
spark2 이상에서 사용했으니 1.x에서는 알아서 바꿔서 하면 될듯!

보통 spark와 pandas에서의 dataframe의 구분이 어렵기 때문에
pdf (=pandas data frame)으로 적는다. (아님 말고)

import pandas as pd
## Create Pandas Frame
pd_df = pd.DataFrame({u'2017-01-01': 1, u'2017-01-02': 2}.items())
## Convert into Spark DataFrame
spark_df = spark.createDataFrame(pd_df)
## Write Frame out as Table
spark_df.write.mode("overwrite").saveAsTable("db.table_name")

json의 파일이 있을때, dataframe으로 변환을 하고 싶을때가 있다.
보통 json은 리스트 형태로 안에 dictionary로 되어있다.
json.loads의 함수를 이용해 dictionary로 변환하고, from_dict을 이용하자.

import pandas as pd
import json

filename='file.json'
stats = open(filename, 'r').readline()
stats = json.loads(stats)
display(pd.DataFrame.from_dict([stats]))

dataframe에서 데이터 읽을때는 iterator를 이용해서 불필요한 데이터는 메모리에 올리지 않는게 좋다.
데이터를 다 메모리에 읽은 이후에 filter를 통해서 걸러내도 좋지만,
데이터가 크면 이 역시도 무리가 있을수 있으니
다음과 같이 iterator=Truechunksize=10000을 지정해서 메모리에 올리는 방법을 선택하는게 좋다.

import pandas as pd
iter_csv = pd.read_csv('data_in/data.txt', iterator=True, chunksize=1000, delimiter='\t', names=cols)
df = pd.concat([chunk[~np.isnan(chunk['cols'])] for chunk in iter_csv])
df = df.fillna('')  

데이터를 확인할때 가장먼저 NaN을 확인하게 되는데, NaN 컬럼별로 얼마나 있는지 확인하기 위해서는 아래와 같이 하면된다.
이때 axis=0column을 의미하고, axis=1index를 의미한다. (앞으로도 많이 사용한다)

df.isnull().sum(axis = 0) # columns

NaN을 확인할때는 np.isnan(), ~np.isnan()을 이용하자
NaN의 데이터에 내가 원하는 값을 채워넣기 위해서는 아래와 같이 fillna(원하는값)을 사용한다.

dataframe.fillna(0)

특정 컬럼에 대해서 null을 확인하기 위해서는 아래와 같이 isnull()을 추출하고 select하면 된다.

all_df[all_df['City'].isnull()]

+ Recent posts