본문 바로가기

빅데이터

Fluentd로 데이터파이프라인 구축하기 kafka→kafka→s3

Fluentd개요

fluentd는 대용량 데이터처리에 있어 input/output plugin들을 사용해서 파이프라인을 생성할 수 있다. 이 파이프라인은 데이터처리에 적합한데 다양한 플러그인을 폭넓게 개발할수 있을 뿐만아니라 제공되고 있다. fluentd는 다른 fluentd에 전달도 가능한데, 이를 통해 fluentd의 트래픽을 조정하거나 라우팅할 수도 있다. 아키텍쳐 단순성과 안정성으로 인해 많은 IT기업들에서 사용된다.

 

파이프라인 아키텍쳐 구상 및 준비

앞서 말했듯이 강력한 input/output 플러그인 기능을 가지고 있는데, 실제로 어떤 configuration으로 사용 가능할지 알아보기 위해 아래와 같은 아키텍쳐를 구현해보기로 하였다.

kafka, kafka, s3로 이어지는 데이터파이프라인

상기 아키텍쳐에서 파이프라인은 2개로 나뉘어져 있다.

1) kafka to kafka

2) kafka to s3

 

위 파이프라인을 구성하기 위해 사용한 플러그인은 아래와 같다.

- fluent-plugin-kafka : https://github.com/fluent/fluent-plugin-kafka

- fluent-plugin-s3 : https://github.com/fluent/fluent-plugin-s3

 

각 플러그인을 설치하기 위해서는 아래와 같이 command한다.(이미 fluentd가 설치되어있음을 가정)

$ fluent-gem install fluent-plugin-kafka
$ fluent-gem install fluent-plugin-s3

각 파이프라인별 설정

1) kakfa에서 kafka로 데이터 전송하기

특정 kafka topic에서 또 다른 kafka topic으로 데이터를 전송하기 위해서 아래와 같은 configuration을 사용했다.

<source>
  @type kafka

  brokers 123.123.123.71:9092,123.123.123.72:9092,123.123.123.73:9092
  topics fluentd-test-json
  format json
</source>
<match fluentd-test-json>
  @type kafka2

  brokers 11.11.11.71:9092,11.11.11.72:909211.11.11.73:9092
  default_topic fluentd-test

  <format>
    @type json
  </format>
  
  <buffer fluentd-test>
    @type memory
    flush_interval 3s
  </buffer>
</match>

123.123.123으로 시작하는 kafka의 fluentd-test-json 이라는 topic(json 형태)를 source로 한다. 해당 데이터를 최종 목적지로 11.11.11로 시작하는 kafka의 fluentd-test로 보내는 역할을 한다.

 

source에서 사용한 kafka의 topic format은 json인데, 현재 지원하는 format은 text, json, ltsv, msgpack이다. 추가로 ruby-kafka에 대응되는 여러 옵션들을 사용 할 수 있다. (ruby-kafka의 consumer options)

 

match의 kafka2에서는 producer역할을 한다고 볼수 있는데, 이 또한 ruby-kafka에 대응되는 producer option들을 사용할 수 있지만, 따로 설정없이 기본값을 사용하도록 하였다. 다만 buffer을 memory에 담도록하고 flush_interval을 3초로 설정하여 buffer을 두었다. 버퍼기능을 통해 네트워크장애등으로 인해 데이터가 output에 정상적으로 도달하지 못했을때 어떻게 수행할지에 대해 선언가능하다. 이 buffer관련 옵션은 buffer 설명 링크에서 확인할 수 있다. 크게 두가지 type이 있는데 파일 혹은 메모리 방식으로 나뉘어지며, 상기 테스트용 파이프라인에서는 크게 신경쓸 옵션이 아니라서 chunk_limit_sizetotal_limit_size 등의 설정은 하지 않았다.

 

2) kakfa에서 s3로 데이터 전송하기

특정 kafka topic에서 s3로 데이터를 저장하는 기능을 아래와 같이 configration설정했다.

<source>
  @type kafka

  brokers 11.11.11.71:9092,11.11.11:9092,11.11.11.73:9092
  topics fluentd-test
  format json
</source>
<match fluentd-test>
  @type s3

  aws_key_id A----------
  aws_sec_key 3------------------------
  s3_bucket fluentd-di-test
  s3_region ap-northeast-2
  path logs/
  <buffer tag,time>
    @type file
    path /Users/a1003855/Documents/fluentd-test/s3
    timekey 2 # 1 hour partition
    timekey_wait 1s
    timekey_use_utc true # use utc
    chunk_limit_size 10m
  </buffer>
</match>

11.11.11로 시작하는 kafka의 fluentd-test topic의 json 데이터들을 s3에 저장하는 파이프라인이다. 

 

source의 kafka에 대한 역할은 위에 kafka에서 kafka로 전송할때와 같이 consumer로서의 역할을 수행한다.

 

