Danger
Work in progress! It contains a lot of typos; please let us know. There are comments at the bottom, or you can submit a PR against the master branch.
Please help with the documentation if you know Dynflow.
Thanks!
TODO to be refined
Dynflow (DYNamic workFLOW) is a workflow engine written in Ruby. It has been developed to support the orchestration of services in the Katello and Foreman projects.
TODO
Action: a subclass of Dynflow::Action, defines the code to be run in each phase.
Phases: every action has three phases: plan, run, finalize.
Input: Hash of data coming to the action.
Output: Hash of data that the action produces. It's persisted and can be used as input of other actions.
Flow: definition of the run/finalize phase, holding the information about steps that can run concurrently/in sequence. Part of the execution plan.
World: holds the Persistence, Logger, Executor and all the other objects necessary for executing actions. This concept allows us to avoid globally shared state. Also, the worlds can talk to each other, which is helpful for production and high-availability setups, having multiple worlds on different hosts handle the execution of the execution plans. If you're still confused and come from the RoR world, think about it as playing a similar role to the Rails object in the Ruby on Rails framework.
See the examples directory for the code in action. Running those files (except the example_helper.rb file) leads to the Dynflow runtime being initialized (including the web console, where one can explore the features and experiment).
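For a quick impression, a minimal bootstrap might look like the following sketch (it assumes the default configuration is sufficient for your Dynflow version and that AnAction is already defined):

require 'dynflow'

# build a world with the default configuration
config = Dynflow::Config.new
world  = Dynflow::World.new(config)

# trigger an action and wait for the execution plan to finish
triggered = world.trigger(AnAction, 'some', 'arguments')
triggered.future.wait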
TODO
TODO
When an action is triggered, Dynflow executes the plan method on this action, which is responsible for building the execution plan. It builds the execution plan by calling the plan_action and plan_self methods, effectively listing the actions that should be run as part of this execution plan. In other words, it compiles a list of actions on which the run method will be called. It's also responsible for giving these actions an order. A simple example of such a plan method might look like this:
# this would plan deletion of files passed as an input array
def plan(files)
  files.each do |filename|
    plan_action MyActions::File::Destroy, filename
  end
end
Note that it does not have to be only other actions that are planned to run. In fact, it's very common that the action plans itself, which means it will put its own run method call in the execution plan. To do that, you can use plan_self. This could be used in the MyActions::File::Destroy action from the previous example:
class MyActions::File::Destroy < Dynflow::Action
  def plan(filename)
    plan_self path: filename
  end

  def run
    File.delete(input.fetch(:path))
  end
end
In the example above, it seems that plan_self is just a shortcut to plan_action MyActions::File::Destroy, filename, but that's not entirely true. Note that plan_action always triggers the plan method of the given action, while plan_self plans only the run and finalize phases of the action, so by using plan_action here we'd end up in an endless loop. Also note that the run method does not take any input. Instead, it can use the input method, which refers to the arguments that were used in plan_self.
Similar to the input mentioned above, the run phase produces output. After that, some finalizing steps can be taken. Actions can use the outputs of other actions as parts of their inputs, establishing a dependency. The action's state is serialized between each phase and survives machine/executor restarts.
As touched on in the previous paragraphs, there are 3 phases: plan, run and finalize. The plan phase starts by triggering an action. run and finalize are only run if you use plan_self in the plan phase.
Both input and output are Hashes accessible through the Action#input and Action#output methods. They need to be serializable to JSON, so they should contain only combinations of primitive Ruby types like Hash, Array, String, Integer, etc.
Warning
One should avoid directly assigning the output:
self.output = { key: data }
It might delete other data stored in the output (potentially by middleware and other parts of the action). Therefore it's preferred to use:
output.update(key: data, another_key: another_data)
# or for one key
output[:key] = data
Note
You may sometimes find these input/output format definitions:
class AnAction < Dynflow::Action
  input_format do
    param :id, Integer
    param :name, String
  end

  output_format do
    param :uuid, String
  end
end
This might be quite handy, especially in combination with the subscriptions functionality. The format follows apipie-params; see that project for more details. Validation of input/output can be performed against this description, but it's not turned on by default. (It needs to be revisited and updated to be fully functional.)
Triggering an action means starting the plan phase, followed by immediate execution. Any action is triggered by calling:
world_instance.trigger(AnAction, *args)
Note
In Foreman and Katello, actions are usually triggered by ForemanTasks.sync_task and ForemanTasks.async_task, so the following part is not that important if you are using ForemanTasks.
The World#trigger method returns an object of the TriggerResult type, which is an Algebrick variant type with the following definition:
TriggerResult = Algebrick.type do
  # Returned by #trigger when planning fails.
  PlaningFailed   = type { fields! execution_plan_id: String, error: Exception }
  # Returned by #trigger when planning is successful but execution fails to start.
  ExecutionFailed = type { fields! execution_plan_id: String, error: Exception }
  # Returned by #delay when scheduling succeeds.
  Scheduled       = type { fields! execution_plan_id: String }
  # Returned by #trigger when planning is successful; #future will resolve after
  # the ExecutionPlan is executed.
  Triggered       = type { fields! execution_plan_id: String, future: Future }

  variants PlaningFailed, ExecutionFailed, Triggered
end
If you do not know Algebrick, you can think about these as Structs with types. You can see how it's used to distinguish all the possible results in the ForemanTasks module:
def self.trigger_task(async, action, *args, &block)
  Match! async, true, false
  match trigger(action, *args, &block),
        # Raise if there is any error caused either by failed planning or
        # by failed start of execution.
        (on ::Dynflow::World::PlaningFailed.(error: ~any) |
            ::Dynflow::World::ExecutionFailed.(error: ~any) do |error|
           raise error
         end),
        # Successfully triggered.
        (on ::Dynflow::World::Triggered.(
               execution_plan_id: ~any, future: ~any) do |id, finished|
           # block on the finished Future if this is called synchronously
           finished.wait if async == false
           return ForemanTasks::Task::DynflowTask.find_by_external_id!(id)
         end)
end
Scheduling an action means setting it up to be triggered at a set time in the future. Any action can be delayed by calling:
world_instance.delay(AnAction,
                     { start_at: Time.now + 360, start_before: Time.now + 400 },
                     *args)
This snippet of code delays AnAction with arguments args to be executed in the time interval between start_at and start_before. Setting start_before to nil delays execution of this action without the timeout limit.
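For example, delaying an action for an hour without the timeout limit might look like this (same world_instance and AnAction as above):

world_instance.delay(AnAction,
                     { start_at: Time.now + 3600, start_before: nil },
                     *args)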
When an action is delayed, an execution plan object is created with its state set to scheduled, but it doesn't run the plan phase yet; the planning happens when the start_at time comes. If the planning doesn't happen in time (e.g. after start_before), the execution plan is marked as failed (its state is set to stopped and its result to error).
Since the args have to be saved, there must be a mechanism to safely serialize and deserialize them so that they survive being saved in a database. This is handled by a serializer. Different serializers can be set per action by overriding its delay method.
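A minimal sketch of such an override follows; MyArgsSerializer is a made-up class here, and we assume the default implementation is equivalent to returning Dynflow::Serializers::Noop.new(args):

class AnAction < Dynflow::Action
  # use a custom serializer for the delayed arguments
  def delay(delay_options, *args)
    MyArgsSerializer.new(args)
  end
end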
Planning of the delayed plans is handled by a DelayedExecutor, an object which periodically checks for delayed execution plans and plans them. Scheduled execution plans don't do anything by themselves; they just wait to be picked up and planned by a DelayedExecutor. This means that if no DelayedExecutor is present, their planning is postponed until one is spawned.
Planning always uses the thread that triggered the action. The plan phase configures the action's input for the run phase. It starts by executing the plan method of the action instance, passing in the arguments from the World#trigger method:
world_instance.trigger(AnAction, *args)
# executes the following
an_action.plan(*args) # an_action is an instance of AnAction
The plan method is inherited from Dynflow::Action; by default, it plans the action itself (if a run method is present), using the first argument as the input. It also plans the finalize phase if a finalize method is present.
class AnAction < Dynflow::Action
  def run
    output.update self.input
  end
end

world_instance.trigger AnAction, data: 'nothing'
The above will just plan itself, copying input to output in the run phase. In most cases the plan method is overridden to plan self with transformed arguments and/or to plan other actions. In a Rails application, the arguments of the plan phase are often ActiveRecord objects, which are then used to produce the inputs for the actions. Let's look at the argument transformation first:
class AnAction < Dynflow::Action
  def plan(any_array)
    # pick just numbers
    plan_self numbers: any_array.select { |v| v.is_a? Numeric }
  end

  def run
    # compute sum - simulating a time consuming operation
    output.update sum: input[:numbers].reduce(&:+)
  end
end
Note
It's considered good practice to use just enough data in the input for the action to perform its job. That means not too much (such as dumping all of an ActiveRecord object's attributes), as it might have a performance impact as well as cause issues when the attributes change later.
On the other hand, the input should contain enough data to perform the job without reaching out to external sources. Therefore, instead of passing just the ActiveRecord id and loading the whole record again in the run phase just to use some attributes, it's better to use those attributes directly as the input of the action.
Following these rules should lead to the best results, both from the readability and the performance point of view.
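To illustrate these rules, here is a sketch (User and ExternalMailingService are made-up names): the plan phase extracts just the attributes the job needs from the ActiveRecord object, and the run phase works solely with the input:

class UpdateMailSubscription < Dynflow::Action
  def plan(user, subscribe)
    # pass only what the run phase needs, not the whole record
    plan_self user_id: user.id, email: user.email, subscribe: subscribe
  end

  def run
    # no DB access needed here, everything comes from the input
    ExternalMailingService.set_subscription(input[:user_id], input[:email], input[:subscribe])
  end
end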
Now let's see an example with action planning:
class SumNumbers < Dynflow::Action
  def plan(numbers)
    plan_self numbers: numbers
  end

  def run
    output.update sum: input[:numbers].reduce(&:+)
  end
end

class SumManyNumbers < Dynflow::Action
  def plan(numbers)
    # references to planned actions
    planned_sub_sum_actions = numbers.each_slice(10).map do |slice|
      plan_action SumNumbers, slice
    end

    # prepare an array of output references where each points to the sum in the
    # output of a particular action
    sub_sums = planned_sub_sum_actions.map do |action|
      action.output[:sum]
    end

    # plan one last action which will sum the sub_sums;
    # it depends on all planned_sub_sum_actions because it uses their outputs
    plan_action SumNumbers, sub_sums
  end
end
world_instance.trigger SumManyNumbers, (1..100).to_a
The above example will sum numbers in parallel by slices of 10 values: the first action sums 1..10, the second action sums 11..20, ..., the tenth action sums 91..100. After all sub-sums are computed, one final action sums the sub-sums into the final sum.
Warning
This example is here to demonstrate the planning abilities. In reality, this parallelization of compute-intensive tasks does not have a positive effect when Dynflow runs on MRI; the pool of workers may starve. This is not a big issue, since Dynflow is mainly used to orchestrate external services.
TODO add link to detail explanation in How it works when available.
An action may access the local DB in the plan phase, see Database and Transactions.
An action has a run phase if there is a run method implemented. (There may be actions that just plan other actions.)
The run method implements the main piece of work done by this action, converting input into output. Input is immutable in this phase. It's the right place for all the steps that are likely to fail. The run phase is allowed to have side effects: file operations, calls to other systems, etc.
The local DB should not be accessed in this phase, see Database and Transactions.
The main purpose of the finalize phase is to be able to access the local DB after the action finishes successfully, e.g. indexing based on new data, marking records as fully created, etc. The finalize phase does not modify the input or output of the action.
An action may access the local DB in the finalize phase and must be idempotent, see Database and Transactions.
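Putting the three phases together, a sketch of this convention might look as follows (Article and SearchIndex are made-up names, and ActiveRecord is assumed):

class CreateArticle < Dynflow::Action
  def plan(attributes)
    # local DB access is fine in the plan phase
    article = ::Article.create!(attributes.merge(complete: false))
    plan_self article_id: article.id, title: article.title
  end

  def run
    # only external calls here, no local DB access
    output[:index_id] = SearchIndex.add(input[:article_id], input[:title])
  end

  def finalize
    # local DB again; note the update is idempotent
    ::Article.where(id: input[:article_id]).update_all(complete: true)
  end
end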
As already mentioned, actions can use the output of other actions as their input (or parts of it). Doing so creates a dependency between the actions, which is automatically detected by Dynflow, and the execution plan is built accordingly.
def plan
  first_action  = plan_action AnAction
  second_action = plan_action AnAction, first_action.output[:a_key_in_output]
end
second_action uses part of first_action's output, therefore it depends on first_action.
If the actions are planned without this dependency, as follows:
def plan
  first_action  = plan_action AnAction
  second_action = plan_action AnAction
end
then they are independent and they are executed concurrently.
There is also another mechanism for describing dependencies between actions besides output usage. A Dynflow user can specify the order between planned actions with the DSL methods sequence and concurrence. Both methods take blocks, and they specify how the actions planned inside the block (or inner sequence and concurrence blocks) should be executed.
By default, plan considers its space as inside concurrence, which means
def plan
  first_action  = plan_action AnAction
  second_action = plan_action AnAction
end
equals
def plan
  concurrence do
    first_action  = plan_action AnAction
    second_action = plan_action AnAction
  end
end
You can establish the same dependency between first_action and second_action without using output, by using sequence:
def plan
  sequence do
    first_action  = plan_action AnAction
    second_action = plan_action AnAction
  end
end
As mentioned, the sequence and concurrence methods can be nested and mixed with output usage to create more complex dependencies. Let's see a commented example:
def plan
  # Plans 3 actions of type AnAction to be executed in sequence;
  # the argument is the index in the sequence.
  actions_executed_sequentially = sequence do
    3.times.map { |i| plan_action AnAction, i }
  end

  # Depends on the output of the last action in `actions_executed_sequentially`,
  # so it's added to the above sequence to be executed as 4th.
  action1 = plan_action AnAction, actions_executed_sequentially.last.output

  # It's planned in the default plan's concurrency scope, so it's executed
  # concurrently with the other four actions.
  action2 = plan_action AnAction
end
The order then will be: the three actions in actions_executed_sequentially run one after another, then action1 runs as the 4th in that sequence; action2 runs concurrently with all of them.
Let's see one more example:
def plan
  actions = sequence do
    2.times.map do |i|
      concurrence do
        2.times.map { plan_action AnAction, i }
      end
    end
  end
end
This results in the following order of execution: actions[0][0] (argument: 0) and actions[0][1] (argument: 0) run concurrently; after they finish, actions[1][0] (argument: 1) and actions[1][1] (argument: 1) run concurrently.
Note
It's on our todo-list to change this to allow defining an acyclic graph of dependencies between the actions. The sequence and concurrence methods will then be deprecated and kept just for backward compatibility.
Warning
Internally, dependencies are modeled with objects representing Sequences and Concurrences, which is weaker than an acyclic graph, so in some cases the dependency resolution might not lead to the most efficient execution plan: some actions will run in sequence even though they could run concurrently. This limitation is likely to be removed in one of the future releases.
Dynflow was designed to help with the orchestration of other services. The usual execution looks as follows (we use an ActiveRecord User as an example of a resource):

1. An action creating a new User is triggered.
2. The User record is saved to the local database in the plan phase. The record is marked as incomplete.
3. The external services are configured to reflect the new user in the run phase.
4. The data produced in the run phase are saved back to the local database in the finalize phase and the record is marked as complete.

For that reason there are transactions around the whole plan and finalize phases (all the actions' plan methods are in one transaction).
If anything goes wrong in the plan phase, any change made to the local DB during planning is reverted. The same holds for finalization: if anything goes wrong, all changes are reverted. Therefore all finalize methods have to be idempotent.
Internally, Dynflow uses Sequel as its ORM, but users may choose whatever they need to access their data. There is an interface, TransactionAdapters::Abstract, whose implementations may provide transactions using different ORMs; the most common one is probably TransactionAdapters::ActiveRecord. So in the above example, steps 2 and 4 would be wrapped in an ActiveRecord transaction if TransactionAdapters::ActiveRecord is used.
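Wiring an adapter in is a matter of world configuration; a minimal sketch (assuming the ActiveRecord connection is already established by the surrounding application):

config = Dynflow::Config.new
config.transaction_adapter = Dynflow::TransactionAdapters::ActiveRecord.new
world = Dynflow::World.new(config)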
The second outcome of the design is a convention for when actions should access the local database:

DO access the local DB in the plan and finalize phases.
DO NOT access the local DB in the run phase.

Warning
TODO warning about AR pool configuration, needs to have sufficient size
Dynflow is designed to allow easy composition of small building blocks called Actions. Typically, some actions compose smaller pieces together, while other actions do the actual steps of work, as in the following example:
class CreateInfrastructure < Dynflow::Action
  def plan
    sequence do
      concurrence do
        plan_action(CreateMachine, 'host1', 'db')
        plan_action(CreateMachine, 'host2', 'storage')
      end
      plan_action(CreateMachine,
                  'host3',
                  'web_server',
                  :db_machine      => 'host1',
                  :storage_machine => 'host2')
    end
  end
end
The CreateInfrastructure action does not have a run method defined; it only defines the plan phase, where other actions are composed together.
Even though composing actions is quite easy and allows decomposing business logic into small pieces, it does not directly support extension by plugins. For that, there are subscriptions.
Actions from a plugin, gem, or any other library can subscribe to already loaded Actions; by doing so, they extend the planning process with themselves.
Let's look at an example, starting with the definition of a core action:
# This action can be extended without doing any
# other steps to support it.
class ACoreAppAction < Dynflow::Action
  def plan(arguments)
    plan_self(args: arguments)
    plan_action(AnotherCoreAppAction, arguments.first)
  end

  def run
    puts "Running core action: #{input[:args]}"
    self.output.update success: true
  end
end
followed by an action defined in a plugin/gem/etc.:
class APluginAction < Dynflow::Action
  # plan this action whenever the ACoreAppAction action is planned
  def self.subscribe
    ACoreAppAction
  end

  def plan(arguments)
    # arguments are the same as in ACoreAppAction#plan
    plan_self(args: arguments)
  end

  def run
    puts "Running plugin action: #{input[:args]}"
  end
end
Subscribed actions are planned with the same arguments as the action they subscribe to, which is called the trigger. Their plan method is called right after the planning of the triggering action finishes.
It's also possible to access the triggering action and use its output, which makes the subscribed action dependent on the triggering action (running in sequence):
def plan(arguments)
  plan_self trigger_success: trigger.output[:success]
end

def run
  self.output[:message] = 'trigger succeeded' if self.input[:trigger_success]
end
Subscriptions are designed for extension by plugins; they should not be used inside a single library/app-module, as that would make the process definition hard to follow (all the subscribed actions would need to be looked up).
Sometimes an action represents a task running in a different service (e.g. repository synchronization in Pulp). Dynflow tries not to waste computer resources, so it offers tools to free threads to work on other actions while waiting on external tasks or events.
Dynflow allows actions to suspend and be woken up on external events. Let's create a simulation of an external service before showing an example of a suspending action.
class AnExternalService
  def start_synchronization(report_to)
    Thread.new do
      sleep 1
      report_to << :done
    end
  end
end
AnExternalService can be invoked with start_synchronization, and a second later it will report back to the action passed in the report_to argument. It sends the :done event back using the << method.
Let's look at an action example:
class AnAction < Dynflow::Action
  EXTERNAL_SERVICE = AnExternalService.new

  def plan
    plan_self
  end

  def run(event)
    case event
    when nil # first run
      suspend do |suspended_action|
        EXTERNAL_SERVICE.start_synchronization suspended_action
      end
    when :done # external task is done
      output.update success: true
      # let the run phase finish normally
    else
      raise 'unknown event'
    end
  end
end
The action is then executed as follows:

1. AnAction is triggered.
2. The run phase begins.
3. The run method is invoked with no event (nil).
4. suspend is called; its block parameter is evaluated right after suspending.
5. The external service finishes and sends the :done event through the suspended action reference.
6. The run method is executed again, this time with the :done event.
7. The output is updated with success: true and the action finishes the run phase.
8. There is no finalize phase; the action is done.

This event mechanism is quite flexible; it can be used, for example, to build a polling action abstraction, which is the topic of the next chapter.
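Besides the suspended action reference captured in the suspend block, an event can also be sent from the outside through the world; a sketch, assuming a World#event method taking the execution plan id, the step id and the event (both ids can be found in the persisted execution plan or the web console):

# wake up the suspended run step of the given execution plan
world_instance.event(execution_plan_id, run_step_id, :done)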
Not all services support registering callbacks that would allow waking up the suspended action just once, when the external task is finished. In that case, we often need to poll the service to see whether the task is still running or already finished.
For that purpose, there is the Polling module in Dynflow. Any action can be turned into a polling one just by including the module.
There are 3 methods that always need to be implemented:

done? - determines whether the task is complete, based on the external task's data.
invoke_external_task - starts the external task.
poll_external_task - polls the external task's status data and returns a status (JSON-serializable data like Hash, Array, String, etc.) which is stored in the action's output.

class AnAction < Dynflow::Action
  include Dynflow::Action::Polling

  def done?
    external_task[:progress] == 1
  end

  def invoke_external_task
    trigger_the_task_with_rest_call
  end

  def poll_external_task
    data     = poll_data_with_rest_call
    progress = calculate_progress data # => a float in 0..1
    { progress: progress,
      data:     data }
  end
end
This action will do the following in the run phase:

1. It calls invoke_external_task on the first run of the action.
2. It suspends itself and periodically calls poll_external_task.
3. After each poll it checks done?: true -> it concludes the run phase; false -> it schedules the next polling.

There are 2 other methods handling the external task's data which can optionally be overridden:
external_task - reads the external task's stored data; by default it reads self.output[:task]
external_task= - writes the external task's stored data; by default it writes to self.output[:task]
There are also other features implemented; please see the Polling module for more details.
Each action phase can be in one of the following states:

pending - waiting to be executed
running - being executed
success - finished successfully
suspended - only in the run phase, when the action sleeps waiting for events to be woken up
skipping/skipped - marked to be skipped on the next resume / already skipped
error - the execution failed

An execution plan has the following states:

pending - waiting to be planned
scheduled - delayed, waiting for its start_at time
planning - the plan phase is being executed
planned - planned, waiting for an executor
running - the run and finalize phases of the actions are executed
paused - the execution was interrupted and can be resumed
stopped - the execution finished

An execution plan also has one of the following results:

pending - the execution plan hasn't finished yet
success - everything went fine
warning - finished, but some steps were skipped or ended with a warning
error - the execution failed
TODO how do I access such states as a programmer? TODO which action phase states are final and which require user interaction?
If an error is raised in the plan phase, it is persisted in the Action object for later inspection, and it bubbles up through the World#trigger method which was used to trigger the action leading to this error.
Errors raised during the run and finalize phases differ in one major way: those never bubble up in trigger, because they run in the executor, not in the triggering thread; they are persisted only in the Action object.
If there is an error in the run phase, the execution pauses. You can inspect the error in the console. The error may be intermittent, or you may fix the problem manually. After that, the execution plan can be resumed; it will continue by rerunning the failed action and then the rest of the actions. While fixing the problem, you may also perform the action's steps manually; in that case, the failed action can be marked as skipped. After resuming, the skipped action is not executed and the execution plan continues with the rest.
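From a console, resuming and skipping might look like the following sketch (assuming Persistence#load_execution_plan, ExecutionPlan#skip and World#execute; the ids can be taken from the web console):

plan = world_instance.persistence.load_execution_plan(execution_plan_id)
failed_step = plan.steps.values.find { |step| step.state == :error }
plan.skip(failed_step)          # optional: mark the failed step as skipped
world_instance.execute(plan.id) # resume the paused execution plan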
If there is an error in the finalize phase, the whole finalize phase of all the actions is rolled back and can be rerun, by resuming, once the problem is fixed.
To signal an error during the run phase, error! or the usual raise can be used.
Dynflow was designed as an orchestration tool; parallelization of heavy CPU-bound computation was not a direct goal. Even with multiple executors, a single execution plan always runs on one executor, so without JRuby it won't scale well (because of MRI's GIL). However, JRuby support should be added soon (TODO update when merged).
Another problem with long-running actions is blocked workers. The executor has only a limited pool of workers; if too many of them become busy, performance may degrade, and actions that block a worker for a long time are problematic in general. One solution is to suspend the action while it waits on an external task (see the polling support above); another is to route such actions to a dedicated queue, as described below.
By default, a single queue and one pool of workers are used to process all the actions in the system. This can cause some actions to block the execution of higher-priority ones.
To address this, it's possible to define additional queues, each tied to an additional pool of workers dedicated to it. This way, actions can be processed more independently of the default queue.
To use a queue, one needs to register the additional queues when defining the executor world:
config = Dynflow::Config.new
config.queues.add(:slow, :pool_size => 5)
world = Dynflow::World.new(config)
The action that should use the queue just needs to override the queue method, like this:
class MyAction < Dynflow::Action
  def queue
    :slow
  end

  def run
    sleep 60
  end
end
In the current implementation, it's expected that all the executors have the same set of queues defined. In a future implementation, it should be possible to have dedicated executors with just a subset of the queues.
Each action class has a chain of middlewares which wrap the phases of the action execution. It's very similar to Rack middlewares.
To create a new middleware, inherit from the Dynflow::Middleware class. It has 5 methods which can be overridden: plan, run, finalize, plan_phase and finalize_phase. The default implementation of all these methods looks as follows:
def plan(*args)
  pass(*args)
end
When overriding, the user can insert code before and/or after the pass method, which executes the next middleware in the chain (or, at the end of the chain, the action itself). Usually pass is called somewhere in the overridden method. In some cases it can be omitted, which prevents all the following middlewares, and the action itself, from running.
Some implementation examples: KeepCurrentUser, Action::Progress::Calculate.
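As a further illustration, a sketch of a middleware measuring the duration of the run phase might look like this (TimedMiddleware and the duration output key are made up for the example):

class TimedMiddleware < Dynflow::Middleware
  def run(*args)
    started = Time.now
    pass(*args) # invoke the next middleware in the chain, or the action itself
  ensure
    action.output[:duration] = Time.now - started
  end
end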
Each action has its chain of middlewares defined. A middleware can be added by calling use in the action class:
class AnAction < Dynflow::Action
  use AMiddleware, after: AnotherMiddleware
end
The use method understands 3 option keys:

:before - orders this middleware before the given middleware
:after - orders this middleware after the given middleware
:replace - this middleware will replace the given middleware

The :before and :after keys are used to build a graph from the middlewares, which is then flattened with a topological sort into the chain of middleware execution.
To use sub-plans, you must include the Dynflow::Action::WithSubPlans module and override the create_sub_plans method. Inside create_sub_plans, use the trigger method to create sub-tasks that will be executed in no particular order during the run phase. The parent task will wait for the sub-tasks to finish without blocking a thread in the pool while waiting.
class MyAction < Actions::EntryAction
  include Dynflow::Action::WithSubPlans
  ...

  def create_sub_plans
    [
      trigger(Actions::OtherAction, action_param1, action_opts),
      trigger(Actions::AnotherAction)
    ]
  end
end
Dynflow allows actions to hook into the lifecycle of their execution plans. To use the hooks, the user has to define a method on the action and register it as a hook. Currently, there is a hook event for each of the execution plan's states, executed when the execution plan transitions into that state. Additionally, there are two more hook events, failure and success, which are run when the execution plan transitions into the stopped state with the error or success result, respectively.
Methods can be registered using the execution_plan_hooks.use call, providing the method name as a Symbol and optionally an :on => HOOK_EVENT parameter, where HOOK_EVENT can be one of the hook events or an array of them. If the optional parameter is not provided, the method is executed on every state change. Similarly, hooks (for example inherited ones) can be disabled by calling execution_plan_hooks.do_not_use, which takes the same arguments. Hooks defined on an action are inherited when the action is sub-classed.
The hooks are executed for every action in the execution plan and the order of execution is not guaranteed.
class MyAction < Actions::EntryAction
  # Sets up a hook to call #state_change method when the execution plan changes
  # its state
  execution_plan_hooks.use :state_change

  # Sets up a hook to call #success_notification method when the execution plan
  # finishes successfully
  execution_plan_hooks.use :success_notification, :on => :success

  # Disables running #state_change method when the execution plan starts or
  # finishes planning
  execution_plan_hooks.do_not_use :state_change, :on => [:planning, :planned]

  def state_change(_execution_plan)
    # Do something on every state change
  end

  def success_notification(_execution_plan)
    # Display a notification
  end
end
Note
This part is based on the current work-in-progress on multi-executor support.
The world represents Dynflow's run-time and acts as its external interface. It holds all the configuration and sub-components needed for Dynflow to perform its job.
The Dynflow world is composed of several sub-components, such as the persistence, the coordinator, the connector and the executor, described in more detail below.
The underlying technologies are hidden behind adapter abstractions, which allows choosing the right technology for the job while keeping the rest of Dynflow intact.
In the simplest case, one world handles both the client requests and the execution itself. This is useful for small all-in-one deployments and development.
In production, however, one might want to separate the client worlds (often running in the web-server process or a client library) from the executor worlds (running as part of a standalone service). This setup makes the execution more stable and highly available (in active-active mode).
There might be multiple client as well as executor worlds in the Dynflow infrastructure.
An executor world still has its own client dispatcher, so that it can act as a client for triggering other execution plans (useful for the sub-plans feature).
Dynflow has different expectations of the underlying technologies for the persistence (durability), the coordinator (real-time synchronization) and the connector (transport).
However, we also realize that for many use-cases, a single shared database is just enough infrastructure for the job; forcing the use of different technologies would mean useless overhead.
Therefore, it's possible to use a single shared SQL database to do all the work.
Note
Something as simple as SQLite is enough for getting Dynflow up and running and getting something done.
For the best connector results, it's recommended to use PostgreSQL, as Dynflow can utilize its listen/notify feature for better response times.
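Such a single-database setup might be configured as in this sketch (assuming the Sequel-based persistence adapter bundled with Dynflow and a local PostgreSQL database named dynflow):

config = Dynflow::Config.new
config.persistence_adapter = Dynflow::PersistenceAdapters::Sequel.new('postgres://localhost/dynflow')
world = Dynflow::World.new(config)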
1) the client prepares an execution plan, saves it into the persistence and passes its id to the client dispatcher
2) the client dispatcher creates an IVar that represents the future value of the execution plan after it finishes execution. The client can use this value to wait for the execution plan to finish or to hook other procedures onto the finish-time.
3) the client dispatcher creates an envelope (see connector for more details) containing the request for execution, with the receiver id set to AnyExecutor
4) the connector uses a scheduling algorithm to choose which executor to send the request to and replaces the AnyExecutor value with it. It sends the envelope to the chosen executor.
5) the connector on the executor side receives the envelope and asks the executor dispatcher to handle it
6) the executor dispatcher acquires the lock on the execution plan and initiates the execution. In the meantime, it lets the client know that the work was accepted (there might be additional logic on the client to handle a timeout when the work is not accepted)
7 - 9) the connector layer propagates the Accepted response back to the client
10) the executor finishes the execution
11 - 12) the connector layer propagates the Finished response back to the client
13) the client dispatcher resolves the original IVar, so that the client is able to find out about the finished execution
The behavior is the same even in the "one world" scenario: in that case, this world participates on both the client and the executor side, connected through a direct in-memory connector.
The persistence makes sure that the serialized states of the execution plans are stored for recovery and status tracking. The execution plan data, with their actual state, are stored in it.
Unlike with the coordinator, the persisted data don't have to be available to all the worlds at the same time: every world needs just the data it is actively working on. Also, the data don't have to be fully synchronized between the worlds (as long as up-to-date data about the relevant execution plans are available to the world).
The connector provides message passing between the worlds. A message has the form of an envelope with the following structure:

sender_id - the id of the world sending the envelope
receiver_id - the id of the world the envelope is addressed to (or AnyExecutor, when load-balancing the execution)
request_id - an id used by the sender for pairing requests and responses
message - the payload of the envelope

The connector is responsible for spreading the load across the executor worlds. This is driven by the AnyExecutor value in the receiver id field. The implementation available in the current version uses a simple round-robin algorithm for this purpose.
This component (especially important in a multi-executor setup) makes sure that no two executors are executing the same execution plan at the same time (a.k.a. locking). It also provides information about the worlds available in the system (the worlds register). Unlike the persistence, it's not required to persist its data (they could be recalculated), but it needs to provide a globally shared state.
The main type of object the coordinator works with is a record, consisting of an id and the data it carries.
There is a special type of record called a lock, which keeps the owner information as well. It's used for keeping track of which executor is actively working on which execution plan: the executor is not allowed to start executing an execution plan unless it has successfully acquired a lock for it.
Dynflow has a special module for actions which should only ever have one instance active at a time. This module provides a number of methods for managing the action's locks, as well as a middleware for automatic locking.
It works in the following way: the middleware tries to acquire the lock for the action, owned by the execution plan. If another action already holds the lock, it will fail and the execution plan will transition to the stopped-error state. Having obtained the lock, the action goes through planning as usual. In the run phase, the middleware checks whether the execution plan still owns the lock for the action. If the execution plan holds the lock, or there is no lock at all and the action manages to acquire it again, the execution proceeds. If the lock is held by another execution plan, the current one fails. Unlocking can either be done manually from within the action or be left to the execution plan, which unlocks all the locks it holds whenever it transitions to the paused or stopped state.
All that is needed to make an action a singleton one is to include the module into it:
class ExampleSingletonAction < ::Dynflow::Action
  include ::Dynflow::Action::Singleton
end