SQL Server JdbcSinkConnector to multiple tables

Scenario:
I am using the JdbcSinkConnector in Kafka Connect to sink data from Kafka to a SQL Server database. The application is written in C# on .NET 5.

Problem:
I created two tables in SQL Server: Users and CreditCards.
The CreditCards table has a FK, PublicId, referencing the Users table.

I created the connector successfully; in fact, when I sink to the Users table ONLY, it works flawlessly (inserting and updating).

But when I sink to both tables, i.e. Users and CreditCards, the connector fails with the following error:

Table "users" is missing fields ([SinkRecordField{schema=Schema{ARRAY}, name='CreditCard', isPrimaryKey=false}]) and auto-evolution is disabled

This is the Avro schema I have:

{
  "type": "record",
  "name": "User",
  "namespace": "sample",
  "fields": [
    { "name": "publicid", "type": "bytes" },
    { "name": "name", "type": "string" },
    { "name": "surname", "type": "string" },
    {
      "name": "CreditCard",
      "type": [
        "null",
        {
          "type": "array",
          "items": {
            "type": "record",
            "name": "CreditCard",
            "namespace": "sample",
            "fields": [
              { "name": "publicid", "type": "bytes" },
              { "name": "cardnumber", "type": "string" },
              { "name": "expiry", "type": "string" }
            ]
          }
        }
      ]
    }
  ]
}

The C# class has a reference to the CreditCard object, which implements ISpecificRecord.

Is there something wrong with my connector?
Is there a working example I can follow?

Connector details:
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.url": "jdbc:sqlserver://0.0.0.0:1433;databaseName=database",
  "connection.user": "user",
  "connection.password": "***",
  "topics": "users",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://schema-registry:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081",
  "auto.create": "true",
  "pk.mode": "record_value",
  "pk.fields": "publicid",
  "insert.mode": "upsert",
  "table.whitelist": "dbo.users, dbo.creditcards"
}

SQL table Users
Id (int, not null), auto-incrementing on insert
PublicId (PK, uniqueidentifier, not null)
Name (varchar(50), not null)
Surname (varchar(50), not null)

SQL table CreditCards
Id (int, not null), auto-incrementing on insert
PublicId (PK, FK, uniqueidentifier, not null) → refers to the PublicId in the Users table
CardNumber (varchar(50), not null)
Expiry (varchar(50), not null)
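
In T-SQL terms, the tables look roughly like this (the constraint names here are illustrative):

CREATE TABLE dbo.Users (
    Id       INT IDENTITY(1,1) NOT NULL,
    PublicId UNIQUEIDENTIFIER  NOT NULL,
    Name     VARCHAR(50)       NOT NULL,
    Surname  VARCHAR(50)       NOT NULL,
    CONSTRAINT PK_Users PRIMARY KEY (PublicId)
);

CREATE TABLE dbo.CreditCards (
    Id         INT IDENTITY(1,1) NOT NULL,
    PublicId   UNIQUEIDENTIFIER  NOT NULL,
    CardNumber VARCHAR(50)       NOT NULL,
    Expiry     VARCHAR(50)       NOT NULL,
    CONSTRAINT PK_CreditCards PRIMARY KEY (PublicId),
    CONSTRAINT FK_CreditCards_Users FOREIGN KEY (PublicId)
        REFERENCES dbo.Users (PublicId)
);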

@guz280 The schema you reference has a field CreditCard whose type is a union of either null or an array of CreditCard records. The error message indicates that the users table is missing that field, and the table definition you provided confirms that Users has no CreditCard column.

I don't believe Kafka Connect can do the magic of extracting the credit card information from the user event and sinking it into the creditcards table for you.

I also do not believe the table.whitelist configuration value you provide is relevant to the JdbcSinkConnector; it is a JDBC source connector property. For the sink, you provide the topics and table.name.format configurations, which tell the connector which topics to consume and what the destination table names will be, as in the sketch below.
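
For example, a minimal sketch of a second sink connector for the credit card data, assuming a separate creditcards topic exists (the topic and table names here are illustrative):

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.url": "jdbc:sqlserver://0.0.0.0:1433;databaseName=database",
  "connection.user": "user",
  "connection.password": "***",
  "topics": "creditcards",
  "table.name.format": "dbo.creditcards",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081",
  "pk.mode": "record_value",
  "pk.fields": "publicid",
  "insert.mode": "upsert"
}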

If possible, you could write the credit card events to a separate credit card topic at the source. Or, if the credit card records must remain embedded in the users topic events, I believe you will have to use some kind of stream processing to extract them into their own topic before Kafka Connect can sink them to a creditcards table.
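
As a rough sketch of that stream-processing step, ksqlDB's EXPLODE table function can fan the embedded array out into one event per card, assuming the users topic is Avro-encoded as above and a ksqlDB version that supports the BYTES type for publicid (the stream names are illustrative):

-- Register the existing users topic as a stream; the schema is read from Schema Registry.
CREATE STREAM users_stream WITH (KAFKA_TOPIC = 'users', VALUE_FORMAT = 'AVRO');

-- One row per element of the CreditCard array; null arrays may need a WHERE filter first.
CREATE STREAM user_cards AS
  SELECT EXPLODE(CREDITCARD) AS CARD
  FROM users_stream
  EMIT CHANGES;

-- Flatten the struct into a creditcards topic that a second sink connector can consume.
CREATE STREAM creditcards_stream WITH (KAFKA_TOPIC = 'creditcards', VALUE_FORMAT = 'AVRO') AS
  SELECT CARD->PUBLICID   AS PUBLICID,
         CARD->CARDNUMBER AS CARDNUMBER,
         CARD->EXPIRY     AS EXPIRY
  FROM user_cards
  EMIT CHANGES;

The same extraction could equally be done with Kafka Streams or another processor; the point is that the creditcards table needs its own topic.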

Hope this helps


@rick Thanks for your reply

I agree with what you said, especially that Kafka Connect will not do the magic of extracting the credit card information from the user event.

Thanks again & happy new year
