.. _chap-streaming:

**********************
Streaming in VisTrails
**********************

Streaming data may be useful for a number of reasons, such as to incrementally
update a visualization, or to process more data than would fit into memory.
VisTrails supports streaming data through the workflow. By implementing modules
that supports streaming, data items will be passed through the whole workflow
one at a time.

Using Streaming
===============

Streaming is similar to list handling (see Chapter
:ref:`chap-list_handling`). Modules that create streams should output a port
with list depth 1. Downstream modules that do not accept lists will be executed
once for each item in the stream. Modules with multiple input streams will
combine them pairwise. For this reason the input streams should contain the
same number of items (or be unlimited).

Modules accepting a type with list depth 1, but not supporting streaming, will
see their input collected into a list, and will execute after the streaming has
ended.

.. topic:: Try it Now!

  Let's use PythonSources to create a simple example that incrementally sums up
  a sequence of numbers. First we will create a module that streams the
  natural numbers up to some value. Create a new workflow and add a
  ``PythonSource`` module. Give it an input port named ``inputs`` of type
  Integer, which will specify the maximum number to stream, and an output port
  named ``out`` of type ``Integer`` with list depth 1, which will be the output
  stream. An output stream can be created by using
  ``self.set_streaming_output``, which takes the port name, an iterator object,
  and an optional length of the input items. To create an integer iterator we
  can use ``xrange``. Add this to the PythonSource:

.. code-block:: python

   self.set_streaming_output('out',
                             iter(xrange(inputs)),
                             inputs)

.. topic:: Next Step!

  Now let's create a module that processes each item in the sequence. Add a second
  ``PythonSource`` module below the first one. Give it an input port named
  ``integerStream`` of type Integer and list depth 1 that will be our input
  stream. An input stream can be captured by adding the magic string
  ``# pragma: streaming`` to the PythonSource code and calling ``self.set_streaming``
  with a generator method as argument. The generator method should take the
  module as an input. It should first initialize its value, in our case set
  ``intsum=0``. Then it should receive the inputs in a loop ending with yield.
  In each iteration the module will be updated to contain a new input in the
  stream. Similar to a normal module, the loop should:

  1. get inputs
  2. compute outputs
  3. set outputs
  4. call ``yield``

  Below is the complete example. Add it to the PythonSource.

.. code-block:: python

  # pragma: streaming - This tag is magic, do not change

  def generator(module):
      intsum = 0
      while 1:
          i = module.get_input('integerStream')
          intsum += i
          print "Sum so far:", intsum
          yield

  self.set_streaming(generator)

.. topic:: Finally:

  Connect the two PythonSources, set ``inputs`` to 100 in the first
  PythonSource, open the VisTrails console and execute. See how the output is
  printed to the console while the stream runs and how the progress of the
  modules increase. The output should look like this: :vtl:`(open in vistrails)
  <streaming.vt>`

.. code-block:: python

  Sum so far: 0
  Sum so far: 1
  Sum so far: 3
  ...
  Sum so far: 4851
  Sum so far: 4950