Efficient Data Engineering

A typical data engineering problem, often referred to as extract, transform and load (ETL), consists of the following:

  1. take data in one place (extract)
  2. change its form (transform)
  3. move it to a new place, in this new form (load)

This process gets interesting when data volumes are large, and you have to consider performance. Long turnaround time (e.g., a run taking several hours or days) makes the typical serially iterative software engineering approach inefficient. In this article, we offer some tips on re-structuring the software engineering process and leveraging the cloud to make iteration more efficient.

The tips below may not look novel to you; they are well-established good engineering practices. However, reflecting on them in this context will help you approach data engineering problems with a fresh perspective.

The Structure of a Data Engineering Problem

Each data engineering problem has unique challenges, but they all share some basic structure. Here is a description of a typical problem. Yours may be different, but I'm sure you can recognize some patterns.

A data pipeline looks like this figure:

[Figure: Data pipeline process]

The steps — Parsing/Mapping, Cleaning, and Merging — will be different based on what you are trying to do. You may also have additional steps, but this will do for our example. We will also make the following assumptions, typical of most data engineering problems:

  • The amount of data to be processed is large: 100s of millions of records
  • The time needed to process the data is non-trivial: each run can take hours
  • There is a non-trivial complexity in some of the steps
  • The requirements will change (e.g., the data format, or what Cleaning means will change)
  • The pipeline needs to be run periodically (e.g., nightly)
  • There is a requirement on the processing time (e.g., < 2 hours)

As always, you will be going through the typical software development cycle: writing some code, testing it, finding bugs, and doing it again, and again. If a run fails after two hours, you only find out then, make a code change, and re-run the whole process. At this scale, it takes a long time to get through even a small number of consecutive changes. You could, of course, batch a bunch of changes together, but we also know from experience that this makes identifying problems harder. How do we speed up this process?

Let's look at some patterns that can help.

Break it Down

It's important to do this step correctly. It is not enough to recognize that separate pieces of code will be responsible for, say, the cleaning and merging phases. The goal of the breakdown is to be able to execute a single phase repeatedly, on its own.

Say you want to focus on the cleaning phase. Where is the input data? How do you know the output is correct? Is it fast enough? To help work on the cleaning phase alone without worrying about the parsing/mapping or merging phases, you need to:

  • Define how the input data looks: format, schema, volumes, value distributions, etc.
  • Create and store a data set that conforms to the above. You will need to run your step on some input. You can create a dataset by running the previous steps (if possible) or making one up.
  • Define where the data resides. It can be on a local disk, S3, or database, but your process needs to consume data from a given location. Make it as flexible as practicable (e.g., S3 or local file and pass a URL to the process).
  • Define how the output of your step should look. Again: format, schema, etc.
  • Define where the output should be stored. Similar to input, make it as flexible as practicable and pass the URL to the step. Note that your step should be able to store the output in a place where it can be easily accessed for verification (e.g., a local file, S3, or database, and not memory).
  • Define the process for verifying correctness of the output, for example, a diff against the output from the previous run. Remember, you are dealing with large data sets, so whatever method you choose, it cannot be manual (see the sketch after this list). Also make sure your checks do not produce a lot of noise (e.g., false positives).
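
For the last point, one way to automate the check is to compare the current output to the output of a previous run, record by record. Here is a rough sketch, assuming JSON-lines output files keyed by a record_id field (the file names and the key are made up for illustration):
import json

def load_records(path, key="record_id"):
    """Load a JSON-lines file into a dict keyed by record id."""
    records = {}
    with open(path) as f:
        for line in f:
            if line.strip():
                rec = json.loads(line)
                records[rec[key]] = rec
    return records

def diff_outputs(previous_path, current_path):
    """Summarize records that were added, removed, or changed between two runs."""
    prev = load_records(previous_path)
    curr = load_records(current_path)
    added = curr.keys() - prev.keys()
    removed = prev.keys() - curr.keys()
    changed = [k for k in prev.keys() & curr.keys() if prev[k] != curr[k]]
    print(f"added={len(added)} removed={len(removed)} changed={len(changed)}")

diff_outputs("cleaned_previous.json", "cleaned_current.json")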

Here is an example of how your cleanup step may look (as a CLI).
python cleanup_step.py --input file://... --output file://... --other-options ...
Think of defining each step in these terms. You can even build a dummy implementation of each step to start with. If you follow this approach, you'll be able to iterate on one step only, and you'll be well positioned to take advantage of the patterns below.

It is important that each step does not have any hidden dependencies. An easy trap to fall into is a crosswalk or mapping file, or even a database table. If you need a mapping file or access to a service, pass the URL of the file or the service as a parameter.
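
To make this concrete, here is a minimal sketch of what the inside of such a step might look like, with the mapping passed in explicitly rather than hidden in the code (the argument names and cleaning logic are placeholders; a real step would also dispatch on the URL scheme, e.g., file:// vs. s3://):
import argparse
import json

def clean_record(record, mapping):
    """Apply the cleaning rules to a single record (placeholder logic)."""
    record["name"] = record.get("name", "").strip().title()
    record["country"] = mapping.get(record.get("country"), record.get("country"))
    return record

def main():
    parser = argparse.ArgumentParser(description="Cleanup step")
    parser.add_argument("--input", required=True, help="location of the parsed input (JSON lines)")
    parser.add_argument("--output", required=True, help="location to write the cleaned output")
    parser.add_argument("--mapping", required=True, help="location of the mapping file")
    args = parser.parse_args()

    with open(args.mapping) as f:
        mapping = json.load(f)

    with open(args.input) as fin, open(args.output, "w") as fout:
        for line in fin:
            fout.write(json.dumps(clean_record(json.loads(line), mapping)) + "\n")

if __name__ == "__main__":
    main()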

What we have done here is define the “API” for each step. Our API differs from traditional ones in one important way: unlike APIs implemented as libraries or REST services, it persists the data that is passed in, as well as the resulting output.

By defining each task in your pipeline this way, you have also implemented a very important separation of concerns between pipeline orchestration and the tasks themselves. A simple version of your complete pipeline can look like:
#!/bin/bash

python parse_step.py --input source_file.csv --output parsed_input.json
python cleanup_step.py --input parsed_input.json --output cleaned_input.json
...

Get the Data

You need the data. There is no way around this. If you are not able to get the data, your project will not work—period. However, you may not be able to get the actual data right away, and you can certainly mitigate that.

If you can get representative data, just not in realistic volumes, that's a good start. You can always duplicate or extrapolate to generate more volume.

If you know the shape of the input data but can't get the actual data, make it up. There are plenty of tools out there (e.g., mockaroo and faker) to help. Invest in making the data as realistic as is reasonable. I suggest generating a name that looks like a name (e.g., Bob) for a name field, instead of a random string (e.g., XUJHYWE). Why? It's way easier to look at when testing things, and it gives you more realistic distributions of value lengths and duplicates.
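
For example, here is a small generator using the faker package (the field names and record count are arbitrary):
import json
from faker import Faker

fake = Faker()
Faker.seed(42)  # seed so the fake data set is reproducible

def make_record(i):
    return {
        "record_id": i,
        "name": fake.name(),        # looks like a real name, not a random string
        "email": fake.email(),
        "city": fake.city(),
        "signup_date": fake.date(), # ISO-formatted date string
    }

with open("fake_input.json", "w") as f:
    for i in range(10_000):
        f.write(json.dumps(make_record(i)) + "\n")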

If you have the actual data, develop a way to sample a small fraction of it. You don't want to always be running things on the full dataset. You can start with something simple, such as taking the first 1,000 (or whatever) rows, but you should be able to generate different samples. Make sure your sample is representative (e.g., if you are merging two files, both samples must contain some records that should be merged).
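
A simple step up from "first 1,000 rows" is a seeded random sample, so the same sample can be reproduced from run to run. A sketch (the 1% rate and file names are arbitrary):
import random

def sample_file(src_path, dst_path, rate=0.01, seed=42):
    """Write a reproducible random sample of the lines in src_path to dst_path."""
    rng = random.Random(seed)
    with open(src_path) as src, open(dst_path, "w") as dst:
        dst.write(src.readline())  # keep the CSV header
        for line in src:
            if rng.random() < rate:
                dst.write(line)

sample_file("source_file.csv", "source_sample.csv")

If you are sampling two files that need to be merged, sample on the join key (e.g., keep a record when a hash of its key falls below the rate) so that matching records survive in both samples.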

Develop Locally

If you followed the first two patterns, you can run each step locally. This means you can quickly gain confidence that your step is correct, that is, for representative input it produces the expected output. Your step will run quickly on a small set of data, and both the input and output will be in local files.

Your approach may require a better framework than a single-threaded Python script. For example, you may leverage a Dask cluster, Hadoop, or another platform to improve performance. You may have decided on the technology already, but one consideration should always be how easy it is to execute the job locally. Both Dask and mrjob make it easy to run processes designed for clusters locally. If you have a choice, pick a tool that allows you to do so.
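
For example, with Dask the same code can run against an in-process local cluster or a remote one just by changing how the client is created (the file names and cleaning logic below are placeholders):
import dask.dataframe as dd
from dask.distributed import Client

# With no arguments, Client() starts a local cluster on this machine;
# pointing it at a scheduler address runs the same code on a real cluster.
client = Client()

df = dd.read_csv("source_sample.csv")  # later: "s3://bucket/source_file.csv"
cleaned = df.dropna(subset=["name"])   # stand-in for the real cleaning logic
cleaned.to_csv("cleaned-*.csv", index=False)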

Advantages of being able to run the job locally are:

  • You can use an interactive debugger
  • You don't need network access to resources

If you dismiss the second point by saying you always have network access, think of the hours you've spent debugging permissions and other access problems, not to mention that reading a local file is way faster than reading a file from S3.

If you are getting data from a database, spin one up locally using Docker, and load it with a reasonable sample of data.

Ok, you've run your step locally and you are confident that it is correct. You may be tempted to just run it on the entire data set. You fire it up and wait... You wait for 10 minutes, 20 minutes. How much longer should you wait? This is not productive. While you are waiting for your run to finish, read on.

Measure Performance

You are moving large amounts of data. You will run into performance issues. With a little forward thinking, you can:

  • Anticipate performance issues before you observe one
  • Identify areas you should invest in
  • Measure progress, i.e., the impact of your changes

Anticipating problems does not require a sophisticated prediction algorithm. You only want to know whether you have a problem, and the rough magnitude of that problem.

First, instrument your code so you can capture how much time each step took. Simply logging the elapsed time in seconds will suffice in most cases. Make sure it is easy to capture this information programmatically. For example, a log message in a file like:
{ "job": "1234", "step": "cleanup", "time": 23.43 }
is much better than
Done in 23.43 seconds.
Note: Use Python's logging library for this rather than print; it will make things easier later.
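
A minimal way to do this with the standard logging module (the JSON shape matches the example above; the job id is made up):
import json
import logging
import time

logging.basicConfig(filename="pipeline.log", level=logging.INFO, format="%(message)s")

def run_cleanup(job_id, input_path, output_path):
    start = time.perf_counter()
    # ... the actual cleanup work goes here ...
    elapsed = time.perf_counter() - start
    logging.info(json.dumps({"job": job_id, "step": "cleanup", "time": round(elapsed, 2)}))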

Now that you can capture the processing time, run your step with a sample of 100, 500, and 1000 records. This will tell you whether you are scaling linearly or not. If you are scaling linearly, simply scale the processing time to the actual number of records you anticipate, and see if it is good enough.

For example, you run your tests and get:

[Table: elapsed time for samples of 100, 500, and 1,000 records]

If you plot this, it's easy to see that you are scaling (roughly) linearly.

[Plot: elapsed time vs. number of records, showing roughly linear scaling]


If you need to process 10,000,000 records, it will take about 20 hours. This may or may not be good enough, but you have a general idea. Note: It probably took you way less than 20 hours to find this out!
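
The extrapolation itself is a few lines of code once you have the measurements. A sketch, with made-up timings rather than the numbers above:
import numpy as np

sample_sizes = np.array([100, 500, 1000])  # records per test run
elapsed_secs = np.array([0.8, 3.9, 7.6])   # hypothetical measured times

# Fit a straight line; a good linear fit means the time per record is constant.
per_record, overhead = np.polyfit(sample_sizes, elapsed_secs, 1)

target = 10_000_000
estimate_hours = (per_record * target + overhead) / 3600
print(f"estimated {estimate_hours:.1f} hours for {target:,} records")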

Suppose, however, that you get the following results:

[Table: elapsed time for the same sample sizes, growing much faster than linearly]

This does not look linear!

[Plot: elapsed time vs. number of records, showing super-linear scaling]

In other words, you have a problem. When you are scaling linearly, you can just make things more efficient (e.g., use a faster JSON parsing library, or use caching, etc.). You can also leverage a parallel infrastructure (i.e., run more processes, machines, etc.). However, if you are scaling super-linearly, you will likely need to rethink your algorithm. Better to know this sooner than later.

A note on what not to do: Do not optimize a piece of code that does not make a significant impact. It is tempting to optimize a step by doing something clever, just because you can. For example: you project that your entire pipeline will take 10 hours to complete. One of your steps takes 30 minutes, and you can reduce this time by 90% if you apply this clever algorithm. Do the math: you will spend 1 day saving 90% * 0.5 hour = 27 minutes of your processing time. If you improve a 5-hour step by 20%, you will spend 1 day saving 20% * 5 hours = 60 minutes. Less glamorous, but actually time better spent. Additionally, your 90% optimization may add risk and instability to the system. Is it worth it? Sometimes it is, and sometimes it is not.

Parallelize Experiments

So far, we've discussed good engineering practices but have not quantified the resulting time savings. However, these practices have positioned us to accelerate further development by leveraging the cloud. This is the point where you probably need to change how you look at the problem, so let's break it down.

Make it Easy to Run in the Cloud

This is step one. You should be able to easily run a test of your entire pipeline, or an individual task, in the cloud. This can mean a few different things, and the details will depend on your specific tasks. To make the discussion more concrete, say your parse task runs as an EMR job, and your cleanup task is a simple Python script. To run these in the cloud, you need to:

  • Make the input and output data for each task accessible wherever the task will run. S3 is the easiest option.
  • Run each task on a cloud resource. For EMR, you can just specify a cluster; for a Python script, you can use Jenkins, or package the script in a Docker container and run it using ECS. There is also AWS Batch.
  • Run the orchestration in the cloud. Again, Airflow, Jenkins, or AWS Batch are good options.

Additionally, you need to be able to run two (or more) of these at the same time without interference:

  • Each run should use a different location for intermediate results (task outputs/inputs) and final outputs.
  • Each run should produce separate logs for performance measurement.
  • Each run should be able to run different code (e.g., if you are testing an algorithm improvement).

You must be able to pass all of these as parameters to the job. This is easy to do if you have followed the tips from the beginning of the article.
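
One way to keep runs isolated is to give every run its own identifier and derive all locations from it, regardless of whether the job ultimately lands on Jenkins, ECS, or AWS Batch. A sketch (the wrapper script, flags, and bucket are all hypothetical):
import subprocess
import uuid

def launch_run(branch, sample="s3://my-bucket/samples/small.csv"):
    """Kick off one experimental pipeline run with isolated outputs and logs."""
    run_id = f"{branch}-{uuid.uuid4().hex[:8]}"
    prefix = f"s3://my-bucket/runs/{run_id}"
    subprocess.Popen([
        "python", "run_pipeline.py",
        "--input", sample,
        "--output-prefix", prefix,          # intermediate and final outputs
        "--log", f"{prefix}/pipeline.log",  # separate performance logs
        "--git-ref", branch,                # which code version to run
    ])

for branch in ["baseline", "faster-json", "cached-mapping"]:
    launch_run(branch)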

Start Small, Run in Parallel

We are finally ready to run some experiments. Suppose you want to improve performance and have identified three things you could fix. Do they make an impact? Before running full-scale experiments, implement each one separately (say, on a branch) and run them on a small subset of data. You can run all the experiments in parallel: you made it easy to run in the cloud, and you made it easy to specify where the input data is (it can be the same) and where the output should go, so kick off all your experiments at once.

Say you have 10 ideas to try. Each takes 12 minutes to run. You can spend two hours running them one after another, or you can have all 10 results in 12 minutes. This will let you weed out the ideas that are not promising before committing to full-scale runs.

Do the Math

You are now ready to run some full-scale experiments. Say you want to run one of your steps on an EMR cluster and you estimate it will cost $200 to run and take five hours. You want to test three different changes to see that 1) they run correctly, and 2) they improve performance. The changes may be code changes or cluster configuration changes (instance type vs. number of instances).

You could run the three jobs one after another. If you start at 9am, you will have a long day and see the results by the next morning:

  • Elapsed time to compare results: 24 hours (from 9am to 9am the next day)
  • EMR cost: 3 × $200 = $600

Instead, you decide to kick off three EMR jobs on three separate clusters at the same time:

  • Elapsed time to compare results: 5 hours
  • EMR cost: 3 × $200 = $600

You just got your results 19 hours sooner for the same cost!

Summary

We discussed several good engineering practices that ultimately enabled you to iterate quickly by partitioning the problem and the data you are operating on. Moreover, when operating in the cloud, the same practices enabled you to parallelize your experiments.

With the ability to spin up virtually unlimited computational resources when needed, we should think of parallelization not only as a way to reduce processing time in production, but also as a way to reduce development time. Don't wait on experiments to finish. Keep working in parallel; your development time is (for now) more expensive than computation time.
Jakov Kucan, VP, Data & Platform Engineering
Prior to Manifold, Jakov was Chief Architect at Kyruus as well as Director of Product Strategy at PTC Mathcad. He is a skilled architect and engineer, able to see through the details of implementations, keep track of the dependencies within a large design, and communicate vision and ideas to both technical and non-technical audiences.