Dataflow Word Count Tutorial

Introduction

In this tutorial, you'll learn the basics of the Cloud Dataflow service by running a simple example pipeline using the Apache Beam Python SDK. The pipeline shows the basics of reading a text file from Google Cloud Storage, counting the occurrences of each unique word in the file, and writing the word counts back to Google Cloud Storage.

To see the code we will be running today, you can visit the word count example in the Apache Beam GitHub repository.

Dataflow pipelines are either batch (processing bounded input like a file or database table) or streaming (processing unbounded input from a source like Cloud Pub/Sub). The example in this tutorial is a batch pipeline that counts words in a collection of Shakespeare's works.

Before you start, you'll need to check for prerequisites in your Cloud Platform project and perform initial setup.

Project setup

Google Cloud Platform organises resources into projects. This allows you to collect all the related resources for a single application in one place.

Select an existing project, or create a new one to use for this tutorial.
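
If you prefer working from the command line, you can also point the gcloud tool in Cloud Shell at your project. The project ID below, angular-theorem-281823, is the example ID used throughout this tutorial; substitute your own.

gcloud config set project angular-theorem-281823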

Set up Cloud Dataflow

To use Dataflow, turn on the Cloud Dataflow APIs and open the Cloud Shell.

Turn on Google Cloud APIs

Dataflow processes data in many GCP data stores and messaging services, including BigQuery, Google Cloud Storage and Cloud Pub/Sub. Enable the APIs for these services to take advantage of Dataflow's data processing capabilities.

This will enable the following Google Cloud APIs:

  • Compute Engine API
  • Dataflow API
  • Cloud Resource Manager API
  • Cloud Logging API
  • Cloud Storage
  • Google Cloud Storage JSON API
  • BigQuery API
  • Cloud Pub/Sub API
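
If you would rather enable these APIs from Cloud Shell than from the console, gcloud can do it in one command. The service IDs below are the standard identifiers for the APIs listed above, but it's worth double-checking them against the API Library in your project.

gcloud services enable compute.googleapis.com dataflow.googleapis.com cloudresourcemanager.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com bigquery.googleapis.com pubsub.googleapis.com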

Open Cloud Shell

Cloud Shell is a built-in command-line tool for the console. You're going to use Cloud Shell to run the example pipeline.

Open Cloud Shell by clicking the Activate Cloud Shell button in the navigation bar in the upper right-hand corner of the console.

Install Cloud Dataflow samples on Cloud Shell

Dataflow runs jobs written using the Apache Beam SDK. To submit jobs to the Dataflow service using Python, your development environment needs Python 3, the Google Cloud SDK and the Apache Beam SDK for Python. You'll also use pip3, Python's package manager, to install the SDK and its dependencies, and virtualenv to create an isolated Python environment.

This tutorial uses a Cloud Shell that has Python and pip3 already installed. If you prefer, you can complete this tutorial on your local machine.
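
If you want to check, you can confirm the versions available in your environment before continuing:

python3 --version
pip3 --version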

Install virtualenv and activate a Python virtual environment

Install virtualenv version 13.1.0 or above if it is not installed already.

pip3 install --user --upgrade virtualenv
 
 

Create a Python virtual environment

python3 -m virtualenv env
 
 

and activate it.

source env/bin/activate
 
 

Download the samples and the Apache Beam SDK for Python using the pip3 command

To write a Python Dataflow job, you first need to install the Apache Beam SDK for Python.

When you run this command, pip3 will download and install the appropriate version of the Apache Beam SDK.

pip3 install --quiet apache-beam[gcp]
 

Run the pip3 command in Cloud Shell.

 

Set up a Cloud Storage bucket

Cloud Dataflow uses Cloud Storage buckets to store output data and cache your pipeline code.

Run gsutil mb

In Cloud Shell, use the command gsutil mb to create a Cloud Storage bucket. Bucket names must be globally unique; this tutorial uses the example project ID, angular-theorem-281823, as the bucket name, so substitute your own.

gsutil mb gs://angular-theorem-281823
 
For more information about the gsutil tool, see the documentation.

 

Create and launch a pipeline

In Cloud Dataflow, data processing work is represented by a pipeline. A pipeline reads input data, performs transformations on that data and then produces output data. A pipeline's transformations might include filtering, grouping, comparing or joining data.

The code for this example is located in the Apache Beam GitHub repository.
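
To give a flavour of what the example does before you run it, here is a minimal sketch of a word count pipeline written with the Beam Python SDK. It is not the exact code from the repository (the official example adds option parsing, metrics and a more careful tokeniser); the input and output paths are placeholders you would replace with your own bucket.

import re

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText

# Placeholder paths -- replace with your own bucket.
INPUT = 'gs://dataflow-samples/shakespeare/kinglear.txt'
OUTPUT = 'gs://your-bucket/results/output'

with beam.Pipeline() as pipeline:
    (
        pipeline
        | 'Read' >> ReadFromText(INPUT)                 # read lines from Cloud Storage
        | 'Split' >> beam.FlatMap(
            lambda line: re.findall(r"[A-Za-z']+", line))  # break each line into words
        | 'Count' >> beam.combiners.Count.PerElement()  # count occurrences of each unique word
        | 'Format' >> beam.MapTuple(
            lambda word, count: f'{word}: {count}')     # format each (word, count) pair as text
        | 'Write' >> WriteToText(OUTPUT)                # write sharded results back to Cloud Storage
    )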

Launch your pipeline on the Dataflow service

Use Python to launch your pipeline on the Cloud Dataflow service. The running pipeline is referred to as a job.

python3 -m apache_beam.examples.wordcount \
  --project angular-theorem-281823 \
  --runner DataflowRunner \
  --temp_location gs://angular-theorem-281823/temp \
  --output gs://angular-theorem-281823/results/output \
  --job_name dataflow-intro \
  --region us-central1
 
  • project is the GCP project.

  • runner is the specific execution engine used to run your pipeline. The DataflowRunner uses the Dataflow service as the execution engine.

  • temp_location is the storage bucket that Cloud Dataflow will use for the binaries and other data for running your pipeline. This location can be shared across multiple jobs.

  • output is the bucket used by the Word Count example to store the job results.

  • job_name is a user-supplied identifier for the job. Only one job with a given name can run at a time.

  • region specifies a regional endpoint for deploying your Dataflow jobs.
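
These flags can also be set from Python code instead of the command line. Below is a minimal sketch, assuming the apache-beam[gcp] package installed earlier and reusing the example project and bucket names.

from apache_beam.options.pipeline_options import PipelineOptions

# The same settings as the command-line flags above, passed programmatically.
options = PipelineOptions(
    project='angular-theorem-281823',
    runner='DataflowRunner',
    temp_location='gs://angular-theorem-281823/temp',
    job_name='dataflow-intro',
    region='us-central1',
)

# A pipeline constructed with these options is submitted to the Dataflow
# service rather than run locally:
#     with beam.Pipeline(options=options) as pipeline:
#         ...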

Your job is running

Congratulations! Your pipeline code is now staged to the storage bucket that you created earlier, and Compute Engine worker instances are being created. Cloud Dataflow will split up your input file so that the data can be processed by multiple machines in parallel.

You can move to the next section when you see the 'JOB_STATE_RUNNING' message in the console.

Monitor your job

Check the progress of your pipeline on the Cloud Dataflow page.

Go to the Cloud Dataflow Monitoring UI page

If you haven't already, navigate to the Cloud Dataflow Monitoring UI page.

Open the navigation menu on the left-hand side of the console.

Then select the Dataflow section.


Select your job

Click on the job name 'dataflow-intro' to view its details.

Explore pipeline details and metrics

Explore the pipeline graph on the left-hand side and the job information, including the detailed job status, on the right-hand side. Try clicking a step in the pipeline to view its metrics.

As your job finishes, you'll see the job status change and the Compute Engine instances used by the job will stop automatically.
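
You can also watch the job from Cloud Shell; for example, the following lists your Dataflow jobs and their current states in the region used above:

gcloud dataflow jobs list --region us-central1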

Note: When you see the 'JOB_STATE_DONE' message, you can close Cloud Shell.

View your output

Now that your job has been run, you can explore the output files in Cloud Storage.

Go to the Cloud Storage page

Open the navigation menu on the left-hand side of the console.

Then select the Storage section and click Browser. You're on the correct screen if you can see the bucket you created earlier, 'angular-theorem-281823'.

 


 

Go to the storage bucket

In the list of buckets, select the bucket that you created earlier. If you used the command above, it is named angular-theorem-281823.

The bucket contains a 'results' folder and a 'temp' folder. Dataflow saves the output in shards, so your bucket will contain several output files in the 'results' folder.

The 'temp' folder is for staging binaries needed by the workers, and for temporary files needed by the job execution.
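
If you prefer the command line, you can list and print the output shards from Cloud Shell with gsutil (the wildcard matches however many shards your job produced):

gsutil ls gs://angular-theorem-281823/results/
gsutil cat gs://angular-theorem-281823/results/output*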

 

Clean up

To avoid being charged for Cloud Storage usage, delete the bucket that you created.

Go back to the buckets browser

Return to the list of buckets in the Storage browser.

Select the bucket

Tick the box next to the bucket that you created.

Delete the bucket

Click Delete and confirm your deletion.
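
Alternatively, you can delete the bucket and everything in it from Cloud Shell:

gsutil rm -r gs://angular-theorem-281823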

 

Conclusion

 

You've run your first Cloud Dataflow job: a batch word count pipeline that read Shakespeare's works from Cloud Storage, counted the words on Dataflow workers and wrote the results back to your bucket.

Here's what you can do next: set up your local development environment so you can build and run Dataflow pipelines outside Cloud Shell.
