Unit Test with Spring's Embedded Kafka

Hello frens,
I’m a student in the Big Data field, and for my thesis I want to create a streaming application with Apache Kafka Streams. The app will stream cryptocurrency trades, aggregate them, and send notifications to users about interesting opportunities.

I want to test my Kafka Streams job using Spring’s Embedded Kafka.
The job deduplicates records by UUID and extracts new timestamps from the records.
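For readers unfamiliar with the job, the deduplication step can be pictured roughly as below. This is a minimal plain-Java sketch, not the code from the linked repository: the `Trade` and `TradeDeduplicator` names are hypothetical, and the in-memory `HashSet` only stands in for the Kafka Streams state store the real job registers via `addStateStore`.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical trade value; the real CryptowatTrade class is not shown in the thread.
final class Trade {
    final String uuid;
    final long timestampMillis;

    Trade(String uuid, long timestampMillis) {
        this.uuid = uuid;
        this.timestampMillis = timestampMillis;
    }
}

final class TradeDeduplicator {
    // Stands in for the state store that remembers UUIDs already forwarded.
    private final Set<String> seenUuids = new HashSet<>();

    /** True if the trade's UUID has not been seen before and it should be forwarded. */
    boolean accept(Trade trade) {
        // Set.add returns false when the UUID is already present, i.e. a duplicate.
        return seenUuids.add(trade.uuid);
    }

    /** Keeps the first occurrence of each UUID, preserving input order. */
    static List<Trade> deduplicate(List<Trade> input) {
        TradeDeduplicator dedup = new TradeDeduplicator();
        List<Trade> output = new ArrayList<>();
        for (Trade trade : input) {
            if (dedup.accept(trade)) {
                output.add(trade);
            }
        }
        return output;
    }
}
```

In a real streaming job the remembered UUIDs would live in a state store, and they would usually need some form of expiry so the store does not grow without bound.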

The test sometimes passes and sometimes throws an exception:

java.nio.file.FileSystemException: C:\Users\fren\AppData\Local\Temp\kafka-942865213443983834\version-2\log.1: The process cannot access the file because it is in use by another process
        at java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92) ~[na:na]
        at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103) ~[na:na]
        at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:108) ~[na:na]
        at java.base/sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:274) ~[na:na]
        at java.base/sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:105) ~[na:na]
        at java.base/java.nio.file.Files.delete(Files.java:1146) ~[na:na]
        at org.apache.kafka.common.utils.Utils$2.visitFile(Utils.java:847) ~[kafka-clients-2.6.0.jar:na]
        at org.apache.kafka.common.utils.Utils$2.visitFile(Utils.java:835) ~[kafka-clients-2.6.0.jar:na]
        at java.base/java.nio.file.Files.walkFileTree(Files.java:2804) ~[na:na]
        at java.base/java.nio.file.Files.walkFileTree(Files.java:2876) ~[na:na]
        at org.apache.kafka.common.utils.Utils.delete(Utils.java:835) ~[kafka-clients-2.6.0.jar:na]
        at org.apache.kafka.common.utils.Utils.delete(Utils.java:821) ~[kafka-clients-2.6.0.jar:na]
        at org.apache.kafka.test.TestUtils.lambda$tempDirectory$3(TestUtils.java:303) ~[kafka-clients-2.6.1-test.jar:na]
        at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]

In the test, before every myKafkaStreams.start() I call myKafkaStreams.cleanUp(), but it doesn’t help.

This is the test:
https://bitbucket.org/woblak/cstream-ingester/src/develop/cstream-ingester-core/src/test/java/com/woblak/cstream/ingester/core/io/kafka/streams/CryptowatTradesStreamJobTest.java

And this is the superclass (the Embedded Kafka configuration):
https://bitbucket.org/woblak/cstream-ingester/src/develop/cstream-ingester-core/src/test/java/com/woblak/cstream/ingester/core/config/EmbeddedKafkaTest.java

Any suggestions on what I can do about it?

There was a discussion about unclean Kafka shutdown in a Windows environment here:
https://issues.apache.org/jira/browse/KAFKA-6075
but the issue was rejected: see “KAFKA-6075 Kafka cannot recover after an unclean shutdown” (apache/kafka Pull Request #4134 by tedyu on GitHub).

Maybe it is possible to make Embedded Kafka not save files to disk but do all operations in memory?

It seems the ticket you mentioned was actually fixed. The ticket status is “Resolved” and the PR comments say so, too: see “KAFKA-6075 Kafka cannot recover after an unclean shutdown” (apache/kafka Pull Request #4134 by tedyu on GitHub).

Also note that KafkaStreams#cleanUp() is for client-side local state (as configured via the state.dir configuration), while the ticket you mentioned is about the brokers.
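To make the client-side versus broker-side distinction concrete, here is a minimal sketch of Streams properties that give every test run its own fresh state.dir. The class and method names are hypothetical; the string keys are the standard Kafka Streams config names (the values of StreamsConfig.APPLICATION_ID_CONFIG, BOOTSTRAP_SERVERS_CONFIG and STATE_DIR_CONFIG). As far as I understand, cleanUp() removes the application's directory under state.dir (state.dir/application.id), which `appStateDir` computes; broker files such as the one in the stack trace live elsewhere and are not touched by it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

final class IsolatedStreamsConfig {
    /** Streams properties whose local state lives in a fresh temp directory per call. */
    static Properties forTest(String applicationId, String bootstrapServers) {
        Path stateDir;
        try {
            // Unique per call, so a run never reuses state files left by a previous run.
            stateDir = Files.createTempDirectory("kstreams-state-");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Properties props = new Properties();
        // Same string values as StreamsConfig.APPLICATION_ID_CONFIG,
        // BOOTSTRAP_SERVERS_CONFIG and STATE_DIR_CONFIG.
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("state.dir", stateDir.toString());
        return props;
    }

    /** Client-side directory for this application: state.dir/application.id. */
    static Path appStateDir(Properties props) {
        return Path.of(props.getProperty("state.dir"), props.getProperty("application.id"));
    }
}
```

The resulting Properties would be passed to the KafkaStreams constructor together with the topology; the embedded broker's own directories have to be configured separately.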

I am not familiar with the details of Spring, but if Spring does not clean up properly, you might need to ask the Spring people about it, or implement custom cleanup. It might also help to set a “random” directory (for the broker as well as for client state) for each test you run, to effectively isolate runs. Last, note that Windows is not officially supported; it’s only maintained on a “best effort” basis.
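If custom cleanup is the route taken, one option is a best-effort recursive delete that retries a few times, since on Windows a file can stay locked for a moment after its owner closes it. This is only a sketch: `BestEffortCleanup` is a hypothetical helper (not a Spring or Kafka API) that could be called from a test teardown hook, and it does not fix whatever still holds the file open.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class BestEffortCleanup {
    /** Deletes a directory tree with retries; returns true if it is gone afterwards. */
    static boolean deleteRecursively(Path root, int attempts, long sleepMillis) {
        for (int i = 0; i < attempts; i++) {
            if (!Files.exists(root)) {
                return true;
            }
            List<Path> paths;
            try (Stream<Path> walk = Files.walk(root)) {
                // Reverse order puts children before their parent directories.
                paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
            } catch (IOException | UncheckedIOException e) {
                paths = List.of(); // could not list the tree this time; retry later
            }
            for (Path p : paths) {
                try {
                    Files.deleteIfExists(p);
                } catch (IOException e) {
                    // Locked or non-empty: skip it now and try again on the next attempt.
                }
            }
            if (!Files.exists(root)) {
                return true;
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return !Files.exists(root);
    }
}
```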

Hope this helps.


Thx fren,
my additional worry is about the timestamp extractor in the same streaming job.

streamsBuilder
                .addStateStore(storeBuilder)
                .stream(topic, Consumed.with(stringSerde, cryptowatTradeSerde)
                                .withTimestampExtractor(cryptoTradeTimestampExtractor))

Is this enough to set new timestamps on the streamed records?
Because right now, when I run the tests, the extractor is skipped; it runs only in debug mode.
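Registering an extractor through Consumed.withTimestampExtractor, as in the snippet above, applies it to records read from that source topic. In Kafka Streams the extractor implements org.apache.kafka.streams.processor.TimestampExtractor, whose method is `long extract(ConsumerRecord<Object, Object> record, long partitionTime)`. The sketch below keeps only the decision logic in plain Java so it runs without Kafka on the classpath; `TradeEvent`, `TradeTimestamps` and the fallback to partitionTime are assumptions for illustration, not the thesis code.

```java
// Hypothetical trade payload; only the event-time field matters here.
final class TradeEvent {
    final String uuid;
    final long tradeTimeMillis;

    TradeEvent(String uuid, long tradeTimeMillis) {
        this.uuid = uuid;
        this.tradeTimeMillis = tradeTimeMillis;
    }
}

final class TradeTimestamps {
    /**
     * Logic a TimestampExtractor could delegate to: use the trade's own event time,
     * and fall back to the partition's current stream time if it is missing or invalid.
     */
    static long extract(Object value, long partitionTime) {
        if (value instanceof TradeEvent) {
            long t = ((TradeEvent) value).tradeTimeMillis;
            if (t >= 0) {
                return t;
            }
        }
        return partitionTime;
    }
}
```

Inside a real extractor this would be called as `TradeTimestamps.extract(record.value(), partitionTime)`.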


Oh, look fren…
I had the Kafka message listener and the Kafka stream in the same consumer group. That was the reason for the strange behaviour. I deleted the listener, and now it works.
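Kafka Streams uses its application.id as the group.id of its internal consumers, so a separate listener configured with the same group id ends up in the same consumer group as the Streams job. A guard like the sketch below could catch that in test setup. `GroupIdCheck` is a hypothetical helper that compares two plain Properties objects; how the listener’s group id is actually configured in Spring is not shown in the thread.

```java
import java.util.Properties;

final class GroupIdCheck {
    /** Fails fast if a listener would join the consumer group of the Streams app. */
    static void requireDistinctGroups(Properties streamsProps, Properties listenerProps) {
        // Kafka Streams uses application.id as the group.id of its consumers.
        String streamsGroup = streamsProps.getProperty("application.id");
        String listenerGroup = listenerProps.getProperty("group.id");
        if (streamsGroup != null && streamsGroup.equals(listenerGroup)) {
            throw new IllegalStateException(
                    "Listener group.id '" + listenerGroup
                            + "' clashes with the Kafka Streams application.id");
        }
    }
}
```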

Could you resolve your issue?
