Airflow ETL With EKS EFS Sagemaker

Overview

Airflow ETL With EKS EFS & Sagemaker (en desarrollo)

Diagrama de la solución

Importante

Si quiere subir esta app a algún repositorio, deberá primero instalar los hooks de pre-commit, así podrá tener una validación Ejecute en el directorio raíz poetry install & poetry run pre-commit install Esto tanto en el dir raíz de repo y también en el dir /Airflow-dags/github-dags

Definición del problema planteado



Contexto:

Acaba de ser contratado como el primer ingeniero de datos de una pequeña empresa de viajes. Su primera tarea para usted fue demostrar el valor y los conocimientos que se pueden generar a partir de las canalizaciones de datos. Su plan es que una vez que demuestre lo valiosos que pueden ser los datos, comenzarán a invertir en el uso de un proveedor de instancias en la nube. Por ahora, su propia computadora tendrá que hacerlo .

Objetivo:

Crear un DAG de Airflow que actúe de ETL para extraer extraiga datos estáticos S3 y los cargue en una base de datos de Postgres.

Datos a utilizar:

Para llevar a cabo el desarrollo se utilizará el dataset de demoras y cancelaciones de viajes aéreos de Kaggle que será hosteado en un bucket en S3. Lo primero será obtener los datos siguiendo estos pasos:

  • Instalar el cliente de Kaggle: pip install kaggle.
  • Instalar el cliente de aws siguiendo estas instrucciones acorde a su sistema operativo.
  • Instalar el cliente de aws eksctl siguiendo estas instrucciones
  • Configurar las credenciales siguiendo estas instrucciones.
  • Bajar datos de Kaggle:

cd to your local directory cd /path/to/dataset/

$ mkdir -p minio/data/flights-bucket

Download zipped dataset from kaggle $ kaggle datasets download -d yuanyuwendymu/airline-delay-and-cancellation-data-2009-2018

Unzip files $ unzip airline-delay-and-cancellation-data-2009-2018.zip -d raw/

Remove zipped data to save space $ aws s3 sync raw/ s3://[ml-dataset-raw-s3]/raw/

Remove zipped data to save space [optional] $ rm airline-delay-and-cancellation-data-2009-2018.zip

En este punto al correr el comando el siguiente comando debería aparecer un archivo CSV por año en el directorio de s3: aws s3 sync raw/ s3://[ml-dataset-raw-s3]/raw/



Desarrollo:

  1. Se configuro Airflow para que corra en AWS. Esto se puede hacer de varias maneras, pero aquí se desployo dentro de un cluster de kubernetes EKS. Se utilizo la herramienta git-sync para sincronizar los dags desde un repo CI/CD
  2. Se creo una instancia RDS de Postgres. La misma es Multi-AZ y posee instancia de backup. Esta instancia será utilizada como DB en los puntos siguientes.
  3. Se desarrollo un DAG de Airflow con schedule anual que:

    ○ Se calcula el promedio del tiempo de demora de salida (columna DEP_DELAY) por aeropuerto de salida (columna ORIGIN) y día.

    ○ Se utilizo un algoritmo de detección de anomalías para identificar por cada aeropuerto si hubo algún día con demoras fuera de lo normal.

    ○ Se utilizo los datos del punto anterior por cada aeropuerto para producir un gráfico desde Python usando Pandas o Matplotlib en el cual se pueda ver la cantidad de vuelos de cada día con alguna indicación en los días que fueron considerados anómalos.

    ○ Se carga la data sumarizada junto con un indicador para la fila correspondiente de cada día para indicar si para ese día en un aeropuerto particular las demoras estuvieron fuera de lo esperable. Asimismo los gráficos generados anteriormente son almacenados en S3 en un path fácilmente identificable por año y aeropuerto analizado.

  4. Se desarrollo una visualización de los datos cargados. Esto se puede hacer alternativamente de dos maneras (se realiza una de las dos): ○ Configurar Superset para que se levante utilizando Docker y muestre un dashboard. En caso de utilizar Docker o Docker Compose es necesario incluir instrucciones y archivos necesarios para llevar a cabo la configuración.

    ○ Configurar un dashboard con el servicio AWS Quicksight. En este caso es necesario incluir en la documentación del proyecto los pasos requeridos para que el servicio quede operativo.

    Notas:
  • El DAG funciona para cualquiera de los años 2009 a 2018 incluidos en el dataset. Se tiene en cuenta que si se corre dos veces para el mismo año podría haber una duplicación de datos y se resolvió.

