Dynamic Sink Routing in Apache Flink

Apache Flink makes it incredibly easy to take your streaming data and send it exactly where you want it to go. It supports tons of different sources and sinks out of the box, ready to plug and play to your ETL'ing heart's desire.

But what happens when you need to do something a little outside the box, such as handling dynamic routing? That's when you have to get a little creative and leverage some of Flink's building blocks to accomplish what you need. Let's take a look at how dynamic routing might be implemented.

The Challenge of Dynamic Routing

For example purposes, we'll focus the examples and problems discussed in this post on the KafkaSink, as it's probably the most ubiquitous of all the sinks and sources you'll encounter when building your Flink data pipelines.

Consider the following problem:

Company Foo has a globally configurable series of mappings for each of their customers that routes data to specific destinations used downstream. All of these downstream sinks are Kafka topics, and we need a pipeline that can honor these "routes" as well as handle new routes as they are dynamically introduced.

Well, you might dive into your IDE du jour and start building the sink to handle this:

KafkaSink.builder<...>()
  ...
  .setRecordSerializer(
    KafkaRecordSerializationSchema.builder<...>()
      ...
      .setTopic(yourTopic)
      .build()
  )
  .build()

You'll quickly realize that the sink itself only supports a single value for the topic you want to write your values to. This may be okay in scenarios where the cardinality of sinks to be written to is well-defined and small (i.e. just create a separate sink with an upstream filter for each one, as in the sketch below), but for dynamic scenarios it falls apart pretty quickly.
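
For the well-defined case, the filter-per-route approach might look something like this sketch (the events stream, the region property, and the pre-built usSink/euSink are hypothetical stand-ins):

// One pre-configured sink per known route, selected via upstream filters
events
  .filter { it.region == "us" }
  .sinkTo(usSink)

events
  .filter { it.region == "eu" }
  .sinkTo(euSink)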

Note: If the topic can be resolved from within the message itself, it is possible to set it dynamically by using a custom serialization schema that determines the appropriate topic during the serialization process.
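
As a minimal sketch of that idea, assuming a hypothetical ExampleClass with a destination property and a naive toString-based payload purely for illustration:

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
import org.apache.kafka.clients.producer.ProducerRecord

class TopicRoutingSerializationSchema : KafkaRecordSerializationSchema<ExampleClass> {
    override fun serialize(
        element: ExampleClass,
        context: KafkaRecordSerializationSchema.KafkaSinkContext,
        timestamp: Long?
    ): ProducerRecord<ByteArray, ByteArray> {
        // Resolve the destination topic from the message itself
        val topic = element.destination
        return ProducerRecord(topic, element.toString().toByteArray())
    }
}

You'd then pass an instance of this schema to setRecordSerializer(...) rather than building one with a fixed topic.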

So what's an easy way to accomplish this? There are a few approaches if you're working solely with Kafka (like the serializer trick above), but what if you have other types of sinks that you need to dynamically route to in various scenarios? For those, we'll need to explore the idea of using a SinkRouter.

Leveraging a Sink Router

When I initially ran into this problem, the process was a bit more involved, given that I was writing to multiple different types of sinks (Elasticsearch, JDBC, and Kafka). However, the overall solution is handled the same way in each case: by introducing a router that translates a given message to the appropriate sink(s).

The following example is specific to Kafka (note the SinkT : FlinkKafkaProducer<ElementT> bound; the wrapper delegates to SinkFunction methods, which the newer unified-API KafkaSink doesn't implement), but it could easily be further generalized to suit your needs:

import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.FunctionInitializationContext
import org.apache.flink.runtime.state.FunctionSnapshotContext
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import java.util.concurrent.ConcurrentHashMap

class DynamicKafkaSink<ElementT, RouteT, SinkT : FlinkKafkaProducer<ElementT>>(
    /**
     * Defines a router that maps an element to its corresponding sink instance
     * @param sinkRouter A [KafkaSinkRouter] that resolves an element of type [ElementT]
     * to a route of type [RouteT] (used as the key for caching sinks) and builds
     * the corresponding sink of type [SinkT]
     */
    private val sinkRouter: KafkaSinkRouter<ElementT, RouteT, SinkT>
) : RichSinkFunction<ElementT>(), CheckpointedFunction {

    // Store a reference to the sink for each route seen so far
    private val sinkRoutes: MutableMap<RouteT, SinkT> = ConcurrentHashMap()
    private lateinit var configuration: Configuration
    private lateinit var initializationContext: FunctionInitializationContext

    override fun open(parameters: Configuration) {
        configuration = parameters
    }

    override fun invoke(value: ElementT, context: SinkFunction.Context) {
        val route = sinkRouter.getRoute(value)
        var sink = sinkRoutes[route]
        if (sink == null) {
            // Build a new sink for this route and cache it for later use
            sink = sinkRouter.createSink(route, value)
            sink.runtimeContext = runtimeContext
            // Two-phase commit sinks like FlinkKafkaProducer keep checkpointed
            // state, so initialize it before opening (see initializeState below)
            sink.initializeState(initializationContext)
            sink.open(configuration)
            sinkRoutes[route] = sink
        }

        // If you needed to support multiple sinks, you'd iterate and
        // invoke each here
        sink.invoke(value, context)
    }

    override fun initializeState(context: FunctionInitializationContext) {
        // Hold onto the context so that sinks created lazily in invoke()
        // can initialize their own checkpointed state from it
        initializationContext = context
    }

    override fun snapshotState(context: FunctionSnapshotContext) {
        // Flush any pending writes for every cached sink as part of the checkpoint
        for (sink in sinkRoutes.values) {
            sink.snapshotState(context)
        }
    }

    override fun close() {
        for (sink in sinkRoutes.values) {
            sink.close()
        }
    }
}

You'll notice that the above sink accepts an instance of a KafkaSinkRouter, which again could be tailored to suit your needs. This class is responsible for taking an element and determining how it needs to be routed (e.g. determining the topic to write to):

/* Example interface */
interface KafkaSinkRouter<ElementT, RouteT, SinkT : FlinkKafkaProducer<ElementT>> {
    fun getRoute(element: ElementT): RouteT
    fun createSink(cacheKey: RouteT, element: ElementT): SinkT
}

/* Example implementation */
class ExampleKafkaRouter(
    private val parameters: ParameterTool
) : KafkaSinkRouter<ExampleClass, String, FlinkKafkaProducer<ExampleClass>>, Serializable {
    private val logger = LoggerFactory.getLogger(this::class.java)

    override fun createSink(cacheKey: String, element: ExampleClass): FlinkKafkaProducer<ExampleClass> {
        return buildSinkFromRoute(element)
    }

    override fun getRoute(element: ExampleClass): String {
        return getTopic(element)
    }

    private fun buildSinkFromRoute(element: ExampleClass): FlinkKafkaProducer<ExampleClass> {
        /* Construct a KafkaSink here that points to your target topic */
    }
    
    private fun getTopic(element: ExampleClass): String {
        /* Resolve your topic from your element here */
    }
}
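
To make the skeleton above a bit more concrete, here's one way buildSinkFromRoute might be filled in. This is just a sketch: it assumes the broker list arrives via the injected ParameterTool, uses a throwaway toString-based serializer, and sticks to at-least-once semantics since exactly-once across dynamically created producers requires extra care:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
import org.apache.kafka.clients.producer.ProducerRecord
import java.util.Properties

private fun buildSinkFromRoute(element: ExampleClass): FlinkKafkaProducer<ExampleClass> {
    val topic = getTopic(element)
    val props = Properties().apply {
        // Assumes the brokers were provided via the ParameterTool
        setProperty("bootstrap.servers", parameters.getRequired("bootstrap.servers"))
    }
    return FlinkKafkaProducer(
        topic,
        // Placeholder serializer; swap in your real serialization logic
        KafkaSerializationSchema<ExampleClass> { value, _ ->
            ProducerRecord<ByteArray, ByteArray>(topic, value.toString().toByteArray())
        },
        props,
        FlinkKafkaProducer.Semantic.AT_LEAST_ONCE
    )
}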

Bringing It All Together

Once you've created your dynamic sink as well as your router, you'd simply plug it into your pipeline as you would with any other sink:

val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment()

// Define your source
streamEnv
  .fromSource(...)
  ...
  .addSink(
    DynamicKafkaSink(
      ExampleKafkaRouter(...)
    )
  )
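
One caveat worth noting: the wrapper flushes its cached sinks in snapshotState, so make sure checkpointing is enabled on the environment for those flushes to happen on a regular cadence (the interval below is arbitrary):

// Checkpoint every 60 seconds so cached sinks regularly flush pending writes
streamEnv.enableCheckpointing(60_000)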

This type of wrapping approach, along with the associated router, will allow you to easily mix and match sinks and tailor message routing to suit your needs.