|
| 1 | +import warnings |
1 | 2 | from pathlib import Path |
2 | 3 | from typing import Any, Dict, Final, List |
3 | 4 |
|
|
16 | 17 | from d123.dataset.conversion.map.road_edge.road_edge_3d_utils import ( |
17 | 18 | get_road_edges_3d_from_generic_drivable_area_df, |
18 | 19 | ) |
19 | | -from d123.dataset.maps.map_datatypes import MapLayer, RoadEdgeType, RoadLineType |
20 | | - |
21 | | -# TODO: |
22 | | -# - TODO |
23 | | -LANE_GROUP_MARK_TYPES = ["DASHED_WHITE", "DOUBLE_DASH_WHITE", "DASH_SOLID_WHITE", "SOLID_DASH_WHITE", "SOLID_WHITE"] |
| 20 | +from d123.dataset.dataset_specific.av2.av2_constants import AV2_ROAD_LINE_TYPE_MAPPING |
| 21 | +from d123.dataset.maps.map_datatypes import MapLayer, RoadEdgeType |
| 22 | + |
| 23 | +LANE_GROUP_MARK_TYPES: List[str] = [ |
| 24 | + "DASHED_WHITE", |
| 25 | + "DOUBLE_DASH_WHITE", |
| 26 | + "DASH_SOLID_WHITE", |
| 27 | + "SOLID_DASH_WHITE", |
| 28 | + "SOLID_WHITE", |
| 29 | +] |
24 | 30 | MAX_ROAD_EDGE_LENGTH: Final[float] = 100.0 # TODO: Add to config |
25 | 31 |
|
26 | 32 |
|
@@ -73,29 +79,27 @@ def _extract_polyline(data: List[Dict[str, float]], close: bool = False) -> Poly |
73 | 79 | lane_group_dict = _extract_lane_group_dict(log_map_archive["lane_segments"]) |
74 | 80 | intersection_dict = _extract_intersection_dict(log_map_archive["lane_segments"], lane_group_dict) |
75 | 81 |
|
76 | | - lane_df = get_lane_df(log_map_archive["lane_segments"]) |
77 | | - lane_group_df = get_lane_group_df(lane_group_dict) |
78 | | - intersection_df = get_intersections_df(intersection_dict) |
79 | | - crosswalk_df = get_crosswalk_df(log_map_archive["pedestrian_crossings"]) |
80 | | - walkway_df = get_empty_gdf() # NOTE: AV2 does not provide walkways, so we create an empty DataFrame. |
81 | | - carpark_df = get_empty_gdf() # NOTE: AV2 does not provide carparks, so we create an empty DataFrame. |
82 | | - generic_drivable_df = get_generic_drivable_df(drivable_areas) |
83 | | - road_edge_df = get_road_edge_df(generic_drivable_df) |
84 | | - # road_line_df = get_empty_gdf() |
| 82 | + dataframes: Dict[MapLayer, gpd.GeoDataFrame] = {} |
| 83 | + |
| 84 | + dataframes[MapLayer.LANE] = get_lane_df(log_map_archive["lane_segments"]) |
| 85 | + dataframes[MapLayer.LANE_GROUP] = get_lane_group_df(lane_group_dict) |
| 86 | + dataframes[MapLayer.INTERSECTION] = get_intersections_df(intersection_dict) |
| 87 | + dataframes[MapLayer.CROSSWALK] = get_crosswalk_df(log_map_archive["pedestrian_crossings"]) |
| 88 | + dataframes[MapLayer.GENERIC_DRIVABLE] = get_generic_drivable_df(drivable_areas) |
| 89 | + dataframes[MapLayer.ROAD_EDGE] = get_road_edge_df(dataframes[MapLayer.GENERIC_DRIVABLE]) |
| 90 | + dataframes[MapLayer.ROAD_LINE] = get_road_line_df(log_map_archive["lane_segments"]) |
| 91 | + # NOTE: AV2 does not provide walkways or carparks, so we create an empty DataFrame. |
| 92 | + dataframes[MapLayer.WALKWAY] = get_empty_gdf() |
| 93 | + dataframes[MapLayer.CARPARK] = get_empty_gdf() |
85 | 94 |
|
86 | 95 | map_file_path.unlink(missing_ok=True) |
87 | 96 | if not map_file_path.parent.exists(): |
88 | 97 | map_file_path.parent.mkdir(parents=True, exist_ok=True) |
89 | 98 |
|
90 | | - lane_df.to_file(map_file_path, layer=MapLayer.LANE.serialize(), driver="GPKG") |
91 | | - lane_group_df.to_file(map_file_path, layer=MapLayer.LANE_GROUP.serialize(), driver="GPKG", mode="a") |
92 | | - intersection_df.to_file(map_file_path, layer=MapLayer.INTERSECTION.serialize(), driver="GPKG", mode="a") |
93 | | - crosswalk_df.to_file(map_file_path, layer=MapLayer.CROSSWALK.serialize(), driver="GPKG", mode="a") |
94 | | - walkway_df.to_file(map_file_path, layer=MapLayer.WALKWAY.serialize(), driver="GPKG", mode="a") |
95 | | - carpark_df.to_file(map_file_path, layer=MapLayer.CARPARK.serialize(), driver="GPKG", mode="a") |
96 | | - generic_drivable_df.to_file(map_file_path, layer=MapLayer.GENERIC_DRIVABLE.serialize(), driver="GPKG", mode="a") |
97 | | - road_edge_df.to_file(map_file_path, layer=MapLayer.ROAD_EDGE.serialize(), driver="GPKG", mode="a") |
98 | | - # road_line_df.to_file(map_file_path, layer=MapLayer.ROAD_LINE.serialize(), driver="GPKG", mode="a") |
| 99 | + with warnings.catch_warnings(): |
| 100 | + warnings.filterwarnings("ignore", message="'crs' was not provided") |
| 101 | + for layer, gdf in dataframes.items(): |
| 102 | + gdf.to_file(map_file_path, layer=layer.serialize(), driver="GPKG", mode="a") |
99 | 103 |
|
100 | 104 |
|
101 | 105 | def get_empty_gdf() -> gpd.GeoDataFrame: |
@@ -151,10 +155,7 @@ def _get_centerline_from_boundaries( |
151 | 155 | left_boundary=lane_dict["left_lane_boundary"], |
152 | 156 | right_boundary=lane_dict["right_lane_boundary"], |
153 | 157 | ) |
154 | | - lane_speed_limit_mps = None |
155 | | - |
156 | | - # ids.append(lane_id) |
157 | | - # lane_types.append(lanes_type[lane_id]) |
| 158 | + lane_speed_limit_mps = None # TODO: Consider using geo reference to retrieve speed limits. |
158 | 159 | lane_group_ids.append(lane_id) |
159 | 160 | speed_limits_mps.append(lane_speed_limit_mps) |
160 | 161 | predecessor_ids.append(lane_dict["predecessors"]) |
@@ -313,18 +314,28 @@ def get_road_edge_df(generic_drivable_area_df: gpd.GeoDataFrame) -> gpd.GeoDataF |
313 | 314 | return gpd.GeoDataFrame(pd.DataFrame({"id": ids, "road_edge_type": road_edge_types}), geometry=geometries) |
314 | 315 |
|
315 | 316 |
|
316 | | -def get_road_line_df( |
317 | | - road_lines: Dict[int, npt.NDArray[np.float64]], road_lines_type: Dict[int, RoadLineType] |
318 | | -) -> gpd.GeoDataFrame: |
319 | | - ids = list(road_lines.keys()) |
320 | | - geometries = [Polyline3D.from_array(road_edge).linestring for road_edge in road_lines.values()] |
| 317 | +def get_road_line_df(lanes: Dict[int, Any]) -> gpd.GeoDataFrame: |
321 | 318 |
|
322 | | - data = pd.DataFrame( |
323 | | - { |
324 | | - "id": ids, |
325 | | - "road_line_type": [int(road_line_type) for road_line_type in road_lines_type.values()], |
326 | | - } |
327 | | - ) |
| 319 | + # TODO @DanielDauner: Allow lanes to reference road line dataframe. |
| 320 | + |
| 321 | + ids = [] |
| 322 | + road_lines_type = [] |
| 323 | + geometries = [] |
| 324 | + |
| 325 | + running_id = 0 |
| 326 | + for lane in lanes.values(): |
| 327 | + for side in ["left", "right"]: |
| 328 | + # NOTE: We currently ignore lane markings that are NONE in the AV2 dataset. |
| 329 | + # TODO: Review if the road line system should be changed in the future. |
| 330 | + if lane[f"{side}_lane_mark_type"] == "NONE": |
| 331 | + continue |
| 332 | + |
| 333 | + ids.append(running_id) |
| 334 | + road_lines_type.append(AV2_ROAD_LINE_TYPE_MAPPING[lane[f"{side}_lane_mark_type"]]) |
| 335 | + geometries.append(lane[f"{side}_lane_boundary"].linestring) |
| 336 | + running_id += 1 |
| 337 | + |
| 338 | + data = pd.DataFrame({"id": ids, "road_line_type": road_lines_type}) |
328 | 339 | gdf = gpd.GeoDataFrame(data, geometry=geometries) |
329 | 340 | return gdf |
330 | 341 |
|
|
0 commit comments