Esse é o meu primeiro repo tratando de fim a fim, uma pipeline de dados abertos do governo brasileiro relacionado a compras de contrato e cronogramas anuais com spark, em pyspark e SQL!

Overview

Olá!

Esse é o meu primeiro repo tratando de fim a fim, uma pipeline de dados abertos do governo brasileiro relacionado a compras de contrato e cronogramas anuais com spark, em pyspark e SQL!

O código se encontra aqui e o dado pode ser obtido por meio desse link

from pyspark.sql import SparkSession

##################################################### VARIABLES #####################################################

PATH_LANDING_ZONE_CSV = '../datalake/landing/comprasnet-contratos-anual-cronogramas-latest.csv'
PATH_PROCESSING_ZONE = '../datalake/processing'
PATH_CURATED_ZONE = '../datalake/curated'

##################################################### QUERY #########################################################

QUERY = """ 

WITH tmp as (
  SELECT 
    cast(id as integer) as id,
    cast(contrato_id as integer) as contrato_id,
    tipo,
    numero,
    receita_despesa,
    observacao,
    mesref,
    anoref,
    cast(vencimento as date) as vencimento,
    retroativo,
    cast(valor as decimal (10,2)) as valor,
    year(vencimento) as year,
    month(vencimento) as month,
    dayofmonth(vencimento) as day
  FROM 
    df
)
SELECT
  *
FROM 
  tmp
WHERE   
  year = 2021 OR 
  year = 2022
ORDER BY
  year desc

"""

##################################################### SCRIPT #########################################################

def csv_to_parquet(spark, path_csv, path_parquet):
  df = spark.read.option('header', True).csv(path_csv)
  return df.write.mode('overwrite').format('parquet').save(path_parquet)

def create_view(spark, path_parquet):
  df = spark.read.parquet(path_parquet) 
  df.createOrReplaceTempView('df')

def write_curated(spark, path_curated):
 
  df2 = spark.sql(QUERY)
    
  (
      df2
      .orderBy('year', ascending=False)
      .orderBy('month', ascending=False)
      .orderBy('day', ascending=False)
      .write.partitionBy('year','month','day')
      .mode('overwrite')
      .format('parquet')
      .save(path_curated)
  )


if __name__ == "__main__":
  
  spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
  )

  spark.sparkContext.setLogLevel("ERROR")
  
  csv_to_parquet(spark, PATH_LANDING_ZONE_CSV, PATH_PROCESSING_ZONE)

  create_view(spark, PATH_PROCESSING_ZONE)
  
  write_curated(spark, PATH_CURATED_ZONE )
  • Basicamente, extraimos os dados para a zona landing, depois, escrevemos o mesmo dado em diferente formato na zona processing, no caso parquet, por se tratar de um formato otimizado e mais leve.
  • Após, criamos uma view do dado recém salvo na zona processing, já em parquet, que otimiza a leitura do spark, aplicamos uma query de transformação que enriquece o schema do dado e seleciona apenas os dados de 2021 e 2022, já pronto para ser consumido.
  • E por fim, escrevemos na zona curated o dado já tratado, enriquecido, particionado por ano, mês e dia e pronto para consumo.

Para rodar o script, basicamente você pode fazer no terminal:

spark-submit etl.py

Você também encontrará o mesmo código e ideia de ETL em notebooks, em versão pyspark ou spark-sql.

Espero que gostem!

Qualquer dúvida, entrar em contato pelo LinkedIn.

:)

Owner
Henrique de Paula
Games e tech!
Henrique de Paula
The easy way to combine mlflow, hydra and optuna into one machine learning pipeline.

mlflow_hydra_optuna_the_easy_way The easy way to combine mlflow, hydra and optuna into one machine learning pipeline. Objective TODO Usage 1. build do

shibuiwilliam 9 Sep 09, 2022
A naive Bayes model for cancer classification using a set of documents

Naivebayes text classifcation model for cancer and noncancer documents Author: Alex King Purpose Requirements/files included How to use 1. Purpose The

Alex W King 1 Nov 24, 2021
Avocado hass time series vs predict price

AVOCADO HASS TIME SERIES VÀ PREDICT PRICE Trước khi vào Heroku muốn giao diện đẹp mọi người chuyển giúp mình theo hình bên dưới https://avocado-hass.h

