API
CooperativeTasks.Command
CooperativeTasks.Future
CooperativeTasks.LoopExecution
CooperativeTasks.Message
CooperativeTasks.SingleExecution
CooperativeTasks.SpawnOptions
CooperativeTasks.TaskException
Base.fetch
CooperativeTasks.cancel
CooperativeTasks.istasksuccessful
CooperativeTasks.schedule_shutdown
CooperativeTasks.shutdown
CooperativeTasks.shutdown_owned_tasks
CooperativeTasks.shutdown_scheduled
CooperativeTasks.spawn
CooperativeTasks.tryfetch
CooperativeTasks.trysend
CooperativeTasks.@spawn
CooperativeTasks.Command
— Type
Execute ret = f() on a task, optionally executing continuation(ret) from the task that sent the message.
First, the command is registered on a source task with a corresponding UUID. Then, as part of a message, it is sent to the destination task for execution. If a continuation has been provided, the destination task sends back the value associated with this UUID, and the source task runs continuation with the returned value the next time it collects new messages.
struct Command
f::Any
args::Any
kwargs::Any
continuation::Any
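The round trip described above can be sketched with Base Julia only. This is an illustration under assumptions, not the library's implementation: SketchCommand, the two channels standing in for task mailboxes, and collect_messages! are all hypothetical names.

```julia
using UUIDs

# Hypothetical stand-in for a command; not the library's Command type.
struct SketchCommand
    uuid::UUID
    f::Any
end

# Source side: continuations waiting for a reply, keyed by command UUID.
pending = Dict{UUID, Any}()
# Channels standing in for the message queues of the two tasks (assumption).
to_destination = Channel{SketchCommand}(8)
to_source = Channel{Tuple{UUID, Any}}(8)

# 1. Register the command on the source side, then send it.
received = Ref{Any}(nothing)
cmd = SketchCommand(uuid4(), () -> 6 * 7)
pending[cmd.uuid] = ret -> (received[] = ret)
put!(to_destination, cmd)

# 2. The destination task executes f and sends the value back under the same UUID.
destination = @async begin
    c = take!(to_destination)
    put!(to_source, (c.uuid, c.f()))
end
wait(destination)

# 3. When the source next collects messages, it runs the matching continuation.
function collect_messages!(pending, inbox)
    while isready(inbox)
        uuid, ret = take!(inbox)
        continuation = pop!(pending, uuid)
        continuation(ret)
    end
end
collect_messages!(pending, to_source)
```

The continuation runs on the source side only when messages are collected, which is why the source task has to keep processing its messages to observe results.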
CooperativeTasks.Future
— Type
Operation waiting on another task to produce and return a value.
The value is accessible using fetch and tryfetch, with a timeout that may be set as a parameter.
mutable struct Future
uuid::Base.UUID
value::Ref{Any}
from::Task
to::Task
CooperativeTasks.LoopExecution
— Type
Execute a function repeatedly until a shutdown is scheduled with schedule_shutdown.
If period is greater than a millisecond, every iteration may trigger a sleep for the remaining period time after executing the main function. Simple heuristics prevent sleeping when there were recent interactions with other tasks, to avoid large communication delays.
At every iteration, task messages are checked, which allows the task (among other things) to compute and return results to other tasks (see Future) and to be cancelled. This mode of execution is preferred over manual loops precisely because it lets the task satisfy the duties this library requires of it.
struct LoopExecution
period::Union{Nothing, Float64}
state::CooperativeTasks.ExecutionState
shutdown::Bool: Whether to call shutdown() on this task after the loop finishes executing. Can be useful in contexts where SingleExecution and LoopExecution are mixed together, to avoid shutting down multiple times.
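The "sleep for the remaining period time" behavior can be sketched as follows. This is a minimal illustration, not the library's loop: run_periodically, should_stop, tick and done are hypothetical names, and the heuristics about recent interactions with other tasks are left out.

```julia
# Hypothetical helper, not part of CooperativeTasks: run `f` until `should_stop()`
# returns true, sleeping for whatever is left of `period` after each iteration.
function run_periodically(f, should_stop; period = 0.005)
    iterations = 0
    while !should_stop()
        started = time()
        f()
        iterations += 1
        remaining = period - (time() - started)
        # Only periods above a millisecond lead to a sleep.
        period > 0.001 && remaining > 0 && sleep(remaining)
    end
    return iterations
end

counter = Ref(0)
tick() = (counter[] += 1)
done() = counter[] >= 3
n = run_periodically(tick, done)
```

Sleeping only for the remainder keeps the iteration rate close to one per period, even when the work itself takes a noticeable share of it.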
CooperativeTasks.Message
— Type
Message sent by a task with an optional payload, identified by a UUID.
If critical is set to true, it will be processed before all non-critical messages.
struct Message{T}
from::Task
uuid::Base.UUID
payload::Any
critical::Bool
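One way to picture "critical first" processing is a stable sort on the critical flag. The sketch below assumes that arrival order is otherwise preserved, which the text above does not state. SketchMessage and process_order are hypothetical names.

```julia
# Hypothetical stand-in for a message; only the fields needed for ordering.
struct SketchMessage
    payload::Any
    critical::Bool
end

# Stable sort: critical messages move to the front, arrival order is otherwise kept.
process_order(inbox) = sort(inbox; by = m -> !m.critical, alg = MergeSort)

inbox = [SketchMessage(:a, false), SketchMessage(:stop, true), SketchMessage(:b, false)]
ordered = [m.payload for m in process_order(inbox)]
```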
CooperativeTasks.SingleExecution
— Type
Execute a function once, then return.
struct SingleExecution
shutdown::Bool: Whether to call shutdown() on this task after the function finishes executing. Can be useful in contexts where SingleExecution and LoopExecution are mixed together, to avoid shutting down multiple times.
CooperativeTasks.SpawnOptions
— Type
Options for spawning tasks with spawn.
struct SpawnOptions
execution_mode::Union{SingleExecution, LoopExecution}: Specifies how the task should be run, via a SingleExecution or LoopExecution structure.
start_threadid::Union{Nothing, Int64}: Optional 1-based thread ID to start the task on. If task migration is disabled, then the task will execute on this thread during its entire lifespan. If set to nothing (default), then the starting thread is the same as the thread launching the task.
disallow_task_migration::Bool: Disallow tasks to migrate between Julia threads (false by default). When set to false, tasks may migrate, as with Threads.@spawn; when set to true, tasks stay on their starting thread, as with @async.
Preventing task migration enables the use of Threads.threadid() as an index from the spawned tasks. For example, consider the following pattern:
    const results = [Float64[] for i in 1:Threads.nthreads()]
    # ...
    # Spawn a bunch of tasks.
    # ...
    # Execute code like the following from these tasks:
    push!(results[Threads.threadid()], rand())
This pattern requires Threads.threadid() to be constant over the entire lifespan of the tasks, which requires task migration to be disabled.
Disabling task migration can also be useful when, for example, C libraries rely on functions being executed in the same thread in which some library-defined context was created, as can be the case for graphics APIs such as Vulkan or OpenGL.
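In Base Julia, whether a task may migrate is recorded in the sticky field of Task. The short sketch below uses only Base, not CooperativeTasks, to show the two behaviors being compared; how the library maps disallow_task_migration onto this field is an assumption and is not shown.

```julia
# @async creates a sticky task: it stays on the thread that started it.
pinned = @async Threads.threadid()
# Threads.@spawn creates a non-sticky task: the scheduler may move it between threads.
movable = Threads.@spawn Threads.threadid()

wait(pinned)
wait(movable)
```

Only for the sticky task is Threads.threadid() guaranteed to stay constant, which is what the indexing pattern relies on.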
CooperativeTasks.TaskException
— Type
Emitted when a task has failed, for a reason described with its StatusCode and an optional message.
struct TaskException <: Exception
msg::String
code::CooperativeTasks.StatusCode
Base.fetch
— Method
Fetch the value of the future, waiting at most timeout seconds if the value is not available yet.
If sleep_time is nonzero, sleep(sleep_time) will be called while waiting.
If an error occurs, or timeout seconds have passed without result, an exception will be thrown.
fetch(
future::Union{Future, Result{Future}};
timeout,
sleep_time
) -> Any
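The interplay of timeout and sleep_time can be illustrated with a polling loop over a Base Channel. This is a sketch under assumptions, not the library's fetch: fetch_with_timeout is a hypothetical helper, and the default values are chosen for the example only.

```julia
# Hypothetical helper, not the library's fetch: poll `ch` until a value is ready,
# throwing once `timeout` seconds have passed.
function fetch_with_timeout(ch::Channel; timeout = 1.0, sleep_time = 0.001)
    deadline = time() + timeout
    while !isready(ch)
        time() > deadline && error("timed out after $timeout seconds")
        # Sleep between checks when requested, otherwise just yield to other tasks.
        sleep_time > 0 ? sleep(sleep_time) : yield()
    end
    return take!(ch)
end

ready = Channel{Int}(1)
put!(ready, 42)
value = fetch_with_timeout(ready)

# A channel that never receives a value triggers the timeout path.
never_filled = Channel{Int}(1)
timed_out = try
    fetch_with_timeout(never_filled; timeout = 0.05)
    false
catch
    true
end
```

A nonzero sleep_time trades a little latency for lower CPU usage while waiting.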
CooperativeTasks.cancel
— Method
Request the task to cancel its execution.
cancel(task::Task) -> Result{Future, TaskException}
CooperativeTasks.istasksuccessful
— Method
Check whether the task has successfully terminated execution.
By default, an error will be logged if an exception was found; set log = false to prevent that.
istasksuccessful(task::Task; log) -> Bool
CooperativeTasks.schedule_shutdown
— Method
Signal the current task to shut down.
To check if a task received such a signal, use shutdown_scheduled.
schedule_shutdown() -> Bool
CooperativeTasks.shutdown
— Method
Shut down a task by cancelling it if it has not completed.
See cancel.
shutdown(task::Task) -> CooperativeTasks.Condition
CooperativeTasks.shutdown_owned_tasks
— Method
Shut down all children of the current task. Returns a Condition which can be waited on.
shutdown_owned_tasks() -> CooperativeTasks.Condition
CooperativeTasks.shutdown_scheduled
— Method
Return whether the current task is scheduled for shutdown with schedule_shutdown.
If this function returns true, the task is responsible for finishing any pending activity and shutting itself down.
shutdown_scheduled() -> Bool
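The cooperative contract described here (check for a signal, finish pending work, then return) can be sketched with a plain flag in Base Julia. The flag below is a hypothetical stand-in for the library's per-task signal; shutdown_scheduled itself is not called.

```julia
# Hypothetical stand-in for the per-task shutdown signal.
shutdown_requested = Threads.Atomic{Bool}(false)
processed = Int[]

worker = @async begin
    i = 0
    while !shutdown_requested[]
        i += 1
        push!(processed, i)
        yield()  # let other tasks run, including the one requesting shutdown
    end
    # Finish pending activity before returning.
    push!(processed, -1)
end

yield()                      # give the worker a chance to run
shutdown_requested[] = true  # signal the worker to shut down
wait(worker)
```

The signal only requests a shutdown; the worker decides when it is safe to stop, so it can always leave its state consistent.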
CooperativeTasks.spawn
— Method
Spawn a new task executing the function f.
Depending on the execution_mode parameter of the provided SpawnOptions, f() may be executed multiple times.
spawn(f, options::SpawnOptions) -> Task
CooperativeTasks.tryfetch
— Method
Fetch the value of the future, waiting at most timeout seconds if the value is not available yet.
If sleep_time is nonzero, sleep(sleep_time) will be called while waiting.
If an error occurs, or timeout seconds have passed without result, an exception will be returned.
This behaves like fetch, but does not throw; instead it wraps the returned value or exception into a Result.
tryfetch(
future::Union{Future, Result{Future}};
timeout,
sleep_time
) -> Result{Any, Union{ExecutionError, PropagatedTaskException, TaskException}}
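The difference between throwing and returning an exception can be shown with a small Base-only sketch. try_call is a hypothetical helper; it returns the exception object directly, whereas the library wraps the outcome in a Result, which is not reproduced here.

```julia
# Hypothetical helper: run `f` and return either its value or the exception it threw,
# instead of letting the exception propagate to the caller.
function try_call(f)
    try
        return f()
    catch err
        return err
    end
end

ok = try_call(() -> 1 + 1)
failed = try_call(() -> error("no value available"))

# The caller inspects the outcome instead of using try/catch.
outcome = failed isa Exception ? "failed: $(failed.msg)" : "value: $failed"
```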
CooperativeTasks.trysend
— Method
Send a message m to task, unless the task is dead.
trysend(
task::Task,
m::Message
) -> Result{Future, TaskException}
CooperativeTasks.@spawn
— Macro
@spawn [options] $ex
@spawn begin ... end
@spawn :single begin ... end
@spawn :looped begin ... end
Convenience macro to spawn a task via spawn, defining a closure over ex as the function to be executed by the task.
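"Defining a closure over ex" can be illustrated with a tiny macro written against Base only. This is not the library's macro: @sketch_spawn and start_task are hypothetical names, and option handling (:single, :looped) is left out.

```julia
# Hypothetical function standing in for spawn: start a task running `f`.
start_task(f) = schedule(Task(f))

# Hypothetical macro, not the library's @spawn: wrap `ex` in a zero-argument
# closure and hand it to start_task.
macro sketch_spawn(ex)
    return :(start_task(() -> $(esc(ex))))
end

x = 20
t = @sketch_spawn x + 22
result = fetch(t)
```

Because the expression is wrapped in a closure rather than evaluated at the call site, it runs inside the new task and can still refer to variables from the surrounding scope.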