Pasos

Clonamos el repositorio de la siguiente manera: git clone https://github.com/marcelogramma/Airflow-ETL-With-EKS-EFS-Sagemaker.git y luego ingresamos al directorio clonado.

Nos dirigimos con nuestro navegador web a la consola de AWS. Una vez logueados, vamos al servicio de Cloudformation mediante el cual nos permitirá desployar la infraestructura de nuestra solución. Los yml que se utilizaran para desployar la infraestructura se encuentran en el dir /CloudFormation, los mismos son 4 archivos numerados que deben ir subiéndose uno a uno, cuando el anterior termine.

Como desployar la infraestructura

Nos dirigimos a la herramienta CloudFormation

1- Sobre la izquierda, en el menú Crear pila, hacemos click en "con recursos nuevos (estándar)



2 - Seleccionamos Cargar un archivo de plantilla

3 - Seleccionamos el primer archivo del directorio /cloudformation (01-ML-Network.yml) y damos siguiente donde definiremos el nombre de nuestra pila y entorno

4 - Definimos etiquetas y rol,

siguiente dos veces y crear

comenzado de esta manera la creación del stack de red. En este punto se crea la VPC, Subnets, Internet GW y tabla de ruteo necesario para la implementación. Aguardar que finalice el proceso y verificar que no haya errores

5 - Repetimos este procedimiento con dada uno de los 3 restantes archivos yaml numerados hasta el 04. Solamente se muestran imágenes con los parámetros a tener en cuenta en cada creación

6 - Seguimos con la creación del stack 02-ML-SG.yaml

siguiente

Crear pila

7 - Crear pila con el archivo 03-ML-S3.yaml (igual que el paso anterior, solo se muestran configuración necesarias, el resto son a elección)

7.1 - Desde la consola web de aws, dirigirnos en otra pestaña, al servicio S3

7.2 - Ingresar al bucket ml-airflow-s3, y crear la carpeta logs

7.3 - ingresar al bucket ml-dataset-raw-s3, y crear la carpeta raw

8 - En este punto repetimos lo mismo con el archivo 04-ML-RDS.yaml. Tener en cuenta de seleccionar las subnets correctas. (ML-Network Private Subnet ML 1 y ML 2)

Setear password recordable, username postgres y security groups, como muestra la imagen

Configuraciones varias

Siguiente y luego crear. Este proceso demora aproximadamente unos 15 minutos en tener operativa la DB de postgres. Aguardar que termine completamente con resultado ok. Aproveche este tiempo para ondar en la infraestructura desplegada (VPC, subnets, SG, etc)

9 - En este punto crearemos nuestro cluster en EKS. Para esto dirigirse al servicio EKS desde la consola hacemos click en agregar nuevo cluser -> Crear

10 - Definimos el nombre del cluster, versión de kubernetes y role (probado con la versión 1.21) y damos siguiente

11 - Definimos la VPC, subnets (ML-Network Public Subnet ML 1 y ML 2) y el Secutiry Group ML-SG-EKSecurityGroup-xxxxxxxx

Luego dejamos todo por defecto asegurándonos que el acceso en este caso sea publico y le damos siguiente dos veces, revisamos y creamos. Este paso demora unos 20 minutos en tener el cluster activo Mientras esto sucede, vamos a configurar la CLI para poder tener acceso al cluster desde la misma. Obtener los datos de aws_access_key_id, aws_secret_access_key y aws_session_token y pegar en el siguiente archivo, si no existe crearlo,

Debería quedar similar a esta imagen.

Luego verificamos que este funcionando con el comando:

$ eksctl get cluster

12 - Una vez que el cluster este activo, pasáramos a la creación de los nodos, haciendo click en informática y luego en agregar grupo de nodos

