Help getting POC working: Control Center won't connect to Kafka

Yes, this should work.
But again: are Kafka and ZooKeeper running properly before you try to start Control Center?

Check these basic config files; they should work for a single-node setup.
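Something along these lines should do for a single node (a rough sketch; paths and ports are just the defaults, adjust to your environment):

# zookeeper.properties
dataDir=/tmp/zookeeper
clientPort=2181

# server.properties (single broker)
broker.id=0
listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092
zookeeper.connect=localhost:2181
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

# control-center.properties (point Control Center at the single broker)
bootstrap.servers=localhost:9092
confluent.controlcenter.internal.topics.replication=1
confluent.monitoring.interceptor.topic.replication=1
confluent.metrics.topic.replication=1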

Trying these configs right now…the first issue was a wrong cluster ID in meta.properties…is it OK to delete this file, or is there a better process?

This worked!!! Dude, you are the man! Now to get Debezium working!


So it looks like we have things running…it feels like removing all the license settings and setting that ONE value to localhost, even though the comment says it should not be localhost, is what got us working.

Now we have to figure out how to use the Debezium SQL Server connector, and it must use Windows Authentication to connect to SQL Server…any thoughts on that?
Do we configure Kafka? Kafka Connect? Or just the Debezium connector? We have to use Windows auth for the connector to connect to SQL Server.

Basically, yes, you need Kafka Connect.

I’m not a Debezium specialist, but according to the docs you should be able to configure Kerberos auth:

https://debezium.io/documentation/reference/stable/connectors/sqlserver.html#debezium-sqlserver-connector-pass-through-database-driver-configuration-properties
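Roughly something like this (an untested sketch; host, server and table names are placeholders, and the pass-through prefix can differ between Debezium versions, so double-check the linked docs):

# Debezium SQL Server source connector (Debezium 1.x style property names)
name=sqlserver-source
connector.class=io.debezium.connector.sqlserver.SqlServerConnector
database.hostname=your-sql-host
database.port=1433
database.dbname=YourDatabase
database.server.name=yourserver
table.include.list=dbo.YourTable
# Windows auth / Kerberos via pass-through JDBC driver properties
driver.integratedSecurity=true
driver.authenticationScheme=JavaKerberos
# note: the Connect worker JVM also needs a working Kerberos setup (krb5.conf plus a ticket or keytab)
# ...plus the usual snapshot and schema history settings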

Ok, working on that now…currently our Control Center is showing an unhealthy cluster due to a topic named confluent telemetry metrics…we disabled self-balancing since we only have one node, but we're unsure why we are getting this message. Thoughts?

You could disable telemetry metrics by setting confluent.reporters.telemetry.auto.enable to false in server.properties.

https://docs.confluent.io/platform/current/installation/configuration/broker-configs.html#brokerconfigs_confluent.reporters.telemetry.auto.enable
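That is:

# server.properties: disable the telemetry reporter (this also stops it from creating its internal metrics topic)
confluent.reporters.telemetry.auto.enable=false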

We could, but we want to see these metrics to ensure our stuff is working.

In that case I would recommend checking the replication factor of the topic and changing it to 1.
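For example (a sketch; I'm assuming the topic from your Control Center alert is _confluent-telemetry-metrics and your single broker has id 0):

# check the current replication factor and which partitions are under-replicated
kafka-topics --bootstrap-server localhost:9092 --describe --topic _confluent-telemetry-metrics

# reassign.json: keep a single replica on broker 0 (add one entry per partition)
{"version":1,"partitions":[{"topic":"_confluent-telemetry-metrics","partition":0,"replicas":[0]}]}

# apply the reassignment
kafka-reassign-partitions --bootstrap-server localhost:9092 --reassignment-json-file reassign.json --execute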


Ok, we were able to resolve that…now we are stuck trying to get the Debezium connector to connect to our SQL Server via integrated security. Right now it fails with the following:
Connector configuration is invalid and contains the following 2 errors:
A value is required
A value is required

Super specific error message

We got it…the documentation is not thorough at all…we added the two history properties and it's working now…the connector is pulling data into the topic.
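For anyone following along, the two history properties in question should be these (Debezium 1.x SQL Server connector; values are just examples):

database.history.kafka.bootstrap.servers=localhost:9092
database.history.kafka.topic=schema-changes.yourdb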

Now for the target!


Ok, so we have made some progress but now we are stuck…you have helped us so much, and we're hoping you can bring us to the finish line…we have the snapshot working as well as CDC changes flowing into a topic…the issue now is our JDBC sink connector…when we fire that up we get the following error:

Error: Sink connector 'portfoliogrpmanconfigtarget' is configured with 'delete.enabled=false' and 'pk.mode=record_key' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='dbo.GroupMember',partition=0,offset=0,timestamp=1643844482140) with a String value and string value schema. (org.apache.kafka.connect.runtime.WorkerSinkTask:636)
org.apache.kafka.connect.errors.ConnectException: Sink connector 'portfoliogrpmanconfigtarget' is configured with 'delete.enabled=false' and 'pk.mode=record_key' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='GroupMember',partition=0,offset=0,timestamp=1643844482140) with a String value and string value schema.

Could you share your connector configuration?

and some links for further reading:

blog post by @rmoff

hth,
michael

# Kafka Connect standalone worker properties (connect-standalone.properties)
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/usr/share/java,/devkafka/confluent-7.0.1/share/confluent-hub-components

Thx, and your connector config?

Ok, so we started up the Schema Registry service, changed our connector to use Avro, and we are successfully sending rows from the source into a topic. The issue now is getting the rows from the topic into the target.
Here is our sink config:

name=portfoliogrpmanconfigtargetlatestv10
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
connection.url=jdbc:sqlserver://server;databaseName=PortfolioManager;integratedSecurity=true;authenticationScheme=JavaKerberos
table.include.list=GroupMember
topics=server.dbo.GroupMember
key.converter= io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081
delete.enabled=true
insert.mode=upsert
batch.size=10000
table.name.format=GroupMember
pk.mode=record_key
pk.fields=GroupID,MemberGroupID

Here is what we are getting now…it is like it is trying to add a column because the source schema has it, but the target table doesn't?

Unable to find fields [SinkRecordField{schema=Schema{io.debezium.connector.sqlserver.Source:STRUCT}, name='source', isPrimaryKey=false}, SinkRecordField{schema=Schema{STRUCT}, name='transaction', isPrimaryKey=false}, SinkRecordField{schema=Schema{server.sddev.lpl.com.dbo.GroupMember.Value:STRUCT}, name='before', isPrimaryKey=false}, SinkRecordField{schema=Schema{INT64}, name='ts_ms', isPrimaryKey=false}, SinkRecordField{schema=Schema{server.sddev.lpl.com.dbo.GroupMember.Value:STRUCT}, name='after', isPrimaryKey=false}, SinkRecordField{schema=Schema{STRING}, name='op', isPrimaryKey=false}] among column names [MemberGroupID, SortOrder, GroupID] (io.confluent.connect.jdbc.sink.DbStructure:276)
[2022-02-04 07:14:21,658] ERROR [portfoliogrpmanconfigtargetlatestv11|task-0] WorkerSinkTask{id=portfoliogrpmanconfigtargetlatestv11-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Cannot ALTER TABLE "dbo"."GroupMember" to add missing field SinkRecordField{schema=Schema{io.debezium.connector.sqlserver.Source:STRUCT}, name='source', isPrimaryKey=false}, as the field is not optional and does not have a default value (org.apache.kafka.connect.runtime.WorkerSinkTask:636)
io.confluent.connect.jdbc.sink.TableAlterOrCreateException: Cannot ALTER TABLE "dbo"."GroupMember" to add missing field SinkRecordField{schema=Schema{io.debezium.connector.sqlserver.Source:STRUCT}, name='source', isPrimaryKey=false}, as the field is not optional and does not have a default value
        at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:182)
        at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:83)

Hi @pebriggs,
it seems there is a difference between your topic's schema and the target table: the records still carry the full Debezium change-event envelope (before, after, source, op, ts_ms), while the table only has the flat columns.

We finally got things working, but we had to add a bunch of transforms and a column whitelist. It's like the topic's schema had more data than we needed, so we had to specify what we wanted extracted and written to the target.
Thanks so much for your help!
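For reference, the general shape of what we added to the sink config (an approximation, not our exact config; the unwrap transform is Debezium's ExtractNewRecordState SMT and the whitelist is the JDBC sink's fields.whitelist):

transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=false
# keep only the columns that actually exist in the target table
fields.whitelist=GroupID,MemberGroupID,SortOrder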

