Kafka Streams local state store size increasing

I have written a Kafka Streams processor using the Processor API. I clean up the state stores with a punctuation at regular intervals, but I am still seeing the local state store size gradually increasing; it has now reached several GB. Could somebody please help me find the reason?

Very hard to tell without more details… What store are you using: kv-store, windowed-store, or session-store? RocksDB or in-memory? What does “clean up” mean exactly (i.e., how do you use the store)? What is your data distribution (i.e., number of unique keys)? And what does “regular intervals” mean?

Thanks for the quick reply.
I am using a persistent kv-store (RocksDB) named dedupKeyStore. I delete expired events from it in a punctuation that runs every 5 minutes, yet dedupKeyStore keeps growing. The source topic is partitioned by a key made up of nodeId, source, and seqno. For reference, I have attached the processor code.

import java.time.Duration;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DeduplicationProcessor extends AbstractProcessor<String, LogRecord>
{
	private static final Logger logger = LoggerFactory.getLogger(DeduplicationProcessor.class);

	private ProcessorContext context;
	private String globalTsStoreName;
	private String dedupKeyStoreName;
	private Long maintainDurationMs;
	private Integer dedupKeyStorePurgeItvlInMin;
	private KeyValueStore<String, Long> dedupKeyStore;
	private KeyValueStore<String, Long> globalTsStore;
	private Set<String> partitionKeyStore = new LinkedHashSet<>();

	DeduplicationProcessor(Long maintainDurationMs,
			Integer dedupKeyStorePurgeItvlInMin,
			String dedupKeyStoreName,
			String globalTsStoreName)
	{
		this.maintainDurationMs = maintainDurationMs;
		this.dedupKeyStorePurgeItvlInMin = dedupKeyStorePurgeItvlInMin;
		this.dedupKeyStoreName = dedupKeyStoreName;
		this.globalTsStoreName = globalTsStoreName;
	}

	@Override
	public void init(ProcessorContext context)
	{
		this.context = context;
		this.dedupKeyStore = (KeyValueStore<String, Long>) context.getStateStore(dedupKeyStoreName);
		this.globalTsStore = (KeyValueStore<String, Long>) context.getStateStore(globalTsStoreName);

		this.context.schedule(Duration.ofMinutes(dedupKeyStorePurgeItvlInMin),
				PunctuationType.WALL_CLOCK_TIME,
				(timestamp) -> {
					infolog("purge starting, timestamp:" + timestamp);
					long currTime = System.currentTimeMillis();
					purgeExpiredEventIds();
					infolog("purge completed in " + (System.currentTimeMillis() - currTime) + " ms");
				});

		infolog("processor context purge punctuation scheduled, durationInMin:" + dedupKeyStorePurgeItvlInMin);

		infolog("processor context initialized");
	}

	@Override
	public void process(String key,
			LogRecord value)
	{
		// add partition key
		updatePartitionKey(key, value);

		Long currentRecTimestamp = value.getTimestamp();
		String timestamp = String.valueOf(value.getTimestamp());
		String nodeId = String.valueOf(value.getNodeId());
		String source = value.getSource().name();
		String seqno = String.valueOf(value.getSeqNum());

		String eventId = timestamp + "_" + nodeId + "_" + source + "_" + seqno;

		if (!isDuplicate(eventId))
		{
			rememberNewEvent(eventId, currentRecTimestamp);
			// emit the final record
			this.context.forward(key, value);
		}
	}

	private void updatePartitionKey(String key,
			LogRecord value)
	{
		// track the topic-partitions this task has seen; the record key/value are not used here
		String partitionKey = context.topic() + Constants.HASH_SEPERATOR + context.partition();
		if (!partitionKeyStore.contains(partitionKey))
		{
			partitionKeyStore.add(partitionKey);
			infolog("updating partitionKeyStore with key:" + partitionKey);
		}
	}

	private void purgeExpiredEventIds()
	{
		Iterator<String> partitionKeyItr = partitionKeyStore.iterator();
		while (partitionKeyItr.hasNext())
		{
			final String partitionKey = partitionKeyItr.next();
			Long maxObservedTimestamp = globalTsStore.get(partitionKey);
			if (maxObservedTimestamp != null)
			{
				infolog("purge starting, partitionKey:" + partitionKey + " maxObservedTimestamp:"
						+ maxObservedTimestamp);
				long currTime = System.currentTimeMillis();
				try (final KeyValueIterator<String, Long> dedupKeyStoreItr = dedupKeyStore.all())
				{
					while (dedupKeyStoreItr.hasNext())
					{
						final KeyValue<String, Long> dedupKeyEntry = dedupKeyStoreItr.next();
						final long eventTimestamp = dedupKeyEntry.value;
						if (hasExpired(eventTimestamp, maxObservedTimestamp))
						{
							dedupKeyStore.delete(dedupKeyEntry.key);
						}
					}
				}

				infolog("purge completed for partitionKey:" + partitionKey + " in "
						+ (System.currentTimeMillis() - currTime) + " ms");
			}
			else
			{
				infolog("purge didn't proceed as partitionKey:" + partitionKey + " not found in globalTsStore");
			}
		}
	}

	private boolean isDuplicate(final String eventId)
	{
		return dedupKeyStore.get(eventId) != null;
	}

	private void rememberNewEvent(final String eventId,
			final long eventTimestamp)
	{
		dedupKeyStore.put(eventId, eventTimestamp);
	}

	private boolean hasExpired(final long eventTimestamp,
			final long maxTimeMs)
	{
		return (maxTimeMs - eventTimestamp) > maintainDurationMs;
	}

	@Override
	public void close()
	{
		// nothing to close; the state stores are managed by Kafka Streams

	}

	private void debuglog(String str)
	{
		logger.debug(context.taskId() + ":" + str);
	}

	private void infolog(String str)
	{
		logger.info(context.taskId() + ":" + str);
	}
}

I don’t understand why you need a global table. If you use the ID as the key, all records with the same ID should end up in the same partition. And instead of handling the time yourself, you should be able to use a sliding window so that only the messages of the last x time are kept in memory/on disk (see the sketch below).
Another thing I don’t understand is that you create an eventId that includes the timestamp. So as long as the timestamp is different, it will be a different event anyway?
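Here is roughly what I mean with the window store (just a sketch, not tested against your setup; the class name, store name "dedup-window-store", and the 30-minute retention are placeholders I made up):

import java.time.Duration;
import java.time.Instant;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

public class WindowedDedupSketch
{
	// how long an eventId must be remembered; placeholder value
	static final Duration DEDUP_RETENTION = Duration.ofMinutes(30);

	// A window store with a retention period drops expired segments by itself,
	// so no purge punctuation is needed.
	static StoreBuilder<WindowStore<String, Long>> dedupStoreBuilder()
	{
		return Stores.windowStoreBuilder(
				Stores.persistentWindowStore("dedup-window-store",
						DEDUP_RETENTION, // retention period
						DEDUP_RETENTION, // window size
						false),          // do not retain duplicates
				Serdes.String(),
				Serdes.Long());
	}

	// Inside the processor: has this eventId been seen within the retention interval?
	static boolean isDuplicate(WindowStore<String, Long> store, String eventId, long eventTimestampMs)
	{
		Instant ts = Instant.ofEpochMilli(eventTimestampMs);
		try (WindowStoreIterator<Long> it = store.fetch(eventId, ts.minus(DEDUP_RETENTION), ts.plus(DEDUP_RETENTION)))
		{
			return it.hasNext();
		}
	}

	// Inside the processor: remember a newly seen eventId.
	static void rememberNewEvent(WindowStore<String, Long> store, String eventId, long eventTimestampMs)
	{
		store.put(eventId, eventTimestampMs, eventTimestampMs);
	}
}

You would attach the store builder to the topology with addStateStore(...) and pass the store name to the processor, the same way you do with the kv-store today.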

I am using globalTsStore to track the max record timestamp per partition. Based on that max timestamp, I purge expired events every 5 minutes. I use a global table because I need this store in another processor as well, to track the max record timestamp of all partitions further downstream in the pipeline. dedupKeyStore has the eventId (timestamp + "_" + nodeId + "_" + source + "_" + seqno) as key and the record timestamp as value.
Yes, the eventId would be different whenever the timestamp is different, but those eventIds are purged/deleted periodically in the punctuation.

I cannot spot anything obvious that looks wrong.

Can you verify which rows you actually expect to get expired but don’t? Maybe use IQ (Interactive Queries) to query the state store from outside?

How many new rows do you add per time unit, and how many rows does the punctuation delete? That should be easy to count. Do the numbers match up?
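For counting, something along these lines should work (a sketch only; it assumes a running KafkaStreams instance and uses the dedupKeyStore name from your processor):

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class DedupStoreInspector
{
	// Count the entries in dedupKeyStore and find the oldest timestamp still stored.
	public static void inspect(KafkaStreams streams)
	{
		ReadOnlyKeyValueStore<String, Long> store = streams.store(
				StoreQueryParameters.fromNameAndType("dedupKeyStore",
						QueryableStoreTypes.<String, Long>keyValueStore()));

		// cheap RocksDB estimate
		System.out.println("approximate entries: " + store.approximateNumEntries());

		// exact count plus the oldest record timestamp that was not purged
		long count = 0;
		long oldestTs = Long.MAX_VALUE;
		try (KeyValueIterator<String, Long> it = store.all())
		{
			while (it.hasNext())
			{
				KeyValue<String, Long> entry = it.next();
				count++;
				oldestTs = Math.min(oldestTs, entry.value);
			}
		}
		System.out.println("exact entries: " + count + ", oldest timestamp: " + oldestTs);
	}
}

If the exact count keeps growing, rows are really not being deleted; if it stays roughly flat, the growth must come from the files on disk rather than from the entries.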

Thanks for the reply…
I found the reason: dedupKeyStore is being purged properly by the code, but the MANIFEST file in the RocksDB state store directory was constantly growing. The MANIFEST file rolled over after reaching 1 GB and the older file got deleted. Now my state store size has gone down and looks fine.


Ah. So it’s just a config “issue”. You can change the RocksDB config via rocksdb.config.setter: Configuring a Streams Application | Confluent Documentation. Changing the config lets you set a smaller MANIFEST file size, so it rolls over earlier and thus keeps the disk footprint smaller.
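A minimal config setter could look roughly like this (the class name and the 64 MB value are just illustrative, not a recommendation):

import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

public class ManifestBoundedRocksDBConfig implements RocksDBConfigSetter
{
	@Override
	public void setConfig(final String storeName, final Options options, final Map<String, Object> configs)
	{
		// roll the MANIFEST file over at 64 MB instead of waiting until it reaches ~1 GB
		options.setMaxManifestFileSize(64 * 1024 * 1024L);
	}

	@Override
	public void close(final String storeName, final Options options)
	{
		// nothing was allocated in setConfig, so nothing to close
	}
}

Register it in the Streams config with props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, ManifestBoundedRocksDBConfig.class);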

Check out the RocksDB guide, too: Kafka Streams Memory Management | Confluent Documentation

There is also a blog post about perf tuning: Performance Tuning RocksDB for Kafka Streams’ State Stores
