Kafka는 분산 메시징 플랫폼으로 폭넓은 확장성과 우수한 성능을 가진다. Kafka의 간단한 사용을 위해 Spring boot를 사용하여 consumer, producer개념을 익힐 수 있다. Spring boot의 scheduler기능을 통해서 producer가 kafka에 topic을 내려 주면, subscribe하고 있는 consumer가 해당 메시지를 받는 형태로 만들 것이다.
Spring boot scheduler와 kafka의 연동 구성도
Kafka의 설치과정은 아래 posting에서 확인할 수 있다.
Macbook에 Kafka 1분만에 설치하기(바로가기)
프로젝트 directory는 intellij의 spring boot default 설정을 따라간다.
이번 포스팅에서는 gradle을 사용하여 spring boot를 구성하고자 한다. gradle에 대한 자세한 설명은 Gradle build tool 4.0 가이드(바로가기) 포스팅에서 확인 가능하다.
buildscript {
repositories {
mavenCentral()
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:2.0.5.RELEASE")
}
}
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
bootJar {
baseName = 'gs-scheduling-tasks'
version = '0.1.0'
}
repositories {
mavenCentral()
}
sourceCompatibility = 1.8
targetCompatibility = 1.8
dependencies {
compile("org.springframework.boot:spring-boot-starter")
compile "org.springframework.kafka:spring-kafka:2.1.10.RELEASE"
testCompile("org.springframework.boot:spring-boot-starter-test")
}
spring.kafka.consumer.group-id=kafka-intro
spring.kafka.bootstrap-servers=localhost:9092
package hello;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
public class Application {
public static void main(String[] args) throws Exception {
SpringApplication.run(Application.class);
}
}
kafka연동의 핵심적인 부분이다. 크게 두가지 역할을 하는 것으로 볼 수 있다.
1) 1000ms마다 producer는 "helloworld"+now Date() format의 데이터를 send.
2) consumer는 "test" topic에 들어오는 모든 메시지를 가져와서 log를 남김.
package hello;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class ScheduledTasks {
private static final Logger log = LoggerFactory.getLogger(ScheduledTasks.class);
private static final SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
@Autowired
private KafkaTemplate<string, string=""> kafkaTemplate;
public void send(String topic, String payload) {
kafkaTemplate.send(topic, payload);
log.info("Message: " + payload + " sent to topic: " + topic);
}
@Scheduled(fixedRate = 1000)
public void reportCurrentTime() {
send("test", "helloworld " + dateFormat.format(new Date()));
}
@KafkaListener(topics = "test")
public void receiveTopic1(ConsumerRecord consumerRecord) {
log.info("Receiver on topic1: "+consumerRecord.toString());
}
}
</string,>
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.0.5.RELEASE)
2018-11-06 20:37:52.755 INFO 58130 --- [ main] hello.Application : Starting Application on 1003855ui-MacBook-Pro.local with PID 58130
...
...
...
2018-11-06 20:37:53.925 INFO 58130 --- [pool-1-thread-1] hello.ScheduledTasks : Message: helloworld 20:37:53 sent to topic: test
2018-11-06 20:37:54.038 INFO 58130 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=kafka-intro] Successfully joined group with generation 1
2018-11-06 20:37:54.039 INFO 58130 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=kafka-intro] Setting newly assigned partitions [test-1, test-0, test-3, test-2, test-13, test-12, test-15, test-14, test-17, test-16, test-19, test-18, test-5, test-4, test-7, test-6, test-9, test-8, test-11, test-10]
2018-11-06 20:37:54.061 INFO 58130 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [test-1, test-0, test-3, test-2, test-13, test-12, test-15, test-14, test-17, test-16, test-19, test-18, test-5, test-4, test-7, test-6, test-9, test-8, test-11, test-10]
2018-11-06 20:37:54.744 INFO 58130 --- [pool-1-thread-1] hello.ScheduledTasks : Message: helloworld 20:37:54 sent to topic: test
2018-11-06 20:37:54.769 INFO 58130 --- [ntainer#0-0-C-1] hello.ScheduledTasks : Receiver on topic1: ConsumerRecord(topic = test, partition = 5, offset = 4, CreateTime = 1541504274743, serialized key size = -1, serialized value size = 19, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = helloworld 20:37:54)
| 아파치 Kafka Consumer의 데이터 처리 내부 architecture 설명 및 튜닝포인트 (4) | 2018.12.24 |
|---|---|
| 아파치 Kafka Producer의 데이터 처리 내부 architecture 설명 및 튜닝포인트 (0) | 2018.12.24 |
| 빅 데이터 처리를 위한 아파치 Kafka 개요 및 설명 (0) | 2018.12.24 |
| Spring boot에서 kafka 사용시 application.yaml 설정 (0) | 2018.11.06 |
| Macbook에 Kafka 1분만에 설치하기 (0) | 2018.11.06 |
| Kafka opensource 분석을 통한 replication assignment 로직 확인 (0) | 2018.09.28 |