hieulmsc 3 Dec 18, 2021
Bayesian Additive Regression Trees For Python

BartPy Introduction BartPy is a pure python implementation of the Bayesian additive regressions trees model of Chipman et al [1]. Reasons to use BART

187 Dec 16, 2022
Skoot is a lightweight python library of machine learning transformer classes that interact with scikit-learn and pandas.

Skoot is a lightweight python library of machine learning transformer classes that interact with scikit-learn and pandas. Its objective is to ex

Taylor G Smith 54 Aug 20, 2022
Interactive Parallel Computing in Python

Interactive Parallel Computing with IPython ipyparallel is the new home of IPython.parallel. ipyparallel is a Python package and collection of CLI scr

IPython 2.3k Dec 30, 2022
Production Grade Machine Learning Service

This project is made to help you scale from a basic Machine Learning project for research purposes to a production grade Machine Learning web service

Abdullah Zaiter 10 Apr 04, 2022
icepickle is to allow a safe way to serialize and deserialize linear scikit-learn models

icepickle It's a cooler way to store simple linear models. The goal of icepickle is to allow a safe way to serialize and deserialize linear scikit-lea

vincent d warmerdam 24 Dec 09, 2022
Dragonfly is an open source python library for scalable Bayesian optimisation.

Dragonfly is an open source python library for scalable Bayesian optimisation. Bayesian optimisation is used for optimising black-box functions whose

744 Jan 02, 2023
Uplift modeling and causal inference with machine learning algorithms

Disclaimer This project is stable and being incubated for long-term support. It may contain new experimental code, for which APIs are subject to chang

Uber Open Source 3.7k Jan 07, 2023
Python package for concise, transparent, and accurate predictive modeling

Python package for concise, transparent, and accurate predictive modeling. All sklearn-compatible and easy to use. 📚 docs • 📖 demo notebooks Modern

Chandan Singh 983 Jan 01, 2023
Implementation of linesearch Optimization Algorithms in Python

Nonlinear Optimization Algorithms During my time as Scientific Assistant at the Karlsruhe Institute of Technology (Germany) I implemented various Opti

Paul 3 Dec 06, 2022
Crunchdao - Python API for the Crunchdao machine learning tournament

Python API for the Crunchdao machine learning tournament Interact with the Crunc

3 Jan 19, 2022
This machine learning model was developed for House Prices

This machine learning model was developed for House Prices - Advanced Regression Techniques competition in Kaggle by using several machine learning models such as Random Forest, XGBoost and LightGBM.

serhat_derya 1 Mar 02, 2022
Apache (Py)Spark type annotations (stub files).

PySpark Stubs A collection of the Apache Spark stub files. These files were generated by stubgen and manually edited to include accurate type hints. T

Maciej 114 Nov 22, 2022
Python ML pipeline that showcases mltrace functionality.

mltrace tutorial Date: October 2021 This tutorial builds a training and testing pipeline for a toy ML prediction problem: to predict whether a passeng

Log Labs 28 Nov 09, 2022
Causal Inference and Machine Learning in Practice with EconML and CausalML: Industrial Use Cases at Microsoft, TripAdvisor, Uber

Causal Inference and Machine Learning in Practice with EconML and CausalML: Industrial Use Cases at Microsoft, TripAdvisor, Uber

EconML/CausalML KDD 2021 Tutorial 124 Dec 28, 2022
XManager: A framework for managing machine learning experiments 🧑‍🔬

XManager is a platform for packaging, running and keeping track of machine learning experiments. It currently enables one to launch experiments locally or on Google Cloud Platform (GCP). Interaction

DeepMind 620 Dec 27, 2022
PyNNDescent is a Python nearest neighbor descent for approximate nearest neighbors.

PyNNDescent PyNNDescent is a Python nearest neighbor descent for approximate nearest neighbors. It provides a python implementation of Nearest Neighbo

Leland McInnes 699 Jan 09, 2023
MooGBT is a library for Multi-objective optimization in Gradient Boosted Trees.

MooGBT is a library for Multi-objective optimization in Gradient Boosted Trees. MooGBT optimizes for multiple objectives by defining constraints on sub-objective(s) along with a primary objective. Th

Swiggy 66 Dec 06, 2022