ksqlDB INSERT fails: column 'ROWKEY' does not exist

I’m very new to Kafka. I’m following the “Kafka Connect in Action: JDBC Sink” tutorial.

At the point when I insert data into the topic I receive the following error message:

"Failed to insert values into 'TEST01'. Column name `ROWKEY` does not exist."

The ksqlDB server log reads:

"INFO Processed unsuccessfully: KsqlRequest{ksql='INSERT INTO TEST01 (ROWKEY, COL1, COL2) VALUES ('X',1,'FOO');', configOverrides={}, requestProperties={}, sessionVariables={}, commandSequenceNumber=Optional[34]}, reason: (io.confluent.ksql.rest.server.resources.KsqlResource:325)
io.confluent.ksql.util.KsqlStatementException: Failed to insert values into 'TEST01'. Column name ROWKEY does not exist.
Statement: INSERT INTO TEST01 (ROWKEY, COL1, COL2) VALUES ('X',1,'FOO');"

At first I thought the error was caused by the ROWKEY column not being listed in the CREATE statement; however, my understanding from the tutorial is that ROWKEY is generated automatically and is not explicitly enumerated in the CREATE statement.

I’m wondering if something in my Kafka Connect startup properties (bin/connect-distributed /etc/schema-registry/connect-avro-distributed.properties) or Schema Registry properties (./bin/schema-registry-start etc/schema-registry/schema-registry.properties) is mismatched with Avro?

Let me know if you need any further information on my set up, settings, etc.

Thank you for your help.

This is my KSQL CLI input/output…

ksql> show connectors;
Connector Name | Type | Class | Status
sink-jdbc-mysql-01 | SINK | io.confluent.connect.jdbc.JdbcSinkConnector | RUNNING (1/1 tasks RUNNING)

ksql> show streams;
Stream Name | Kafka Topic | Key Format | Value Format | Windowed
KSQL_PROCESSING_LOG | default_ksql_processing_log | KAFKA | JSON | false

ksql> CREATE STREAM TEST01 (COL1 INT, COL2 VARCHAR)
WITH (KAFKA_TOPIC='test01', PARTITIONS=1, VALUE_FORMAT='AVRO');

Message
Stream created

ksql> show streams;
Stream Name | Kafka Topic | Key Format | Value Format | Windowed
KSQL_PROCESSING_LOG | default_ksql_processing_log | KAFKA | JSON | false
TEST01 | test01 | KAFKA | AVRO | false

ksql> show topics;
Kafka Topic | Partitions | Partition Replicas
default_ksql_processing_log | 1 | 1
test01 | 1 | 1

ksql> INSERT INTO TEST01 (ROWKEY, COL1, COL2) VALUES ('X',1,'FOO');
Failed to insert values into 'TEST01'. Column name ROWKEY does not exist.
ksql>
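(A quick way to see why the INSERT fails is to ask ksqlDB which columns the stream actually has. This is a hedged sketch; the exact DESCRIBE output layout varies by ksqlDB version.)

```sql
-- List the columns ksqlDB knows about for TEST01. Because the CREATE
-- statement declared no KEY column, only the value columns exist, so
-- an INSERT that names ROWKEY is rejected.
DESCRIBE TEST01;

-- Output (abbreviated, version-dependent):
--  Field | Type
--  COL1  | INTEGER
--  COL2  | VARCHAR(STRING)
```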

This is my CONNECTOR:

curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-01/config \
  -H "Content-Type: application/json" -d '{
    "connector.class"     : "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url"      : "jdbc:mysql://192.168.1.12:3306/demo",
    "topics"              : "test01",
    "key.converter"       : "org.apache.kafka.connect.storage.StringConverter",
    "connection.user"     : "username",
    "connection.password" : "password",
    "auto.create"         : true,
    "auto.evolve"         : true,
    "insert.mode"         : "insert",
    "pk.mode"             : "record_key",
    "pk.fields"           : "MESSAGE_KEY"
  }'

Could be related to Documentation outdated: 'insert into' statements invalid · Issue #5043 · confluentinc/ksql · GitHub ?

Or Improve behavior for INSERT INTO when source key field is inserted to sink value field · Issue #7445 · confluentinc/ksql · GitHub

Are you following the guide here? In that guide the SQL looks like this and doesn’t reference ROWKEY (earlier versions of the guide did, but I updated it as ksqlDB evolved its handling of key columns):

INSERT INTO TEST01 (KEY_COL, COL1, COL2) VALUES ('X',1,'FOO');
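For that INSERT to succeed, the stream has to declare its key column explicitly. A sketch of the matching pair under the post-0.10 key-column syntax (KEY_COL is the name the guide uses; adjust it to your own schema):

```sql
-- Since ksqlDB 0.10, key columns must be declared explicitly;
-- ROWKEY is no longer available as an implicit, insertable column.
CREATE STREAM TEST01 (KEY_COL VARCHAR KEY, COL1 INT, COL2 VARCHAR)
  WITH (KAFKA_TOPIC='test01', PARTITIONS=1, VALUE_FORMAT='AVRO');

INSERT INTO TEST01 (KEY_COL, COL1, COL2) VALUES ('X', 1, 'FOO');
```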

@rmoff I tried the content of the guide previously, but ran into a SQL error ("SQLSyntaxErrorException: BLOB/TEXT column … used in key specification without a key length"), so I followed the video content instead.
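For anyone hitting that same MySQL error: auto.create maps Connect STRING fields to TEXT, and MySQL cannot build a primary key on a TEXT column without a key length. One workaround (a sketch only; the column names and VARCHAR lengths here are illustrative, not taken from the tutorial) is to pre-create the target table with a bounded VARCHAR key:

```sql
-- Pre-create the sink table so the key column is an indexable VARCHAR
-- instead of the TEXT that auto.create would otherwise generate.
CREATE TABLE demo.TEST01 (
  MESSAGE_KEY VARCHAR(64) NOT NULL PRIMARY KEY,  -- matches pk.fields
  COL1        INT,
  COL2        VARCHAR(256)
);
```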

The guide content worked for me this time, along with the help of your blog post about the SQLSyntaxErrorException, which I found after retrying the guide.

Thank you for your help.

