Building house price data pipelines with Apache Beam and Spark on GCP

Overview

house-price-etl-pipeline

This project covers the whole process, from building a web crawler that extracts raw house price data to creating ETL pipelines with Google Cloud Platform services.

Basic flow of the ETL pipeline

The ETL pipelines are built with Apache Beam on Cloud Dataflow and with Spark on Cloud Dataproc. They load real estate transaction data into BigQuery, where it can be visualized in Data Studio. The project also uses Cloud Functions to detect when a new file is uploaded to the GCS bucket and trigger the pipeline automatically.

1. Get Started

The house price data

Taiwan has published actual price registration data for real estate transactions since 2012. Each transaction record includes information such as the location and area of the property, the total price of the land and building, and parking space details. We can use the data to observe how house prices change over time or to predict house price trends in different regions.
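Observing prices over time requires turning transaction dates into calendar years. A minimal sketch, assuming the raw files store dates in the Republic of China (Minguo) calendar as YYYMMDD strings such as 1100315 (an assumption about the source files, not something this project documents): the Minguo year plus 1911 gives the Gregorian year. The function name roc_to_date is hypothetical.

```python
from datetime import date


def roc_to_date(value: str) -> date:
    """Convert a Minguo-calendar date string (YYYMMDD or YYMMDD) to a datetime.date.

    Assumes the last four digits are MMDD and the leading digits are the ROC year.
    """
    value = value.strip()
    if len(value) not in (6, 7) or not value.isdigit():
        raise ValueError(f"not a ROC date: {value!r}")
    roc_year = int(value[:-4])
    month = int(value[-4:-2])
    day = int(value[-2:])
    # ROC year 1 corresponds to 1912, so the offset is 1911.
    return date(roc_year + 1911, month, day)
```

Grouping by roc_to_date(...).year would then give the Gregorian year for each transaction.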

Setup and requirements

Set up on Google Cloud Platform:

Project is created with:

  • Python version: 3.7
  • Apache Beam version: 2.33.0
  • PySpark version: 3.2.0

2. Use a web crawler to download the historical data

Run the web crawler to download the historical actual price data in CSV format, then upload the files to a Google Cloud Storage bucket.

First, set up the local Python development environment and install packages from requirements.txt:

$ pip install -r requirements.txt

Open crawler.py and replace YOUR_DIR_PATH with a local directory for the downloaded data, projectID with your Google Cloud project ID, and GCS_BUCKET_NAME with the name of your Cloud Storage bucket. Then run the web crawler:

$ python crawler.py
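The upload half of this step could look roughly like the sketch below. It is not the code in crawler.py: it assumes the google-cloud-storage client library is installed and that all downloaded CSV files sit directly in YOUR_DIR_PATH. The raw/ object prefix and the helper names are hypothetical.

```python
import os
from typing import List


def list_csv_files(local_dir: str) -> List[str]:
    """Return the sorted paths of the .csv files directly inside local_dir."""
    return sorted(
        os.path.join(local_dir, name)
        for name in os.listdir(local_dir)
        if name.lower().endswith(".csv")
    )


def blob_name_for(path: str, prefix: str = "raw") -> str:
    """Map a local file path to an object name in the bucket (hypothetical layout)."""
    return f"{prefix}/{os.path.basename(path)}"


def upload_csv_files(local_dir: str, project_id: str, bucket_name: str,
                     prefix: str = "raw") -> List[str]:
    """Upload every CSV in local_dir to gs://bucket_name/prefix/ and return the object names."""
    # Imported here so the helpers above work without google-cloud-storage installed.
    from google.cloud import storage

    client = storage.Client(project=project_id)
    bucket = client.bucket(bucket_name)
    uploaded = []
    for path in list_csv_files(local_dir):
        name = blob_name_for(path, prefix)
        bucket.blob(name).upload_from_filename(path)
        uploaded.append(name)
    return uploaded
```

Calling upload_csv_files("YOUR_DIR_PATH", "projectID", "GCS_BUCKET_NAME") with the same placeholders as crawler.py would copy every downloaded CSV into the bucket under raw/.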

3. Build ETL pipelines on GCP

There are two versions of the ETL pipeline. Both read source files from Cloud Storage, apply transformations, and load the data into BigQuery. The Apache Beam pipeline runs on Dataflow and processes the data for land transaction analytics. The Apache Spark pipeline runs on Dataproc and processes the data for building transaction analytics.

Let’s start by opening a session in Google Cloud Shell. Run the following command to set the project property to your project ID:

$ gcloud config set project [projectID]

Run the pipeline using Dataflow for land data

The file etl_pipeline_beam.py contains the Python code for the ETL pipeline with Apache Beam. We can upload the file using the Cloud Shell Editor.

Run etl_pipeline_beam.py with the DataflowRunner to create a Dataflow job. Note that we need to set the Cloud Storage locations for the staging and temporary files, and the region in which the job should run.

$ python etl_pipeline_beam.py \
--project=projectID \
--region=region \
--runner=DataflowRunner \
--staging_location=gs://BUCKET_NAME/staging \
--temp_location=gs://BUCKET_NAME/temp \
--save_main_session
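For orientation, a minimal Beam pipeline of the same shape (read CSV lines from Cloud Storage, parse them, write rows to BigQuery) might look like the sketch below. It is not the contents of etl_pipeline_beam.py: the column names, the BigQuery schema and the run() entry point are hypothetical, and apache-beam[gcp] is assumed to be installed.

```python
import csv
from typing import Dict, List, Optional

# Hypothetical column layout; the real files and etl_pipeline_beam.py may differ.
FIELDS = ["district", "transaction_date", "area_sqm", "total_price"]
SCHEMA = "district:STRING,transaction_date:STRING,area_sqm:FLOAT,total_price:INTEGER"


def parse_land_row(line: str) -> Optional[Dict[str, object]]:
    """Parse one CSV line into a BigQuery row dict; return None if it cannot be parsed."""
    values = next(csv.reader([line]))
    if len(values) != len(FIELDS):
        return None
    row: Dict[str, object] = dict(zip(FIELDS, values))
    try:
        row["area_sqm"] = float(values[2])
        row["total_price"] = int(values[3])
    except ValueError:
        # Header rows and rows with non-numeric values are dropped.
        return None
    return row


def run(input_pattern: str, table: str, argv: Optional[List[str]] = None) -> None:
    """Read CSV files from GCS, parse them and append the rows to a BigQuery table."""
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    with beam.Pipeline(options=PipelineOptions(argv)) as p:
        (
            p
            | "ReadCSV" >> beam.io.ReadFromText(input_pattern, skip_header_lines=1)
            | "Parse" >> beam.Map(parse_land_row)
            | "DropBadRows" >> beam.Filter(lambda row: row is not None)
            | "WriteToBQ" >> beam.io.WriteToBigQuery(
                table,
                schema=SCHEMA,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            )
        )
```

Calling run("gs://BUCKET_NAME/*.csv", "projectID:DATASET.land_data", argv) with argv holding the flags from the command above would submit the job to Dataflow; DATASET is a placeholder. Keeping the parsing in a plain function makes it easy to unit test without a runner.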

Run the pipeline using Dataproc for building data

The file etl_pipeline_spark.py contains the Python code for the ETL pipeline with Apache Spark. We can upload the file using the Cloud Shell Editor.

Submit etl_pipeline_spark.py to your Dataproc cluster to run the Spark job. We need to set the cluster name and the region in which the job should run. To write data to BigQuery, the spark-bigquery-connector JAR file must be available at runtime, so it is passed with the --jars flag.

$ gcloud dataproc jobs submit pyspark etl_pipeline_spark.py \
--cluster=cluster-name \
--region=region \
--jars=gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar
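A rough PySpark sketch of such a job is shown below. It is not the contents of etl_pipeline_spark.py: the total_price column, the cleaning step and the helper names are hypothetical. It writes through the spark-bigquery-connector, whose indirect write method stages data in the Cloud Storage bucket named by the temporaryGcsBucket option.

```python
from typing import Optional


def bigquery_table_id(dataset: str, table: str, project: Optional[str] = None) -> str:
    """Build the 'project.dataset.table' (or 'dataset.table') identifier for the connector."""
    return f"{project}.{dataset}.{table}" if project else f"{dataset}.{table}"


def run(input_path: str, table_id: str, temp_bucket: str) -> None:
    """Load building-transaction CSV files from GCS into BigQuery (sketch)."""
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.appName("house-price-etl-building").getOrCreate()
    df = (
        spark.read.option("header", True)
        .option("inferSchema", True)
        .csv(input_path)
    )
    # Hypothetical cleaning step: drop rows without a total price and cast it to long.
    cleaned = df.filter(F.col("total_price").isNotNull()).withColumn(
        "total_price", F.col("total_price").cast("long")
    )
    (
        cleaned.write.format("bigquery")
        .option("temporaryGcsBucket", temp_bucket)
        .mode("append")
        .save(table_id)
    )
    spark.stop()
```

Because the connector JAR is supplied with --jars at submit time, the script itself only needs the "bigquery" format name.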

4. Use a Cloud Function to trigger Cloud Dataflow

Use a Cloud Function to automatically trigger the Dataflow pipeline when a new file arrives in the GCS bucket.

First, we need to create a Dataflow template so that the Cloud Function can run the data pipeline through a REST API request. The file etl_pipeline_beam_auto.py contains the Python code for this version of the ETL pipeline with Apache Beam. We can upload the file using the Cloud Shell Editor.

Create a Dataflow template

Use etl_pipeline_beam_auto.py to create a Dataflow template. Note that we need to set the Cloud Storage locations for the staging, temporary, and template files, and the region in which the job should run.

$ python -m etl_pipeline_beam_auto \
--runner=DataflowRunner \
--project=projectID \
--region=region \
--staging_location=gs://BUCKET_NAME/staging \
--temp_location=gs://BUCKET_NAME/temp \
--template_location=gs://BUCKET_NAME/template \
--save_main_session
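A classic template only accepts runtime parameters that the pipeline declares as ValueProvider arguments. The sketch below shows one way to expose the input parameter that the Cloud Function passes at launch time. It assumes etl_pipeline_beam_auto.py follows this pattern, which is not confirmed here, and template_args() and run() are hypothetical helpers. When run with --template_location, the DataflowRunner writes the template to that location instead of executing the job.

```python
from typing import List, Optional


def template_args(project_id: str, region: str, bucket: str) -> List[str]:
    """Flags equivalent to the template-creation command above."""
    return [
        "--runner=DataflowRunner",
        f"--project={project_id}",
        f"--region={region}",
        f"--staging_location=gs://{bucket}/staging",
        f"--temp_location=gs://{bucket}/temp",
        f"--template_location=gs://{bucket}/template",
        "--save_main_session",
    ]


def run(argv: Optional[List[str]] = None) -> None:
    """Build a pipeline whose input path is a runtime (ValueProvider) parameter."""
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    class TemplateOptions(PipelineOptions):
        @classmethod
        def _add_argparse_args(cls, parser):
            # Resolved when the template is launched, so each launch can pass a new file.
            parser.add_value_provider_argument(
                "--input", type=str, help="gs:// path of the CSV file to process"
            )

    options = TemplateOptions(argv)
    with beam.Pipeline(options=options) as p:
        lines = p | "Read" >> beam.io.ReadFromText(options.input, skip_header_lines=1)
        # Parsing and beam.io.WriteToBigQuery steps would be applied to `lines` here,
        # in the same way as in the non-template pipeline.
```

Calling run(template_args("projectID", "region", "BUCKET_NAME")) once would stage the template at gs://BUCKET_NAME/template.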

Create a Cloud Function

Go to the Cloud Functions page in the Google Cloud console and create a function manually: set Trigger to Cloud Storage, Event Type to Finalize/Create, and choose the GCS bucket to monitor. Next, write the function itself using the code in main.py. Note that the user-defined parameter input is passed to the Dataflow pipeline job. Finally, click Deploy. The function is now ready to start the Dataflow pipeline whenever a file is uploaded to your bucket.
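Since main.py is not reproduced here, the following is only a sketch of a background function for a Cloud Storage finalize event that launches the classic template through the Dataflow REST API method projects.locations.templates.launch, using google-api-python-client. PROJECT_ID, REGION, TEMPLATE_PATH, TEMP_LOCATION, the function name trigger_dataflow and the job-naming scheme are assumptions; only the input parameter name comes from the description above.

```python
import re
from typing import Any, Dict

# Hypothetical settings: replace with your own values.
PROJECT_ID = "projectID"
REGION = "region"
TEMPLATE_PATH = "gs://BUCKET_NAME/template"
TEMP_LOCATION = "gs://BUCKET_NAME/temp"


def job_name_for(object_name: str) -> str:
    """Turn an uploaded object name into a Dataflow job name.

    Job names may only use lowercase letters, digits and hyphens and must start
    with a letter, so other characters are replaced and an 'etl-' prefix is added.
    """
    base = re.sub(r"[^a-z0-9-]+", "-", object_name.lower()).strip("-")
    return f"etl-{base}"[:60].rstrip("-")


def build_launch_body(event: Dict[str, Any]) -> Dict[str, Any]:
    """Build the templates.launch request body for a GCS finalize event."""
    input_path = f"gs://{event['bucket']}/{event['name']}"
    return {
        "jobName": job_name_for(event["name"]),
        "parameters": {"input": input_path},
        "environment": {"tempLocation": TEMP_LOCATION},
    }


def trigger_dataflow(event: Dict[str, Any], context: Any) -> None:
    """Background Cloud Function: launch the Dataflow template for the new file."""
    from googleapiclient.discovery import build

    dataflow = build("dataflow", "v1b3", cache_discovery=False)
    request = dataflow.projects().locations().templates().launch(
        projectId=PROJECT_ID,
        location=REGION,
        gcsPath=TEMPLATE_PATH,
        body=build_launch_body(event),
    )
    response = request.execute()
    print(f"Launched Dataflow job {response['job']['id']}")
```

With this layout, the function's entry point would be trigger_dataflow, and google-api-python-client would be listed in the function's requirements.txt.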

Results

When each ETL pipeline has completed successfully, navigate to BigQuery to verify that the data has been loaded into the table.

BigQuery - land_data table

Now the data is ready for analytics and reporting. Here, we calculate the average price by year in BigQuery and visualize the results in Data Studio.

Data Studio - Average land price by year in Yilan County
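The average-price-by-year query could also be run from Python with the google-cloud-bigquery client. The land_data table name comes from the screenshot caption above, but the column names county, transaction_year and total_price are hypothetical, so adjust them to your schema. The county is passed as a query parameter instead of being formatted into the SQL string.

```python
from typing import List, Tuple


def average_price_sql(table: str) -> str:
    """SQL for the average total price per year in one county (hypothetical columns)."""
    return f"""
        SELECT transaction_year, AVG(total_price) AS avg_price
        FROM `{table}`
        WHERE county = @county
        GROUP BY transaction_year
        ORDER BY transaction_year
    """


def average_price_by_year(table: str, county: str) -> List[Tuple[int, float]]:
    """Run the query with the BigQuery client and return (year, average price) pairs."""
    from google.cloud import bigquery

    client = bigquery.Client()
    job_config = bigquery.QueryJobConfig(
        query_parameters=[bigquery.ScalarQueryParameter("county", "STRING", county)]
    )
    rows = client.query(average_price_sql(table), job_config=job_config).result()
    return [(row["transaction_year"], row["avg_price"]) for row in rows]
```

The table argument would be the fully qualified name, for example projectID.DATASET.land_data with DATASET as a placeholder, and county should be whatever value the county column actually stores.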
