|
|
|
import warnings
|
|
|
|
from typing import List, OrderedDict, Tuple, Union
|
|
|
|
|
|
|
|
import numpy as np
|
|
|
|
import tensorrt as trt
|
|
|
|
|
|
|
|
# Install a module-import-time filter that ignores every DeprecationWarning.
# NOTE(review): presumably aimed at deprecation notices from the TensorRT
# Python API — confirm; the filter itself is not limited to any one package.
warnings.filterwarnings(action='ignore', category=DeprecationWarning)
|
|
|
|
|
|
|
|
|
|
|
|
def trtweight(weights: np.ndarray) -> trt.Weights:
    """Wrap a NumPy array in a ``trt.Weights`` object.

    The array is first passed through ``astype`` with its own dtype name, so
    the wrapper is built from the array returned by that call rather than from
    the caller's object.

    Args:
        weights: Array holding the parameter values.

    Returns:
        A ``trt.Weights`` constructed from the re-typed array.
    """
    dtype_name = weights.dtype.name
    retyped = weights.astype(dtype_name)
    return trt.Weights(retyped)
|
|
|
|
|
|
|
|
|
|
|
|
def get_width(x: int, gw: float, divisor: int = 8) -> int:
    """Scale a channel count and round it up to a multiple of ``divisor``.

    Args:
        x: Base channel count.
        gw: Width multiplier applied to ``x``.
        divisor: The result is the smallest multiple of this value that is
            greater than or equal to ``x * gw``.

    Returns:
        The scaled, rounded-up channel count as a Python ``int``.
    """
    scaled = x * gw
    # Number of whole ``divisor``-sized groups needed to hold ``scaled``.
    groups = np.ceil(scaled / divisor)
    return int(groups * divisor)
|
|
|
|
|
|
|
|
|
|
|
|
def get_depth(x: int, gd: float) -> int:
    """Scale a repeat count by ``gd``, never returning less than one.

    Args:
        x: Base number of repeats.
        gd: Depth multiplier applied to ``x``.

    Returns:
        ``x * gd`` rounded with the builtin ``round`` (ties go to the even
        integer), clamped to a minimum of 1.
    """
    repeats = int(round(x * gd))
    if repeats > 1:
        return repeats
    return 1
|
|
|
|
|
|
|
|
|
|
|
|
def Conv2d(network: trt.INetworkDefinition, weights: OrderedDict,
           input: trt.ITensor, out_channel: int, ksize: int, stride: int,
           group: int, layer_name: str) -> trt.ILayer:
    """Add a biased 2-D convolution (no activation) to ``network``.

    Kernel and bias are read from ``weights`` under the keys
    ``layer_name + '.weight'`` and ``layer_name + '.bias'``.

    Args:
        network: Network definition the layer is added to.
        weights: Mapping from parameter name to NumPy array.
        input: Tensor fed to the convolution.
        out_channel: Number of output feature maps.
        ksize: Side length of the square kernel.
        stride: Stride applied in both spatial directions.
        group: Number of convolution groups.
        layer_name: Prefix of the parameter keys in ``weights``.

    Returns:
        The convolution layer that was added.
    """
    half_kernel = ksize // 2
    kernel = trtweight(weights[layer_name + '.weight'])
    bias = trtweight(weights[layer_name + '.bias'])

    layer = network.add_convolution_nd(
        input,
        num_output_maps=out_channel,
        kernel_shape=trt.DimsHW(ksize, ksize),
        kernel=kernel,
        bias=bias,
    )
    assert layer, 'Add convolution_nd layer failed'

    layer.stride_nd = trt.DimsHW(stride, stride)
    layer.padding_nd = trt.DimsHW(half_kernel, half_kernel)
    layer.num_groups = group
    return layer
