Testing Asynchronous Code

Testing asynchronous code is something we do on a weekly or monthly basis. Sometimes more often, most of the time way less. However, having microservices that talk to each other brings exactly this issue with it: you want to make sure that the connection between the services works and that, when our isolated service receives a message through one of its inputs, it does what it is supposed to do.

Now, there are a lot of potential ways of doing that. Let’s have a look at them and construct a simple example application, which looks like this:

I decided to use Kafka as the means of communication, since this is quite common for communication between microservices; at least for me. Other approaches might include RabbitMQ, some other messaging system, or REST/SOAP. The last category is not technically asynchronous. Yes, the underlying system for sending and receiving messages through sockets is asynchronous, as is reading from disk at the OS level, but on the application level this is synchronised through a whole lot of different means.

Using Kafka, something like the following is quite common: a Kafka consumer listens on a certain topic. When data is written to the topic, the consumer receives a new message from the Kafka server. The consumer delegates the message to so-called “services”, which contain the business logic for certain parts of our application. They in turn utilise repositories to access the database.
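To make the call flow concrete, here is a minimal sketch of such a setup, assuming Spring Boot with spring-kafka. All names (Message, ConsumerResult, Entity, ServiceA/B/C, RepositoryA/B, the topic) are illustrative and only chosen to line up with the test snippets later in this post; in a real application the entity and repositories would of course be proper JPA classes.

import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component
import org.springframework.stereotype.Service

// Illustrative types, heavily simplified.
data class Message(val id: Long)
data class ConsumerResult(val message: Message)
class Entity(val id: Long = 0) { var version: Int = 0 }
interface RepositoryA { fun save(entity: Entity): Entity }
interface RepositoryB { fun save(entity: Entity): Entity }

@Service
class ServiceC(
    private val repositoryA: RepositoryA,
    private val repositoryB: RepositoryB
) {
    fun createEntity(message: Message): Entity = repositoryA.save(Entity(message.id))
}

@Service
class ServiceA(private val serviceC: ServiceC) {
    fun handle(message: Message) { serviceC.createEntity(message) }
}

@Service
class ServiceB(private val repositoryB: RepositoryB) {
    fun handle(message: Message) { /* more business logic touching repositoryB */ }
}

// The consumer listens on the topic and delegates the message to the services.
@Component
class KafkaConsumer(
    private val serviceA: ServiceA,
    private val serviceB: ServiceB
) {
    @KafkaListener(topics = ["test-topic"])
    fun consume(message: Message): ConsumerResult {
        serviceA.handle(message)
        serviceB.handle(message)
        // The return value is only here so that the probing examples later on have
        // something to capture.
        return ConsumerResult(message)
    }
}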

Before jumping into the tests, here are the frameworks and base technologies I’ll be using across the code examples (plus Kotlin on the JVM, obviously): Spring Boot with spring-kafka and JPA, mockk for mocking, AssertJ for assertions and Awaitility for polling.

Also, I use the AAA (Arrange, Act, Assert) naming scheme throughout my tests, writing the three comments so that you can follow along, even if the code is dissected into multiple different sections.

With that out of the way, let’s jump into it. Our goal is to test that everything works as we expect it to. Unit tests are a great way of doing that. So, to validate that ServiceA does what it is supposed to do, we can construct it and inject mocks for outside dependencies. It then might look like this:

class ServiceATest {
    @Test
    fun `test that the service a operation works correctly`() {
        // Arrange
        val repositoryA: RepositoryA = mockk()
        val repositoryB: RepositoryB = mockk()
        val serviceC = ServiceC(repositoryA, repositoryB)
        val testSubject = ServiceA(serviceC)
        // Other arranges and stubs for the mocks

        // Act
        testSubject.handle(/* Input data for the scenario */)

        // Assert
        verify { repositoryA.save(Entity()) }
        // Other asserts
    }
}

This works well as is. We can construct our test scenario and verify that our ServiceA works as expected. Now, obviously this is not adequate for our scenario: the KafkaConsumer calls multiple services instead of just one. So, in this scenario we could also construct the KafkaConsumer, pass it its dependencies and go. But the test setup will now get very big. We will have to mock all the repositories, construct all the classes and so on. The Arrange block alone will easily surpass the size of our current test.
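Just to illustrate how the Arrange block grows, a hedged sketch of constructing the consumer by hand (reusing the illustrative names from the application sketch above) might look like this:

// Arrange
val repositoryA: RepositoryA = mockk(relaxed = true) // relaxed, so every call has a default answer
val repositoryB: RepositoryB = mockk(relaxed = true)
val serviceC = ServiceC(repositoryA, repositoryB)
val serviceA = ServiceA(serviceC)
val serviceB = ServiceB(repositoryB)
val testSubject = KafkaConsumer(serviceA, serviceB)
// ... plus explicit stubs for every repository call the scenario actually touches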

So, to make it easier, we can use a dependency injection framework, something like the Spring Extension or SimpleDI (shameless self-promotion). This would make it simpler and cleaner, but let’s not focus on unit tests right now. They are beautiful, for sure, but sometimes you want to test the integration in the whole application context. Since, at least in my bubble, Spring is used quite commonly, I will use it in the code examples from now on.

Integration Tests With Asynchronous Method Calls

Okay. So far so good. Let us now change the test and set up a concrete integration test. We want to go ahead and also verify that the Kafka integration, as well as our assumptions towards JPA, work. Normally this would not be needed. If we cannot rely on the Kafka or JPA integration of Spring to work, what can we rely on? But it is a valid scenario to ensure that (for example) the serialisation and de-serialisation of the message objects works and that we did not goof up while integrating Kafka. We can construct such a test, for example, like this and run it:

@SpringBootIntegrationTest // Setup DB, Kafka and all required dependencies
class ServiceATest {

    @Autowired
    private lateinit var consumer: KafkaConsumer

    @Autowired
    private lateinit var kafkaTemplate: KafkaTemplate<String, Messages>

    @Autowired
    private lateinit var testEntityRepository: TestEntityRepository

    @Test
    fun `test that the create message creates the entity`() {
        // No Arrange

        // Act
        kafkaTemplate.sendDefault(CreateMessage()).get()

        // Assert
        assertThat(testEntityRepository.findById(/* id */)).isNotEmpty
    }
}

This won’t work

If you execute this test, it will fail. By the time we assert, the method repository.save will not have been called yet, at least with 99.9999999…% certainty. Why is that? Let’s have a look at the call flow.

As we execute our test, the line kafkaTemplate.sendDefault(CreateMessage()).get() writes the serialised object onto the topic of an embedded Kafka (or a Kafka instance inside a test container). Once this write is done, the .get() returns. This, however, is only the beginning. The KafkaConsumer runs in a separate thread and picks up this message. While the test thread already moves on to validate the results, the thread that actually produces the result has (potentially) not even started yet. It might happen, in extremely rare cases, that the second thread finishes before the asserting thread, but before this happens naturally we might see the universe die of heat death.

To better identify those two processes throughout the following examples, let’s give them the names Testing Thread and Executing Thread.

So, we somehow have to wait until the result is there. An abstract view of that might look like this:

We need some sort of synchronisation before asserting the results from the database on the Testing Thread. At the beginning of the Assert block the Testing Thread waits on it, and the Executing Thread triggers its release once it is done. This abstract concept obviously does not work exactly like this, since we don’t want to change the business code just because the tests need it.

Spoiler: it does work without changing the business code, using mockk and some magic, but let’s first look at other approaches.

The simple approach (aka what never to do)

If we have a look at the synchronisation and the way we test, we might couple it directly to the database. We could “simply” wait until the result is written to the database. There are some downsides to that, but let me first of all show you how NOT to do it:

// Assert
lateinit var result: Entity
while (true) {
    val current = testEntityRepository.findById(/* id */)
    if (current.isPresent) {
        result = current.get()
        break
    }
}
assertThat(result.version).isEqualTo(2)

This is not a good idea. Such a while(true) loop is called busy waiting, and it is never a good idea. We over-occupy the CPU, take resources and time away from the Executing Thread, and let the CPU (or at least one of its cores) run really hot.

We can enhance this example, though, and turn it into something like a spin lock. That is not entirely accurate terminology, but for the sake of explaining it, let’s give this approach the name spin lock.

The Spin Lock (aka what could be done)

To change our existing code into a spin-lock-based approach, we need to do a couple of things. We let the thread “sleep” for x milliseconds and then test the condition. If the condition is true (i.e. the data can be found in the database), we break the loop. If the condition is false, we check whether we have already been waiting for too long. If so, we exit with an error; otherwise we sleep again for x milliseconds and continue from the beginning.

Pseudo Code for such an approach might look like this:

var running = true
lateinit var result: T
val timeoutMilliseconds = 100L   // sleep interval between checks
val maxMilliseconds = 5_000L     // give up after this much total waiting
val startMilliseconds = System.currentTimeMillis()

while (running) {
    Thread.sleep(timeoutMilliseconds)
    if (condition.isMet()) {
        result = getResult()
        running = false
    } else if (System.currentTimeMillis() - startMilliseconds > maxMilliseconds) {
        running = false
        throw Exception("Execution took too long")
    }
}

It is important to note again that this approach is not really (100%) a spin lock. I just chose the name for lack of a better one.

Now, we don’t have to implement this ourselves. In the JVM ecosystem there is Awaitility, for example, which we can use like this:

// Assert
await().atMost(5, SECONDS)
    .until { testEntityRepository.findById(/* id */).isPresent }

Alright! Cool! We now wait for up to 5 seconds, trying to find the entity on each poll. This helps us tremendously, since now we have a synchronisation between the Testing Thread and the Executing Thread!

But this solution is still not 100% perfect. If we test the integration like this, we might now get false positives or false negatives, depending on our code.

Take our setup again as an example. We had two services accessing RepositoryB. If we break our business logic and (just as an example) change the transaction border, the changes might be committed too early.
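To make that a bit more tangible, here is a hedged sketch of what such a shifted transaction border could look like, again using the illustrative classes from above and Spring’s @Transactional (import org.springframework.transaction.annotation.Transactional):

// Before: one transaction around the whole consume call. Nothing becomes visible to a
// polling test until every service has run.
@KafkaListener(topics = ["test-topic"])
@Transactional
fun consume(message: Message): ConsumerResult {
    serviceA.handle(message)
    serviceB.handle(message)
    return ConsumerResult(message)
}

// After the (accidental) change: each service commits on its own. The entity written via
// ServiceA is already visible while ServiceB is still running, so a polling test can pick
// it up too early and fail its assertions, even though a moment later everything would be fine.
@KafkaListener(topics = ["test-topic"])
fun consume(message: Message): ConsumerResult {
    serviceA.handle(message) // commits its own transaction
    serviceB.handle(message) // commits its own transaction
    return ConsumerResult(message)
}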

In the program order, it now looks like this.

Even though our new change did not break anything, it looks like it did. This leads to wasted time trying to find the bug and sometimes, even worse, to introducing a broken “repair” that makes the test look green even though the code really does not work as we expect it to.

I know that changing the transaction border might be a break you want to catch, but I challenge you to understand this issue just by looking at a failing test that tries to validate everything at once. You cannot differentiate between broken business logic, a broken transaction border, or the code of a new colleague who manually triggered a commit to compensate for another bug.

Another issue is that every 100 ms we dispatch a request against the database. This can be quite an overhead again. Tweaking this timeout is a science in itself. Too small, and we come closer and closer to the busy-waiting approach, over-occupying one processor core. Too high, and the test might take annoyingly long to execute. Every second accumulates across all the integration tests, leading to head-banging against the table.
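For completeness, this is roughly how that knob looks with Awaitility (assuming org.awaitility.Awaitility.await and java.time.Duration are imported); the poll interval is exactly the trade-off described above:

// Assert
await()
    .pollInterval(Duration.ofMillis(100)) // too small: almost busy waiting, too big: slow tests
    .atMost(Duration.ofSeconds(5))        // hard upper bound before the test fails
    .until { testEntityRepository.findById(/* id */).isPresent }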

What would be another (potentially better) solution to the issue? To be certain that our integration test validates the workflow, and to eradicate side effects as much as humanly possible, we would need to synchronise as shown right here, in what I’ll call the business lock:

Let us take a look at how we could implement this:

The Business Lock (aka making sure we are actually done without overloading our processor)

This approach might now look very intimidating and complicated, but bear with me. Let’s tackle this one step at a time. How can we achieve this?

One approach would be to wait in our Testing Thread for the Executing Thread to finish. We could do something like this:

// Act
kafkaTemplate.sendDefault(CreateMessage()).get()
val executingThread = getExecutingThread()

// Assert
executingThread.join()
// We are done, now let's assert
...

This would be the cleanest solution, no questions asked. The thread.join() call will always return, except when the Executing Thread never finishes. However, it would also require a lot of setup.

Kafka (at least) is deeply interwoven with Spring if we use spring-kafka with Spring Boot, and getting hold of the Executing Thread from the Testing Thread might be an extreme overhead, even going as far as being impossible, or at the very least breaking every so often when we update Spring.

Also, the code would not work with Spring, because threads in Spring are managed in a thread pool. Such threads do not necessarily finish; instead they go and fetch the next Runnable. So we would need to intercept the creation of the task that is passed to the thread pool, which is even more of a hurdle to overcome.
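A toy illustration of that point, using plain java.util.concurrent (Executors, TimeUnit) rather than Spring internals, and the illustrative consumer and message from before: with a thread pool, the unit of completion is the submitted task, not the thread, and in the Spring case we never get our hands on that task because Spring submits it internally.

val pool = Executors.newSingleThreadExecutor()
val task = pool.submit { consumer.consume(message) }

// Completes as soon as consume() is done ...
task.get(5, TimeUnit.SECONDS)

// ... whereas the worker thread itself stays alive and simply waits for the next Runnable,
// so joining it would block until the whole pool shuts down.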

Maybe one day there will be a way for that, but for now, we will need to find a different approach. Let us actually go ahead and utilise spies from mockk.

So, here is the idea: we construct a spy on the KafkaConsumer. When the consume method is executed, we call the original method and afterwards wake up the Testing Thread. In the test we start the Executing Thread (by sending the message) and park the Testing Thread until it is woken up.

This approach has some holes in it. What if the consumer is triggered too early and finishes before the Testing Thread is parked? This can be fixed by utilising a Semaphore or a CompletableFuture. Let’s have a look, first at the Semaphore approach and then at the CompletableFuture approach:

A business lock using Semaphores

We need to do two things to achieve this. First off, we need to add the Kafka consumer as a spied field in our test class like this:

@SpykBean
lateinit var consumer: KafkaConsumer

Then we need to define our wrapper around the consume method in the Arrange block like this:

// Arrange
val semaphore = Semaphore(0)

every { consumer.consume(any()) } answers {
    val result = callOriginal() // Call the underlying real method
    semaphore.release() // Release the Testing Thread
    result // return the result to continue the Executing Thread and let it clean up resources
}

Once we call semaphore.release, we increase the available permits of the semaphore and check whether there are any suspended threads. If so, the next one is woken up and the permits are reduced again. The last statement lets the Executing Thread continue and lets Kafka do its cleanup work, such as advancing the offset and other important things.

With this every block we have defined how the Executing Thread should release the Testing Thread. So, if we now send the Kafka message, the Executing Thread will start and, at the end, release the semaphore.

To now suspend our Testing Thread, we can do the following at the beginning of our Assert block:

// Assert
assertThat(semaphore.tryAcquire(5, TimeUnit.SECONDS))
         .withFailMessage("Consumer has not been called in time")
         .isTrue
// get the result and assert stuff
...

As we call semaphore.tryAcquire, the semaphore checks its permits. If there are <= 0 permits, the calling thread is suspended (parked). If there are more, the permits are reduced and we continue as if nothing happened. So this approach would (in theory) also work with synchronous code execution. The passed argument of 5 seconds tells the semaphore to stop waiting for a permit after 5 seconds; tryAcquire returns false in that case, so the assertion also verifies that the consumer was actually called in time.
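The “get the result and assert stuff” part could then, for example, look like this (reusing the repository and the version check from the earlier snippets; the concrete assertions are of course scenario-specific):

// Assert (continued)
val entity = testEntityRepository.findById(/* id */).orElseThrow()
assertThat(entity.version).isEqualTo(2)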

A business lock using CompletableFuture

A semaphore is a great barrier for knowing when something is finished. Using a CompletableFuture, though, we can go one step further and even intercept results this way, effectively probing the method we wrap around. Let’s say we want to get the result that the KafkaConsumer returns and validate against it for some reason. We could again go to the length of introducing a spin lock, listening to the Kafka server and waiting for the message. Or, bear with me, we could use a CompletableFuture like this:

// Arrange
val future = CompletableFuture<ConsumerResult>()

every { consumer.consume(any()) } answers {
    val result = callOriginal() // Call the underlying real method
    future.complete(result) // Release the Testing Thread
    result // return the result to continue the Executing Thread and let it clean up resources
}

This Arrange block looks very similar. But what now happens is that we hand the result of the KafkaConsumer from the Executing Thread to the Testing Thread, without busy waiting or a spin lock. We can get it like this:

// Assert
val consumerResult = future.get(5, TimeUnit.SECONDS)

The behaviour here is quite similar to the Semaphore. If the result is already there, we just continue. Otherwise we park our thread and let it be woken up once future.complete is called. So this is also friendly towards synchronous execution.

One word about another Kafka approach

An alternative to wrapping the consumer would be to create the same Semaphore or CompletableFuture and pass it into a custom consumer that listens to the Kafka server itself. Once this consumer consumes the response, it releases the semaphore. The Arrange block would be reduced to something like kafkaServer.registerConsumer(TestBarrierConsumer("topic", semaphore)); no every, no spies needed. Our Assert block would not change, but now we have even validated that the response is serialised and dispatched correctly.
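A hypothetical sketch of such a test-only consumer could look like this; the registration call above (kafkaServer.registerConsumer) is assumed to be part of the test infrastructure, only the consumer itself is shown:

import java.util.concurrent.Semaphore
import org.apache.kafka.clients.consumer.ConsumerRecord

class TestBarrierConsumer(
    val topic: String,
    private val semaphore: Semaphore
) {
    // Called for every record the test infrastructure reads from the given topic.
    fun onMessage(record: ConsumerRecord<String, Message>) {
        // The application published its response, so the Executing Thread is done:
        // release the Testing Thread waiting in the Assert block.
        semaphore.release()
    }
}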

This is just a little bit more overhead to let the test go even further and be even safer about not interfering with the Executing Thread. Since this blog post is concerned with the general approach though, I will toss this aside and put a sticky note on it, to pick it up in another blog post.

Of course I cast it into a framework

This problem is something that has been bugging me for a long time. Seeing spin locks, or even busy waiting (which I have not seen yet, and I am very happy about that), at the application level in professional code always tingles the back of my head. So I cast the approach into an extension for mockk, a framework called MockK-Method-Probing. You can use it like this and achieve the same behaviour.

For example using a MethodBarrier. You can create one like this:

// Arrange
val barrier = barrier { consumer.consume(any()) }
...

And you can wait until the method is called whilst also asserting that no exception was raised like this:

// Assert
barrier.tryToTraverse()
...

A MethodBarrier alone is fast and easy, but does not hold any information about the caller or the callee. It is just a Semaphore wrapper, albeit with enhanced exception handling. If we want more information, we can construct a MethodProbe like this:

// Arrange
val probe = probe { toTest.testMethod(any()) }
...

And then have the power to do (for example) the following:

// Assert
probe.assertThatExecutionTimeMillis()
            .isLessThanOrEqualTo(100)


val result = probe.getResult() 
// or, fluent:
probe.assertThatResult()
            .isNotNull


val args = probe.getArgument<Message>(0)
// or, fluent:
probe.assertThatArguments()
            .isNotEmpty

There is a lot of wrapping going on, making absolutely sure that your test does what it is supposed to do. There is also support for partial or strict mocks (which we did not cover here) and so on. But before drifting off too much, let’s wrap this up.

Conclusion and my own opinion

After all of this talk, what could the conclusion possibly be, other than “use business locks”? As I sit here, trying to challenge myself to find a reason to use a spin lock instead of a business lock, I cannot find any I’d call “valid”. I might be wrong, and someone will very likely and rightfully point this out, but for now I cannot find anything.

The only two things that come to my mind would be:

  • A scenario where you use a framework that generates code at compile time and does not allow you to encapsulate that. But if you are using such a framework, I guess you have a lot of different issues.
  • Fear of the complexity. This fear is totally valid. Asynchronous code is complex in nature. Synchronisation is by all means non-trivial. But if you implement it once, or use an existing library that does it for you, you should not be confronted with that complexity at the application level.

I provided this example right here using mockk and Kotlin, but really, you could do this in any language with any mocking library that allows for constructing spy classes.

To finish up, let’s pull up something I stated earlier. I/O operations, like reading from a hard drive, are asynchronous at the OS level. You cannot tell when, or whether, the bytes will arrive. So what you do is set up a callback listener on the return lane for the data and suspend the process that is reading. You then wake the process back up once all of the data has arrived.

There are, as with the examples I brought up earlier, a lot more concepts that allow for partial wakeup (so you see the loading bar crawl) and so on, but at its core this is the way it works. It is done this way to free up resources and still be as fast as possible. Using a spin lock here would be very inefficient, since the process would be active all the time, meaning the core (or thread) would be 100% occupied checking the current state and regularly interrupting other processes. By suspending the process, the processor core can do different work while waiting for the so-called interrupt of the I/O callback.

If you did not have such a mechanism, yet wanted to implement this in a way that does not suspend the process, thereby over-consuming the processor’s resources, think about what it would look like at the application level. Imagine pressing F5 all the time to get the current amount of transferred data, seeing the number of bytes that have been sent up to that point, and potentially not even recognising that it is done until the next day. Sure, it works, but how good would it be?

So, my personal (and likely naive) conclusion is:

Try to use business locks as much as possible, but implement them once. Have one approach and stick to it. Implement new ones only very carefully.
