Kafka Producer/Consumer API

Hi,

I am implementing Kafka-based object parsing through serialization/deserialization.

My Fields

@Data
@AllArgsConstructor
@NoArgsConstructor
public class SalesOrder {

    private int applicationnumber;
    private String customerid;
    private String purchaseorderno;
    private String materialno;
    private String ordertype;
    private String supplier;
    private String unit;
    private String salesdate;
    private String address;
    private long mobileno;
    private String emailid;

}

Producer Code

@Configuration
public class KafkaProducerConfig {

    @Bean
    public Map<String, Object> producerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
        return props;
    }

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}
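A variant of this producer config that keeps the type-info headers would look roughly like the sketch below (untried; the `TYPE_MAPPINGS` package names are assumptions, not my real packages). With `ADD_TYPE_INFO_HEADERS` set to `false` as above, the `JsonSerializer` writes plain JSON with no `__TypeId__` header for the consumer to read.

```java
// Sketch only (untried): keep type-info headers so the consumer-side
// JsonDeserializer can resolve the target class from the __TypeId__ header.
// The package names in the TYPE_MAPPINGS value are assumptions for illustration.
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, true);
props.put(JsonSerializer.TYPE_MAPPINGS, "salesOrder:SalesOrder_Producer.Bean.SalesOrder");
// ...with the mirror mapping on the consumer side:
// props.put(JsonDeserializer.TYPE_MAPPINGS, "salesOrder:SalesOrder_Consumer.Bean.SalesOrder");
```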


@RestController
@RequestMapping("/producer-app")
public class EventController {

    @Autowired
    private KafkaMessagePublisher publisher;

    @PostMapping("/publish")
    public void sendEvents(@RequestBody SalesOrder salesOrder) {
        publisher.sendEventsToTopic(salesOrder);
    }

}


@Service
public class KafkaMessagePublisher {

    @Autowired
    private KafkaTemplate<String, Object> template;

    public void sendEventsToTopic(SalesOrder salesOrder) {
        try {
            CompletableFuture<SendResult<String, Object>> future = template.send("checkdata", salesOrder);
            future.whenComplete((result, ex) -> {
                if (ex == null) {
                    System.out.println("Events" + salesOrder.toString());
                    // System.out.println("Sent message=[" + salesOrder.toString() +
                    //         "] with offset=[" + result.getRecordMetadata().offset() + "]");
                } else {
                    System.out.println("Unable to send message=[" +
                            salesOrder.toString() + "] due to : " + ex.getMessage());
                }
            });
        } catch (Exception ex) {
            System.out.println("ERROR : " + ex.getMessage());
        }
    }

}


Consumer Code

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public Map<String, Object> consumerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "SalesOrder_Consumer.Bean;");
        return props;
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfig());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}
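The error message at the bottom of my logs suggests an `ErrorHandlingDeserializer`. As far as I understand, wiring that together with a default target type (needed because my producer sends no `__TypeId__` header) would look roughly like this sketch inside `consumerConfig()` (untried; the fully-qualified class name is my guess based on the trusted package above):

```java
// Sketch only (untried). Wraps JsonDeserializer in ErrorHandlingDeserializer,
// as the error message suggests, and supplies a default target type since the
// producer sets ADD_TYPE_INFO_HEADERS = false and sends no __TypeId__ header.
// "SalesOrder_Consumer.Bean.SalesOrder" is an assumed fully-qualified name.
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "SalesOrder_Consumer.Bean.SalesOrder");
props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "SalesOrder_Consumer.Bean");
```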


Kafka Message Listener - Consumer

@Service
public class KafkaMessageListener {

    Logger log = LoggerFactory.getLogger(KafkaMessageListener.class);

    @KafkaListener(topics = "checkdata", groupId = "jt-group")
    public void consumeEvents(SalesOrder salesOrder) {
        log.info("consumer consume the events {} ", salesOrder.toString());
    }

}


I am getting the error below; please help me check it and let me know any clues.

On the Kafka producer side the data looks correct. From the producer logs:

2024-02-18T23:06:10.617+05:30 INFO 31344 — [nio-9398-exec-3] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 5e3c2b738d253ff5
2024-02-18T23:06:10.618+05:30 INFO 31344 — [nio-9398-exec-3] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1708277770606
2024-02-18T23:06:11.836+05:30 INFO 31344 — [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: F4WS8SFTRAu2I4yCEElfTA
2024-02-18T23:06:11.839+05:30 INFO 31344 — [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1] ProducerId set to 1001 with epoch 0
EventsSalesOrder(applicationnumber=56999546, customerid=CkDFF, purchaseorderno=AkSDWD, materialno=MNk109, ordertype=Ok1L, supplier=SL1k01, unit=237, salesdate=2022-03-12 05:30:00, address=PjQ4R, mobileno=87709, emailid=abgsc@Gmail.com)

Daemon will be stopped at the end of the build stop command received

Task :SalesOrderProducerApplication.main()
2024-02-18T23:15:11.903+05:30 INFO 31344 — [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Node -1 disconnected.
EventsSalesOrder(applicationnumber=56999546, customerid=CkDFF, purchaseorderno=AkSDWD, materialno=MNk109, ordertype=Ok1L, supplier=SL1k01, unit=237, salesdate=2022-03-12 05:30:00, address=PjQ4R, mobileno=87709, emailid=abgsc@Gmail.com)


Consumer Logs

2024-02-18T23:51:19.628+05:30 INFO 33240 — [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.6.1
2024-02-18T23:51:19.636+05:30 INFO 33240 — [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 5e3c2b738d253ff5
2024-02-18T23:51:19.636+05:30 INFO 33240 — [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1708280479620
2024-02-18T23:51:19.648+05:30 INFO 33240 — [ main] fkaConsumerFactory$ExtendedKafkaConsumer : [Consumer clientId=consumer-jt-group-1, groupId=jt-group] Subscribed to topic(s): checkdata
2024-02-18T23:51:19.723+05:30 INFO 33240 — [ main] S.SalesOrderConsumerApplication : Started SalesOrderConsumerApplication in 10.264 seconds (process running for 11.864)
2024-02-18T23:51:21.510+05:30 INFO 33240 — [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-jt-group-1, groupId=jt-group] Cluster ID: F4WS8SFTRAu2I4yCEElfTA
2024-02-18T23:51:21.516+05:30 INFO 33240 — [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-jt-group-1, groupId=jt-group] Discovered group coordinator 127.0.0.1:9092 (id: 2147483647 rack: null)
2024-02-18T23:51:21.522+05:30 INFO 33240 — [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-jt-group-1, groupId=jt-group] (Re-)joining group
2024-02-18T23:51:21.585+05:30 INFO 33240 — [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-jt-group-1, groupId=jt-group] Request joining group due to: need to re-join with the given member-id: consumer-jt-group-1-7f060e9a-a56d-4528-86f0-09bd29bf0fb7
2024-02-18T23:51:21.587+05:30 INFO 33240 — [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-jt-group-1, groupId=jt-group] Request joining group due to: rebalance failed due to ‘The group member needs to have a valid member id before actually entering a consumer group.’ (MemberIdRequiredException)
2024-02-18T23:51:21.587+05:30 INFO 33240 — [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-jt-group-1, groupId=jt-group] (Re-)joining group
2024-02-18T23:51:21.609+05:30 INFO 33240 — [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-jt-group-1, groupId=jt-group] Successfully joined group with generation Generation{generationId=13, memberId=‘consumer-jt-group-1-7f060e9a-a56d-4528-86f0-09bd29bf0fb7’, protocol=‘range’}
2024-02-18T23:51:21.648+05:30 INFO 33240 — [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-jt-group-1, groupId=jt-group] Finished assignment for group at generation 13: {consumer-jt-group-1-7f060e9a-a56d-4528-86f0-09bd29bf0fb7=Assignment(partitions=[checkdata-0])}
2024-02-18T23:51:21.669+05:30 INFO 33240 — [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-jt-group-1, groupId=jt-group] Successfully synced group in generation Generation{generationId=13, memberId=‘consumer-jt-group-1-7f060e9a-a56d-4528-86f0-09bd29bf0fb7’, protocol=‘range’}
2024-02-18T23:51:21.671+05:30 INFO 33240 — [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-jt-group-1, groupId=jt-group] Notifying assignor about the new Assignment(partitions=[checkdata-0])
2024-02-18T23:51:21.685+05:30 INFO 33240 — [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-jt-group-1, groupId=jt-group] Adding newly assigned partitions: checkdata-0
2024-02-18T23:51:21.732+05:30 INFO 33240 — [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-jt-group-1, groupId=jt-group] Setting offset for partition checkdata-0 to the committed offset FetchPosition{offset=6, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[127.0.0.1:9092 (id: 0 rack: null)], epoch=0}}
2024-02-18T23:51:21.738+05:30 INFO 33240 — [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : jt-group: partitions assigned: [checkdata-0]
2024-02-18T23:51:21.906+05:30 ERROR 33240 — [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Consumer exception

java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:192) ~[spring-kafka-3.1.1.jar:3.1.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1934) ~[spring-kafka-3.1.1.jar:3.1.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1365) ~[spring-kafka-3.1.1.jar:3.1.1]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:840) ~[na:na]
Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition checkdata-0 at offset 6. If needed, please seek past the record to continue consumption.
at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:309) ~[kafka-clients-3.6.1.jar:na]
at org.apache.kafka.clients.consumer.internals.CompletedFetch.fetchRecords(CompletedFetch.java:263) ~[kafka-clients-3.6.1.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractFetch.fetchRecords(AbstractFetch.java:340) ~[kafka-clients-3.6.1.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractFetch.collectFetch(AbstractFetch.java:306) ~[kafka-clients-3.6.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1262) ~[kafka-clients-3.6.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1186) ~[kafka-clients-3.6.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159) ~[kafka-clients-3.6.1.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1649) ~[spring-kafka-3.1.1.jar:3.1.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1624) ~[spring-kafka-3.1.1.jar:3.1.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1421) ~[spring-kafka-3.1.1.jar:3.1.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1313) ~[spring-kafka-3.1.1.jar:3.1.1]
... 2 common frames omitted

Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
at org.springframework.util.Assert.state(Assert.java:76) ~[spring-core-6.1.3.jar:6.1.3]
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:583) ~[spring-kafka-3.1.1.jar:3.1.1]
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:73) ~[kafka-clients-3.6.1.jar:na]
at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:300) ~[kafka-clients-3.6.1.jar:na]
… 12 common frames omitted


The same two root causes repeat over and over:

Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition checkdata-0 at offset 6. If needed, please seek past the record to continue consumption.

Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided

Please help me check this and suggest a solution. Thanks.