
Overview

Hera (hera-workflows)

The Argo was constructed by the shipwright Argus, and its crew were specially protected by the goddess Hera.

(https://en.wikipedia.org/wiki/Argo)

License: MIT

Hera is a Python framework for constructing and submitting Argo Workflows. The main goal of Hera is to make Argo Workflows more accessible by abstracting away some setup that is typically necessary for constructing Argo workflows.

Python functions are first class citizens in Hera - they are the atomic units (execution payload) that are submitted for remote execution. The framework makes it easy to wrap execution payloads into Argo Workflow tasks, set dependencies, resources, etc.

You can watch the introductory Hera presentation from the "Argo Workflows and Events Community Meeting 20 Oct 2021"!


Assumptions

Hera is exclusively dedicated to remote workflow submission and execution. Therefore, it requires an Argo server to be deployed to a Kubernetes cluster. Currently, Hera assumes that the Argo server sits behind an authentication layer that can authenticate workflow submission requests by using the Bearer token on the request. To learn how to deploy Argo to your own Kubernetes cluster you can follow the Argo Workflows guide!

Another option for workflow submission without the authentication layer is to port forward to your Argo server deployment and submit workflows to localhost:2746 (2746 is the default port, but you are free to use another). Please refer to the Argo Workflows documentation for the port forwarding command!
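
For reference, a typical port forwarding command looks like the following (assuming Argo is deployed in the argo namespace under a deployment named argo-server - adjust both to your setup):

kubectl -n argo port-forward deployment/argo-server 2746:2746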

In the future, some of these assumptions may be added to or relaxed depending on the direction of the project. Hera is designed primarily for practical data science purposes, which assumes the presence of a DevOps team to set up an Argo server for workflow submission.

Installation

There are multiple ways to install Hera:

  1. You can install from PyPI:
pip install hera-workflows
  2. Install it directly from this repository using:
python -m pip install git+https://github.com/argoproj-labs/hera-workflows --ignore-installed
  3. Alternatively, you can clone this repository and then run the following to install:
python setup.py install

Contributing

If you plan to submit contributions to Hera, you can install it in a virtual environment managed by pipenv:

pipenv shell
pipenv sync --dev --pre

Also, see the contributing guide!

Concepts

Currently, Hera is centered around two core concepts. These concepts are also used by Argo, which Hera aims to stay consistent with:

  • Task - the object that holds the Python function for remote execution (the atomic unit of execution);
  • Workflow - the higher level representation of a collection of tasks.

Examples

A very primitive example of submitting a task within a workflow through Hera is:

from hera.v1.task import Task
from hera.v1.workflow import Workflow
from hera.v1.workflow_service import WorkflowService


def say(message: str):
    """
    This can be anything as long as the Docker image satisfies the dependencies. You can import any Python package
    that is in your container, e.g. torch, tensorflow, scipy, biopython, etc. - just provide an image to the task!
    """
    print(message)


ws = WorkflowService('my-argo-domain.com', 'my-argo-server-token')
w = Workflow('my-workflow', ws)
t = Task('say', say, [{'message': 'Hello, world!'}])
w.add_task(t)
w.submit()

See the examples directory for a collection of Argo workflow construction and submission examples via Hera!

Comparison

There are other libraries currently available for structuring and submitting Argo Workflows:

  • Couler, which aims to provide a unified interface for constructing and managing workflows on different workflow engines;
  • Argo Python DSL, which allows you to programmatically define Argo workflows using Python.

While the aforementioned libraries provide amazing functionality for Argo workflow construction and submission, they require an advanced understanding of Argo concepts. When Dyno Therapeutics started using Argo Workflows, it was challenging to construct and submit experimental machine learning workflows. Scientists and engineers at Dyno Therapeutics spent a significant amount of time on workflow definition rather than on the implementation of the atomic unit of execution - the Python function - that performed, for instance, model training.

Hera presents a much simpler interface for task and workflow construction, empowering users to focus on their own executable payloads rather than workflow setup. Here's a side-by-side comparison of Hera, Couler, and Argo Python DSL:

Hera

from hera.v1.task import Task
from hera.v1.workflow import Workflow
from hera.v1.workflow_service import WorkflowService


def say(message: str):
    print(message)


ws = WorkflowService('my-argo-server.com', 'my-auth-token')
w = Workflow('diamond', ws)
a = Task('A', say, [{'message': 'This is task A!'}])
b = Task('B', say, [{'message': 'This is task B!'}])
c = Task('C', say, [{'message': 'This is task C!'}])
d = Task('D', say, [{'message': 'This is task D!'}])

a.next(b).next(d)  # a >> b >> d
a.next(c).next(d)  # a >> c >> d

w.add_tasks(a, b, c, d)
w.submit()

Couler
import couler.argo as couler
from couler.argo_submitter import ArgoSubmitter


def job(name):
    couler.run_container(
        image="docker/whalesay:latest",
        command=["cowsay"],
        args=[name],
        step_name=name,
    )


def diamond():
    couler.dag(
        [
            [lambda: job(name="A")],
            [lambda: job(name="A"), lambda: job(name="B")],  # A -> B
            [lambda: job(name="A"), lambda: job(name="C")],  # A -> C
            [lambda: job(name="B"), lambda: job(name="D")],  # B -> D
            [lambda: job(name="C"), lambda: job(name="D")],  # C -> D
        ]
    )


diamond()
submitter = ArgoSubmitter()
couler.run(submitter=submitter)

Argo Python DSL
from argo.workflows.dsl import Workflow

from argo.workflows.dsl.tasks import *
from argo.workflows.dsl.templates import *


class DagDiamond(Workflow):

    @task
    @parameter(name="message", value="A")
    def A(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="B")
    @dependencies(["A"])
    def B(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="C")
    @dependencies(["A"])
    def C(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="D")
    @dependencies(["B", "C"])
    def D(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @template
    @inputs.parameter(name="message")
    def echo(self, message: V1alpha1Parameter) -> V1Container:
        container = V1Container(
            image="alpine:3.7",
            name="echo",
            command=["echo", "{{inputs.parameters.message}}"],
        )

        return container

Comments
  • CICD Improvements

    About

    this is a work in progress; it aims to check feasibility for further discussion

    Following #93, further improve CI + CD

    Tasks

    • [x] add coverage reports tasks and ci
      • [x] store all py vers X OS coverage files
      • [x] combine all py vers X OS coverage files
      • [x] create html cov
      • [x] store coverage report artifact
    • [x] add optional binary distribution to wheels via setup.py
      • [x] add a none-any build just in case and publish it
      • [x] add sdist build ( .tar.gz ) build for publishing
    • [x] store built wheels by tox
      • reason: publish the tested wheels
    • [x] build wheels for all distributions ( matrix = py ver + os )
    • [x] build py3-none-any wheel + sdist ( .tar.gz )
    • [x] consolidate hera CICD into a single gh workflow
      • motivation: re-use test / build job as a pre-publishing job
      • motivation: leverage artifact passing between jobs rather than between different workflows
      • [x] add publish job ( similar to existing one )
      • [x] change event triggers
      • [x] add if: to publish job
      • [x] delete file .github/workflows/hera_build_and_publish.yaml
      • [x] rename file .github/workflows/hera_pr.yaml -> .github/workflows/cicd.yaml ( or .. )
      • [x] fix jobs badge workflow name hera_build_and_publish.yaml -> cicd.yaml ( or .. ) in README.md
    • [x] enable upload to pypi-test for the purpose of validating entire publish job ( see publish job failure )
    • [x] replace upload to pypi-test with upload to pypi in publish job
    • [ ] setup codecov if we want a coverage badge to be introduced in this PR ( or propose a different solution )

    inspiration credit https://github.com/samuelcolvin/pydantic/blob/master/.github/workflows/ci.yml

    opened by harelwa 21
  • Add context management for workflow submission

    Currently, users have to add tasks to a workflow independently. After a task is defined, it is necessary to call workflow.add_tasks(t1, t2, ...), as otherwise the tasks are not added to the template of the DAG in the main workflow. To save some effort in this arena, a context manager can be introduced to auto-insert tasks into a workflow and submit the workflow during the exit phase of the context. This would provide two ways of adding tasks to a workflow.

    The current, supported way of adding tasks and submitting a workflow:

    wf = Workflow('test')
    t1 = Task('t1', image='docker/whalesay', command=['cowsay', 'foo'])
    t2 = Task('t2', image='docker/whalesay', command=['cowsay', 'foo'])
    t1 >> t2
    wf.add(t1, t2)
    wf.submit()
    

    Potential future (second) way of adding tasks and submitting a workflow:

    with Workflow('test') as wf:
        Task('t1', image='docker/whalesay', command=['cowsay', 'foo']) \
            >> Task('t2', image='docker/whalesay', command=['cowsay', 'foo'])
    

    Alternatively:

    with Workflow('test') as wf:
        t1 = Task('t1', image='docker/whalesay', command=['cowsay', 'foo']) 
        t2 = Task('t2', image='docker/whalesay', command=['cowsay', 'foo'])
        t1 >> t2
    

    This will require the implementation of a context manager on Workflow and a container for Task, which tasks insert themselves into during __init__:

    class Workflow:

        def __enter__(self):
            ...
            return self

        def __exit__(self, exc_type, exc_value, traceback):
            ...
            return self.submit()

    class _Container:

        tasks = []  # class-level registry of tasks created inside a context

        @classmethod
        def add(cls, t: "Task"):
            cls.tasks.append(t)

    class Task:

        def __init__(self, *args, **kwargs):
            ...
            _Container.add(self)
    

    Questions to consider:

    • Does this solve a real problem, or would it cause confusion because of the implicit behavior of task additions?
    • Is this an interface that abides by Hera's principle of simplicity? Is the _Container usage a bit too abstract?
    • Is it safe to assume that exiting a context should indeed submit a workflow? If not, but the design is something desirable, should users be provided with the feature, along with a parameter on Workflow that says auto_submit? Does that pollute the interface of Workflow unnecessarily with QOL features rather than focusing on exposing Argo-specific parameters?

    Would love to discuss this @bchalk101 and @bobh66! Any thoughts? Would this be useful to your use case?

    enhancement 
    opened by flaviuvadan 20
  • Multiple Python Versions CI

    Purpose of PR

    related issues and PRs #73 #92 #94

    Provide CI for multiple python versions ( CD will be handled in a different PR )

    To achieve this, introduce usage of tox for testing built wheels in "isolated envs" of multiple python versions.

    Notes regarding similar open PRs

    1. The difference between this PR and #92 is that it does not run pipenv in tox, but rather lets tox handle dependencies and venvs
    2. The difference between this PR and #94 is that it keeps pipenv as the project's dev env manager

    Tasks

    • [x] add tox.ini with python{3.7,3.8,3.9,3.10} test envs
    • [x] add build matrix of python versions to ci
      • ref docs: https://docs.github.com/en/actions/using-workflows/advanced-workflow-features#using-a-build-matrix
      • ref proj: https://github.com/spotify/luigi/
      • ref proj: https://github.com/samuelcolvin/pydantic/
    • [x] remove caching of Pipfile.lock
      • Pipfile.lock is a py37 lock file
    • [x] for dev purposes, update Pipfile and Pipfile.lock
      • for ci, tox is used directly as Pipfile.lock is a py37 lock file

    Notes for future changes

    1. Evaluate usage of pipenv in tox
       1.1 See https://pipenv-fork.readthedocs.io/en/latest/advanced.html#tox-automation-project
       1.2 Evaluate tox-pipenv
    2. Evaluate poetry to replace pipenv
       2.1 See #94
       2.2 See related discussion of "who does what and why" regarding tox and poetry: https://github.com/python-poetry/poetry/discussions/4307

    Changes to Pipfile

    To use tox, it was added to the Pipfile as a dev dependency, with the following command:

    pipenv install pytest-cov "tox<4.0" tox-wheel --dev
    

    tox-wheel was added as well to make tox build wheels rather than zip archives, so these can be used for publishing [ more below ].

    After running tox, you'll find the wheels under .tox/dist:

    ❯ ls .tox/dist
    hera_workflows-1.8.0rc7-py3-none-any.whl
    

    pytest-cov was added as well so we can produce coverage reports, and perhaps add a coverage test.

    Even though this might belong in a different PR, I think it's relevant to this PR as well, as it concerns major changes to how hera is tested. In this context, I've added the relevant pytest options in setup.cfg.

    More on added tox.ini

    To use tox, update or recreate your pipenv virtual env according to the updated Pipfile.

    An initial tox.ini was created with a few tox envs.

    The "main" env - testenv -- provides the first desired functionality discussed in #73 and tests 4 python versions, including 3.10 ( latest ).

    To list all tox envs you can run:

    ❯ tox -a
    python3.7
    python3.8
    python3.9
    python3.10
    lint
    typecheck
    

    And to run a specific one, e.g. lint, run:

    ❯ tox -e lint
    
    opened by harelwa 15
  • Unable to resolve methods outside script

    Hi there, I'm new to Hera and Argo, so bear with me if this is a rookie question:

    Here is my code snippet:

    from app.common.image.image_utils import read_image
    from app.common.models.thumbnail_metadata import ThumbnailMetadata
    from app.common.s3_utils import resolve_s3_url, upload
    from app.common.tasks.argo_task import ArgoTask
    from app.services.argo_client import ArgoWorkflowService
    
    class ThumbnailTask(ArgoTask):
        def __init__(self, metadata: ThumbnailMetadata):
            # .... some more code
    
        def create_thumbnails(self, image_url: str, thumbnail_sizes: list[dict]):
            blob = read_image(self.image_url)
    
            # .... some more code
    
        def create(self):
            ArgoWorkflowService.create_task('create_thumbnails', ThumbnailTask.create_thumbnails, [{
                'image_url': self.image_url,
                'thumbnail_sizes': self.thumbnail_sizes
            }])
    
    from typing import Callable
    from hera.retry import Retry
    from hera.task import Task
    from hera.workflow import Workflow
    from hera.workflow_service import WorkflowService
    
    from app.core.config import get_app_settings
    
    class ArgoWorkflowService(object):
        workflow_service = None
        workflow = None
    
        @staticmethod
        def create_task(name: str, func: Callable, func_params: list[dict]):
            # .... some more code
    
            task = Task(name, func, func_params, image='my-repo.dkr.ecr.us-east-2.amazonaws.com/my-app:latest', retry=Retry(duration=3, max_duration=60))
            ArgoWorkflowService.workflow.add_task(task)
            ArgoWorkflowService.workflow.create()
    

    The error I received in Argo is:

    main Traceback (most recent call last):
    main   File "/argo/staging/script", line 5, in <module>
    main     blob = read_image(self.image_url)
    main NameError: name 'read_image' is not defined
    

    The method read_image comes from another package.

    In addition, Argo was unable to import 3rd party python libraries. I used the same application image, but it still does not work.

    Any help would be appreciated!

    question 
    opened by linyaoli 14
  • How to create a workflow with 10k tasks

    I'd like to have a workflow with many tasks, but I'm running into the 1.5MB k8s/etcd file limit. The workflow isn't complicated; it's basically 10k very short bash commands that all run on the same image with the same resources, etc.

    I think the solution here is to use a WorkflowTemplate, but I haven't figured out how to use input parameters with hera.

    I have something like this:

    # set up the WorkflowTemplate
    wt = WorkflowTemplate(...)
    
    # is this the right way to pass input in?
    t = Task("cmd", lambda: _, command=["bash", "-c", "{{inputs.parameters.cmd}}"], ...)
    wt.add_task(t)
    wt.create()
    
    # how do I get these commands into the workflow as parameters?
    commands = ["echo foo", "echo bar"]
    
    # create the Workflow
    workflow = Workflow(..., workflow_template_ref=wt.name)
    workflow.create()
    
    question stale 
    opened by dmerrick 13
  • How to ignore certificate errors?

    When I attempt to port-forward Argo running in GKE, I get a certificate error. I tried the suggestion from the first comment here, but I still get the certificate error.

    C:\Users\ryanr\Code\hera-workflows\examples [main ≡ +0 ~1 -0 !]> python .\diamond.py
    Traceback (most recent call last):
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\connectionpool.py", line 706, in urlopen      
        chunked=chunked,
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\connectionpool.py", line 382, in _make_request
        self._validate_conn(conn)
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\connectionpool.py", line 1010, in _validate_conn
        conn.connect()
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\connection.py", line 426, in connect
        tls_in_tls=tls_in_tls,
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\util\ssl_.py", line 450, in ssl_wrap_socket
        sock, context, tls_in_tls, server_hostname=server_hostname
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\util\ssl_.py", line 493, in _ssl_wrap_socket_impl
        return ssl_context.wrap_socket(sock, server_hostname=server_hostname)
      File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.7_3.7.2544.0_x64__qbz5n2kfra8p0\lib\ssl.py", line 423, in wrap_socket
        session=session
      File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.7_3.7.2544.0_x64__qbz5n2kfra8p0\lib\ssl.py", line 870, in _create
        self.do_handshake()
      File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.7_3.7.2544.0_x64__qbz5n2kfra8p0\lib\ssl.py", line 1139, in do_handshake
        self._sslobj.do_handshake()
    ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1091)
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File ".\diamond.py", line 32, in <module>
        w.submit()
      File "c:\users\ryanr\code\hera-workflows\src\hera\v1\workflow.py", line 129, in submit
        self.service.submit(self.workflow, namespace)
      File "c:\users\ryanr\code\hera-workflows\src\hera\v1\workflow_service.py", line 48, in submit
        return self.service.create_workflow(namespace, V1alpha1WorkflowCreateRequest(workflow=workflow))
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\argo\workflows\client\api\workflow_service_api.py", line 62, in create_workflow
        return self.create_workflow_with_http_info(namespace, body, **kwargs)  # noqa: E501
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\argo\workflows\client\api\workflow_service_api.py", line 162, in create_workflow_with_http_info
        collection_formats=collection_formats)
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\argo\workflows\client\api_client.py", line 369, in call_api
        _preload_content, _request_timeout, _host)
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\argo\workflows\client\api_client.py", line 185, in __call_api
        _request_timeout=_request_timeout)
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\argo\workflows\client\api_client.py", line 413, in request
        body=body)
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\argo\workflows\client\rest.py", line 271, in POST
        body=body)
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\argo\workflows\client\rest.py", line 168, in request
        headers=headers)
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\request.py", line 79, in request
        method, url, fields=fields, headers=headers, **urlopen_kw
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\request.py", line 170, in request_encode_body
        return self.urlopen(method, url, **extra_kw)
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\poolmanager.py", line 375, in urlopen
        response = conn.urlopen(method, u.request_uri, **kw)
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\connectionpool.py", line 796, in urlopen
        **response_kw
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\connectionpool.py", line 796, in urlopen
        **response_kw
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\connectionpool.py", line 796, in urlopen
        **response_kw
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\connectionpool.py", line 756, in urlopen
        method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
      File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\util\retry.py", line 574, in increment
        raise MaxRetryError(_pool, url, error or ResponseError(cause))
    urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='localhost', port=2746): Max retries exceeded with url: /api/v1/workflows/default (Caused by SSLError(SSLCertVerificationError(1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1091)')))
    
    opened by tachyus-ryan 13
  • Support latest Python version

    Hi! Thank you for developing such a great tool! Currently, Hera supports only Python 3.7. That version is now in security-only status and will reach end of life soon.

    reference: https://www.python.org/downloads/

    So it would be better to support Python 3.8+, IMO. I'm happy to create a PR for supporting newer versions of Python, if that is OK.

    Thank you.

    enhancement 
    opened by hirosassa 12
  • Bugfix: env value_from_input was ignored

    When an Env is specified via value_from_input, the actual value of value_from_input is ignored and only its not-Noneness is checked. We fix that here so the value is actually used. The current buggy state has the unfortunate consequence of having to name the env var the same as the parameter.
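
    A minimal sketch of the distinction (attribute names here are hypothetical, not Hera's actual internals):

    # Hypothetical sketch - names are made up for illustration.
    # Buggy: only checks that value_from_input is set, then falls back to the
    # env var's own name, forcing the parameter to be named like the env var.
    if env.value_from_input is not None:
        value = "{{inputs.parameters.%s}}" % env.name  # given value is ignored

    # Fixed: use the value carried by value_from_input itself.
    if env.value_from_input is not None:
        value = "{{inputs.parameters.%s}}" % env.value_from_input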

    opened by tmi 11
  • Add task template mechanism

    Description

    This MR provides additional functionality for templating tasks with a TaskTemplate class.

    Example

    task_template = TaskTemplate('myTemplate', say,
        [{
            'message': "default value for param 'message'",
            "other_message": {"default": {"value": {"for": "other_message"}}},
        }],
    )
    a = task_template.task("taskWithDefaultValues")
    b = task_template.task("taskWithNonDefaultValues", [{"other_message": "hello again"}])
    
    
    opened by kromanow94 10
  • How about supporting onExit?

    Argo Workflows provides a feature, onExit, to execute a template after the workflow has finished. https://github.com/argoproj/argo-workflows/blob/master/examples/exit-handlers.yaml

    How about supporting onExit for hera-workflows as well? I have tested the following implementation.

    class WorkflowWithOnExit(Workflow):
        def __init__(self, name: str, service: WorkflowService, **kw):
            super().__init__(name, service, **kw)
    
        def add_tasks_on_exit(self, on_exit_task: "OnExitTask", *ts: Task) -> None:
            template_on_exit_name = self.name + "-onExit"
            setattr(self.spec, "on_exit", template_on_exit_name)
    
            # prepare template for onExit
            dag_template_on_exit = IoArgoprojWorkflowV1alpha1DAGTemplate(tasks=[])
            template_on_exit = IoArgoprojWorkflowV1alpha1Template(
                name=template_on_exit_name,
                steps=[],
                dag=dag_template_on_exit,
                parallelism=self.parallelism,
            )
            self.spec.templates.append(template_on_exit)
    
            # HACK: Not using workflow_editors.add_tasks to add to dag_template_on_exit.tasks
            for t in (on_exit_task,) + ts:
                self.spec.templates.append(t.argo_template)
                if t.resources.volumes:
                    for vol in t.resources.volumes:
                        if isinstance(vol, Volume):
                            self.spec.volume_claim_templates.append(vol.get_claim_spec())
                        else:
                            self.spec.volumes.append(vol.get_volume())
                dag_template_on_exit.tasks.append(t.argo_task)
    
    
    class OnExitTask(Task):
        def when_by_workflow_status(
            self, operator: Operator, workflow_status: WorkflowStatus
        ):
            self.argo_task.when = (
                f"{{{{workflow.status}}}} {operator.value} {workflow_status.value}"
            )
    
    def f_with_error():
        raise RuntimeError("Err!")
    
    
    def exit_handler():
        print("exit_handler")
    
    
    def exit_handler_next():
        print("exit_handler_next")
    
    ws = WorkflowService(
        host=host,
        token=token,
        namespace=namespace,
    )
    
    w = WorkflowWithOnExit(
        "test_exit_handler",
        ws,
        namespace=namespace,
        service_account_name=service_account,
    )
    
    task_exit_handler = OnExitTask("exit_handler", exit_handler, image=image)
    task_exit_handler.when_by_workflow_status(Operator.not_equal, WorkflowStatus.Succeeded)
    task_exit_handler_next = Task("exit_handler_next", exit_handler_next, image=image)
    task_exit_handler >> task_exit_handler_next
    
    task_with_error = Task("task_with_error", f_with_error, image=image)
    
    w.add_tasks(task_with_error)
    w.add_tasks_on_exit(task_exit_handler, task_exit_handler_next)
    
    w.create()
    
    STEP                         TEMPLATE                  PODNAME                       DURATION  MESSAGE
     ✖ test-exit-handler         test-exit-handler
     └─✖ task-with-error         task-with-error           test-exit-handler-153195892   12s       Error (exit code 1)
    
     ✔ test-exit-handler.onExit  test-exit-handler-onExit
     ├─✔ exit-handler            exit-handler              test-exit-handler-2402822171  10s
     └─✔ exit-handler-next       exit-handler-next         test-exit-handler-2592397439  10s
    
    enhancement 
    opened by szdr 10
  • Support multiple python using tox with Poetry

    fix #73

    What is this PR?

    Related to #93, I implemented support for multiple Python versions using tox. Different from #93, this PR uses Poetry as a package manager to simplify configuration of linters, tests and builds.

    opened by hirosassa 10
  • SSL: CERTIFICATE_VERIFY_FAILED

    Hello, I am trying to submit a workflow to Argo with the following code (argo-workflows 6.3.5):

    from hera import Task, Workflow
    from hera import global_config

    global_config.GlobalConfig.host = xxx
    global_config.GlobalConfig.token = xxx
    global_config.GlobalConfig.verify_ssl = False


    def say(message: str):
        print(message)


    with Workflow("diamond") as w:
        a = Task('a', say, ['This is task A!'])
        b = Task('b', say, ['This is task B!'])
        c = Task('c', say, ['This is task C!'])
        d = Task('d', say, ['This is task D!'])
        a >> [b, c] >> d

    w.create()

    When I run this script, I get the following error:

    urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='xxx', port=xxx): Max retries exceeded with url: /api/v1/workflows/default (Caused by SSLError(SSLCertVerificationError(1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1131)')))

    My question is: why do I get an SSL error after setting global_config.GlobalConfig.verify_ssl = False?

    It seems that setting SSL verification via GlobalConfig does not work...

    I also tried changing the source code of hera/workflow_service.py:

    class WorkflowService:
        def __init__(
            self,
            host: Optional[str] = None,
            verify_ssl: bool = False,  # changed True to False
            token: Optional[str] = None,
            namespace: Optional[str] = None,
        ):

    After changing the default value of verify_ssl from True to False, I can submit to the Argo server successfully.

    So, is it a bug that global_config.GlobalConfig.verify_ssl does not affect WorkflowService's verify_ssl?

    Thanks

    opened by xuanbo241 4
  • Custom image and inter task communication

    I am facing the issue of not being able to load pickle files from the task that previously saved them. My assumption is that it has something to do with the custom image, because the example provided by hera runs without any problems. Any ideas? Here is my code:

    from hera import Artifact, ImagePullPolicy
    from hera.task import Task
    from hera.workflow import Workflow
    from hera.workflow_service import WorkflowService
    
    
    def task_cleaned_data_housing(data_url):
        import pathlib
        import pickle
    
        import data_housing_pipeline_module
    
        data_url = str(data_url)
    
        training_data = data_housing_pipeline_module.get_cleaned_data_housing(
            data_url
        )
    
        if not pathlib.Path("/tmp").joinpath("data_housing_pipeline").exists():
            pathlib.Path("/tmp").joinpath("data_housing_pipeline").mkdir()
        pickle.dump(
            training_data,
            open("/tmp/data_housing_pipeline/variable_training_data.pickle", "wb"),
        )
    
    
    def task_linear_model_housing():
        import pathlib
        import pickle
    
        import data_housing_pipeline_module
    
        training_data = pickle.load(
            open("/tmp/data_housing_pipeline/variable_training_data.pickle", "rb")
        )
    
        linear_model = data_housing_pipeline_module.get_linear_model_housing(
            training_data
        )
    
        if not pathlib.Path("/tmp").joinpath("data_housing_pipeline").exists():
            pathlib.Path("/tmp").joinpath("data_housing_pipeline").mkdir()
        pickle.dump(
            linear_model,
            open("/tmp/data_housing_pipeline/variable_linear_model.pickle", "wb"),
        )
    
    
    ws = WorkflowService(
        host="https://localhost:2746",
        verify_ssl=False,
        token="None",
        namespace="argo",
    )
    
    with Workflow("data-housing-pipeline", service=ws) as w:
    
        cleaned_data_housing = Task(
            "cleaned-data-housing",
            task_cleaned_data_housing,
            [
                {
                    "data_url": "https://raw.githubusercontent.com/LineaLabs/lineapy/main/examples/use_cases/predict_house_price/data/ames_train_cleaned.csv",
                }
            ],
            image="argo_pipeline:latest",
            image_pull_policy=ImagePullPolicy.Never,
            outputs=[
                Artifact(
                    "training_data",
                    "/tmp/data_housing_pipeline/variable_training_data.pickle",
                ),
            ],
        )
    
        linear_model_housing = Task(
            "linear-model-housing",
            task_linear_model_housing,
            image="argo_pipeline:latest",
            image_pull_policy=ImagePullPolicy.Never,
            inputs=[cleaned_data_housing.get_artifact("training_data")],
            outputs=[
                Artifact(
                    "linear_model",
                    "/tmp/data_housing_pipeline/variable_linear_model.pickle",
                ),
            ],
        )
    
        cleaned_data_housing >> linear_model_housing
    
    w.create()
    

    The message I get is: Unable to resolve: "tasks.cleaned-data-housing.outputs.artifacts.training_data"

    opened by lazargugleta 2
  • Fix name validation, add support for generate_name

    While browsing through the code, I spotted an issue in validate_name in that it uses re.match (match the beginning of the string) rather than re.fullmatch (match the whole string). One thing led to another, and I found and fixed a couple more bugs, and added a new parameter for proper validation of metadata.generateName.
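
    For illustration, here is the re.match vs. re.fullmatch distinction, using the standard Kubernetes DNS subdomain pattern (assumed for this example; the exact pattern in Hera may differ):

    import re

    # Standard k8s DNS subdomain pattern (assumed here for illustration).
    PATTERN = r"[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*"

    # re.match only anchors at the start, so trailing junk slips through:
    assert re.match(PATTERN, "test#") is not None   # unexpectedly accepted

    # re.fullmatch requires the entire string to match:
    assert re.fullmatch(PATTERN, "test#") is None   # correctly rejected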

    Names in k8s cannot end in a hyphen "-" or period "."; however, the same is a valid choice for metadata.generateName. I have extended validate_name with an optional generate_name parameter to support this, synchronized with the identical parameter of workflow.__init__.

    Changes

    • Validate entire name in validate_name, not only the beginning of the string
      • Without this, the invalid name "test#" would unexpectedly pass validation.
    • Fix quoting of "." in pattern used in validate_name
      • Without this, the invalid name r"test\@" would unexpectedly pass validation
    • Add a new generate_name boolean parameter, default to False; if True, a name ending in a single period "." or any number of hyphens "-" will pass validation, as alphanumeric characters are expected to be appended in the final name
      • Without this, the valid generate name "test-" would unexpectedly fail validation
    • Strip trailing "-" and "." characters when workflow name is reused for an unnamed dag
      • Without this, the valid generate name "test-" would result in a validation error if the dag is not named explicitly
    opened by tachylatus 4
  • Multitask Dag

    Hi,

    I was wondering if there is an example of how to create complex workflows.

    I would like to create a workflow similar to this one, but I haven't figured out how to specify the steps when I'm creating the workflow.

    Thanks for the help,


    question 
    opened by alejoGT1202 2
  • Feature request: support for archivelog

    It appears Argo supports setting archive logs on the Workflow spec and template. It would be nice to have this supported in hera.

    https://argoproj.github.io/argo-workflows/configure-archive-logs/

    enhancement 
    opened by ipl31 0