We’ve been using the Apache Beam Java SDK to build streaming and batch pipelines running on Google Cloud Dataflow. It’s solid, but we felt the code could be a bit more streamlined.
That’s why we took Kotlin for a spin! Find out how we leverage it to reduce the boilerplate code.
Why we switched to Kotlin
So what’s the rationale behind using Kotlin? Basically, we’ve found pipeline code written in Java to be more verbose than it needs to be, even though the SDK itself provides a great Domain Specific Language (DSL) for building pipelines.
Kotlin, on the other hand, has been excellent, especially when interoperating with Java, so we wanted a simple Kotlin DSL to help us build pipeline jobs with less effort and run data migrations in Google Cloud Platform quickly.
The “basics”
What are Apache Beam and Google Cloud Dataflow?
You can learn about Apache Beam and Google Cloud Dataflow in detail here, but in a nutshell:
- Google Cloud Dataflow is a fully managed service provided by Google that aims to process large volumes of data, and can also be seen as an ETL (Extract, Transform, Load) tool in Google Cloud Platform
- Apache Beam is an open source SDK for a unified programming model that provides pipeline portability, and allows jobs to run on multiple platforms.
What about Kotlin?
Kotlin is a JVM language developed by JetBrains. It’s also officially supported by Google for Android development. Head here to learn more.
WordCount pipeline Java examples
Let’s have a look at a WordCount pipeline Java example. The example below is taken from the Apache Beam git repo.
… CountWords and FormatAsTextFn classes are defined ...
static void runWordCount(WordCountOptions options) {
    Pipeline p = Pipeline.create(options);
    p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
        .apply(new CountWords())
        .apply(MapElements.via(new FormatAsTextFn()))
        .apply("WriteCounts", TextIO.write().to(options.getOutput()));
    p.run().waitUntilFinish();
}

public static void main(String[] args) {
    WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
        .as(WordCountOptions.class);
    runWordCount(options);
}
It took me a little while to understand this during the initial learning phase, even though it’s just a simple pipeline that reads lines from a file, processes them (splitting each line into words and counting them), and writes the results into a new file.
We need to understand how the apply() method works together with various classes, such as PTransform, MapElements or ParDo, to build proper transformation steps.
So why can’t we have convenient collection methods for transformation steps instead? For example:
Pipeline p = Pipeline.create(options);
p.fromText(options.getInputFile())
    .flatMap(it.split(TOKENIZER_PATTERN).filter(it.isNotEmpty()).toList())
    .countBy(it)
    .map(it.key + ":" + it.value.size)
    .toText(options.getOutput());
p.run();
The pseudo code above looks much simpler and clearer than the Java example, as it shows all the steps together in a single chain of methods, and doesn’t need the logic to be extracted into a separate class or method.
We can easily achieve this by using Kotlin extension functions, so let’s see how they help us build such a simple pipeline job.
Using Kotlin extension functions to build DSL
Kotlin supports extension functions and extension properties. Extensions are a great feature that lets you add new properties or methods to existing classes without having to inherit from them.
Below, we’ll use extension functions to add new methods to both Pipeline and PCollection, much like the pseudo code above.
Head here to learn more about Kotlin extensions.
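To make the idea concrete before touching Beam, here is a minimal sketch that uses only the Kotlin standard library. The names words() and wordCount are hypothetical and exist only for this illustration; they add a method and a property to String without subclassing it.

```kotlin
// Extension function: adds words() to String without inheriting from it.
// Splits on runs of whitespace and drops the empty strings that leading
// or trailing whitespace would otherwise produce.
fun String.words(): List<String> =
    split(Regex("\\s+")).filter { it.isNotEmpty() }

// Extension property: a computed, read-only property built on the function above.
val String.wordCount: Int
    get() = words().size
```

Calling "this is kotlin".words() reads as if the method had always been part of String, which is the effect we are after for Pipeline and PCollection.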
Let’s implement the pseudo code!
The first step is to create a pipeline from PipelineOptions, like the Java example above.
Having a static method as the starting point of the DSL is a good idea; it lets developers discover the rest of the DSL from it. For example:
val (pipe, options) = KPipe.from<WordCountOptions>(args)
Here, we want to get both a pipeline and its options (a WordCountOptions, in this case) from the method and the options type. Let’s have a look at the actual implementation.
object KPipe {
    inline fun <reified R : PipelineOptions> from(args: Array<String>): Pair<Pipeline, R> {
        val options = PipelineOptionsFactory.fromArgs(*args)
            .withValidation()
            .`as`(R::class.java)
        return Pipeline.create(options) to options
    }
}
It’s a simple implementation that uses an inline function with a reified type parameter (to reiterate: reified type parameters only work in inline functions).
The KPipe object above has a from() method, an inline function whose reified type parameter R is bounded by PipelineOptions.
A reified type in Kotlin lets us get the class for type-checking, creating an object, and anything else we can do with the Class class in Java.
In this case, we can return the options instance typed as R from the method without having to pass the class as a parameter, as we would in Java.
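The same pattern can be tried without Beam on the classpath. The sketch below is only an illustration under stated assumptions: BaseOptions, WordCountSettings and optionsFrom() are hypothetical stand-ins for PipelineOptions, WordCountOptions and KPipe.from(), and the single --runner flag is invented for the example.

```kotlin
// Hypothetical stand-ins for PipelineOptions and WordCountOptions (not Beam classes).
open class BaseOptions {
    var runner: String = "direct"
}

class WordCountSettings : BaseOptions() {
    var inputFile: String = "input.txt"
}

// R is reified, so R::class.java is available without a Class<R> parameter.
inline fun <reified R : BaseOptions> optionsFrom(args: Array<String>): Pair<List<String>, R> {
    // Create the concrete options type through its no-argument constructor.
    val options: R = R::class.java.getDeclaredConstructor().newInstance()
    // Apply the only flag this sketch understands: --runner=<name>.
    args.firstOrNull { it.startsWith("--runner=") }
        ?.let { options.runner = it.removePrefix("--runner=") }
    return args.toList() to options
}
```

A caller would write val (rest, options) = optionsFrom<WordCountSettings>(args) and get back a correctly typed WordCountSettings, mirroring how KPipe.from<WordCountOptions>(args) is used above.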
Now that we’ve got a pipe with options, let’s build a DSL for TextIO to read and write files.
Building DSL for TextIO (Source and Sink parts)
We can use TextIO directly, as it’s a pretty simple API. But I’d like to show you what it looks like when we build a DSL for other component IOs.
These methods for specific component IOs can also be extended in the future thanks to Kotlin’s default and named arguments.
We can default these optional parameters to null, so developers can skip them for a simple pipeline job.
Let’s build the fromText() and toText() methods to implement the pseudo code.
fun Pipeline.fromText(
        name: String? = null,
        path: String): PCollection<String> {
    return this.apply(name ?: "Read from Text",
            TextIO.read().from(path))
}

fun PCollection<String>.toText(
        name: String? = null,
        filename: String
): PDone {
    return this.apply(name ?: "Write to Text",
            TextIO.write().to(filename))
}
Named parameters (like ‘name’) give us the flexibility to set a different step name for individual steps later. Informative step names make the pipeline graph in the Google Cloud Dataflow console easier to read.
If you’re not too fussed, you don’t need to provide names – the default ones will do just fine.
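The calling convention can be seen in isolation with a small sketch. Step and readStep() are hypothetical names used only here; the parameter layout (an optional name followed by a required path) is copied from fromText() above.

```kotlin
// Hypothetical record of a pipeline step, used only for this illustration.
data class Step(val name: String, val path: String)

// name is optional and defaults to null; path is required.
// Because the optional parameter comes first, callers that skip it
// must pass path as a named argument.
fun readStep(name: String? = null, path: String): Step =
    Step(name ?: "Read from Text", path)
```

readStep(path = "in.txt") falls back to the default step name, while readStep(name = "ReadLines", path = "in.txt") overrides it. New optional parameters can be added later without breaking either call.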
The fromText() method is added to the Pipeline class, while the toText() method is added to the PCollection class.
The source part defines where to pull the data from as the starting point of the pipeline, and returns a PCollection. The PCollection is then written by the sink part, a terminating step that returns PDone.
There are various component IOs built into the Apache Beam SDK, and we’ve built some useful methods for Google Cloud Dataflow ourselves, like fromPubsub(), toPubsub(), fromBigQuery(), and toBigQuery().
The benefit? They let us build pipelines quickly without having to know many options of the IOs.
To better illustrate, here’s a particular example: a fromPubsub<T>() method that reads a different payload according to the type T.
// Note: isSubclassOf() comes from kotlin.reflect.full, so kotlin-reflect must be on the classpath.
inline fun <reified T> Pipeline.fromPubsub(
        projectId: String,
        subscription: String,
        idAttr: String = "uid"): PCollection<T> {
    // Pick the PubsubIO reader that matches the requested payload type T.
    val pubsubRead = when {
        T::class.isSubclassOf(Message::class) -> PubsubIO.readProtos(T::class.java as Class<Message>)
        T::class == String::class -> PubsubIO.readStrings()
        T::class == PubsubMessage::class -> PubsubIO.readMessagesWithAttributes()
        else -> {
            throw RuntimeException("Invalid type. it must be ProtoMessage, String or PubsubMessage")
        }
    } as PubsubIO.Read<T>
    return this.apply("Reading ($subscription) from Pubsub", pubsubRead
            .fromSubscription("projects/$projectId/subscriptions/$subscription")
            .withIdAttribute(idAttr))
}
It uses a ‘when’ expression to call a different PubsubIO.readXXX() method according to the reified type T, and then calls apply() with the resulting PubsubIO.Read instance and the extra options.
As a result, we can use this method without having to know the PubsubIO.readXXX() methods. It can also be extended with new types: JSON, Avro, and so on can easily be added in the future.
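The dispatch-on-type idea can be sketched without PubsubIO. In the hypothetical decode() function below, a raw byte payload is turned into whichever type the caller asks for; adding support for a new type means adding one more branch to the ‘when’ expression. The function name and the supported types are assumptions made for this illustration, not part of Beam.

```kotlin
// Decodes a UTF-8 payload into the type requested by the caller.
// T is reified, so the 'when' expression can branch on T::class.
inline fun <reified T> decode(bytes: ByteArray): T {
    val text = String(bytes, Charsets.UTF_8)
    val value: Any = when (T::class) {
        String::class -> text
        Int::class -> text.trim().toInt()
        ByteArray::class -> bytes
        // Unsupported types fail fast with a descriptive message.
        else -> throw IllegalArgumentException(
            "Unsupported payload type: ${T::class.java.name}")
    }
    // With a reified T this is a checked cast.
    return value as T
}
```

The caller only states the type it wants, for example decode<Int>(payload), and never needs to know which branch does the work.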
Now, let’s go back to building DSL for the most important part in a pipeline.
Building DSL for Transformation steps
So far, we have the actual code like:
val (pipe, options) = KPipe.from<WordCountOptions>(args)
pipe.fromText(path = options.inputFile)
    ... need transformation steps like splitting, counting, grouping and so on ...
    .toText(filename = options.output)
pipe.run().waitUntilFinish()
This is where Kotlin extension functions really pay off: we can simply call map(), flatMap(), filter(), and others directly on a PCollection.
Let’s build flatMap() method
Since Kotlin extension functions allow us to attach new methods to existing classes, we can attach useful methods directly to Pipeline or PCollection.
This is what a flatMap() method added to PCollection looks like:
inline fun <I, reified O> PCollection<I>.flatMap(
        name: String? = null,
        noinline transform: (I) -> Iterable<O>): PCollection<O> {
    val pc = this.apply(name ?: "flatMap to ${O::class.simpleName}",
            FlatMapElements.into(TypeDescriptor.of(O::class.java))
                    .via(transform))
    return pc.setCoder(NullableCoder.of(pc.coder))
}
Internally, this function calls apply() with a FlatMapElements transform, so that a simple transform function (a SerializableFunction<I, Iterable<O>>) is executed for each element.
It also uses the reified type O to build the TypeDescriptor for the output PCollection, and wraps the resulting Coder in a NullableCoder. Because the function is inline with a reified type parameter, it can get the class directly from the type, which plain Java generics cannot do because of type erasure at runtime.
Therefore, we don’t need to worry about which Coder to use when calling flatMap() on a PCollection, so it’s fairly simple! Not to mention, at under 10 lines, it saves us from having to know about FlatMapElements and TypeDescriptor.
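Type erasure and what reified buys us can be observed with the standard library alone. The helper names classOf() and allAre() below are hypothetical and only serve this illustration.

```kotlin
// With inline + reified, the type argument itself is available as a Class.
inline fun <reified T : Any> classOf(): Class<T> = T::class.java

// A reified T can also be used in 'is' checks, which erased generics cannot do.
inline fun <reified T> List<*>.allAre(): Boolean = all { it is T }

// Erasure in action: both lists share one runtime class, so the element
// type cannot be recovered from the list object itself.
fun sameRuntimeClass(a: List<*>, b: List<*>): Boolean = a.javaClass == b.javaClass
```

sameRuntimeClass(arrayListOf("a"), arrayListOf(1)) is true because both are plain ArrayList instances at runtime, whereas classOf<String>() hands back String::class.java directly. That is the same trick O::class.java relies on in flatMap().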
Like flatMap() in Java, this flatMap() maps each element of a PCollection<I> to an Iterable<O> and flattens the results into a single PCollection<O>. So we can use it to split each line read from the text file into a list of words, and then flatten them out into a PCollection of words, like:
pipe.fromText(path = options.inputFile)
    .flatMap { it.split(Regex(TOKENIZER_PATTERN)).filter { it.isNotEmpty() }.toList() }
One-line code! So good 🙌
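The lambda passed to flatMap() is ordinary Kotlin, so it can be checked on its own. The sketch below assumes TOKENIZER_PATTERN is "[^\\p{L}]+" (any run of non-letter characters), which is the value used in Beam’s WordCount example; the article’s own constant may differ.

```kotlin
// Assumed value: a run of one or more non-letter characters separates words.
const val TOKENIZER_PATTERN = "[^\\p{L}]+"

// Splits one line into words. The filter matters: an empty line, or a line
// that starts with a separator, would otherwise yield an empty string.
fun tokenize(line: String): List<String> =
    line.split(Regex(TOKENIZER_PATTERN)).filter { it.isNotEmpty() }
```

For example, tokenize(", hello") returns a list containing only "hello", and tokenize("") returns an empty list rather than a list with one empty word.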
Let’s add map() method
map() method is also super similar (and just as simple), but it uses MapElements instead. For instance:
inline fun <I, reified O> PCollection<I>.map(
        name: String? = null,
        noinline transform: (I) -> O): PCollection<O> {
    val pc = this.apply(name ?: "map to ${O::class.simpleName}",
            MapElements.into(TypeDescriptor.of(O::class.java))
                    .via(transform))
    return pc.setCoder(NullableCoder.of(pc.coder))
}
Likewise, this map() method transforms a PCollection<I> into a PCollection<O>. We’ll use it as the last step to format each element of the PCollection, like:
pipe.map { "${it.key}: ${it.value}" }
In Kotlin, a lambda expression with a single parameter can refer to that parameter by the implicit name ‘it’.
How about countBy() method?
According to the pseudo code above, the countBy() method needs to group identical words together and count how many times each one occurs.
For this, we can use the built-in Count.perElement() transform, since the Apache Beam SDK already provides many useful aggregations as classes.
fun <I> PCollection<I>.countPerElement(
        name: String? = null): PCollection<KV<I, Long>> {
    return this.apply(name ?: "count per element",
            Count.perElement<I>())
            .setTypeDescriptor(object : TypeDescriptor<KV<I, Long>>() {})
}
We could also call apply() with Count.perElement() directly, as it’s simple enough to use as is (we’ll see how this looks a bit further down).
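To see what this step computes, here is an in-memory analogue built on the Kotlin standard library. It is only a sketch of the semantics, not how Beam implements the transform, and this countPerElement() on Iterable is a hypothetical helper for this illustration.

```kotlin
// In-memory analogue of Count.perElement(): each distinct element is mapped
// to the number of times it occurs. eachCount() yields Int counts, so they
// are widened to Long to mirror the KV<I, Long> shape used above.
fun <T> Iterable<T>.countPerElement(): Map<T, Long> =
    groupingBy { it }
        .eachCount()
        .mapValues { it.value.toLong() }
```

For listOf("kotlin", "beam", "kotlin") this yields kotlin mapped to 2 and beam mapped to 1, which is the per-word count the pipeline then formats.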
Wiring it all up together
So we’ve created Kotlin DSL to implement the pseudo code. Below is the working pipeline example using the DSL we’ve built so far:
val (pipe, options) = KPipe.from<WordCountOptions>(args)
pipe.fromText(path = options.inputFile)
    .flatMap { it.split(Regex(TOKENIZER_PATTERN)).filter { it.isNotEmpty() }.toList() }
    .countPerElement()
    .map { "${it.key}: ${it.value}" }
    .toText(filename = options.output)
pipe.run().waitUntilFinish()
Notice how similar it is to the pseudo code from the beginning of the article? Hopefully you have some ideas of your own to extend this for your pipelines in Kotlin and reduce overcomplicated code.
Mixing with Java API
Obviously, we can use all existing methods from Pipeline and PCollection mixed with extended methods. For instance, if we didn’t create fromText(), toText() and countPerElement() before, we could directly use apply() method with TextIO and Count classes, like:
pipe.apply(TextIO.read().from(options.getInputFile()))
    .flatMap { it.split(Regex(TOKENIZER_PATTERN)).filter { it.isNotEmpty() }.toList() }
    .apply(Count.perElement())
    .map { "${it.key}: ${it.value}" }
    .apply(TextIO.write().to(options.getOutput()))
...
Having said that, we should start with the minimum necessary methods added to Pipeline and PCollection.
Later on, we can consider using apply() method with ParDo or PTransform classes if we need to build complicated pipeline logic – like having side inputs/outputs in certain transformation steps.
Unit tests for pipeline
We can use the Create class to seed static data for the tests, and call the methods we’ve built before. After that, we can validate the final PCollection by using the PAssert utility.
val results = pipeline
    .apply(Create.of(
        "apache beam in kotlin",
        "this is kotlin",
        "awesome kotlin",
        ""))
    .flatMap { it.split(Regex(WordCount.TOKENIZER_PATTERN)).filter { it.isNotEmpty() }.toList() }
    .countPerElement()
    .map { "${it.key}: ${it.value}" }

PAssert.that(results).containsInAnyOrder(
    "this: 1", "apache: 1", "beam: 1", "is: 1", "kotlin: 3", "awesome: 1", "in: 1")

pipeline.run()
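The expected values in a test like this can be cross-checked without a Beam runner by running the same three steps over a plain list. The sketch below is an in-memory reference, not a replacement for the PAssert test; wordCountLines() is a hypothetical helper, and the tokenizer pattern is assumed to be "[^\\p{L}]+" as in Beam’s WordCount example.

```kotlin
// Assumed tokenizer: words are separated by runs of non-letter characters.
val WORD_SEPARATOR = Regex("[^\\p{L}]+")

// Same three steps as the pipeline: split into words, count per word, format.
fun wordCountLines(lines: List<String>): List<String> =
    lines
        .flatMap { line -> line.split(WORD_SEPARATOR).filter { it.isNotEmpty() } }
        .groupingBy { it }
        .eachCount()
        .map { (word, count) -> "$word: $count" }
```

Comparing wordCountLines(input).toSet() with the strings handed to containsInAnyOrder() is a quick way to confirm the expectations before running the pipeline test itself.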
Wrapping up!
Phew. We made it. Let’s recap what we’ve learned to do:
- Use Kotlin extensions to add new methods to Pipeline and PCollection
- Use Kotlin reified types with inline functions to get hold of a type’s class at runtime
- Build a DSL of component IOs and transformation steps for the pipeline code
- Mix things with the Java API.
Thanks for reading, and be sure to have a play around – you’re bound to find something useful!
Head here for the full code examples used throughout the article.