I have written a Kafka Streams processor using the Processor API.
I need two pieces of functionality:
- Reject late records (records that are not within the last 30 minutes).
- Remove duplicates based on a window covering the last 30 minutes.
I am using persistent state stores as follows:
a) LateRecordProcessor uses a persistent key-value store (RocksDB) named partitionTsStore. It keeps track of the latest timestamp seen for each partition; records that are too late relative to that timestamp are rejected.
b) DeduplicationProcessor uses a WindowStore (RocksDB) named dedupKeyStore to remove duplicate records. Both the retention period and the window size of the WindowStore are 30 minutes.
Records are processed by LateRecordProcessor first and then forwarded to DeduplicationProcessor.
Could you please let me know whether this is a correct way of doing it?
For reference, I have attached the processor code.
/**
 * Rejects records that are too old relative to the newest timestamp observed so far on
 * their source topic-partition, and forwards every other record to the downstream
 * deduplication node.
 *
 * <p>The newest observed timestamp is kept in a key-value state store, keyed by
 * {@code topic + Constants.HASH_SEPERATOR + partition}. A record is accepted when its
 * own timestamp (taken from {@code LogRecord.getTimestamp()}) is not older than
 * {@code maxObservedTimestamp - windowDurationMs}.
 */
public class LateRecordProcessor extends ContextualProcessor<String, LogRecord, String, LogRecord>
{
    private static final Logger logger = LoggerFactory.getLogger(LateRecordProcessor.class);

    /** Name of the key-value store holding the max observed timestamp per topic-partition. */
    private final String partitionTsStoreName;
    /** Lateness tolerance in milliseconds (the caller configures this, e.g. 30 minutes). */
    private final Long windowDurationMs;
    /** Name of the child node that accepted records are forwarded to. */
    private final String deduplicatorNode;

    private ProcessorContext<String, LogRecord> context;
    private KeyValueStore<String, Long> partitionTsStore;

    /**
     * @param partitionTsStoreName name of the state store used to track the max observed timestamp
     * @param windowDurationMs     how far behind the max observed timestamp a record may be, in ms
     * @param DEDUPLICATOR_NODE    name of the downstream node that receives accepted records
     */
    LateRecordProcessor(String partitionTsStoreName,
                        Long windowDurationMs,
                        String DEDUPLICATOR_NODE)
    {
        this.windowDurationMs = windowDurationMs;
        this.partitionTsStoreName = partitionTsStoreName;
        this.deduplicatorNode = DEDUPLICATOR_NODE;
    }

    /**
     * Stores the processor context and looks up the timestamp state store by name.
     *
     * @param context the processor context supplied by Kafka Streams
     */
    @Override
    public void init(ProcessorContext<String, LogRecord> context)
    {
        this.context = context;
        this.partitionTsStore = (KeyValueStore<String, Long>) context.getStateStore(partitionTsStoreName);
        infolog("processor context initialized");
    }

    /**
     * Updates the max observed timestamp for the record's topic-partition and then either
     * forwards the record to the deduplication node or drops it as late.
     *
     * @param record the incoming record
     * @throws IllegalStateException if no record metadata is available for this invocation
     */
    @Override
    public void process(Record<String, LogRecord> record)
    {
        String key = record.key();
        LogRecord value = record.value();
        if (value == null)
        {
            // Without a value there is no timestamp to evaluate; drop instead of failing with an NPE.
            infolog("record rejected because it has no value, record key:" + key);
            return;
        }
        Long currentRecTimestamp = value.getTimestamp();
        if (currentRecTimestamp == null)
        {
            infolog("record rejected because it has no timestamp, record key:" + key + " and value:" + value);
            return;
        }

        RecordMetadata rMD = context.recordMetadata().orElseThrow(
            () -> new IllegalStateException("record metadata is not available for record key:" + key));
        String tsKey = rMD.topic() + Constants.HASH_SEPERATOR + rMD.partition();

        // Advance the stored maximum first, so a record newer than everything seen so far
        // is always accepted; the resulting maximum is kept locally to avoid a second store read.
        Long prevMaxObservedTimestamp = partitionTsStore.get(tsKey);
        long maxObservedTimestamp;
        if (prevMaxObservedTimestamp == null || currentRecTimestamp > prevMaxObservedTimestamp)
        {
            partitionTsStore.put(tsKey, currentRecTimestamp);
            maxObservedTimestamp = currentRecTimestamp;
        }
        else
        {
            maxObservedTimestamp = prevMaxObservedTimestamp;
        }

        if (logger.isDebugEnabled())
        {
            debuglog("maxObservedTimestamp found in partitionTsStore with key:" + tsKey + " and value:"
                + maxObservedTimestamp);
        }

        // Accept the record only if it lies within the allowed lateness window.
        if (currentRecTimestamp >= (maxObservedTimestamp - windowDurationMs))
        {
            this.context.forward(record, deduplicatorNode);
        }
        else
        {
            infolog("latecoming record rejected as tsKey:" + tsKey + " ,maxObservedTimestamp:" + maxObservedTimestamp
                + " record key:" + key + " and value:" + value);
        }
    }

