This is my first repo covering a data pipeline end to end: it processes Brazilian government open data on contract purchases and annual schedules with Spark, using PySpark and SQL!

Overview

Hello!

The code is available here, and the data can be obtained through this link.

from pyspark.sql import SparkSession

##################################################### VARIABLES #####################################################

PATH_LANDING_ZONE_CSV = '../datalake/landing/comprasnet-contratos-anual-cronogramas-latest.csv'
PATH_PROCESSING_ZONE = '../datalake/processing'
PATH_CURATED_ZONE = '../datalake/curated'

##################################################### QUERY #########################################################

QUERY = """ 

WITH tmp as (
  SELECT 
    cast(id as integer) as id,
    cast(contrato_id as integer) as contrato_id,
    tipo,
    numero,
    receita_despesa,
    observacao,
    mesref,
    anoref,
    cast(vencimento as date) as vencimento,
    retroativo,
    cast(valor as decimal (10,2)) as valor,
    year(vencimento) as year,
    month(vencimento) as month,
    dayofmonth(vencimento) as day
  FROM 
    df
)
SELECT
  *
FROM 
  tmp
WHERE   
  year = 2021 OR 
  year = 2022
ORDER BY
  year desc

"""

##################################################### SCRIPT #########################################################

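def csv_to_parquet(spark, path_csv, path_parquet):
  # Read the raw CSV (first line is the header) and rewrite it as parquet.
  df = spark.read.option('header', True).csv(path_csv)
  df.write.mode('overwrite').format('parquet').save(path_parquet)

def create_view(spark, path_parquet):
  # Expose the parquet data to Spark SQL as the temp view 'df' used by QUERY.
  df = spark.read.parquet(path_parquet)
  df.createOrReplaceTempView('df')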

def write_curated(spark, path_curated):

  df2 = spark.sql(QUERY)

  # Chained orderBy calls replace one another (only the last would apply),
  # so sort by all three keys in a single call.
  (
      df2
      .orderBy('year', 'month', 'day', ascending=False)
      .write.partitionBy('year','month','day')
      .mode('overwrite')
      .format('parquet')
      .save(path_curated)
  )


if __name__ == "__main__":
  
  spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
  )

  spark.sparkContext.setLogLevel("ERROR")
  
  csv_to_parquet(spark, PATH_LANDING_ZONE_CSV, PATH_PROCESSING_ZONE)

  create_view(spark, PATH_PROCESSING_ZONE)
  
  write_curated(spark, PATH_CURATED_ZONE)

• In short, the data is extracted into the landing zone, and then the same data is written to the processing zone in a different format, Parquet, because it is a compressed, columnar format that is smaller on disk and faster to read.
• Next, we create a view over the data just saved in the processing zone (already in Parquet, which speeds up Spark reads) and apply a transformation query that casts columns to proper types, adds year, month, and day columns derived from vencimento, and keeps only the 2021 and 2022 records.
• Finally, we write the treated, enriched data to the curated zone, partitioned by year, month, and day and ready for consumption.
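
To make the transformation and partitioning steps more concrete, here is a minimal plain-Python sketch (no Spark required) of roughly what happens to a single record. It is only an illustration: the helper names `transform_row`, `is_kept`, and `partition_dir` are hypothetical, the sample values are made up, and it assumes `vencimento` arrives as an ISO `YYYY-MM-DD` string. The directory layout mirrors the `key=value` folders that Spark creates for `partitionBy('year','month','day')`.

```python
from datetime import date
from decimal import Decimal, ROUND_HALF_UP


def transform_row(row):
    """Apply the same casts and derived columns as the SQL query to one CSV row.

    Every value in `row` is a string, as it would be after reading a CSV
    without schema inference.
    """
    vencimento = date.fromisoformat(row["vencimento"])  # assumes YYYY-MM-DD
    return {
        **row,
        "id": int(row["id"]),
        "contrato_id": int(row["contrato_id"]),
        "vencimento": vencimento,
        # Two decimal places, like decimal(10,2) in the query.
        "valor": Decimal(row["valor"]).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP),
        "year": vencimento.year,
        "month": vencimento.month,
        "day": vencimento.day,
    }


def is_kept(row):
    """Same filter as the WHERE clause: only 2021 and 2022 records."""
    return row["year"] in (2021, 2022)


def partition_dir(base, row):
    """Folder a record lands in when partitioned by year, month, and day."""
    return f"{base}/year={row['year']}/month={row['month']}/day={row['day']}"


# Made-up sample record, for illustration only.
sample = {"id": "1", "contrato_id": "10", "vencimento": "2021-03-05", "valor": "1500.5"}
curated = transform_row(sample)

if is_kept(curated):
    print(partition_dir("../datalake/curated", curated))
```

Because the curated data is laid out in these folders, a consumer that filters on year, month, or day only needs to read the matching directories.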

To run the script, execute the following in a terminal:

spark-submit etl.py

You will also find the same code and ETL idea in notebooks, in PySpark and Spark SQL versions.

I hope you like it!

If you have any questions, get in touch on LinkedIn.

:)

Owner
Henrique de Paula
Games and tech!