marv

Creating datasets

Dataset scanner.

Datasets are created based on information provided by scanners. A scanner is responsible for grouping files into named datasets:

import os

from marv_api import DatasetInfo

def scan(dirpath, dirnames, filenames):
    # One dataset per CSV file, named after the file.
    return [DatasetInfo(os.path.basename(x), [x])
            for x in filenames
            if x.endswith('.csv')]

Scanners are called for every directory within the configured scanroots. Files and directories whose names start with a . are ignored, as are directories containing a .marvignore file (which may be empty); ignored directories are not traversed into.
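
The traversal rules above can be approximated with os.walk(). The following is only a sketch of the described behaviour, not marv's actual implementation; the helper name walk_visible is made up for illustration.

```python
import os


def walk_visible(scanroot):
    """Yield (dirpath, dirnames, filenames) for directories a scanner would see.

    Illustration only: mimics the documented ignore rules, not marv's code.
    """
    for dirpath, dirnames, filenames in os.walk(scanroot):
        if '.marvignore' in filenames:
            dirnames[:] = []  # skip this directory and everything below it
            continue
        # Drop hidden entries; pruning dirnames in-place keeps os.walk
        # from descending into hidden directories.
        dirnames[:] = sorted(d for d in dirnames if not d.startswith('.'))
        visible = sorted(f for f in filenames if not f.startswith('.'))
        yield dirpath, dirnames, visible
```

Each yielded triple has the same shape as the arguments of a scan() function, so the generator could be used to try out a scanner against a local directory tree.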

Further, traversal into subdirectories can be controlled by altering the dirnames list in-place. To block further traversal, e.g. for a directory-based dataset type, set it to an empty list – os.walk() is used behind the scenes:

dirnames[:] = []
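
As a sketch of a directory-based dataset type: the scanner below treats every directory containing a metadata.yaml file as one dataset and stops traversal beneath it. The marker file name and the collect() driver are assumptions made for illustration, and the fallback DatasetInfo exists only so the snippet runs without marv installed.

```python
import os
from collections import namedtuple

try:
    from marv_api import DatasetInfo
except ImportError:
    # Assumed stand-in mirroring the documented fields (name, files).
    DatasetInfo = namedtuple('DatasetInfo', ['name', 'files'])


def scan(dirpath, dirnames, filenames):
    """Group a whole directory into one dataset (hypothetical layout)."""
    if 'metadata.yaml' not in filenames:
        return []
    dirnames[:] = []  # in-place: do not descend below a dataset directory
    return [DatasetInfo(os.path.basename(dirpath), sorted(filenames))]


def collect(scanroot):
    """Toy driver calling scan() for each directory; not marv's own loop."""
    found = []
    for dirpath, dirnames, filenames in os.walk(scanroot):
        found.extend(scan(dirpath, dirnames, filenames))
    return found
```

Assigning to dirnames[:] rather than to dirnames matters: only the in-place change is visible to os.walk(), which holds a reference to the same list.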

class marv_api.scanner.DatasetInfo(name, files)

Bases: tuple

files

Alias for field number 1

name

Alias for field number 0
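
Because DatasetInfo is a tuple with two named fields, it can be indexed, unpacked, or accessed by attribute. A small sketch follows; the fallback definition is an assumption that mirrors the documented fields and is only there so the snippet runs without marv installed.

```python
from collections import namedtuple

try:
    from marv_api import DatasetInfo
except ImportError:
    # Assumed stand-in mirroring the documented fields (name, files).
    DatasetInfo = namedtuple('DatasetInfo', ['name', 'files'])

info = DatasetInfo('run1.csv', ['run1.csv'])

name, files = info    # tuple unpacking
same_name = info[0]   # field number 0 is name
same_files = info[1]  # field number 1 is files
```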

Declaring nodes

marv_api.input(name, default=<NOTSET>, foreach=None, type=None)

Declare input for a node.

Plain inputs, that is, plain Python objects, are passed directly to the node. Streams generated by other nodes are requested instead, and the node is instantiated once the handles of all input streams are available.

Parameters
  • name (str) – Name of the node function argument the input will be passed to.

  • default – An optional default value for the input. This can be any Python object or another node.

  • foreach (bool) – This parameter is currently not supported and is for internal use only.

  • type – Stream message type.

Returns

The original function decorated with this input specification. A function is turned into a node by the node() decorator.

Raises

TypeError – If type not supported for input stream.

marv_api.node(schema=None, group=None, version=None)

Turn function into node.

Parameters
  • schema – capnproto schema describing the format of the output messages.

  • group (bool) – A boolean indicating whether the default stream of the node is a group, meaning it will be used to publish handles for streams or further groups. In case of marv.input.foreach specifications this flag will default to True. This parameter is currently only for internal usage.

  • version (int) – This parameter currently has no effect.

Returns

A Node instance according to the given arguments and input() decorators.

Raises

TypeError – If the decorator is not called, the function is decorated twice, or the function is not a generator.

Interacting with marv

exception marv_api.Abort

Bases: Exception

class marv_api.DatasetInfo(name, files)

Bases: tuple

files

Alias for field number 1

name

Alias for field number 0

exception marv_api.InputNameCollisionError

Bases: Exception

An input with the same name has already been declared.

exception marv_api.ReaderError

Bases: Exception

A file could not be read; the full node run is aborted.

exception marv_api.ResourceNotFoundError

Bases: Exception

Requested resource could not be found.

marv_api.create_group(name, **header)

marv_api.create_stream(name, **header)

Create a stream for publishing messages.

All keyword arguments will be used to form the header.

marv_api.get_logger()

marv_api.get_requested()

marv_api.get_resource_path(name)

Request path to resource from site/resources.

Treat resource as readonly, do NOT modify.

Parameters

name (str) – Name of resource, interpreted as path relative to resource directory.

Return type

GetResourcePath

Returns

GetResourcePath request to yield to marv.

marv_api.make_file(name)

marv_api.pull(handle, enumerate=False)

Pull next message for handle.

Parameters
  • handle – A stream.Handle or GroupHandle.

  • enumerate (bool) – boolean to indicate whether a tuple (idx, msg) should be returned, not unlike Python’s enumerate().

Returns

A Pull task to be yielded. Marv will send the corresponding message as soon as it is available. For groups this message will be a handle to a member of the group. Members of groups are either streams or groups.

Examples

Pulling (enumerated) message from stream:

msg = yield marv.pull(stream)
idx, msg = yield marv.pull(stream, enumerate=True)

Pulling stream from group and message from stream:

stream = yield marv.pull(group)  # a group of streams
msg = yield marv.pull(stream)
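
The yield-based exchange in these examples can be pictured with plain Python generators. The following toy driver is only an illustration of the idea: Pull, Push, and drive() are stand-ins invented here, not marv's task classes or scheduler, and treating None as the end-of-stream marker is an assumption.

```python
from collections import namedtuple

# Stand-ins for illustration only; marv's real task objects differ.
Pull = namedtuple('Pull', ['handle'])
Push = namedtuple('Push', ['msg'])


def doubler(stream):
    """Node-like generator: pull until the stream ends, push doubled values."""
    while True:
        msg = yield Pull(stream)
        if msg is None:  # assumption: None signals the end of the stream
            return
        yield Push(msg * 2)


def drive(gen, streams):
    """Toy driver answering Pull tasks from in-memory lists."""
    iterators = {name: iter(msgs) for name, msgs in streams.items()}
    pushed = []
    try:
        task = next(gen)
        while True:
            if isinstance(task, Pull):
                # Send the next message (or None) back into the generator.
                task = gen.send(next(iterators[task.handle], None))
            else:
                pushed.append(task.msg)
                task = next(gen)
    except StopIteration:
        pass
    return pushed
```

Calling drive(doubler('numbers'), {'numbers': [1, 2, 3]}) collects the pushed values in order. The point is that the value of each yield expression is whatever the driver sends back, which is how a pulled message can arrive as the result of a yield.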

marv_api.pull_all(*handles)

Pull next message of all handles.

marv_api.push(msg)

marv_api.select(node, name)

Select specific stream of a node by name.

Parameters
  • node – A node producing a group of streams.

  • name (str) – Name of stream to select.

Returns

Node outputting selected stream.

marv_api.set_header(**header)

Set the header of a stream or group.