Building house price data pipelines with Apache Beam and Spark on GCP

Overview

house-price-etl-pipeline

This project contains the process from building a web crawler to extract the raw data of house price to create ETL pipelines using Google Could Platform services.

Basic flow of the ETL pipeline

The ETL pipelines are built with both Apache Beam using Cloud Dataflow and Spark using Cloud Dataproc for loading real estate transactions data into BigQuery, and the data can be visualized in Data Studio. The project also uses Cloud Function to monitor if a new file is uploaded in the GCS bucket and trigger the pipeline automatically.

1. Get Started

The house price data

Actual price registration of real estate transactions data in Taiwan has been released since 2012, which refers to the transaction information includes: position and area of real estate, total price of land and building, parking space related information, etc. We can use the data to observe the changes in house prices over time or predict the house price trend in various regions.

Setup and requirements

Set up on Google Cloud Platform:

Project is created with:

  • Python version: 3.7
  • Apache beam version: 2.33.0
  • Pyspark version: 3.2.0

2. Use a web crawler to download the historical data

Run the web crawler to download historical actual price data in csv format, and upload the files to the Google Cloud Storage bucket.

First, set up the local Python development environment and install packages from requirements.txt:

$ pip install -r requirements.txt

Open crawler.py file, replace YOUR_DIR_PATH with a local directory to store download data, replace projectID with your Google Cloud project ID, and replace GCS_BUCKET_NAME with the name of your Cloud Storage bucket. Then run the web crawler:

$ python crawler.py

3. Build ETL pipelines on GCP

There are two versions of ETL pipelines that read source files from Cloud Storage, apply some transformations and load the data into BigQuery. One of the ETL pipelines based on Apache beam uses Dataflow to process the data for analytics of land transaction. The other ETL pipeline based on Apache Spark uses Dataproc to proccess the data for analytics of building transaction.

Let’s start by opening a session in Google Cloud Shell. Run the following commands to set the project property with your project ID.

$ gcloud config set project [projectID]

Run the pipeline using Dataflow for land data

The file etl_pipeline_beam.py contains the Python code for the etl pipeline with Apache beam. We can upload the file using the Cloud Shell Editor.

Run actual_price_etl.py to create a Dataflow job which runs the DataflowRunner. Notice that we need to set the Cloud Storage location of the staging and template file, and set the region in which the created job should run.

$ python etl_pipeline_beam.py \
--project=projectID \
--region=region \
--runner=DataflowRunner \
--staging_location=gs://BUCKET_NAME/staging \
--temp_location=gs://BUCKET_NAME/temp \
--save_main_session

Run the pipeline using Dataproc for building data

The file etl_pipeline_spark.py contains the Python code for the etl pipeline with Apache Spark. We can upload the file using the Cloud Shell Editor.

Submit etl_pipeline_spark.py to your Dataproc cluster to run the Spark job. We need to set the cluster name, and set the region in which the created job should run. To write data to Bigquery, the jar file of spark-bigquery-connector must be available at runtime.

$ gcloud dataproc jobs submit pyspark etl_pipeline_spark.py \
--cluster=cluster-name \
--region=region \
--jars=gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar

4. Use a Cloud Function to trigger Cloud Dataflow

Use the Cloud Fucntion to automatically trigger the Dataflow pipeline when a new file arrives in the GCS bucket.

First, we need to create a Dataflow template for runnig the data pipeline with REST API request called by the Cloud Function. The file etl_pipeline_beam_auto.py contains the Python code for the etl pipeline with Apache beam. We can upload the file using the Cloud Shell Editor.

Create a Dataflow template

Use etl_pipeline_beam_auto.py to create a Dataflow template. Note that we need to set the Cloud Storage location of the staging, temporary and template file, and set the region in which the created job should run.

python -m etl_pipeline_beam_auto \
    --runner DataflowRunner \
    --project projectID \
    --region=region \
    --staging_location gs://BUCKET_NAME/staging \
    --temp_location gs://BUCKET_NAME/temp \
    --template_location gs://BUCKET_NAME/template \
    --save_main_session

Create a Cloud Function

Go to the Cloud Function GUI and manually create a function, set Trigger as Cloud Storage, Event Type as Finalize/Create , and choose the GCS bucket which needs to be monitored. Next, write the function itself, use the code in main.py file. Note that the user defined parameter input is passed to the Dataflow pipeline job. Finally, click on depoly and now your function is ready to execute and start the Dataflow pipeline when a file is uploaded in your bucket.

Results

When each ETL pipeline is completed and succeeded, navigating to BigQuery to verify that the data is successfully loaded in the table.

BigQuery - land_data table

Now the data is ready for analytics and reporting. Here, we calculate average price by year in BigQuery, and visualize the results in Data Studio.

Data Studio - Average land price by year in Yilan County

scikit-survival is a Python module for survival analysis built on top of scikit-learn.

