This repository contains a streaming Dataflow pipeline written in Python with Apache Beam, reading data from PubSub.

Overview

Sample streaming Dataflow pipeline written in Python

This repository contains a streaming Dataflow pipeline written in Python with Apache Beam, reading data from PubSub.

For more details, see the following Beam Summit 2021 talk:

To run this pipeline, you need to have the SDK installed, and a project in Google Cloud Platform, even if you run the pipeline locally with the direct runner:

Description of the pipeline

Data input

We are using here a public PubSub topic with data, so we don't need to setup our own to run this pipeline.

The topic is projects/pubsub-public-data/topics/taxirides-realtime.

That topic contains messages from the NYC Taxi Ride dataset. Here is a sample of the data contained in a message in that topic:

{
  "ride_id": "328bec4b-0126-42d4-9381-cb1dbf0e2432",
  "point_idx": 305,
  "latitude": 40.776270000000004,
  "longitude": -73.99111,
  "timestamp": "2020-03-27T21:32:51.48098-04:00",
  "meter_reading": 9.403651,
  "meter_increment": 0.030831642,
  "ride_status": "enroute",
  "passenger_count": 1
}

But the messages also contain metadata, that is useful for streaming pipelines. In this case, the messages contain an attribute of name ts, which contains the same timestamp as the field of name timestamp in the data. Remember that PubSub treats the data as just a string of bytes, so it does not know anything about the data itself. The metadata fields are normally used to publish messages with specific ids and/or timestamps.

To inspect the messages from this topic, you can create a subscription, and then pull some messages.

To create a subscription, use the gcloud cli utility (installed by default in the Cloud Shell):

export TOPIC=projects/pubsub-public-data/topics/taxirides-realtime
gcloud pubsub subscriptions create taxis --topic $TOPIC

To pull messages:

gcloud pubsub subscriptions pull taxis --limit 3

or if you have jq (for pretty printing of JSON)

gcloud pubsub subscriptions pull taxis --limit 3 | grep " {" | cut -f 2 -d ' ' | jq

Pay special attention to the Attributes column (metadata). You will see that the timestamp included as a field in the metadata, as well as in the data. We will leverage that metadata field for the timestamps used in our streaming pipeline.

Data output

This pipeline writes the output to BigQuery, in streaming append-only mode.

The destination tables must exist prior to running the pipeline.

If you have the GCloud cli utility installed (for instance, it is installed by default in the Cloud Shell), you can create the tables from the command line.

You need to create a BigQuery dataset too, in the same region:

After that, you can create the destination tables with the provided script

./scripts/create_tables.sh taxi_rides

Algorithm / business rules

We are using a session window with a gap of 10 seconds. That means that all the messages with the same ride_id will be grouped together, as long as their timestamps are 10 seconds within each other. Any message with a timestamp more than 10 seconds apart will be discarded (for old timestamps) or will open a new window (for newer timestamps).

With the messages inside each window (that is, each different ride_id will be part of a different window), we will calculate the duration of the session, as the difference between the min and max timestamps in the window. We will also calculate the number of events in that session.

We will use a GroupByKey to operate with all the messages in a window. This will load all the messages in the window into memory. This is fine, as in Beam streaming, a window is always processed in a worker (windows cannot be split across different workers).

This is an example of the kind of logic that can be implemented leveraging windows in streaming pipelines. This grouping of messages across ride_id and event timestamps is automatically done by the pipeline, and we just need to express the generic operations to be performed with each window, as part of our pipeline.

Running the pipeline

Prerequirements

You need to have a Google Cloud project, and the gcloud SDK configured to run the pipeline. For instance, you could run it from the Cloud Shell in Google Cloud Platform (gcloud would be automatically configured).

Then you need to create a Google Cloud Storage bucket, with the same name as your project id, and in the same region where you will run Dataflow:

Make sure that you have a Python environment with Python 3 (<3.9). For instance a virtualenv, and install apache-beam[gcp] and python-dateutil in your local environment. For instance, assuming that you are running in a virtualenv:

pip install "apache-beam[gcp]" python-dateutil

Run the pipeline

Once the tables are created and the dependencies installed, edit scripts/launch_dataflow_runner.sh and set your project id and region, and then run it with:

./scripts/launch_dataflow_runner.sh

The outputs will be written to the BigQuery tables, and in the profile directory in your bucket you should see Python gprof files with profiling information.

CPU profiling

Beam uses the Python profiler to produce files in Python gprof format. You will need some scripting to interpret those files and extracts insights out of them.

In this repository, you will find some sample output in data/beam.prof, that you can use to check what the profiling output looks like. Use the following Colab notebook with an example analyzing that sample profiling data:

Refer to this post for more details about how to interpret that file:

License

Copyright 2021 Israel Herraiz

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Owner
Israel Herraiz
Strategic Cloud Engineer @GoogleCloudPlatform
Israel Herraiz
Editor and Presenter for Manim Generated Content.

