Migrating Data From Many Sources With Change Data Capture on GCP (Part 1)
CDC
Change Data Capture is a software process that identifies and tracks changes to data in a database.
CDC provides real-time or near-real-time movement of data by moving and processing data continuously as new database events occur.
In high-velocity data environments where time-sensitive decisions are made, Change Data Capture is an excellent fit for achieving low-latency, reliable, and scalable data replication.
CDC for ETL
ETL (extract, transform, load) is a data integration process in which data is extracted from various sources and delivered to a data warehouse, database, or data lake. Data can be extracted using database queries (batch-based) or Change Data Capture (near-real-time).
During the transformation phase, data is processed and converted into the appropriate format for the target destination. While legacy ETL has a slow transformation step, modern ETL platforms replace disk-based processing with in-memory processing to allow for real-time data processing, enrichment, and analysis. This is the purpose of using CDC in ETL. The final step of ETL involves loading data into the target destination.
How
There are several common Change Data Capture methods that you can implement, depending on your application requirements and tolerance for performance overhead: audit columns, table deltas, trigger-based CDC, log-based CDC, and others. Since this blog is about data migration, I will focus on log-based Change Data Capture.
Tools for Change Data Capture
I compare two tools, Datastream and Debezium, that I consider suitable for my data migration purposes. Debezium is a widely used open-source platform for change data capture, and Datastream is a CDC service provided by GCP.
Datastream is a convenient solution if you already use GCP and want CDC. Its limitation is that it only supports a small number of databases at the moment.
Debezium is open source, and its development community has added support for many different types of databases. If you want to use it, however, you need to spend the effort to install and maintain it on your own system.
Because I have many different data sources (including RDBMS and NoSQL), I use both tools, each where it fits best. For data sources already supported by Datastream, we will use that service; for the rest, we will use Debezium.
The data migration strategy is as follows: for each database/table, we first take a snapshot (using Spark over JDBC) to get the data from the beginning up to time A, and then, from time A onwards, we use CDC to keep the data up to date.
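The snapshot-then-CDC strategy can be sketched in a few lines of Python. This is only an illustration of the idea, not the pipeline used in this blog: the event shape (`ts`, `op`, `key`, `row`) and the function name `apply_cdc` are hypothetical, and the real snapshot would come from Spark over JDBC rather than an in-memory dict.

```python
from typing import Any, Dict, Iterable

Row = Dict[str, Any]


def apply_cdc(snapshot: Dict[int, Row], events: Iterable[dict], time_a: int) -> Dict[int, Row]:
    """Replay CDC events newer than the snapshot cut-off (time A) onto the snapshot."""
    table = dict(snapshot)  # copy so the original snapshot is left untouched
    for event in sorted(events, key=lambda e: e["ts"]):
        if event["ts"] <= time_a:
            continue  # already reflected in the snapshot
        if event["op"] == "DELETE":
            table.pop(event["key"], None)
        else:  # INSERT or UPDATE: upsert the latest row image
            table[event["key"]] = event["row"]
    return table


# Hypothetical data: a snapshot taken at time A = 100, followed by four events.
snapshot = {1: {"name": "a"}, 2: {"name": "b"}}
events = [
    {"ts": 90, "op": "UPDATE", "key": 1, "row": {"name": "old"}},  # before time A, skipped
    {"ts": 110, "op": "UPDATE", "key": 2, "row": {"name": "b2"}},
    {"ts": 120, "op": "INSERT", "key": 3, "row": {"name": "c"}},
    {"ts": 130, "op": "DELETE", "key": 1, "row": None},
]
result = apply_cdc(snapshot, events, time_a=100)
print(result)
```

Because every change is applied as an upsert or a delete by key, replaying an event twice gives the same result. That is why, in practice, it is probably safer to let the CDC window overlap the snapshot slightly than to risk a gap around time A.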
In Part 1 of this blog, we will go through the steps to get a complete stream config in Datastream for CDC.
Datastream
Here are the steps I followed to configure Datastream to capture CDC bin logs from MySQL and write them to GCS.
1/ Enable Datastream on GCP
2/ Configure the data source
- My data source is a self-hosted MySQL, so I configured it based on this GCP guide. To configure another type of source, you can see here.
3/ Allow network connections from Datastream to the data source (whitelisting, IP filtering, etc.); check this doc for details. You also need to create a username/password so that Datastream can access MySQL.
4/ Create a connection profile (Source) and check the network connection from Datastream to the data source.
- Go to the Connection profiles page
- Define connection settings
- Secure the connection to your source. For testing purposes, I used the encryption type “None” (the connection is not encrypted). For production environments, I recommend considering “Server-client” (encrypt the connection and authenticate both the source and Datastream).
- Define the connectivity method. There are 3 options (IP allowlisting, forward-SSH tunnel, and VPC peering). For testing purposes, I chose “IP allowlisting”. For production environments, I recommend considering “VPC peering”. After you select IP allowlisting, Datastream provides a list of IPs; we need to allow connections from these IPs so that Datastream can access MySQL.
- Next, you can test the connection from Datastream to MySQL by pressing the TEST button.
- The connection test takes a few seconds, and after a successful connection Datastream shows a notification confirming it.
5/ Create a Cloud Storage connection profile (Destination) and grant Datastream permission to write data to the GCS bucket/path; check this doc for details.
6/ Create a stream in Datastream (connecting Source to Destination)
- Go to the Streams page
- Define stream details
- Define the MySQL connection profile. Select the previously created “mysql-dev-source” profile.
- Configure the stream source
- Define the Cloud Storage connection profile. Select the previously created “stream-cdc-bucket” profile.
- Configure the stream destination.
- Finally, review the stream details and create the stream.
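Step 2 relies on the source MySQL having its bin log configured for log-based CDC. As a rough pre-flight check before creating the stream, the sketch below compares a few server variables against the values I understand the Datastream guide for self-managed MySQL to require. The function name `check_binlog_settings` is hypothetical, the `variables` dict is assumed to hold the output of `SHOW VARIABLES` fetched with whatever MySQL client you use, and the expected values should be confirmed against the current guide.

```python
from typing import Dict, List

# Assumption: expected values based on my reading of the Datastream guide
# for self-managed MySQL; confirm them against the current documentation.
REQUIRED = {"log_bin": "ON", "binlog_format": "ROW", "binlog_row_image": "FULL"}


def check_binlog_settings(variables: Dict[str, str]) -> List[str]:
    """Return a list of human-readable problems; an empty list means the check passed."""
    problems = []
    for name, expected in REQUIRED.items():
        actual = str(variables.get(name, "")).upper()
        if actual != expected:
            problems.append(f"{name} is {actual or 'unset'}, expected {expected}")
    return problems


# Hypothetical server where the bin log is enabled but not row-based.
variables = {"log_bin": "ON", "binlog_format": "MIXED", "binlog_row_image": "FULL"}
print(check_binlog_settings(variables))
```

The check works on a plain dict on purpose, so it can be run against any environment's variables without tying the example to a specific MySQL driver.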
Output
The data for a given stream is written (in 1-minute batches) to the provided bucket or file prefix at:
[bucket]/[prefix]/[objectname]/yyyy/mm/dd/hh/mm/[filename(idempotent)]
The object name in the path for database sources is the schema name followed by the table name (separated by an underscore ‘_’). For more details about the data written to GCS, you can read here.
You can then use this data to update your data warehouse with near-real-time latency. Congratulations!
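When a downstream job lists files in the bucket, it usually needs the object name and the batch minute back out of each path. The following is a minimal sketch of that parsing, assuming the layout shown above; `parse_stream_path` and `StreamObject` are hypothetical names, and the file name `part-0.avro` in the usage is a placeholder, not a real Datastream file name.

```python
from datetime import datetime
from typing import NamedTuple


class StreamObject(NamedTuple):
    object_name: str      # schema and table joined by "_"
    batch_time: datetime  # minute-level folder the file was written under
    filename: str


def parse_stream_path(path: str, prefix: str) -> StreamObject:
    """Split '<prefix>/<objectname>/yyyy/mm/dd/hh/mm/<filename>' into its parts."""
    if not path.startswith(prefix):
        raise ValueError(f"{path!r} does not start with {prefix!r}")
    relative = path[len(prefix):].strip("/")
    parts = relative.split("/")
    if len(parts) != 7:
        raise ValueError(f"unexpected layout: {relative!r}")
    object_name, year, month, day, hour, minute, filename = parts
    batch_time = datetime(int(year), int(month), int(day), int(hour), int(minute))
    return StreamObject(object_name, batch_time, filename)


# Path relative to the bucket; the file name is a made-up placeholder.
obj = parse_stream_path("mysql-dev/ph_table_test/2023/05/17/12/22/part-0.avro", prefix="mysql-dev")
print(obj.object_name, obj.batch_time.isoformat())
```

Note that the object name cannot be reliably split back into schema and table when either of them contains an underscore, so it is safer to take those from the event's `source_metadata` instead.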
- GCS Path:
gs://stream-cdc/mysql-dev/ph_table_test/2023/05/17/12/22/mysql_ph_table_test_2023_05_17_12_22_…_mysql-cdc-binlog_-...avro
- File format:
Schema:
|-- uuid: string (nullable = true)
|-- read_timestamp: timestamp (nullable = true)
|-- source_timestamp: timestamp (nullable = true)
|-- object: string (nullable = true)
|-- read_method: string (nullable = true)
|-- stream_name: string (nullable = true)
|-- schema_key: string (nullable = true)
|-- sort_keys: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- member0: string (nullable = true)
| | |-- member1: long (nullable = true)
|-- source_metadata: struct (nullable = true)
| |-- table: string (nullable = true)
| |-- database: string (nullable = true)
| |-- primary_keys: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- log_file: string (nullable = true)
| |-- log_position: long (nullable = true)
| |-- change_type: string (nullable = true)
| |-- is_deleted: boolean (nullable = true)
|-- payload: struct (nullable = true)
| |-- field_name_1: field_type_1 (nullable = true)
| |-- field_name_2: field_type_2 (nullable = true)
| |-- ...
- source_timestamp: the time at which the record was inserted, updated, or deleted in the database.
- source_metadata: contains the bin log metadata.
- change_type: the operation: INSERT, UPDATE-INSERT, UPDATE-DELETE, and DELETE (for MySQL).
- payload: the data of the record in the database.
- For more detail, you can read here.
- Example:
{
"uuid":"k8h654b3-6569-...",
"read_timestamp":"2023-04-18 23:57:32",
"source_timestamp":"2023-04-18 23:57:31",
"object":"ph_table_test",
"read_method":"mysql-cdc-binlog",
"stream_name":"projects/.../locations/asia-southeast1/streams/mysql-dev-to-gcs-bucket",
"schema_key":"ba0d2c5518da691382...",
"sort_keys":[
"1676433451000",
"mysql-bin.002136",
"80553258"
],
"source_metadata":{
"table":"table_test",
"database":"ph",
"primary_keys":[
"id"
],
"log_file":"mysql-bin.002136",
"log_position":80553258,
"change_type":"INSERT",
"is_deleted":false
},
"payload":{
"id":58727,
"name":"Hung Nguyen",
"status":1,
"created_date":"2023-04-19 07:57:31.000000",
"updated_date":null,
"deleted_at":null
}
}
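To turn a batch of such events into the current state of a table, one option is to order them by `sort_keys` and fold them by primary key. The sketch below is an illustration under stated assumptions, not Datastream's own merge logic: it assumes that UPDATE-DELETE carries the old row image and UPDATE-INSERT the new one, that delete events still carry the primary key in `payload`, and that a delete should be applied before an upsert when both share the same `sort_keys`. The names `sort_key`, `latest_rows`, and `make_event` are hypothetical.

```python
from typing import Any, Dict, Iterable, Tuple

DELETES = {"DELETE", "UPDATE-DELETE"}


def sort_key(event: dict) -> Tuple:
    """Order events by sort_keys; digit-only values are compared as integers."""
    keys = tuple(int(k) if str(k).isdigit() else str(k) for k in event["sort_keys"])
    # Assumption: at equal sort_keys, the old row image (a delete) is applied first.
    is_upsert = event["source_metadata"]["change_type"] not in DELETES
    return keys + (is_upsert,)


def latest_rows(events: Iterable[dict]) -> Dict[Tuple, Dict[str, Any]]:
    """Fold change events into the latest row per primary key."""
    rows: Dict[Tuple, Dict[str, Any]] = {}
    for event in sorted(events, key=sort_key):
        meta, payload = event["source_metadata"], event["payload"]
        pk = tuple(payload[col] for col in meta["primary_keys"])
        if meta["change_type"] in DELETES:
            rows.pop(pk, None)
        else:  # INSERT or UPDATE-INSERT
            rows[pk] = payload
    return rows


def make_event(change_type: str, position: int, payload: dict) -> dict:
    """Build a trimmed-down event with only the fields this sketch reads."""
    return {
        "sort_keys": ["1676433451000", "mysql-bin.002136", str(position)],
        "source_metadata": {"primary_keys": ["id"], "change_type": change_type},
        "payload": payload,
    }


# Hypothetical, deliberately unordered batch of events.
events = [
    make_event("UPDATE-INSERT", 300, {"id": 58727, "status": 2}),
    make_event("INSERT", 100, {"id": 58727, "status": 1}),
    make_event("UPDATE-DELETE", 300, {"id": 58727, "status": 1}),
    make_event("INSERT", 200, {"id": 58728, "status": 1}),
    make_event("DELETE", 400, {"id": 58728, "status": 1}),
]
print(latest_rows(events))
```

In a warehouse the same idea is usually expressed as a merge statement keyed on the primary key; the in-memory version here only shows the ordering and the handling of each `change_type`.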
A few things to keep in mind
- Pay attention to network ingress/egress costs when moving data between regions with GCS as the data store.
- Consider streaming only specific tables and schemas from the data source to optimize the cost and performance of Datastream.
- Agree on how long bin logs should be retained, so that they do not fill up the disk on the source database.
- If a problem with the data source or Datastream breaks the CDC flow, you can recreate the snapshot and then create a new stream to continue the CDC flow from that point.