Hello friends,
I get the following exception from Kafka at runtime:
Failed to clean up log for statefull_app_id-raw-trades-changelog-0 in dir C:\Kafka\kafka_2.13-2.7.0\tmp\kafka-logs due to IOException (kafka.server.LogDirFailureChannel)
java.nio.file.FileSystemException: C:\Kafka\kafka_2.13-2.7.0\tmp\kafka-logs\statefull_app_id-raw-trades-changelog-0\00000000000000000000.timeindex.cleaned -> C:\Kafka\kafka_2.13-2.7.0\tmp\kafka-logs\statefull_app_id-raw-trades-changelog-0\00000000000000000000.timeindex.swap: The process cannot access the file because it is in use by another process
at java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:395)
at java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:292)
at java.base/java.nio.file.Files.move(Files.java:1426)
at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:917)
at kafka.log.AbstractIndex.renameTo(AbstractIndex.scala:211)
at kafka.log.LazyIndex$IndexValue.renameTo(LazyIndex.scala:155)
at kafka.log.LazyIndex.$anonfun$renameTo$1(LazyIndex.scala:79)
at kafka.log.LazyIndex.renameTo(LazyIndex.scala:79)
at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:497)
at kafka.log.Log.$anonfun$replaceSegments$4(Log.scala:2338)
at kafka.log.Log.$anonfun$replaceSegments$4$adapted(Log.scala:2338)
at scala.collection.immutable.List.foreach(List.scala:333)
at kafka.log.Log.replaceSegments(Log.scala:2338)
at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:614)
at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:539)
at kafka.log.Cleaner.doClean(LogCleaner.scala:538)
at kafka.log.Cleaner.clean(LogCleaner.scala:512)
at kafka.log.LogCleaner$CleanerThread.cleanLog(LogCleaner.scala:381)
at kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:353)
at kafka.log.LogCleaner$CleanerThread.tryCleanFilthiestLog(LogCleaner.scala:333)
at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:322)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
Suppressed: java.nio.file.FileSystemException: C:\Kafka\kafka_2.13-2.7.0\tmp\kafka-logs\statefull_app_id-raw-trades-changelog-0\00000000000000000000.timeindex.cleaned -> C:\Kafka\kafka_2.13-2.7.0\tmp\kafka-logs\statefull_app_id-raw-trades-changelog-0\00000000000000000000.timeindex.swap: The process cannot access the file because it is in use by another process
at java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:309)
at java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:292)
at java.base/java.nio.file.Files.move(Files.java:1426)
at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:914)
... 18 more
This is my stream:
KStream<String, CryptoTradeRecord> tradeStream = streamsBuilder
        .addStateStore(storeBuilder)
        .stream(KafkaTopics.CRYPTO_TRADES,
                Consumed.with(stringSerde, cryptoTradeSerde)
                        .withTimestampExtractor(myTimestampExtractor));

KStream<String, CryptoTradeRecord> distinctTradeStream = tradeStream
        .filter((k, v) -> cryptoTradeRecordValidator.validate(v).isValid)
        .transformValues(() -> new CryptoTradeRecordDistinctTransformer(storeName), storeName)
        .filter((k, v) -> v != null);
And this is the TimestampExtractor:
@Override
public long extract(ConsumerRecord<Object, Object> consumerRecord, long l) {
    CryptoTradeRecord cryptoTradeRecord = (CryptoTradeRecord) consumerRecord.value();
    return cryptoTradeRecord.getEventTimestamp();
}
It fails after 1000 to 2000 messages.