Recently I was adding a feature to the RxQ library that required an RxJS operator I hadn’t used before: groupBy. In the library, API responses arrive in a single stream. With the new feature, instead of receiving complete values in my response stream, I receive instructions for updating previously received values. These instructions describe how a previously seen value has changed so that I can update it locally, rather than getting a completely new value each time.
In order to make the response stream useful, I needed to apply these changes to the right values over time. This required having some fun with the groupBy operator. Let’s illustrate the concept with a simpler example.
A Game of Chance
Imagine you have three friends named Sally, Joe, and Sam who want to play a game. In this game, they will take turns rolling a six-sided die. Each player gets points equal to the number they roll. These points add up with each roll. The first person to reach 21 points wins.
This is where you come into the picture: your friends find you trustworthy enough to give you the task of scorekeeper. It’s now your job to tally up the scores until a winner emerges. Let’s see how Rx logic can be applied to solve this problem.
Observing the Gameplay
In the game, each player takes a turn rolling the die. This roll produces a number of points for the rolling player. We can model this sequence of events as a stream over time, recording both the rolled value and the name of the player who rolled it.
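Before bringing RxJS into it, it can help to pin down what a single roll event looks like. The sketch below is plain JavaScript with no library. Both toRollEvents and the die values are made up for this illustration; the helper simply pairs each die value with the player whose turn it is.

```javascript
// Turn order for the game
const players = ["Sally", "Joe", "Sam"];

// Hypothetical helper: pair each die value with the player whose turn it is
function toRollEvents(dieValues) {
  return dieValues.map((roll, turn) => ({
    player: players[turn % players.length],
    roll,
  }));
}

// Made-up die values for four turns; the fourth turn wraps back to Sally
const rollEvents = toRollEvents([4, 2, 6, 5]);
console.log(rollEvents);
```

Each event carries the same two fields, player and roll, that the stream values use in the RxJS code later in this post.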
Grouping the Dice Rolls by Player
This stream of rolls is not enough to keep score properly. You need to keep track of the history of the rolls so that you can add their values up over time. However, each player earns points for themselves, so what you really need is a separate history per player.
Enter the groupBy operator!
groupBy is an operator that takes a stream of values and splits it into separate streams. Each value is routed to one of these streams according to a key computed from the value by a function you supply. In our scorekeeping scenario, we need a separate stream of rolls for each player, so we will use the player’s name as the key.
In our diagram above, we can see that we now have a stream that carries inner streams within it. This is known as a higher-order Observable: an Observable of Observables.
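To make the grouping concrete, here is a library-free sketch that mimics the effect of groupBy on a finished list of rolls. It is only an analogue, not how RxJS implements the operator: groupRollsBy is a hypothetical helper, the rolls are made up, and plain arrays stand in for the inner streams.

```javascript
// Made-up rolls, listed in the order they happened
const rolls = [
  { player: "Sally", roll: 4 },
  { player: "Joe", roll: 2 },
  { player: "Sam", roll: 6 },
  { player: "Sally", roll: 5 },
  { player: "Joe", roll: 3 },
];

// Stand-in for groupBy: one array per key, created the first time that key appears
function groupRollsBy(events, keySelector) {
  const groups = new Map();
  for (const event of events) {
    const key = keySelector(event);
    if (!groups.has(key)) {
      groups.set(key, []);
    }
    groups.get(key).push(event);
  }
  return groups;
}

const rollsByPlayer = groupRollsBy(rolls, (event) => event.player);

// Keys appear in first-seen order, and each group keeps only its own player's rolls
console.log([...rollsByPlayer.keys()]);
console.log(rollsByPlayer.get("Sally").map((event) => event.roll));
```

The real operator does the same routing, but it hands out each group as a live inner stream while the rolls are still arriving.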
Adding Up the Player Scores
Now that we’ve created separate streams for each player, we can apply transformations to those streams to add up the scores. We want to take each individual stream of raw player rolls and transform it into a stream of running totals. This means we’ll need the map operator to turn each original stream into a new one. Within that map function, we can use the scan operator on each individual player stream to add up its values over time.
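The running-total idea behind scan can be sketched without RxJS. scanArray below is a hypothetical helper that behaves like Array.prototype.reduce except that it keeps every intermediate result, which is roughly what scan emits over time. The rolls for Sally are made up.

```javascript
// Stand-in for scan: like reduce, but every intermediate accumulator value is kept
function scanArray(values, accumulator, seed) {
  const results = [];
  let state = seed;
  for (const value of values) {
    state = accumulator(state, value);
    results.push(state);
  }
  return results;
}

// Made-up rolls for a single player
const sallyRolls = [
  { player: "Sally", roll: 4 },
  { player: "Sally", roll: 5 },
  { player: "Sally", roll: 1 },
];

// Add each roll to the previous score, starting from a score of 0
const sallyScores = scanArray(
  sallyRolls,
  (playerScore, playerRoll) => ({
    player: playerRoll.player,
    roll: playerRoll.roll,
    score: playerScore.score + playerRoll.roll,
  }),
  { score: 0 }
);

console.log(sallyScores.map((entry) => entry.score)); // running totals: 4, 9, 10
```

In the stream version, map applies this same accumulation to every player stream that groupBy produces.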
Collapsing the Running Totals
At this point, having a stream of streams is no longer useful to us. It’d be better if we could put all of the running totals back into a single stream that we can watch as the game progresses. This task can be accomplished with the mergeAll operator, which will merge all of the values from the inner streams into a single output stream.
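Because each inner stream emits only when its player rolls, the merged output follows the original roll order. Under that assumption, the combined effect of grouping, scanning, and merging can be approximated in one pass over a list of made-up rolls. toRunningScores is a hypothetical helper, not an RxJS function.

```javascript
// Made-up rolls, listed in the order they happened
const rolls = [
  { player: "Sally", roll: 4 },
  { player: "Joe", roll: 2 },
  { player: "Sam", roll: 6 },
  { player: "Sally", roll: 5 },
  { player: "Joe", roll: 3 },
];

// One pass that keeps a separate total per player but reports results in roll order
function toRunningScores(events) {
  const totals = new Map();
  return events.map(({ player, roll }) => {
    const score = (totals.get(player) || 0) + roll;
    totals.set(player, score);
    return { player, roll, score };
  });
}

const mergedScores = toRunningScores(rolls);
console.log(mergedScores.map((entry) => `${entry.player}: ${entry.score}`));
```

Sally’s second entry shows 9 and Joe’s shows 5, since each total only counts that player’s own rolls.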
Finding the Winner
With our running totals calculated, we can announce a winner when the time comes. We’ll accomplish this with the first operator. first accepts a predicate function that is used to test each value from a stream. The first value that passes the test is passed on, and then the stream completes.
Reporting the Scores as the Game Progresses
Just announcing the winner at the end of the game isn’t going to satisfy your friends. They want to know the scores while the game is in play. Let’s use the takeWhile operator to take values from the running total stream for as long as no score has reached 21. Then we can merge that stream with the winner stream to get a complete recap of the gameplay.
We now have a stream of the player scores until the game ends!
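The way first and takeWhile split the running totals can be checked on a plain array. This is a library-free sketch with made-up scores: Array.prototype.find plays the role of first, and takeWhileArray is a hypothetical stand-in for takeWhile.

```javascript
const TARGET = 21;

// Made-up running scores; the last entry only exists to show that nothing after the win is kept
const runningScores = [
  { player: "Sally", score: 17 },
  { player: "Joe", score: 19 },
  { player: "Sam", score: 15 },
  { player: "Sally", score: 22 },
  { player: "Joe", score: 24 },
];

// Stand-in for takeWhile: keep the leading values until the predicate first fails
function takeWhileArray(values, predicate) {
  const cut = values.findIndex((value) => !predicate(value));
  return cut === -1 ? values.slice() : values.slice(0, cut);
}

// Like first(predicate): the first score that reaches the target
const winner = runningScores.find((entry) => entry.score >= TARGET);

// Like takeWhile(predicate): every score reported before anyone reaches the target
const scoresInPlay = takeWhileArray(runningScores, (entry) => entry.score < TARGET);

// Like merge: the scores in play followed by the winning score
const recap = winner ? [...scoresInPlay, winner] : scoresInPlay;
console.log(recap);
```

The two predicates are exact opposites, so the winning score is dropped by the takeWhile side and supplied by the first side, and it appears in the recap exactly once.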
Coding it with RxJS
You can play with the example game above and the RxJS code behind it in this live sandbox. You’ll find the following code:
// Imports assume RxJS 6 or later
import { interval, merge } from "rxjs";
import { first, groupBy, map, mergeAll, scan, share, takeWhile } from "rxjs/operators";

// Stream of dice rolls: one roll per second, with the players taking turns
const diceRolls$ = interval(1000).pipe(
  map(i => ({
    player: ["Sally", "Joe", "Sam"][i % 3],
    // floor(...) + 1 always yields 1 to 6; ceil(...) would yield 0 if random() returned exactly 0
    roll: Math.floor(Math.random() * 6) + 1
  })),
  // Share a single sequence of rolls among all subscribers
  share()
);

// Running scores
const runningScores$ = diceRolls$.pipe(
  // Group the rolls by player
  groupBy(diceRoll => diceRoll.player),
  // Transform the player roll streams into player score streams
  map(playerRoll$ => playerRoll$.pipe(
    // Scan over the player values and add them together over time
    scan((playerScore, playerRoll) => ({
      player: playerRoll.player,
      roll: playerRoll.roll,
      score: playerScore.score + playerRoll.roll
    }), {
      score: 0
    })
  )),
  // Merge the streams back into 1 stream
  mergeAll()
);

// Find the winner from the stream of running scores
const winner$ = runningScores$.pipe(
  first(playerScore => playerScore.score >= 21)
);

// Merge the running scores and the winner together while the game is ongoing
const scores$ = merge(
  runningScores$.pipe(
    takeWhile(playerScore => playerScore.score < 21)
  ),
  winner$
);