|
|
|
|
|
|
|
|
|
|
|
|
def Conv(network: trt.INetworkDefinition, weights: OrderedDict,
         input: trt.ITensor, out_channel: int, ksize: int, stride: int,
         group: int, layer_name: str) -> trt.ILayer:
    """Add a convolution followed by ``x * sigmoid(x)`` to ``network``.

    Kernel and bias are read from ``weights`` under the keys
    ``layer_name + '.conv.weight'`` and ``layer_name + '.conv.bias'``.
    Padding is ``ksize // 2``, reduced by one when ``ksize`` exceeds 3.

    Args:
        network: Network definition the layers are added to.
        weights: Mapping from parameter name to NumPy array.
        input: Tensor fed to the convolution.
        out_channel: Number of output feature maps.
        ksize: Side length of the square kernel.
        stride: Stride applied in both spatial directions.
        group: Number of convolution groups.
        layer_name: Prefix of the parameter keys in ``weights``.

    Returns:
        The element-wise product layer, i.e. the activated output.
    """
    pad = ksize // 2 if ksize <= 3 else ksize // 2 - 1
    kernel = trtweight(weights[layer_name + '.conv.weight'])
    bias = trtweight(weights[layer_name + '.conv.bias'])

    conv_layer = network.add_convolution_nd(
        input,
        num_output_maps=out_channel,
        kernel_shape=trt.DimsHW(ksize, ksize),
        kernel=kernel,
        bias=bias,
    )
    assert conv_layer, 'Add convolution_nd layer failed'
    conv_layer.stride_nd = trt.DimsHW(stride, stride)
    conv_layer.padding_nd = trt.DimsHW(pad, pad)
    conv_layer.num_groups = group

    # The activation is built from two layers: sigmoid(x), then x * sigmoid(x).
    pre_activation = conv_layer.get_output(0)
    gate = network.add_activation(pre_activation, trt.ActivationType.SIGMOID)
    assert gate, 'Add activation layer failed'

    activated = network.add_elementwise(pre_activation, gate.get_output(0),
                                        trt.ElementWiseOperation.PROD)
    assert activated, 'Add elementwise layer failed'
    return activated
|
|
|
|
|
|
|
|
|
|
|
|
def Bottleneck(network: trt.INetworkDefinition, weights: OrderedDict,
               input: trt.ITensor, c1: int, c2: int, shortcut: bool,
               group: int, scale: float, layer_name: str) -> trt.ILayer:
    """Add two stacked 3x3 ``Conv`` blocks with an optional residual sum.

    Args:
        network: Network definition the layers are added to.
        weights: Mapping from parameter name to NumPy array.
        input: Tensor entering the block.
        c1: Channel count of ``input``; only compared against ``c2``.
        c2: Output channel count of the second convolution.
        shortcut: Request a residual connection; it is only added when
            ``c1 == c2`` as well.
        group: Group count of the second convolution (the first uses 1).
        scale: Ratio giving the hidden width ``int(c2 * scale)``.
        layer_name: Prefix of the ``.cv1`` / ``.cv2`` parameter keys.

    Returns:
        The element-wise sum layer when the residual is added, otherwise the
        layer returned by the second ``Conv``.
    """
    hidden = int(c2 * scale)
    first = Conv(network, weights, input, hidden, 3, 1, 1,
                 layer_name + '.cv1')
    second = Conv(network, weights, first.get_output(0), c2, 3, 1, group,
                  layer_name + '.cv2')

    if not (shortcut and c1 == c2):
        return second

    residual = network.add_elementwise(input,
                                       second.get_output(0),
                                       op=trt.ElementWiseOperation.SUM)
    assert residual, 'Add elementwise layer failed'
    return residual
