I’m very new to KAFKA. I’m following the “Kafka Connect in Action: JDBC Sink” tutorial.
At the point when I insert data into the topic I receive the following error message:
"Failed to insert values into 'TEST01'. Column name `ROWKEY` does not exist."
The KSQLDB SERVER log reads the following:
"INFO Processed unsuccessfully: KsqlRequest{ksql='INSERT INTO TEST01 (ROWKEY, COL1, COL2) VALUES ('X',1,'FOO');', configOverrides={}, requestProperties={}, sessionVariables={}, commandSequenceNumber=Optional[34]}, reason: (io.confluent.ksql.rest.server.resources.KsqlResource:325)
io.confluent.ksql.util.KsqlStatementException: Failed to insert values into 'TEST01'. Column name
ROWKEY does not exist.
Statement: INSERT INTO TEST01 (ROWKEY, COL1, COL2) VALUES ('X',1,'FOO');"
At first, I thought it was because of the missing “ROWKEY” column name not present in the CREATE statement; however, I understand- from the tutorial- ROWKEY is generated by AVRO, and is not explicitly enumerated in the CREATE statement.
I’m wondering if there may be something wrong with my KAFKA-CONNECT startup properties (bin/connect-distributed /etc/schema-registry/connect-avro-distributed.properties) or Schema Registry properties (./bin/schema-registry-start etc/schema-registry/schema-registry.properties) that is mismatching with AVRO?
Let me know if you need any further information on my set up, settings, etc.
Thank you for your help.
This is my KSQL CLI input/output…
ksql> show connectors;
Connector Name | Type | Class | Status
sink-jdbc-mysql-01 | SINK | io.confluent.connect.jdbc.JdbcSinkConnector | RUNNING (1/1 tasks RUNNING)
ksql> show streams;
Stream Name | Kafka Topic | Key Format | Value Format | Windowed
KSQL_PROCESSING_LOG | default_ksql_processing_log | KAFKA | JSON | false
ksql> CREATE STREAM TEST01 (COL1 INT, COL2 VARCHAR)
WITH (KAFKA_TOPIC='test01', PARTITIONS=1, VALUE_FORMAT='AVRO');
Message
Stream created
ksql> show streams;
Stream Name | Kafka Topic | Key Format | Value Format | Windowed
KSQL_PROCESSING_LOG | default_ksql_processing_log | KAFKA | JSON | false
TEST01 | test01 | KAFKA | AVRO | false
ksql> show topics;
Kafka Topic | Partitions | Partition Replicas
default_ksql_processing_log | 1 | 1
test01 | 1 | 1
ksql> INSERT INTO TEST01 (ROWKEY, COL1, COL2) VALUES (‘X’,1,‘FOO’);
Failed to insert values into ‘TEST01’. Column name ROWKEY does not exist.
ksql>
This is my CONNECTOR:
curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-01/config
-H “Content-Type: application/json” -d ‘{
“connector.class” : “io.confluent.connect.jdbc.JdbcSinkConnector”,
“connection.url” : “jdbc:mysql://192.168.1.12:3306/demo”,
“topics” : “test01”,
“key.converter” : “org.apache.kafka.connect.storage.StringConverter”,
“connection.user” : “username”,
“connection.password” : “password”,
“auto.create” : true,
“auto.evolve” : true,
“insert.mode” : “insert”,
“pk.mode” : “record_key”,
“pk.fields” : “MESSAGE_KEY”
}’