Definimos un nombre y el rol, el resto por defecto y hacemos siguiente

Definimos el tipo de capacidad, la AMI y el tipo de instancia con su tamaño de disco

La cantidad de nodos para el escalado y siguiente

Dejamos seleccionadas las dos subnets que vienen (son las mismas que usa el control plane, no modificar) y el SG

SG

Siguiente y crear. Aguardamos que termine el proceso y queden los nodos activos.

13 - Una vez que quede el grupo de nodos activos, configuraremos kubectl (como instalar kubectl ) para que opere el cluster desde la CLI. Para esto ejecutamos desde la consola

$ aws eks update-kubeconfig --region us-east-1 --name ML-EKS

y lo verificamos con $ kubectl get svc y kubectl get nodes

14 - Ahora vamos a desployar el dashboard de kubernetes con el siguiente tutorial, el cual no voy a explicar acá, ya que se va de contexto. Una vez que haya pasado este paso podrá ingresar al dashboard con este link http://localhost:8001/api/v1/namespaces/kubernetes-dashboard/services/https:kubernetes-dashboard:/proxy/#!/login

Donde podrá administrar por otro medio mas su cluster y vera algo similar a esta imagen

15 - Ahora desarrollaremos la implementación. de EFS en la nube, para ellos realice desde la terminal lo siguiente:

15.1 - $ kubectl apply -k "github.com/kubernetes-sigs/aws-efs-csi-driver/deploy/kubernetes/overlays/stable/ecr/?ref=release-1.1" Solo para regiones diferente a china y sin usar Fargate

15.2 - $ aws eks describe-cluster --name your_cluster_name --query "cluster.resourcesVpcConfig.vpcId" --output text

15.3 - $ aws ec2 describe-vpcs --vpc-ids YOUR_VPC_ID --query "Vpcs[].CidrBlock" --output text

15.4 - $ aws ec2 create-security-group --description efs-ml-efs-sg --group-name efs-sg --vpc-id YOUR_VPC_ID

15.5 - $ aws ec2 authorize-security-group-ingress --group-id sg-xxx --protocol tcp --port 2049 --cidr YOUR_VPC_CIDR

15.6 - $ aws efs create-file-system --creation-token eks-efs

15.7 - $ aws efs create-mount-target --file-system-id FileSystemId --subnet-id SubnetID --security-group sg-xxx (este punto deberá realizarlo dos veces, una por cada subnet)

En este punto tenemos creado el EFS en la nube y procederemos a utilizarlo dentro del cluster. Para esto debemos editar el archivo airflow-helm/efs-pvc.yml con el valor de nuestro fs en la clave server.

Para realizar este punto, desde la consola de aws, vamos al servicio EFS donde veremos el sistema de archivo recién creado

Y copiamos el ID del sistema de archivos al archivo de la imagen anterior. Guardamos el cambio y ejecutamos el siguiente comando

$ kubectl apply -f efs-pvc.yml

luego de que se aplique, podemos ver el deploy en el cluster con los siguientes comandos $ kubectl get pv y $ kubectl get pv

Aclaración, no se usara el FS, ya que requiere mas configuración y explicación y se va fuera de contexto y foco de lo pedido, solo se muestra como realizar su implementación. y deployarlo en el EKS.

16 - Modificar el archivos airflow-helm/value.yml en la sección connections, con las key de acceso a la CLI

También modificar por su repositorio de Github, donde implemente CI/CD para los dags

17 - Luego de estas modificaciones, ejecutamos los siguientes comando

$ kubectl apply -f airflow-db-secret.yml y $ helm install ml-airflow airflow-stable/airflow --versión 8.5.3 --values values.yml Nota. Debera tener instalado HELM y ademas el repo Helm Chart for Apache Airflow

Con este ultimo comando estamos ya desployando Apache Airflow en el cluster EKS, esto demora unos minutos, aguarde y verifique que el deploy se complete normalmente

Cuando helm termine, veremos este mensaje

Esto quiere decir que ya se desployo, pero para verificar que este ok, dirigirse al cluster en el servicio EKS y verificar las cargas de trabajo

Si todo esta ok, ejecutaremos los siguientes comandos

