kafka 30

카프카 컨슈머 멀티쓰레드 애플리케이션 예제코드(for scala)

Kafka-client library를 사용하여 JVM위에 올라가는 consumer/producer를 작성할 수 있습니다. 이번 포스팅에서는 scala로 Kafka consumer를 멀티쓰레드로 실행하는 애플리케이션 예제 코드를 공유, 설명 드리겠습니다. 제약조건 - Kafka consumer - Multi thread(2개 이상) 지원 - Scala 코드 Scala를 실행하는 멀티쓰레드 카프카 컨슈머 애플리케이션의 파일은 크게 4개로 나뉘어져 있습니다. 먼저, Scala application을 실행하는 Main.scala와 실제로 Consumer역할을 하게 되는 Runnable Thread인 ConsumerWorker.scala, Consumer의 상태를 기록할 ConsumerStatus.scala 마..

빅데이터/Kafka 2020.02.24
카프카 장애대응 - Consumer offset 지정하기(by partition)

카프카 consumer로 입수를 진행하다보면 예상치 못하게 데이터가 중복입수 또는 유실될 가능성이 있습니다. - Kafka broker 이슈 - Network 이슈 - Consumer application 이슈 위와 같은 이슈가 발생했을 경우 이슈가 발생했던 시점보다 더 이전의 데이터부터 입수를 진행해야하는데 이때 offset을 지정해야합니다. offset을 지정하는 방법은 아래와 같습니다. 1) Consumer 생성 Consumer 생성은 재입수 하고자 하는 Consumer에 지정하여 신규로 생성. 2) Offset 지정(by console shell script) ./kafka-consumer-groups shell을 통해서 offset을 reset할 수 있습니다. offset reset 옵션: --..

빅데이터/Kafka 2020.01.31
Kafka burrow 모니터링 하지 않는 consumer group 수동제거방법

Kafka burrow를 통해 모니터링 하다보면 더이상 모니터링 해도 되지 않는 consumer group이 남아있는 경우가 있습니다. 이런 경우에는 burrow의 http endpoint를 통해서 특정 consumer group을 제거하여 모니터링 대상에서 제거할 수 있습니다. 이번 포스팅에서는 어떻게 삭제하는지 알려드리도록 하겠습니다. Burrow에서 Consumer group 제거 URL path DELETE /v3/kafka/(cluster)/consumer/(group) CURL 예제 만약 cluster이름이 dev이고 consumer group이 di-test 라면 아래와 같이 작성합니다. curl -XDELETE http://localhost:8000/v3/kafka/dev/consumer/..

빅데이터/Kafka 2020.01.15
AWS MSK(Kafka) 실습 및 예제 코드(Java), 장단점, 가격

Amazon MSK(Managed Streaming for Apache Kafka)는 AWS에서 제공하는 완전 관리형 apache kafka 서비스입니다. 기존에 on-promise에서 사용하던 혹은 EC2로 관리하던 Apache kafka를 SaaS형태로 사용할 수 있습니다. Apache kafka의 특정 버젼을 그대로 사용할 수 있기 때문에 vanila apache kafka의 버젼별 api spec을 따라서 사용할 수 있습니다. 그럼 이번 포스트에서는 AWS MSK cluster를 직접 구성해보고, producer/consumer을 만들어 테스트해보겠습니다. MSK Cluster 생성 Cluster을 생성하기 위해서 AWS CLI를 사용하거나 혹은 AWS console을 사용할 수도 있습니다.이번 ..

개발이야기/AWS 2019.12.26
Kafka | MirrorMaker2 가 release되었습니다.

MirrorMaker 2.0은 KIP-382로 등록되어 있던 주제였으며, 2018년 10월 12일에 최초로 jira(KAFKA-7500)가 생성되었습니다. release될 때까지 약 1년이 걸렸는데요. 이번 포스팅에서는 MirrorMaker2.0이 왜 나왔고, 어떤 기능을 포함하고 있는지 살펴보겠습니다. 개발동기 MirrorMaker1은 꽤 오랜기간 사용되어 왔지만 몇가지 이슈가 있었습니다. Legacy MirrorMaker1의 문제점: - Topic들이 kafka default configuration기준으로 생성되었기 때문에 repartition하기 위해 재작업이 필요함 - ACL, configuration의 변경이 source/sink cluster사이에 sync되지 않음 - Record들은 Def..

빅데이터/Kafka 2019.12.18
스파크 스트리밍-Kafka Data source 소개

