Simplifying Data Processing Pipeline Design

Tags: architecture, design, object-oriented-design

I have a data processing pipeline with well-defined stages and IO boundaries. I can choose a language to suit the needs of this design. It starts with an InputObject. At the end of each stage, there is some additional data derived from some or all of the results of previous steps and the InputObject. That is, StageN extends Stage and produces an immutable ResultsObjN, which extends Result. After the pipeline is complete, the final output I'm interested in is some subset of all results from each step:

InputObject
    |
    |
    v
 -----------
|  Stage 1  | <-- Tunable Parameters
 -----------
    |
    |
InputObject
ResultsObj1
    |
    |
    v
 -----------
|  Stage 2  | <-- Tunable Parameters
 -----------
    |
    |
InputObject
ResultsObj1
ResultsObj2
    |
    |
    v
   ...

Currently, I'm modeling each stage as a Stage object. I'll explain what I mean by "tunable" parameters soon.

  • Each Stage has its tunable parameters as attributes, exposed by getters and setters.
  • Each Stage is constructed with its input dependencies.
  • Each Stage has a method to compute its result based on the tunable parameters.

Here it is in almost-UML:

 --------------------------------------------------
| <<abstract>>                                     |
| Stage                                            |
|--------------------------------------------------|
|--------------------------------------------------|
| +computeResult() : Result                        |
 --------------------------------------------------

For instance, Stage3 computes ResultObj3 using the InputObject and the results of Stage2. There are 2 parameters that can be set to change the results.

 --------------------------------------------------
| Stage3                                           |
|--------------------------------------------------|
| -param1 : Int                                    |
| -param2 : Float                                  |
|--------------------------------------------------|
| +Stage3(raw : InputObject, corners : ResultObj2) |
| +get/setParam1()                                 |
| +get/setParam2()                                 |
| +computeResult() : ResultObj3                    |
 --------------------------------------------------
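To make the two diagrams above concrete, here is a minimal sketch in Python. The fields on ResultObj2 and ResultObj3 and the arithmetic inside compute_result() are placeholders I made up, since the question does not say what Stage3 actually computes; only the shape (abstract Stage, immutable results, two tunable parameters) comes from the diagrams.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass(frozen=True)
class Result:
    """Base class for immutable stage results."""


@dataclass(frozen=True)
class ResultObj2(Result):
    corners: tuple  # placeholder payload; the real fields are not given


@dataclass(frozen=True)
class ResultObj3(Result):
    value: float  # placeholder payload


class Stage(ABC):
    @abstractmethod
    def compute_result(self) -> Result:
        ...


class Stage3(Stage):
    def __init__(self, raw: str, corners: ResultObj2) -> None:
        # Input dependencies are fixed at construction time.
        self._raw = raw
        self._corners = corners
        # Tunable parameters; plain attributes stand in for getters/setters.
        self.param1: int = 1
        self.param2: float = 1.0

    def compute_result(self) -> ResultObj3:
        # Placeholder arithmetic standing in for the real computation.
        n = len(self._corners.corners)
        return ResultObj3(value=self.param1 * self.param2 * n)
```

Changing param1 or param2 and calling compute_result() again yields a new ResultObj3; the earlier result object is never mutated.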

I would like to reuse this processing pipeline pattern, with different stages in different quantities with different input dependencies. The tunable parameters may be tweaked by an automated optimizer or by a human using some UI. In either case, they follow the same feedback process. Here it is tuning stage 3:

ResultObj3 performStage(InputObject raw, ResultObj2 corners):

    Stage3 processor = new Stage3(raw, corners)
    ResultObj3 result = processor.computeResult()
    while (notAcceptable(result)):
        tuneParams(processor, result)  // tune to "more acceptable" params
        result = processor.computeResult()
    return result
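The same feedback loop can be written once and reused for any stage. The following is a rough Python sketch, not a definitive implementation: ScaleStage, raise_gain, and the max_rounds safety limit are hypothetical additions of mine, and the acceptance test is passed in as a callable because the question does not define notAcceptable().

```python
from typing import Any, Callable


def perform_stage(
    processor: Any,
    acceptable: Callable[[Any], bool],
    tune: Callable[[Any, Any], None],
    max_rounds: int = 100,
) -> Any:
    """Recompute until the result is acceptable, retuning between attempts."""
    result = processor.compute_result()
    rounds = 0
    while not acceptable(result):
        if rounds >= max_rounds:  # assumption: give up instead of looping forever
            raise RuntimeError("tuning did not converge")
        tune(processor, result)  # mutate the stage's tunable parameters
        result = processor.compute_result()
        rounds += 1
    return result


class ScaleStage:
    """Hypothetical stage with one tunable parameter, `gain`."""

    def __init__(self, x: float) -> None:
        self.x = x
        self.gain = 1

    def compute_result(self) -> float:
        return self.x * self.gain


def raise_gain(stage: ScaleStage, result: float) -> None:
    # Stand-in for an optimizer step or a human adjusting a slider.
    stage.gain += 1


stage = ScaleStage(3)
final = perform_stage(stage, acceptable=lambda r: r >= 10, tune=raise_gain)
```

Because the loop only needs compute_result() plus two callables, an automated optimizer and a UI can both plug in through the tune argument.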

I feel like the tuning process should be performed for each step by a queue executor, but I'm not yet sure how to accommodate the growing set of differently typed results, or the varying construction requirements of each stage.

Sample Pipeline

The point of this pipeline is to read a set of coordinates from a CSV (into String raw) and pass the string through a pipeline that will parse the string, cluster the points, and plot the clusters. The cluster data and plot images are extracted from the pipeline after it is run.

The string is parsed into a collection of Point objects at the "ExtractPoints" stage. That collection is clustered/segmented into a set of Cluster objects at the "ClusterPoints" stage. The Point and Cluster data is used to plot the points visually into a Plot image object at the "PlotClusters" stage.

QueuedPipeline pipeline = new QueuedPipeline()

String raw = readFile("points.csv")
pipeline.setInput(raw)

// addStage() takes the name of the stage, the stage runner, and the names of the stages it needs results from
pipeline.addStage("ExtractPoints", new StageTuner(PointExtractor), [])
pipeline.addStage("ClusterPoints", new StageTuner(PointClusterer), ["ExtractPoints"])
pipeline.addStage("PlotClusters", new StageTuner(ClusterPlotter), ["ExtractPoints", "ClusterPoints"])

// tune and execute each stage with the StageTuners
PipelineResultList results = pipeline.run()

// pipeline is done, collect the interesting results
Result pointClusters = results.getResult("ClusterPoints")
Result clusterPlot = results.getResult("PlotClusters")

saveClusterFile(pointClusters)
saveClusterPlotImage(clusterPlot)

  • The classes PointExtractor, PointClusterer, and ClusterPlotter all inherit from Stage.
  • Each StageTuner takes the class of the Stage to tune and implements the above feedback loop.
  • Variables pointClusters and clusterPlot are of Result type.
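For reference, a name-keyed pipeline like the one described could be sketched in Python roughly as follows. This is only an illustration under my own assumptions: stages are plain functions rather than StageTuner objects, tuning is left out, and the "clustering" and "plot" are toy stand-ins. The method names mirror the pseudocode above.

```python
from typing import Any, Callable, Dict, List, Tuple

StageFn = Callable[..., Any]


class QueuedPipeline:
    def __init__(self) -> None:
        self._input: Any = None
        self._stages: List[Tuple[str, StageFn, List[str]]] = []

    def set_input(self, raw: Any) -> None:
        self._input = raw

    def add_stage(self, name: str, run: StageFn, needs: List[str]) -> None:
        known = {stage_name for stage_name, _, _ in self._stages}
        missing = [n for n in needs if n not in known]
        if missing:
            raise ValueError(f"{name} depends on unknown stages: {missing}")
        self._stages.append((name, run, needs))

    def run(self) -> Dict[str, Any]:
        results: Dict[str, Any] = {}
        for name, run, needs in self._stages:
            # Each stage receives the raw input plus its named dependencies.
            results[name] = run(self._input, *(results[n] for n in needs))
        return results


def extract_points(raw):
    return [tuple(float(v) for v in line.split(","))
            for line in raw.splitlines() if line.strip()]


def cluster_points(raw, points):
    # Toy clustering by the sign of x; stands in for a real algorithm.
    return {"left": [p for p in points if p[0] < 0],
            "right": [p for p in points if p[0] >= 0]}


def plot_clusters(raw, points, clusters):
    # Toy "plot": a text summary instead of an image.
    return f"{len(points)} points in {len(clusters)} clusters"


pipeline = QueuedPipeline()
pipeline.set_input("-1,2\n3,4\n")
pipeline.add_stage("ExtractPoints", extract_points, [])
pipeline.add_stage("ClusterPoints", cluster_points, ["ExtractPoints"])
pipeline.add_stage("PlotClusters", plot_clusters, ["ExtractPoints", "ClusterPoints"])
results = pipeline.run()
```

Note that results are looked up by string name and come back untyped, so the caller has to know what each entry holds.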

Problem

But simply passing around classes for construction is a headache in some languages. Also, I'm not really sure how to generically construct and set parameters of each Stage subclass because of the different number/type of parameters – perhaps each Stage subclass needs a corresponding StageTuner subclass. Finally, casting Result objects to the type I need within the final two functions sounds dangerous. …but these are just symptoms of my real problem: This is all starting to get complex and fuzzy for what I expected to be a straightforward problem.

Am I defining my objects and behaviours badly? Or is this not as straightforward a problem as I thought? Something else entirely?

Best Answer

This sort of problem is well-suited to dynamic typing. That will give you the most straightforward solution, with the obvious trade-offs.

If you wish to use static typing, you'll have better luck if you don't centralize your pipeline construction. Your stages are the ones who know the most about the types of their dependencies and results, so you should give them the responsibility for managing that. I would start by making your example read something like this:

extractedPoints = new PointExtractor(raw)
clusterPoints   = new PointClusterer(extractedPoints)
plot            = new ClusterPlotter(extractedPoints, clusterPoints)

saveClusterFile(clusterPoints.result())
saveClusterPlotImage(plot.result())

Note this directly uses the type system, which makes for a very natural-looking and idiomatic syntax. Inside result() you would call common code to handle the tuning and memoize the result. Because each calculation calls result() on each dependency before performing its own calculation, your "queue" ends up getting naturally formed in the call stack, and memoization prevents multiple execution.
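A minimal sketch of that idea in Python with type hints might look like the following. It is one possible reading, not a definitive implementation: the _compute(), _acceptable(), and _tune() hooks, the runs counter, and the toy clustering and plotting are all assumptions of mine added for illustration.

```python
from abc import ABC, abstractmethod
from typing import Dict, Generic, List, Tuple, TypeVar

R = TypeVar("R")
Point = Tuple[float, float]


class Stage(ABC, Generic[R]):
    """Common code: tune if needed, then memoize the result."""

    def __init__(self) -> None:
        self._cache: List[R] = []  # empty until the first result() call
        self.runs = 0              # counts real computations, for the demo

    def result(self) -> R:
        if not self._cache:
            value = self._compute()
            self.runs += 1
            while not self._acceptable(value):
                self._tune(value)
                value = self._compute()
                self.runs += 1
            self._cache.append(value)
        return self._cache[0]

    @abstractmethod
    def _compute(self) -> R: ...

    def _acceptable(self, value: R) -> bool:
        return True  # default: accept the first result

    def _tune(self, value: R) -> None:
        pass  # default: nothing to tune


class PointExtractor(Stage[List[Point]]):
    def __init__(self, raw: str) -> None:
        super().__init__()
        self._raw = raw

    def _compute(self) -> List[Point]:
        rows = (line.split(",") for line in self._raw.splitlines() if line.strip())
        return [(float(x), float(y)) for x, y in rows]


class PointClusterer(Stage[Dict[str, List[Point]]]):
    def __init__(self, points: Stage[List[Point]]) -> None:
        super().__init__()
        self._points = points

    def _compute(self) -> Dict[str, List[Point]]:
        pts = self._points.result()  # pulls the dependency on demand
        # Toy clustering by the sign of x; stands in for a real algorithm.
        return {"left": [p for p in pts if p[0] < 0],
                "right": [p for p in pts if p[0] >= 0]}


class ClusterPlotter(Stage[str]):
    def __init__(self, points: Stage[List[Point]],
                 clusters: Stage[Dict[str, List[Point]]]) -> None:
        super().__init__()
        self._points, self._clusters = points, clusters

    def _compute(self) -> str:
        # Toy "plot": a text summary instead of an image.
        return (f"{len(self._points.result())} points in "
                f"{len(self._clusters.result())} clusters")


extracted = PointExtractor("-1,2\n3,4\n")
clusters = PointClusterer(extracted)
plot = ClusterPlotter(extracted, clusters)
summary = plot.result()
```

Here each constructor declares exactly the dependency types it needs, so no lookup by name or casting is involved, and the extractor runs only once even though two later stages ask for its result.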
