Python Actor concurrency library

Overview

Thespian Actor Library

This library provides the framework of an Actor model for use by applications implementing Actors.
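
For orientation, here is a minimal sketch of what an actor built on this library can look like (the Greeter class and the message text are illustrative, not part of the library):

    # Minimal sketch: one actor that replies to whatever it is sent.
    from thespian.actors import Actor, ActorSystem

    class Greeter(Actor):
        def receiveMessage(self, message, sender):
            # Every message delivered to this actor arrives here; reply to the sender.
            self.send(sender, "Hello, %s" % message)

    if __name__ == "__main__":
        asys = ActorSystem()                  # defaults to the single-process system base
        greeter = asys.createActor(Greeter)
        print(asys.ask(greeter, "World", 1))  # expected to print "Hello, World"
        asys.shutdown()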

Thespian Site with Documentation: http://thespianpy.com/doc

Please report any issues here via the Github issue tracker.

Contacts:

ChangeLog

See the Thespian Release Notes.

History

Thespian was developed at GoDaddy as part of the support for GoDaddy's dedicated and virtual server product line. At one point, Thespian was helping to manage 10,000 physical servers via a combination of on-server and remote actors.

The primary Thespian author (Kevin Quick) is no longer working at GoDaddy, but through GoDaddy's willingness to open-source the Thespian code and transfer the rights to this author, this repository is still being maintained and developed. With heartfelt thanks to the Vertigo team at GoDaddy, this Python Actor model has been developed and refined to provide a highly functional library for general use.

Comments
  • Message going AWOL

    Hi Kevin

    Sorry I haven't gotten back to you on the previous logging ticket yet, I just haven't had time. Another issue has come up in the meantime. I am sending batches of 100 messages between actors. For some batches, the first message of the batch is sent by the first actor (as confirmed by logs) but is never received by the second actor (again confirmed by logs). All the rest of the messages in the batch are sent and received successfully.

    I am pretty stumped right now, as I have tried setting the THESPLOG_THRESHOLD env var to "Debug" and "Info" but there is nothing in the /tmp/ dir (i.e. thespian.log does not exist). I have written some simplified code to try to reproduce what happens in my main system, but so far the simplified code works fine.

    Again, this is running inside a Docker container. My simplified code is below, but as I said it runs fine without dropping any messages. Is there anything you can suggest to help me get more information out of the Thespian internal logging, for example?

    Thanks

    Andrew

    from thespian.troupe import troupe
    from thespian.actors import ActorTypeDispatcher
    from thespian.actors import ActorSystem
    from thespian.actors import WakeupMessage
    import logging
    
    
    class ActorLogFilter(logging.Filter):
        def filter(self, logrecord):
            return 'actorAddress' in logrecord.__dict__
    
    
    class NotActorLogFilter(logging.Filter):
        def filter(self, logrecord):
            return 'actorAddress' not in logrecord.__dict__
    
    
    def log_config(log_file_path_1, log_file_path_2):
        return {
            'version': 1,
            'formatters': {
                'normal': {'format': '%(levelname)-8s %(message)s'},
                'actor': {'format': '%(levelname)-8s %(actorAddress)s => %(message)s'}},
            'filters': {'isActorLog': {'()': ActorLogFilter},
                        'notActorLog': {'()': NotActorLogFilter}},
            'handlers': {'h1': {'class': 'logging.FileHandler',
                                'filename': log_file_path_1,
                                'formatter': 'normal',
                                'filters': ['notActorLog'],
                                'level': logging.INFO},
                         'h2': {'class': 'logging.FileHandler',
                                'filename': log_file_path_2,
                                'formatter': 'actor',
                                'filters': ['isActorLog'],
                                'level': logging.INFO}, },
            'loggers': {'': {'handlers': ['h1', 'h2'], 'level': logging.DEBUG}}
        }
    
    
    class PrimaryActor(ActorTypeDispatcher):
    
        def receiveMsg_WakeupMessage(self, msg, sender):
    
            if not hasattr(self, "batch_number"):
                self.submission_actor_pool_size = 2
                self.batch_number = 0
                self.last_secondary_actor_used = -1
                self.secondary_actors = self.create_secondary_actor_pool(
                    SecondaryActor,
                    self.submission_actor_pool_size
                )
            next_submission_actor_to_use = self.last_secondary_actor_used + 1
            for x in range(0, 100):
                message = {"number": x, "batch": self.batch_number}
                logging.info("Sending message number {0} {1}".format(self.batch_number, x))
    
                self.send(self.secondary_actors[next_submission_actor_to_use], message)
    
            self.batch_number += 1
    
            if self.batch_number >= 5:
                return
    
            if next_submission_actor_to_use > self.submission_actor_pool_size - 1:
                next_submission_actor_to_use = -1
    
            self.last_submission_actor_used = next_submission_actor_to_use
            self.wakeupAfter(1)
    
        def create_secondary_actor_pool(self, actor_code, pool_size):
            submission_actor_pool = []
            for x in range(0, pool_size):
                submission_actor_pool.append(self.createActor(actor_code))
            return submission_actor_pool
    
    
    @troupe(max_count=4000, idle_count=1)
    class SecondaryActor(ActorTypeDispatcher):
    
        def receiveMsg_dict(self, msg, sender):
    
            logging.info("Received message number {0} {1}".format(msg["batch"], msg["number"]))
    
    thespian_system = ActorSystem(
        "multiprocTCPBase",
        {},
        logDefs=log_config("bug_check_1.log", "bug_check_2.log")
    )
    
    primary_actor = thespian_system.createActor(PrimaryActor)
    
    thespian_system.tell(primary_actor, WakeupMessage(delayPeriod=1))
    
    opened by andatt 21
  • High memory use and actor system timeout with about 2,000 actors

    I'm designing a simulation that should be able to scale across a cluster of computers. On my Windows 10 laptop with 6 cores and 32GB of memory, I am encountering both a memory issue and an actor system timeout. I'm using multiprocTCPBase and get a timeout error (trace below) after 1,994 actors have been created. Together these actors use 20GB of memory (not including the 5GB used at machine startup). Two actors store large dictionaries that include the addresses of many other actors. The remaining actors store only a few integer and string values. I was surprised by the memory usage, though my laptop did not actually run out of memory.

    Since I'm interested in multi-core, multi-machine performance, I thought the actors should be separate processes, but perhaps there is too much overhead? I'd welcome advice about how to redesign the actors to use less memory, as well as help identifying the reason for the timeout.

            [lots of lines like this one below...]
    	Cell {A:Cell @ ActorAddr-(T|:58746)} initialized: oid:41_34 x:41 y:34
    Traceback (most recent call last):
      File "D:/Documents/Programming/ActiveFiction/python_code/thespian_test_01.py", line 409, in <module>
        cell = asys.createActor(Cell)
      File "C:\Users\dvyd\.conda\envs\activefiction\lib\site-packages\thespian\actors.py", line 705, in createActor
        sourceHash)
      File "C:\Users\dvyd\.conda\envs\activefiction\lib\site-packages\thespian\system\systemBase.py", line 217, in newPrimaryActor
        str(self.transport.myAddress)))
    thespian.actors.ActorSystemRequestTimeout: No response received to PendingActor request to Admin at ActorAddr-(T|:1900) from ActorAddr-(T|:62606)
    
    Process finished with exit code 1
    
    opened by davideps 16
  • Thespian + RabbitMQ (Pika)

    In my actor-based application I want to use Thespian actors along with Pika to integrate with a RabbitMQ instance. My problem is: what would be the best pattern for such an integration? In other words: how do I read RabbitMQ messages delivered by Pika into one of my Thespian actors, in a non-blocking and asynchronous way?
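
    A minimal sketch of one possible bridging pattern (an editorial suggestion, not from the issue thread), assuming a local RabbitMQ broker and a queue named "work": run the Pika consumer outside the actor system and forward each delivery into an actor with the non-blocking tell().

    # Bridge sketch: Pika consumes from RabbitMQ and forwards into Thespian.
    import pika  # assumed installed, with a RabbitMQ broker running on localhost
    from thespian.actors import Actor, ActorSystem

    class QueueConsumer(Actor):  # hypothetical actor that should see the messages
        def receiveMessage(self, message, sender):
            pass  # real processing of the AMQP payload would go here

    asys = ActorSystem("multiprocTCPBase")
    consumer_actor = asys.createActor(QueueConsumer)

    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    channel.queue_declare(queue="work")

    def on_message(channel, method, properties, body):
        # tell() queues the message for the actor without waiting for it to be processed.
        asys.tell(consumer_actor, body)

    channel.basic_consume(queue="work", on_message_callback=on_message, auto_ack=True)
    channel.start_consuming()  # blocks this bridge script, not the actors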

    opened by htarnacki 14
  • system hangs with 3 troupe actors

    Hi Kevin,

    I have another one for you:

    from thespian.troupe import troupe
    from thespian.actors import ActorTypeDispatcher, Actor
    from thespian.actors import ActorSystem
    import logging
    import time
    
    
    
    def logfile_extraction(log_files):
        """
        Gets data from logfile and returns as string
        :param log_files: string, path to logfile
        :return: string
        """
        consolidated_output = ""
        for logfile in log_files:
            with open(logfile, "r+") as file:
                consolidated_output += file.read()
    
        return consolidated_output
    
    
    class ActorLogFilter(logging.Filter):
        def filter(self, logrecord):
            return 'actorAddress' in logrecord.__dict__
    
    
    class NotActorLogFilter(logging.Filter):
        def filter(self, logrecord):
            return 'actorAddress' not in logrecord.__dict__
    
    
    def log_config(log_file_path_1, log_file_path_2):
        return {
            'version': 1,
            'formatters': {
                'normal': {'format': '%(levelname)-8s %(message)s'},
                'actor': {'format': '%(levelname)-8s %(actorAddress)s => %(message)s'}},
            'filters': {'isActorLog': {'()': ActorLogFilter},
                        'notActorLog': {'()': NotActorLogFilter}},
            'handlers': {'h1': {'class': 'logging.FileHandler',
                                'filename': log_file_path_1,
                                'formatter': 'normal',
                                'filters': ['notActorLog'],
                                'level': logging.INFO},
                         'h2': {'class': 'logging.FileHandler',
                                'filename': log_file_path_2,
                                'formatter': 'actor',
                                'filters': ['isActorLog'],
                                'level': logging.INFO}, },
            'loggers': {'': {'handlers': ['h1', 'h2'], 'level': logging.DEBUG}}
        }
    
    
    class PrimaryActor(Actor):
    
        def receiveMessage(self, msg, sender):
    
            test_data = [
                [1, 2, 3, 4, 5] * 2,
                [6, 7, 8, 9, 10] * 2,
                [11, 12, 13, 14, 15] * 2,
                [16, 17, 18, 19, 20] * 2,
                [21, 22, 23, 24, 25] * 2,
                [1, 2, 3, 4, 5] * 2,
                [6, 7, 8, 9, 10] * 2,
                [11, 12, 13, 14, 15] * 2,
                [16, 17, 18, 19, 20] * 2,
                [21, 22, 23, 24, 25] * 2,
                [1, 2, 3, 4, 5] * 2
            ]
    
            if not hasattr(self, "helper"):
                self.helper = self.createActor(
                    SecondaryActor
                )
    
            for data in test_data:
    
                self.send(
                    self.helper,
                    data
                )
                
    
    @troupe(max_count=200, idle_count=1)
    class SecondaryActor(ActorTypeDispatcher):
        received_message_count = 0
        def receiveMessage(self, msg, sender):
    
            if isinstance(msg, list):
                if not hasattr(self, "helper"):
                    self.helper = self.createActor(
                        TertiaryActor
                    )
    
                for data in msg:
    
                    self.send(
                        self.helper,
                        data
                    )
    
    
    @troupe(max_count=200, idle_count=1)
    class TertiaryActor(ActorTypeDispatcher):
        received_message_count = 0
    
        def receiveMessage(self, msg, sender):
            qa = self.createActor(
                QuaternaryActor,
                globalName="quaternay"
            )
    
            self.send(
                qa,
                msg
            )
    
    
    @troupe(max_count=200, idle_count=1)
    class QuaternaryActor(ActorTypeDispatcher):
    
        def receiveMessage(self, msg, sender):
    
            if isinstance(msg, int):
    
                logging.info("Received message number {0}".format(msg))
    
    
    thespian_system = ActorSystem(
        "multiprocTCPBase",
        {},
        logDefs=log_config("bug_check_1.log", "bug_check_2.log")
    )
    
    primary_actor = thespian_system.createActor(PrimaryActor)
    
    quaternary_actor = thespian_system.createActor(
        QuaternaryActor,
        globalName="quaternay"
    )
    
    thespian_system.tell(primary_actor, {})
    

    I am expecting to see 110 'Received message' entries in the logfile. Instead the actor system hangs; the actors stop working but are still there. My logfile output (truncated) is:

    INFO     ActorAddr-(T|:39395) => Received message number 1
    INFO     ActorAddr-(T|:39395) => Received message number 6
    INFO     ActorAddr-(T|:41401) => Received message number 2
    INFO     ActorAddr-(T|:40165) => Received message number 9
    INFO     ActorAddr-(T|:45173) => Received message number 3
    INFO     ActorAddr-(T|:36873) => Received message number 7
    INFO     ActorAddr-(T|:45585) => Received message number 21
    INFO     ActorAddr-(T|:34161) => Received message number 4
    INFO     ActorAddr-(T|:40759) => Received message number 13
    INFO     ActorAddr-(T|:46429) => Received message number 8
    INFO     ActorAddr-(T|:42633) => Received message number 12
    INFO     ActorAddr-(T|:44969) => Received message number 5
    INFO     ActorAddr-(T|:42815) => Received message number 10
    INFO     ActorAddr-(T|:42815) => Received message number 4
    INFO     ActorAddr-(T|:34829) => Received message number 14
    INFO     ActorAddr-(T|:34829) => Received message number 9
    INFO     ActorAddr-(T|:38961) => Received message number 10
    INFO     ActorAddr-(T|:41837) => Received message number 24
    INFO     ActorAddr-(T|:41837) => Received message number 4
    INFO     ActorAddr-(T|:41837) => Received message number 25
    INFO     ActorAddr-(T|:41837) => Received message number 5
    INFO     ActorAddr-(T|:41837) => Received message number 5
    INFO     ActorAddr-(T|:41837) => Received message number 10
    INFO     ActorAddr-(T|:41837) => Received message number 14
    INFO     ActorAddr-(T|:41837) => Received message number 15
    INFO     ActorAddr-(T|:41837) => Received message number 24
    INFO     ActorAddr-(T|:41837) => Received message number 25
    INFO     ActorAddr-(T|:38785) => Received message number 15
    INFO     ActorAddr-(T|:33405) => Received message number 9
    ERROR    ActorAddr-(T|:46227) => Pending Actor create for ActorAddr-(T|:46227) failed (3585): None
    ERROR    ActorAddr-(T|:40795) => Pending Actor create for ActorAddr-(T|:40795) failed (3585): None
    ERROR    ActorAddr-(T|:42263) => Pending Actor create for ActorAddr-(T|:42263) failed (3585): None
    ERROR    ActorAddr-(T|:46227) => Pending Actor create for ActorAddr-(T|:46227) failed (3585): None
    ERROR    ActorAddr-(T|:35911) => Pending Actor create for ActorAddr-(T|:35911) failed (3585): None
    ERROR    ActorAddr-(T|:45909) => Pending Actor create for ActorAddr-(T|:45909) failed (3585): None
    ERROR    ActorAddr-(T|:35911) => Pending Actor create for ActorAddr-(T|:35911) failed (3585): None
    ERROR    ActorAddr-(T|:35911) => Pending Actor create for ActorAddr-(T|:35911) failed (3585): None
    ERROR    ActorAddr-(T|:46227) => Pending Actor create for ActorAddr-(T|:46227) failed (3585): None
    ERROR    ActorAddr-(T|:44393) => Pending Actor create for ActorAddr-(T|:44393) failed (3585): None
    ERROR    ActorAddr-(T|:45909) => Pending Actor create for ActorAddr-(T|:45909) failed (3585): None
    ERROR    ActorAddr-(T|:38583) => Pending Actor create for ActorAddr-(T|:38583) failed (3585): None
    ERROR    ActorAddr-(T|:46227) => Pending Actor create for ActorAddr-(T|:46227) failed (3585): None
    ERROR    ActorAddr-(T|:44393) => Pending Actor create for ActorAddr-(T|:44393) failed (3585): None
    ERROR    ActorAddr-(T|:45909) => Pending Actor create for ActorAddr-(T|:45909) failed (3585): None
    ERROR    ActorAddr-(T|:35119) => Pending Actor create for ActorAddr-(T|:35119) failed (3585): None
    ERROR    ActorAddr-(T|:38583) => Pending Actor create for ActorAddr-(T|:38583) failed (3585): None
    ERROR    ActorAddr-(T|:45909) => Pending Actor create for ActorAddr-(T|:45909) failed (3585): None
    ERROR    ActorAddr-(T|:44535) => Pending Actor create for ActorAddr-(T|:44535) failed (3585): None
    ERROR    ActorAddr-(T|:45909) => Pending Actor create for ActorAddr-(T|:45909) failed (3585): None
    ERROR    ActorAddr-(T|:35119) => Pending Actor create for ActorAddr-(T|:35119) failed (3585): None
    ERROR    ActorAddr-(T|:44393) => Pending Actor create for ActorAddr-(T|:44393) failed (3585): None
    ERROR    ActorAddr-(T|:38583) => Pending Actor create for ActorAddr-(T|:38583) failed (3585): None
    ERROR    ActorAddr-(T|:44535) => Pending Actor create for ActorAddr-(T|:44535) failed (3585): None
    ERROR    ActorAddr-(T|:44393) => Pending Actor create for ActorAddr-(T|:44393) failed (3585): None
    ERROR    ActorAddr-(T|:38583) => Pending Actor create for ActorAddr-(T|:38583) failed (3585): None
    ERROR    ActorAddr-(T|:35119) => Pending Actor create for ActorAddr-(T|:35119) failed (3585): None
    ERROR    ActorAddr-(T|:46227) => Pending Actor create for ActorAddr-(T|:46227) failed (3585): None
    ERROR    ActorAddr-(T|:35119) => Pending Actor create for ActorAddr-(T|:35119) failed (3585): None
    ERROR    ActorAddr-(T|:44535) => Pending Actor create for ActorAddr-(T|:44535) failed (3585): None
    ERROR    ActorAddr-(T|:44393) => Pending Actor create for ActorAddr-(T|:44393) failed (3585): None
    ERROR    ActorAddr-(T|:44535) => Pending Actor create for ActorAddr-(T|:44535) failed (3585): None
    ERROR    ActorAddr-(T|:38503) => Pending Actor create for ActorAddr-(T|:38503) failed (3585): None
    ERROR    ActorAddr-(T|:38503) => Pending Actor create for ActorAddr-(T|:38503) failed (3585): None
    ERROR    ActorAddr-(T|:35245) => Pending Actor create for ActorAddr-(T|:35245) failed (3585): None
    ERROR    ActorAddr-(T|:37605) => Pending Actor create for ActorAddr-(T|:37605) failed (3585): None
    ERROR    ActorAddr-(T|:38503) => Pending Actor create for ActorAddr-(T|:38503) failed (3585): None
    ERROR    ActorAddr-(T|:35245) => Pending Actor create for ActorAddr-(T|:35245) failed (3585): None
    ERROR    ActorAddr-(T|:37605) => Pending Actor create for ActorAddr-(T|:37605) failed (3585): None
    ERROR    ActorAddr-(T|:34611) => Pending Actor create for ActorAddr-(T|:34611) failed (3585): None
    ERROR    ActorAddr-(T|:37605) => Pending Actor create for ActorAddr-(T|:37605) failed (3585): None
    ERROR    ActorAddr-(T|:34611) => Pending Actor create for ActorAddr-(T|:34611) failed (3585): None
    

    /tmp/thespian.log is:

    2019-03-28 20:31:44.250386 p10786 ERR  Socket error sending to ActorAddr-(T|:44599) on <socket.socket fd=29, family=AddressFamily.AF_INET, type=2049, proto=6, laddr=('192.168.0.14', 35535)>: [Errno 104] Connection reset by peer / 104: ************* TransportIntent(ActorAddr-(T|:44599)-pending-ExpiresIn_0:04:59.999222-<class 'thespian.actors.ChildActorExited'>-ChildActorExited:ActorAddr-(T|:35535)-quit_0:04:59.999193)
    2019-03-28 20:31:53.808630 p10385 ERR  No response to Admin shutdown request; Actor system not completely shutdown
    

    Any ideas what's going on?

    Thanks

    opened by andatt 10
  • Logging makes actor hang

    I've been having this issue intermittently for a while now. When there is a large amount of logging output, for example a logging statement inside a for loop, then beyond a certain number of iterations the actor will hang.

    I have confirmed the cause is the logging statement inside the for loop, as everything behaves as expected when the logging statement is removed. I tried adding additional log files in case there is some file-locking issue. I am using the standard logging config as recommended in the docs:

    class ActorLogFilter(logging.Filter):
        def filter(self, logrecord):
            return 'actorAddress' in logrecord.__dict__
    
    
    class NotActorLogFilter(logging.Filter):
        def filter(self, logrecord):
            return 'actorAddress' not in logrecord.__dict__
    
    
    def log_config(log_file_path_1, log_file_path_2):
        return {
            'version': 1,
            'formatters': {
                'normal': {'format': '%(levelname)-8s %(message)s'},
                'actor': {'format': '%(levelname)-8s %(actorAddress)s => %(message)s'}},
            'filters': {'isActorLog': {'()': ActorLogFilter},
                        'notActorLog': {'()': NotActorLogFilter}},
            'handlers': {'h1': {'class': 'logging.FileHandler',
                                'filename': log_file_path_1,
                                'formatter': 'normal',
                                'filters': ['notActorLog'],
                                'level': logging.INFO},
                         'h2': {'class': 'logging.FileHandler',
                                'filename': log_file_path_2,
                                'formatter': 'actor',
                                'filters': ['isActorLog'],
                                'level': logging.INFO}, },
            'loggers': {'': {'handlers': ['h1', 'h2'], 'level': logging.DEBUG}}
        }
    

    Are there any known issues around logging that could be causing this?

    Edit 1:

    Some additional information - when exiting the hanging actor with Ctrl-C, the Thespian part of the traceback looks like:

      File "/app/tests/testing_helpers.py", line 505, in wrapper
        return wrapped_func(*args, **kwargs)
      File "/app/tests/system/system_test_structure.py", line 56, in _get_test_results
        actor_system.ask(defd_actor, setup_message)
      File "/usr/local/lib/python3.5/site-packages/thespian/actors.py", line 736, in ask
        return self._systemBase.ask(actorAddr, msg, timeout)
      File "/usr/local/lib/python3.5/site-packages/thespian/system/systemBase.py", line 264, in ask
        response = self._run_transport(remTime.remaining())
      File "/usr/local/lib/python3.5/site-packages/thespian/system/systemBase.py", line 140, in _run_transport
        max_runtime.view().remaining())
      File "/usr/local/lib/python3.5/site-packages/thespian/system/transport/wakeupTransportBase.py", line 71, in run
        rval = self._run_subtransport(incomingHandler)
      File "/usr/local/lib/python3.5/site-packages/thespian/system/transport/wakeupTransportBase.py", line 80, in _run_subtransport
        rval = self._runWithExpiry(incomingHandler)
      File "/usr/local/lib/python3.5/site-packages/thespian/system/transport/TCPTransport.py", line 1140, in _runWithExpiry
        set(wsend+wrecv), delay)
    

    Edit 2:

    This actor system / actor is being run inside a Docker container. I have run the same code outside Docker and the issue does not occur. So the problem seems to be caused by some interaction between logging and Docker.

    opened by andatt 9
  • Pass max_count to troupe decorator dynamically

    I am struggling to pass a max_count value to the troupe decorator dynamically. I am calling it like this:

    from thespian.troupe import troupe
    from thespian.actors import *
    
    count = 30
    decorated_actor= troupe(max_count=count)(Actor)
    

    But when I pass decorated_actor to the actor system it behaves as if it's not decorated. What am I doing wrong here?
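
    For comparison, here is a hedged sketch of applying troupe() dynamically to a concrete actor subclass rather than to the bare Actor base class (the Worker class and the dict messages are illustrative):

    from thespian.troupe import troupe
    from thespian.actors import ActorTypeDispatcher, ActorSystem
    import logging

    class Worker(ActorTypeDispatcher):
        def receiveMsg_dict(self, msg, sender):
            # Each troupe member handles a share of the incoming work.
            logging.info("handled %s", msg)

    count = 30  # determined at runtime
    DynamicTroupe = troupe(max_count=count, idle_count=1)(Worker)

    asys = ActorSystem("multiprocTCPBase")
    leader = asys.createActor(DynamicTroupe)  # troupe leader; members are created on demand
    for n in range(100):
        asys.tell(leader, {"n": n})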

    opened by andatt 8
  • Actors won't exit

    Hello....

    I'm working on Windows 10 with OpenCV and Thespian to read multiple real time video feeds. Since videos come at 30 FPS I have an Actor per camera capturing the videos. The captured frame is stored on a memory mapped file for processing by other actors. The Actor capturing frames (VideoDecoder) starts a helper Actor (DrumBeat) that beats every 30ms notifying the actor that it should capture a new frame. DrumBeat uses wakeupAfter to beat at the 30ms rhythm. So far, so good...

    When I shut down the system, I see the exitActor message being propagated from the system to all the created Actors. However, VideoDecoder never gets the exitActor message. It seems to me that the wakeup messages from DrumBeat don't let VideoDecoder receive any other messages. So, VideoDecoder never exits, and the same goes for DrumBeat.

    Any ideas on how to exit those processes? Am I doing something wrong?

    Thanks

    opened by rbotafogo 7
  • Thespian with openCV

    Hello,

    I'm trying to use Thespian with openCV and I would appreciate some advice on the architecture.

    I need to develop a video processing application that works with more than 20 cameras simultaneously on a single machine. My idea was to create parallel actors that would create, let's say, 10 new threads each, one to read each camera. So, for 20 cameras there would be 2 parallel actors and 20 threaded actors. After reading the data, each feed goes through a process of object counting and tracking using Tensorflow and will require more parallel and threaded actors (but this is not really relevant here).

    According to the 'in_depth' manual: "There can be bases for implementing Actors as separate processes, or separate threads, or simply sequential execution", but I could not find any base for separate threads.

    The alternative seems to be to create a separate process per camera, but I'm afraid this might consume too many processes... can this be an issue, considering that many more processes might be required for the whole system?

    Do you have any advice on how to proceed? Thanks a lot!

    opened by rbotafogo 7
  • ActorFailure Clarification

    From the Using.pdf (page 13 section 2.2.5) documentation's Actor Failure section:

    If the Actor code handling a message fails, the ActorSystem will automatically handle that failure and restart the Actor. For this reason, Actor-based code is typically much simpler than conventional code because it does not have the overhead of lots of exception checking and error condition handling.

    When writing Actor code, the general philosophy is "fail early"; the ActorSystem itself will restart the Actor in the event of a failure and possibly retry the message or abort that specific message. See PoisonMessage Returns for more details.

    According to this, the actor will be restarted following a failure. However I am unsure of what is meant by restarting the Actor in this context.

    I set up some test code with a parent and a child actor.

    The parent actor code is below:

    from thespian.actors import ActorTypeDispatcher
    import child
    import random
    
    class Parent(ActorTypeDispatcher):
        failed = False
        def __init__(self, *args, **kwargs):
            super(Parent, self).__init__(*args, **kwargs)
    
        def receiveMsg_Initialize(self, msg, sender):
            self.child_actor = self.createActor(child.Child)
            x = random.randint(0, 100)
            self.send(self.child_actor, child.Initialize(x=x))
            self.wakeupAfter(timePeriod=1)
    
        def receiveMsg_WakeupMessage(self, msg, sender):
            if self.failed is False:
                self.send(self.child_actor, child.FailMessage())
                self.failed = True
                self.wakeupAfter(timePeriod=1)
            else:
                self.send(self.child_actor, child.PrintValues())
    
        def receiveMsg_PoisonMessage(self, msg, sender):
            self.send(self.child_actor, child.PrintValues())
    

    ...and the child code:

    from thespian.actors import ActorTypeDispatcher
    import random
    
    class FailMessage:
        def __init__(self):
            pass
    
    class PrintValues:
        def __init__(self):
            pass
    
    class Initialize:
        def __init__(self, x=None) -> None:
            self.x = x
    
    class Child(ActorTypeDispatcher):
        x1 = None
        x2 = None
        def __init__(self, *args, **kwargs):
            super(Child, self).__init__(*args, **kwargs)
            x1 = random.randint(0, 100)
            self.x1 = x1
            print(f'__init__: addr={hex(id(self))}, x1={self.x1}, x2={self.x2}')
    
        def receiveMsg_Initialize(self, msg, sender):
            self.x2 = msg.x
            print(f'Initialize: addr={hex(id(self))}, x1={self.x1}, x2={self.x2}')
    
        def receiveMsg_FailMessage(self, msg, sender):
            print('Forcing exception in child!')
            raise ValueError
    
        def receiveMsg_PrintValues(self, msg, sender):
            print(f'PrintValues: addr={hex(id(self))}, x1={self.x1}, x2={self.x2}')
    

    When running this code, the child actor will print out its object's address in memory and the two random values: x1 being set in the class __init__ and x2 being set after receiving the Initialize message.

    In this example, all three of the object address, x1, and x2 have the same values after the actor has failed and restarted. From the documentation, I had assumed that when an actor restarts, it would essentially re-run the createActor step while retaining the previous ActorAddress; however, that doesn't appear to be the case, since the __init__ function isn't being run again.

    Here is my current understanding of the Actor Failure process:

    1. Message is sent to an actor.
    2. receiveMsg function raises exception in called actor.
    3. PoisonMessage returned to sender and message is sent a second time.
    4. receiveMsg function raises exception again in the called actor.
    5. PoisonMessage returned to sender again.
    6. Failing actor is restarted by unknown means.
    7. Failing actor's ActorAddress is retained after restart. Sort of like a rollback of the actor's state to just before the exception was raised, maybe?

    So what I'm looking for is a more in-depth explanation of HOW actors restart upon an Actor Failure, since the documentation is pretty vague.

    opened by bryang-spindance 6
  • Missing ChildActorExited messages

    Hi Kevin,

    I think this might just be me being stupid but I am using the following code:

    from thespian.troupe import troupe
    from thespian.actors import ActorTypeDispatcher, Actor
    from thespian.actors import ActorSystem
    import logging
    
    class ActorLogFilter(logging.Filter):
        def filter(self, logrecord):
            return 'actorAddress' in logrecord.__dict__
    
    
    class NotActorLogFilter(logging.Filter):
        def filter(self, logrecord):
            return 'actorAddress' not in logrecord.__dict__
    
    
    def log_config(log_file_path_1, log_file_path_2):
        return {
            'version': 1,
            'formatters': {
                'normal': {'format': '%(levelname)-8s %(message)s'},
                'actor': {'format': '%(levelname)-8s %(actorAddress)s => %(message)s'}},
            'filters': {'isActorLog': {'()': ActorLogFilter},
                        'notActorLog': {'()': NotActorLogFilter}},
            'handlers': {'h1': {'class': 'logging.FileHandler',
                                'filename': log_file_path_1,
                                'formatter': 'normal',
                                'filters': ['notActorLog'],
                                'level': logging.INFO},
                         'h2': {'class': 'logging.FileHandler',
                                'filename': log_file_path_2,
                                'formatter': 'actor',
                                'filters': ['isActorLog'],
                                'level': logging.INFO}, },
            'loggers': {'': {'handlers': ['h1', 'h2'], 'level': logging.DEBUG}}
        }
    
    
    class PrimaryActor(ActorTypeDispatcher):
    
        def receiveMsg_dict(self, msg, sender):
            if not hasattr(self, "secondary_actor"):
                self.secondary_actor = self.createActor(SecondaryActor)
            else:
                logging.info("sending from primary...")
                self.send(self.secondary_actor, msg)
    
        def receiveMsg_ChildActorExited(self, message, sender):
            logging.info("A child of actor {} has exited...".format(
                self.__class__.__name__
            ))
    
    
    @troupe(max_count=20, idle_count=1)
    class SecondaryActor(ActorTypeDispatcher):
        received_message_count = 0
    
        def receiveMessage(self, msg, sender):
            logging.info("received in secondary...")
    
        def receiveMsg_ChildActorExited(self, message, sender):
            logging.info("A child of actor {} has exited...".format(
                self.__class__.__name__
            ))
    
    thespian_system = ActorSystem(
        "multiprocTCPBase",
        {},
        logDefs=log_config("bug_check_1.log", "bug_check_2.log")
    )
    
    primary_actor = thespian_system.createActor(PrimaryActor)
    
    
    for x in range(0, 1000):
        message = {"number": x}
        thespian_system.tell(primary_actor, message)
    

    I would expect to see some logging messages stating the child actors have exited but I get nothing. I just see the send / received messages. Is there something wrong with the code?

    Thanks

    opened by andatt 6
  • Low performance sending low latency messages between actors

    I am using Thespian to develop an application that consumes low-latency data via sockets. I initially created two actors: one consumer (the socket client connection) and one handler. While testing, I noticed delays between the time a message is sent by the consumer (Actor1) and the time it arrives at the handler (Actor2).

    To test that, I used the following code.

    from thespian.actors import ActorSystem, Actor
    import time
    
    
    class Actor1(Actor):
        def receiveMessage(self, handler, sender):
            # benchmark
            data = {'tic': time.perf_counter(), 'lim': 10000}
            print('Elapsed Time to process {} messages'.format(data['lim']))
            for i in range(data['lim']):
                self.send(handler, data)
            # perf
            toc = time.perf_counter() - data['tic']
            print('Actor 1: {} sec'.format(round(toc, 3)))
            # self.actorSystemShutdown()
    
    
    class Actor2(Actor):
        def __init__(self):
            self.msg_count = 0
    
        def receiveMessage(self, data, sender):
            self.msg_count += 1
            if self.msg_count == data['lim']:
                toc = time.perf_counter() - data['tic']
                print('Actor 2: {} sec'.format(round(toc, 3)))
    
    def main():
        asys = ActorSystem('multiprocTCPBase')
        consumer = asys.createActor(Actor1)
        handler = asys.createActor(Actor2)
        asys.tell(consumer, handler)
    
    if __name__ == '__main__':
        main()
    

    Results:

    Elapsed Time to process 10 messages
    Actor 1: 0.002 sec
    Actor 2: 0.008 sec
    
    Elapsed Time to process 100 messages
    Actor 1: 0.019 sec
    Actor 2: 0.099 sec
    
    Elapsed Time to process 1000 messages
    Actor 1: 0.131 sec
    Actor 2: 0.769 sec
    
    Elapsed Time to process 10000 messages
    Actor 1: 1.219 sec
    Actor 2: 7.608 sec
    
    Elapsed Time to process 100000 messages
    Actor 1: 22.012 sec
    Actor 2: 91.419 sec
    

    Looking at these results, is there something wrong or missing in my code, or is there another, faster way to send messages between actors? Also, is there any benchmark mentioned in the documentation that would help us analyze the performance?

    opened by jonatelo 6
  • Question - Communicate metadata with message

    Hi, I'm looking to implement a simple OpenTelemetry (OT) instrumentation to have distributed tracing between my actors (the goal is to find out where the performance issues are in another project using Thespian). I have several actors communicating with each other and, basically, I want to trace the different message exchanges (e.g. actor A received a message that took 2ms to process and then sent a message to actor B that took 1ms to process...).

    The way OT works, I would need to propagate a context between the different actors, along with the messages exchanged (so OT can group the different calls together). I wanted to know if there is any way to send this context as metadata along with the messages I send between two actors.

    A quick and dirty way to implement this would be to bundle the message with the context. The issue I have with this is that it impacts the whole application (as every actor now needs to separate the tracing context from the received message). That means that I would need two implementations of my app if I want the tracing to be optional. That's why I thought that sending the context as metadata would be "cleaner".
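
    For illustration, a small sketch of that bundling workaround, using a hypothetical TracedMessage envelope (this is not a Thespian API):

    from dataclasses import dataclass, field
    from typing import Any, Dict

    @dataclass
    class TracedMessage:
        # Hypothetical envelope: the real payload plus an OpenTelemetry carrier dict.
        payload: Any
        trace_context: Dict[str, str] = field(default_factory=dict)

    # Sending side (inside an actor), assuming `carrier` was filled by an OT propagator:
    #     self.send(target, TracedMessage(payload=work_item, trace_context=carrier))
    # Receiving side (ActorTypeDispatcher), unwrapping before normal handling:
    #     def receiveMsg_TracedMessage(self, msg, sender):
    #         parent_ctx = propagator.extract(msg.trace_context)  # hypothetical helper
    #         self.handle_payload(msg.payload, parent_ctx)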

    Does the current implementation of Thespian have anything like this? If not, would you consider it worth implementing for Thespian? (I could try to implement it.)

    Thanks

    opened by dsaingre 1
  • Logging differences based on startup network state

    Hi again. Sorry to keep posting new issues for you.

    Environment

    Tested on Debian 10 (buster) with an ARM Cortex A8 (1 core). Also tested on an Ubuntu 20.04 LTS VM with 4 cores allocated, to check whether this was a threading issue.

    Python versions: 3.7.3, 3.8.10, 3.9.5

    Manually set the following environment variables: export THESPLOG_FILE=./thespian.log and export THESPLOG_THRESHOLD=DEBUG

    Problem

    The Thespian logging system has unexpected behavior when network connectivity is lost on the host machine, specifically when the network adapter becomes disabled or disconnected. This is an issue since, if the WiFi connection is lost or an ethernet cable becomes unplugged, the network adapter is disabled.

    There are two main scenarios that I've discovered when testing this issue. The two files below are used in both scenarios.

    start.py

    This is the startup script for this test. It sets up the actor system with logging, then creates the top-level actor. It then enters a loop where it waits for a Ctrl+C (SIGINT) signal. When this signal is caught, the actor system is shut down and the program exits.

    import signal
    from thespian.actors import ActorSystem
    import time
    import top
    
    import logging
    class actorLogFilter(logging.Filter):
        def filter(self, logrecord):
            return 'actorAddress' in logrecord.__dict__
    
    class notActorLogFilter(logging.Filter):
        def filter(self, logrecord):
            return 'actorAddress' not in logrecord.__dict__
    
    logcfg = {
        'version': 1,
        'formatters': {
            'normal': {'format': '%(levelname)-8s %(message)s'}, 'actor': {'format': '%(levelname)-8s %(actorAddress)s => %(message)s'}
        },
        'filters': {'isActorLog': {'()': actorLogFilter}, 'notActorLog': {'()': notActorLogFilter}},
        'handlers': {
            'h1': {
                'class': 'logging.FileHandler',
                'filename': 'actor_log.log',
                'formatter': 'normal',
                'filters': ['notActorLog'],
                'level': logging.DEBUG
            },
            'h2': {
                'class': 'logging.FileHandler',
                'filename': 'actor_log.log',
                'formatter': 'actor',
                'filters': ['isActorLog'],
                'level': logging.DEBUG}
            },
        'loggers' : {'': {'handlers': ['h1', 'h2'], 'level': logging.DEBUG}
        }
    }
    
    def handler(signum: int, frame):
        ActorSystem('multiprocTCPBase').shutdown()
        exit(1)
    
    if __name__ == "__main__":
        asys = ActorSystem('multiprocTCPBase', logDefs=logcfg)
        # Signal handling for stopping the actor system using Ctrl+C
        signal.signal(signal.SIGINT, handler)
    
        top_actor = asys.createActor(top.Top, globalName="top")
        asys.tell(top_actor, top.Initialize())
    
        while True:
            time.sleep(1)
    

    top.py

    This is the top-level actor. It has two wakeup loops that we use to figure out what is happening in the logs. The first wakeup loop is print, which just prints out the number of times that it has woken up; we use this to ensure that the system is still running while the network is disconnected. The second loop is network, which checks for network connectivity every second and prints "CONNECTED" or "DISCONNECTED" whenever the network state changes.

    import logging as log
    import os
    import http.client as httplib
    from thespian.actors import ActorTypeDispatcher
    
    class Initialize:
        def __init__(self) -> None:
            pass
    
    def ping(ip, timeout=0.5):
        conn = httplib.HTTPSConnection(ip, timeout=timeout)
        try:
            conn.request("HEAD", "/")
            return True
        except Exception:
            return False
        finally:
            conn.close()
    
    def is_online(ips=None):
        ips = ["1.1.1.1", "8.8.8.8", "8.8.4.4", "4.2.2.4"] if ips is None else ips
        return any([ping(ip) for ip in ips])
    
    class Top(ActorTypeDispatcher):
        def __init__(self, *args, **kwargs) -> None:
            super().__init__(*args, **kwargs)
            self.wakeup_count = 0
            self.online = False
    
        def receiveMsg_Initialize(self, msg, sender):
            log.debug(f"Top {self.myAddress} (P|:{os.getpid()}) received Initialize command from {sender}")
            self.wakeupAfter(timePeriod=1, payload='network')
            self.wakeupAfter(timePeriod=5, payload='print')
    
        def receiveMsg_WakeupMessage(self, msg, sender):
            if msg.payload == 'network':
                log.debug(f'CHECKING NETWORK STATUS')
                prev_online = self.online
                self.online = is_online()
                if self.online != prev_online:
                    if self.online:
                        log.critical(f'CONNECTED!')
                    else:
                        log.critical(f'DISCONNECTED!')
                self.wakeupAfter(timePeriod=1, payload='network')
                return None
    
            if msg.payload == 'print':
                log.debug(f'WAKEUP COUNT: {self.wakeup_count}')
                self.wakeup_count += 1
                self.wakeupAfter(timePeriod=5, payload='print')
                return None
    

    Scenario 1: Startup Online

    In this scenario, the Thespian ActorSystem is created and initialized while there is an active network connection on the host machine (connected both to the local network and able to ping out to the internet).

    Log Files: actor_log, thesplog

    1. Ensure network is connected and we can ping out to the internet (like 8.8.8.8)
    2. Run python start.py
    3. After logs start coming in to actor_log.log, disable network (disconnect from wifi, unplug ethernet cable, disable network adapter, etc.)
      • At this point, we notice that actor_log.log is no longer receiving any updates while thespian.log continues showing the top actor continuing execution as expected.
    4. Enable/reconnect network whenever you want.
      • After a short time after reconnecting to the network, actor_log.log will receive all of the queued messages that it missed while the network was disconnected.

    In this scenario, it seems that the ActorSystem is queueing all messages to the outward-facing logging actor and waiting for the network connection to be reestablished. However, this seems odd to me since I would have thought that Thespian would use localhost (127.0.0.1) for all internal operations, such as allowing the logging actor to write to its output file. I suppose I don't fully understand the inner workings of the TCP/UDP base.

    My main concern with this is that the Thespian internal logging may continue queueing these messages during long periods of network loss and eventually cause a system crash. I noticed that the longer I waited before reconnecting the network, the longer it took for the queued messages to get written to the actor_log.log file.

    Scenario 2: Startup Offline

    In this scenario, the Thespian ActorSystem is created and initialized while there is not an active network connection on the host machine (network cable unplugged, wifi not connected, adapter disabled, etc.).

    Log Files: actor_log, thesplog

    1. Ensure network is not connected so that we cannot ping out to the internet.
    2. Run python start.py
    3. We notice that logs will start coming in to actor_log.log.
    4. Connect to network.
      • We will see a "CONNECTED" message in actor_log.log.
    5. Disable/disconnect network connection.
      • We will see a "DISCONNECTED" message in actor_log.log.

    This scenario is how I was expecting the ActorSystem's logging to operate in the event of network loss (such as in scenario 1's case).

    Additional Notes

    With multiprocUDPBase, Scenario 1 just errored out when I lost network connectivity. actor_log, thesplog

    With multiprocUDPBase, Scenario 2 operated the same as with multiprocTCPBase.

    Gists

    Here is a Gist containing the log files that were mentioned in this problem: https://gist.github.com/bryang-spindance/9bfbe442f04aa588f5af20305c187d61

    opened by bryang-spindance 2
  • Invalid Socket Address on Mac with Multiproc bases when disconnected from network.

    Similar to #28

    I found that on Mac OSX (not tested on Windows), when I disconnect from the network (not internet, but local network) I receive the following error:

    Traceback (most recent call last):
      File "start.py", line 23, in <module>
        asys = ActorSystem('multiprocTCPBase')
      File "/Users/username/project/venv/lib/python3.7/site-packages/thespian/actors.py", line 640, in __init__
        systemBase, capabilities, logDefs)
      File "/Users/username/project/venv/lib/python3.7/site-packages/thespian/actors.py", line 678, in _startupActorSys
        systemBase = sbc(self, logDefs=logDefs)
      File "/Users/username/project/venv/lib/python3.7/site-packages/thespian/system/multiprocTCPBase.py", line 28, in __init__
        super(ActorSystemBase, self).__init__(system, logDefs)
      File "/Users/username/project/venv/lib/python3.7/site-packages/thespian/system/multiprocCommon.py", line 82, in __init__
        self.mpcontext)
      File "/Users/username/project/venv/lib/python3.7/site-packages/thespian/system/transport/TCPTransport.py", line 275, in __init__
        templateAddr     = ActorAddress(TCPv4ActorAddress(None, 0, external = externalAddr))
      File "/Users/username/project/venv/lib/python3.7/site-packages/thespian/system/transport/IPBase.py", line 275, in __init__
        external)
      File "/Users/username/project/venv/lib/python3.7/site-packages/thespian/system/transport/IPBase.py", line 171, in __init__
        raise RuntimeError('Unable to determine valid external socket address.')
    RuntimeError: Unable to determine valid external socket address.
    

    So I went into IPBase.py and printed out the baseaddr it was receiving on line 110.

    class IPActorAddress(object):
        def __init__(self, af, socktype, proto, baseaddr, port, external=False):
            """If external is "truthy", this should be an address that is reachable
               from external nodes.  If external is false, this is usually
               an address that is going to be listened on locally.  For
               example, "0.0.0.0" can be used for a non-external
               listen-any address, but cannot be used for sending messages.
    
               A "truthy" value of external can be an external address to
               try.  Using the address of the Convention Leader (if any)
               is recommended to ensure that the address chosen is
               appropriate for the network supporting the Convention.  By
               default, the address is Go Daddy's public webserver
               address.
            """
           
            print(baseaddr)
    

    Here is where it differs between Mac and Linux. On my Mac, the baseaddr is None. On Linux, the baseaddr prints None, then this class is instantiated again and it prints 127.0.0.1 as expected; it will cycle between printing None and 127.0.0.1 many times, but the actor system appears to be working as expected at this point.

    Then I went into TCPTransport.py and printed out the value of externalAddr on line 275. Both Mac and Linux give me the same result of ('', 1900) when disconnected from networks.

    class TCPTransport(asyncTransportBase, wakeupTransportBase):
        "A transport using TCP IPv4 sockets for communications."
    
        def __init__(self, initType, *args):
            super(TCPTransport, self).__init__()
    
            if isinstance(initType, ExternalInterfaceTransportInit):
                # External process that is going to talk "in".  There is
                # no parent, and the child is the systemAdmin.
                capabilities, logDefs, concurrency_context = args
                adminRouting     = False
                self.txOnly      = False  # communications from outside-in are always local and therefore not restricted.
                convAddr = capabilities.get(CONV_ADDR_IPV4_CAPABILITY, '')
                if isinstance(convAddr, list):
                    convAddr = convAddr[0]
                if convAddr and type(convAddr) == type( (1,2) ):
                    externalAddr = convAddr
                elif type(convAddr) == type("") and ':' in convAddr:
                    externalAddr = convAddr.split(':')
                    externalAddr = externalAddr[0], int(externalAddr[1])
                else:
                    externalAddr = (convAddr, capabilities.get('Admin Port', DEFAULT_ADMIN_PORT))
                print('externalAddr:', externalAddr) # <================ HERE IS THE PRINT STATEMENT====================
                templateAddr     = ActorAddress(TCPv4ActorAddress(None, 0, external = externalAddr))
                self._adminAddr  = TCPTransport.getAdminAddr(capabilities)
                self._parentAddr = None
                isAdmin = False
    

    This is the extent to which I have debugged this issue, due to project deadlines, but I figured I'd make you aware of it. In the meantime, I am going to switch my development environment to Linux.

    Thank you for your time.

    opened by bryang-spindance 1
  • ChildActorExited - Manually killing child PID does not result in ChildActorExited Message to Parent

    Code

    Below is example code which has reproduced this issue for me.

    Project structure

    .
    ├── app
    │   ├── __init__.py
    │   ├── parent.py
    │   └── child.py
    ├── start.py
    └── stop.py
    

    start.py

    from thespian.actors import ActorSystem
    from app import parent
    
    if __name__ == '__main__':
      asys = ActorSystem('multiprocTCPBase')
      actor = asys.createActor(parent.Parent)
      asys.tell(actor, parent.Initialize())
    

    stop.py

    from thespian.actors import ActorSystem
    
    if __name__ == '__main__':
      ActorSystem('multiprocTCPBase').shutdown()
    

    parent.py

    from thespian.actors import ActorTypeDispatcher
    import os
    
    class Initialize:
      def __init__(self):
        pass
    
    class Parent(ActorTypeDispatcher):
      def __init__(self, *args, **kwargs):
        super(Parent, self).__init__(*args, **kwargs)
        self._child = None
    
      def receiveMsg_Initialize(self, msg, sender):
        print(f'{self.myAddress} (P|:{os.getpid()}) Parent received Initialize command from {sender}')
        self._child = self.createActor('app.child.Child')
        self.send(self._child, Initialize())
    
      def receiveMsg_ChildActorExited(self, msg, sender):
        print(f'In Parent: ChildActorExited')
    

    child.py

    from thespian.actors import ActorTypeDispatcher
    import os
    
    class Child(ActorTypeDispatcher):
      def __init__(self, *args, **kwargs):
        super(Child, self).__init__(*args, **kwargs)
    
      def receiveMsg_Initialize(self, msg, sender):
        print(f'{self.myAddress} (P|:{os.getpid()}) Child received Initialize command from {sender}')
    

    Procedure

    1. Navigate to the root of my project.
    ~$ cd ~/Documents/thespian/test
    
    2. Start the actor system using start.py
    ~/Documents/thespian/test$ python start.py
    
    ActorAddr-(T|:1111) (P|:5555) Parent received Initialize command from ActorAddr-(T|:0000)
    ActorAddr-(T|:2222) (P|:6666) Child received Initialize command from ActorAddr-(T|:1111)
    
    3. Kill the child actor process by its PID.
    ~/Documents/thespian/test$ kill -9 6666
    
    4. After the tests, shut down the actor system.
    ~/Documents/thespian/test$ python stop.py
    

    Problem

    From the procedure above, upon killing the child process in step 3, it is my understanding that the parent actor should immediately receive a ChildActorExited message and my example program should print out ChildActorExited to the terminal; this, however, does not happen. Instead, the remaining parent actor stays alive and reports nothing.

    I have tested this same functionality on MacOS and Windows with the same results. I also tried using multiprocUDPBase but again got the same results.

    Another thing to note: after killing the child process and running stop.py, the actor system takes a bit longer than usual to shut down; however, it does not print any additional information.

    Environment

    • These tests were performed in fresh virtual environments where the only pip package installed was thespian.
    • I have tested this on MacOS Monterey (12.1) as well as Windows 10 (20H2).
      • On both operating systems, I have tested using various python versions (3.7.3, 3.8.6, 3.9.9).
      • I have also tested on a couple versions of Thespian (3.9.11, 3.10.6).

    If you need any additional information, please let me know.

    opened by bryang-spindance 9
  • [EXAMPLE] Thespian director - dockerized cluster working example

    Hi, during my personal work I prepared an example project that uses Thespian and has the following features:

    • a cluster of 2 nodes: leader and worker
    • nodes are separate docker containers
    • actor implementation code is loaded into the leader node and propagated to the worker node through the Thespian director
    • actors on the worker node are created from actors on the leader node
    • a built-in mechanism for actor creation on selected worker nodes, identified by their individual node ids

    requirements:

    • docker (to run the example)
    • Java with the JAVA_HOME variable set (to build the project; I use Gradle as the build tool and it requires Java to run)
    • python 3.8
    • pipenv

    Description of the steps which happen after running the example:

    • the leader node is started
    • the worker node is started
    • on the leader node the Thespian director starts 2 actors: ConventionListener and LeaderActor, and after starting them it sends them "start messages"
    • the Leader actor starts a "wake up" process: it will wake up every 60s just to log an "I am alive" message
    • the ConventionListener actor starts listening for ActorSystemConventionUpdate events
    • when the worker node starts, an ActorSystemConventionUpdate event is emitted; the ConventionListener actor records the node id of the started worker node and sends this information to the Leader actor (in a WorkerNodeAdded message)
    • after receiving the WorkerNodeAdded message, the Leader actor records the node id of the started worker node and creates an instance of a WorkerActor specifically on that node; after that it sends a message to this WorkerActor asking it to say hello
    • after receiving the SayHello message, the WorkerActor on the worker node just logs a hello message

    usage:

    # unpack archive file
    cd ./thespian/example
    vim ./docker/worker-node/THESPIAN_DIRECTOR_DIR/convleader.cfg
    # change the string "x.x.x.x" to your real external IP address (not localhost! Do not remove the port from the string!)
    ./gradlew install
    ./gradlew build
    ./start
    
    # observe logs:
    # ./docker/leader-node/logs/leader.log
    # ./docker/worker-node/logs/worker.log
    
    ./stop
    

    @kquick feel free to use it as an example in the Thespian project if you find it useful

    thespian-example.zip

    opened by htarnacki 0
  • Different questions about Thespian and its functionalities

    Different questions about Thespian and its functionalities

    Hi, I hope you do not mind this type of issue, collecting all my questions in one thread.


    1. What is the preferred way of ensuring that some named actor always exists in the actor system? So:
    • we create an actor system
    • then we create the first main named actor (A) that we want to exist forever
    • even in case of an "A" actor crash we need some mechanism to bring it back to life

    Looking into the documentation I see that named actors do not have parents, and therefore there is no actor that could receive the "ChildActorExited" message. But I guess the actor system itself receives such a notification? So I am looking for something like this: ActorSystem().createActor(actor_class, globalName=attr, keep_alive=True)
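
    For illustration, the closest workaround I can think of is an external watchdog that periodically re-issues the globalName createActor call, relying on the documented behaviour that the call returns the existing actor's address if the name is still registered and only creates a fresh actor otherwise (keep_alive below is my own helper, not a library function):

    import time
    from thespian.actors import ActorSystem

    def keep_alive(actor_class, name, period=30):
        # hypothetical watchdog loop: re-creating under the same globalName
        # is a no-op while the named actor is alive, and resurrects it if not
        asys = ActorSystem()
        while True:
            asys.createActor(actor_class, globalName=name)
            time.sleep(period)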


    2. Why is there no "ask" method on the actor class? There is an actor system "ask" method but no corresponding method on the actor class. What is the reason for this? Without it, what should normal processing like this look like:
    • start processing a received message A
    • ask some other actor for additional data needed to process message A
    • after receiving the additional data, process message A

    Without something like "await", are we headed toward some kind of callback hell (or a message/response hell? ;-)), or is there something like: self.await( BankAccountData, self.send(BankActorAddress, AskForBankAccountData(client_number) )

    self.await would pause the actor's processing until it receives a message of type BankAccountData from BankActorAddress
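
    For now the only pattern I can see is to stash the pending work and correlate the reply when it comes back, roughly like this (all the Bank*/MessageA classes are just illustrative stand-ins from my example):

    from thespian.actors import ActorTypeDispatcher

    class MessageA:                            # illustrative
        def __init__(self, client_number):
            self.client_number = client_number

    class AskForBankAccountData:               # illustrative
        def __init__(self, client_number):
            self.client_number = client_number

    class BankAccountData:                     # illustrative
        def __init__(self, balance):
            self.balance = balance

    class BankActor(ActorTypeDispatcher):
        def receiveMsg_AskForBankAccountData(self, message, sender):
            self.send(sender, BankAccountData(balance=100))

    class ClientActor(ActorTypeDispatcher):
        def receiveMsg_MessageA(self, message, sender):
            # no await available, so remember the pending work and ask the
            # bank actor for the extra data with an ordinary send
            if not hasattr(self, 'bank'):
                self.bank = self.createActor(BankActor)
            self.pending = (message, sender)
            self.send(self.bank, AskForBankAccountData(message.client_number))

        def receiveMsg_BankAccountData(self, account_data, sender):
            # the "callback": resume processing of the stashed message A
            original_message, original_sender = self.pending
            self.send(original_sender,
                      'processed %s, balance %s' % (original_message.client_number,
                                                    account_data.balance))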


    opened by htarnacki 13