org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:609)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)\nCaused by: java.lang.NullPointerException
위 이슈는 Task클래스 내부에서 이슈가 발생하여 태스크가 중단된 상태를 뜻한다. 커넥트 로그를 확인하여 어떤 에러가 발생하였는지 확인하면 된다.
예를 들어 SinkConnector의 record가 null이 왔을 때 충분히 대응하지 못했을 경우가 있다. try, catch문으로 에러 구문을 충분히 감싸지 않는다면 위 에러와 함께 태스크가 종료(중단)된 상태로 대기할 수 있다.
@Override
public void put(Collection<SinkRecord> records) {
for (SinkRecord record : records) {
if (record.value() != null) {
반응형
'빅데이터 > Kafka' 카테고리의 다른 글
카프카 커넥터의 태스크에 Priority를 부여할 수 없을까? (0) | 2022.10.04 |
---|---|
토픽을 GlobalKTable 구체화된 뷰(view) 키-값 저장소로 사용시 특이점 및 주의사항 (0) | 2022.09.15 |
카프카 커넥트/커넥터 내부 살펴보기 - 2.8기준, sinkTask 위주로 (2) | 2022.08.30 |
WINDOW STORE CHANGE LOG ADDITIONAL RETENTION MS CONFIG 옵션 분석 (0) | 2022.06.15 |
초급자를 위한 [아파치 카프카 애플리케이션 개발]온라인 강의를 출시하였습니다. (0) | 2022.06.02 |
카프카를 활용한레이싱카 센서 실시간 수집 데이터 파이프라인 구축 (4) | 2022.05.02 |