
Building an ETL pipeline on Google Cloud: A beginner's guide to Apache Beam and Dataflow - Part I

Posted on: May 12, 2022 at 12:40 AM

Apache Beam and Dataflow

One of the main tasks a data engineer may be asked to do is building pipelines: batch or real-time streaming pipelines, cloud-based or on-premise.

Nowadays, there are many tools for building data pipelines. A data pipeline can be based on a single tool or technology, or on several of them. One of the trending tools in this field in recent years is Apache Beam.

In this first part of the Beam guide, we will cover some of the basics of Apache Beam by running a simple pipeline locally. In the upcoming part, we will deep dive into more advanced concepts with another tutorial.

What is Apache Beam?

If we take a look at the official documentation, we will find that Apache Beam is a unified model for defining both batch and streaming data-parallel processing pipelines. It simplifies the mechanics of large-scale data processing.

A small summary of its history shows that the model behind Beam evolved from a number of internal Google data processing projects, including MapReduce, FlumeJava, and MillWheel. This model was originally known as the “Dataflow Model” and was first implemented as Google Cloud Dataflow, including a Java SDK on GitHub for writing pipelines and a fully managed service for executing them on Google Cloud Platform. Others in the community began writing extensions, including a Spark runner, a Flink runner, and a Scala SDK. In January 2016, Google and a number of partners submitted the Dataflow programming model and SDKs portion as an Apache Incubator proposal, under the name Apache Beam (unified Batch + strEAM processing). Apache Beam graduated from incubation in December 2016.

If you want to learn the history of Beam in detail and go through its main concepts, you can check the VLDB 2015 paper, a white paper titled The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing.

How does it work?

As mentioned in the previous paragraph, Beam is mainly used for building data pipelines. According to Beam, a pipeline is a user-constructed graph of transformations that defines the desired data processing operations. It encapsulates your entire data processing task, from start to finish: reading input data, transforming that data, and writing output data. So a pipeline is a graph of steps that can read from multiple input sources, write to multiple output destinations, and is composed of a set of transforms that process the data.

Now, one might ask: how do these transformations work? And before that, how is the data distributed across the pipeline's transformations? To answer this, Apache Beam has two other main concepts: PCollections and PTransforms.

A PCollection represents a distributed dataset that the Beam pipeline operates on. It is essentially an unordered bag of elements that can be bounded, coming from a fixed source of data such as a set of files or a database, or unbounded, growing over time with its elements processed as they arrive. To summarise the difference: bounded data is finite (batch data), while unbounded data is an infinite, continuous, never-ending stream with no beginning or end.

On the other hand, a PTransform represents a data processing operation, or a step, in the pipeline. Every PTransform takes one or more PCollection objects as input, performs a processing function that you provide on the elements of that PCollection, and produces zero or more output PCollection objects.
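To make these two concepts concrete, here is a tiny, self-contained sketch (not part of the tutorial pipeline, the words are arbitrary) that builds a bounded PCollection in memory and chains a couple of PTransforms on it:

import apache_beam as beam

# beam.Create builds a bounded, in-memory PCollection; every step chained
# with '|' is a PTransform that produces a new PCollection.
with beam.Pipeline() as p:
    (p
     | 'Create words' >> beam.Create(['apache', 'beam', 'dataflow'])
     | 'To lengths' >> beam.Map(lambda word: (word, len(word)))
     | 'Print' >> beam.Map(print))  # prints ('apache', 6), ('beam', 4), ('dataflow', 8)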

So our pipeline would look like this: we read our data from a fixed source (let's assume we are processing text files stored in GCS); the content of these text files is stored in a PCollection; this dataset is passed to a PTransform that applies some processing over the data and produces a new PCollection; finally, that result is passed to a write transform that writes it to an output (let's say a BigQuery table).

Things have been theoretical so far, so let's work on a real example and see how Beam works in practice.

Getting our hands dirty

Beam comes with three language SDKs: Java, Python, and Go. Java was the first SDK released, with a full set of features. Python came afterwards, with limited features at the beginning that have been enriched over time.

In the following tutorial, we'll be working with the Python SDK. To keep things simple in this first part of the article, we will only build a pipeline with simple transforms and run it locally. In the upcoming article (part II) we will build a pipeline composed mainly of ParDo transforms and run it on Google Cloud Dataflow.

We are going to work with the Google Cloud public dataset bigquery-public-data.covid19_open_data.covid19_open_data, where we will calculate the total COVID-19 confirmed cases for every country. The table contains more than 19M records.

First, let's create our virtual environment to keep our dependencies separated:

python -m virtualenv beam_venv
source beam_venv/bin/activate

Now, let's install Apache Beam and its dependencies. You can choose to install Apache Beam alone, or with the extras for your target environment, such as GCP or AWS. Since we are going to work with GCP, let's install the GCP version:

pip install 'apache-beam[gcp]'

Now that our local environment is set up, we need access to Google Cloud resources. For this reason, let's create a service account, which is a special type of Google account intended to represent a non-human user that needs to authenticate and be authorized to access data in Google APIs.

Without going through the technical steps here, you can follow a tutorial to create your service account. Make sure to give the SA the BigQuery Admin and Dataflow Admin roles, so you can access these resources through this service account.

All done? Let's complete the local setup. After creating the SA, download its key as JSON (IAM & Admin > Service accounts > Keys > generate a key for the created service account). Once the key is downloaded, point the GOOGLE_APPLICATION_CREDENTIALS environment variable to its path in your terminal:

export GOOGLE_APPLICATION_CREDENTIALS="path/to/downloaded/json/key"

Now that everything is set up locally, let's create our pipeline.

Core pipeline

The core of the pipeline would look like this:

import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--param',
        help='param description',
    )

    args, beam_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(beam_args)

    with beam.Pipeline(options=pipeline_options) as p:
        # core transformations go here
        pass


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

Specifying input parameters

In our example, we need three parameters: the name of the dataset where our data is saved, the input BQ table, and the output BQ table where the results will be saved.

parser.add_argument(
	'--dataset',
	help='BQ dataset',
	required=True
)

parser.add_argument(
	'--input',
	help='BQ input table',
	required=True
)

parser.add_argument(
	'--output',
	help='BQ output table',
	required=True
)
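Because we use parse_known_args, our custom flags end up in args while every other flag (like --project or --temp_location) is forwarded to Beam through PipelineOptions. Here is a small standalone illustration of that split; the values are just the ones from the run command later in the article:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--dataset', required=True)
parser.add_argument('--input', required=True)
parser.add_argument('--output', required=True)

argv = ['--project', 'GCP_PROJECT', '--temp_location', 'gs://GCS_BUCKET/jobs/tmp',
        '--dataset', 'beam_lab', '--input', 'covid_input', '--output', 'covid_results']

args, beam_args = parser.parse_known_args(argv)
print(args)       # Namespace(dataset='beam_lab', input='covid_input', output='covid_results')
print(beam_args)  # ['--project', 'GCP_PROJECT', '--temp_location', 'gs://GCS_BUCKET/jobs/tmp']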

Reading data

Our input data is a public COVID-19 dataset in BigQuery: bigquery-public-data.covid19_open_data.covid19_open_data. It contains more than 19M records and 701 columns. This is a huge dataset, and in this first part of the tutorial we want to keep things light, since we're going to run our pipeline with the local runner; in part II, where we will use the Dataflow runner, we will process the full dataset. That's why, from this dataset, we export only 10,000 records with only 2 columns, country_name and new_confirmed, and save them in a BQ table, as sketched below.
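For reference, here is one possible way to build that reduced table, a hedged sketch using the google-cloud-bigquery client (installed as part of apache-beam[gcp]); the beam_lab dataset and covid_input table names match the run command at the end of this article, and the dataset is assumed to already exist in your project:

from google.cloud import bigquery

client = bigquery.Client()  # uses the GOOGLE_APPLICATION_CREDENTIALS key set earlier

# Keep only 2 columns and 10,000 rows from the public dataset.
sql = """
CREATE OR REPLACE TABLE `beam_lab.covid_input` AS
SELECT country_name, new_confirmed
FROM `bigquery-public-data.covid19_open_data.covid19_open_data`
LIMIT 10000
"""

client.query(sql).result()  # blocks until the table is created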

covid_pcol = (p | 'Read delta' >> beam.io.ReadFromBigQuery(table=f"{args.dataset}.{args.input}"))

Our BQ table rows are now stored in a PCollection that we can pass to the next transform.
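By default, ReadFromBigQuery emits each row as a Python dictionary keyed by column name, which is why the next transform can access record['country_name'] and record['new_confirmed']. An element of covid_pcol therefore looks roughly like this (the values are illustrative):

# Illustrative element of covid_pcol, not real data:
{'country_name': 'France', 'new_confirmed': 42}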

Processing data

In this simple code snippet, we use two transforms from the Apache Beam API: Map and CombinePerKey. Map applies a function you provide to every element of a PCollection. CombinePerKey takes a function that combines, for each key, the list of values associated with it. Here, Map builds a tuple with the country as key and the number of new cases of each record as value, and CombinePerKey combines the new cases into the total new cases of that country (we pass it sum as the combining function).

cases_per_country = (covid_pcol
                     | 'Group' >> beam.Map(lambda record: (record['country_name'], record['new_confirmed']))
                     | 'Combine' >> beam.CombinePerKey(sum)
                     | 'Format' >> beam.Map(lambda rec: {'country': rec[0], 'total_cases': int(rec[1])}))
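If you want to see what these transforms do in isolation, here is a small hypothetical sketch on a few hand-made rows (the countries and counts are made up) that you can run without touching BigQuery:

import apache_beam as beam

# Hand-made rows standing in for the BigQuery records.
rows = [
    {'country_name': 'France', 'new_confirmed': 3},
    {'country_name': 'France', 'new_confirmed': 5},
    {'country_name': 'Italy', 'new_confirmed': 2},
]

with beam.Pipeline() as p:
    (p
     | 'Create' >> beam.Create(rows)
     | 'Group' >> beam.Map(lambda record: (record['country_name'], record['new_confirmed']))
     | 'Combine' >> beam.CombinePerKey(sum)
     | 'Format' >> beam.Map(lambda rec: {'country': rec[0], 'total_cases': int(rec[1])})
     # Prints {'country': 'France', 'total_cases': 8} and {'country': 'Italy', 'total_cases': 2}
     | 'Print' >> beam.Map(print))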

Writing results to BQ

Once our data is processed, it's stored in the cases_per_country PCollection, which we pass to the WriteToBigQuery PTransform to write the results to a BQ table.

table_schema = "country:STRING,total_cases:INTEGER"
(cases_per_country | 'Writing results to BQ' >> beam.io.WriteToBigQuery(
				f"{args.dataset}.{args.output}",
				schema=table_schema,
				write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
				create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
)

The full source code can be found on GitHub

Running the pipeline locally

Apache Beam pipelines can be executed by several runners: the local (Direct) runner, the Dataflow runner, the Flink runner, and others.

In this first part of our article, as mentioned before, we will use only the local runner to execute our pipeline (it's Beam's default when no --runner is specified). To trigger it, we execute the following command:

python main.py \
--project GCP_PROJECT \
--temp_location gs://GCS_BUCKET/jobs/tmp \
--dataset "beam_lab" \
--input "covid_input" \
--output "covid_results" 

I hope this article was clear to you. In the next part, we will deep dive into data parallelisation using advanced ParDo PTransforms and Google Cloud Dataflow.