# Copyright 2016 - 2018 Ternaris.
# SPDX-License-Identifier: AGPL-3.0-only
import json
from pathlib import Path
import marv_api as marv
import marv_nodes
from marv_api.types import Section, Widget
from marv_detail import make_map_dict
from .bag import bagmeta
from .cam import ffmpeg, images
from .gnss import gnss_plots
from .trajectory import trajectory
# pylint: disable=redefined-outer-name
[docs]@marv.node(Widget)
@marv.input('dataset', default=marv_nodes.dataset)
@marv.input('bagmeta', default=bagmeta)
def summary_keyval(dataset, bagmeta):
"""Keyval widget summarizing bag metadata.
Useful for detail_summary_widgets.
"""
dataset, bagmeta = yield marv.pull_all(dataset, bagmeta)
yield marv.push(
{
'keyval': {
'items': [
{
'title': 'size',
'formatter': 'filesize',
'list': False,
'cell': {
'uint64': sum(x.size for x in dataset.files),
},
},
{
'title': 'files',
'list': False,
'cell': {
'uint64': len(dataset.files),
},
},
{
'title': 'start time',
'formatter': 'datetime',
'list': False,
'cell': {
'timestamp': bagmeta.start_time,
},
},
{
'title': 'end time',
'formatter': 'datetime',
'list': False,
'cell': {
'timestamp': bagmeta.end_time,
},
},
{
'title': 'duration',
'formatter': 'timedelta',
'list': False,
'cell': {
'timedelta': bagmeta.duration,
},
},
],
},
},
)
[docs]@marv.node(Section)
@marv.input('title', default='Position and Orientation')
@marv.input('plots', default=gnss_plots)
def gnss_section(plots, title):
"""Section displaying GNSS plots."""
# tmps = []
# tmp = yield marv.pull(plots)
# while tmp:
# tmps.append(tmp)
# tmp = yield marv.pull(plots)
# plots = tmps
# TODO: no foreaching right now
plots = [plots]
widgets = []
for plot in plots:
plotfile = yield marv.pull(plot)
if plotfile:
widgets.append({'title': plot.title, 'image': {'src': plotfile.relpath}})
assert len({x['title'] for x in widgets}) == len(widgets)
if widgets:
yield marv.push({'title': title, 'widgets': widgets})
[docs]@marv.node(Widget)
@marv.input('stream', foreach=images) # images is a stream of streams of images
def galleries(stream):
"""Galleries for all images streams.
Used by marv_robotics.detail.images_section.
"""
yield marv.set_header(title=stream.title)
images = []
while img := (yield marv.pull(stream)):
images.append({'src': img.relpath})
if images:
yield marv.push({'title': stream.title, 'gallery': {'images': images}})
[docs]@marv.node(Section)
@marv.input('title', default='Images')
@marv.input('galleries', default=galleries)
def images_section(galleries, title):
"""Section with galleries of images for each images stream."""
tmp = []
while msg := (yield marv.pull(galleries)):
tmp.append(msg)
if not tmp:
raise marv.Abort()
galleries = tmp
galleries = sorted(galleries, key=lambda x: x.title)
widgets = yield marv.pull_all(*galleries)
if widgets:
yield marv.push({'title': title, 'widgets': widgets})
[docs]@marv.node(Section)
@marv.input('title', default='Connections')
@marv.input('bagmeta', default=bagmeta)
@marv.input('dataset', default=marv_nodes.dataset)
def connections_section(bagmeta, dataset, title):
"""Section displaying information about ROS connections."""
dataset, bagmeta = yield marv.pull_all(dataset, bagmeta)
if not bagmeta.topics:
raise marv.Abort()
columns = [
{
'title': 'Topic',
},
{
'title': 'Type',
},
{
'title': 'MD5',
},
{
'title': 'Latching',
},
{
'title': 'Message count',
'align': 'right',
},
]
rows = [
{
'id': idx,
'cells': [
{
'text': con.topic,
},
{
'text': con.datatype,
},
{
'text': con.md5sum,
},
{
'bool': con.latching,
},
{
'uint64': con.msg_count,
},
],
} for idx, con in enumerate(bagmeta.connections)
]
widgets = [{'table': {'columns': columns, 'rows': rows}}]
# TODO: Add text widget explaining what can be seen here: ROS bag
# files store connections. There can be multiple connections for
# one topic with the same or different message types and message
# types with the same name might have different md5s. For
# simplicity connections with the same topic, message type and md5
# are treated as one, within one bag file as well as across bags
# of one set. If one of such connections is latching, the
# aggregated connection will be latching.
yield marv.push({'title': title, 'widgets': widgets})
[docs]@marv.node(Section)
@marv.input('title', default='Trajectory')
@marv.input('geojson', default=trajectory)
@marv.input('minzoom', default=-30)
@marv.input('maxzoom', default=40)
@marv.input('tile_server_protocol', default='')
def trajectory_section(geojson, title, minzoom, maxzoom, tile_server_protocol):
"""Section displaying trajectory on a map.
Args:
title (str): Detail section title.
geojson: Stream with one GeoJson message.
minzoom (int): Minimum zoom level.
maxzoom (int): Maximum zoom level.
tile_server_protocol (str): Set to ``https:`` if you host marv
behind http and prefer the tile requests to be secured.
Yields:
Trajectory section.
Raises:
Abort: If stream is empty.
"""
geojson = yield marv.pull(geojson)
if not geojson:
raise marv.Abort()
suffix = '{z}/{x}/{y}.png' # noqa: FS003
osm_attribution = (
'© <a href="http://openstreetmap.org/copyright">OpenStreetMap</a> contributors'
)
arcgis_attribution = (
'Sources: Esri, DigitalGlobe, GeoEye, Earthstar Geographics, CNES/Airbus DS, '
'USDA, USGS, AeroGRID, IGN, and the GIS User Community'
)
layers = [
{
'title': 'Background',
'tiles': [
{
'title': 'Roadmap',
'url': (
f'{tile_server_protocol}//[abc].osm.ternaris.com'
f'/styles/osm-bright/rendered/{suffix}'
),
'attribution': osm_attribution,
'retina': 3,
'zoom': {
'min': 0,
'max': 20,
},
},
{
'title': 'Satellite',
'url': (
f'{tile_server_protocol}//server.arcgisonline.com'
f'/ArcGIS/rest/services/World_Imagery/MapServer/tile/{suffix}'
),
'attribution': arcgis_attribution,
'zoom': {
'min': 0,
'max': 18,
},
},
],
},
{
'title': 'Trajectory',
'color': (0., 1., 0., 1.),
'geojson': geojson,
},
]
dct = make_map_dict({
'layers': layers,
'zoom': {
'min': minzoom,
'max': maxzoom,
},
})
jsonfile = yield marv.make_file('data.json')
with open(jsonfile.path, 'w', encoding='utf-8') as f:
json.dump(dct, f, sort_keys=True)
yield marv.push(
{
'title': title,
'widgets': [{
'map_partial': f'marv-partial:{jsonfile.relpath}',
}],
},
)
[docs]@marv.node(Section)
@marv.input('title', default='Videos')
@marv.input('videos', default=ffmpeg)
def video_section(videos, title):
"""Section displaying one video player per image stream."""
tmps = []
while tmp := (yield marv.pull(videos)):
tmps.append(tmp)
videos = sorted(tmps, key=lambda x: x.title)
if not videos:
raise marv.Abort()
videofiles = yield marv.pull_all(*videos)
widgets = [
{
'title': video.title,
'video': {
'src': videofile.relpath,
},
} for video, videofile in zip(videos, videofiles) if videofile is not None
]
assert len({x['title'] for x in widgets}) == len(widgets)
if widgets:
yield marv.push({'title': title, 'widgets': widgets})