Source code for commonocean.common.file_reader

from xml.etree import ElementTree

from commonocean.planning.goal import GoalRegion
from commonocean.planning.planning_problem import PlanningProblemSet, PlanningProblem
from commonocean.scenario.scenario import *
from commonocean.scenario.obstacle import *
from commonocean.scenario.traffic_sign import *
from commonocean.scenario.waters import Shallow, Waterway, WatersNetwork

from commonroad.common.util import AngleInterval
from commonroad.geometry.shape import *
from commonocean.scenario.trajectory import Trajectory
from commonocean.scenario.state import GeneralState

# Tunneling from CR-IO #
from commonroad.common.reader.file_reader_xml import read_value_exact_or_interval as read_value_exact_or_interval_CR
from commonroad.common.reader.file_reader_xml import read_time as read_time_CR
from commonroad.common.reader.file_reader_xml import ShapeFactory, RectangleFactory, CircleFactory, PolygonFactory, PointFactory, PointListFactory
########################

__author__ = "Hanna Krasowski, Benedikt Pfleiderer, Fabian Thomas-Barein"
__copyright__ = "TUM Cyber-Physical System Group"
__credits__ = ["ConVeY"]
__version__ = "2023a"
__maintainer__ = "Hanna Krasowski"
__email__ = "commonocean@lists.lrz.de"
__status__ = "released"

def read_value_exact_or_interval(xml_node: ElementTree.Element) -> Union[float, Interval]:
    """
    Reads a numeric value from an XML element.

    This is a thin pass-through to the CommonRoad XML reader function of the same
    name, so that CommonOcean code can import it from this module.

    :param xml_node: XML element holding the value
    :return: whatever the CommonRoad reader returns for the node (annotated as
        either a single float or an Interval)
    """
    value = read_value_exact_or_interval_CR(xml_node)
    return value


def read_time(xml_node: ElementTree.Element) -> Union[int, Interval]:
    """
    Reads a time value from an XML element.

    This is a thin pass-through to the CommonRoad XML reader function ``read_time``,
    so that CommonOcean code can import it from this module.

    :param xml_node: XML element holding the time information
    :return: whatever the CommonRoad reader returns for the node (annotated as
        either a single int time step or an Interval)
    """
    time_value = read_time_CR(xml_node)
    return time_value


class CommonOceanFileReader:
    """
    Class which reads CommonOcean XML-files. The XML-files are composed of
    (1) a formal representation of the water network,
    (2) static and dynamic obstacles,
    (3) the planning problem of the ego vehicle(s).
    """

    def __init__(self, filename: str):
        """
        :param filename: full path + filename of the CommonOcean XML-file
        """
        self._filename = filename
        # Everything below is populated by _read_header(); None means "not read yet".
        self._tree = None
        self._dt = None
        self._benchmark_id = None
        self._commonocean_version = None
        self._meta_data = None

    def open(self) -> Tuple[Scenario, PlanningProblemSet]:
        """
        Reads a CommonOcean XML-file.

        :return: the scenario containing the water network and the obstacles and the planning problem set
            containing the planning problems---initial states and goal regions--for all ego vessels.
        """
        self._read_header()
        scenario = self._open_scenario()
        # The planning problems are resolved against the waters network of the scenario just built.
        planning_problem_set = self._open_planning_problem_set(scenario._waters_network)
        return scenario, planning_problem_set

    def open_waters_network(self) -> WatersNetwork:
        """
        Reads the water network of a CommonOcean XML-file.

        :return: object of class WatersNetwork
        """
        self._read_header()
        return WatersNetworkFactory.create_from_xml_node(self._tree)

    def _open_scenario(self) -> Scenario:
        """
        Reads the water network and obstacles from the CommonOcean XML-file.
        Requires that _read_header() has been called before.

        :return: object of class scenario containing the water network and the obstacles
        """
        return ScenarioFactory.create_from_xml_node(self._tree, self._dt, self._benchmark_id,
                                                    self._commonocean_version, self._meta_data)

    def _open_planning_problem_set(self, waters_network: WatersNetwork) -> PlanningProblemSet:
        """
        Reads all planning problems from the CommonOcean XML-file.

        :param waters_network: waters network handed on to the planning problem factory
        :return: object of class PlanningProblemSet containing the planning problems for all ego vessels.
        """
        return PlanningProblemSetFactory.create_from_xml_node(self._tree, waters_network)

    def _read_header(self):
        """
        Parses the CommonOcean XML-file into element tree; reads the global time step size of the
        time-discrete scenario, the CommonOcean benchmark ID, the CommonOcean version and the
        meta data stored as attributes of the root element.
        """
        self._parse_file()
        self._dt = self._get_dt()
        self._benchmark_id = self._get_benchmark_id()
        self._commonocean_version = self._get_commonocean_version()
        # 'tags' and 'location' are replaced by ScenarioFactory with the values of the
        # corresponding child elements when the scenario is built.
        self._meta_data = {'author': self._get_author(),
                           'affiliation': self._get_affiliation(),
                           'source': self._get_source(),
                           'tags': self._get_tags(),
                           'location': Location()}

    def _parse_file(self):
        """ Parses the CommonOcean XML-file into element tree."""
        self._tree = ElementTree.parse(self._filename)

    def _get_dt(self) -> float:
        """
        Reads the time step size of the time-discrete scenario.

        :raises TypeError: if the root element has no 'timeStepSize' attribute
        """
        return float(self._tree.getroot().get('timeStepSize'))

    def _get_benchmark_id(self) -> str:
        """ Reads the unique CommonOcean benchmark ID of the scenario."""
        return self._tree.getroot().get('benchmarkID')

    def _get_commonocean_version(self) -> str:
        """ Reads the CommonOcean version of the XML-file."""
        return self._tree.getroot().get('commonOceanVersion')

    def _get_author(self) -> str:
        """ Reads the author of the scenario."""
        return self._tree.getroot().get('author')

    def _get_affiliation(self) -> str:
        """ Reads the affiliation of the author of the scenario."""
        return self._tree.getroot().get('affiliation')

    def _get_source(self) -> str:
        """ Reads the source of the scenario."""
        return self._tree.getroot().get('source')

    def _get_tags(self) -> Union[Set[Tag], None]:
        """
        Reads the tags of the scenario from the whitespace-separated 'tags' attribute of the root element.

        :return: set of valid tags, or None if the root element has no 'tags' attribute.
            Entries that are not a valid Tag are skipped with a warning.
        """
        tags_string = self._tree.getroot().get('tags')
        if tags_string is None:
            return None

        tags = set()
        for tag in tags_string.split():
            try:
                tags.add(Tag(tag))
            except ValueError:
                warnings.warn('Scenario tag \'{}\' not valid.'.format(tag), stacklevel=2)
        return tags
