Esse é o meu primeiro repo tratando de fim a fim, uma pipeline de dados abertos do governo brasileiro relacionado a compras de contrato e cronogramas anuais com spark, em pyspark e SQL!

Overview

Olá!

Esse é o meu primeiro repo tratando de fim a fim, uma pipeline de dados abertos do governo brasileiro relacionado a compras de contrato e cronogramas anuais com spark, em pyspark e SQL!

O código se encontra aqui e o dado pode ser obtido por meio desse link

from pyspark.sql import SparkSession

##################################################### VARIABLES #####################################################

PATH_LANDING_ZONE_CSV = '../datalake/landing/comprasnet-contratos-anual-cronogramas-latest.csv'
PATH_PROCESSING_ZONE = '../datalake/processing'
PATH_CURATED_ZONE = '../datalake/curated'

##################################################### QUERY #########################################################

QUERY = """ 

WITH tmp as (
  SELECT 
    cast(id as integer) as id,
    cast(contrato_id as integer) as contrato_id,
    tipo,
    numero,
    receita_despesa,
    observacao,
    mesref,
    anoref,
    cast(vencimento as date) as vencimento,
    retroativo,
    cast(valor as decimal (10,2)) as valor,
    year(vencimento) as year,
    month(vencimento) as month,
    dayofmonth(vencimento) as day
  FROM 
    df
)
SELECT
  *
FROM 
  tmp
WHERE   
  year = 2021 OR 
  year = 2022
ORDER BY
  year desc

"""

##################################################### SCRIPT #########################################################

def csv_to_parquet(spark, path_csv, path_parquet):
  df = spark.read.option('header', True).csv(path_csv)
  return df.write.mode('overwrite').format('parquet').save(path_parquet)

def create_view(spark, path_parquet):
  df = spark.read.parquet(path_parquet) 
  df.createOrReplaceTempView('df')

def write_curated(spark, path_curated):
 
  df2 = spark.sql(QUERY)
    
  (
      df2
      .orderBy('year', ascending=False)
      .orderBy('month', ascending=False)
      .orderBy('day', ascending=False)
      .write.partitionBy('year','month','day')
      .mode('overwrite')
      .format('parquet')
      .save(path_curated)
  )


if __name__ == "__main__":
  
  spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
  )

  spark.sparkContext.setLogLevel("ERROR")
  
  csv_to_parquet(spark, PATH_LANDING_ZONE_CSV, PATH_PROCESSING_ZONE)

  create_view(spark, PATH_PROCESSING_ZONE)
  
  write_curated(spark, PATH_CURATED_ZONE )
  • Basicamente, extraimos os dados para a zona landing, depois, escrevemos o mesmo dado em diferente formato na zona processing, no caso parquet, por se tratar de um formato otimizado e mais leve.
  • Após, criamos uma view do dado recém salvo na zona processing, já em parquet, que otimiza a leitura do spark, aplicamos uma query de transformação que enriquece o schema do dado e seleciona apenas os dados de 2021 e 2022, já pronto para ser consumido.
  • E por fim, escrevemos na zona curated o dado já tratado, enriquecido, particionado por ano, mês e dia e pronto para consumo.

Para rodar o script, basicamente você pode fazer no terminal:

spark-submit etl.py

Você também encontrará o mesmo código e ideia de ETL em notebooks, em versão pyspark ou spark-sql.

Espero que gostem!

Qualquer dúvida, entrar em contato pelo LinkedIn.

:)

Owner
Henrique de Paula
Games e tech!
Henrique de Paula
A library to generate synthetic time series data by easy-to-use factors and generator

