Reactive Streams on Kotlin: SharedFlow and StateFlow
In this tutorial, you’ll learn about reactive streams in Kotlin and build an app using two types of streams: SharedFlow and StateFlow. By Ricardo Costeira.
Event streams have become standard on Android. For years, RxJava has been the standard for reactive streams. Now, Kotlin provides its own reactive streams implementation, called Flow. Like RxJava, Kotlin Flow can create, and react to, streams of data. Also like RxJava, the event streams can come from cold or hot publishers. The difference between the two is simple: Although cold streams emit events only if there are any subscribers, hot streams can emit new events even without having any subscribers reacting to them.
In this tutorial, you’ll learn about Flow’s hot stream implementations, called SharedFlow and StateFlow. More specifically, you’ll learn:
- What a SharedFlow is.
- What a StateFlow is and how it relates to SharedFlow.
- How these hot stream flows compare to RxJava, Channels and LiveData.
- How you can use them on Android.
You might ask yourself: “Why use Kotlin’s SharedFlow and StateFlow over RxJava though?” Although RxJava gets the job done well, some like to describe it as “using a bazooka to kill an ant”. In other words, although the framework works, it’s fairly easy to get carried away with all its capabilities. Doing so can lead to overly complex solutions and code that’s hard to understand. Kotlin Flow provides more direct and specific implementations for reactive streams.
To follow along, you need to be familiar with at least the basics of Kotlin coroutines and Flow. For coroutines, you can check out our Kotlin Coroutines Tutorial for Android: Getting Started and Kotlin Coroutines Tutorial for Android: Advanced tutorials. For Flow, you can look at our Kotlin Flow for Android: Getting Started tutorial.
Getting Started
Download the project materials by clicking the Download Materials button at the top or bottom of this tutorial and open the starter project.
You’ll work on an app called CryptoStonks5000. This app has two screens: The first screen shows the user a few cryptocurrencies, while the second shows the price progression for a cryptocurrency in the past 24 hours.
To learn about shared flows and state flows, you’ll:
- Implement an event stream with SharedFlow that emits events shared between screens.
- Refactor CryptoStonks5000 to use StateFlow to handle view state.
The project follows a Clean Architecture approach and MVVM pattern.
Build and run the project just to make sure everything is working. After that, it’s time to learn about shared flows!
SharedFlow
Before getting into the code, you need to at least be aware of what a SharedFlow is.
A shared flow is, at its core, a Flow. But it has two main differences from the standard Flow implementation. It:
- Emits events even if you don’t call collect() on it. After all, it is a hot stream implementation.
- Can have multiple subscribers.
Notice the term “subscribers” used here instead of “collectors” like you would see with a regular Flow. This change in naming is because shared flows never complete. In other words, when you call Flow.collect() on a shared flow, you’re not collecting all its events. Instead, you’re subscribing to the events that get emitted while that subscription exists.
Although this also means that calls to Flow.collect() on shared flows don’t complete normally, the subscription can still be canceled. As you might expect, this cancellation happens by canceling the coroutine.
Note: Truncating operators such as Flow.take(count: Int) can force collection of a shared flow to complete.
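To see that last point in action, here is a minimal, self-contained sketch that isn’t part of the CryptoStonks5000 project. The function name firstTwoEvents is made up for illustration, and the sketch assumes a plain JVM setup with a recent kotlinx.coroutines on the classpath:

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: subscribes to a shared flow but keeps only the first two events.
fun firstTwoEvents(): List<Int> = runBlocking {
    val events = MutableSharedFlow<Int>() // default parameters: no replay, no extra buffer

    // Without take(2), toList() would never return, because a shared flow never completes.
    val subscriber = async { events.take(2).toList() }

    // Wait until the subscription is active; anything emitted earlier would be lost.
    events.subscriptionCount.first { it > 0 }

    for (i in 1..5) events.emit(i)
    subscriber.await()
}
```

Calling firstTwoEvents() should give you a list with the values 1 and 2. The remaining three emissions find no subscriber and are simply dropped.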
With that out of the way, it’s time to code.
Handling Shared Events
You’ll implement a fake price notification system to mimic coin value variations. It has to be a fake one because the real thing’s just too volatile. :]
Users should be aware of these variations no matter which screen they’re in. To make that possible, you’ll create a shared flow in a ViewModel shared by all screens.
In the presentation package, find and open CoinsSharedViewModel.kt.
To start, you need to know how to create a shared flow. Well, it’s your lucky day, because you’re about to create two in a row! Add this code at the top of the class:
private val _sharedViewEffects = MutableSharedFlow<SharedViewEffects>() // 1
val sharedViewEffects = _sharedViewEffects.asSharedFlow() // 2
In this code:
1. You call MutableSharedFlow. This creates a mutable shared flow that emits events of type SharedViewEffects, which is a simple sealed class to model the possible events. Note that this is a private property. You’ll use this one internally to emit events while exposing an immutable shared flow to make them visible externally.
2. You create the public immutable shared flow mentioned above by calling asSharedFlow() on the mutable shared flow. This way, the immutable exposed property always reflects the value of the mutable private one.
Having these two properties is a good practice. Not only does it give you the freedom to emit whatever you want internally through _sharedViewEffects, but it also makes it so external code can only react to those emissions by subscribing to sharedViewEffects. As such, the subscribing code has no power to change the shared flow, which is a neat way of forcing a robust design and separation of concerns and avoiding mutability bugs.
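If you want to convince yourself that the exposed flow really is read-only, the following sketch may help. It’s a stand-alone illustration, not code from the project: DemoEffect, DemoEffectSource, upcastOnly and canCastBackToMutable are all hypothetical names, and it assumes kotlinx.coroutines is available.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow

// Hypothetical event type standing in for the app's SharedViewEffects.
sealed class DemoEffect {
    data class PriceVariation(val variation: Int) : DemoEffect()
}

class DemoEffectSource {
    private val _effects = MutableSharedFlow<DemoEffect>()

    // Read-only wrapper around the private mutable flow.
    val effects: SharedFlow<DemoEffect> = _effects.asSharedFlow()

    // Same object as _effects, merely typed as SharedFlow. Shown only for contrast.
    val upcastOnly: SharedFlow<DemoEffect> get() = _effects
}

// Reports whether each exposed property could be cast back to a mutable flow.
fun canCastBackToMutable(): Pair<Boolean, Boolean> {
    val source = DemoEffectSource()
    return Pair(
        source.effects is MutableSharedFlow<*>,
        source.upcastOnly is MutableSharedFlow<*>
    )
}
```

The first value of the pair should be false and the second true: the wrapper returned by asSharedFlow() can’t be cast back to a MutableSharedFlow, while a plain upcast can.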
Event Emission With SharedFlow
OK, you have your flows. Now, you need to emit something with them: price variations. CoinsSharedViewModel calls getPriceVariations() in its init block, but the method doesn’t do anything yet.
Add this code to getPriceVariations():
viewModelScope.launch { // 1
  for (i in 1..100) { // 2
    delay(5000) // 3
    _sharedViewEffects.emit(SharedViewEffects.PriceVariation(i)) // 4
  }
}
This code does a few different things. It:
1. Launches a coroutine.
2. Runs a for loop from one to 100 inclusive.
3. Delays the coroutine for five seconds. delay() checks for cancellation, so it’ll stop the loop if the job gets canceled.
4. Calls emit on the mutable shared flow, passing it an instance of PriceVariation, which is an event from SharedViewEffects.
That emit(value: T) is one of the two event emission methods you can call on a shared flow. The alternative is to use tryEmit(value: T).
The difference between the two is that emit is a suspending function, while tryEmit isn’t. This small difference results in a huge behavioral contrast between the two methods. To explain this, though, you need to dive deep into shared flow’s replay cache and buffering. Buckle up!
Replay and Buffering
MutableSharedFlow() accepts three parameters:
public fun <T> MutableSharedFlow(
  replay: Int = 0, // 1
  extraBufferCapacity: Int = 0, // 2
  onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND // 3
): MutableSharedFlow<T>
Here’s what they’re used for:
1. replay: The number of values replayed to new subscribers. It can’t be negative and it defaults to zero.
2. extraBufferCapacity: The number of values buffered. It can’t be negative and it defaults to zero. The sum of this value plus replay comprises the total buffer of the shared flow.
3. onBufferOverflow: Action to take when buffer overflow is reached. It can have three values: BufferOverflow.SUSPEND, BufferOverflow.DROP_OLDEST or BufferOverflow.DROP_LATEST. It defaults to BufferOverflow.SUSPEND.
Default Behavior
This can get quite tricky to understand, so here’s a short animation of a possible interaction with a shared flow built with the default values. Assume the shared flow uses emit(value: T).
Going step by step:
- This shared flow has three events and two subscribers. The first event is emitted when there are no subscribers yet, so it gets lost forever.
- By the time the shared flow emits the second event, it already has one subscriber, which gets said event.
- Before reaching the third event, another subscriber appears, but the first one gets suspended and remains like that until reaching the event. This means emit() won’t be able to deliver the third event to that subscriber. When this happens, the shared flow has two options: It either buffers the event and emits it to the suspended subscriber when it resumes or it reaches buffer overflow if there’s not enough buffer left for the event.
- In this case, there’s a total buffer of zero (replay + extraBufferCapacity). In other words, buffer overflow. Because onBufferOverflow is set with BufferOverflow.SUSPEND, the flow will suspend until it can deliver the event to all subscribers.
- When the subscriber resumes, so does the stream, delivering the event to all subscribers and carrying on its work.
Note: A shared flow only supports onBufferOverflow = BufferOverflow.SUSPEND when the total buffer value amounts to zero. Because tryEmit(value: T) doesn’t suspend, it won’t work if you use it with the default replay and extraBufferCapacity values: Whenever there’s at least one subscriber, it returns false without emitting anything. In other words, the only way to reliably emit events with tryEmit(value: T) is by having, at least, a total buffer of one.
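The difference is easy to check outside the app. The sketch below is only an illustration under a few assumptions: tryEmitResults is a hypothetical helper, the flows carry plain Int values, and everything runs on the single thread of runBlocking with kotlinx.coroutines on the classpath.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.runBlocking

// Hypothetical helper: calls tryEmit on a flow without a buffer and on one with a buffer of one.
fun tryEmitResults(): Pair<Boolean, Boolean> = runBlocking {
    val unbuffered = MutableSharedFlow<Int>()                      // total buffer of zero
    val buffered = MutableSharedFlow<Int>(extraBufferCapacity = 1) // total buffer of one

    // Give each flow one active subscriber and wait until both subscriptions exist.
    val subscriptions = listOf(unbuffered.launchIn(this), buffered.launchIn(this))
    unbuffered.subscriptionCount.first { it > 0 }
    buffered.subscriptionCount.first { it > 0 }

    val results = Pair(unbuffered.tryEmit(1), buffered.tryEmit(1))

    // Shared flows never complete, so the subscriptions have to be canceled explicitly.
    subscriptions.forEach { it.cancel() }
    results
}
```

With a subscriber present, the first tryEmit should return false, because there’s no buffer to put the value in, while the second should return true.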
With Replay
OK, that wasn’t so bad. What happens if there’s a buffer, though? Here’s an example with replay = 1:
Breaking it down:
- When the shared flow reaches the first event without any active subscribers, it doesn’t suspend anymore. With replay = 1, there’s now a total buffer size of one. As such, the flow buffers the first event and keeps going.
- When it reaches the second event, there’s no more room in the buffer, so it suspends.
- The flow remains suspended until the subscriber resumes. As soon as it does, it gets the buffered first event, along with the latest second event. The shared flow resumes, and the first event disappears forever because the second one now takes its place in the replay cache.
- Before reaching the third event, a new subscriber appears. Due to replay, it also gets a copy of the latest event.
- When the flow finally reaches the third event, both subscribers get a copy of it.
- The shared flow buffers this third event while discarding the previous one. Later, when a third subscriber shows up, it also gets a copy of the third event.
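The replay part of this behavior can be reproduced with a much simpler scenario than the one in the animation: one late subscriber and no suspended ones. This is a rough sketch with a made-up helper name, lateSubscriberSees, assuming kotlinx.coroutines is available:

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: shows what a subscriber that arrives late gets with replay = 1.
fun lateSubscriberSees(): List<Int> = runBlocking {
    val events = MutableSharedFlow<Int>(replay = 1)

    events.emit(1) // no subscribers yet: 1 is kept in the replay cache
    events.emit(2) // 2 replaces 1 in the replay cache

    // The late subscriber first receives the replayed value, then live events.
    val subscriber = async { events.take(2).toList() }
    events.subscriptionCount.first { it > 0 }

    events.emit(3)
    subscriber.await()
}
```

The returned list should contain 2 and 3. The first event is gone for good, exactly as in the breakdown above.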
With extraBufferCapacity and onBufferOverflow
The process is similar with extraBufferCapacity, but without the replay-like behavior. This third example shows a shared flow with both extraBufferCapacity = 1 and onBufferOverflow = BufferOverflow.DROP_OLDEST:
In this example:
- The behavior is the same at first: With a suspended subscriber and a total buffer size of one, the shared flow buffers the first event.
- The different behavior starts on the second event emission. With onBufferOverflow = BufferOverflow.DROP_OLDEST, the shared flow drops the first event, buffers the second one and carries on. Also, notice how the second subscriber does not get a copy of the buffered event: Remember, this shared flow has extraBufferCapacity = 1, but replay = 0.
- The flow eventually reaches the third event, which the active subscriber receives. The flow then buffers this event, dropping the previous one.
- Shortly after, the suspended subscriber resumes, triggering the shared flow to emit the buffered event to it and cleaning up the buffer.
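Here’s a small sketch of the dropping behavior, again independent of the project. The helper name slowSubscriberSees is hypothetical, and the example relies on runBlocking being single-threaded so that the subscriber can’t run between the two tryEmit calls:

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: a subscriber that is too slow to see the first event.
fun slowSubscriberSees(): List<Int> = runBlocking {
    val events = MutableSharedFlow<Int>(
        extraBufferCapacity = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )

    val subscriber = async { events.take(1).toList() }
    events.subscriptionCount.first { it > 0 }

    // tryEmit doesn't suspend, so the subscriber gets no chance to run in between.
    events.tryEmit(1) // fills the single buffer slot
    events.tryEmit(2) // overflow: 1 is dropped and 2 takes its place

    subscriber.await()
}
```

When the subscriber finally runs, the only event left for it should be 2.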
Subscribing to Event Emissions
OK, good job getting this far! You now know how to create a shared flow and customize its behavior. There’s only one thing left to do, which is to subscribe to a shared flow.
In the code, go to the coinhistory package inside presentation and open CoinHistoryFragment.kt. At the top of the class, declare and initialize the shared ViewModel:
private val sharedViewModel: CoinsSharedViewModel by activityViewModels { CoinsSharedViewModelFactory }
You want the shared flow to emit no matter which screen you’re in, so you can’t bind this ViewModel to this specific Fragment. Instead, you want it bound to the Activity so it survives when you go from one Fragment to another. That’s why the code uses the by activityViewModels delegate. As for CoinsSharedViewModelFactory, don’t worry about it: Every ViewModel factory in the app is already prepared to properly inject any dependencies.
Collecting the SharedFlow
Now that you have the shared ViewModel, you can use it. Locate subscribeToSharedViewEffects(). Subscribe to the shared flow here by adding the following code:
viewLifecycleOwner.lifecycleScope.launchWhenStarted { // 1
  sharedViewModel.sharedViewEffects.collect { // 2
    when (it) {
      // 3
      is SharedViewEffects.PriceVariation -> notifyOfPriceVariation(it.variation)
    }
  }
}
This code has a few important details:
1. The coroutine is scoped to the View instead of the Fragment. This ensures the coroutine is alive only while the View is alive, even if the Fragment outlives it. The code creates the coroutine with launchWhenStarted, instead of the more common launch. This way, the coroutine launches only when the lifecycle is at least in the STARTED state, suspends whenever the lifecycle drops below STARTED (that is, once the View is stopped) and gets canceled when the scope is destroyed. Using launch here can lead to potential crashes, as the coroutine will keep processing events even in the background.
2. As you can see, subscribing to a shared flow is the same as subscribing to a regular flow. The code calls collect() on the SharedFlow to subscribe to new events.
3. The subscriber reacts to the shared flow event.
Keep in mind at all times that even using launchWhenStarted, the shared flow will keep emitting events without subscribers. As such, you always need to consider the wasted resources. In this case, the event emission code is fairly harmless. But things can get heavy, especially if you turn cold flows into hot ones using something like shareIn.
Applying the Stream Data to the View
Back in the code, you can see notifyOfPriceVariation() doesn’t exist yet. Add it as well:
private fun notifyOfPriceVariation(variation: Int) {
  val message = getString(R.string.price_variation_message, variation)
  showSnackbar(message)
}
Easy-peasy. Build and run the app. Now, when you go to the coin history screen, you’ll see some periodical Snackbar messages at the bottom. The shared flow will only start emitting when you go to that screen, though. Even if the CoinsSharedViewModel instance is bound to the Activity, it’s only created when you first visit the coin history screen.
You want all screens to be aware of price changes, so this isn’t ideal. To fix it, make the exact same changes in CoinListFragment:
- Create the CoinsSharedViewModel instance in the same way.
- Add the code to subscribeToSharedViewEffects().
- Create notifyOfPriceVariation().
Build and run the app. You’ll now see the periodical Snackbar messages in CoinListFragment as well. As you switch screens, you’ll see that the messages always show the next event and not the previous ones. MutableSharedFlow() in CoinsSharedViewModel is using the default parameters. But feel free to play around with it to see how it affects the shared flow!
SharedFlow and Channels
Like shared flows, channels represent hot streams. But this doesn’t mean shared flow will replace the channels API — not entirely, at least. :]
SharedFlow is designed to completely replace BroadcastChannel. Not only is SharedFlow simpler and faster to use, but it’s a lot more versatile than BroadcastChannel. Keep in mind, though, that other elements from the channels API can and should still be used when it makes sense to do so.
StateFlow
A state flow is structured like a shared flow. This is because StateFlow is nothing more than a specialization of SharedFlow. In fact, you can create a shared flow that behaves exactly like a state flow:
val shared = MutableSharedFlow<State>( // State is a placeholder for your own state type
  replay = 1,
  onBufferOverflow = BufferOverflow.DROP_OLDEST
)
shared.tryEmit(InitialState()) // emit the initial value
val state = shared.distinctUntilChanged() // get StateFlow-like behavior
The code above creates a shared flow that emits the latest value only to any new subscribers. Due to that distinctUntilChanged at the bottom, it’ll only emit any value if it’s different from the previous one. This is exactly what a state flow does, which makes it great for holding and handling state.
Handling App State
There are simpler ways of creating state flows though, which you’ll use now. Expand the coinlist package and, inside, open CoinListFragmentViewModel.kt. This simple ViewModel uses LiveData to expose a view state class to CoinListFragment. The state class itself is also fairly simple, and it has default values to match the initial view state:
data class CoinListFragmentViewState(
  val loading: Boolean = true,
  val coins: List<UiCoin> = emptyList()
)
The Fragment then uses the current state to update the view by observing the LiveData:
// Code in CoinListFragment.kt
private fun observeViewStateUpdates(adapter: CoinAdapter) {
  viewModel.viewState.observe(viewLifecycleOwner) { updateUi(it, adapter) }
}
Start the refactoring by changing MutableLiveData to a MutableStateFlow. So in CoinListFragmentViewModel, go from:
private val _viewState = MutableLiveData(CoinListFragmentViewState())
To:
private val _viewState = MutableStateFlow(CoinListFragmentViewState())
Make sure to include the necessary import for MutableStateFlow. This is how you create a mutable state flow. Unlike shared flows, state flows require an initial value or, in other words, an initial state. But because state flow is a specific implementation of shared flow, there’s no way for you to customize things like replay or extraBufferCapacity. Regardless, the generic rules and constraints for shared flows still apply.
Next, update the immutable LiveData accordingly, from:
val viewState: LiveData<CoinListFragmentViewState> get() = _viewState
To:
val viewState: StateFlow<CoinListFragmentViewState> get() = _viewState
Of course, you could also do:
val viewState = _viewState.asStateFlow()
Add the import for StateFlow. Be it a shared flow or a state flow, you can create an immutable one with both options. The advantage of using asStateFlow() or asSharedFlow() is that you get the extra safety of explicitly creating an immutable version of the flow. This avoids things like creating another mutable version by mistake.
Event Emission With StateFlow
A difference worth noting between shared and state flows is event emission. You can still use emit and tryEmit with state flow, but … don’t. :]
Instead, you should do:
mutableState.value = newState
The reason is that updates to value are always conflated, which means that even if you update it faster than subscribers can consume it, they’ll get the most recent value only. One thing to keep in mind is that a state flow compares each new value with the current one using equals() and ignores it if the two are equal. So whatever you assign to value has to be a different, non-equal object from whatever was there before. For instance, take this code:
data class State(
  var name: String = "",
  var age: Int = -1
)
val mutableState = MutableStateFlow<State>(State())
// ...
// newState and mutableState.value will reference the same object
val newState = mutableState.value
// Reference is the same, so this is also changing mutableState.value!
newState.name = "Marc"
mutableState.value = newState
In this case, the state flow won’t emit the new value. Because the referenced object is the same, the equality comparison will return true, so the flow will assume it’s the same state.
To make this work, you need to use immutable objects. For example:
data class State(
  val name: String = "",
  val age: Int = -1
)
val mutableState = MutableStateFlow<State>(State())
// ...
mutableState.value = State(name = "Marc")
This way, the state flow will properly emit a state update. Immutability saves the day once again. :]
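To watch the equality check at work, you can try a sketch like the one below outside the app. Profile and observedNames are hypothetical names used only here, and the calls to yield() are just a simple way to let the subscriber run on the single thread of runBlocking:

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical immutable state class, mirroring the State class above.
data class Profile(val name: String = "", val age: Int = -1)

// Hypothetical helper: records the name of every state the subscriber receives.
fun observedNames(): List<String> = runBlocking {
    val state = MutableStateFlow(Profile())
    val seen = mutableListOf<String>()

    val subscription = launch { state.collect { seen += it.name } }
    yield() // let the subscriber receive the initial state

    state.value = Profile(name = "Marc") // differs from the initial state: emitted
    yield()
    state.value = Profile(name = "Marc") // a new object, but equal to the current one: ignored
    yield()
    state.value = state.value.copy(name = "Ana") // differs again: emitted
    yield()

    subscription.cancel()
    seen
}
```

The subscriber should see three states: the initial empty name, then Marc, then Ana. The second assignment of an equal Profile produces no emission.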
Back at the code, the cool thing about replacing LiveData with StateFlow is that both of them use a property called value, so nothing changes there.
There’s one last change to make in CoinListFragmentViewModel, inside the requestCoinList() method. You can now update that if condition at the beginning to:
if (viewState.value.coins.isNotEmpty()) return
You don’t need the ? anymore, because value won’t be null. Also, you invert the condition by using isNotEmpty() instead of isNullOrEmpty() and by dropping ! at the beginning. This makes the code a little more readable.
If you try to build the app, you get an error on CoinListFragment stating that there’s an unresolved reference to observe. StateFlow doesn’t have an observe method, so you need to refactor that as well.
Subscribing to State Updates
Open CoinListFragment.kt. Find observeViewStateUpdates() and update it to:
private fun observeViewStateUpdates(adapter: CoinAdapter) {
  viewLifecycleOwner.lifecycleScope.launchWhenStarted {
    viewModel.viewState.collect { updateUi(it, adapter) }
  }
}
This code is much like what you did with SharedFlow in the sense that the same logic applies. Despite this, you might worry about the state flow emitting values when the app is in the background. But you don’t need to. It’s true that, because it’s scoped to viewModelScope, it’ll still emit even without any subscribers as long as the ViewModel exists. Regardless, state flow emissions are lightweight operations: It’s just updating the value and notifying all subscribers. Plus, you probably do want the app to show you the latest UI state when it comes to the foreground.
Build and run the app. Everything should work as before because you just refactored the code. Good job on using StateFlow!
StateFlow and Channels
Just as SharedFlow can replace BroadcastChannel completely, StateFlow can replace ConflatedBroadcastChannel completely. There are a couple of reasons for this. StateFlow is simpler and more efficient than ConflatedBroadcastChannel. It also has better distinction between mutability and immutability with MutableStateFlow and StateFlow.
Hot Flows, RxJava and LiveData
You’re now aware of how both SharedFlow and StateFlow work. But are they even useful on Android?
Although they might not bring anything “new”, they bring more direct and efficient alternatives to the table. For instance, wherever you’d use RxJava’s PublishSubject, you can use a SharedFlow. Or wherever you’d use a BehaviorSubject, you can probably use a StateFlow. In fact, if hot event emission is not an issue, StateFlow can even easily replace LiveData.
Note: You can also convert SharedFlow and StateFlow objects into LiveData with the AndroidX lifecycle-livedata-ktx library. The library provides an extension method asLiveData() that allows you to convert the flow and expose it as LiveData for consumption in your view. For more details, see the StateFlow, Flow, and LiveData section of the Android Developers StateFlow and SharedFlow article.
So, putting it in simpler terms:
- If you have some kind of state management, you can use StateFlow.
- Whenever you have some event stream going on, where it’s not a problem if events aren’t handled by all possible subscribers or past events might not be handled at all, you can use SharedFlow.
Challenge: Using SharedFlow To Handle Screen Events
Congratulations! This concludes the tutorial. If you want to go the extra mile, you can also handle the screen-specific events modeled in the CoinListFragmentViewEffects and CoinHistoryFragmentViewEffects classes using shared flows. These are events that should be handled exactly once, which means a simple Channel would fit better. Remember that shared flows drop events when there are no subscribers. Still, you can do it with shared flows for practice. If you’re curious, the final project in the project materials has a sample implementation.
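To make the difference between the two concrete, here’s a rough comparison that isn’t taken from the final project. The helper lateCollectorResults and the "navigate" event are invented for illustration, and the 100 millisecond timeout is an arbitrary choice to stop waiting on the shared flow:

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical helper: sends one event before anyone listens, then tries to read it back.
fun lateCollectorResults(): Pair<String?, String?> = runBlocking {
    val sharedEffects = MutableSharedFlow<String>()        // default parameters
    val channelEffects = Channel<String>(Channel.BUFFERED) // buffered channel

    // Both events are sent while nobody is collecting.
    sharedEffects.emit("navigate")  // no subscribers: the event is dropped
    channelEffects.send("navigate") // the channel keeps the event in its buffer

    // A late collector shows up. Give each source a short time to deliver something.
    val fromShared = withTimeoutOrNull(100) { sharedEffects.first() }
    val fromChannel = withTimeoutOrNull(100) { channelEffects.receiveAsFlow().first() }
    Pair(fromShared, fromChannel)
}
```

The shared flow side should come back as null, because the event was lost before the collector arrived, while the channel side should still deliver "navigate".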
Where to Go From Here?
If you want to learn more about StateFlow and SharedFlow, you can check their corresponding documentation pages.
You can also find some information specific to shared and state flows usage on Android on the Android Developers page.
If you’re curious about turning cold flows into hot ones, you can check out the following articles:
- A safer way to collect flows from Android UIs
- Things to know about Flow’s shareIn and stateIn operators
Finally, if the challenge section piqued your interest in using Channels to handle screen events, you can check out this interesting article on Medium.
I hope you enjoyed this tutorial. If you have any questions, tips or comments, feel free to join the discussion below. :]