class ScenarioFactory:
    """ Class to create an object of class Scenario from an XML element."""

    @classmethod
    def create_from_xml_node(cls, xml_node: ElementTree.Element, dt: float, benchmark_id: str,
                             commonocean_version: str, meta_data: dict):
        """
        :param xml_node: XML element
        :param dt: time step size of the scenario
        :param benchmark_id: unique CommonOcean benchmark ID
        :param commonocean_version: CommonOcean version of the file
        :param meta_data: keyword arguments forwarded to Scenario; its 'tags' and 'location'
            entries are overwritten in place with the values read from xml_node
        :return: CommonOcean scenario
        """
        meta_data["tags"] = TagsFactory.create_from_xml_node(xml_node)
        meta_data["location"] = LocationFactory.create_from_xml_node(xml_node)

        scenario_id = ScenarioID.from_benchmark_id(benchmark_id, commonocean_version)
        scenario = Scenario(dt, scenario_id, **meta_data)

        # NOTE(review): the flattened original does not show how many of the following
        # statements belong to the warnings-suppressing block; only the first one is kept
        # inside it here -- confirm against the upstream source.
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore")
            scenario.add_objects(WatersNetworkFactory.create_from_xml_node(xml_node))
        scenario.add_objects(cls._obstacles(xml_node, scenario._waters_network))
        scenario.add_objects(cls._shallows(xml_node))

        return scenario

    @classmethod
    def _obstacles(cls, xml_node: ElementTree.Element, waters_network: WatersNetwork) -> List[Obstacle]:
        """
        Reads all obstacles specified in a CommonOcean XML-file.

        :param xml_node: XML element
        :param waters_network: WatersNetwork
        :return: list of static and dynamic obstacles specified in the CommonOcean XML-file
            (all static obstacles first, then all dynamic obstacles)
        """
        obstacles = [StaticObstacleFactory.create_from_xml_node(o, waters_network)
                     for o in xml_node.findall('staticObstacle')]
        obstacles.extend(DynamicObstacleFactory.create_from_xml_node(o, waters_network)
                         for o in xml_node.findall('dynamicObstacle'))
        return obstacles

    @classmethod
    def _shallows(cls, xml_node: ElementTree.Element) -> List[Shallow]:
        """
        Reads all shallows specified in a CommonOcean XML-file.

        :param xml_node: XML element
        :return: list of shallows specified in the CommonOcean XML-file
        """
        return [ShallowFactory.create_from_xml_node(d) for d in xml_node.findall('shallow')]


class ShallowFactory:
    """ Class to create a shallow from an XML element."""

    @classmethod
    def create_from_xml_node(cls, xml_node: ElementTree.Element) -> Shallow:
        """
        :param xml_node: XML element
        :return: shallow; the depth is only passed on if a 'depth' child element exists
        """
        shape = ShapeFactory.create_from_xml_node(xml_node.find('shape'))
        depth_node = xml_node.find('depth')
        if depth_node is None:
            return Shallow(shape)
        # Unlike obstacles, the depth of a shallow is wrapped in an 'exact' child element.
        depth = float(depth_node.find('exact').text)
        return Shallow(shape, depth)


class TagsFactory:
    """ Class to create a tag set from an XML element."""

    @classmethod
    def create_from_xml_node(cls, xml_node: ElementTree.Element) -> Union[Set[Tag], None]:
        """
        :param xml_node: XML element
        :return: set of tags that occur as child elements of 'scenarioTags',
            or None if xml_node has no 'scenarioTags' child
        """
        tag_element = xml_node.find('scenarioTags')
        if tag_element is None:
            return None
        # Each tag is encoded as a child element named after the tag's value.
        return {elem for elem in Tag if tag_element.find(elem.value) is not None}


class LocationFactory:
    """ Class to create a location from an XML element."""

    @classmethod
    def create_from_xml_node(cls, xml_node: ElementTree.Element) -> Union[Location, None]:
        """
        :param xml_node: XML element
        :return: location object, or None if xml_node has no 'location' child
        """
        location_element = xml_node.find('location')
        if location_element is None:
            return None

        geo_name_id = int(location_element.find('geoNameId').text)
        gps_latitude = float(location_element.find('gpsLatitude').text)
        gps_longitude = float(location_element.find('gpsLongitude').text)

        # Geo transformation and environment are optional.
        geo_node = location_element.find('geoTransformation')
        if geo_node is not None:
            geo_transformation = GeoTransformationFactory.create_from_xml_node(geo_node)
        else:
            geo_transformation = None

        environment_node = location_element.find('environment')
        if environment_node is not None:
            environment = EnvironmentFactory.create_from_xml_node(environment_node)
        else:
            environment = None

        return Location(geo_name_id, gps_latitude, gps_longitude, geo_transformation, environment)


class GeoTransformationFactory:
    """ Class to create a geotransformation object of an XML element according to the CommonOcean specification."""

    @classmethod
    def create_from_xml_node(cls, xml_node: ElementTree.Element) -> GeoTransformation:
        """
        :param xml_node: XML element
        :return: GeoTransformation object
        """
        geo_reference = xml_node.find('geoReference').text
        x_translation = float(xml_node.find('xTranslation').text)
        y_translation = float(xml_node.find('yTranslation').text)
        z_rotation = float(xml_node.find('zRotation').text)
        scaling = float(xml_node.find('scaling').text)

        return GeoTransformation(geo_reference, x_translation, y_translation, z_rotation, scaling)


class EnvironmentFactory:
    """ Class to create a environment object of an XML element according to the CommonOcean specification."""

    @classmethod
    def create_from_xml_node(cls, xml_node: ElementTree.Element) -> Environment:
        """
        :param xml_node: XML element
        :return: Environment object
        """
        time = TimeFactory.create_from_xml_node(xml_node.find('time').text)
        weather = Weather(xml_node.find('weather').text)
        sea_state = SeaState(xml_node.find('seaState').text)
        time_of_day = TimeOfDay(xml_node.find('timeOfDay').text)

        return Environment(time, time_of_day, weather, sea_state)


class TimeFactory:
    """ Class to create a time object of an XML element."""

    @classmethod
    def create_from_xml_node(cls, time_text: str) -> Time:
        """
        Parses a time string by fixed character positions: characters 0-3 are the year,
        5-6 the month, 8-9 the day, 11-12 the hours and 14-15 the minutes. The single
        characters in between are skipped without being checked.

        :param time_text: time as string
        :return: time object
        """
        year = int(time_text[0:4])
        month = int(time_text[5:7])
        day = int(time_text[8:10])
        hours = int(time_text[11:13])
        minutes = int(time_text[14:16])
        return Time(year, month, day, hours, minutes)


class WatersNetworkFactory:
    """ Class to create an object of class WatersNetwork from an XML element."""

    @classmethod
    def create_from_xml_node(cls, xml_node: ElementTree.Element) -> WatersNetwork:
        """
        Reads all waters specified in a CommonOcean XML-file.

        :param xml_node: XML element
        :return: waters network containing all waterways and traffic signs of xml_node
        :raises ValueError: if a 'trafficSignRef' element has no 'ref' attribute
        """
        shape = StaticObstacleFactory.read_shape(xml_node.find('navigationableArea'))
        waters_network = WatersNetwork(shape.center, shape.length, shape.width, shape.orientation)

        waters = []
        # IDs of all traffic signs that are referenced by at least one waterway.
        assigned_traffic_signs = set()
        for waters_node in xml_node.findall('waterway'):
            waters.append(WaterwayFactory.create_from_xml_node(waters_node))
            for traffic_sign_ref in waters_node.findall('trafficSignRef'):
                if traffic_sign_ref.get("ref") is not None:
                    assigned_traffic_signs.add(int(traffic_sign_ref.get("ref")))
                else:
                    raise ValueError('<WatersNetworkFactory/create_from_xml_node>: Unknown type of traffic sign reference: %s.'
                                     % traffic_sign_ref.get("ref"))

        for water in waters:
            waters_network.add_waters(water)

        for traffic_sign_node in xml_node.findall('trafficSign'):
            # NOTE(review): the second argument is [None, None] for signs referenced by a
            # waterway and [] otherwise; its meaning is defined by WatersNetwork.add_traffic_sign.
            if int(traffic_sign_node.get('id')) in assigned_traffic_signs:
                waters_network.add_traffic_sign(TrafficSignFactory.create_from_xml_node(traffic_sign_node),
                                                [None, None])
            else:
                waters_network.add_traffic_sign(TrafficSignFactory.create_from_xml_node(traffic_sign_node), [])

        return waters_network


class WaterwayFactory:
    """ Class to create an object of class Waters from an XML element."""

    @classmethod
    def create_from_xml_node(cls, xml_node: ElementTree.Element) -> Waters:
        """
        :param xml_node: XML element
        :return: object of class Waterway according to the CommonOcean specification.
        """
        waters_id = int(xml_node.get('id'))

        left_vertices = cls._vertices(xml_node.find('leftBound'))
        right_vertices = cls._vertices(xml_node.find('rightBound'))
        # The center line is not stored in the file; it is the mean of both bounds.
        center_vertices = 0.5 * (left_vertices + right_vertices)

        predecessors = cls._predecessors(xml_node)
        successors = cls._successors(xml_node)
        waters_type = cls._waters_type(xml_node)
        traffic_signs = cls._traffic_signs(xml_node)

        return Waterway(
            left_vertices=left_vertices, center_vertices=center_vertices, right_vertices=right_vertices,
            waters_id=waters_id, waters_type=waters_type, predecessor=predecessors, successor=successors,
            traffic_signs=traffic_signs)

    @classmethod
    def _vertices(cls, xml_node: ElementTree.Element) -> np.ndarray:
        """
        Reads the vertices of the water boundary.

        :param xml_node: XML element
        :return: The vertices of the boundary of the Waterway described as a polyline
        """
        return PointListFactory.create_from_xml_node(xml_node)

    @classmethod
    def _predecessors(cls, xml_node: ElementTree.Element) -> List[int]:
        """
        Reads all predecessor waters.

        :param xml_node: XML element
        :return: list of IDs of all predecessor waters
        """
        return [int(node.get('ref')) for node in xml_node.findall('predecessor')]

    @classmethod
    def _successors(cls, xml_node: ElementTree.Element) -> List[int]:
        """
        Reads all successor waters.

        :param xml_node: XML element
        :return: list of IDs of all successor waters
        """
        return [int(node.get('ref')) for node in xml_node.findall('successor')]

    @classmethod
    def _waters_type(cls, xml_node: ElementTree.Element) -> WatersType:
        """
        Reads the water type of the water.

        :param xml_node: XML element
        :return: water type given by the last 'watersType' child element,
            or None if there is no such element
        :raises ValueError: if the text of a 'watersType' element is not a known water type
        """
        # NOTE(review): the flattened original does not show whether the return sits inside
        # or after the loop; with at most one 'watersType' element both readings agree.
        waters_type = None
        for f_type in xml_node.findall('watersType'):
            try:
                waters_type = WatersType(f_type.text)
            except ValueError:
                # presumably WatersType is an Enum, which raises on unknown values
                waters_type = None
            if waters_type is None:
                raise ValueError('<WaterwayFactory/_waters_type>: Unkown type of water: %s.' % f_type.text)
        return waters_type

    @classmethod
    def _traffic_signs(cls, xml_node: ElementTree.Element) -> Union[Set[int], None]:
        """
        Reads the traffic sign references of the water.

        :param xml_node: XML element
        :return: set of traffic sign IDs (empty if none are referenced)
        :raises ValueError: if a 'trafficSignRef' element has no 'ref' attribute
        """
        traffic_signs = set()
        for traffic_sign_ref in xml_node.findall('trafficSignRef'):
            if traffic_sign_ref.get("ref") is not None:
                traffic_signs.add(int(traffic_sign_ref.get("ref")))
            else:
                raise ValueError('<WaterwayFactory/_traffic_signs>: Unknown type of traffic sign reference: %s.'
                                 % traffic_sign_ref.get("ref"))
        return traffic_signs


