null을 골라내서 개수

join

print 하는 방법

데이터 읽기/쓰기

spark로 프로젝트할때 python? scala? 고민을 하게되는데, 이때 가장 큰 고민은
scala의 경우에는 build하는 sbt, maven을 사용을 해야하는 점이다.
python을 사용하는 사람은 보통 build하는데 익숙하지 않다. 그래서 package의 dependencies를 관리하는게 매우 번거로움
그래서 찾다가보니 proejct를 처음 시작할때 도움을 주는 툴이 있었다.

http://www.foundweekends.org/giter8/

$ mkdir <project_root>
$ sbt new scala/scala-seed.g8

위 명령을 실행하고 나면 해당 디렉토리에 아래와 같은 파일들이 생긴다

build.sbt  project  src

이렇게 제공하면 툴도아니지... 이미 여러개의 template이 존재한다.

Template

$ sbt new scala/scala-seed.g8 --branch myBranch 
$ sbt new holdenk/sparkProjectTemplate.g8
  • sparkProjectTemplate
  • 위 템플릿을 입력하면 아래와 같이 프로젝트 이름, organization, package, library 설정이 가능하다
  • 참고로 project의 이름은 package 이름에 organization + project_name이 되기 때문에 -을 쓰면 에러 발생
This is a g8 template for building a skeleton Spark project.

name [sparkProject]:

build 테스트

  • inputFile.txt에 단어 몇개를 입력하고 아래와 같이 명령어를 입력하면
sbt "run inputFile.txt outputFile.txt"
  • outputFile.txt의 디렉토리에 결과가 생긴다

build

  • sbt assembly를 하면 test가지 포함해서 실행해준다.

# 목표
es code분석이 필요해짐에 따라 es 소스를 intellij에 가져와서 debugging해보자.

# 가이드 문서
es버전 6.6이상에서 7.5이하면 아래 가이드 처럼 하면되는데,
https://www.elastic.co/kr/blog/how-to-debug-elasticsearch-source-code-in-intellij-idea 

7.5이상이라면, 아래 풀리퀘를 참고하여 변경된 부분만 원복해서 위와 같은 가이드로 진행하면 된다.
https://github.com/elastic/elasticsearch/pull/48188/files

 

Switch to debug with server=n by atorok · Pull Request #48188 · elastic/elasticsearch

Before this change one needed to re-start debugging several times, as we launched multiple JVMs in debug mode. With this option the IDE has the option to re-launch and listen for connections again ...

github.com


# 진행

1. es 소스 로컬에 가져오기.

git clone https://github.com/elastic/elasticsearch.git
cd elasticsearch 
git checkout --track origin/7.1


2.  gradle설치하고, java 설치하고, JAVA_HOME 설정하기.

((gradle 설치))
   * mac에서는 gradled은 homebrew로 간단하게 설치가되고
     window에서는 gradle홈피가서 zip파일 다운로드받아서 풀고, 환경변수, path만 잡아주면 됨.

((java 설치))
   * java는 es7.1기준 12가 필요해서, java12 설치함.
   . bash_profile에 java home설정해주고 source ~/.bash_profile
     vi ~/.bash_profile

export JAVA_HOME=/library/java/JavaVirtualMachines/jdk-12.0.2.jdk/Contents/Home
export PATH=${PATH}:$JAVA_HOME/bin


   * java 설치 파일 url : https://www.oracle.com/java/technologies/javase/jdk12-archive-downloads.html

 

Java Archive Downloads - Java SE 12

WARNING: These older versions of the JRE and JDK are provided to help developers debug issues in older systems. They are not updated with the latest security patches and are not recommended for use in production. For production use Oracle recommends downlo

www.oracle.com


3. ./gradlew idea
   * 일단 terminal에서 es repository를 클론해온 elasticsearch 디렉터리로 가서 ./gradlew idea 를 실행
   * 이거 해놓고 intellij에서 import해야 자연스럽게 gradle project import가 편해지는 듯.

4. intellij 열어서, import project > gradle 체크 하고 > 오픈.

5. intellij에 gradle auto enable 이런거 뜨면 ok 체크 해주고, 

6. intellij > file > project setting 가서 jdk 12로 변경해주기.
   * platform setting > sdk에서 jdk12 설치한 거 홈으로 잡아주고,
   * project setting > project에서 project sdk를 jdk12로 잡아주기.

7. 소스 폴더 distribution>build.gradle에서 xpack.security.enabled'를 false로 바꿔줘야 localhost에서 rest api사용가능함.

8. intellij 터미널에서 ./gradlew run --debug-jvm 실행.
   * 하다가 뻑나면 ./gradlew clean하고, ./gradlew idea하고 다시 실행.

9. 아래와 같이 run#start가 뜨면 된건데, 이때 intellij>run>attach to process를 누르고 방금 빌드완료한 snapshot넣어주면,
    디버깅 준비 완료!!
    ============-> 98% EXECUTING [33m 6s]
    > :distribution:run#start

10. 이제 break point 걸어서, localhost:9200/_cluster/settings 등등 api날려보면서 코드 디버깅 할 수 있음.

1. brew설치.

/usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"

2. 

아래 virtualbox 홈페이지에서 맥용으로 다운로드 하여 패키지 설치.
https://www.virtualbox.org/wiki/Downloads

Downloads – Oracle VM VirtualBox

Download VirtualBox Here you will find links to VirtualBox binaries and its source code. VirtualBox binaries By downloading, you agree to the terms and conditions of the respective license. If you're looking for the latest VirtualBox 6.0 packages, see Virt

www.virtualbox.org

3. minikube 설치

brew install minikube
#권한 에러시 아래 명령어로 권한 변경.
sudo chown -R [:username] /usr/....

 

5. minikube 시작

minikube start

이제 kubectl 명령어 사용 가능! 


minikube설치를 위해 하이퍼바이저가 필요해서 virtualbox를 설치한 것인데,
하이퍼바이저란, 컴퓨터에서 다수의 운영 체제를 동시에 실행하기 위한 논리적 플랫폼.

 

 

 

엘라스틱서치에서 샤드는 refresh, flush, optimize API 과정을 통해 관리된다.
엘라스틱서치의 위의 내용은 모두 루씬 내용인데 서로 용어가 다르기 때문에 잘 정리해 둘 필요가 있다.

루씬 엘라스틱서치
flush refresh
commit flush
merge optimize API

루씬에서는 데이터가 in-memory buffer 기반으로 처리된다.
데이터변경사항이 들어오면 segment를 생성하고, 시스템 캐시에 캐시된 후에, 디스크 동기화가 이루어짐.

1. 루씬에서의 flush = 엘라스틱서치의 refresh.
- segment 생성시 커널 시스템 캐시에 세그먼트가 캐시되어 읽기가 가능해진다.
- 루씬의 ReOpen() 함수를 이용해 IndexSearcher에서 읽을 수 있는 상태.
- 일정주기마다 업데이트 된 문서가 ReOpen() 함수로 처리.

- es 클러스터에 존재하는 모든 샤드에서는 기본적으로 1초마다 한번씩 refresh작업이 수행된다.
- 인덱스를 새로고침한다는 의미인데, refresh가되면 새로 추가한 데이터의 검색이 가능해진다.
- 대량 인덱스 시에는 -1로 비활성화해두면 인덱싱할때 이점이 있다.

2. 루씬에서의 commit = 엘라스틱서치의 flush.
- 물리적으로 디스크 기록을 수행하는 fsync() 함수 호출 작업이다.
- flush가 있기 때문에 매번 commit 필요가 없고, 일정 주기로 commit이 수행된다.
- 루씬에서의 flush작업은 디스크로 쓰기가 이루어지기 전이기 때문에, flush작업까지만 되고 시스템에 문제가 발생하면 데이터 유실 발생 가능성이 있다.

- es에서의 flush는 루씬의 commit 작업과 함께 새로운 translog를 시작한다.
- * translog는 루씬에는 없는 내용으로 샤드의 장애복구를 위해 재공 되는 특수한 파일이다.
- * 샤드는 자신에게 일어나는 모든 변경사항을 translog에 먼저 기록하고, 내부 루씬을 호출한다.
- * 시간이 지나면 translog 파일 크기도 증가한다. 루씬에서 commit이 이루어지면 translog에서 commit 지점까지의 내용이 삭제된다.
- * 데이터가 커널 시스템 캐시에 있다 디스크에 동기화 되지 못하고 유실될 가능성을 대비하여 transhlog를 만든 것이다.
- es에서 flush 작업은 default로 5초에 한번씩 수행되고, api를 통해 flush 주기 조절이 가능하나 추천하지 않는다.

