
# Purify Your Tests Episode IV: The Monoids Strike Back
Welcome back to yet another episode of test purification. In the last part we explored another benefit of adding type parameters to our code: the ability to work with very lean mocks. It's been a while since the last part, so please take a look there to refresh your memory.
In this part, rather than further improving our test code, we are going to see how to leverage our newly minted type parameters to improve our production code.
Onwards and forwards!
## A Long Time Ago in a Galaxy Far, Far Away...
Recall that last time we ended up with the following code:

```scala
class UberService[UserID, UserData, EnrichedUserData, Bookkept, Stored](
    fetcher: Fetcher[UserID, UserData],
    enricher: Enricher[UserID, UserData, EnrichedUserData],
    bookkeeper: Bookkeeper[Bookkept, UserData, EnrichedUserData],
    storage: Storage[Bookkept, Stored, EnrichedUserData]):

  def fetchAndStore(user: UserID): Stored =
    val data = fetcher.fetch(user)
    val enriched = enricher.enrich(user, data)
    val bookkept = bookkeeper.bookkeep(data, enriched)
    val stored = storage.store(bookkept, enriched)
    stored
```
With a glorious sum total of 5 type parameters, we managed to make this code highly testable with stateless mocks and maximally lean input data. Those were the days...
Unfortunately, since then, someone in management discovered that what we so bravely called `UberService` is actually a glorified ETL job. And so, management decreed, our service was demoted to a batch job. Oh well... Might as well use this opportunity to refactor and improve our code; that's the thing we developers love doing the most.
## Stream Data, Stream...
As a batch job we now need to handle multiple users all at once. To make this manageable and fun, we'll use streaming to get the data into our process.
The new `Fetcher` now looks like this:

```scala
trait Fetcher:
  def fetch: Stream[UserData]
```
Instead of receiving a `UserID` argument and returning data for a single user, we now produce a `Stream`[^1] of `UserData`. This is all the data that we need to process in our current run.
It also turned out that nobody was ever looking at the bookkeeping data, and so we decided to surreptitiously drop the bookkeeping logic from our new job[^2].
Lastly, since we are in batch mode, it would be more efficient to store data in batches, so as to not invoke the storage procedure on a per-entry basis. So our data storage now looks like this[^3]:

```scala
trait Storage:
  def storeBatch(data: List[EnrichedUserData]): Unit
```
With all those requirements in place, we can now write our new job:
```scala
class UberJob(fetcher: Fetcher, enricher: Enricher, storage: Storage): // 1
  def fetchAndStore: Unit =
    fetcher
      .fetch                       // 2
      .map(enricher.enrich)        // 3
      .grouped(5)                  // 4
      .foreach(storage.storeBatch) // 5
      .to(Sink.discard)            // 6
```
- We have a new class appropriately called `UberJob` (1).
- For now we forget all the lessons we learned about type parameters, and write the code without them.
- The streaming pipeline now starts from `fetch`ing (2).
- The data is enriched in a streaming fashion using `map` (3).
- We seamlessly create batches using the streaming `grouped` operator (4).
- Which we store with `storeBatch`; since this is a pure side effect we use `foreach` rather than `map` (5).
- To actually run the stream we use the `to` operator, which "pulls" the stream into the provided sink (6).
- The argument to `to` is a `Sink[In, Out]`: a sink that consumes entries of type `In` and produces a single result of type `Out` (a toy sketch of this interface follows right after this list).
- In this case, since we are only running the stream for its side effects, the sink is `Sink[Unit, Unit]`, which discards the stream's (`Unit`) output[^4].
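Since the post isn't tied to a particular streaming library, here is a minimal sketch of what such a `Sink` interface could look like. The `consume` method and this `discard` implementation are illustrative assumptions, not the actual library API:

```scala
// A hypothetical, minimal Sink: consumes a whole stream of In values and
// produces a single Out result once the stream is exhausted
trait Sink[In, Out]:
  def consume(stream: Stream[In]): Out

object Sink:
  // Runs the stream purely for its side effects, discarding all outputs
  def discard: Sink[Unit, Unit] = new Sink[Unit, Unit]:
    def consume(stream: Stream[Unit]): Unit = stream.foreach(_ => ())
```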
Although I promised you that we'll be improving the production code using type parameters, let's first ask: how testable is this code? A good question, since it's often the case that testable code happens to be better in ways that are not directly related to testing[^5].
## Tests, Tests Everywhere
Well, this code is at least as bad as the original code in the very first post in the series. Since all the flow produces is `Unit`, the only way to test the logic here is to use some form of mutable mocks. But just like in the first post, we can try to improve testability by (ab)using type parameters, which we will scatter liberally all through the code.
First the helper traits:
```scala
trait Fetcher[UserData]:
  def fetch: Stream[UserData]

trait Enricher[UserData, EnrichedUserData]:
  def enrich(data: UserData): EnrichedUserData

trait Storage[EnrichedUserData, Stored]:
  def storeBatch(data: List[EnrichedUserData]): Stored
```
We now have a type parameter for every piece of data involved in the flow. Learning the lessons from the previous parts, this should mean that we can avoid mutable mocks (used to inspect the outputs of each step) and can use lean fake data (for the inputs of the test).
Note the `Stored` type parameter that `Storage` now has. It stands in for the `Unit` return type that `storeBatch` used to have, and should help us reflect the flow of data in the test, where we can choose `Stored` to be something more informative than `Unit`.
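To make this concrete, here are two hypothetical instantiations of the same trait (the aliases are mine, for illustration only, and assume concrete data types in scope):

```scala
// In production we keep the old behavior: storing is a pure side effect
type ProdStorage = Storage[EnrichedUserData, Unit]
// In a test we can pick an informative result type to assert on
type TestStorage = Storage[EnrichedUserData, List[String]]
```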
This leads us to this second version of `UberJob`:

```scala
class UberJob[UserData, EnrichedUserData, Stored]( // 1
    fetcher: Fetcher[UserData],
    enricher: Enricher[UserData, EnrichedUserData],
    storage: Storage[EnrichedUserData, Stored]):

  def fetchAndStore: Unit =
    fetcher
      .fetch
      .map(enricher.enrich)
      .grouped(5)
      .map(storage.storeBatch) // 2
      .to(Sink.discard)
```
The new code is fairly similar to the one we started with. We use a multitude of type parameters on `UberJob` (1). But the only real difference is that we use `map` rather than `foreach` when calling `storeBatch` (2), since the result is now a non-`Unit` value.
The promise of all these fake type parameters is better testability. Is this what we got here?
Well, not quite. We still have `Sink.discard` stuck at the very end of our flow, meaning that the output of `fetchAndStore` is still `Unit`, and is as untestable as ever[^6].
In the non-streaming variant we could return a single `Stored` value to reflect everything we wanted to know about the flow in the test. But in the streaming version we have (potentially) more or less than one `Stored` value, one per stored batch. How do we reflect that in the final output?
One way to achieve this would be to swap out the `Sink` we are using in favor of `Sink.toList`[^7], a sink that accumulates all of the stream's output: `Sink[A, List[A]]`. If we choose the output of `storeBatch` to be `List[X]`, where `X` contains all the data about a single processed entry[^8], we'll get the following signature:

```scala
def fetchAndStore: List[List[X]]
```
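As an aside, in the toy `Sink` sketched earlier, `Sink.toList` could plausibly be implemented like this (an assumption, since the post doesn't show it; I'm also assuming the toy `Stream` supports `foldLeft`):

```scala
object Sink:
  // Accumulates every element the stream produces into a single list
  def toList[A]: Sink[A, List[A]] = new Sink[A, List[A]]:
    def consume(stream: Stream[A]): List[A] =
      stream.foldLeft(List.empty[A])((acc, a) => acc :+ a)
```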
This solves the testing problem: we can get all the info we need for the test by embedding it inside the `List[List[X]]` value, just like we did in previous parts. But this comes at the cost of modifying the production code in a noticeable way. Now, instead of returning `Unit`, we are going to return a `List[Unit]`, needlessly allocating a (potentially) large list just to improve our tests. This is too high a price to pay for a bit of testability.
We can do better though.
## Is There an Algebraic Abstraction in the Room?
The magic of using type parameters for testing stemmed from the fact that we can write the code once and instantiate the type parameters in two different ways: once for testing, and once in a way that's compatible with the original production code. That was parametric polymorphism in action.
In `UberJob` things are different: not only do we need two different return types (`Unit` vs. `List[...]`), we also need distinct behaviors, one for each return type (discarding results vs. accumulating them). The whole point of parametric polymorphism is that the code's behavior is independent of the choice of type parameters. And so parametric polymorphism doesn't seem to cut it for us now.
Not all is lost though. Luckily, there are other forms of polymorphism we can take advantage of, namely ad hoc polymorphism, and more specifically typeclasses, which allow us to choose different behaviors based on the specific instantiation of a type parameter.
Our task now is to find an appropriate typeclass and concrete types that capture both our production use case and the test use case. Let's try to "derive" the typeclass we need from the requirements that we have. Step by step:
1. Let's call the input to our final `Sink` `In` and the final output `Out`, i.e., we are working with `Sink[In, Out]`.
2. In the production use case the requirements are:
   - `type In = Unit` - the result of `storeBatch` in production.
   - `type Out = Unit` - the whole process is a side effect, so we don't keep anything in memory from it.
3. Conclusion from step 2: in the general case `In =:= Out`[^9]; in production we are working with `Sink[Unit, Unit]`.
4. In the test use case we have:
   - For `storeBatch` we choose `Stored` to be `List[X]`.
   - From step 3 we know that `type In = Out = List[X]`.
5. Conclusion from step 4: we need a `Sink[List[X], List[X]]`.
6. The requirements for this `Sink` are:
   - We need to be able to handle the empty case: if there's no data in the stream, the result should be `List.empty`.
   - To be able to assert on all the relevant data, we want to combine all the individual `List[X]` values that we get[^10]; we can use `++` for that.
7. Ergo, we need a `Sink` that supports the following:

   ```scala
   def mysterySink(empty: List[X])(combine: (List[X], List[X]) => List[X]): Sink[List[X], List[X]]
   ```
8. This signature has a very familiar shape. If we generalize it to arbitrary types, it looks exactly like `fold`[^11]:

   ```scala
   def fold[A](empty: A)(combine: (A, A) => A): Sink[A, A]
   ```
9. In the test this can be called as: `Sink.fold(List.empty[X])(_ ++ _)`.
10. The generic `Sink.fold` can be used with `Unit` as well; we just need to choose the appropriate implementations for `empty` and `combine`:

    ```scala
    def empty: Unit = ()
    def combine(x: Unit, y: Unit): Unit = ()
    ```
11. The types match: we still have a `Sink[Unit, Unit]`, but we also avoid in-memory accumulation when running the production code; we just return `Unit` on every step.
12. To be (ad hoc) polymorphic over our inputs we only need to support two functions, `empty` and `combine`, since these are the only requirements for using `Sink.fold`.
13. Which brings us to the following typeclass:

    ```scala
    trait MysteryTypeclass[A]:
      def empty: A
      def combine(x: A, y: A): A
    ```
14. If you've ever been exposed to functional programming, this should ring a bell: we just rediscovered `Monoid`[^12]:

    ```scala
    trait Monoid[A]:
      def empty: A
      def combine(x: A, y: A): A
    ```
15. Both `List[X]` and `Unit` have a `Monoid` implementation: one with `List.empty` and `++`, the other the trivial `Monoid` with the code from step 10.
16. We can use `Sink.fold` with any type that has a `Monoid` instance (a possible implementation is sketched right after this list):

    ```scala
    def foldMonoid[A: Monoid] = Sink.fold(A.empty)(_ combine _)
    ```
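Continuing the toy `Sink` companion sketched above (again, an assumed implementation rather than the post's actual library code):

```scala
object Sink:
  // Folds the whole stream into a single value, starting from `empty`
  def fold[A](empty: A)(combine: (A, A) => A): Sink[A, A] = new Sink[A, A]:
    def consume(stream: Stream[A]): A = stream.foldLeft(empty)(combine)

  // The same fold, with `empty` and `combine` supplied by a Monoid instance;
  // written with standard `using` syntax rather than the repo's experimental one
  def foldMonoid[A](using m: Monoid[A]): Sink[A, A] = fold(m.empty)(m.combine)
```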
And so, using this rather lengthy step-by-step derivation, we conclude that we can make our code (ad hoc) polymorphic by requiring a `Monoid` instance on the `Stored` parameter:

```scala
class UberJob[UserData, EnrichedUserData, Stored: Monoid]( // 1
    fetcher: Fetcher[UserData],
    enricher: Enricher[UserData, EnrichedUserData],
    storage: Storage[EnrichedUserData, Stored]):

  def fetchAndStore: Stored = // 2
    fetcher
      .fetch
      .map(enricher.enrich)
      .grouped(5)
      .map(storage.storeBatch)
      .to(summarize) // 3

  def summarize: Sink[Stored, Stored] = Sink.foldMonoid[Stored] // 4
```
This is the final version of `UberJob`:

- We now have a `Monoid` constraint on the `Stored` type parameter (1).
- Meaning that we can run this code with any concrete instantiation of `Stored`, as long as it has a `Monoid` implementation. The code will adapt its behavior accordingly.
- Instead of `Sink.discard` we now call `summarize` (3), which is just a (slightly more fluent) way to call `Sink.foldMonoid` (4), which compiles thanks to the `Monoid` constraint.
- Now the return type of `fetchAndStore` is no longer `Unit` but rather `Stored` (2), a value we can use in a test to collect data, without affecting the production logic, which instantiates `Stored` with `Unit`.
The promise of ad hoc polymorphism has been fulfilled: with the same code we get different behaviors based on types, matching the requirements of both the production and test scenarios[^13].
## But Is It Actually Testable?
Yes!
We can now write a purely functional test using an appropriate selection of types and implementations.
To wit, we select the following types for the test:
```scala
type UserData = String
type EnrichedUserData = String
type Stored = List[String]
```
Recall that using type parameters for the inputs lets us use "lean mocks", that is, we can choose whatever we want to be our inputs. In this case we choose everything to be `String`. We will augment the contents of the strings as we move from `UserData` to `EnrichedUserData`, finally reaching `Stored`, which becomes a `List[String]` due to working with batches. Notably, the choice of `type Stored = List[String]` fulfills the requirement of having a `Monoid` instance.
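That instance would typically come from a library; a minimal sketch of it, in the post's `given` style and specialized to `List[String]` for simplicity, might look like:

```scala
// Lists form a monoid: the empty list is the identity, and ++ combines
given Monoid[List[String]]:
  def empty: List[String] = List.empty
  def combine(x: List[String], y: List[String]): List[String] = x ++ y
```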
Next, we implement the corresponding mocks:
```scala
object TestFetcher extends Fetcher[UserData]:
  def fetch: Stream[UserData] =
    Stream("data1", "data2", "data3") // 1

object TestEnricher extends Enricher[UserData, EnrichedUserData]:
  def enrich(data: UserData): EnrichedUserData =
    s"enriched: $data" // 2

object TestStorage extends Storage[EnrichedUserData, Stored]:
  def storeBatch(data: List[EnrichedUserData]): List[String] =
    data.map("stored: " + _) // 3
```
We produce some mock data in a stream in the `Fetcher` (1). We augment the string with the word `enriched` in the `Enricher` (2). And finally, for each entry in a batch, `Storage` marks it with the word `stored` (3).
Our mocks are just pure functions working with lean data, just like we learned to use type parameters in the previous installments.
Lastly, we have the actual test:
"The uber-service" should: "fetch the user data, enrich it, and store the results" in: val service = UberJob(TestFetcher, TestEnricher, TestStorage) // 1
val result = service.fetchAndStore // 2
result shouldBe List( // 3 "stored: enriched: data1", "stored: enriched: data2", "stored: enriched: data3")
- We initialize a new `UberJob` with our mocks (1).
- We run `fetchAndStore`, producing a `List[String]` result (2).
- We assert that the `List` contains the correct entries (3).
- Each entry contains traces of going through all the steps in the flow: enrichment and storage.
This test is once again a pure function[^14], taking inputs and producing deterministic outputs, thanks to the `Monoid` instance in the background that accumulates results for us[^15][^16].
The production code can still work just as before:
```scala
val service = UberJob(Fetcher.Default, Enricher.Default, Storage.Default)

service.fetchAndStore // produces `Unit`
```
The `Monoid[Unit]` instance doesn't accumulate anything, and `fetchAndStore` only performs side effects, ending with a single `Unit` value.
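For completeness, the trivial instance this relies on can be sketched as (assumed, in the same style as the other instances):

```scala
// The trivial monoid: Unit has a single value, so there is nothing to accumulate
given Monoid[Unit]:
  def empty: Unit = ()
  def combine(x: Unit, y: Unit): Unit = ()
```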
This is all great, but somewhere at the beginning I promised you that the gratuitous addition of type parameters would improve our production code, not just our tests. What was that all about?
## A Product Manager Steps In...
And tells us that our clients demand observability. More concretely, the data we're dealing with looks like this:

```scala
case class EnrichedUserData(
    userId: String,
    registered: Boolean,
    friends: List[String])
```
Our clients want to know:
- How many users we processed
- How many of them were registered
- What is the average number of friends per user in the current run
That's a lot of data that we need to collect. How shall we approach this?
One thing we could do is scatter some mutable variables around the flow and accumulate the statistics into them. But that would be iffy, because mutable variables suck[^17]. And also, who wants to change code due to spurious product demands? Especially with such nice and clean looking code, polluting it with secondary concerns such as logging is unwarranted. Add to that the fact that we'd also have to modify the test we've just written.
Lucky for us, our code is written generically for any instance of a monoid, and monoids are all about data accumulation (namely the `combine` function). If we can choose the appropriate monoid for the `Stored` type parameter, we will get data accumulation for free. No need to modify `fetchAndStore`, nor will we need to change any of the existing tests.
Our first task is to come up with a type that will accumulate all the data specified in the requirements. Here it is:

```scala
case class Stats(count: Int, registered: Int, friends: Int): // 1
  def averageFriends = friends.toDouble / count // 2
```
Here we have one field per piece of data we want to count (1). Additionally, we have a method to compute the average number of friends given the totals in the fields (2)[^18].
The next step is to provide a monoid instance for this type. For that we can just rely on the fact that integers form a monoid under addition, with `0` as the empty value:

```scala
given Monoid[Stats]:
  def empty = Stats(0, 0, 0)

  def combine(x: Stats, y: Stats): Stats =
    Stats(
      x.count + y.count,
      x.registered + y.registered,
      x.friends + y.friends)
```
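Since nothing forces this instance to be lawful, here's a quick sanity check (my own addition, not from the post) that `combine` is associative and `empty` is an identity, at least on some sample values:

```scala
val m = summon[Monoid[Stats]]
val (a, b, c) = (Stats(1, 1, 2), Stats(2, 0, 5), Stats(3, 2, 1))

// Associativity: grouping doesn't matter
assert(m.combine(m.combine(a, b), c) == m.combine(a, m.combine(b, c)))
// Identity: empty is neutral on both sides
assert(m.combine(m.empty, a) == a && m.combine(a, m.empty) == a)
```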
We also need a way to collect a summary from an `EnrichedUserData` instance:

```scala
def fromUserData(data: EnrichedUserData): Stats =
  Stats(
    count = 1, // 1
    registered = if data.registered then 1 else 0, // 2
    friends = data.friends.length) // 3
```
- Each entry counts for `1` in the summary; when we combine `Stats` instances we'll eventually get the full count of entries (1).
- We only add to the `registered` field if the current user is actually registered (2).
- For every entry we accumulate the sum total of friends, ignoring the actual values (3).
Lastly, since we operate in batches, we'll actually need to create a `Stats` instance per batch:

```scala
def fromBatch(data: List[EnrichedUserData]): Stats =
  Monoid.foldMap(data)(fromUserData)
```
Here `foldMap` is a generic monoid function[^19] that converts each `EnrichedUserData` list entry into a `Stats` instance, and then accumulates the results into a single `Stats` value using `combine`.
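A plausible implementation of such a `foldMap` (an assumed sketch; the library version presumably lives on the `Monoid` companion and may differ in details):

```scala
// Maps each element to a monoidal value, then folds everything together
def foldMap[A, B](as: List[A])(f: A => B)(using m: Monoid[B]): B =
  as.foldLeft(m.empty)((acc, a) => m.combine(acc, f(a)))
```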
With all that in place, we can participate in the monoids game we have going on here. The only thing that's missing is an actual `Storage` implementation that produces `Stats` values instead of `Unit`.
Here it is:
```scala
object WithStats extends Storage[EnrichedUserData, Stats]: // 1
  def storeBatch(data: List[EnrichedUserData]): Stats = // 2
    println(s"Storing: [${data.map(_.userId).mkString(", ")}]") // 3
    Stats.fromBatch(data) // 4
```
- This implementation chooses `Stats` for its output, which is compatible with the contract of `UberJob`, as we defined a `Monoid` instance for `Stats` (1).
- Accordingly, the output of `storeBatch` is now `Stats` (2).
- For demonstration purposes we simulate data storage with an informative `println` that shows us the currently stored batch (3).
- And then we summarize the current batch into a single `Stats` value (4).
Done! We have all the pieces in place to produce statistics about our job.
Here's a sample run:
```scala
val service = UberJob( // 1
  Fetcher.Default,
  Enricher.Default,
  Storage.WithStats) // 2

val stats = service.fetchAndStore // 3

println("---- Done ----")
println(stats) // 4
```
- In this very simplified running script we instantiate an `UberJob` (1).
- We swap the default `Storage` instance with the new `WithStats` implementation (2).
- Then we run as usual, but unlike the original production code, the result is no longer `Unit`, but rather a `Stats` value (3).
- We can further process the stats for the run, e.g., by sending them to some monitoring dashboard. Here we just print the stats to the console (4).
A sample output from running this code might look like the following[^20]:

```
Storing: [a, b, c, d, e]
Storing: [f, g, h, i, j]
Storing: [k]
---- Done ----
Stats(count = 11, registered = 6, friends = 31, avgFriends = 2.8181818181818183)
```
We can see here how each batch is being stored, and at the very end we have all the statistics that we required.
Notice what we didn't have to do. We didn't further modify the code of `UberJob`, nor did we have to modify the test we've written. Everything just works, due to the code being parameterized with the `Monoid` constraint, and the `Monoid` implementation for `Stats` doing the right thing[^21].
## Conclusion
Adding a type parameter gained us flexibility that turns out to be useful beyond just tests. The type parameter revealed an essential truth about our code, a truth that we can use as requirements evolve. In this case the truth we uncovered is about the accumulating nature of our flow. Constraining with different typeclasses can reveal different capabilities of the code.
The way we got to this code structure is also interesting. All we did was try to improve the testability of the code, but what we got in return is a more flexible and maintainable flow. This points to the even more general idea that it can sometimes be useful to think about code in a context that is more general than the one currently being solved for. Using type parameters is one (very powerful) way to remove code from its specific context. Even if you don't end up using type parameters in your final code, just thinking about code this way can be beneficial.
Although the transformation to using type parameters was a bit more involved this time, with some practice you too will be able to spot those patterns and gain the benefits of parametric code.
That's it for this time. If you come up with any other benefits of using type parameters for testability, please reach out.
Happy parametrizing!
## Footnotes
[^1]: I'm not specifying the particular streaming library that we are using here. The example code can be adapted to any reasonable streaming library. Although the code below looks a bit like (a blocking version of) Akka Streams, the ideas will apply to any library that uses explicit effects (like fs2 and ZIO Streams). You can see the concrete (toy) implementation in the accompanying repo.

[^2]: It's almost as if it was there only for pedagogic reasons to begin with.

[^3]: Seeing the way we use streaming in the code that follows, it could be argued that we could leverage streaming abstractions to create a streaming version of the `Storage` trait, one that hides away the batching logic as an implementation detail, while only exposing an interface that receives entries one by one. Streaming is indeed quite a powerful code-structuring method. We won't be doing that here, so as to not get too bogged down in streaming mechanics.

[^4]: Notice how clean streaming makes this code; imagine doing this kind of batching "by hand". There are many benefits to using streaming (like how easy it would be to parallelize this code), but that's a topic for another post.

[^5]: Surprising exactly no one...

[^6]: Without mutable mocks, that is.

[^7]: We could take another approach, where the final `Sink` is passed in as a parameter to `UberJob`. But doing that will still force us to use a mutable mock for the test sink, one that mutably accumulates results. So we don't count that as a solution to the problem that we posed.

[^8]: E.g., in the tests we can choose `X` to be `String`.

[^9]: The production case constrains the general case.

[^10]: That's why we had `List[List[X]]` before.

[^11]: The actual signature in the repo is slightly more general than this, more similar to `foldLeft[A, B]`.

[^12]: The actual code in the repo uses a slightly different syntax, relying on the currently experimental "better support for typeclasses" and support for `infix` operations, but the essence is still the same.

[^13]: It could be argued that we are still paying a bit of a tax for testability, as we are needlessly "combining" unit values, something that wasn't happening in the original code. Although I don't think it really matters for any practical purposes, we could try to tackle this by parametrizing over the final `Sink`: once passing in `Sink.discard` and once passing in `Sink.foldMonoid`. That might be okay, but it would mean that we are not testing an important feature of the production flow: that the logic of `Sink.discard` does the right thing for our purposes. This is not an issue in the approach described in the main text, as the code uses the same sink in both cases. And if you keep on reading, we'll see another reason to favor the `Monoid` approach later on.

[^14]: If we go a step further and parameterize over the choice of streaming type, then, given that we don't use any overly advanced streaming tools, we can choose `List` to stand in for the stream in the test. This makes the whole test a pure function over lists, which is something that's even easier to write tests for.

[^15]: Food for thought: our code is using monoids, and the test code is relying on lists, which are the "free monoid". It seems that, more generally, free structures are useful for testing.

[^16]: The eagle-eyed readers amongst you might notice that this doesn't check that batching was properly performed. I leave it as an exercise for the reader to choose the appropriate `Monoid` instance that can check that, and to amend the test accordingly.

[^17]: Although not specifically relevant to the current example, if we add concurrency to the flow (like with some kind of `mapAsync`), things get even more complicated when we mix mutability with concurrency.

[^18]: It's an unfortunate fact of life that the averaging operation is not itself a monoidal action. We cannot, in general, combine two averages to produce a new average value.

[^19]: One of the biggest advantages of using generic abstractions such as monoids in our code is that we gain access to a wealth of library functions that work with these abstractions, for free.

[^20]: If you're following along with the repo, you can run this yourself with the command:

    ```
    scala-cli run . --main-class ncreep.streaming_purified.runUberJobStats
    ```

[^21]: Of course, one would have to write tests for the new code we've just written (the monoid instance and the new `Storage` implementation). But that's the beauty of the modularity that we gained: the new tests are completely self-contained and only need to deal with the new code.