본문 바로가기

빅데이터/Kafka

카프카 스트림즈 join 사용시 메시지 키 접근하기

카프카 스트림즈에서 KStream 또는 KTable을 사용하여 join을 사용할 때가 있습니다.

KStream<String, String> completedEventsStream = leftStream.
        join(
            rightStream,
            (leftValue, rightValue) -> leftValue + rightValue,
            JoinWindows.of(windowDuration)
        );

상기와 같이 leftStream과 rightStream 2개의 스트림데이터를 조인하는 것은 아주 일반적인 조인 사용 예시인데요. 여기서 lambda 식을 보면 알 수 있다 싶이 leftValue와 rightValue에만 접근이 가능합니다. 즉, 2개의 토픽에서 조인이 되는 조인 key에 대해서는 접근이 불가능하다는 것을 알 수 있습니다. 2개의 토픽에 각각의 레코드의 메시지 값만 조합을 하는 것이죠.

 

현재는 join을 수행시에 ValueJoiner에서는 메시지 키에 접근할 수 없습니다. 이를 해결하고 개선하기 위해 KIP-149에서는 ValueJoiner, ValueTransformer, ValueMapper에서 메시지 키를 참조할 수 있도록 제안했습니다. 

https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner 

 

KIP-149: Enabling key access in ValueTransformer, ValueMapper, and ValueJoiner - Apache Kafka - Apache Software Foundation

Status Current state: "accepted" Discussion thread: HERE JIRA: KAFKA-4218, KAFKA-4726, KAFKA-3745, KAFKA-7842, KAFKA-7843 Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast). The PR ca

cwiki.apache.org

- Key access to ValueTransformer: While transforming values via KStream.transformValues and ValueTransformer, the key associated with the value may be needed, even if it is not changed. For instance, it may be used to access stores. As of now, the key is not available within these methods and interfaces, leading to the use of KStream.transform and Transformer, and the unnecessary creation of new KeyValue objects.
- Key access to ValueMapper: ValueMapper should have read-only access to the key for the value it is mapping. Sometimes the value transformation will depend on the key. It is possible to do this with a full blown KeyValueMapper but that loses the promise that you won't change the key – so you might introduce a re-keying phase that is totally unnecessary.
- Key access to ValueJoiner interface: In working with Kafka Stream joining, it's sometimes the case that a join key is not actually present in the values of the joins themselves (if, for example, a previous transform generated an ephemeral join key.) In such cases, the actual key of the join is not available in the ValueJoiner implementation to be used to construct the final joined value. This can be worked around by explicitly threading the join key into the value if needed, but it seems like extending the interface to pass the join key along as well would be helpful 

그러므로 현재는 조인 key에 접근하는 방법은 2가지 방법이 있습니다.

 

방법1 : 조인이 완료된 이후에 스트림의 메시지 키 살펴보기

방법2 : 프로세서API를 사용하여 조인 구현하기

 

위 2가지 방법 외에는 KIP-419를 기다리는 방법외에는 없습니다.

 

참고 링크 : https://stackoverflow.com/questions/56171906/get-record-key-with-inner-join-in-kafka-streams-dsl

 

 

반응형