Designing AVs¶
Now that we've seen how to run perception and tracking in series, we can venture into designing AVs as a stand-alone class. Let's set things up in the usual way. Keep the autoreload if you want to play around with the core or api libraries in editable mode.
# Standard setup: AVstack core + API libraries plus a progress bar.
import os
import avstack
import avapi
from tqdm import tqdm
# IPython magics: reload edited library code without restarting the kernel.
%load_ext autoreload
%autoreload 2
# Root of the datasets shipped alongside lib-avstack-api.
data_base = '../../lib-avstack-api/data/'
obj_data_dir_k = os.path.join(data_base, 'KITTI/object')
raw_data_dir_k = os.path.join(data_base, 'KITTI/raw')
obj_data_dir_n = os.path.join(data_base, 'nuScenes')
# Scene managers index the datasets; grab the first scene from each.
KSM = avapi.kitti.KittiScenesManager(obj_data_dir_k, raw_data_dir_k, convert_raw=False)
KDM = KSM.get_scene_dataset_by_index(scene_idx=0)
NSM = avapi.nuscenes.nuScenesManager(obj_data_dir_n)
NDM = NSM.get_scene_dataset_by_index(0)
Jupyter environment detected. Enabling Open3D WebVisualizer. [Open3D INFO] WebRTC GUI backend enabled. [Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled. The autoreload extension is already loaded. To reload it, use: %reload_ext autoreload
Subclassing VehicleEgoStack¶
To create an ego vehicle, we subclass AVstack's VehicleEgoStack
. The only methods we need to implement are (1) how to initialize the modules, and (2) how to process the data as it comes in. All the other piping is handled for us by AVstack.
class OurTestAV(avstack.ego.vehicle.VehicleEgoStack):
    """Our own little AV"""

    def _initialize_modules(self, *args, framerate=10, lidar_perception="pointpillars",
                            tracking="basic-box-tracker", dataset="kitti", **kwargs):
        """Initialize modules"""
        # Single perception module: a 3D lidar object detector.
        detector = avstack.modules.perception.object3d.MMDetObjectDetector3D(
            model=lidar_perception, dataset=dataset, **kwargs
        )
        self.perception = {"object_3d": detector}
        # 3D box tracker fed by the detector above.
        self.tracking = avstack.modules.tracking.tracker3d.BasicBoxTracker3D(
            framerate=framerate, **kwargs
        )
        # Kinematic forward prediction (not ticked below -- see _tick_modules).
        self.prediction = avstack.modules.prediction.KinematicPrediction(
            dt_pred=1.0 / 5, t_pred_forward=1, **kwargs,
        )

    def _tick_modules(self, frame, timestamp, data_manager, platform, *args, **kwargs):
        # Perception on the latest lidar frame, then tracking on the detections.
        lidar_data = data_manager.pop("lidar-0")
        dets_3d = self.perception["object_3d"](
            lidar_data, frame=frame, identifier="lidar_objects_3d"
        )
        tracks_3d = self.tracking(
            detections=dets_3d, t=timestamp, frame=frame, platform=platform
        )
        # prediction_3d = self.prediction(tracks_3d, frame=frame) # FYI prediction is currently slow...
        # Standard packing of outputs; modules we don't run are reported as None.
        control = None
        localization = None
        dets_2d = None
        tracks_2d = None
        prediction_2d = None
        prediction_3d = None
        plan = None
        return control, (
            localization, dets_2d, dets_3d, tracks_2d, tracks_3d,
            prediction_2d, prediction_3d, plan,
        )
Testing our AV¶
To test our AV, we can make a simple data-source-agnostic loop that stores data inside a DataManager
class. Because of the design decision to attach sensor-identifying information onto the sensor data itself, the DataManager
will be able to automatically allocate the sensor data into the appropriate bin - e.g., so that a sensor with very high rate doesn't impact the data buffer for any other sensor.
def run_our_av(DM, av):
    """Run an AV through a sequence of data.

    Pushes lidar + camera data from the scene dataset manager ``DM`` into a
    DataManager buffer and ticks the AV once per frame.
    """
    data_manager = avstack.datastructs.DataManager(max_size=1)  # we either pop or we lose it
    for frame in tqdm(DM.frames):
        # -- add data --> sensor data has ID attached, so it knows where to go
        data_manager.push(DM.get_lidar(frame, sensor="main_lidar"))
        data_manager.push(DM.get_image(frame, sensor="main_camera"))
        # -- run next frame
        # BUGFIX: the timestamp is the frame index divided by the frame rate
        # (frames * seconds/frame); multiplying by the rate (Hz) inflated t by
        # a factor of framerate**2 relative to the tracker's expected dt.
        t = frame / DM.framerate
        av.tick(frame, t, data_manager, platform=DM.get_calibration(frame, sensor="main_lidar").reference)
# Shared run configuration for the experiments below.
t_init, ego_init = 0, None  # no localization module -> no initial ego state needed
result_folder = 'results_AV'  # where module outputs are written
KITTI¶
Once we define the test loop, all that's left is to initialize our AV and put it through the evaluation!
# KITTI: point the generic test loop at the KITTI scene dataset.
DM = KDM
AV = OurTestAV(t_init, ego_init=None, framerate=DM.framerate, dataset=DM.NAME,
               save_output=True, save_folder=result_folder)
# CONSISTENCY FIX: pass DM (== KDM) so this cell mirrors the nuScenes cell below.
run_our_av(DM, AV)
Loads checkpoint by local backend from path: /home/spencer/Documents/Projects/AVstack/avstack-docs/lib-avstack-core/third_party/mmdetection3d/checkpoints/kitti/hv_pointpillars_secfpn_6x8_160e_kitti-3d-3class_20220301_150306-37dc2420.pth
100%|█████████████████████████████████████████████████████████████████| 108/108 [00:06<00:00, 17.08it/s]
# %%capture
# Evaluate the saved perception outputs against ground truth on the main lidar.
percep_res_frames, percep_res_seq, _ = avapi.evaluation.get_percep_results_from_folder(
    DM, AV.perception['object_3d'].save_folder, sensor_eval='main_lidar', multiprocess=True)
# Tracking evaluation (slower); uncomment to compute the tracking metrics too.
# track_res_frames, track_res_seq, track_res_exp = avapi.evaluation.get_track_results_from_folder(
#     DM, AV.tracking.save_folder, sensor_eval='main_lidar')
100%|█████████████████████████████████████████████████████████████████| 108/108 [00:21<00:00, 4.97it/s]
# Sequence-level perception metrics (TP/FP/FN counts, precision, recall).
print('Aggregate perception results:\n', percep_res_seq, '\n')
# print('Aggregate tracking results:\n', track_res_seq, '\n')
# print(f' e.g., HOTA_LocA(0): {track_res_exp["HOTA_LocA(0)"]}')
Aggregate perception results: {'tot_TP': 0, 'tot_FP': 0, 'tot_FN': 450, 'tot_T': 450, 'mean_precision': 0.0, 'mean_recall': 0.0}
nuScenes¶
Note that the only difference here is that we have to pass in nuScenes as the dataset so the perception models can initialize to the nuScenes weights.
# nuScenes: identical loop; the dataset name selects the nuScenes model weights.
DM = NDM
AV = OurTestAV(t_init, ego_init=None, framerate=DM.framerate, dataset=DM.NAME,
               save_output=True, save_folder=result_folder)
run_our_av(DM, AV)
Loads checkpoint by local backend from path: /home/spencer/Documents/Projects/AVstack/avstack-docs/lib-avstack-core/third_party/mmdetection3d/checkpoints/nuscenes/hv_pointpillars_fpn_sbn-all_fp16_2x8_2x_nus-3d_20201021_120719-269f9dd6.pth
100%|███████████████████████████████████████████████████████████████████| 39/39 [00:02<00:00, 14.32it/s]
# %%capture
# Same perception evaluation as the KITTI cell, now over the nuScenes scene.
percep_res_frames, percep_res_seq, _ = avapi.evaluation.get_percep_results_from_folder(
    DM, AV.perception['object_3d'].save_folder, sensor_eval='main_lidar', multiprocess=True)
# Tracking evaluation (slower); uncomment to compute the tracking metrics too.
# track_res_frames, track_res_seq, track_res_exp = avapi.evaluation.get_track_results_from_folder(
#     DM, AV.tracking.save_folder, sensor_eval='main_lidar')
100%|███████████████████████████████████████████████████████████████████| 39/39 [00:27<00:00, 1.40it/s]
# Sequence-level perception metrics (TP/FP/FN counts, precision, recall).
print('Aggregate perception results:\n', percep_res_seq, '\n')
# print('Aggregate tracking results:\n', track_res_seq, '\n')
# print(f' e.g., HOTA_LocA(0): {track_res_exp["HOTA_LocA(0)"]}')
Aggregate perception results: {'tot_TP': 0, 'tot_FP': 0, 'tot_FN': 2057, 'tot_T': 2057, 'mean_precision': 0.0, 'mean_recall': 0.0}
Exploring Pre-Made AVs¶
We've already designed a few AVs we think will be useful while you're spinning up AVstack.
import sys
import inspect

# Enumerate the pre-made ego stacks shipped with avstack, base class first.
clsmembers = [cls_name for cls_name, _cls in inspect.getmembers(avstack.ego.vehicle, inspect.isclass)][::-1]
clsmembers
['VehicleEgoStack', 'PassthroughAutopilotVehicle', 'LidarPerceptionAndTrackingVehicle', 'LidarCollabPerceptionAndTrackingVehicle', 'LidarCameraPerceptionAndTrackingVehicle', 'LidarCamera3DFusionVehicle', 'Level2LidarBasedVehicle', 'Level2GtPerceptionGtLocalization', 'GroundTruthBehaviorAgent', 'GoStraightEgo', 'AutopilotWithCameraPerception']
Later, we'll get to how we can design "active" AVs that use perception and tracking information to make plans and control actuators in our environment. But for that, we'll need an AV simulator (or real hardware).