This post explains how to collect producer metrics with the JMX exporter when developing against the official open-source Apache Kafka Java client library.
1) Developing the KafkaProducer application
build.gradle
plugins {
    id 'java'
}

group 'com.example'
version '1.0'

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.apache.kafka:kafka-clients:2.5.0'
    implementation 'org.slf4j:slf4j-simple:1.7.30'
}

// Packages the application classes and all runtime dependencies into a single executable jar.
task uberJar(type: Jar) {
    from sourceSets.main.output
    dependsOn configurations.runtimeClasspath
    from {
        configurations.runtimeClasspath.findAll { it.name.endsWith('jar') }.collect { zipTree(it) }
    }
    manifest {
        attributes "Main-Class": "com.example.SimpleProducer"
    }
}
SimpleProducer.java
package com.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class SimpleProducer {
    private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
    private final static String TOPIC_NAME = "test-log";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) throws Exception {
        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        // Send a test message every 100 ms so the producer stays alive
        // long enough for its JMX metrics to be scraped.
        for (int i = 0; i < 100000; i++) {
            String messageValue = "{\"message\":\"test\"}";
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);
            producer.send(record);
            Thread.sleep(100);
        }

        producer.flush();
        producer.close();
    }
}
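For reference, the same metric values that the JMX agent will expose can also be read programmatically through KafkaProducer#metrics(). The following is a minimal sketch; MetricsDump and logMetrics are hypothetical helpers, not part of the original application:

import org.apache.kafka.clients.producer.KafkaProducer;

public class MetricsDump {
    // Prints every client metric (name, group, current value) registered on the
    // producer; these are the same metrics the JMX agent exposes as MBeans.
    static void logMetrics(KafkaProducer<String, String> producer) {
        producer.metrics().forEach((metricName, metric) ->
                System.out.printf("%s [%s] = %s%n",
                        metricName.name(), metricName.group(), metric.metricValue()));
    }
}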
2) Build
Building the code with the uberJar task produces an executable jar in the build/libs directory.
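Assuming the Gradle wrapper is set up in the project, the build can be run as follows:
$ ./gradlew uberJar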
$ ls build/libs
simple-kafka-producer-1.0.jar
3) Preparing the jar file and yaml for JMX
There are several ways to extract JMX data from a Java application; to collect it with Prometheus, you can use jmx_prometheus_javaagent-0.16.1.jar, which exposes the data over an HTTP endpoint.
jmx_prometheus_javaagent-0.16.1.jar can be downloaded from https://github.com/prometheus/jmx_exporter/releases.
The second thing you need is a yaml file containing the JMX rule definitions.
producer.yaml
lowercaseOutputName: true
rules:
  # kafka.connect:type=app-info,client-id="{clientid}"
  # kafka.consumer:type=app-info,client-id="{clientid}"
  # kafka.producer:type=app-info,client-id="{clientid}"
  - pattern: 'kafka.(.+)<type=app-info, client-id=(.+)><>start-time-ms'
    name: kafka_$1_start_time_seconds
    labels:
      clientId: "$2"
    help: "Kafka $1 JMX metric start time seconds"
    type: GAUGE
    valueFactor: 0.001
  - pattern: 'kafka.(.+)<type=app-info, client-id=(.+)><>(commit-id|version): (.+)'
    name: kafka_$1_$3_info
    value: 1
    labels:
      clientId: "$2"
      $3: "$4"
    help: "Kafka $1 JMX metric info version and commit-id"
    type: GAUGE
  # kafka.producer:type=producer-topic-metrics,client-id="{clientid}",topic="{topic}",partition="{partition}"
  # kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{clientid}",topic="{topic}",partition="{partition}"
  - pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.+), topic=(.+), partition=(.+)><>(.+-total|compression-rate|.+-avg|.+-replica|.+-lag|.+-lead)
    name: kafka_$2_$6
    labels:
      clientId: "$3"
      topic: "$4"
      partition: "$5"
    help: "Kafka $1 JMX metric type $2"
    type: GAUGE
  # kafka.producer:type=producer-topic-metrics,client-id="{clientid}",topic="{topic}"
  # kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{clientid}",topic="{topic}",partition="{partition}"
  - pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.+), topic=(.+)><>(.+-total|compression-rate|.+-avg)
    name: kafka_$2_$5
    labels:
      clientId: "$3"
      topic: "$4"
    help: "Kafka $1 JMX metric type $2"
    type: GAUGE
  # kafka.connect:type=connect-node-metrics,client-id="{clientid}",node-id="{nodeid}"
  # kafka.consumer:type=consumer-node-metrics,client-id=consumer-1,node-id="{nodeid}"
  - pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.+), node-id=(.+)><>(.+-total|.+-avg)
    name: kafka_$2_$5
    labels:
      clientId: "$3"
      nodeId: "$4"
    help: "Kafka $1 JMX metric type $2"
    type: UNTYPED
  # kafka.connect:type=kafka-metrics-count,client-id="{clientid}"
  # kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{clientid}"
  # kafka.consumer:type=consumer-coordinator-metrics,client-id="{clientid}"
  # kafka.consumer:type=consumer-metrics,client-id="{clientid}"
  - pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.*)><>(.+-total|.+-avg|.+-bytes|.+-count|.+-rate|.+-ratio|.+-age|.+-flight|.+-threads|.+-connectors|.+-tasks|.+-ago)
    name: kafka_$2_$4
    labels:
      clientId: "$3"
    help: "Kafka $1 JMX metric type $2"
    type: GAUGE
  # kafka.connect:type=connector-task-metrics,connector="{connector}",task="{task}"<>status
  - pattern: 'kafka.connect<type=connector-task-metrics, connector=(.+), task=(.+)><>status: ([a-z-]+)'
    name: kafka_connect_connector_status
    value: 1
    labels:
      connector: "$1"
      task: "$2"
      status: "$3"
    help: "Kafka Connect JMX Connector status"
    type: GAUGE
  # kafka.connect:type=task-error-metrics,connector="{connector}",task="{task}"
  # kafka.connect:type=source-task-metrics,connector="{connector}",task="{task}"
  # kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"
  # kafka.connect:type=connector-task-metrics,connector="{connector}",task="{task}"
  - pattern: kafka.connect<type=(.+)-metrics, connector=(.+), task=(.+)><>(.+-total|.+-count|.+-ms|.+-ratio|.+-avg|.+-failures|.+-requests|.+-timestamp|.+-logged|.+-errors|.+-retries|.+-skipped)
    name: kafka_connect_$1_$4
    labels:
      connector: "$2"
      task: "$3"
    help: "Kafka Connect JMX metric type $1"
    type: GAUGE
  # kafka.connect:type=connector-metrics,connector="{connector}"
  # kafka.connect:type=connect-worker-metrics,connector="{connector}"
  - pattern: kafka.connect<type=connect-worker-metrics, connector=(.+)><>([a-z-]+)
    name: kafka_connect_worker_$2
    labels:
      connector: "$1"
    help: "Kafka Connect JMX metric $1"
    type: GAUGE
  # kafka.connect:type=connect-worker-metrics
  - pattern: kafka.connect<type=connect-worker-metrics><>([a-z-]+)
    name: kafka_connect_worker_$1
    help: "Kafka Connect JMX metric worker"
    type: GAUGE
  # kafka.connect:type=connect-worker-rebalance-metrics
  - pattern: kafka.connect<type=connect-worker-rebalance-metrics><>([a-z-]+)
    name: kafka_connect_worker_rebalance_$1
    help: "Kafka Connect JMX metric rebalance information"
    type: GAUGE
Save these two files to a path of your choice.
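As an illustration of how the rules apply (the client-id producer-1 below is hypothetical), the client-id rule above turns a producer MBean attribute into a Prometheus metric roughly as follows; jmx_exporter additionally replaces dashes in the resulting name with underscores:

# MBean attribute: kafka.producer:type=producer-metrics,client-id=producer-1 -> record-send-total
# Exposed metric:  kafka_producer_record_send_total{clientId="producer-1"}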
4) Run
Run the application jar with a -javaagent option that references the agent jar and the yaml file.
$ java -javaagent:/Users/dvwy/Desktop/jmx_prometheus_javaagent-0.16.1.jar=9400:/Users/dvwy/Desktop/producer.yaml -jar simple-kafka-producer-1.0.jar
5) Checking the JMX data
Accessing the localhost:9400 endpoint confirms that the Kafka producer application is exposing its JMX data.
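For example, the exposed metrics can be checked with curl (the metric lines below are illustrative; actual names and values depend on your client-id and the rules above):

$ curl localhost:9400/metrics
kafka_producer_record_send_total{clientId="producer-1"} 42.0
kafka_producer_record_send_rate{clientId="producer-1"} 9.8

To have Prometheus scrape this endpoint, a minimal scrape configuration could look like the following (the job name is arbitrary):

scrape_configs:
  - job_name: 'kafka-producer'
    static_configs:
      - targets: ['localhost:9400']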