카프카 커넥트는 카프카 클러스터와 기타 데이터베이스간 파이프라인을 반복적으로 만드는데 특화되어 있습니다. 카프카 커넥터는 파이프라인의 구현체 인데요. 싱크 커넥터와 소스 커넥터로 이루어져 있습니다.
여기서는 카우치 베이스싱크 커넥터를 살펴봅니다. 카우치베이스 싱크 커넥터는 카프카의 토픽을 카우치베이스로 저장 로직이 담긴 커넥터입니다. 카우치베이스 싱크 커넥터는 at least once 전달을 지원하며 중복이 발생했을 경우에는 재입수가 필요할 수도 있습니다.
카프카 싱크 커넥터를 사용하려면 깃허브 레포지토리에 있는 배포판을 다운받아서 커넥터에 포함시켜 사용할 수 있습니다.
- 카우치베이스 커넥터 깃헙 : https://github.com/couchbase/kafka-connect-couchbase
- 카우치베이스 배포판 다운로드 : https://docs.couchbase.com/kafka-connector/current/release-notes.html
카우치 베이스 커넥트 설치
1) 카우치 베이스 커넥터 다운로드
커넥터는 Realease Note에서 다운로드 가능
https://docs.couchbase.com/kafka-connector/current/release-notes.html
다운로드 받으면 다음과 같이 파일이 위치합니다.
$ tree
.
├── assets
│ └── couchbase_logo.png
├── doc
│ ├── LICENSE
│ └── README.adoc
├── etc
│ ├── migrate-config-3-to-4.sh
│ ├── quickstart-couchbase-sink.json
│ ├── quickstart-couchbase-sink.properties
│ ├── quickstart-couchbase-source.json
│ └── quickstart-couchbase-source.properties
├── lib
│ ├── HdrHistogram-2.1.12.jar
│ ├── LatencyUtils-2.0.3.jar
│ ├── core-io-2.2.0.jar
│ ├── dcp-client-0.37.0.jar
│ ├── java-client-3.2.0.jar
│ ├── jsoup-1.14.2.jar
│ ├── kafka-connect-couchbase-4.1.3.jar
│ ├── metrics-core-4.0.7.jar
│ ├── metrics-jmx-4.0.7.jar
│ ├── micrometer-core-1.5.5.jar
│ ├── micrometer-registry-jmx-1.5.5.jar
│ ├── reactive-streams-1.0.3.jar
│ ├── reactor-core-3.4.6.jar
│ ├── slf4j-api-1.7.30.jar
│ └── therapi-runtime-javadoc-0.12.0.jar
└── manifest.json
위 파일들 중 kafka-connect-couchbase-4.1.3.jar는 커넥터 파일이고 나머지는 실행에 필요한 라이브러리 파일들입니다.
2-1) 커넥터 파일 위치 설정
$ tree
.
└── connector-plugins
└── kafka-connect-couchbase-4.1.3.jar
커넥터 파일은 커넥트에서 설정한 디렉토리에 커넥터 jar파일들과 함께 둡니다.
2-2) 라이브러리 파일 위치 설정
$ tree
.
├── LICENSE
├── NOTICE
├── libs
│ ├── activation-1.1.1.jar
│ ├── aopalliance-repackaged-2.6.1.jar
│ ├── argparse4j-0.7.0.jar
│ ├── audience-annotations-0.5.0.jar
│ ├── commons-cli-1.4.jar
│ ├── commons-lang3-3.8.1.jar
│ ├── connect-api-2.7.0.jar
│ ├── connect-basic-auth-extension-2.7.0.jar
라이브러리 파일은 카프카 커넥트를 실행하는 카프카 바이너리 디렉토리 내부에 위치합니다. 카프카 커넥트를 실행시 여기 있는 jar파일을 포함하여 실행됩니다.
카우치 베이스 싱크 커넥트 설정
아래 설정 중 bold체는 필수 설정입니다. 나머지는 기본값으로 자동 설정됩니다.
https://docs.couchbase.com/kafka-connector/current/sink-configuration-options.html
Connection 설정
- couchbase.seed.nodes : 카우치베이스 서버 노드들을 콤마로 나누어 입력합니다
- couchbase.username : 카우치베이스 유저 id 값을 입력합니다
- couchbase.password : 카우칩에이스 유저의 password 값을 입력합니다
- couchbase.bucket : 카우치베이스에 연동할 버킷 이름을 입력합니다
- couchbase.network
- couchbase.bootstrap.timeout
Security 설정
- couchbase.enable.tls
- couchbase.enable.hostname.verification
- couchbase.trust.store.path
- couchbase.trust.store.password
- couchbase.trust.certificate.path
- couchbase.client.certificate.path
- couchbase.client.certificate.passowrd
Logging 설정
- couchbase.log.redaction
- couchbase.log.document.lifecycle
Sink Behavior 설정
- couchbase.default.collection
- couchbase.topic.to.collection
- couchbase.sink.handler
- couchbase.document.id
- couchbase.remove.document.id
- couchbase.document.expiration
Durability 설정
- couchbase.durability
- couchbase.persist.to
- couchbase.replicate.to
N1ql Sink Handler 설정
- couchbase.n1ql.operation
- couchbase.n1ql.where.fields
- couchbase.nq1l.create.document
Sub Document Sink Handler 설정
- couchbase.subdocument.path
- couchbase.subdocument.operation
- couchbase.subdocument.create.path
- couchbase.subdocument.create.document
Couchbase Java SDK 설정
- couchbase.env.*
카우치 베이스 싱크 커넥터 파이프라인 추가
POST http://localhost:8083/connectors
{
"name": "couchbase-sink-test",
"config": {
"connector.class": "com.couchbase.connect.kafka.CouchbaseSinkConnector",
"tasks.max": "1",
"topics": "sink-test",
"couchbase.seed.nodes": "local-couchbase",
"couchbase.bucket": "test",
"couchbase.username": "admin",
"couchbase.password": "admin",
"value.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
}
}
org.apache.kafka.connect.json.JsonConverter로 컨버터를 설정할 경우 String으로 들어온 json 값을 json으로 포맷을 변경하여 카우치 베이스에 데이터를 저장합니다.
'빅데이터 > Kafka' 카테고리의 다른 글
confluent developer certification 예시 문제 해설 (0) | 2021.12.15 |
---|---|
confluent HdfsSinkConnector 파티셔너 설명 (0) | 2021.11.25 |
macos에서 podman으로 rest-proxy 실행하기 (0) | 2021.11.16 |
카프카 스트림즈에서 stateful window 처리를 다루는 방법 그리고 커밋타이밍 (4) | 2021.09.08 |
카프카 스트림즈로 schedule operation 수행하기(번역) (3) | 2021.07.23 |
카프카 스트림즈에서 SlidingWindow에 대한 고찰 (0) | 2021.06.25 |