Schema Registry + PostgreSQL CDC Source Connector

Hi there,

I am currently working on a POC using Kafka to help construct a data lake. One of the data sources is a PostgreSQL database, so I’ve been trying to use the PostgreSQL CDC Source Connector to send the appropriate messages to a Kafka cluster on Confluent Cloud.

My main confusion is around what happens when someone makes a change to the source DB that results in a schema change incompatible with the current schema definition in Schema Registry. For example, what if a column were changed from an integer to a string, so that the registered schema would need to go from this:

{
  "fields": [
    {
        "name": "cakes_eaten",
        "type": "int"
    }
  ],
  "name": "Count",
  "namespace": "example.name.space",
  "type": "record"
}

Into something like this:

{
  "fields": [
    {
        "name": "cakes_eaten",
        "type": "string"
    }
  ],
  "name": "Count",
  "namespace": "example.name.space",
  "type": "record"
}

Having tried this, it seems that any rows created after the change from integer to string do not make their way to the brokers. This makes sense to me, as consumers could potentially break as a result.
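
For reference, here is roughly how the changed schema could be tested against the subject’s latest registered version through the Schema Registry REST API. This is just a Python sketch; the endpoint URL, API key/secret, and the subject name my_topic-value are placeholders for whatever your topic’s value subject is actually called:

import json
import requests

# Placeholders: Confluent Cloud Schema Registry endpoint, Schema Registry
# API key/secret, and the subject holding the topic's value schema.
SR_URL = "https://psrc-xxxxx.us-east-2.aws.confluent.cloud"
SR_AUTH = ("SR_API_KEY", "SR_API_SECRET")
SUBJECT = "my_topic-value"

# The candidate schema after the int -> string column change.
candidate = {
    "type": "record",
    "name": "Count",
    "namespace": "example.name.space",
    "fields": [{"name": "cakes_eaten", "type": "string"}],
}

# Ask Schema Registry whether this schema is compatible with the latest
# version registered under the subject, given its compatibility level.
resp = requests.post(
    f"{SR_URL}/compatibility/subjects/{SUBJECT}/versions/latest",
    auth=SR_AUTH,
    headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
    data=json.dumps({"schema": json.dumps(candidate)}),
)
resp.raise_for_status()
print(resp.json())  # e.g. {"is_compatible": false} under BACKWARD compatibility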

After seeing this, I tried to manually change the schema in the schema registry to:

{
  "fields": [
    {
        "name": "cakes_eaten",
        "type": ["int", "string"]
    }
  ],
  "name": "Count",
  "namespace": "example.name.space",
  "type": "record"
}

After making the change, the previously created “string rows” still did not make their way to the broker. Additionally, I inserted more rows after the manual schema registry correction, but the new rows also did not make their way to the broker.
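
For completeness, registering a hand-edited schema like the union above as a new version under the subject can also be done through the Schema Registry REST API. A rough Python sketch, again with placeholder endpoint, credentials, and subject name:

import json
import requests

# Placeholders: Schema Registry endpoint, API key/secret, and subject name.
SR_URL = "https://psrc-xxxxx.us-east-2.aws.confluent.cloud"
SR_AUTH = ("SR_API_KEY", "SR_API_SECRET")
SUBJECT = "my_topic-value"

# The hand-edited schema with the ["int", "string"] union type.
union_schema = {
    "type": "record",
    "name": "Count",
    "namespace": "example.name.space",
    "fields": [{"name": "cakes_eaten", "type": ["int", "string"]}],
}

# Register the union schema as a new version under the subject. The request
# is rejected if the schema fails the subject's compatibility check.
resp = requests.post(
    f"{SR_URL}/subjects/{SUBJECT}/versions",
    auth=SR_AUTH,
    headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
    data=json.dumps({"schema": json.dumps(union_schema)}),
)
resp.raise_for_status()
print(resp.json())  # e.g. {"id": 100042} for the newly registered schema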

In the end I had three main questions:

  1. What is the recommended way to handle these horrific changes to the DB?
  2. The extra rows did not appear as messages when inspecting the topics. Are these failed rows/messages logged anywhere on Confluent Cloud?
  3. If a schema registry correction is made, is there a way to capture previously failed messages?

PS: I am extremely new to Kafka + Confluent. Apologies if this is a silly question. Any answers are very much appreciated!

The default compatibility level in Schema Registry is BACKWARD, and the field data type change in your example is not backward-compatible. One option is to disable compatibility checks for the Schema Registry subject, which will allow the connector to register new schema versions with incompatible changes. This can be done through the Cloud UI or via the REST API (see Schema Registry API Reference | Confluent Documentation) by setting the compatibility level to NONE.
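
In case it helps, here is a rough Python sketch of setting the subject-level compatibility to NONE through the Schema Registry REST API; the endpoint URL, API key/secret, and subject name are placeholders:

import requests

# Placeholders: Schema Registry endpoint, API key/secret, and subject name.
SR_URL = "https://psrc-xxxxx.us-east-2.aws.confluent.cloud"
SR_AUTH = ("SR_API_KEY", "SR_API_SECRET")
SUBJECT = "my_topic-value"

# Override the compatibility level for this subject only; incompatible
# schema versions can then be registered under it.
resp = requests.put(
    f"{SR_URL}/config/{SUBJECT}",
    auth=SR_AUTH,
    headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
    json={"compatibility": "NONE"},
)
resp.raise_for_status()
print(resp.json())  # e.g. {"compatibility": "NONE"}

Keep in mind that setting NONE removes the safety the compatibility check provides, so consumers of the topic need to be prepared to handle the type change.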

Hi Mike, thanks for the response.

Is there any way I’d be able to view a log of the incompatible messages?

The ability to view managed Connector logs is coming soon to Cloud.
