The Connect framework offers a REST API that is used to manage the lifecycle of connectors. In most enterprises it is imperative to secure this API and to add authorization to its endpoints. Authentication and authorization could be built into the framework itself, but security requirements are so broad that supporting all of them there is not practical. Hence the framework must give users the ability to plug in resources that provide the required capabilities.
While security is the primary use case for this extension, it is not limited to that. Some common use cases are:
- Build a custom Authentication filter
- Build a custom Authorization filter
- Complex extensions can even provide filters that rewrite/validate the connector requests to enforce additional constraints on the connector configurations
https://cwiki.apache.org/confluence/display/KAFKA/KIP-285%3A+Connect+Rest+Extension+Plugin
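As a rough illustration of the first use case, the core check of a custom authentication filter can be sketched in plain Java. This is only a sketch under assumptions: `BasicAuthCheck` and its in-memory user map are hypothetical names invented here, not part of Kafka Connect. In a real extension this logic would typically sit inside a JAX-RS `ContainerRequestFilter` that a `ConnectRestExtension` implementation registers in its `register(ConnectRestExtensionContext)` method, with the extension class listed in the worker's `rest.extension.classes` setting.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: validates the value of an HTTP Basic "Authorization" header. */
final class BasicAuthCheck {
    // username -> password; an in-memory map is used here for illustration only
    private final Map<String, String> users;

    BasicAuthCheck(Map<String, String> users) {
        this.users = new HashMap<>(users);
    }

    /** Returns true only if the header names a known user with the matching password. */
    boolean isAuthorized(String authorizationHeader) {
        if (authorizationHeader == null || !authorizationHeader.startsWith("Basic ")) {
            return false;
        }
        String decoded;
        try {
            String token = authorizationHeader.substring("Basic ".length()).trim();
            decoded = new String(Base64.getDecoder().decode(token), StandardCharsets.UTF_8);
        } catch (IllegalArgumentException e) {
            return false; // the token was not valid Base64
        }
        int separator = decoded.indexOf(':');
        if (separator < 0) {
            return false; // expected "user:password"
        }
        String user = decoded.substring(0, separator);
        String password = decoded.substring(separator + 1);
        String expected = users.get(user);
        if (expected == null) {
            return false;
        }
        // Compare the byte arrays with MessageDigest.isEqual instead of String.equals
        return MessageDigest.isEqual(
                expected.getBytes(StandardCharsets.UTF_8),
                password.getBytes(StandardCharsets.UTF_8));
    }
}
```

A filter built around this check would abort the request with a 401 response whenever `isAuthorized` returns false and let it continue otherwise.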
Debezium usage example
@POST
@Path("/connectors/{connector-name}/restart")
@Consumes("application/json")
@Produces("application/json")
Response restartConnector(@PathParam("connector-name") String connectorName) throws ProcessingException, IOException;
@POST
@Path("/connectors/{connector-name}/tasks/{task-number}/restart")
@Consumes("application/json")
@Produces("application/json")
Response restartConnectorTask(@PathParam("connector-name") String connectorName, @PathParam("task-number") int taskNumber) throws ProcessingException, IOException;
// New endpoints
@GET
@Path("/debezium/transforms")
@Produces("application/json")
List<TransformsInfo> listTransforms() throws ProcessingException, IOException;
@GET
@Path("/debezium/topic-creation")
@Produces("application/json")
Boolean isTopicCreationEnabled() throws ProcessingException, IOException;
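For readers who want to try the two new endpoints without the client interface above, the requests can be sketched with the JDK's built-in `java.net.http` API (Java 11 or later). The class name `DebeziumEndpointRequests` is hypothetical, and the base URL `http://localhost:8083` in the usage note is an assumption (8083 is the default Kafka Connect REST port); only the two paths come from the interface.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical helper that builds GET requests for the Debezium extension endpoints. */
final class DebeziumEndpointRequests {

    /** Request for the list of available transforms. */
    static HttpRequest listTransforms(String baseUrl) {
        return get(baseUrl, "/debezium/transforms");
    }

    /** Request for the topic-creation flag. */
    static HttpRequest topicCreation(String baseUrl) {
        return get(baseUrl, "/debezium/topic-creation");
    }

    private static HttpRequest get(String baseUrl, String path) {
        // Drop a trailing slash so the joined URL never contains "//"
        String base = baseUrl.endsWith("/")
                ? baseUrl.substring(0, baseUrl.length() - 1)
                : baseUrl;
        return HttpRequest.newBuilder(URI.create(base + path))
                .header("Accept", "application/json")
                .GET()
                .build();
    }
}
```

Against a running worker, a request such as `DebeziumEndpointRequests.topicCreation("http://localhost:8083")` could then be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`.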
The extension exposes these additional REST endpoints as follows.
@GET
@Path("/transforms")
public List<TransformsInfo> listTransforms() {
    return this.getTransforms();
}

private synchronized List<TransformsInfo> getTransforms() {
    if (this.transforms.isEmpty()) {
        for (PluginDesc<Transformation<?>> plugin : herder.plugins().transformations()) {
            if ("org.apache.kafka.connect.runtime.PredicatedTransformation".equals(plugin.className())) {
                this.transforms.add(new TransformsInfo(HasHeaderKey.class.getName(), (new HasHeaderKey<>().config())));
                this.transforms.add(new TransformsInfo(RecordIsTombstone.class.getName(), (new RecordIsTombstone<>().config())));
                this.transforms.add(new TransformsInfo(TopicNameMatches.class.getName(), (new TopicNameMatches<>().config())));
            }
            else {
                this.transforms.add(new TransformsInfo(plugin));
            }
        }
    }
    return Collections.unmodifiableList(this.transforms);
}

@GET
@Path("/topic-creation")
public boolean getTopicCreationEnabled() {
    return this.isTopicCreationEnabled;
}

private synchronized Boolean isTopicCreationEnabled() {
    Version kafkaConnectVersion = parseVersion(AppInfoParser.getVersion());
    String topicCreationProperty = (String) config.get("topic.creation.enable");
    if (null == topicCreationProperty) { // when config is not set, default to true
        topicCreationProperty = "true";
    }
    return TOPIC_CREATION_KAFKA_VERSION.compareTo(kafkaConnectVersion) <= 0
            && Boolean.parseBoolean(topicCreationProperty);
}
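The topic-creation check above combines two conditions: the Kafka Connect version must be at least some minimum, and the `topic.creation.enable` worker property must not be set to false. The following standalone sketch mirrors that logic with plain JDK types. The class `TopicCreationCheck`, its version parser, and the threshold of 2.6.0 are assumptions made for illustration (2.6.0 is the release in which Kafka Connect gained topic creation for source connectors); the value of `TOPIC_CREATION_KAFKA_VERSION` itself is not shown in the snippet above.

```java
import java.util.Map;

/** Hypothetical standalone version of the topic-creation check. */
final class TopicCreationCheck {
    // Assumption: minimum Kafka Connect version that supports topic creation
    static final int[] MIN_VERSION = {2, 6, 0};

    /** Parses "major.minor.patch", ignoring suffixes such as "-SNAPSHOT". */
    static int[] parseVersion(String version) {
        String[] parts = version.split("[.-]");
        int[] parsed = new int[3]; // missing or non-numeric parts stay 0
        for (int i = 0; i < 3 && i < parts.length; i++) {
            try {
                parsed[i] = Integer.parseInt(parts[i]);
            } catch (NumberFormatException e) {
                break; // stop at the first non-numeric part
            }
        }
        return parsed;
    }

    /** Compares two three-part versions component by component. */
    static int compare(int[] a, int[] b) {
        for (int i = 0; i < 3; i++) {
            if (a[i] != b[i]) {
                return Integer.compare(a[i], b[i]);
            }
        }
        return 0;
    }

    static boolean isTopicCreationEnabled(String connectVersion, Map<String, ?> workerConfig) {
        Object raw = workerConfig.get("topic.creation.enable");
        // When the property is not set, default to true
        String property = raw == null ? "true" : raw.toString();
        return compare(MIN_VERSION, parseVersion(connectVersion)) <= 0
                && Boolean.parseBoolean(property);
    }
}
```

Under these assumptions, a worker reporting version 2.5.1 yields false regardless of the property, while a 2.6.0 or later worker yields true unless `topic.creation.enable` is explicitly set to false.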