🎛
lazycluster
Distributed machine learning made simple.
Use your preferred distributed ML framework like a lazy engineer.
Getting Started • Highlights • Features • API Docs • Support • Report a Bug • Contribution
lazycluster is a Python library intended to liberate data scientists and machine learning engineers by abstracting away cluster management and configuration so that they are able to focus on their actual tasks. Especially, the easy and convenient cluster setup with Python for various distributed machine learning frameworks is emphasized.
Highlights
- High-Level API for starting clusters:
- Lower-level API for:
- Managing Runtimes or RuntimeGroups to:
- A-/synchronously execute RuntimeTasks by leveraging the power of ssh
- Expose services (e.g. a DB) from or to a
Runtime
or in a wholeRuntimeGroup
- Managing Runtimes or RuntimeGroups to:
- Command line interface (CLI)
- List all available
Runtimes
- Add a
Runtime
configuration - Delete a
Runtime
configuration
- List all available
Concept Definition: Runtime
A
Runtime
is the logical representation of a remote host. Typically, the host is another server or a virtual machine / container on another server. This python class provides several methods for utilizing remote resources such as the port exposure from / to aRuntime
as well as the execution of RuntimeTasks. ARuntime
has a working directory. Usually, the execution of aRuntimeTask
is conducted relatively to this directory if no other path is explicitly given. The working directory can be manually set during the initialization. Otherwise, a temporary directory gets created that might eventually be removed.
Concept Definition: RuntimeGroup
A
RuntimeGroup
is the representation of logically relatedRuntimes
and provides convenient methods for managing those relatedRuntimes
. Most methods are wrappers around their counterparts in theRuntime
class. Typical usage examples are exposing a port (i.e. a service such as a DB) in theRuntimeGroup
, transfer files, or execute aRuntimeTask
on theRuntimes
. Additionally, all concrete RuntimeCluster (e.g. the HyperoptCluster) implementations rely onRuntimeGroups
for example.
The
manager
refers to the host where you are actually using the lazycluster library, since all desired lazycluster entities are managed from here. Caution: It is not to be confused with the RuntimeManager class.
Concept Definition: RuntimeTask
A
RuntimeTask
is a composition of multiple elemantary task steps, namelysend file
,get file
,run command
(shell),run function
(python). ARuntimeTask
can be executed on a remote host either by handing it over to aRuntime
object or standalone by handing over a fabric Connection object to the execute method of theRuntimeTask
. Consequently, all invididual task steps are executed sequentially. Moreover, aRuntimeTask
object captures the output (stdout/stderr) of the remote execution in its execution log. An example for aRuntimeTask
could be to send a csv file to aRuntime
, execute a python function that is transforming the csv file and finally get the file back.
Getting started
Installation
pip install lazycluster
# Most up-to-date development version
pip install --upgrade git+https://github.com/ml-tooling/lazycluster.git@develop
Prerequisites
For lazycluster usage on the manager:
-
Unix based OS
-
Python >= 3.6
-
ssh client (e.g. openssh-client)
-
Passwordless ssh access to the
Runtime
hosts (recommended)Configure passwordless ssh access (click to expand...)
- Create a key pair on the manager as described here or use an existing one
- Install lazycluster on the manager
- Create the ssh configuration for each host to be used as Runtime by using the lazycluster CLI command
lazycluster add-runtime
as described here and do not forget to specify the--id-file
argument. - Finally, enable the passwordless ssh access by copying the public key to each Runtime as descibed here
Runtime host requirements:
- Unix based OS
- Python >= 3.6
- ssh server (e.g. openssh-server)
Note:
Passwordless ssh needs to be setup for the hosts to be used as Runtimes for the most convenient user experience. Otherwise, you need to pass the connection details to Runtime.__init__ via connection_kwargs. These parameters will be passed on to the fabric.Connection.
Usage example high-level API
Start a Dask cluster.
from lazycluster import RuntimeManager
from lazycluster.cluster.dask_cluster import DaskCluster
# Automatically generate a group based on the ssh configuration
runtime_manager = RuntimeManager()
runtime_group = runtime_manager.create_group()
# Start the Dask cluster instances using the RuntimeGroup
dask_cluster = DaskCluster(runtime_group)
dask_cluster.start()
# => Now, you can start using the running Dask cluster
# Get Dask client to interact with the cluster
# Note: This will give you a dask.distributed.Client which is not
# a lazycluster cluster but a Dask one instead
client = cluster.get_client()
Usage example lower-level API
Execute a Python function on a remote host and access the return data.
from lazycluster import RuntimeTask, Runtime
# Define a Python function which will be executed remotely
def hello(name:str):
return 'Hello ' + name + '!'
# Compose a `RuntimeTask`
task = RuntimeTask('my-first_task').run_command('echo Hello World!') \
.run_function(hello, name='World')
# Actually execute it remotely in a `Runtime`
task = Runtime('host-1').execute_task(task, execute_async=False)
# The stdout from from the executing `Runtime` can be accessed
# via the execution log of the `RuntimeTask`
task.print_log()
# Print the return of the `hello()` call
generator = task.function_returns
print(next(generator))
Support
The lazycluster project is maintained by Jan Kalkan. Please understand that we won't be able to provide individual support via email. We also believe that help is much more valuable if it's shared publicly so that more people can benefit from it.
Type | Channel |
---|---|
|
|
|
|
|
|
|
Features
CLI
) to manage local ssh configuration to enable Runtime
usage
Use the Command Line Interface (Details (click to expand...)
For a full list of CLI commands please use lazycluster --help
. For the help of a specific command please use lazycluster COMMAND --help
.
List all available runtimes incl. additional information like cpu, memory, etc.
Moreover, also incative hosts will be shown. Inactive means, that the host could not be reached via ssh and instantiated as a valid Runtime.
# Will print a short list of active / inactive Runtimes
lazycluster list-runtimes
# will print a list of active / inactive Runtimes incl. additional host information
# Note: This is slower as compared to omittin the -l option
lazycluster list-runtimes -l
Add host to ssh config
The host is named localhost
for user root
accessible on localhost
port 22
using the private key file found under ~/.ssh/id_rsa.
Note: Add command will only add the ssh configuration on the manager. For a complete guide on how to setup passwordless ssh check the prerequisites section.
lazycluster add-runtime localhost root@localhost:22 --id_file ~/.ssh/id_rsa
Runtime
Delete the ssh config of Note: Corresponding remote ikernel will be deleted too if present.
lazycluster delete-runtime host-1
Runtimes
& RuntimeGroups
Create Details (click to expand...)
A Runtime
has a working directory. Usually, the execution of a RuntimeTask is conducted relatively to this directory if no other path is explicitly given. The working directory can be manually set during the initialization. Otherwise, a temporary directory gets created that might eventually be removed.
from lazycluster import Runtime, RuntimeGroup
rt_1 = Runtime('host-1')
rt_2 = Runtime('host-2', working_dir='/workspace')
# In this case you get a group where both Runtimes have different working directories.
# The working directory on host-1 will be a temp one and gets removed eventually.
runtime_group = RuntimeGroup([rt_1, rt_2])
# Here, the group internally creates Runtimes for both hosts and sets its working directory.
runtime_group = RuntimeGroup(hosts=['host-1', 'host-2'], working_dir='/workspace')
Moreover, you can set environment variables for the Runtimes. These variables can then be accessed when executing a Python function on the Runtime or executing a shell command. Per default the working directory is set as an env variable and the class constant Runtime.WORKING_DIR_ENV_VAR_NAME
will give you the name of the variable. The working directory is always accessible also if manually update the env_variables.
# Directly set the env vars per Runtimes
rt = Runtime('host-1')
rt.env_variables = {'foo': 'bar'}
# Or use the convenient method to the the env vars
# for all Runtimes in a RuntimeGroup
runtime_group = RuntimeGroup(hosts=['host-1', 'host-2'])
group.set_env_variables({'foo': 'bar'})
RuntimeManager
to create a RuntimeGroup
based on the manager's ssh config
Use the Details (click to expand...)
The RuntimeManager can automatically detect all available Runtimes based on the manager's local ssh config and eventually create a necessary RuntimeGroup for you.
from lazycluster import RuntimeManager, RuntimeGroup
runtime_group = RuntimeManager().create_group()
Dask cluster for scalable analytics
Start aDetails (click to expand...)
Most simple way to use Dask in a cluster based on a RuntimeGroup created by the RuntimeManager. The RuntimeManager
can automatically detect all available Runtimes based on the manager's ssh config and eventually create a necessary RuntimeGroup
for you. This RuntimeGroup
is then handed over to DaskCluster during initialization.
The DASK scheduler
instance gets started on the manager. Additionally, multiple DASK worker
processes get started in the RuntimeGroup
, i.e. in the Runtimes
. The default number of workers is equal to the number of Runtimes
in the RuntimeGroup
.
Prerequisite: Please make sure that you have Dask installed on the manager. This can be done using pip install -q "dask[complete]"
.
Details (click to expand...)
from lazycluster import RuntimeManager
from lazycluster.cluster.dask_cluster import DaskCluster
# 1st: Create a RuntimeGroup, e.g. by letting the RuntimeManager detect
# available hosts (i.e. Runtimes) and create the group for you.
runtime_group = RuntimeManager().create_group()
# 2nd: Create the DaskCluster instance with the RuntimeGroup.
cluster = DaskCluster(runtime_group)
# 3rd: Let the DaskCluster instantiate all entities on Runtimes
# of the RuntimeGroup using default values. For custom
# configuration check the DaskCluster API documentation.
cluster.start()
# => Now, all cluster entities should be started and you can simply use
# it as documented in the hyperopt documentation.
Test the cluster setup
# Define test functions to be executed in parallel via DASK
def square(x):
return x ** 2
def neg(x):
return -x
# Get a DASK client instance
client = cluster.get_client()
# Execute the computation
A = client.map(square, range(10))
B = client.map(neg, A)
total = client.submit(sum, B, )
res = total.result()
print('Result: ' + str(res))
Use different strategies for launching the master and the worker instances.
Details (click to expand...)
Use different strategies for launching the master and the worker instance by providing custom implementation of lazycluster.cluster.MasterLauncher
and lazycluster.cluster.WorkerLauncher
. The default implementations are lazycluster.cluster.dask_cluster.LocalMasterLauncher
and lazycluster.cluster.dask_cluster.RoundRobinLauncher
.
cluster = DaskCluster(RuntimeManager().create_group(),
MyMasterLauncherImpl(),
MyWorkerLauncherImpl())
cluster.start()
Hyperopt
Distributed hyperparameter tuning withDetails (click to expand...)
Most simple way to use Hyperopt in a cluster based on a RuntimeGroup created by the RuntimeManager. The RuntimeManager
can automatically detect all available Runtimes based on the manager's ssh config and eventually create a necessary RuntimeGroup
for you. This RuntimeGroup
is then handed over to HyperoptCluster during initialization.
A MongoDB instance gets started on the manager. Additionally, multiple hyperopt worker
processes get started in the RuntimeGroup
, i.e. on the contained Runtimes
. The default number of workers is equal to the number of Runtimes
in the RuntimeGroup
.
Prerequisites:
- MongoDB server must be installed on the manager.
- Note: When using the ml-workspace as the
master
then you can use the provided install script for MongoDB which can be found under/resources/tools
.
- Note: When using the ml-workspace as the
- Hyperopt must be installed on all
Runtimes
where hyperopt workers will be started- Note: When using the ml-workspace as hosts for the
Runtimes
then hyperopt is already pre-installed.
- Note: When using the ml-workspace as hosts for the
Launch a cluster (click to expand...)
For a detailed documentation of customizing options and default values check out the API docs
from lazycluster import RuntimeManager
from lazycluster.cluster.hyperopt_cluster import HyperoptCluster
# 1st: Create a RuntimeGroup, e.g. by letting the RuntimeManager detect
# available hosts (i.e. Runtimes) and create the group for you.
runtime_group = RuntimeManager().create_group()
# 2nd: Create the HyperoptCluster instance with the RuntimeGroup.
cluster = HyperoptCluster(runtime_group)
# 3rd: Let the HyperoptCluster instantiate all entities on Runtimes of the RuntimeGroup using default values. For custom
# configuration check the HyperoptCluster API documentation.
cluster.start()
# => Now, all cluster entities should be started and you can simply use
# it as documented in the hyperopt documentation. We recommend to call
# cluster.cleanup() once you are done.
Test the cluster setup using the simple example to minimize the sin function.
Note: The call to fmin
is also done on the manager. The objective_function
gets sent to the hyperopt workers by fmin via MongoDB. So there is no need to trigger the execution of fmin
or the objective_function
on the individual Runtimes
. See hyperopt docs for detailed explanation.
import math
from hyperopt import fmin, tpe, hp
from hyperopt.mongoexp import MongoTrials
# You can retrieve the the actual url required by MongoTrials form the cluster instance
trials = MongoTrials(cluster.mongo_trial_url, exp_key='exp1')
objective_function = math.sin
best = fmin(objective_function, hp.uniform('x', -2, 2), trials=trials, algo=tpe.suggest, max_evals=10)
# Ensures that MongoDB gets stopped and other resources
cluster.cleanup()
Now, we will cenceptually demonstrate how to use lazycluster
w/ hyperopt to optimize hyperparameters of a fasttext model. Note, this should not be a fasttext demo and thus the actual usage of fasttext is not optimized. Thus, you should read the related docs for this purpose. The example should just highlight how to get fasttext up and running in a distributed setting using lazycluster.
from lazycluster import RuntimeManager
from lazycluster.cluster.hyperopt_cluster import HyperoptCluster
import os
# 1st: Create a RuntimeGroup, e.g. by letting the RuntimeManager detect
# available hosts (i.e. Runtimes) and create the group with a persistent
# working directory for you.
runtime_group = RuntimeManager().create_group(working_dir='~/hyperopt')
# 2nd: Send the training - and test dataset to all Runtimes
path_to_datasets = '/path_on_manager'
train_file_name = 'train.csv'
train_path = os.path.join(path_to_datasets, train_file_name)
test_file_name = 'train.csv'
test_path = os.path.join(path_to_datasets, test_file_name)
# Per default the file will be send asynchronously to Runtime's working directory
runtime_group.send_file(train_file_name)
runtime_group.send_file(test_file_name)
# 3rd: Create the HyperoptCluster instance with the RuntimeGroup.
cluster = HyperoptCluster(runtime_group)
# 4th: Let the HyperoptCluster instantiate all entities on
# Runtimes of the RuntimeGroup using default values.
# For custom configuration check the HyperoptCluster API documentation.
cluster.start()
# 5th: Ensure that the processes for sending the files terminated already,
# since we sent the files async in 2nd step.
runtime_group.join()
# => Now, all cluster entities are started, datasets transferred, and you
# can simply use the lcuster as documented in the hyperopt documentation.
# 6th: Define the objective function to be minimized by Hyperopt in order to find the
# best hyperparameter combination.
def train(params):
import fasttext
import os
train_path = os.path.join(os.environ['WORKING_DIR'], params['train_set_file_name'])
test_path = os.path.join(os.environ['WORKING_DIR'], params['test_set_file_name'])
model = fasttext.train_supervised(
input = train_path,
lr = float(params['learning_rate']),
dim = int(params['vector_dim']),
ws = int(params['window_size']),
epoch = int(params['epochs']),
minCount = int(params['min_count']),
neg = int(params['negativ_sampling']),
t = float(params['sampling']),
wordNgrams = 1, # word ngrams other than 1 crash
bucket = int(params['bucket']),
pretrainedVectors = str(params['pretrained_vectors']),
lrUpdateRate = int(params['lr_update_rate']),
thread = int(params['threads']),
verbose = 2
)
number_of_classes, precision, recall = model.test(test_path)
f1 = 2 * ((precision * recall) / (precision + recall))
# Return value must be negative because hyperopt's fmin tries to minimize the objective
# function. You can think of it as minimizing an artificial loss function.
return -1 * f1
from hyperopt import fmin, tpe, hp
from hyperopt.mongoexp import MongoTrials
# 7th: Define the searh space for the paramters to be optimized. Check further functions
# of Hyperopt's hp module that might suit your specific requirement. This should just
# give you an idea and not show how to best use fasttext.
search_space = {
'min_count': hp.quniform('min_count', 2, 20, 1),
'window_size': hp.quniform('window_size', 4, 15, 1),
'vector_dim': hp.quniform('vector_dim', 100, 300, 1),
'learning_rate': 0.4,
'lr_update_rate': 100,
'negativ_sampling': hp.quniform('negativ_sampling', 5, 20, 1),
'sampling': hp.uniform('sampling', 0, 10**-3),
'bucket': 2000000,
'epochs': hp.quniform('epochs', 3, 30, 1),
'pretrained_vectors': '',
'threads': 8,
'train_set_file_name': train_file_name,
'test_set_file_name': test_file_name
}
# 8th: Actually, execute the hyperparameter optimization. Use the mongo_trial_url
# property of your HyperoptCluster instance to get the url in the format
# required by MongoTrials.
trials = MongoTrials(cluster.mongo_trial_url, exp_key='exp1')
best = fmin(train, search_space, tpe.suggest, 500, trials)
print(best)
Debugging (click to expand...)
In general you should read the Logging, exception handling and debugging section first so that you are aware of the general options lazycluster offers for debugging.
So the first step is to successfully launch a Hyperopt cluster by using the corresponding lazycluster class. If you experience problems until this point you should analyze the exceptions which should guide you forward to a solution. If this given error is not self explaining then please consider to provide meaningful feedback here so that it will be soon. Common problems until the cluster is started are:
- MongoDB or hyperopt are not installed, i.e. the prerequisites are not yet fulfilled. => Ensure that the prerequisites are fulfilled. Consider using ml-workspace to get rid of dependency problems.
- MongoDB is already running (under the same dbpath). This might especially happen if you started a cluster before and the cleanup did not happen correctly. Usually, the cleanup should happen atexit but sometimes it simply does not work depending on your execution environment. => to prevent this problem you can and should explicitly call the
cleanup()
method of theHyperoptCluster
instance => to solve the problem if MongoDB is still running just typelsof -i | grep mongod
into a terminal. Finally, use thekill pid
command with the process ID you got from issuing the previous command.
Once the Hyperopt cluster is running, you can start using it. It should be noted, that the following is mainly about finding Hyperopt related issues since lazycluster basically did its job already. Typically, this means you have a bug in your objective function that you try to minimize with Hyperopt.
First, you could use the print_log()
method of your hyperopt to check the execution log. If you can't find any error here, then check the execution log files or redirect the execution log from files to stdout of the manager by setting debug=True
in the start methods of the HyperoptCluster class.
Alternatively, you can ssh into one of your Runtimes
and manually start a hyperopt-worker process. You can find the respective shell command in the hyperopt docs. Moreover, you can get the necessary url for the --mongo
argument by accessing the python property mongo_url
from your HyperoptCluster
instance once its running. Consequently, the newly started worker will poll a job from the master (i.e. MongoDB) and start its execution. Now you should see the error in the terminal once it occurs.
We found two common bug types related to the objective function. First, make sure that the hyper-/parameters you are passing to your model have the correct datatypes. Sounds trivial, right? :)
Next, you typically use some training - and test dataset on your Runtimes inside your objective function. So the correct file paths may be a bit tricky at first. You should understand that the objective function gets communicated to the hyperopt worker processes by fmin()
via MongoDB. Consequently, the objective function gets executed as it is on the Runtimes and the paths must exist on the Runtimes
. The Runtime's
working directory as documented in the API docs is of interest here. It should be noted, that the path of this directory is available on the Runtimes. Consequently, we recommend that you manually set a working directory on your Runtimes
and move the training - and test dataset files relative to the working directory. This can also be done on RuntimeGroup
level. Now, you can create a relative path to the files inside your objective_function with os.path.join(os.environ['WORKING_DIR'], 'relative_file_path')
. Note: The advantage of manually setting a working directory in this case is that a manually set working directory does not get removed at the end. Consequently, you do not need to move the files each time you start the execution. This hint can safe you quite a lot of time especially when you need to restart the exectuion mutliple times while debugging.
Use different strategies for launching the master and the worker instances.
Details (click to expand...)
Use different strategies for launching the master and the worker instances by providing custom implementation of lazycluster.cluster.MasterLauncher
and lazycluster.cluster.WorkerLauncher
. The default implementations are lazycluster.cluster.hyperopt_cluster.LocalMongoLauncher
and lazycluster.cluster.hyperopt_cluster.RoundRobinLauncher
.
cluster = HyperoptCluster(RuntimeManager().create_group(),
MyMasterLauncherImpl(),
MyWorkerLauncherImpl())
cluster.start()
Expose services
Details (click to expand...)
Runtime
Expose a service from a A DB is running on a remote host on port runtime_port
and the DB is only accessible from the remote host. But you also want to access the service from the manager on port local_port
. Then you can use this method to expose the service which is running on the remote host to the manager.
Details (click to expand...)
from lazycluster import Runtime
# Create a Runtime
runtime = Runtime('host-1')
# Make the port 50000 from the Runtime accessible on localhost
runtime.expose_port_from_runtime(50000)
# Make the local port 40000 accessible on the Runtime
runtime.expose_port_to_runtime(40000)
Runtime
Expose a service to a A DB is running on the manager on port local_port
and the DB is only accessible from the manager. But you also want to access the service on the remote Runtime
on port runtime_port
. Then you can use this method to expose the service which is running on the manager to the remote host.
Details (click to expand...)
from lazycluster import Runtime
# Create a Runtime
runtime = Runtime('host-1')
# Make the port 50000 from the Runtime accessible on localhost
runtime.expose_port_from_runtime(50000)
# Make the local port 40000 accessible on the Runtime
runtime.expose_port_to_runtime(40000)
Service exposure
Now, we extend the previous example by using a RuntimeGroup
instead of just a single Runtime
. This means we want to expose a service which is running on the manager to a group of Runtimes
.
Details (click to expand...)
from lazycluster import RuntimeGroup
# Create a RuntimeGroup
runtime_group = RuntimeGroup('host1', 'host-2', 'host-3')
# Make the local port 50000 accessible on all Runtimes in the RuntimeGroup.
runtime_group.expose_port_to_runtimes(50000)
# Note: The port can also be exposed to a subset of the Runtimes by using the
# method parameter exclude_hosts.
runtime_group.expose_port_to_runtimes(50000, exclude_hosts='host-3')
Runtime
to the other Runtimes
in the RuntimeGroup
Expose a service from a Assume you have service which is running on Runtime host-1
. Now, you can expose the service to the remaining Runtimes
in the RuntimeGroup.
Details (click to expand...)
from lazycluster import RuntimeGroup
# Create a RuntimeGroup
runtime_group = RuntimeGroup('host1', 'host-2', 'host-3')
# Make the port 40000 which is running on host-1 accessible on all other Runtimes in the RuntimeGroup
runtime_group.expose_port_from_runtime_to_group('host-1', 40000)
File Transfer
Details (click to expand...)
A RuntimeTask
is capable of sending a file from the manager to a Runtime
or vice versa. Moreover, the Runtime
class as well as the RuntimeGroup
provide convenient methods for this purpose that internally creates the RuntimeTasks
for you.
In the following example, the file.csv
will be transferred to the Runtime's
working directory. Another path on the Runtime can be specified by supplying a remote_path
as argument. See Runtime docs for further details on the working directory.
from lazycluster import RuntimeTask, Runtime
task = RuntimeTask('file-transfer')
task.send_file('local_path/file.csv')
runtime = Runtime('host-1')
runtime.execute_task(task, exec_async=False)
The explicit creation of a RuntimeTask
is only necessary if you intend to add further steps to the RuntimeTask
instead of just transferring a file. For example, you want to send a file, execute a Python function, and transfer the file back. If not, you can use the file transfer methods of the Runtime
or RuntimeGroup
. In the case of sending a file to a RuntimeGroup
you should send the files asynchronously. Otherwise, each file will be transferred sequentially. Do not forget to call join()
, if you need the files to be transferred before proceeding.
from lazycluster import RuntimeTask, Runtime, RuntimeGroup, RuntimeManager
# Send a file to a single Runtime
runtime = Runtime('host-1')
send_file('local_path/file.csv', execute_async=False)
# Send a file to a whole RuntimeGroup
group = RuntimeManager().create_group()
group.send_file('local_path/file.csv', execute_async=True)
group.join()
The usage of get_file is similar and documented here.
Simple preprocessing example
Details (click to expand...)
Read a local CSV file (on the manager) and upper case chunks in parallel using RuntimeTasks and a RuntimeGroup.
from typing import List
import pandas as pd
from lazycluster import RuntimeTask, RuntimeManager
# Define the function to be executed remotely
def preprocess(docs: List[str]):
return [str(doc).lower() for doc in docs]
file_path = '/path/to/file.csv'
runtime_group = RuntimeManager().create_group()
tasks = []
# Distribute chunks of the csv and start the preprocessing in parallel in the RuntimeGroup
for df_chunk in pd.read_csv(file_path, sep=';', chunksize=500):
task = RuntimeTask().run_function(preprocess, docs=df_chunk['text'].tolist())
tasks.append(runtime_group.execute_task(task))
# Wait until all executions are done
runtime_group.join()
# Get the return data and print it
index = 0
for chunk in runtime_group.function_returns:
print('Chunk: ' + str(index))
index += 1
print(chunk)
Logging, exception handling and debugging
Details (click to expand...)
lazycluster
aims to abstract away the complexity implied by using multiple distributed Runtimes and provides an intuitive high level API fur this purpose. The lazycluster manager orchestrates the individual components of the distributed setup. A common use case could be to use lazycluster in order to launch a distributed hyperopt cluster. In this case, we have the lazycluster manager, that starts a MongoDB instance, starts the hyperopt worker processes on multiple Runtimes
and ensures the required communication via ssh between these instances. Each individual component could potentially fail including the 3rd party ones such as hyperopt workers. Since lazycluster
is a generic library and debugging a distributed system is an instrinsically non-trivial task, we tried to emphasize logging and good exception handling practices so that you can stay lazy.
Standard Python log
We use the standard Python logging module in order to log everything of interest that happens on the manager.
Details (click to expand...)
Per default we recommend to set the basicConfig log level to logging.INFO
. Consequently, you will get relevant status updates about the progress of launching a cluster for example. Of course, you can adjust the log level to logging.DEBUG
or anything you like.
We like to use the following basic configuration when using lazycluster in a Jupyter notebook:
import logging
logging.basicConfig(format='[%(levelname)s] %(message)s', level=logging.INFO)
Note: Some 3rd party libraries produce a lot of INFO messages, which are usually not of interest for the user. This is particular true for Paramiko. We base most ssh handling on Fabric which is based on Paramiko. We decided to set the log level for these libraries to logging.Error
per default. This happens in the __init__.py
module of the lazycluster package. And will be set once when importing the first module or class from lazycluster
. If you want to change the log level of 3rd party libs you can set it the following way:
import logging
from lazycluster import Environment
# Effects logs of all libraries that were initially set to logging.ERROR
lazycluster.Environment.set_third_party_log_level(logging.INFO)
# Of course, you can set the log level manually for each library / module
logging.getLogger('paramiko').setLevel(logging.DEBUG)
logging.getLogger('lazycluster').setLevel(logging.INFO)
See set_third_party_log_level()
of the Environment class for a full list of affected libraries.
Execution log
The execution log aims to provide a central access point to output produced on the Runtimes.
Details (click to expand...)
This type of log contains mainly the stdout/stderr produced when executing a RuntimeTask on a Runtime. If you are new to lazycluster or you never used the lower level API directly, then you might think the execution log is not relevant for you. But it is :) Also the concrete cluster implementations (e.g. DaskCluster or HyperoptCluster) are built on top of the lower-level API. You can think of it as the kind of log which you can use to understand what actually happened on your Runtimes
. You can access the execution log in 3 different ways.
The 1st option is by accessing the excution log files. The stdout/stderr generated on the Runtimes
is streamed to log files. The respective directory is per default ./lazycluster/execution_log
on the manager. The log directory contains a subfolder for each Runtime (i.e. host) that executed at least one RuntimeTask
. Inside a Runtime folder you will find one log file per executed RuntimeTask. Each logfile name is generated by concatenating the name of the RuntimeTask
and a current timestamp. You can configure the path were the log directory gets created by adjusting the lazycluster main directory. See Environment for this purpose. Moreover, the respective file path can be programmatically accessed via RuntimeTask.execution_log_file_path
. This property gets updated each time the RuntimeTask
gets executed.
The 2nd option is to redirect the execution log (i.e. stdout/stderr from the Runtimes) to the stdout of the manager. Hereby, you can quickly spot errors. The drawback here is that you can not directly distinguish which Runtime generated which output, since the output of potentially multiple Runtimes is directly streamed to the manager's stdout as it occurs. To enable this feature you need to pass on the debug
flag to the respective methods (i.e. RuntimeTask.execute(), Runtime.execute_task(), RuntimeGroup.execute_task()). All cluster related start()
methods (e.g. HyperoptCluster.start()
, DaskCluster.start()
etc.) provide the debug option too. Example:
from lazycluster import RuntimeGroup, RuntimeTask
task = RuntimeTask('debug-test').run_command('python --version')
group = RuntimeGroup(hosts=['gaia-1', 'gaia-2'])
tasks = group.execute_task(task, debug=True)
The 3rd option is to access the execution_log
property of a RuntimeTask
. Additionally, the Runtime
as well as the RuntimeGroup
provide a print_log()
function which prints the execution_log
of the RuntimeTasks
that were executed on the Runtimes
. The execution_log
property is a list and can be accessed via index. Each log entry corresponds to the output of a single (fully executed) step of a RuntimeTask
. This means the stdout/stderr is not streamed to the manager can only be accessed after its execution. This kind of log might be useful if you need to access the ouput of a concrete RuntimeTask
step programmatically. See the concept definition and the class documentation of the RuntimeTask
for further details.
Note: It should be noted that RuntimeTask.run_function()
is actually not a single task step. A call to this method will produce multiple steps, since the Python function that needs to be executed will be send as a pickle file to the remote host. There it gets unpickled, executed and the return data is sent back as a pickle file. This means if you intend to access the exectution log you should be aware that the log contains multiple log entries for the run_function()
call. But the number of steps per call is fixed. Moreover, you should think about using the return value of a a remotely executed Python function instead of using the execution log for this purpose.
from lazycluster import Runtime, RuntimeTask
# Create the task
task = RuntimeTask('exec-log-demo')
# Add 2 individual task steps
task.run_command('echo Hello')
task.run_command('echo lazycluster!')
# Create a Runtime
runtime = Runtime('host-1')
# Execute the task remotely on the Runtime
runtime.execute_task(task)
# Access th elog per index
print(task.execution_log[0]) # => 'Hello'
print(task.execution_log[1]) # => 'lazycluster!'
# Let the Runtime print the log
# an equivalent method exists for RuntimeGroup
runtime.print_log()
Exception handling
Details (click to expand...)
Our exception handling concept follows the idea to use standard python classes whenever appropriate. Otherwise, we create a library specific error (i.e. exception) class.
Each created error class inherits from our base class LazyclusterError which in turn inherits from Pythons's Exception class. We aim to be informative as possible with our used exceptions to guide you to a solution to your problem. So feel encouraged to provide feedback on misleading or unclear error messages, since we strongly believe that guided errors are essential so that you can stay as lazy as possible.
Contribution
- Pull requests are encouraged and always welcome. Read
CONTRIBUTING.md
and check out help-wanted issues. - Submit github issues for any feature enhancements, bugs, or documentation problems.
- By participating in this project you agree to abide by its Code of Conduct.
Licensed Apache 2.0. Created and maintained with