I use the Confluent Platform all-in-one Docker Compose file to start the environment:
docker-compose up -d rest-proxy
Using the REST Proxy API with curl to prepare data:
- create a topic test1 with 3 partitions
- produce 3 records to this topic:
docker-compose exec rest-proxy curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" -H "Accept: application/vnd.kafka.v2+json" --data '{"records":[{"key":"alice","value":{"count":0}},{"key":"lily","value":{"count":1}},{"key":"lucy","value":{"count":2}}]}' "http://localhost:8082/topics/test1" | jq .
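The topic-creation step has no command shown above. A minimal sketch of what that request could look like from Java, assuming the v3 endpoint `POST /v3/clusters/{cluster_id}/topics`, a replication factor of 1 for the single-broker all-in-one setup, and the hypothetical class and method names `CreateTopicSketch`, `topicJson`, and `createTopic` (none of which come from the original steps):

```java
import java.net.URI;
import java.net.http.HttpRequest;

class CreateTopicSketch {
    // JSON body for the v3 topic-creation call (field names assumed from the v3 API).
    static String topicJson(String topic, int partitions, int replicationFactor) {
        return String.format(
            "{\"topic_name\":\"%s\",\"partitions_count\":%d,\"replication_factor\":%d}",
            topic, partitions, replicationFactor);
    }

    // Builds (but does not send) the request; replication factor 1 is an assumption.
    static HttpRequest createTopic(String baseUrl, String clusterId, String topic, int partitions) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/v3/clusters/" + clusterId + "/topics"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(topicJson(topic, partitions, 1)))
            .build();
    }
}
```

The request can then be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`, using the cluster ID returned by the clusters endpoint.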
Then I use the REST Proxy API from Java with the following steps:
- get the cluster ID with
GET /v3/clusters
- create a consumer in a consumer group with
POST /consumers/(string:group_name)
The request JSON object:
JsonObject requestBody = new JsonObject()
    .put("format", "json")
    .put("auto.offset.reset", "earliest")
    .put("auto.commit.enable", "true");
- subscribe the consumer to the topic with
POST /consumers/(string:group_name)/instances/(string:instance)/subscription
- get the number of partitions with
GET /v3/clusters/{cluster_id}/topics/{topic_name}/partitions
- fetch data for the topic with
GET /consumers/(string:group_name)/instances/(string:instance)/records
Got the records as expected:
[ {
"topic" : "test1",
"key" : "alice",
"value" : {
"count" : 0
},
"partition" : 0,
"offset" : 0
}, {
"topic" : "test1",
"key" : "lucy",
"value" : {
"count" : 2
},
"partition" : 0,
"offset" : 1
}, {
"topic" : "test1",
"key" : "lily",
"value" : {
"count" : 1
},
"partition" : 1,
"offset" : 0
} ]
- fetch data for the topic again with
GET /consumers/(string:group_name)/instances/(string:instance)/records
Got no records, as expected:
[ ]
- get the last committed offsets for the given partitions with
GET /consumers/(string:group_name)/instances/(string:instance)/offsets
The response surprised me:
{
"offsets": []
}
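The consumer steps above can be sketched with the JDK's `java.net.http` request builder. This is only an illustration under assumptions: the class and method names are hypothetical, the instance URL is taken to be built from the `instance_id` returned by the create call, and the offsets query is assumed to carry a `partitions` list in the body of the GET (worth verifying that your HTTP client actually transmits a GET body).

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.BodyPublishers;
import java.util.StringJoiner;

class ConsumerFlowSketch {
    static final String V2 = "application/vnd.kafka.v2+json";
    static final String V2_JSON = "application/vnd.kafka.json.v2+json";

    static String instanceUrl(String base, String group, String instance) {
        return base + "/consumers/" + group + "/instances/" + instance;
    }

    // POST /consumers/{group}: same settings as the request body shown above.
    static HttpRequest createConsumer(String base, String group) {
        String body = "{\"format\":\"json\",\"auto.offset.reset\":\"earliest\","
            + "\"auto.commit.enable\":\"true\"}";
        return HttpRequest.newBuilder(URI.create(base + "/consumers/" + group))
            .header("Content-Type", V2).POST(BodyPublishers.ofString(body)).build();
    }

    // POST .../subscription: subscribe the instance to a single topic.
    static HttpRequest subscribe(String instanceUrl, String topic) {
        String body = "{\"topics\":[\"" + topic + "\"]}";
        return HttpRequest.newBuilder(URI.create(instanceUrl + "/subscription"))
            .header("Content-Type", V2).POST(BodyPublishers.ofString(body)).build();
    }

    // GET .../records: each call triggers a poll on the proxy side.
    static HttpRequest fetchRecords(String instanceUrl) {
        return HttpRequest.newBuilder(URI.create(instanceUrl + "/records"))
            .header("Accept", V2_JSON).GET().build();
    }

    // Body listing partitions 0..partitionCount-1 of one topic.
    static String partitionsJson(String topic, int partitionCount) {
        StringJoiner parts = new StringJoiner(",", "{\"partitions\":[", "]}");
        for (int p = 0; p < partitionCount; p++) {
            parts.add("{\"topic\":\"" + topic + "\",\"partition\":" + p + "}");
        }
        return parts.toString();
    }

    // GET .../offsets with a body naming the partitions to look up.
    static HttpRequest committedOffsets(String instanceUrl, String topic, int partitionCount) {
        return HttpRequest.newBuilder(URI.create(instanceUrl + "/offsets"))
            .header("Content-Type", V2).header("Accept", V2)
            .method("GET", BodyPublishers.ofString(partitionsJson(topic, partitionCount)))
            .build();
    }
}
```

Each request is only built here; sending it with `HttpClient.newHttpClient().send(...)` in the order create, subscribe, fetch, fetch, offsets mirrors the sequence in the logs.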
In the logs I found the message "Found no committed offset for partition test1-0/1/2", both when getting records the first time and when getting offsets. Why did this happen? Is there something wrong with my steps?
Logs from REST Proxy:
rest-proxy | [2021-09-10 07:09:00,721] INFO 172.23.0.1 - - [10/Sep/2021:07:09:00 +0000] "GET /v3/clusters/ HTTP/1.1" 200 1001 5 (io.confluent.rest-utils.requests)
rest-proxy | [2021-09-10 07:09:00,728] INFO ConsumerConfig values:
rest-proxy | allow.auto.create.topics = true
rest-proxy | auto.commit.interval.ms = 5000
rest-proxy | auto.offset.reset = earliest
rest-proxy | bootstrap.servers = [broker:29092]
rest-proxy | check.crcs = true
rest-proxy | client.dns.lookup = use_all_dns_ips
rest-proxy | client.id = consumer-eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c-27
rest-proxy | client.rack =
rest-proxy | connections.max.idle.ms = 540000
rest-proxy | default.api.timeout.ms = 60000
rest-proxy | enable.auto.commit = true
rest-proxy | exclude.internal.topics = true
rest-proxy | fetch.max.bytes = 52428800
rest-proxy | fetch.max.wait.ms = 500
rest-proxy | fetch.min.bytes = 1
rest-proxy | group.id = eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c
rest-proxy | group.instance.id = null
rest-proxy | heartbeat.interval.ms = 3000
rest-proxy | interceptor.classes = []
rest-proxy | internal.leave.group.on.close = true
rest-proxy | internal.throw.on.fetch.stable.offset.unsupported = false
rest-proxy | isolation.level = read_uncommitted
rest-proxy | key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
rest-proxy | max.partition.fetch.bytes = 1048576
rest-proxy | max.poll.interval.ms = 300000
rest-proxy | max.poll.records = 30
rest-proxy | metadata.max.age.ms = 300000
rest-proxy | metric.reporters = []
rest-proxy | metrics.num.samples = 2
rest-proxy | metrics.recording.level = INFO
rest-proxy | metrics.sample.window.ms = 30000
rest-proxy | partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
rest-proxy | receive.buffer.bytes = 65536
rest-proxy | reconnect.backoff.max.ms = 1000
rest-proxy | reconnect.backoff.ms = 50
rest-proxy | request.timeout.ms = 30000
rest-proxy | retry.backoff.ms = 100
rest-proxy | sasl.client.callback.handler.class = null
rest-proxy | sasl.jaas.config = null
rest-proxy | sasl.kerberos.kinit.cmd = /usr/bin/kinit
rest-proxy | sasl.kerberos.min.time.before.relogin = 60000
rest-proxy | sasl.kerberos.service.name = null
rest-proxy | sasl.kerberos.ticket.renew.jitter = 0.05
rest-proxy | sasl.kerberos.ticket.renew.window.factor = 0.8
rest-proxy | sasl.login.callback.handler.class = null
rest-proxy | sasl.login.class = null
rest-proxy | sasl.login.refresh.buffer.seconds = 300
rest-proxy | sasl.login.refresh.min.period.seconds = 60
rest-proxy | sasl.login.refresh.window.factor = 0.8
rest-proxy | sasl.login.refresh.window.jitter = 0.05
rest-proxy | sasl.mechanism = GSSAPI
rest-proxy | security.protocol = PLAINTEXT
rest-proxy | security.providers = null
rest-proxy | send.buffer.bytes = 131072
rest-proxy | session.timeout.ms = 10000
rest-proxy | socket.connection.setup.timeout.max.ms = 30000
rest-proxy | socket.connection.setup.timeout.ms = 10000
rest-proxy | ssl.cipher.suites = null
rest-proxy | ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
rest-proxy | ssl.endpoint.identification.algorithm = https
rest-proxy | ssl.engine.factory.class = null
rest-proxy | ssl.key.password = null
rest-proxy | ssl.keymanager.algorithm = SunX509
rest-proxy | ssl.keystore.certificate.chain = null
rest-proxy | ssl.keystore.key = null
rest-proxy | ssl.keystore.location = null
rest-proxy | ssl.keystore.password = null
rest-proxy | ssl.keystore.type = JKS
rest-proxy | ssl.protocol = TLSv1.3
rest-proxy | ssl.provider = null
rest-proxy | ssl.secure.random.implementation = null
rest-proxy | ssl.trustmanager.algorithm = PKIX
rest-proxy | ssl.truststore.certificates = null
rest-proxy | ssl.truststore.location = null
rest-proxy | ssl.truststore.password = null
rest-proxy | ssl.truststore.type = JKS
rest-proxy | value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
rest-proxy | (org.apache.kafka.clients.consumer.ConsumerConfig)
rest-proxy | [2021-09-10 07:09:00,732] WARN The configuration 'metrics.context.resource.cluster.id' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
rest-proxy | [2021-09-10 07:09:00,732] WARN The configuration 'metrics.context._namespace' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
rest-proxy | [2021-09-10 07:09:00,732] WARN The configuration 'metrics.context.resource.version' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
rest-proxy | [2021-09-10 07:09:00,732] WARN The configuration 'auto.register.schemas' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
rest-proxy | [2021-09-10 07:09:00,732] WARN The configuration 'metrics.context.resource.type' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
rest-proxy | [2021-09-10 07:09:00,732] WARN The configuration 'metrics.context.resource.commit.id' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
rest-proxy | [2021-09-10 07:09:00,732] WARN The configuration 'schema.registry.url' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
rest-proxy | [2021-09-10 07:09:00,732] WARN The configuration 'use.latest.version' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
rest-proxy | [2021-09-10 07:09:00,732] INFO Kafka version: 6.2.0-ce (org.apache.kafka.common.utils.AppInfoParser)
rest-proxy | [2021-09-10 07:09:00,732] INFO Kafka commitId: 5c753752ae1445a1 (org.apache.kafka.common.utils.AppInfoParser)
rest-proxy | [2021-09-10 07:09:00,732] INFO Kafka startTimeMs: 1631257740732 (org.apache.kafka.common.utils.AppInfoParser)
rest-proxy | [2021-09-10 07:09:00,734] INFO 172.23.0.1 - - [10/Sep/2021:07:09:00 +0000] "POST /consumers/eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c HTTP/1.1" 200 232 9 (io.confluent.rest-utils.requests)
rest-proxy | [2021-09-10 07:09:00,739] INFO [Consumer clientId=consumer-eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c-27, groupId=eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c] Subscribed to topic(s): test1 (org.apache.kafka.clients.consumer.KafkaConsumer)
rest-proxy | [2021-09-10 07:09:00,741] INFO 172.23.0.1 - - [10/Sep/2021:07:09:00 +0000] "POST /consumers/eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c/instances/rest-consumer-31e2aa47-7388-4e14-9432-31e73242c8f3/subscription/ HTTP/1.1" 204 0 5 (io.confluent.rest-utils.requests)
rest-proxy | [2021-09-10 07:09:00,753] INFO 172.23.0.1 - - [10/Sep/2021:07:09:00 +0000] "GET /v3/clusters/PIrHXSoSS_iZh29numEReQ/topics/test1/partitions HTTP/1.1" 200 2098 9 (io.confluent.rest-utils.requests)
rest-proxy | [2021-09-10 07:09:00,763] INFO [Consumer clientId=consumer-eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c-27, groupId=eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c] Cluster ID: PIrHXSoSS_iZh29numEReQ (org.apache.kafka.clients.Metadata)
rest-proxy | [2021-09-10 07:09:00,764] INFO [Consumer clientId=consumer-eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c-27, groupId=eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c] Discovered group coordinator broker:29092 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
rest-proxy | [2021-09-10 07:09:00,765] INFO [Consumer clientId=consumer-eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c-27, groupId=eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
rest-proxy | [2021-09-10 07:09:00,767] INFO [Consumer clientId=consumer-eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c-27, groupId=eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
rest-proxy | [2021-09-10 07:09:00,769] INFO [Consumer clientId=consumer-eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c-27, groupId=eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c] Successfully joined group with generation Generation{generationId=1, memberId='consumer-eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c-27-25f44ea5-8f37-4534-b15d-69a37cf6e017', protocol='range'} (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
rest-proxy | [2021-09-10 07:09:00,769] INFO [Consumer clientId=consumer-eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c-27, groupId=eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c] Finished assignment for group at generation 1: {consumer-eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c-27-25f44ea5-8f37-4534-b15d-69a37cf6e017=Assignment(partitions=[test1-0, test1-1, test1-2])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2021-09-10 07:09:00,773] INFO [Consumer clientId=consumer-eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c-27, groupId=eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c] Successfully synced group in generation Generation{generationId=1, memberId='consumer-eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c-27-25f44ea5-8f37-4534-b15d-69a37cf6e017', protocol='range'} (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
rest-proxy | [2021-09-10 07:09:00,774] INFO [Consumer clientId=consumer-eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c-27, groupId=eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c] Notifying assignor about the new Assignment(partitions=[test1-0, test1-1, test1-2]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2021-09-10 07:09:00,774] INFO [Consumer clientId=consumer-eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c-27, groupId=eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c] Adding newly assigned partitions: test1-0, test1-2, test1-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2021-09-10 07:09:00,776] INFO [Consumer clientId=consumer-eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c-27, groupId=eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c] Found no committed offset for partition test1-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2021-09-10 07:09:00,776] INFO [Consumer clientId=consumer-eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c-27, groupId=eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c] Found no committed offset for partition test1-2 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2021-09-10 07:09:00,776] INFO [Consumer clientId=consumer-eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c-27, groupId=eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c] Found no committed offset for partition test1-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2021-09-10 07:09:01,759] INFO 172.23.0.1 - - [10/Sep/2021:07:09:00 +0000] "GET /consumers/eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c/instances/rest-consumer-31e2aa47-7388-4e14-9432-31e73242c8f3/records/ HTTP/1.1" 200 230 1003 (io.confluent.rest-utils.requests)
rest-proxy | [2021-09-10 07:09:02,763] INFO 172.23.0.1 - - [10/Sep/2021:07:09:01 +0000] "GET /consumers/eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c/instances/rest-consumer-31e2aa47-7388-4e14-9432-31e73242c8f3/records/ HTTP/1.1" 200 2 1002 (io.confluent.rest-utils.requests)
rest-proxy | [2021-09-10 07:09:02,767] INFO [Consumer clientId=consumer-eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c-27, groupId=eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c] Found no committed offset for partition test1-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2021-09-10 07:09:02,768] INFO [Consumer clientId=consumer-eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c-27, groupId=eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c] Found no committed offset for partition test1-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2021-09-10 07:09:02,769] INFO [Consumer clientId=consumer-eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c-27, groupId=eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c] Found no committed offset for partition test1-2 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2021-09-10 07:09:02,770] INFO 172.23.0.1 - - [10/Sep/2021:07:09:02 +0000] "GET /consumers/eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c/instances/rest-consumer-31e2aa47-7388-4e14-9432-31e73242c8f3/offsets HTTP/1.1" 200 14 5 (io.confluent.rest-utils.requests)
rest-proxy | [2021-09-10 07:12:31,419] INFO [Consumer clientId=consumer-eds-group-for-query-3195063b-d623-4ac4-9519-cdc4522f96e5-26, groupId=eds-group-for-query-3195063b-d623-4ac4-9519-cdc4522f96e5] Failing OffsetCommit request since the consumer is not part of an active group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2021-09-10 07:12:31,420] WARN [Consumer clientId=consumer-eds-group-for-query-3195063b-d623-4ac4-9519-cdc4522f96e5-26, groupId=eds-group-for-query-3195063b-d623-4ac4-9519-cdc4522f96e5] Synchronous auto-commit of offsets {test1-0=OffsetAndMetadata{offset=2, leaderEpoch=0, metadata=''}, test1-2=OffsetAndMetadata{offset=0, leaderEpoch=null, metadata=''}, test1-1=OffsetAndMetadata{offset=1, leaderEpoch=0, metadata=''}} failed: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2021-09-10 07:12:31,421] INFO [Consumer clientId=consumer-eds-group-for-query-3195063b-d623-4ac4-9519-cdc4522f96e5-26, groupId=eds-group-for-query-3195063b-d623-4ac4-9519-cdc4522f96e5] Lost previously assigned partitions test1-0, test1-2, test1-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2021-09-10 07:12:31,421] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics)
rest-proxy | [2021-09-10 07:12:31,421] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics)
rest-proxy | [2021-09-10 07:12:31,421] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics)
rest-proxy | [2021-09-10 07:12:31,422] INFO App info kafka.consumer for consumer-eds-group-for-query-3195063b-d623-4ac4-9519-cdc4522f96e5-26 unregistered (org.apache.kafka.common.utils.AppInfoParser)
rest-proxy | [2021-09-10 07:14:02,838] INFO [Consumer clientId=consumer-eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c-27, groupId=eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c] Member consumer-eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c-27-25f44ea5-8f37-4534-b15d-69a37cf6e017 sending LeaveGroup request to coordinator broker:29092 (id: 2147483646 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
rest-proxy | [2021-09-10 07:14:03,431] INFO [Consumer clientId=consumer-eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c-27, groupId=eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c] Failing OffsetCommit request since the consumer is not part of an active group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2021-09-10 07:14:03,431] WARN [Consumer clientId=consumer-eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c-27, groupId=eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c] Synchronous auto-commit of offsets {test1-0=OffsetAndMetadata{offset=2, leaderEpoch=0, metadata=''}, test1-2=OffsetAndMetadata{offset=0, leaderEpoch=null, metadata=''}, test1-1=OffsetAndMetadata{offset=1, leaderEpoch=0, metadata=''}} failed: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2021-09-10 07:14:03,431] INFO [Consumer clientId=consumer-eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c-27, groupId=eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c] Lost previously assigned partitions test1-0, test1-2, test1-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
rest-proxy | [2021-09-10 07:14:03,441] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics)
rest-proxy | [2021-09-10 07:14:03,441] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics)
rest-proxy | [2021-09-10 07:14:03,441] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics)
rest-proxy | [2021-09-10 07:14:03,447] INFO App info kafka.consumer for consumer-eds-group-for-query-a9874156-aced-481e-87f8-90d99668829c-27 unregistered (org.apache.kafka.common.utils.AppInfoParser)
From the command line, I tried again and found that I sometimes have to run a command similar to step 6 two or more times before I get the following:
{
"offsets": [
{
"topic": "test1",
"partition": 0,
"offset": 2,
"metadata": ""
},
{
"topic": "test1",
"partition": 1,
"offset": 1,
"metadata": ""
},
{
"topic": "test1",
"partition": 2,
"offset": 0,
"metadata": ""
}
]
}
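A rough timing check against the log lines above may help frame the question. It rests on two assumptions that are not confirmed here: that the underlying Kafka consumer only performs an auto-commit from inside a poll once `auto.commit.interval.ms` (5000 in the logged config) has elapsed since assignment, and that REST Proxy only polls while serving a records request. The timestamps are taken from the log (partitions assigned at 07:09:00,774; offsets request at 07:09:02,770); the class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.LocalTime;

class AutoCommitTimingSketch {
    // Milliseconds elapsed between two wall-clock times on the same day.
    static long elapsedMs(LocalTime from, LocalTime to) {
        return Duration.between(from, to).toMillis();
    }

    // True once an interval-based auto-commit would be due (under the stated assumption).
    static boolean commitDue(LocalTime assigned, LocalTime now, long intervalMs) {
        return elapsedMs(assigned, now) >= intervalMs;
    }
}
```

With the logged values, `elapsedMs(LocalTime.parse("07:09:00.774"), LocalTime.parse("07:09:02.770"))` is 1996, which is below the 5000 ms interval, so under these assumptions no auto-commit would have been due yet when the offsets were queried. That would also be consistent with offsets only appearing after repeated calls.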
Do I have to use the following API to commit offsets with an empty body?
POST /consumers/(string:group_name)/instances/(string:instance)/offsets
Commit a list of offsets for the consumer. When the post body is empty, it commits all the records that have been fetched by the consumer instance.
When I added this POST call after step 5, I got the offsets, so it seems I have to commit manually. Could anybody explain why?
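For reference, the manual commit described above can be sketched as a POST with no body to the instance's offsets endpoint. The class and method names are hypothetical, and the instance URL is assumed to have the form `{base}/consumers/{group}/instances/{instance}`:

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.BodyPublishers;

class CommitOffsetsSketch {
    // POST .../offsets with an empty body: per the documentation quoted above,
    // this commits all records fetched so far by the consumer instance.
    static HttpRequest commitAllFetched(String instanceUrl) {
        return HttpRequest.newBuilder(URI.create(instanceUrl + "/offsets"))
            .header("Content-Type", "application/vnd.kafka.v2+json")
            .POST(BodyPublishers.noBody())
            .build();
    }
}
```

Sending this request right after the records fetch in step 5, and before querying the offsets, matches the order in which the offsets became visible.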