Worker API
Base API and schedulers
The main worker functions are wrapped in an R6 class with the name of QSys. This provides a standardized API to the lower-level messages that are sent via ZeroMQ.
The base class itself is extended by scheduler classes that add the required functions for submitting and cleaning up jobs:
+ QSys
  |- Multicore
  |- LSF
  + SGE
    |- PBS
    |- Torque
  |- etc.
The user-visible object is a worker Pool that wraps this, and will eventually allow managing different kinds of workers.
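As a rough sketch of this inheritance pattern (with hypothetical class and method names, not clustermq's actual definitions), a scheduler class overrides the submission logic of a common base class:

library(R6)

# Hypothetical base class: defines the API, leaves job submission abstract
QSysBase = R6Class("QSysBase",
    public = list(
        submit_jobs = function(n) stop("submit_jobs() must be provided by a scheduler class"),
        cleanup = function() invisible(TRUE)
    )
)

# Hypothetical scheduler class: fills in how jobs are actually started
MulticoreDemo = R6Class("MulticoreDemo",
    inherit = QSysBase,
    public = list(
        submit_jobs = function(n) message("starting ", n, " local worker processes")
    )
)

MulticoreDemo$new()$submit_jobs(3)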
Workers
Creating a worker pool
A pool of workers can be created using the workers() function, which instantiates a Pool object of the corresponding QSys-derived scheduler class. See ?workers for details.
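A minimal usage sketch, assuming the scheduler has been configured (here via the clustermq.scheduler option):

options(clustermq.scheduler = "multicore") # for this sketch, use local processes
w = workers(3)                             # Pool of three workers
on.exit(w$cleanup())                       # shut the pool down when done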
Worker startup
For workers that are started up via a scheduler, we do not know in advance which machine they will run on. This is why every worker is started with the TCP/IP address of the master socket that will distribute the work.
This is achieved by a call to R that is common to all schedulers:
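In the scheduler templates this call looks roughly like the following, where the {{ master }} placeholder is filled in with the address of the main process (exact flags and template syntax may differ between versions):

R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'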
Worker communication
On the master’s side, we wait until a worker connects:
msg = w$recv() # this will block until a worker is ready
We can then send any expression to be evaluated on the worker using the send method:
w$send(expression, ...)
After the expression (in ...), any variables that should be passed along with the call can be added. For the batch processing that clustermq usually performs, this expression is a call to work_chunk, where the chunk data is added:
w$send(clustermq:::work_chunk(chunk, fun, const, rettype, common_seed),
chunk = chunk(iter, submit_index))
Worker environment
We can add any number of objects to a worker environment using the env method:
w$env(object=value, ...)
This will also invisibly return a data.frame with all objects currently in the environment. If a user wants to inspect the environment without changing it, they can call w$env() without arguments. The environment will be propagated to all workers automatically in a greedy fashion.
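A short usage sketch (the object names are only illustrative):

w$env(fun = function(x) x * 2, n = 100) # add objects to the worker environment
w$env()                                 # inspect the environment without changing it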
Main event loop
Putting the above together in an event loop, we get what is essentially implemented in master. w$send invisibly returns an identifier to track which call was submitted, and w$current() matches that identifier to the result received via w$recv().
w = workers(3)
on.exit(w$cleanup())
w$env(...)
while (we have new work to send || jobs pending) {
    res = w$recv() # the result of the call, or NULL for a new worker
    w$current()$call_ref # matches answer to request, -1 otherwise
    # handle result

    if (more work)
        call_ref = w$send(expression, ...) # call_ref tracks request identity
    else
        w$send_shutdown()
}
A loop of a similar structure can be used to extend clustermq. As an example, this was done by the targets package.
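To make the schematic loop above concrete, here is a minimal sketch that uses only the methods described in this document; the work items, the expression being sent, and the pending bookkeeping are illustrative and not clustermq's actual implementation:

w = workers(3)
on.exit(w$cleanup())
w$env(fun = function(x) x * 2)

todo = as.list(1:10) # illustrative work items
pending = 0          # calls sent but not yet answered

while (length(todo) > 0 || pending > 0) {
    res = w$recv()                    # blocks until a worker is ready
    if (w$current()$call_ref != -1) { # -1 means a new worker, not a result
        pending = pending - 1
        print(res)                    # handle the result here
    }

    if (length(todo) > 0) {
        x = todo[[1]]
        todo = todo[-1]
        w$send(fun(x), x = x)         # expression plus the variables it needs
        pending = pending + 1
    } else {
        w$send_shutdown()
    }
}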
ZeroMQ message specification
Communication between the master (main event loop) and workers (QSys base class) is organised in messages. These are chunks of serialized data sent via ZeroMQ’s protocol (ZMTP). The parts of each message are called frames.
Master - Worker communication
The master requests an evaluation in a message consisting of the frames listed below; if a proxy is used, one additional frame follows (see below). This is all handled by clustermq internally.
- The worker identity frame or routing identifier
- A delimiter frame
- Worker status (wlife_t)
- The call to be evaluated
- N repetitions of:
  - The variable name of an environment object that is not yet present on the worker
  - The variable value
If using a proxy, this will be followed by a SEXP that contains the variable names the proxy should add before forwarding to the worker.
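Purely as an illustration of this layout (the names below are descriptive only, not clustermq's internal representation), a direct request message can be pictured as:

request_frames = list(
    identity  = "<worker routing id>",         # added by the ROUTER socket
    delimiter = raw(0),                        # empty delimiter frame
    status    = "<wlife_t>",                   # worker status flag
    call      = quote(work_chunk(chunk, fun)), # the call to be evaluated
    # ... followed by one name/value pair per missing environment object:
    var_name  = "fun",
    var_value = function(x) x * 2
)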
Worker evaluation
A worker evaluates the call using the R C API, essentially via a call of the following form:
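R_tryEvalSilent(cmd, env, &err); /* evaluate `cmd` in `env`; `err` is set if an error occurred */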
If an error occurs in this evaluation, it will be returned as a structure with class worker_error. If a developer wants to catch errors and warnings in a more fine-grained manner, it is recommended to add their own callingHandlers to cmd (as clustermq does in its work_chunk).
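As an illustration of that approach (a sketch only; the handler and its muffling behaviour are not part of clustermq's API), the sent expression can be wrapped so that warnings are reported without aborting the chunk:

w$send(
    withCallingHandlers(
        clustermq:::work_chunk(chunk, fun, const, rettype, common_seed),
        warning = function(w) {
            message("worker warning: ", conditionMessage(w))
            invokeRestart("muffleWarning")
        }
    ),
    chunk = chunk(iter, submit_index)
)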
Worker - Master communication
The result of this evaluation is then returned in a message with four (direct) or five (proxied) frames:
- Worker identity frame (handled internally by ZeroMQ’s ZMQ_REQ socket)
- Empty frame (handled internally by ZeroMQ’s ZMQ_REQ socket)
- Worker status (wlife_t) that is handled internally by clustermq
- The result of the call (SEXP), visible to the user
If using a worker via SSH, these frames will be preceded by a routing identity frame that is handled internally by ZeroMQ and added or peeled off by the proxy.
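Analogous to the request above, and again purely illustrative, a direct reply message can be pictured as:

reply_frames = list(
    identity  = "<worker routing id>", # handled by the ZMQ_REQ envelope
    delimiter = raw(0),                # empty frame from the REQ envelope
    status    = "<wlife_t>",           # handled internally by clustermq
    result    = "<SEXP>"               # the evaluated result, visible to the user
)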