Fluxos de captura e subida de dados no datalake da Prefeitura do Rio de Janeiro.

Overview

Pipelines

Este repositório contém fluxos de captura e subida de dados no datalake da Prefeitura do Rio de Janeiro. O repositório é gerido pelo Escritório Municipal de Dados (EMD) e alimentado de forma colaborativa com as equipes de dados e tecnologia das Secretarias.

💜 Todo o código é desenvolvido em Python utilizando o software livre Prefect.

Configuração de ambiente para desenvolvimento

Requisitos

  • Um editor de texto (recomendado VS Code)
  • Python 3.9.x
  • pip
  • (Opcional, mas recomendado) Um ambiente virtual para desenvolvimento (miniconda, virtualenv ou similares)

Procedimentos

  • Clonar esse repositório
git clone https://github.com/prefeitura-rio/pipelines
  • Abrí-lo no seu editor de texto

  • No seu ambiente de desenvolvimento, instalar poetry para gerenciamento de dependências

pip3 install poetry
  • Instalar as dependências para desenvolvimento
poetry install
  • Instalar os hooks de pré-commit (ver #127 para entendimento dos hooks)
pre-commit install
  • Pronto! Seu ambiente está configurado para desenvolvimento.

Como desenvolver

Estrutura de diretorios

orgao/                       # diretório raiz para o órgão
|-- projeto1/                # diretório de projeto
|-- |-- __init__.py          # vazio
|-- |-- constants.py         # valores constantes para o projeto
|-- |-- flows.py             # declaração dos flows
|-- |-- schedules.py         # declaração dos schedules
|-- |-- tasks.py             # declaração das tasks
|-- |-- utils.py             # funções auxiliares para o projeto
...
|-- __init__.py              # importa todos os flows de todos os projetos
|-- constants.py             # valores constantes para o órgão
|-- flows.py                 # declaração de flows genéricos do órgão
|-- schedules.py             # declaração de schedules genéricos do órgão
|-- tasks.py                 # declaração de tasks genéricas do órgão
|-- utils.py                 # funções auxiliares para o órgão

orgao2/
...

utils/
|-- __init__.py
|-- flow1/
|-- |-- __init__.py
|-- |-- flows.py
|-- |-- tasks.py
|-- |-- utils.py
|-- flows.py                 # declaração de flows genéricos
|-- tasks.py                 # declaração de tasks genéricas
|-- utils.py                 # funções auxiliares

constants.py                 # valores constantes para todos os órgãos

Adicionando órgãos e projetos

O script manage.py é responsável por criar e listar projetos desse repositório. Para usá-lo, no entanto, você deve instalar as dependências em requirements-cli.txt:

pip3 install -r requirements-cli.txt

Você pode obter mais informações sobre os comandos com

python manage.py --help

O comando add-agency permite que você adicione um novo órgão a partir do template padrão. Para fazê-lo, basta executar

python manage.py add-agency nome-do-orgao

Isso irá criar um novo diretório com o nome nome-do-orgao em pipelines/ com o template padrão, já adaptado ao nome do órgão. O nome do órgão deve estar em snake case e deve ser único. Qualquer conflito com um projeto já existente será reportado.

Para listar os órgão existentes e nomes reservados, basta fazer

python manage.py list-projects

Em seguida, leia com anteção os comentários em cada um dos arquivos do seu projeto, de modo a evitar conflitos e erros. Links para a documentação do Prefect também encontram-se nos comentários.

Caso o órgão para o qual você desenvolverá um projeto já exista, basta fazer

python manage.py add-project nome-do-orgao nome-do-projeto

Adicionando dependências para execução

  • Requisitos de pipelines devem ser adicionados com
poetry add <package>
  • Requisitos do manage.py estão em requirements-cli.txt

  • Requisitos para a Action de deployment estão em requirements-deploy.txt

  • Requisitos para testes estão em requirements-tests.txt

Como testar uma pipeline localmente

Escolha a pipeline que deseja executar (exemplo pipelines.rj_escritorio.template_pipeline.flows.flow)

from pipelines.utils.utils import run_local
pipelines.rj_escritorio.template_pipeline.flows import flow

run_local(flow, parameters = {"param": "val"})

Como testar uma pipeline na nuvem

  1. Configure as variáveis de ambiente num arquivo chamado .env na raiz do projeto:
GOOGLE_APPLICATION_CREDENTIALS=/path/to/credentials.json  # Credenciais do Google Cloud
PREFECT__BACKEND=server
PREFECT__SERVER__HOST=http://prefect-apollo.prefect.svc.cluster.local
PREFECT__SERVER__PORT=4200
VAULT_ADDRESS=http://vault.vault.svc.cluster.local:8200/
VAULT_TOKEN=<token> # Valor do token do órgão para o qual você está desenvolvendo. Caso não saiba o token, entre em contato.
  1. Crie o arquivo test.py com a pipeline que deseja executar e adicione a função run_cloud com os parâmetros necessários:
from pipelines.utils import run_cloud
from pipelines.[secretaria].[pipeline].flows import flow # Complete com as infos da sua pipeline

run_cloud(
    flow,               # O flow que você deseja executar
    labels=[
        "example",      # Label para identificar o agente que irá executar a pipeline (ex: rj-sme)
    ],
    parameters = {
        "param": "val", # Parâmetros que serão passados para a pipeline (opcional)
    }
)
  1. Rode a pipeline com:
python test.py

A saída deve se assemelhar ao exemplo abaixo:

[2022-02-19 12:22:57-0300] INFO - prefect.GCS | Uploading xxxxxxxx-development/2022-02-19t15-22-57-694759-00-00 to datario-public
Flow URL: http://localhost:8080/default/flow/xxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
 └── ID: xxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
 └── Project: main
 └── Labels: []
Run submitted, please check it at:
http://prefect-ui.prefect.svc.cluster.local:8080/flow-run/xxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
  • (Opcional, mas recomendado) Quando acabar de desenvolver sua pipeline, delete todas as versões da mesma pela UI do Prefect.
Comments
  • [WIP] Pipeline de viagens do SPPO

    [WIP] Pipeline de viagens do SPPO

    Descricao: Adiciona a pipeline para calculo de viagens do subsídio

    1. subsidio_sppo_planejado: Tratamento da operacao planejada da quinzena
    2. subsidio_sppo_viagens: Calculo das viagens da quinzena

    TODO:

    • [ ] Adicionar opção para rodar as viagens da quinzena completa (ou um período de datas) a inves de um unico dia: na pratica, chamar run_dbt_model multiplas vezes
    standby rj-smtr/subsidio_sppo 
    opened by fernandascovino 124
  • [WIP] [SMTR] Captura SIGMOB

    [WIP] [SMTR] Captura SIGMOB

    Descrição: Adiciona captura das APIs do SIGMOB e o flow de materialização do dataset br_rj_riodejaneiro_sigmob. As queries que serão materializadas se encontram no repo de queries da SMTR, e ainda estão em WIP. Algumas das tasks criadas podem ser incorporadas as tasks mais gerais relativas ao uso do DBT (i.e. pipelines.rj_smtr.br_rj_riodejaneiro_sigmob.run_dbt_schema). a task de backfill (build_incremental_model) ainda está num estado não tão refinado, testei usando a flag --vars da CLI do DBT para parametrizar as ranges de materialização, mas encontrei algumas complicações que pareceram desnecessárias, mas uma solução mais elegante ainda pode ser desenvolvida e incorporada as tasks do utils geral deste repositório. Ainda é necessário testar as tasks na cloud do Prefect, mas por enquanto estamos aguardando a finalização do desenvolvimento do SQL contido no repositorio de queries. A captura, porém, espera-se que já funcione. Além disso, a maioria dos parâmetros necessários para execução dos flows têm valores default definidos dentro de cada flow em vez de estarem sendo providos pelo schedule, caso isso não esteja de acordo com as melhores práticas, é fácil de lidar.

    Changelog:

    • new: pipelines/rj_smtr/init.py
    • new: pipelines/rj_smtr/br_rj_riodejaneiro_sigmob/constants.py
    • new: pipelines/rj_smtr/br_rj_riodejaneiro_sigmob/flows.py
    • new: pipelines/rj_smtr/br_rj_riodejaneiro_sigmob/schedules.py
    • new: pipelines/rj_smtr/br_rj_riodejaneiro_sigmob/tasks.py
    • new: pipelines/rj_smtr/br_rj_riodejaneiro_sigmob/utils.py
    • modified: pipelines/rj_smtr/tasks.py
    • modified: pipelines/rj_smtr/utils.py
    opened by Hellcassius 79
  • [WIP] Refatora recapturas de GPS do SPPO

    [WIP] Refatora recapturas de GPS do SPPO

    (1) Refatoração do flow captura_sppo_v2: Ajustes para uso da captura_sppo_v2 na recaptura *

    Changelog:

    • adição de 2 parâmetros opcionais de recaptura - bool indicando se é recaptura + erro que gerou a recaptura
    • modificação do parametro timestamp - quando há recaptura, utiliza parametro opcional datetime_filter, cc usa timestamp atual; fixa formatacao em isoformat
    • upload_logs_to_bq: mantém log do erro anterior que foi recapturado
    • run_config: limita uso de cpu e memoria

    (2) Refatoração do flow recaptura: Ajustes para uso da captura_sppo_v2 na recaptura

    Changelog:

    • create_flow_run: usa o flow de captura_sppo_v2 para criar múltiplas runs do mesmo, substituindo o uso do método .map em cada task
    • query_logs: aumenta limite de 40 para 60 recapturas por run; retorna dicionário com timestamp e erro + flag de recaptura, novos parametros que são passados no create_flow_run para a captura_sppo_v2
    • run_config: limita uso de cpu e memoria

    (3) Refatoração do flow materialize: Corrije atraso na materialização quando há falhas

    Changelog:

    • get_materialization_date_range: parametriza o delay da materialização de acordo com o flow + varia start_ts para preencher buracos de materialização (caso haja algum), fixando end_ts com base no delay_hour

    • Detalhamento do flow de captura:
    image conflict rj-smtr standby 
    opened by Hellcassius 67
  • [smtr][subsidio_sppo] Mudança dos sumários para `dashboard_subsidio_sppo`

    [smtr][subsidio_sppo] Mudança dos sumários para `dashboard_subsidio_sppo`

    • Criação do flow subsidio_sppo_apuracao com o objetivo de criação das views do subsídio no schema homônimo;
    • Ajuste da task get_run_dates para adaptar às necessidades do flow subsidio_sppo_apuracao e utilizar a data atual considerando a timezone (estava considerado UTC);
    • Teste do flow smtr_materialize_to_datario_viagem_sppo_flow. Em funcionamento com ou sem parâmetros;
    • Teste do subsidio_sppo_apuracao. Em funcionamento com ou sem parâmetros.
    rj-smtr/subsidio_sppo 
    opened by eng-rodrigocunha 63
  • [smtr][onibus_gps] Pipeline de realocação de linhas

    [smtr][onibus_gps] Pipeline de realocação de linhas

    Adiciona a pipeline para incorporar realocação de linhas, realizadas pelas empresas operadoras, visando corrigir o relacionamento entre linhas e veículos. Consequentemente, impacta na identificação de viagens e no subsídio.

    do-not-update 
    opened by eng-rodrigocunha 59
  • [WIP] Adiciona captura do RHO pro SPPO

    [WIP] Adiciona captura do RHO pro SPPO

    Descrição: PR para migração da pipeline de captura dos arquivos de RDO/RHO que hoje rodam no dagster. As funcionalidades são basicamente as mesmas, porém com alguma adaptação nas estruturas de saídas das funções para simplificar o código e adequar aos paradigmas do prefect. No primeiro commit, está sendo adicionada uma versão funcional para a captura do RHO do SPPO, única tabela que ainda não temos no BigQuery. O flow é parametrizado de forma genérica para que possamos usar a mesma definição, apenas variando parâmetros, para as capturas do RDO pro SPPO e dos RHO/RDO do STPL. TODO:

    • [x] Adicionar schedules para os outros arquivos
    • [ ] Ajustar os table_id's em rj-smtr.constants
    • [x] Adicionar os secrets para conexão com FTP ao vault
    • [ ] Definir a frequência com a qual cada flow será lançado
    • [ ] Dimensionar limites e requisições de memórias para o deploy
    conflict rj-smtr/rho 
    opened by Hellcassius 55
  • [smtr] [rdo] Adiciona flows de captura do RDO/RHO

    [smtr] [rdo] Adiciona flows de captura do RDO/RHO

    Descrição: Adiciona os flows de captura do RDO/RHO para os modais SPPO e STPL via FTP. A captura funciona buscando dentro do FTP os diretórios relativos ao modal e então, buscando todos os arquivos cujo nome inicie com o tipo do relatório ( parâmetro report type). A busca é limitada por padrão (parâmetro dump=False) para apenas arquivos entre o (datetime atual + 11,5 horas) e as 24h anteriores a isso, porém o flow pode ser utilizado também para realização de um dump de todos os arquivos presentes num dado momento (limitados a partir do ano de 2022). Após isso, seguimos com a sequência padrão de download local dos arquivos (originalmente em txt) e posterior pré tratamento para padronização de colunas e conversão para csv, seguido de upload para o bucket, sem upload de logs no momento. Devido às restrições impostas pelo uso de tabelas do tipo external_table e de não termos ainda pronto a infra para fácil geração das biglake tables, após a captura se segue uma etapa de materialização.

    rj-smtr/rho 
    opened by Hellcassius 44
  • [smtr] feat: captura GPS SPPO

    [smtr] feat: captura GPS SPPO

    Descrição: Migração da captura de GPS do SPPO para o prefect. A estrutura é basicamente idêntica à que está em https://github.com/RJ-SMTR/maestro/tree/main/repositories/capturas/br_rj_riodejaneiro_onibus_gps. Utiliza as tasks já testadas para a captura de gps do STPL em pipelines/rj-smtr/tasks.py, adicionando aos utils da SMTR uma função log_critical, que basicamente serve para postar mensagens no discord da SMTR visando diminuir o flood no servidor do escritório. Adiciona uma nova checagem do argumento kind em pipelines/rj_smtr/tasks.py::get_raw() para usar uma nova estrutura de autenticação.

    TODO:

    • [x] Adicionar schedules
    • [x] Adicionar webhook do canal critical aos secrets
    • [ ] Testar a postagem em caso de falha
    opened by Hellcassius 40
  • [WIP][smtr] fix: corrigir código de recaptura

    [WIP][smtr] fix: corrigir código de recaptura

    Closes #126

    Changelog:

    • [x] registros_logs: Adicionar busca pelos minutos sem timestamp_captura (além de erros capturados nos logs)
    • [x] upload_logs: Corrige status para registro dos logs

    TODO:

    • [x] Merge de ajustes nas falhas de captura https://github.com/prefeitura-rio/pipelines/pull/127
    • [ ] Alterar schedule para ~2:05
    • [ ] Testar a pipeline completa (recaptura + materialização)
    conflict 
    opened by fernandascovino 39
  • (add): pipeline to dump goals' dashboard data from Google Sheets to BQ

    (add): pipeline to dump goals' dashboard data from Google Sheets to BQ

    • Create function generate_dump_url_schedules to do the same as generate_dump_db_schedules, but in the url context;
    • Create flow and schedules to dump data from Google Sheets and use in the project SMFP - Dashboard de Metas
    opened by BrunodePauloAlmeida 34
  • [smtr] [rdo] Adiciona materialização

    [smtr] [rdo] Adiciona materialização

    Descrição

    O flow de materialização é um run_dbt_model padrão porém recebendo as variáveis run_dates (sempre em uma lista), para que possa materializar a tabela com base direta na data de partição usada para a captura, além de não possuir schedule próprio e somente ser ativada quando a captura baixar pelo menos um arquivo e fazer seu upload sem erros

    Recursos

    • Queries: (...)
    rj-smtr/rho 
    opened by Hellcassius 11
  • [FEATURE] Adiciona pipeline para gerar matriz de políticas de acesso

    [FEATURE] Adiciona pipeline para gerar matriz de políticas de acesso

    Código para testar as tasks

    from pipelines.utils.policy_matrix.tasks import *
    
    project_ids = ["rj-escritorio-dev"]
    
    discovery_api = get_discovery_api.run(mode="prod")
    policies = []
    for project_id in project_ids:
        policies.append(get_iam_policy.run(project_id=project_id, discovery_api=discovery_api))
    policies = merge_iam_policies.run(project_ids=project_ids, policies=policies)
    roles_matrix = generate_roles_matrix.run(policies=policies)
    dataframe = roles_matrix_to_pandas_dataframe.run(roles_matrix=roles_matrix)
    
    do-not-update 
    opened by gabriel-milan 1
  • [WIP] Adiciona parâmetros para BigLake tables

    [WIP] Adiciona parâmetros para BigLake tables

    Tarefas

    • [x] Adicionar parâmetros
    • [ ] Aguardar merge de https://github.com/basedosdados/mais/pull/1469
    • [ ] Fazer upgrade da versão do basedosdados
    • [ ] Resolver problemas de linting
    enhancement conflict escritorio-de-dados do-not-update 
    opened by gabriel-milan 3
Owner
Prefeitura do Rio de Janeiro
Prefeitura do Rio de Janeiro
Script de monitoramento das teclas do teclado, salvando todos os dados digitados em um arquivo de log juntamente com os dados de rede.

listenerPython Script de monitoramento das teclas do teclado, salvando todos os dados digitados em um arquivo de log juntamente com os dados de rede.

Vinícius Azevedo 4 Nov 27, 2022
Extrator de dados do jupiterweb

Extrator de dados do jupiterweb O programa é composto de dois arquivos: Um constando apenas de classes complementares que representam as unidades e as

Bruno Aricó 2 Nov 28, 2022
Desenvolvendo as habilidades básicas de programação visando a construção de aplicativos por meio de bibliotecas apropriadas à Ciência de Dados.

Algoritmos e Introdução à Computação Ementa: Conceitos básicos sobre algoritmos e métodos para sua construção. Tipos de dados e variáveis. Estruturas

Dyanna Cruz 1 Jan 6, 2022
Projeto de Jogo de dados em Python 3 onde é definido o lado a ser apostado e número de jogadas, pontuando os acertos e exibindo se ganhou ou perdeu.

Jogo de DadoX Projeto de script que simula um Jogo de dados em Python 3 onde é definido o lado a ser apostado (1, 2, 3, 4, 5 e 6) ou se vai ser um núm

Estênio Mariano 1 Jul 10, 2021
Script em python, utilizando PySimpleGUI, para a geração de arquivo txt a ser importado no sistema de Bilhetagem Eletrônica da RioCard, no Estado do Rio de Janeiro.

pedido-vt-riocard Script em python, utilizando PySimpleGUI, para a geração de arquivo txt a ser importado no sistema de Bilhetagem Eletrônica da RioCa

Carlos Bruno Gomes 1 Dec 1, 2021
Esse script procura qualquer, dados que você queira na wikipedia! Em breve traremos um com dados em toda a internet.

Buscador de dados simples Dependências necessárias Para você poder começar a utilizar esta ferramenta, você vai precisar da dependência "wikipedia", p

Erick Campoy 4 Feb 24, 2022
Script de monitoramento das teclas do teclado, salvando todos os dados digitados em um arquivo de log juntamente com os dados de rede.

listenerPython Script de monitoramento das teclas do teclado, salvando todos os dados digitados em um arquivo de log juntamente com os dados de rede.

Vinícius Azevedo 4 Nov 27, 2022
Bot Music Pintar. Created by Rio

?? Rio Music ?? Kalo Fork Star Ya Bang Hehehe Requirements ?? FFmpeg NodeJS nodesource.com Python 3.8+ or 3.7 PyTgCalls Generate String Using Replit ⤵

RioProjectX 7 Jun 15, 2022
Userbot Telegram + Music Voice Chats. Dibuat Untuk Bersenang - Senang , Dan Mempermudah Kegiatan. Created By Rio.

RIO - USERBOT Disclaimer Saya tidak bertanggung jawab atas penyalahgunaan bot ini. Bot ini dimaksudkan untuk bersenang-senang sekaligus membantu Anda

RioProjectX 1 Nov 10, 2021
Rio Userbot Adalah Bot Untuk Membantu Mempermudahkan Sesuatu Di Telegram, Last Repository With Pytgcalls v0.8.3

RIO - USERBOT Disclaimer Saya tidak bertanggung jawab atas penyalahgunaan bot ini. Bot ini dimaksudkan untuk bersenang-senang sekaligus membantu Anda

RioProjectX 4 Oct 18, 2022
Dados Públicos de CNPJ disponibilizados pela Receita Federal do Brasil

Dados Públicos CNPJ Fonte oficial da Receita Federal do Brasil, aqui. Layout dos arquivos, aqui. A Receita Federal do Brasil disponibiliza bases com o

Aphonso Henrique do Amaral Rafael 102 Dec 28, 2022
Criando Lambda Functions para Ingerir Dados de APIs com AWS CDK

LIVE001 - AWS Lambda para Ingerir Dados de APIs Fazer o deploy de uma função lambda com infraestrutura como código Lambda vai numa API externa e extra

Andre Sionek 12 Nov 20, 2022
Usando o Amazon Textract como OCR para Extração de Dados no DynamoDB

dio-live-textract2 Repositório de código para o live coding do dia 05/10/2021 sobre extração de dados estruturados e gravação em banco de dados a part

hugoportela 0 Jan 19, 2022
Biblioteca Python que extrai dados de mercado do Bacen (Séries Temporais)

Pybacen This library was developed for economic analysis in the Brazilian scenario (Investments, micro and macroeconomic indicators) Installation Inst

null 42 Jan 5, 2023
Script em python para carregar os arquivos de cnpj dos dados públicos da Receita Federal em MYSQL.

cnpj-mysql Script em python para carregar os arquivos de cnpj dos dados públicos da Receita Federal em MYSQL. Dados públicos de cnpj no site da Receit

null 17 Dec 25, 2022
Script que realiza a identificação de todos os logins e senhas dos wifis conectados em uma máquina e envia os dados para um e-mail especificado.

getWIFIConnection Script que realiza a identificação de todos os logins e senhas dos wifis conectados em uma máquina e envia os dados para um e-mail e

Vinícius Azevedo 3 Nov 27, 2022
Bancos de Dados Relacionais (SQL) na AWS com Amazon RDS

Bancos de Dados Relacionais (SQL) na AWS com Amazon RDS Repositório para o Live Coding DIO do dia 24/11/2021 Serviços utilizados Amazon RDS AWS Lambda

Cassiano Ricardo de Oliveira Peres 4 Jul 30, 2022
Extrator de dados do jupiterweb

Extrator de dados do jupiterweb O programa é composto de dois arquivos: Um constando apenas de classes complementares que representam as unidades e as

Bruno Aricó 2 Nov 28, 2022
Bancos de Dados Relacionais (SQL) na AWS com Amazon RDS.

Bancos de Dados Relacionais (SQL) na AWS com Amazon RDS Explorando o Amazon RDS, um serviço de provisionamente e gerenciamento de banco de dados relac

Lucas Magalhães 1 Dec 5, 2021
Análise de dados abertos do programa Taxigov.

Análise de dados do Taxigov Este repositório contém os cadernos Jupyter usados no projeto de análise de dados do Taxigov. Conjunto de dados O conjunto

Augusto Herrmann 1 Jan 10, 2022