I got some error why syn mysql with elastic search by kafka connect and use ksql to denomorlize data. It seen that my message topic has an key, but kafka connect throw a .BufferUnderflowException. Thanks for your helps!!!
ERROR [elastic-sink-enriched|task-0] WorkerSinkTask{id=elastic-sink-enriched-0} Error converting message key in topic 'HOTELS_ENRICHED' partition 0 at offset 17 and timestamp 1714736658001: null (org.apache.kafka.connect.runtime.WorkerSinkTask:532)
java.nio.BufferUnderflowException
at java.base/java.nio.Buffer.nextGetIndex(Buffer.java:651)
at java.base/java.nio.HeapByteBuffer.getInt(HeapByteBuffer.java:402)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.<init>(AbstractKafkaAvroDeserializer.java:302)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:160)
at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:172)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:530)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:493)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:493)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:473)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
rowtime: 2024/05/03 11:44:18.001 Z, key: 17, value: {"OWNER_ID": 1, "NAME": "Khach san sieu sieu dep", "HOTEL_TYPE": 4, "LOGO": "{\"id\": 0, \"url\": \"https://d3jwhct9rpti9n.cloudfront.net/hotel_images/658799386.jpg\", \"width\": 1000, \"height\": 600, \"extension\": \".jpg\", \"cloud_name\": \"s3\"}", "IMAGES": "[{\"id\": 0, \"url\": \"https://d3jwhct9rpti9n.cloudfront.net/hotel_images/658799386.jpg\", \"width\": 1000, \"height\": 600, \"extension\": \".jpg\", \"cloud_name\": \"s3\"}, {\"id\": 0, \"url\": \"https://d3jwhct9rpti9n.cloudfront.net/hotel_images/658799386.jpg\", \"width\": 1000, \"height\": 600, \"extension\": \".jpg\", \"cloud_name\": \"s3\"}, {\"id\": 0, \"url\": \"https://d3jwhct9rpti9n.cloudfront.net/hotel_images/658799386.jpg\", \"width\": 1000, \"height\": 600, \"extension\": \".jpg\", \"cloud_name\": \"s3\"}]", "ADDRESS": "31 Ngo quyen", "PROVINCE": null, "DISTRICT": null, "WARD": null, "LAT": 20.865139, "LNG": 106.68383, "STAR": 4, "STATUS": 1, "CREATED_AT": "2024-05-03T15:23:38Z", "UPDATED_AT": "2024-05-03T15:23:38Z", "FACILITY_LIST": null}, partition: 0
{
"name": "elastic-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"topics": "mysql-debezium-h5traveloto.h5traveloto.hotels,mysql-debezium-h5traveloto.h5traveloto.users",
"connection.url": "http://elasticsearch:9200",
"type.name": "type.name=kafkaconnect",
"key.ignore": "true",
"schema.ignore": "true"
}
}
SET 'auto.offset.reset' = 'earliest';
CREATE STREAM HOTELS WITH(KAFKA_TOPIC='mysql-debezium-h5traveloto.h5traveloto.hotels', PARTITIONS=1, VALUE_FORMAT='AVRO');
CREATE STREAM HOTEL_FACILITIES WITH(KAFKA_TOPIC='mysql-debezium-h5traveloto.h5traveloto.hotel_facility_details', PARTITIONS=1, VALUE_FORMAT='AVRO');
CREATE STREAM ROOM_FACILITIES WITH(KAFKA_TOPIC='mysql-debezium-h5traveloto.h5traveloto.room_facility_details', PARTITIONS=1 ,VALUE_FORMAT='AVRO');
CREATE STREAM ROOM_TYPES WITH (KAFKA_TOPIC='mysql-debezium-h5traveloto.h5traveloto.room_types', PARTITIONS=1 ,VALUE_FORMAT='AVRO');
CREATE TABLE PROVINCES (ID VARCHAR PRIMARY KEY)
WITH (KAFKA_TOPIC='mysql-debezium-h5traveloto.h5traveloto.provinces',
VALUE_FORMAT='AVRO');
CREATE TABLE DISTRICTS (ID VARCHAR PRIMARY KEY)
WITH (KAFKA_TOPIC='mysql-debezium-h5traveloto.h5traveloto.districts',
VALUE_FORMAT='AVRO');
CREATE TABLE WARDS (ID VARCHAR PRIMARY KEY)
WITH (KAFKA_TOPIC='mysql-debezium-h5traveloto.h5traveloto.wards',
VALUE_FORMAT='AVRO');
CREATE TABLE HOTEL_FACILITY_LIST AS
SELECT HF.HOTEL_ID, COLLECT_LIST(FACILITY_ID) AS FACILITY_LIST
FROM HOTEL_FACILITIES HF
GROUP BY HF.HOTEL_ID;
CREATE TABLE ROOM_FACILITY_LIST AS
SELECT RF.ROOM_ID, COLLECT_LIST(FACILITY_ID) AS FACILITY_LIST
FROM ROOM_FACILITIES RF
GROUP BY RF.ROOM_ID;
CREATE STREAM ROOM_TYPES_ENRICHED AS
SELECT
RT.ID ID,
RT.HOTEL_ID,
RT.NAME,
RT.MAX_CUSTOMER,
RT.AREA,
RT.BED,
RT.PRICE,
RT.STATUS,
RT.TOTAL_ROOM,
RT.PAY_IN_HOTEL,
RT.BREAK_FAST,
RT.FREE_CANCEL,
RT.CREATED_AT,
RT.UPDATED_AT,
RF.FACILITY_LIST
FROM ROOM_TYPES RT
LEFT JOIN ROOM_FACILITY_LIST RF ON RT.ID = RF.ROOM_ID
PARTITION BY ID;
CREATE STREAM HOTELS_ENRICHED
AS
SELECT
H.ID AS ID,
H.OWNER_ID,
H.NAME AS NAME,
H.HOTEL_TYPE,
H.LOGO,
H.IMAGES,
H.ADDRESS,
P.NAME AS PROVINCE,
D.NAME AS DISTRICT,
W.NAME AS WARD,
H.LAT,
H.LNG,
H.STAR,
H.STATUS,
H.CREATED_AT,
H.UPDATED_AT,
HF.FACILITY_LIST
FROM HOTELS H
LEFT JOIN PROVINCES P ON 'Struct{code='+H.PROVINCE_CODE+'}' = P.ID
LEFT JOIN DISTRICTS D ON 'Struct{code='+H.DISTRICT_CODE+'}' = D.ID
LEFT JOIN WARDS W ON 'Struct{code='+H.WARD_CODE+'}' = W.ID
LEFT JOIN HOTEL_FACILITY_LIST HF ON H.ID = HF.HOTEL_ID
PARTITION BY H.ID;