A list of useful RxJava things!

Over the past year I've learned a bunch of useful tricks for RxJava, and I'm gonna post them here as I remember them or use them. Hopefully this'll grow to a large list/cheatsheet!


Here is a way to take the first element multiple flowables that may emit the same data (but loaded from disk or memory or network) and then emit the first one that returns. Concat checks flowable1 first, and then flowable2. I got this from this blog post from Dan Lew.
Flowable
  .concat(flowable1, flowable2)
  .firstElement()
Dan uses it for loading data first from memory, then from disk, and finally from network, so there's a lot of other stuff you can add to it too. He demonstrates how you can use a side effect to create a flowable that saves to network requests to disk and memory whenever you load it from remote:
val cache: Flowable<Data> = ...
val network: Flowable<Data> = ...
val networkWithCaching: Flowable<Data> = 
  network
    .doOnNext { data ->
      addToCache(data)
    }

Flowable
  .concat(cache, networkWithCaching)
  .firstElement()
  .subscribe { data ->
    ...
  }
He also suggests applying a filter to your concatted flowable if you are worried about stale data:
Flowable
  .concat(flowable1, flowable2)
  .filter { data ->
    data.isValid()
  }
  .firstElement()

Another useful pattern I saw in my work recently was in our networking code detailing our retry strategy. We wanted to retry network requests with an exponential delay, but switch over to a constant delay when the exponential component is too long. I think this is also known as a truncated exponential backoff. To implement this, we used a retryWhen and Flowable.amb which subscribes to all listed flowables, but only emits values from the first one.
flowable
  .retryWhen { errors ->
    errors
      .zipWith(Flowable.range(1, maxRetryCount), ::Pair)
      .delay { (error, retryCount) ->
        Flowable.amb(
          listOf(
           Flowable.timer(constantDelay, TimeUnit.MILLISECONDS),
           Flowable.timer(exponentialDelay(retryCount), TimeUnit.MILLISECONDS),
          )
        )
      }
  }
This seems to be a really useful pattern that you can use for all sorts of things, for example: fetching data in parallel from various sources, and then taking whichever source returns first.
Recently, I was tasked with writing a feature that disables a button for 20 seconds after a certain action (with a countdown timer). This could be useful for any action that we don't want a user triggering way too often, but also want to alert the user as to how often they can trigger this action. Here is the code:
val processor = PublishProcessor.create<Unit>()

val MAX_COUNTDOWN_TIME = 20L

processor
  .switchMap {
    Flowable.interval(1L, TimeUnit.SECONDS)
      .map { MAX_COUNTDOWN_TIME - it }
      .takeWhile { it >= 0 }
  }
Then, when Flowable reaches 0, we should enable the button, but when it is not 0, then we should disable the button and display the countdown.
In this section, I'm going to use it to document what I learn about publish, refCount, replay, and autoConnect. Stay tuned!

Comments

Popular posts from this blog

First-Principles Derivation of A Bank

A Play-by-play of the Mirai botnet source code - Part 1 (scanner.c)

You can control individual packets using WinDivert!