Demonstrate a Dataflow pipeline that saves data from an API into BigQuery table

Overview

dataflow-mvp provides a basic example pipeline that pulls data from an API and writes it to a BigQuery table using GCP's Dataflow (i.e., Apache Beam).

Table of Contents

File        Description
main.py     Main Python code for the Dataflow pipeline. The function defineBQSchema defines the BQ table schema (see the sketch below).
setup.py    When the pipeline is deployed in GCP as a template, GCP uses setup.py to set up the worker nodes (e.g., install required Python dependencies).
build.bat   Batch script to deploy the pipeline as a reusable template in GCP.
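For orientation, here is a minimal sketch of the shape such a pipeline takes. Only defineBQSchema is named in this repo; the API endpoint, field names, and table reference below are illustrative assumptions, not the repo's actual values.

import json
import urllib.request

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def defineBQSchema():
    # Hypothetical schema -- the real defineBQSchema in main.py defines the actual fields.
    return {"fields": [{"name": "fact", "type": "STRING", "mode": "NULLABLE"}]}


def fetch_api(_):
    # Pull one record from a hypothetical JSON API and keep only the fields in the schema.
    with urllib.request.urlopen("https://example.com/api/dog") as resp:
        body = json.loads(resp.read())
    return {"fact": body.get("fact")}


def run():
    with beam.Pipeline(options=PipelineOptions()) as p:
        (
            p
            | "Start" >> beam.Create([None])  # single dummy element to trigger one API call
            | "CallAPI" >> beam.Map(fetch_api)
            | "WriteToBQ" >> beam.io.WriteToBigQuery(
                "dataflow-mvp:example_dataset.example_table",  # assumed table reference
                schema=defineBQSchema(),
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            )
        )


if __name__ == "__main__":
    run()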

Environment

  • Local machine running Microsoft Windows 10 Home
  • Python 3.6.8
    • As of 12/1/21, Apache Beam supports only Python 3.6, 3.7, and 3.8 (not 3.9); however, orjson supports only 3.6.

Getting Started

Pre-Requisites

The following instructions assume that the project ID is dataflow-mvp and you have owner access to it.

  1. If you don't have it already, install the Google Cloud SDK:
    https://cloud.google.com/sdk/docs/install

  2. Authenticate your Google account:
    gcloud auth login

  3. Create a virtual environment for Python:
    py -3.8 -m venv venv

  4. Activate the virtual environment, upgrade pip, and install the Apache Beam library for GCP:

"./venv/Scripts/activate.bat"
python -m pip install --upgrade pip
python -m pip install "apache-beam[gcp]"

Run Build

  1. To make our lives easier later, set environment variables for the following:
  • DATAFLOW_BUCKET - the name of the GCS bucket you'll create in step 6 below
  • DF_TEMPLATE_NAME - the name (of your choosing) for the Dataflow template (e.g., dataflow-mvp-dog)
  • PROJECT_ID - the ID of your GCP project (e.g., dataflow-mvp)
  • GCP_REGION - the GCP region (I like to choose the region closest to me, e.g., us-east1)

For instance, to set the PROJECT_ID variable in the Windows CLI, use:
set PROJECT_ID=dataflow-mvp

On Linux machines, use
export PROJECT_ID=dataflow-mvp

The instructions below assume you're working on a Windows machine. Therefore, if you're working in a Linux environment, you'll have to use $PROJECT_ID instead of %PROJECT_ID% where appropriate in the instructions below.
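For example, on a Windows machine, all four variables might be set like so (the bucket name here is an illustrative assumption):

set DATAFLOW_BUCKET=dataflow-mvp-bucket
set DF_TEMPLATE_NAME=dataflow-mvp-dog
set PROJECT_ID=dataflow-mvp
set GCP_REGION=us-east1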

  2. Set the GCP project via config:
    gcloud config set project %PROJECT_ID%
  • You can verify the project is correctly set using:
    gcloud config list
  3. Enable the necessary APIs:
gcloud services enable dataflow.googleapis.com && ^
gcloud services enable cloudscheduler.googleapis.com && ^
gcloud services enable bigquery.googleapis.com && ^
gcloud services enable cloudresourcemanager.googleapis.com  && ^
gcloud services enable appengine.googleapis.com
  4. Create a service account for the Dataflow runner:
gcloud iam service-accounts create dataflow-runner --display-name "Dataflow Runner service account"
  5. Add the required IAM roles to the Dataflow runner's service account:
gcloud projects add-iam-policy-binding %PROJECT_ID% --member serviceAccount:dataflow-runner@%PROJECT_ID%.iam.gserviceaccount.com --role roles/owner
  6. Create a GCS bucket to store Dataflow code, staging files, and templates:
gsutil mb -p %PROJECT_ID% -l %GCP_REGION% gs://%DATAFLOW_BUCKET%

Build the Dataflow Template

  1. In build.bat, edit the variables in lines 1 through 4:
  • DATAFLOW_BUCKET - the name of the GCS bucket created in the previous section
  • DF_TEMPLATE_NAME - the name (of your choosing) for the Dataflow template (e.g., dataflow-mvp-dog)
  • PROJECT_ID - the ID of your GCP project (e.g., dataflow-mvp)
  • GCP_REGION - the GCP region (I like to choose the region closest to me, e.g., us-east1)
  2. Run the build.bat script:
build.bat

This will create the template for the Dataflow job in the specified GCS bucket.
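Under the hood, a classic-template build like this typically boils down to running the pipeline script with a template_location flag. Here is a minimal sketch of what build.bat might contain (the exact flags and values in this repo's script may differ):

set DATAFLOW_BUCKET=your-bucket-name
set DF_TEMPLATE_NAME=dataflow-mvp-dog
set PROJECT_ID=dataflow-mvp
set GCP_REGION=us-east1

python main.py ^
  --runner DataflowRunner ^
  --project %PROJECT_ID% ^
  --region %GCP_REGION% ^
  --staging_location gs://%DATAFLOW_BUCKET%/staging ^
  --temp_location gs://%DATAFLOW_BUCKET%/temp ^
  --template_location gs://%DATAFLOW_BUCKET%/templates/%DF_TEMPLATE_NAME% ^
  --setup_file .\setup.py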

  3. Verify that the template has been uploaded to the GCS bucket:
    gsutil ls gs://%DATAFLOW_BUCKET%/templates/%DF_TEMPLATE_NAME%

Create the Cloud Scheduler Job

  1. Finally, submit a Cloud Scheduler job to run Dataflow on a desired schedule:
gcloud scheduler jobs create http api-to-gbq-scheduler ^
--schedule="0 */3 * * *" ^
--uri="https://dataflow.googleapis.com/v1b3/projects/%PROJECT_ID%/locations/%GCP_REGION%/templates:launch?gcsPath=gs://%DATAFLOW_BUCKET%/templates/%DF_TEMPLATE_NAME%" ^
--http-method="post" ^
--oauth-service-account-email="dataflow-runner@%PROJECT_ID%.iam.gserviceaccount.com" ^
--oauth-token-scope="https://www.googleapis.com/auth/cloud-platform" ^
--message-body="{""jobName"": ""api-to-bq-df"", ""parameters"": {""region"": ""%GCP_REGION%""}, ""environment"": {""numWorkers"": ""3""}}" ^
--time-zone=America/Chicago 
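Once the job exists, you don't have to wait for the next scheduled run; you can trigger it manually to test the end-to-end flow:

gcloud scheduler jobs run api-to-gbq-scheduler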

Notes:

  • Alternatively, you could use the --message-body-from-file argument. However, you'll need to hard-code the GCP region, since environment variables aren't expanded inside the JSON file (see the example after these notes).
  • The cron string 0 */3 * * * executes the job every 3 hours.
  • The jobName parameter, api-to-bq-df, names the job as it will be listed in the Cloud Scheduler app.
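For reference, a message-body file for --message-body-from-file would mirror the inline body above, with the region hard-coded (us-east1 here is illustrative):

{
  "jobName": "api-to-bq-df",
  "parameters": {"region": "us-east1"},
  "environment": {"numWorkers": "3"}
}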

Warranty

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

Owner

Chris Carbonell