
Overview


Magnus provides four capabilities for data teams:

  • Compute execution plan: A DAG representation of the work that you want to get done. Individual nodes of the DAG can be simple Python or shell tasks, complex and deeply nested parallel branches, or embedded DAGs themselves.

  • Run log store: A place to store run logs for reporting or for re-running older runs. Along with the status of execution, the run logs capture code identifiers (commits, docker image digests, etc.), data hashes and configuration settings for reproducibility and audit.

  • Data Catalogs: A way to pass data between nodes of the graph during execution, which also versions the data used by a particular run.

  • Secrets: A framework to provide secrets/credentials at run time to the nodes of the graph.

Design decisions:

  • Easy to extend: All four capabilities are just definitions and can be implemented in many flavors.

    • Compute execution plan: You can run the DAG on your local computer, in containers on your local computer, offload the work to cloud providers, or translate the DAG to AWS Step Functions or Argo workflows.

    • Run log store: The actual implementation of storing the run logs could be in-memory, file system, S3, database etc.

    • Data Catalogs: The data files generated as part of a run could be stored on file-systems, S3 or could be extended to fit your needs.

    • Secrets: The secrets needed for your code to work could be in dotenv, AWS or extended to fit your needs.

  • Pipeline as contract: Once a DAG is defined and proven to work locally or in some other environment, no code change is needed to deploy it to other environments. This lets data teams prove the correctness of the DAG in dev environments while infrastructure teams find a suitable way to deploy it.

  • Reproducibility: The run log store and data catalogs hold the versions, code commits and data files used for a run, making it easy to re-run an older run or debug a failed one. The debug environment need not be the same as the original environment.

  • Easy switch: Your infrastructure landscape changes over time. With magnus, you can switch infrastructure by just changing a config and not code.
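The "definition plus many flavors" idea can be sketched in a few lines of Python. This is an illustration only, not magnus's actual API: the class names, the `put`/`get` methods, the `build_store` helper and the `in-memory` type name are all hypothetical. Only the `type`/`config` shape is borrowed from the `run_config` section of the example run log further down.

```python
import json
import tempfile
from abc import ABC, abstractmethod
from pathlib import Path


class RunLogStore(ABC):
    """Hypothetical definition: anything that can save and load a run log."""

    @abstractmethod
    def put(self, run_id: str, run_log: dict) -> None: ...

    @abstractmethod
    def get(self, run_id: str) -> dict: ...


class InMemoryRunLogStore(RunLogStore):
    """Keeps run logs in a dictionary; gone when the process exits."""

    def __init__(self) -> None:
        self._logs = {}

    def put(self, run_id, run_log):
        self._logs[run_id] = run_log

    def get(self, run_id):
        return self._logs[run_id]


class FileSystemRunLogStore(RunLogStore):
    """Writes one JSON file per run into a folder."""

    def __init__(self, folder: str) -> None:
        self._folder = Path(folder)
        self._folder.mkdir(parents=True, exist_ok=True)

    def put(self, run_id, run_log):
        (self._folder / f"{run_id}.json").write_text(json.dumps(run_log))

    def get(self, run_id):
        return json.loads((self._folder / f"{run_id}.json").read_text())


# Hypothetical registry: the implementation is picked by name from config.
STORES = {"in-memory": InMemoryRunLogStore, "file-system": FileSystemRunLogStore}


def build_store(config: dict) -> RunLogStore:
    """Create a store from a {'type': ..., 'config': {...}} mapping."""
    return STORES[config["type"]](**config.get("config", {}))


# The calling code is identical for both flavors; only the config differs.
with tempfile.TemporaryDirectory() as folder:
    for config in (
        {"type": "in-memory", "config": {}},
        {"type": "file-system", "config": {"folder": folder}},
    ):
        store = build_store(config)
        store.put("20220118114608", {"status": "SUCCESS"})
        print(config["type"], store.get("20220118114608"))
```

Because the calling code depends only on the abstract definition, swapping the storage backend is a configuration change rather than a code change, which is the property the "Easy switch" point describes.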

Magnus does not aim to replace existing, well-constructed orchestrators like AWS Step Functions or Argo Workflows; it complements them in a unified, simple and intuitive way.

Documentation

More details about the project and how to use it are available in the documentation.

Installation

pip

magnus is a Python package and can be installed like any other.

pip install magnus

Example Run

To give you a flavour of how magnus works, let's create a simple pipeline.

Copy the following YAML into getting-started.yaml.


!!! Note

The execution below creates a folder called 'data' in the current working directory. The command as given should work on Linux/macOS; on Windows, please change it accordingly.


dag:
  description: Getting started
  start_at: step parameters
  steps:
    step parameters:
      type: task
      command_type: python-lambda
      command: "lambda x: {'x': int(x) + 1}"
      next: step shell
    step shell:
      type: task
      command_type: shell
      command: mkdir data ; env >> data/data.txt # For Linux/macOS
      next: success
      catalog:
        put:
          - "*"
    success:
      type: success
    fail:
      type: fail
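
To make the structure of this pipeline concrete, the sketch below mirrors the steps as a plain Python dictionary and follows the `next` links from `start_at`. It also evaluates the `python-lambda` command by hand. This is only an illustration of what the YAML describes; it is not how magnus itself traverses the DAG, and the `happy_path` helper is a hypothetical name.

```python
# A plain-dict mirror of getting-started.yaml (only the fields used here).
dag = {
    "start_at": "step parameters",
    "steps": {
        "step parameters": {"type": "task", "next": "step shell"},
        "step shell": {"type": "task", "next": "success"},
        "success": {"type": "success"},
        "fail": {"type": "fail"},
    },
}


def happy_path(dag: dict) -> list:
    """Follow `next` links from `start_at` until a terminal node is reached."""
    order, seen = [], set()
    name = dag["start_at"]
    while name not in seen:  # guard against accidental cycles
        seen.add(name)
        order.append(name)
        step = dag["steps"][name]
        if step["type"] in ("success", "fail"):
            break
        name = step["next"]
    return order


# The command of "step parameters", copied from the YAML.
increment = lambda x: {"x": int(x) + 1}

print(happy_path(dag))  # ['step parameters', 'step shell', 'success']
print(increment("3"))   # {'x': 4}
```

The `fail` node is defined but never reached when every step succeeds, and an input of 3 becomes `x = 4`, which is the value reported under `parameters` in the run log of the example run.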

And let's run the pipeline using:

 magnus execute --file getting-started.yaml --x 3

You will likely see a list of warnings, but your terminal output should look similar to this:

{
    "run_id": "20220118114608",
    "dag_hash": "ce0676d63e99c34848484f2df1744bab8d45e33a",
    "use_cached": false,
    "tag": null,
    "original_run_id": "",
    "status": "SUCCESS",
    "steps": {
        "step parameters": {
            "name": "step parameters",
            "internal_name": "step parameters",
            "status": "SUCCESS",
            "step_type": "task",
            "message": "",
            "mock": false,
            "code_identities": [
                {
                    "code_identifier": "c5d2f4aa8dd354740d1b2f94b6ee5c904da5e63c",
                    "code_identifier_type": "git",
                    "code_identifier_dependable": false,
                    "code_identifier_url": " ",
                    "code_identifier_message": " "
                }
            ],
            "attempts": [
                {
                    "attempt_number": 0,
                    "start_time": "2022-01-18 11:46:08.530138",
                    "end_time": "2022-01-18 11:46:08.530561",
                    "duration": "0:00:00.000423",
                    "status": "SUCCESS",
                    "message": ""
                }
            ],
            "user_defined_metrics": {},
            "branches": {},
            "data_catalog": []
        },
        "step shell": {
            "name": "step shell",
            "internal_name": "step shell",
            "status": "SUCCESS",
            "step_type": "task",
            "message": "",
            "mock": false,
            "code_identities": [
                {
                    "code_identifier": "c5d2f4aa8dd354740d1b2f94b6ee5c904da5e63c",
                    "code_identifier_type": "git",
                    "code_identifier_dependable": false,
                    "code_identifier_url": " ",
                    "code_identifier_message": " "
                }
            ],
            "attempts": [
                {
                    "attempt_number": 0,
                    "start_time": "2022-01-18 11:46:08.576522",
                    "end_time": "2022-01-18 11:46:08.588158",
                    "duration": "0:00:00.011636",
                    "status": "SUCCESS",
                    "message": ""
                }
            ],
            "user_defined_metrics": {},
            "branches": {},
            "data_catalog": [
                {
                    "name": "data.txt",
                    "data_hash": "8f25ba24e56f182c5125b9ede73cab6c16bf193e3ad36b75ba5145ff1b5db583",
                    "catalog_relative_path": "20220118114608/data.txt",
                    "catalog_handler_location": ".catalog",
                    "stage": "put"
                }
            ]
        },
        "success": {
            "name": "success",
            "internal_name": "success",
            "status": "SUCCESS",
            "step_type": "success",
            "message": "",
            "mock": false,
            "code_identities": [
                {
                    "code_identifier": "c5d2f4aa8dd354740d1b2f94b6ee5c904da5e63c",
                    "code_identifier_type": "git",
                    "code_identifier_dependable": false,
                    "code_identifier_url": " ",
                    "code_identifier_message": " "
                }
            ],
            "attempts": [
                {
                    "attempt_number": 0,
                    "start_time": "2022-01-18 11:46:08.639563",
                    "end_time": "2022-01-18 11:46:08.639680",
                    "duration": "0:00:00.000117",
                    "status": "SUCCESS",
                    "message": ""
                }
            ],
            "user_defined_metrics": {},
            "branches": {},
            "data_catalog": []
        }
    },
    "parameters": {
        "x": 4
    },
    "run_config": {
        "executor": {
            "type": "local",
            "config": {}
        },
        "run_log_store": {
            "type": "buffered",
            "config": {}
        },
        "catalog": {
            "type": "file-system",
            "config": {}
        },
        "secrets": {
            "type": "do-nothing",
            "config": {}
        }
    }
}
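
A run log like this is plain JSON, so it is easy to inspect programmatically. The sketch below assumes you have the JSON as text (here a trimmed copy holding only the fields the code reads) and prints one row per step. The `summarize` helper is a hypothetical name and not part of magnus.

```python
import json

# Trimmed copy of the run log above: only the fields this sketch reads.
run_log_text = """
{
  "run_id": "20220118114608",
  "status": "SUCCESS",
  "steps": {
    "step parameters": {
      "status": "SUCCESS",
      "attempts": [{"attempt_number": 0, "duration": "0:00:00.000423"}],
      "data_catalog": []
    },
    "step shell": {
      "status": "SUCCESS",
      "attempts": [{"attempt_number": 0, "duration": "0:00:00.011636"}],
      "data_catalog": [{"name": "data.txt", "stage": "put"}]
    },
    "success": {
      "status": "SUCCESS",
      "attempts": [{"attempt_number": 0, "duration": "0:00:00.000117"}],
      "data_catalog": []
    }
  },
  "parameters": {"x": 4}
}
"""


def summarize(run_log: dict) -> list:
    """Return (step name, status, duration of last attempt, catalog files)."""
    rows = []
    for name, step in run_log["steps"].items():
        files = [f"{item['stage']}:{item['name']}" for item in step["data_catalog"]]
        last_attempt = step["attempts"][-1]
        rows.append((name, step["status"], last_attempt["duration"], files))
    return rows


run_log = json.loads(run_log_text)
print(run_log["run_id"], run_log["status"], run_log["parameters"])
for row in summarize(run_log):
    print(row)
```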

You should see a data folder created with a file called data.txt in it, as specified by the command in step shell.

You should also see a .catalog folder created, containing a single folder named after the run_id of this run.
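
The location of a cataloged file can be read off the `data_catalog` entry in the run log. As a minimal sketch, assuming that `catalog_relative_path` is relative to `catalog_handler_location` (which matches the folder layout described here), the path can be rebuilt like this. The `catalog_path` helper is a hypothetical name.

```python
from pathlib import Path

# The data_catalog entry of "step shell", copied from the run log above.
entry = {
    "name": "data.txt",
    "catalog_relative_path": "20220118114608/data.txt",
    "catalog_handler_location": ".catalog",
    "stage": "put",
}


def catalog_path(entry: dict) -> Path:
    """Join the catalog location with the run-specific relative path."""
    return Path(entry["catalog_handler_location"]) / entry["catalog_relative_path"]


path = catalog_path(entry)
print(path.parts)  # ('.catalog', '20220118114608', 'data.txt')
```

The middle component is the run_id, so each run keeps its own copy of the files it put into the catalog.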

To understand more about the input and output, please head over to the documentation.