class TrafficSignFactory:
    """ Class to create an object of class TrafficSign from an XML element."""

    @classmethod
    def create_from_xml_node(cls, xml_node: ElementTree.Element) -> TrafficSign:
        """
        :param xml_node: XML element
        :return: object of class TrafficSign according to the CommonOcean specification.
        :raises ValueError: if the 'virtual' flag is neither "true" nor "false"
        """
        traffic_sign_id = int(xml_node.get('id'))

        traffic_sign_elements = [TrafficSignElementFactory.create_from_xml_node(element)
                                 for element in xml_node.findall('trafficSignElement')]

        # The position is optional; without this default a sign lacking a 'position'
        # element failed with an UnboundLocalError.
        position = None
        position_node = xml_node.find('position')
        if position_node is not None:
            position = PointFactory.create_from_xml_node(position_node.find('point'))

        virtual = cls._read_virtual(xml_node)

        return TrafficSign(traffic_sign_id=traffic_sign_id, position=position,
                           traffic_sign_elements=traffic_sign_elements, virtual=virtual)

    @classmethod
    def _read_virtual(cls, xml_node: ElementTree.Element) -> bool:
        """
        Reads the 'virtual' flag of a traffic sign.

        The flag is taken from the 'virtual' attribute if present, otherwise from the text
        of a 'virtual' child element. (The previous code called ``.text`` on the attribute
        value, which is a plain string, and therefore failed whenever the attribute was set.)

        :param xml_node: XML element of the traffic sign
        :return: True or False; False if the flag is not specified at all
        :raises ValueError: if the flag is neither "true" nor "false"
        """
        virtual_text = xml_node.get('virtual')
        if virtual_text is None:
            virtual_node = xml_node.find('virtual')
            if virtual_node is None:
                return False
            virtual_text = virtual_node.text or ""

        virtual_text = virtual_text.strip()
        if virtual_text == "true":
            return True
        if virtual_text == "false":
            return False
        raise ValueError('<TrafficSignFactory/_read_virtual>: Invalid value for virtual: %s.' % virtual_text)


class TrafficSignElementFactory:
    """ Class to create an object of class TrafficSignElement from an XML element."""

    @classmethod
    def create_from_xml_node(cls, xml_node: ElementTree.Element) -> TrafficSignElement:
        """
        :param xml_node: XML element
        :return: object of class TrafficSignElement according to the CommonOcean specification.
        """
        traffic_sign_element_id = TrafficSignElementID(xml_node.find('trafficSignID').text)

        # Additional values are kept as the raw element texts.
        additional_values = [additional_value.text for additional_value in xml_node.findall('additionalValue')]

        return TrafficSignElement(traffic_sign_element_id=traffic_sign_element_id,
                                  additional_values=additional_values)


class ObstacleFactory(ABC):
    """ Class to create an Obstacle from an XML element."""

    @classmethod
    def read_type(cls, xml_node: ElementTree.Element) -> ObstacleType:
        """
        :param xml_node: XML element
        :return: object of class ObstacleType according to the CommonOcean specification,
            or None if xml_node has no 'type' child
        :raises ValueError: if the text of the 'type' element is not a known obstacle type
        """
        type_node = xml_node.find('type')
        if type_node is None:
            return None

        try:
            obstacle_type = ObstacleType(type_node.text)
        except ValueError:
            # presumably ObstacleType is an Enum, which raises on unknown values
            obstacle_type = None
        if obstacle_type is None:
            raise ValueError('Type of obstacle is unknown. Got type: {}'.format(type_node.text))
        return obstacle_type

    @classmethod
    def read_id(cls, xml_node: ElementTree.Element) -> int:
        """
        :param xml_node: XML element
        :return: value of the 'id' attribute as int.
        """
        return int(xml_node.get('id'))

    @classmethod
    def read_depth(cls, xml_node: ElementTree.Element) -> Union[None, float]:
        """
        :param xml_node: XML element
        :return: depth as float (infinity for the text "inf"), or None if there is no 'depth' child.
        """
        depth_node = xml_node.find('depth')
        if depth_node is None:
            return None

        depth_text = depth_node.text
        if depth_text == "inf":
            return np.inf
        return float(depth_text)

    @classmethod
    def read_initial_state(cls, xml_node: ElementTree.Element) -> GeneralState:
        """
        :param xml_node: XML element of the initial state
        :return: state created by StateFactory from xml_node.
        """
        return StateFactory.create_from_xml_node(xml_node)

    @classmethod
    def read_shape(cls, xml_node: ElementTree.Element) -> Shape:
        """
        :param xml_node: XML element
        :return: object of class Shape.
        """
        return ShapeFactory.create_from_xml_node(xml_node)


class StaticObstacleFactory(ObstacleFactory):
    """ Class to create a Static Obstacle from an XML element."""

    @classmethod
    def create_from_xml_node(cls, xml_node: ElementTree.Element, waters_network: WatersNetwork) -> StaticObstacle:
        """
        :param xml_node: XML element
        :param waters_network: waters network (not used by this factory)
        :return: object of class StaticObstacle according to the CommonOcean specification.
        """
        obstacle_type = StaticObstacleFactory.read_type(xml_node)
        obstacle_id = StaticObstacleFactory.read_id(xml_node)
        initial_state = StaticObstacleFactory.read_initial_state(xml_node.find('initialState'))
        shape = StaticObstacleFactory.read_shape(xml_node.find('shape'))
        depth = StaticObstacleFactory.read_depth(xml_node)

        return StaticObstacle(obstacle_id=obstacle_id, obstacle_type=obstacle_type,
                              obstacle_shape=shape, initial_state=initial_state, depth=depth)


class DynamicObstacleFactory(ObstacleFactory):
    """ Class to create a Dynamic Obstacle from an XML element."""

    @staticmethod
    def find_obstacle_shape_lanelets(initial_state: GeneralState, state_list: List[GeneralState],
                                     waters_network: WatersNetwork, obstacle_id: int, shape: Shape) \
            -> Dict[int, Set[int]]:
        """
        Extracts for each state the waters the obstacle shape is on and registers the
        obstacle at these waters for the time step of the state.

        :param initial_state: initial CommonOcean state
        :param state_list: trajectory state list
        :param waters_network: CommonOcean water network
        :param obstacle_id: ID of obstacle
        :param shape: shape of obstacle
        :return: dictionary mapping each time step to the set of IDs of the waters found for the shape
        """
        compl_state_list = [initial_state] + state_list
        lanelet_ids_per_state = {}

        for state in compl_state_list:
            rotated_shape = shape.rotate_translate_local(state.position, state.orientation)
            lanelet_ids = waters_network.find_water_by_shape(rotated_shape)
            for l_id in lanelet_ids:
                waters_network.find_lanelet_by_id(l_id).add_dynamic_obstacle_to_lanelet(
                    obstacle_id=obstacle_id, time_step=state.time_step)
            lanelet_ids_per_state[state.time_step] = set(lanelet_ids)

        return lanelet_ids_per_state

    @staticmethod
    def find_obstacle_center_lanelets(initial_state: GeneralState, state_list: List[GeneralState],
                                      waters_network: WatersNetwork) -> Dict[int, Set[int]]:
        """
        Extracts for each state the waters the obstacle position is on.

        :param initial_state: initial CommonOcean state
        :param state_list: trajectory state list
        :param waters_network: CommonOcean water network
        :return: dictionary mapping each time step to the set of IDs of the waters found for the position
        """
        compl_state_list = [initial_state] + state_list
        lanelet_ids_per_state = {}

        for state in compl_state_list:
            lanelet_ids = waters_network.find_lanelet_by_position([state.position])[0]
            lanelet_ids_per_state[state.time_step] = set(lanelet_ids)

        return lanelet_ids_per_state

    @classmethod
    def create_from_xml_node(cls, xml_node: ElementTree.Element, waters_network: WatersNetwork) -> DynamicObstacle:
        """
        :param xml_node: XML element
        :param waters_network: waters network (not used by this factory)
        :return: object of class DynamicObstacle according to the CommonOcean specification.
        """
        obstacle_type = DynamicObstacleFactory.read_type(xml_node)
        obstacle_id = DynamicObstacleFactory.read_id(xml_node)
        shape = DynamicObstacleFactory.read_shape(xml_node.find('shape'))
        initial_state = DynamicObstacleFactory.read_initial_state(xml_node.find('initialState'))
        depth = DynamicObstacleFactory.read_depth(xml_node)

        # A trajectory takes precedence over an occupancy set; without either there is no prediction.
        trajectory_node = xml_node.find('trajectory')
        occupancy_set_node = xml_node.find('occupancySet')
        if trajectory_node is not None:
            trajectory = TrajectoryFactory.create_from_xml_node(trajectory_node)
            # No waters assignment is computed when reading the file.
            shape_waters_assignment = None
            center_waters_assignment = None
            prediction = TrajectoryPrediction(trajectory, shape, center_waters_assignment, shape_waters_assignment)
        elif occupancy_set_node is not None:
            prediction = SetBasedPredictionFactory.create_from_xml_node(occupancy_set_node)
        else:
            prediction = None

        return DynamicObstacle(obstacle_id=obstacle_id, obstacle_type=obstacle_type,
                               obstacle_shape=shape, initial_state=initial_state,
                               prediction=prediction, depth=depth)


