1. Schema Registry configuration and startup
Configuration file: confluent-7.0.0/etc/schema-registry/schema-registry.properties
listeners=http://0.0.0.0:8081
kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092
kafkastore.topic=_schemas
debug=false
Starting the Schema Registry
$ bin/schema-registry-start etc/schema-registry/schema-registry.properties
[2021-12-16 21:36:27,121] INFO SchemaRegistryConfig values:
access.control.allow.headers =
access.control.allow.methods =
access.control.allow.origin =
access.control.skip.options = true
authentication.method = NONE
authentication.realm =
authentication.roles = [*]
...
[2021-12-16 21:36:30,410] INFO Started NetworkTrafficServerConnector@7a4ccb53{HTTP/1.1, (http/1.1)}{0.0.0.0:8081} (org.eclipse.jetty.server.AbstractConnector:331)
[2021-12-16 21:36:30,410] INFO Started @5668ms (org.eclipse.jetty.server.Server:415)
[2021-12-16 21:36:30,411] INFO Schema Registry version: 7.0.0 commitId: b16f82f4c22bd32ed24a8e935bf90ac30ee69c5c (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:45)
[2021-12-16 21:36:30,411] INFO Server started, listening for requests... (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:47)
The default schema.compatibility.level is backward. Backward compatibility means that when a producer sends records written with schema version n-1 or version n, a consumer reading with schema version n can still process them.
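To confirm the active level, the registry's /config endpoint returns it (the same information is available with curl localhost:8081/config). A minimal sketch using the JDK 11+ HttpClient, assuming the registry above is running on http://localhost:8081; the class name is arbitrary:
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CompatibilityCheck {
    public static void main(String[] args) throws Exception {
        // GET /config returns the global compatibility level, e.g. {"compatibilityLevel":"BACKWARD"}
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8081/config"))
                .GET()
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
    }
}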
Connecting to the Schema Registry and checking that it responds
$ curl localhost:8081
{}
$ curl localhost:8081/subjects
[]
2. Creating the topic
$ ./kafka-topics.sh --bootstrap-server localhost:9092 --list
test
$ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic test --describe
Topic: test PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: test Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: test Partition: 2 Leader: 0 Replicas: 0 Isr: 0
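The test topic above was created beforehand. For reference, a minimal sketch that creates an equivalent topic (3 partitions, replication factor 1) programmatically with the Kafka AdminClient; the class name is illustrative, and the kafka-clients dependency from the build.gradle in the next step is assumed. The equivalent kafka-topics.sh --create command works just as well.
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class CreateTestTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 1, matching the describe output above
            NewTopic topic = new NewTopic("test", 3, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}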
3. Sending data with a producer
build.gradle
plugins {
    id 'java'
}

group 'org.com.example'
version '1.0-SNAPSHOT'

repositories {
    mavenCentral()
    maven {
        url "https://packages.confluent.io/maven"
    }
}

dependencies {
    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.0'
    implementation 'org.apache.kafka:kafka-clients:2.5.0'
    implementation 'org.slf4j:slf4j-simple:1.7.30'
    implementation 'io.confluent:kafka-avro-serializer:7.0.1'
}

test {
    useJUnitPlatform()
}
When setting up build.gradle, Confluent's Maven repository must be added; otherwise the Confluent libraries (kafka-avro-serializer) cannot be resolved.
Main.java
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.Schema;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
public class Main {
    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.setProperty("value.serializer", KafkaAvroSerializer.class.getName());
        configs.setProperty("schema.registry.url", "http://localhost:8081");

        String schema = "{"
                + "\"namespace\": \"myrecord\","
                + " \"name\": \"orders\","
                + " \"type\": \"record\","
                + " \"fields\": ["
                + "     {\"name\": \"orderTime\", \"type\": \"long\"},"
                + "     {\"name\": \"orderId\", \"type\": \"long\"},"
                + "     {\"name\": \"itemId\", \"type\": \"string\"}"
                + " ]"
                + "}";
        Schema.Parser parser = new Schema.Parser();
        Schema avroSchema1 = parser.parse(schema);

        // generate avro generic record
        GenericRecord avroRecord = new GenericData.Record(avroSchema1);
        avroRecord.put("orderTime", System.nanoTime());
        avroRecord.put("orderId", new Random().nextLong());
        avroRecord.put("itemId", UUID.randomUUID().toString());

        KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(configs);
        ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(TOPIC_NAME, avroRecord);
        producer.send(record);
        producer.flush();
        producer.close();
    }
}
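producer.send() is asynchronous, so flush() and close() only guarantee that the record left the client buffer. As an optional sketch, a callback can replace the plain producer.send(record) line above to report where the record actually landed; it uses only classes already referenced in the Main.java above.
// Optional: replace producer.send(record) with a callback version to log the
// partition and offset the record was written to (or the failure, if any).
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();
    } else {
        System.out.printf("sent to %s-%d@%d%n",
                metadata.topic(), metadata.partition(), metadata.offset());
    }
});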
4. Checking what was registered in the Schema Registry
$ curl localhost:8081/subjects
["test-value"]
$ curl localhost:8081/subjects/test-value/versions
[1]
$ curl localhost:8081/subjects/test-value/versions/1
{
"subject": "test-value",
"version": 1,
"id": 1,
"schema": "{\"type\":\"record\",\"name\":\"orders\",\"namespace\":\"myrecord\",\"fields\":[{\"name\":\"orderTime\",\"type\":\"long\"},{\"name\":\"orderId\",\"type\":\"long\"},{\"name\":\"itemId\",\"type\":\"string\"}]}"
}
The subject name was auto-generated as {topic name}-value. Since the subject was newly created, both the version and the ID are 1.
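The same metadata can also be read programmatically with the schema registry client that kafka-avro-serializer already pulls in transitively. A minimal sketch; the class name and the cache size of 100 are arbitrary choices:
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;

public class SubjectCheck {
    public static void main(String[] args) throws Exception {
        // 100 is the client-side cache size for schemas
        CachedSchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://localhost:8081", 100);
        System.out.println(client.getAllSubjects());              // e.g. [test-value]
        SchemaMetadata latest = client.getLatestSchemaMetadata("test-value");
        System.out.println(latest.getId() + " / " + latest.getVersion());
        System.out.println(latest.getSchema());                   // the registered Avro schema JSON
    }
}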
5. Checking the data with a consumer
Main.java
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class Main {
    private final static Logger logger = LoggerFactory.getLogger(Main.class);
    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";
    private final static String GROUP_ID = "test-group";

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        configs.put("schema.registry.url", "http://localhost:8081");

        KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while (true) {
            ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, GenericRecord> record : records) {
                logger.info("=====");
                logger.info("record:{}", record);
                logger.info("value:{}", record.value().toString());
            }
        }
    }
}
Execution log
9:44:11 PM: Executing task ':Main.main()'...
> Task :compileJava
> Task :processResources NO-SOURCE
> Task :classes
> Task :Main.main()
[main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-test-group-1
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = test-group
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 45000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class io.confluent.kafka.serializers.KafkaAvroDeserializer
[main] INFO io.confluent.kafka.serializers.KafkaAvroDeserializerConfig - KafkaAvroDeserializerConfig values:
auto.register.schemas = true
avro.reflection.allow.null = false
avro.use.logical.type.converters = false
basic.auth.credentials.source = URL
basic.auth.user.info = [hidden]
bearer.auth.credentials.source = STATIC_TOKEN
bearer.auth.token = [hidden]
context.name.strategy = class io.confluent.kafka.serializers.context.NullContextNameStrategy
id.compatibility.strict = true
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
latest.compatibility.strict = true
max.schemas.per.subject = 1000
normalize.schemas = false
proxy.host =
proxy.port = -1
schema.reflection = false
schema.registry.basic.auth.user.info = [hidden]
schema.registry.ssl.cipher.suites = null
schema.registry.ssl.enabled.protocols = [TLSv1.2]
schema.registry.ssl.endpoint.identification.algorithm = https
schema.registry.ssl.engine.factory.class = null
schema.registry.ssl.key.password = null
schema.registry.ssl.keymanager.algorithm = SunX509
schema.registry.ssl.keystore.certificate.chain = null
schema.registry.ssl.keystore.key = null
schema.registry.ssl.keystore.location = null
schema.registry.ssl.keystore.password = null
schema.registry.ssl.keystore.type = JKS
schema.registry.ssl.protocol = TLSv1.2
schema.registry.ssl.provider = null
schema.registry.ssl.secure.random.implementation = null
schema.registry.ssl.trustmanager.algorithm = PKIX
schema.registry.ssl.truststore.certificates = null
schema.registry.ssl.truststore.location = null
schema.registry.ssl.truststore.password = null
schema.registry.ssl.truststore.type = JKS
schema.registry.url = [http://localhost:8081]
specific.avro.reader = false
use.latest.version = false
use.schema.id = -1
value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 7.0.1-ccs
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: b7e52413e7cb3e8b
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1639658652510
[main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-test-group-1, groupId=test-group] Subscribed to topic(s): test
[main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-group-1, groupId=test-group] Cluster ID: k41kHM4_RP6i3Lx7MYeeXA
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Request joining group due to: need to re-join with the given member-id
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Successfully joined group with generation Generation{generationId=1, memberId='consumer-test-group-1-3947892d-396a-453c-94a3-cc2e7bfa36f6', protocol='range'}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Finished assignment for group at generation 1: {consumer-test-group-1-3947892d-396a-453c-94a3-cc2e7bfa36f6=Assignment(partitions=[test-0, test-1, test-2])}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Successfully synced group in generation Generation{generationId=1, memberId='consumer-test-group-1-3947892d-396a-453c-94a3-cc2e7bfa36f6', protocol='range'}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Notifying assignor about the new Assignment(partitions=[test-0, test-1, test-2])
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Adding newly assigned partitions: test-1, test-0, test-2
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Found no committed offset for partition test-1
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Found no committed offset for partition test-0
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Found no committed offset for partition test-2
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-group-1, groupId=test-group] Resetting offset for partition test-1 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}.
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-group-1, groupId=test-group] Resetting offset for partition test-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}.
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-group-1, groupId=test-group] Resetting offset for partition test-2 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}.
[main] INFO com.example.Main - =====
[main] INFO com.example.Main - record:ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1639658381544, serialized key size = -1, serialized value size = 60, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"orderTime": 1473982448382288, "orderId": -7246011478677708355, "itemId": "33ce3a10-9277-4240-a7f0-db11bc4d11e3"})
[main] INFO com.example.Main - value:{"orderTime": 1473982448382288, "orderId": -7246011478677708355, "itemId": "33ce3a10-9277-4240-a7f0-db11bc4d11e3"}
6. Checking the data with kafka-console-consumer.sh
$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
��탉����提À��H33ce3a10-9277-4240-a7f0-db11bc4d11e3
As expected, since this data was serialized with Avro, it cannot be printed as a plain String. In other words, the consumer must be run with the Schema Registry, or with an Avro schema it already knows, in order to read these records.
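To illustrate the point, a registry-aware deserializer can decode those raw bytes because the serialized value carries the schema ID. A minimal sketch; the decode helper, its byte[] input, and the class name are hypothetical:
import io.confluent.kafka.serializers.KafkaAvroDeserializer;

import java.util.Map;

public class RawValueDecoder {
    // valueBytes is a hypothetical input: the raw value bytes of one record from the "test" topic
    static Object decode(byte[] valueBytes) {
        KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
        // false = configure as a value (not key) deserializer
        deserializer.configure(Map.of("schema.registry.url", "http://localhost:8081"), false);
        return deserializer.deserialize("test", valueBytes); // a GenericRecord on success
    }
}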
7. Registering a new schema in the Schema Registry
$ echo '{
"type": "record",
"name": "orders",
"fields": [
{
"name": "orderTime",
"type": "long"
},
{
"name": "orderId",
"type": "long"
},
{
"name": "itemNo",
"type": "int",
"default": 0
}
]
}' | \
jq '. | {schema: tojson}' | \
curl -X POST http://localhost:8081/subjects/test-value/versions \
-H "Content-Type:application/json" \
-d @-
{"id":2}
$ curl http://localhost:8081/subjects/test-value/versions
[1,2]
$ curl http://localhost:8081/subjects/test-value/versions/2
{
"subject": "test-value",
"version": 2,
"id": 2,
"schema": "{\"type\":\"record\",\"name\":\"orders\",\"fields\": [{\"name\":\"orderTime\",\"type\":\"long\"},{\"name\":\"orderId\",\"type\":\"long\"},{\"name\":\"itemNo\",\"type\":\"int\",\"default\":0}]}"
}
For a Schema Registry REST API cheat sheet, see: https://rmoff.net/2019/01/17/confluent-schema-registry-rest-api-cheatsheet/
Backward compatibility means that, given version 1 and version 2, a consumer reading with version 2 can read data written with either version 1 or version 2; in other words, it can read older versions. The changes allowed under backward compatibility are deleting a field or adding a field that has a default value. In the case above, itemId was deleted and a new field itemNo with a default value of 0 was added.
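Before registering, a candidate schema can also be checked against the subject's compatibility rules. A minimal sketch using the schema registry client's compatibility test, which is equivalent to the REST endpoint POST /compatibility/subjects/test-value/versions/latest; the class name is illustrative and the client library is assumed to come in transitively with kafka-avro-serializer:
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;

public class CompatibilityTest {
    public static void main(String[] args) throws Exception {
        CachedSchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://localhost:8081", 100);
        // itemId removed, itemNo added with a default -> expected to pass a BACKWARD check
        String candidate = "{"
                + "\"type\": \"record\", \"name\": \"orders\","
                + "\"fields\": ["
                + "  {\"name\": \"orderTime\", \"type\": \"long\"},"
                + "  {\"name\": \"orderId\", \"type\": \"long\"},"
                + "  {\"name\": \"itemNo\", \"type\": \"int\", \"default\": 0}"
                + "]}";
        boolean compatible = client.testCompatibility("test-value", new AvroSchema(candidate));
        System.out.println(compatible); // true if the candidate is accepted for this subject
    }
}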
To confirm that records are delivered correctly under both schemas, let's send one record with each schema. The producer's Main.java was modified as follows to send records with the two schemas.
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.setProperty("value.serializer", KafkaAvroSerializer.class.getName());
configs.setProperty("schema.registry.url", "http://localhost:8081");
KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(configs);
// send a record with the version-1 schema
String schema = "{"
+ "\"namespace\": \"myrecord\","
+ " \"name\": \"orders\","
+ " \"type\": \"record\","
+ " \"fields\": ["
+ " {\"name\": \"orderTime\", \"type\": \"long\"},"
+ " {\"name\": \"orderId\", \"type\": \"long\"},"
+ " {\"name\": \"itemId\", \"type\": \"string\"}"
+ " ]"
+ "}";
Schema.Parser parser = new Schema.Parser();
Schema avroSchema1 = parser.parse(schema);
GenericRecord avroRecord = new GenericData.Record(avroSchema1);
avroRecord.put("orderTime", System.nanoTime());
avroRecord.put("orderId", new Random().nextLong());
avroRecord.put("itemId", UUID.randomUUID().toString());
ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(TOPIC_NAME, avroRecord);
producer.send(record);
// send a record with the version-2 schema
String schema2 = "{"
+ "\"namespace\": \"myrecord\","
+ " \"name\": \"orders\","
+ " \"type\": \"record\","
+ " \"fields\": ["
+ " {\"name\": \"orderTime\", \"type\": \"long\"},"
+ " {\"name\": \"orderId\", \"type\": \"long\"},"
+ " {\"name\": \"itemNo\", \"type\": \"int\"}"
+ " ]"
+ "}";
Schema.Parser parser2 = new Schema.Parser();
Schema avroSchema2 = parser2.parse(schema2);
GenericRecord avroRecord2 = new GenericData.Record(avroSchema2);
avroRecord2.put("orderTime", System.nanoTime());
avroRecord2.put("orderId", new Random().nextLong());
avroRecord2.put("itemNo", 123);
ProducerRecord<String, GenericRecord> record2 = new ProducerRecord<>(TOPIC_NAME, avroRecord2);
producer.send(record2);
producer.flush();
producer.close();
The data printed by the consumer is as follows.
[main] INFO com.example.Main - =====
[main] INFO com.example.Main - record:ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1639659750486, serialized key size = -1, serialized value size = 59, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"orderTime": 1475351495838191, "orderId": -111837104074635996, "itemId": "921031d6-900f-4dd3-9a5f-2cdc984ae6bc"})
[main] INFO com.example.Main - value:{"orderTime": 1475351495838191, "orderId": -111837104074635996, "itemId": "921031d6-900f-4dd3-9a5f-2cdc984ae6bc"}
[main] INFO com.example.Main - =====
[main] INFO com.example.Main - record:ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 0, CreateTime = 1639659750587, serialized key size = -1, serialized value size = 24, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"orderTime": 1475351787197929, "orderId": 2076111075740945742, "itemNo": 123})
[main] INFO com.example.Main - value:{"orderTime": 1475351787197929, "orderId": 2076111075740945742, "itemNo": 123}
With backward compatibility the deployment order matters: deploy the consumer that understands the new n+1 schema first, and only then roll the producers from version n to n+1, so that compatibility is maintained throughout the rollout.
However, as the test above shows, when consuming into GenericRecord the consumer is not strict about the schema during deserialization. To make it strict, the following option has to be added.
properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
Reference: On-Premises Schema Registry Tutorial | Confluent Documentation (docs.confluent.io)
By default, each record is deserialized into an Avro GenericRecord, but in this tutorial the record should be deserialized using the application’s code-generated Payment class. Therefore, configure the deserializer to use Avro SpecificRecord, i.e., SPECIFIC_AVRO_READER_CONFIG should be set to true. For example:
...
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
...
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
...
KafkaConsumer<String, Payment> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC));
while (true) {
ConsumerRecords<String, Payment> records = consumer.poll(100);
for (ConsumerRecord<String, Payment> record : records) {
String key = record.key();
Payment value = record.value();
}
}
...
Running the consumer again from earliest now produces the following error.
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-group3-1, groupId=test-group3] Resetting offset for partition test-2 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}.
Exception in thread "main" org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition test-1 at offset 0. If needed, please seek past the record to continue consumption.
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1429)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:134)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1652)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1800(Fetcher.java:1488)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:721)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:672)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1304)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
at com.example.Main.main(Main.java:39)
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class myrecord.orders specified in writer's schema whilst finding reader's schema for a SpecificRecord.
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class myrecord.orders specified in writer's schema whilst finding reader's schema for a SpecificRecord.
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.getSpecificReaderSchema(AbstractKafkaAvroDeserializer.java:281)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.getReaderSchema(AbstractKafkaAvroDeserializer.java:252)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.getDatumReader(AbstractKafkaAvroDeserializer.java:196)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.read(AbstractKafkaAvroDeserializer.java:391)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:114)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88)
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1420)
... 9 more
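The cause: with specific.avro.reader=true, the deserializer tries to resolve the writer schema's full name (myrecord.orders) to a generated SpecificRecord class, and no such class exists on the classpath because only GenericRecord was used here. A sketch of how the consumer from step 5 would look once such a class has been generated from the registered schema, for example with avro-tools or an Avro Gradle/Maven plugin; the myrecord.orders class below is hypothetical and does not exist in this post, which is exactly why the error above occurs:
// Sketch only: assumes a SpecificRecord class myrecord.orders generated from the schema,
// and an extra import of io.confluent.kafka.serializers.KafkaAvroDeserializerConfig.
configs.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

KafkaConsumer<String, myrecord.orders> consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
    ConsumerRecords<String, myrecord.orders> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, myrecord.orders> record : records) {
        myrecord.orders value = record.value(); // typed access instead of GenericRecord
        logger.info("value:{}", value);
    }
}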