Hey readers, I am a Data Science student and I have recently started learning more about Data Engineering. Data Science and Data Engineering teams co-exist in most data organisations. Data Scientists are responsible for getting insights from data and applying machine learning to it, while Data Engineers help Data Scientists get access to the data and make their models run over huge datasets.
Although a Data Scientist is not expected to master Data Engineering, a small effort to understand its concepts can help us in great ways.
In this post, I will explain how I taught myself Data Engineering over the last few weeks.
If you are just getting started with Data Engineering, it might be worth checking my blogs on SQL basics and getting started with Spark and Zeppelin.
Once I had started with basic SQL and learned about Spark on Amazon EMR, I decided to create a hypothetical Big Data ETL project for myself. Working on a project from scratch can be enormously beneficial, since it teaches us the minute details of the concepts and also forces us to think about the bigger picture and how multiple tools integrate into a complete project.
Here is my project idea to get started with Big Data concepts and ETL.
- We would like to generate synthetic log data on an hourly/daily basis to emulate a production scenario.
- We would like to create a library of utility functions that can be used by all future ETLs.
- We would like to create a small Data Lake on S3 and apply Big Data optimisation techniques to our tables and datasets.
- We would like to run multiple Big Data ETLs on Amazon EMR:
  - ETL 1: Daily/hourly ingestion job that cleans the raw logs on S3, extracts the required fields and creates an incremental table.
  - ETL 2: Daily/hourly summary job that reads from the clean table created by ETL 1 and builds an aggregated table that is updated daily/hourly.
- We would like to use Hive and Spark-SQL for our ETL workloads.
Objectives and Outcomes
The project is designed to cover some key concepts of Big Data. By the end of this project we aim to have learned, and become better at, the following concepts:
- External Tables
- Optimisations via file formats
- Optimisations via Data partitioning
- Raw data processing
- Daily jobs and incremental ETLs
- Data Lake
- Boto and other Python utilities to communicate with S3 and the EMR cluster
- Amazon AWS
Code walk through
The best way to explain all the concepts is to walk through the code and understand how the concepts work. All the code discussed below can be found on my GitHub repo here.
This is a project configuration file where you can add settings such as the path to the pem file and the IP address of the master node of the EMR cluster. It’s good practice to keep configuration in a separate file outside the main code base.
Creating utility functions is a best practice, as it makes your code more readable and lets you reuse the functions for other ETL problems. Here I have created a collection of utility functions for our problem statement.
Another motivation for breaking the utilities into separate files and functions is that I am planning to migrate the whole project to Apache Airflow, which is a great scheduler for ETL pipelines. Each utility function can be seen as an Airflow Operator that performs a specific task. We will discuss Apache Airflow in another post, so stay tuned.
System Utility function
Here I have written several functions:
- execute_local() to execute a command on the console from a Python script.
- del_local_file() to delete a file from the local system.
- read_file() to read the content of a file, given its path.
- generate_uuid() to generate a random UUID.
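As a rough illustration, the four helpers listed above could look something like the sketch below. Only the function names come from the post; the signatures, return values and the choice of subprocess are my assumptions, and the version in the repo may differ.

```python
import os
import subprocess
import uuid


def execute_local(command):
    """Run a shell command locally and return its standard output as text."""
    # Assumption: the command is a trusted string, so shell=True is acceptable.
    result = subprocess.run(
        command, shell=True, check=True, capture_output=True, text=True
    )
    return result.stdout


def del_local_file(path):
    """Delete a local file if it exists."""
    if os.path.exists(path):
        os.remove(path)


def read_file(path):
    """Return the full content of a text file."""
    with open(path, "r") as handle:
        return handle.read()


def generate_uuid():
    """Return a random UUID as a string."""
    return str(uuid.uuid4())
```

Because check=True is set, a failing command raises an exception instead of silently continuing, which is usually what you want in an ETL step.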
SSH Utility function
Here I have written a function called execute_remote() to log into a remote machine and execute commands. This function expects the following parameters:
- path to the AWS pem file (required for SSH)
- ip_address of the master node of the EMR cluster
- user name (typically ‘hadoop’ for an EMR cluster)
- command to execute
We will use this function to execute our Hive/Spark-SQL ETL script on the EMR cluster.
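A minimal sketch of such a function, assuming it simply shells out to the ssh client installed on the local machine. The helper build_ssh_command() and the StrictHostKeyChecking option are my own additions for illustration; the repo may use a different approach, such as an SSH library.

```python
import subprocess


def build_ssh_command(pem_path, ip_address, user, command):
    """Return the argument list that runs `command` on a remote host over ssh."""
    return [
        "ssh",
        "-i", pem_path,
        # Assumption: skip the interactive host key prompt for short-lived clusters.
        "-o", "StrictHostKeyChecking=no",
        f"{user}@{ip_address}",
        command,
    ]


def execute_remote(pem_path, ip_address, user, command):
    """Run `command` on the remote host and return its standard output."""
    result = subprocess.run(
        build_ssh_command(pem_path, ip_address, user, command),
        check=True, capture_output=True, text=True,
    )
    return result.stdout
```

Keeping the command construction in its own function makes it easy to inspect what would be executed without needing a live cluster.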
S3 Utility function
Here I have written multiple functions for working with Amazon S3. We are using the Boto library for all our tasks because it is simple to use.
- split_s3_path() separates the bucket name and key path from an S3 path, as the Boto library expects the bucket name and path prefix separately.
- copy_to_s3() copies a local file to S3, given the destination S3 path and the file to copy.
- upload_file() creates a new file on S3 with the given content (I should probably rename it to create file).
- delete_file() deletes the file at the given S3 path.
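The sketch below shows one way these helpers might be written. It assumes boto3 (the current AWS SDK for Python) and AWS credentials configured in the environment; the post only says "Boto", so the library version, the argument order and the private _client() helper are assumptions on my part.

```python
def split_s3_path(s3_path):
    """Split 's3://bucket/some/key' into ('bucket', 'some/key')."""
    prefix = "s3://"
    if not s3_path.startswith(prefix):
        raise ValueError(f"Not an S3 path: {s3_path}")
    bucket, _, key = s3_path[len(prefix):].partition("/")
    return bucket, key


def _client():
    """Create an S3 client (hypothetical helper, not from the post)."""
    # Imported lazily so split_s3_path() works even without boto3 installed.
    import boto3
    return boto3.client("s3")


def copy_to_s3(s3_path, local_file):
    """Copy a local file to the given S3 path."""
    bucket, key = split_s3_path(s3_path)
    _client().upload_file(local_file, bucket, key)


def upload_file(s3_path, content):
    """Create a new object on S3 holding the given text content."""
    bucket, key = split_s3_path(s3_path)
    _client().put_object(Bucket=bucket, Key=key, Body=content.encode("utf-8"))


def delete_file(s3_path):
    """Delete the object at the given S3 path."""
    bucket, key = split_s3_path(s3_path)
    _client().delete_object(Bucket=bucket, Key=key)
```

For example, split_s3_path("s3://example-bucket/raw/access.log") returns the pair ("example-bucket", "raw/access.log"), which is the form the client calls need.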
EMR Utility function
This script helps us run a Hive ETL on the EMR cluster. It is a wrapper around the SSH utils and S3 utils. The script:
- Takes a local file path to a Hive ETL script
- Copies the script to S3
- SSHs into the master node of the EMR cluster
- Executes the script
- Deletes the script from S3
Here I have defined a function called run_on_hive(), which runs my script on Hive given the following parameters:
- Path of script to execute
- Path of pem file
- IP address of master node of EMR cluster
- Username (typically ‘hadoop’ for EMR cluster)
The ETL script is copied to S3 to make it available to the master node. Alternatively, we could have copied it directly to the master node.
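The flow described above (stage the script on S3, run it remotely, clean up) can be sketched as follows. This is a simplified variant, not the repo's function: instead of taking the pem path, IP address and user name directly, it takes three caller-supplied callables (upload, execute, delete) that stand in for the S3 and SSH utilities. The staging prefix and build_hive_command() are hypothetical, and the sketch assumes Hive on the cluster can read a script straight from an S3 path.

```python
import os
import uuid


def build_hive_command(s3_script_path, run_date, engine="hive"):
    """Build the command line that runs a staged script for one run date."""
    return f"{engine} --hiveconf RUN_DATE={run_date} -f {s3_script_path}"


def run_on_hive(script_path, run_date, upload, execute, delete,
                staging_prefix="s3://example-bucket/tmp"):
    """Stage a local HQL script on S3, run it remotely, then clean up.

    upload(s3_path, content), execute(command) and delete(s3_path) are
    supplied by the caller, e.g. thin wrappers around the S3 and SSH helpers.
    """
    with open(script_path) as handle:
        content = handle.read()
    # A random prefix avoids clashes when several jobs stage the same file name.
    s3_path = f"{staging_prefix}/{uuid.uuid4()}_{os.path.basename(script_path)}"
    upload(s3_path, content)
    try:
        return execute(build_hive_command(s3_path, run_date))
    finally:
        # Remove the staged script even if the job fails.
        delete(s3_path)
```

Passing the helpers in as arguments keeps the orchestration logic easy to try out locally with stand-in functions before pointing it at a real cluster.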
Note: The same code can be used to run the ETL on Spark-SQL, because Spark SQL is compatible with most of the Hive SQL syntax. For example:
# run script on hive
$ hive --hiveconf RUN_DATE=yyyy-mm-dd -f <filename.hql>
# run same script on spark-sql
$ spark-sql --hiveconf RUN_DATE=yyyy-mm-dd -f <filename.hql>
Raw Data Processing
To get started with Data Engineering, we need raw data. Raw data is not always at hand, so here we will generate our own data to feed the ETL. I recently blogged about how to generate log data, which you can go through. I have used open source code to generate fake synthetic log data.
Here I have defined a function for each job:
- generate_data() generates log data by calling execute_local(), defined in the system utility functions.
- run() copies the log data from the local machine to an S3 path, given the location and the current date. It then deletes the local file once the data has been copied to S3.
This utility emulates the production scenario. We can run this script multiple times to achieve the desired rate of data ingestion.
The end result is multiple raw data files for the current date.
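To make the generate, copy and delete cycle concrete, here is a small sketch. The date-based S3 layout, the access_<uuid>.log file naming and the injected generate and copy_to_s3 callables are all assumptions for illustration; the actual script calls the log generator through execute_local().

```python
import os
import uuid
from datetime import date


def raw_log_s3_path(location, run_date, file_name):
    """Build the S3 destination for one raw log file, grouped by date."""
    return f"{location.rstrip('/')}/{run_date.isoformat()}/{file_name}"


def run(location, run_date, generate, copy_to_s3):
    """Generate one log file, copy it to S3, then delete the local copy.

    generate(local_path) writes the log file and copy_to_s3(s3_path, local_path)
    uploads it; both are supplied by the caller.
    """
    # A unique name per call lets repeated runs add files instead of overwriting.
    local_file = f"access_{uuid.uuid4()}.log"
    generate(local_file)
    try:
        destination = raw_log_s3_path(location, run_date, local_file)
        copy_to_s3(destination, local_file)
        return destination
    finally:
        if os.path.exists(local_file):
            os.remove(local_file)
```

Calling run() several times for the same date then produces several distinct files under that date's prefix.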
This is the most important part of the post, where we create the ETLs for our use cases and explore the Big Data concepts.
Data Ingestion Job
Data ingestion is a process by which data is moved from one or more sources to a destination where it can be stored and further analysed.
Here I have written a SQL script that creates a clean data table, partitioned by date, from the raw data. After creating the clean data table, we delete the raw data. Have a look at the code below, and then we will discuss the key concepts.
Key concepts in ETL:
Run date :
Here we pass a run date to prepare the data for a particular date. Parameterising the run date lets us trigger the same ETL for different dates independently, without making any code changes. It also helps with backfilling past data if we have to refresh a few dates without affecting the other data in the table. This gives us the basis for the incremental ETL.
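A backfill then becomes a simple loop over dates. The sketch below assumes a run_job callable that accepts the run date as a yyyy-mm-dd string; both function names are hypothetical.

```python
from datetime import date, timedelta


def date_range(start, end):
    """Yield every date from start to end, inclusive."""
    current = start
    while current <= end:
        yield current
        current += timedelta(days=1)


def backfill(start, end, run_job):
    """Call run_job once per day with an ISO formatted run date."""
    for day in date_range(start, end):
        run_job(day.isoformat())
```

For instance, backfill(date(2020, 1, 30), date(2020, 2, 2), run_job) triggers the job for four consecutive dates, each of which refreshes only its own slice of the table.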
External Table :
External tables help us achieve our Data Lake dream. The idea of a Data Lake is to have all our data in a centralised location, with all other systems operating on this data for their respective tasks, such as ETL or Machine Learning.
- Here we are creating an external table with its data saved on S3.
- An added advantage of external tables is that dropping the table does not delete the data from S3.
- Another advantage of external tables is that multiple clusters and Big Data engines (like Presto, Spark and Athena) can all work on the data in parallel.
Data Partitioning :
Data partitioning is a very powerful optimisation technique. It physically divides the data into sub-directories and files based on a column value, so that if we query the data for a particular value, Hive skips every data file that belongs to a different value. This avoids scanning a huge volume of data, and we only work on the subset of data that we care about, which makes the query much faster. Here we are creating our table partitioned by date, so if we want the data for a particular date we don’t have to refresh all the data.
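The toy example below mimics the directory layout that Hive uses for partitions, with one column=value sub-directory per partition, and shows why a filter on the partition column only touches one directory. The table location, file names and both helper functions are made up for illustration.

```python
def partition_location(table_location, column, value):
    """Return the directory Hive uses for one partition (column=value)."""
    return f"{table_location.rstrip('/')}/{column}={value}/"


def files_to_scan(files_by_partition, column, value):
    """Return only the files a query filtered on column = value must read."""
    return files_by_partition.get(f"{column}={value}", [])


# Hypothetical file listing for a table partitioned by dt.
FILES_BY_PARTITION = {
    "dt=2020-01-01": ["part-00000.parquet", "part-00001.parquet"],
    "dt=2020-01-02": ["part-00000.parquet"],
    "dt=2020-01-03": ["part-00000.parquet", "part-00001.parquet"],
}
```

A query for dt = 2020-01-02 reads one file here instead of five, and the saving grows with every day of data added to the table.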
Optimised file format :
The right choice of file format brings a huge performance improvement to all future queries on the table. There are multiple file formats that we can choose from. The most popular ones are Parquet, ORC and Avro. Here we have stored our table as Parquet because:
- It is columnar storage, which means that if we query a single column in a table with 100 columns, the query reads only that column and skips the other 99, which makes it much faster.
- Data is compressed, unlike plain text files. Parquet also uses different encoding techniques for better compression.
- It supports nested data.
- It allows new columns to be added to the table in future.
Insert overwrite partition :
Insert overwrite partition is a simple way to populate or refresh the data for only the partition concerned, without affecting other data in the table. This works well with our incremental approach, where we add data to production tables that are partitioned by date. New data is added to the table daily via the ETL.
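To tie the concepts above together, here is a hedged sketch of what such an ETL script might contain, kept as a Python string so it can be written to a .hql file or previewed. The table names (raw_logs, clean_logs), the columns, the bucket and the assumption that raw_logs has a dt column are all hypothetical; the script in the repo will differ. The small preview() helper only substitutes the run date locally for inspection, while on the cluster the substitution is done through --hiveconf.

```python
CLEAN_TABLE_ETL = """
CREATE EXTERNAL TABLE IF NOT EXISTS clean_logs (
  ip STRING,
  request_time STRING,
  uri STRING,
  status INT
)
PARTITIONED BY (dt STRING)
STORED AS PARQUET
LOCATION 's3://example-bucket/clean_logs/';

INSERT OVERWRITE TABLE clean_logs PARTITION (dt = '${hiveconf:RUN_DATE}')
SELECT ip, request_time, uri, CAST(status AS INT)
FROM raw_logs
WHERE dt = '${hiveconf:RUN_DATE}';
"""


def preview(script, run_date):
    """Return the script with the run date filled in, for local inspection."""
    return script.replace("${hiveconf:RUN_DATE}", run_date)
```

Re-running this for the same run date replaces that one partition, so the job can be retried or backfilled safely without touching other dates.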
Note: The data cleanup involves a lot of string splitting. This is done via a regex, so that the data can be read straight from the raw log location using Hive's RegexSerDe.
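The idea behind a regex-based reader is that each capture group in the pattern becomes one column. The Python sketch below shows the same idea on a single line. It assumes the logs follow the Apache common log format, which the post does not state, so the pattern and field names are illustrative only.

```python
import re

# Assumed layout: ip ident user [timestamp] "method uri protocol" status size
LOG_PATTERN = re.compile(
    r'(?P<ip>\S+) \S+ \S+ \[(?P<timestamp>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<uri>\S+) (?P<protocol>[^"]+)" '
    r'(?P<status>\d{3}) (?P<size>\S+)'
)


def parse_line(line):
    """Return the named fields of one log line, or None if it does not match."""
    match = LOG_PATTERN.match(line)
    return match.groupdict() if match else None
```

Lines that do not match return None here, which makes it easy to count or inspect malformed records while developing the pattern.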
Job trigger Script
We have created a Python wrapper called job.py for each module. This keeps our design future-proof:
- It will be helpful if more ETL scripts are added to our module, as we can keep orchestrating new tasks in Python.
- This Python script can also be scheduled to run hourly/daily as required.
- In future I am planning to move the project to the Airflow scheduler, and job.py can be converted to an Airflow DAG very easily.
To run the SQL script for the clean data table, we call run_on_hive(), defined in the EMR utility functions, passing the path of the script file, the run_date, the path of your pem file and the IP address of the master node of the EMR cluster.
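A job.py wrapper along these lines might parse the run date from the command line and hand it to the job. This is only a sketch: the --script and --run-date options, the default script name clean_table.hql and the run_job callable are hypothetical, and in the real project run_job would wrap run_on_hive() together with the pem path and IP address from the configuration file.

```python
import argparse
from datetime import date, datetime


def valid_date(text):
    """argparse type: accept a yyyy-mm-dd date and return it normalised."""
    try:
        return datetime.strptime(text, "%Y-%m-%d").date().isoformat()
    except ValueError:
        raise argparse.ArgumentTypeError(f"expected yyyy-mm-dd, got {text!r}")


def parse_args(argv=None):
    """Parse the command line; the run date defaults to today."""
    parser = argparse.ArgumentParser(description="Trigger an ETL script for one run date.")
    parser.add_argument("--script", default="clean_table.hql",
                        help="path to the HQL script to run")
    parser.add_argument("--run-date", type=valid_date,
                        default=date.today().isoformat(),
                        help="date to process, as yyyy-mm-dd")
    return parser.parse_args(argv)


def main(run_job, argv=None):
    """Parse arguments and call run_job(script, run_date)."""
    args = parse_args(argv)
    return run_job(args.script, args.run_date)
```

Defaulting the run date to today suits a daily schedule, while passing --run-date explicitly covers reruns and backfills.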
Summary Table Job
Summary tables are very important in Data Engineering. The idea behind summary tables is to identify the most common aggregation queries being run on a table and to create a new ETL job for them. This way we don’t have to run the same query numerous times, because the ETL provides the data precomputed. This saves both time and money for the business.
This is the final step of our Big Data ETL project, where we create the SQL script for the summary table and run it from the job file.
Here I have created an external summary table holding each URI (Uniform Resource Identifier) and its count, partitioned by date.
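As an illustration of what this job computes, the sketch below pairs a hypothetical HiveQL aggregation with a plain Python equivalent that can be used to sanity-check results on a small sample. The table names (clean_logs, uri_summary) and the uri_count column are assumptions, not taken from the repo, and the summary table is assumed to exist already.

```python
from collections import Counter

# Hypothetical summary ETL: one row per URI for the given run date.
SUMMARY_ETL = """
INSERT OVERWRITE TABLE uri_summary PARTITION (dt = '${hiveconf:RUN_DATE}')
SELECT uri, COUNT(*) AS uri_count
FROM clean_logs
WHERE dt = '${hiveconf:RUN_DATE}'
GROUP BY uri;
"""


def summarise_uris(uris):
    """Count how often each URI occurs, like GROUP BY uri with COUNT(*)."""
    return dict(Counter(uris))
```

Because the summary is written per date partition, each daily run only aggregates that day's slice of the clean table.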
Job trigger Script
To run the SQL script for the summary table, we call run_on_hive(), defined in the EMR utility functions, passing the path of the script file, the run_date, the path of your pem file and the IP address of the master node of the EMR cluster.
The next steps that I have planned for this project are:
- Move the repo to the Airflow scheduler and create an Airflow DAG for each job.
- Work with temporary/ephemeral clusters, so that the job can create and kill its own cluster.
Stay tuned for future posts on these.
That’s all for this project. I have learned a lot from it about Big Data concepts, Hive/Spark-SQL, Amazon EMR and AWS S3. I would recommend that everyone work on a similar project of their own, or clone this repo and try out the examples mentioned here.