본문 바로가기

빅데이터/Kafka

카프카 스트림즈 KTable로 선언한 토픽을 key-value 테이블로 사용하기

카프카 스트림즈의 KTable은 토픽의 데이터를 key-value형태로 사용할 수 있도록 구체화된 뷰(Materialized View)를 제공합니다. 구현방법은 다음과 같습니다.

 

0. 카프카 스트림즈 디펜던시 추가

dependencies {
    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.0'

    compile 'org.slf4j:slf4j-simple:1.7.30'

    implementation "org.apache.kafka:kafka-streams:2.8.0"
}

이 예제는 2.8.0을 기준으로 작성하였습니다.

1. 카프카 스트림즈 설정

Properties configs = new Properties();
configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-materialized-view");
configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
configs.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
configs.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);

기본 스트림즈 설정

2. 스트림즈 토폴로지 설정

StreamsBuilder builder = new StreamsBuilder();
KTable<String, String> addressTable = builder.table("address",Materialized.as("address"));

구체화된 뷰를 만들 토픽을 KTable로 선언합니다. 파라미터로 Materalized.as()도 추가합니다. 결과적으로 Materialized 인스턴스로 생성된 KeyValueStore가 로컬에 생성됩니다. 추가적으로 changelog도 생성됩니다. 

 

3. 토폴로지 실행

KafkaStreams streams = new KafkaStreams(builder.build(), configs);
streams.start();

토폴로지와 설정을 입력합니다

4. 구체화된 뷰 가져오기

view = streams.store(StoreQueryParameters.fromNameAndType("address", QueryableStoreTypes.keyValueStore()));

여기서 주의해야할 점은 streams가 시작되고 난 이후에 일정시간이 지나야 Materialized View가 생성된다는 점입니다. 생성되고 난 이후에 streams.store()메서드를 호출해야 정상적으로 동작합니다. 그렇지 않을 경우에는 다음과 같은 에러가 발생합니다.

Exception in thread "Timer-0" org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get state store address because the stream thread is STARTING, not RUNNING
	at org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:81)
	at org.apache.kafka.streams.state.internals.WrappingStoreProvider.stores(WrappingStoreProvider.java:50)
	at org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.all(CompositeReadOnlyKeyValueStore.java:119)
	at com.example.stateful.MaterializedTable$ScheduledJob.run(MaterializedTable.java:60)
	at java.util.TimerThread.mainLoop(Timer.java:555)
	at java.util.TimerThread.run(Timer.java:505)

상기 에러는 KTable로 선언한 Materialized View가 준비되지 않은 상태라는 뜻입니다.

5. keyValue스토어 가져오기

KeyValueIterator<String, String> address = view.all();
address.forEachRemaining(keyValue -> log.info(keyValue.toString()));

결과는 다음과 같이 key-value 형태로 출력됩니다.

[Timer-0] INFO com.example.stateful.MaterializedTable - KeyValue(somin, pusan)
[Timer-0] INFO com.example.stateful.MaterializedTable - KeyValue(cory, newyork)

필요시에는 get()메서드를 사용하여 hashMap의 value를 얻어오는 형태로도 사용할 수 있습니다.

반응형