KafkaSource는 스파크의 Structured Streaming에서 Apache kafka를 data source로 사용하기 위한 목적이다. 이 library의 source는 아래에서 확인할 수 있다. Spark Kafkasource : https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala 2개의 핵심 function을 아래와 같이 정리할 수 있다. - getOffset() : KafkaOffsetrReader를 사용하여 가장 최근의 offset을 가져온다. - getBatch() : offset의 처음부터 끝까지에 존재..

빅데이터/Kafka 2019.12.03
Udacity 데이터 스트리밍 강의 후기- Apache Kafka

Udacity의 Data streaming Nanodegree program을 듣고 난 후기를 공유하고자 합니다. Nanodegree program은 udacity의 certification program입니다.다. 강의를 모두 수강하고 난 뒤에는 Nanodegree(나노디그리) 라고 하는 학위는 아니고 증명서를 발급받을 수 있습니다. 이번에 들은 강의는 Udacity의 데이터스트리밍 강의입니다. 기본적으로 2달의 수강기간을 예상치로 잡고 있으며 Python, SQL, ETL에 대한 기본적인 개념을 가지고 있어야만 수강하는데 문제가 없다고합니다. 가격은 1달에 한화 약 47만원입니다.(2달에 약 80만원) 데이터 스트리밍은 현대의 빅데이터 비즈니스모델을 실시간으로 분석하고 처리하기 위해 사용됩니다. 이 ..

개발이야기 2019.11.18
Kafka client 2.0 부터 KafkaConsumer.poll(long)은 deprecated됩니다.

Kafka client 2.0부터는 KafkaConsumer.poll(long timoutMs)는 deprecated되었다. KafkaConsumer.poll(long timeoutMs)를 기존처럼 long type parameter로 사용할 경우 poll(Duration timout)으로 redirect된다. 이 수정사항은 KIP-266에 의해서 수정되었고 수정된 사유를 아래와 같이 적어보고자 한다. KafkaConsumer.poll(long) poll method는 consumer에서 빠트릴 수 없는 중요한 메서드이다. 데이터를 가져오는 역할을 하며 무한루프안에서 지속적으로 호출되어 topic으로부터 데이터를 가져오는데 사용된다. 기존에 사용되던 poll() method는 long type 파라미터로..

빅데이터/Kafka 2019.10.22
KSQL 소개 ppt

KSQL - 효과적이고 간편한 스트리밍 프로세스 SQL엔진 from Won young Choi KSQL은 스트리밍 application을 SQL 쿼리를 사용하여 만들 수 있습니다. KSQL에 대한 전반적인 내용을 슬라이드에 담아보았습니다. - Kafka environment - KSQL Examples - KSQL and stream - KSQL format - KSQL data types - KSQL architecture - KSQL 하위호환성 - Custom KSQL(UDF, UDAF) - Who need KSQL - KSQL License

빅데이터/Kafka 2019.10.16
KSQL에러 extraneous input 'properties' expecting

