By Ivan Morgillo, Sasa Sekulic and Fabrizio Chignoli

This article has been excerpted from Grokking ReactiveX.

After a life of imperative programming, switching to a reactive mindset could seem challenging:

“Why should I give up all the control and bend to this ‘data are in control’ thing?”

Let’s look at it in an everyday life scenario: the air conditioning system. Today is a hot day and we want to start the AC system when the temperature in the room reaches 77F. We need a thermometer.


Well, we need a Thermometer class that provides a temperature() method to obtain the current temperature in the room.

Once we know how to obtain the current temperature, we can combine it with our AirConditioning class.

This class provides three methods:

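class AirConditioning {
    void start() { [...] }          // turns the system on
    void stop() { [...] }           // turns the system off
    boolean isRunning() { [...] }   // reports whether the system is currently on
}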

To change the status of the air conditioning system, we can use start() and stop(). If we want to check if the status change was successful, isRunning() gives us info about the current status.

Now that we know the current temperature with temperature() and we can turn on the Air Conditioning system with start(), we can create an AirConManager class with a basic monitor() method that will contain the logic needed to retrieve the current temperature, evaluate a condition and execute the proper action:

void monitor() {
    // Keep asking until the room is hot enough and the AC is still off
    while (true) {
        if (thermometer.temperature() >= 77 && !ac.isRunning()) {
            ac.start();
            break;
        }
    }
}

The current implementation of the monitor() function is almost offensive and I will understand if your eyes start bleeding:

  • we have a while(true) loop to keep on evaluating the condition;
  • we have the if condition itself that retrieves the current temperature, checks whether it has reached 77 degrees and whether the Air Conditioning system is still off;
  • we have the action block that starts the Air Conditioning system and ends the loop.

So far, we don’t have any special requirements and we don’t care about details like stopping the system or restarting the loop if the temperature drops.

This function could be refactored and polished in a few ways, but the key point will still be:

Constantly checking the temperature and deciding what to do accordingly
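To watch the pull approach run from start to finish, here is a minimal, self-contained Java sketch. ScriptedThermometer, SimpleAirConditioning and PollingAirConManager are hypothetical stand-ins for the classes described above (the article never shows their bodies), the scripted readings are made up, and the short pause between polls is our own addition so the loop doesn’t spin the CPU.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for Thermometer: replays scripted readings so the
// example terminates instead of waiting on a real sensor.
class ScriptedThermometer {
    private final Iterator<Integer> readings;
    private int last = 0;
    int polls = 0; // how many times the caller had to ask

    ScriptedThermometer(List<Integer> readings) {
        this.readings = readings.iterator();
    }

    int temperature() {
        polls++;
        if (readings.hasNext()) {
            last = readings.next();
        }
        return last; // once the script runs out, keep reporting the last value
    }
}

// Hypothetical in-memory AirConditioning.
class SimpleAirConditioning {
    private boolean running = false;

    void start() { running = true; }
    void stop() { running = false; }
    boolean isRunning() { return running; }
}

class PollingAirConManager {
    private final ScriptedThermometer thermometer;
    private final SimpleAirConditioning ac;

    PollingAirConManager(ScriptedThermometer thermometer, SimpleAirConditioning ac) {
        this.thermometer = thermometer;
        this.ac = ac;
    }

    // Pull approach: ask, check, wait, ask again.
    // Note: this never returns if the scripted readings never reach 77.
    void monitor(long pollIntervalMillis) {
        while (true) {
            if (thermometer.temperature() >= 77 && !ac.isRunning()) {
                ac.start();
                break;
            }
            try {
                Thread.sleep(pollIntervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and give up
                return;
            }
        }
    }
}

class PollingDemo {
    public static void main(String[] args) {
        ScriptedThermometer thermometer =
            new ScriptedThermometer(Arrays.asList(72, 74, 76, 77, 79));
        SimpleAirConditioning ac = new SimpleAirConditioning();

        new PollingAirConManager(thermometer, ac).monitor(1);

        System.out.println("AC running: " + ac.isRunning()
            + " after " + thermometer.polls + " polls");
        // prints AC running: true after 4 polls
    }
}
```

The manager had to ask four times before anything interesting happened, and every one of those questions was its own responsibility.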


How could we migrate to a reactive scenario?

In an Rx-oriented scenario, our thermometer would be an observable thermometer, represented by an ObservableThermometer class.

The ObservableThermometer class will provide a single method: rxTemperature(). It returns an Observable, and if we subscribe to it we’ll receive the current temperature every time it changes:

Observable<Integer> rxTemperature() {
    [...]
}
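The body of rxTemperature() is elided above, and we won’t guess at it. To get a feel for what “receive the temperature every time it changes” means, though, here is a hand-rolled sketch in plain Java. It is not RxJava’s Observable: PushThermometer, sensorRead() and the sample readings are all assumptions made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hand-rolled stand-in, NOT RxJava's Observable: just enough to show "push".
class PushThermometer {
    private final List<Consumer<Integer>> subscribers = new ArrayList<>();
    private Integer lastReading = null; // null until the first reading arrives

    // Register a callback; comparable in spirit to subscribing to rxTemperature().
    void subscribe(Consumer<Integer> onNext) {
        subscribers.add(onNext);
    }

    // Called by a (hypothetical) sensor driver. Pushes the value to every
    // subscriber, but only when it differs from the previous reading.
    void sensorRead(int temperature) {
        if (lastReading != null && lastReading == temperature) {
            return; // nothing changed, nothing to push
        }
        lastReading = temperature;
        for (Consumer<Integer> subscriber : subscribers) {
            subscriber.accept(temperature);
        }
    }
}

class PushThermometerDemo {
    public static void main(String[] args) {
        PushThermometer thermometer = new PushThermometer();
        List<Integer> received = new ArrayList<>();
        thermometer.subscribe(received::add);

        // The sensor reports five readings, two of them repeats.
        for (int reading : new int[] {75, 75, 76, 76, 77}) {
            thermometer.sensorRead(reading);
        }
        System.out.println(received); // prints [75, 76, 77]
    }
}
```

The subscriber never asks for anything: it registers once and the thermometer calls it back, skipping the repeated readings.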

We can go back to our AirConManager and refactor it to handle our new reactive thermometer.

AirConManager can subscribe to ObservableThermometer in its monitor() method and receive each new temperature value:

void monitor() {
    thermometer.rxTemperature()
        .subscribe(temperature -> {
            // onNext: called with every new temperature value
            if (temperature >= 77 && !ac.isRunning()) {
                ac.start();
            }
        });
}

Note: Here .subscribe() receives only the onNext() callback, written as a lambda expression. Subscribers usually have three methods, but if you don’t need to be notified when the stream completes or you don’t want to manage possible error scenarios, you can skip implementing onCompleted() and onError().
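To make the three callbacks concrete, here is a small plain-Java sketch. TemperatureSubscriber and ReplayThermometer are hypothetical names invented for this example; real Rx libraries ship their own types, and this is not their API. The sketch only assumes the usual contract: any number of onNext() calls, followed by either onCompleted() or onError().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface mirroring the three callbacks from the note.
// Only onNext() is mandatory; the other two default to doing nothing.
interface TemperatureSubscriber {
    void onNext(int temperature);
    default void onError(Throwable error) { }
    default void onCompleted() { }
}

// Hypothetical source that replays fixed readings, then terminates.
class ReplayThermometer {
    private final int[] readings;
    private final RuntimeException failure; // null means "complete normally"

    ReplayThermometer(int[] readings, RuntimeException failure) {
        this.readings = readings;
        this.failure = failure;
    }

    void subscribe(TemperatureSubscriber subscriber) {
        for (int reading : readings) {
            subscriber.onNext(reading);
        }
        // Exactly one terminal event: an error or a completion, never both.
        if (failure != null) {
            subscriber.onError(failure);
        } else {
            subscriber.onCompleted();
        }
    }
}

class SubscriberDemo {
    // Subscribes with all three callbacks and records what happened.
    static List<String> run(int[] readings, RuntimeException failure) {
        List<String> log = new ArrayList<>();
        new ReplayThermometer(readings, failure).subscribe(new TemperatureSubscriber() {
            @Override public void onNext(int temperature) { log.add("next " + temperature); }
            @Override public void onError(Throwable error) { log.add("error " + error.getMessage()); }
            @Override public void onCompleted() { log.add("completed"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(new int[] {75, 77}, null));
        // prints [next 75, next 77, completed]
        System.out.println(run(new int[] {75}, new IllegalStateException("sensor fault")));
        // prints [next 75, error sensor fault]

        // Lambda form: only onNext() is supplied, the other two stay no-ops.
        List<String> lambdaLog = new ArrayList<>();
        new ReplayThermometer(new int[] {75, 77}, null)
            .subscribe(temperature -> lambdaLog.add("next " + temperature));
        System.out.println(lambdaLog); // prints [next 75, next 77]
    }
}
```

The last call shows the shortcut from the note: passing just a lambda still works, you simply never hear about completion or errors.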

The moment we subscribe, the data will start coming to us: the onNext() will receive a temperature value, we’ll evaluate it and act accordingly.

“Wait a second! All this fuss to save me a while loop?”

One of the key principles of reactive programming is that it is all about data being in charge, being able to create the perfect data stream you need and working with the stream that is available.

Let me introduce you to the .filter() operator!

Push vs Pull: create the flow you need

In our scenario, we don’t need a temperature value every time it changes. We just need to know when the temperature reaches 77F to start the Air Conditioning system. Rx is all about flowing data, coming to us to be used, filtered and transformed.

Let’s improve our monitor() method to create the sequence we need by filtering out all the temperature values we don’t need:

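void monitor() {
    thermometer.rxTemperature()
        .filter(temperature -> !ac.isRunning())      // ignore readings while the AC is already on
        .filter(temperature -> temperature >= 77)    // ignore readings below the threshold
        .subscribe(temperature -> ac.start());
}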

As you can see, filter() evaluates a Predicate and propagates the item only if the condition is satisfied.

We were supposed to receive a new temperature value in our onNext() every time it changed, but what we wanted instead was to get into the game only if the Air Conditioning system was off and the temperature in the room had reached 77F. We used filter() to drop all the scenarios we weren’t interested in and receive only the values that match our use case. We created the perfect sequence for the task at hand from an initial generic sequence that we can’t control.
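If you’re curious how a filter() step can sit between the source and the subscriber, here is a minimal plain-Java sketch. TemperatureStream is a hand-rolled stand-in, not RxJava’s Observable, and CountingAirConditioning and the sample readings are hypothetical. It only illustrates the idea: each filter() wraps the stream before it and forwards an item downstream only when the predicate holds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hand-rolled stand-in for an Rx stream, NOT RxJava's Observable.
class TemperatureStream {
    // A stream is "something that pushes items into whatever consumer it is given".
    private final Consumer<Consumer<Integer>> source;

    private TemperatureStream(Consumer<Consumer<Integer>> source) {
        this.source = source;
    }

    // Builds a stream that pushes the given readings in order.
    static TemperatureStream of(int... readings) {
        return new TemperatureStream(downstream -> {
            for (int reading : readings) {
                downstream.accept(reading);
            }
        });
    }

    // Returns a new stream that forwards only the items satisfying the predicate.
    TemperatureStream filter(Predicate<Integer> predicate) {
        return new TemperatureStream(downstream ->
            source.accept(item -> {
                if (predicate.test(item)) {
                    downstream.accept(item);
                }
            }));
    }

    // Nothing flows until someone subscribes.
    void subscribe(Consumer<Integer> onNext) {
        source.accept(onNext);
    }
}

// Hypothetical AC that counts how often it was started.
class CountingAirConditioning {
    private boolean running = false;
    int startCalls = 0;

    void start() { running = true; startCalls++; }
    boolean isRunning() { return running; }
}

class FilterDemo {
    public static void main(String[] args) {
        CountingAirConditioning ac = new CountingAirConditioning();
        List<Integer> triggers = new ArrayList<>();

        TemperatureStream.of(74, 76, 77, 78, 80)
            .filter(temperature -> !ac.isRunning())
            .filter(temperature -> temperature >= 77)
            .subscribe(temperature -> {
                triggers.add(temperature);
                ac.start();
            });

        System.out.println(triggers + " start calls: " + ac.startCalls);
        // prints [77] start calls: 1
    }
}
```

Five readings go in, but only 77 reaches the subscriber: 74 and 76 are below the threshold, and 78 and 80 arrive when the AC is already running.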

This is the power of the Push approach: data coming to us. We can focus on using the data instead of retrieving and managing them.

If you’re interested in learning more, download the free first chapter of Grokking ReactiveX and see this Slideshare presentation for more details and a discount code.
