You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
283 lines
10 KiB
283 lines
10 KiB
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. |
|
# |
|
# Licensed under the Apache License, Version 2.0 (the "License"); |
|
# you may not use this file except in compliance with the License. |
|
# You may obtain a copy of the License at |
|
# |
|
# http://www.apache.org/licenses/LICENSE-2.0 |
|
# |
|
# Unless required by applicable law or agreed to in writing, software |
|
# distributed under the License is distributed on an "AS IS" BASIS, |
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
# See the License for the specific language governing permissions and |
|
# limitations under the License. |
|
|
|
import paddle |
|
import paddle.nn as nn |
|
import paddle.nn.functional as F |
|
from paddle.nn.initializer import Normal |
|
|
|
from paddlers.models.ppdet.core.workspace import register |
|
from .bbox_head import BBoxHead, TwoFCHead, XConvNormHead |
|
from .roi_extractor import RoIAlign |
|
from ..shape_spec import ShapeSpec |
|
from ..bbox_utils import delta2bbox, clip_bbox, nonempty_bbox |
|
|
|
__all__ = ['CascadeTwoFCHead', 'CascadeXConvNormHead', 'CascadeHead'] |
|
|
|
|
|
@register |
|
class CascadeTwoFCHead(nn.Layer): |
|
__shared__ = ['num_cascade_stage'] |
|
""" |
|
Cascade RCNN bbox head with Two fc layers to extract feature |
|
|
|
Args: |
|
in_channel (int): Input channel which can be derived by from_config |
|
out_channel (int): Output channel |
|
resolution (int): Resolution of input feature map, default 7 |
|
num_cascade_stage (int): The number of cascade stage, default 3 |
|
""" |
|
|
|
def __init__(self, |
|
in_channel=256, |
|
out_channel=1024, |
|
resolution=7, |
|
num_cascade_stage=3): |
|
super(CascadeTwoFCHead, self).__init__() |
|
|
|
self.in_channel = in_channel |
|
self.out_channel = out_channel |
|
|
|
self.head_list = [] |
|
for stage in range(num_cascade_stage): |
|
head_per_stage = self.add_sublayer( |
|
str(stage), TwoFCHead(in_channel, out_channel, resolution)) |
|
self.head_list.append(head_per_stage) |
|
|
|
@classmethod |
|
def from_config(cls, cfg, input_shape): |
|
s = input_shape |
|
s = s[0] if isinstance(s, (list, tuple)) else s |
|
return {'in_channel': s.channels} |
|
|
|
@property |
|
def out_shape(self): |
|
return [ShapeSpec(channels=self.out_channel, )] |
|
|
|
def forward(self, rois_feat, stage=0): |
|
out = self.head_list[stage](rois_feat) |
|
return out |
|
|
|
|
|
@register |
|
class CascadeXConvNormHead(nn.Layer): |
|
__shared__ = ['norm_type', 'freeze_norm', 'num_cascade_stage'] |
|
""" |
|
Cascade RCNN bbox head with serveral convolution layers |
|
|
|
Args: |
|
in_channel (int): Input channels which can be derived by from_config |
|
num_convs (int): The number of conv layers |
|
conv_dim (int): The number of channels for the conv layers |
|
out_channel (int): Output channels |
|
resolution (int): Resolution of input feature map |
|
norm_type (string): Norm type, bn, gn, sync_bn are available, |
|
default `gn` |
|
freeze_norm (bool): Whether to freeze the norm |
|
num_cascade_stage (int): The number of cascade stage, default 3 |
|
""" |
|
|
|
def __init__(self, |
|
in_channel=256, |
|
num_convs=4, |
|
conv_dim=256, |
|
out_channel=1024, |
|
resolution=7, |
|
norm_type='gn', |
|
freeze_norm=False, |
|
num_cascade_stage=3): |
|
super(CascadeXConvNormHead, self).__init__() |
|
self.in_channel = in_channel |
|
self.out_channel = out_channel |
|
|
|
self.head_list = [] |
|
for stage in range(num_cascade_stage): |
|
head_per_stage = self.add_sublayer( |
|
str(stage), |
|
XConvNormHead( |
|
in_channel, |
|
num_convs, |
|
conv_dim, |
|
out_channel, |
|
resolution, |
|
norm_type, |
|
freeze_norm, |
|
stage_name='stage{}_'.format(stage))) |
|
self.head_list.append(head_per_stage) |
|
|
|
@classmethod |
|
def from_config(cls, cfg, input_shape): |
|
s = input_shape |
|
s = s[0] if isinstance(s, (list, tuple)) else s |
|
return {'in_channel': s.channels} |
|
|
|
@property |
|
def out_shape(self): |
|
return [ShapeSpec(channels=self.out_channel, )] |
|
|
|
def forward(self, rois_feat, stage=0): |
|
out = self.head_list[stage](rois_feat) |
|
return out |
|
|
|
|
|
@register |
|
class CascadeHead(BBoxHead): |
|
__shared__ = ['num_classes', 'num_cascade_stages'] |
|
__inject__ = ['bbox_assigner', 'bbox_loss'] |
|
""" |
|
Cascade RCNN bbox head |
|
|
|
Args: |
|
head (nn.Layer): Extract feature in bbox head |
|
in_channel (int): Input channel after RoI extractor |
|
roi_extractor (object): The module of RoI Extractor |
|
bbox_assigner (object): The module of Box Assigner, label and sample the |
|
box. |
|
num_classes (int): The number of classes |
|
bbox_weight (List[List[float]]): The weight to get the decode box and the |
|
length of weight is the number of cascade stage |
|
num_cascade_stages (int): THe number of stage to refine the box |
|
""" |
|
|
|
def __init__(self, |
|
head, |
|
in_channel, |
|
roi_extractor=RoIAlign().__dict__, |
|
bbox_assigner='BboxAssigner', |
|
num_classes=80, |
|
bbox_weight=[[10., 10., 5., 5.], [20.0, 20.0, 10.0, 10.0], |
|
[30.0, 30.0, 15.0, 15.0]], |
|
num_cascade_stages=3, |
|
bbox_loss=None): |
|
nn.Layer.__init__(self, ) |
|
self.head = head |
|
self.roi_extractor = roi_extractor |
|
if isinstance(roi_extractor, dict): |
|
self.roi_extractor = RoIAlign(**roi_extractor) |
|
self.bbox_assigner = bbox_assigner |
|
|
|
self.num_classes = num_classes |
|
self.bbox_weight = bbox_weight |
|
self.num_cascade_stages = num_cascade_stages |
|
self.bbox_loss = bbox_loss |
|
|
|
self.bbox_score_list = [] |
|
self.bbox_delta_list = [] |
|
for i in range(num_cascade_stages): |
|
score_name = 'bbox_score_stage{}'.format(i) |
|
delta_name = 'bbox_delta_stage{}'.format(i) |
|
bbox_score = self.add_sublayer( |
|
score_name, |
|
nn.Linear( |
|
in_channel, |
|
self.num_classes + 1, |
|
weight_attr=paddle.ParamAttr(initializer=Normal( |
|
mean=0.0, std=0.01)))) |
|
|
|
bbox_delta = self.add_sublayer( |
|
delta_name, |
|
nn.Linear( |
|
in_channel, |
|
4, |
|
weight_attr=paddle.ParamAttr(initializer=Normal( |
|
mean=0.0, std=0.001)))) |
|
self.bbox_score_list.append(bbox_score) |
|
self.bbox_delta_list.append(bbox_delta) |
|
self.assigned_label = None |
|
self.assigned_rois = None |
|
|
|
def forward(self, body_feats=None, rois=None, rois_num=None, inputs=None): |
|
""" |
|
body_feats (list[Tensor]): Feature maps from backbone |
|
rois (Tensor): RoIs generated from RPN module |
|
rois_num (Tensor): The number of RoIs in each image |
|
inputs (dict{Tensor}): The ground-truth of image |
|
""" |
|
targets = [] |
|
if self.training: |
|
rois, rois_num, targets = self.bbox_assigner(rois, rois_num, inputs) |
|
targets_list = [targets] |
|
self.assigned_rois = (rois, rois_num) |
|
self.assigned_targets = targets |
|
|
|
pred_bbox = None |
|
head_out_list = [] |
|
for i in range(self.num_cascade_stages): |
|
if i > 0: |
|
rois, rois_num = self._get_rois_from_boxes(pred_bbox, |
|
inputs['im_shape']) |
|
if self.training: |
|
rois, rois_num, targets = self.bbox_assigner( |
|
rois, rois_num, inputs, i, is_cascade=True) |
|
targets_list.append(targets) |
|
|
|
rois_feat = self.roi_extractor(body_feats, rois, rois_num) |
|
bbox_feat = self.head(rois_feat, i) |
|
scores = self.bbox_score_list[i](bbox_feat) |
|
deltas = self.bbox_delta_list[i](bbox_feat) |
|
head_out_list.append([scores, deltas, rois]) |
|
pred_bbox = self._get_pred_bbox(deltas, rois, self.bbox_weight[i]) |
|
|
|
if self.training: |
|
loss = {} |
|
for stage, value in enumerate(zip(head_out_list, targets_list)): |
|
(scores, deltas, rois), targets = value |
|
loss_stage = self.get_loss(scores, deltas, targets, rois, |
|
self.bbox_weight[stage]) |
|
for k, v in loss_stage.items(): |
|
loss[k + "_stage{}".format( |
|
stage)] = v / self.num_cascade_stages |
|
|
|
return loss, bbox_feat |
|
else: |
|
scores, deltas, self.refined_rois = self.get_prediction( |
|
head_out_list) |
|
return (deltas, scores), self.head |
|
|
|
def _get_rois_from_boxes(self, boxes, im_shape): |
|
rois = [] |
|
for i, boxes_per_image in enumerate(boxes): |
|
clip_box = clip_bbox(boxes_per_image, im_shape[i]) |
|
if self.training: |
|
keep = nonempty_bbox(clip_box) |
|
if keep.shape[0] == 0: |
|
keep = paddle.zeros([1], dtype='int32') |
|
clip_box = paddle.gather(clip_box, keep) |
|
rois.append(clip_box) |
|
rois_num = paddle.concat([paddle.shape(r)[0] for r in rois]) |
|
return rois, rois_num |
|
|
|
def _get_pred_bbox(self, deltas, proposals, weights): |
|
pred_proposals = paddle.concat(proposals) if len( |
|
proposals) > 1 else proposals[0] |
|
pred_bbox = delta2bbox(deltas, pred_proposals, weights) |
|
pred_bbox = paddle.reshape(pred_bbox, [-1, deltas.shape[-1]]) |
|
num_prop = [] |
|
for p in proposals: |
|
num_prop.append(p.shape[0]) |
|
return pred_bbox.split(num_prop) |
|
|
|
def get_prediction(self, head_out_list): |
|
""" |
|
head_out_list(List[Tensor]): scores, deltas, rois |
|
""" |
|
pred_list = [] |
|
scores_list = [F.softmax(head[0]) for head in head_out_list] |
|
scores = paddle.add_n(scores_list) / self.num_cascade_stages |
|
# Get deltas and rois from the last stage |
|
_, deltas, rois = head_out_list[-1] |
|
return scores, deltas, rois |
|
|
|
def get_refined_rois(self, ): |
|
return self.refined_rois
|
|
|