scikit-survival scikit-survival is a Python module for survival analysis built on top of scikit-learn. It allows doing survival analysis while utilizi

Sebastian Pölsterl 876 Jan 04, 2023
Exploratory Data Analysis of the 2019 Indian General Elections using a dataset from Kaggle.

2019-indian-election-eda Exploratory Data Analysis of the 2019 Indian General Elections using a dataset from Kaggle. This project is a part of the Cou

Souradeep Banerjee 5 Oct 10, 2022
A 2-dimensional physics engine written in Cairo

A 2-dimensional physics engine written in Cairo

Topology 38 Nov 16, 2022
Evidence enables analysts to deliver a polished business intelligence system using SQL and markdown.

Evidence enables analysts to deliver a polished business intelligence system using SQL and markdown

915 Dec 26, 2022
ETL pipeline on movie data using Python and postgreSQL

Movies-ETL ETL pipeline on movie data using Python and postgreSQL Overview This project consisted on a automated Extraction, Transformation and Load p

Juan Nicolas Serrano 0 Jul 07, 2021
In this project, ETL pipeline is build on data warehouse hosted on AWS Redshift.

ETL Pipeline for AWS Project Description In this project, ETL pipeline is build on data warehouse hosted on AWS Redshift. The data is loaded from S3 t

Mobeen Ahmed 1 Nov 01, 2021
Pandas and Dask test helper methods with beautiful error messages.

beavis Pandas and Dask test helper methods with beautiful error messages. test helpers These test helper methods are meant to be used in test suites.

Matthew Powers 18 Nov 28, 2022
Calculate multilateral price indices in Python (with Pandas and PySpark).

IndexNumCalc Calculate multilateral price indices using the GEKS-T (CCDI), Time Product Dummy (TPD), Time Dummy Hedonic (TDH), Geary-Khamis (GK) metho

Dr. Usman Kayani 3 Apr 27, 2022
NumPy aware dynamic Python compiler using LLVM

Numba A Just-In-Time Compiler for Numerical Functions in Python Numba is an open source, NumPy-aware optimizing compiler for Python sponsored by Anaco

Numba 8.2k Jan 07, 2023
pyETT: Python library for Eleven VR Table Tennis data

pyETT: Python library for Eleven VR Table Tennis data Documentation Documentation for pyETT is located at https://pyett.readthedocs.io/. Installation

Tharsis Souza 5 Nov 19, 2022
Full automated data pipeline using docker images

Create postgres tables from CSV files This first section is only relate to creating tables from CSV files using postgres container alone. Just one of

1 Nov 21, 2021
Methylation/modified base calling separated from basecalling.

Remora Methylation/modified base calling separated from basecalling. Remora primarily provides an API to call modified bases for basecaller programs s

Oxford Nanopore Technologies 72 Jan 05, 2023
Py-price-monitoring - A Python price monitor

A Python price monitor This project was focused on Brazil, so the monitoring is

Samuel 1 Jan 04, 2022
Weather Image Recognition - Python weather application using series of data

Weather Image Recognition - Python weather application using series of data

Kushal Shingote 1 Feb 04, 2022
Pandas on AWS - Easy integration with Athena, Glue, Redshift, Timestream, QuickSight, Chime, CloudWatchLogs, DynamoDB, EMR, SecretManager, PostgreSQL, MySQL, SQLServer and S3 (Parquet, CSV, JSON and EXCEL).

AWS Data Wrangler Pandas on AWS Easy integration with Athena, Glue, Redshift, Timestream, QuickSight, Chime, CloudWatchLogs, DynamoDB, EMR, SecretMana

Amazon Web Services - Labs 3.3k Jan 04, 2023
Flexible HDF5 saving/loading and other data science tools from the University of Chicago

deepdish Flexible HDF5 saving/loading and other data science tools from the University of Chicago. This repository also host a Deep Learning blog: htt

UChicago - Department of Computer Science 255 Dec 10, 2022
Basis Set Format Converter

Basis Set Format Converter Repository for the online tool that allows you to enter a basis set in the form of text input for a variety of Quantum Chem

Manas Sharma 3 Jun 27, 2022
Tools for the analysis, simulation, and presentation of Lorentz TEM data.

ltempy ltempy is a set of tools for Lorentz TEM data analysis, simulation, and presentation. Features Single Image Transport of Intensity Equation (SI

McMorran Lab 1 Dec 26, 2022
Analysis scripts for QG equations

qg-edgeofchaos Analysis scripts for QG equations FIle/Folder Structure eigensolvers.py - Spectral and finite-difference solvers for Rossby wave eigenf

Norman Cao 2 Sep 27, 2022
An experimental project I'm undertaking for the sole purpose of increasing my Python knowledge

5ePy is an experimental project I'm undertaking for the sole purpose of increasing my Python knowledge. #Goals Goal: Create a working, albeit lightwei

Hayden Covington 1 Nov 24, 2021