KSQL에서 stream을 생성하다가 아래와 같은 오류를 만났다. line 3:3: extraneous input 'properties' expecting {'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE', 'PARTITION', 'STRUCT', 'EXPLAIN', 'ANALYZE', 'TYPE', 'SHOW', 'TABLES', 'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY', 'MAP', 'SET', 'RESET', 'SESSION', 'IF', IDENTIFIER, DIGIT_..

빅데이터/Kafka 2019.10.16
링크드인에서 사용중인 커스텀 Kafka 공개

아래 포스트는 엔지니어 블로그를 번역하였습니다. 2019년 10월 8일 Linkedin이 엔지니어 블로그를 통해 내부에서 사용중인 kakfa를 공개하였다. blog : https://engineering.linkedin.com/blog/2019/apache-kafka-trillion-messages github : https://github.com/linkedin/kafka 아파치 카프카는 링크드인의 인프라에서 중요한 부분을 차지한다. 아파치 카프카는 처음에 in-house 스트림 프로세싱 플랫폼으로 개발하다가 오픈소스로 공개하였고 오늘날에는 많은 적용사례들이 있다. 아파치 카프카는 activity tracking, message exchange, metric gathering과 같은 역할을 하는 소프트..

빅데이터/Kafka 2019.10.15
Airbnb에서 Kafka의 활용

아래 포스트는 airbnb의 kafka meetup 발표내용을 토대로 정리하였습니다. youtube url : https://www.youtube.com/watch?v=-PmKeOpfE54 Kafka at Airbnb AWS에서 0.11 버젼의 kafka를 사용하고 있으며 10개 이상의 cluster과 500개 이상의 broker로 구성되어 있다. 3000개 이상의 topic들이 사용되어지고 있으며 모든 데이터의 양의 PB단위 이상이다. 아직 0.11버전이지만 다음달(2019년 11월)에는 그 이상의 버젼을 사용할 것이라고 한다. 위 데이터 파이프라인 아키텍쳐에서 Rest Proxy는 여러 language로 작성된 application에서 나온 데이터를 kafka로 prodcue하기 위한 역할을 하는데 ..

빅데이터/Kafka 2019.10.14
Kafka의 KSQL 컨셉, 아키텍쳐, 용어, 커스텀 function 적용하는 방법

KSQL은 스트리밍 application을 SQL 쿼리를 사용하여 만들 수 있다. KSQL은 Kafka stream으로 만들어져 있다. KSQL은 Kafka 클러스터와 연동되는데 이는 기본적인 Kafka stream application동작구조와 동일하다. KSQL 아키텍쳐와 주변 application들 KSQL은 아래와 같은 구성요소로 이루어져 있다. KSQL 아키텍쳐 - KSQL 엔진 : KSQL 쿼리가 실행되고 있는 곳 - REST 인터페이스 : ksql 엔진에 client로 access할 수 있는 인터페이스 주변 application - KSQL CLI : KSQL 엔진에 CLI(Command Line Interface)로 접속할 수 있게 도와주는 application - KSQL UI : Con..

빅데이터/Kafka 2019.10.11
아파치 카프카 테스트용 data generator 소개 - ksql-datagen

아파치 카프카는 대규모 분산 스트리밍 플랫폼으로서 데이터파이프라인을 만들때 주로 사용이 가능하다. 데이터파이프라인을 만듦에 있어서 어떤 용도로 어떻게 동작하는지 확인하기 위해서는 직접 consumer로 데이터를 넣어주어 producer을 개발하거나 혹은 처음엔 cli producer/consumer을 사용하여 data를 topic에 넣어준다. Confluent사에서 제공하는 ksql-datagen을 사용한다면 여러 format으로 data를 auto generate하여 topic에 produce가능하다. ksql-datagen은 ksql을 설명하기 위해 처음 소개되었지만, ksql뿐만 아니라 kafka에서 consumer을 테스트하거나 다양한 format(avro 등)을 테스트하기에도 알맞다. 이번 포스..

빅데이터/Kafka 2019.10.10
KSQL - Docker을 사용한 KSQL server, cli 설치 및 실행

KSQL은 SQL을 사용하여 Kafka topic으로 들어오는 record들에 대해 query문을 작성하여 transform하거나 aggregation등을 수행할 수 있게 만들어준다. KSQL을 작성하고 사용하기 위해서는 KSQL 서버가 반드시 필요한데, 이번 포스팅에서는 KSQL 서버를 Docker를 사용하여 설치 및 실행해보고자 한다. 1. KSQL 서버 설치 및 실행 본인의 macbook에서 docker image로 실행하기 때문에 아래와 같이 command를 실행한다. $ docker run \ -p 127.0.0.1:8088:8088 \ -e KSQL_BOOTSTRAP_SERVERS=localhost:9092 \ -e KSQL_LISTENERS=http://0.0.0.0:8088/ \ -e KS..

빅데이터/Kafka 2019.10.08
[빅데이터]Kafka stream과 KSQL 소개 및 설명, 차이점

출처 : slideshare-Kafka Streams vs. KSQL for Stream Processing on top of Apache Kafka Kafka는 Bigdata를 처리하고 운영함에 있어서 필수불가결하다. 이미 많은 IT기업들(카카오, 네이버 등)에서는 kafka로 동작하는 실서비스를 운영하고 있으며 그에 대한 know-how도 상당히 많이 공유되고 있다. Kafka는 단순히 produce, consumer 구조로 사용가능하지만, KSQL이나 kafka stream을 사용하여 더욱 효과적이고 유연하게 데이터를 조작할 수 있다. 이번 포스팅에서는 KSQL과 Kafka stream에 대해서 소개하고 차이점도 알아보는 시간을 가지려고 한다. KSQL KSQL은 streaming SQL 엔진으로서..

빅데이터/Kafka 2019.10.08
[KAFKA]commitSync() 사용시 rebalance발동시 offset variable을 초기화 해야하는 이유?

아래는 oreilly의 Kafka: The Definitive Guide(카프카 핵심가이드)의 commitSync()와 rebalanceListener를 사용하여 topic을 consume하는 예시 코드이다. url : https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch04.html private Map currentOffsets = new HashMap(); private class HandleRebalance implements ConsumerRebalanceListener { public void onPartitionsAssigned(Collection partitions) { } public void onPartit..

빅데이터/Kafka 2019.09.30
Kafka consumer의 Automatic Commit은 중복이 생길 수 있다

https://books.google.co.kr/books?id=a3wzDwAAQBAJ&pg=PA77&lpg=PA77 Kafka: The Definitive Guide Every enterprise application creates data, whether it’s log messages, metrics, user activity, outgoing messages, or something else. And how to move all of this data becomes nearly as important as the data itself. If you’re an application architect, develop books.google.co.jp 참고 출처 - Kafka Definitive gui..

빅데이터/Kafka 2019.09.25
Fluentd로 데이터파이프라인 구축하기 kafka→kafka→s3

Fluentd개요 fluentd는 대용량 데이터처리에 있어 input/output plugin들을 사용해서 파이프라인을 생성할 수 있다. 이 파이프라인은 데이터처리에 적합한데 다양한 플러그인을 폭넓게 개발할수 있을 뿐만아니라 제공되고 있다. fluentd는 다른 fluentd에 전달도 가능한데, 이를 통해 fluentd의 트래픽을 조정하거나 라우팅할 수도 있다. 아키텍쳐 단순성과 안정성으로 인해 많은 IT기업들에서 사용된다. 파이프라인 아키텍쳐 구상 및 준비 앞서 말했듯이 강력한 input/output 플러그인 기능을 가지고 있는데, 실제로 어떤 configuration으로 사용 가능할지 알아보기 위해 아래와 같은 아키텍쳐를 구현해보기로 하였다. 상기 아키텍쳐에서 파이프라인은 2개로 나뉘어져 있다. 1)..

빅데이터 2019.09.17
enable.auto.commit 일 때 Kafka consumer close() method 동작분석

Kafka의 consumer를 사용할 때 offset에 대한 commit offset timing에 대해 여러가지 방법이 있다. 만약 enable.auto.commit=true 로 사용시 consumer에서 close()를 호출한다면 어떻게 kafka 내부에서 offset을 처리하는지 확인해보려고 한다. (아래는 kafka consumer 2.1.0 기준으로 작성되었습니다) 1. close() 호출 @Override public void close() { close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS)); } Kafka의 consumer가 close될 때 timeout 시간 내에 consumer를 종료한다.(default는 30초) 만약 auto-commit=tr..

