I am using a Confluent Cloud Basic subscription together with Apache NiFi.
Publishing from NiFi to topics in the cloud works without problems.
However, consuming from the same topics does not: the Confluent servers seem to close the connection when the NiFi consumer tries to join its consumer group:
Join group failed with org.apache.kafka.common.errors.DisconnectException
(See logs below)
Does anyone have any idea what could be the problem?
As a side note, other Java clients can consume from the same topics just fine, and comparing their configuration with NiFi's has not shown any significant difference.
I would appreciate any help anyone cares to offer. Thanks!
This is NiFi's consumer config (NiFi uses kafka-clients v2.6.0):
04:17:28,886 INFO o.a.k.clients.consumer.ConsumerConfig ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [xxx.westeurope.azure.confluent.cloud:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = ...
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 180000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = ...
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 10000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = PLAIN
security.protocol = SASL_SSL
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = /opt/nifi/nifi-current/conf/kafka.truststore.jks
ssl.truststore.password = [hidden]
ssl.truststore.type = PKCS12
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
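For comparison with the other Java clients, here is a minimal sketch of how a subset of the values above could be written as a plain `java.util.Properties` object. The class name, the method name, and all four arguments are hypothetical placeholders, since the real bootstrap server, group id, and credentials are redacted in the dump. The JAAS line is only the usual form for SASL/PLAIN and is an assumption, because the actual value is hidden. The truststore settings are left out because they depend on a local file.

```java
import java.util.Properties;

// Sketch only: mirrors a subset of the ConsumerConfig values dumped above.
// It uses plain string keys, so it compiles without kafka-clients on the classpath.
class NifiLikeConsumerConfig {

    // All four arguments are placeholders; the post redacts the real values.
    static Properties build(String bootstrapServers, String groupId,
                            String apiKey, String apiSecret) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);

        // Security settings as shown in the dump.
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "PLAIN");
        // Assumed JAAS line for SASL/PLAIN; the dump only shows "[hidden]".
        props.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"" + apiKey + "\" password=\"" + apiSecret + "\";");
        props.setProperty("ssl.enabled.protocols", "TLSv1.2");
        props.setProperty("ssl.protocol", "TLSv1.2");
        props.setProperty("ssl.endpoint.identification.algorithm", "https");
        props.setProperty("client.dns.lookup", "use_all_dns_ips");

        // Group membership and polling settings as shown in the dump.
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("session.timeout.ms", "10000");
        props.setProperty("heartbeat.interval.ms", "3000");
        props.setProperty("max.poll.interval.ms", "300000");
        props.setProperty("max.poll.records", "10000");
        props.setProperty("request.timeout.ms", "30000");
        props.setProperty("default.api.timeout.ms", "180000");

        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("xxx.westeurope.azure.confluent.cloud:9092",
                "example-group", "API_KEY", "API_SECRET");
        // Print every key in sorted order, skipping the one that carries credentials.
        props.stringPropertyNames().stream()
                .sorted()
                .filter(key -> !key.equals("sasl.jaas.config"))
                .forEach(key -> System.out.println(key + " = " + props.getProperty(key)));
    }
}
```

With kafka-clients 2.6.0 on the classpath, the returned object could be passed to `new KafkaConsumer<byte[], byte[]>(props)`, followed by `subscribe` and `poll`, to check whether the same settings also fail to join the group outside NiFi.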
These are the logs:
04:17:29,650 DEBUG o.a.kafka.clients.consumer.KafkaConsumer [...] Kafka consumer initialized
04:17:29,651 INFO o.a.kafka.clients.consumer.KafkaConsumer [...] Subscribed to topic(s): Mcc-TallAnalyticsChallengeConfiguration-v1
04:17:29,653 DEBUG o.a.k.c.c.internals.AbstractCoordinator [...] Sending FindCoordinator request to broker xxx.westeurope.azure.confluent.cloud:9092 (id: -1 rack: null)
04:17:30,418 DEBUG org.apache.kafka.clients.NetworkClient [...] Initiating connection to node xxx.westeurope.azure.confluent.cloud:9092 (id: -1 rack: null) using address xxx.westeurope.azure.confluent.cloud/51.105.121.2
04:17:30,427 DEBUG o.a.k.c.s.a.SaslClientAuthenticator [...] Set SASL client state to SEND_APIVERSIONS_REQUEST
04:17:30,432 DEBUG o.a.k.c.s.a.SaslClientAuthenticator [...] Creating SaslClient: client=null;service=kafka;serviceHostname=xxx.westeurope.azure.confluent.cloud;mechs=[PLAIN]
04:17:30,458 DEBUG org.apache.kafka.common.network.Selector [...] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
04:17:30,469 DEBUG org.apache.kafka.clients.NetworkClient [...] Completed connection to node -1. Fetching API versions.
04:18:30,515 DEBUG o.a.k.c.c.internals.AbstractCoordinator [...] Coordinator discovery failed, refreshing metadata
04:19:30,523 DEBUG o.a.k.c.c.internals.AbstractCoordinator [...] Sending FindCoordinator request to broker xxx.westeurope.azure.confluent.cloud:9092 (id: -1 rack: null)
04:20:30,539 DEBUG o.a.k.common.network.SslTransportLayer [SslTransportLayer channelId=-1 key=sun.nio.ch.SelectionKeyImpl@2360556e] SSL handshake completed successfully with peerHost 'xxx.westeurope.azure.confluent.cloud' peerPort 9092 peerPrincipal 'CN=*.westeurope.azure.confluent.cloud' cipherSuite 'TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256'
04:20:30,679 DEBUG o.a.k.c.s.a.SaslClientAuthenticator [...] Set SASL client state to RECEIVE_APIVERSIONS_RESPONSE
04:20:30,679 DEBUG o.a.k.c.c.internals.AbstractCoordinator [...] Coordinator discovery failed, refreshing metadata
04:21:30,681 DEBUG o.a.k.c.c.internals.AbstractCoordinator [...] Sending FindCoordinator request to broker xxx.westeurope.azure.confluent.cloud:9092 (id: -1 rack: null)
04:21:30,694 DEBUG o.a.k.c.s.a.SaslClientAuthenticator [...] Set SASL client state to SEND_HANDSHAKE_REQUEST
04:21:30,695 DEBUG o.a.k.c.s.a.SaslClientAuthenticator [...] Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE
04:22:30,697 DEBUG o.a.k.c.s.a.SaslClientAuthenticator [...] Set SASL client state to INITIAL
04:22:30,701 DEBUG o.a.k.c.s.a.SaslClientAuthenticator [...] Set SASL client state to INTERMEDIATE
04:22:30,702 DEBUG o.a.k.c.c.internals.AbstractCoordinator [...] Coordinator discovery failed, refreshing metadata
04:23:30,708 DEBUG o.a.k.c.c.internals.AbstractCoordinator [...] Sending FindCoordinator request to broker xxx.westeurope.azure.confluent.cloud:9092 (id: -1 rack: null)
04:23:30,710 DEBUG o.a.k.c.s.a.SaslClientAuthenticator [...] Set SASL client state to COMPLETE
04:23:30,711 DEBUG o.a.k.c.s.a.SaslClientAuthenticator [...] Finished authentication with no session expiration and no session re-authentication
04:23:30,712 DEBUG org.apache.kafka.common.network.Selector [...] Successfully authenticated with xxx.westeurope.azure.confluent.cloud/51.105.121.2
04:23:30,713 DEBUG org.apache.kafka.clients.NetworkClient [...] Initiating API versions fetch from node -1.
04:23:30,713 DEBUG org.apache.kafka.clients.NetworkClient [...] Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=..., correlationId=4) and timeout 30000 to node -1: {client_software_name=apache-kafka-java,client_software_version=2.6.0,_tagged_fields={}}
04:24:30,720 DEBUG org.apache.kafka.clients.NetworkClient [...] Received API_VERSIONS response from node -1 for request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=..., correlationId=4): org.apache.kafka.common.requests.ApiVersionsResponse@16a88f9d
04:24:30,726 DEBUG org.apache.kafka.clients.NetworkClient [...] Recorded API versions for node -1: (Produce(0): 0 to 9 [usable: 8], Fetch(1): 0 to 12 [usable: 11], ListOffsets(2): 0 to 7 [usable: 5], Metadata(3): 0 to 11 [usable: 9], LeaderAndIsr(4): 0 to 5 [usable: 4], StopReplica(5): 0 to 3 [usable: 3], UpdateMetadata(6): 0 to 7 [usable: 6], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 8 [usable: 7], FindCoordinator(10): 0 to 4 [usable: 3], JoinGroup(11): 0 to 7 [usable: 7], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 4 [usable: 4], SyncGroup(14): 0 to 5 [usable: 5], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 4 [usable: 4], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 7 [usable: 5], DeleteTopics(20): 0 to 6 [usable: 4], DeleteRecords(21): 0 to 2 [usable: 2], InitProducerId(22): 0 to 4 [usable: 3], OffsetForLeaderEpoch(23): 0 to 4 [usable: 3], AddPartitionsToTxn(24): 0 to 3 [usable: 1], AddOffsetsToTxn(25): 0 to 3 [usable: 1], EndTxn(26): 0 to 3 [usable: 1], WriteTxnMarkers(27): 0 to 1 [usable: 0], TxnOffsetCommit(28): 0 to 3 [usable: 3], DescribeAcls(29): 0 to 2 [usable: 2], CreateAcls(30): 0 to 2 [usable: 2], DeleteAcls(31): 0 to 2 [usable: 2], DescribeConfigs(32): 0 to 4 [usable: 3], AlterConfigs(33): 0 to 2 [usable: 1], AlterReplicaLogDirs(34): 0 to 2 [usable: 1], DescribeLogDirs(35): 0 to 2 [usable: 2], SaslAuthenticate(36): 0 to 2 [usable: 2], CreatePartitions(37): 0 to 3 [usable: 2], CreateDelegationToken(38): 0 to 2 [usable: 2], RenewDelegationToken(39): 0 to 2 [usable: 2], ExpireDelegationToken(40): 0 to 2 [usable: 2], DescribeDelegationToken(41): 0 to 2 [usable: 2], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], ListPartitionReassignments(46): 0 [usable: 0], OffsetDelete(47): 0 [usable: 0], 
DescribeClientQuotas(48): 0 to 1 [usable: 0], AlterClientQuotas(49): 0 to 1 [usable: 0], UNKNOWN(50): 0, UNKNOWN(51): 0, UNKNOWN(56): 0, UNKNOWN(57): 0, UNKNOWN(60): 0, UNKNOWN(61): 0, UNKNOWN(65): 0, UNKNOWN(66): 0, UNKNOWN(67): 0, UNKNOWN(10000): 0 to 3, UNKNOWN(10001): 0 to 1, UNKNOWN(10002): 0 to 1, UNKNOWN(10003): 0 to 3, UNKNOWN(10004): 0 to 1, UNKNOWN(10005): 0, UNKNOWN(10006): 0 to 3, UNKNOWN(10007): 0 to 2, UNKNOWN(10008): 0 to 1, UNKNOWN(10009): 0 to 2, UNKNOWN(10010): 0, UNKNOWN(10011): 0, UNKNOWN(10012): 0, UNKNOWN(10013): 0 to 1, UNKNOWN(10014): 0 to 1, UNKNOWN(10015): 0, UNKNOWN(10016): 0)
04:24:30,726 DEBUG o.a.k.c.c.internals.AbstractCoordinator [...] Coordinator discovery failed, refreshing metadata
04:24:30,732 DEBUG org.apache.kafka.clients.NetworkClient [...] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(name='Mcc-TallAnalyticsChallengeConfiguration-v1')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node xxx.westeurope.azure.confluent.cloud:9092 (id: -1 rack: null)
04:24:30,733 DEBUG org.apache.kafka.clients.NetworkClient [...] Sending METADATA request with header RequestHeader(apiKey=METADATA, apiVersion=9, clientId=..., correlationId=5) and timeout 30000 to node -1: {topics=[{name=Mcc-TallAnalyticsChallengeConfiguration-v1,_tagged_fields={}}],allow_auto_topic_creation=true,include_cluster_authorized_operations=false,include_topic_authorized_operations=false,_tagged_fields={}}
04:25:30,737 DEBUG o.a.k.c.c.internals.AbstractCoordinator [...] Sending FindCoordinator request to broker xxx.westeurope.azure.confluent.cloud:9092 (id: -1 rack: null)
04:25:30,738 DEBUG org.apache.kafka.clients.NetworkClient [...] Sending FIND_COORDINATOR request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=..., correlationId=6) and timeout 30000 to node -1: {key=...,key_type=0,_tagged_fields={}}
04:25:30,742 DEBUG org.apache.kafka.clients.NetworkClient [...] Received METADATA response from node -1 for request with header RequestHeader(apiKey=METADATA, apiVersion=9, clientId=..., correlationId=5): org.apache.kafka.common.requests.MetadataResponse@45a5d427
04:25:30,752 DEBUG org.apache.kafka.clients.Metadata [...] Updating last seen epoch for partition Mcc-TallAnalyticsChallengeConfiguration-v1-0 from null to epoch 11 from new metadata
04:25:30,752 DEBUG org.apache.kafka.clients.Metadata [...] Updating last seen epoch for partition Mcc-TallAnalyticsChallengeConfiguration-v1-1 from null to epoch 8 from new metadata
04:25:30,756 INFO org.apache.kafka.clients.Metadata [...] Cluster ID: xxx
04:25:30,756 DEBUG org.apache.kafka.clients.Metadata [...] Updated cluster metadata updateVersion 2 to MetadataCache{clusterId='lkc-pvjm5', nodes={0=b0-xxx.westeurope.azure.confluent.cloud:9092 (id: 0 rack: 0), 1=b1-xxx.westeurope.azure.confluent.cloud:9092 (id: 1 rack: 1), 2=b2-xxx.westeurope.azure.confluent.cloud:9092 (id: 2 rack: 2), 3=b3-xxx.westeurope.azure.confluent.cloud:9092 (id: 3 rack: 0), 4=b4-xxx.westeurope.azure.confluent.cloud:9092 (id: 4 rack: 1), 5=b5-xxx.westeurope.azure.confluent.cloud:9092 (id: 5 rack: 2), 6=b6-xxx.westeurope.azure.confluent.cloud:9092 (id: 6 rack: 0), 7=b7-xxx.westeurope.azure.confluent.cloud:9092 (id: 7 rack: 1), 8=b8-xxx.westeurope.azure.confluent.cloud:9092 (id: 8 rack: 2), 9=b9-xxx.westeurope.azure.confluent.cloud:9092 (id: 9 rack: 0), 10=b10-xxx.westeurope.azure.confluent.cloud:9092 (id: 10 rack: 1), 11=b11-xxx.westeurope.azure.confluent.cloud:9092 (id: 11 rack: 2), 12=b12-xxx.westeurope.azure.confluent.cloud:9092 (id: 12 rack: 0), 13=b13-xxx.westeurope.azure.confluent.cloud:9092 (id: 13 rack: 1), 14=b14-xxx.westeurope.azure.confluent.cloud:9092 (id: 14 rack: 2), 15=b15-xxx.westeurope.azure.confluent.cloud:9092 (id: 15 rack: 0), 16=b16-xxx.westeurope.azure.confluent.cloud:9092 (id: 16 rack: 1), 17=b17-xxx.westeurope.azure.confluent.cloud:9092 (id: 17 rack: 2)}, partitions=[PartitionMetadata(error=NONE, partition=Mcc-TallAnalyticsChallengeConfiguration-v1-0, leader=Optional[0], leaderEpoch=Optional[11], replicas=0,8,4, isr=8,4,0, offlineReplicas=), PartitionMetadata(error=NONE, partition=Mcc-TallAnalyticsChallengeConfiguration-v1-1, leader=Optional[10], leaderEpoch=Optional[8], replicas=10,15,17, isr=17,15,10, offlineReplicas=)], controller=b10-xxx.westeurope.azure.confluent.cloud:9092 (id: 10 rack: 1)}
04:25:30,759 DEBUG org.apache.kafka.clients.NetworkClient [...] Received FIND_COORDINATOR response from node -1 for request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=..., correlationId=6): FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='NONE', nodeId=14, host='b14-xxx.westeurope.azure.confluent.cloud', port=9092)
04:25:30,760 DEBUG o.a.k.c.c.internals.AbstractCoordinator [...] Received FindCoordinator response ClientResponse(receivedTimeMs=1634185530759, latencyMs=21, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=..., correlationId=6), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='NONE', nodeId=14, host='b14-xxx.westeurope.azure.confluent.cloud', port=9092))
04:25:30,760 INFO o.a.k.c.c.internals.AbstractCoordinator [...] Discovered group coordinator b14-xxx.westeurope.azure.confluent.cloud:9092 (id: 2147483633 rack: null)
04:25:30,787 DEBUG org.apache.kafka.clients.NetworkClient [...] Initiating connection to node b14-xxx.westeurope.azure.confluent.cloud:9092 (id: 2147483633 rack: null) using address b14-xxx.westeurope.azure.confluent.cloud/20.50.15.71
04:25:30,788 DEBUG o.a.k.c.s.a.SaslClientAuthenticator [...] Set SASL client state to SEND_APIVERSIONS_REQUEST
04:25:30,788 DEBUG o.a.k.c.s.a.SaslClientAuthenticator [...] Creating SaslClient: client=null;service=kafka;serviceHostname=b14-xxx.westeurope.azure.confluent.cloud;mechs=[PLAIN]
04:26:30,794 DEBUG o.a.k.c.c.internals.ConsumerCoordinator [...] Executing onJoinPrepare with generation -1 and memberId
04:26:30,794 DEBUG o.a.k.c.c.internals.AbstractCoordinator [...] Disabling heartbeat thread
04:26:30,794 DEBUG o.a.k.c.c.internals.AbstractCoordinator [...] Heartbeat thread started
04:26:30,796 INFO o.a.k.c.c.internals.AbstractCoordinator [...] (Re-)joining group
04:26:30,798 DEBUG o.a.k.c.c.internals.ConsumerCoordinator [...] Joining group with current subscription: [Mcc-TallAnalyticsChallengeConfiguration-v1]
04:26:30,801 DEBUG o.a.k.c.c.internals.AbstractCoordinator [...] Sending JoinGroup (JoinGroupRequestData(groupId='...', sessionTimeoutMs=10000, rebalanceTimeoutMs=300000, memberId='', groupInstanceId=null, protocolType='consumer', protocols=[JoinGroupRequestProtocol(name='range', metadata=[0, 1, 0, 0, 0, 1, 0, 42, 77, 99, 99, 45, 84, 97, 108, 108, 65, 110, 97, 108, 121, 116, 105, 99, 115, 67, 104, 97, 108, 108, 101, 110, 103, 101, 67, 111, 110, 102, 105, 103, 117, 114, 97, 116, 105, 111, 110, 45, 118, 49, -1, -1, -1, -1, 0, 0, 0, 0])])) to coordinator b14-xxx.westeurope.azure.confluent.cloud:9092 (id: 2147483633 rack: null)
04:26:30,806 DEBUG org.apache.kafka.common.network.Selector [...] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 2147483633
04:26:30,831 DEBUG org.apache.kafka.clients.NetworkClient [...] Completed connection to node 2147483633. Fetching API versions.
04:29:30,859 DEBUG o.a.k.common.network.SslTransportLayer [SslTransportLayer channelId=2147483633 key=sun.nio.ch.SelectionKeyImpl@42c46fba] SSL handshake completed successfully with peerHost 'b14-xxx.westeurope.azure.confluent.cloud' peerPort 9092 peerPrincipal 'CN=*.westeurope.azure.confluent.cloud' cipherSuite 'TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256'
04:29:30,860 DEBUG o.a.k.c.s.a.SaslClientAuthenticator [...] Set SASL client state to RECEIVE_APIVERSIONS_RESPONSE
04:29:30,860 DEBUG org.apache.kafka.common.network.Selector [...] Connection with xxx.westeurope.azure.confluent.cloud/51.105.121.2 disconnected
java.io.EOFException: null
at org.apache.kafka.common.network.SslTransportLayer.read(SslTransportLayer.java:619)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:95)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:447)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:397)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:678)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:580)
at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:544)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:419)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.poll(ConsumerLease.java:182)
at org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6.onTrigger(ConsumeKafkaRecord_2_6.java:472)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1173)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:214)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
04:29:30,863 DEBUG org.apache.kafka.clients.NetworkClient [...] Node -1 disconnected.
04:29:30,865 DEBUG org.apache.kafka.clients.NetworkClient [...] Initialize connection to node b8-xxx.westeurope.azure.confluent.cloud:9092 (id: 8 rack: 2) for sending metadata request
04:29:30,891 DEBUG org.apache.kafka.clients.NetworkClient [...] Initiating connection to node b8-xxx.westeurope.azure.confluent.cloud:9092 (id: 8 rack: 2) using address b8-xxx.westeurope.azure.confluent.cloud/51.105.115.202
04:29:30,892 DEBUG o.a.k.c.s.a.SaslClientAuthenticator [...] Set SASL client state to SEND_APIVERSIONS_REQUEST
04:29:30,892 DEBUG o.a.k.c.s.a.SaslClientAuthenticator [...] Creating SaslClient: client=null;service=kafka;serviceHostname=b8-xxx.westeurope.azure.confluent.cloud;mechs=[PLAIN]
04:29:30,895 DEBUG o.a.k.c.s.a.SaslClientAuthenticator [...] Set SASL client state to SEND_HANDSHAKE_REQUEST
04:29:30,896 DEBUG o.a.k.c.s.a.SaslClientAuthenticator [...] Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE
04:30:30,898 DEBUG org.apache.kafka.common.network.Selector [...] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 8
04:30:30,906 DEBUG o.a.k.c.s.a.SaslClientAuthenticator [...] Set SASL client state to INITIAL
04:30:30,907 DEBUG o.a.k.c.s.a.SaslClientAuthenticator [...] Set SASL client state to INTERMEDIATE
04:30:30,908 DEBUG org.apache.kafka.clients.NetworkClient [...] Completed connection to node 8. Fetching API versions.
04:31:30,919 DEBUG o.a.k.c.s.a.SaslClientAuthenticator [...] Set SASL client state to COMPLETE
04:31:30,919 DEBUG o.a.k.c.s.a.SaslClientAuthenticator [...] Finished authentication with no session expiration and no session re-authentication
04:31:30,919 DEBUG org.apache.kafka.common.network.Selector [...] Successfully authenticated with b14-xxx.westeurope.azure.confluent.cloud/20.50.15.71
04:31:30,921 DEBUG org.apache.kafka.clients.NetworkClient [...] Initiating API versions fetch from node 2147483633.
04:31:30,921 DEBUG org.apache.kafka.clients.NetworkClient [...] Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=..., correlationId=8) and timeout 30000 to node 2147483633: {client_software_name=apache-kafka-java,client_software_version=2.6.0,_tagged_fields={}}
04:32:30,924 DEBUG org.apache.kafka.clients.NetworkClient [...] Received API_VERSIONS response from node 2147483633 for request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=..., correlationId=8): org.apache.kafka.common.requests.ApiVersionsResponse@5eeec2b9
04:32:30,924 DEBUG org.apache.kafka.clients.NetworkClient [...] Recorded API versions for node 2147483633: (Produce(0): 0 to 9 [usable: 8], Fetch(1): 0 to 12 [usable: 11], ListOffsets(2): 0 to 7 [usable: 5], Metadata(3): 0 to 11 [usable: 9], LeaderAndIsr(4): 0 to 5 [usable: 4], StopReplica(5): 0 to 3 [usable: 3], UpdateMetadata(6): 0 to 7 [usable: 6], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 8 [usable: 7], FindCoordinator(10): 0 to 4 [usable: 3], JoinGroup(11): 0 to 7 [usable: 7], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 4 [usable: 4], SyncGroup(14): 0 to 5 [usable: 5], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 4 [usable: 4], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 7 [usable: 5], DeleteTopics(20): 0 to 6 [usable: 4], DeleteRecords(21): 0 to 2 [usable: 2], InitProducerId(22): 0 to 4 [usable: 3], OffsetForLeaderEpoch(23): 0 to 4 [usable: 3], AddPartitionsToTxn(24): 0 to 3 [usable: 1], AddOffsetsToTxn(25): 0 to 3 [usable: 1], EndTxn(26): 0 to 3 [usable: 1], WriteTxnMarkers(27): 0 to 1 [usable: 0], TxnOffsetCommit(28): 0 to 3 [usable: 3], DescribeAcls(29): 0 to 2 [usable: 2], CreateAcls(30): 0 to 2 [usable: 2], DeleteAcls(31): 0 to 2 [usable: 2], DescribeConfigs(32): 0 to 4 [usable: 3], AlterConfigs(33): 0 to 2 [usable: 1], AlterReplicaLogDirs(34): 0 to 2 [usable: 1], DescribeLogDirs(35): 0 to 2 [usable: 2], SaslAuthenticate(36): 0 to 2 [usable: 2], CreatePartitions(37): 0 to 3 [usable: 2], CreateDelegationToken(38): 0 to 2 [usable: 2], RenewDelegationToken(39): 0 to 2 [usable: 2], ExpireDelegationToken(40): 0 to 2 [usable: 2], DescribeDelegationToken(41): 0 to 2 [usable: 2], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], ListPartitionReassignments(46): 0 [usable: 0], OffsetDelete(47): 0 
[usable: 0], DescribeClientQuotas(48): 0 to 1 [usable: 0], AlterClientQuotas(49): 0 to 1 [usable: 0], UNKNOWN(50): 0, UNKNOWN(51): 0, UNKNOWN(56): 0, UNKNOWN(57): 0, UNKNOWN(60): 0, UNKNOWN(61): 0, UNKNOWN(65): 0, UNKNOWN(66): 0, UNKNOWN(67): 0, UNKNOWN(10000): 0 to 3, UNKNOWN(10001): 0 to 1, UNKNOWN(10002): 0 to 1, UNKNOWN(10003): 0 to 3, UNKNOWN(10004): 0 to 1, UNKNOWN(10005): 0, UNKNOWN(10006): 0 to 3, UNKNOWN(10007): 0 to 2, UNKNOWN(10008): 0 to 1, UNKNOWN(10009): 0 to 2, UNKNOWN(10010): 0, UNKNOWN(10011): 0, UNKNOWN(10012): 0, UNKNOWN(10013): 0 to 1, UNKNOWN(10014): 0 to 1, UNKNOWN(10015): 0, UNKNOWN(10016): 0)
04:32:30,926 INFO o.a.k.c.c.internals.AbstractCoordinator [...] Join group failed with org.apache.kafka.common.errors.TimeoutException: Failed to send request after 305000 ms.
04:32:30,926 DEBUG o.a.k.c.c.internals.AbstractCoordinator [...] Disabling heartbeat thread
04:32:30,927 INFO o.a.k.c.c.internals.AbstractCoordinator [...] (Re-)joining group
04:32:30,927 DEBUG o.a.k.c.c.internals.ConsumerCoordinator [...] Joining group with current subscription: [Mcc-TallAnalyticsChallengeConfiguration-v1]
04:32:30,929 DEBUG o.a.k.c.c.internals.AbstractCoordinator [...] Sending JoinGroup (JoinGroupRequestData(groupId='...', sessionTimeoutMs=10000, rebalanceTimeoutMs=300000, memberId='', groupInstanceId=null, protocolType='consumer', protocols=[JoinGroupRequestProtocol(name='range', metadata=[0, 1, 0, 0, 0, 1, 0, 42, 77, 99, 99, 45, 84, 97, 108, 108, 65, 110, 97, 108, 121, 116, 105, 99, 115, 67, 104, 97, 108, 108, 101, 110, 103, 101, 67, 111, 110, 102, 105, 103, 117, 114, 97, 116, 105, 111, 110, 45, 118, 49, -1, -1, -1, -1, 0, 0, 0, 0])])) to coordinator b14-xxx.westeurope.azure.confluent.cloud:9092 (id: 2147483633 rack: null)
04:33:30,935 DEBUG o.a.k.common.network.SslTransportLayer [SslTransportLayer channelId=8 key=sun.nio.ch.SelectionKeyImpl@12b28b1b] SSL handshake completed successfully with peerHost 'b8-xxx.westeurope.azure.confluent.cloud' peerPort 9092 peerPrincipal 'CN=*.westeurope.azure.confluent.cloud' cipherSuite 'TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256'
04:33:30,938 DEBUG o.a.k.c.s.a.SaslClientAuthenticator [...] Set SASL client state to RECEIVE_APIVERSIONS_RESPONSE
04:34:30,943 DEBUG o.a.k.c.s.a.SaslClientAuthenticator [...] Set SASL client state to SEND_HANDSHAKE_REQUEST
04:34:30,943 DEBUG o.a.k.c.s.a.SaslClientAuthenticator [...] Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE
04:34:30,959 DEBUG o.a.k.c.s.a.SaslClientAuthenticator [...] Set SASL client state to INITIAL
04:34:30,961 DEBUG o.a.k.c.s.a.SaslClientAuthenticator [...] Set SASL client state to INTERMEDIATE
04:35:30,964 DEBUG o.a.k.c.s.a.SaslClientAuthenticator [...] Set SASL client state to COMPLETE
04:35:30,964 DEBUG o.a.k.c.s.a.SaslClientAuthenticator [...] Finished authentication with no session expiration and no session re-authentication
04:35:30,965 DEBUG org.apache.kafka.common.network.Selector [...] Successfully authenticated with b8-xxx.westeurope.azure.confluent.cloud/51.105.115.202
04:35:30,965 DEBUG org.apache.kafka.common.network.Selector [...] Connection with b14-xxx.westeurope.azure.confluent.cloud/20.50.15.71 disconnected
java.io.EOFException: null
at org.apache.kafka.common.network.SslTransportLayer.read(SslTransportLayer.java:619)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:95)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:447)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:397)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:678)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:580)
at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:544)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:419)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.poll(ConsumerLease.java:182)
at org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6.onTrigger(ConsumeKafkaRecord_2_6.java:472)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1173)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:214)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
04:35:30,967 DEBUG org.apache.kafka.clients.NetworkClient [...] Node 2147483633 disconnected.
04:35:30,968 DEBUG org.apache.kafka.clients.NetworkClient [...] Initiating API versions fetch from node 8.
04:35:30,968 DEBUG org.apache.kafka.clients.NetworkClient [...] Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=..., correlationId=10) and timeout 30000 to node 8: {client_software_name=apache-kafka-java,client_software_version=2.6.0,_tagged_fields={}}
04:35:30,968 DEBUG o.a.k.c.c.i.ConsumerNetworkClient [...] Cancelled request with header RequestHeader(apiKey=JOIN_GROUP, apiVersion=7, clientId=..., correlationId=9) due to node 2147483633 being disconnected
04:35:30,969 INFO o.a.k.c.c.internals.AbstractCoordinator [...] Group coordinator b14-xxx.westeurope.azure.confluent.cloud:9092 (id: 2147483633 rack: null) is unavailable or invalid, will attempt rediscovery
04:35:30,969 INFO o.a.k.c.c.internals.AbstractCoordinator [...] Join group failed with org.apache.kafka.common.errors.DisconnectException