match의 s3에서는 s3저장역할을 실질적으로 수행하는 녀석인데, s3의 bucket, region, aws key, aws sec를 기본적으로 설정해야한다. aws key, sec을 s3에 저장하는 용도로 사용하기 위해 IAM에서 s3FullAmazonS3FullAccess group으로 user를 생성하여 key, sec을 발급받았다.

aws IAM메뉴에서 새로운 접근 사용자를 생성할 수 있다
IAM그룹생성에서 s3를 검색하면 AmazonS3FullAccess를 찾을 수 있다.
유저가 생성되고나면 accessKey와 secKey를 발급받을 수 있다.

파이프라인 실행 및 데이터 입력하기

위에서 설정하고 저장한 configuration file을 기반으로 아래와 같이 2개의 fluentd를 실행시킨다.

$ fluentd -c kafka-kakfa.conf
$ fluentd -c kafka-s3.conf

파이프라인이 정상적으로 작동하는지 확인하기 위해 가장 처음 출발 토픽인 fluentd-test-json에 json을 입력해본다. json입력은 kafka console producer로 간편하게 입력할 수 있다.

$ /usr/local/Cellar/kafka/2.0.0/bin/kafka-console-producer \
 --broker-list 123.123.123.71:9092,123.123.123.72:9092,123.123.123.73:9092 \
 --topic fluentd-test-json
>{"name":"wonyoung","salary":56000,"region":"seoul"}
>{"name":"wonyoung","salary":56000,"region":"seoul"}
>{"name":"wonyoung","salary":56000,"region":"seoul"}
>{"name":"wonyoung","salary":56000,"region":"seoul"}
>{"name":"wonyoung","salary":56000,"region":"seoul"}
>{"name":"wonyoung","salary":56000,"region":"seoul"}
>{"name":"wonyoung","salary":56000,"region":"seoul"}
>

파이프라인 결과 확인하기

각 파이프라인에 위에서 입력한 데이터가 kafka를 지나 s3까지 정상적으로 적재되었는지 확인하기 위해서 kafka console consumer을 통해 확인해보자.

$ /usr/local/Cellar/kafka/2.0.0/bin/kafka-console-consumer \
 --bootstrap-server 123.123.123.71:9092,123.123.123.72:9092,123.123.123.73:9092 \
 --topic fluentd-test-json
{"name":"wonyoung","salary":56000,"region":"seoul"}
{"name":"wonyoung","salary":56000,"region":"seoul"}
{"name":"wonyoung","salary":56000,"region":"seoul"}
{"name":"wonyoung","salary":56000,"region":"seoul"}
{"name":"wonyoung","salary":56000,"region":"seoul"}
{"name":"wonyoung","salary":56000,"region":"seoul"}
{"name":"wonyoung","salary":56000,"region":"seoul"}
$ /usr/local/Cellar/kafka/2.0.0/bin/kafka-console-consumer \
 --bootstrap-server 11.11.11.71:9092,11.11.11.72:909211.11.11.73:9092 \
 --topic fluentd-test
{"name":"wonyoung","salary":56000,"region":"seoul"}
{"name":"wonyoung","salary":56000,"region":"seoul"}
{"name":"wonyoung","salary":56000,"region":"seoul"}
{"name":"wonyoung","salary":56000,"region":"seoul"}
{"name":"wonyoung","salary":56000,"region":"seoul"}
{"name":"wonyoung","salary":56000,"region":"seoul"}
{"name":"wonyoung","salary":56000,"region":"seoul"}

위와 같이 정상적으로 들어오는 모습을 확인할 수 있다. 두번째 kafka topic에 들어오는 속도가 약간 느린것 같은데 아마 producer설정이나 buffer로 인한 약간의 delay로 보인다. 이 부분은 추후 확인해볼것이다. 두번째 kafka topic인 fluentd-test까지 도착한것을 확인했으니 이제 s3에 정상적재되었는지 확인해보자.

s3에 gz으로 적재된 데이터들

gzip형태로 s3에 파일이 잘 적재된 것을 확인할 수 있다. gzip파일에 저장된 내용을 local에 다운받아 확인하면 아래와 같이 적재되어 있음을 확인할 수 있었다.

2019-09-17T17:59:38+09:00	fluentd-test	{"name":"wonyoung","salary":56000,"region":"seoul"}
2019-09-17T17:59:41+09:00	fluentd-test	{"name":"wonyoung","salary":56000,"region":"seoul"}
2019-09-17T17:59:41+09:00	fluentd-test	{"name":"wonyoung","salary":56000,"region":"seoul"}

tsv형태로 묶어서 gzip으로 저장되는 것으로 보인다. 맨 앞부터 utc time, topic, json형태의 data로 저장됨을 확인 할 수 있다. s3에 최종저장하는 방식으로 현재 gzip(default), json, text, lzo, lzma2, gzip_command가 지원되며, built-in variables를 통해 파일이름, directory이름에 %{hostname}, ${time_slice} 등 각종 날짜 등의 옵션을 지정할 수도 있다.

반응형