
The multithreaded-interpreter is a development branch that adds support for parallel processing to VisTrails modules. It allows parts of a pipeline to be executed in parallel, or even on remote machines.

Support for threads, multiprocessing and IPython is currently implemented.

A general description is available on Google Docs.


In 2.0, modules execute serially; each module calls its upstream modules' update() method recursively to execute the full graph.

Because of the way VisTrails behaves and limitations of the libraries it uses (PyQt, VTK), this should be opt-in: by default, modules should still run sequentially on a single thread.

Modules that do support parallel execution would be executed in a different thread, in parallel with each other and with classic modules, so a mechanism is needed to let the interpreter thread continue while a module runs in the background.

Execution logic

Currently, the execution logic for the pipeline is entirely contained in the Module class. In the update() method, a module calls update_upstream() then compute(); update_upstream() simply calls update() on each upstream module recursively, in no particular order.

These recursive calls are incompatible with backgrounding tasks, as there is no way for a module to do anything while these calls are not complete. Moreover, some modules override the update_upstream() logic, for instance the ControlFlow package (If/Default/ExecuteInOrder modules), or don’t actually use it, for instance ControlFlow’s or ParallelFlow’s Map modules.
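
The recursive 2.0 behaviour described above can be sketched roughly as follows. This is a simplified illustration, not the VisTrails source: the upstream list, the up_to_date flag and the EXECUTION_LOG list are assumptions made for the example. It shows why a module has no opportunity to do anything else until every upstream update() call has returned.

```python
EXECUTION_LOG = []


class Module(object):
    """Simplified stand-in for a 2.0-style module (illustration only)."""

    def __init__(self, name, upstream=()):
        self.name = name
        self.upstream = list(upstream)  # assumed: directly connected upstream modules
        self.up_to_date = False         # assumed flag to avoid recomputing

    def update_upstream(self):
        # Each call blocks until the upstream module has fully executed.
        for module in self.upstream:
            module.update()

    def update(self):
        if self.up_to_date:
            return
        self.update_upstream()
        self.compute()
        self.up_to_date = True

    def compute(self):
        EXECUTION_LOG.append(self.name)


a = Module("a")
b = Module("b", [a])
c = Module("c", [a])
sink = Module("sink", [b, c])
sink.update()
```

Calling update() on the sink walks the whole graph depth-first on the calling thread, which is the behaviour the task system is meant to replace.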

Task system

Thus, I replaced the recursive calls with a task system (vistrails.core.task_system). This system provides a way to add tasks to be run later, and for a callback to be run when done; it also handles priorities. The system only runs tasks sequentially, but provides a way to register asynchronous tasks to support backgrounding modules.

To make sure that background tasks are started as soon as possible (before foreground tasks), the base priorities are as follows (tasks with lower values run first):

  • 10 (UPDATE_UPSTREAM_PRIORITY) for “update upstream” tasks, so that every module has a chance to register its tasks before anything starts computing
  • 50 (COMPUTE_BACKGROUND_PRIORITY) for “start thread/process” tasks, so that they are started before compute tasks. This allows them to run in parallel with regular compute tasks, in addition to running in parallel with each other
  • 100 (COMPUTE_PRIORITY) for “compute” tasks
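
A minimal sketch of such a priority-driven task queue is shown below. The three constants are the ones named above; the TaskRunner class and its add() and run() methods are hypothetical and are not the vistrails.core.task_system API. The sketch also assumes that lower priority values run first, which is what the ordering described above implies.

```python
import heapq
import itertools

UPDATE_UPSTREAM_PRIORITY = 10
COMPUTE_BACKGROUND_PRIORITY = 50
COMPUTE_PRIORITY = 100


class TaskRunner(object):
    """Hypothetical sequential runner; lower priority values run first."""

    def __init__(self):
        self._queue = []
        # The counter breaks ties so tasks of equal priority keep insertion order.
        self._counter = itertools.count()

    def add(self, task, priority, callback=None):
        heapq.heappush(self._queue,
                       (priority, next(self._counter), task, callback))

    def run(self):
        while self._queue:
            _, _, task, callback = heapq.heappop(self._queue)
            task()
            if callback is not None:
                callback()  # run once the task is done


order = []
runner = TaskRunner()
runner.add(lambda: order.append("compute A"), COMPUTE_PRIORITY)
runner.add(lambda: order.append("start thread for B"),
           COMPUTE_BACKGROUND_PRIORITY)
runner.add(lambda: order.append("update upstream of C"),
           UPDATE_UPSTREAM_PRIORITY,
           callback=lambda: order.append("C registered its tasks"))
runner.run()
```

Even though the compute task was added first, the "update upstream" task runs first and the background start runs before the regular compute task.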

Parallelization layer

On top of this task system, a simple interface has been added that lets standard modules use parallelization without much effort. It consists of two parts:

  • A @parallelizable class decorator for Module subclasses, used to declare which targets a module can be executed on. A Module will only be executed on targets it supports.
  • A ParallelizationScheme base class, that provides a specific way to execute a module. Currently implemented are:
    • ThreadScheme, that uses concurrent.futures.ThreadPoolExecutor
    • ProcessScheme, that uses concurrent.futures.ProcessPoolExecutor
    • IPythonScheme, that uses IPython.parallel to dispatch modules to a cluster
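
The two parts could fit together roughly as in the sketch below. The names parallelizable, ParallelizationScheme and ThreadScheme come from the description above, but their signatures here (the thread/process/remote keyword arguments, the supported_targets attribute, supports() and submit()) are assumptions made for illustration and may differ from the branch's code. Only concurrent.futures.ThreadPoolExecutor is used as it actually exists in the standard library.

```python
from concurrent.futures import ThreadPoolExecutor


def parallelizable(thread=False, process=False, remote=False):
    """Hypothetical class decorator recording the targets a module supports."""
    def decorator(cls):
        flags = {"thread": thread, "process": process, "remote": remote}
        cls.supported_targets = frozenset(k for k, v in flags.items() if v)
        return cls
    return decorator


class ParallelizationScheme(object):
    """Base class: one specific way of executing a module."""
    target = None

    def supports(self, module):
        return self.target in getattr(module, "supported_targets", ())

    def submit(self, module):
        raise NotImplementedError


class ThreadScheme(ParallelizationScheme):
    target = "thread"

    def __init__(self, max_workers=2):
        self._executor = ThreadPoolExecutor(max_workers=max_workers)

    def submit(self, module):
        # A module is only executed on targets it declared support for.
        if not self.supports(module):
            raise ValueError("module does not support threads")
        return self._executor.submit(module.compute)

    def shutdown(self):
        self._executor.shutdown(wait=True)


@parallelizable(thread=True)
class Square(object):
    def __init__(self, value):
        self.value = value

    def compute(self):
        return self.value ** 2


class Legacy(object):
    """Not decorated, so it stays on the interpreter thread."""
    def compute(self):
        return "serial"


scheme = ThreadScheme()
future = scheme.submit(Square(7))
result = future.result()
scheme.shutdown()
```

Keeping the declaration (decorator) separate from the mechanism (scheme) means an undecorated module keeps the opt-in default of sequential execution.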


Things that need work

  • Executing multiple modules at once: right now we can only send a single module. Sending multiple modules to be executed at once is only possible by making a group (if all the modules in the group support remote execution).
  • Data movement: currently module input/output is downloaded from clusters and uploaded back for each module (except inside groups). The interpreter should have knowledge of things like files that can't be moved easily (pickling a File object doesn't move the file...)
  • "remotely-executable" semantics: there are modules that we don't need to execute remotely (because they are really fast, like Constant) but shouldn't stop a containing pipeline from being sent out (because they can be executed remotely if needed).

Things to watch out for

  • When there are non-parallel modules or tasks, the task system may not run them in the most efficient order. This is because the actual duration of each task is not known. An effort has been made to reach the backgroundable tasks first, but as their upstream modules need to be run, finding the most efficient order is difficult. Moreover, using complex strategies to choose a task might make execution slower even though it uses parallelism.
  • Upstream modules are not necessarily run consecutively. This causes issues with packages that wrongly rely on that assumption to keep some global state, like matplotlib [1], which creates a Figure and then expects only its own upstream modules to be run.
  • Instances of VisTrails fired up by multiprocessing or IPython load the packages that are listed in the configuration. They won't load other required packages automatically, and they won't ignore unwanted packages (such as visualization packages; VTK takes a long time to load...).

Current work


The current provenance log stores Machines, with information such as processor and system type, RAM and hostname. However, we want to capture information about which machine is local and how the job was sent to the remote machine.

It can also happen that a single module runs on several machines, because technically, execution starts locally and then the module sends itself to a remote machine for execution.

Also, we should provide a framework for modules that explicitly use remote machines during their execution to record the machines they used in the log.

Current format

Currently a workflowExec entry in the log looks like this (some parameters have been omitted):

<workflowExec ip="" name="ROOT + 32*" tsEnd="2013-07-18 10:38:55">
  <annotation id="1" key="__reason__" value="Pipeline Execution" />
  <machine id="1" name="" os="Windows" />
  <machine id="2" name="" os="Windows" />
  <machine id="3" name="veby" os="Linux" />
  <moduleExec id="1" machine_id="1" moduleId="3" moduleName="PythonSource" />
  <moduleExec id="2" machine_id="1" moduleId="0" moduleName="PythonSource" />
  <moduleExec id="3" machine_id="2" moduleId="3" moduleName="PythonSource">
    <annotation id="2" key="parallel_execution" value="True" />
  </moduleExec>
  <moduleExec id="3" machine_id="3" moduleId="0" moduleName="PythonSource">
    <annotation id="3" key="parallel_execution" value="True" />
  </moduleExec>
  <moduleExec id="3" machine_id="1" moduleId="2" moduleName="ConcatenateString" />
</workflowExec>


  • Remove the 'IP' field from WorkflowExec, make it optional on Machine
  • Move reason to a field on WorkflowExec
  • Use a RemoteExecution inside ModuleExec to keep track of what a module triggers remotely. It would have an attribute storing which parallelization scheme was used to submit the task (e.g. IPython). Because each library might have a different set of parameters, these can probably be captured as annotations (e.g. the IPython profile).
  • RemoteExecution would contain either ModuleExec, for execution of a whole VisTrails module remotely (as with @parallelizable) or RemoteTask, to store that something else was done remotely (but we still want to capture what machines were used).

Completely made-up hypothetical example:

<workflowExec name="ROOT + 32*" tsEnd="2013-07-18 10:38:55" reason="Pipeline Execution">
  <machine id="1" name="" os="Windows" />
  <machine id="2" name="" os="Windows" />
  <machine id="3" name="" os="Linux" />

  <!-- This module executed on IPython via @parallelizable -->
  <moduleExec id="1" machine_id="1" moduleId="0" moduleName="PythonSource">
    <RemoteExecution scheme="ipython">
      <annotation id="1" key="ipython_profile" value="default" />
      <moduleExec id="2" machine_id="2" moduleId="0" moduleName="PythonSource" />
    </RemoteExecution>
  </moduleExec>

  <!-- This module executed on another IPython profile via @parallelizable -->
  <moduleExec id="3" machine_id="1" moduleId="1" moduleName="PythonSource">
    <RemoteExecution scheme="ipython">
      <annotation id="2" key="ipython_profile" value="polycluster" />
      <moduleExec id="4" machine_id="3" moduleId="1" moduleName="PythonSource" />
    </RemoteExecution>
  </moduleExec>

  <!-- This module executed locally but made use of an IPython cluster -->
  <moduleExec id="5" machine_id="1" moduleId="2" moduleName="MultiprocessingModule">
    <RemoteExecution scheme="ipython">
      <annotation id="3" key="ipython_profile" value="default" />
      <remoteTask machine_id="2" function="map" count="2" description="Inverting five matrices" />
      <remoteTask machine_id="3" function="map" count="3" description="Inverting five matrices" />
    </RemoteExecution>
    <RemoteExecution scheme="ipython">
      <annotation id="4" key="ipython_profile" value="polycluster" />
      <remoteTask machine_id="3" function="apply" count="1" description="Graphviz" />
    </RemoteExecution>
  </moduleExec>

  <!-- This module only used the local machine -->
  <moduleExec id="6" machine_id="1" moduleId="3" moduleName="ConcatenateString" />
</workflowExec>
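
To illustrate what the nested format would make possible, here is a small sketch that reads a trimmed, well-formed version of the hypothetical log and lists every machine each top-level moduleExec touched. The element and attribute names follow the made-up example; the machines_used() helper is an assumption for illustration and is not part of VisTrails.

```python
import xml.etree.ElementTree as ET

LOG = """
<workflowExec name="ROOT + 32*" reason="Pipeline Execution">
  <machine id="1" name="" os="Windows" />
  <machine id="2" name="" os="Windows" />
  <machine id="3" name="" os="Linux" />
  <moduleExec id="1" machine_id="1" moduleId="0" moduleName="PythonSource">
    <RemoteExecution scheme="ipython">
      <annotation id="1" key="ipython_profile" value="default" />
      <moduleExec id="2" machine_id="2" moduleId="0" moduleName="PythonSource" />
    </RemoteExecution>
  </moduleExec>
  <moduleExec id="5" machine_id="1" moduleId="2" moduleName="MultiprocessingModule">
    <RemoteExecution scheme="ipython">
      <annotation id="3" key="ipython_profile" value="default" />
      <remoteTask machine_id="2" function="map" count="2" />
      <remoteTask machine_id="3" function="map" count="3" />
    </RemoteExecution>
  </moduleExec>
  <moduleExec id="6" machine_id="1" moduleId="3" moduleName="ConcatenateString" />
</workflowExec>
"""


def machines_used(module_exec):
    """Return the sorted machine ids touched by a moduleExec element."""
    ids = {module_exec.get("machine_id")}
    for remote in module_exec.findall("RemoteExecution"):
        for child in remote:
            if child.tag == "moduleExec":
                # A whole module was executed remotely: recurse into it.
                ids.update(machines_used(child))
            elif child.tag == "remoteTask":
                ids.add(child.get("machine_id"))
    return sorted(ids)


root = ET.fromstring(LOG)
usage = {m.get("id"): machines_used(m) for m in root.findall("moduleExec")}
```

Because only direct children of the root are iterated, a remotely executed moduleExec is reported under the local moduleExec that triggered it rather than as a separate entry.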

Choosing per-module execution target

Currently, only the runtime Module class knows what targets it supports. Determination of the target to use happens when it actually runs, and thus can be dynamic (PythonSource uses an input port to choose the target).

Instead, it would make sense to be able to select a target for a particular module among what it supports. This means:

  • The UI needs to know what a module supports; hence this should no longer be purely an execution-time concern.
    • We could keep the current class-decorator, that we would extract to the ModuleDescriptor
    • or add a field (like _input_ports) and/or a parameter to ModuleRegistry#add_module().
  • The interpreter needs to know what was chosen in the UI
    • As simple as setting a field in Module#summon()
  • Needs actual UI support to choose the scheme, and the specific parameters for that scheme
  • Probably needs to allow VisTrails to connect to several IPython clusters at once, since we can now assign different modules to different clusters.
  • The configuration needs to be saved somewhere.
    • Juliana proposed: save it independently from the pipeline, in the workflow or in the .vt bundle, in a non-versioned manner. Allow the configuration to be recreated from a WorkflowExec in the log (this provides versioning).
    • OR: annotate modules (but this interferes with versioning)
    • OR: use a port (interferes with versioning)

Execution configuration objects

  • vistrails_module:Module has a static supported_execution:SupportedExecution, initialized through the @parallelizable class decorator. This only contains what systems are supported.
  • ModuleDescriptor has the same supported_execution, extracted from the Module. It is used by the UI to show choices to the user.
  • vistrail.module:Module has an execution_preference:ExecutionPreference that is set by the user. This contains one supported system plus system-specific settings (cluster name, target address, ...) as annotations. Different modules can share an ExecutionPreference.
  • A whole pipeline's configuration is represented by ExecutionConfiguration. It contains a mapping from a list of module ids (string format: "{group_id},{group_id},{module_id}" to identify nested modules) to an ExecutionPreference.

During execution, the ExecutionPreference is passed to the Module in summon() (when the vistrail.module:Module creates a vistrails_module:Module). When the module executes, it uses the highest-priority parallelization scheme among those it supports that are enabled and available. If an ExecutionPreference is set, it uses that scheme instead, and fails if the scheme cannot be used.
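
The selection rule above can be sketched as follows. The ExecutionPreference name and the "{group_id},{group_id},{module_id}" key format come from the description; the constructor arguments, the module_key() and choose_scheme() helpers, and the convention that the supported list is ordered from highest to lowest priority are assumptions made for this example.

```python
class ExecutionPreference(object):
    """Hypothetical: one chosen system plus its system-specific settings."""

    def __init__(self, system, settings=None):
        self.system = system
        self.settings = dict(settings or {})


def module_key(group_ids, module_id):
    """Build the "{group_id},{group_id},{module_id}" key of a nested module."""
    return ",".join(str(i) for i in list(group_ids) + [module_id])


def choose_scheme(supported, available, preference=None):
    """Pick a scheme name.

    supported: schemes the module supports, highest priority first (assumed).
    available: set of schemes that are enabled and available right now.
    """
    if preference is not None:
        # An explicit preference is used as-is, or execution fails.
        if preference.system in supported and preference.system in available:
            return preference.system
        raise RuntimeError("scheme %r cannot be used" % preference.system)
    for system in supported:
        if system in available:
            return system
    return None  # nothing usable: run on the interpreter thread


# Two modules sharing one preference object.
shared = ExecutionPreference("ipython", {"ipython-profile": "poly_cluster"})
configuration = {module_key([], 1): shared, module_key([0], 4): shared}

supported = ["ipython", "multiprocessing", "threading"]
automatic = choose_scheme(supported, {"multiprocessing", "threading"})
explicit = choose_scheme(supported, {"ipython", "threading"},
                         configuration["0,4"])
```

With no preference set and IPython unavailable, the sketch falls back to the next supported scheme; with a preference set, it never silently substitutes another scheme.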

Example of execution configuration file:

<executionConfiguration version="1.0.3" vtVersion="2.1 beta">
  <!-- This module will only be executed locally -->
  <moduleExecutionPreference module_id="0" preference="-1"/>

  <!-- IPython configuration (with annotation for system-specific settings) -->
  <executionPreference id="0" system="ipython">
    <annotation id="0" key="ipython-profile" value="poly_cluster"/>
  </executionPreference>
  <moduleExecutionPreference module_id="1" preference="0"/>
  <!-- 0,4 means module 4 in group module 0 -->
  <moduleExecutionPreference module_id="0,4" preference="0"/>

  <!-- Another configuration -->
  <executionPreference id="1" system="multiprocessing">
    <annotation id="1" key="processes" value="8"/>
  </executionPreference>
  <moduleExecutionPreference module_id="2" preference="1"/>

  <!-- Unlisted modules will choose automatically at runtime, like they
    currently do -->
</executionConfiguration>
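
As a rough illustration of how such a file could be read, the sketch below parses a well-formed copy of the example with the standard xml.etree.ElementTree module. The load_configuration() function, the tuple representation of module paths and the reading of preference="-1" as "local only" (taken from the comment in the example) are assumptions, not the actual VisTrails loader.

```python
import xml.etree.ElementTree as ET

CONFIG = """
<executionConfiguration version="1.0.3" vtVersion="2.1 beta">
  <moduleExecutionPreference module_id="0" preference="-1"/>
  <executionPreference id="0" system="ipython">
    <annotation id="0" key="ipython-profile" value="poly_cluster"/>
  </executionPreference>
  <moduleExecutionPreference module_id="1" preference="0"/>
  <moduleExecutionPreference module_id="0,4" preference="0"/>
  <executionPreference id="1" system="multiprocessing">
    <annotation id="1" key="processes" value="8"/>
  </executionPreference>
  <moduleExecutionPreference module_id="2" preference="1"/>
</executionConfiguration>
"""

LOCAL_ONLY = "-1"  # assumed meaning, based on the comment in the example


def load_configuration(text):
    """Map module paths, e.g. (0, 4), to (system, settings) or None."""
    root = ET.fromstring(text)
    preferences = {}
    for pref in root.findall("executionPreference"):
        settings = {a.get("key"): a.get("value")
                    for a in pref.findall("annotation")}
        preferences[pref.get("id")] = (pref.get("system"), settings)
    mapping = {}
    for entry in root.findall("moduleExecutionPreference"):
        path = tuple(int(i) for i in entry.get("module_id").split(","))
        ref = entry.get("preference")
        mapping[path] = None if ref == LOCAL_ONLY else preferences[ref]
    return mapping


mapping = load_configuration(CONFIG)
```

Modules absent from the mapping would keep choosing a scheme automatically at runtime, as stated in the example's final comment.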

Integration with job submission

09/05 meeting summary:

  • Store running jobs with a core component somewhere, by serializing the "job object" (currently "queue" object), instead of using the startup.xml ("runningJobsList" key)
  • IPython should provide a serializable job object with the profile and the task UUID
  • Resuming a pipeline: a suspended module would not run its upstream modules
  • Add UI to stop execution if only resumable modules are holding it
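
A serializable job object of the kind discussed in the meeting might look like the sketch below. The IPythonJob class, its field names and the JSON layout are purely hypothetical; the meeting summary only states that the object should carry the profile and the task UUID.

```python
import json
import uuid


class IPythonJob(object):
    """Hypothetical job record holding what is needed to find a task again."""

    def __init__(self, profile, task_uuid):
        self.profile = profile
        self.task_uuid = str(task_uuid)

    def serialize(self):
        return json.dumps({"scheme": "ipython",
                           "profile": self.profile,
                           "task_uuid": self.task_uuid},
                          sort_keys=True)

    @classmethod
    def deserialize(cls, text):
        data = json.loads(text)
        return cls(data["profile"], data["task_uuid"])


job = IPythonJob("poly_cluster", uuid.uuid4())
stored = job.serialize()          # what a core component would persist
resumed = IPythonJob.deserialize(stored)
```

Storing such a record through a core component, rather than under a key in startup.xml, would keep job state separate from user configuration.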