I got this error while try to sync my mysql instance to elastic search by kafka connect. I used ksql to demoralizes data. It seen the message has a key, but i don’t know why the connector throw the null exception like that. Thanks for your help.!!!
{
"name": "elastic-sink-enriched",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"topics": "HOTELS_ENRICHED",
"connection.url": "http://elasticsearch:9200",
"type.name": "_doc",
"key.ignore" : "false",
"schema.ignore": "false",
"value.converter" : "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url" : "http://schema-registry:8081",
"key.converter" : "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://schema-registry:8081",
"behavior.on.null.values":"delete",
"value.converter.schemas.enable":"true",
"errors.tolerance":"all"
}
}
rowtime: 2024/05/03 11:44:18.001 Z, key: 17, value: {"OWNER_ID": 1, "NAME": "Khach san sieu sieu dep", "HOTEL_TYPE": 4, "LOGO": "{\"id\": 0, \"url\": \"https://d3jwhct9rpti9n.cloudfront.net/hotel_images/658799386.jpg\", \"width\": 1000, \"height\": 600, \"extension\": \".jpg\", \"cloud_name\": \"s3\"}", "IMAGES": "[{\"id\": 0, \"url\": \"https://d3jwhct9rpti9n.cloudfront.net/hotel_images/658799386.jpg\", \"width\": 1000, \"height\": 600, \"extension\": \".jpg\", \"cloud_name\": \"s3\"}, {\"id\": 0, \"url\": \"https://d3jwhct9rpti9n.cloudfront.net/hotel_images/658799386.jpg\", \"width\": 1000, \"height\": 600, \"extension\": \".jpg\", \"cloud_name\": \"s3\"}, {\"id\": 0, \"url\": \"https://d3jwhct9rpti9n.cloudfront.net/hotel_images/658799386.jpg\", \"width\": 1000, \"height\": 600, \"extension\": \".jpg\", \"cloud_name\": \"s3\"}]", "ADDRESS": "31 Ngo quyen", "PROVINCE": null, "DISTRICT": null, "WARD": null, "LAT": 20.865139, "LNG": 106.68383, "STAR": 4, "STATUS": 1, "CREATED_AT": "2024-05-03T15:23:38Z", "UPDATED_AT": "2024-05-03T15:23:38Z", "FACILITY_LIST": null}, partition: 0
SET 'auto.offset.reset' = 'earliest';
CREATE STREAM HOTELS WITH(KAFKA_TOPIC='mysql-debezium-h5traveloto.h5traveloto.hotels', PARTITIONS=1, VALUE_FORMAT='AVRO');
CREATE STREAM HOTEL_FACILITIES WITH(KAFKA_TOPIC='mysql-debezium-h5traveloto.h5traveloto.hotel_facility_details', PARTITIONS=1, VALUE_FORMAT='AVRO');
CREATE STREAM ROOM_FACILITIES WITH(KAFKA_TOPIC='mysql-debezium-h5traveloto.h5traveloto.room_facility_details', PARTITIONS=1 ,VALUE_FORMAT='AVRO');
CREATE STREAM ROOM_TYPES WITH (KAFKA_TOPIC='mysql-debezium-h5traveloto.h5traveloto.room_types', PARTITIONS=1 ,VALUE_FORMAT='AVRO');
CREATE TABLE PROVINCES (ID VARCHAR PRIMARY KEY)
WITH (KAFKA_TOPIC='mysql-debezium-h5traveloto.h5traveloto.provinces',
VALUE_FORMAT='AVRO');
CREATE TABLE DISTRICTS (ID VARCHAR PRIMARY KEY)
WITH (KAFKA_TOPIC='mysql-debezium-h5traveloto.h5traveloto.districts',
VALUE_FORMAT='AVRO');
CREATE TABLE WARDS (ID VARCHAR PRIMARY KEY)
WITH (KAFKA_TOPIC='mysql-debezium-h5traveloto.h5traveloto.wards',
VALUE_FORMAT='AVRO');
CREATE TABLE HOTEL_FACILITY_LIST AS
SELECT HF.HOTEL_ID, COLLECT_LIST(FACILITY_ID) AS FACILITY_LIST
FROM HOTEL_FACILITIES HF
GROUP BY HF.HOTEL_ID;
CREATE TABLE ROOM_FACILITY_LIST AS
SELECT RF.ROOM_ID, COLLECT_LIST(FACILITY_ID) AS FACILITY_LIST
FROM ROOM_FACILITIES RF
GROUP BY RF.ROOM_ID;
CREATE STREAM ROOM_TYPES_ENRICHED AS
SELECT
RT.ID ID,
RT.HOTEL_ID,
RT.NAME,
RT.MAX_CUSTOMER,
RT.AREA,
RT.BED,
RT.PRICE,
RT.STATUS,
RT.TOTAL_ROOM,
RT.PAY_IN_HOTEL,
RT.BREAK_FAST,
RT.FREE_CANCEL,
RT.CREATED_AT,
RT.UPDATED_AT,
RF.FACILITY_LIST
FROM ROOM_TYPES RT
LEFT JOIN ROOM_FACILITY_LIST RF ON RT.ID = RF.ROOM_ID
PARTITION BY ID;
CREATE STREAM HOTELS_ENRICHED
AS
SELECT
H.ID AS ID,
H.OWNER_ID,
H.NAME AS NAME,
H.HOTEL_TYPE,
H.LOGO,
H.IMAGES,
H.ADDRESS,
P.NAME AS PROVINCE,
D.NAME AS DISTRICT,
W.NAME AS WARD,
H.LAT,
H.LNG,
H.STAR,
H.STATUS,
H.CREATED_AT,
H.UPDATED_AT,
HF.FACILITY_LIST
FROM HOTELS H
LEFT JOIN PROVINCES P ON 'Struct{code='+H.PROVINCE_CODE+'}' = P.ID
LEFT JOIN DISTRICTS D ON 'Struct{code='+H.DISTRICT_CODE+'}' = D.ID
LEFT JOIN WARDS W ON 'Struct{code='+H.WARD_CODE+'}' = W.ID
LEFT JOIN HOTEL_FACILITY_LIST HF ON H.ID = HF.HOTEL_ID
PARTITION BY H.ID;
ERROR [elastic-sink-enriched|task-0] WorkerSinkTask{id=elastic-sink-enriched-0} Error converting message key in topic 'HOTELS_ENRICHED' partition 0 at offset 17 and timestamp 1714736658001: null (org.apache.kafka.connect.runtime.WorkerSinkTask:532)
java.nio.BufferUnderflowException
at java.base/java.nio.Buffer.nextGetIndex(Buffer.java:651)
at java.base/java.nio.HeapByteBuffer.getInt(HeapByteBuffer.java:402)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.<init>(AbstractKafkaAvroDeserializer.java:302)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:160)
at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:172)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:530)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:493)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:493)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:473)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)