Introduction
In this tutorial, you'll learn the basics of the Cloud Dataflow service by running a simple example pipeline using the Apache Beam Python SDK. This pipeline will show you the basics of reading a text file from Google Cloud Storage, counting the number of unique words in the file and finally writing the word counts back to Google Cloud Storage.
To see what code we will be running today, you can visit the Apache Beam GitHub repository's example word count.
Dataflow pipelines are either batch (processing bounded input like a file or database table) or streaming (processing unbounded input from a source like Cloud Pub/Sub). The example in this tutorial is a batch pipeline that counts words in a collection of Shakespeare's works.
Before you start, you'll need to check for prerequisites in your Cloud Platform project and perform initial setup.
Project setup
Google Cloud Platform organises resources into projects. This allows you to collect all the related resources for a single application in one place.
Select a project, or
Set up Cloud Dataflow
To use Dataflow, turn on the Cloud Dataflow APIs and open the Cloud Shell.
Turn on Google Cloud APIs
Dataflow processes data in many GCP data stores and messaging services, including BigQuery, Google Cloud Storage and Cloud Pub/Sub. Enable the APIs for these services to take advantage of Dataflow's data processing capabilities.
This will enable Google Cloud APIs.
- Compute Engine API
- Dataflow API
- Cloud Resource Manager API
- Cloud Logging API
- Cloud Storage
- Google Cloud Storage JSON API
- BigQuery API
- Cloud Pub/Sub API
Open Cloud Shell
Cloud Shell is a built-in, command-line tool for the console. You're going to use Cloud Shell to deploy your app.
Open Cloud Shell by clicking the button in the navigation bar in the upper right-hand corner of the console.
Install Cloud Dataflow samples on Cloud Shell
Dataflow runs jobs written using the Apache Beam SDK. To submit jobs to the Dataflow service using Python, your development environment will require Python, the Google Cloud SDK and the Apache Beam SDK for Python. Additionally, Cloud Dataflow uses pip3, Python's package manager, to manage SDK dependencies, and virtualenv to create isolated Python environments.
This tutorial uses a Cloud Shell that has Python and pip3 already installed. If you prefer, you can complete this tutorial on your local machine.
Install virtualenv and activate a Python virtual environment
Install virtualenv version 13.1.0 or above if it is not installed already.
Create a Python virtual environment
and activate it.
Download the samples and the Apache Beam SDK for Python using the pip3 command
In order to write a Python Dataflow job, you will first need to download the SDK from the repository.
When you run this command, pip3 will download and install the appropriate version of the Apache Beam SDK.
Run the pip3 command in Cloud Shell.
Set up a Cloud Storage bucket
Cloud Dataflow uses Cloud Storage buckets to store output data and cache your pipeline code.
Run gsutil mb
In Cloud Shell, use the command gsutil mb
to create a Cloud Storage bucket.
Create and launch a pipeline
In Cloud Dataflow, data processing work is represented by a pipeline. A pipeline reads input data, performs transformations on that data and then produces output data. A pipeline's transformations might include filtering, grouping, comparing or joining data.
The code for this example is located in the Apache Beam GitHub repository.
Launch your pipeline on the Dataflow service
Use Python to launch your pipeline on the Cloud Dataflow service. The running pipeline is referred to as a job.
-
project
is the GCP project. -
runner
is the specific execution engine used to run your pipeline. The DataflowRunner uses the Dataflow service as the execution engine. -
temp_location
is the storage bucket that Cloud Dataflow will use for the binaries and other data for running your pipeline. This location can be shared across multiple jobs. -
output
is the bucket used by the Word Count example to store the job results. -
job_name
is a user-given unique identifier. Only one job may be executed with the same name. -
region
specifies a regional endpoint for deploying your Dataflow jobs.
Your job is running
Congratulations! Your binary is now staged to the storage bucket that you created earlier, and Compute Engine instances are being created. Cloud Dataflow will split up your input file such that your data can be processed by multiple machines in parallel.
You can move to the next section when you see the 'JOB_STATE_RUNNING' message in the console.
Monitor your job
Check the progress of your pipeline on the Cloud Dataflow page.
Go to the Cloud Dataflow Monitoring UI page
If you haven't already, navigate to the Cloud Dataflow Monitoring UI page.
Open the on the left-hand side of the console.
Then select the Dataflow section.
Select your job
Click on the job name 'dataflow-intro' to view its details.
Explore pipeline details and metrics
Explore the pipeline on the left-hand side and the job information on the right-hand side. To see detailed job status, click . Try clicking a step in the pipeline to view its metrics.
As your job finishes, you'll see the job status change and the Compute Engine instances used by the job will stop automatically.
Note: When you see the 'JOB_STATE_DONE' message, you can close Cloud Shell.
View your output
Now that your job has been run, you can explore the output files in Cloud Storage.
Go to the Cloud Storage page
Open the on the left-hand side of the console.
Then select the Storage section and click on Browser. You can verify that you are on the correct screen if you can see your previously created GCS bucket 'angular-theorem-281823'.
Go to the storage bucket
In the list of buckets, select the bucket that you created earlier. If you used the suggested name, it will be named angular-theorem-281823
.
The bucket contains a 'results' folder and 'temp' folders. Dataflow saves the output in shards, so your bucket will contain several output files in the 'results' folder.
The 'temp' folder is for staging binaries needed by the workers, and for temporary files needed by the job execution.
Clean up
In order to prevent being charged for Cloud Storage usage, delete the bucket that you created.
Go back to the buckets browser
Click the link.
Select the bucket
Tick the box next to the bucket that you created.
Delete the bucket
Click and confirm your deletion.
Conclusion
Here's what you can do next:
Set up your local environment: