Binary data extracted from a JSON encoded connector

I encountered a strange problem yesterday at work and I’d like some opinions.

I have an Oracle Debezium connector, configured to use the JSON converter:

{
  "connector.class": "io.debezium.connector.oracle.OracleConnector",
  "tasks.max": "1",
  "topic.creation.default.partitions": "30",
  "topic.creation.default.replication.factor": "3",
  "topic.creation.default.cleanup.policy": "compact",
  "database.history.kafka.topic": "TEST.SCHEMA.HISTORY",
  "database.schema": "TEST",
  "snapshot.delay.ms": "360000",
  "poll.interval.ms": "2000",
  "database.history.retention.hours": "24",
  "database.out.server.name": "TEST.UAT.PRICE.TEST",
  "topic.creation.default.compression.type": "snappy",
  "database.user": "logminer",
  "database.dbname": "UAT0221",
  "event.processing.failure.handling.mode": "warn",
  "database.connection.adapter": "logminer",
  "database.history.kafka.bootstrap.servers": "mykafka:9092",
  "database.server.name": "TEST.UAT.PRICE",
  "database.port": "1521",
  "heartbeat.interval.ms": "300000",
  "database.history.producer.ssl.truststore.password": "confluenttruststorepass",
  "consumer.override.auto.offset.reset": "earliest",
  "database.hostname": "mydb",
  "database.password": "mypd",
  "name": "connector-t3",
  "table.include.list": "TEST.PRICE,TEST.ARTICLE,TEST.STOCK",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable": false,
  "value.converter.schemas.enable": false,
  "transforms": "suffix,unwrap",
  "transforms.suffix.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.suffix.regex": "TEST\\.UAT\\.PRICE\\.TEST\\.(.*)",
  "transforms.suffix.replacement": "TEST.$1",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.drop.tombstones": false,
}

(some schema + db names have been changed)
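For clarity, Debezium names its change topics <database.server.name>.<schema>.<table>, so the suffix transform above rewrites e.g. TEST.UAT.PRICE.TEST.PRICE into TEST.PRICE, which is the topic my Streams app reads below. A quick standalone sketch of that rename (the class name is just illustrative):

public class TopicRenameSketch {
    public static void main(String[] args) {
        // Debezium topic name: <database.server.name>.<schema>.<table>
        String source = "TEST.UAT.PRICE.TEST.PRICE";

        // same regex / replacement as the "suffix" RegexRouter transform above
        String renamed = source.replaceFirst("TEST\\.UAT\\.PRICE\\.TEST\\.(.*)", "TEST.$1");

        System.out.println(renamed); // prints TEST.PRICE
    }
}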

This connector works fine. I developed a Kafka Streams app that is just a test: it takes the data from the TEST.PRICE topic created by the above connector and copies it to a TEST.PRICE.COPY topic.
I then log the JSON data and try to add some ProcessorContext information to it, such as the topic name.

The problem I encounter when logging the data is that some records (not all of them) come back as binary data instead of the JSON I expect. I wrote a transformer that verifies this:

import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.codec.binary.Hex;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

    // counts how many records did not look like JSON
    private final AtomicLong counter = new AtomicLong(0L);

    class StoreCodeAdderTransformer implements Transformer<Bytes, Bytes, KeyValue<String, String>> {
        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }

        @Override
        public KeyValue<String, String> transform(Bytes bytesKey, Bytes bytesValue) {
            String key = new String(bytesKey.get());

            // tombstones can arrive with a null value (drop.tombstones is false above)
            if (bytesValue == null) {
                return new KeyValue<>(key, null);
            }

            String value = new String(bytesValue.get());

            // a JSON document should start with '{'; dump anything that does not
            if (!key.startsWith("{") || !value.startsWith("{")) {
                LOG.error("key: {} (hex: {})", key, Hex.encodeHexString(bytesKey.get()));
                LOG.error("value: {} (hex: {})", value, Hex.encodeHexString(bytesValue.get()));
                LOG.error("partition: {}, offset: {}", context.partition(), context.offset());
                counter.incrementAndGet();
            }
            return new KeyValue<>(key, value);
        }

        @Override
        public void close() {}
    }
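For completeness, this is roughly how such a transformer can be wired into the copy topology. It is only a sketch using the topic names above; the method name buildCopyTopology and its placement are illustrative:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

    // Sketch: assumed to live in the same class as StoreCodeAdderTransformer above.
    private Topology buildCopyTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("TEST.PRICE", Consumed.with(Serdes.Bytes(), Serdes.Bytes()))
               // every record goes through the byte-level JSON check above
               .transform(() -> new StoreCodeAdderTransformer())
               .to("TEST.PRICE.COPY", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }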

In my application logs, I get entries like the following:

For the record, an example hex representation of the key is: 00000000193cd29a6480a0cae0e05a
And for the value: 000000001a00023cd29a6480a0cae0e05a02060088b80206524f4e000002020204312002144c505020202020202020020405fe000280a0cae0e05a02c0dd9ca7e15a02024c020603ab380000000000028090fdb2e15a000018312e342e302e416c706861320c6f7261636c651e4d4554492e5541542e505249434532eeee999de35d0008747275650e554154303232310e50524f533230310a4d4750564d0002e4babba5010000027202eeee999de35d00

(this is UAT, non sensitive data)

I can’t wrap my head around why some records come back as binary data instead of the expected text-encoded JSON.
What am I missing? What could be the explanation?

I ruled out:

  • compression (the source connector is not configured to compress), unless there is some kind of automatic compression by Debezium that would happen on some records only and that I’m not aware of.
  • an encoding issue (I’d expect the field values to contain weird characters, but the JSON structure itself to be fine)

I’ve got to ask – why not use Avro/Protobuf/JSON Schema?

JSON with "schemas.enable": falsejust throws away all of that lovely schema data that the RDBMS so obligingly provides :crying_cat_face:
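With schemas.enable set to true, the JsonConverter instead wraps every record in a schema + payload envelope, roughly like this (field names and types here are purely illustrative):

{
  "schema": {
    "type": "struct",
    "name": "example_key",
    "optional": false,
    "fields": [
      { "field": "ID", "type": "int32", "optional": false },
      { "field": "NAME", "type": "string", "optional": true }
    ]
  },
  "payload": { "ID": 123, "NAME": "example" }
}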


I can’t manage a strict, structured schema because the schema structures of my inputs are unreliable (today I have 1 topic, in production I’ll have 300+). I want to use JSON so that I can clean / certify the data and then convert it to Avro!

Hmmm. Do you have any BLOBs in the source data? Are you able to share the source schema?

There are no BLOBs in the source data.
The source schema looks like this for the key:
{"PVM_CDMAG":123,"PVM_NOART":456,"PVM_DT":1547078400000}
And for the value:

{"PVM_CDMAG":123,
"PVM_NOART":456,
"PVM_DT":789,
"PVM_PX":"JxA=",
"PVM_CDDEVISE":"EUR",
"PVM_PXVTEUR":null,
"PVM_CDACCOM":null,
"PVM_NBETIQ":1,
"PVM_TYETIQ":"1 ",
"PVM_DSOPER":"LPP",
"PVM_PTBBRUT":"+BA=",
"PVM_MDACCO":null,
"PVM_DTSAISIE":1566000000000,
"PVM_HHSAISIE":1566073919000,
"PVM_TYPROVNC":"L",
"PVM_PXACHAT":"AZee",
"PVM_PXCONS":null,
"PVM_TYPXVEN":null,
"PVM_PXVTMAJO":null,
"PVM_DTFNPROT":null,
"PVM_CDSOLDE":null,
"PVM_DTAPPLCS":1566086400000,
"PVM_CDSTATUT":null,
"PVM_FLRER":0}

I finally solved this issue.
The topic regex was too broad and also matched an unintended topic that used Avro serialization.
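For anyone hitting the same symptom: the hex dumps above are consistent with the Confluent Schema Registry wire format, i.e. a 0x00 magic byte followed by a 4-byte schema id and then the Avro payload. A small sanity check like this sketch (the class and method names are just illustrative) would have pointed at Avro right away:

import java.nio.ByteBuffer;

public final class WireFormatCheck {

    // Confluent wire format: 1 magic byte (0x00) + 4-byte big-endian schema id + payload
    private static final byte MAGIC_BYTE = 0x0;

    static boolean looksLikeSchemaRegistryPayload(byte[] data) {
        return data != null && data.length > 5 && data[0] == MAGIC_BYTE;
    }

    static int schemaId(byte[] data) {
        return ByteBuffer.wrap(data, 1, 4).getInt();
    }

    public static void main(String[] args) {
        // first bytes of the value dump from the question: 00 00 00 00 1a ...
        byte[] sample = {0x00, 0x00, 0x00, 0x00, 0x1a, 0x02, 0x06};
        if (looksLikeSchemaRegistryPayload(sample)) {
            // prints "Avro-style payload, schema id 26"
            System.out.println("Avro-style payload, schema id " + schemaId(sample));
        }
    }
}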

:sweat_smile: Phew! That was a puzzler, for sure :slight_smile:

