mct-2.1.1
Laughing-q 7 months ago
parent 40137785ec
commit 2ef0bdcbc3
Changed files:
1. ultralytics/nn/modules/block.py (3 changed lines)
2. ultralytics/nn/modules/head.py (144 changed lines)
3. ultralytics/nn/tasks.py (14 changed lines)
4. ultralytics/utils/tal.py (2 changed lines)

@@ -235,7 +235,8 @@ class C2f(nn.Module):
     def forward(self, x):
         """Forward pass through C2f layer."""
-        y = list(self.cv1(x).chunk(2, 1))
+        y1 = self.cv1(x).chunk(2, 1)
+        y = [y1[0], y1[1]]
         y.extend(m(y[-1]) for m in self.m)
         return self.cv2(torch.cat(y, 1))
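The two forms are numerically identical; the rewrite only replaces the `list(...)` wrapper around `chunk` with explicit tuple indexing, presumably so the graph capture used by quantization/export toolchains (this tag targets MCT 2.1.1) sees plain indexing ops. A quick equivalence check, with a hypothetical 64-channel `cv1` output:

```python
import torch

x = torch.randn(1, 64, 32, 32)  # hypothetical cv1 output, 64 channels

y_old = list(x.chunk(2, 1))  # original formulation

y1 = x.chunk(2, 1)           # rewritten formulation from this commit
y_new = [y1[0], y1[1]]

assert all(torch.equal(a, b) for a, b in zip(y_old, y_new))
print([tuple(t.shape) for t in y_new])  # [(1, 32, 32, 32), (1, 32, 32, 32)]
```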

@@ -19,148 +19,70 @@ __all__ = "Detect", "Segment", "Pose", "Classify", "OBB", "RTDETRDecoder", "v10D
 class Detect(nn.Module):
     """YOLOv8 Detect head for detection models."""
 
+    def __init__(self, nc: int = 80, ch=()):
+        """
+        Detection layer for YOLOv8.
+
-    dynamic = False  # force grid reconstruction
-    export = False  # export mode
-    end2end = False  # end2end
-    max_det = 300  # max_det
-    shape = None
-    anchors = torch.empty(0)  # init
-    strides = torch.empty(0)  # init
+        Args:
+            nc (int): Number of classes.
+            ch (List[int]): List of channel values for detection layers.
-    def __init__(self, nc=80, ch=()):
-        """Initializes the YOLOv8 detection layer with specified number of classes and channels."""
+        """
         super().__init__()
         self.nc = nc  # number of classes
         self.nl = len(ch)  # number of detection layers
         self.reg_max = 16  # DFL channels (ch[0] // 16 to scale 4/8/12/16/20 for n/s/m/l/x)
         self.no = nc + self.reg_max * 4  # number of outputs per anchor
-        self.stride = torch.zeros(self.nl)  # strides computed during build
+        self.stride = torch.Tensor([8, 16, 32])
+        self.feat_sizes = torch.Tensor([80, 40, 20])
+        self.img_size = 640  # img size
         c2, c3 = max((16, ch[0] // 4, self.reg_max * 4)), max(ch[0], min(self.nc, 100))  # channels
         self.cv2 = nn.ModuleList(
             nn.Sequential(Conv(x, c2, 3), Conv(c2, c2, 3), nn.Conv2d(c2, 4 * self.reg_max, 1)) for x in ch
         )
         self.cv3 = nn.ModuleList(nn.Sequential(Conv(x, c3, 3), Conv(c3, c3, 3), nn.Conv2d(c3, self.nc, 1)) for x in ch)
         self.dfl = DFL(self.reg_max) if self.reg_max > 1 else nn.Identity()
+        anchors, strides = (x.transpose(0, 1) for x in make_anchors(self.feat_sizes, self.stride, 0.5))
+        strides = strides / self.img_size
+        anchors = anchors * strides
+        self.relu1 = nn.ReLU()
+        self.relu2 = nn.ReLU()
+        self.relu3 = nn.ReLU()
+        self.relu4 = nn.ReLU()
-        if self.end2end:
-            self.one2one_cv2 = copy.deepcopy(self.cv2)
-            self.one2one_cv3 = copy.deepcopy(self.cv3)
+        self.anchors = anchors
+        self.strides = strides
 
     def forward(self, x):
         """Concatenates and returns predicted bounding boxes and class probabilities."""
-        if self.end2end:
-            return self.forward_end2end(x)
-        for i in range(self.nl):
-            x[i] = torch.cat((self.cv2[i](x[i]), self.cv3[i](x[i])), 1)
-        if self.training:  # Training path
-            return x
-        y = self._inference(x)
-        return y if self.export else (y, x)
-
-    def forward_end2end(self, x):
-        """
-        Performs forward pass of the v10Detect module.
-
-        Args:
-            x (tensor): Input tensor.
-
-        Returns:
-            (dict, tensor): If not in training mode, returns a dictionary containing the outputs of both one2many and one2one detections.
-                If in training mode, returns a dictionary containing the outputs of one2many and one2one detections separately.
-        """
-        x_detach = [xi.detach() for xi in x]
-        one2one = [
-            torch.cat((self.one2one_cv2[i](x_detach[i]), self.one2one_cv3[i](x_detach[i])), 1) for i in range(self.nl)
-        ]
+        shape = x[0].shape  # BCHW
         for i in range(self.nl):
             x[i] = torch.cat((self.cv2[i](x[i]), self.cv3[i](x[i])), 1)
-        if self.training:  # Training path
-            return {"one2many": x, "one2one": one2one}
-        y = self._inference(one2one)
-        y = self.postprocess(y.permute(0, 2, 1), self.max_det, self.nc)
-        return y if self.export else (y, {"one2many": x, "one2one": one2one})
-
-    def _inference(self, x):
-        """Decode predicted bounding boxes and class probabilities based on multiple-level feature maps."""
-        # Inference path
-        shape = x[0].shape  # BCHW
-        x_cat = torch.cat([xi.view(shape[0], self.no, -1) for xi in x], 2)
-        if self.dynamic or self.shape != shape:
-            self.anchors, self.strides = (x.transpose(0, 1) for x in make_anchors(x, self.stride, 0.5))
-            self.shape = shape
+        box, cls = torch.cat([xi.view(shape[0], self.no, -1) for xi in x], 2).split((self.reg_max * 4, self.nc), 1)
-        if self.export and self.format in {"saved_model", "pb", "tflite", "edgetpu", "tfjs"}:  # avoid TF FlexSplitV ops
-            box = x_cat[:, : self.reg_max * 4]
-            cls = x_cat[:, self.reg_max * 4 :]
-        else:
-            box, cls = x_cat.split((self.reg_max * 4, self.nc), 1)
+        y_cls = cls.sigmoid().transpose(1, 2)
-        if self.export and self.format in {"tflite", "edgetpu"}:
-            # Precompute normalization factor to increase numerical stability
-            # See https://github.com/ultralytics/ultralytics/issues/7371
-            grid_h = shape[2]
-            grid_w = shape[3]
-            grid_size = torch.tensor([grid_w, grid_h, grid_w, grid_h], device=box.device).reshape(1, 4, 1)
-            norm = self.strides / (self.stride[0] * grid_size)
-            dbox = self.decode_bboxes(self.dfl(box) * norm, self.anchors.unsqueeze(0) * norm[:, :2])
-        else:
-            dbox = self.decode_bboxes(self.dfl(box), self.anchors.unsqueeze(0)) * self.strides
+        dfl = self.dfl(box)
+        dfl = dfl * self.strides
-        return torch.cat((dbox, cls.sigmoid()), 1)
+        # box decoding
+        lt, rb = dfl.chunk(2, 1)
+        y1 = self.relu1(self.anchors.unsqueeze(0)[:, 0, :] - lt[:, 0, :])
+        x1 = self.relu2(self.anchors.unsqueeze(0)[:, 1, :] - lt[:, 1, :])
+        y2 = self.relu3(self.anchors.unsqueeze(0)[:, 0, :] + rb[:, 0, :])
+        x2 = self.relu4(self.anchors.unsqueeze(0)[:, 1, :] + rb[:, 1, :])
+        y_bb = torch.stack((x1, y1, x2, y2), 1).transpose(1, 2)
+        return y_bb, y_cls
 
     def bias_init(self):
         """Initialize Detect() biases, WARNING: requires stride availability."""
         m = self  # self.model[-1]  # Detect() module
         # cf = torch.bincount(torch.tensor(np.concatenate(dataset.labels, 0)[:, 0]).long(), minlength=nc) + 1
         # ncf = math.log(0.6 / (m.nc - 0.999999)) if cf is None else torch.log(cf / cf.sum())  # nominal class frequency
         for a, b, s in zip(m.cv2, m.cv3, m.stride):  # from
             a[-1].bias.data[:] = 1.0  # box
             b[-1].bias.data[: m.nc] = math.log(5 / m.nc / (640 / s) ** 2)  # cls (.01 objects, 80 classes, 640 img)
-        if self.end2end:
-            for a, b, s in zip(m.one2one_cv2, m.one2one_cv3, m.stride):  # from
-                a[-1].bias.data[:] = 1.0  # box
-                b[-1].bias.data[: m.nc] = math.log(5 / m.nc / (640 / s) ** 2)  # cls (.01 objects, 80 classes, 640 img)
 
     def decode_bboxes(self, bboxes, anchors):
         """Decode bounding boxes."""
-        return dist2bbox(bboxes, anchors, xywh=not self.end2end, dim=1)
-
-    @staticmethod
-    def postprocess(preds: torch.Tensor, max_det: int, nc: int = 80):
-        """
-        Post-processes the predictions obtained from a YOLOv10 model.
-
-        Args:
-            preds (torch.Tensor): The predictions obtained from the model. It should have a shape of (batch_size, num_boxes, 4 + num_classes).
-            max_det (int): The maximum number of detections to keep.
-            nc (int, optional): The number of classes. Defaults to 80.
-
-        Returns:
-            (torch.Tensor): The post-processed predictions with shape (batch_size, max_det, 6),
-                including bounding boxes, scores and cls.
-        """
-        assert 4 + nc == preds.shape[-1]
-        boxes, scores = preds.split([4, nc], dim=-1)
-        max_scores = scores.amax(dim=-1)
-        max_scores, index = torch.topk(max_scores, min(max_det, max_scores.shape[1]), axis=-1)
-        index = index.unsqueeze(-1)
-        boxes = torch.gather(boxes, dim=1, index=index.repeat(1, 1, boxes.shape[-1]))
-        scores = torch.gather(scores, dim=1, index=index.repeat(1, 1, scores.shape[-1]))
-        # NOTE: simplify but result slightly lower mAP
-        # scores, labels = scores.max(dim=-1)
-        # return torch.cat([boxes, scores.unsqueeze(-1), labels.unsqueeze(-1)], dim=-1)
-        scores, index = torch.topk(scores.flatten(1), max_det, axis=-1)
-        labels = index % nc
-        index = index // nc
-        boxes = boxes.gather(dim=1, index=index.unsqueeze(-1).repeat(1, 1, boxes.shape[-1]))
-        return torch.cat([boxes, scores.unsqueeze(-1), labels.unsqueeze(-1).to(boxes.dtype)], dim=-1)
+        return bboxes
 
 class Segment(Detect):
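Taken together, the head changes trade the dynamic `_inference`/`decode_bboxes` path for a fully static decode: anchors and strides are baked in at construction time from the fixed 640-input feature sizes (80/40/20 at strides 8/16/32), strides are divided by the image size so boxes land in [0, 1], and the four corner computations go through dedicated `nn.ReLU` modules so each clamp is an explicit node for the quantizer. The sketch below is not part of the commit; the inline anchor construction is a simplified stand-in for `make_anchors`, and the `dfl` tensor is a random placeholder for the DFL output. Read against `make_anchors`' (x, y) anchor layout, the stacked result appears to come out ordered (y_min, x_min, y_max, x_max), the convention TF-style NMS ops expect:

```python
import torch

img_size = 640
strides = torch.tensor([8.0, 16.0, 32.0])
feat_sizes = [80, 40, 20]  # img_size / stride per level

# Build (x, y) anchor centers per level, as make_anchors does
points, stride_rows = [], []
for fs, s in zip(feat_sizes, strides):
    c = torch.arange(fs, dtype=torch.float32) + 0.5  # grid-cell centers
    gy, gx = torch.meshgrid(c, c, indexing="ij")
    points.append(torch.stack((gx, gy), -1).view(-1, 2))
    stride_rows.append(torch.full((fs * fs, 1), float(s)))
anchors = torch.cat(points).transpose(0, 1)            # (2, 8400), row 0 = x, row 1 = y
strides_col = torch.cat(stride_rows).transpose(0, 1)   # (1, 8400)

# Normalization from the commit: strides /= img_size, anchors *= strides
strides_n = strides_col / img_size
anchors_n = anchors * strides_n

# ReLU-clamped corner decoding, mirroring the new Detect.forward
relu = torch.nn.ReLU()
dfl = torch.rand(1, 4, anchors_n.shape[1])  # hypothetical DFL output (l, t, r, b)
dfl = dfl * strides_n
lt, rb = dfl.chunk(2, 1)
a = anchors_n.unsqueeze(0)
y1 = relu(a[:, 0, :] - lt[:, 0, :])
x1 = relu(a[:, 1, :] - lt[:, 1, :])
y2 = relu(a[:, 0, :] + rb[:, 0, :])
x2 = relu(a[:, 1, :] + rb[:, 1, :])
y_bb = torch.stack((x1, y1, x2, y2), 1).transpose(1, 2)
print(y_bb.shape)  # torch.Size([1, 8400, 4]), coordinates normalized to [0, 1]
```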

@@ -87,7 +87,7 @@ except ImportError:
 class BaseModel(nn.Module):
     """The BaseModel class serves as a base class for all the models in the Ultralytics YOLO family."""
 
-    def forward(self, x, *args, **kwargs):
+    def forward(self, x):
         """
         Forward pass of the model on a single scale. Wrapper for `_forward_once` method.

@@ -97,9 +97,13 @@ class BaseModel(nn.Module):
         Returns:
             (torch.Tensor): The output of the network.
         """
-        if isinstance(x, dict):  # for cases of training and validating while training.
-            return self.loss(x, *args, **kwargs)
-        return self.predict(x, *args, **kwargs)
+        y = []
+        for m in self.model:
+            if m.f != -1:
+                x = y[m.f] if isinstance(m.f, int) else [x if j == -1 else y[j] for j in m.f]
+            x = m(x)
+            y.append(x if m.i in self.save else None)
+        return x
 
     def predict(self, x, profile=False, visualize=False, augment=False, embed=None):
         """
@@ -323,7 +327,7 @@ class DetectionModel(BaseModel):
                     return self.forward(x)["one2many"]
                 return self.forward(x)[0] if isinstance(m, (Segment, Pose, OBB)) else self.forward(x)
 
-            m.stride = torch.tensor([s / x.shape[-2] for x in _forward(torch.zeros(1, ch, s, s))])  # forward
+            # m.stride = torch.tensor([s / x.shape[-2] for x in _forward(torch.zeros(1, ch, s, s))])  # forward
             self.stride = m.stride
             m.bias_init()  # only run once
         else:
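With the stride probe commented out, `self.stride = m.stride` now picks up the constant `torch.Tensor([8, 16, 32])` assigned in `Detect.__init__` instead of measuring strides with a dummy forward pass. A one-liner to see what that implies for a 640 input:

```python
import torch

stride = torch.Tensor([8, 16, 32])  # fixed in Detect.__init__
print(640 / stride)                 # tensor([80., 40., 20.]) == Detect.feat_sizes
```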

@@ -297,7 +297,7 @@ def make_anchors(feats, strides, grid_cell_offset=0.5):
     assert feats is not None
     dtype, device = feats[0].dtype, feats[0].device
     for i, stride in enumerate(strides):
-        _, _, h, w = feats[i].shape
+        h, w = int(feats[i]), int(feats[i])
         sx = torch.arange(end=w, device=device, dtype=dtype) + grid_cell_offset  # shift x
         sy = torch.arange(end=h, device=device, dtype=dtype) + grid_cell_offset  # shift y
         sy, sx = torch.meshgrid(sy, sx, indexing="ij") if TORCH_1_10 else torch.meshgrid(sy, sx)
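`feats` is now a 1-D tensor of (square) feature-map sizes rather than a list of feature maps, so `feats[i]` is a 0-dim tensor and `h` and `w` both come from `int(feats[i])`, while `feats[0].dtype`/`.device` keep working unchanged. This matches the new call site in `Detect.__init__`: `make_anchors(torch.Tensor([80, 40, 20]), torch.Tensor([8, 16, 32]), 0.5)`. An inline sketch of one loop iteration under those inputs (not part of the commit):

```python
import torch

feat_sizes = torch.Tensor([80, 40, 20])       # square feature-map sizes for a 640 input
h, w = int(feat_sizes[0]), int(feat_sizes[0])  # was: _, _, h, w = feats[0].shape
sx = torch.arange(end=w, dtype=feat_sizes.dtype) + 0.5  # shift x
sy = torch.arange(end=h, dtype=feat_sizes.dtype) + 0.5  # shift y
sy, sx = torch.meshgrid(sy, sx, indexing="ij")
print(sx.shape)  # torch.Size([80, 80]) -> 6400 anchor centers at stride 8
```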
