Fueled Reactive apps with Asynchronous Flow — Part 3 — Data layer Implementation

Raul Hernandez Lopez
Published in ProAndroidDev
8 min read, Jun 13, 2020

Implementation details cover

(This article was featured at Android #418 Weekly)

The most exciting moment for a software engineer is surely when they enjoy coding and get stuff done. However, for the pragmatic developer, implementation details should be as important as the process of getting things done, because the difference is in the details, isn’t it?

Throughout Part 3, I will elaborate on the implementation details of the Data layer. Firstly, I will start with “Data Sources Design: Network & DB”. Then I will go deep inside “Repository Design Step 1: RxJava approach”, what I call the naive approach. Finally, I will cover “Repository Design Step 2: suspend approach”, a more natural way of writing Coroutines.

But first things first: to get started we need to add a couple of dependencies to make Kotlin Coroutines and Flow happy:

Gradle dependencies needed to run Coroutines

Coroutines have been stable since Kotlin 1.3, so any version from 1.3 onwards works.
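As a rough sketch, the dependencies block could look like this in Gradle Kotlin DSL form. The version number is an assumption for illustration (a 1.3.x release from around the time of this article), and the original project may well have used the Groovy syntax instead:

```kotlin
// build.gradle.kts (module level). Versions are illustrative assumptions.
dependencies {
    // Core coroutines library, which also contains Flow
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.7")
    // Adds Dispatchers.Main backed by the Android main looper
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.7")
}
```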

For the first step of the migration, my approach was to start from the deepest layer, concretely the Data Layer, where the data comes from the Internet and is transformed into Database (DB) models to be used where needed.

Data Sources Design: Network & DB

This transformation starts from an old code base mostly based on RxJava + Java towards a more modern Kotlin + Coroutines equivalent.

Network + Database (DB) data sources transformation to Kotlin & suspend functions returning Observables for backwards compatibility with Repository

For further details about this and other decisions on the Migration strategy, please read Part 1.

Network data source

The NetworkDataSource implementation will be the first candidate to migrate, but let’s see what we have inside:

NetworkDataSourceImpl constructor signature

For me a DataSource should be a @Singleton. In this way, we avoid initialising heavy artefacts many times and can use them on demand (I assumed we will make network requests more than once). From the set of dependencies injected we can see:

  • a TwitterApi to make API requests, built upon Retrofit with an OkHttp client inside.
  • a ConnectionHandler to check connection availability.
  • a RequestIOHandler to transform network responses into their network data source model.
  • and finally a TaskThreading.

Let’s stop here to analyse a little bit more what is within TaskThreading:

TaskThreadingImpl for TaskThreading, aka the container of Schedulers

My personal preference is using composition over inheritance. This usually makes the code more scalable, extendable and testable but, again, this is my preference. By having final instances created from the static methods and exposed through getters, we can easily mock each corresponding Scheduler through its public getter in unit tests. Of course, there are multiple ways to do this; for example, we could inject those Scheduler instances directly through the constructor.

Now it’s the turn for the main method to start processing a request with the query: search(). Let’s have a look at the legacy but declarative code I had at the time using RxJava with Java code:

NetworkDataSourceImpl search method returns a Single<List<TweetApiModel>>

This method retrieves a list of Tweets from the Twitter API, using subscribeOn() to run the request on the IO thread pool and passing a valid token in combination with the query the user is looking for. It then uses observeOn() to move to the computation thread pool and perform any transformation operations until we obtain those tweets. Finally, we wrap them into a Single (because we don’t expect more interactions with the network). To achieve this, flatMapSingle() transforms the Maybe into a Single.
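The original snippet was written in Java. For consistency with the rest of the migration, here is a minimal sketch of the same chain restated in Kotlin with RxJava 2. The SearchResponse model, the shape of TwitterApi.search and the class name are assumptions for illustration, not the project’s real code:

```kotlin
import io.reactivex.Maybe
import io.reactivex.Single
import io.reactivex.schedulers.Schedulers

// Hypothetical models, for illustration only
data class TweetApiModel(val id: Long, val text: String)
data class SearchResponse(val statuses: List<TweetApiModel>)

// Assumed shape of the Retrofit service: at most one response per request
interface TwitterApi {
    fun search(token: String, query: String): Maybe<SearchResponse>
}

class RxNetworkDataSource(private val twitterApi: TwitterApi) {

    fun search(token: String, query: String): Single<List<TweetApiModel>> =
        twitterApi.search(token, query)
            .subscribeOn(Schedulers.io())          // run the request on the IO pool
            .observeOn(Schedulers.computation())   // run the mapping below on the computation pool
            // RxJava 2: Maybe.flatMapSingle returns a Single and
            // signals NoSuchElementException if the Maybe is empty
            .flatMapSingle<List<TweetApiModel>> { response ->
                Single.just(response.statuses)
            }
}

fun main() {
    val fakeApi = object : TwitterApi {
        override fun search(token: String, query: String): Maybe<SearchResponse> =
            Maybe.just(SearchResponse(listOf(TweetApiModel(1, "hello $query"))))
    }
    println(RxNetworkDataSource(fakeApi).search("token", "kotlin").blockingGet())
}
```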

Thinking about a Coroutines version of the same method, ideally, we would just have to write the following imperative code:

NetworkDataSourceImpl with a Coroutines style

The code looks simpler: we return either a successful or a failed response wrapped in an Either. Either (from the Arrow Core library) holds the expected result on the right or the error on the left. Notice that, to keep this snippet simple, it doesn’t show further examples of error handling (for the Twitter API) in combination with Either.
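To make the idea concrete, here is a minimal sketch of a suspend-based search. To avoid depending on a specific Arrow version, it uses a tiny local Either stand-in rather than Arrow’s real type. The isConnected lambda stands in for the ConnectionHandler, and all signatures are assumptions:

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.runBlocking

// Minimal local stand-in for Arrow's Either, so the sketch has no Arrow dependency
sealed class Either<out L, out R> {
    data class Left<out L>(val value: L) : Either<L, Nothing>()
    data class Right<out R>(val value: R) : Either<Nothing, R>()
}

data class TweetApiModel(val id: Long, val text: String)

// Assumed shape: Retrofit 2.6+ accepts suspend functions in service interfaces
interface TwitterApi {
    suspend fun search(token: String, query: String): List<TweetApiModel>
}

class SuspendNetworkDataSource(
    private val twitterApi: TwitterApi,
    private val isConnected: () -> Boolean // stands in for ConnectionHandler
) {
    suspend fun search(token: String, query: String): Either<Throwable, List<TweetApiModel>> {
        if (!isConnected()) {
            return Either.Left(IllegalStateException("No connection available"))
        }
        return try {
            Either.Right(twitterApi.search(token, query))
        } catch (e: CancellationException) {
            throw e // never swallow coroutine cancellation
        } catch (e: Exception) {
            Either.Left(e)
        }
    }
}

fun main(): Unit = runBlocking {
    val fakeApi = object : TwitterApi {
        override suspend fun search(token: String, query: String): List<TweetApiModel> =
            listOf(TweetApiModel(1, "hello $query"))
    }
    println(SuspendNetworkDataSource(fakeApi) { true }.search("token", "kotlin"))
}
```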

For learning purposes, starting to play with Flow, the first naive approach that came to my mind is this:

NetworkDataSourceImpl returning an Observable (naive approach)

search() uses the flow {} builder, whose block runs in the coroutine context of whoever collects the Flow. This context can be switched by means of flowOn(), here to the one the IO Dispatcher defines.
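A minimal sketch of that builder and context switch, using only kotlinx-coroutines-core. The fetchTweetsBlocking function is a hypothetical stand-in for the real network call:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking

data class TweetApiModel(val id: Long, val text: String)

// Hypothetical blocking call standing in for the real network request
fun fetchTweetsBlocking(query: String): List<TweetApiModel> =
    listOf(TweetApiModel(1, "result for $query"))

fun search(query: String): Flow<List<TweetApiModel>> =
    flow {
        // Runs on Dispatchers.IO because of the flowOn() below
        emit(fetchTweetsBlocking(query))
    }.flowOn(Dispatchers.IO) // affects only what is above it; the collector keeps its own context

fun main(): Unit = runBlocking {
    // Nothing runs until a terminal operator such as first() collects the flow
    println(search("kotlin").first())
}
```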

Now TaskThreadingImpl is extended with a combination of Dispatchers plus the previously added Schedulers, ioDispatcher() is defined within:

TaskThreadingImpl composed by Dispatchers and Schedulers
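One way such a container might look is sketched below. Only ioDispatcher() is named in this article; the other method names are assumptions. An Android implementation would presumably also expose a UI Scheduler and Dispatchers.Main, left out here so the sketch runs on a plain JVM:

```kotlin
import io.reactivex.Scheduler
import io.reactivex.schedulers.Schedulers
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers

interface TaskThreading {
    // Legacy RxJava side
    fun io(): Scheduler
    fun computation(): Scheduler

    // Coroutines side
    fun ioDispatcher(): CoroutineDispatcher
    fun computationDispatcher(): CoroutineDispatcher // hypothetical name
}

class TaskThreadingImpl : TaskThreading {
    // Created once and exposed through getters, so tests can mock TaskThreading
    private val ioScheduler: Scheduler = Schedulers.io()
    private val computationScheduler: Scheduler = Schedulers.computation()
    private val ioCoroutineDispatcher: CoroutineDispatcher = Dispatchers.IO
    private val defaultCoroutineDispatcher: CoroutineDispatcher = Dispatchers.Default

    override fun io(): Scheduler = ioScheduler
    override fun computation(): Scheduler = computationScheduler
    override fun ioDispatcher(): CoroutineDispatcher = ioCoroutineDispatcher
    override fun computationDispatcher(): CoroutineDispatcher = defaultCoroutineDispatcher
}
```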

Moreover, asObservable() converts the given Flow to a cold Observable. This is where the magic of kotlinx-coroutines-rx2 comes in.

This library is extremely helpful and convenient for converting in both directions:

RxJava ← → Coroutines

It is definitely a must-have for migrating layer by layer without breaking changes.
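As a small illustration of both directions, the sketch below converts a Flow into an Observable and an Observable back into a Flow with the asObservable() and asFlow() extensions from kotlinx-coroutines-rx2. The function names flowToRx and rxToFlow are made up for the example:

```kotlin
import io.reactivex.Observable
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.rx2.asFlow
import kotlinx.coroutines.rx2.asObservable

// Coroutines -> RxJava: each subscriber triggers a new collection of the flow
@ExperimentalCoroutinesApi
fun flowToRx(): Observable<Int> = flowOf(1, 2, 3).asObservable()

// RxJava -> Coroutines: each collector triggers a new subscription
@ExperimentalCoroutinesApi
fun rxToFlow(): Flow<Int> = Observable.just(1, 2, 3).asFlow()

@ExperimentalCoroutinesApi
fun main(): Unit = runBlocking {
    println(flowToRx().toList().blockingGet())
    println(rxToFlow().toList())
}
```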

We would simply add this to make it work:

Gradle kotlinx-coroutines-rx2 dependency
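In Gradle Kotlin DSL form this could be a single line. The version shown is an illustrative assumption and should match the other kotlinx-coroutines artifacts in the project:

```kotlin
// build.gradle.kts (module level). Version is an illustrative assumption.
dependencies {
    // Bridges between RxJava 2 types and coroutines/Flow (asObservable, asFlow, rxSingle...)
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-rx2:1.3.7")
}
```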

However, I know the naive search() snippet above isn’t the best. There is an important drawback:

  • it returns an Observable. This is not ideal, since we only need a Single: a network request produces one result and the stream closes immediately after it.

We’ll get back to this point later to introduce a proper solution.

DB data source

Now looking at the DB data source, Room offers a convenient interface: the tweets for a selected set of tweetIds can be returned from a DB query in different shapes, such as an Observable, a Single, the result of a suspend function or a Flow.

Tweet table or TweetDAO (DB Data source) returning an Observable

Returning an Observable makes more sense for a DB than for a network request:

  • in situations like a network Poll, refreshing our DB in the background makes sure the data rendered in the UI is refreshed too. Any new changes added to the DB will reach the UI in due time by means of the open stream.

Next, I followed my instincts and I started what I call “the naive approach”, by using RxJava first.

Repository Design Step 1: RxJava approach

Repository transformation to Kotlin & Flow returning Observables for backwards compatibility with Use Case

Let’s have a look at the TweetsRepositoryImpl anatomy:

TweetsRepositoryImpl constructor signature

The repository, for me, must be a @Singleton too. It contains very heavy objects that will surely be reused a number of times. It is composed of the NetworkDataSource interface, the different DB DAO interfaces (each one is equivalent to a table in the DB), the mappers that transform data source data into the data expected by the business logic layer and, last but not least, the TaskThreading to use the right Dispatcher when necessary.

A reminder to our future selves: this is what the NetworkDataSourceImpl should return, a Single, in case we still need to return an RxJava object for backwards compatibility:

NetworkDataSourceImpl returning a Single

This is achieved by using the kotlinx-coroutines-rx2 library again.
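One way to get there, sketched under assumptions, is the rxSingle builder from kotlinx-coroutines-rx2, which runs a suspend block and exposes its result as a Single. The TwitterApi signature and the class name below are hypothetical:

```kotlin
import io.reactivex.Single
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.rx2.rxSingle

data class TweetApiModel(val id: Long, val text: String)

// Assumed suspend-based service, for illustration only
interface TwitterApi {
    suspend fun search(token: String, query: String): List<TweetApiModel>
}

class SingleNetworkDataSource(private val twitterApi: TwitterApi) {

    // The suspend call runs on Dispatchers.IO when the Single is subscribed to;
    // a thrown exception is delivered through onError
    fun search(token: String, query: String): Single<List<TweetApiModel>> =
        rxSingle(Dispatchers.IO) { twitterApi.search(token, query) }
}

fun main() {
    val fakeApi = object : TwitterApi {
        override suspend fun search(token: String, query: String): List<TweetApiModel> =
            listOf(TweetApiModel(1, "hello $query"))
    }
    println(SingleNetworkDataSource(fakeApi).search("token", "kotlin").blockingGet())
}
```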

However, I wanted to experiment with a few more things in the RxJava approach, so we are not finished yet!

TweetsRepositoryImpl with RxJava approach returning an Observable

Here I am getting the Observable from NetworkDataSource::search and mapping from an Either to an entity my DB DAOs support: tweetsToAdd. I will only receive one iteration of the doOnNext, and the workflow is as follows:

  • Make sure to delete (the previously saved in DB) tweets belonging to the same query to avoid duplications
  • insert new tweets (TweetDao)
  • insert query (QueryDao)
  • insert association across Tweets and Query (TweetQueryJoinDao)

The first flatMap retrieves all tweetIds for the query coming from the doOnNext. The second flatMap uses them to retrieve all the Tweets for those tweet ids, returning an Observable from that point.

Let’s start migrating RxJava to its Kotlin Flow equivalent:

TweetsRepositoryImpl with RxJava approach but using Kotlin Flow and returning an Observable

Looking at the code above first, we need to include a few annotations: FlowPreview, ExperimentalCoroutinesApi and InternalCoroutinesApi. Note that at the time I prepared this presentation, certain Flow operators were still experimental and these annotations were needed.

Kotlin Flow looks very similar: we simply change flatMap() to flatMapConcat() and use onEach() instead of doOnNext(). Internally, retrieveAllTweetsForTweetsIdsFlow now returns a Flow, because the flatMapConcat lambda (return@flatMapConcat) is expected to return a Flow. Finally, at the end of this block we still need to return an Observable to the UseCase by means of asObservable().

It is important to keep in mind that flowOn() should only be used to switch contexts when necessary. For instance, if we declare our preferred Dispatcher in the upper layer (the Repo), we don’t need to change it again in the layers below (data sources), because the coroutine context would be the same.
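The operator mapping can be sketched as follows. The FakeTweetStore class is a hypothetical in-memory stand-in for the three DAOs, and the network source is passed in as a lambda, so none of this is the project’s real API:

```kotlin
import io.reactivex.Observable
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.rx2.asObservable

data class Tweet(val id: Long, val text: String)

// In-memory stand-in for the DAOs; all names here are hypothetical
class FakeTweetStore {
    private val byQuery = mutableMapOf<String, List<Tweet>>()

    fun replaceTweetsFor(query: String, tweets: List<Tweet>) {
        byQuery[query] = tweets // delete old rows + insert new ones, collapsed into one step
    }

    fun retrieveTweetIdsFlow(query: String): Flow<List<Long>> =
        flowOf(byQuery[query].orEmpty().map { it.id })

    fun retrieveAllTweetsForTweetsIdsFlow(query: String, ids: List<Long>): Flow<List<Tweet>> =
        flowOf(byQuery[query].orEmpty().filter { it.id in ids })
}

@FlowPreview
@ExperimentalCoroutinesApi
fun getSearchTweets(
    query: String,
    network: (String) -> Flow<List<Tweet>>,
    store: FakeTweetStore
): Observable<List<Tweet>> =
    network(query)
        .onEach { tweets -> store.replaceTweetsFor(query, tweets) }   // was doOnNext()
        .flatMapConcat { store.retrieveTweetIdsFlow(query) }          // was the first flatMap()
        .flatMapConcat { ids ->                                       // was the second flatMap()
            store.retrieveAllTweetsForTweetsIdsFlow(query, ids)
        }
        .asObservable()                                               // bridge back to RxJava

@FlowPreview
@ExperimentalCoroutinesApi
fun main() {
    val network = { _: String -> flowOf(listOf(Tweet(1, "a"), Tweet(2, "b"))) }
    println(getSearchTweets("kotlin", network, FakeTweetStore()).blockingFirst())
}
```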

Repository Design Step 2: suspend approach

To have a truly idiomatic and correct Coroutines approach, we need to use suspend functions as much as possible. This means getting rid of our RxJava mindset in order to write Coroutines code that is easier to read, using streams only when they are really needed.

Repository transformation to Kotlin & Flow with suspend functions returning Observables for backwards compatibility with Use Case

The first step here is to transform everything we can into suspend functions.

Once the TweetDao returns a List by means of a suspend function we can continue.

TweetsRepositoryImpl returns a suspend function with a single List<Tweet>

Within our suspend function we can write with an imperative style.
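A minimal sketch of that imperative style is shown below. The NetworkDataSource and TweetDao interfaces here are simplified, hypothetical versions with an in-memory DAO, and the query and join tables are left out:

```kotlin
import kotlinx.coroutines.runBlocking

data class Tweet(val id: Long, val text: String)

// Hypothetical suspend-based collaborators
interface NetworkDataSource {
    suspend fun search(query: String): List<Tweet>
}

interface TweetDao {
    suspend fun deleteTweetsFor(query: String)
    suspend fun insertTweets(query: String, tweets: List<Tweet>)
    suspend fun retrieveAllTweetsFor(query: String): List<Tweet>
}

class InMemoryTweetDao : TweetDao {
    private val rows = mutableMapOf<String, List<Tweet>>()
    override suspend fun deleteTweetsFor(query: String) { rows.remove(query) }
    override suspend fun insertTweets(query: String, tweets: List<Tweet>) { rows[query] = tweets }
    override suspend fun retrieveAllTweetsFor(query: String): List<Tweet> = rows[query].orEmpty()
}

class TweetsRepositorySketch(
    private val network: NetworkDataSource,
    private val tweetDao: TweetDao
) {
    // Reads top to bottom, one step after another, with no operators involved
    suspend fun getSearchTweets(query: String): List<Tweet> {
        val fresh = network.search(query)            // 1. fetch from the network
        tweetDao.deleteTweetsFor(query)              // 2. drop stale rows for this query
        tweetDao.insertTweets(query, fresh)          // 3. persist the new rows
        return tweetDao.retrieveAllTweetsFor(query)  // 4. read back from the DB
    }
}

fun main(): Unit = runBlocking {
    val network = object : NetworkDataSource {
        override suspend fun search(query: String): List<Tweet> = listOf(Tweet(1, "fresh $query"))
    }
    println(TweetsRepositorySketch(network, InMemoryTweetDao()).getSearchTweets("kotlin"))
}
```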

But I still need an open stream for my repo. Thus, I need to use an Observable here or, better said, a Flow transformed into an Observable:

Ideal case returning DB and network updates through the DB

TweetsRepositoryImpl returns an Observable of Tweets after invoking asObservable(). This will transform the result from the flow builder block. That block calls emit() with the previous DB data and later emit() again with the fresh data coming from the network, once the DB was updated by calling retrieveAllTweetsForTweetsIds method from the TweetDao.
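The two emissions can be sketched like this. InMemoryTweets is a hypothetical stand-in for the DAOs and fetchFromNetwork for the network data source:

```kotlin
import io.reactivex.Observable
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.rx2.asObservable

data class Tweet(val id: Long, val text: String)

// Hypothetical in-memory stand-in for the DB data source
class InMemoryTweets(initial: List<Tweet>) {
    private var rows: List<Tweet> = initial
    suspend fun retrieveAll(): List<Tweet> = rows
    suspend fun replaceAll(tweets: List<Tweet>) { rows = tweets }
}

@ExperimentalCoroutinesApi
fun getSearchTweets(
    query: String,
    db: InMemoryTweets,
    fetchFromNetwork: suspend (String) -> List<Tweet>
): Observable<List<Tweet>> =
    flow {
        emit(db.retrieveAll())                  // 1. what the DB already holds
        db.replaceAll(fetchFromNetwork(query))  // 2. refresh the DB from the network
        emit(db.retrieveAll())                  // 3. the fresh data, read back from the DB
    }.asObservable()

@ExperimentalCoroutinesApi
fun main() {
    val db = InMemoryTweets(listOf(Tweet(1, "cached")))
    getSearchTweets("kotlin", db) { listOf(Tweet(2, "fresh")) }
        .blockingForEach { println(it) }
}
```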

Other possible usages:

  • operator emitAll: emits all values from another stream, for example to support the previously mentioned Poll:
fun getSearchTweets(query: String): Flow<List<Tweet>> = flow {
    // ...
    emitAll(
        tweetsDBDataSource.retrieveAllTweetsForTweetsIdsFlow(tweetIds)
    )
}

In that case, we would replace emit() with emitAll(), and we would need to return a Flow (an open stream of values) at the end.

Usages that are not possible:

  • we cannot return the result of a method that returns a Flow from a suspend function that declares a List:
suspend fun getSearchTweets(query: String): List<Tweet> {
    // ...
    // Does not compile: a Flow<List<Tweet>> is not a List<Tweet>
    return tweetsDBDataSource
        .retrieveAllTweetsForTweetsIdsFlow(tweetIds)
}
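If a single snapshot is all we need, one possible way around this mismatch is to collect just one value with a terminal operator such as first(). This is a minimal sketch with a hypothetical Flow-returning function, not necessarily what the original project does:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking

data class Tweet(val id: Long, val text: String)

// Hypothetical Flow-returning DB call
fun retrieveAllTweetsForTweetsIdsFlow(tweetIds: List<Long>): Flow<List<Tweet>> =
    flowOf(tweetIds.map { Tweet(it, "tweet $it") })

// first() suspends until the first emission arrives and then stops collecting;
// it throws NoSuchElementException if the flow completes without emitting
suspend fun getSearchTweetsOnce(tweetIds: List<Long>): List<Tweet> =
    retrieveAllTweetsForTweetsIdsFlow(tweetIds).first()

fun main(): Unit = runBlocking {
    println(getSearchTweetsOnce(listOf(1L, 2L)))
}
```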

A suspend function is expected to return a plain object or any of its variants, as we can read in Part 2.

Up to this point we have learned how to move from a naive RxJava approach to a more idiomatic Coroutines approach, using the right tools for the right job in Kotlin, with both suspend functions and Kotlin Flow.

This is all for Part 3, so far we have reviewed the deepest layer of the Implementation sub-section:

  • Data layer Implementation

If you liked this article, clap and share it, please!

Cheers!

Raul Hernandez Lopez

GitHub | Twitter | Gists

I want to give special thanks to Jorge Castillo for reviewing this article and making it more readable. He knows a ton about functional programming, especially about the Arrow library (contributor and maintainer), follow him!

(Update) The newest article covers “Synchronous communication with the UI using StateFlow” (aka how to get rid of the Callbacks).

To follow up, other Implementation sub-sections will be:

  • “Use Case layer Implementation”. How-to.
  • “View Delegate Implementation”. How-to.

And finally:

  • “Lessons learned & Next steps”. The end chapter with some reflections and personal opinions as well as closing comparative notes.


Senior Staff Software Engineer. Continuous learner, sometimes runner, some time speaker & open minded. Opinions my own.