$ kubectl port-forward svc/ml-airflow-web 8080:8080 --namespace default > /dev/null &

$ kubectl port-forward svc/ml-airflow-flower 5555:5555 --namespace default > /dev/null &

El primero para ingresar a Airflow Web (user/pass admin admin) y el segundo al dash del flower

Acá podemos ver el DAG que sincronizo desde GitHub

Y el home del flower

CONTINUARA...

You might also like...
Show you how to integrate Zeppelin with Airflow
Show you how to integrate Zeppelin with Airflow

Introduction This repository is to show you how to integrate Zeppelin with Airflow. The philosophy behind the ingtegration is to make the transition f

Sample code for Harry's Airflow online trainng course

Sample code for Harry's Airflow online trainng course You can find the videos on youtube or bilibili. I am working on adding below things: the slide p

SageMaker Python SDK is an open source library for training and deploying machine learning models on Amazon SageMaker.
SageMaker Python SDK is an open source library for training and deploying machine learning models on Amazon SageMaker.

SageMaker Python SDK SageMaker Python SDK is an open source library for training and deploying machine learning models on Amazon SageMaker. With the S

Fully Automated YouTube Channel ▶️with Added Extra Features.

Fully Automated Youtube Channel ▒█▀▀█ █▀▀█ ▀▀█▀▀ ▀▀█▀▀ █░░█ █▀▀▄ █▀▀ █▀▀█ ▒█▀▀▄ █░░█ ░░█░░ ░▒█░░ █░░█ █▀▀▄ █▀▀ █▄▄▀ ▒█▄▄█ ▀▀▀▀ ░░▀░░ ░▒█░░ ░▀▀▀ ▀▀▀░

Educational project on how to build an ETL (Extract, Transform, Load) data pipeline, orchestrated with Airflow. Viewflow is an Airflow-based framework that allows data scientists to create data models without writing Airflow code.
Viewflow is an Airflow-based framework that allows data scientists to create data models without writing Airflow code.

Viewflow Viewflow is a framework built on the top of Airflow that enables data scientists to create materialized views. It allows data scientists to f

This is a practice on Airflow, which is building virtual env, installing Airflow and constructing data pipeline (DAGs)

airflow-test This is a practice on Airflow, which is Builing virtualbox env and setting Airflow on that env Installing Airflow using python virtual en

Yet another Airflow plugin using CLI command as RESTful api, supports Airflow v2.X.
Yet another Airflow plugin using CLI command as RESTful api, supports Airflow v2.X.

中文版文档 Airflow Extended API Plugin Airflow Extended API, which export airflow CLI command as REST-ful API to extend the ability of airflow official API

A zero-dependency Python library for getting the Kubernetes token of a AWS EKS cluster
A zero-dependency Python library for getting the Kubernetes token of a AWS EKS cluster

