Unit Testing MutableSharedFlow

jeffreysoboe
Jun 11, 2021

Unit testing a MutableSharedFlow, as opposed to a regular Flow or a MutableStateFlow, comes with some additional complexity.

The crux of the problem is that MutableSharedFlow does not retain a value, and .collect() (which is a suspend function) does not ever complete/cancel on its own.
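
To make the "does not retain a value" point concrete, here is a minimal sketch (not part of the original example) that compares the two flow types. The function names are hypothetical, and it assumes a MutableSharedFlow created with default arguments, as in the rest of this article.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.runBlocking

// A StateFlow always holds its latest value, even with no collector.
fun stateFlowValue(): String {
    val state = MutableStateFlow("name 0")
    state.value = "name 1"
    return state.value
}

// A default MutableSharedFlow (replay = 0) keeps nothing for late subscribers.
fun sharedFlowReplayCache(): List<String> = runBlocking {
    val shared = MutableSharedFlow<String>()
    shared.emit("name 0") // no subscribers: returns at once, the value is dropped
    shared.replayCache
}

fun main() {
    println(stateFlowValue())        // name 1
    println(sharedFlowReplayCache()) // []
}
```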

For example, let’s say we have a simple class with a MutableSharedFlow.

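import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow

class NameEmitter {

    private var iteration = 0
    private val _names = MutableSharedFlow<String>()
    val names: SharedFlow<String> = _names

    suspend fun emitName() {
        // Emit "name 0" first, then "name 1", and so on.
        _names.emit("name $iteration")
        iteration++
    }
}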

In this class, we want to test that each call to emitName() emits the next value from names, in this sequence: "name 0", "name 1", "name 2", and so on.

First Attempt:

@Test
fun testNameEmitter() {
    val nameEmitter = NameEmitter()
    val emittedNames = mutableListOf<String>()

    runBlocking {
        nameEmitter.names.collect { emittedNames.add(it) } // suspends forever
        nameEmitter.emitName() // never reached
    }

    emittedNames shouldEqual listOf("name 0")
}

In this example, we create a runBlocking scope so that we can call the suspend functions .collect() and .emitName(). We call .collect() before .emitName(), because if we emit before anything is observing the flow, the value is simply lost.

However, if we run this test, the test will run forever and never complete, because .collect() never completes. It is continuing to listen for more values, so the test is suspended forever.

If we instead use runBlockingTest, the test completes with a specific error:

java.lang.IllegalStateException: This job has not completed yet

This is much more useful to us, allowing the unit test to fail quickly so other tests may also be run.

So how do we ensure that .collect() completes?

Can we use a limited terminal operator?

Some articles recommend using .take(1) or first() on a Flow in order to get a certain number of values and then complete the flow, which would ensure the runBlocking scope completes, allowing the test to complete.

This would work for a MutableStateFlow where there is always at least one value, but in the case of MutableSharedFlow, there are no retained values. Both .take() and .first() eventually call .collect() under the hood, and so we end up having the same problem:

If we emit a value and then call .first(), .first() never completes because the value was emitted before there was an observer of the flow. If we call .first() before emitting a value, then the .collect() within .first() never completes because the flow has not yet emitted its first value.
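
Both orderings can be checked with a small sketch. It is not from the original test; it wraps .first() in withTimeoutOrNull (my addition, used only so the demonstration ends instead of hanging), and the function names and the 200 ms timeout are arbitrary choices.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Emit first, then ask for the first value: the emission is already gone.
fun firstAfterEmit(): String? = runBlocking {
    val names = MutableSharedFlow<String>()
    names.emit("name 0") // no subscriber yet, so the value is dropped
    withTimeoutOrNull(200L) { names.first() } // gives up after 200 ms
}

// Ask for the first value, then emit: the emit line is never reached.
fun firstBeforeEmit(): String? = runBlocking {
    val names = MutableSharedFlow<String>()
    withTimeoutOrNull(200L) {
        val first = names.first() // suspends here waiting for a value
        names.emit("name 0")      // not reached before the timeout
        first
    }
}

fun main() {
    println(firstAfterEmit())  // null
    println(firstBeforeEmit()) // null
}
```

Without the timeout, either function would suspend indefinitely, which is the hanging test described above.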

We end up with the same problem: a test that never completes. Instead…

Manually cancel the containing .collect() coroutine.

Therefore, we need to call .collect(), then emit a value, and then manually cancel the .collect() to ensure the test completes. We can use launch to create a new coroutine with the same context, get a handle on its Job, and then cancel it manually after emitting a value.

@Test
fun testNameEmitter() {
    val nameEmitter = NameEmitter()
    val emittedNames = mutableListOf<String>()

    runBlocking {
        val collectJob = launch {
            nameEmitter.names.collect { emittedNames.add(it) }
        }
        nameEmitter.emitName()

        collectJob.cancel()
    }

    emittedNames shouldEqual listOf("name 0") // fails - actual = []
}

The test fails with an empty list! Why???

One interesting thing about coroutines is that launch does not run its block immediately, even on the main thread. The block is only scheduled to run soon, once the current coroutine suspends or finishes. Here, emitName() returns without suspending because the flow has no subscriber yet, so we call .cancel() before our .collect() is ever executed. The flow is never collected and we are left with an empty list.

See https://medium.com/@trionkidnapper/launching-a-kotlin-coroutine-for-immediate-execution-on-the-main-thread-8555e701163b for more on why launch is not synchronous, even when on the main thread.
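
The scheduling behaviour can be seen in isolation with a short sketch. It is independent of NameEmitter, and the function names are hypothetical; it only assumes plain runBlocking and launch with their default arguments.

```kotlin
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Records the order in which the two lines actually run.
fun launchOrder(): List<String> = runBlocking {
    val order = mutableListOf<String>()
    val job = launch { order.add("inside launch") } // only scheduled here
    order.add("after launch")                       // runs first
    job.join()                                      // suspends, so the launched block can run
    order
}

// Cancelling before the launched block has started means it never runs.
fun ranBeforeCancel(): Boolean = runBlocking {
    var ran = false
    val job = launch { ran = true }
    job.cancel()
    job.join()
    ran
}

fun main() {
    println(launchOrder())     // [after launch, inside launch]
    println(ranBeforeCancel()) // false
}
```

The second function mirrors the failing test: the collector is cancelled before it gets a chance to subscribe.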

runBlockingTest to the rescue!

Lucky for us, there is a quick fix! If you look back at our test, you will notice we used a runBlocking scope. Instead, we can use runBlockingTest to ensure that launch executes immediately. Notice in the runBlockingTest documentation:

This is similar to [runBlocking] but it will immediately progress past delays and into [launch] and [async] blocks.

Perfect! Now we have another reason to use runBlockingTest instead of runBlocking.

Working Solution:

@Test
fun testNameEmitter() {
    val nameEmitter = NameEmitter()
    val emittedNames = mutableListOf<String>()

    runBlockingTest {
        // The collector starts right away, so it is subscribed before we emit.
        val collectJob = launch {
            nameEmitter.names.collect { emittedNames.add(it) }
        }
        nameEmitter.emitName()

        collectJob.cancel()
    }

    emittedNames shouldEqual listOf("name 0")
}

Success! A proper way to test our MutableSharedFlow.
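
Later kotlinx-coroutines-test releases deprecated runBlockingTest, so here is a hedged sketch of the same idea using only kotlinx-coroutines-core. It is an alternative I am adding, not the approach used above: it assumes that starting the collector with CoroutineStart.UNDISPATCHED is an acceptable stand-in for the eager launch that runBlockingTest provides. The collectEagerly function is hypothetical, and NameEmitter is repeated so the snippet is self-contained.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

class NameEmitter {
    private var iteration = 0
    private val _names = MutableSharedFlow<String>()
    val names: SharedFlow<String> = _names

    suspend fun emitName() {
        _names.emit("name $iteration")
        iteration++
    }
}

fun collectEagerly(): List<String> {
    val nameEmitter = NameEmitter()
    val emittedNames = mutableListOf<String>()

    runBlocking {
        // UNDISPATCHED runs the block immediately up to its first suspension,
        // so collect() has subscribed before emitName() is called.
        val collectJob = launch(start = CoroutineStart.UNDISPATCHED) {
            nameEmitter.names.collect { emittedNames.add(it) }
        }
        nameEmitter.emitName()

        collectJob.cancel() // collect() never completes on its own
    }
    return emittedNames
}

fun main() {
    println(collectEagerly()) // [name 0]
}
```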

Important Notes:

Keep in mind while testing that the coroutine inside each launch {} stays suspended once .collect() is called. That means calling your emitter within the same launch {} after .collect() will cause the emitter to never fire, since .collect() is still suspended.

@Test
fun testNameEmitter() {
    val nameEmitter = NameEmitter()
    val emittedNames = mutableListOf<String>()

    runBlockingTest {
        val collectJob = launch {
            nameEmitter.names.collect { emittedNames.add(it) } // suspends
            nameEmitter.emitName() // never reaches this line
        }

        collectJob.cancel()
    }

    emittedNames shouldEqual listOf("name 0") // fails - actual = []
}

Testing multiple MutableSharedFlow:

This also means that if we have multiple flows collecting within the same test, each one needs its own job. We cannot call .collect() twice within the same launch {} block, because the second .collect() will never be executed.

@Test
fun testTwoNameEmitters() {
    val firstNameEmitter = NameEmitter()
    val secondNameEmitter = NameEmitter()
    val firstEmittedNames = mutableListOf<String>()
    val secondEmittedNames = mutableListOf<String>()
    val jobs = mutableListOf<Job>()

    runBlockingTest {
        // Each collector gets its own coroutine, because collect() never returns.
        jobs += launch {
            firstNameEmitter.names.collect { firstEmittedNames.add(it) }
        }
        jobs += launch {
            secondNameEmitter.names.collect { secondEmittedNames.add(it) }
        }
        firstNameEmitter.emitName()
        secondNameEmitter.emitName()
        jobs.forEach { it.cancel() }
    }

    firstEmittedNames shouldEqual listOf("name 0")
    secondEmittedNames shouldEqual listOf("name 0")
}

Jeff Padgett is an Android Developer at Accenture, Digital Products.