빅데이터/Kafka 2019.09.02
[confluent]Kafka에 대한 상식 퀴즈 14개

confluent에서 kafka에 관련된 지식 퀴즈 14개를 아래와 같이 public에 공개하였다. 간단하지만 핵심적인 kafka 질문들을 모아놓아서 재밌어서 각 질문들에 대해 정리해보고자 한다. quiz site : https://www.surveymonkey.com/r/FundamentalsSelfAssess 각 문제에 대한 정답은 정답 : 뒤를 드래그 하면 보인다. 1. Kafka에서 record가 실리는 곳은? (1) Category (2) Subject (3) Topic (4) Table 정답 : (3) Topic 2. Kafka에서 각 데이터(record)들이 파티션 내부에서 유니크하게 구분되는 값은? (1) Primary Key (2) Offset (3) Identifier (4) Times..

빅데이터/Kafka 2019.08.30
Kafka burrow http endpoint 정리

이전 포스팅에서 Kafka burrow의 정보와 lag을 평가하는 방법에 대해 알아보았다. Burrow 개요 및 설명 : https://blog.voidmainvoid.net/243 Burrow에서 lag을 정의(평가)하는 방법 : https://blog.voidmainvoid.net/244 이번 포스팅에서는 Burrow에서 제공하는 http endpoint를 알아보고 각 topic별 offset, lag등의 정보를 가져오는 예제를 알아보고자 한다. Burrow Endpoint Burrow의 http server는 kafka, zookeeper의 정보를 효과적으로 가져올 수 있다. 모든 요청은 간단한 HTTP call로 요청되며(대부분 GET request) response는 JSON기반이다. Healt..

빅데이터/Kafka 2019.08.02
Kafka Burrow에서 consumer의 lag을 정의하는(평가하는) 방법 - Consumer Lag Evaluation Rules

