Hello Folks,
I am trying to create a KTable by consuming a topic. After that I want to use KafkaStreams to build my topology (code down)
Properties streamsProperties = new Properties();
streamsProperties.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "k-table-application");
streamsProperties.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "c-kafka-qa4.copart.com:9092");
streamsProperties.putIfAbsent(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsProperties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 60000);
streamsProperties.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsProperties.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsProperties.putIfAbsent("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"test-qa-consumer\" password=\"asdasat\";");
streamsProperties.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsProperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
streamsProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsProperties.put(ConsumerConfig.CHECK_CRCS_CONFIG, "TRUE");
streamsProperties.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "60000");
streamsProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "FALSE");
streamsProperties.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "TRUE");
streamsProperties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "52428800");
streamsProperties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
streamsProperties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1");
streamsProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer2");
streamsProperties.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "null");
streamsProperties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
streamsProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "[]");
streamsProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted");
streamsProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "StringDeserializer.class.getName()");
streamsProperties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "1048576");
streamsProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
streamsProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
streamsProperties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "class org.apache.kafka.clients.consumer.RangeAssignor]");
streamsProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "StringDeserializer.class.getName()");
KTable<String, String> kTable = builder.table("testqa2",
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("queryableStoreName"));
KafkaStreams streams = new KafkaStreams(builder.build(), streamsProperties);
streams.start();
But this line throws an error saying -
KafkaStreams streams = new KafkaStreams(builder.build(), streamsProperties);
2021-07-27 17:46:35.518 INFO 4196 --- [ test-consumer2] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-test-consumer2-2, groupId=test-consumer2] Group coordinator rnqckf402.corp.copart.com:9092 (id: 2147483645 rack: null) is unavailable or invalid, will attempt rediscovery
2021-07-27 17:46:37.579 INFO 4196 --- [ test-consumer2] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-test-consumer2-2, groupId=test-consumer2] Discovered group coordinator rnqckf402.corp.copart.com:9092 (id: 2147483645 rack: null)
2021-07-27 17:46:41.977 ERROR 4196 --- [container-0-C-1] essageListenerContainer$ListenerConsumer : Stopping container due to an Error
java.lang.NoSuchMethodError: 'void org.apache.kafka.common.metrics.Sensor.add(org.apache.kafka.common.MetricName, org.apache.kafka.common.metrics.MeasurableStat)'
at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.<init>(StreamThread.java:505) ~[kafka-streams-1.0.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.create(StreamThread.java:653) ~[kafka-streams-1.0.0.jar:na]
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:652) ~[kafka-streams-1.0.0.jar:na]
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:506) ~[kafka-streams-1.0.0.jar:na]
at com.copart.mwa.service.TwilioEventsListener.twilioKTable(TwilioEventsListener.java:134) ~[classes/:na]
at com.copart.mwa.service.TwilioEventsListener.processTwilioEvents(TwilioEventsListener.java:83) ~[classes/:na]
at com.copart.mwa.service.TwilioEventsListener$$FastClassBySpringCGLIB$$98f1412b.invoke(<generated>) ~[classes/:na]
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) ~[spring-core-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:769) ~[spring-aop-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747) ~[spring-aop-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.aop.aspectj.MethodInvocationProceedingJoinPoint.proceed(MethodInvocationProceedingJoinPoint.java:88) ~[spring-aop-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at com.copart.mwa.aop.LoggerAspect.logRequestsForData(LoggerAspect.java:41) ~[classes/:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:564) ~[na:na]
at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethodWithGivenArgs(AbstractAspectJAdvice.java:644) ~[spring-aop-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:633) ~[spring-aop-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:70) ~[spring-aop-5.2.3.RELEASE.jar:5.2.3.RELEASE]
Please can you guide me? I’m stuck