How to automate replication of MySQL data to AWS Redshift?

We’ll use the Rails framework and a Java service for this task. Ruby on Rails, or Rails, is a server-side web application framework written in Ruby and released under the MIT License.

Now, before I explain how to automate the DDL (Data Definition Language) statements, let me explain the data flow.

Data Pipeline

MySQL -> Kafka -> AWS S3 -> AWS Redshift

So let’s break it into steps:

1. MySQL -> Kafka
We’ll use a Java service to read the MySQL binary logs and push the data into Kafka.
Stanley Shyiko’s open-source Java library, mysql-binlog-connector-java, comes in handy here.

Why do we use Kafka?
One of Kafka’s many uses is stream processing. Kafka acts as a buffer between RDS (MySQL) and our OLAP (Online Analytical Processing) system, i.e. AWS Redshift.

2. Kafka -> AWS S3
There are many Kafka clients for Ruby, but in this example we’ll use the ruby-kafka client. For this you have to install the ruby-kafka gem with the following command:

gem install ruby-kafka 

Now, to consume messages from a Kafka topic, we’ll do:

require "kafka" 
 
kafka = Kafka.new( [server-ip], 
sasl_scram_username: 'username', 
sasl_scram_password: 'password') 
 
consumer = kafka.consumer(group_id: "my_group") 
consumer.subscribe("my_topic") 
 
consumer.each_message do |message|  
	puts "#{message.topic}, #{message.partition}, 
           #{message.offset},  #{message.key}, #{message.value}" 
end 

We’ll save the data to S3 in text/CSV format, and each table will have its own S3 folder with subdirectories in chronological order. To copy data to S3 we’ll use the ‘aws-sdk-s3’ gem.
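As a rough illustration of that folder layout, here is a hypothetical key-building helper (the table name, partition scheme and file naming below are assumptions, not something the pipeline prescribes):

# Hypothetical helper: builds a key of the form
#   <table>/<YYYY>/<MM>/<DD>/<HH>/<epoch>-<kafka offset>.csv
# so each table gets its own folder with chronological subdirectories.
def s3_key_for(table_name, kafka_offset, time = Time.now.utc)
  "#{table_name}/#{time.strftime('%Y/%m/%d/%H')}/#{time.to_i}-#{kafka_offset}.csv"
end

s3_key_for('orders', 42)
# => something like "orders/2024/01/31/09/1706692500-42.csv"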

require 'aws-sdk-s3'

target_bucket_name = '*** Provide bucket name ***'
target_key = '*** Provide target key ***'

begin
  # Use the S3 resource interface to address the bucket and object.
  s3 = Aws::S3::Resource.new(region: 'us-west-2')
  s3_object = s3.bucket(target_bucket_name).object(target_key)
  s3_object.put(body: 'data')
  puts "Copied to bucket #{target_bucket_name} as #{target_key}"
rescue StandardError => ex
  puts "Caught exception copying object to bucket #{target_bucket_name} as #{target_key}:"
  puts ex.message
end

Why do we use AWS S3?
AWS S3 stands for Simple Storage Service and is Amazon’s cloud object storage service, used by all kinds of web applications. S3 is easily scalable and its SLA commits to 99.9% monthly uptime, which also makes it a good fit for backup and recovery.

3. S3 -> Redshift
For the final step, all you need is the Redshift COPY command to load the data from S3 into Redshift.
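As a sketch, the COPY can be issued from Ruby through the pg gem (Redshift speaks the PostgreSQL wire protocol); the cluster endpoint, credentials, table name, column list, bucket and IAM role below are placeholders:

require 'pg'

# Connect to the Redshift cluster (placeholder connection details).
conn = PG.connect(
  host:     'my-cluster.xxxxxxxx.us-west-2.redshift.amazonaws.com',
  port:     5439,
  dbname:   'analytics',
  user:     'redshift_user',
  password: 'redshift_password'
)

# The column list comes from the table configuration described in the
# next section, so it matches the order in which the CSV files were written.
columns = %w[id name amount city]

copy_sql = <<~SQL
  COPY data (#{columns.join(', ')})
  FROM 's3://my-bucket/data/2024/01/31/'
  IAM_ROLE 'arn:aws:iam::123456789012:role/my-redshift-copy-role'
  FORMAT AS CSV;
SQL

conn.exec(copy_sql)
conn.close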

How to manipulate schema and manage DDL statements?
We’ll read the MySQL binary logs using the aforementioned Java library and push each ALTER statement into Kafka with an ALTER flag (so that we can identify it while consuming the Kafka logs).
To consume the logs and copy them into Redshift, you need to maintain table configurations that specify the column order to be used in the COPY command.
We’ll store these configurations (the table schemas) as JSON in a meta database.
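For illustration, a configuration entry in the meta database might look like this (the exact shape is up to you; this hypothetical one just records the ordered column list and a schema version):

require 'json'

# Hypothetical configuration for the "data" table used in the example below;
# version 2 assumes one column has already been added by an ALTER.
table_config = {
  table:   'data',
  version: 2,
  columns: %w[id name amount city newcolumn]
}

puts JSON.pretty_generate(table_config)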

ETL Process

So your configuration needs to be updated for every DDL statement you encounter while consuming the Kafka logs. You can get the table schema from MySQL’s information_schema.columns table and dump it into a text file on the server where your Ruby code is running. You then parse this schema (a bit of string manipulation) and build your table configuration, i.e. the column order to be used later for copying the S3 file data into Redshift (the same column order is used to push data into S3).
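The sketch below shows one way to derive that column order, querying information_schema directly with the mysql2 gem instead of parsing a dumped text file; the connection details, schema and table names, and the config file path are placeholders:

require 'mysql2'
require 'json'

# Placeholder connection details.
client = Mysql2::Client.new(
  host:     'mysql-host',
  username: 'replication_user',
  password: 'secret',
  database: 'information_schema'
)

# Pull the table's columns in their defined order.
rows = client.query(<<~SQL)
  SELECT COLUMN_NAME AS column_name
  FROM information_schema.columns
  WHERE table_schema = 'mydb' AND table_name = 'data'
  ORDER BY ORDINAL_POSITION
SQL

columns = rows.map { |row| row['column_name'] }

# Persist the column order as the table configuration used both for
# writing CSVs to S3 and for the Redshift COPY column list.
File.write('config/data.json', JSON.generate({ table: 'data', columns: columns }))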

So, for example, if the Kafka logs look like:
ADD 1, Name1, 100, Delhi
ADD 2, Name2, 200, Bangalore
ADD 3, Name3, 300, Mumbai
ALTER ALTER TABLE data ADD COLUMN newcolumn varchar(100)
ADD 4, Name4, 400, Delhi, India gate

While reading (consuming) the Kafka logs, the consumer will simply write the columnar table data to S3 for the first three lines. When it encounters the fourth line, the ALTER statement, it will update and reload the table configuration and consume the subsequent data according to the new configuration. A similar ALTER flag is also written to the S3 file, so that while copying the data (iteratively) from S3 to Redshift, the matching configuration can be loaded.
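Putting that together, a simplified consumer loop might look like the sketch below; the message format follows the example above, while the config file path and the load_config helper are assumptions (consumer is the ruby-kafka consumer from step 2):

require 'json'

# Hypothetical helper: reads the table configuration (column order)
# that the schema-dump step above wrote out.
def load_config(table)
  JSON.parse(File.read("config/#{table}.json"))
end

config = load_config('data')
csv_rows = []

# `consumer` is the ruby-kafka consumer created in step 2.
consumer.each_message do |message|
  flag, payload = message.value.split(' ', 2)

  case flag
  when 'ADD'
    # Plain row: buffer it as CSV in the current column order.
    csv_rows << payload
  when 'ALTER'
    # DDL: record the flag in the S3 file too, then reload the table
    # configuration (after refreshing it from information_schema as above)
    # so the following rows are read with the new column order.
    csv_rows << "ALTER #{payload}"
    config = load_config('data')
  end
end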

To facilitate backfilling of data, you can also maintain versions of the configuration (the MySQL schema) and add the version number to your S3 files, so that for each folder (table) the respective version is used.
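For example (a hypothetical naming scheme, with the key and config path made up for illustration), the version can be embedded in the S3 key and used to pick the matching configuration at copy time:

require 'json'

# Hypothetical key: the schema version sits between the table folder
# and the chronological subdirectories.
key = 'data/v2/2024/01/31/1706692500-42.csv'
version = key[%r{/v(\d+)/}, 1].to_i

# Load the matching configuration version; its column list drives the
# COPY column order for this file.
config = JSON.parse(File.read("config/data.v#{version}.json"))
columns = config['columns']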
