Forecast Sales System for Airline - Part IV

Executive summary

The project required our team to regularly migrate data from one database to another and from one data source to another.

Some numbers:

~70-100 GB of data, 100 000 000 records per table, a few days to migrate 5% of the data

Several approaches were used during work on the project.

Read Kafka by specific criteria. A separate service reads the whole dataset stored in Kafka and, via a filter on the client side, processes only the required segment of the data. This also avoids blocking the main pipeline.

Migrate data from CSV to a Postgres database. A whole pipeline was built to support regular migration of data from CSV files in different formats into a set of database tables in the format currently confirmed in the specification.

Migrate data from a Postgres to a Postgres database. Dumping and restoring a database is a well-known task, but when the target database already contains data, another approach is needed. Granular migration of data across Postgres databases has caveats, which were almost entirely overcome.

Implementation challenges

CSV to Database pipeline

Part of the data requested for import came as CSV files of nearly 100 GB in size. Furthermore, these CSV files aggregated business data for different years, and each year had its own data format. The requirement was therefore to find a cost-effective way to import data of this size and structural diversity, with a potential future need to scale it further.

The decision made earlier by my teammate was to leverage our data pipeline (Kafka -> Message Queue -> Postgres) to process this data. In this approach, the migration module reads the data source (CSV files), supports the variety of formats, normalizes the dataset to the currently confirmed specification, and passes it to Kafka, where the rest of the system handles processing and import into the database.
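The real migration module is a service in our pipeline; as an illustration only, a minimal shell sketch of the same idea, assuming a hypothetical 2019-format file sales_2019.csv whose columns have to be reordered to match the current specification before being published to a hypothetical sales-import topic on a recent Kafka distribution:

                    # Reorder the 2019 layout (date,route,amount) into the confirmed
                    # specification (route,date,amount) and publish each row to Kafka.
                    awk -F',' 'NR > 1 { print $2 "," $1 "," $3 }' sales_2019.csv \
                        | kafka-console-producer.sh --bootstrap-server localhost:9092 --topic sales-import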

In some situations only a part of the data (~5% of the total) was required. To extract such a segment, the awk command was used to filter the necessary records by specific criteria, as sketched below.
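A hedged example of such filtering, assuming a hypothetical source file sales_full.csv where the third column holds the year and only the 2021 records are needed:

                    # Keep the header row plus only the rows whose third column (year) is 2021.
                    awk -F',' 'NR == 1 || $3 == "2021"' sales_full.csv > sales_2021.csv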

Postgres-to-Postgres migration

For a Postgres-to-Postgres migration it is possible to use the backup and restore commands, which produce a dump of the database in binary format:

                    pg_dump -U myuser -h localhost -F c -b -v -f mydb_backup.backup mydatabase
                
                    pg_restore -U myuser -h localhost -d mydatabase -v mydb_backup.backup
                

If the target database already contains data, a different approach is needed.

It is possible to query data by condition and store the result in a CSV file, which can later be used to import the data back into a database. That's how it would look:

                    COPY (SELECT * FROM table_name WHERE condition) TO '/path/to/output.csv' WITH CSV;
                
                    COPY table_name FROM '/path/to/output.csv' WITH CSV;
                
If the database is hosted on a remote server, the command is slightly different:
                    psql -h remote_host -d database_name -U username -c "\COPY table_name FROM '/path/to/output.csv' WITH CSV;"
                

In PostgreSQL, the most common way to auto-generate the next ID for a table is to use a serial data type or an identity column. During migration, the values of these sequences should be kept in sync with the recently imported data.

Basically, we have to take the MAX(id) of the table and update the associated sequence accordingly:

                    SELECT setval('your_sequence_name', COALESCE((SELECT MAX(id) FROM your_table_name), 1));
                
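When the sequence name is not known in advance, it can be resolved from the table and column with pg_get_serial_sequence; a variation of the statement above:

                    SELECT setval(pg_get_serial_sequence('your_table_name', 'id'),
                                  COALESCE((SELECT MAX(id) FROM your_table_name), 1));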

The address space of primary and foreign keys in the data being imported can overlap with the IDs of records already stored in the database. In that case, the IDs should be adjusted to avoid such a clash.

The max ID of each table should be taken from the target database, and each record to be imported should have its ID shifted by the difference between that max ID and the minimum ID of the incoming data, so the imported IDs land above the existing range. This tedious work can be done either with SQL scripts (preferable) or with the awk command, which may be an option in some cases; a minimal SQL sketch follows.
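A minimal SQL sketch of this shift, assuming the incoming rows are first loaded into a hypothetical staging table staging_orders and the target table is a hypothetical orders:

                    -- offset = (max id already in target) - (min id in incoming data) + 1
                    WITH bounds AS (
                        SELECT COALESCE((SELECT MAX(id) FROM orders), 0)
                                 - (SELECT MIN(id) FROM staging_orders) + 1 AS shift
                    )
                    UPDATE staging_orders
                    SET id = id + (SELECT shift FROM bounds);
                    -- Any foreign key columns referencing these ids need the same shift.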

Pipeline for reading Kafka data by segment

As more historical data accumulated in Kafka, it became possible to evolve the migration process further.

The solution offered by my colleague was to use a separate consumer group to read the historical data from Kafka. Kafka allows resetting a consumer group's offsets to re-read data from the beginning, and a separate group does not block the main pipeline. A filter on the consumer side speeds up the process by handling only the required data.
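A sketch of resetting such a separate consumer group with the stock Kafka tooling, assuming a hypothetical group history-migration and the hypothetical sales-import topic:

                    # Point the dedicated group back to the earliest offset of the topic.
                    kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
                        --group history-migration --topic sales-import \
                        --reset-offsets --to-earliest --execute

Note that offsets can only be reset while the group has no active consumers.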

The important point is that Kafka parallelizes reads only by partition: one partition - one consumer within a group. To support parallel reading, the number of partitions should be equal to or greater than the number of consumers.
Another valuable point is that new partitions can be added at any time, but data already published to Kafka is not rebalanced across them. Only data published after the new partitions were added is available for reading from all of the partitions.
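For completeness, increasing the partition count of a topic (again assuming the hypothetical sales-import topic) can be done with the stock tooling; as noted above, only messages produced afterwards are spread across the new partitions:

                    # Existing data stays in its original partitions; only new messages use all 8.
                    kafka-topics.sh --bootstrap-server localhost:9092 \
                        --alter --topic sales-import --partitions 8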

Existing ETL solutions

All of this sounds like a good fit for existing ETL solutions.

Indeed it is. A quick survey of existing solutions on the market showed Apache NiFi as a promising candidate. However, the customer's isolated network limits its applicability. Its potential should be explored in future projects.