class TrajectoryFactory:
    """ Class to create a Trajectory from an XML element."""

    @classmethod
    def create_from_xml_node(cls, xml_node: ElementTree.Element) -> Trajectory:
        """
        :param xml_node: XML element
        :return: object of class Trajectory according to the CommonOcean specification.
        :raises IndexError: if xml_node has no 'state' child
        """
        state_list = [StateFactory.create_from_xml_node(state_node) for state_node in xml_node.findall('state')]

        # The trajectory starts at the (earliest) time step of its first state.
        if isinstance(state_list[0].time_step, Interval):
            t0 = min(state_list[0].time_step)
        else:
            t0 = state_list[0].time_step

        return Trajectory(t0, state_list)


class SetBasedPredictionFactory:
    """ Class to create a SetBasedPrediction from an XML element."""

    @classmethod
    def create_from_xml_node(cls, xml_node: ElementTree.Element) -> SetBasedPrediction:
        """
        :param xml_node: XML element
        :return: object of class SetBasedPrediction according to the CommonOcean specification.
        :raises IndexError: if xml_node has no 'occupancy' child
        """
        occupancies = [OccupancyFactory.create_from_xml_node(occupancy) for occupancy in xml_node.findall('occupancy')]

        # The prediction starts at the (earliest) time step of its first occupancy.
        if isinstance(occupancies[0].time_step, Interval):
            t0 = min(occupancies[0].time_step)
        else:
            t0 = occupancies[0].time_step

        return SetBasedPrediction(t0, occupancies)


class OccupancyFactory:
    """ Class to create a Occupancy from an XML element."""

    @classmethod
    def create_from_xml_node(cls, xml_node: ElementTree.Element) -> Occupancy:
        """
        :param xml_node: XML element
        :return: object of class Occupancy according to the CommonOcean specification.
        """
        shape = ShapeFactory.create_from_xml_node(xml_node.find('shape'))
        time = read_time(xml_node.find('time'))
        return Occupancy(time, shape)


class PlanningProblemSetFactory:
    """ Class to create a PlanningProblemSet from an XML element."""

    @classmethod
    def create_from_xml_node(cls, xml_node: ElementTree.Element, waters_network: WatersNetwork) \
            -> PlanningProblemSet:
        """
        :param xml_node: XML element
        :param waters_network: object of class WatersNetwork
        :return: object of class PlanningProblemSet according to the CommonOcean specification.
        """
        planning_problem_set = PlanningProblemSet()
        for p in xml_node.findall('planningProblem'):
            planning_problem_set.add_planning_problem(
                PlanningProblemFactory.create_from_xml_node(p, waters_network))
        return planning_problem_set


class PlanningProblemFactory:
    """ Class to create a PlanningProblem from an XML element."""

    @classmethod
    def create_from_xml_node(cls, xml_node: ElementTree.Element, waters_network: WatersNetwork) \
            -> PlanningProblem:
        """
        :param xml_node: XML element
        :param waters_network: object of class WatersNetwork
        :return: object of class PlanningProblem according to the CommonOcean specification.
        :raises TypeError: if a 'waypoint' child exists but the 'maxLateralDeviation' attribute is missing
        """
        planning_problem_id = int(xml_node.get('id'))
        initial_state = cls._add_initial_state(xml_node)
        goal_region = GoalRegionFactory.create_from_xml_node(xml_node, waters_network)

        # Waypoints and the allowed lateral deviation are only read together.
        if xml_node.find('waypoint') is not None:
            waypoints = WaypointFactory.create_from_xml_node(xml_node, waters_network)
            max_lateral_deviation = float(xml_node.get('maxLateralDeviation'))
        else:
            waypoints = None
            max_lateral_deviation = None

        return PlanningProblem(planning_problem_id, initial_state, goal_region, waypoints, max_lateral_deviation)

    @classmethod
    def _add_initial_state(cls, xml_node: ElementTree.Element) -> GeneralState:
        """
        :param xml_node: XML element
        :return: state created by StateFactory from the 'initialState' child of xml_node.
        """
        return StateFactory.create_from_xml_node(xml_node.find('initialState'))


