This is my first end-to-end repo: a pipeline over Brazilian government open data on procurement contracts and their annual schedules, built with Spark, using PySpark and SQL!

Overview

Hello!


The code is here, and the data can be obtained through this link.

from pyspark.sql import SparkSession

##################################################### VARIABLES #####################################################

PATH_LANDING_ZONE_CSV = '../datalake/landing/comprasnet-contratos-anual-cronogramas-latest.csv'
PATH_PROCESSING_ZONE = '../datalake/processing'
PATH_CURATED_ZONE = '../datalake/curated'

##################################################### QUERY #########################################################

QUERY = """ 

WITH tmp as (
  SELECT 
    cast(id as integer) as id,
    cast(contrato_id as integer) as contrato_id,
    tipo,
    numero,
    receita_despesa,
    observacao,
    mesref,
    anoref,
    cast(vencimento as date) as vencimento,
    retroativo,
    cast(valor as decimal (10,2)) as valor,
    year(vencimento) as year,
    month(vencimento) as month,
    dayofmonth(vencimento) as day
  FROM 
    df
)
SELECT
  *
FROM 
  tmp
WHERE   
  year = 2021 OR 
  year = 2022
ORDER BY
  year desc

"""

##################################################### SCRIPT #########################################################

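def csv_to_parquet(spark, path_csv, path_parquet):
    # Read the raw CSV from the landing zone (first row holds the column names)
    df = spark.read.option('header', True).csv(path_csv)
    # Rewrite the same data as Parquet in the processing zone
    df.write.mode('overwrite').format('parquet').save(path_parquet)

def create_view(spark, path_parquet):
    # Register the Parquet data as the temporary view 'df' that QUERY reads from
    df = spark.read.parquet(path_parquet)
    df.createOrReplaceTempView('df')

def write_curated(spark, path_curated):
    # Cast types, derive year/month/day and keep only 2021 and 2022
    df2 = spark.sql(QUERY)

    # Sort by year, month and day (descending) in a single orderBy call:
    # chaining orderBy calls would keep only the last ordering (by day).
    # Then write to the curated zone, partitioned by year/month/day.
    (
        df2
        .orderBy(['year', 'month', 'day'], ascending=False)
        .write.partitionBy('year', 'month', 'day')
        .mode('overwrite')
        .format('parquet')
        .save(path_curated)
    )


if __name__ == "__main__":

    spark = (
        SparkSession.builder
        .master("local[*]")
        .getOrCreate()
    )

    spark.sparkContext.setLogLevel("ERROR")

    csv_to_parquet(spark, PATH_LANDING_ZONE_CSV, PATH_PROCESSING_ZONE)

    create_view(spark, PATH_PROCESSING_ZONE)

    write_curated(spark, PATH_CURATED_ZONE)

    spark.stop()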
• First, we extract the data into the landing zone, then write the same data in a different format, Parquet, to the processing zone, because Parquet is a compressed, columnar format that is lighter and faster to read.
• Next, we create a view over the Parquet data just saved in the processing zone (Parquet speeds up Spark reads) and apply a transformation query that enriches the schema, casting columns to proper types and deriving year, month and day from vencimento, and keeps only the 2021 and 2022 records.
• Finally, we write the processed, enriched data to the curated zone, partitioned by year, month and day and ready for consumption.
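To make the transformation step concrete, here is a minimal plain-Python sketch (no Spark required) of what QUERY does to a single CSV row: it casts the numeric and date fields, derives year, month and day from vencimento, and drops rows outside 2021 and 2022. It also builds the Hive-style year=.../month=.../day=... directory that partitionBy('year', 'month', 'day') writes each row into. The helpers transform_row and partition_path and the sample row are hypothetical, and the sketch assumes vencimento is an ISO YYYY-MM-DD string and valor uses a dot as decimal separator; Spark's own casting rules (for example, returning null on bad input) are not reproduced.

```python
from datetime import date
from decimal import Decimal, ROUND_HALF_UP

# Years kept by the WHERE clause of QUERY
KEPT_YEARS = {2021, 2022}


def transform_row(row):
    """Hypothetical plain-Python mirror of QUERY for one CSV row.

    Returns the enriched record, or None when the row is filtered out.
    Assumes 'vencimento' is 'YYYY-MM-DD' and 'valor' uses '.' as separator.
    """
    vencimento = date.fromisoformat(row["vencimento"])
    if vencimento.year not in KEPT_YEARS:
        return None  # WHERE year = 2021 OR year = 2022
    return {
        **row,  # tipo, numero, observacao, ... pass through unchanged
        "id": int(row["id"]),
        "contrato_id": int(row["contrato_id"]),
        "vencimento": vencimento,
        # cast(valor as decimal(10,2)): round to two decimal places
        "valor": Decimal(row["valor"]).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP),
        "year": vencimento.year,
        "month": vencimento.month,
        "day": vencimento.day,
    }


def partition_path(record):
    """Hive-style directory used by partitionBy('year', 'month', 'day')."""
    return f"year={record['year']}/month={record['month']}/day={record['day']}"


sample = {"id": "1", "contrato_id": "42", "vencimento": "2022-05-10", "valor": "1500.456"}
curated = transform_row(sample)
print(partition_path(curated))  # year=2022/month=5/day=10
print(curated["valor"])  # 1500.46
```

Partition values are written as plain integers (month=5, not month=05), so readers can filter on year, month and day and let Spark skip whole directories.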

To run the script, run this in a terminal:

spark-submit etl.py

You will also find the same code and ETL approach in notebooks, in both a PySpark and a Spark SQL version.

I hope you enjoy it!

If you have any questions, get in touch on LinkedIn.

:)

Owner
Henrique de Paula
Games and tech!