# Copyright 2016 - 2020 Ternaris.
# SPDX-License-Identifier: AGPL-3.0-only
from inspect import getfullargspec, isfunction, isgeneratorfunction
from pydantic import Field
from .dag import Inputs, Node, Stream
from .utils import NOTSET, exclusive_setitem, popattr
[docs]def node(schema=None, group=None, version=None):
"""Turn function into node.
Args:
schema: capnproto schema describing the output messages format
group (bool): A boolean indicating whether the default stream
of the node is a group, meaning it will be used to
published handles for streams or further groups. In case
of :paramref:`marv.input.foreach` specifications this flag will
default to `True`. This parameter is currently only for
internal usage.
version (int): This parameter currently has no effect.
Returns:
A :class:`Node` instance according to the given
arguments and :func:`input` decorators.
Raises:
TypeError: If not called, double decorated, or not generator.
"""
if hasattr(schema, 'from_bytes_packed'): # capnp schema
# There are two known variations:
# - marv_nodes/types.capnp:Dataset
# - marv_api.tests.types_capnp:Test
schema = schema.schema.node.displayName.replace('.capnp', '_capnp')\
.replace('/', '.')
elif isfunction(schema):
raise TypeError('Decorator must be called @marv.node() before being applied.')
def deco(func):
if hasattr(func, '__marv_node__'):
raise TypeError('Attempted to convert function into node twice.')
if not isgeneratorfunction(func):
raise TypeError(f'{func} needs to be a generator function')
foreach = popattr(func, '__marv_foreach__', None)
inputs = popattr(func, '__marv_inputs__', {})
argspec = getfullargspec(func)
missing = inputs.keys() ^ argspec.args
unsupported = {
x for x in (
'varargs',
'varkw',
'defaults',
'kwonlyargs',
'kwonlydefaults',
'annotations',
) if getattr(argspec, x)
}
if missing:
raise TypeError(f'Missing input declarations: {missing}')
if unsupported:
raise TypeError('Only positional arguments allowed in function signature')
func.__marv_inputs__ = Inputs.subclass(func.__module__, **inputs)
func.__marv_inputs__.__qualname__ = f'{func.__qualname__}.__marv_inputs__'
func.__marv_node__ = Node(
function=f'{func.__module__}.{func.__qualname__}',
inputs=func.__marv_inputs__(),
message_schema=schema,
group=group,
version=version,
foreach=foreach,
)
func.clone = func.__marv_node__.clone
return func
return deco
# NOTE: Strictly speaking not a decorator but related to decoration of node functions
[docs]def select(node, name): # pylint: disable=redefined-outer-name
"""Select specific stream of a node by name.
Args:
node: A node producing a group of streams.
name (str): Name of stream to select.
Returns:
Node outputting selected stream.
"""
return Stream(node=node.__marv_node__, name=name)
# NOTE: Strictly speaking not a decorator but related to decoration of node functions
def getdag(node): # pylint: disable=redefined-outer-name
return node.__marv_node__