KStreams - Message Handler Exception

Hey Folks,

I have a stream listener that consumes payloads posted through the Kafka REST Proxy. Using Postman, I am sending JSON messages to my consumer.

The consumer looks like this:

@StreamListener(target = EventStreams.INPUT)
public void process(KStream<?, String> input) {
    String temp = input.toString();
    log.info(temp);

    KTable<String, Long> count = input
            .flatMapValues(value -> Arrays.asList(pattern.split(value.toString())))
            .groupBy((key, word) -> word)
            .count();

    count.foreach(new ForeachAction<String, Long>() {
        @Override
        public void apply(String key, Long value) {
            System.out.println(key + ":" + value);
        }
    });
}
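
For reference, a minimal sketch of the kind of EventStreams binding interface this listener assumes (the binding name twilio-events-in is taken from the application.yml below; everything else is an assumption). Declaring the @Input method to return KStream is what routes the binding through the Kafka Streams binder rather than the message-channel binder:

import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.annotation.Input;

// Hypothetical sketch; only the INPUT constant is implied by the listener above.
public interface EventStreams {

    String INPUT = "twilio-events-in";

    @Input(INPUT)
    KStream<?, String> input();
}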

Also, here is the application.yml:

spring.task.execution.shutdown.await-termination: true
spring.task.execution.shutdown.await-termination-period: 300
logging:
  level:
    org.springframework.data.solr: DEBUG
    org.apache.solr: DEBUG
    org.springframework.security: ERROR
    com.copart: DEBUG
management:
  endpoints:
    web:
      exposure:
        include: '*'
server:
  port: 11016
  error:
    include-message: always
    include-binding-errors: always

security:
  oauth2:
    resource:
      jwt:
        keyUri: http://c-auth-qa4.copart.com/employee/oauth/token_key
mybatis:
  configuration:
    map-underscore-to-camel-case: true
copart.active-dc: rn
router:
  tableMapper.eventLogs: event_logs
  tableMapper.callEvents: call_events
  tableMapper.callheader: call_header
  tableMapper.callevents: call_events
  tableMapper.calldetails: call_details
  tableMapper.calldetailstp: call_details_tp
  tableMapper.callrecordings: call_recordings
  tableMapper.taskevents: task_events
  tableMapper.taskdetails: task_details
  tableMapper.workerevents: worker_events
  tableMapper.workerdetails: worker_details
  topicMapper.calllogs: '${search_config.topic_prefix}solr_eventlogs'
collections:
  calllogs: 'calllogs'
dbRetryTemplate:
  retryWaitTime: 1000
  numOfRetries: 3
search_config:
  retryWaitTime: 1000
  numOfRetries: 3

---
spring:
  profiles: kubernetes
logging.config:

server:
  port: 8080
  tomcat:
    basedir: /tmp/
    access-log-enabled: false
eureka:
  client:
    enabled: false
    registerWithEureka: false
  instance:
    preferIpAddress: true
    instanceId: ${spring.cloud.client.ipAddress}
    healthCheckUrl: http://${spring.cloud.client.ipAddress}:${management.port}${management.context-path}/health
    statusPageUrl: http://${spring.cloud.client.ipAddress}:${management.port}${management.context-path}/info

management:
  port: 8081
  security:
    enabled: false

---
spring:
  profiles: c-dev4
  host-dc: rn
  datasource:
    jdbc-url: jdbc:mariadb://c-mdb-devdb.copart.com/miphone_events?useUnicode=yes&characterEncoding=UTF-8&noAccessToProcedureBodies=true
    driverClassName: org.mariadb.jdbc.Driver
    username: miphone_app_usr
    password: 892sw=zibR?Y
  cloud:
    config:
      allowOverride: true
      overrideNone: true
      overrideSystemProperties: false
    refresh.extra-refreshable:
      - javax.sql.DataSource
    stream:
      kafka:
        binder:
          brokers: c-kafka-qa4.copart.com:9092
          configuration:
            default.key.serde: org.springframework.kafka.support.serializer.JsonSerde
            default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
            spring.json.value.default.type: com.copart.mwa.Avro.Account
            spring.json.key.default.type: java.util.UUID
            security.protocol: SASL_SSL
            sasl.mechanism: SCRAM-SHA-256
            sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="test-qa-consumer" password="Dert@101!cat";
            ssl.endpoint.identification.algorithm:
        default:
          consumer:
            autoCommitOnError: true
      bindings:
        twilio-events-in:
          destination: testqa2
          contentType: application/json
          group: test-consumer2
          consumer:
            max-attempts: 1
  kafka:
    streams:
      binder.configuration:
        default.key.serde: org.springframework.kafka.support.serializer.JsonSerde
        default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
        # spring.json.value.default.type: com.copart.mwa.Avro.Account
        # spring.json.key.default.type: java.util.
    producer:
      key-serializer: org.springframework.kafka.support.serializer.JsonSerde
      value-serializer: org.springframework.kafka.support.serializer.JsonSerde
      acks: all
      properties:
        enable.idempotence: true
        max.in.flight.requests.per.connection: 1
        reconnect.backoff.max.ms: 20000 # default 1000
        reconnect.backoff.ms: 10000 # default 50
    properties:
      bootstrap.servers: c-kafka-qa4.copart.com:9092
      sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="test-qa-producer" password="Dert@101!cat";
      sasl.mechanism: SCRAM-SHA-256
      security:
        protocol: SASL_SSL
      ssl.endpoint.identification.algorithm:
    retries: 5
search_config:
  topic_prefix: dev4_
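
As an aside on the serde entries above: org.springframework.kafka.support.serializer.JsonSerde has to know which target type to deserialize into, which is what the spring.json.*.default.type keys supply. A minimal sketch of the equivalent programmatic construction (treating com.copart.mwa.Avro.Account as a plain POJO here is an assumption):

import org.springframework.kafka.support.serializer.JsonSerde;

public class AccountSerdeFactory {

    // Passing the class to the constructor supplies the same type information
    // that spring.json.value.default.type provides via configuration.
    public static JsonSerde<com.copart.mwa.Avro.Account> accountValueSerde() {
        return new JsonSerde<>(com.copart.mwa.Avro.Account.class);
    }
}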

I am getting the following error:

2021-08-03 18:12:55.466 ERROR 13520 --- [container-0-C-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binding.StreamListenerMessageHandler@652a1a17]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot parse payload ; nested exception is com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of org.apache.kafka.streams.kstream.KStream (no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information at [Source: (byte)""WorkerActivityName=Available&EventType=worker.activity.update&ResourceType=worker&WorkerTimeInPreviousActivityMs=237&Timestamp=1626114642&WorkerActivitySid=WAc9030ef021bc1786d3ae11544f4d9883&WorkerPreviousActivitySid=WAf4feb231e97c1878fecc58b26fdb95f3&WorkerTimeInPreviousActivity=0&AccountSid=AC8c5cd8c9ba538090da104b26d68a12ec&WorkerName=Dorothy.Finegan%40Copart.Com&Sid=EV284c8a8bc27480e40865263f0b42e5cf&TimestampMs=1626114642204&WorkerSid=WKe638256376188fab2a98cccb3c803d67&WorkspaceSid=WS38b10"[truncated 1332 bytes]; line: 1, column: 1], failedMessage=GenericMessage [payload=byte[1832], headers={kafka_offset=1204, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@6910caa, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=[B@23167522, kafka_receivedPartitionId=0, kafka_receivedTopic=testqa2, kafka_receivedTimestamp=1628028774502, contentType=application/json, __TypeId__=[B@1a0545bd, kafka_groupId=test-consumer2}]

My best guess is that there is a problem with the way I am reading and handling the JSON payload.
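
For what it is worth, the stack trace shows a StreamListenerMessageHandler, i.e. the message-channel binder, trying to JSON-convert the raw payload into a KStream, which Jackson cannot construct. Below is a minimal sketch of the functional-style binding from Spring Cloud Stream 3.x that hands the topic to the Kafka Streams binder instead (the class name, bean name, and String key/value types are assumptions, not code from my project):

import java.util.function.Consumer;

import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class WordCountConfig {

    // A Consumer<KStream> bean is bound by the Kafka Streams binder (binding
    // name process-in-0 by default), so no JSON conversion into KStream occurs.
    @Bean
    public Consumer<KStream<String, String>> process() {
        return input -> input.foreach((key, value) -> System.out.println(key + ":" + value));
    }
}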

Can you all please help?

I am not sure, but I am wondering about your code: KTable#foreach() does not exist; you would need to call KTable#toStream()#foreach() instead.
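
As a sketch, reusing the count table from your snippet (a KTable offers no foreach(), so the changelog is converted to a KStream first):

// Convert the KTable changelog into a KStream before iterating over it.
count.toStream().foreach((word, tally) -> System.out.println(word + ":" + tally));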

If you use KTable#toStream()#foreach(), how would you store it as a KTable?

I am not sure I understand the question; I am also not familiar with the magic Spring does...