tokeks A zero-dependency Python library for getting the Kubernetes token of a AWS EKS cluster. No AWS CLI, third-party client or library (boto3, botoc

A SageMaker Projects template to deploy a model from Model Registry, choosing your preferred method of deployment among async (Asynchronous Inference), batch (Batch Transform), realtime (Real-time Inference Endpoint). More to be added soon! Build an Amazon SageMaker Pipeline to Transform Raw Texts to A Knowledge Graph
Build an Amazon SageMaker Pipeline to Transform Raw Texts to A Knowledge Graph

Build an Amazon SageMaker Pipeline to Transform Raw Texts to A Knowledge Graph This repository provides a pipeline to create a knowledge graph from ra

Amazon SageMaker Delta Sharing Examples

This repository contains examples and related resources showing you how to preprocess, train, and serve your models using Amazon SageMaker with data fetched from Delta Lake.

MLOps pipeline project using Amazon SageMaker Pipelines
MLOps pipeline project using Amazon SageMaker Pipelines

This project shows steps to build an end to end MLOps architecture that covers data prep, model training, realtime and batch inference, build model registry, track lineage of artifacts and model drift detection. It utilizes SageMaker Pipelines that offers machine learning (ML) to orchestrate SageMaker jobs and author reproducible ML pipelines.

Ethereum ETL lets you convert blockchain data into convenient formats like CSVs and relational databases.

Python scripts for ETL (extract, transform and load) jobs for Ethereum blocks, transactions, ERC20 / ERC721 tokens, transfers, receipts, logs, contracts, internal transactions.

ETL python utilizando API do Spotify

Processo de ETL com Python e Airflow usando API do Spotify Sobre Projeto de ETL(Extract, Transform e Load) utilizando Python com API do Spotify e Airf

Pyspark Spotify ETL

This is my first Data Engineering project, it extracts data from the user's recently played tracks using Spotify's API, transforms data and then loads it into Postgresql using SQLAlchemy engine. Data is shown as a Spark Dataframe before loading and the whole ETL job is scheduled with crontab. Token never expires since an HTTP POST method with Spotify's token API is used in the beginning of the script.

Python ELT Studio, an application for building ELT (and ETL) data flows.
Python ELT Studio, an application for building ELT (and ETL) data flows.

The Python Extract, Load, Transform Studio is an application for performing ELT (and ETL) tasks. Under the hood the application consists of a two parts.

ETL flow framework based on Yaml configs in Python
ETL flow framework based on Yaml configs in Python

ETL framework based on Yaml configs in Python A light framework for creating data streams. Setting up streams through configuration in the Yaml file.

Owner
null
Pyspark Spotify ETL

This is my first Data Engineering project, it extracts data from the user's recently played tracks using Spotify's API, transforms data and then loads it into Postgresql using SQLAlchemy engine. Data is shown as a Spark Dataframe before loading and the whole ETL job is scheduled with crontab. Token never expires since an HTTP POST method with Spotify's token API is used in the beginning of the script.

null 16 Jun 9, 2022
Python ELT Studio, an application for building ELT (and ETL) data flows.

The Python Extract, Load, Transform Studio is an application for performing ELT (and ETL) tasks. Under the hood the application consists of a two parts.

Schlerp 55 Nov 18, 2022
ETL flow framework based on Yaml configs in Python

ETL framework based on Yaml configs in Python A light framework for creating data streams. Setting up streams through configuration in the Yaml file.

Павел Максимов 18 Jul 6, 2022
A Big Data ETL project in PySpark on the historical NYC Taxi Rides data

Processing NYC Taxi Data using PySpark ETL pipeline Description This is an project to extract, transform, and load large amount of data from NYC Taxi

Unnikrishnan 2 Dec 12, 2021
In this project, ETL pipeline is build on data warehouse hosted on AWS Redshift.

ETL Pipeline for AWS Project Description In this project, ETL pipeline is build on data warehouse hosted on AWS Redshift. The data is loaded from S3 t

Mobeen Ahmed 1 Nov 1, 2021
An ETL framework + Monitoring UI/API (experimental project for learning purposes)

Fastlane An ETL framework for building pipelines, and Flask based web API/UI for monitoring pipelines. Project structure fastlane |- fastlane: (ETL fr

Dan Katz 2 Jan 6, 2022
ETL pipeline on movie data using Python and postgreSQL

Movies-ETL ETL pipeline on movie data using Python and postgreSQL Overview This project consisted on a automated Extraction, Transformation and Load p

Juan Nicolas Serrano 0 Jul 7, 2021
PrimaryBid - Transform application Lifecycle Data and Design and ETL pipeline architecture for ingesting data from multiple sources to redshift

Transform application Lifecycle Data and Design and ETL pipeline architecture for ingesting data from multiple sources to redshift This project is composed of two parts: Part1 and Part2

Emmanuel Boateng Sifah 1 Jan 19, 2022
Using Data Science with Machine Learning techniques (ETL pipeline and ML pipeline) to classify received messages after disasters.

Using Data Science with Machine Learning techniques (ETL pipeline and ML pipeline) to classify received messages after disasters.

null 1 Feb 11, 2022
An ETL Pipeline of a large data set from a fictitious music streaming service named Sparkify.

An ETL Pipeline of a large data set from a fictitious music streaming service named Sparkify. The ETL process flows from AWS's S3 into staging tables in AWS Redshift.

null 1 Feb 11, 2022