Kafka failing when Kafka Streams has its own TimestampExtractor set

Hello friends,
I get the following exception from Kafka at runtime:

Failed to clean up log for statefull_app_id-raw-trades-changelog-0 in dir C:\Kafka\kafka_2.13-2.7.0\tmp\kafka-logs due to IOException (kafka.server.LogDirFailureChannel)
java.nio.file.FileSystemException: C:\Kafka\kafka_2.13-2.7.0\tmp\kafka-logs\statefull_app_id-raw-trades-changelog-0\00000000000000000000.timeindex.cleaned -> C:\Kafka\kafka_2.13-2.7.0\tmp\kafka-logs\statefull_app_id-raw-trades-changelog-0\00000000000000000000.timeindex.swap: The process cannot access the file because it is in use by another process
at java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:395)
at java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:292)
at java.base/java.nio.file.Files.move(Files.java:1426)
at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:917)
at kafka.log.AbstractIndex.renameTo(AbstractIndex.scala:211)
at kafka.log.LazyIndex$IndexValue.renameTo(LazyIndex.scala:155)
at kafka.log.LazyIndex.$anonfun$renameTo$1(LazyIndex.scala:79)
at kafka.log.LazyIndex.renameTo(LazyIndex.scala:79)
at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:497)
at kafka.log.Log.$anonfun$replaceSegments$4(Log.scala:2338)
at kafka.log.Log.$anonfun$replaceSegments$4$adapted(Log.scala:2338)
at scala.collection.immutable.List.foreach(List.scala:333)
at kafka.log.Log.replaceSegments(Log.scala:2338)
at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:614)
at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:539)
at kafka.log.Cleaner.doClean(LogCleaner.scala:538)
at kafka.log.Cleaner.clean(LogCleaner.scala:512)
at kafka.log.LogCleaner$CleanerThread.cleanLog(LogCleaner.scala:381)
at kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:353)
at kafka.log.LogCleaner$CleanerThread.tryCleanFilthiestLog(LogCleaner.scala:333)
at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:322)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
Suppressed: java.nio.file.FileSystemException: C:\Kafka\kafka_2.13-2.7.0\tmp\kafka-logs\statefull_app_id-raw-trades-changelog-0\00000000000000000000.timeindex.cleaned -> C:\Kafka\kafka_2.13-2.7.0\tmp\kafka-logs\statefull_app_id-raw-trades-changelog-0\00000000000000000000.timeindex.swap: The process cannot access the file because it is in use by another process
at java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:309)
at java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:292)
at java.base/java.nio.file.Files.move(Files.java:1426)
at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:914)
… 18 more

This is my stream:

KStream<String, CryptoTradeRecord> tradeStream = streamsBuilder
        .addStateStore(storeBuilder)   // state store used by the distinct transformer below
        .stream(KafkaTopics.CRYPTO_TRADES,
                Consumed.with(stringSerde, cryptoTradeSerde)
                        .withTimestampExtractor(myTimestampExtractor));

KStream<String, CryptoTradeRecord> distinctTradeStream = tradeStream
        .filter((k, v) -> cryptoTradeRecordValidator.validate(v).isValid)   // drop invalid records
        .transformValues(() -> new CryptoTradeRecordDistinctTransformer(storeName), storeName)
        .filter((k, v) -> v != null);   // the transformer returns null for duplicates
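
For context, a simplified sketch of what the deduplication transformer could look like (the getTradeId() accessor, the store's key/value types, and the class body are illustrative assumptions, not the actual implementation). The state store registered via addStateStore(storeBuilder) is presumably what backs the statefull_app_id-raw-trades-changelog-0 changelog topic named in the broker error:

import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class CryptoTradeRecordDistinctTransformer
        implements ValueTransformer<CryptoTradeRecord, CryptoTradeRecord> {

    private final String storeName;
    private KeyValueStore<String, Long> seenTrades;

    public CryptoTradeRecordDistinctTransformer(String storeName) {
        this.storeName = storeName;
    }

    @Override
    public void init(ProcessorContext context) {
        // The store must be registered under the same name via addStateStore.
        seenTrades = (KeyValueStore<String, Long>) context.getStateStore(storeName);
    }

    @Override
    public CryptoTradeRecord transform(CryptoTradeRecord value) {
        String tradeId = value.getTradeId();   // assumed accessor for a unique trade id
        if (seenTrades.get(tradeId) != null) {
            return null;                       // duplicate; removed by the null filter downstream
        }
        seenTrades.put(tradeId, value.getEventTimestamp());
        return value;
    }

    @Override
    public void close() {
    }
}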

And this is the TimestampExtractor:

@Override
public long extract(ConsumerRecord<Object, Object> consumerRecord, long l) {
    CryptoTradeRecord cryptoTradeRecord = (CryptoTradeRecord) consumerRecord.value();
    return cryptoTradeRecord.getEventTimestamp();
}

It fails after 1000 to 2000 messages.

This does not look like a Kafka Streams issue: the exception is thrown by the log cleaner on the broker, and the brokers are agnostic to who is reading the topics. Try posting your question in the Ops category.
Note that Kafka is not very well supported on Windows.

I have fixed it by shrinking my timestamp, dividing it by 1000.
Before I used a nanosecond timestamp like 1620818525470329111; now I use 1620818525470329, and it works.
But I still don't understand why the nanosecond timestamp was causing the failures.
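
For reference, Kafka record timestamps are interpreted as milliseconds since the Unix epoch, so dividing a nanosecond value by 1000 still leaves microseconds. A sketch of an extractor that converts the event time all the way down to milliseconds (the class name is hypothetical, and getEventTimestamp() returning nanoseconds is an assumption based on the values above):

import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class CryptoTradeTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(ConsumerRecord<Object, Object> consumerRecord, long partitionTime) {
        CryptoTradeRecord cryptoTradeRecord = (CryptoTradeRecord) consumerRecord.value();
        // Kafka expects milliseconds since the epoch:
        // 1620818525470329111 ns -> 1620818525470 ms
        return TimeUnit.NANOSECONDS.toMillis(cryptoTradeRecord.getEventTimestamp());
    }
}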