GCP – Accelerating CDC insights with Dataflow and BigQuery
Data-driven companies are increasingly infusing real-time data into their applications and user experiences, especially with the advent of new technologies that make data capture more on-demand – and at a higher volume – than ever before. Change data capture (CDC) is a long-standing mechanism that data practitioners use to connect their transactional systems with their analytical warehouses. Historically, customers have had to manage temp tables and schedule merge statements to keep their systems in sync, which can be a lot of work and prone to failures. To solve these problems, BigQuery includes native CDC support, which reduces complexity and makes results available to analysts immediately, accelerating time to insight.
In this post, we will cover how to use BigQuery’s new CDC capability in Dataflow along with the new Dataflow at-least-once streaming mode to drastically simplify your CDC pipeline and reduce costs.
Motivation
No single customer’s CDC requirements are the same as another’s. Some customers have data in their relational database that is perfectly aligned with their target systems. However, the reality is usually a little more messy, with various factors playing into an organization’s CDC requirements. At Google Cloud, we offer a comprehensive range of options for CDC workloads depending on your use case.
For CDC workloads that require an extract-load-transform (ELT) pattern, where mutations are handled in the destination, Datastream offers the most seamless experience. Datastream can read from MySQL, Oracle, or PostgreSQL databases and write to BigQuery or Cloud Storage. Datastream also integrates with Dataflow templates to support writes to Spanner, MySQL, or PostgreSQL.
For CDC workloads that require an extract-transform-load (ETL) pattern, where enrichment, transformations, or analytics are performed before writing to the data warehouse, we recommend using Pub/Sub for ingestion, Dataflow for transformation, and BigQuery for data warehousing.
In both scenarios, we recommend streaming data via the BigQuery Storage Write API, as it offers the most scalable and highest-performance way of writing data to BigQuery. We have published a guide to help navigate different Storage Write API configurations based on your requirements, and a deep-dive on how to use the Storage Write API to stream table updates with change data capture.
Use cases
There are many use cases where you might consider using Dataflow for your CDC pipeline:
You are computing statistics on a subset of the input data.
You are using the CDC pipeline as a triggering mechanism (e.g. audit all DELETEs in the source system, trigger a notification whenever an UPDATE is made to an order status).
You are detecting anomalies that prompt alerts in your monitoring systems.
You are joining a star schema relational database.
You are writing to one or more destinations that are not your data warehouse (e.g., Pub/Sub, Kafka, Elasticsearch, Spanner, or another relational database).
You are extracting the single latest record for a single key to appear in the analytics workload (i.e., you want to avoid running a complex query to find the latest value).
You are combining multiple streams into a single pipeline to reduce overall cost (see this post for a walkthrough).
This list is a starting point and not intended to be comprehensive. In summary, Apache Beam’s connectors and expressiveness coupled with Dataflow’s serverless execution model can help you serve most, if not all, CDC use cases that you can imagine.
Technical details
Let’s take a look at what you need to use BigQuery CDC functionality with Dataflow’s BigQueryIO connector. We will use examples from a GitHub repository that uses Spanner’s Change Streams as the source of the CDC data.
If you have an existing pipeline that already uses the BigQueryIO connector, it will require only small modifications. But first, you need to make sure that the destination table follows the CDC prerequisites outlined in the BigQuery documentation, meaning the table needs to have a primary key defined. It’s recommended that your table is also clustered on the same set of columns as the primary key.
Once the destination table is ready, the data can be streamed using the STORAGE_API_AT_LEAST_ONCE method. Here’s what a typical configuration looks like:
    PCollection<OrderMutation> input = …;
    input.apply("Save To BigQuery", BigQueryIO
        .<OrderMutation>write()
        .to(tableReference)
        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND)
        .withMethod(Write.Method.STORAGE_API_AT_LEAST_ONCE)
        .withFormatFunction(new OrderMutationToTableRow())
        .withRowMutationInformationFn(
            orderMutation -> orderMutation.getMutationInformation()));

Let’s walk through each of the methods:
“withCreateDisposition” must always be CREATE_NEVER because currently the connector doesn’t support creating a new table with the primary key constraint if CREATE_IF_NEEDED disposition is used.
“withWriteDisposition” must always be WRITE_APPEND or WRITE_EMPTY. WRITE_TRUNCATE can’t be used because CDC-enabled tables don’t currently allow DML statements.
“withMethod” must always be STORAGE_API_AT_LEAST_ONCE. This is a requirement for streaming CDC upserts using the underlying BigQuery Storage Write API.
“withFormatFunction” must take in an element of the input PCollection and convert this element to a TableRow to be persisted.
“withRowMutationInformationFn” must take in an element of the input PCollection and return a RowMutationInformation object. This object has two attributes: mutation type and sequence number. Mutation type can be UPSERT or DELETE. The sequence number is provided to specify the ordering of the mutation relative to other mutations.
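To make the last method more concrete, here is a minimal sketch of the kind of mapping a row mutation function might perform. It deliberately avoids Beam dependencies: MutationInfo is a local stand-in for the connector's RowMutationInformation, and SourceChange, MutationInfoMapper, and the microsecond-based sequence number are all hypothetical names and choices made for illustration, not the repository's actual code.

```java
import java.time.Instant;

// Local stand-in for the connector's RowMutationInformation (hypothetical, illustration only).
final class MutationInfo {
    enum MutationType { UPSERT, DELETE }

    final MutationType type;
    final long sequenceNumber;

    MutationInfo(MutationType type, long sequenceNumber) {
        this.type = type;
        this.sequenceNumber = sequenceNumber;
    }
}

final class MutationInfoMapper {
    // Hypothetical source-side change kinds; a real source may name these differently.
    enum SourceChange { INSERT, UPDATE, DELETE }

    // Assumption: microseconds since the epoch are fine-grained enough to order commits.
    static long toSequenceNumber(Instant commitTimestamp) {
        return Math.addExact(
            Math.multiplyExact(commitTimestamp.getEpochSecond(), 1_000_000L),
            commitTimestamp.getNano() / 1_000L);
    }

    // INSERT and UPDATE both collapse to UPSERT; DELETE stays DELETE.
    static MutationInfo toMutationInfo(SourceChange change, Instant commitTimestamp) {
        MutationInfo.MutationType type = change == SourceChange.DELETE
            ? MutationInfo.MutationType.DELETE
            : MutationInfo.MutationType.UPSERT;
        return new MutationInfo(type, toSequenceNumber(commitTimestamp));
    }
}
```

In a real pipeline, the equivalent logic would sit behind a method such as getMutationInformation() on the element type and return the connector's own object instead of this stand-in.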
The semantics of the connector are fairly intuitive. Each incoming element should provide two types of information: the actual data fields for a particular data row in the table and the information about whether this is an upsert or delete operation for that row. The row identification is determined by the primary keys in the incoming data. The rows are inserted into BigQuery using Storage Write API and BigQuery applies the CDC logic to only return a single row per primary key.
The sequence number in the RowMutationInformation object is very important. To achieve maximum throughput, Apache Beam pipelines don’t guarantee ordered processing. And for high-volume pipelines with frequent updates for a particular key, it’s possible that the row inserts for a particular primary key may arrive out of sequence. Thus the sequence number is used by BigQuery to determine the latest change, and retains the record with the higher sequence number, regardless of the chronological insertion order. In our case, we used Spanner’s commit timestamp to generate the sequence number.
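The effect of the sequence number can be illustrated with a small in-memory model. This is only a toy sketch of "the highest sequence number wins per primary key" and not how BigQuery implements CDC internally. The class name CdcTableModel, the string-valued rows, and the tie-breaking rule (equal sequence numbers keep the earlier arrival) are assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Toy model: for each primary key, keep only the mutation with the highest sequence number.
final class CdcTableModel {
    private static final class Versioned {
        final long sequenceNumber;
        final boolean deleted;
        final String value;

        Versioned(long sequenceNumber, boolean deleted, String value) {
            this.sequenceNumber = sequenceNumber;
            this.deleted = deleted;
            this.value = value;
        }
    }

    private final Map<String, Versioned> latestByKey = new HashMap<>();

    void upsert(String key, String value, long sequenceNumber) {
        apply(key, new Versioned(sequenceNumber, false, value));
    }

    void delete(String key, long sequenceNumber) {
        apply(key, new Versioned(sequenceNumber, true, null));
    }

    // Arrival order is irrelevant: a mutation replaces the stored one only if its
    // sequence number is strictly higher (ties keep the earlier arrival, by assumption).
    private void apply(String key, Versioned incoming) {
        latestByKey.merge(key, incoming,
            (current, next) -> next.sequenceNumber > current.sequenceNumber ? next : current);
    }

    // Returns the visible row for the key, or empty if it is absent or deleted.
    Optional<String> read(String key) {
        Versioned latest = latestByKey.get(key);
        return (latest == null || latest.deleted) ? Optional.empty() : Optional.of(latest.value);
    }
}
```

For example, if an upsert with sequence number 20 arrives before a stale upsert with sequence number 10 for the same key, read still returns the value written with sequence number 20.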
There are additional nuances related to the data fields. Make sure that the source can provide a complete row of data in the upsert mode. This is because the connector currently doesn’t support partial upsert semantics where only the updated fields are provided. In our Spanner case we achieved this by using the “NEW_ROW” capture type. For the delete mode, the fields that constitute the primary key must be provided. The rest of the fields can be null, except the ones that are required fields in the database schema. You can set them to any values that will be valid for the defined schema, like we did here.
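The delete-mode rule above (primary key fields populated, required fields given any schema-valid value, everything else null) could be sketched as follows. This is a simplified illustration that uses a plain map instead of a TableRow; DeleteRowBuilder, Column, and the placeholder values are hypothetical and are not taken from the linked repository.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class DeleteRowBuilder {
    // Hypothetical, simplified column description.
    static final class Column {
        final String name;
        final boolean primaryKey;
        final boolean required;
        final Object placeholder; // any schema-valid value, used only for required non-key columns

        Column(String name, boolean primaryKey, boolean required, Object placeholder) {
            this.name = name;
            this.primaryKey = primaryKey;
            this.required = required;
            this.placeholder = placeholder;
        }
    }

    // Builds the row to send with a DELETE mutation.
    static Map<String, Object> buildDeleteRow(List<Column> schema, Map<String, Object> keyValues) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (Column column : schema) {
            if (column.primaryKey) {
                // Primary key fields must always carry the real key values.
                Object value = keyValues.get(column.name);
                if (value == null) {
                    throw new IllegalArgumentException("Missing primary key value: " + column.name);
                }
                row.put(column.name, value);
            } else if (column.required) {
                // Required fields cannot be null, so fill in a placeholder.
                row.put(column.name, column.placeholder);
            } else {
                // Nullable fields can simply be left null.
                row.put(column.name, null);
            }
        }
        return row;
    }
}
```

Because the row is being deleted, the placeholder values are never visible to readers; they only need to satisfy the table schema.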
Note that when you use BigQueryIO’s STORAGE_API_AT_LEAST_ONCE method in a pipeline with Dataflow Streaming Engine enabled, you can consider running the pipeline in at-least-once streaming mode, provided no other transforms require exactly-once processing semantics. This can result in a faster and cheaper pipeline.
Conclusion
We welcome you to try running CDC pipelines with Dataflow! We have many resources to help you get started:
Our Dataflow CDC templates (MySQL, Spanner, or Bigtable), which can get you up and running without having to write code
Our Beam documentation can help you if your use case requires business logic that is not met with Dataflow templates. We have a wide range of resources to help you get started on Beam.
Our BigQuery docs contain best practices for writing CDC pipelines to the Storage Write API
We look forward to seeing what you build on Dataflow!