marv
Creating datasets
Dataset scanner.
Datasets are created based on information provided by scanners. A scanner is responsible for grouping files into named datasets:
import os

from marv_api import DatasetInfo

def scan(dirpath, dirnames, filenames):
    # Each CSV file in the directory becomes its own single-file dataset.
    return [DatasetInfo(os.path.basename(x), [x])
            for x in filenames
            if x.endswith('.csv')]
Scanners are called for every directory within the configured scanroots. Files and directories starting with a . and directories containing an (empty) .marvignore file are ignored and will not be traversed into.
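The ignore rules above can be illustrated with a small stand-alone sketch. It is not marv's implementation: walk_scanroot and list_csv are hypothetical names, and the sketch simply assumes that hidden entries are filtered out before the scanner is called and that a directory holding a .marvignore file is skipped together with everything below it.

```python
import os
import tempfile


def walk_scanroot(scanroot, scan):
    """Call scan() for each directory, applying the ignore rules described above."""
    results = []
    for dirpath, dirnames, filenames in os.walk(scanroot):
        # A directory containing a .marvignore file is skipped entirely.
        if '.marvignore' in filenames:
            dirnames[:] = []
            continue
        # Hidden entries are dropped before the scanner sees them.
        dirnames[:] = sorted(d for d in dirnames if not d.startswith('.'))
        filenames = sorted(f for f in filenames if not f.startswith('.'))
        results.extend(scan(dirpath, dirnames, filenames))
    return results


def list_csv(dirpath, dirnames, filenames):
    """Hypothetical scanner that just reports the CSV files it is shown."""
    return [os.path.join(dirpath, f) for f in filenames if f.endswith('.csv')]


with tempfile.TemporaryDirectory() as root:
    for rel in ['a/one.csv', 'a/.hidden.csv', '.cache/two.csv',
                'skipped/.marvignore', 'skipped/three.csv',
                'skipped/deep/four.csv']:
        path = os.path.join(root, *rel.split('/'))
        os.makedirs(os.path.dirname(path), exist_ok=True)
        open(path, 'w').close()
    found = [os.path.relpath(p, root) for p in walk_scanroot(root, list_csv)]
```

Only the visible file in the visible, non-ignored directory reaches the scanner; the hidden file, the hidden directory, and the whole subtree below the .marvignore marker are never reported.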
Further, traversal into subdirectories can be controlled by altering the dirnames list in-place. To block further traversal, e.g. for a directory-based dataset type, set it to an empty list – os.walk() is used behind the scenes:
dirnames[:] = []
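As a sketch of what a directory-based scanner might look like, the example below drives a scanner with plain os.walk(). The namedtuple is a stand-in for marv_api.DatasetInfo so the code runs without marv installed, and the metadata.yaml marker file is purely an assumption made for illustration.

```python
import os
import tempfile
from collections import namedtuple

# Stand-in for marv_api.DatasetInfo so the sketch runs without marv installed.
DatasetInfo = namedtuple('DatasetInfo', ['name', 'files'])


def scan(dirpath, dirnames, filenames):
    """Treat a directory holding metadata.yaml (hypothetical marker) as one dataset."""
    if 'metadata.yaml' not in filenames:
        return []
    # The directory is a dataset of its own: do not descend any further.
    dirnames[:] = []
    return [DatasetInfo(os.path.basename(dirpath), sorted(filenames))]


with tempfile.TemporaryDirectory() as root:
    for rel in ['run1/metadata.yaml', 'run1/data.bin',
                'run1/extra/metadata.yaml', 'misc/notes.txt']:
        path = os.path.join(root, *rel.split('/'))
        os.makedirs(os.path.dirname(path), exist_ok=True)
        open(path, 'w').close()

    visited = []
    datasets = []
    for dirpath, dirnames, filenames in os.walk(root):
        dirnames.sort()  # in-place, so the walk order is deterministic
        visited.append(os.path.relpath(dirpath, root))
        datasets.extend(scan(dirpath, dirnames, filenames))
```

Because the list is emptied in place, os.walk() never enters run1/extra, so the nested marker file does not produce a second dataset. Rebinding the name (dirnames = []) would not have this effect, since os.walk() keeps its own reference to the original list.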
Declaring nodes
- marv_api.input(name, default=<NOTSET>, foreach=None, type=None)
Declare input for a node.
Plain inputs, that is, plain Python objects, are passed directly to the node, whereas streams generated by other nodes are requested first. Once the handles of all input streams are available, the node is instantiated.
- marv_api.node(schema=None, group=None, version=None)
Turn function into node.
- Parameters
schema – capnproto schema describing the format of the output messages
group (bool) – A boolean indicating whether the default stream of the node is a group, meaning it will be used to publish handles for streams or further groups. In case of marv.input.foreach specifications this flag will default to True. This parameter is currently only for internal usage.
- Returns
A Node instance according to the given arguments and input() decorators.
- Raises
TypeError – If not called, double decorated, or not a generator.
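The three TypeError cases can be made concrete with a toy decorator factory. This is only a sketch of how such checks could be written, not marv's actual code: toy_node, the _is_toy_node marker attribute, and the error messages are all hypothetical.

```python
import inspect


def toy_node(schema=None, group=None, version=None):
    """Toy stand-in for a node decorator factory; not marv's implementation."""
    # Written as @toy_node (no parentheses), the function arrives as `schema`.
    if inspect.isfunction(schema):
        raise TypeError('decorator must be called: @toy_node()')

    def decorator(func):
        if getattr(func, '_is_toy_node', False):
            raise TypeError('function is already decorated')
        if not inspect.isgeneratorfunction(func):
            raise TypeError('function must be a generator')
        func._is_toy_node = True
        return func

    return decorator


def raises_type_error(action):
    """Return True if calling action() raises TypeError."""
    try:
        action()
    except TypeError:
        return True
    return False


@toy_node()
def good():
    yield 'message'


def plain():
    return 'not a generator'


not_called = raises_type_error(lambda: toy_node(plain))
double_decorated = raises_type_error(lambda: toy_node()(good))
not_generator = raises_type_error(lambda: toy_node()(plain))
```

In this sketch all three flags end up True, while good remains an ordinary generator function that can still be iterated.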
Interacting with marv
- class marv_api.DatasetInfo(name, files)
Bases: tuple
- files
Alias for field number 1
- name
Alias for field number 0
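Since the class is tuple-based with name at index 0 and files at index 1, it can be accessed by attribute, by index, or by unpacking. The snippet below uses a namedtuple stand-in with the same two fields so it runs without marv installed; the file name is made up.

```python
from collections import namedtuple

# Stand-in with the same field order as marv_api.DatasetInfo(name, files).
DatasetInfo = namedtuple('DatasetInfo', ['name', 'files'])

info = DatasetInfo('drive.csv', ['drive.csv'])

# Attribute access and positional access refer to the same fields.
same_name = info.name == info[0]
same_files = info.files == info[1]

# Being a tuple, an instance can also be unpacked.
name, files = info
```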
- exception marv_api.InputNameCollisionError
Bases: Exception
An input with the same name has already been declared.
- exception marv_api.ReaderError
Bases: Exception
A file could not be read; the full node run is aborted.
- exception marv_api.ResourceNotFoundError
Bases: Exception
Requested resource could not be found.
- marv_api.create_group(name, **header)
- marv_api.create_stream(name, **header)
Create a stream for publishing messages.
All keyword arguments will be used to form the header.
- marv_api.get_logger()
- marv_api.get_requested()
- marv_api.get_resource_path(name)
Request path to resource from site/resources.
Treat the resource as read-only; do NOT modify it.
- marv_api.make_file(name)
- marv_api.pull(handle, enumerate=False)
Pull next message for handle.
- Parameters
- Returns
A Pull task to be yielded. Marv will send the corresponding message as soon as it is available. For groups this message will be a handle to a member of the group. Members of groups are either streams or groups.
Examples
Pulling (enumerated) message from stream:
msg = yield marv.pull(stream)
idx, msg = yield marv.pull(stream, enumerate=True)
Pulling stream from group and message from stream:
stream = yield marv.pull(group)  # a group of streams
msg = yield marv.pull(stream)
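The yield-based protocol in these examples can be modelled with plain Python generators. The following is a toy model under stated assumptions, not marv's scheduler: Pull, pull, consumer, and drive are stand-ins defined here, streams are plain lists keyed by a name, and enumeration is assumed to start at 0.

```python
from collections import namedtuple

# Stand-in for the task object returned by pull(); fields are assumptions.
Pull = namedtuple('Pull', ['handle', 'enumerate'])


def pull(handle, enumerate=False):
    """Create a task describing which stream the generator wants to read."""
    return Pull(handle, enumerate)


def consumer(stream):
    """Node-like generator: pulls two messages and records what it received."""
    received = []
    msg = yield pull(stream)
    received.append(msg)
    idx, msg = yield pull(stream, enumerate=True)
    received.append((idx, msg))
    return received


def drive(gen, streams):
    """Answer each yielded Pull task with the next message of that stream."""
    positions = {name: 0 for name in streams}
    try:
        task = next(gen)
        while True:
            idx = positions[task.handle]
            positions[task.handle] += 1
            msg = streams[task.handle][idx]
            task = gen.send((idx, msg) if task.enumerate else msg)
    except StopIteration as stop:
        # The generator's return value travels on the StopIteration.
        return stop.value


result = drive(consumer('imu'), {'imu': ['a', 'b']})
```

The generator never reads the stream itself; it only yields a task and resumes when the driver sends the answer back, which is why every pull in the examples appears on the right-hand side of a yield.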
- marv_api.pull_all(*handles)
Pull next message of all handles.
- marv_api.push(msg)
- marv_api.select(node, name)
Select specific stream of a node by name.
- marv_api.set_header(**header)
Set the header of a stream or group.