Kotlin Flow for Android: Getting Started

In this tutorial, you’ll learn about the basics of Kotlin Flow, and you’ll build an Android app that fetches weather forecast data using Flow. By Dean Djermanović.

4.8 (35) · 2 Reviews

Download materials
Save for later
Share
You are currently viewing page 3 of 4 of this article. Click here to view the first page.

Fetch Data

You'll start by implementing the logic to fetch the forecast data. Open HomeActivity.kt. In onCreate(), add a call to fetchLocationDetails(), right below initUi():

homeViewModel.fetchLocationDetails(851128)

fetchLocationDetails() accepts a cityId as an argument. For now, you'll pass the hardcoded ID. You'll add a search feature later that will allow you to search for a specific location.

Build and run the project. You still won't see anything on the screen:

Sunzoid Empty Screen

But this time the app has fetched the forecast data and saved it to the Room database! :]

Room and Flow

In Room 2.1, the library added coroutine support for one-off operations. Room 2.2 added Flow support for observable queries. This enables you to get notified any time you add or remove entries in the database.

In the current implementation, only the user can trigger data fetching. But you can easily implement logic that schedules and updates the database every three hours, for example. By doing this, you make sure your UI is up to date with the latest data. You'll use Kotlin Flow to get notified of every change in the table.

Plugging Into the Database
Open ForecastDao.kt and add a call to getForecasts(). This method returns Flow<List<DbForecast>>:

@Query("SELECT * FROM forecasts_table")
fun getForecasts(): Flow<List<DbForecast>>

getForecasts() returns forecast data for a specific city from forecasts_table. Whenever data in this table changes, the query executes again and Flow emits fresh data.

Next, open WeatherRepository.kt and add a function called getForecasts:

fun getForecasts(): Flow<List<Forecast>>

Next, add the implementation to WeatherRepositoryImpl.kt:

override fun getForecasts() =
    forecastDao
      .getForecasts()
      .map { dbMapper.mapDbForecastsToDomain(it) }

This method uses the forecastDao to get data from the database. The database returns the database model. It's a good practice for every layer in the app to work with its own model. Using map(), you convert the database model to the Forecast domain model.

Open HomeViewModel.kt and add forecasts, like so:

//1
val forecasts: LiveData<List<ForecastViewState>> = weatherRepository
    //2
    .getForecasts()
    //3
    .map {
      homeViewStateMapper.mapForecastsToViewState(it)
    }
    //4
    .asLiveData()

There are a few things going on here:

  1. First, you declare forecasts of the LiveData<List<ForecastViewState>> type. The Activity will observe changes in forecasts. forecasts could have been of the Flow<List<ForecastViewState>> type, but LiveData is preferred when implementing communication between View and ViewModel. This is because LiveData has internal lifecycle handling!
  2. Next, reference weatherRepository to get the Flow of forecast data.
  3. Then call map(). map() converts the domain models to the ForecastViewState model, which is ready for rendering.
  4. Finally, convert a Flow to LiveData, using asLiveData(). This function is from the AndroidX KTX library for Lifecycle and LiveData.

Context Preservation and Backpressure

The collection of a Flow always happens in the context of the parent coroutine. This property of Flow is called context preservation. But you can still change the context when emitting items. To change the context of emissions you can use flowOn().

You could have a scenario in which the Flow produces events faster than the collector can consume them. In reactive streams, this is called backpressure. Kotlin Flow supports backpressure out of the box since it's based on coroutines. When the consumer is in a suspended state or is busy doing some work, the producer will recognize that. It will not produce any items during this time.

Observing Values

Finally, open HomeActivity.kt and observe forecasts from initObservers():

homeViewModel.forecasts.observe(this, Observer {
  forecastAdapter.setData(it)
})

Whenever forecasts change in the database, you'll receive new data in the Observer, and display it on the UI.

Build and run the app. Now the home screen displays forecast data! :]

Sunzoid With Forecast Data

Congratulations! You've implemented communication between multiple layers of your app using Flow and LiveData!

Cancellation

In HomeViewModel.kt, you're observing the forecasts. You've noticed that you never stop observing. How long is this observed, then?

In this case, the Flow collection starts when LiveData becomes active. Then, if LiveData becomes inactive before the Flow completes, the flow collection is canceled.

The cancellation occurs after a timed delay unless LiveData becomes active again before that timeout. The default delay triggering cancellation is 5000 milliseconds. You can customize the timeout value if necessary. The timeout exists to handle cases like Android configuration changes.

If LiveData becomes active again after cancellation, the Flow collection restarts.

Exceptions

Flow streams can complete with an exception if an emitter or code inside the operators throws an exception. catch() blocks handle exceptions within Flows. You can do this imperatively or declaratively. A try-catch block on the collector's side is an example of an imperative approach.

It's imperative because these catch any exceptions that occur in the emitter or in any of the operators.

You can use catch() to handle errors declaratively instead. Declarative here means you declare the function to handle errors. And you declare it within the Flow itself, and not a try-catch block.

Open HomeViewModel.kt and navigate to forecasts. Add catch() right before map(). To simulate errors in the stream, throw an exception from map():

val forecasts: LiveData<List<ForecastViewState>> = weatherRepository
    .getForecasts()
    .catch {
      // Log Error
    }
    .map {
      homeViewStateMapper.mapForecastsToViewState(it)
      throw Exception()
    }
    .asLiveData()

Build and run the app. You'll notice that the app crashes! catch() catches only upstream exceptions. That is, it catches exceptions from all the operators above the catch. catch() doesn't catch any exception that occurs after the operator.

Now move catch() below map():

val forecasts: LiveData<List<ForecastViewState>> = weatherRepository
    .getForecasts()
    .map {
      homeViewStateMapper.mapForecastsToViewState(it)
      throw Exception()
    }
    .catch {
      // Log Error
    }
    .asLiveData()

Build and run the app again. Now you'll see an empty screen:

Sunzoid Thrown Exception

This is an example of exception transparency, where you're able to separate the handling of exceptions that occur in the Flow from the collection of values. You're also being transparent about exceptions, as you don't hide any errors, you explicitly handle them in an operator!

Before proceeding, remove the line that throws an exception from map().

Searching Locations

So far, your app displayed a forecast for a hardcoded location. Now you'll implement the search functionality! This will allow users to search for a specific location using coroutines and Flow. As the user types in the search box, the app will perform a search for every letter typed and will update the search result.

In HomeActivity.kt, you already have a listener attached to the search view. When the user changes query text, the app sends the new value to queryChannel in HomeViewModel.kt. HomeViewModel.kt uses a BroadcastChannel as a bridge to pass the text from the view to the ViewModel. offer() passes the text and synchronously adds the specified element to the channel.