Optimize `speed estimation` solution (#16254)

Co-authored-by: UltralyticsAssistant <web@ultralytics.com>
pull/8642/head^2
Muhammad Rizwan Munawar 3 months ago committed by GitHub
parent 684519fe52
commit bf2b221d11
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 3
      docs/en/guides/speed-estimation.md
  2. 172
      ultralytics/solutions/speed_estimation.py

@ -72,7 +72,7 @@ keywords: Ultralytics YOLOv8, speed estimation, object tracking, computer vision
print("Video frame is empty or video processing has been successfully completed.") print("Video frame is empty or video processing has been successfully completed.")
break break
tracks = model.track(im0, persist=True, show=False) tracks = model.track(im0, persist=True)
im0 = speed_obj.estimate_speed(im0, tracks) im0 = speed_obj.estimate_speed(im0, tracks)
video_writer.write(im0) video_writer.write(im0)
@ -94,7 +94,6 @@ keywords: Ultralytics YOLOv8, speed estimation, object tracking, computer vision
| `reg_pts` | `list` | `[(20, 400), (1260, 400)]` | List of region points for speed estimation. | | `reg_pts` | `list` | `[(20, 400), (1260, 400)]` | List of region points for speed estimation. |
| `view_img` | `bool` | `False` | Whether to display the image with annotations. | | `view_img` | `bool` | `False` | Whether to display the image with annotations. |
| `line_thickness` | `int` | `2` | Thickness of the lines for drawing boxes and tracks. | | `line_thickness` | `int` | `2` | Thickness of the lines for drawing boxes and tracks. |
| `region_thickness` | `int` | `5` | Thickness of the region lines. |
| `spdl_dist_thresh` | `int` | `10` | Distance threshold for speed calculation. | | `spdl_dist_thresh` | `int` | `10` | Distance threshold for speed calculation. |
### Arguments `model.track` ### Arguments `model.track`

@ -13,7 +13,7 @@ from ultralytics.utils.plotting import Annotator, colors
class SpeedEstimator: class SpeedEstimator:
"""A class to estimate the speed of objects in a real-time video stream based on their tracks.""" """A class to estimate the speed of objects in a real-time video stream based on their tracks."""
def __init__(self, names, reg_pts=None, view_img=False, line_thickness=2, region_thickness=5, spdl_dist_thresh=10): def __init__(self, names, reg_pts=None, view_img=False, line_thickness=2, spdl_dist_thresh=10):
""" """
Initializes the SpeedEstimator with the given parameters. Initializes the SpeedEstimator with the given parameters.
@ -22,158 +22,94 @@ class SpeedEstimator:
reg_pts (list, optional): List of region points for speed estimation. Defaults to [(20, 400), (1260, 400)]. reg_pts (list, optional): List of region points for speed estimation. Defaults to [(20, 400), (1260, 400)].
view_img (bool, optional): Whether to display the image with annotations. Defaults to False. view_img (bool, optional): Whether to display the image with annotations. Defaults to False.
line_thickness (int, optional): Thickness of the lines for drawing boxes and tracks. Defaults to 2. line_thickness (int, optional): Thickness of the lines for drawing boxes and tracks. Defaults to 2.
region_thickness (int, optional): Thickness of the region lines. Defaults to 5.
spdl_dist_thresh (int, optional): Distance threshold for speed calculation. Defaults to 10. spdl_dist_thresh (int, optional): Distance threshold for speed calculation. Defaults to 10.
""" """
# Visual & image information
self.im0 = None
self.annotator = None
self.view_img = view_img
# Region information # Region information
self.reg_pts = reg_pts if reg_pts is not None else [(20, 400), (1260, 400)] self.reg_pts = reg_pts if reg_pts is not None else [(20, 400), (1260, 400)]
self.region_thickness = region_thickness
self.names = names # Classes names
# Tracking information # Tracking information
self.clss = None
self.names = names
self.boxes = None
self.trk_ids = None
self.trk_pts = None
self.line_thickness = line_thickness
self.trk_history = defaultdict(list) self.trk_history = defaultdict(list)
# Speed estimation information self.view_img = view_img # bool for displaying inference
self.current_time = 0 self.tf = line_thickness # line thickness for annotator
self.dist_data = {} self.spd = {} # set for speed data
self.trk_idslist = [] self.trkd_ids = [] # list for already speed_estimated and tracked ID's
self.spdl_dist_thresh = spdl_dist_thresh self.spdl = spdl_dist_thresh # Speed line distance threshold
self.trk_previous_times = {} self.trk_pt = {} # set for tracks previous time
self.trk_previous_points = {} self.trk_pp = {} # set for tracks previous point
# Check if the environment supports imshow # Check if the environment supports imshow
self.env_check = check_imshow(warn=True) self.env_check = check_imshow(warn=True)
def extract_tracks(self, tracks): def estimate_speed(self, im0, tracks):
""" """
Extracts results from the provided tracking data. Estimates the speed of objects based on tracking data.
Args: Args:
im0 (ndarray): Image.
tracks (list): List of tracks obtained from the object tracking process. tracks (list): List of tracks obtained from the object tracking process.
"""
self.boxes = tracks[0].boxes.xyxy.cpu()
self.clss = tracks[0].boxes.cls.cpu().tolist()
self.trk_ids = tracks[0].boxes.id.int().cpu().tolist()
def store_track_info(self, track_id, box):
"""
Stores track data.
Args:
track_id (int): Object track id.
box (list): Object bounding box data.
Returns: Returns:
(list): Updated tracking history for the given track_id. (ndarray): The image with annotated boxes and tracks.
""" """
track = self.trk_history[track_id] if tracks[0].boxes.id is None:
bbox_center = (float((box[0] + box[2]) / 2), float((box[1] + box[3]) / 2)) return im0
track.append(bbox_center)
if len(track) > 30: boxes = tracks[0].boxes.xyxy.cpu()
track.pop(0) clss = tracks[0].boxes.cls.cpu().tolist()
t_ids = tracks[0].boxes.id.int().cpu().tolist()
annotator = Annotator(im0, line_width=self.tf)
annotator.draw_region(reg_pts=self.reg_pts, color=(255, 0, 255), thickness=self.tf * 2)
self.trk_pts = np.hstack(track).astype(np.int32).reshape((-1, 1, 2)) for box, t_id, cls in zip(boxes, t_ids, clss):
return track track = self.trk_history[t_id]
bbox_center = (float((box[0] + box[2]) / 2), float((box[1] + box[3]) / 2))
track.append(bbox_center)
def plot_box_and_track(self, track_id, box, cls, track): if len(track) > 30:
""" track.pop(0)
Plots track and bounding box.
Args: trk_pts = np.hstack(track).astype(np.int32).reshape((-1, 1, 2))
track_id (int): Object track id.
box (list): Object bounding box data.
cls (str): Object class name.
track (list): Tracking history for drawing tracks path.
"""
speed_label = f"{int(self.dist_data[track_id])} km/h" if track_id in self.dist_data else self.names[int(cls)]
bbox_color = colors(int(track_id)) if track_id in self.dist_data else (255, 0, 255)
self.annotator.box_label(box, speed_label, bbox_color) if t_id not in self.trk_pt:
cv2.polylines(self.im0, [self.trk_pts], isClosed=False, color=(0, 255, 0), thickness=1) self.trk_pt[t_id] = 0
cv2.circle(self.im0, (int(track[-1][0]), int(track[-1][1])), 5, bbox_color, -1)
def calculate_speed(self, trk_id, track): speed_label = f"{int(self.spd[t_id])} km/h" if t_id in self.spd else self.names[int(cls)]
""" bbox_color = colors(int(t_id), True)
Calculates the speed of an object.
Args: annotator.box_label(box, speed_label, bbox_color)
trk_id (int): Object track id. cv2.polylines(im0, [trk_pts], isClosed=False, color=bbox_color, thickness=self.tf)
track (list): Tracking history for drawing tracks path. cv2.circle(im0, (int(track[-1][0]), int(track[-1][1])), self.tf * 2, bbox_color, -1)
"""
if not self.reg_pts[0][0] < track[-1][0] < self.reg_pts[1][0]:
return
if self.reg_pts[1][1] - self.spdl_dist_thresh < track[-1][1] < self.reg_pts[1][1] + self.spdl_dist_thresh:
direction = "known"
elif self.reg_pts[0][1] - self.spdl_dist_thresh < track[-1][1] < self.reg_pts[0][1] + self.spdl_dist_thresh:
direction = "known"
else:
direction = "unknown"
if self.trk_previous_times.get(trk_id) != 0 and direction != "unknown" and trk_id not in self.trk_idslist:
self.trk_idslist.append(trk_id)
time_difference = time() - self.trk_previous_times[trk_id]
if time_difference > 0:
dist_difference = np.abs(track[-1][1] - self.trk_previous_points[trk_id][1])
speed = dist_difference / time_difference
self.dist_data[trk_id] = speed
self.trk_previous_times[trk_id] = time()
self.trk_previous_points[trk_id] = track[-1]
def estimate_speed(self, im0, tracks, region_color=(255, 0, 0)):
"""
Estimates the speed of objects based on tracking data.
Args: # Calculation of object speed
im0 (ndarray): Image. if not self.reg_pts[0][0] < track[-1][0] < self.reg_pts[1][0]:
tracks (list): List of tracks obtained from the object tracking process. return
region_color (tuple, optional): Color to use when drawing regions. Defaults to (255, 0, 0). if self.reg_pts[1][1] - self.spdl < track[-1][1] < self.reg_pts[1][1] + self.spdl:
direction = "known"
elif self.reg_pts[0][1] - self.spdl < track[-1][1] < self.reg_pts[0][1] + self.spdl:
direction = "known"
else:
direction = "unknown"
Returns: if self.trk_pt.get(t_id) != 0 and direction != "unknown" and t_id not in self.trkd_ids:
(ndarray): The image with annotated boxes and tracks. self.trkd_ids.append(t_id)
"""
self.im0 = im0
if tracks[0].boxes.id is None:
if self.view_img and self.env_check:
self.display_frames()
return im0
self.extract_tracks(tracks) time_difference = time() - self.trk_pt[t_id]
self.annotator = Annotator(self.im0, line_width=self.line_thickness) if time_difference > 0:
self.annotator.draw_region(reg_pts=self.reg_pts, color=region_color, thickness=self.region_thickness) self.spd[t_id] = np.abs(track[-1][1] - self.trk_pp[t_id][1]) / time_difference
for box, trk_id, cls in zip(self.boxes, self.trk_ids, self.clss): self.trk_pt[t_id] = time()
track = self.store_track_info(trk_id, box) self.trk_pp[t_id] = track[-1]
if trk_id not in self.trk_previous_times:
self.trk_previous_times[trk_id] = 0
self.plot_box_and_track(trk_id, box, cls, track)
self.calculate_speed(trk_id, track)
if self.view_img and self.env_check: if self.view_img and self.env_check:
self.display_frames() cv2.imshow("Ultralytics Speed Estimation", im0)
if cv2.waitKey(1) & 0xFF == ord("q"):
return
return im0 return im0
def display_frames(self):
"""Displays the current frame."""
cv2.imshow("Ultralytics Speed Estimation", self.im0)
if cv2.waitKey(1) & 0xFF == ord("q"):
return
if __name__ == "__main__": if __name__ == "__main__":
names = {0: "person", 1: "car"} # example class names names = {0: "person", 1: "car"} # example class names

Loading…
Cancel
Save