De-duplicator Using Kafka Streams

I have written a Kafka Streams processor using the Processor API.
I need two functionalities:

  1. Reject late records (not within the last 30 mins).
  2. Remove duplicates based on a 30-min window.

I am using persistent state stores as follows:

    a) LateRecordProcessor uses a persistent key-value store (RocksDB) named partitionTsStore. It keeps track of the latest timestamp seen per topic partition; late records are rejected based on that timestamp.
    b) DeduplicationProcessor uses a WindowStore (RocksDB) named dedupKeyStore. It is used to drop duplicate records. Both the retention period and the window size of the WindowStore are 30 mins.

Records are processed by the LateRecordProcessor first and then forwarded to the DeduplicationProcessor.
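
The topology is wired up roughly like this (a simplified sketch: the topic and node names and the serde setup are placeholders, and a proper serde for LogRecord would be needed on the source and sink):

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.state.Stores;

Topology topology = new Topology();

// source node reading the raw input topic
topology.addSource("Source", "input-topic");

// late-record filter first, then the de-duplicator
topology.addProcessor("LateRecordFilter",
		() -> new LateRecordProcessor("partitionTsStore",
				Duration.ofMinutes(30).toMillis(),
				"Deduplicator"),
		"Source");
topology.addProcessor("Deduplicator",
		() -> new DeduplicationProcessor("dedupKeyStore"),
		"LateRecordFilter");

// persistent KV store tracking the max observed timestamp per topic-partition
topology.addStateStore(
		Stores.keyValueStoreBuilder(
				Stores.persistentKeyValueStore("partitionTsStore"),
				Serdes.String(), Serdes.Long()),
		"LateRecordFilter");

// persistent window store, 30 min retention and 30 min window size,
// duplicates not retained
topology.addStateStore(
		Stores.windowStoreBuilder(
				Stores.persistentWindowStore("dedupKeyStore",
						Duration.ofMinutes(30), Duration.ofMinutes(30), false),
				Serdes.String(), Serdes.Long()),
		"Deduplicator");

topology.addSink("Sink", "output-topic", "Deduplicator");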

Would you please let me know whether this is a correct way of doing it?

For reference, I have attached the processor code.

import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.state.KeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LateRecordProcessor extends ContextualProcessor<String, LogRecord, String, LogRecord>
{
	private static final Logger logger = LoggerFactory.getLogger(LateRecordProcessor.class);

	private ProcessorContext<String, LogRecord> context;

	// name of and handle to the per-partition timestamp store
	private final String partitionTsStoreName;
	private KeyValueStore<String, Long> partitionTsStore;

	private final Long windowDurationMs; // 30 mins
	// name of the downstream de-duplication processor node
	private final String DEDUPLICATOR_NODE;

	LateRecordProcessor(String partitionTsStoreName,
			Long windowDurationMs,
			String DEDUPLICATOR_NODE)
	{
		this.windowDurationMs = windowDurationMs;
		this.partitionTsStoreName = partitionTsStoreName;
		this.DEDUPLICATOR_NODE = DEDUPLICATOR_NODE;
	}

	@Override
	public void init(ProcessorContext<String, LogRecord> context)
	{
		this.context = context;
		this.partitionTsStore = (KeyValueStore<String, Long>) context.getStateStore(partitionTsStoreName);
		infolog("processor context initialized");
	}

	@Override
	public void process(Record<String, LogRecord> record)
	{
		String key = record.key();
		LogRecord value = record.value();

		Long recordTimestamp = value.getTimestamp();

		// recordMetadata() is only guaranteed to be present for records that
		// originate from an input topic, which is the case for this processor
		RecordMetadata rMD = context.recordMetadata().get();

		String tsKey = rMD.topic() + Constants.HASH_SEPERATOR + rMD.partition();

		// look up the max timestamp observed so far for this topic-partition
		Long maxObservedTimestamp = partitionTsStore.get(tsKey);
		if (maxObservedTimestamp == null)
		{
			maxObservedTimestamp = 0L;
		}

		debuglog("maxObservedTimestamp found in partitionTsStore with key:" + tsKey + " and value:"
				+ maxObservedTimestamp);

		// advance the max observed timestamp if this record is newer
		if (recordTimestamp > maxObservedTimestamp)
		{
			partitionTsStore.put(tsKey, recordTimestamp);
			maxObservedTimestamp = recordTimestamp;
		}

		// accept the record if it is within the time window (30 min) of the
		// max observed timestamp; otherwise reject it as late
		if (recordTimestamp >= (maxObservedTimestamp - windowDurationMs))
		{
			// emit the record to the de-duplication node
			this.context.forward(record, DEDUPLICATOR_NODE);
		}
		else
		{
			infolog("late record rejected as tsKey:" + tsKey + " ,maxObservedTimestamp:" + maxObservedTimestamp
					+ " record key:" + key + " and value:" + value);
		}
	}


	@Override
	public void close()
	{
		// nothing to clean up; state stores are closed by the runtime
	}

	private void debuglog(String str)
	{
		logger.debug(context.taskId() + ":" + str);
	}

	private void infolog(String str)
	{
		logger.info(context.taskId() + ":" + str);
	}

}

import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DeduplicationProcessor extends ContextualProcessor<String, LogRecord, String, LogRecord>
{
	private static final Logger logger = LoggerFactory.getLogger(DeduplicationProcessor.class);

	private ProcessorContext<String, LogRecord> context;
	private final String dedupKeyStoreName;
	private WindowStore<String, Long> dedupKeyStore;

	DeduplicationProcessor(String dedupKeyStoreName)
	{
		this.dedupKeyStoreName = dedupKeyStoreName;
	}

	@Override
	public void init(ProcessorContext<String, LogRecord> context)
	{
		this.context = context;
		this.dedupKeyStore = (WindowStore<String, Long>) context.getStateStore(dedupKeyStoreName);

		infolog("processor context initialized");
	}

	@Override
	public void process(Record<String, LogRecord> record)
	{
		String key = record.key();
		LogRecord value = record.value();

		// build a unique event id from the record's fields
		String timestamp = String.valueOf(value.getTimestamp());
		String nodeId = String.valueOf(value.getNodeId());
		String source = value.getSource().name();
		String seqno = String.valueOf(value.getSeqNum());

		String eventId = timestamp + "_" + nodeId + "_" + source + "_" + seqno;

		Long recTimestamp = value.getTimestamp();
		if (!isDuplicate(eventId, recTimestamp))
		{
			rememberNewEvent(eventId, recTimestamp);
			// emit the final record
			this.context.forward(record);
		}
		else
		{
			infolog("duplicate event detected:" + eventId);
		}

	}

	private void rememberNewEvent(String eventId,
			final long timestamp)
	{
		// store the event id using its own timestamp as the window start
		dedupKeyStore.put(eventId, timestamp, timestamp);
	}

	private boolean isDuplicate(String eventId,
			long timestamp)
	{
		// a hit at exactly this timestamp within the retention period means
		// the same event id has been seen before
		try (WindowStoreIterator<Long> timeIterator = dedupKeyStore
				.fetch(eventId, timestamp, timestamp))
		{
			return timeIterator.hasNext();
		}
	}

	@Override
	public void close()
	{
		// nothing to clean up; state stores are closed by the runtime
	}

	private void debuglog(String str)
	{
		logger.debug(context.taskId() + ":" + str);
	}

	private void infolog(String str)
	{
		logger.info(context.taskId() + ":" + str);
	}

}

Would somebody please give me your suggestions?

Hi, johny,

I’ve taken a quick look at the code you’ve supplied here. It seems correct, but I don’t know your requirements, so I can’t say what I’d take out or change.

One thing I would suggest is to validate the behavior of your code with the TopologyTestDriver.
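
Something like this (completely untested, and buildTopology(), the topic names, the LogRecord (de)serializers, and newLogRecord() are placeholders for your own code) would let you pipe the same record through twice and assert that only one copy comes out:

import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "dedup-test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

// buildTopology() stands in for whatever builds your production Topology
try (TopologyTestDriver driver = new TopologyTestDriver(buildTopology(), props))
{
	TestInputTopic<String, LogRecord> input = driver.createInputTopic(
			"input-topic", new StringSerializer(), new LogRecordSerializer());
	TestOutputTopic<String, LogRecord> output = driver.createOutputTopic(
			"output-topic", new StringDeserializer(), new LogRecordDeserializer());

	// hypothetical factory for one of your LogRecord values
	LogRecord event = newLogRecord(/* timestamp, nodeId, source, seqno */);

	input.pipeInput("key", event);
	input.pipeInput("key", event); // exact duplicate

	// only the first copy should reach the output topic
	assert output.readRecordsToList().size() == 1;
}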

HTH,
Bill