이전 포스팅(https://blog.voidmainvoid.net/243)에서 Burrow가 나오게된 배경에 대해 알아보았다. 이 포스팅에서는 burrow가 lag의 상태에 따라 상태를 정의하는 방법에 대해 알아보자. Consumer Lag Evaluation Rules Burrow에 있는 consumer group의 상태는 group이 consume하고 있는 각 partitin에 대한 offset의 규칙에 따라 결정된다. 분리된 threshold를 정하지 않더라도 이 kafka consumer들이 '정상'적으로 작동중인지, '비정상'적으로 작동중인지 판단 할 수 있다. consumer group이 consume하는 모든 파티션에 대해 평가를 함으로서 consumer group이 정상적으로 consume..

빅데이터/Kafka 2019.08.02
Burrow - kafka consumer의 지연(lag)을 모니터링할 수 있는 효과적인 opensource tool

Burrow github : https://github.com/linkedin/Burrow Burrow는 Kafka를 개발한 Linkedin에서 만든 consumer lag monitoring tool이며, opensource로 운영되고 있다. 각 consumer는 특정 topic에 대해 고유의 groupId를 가지고 consuming을 하는데, consume이 잘 되고 있는지 모니터링이 필요하다. Burrow가 나오게된 배경 기존 Kafka client의 consumer의 metrics() method를 사용하여 lag metric(records-lag-max)을 기록할수 있지만, 이는 가장 뒤처진 파티션의 현재 지연을 보여주므로 다른 파티션에서의 정상작동을 잘 감지하기가 어렵다. 또한, consume..

빅데이터/Kafka 2019.08.02
KSQL - Streaming SQL for Apache Kafka 개요 - readme 설명 번역

이 글은 KSQL github repository readme를 번역하였습니다. ☞ https://github.com/confluentinc/ksql KSQL은 아파치 카프카에서 사용가능한 스트리밍 SQL 엔진입니다. Java나 Python같은 프로그래밍 언어로 코드를 짤 필요 없이 간단하고 완전히 상호작용하는 SQL interface를 카프카를 통해 사용가능합니다. KSQL은 분산형, 확장형, 신뢰성, 실시간성이라는 특징을 가지고 있습니다. KSQL을 통해 강력한 aggregation, join, windowing, seessionization 등의 기능을 사용할 수 있습니다. 튜토리얼과 리소스를 사용하기 위해서는 이 링크(바로가기)를 누르세요. 그리고 아래 유튜브에서 KSQL demo를 확인 할 수 ..

빅데이터/Kafka 2019.07.07
아파치 Kafka Consumer의 데이터 처리 내부 architecture 설명 및 튜닝포인트

지난 포스트에서 Kafka producer의 데이터 처리 내부 architecture에 대해서 알아보았다. ☞ 아파치 Kafka Producer architecture 설명 포스팅 이번 포스트에서는 kafka architecture의 Consumer 내부 데이터 흐름에 대해 알아보려고 한다. Kafka Consumer 데이터 내부 처리 순서 #1 : poll(record 취득 api) 호출#2 : 가져오고자 하는 record가 Fetcher queue에 없는 경우, Fetch request를 발동하여 broker에서부터 record를 가져온다.#3 : record batch를 Fetcher queue에 저장#4 : 어디까지 읽었는지에 대한 offset을 consumer측에서 보관#5 : record ba..

빅데이터/Kafka 2018.12.24
아파치 Kafka Producer의 데이터 처리 내부 architecture 설명 및 튜닝포인트

지난 포스트에서 Kafka architecture 및 개요에 대해 알아보았다. ☞ 빅 데이터 처리를 위한 아파치 Kafka 개요 및 설명 포스팅 이번 포스트에서는 kafka architecture의 Producer 내부 데이터 흐름에 대해 알아보려고 한다. Kafka Producer 데이터 내부 처리 순서 #1 : User application thread에서 Record 추가#2 : Record Batch단위로 Record를 압축#3 : 복수의 Record Batch를 묶어 Broker로 보냄#4 : Record Batch를 각 Partition에 저장#5 : 지정시간에 request 에 대한 완료(ack)를 회신 - acks=0 : ack 응답없음(속도가장빠름, 데이터유실확률 가장 높음) - acks..

빅데이터/Kafka 2018.12.24
빅 데이터 처리를 위한 아파치 Kafka 개요 및 설명

Apache Kafka LinkedIn에서 최초로 만들고 opensource화 한 확장성이 뛰어난 분산 메시지 큐(FIFO : First In First Out) → 분산 아키텍쳐 구성, Fault-tolerance한 architecture(with zookeeper), 데이터 유실 방지를 위한 구성이 잘되어 있음→ AMQP, JMS API를 사용하지 않은 TCP기반 프로토콜 사용→ Pub / Sub 메시징 모델을 채용→ 읽기 / 쓰기 성능을 중시 → Producer가 Batch형태로 broker로 메시지 전송이 가능하여 속도 개선→ 파일 시스템에 메시지를 저장하므로, 데이터의 영속성 보장→ Consume된 메시지를 곧바로 삭제하지 않고 offset을 통한 consumer-group별 개별 consume..

빅데이터/Kafka 2018.12.24