# Purify Your Tests Episode IV: The Monoids Strike Back


Welcome back to yet another episode of test purification. In the last part we explored another benefit of adding type parameters to our code, the ability to work with very lean mocks. It's been a while since the last part, so please take a look there to refresh your memory.

In this part, rather than further improving our test code, we are going to see how to leverage our newly minted type parameters to improve our production code.

Onwards and forwards!

## A Long Time Ago in a Galaxy Far, Far Away...

Recall that last time we ended up with the following code [1]:

class UberService[UserID, UserData, EnrichedUserData, Bookkept, Stored](
    fetcher: Fetcher[UserID, UserData],
    enricher: Enricher[UserID, UserData, EnrichedUserData],
    bookkeeper: Bookkeeper[Bookkept, UserData, EnrichedUserData],
    storage: Storage[Bookkept, Stored, EnrichedUserData]):
  def fetchAndStore(user: UserID): Stored =
    val data = fetcher.fetch(user)
    val enriched = enricher.enrich(user, data)
    val bookkept = bookkeeper.bookkeep(data, enriched)
    val stored = storage.store(bookkept, enriched)
    stored

With a glorious sum total of 5 parameters, we managed to make this code highly testable with stateless mocks and maximally lean input data. Those were the days...

Unfortunately, since then, someone in management discovered that what we so bravely called UberService is actually a glorified ETL job. And so, management decreed, our service was demoted to be a batch job. Oh well... Might as well use this opportunity to refactor our code and improve it; that's the thing we developers love doing the most.

## Stream Data, Stream...

As a batch job we now need to handle multiple users all at once. To make this manageable and fun, we'll use streaming to get the data into our process.

The new Fetcher now looks like this:

trait Fetcher:
  def fetch: Stream[UserData]

Instead of receiving a UserID argument and returning data for a single user, we now produce a Stream [2] of UserData. This is all the data that we need to process in our current run.

It also turned out that nobody was ever looking at the bookkeeping data and so we decided to surreptitiously drop the bookkeeping logic from our new job [3].

Lastly, since we are in batch mode, it would be more efficient to store data in batches so as to not invoke the storage procedure on a per-entry basis. So our data storage now looks like this [4]:

trait Storage:
  def storeBatch(data: List[EnrichedUserData]): Unit

With all those requirements in place, we can now write our new job:

class UberJob(fetcher: Fetcher, enricher: Enricher, storage: Storage): // 1
  def fetchAndStore: Unit =
    fetcher
      .fetch // 2
      .map(enricher.enrich) // 3
      .grouped(5) // 4
      .foreach(storage.storeBatch) // 5
      .to(Sink.discard) // 6
  • We have a new class appropriately called UberJob (1).
  • For now we forget all the lessons we learned about type parameters, and write the code without them.
  • The streaming pipeline now starts from fetching (2).
  • The data is enriched in a streaming fashion using map (3).
  • We seamlessly create batches using the streaming grouped operator (4).
  • Which we store with storeBatch; since this is a pure side effect, we use foreach rather than map (5).
  • To actually run the stream we use the to operator, which "pulls" the stream into the provided sink (6).
  • The argument to to is a Sink[In, Out] which is a sink that consumes entries of type In and produces a single result of type Out.
  • In this case, since we are only running the stream for its side-effects, the sink is Sink[Unit, Unit], which discards the stream's (Unit) output [5].
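
The operators used above can be made concrete with a toy implementation. The following is only a sketch under assumptions: `ToyStream` and `ToySink` are hypothetical names, backed by a plain `Iterator`, and are not the types from the repo or from any real streaming library.

```scala
// Hypothetical toy types for illustration only, backed by a plain Iterator.
final class ToySink[In, Out](val run: Iterator[In] => Out)

object ToySink:
  // Consumes every element and throws the values away.
  def discard[A]: ToySink[A, Unit] =
    new ToySink[A, Unit](elems => elems.foreach(_ => ()))

final class ToyStream[A](elems: () => Iterator[A]):
  def map[B](f: A => B): ToyStream[B] =
    new ToyStream(() => elems().map(f))

  // Emits lists of at most `size` elements; the last one may be shorter.
  def grouped(size: Int): ToyStream[List[A]] =
    new ToyStream(() => elems().grouped(size).map(_.toList))

  // Runs a side effect per element and emits Unit downstream.
  def foreach(f: A => Unit): ToyStream[Unit] = map(f)

  // Nothing runs until the stream is pulled into a sink.
  def to[Out](sink: ToySink[A, Out]): Out = sink.run(elems())

object ToyStream:
  def apply[A](values: A*): ToyStream[A] =
    new ToyStream(() => values.iterator)

@main def toyStreamDemo(): Unit =
  ToyStream(1, 2, 3, 4, 5, 6, 7)
    .map(_ * 10)
    .grouped(3)
    .foreach(batch => println(s"storing $batch"))
    .to(ToySink.discard)
  // prints:
  //   storing List(10, 20, 30)
  //   storing List(40, 50, 60)
  //   storing List(70)
```

Because the toy stream is lazy, the `println` side effects only happen once `to` pulls the elements through the sink, which mirrors the role of step (6) in the pipeline.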

Although I promised you that we'll be improving the production code using type parameters, let's start by asking: how testable is this code? It's a good question to ask, since it's often the case that testable code happens to be better in ways that are not directly related to testing [6].

## Tests, Tests Everywhere

Well, this code is at least as bad as the original code in the very first post in the series. Since all the flow is producing is Unit, the only way to test the logic here is to use some form of mutable mocks. But just like in the first post, we can try to improve testability by (ab)using type parameters, which we will scatter liberally all through the code.

First the helper traits:

trait Fetcher[UserData]:
  def fetch: Stream[UserData]
trait Enricher[UserData, EnrichedUserData]:
  def enrich(data: UserData): EnrichedUserData
trait Storage[EnrichedUserData, Stored]:
  def storeBatch(data: List[EnrichedUserData]): Stored

We now have a type-parameter for every piece of data involved in the flow. Learning the lessons from the previous parts, this should mean that we can avoid mutable mocks (to inspect the outputs of each step) and can use lean fake data (for the inputs of the test).

Note the Stored type parameter that Storage now has. This stands for the Unit return type that storeBatch used to have, and should help us in reflecting the flow of data in the test, where we can choose Stored to be something more informative than Unit.

This leads us to this second version of UberJob:

class UberJob[UserData, EnrichedUserData, Stored]( // 1
    fetcher: Fetcher[UserData],
    enricher: Enricher[UserData, EnrichedUserData],
    storage: Storage[EnrichedUserData, Stored]):
  def fetchAndStore: Unit =
    fetcher
      .fetch
      .map(enricher.enrich)
      .grouped(5)
      .map(storage.storeBatch) // 2
      .to(Sink.discard)

The new code is fairly similar to the one we started with. We use a multitude of type parameters on UberJob (1). But the only real difference is that we use map rather than foreach when calling storeBatch (2), since the result is a non-Unit value.

The promise of all these fake type parameters is better testability. Is this what we got here?

Well, not quite. We still have Sink.discard stuck at the very end of our flow. Meaning that the output of fetchAndStore is still Unit, and is as untestable as ever [7].

In the non-streaming variant we could return a single Stored value to reflect everything we wanted to know about the flow in the test. But in the streaming version we have (potentially) more or less than one Stored value, one per every batch stored. How do we reflect that in the final output?

One way to achieve this would be to swap out the Sink we are using in favor of Sink.toList [8], a sink that accumulates all of the stream's output: Sink[A, List[A]]. If we choose the output of storeBatch to be List[X], where X contains all the data about a single processed entry [9], we'll get the following signature:

def fetchAndStore: List[List[X]]

This solves the testing problem: we can get all the info we need for the test by embedding it inside the List[List[X]] value, just like we did in previous parts. But this comes at the cost of modifying the production code in a noticeable way. Now, instead of returning Unit, the production code is going to return a List[Unit], needlessly allocating a (potentially) large list just to improve our tests. This is too high a price to pay for a bit of testability.

We can do better though.

## Is There an Algebraic Abstraction in the Room?

The magic of using type parameters for testing stemmed from the fact that we can write the code once and instantiate the type parameters in two different ways. Once for testing and once in a way that's compatible with the original production code. That was parametric polymorphism in action.

In UberJob things are different, not only do we need two different return types (Unit vs. List[...]), but we also need distinct behaviors, one for each return type (discarding results vs. result accumulation). The whole point of parametric polymorphism is that the code's behavior is independent of the choice of type parameters. And so parametric polymorphism doesn't seem to cut it for us now.

Not all is lost though. Luckily, there are other forms of polymorphism we can take advantage of. Namely ad hoc polymorphism, and more specifically typeclasses, which allow us to choose different behaviors based on the specific instantiation of a type parameter.

Our task now is to find an appropriate typeclass and concrete types that capture both our production use case and the test use case. Let's try to "derive" the typeclass we need from the requirements that we have. Step by step:

  1. Let's call the input to our final Sink In and the final output Out, i.e., we are working with Sink[In, Out].
  2. In the production use case the requirement is that:
    • type In = Unit - the result of the storeBatch in production.
    • type Out = Unit - the whole process is a side effect, so we don't keep anything in memory from it.
  3. Conclusion from 2: in the general case In =:= Out [10]; in production we are working with Sink[Unit, Unit].
  4. In the test use case we have:
    • For storeBatch we choose Stored to be List[X]
    • From 3 we know that type In = Out = List[X]
  5. Conclusion from 4: we need a Sink[List[X], List[X]].
  6. The requirements for this Sink are:
    • We need to be able to handle the empty case, if there's no data in the stream, the result should be List.empty.
    • To be able to assert on all the relevant data, we want to combine all the individual List[X] values that we get [11]; we can use ++ for that.
  7. Ergo, we need a Sink that supports the following:
    def mysterySink(empty: List[X])
                   (combine: (List[X], List[X]) => List[X]): Sink[List[X], List[X]]
  8. This signature has a very familiar shape. If we generalize this signature to arbitrary types, this looks exactly like fold [12]:
    def fold[A](empty: A)
               (combine: (A, A) => A): Sink[A, A]
  9. In the test this can be called as:
    Sink.fold(List.empty[X])(_ ++ _)
  10. The generic Sink.fold can be used with Unit as well, we just need to choose the appropriate implementations for empty and combine:
    def empty: Unit = ()
    def combine(x: Unit, y: Unit): Unit = ()
  11. The types match, we still have a Sink[Unit, Unit], but we also avoid in-memory accumulation when running the production code, we just return Unit on every step.
  12. To be (ad hoc) polymorphic over our inputs we need to support only two functions, empty and combine, since these are the only requirements for using Sink.fold.
  13. Which brings us to the following typeclass:
    trait MysteryTypeclass[A]:
      def empty: A
      def combine(x: A, y: A): A
  14. If you've ever been exposed to functional programming, this should ring a bell: we just rediscovered Monoid [13]:
    trait Monoid[A]:
      def empty: A
      def combine(x: A, y: A): A
  15. Both List[X] and Unit have a Monoid implementation, one with List.empty and ++ and the other is the trivial Monoid with the code from 10.
  16. We can use Sink.fold with any type that has a Monoid instance:
    def foldMonoid[A: Monoid] = Sink.fold(A.empty)(_ combine _)
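
The derivation above can be condensed into a small self-contained sketch. It makes a few assumptions that are not in the original code: it uses the `given ... with` form and `using` parameters instead of the experimental `A.empty` syntax, a plain `List` stands in for the stream, and `foldAll` is a hypothetical helper playing the role of `Sink.foldMonoid`.

```scala
trait Monoid[A]:
  def empty: A
  def combine(x: A, y: A): A

// The trivial monoid: nothing is accumulated.
given unitMonoid: Monoid[Unit] with
  def empty: Unit = ()
  def combine(x: Unit, y: Unit): Unit = ()

// Lists accumulate by concatenation.
given listMonoid[X]: Monoid[List[X]] with
  def empty: List[X] = List.empty
  def combine(x: List[X], y: List[X]): List[X] = x ++ y

// Hypothetical stand-in for Sink.foldMonoid, folding a List instead of a stream.
def foldAll[A](values: List[A])(using m: Monoid[A]): A =
  values.foldLeft(m.empty)(m.combine)

@main def monoidDemo(): Unit =
  println(foldAll(List(List("a"), List("b", "c")))) // List(a, b, c)
  println(foldAll(List.empty[List[Int]]))           // List()
  println(foldAll(List((), (), ())))                // ()
```

The same `foldAll` call accumulates everything for `List` and nothing for `Unit`; which behavior we get is decided purely by the instance that the compiler picks for the element type.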

And so, using this rather lengthy step by step derivation, we conclude that we can make our code (ad hoc) polymorphic by requiring a Monoid instance on the Stored parameter:

class UberJob[UserData, EnrichedUserData, Stored: Monoid]( // 1
    fetcher: Fetcher[UserData],
    enricher: Enricher[UserData, EnrichedUserData],
    storage: Storage[EnrichedUserData, Stored]):
  def fetchAndStore: Stored = // 2
    fetcher
      .fetch
      .map(enricher.enrich)
      .grouped(5)
      .map(storage.storeBatch)
      .to(summarize) // 3
  def summarize: Sink[Stored, Stored] = Sink.foldMonoid[Stored] // 4

This is the final version of UberJob:

  • We now have a Monoid constraint on the Stored type parameter (1).
  • Meaning that we can run this code with any concrete instantiation of Stored as long as it has a Monoid implementation. The code will adapt its behavior accordingly.
  • Instead of Sink.discard we now call summarize (3), which is just a (slightly more fluent) way to call Sink.foldMonoid (4), which compiles thanks to the Monoid constraint.
  • Now the return type of fetchAndStore is no longer Unit but rather Stored (2), a value we can use in a test to collect data, without affecting the production logic, which can keep using Unit.

The promise of ad hoc polymorphism has been fulfilled, with the same code we get different behaviors based on types, matching the requirements of both the production and test scenarios [14].

## But Is It Actually Testable?

Yes!

We can now write a purely functional test using an appropriate selection of types and implementations.

To wit, we select the following types for the test:

type UserData = String
type EnrichedUserData = String
type Stored = List[String]

Recall that using type parameters for the inputs lets us use "lean mocks", that is, we can choose whatever we want to be our inputs. In this case we choose everything to be String. We will augment the contents of the strings as we move from UserData to EnrichedUserData, finally reaching Stored, which becomes a List[String] due to working with batches. Notably, the choice of type Stored = List[String] fulfills the requirement of having a Monoid instance.

Next, we implement the corresponding mocks:

object TestFetcher extends Fetcher[UserData]:
  def fetch: Stream[UserData] =
    Stream("data1", "data2", "data3") // 1
object TestEnricher extends Enricher[UserData, EnrichedUserData]:
  def enrich(data: UserData): EnrichedUserData =
    s"enriched: $data" // 2
object TestStorage extends Storage[EnrichedUserData, Stored]:
  def storeBatch(data: List[EnrichedUserData]): List[String] =
    data.map("stored: " + _) // 3

We produce some mock data in a stream in the Fetcher (1). We augment the string with the word enriched in the Enricher (2). And finally, for each entry in a batch, Storage marks it with the word stored (3).

Our mocks are just pure functions working with lean data, just as we learned to do with type parameters in the previous installments.

Lastly, we have the actual test:

"The uber-service" should:
  "fetch the user data, enrich it, and store the results" in:
    val service = UberJob(TestFetcher, TestEnricher, TestStorage) // 1
    val result = service.fetchAndStore // 2
    result shouldBe List( // 3
      "stored: enriched: data1",
      "stored: enriched: data2",
      "stored: enriched: data3")
  • We initialize a new UberJob with our mocks (1).
  • We run fetchAndStore producing a List[String] result (2).
  • We assert that the List contains the correct entries (3).
  • Each entry contains traces of going through all the steps in the flow: enrichment and storage.

This test is once again a pure function [15], taking inputs and producing deterministic outputs, thanks to the Monoid instance in the background that accumulates results for us [16][17].
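
To try the whole flow without a streaming library or a test framework, here is a minimal end-to-end sketch. It rests on stated assumptions: a plain `List` replaces `Stream` (so `fetch` returns a `List`), a `foldLeft` plays the role of `Sink.foldMonoid`, and `UnitStorage` is a hypothetical stand-in for a production storage that only performs side effects.

```scala
trait Monoid[A]:
  def empty: A
  def combine(x: A, y: A): A

given unitMonoid: Monoid[Unit] with
  def empty: Unit = ()
  def combine(x: Unit, y: Unit): Unit = ()

given listMonoid[X]: Monoid[List[X]] with
  def empty: List[X] = Nil
  def combine(x: List[X], y: List[X]): List[X] = x ++ y

// Assumption: List replaces Stream, so `fetch` returns a List.
trait Fetcher[UserData]:
  def fetch: List[UserData]
trait Enricher[UserData, EnrichedUserData]:
  def enrich(data: UserData): EnrichedUserData
trait Storage[EnrichedUserData, Stored]:
  def storeBatch(data: List[EnrichedUserData]): Stored

class UberJob[UserData, EnrichedUserData, Stored](
    fetcher: Fetcher[UserData],
    enricher: Enricher[UserData, EnrichedUserData],
    storage: Storage[EnrichedUserData, Stored])(using monoid: Monoid[Stored]):
  def fetchAndStore: Stored =
    fetcher.fetch
      .map(enricher.enrich)
      .grouped(5)
      .map(storage.storeBatch)
      .foldLeft(monoid.empty)(monoid.combine) // plays the role of Sink.foldMonoid

object TestFetcher extends Fetcher[String]:
  def fetch: List[String] = List("data1", "data2", "data3")
object TestEnricher extends Enricher[String, String]:
  def enrich(data: String): String = s"enriched: $data"
object TestStorage extends Storage[String, List[String]]:
  def storeBatch(data: List[String]): List[String] = data.map("stored: " + _)

// Hypothetical production-like storage: side effects only, returns Unit.
object UnitStorage extends Storage[String, Unit]:
  def storeBatch(data: List[String]): Unit = ()

@main def pureTestDemo(): Unit =
  println(UberJob(TestFetcher, TestEnricher, TestStorage).fetchAndStore)
  // List(stored: enriched: data1, stored: enriched: data2, stored: enriched: data3)
  println(UberJob(TestFetcher, TestEnricher, UnitStorage).fetchAndStore)
  // ()
```

Swapping `TestStorage` for `UnitStorage` changes the result type from `List[String]` to `Unit` without touching `UberJob`, which is the point of constraining `Stored` with a monoid.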

The production code can still work just as before:

val service = UberJob(Fetcher.Default, Enricher.Default, Storage.Default)
service.fetchAndStore // produces `Unit`

The Monoid[Unit] instance doesn't accumulate anything, and fetchAndStore only performs side-effects ending with a single Unit value.

This is all great, but somewhere at the beginning I promised you that the gratuitous addition of type parameters will improve our production code, not just tests. What was that all about?

## A Product Manager Steps In...

And tells us that our clients demand observability. More concretely, the data we're dealing with looks like this:

case class EnrichedUserData(
    userId: String,
    registered: Boolean,
    friends: List[String])

Our clients want to know:

  • How many users we processed
  • How many of them were registered
  • What is the average number of friends per user in the current run

That's a lot of data that we need to collect. How shall we approach this?

One thing we could do is to scatter some mutable variables in the flow and accumulate the statistics into them. But that would be iffy, because mutable variables suck [18]. And also, who wants to change code due to spurious product demands? Especially with such nice and clean looking code, polluting it with secondary concerns such as logging is unwarranted. Add to that the fact that we'll also have to modify the test we've just written.

Lucky for us, our code is written generically for any instance of a monoid, and monoids are all about data accumulation (namely the combine function). If we can choose the appropriate monoid for the Stored type parameter we will get data accumulation for free. No need to modify fetchAndStore, nor will we need to change any of the existing tests.

Our first task is to come up with a type that will accumulate all the data specified in the requirements. Here it is:

case class Stats(count: Int, registered: Int, friends: Int): // 1
  def averageFriends = friends.toDouble / count // 2

Here we have one field per piece of data we want to count (1). Additionally, we have a method to compute the average number of friends given the totals in the fields (2) [19].
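
To see why Stats carries totals and derives the average only at the end, here is a small worked example. The two batch summaries are made-up values chosen for easy arithmetic; they are not taken from the original data.

```scala
case class Stats(count: Int, registered: Int, friends: Int):
  def averageFriends: Double = friends.toDouble / count

// Two made-up batch summaries.
val small = Stats(count = 1, registered = 1, friends = 10) // average 10.0
val large = Stats(count = 2, registered = 1, friends = 2)  // average 1.0

// Averaging the averages ignores how many users each batch had.
val averageOfAverages: Double = (small.averageFriends + large.averageFriends) / 2

// Adding the totals first and dividing once gives the real average.
val combined = Stats(
  small.count + large.count,
  small.registered + large.registered,
  small.friends + large.friends)

@main def averagesDemo(): Unit =
  println(averageOfAverages)       // 5.5
  println(combined.averageFriends) // 4.0
```

The three users have 12 friends in total, so 4.0 is the correct answer, while the average of averages (5.5) over-weights the single-user batch. Note also that on an empty run (count 0) `averageFriends` evaluates `0.0 / 0`, which is `NaN` on the JVM.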

The next step is to provide a monoid instance for this type. For that we can just rely on the fact that integers form a monoid under addition, with 0 as the empty value:

given Monoid[Stats]:
  def empty = Stats(0, 0, 0)
  def combine(x: Stats, y: Stats): Stats =
    Stats(x.count + y.count, x.registered + y.registered, x.friends + y.friends)
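
A monoid instance is expected to satisfy the identity and associativity laws, and these are cheap to spot-check. The sketch below is an illustration under assumptions: `lawsHoldFor` is a hypothetical helper, the sample values are arbitrary, and the instance is written in the `given ... with` form so that it also compiles on older Scala 3 versions.

```scala
trait Monoid[A]:
  def empty: A
  def combine(x: A, y: A): A

case class Stats(count: Int, registered: Int, friends: Int)

given statsMonoid: Monoid[Stats] with
  def empty: Stats = Stats(0, 0, 0)
  def combine(x: Stats, y: Stats): Stats =
    Stats(x.count + y.count, x.registered + y.registered, x.friends + y.friends)

// Hypothetical helper: checks the monoid laws for three sample values.
def lawsHoldFor[A](a: A, b: A, c: A)(using m: Monoid[A]): Boolean =
  val leftIdentity = m.combine(m.empty, a) == a
  val rightIdentity = m.combine(a, m.empty) == a
  val associative =
    m.combine(m.combine(a, b), c) == m.combine(a, m.combine(b, c))
  leftIdentity && rightIdentity && associative

@main def statsLawsDemo(): Unit =
  println(lawsHoldFor(Stats(1, 0, 3), Stats(2, 1, 0), Stats(5, 5, 7))) // true
```

Checking three hand-picked values is not a proof; a property-based testing library could run the same check over many generated values.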

We also need a way to collect a summary from an EnrichedUserData instance:

def fromUserData(data: EnrichedUserData): Stats =
  Stats(
    count = 1, // 1
    registered = if data.registered then 1 else 0, // 2
    friends = data.friends.length) // 3
  • Each entry counts for 1 in the summary, when we combine Stats instances we'll eventually get the full count of entries (1).
  • We only add to the registered field if the current user is actually registered (2).
  • For every entry we accumulate the sum total of friends, ignoring the actual values (3).

Lastly, since we operate in batches, we'll actually need to create a Stats instance per batch:

def fromBatch(data: List[EnrichedUserData]): Stats =
  Monoid.foldMap(data)(fromUserData)

Here foldMap is a generic monoid function [20] that converts each EnrichedUserData list entry into a Stats instance, and then accumulates the results into a single Stats value using combine.
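
For readers who have not met foldMap before, one possible definition is sketched below. This is an assumption about its shape rather than the code from the repo or from any particular library; the `intAddition` instance exists only to exercise the function.

```scala
trait Monoid[A]:
  def empty: A
  def combine(x: A, y: A): A

object Monoid:
  // One possible definition: map each element and combine as we go.
  def foldMap[A, B](values: List[A])(f: A => B)(using m: Monoid[B]): B =
    values.foldLeft(m.empty)((acc, a) => m.combine(acc, f(a)))

  // Integers under addition, used here only to exercise foldMap.
  given intAddition: Monoid[Int] with
    def empty: Int = 0
    def combine(x: Int, y: Int): Int = x + y

@main def foldMapDemo(): Unit =
  println(Monoid.foldMap(List("a", "bb", "ccc"))(_.length)) // 6
  println(Monoid.foldMap(List.empty[String])(_.length))     // 0
```

An empty input yields the monoid's `empty` value, which is why an empty batch would summarize to a Stats of all zeros.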

With all that in place we can participate in the monoids game we have going on here. The only thing that's missing is an actual Storage that produces Stats values instead of Unit.

Here it is:

object WithStats extends Storage[EnrichedUserData, Stats]: // 1
  def storeBatch(data: List[EnrichedUserData]): Stats = // 2
    println(s"Storing: [${data.map(_.userId).mkString(", ")}]") // 3
    Stats.fromBatch(data) // 4
  • This implementation chooses Stats for its output, which is compatible with the contract of UberJob as we defined a Monoid instance for Stats (1).
  • Accordingly, the output of storeBatch is now Stats (2).
  • For demonstration purposes we simulate data storage with an informative println that shows us the currently stored batch (3).
  • And then we summarize the current batch into a single Stats value (4).

Done! We have all the pieces in place to produce statistics about our job.

Here's a sample run:

val service = UberJob( // 1
  Fetcher.Default,
  Enricher.Default,
  Storage.WithStats) // 2
val stats = service.fetchAndStore // 3
println("---- Done ----")
println(stats) // 4
  • In this very simplified running script we instantiate an UberJob (1).
  • We swap the default Storage instance with the new WithStats implementation (2).
  • Then we run as usual, but unlike the original production code, the result is no longer Unit, but rather a Stats value (3).
  • We can further process the stats for the run, by e.g., sending them to some monitoring dashboard. Here we just print the stats to the console (4).

A sample output for running this code might look like the following [21]:

Storing: [a, b, c, d, e]
Storing: [f, g, h, i, j]
Storing: [k]
---- Done ----
Stats(count = 11, registered = 6, friends = 31, avgFriends = 2.8181818181818183)

We can see here how each batch is being stored, and at the very end we have all the statistics that we required.
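
Since the default fetcher and enricher are not shown here, the accumulation itself can be reproduced on a tiny scale. The sketch below uses three made-up users, a `List` in place of the stream, batches of two instead of five, and a plain `map` plus `foldLeft` where the original uses foldMap and the monoid sink; all of these are assumptions for illustration.

```scala
trait Monoid[A]:
  def empty: A
  def combine(x: A, y: A): A

case class EnrichedUserData(userId: String, registered: Boolean, friends: List[String])

case class Stats(count: Int, registered: Int, friends: Int):
  def averageFriends: Double = friends.toDouble / count

object Stats:
  given statsMonoid: Monoid[Stats] with
    def empty: Stats = Stats(0, 0, 0)
    def combine(x: Stats, y: Stats): Stats =
      Stats(x.count + y.count, x.registered + y.registered, x.friends + y.friends)

  def fromUserData(data: EnrichedUserData): Stats =
    Stats(1, if data.registered then 1 else 0, data.friends.length)

  def fromBatch(data: List[EnrichedUserData]): Stats =
    data.map(fromUserData).foldLeft(statsMonoid.empty)(statsMonoid.combine)

// Made-up users, purely for illustration.
val users = List(
  EnrichedUserData("a", registered = true, friends = List("b", "c")),
  EnrichedUserData("b", registered = false, friends = List("a")),
  EnrichedUserData("c", registered = true, friends = Nil))

// Two batches (sizes 2 and 1), each summarized, then combined into one value.
val stats: Stats =
  users
    .grouped(2)
    .map(Stats.fromBatch)
    .foldLeft(Stats.statsMonoid.empty)(Stats.statsMonoid.combine)

@main def statsDemo(): Unit =
  println(stats)                // Stats(3,2,3)
  println(stats.averageFriends) // 1.0
```

The per-batch summaries are Stats(2,1,3) and Stats(1,1,0); combining them gives the same totals as summarizing all three users at once, which is what makes batching invisible in the final result.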

Notice what we didn't have to do. We didn't further modify the code of UberJob, nor did we have to modify the test we've written. Everything just works due to the code being parameterized with the Monoid constraint, and the Monoid implementation for Stats doing the right thing [22].

## Conclusion

Adding a type parameter gained us flexibility that turns out to be useful beyond just tests. The type parameter revealed an essential truth about our code, a truth that we can use as requirements evolve. In this case the truth we got is about the accumulating nature of our flow. Constraining with different typeclasses can reveal different capabilities of the code.

The way we got to this code structure is also interesting. All we did was to try and improve the testability of the code, but what we got in return is a more flexible and maintainable flow. This points to the even more general idea that it can sometimes be useful to think about code in a context that is more general than the one being currently solved for. Using type parameters is one (very powerful) way that we can remove code from its specific context. Even if you don't end up using type parameters in your final code, just thinking about code that way can also be beneficial.

Although the transformation to using type parameters was a bit more involved this time, with some practice you too will be able to spot those patterns and gain the benefits of parametric code.

That's it for this time. If you come up with any other benefits of using type parameters for testability, please reach out.

Happy parametrizing!

If you enjoyed this, reach out for a workshop on Functional Programming, where I teach how to apply Functional Programming to improve code quality in any language.


## Footnotes

  1. The full code for the examples is available on GitHub.

  2. I'm not specifying the particular streaming library that we are using here. The example code can be adapted to any reasonable streaming library. Although the code below looks a bit like (a blocking version of) Akka Streams, the ideas will apply to any library that uses explicit effects (like fs2 and ZIO Streams). You can see the concrete (toy) implementation here

  3. It's almost as if it was there only for pedagogic reasons to begin with.

  4. Seeing the way we use streaming in the code that follows, it could be argued that we can leverage streaming abstractions to create a streaming version of the Storage trait that can hide away the batching logic as an implementation detail, while only exposing an interface that receives entries one by one. Streaming is indeed quite a powerful code structuring method. We won't be doing that here so as to not get too bogged down in streaming mechanics.

  5. Notice how clean streaming makes this code, imagine doing this kind of batching "by hand". There are many benefits to using streaming (like how easy it would be to parallelize this code), but that's a topic for another post.

  6. Surprising exactly no one...

  7. Without mutable mocks that is.

  8. We could take another approach where the final Sink is passed in as a parameter to UberJob. But doing that will still force us to use a mutable mock for the test sink, one that mutably accumulates results. So we don't count that as a solution to the problem that we posed.

  9. E.g., in the tests we can choose X to be String.

  10. The production case constrains the general case.

  11. That's why we had List[List[X]] before.

  12. The actual signature in the repo is slightly more general than this, more similar to foldLeft[A, B]

  13. The actual code in the repo uses a slightly different syntax, using the currently experimental "better support for typeclasses" and support for infix operations, but the essence is still the same.

  14. It could be argued that we are still paying a bit of a tax for testability as we are needlessly "combining" unit values, something that wasn't happening in the original code. Although I don't think it really matters for any practical purposes, we can try to tackle this by parametrizing over the final Sink. Once passing a Sink.discard and once passing in Sink.foldMonoid. That might be okay, but would mean that we are not testing an important feature of the production flow: that the logic of Sink.discard does the right thing for our purposes. This is not an issue in the approach described in the main text as the code uses the same sink in both cases. And if you keep on reading we'll see another reason to favor the Monoid approach later on.

  15. If we go a step further and parameterize over the choice of streaming type, then, provided we don't use any overly advanced streaming tools, we can choose List to stand for the stream in the test. This makes the whole test a pure function over lists, which is something that's even easier to write tests for.

  16. Food for thought: our code is using monoids, and the test code is relying on lists, which are the "free monoid". It seems that more generally free structures are useful for testing.

  17. The eagle-eyed readers amongst you might notice that this doesn't check that batching was properly performed. I leave it as an exercise for the reader to choose the appropriate Monoid instance that can check that and amend the test accordingly.

  18. Although not specifically relevant to the current example, if we add concurrency into the flow (like with some kind of mapAsync), things get even more complicated when we mix mutability with concurrency.

  19. It's an unfortunate fact of life that the averaging operation is not itself a monoidal action. We cannot, in general, combine two averages to produce a new average value.

  20. One of the biggest advantages of using generic abstractions such as monoids in our code is that we gain access to a wealth of library functions that work with these abstractions, for free.

  21. If you're following along with the repo, you can run this yourself with the command:

    scala-cli run . --main-class ncreep.streaming_purified.runUberJobStats
  22. Of course one would have to write tests for the new code we've just written (the monoid instance and the new Storage implementation). But that's the beauty of the modularity that we gained: the new tests are completely self-contained and only need to deal with the new code.

