Hey Folks,
I have a stream listener that listens to a Kafka Rest Proxy for payload. Using postman, I am sending JSON messages to my consumer.
The consumer listens to this:
@StreamListener(target = EventStreams.INPUT)
public void process(KStream<?, String> input) {
String temp = input.toString();log.info(temp); KTable<String, Long> count = input .flatMapValues(value -> Arrays.asList(pattern.split(value.toString()))).groupBy((key, word) -> word).count(); count.foreach(new ForeachAction<String, Long>() { @Override public void apply(String key, Long value) { System.out.println(key + ":" + value); } }); }
Also,
the application.yml is:
spring.task.execution.shutdown.await-termination: true
spring.task.execution.shutdown.await-termination-period: 300
logging:
level:
org.springframework.data.solr: DEBUG
org.apache.solr: DEBUG
org.springframework.security: ERROR
com.copart: DEBUG
management:
endpoints:
web:
exposure:
include: ‘*’
server:
port: 11016
error:
include-message: always
include-binding-errors: alwayssecurity:
oauth2:
resource:
jwt:
keyUri: http://c-auth-qa4.copart.com/employee/oauth/token_key
mybatis:
configuration:
map-underscore-to-camel-case: true
copart.active-dc: rn
router:
tableMapper.eventLogs: event_logs
tableMapper.callEvents: call_events
tableMapper.callheader: call_header
tableMapper.callevents: call_events
tableMapper.calldetails: call_details
tableMapper.calldetailstp: call_details_tp
tableMapper.callrecordings: call_recordings
tableMapper.taskevents: task_events
tableMapper.taskdetails: task_details
tableMapper.workerevents: worker_events
tableMapper.workerdetails: worker_details
topicMapper.calllogs: ‘${search_config.topic_prefix}solr_eventlogs’
collections:
calllogs: ‘calllogs’
dbRetryTemplate:
retryWaitTime: 1000
numOfRetries: 3
search_config:
retryWaitTime: 1000
numOfRetries: 3spring:
profiles: kubernetes
logging.config:server:
port: 8080
tomcat:
basedir: /tmp/
access-log-enabled: false
eureka:
client:
enabled: false
registerWithEureka: false
instance:
preferIpAddress: true
instanceId: ${spring.cloud.client.ipAddress}
healthCheckUrl: http://${spring.cloud.client.ipAddress}:${management.port}${management.context-path}/health
statusPageUrl: http://${spring.cloud.client.ipAddress}:${management.port}${management.context-path}/infomanagement:
port: 8081
security:
enabled: falsespring:
profiles: c-dev4
host-dc: rn
datasource:
jdbc-url: jdbc:mariadb://c-mdb-devdb.copart.com/miphone_events?useUnicode=yes&characterEncoding=UTF-8&noAccessToProcedureBodies=true
driverClassName: org.mariadb.jdbc.Driver
username: miphone_app_usr
password: 892sw=zibR?Y
cloud:
config:
allowOverride: true
overrideNone: true
overrideSystemProperties: false
refresh.extra-refreshable:
- javax.sql.DataSource
stream:
kafka:
binder:
brokers: c-kafka-qa4.copart.com:9092
configuration:
default.key.serde: org.springframework.kafka.support.serializer.JsonSerde
default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
spring.json.value.default.type: com.copart.mwa.Avro.Account
spring.json.key.default.type: java.util.UUID
security.protocol: SASL_SSL
sasl.mechanism: SCRAM-SHA-256
sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username=“test-qa-consumer” password=“Dert@101!cat”;
ssl.endpoint.identification.algorithm:
default:
consumer:
autoCommitOnError: true
bindings:
twilio-events-in:
destination: testqa2
contentType: application/json
group: test-consumer2
consumer:
max-attempts: 1
kafka:
streams:
binder.configuration:
default.key.serde: org.springframework.kafka.support.serializer.JsonSerde
default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
# spring.json.value.default.type: com.copart.mwa.Avro.Account
# spring.json.key.default.type: java.util.
producer:
key-serializer: org.springframework.kafka.support.serializer.JsonSerde
value-serializer: org.springframework.kafka.support.serializer.JsonSerde
acks: all
properties:
enable.idempotence: true
max.in.flight.requests.per.connection: 1
reconnect.backoff.max.ms: 20000 # default 1000
reconnect.backoff.ms: 10000 # default 50
properties:
bootstrap.servers: c-kafka-qa4.copart.com:9092
sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username=“test-qa-producer” password=“Dert@101!cat”;
sasl.mechanism: SCRAM-SHA-256
security:
protocol: SASL_SSL
ssl.endpoint.identification.algorithm:
retries: 5
search_config:
topic_prefix: dev4_
I am getting an error on
2021-08-03 18:12:55.466 ERROR 13520 — [container-0-C-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binding.StreamListenerMessageHandler@652a1a17]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot parse payload ; nested exception is com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of
org.apache.kafka.streams.kstream.KStream
(no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information at [Source: (byte)"“WorkerActivityName=Available&EventType=worker.activity.update&ResourceType=worker&WorkerTimeInPreviousActivityMs=237&Timestamp=1626114642&WorkerActivitySid=WAc9030ef021bc1786d3ae11544f4d9883&WorkerPreviousActivitySid=WAf4feb231e97c1878fecc58b26fdb95f3&WorkerTimeInPreviousActivity=0&AccountSid=AC8c5cd8c9ba538090da104b26d68a12ec&WorkerName=Dorothy.Finegan%40Copart.Com&Sid=EV284c8a8bc27480e40865263f0b42e5cf&TimestampMs=1626114642204&WorkerSid=WKe638256376188fab2a98cccb3c803d67&WorkspaceSid=WS38b10”[truncated 1332 bytes]; line: 1, column: 1], failedMessage=GenericMessage [payload=byte[1832], headers={kafka_offset=1204, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@6910caa, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=[B@23167522, kafka_receivedPartitionId=0, kafka_receivedTopic=testqa2, kafka_receivedTimestamp=1628028774502, contentType=application/json, __TypeId__=[B@1a0545bd, kafka_groupId=test-consumer2}]
I am best guessing there is a problem with the way I am reading the JSON payload and handling it.
Please can you’ll help?