[KafkaStreams] State store with only 1 value

Hello friends,
how can I create a state store that stores only one value (the latest)?
Is there a better way than removing the element every time I get it?

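import java.util.Optional;

import lombok.RequiredArgsConstructor;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// Forwards a record only the first time its uuid is seen; later records with the same uuid become null
@RequiredArgsConstructor
public class CryptoTradeRecordDistinctTransformer implements ValueTransformer<CryptoTradeRecord, CryptoTradeRecord> {

    private final String storeName;
    private KeyValueStore<String, Long> timestampStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext processorContext) {
        this.timestampStore = (KeyValueStore<String, Long>) processorContext.getStateStore(storeName);
    }

    @Override
    public CryptoTradeRecord transform(CryptoTradeRecord record) {
        CryptoTradeRecord result = null;

        if (isDistinct(record)) {
            result = record;
        }
        updateTimestampStore(record);

        return result; // null marks a duplicate; it is dropped by the downstream filter
    }

    @Override
    public void close() {
        // Do not close timestampStore here: state stores are managed by Kafka Streams itself
    }

    private boolean isDistinct(CryptoTradeRecord record) {
        Optional<Long> lastTimestamp = poll(record.getUuid());
        return lastTimestamp.isEmpty();
    }

    // Reads the last timestamp stored for the uuid and removes it from the store
    private Optional<Long> poll(String uuid) {
        Optional<Long> lastTimestamp = Optional.ofNullable(timestampStore.get(uuid));
        if (lastTimestamp.isPresent()) {
            timestampStore.delete(uuid);
        }
        return lastTimestamp;
    }

    // Stores the event timestamp of the current record under its uuid
    private void updateTimestampStore(CryptoTradeRecord record) {
        timestampStore.put(record.getUuid(), record.getEventTimestamp());
    }
}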

myStream
    .transformValues(() -> new CryptoTradeRecordDistinctTransformer(storeName), storeName)
    .filter((k, v) -> v != null); // drop the duplicates the transformer turned into null
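A `KeyValueStore` already keeps at most one value per key, and `put` replaces whatever value was there before, so the `delete` inside `poll` does not change what the store ends up holding. The following is a minimal stdlib-only sketch, assuming a `HashMap` as a stand-in for `KeyValueStore<String, Long>`; the class `OverwriteDemo` and its methods are hypothetical names used only for illustration. It runs the get-delete-put sequence from the transformer side by side with a plain get-put:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical demo: a HashMap plays the role of KeyValueStore<String, Long>,
// since both hold at most one value per key and put() replaces the old value.
public class OverwriteDemo {

    // Mirrors the transformer above: read the last value, delete it, then put the new one.
    static Optional<Long> pollThenPut(Map<String, Long> store, String uuid, long ts) {
        Optional<Long> last = Optional.ofNullable(store.get(uuid));
        last.ifPresent(v -> store.remove(uuid));
        store.put(uuid, ts);
        return last;
    }

    // Same observable result without the delete: put() overwrites the previous value.
    static Optional<Long> getThenPut(Map<String, Long> store, String uuid, long ts) {
        Optional<Long> last = Optional.ofNullable(store.get(uuid));
        store.put(uuid, ts);
        return last;
    }

    public static void main(String[] args) {
        Map<String, Long> withDelete = new HashMap<>();
        Map<String, Long> withoutDelete = new HashMap<>();
        long[] timestamps = {100L, 200L, 150L};
        for (long ts : timestamps) {
            Optional<Long> a = pollThenPut(withDelete, "trade-1", ts);
            Optional<Long> b = getThenPut(withoutDelete, "trade-1", ts);
            // Same value returned, same store contents after each step
            System.out.println(a.equals(b) + " " + withDelete.equals(withoutDelete));
        }
        // Both stores hold exactly one entry for "trade-1": the last timestamp written
        System.out.println(withDelete + " " + withoutDelete);
    }
}
```

Running `main` prints `true true` three times, followed by `{trade-1=150} {trade-1=150}`.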

The context for why I need this:
I stream cryptocurrency trades to Kafka with repetitions: there are 3 independent services that stream the same data. Thanks to this, if one service fails, the others still stream the data.
Then, in Kafka Streams, I want to filter the messages to make them distinct. That's why I want to compare each message's event timestamp with the last event timestamp seen for the same uuid. If the new timestamp is greater than the last one, the message is processed; otherwise it is ignored.
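The rule described above ("process only if the event timestamp is newer than the last one for the same uuid") can be sketched without any delete. This is a stdlib-only sketch under the assumption that a `HashMap` stands in for the `KeyValueStore<String, Long>`; inside a real transformer the same `get`/`put` calls would go to the state store. `LatestTimestampFilter` and `accept` are hypothetical names:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: keep only the latest event timestamp per uuid and accept a
// record only if its timestamp is strictly newer. The HashMap stands in for the
// KeyValueStore<String, Long> used in the transformer.
public class LatestTimestampFilter {

    private final Map<String, Long> lastTimestampByUuid = new HashMap<>();

    /** Returns true if the record should be processed, false if it is a duplicate or older. */
    public boolean accept(String uuid, long eventTimestamp) {
        Long last = lastTimestampByUuid.get(uuid);
        if (last != null && eventTimestamp <= last) {
            return false; // same or older timestamp: ignore
        }
        // put() replaces the previous value, so only the latest timestamp per uuid is kept
        lastTimestampByUuid.put(uuid, eventTimestamp);
        return true;
    }

    public static void main(String[] args) {
        LatestTimestampFilter filter = new LatestTimestampFilter();
        System.out.println(filter.accept("uuid-1", 1000L)); // true: first time seen
        System.out.println(filter.accept("uuid-1", 1000L)); // false: same record from another service
        System.out.println(filter.accept("uuid-1", 1001L)); // true: newer timestamp
        System.out.println(filter.accept("uuid-1", 999L));  // false: older timestamp
    }
}
```

Writing only when the record is accepted keeps the stored timestamp from moving backwards when an older duplicate arrives late.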

Hey @programista4k, take a look at this tutorial and see if it helps with your use case: Kafka Streams Tutorial: How to find distinct values i


Thanks, it's interesting, but I don't think I need windows here: I must deduplicate not only within a chosen window period (for example, the last 5 seconds) but from "the beginning of the world", so my window would span from -Inf to +Inf.
The question is:
which is faster:
deleting the element from the store on every get, or using a windowed store whose elements expire?
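The difference between keeping entries forever and letting them expire is a correctness trade-off as well as a performance one. The sketch below is NOT Kafka's window store; it is a hypothetical stdlib illustration, assuming a `HashMap` and a made-up `retentionMs` parameter, of what expiry means for deduplication: once an entry is older than the retention period (measured against the newest event time seen), a late duplicate of it is accepted again.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of expiry semantics (not Kafka's WindowStore):
// entries whose last timestamp falls below (newest event time - retentionMs) are dropped,
// which bounds memory but lets very late duplicates through.
public class ExpiringDeduplicator {

    private final long retentionMs;
    private final Map<String, Long> lastTimestampByUuid = new HashMap<>();
    private long maxSeenTimestamp = Long.MIN_VALUE;

    public ExpiringDeduplicator(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    public boolean accept(String uuid, long eventTimestamp) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, eventTimestamp);
        long cutoff = maxSeenTimestamp - retentionMs;
        // Linear scan for simplicity; it only illustrates the semantics, not an efficient store
        lastTimestampByUuid.values().removeIf(ts -> ts < cutoff);

        Long last = lastTimestampByUuid.get(uuid);
        if (last != null && eventTimestamp <= last) {
            return false; // duplicate still inside the retention period
        }
        lastTimestampByUuid.put(uuid, eventTimestamp);
        return true;
    }

    public static void main(String[] args) {
        ExpiringDeduplicator dedup = new ExpiringDeduplicator(5_000L);
        System.out.println(dedup.accept("uuid-1", 1_000L));  // true
        System.out.println(dedup.accept("uuid-1", 1_000L));  // false: caught within retention
        System.out.println(dedup.accept("uuid-2", 10_000L)); // true; the uuid-1 entry expires here
        System.out.println(dedup.accept("uuid-1", 1_000L));  // true: late duplicate is no longer caught
    }
}
```

With an unbounded key-value store the last call would return `false`, at the cost of the store growing with every distinct uuid.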