I encountered a strange problem yesterday at work and I’d like some opinions.
I have a Oracle Debezium connector, configured to use the JSON converter:
"connector.class": "io.debezium.connector.oracle.OracleConnector",
"tasks.max": "1",
"topic.creation.default.partitions": "30",
"topic.creation.default.replication.factor": "3",
"topic.creation.default.cleanup.policy": "compact",
"database.history.kafka.topic": "TEST.SCHEMA.HISTORY",
"database.schema": "TEST",
"snapshot.delay.ms": "360000",
"poll.interval.ms": "2000",
"database.history.retention.hours": "24",
"database.out.server.name": "TEST.UAT.PRICE.TEST",
"topic.creation.default.compression.type": "snappy",
"database.user": "logminer",
"database.dbname": "UAT0221",
"event.processing.failure.handling.mode": "warn",
"database.connection.adapter": "logminer",
"database.history.kafka.bootstrap.servers": "mykafka:9092",
"database.server.name": "TEST.UAT.PRICE",
"database.port": "1521",
"heartbeat.interval.ms": "300000",
"database.history.producer.ssl.truststore.password": "confluenttruststorepass",
"consumer.override.auto.offset.reset": "earliest",
"database.hostname": "mydb",
"database.password": "mypd",
"name": "connector-t3",
"table.include.list": "TEST.PRICE,TEST.ARTICLE,TEST.STOCK",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"value.converter.schemas.enable": false,
"transforms": "suffix,unwrap",
"transforms.suffix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.suffix.regex": "TEST\\.UAT\\.PRICE\\.TEST\\.(.*)",
"transforms.suffix.replacement": "TEST.$1",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": false,
(some schema + db names have been changed)
This connector works fine. I developed a Kafka Stream app that is just a test. This app takes data from the TEST.PRICE
topic created by the above connector and then copies it to a TEST.PRICE.COPY
I then log the JSON data and try to add some ProcessorContext information into the data such as the topic name.
The problem I encounter when logging the data is that I sometimes (not for all records) get binary data, and not JSON as expected. I wrote a transformer that verify this:
private AtomicLong counter = new AtomicLong(0L);
class StoreCodeAdderTransformer implements Transformer<Bytes, Bytes, KeyValue<String, String>> {
private ProcessorContext context;
public void init(ProcessorContext context) {
this.context = context;
public KeyValue<String, String> transform(Bytes bytesKey, Bytes bytesValue) {
String key = new String(bytesKey.get());
String value = new String(bytesValue.get());
if(!key.startsWith("{") || !value.startsWith("{")) {
LOG.error("key: {} (hex: {})", key, Hex.encodeHexString(bytesKey.get()));
LOG.error("value: {} (hex: {})", value, Hex.encodeHexString(bytesValue.get()));
LOG.error("partition: {}, offset: {}", context.partition(), context.offset());
return new KeyValue<>(key, value);
public void close() {}
In my application logs, I get the following logs:
For the record, an example hex representation for the key is : 00000000193cd29a6480a0cae0e05a
And for the value: 000000001a00023cd29a6480a0cae0e05a02060088b80206524f4e000002020204312002144c505020202020202020020405fe000280a0cae0e05a02c0dd9ca7e15a02024c020603ab380000000000028090fdb2e15a000018312e342e302e416c706861320c6f7261636c651e4d4554492e5541542e505249434532eeee999de35d0008747275650e554154303232310e50524f533230310a4d4750564d0002e4babba5010000027202eeee999de35d00
(this is UAT, non sensitive data)
I can’t wrap my head around why some records are returned as binary data instead of expected text-encoded JSON.
What am I missing ? What could be the explanation ?
I ruled out:
- compression (source connector not configured as compressed) ; unless there is some kind of automatic compression by Debezium that would happens on some records only and that I’m not aware of.
- encoding issue (I’d expect fields value to be with weird characters, but JSON structure to be fine)