Source code for commonocean.scenario.waters

import enum
from typing import *
from uuid import RESERVED_FUTURE
import numpy as np
import abc

from commonroad.geometry.shape import Circle, Rectangle, Shape, Polygon, ShapeGroup
from commonroad.geometry.transform import translation_rotation_matrix
from commonroad.common.validity import *

from commonocean.scenario.obstacle import Obstacle
from commonocean.scenario.traffic_sign import TrafficSign


__author__ = "Hanna Krasowski, Benedikt Pfleiderer, Fabian Thomas-Barein"
__copyright__ = "TUM Cyber-Physical System Group"
__credits__ = ["ConVeY"]
__version__ = "2022a"
__maintainer__ = "Hanna Krasowski"
__email__ = "commonocean@lists.lrz.de"
__status__ = "released"


[docs]class WatersType(enum.Enum): """ Enum describing different types of waters """ UNKNOWN = 'unknown' FAIRWAY = 'fairway' SHALLOW = 'shallow' TRAFFICSEPARATIONZONE = 'trafficseparationzone'
[docs]class WatersUser(enum.Enum): """ Enum describing different types of water users """ VESSEL = 'vessel' MOTORVESSEL = 'motorvessel' SAILINGVESSEL = 'sailingvessel' CARGOSHIP = 'cargoship' SWIMMER = 'swimmer'
[docs]class Waters(metaclass=abc.ABCMeta): """ Abstract class for waters in CommonOcean """ @abc.abstractproperty def waters_type(self): pass @abc.abstractproperty def waters_id(self): pass
[docs] @abc.abstractmethod def translate_rotate(self, translation: np.ndarray, angle: float): pass
[docs] @abc.abstractmethod def contains_points(self, point: np.ndarray): pass
[docs] @abc.abstractmethod def get_obstacles(self): pass
class Waterway(Waters): """ Class which describes a Waters entity according to the CommonOcean specification. """ def __init__(self, left_vertices: np.ndarray, center_vertices: np.ndarray, right_vertices: np.ndarray, waters_id: int, waters_type: WatersType, predecessor=None, successor=None, traffic_signs=None ): """ Constructor of a Waters object :param left_vertices: The vertices of the left boundary of the Waters described as a polyline [[x0,x1,...,xn],[y0,y1,...,yn]] :param center_vertices: The vertices of the center line of the Waters described as a polyline [[x0,x1,...,xn],[y0,y1,...,yn]] :param right_vertices: The vertices of the right boundary of the Waters described as a polyline [[x0,x1,...,xn],[y0,y1,...,yn]] :param waters_id: The unique id (natural number) of the water :param waters_type: Class of Water :param predecessor: The list of predecessor waters (None if not existing) :param successor: The list of successor waters (None if not existing) :param traffic_signs: Traffic signs to be applied """ self._left_vertices = None self._right_vertices = None self._center_vertices = None self._waters_id = None self.waters_id = waters_id self.left_vertices = left_vertices self.right_vertices = right_vertices self.center_vertices = center_vertices assert len(left_vertices[0]) == len(center_vertices[0]) == len( right_vertices[0]), '<Waters/init>: Provided polylines do not share the same length! {}/{}/{}'.format( len(left_vertices[0]), len(center_vertices[0]), len(right_vertices[0])) assert (waters_type == WatersType.FAIRWAY or waters_type == WatersType.TRAFFICSEPARATIONZONE or WatersType.UNKNOWN), \ '<Waters/init>: Waterway must be of type traffic separation zone, fairway or unknown. Got type {}'.format(waters_type) self._waters_type = waters_type self._predecessor = None if predecessor is None: self._predecessor = [] else: self.predecessor = predecessor self._successor = None if successor is None: self._successor = [] else: self.successor = successor self._polygon = None self._traffic_signs = None if traffic_signs is None: self._traffic_signs = set() else: self.traffic_signs = traffic_signs self._depth = np.inf def __eq__(self, other): if not isinstance(other, Waters): warnings.warn(f"Inequality between Waters {repr(self)} and different type {type(other)}") return False waters_eq = True polylines = [self._left_vertices, self._right_vertices, self._center_vertices] polylines_other = [other.left_vertices, other.right_vertices, other.center_vertices] for i in range(0, len(polylines)): polyline = polylines[i] polyline_other = polylines_other[i] polyline_string = np.array2string(np.around(polyline.astype(float), 10), precision=10) polyline_other_string = np.array2string(np.around(polyline_other.astype(float), 10), precision=10) waters_eq = waters_eq and polyline_string == polyline_other_string if waters_eq and self.waters_id == other.waters_id \ and set(self._predecessor) == set(other.predecessor) and set(self._successor) == set(other.successor) \ and self._waters_type == other.waters_type \ and self._traffic_signs == other.traffic_signs: return polylines warnings.warn(f"Inequality of Waters {repr(self)} and the other one {repr(other)}") return False def __hash__(self): polylines = [self._left_vertices, self._right_vertices, self._center_vertices] polyline_strings = [] for polyline in polylines: polyline_string = np.array2string(np.around(polyline.astype(float), 10), precision=10) polyline_strings.append(polyline_string) elements = [self._predecessor, self._successor, self._traffic_signs] frozen_elements = [frozenset(e) for e in elements] frozen_elements.append(self._waters_type) return hash((self._waters_id, tuple(polyline_strings), tuple(frozen_elements))) def __str__(self): return 'Waterway with id:' + str(self.waters_id) def __repr__(self): return f"Waterway(left_vertices={self._left_vertices.tolist()}, " \ f"center_vertices={self._center_vertices.tolist()}, " \ f"right_vertices={self._right_vertices.tolist()}, waters_id={self._waters_id}, " \ f"predecessor={self._predecessor}, successor={self._successor}, " \ f"waters_type={self._waters_type}, " \ f" traffic_signs={self._traffic_signs}" @property def waters_type(self) -> WatersType: return self._waters_type @property def waters_id(self) -> int: return self._waters_id @waters_id.setter def waters_id(self, f_id: int): if self._waters_id is None: assert is_natural_number(f_id), '<Waters/waters_id>: Provided waters_id is not valid! id={}'.format(f_id) self._waters_id = f_id else: warnings.warn('<Waters/waters_id>: waters_id of waters is immutable') @property def left_vertices(self) -> np.ndarray: return self._left_vertices @left_vertices.setter def left_vertices(self, polyline: np.ndarray): if self._left_vertices is None: assert is_valid_polyline( polyline), '<Waters/left_vertices>: The provided polyline is not valid! polyline = {}'.format(polyline) self._left_vertices = polyline else: warnings.warn('<Waters/left_vertices>: left_vertices of waters are immutable!') @property def right_vertices(self) -> np.ndarray: return self._right_vertices @right_vertices.setter def right_vertices(self, polyline: np.ndarray): if self._right_vertices is None: assert is_valid_polyline( polyline), '<Waters/right_vertices>: The provided polyline is not valid! polyline = {}'.format( polyline) self._right_vertices = polyline else: warnings.warn('<Waters/right_vertices>: right_vertices of waters are immutable!') @property def center_vertices(self) -> np.ndarray: return self._center_vertices @center_vertices.setter def center_vertices(self, polyline: np.ndarray): if self._center_vertices is None: assert is_valid_polyline( polyline), '<Waters/center_vertices>: The provided polyline is not valid! polyline = {}'.format( polyline) self._center_vertices = polyline else: warnings.warn('<Waters/center_vertices>: center_vertices of water are immutable!') @property def predecessor(self) -> list: return self._predecessor @predecessor.setter def predecessor(self, predecessor: list): if self._predecessor is None: assert (is_list_of_natural_numbers(predecessor) and len(predecessor) >= 0), '<Waters/predecessor>: ' \ 'Provided list ' \ 'of predecessors is not valid!' \ 'predecessors = {}'.format( predecessor) self._predecessor = predecessor else: warnings.warn( '<Waters/predecessor>: predecessor of waters is immutable!') @property def successor(self) -> list: return self._successor @successor.setter def successor(self, successor: list): if self._successor is None: assert (is_list_of_natural_numbers(successor) and len(successor) >= 0), '<Waters/predecessor>: Provided ' \ 'list of successors is not valid!' \ 'successors = {}'.format(successor) self._successor = successor else: warnings.warn( '<Waters/successor>: successor of water is immutable!') @waters_type.setter def waters_type(self, waters_type: Set[WatersType]): if self._waters_type is None or len(self._waters_type) == 0: assert isinstance(waters_type, set) and all(isinstance(elem, WatersType) for elem in waters_type), \ '<Waters/waters_type>: ''Provided type is not valid! type = {}'.format(type(waters_type)) self._waters_type = waters_type else: warnings.warn( '<Water/waters_type>: type of water is immutable!') @property def traffic_signs(self) -> Set[int]: return self._traffic_signs @traffic_signs.setter def traffic_signs(self, traffic_sign_ids: Set[int]): if self._traffic_signs is None: assert isinstance(traffic_sign_ids, set), \ '<Waters/traffic_signs>: provided list of ids is not a ' \ 'set! type = {}'.format(type(traffic_sign_ids)) self._traffic_signs = traffic_sign_ids else: warnings.warn( '<Waters/traffic_signs>: traffic_signs of water is immutable!') @property def depth(self): return self._depth @depth.setter def depth(self, depth: float): warnings.warn('<Waters/depth>: depth of waterway is immutable!') def translate_rotate(self, translation: np.ndarray, angle: float): """ This method translates and rotates a water :param translation: The translation given as [x_off,y_off] for the x and y translation :param angle: The rotation angle in radian (counter-clockwise defined) """ assert is_real_number_vector(translation, 2), '<Waters/translate_rotate>: provided translation ' \ 'is not valid! translation = {}'.format(translation) assert is_valid_orientation( angle), '<Waters/translate_rotate>: provided angle is not valid! angle = {}'.format(angle) t_m = translation_rotation_matrix(translation,angle) tmp = t_m.dot(np.vstack((self.center_vertices.transpose(), np.ones((1, self.center_vertices.shape[0]))))) tmp = tmp[0:2, :] self._center_vertices = tmp.transpose() tmp = t_m.dot(np.vstack((self.left_vertices.transpose(), np.ones((1, self.left_vertices.shape[0]))))) tmp = tmp[0:2, :] self._left_vertices = tmp.transpose() tmp = t_m.dot(np.vstack((self.right_vertices.transpose(), np.ones((1, self.right_vertices.shape[0]))))) tmp = tmp[0:2, :] self._right_vertices = tmp.transpose() if self._polygon is not None: self._polygon = None self._polygon = self.convert_to_polygon() def convert_to_polygon(self) -> Polygon: """ Converts the given water to a polygon representation :return: The polygon of the water """ if self._polygon is None: self._polygon = Polygon(np.concatenate((self.right_vertices, np.flip(self.left_vertices, 0)))) return self._polygon def contains_points(self, point_list: np.ndarray) -> List[bool]: """ Checks if a list of points is enclosed in the water :param point_list: The list of points in the form [[px1,py1],[px2,py2,],...] :return: List of bools with True indicating point is enclosed and False otherwise """ assert isinstance(point_list, ValidTypes.ARRAY), '<Waters/contains_points>: provided list of points is not a list! type ' \ '= {}'.format(type(point_list)) assert is_valid_polyline( point_list), 'Waters/contains_points>: provided list of points is malformed! points = {}'.format( point_list) res = list() poly = self.convert_to_polygon() for p in point_list: res.append(poly.contains_point(p)) return res def get_obstacles(self, obstacles: List[Obstacle], time_step: int = 0) -> List[Obstacle]: """ Returns the subset of obstacles, which are located in the water, of a given candidate set :param obstacles: The set of obstacle candidates :param time_step: The time step for the occupancy to check :return: """ assert isinstance(obstacles, list) and all(isinstance(o, Obstacle) for o in obstacles), '<Waters/get_obstacles>: Provided list of obstacles' \ ' is malformed! obstacles = {}'.format( obstacles) res = list() for o in obstacles: o_shape = o.occupancy_at_time(time_step).shape vertices = list() if isinstance(o_shape, ShapeGroup): for sh in o_shape.shapes: if isinstance(sh, Circle): vertices.append(sh.center) else: vertices.append(sh.vertices) vertices = np.append(vertices, [o_shape.center], axis=0) else: if isinstance(o_shape, Circle): vertices = o_shape.center else: vertices = o_shape.vertices vertices = np.append(vertices, [o_shape.center], axis=0) if any(self.contains_points(np.array(vertices))): res.append(o) return res def add_traffic_sign_to_water(self, traffic_sign_id: int): """ Adds a traffic sign ID to water :param traffic_sign_id: traffic sign ID to add """ self.traffic_signs.add(traffic_sign_id) class Shallow(Waters): """Class to describe a shallow with a defined shape and depth""" def __init__(self, shape: Shape, waters_id: int, depth: float = 10.0): """ :param shape: shape of the shallow :param depth: depth of the shallow in meters (default: 10.0) """ self._shape = shape self._depth = depth self._waters_id = waters_id self._waters_type = WatersType.SHALLOW def __str__(self): shallow_str = "\n" shallow_str += "Shallow:\n" shallow_str += "- Shape: {}\n".format(type(self._shape).__name__) shallow_str += "- Center-Position: {}\n".format(str(self.shape.center)) shallow_str += "- Depth: {} Meters\n".format(str(self.depth)) return shallow_str def __repr__(self): return f"Shallow(shape={type(self._shape).__name__}, " \ f"waters_id={str(self.waters_id)}, " \ f"center_position={str(self.shape.center)}, " \ f"depth={str(self.depth)}" @property def waters_type(self) -> WatersType: return self._waters_type @property def waters_id(self) -> int: return self._waters_id @waters_id.setter def waters_id(self, f_id: int): if self._waters_id is None: assert is_natural_number(f_id), '<Waters/waters_id>: Provided waters_id is not valid! id={}'.format(f_id) self._waters_id = f_id else: warnings.warn('<Waters/waters_id>: waters_id of waters is immutable') @property def shape(self): return self._shape @shape.setter def shape(self, shape: Shape): assert isinstance(shape, Shape), \ '<Shallow/shape>: argument shape of wrong ' \ 'type. Expected type: %s. Got type: %s.' \ % (Shape, type(shape)) self._shape = shape @property def depth(self): return self._depth @depth.setter def depth(self, depth: float): assert isinstance(depth, float), \ '<Shallow/depth>: argument depth of wrong ' \ 'type. Expected type: %s. Got type: %s.' \ % (float, type(depth)) assert depth >= 0, '<Shallow/depth>: argument depth is a negative number. ' \ 'Expected type is a positive number. Got: %s.' \ % (float, type(depth)) self._depth = depth def translate_rotate(self, translation: np.ndarray, angle: float): new_shape = self.shape.translate_rotate(translation,angle) self.shape = new_shape def contains_points(self, point_list: np.ndarray)-> List[bool]: """ Checks if a list of points is enclosed in the shallow :param point_list: The list of points in the form [[px1,py1],[px2,py2,],...] :return: List of bools with True indicating point is enclosed and False otherwise """ assert isinstance(point_list, ValidTypes.ARRAY), '<Shallow/contains_points>: provided list of points is not a list! type ' \ '= {}'.format(type(point_list)) res = list() for p in point_list: res.append(self.shape.contains_point(p)) return res def get_obstacles(self, obstacles: List[Obstacle], time_step: int = 0) -> List[Obstacle]: """ Returns the subset of obstacles, which are located in the water, of a given candidate set :param obstacles: The set of obstacle candidates :param time_step: The time step for the occupancy to check :return: """ assert isinstance(obstacles, list) and all(isinstance(o, Obstacle) for o in obstacles), '<Shallow/get_obstacles>: Provided list of obstacles' \ ' is malformed! obstacles = {}'.format( obstacles) res = list() for o in obstacles: o_shape = o.occupancy_at_time(time_step).shape vertices = list() if isinstance(o_shape, ShapeGroup): for sh in o_shape.shapes: if isinstance(sh, Circle): vertices.append(sh.center) else: vertices.append(sh.vertices) vertices = np.append(vertices, [o_shape.center], axis=0) else: if isinstance(o_shape, Circle): vertices = o_shape.center else: vertices = o_shape.vertices vertices = np.append(vertices, [o_shape.center], axis=0) if any(self.contains_points(np.array(vertices))): res.append(o) return res
[docs]class WatersNetwork: """ Class which represents a network of connected waters (waterways and shallows) """ def __init__(self, center_nav_area: np.ndarray, length_nav_area: float, width_nav_area: float, orientation_nav_area: float): self._waterways: Dict[int, Waterway] = {} self._shallows: Dict[int, Shallow] = {} self._traffic_signs: Dict[int, TrafficSign] = {} self._unassigned_traffic_signs: Dict[int, TrafficSign] = {} assert len( center_nav_area) == 2, '<Watersnetwork/center navigationable area> Error: dimensions do not fit. Got { } but 2 is expected'.format( len(center_nav_area)) assert length_nav_area >= 0, '<Watersnetwork/length navigationable area> Error: negative length not allowed' assert width_nav_area >= 0, '<Watersnetwork/width navigationable area> Error: negative width not allowed' assert is_valid_orientation( orientation_nav_area), '<WatersNetwork/orientation navigationable area>: provided orientation is not valid! orientation = {}'.format(angle) self._navigationable_area = Rectangle(length_nav_area,width_nav_area,center_nav_area,orientation_nav_area) def __eq__(self, other): if not isinstance(other, WatersNetwork): warnings.warn(f"Inequality between WatersNetwork {repr(self)} and different type {type(other)}") return False list_elements_eq = True waters_network_eq = True elements = [self._waterways, self._traffic_signs, self._shallows] elements_other = [other._waterways, other._traffic_signs, other._shallows] for i in range(0, len(elements)): e = elements[i] e_other = elements_other[i] waters_network_eq = waters_network_eq and len(e) == len(e_other) for k in e.keys(): if k not in e_other: waters_network_eq = False continue if e.get(k) != e_other.get(k): list_elements_eq = False if not waters_network_eq: warnings.warn(f"Inequality of WatersNetwork {repr(self)} and the other one {repr(other)}") return waters_network_eq and list_elements_eq def __hash__(self): return hash((frozenset(self._waterways.items()), frozenset(self._shallows.items()), frozenset(self._traffic_signs.items()))) def __str__(self): return f"WatersNetwork consists of waterways {set(self._waterways.keys())}, " \ f" shallows {set(self._shallows.keys())}, " \ f" and traffic signs {set(self._traffic_signs.keys())}" def __repr__(self): return f"WatersNetwork(waterways={repr(self._waterways)}, shallows={repr(self._shallows)}, traffic_signs={repr(self._traffic_signs)}" @property def navigationable_area(self) -> Rectangle: return self._navigationable_area @navigationable_area.setter def navigationable_area(self, navigationable_area: Rectangle): warnings.warn('<WatersNetwork/navigationable area>: navigationable area of network are immutable') @property def waters(self) -> List[Waters]: return list(self._waterways.values()) + list(self._shallows.values()) @property def waterways(self) -> List[Waters]: return list(self._waterways.values()) @property def shallows(self) -> List[Waters]: return list(self._shallows.values()) @waters.setter def waters(self, waters: list): warnings.warn('<WatersNetwork/waters>: waters of network are immutable') @waterways.setter def waterways(self, waters: list): warnings.warn('<WatersNetwork/waterways>: waterways of network are immutable') @shallows.setter def shallows(self, waters: list): warnings.warn('<WatersNetwork/shallows>: shallows of network are immutable') @property def traffic_signs(self) -> List[TrafficSign]: return list(self._traffic_signs.values())
[docs] def find_waters_by_id(self, waters_id: int) -> Waters: """ Finds a water (shallow or waterway) for a given waters_id :param waters_id: The id of the waterway to find :return: The waters object if the id exists and None otherwise """ assert is_natural_number( waters_id), '<WatersNetwork/find_waters_by_id>: provided id is not valid! id = {}'.format(waters_id) res = self.find_waterway_by_id(waters_id) if res is None: return self.find_shallow_by_id(waters_id) else: return res
[docs] def find_waterway_by_id(self, waters_id: int) -> Waters: """ Finds a waterway for a given waters_id :param waters_id: The id of the waterway to find :return: The waterway object if the id exists and None otherwise """ assert is_natural_number( waters_id), '<WatersNetwork/find_waterway_by_id>: provided id is not valid! id = {}'.format(waters_id) return self._waterways[waters_id] if waters_id in self._waterways else None
[docs] def find_shallow_by_id(self, waters_id: int) -> Waters: """ Finds a shallow for a given waters_id :param waters_id: The id of the shallow to find :return: The shallow object if the id exists and None otherwise """ assert is_natural_number( waters_id), '<WatersNetwork/find_shallow_by_id>: provided id is not valid! id = {}'.format(waters_id) return self._shallows[waters_id] if waters_id in self._shallows else None
[docs] def find_traffic_sign_by_id(self, traffic_sign_id: int) -> TrafficSign: """ Finds a traffic sign for a given traffic_sign_id :param traffic_sign_id: The id of the traffic sign to find :return: The traffic sign object if the id exists and None otherwise """ assert is_natural_number( traffic_sign_id), '<WatersNetwork/find_traffic_sign_by_id>: provided id is not valid! ' \ 'id = {}'.format(traffic_sign_id) return self._traffic_signs[traffic_sign_id] if traffic_sign_id in self._traffic_signs else None
[docs] def add_waters(self, water: Waters): """ Adds a waters (shallow and waterway) to the WatersNetwork :param water: The waters to add :return: True if the waters has successfully been added to the network, false otherwise """ assert isinstance(water, Waters), '<WatersNetwork/add_waters>: provided water is not of ' \ 'type water! type = {}'.format(type(water)) if isinstance(water, Shallow): if water.waters_id in self._shallows.keys(): warnings.warn('Shallow already exists in network! No changes are made.') return False else: self._shallows[water.waters_id] = water return True elif isinstance(water, Waterway): if water.waters_id in self._waterways.keys(): warnings.warn('Waters already exists in network! No changes are made.') return False else: self._waterways[water.waters_id] = water return True
[docs] def add_traffic_sign(self, traffic_sign: TrafficSign, waters_ids: Union[None, Set[int]] = None): """ Adds a traffic sign to the WatersNetwork :param traffic_sign: The traffic sign to add :param waters_ids: Waters the traffic sign should be referenced from :return: True if the traffic sign has successfully been added to the network, false otherwise """ assert isinstance(traffic_sign, TrafficSign), '<WatersNetwork/add_traffic_sign>: provided traffic sign is ' \ 'not of type traffic_sign! type = {}'.format(type(traffic_sign)) if traffic_sign.traffic_sign_id in self._traffic_signs.keys(): warnings.warn('Traffic sign with ID {} already exists in network! ' 'No changes are made.'.format(traffic_sign.traffic_sign_id)) return False else: self._traffic_signs[traffic_sign.traffic_sign_id] = traffic_sign if waters_ids is None or len(waters_ids) < 1: warnings.warn('Traffic sign was not referenced to water, use post_assign_traffic_sign to assign it later.') self._unassigned_traffic_signs[traffic_sign.traffic_sign_id] = traffic_sign else: for water_id in waters_ids: if water_id is not None: water = self.find_waterway_by_id(water_id) if water is not None: water.add_traffic_sign_to_water(traffic_sign.traffic_sign_id) else: warnings.warn('Traffic sign cannot be referenced to water because the water does not exist.') else: pass return True
[docs] def post_assign_traffic_sign(self, traffic_sign_id: int, waters_ids: Set[int]): """ Assign an unassigned traffic sign to the WatersNetwork :param traffic_sign_id: The traffic sign id to be assigned :param waters_ids: Waters the traffic sign should be referenced from :return: True if the traffic sign has successfully been added to the network, false otherwise """ if traffic_sign_id not in self._traffic_signs.keys(): warnings.warn('Traffic sign with ID {} does not exist in network! ' 'No changes are made.'.format(traffic_sign_id)) return False elif traffic_sign_id not in self._unassigned_traffic_signs.keys(): warnings.warn('Traffic sign with ID {} is already assigned to a Water in network! ' 'No changes are made.'.format(traffic_sign_id)) return False else: assigned = False for water_id in waters_ids: water = self.find_waterway_by_id(water_id) if water is not None: water.add_traffic_sign_to_water(traffic_sign_id) assigned = True else: warnings.warn('Traffic sign cannot be referenced to water because the water does not exist.') if assigned: del self._unassigned_traffic_signs[traffic_sign_id] return True
[docs] def add_waters_from_network(self, waters_network: 'WatersNetwork'): """ Adds waters from a given network object to the current network :param waters_network: The water network :return: True if all waters have been added to the network, false otherwise """ flag = True for f in waters_network.waters: flag = flag and self.add_waters(f) return flag
[docs] def remove_waters(self, waters_id: int): """ Removes waters from a waters network and deletes all references. @param waters_id: ID of waters which should be removed. """ if waters_id in self._waterways.keys(): del self._waterways[waters_id] if waters_id in self._shallows.keys(): del self._shallows[waters_id]
[docs] def remove_traffic_sign(self, traffic_sign_id: int): """ Removes a traffic sign from a waters network and deletes all references. @param traffic_sign_id: ID of traffic sign which should be removed. """ if traffic_sign_id in self._traffic_signs.keys(): del self._traffic_signs[traffic_sign_id]
[docs] def translate_rotate(self, translation: np.ndarray, angle: float): """ Translates and rotates the complete waters network :param translation: The translation given as [x_off,y_off] for the x and y translation :param angle: The rotation angle in radian (counter-clockwise defined) """ assert is_real_number_vector(translation, 2), '<WatersNetwork/translate_rotate>: provided translation is not valid! ' \ 'translation = {}'.format(translation) assert is_valid_orientation( angle), '<WatersNetwork/translate_rotate>: provided angle is not valid! angle = {}'.format(angle) nav_area_new = self._navigationable_area.translate_rotate(translation,angle) self._navigationable_area = nav_area_new for waterway in self._waterways.values(): waterway.translate_rotate(translation, angle) for shallow in self._shallows.values(): shallow.translate_rotate(translation, angle) for traffic_sign in self._traffic_signs.values(): traffic_sign.translate_rotate(translation, angle)
[docs] def find_waterway_by_position(self, point_list: List[np.ndarray]) -> List[List[int]]: """ Finds the waterway id of a given position :param point_list: The list of positions to check :return: A list of water ids. If the position could not be matched to a water, an empty list is returned """ assert isinstance(point_list, ValidTypes.LISTS), '<waterways/contains_points>: provided list of points is not a list! ' \ 'type = {}'.format(type(point_list)) res = list() polygons = [(f.waters_id, f.convert_to_polygon()) for f in self.waterways] for point in point_list: mapped = list() for waters_id, poly in polygons: if poly.contains_point(point): mapped.append(waters_id) res.append(mapped) return res
[docs] def find_waterway_by_shape(self, shape: Shape) -> List[int]: """ Finds the waterway id of a given shape :param shape: The shape to check :return: A list of water ids. If the position could not be matched to a water, an empty list is returned """ assert isinstance(shape, (Circle, Polygon, Rectangle)), '<Waters/find_water_by_shape>: ' \ 'provided shape is not a shape! ' \ 'type = {}'.format(type(shape)) res = [] polygons = [(l.waters_id, l.convert_to_polygon()) for l in self.waterways] for waters_id, poly in polygons: if poly.shapely_object.intersects(shape.shapely_object): res.append(waters_id) return res
[docs] def filter_obstacles_in_network(self, obstacles: List[Obstacle]) -> List[Obstacle]: """ Returns the list of obstacles which are located in the water network :param obstacles: The list of obstacles to check :return: The list of obstacles which are located in the water network """ res = list() map = self.map_obstacles_to_waters(obstacles) for k in map.keys(): obs = map[k] for o in obs: if o not in res: res.append(o) return res
[docs] def map_obstacles_to_waters(self, obstacles: List[Obstacle]) -> Dict[int, List[Obstacle]]: """ Maps a given list of obstacles to the waters of the water network :param obstacles: The list of CR obstacles :return: A dictionary with the water id as key and the list of obstacles on the water as a List[Obstacles] """ mapping = {} for f in self.waters: mapped_objs = f.get_obstacles(obstacles) if len(mapped_objs) > 0: mapping[f.waters_id] = mapped_objs return mapping
[docs] def waterways_in_proximity(self, point: list, radius: float) -> List[Waters]: """ Finds all waterways which intersect a given circle, defined by the center point and radius :param point: The center of the circle :param radius: The radius of the circle :return: The list of waters which intersect the given circle """ assert is_real_number_vector(point, length=2), '<WatersNetwork/waters_in_proximity>: provided point is ' \ 'not valid! point = {}'.format(point) assert is_positive( radius), '<WatersNetwork/waters_in_proximity>: provided radius is not valid! radius = {}'.format(radius) ids = self._waterways.keys() lanes = dict() rad_sqr = radius ** 2 distance_list = list() for i in ids: if i not in lanes: water = self.find_waterway_by_id(i) distance = (water.center_vertices - point) ** 2. distance = distance[:, 0] + distance[:, 1] if any(np.greater_equal(rad_sqr, distance)): lanes[i] = self.find_waterway_by_id(i) distance_list.append(np.min(distance)) index_minDist = np.argmin(distance - rad_sqr) indices = np.argsort(distance_list) water = list(lanes.values()) return [water[i] for i in indices]
[docs] def shallow_depth_for_positions(self, positions: List[np.ndarray]) -> List: """ This function returns the shallow depth for positions :param positions: List of positions where each position is a 2D ndarray :return: List with the respective depths of the positions of the input list """ depths = [] for position in positions: is_infinite = True for shallow in self.shallows: if shallow.contains_points(np.array([position]))[0]: depths.append(shallow.depth) is_infinite = False break if is_infinite: depths.append(np.inf) return depths