Hi there,
I am currently working on a POC that uses Kafka to help construct a data lake. One of the data sources is a PostgreSQL database, so I’ve been trying to use the PostgreSQL CDC Source Connector to send the appropriate messages to a Kafka cluster on Confluent Cloud.
My main confusion is about what happens if someone makes a change to the source DB that results in a schema change incompatible with the current schema definition in the schema registry. For example, what if a column were changed from an integer to a string, so that the registered schema would go from this:
{
  "fields": [
    {
      "name": "cakes_eaten",
      "type": "int"
    }
  ],
  "name": "Count",
  "namespace": "example.name.space",
  "type": "record"
}
Into something like this:
{
  "fields": [
    {
      "name": "cakes_eaten",
      "type": "string"
    }
  ],
  "name": "Count",
  "namespace": "example.name.space",
  "type": "record"
}
Having tried this, it seems that any rows created after the change from integer to string do not make their way to the brokers. This makes sense to me, as consumers could potentially break as a result.
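To double-check that this really is a compatibility rejection, I put together a small sketch against the Schema Registry REST API (this is just how I understand the API; the endpoint, credentials, and subject name below are placeholders, not my real setup):

# Sketch: check which compatibility level applies to the subject, then test whether
# the "string" version of the schema would be accepted against the latest registered version.
# SR_URL, AUTH, and SUBJECT are placeholders (assumptions, not my actual values).
import json
import requests

SR_URL = "https://psrc-xxxxx.region.provider.confluent.cloud"  # placeholder Schema Registry endpoint
AUTH = ("SR_API_KEY", "SR_API_SECRET")                         # Schema Registry API key/secret
SUBJECT = "example.name.space.counts-value"                    # hypothetical subject name

# Which compatibility level applies to this subject?
# (falls back to the global level if the subject has no override)
resp = requests.get(f"{SR_URL}/config/{SUBJECT}", auth=AUTH)
if resp.status_code == 404:
    resp = requests.get(f"{SR_URL}/config", auth=AUTH)
print("compatibility level:", resp.json().get("compatibilityLevel"))

# Would the changed schema be accepted against the latest registered version?
candidate = {
    "type": "record",
    "name": "Count",
    "namespace": "example.name.space",
    "fields": [{"name": "cakes_eaten", "type": "string"}],
}
resp = requests.post(
    f"{SR_URL}/compatibility/subjects/{SUBJECT}/versions/latest",
    auth=AUTH,
    headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
    json={"schema": json.dumps(candidate)},  # the Avro schema is sent as an escaped JSON string
)
print("is_compatible:", resp.json().get("is_compatible"))

If this reports is_compatible: false, that would line up with the new rows never reaching the broker.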
After seeing this, I tried to manually change the schema in the schema registry to:
{
  "fields": [
    {
      "name": "cakes_eaten",
      "type": ["int", "string"]
    }
  ],
  "name": "Count",
  "namespace": "example.name.space",
  "type": "record"
}
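For reference, this is roughly how I made that manual change, by registering the union schema as a new version under the subject (again only a sketch; the endpoint, credentials, and subject name are placeholders):

# Sketch: register the int/string union schema as a new version for the subject.
# Placeholders as before; not my actual endpoint, credentials, or subject.
import json
import requests

SR_URL = "https://psrc-xxxxx.region.provider.confluent.cloud"
AUTH = ("SR_API_KEY", "SR_API_SECRET")
SUBJECT = "example.name.space.counts-value"

union_schema = {
    "type": "record",
    "name": "Count",
    "namespace": "example.name.space",
    "fields": [{"name": "cakes_eaten", "type": ["int", "string"]}],
}

resp = requests.post(
    f"{SR_URL}/subjects/{SUBJECT}/versions",
    auth=AUTH,
    headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
    json={"schemaType": "AVRO", "schema": json.dumps(union_schema)},
)
print(resp.status_code, resp.json())  # a new schema id on success, an error object on rejection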
After making the change, the previously created “string rows” still did not make their way to the broker. Additionally, I inserted more rows after the manual schema registry correction, but those new rows did not make their way to the broker either.
In the end, I have three main questions:
- What is the recommended way to handle these kinds of breaking changes to the source DB?
- The extra rows did not appear as messages when I inspected the topics. Are these failed rows/messages logged anywhere on Confluent Cloud?
- If a schema registry correction is made, is there a way to capture previously failed messages?
PS: I am extremely new to Kafka + Confluent. Apologies if this is a silly question. Any answers are very much appreciated!