This repository contains a streaming Dataflow pipeline written in Python with Apache Beam, reading data from PubSub.

Overview

Sample streaming Dataflow pipeline written in Python

This repository contains a streaming Dataflow pipeline written in Python with Apache Beam, reading data from PubSub.

For more details, see the following Beam Summit 2021 talk:

To run this pipeline, you need to have the SDK installed, and a project in Google Cloud Platform, even if you run the pipeline locally with the direct runner:

Description of the pipeline

Data input

We are using here a public PubSub topic with data, so we don't need to setup our own to run this pipeline.

The topic is projects/pubsub-public-data/topics/taxirides-realtime.

That topic contains messages from the NYC Taxi Ride dataset. Here is a sample of the data contained in a message in that topic:

{
  "ride_id": "328bec4b-0126-42d4-9381-cb1dbf0e2432",
  "point_idx": 305,
  "latitude": 40.776270000000004,
  "longitude": -73.99111,
  "timestamp": "2020-03-27T21:32:51.48098-04:00",
  "meter_reading": 9.403651,
  "meter_increment": 0.030831642,
  "ride_status": "enroute",
  "passenger_count": 1
}

But the messages also contain metadata, that is useful for streaming pipelines. In this case, the messages contain an attribute of name ts, which contains the same timestamp as the field of name timestamp in the data. Remember that PubSub treats the data as just a string of bytes, so it does not know anything about the data itself. The metadata fields are normally used to publish messages with specific ids and/or timestamps.

To inspect the messages from this topic, you can create a subscription, and then pull some messages.

To create a subscription, use the gcloud cli utility (installed by default in the Cloud Shell):

export TOPIC=projects/pubsub-public-data/topics/taxirides-realtime
gcloud pubsub subscriptions create taxis --topic $TOPIC

To pull messages:

gcloud pubsub subscriptions pull taxis --limit 3

or if you have jq (for pretty printing of JSON)

gcloud pubsub subscriptions pull taxis --limit 3 | grep " {" | cut -f 2 -d ' ' | jq

Pay special attention to the Attributes column (metadata). You will see that the timestamp included as a field in the metadata, as well as in the data. We will leverage that metadata field for the timestamps used in our streaming pipeline.

Data output

This pipeline writes the output to BigQuery, in streaming append-only mode.

The destination tables must exist prior to running the pipeline.

If you have the GCloud cli utility installed (for instance, it is installed by default in the Cloud Shell), you can create the tables from the command line.

You need to create a BigQuery dataset too, in the same region:

After that, you can create the destination tables with the provided script

./scripts/create_tables.sh taxi_rides

Algorithm / business rules

We are using a session window with a gap of 10 seconds. That means that all the messages with the same ride_id will be grouped together, as long as their timestamps are 10 seconds within each other. Any message with a timestamp more than 10 seconds apart will be discarded (for old timestamps) or will open a new window (for newer timestamps).

With the messages inside each window (that is, each different ride_id will be part of a different window), we will calculate the duration of the session, as the difference between the min and max timestamps in the window. We will also calculate the number of events in that session.

We will use a GroupByKey to operate with all the messages in a window. This will load all the messages in the window into memory. This is fine, as in Beam streaming, a window is always processed in a worker (windows cannot be split across different workers).

This is an example of the kind of logic that can be implemented leveraging windows in streaming pipelines. This grouping of messages across ride_id and event timestamps is automatically done by the pipeline, and we just need to express the generic operations to be performed with each window, as part of our pipeline.

Running the pipeline

Prerequirements

You need to have a Google Cloud project, and the gcloud SDK configured to run the pipeline. For instance, you could run it from the Cloud Shell in Google Cloud Platform (gcloud would be automatically configured).

Then you need to create a Google Cloud Storage bucket, with the same name as your project id, and in the same region where you will run Dataflow:

Make sure that you have a Python environment with Python 3 (<3.9). For instance a virtualenv, and install apache-beam[gcp] and python-dateutil in your local environment. For instance, assuming that you are running in a virtualenv:

pip install "apache-beam[gcp]" python-dateutil

Run the pipeline

Once the tables are created and the dependencies installed, edit scripts/launch_dataflow_runner.sh and set your project id and region, and then run it with:

./scripts/launch_dataflow_runner.sh

The outputs will be written to the BigQuery tables, and in the profile directory in your bucket you should see Python gprof files with profiling information.

CPU profiling

Beam uses the Python profiler to produce files in Python gprof format. You will need some scripting to interpret those files and extracts insights out of them.

In this repository, you will find some sample output in data/beam.prof, that you can use to check what the profiling output looks like. Use the following Colab notebook with an example analyzing that sample profiling data:

Refer to this post for more details about how to interpret that file:

License

Copyright 2021 Israel Herraiz

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Owner
Israel Herraiz
Strategic Cloud Engineer @GoogleCloudPlatform
Israel Herraiz
Resources for teaching & learning practical data visualization with python.

