
Overview

Hello!

This is my first repo covering, end to end, a pipeline for Brazilian government open data on contract purchases and annual schedules, built with Spark, in PySpark and SQL!

The code can be found here and the data can be obtained through this link.

from pyspark.sql import SparkSession

##################################################### VARIABLES #####################################################

PATH_LANDING_ZONE_CSV = '../datalake/landing/comprasnet-contratos-anual-cronogramas-latest.csv'
PATH_PROCESSING_ZONE = '../datalake/processing'
PATH_CURATED_ZONE = '../datalake/curated'

##################################################### QUERY #########################################################

QUERY = """ 

WITH tmp as (
  SELECT 
    cast(id as integer) as id,
    cast(contrato_id as integer) as contrato_id,
    tipo,
    numero,
    receita_despesa,
    observacao,
    mesref,
    anoref,
    cast(vencimento as date) as vencimento,
    retroativo,
    cast(valor as decimal(10,2)) as valor,
    year(vencimento) as year,
    month(vencimento) as month,
    dayofmonth(vencimento) as day
  FROM 
    df
)
SELECT
  *
FROM 
  tmp
WHERE   
  year = 2021 OR 
  year = 2022
ORDER BY
  year desc

"""

##################################################### SCRIPT #########################################################

def csv_to_parquet(spark, path_csv, path_parquet):
  # Read the raw CSV from the landing zone and rewrite it as parquet
  # in the processing zone.
  df = spark.read.option('header', True).csv(path_csv)
  df.write.mode('overwrite').format('parquet').save(path_parquet)

def create_view(spark, path_parquet):
  # Register the processing-zone parquet as the temp view 'df',
  # which the SQL transformation above selects from.
  df = spark.read.parquet(path_parquet)
  df.createOrReplaceTempView('df')

def write_curated(spark, path_curated):

  # Apply the transformation query and write the result to the curated zone,
  # partitioned by year/month/day. A single orderBy is used here because
  # chained orderBy calls each override the previous sort.
  df2 = spark.sql(QUERY)

  (
      df2
      .orderBy(['year', 'month', 'day'], ascending=False)
      .write.partitionBy('year', 'month', 'day')
      .mode('overwrite')
      .format('parquet')
      .save(path_curated)
  )


if __name__ == "__main__":
  
  spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
  )

  spark.sparkContext.setLogLevel("ERROR")
  
  csv_to_parquet(spark, PATH_LANDING_ZONE_CSV, PATH_PROCESSING_ZONE)

  create_view(spark, PATH_PROCESSING_ZONE)
  
  write_curated(spark, PATH_CURATED_ZONE)

• Basically, we extract the data into the landing zone, then write the same data in a different format, parquet, to the processing zone, since parquet is an optimized and lighter format.
• Next, we create a view over the data just saved in the processing zone, already in parquet, which speeds up Spark reads, and apply a transformation query that enriches the schema and keeps only the 2021 and 2022 records, ready to be consumed.
• Finally, we write the treated, enriched data to the curated zone, partitioned by year, month, and day, ready for consumption (see the read sketch after this list).
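
As an illustration, here is a minimal sketch (my addition, assuming the curated path defined above) of how a consumer could read the curated zone and benefit from the year/month/day partitioning:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[*]').getOrCreate()

# Filtering on the partition columns lets Spark prune whole directories
# (e.g. year=2022/month=12/) instead of scanning the full dataset.
df = spark.read.parquet('../datalake/curated')
df.filter('year = 2022 AND month = 12').show()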

To run the script, you can simply do, in the terminal:

spark-submit etl.py
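
After a successful run, the datalake folder should look roughly like this (the partition values below are illustrative; the actual ones depend on the data):

datalake/
  landing/comprasnet-contratos-anual-cronogramas-latest.csv
  processing/part-*.parquet
  curated/year=2021/month=.../day=.../part-*.parquet
  curated/year=2022/month=.../day=.../part-*.parquet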

You will also find the same code and ETL idea in notebooks, in PySpark and Spark SQL versions.
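
For reference, here is a minimal sketch (my illustration, not the notebooks' exact code) of the same transformation expressed with the DataFrame API instead of the SQL string:

from pyspark.sql import functions as F

df = spark.read.parquet('../datalake/processing')

df2 = (
    df
    .withColumn('id', F.col('id').cast('integer'))
    .withColumn('contrato_id', F.col('contrato_id').cast('integer'))
    .withColumn('vencimento', F.col('vencimento').cast('date'))
    .withColumn('valor', F.col('valor').cast('decimal(10,2)'))
    .withColumn('year', F.year('vencimento'))
    .withColumn('month', F.month('vencimento'))
    .withColumn('day', F.dayofmonth('vencimento'))
    .filter(F.col('year').isin(2021, 2022))
)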

I hope you like it!

If you have any questions, feel free to reach out on LinkedIn.

:)

Owner
Henrique de Paula
Games and tech!