Scenario:
I am using the JdbcSinkConnector in Kafka Connect to sink data from Kafka to a SQL Server database. My application is written in C# (.NET 5).
Problem:
I created two tables in SQL Server: Users and CreditCards.
The CreditCards table has a foreign key, PublicId, that references the Users table.
I created the connector successfully; in fact, when I sink to the Users table only, it works flawlessly (inserting and updating).
But when I sink to both tables, i.e. Users and CreditCards, the connector fails with the following error:
"Table "users" is missing fields ([SinkRecordField{schema=Schema{ARRAY}, name='CreditCard', isPrimaryKey=false}]) and auto-evolution is disabled"
This is the Avro Schema I have:
@"{""type"":""record"",""name"":""User"",""namespace"":""sample"",""fields"":[{""name"":""publicid"",""type"":""bytes""},{""name"":""name"",""type"":""string""},{""name"":""surname"",""type"":""string""},{""name"":""CreditCard"",""type"":[""null"",{""type"":""array"",""items"":{""type"":""record"",""name"":""CreditCard"",""namespace"":""sample"",""fields"":[{""name"":""publicid"",""type"":""bytes""},{""name"":""cardnumber"",""type"":""string""},{""name"":""expiry"",""type"":""string""}]}}]}]}"
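To relate the error message to the schema, here is a minimal Python sketch (Python is used only for illustration; `SCHEMA_JSON`, `describe_type` and `describe_fields` are hypothetical names, not part of any Kafka or Avro library). It parses the schema above as plain JSON and prints a short label for each top-level field's type.

```python
import json

# The Avro schema from this post as plain JSON (C# verbatim-string escaping removed).
SCHEMA_JSON = '''{"type":"record","name":"User","namespace":"sample","fields":[
  {"name":"publicid","type":"bytes"},
  {"name":"name","type":"string"},
  {"name":"surname","type":"string"},
  {"name":"CreditCard","type":["null",{"type":"array","items":
    {"type":"record","name":"CreditCard","namespace":"sample","fields":[
      {"name":"publicid","type":"bytes"},
      {"name":"cardnumber","type":"string"},
      {"name":"expiry","type":"string"}]}}]}]}'''


def describe_type(avro_type):
    """Return a short label for an Avro type: primitive, union, array or record."""
    if isinstance(avro_type, str):
        return avro_type  # primitive or named type reference
    if isinstance(avro_type, list):
        return "union[" + ", ".join(describe_type(t) for t in avro_type) + "]"
    kind = avro_type["type"]
    if kind == "array":
        return "array<" + describe_type(avro_type["items"]) + ">"
    if kind == "record":
        return "record " + avro_type["name"]
    return kind


def describe_fields(schema_json):
    """Map each top-level field name to a label describing its type."""
    schema = json.loads(schema_json)
    return {f["name"]: describe_type(f["type"]) for f in schema["fields"]}


if __name__ == "__main__":
    for name, label in describe_fields(SCHEMA_JSON).items():
        print(f"{name}: {label}")
```

The only non-primitive top-level field is `CreditCard`, a nullable array of records, which is the field the error reports as `Schema{ARRAY}`.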
The C# class has a reference to the CreditCard object, which implements ISpecificRecord.
Is there something wrong with my connector?
Is there a working example I can follow?
Connector details:
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.url": "jdbc:sqlserver://0.0.0.0:1433;databaseName=database",
  "connection.user": "user",
  "connection.password": "***",
  "topics": "users",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://schema-registry:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081",
  "auto.create": "true",
  "pk.mode": "record_value",
  "pk.fields": "publicid",
  "insert.mode": "upsert",
  "table.whitelist": "dbo.users, dbo.creditcards"
}
SQL table Users
Id (int, not null) auto incrementing on Insert
PublicId (PK, uniqueidentifier, not null)
Name (varchar(50), not null)
Surname (varchar(50), not null)
SQL table CreditCards
Id (int, not null) auto incrementing on Insert
PublicId (PK, FK, uniqueidentifier, not null) → refers to the PublicId in the Users table
CardNumber (varchar(50), not null)
Expiry (varchar(50), not null)
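As a rough cross-check of the error against the table layout, the Python sketch below compares the top-level field names of the Avro record with the columns of the Users table listed above. It only approximates the kind of check the error message suggests and is not the connector's actual code; case-insensitive name matching is an assumption, and `RECORD_FIELDS`, `USERS_COLUMNS` and `missing_fields` are hypothetical names.

```python
# Top-level field names of the Avro "User" record from this post.
RECORD_FIELDS = ["publicid", "name", "surname", "CreditCard"]

# Columns of the Users table as described in this post.
USERS_COLUMNS = ["Id", "PublicId", "Name", "Surname"]


def missing_fields(record_fields, table_columns):
    """Return record fields that have no same-named column.

    Assumption: names are compared case-insensitively.
    """
    columns = {c.lower() for c in table_columns}
    return [f for f in record_fields if f.lower() not in columns]


if __name__ == "__main__":
    print(missing_fields(RECORD_FIELDS, USERS_COLUMNS))
```

Under these assumptions the only field without a matching column is `CreditCard`, the same field named in the error.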