Building house price data pipelines with Apache Beam and Spark on GCP

Overview

house-price-etl-pipeline

This project contains the process from building a web crawler to extract the raw data of house price to create ETL pipelines using Google Could Platform services.

Basic flow of the ETL pipeline

The ETL pipelines are built with both Apache Beam using Cloud Dataflow and Spark using Cloud Dataproc for loading real estate transactions data into BigQuery, and the data can be visualized in Data Studio. The project also uses Cloud Function to monitor if a new file is uploaded in the GCS bucket and trigger the pipeline automatically.

1. Get Started

The house price data

Actual price registration of real estate transactions data in Taiwan has been released since 2012, which refers to the transaction information includes: position and area of real estate, total price of land and building, parking space related information, etc. We can use the data to observe the changes in house prices over time or predict the house price trend in various regions.

Setup and requirements

Set up on Google Cloud Platform:

Project is created with:

  • Python version: 3.7
  • Apache beam version: 2.33.0
  • Pyspark version: 3.2.0

2. Use a web crawler to download the historical data

Run the web crawler to download historical actual price data in csv format, and upload the files to the Google Cloud Storage bucket.

First, set up the local Python development environment and install packages from requirements.txt:

$ pip install -r requirements.txt

Open crawler.py file, replace YOUR_DIR_PATH with a local directory to store download data, replace projectID with your Google Cloud project ID, and replace GCS_BUCKET_NAME with the name of your Cloud Storage bucket. Then run the web crawler:

$ python crawler.py

3. Build ETL pipelines on GCP

There are two versions of ETL pipelines that read source files from Cloud Storage, apply some transformations and load the data into BigQuery. One of the ETL pipelines based on Apache beam uses Dataflow to process the data for analytics of land transaction. The other ETL pipeline based on Apache Spark uses Dataproc to proccess the data for analytics of building transaction.

Let’s start by opening a session in Google Cloud Shell. Run the following commands to set the project property with your project ID.

$ gcloud config set project [projectID]

Run the pipeline using Dataflow for land data

The file etl_pipeline_beam.py contains the Python code for the etl pipeline with Apache beam. We can upload the file using the Cloud Shell Editor.

Run actual_price_etl.py to create a Dataflow job which runs the DataflowRunner. Notice that we need to set the Cloud Storage location of the staging and template file, and set the region in which the created job should run.

$ python etl_pipeline_beam.py \
--project=projectID \
--region=region \
--runner=DataflowRunner \
--staging_location=gs://BUCKET_NAME/staging \
--temp_location=gs://BUCKET_NAME/temp \
--save_main_session

Run the pipeline using Dataproc for building data

The file etl_pipeline_spark.py contains the Python code for the etl pipeline with Apache Spark. We can upload the file using the Cloud Shell Editor.

Submit etl_pipeline_spark.py to your Dataproc cluster to run the Spark job. We need to set the cluster name, and set the region in which the created job should run. To write data to Bigquery, the jar file of spark-bigquery-connector must be available at runtime.

$ gcloud dataproc jobs submit pyspark etl_pipeline_spark.py \
--cluster=cluster-name \
--region=region \
--jars=gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar

4. Use a Cloud Function to trigger Cloud Dataflow

Use the Cloud Fucntion to automatically trigger the Dataflow pipeline when a new file arrives in the GCS bucket.

First, we need to create a Dataflow template for runnig the data pipeline with REST API request called by the Cloud Function. The file etl_pipeline_beam_auto.py contains the Python code for the etl pipeline with Apache beam. We can upload the file using the Cloud Shell Editor.

Create a Dataflow template

Use etl_pipeline_beam_auto.py to create a Dataflow template. Note that we need to set the Cloud Storage location of the staging, temporary and template file, and set the region in which the created job should run.

python -m etl_pipeline_beam_auto \
    --runner DataflowRunner \
    --project projectID \
    --region=region \
    --staging_location gs://BUCKET_NAME/staging \
    --temp_location gs://BUCKET_NAME/temp \
    --template_location gs://BUCKET_NAME/template \
    --save_main_session

Create a Cloud Function

Go to the Cloud Function GUI and manually create a function, set Trigger as Cloud Storage, Event Type as Finalize/Create , and choose the GCS bucket which needs to be monitored. Next, write the function itself, use the code in main.py file. Note that the user defined parameter input is passed to the Dataflow pipeline job. Finally, click on depoly and now your function is ready to execute and start the Dataflow pipeline when a file is uploaded in your bucket.

Results

When each ETL pipeline is completed and succeeded, navigating to BigQuery to verify that the data is successfully loaded in the table.

BigQuery - land_data table

Now the data is ready for analytics and reporting. Here, we calculate average price by year in BigQuery, and visualize the results in Data Studio.

Data Studio - Average land price by year in Yilan County

Calculate multilateral price indices in Python (with Pandas and PySpark).

