Avoiding Kotlin Minefields in Apache Beam

Without a doubt, the Java SDK is the most popular and full featured of the languages supported by Apache Beam and if you bring the power of Java's modern, open-source cousin Kotlin into the fold, you'll find yourself with a wonderful developer experience. As with most great relationships, not everything is perfect, and the Beam-Kotlin one isn't totally exempt.

This post will cover some of the unique interactions between the two technologies and help you avoid some of the potential landmine gotchas that could arise when you are getting started, so you can focus on the great experience between Kotlin and Beam.

Declaring Anonymous ParDos / DoFns

When scouring the web looking for examples, it’s fairly common to see something like the following that creates an anonymous DoFn to be used within a ParDo:

lines.apply("Extract Words", ParDo.of(new DoFn<String, String>() { ... }));

A simple conversion to Kotlin would yield the following:

lines.apply("Extract Words", ParDo.of(DoFn<String, String>() { ... }))

However, you’ll find that this causes type erasure to occur and Beam will complain about it. Instead in order to implement an anonymous function you must indicate that an object is inheriting from the DoFn explicitly:

lines.apply("Extract Words", ParDo.of(object : DoFn<String, String>() { ... }))

Defining TupleTags

TupleTags can be invaluable and necessary if you are dealing with transforms or operations that deal with multiple types, however you may find that issues bubble up related to the declarations of these that cause you to either explicitly require a Coder to be defined via the setCoder() function after retrieving a specific tag.

A dead giveaway would be the following error:

Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for Transform.out1 [PCollection]. Correct one of the following root causes: No Coder has been manually specified; you may do so using .setCoder(). Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for V. Building a Coder using a registered CoderProvider failed. See suppressed exceptions for detailed failures. Using the default output Coder from the producing PTransform failed: Unable to provide a Coder for V. Building a Coder using a registered CoderProvider failed.

If you encounter this, it’s very likely that you defined your TupleTags as follows:

val userTag = TupleTag<KV<String, User>>()

Unfortunately as with most issues in Kotlin, type erasure can be a problem. To avoid this issue, you need to be explicit as possible when defining a TupleTag and use the object implements pattern as seen below:

val usersTag = object: TupleTag<KV<String, User>>() {}

The use of the object and trailing open-close curly braces allow the specific types to not be lost when attempting to read from the tag.

IntelliJ Generated Overrides

One of the most appealing features of IntelliJ is the ability to allow the IDE to generate any missing overrides for you when implementing or inheriting from another class / interface. Due to Kotlin’s typechecking system, this can be a challenge since Kotlin explicitly uses a ? character to denote nullability, but Beam will want you to ensure that the types match exactly.

Consider the following function:

class ExampleTransform: PTransform<PCollection<KV<String, Test>>, PCollectionTuple>() {
    // Omitted for brevity
}

You know that you need to perform some type of operation here, so you take advantage of your IDE and allow it to generate the appropriate overrides:

When you do this, you’ll see that nullable instances of all of your types will be added, explicitly the parameters:

// Notice the trailing ? after the type definition the input
override fun expand(input: PCollection<KV<String, Test>>?): PCollectionTuple {
    TODO("Not yet implemented")
}

Beam is extremely explicit with regards to typing and nullability, so you’ll want to ensure that the PCollection in this case is not decorated with the nullable operator:

override fun expand(input: PCollection<KV<String, Test>>): PCollectionTuple {
    TODO("Not yet implemented")
}

Iterables, But Which Ones?

Both Java and Kotlin have notions of an Iterable interface for working with collections of items, however when leveraging them via a grouping/batching operation such as the GroupIntoBatchs transform, a Kotlin-Java JVM disconnect can occur between the types.

pipeline
    .apply("Batch Items", GroupIntoBatches.ofSize<Key, Value>(100))
    .apply("Apply Batching Transform", ParDo.of(SomeTransform.transform()))

You may encounter an error that looks like the following:

ProcessContext argument must have type DoFn<Iterable<? extends Value>, Result<? extends Value>>.ProcessContext

You can resolve this by adding a @JvmWildcard annotation to the type of the iterable (and not the iterable itself on the DoFn) by changing this:

class SomeTransform: DoFn<KV<Key, Iterable<Value>>, KV<Key, Value>>(){ 
  // Omitted for brevity 
}

to this:

class SomeTransform: DoFn<KV<Key, Iterable<@JvmWildcard Value>>, KV<Key, Value>>(){ 
  // Omitted for brevity
}

This hint to the JVM should allow it to determine the correct version of the interface to use and be serialized/deserialized by the Beam programming model.

Writing Pipeline Tests

Testing, particularly unit testing, is extremely important when writing Beam applications (and obviously always), however there are two major gotchas in the testing department that you should be aware of when working with Kotlin, namely:

  • Defining Your Pipeline
  • Apply PAsserts
  • Running Pipeline Tests

Defining Your Pipeline

Since the native PAsserts that are applied when writing unit tests against Beam pipelines rely on native Java code, they will require a bit of annotations when being used in Beam. You can use the following as an example for how to construct one:

@get:Rule
@Transitive
val testPipeline: TestPipeline = TestPipeline.create()

All of your individual unit tests can share this pipeline, but you should consider writing it exactly as above since both the @get:Rule and @Transitive annotations are required, as is the explicit type declaration (e.g. : TestPipeline).

Applying PAsserts

The PAssert library comes bundled with Beam and allows you to write tests explicitly targeting PCollection objects (e.g. you can write assertions against the contents of them, verify their contents, etc.). These generally will just “work” as expected, however one particular caveat comes when using the .satisfies() function:

PAssert.that(numbers).satisfies { elements ->
    assertTrue(elements.contains(42))
}

You’ll find that this will not work since the satisfies() function explicitly expects a Java Void to be returned. Since this doesn’t exist within Kotlin, you’ll be required to explicitly place a null at the end of the body of the function:

PAssert.that(numbers).satisfies { elements ->
    assertTrue(elements.contains(42))
    null // Required
}

Running Pipeline Tests

After potential gotcha can come when attempting to actually execute or run the tests themselves. You have to ensure that the pipeline itself is explicitly run, to completion, after the PAssert is defined:

PAssert.that(numbers).containsInAnyOrder(42)
testPipeline.run().waitUntilFinish()

Since the PAssert is constructed as part of the dynamic acyclic graph that executes the pipeline, it must be declared prior to running the tests. You’ll also find that you won’t be able to debug any of the ParDo level operations if you are missing the run() declaration.

Missing a Gotcha?

If you've been working with Apache Beam and Kotlin, I'd love to hear of any specific gotchas or use-cases that you ran into and how you overcame them!