Converging on Declarative Data Materialization
Written by Alex Rasmussen on August 31, 2022
Benn Stancil wrote a post on his blog a few weeks ago called “Down with the DAG”, in which he laments the current state of scheduling in the (ugh) Modern Data Stack. The entire post - and his Substack generally - is worth a read, but this excerpt sums up his main point:
I don’t actually want to think about when to run jobs, how to define DAGs, or to manually orchestrate anything. I just want my data to be fresh—where I can declare what fresh means—and to know when it’s not.
I spend way too much time thinking about data processing systems, and I’ve worked with more than my fair share of dysfunctional workflows, so Benn’s article really resonated with me. It also got me thinking: what would that successor system to the workflow scheduler actually look like, and what would be the most challenging parts of building it?
This article doesn’t propose anything novel. It’s more of an attempt to describe the problem from a different angle and articulate a potential solution (and its corresponding challenges) from there.
Defining the Problem
Workflow schedulers execute jobs, which are typically represented as directed acyclic graphs (DAGs) of tasks. Edges in a job’s DAG encode that job’s execution dependencies: a task’s execution dependencies must complete successfully before that task itself can run. Jobs can run on a schedule, or they can be run in response to a triggering event.
Individual tasks are imperative, and are often assumed to be idempotent. The scheduler handles running tasks in execution-dependent order and retrying tasks if they fail, and it’s up to the tasks themselves to do the rest.
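To make the division of labor concrete, here's a toy sketch (not any real scheduler's API) of what a workflow scheduler does: run tasks in execution-dependent order and retry them on failure, leaving everything else to the tasks themselves.

```python
# Toy illustration of a workflow scheduler's core loop: topologically
# order the tasks, run each one, and retry on failure. All names here
# are hypothetical.
from graphlib import TopologicalSorter

def run_job(tasks, deps, max_retries=2):
    """tasks: name -> callable; deps: name -> set of upstream task names."""
    completed = []
    for name in TopologicalSorter(deps).static_order():
        for attempt in range(max_retries + 1):
            try:
                tasks[name]()  # tasks are assumed idempotent, so retrying is safe
                break
            except Exception:
                if attempt == max_retries:
                    raise
        completed.append(name)
    return completed

# "extract" must finish before "transform", which must finish before "load".
order = run_job(
    tasks={"extract": lambda: None, "transform": lambda: None, "load": lambda: None},
    deps={"extract": set(), "transform": {"extract"}, "load": {"transform"}},
)
```

Note that the scheduler knows nothing about the data the tasks touch; it only knows the execution dependencies it was handed.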
Job DAGs aren’t the only kinds of DAGs that data engineers are used to seeing. Let’s consider another kind of DAG: the DAG contained within dbt’s manifest. I’m using dbt as an example here because it’s popular, and also because some equivalent of dbt’s manifest exists at some level in pretty much every equivalent “maintain a bunch of tables in a data warehouse” system that I’ve ever seen.
The nodes in dbt’s manifest DAG are “models” (usually, but not always, declarative specifications of tables written in SQL), and the edges denote data dependencies between models. In my view, dbt’s key innovation is that these data dependencies are declared inside the definitions of the models themselves; as long as you use ref() to refer to other models instead of referring to them by name, you get data dependency information for free.
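A rough sketch of the idea (dbt actually parses the Jinja in each model; a regex stands in for that here): because models call ref() rather than naming tables directly, the data-dependency DAG can be read straight out of the model bodies.

```python
import re

# Sketch only: extract ref('...') calls from each model's SQL to recover
# the data-dependency DAG. Real dbt parses Jinja; the regex is a stand-in.
def model_deps(models):
    """models: name -> SQL text. Returns name -> set of referenced models."""
    return {
        name: set(re.findall(r"ref\('([^']+)'\)", sql))
        for name, sql in models.items()
    }

deps = model_deps({
    "orders": "select * from {{ ref('stg_orders') }}",
    "stg_orders": "select * from raw.orders",
})
```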
By default, dbt writes its models to the data warehouse as views, and leaves all decision-making about how data should be processed to the data warehouse. If we were able to stay in this default mode where everything is a view forever, we wouldn’t really need to worry about orchestration; all our models would be constantly up-to-date as long as any source fact data kept coming in. Unfortunately, most organizations can’t stay in that default mode for long.
Materialization is Hard
In almost all data warehouses, views are just virtual tables; if a query references a view, the view’s definition is inlined into the query at runtime. As the volume of data being queried and the number and interdependence of views increases, views can become prohibitively slow and expensive to query. To make querying frequently-accessed views faster, we materialize them periodically, pre-computing and storing their results. Once we introduce materialization (and, consequently, add caching), keeping the data up-to-date gets a lot more complicated and data dependencies between models suddenly matter a lot more.
To ensure that our materialized model is correct, we have to make sure that we wait for its data dependencies to be up-to-date (in dbt’s parlance, “fresh”) before we read from them. If we don’t wait long enough, our materialization could be derived from incomplete data. If we wait too long, we may miss our recency SLOs or delay the materialization of downstream models. In practice, this can be a difficult tightrope to walk, especially if models have varying (and perhaps even conflicting!) SLOs.
In addition to figuring out when to materialize the model, we also have to figure out how to materialize it. If the resulting table is small enough, we may be able to afford to completely re-materialize it every time we need to freshen it. If the table is huge, we may need to materialize it incrementally, one day (or minute, or week) at a time, to make it fast enough to meet our SLOs. We may even need to go back and re-materialize some or all of a model’s data.
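The when/how decision above can be sketched as a simple planning function. Everything here is an assumed policy for illustration: the row-count threshold, the daily partitioning, and the seven-day lookback window are all hypothetical.

```python
from datetime import date, timedelta

# Hypothetical materialization policy: small tables get fully rebuilt,
# large ones are freshened one daily partition at a time.
FULL_REBUILD_LIMIT = 1_000_000  # assumed row-count threshold

def plan_materialization(row_count, existing_partitions, today):
    if row_count < FULL_REBUILD_LIMIT:
        return [("full_refresh", None)]
    # Incremental: only (re)build the recent daily partitions we don't have.
    missing = [
        d for d in (today - timedelta(days=n) for n in range(7))
        if d not in existing_partitions
    ]
    return [("insert_partition", d) for d in sorted(missing)]

plan = plan_materialization(
    row_count=50_000_000,
    existing_partitions={date(2022, 8, 29), date(2022, 8, 30)},
    today=date(2022, 8, 31),
)
```

Dropping a bad partition from `existing_partitions` is enough to get it rebuilt on the next pass, which foreshadows the convergence idea below.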
Square Peg, Meet Round Hole
If you’ve already got a workflow scheduler, it’s very tempting to cast the problem of materializing models as a scheduling problem. After all, the problem sounds like a straightforward set of imperative tasks: long-poll sources until they become fresh, then freshen models in data-dependent order, and repeat this process as frequently as you need to meet your objectives. Unfortunately, models are rarely that well-behaved. Models get created and destroyed all the time. SLOs change as organizational needs change. A model may be data-dependent on only certain partitions of another model. You may decide to fully re-materialize a model every day when it’s small, and move to materializing it incrementally when it gets too large and unwieldy. Different models may have different definitions of what constitutes “fresh” for the same source. Backfills, data deletion requests, and run-of-the-mill bugs in your queries may necessitate re-materializing years’ worth of data. The more complex and dynamic the situation becomes, the more shoehorning you have to do to fit all that dynamism into the workflow scheduler’s imperative, scheduled, task-oriented abstraction. All too often, the result is a giant shambling mess of interdependent job DAGs that are tedious to operate, difficult to reason about, and dangerous to change.
These are the kinds of situations that Benn laments in his post - the tangled mess of jobs that results from forcing a declarative, data-centric abstraction into an imperative, task-centric runtime.
In a perfect world, we’d tell the system what models we have, when we need them, and maybe what queries we’re running on them, and let some kind of data auto-materialization system do the grunt work of turning those models into tables in our warehouse. Instead, we’re left telling a workflow scheduler both what we want and how to get it, and having to do a lot of undifferentiated toil when either of those things change.
Where Do We Go From Here?
Workflow schedulers are a relatively old and entrenched part of many organizations’ infrastructure, and it’s going to take a major leap forward in capability or efficiency to convince engineering teams that adding yet another piece of infrastructure for automated data materialization is a good idea. In my view, any successor system needs to be able to do two things really well in order to provide that value. First, the system needs to be able to figure out how to make materialization happen both efficiently and cost-effectively, leaving data teams to focus on the models. Second, it needs to be able to change how materialization happens in a controlled and sensible way as circumstances change.
Figuring out how to materialize a given set of models sounds like something you might want to use a cost-based query optimizer for, but there’s a bit of an impedance mismatch here too. Query optimizers are good at choosing a pretty good query plan from an enormous space of potential plans, based mainly on the data’s statistical properties, its physical layout, and a bunch of heuristics and simplifying assumptions. By contrast, there really aren’t that many ways to materialize a single table, and users typically have some inkling of the way a model should be partitioned ahead of time - they just don’t want to hand-hold the system through the process of generating those partitions. Query optimizers come with cost models of varying sophistication for every operation they know how to perform. Nothing stops you from picking a materialization strategy for incrementally-materialized models based on some kind of a cost model, but that feels like more of a nice-to-have feature than a hard requirement. The user is mainly concerned with two things: each materialization should happen within its SLO, and it should happen in the correct data-dependent order with respect to all other materializations.
In the abstract, then, the job of this hypothetical data auto-materialization system is to continually compare the state of the warehouse against the desired state of the models and their SLOs and make modifications to the warehouse until the models are materialized and the SLOs remain satisfied. Stated in this way, the problem isn’t fundamentally an orchestration, scheduling, or even optimization problem, it’s a convergence problem.
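Stripped to its essentials, that convergence loop looks something like the following. This is an abstract sketch, not any real system's API; the version numbers stand in for whatever notion of "up-to-date" the system uses.

```python
# Abstract convergence step: diff the desired state of the models against
# the actual state of the warehouse and emit the actions needed to close
# the gap. All names are hypothetical.
def converge(desired, actual):
    """desired/actual: model name -> materialized version."""
    actions = []
    for model, version in desired.items():
        if actual.get(model) != version:
            actions.append(("materialize", model))  # create or refresh
    for model in actual:
        if model not in desired:
            actions.append(("drop", model))  # model was deleted upstream
    return actions

# One pass: the warehouse is behind on `orders` and still holds a model
# that no longer exists in the project.
actions = converge(
    desired={"orders": 2, "customers": 1},
    actual={"orders": 1, "customers": 1, "old_model": 1},
)
```

Run this in a loop and the warehouse converges on the declared models; when the two states already match, the loop does nothing.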
Converging on a Solution
The software world is littered with solutions to convergence problems. Infrastructure-as-Code tools like Terraform and CloudFormation (and, to a lesser extent, Chef, Puppet, Ansible, etc.) are convergence-based tools. They read a representation of the desired end state, compare that desired state to the current state of the infrastructure, and construct a plan to converge the current state with the desired one. Kubernetes is also a convergence-based tool: its controller-manager compares the set of declarative resource specifications with the current state of the cluster, and makes changes to the cluster (adding and removing cluster nodes, starting and stopping containers, etc.) until all the desired resources are running.
In a convergence-based world, a “materialization controller” could periodically evaluate the state of all models, inspect their materialized counterparts, and issue queries to the warehouse to partially or completely (re-)materialize them as necessary. If a few of a model’s partitions need to be recomputed, the user could simply drop the impacted partitions and allow the controller to notice their absence and re-populate them. As new models get added to an organization’s dbt project, they’re noticed by the controller and then simply appear in the warehouse. The controller can handle prioritizing queries based on the warehouse’s current load and the models’ SLOs, automatically shifting costly materializations that can endure some amount of delay to times when demand on the warehouse is low. All the incidental complexity that was once the domain of human operators disappears, leaving analytics engineers to spend more of their time on analytics and less of their time on engineering.
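The SLO-aware prioritization mentioned above might look roughly like this. The slack calculation and the 60-minute deferral threshold are assumptions for illustration, not a real controller's policy.

```python
# Hypothetical SLO-aware prioritization: run the materializations with the
# least slack first, and defer costly ones that can still safely wait when
# the warehouse is busy. Times are in minutes.
def prioritize(pending, now, warehouse_busy):
    """pending: list of (model, slo_deadline, est_duration) tuples."""
    runnable, deferred = [], []
    for model, deadline, est in sorted(pending, key=lambda p: p[1] - p[2]):
        slack = deadline - now - est  # minutes to spare before an SLO breach
        if warehouse_busy and slack > 60:
            deferred.append(model)  # can wait for a quieter period
        else:
            runnable.append(model)
    return runnable, deferred

runnable, deferred = prioritize(
    pending=[("big_rollup", 600, 90), ("exec_dashboard", 30, 10)],
    now=0,
    warehouse_busy=True,
)
```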
This appears to be an idea whose time has come. Dagster is already taking a crack at this idea with their notion of software-defined assets. What I’m calling “convergence” here, they’re calling “reconciliation”, but we’re both essentially hitting the same high points. I haven’t played with Dagster’s implementation enough to form an opinion on it yet, but what I’ve seen so far looks promising. I’ll admit that the idea that assets are defined as Python functions whose side-effect-freedom is assumed rather than enforced makes me a little itchy, though.
If the benefits of this approach are so apparent and work is already underway, you might wonder why we’re not swimming in competing materialization controller implementations. I think the answer is that building an honest-to-goodness materialization controller is going to be hard.
The Trouble with Convergence
Any implementer of a materialization controller has their work cut out for them. The industry has enough collective production experience and battle scars with convergence-based systems to know that they’re not as easy to build or operate as they might first appear. Fortunately, we’re in such an early stage that we can and should be bringing all the lessons that we’ve learned from prior systems into the development of any future materialization controller. In particular, we need to be wary of three big problems with building and operating convergence-based systems: explainability, over-eagerness, and drift.
The first big problem with convergence-based systems is explainability. In convergence-based systems, everything looks like magic when things are going well, but when something goes wrong it can be difficult to understand why. Detailed, machine- and human-readable logs would go a long way toward making problems easier to diagnose, and could also serve as an important piece of rich lineage information. To be truly useful, though, that information needs to be built into the system as a first-order architectural concern - bolting it onto the side once the system is already built won’t cut it.
The second big problem with convergence-based systems is that it’s easy for an over-eager controller to spend a lot of money and/or resources by either doing too much or doing it too fast. The controller will need some guardrails to prevent it from rebuilding the universe from scratch in reaction to one misconfigured model. If you’ve used either Kubernetes or Terraform in production, you can probably recall a time when your blood pressure spiked as a seemingly minor change caused the controller to want to change everything. In a similar vein, operators will need to be able to tell the controller to stop trying to converge parts of the environment in an emergency or other exceptional situation.
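Both guardrails described above (capping blast radius and letting operators pause convergence) are cheap to sketch. The threshold, the paused-model set, and the gating behavior here are all hypothetical policy choices, loosely analogous to reviewing a `terraform plan` before applying it.

```python
# Hypothetical guardrails for an over-eager controller: refuse to apply a
# convergence plan that touches too much at once, and skip models an
# operator has explicitly paused. Policy values are assumptions.
MAX_ACTIONS_PER_CYCLE = 10   # blast-radius cap per convergence pass
PAUSED_MODELS = {"orders"}   # operator-declared "hands off" set

def apply_plan(actions):
    """actions: list of (verb, model) tuples produced by a convergence pass."""
    actions = [a for a in actions if a[1] not in PAUSED_MODELS]
    if len(actions) > MAX_ACTIONS_PER_CYCLE:
        raise RuntimeError(
            f"plan touches {len(actions)} models; requires human approval"
        )
    return actions

safe = apply_plan([("materialize", "customers")])
```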
The third and, in my view, thorniest problem with convergence-based systems in this context is drift. If the controller is the only thing changing the environment, it can keep a pretty warm cache of the environment’s state that it can use to work more efficiently and thereby support larger environments. In reality, the controller is rarely the only thing manipulating the environment and its state cache can get stale very quickly. This is doubly true in the current data ecosystem, where dozens of competing products could all be working on different parts of your data warehouse at the same time.
There are a few ways you might be able to keep the controller’s cache relatively warm in the presence of drift. The controller could hook into some kind of notification system so that it’s aware of environmental changes quickly. Ultimately, you’d probably want to assume some amount of staleness and limit the controller to operations that are “safe” in the presence of that staleness. Either way, these mitigations introduce a lot of additional complexity. Luckily for us, data warehouses are good at exposing what’s being done to them (through query logs) and at reasonably isolating competing modifications from one another (with ACID transactions), which will hopefully make things easier.
I, For One, Welcome our Convergent Overlords
One of the enduring truths of software engineering, and data engineering in particular, is that we’re constantly moving the goal-posts. New capabilities unlock new use-cases, which increases demand, which in turn puts additional pressure on our tools. Eventually, that pressure builds to the point that some of those tools start to break; we’re currently seeing that happen with workflow schedulers. I’m hopeful that we’re gaining clarity on not just what’s breaking, but why it’s breaking, and that we’ll use that clarity and a healthy dose of past experience from adjacent parts of software engineering to build a new generation of even better tools.