# Purify Your Tests Episode IV: The Monoids Strike Back
Welcome back to yet another episode of test purification. In the last part we explored another benefit of adding type parameters to our code: the ability to work with very lean mocks. It's been a while since the last part, so please take a look there to refresh your memory.
In this part, rather than further improving our test code, we are going to see how to leverage our newly minted type parameters to improve our production code.
Onwards and forwards!
## A Long Time Ago in a Galaxy Far, Far Away...
Recall that last time we ended up with the following code[^1]:

```scala
class UberService[UserID, UserData, EnrichedUserData, Bookkept, Stored](
    fetcher: Fetcher[UserID, UserData],
    enricher: Enricher[UserID, UserData, EnrichedUserData],
    bookkeeper: Bookkeeper[Bookkept, UserData, EnrichedUserData],
    storage: Storage[Bookkept, Stored, EnrichedUserData]):

  def fetchAndStore(user: UserID): Stored =
    val data = fetcher.fetch(user)
    val enriched = enricher.enrich(user, data)
    val bookkept = bookkeeper.bookkeep(data, enriched)
    val stored = storage.store(bookkept, enriched)
    stored
```

With a glorious sum total of 5 type parameters, we managed to make this code highly testable with stateless mocks and maximally lean input data. Those were the days...
Unfortunately, since then, someone in management discovered that what we so bravely called `UberService` is actually a glorified ETL job. And so, management decreed, our service was demoted to a batch job. Oh well... Might as well use this opportunity to refactor our code and improve it; that's the thing us developers love doing the most.
## Stream Data, Stream...
As a batch job we now need to handle multiple users all at once. To make this manageable and fun, we'll use streaming to get the data into our process.
The new `Fetcher` now looks like this:

```scala
trait Fetcher:
  def fetch: Stream[UserData]
```

Instead of receiving a `UserID` argument and returning data for a single user, we now produce a `Stream`[^2] of `UserData`. This is all the data that we need to process in our current run.
It also turned out that nobody was ever looking at the bookkeeping data, and so we decided to surreptitiously drop the bookkeeping logic from our new job[^3].
Lastly, since we are in batch mode, it would be more efficient to store data in batches, so as to not invoke the storage procedure on a per-entry basis. So our data storage now looks like this[^4]:

```scala
trait Storage:
  def storeBatch(data: List[EnrichedUserData]): Unit
```

With all those requirements in place, we can now write our new job:
```scala
class UberJob(fetcher: Fetcher, enricher: Enricher, storage: Storage): // 1
  def fetchAndStore: Unit =
    fetcher
      .fetch                       // 2
      .map(enricher.enrich)        // 3
      .grouped(5)                  // 4
      .foreach(storage.storeBatch) // 5
      .to(Sink.discard)            // 6
```

- We have a new class appropriately called `UberJob` (1).
- For now we forget all the lessons we learned about type parameters, and write the code without them.
- The streaming pipeline now starts from fetching (2).
- The data is enriched in a streaming fashion using `map` (3).
- We seamlessly create batches using the streaming `grouped` operator (4).
- Which we store with `storeBatch`; since this is a pure side effect we use `foreach` rather than `map` (5).
- To actually run the stream we use the `to` operator, which "pulls" the stream into the provided sink (6).
- The argument to `to` is a `Sink[In, Out]`, which is a sink that consumes entries of type `In` and produces a single result of type `Out`.
- In this case, since we are only running the stream for its side effects, the sink is `Sink[Unit, Unit]`, which discards the stream's (`Unit`) output[^5].
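To make the `Sink[In, Out]` idea a bit more concrete, here's a minimal sketch of what such a sink could look like. This is an illustrative stand-in, not the actual implementation from the repo; the `consume` method and the `Iterator`-based interface are assumptions:

```scala
// A minimal, hypothetical Sink: it consumes all the entries of type In
// that the stream produces and yields a single result of type Out.
trait Sink[In, Out]:
  def consume(entries: Iterator[In]): Out

object Sink:
  // Runs the stream purely for its side effects, discarding all the
  // (Unit) outputs and returning a single Unit at the end.
  val discard: Sink[Unit, Unit] = entries => entries.foreach(_ => ())
```

A real streaming library would of course pull entries incrementally rather than through a plain `Iterator`, but the shape of the abstraction is the same.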
Although I promised you that we'll be improving the production code using type parameters, let's first ask: how testable is this code? A good question to ask, since it's often the case that testable code happens to be better in ways that are not directly related to testing[^6].
## Tests, Tests Everywhere
Well, this code is at least as bad as the original code in the very first post in the series. Since all the flow produces is `Unit`, the only way to test the logic here is to use some form of mutable mocks. But just like in the first post, we can try to improve testability by (ab)using type parameters, which we will scatter liberally all through the code.
First the helper traits:
```scala
trait Fetcher[UserData]:
  def fetch: Stream[UserData]

trait Enricher[UserData, EnrichedUserData]:
  def enrich(data: UserData): EnrichedUserData

trait Storage[EnrichedUserData, Stored]:
  def storeBatch(data: List[EnrichedUserData]): Stored
```

We now have a type parameter for every piece of data involved in the flow. Learning the lessons from the previous parts, this should mean that we can avoid mutable mocks (to inspect the outputs of each step) and can use lean fake data (for the inputs of the test).
Note the `Stored` type parameter that `Storage` now has. This stands in for the `Unit` return type that `storeBatch` used to have, and should help us reflect the flow of data in the test, where we can choose `Stored` to be something more informative than `Unit`.
This leads us to the second version of `UberJob`:
```scala
class UberJob[UserData, EnrichedUserData, Stored]( // 1
    fetcher: Fetcher[UserData],
    enricher: Enricher[UserData, EnrichedUserData],
    storage: Storage[EnrichedUserData, Stored]):

  def fetchAndStore: Unit =
    fetcher
      .fetch
      .map(enricher.enrich)
      .grouped(5)
      .map(storage.storeBatch) // 2
      .to(Sink.discard)
```

The new code is fairly similar to the one we started with. We use a multitude of type parameters on `UberJob` (1). But the only real difference is that we use `map` rather than `foreach` when calling `storeBatch` (2), since the result is now a non-`Unit` value.
The promise of all these fake type parameters is better testability. Is this what we got here?
Well, not quite. We still have `Sink.discard` stuck at the very end of our flow. Meaning that the output of `fetchAndStore` is still `Unit`, and is as untestable as ever[^7].
In the non-streaming variant we could return a single `Stored` value to reflect everything we wanted to know about the flow in the test. But in the streaming version we have (potentially) more or less than one `Stored` value, one per stored batch. How do we reflect that in the final output?
One way to achieve this would be to swap out the `Sink` we are using in favor of `Sink.toList`[^8], a sink that accumulates all of the stream's output: `Sink[A, List[A]]`. If we choose the output of `storeBatch` to be `List[X]`, where `X` contains all the data about a single processed entry[^9], we'll get the following signature:

```scala
def fetchAndStore: List[List[X]]
```

This solves the testing problem: we can get all the info we need for the test by embedding it inside the `List[List[X]]` value, just like we did in previous parts. But this comes at the cost of modifying the production code in a noticeable way. Now, instead of returning `Unit`, we are going to return a `List[Unit]`, needlessly allocating a (potentially) large list just to improve our tests. This is too high a price to pay for a bit of testability.
We can do better though.
## Is There an Algebraic Abstraction in the Room?
The magic of using type parameters for testing stemmed from the fact that we can write the code once and instantiate the type parameters in two different ways: once for testing, and once in a way that's compatible with the original production code. That was parametric polymorphism in action.
In `UberJob` things are different: not only do we need two different return types (`Unit` vs. `List[...]`), we also need distinct behaviors, one for each return type (discarding results vs. result accumulation). The whole point of parametric polymorphism is that the code's behavior is independent of the choice of type parameters. And so parametric polymorphism doesn't seem to cut it for us now.
Not all is lost though. Luckily, there are other forms of polymorphism we can take advantage of. Namely ad hoc polymorphism, and more specifically typeclasses, which allow us to choose different behaviors based on the specific instantiation of a type parameter.
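As a quick refresher on the mechanics (a toy example, unrelated to our flow; the `Pretty` typeclass here is made up purely for illustration), a typeclass lets a single piece of generic code pick different behavior based on the concrete type it's instantiated with:

```scala
// A toy typeclass: how to render a value as text.
trait Pretty[A]:
  def pretty(a: A): String

given Pretty[Int] with
  def pretty(a: Int): String = s"int($a)"

given Pretty[Boolean] with
  def pretty(a: Boolean): String = if a then "yes" else "no"

// One generic function, but the behavior depends on the chosen type:
def describe[A](a: A)(using p: Pretty[A]): String = p.pretty(a)

val intExample  = describe(42)   // "int(42)"
val boolExample = describe(true) // "yes"
```

Same `describe`, two different behaviors, selected by the type of the argument. That's the capability we're after.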
Our task now is to find an appropriate typeclass and concrete types that capture both our production use case and the test use case. Let's try to "derive" the typeclass we need from the requirements that we have. Step by step:
1. Let's call the input to our final `Sink` `In` and the final output `Out`, i.e., we are working with `Sink[In, Out]`.
2. In the production use case the requirement is that:
    - `type In = Unit` - the result of `storeBatch` in production.
    - `type Out = Unit` - the whole process is a side effect, so we don't keep anything in memory from it.
3. Conclusion from 2: in the general case `In =:= Out`[^10], and we are working with `Sink[Unit, Unit]`.
4. In the test use case we have:
    - For `storeBatch` we choose `Stored` to be `List[X]`
    - From 3 we know that `type In = Out = List[X]`
5. Conclusion from 4: we need a `Sink[List[X], List[X]]`.
6. The requirements for this `Sink` are:
    - We need to be able to handle the empty case: if there's no data in the stream, the result should be `List.empty`.
    - To be able to assert on all the relevant data, we want to combine all the individual `List[X]` values that we get[^11]; we can use `++` for that.
7. Ergo, we need a `Sink` that supports the following:
    ```scala
    def mysterySink(empty: List[X])(combine: (List[X], List[X]) => List[X]): Sink[List[X], List[X]]
    ```
8. This signature has a very familiar shape. If we generalize this signature to arbitrary types, it looks exactly like `fold`[^12]:
    ```scala
    def fold[A](empty: A)(combine: (A, A) => A): Sink[A, A]
    ```
9. In the test this can be called as:
    ```scala
    Sink.fold(List.empty[X])(_ ++ _)
    ```
10. The generic `Sink.fold` can be used with `Unit` as well, we just need to choose the appropriate implementations for `empty` and `combine`:
    ```scala
    def empty: Unit = ()
    def combine(x: Unit, y: Unit): Unit = ()
    ```
11. The types match: we still have a `Sink[Unit, Unit]`, but we also avoid in-memory accumulation when running the production code, we just return `Unit` on every step.
12. To be (ad hoc) polymorphic over our inputs we need to support only two functions, `empty` and `combine`, since these are the only requirements for using `Sink.fold`.
13. Which brings us to the following typeclass:
    ```scala
    trait MysteryTypeclass[A]:
      def empty: A
      def combine(x: A, y: A): A
    ```
14. If you've ever been exposed to functional programming, this should ring a bell: we just rediscovered `Monoid`[^13]:
    ```scala
    trait Monoid[A]:
      def empty: A
      def combine(x: A, y: A): A
    ```
15. Both `List[X]` and `Unit` have a `Monoid` implementation, one with `List.empty` and `++`, and the other the trivial `Monoid` with the code from 10.
16. We can use `Sink.fold` with any type that has a `Monoid` instance:
    ```scala
    def foldMonoid[A: Monoid] = Sink.fold(A.empty)(_ combine _)
    ```
And so, using this rather lengthy step by step derivation, we conclude that we can make our code (ad hoc) polymorphic by requiring a Monoid instance on the Stored parameter:
```scala
class UberJob[UserData, EnrichedUserData, Stored: Monoid]( // 1
    fetcher: Fetcher[UserData],
    enricher: Enricher[UserData, EnrichedUserData],
    storage: Storage[EnrichedUserData, Stored]):

  def fetchAndStore: Stored = // 2
    fetcher
      .fetch
      .map(enricher.enrich)
      .grouped(5)
      .map(storage.storeBatch)
      .to(summarize) // 3

  def summarize: Sink[Stored, Stored] = Sink.foldMonoid[Stored] // 4
```

This is the final version of `UberJob`:

- We now have a `Monoid` constraint on the `Stored` type parameter (1).
- Meaning that we can run this code with any concrete instantiation of `Stored`, as long as it has a `Monoid` implementation. The code will adapt its behavior accordingly.
- Instead of `Sink.discard` we now call `summarize` (3), which is just a (slightly more fluent) way to call `Sink.foldMonoid` (4), which compiles thanks to the `Monoid` constraint.
- Now the return type of `fetchAndStore` is no longer `Unit` but rather `Stored` (2), a value we can use in a test to collect data, without affecting the production logic with `Unit`.
The promise of ad hoc polymorphism has been fulfilled: with the same code we get different behaviors based on types, matching the requirements of both the production and test scenarios[^14].
## But Is It Actually Testable?
Yes!
We can now write a purely functional test using an appropriate selection of types and implementations.
To wit, we select the following types for the test:
```scala
type UserData = String
type EnrichedUserData = String
type Stored = List[String]
```

Recall that using type parameters for the inputs lets us use "lean mocks", that is, we can choose whatever we want to be our inputs. In this case we choose everything to be `String`. We will augment the contents of the strings as we move from `UserData` to `EnrichedUserData`, finally reaching `Stored`, which becomes a `List[String]` due to working with batches. Notably, the choice of `type Stored = List[String]` fulfills the requirement of having a `Monoid` instance.
Next, we implement the corresponding mocks:
```scala
object TestFetcher extends Fetcher[UserData]:
  def fetch: Stream[UserData] =
    Stream("data1", "data2", "data3") // 1

object TestEnricher extends Enricher[UserData, EnrichedUserData]:
  def enrich(data: UserData): EnrichedUserData =
    s"enriched: $data" // 2

object TestStorage extends Storage[EnrichedUserData, Stored]:
  def storeBatch(data: List[EnrichedUserData]): List[String] =
    data.map("stored: " + _) // 3
```

We produce some mock data in a stream in the `Fetcher` (1). We augment the string with the word "enriched" in the `Enricher` (2). And finally, for each entry in a batch, `Storage` marks it with the word "stored" (3).
Our mocks are just pure functions working with lean data, just like we learned to use type parameters in the previous installments.
Lastly, we have the actual test:
```scala
"The uber-service" should:
  "fetch the user data, enrich it, and store the results" in:
    val service = UberJob(TestFetcher, TestEnricher, TestStorage) // 1

    val result = service.fetchAndStore // 2

    result shouldBe List( // 3
      "stored: enriched: data1",
      "stored: enriched: data2",
      "stored: enriched: data3")
```

- We initialize a new `UberJob` with our mocks (1).
- We run `fetchAndStore`, producing a `List[String]` result (2).
- We assert that the `List` contains the correct entries (3).
- Each entry contains traces of going through all the steps in the flow: enrichment and storage.
This test is once again a pure function[^15], taking inputs and producing deterministic outputs, thanks to the `Monoid` instance in the background that accumulates results for us[^16][^17].
The production code can still work just as before:
```scala
val service = UberJob(Fetcher.Default, Enricher.Default, Storage.Default)

service.fetchAndStore // produces `Unit`
```

The `Monoid[Unit]` instance doesn't accumulate anything, and `fetchAndStore` only performs side effects, ending with a single `Unit` value.
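For reference, here's a sketch of the two `Monoid` instances doing the work in the test and production scenarios. The exact instance definitions are assumptions (in a real codebase they'd likely come from a library such as Cats), but they convey the idea:

```scala
trait Monoid[A]:
  def empty: A
  def combine(x: A, y: A): A

// The test instantiation: lists accumulate all the results.
given Monoid[List[String]] with
  def empty: List[String] = List.empty
  def combine(x: List[String], y: List[String]): List[String] = x ++ y

// The production instantiation: the trivial monoid, nothing is kept in memory.
given Monoid[Unit] with
  def empty: Unit = ()
  def combine(x: Unit, y: Unit): Unit = ()
```

Same constraint, two very different behaviors: one instance builds up the full list of results, the other throws everything away as it goes.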
This is all great, but somewhere at the beginning I promised you that the gratuitous addition of type parameters will improve our production code, not just tests. What was that all about?
## A Product Manager Steps In...
And tells us that our clients demand observability. More concretely, the data we're dealing with looks like this:
```scala
case class EnrichedUserData(
    userId: String,
    registered: Boolean,
    friends: List[String])
```

Our clients want to know:
- How many users we processed
- How many of them were registered
- What is the average number of friends per user in the current run
That's a lot of data that we need to collect. How shall we approach this?
One thing we could do is to scatter some mutable variables around the flow and accumulate the statistics into them. But that would be iffy, because mutable variables suck[^18]. And also, who wants to change code due to spurious product demands? Especially with such nice and clean looking code, polluting it with secondary concerns such as logging is unwarranted. Add to that the fact that we'll also have to modify the test we've just written.
Lucky for us, our code is written generically for any instance of a monoid, and monoids are all about data accumulation (namely the `combine` function). If we can choose the appropriate monoid for the `Stored` type parameter, we will get data accumulation for free. No need to modify `fetchAndStore`, nor will we need to change any of the existing tests.
Our first task is to come up with a type that will accumulate all the data specified in the requirements. Here it is:

```scala
case class Stats(count: Int, registered: Int, friends: Int): // 1
  def averageFriends = friends.toDouble / count // 2
```

Here we have one field per piece of data we want to count (1). Additionally, we have a method to compute the average number of friends given the totals in the fields (2)[^19].
The next step is to provide a monoid instance for this type. For that we can just rely on the fact that integers form a monoid under addition, with 0 as the empty value:

```scala
given Monoid[Stats]:
  def empty = Stats(0, 0, 0)

  def combine(x: Stats, y: Stats): Stats =
    Stats(
      x.count + y.count,
      x.registered + y.registered,
      x.friends + y.friends)
```

We also need a way to collect a summary from an `EnrichedUserData` instance:
```scala
def fromUserData(data: EnrichedUserData): Stats =
  Stats(
    count = 1, // 1
    registered = if data.registered then 1 else 0, // 2
    friends = data.friends.length) // 3
```

- Each entry counts for 1 in the summary; when we combine `Stats` instances we'll eventually get the full count of entries (1).
- We only add to the `registered` field if the current user is actually registered (2).
- For every entry we accumulate the sum total of friends, ignoring the actual values (3).
Lastly, since we operate in batches, we'll actually need to create a Stats instance per batch:
```scala
def fromBatch(data: List[EnrichedUserData]): Stats =
  Monoid.foldMap(data)(fromUserData)
```

Here `foldMap` is a generic monoid function[^20] that converts each `EnrichedUserData` list entry into a `Stats` instance, and then accumulates the results into a single `Stats` value using `combine`.
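The actual `foldMap` lives in the repo, but a possible implementation might look like the following sketch (the `Monoid[Int]` instance and the length-summing example are added here just to make it self-contained):

```scala
trait Monoid[A]:
  def empty: A
  def combine(x: A, y: A): A

object Monoid:
  // Convert each element into the monoid B, then combine all the
  // results, starting from the empty value.
  def foldMap[A, B](data: List[A])(f: A => B)(using m: Monoid[B]): B =
    data.foldLeft(m.empty)((acc, a) => m.combine(acc, f(a)))

// Example instance: integers form a monoid under addition.
given Monoid[Int] with
  def empty: Int = 0
  def combine(x: Int, y: Int): Int = x + y

val totalLength = Monoid.foldMap(List("a", "bb", "ccc"))(_.length) // 6
```

Note that `foldMap` never materializes the intermediate list of `B` values; it folds them into the accumulator as it goes, which is exactly why the `Unit` instantiation stays allocation-free.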
With all that in place we can participate in the monoids game we have going on here. The only thing that's missing is an actual Storage that produces Stats values instead of Unit.
Here it is:
```scala
object WithStats extends Storage[EnrichedUserData, Stats]: // 1
  def storeBatch(data: List[EnrichedUserData]): Stats = // 2
    println(s"Storing: [${data.map(_.userId).mkString(", ")}]") // 3

    Stats.fromBatch(data) // 4
```

- This implementation chooses `Stats` for its output, which is compatible with the contract of `UberJob`, as we defined a `Monoid` instance for `Stats` (1).
- Accordingly, the output of `storeBatch` is now `Stats` (2).
- For demonstration purposes we simulate data storage with an informative `println` that shows us the currently stored batch (3).
- And then we summarize the current batch into a single `Stats` value (4).
Done! We have all the pieces in place to produce statistics about our job.
Here's a sample run:
```scala
val service = UberJob( // 1
  Fetcher.Default,
  Enricher.Default,
  Storage.WithStats) // 2

val stats = service.fetchAndStore // 3

println("---- Done ----")
println(stats) // 4
```

- In this very simplified running script we instantiate an `UberJob` (1).
- We swap the default `Storage` instance with the new `WithStats` implementation (2).
- Then we run as usual, but unlike the original production code, the result is no longer `Unit`, but rather a `Stats` value (3).
- We can further process the stats for the run by, e.g., sending them to some monitoring dashboard. Here we just print the stats to the console (4).

A sample output for running this code might look like the following[^21]:

```
Storing: [a, b, c, d, e]
Storing: [f, g, h, i, j]
Storing: [k]
---- Done ----
Stats(count = 11, registered = 6, friends = 31, avgFriends = 2.8181818181818183)
```

We can see here how each batch is being stored, and at the very end we have all the statistics that we required.
Notice what we didn't have to do. We didn't further modify the code of `UberJob`, nor did we have to modify the test we've written. Everything just works due to the code being parameterized with the `Monoid` constraint, and the `Monoid` implementation for `Stats` doing the right thing[^22].
## Conclusion
Adding a type parameter gained us flexibility that turns out to be useful beyond just tests. The type parameter revealed an essential truth about our code, a truth that we can use as requirements evolve. In this case the truth we got is about the accumulating nature of our flow. Constraining with different typeclasses can reveal different capabilities of the code.
The way we got to this code structure is also interesting. All we did was try to improve the testability of the code, but what we got in return is a more flexible and maintainable flow. This points to the even more general idea that it can sometimes be useful to think about code in a context that is more general than the one currently being solved for. Using type parameters is one (very powerful) way to remove code from its specific context. Even if you don't end up using type parameters in your final code, just thinking about code that way can be beneficial.
Although the transformation to using type parameters was a bit more involved this time, with some practice you too will be able to spot those patterns and gain the benefits of parametric code.
That's it for this time. If you come up with any other benefits of using type parameters for testability, please reach out.
Happy parametrizing!
## Footnotes
[^1]: I'm not specifying the particular streaming library that we are using here. The example code can be adapted to any reasonable streaming library. Although the code below looks a bit like (a blocking version of) Akka Streams, the ideas will apply to any library that uses explicit effects (like fs2 and ZIO Streams). You can see the concrete (toy) implementation here.

[^3]: It's almost as if it was there only for pedagogic reasons to begin with.

[^4]: Seeing the way we use streaming in the code that follows, it could be argued that we can leverage streaming abstractions to create a streaming version of the `Storage` trait that can hide away the batching logic as an implementation detail, while only exposing an interface that receives entries one by one. Streaming is indeed quite a powerful code structuring method. We won't be doing that here, so as to not get too bogged down in streaming mechanics.

[^5]: Notice how clean streaming makes this code; imagine doing this kind of batching "by hand". There are many benefits to using streaming (like how easy it would be to parallelize this code), but that's a topic for another post.

[^6]: Surprising exactly no one...

[^7]: Without mutable mocks, that is.

[^8]: We could take another approach where the final `Sink` is passed in as a parameter to `UberJob`. But doing that would still force us to use a mutable mock for the test sink, one that mutably accumulates results. So we don't count that as a solution to the problem that we posed.

[^9]: E.g., in the tests we can choose `X` to be `String`.

[^10]: The production case constrains the general case.

[^11]: That's why we had `List[List[X]]` before.

[^12]: The actual signature in the repo is slightly more general than this, more similar to `foldLeft[A, B]`.

[^13]: The actual code in the repo uses a slightly different syntax, relying on the currently experimental "better support for typeclasses" and support for `infix` operations, but the essence is still the same.

[^14]: It could be argued that we are still paying a bit of a tax for testability, as we are needlessly "combining" unit values, something that wasn't happening in the original code. Although I don't think it really matters for any practical purposes, we can try to tackle this by parametrizing over the final `Sink`, once passing in `Sink.discard` and once passing in `Sink.foldMonoid`. That might be okay, but it would mean that we are not testing an important feature of the production flow: that the logic of `Sink.discard` does the right thing for our purposes. This is not an issue in the approach described in the main text, as the code uses the same sink in both cases. And if you keep on reading, we'll see another reason to favor the `Monoid` approach later on.

[^15]: If we go a step further and parameterize over the choice of streaming type, then, given that we don't use any too-advanced streaming tools, we can choose `List` to stand for the stream in the test. Making the whole test a pure function over lists, which is something that's even easier to write tests for.

[^16]: Food for thought: our code is using monoids, and the test code is relying on lists, which are the "free monoid". It seems that, more generally, free structures are useful for testing.

[^17]: The eagle-eyed readers amongst you might notice that this doesn't check that batching was properly performed. I leave it as an exercise for the reader to choose the appropriate `Monoid` instance that can check that, and to amend the test accordingly.

[^18]: Although not specifically relevant to the current example, if we add concurrency into the flow (like with some kind of `mapAsync`), things get even more complicated when we mix mutability with concurrency.

[^19]: It's an unfortunate fact of life that the averaging operation is not itself a monoidal action. We cannot, in general, combine two averages to produce a new average value.

[^20]: One of the biggest advantages of using generic abstractions such as monoids in our code is that we gain access to a wealth of library functions that work with these abstractions, for free.

[^21]: If you're following along with the repo, you can run this yourself with the command:

    ```
    scala-cli run . --main-class ncreep.streaming_purified.runUberJobStats
    ```

[^22]: Of course, one would have to write tests for the new code we've just written (the monoid instance and the new `Storage` implementation). But that's the beauty of the modularity that we gained: the new tests are completely self-contained and only need to deal with the new code.