jupyter notebook --generate-config의 명령을 통해 기본 경로 ~/.jupyterconfig파일을 생성할수 있다.

만약 이미 생성이 되어 있다면 jupyter --config-dir을 통해 경로를 확인할 수 있다. 경로를 확인하고 아래 코드를 통해 환경변수를 추가하자

import os
c = get_config()
os.environ['LD_LIBRARY_PATH'] = '/home1/jslee/library/lib'
os.environ['PYTHONPATH'] = '${PYTHONPATH}:/home1/jslee/library/binding/python'

c.Spawner.env.update('LD_LIBRARY_PATH')
c.Spawner.env.update('PYTHONPATH')

Ipython, JupyterNotebook을 사용하다보면,
df.head(100)의 결과를 출력할 경우가 있다. (하지만? 10개정도 보일것이다.)
df.head(1)의 결과를 출력하니 컬럼에 ...으로 나올때도 있다.
df.head(1)의 결과에서 dataframe의 폭이 좁을때가 있다.

이런 여러가지 상황에서 dataframe의 출력 결과를 설정하는게 필요하다.
아래 pd.option.display 또는 pd.set_option을 통해 변경이 가능하다.

pd.options.display.max_columns = 30
pd.options.display.max_rows = 20

pd.set_option('display.height', 1000)
pd.set_option('display.max_rows', 200)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)

Hive에서의 파티션의 결과는 ymd=201807/hh24=03의 형태로 값이 넘어온다.
하나의 컬럼에 다음과 같이 들어오기 때문에 로우를 파싱해야한다.
내가 원하는 결과는 ymd=201807, hh24=03의 두개의 컬럼으로 나누고,
나눈 결과를 다시 한번더 처리해서 ymd의 컬럼에 201807, hh24의 컬럼에 03이 들어 가도록 처리하고 싶다.

