You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

580 lines
23 KiB

# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import math
import random
import numpy as np
from copy import deepcopy
from typing import List, Tuple
from collections import defaultdict
from .chip_box_utils import nms, transform_chip_boxes2image_boxes
from .chip_box_utils import find_chips_to_cover_overlaped_boxes
from .chip_box_utils import transform_chip_box
from .chip_box_utils import intersection_over_box
class AnnoCropper(object):
    """Crop image-level annotation records into chip-level records.

    An image is resized to every configured target size (scale). At each scale
    a regular grid of chips is generated with ``chip_target_size`` and
    ``chip_target_stride`` (analogous to kernel size and stride of a
    convolution). Training records are split into positive chips (covering
    valid ground-truth boxes) and, optionally, negative chips (covering
    proposals that fall outside every positive chip). For inference every chip
    is emitted and the per-chip detections can later be merged back to
    image-level results with :meth:`aggregate_chips_detections`.
    """

    def __init__(self,
                 image_target_sizes: List[int],
                 valid_box_ratio_ranges: List[List[float]],
                 chip_target_size: int,
                 chip_target_stride: int,
                 use_neg_chip: bool = False,
                 max_neg_num_per_im: int = 8,
                 max_per_img: int = -1,
                 nms_thresh: float = 0.5):
        """
        Generate chips by chip_target_size and chip_target_stride.
        These two parameters just like kernel_size and stride in cnn.

        Each image has its raw size. After resizing, then get its target size.
        The resizing scale = target_size / raw_size.
        So are chips of the image.
        box_ratio = box_raw_size / image_raw_size = box_target_size / image_target_size
        The 'size' above mentioned is the size of long-side of image, box or chip.

        :param image_target_sizes: target long-side size per scale, e.g. [2000, 1000]
        :param valid_box_ratio_ranges: one [low, high] box-ratio range per scale,
            e.g. [[-1, 0.1],[0.08, -1]]; a non-positive bound means "unbounded"
        :param chip_target_size: chip side length in target size, e.g. 500
        :param chip_target_stride: chip stride in target size, e.g. 200
        :param use_neg_chip: whether negative chips are generated for training
        :param max_neg_num_per_im: upper bound of sampled negative chips per image
        :param max_per_img: keep at most this many detections per image after
            NMS; values <= 0 disable the limit
        :param nms_thresh: threshold passed to ``nms`` when merging chip detections
        """
        self.target_sizes = image_target_sizes
        self.valid_box_ratio_ranges = valid_box_ratio_ranges
        assert len(self.target_sizes) == len(self.valid_box_ratio_ranges)
        self.scale_num = len(self.target_sizes)
        self.chip_target_size = chip_target_size  # is target size
        self.chip_target_stride = chip_target_stride  # is target stride
        self.use_neg_chip = use_neg_chip
        self.max_neg_num_per_im = max_neg_num_per_im
        self.max_per_img = max_per_img
        self.nms_thresh = nms_thresh

    def crop_anno_records(self, records: List[dict]):
        """
        Transform image-level training records into chip-level records.

        The main logic:
        # foreach record(image):
        #   foreach scale:
        #     1 generate chips by chip size and stride for each scale
        #     2 get pos chips
        #       - validate boxes: current scale; h,w >= 1
        #       - find pos chips greedily by valid gt boxes in each scale
        #       - for every valid gt box, find its corresponding pos chips in each scale
        #     3 get neg chips
        #       - If given proposals, find neg boxes in them which are not in pos chips
        #       - If got neg boxes in last step, we find neg chips and assign neg boxes to neg chips such as 2.
        #   4 sample neg chips if too much each image
        #   transform this image-scale annotations to chips(pos chips&neg chips) annotations

        :param records, standard coco_record but with extra key `proposals`(Px4), which are predicted by stage1
                        model and maybe have neg boxes in them.
        :return: new_records, list of dict like
        {
            'im_file': 'fake_image1.jpg',
            'im_id': np.array([1]),  # new _global_chip_id as im_id
            'h': h,  # chip height
            'w': w,  # chip width
            'is_crowd': is_crowd,  # Nx1 -> Mx1
            'gt_class': gt_class,  # Nx1 -> Mx1
            'gt_bbox': gt_bbox,  # Nx4 -> Mx4, 4 represents [x1,y1,x2,y2]
            'gt_poly': gt_poly,  # [None]xN -> [None]xM
            'chip': [x1, y1, x2, y2]  # added
        }

        Attention:
        ------------------------------>x
        |
        |    (x1,y1)------
        |       |        |
        |       |        |
        |       |        |
        |       |        |
        |       |        |
        |       ----------
        |                 (x2,y2)
        |
        y

        If we use [x1, y1, x2, y2] to represent boxes or chips,
        (x1,y1) is the left-top point which is in the box,
        but (x2,y2) is the right-bottom point which is not in the box.
        So x1 in [0, w-1], x2 in [1, w], y1 in [0, h-1], y2 in [1,h].
        And you can use x2-x1 to get width, and you can use image[y1:y2, x1:x2] to get the box area.
        """
        self.chip_records = []
        self._global_chip_id = 1
        for r in records:
            # element: (chip, boxes_idx), chip is [x1, y1, x2, y2], boxes_idx is List[int]
            self._cur_im_pos_chips = []
            # element: (chip, neg_box_num)
            self._cur_im_neg_chips = []
            for scale_i in range(self.scale_num):
                self._get_current_scale_parameters(scale_i, r)

                # Cx4
                chips = self._create_chips(r['h'], r['w'], self._cur_scale)

                # dict: chipid->[box_id, ...]
                pos_chip2boxes_idx = self._get_valid_boxes_and_pos_chips(
                    r['gt_bbox'], chips)

                # dict: chipid->neg_box_num
                neg_chip2box_num = self._get_neg_boxes_and_chips(
                    chips,
                    list(pos_chip2boxes_idx.keys()), r.get('proposals', None))

                self._add_to_cur_im_chips(chips, pos_chip2boxes_idx,
                                          neg_chip2box_num)

            cur_image_records = self._trans_all_chips2annotations(r)
            self.chip_records.extend(cur_image_records)
        return self.chip_records

    def _add_to_cur_im_chips(self, chips, pos_chip2boxes_idx, neg_chip2box_num):
        """Accumulate the chips selected at one scale into the per-image lists.

        :param chips: Cx4 array of chips of the current scale
        :param pos_chip2boxes_idx: dict: chipid->[box_id, ...]
        :param neg_chip2box_num: None or dict: chipid->neg_box_num
        """
        for pos_chipid, boxes_idx in pos_chip2boxes_idx.items():
            chip = np.array(chips[pos_chipid])  # copy chips slice
            self._cur_im_pos_chips.append((chip, boxes_idx))

        if neg_chip2box_num is None:
            return

        for neg_chipid, neg_box_num in neg_chip2box_num.items():
            chip = np.array(chips[neg_chipid])
            self._cur_im_neg_chips.append((chip, neg_box_num))

    def _trans_all_chips2annotations(self, r):
        """Build the chip records (positive, then sampled negative) of one image.

        :param r: image record providing 'gt_bbox', 'im_file', 'is_crowd', 'gt_class'
        :return: list of chip record dicts
        """
        gt_bbox = r['gt_bbox']
        im_file = r['im_file']
        is_crowd = r['is_crowd']
        gt_class = r['gt_class']
        # gt_poly = r['gt_poly']   # [None]xN
        # remaining keys: im_id, h, w
        chip_records = self._trans_pos_chips2annotations(im_file, gt_bbox,
                                                         is_crowd, gt_class)

        if not self.use_neg_chip:
            return chip_records

        sampled_neg_chips = self._sample_neg_chips()
        neg_chip_records = self._trans_neg_chips2annotations(im_file,
                                                             sampled_neg_chips)
        chip_records.extend(neg_chip_records)
        return chip_records

    def _trans_pos_chips2annotations(self, im_file, gt_bbox, is_crowd,
                                     gt_class):
        """Create one record per positive chip of the current image.

        ``transform_chip_box`` (defined in chip_box_utils) supplies the boxes
        expressed for the chip together with the indices of the boxes that are
        kept; those indices select the matching ``is_crowd``/``gt_class`` rows.

        :return: list of chip record dicts; each consumes one global chip id
        """
        chip_records = []
        for chip, boxes_idx in self._cur_im_pos_chips:
            chip_bbox, final_boxes_idx = transform_chip_box(gt_bbox, boxes_idx,
                                                            chip)
            x1, y1, x2, y2 = chip
            chip_h = y2 - y1
            chip_w = x2 - x1
            rec = {
                'im_file': im_file,
                'im_id': np.array([self._global_chip_id]),
                'h': chip_h,
                'w': chip_w,
                'gt_bbox': chip_bbox,
                'is_crowd': is_crowd[final_boxes_idx].copy(),
                'gt_class': gt_class[final_boxes_idx].copy(),
                # 'gt_poly': [None] * len(final_boxes_idx),
                'chip': chip
            }
            self._global_chip_id += 1
            chip_records.append(rec)
        return chip_records

    def _sample_neg_chips(self):
        """Pick the negative chips of the current image that will be kept.

        At most ``min(pos_num + 2, max_neg_num_per_im)`` chips are returned.
        When there are more candidates, the 1.5x best chips (by negative box
        count) are shuffled and truncated, so the choice is random among the
        chips holding the most negative boxes.

        :return: list of (chip, neg_box_num)
        """
        pos_num = len(self._cur_im_pos_chips)
        neg_num = len(self._cur_im_neg_chips)
        sample_num = min(pos_num + 2, self.max_neg_num_per_im)
        assert sample_num >= 1
        if neg_num <= sample_num:
            return self._cur_im_neg_chips

        candidate_num = int(sample_num * 1.5)
        candidate_neg_chips = sorted(
            self._cur_im_neg_chips, key=lambda x: -x[1])[:candidate_num]
        random.shuffle(candidate_neg_chips)
        sampled_neg_chips = candidate_neg_chips[:sample_num]
        return sampled_neg_chips

    def _trans_neg_chips2annotations(self,
                                     im_file: str,
                                     sampled_neg_chips: List[Tuple]):
        """Create one record with empty annotations per sampled negative chip.

        :param im_file: image file the chips belong to
        :param sampled_neg_chips: list of (chip, neg_box_num)
        :return: list of chip record dicts; each consumes one global chip id
        """
        chip_records = []
        for chip, neg_box_num in sampled_neg_chips:
            x1, y1, x2, y2 = chip
            chip_h = y2 - y1
            chip_w = x2 - x1
            rec = {
                'im_file': im_file,
                'im_id': np.array([self._global_chip_id]),
                'h': chip_h,
                'w': chip_w,
                'gt_bbox': np.zeros(
                    (0, 4), dtype=np.float32),
                'is_crowd': np.zeros(
                    (0, 1), dtype=np.int32),
                'gt_class': np.zeros(
                    (0, 1), dtype=np.int32),
                # 'gt_poly': [],
                'chip': chip
            }
            self._global_chip_id += 1
            chip_records.append(rec)
        return chip_records

    def _get_current_scale_parameters(self, scale_i, r):
        """Cache size, target size, scale and valid ratio range for one scale.

        :param scale_i: index into ``target_sizes``/``valid_box_ratio_ranges``
        :param r: image record providing 'h' and 'w'
        """
        im_size = max(r['h'], r['w'])
        im_target_size = self.target_sizes[scale_i]
        self._cur_im_size, self._cur_im_target_size = im_size, im_target_size
        self._cur_scale = self._get_current_scale(im_target_size, im_size)
        self._cur_valid_ratio_range = self.valid_box_ratio_ranges[scale_i]

    def _get_current_scale(self, im_target_size, im_size):
        """Return the resizing scale, i.e. ``im_target_size / im_size``."""
        return im_target_size / im_size

    def _create_chips(self, h: int, w: int, scale: float):
        """
        Generate chips by chip_target_size and chip_target_stride.
        These two parameters just like kernel_size and stride in cnn.

        :param h: raw image height
        :param w: raw image width
        :param scale: target_size / raw_size of the current scale
        :return: chips, Cx4, xy in raw size dimension
        :raises ValueError: if a generated chip is more than 10% larger than
            ``chip_target_size`` (sanity check)
        """
        chip_size = self.chip_target_size  # omit target for simplicity
        stride = self.chip_target_stride
        width = int(scale * w)
        height = int(scale * h)
        min_chip_location_diff = 20  # in target size

        assert chip_size >= stride
        chip_overlap = chip_size - stride
        if (width - chip_overlap) % stride > min_chip_location_diff:
            # the remainder not divisible by stride is fairly large: keep it
            w_steps = max(1, int(math.ceil((width - chip_overlap) / stride)))
        else:
            # the remainder not divisible by stride is fairly small: drop it
            w_steps = max(1, int(math.floor((width - chip_overlap) / stride)))
        if (height - chip_overlap) % stride > min_chip_location_diff:
            h_steps = max(1, int(math.ceil((height - chip_overlap) / stride)))
        else:
            h_steps = max(1, int(math.floor((height - chip_overlap) / stride)))

        chips = list()
        for j in range(h_steps):
            for i in range(w_steps):
                x1 = i * stride
                y1 = j * stride
                # chips on the right/bottom border are clipped to the image
                x2 = min(x1 + chip_size, width)
                y2 = min(y1 + chip_size, height)
                chips.append([x1, y1, x2, y2])

        # check chip size
        for item in chips:
            if item[2] - item[0] > chip_size * 1.1 or item[3] - item[
                    1] > chip_size * 1.1:
                raise ValueError(item)
        # np.float was only an alias of the builtin float (float64) and has
        # been removed from NumPy; use the explicit dtype instead.
        chips = np.array(chips, dtype=np.float64)

        raw_size_chips = chips / scale
        return raw_size_chips

    def _get_valid_boxes_and_pos_chips(self, gt_bbox, chips):
        """Select the gt boxes valid at the current scale and their pos chips.

        :param gt_bbox: Nx4 gt boxes of the image
        :param chips: Cx4 chips of the current scale
        :return: pos_chip2boxes_idx, dict: chipid->[box_id, ...]
        """
        valid_ratio_range = self._cur_valid_ratio_range
        im_size = self._cur_im_size
        scale = self._cur_scale
        #   Nx4            N
        valid_boxes, valid_boxes_idx = self._validate_boxes(
            valid_ratio_range, im_size, gt_bbox, scale)
        # dict: chipid->[box_id, ...]
        pos_chip2boxes_idx = self._find_pos_chips(chips, valid_boxes,
                                                  valid_boxes_idx)
        return pos_chip2boxes_idx

    def _validate_boxes(self,
                        valid_ratio_range: List[float],
                        im_size: int,
                        gt_boxes: np.ndarray,
                        scale: float):
        """
        Keep the boxes whose long-side ratio lies in ``[low, high)`` and whose
        short side is at least 2 pixels after resizing by ``scale``.

        :param valid_ratio_range: [low, high]; a non-positive ``low`` means 0
            and a non-positive ``high`` means no upper bound
        :param im_size: long-side size of the raw image
        :param gt_boxes: Nx4 boxes as [x1, y1, x2, y2]
        :param scale: target_size / raw_size of the current scale
        :return: valid_boxes: Mx4, valid_boxes_idx: M (indices into gt_boxes)
        """
        ws = (gt_boxes[:, 2] - gt_boxes[:, 0]).astype(np.int32)
        hs = (gt_boxes[:, 3] - gt_boxes[:, 1]).astype(np.int32)
        maxs = np.maximum(ws, hs)
        box_ratio = maxs / im_size
        mins = np.minimum(ws, hs)
        target_mins = mins * scale

        low = valid_ratio_range[0] if valid_ratio_range[0] > 0 else 0
        # np.float has been removed from NumPy; float64 is what it aliased.
        high = valid_ratio_range[1] if valid_ratio_range[1] > 0 else np.finfo(
            np.float64).max

        valid_boxes_idx = np.nonzero((low <= box_ratio) & (box_ratio < high) & (
            target_mins >= 2))[0]
        valid_boxes = gt_boxes[valid_boxes_idx]
        return valid_boxes, valid_boxes_idx

    def _find_pos_chips(self,
                        chips: np.ndarray,
                        valid_boxes: np.ndarray,
                        valid_boxes_idx: np.ndarray):
        """
        Choose positive chips and assign the valid boxes to them.

        Chips are chosen with an intersection-over-box threshold of 1.0, then
        every box overlapping a chosen chip by at least 0.5 is assigned to it.

        :param chips: Cx4
        :param valid_boxes: Bx4
        :param valid_boxes_idx: B, indices of valid_boxes in the raw gt boxes
        :return: pos_chip2boxes_idx, dict: chipid->[box_id, ...]
        """
        iob = intersection_over_box(chips, valid_boxes)  # overlap, CxB

        iob_threshold_to_find_chips = 1.
        pos_chip_ids, _ = self._find_chips_to_cover_overlaped_boxes(
            iob, iob_threshold_to_find_chips)
        pos_chip_ids = set(pos_chip_ids)

        iob_threshold_to_assign_box = 0.5
        pos_chip2boxes_idx = self._assign_boxes_to_pos_chips(
            iob, iob_threshold_to_assign_box, pos_chip_ids, valid_boxes_idx)
        return pos_chip2boxes_idx

    def _find_chips_to_cover_overlaped_boxes(self, iob, overlap_threshold):
        """Delegate to ``find_chips_to_cover_overlaped_boxes`` of chip_box_utils."""
        return find_chips_to_cover_overlaped_boxes(iob, overlap_threshold)

    def _assign_boxes_to_pos_chips(self, iob, overlap_threshold, pos_chip_ids,
                                   valid_boxes_idx):
        """Map every positive chip to the raw gt indices of the boxes it holds.

        :param iob: CxB overlap matrix between chips and valid boxes
        :param overlap_threshold: minimal overlap for a box to belong to a chip
        :param pos_chip_ids: container of the chosen positive chip ids
        :param valid_boxes_idx: B, raw gt index of every valid box
        :return: defaultdict: chipid->[raw_gt_box_idx, ...]
        """
        chip_ids, box_ids = np.nonzero(iob >= overlap_threshold)
        pos_chip2boxes_idx = defaultdict(list)
        for chip_id, box_id in zip(chip_ids, box_ids):
            if chip_id not in pos_chip_ids:
                continue
            raw_gt_box_idx = valid_boxes_idx[box_id]
            pos_chip2boxes_idx[chip_id].append(raw_gt_box_idx)
        return pos_chip2boxes_idx

    def _get_neg_boxes_and_chips(self,
                                 chips: np.ndarray,
                                 pos_chip_ids: List[int],
                                 proposals: np.ndarray):
        """
        Find the negative chips of the current scale from the proposals.

        :param chips: Cx4
        :param pos_chip_ids: D, ids of the positive chips
        :param proposals: Px4 or None
        :return: neg_chip2box_num, None or dict: chipid->neg_box_num
        """
        if not self.use_neg_chip:
            return None

        # train proposals maybe None
        if proposals is None or len(proposals) < 1:
            return None

        valid_ratio_range = self._cur_valid_ratio_range
        im_size = self._cur_im_size
        scale = self._cur_scale

        valid_props, _ = self._validate_boxes(valid_ratio_range, im_size,
                                              proposals, scale)
        neg_boxes = self._find_neg_boxes(chips, pos_chip_ids, valid_props)
        neg_chip2box_num = self._find_neg_chips(chips, pos_chip_ids, neg_boxes)
        return neg_chip2box_num

    def _find_neg_boxes(self,
                        chips: np.ndarray,
                        pos_chip_ids: List[int],
                        valid_props: np.ndarray):
        """
        Keep the proposals that overlap every positive chip by less than 0.5.

        :param chips: Cx4
        :param pos_chip_ids: D, ids of the positive chips
        :param valid_props: Px4
        :return: neg_boxes: Nx4
        """
        if len(pos_chip_ids) == 0:
            return valid_props

        pos_chips = chips[pos_chip_ids]
        iob = intersection_over_box(pos_chips, valid_props)
        overlap_per_prop = np.max(iob, axis=0)
        non_overlap_props_idx = overlap_per_prop < 0.5
        neg_boxes = valid_props[non_overlap_props_idx]
        return neg_boxes

    def _find_neg_chips(self,
                        chips: np.ndarray,
                        pos_chip_ids: List[int],
                        neg_boxes: np.ndarray):
        """
        Choose negative chips among the chips that are not positive.

        :param chips: Cx4
        :param pos_chip_ids: D, ids of the positive chips
        :param neg_boxes: Nx4
        :return: neg_chip2box_num, dict: chipid->neg_box_num
        """
        neg_chip_ids = np.setdiff1d(np.arange(len(chips)), pos_chip_ids)
        neg_chips = chips[neg_chip_ids]

        iob = intersection_over_box(neg_chips, neg_boxes)
        iob_threshold_to_find_chips = 0.7
        chosen_neg_chip_ids, chip_id2overlap_box_num = \
            self._find_chips_to_cover_overlaped_boxes(iob, iob_threshold_to_find_chips)

        neg_chipid2box_num = {}
        for cid in chosen_neg_chip_ids:
            box_num = chip_id2overlap_box_num[cid]
            # cid indexes neg_chips; map it back to the id in `chips`
            raw_chip_id = neg_chip_ids[cid]
            neg_chipid2box_num[raw_chip_id] = box_num
        return neg_chipid2box_num

    def crop_infer_anno_records(self, records: List[dict]):
        """
        transform image record to chips record

        Also resets ``self.chip_records`` and the mapping from global chip id
        to original image id used by :meth:`aggregate_chips_detections`.

        :param records: list of image records providing 'h', 'w', 'im_file', 'im_id'
        :return: new_records, list of dict like
        {
            'im_file': 'fake_image1.jpg',
            'im_id': 1,  # new _global_chip_id as im_id
            'h': h,  # chip height
            'w': w,  # chip width
            'chip': [x1, y1, x2, y2]  # added
            'ori_im_h': ori_im_h  # added, origin image height
            'ori_im_w': ori_im_w  # added, origin image width
            'scale_i': 0  # added,
        }
        """
        self.chip_records = []
        self._global_chip_id = 1  # im_id start from 1
        self._global_chip_id2img_id = {}

        for r in records:
            for scale_i in range(self.scale_num):
                self._get_current_scale_parameters(scale_i, r)
                # Cx4
                chips = self._create_chips(r['h'], r['w'], self._cur_scale)
                cur_img_chip_record = self._get_chips_records(r, chips, scale_i)
                self.chip_records.extend(cur_img_chip_record)

        return self.chip_records

    def _get_chips_records(self, rec, chips, scale_i):
        """Create the inference records of all chips of one image at one scale.

        :param rec: image record providing 'h', 'w', 'im_file', 'im_id'
        :param chips: Cx4 chips in raw size dimension
        :param scale_i: index of the scale the chips were generated for
        :return: list of chip record dicts; each consumes one global chip id
        """
        cur_img_chip_records = []
        ori_im_h = rec["h"]
        ori_im_w = rec["w"]
        im_file = rec["im_file"]
        # .item() converts scalars and one-element arrays alike; a bare int()
        # on a one-element array is deprecated by NumPy.
        ori_im_id = int(np.asarray(rec["im_id"]).item())
        for chip in chips:
            chip_rec = {}
            x1, y1, x2, y2 = chip
            chip_h = y2 - y1
            chip_w = x2 - x1
            chip_rec["im_file"] = im_file
            chip_rec["im_id"] = self._global_chip_id
            chip_rec["h"] = chip_h
            chip_rec["w"] = chip_w
            chip_rec["chip"] = chip
            chip_rec["ori_im_h"] = ori_im_h
            chip_rec["ori_im_w"] = ori_im_w
            chip_rec["scale_i"] = scale_i

            self._global_chip_id2img_id[self._global_chip_id] = ori_im_id
            self._global_chip_id += 1
            cur_img_chip_records.append(chip_rec)

        return cur_img_chip_records

    def aggregate_chips_detections(self, results, records=None):
        """
        # 1. transform chip dets to image dets
        # 2. nms boxes per image;
        # 3. format output results

        :param results: list of per-batch dicts with keys 'bbox', 'bbox_num'
            and 'im_id' (chip ids); the input is deep-copied, not modified
        :param records: chip records to look chips up in; defaults to the
            records produced by the last ``crop_infer_anno_records`` call
        :return: list of per-image dicts with keys 'im_id', 'bbox', 'bbox_num'
        """
        results = deepcopy(results)
        records = records if records else self.chip_records
        img_id2bbox = self._transform_chip2image_bboxes(results, records)
        nms_img_id2bbox = self._nms_dets(img_id2bbox)
        aggregate_results = self._reformat_results(nms_img_id2bbox)
        return aggregate_results

    def _transform_chip2image_bboxes(self, results, records):
        """Group chip detections by original image, in image coordinates.

        :param results: list of per-batch dicts with 'bbox', 'bbox_num', 'im_id'
        :param records: chip records, indexed by ``chip_id - 1``
        :return: defaultdict: ori_img_id -> list of detection arrays
        """
        # 1. Transform chip dets to image dets;
        # 2. Filter valid range;
        # 3. Reformat and Aggregate chip dets to Get scale_cls_dets
        img_id2bbox = defaultdict(list)
        for result in results:
            bbox_locs = result['bbox']
            bbox_nums = result['bbox_num']
            if len(bbox_locs) == 1 and bbox_locs[0][
                    0] == -1:  # current batch has no detections
                # bbox_locs = array([[-1.]], dtype=float32); bbox_nums = [[1]]
                # MultiClassNMS output: If there is no detected boxes for all images, lod will be set to {1} and Out only contains one value which is -1.
                continue
            im_ids = result['im_id']  # replace with range(len(bbox_nums))

            last_bbox_num = 0
            for idx, im_id in enumerate(im_ids):

                cur_bbox_len = bbox_nums[idx]
                bboxes = bbox_locs[last_bbox_num:last_bbox_num + cur_bbox_len]
                last_bbox_num += cur_bbox_len
                # box: [num_id, score, xmin, ymin, xmax, ymax]
                if len(bboxes) == 0:  # current image has no detections
                    continue

                # im_id starts from 1 and may be a numpy scalar or a
                # one-element array; .item() handles both without warnings.
                chip_id = int(np.asarray(im_id).item())
                chip_rec = records[chip_id - 1]
                image_size = max(chip_rec["ori_im_h"], chip_rec["ori_im_w"])

                bboxes = transform_chip_boxes2image_boxes(
                    bboxes, chip_rec["chip"], chip_rec["ori_im_h"],
                    chip_rec["ori_im_w"])

                scale_i = chip_rec["scale_i"]
                cur_scale = self._get_current_scale(self.target_sizes[scale_i],
                                                    image_size)
                # only the coordinate columns take part in the validity check
                _, valid_boxes_idx = self._validate_boxes(
                    self.valid_box_ratio_ranges[scale_i], image_size,
                    bboxes[:, 2:], cur_scale)
                ori_img_id = self._global_chip_id2img_id[chip_id]

                img_id2bbox[ori_img_id].append(bboxes[valid_boxes_idx])

        return img_id2bbox

    def _nms_dets(self, img_id2bbox):
        """Merge the detections of every image with NMS, in place.

        :param img_id2bbox: dict: img_id -> list of np.array of shape [N, 6],
            6 is [label, score, x1, y1, x2, y2]
        :return: the same dict with every value replaced by the kept detections
        """
        # 1. NMS on each image-class
        # 2. Limit number of detections to MAX_PER_IMAGE if requested
        max_per_img = self.max_per_img
        nms_thresh = self.nms_thresh

        for img_id in img_id2bbox:
            # list of np.array of shape [N, 6], 6 is [label, score, x1, y1, x2, y2]
            box = img_id2bbox[img_id]
            box = np.concatenate(box, axis=0)
            nms_dets = nms(box, nms_thresh)
            if max_per_img > 0:
                if len(nms_dets) > max_per_img:
                    # keep the max_per_img highest scoring detections
                    keep = np.argsort(-nms_dets[:, 1])[:max_per_img]
                    nms_dets = nms_dets[keep]

            img_id2bbox[img_id] = nms_dets

        return img_id2bbox

    def _reformat_results(self, img_id2bbox):
        """reformat results

        :param img_id2bbox: dict: img_id -> np.array of shape [N, 6]
        :return: list of dict(im_id=np.array([[img_id]]), bbox=..., bbox_num=...)
            in the iteration order of ``img_id2bbox``
        """
        im_ids = img_id2bbox.keys()
        results = []
        for img_id in im_ids:  # output by original im_id order
            if len(img_id2bbox[img_id]) == 0:
                bbox = np.array(
                    [[-1., 0., 0., 0., 0., 0.]])  # edge case: no detections
                bbox_num = np.array([0])
            else:
                # np.array of shape [N, 6], 6 is [label, score, x1, y1, x2, y2]
                bbox = img_id2bbox[img_id]
                bbox_num = np.array([len(bbox)])
            res = dict(im_id=np.array([[img_id]]), bbox=bbox, bbox_num=bbox_num)
            results.append(res)
        return results