3. 루씬의 merge = 엘라스틱서치의 optimize API
- 검색 성능을 높이기 위해 검색 대상이 되는 세그먼트를 병합하여 세그먼트 수를 줄이는 작업이다.

- 검색 대상이 되는 세그먼트 수를 줄이면, 검색 횟수를 줄일 수 있고, 검색 성능이 올라간다.
- commit작업을 동반하기 때문에 비용이 크다. 

- es에서는 forced merge API를 통해 루씬 merge 작업을 강제 수행할 수 있다.
- 파편화된 다수 세그먼트들을 병합한다.
- 강제 수행하지 않더라도, 백그라운드로 주기적으로 수행된다.

elasticsearch에서는 nested type을 사용할 수 있다.
문서안에 object array를 저장할 수 있고, 그것이 nested type이다.

es가이드에 나온 예제 처럼 user object를 array형태로 넣을 수 있다.

{
  "group" : "fans",
  "user" : [
    {
      "first" : "John",
      "last" :  "Smith",
      "score" : 90
    },
    {
      "first" : "Alice",
      "last" :  "White",
      "scroe" : 100
    }
  ]
}


내가 임의로 score라는 필드도 추가해 넣어보았다.
그러면 nested 영역에 들어가지 않는 "group"이라는 필드를 key로 score의 sum을 구하고 싶다면 어떻게 해야할까?

아래와 같이 reverse_nested 구문을 사용하면 된다.
주의 할 것은 먼저 nested한 영역부터 작성해주고 그 안에 reverse_nested, nested 밖 내용을 작성해주면 된다.

  "aggs": {
    "user": {
      "nested": {
        "path": "user"
      },
      "aggs": {
        "score": {
          "sum": {
            "field": "combined_struct.score"
          },
          "aggs": {
            "group_by_key": {
              "reverse_nested": {}, 
              "aggs": {
                "group_sum_score": {
                  "terms": {
                    "field": "group"
                  }
                }
              }
            }
          }
        }
      }
    }
  }

 

 

https://www.elastic.co/guide/en/elasticsearch/reference/current/nested.html

 

Nested datatype | Elasticsearch Reference [7.5] | Elastic

Because nested documents are indexed as separate documents, they can only be accessed within the scope of the nested query, the nested/reverse_nested aggregations, or nested inner hits. For instance, if a string field within a nested document has index_opt

www.elastic.co

 

https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-nested-aggregation.html

 

Nested Aggregation | Elasticsearch Reference [7.5] | Elastic

A special single bucket aggregation that enables aggregating nested documents. For example, lets say we have an index of products, and each product holds the list of resellers - each having its own price for the product. The mapping could look like: PUT /p

www.elastic.co

 

elasticsearch에는 fielddata와 doc_values라는 것이 있고, 주요 개념이므로 이해가 필요하다.
더 근본적으로는 루씬의 개념이기 때문에 루씬에서 storedField와 docValue내용을 찾아보는 것이 좋다.

elasticsearch에서 data를 mapping할때에 keyword type과 text type이 있다.
keyword type의 경우 exact매칭에서 사용하고, text type의 경우 analyzed 매칭에 사용된다.
text type의 경우는 형태소 분석을 통해 field를 여러 terms로 나눠서 역인덱싱 과정을 거치게 되고,
keyword type은 그대로 역인덱싱 된다.
* 역인덱스란 키워드가가 어떤 문서에 포함되는지를 저장한다.

검색(search)이라는 것은 "어떤 문서가 이 키워드를 포함하는지가 궁금"하므로 역인덱스된 정보를 통해 검색이 빠른 검색이 가능하다.
그러나 sort, aggregation, accessing field value 와 같은 패턴은 "이 문서에서 이 field value값이 무엇인지"가 관심이므로 역인덱스 정보가 아닌 document를 key로, field정보를 담은 데이터 구조가 필요하다. 그 데이터 구조가 fielddata라는 것이다.

key value
doc1 a:1, b:4, c:7
doc2 a:2, b:5, c:8
doc3 a:3, b:6, c:9

 