Editor and Presenter for Manim Generated Content. Take a look at the Working Example. More information can be found on the documentation. These Browse

Manim Community 149 Dec 29, 2022
An interactive GUI for WhiteboxTools in a Jupyter-based environment

whiteboxgui An interactive GUI for WhiteboxTools in a Jupyter-based environment GitHub repo: https://github.com/giswqs/whiteboxgui Documentation: http

Qiusheng Wu 105 Dec 15, 2022
GUI for visualization and interactive editing of SMPL-family body models ie. SMPL, SMPL-X, MANO, FLAME.

Body Model Visualizer Introduction This is a simple Open3D-based GUI for SMPL-family body models. This GUI lets you play with the shape, expression, a

Muhammed Kocabas 207 Jan 01, 2023
termplotlib is a Python library for all your terminal plotting needs.

termplotlib termplotlib is a Python library for all your terminal plotting needs. It aims to work like matplotlib. Line plots For line plots, termplot

Nico Schlömer 553 Dec 30, 2022
This tool is designed to help administrators get an overview of their Active Directory structure.

This tool is designed to help administrators get an overview of their Active Directory structure. In the group view you can see all elements of an AD (OU, USER, GROUPS, COMPUTERS etc.). In the user v

deexno 2 Oct 30, 2022
A custom qq-plot for two sample data comparision

QQ-Plot 2 Sample Just a gist to include the custom code to draw a qq-plot in python when dealing with a "two sample problem". This means when u try to

1 Dec 20, 2021
Statistical data visualization using matplotlib

seaborn: statistical data visualization Seaborn is a Python visualization library based on matplotlib. It provides a high-level interface for drawing

Michael Waskom 10.2k Dec 30, 2022
This is a Web scraping project using BeautifulSoup and Python to scrape basic information of all the Test matches played till Jan 2022.

Scraping-test-matches-data This is a Web scraping project using BeautifulSoup and Python to scrape basic information of all the Test matches played ti

Souradeep Banerjee 4 Oct 10, 2022
Create animated and pretty Pandas Dataframe or Pandas Series

Rich DataFrame Create animated and pretty Pandas Dataframe or Pandas Series, as shown below: Installation pip install rich-dataframe Usage Minimal exa

Khuyen Tran 92 Dec 26, 2022
Import, visualize, and analyze SpiderFoot OSINT data in Neo4j, a graph database

SpiderFoot Neo4j Tools Import, visualize, and analyze SpiderFoot OSINT data in Neo4j, a graph database Step 1: Installation NOTE: This installs the sf

Black Lantern Security 42 Dec 26, 2022
Leyna's Visualizing Data With Python

Leyna's Visualizing Data Below is information on the number of bilingual students in three school districts in Massachusetts. You will also find infor

11 Oct 28, 2021
Flow-based visual scripting for Python

A simple visual node editor for Python Ryven combines flow-based visual scripting with Python. It gives you absolute freedom for your nodes and a simp

Leon Thomm 3.1k Jan 06, 2023
A GUI for Pandas DataFrames

About Demo Installation Usage Features More Info About PandasGUI is a GUI for viewing, plotting and analyzing Pandas DataFrames. Demo Installation Ins

Adam Rose 2.8k Dec 24, 2022
Print matplotlib colors

mplcolors Tired of searching "matplotlib colors" every week/day/hour? This simple script displays them all conveniently right in your terminal emulato

Brandon Barker 32 Dec 13, 2022
Painlessly create beautiful matplotlib plots.

Announcement Thank you to everyone who has used prettyplotlib and made it what it is today! Unfortunately, I no longer have the bandwidth to maintain

Olga Botvinnik 1.6k Jan 06, 2023
A site that displays up to date COVID-19 stats, powered by fastpages.

https://covid19dashboards.com This project was built with fastpages Background This project showcases how you can use fastpages to create a static das

GitHub 1.6k Jan 07, 2023
Data Visualizations for the #30DayChartChallenge

The #30DayChartChallenge This repository contains all the charts made for the #30DayChartChallenge during the month of April. This project aims to exp

Isaac Arroyo 7 Sep 20, 2022
mysql relation charts

sqlcharts 自动生成数据库关联关系图 复制settings.py.example 重命名为settings.py 将数据库配置信息填入settings.DATABASE,目前支持mysql和postgresql 执行 python build.py -b,-b是读取数据库表结构,如果只更新匹

6 Aug 22, 2022
A Python Library for Self Organizing Map (SOM)

SOMPY A Python Library for Self Organizing Map (SOM) As much as possible, the structure of SOM is similar to somtoolbox in Matlab. It has the followin

Vahid Moosavi 497 Dec 29, 2022
649 Pokémon palettes as CSVs, with a Python lib to turn names/IDs into palettes, or MatPlotLib compatible ListedColormaps.

PokePalette 649 Pokémon, broken down into CSVs of their RGB colour palettes. Complete with a Python library to convert names or Pokédex IDs into eithe

11 Dec 05, 2022