str.split(delimiter', expand=True)를 통해서 하나의 컬럼을 두개의 컬럼으로 나눌 수 있다.

df[['First','Last']] = df.Name.str.split("_",expand=True) 

def parse_partition(df):
  df[['ymd', 'hh24']] = df['partition'].str.split("/", expand=True)
  df[['ymd', 'ymd_v']] = df['ymd'].str.split("=", expand=True)
  df[['hh24', 'hh24_v']] = df['hh24'].str.split("=", expand=True)
  df = df[['ymd_v','hh24_v']]
  df.columns = ['ymd', 'hh24']
  return df

DataFrame의 결과를 Elasticsearch의 Index로 넣어야 했다.
물론? python에서도 elasticsearch의 패키지가 있다.

아래와 같이 es_client를 정의 할때, 내가 넣고자 하는 ES_HOST를 파라미터로 넘겨주면 된다.

예: Elasticsearch('localhost:9200')

use_these_keys에는 dataframe의 여러 컬럼중에서 내가 es에 넣을 필드의 리스트를 넣어 주면 된다.
아래 helpers.bulk를 이용하여 doc_generator에 정의한 index, type, _id, _source 의 형태로 값이 들어간다.

from elasticsearch import Elasticsearch
from elasticsearch import helpers

es_client = Elasticsearch(http_compress=True)
def doc_generator(df):
    df_iter = df.iterrows()
    for index, document in df_iter:
        yield {
                "_index": 'your_index',
                "_type": "_doc",
                "_id" : f"{document['id']}",
                "_source": filterKeys(document),
            }
    raise StopIteration

use_these_keys = ['id', 'value', 'value1']

def filterKeys(document):
    return {key: document[key] for key in use_these_keys }    


helpers.bulk(es_client, doc_generator(your_dataframe))
  • 전체 컬럼에서 mean()을 계산하는데 계속 inf의 값이 나왔다.
  • 분명히 NaN의 값을 fillna(0.0)으로 했지만 계속 문제가 나옴.
  • head(100).tail(50).head(25) 이런식으로 원식적으로... 접근해보니 inf의 값이 있었다.
  • 아래 방법을 통해서 np.inf로 찾아내고, nan으로 변경하고 fillna(0.0)을 하자
import numpy as np

df.replace([np.inf, -np.inf], np.nan).dropna(subset=["col1", "col2"], how="all")

spring boot AOP 구현하기.
Aop를 잘 모르지만

  • Http 요청이 왔을때 끼어들어 요청에 대한 권한이 있는지 체크하기 위한 AOP를 개발하려한다.
    : http요청이 왔을때 실제 응답을 주기전에 권한을 체크하고 권한이 있으면 계속 진행하도록 해주고, 권한이 없으면 진행을 막는다.
  • 더불어 특정 controller에는 권한체크를 하지 않고, http request로 온 header안에 정보를 parameter로 전달하려한다.
  1. pom.xml
    <dependency> 
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
  1. AopConfig.java
    @Around 이노테이션으로 requestMapping으로 들어오는 요청에 대해서 AOP를 적용할것임을 명시.
    특정 controller 접근시에는 parameter를 넘겨주도록했음.
    주석 참고.
@Configuration
@Aspect
@EnableAspectJAutoProxy() 
public class AopConfig { 
    ...
    @Around("@annotation(requestMapping)") 
    public final Object checkPermission(final ProceedingJoinPoint pjp
                                      , final RequestMapping requestMapping){
        .... 
        //권한을 체크하는 함수에서 true값을 return해주면 계속해서 진행 
        if (permission.check(pjp, requestMapping, httpServletRequest)) { 
            return pjp.proceed();
        } 

        ....
        String email = .... ;
        //특정 controller접근시에는 권한체크는 하지 않고 파라미터만 전달하여 계속 진행
        if (pjp.getSignature().toString().contains("com.xxx.xxx.controller.xxx")) {
            return pjp.proceed(new Object\[\]{email});
        } 
        ... 


        //권한 없는 경우 null을 return하여 계속해서 진행할 수 없음. 
        return null; 
        } 
 }

참고.
proceedjointpoint로 parameter를 넘겨줄때는 넘겨준 parameter와 동일한 명칭, 순서로 받아야한다.

@ResponseBody 
@RequestMapping(method = RequestMethod.GET, value = "/user") 
public LoginUser getLoginEmail(final String email) { 
    LoginUser loginUser = new LoginUser(); 
    loginUser.setEmail(email);
    return loginUser;
}

큰 데이터를 만지다보면, Spark의 DataFrame과 Pandas의 DataFrame의 서로 변환이 꼭 필수다.
예를들어서 Pandas의 DataFrame을 Spark SQL의 테이블로 등록하고, Spark에서 작업을 하기도 한다.
만약 이 방법을 모른다면, 어떻게 테이블로 변환을 할것인가?

Pandas의 DataFrame을 Spark의 DataFrame으로 변환하기 위해서는
spark.createDataFrame(df)를 하면된다. (너무 간단함...)
spark2 이상에서 사용했으니 1.x에서는 알아서 바꿔서 하면 될듯!

보통 spark와 pandas에서의 dataframe의 구분이 어렵기 때문에
pdf (=pandas data frame)으로 적는다. (아님 말고)

import pandas as pd
## Create Pandas Frame
pd_df = pd.DataFrame({u'2017-01-01': 1, u'2017-01-02': 2}.items())
## Convert into Spark DataFrame
spark_df = spark.createDataFrame(pd_df)
## Write Frame out as Table
spark_df.write.mode("overwrite").saveAsTable("db.table_name")

json의 파일이 있을때, dataframe으로 변환을 하고 싶을때가 있다.
보통 json은 리스트 형태로 안에 dictionary로 되어있다.
json.loads의 함수를 이용해 dictionary로 변환하고, from_dict을 이용하자.

import pandas as pd
import json

filename='file.json'
stats = open(filename, 'r').readline()
stats = json.loads(stats)
display(pd.DataFrame.from_dict([stats]))

+ Recent posts