    /** Nothing to release: this processor does not open any resources of its own. */
    @Override
    public void close()
    {
    }

    /** Logs at DEBUG level, prefixed with the task id. */
    private void debuglog(String str)
    {
        logger.debug(context.taskId() + ":" + str);
    }

    /** Logs at INFO level, prefixed with the task id. */
    private void infolog(String str)
    {
        logger.info(context.taskId() + ":" + str);
    }
}
/**
 * Drops records whose event id has already been seen, and forwards all others downstream.
 *
 * <p>The event id is built from the record value as
 * {@code timestamp_nodeId_source_seqNum}. Seen ids are remembered in a window store, written
 * at the record's own timestamp. Because the timestamp is part of the id, a duplicate
 * always carries the same timestamp as the original, so the lookup only needs to inspect
 * that exact point in time.
 */
public class DeduplicationProcessor extends ContextualProcessor<String, LogRecord, String, LogRecord>
{
    private static final Logger logger = LoggerFactory.getLogger(DeduplicationProcessor.class);

    /** Name of the window store that remembers already-seen event ids. */
    private final String dedupKeyStoreName;

    private ProcessorContext<String, LogRecord> context;
    private WindowStore<String, Long> dedupKeyStore;

    /**
     * @param dedupKeyStoreName name of the window store used to remember seen event ids
     */
    DeduplicationProcessor(String dedupKeyStoreName)
    {
        this.dedupKeyStoreName = dedupKeyStoreName;
    }

    /**
     * Stores the processor context and looks up the deduplication window store by name.
     *
     * @param context the processor context supplied by Kafka Streams
     */
    @Override
    public void init(ProcessorContext<String, LogRecord> context)
    {
        this.context = context;
        this.dedupKeyStore = (WindowStore<String, Long>) context.getStateStore(dedupKeyStoreName);
        infolog("processor context initialized");
    }

    /**
     * Forwards the record if its event id has not been seen before; otherwise logs and drops it.
     *
     * @param record the incoming record
     */
    @Override
    public void process(Record<String, LogRecord> record)
    {
        LogRecord value = record.value();
        if (value == null)
        {
            // An event id cannot be derived without a value; drop instead of failing with an NPE.
            infolog("record dropped because it has no value, record key:" + record.key());
            return;
        }
        Long recTimestamp = value.getTimestamp();
        if (recTimestamp == null)
        {
            infolog("record dropped because it has no timestamp, record key:" + record.key());
            return;
        }

        String eventId = String.valueOf(recTimestamp)
            + "_" + String.valueOf(value.getNodeId())
            + "_" + value.getSource().name()
            + "_" + String.valueOf(value.getSeqNum());

        if (isDuplicate(eventId, recTimestamp))
        {
            infolog("duplicate event detected:" + eventId);
            return;
        }

        rememberNewEvent(eventId, recTimestamp);
        // emit the final record
        this.context.forward(record);
    }

    /** Records the event id in the window store at the event's own timestamp. */
    private void rememberNewEvent(String eventId,
                                  final long timestamp)
    {
        dedupKeyStore.put(eventId, timestamp, timestamp);
    }

    /**
     * Checks whether the event id is already present in the window store at the given timestamp.
     *
     * @param eventId   the derived event id
     * @param timestamp the event's timestamp, used as both bounds of the lookup range
     * @return {@code true} if an entry for the id exists at that timestamp
     */
    private boolean isDuplicate(String eventId,
                                long timestamp)
    {
        // try-with-resources guarantees the iterator is closed even if hasNext() throws.
        try (WindowStoreIterator<Long> timeIterator = dedupKeyStore.fetch(eventId, timestamp, timestamp))
        {
            return timeIterator.hasNext();
        }
    }

    /** Nothing to release: this processor does not open any resources of its own. */
    @Override
    public void close()
    {
    }

    /** Logs at DEBUG level, prefixed with the task id. */
    private void debuglog(String str)
    {
        logger.debug(context.taskId() + ":" + str);
    }

    /** Logs at INFO level, prefixed with the task id. */
    private void infolog(String str)
    {
        logger.info(context.taskId() + ":" + str);
    }
}