Kafka-connect-sink-to-elastic-search-got-error-message-key-null

I got some error why syn mysql with elastic search by kafka connect and use ksql to denomorlize data. It seen that my message topic has an key, but kafka connect throw a .BufferUnderflowException. Thanks for your helps!!!

 ERROR [elastic-sink-enriched|task-0] WorkerSinkTask{id=elastic-sink-enriched-0} Error converting message key in topic 'HOTELS_ENRICHED' partition 0 at offset 17 and timestamp 1714736658001: null (org.apache.kafka.connect.runtime.WorkerSinkTask:532)
java.nio.BufferUnderflowException
        at java.base/java.nio.Buffer.nextGetIndex(Buffer.java:651)
        at java.base/java.nio.HeapByteBuffer.getInt(HeapByteBuffer.java:402)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.<init>(AbstractKafkaAvroDeserializer.java:302)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:160)
        at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:172)
        at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
        at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:530)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:493)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:493)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:473)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

rowtime: 2024/05/03 11:44:18.001 Z, key: 17, value: {"OWNER_ID": 1, "NAME": "Khach san sieu sieu dep", "HOTEL_TYPE": 4, "LOGO": "{\"id\": 0, \"url\": \"https://d3jwhct9rpti9n.cloudfront.net/hotel_images/658799386.jpg\", \"width\": 1000, \"height\": 600, \"extension\": \".jpg\", \"cloud_name\": \"s3\"}", "IMAGES": "[{\"id\": 0, \"url\": \"https://d3jwhct9rpti9n.cloudfront.net/hotel_images/658799386.jpg\", \"width\": 1000, \"height\": 600, \"extension\": \".jpg\", \"cloud_name\": \"s3\"}, {\"id\": 0, \"url\": \"https://d3jwhct9rpti9n.cloudfront.net/hotel_images/658799386.jpg\", \"width\": 1000, \"height\": 600, \"extension\": \".jpg\", \"cloud_name\": \"s3\"}, {\"id\": 0, \"url\": \"https://d3jwhct9rpti9n.cloudfront.net/hotel_images/658799386.jpg\", \"width\": 1000, \"height\": 600, \"extension\": \".jpg\", \"cloud_name\": \"s3\"}]", "ADDRESS": "31 Ngo quyen", "PROVINCE": null, "DISTRICT": null, "WARD": null, "LAT": 20.865139, "LNG": 106.68383, "STAR": 4, "STATUS": 1, "CREATED_AT": "2024-05-03T15:23:38Z", "UPDATED_AT": "2024-05-03T15:23:38Z", "FACILITY_LIST": null}, partition: 0

{
  "name": "elastic-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "mysql-debezium-h5traveloto.h5traveloto.hotels,mysql-debezium-h5traveloto.h5traveloto.users",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "type.name=kafkaconnect",
    "key.ignore": "true",
    "schema.ignore": "true"
  }
}
SET 'auto.offset.reset' = 'earliest';

CREATE STREAM HOTELS WITH(KAFKA_TOPIC='mysql-debezium-h5traveloto.h5traveloto.hotels', PARTITIONS=1, VALUE_FORMAT='AVRO');
CREATE STREAM HOTEL_FACILITIES WITH(KAFKA_TOPIC='mysql-debezium-h5traveloto.h5traveloto.hotel_facility_details', PARTITIONS=1, VALUE_FORMAT='AVRO');
CREATE STREAM ROOM_FACILITIES WITH(KAFKA_TOPIC='mysql-debezium-h5traveloto.h5traveloto.room_facility_details', PARTITIONS=1 ,VALUE_FORMAT='AVRO');
CREATE STREAM ROOM_TYPES WITH (KAFKA_TOPIC='mysql-debezium-h5traveloto.h5traveloto.room_types', PARTITIONS=1 ,VALUE_FORMAT='AVRO');

CREATE TABLE PROVINCES (ID VARCHAR PRIMARY KEY)
    WITH (KAFKA_TOPIC='mysql-debezium-h5traveloto.h5traveloto.provinces',
        VALUE_FORMAT='AVRO');

CREATE TABLE DISTRICTS (ID VARCHAR PRIMARY KEY)
    WITH (KAFKA_TOPIC='mysql-debezium-h5traveloto.h5traveloto.districts',
        VALUE_FORMAT='AVRO');


CREATE TABLE WARDS (ID VARCHAR PRIMARY KEY)
    WITH (KAFKA_TOPIC='mysql-debezium-h5traveloto.h5traveloto.wards',
        VALUE_FORMAT='AVRO');


CREATE TABLE  HOTEL_FACILITY_LIST  AS
	SELECT HF.HOTEL_ID, COLLECT_LIST(FACILITY_ID) AS FACILITY_LIST
    FROM  HOTEL_FACILITIES HF
    GROUP BY HF.HOTEL_ID;

CREATE TABLE  ROOM_FACILITY_LIST  AS
    SELECT RF.ROOM_ID, COLLECT_LIST(FACILITY_ID) AS FACILITY_LIST
    FROM  ROOM_FACILITIES RF
    GROUP BY RF.ROOM_ID;


CREATE STREAM ROOM_TYPES_ENRICHED AS
    SELECT
        RT.ID ID,
        RT.HOTEL_ID,
        RT.NAME,
        RT.MAX_CUSTOMER,
        RT.AREA,
        RT.BED,
        RT.PRICE,
        RT.STATUS,
        RT.TOTAL_ROOM,
        RT.PAY_IN_HOTEL,
        RT.BREAK_FAST,
        RT.FREE_CANCEL,
        RT.CREATED_AT,
        RT.UPDATED_AT,
        RF.FACILITY_LIST
    FROM ROOM_TYPES RT
        LEFT JOIN ROOM_FACILITY_LIST RF ON RT.ID = RF.ROOM_ID
PARTITION BY ID;

CREATE STREAM HOTELS_ENRICHED
AS
    SELECT
        H.ID AS ID,
        H.OWNER_ID,
        H.NAME AS NAME,
        H.HOTEL_TYPE,
        H.LOGO,
        H.IMAGES,
        H.ADDRESS,
        P.NAME AS PROVINCE,
        D.NAME AS DISTRICT,
        W.NAME AS WARD,
        H.LAT,
        H.LNG,
        H.STAR,
        H.STATUS,
        H.CREATED_AT,
        H.UPDATED_AT,
        HF.FACILITY_LIST
    FROM HOTELS H
        LEFT JOIN PROVINCES P ON 'Struct{code='+H.PROVINCE_CODE+'}' = P.ID
        LEFT JOIN DISTRICTS D ON 'Struct{code='+H.DISTRICT_CODE+'}' = D.ID
        LEFT JOIN WARDS W ON 'Struct{code='+H.WARD_CODE+'}' = W.ID
        LEFT JOIN HOTEL_FACILITY_LIST HF ON H.ID = HF.HOTEL_ID
PARTITION BY H.ID;