class GoalRegionFactory:
    """ Class to create a GoalRegion from an XML element."""

    @classmethod
    def create_from_xml_node(cls, xml_node: ElementTree.Element, waters_network: WatersNetwork) \
            -> GoalRegion:
        """
        :param xml_node: XML element
        :param waters_network: object of class WatersNetwork
        :return: object of class GoalRegion according to the CommonOcean specification.
        """
        state_list = list()
        # Maps the index of a goal state to the IDs of the waters referenced by its position.
        waters_of_goal_position = defaultdict(list)

        for idx, goal_state_node in enumerate(xml_node.findall('goalState')):
            state_list.append(StateFactory.create_from_xml_node(goal_state_node, waters_network))
            position_node = goal_state_node.find('position')
            if position_node is not None and position_node.find('water') is not None:
                for f in position_node.findall('water'):
                    waters_of_goal_position[idx].append(int(f.get('ref')))

        # None (instead of an empty mapping) is passed on if no goal state references waters.
        if not waters_of_goal_position:
            waters_of_goal_position = None

        return GoalRegion(state_list, waters_of_goal_position)


class WaypointFactory:
    """ Class to create a Waypoint from an XML element."""

    @classmethod
    def create_from_xml_node(cls, xml_node: ElementTree.Element,
                             waters_network: Union[WatersNetwork, None] = None) -> GoalRegion:
        """
        :param xml_node: XML element
        :param waters_network: object of class WatersNetwork
        :return: object of class GoalRegion holding one state per 'waypoint' child of xml_node.
        """
        waypoint_list = [StateFactory.create_from_xml_node(waypoint_node, waters_network)
                         for waypoint_node in xml_node.findall('waypoint')]
        return GoalRegion(waypoint_list)


class StateFactory:
    """ Class to create a CustomState from an XML element."""

    @classmethod
    def create_from_xml_node(cls, xml_node: ElementTree.Element,
                             waters_network: Union[WatersNetwork, None] = None) -> GeneralState:
        """
        Builds a state from those of the child elements 'position', 'time', 'orientation',
        'velocity' and 'acceleration' that are present in xml_node.

        :param xml_node: XML element
        :param waters_network: object of class WatersNetwork
        :return: object of class GeneralState holding the attributes found in xml_node.
        """
        state_args = dict()

        position_node = xml_node.find('position')
        if position_node is not None:
            state_args['position'] = cls._read_position(position_node, waters_network)

        time_node = xml_node.find('time')
        if time_node is not None:
            state_args['time_step'] = read_time(time_node)

        orientation_node = xml_node.find('orientation')
        if orientation_node is not None:
            state_args['orientation'] = cls._read_orientation(orientation_node)

        velocity_node = xml_node.find('velocity')
        if velocity_node is not None:
            state_args['velocity'] = read_value_exact_or_interval(velocity_node)

        acceleration_node = xml_node.find('acceleration')
        if acceleration_node is not None:
            state_args['acceleration'] = read_value_exact_or_interval(acceleration_node)

        return GeneralState(**state_args)

    @classmethod
    def _read_position(cls, xml_node: ElementTree.Element,
                       waters_network: Union[WatersNetwork, None] = None) -> Union[np.ndarray, Shape]:
        """
        Reads a position, which is given as a point, as a shape (rectangle, circle or polygon)
        or -- only if a waters network is provided -- as references to waters.

        :param xml_node: XML element
        :param waters_network: object of class WatersNetwork
        :return: position (np array) or Shape according to the CommonOcean specification.
        :raises ValueError: if the position is given in none of the supported forms
        """
        if xml_node.find('point') is not None:
            return PointFactory.create_from_xml_node(xml_node.find('point'))

        if any(xml_node.find(shape_tag) is not None for shape_tag in ('rectangle', 'circle', 'polygon')):
            return ShapeFactory.create_from_xml_node(xml_node)

        if waters_network is not None and xml_node.find('water') is not None:
            # Every referenced waterway contributes its polygon to the resulting shape group.
            position_list = list()
            for water_ref in xml_node.findall('water'):
                waterway = waters_network.find_waterway_by_id(int(water_ref.get('ref')))
                position_list.append(waterway.convert_to_polygon())
            return ShapeGroup(position_list)

        raise ValueError('<StateFactory/_read_position>: Position has to be specified as point, shape or water.')

    @classmethod
    def _read_orientation(cls, xml_node: ElementTree.Element) -> Union[float, AngleInterval]:
        """
        :param xml_node: XML element
        :return: float (from the 'exact' child) or AngleInterval (from the 'intervalStart' and
            'intervalEnd' children) according to the CommonOcean specification.
        :raises ValueError: if neither an exact value nor a complete interval is given
        """
        exact_node = xml_node.find('exact')
        if exact_node is not None:
            return float(exact_node.text)

        start_node = xml_node.find('intervalStart')
        end_node = xml_node.find('intervalEnd')
        if start_node is not None and end_node is not None:
            return AngleInterval(float(start_node.text), float(end_node.text))

        raise ValueError('<StateFactory/_read_orientation>: Orientation has to be specified as exact value '
                         'or interval.')