Source code for commonocean.common.file_writer

import datetime
import enum
import pathlib
import os
from typing import Union, List, Set
import numpy as np
import decimal
import warnings

from commonocean import SCENARIO_VERSION
from lxml import etree, objectify

from commonroad.common.util import Interval
from commonroad.geometry.shape import Rectangle, Circle, ShapeGroup, Polygon
from commonocean.prediction.prediction import SetBasedPrediction, TrajectoryPrediction
from commonocean.scenario.trajectory import Trajectory
from commonocean.scenario.state import GeneralState

from commonocean.planning.planning_problem import PlanningProblemSet, PlanningProblem
from commonocean.scenario.waters import Waters, WatersType, WatersNetwork, Waterway, Shallow
from commonocean.scenario.obstacle import ObstacleRole, ObstacleType, DynamicObstacle, StaticObstacle, Obstacle, \
    Occupancy, Shape
from commonocean.scenario.scenario import Scenario, Tag, Location, GeoTransformation, Weather, Environment, SeaState, TimeOfDay
from commonocean.scenario.traffic_sign import TrafficSign

# Tunneling from CR-IO #
from commonroad.common.writer.file_writer_xml import float_to_str as float_to_str_CR
from commonroad.common.writer.file_writer_xml import create_exact_node_float as create_exact_node_float_CR
from commonroad.common.writer.file_writer_xml import create_exact_node_int as create_exact_node_int_CR
from commonroad.common.writer.file_writer_xml import create_interval_node_float as create_interval_node_float_CR
from commonroad.common.writer.file_writer_xml import create_interval_node_int as create_interval_node_int_CR
from commonroad.common.writer.file_writer_xml import Point, Pointlist
########################

__author__ = "Hanna Krasowski, Benedikt Pfleiderer, Fabian Thomas-Barein"
__copyright__ = "TUM Cyber-Physical System Group"
__credits__ = ["ConVeY"]
__version__ = "2022a"
__maintainer__ = "Hanna Krasowski"
__email__ = "commonocean@lists.lrz.de"
__status__ = "released"

"""
File Writer for scenarios to commonocean xml-format
"""

class DecimalPrecision:
    """
    Represents the decimal precision of the process
    """

    decimals = 4

precision = DecimalPrecision

def float_to_str(f):
    """
    Convert the given float to a string,
    without resorting to scientific notation
    """
    return float_to_str_CR(f)


def create_exact_node_float(value: Union[int, float]) -> etree.Element:
    """
    creates element node for exact value
    :param value: exact value
    :return: node for exact value
    """
    return create_exact_node_float_CR(value)


def create_exact_node_int(value: Union[int]) -> etree.Element:
    """
    creates element node for exact value
    :param value: exact value
    :return: node for exact value
    """
    return create_exact_node_int_CR(value)


def create_interval_node_float(interval: Interval) -> List[etree.Element]:
    """
    creates ElementTree.Element for an interval
    :param interval:
    :return: list of Element nodes with start and end of interval
    """
    return create_interval_node_float_CR(interval)


def create_interval_node_int(interval: Interval) -> List[etree.Element]:
    """
    creates ElementTree.Element for an interval
    :param interval:
    :return: list of Element nodes with start and end of interval
    """
    return create_interval_node_int_CR(interval)