IndexNumCalc Calculate multilateral price indices using the GEKS-T (CCDI), Time Product Dummy (TPD), Time Dummy Hedonic (TDH), Geary-Khamis (GK) metho

Dr. Usman Kayani 3 Apr 27, 2022
DaDRA (day-druh) is a Python library for Data-Driven Reachability Analysis.

DaDRA (day-druh) is a Python library for Data-Driven Reachability Analysis. The main goal of the package is to accelerate the process of computing estimates of forward reachable sets for nonlinear dy

2 Nov 08, 2021
TextDescriptives - A Python library for calculating a large variety of statistics from text

A Python library for calculating a large variety of statistics from text(s) using spaCy v.3 pipeline components and extensions. TextDescriptives can be used to calculate several descriptive statistic

150 Dec 30, 2022
Basis Set Format Converter

Basis Set Format Converter Repository for the online tool that allows you to enter a basis set in the form of text input for a variety of Quantum Chem

Manas Sharma 3 Jun 27, 2022
Data Intelligence Applications - Online Product Advertising and Pricing with Context Generation

Data Intelligence Applications - Online Product Advertising and Pricing with Context Generation Overview Consider the scenario in which advertisement

Manuel Bressan 2 Nov 18, 2021
Larch: Applications and Python Library for Data Analysis of X-ray Absorption Spectroscopy (XAS, XANES, XAFS, EXAFS), X-ray Fluorescence (XRF) Spectroscopy and Imaging

Larch: Data Analysis Tools for X-ray Spectroscopy and More Documentation: http://xraypy.github.io/xraylarch Code: http://github.com/xraypy/xraylarch L

xraypy 95 Dec 13, 2022
Approximate Nearest Neighbor Search for Sparse Data in Python!

Approximate Nearest Neighbor Search for Sparse Data in Python! This library is well suited to finding nearest neighbors in sparse, high dimensional spaces (like text documents).

Meta Research 906 Jan 01, 2023
Jupyter notebooks for the book "The Elements of Statistical Learning".

This repository contains Jupyter notebooks implementing the algorithms found in the book and summary of the textbook.

Madiyar 369 Dec 30, 2022
Vectorizers for a range of different data types

Vectorizers for a range of different data types

Tutte Institute for Mathematics and Computing 69 Dec 29, 2022
Airflow ETL With EKS EFS Sagemaker

Airflow ETL With EKS EFS & Sagemaker (en desarrollo) Diagrama de la solución Imp

1 Feb 14, 2022
Datashredder is a simple data corruption engine written in python. You can corrupt anything text, images and video.

Datashredder is a simple data corruption engine written in python. You can corrupt anything text, images and video. You can chose the cha

2 Jul 22, 2022
A Python package for modular causal inference analysis and model evaluations

Causal Inference 360 A Python package for inferring causal effects from observational data. Description Causal inference analysis enables estimating t

International Business Machines 506 Dec 19, 2022
Powerful, efficient particle trajectory analysis in scientific Python.

freud Overview The freud Python library provides a simple, flexible, powerful set of tools for analyzing trajectories obtained from molecular dynamics

Glotzer Group 195 Dec 20, 2022
Project under the certification "Data Analysis with Python" on FreeCodeCamp

Sea Level Predictor Assignment You will anaylize a dataset of the global average sea level change since 1880. You will use the data to predict the sea

Bhavya Gopal 3 Jan 31, 2022
wikirepo is a Python package that provides a framework to easily source and leverage standardized Wikidata information

Python based Wikidata framework for easy dataframe extraction wikirepo is a Python package that provides a framework to easily source and leverage sta

Andrew Tavis McAllister 35 Jan 04, 2023
Intercepting proxy + analysis toolkit for Second Life compatible virtual worlds

Hippolyzer Hippolyzer is a revival of Linden Lab's PyOGP library targeting modern Python 3, with a focus on debugging issues in Second Life-compatible

Salad Dais 6 Sep 01, 2022
track your GitHub statistics

GitHub-Stalker track your github statistics 👀 features find new followers or unfollowers find who got a star on your project or remove stars find who

Bahadır Araz 34 Nov 18, 2022
Exploring the Top ML and DL GitHub Repositories

This repository contains my work related to my project where I scraped data on the most popular machine learning and deep learning GitHub repositories in order to further visualize and analyze it.

Nico Van den Hooff 17 Aug 21, 2022
Data Analysis for First Year Laboratory at Imperial College, London.

Data Analysis for First Year Laboratory at Imperial College, London. For personal reference only, and to reference in lab reports and lab books.

Martin He 0 Aug 29, 2022
PostQF is a user-friendly Postfix queue data filter which operates on data produced by postqueue -j.

PostQF Copyright © 2022 Ralph Seichter PostQF is a user-friendly Postfix queue data filter which operates on data produced by postqueue -j. See the ma

Ralph Seichter 11 Nov 24, 2022