|
|
|
|
|
|
|
|
|
|
|
|
def C2f(network: trt.INetworkDefinition, weights: OrderedDict,
        input: trt.ITensor, cout: int, n: int, shortcut: bool, group: int,
        scale: float, layer_name: str) -> trt.ILayer:
    """Add a C2f block: split, ``n`` chained bottlenecks, concat, 1x1 conv.

    ``cv1`` produces ``2 * c_`` channels, with ``c_ = int(cout * scale)``.
    A slice of ``c_`` channels starting at channel offset ``c_`` feeds the
    bottleneck chain. The full ``cv1`` output and every bottleneck output are
    then concatenated and passed through ``cv2``.

    Args:
        network: Network definition the layers are added to.
        weights: Mapping from parameter name to NumPy array.
        input: Tensor entering the block.
        cout: Output channel count of the final convolution.
        n: Number of bottlenecks in the chain.
        shortcut: Forwarded to every ``Bottleneck``.
        group: Forwarded to every ``Bottleneck``.
        scale: Expansion ratio used to derive the hidden width.
        layer_name: Prefix of the parameter keys in ``weights``.

    Returns:
        The layer returned by the final ``Conv``.
    """
    hidden = int(cout * scale)  # hidden width derived from the expand ratio
    stem = Conv(network, weights, input, 2 * hidden, 1, 1, 1,
                layer_name + '.cv1')
    stem_out = stem.get_output(0)

    # Take ``hidden`` channels starting at channel index ``hidden``.
    batch, _, height, width = stem_out.shape
    tail = network.add_slice(stem_out, (0, hidden, 0, 0),
                             (batch, hidden, height, width), (1, 1, 1, 1))
    assert tail, 'Add slice layer failed'

    # Each bottleneck consumes the previous one's output; all are collected.
    branches = [stem_out]
    current = tail.get_output(0)
    for index in range(n):
        block = Bottleneck(network, weights, current, hidden, hidden,
                           shortcut, group, 1.0,
                           layer_name + '.m.' + str(index))
        current = block.get_output(0)
        branches.append(current)

    merged = network.add_concatenation(branches)
    assert merged, 'Add concatenation layer failed'

    return Conv(network, weights, merged.get_output(0), cout, 1, 1, 1,
                layer_name + '.cv2')
