카프카 스트림즈는 상태기반/비상태기반 데이터 처리에 효과적인 라이브러리입니다. 특히 상태 기반 처리에 큰 도움이 되는데 여러 상태 기반 처리 중 SlidingWindow에 대해 살펴보고자 합니다.
SlidingWindow는 window종류 중 하나로서 일정 시간동안의 데이터들의 집합에 대해 연산을 하는 것을 뜻합니다.
마치 베란다의 슬라이딩 윈도우가 옆으로 지나가는 듯한 모습과 비슷합니다.
카프카 스트림즈에서는 aggregation연산 또는 join연산을 할 때 window를 적용할 수 있는데 총 4개의 윈도우를 지원합니다. 각 윈도우 이름과 특징은 다음과 같습니다.
- 호핑 윈도우 : 고정적인 사이즈의 윈도우, 윈도우끼리 겹치는 부분이 있음
- 텀블링 윈도우 : 고정적인 사이즈의 윈도우, 윈도우끼리 겹치는 부분이 없음
- 슬라이딩 윈도우 : 고정적인 사이즈의 윈도우, 레코드의 timestamp값에 따라 window가 겹칠 수도 안겹칠 수도 있음
- 세션 윈도우 : 윈도우 사이즈가 동적으로 변경됨. 데이터기반 윈도우
추가적으로 suppress()라고 불리는 윈도우 종료 연산도 지원합니다. 윈도우 종료 연산은 윈도우가 종료될 때 결괏값을 주는 데에 목적이 있습니다.
이 중 슬라이딩 윈도우에 대해 자세히 살펴보겠습니다. 고정된 사이즈의 윈도우로 이루어진 윈도우가 시간을 따라 계속 해서 움직이는 것을 뜻합니다. 만약 2개 이상의 레코드들이 동일한 윈도우 사이에 위치하게 되면 함께 연산이 이루어지게 됩니다. 그리고 윈도우가 시간에 따라 지날 때 레코드는 여러 번 연산에 참조될 수 있습니다.
위 그림에서 주의해야할 점은 윈도우에 속한 레코드의 개수가 달라지는 경우에만 윈도우 연산이 발동하는 것입니다. 예를 들어 아래와 같이 레코드가 2개가 있고 윈도우가 이동하더라도 단 한번만 연산이 발동합니다.
다음과 같은 레코드가 있을 경우를 예로 들 수 있습니다.
(timeDifferenceMs = 5000ms)
이 경우 아래와 같이 5번의 연산이 일어납니다.
- window [3000;8000] contains [1] (created when first record enters the window)
- window [4200;9200] contains [1,2] (created when second record enters the window)
- window [7400;12400] contains [1,2,3] (created when third record enters the window)
- window [8001;13001] contains [2,3] (created when the first record drops out of the window)
- window [9201;14201] contains [3] (created when the second record drops out of the window)
슬라이딩 윈도우는 다음과 같은 형태로 구현 됩니다.
import org.apache.kafka.streams.kstream.SlidingWindows;
// A sliding time window with a time difference of 10 minutes and grace period of 30 minutes
Duration timeDifferenceMs = Duration.ofMinutes(10);
Duration gracePeriodMs = Duration.ofMinutes(30);
SlidingWindows.withTimeDifferenceAndGrace(timeDifferenceMs,gracePeriodMs);
슬라이딩 윈도우의 윈도우 사이즈를 구하기 위해서는 다음과 같이 2가지 파라미터가 필요합니다.
- timeDifferenceMs : 서로 다른 두개의 레코드가 동일 윈도우에 들어가기 위한 최대 시간 간격
- gracePeriodMs : 레코드간에 timestamp의 순서를 허용하기 위한 시간
이 중 gace period의 동작은 독특한데요. 윈도우에서 레코드가 무조건 순서가 완벽하게 보장하지 않을 수 없습니다. 프로듀서가 여러개이고 파티션이 여러개인 이상 동일한 메시지 키라도 timestamp의 순서가 변경될 수 있습니다. 그래서 서로 다른 레코드 간의 timestamp가 일부 차이가 나더라도 허용을 하기 위해 사용됩니다.
참고자료
- https://kafka.apache.org/28/documentation/streams/developer-guide/dsl-api.html#windowing
- https://kafka-tutorials.confluent.io/sliding-windows/kstreams.html
- https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/TimeWindows.html
'빅데이터 > Kafka' 카테고리의 다른 글
couchbase 카프카 싱크 커넥트 사용 방법 (0) | 2021.10.29 |
---|---|
카프카 스트림즈에서 stateful window 처리를 다루는 방법 그리고 커밋타이밍 (4) | 2021.09.08 |
카프카 스트림즈로 schedule operation 수행하기(번역) (3) | 2021.07.23 |
카프카 스트림즈 join 사용시 메시지 키 접근하기 (0) | 2021.06.21 |
Cannot get state store TOPIC because the stream thread is STARTING, not RUNNING 에러 해결 (0) | 2021.06.16 |
카프카 스트림즈 KTable로 선언한 토픽을 key-value 테이블로 사용하기 (0) | 2021.06.16 |