# Copyright 2016 - 2020 Ternaris.
# SPDX-License-Identifier: AGPL-3.0-only
from contextvars import ContextVar
from capnp.lib.capnp import KjException
from marv_pycapnp import Wrapper
from .iomsgs import (
CreateStream,
GetLogger,
GetRequested,
GetResourcePath,
Handle,
MakeFile,
Pull,
PullAll,
Push,
SetHeader,
)
from .utils import err
# Context variable consulted by push() to obtain the schema for the current
# context. It is created without a default, so NODE_SCHEMA.get() raises
# LookupError until a value has been set.
# NOTE(review): annotated as holding a dict, yet push() compares the value
# against None and reads ``.schema.node`` from it -- confirm the intended type.
NODE_SCHEMA: ContextVar[dict] = ContextVar('NODE_SCHEMA')
class Abort(Exception):  # noqa: N818
    """Plain :class:`Exception` subclass that adds no behavior of its own.

    NOTE(review): presumably raised to abort processing -- confirm against
    the callers, none of which are visible in this module.
    """
class ReaderError(Exception):
    """Signal that a file could not be read; the full node run is aborted."""
class ResourceNotFoundError(Exception):
    """Signal that a requested resource could not be found."""
def create_stream(name, **header):
    """Build the request that creates a stream for publishing messages.

    Args:
        name: Name of the stream; must be a ``str``.
        **header: Every keyword argument becomes an entry of the header.

    Returns:
        A ``CreateStream`` built with ``parent=None`` and ``group=False``.

    Raises:
        AssertionError: If ``name`` is not a ``str``; carries ``name``.

    """
    assert isinstance(name, str), name
    request = CreateStream(parent=None, name=name, group=False, header=header)
    return request
def create_group(name, **header):
    """Build the request that creates a group.

    Args:
        name: Name of the group; must be a ``str``.
        **header: Every keyword argument becomes an entry of the header.

    Returns:
        A ``CreateStream`` built with ``parent=None`` and ``group=True``.

    Raises:
        AssertionError: If ``name`` is not a ``str``; carries ``name``.

    """
    assert isinstance(name, str), name
    request = CreateStream(parent=None, name=name, group=True, header=header)
    return request
def get_logger():
    """Build a ``GetLogger`` request; it takes no arguments."""
    request = GetLogger()
    return request
def get_requested():
    """Build a ``GetRequested`` request; it takes no arguments."""
    request = GetRequested()
    return request
def get_resource_path(name: str) -> GetResourcePath:
    """Request the path to a resource from site/resources.

    The resource is to be treated as read-only, do NOT modify it.

    Args:
        name: Name of resource, interpreted as path relative to the
            resource directory.

    Returns:
        GetResourcePath request to yield to marv.

    """
    # Nothing is validated here: validation happens once the request is
    # processed, and an exception is thrown into the node at that point.
    request = GetResourcePath(name)
    return request
def make_file(name):
    """Build a ``MakeFile`` request for ``name``.

    Args:
        name: Must be a ``str``. It is passed as the second positional
            argument of ``MakeFile``, the first one being ``None``.

    Returns:
        The ``MakeFile`` instance.

    Raises:
        AssertionError: If ``name`` is not a ``str``.

    """
    assert isinstance(name, str)
    request = MakeFile(None, name)
    return request
def pull(handle, enumerate=False):  # noqa: A002  (public keyword name, kept for callers)
    """Request the next message for a handle.

    Args:
        handle: A :class:`.stream.Handle` or GroupHandle.
        enumerate (bool): Whether a tuple ``(idx, msg)`` should be
            returned instead of the bare message, similar to Python's
            enumerate().

    Returns:
        A :class:`Pull` task to be yielded. Marv will send the
        corresponding message as soon as it is available. For groups
        this message will be a handle to a member of the group.
        Members of groups are either streams or groups.

    Raises:
        AssertionError: If ``handle`` is not a ``Handle``; carries
            ``handle``.

    Examples:
        Pulling (enumerated) message from stream::

            msg = yield marv.pull(stream)
            idx, msg = yield marv.pull(stream, enumerate=True)

        Pulling stream from group and message from stream::

            stream = yield marv.pull(group)  # a group of streams
            msg = yield marv.pull(stream)

    """
    assert isinstance(handle, Handle), handle
    task = Pull(handle, enumerate)
    return task
def pull_all(*handles):
    """Request the next message of all handles.

    Args:
        *handles: Handles to pull from; they are forwarded together as
            one tuple, without any validation in this function.

    Returns:
        A ``PullAll`` built from the tuple of handles.

    """
    task = PullAll(handles)
    return task
def push(msg):
    """Build a ``Push`` request for ``msg``, applying the schema if needed.

    The schema is read from ``NODE_SCHEMA``. When it is not ``None`` and
    ``msg`` is not a ``Wrapper`` yet, ``msg`` is converted with
    ``Wrapper.from_dict(schema, msg)`` before being wrapped in ``Push``.

    Args:
        msg: A ``Wrapper``, or data to be converted by
            ``Wrapper.from_dict`` for the schema.

    Returns:
        ``Push`` holding the (possibly converted) message.

    Raises:
        KjException: Re-raised after the violation was reported via
            ``err`` when the conversion fails.
        LookupError: If ``NODE_SCHEMA`` holds no value in the current
            context (it is declared without a default).

    """
    schema = NODE_SCHEMA.get()

    # Nothing to convert: either there is no schema or msg is wrapped already.
    if schema is None or isinstance(msg, Wrapper):
        return Push(msg)

    try:
        wrapped = Wrapper.from_dict(schema, msg)
    except KjException:
        # Imported lazily, it is needed on this error path only.
        from pprint import pformat  # pylint: disable=import-outside-toplevel

        schema_node = schema.schema.node
        err(
            f'Schema violation for {schema_node.displayName} with data:\n'
            f'{pformat(msg)}\nschema: {schema_node.displayName}',
        )
        raise
    return Push(wrapped)