|
|
|
|
|
|
|
|
|
|
|
|
def SPPF(network: trt.INetworkDefinition, weights: OrderedDict,
         input: trt.ITensor, c1: int, c2: int, ksize: int,
         layer_name: str) -> trt.ILayer:
    """Add an SPPF block: 1x1 conv, three chained max-pools, concat, 1x1 conv.

    Each pooling layer uses a ``ksize`` x ``ksize`` window, stride 1 and
    padding ``ksize // 2``, and takes the previous stage's output. The
    reduced tensor and the three pooled tensors are concatenated before the
    final convolution.

    Args:
        network: Network definition the layers are added to.
        weights: Mapping from parameter name to NumPy array.
        input: Tensor entering the block.
        c1: Input channel count; the first convolution outputs ``c1 // 2``.
        c2: Output channel count of the final convolution.
        ksize: Side length of the square pooling window.
        layer_name: Prefix of the ``.cv1`` / ``.cv2`` parameter keys.

    Returns:
        The layer returned by the final ``Conv``.
    """
    hidden = c1 // 2
    reduce = Conv(network, weights, input, hidden, 1, 1, 1,
                  layer_name + '.cv1')

    # stages[0] is the reduced tensor; every pool reads the latest stage.
    stages = [reduce.get_output(0)]
    for _ in range(3):
        pool = network.add_pooling_nd(stages[-1], trt.PoolingType.MAX,
                                      trt.DimsHW(ksize, ksize))
        assert pool, 'Add pooling_nd layer failed'
        pool.padding_nd = trt.DimsHW(ksize // 2, ksize // 2)
        pool.stride_nd = trt.DimsHW(1, 1)
        stages.append(pool.get_output(0))

    merged = network.add_concatenation(stages)
    assert merged, 'Add concatenation layer failed'

    return Conv(network, weights, merged.get_output(0), c2, 1, 1, 1,
                layer_name + '.cv2')
|
|
|
|
|
|
|
|
|
|
|
|
def _make_anchor_points(h: int, w: int, dtype) -> np.ndarray:
    """Return the cell-centre points of an ``h`` x ``w`` feature map.

    The result has shape ``(h * w, 2)``; row ``y * w + x`` holds
    ``(x + 0.5, y + 0.5)``. This row-major order matches the order in which
    the head outputs are flattened over their two spatial axes.
    """
    xs = np.arange(0, w).astype(dtype) + 0.5
    ys = np.arange(0, h).astype(dtype) + 0.5
    # Default 'xy' indexing yields (h, w) grids: grid_x varies along columns
    # and grid_y along rows, so this stays correct when h != w.
    grid_x, grid_y = np.meshgrid(xs, ys)
    points = np.stack((grid_x, grid_y), -1).reshape(-1, 2)
    return np.ascontiguousarray(points)


def Detect(
    network: trt.INetworkDefinition,
    weights: OrderedDict,
    input: Union[List, Tuple],
    s: Union[List, Tuple],
    layer_name: str,
    reg_max: int = 16,
    fp16: bool = True,
    iou: float = 0.65,
    conf: float = 0.25,
    topk: int = 100,
) -> trt.ILayer:
    """Add the detection head, box decoding and EfficientNMS to ``network``.

    For every ``(tensor, stride)`` pair from ``input`` and ``s`` a box branch
    (``cv2.{i}``) and a score branch (``cv3.{i}``) are built. Box logits go
    through a softmax over the ``reg_max`` bins and a weighted sum with the
    bin indices, giving four distances per anchor. These are turned into
    ``x1y1`` / ``x2y2`` corners around the anchor points and multiplied by
    the per-anchor stride. Class logits go through a sigmoid. Both results
    are fed to the ``EfficientNMS_TRT`` plugin.

    Args:
        network: Network definition the layers are added to.
        weights: Mapping from parameter name to NumPy array.
        input: Feature-map tensors, one per detection level.
        s: Stride values, paired positionally with ``input``.
        layer_name: Prefix of the head's parameter keys in ``weights``.
        reg_max: Number of distribution bins per box side; every box branch
            must output ``4 * reg_max`` channels.
        fp16: Build the anchor, stride and bin constants as ``float16``
            instead of ``float32``.
        iou: Value of the plugin's ``iou_threshold`` field.
        conf: Value of the plugin's ``score_threshold`` field.
        topk: Value of the plugin's ``max_output_boxes`` field.

    Returns:
        The plugin layer; its four outputs are named ``num_dets``,
        ``bboxes``, ``scores`` and ``labels``.
    """
    dtype = np.float16 if fp16 else np.float32
    # The class count is read from level 0 and reused for every level.
    nc = weights[f'{layer_name}.cv3.0.2.weight'].shape[0]

    bboxes_branch = []
    scores_branch = []
    anchor_chunks = []
    stride_chunks = []
    for i, (inp, stride) in enumerate(zip(input, s)):
        h, w = inp.shape[2:]
        anchor_chunks.append(_make_anchor_points(h, w, dtype))
        stride_chunks.append(np.full((1, h * w), stride, dtype=dtype))

        c2 = weights[f'{layer_name}.cv2.{i}.0.conv.weight'].shape[0]
        c3 = weights[f'{layer_name}.cv3.{i}.0.conv.weight'].shape[0]
        reg_max_x4 = weights[layer_name + f'.cv2.{i}.2.weight'].shape[0]
        assert reg_max_x4 == reg_max * 4

        # Box branch: Conv -> Conv -> Conv2d, then (B, 4*reg_max, H, W) is
        # reshaped to (B, 4, reg_max, H*W) and permuted to
        # (B, H*W, 4, reg_max).
        b_conv_0 = Conv(network, weights, inp, c2, 3, 1, 1,
                        layer_name + f'.cv2.{i}.0')
        b_conv_1 = Conv(network, weights, b_conv_0.get_output(0), c2, 3, 1, 1,
                        layer_name + f'.cv2.{i}.1')
        b_conv_2 = Conv2d(network, weights, b_conv_1.get_output(0),
                          reg_max_x4, 1, 1, 1, layer_name + f'.cv2.{i}.2')

        b_out = b_conv_2.get_output(0)
        b_shape = network.add_constant(
            [4],
            np.array(b_out.shape[0:1] + (4, reg_max, -1), dtype=np.int32))
        assert b_shape, 'Add constant layer failed'
        b_shuffle = network.add_shuffle(b_out)
        assert b_shuffle, 'Add shuffle layer failed'
        b_shuffle.set_input(1, b_shape.get_output(0))
        b_shuffle.second_transpose = (0, 3, 1, 2)
        bboxes_branch.append(b_shuffle.get_output(0))

        # Score branch: Conv -> Conv -> Conv2d, then (B, nc, H, W) is
        # reshaped to (B, nc, H*W) and permuted to (B, H*W, nc).
        s_conv_0 = Conv(network, weights, inp, c3, 3, 1, 1,
                        layer_name + f'.cv3.{i}.0')
        s_conv_1 = Conv(network, weights, s_conv_0.get_output(0), c3, 3, 1, 1,
                        layer_name + f'.cv3.{i}.1')
        s_conv_2 = Conv2d(network, weights, s_conv_1.get_output(0), nc, 1, 1,
                          1, layer_name + f'.cv3.{i}.2')

        s_out = s_conv_2.get_output(0)
        s_shape = network.add_constant(
            [3], np.array(s_out.shape[0:2] + (-1, ), dtype=np.int32))
        assert s_shape, 'Add constant layer failed'
        s_shuffle = network.add_shuffle(s_out)
        assert s_shuffle, 'Add shuffle layer failed'
        s_shuffle.set_input(1, s_shape.get_output(0))
        s_shuffle.second_transpose = (0, 2, 1)
        scores_branch.append(s_shuffle.get_output(0))

    # Merge all levels along the anchor axis (axis 1 for both tensors).
    cat_bboxes = network.add_concatenation(bboxes_branch)
    assert cat_bboxes, 'Add concatenation layer failed'
    # For 4-D inputs this equals TensorRT's default axis (ndim - 3); it is
    # set explicitly so the box path does not rely on the implicit default.
    cat_bboxes.axis = 1
    cat_scores = network.add_concatenation(scores_branch)
    assert cat_scores, 'Add concatenation layer failed'
    cat_scores.axis = 1

    # ``axes`` is a bit mask; bit 3 selects the reg_max axis.
    softmax = network.add_softmax(cat_bboxes.get_output(0))
    assert softmax, 'Add softmax layer failed'
    softmax.axes = 1 << 3

    scores = network.add_activation(cat_scores.get_output(0),
                                    trt.ActivationType.SIGMOID)
    assert scores, 'Add activation layer failed'

    # Expected distance per side: softmax probabilities times [0..reg_max).
    bin_values = np.arange(0, reg_max).astype(dtype).reshape((1, 1, -1, 1))
    bins = network.add_constant(bin_values.shape, bin_values)
    assert bins, 'Add constant layer failed'
    matmul = network.add_matrix_multiply(softmax.get_output(0),
                                         trt.MatrixOperation.NONE,
                                         bins.get_output(0),
                                         trt.MatrixOperation.NONE)
    assert matmul, 'Add matrix_multiply layer failed'

    # Gather index 0 on the last axis; the result is unpacked as 3-D below.
    gather_index = network.add_constant([1], np.array([0], dtype=np.int32))
    pre_bboxes = network.add_gather(matmul.get_output(0),
                                    gather_index.get_output(0), 3)
    assert pre_bboxes, 'Add gather layer failed'
    pre_bboxes.num_elementwise_dims = 1

    pre_bboxes_tensor = pre_bboxes.get_output(0)
    b, c, _ = pre_bboxes_tensor.shape
    # First two values are subtracted from the anchor, last two are added.
    slice_x1y1 = network.add_slice(pre_bboxes_tensor, (0, 0, 0), (b, c, 2),
                                   (1, 1, 1))
    assert slice_x1y1, 'Add slice layer failed'
    slice_x2y2 = network.add_slice(pre_bboxes_tensor, (0, 0, 2), (b, c, 2),
                                   (1, 1, 1))
    assert slice_x2y2, 'Add slice layer failed'

    anchor_values = np.concatenate(anchor_chunks, 0)[np.newaxis]
    anchors = network.add_constant(anchor_values.shape, anchor_values)
    assert anchors, 'Add constant layer failed'
    stride_values = np.concatenate(stride_chunks, 1)[..., np.newaxis]
    strides = network.add_constant(stride_values.shape, stride_values)
    assert strides, 'Add constant layer failed'

    sub = network.add_elementwise(anchors.get_output(0),
                                  slice_x1y1.get_output(0),
                                  trt.ElementWiseOperation.SUB)
    assert sub, 'Add elementwise layer failed'
    add = network.add_elementwise(anchors.get_output(0),
                                  slice_x2y2.get_output(0),
                                  trt.ElementWiseOperation.SUM)
    assert add, 'Add elementwise layer failed'
    x1y1 = sub.get_output(0)
    x2y2 = add.get_output(0)

    cat_corners = network.add_concatenation([x1y1, x2y2])
    assert cat_corners, 'Add concatenation layer failed'
    cat_corners.axis = 2

    # Scale the corners by the stride of the level each anchor belongs to.
    bboxes = network.add_elementwise(cat_corners.get_output(0),
                                     strides.get_output(0),
                                     trt.ElementWiseOperation.PROD)
    assert bboxes, 'Add elementwise layer failed'

    plugin_creator = trt.get_plugin_registry().get_plugin_creator(
        'EfficientNMS_TRT', '1')
    assert plugin_creator, 'Plugin EfficientNMS_TRT is not registried'

    background_class = trt.PluginField('background_class',
                                       np.array(-1, np.int32),
                                       trt.PluginFieldType.INT32)
    box_coding = trt.PluginField('box_coding', np.array(0, np.int32),
                                 trt.PluginFieldType.INT32)
    iou_threshold = trt.PluginField('iou_threshold',
                                    np.array(iou, dtype=np.float32),
                                    trt.PluginFieldType.FLOAT32)
    max_output_boxes = trt.PluginField('max_output_boxes',
                                       np.array(topk, np.int32),
                                       trt.PluginFieldType.INT32)
    plugin_version = trt.PluginField('plugin_version', np.array('1'),
                                     trt.PluginFieldType.CHAR)
    # 0 here: the sigmoid has already been applied to the scores above.
    score_activation = trt.PluginField('score_activation',
                                       np.array(0, np.int32),
                                       trt.PluginFieldType.INT32)
    score_threshold = trt.PluginField('score_threshold',
                                      np.array(conf, dtype=np.float32),
                                      trt.PluginFieldType.FLOAT32)

    batched_nms_op = plugin_creator.create_plugin(
        name='batched_nms',
        field_collection=trt.PluginFieldCollection([
            background_class, box_coding, iou_threshold, max_output_boxes,
            plugin_version, score_activation, score_threshold
        ]))
    assert batched_nms_op is not None, 'Create plugin EfficientNMS_TRT failed'

    batched_nms = network.add_plugin_v2(
        inputs=[bboxes.get_output(0),
                scores.get_output(0)],
        plugin=batched_nms_op)
    assert batched_nms, 'Add plugin_v2 layer failed'

    batched_nms.get_output(0).name = 'num_dets'
    batched_nms.get_output(1).name = 'bboxes'
    batched_nms.get_output(2).name = 'scores'
    batched_nms.get_output(3).name = 'labels'

    return batched_nms
|