Kafka connect sink to elastic search got error message key null

I got this error while try to sync my mysql instance to elastic search by kafka connect. I used ksql to demoralizes data. It seen the message has a key, but i don’t know why the connector throw the null exception like that. Thanks for your help.!!!

{
  "name": "elastic-sink-enriched",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "HOTELS_ENRICHED",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "_doc",
    "key.ignore" : "false",
    "schema.ignore": "false",
    "value.converter" : "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url" :  "http://schema-registry:8081",
    "key.converter" : "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url":"http://schema-registry:8081",
    "behavior.on.null.values":"delete",
    "value.converter.schemas.enable":"true",
    "errors.tolerance":"all"
  }
}

rowtime: 2024/05/03 11:44:18.001 Z, key: 17, value: {"OWNER_ID": 1, "NAME": "Khach san sieu sieu dep", "HOTEL_TYPE": 4, "LOGO": "{\"id\": 0, \"url\": \"https://d3jwhct9rpti9n.cloudfront.net/hotel_images/658799386.jpg\", \"width\": 1000, \"height\": 600, \"extension\": \".jpg\", \"cloud_name\": \"s3\"}", "IMAGES": "[{\"id\": 0, \"url\": \"https://d3jwhct9rpti9n.cloudfront.net/hotel_images/658799386.jpg\", \"width\": 1000, \"height\": 600, \"extension\": \".jpg\", \"cloud_name\": \"s3\"}, {\"id\": 0, \"url\": \"https://d3jwhct9rpti9n.cloudfront.net/hotel_images/658799386.jpg\", \"width\": 1000, \"height\": 600, \"extension\": \".jpg\", \"cloud_name\": \"s3\"}, {\"id\": 0, \"url\": \"https://d3jwhct9rpti9n.cloudfront.net/hotel_images/658799386.jpg\", \"width\": 1000, \"height\": 600, \"extension\": \".jpg\", \"cloud_name\": \"s3\"}]", "ADDRESS": "31 Ngo quyen", "PROVINCE": null, "DISTRICT": null, "WARD": null, "LAT": 20.865139, "LNG": 106.68383, "STAR": 4, "STATUS": 1, "CREATED_AT": "2024-05-03T15:23:38Z", "UPDATED_AT": "2024-05-03T15:23:38Z", "FACILITY_LIST": null}, partition: 0

SET 'auto.offset.reset' = 'earliest';

CREATE STREAM HOTELS WITH(KAFKA_TOPIC='mysql-debezium-h5traveloto.h5traveloto.hotels', PARTITIONS=1, VALUE_FORMAT='AVRO');
CREATE STREAM HOTEL_FACILITIES WITH(KAFKA_TOPIC='mysql-debezium-h5traveloto.h5traveloto.hotel_facility_details', PARTITIONS=1, VALUE_FORMAT='AVRO');
CREATE STREAM ROOM_FACILITIES WITH(KAFKA_TOPIC='mysql-debezium-h5traveloto.h5traveloto.room_facility_details', PARTITIONS=1 ,VALUE_FORMAT='AVRO');
CREATE STREAM ROOM_TYPES WITH (KAFKA_TOPIC='mysql-debezium-h5traveloto.h5traveloto.room_types', PARTITIONS=1 ,VALUE_FORMAT='AVRO');

CREATE TABLE PROVINCES (ID VARCHAR PRIMARY KEY)
    WITH (KAFKA_TOPIC='mysql-debezium-h5traveloto.h5traveloto.provinces',
        VALUE_FORMAT='AVRO');

CREATE TABLE DISTRICTS (ID VARCHAR PRIMARY KEY)
    WITH (KAFKA_TOPIC='mysql-debezium-h5traveloto.h5traveloto.districts',
        VALUE_FORMAT='AVRO');


CREATE TABLE WARDS (ID VARCHAR PRIMARY KEY)
    WITH (KAFKA_TOPIC='mysql-debezium-h5traveloto.h5traveloto.wards',
        VALUE_FORMAT='AVRO');


CREATE TABLE  HOTEL_FACILITY_LIST  AS
	SELECT HF.HOTEL_ID, COLLECT_LIST(FACILITY_ID) AS FACILITY_LIST
    FROM  HOTEL_FACILITIES HF
    GROUP BY HF.HOTEL_ID;

CREATE TABLE  ROOM_FACILITY_LIST  AS
    SELECT RF.ROOM_ID, COLLECT_LIST(FACILITY_ID) AS FACILITY_LIST
    FROM  ROOM_FACILITIES RF
    GROUP BY RF.ROOM_ID;


CREATE STREAM ROOM_TYPES_ENRICHED AS
    SELECT
        RT.ID ID,
        RT.HOTEL_ID,
        RT.NAME,
        RT.MAX_CUSTOMER,
        RT.AREA,
        RT.BED,
        RT.PRICE,
        RT.STATUS,
        RT.TOTAL_ROOM,
        RT.PAY_IN_HOTEL,
        RT.BREAK_FAST,
        RT.FREE_CANCEL,
        RT.CREATED_AT,
        RT.UPDATED_AT,
        RF.FACILITY_LIST
    FROM ROOM_TYPES RT
        LEFT JOIN ROOM_FACILITY_LIST RF ON RT.ID = RF.ROOM_ID
PARTITION BY ID;

CREATE STREAM HOTELS_ENRICHED
AS
    SELECT
        H.ID AS ID,
        H.OWNER_ID,
        H.NAME AS NAME,
        H.HOTEL_TYPE,
        H.LOGO,
        H.IMAGES,
        H.ADDRESS,
        P.NAME AS PROVINCE,
        D.NAME AS DISTRICT,
        W.NAME AS WARD,
        H.LAT,
        H.LNG,
        H.STAR,
        H.STATUS,
        H.CREATED_AT,
        H.UPDATED_AT,
        HF.FACILITY_LIST
    FROM HOTELS H
        LEFT JOIN PROVINCES P ON 'Struct{code='+H.PROVINCE_CODE+'}' = P.ID
        LEFT JOIN DISTRICTS D ON 'Struct{code='+H.DISTRICT_CODE+'}' = D.ID
        LEFT JOIN WARDS W ON 'Struct{code='+H.WARD_CODE+'}' = W.ID
        LEFT JOIN HOTEL_FACILITY_LIST HF ON H.ID = HF.HOTEL_ID
PARTITION BY H.ID;

ERROR [elastic-sink-enriched|task-0] WorkerSinkTask{id=elastic-sink-enriched-0} Error converting message key in topic 'HOTELS_ENRICHED' partition 0 at offset 17 and timestamp 1714736658001: null (org.apache.kafka.connect.runtime.WorkerSinkTask:532)
java.nio.BufferUnderflowException
        at java.base/java.nio.Buffer.nextGetIndex(Buffer.java:651)
        at java.base/java.nio.HeapByteBuffer.getInt(HeapByteBuffer.java:402)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.<init>(AbstractKafkaAvroDeserializer.java:302)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:160)
        at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:172)
        at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
        at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:530)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:493)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:493)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:473)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)