Practical Data Visualization with Python Overview All views expressed on this site are my own and do not represent the opinions of any entity with whi

Paul Jeffries 98 Sep 24, 2022
Extensible, parallel implementations of t-SNE

openTSNE openTSNE is a modular Python implementation of t-Distributed Stochasitc Neighbor Embedding (t-SNE) [1], a popular dimensionality-reduction al

Pavlin Poliฤar 1.1k Jan 03, 2023
Show Data: Show your dataset in web browser!

Show Data is to generate html tables for large scale image dataset, especially for the dataset in remote server. It provides some useful commond line tools and fully customizeble API reference to gen

Dechao Meng 83 Nov 26, 2022
Create a visualization for Trump's Tweeted Words Using Python

Data Trump's Tweeted Words This plot illustrates twitter word occurences. We already did the coding I needed for this plot, so I was very inspired to

7 Mar 27, 2022
A curated list of awesome Dash (plotly) resources

Awesome Dash A curated list of awesome Dash (plotly) resources Dash is a productive Python framework for building web applications. Written on top of

Luke Singham 1.7k Jan 07, 2023
The windML framework provides an easy-to-use access to wind data sources within the Python world, building upon numpy, scipy, sklearn, and matplotlib. Renewable Wind Energy, Forecasting, Prediction

windml Build status : The importance of wind in smart grids with a large number of renewable energy resources is increasing. With the growing infrastr

Computational Intelligence Group 125 Dec 24, 2022
Create 3d loss surface visualizations, with optimizer path. Issues welcome!

MLVTK A loss surface visualization tool Simple feed-forward network trained on chess data, using elu activation and Adam optimizer Simple feed-forward

7 Dec 21, 2022
A simple code for plotting figure, colorbar, and cropping with python

Python Plotting Tools This repository provides a python code to generate figures (e.g., curves and barcharts) that can be used in the paper to show th

Guanying Chen 134 Jan 02, 2023
๐Ÿ—พ Streamlit Component for rendering kepler.gl maps

streamlit-keplergl ๐Ÿ—พ Streamlit Component for rendering kepler.gl maps in a streamlit app. ๐ŸŽˆ Live Demo ๐ŸŽˆ Installation pip install streamlit-keplergl

Christoph Rieke 39 Dec 14, 2022
Graphing communities on Twitch.tv in a visually intuitive way

VisualizingTwitchCommunities This project maps communities of streamers on Twitch.tv based on shared viewership. The data is collected from the Twitch

Kiran Gershenfeld 312 Jan 07, 2023
Data Visualizations for the #30DayChartChallenge

The #30DayChartChallenge This repository contains all the charts made for the #30DayChartChallenge during the month of April. This project aims to exp

Isaac Arroyo 7 Sep 20, 2022
Geospatial Data Visualization using PyGMT

Example script to visualize topographic data, earthquake data, and tomographic data on a map

Utpal Kumar 2 Jul 30, 2022
This repository contains a streaming Dataflow pipeline written in Python with Apache Beam, reading data from PubSub.

Sample streaming Dataflow pipeline written in Python This repository contains a streaming Dataflow pipeline written in Python with Apache Beam, readin

Israel Herraiz 9 Mar 18, 2022
Process dataframe in a easily way.

Popanda Written by Shengxuan Wang at OSU. Used for processing dataframe, especially for machine learning. The name is from "Po" in the movie Kung Fu P

ShawnWang 1 Dec 24, 2021
A simple agent-based model used to teach the basics of OOP in my lectures

Pydemic A simple agent-based model of a pandemic. This is used to teach basic principles of object-oriented programming to master students. It is not

Fabien Maussion 2 Jun 08, 2022
Designed a greedy algorithm based on Markov sequential decision-making process in MATLAB/Python to optimize using Gurobi solver

Designed a greedy algorithm based on Markov sequential decision-making process in MATLAB/Python to optimize using Gurobi solver, the wheel size, gear shifting sequence by modeling drivetrain constrai

Sabbella Prasanna 1 Jan 11, 2022
A tool to plot and execute Rossmos's Formula, that helps to catch serial criminals using mathematics

Rossmo Plotter A tool to plot and execute Rossmos's Formula using python, that helps to catch serial criminals using mathematics Author: Amlan Saha Ku

Amlan Saha Kundu 3 Aug 29, 2022
Extract data from ThousandEyes REST API and visualize it on your customized Grafana Dashboard.

ThousandEyes Grafana Dashboard Extract data from the ThousandEyes REST API and visualize it on your customized Grafana Dashboard. Deploy Grafana, Infl

Flo Pachinger 16 Nov 26, 2022
A high-level plotting API for pandas, dask, xarray, and networkx built on HoloViews

hvPlot A high-level plotting API for the PyData ecosystem built on HoloViews. Build Status Coverage Latest dev release Latest release Docs What is it?

HoloViz 697 Jan 06, 2023
Farhad Davaripour, Ph.D. 1 Jan 05, 2022