[docs]class OverwriteExistingFile(enum.Enum): """ Specifies whether an existing file will be overwritten or skipped """ ASK_USER_INPUT = 0 ALWAYS = 1 SKIP = 2
[docs]class CommonOceanFileWriter: def __init__( self, scenario: Scenario, planning_problem_set: PlanningProblemSet, author: str = None, affiliation: str = None, source: str = None, tags: Set[Tag] = None, location: Location = None, decimal_precision: int = 8, ): """ Initialize the FileWriter with a scenario and tags for the xml-header :param scenario: scenario that should be written later :param planning_problem_set: corresponding planning problem to the scenario :param author: author's name :param affiliation: affiliation of the author :param source: source of dataset (d.h. database, handcrafted, etc.) :param tags: list of keywords describing the scenario (e.g. waters type, required maneuver etc., see commonocean.cps.cit.tum.de for full list)) :param decimal_precision: number of decimal places used when writing float values """ assert not (author is None and scenario.author is None) assert not (affiliation is None and scenario.affiliation is None) assert not (source is None and scenario.source is None) assert not (tags is None and scenario.tags is None) self.scenario: Scenario = scenario self.planning_problem_set: PlanningProblemSet = planning_problem_set self._root_node = etree.Element('commonOcean') self.author = author if author is not None else scenario.author self.affiliation = affiliation if affiliation is not None else scenario.affiliation self.source = source if source is not None else scenario.source self.location = location if location is not None else scenario.location self.tags = tags if tags is not None else scenario.tags precision.decimals = decimal_precision @property def root_node(self): return self._root_node @root_node.setter def root_node(self, root_node): warnings.warn( '<CommonOceanFileWriter/root_node> root_node of CommonOceanFileWriter is immutable' ) @property def author(self): return self._author @author.setter def author(self, author): assert isinstance( author, str ), '<CommonOceanFileWriter/author> author must be a string, but has type {}'.format( type(author) ) self._author = author @property def affiliation(self): return self._affiliation @affiliation.setter def affiliation(self, affiliation): assert isinstance( affiliation, str ), '<CommonOceanFileWriter/affiliation> affiliation must be a string, but has type {}'.format( type(affiliation) ) self._affiliation = affiliation @property def source(self): return self._source @source.setter def source(self, source): assert isinstance( source, str ), '<CommonOceanFileWriter/source> source must be a string, but has type {}'.format( type(source) ) self._source = source @property def tags(self): return self._tags @tags.setter def tags(self, tags): for tag in tags: assert isinstance(tag, Tag), '<CommonOceanFileWriter/tags> tag must ' \ 'be a enum of type Tag, but has type {}'.format(type(tag)) self._tags = tags def _write_header(self): self._root_node.set('timeStepSize', str(self.scenario.dt)) self._root_node.set('commonOceanVersion', SCENARIO_VERSION) self._root_node.set('author', self.author) self._root_node.set('affiliation', self.affiliation) self._root_node.set('source', self.source) try: if self.scenario.scenario_id: self._root_node.set('benchmarkID', str(self.scenario.scenario_id)) except: self._root_node.set('benchmarkID', '-1') print('Warning: No scenario_id set.') self._root_node.set('date', datetime.datetime.today().strftime('%Y-%m-%d')) def _add_all_objects_from_scenario(self): if self.location is not None: self._root_node.append(LocationXMLNode.create_node(self.location)) else: self._root_node.append(LocationXMLNode.create_node(Location())) self._root_node.append(TagXMLNode.create_node(self.tags)) self._root_node.append(WatersNetworkXMLNode.create_node(self.scenario._waters_network)) for w in self.scenario.waterways: self._root_node.append(WaterwaysXMLNode.create_node(w)) for sign in self.scenario._waters_network.traffic_signs: self._root_node.append(TrafficSignXMLNode.create_node(sign)) for o in self.scenario.obstacles: self._root_node.append(ObstacleXMLNode.create_node(o)) for s in self.scenario.shallows: self._root_node.append(ShallowXMLNode.create_node(s)) def _add_all_planning_problems_from_planning_problem_set(self): for ( planning_problem ) in self.planning_problem_set.planning_problem_dict.values(): self._root_node.append(PlanningProblemXMLNode.create_node(planning_problem)) def _dump(self): rough_string = etree.tostring( self._root_node, pretty_print=True, encoding='UTF-8' ) rough_string = rough_string return rough_string
[docs] def write_to_file( self, filename: Union[str, None] = None, overwrite_existing_file: OverwriteExistingFile = OverwriteExistingFile.ASK_USER_INPUT, check_validity: bool = False ): """ Write a scenario including planning-problem. If file already exists, it will be overwritten of skipped :param filename: filename of the xml output file. If 'None', the Benchmark ID is taken :param overwrite_existing_file: Specify whether an already existing file should be overwritten or skipped :param check_validity: check xml file against .xsd definition :return: """ if filename is None: filename = str(self.scenario.scenario_id) if pathlib.Path(filename).is_file(): if overwrite_existing_file is OverwriteExistingFile.ASK_USER_INPUT: overwrite = input( 'File {} already exists, replace old file (or else skip)? (y/n)'.format( filename ) ) elif overwrite_existing_file is OverwriteExistingFile.SKIP: overwrite = 'n' else: overwrite = 'y' if overwrite is 'n': print('Writing of file {} skipped'.format(filename)) return else: print('Replace file {}'.format(filename)) self._write_header() self._add_all_objects_from_scenario() self._add_all_planning_problems_from_planning_problem_set() if check_validity: self.check_validity_of_commonocean_file(self._dump()) tree = etree.ElementTree(self._root_node) tree.write(filename, pretty_print=True, xml_declaration=True, encoding="utf-8")
[docs] def write_scenario_to_file( self, filename: Union[str, None] = None, overwrite_existing_file: OverwriteExistingFile = OverwriteExistingFile.ASK_USER_INPUT ): """ Write a scenario without planning-problem. If file already exists, it will be overwritten of skipped. :param filename: filename of the xml output file. If 'None', the Benchmark ID is taken :param OverwriteExistingFile: Specify whether an already existing file should be overwritten or skipped :return: None """ if filename is None: filename = str(self.scenario.scenario_id) if pathlib.Path(filename).is_file(): if overwrite_existing_file is OverwriteExistingFile.ASK_USER_INPUT: overwrite = input( 'File {} already exists, replace old file (or else skip)? (y/n)'.format( filename ) ) elif overwrite_existing_file is OverwriteExistingFile.SKIP: overwrite = 'n' else: overwrite = 'y' if overwrite is 'n': print( 'Writing skipped for file, since it already exists {}'.format( filename ) ) return else: print('Replace file {}'.format(filename)) self._write_header() self._add_all_objects_from_scenario() tree = etree.ElementTree(self._root_node) tree.write(filename, pretty_print=True, xml_declaration=True, encoding="utf-8")
[docs] @staticmethod def check_validity_of_commonocean_file(commonocean_str: str): """Check the validity of a generated xml_string in terms of commonocean with an existing XSD schema. Throw an error if it is not valid. Args: commonocean_str: XML formatted string which should be checked. """ with open( os.path.dirname(os.path.abspath(__file__)) + '/XML_commonOcean_XSD.xsd', 'rb', ) as schema_file: schema = etree.XMLSchema(etree.parse(schema_file)) parser = objectify.makeparser(schema=schema, encoding='utf-8') try: etree.fromstring(commonocean_str, parser) except etree.XMLSyntaxError as error: raise Exception( 'Could not produce valid CommmonOcean file! Error: {}'.format(error.msg) )
class LocationXMLNode: """ Class to create a XML element from a Location.""" @classmethod def create_node(cls, location: Location) -> etree.Element: """ Create XML-Node for a location :param location: location object :return: node """ location_node = etree.Element('location') geo_name_id_node = etree.Element("geoNameId") geo_name_id_node.text = str(location.geo_name_id) location_node.append(geo_name_id_node) gps_latitude_node = etree.Element("gpsLatitude") gps_latitude_node.text = str(location.gps_latitude) location_node.append(gps_latitude_node) gps_longitude_node = etree.Element("gpsLongitude") gps_longitude_node.text = str(location.gps_longitude) location_node.append(gps_longitude_node) if location.geo_transformation is not None: location_node.append(GeoTransformationXMLNode.create_node(location.geo_transformation)) if location.environment is not None: location_node.append(EnvironmentXMLNode.create_node(location.environment)) return location_node class GeoTransformationXMLNode: """ Class to create a XML element from a GeoTransformation.""" @classmethod def create_node(cls, geo_transformation: GeoTransformation) -> etree.Element: """ Create XML-Node for a location :param geo_transformation: GeoTransformation object :return: node """ geotransform_node = etree.Element('geoTransformation') geo_reference_node = etree.Element("geoReference") geo_reference_node.text = geo_transformation.geo_reference geotransform_node.append(geo_reference_node) additional_transformation_node = etree.Element('additionalTransformation') x_translation_node = etree.Element("xTranslation") x_translation_node.text = str(geo_transformation.x_translation) additional_transformation_node.append(x_translation_node) y_translation_node = etree.Element("yTranslation") y_translation_node.text = str(geo_transformation.y_translation) additional_transformation_node.append(y_translation_node) z_rotation_node = etree.Element("zRotation") z_rotation_node.text = str(geo_transformation.z_rotation) additional_transformation_node.append(z_rotation_node) scaling_node = etree.Element("scaling") scaling_node.text = str(geo_transformation.scaling) additional_transformation_node.append(scaling_node) geotransform_node.append(additional_transformation_node) return geotransform_node class EnvironmentXMLNode: """ Class to create a XML element from a Environment.""" @classmethod def create_node(cls, environment: Environment) -> etree.Element: """ Create XML-Node for a environment :param environment: Environment object :return: node """ environment_node = etree.Element('environment') if environment.time_of_day.value is not TimeOfDay.UNKNOWN: time_node = etree.Element('time') if environment.time.month < 10: time_node.text = str(environment.time.year) + "-0" + str(environment.time.month) else: time_node.text = str(environment.time.year) + "-" + str(environment.time.month) if environment.time.day < 10: time_node.text = time_node.text + "-0" + str(environment.time.day) else: time_node.text = time_node.text + "-" + str(environment.time.day) if environment.time.hours < 10: time_node.text = time_node.text + "-0" + str(environment.time.hours) else: time_node.text = time_node.text + "-0" + str(environment.time.hours) if environment.time.minutes < 10: time_node.text = time_node.text + ":0" + str(environment.time.minutes) else: time_node.text = time_node.text + ":" + str(environment.time.minutes) environment_node.append(time_node) time_of_day_node = etree.Element('timeOfDay') time_of_day_node.text = environment.time_of_day.value environment_node.append(time_of_day_node) if environment.weather.value is not Weather.UNKNOWN: weather_node = etree.Element('weather') weather_node.text = environment.weather.value environment_node.append(weather_node) if environment.seastate.value is not SeaState.UNKNOWN: underground_node = etree.Element('seaState') underground_node.text = environment.seastate.value environment_node.append(underground_node) return environment_node class TagXMLNode: """ Class to create a XML element from a Tag.""" @classmethod def create_node(cls, tags: Set[Tag]) -> etree.Element: """ Create XML-Node for a tag element :param tags: list of tags of the scenario :return: node """ tags_node = etree.Element('scenarioTags') for tag in tags: tags_node.append(etree.Element(tag.value)) return tags_node class WatersNetworkXMLNode: """ Class to create a XML element from a WatersNetwork.""" @classmethod def create_node(cls, watersnetwork: WatersNetwork) -> etree.Element: """ Create XML-Node for a waters network element :param watersnetwork: list of of the scenario :return: node """ watersnetwork_node = etree.Element('navigationableArea') watersnetwork_node.extend(ShapeXMLNode.create_node(watersnetwork.navigationable_area)) return watersnetwork_node class WaterwaysXMLNode: """ Class to create a XML element from a Waterway.""" @classmethod def create_node(cls, waters: Waterway) -> etree.Element: """ Create XML-Node for a Water :param waters: water for creating a node :return: node """ waters_node = etree.Element('waterway') waters_node.set('id', str(waters.waters_id)) left_boundary = etree.Element('leftBound') Pointlist.create_from_numpy_array(waters.left_vertices).add_points_to_node( left_boundary ) waters_node.append(left_boundary) right_boundary = etree.Element('rightBound') Pointlist.create_from_numpy_array(waters.right_vertices).add_points_to_node( right_boundary ) waters_node.append(right_boundary) waters_type_node = etree.Element('watersType') waters_type_node.text = str(waters.waters_type.value) waters_node.append(waters_type_node) for l in waters.predecessor: predecessor = etree.Element('predecessor') predecessor.set('ref', str(l)) waters_node.append(predecessor) for l in waters.successor: successor = etree.Element('successor') successor.set('ref', str(l)) waters_node.append(successor) if waters.traffic_signs: for traffic_sign in waters.traffic_signs: traffic_sign_node = TrafficSignXMLNode.create_ref_node(traffic_sign) waters_node.append(traffic_sign_node) return waters_node class ShallowXMLNode: """ Class to create a XML element from a Shallow.""" @classmethod def create_node(cls, shallow: Shallow) -> etree.Element: """ Create XML-Node for a Shallow :param shallow: shallow for creating a node :return: node """ shallow_node = etree.Element('shallow') shallow_node.set('id', str(shallow.waters_id)) waters_type_node = etree.Element('watersType') waters_type_node.text = str(shallow.waters_type.value) shallow_node.append(waters_type_node) shape_node = etree.Element('shape') shape_node.extend(ShapeXMLNode.create_node(shallow.shape)) shallow_node.append(shape_node) depth_node = etree.Element('depth') depth_node.append(create_exact_node_float(shallow.depth)) shallow_node.append(depth_node) return shallow_node class ObstacleXMLNode: """ Class to create a XML element from an Obstacle.""" @classmethod def create_node(cls, obstacle: Union[Obstacle, DynamicObstacle, StaticObstacle]) -> etree.Element: """ Create XML-Node for an Obstacle :param obstacle: Obstacle for creating a node :return: """ if type(obstacle) == DynamicObstacle: return DynamicObstacleXMLNode.create_node(obstacle) elif type(obstacle) == StaticObstacle: return StaticObstacleXMLNode.create_node(obstacle) else: raise Exception() @classmethod def create_obstacle_node_header( cls, obstacle_id: int, obstacle_role: ObstacleRole, obstacle_type: ObstacleType, depth: float ): """ Create the header of XML-Node for an Obstacle :param obstacle_id: id of the obstacle :param obstacle_role: role of the obstacle :param obstacle_type: type of the obstacle :return: node """ obstacle_node = etree.Element(obstacle_role.value+'Obstacle') obstacle_node.set('id', str(obstacle_id)) type_node = etree.Element('type') type_node.text = obstacle_type.value obstacle_node.append(type_node) depth_node = etree.Element('depth') depth_node.text = str(depth) obstacle_node.append(depth_node) return obstacle_node class StaticObstacleXMLNode: """ Class to create a XML element from a StaticObstacle.""" @classmethod def create_node(cls, static_obstacle: StaticObstacle) -> etree.Element: """ Create XML-Node for a StaticObstacle :param static_obstacle: static_obstacle for creating a node :return: node """ node = ObstacleXMLNode.create_obstacle_node_header( static_obstacle.obstacle_id, static_obstacle.obstacle_role, static_obstacle.obstacle_type, static_obstacle.depth ) shape_node = etree.Element('shape') shape_node.extend(ShapeXMLNode.create_node(static_obstacle.obstacle_shape)) node.append(shape_node) initial_state_node = etree.Element('initialState') StateXMLNode.create_state_node( static_obstacle.initial_state, initial_state_node, time_step=static_obstacle.initial_state.time_step, ) node.append(initial_state_node) return node class DynamicObstacleXMLNode: """ Class to create a XML element from a DynamicObstacle.""" @classmethod def create_node(cls, dynamic_obstacle: DynamicObstacle) -> etree.Element: """ Create XML-Node for a DynamicObstacle :param dynamic_obstacle: dynamic_obstacle for creating a node :return: node """ obstacle_node = ObstacleXMLNode.create_obstacle_node_header( dynamic_obstacle.obstacle_id, dynamic_obstacle.obstacle_role, dynamic_obstacle.obstacle_type, dynamic_obstacle.depth ) shape_node = etree.Element('shape') shape_node.extend( ShapeXMLNode.create_node( dynamic_obstacle.obstacle_shape, dynamic_obstacle_shape=True ) ) obstacle_node.append(shape_node) initial_state_node = etree.Element('initialState') StateXMLNode.create_state_node( dynamic_obstacle.initial_state, initial_state_node, time_step=dynamic_obstacle.initial_state.time_step, ) obstacle_node.append(initial_state_node) if isinstance(dynamic_obstacle.prediction, SetBasedPrediction): obstacle_node.append( cls._create_occupancy_node(dynamic_obstacle.prediction.occupancy_set) ) elif isinstance(dynamic_obstacle.prediction, TrajectoryPrediction): obstacle_node.append( cls._create_trajectory_node(dynamic_obstacle.prediction.trajectory) ) return obstacle_node @classmethod def _create_trajectory_node(cls, trajectory: Trajectory) -> etree.Element: """ Create XML-Node for a Trajectory :param trajectory: trajectory for creating a node :return: node """ traj_node = etree.Element('trajectory') for state in trajectory.state_list: state_node = etree.Element('state') traj_node.append( StateXMLNode.create_state_node(state, state_node, state.time_step) ) return traj_node @classmethod def _create_occupancy_node(cls, occupancy_set: List[Occupancy]) -> etree.Element: """ Create XML-Node for an occupancy_set :param occupancy_set: occupancy_set for creating a node :return: node """ occupancy_set_node = etree.Element('occupancySet') for occupancy in occupancy_set: occupancy_set_node.append(OccupancyXMLNode.create_node(occupancy)) return occupancy_set_node class OccupancyXMLNode: """ Class to create a XML element from a Occupancy.""" @classmethod def create_node(cls, occupancy: Occupancy) -> etree.Element: """ Create XML-Node for an Occupancy :param occupancy: occupancy for creating a node :return: node """ occupancy_node = etree.Element('occupancy') shape_node = etree.Element('shape') shape_node.extend(ShapeXMLNode.create_node(occupancy.shape)) occupancy_node.append(shape_node) time_node = etree.Element('time') time = occupancy.time_step if isinstance(occupancy.time_step, Interval): time_node.extend(create_interval_node_int(time)) else: time_node.append(create_exact_node_int(time)) occupancy_node.append(time_node) return occupancy_node class ShapeXMLNode: """ Class to create a XML element from a Shape.""" @classmethod def create_node(cls, shape, dynamic_obstacle_shape=False) -> List[etree.Element]: """ Create XML-Node for a shape :param shape: shape for creating a node :param dynamic_obstacle_shape: specify whether the shape belongs to an dynamic obstacle or not :return: node """ if type(shape) == ShapeGroup: shape_node_list = [] for s in shape.shapes: shape_node_list.append( cls._create_single_element(s, dynamic_obstacle_shape) ) else: shape_node = cls._create_single_element(shape, dynamic_obstacle_shape) shape_node_list = [shape_node] return shape_node_list @classmethod def _create_single_element( cls, shape: Shape, dynamic_obstacle_shape: bool ) -> etree.Element: """ Create XML-Node for a single shape element :param shape: shape for creating a node :param dynamic_obstacle_shape: specify whether the shape belongs to an dynamic obstacle or not :return: node """ if type(shape) == Rectangle: node = RectangleXMLNode.create_rectangle_node(shape, dynamic_obstacle_shape) elif type(shape) == Circle: node = CircleXMLNode.create_circle_node(shape, dynamic_obstacle_shape) elif type(shape) == Polygon: node = PolygonXMLNode.create_polygon_node(shape, dynamic_obstacle_shape) else: raise TypeError( '<ShapeXMLNode/_create_single_element> Expected type Polygon, Circle or Rectangle but got %s' % (type(shape)) ) return node class RectangleXMLNode: """ Class to create a XML element from a Rectangle.""" @classmethod def create_rectangle_node( cls, rectangle: Rectangle, dynamic_obstacle_shape=False ) -> etree.Element: """ Create XML-Node for a rectangle :param rectangle: rectangle for creating a node :param dynamic_obstacle_shape: specify whether the shape belongs to an dynamic obstacle or not :return: node """ rectangle_node = etree.Element('rectangle') length_node = etree.Element('length') length_node.text = str(rectangle.length) rectangle_node.append(length_node) width_node = etree.Element('width') width_node.text = str(rectangle.width) rectangle_node.append(width_node) if not dynamic_obstacle_shape: orientation_node = etree.Element('orientation') orientation_node.text = str(np.float64(rectangle.orientation)) rectangle_node.append(orientation_node) center_node = etree.Element('center') x_node = etree.Element('x') x_node.text = str(np.float64(rectangle.center[0])) center_node.append(x_node) y_node = etree.Element('y') y_node.text = str(np.float64(rectangle.center[1])) center_node.append(y_node) rectangle_node.append(center_node) return rectangle_node class CircleXMLNode: """ Class to create a XML element from a Circle.""" @classmethod def create_circle_node( cls, circle: Circle, dynamic_obstacle_shape=False ) -> etree.Element: """ Create XML-Node for a circle :param circle: circle for creating a node :param dynamic_obstacle_shape: specify whether the shape belongs to an dynamic obstacle or not :return: node """ circle_node = etree.Element('circle') radius_node = etree.Element('radius') radius_node.text = str(np.float64(circle.radius)) circle_node.append(radius_node) if not dynamic_obstacle_shape: center_node = etree.Element('center') x_node = etree.Element('x') x_node.text = str(np.float64(circle.center[0])) center_node.append(x_node) y_node = etree.Element('y') y_node.text = str(np.float64(circle.center[1])) center_node.append(y_node) circle_node.append(center_node) return circle_node class PolygonXMLNode: """ Class to create a XML element from a Polygon.""" @classmethod def create_polygon_node( cls, polygon: Polygon, dynamic_obstacle_shape: bool = False ) -> etree.Element: """ Create XML-Node for a polygon :param polygon: polygon for creating a node :param dynamic_obstacle_shape: specify whether the shape belongs to an dynamic obstacle or not :return: node """ polygon_node = etree.Element('polygon') for p in polygon.vertices: polygon_node.append(Point(p[0], p[1]).create_node()) return polygon_node class StateXMLNode: """ Class to create a XML element from a CustomState.""" @classmethod def create_waypoint_node(cls, state: GeneralState, waypoint_water_ids: List[int]) -> etree.Element: """ Create XML-Node for a state :param state: CommonOcean state :param waypoint_water_ids: contains a list of water ids if a waypoint state's position is specified water id(s) :return: node """ state_node = etree.Element('waypoint') if hasattr(state, 'position') or len(waypoint_water_ids) > 0: position = etree.Element('position') position = cls._write_goal_position(position, state.position, waypoint_water_ids) state_node.append(position) if hasattr(state, 'orientation'): orientation = etree.Element('orientation') orientation = cls._write_value_exact_or_interval( orientation, state.orientation ) state_node.append(orientation) if hasattr(state, 'time_step'): time = etree.Element('time') time = cls._write_goal_time_exact_or_interval(time, state.time_step) state_node.append(time) if hasattr(state, 'velocity'): velocity = etree.Element('velocity') velocity = cls._write_value_exact_or_interval(velocity, state.velocity) state_node.append(velocity) if hasattr(state, 'acceleration'): acceleration = etree.Element('acceleration') acceleration = cls._write_value_exact_or_interval( acceleration, state.acceleration ) state_node.append(acceleration) if hasattr(state, 'yaw_rate'): yaw_rate = etree.Element('yawRate') yaw_rate = cls._write_value_exact_or_interval(yaw_rate, state.yaw_rate) state_node.append(yaw_rate) return state_node @classmethod def create_goal_state_node(cls, state: GeneralState, goal_waters_ids: List[int]) -> etree.Element: """ Create XML-Node for a state :param state: CommonOcean state :param goal_waters_ids: contains a list of water ids if a goal state's position is specified water id(s) :return: node """ state_node = etree.Element('goalState') if hasattr(state, 'position') or len(goal_waters_ids) > 0: position = etree.Element('position') position = cls._write_goal_position(position, state.position, goal_waters_ids) state_node.append(position) if hasattr(state, 'orientation'): orientation = etree.Element('orientation') orientation = cls._write_value_exact_or_interval( orientation, state.orientation ) state_node.append(orientation) if hasattr(state, 'time_step'): time = etree.Element('time') time = cls._write_goal_time_exact_or_interval(time, state.time_step) state_node.append(time) if hasattr(state, 'velocity'): velocity = etree.Element('velocity') velocity = cls._write_value_exact_or_interval(velocity, state.velocity) state_node.append(velocity) if hasattr(state, 'acceleration'): acceleration = etree.Element('acceleration') acceleration = cls._write_value_exact_or_interval( acceleration, state.acceleration ) state_node.append(acceleration) if hasattr(state, 'yaw_rate'): yaw_rate = etree.Element('yawRate') yaw_rate = cls._write_value_exact_or_interval(yaw_rate, state.yaw_rate) state_node.append(yaw_rate) return state_node @classmethod def _write_goal_position( cls, node: etree.Element, position: Union[Shape, int, list], goal_waters_ids: List[int], ) -> etree.Element: """ Create XML-Node for a goal position :param node: node of the GoalState :param position: either (list of) shape elements or water ids specifying the goal position :return: node """ if len(goal_waters_ids) > 0: for id in goal_waters_ids: waters = etree.Element('water') waters.set('ref', str(id)) node.append(waters) elif isinstance(position, int): waters = etree.Element('water') waters.set('ref', str(position)) node.append(waters) elif( isinstance(position, Rectangle) or isinstance(position, Circle) or isinstance(position, Polygon) ): node.extend(ShapeXMLNode.create_node(position)) elif isinstance(position, ShapeGroup): node.extend(ShapeXMLNode.create_node(position)) elif type(position) is list: raise ValueError('A goal state cannot contain multiple items. Use a list of goal states instead.') else: raise ValueError('Case should not occur, position={}, goal_waters_ids={}.'.format(position, goal_waters_ids)) return node @classmethod def _write_goal_time_exact_or_interval( cls, node: etree.Element, time_step: Union[Interval, float, int] ) -> etree.Element: """ Create XML-Node for a goal time :param node: node of the GoalState :param time_step: contains time interval or time_step of goal time :return: node """ if isinstance(time_step, int): node.append(create_exact_node_int(time_step)) elif isinstance(time_step, Interval): node.extend( create_interval_node_int(Interval(time_step.start, time_step.end)) ) else: raise Exception() return node @classmethod def _write_value_exact_or_interval( cls, node: etree.Element, var: Union[Interval, float, int] ): """ Create XML-Node for a goal value :param node: node of the GoalState :param var: contains interval or exact_value of goal state value :return: node """ if isinstance(var, (float, int)): node.append(create_exact_node_float(var)) elif isinstance(var, Interval): node.extend(create_interval_node_float(var)) else: raise Exception() return node @classmethod def create_state_node( cls, state: GeneralState, state_node: etree.Element, time_step: int ) -> etree.Element: """ Create XML-Node for a state :param state: value of the state :param state_node: node of the overlying state :return: node """ if hasattr(state, 'position'): position = etree.Element('position') if type(state.position) in [np.ndarray, list]: position.append( Point.create_from_numpy_array(state.position).create_node() ) state_node.append(position) elif isinstance(state.position, Shape): position.extend(ShapeXMLNode.create_node(state.position)) state_node.append(position) else: raise Exception() if hasattr(state, 'orientation'): orientation = etree.Element('orientation') orientation = cls._write_value_exact_or_interval( orientation, state.orientation ) state_node.append(orientation) time_node = etree.Element('time') time_node.append(create_exact_node_int(time_step)) state_node.append(time_node) if hasattr(state, 'velocity'): velocity = etree.Element('velocity') velocity = cls._write_value_exact_or_interval(velocity, state.velocity) state_node.append(velocity) if hasattr(state, 'acceleration'): acceleration = etree.Element('acceleration') acceleration = cls._write_value_exact_or_interval( acceleration, state.acceleration ) state_node.append(acceleration) if hasattr(state, 'yaw_rate'): yaw_rate = etree.Element('yawRate') yaw_rate = cls._write_value_exact_or_interval(yaw_rate, state.yaw_rate) state_node.append(yaw_rate) return state_node class PlanningProblemXMLNode: """ Class to create a XML element from a PlanningProblem.""" @classmethod def create_node(cls, planning_problem: PlanningProblem) -> etree.Element: """ Create a xml-Node for a single planning_problem :param planning_problem: planning problem for creating the node :return: """ planning_problem_node = etree.Element('planningProblem') planning_problem_node.set('id', str(planning_problem.planning_problem_id)) if planning_problem.max_lateral_deviation is not None: planning_problem_node.set('maxLateralDeviation', str(planning_problem.max_lateral_deviation)) initial_state_node = etree.Element('initialState') planning_problem_node.append( StateXMLNode.create_state_node( planning_problem.initial_state, initial_state_node, planning_problem.initial_state.time_step, ) ) for state_id, goal_state in enumerate(planning_problem.goal.state_list): if ( planning_problem.goal.waters_of_goal_position is not None and state_id in planning_problem.goal.waters_of_goal_position ): goal_waters_ids: List[ int ] = planning_problem.goal.waters_of_goal_position[state_id] else: goal_waters_ids = [] planning_problem_node.append( StateXMLNode.create_goal_state_node(goal_state, goal_waters_ids) ) if planning_problem.waypoints is not None: for waypoint in planning_problem.waypoints: state = waypoint.state_list[0] planning_problem_node.append( StateXMLNode.create_waypoint_node(state, []) ) return planning_problem_node class TrafficSignXMLNode: """ Class to create a XML element from a TrafficSign.""" @classmethod def create_node(cls, traffic_sign: TrafficSign) -> etree.Element: """ Create XML-Node for a state :param traffic_sign: TrafficSign object :return: node """ traffic_sign_node = etree.Element('trafficSign') traffic_sign_node.set('id', str(traffic_sign.traffic_sign_id)) for element in traffic_sign.traffic_sign_elements: element_node = etree.Element('trafficSignElement') sign_id_node = etree.Element('trafficSignID') sign_id_node.text = str(element.traffic_sign_element_id.value) if str(element.traffic_sign_element_id.value) == '': warnings.warn('<FileWriter>: Invalid traffic sign ID!') element_node.append(sign_id_node) for value in element.additional_values: value_node = etree.Element('additionalValue') value_node.text = str(value) element_node.append(value_node) traffic_sign_node.append(element_node) if traffic_sign.position is not None: position_node = etree.Element('position') position_node.append(Point(traffic_sign.position[0], traffic_sign.position[1]).create_node()) traffic_sign_node.append(position_node) if traffic_sign.virtual is not None: virtual_node = etree.Element('virtual') virtual_node.text = str(traffic_sign.virtual).lower() traffic_sign_node.append(virtual_node) return traffic_sign_node @classmethod def create_ref_node(cls, traffic_sign_ref) -> etree.Element: traffic_sign_ref_node = etree.Element('trafficSignRef') traffic_sign_ref_node.set('ref', str(traffic_sign_ref)) return traffic_sign_ref_node