Source code for commonocean.scenario.scenario

import itertools
import re
import iso3166
from collections import defaultdict

from commonroad.common.util import Interval
from commonocean.prediction.prediction import Occupancy, TrajectoryPrediction, SetBasedPrediction

from commonocean import SCENARIO_VERSION, SUPPORTED_COMMONOCEAN_VERSIONS
from commonocean.scenario.obstacle import StaticObstacle, DynamicObstacle, ObstacleRole, ObstacleType
from commonocean.scenario.state import GeneralState
from commonocean.scenario.waters import *

# Tunneling from CR-IO #
from commonroad.scenario.scenario import GeoTransformation as GeoTransformation_CR
########################

__author__ = "Hanna Krasowski, Benedikt Pfleiderer, Fabian Thomas-Barein"
__copyright__ = "TUM Cyber-Physical System Group"
__credits__ = ["ConVeY"]
__version__ = "2022a"
__maintainer__ = "Hanna Krasowski"
__email__ = "commonocean@lists.lrz.de"
__status__ = "released"


@enum.unique
class Tag(enum.Enum):
    """Tags that can be attached to a CommonOcean scenario.

    The value of every member is the string identifier of the tag.
    """

    OPENSEA = "open_sea"
    TRAFFICSIGN = "traffic_sign"
    NARROWWATERS = "narrow_waters"
    # The member name is spelled "SEPERATION"; it is public API and therefore kept unchanged.
    TRAFFICSEPERATIONZONE = "traffic_separation_zone"
    HARBOUR = "harbour"
    COMFORT = "comfort"
    CRITICAL = "critical"
    EVASIVE = "evasive"
    SPEED_LIMIT = "speed_limit"
@enum.unique
class TimeOfDay(enum.Enum):
    """ Enum containing all possible time of days."""

    DAY = "day"
    NIGHT = "night"
    UNKNOWN = "unknown"


@enum.unique
class Weather(enum.Enum):
    """ Enum containing all possible weathers."""

    SUNNY = "sunny"
    LIGHT_RAIN = "light_rain"
    HEAVY_RAIN = "heavy_rain"
    FOG = "fog"
    SNOW = "snow"
    HAIL = "hail"
    UNKNOWN = "unknown"


@enum.unique
class SeaState(enum.Enum):
    """ Enum containing all possible states."""

    CALM = "calm"
    ROUGH = "rough"
    UNKNOWN = "unknown"


class Time:
    """
    Class which describes the fictive time when a scenario starts.

    Two instances compare equal when year, month, day, hours and minutes all match; the hash is built from the
    same five fields.
    """

    def __init__(self, year: int, month: int, day: int, hours: int, minutes: int):
        """
        Constructor of a time object

        :param year: year of scenario e.g. 2022
        :param month: month of scenario (1-12)
        :param day: day of scenario (1-31)
        :param hours: hours at start of scenario (0-24)
        :param minutes: minutes at start of scenario (0-60)
        """
        # The values are stored as given; no range validation is performed here.
        self._year = year
        self._month = month
        self._day = day
        self._hours = hours
        self._minutes = minutes

    def __eq__(self, other):
        """Return True if other is a Time with identical year, month, day, hours and minutes."""
        if not isinstance(other, Time):
            return False
        return (self._year == other._year and self._month == other._month and self._day == other._day
                and self._hours == other._hours and self._minutes == other._minutes)

    def __hash__(self):
        # Hash exactly the fields compared in __eq__, so times on different dates do not all collide.
        return hash((self._year, self._month, self._day, self._hours, self._minutes))

    @property
    def year(self) -> int:
        """Year of the scenario start."""
        return self._year

    @property
    def month(self) -> int:
        """Month of the scenario start."""
        return self._month

    @property
    def day(self) -> int:
        """Day of the scenario start."""
        return self._day

    @property
    def hours(self) -> int:
        """Hours at the scenario start."""
        return self._hours

    @property
    def minutes(self) -> int:
        """Minutes at the scenario start."""
        return self._minutes
class GeoTransformation(GeoTransformation_CR):
    """
    Transformation from geodetic to projected Cartesian coordinates according to the CommonOcean specification.

    The class adds no behaviour of its own: all arguments are handed to the tunneled CommonRoad base class.
    """

    def __init__(self, geo_reference: str = None, x_translation: float = None, y_translation: float = None,
                 z_rotation: float = None, scaling: float = None):
        """
        Constructor of a geo transformation object

        :param geo_reference: proj-string describing transformation from geodetic to projected Cartesian coordinates
        :param x_translation: translation value for x-coordinates
        :param y_translation: translation value for y-coordinates
        :param z_rotation: rotation value around origin
        :param scaling: multiplication value of x- and y-coordinates
        """
        # Arguments are passed positionally, in the same order as in this signature.
        super().__init__(geo_reference, x_translation, y_translation, z_rotation, scaling)