그런데 fielddata의 경우 in-memory구조로 작동하기 때문에 많은 heap memory를 소비하게된다. 일단 field가 heap에 로딩되면 그것은 segment의 lifetime동안 남아있게된다. 따라서 비용이 높은 프로세스가된다.

text field를 사용하게 되면 fielddata 데이터 구조를 사용할 수 있는데 위의 설명과 같이 높은 비용때문에 default false로 되어있다.
필요한 경우는 fielddata=true로 옵션을 변경하여 사용하되, memory사용에 주의한다.

keyword field에서는 fileddata의 in-memory에서 동작하는 구조를 개선하여, on-disk data structure인 doc_values 사용이 가능하다.
doc_values는 아래와 같이 column-oriented fashion으로 더욱 유리하게 sort, aggregation 등을 할 수 있다.

key doc1 doc2 doc3
a 1 2 3
b 4 5 6
c 7 8 9


keyword type과 text type은 이렇게 analyzed 되냐 안되냐의 차이뿐 아니라 fielddata, doc_values와 같은 데이터 구조 사용 여부도 달라지므로 적절한 data mapping과 옵션 설정이 중요하다.

 

 

* 위의 설명은 친절한 es 가이드와 루씬내용을 따로 찾아 정리하였습니다.
* es 가이드를 상세히 읽고, 그에 따른 루씬 내용을 찾아보면 이해하기가 좋은 것 같아요.

 

https://www.elastic.co/guide/en/elasticsearch/reference/current/fielddata.html

 

fielddata | Elasticsearch Reference [7.5] | Elastic

Most fields are indexed by default, which makes them searchable. Sorting, aggregations, and accessing field values in scripts, however, requires a different access pattern from search. Search needs to answer the question "Which documents contain this term?

www.elastic.co

 

https://www.elastic.co/guide/en/elasticsearch/reference/current/doc-values.html

 

doc_values | Elasticsearch Reference [7.5] | Elastic

Most fields are indexed by default, which makes them searchable. The inverted index allows queries to look up the search term in unique sorted list of terms, and from that immediately have access to the list of documents that contain the term. Sorting, agg

www.elastic.co

 

elasticsearch.yml을 고치고 재시작하지 않아도, api를 통해 cluster setting을 수정할 수 있다.
얼마전에 cluster setting값을 수정할일이 있어 문서를 찾아보니, 

es는 진짜 가이드문서가 잘 나와있다.
https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-update-settings.html

 

Cluster update settings API | Elasticsearch Reference [7.5] | Elastic

Cluster update settings APIedit Updates cluster-wide settings. With specifications in the request body, this API call can update cluster settings. Updates to settings can be persistent, meaning they apply across restarts, or transient, where they don’t sur

www.elastic.co

 

get  /_cluster/settings?include_defaults=true 으로 살펴보면
cluster setting 값들이 나온다.

setting값의 영역이  'persistent', 'transient', 'default' 로 나뉘는데,
default는 우리가 수정할 수 있는건 아니다.

default에 나와있는 설정값중에 수정할 부분을 persistent나 transient에 수정하면된다. 
persistent에 수정을 하면 클러스터를 재시작해도 설정이 유지된다.
transient에 수정을 하면 클러스터가 재시작되면 설정이 사라지고, 원복된다.

즉, es cluster에서 설정값이 transient->persistent->defaults 순의 우선순위를 갖게되는 것이다.

 

1. 예를 들어, node를 재시작할때 샤드 route를 피하기 위해 아래와 같이 잠시 설정을 none으로 바꿀때는 transient에 변경. 

PUT /_cluster/settings
{
	"transient": {
		"cluster.routing.allocation.enable": "all"
	}
}

 

2. node를 재시작해도 설정을 유지하고 싶으면 persistent에 변경.

PUT /_cluster/settings
{
	"persistent": {
		"search.default_allow_partial_results": "false"
	}
}

 

3. default 영역은 설정파악할때 참고용.
나는 계속 default영역에 바꾸려고.. 이렇게 삽질을 ㅠ.ㅠ
json에서 "default": { } 여기에 바로 update하는거 아니고, 참고용이고
transient, persistent 에 업뎃하세요~

{
	"default" : {
    	"search": {
        	xxx_xxx_xx : "false"
        }
	}
}

 

+ Recent posts