import enum
import warnings
from typing import Union, Set
from abc import ABC, abstractmethod
import numpy as np
from commonroad.geometry.shape import Shape, Rectangle, Circle, Polygon
from commonocean.prediction.prediction import Prediction, Occupancy, SetBasedPrediction, TrajectoryPrediction
from commonocean.scenario.state import GeneralState
from commonroad.common.validity import is_valid_orientation, is_real_number_vector, is_real_number
__author__ = "Hanna Krasowski"
__copyright__ = "TUM Cyber-Physical System Group"
__credits__ = ["ConVeY"]
__version__ = "2022a"
__maintainer__ = "Hanna Krasowski"
__email__ = "commonocean@lists.lrz.de"
__status__ = "released"
[docs]@enum.unique
class ObstacleRole(enum.Enum):
""" Enum containing all possible obstacle roles defined in CommonOcean."""
STATIC = "static"
DYNAMIC = "dynamic"
[docs]@enum.unique
class ObstacleType(enum.Enum):
""" Enum containing all possible obstacle types defined in CommonOcean."""
UNKNOWN = "unknown"
BUOY = "buoy"
LAND = "land"
MOTORVESSEL = "motorvessel"
SAILINGVESSEL = "sailingvessel"
FISHINGVESSEL = "fishingvessel"
MILITARYVESSEL = "militaryvessel"
CARGOSHIP = "cargoship"
VESSELNOTUNDERCOMMAND = "vesselNotUnderCommand"
RESTRICTEDMANEUVERABILITYVESSEL = "restrictedManeuverabilityVessel"
ANCHOREDVESSEL = "anchoredvessel"
WINDFARM = "windfarm"
OILRIG = "oilrig"
WATERSBOUNDARY = "watersboundary"
[docs]class Obstacle(ABC):
""" Superclass for dynamic and static obstacles holding common properties defined in CommonOcean."""
def __init__(self, obstacle_id: int, obstacle_role: ObstacleRole,
obstacle_type: ObstacleType, obstacle_shape: Shape, initial_state: GeneralState, depth: float = None):
"""
:param obstacle_id: unique ID of the obstacle
:param obstacle_role: obstacle role as defined in CommonOcean
:param obstacle_type: obstacle type as defined in CommonOcean (e.g. ANCHOREDVESSEL)
:param obstacle_shape: occupied area of the obstacle
:param initial_state: initial state of the obstacle
"""
self._initial_occupancy_shape: Shape = None
self.obstacle_id: int = obstacle_id
self.obstacle_role: ObstacleRole = obstacle_role
self.obstacle_type: ObstacleType = obstacle_type
self.obstacle_shape: Shape = obstacle_shape
self.initial_state: GeneralState = initial_state
if depth != None:
self._depth = depth
else:
list_depth_inf = [ObstacleType.LAND, ObstacleType.BUOY, ObstacleType.UNKNOWN, ObstacleType.WINDFARM, ObstacleType.OILRIG]
if isinstance(obstacle_shape, Rectangle):
size = max(obstacle_shape.length, obstacle_shape.width)
elif isinstance(obstacle_shape, Circle):
size = 2*obstacle_shape.radius
elif isinstance(obstacle_shape, Polygon):
size = 0.0
for vert1 in obstacle_shape.vertices:
for vert2 in obstacle_shape.vertices:
dist = np.linalg.norm(vert1-vert2)
if dist > size:
size = dist
else:
size = 0.0
if obstacle_type not in list_depth_inf and size < 25:
depth = 5.0
elif obstacle_type not in list_depth_inf and size >= 25:
depth = 15.0
elif obstacle_type in list_depth_inf:
depth = np.inf
self._depth = depth
def __hash__(self):
return hash((self._obstacle_id, self._obstacle_role, self._obstacle_type, self._obstacle_shape,
self._initial_state))
@property
def obstacle_id(self) -> int:
""" Unique ID of the obstacle."""
return self._obstacle_id
@property
def obstacle_id(self) -> int:
""" Unique ID of the obstacle."""
return self._obstacle_id
@obstacle_id.setter
def obstacle_id(self, obstacle_id: int):
assert isinstance(obstacle_id, int), '<Obstacle/obstacle_id>: argument obstacle_id of wrong type.' \
'Expected type: %s. Got type: %s.' % (int, type(obstacle_id))
if not hasattr(self, '_obstacle_id'):
self._obstacle_id = obstacle_id
else:
warnings.warn('<Obstacle/obstacle_id>: Obstacle ID is immutable.')
@property
def obstacle_role(self) -> ObstacleRole:
""" Obstacle role as defined in CommonOcean."""
return self._obstacle_role
@obstacle_role.setter
def obstacle_role(self, obstacle_role: ObstacleRole):
assert isinstance(obstacle_role, ObstacleRole), '<Obstacle/obstacle_role>: argument obstacle_role of wrong ' \
'type. Expected type: %s. Got type: %s.' \
% (ObstacleRole, type(obstacle_role))
if not hasattr(self, '_obstacle_role'):
self._obstacle_role = obstacle_role
else:
warnings.warn('<Obstacle/obstacle_role>: Obstacle role is immutable.')
@property
def obstacle_type(self) -> ObstacleType:
""" Obstacle type as defined in CommonOcean."""
return self._obstacle_type
@obstacle_type.setter
def obstacle_type(self, obstacle_type: ObstacleType):
assert isinstance(obstacle_type, ObstacleType), '<Obstacle/obstacle_type>: argument obstacle_type of wrong ' \
'type. Expected type: %s. Got type: %s.' \
% (ObstacleType, type(obstacle_type))
if not hasattr(self, '_obstacle_type'):
self._obstacle_type = obstacle_type
else:
warnings.warn('<Obstacle/obstacle_type>: Obstacle type is immutable.')
@property
def obstacle_shape(self) -> Shape:
""" Obstacle shape as defined in CommonOcean."""
return self._obstacle_shape
@obstacle_shape.setter
def obstacle_shape(self, shape: Shape):
assert isinstance(shape,
(type(None), Shape)), '<Obstacle/obstacle_shape>: argument shape of wrong type. Expected ' \
'type %s. Got type %s.' % (Shape, type(shape))
if not hasattr(self, '_obstacle_shape'):
self._obstacle_shape = shape
else:
warnings.warn('<Obstacle/obstacle_shape>: Obstacle shape is immutable.')
@property
def initial_state(self) -> GeneralState:
""" Initial state of the obstacle, e.g., obtained through sensor measurements."""
return self._initial_state
@initial_state.setter
def initial_state(self, initial_state: GeneralState):
assert isinstance(initial_state, GeneralState), '<Obstacle/initial_state>: argument initial_state of wrong type. ' \
'Expected types: %s. Got type: %s.' % (GeneralState, type(initial_state))
self._initial_state = initial_state
self._initial_occupancy_shape = self._obstacle_shape.rotate_translate_local(
initial_state.position, initial_state.orientation)
@property
def depth(self):
return self._depth
@depth.setter
def depth(self, depth: float):
assert isinstance(depth, float), \
'<Obstacle/depth>: argument depth of wrong ' \
'type. Expected type: %s. Got type: %s.' \
% (float, type(depth))
assert depth >= 0, '<Obstacle/depth>: argument depth is a negative number. ' \
'Expected type is a positive number. Got: %s.' \
% (float, type(depth))
self._depth = depth
@property
def initial_center_waters_ids(self) -> Union[None, Set[int]]:
""" Initial waters of obstacle center, e.g., obtained through localization."""
return self._initial_center_waters_ids
@initial_center_waters_ids.setter
def initial_center_waters_ids(self, initial_center_waters_ids: Union[None, Set[int]]):
assert isinstance(initial_center_waters_ids, (set, type(None))), \
'<Obstacle/initial_center_waters_ids>: argument initial_waters_ids of wrong type. ' \
'Expected types: %s, %s. Got type: %s.' % (set, type(None), type(initial_center_waters_ids))
if initial_center_waters_ids is not None:
for waters_id in initial_center_waters_ids:
assert isinstance(waters_id, int), \
'<Obstacle/initial_center_waters_ids>: argument initial_waters of wrong type. ' \
'Expected types: %s. Got type: %s.' % (int, type(waters_id))
self._initial_center_waters_ids = initial_center_waters_ids
@property
def initial_shape_waters_ids(self) -> Union[None, Set[int]]:
""" Initial waters of obstacle shape, e.g., obtained through localization."""
return self._initial_shape_waters_ids
@initial_shape_waters_ids.setter
def initial_shape_waters_ids(self, initial_shape_waters_ids: Union[None, Set[int]]):
assert isinstance(initial_shape_waters_ids, (set, type(None))), \
'<Obstacle/initial_shape_waters_ids>: argument initial_waters_ids of wrong type. ' \
'Expected types: %s, %s. Got type: %s.' % (set, type(None), type(initial_shape_waters_ids))
if initial_shape_waters_ids is not None:
for waters_id in initial_shape_waters_ids:
assert isinstance(waters_id, int), \
'<Obstacle/initial_shape_waters_ids>: argument initial_waters of wrong type. ' \
'Expected types: %s. Got type: %s.' % (int, type(waters_id))
self._initial_shape_waters_ids = initial_shape_waters_ids
@abstractmethod
def occupancy_at_time(self, time_step: int) -> Union[None, Occupancy]:
pass
@abstractmethod
def translate_rotate(self, translation: np.ndarray, angle: float):
pass
[docs]class StaticObstacle(Obstacle):
""" Class representing static obstacles as defined in CommonOcean."""
def __init__(self, obstacle_id: int, obstacle_type: ObstacleType,
obstacle_shape: Shape, initial_state: GeneralState, depth: float = None):
"""
:param obstacle_id: unique ID of the obstacle
:param obstacle_type: type of obstacle (e.g. ANCHOREDVESSEL)
:param obstacle_shape: shape of the static obstacle
:param initial_state: initial state of the static obstacle
"""
Obstacle.__init__(self, obstacle_id=obstacle_id, obstacle_role=ObstacleRole.STATIC,
obstacle_type=obstacle_type, obstacle_shape=obstacle_shape, initial_state=initial_state, depth=depth)
[docs] def translate_rotate(self, translation: np.ndarray, angle: float):
""" First translates the static obstacle, then rotates the static obstacle around the origin.
:param translation: translation vector [x_off, y_off] in x- and y-direction
:param angle: rotation angle in radian (counter-clockwise)
"""
assert is_real_number_vector(translation, 2), '<StaticObstacle/translate_rotate>: argument translation is ' \
'not a vector of real numbers of length 2.'
assert is_real_number(angle), '<StaticObstacle/translate_rotate>: argument angle must be a scalar. ' \
'angle = %s' % angle
assert is_valid_orientation(angle), '<StaticObstacle/translate_rotate>: argument angle must be within the ' \
'interval [-2pi, 2pi]. angle = %s' % angle
self.initial_state = self._initial_state.translate_rotate(translation, angle)
[docs] def occupancy_at_time(self, time_step: int) -> Occupancy:
"""
Returns the predicted occupancy of the obstacle at a specific time step.
:param time_step: discrete time step
:return: occupancy of the static obstacle at time step
"""
return Occupancy(time_step=time_step, shape=self._initial_occupancy_shape)
[docs] def state_at_time(self, time_step: int) -> GeneralState:
"""
Returns the state the obstacle at a specific time step.
:param time_step: discrete time step
:return: state of the static obstacle at time step
"""
return self.initial_state
def __str__(self):
obs_str = 'Static Obstacle:\n'
obs_str += '\nid: {}'.format(self.obstacle_id)
obs_str += '\ninitial state: {}'.format(self.initial_state)
return obs_str
[docs]class DynamicObstacle(Obstacle):
""" Class representing dynamic obstacles as defined in CommonOcean. Each dynamic obstacle has stored its predicted
movement in future time steps.
"""
def __init__(self, obstacle_id: int, obstacle_type: ObstacleType,
obstacle_shape: Shape, initial_state: GeneralState,
prediction: Union[None, Prediction, TrajectoryPrediction, SetBasedPrediction] = None, depth: float = None):
"""
:param obstacle_id: unique ID of the obstacle
:param obstacle_type: type of obstacle (e.g. PARKED_VEHICLE)
:param obstacle_shape: shape of the static obstacle
:param initial_state: initial state of the static obstacle
:param prediction: predicted movement of the dynamic obstacle
"""
Obstacle.__init__(self, obstacle_id=obstacle_id, obstacle_role=ObstacleRole.DYNAMIC,
obstacle_type=obstacle_type, obstacle_shape=obstacle_shape, initial_state=initial_state, depth = depth)
self.prediction: Prediction = prediction
@property
def prediction(self) -> Union[Prediction, TrajectoryPrediction, SetBasedPrediction, None]:
""" Prediction describing the movement of the dynamic obstacle over time."""
return self._prediction
@prediction.setter
def prediction(self, prediction: Union[Prediction, TrajectoryPrediction, SetBasedPrediction, None]):
assert isinstance(prediction, (Prediction, type(None))), '<DynamicObstacle/prediction>: argument prediction ' \
'of wrong type. Expected types: %s, %s. Got type: ' \
'%s.' % (Prediction, type(None), type(prediction))
self._prediction = prediction
[docs] def occupancy_at_time(self, time_step: int) -> Union[None, Occupancy]:
"""
Returns the predicted occupancy of the obstacle at a specific time step.
:param time_step: discrete time step
:return: predicted occupancy of the obstacle at time step
"""
occupancy = None
if time_step == self.initial_state.time_step:
occupancy = Occupancy(time_step, self._initial_occupancy_shape)
elif time_step > self.initial_state.time_step and self._prediction is not None:
occupancy = self._prediction.occupancy_at_time_step(time_step)
return occupancy
[docs] def state_at_time(self, time_step: int) -> Union[None, GeneralState]:
"""
Returns the predicted state of the obstacle at a specific time step.
:param time_step: discrete time step
:return: predicted state of the obstacle at time step
"""
if time_step == self.initial_state.time_step:
return self.initial_state
elif type(self._prediction) is SetBasedPrediction:
warnings.warn("<DynamicObstacle/state_at_time>: Set-based prediction is used. CustomState cannot be returned!")
return None
elif time_step > self.initial_state.time_step and self._prediction is not None:
return self.prediction.trajectory.state_at_time_step(time_step)
else:
return None
[docs] def translate_rotate(self, translation: np.ndarray, angle: float):
""" First translates the dynamic obstacle, then rotates the dynamic obstacle around the origin.
:param translation: translation vector [x_off, y_off] in x- and y-direction
:param angle: rotation angle in radian (counter-clockwise)
"""
assert is_real_number_vector(translation, 2), '<DynamicObstacle/translate_rotate>: argument translation is ' \
'not a vector of real numbers of length 2.'
assert is_real_number(angle), '<DynamicObstacle/translate_rotate>: argument angle must be a scalar. ' \
'angle = %s' % angle
assert is_valid_orientation(angle), '<DynamicObstacle/translate_rotate>: argument angle must be within the ' \
'interval [-2pi, 2pi]. angle = %s' % angle
if self._prediction is not None:
self.prediction.translate_rotate(translation, angle)
self.initial_state = self._initial_state.translate_rotate(translation, angle)
[docs] def extreme_limits(self):
""" Calculate the extreme limits that the obstacle reaches during the movement.
"""
flag = True
for occupancy in self.prediction.occupancy_set:
if not flag:
x = occupancy.shape.center[0]
y = occupancy.shape.center[1]
if x > max_x:
max_x = x
if x < min_x:
min_x = x
if y > max_y:
max_y = y
if y < min_y:
min_y = y
else:
max_x = occupancy.shape.center[0]
max_y = occupancy.shape.center[1]
min_x = max_x
min_y = max_y
flag = False
return [[max_x, min_x], [max_y, min_y]]
def __str__(self):
obs_str = 'Dynamic Obstacle:\n'
obs_str += '\nid: {}'.format(self.obstacle_id)
obs_str += '\ninitial state: {}'.format(self.initial_state)
return obs_str