class Environment:
    """
    Environment in which a scenario takes place, as specified in the CommonOcean specification.

    Bundles the start time, the time of day, the weather and the sea state. Instances are compared and hashed
    by these four values.
    """

    def __init__(self, time: Time = None, time_of_day: TimeOfDay = None, weather: Weather = None,
                 seastate: SeaState = None):
        """
        Constructor of an environment object

        :param time: time in day, hours and minutes
        :param time_of_day: current time of day, i.e., day or night
        :param weather: weather information, e.g., sunny
        :param seastate: seastate information, e.g., calm
        """
        self._time = time
        self._time_of_day = time_of_day
        self._weather = weather
        self._seastate = seastate

    def __eq__(self, other):
        """Return True if other is an Environment whose four fields all compare equal."""
        if not isinstance(other, Environment):
            return False
        own_fields = (self._time, self._time_of_day, self._weather, self._seastate)
        other_fields = (other.time, other.time_of_day, other.weather, other.seastate)
        return all(mine == theirs for mine, theirs in zip(own_fields, other_fields))

    def __hash__(self):
        return hash((self._time, self._time_of_day, self._weather, self._seastate))

    @property
    def time(self) -> Time:
        """Start time of the scenario."""
        return self._time

    @property
    def time_of_day(self) -> TimeOfDay:
        """Time of day, i.e., day or night."""
        return self._time_of_day

    @property
    def weather(self) -> Weather:
        """Weather information."""
        return self._weather

    @property
    def seastate(self) -> SeaState:
        """Sea state information."""
        return self._seastate
class Location:
    """
    Location of a scenario according to the CommonOcean specification.

    Holds a GeoName ID, GPS coordinates, an optional geometric transformation and an optional environment.
    Instances are compared and hashed by these five values.
    """

    def __init__(self, geo_name_id: int = -999, gps_latitude: float = 999, gps_longitude: float = 999,
                 geo_transformation: GeoTransformation = None, environment: Environment = None):
        """
        Constructor of a location object

        :param geo_name_id: GeoName ID (defaults to -999)
        :param gps_latitude: GPS latitude coordinate (defaults to 999)
        :param gps_longitude: GPS longitude coordinate (defaults to 999)
        :param geo_transformation: description of geometric transformation during scenario generation
        :param environment: environmental information, e.g. weather
        """
        self._geo_name_id = geo_name_id
        self._gps_latitude = gps_latitude
        self._gps_longitude = gps_longitude
        self._geo_transformation = geo_transformation
        self._environment = environment

    def __eq__(self, other):
        """Return True if other is a Location whose five fields all compare equal."""
        if not isinstance(other, Location):
            return False
        own_fields = (self._geo_name_id, self._gps_latitude, self._gps_longitude, self._geo_transformation,
                      self._environment)
        other_fields = (other.geo_name_id, other.gps_latitude, other.gps_longitude, other.geo_transformation,
                        other.environment)
        return all(mine == theirs for mine, theirs in zip(own_fields, other_fields))

    def __hash__(self):
        return hash((self._geo_name_id, self._gps_latitude, self._gps_longitude, self._geo_transformation,
                     self._environment))

    @property
    def geo_name_id(self) -> int:
        """GeoName ID of the location."""
        return self._geo_name_id

    @property
    def gps_latitude(self) -> float:
        """GPS latitude coordinate."""
        return self._gps_latitude

    @property
    def gps_longitude(self) -> float:
        """GPS longitude coordinate."""
        return self._gps_longitude

    @property
    def geo_transformation(self) -> GeoTransformation:
        """Geometric transformation applied during scenario generation."""
        return self._geo_transformation

    @property
    def environment(self) -> Environment:
        """Environmental information of the location."""
        return self._environment
class ScenarioID:
    """
    Structured scenario ID as specified in the scenario documentation.

    The string form is assembled by :meth:`__str__` as
    ``[C-]<country_id>_<map_name>-<map_id>[_<configuration_id>[_<prediction_type>-<prediction_id>]]``.
    Two IDs are equal when their string forms and their scenario versions are equal.
    """

    def __init__(self, cooperative: bool = False, country_id: str = "ZAM", map_name: str = "Test", map_id: int = 1,
                 configuration_id: Union[None, int] = None, prediction_type: Union[None, str] = None,
                 prediction_id: Union[None, int, List[int]] = None, scenario_version: str = SCENARIO_VERSION):
        """
        Implements the scenario ID as specified in the scenario documentation.
        Example for benchmark ID C-USA_US101-33_2_T-1

        :param cooperative: True if scenario contains cooperative planning problem sets with multiple planning problems
        :param country_id: three-letter ID according to ISO-3166 ('ZAM' is additionally accepted; None is stored as 'ZAM')
        :param map_name: name of the map (e.g. US101)
        :param map_id: index of the map (e.g. 33)
        :param configuration_id: enumerates initial configuration of vehicles on the map (e.g. 2)
        :param prediction_type: type of the prediction for surrounding vehicles (e.g. T)
        :param prediction_id: enumerates different predictions for the same initial configuration (e.g. 1);
            a list of integers is also handled
        :param scenario_version: scenario version identifier (e.g. 2020a)
        :raise AssertionError: if scenario_version is not in SUPPORTED_COMMONOCEAN_VERSIONS
        :raise ValueError: if country_id is neither None, 'ZAM' nor a known ISO-3166 alpha-3 code
        """
        assert scenario_version in SUPPORTED_COMMONOCEAN_VERSIONS, 'Scenario_version {} not supported.' \
            .format(scenario_version)
        self.scenario_version = scenario_version
        self.cooperative = cooperative
        self._country_id = None
        self.country_id = country_id  # validated by the property setter
        self.map_name = map_name
        self.map_id = map_id
        self.configuration_id = configuration_id
        self.prediction_type = prediction_type
        self.prediction_id = prediction_id

    def __str__(self):
        """Assemble the benchmark ID string from all parts that are not None."""
        scenario_id = ""
        if self.cooperative is True:
            scenario_id += "C-"
        if self.country_id is not None:
            scenario_id += self.country_id + "_"
        if self.map_name is not None:
            scenario_id += self.map_name + "-"
        if self.map_id is not None:
            scenario_id += str(self.map_id)
        if self.configuration_id is not None:
            scenario_id += "_" + str(self.configuration_id)
        if self.prediction_type is not None:
            scenario_id += "_" + self.prediction_type + "-"
        if self.prediction_id is not None:
            if isinstance(self.prediction_id, list):
                scenario_id += "-".join([str(i) for i in self.prediction_id])
            else:
                scenario_id += str(self.prediction_id)
        return scenario_id

    @property
    def country_id(self):
        """Three-letter country ID of the scenario."""
        return self._country_id

    @country_id.setter
    def country_id(self, country_id: str):
        if country_id is None:
            self._country_id = 'ZAM'
        elif country_id in iso3166.countries_by_alpha3 or country_id == 'ZAM':
            self._country_id = country_id
        else:
            raise ValueError('Country ID {} is not in the ISO-3166 three-letter format. '.format(country_id))

    @property
    def country_name(self):
        """Name of the country; "Zamunda" for the ID 'ZAM', otherwise the name listed by iso3166."""
        if self.country_id == "ZAM":
            return "Zamunda"
        else:
            return iso3166.countries_by_alpha3[self.country_id].name

    @classmethod
    def from_benchmark_id(cls, benchmark_id: str, scenario_version: str):
        """
        Create ScenarioID from benchmark_id and scenario_version in the XML header.

        If the numbers of '_' and '-' characters do not match a scenario ID, a warning is issued and an ID is
        returned which carries the complete string as map name (map ID 0, default scenario version).

        :param benchmark_id: scenario ID provided as a string
        :param scenario_version: scenario format version (e.g. 2020a)
        :return: the parsed scenario ID
        :raise AssertionError: if the prediction type is not one of S, T, I
        """
        if not (benchmark_id.count('_') in (1, 2, 3) and benchmark_id.count('-') in (1, 2, 3, 4)):
            warnings.warn('Not a valid scenario id: ' + benchmark_id)
            return cls(None, None, benchmark_id, 0, None, None, None)

        if benchmark_id[0:2] == 'C-':
            cooperative = True
            benchmark_id = benchmark_id[2:]
        else:
            cooperative = False

        sub_ids = re.split(r'_|-', benchmark_id)
        country_id, map_name, map_id = sub_ids[:3]
        map_id = int(map_id)

        configuration_id = prediction_type = prediction_id = None
        if len(sub_ids) > 3:
            configuration_id = int(sub_ids[3])
        if len(sub_ids) > 4:
            assert sub_ids[4] in ('S', 'T', 'I'), "prediction type must be one of (S, T, I) but is {}".format(
                sub_ids[4])
            prediction_type = sub_ids[4]
            if len(sub_ids) == 6:
                prediction_id = int(sub_ids[5])
            else:
                # zero or several trailing parts are kept as a list of integers
                prediction_id = [int(s) for s in sub_ids[5:]]

        return cls(cooperative, country_id, map_name, map_id, configuration_id, prediction_type, prediction_id,
                   scenario_version)

    def __eq__(self, other: 'ScenarioID'):
        """Return True if other is a ScenarioID with the same string form and scenario version."""
        # Without this guard, comparing with e.g. a plain string of identical content raised an AttributeError.
        if not isinstance(other, ScenarioID):
            return False
        return str(self) == str(other) and self.scenario_version == other.scenario_version

    def __hash__(self):
        # Defining __eq__ alone makes a class unhashable; hash the same data that __eq__ compares.
        # NOTE: the attributes are mutable, so do not modify an ID while it is stored in a set or used as dict key.
        return hash((str(self), self.scenario_version))
class Scenario:
    """
    Class which describes a Scenario entity according to the CommonOcean specification.

    Each scenario is described by a ocean network consisting of waters (see
    :class:`commonocean.scenario.water.WatersNetwork`) and a set of obstacles which can be either static or
    dynamic (see :class:`commonocean.scenario.obstacle.Obstacle`).

    The scenario keeps a set of all object IDs in use (obstacles, waters, traffic signs); adding an object whose
    ID is already in this set raises a ValueError.
    """

    def __init__(self, dt: float, scenario_id: Union[str, ScenarioID], author: str = None, tags: Set[Tag] = None,
                 affiliation: str = None, source: str = None, location: Location = None, benchmark_id: str = None):
        """
        Constructor of a Scenario object

        :param dt: global time step size of the time-discrete scenario
        :param scenario_id: scenario ID; a string is parsed with :meth:`ScenarioID.from_benchmark_id`
        :param author: authors of the CommonOcean scenario
        :param tags: tags describing and classifying the scenario
        :param affiliation: institution of the authors
        :param source: source of the scenario, e.g. generated by a map converter and a traffic simulator
        :param location: location object of the scenario
        :param benchmark_id: for backwards compatibility; only evaluated (with a DeprecationWarning) if
            scenario_id is None
        """
        self.dt = dt
        self.scenario_id = scenario_id
        if isinstance(scenario_id, str):
            self.scenario_id = ScenarioID.from_benchmark_id(scenario_id, SCENARIO_VERSION)
        elif scenario_id is None and benchmark_id is not None:
            warnings.warn('Use the the class commonocean.scenario.ScenarioID to define the scenario id.',
                          DeprecationWarning)
            self.scenario_id = ScenarioID.from_benchmark_id(benchmark_id, SCENARIO_VERSION)

        self._waters_network: WatersNetwork = WatersNetwork(np.array([0,0]),0,0,0)

        # defaultdict without a default factory: behaves like a plain dict
        self._static_obstacles: Dict[int, StaticObstacle] = defaultdict()
        self._dynamic_obstacles: Dict[int, DynamicObstacle] = defaultdict()

        self._id_set: Set[int] = set()
        # count ids generated but not necessarily added yet
        self._id_counter = None

        # meta data
        self.author = author
        self.tags = tags
        self.affiliation = affiliation
        self.source = source
        self.location = location

    @property
    def dt(self) -> float:
        """ Global time step size of the time-discrete scenario."""
        return self._dt

    @dt.setter
    def dt(self, dt: float):
        assert is_real_number(dt), '<Scenario/dt> argument "dt" of wrong type. ' \
                                   'Expected a real number. Got type: %s.' % type(dt)
        self._dt = dt

    @property
    def benchmark_id(self) -> str:
        """ Unique benchmark ID of a scenario as specified in the CommonOcean XML-file."""
        warnings.warn('benchmark_id is deprecated, use scenario_id instead', DeprecationWarning)
        return str(self.scenario_id)

    @benchmark_id.setter
    def benchmark_id(self, benchmark_id):
        raise ValueError('benchmark_id is deprecated, use scenario_id instead')

    @property
    def waters_network(self) -> WatersNetwork:
        """ Always raises a ValueError: the waters network is deliberately not exposed through this property."""
        raise ValueError("You are trying to access the WatersNetwork of your scenario. "
                         "This is not recommended, to avoid future bugs in your work! "
                         "To alter your Network, use the appropriate method scenario.add_objects(). "
                         "If it is really unavoidable to access the WatersNetwork object of your Scenario, "
                         "use scenario._waters_network.")

    @property
    def dynamic_obstacles(self) -> List[DynamicObstacle]:
        """ Returns a list of all dynamic obstacles in the scenario."""
        return list(self._dynamic_obstacles.values())

    @property
    def static_obstacles(self) -> List[StaticObstacle]:
        """ Returns a list of all static obstacles in the scenario."""
        return list(self._static_obstacles.values())

    @property
    def obstacles(self) -> List[Union[Obstacle, StaticObstacle, DynamicObstacle]]:
        """ Returns a list of all static and dynamic obstacles in the scenario (static obstacles first)."""
        return list(itertools.chain(self._static_obstacles.values(), self._dynamic_obstacles.values()))

    def add_objects(self, scenario_object: Union[List[Union[Obstacle, Waters, WatersNetwork, TrafficSign]], Obstacle,
                                                 Waters, WatersNetwork, TrafficSign],
                    waters_ids: Union[None, Set[int]] = None, traffic_sign_parameters: Dict[str, Any] = None):
        """
        Function to add objects, e.g., waters, dynamic and static obstacles, to the scenario.

        If a list is given, every element is added individually; waters_ids and traffic_sign_parameters are not
        forwarded to the elements. Adding a WatersNetwork replaces the current network of the scenario (a warning
        is issued).

        :param scenario_object: object(s) to be added to the scenario
        :param waters_ids: water IDs a traffic sign should be referenced from
        :param traffic_sign_parameters: dict of parameters of the obstacle related with the traffic sign
            (keys must be 'obstacle_type', 'obstacle_id' and 'radius')
        :raise ValueError: a value error is raised if the type of scenario_object is invalid or if an ID of an
            added object is already in use.
        """
        if isinstance(scenario_object, list):
            for obj in scenario_object:
                self.add_objects(obj)
        elif isinstance(scenario_object, StaticObstacle):
            self._mark_object_id_as_used(scenario_object.obstacle_id)
            self._static_obstacles[scenario_object.obstacle_id] = scenario_object
        elif isinstance(scenario_object, DynamicObstacle):
            self._mark_object_id_as_used(scenario_object.obstacle_id)
            self._dynamic_obstacles[scenario_object.obstacle_id] = scenario_object
        elif isinstance(scenario_object, WatersNetwork):
            for water in scenario_object.waters:
                self._mark_object_id_as_used(water.waters_id)
            for traffic_sign in scenario_object.traffic_signs:
                self._mark_object_id_as_used(traffic_sign.traffic_sign_id)
            for shallow in scenario_object.shallows:
                self._mark_object_id_as_used(shallow.waters_id)
            self._waters_network: WatersNetwork = scenario_object
            warnings.warn('WatersNetwork replaced. (When a WatersNetwork is used in the add_objects method, '
                          'the old one present in the scenario is replaced by the new one)')
        elif isinstance(scenario_object, Waters):
            self._mark_object_id_as_used(scenario_object.waters_id)
            self._waters_network.add_waters(scenario_object)
        elif isinstance(scenario_object, TrafficSign):
            self._add_traffic_sign(scenario_object, waters_ids, traffic_sign_parameters)
        else:
            raise ValueError('<Scenario/add_objects> argument "scenario_object" of wrong type. '
                             'Expected types: %s, %s, %s, and %s. Got type: %s.'
                             % (list, Obstacle, Waters, WatersNetwork, type(scenario_object)))

    def _add_traffic_sign(self, traffic_sign: TrafficSign, waters_ids: Union[None, Set[int]],
                          traffic_sign_parameters: Union[None, Dict[str, Any]]):
        """
        Adds a traffic sign to the waters network and relates it to a static obstacle at the sign's position.

        A static obstacle already lying within the square of half-width 'radius' around the sign is reused;
        otherwise a new circular static obstacle is created.

        :param traffic_sign: traffic sign to be added
        :param waters_ids: water IDs the traffic sign should be referenced from
        :param traffic_sign_parameters: optional dict with the keys 'obstacle_type' (default ObstacleType.BUOY),
            'obstacle_id' (default: newly generated ID) and 'radius' (default 5)
        """
        warnings.warn('By adding a traffic sign, you automatically creates an obstacle in the same position that '
                      'represent the physical boundary of the sign.')
        self._mark_object_id_as_used(traffic_sign.traffic_sign_id)
        self._waters_network.add_traffic_sign(traffic_sign, waters_ids)

        parameters = {} if traffic_sign_parameters is None else traffic_sign_parameters
        obstacle_type = parameters.get('obstacle_type', ObstacleType.BUOY)
        obstacle_id = parameters.get('obstacle_id', None)
        obstacle_radius = parameters.get('radius', 5)
        if obstacle_id is None:
            obstacle_id = self.generate_object_id()

        position = traffic_sign.position
        above_obstacle_list = self.obstacles_by_position_intervals(
            position_intervals=[Interval(start=position[0] - obstacle_radius, end=position[0] + obstacle_radius),
                                Interval(start=position[1] - obstacle_radius, end=position[1] + obstacle_radius)],
            obstacle_role=[ObstacleRole.STATIC])

        if above_obstacle_list:
            # reuse the first static obstacle found at the sign's position
            obstacle_id = above_obstacle_list[0].obstacle_id
            warnings.warn('As there was already an obstacle under the position of your traffic_sign, a new obstacle '
                          'was not inserted!')
        else:
            circ_1 = Circle(obstacle_radius)
            init_state_1 = GeneralState(time_step=0, orientation=0, position=position, velocity=0)
            static_obs_1 = StaticObstacle(obstacle_id, obstacle_type, obstacle_shape=circ_1,
                                          initial_state=init_state_1)
            self._mark_object_id_as_used(obstacle_id)
            self._static_obstacles[obstacle_id] = static_obs_1

        traffic_sign.related_obstacle = obstacle_id

    def remove_obstacle(self, obstacle: Union[Obstacle, List[Obstacle]]):
        """
        Removes a static, dynamic or a list of obstacles from the scenario. If the obstacle ID is not assigned,
        a warning message is given.

        :param obstacle: obstacle to be removed
        """
        assert isinstance(obstacle, (list, Obstacle)), '<Scenario/remove_obstacle> argument "obstacle" of wrong type. ' \
                                                       'Expected type: %s. Got type: %s.' % (Obstacle, type(obstacle))
        if isinstance(obstacle, list):
            for obs in obstacle:
                self.remove_obstacle(obs)
            return

        if obstacle.obstacle_id in self._static_obstacles:
            del self._static_obstacles[obstacle.obstacle_id]
            self._id_set.remove(obstacle.obstacle_id)
        elif obstacle.obstacle_id in self._dynamic_obstacles:
            del self._dynamic_obstacles[obstacle.obstacle_id]
            self._id_set.remove(obstacle.obstacle_id)
        else:
            warnings.warn('<Scenario/remove_obstacle> Cannot remove obstacle with ID %s, '
                          'since it is not contained in the scenario.' % obstacle.obstacle_id)

    def erase_waters_network(self):
        """
        Removes all elements from waters network.
        """
        # Iterate over snapshots: the removal methods modify the network while we loop.
        for waters in list(self._waters_network.waters):
            self.remove_waters(waters)
        for traffic_sign in list(self._waters_network.traffic_signs):
            self.remove_traffic_sign(traffic_sign)
        self._waters_network = WatersNetwork(np.array([0,0]),0,0,0)

    def replace_waters_network(self, waters_network: WatersNetwork):
        """
        Removes waters network with all its elements from the scenario and replaces it with new waters network.

        :param waters_network: new waters network
        """
        self.erase_waters_network()
        self.add_objects(waters_network)

    def remove_hanging_waters_members(self, remove_waters: Union[List[Waters], Waters]):
        """
        Removes all traffic signs that are referenced by the given waters but by none of the remaining waters
        of the network. The waters themselves are not removed by this function.

        :param remove_waters: Waters that should be removed from scenario.
        """
        if not isinstance(remove_waters, list):
            remove_waters = [remove_waters]

        removed_ids = {la.waters_id for la in remove_waters}
        remaining_waters = [la for la in self._waters_network.waters if la.waters_id not in removed_ids]

        traffic_signs_to_delete = set().union(*[la.traffic_signs for la in remove_waters])
        traffic_signs_to_save = set().union(*[la.traffic_signs for la in remaining_waters])
        hanging_sign_ids = traffic_signs_to_delete - traffic_signs_to_save

        remove_traffic_signs = [self._waters_network.find_traffic_sign_by_id(t.traffic_sign_id)
                                for t in self._waters_network.traffic_signs
                                if t.traffic_sign_id in hanging_sign_ids]

        self.remove_traffic_sign(remove_traffic_signs)

    def remove_waters(self, waters: Union[List[Waters], Waters], referenced_elements: bool = True):
        """
        Removes a waters or a list of waters from a scenario.

        :param waters: Waters which should be removed from scenario.
        :param referenced_elements: Boolean indicating whether references of waters should also be removed.
        """
        assert isinstance(waters, (list, Waters)), '<Scenario/remove_waters> argument "waters" of wrong type. ' \
                                                   'Expected type: %s. Got type: %s.' % (Waters, type(waters))
        assert isinstance(referenced_elements, bool), '<Scenario/remove_waters> argument "referenced_elements" of wrong type. ' \
                                                      'Expected type: %s, Got type: %s.' % (bool, type(referenced_elements))
        if not isinstance(waters, list):
            waters = [waters]

        if referenced_elements:
            # must run before the waters are removed from the network
            self.remove_hanging_waters_members(waters)

        for la in waters:
            self._waters_network.remove_waters(la.waters_id)
            self._id_set.remove(la.waters_id)

    def remove_traffic_sign(self, traffic_sign: Union[List[TrafficSign], TrafficSign]):
        """
        Removes a traffic sign or a list of traffic signs from the scenario. The obstacle related to a sign is
        removed as well, if it is still contained in the scenario.

        :param traffic_sign: Traffic sign which should be removed from scenario.
        """
        assert isinstance(traffic_sign, (list, TrafficSign)), '<Scenario/remove_traffic_sign> argument "traffic_sign" of wrong ' \
                                                              'type. ' \
                                                              'Expected type: %s. Got type: %s.' % (TrafficSign, type(traffic_sign))
        signs = traffic_sign if isinstance(traffic_sign, list) else [traffic_sign]
        for sign in signs:
            self._waters_network.remove_traffic_sign(sign.traffic_sign_id)
            self._id_set.remove(sign.traffic_sign_id)
            self._remove_related_obstacle(sign)

    def _remove_related_obstacle(self, traffic_sign: TrafficSign):
        """
        Removes the obstacle related to a traffic sign, skipping signs without a (remaining) related obstacle.

        :param traffic_sign: traffic sign whose related obstacle should be removed
        """
        related_id = traffic_sign.related_obstacle
        if related_id is None:
            return
        # obstacle_by_id warns and returns None if the obstacle is gone, e.g. because another sign shared it
        obstacle = self.obstacle_by_id(related_id)
        if obstacle is not None:
            self.remove_obstacle(obstacle)

    def generate_object_id(self) -> int:
        """
        Generates a unique ID which is not assigned to any object in the scenario.

        :return: unique object ID
        """
        if self._id_counter is None:
            self._id_counter = 0
        if len(self._id_set) > 0:
            max_id_used = max(self._id_set)
            self._id_counter = max(self._id_counter, max_id_used)
        self._id_counter += 1
        return int(self._id_counter)

    @property
    def shallows(self) -> List[Shallow]:
        """ Returns a list of all shallows in the WatersNetwork of the scenario."""
        return self._waters_network.shallows

    @property
    def waterways(self) -> List[Waterway]:
        """ Returns a list of all waterways in the WatersNetwork of the scenario."""
        return self._waters_network.waterways

    @property
    def waters(self) -> List[Waters]:
        """ Returns a list of all waters (waterways and shallows) in the WatersNetwork of the scenario."""
        return self._waters_network.waters

    def occupancies_at_time_step(self, time_step: int, obstacle_role: Union[None, ObstacleRole] = None) \
            -> List[Occupancy]:
        """
        Returns the occupancies of all static and dynamic obstacles at a specific time step.

        :param time_step: occupancies of obstacles at this time step
        :param obstacle_role: obstacle role as defined in CommonOcean, e.g., static or dynamic
        :return: list of occupancies of the obstacles
        """
        assert is_natural_number(time_step), '<Scenario/occupancies_at_time> argument "time_step" of wrong type. ' \
                                             'Expected type: %s. Got type: %s.' % (int, type(time_step))
        assert isinstance(obstacle_role, (ObstacleRole, type(None))), \
            '<Scenario/obstacles_by_role_and_type> argument "obstacle_role" of wrong type. Expected types: ' \
            ' %s or %s. Got type: %s.' % (ObstacleRole, None, type(obstacle_role))
        occupancies = list()
        for obstacle in self.obstacles:
            if not (obstacle_role is None or obstacle.obstacle_role == obstacle_role):
                continue
            # query the occupancy once per obstacle instead of once for the check and once for the result
            occupancy = obstacle.occupancy_at_time(time_step)
            if occupancy:
                occupancies.append(occupancy)
        return occupancies

    def obstacle_by_id(self, obstacle_id: int) -> Union[Obstacle, DynamicObstacle, StaticObstacle, None]:
        """
        Finds an obstacle for a given obstacle_id

        :param obstacle_id: ID of the queried obstacle
        :return: the obstacle object if the ID exists, otherwise None (a warning is issued in that case)
        """
        assert is_integer_number(obstacle_id), '<Scenario/obstacle_by_id> argument "obstacle_id" of wrong type. ' \
                                               'Expected type: %s. Got type: %s.' % (int, type(obstacle_id))
        obstacle = None
        if obstacle_id in self._static_obstacles:
            obstacle = self._static_obstacles[obstacle_id]
        elif obstacle_id in self._dynamic_obstacles:
            obstacle = self._dynamic_obstacles[obstacle_id]
        else:
            warnings.warn('<Scenario/obstacle_by_id> Obstacle with ID %s is not contained in the scenario.' % obstacle_id)
        return obstacle

    def obstacles_by_role_and_type(self, obstacle_role: Union[None, ObstacleRole] = None,
                                   obstacle_type: Union[None, ObstacleType] = None) \
            -> List[Obstacle]:
        """
        Filters the obstacles by their role and type.

        :param obstacle_role: obstacle role as defined in CommonOcean, e.g., static or dynamic
        :param obstacle_type: obstacle type as defined in CommonOcean
        :return: list of all obstacles satisfying the given obstacle_role and obstacle_type
        """
        assert isinstance(obstacle_role, (ObstacleRole, type(None))), \
            '<Scenario/obstacles_by_role_and_type> argument "obstacle_role" of wrong type. Expected types: ' \
            ' %s or %s. Got type: %s.' % (ObstacleRole, None, type(obstacle_role))
        assert isinstance(obstacle_type, (ObstacleType, type(None))), \
            '<Scenario/obstacles_by_role_and_type> argument "obstacle_type" of wrong type. Expected types: ' \
            ' %s or %s. Got type: %s.' % (ObstacleType, None, type(obstacle_type))
        return [obstacle for obstacle in self.obstacles
                if (obstacle_role is None or obstacle.obstacle_role == obstacle_role)
                and (obstacle_type is None or obstacle.obstacle_type == obstacle_type)]

    def obstacles_by_position_intervals(
            self, position_intervals: List[Interval],
            obstacle_role: Tuple[ObstacleRole] = (ObstacleRole.DYNAMIC, ObstacleRole.STATIC),
            time_step: int = None) -> List[Obstacle]:
        """
        Returns obstacles which center is located within in the given x-/y-position intervals.

        Static obstacles are tested with the position of their initial state, dynamic obstacles with the center
        of their occupancy shape at time_step. A dynamic obstacle whose occupancy shape has no 'center' attribute
        is always included; a dynamic obstacle without occupancy at time_step is skipped.

        :param position_intervals: list of intervals for x- and y-coordinates [interval_x, interval_y]
        :param obstacle_role: tuple containing the desired obstacle roles
        :param time_step: time step at which dynamic obstacles are evaluated (None is treated as 0)
        :return: list of obstacles in the position intervals (static obstacles first)
        """

        def contained_in_interval(position: np.ndarray):
            if position_intervals[0].contains(position[0]) and position_intervals[1].contains(position[1]):
                return True
            return False

        if time_step is None:
            time_step = 0

        obstacle_list = list()
        if ObstacleRole.STATIC in obstacle_role:
            for obstacle in self.static_obstacles:
                if contained_in_interval(obstacle.initial_state.position):
                    obstacle_list.append(obstacle)
        if ObstacleRole.DYNAMIC in obstacle_role:
            for obstacle in self.dynamic_obstacles:
                occ = obstacle.occupancy_at_time(time_step)
                if occ is not None:
                    if not hasattr(occ.shape, 'center'):
                        obstacle_list.append(obstacle)
                    elif contained_in_interval(occ.shape.center):
                        obstacle_list.append(obstacle)
        return obstacle_list

    def obstacle_states_at_time_step(self, time_step: int) -> Dict[int, GeneralState]:
        """
        Returns all obstacle states which exist at a provided time step. For static obstacles the initial state
        is returned.

        :param time_step: time step of interest
        :return: dictionary which maps id to obstacle state at time step
        """
        assert is_natural_number(time_step), '<Scenario/obstacle_at_time_step> argument "time_step" of wrong type. ' \
                                             'Expected type: %s. Got type: %s.' % (int, type(time_step))

        obstacle_states = {}
        for obstacle in self.dynamic_obstacles:
            # query the state once per obstacle instead of once for the check and once for the result
            state = obstacle.state_at_time(time_step)
            if state is not None:
                obstacle_states[obstacle.obstacle_id] = state
        for obstacle in self.static_obstacles:
            obstacle_states[obstacle.obstacle_id] = obstacle.initial_state
        return obstacle_states

    def translate_rotate(self, translation: np.ndarray, angle: float):
        """
        Translates and rotates all objects, e.g., obstacles and water network, in the scenario.

        :param translation: translation vector [x_off, y_off] in x- and y-direction
        :param angle: rotation angle in radian (counter-clockwise)
        """
        assert is_real_number_vector(translation, 2), '<Scenario/translate_rotate>: argument "translation" is ' \
                                                      'not a vector of real numbers of length 2. translation = {}.' \
            .format(translation)
        assert is_valid_orientation(angle), '<Scenario/translate_rotate>: argument "orientation" is not valid. ' \
                                            'angle = {}.'.format(angle)

        self._waters_network.translate_rotate(translation, angle)
        for obstacle in self.obstacles:
            obstacle.translate_rotate(translation, angle)

    def _is_object_id_used(self, object_id: int) -> bool:
        """
        Checks if an ID is already assigned to an object in the scenario.

        :param object_id: object ID to be checked
        :return: True, if the object ID is already assigned, False otherwise
        """
        return object_id in self._id_set

    def _mark_object_id_as_used(self, object_id: int):
        """
        Checks if an ID is assigned to an object in the scenario. If the ID is already assigned an error is
        raised, otherwise, the ID is added to the set of assigned IDs.

        :param object_id: object ID to be checked
        :raise ValueError: if the object ID is already assigned to another object in the scenario.
        """
        if self._id_counter is None:
            # the first ID ever marked initialises the counter used by generate_object_id
            self._id_counter = object_id
        if self._is_object_id_used(object_id):
            raise ValueError("ID %s is already used." % object_id)
        self._id_set.add(object_id)

    def __str__(self):
        traffic_str = "\n"
        traffic_str += "Scenario:\n"
        traffic_str += "- Scenario ID: {}\n".format(str(self.scenario_id))
        traffic_str += "- Time step size: {}\n".format(self._dt)
        traffic_str += "- Number of Obstacles: {}\n".format(len(self.obstacles))
        traffic_str += "- WatersNetwork:\n"
        traffic_str += str(self._waters_network)
        return traffic_str