The Connect framework offers a REST API that is used to manage the lifecycle of connectors. In most enterprises it's imperative to secure this API and add authorization to its endpoints. We could build authentication and authorization into the framework itself, but the security requirements are so broad that it's not practical to support all of them in the framework. Hence we must provide a way for users to plug in resources that achieve the required capabilities.
While security is a prime use case for this extension, it's not limited to that. Some of the common use cases are:
- Build a custom Authentication filter
- Build a custom Authorization filter
- Complex extensions can even provide filters that rewrite/validate the connector requests to enforce additional constraints on the connector configurations
https://cwiki.apache.org/confluence/display/KAFKA/KIP-285%3A+Connect+Rest+Extension+Plugin
KIP-285: Connect Rest Extension Plugin (status: Accepted, JIRA: KAFKA-6776, PR: https://github.com/apache/kafka/pull/4931, released in Kafka 2.0.0)
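For example, a custom authentication filter (the first use case above) can be plugged in by implementing ConnectRestExtension and registering a JAX-RS filter in its register() callback. Below is a minimal sketch, assuming the KIP-285 interfaces in org.apache.kafka.connect.rest; the filter class and its header check are illustrative only, not a production-ready implementation.

// Minimal sketch of a KIP-285 REST extension that registers a custom
// authentication filter; ExampleAuthFilter and its logic are illustrative.
import java.io.IOException;
import java.util.Map;

import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerRequestFilter;
import javax.ws.rs.core.Response;

import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.rest.ConnectRestExtensionContext;

public class ExampleAuthRestExtension implements ConnectRestExtension {

    @Override
    public void configure(Map<String, ?> configs) {
        // the worker properties are handed to the extension here
    }

    @Override
    public void register(ConnectRestExtensionContext restPluginContext) {
        // register a JAX-RS filter (or resource) with the Connect REST server
        restPluginContext.configurable().register(new ExampleAuthFilter());
    }

    @Override
    public String version() {
        return "1.0";
    }

    @Override
    public void close() throws IOException {
    }

    // illustrative filter: reject any request without an Authorization header
    static class ExampleAuthFilter implements ContainerRequestFilter {
        @Override
        public void filter(ContainerRequestContext requestContext) throws IOException {
            if (requestContext.getHeaderString("Authorization") == null) {
                requestContext.abortWith(Response.status(Response.Status.UNAUTHORIZED).build());
            }
        }
    }
}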
Debezium usage example
https://github.com/debezium/debezium-ui - a web UI for Debezium
// existing Connect REST endpoints used by the Debezium UI backend
@POST
@Path("/connectors/{connector-name}/restart")
@Consumes("application/json")
@Produces("application/json")
Response restartConnector(@PathParam("connector-name") String connectorName) throws ProcessingException, IOException;

@POST
@Path("/connectors/{connector-name}/tasks/{task-number}/restart")
@Consumes("application/json")
@Produces("application/json")
Response restartConnectorTask(@PathParam("connector-name") String connectorName, @PathParam("task-number") int taskNumber) throws ProcessingException, IOException;

// new endpoints provided by the Debezium Connect REST extension
@GET
@Path("/debezium/transforms")
@Produces("application/json")
List<TransformsInfo> listTransforms() throws ProcessingException, IOException;

@GET
@Path("/debezium/topic-creation")
@Produces("application/json")
Boolean isTopicCreationEnabled() throws ProcessingException, IOException;
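A minimal sketch of how a client such as the Debezium UI backend could call these endpoints with the standard JAX-RS client API; the worker URL is a placeholder and the responses are read as raw JSON strings to keep the example self-contained.

// Sketch: invoking the extension endpoints with the plain JAX-RS client.
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.core.MediaType;

public class DebeziumEndpointClientExample {
    public static void main(String[] args) {
        Client client = ClientBuilder.newClient();
        try {
            // GET /debezium/transforms - list the transformations visible to the worker
            String transformsJson = client.target("http://localhost:8083")
                    .path("debezium/transforms")
                    .request(MediaType.APPLICATION_JSON)
                    .get(String.class);

            // GET /debezium/topic-creation - is automatic topic creation effective?
            String topicCreationJson = client.target("http://localhost:8083")
                    .path("debezium/topic-creation")
                    .request(MediaType.APPLICATION_JSON)
                    .get(String.class);

            System.out.println(transformsJson);
            System.out.println(topicCreationJson);
        }
        finally {
            client.close();
        }
    }
}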
It makes use of these additional REST endpoints.
https://github.com/debezium/debezium - change data capture for a variety of databases
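These endpoints are served by a JAX-RS resource that the extension mounts in its register() callback, in the same way as the filter sketch above. A condensed sketch with an illustrative resource class (not Debezium's actual class names) is shown here; the real Debezium implementation of the two endpoints follows it.

// Sketch: a KIP-285 extension that mounts an additional JAX-RS resource;
// ExampleResource is a stand-in for Debezium's actual resource class.
import java.io.IOException;
import java.util.Map;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;

import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.rest.ConnectRestExtensionContext;

public class ExampleEndpointRestExtension implements ConnectRestExtension {

    @Override
    public void configure(Map<String, ?> configs) {
        // worker properties (e.g. topic.creation.enable) could be captured here
    }

    @Override
    public void register(ConnectRestExtensionContext restPluginContext) {
        // expose an additional resource on the Connect REST server
        restPluginContext.configurable().register(new ExampleResource());
    }

    @Override
    public String version() {
        return "1.0";
    }

    @Override
    public void close() throws IOException {
    }

    // stand-in resource: answers GET /example/ping with a fixed payload
    @Path("/example")
    public static class ExampleResource {
        @GET
        @Path("/ping")
        @Produces("application/json")
        public String ping() {
            return "\"pong\"";
        }
    }
}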
@GET
@Path("/transforms")
public List<TransformsInfo> listTransforms() {
    return this.getTransforms();
}

private synchronized List<TransformsInfo> getTransforms() {
    if (this.transforms.isEmpty()) {
        for (PluginDesc<Transformation<?>> plugin : herder.plugins().transformations()) {
            if ("org.apache.kafka.connect.runtime.PredicatedTransformation".equals(plugin.className())) {
                // PredicatedTransformation is a runtime wrapper, so expose the bundled predicates instead
                this.transforms.add(new TransformsInfo(HasHeaderKey.class.getName(), (new HasHeaderKey<>().config())));
                this.transforms.add(new TransformsInfo(RecordIsTombstone.class.getName(), (new RecordIsTombstone<>().config())));
                this.transforms.add(new TransformsInfo(TopicNameMatches.class.getName(), (new TopicNameMatches<>().config())));
            }
            else {
                this.transforms.add(new TransformsInfo(plugin));
            }
        }
    }
    return Collections.unmodifiableList(this.transforms);
}
@GET
@Path("/topic-creation")
public boolean getTopicCreationEnabled() {
    return this.isTopicCreationEnabled;
}

private synchronized Boolean isTopicCreationEnabled() {
    // enabled only if the running Connect version supports topic creation
    // and the topic.creation.enable worker property is not set to false
    Version kafkaConnectVersion = parseVersion(AppInfoParser.getVersion());
    String topicCreationProperty = (String) config.get("topic.creation.enable");
    if (null == topicCreationProperty) { // when config is not set, default to true
        topicCreationProperty = "true";
    }
    return TOPIC_CREATION_KAFKA_VERSION.compareTo(kafkaConnectVersion) <= 0
            && Boolean.parseBoolean(topicCreationProperty);
}
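To expose these endpoints, the extension jar has to be on the worker's plugin path and enabled through the KIP-285 worker property. A sketch of the worker configuration, assuming Debezium's extension class name io.debezium.kcrestextension.DebeziumConnectRestExtension:

# connect-distributed.properties (excerpt)
# enable the KIP-285 REST extension shipped with Debezium
rest.extension.classes=io.debezium.kcrestextension.DebeziumConnectRestExtension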