MaskRCNN:Example:ExcelReport
Exports Mask R-CNN detection results as an Excel report.
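Below is a minimal usage sketch of the inference API defined in inference_maskrcnn.py. The config name and image path are placeholders; create_config() accepts a registered name, a YAML file path, or raw YAML text.

# Hypothetical usage sketch -- adjust the config name and image path.
import cv2
from inference_maskrcnn import (create_config, create_predictor,
                                inference_maskrcnn, detection_objects_to_json)

config = create_config('your_mask_rcnn_R_101_FPN_1x_caffe2', device_type='cpu')
predictor = create_predictor(config, confidence_threshold=0.7, min_image_size=224)
image = cv2.imread('sample.jpg')  # placeholder input image
objects = inference_maskrcnn(predictor, image, source_name='sample.jpg')
print(detection_objects_to_json(objects))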
inference_maskrcnn.py
# -*- coding: utf-8 -*-
import os
import sys
import json
import copy
import cv2
import torch
import numpy as np
from math import fabs
from datetime import datetime
from optparse import OptionParser
from torchvision import transforms
from maskrcnn_benchmark.modeling.detector import build_detection_model
from maskrcnn_benchmark.utils.checkpoint import DetectronCheckpointer
from maskrcnn_benchmark.structures.image_list import to_image_list
from maskrcnn_benchmark.modeling.roi_heads.mask_head.inference import Masker
from maskrcnn_benchmark import layers
from maskrcnn_benchmark.utils import cv2_util
from maskrcnn_benchmark.structures.keypoint import PersonKeypoints
from maskrcnn_benchmark.config import cfg
SCRIPT_VERSION_MAJOR = 1
SCRIPT_VERSION_MINOR = 0
SCRIPT_VERSION_PATCH = 0
# COCO categories for pretty print
COCO_CATEGORIES = (
"__background",
"person", "bicycle", "car", "motorcycle", "airplane", "bus", "train", "truck",
"boat", "traffic light", "fire hydrant", "stop sign", "parking meter", "bench",
"bird", "cat", "dog", "horse", "sheep", "cow", "elephant", "bear", "zebra",
"giraffe", "backpack", "umbrella", "handbag", "tie", "suitcase", "frisbee",
"skis", "snowboard", "sports ball", "kite", "baseball bat", "baseball glove",
"skateboard", "surfboard", "tennis racket", "bottle", "wine glass", "cup", "fork",
"knife", "spoon", "bowl", "banana", "apple", "sandwich", "orange", "broccoli",
"carrot", "hot dog", "pizza", "donut", "cake", "chair", "couch", "potted plant",
"bed", "dining table", "toilet", "tv", "laptop", "mouse", "remote", "keyboard",
"cell phone", "microwave", "oven", "toaster", "sink", "refrigerator", "book",
"clock", "vase", "scissors", "teddy bear", "hair drier", "toothbrush",
)
COCO_PERSON_INDEX = COCO_CATEGORIES.index("person")
COCO_CAR_INDEX = COCO_CATEGORIES.index("car")
COCO_BUS_INDEX = COCO_CATEGORIES.index("bus")
COCO_TRUCK_INDEX = COCO_CATEGORIES.index("truck")
DEFAULT_CONFIG_NAME = 'your_mask_rcnn_R_101_FPN_1x_caffe2'
DEFAULT_CONFIDENCE_THRESHOLD = 0.7
DEFAULT_MIN_IMAGE_SIZE = 224
DEFAULT_DEVICE = 'cuda'
def create_default_option_parser():
parser = OptionParser(add_help_option=False)
parser.add_option('-h', '--help',
dest='help',
default=False,
action='store_true',
help='Print help message.')
parser.add_option('-c', '--config-name',
dest='config_name',
metavar='{name}',
default=DEFAULT_CONFIG_NAME,
help='Set the configuration. (registered name, file path, or raw YAML text)')
parser.add_option('-d', '--device-type',
dest='device_type',
metavar='{cpu/cuda}',
default=DEFAULT_DEVICE,
help='Device type. (cuda or cpu)')
parser.add_option('-i', '--input',
dest='input',
metavar='{string}',
default=str(),
help='Input source. (image file, image directory, or video path)')
parser.add_option('-o', '--output',
dest='output',
metavar='{string}',
default=str(),
help='Output directory.')
parser.add_option('--find-labels',
dest='find_labels',
metavar='{string}',
help='List of labels to find. (comma-separated)')
parser.add_option('--device-index',
dest='device_index',
metavar='{integer}',
type=int,
default=0,
help='Device index.')
parser.add_option('--min-image-size',
dest='min_image_size',
metavar='{integer}',
type=int,
default=DEFAULT_MIN_IMAGE_SIZE,
help='Minimum image size.')
parser.add_option('--confidence-threshold',
dest='confidence_threshold',
metavar='{float}',
type=float,
default=DEFAULT_CONFIDENCE_THRESHOLD,
help='Minimum score for the prediction to be shown.')
parser.add_option('--label-names',
dest='label_names',
metavar='{file}',
help='Path to a file of printable label names. (one per line)')
parser.add_option('--only-bbox',
dest='only_bbox',
default=False,
action='store_true',
help='Disable segmentation information.')
parser.add_option('--disable-preview',
dest='disable_preview',
default=False,
action='store_true',
help='Disable preview window.')
parser.add_option('--verbose',
dest='verbose',
default=False,
action='store_true',
help='Verbose message.')
return parser
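# Predictor wraps a maskrcnn_benchmark detection model: it loads the weights
# through DetectronCheckpointer, rebuilds the training-time image transform,
# and provides helpers that draw boxes, masks, key points and class names
# onto OpenCV images.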
class Predictor(object):
def __init__(self, config,
confidence_threshold=DEFAULT_CONFIDENCE_THRESHOLD,
show_mask_heatmaps=False,
masks_per_dim=2,
min_image_size=DEFAULT_MIN_IMAGE_SIZE,
label_names=COCO_CATEGORIES):
self.config = config.clone()
self.model = build_detection_model(config)
self.model.eval()
self.device = torch.device(config.MODEL.DEVICE)
self.model.to(self.device)
self.min_image_size = min_image_size
self.label_names = label_names
save_dir = config.OUTPUT_DIR
check_pointer = DetectronCheckpointer(config, self.model, save_dir=save_dir)
_ = check_pointer.load(config.MODEL.WEIGHT)
self.transforms = self.build_transform()
mask_threshold = -1 if show_mask_heatmaps else 0.5
self.masker = Masker(threshold=mask_threshold, padding=1)
# used to make colors for each class
self.palette = torch.tensor([2 ** 25 - 1, 2 ** 15 - 1, 2 ** 21 - 1])
self.cpu_device = torch.device("cpu")
self.confidence_threshold = confidence_threshold
self.show_mask_heatmaps = show_mask_heatmaps
self.masks_per_dim = masks_per_dim
@staticmethod
def overlay_key_points(image, predictions):
key_points = predictions.get_field("keypoints")
kps = key_points.keypoints
scores = key_points.get_field("logits")
kps = torch.cat((kps[:, :, 0:2], scores[:, :, None]), dim=2).numpy()
for region in kps:
image = vis_keypoints(image, region.transpose((1, 0)))
return image
# noinspection PyUnresolvedReferences
@staticmethod
def overlay_class_names(image, predictions, label_names=COCO_CATEGORIES):
"""
Adds detected class names and scores in the positions defined by the
top-left corner of the predicted bounding box
Arguments:
image (np.ndarray): an image as returned by OpenCV
predictions (BoxList): the result of the computation by the model.
It should contain the field `scores` and `labels`.
label_names (list): List of printable label text.
"""
scores = predictions.get_field("scores").tolist()
labels = predictions.get_field("labels").tolist()
labels = [label_names[i] for i in labels]
boxes = predictions.bbox
template = "{}: {:.2f}"
for box, score, label in zip(boxes, scores, labels):
x, y = box[:2]
s = template.format(label, score)
            cv2.putText(image, s, (int(x), int(y)), cv2.FONT_HERSHEY_SIMPLEX, .5, (255, 255, 255), 1)
return image
def build_transform(self):
"""
Creates a basic transformation that was used to train the models
"""
config = self.config
# we are loading images with OpenCV, so we don't need to convert them
# to BGR, they are already! So all we need to do is to normalize
# by 255 if we want to convert to BGR255 format, or flip the channels
# if we want it to be in RGB in [0-1] range.
if config.INPUT.TO_BGR255:
to_bgr_transform = transforms.Lambda(lambda x: x * 255)
else:
to_bgr_transform = transforms.Lambda(lambda x: x[[2, 1, 0]])
normalize_transform = transforms.Normalize(mean=config.INPUT.PIXEL_MEAN,
std=config.INPUT.PIXEL_STD)
return transforms.Compose([transforms.ToPILImage(),
transforms.Resize(self.min_image_size),
transforms.ToTensor(),
to_bgr_transform,
normalize_transform])
def run_on_opencv_image(self, image):
"""
Arguments:
image (np.ndarray): an image as returned by OpenCV
Returns:
prediction (BoxList): the detected objects. Additional information
of the detection properties can be found in the fields of
the BoxList via `prediction.fields()`
"""
predictions = self.compute_prediction(image)
top_predictions = self.select_top_predictions(predictions)
result = image.copy()
if self.show_mask_heatmaps:
return self.create_mask_montage(result, top_predictions)
result = self.overlay_boxes(result, top_predictions)
if self.config.MODEL.MASK_ON:
result = self.overlay_mask(result, top_predictions)
if self.config.MODEL.KEYPOINT_ON:
result = Predictor.overlay_key_points(result, top_predictions)
result = Predictor.overlay_class_names(result, top_predictions, self.label_names)
return result
# noinspection PyUnresolvedReferences, PyTypeChecker
def run_on_opencv_image_for_vehicle(self, image):
"""
Arguments:
image (np.ndarray): an image as returned by OpenCV
Returns:
prediction (BoxList): the detected objects. Additional information
of the detection properties can be found in the fields of
the BoxList via `prediction.fields()`
"""
predictions = self.compute_prediction(image)
top_predictions = self.select_top_predictions(predictions)
labels = top_predictions.get_field('labels')
keep0 = torch.nonzero(labels == COCO_PERSON_INDEX).squeeze(1)
keep1 = torch.nonzero(labels == COCO_CAR_INDEX).squeeze(1)
keep2 = torch.nonzero(labels == COCO_BUS_INDEX).squeeze(1)
keep3 = torch.nonzero(labels == COCO_TRUCK_INDEX).squeeze(1)
vehicle_count = len(keep1) + len(keep2) + len(keep3)
keep = torch.cat([keep0, keep1, keep2, keep3])
top_predictions = top_predictions[keep]
result = image.copy()
if self.show_mask_heatmaps:
return self.create_mask_montage(result, top_predictions)
result = self.overlay_boxes(result, top_predictions)
if self.config.MODEL.MASK_ON:
result = self.overlay_mask(result, top_predictions)
result = Predictor.overlay_class_names(result, top_predictions, self.label_names)
text = 'Vehicle: {}'.format(vehicle_count)
text_pos = (10, 50)
font_scale = 0.5
color = (0, 0, 200)
thickness = 2
cv2.putText(result, text, text_pos, cv2.FONT_HERSHEY_SIMPLEX, font_scale, color, thickness, cv2.LINE_AA)
return result
def compute_prediction(self, original_image):
"""
Arguments:
original_image (np.ndarray): an image as returned by OpenCV
Returns:
prediction (BoxList): the detected objects. Additional information
of the detection properties can be found in the fields of
the BoxList via `prediction.fields()`
"""
# apply pre-processing to image
image = self.transforms(original_image)
# convert to an ImageList, padded so that it is divisible by
# config.DATALOADER.SIZE_DIVISIBILITY
image_list = to_image_list(image, self.config.DATALOADER.SIZE_DIVISIBILITY)
image_list = image_list.to(self.device)
# compute predictions
with torch.no_grad():
predictions = self.model(image_list)
predictions = [o.to(self.cpu_device) for o in predictions]
# always single image is passed at a time
prediction = predictions[0]
# reshape prediction (a BoxList) into the original image size
height, width = original_image.shape[:-1]
prediction = prediction.resize((width, height))
if prediction.has_field("mask"):
# if we have masks, paste the masks in the right position
# in the image, as defined by the bounding boxes
masks = prediction.get_field("mask")
# always single image is passed at a time
masks = self.masker([masks], [prediction])[0]
prediction.add_field("mask", masks)
return prediction
def select_top_predictions(self, predictions):
"""
        Selects only the predictions with a `score` > self.confidence_threshold,
        and returns them in descending order of score
Arguments:
predictions (BoxList): the result of the computation by the model.
It should contain the field `scores`.
Returns:
prediction (BoxList): the detected objects. Additional information
of the detection properties can be found in the fields of
the BoxList via `prediction.fields()`
"""
scores = predictions.get_field("scores")
keep = torch.nonzero(scores > self.confidence_threshold).squeeze(1)
predictions = predictions[keep]
scores = predictions.get_field("scores")
_, idx = scores.sort(0, descending=True)
return predictions[idx]
def compute_colors_for_labels(self, labels):
"""
Simple function that adds fixed colors depending on the class
"""
colors = labels[:, None] * self.palette
colors = (colors % 255).numpy().astype("uint8")
return colors
# noinspection PyUnresolvedReferences
def overlay_boxes(self, image, predictions):
"""
Adds the predicted boxes on top of the image
Arguments:
image (np.ndarray): an image as returned by OpenCV
predictions (BoxList): the result of the computation by the model.
It should contain the field `labels`.
"""
labels = predictions.get_field("labels")
boxes = predictions.bbox
colors = self.compute_colors_for_labels(labels).tolist()
for box, color in zip(boxes, colors):
box = box.to(torch.int64)
top_left, bottom_right = box[:2].tolist(), box[2:].tolist()
image = cv2.rectangle(image, tuple(top_left), tuple(bottom_right), tuple(color), 1)
return image
# noinspection PyUnresolvedReferences
def overlay_mask(self, image, predictions):
"""
        Adds the instance contours for each predicted object.
Each label has a different color.
Arguments:
image (np.ndarray): an image as returned by OpenCV
predictions (BoxList): the result of the computation by the model.
It should contain the field `mask` and `labels`.
"""
masks = predictions.get_field("mask").numpy()
labels = predictions.get_field("labels")
colors = self.compute_colors_for_labels(labels).tolist()
for mask, color in zip(masks, colors):
thresh = mask[0, :, :, None]
contours, hierarchy = cv2_util.findContours(thresh, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
image = cv2.drawContours(image, contours, -1, color, 3)
        return image
# noinspection PyUnresolvedReferences
def create_mask_montage(self, image, predictions):
"""
        Create a montage showing the probability heatmaps for each one of the
detected objects
Arguments:
image (np.ndarray): an image as returned by OpenCV
predictions (BoxList): the result of the computation by the model.
It should contain the field `mask`.
"""
masks = predictions.get_field("mask")
masks_per_dim = self.masks_per_dim
masks = layers.interpolate(masks.float(), scale_factor=1 / masks_per_dim).byte()
height, width = masks.shape[-2:]
max_masks = masks_per_dim ** 2
masks = masks[:max_masks]
# handle case where we have less detections than max_masks
if len(masks) < max_masks:
masks_padded = torch.zeros(max_masks, 1, height, width, dtype=torch.uint8)
masks_padded[: len(masks)] = masks
masks = masks_padded
masks = masks.reshape(masks_per_dim, masks_per_dim, height, width)
result = torch.zeros((masks_per_dim * height, masks_per_dim * width), dtype=torch.uint8)
for y in range(masks_per_dim):
start_y = y * height
end_y = (y + 1) * height
for x in range(masks_per_dim):
start_x = x * width
end_x = (x + 1) * width
result[start_y:end_y, start_x:end_x] = masks[y, x]
image = cv2.applyColorMap(result.numpy(), cv2.COLORMAP_JET)
return image
# noinspection PyUnresolvedReferences, PyUnreachableCode
def vis_keypoints(img, kps, kp_thresh=2, alpha=0.7):
"""Visualizes keypoints (adapted from vis_one_image).
kps has shape (4, #keypoints) where 4 rows are (x, y, logit, prob).
"""
dataset_keypoints = PersonKeypoints.NAMES
kp_lines = PersonKeypoints.CONNECTIONS
# Convert from plt 0-1 RGBA colors to 0-255 BGR colors for opencv.
# cmap = plt.get_cmap('rainbow')
# colors = [cmap(i) for i in np.linspace(0, 1, len(kp_lines) + 2)]
# colors = [(c[2] * 255, c[1] * 255, c[0] * 255) for c in colors]
    colors = [(255, 255, 255)] * (len(kp_lines) + 2)
# raise BaseException('Conflict libraries: wxWidgets and matplotlib')
# Perform the drawing on a copy of the image, to allow for blending.
kp_mask = np.copy(img)
# Draw mid shoulder / mid hip first for better visualization.
mid_shoulder = (kps[:2, dataset_keypoints.index('right_shoulder')] +
kps[:2, dataset_keypoints.index('left_shoulder')]) / 2.0
sc_mid_shoulder = np.minimum(kps[2, dataset_keypoints.index('right_shoulder')],
kps[2, dataset_keypoints.index('left_shoulder')])
mid_hip = (kps[:2, dataset_keypoints.index('right_hip')] +
kps[:2, dataset_keypoints.index('left_hip')]) / 2.0
sc_mid_hip = np.minimum(kps[2, dataset_keypoints.index('right_hip')],
kps[2, dataset_keypoints.index('left_hip')])
nose_idx = dataset_keypoints.index('nose')
if sc_mid_shoulder > kp_thresh and kps[2, nose_idx] > kp_thresh:
        cv2.line(kp_mask, tuple(mid_shoulder.astype(np.int32)), tuple(kps[:2, nose_idx].astype(np.int32)),
                 color=colors[len(kp_lines)], thickness=2, lineType=cv2.LINE_AA)
if sc_mid_shoulder > kp_thresh and sc_mid_hip > kp_thresh:
        cv2.line(kp_mask, tuple(mid_shoulder.astype(np.int32)), tuple(mid_hip.astype(np.int32)),
                 color=colors[len(kp_lines) + 1], thickness=2, lineType=cv2.LINE_AA)
# Draw the key-points.
for l in range(len(kp_lines)):
i1 = kp_lines[l][0]
i2 = kp_lines[l][1]
        p1 = int(kps[0, i1]), int(kps[1, i1])
        p2 = int(kps[0, i2]), int(kps[1, i2])
if kps[2, i1] > kp_thresh and kps[2, i2] > kp_thresh:
cv2.line(kp_mask, p1, p2, color=colors[l], thickness=2, lineType=cv2.LINE_AA)
if kps[2, i1] > kp_thresh:
cv2.circle(kp_mask, p1, radius=3, color=colors[l], thickness=-1, lineType=cv2.LINE_AA)
if kps[2, i2] > kp_thresh:
cv2.circle(kp_mask, p2, radius=3, color=colors[l], thickness=-1, lineType=cv2.LINE_AA)
# Blend the keypoints.
return cv2.addWeighted(img, 1.0 - alpha, kp_mask, alpha, 0)
class DetectorObject:
"""
Integrated object information that can be obtained as a result of detection.
"""
name: str # class name.
source: str # source name.
score: float # threshold score (0.0 ~ 1.0).
bbox: list # (x, y, w, h) object bounding box (pixel)
size: list # (width, height) original image size (pixel).
points: list # polygon information (pixel).
def __init__(self, name=str(), source=str(), score=0.):
self.name = name
self.source = source
self.score = score
self.bbox = []
self.size = []
self.points = []
def __getitem__(self, item):
return self.points[item]
    def __iter__(self):
        return iter(self.points)
def __str__(self):
return self.name
def to_dict(self):
return {
'name': self.name,
'source': self.source,
'score': self.score,
'bbox': self.bbox,
'size': self.size,
'points': self.points,
}
def from_dict(self, obj):
self.name = str(obj['name'])
self.source = str(obj['source'])
self.score = float(obj['score'])
self.bbox = list(obj['bbox'])
self.size = list(obj['size'])
self.points = list(obj['points'])
def to_json(self):
return json.dumps(self.to_dict())
def from_json(self, json_text):
self.from_dict(json.loads(json_text))
def clone(self):
obj = DetectorObject()
obj.name = copy.deepcopy(self.name)
obj.source = copy.deepcopy(self.source)
obj.score = self.score
obj.bbox = copy.deepcopy(self.bbox)
obj.size = copy.deepcopy(self.size)
obj.points = copy.deepcopy(self.points)
return obj
@staticmethod
def create_from_dict(obj):
result = DetectorObject()
result.from_dict(obj)
return result
@staticmethod
def create_from_json(json_text):
result = DetectorObject()
result.from_json(json_text)
return result
def detection_objects_to_json(objects: list):
    return json.dumps([obj.to_dict() for obj in objects])
# noinspection PyGlobalUndefined
def create_config(config_name=DEFAULT_CONFIG_NAME, device_type='cpu', args=()):
global GLOBAL_DICT # Bind from c2deep.
try:
exists_global_dict = isinstance(GLOBAL_DICT, dict)
except NameError:
exists_global_dict = False
if exists_global_dict and config_name in GLOBAL_DICT:
print('Use global dictionary: {}'.format(config_name))
config_content = GLOBAL_DICT[config_name]
elif os.path.exists(config_name):
print('Use config file: {}'.format(config_name))
with open(config_name, 'r') as f:
config_content = f.read()
else:
print('Use config text.')
config_content = config_name
if config_content:
cfg.merge_from_other_cfg(cfg.load_cfg(config_content))
if device_type == 'cuda' or device_type == 'cpu':
cfg.merge_from_list(['MODEL.DEVICE', device_type])
cfg.merge_from_list(args)
cfg.freeze()
return cfg.clone()
def create_predictor(config,
confidence_threshold=DEFAULT_CONFIDENCE_THRESHOLD,
min_image_size=DEFAULT_MIN_IMAGE_SIZE,
label_names=COCO_CATEGORIES):
return Predictor(config,
confidence_threshold=confidence_threshold,
min_image_size=min_image_size,
label_names=label_names)
def invert_color(color):
return 255-color[0], 255-color[1], 255-color[2]
def read_label_names(file):
with open(file, 'r') as f:
return f.read().splitlines()
def get_image_names(search_dir: str):
image_files = []
for root, dirs, files in os.walk(search_dir):
for item in files:
lower_item = item.lower()
            if lower_item.endswith(('.jpeg', '.jpg', '.png')):
                image_files.append(item)
return image_files
# noinspection PyUnresolvedReferences
def find_contours(*args, **kwargs):
if cv2.__version__.startswith('4'):
contours, hierarchy = cv2.findContours(*args, **kwargs)
elif cv2.__version__.startswith('3'):
_, contours, hierarchy = cv2.findContours(*args, **kwargs)
else:
raise AssertionError('cv2 must be either version 3 or 4 to call this method')
return contours, hierarchy
def print_detection_objects(objects: list):
for obj in objects:
print('name: {}, score: {}'.format(obj.name, obj.score))
# noinspection PyUnresolvedReferences
def draw_detection_objects(image: np.ndarray, objects: list, color=None):
font_scale = 0.5
font = cv2.FONT_HERSHEY_SIMPLEX
thickness = 1
# font_scale = 0.4
# font = cv2.FONT_HERSHEY_PLAIN
# thickness = 1
for obj in objects:
x, y, w, h = obj.bbox
foreground = color
background = invert_color(foreground)
cv2.rectangle(image, (x, y), (x + w, y + h), background, thickness, cv2.LINE_AA)
text = '{} ({:.3f})'.format(obj.name, obj.score)
text_size = cv2.getTextSize(text, font, font_scale, thickness)
tw, th = text_size[0]
base_line = text_size[1]
cv2.rectangle(image, (x, y), (x+tw, y-th-base_line-(thickness*2)), background, cv2.FILLED)
cv2.putText(image, text, (x, y-base_line), font, font_scale, foreground, thickness, cv2.LINE_AA)
if obj.points:
draw_array = []
for points in obj.points:
draw_array.append(np.asarray(points, dtype=np.int32))
cv2.drawContours(image, draw_array, -1, background, 2, cv2.LINE_AA)
return image
# noinspection PyUnresolvedReferences
def draw_simple_detection_objects(image: np.ndarray, objects: list, color, thickness, padding):
for obj in objects:
x, y, w, h = obj.bbox
cv2.rectangle(image, (x-padding, y-padding), (x+w+padding, y+h+padding), color, thickness, cv2.LINE_AA)
# noinspection PyUnresolvedReferences
def inference_maskrcnn(predictor: Predictor,
image: np.ndarray,
source_name='',
find_labels=(),
threshold=0.0,
only_bbox=False):
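    """
    Runs the predictor on an OpenCV BGR image and converts the kept
    predictions into a list of DetectorObject instances. When only_bbox is
    False, the contour polygons of each instance mask are stored in `points`.
    """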
predictions = predictor.compute_prediction(image)
top_predictions = predictor.select_top_predictions(predictions)
bbox = top_predictions.bbox
labels = top_predictions.get_field('labels')
scores = top_predictions.get_field('scores')
image_size = top_predictions.size
if not only_bbox:
masks = top_predictions.get_field('mask').numpy()
else:
masks = None
result = []
for i in range(len(bbox)):
label_number = int(labels[i])
try:
            label_name = predictor.label_names[label_number]
except IndexError:
# print('Label name error - out of range: {}'.format(label_number))
label_name = str(label_number)
if find_labels and label_name not in find_labels:
continue
score = float(scores[i])
if threshold > 0.0 and score < threshold:
continue
obj = DetectorObject()
x1 = bbox[i][0]
y1 = bbox[i][1]
x2 = bbox[i][2]
y2 = bbox[i][3]
w = int(fabs(float(x2 - x1)))
h = int(fabs(float(y2 - y1)))
obj.name = label_name
obj.source = source_name
obj.score = score
obj.bbox = [int(x1), int(y1), w, h]
obj.size = [int(image_size[0]), int(image_size[1])]
if not only_bbox:
assert masks is not None
points_list, _ = find_contours(masks[i][0, :, :, None], cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
obj.points = []
for points in points_list:
obj.points.append(points.tolist())
result.append(obj)
return result
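# The *_locals_box functions below appear to be entry points invoked by the
# embedding c2deep host (see the GLOBAL_DICT binding in create_config); they
# exchange parameters through a plain dict and keep the predictor in a
# module-level global.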
# noinspection PyUnresolvedReferences
def init_locals_box(params: dict):
config_name = str(params['config_name'])
device_type = str(params['device_type'])
args = str(params['args'])
config = create_config(config_name,
device_type if device_type else 'cpu',
args.split(',') if args else ())
confidence_threshold = float(params['confidence_threshold'])
min_image_size = int(params['min_image_size'])
label_names = params['label_names']
global GLOBAL_PREDICTOR
GLOBAL_PREDICTOR = create_predictor(config,
confidence_threshold,
min_image_size,
label_names.split(',') if label_names else ())
return GLOBAL_PREDICTOR is not None
# noinspection PyUnresolvedReferences
def run_locals_box(params: dict):
box = params['frame']
frame = np.array(box, copy=False)
find_labels = params['find_labels']
    only_bbox = params['only_bbox'] != '0'
global GLOBAL_PREDICTOR
objects = inference_maskrcnn(GLOBAL_PREDICTOR,
frame,
'',
find_labels.split(',') if find_labels else (),
0.0,
only_bbox)
return detection_objects_to_json(objects)
def response_json(code: int, msg: str, data: dict):
return json.dumps({
'code': code,
'msg': msg,
'data': data if data else dict()
})
def response_ok_json(data=None):
return response_json(200, 'OK', data)
def response_bad_request_json(data=None):
return response_json(400, 'Bad request', data)
def response_not_found_json(data=None):
return response_json(404, 'Not found', data)
def response_internal_server_error_json(data=None):
return response_json(500, 'Internal Server Error', data)
def response_service_unavailable_json(data=None):
return response_json(503, 'Service unavailable', data)
def response_debug_bad_request_json(msg: str):
return response_bad_request_json({'debug': msg})
def response_debug_not_found_json(msg: str):
return response_not_found_json({'debug': msg})
def response_debug_internal_server_error_json(msg: str):
return response_internal_server_error_json({'debug': msg})
def response_debug_service_unavailable_json(msg: str):
return response_service_unavailable_json({'debug': msg})
# noinspection PyUnresolvedReferences
def run_inference_web():
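    """
    Starts a small Flask service exposing /version, /sources, /preview/<name>
    and /inference endpoints around a shared Predictor instance. Video
    sources given via --input are polled by a background thread.
    """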
try:
from flask import Flask, request, make_response
except ImportError as e:
print(e)
return
parser = create_default_option_parser()
options, args = parser.parse_args(sys.argv[1:])
if options.help:
parser.print_help()
return True
print('Options: {}'.format(options))
print('Arguments: {}'.format(args))
app_name = 'Inference'
image_width = 1920
image_height = 1080
image_channels = 3
max_content_length = (image_width*image_height*image_channels)+(1024*1024)
app_debug = False
http_host = "0.0.0.0"
http_port = 8080
app = Flask(app_name)
app.config['MAX_CONTENT_LENGTH'] = max_content_length
input_path = options.input
find_labels = options.find_labels
min_image_size = options.min_image_size
confidence_threshold = options.confidence_threshold
only_bbox = options.only_bbox
verbose = options.verbose
if options.label_names:
label_names = read_label_names(options.label_names)
else:
label_names = COCO_CATEGORIES
config = create_config(options.config_name, options.device_type, args)
predictor = create_predictor(config, confidence_threshold, min_image_size, label_names)
source_names = list()
previews = dict()
caps = dict()
# Initialize default source.
if input_path:
default_source = 'default'
source_names.append(default_source)
caps[default_source] = cv2.VideoCapture(input_path)
else:
print('Empty input path')
for name in source_names:
if not caps[name].isOpened():
print('Video open error: ' + name)
return False
code, _ = caps[name].read()
if not code:
print('Video read error: ' + name)
return False
import threading
import time
class ThreadDoneException(Exception):
pass
# noinspection PyUnresolvedReferences
def update_video_image(sleep_time=1.0):
print('Update thread - begin.')
try:
while True:
for n in source_names:
if not caps[n].isOpened():
raise ThreadDoneException()
c, _ = caps[n].read()
if not c:
raise ThreadDoneException()
if verbose:
print('Update thread - sleep {}sec'.format(sleep_time))
time.sleep(sleep_time)
except ThreadDoneException:
pass
print('Update thread - end.')
if source_names:
video_update_thread = threading.Thread(target=update_video_image, args=(0.001,))
video_update_thread.start()
@app.route('/')
def index_main():
return response_bad_request_json()
@app.route('/version')
def version_main():
return response_ok_json({'major': SCRIPT_VERSION_MAJOR,
'minor': SCRIPT_VERSION_MINOR,
'patch': SCRIPT_VERSION_PATCH})
@app.route('/heartbeat', methods=['GET'])
def heartbeat_main():
return response_ok_json()
@app.route('/sources', methods=['GET'])
def sources_main():
return response_ok_json({'names': source_names})
@app.route('/source/<name>', methods=['GET'])
def source_name_main(name):
if name not in source_names:
return response_bad_request_json()
return response_ok_json({'name': name,
'type': 'camera',
'status': 'active' if caps[name].isOpened() else 'inactive'})
@app.route('/preview/<name>', methods=['GET'])
def preview_name_main(name):
if name not in previews:
return response_not_found_json()
result, image_buffer = cv2.imencode('.png', previews[name])
if not result:
return response_internal_server_error_json()
response = make_response(image_buffer.tobytes())
response.headers['Content-Type'] = 'image/png'
return response
def inference_and_response_ok(frame, source_name, request_find_labels, request_threshold):
if request_find_labels:
            labels = request_find_labels.split(';')
else:
labels = request_find_labels
try:
threshold = float(request_threshold)
if threshold < 0.0:
threshold = 0.0
elif threshold > 1.0:
threshold = 1.0
        except (TypeError, ValueError):
threshold = confidence_threshold
begin_time = datetime.now()
objects = inference_maskrcnn(predictor, frame, source_name, labels, threshold, only_bbox)
delta = datetime.now() - begin_time
milliseconds = int(delta.total_seconds() * 1000)
print('Current inference time {}ms'.format(milliseconds))
if source_name:
previews[source_name] = frame.copy()
draw_detection_objects(previews[source_name], objects, (255, 0, 0))
objects_list = []
for obj in objects:
objects_list.append(obj.to_dict())
return response_ok_json({'result': objects_list})
@app.route('/inference', methods=['POST'])
def inference_main():
"""
Test:
curl -F 'image=@image.jpg' -X POST "http://0.0.0.0:8080/inference?labels=car;&threshold=0.98&name=test"
"""
assert request.method == 'POST'
# check if the post request has the file part
if 'image' not in request.files:
return response_bad_request_json()
image = cv2.imdecode(np.frombuffer(request.files['image'].read(), np.uint8), cv2.IMREAD_COLOR)
labels = request.args.get('labels')
threshold = request.args.get('threshold')
name = request.args.get('name')
return inference_and_response_ok(image, name, labels, threshold)
@app.route('/inference/<name>', methods=['GET'])
def inference_name_main(name):
assert request.method == 'GET'
if name not in source_names:
return response_debug_bad_request_json('Unknown source name: ' + name)
if not caps[name].isOpened():
return response_debug_service_unavailable_json('The source is closed')
code, frame = caps[name].read()
if not code:
return response_debug_internal_server_error_json('An error occurred while reading the frame')
labels = request.args.get('labels')
threshold = request.args.get('threshold')
return inference_and_response_ok(frame, name, labels, threshold)
# Start Flask Web Application.
app.run(debug=app_debug, host=http_host, port=http_port)
    for name in source_names:
        if caps[name].isOpened():
            caps[name].release()
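With the web service above running on its defaults (0.0.0.0:8080), a client can be sketched as follows; the image path and query parameters are placeholders, and the requests package is assumed to be available:

# Hypothetical client for the /inference endpoint defined above.
import requests

with open('image.jpg', 'rb') as f:  # placeholder image file
    response = requests.post('http://127.0.0.1:8080/inference',
                             params={'labels': 'car;', 'threshold': 0.98, 'name': 'test'},
                             files={'image': f})
print(response.json())  # e.g. {'code': 200, 'msg': 'OK', 'data': {'result': [...]}}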
maskrcnn_excel_report.py
# -*- coding: utf-8 -*-
import sys
import os
import cv2
import numpy as np
from datetime import datetime
SCRIPT_PATH = os.path.abspath(__file__)
SCRIPT_DIR = os.path.dirname(SCRIPT_PATH)
STORAGE_DIR = os.path.abspath(os.path.join(SCRIPT_DIR, os.pardir))
SOURCE_DIR = os.path.abspath(os.path.join(STORAGE_DIR, os.pardir))
PRESET_DIR = os.path.join(SOURCE_DIR, 'libc2deep', 'script', 'python', 'py')
CONFIG_DIR = os.path.join(SOURCE_DIR, 'libc2deep', 'res', 'static', 'caffe2')
LABELS_DIR = os.path.join(SOURCE_DIR, 'storage', 'label')
IMAGES_DIR = os.path.join(SOURCE_DIR, 'tester', 'asset', 'image')
TEMP_DIR = os.path.join(SOURCE_DIR, 'tester', 'asset', 'temp')
COLORS_PATH = os.path.join(SOURCE_DIR, 'storage', 'color', 'default.color')
sys.path.append(PRESET_DIR)
def clamp(x):
return max(0, min(x, 255))
# noinspection PyUnresolvedReferences
def fill_detection_ratio_objects(image: np.ndarray, objects: list, colors: dict):
for obj in objects:
assert obj.points
draw_array = []
for points in obj.points:
draw_array.append(np.asarray(points, dtype=np.int32))
cv2.drawContours(image, draw_array, -1, color=colors[obj.name], thickness=cv2.FILLED)
return image
# noinspection PyUnresolvedReferences
def write_excel_report(excel_path: str,
image: np.ndarray,
image_path: str,
preview_image_path: str,
objects: list,
labels: list,
colors: dict):
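    """
    Writes a single worksheet summarizing the ratio image: foreground and
    background pixel counts, a per-label pixel-count/ratio/object-count table
    with its fill color, a per-object bounding-box table, and the ratio image
    plus the annotated preview embedded as pictures.
    """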
    import openpyxl
    from openpyxl.styles import PatternFill
    from openpyxl.drawing.image import Image as XlsxImage
wb = openpyxl.Workbook()
ws = wb.active
height = image.shape[0]
width = image.shape[1]
total = height * width
background_mask = np.all(image == colors['__background'], axis=-1)
background_mask_count = np.count_nonzero(background_mask)
assert background_mask_count <= total
foreground_mask = np.any(image != colors['__background'], axis=-1)
foreground_mask_count = np.count_nonzero(foreground_mask)
assert foreground_mask_count <= total
assert foreground_mask_count + background_mask_count == total
ws['A1'] = 'Foreground pixel count'
ws['B1'] = foreground_mask_count
ws['A2'] = 'Background pixel count'
ws['B2'] = background_mask_count
ws['A4'] = 'Label'
ws['B4'] = 'Pixel count'
ws['C4'] = 'Ratio' # Percentage (Pixel/Foreground)
ws['D4'] = 'Count'
ws['E4'] = 'Color'
row_index = 5
for label in filter(lambda i: i != '__background', labels):
mask = np.all(image == colors[label], axis=-1)
mask_count = np.count_nonzero(mask)
assert mask_count <= total
r = colors[label][2]
g = colors[label][1]
b = colors[label][0]
hex_color = "{0:02x}{1:02x}{2:02x}".format(clamp(r), clamp(g), clamp(b))
ws['A' + str(row_index)] = label
ws['B' + str(row_index)] = mask_count
ws['C' + str(row_index)] = '=B{}/B1'.format(row_index)
ws['D' + str(row_index)] = sum(x.name == label for x in objects)
        ws['E' + str(row_index)].fill = PatternFill(start_color=hex_color,
                                                    end_color=hex_color,
                                                    fill_type="solid")
row_index += 1
ws['F4'] = 'Label'
ws['G4'] = 'Score'
ws['H4'] = 'x'
ws['I4'] = 'y'
ws['J4'] = 'width'
ws['K4'] = 'height'
row_index = 5
for obj in objects:
ws['F' + str(row_index)] = obj.name
ws['G' + str(row_index)] = obj.score
ws['H' + str(row_index)] = obj.bbox[0]
ws['I' + str(row_index)] = obj.bbox[1]
ws['J' + str(row_index)] = obj.bbox[2]
ws['K' + str(row_index)] = obj.bbox[3]
row_index += 1
    img1 = XlsxImage(image_path)
img1.anchor = 'L4'
img1.width = 300
img1.height = 300
ws.add_image(img1)
    img2 = XlsxImage(preview_image_path)
img2.anchor = 'L18'
img2.width = 300
img2.height = 300
ws.add_image(img2)
wb.save(excel_path)
# noinspection PyUnresolvedReferences
def run_maskrcnn_object_ratio():
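    """
    For every input image: runs Mask R-CNN, saves an annotated preview,
    renders a per-label ratio image from the instance masks, and writes an
    .xlsx report next to it via write_excel_report().
    """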
try:
from inference_maskrcnn import COCO_CATEGORIES, Predictor, \
create_default_option_parser, read_label_names, \
create_config, create_predictor, get_image_names, \
inference_maskrcnn, draw_detection_objects, \
print_detection_objects
except ImportError as e:
print(e)
return False
parser = create_default_option_parser()
options, args = parser.parse_args(sys.argv[1:])
if options.help:
parser.print_help()
return True
config_name = os.path.join(CONFIG_DIR, options.config_name + '.yaml')
input_path = options.input if options.input else IMAGES_DIR
output_path = options.output if options.output else TEMP_DIR
find_labels = options.find_labels.split(',') if options.find_labels else ()
min_image_size = options.min_image_size
confidence_threshold = options.confidence_threshold
    only_bbox = False  # the ratio report needs mask polygons, so --only-bbox is ignored here
verbose = options.verbose
    label_names = list()
    if options.label_names:
        if os.path.exists(options.label_names):
            label_names = read_label_names(options.label_names)
        if not label_names:
            fallback_path = os.path.join(LABELS_DIR, options.label_names + '.label')
            if os.path.exists(fallback_path):
                label_names = read_label_names(fallback_path)
    if not label_names:
        label_names = COCO_CATEGORIES
if verbose:
print('Config name: {}'.format(config_name))
print('Input path: {}'.format(input_path))
print('Output path: {}'.format(output_path))
print('Find labels: {}'.format(find_labels))
print('Min image size: {}'.format(min_image_size))
print('Confidence threshold: {}'.format(confidence_threshold))
print('Only bounding box: {}'.format(only_bbox))
print('Label names: {}'.format(label_names))
config = create_config(config_name, options.device_type, args)
predictor = create_predictor(config, confidence_threshold, min_image_size, label_names)
if os.path.isdir(input_path):
print('Read directory: {}'.format(input_path))
image_names = get_image_names(input_path)
    elif os.path.exists(input_path):
        print('Read file: {}'.format(input_path))
        image_names = [os.path.basename(input_path)]
        input_path = os.path.dirname(input_path)
else:
print('Unknown input source: {}'.format(input_path))
return False
if not os.path.isdir(output_path):
print('The output is not a directory: {}'.format(output_path))
return False
ratio_output = os.path.join(output_path, 'ratio')
if not os.path.isdir(ratio_output):
print('Make the ratio directory: {}'.format(ratio_output))
os.mkdir(ratio_output)
colors = list()
    with open(COLORS_PATH, 'r') as f:
        for color in f.read().strip().split('\n'):
            colors.append([int(x) for x in color.split(',')])
label_colors = dict()
label_colors_index = 0
for label in label_names:
label_colors[label] = colors[label_colors_index]
label_colors_index += 1
image_names.sort()
for i in range(len(image_names)):
name = image_names[i]
frame = cv2.imread(os.path.join(input_path, name))
begin_time = datetime.now()
objects = inference_maskrcnn(predictor, frame, input_path, find_labels,
confidence_threshold, only_bbox)
delta = datetime.now() - begin_time
milliseconds = int(delta.total_seconds() * 1000)
        print('Current frame ({}/{}) {} inference time {}ms'.format(i + 1, len(image_names), name, milliseconds))
preview = frame.copy()
draw_detection_objects(preview, objects, (255, 0, 0))
if verbose:
print_detection_objects(objects)
preview_image_path = os.path.join(output_path, name)
cv2.imwrite(preview_image_path, preview)
# Image object ratio calculation:
ratio_preview = np.zeros(shape=frame.shape, dtype=frame.dtype)
        objects = sorted(objects, key=lambda obj: obj.score)  # low scores first, so higher scores draw on top
fill_detection_ratio_objects(ratio_preview, objects, label_colors)
image_path = os.path.join(ratio_output, name)
cv2.imwrite(image_path, ratio_preview)
write_excel_report(os.path.join(ratio_output, name + '.xlsx'),
ratio_preview,
image_path,
preview_image_path,
objects,
label_names,
label_colors)
return True
if __name__ == "__main__":
run_maskrcnn_object_ratio()
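The 'Ratio' column is stored as an Excel formula (=B{row}/B1), so it is evaluated by Excel itself; reading the report back with openpyxl yields the formula string unless the ratio is recomputed. A small sketch, assuming a generated report file:

# Hypothetical reader for a generated report; the file name is a placeholder.
import openpyxl

wb = openpyxl.load_workbook('sample.jpg.xlsx')
ws = wb.active
foreground = ws['B1'].value  # foreground pixel count written by the report
row = 5
while ws['A' + str(row)].value is not None:
    label = ws['A' + str(row)].value
    pixels = ws['B' + str(row)].value
    print('{}: {:.2%}'.format(label, pixels / foreground if foreground else 0.0))
    row += 1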