timeseries-generator This repository consists of a python packages that generates synthetic time series dataset in a generic way (under /timeseries_ge

Nike Inc. 87 Dec 20, 2022
MIT-Machine Learning with Python–From Linear Models to Deep Learning

MIT-Machine Learning with Python–From Linear Models to Deep Learning | One of the 5 courses in MIT MicroMasters in Statistics & Data Science Welcome t

2 Aug 23, 2022
A Python toolkit for rule-based/unsupervised anomaly detection in time series

Anomaly Detection Toolkit (ADTK) Anomaly Detection Toolkit (ADTK) is a Python package for unsupervised / rule-based time series anomaly detection. As

Arundo Analytics 888 Dec 30, 2022
Open-Source CI/CD platform for ML teams. Deliver ML products, better & faster. ⚡️🧑‍🔧

Deliver ML products, better & faster Giskard is an Open-Source CI/CD platform for ML teams. Inspect ML models visually from your Python notebook 📗 Re

Giskard 335 Jan 04, 2023
Confidence intervals for scikit-learn forest algorithms

forest-confidence-interval: Confidence intervals for Forest algorithms Forest algorithms are powerful ensemble methods for classification and regressi

272 Dec 01, 2022
Reggy - Regressions with arbitrarily complex regularization terms

reggy Regressions with arbitrarily complex regularization terms. Currently suppo

Kim 1 Jan 20, 2022
A framework for building (and incrementally growing) graph-based data structures used in hierarchical or DAG-structured clustering and nearest neighbor search

A framework for building (and incrementally growing) graph-based data structures used in hierarchical or DAG-structured clustering and nearest neighbor search

Nicholas Monath 31 Nov 03, 2022
Machine Learning e Data Science com Python

Machine Learning e Data Science com Python Arquivos do curso de Data Science e Machine Learning com Python na Udemy, cliqe aqui para acessá-lo. O prin

Renan Barbosa 1 Jan 27, 2022
🌊 River is a Python library for online machine learning.

River is a Python library for online machine learning. It is the result of a merger between creme and scikit-multiflow. River's ambition is to be the go-to library for doing machine learning on strea

OnlineML 4k Jan 03, 2023
A fast, distributed, high performance gradient boosting (GBT, GBDT, GBRT, GBM or MART) framework based on decision tree algorithms, used for ranking, classification and many other machine learning tasks.

Light Gradient Boosting Machine LightGBM is a gradient boosting framework that uses tree based learning algorithms. It is designed to be distributed a

Microsoft 14.5k Jan 07, 2023
fMRIprep Pipeline To Machine Learning

fMRIprep Pipeline To Machine Learning(Demo) 所有配置均在config.py文件下定义 前置环境(lilab) 各个节点均安装docker,并有fmripre的镜像 可以使用conda中的base环境(相应的第三份包之后更新) 1. fmriprep scr

Alien 3 Mar 08, 2022
Esse é o meu primeiro repo tratando de fim a fim, uma pipeline de dados abertos do governo brasileiro relacionado a compras de contrato e cronogramas anuais com spark, em pyspark e SQL!

Olá! Esse é o meu primeiro repo tratando de fim a fim, uma pipeline de dados abertos do governo brasileiro relacionado a compras de contrato e cronogr

Henrique de Paula 10 Apr 04, 2022
A unified framework for machine learning with time series

Welcome to sktime A unified framework for machine learning with time series We provide specialized time series algorithms and scikit-learn compatible

The Alan Turing Institute 6k Jan 06, 2023
onelearn: Online learning in Python

onelearn: Online learning in Python Documentation | Reproduce experiments | onelearn stands for ONE-shot LEARNning. It is a small python package for o

15 Nov 06, 2022
A Python package to preprocess time series

Disclaimer: This package is WIP. Do not take any APIs for granted. tspreprocess Time series can contain noise, may be sampled under a non fitting rate

Maximilian Christ 57 Dec 17, 2022
Coursera Machine Learning - Python code

Coursera Machine Learning This repository contains python implementations of certain exercises from the course by Andrew Ng. For a number of assignmen

Jordi Warmenhoven 859 Dec 10, 2022
Apache (Py)Spark type annotations (stub files).

PySpark Stubs A collection of the Apache Spark stub files. These files were generated by stubgen and manually edited to include accurate type hints. T

Maciej 114 Nov 22, 2022
ml4ir: Machine Learning for Information Retrieval

ml4ir: Machine Learning for Information Retrieval | changelog Quickstart → ml4ir Read the Docs | ml4ir pypi | python ReadMe ml4ir is an open source li

Salesforce 77 Jan 06, 2023
Applied Machine Learning for Graduate Program in Computer Science (PPGCC)

Applied Machine Learning for Graduate Program in Computer Science (PPGCC) - Federal University of Santa Catarina

Jônatas Negri Grandini 1 Dec 22, 2021
GRaNDPapA: Generator of Rad Names from Decent Paper Acronyms

Generator of Rad Names from Decent Paper Acronyms

264 Nov 08, 2022