Skip to content

MaskRCNN:Example:ExcelReport

Mask R-CNN 결과 리포팅을 Excel파일로 출력한다.

inference_maskrcnn.py

# -*- coding: utf-8 -*-

import os
import sys
import json
import copy

import cv2
import torch
import numpy as np

from math import fabs
from datetime import datetime
from optparse import OptionParser
from torchvision import transforms

from maskrcnn_benchmark.modeling.detector import build_detection_model
from maskrcnn_benchmark.utils.checkpoint import DetectronCheckpointer
from maskrcnn_benchmark.structures.image_list import to_image_list
from maskrcnn_benchmark.modeling.roi_heads.mask_head.inference import Masker
from maskrcnn_benchmark import layers
from maskrcnn_benchmark.utils import cv2_util
from maskrcnn_benchmark.structures.keypoint import PersonKeypoints
from maskrcnn_benchmark.config import cfg


SCRIPT_VERSION_MAJOR = 1
SCRIPT_VERSION_MINOR = 0
SCRIPT_VERSION_PATCH = 0

# COCO categories for pretty print
COCO_CATEGORIES = (
    "__background",
    "person", "bicycle", "car", "motorcycle", "airplane", "bus", "train", "truck",
    "boat", "traffic light", "fire hydrant", "stop sign", "parking meter", "bench",
    "bird", "cat", "dog", "horse", "sheep", "cow", "elephant", "bear", "zebra",
    "giraffe", "backpack", "umbrella", "handbag", "tie", "suitcase", "frisbee",
    "skis", "snowboard", "sports ball", "kite", "baseball bat", "baseball glove",
    "skateboard", "surfboard", "tennis racket", "bottle", "wine glass", "cup", "fork",
    "knife", "spoon", "bowl", "banana", "apple", "sandwich", "orange", "broccoli",
    "carrot", "hot dog", "pizza", "donut", "cake", "chair", "couch", "potted plant",
    "bed", "dining table", "toilet", "tv", "laptop", "mouse", "remote", "keyboard",
    "cell phone", "microwave", "oven", "toaster", "sink", "refrigerator", "book",
    "clock", "vase", "scissors", "teddy bear", "hair drier", "toothbrush",
)
COCO_PERSON_INDEX = COCO_CATEGORIES.index("person")
COCO_CAR_INDEX = COCO_CATEGORIES.index("car")
COCO_BUS_INDEX = COCO_CATEGORIES.index("bus")
COCO_TRUCK_INDEX = COCO_CATEGORIES.index("truck")

DEFAULT_CONFIG_NAME = 'your_mask_rcnn_R_101_FPN_1x_caffe2'
DEFAULT_CONFIDENCE_THRESHOLD = 0.7
DEFAULT_MIN_IMAGE_SIZE = 224
DEFAULT_DEVICE = 'cuda'


def create_default_option_parser():
    parser = OptionParser(add_help_option=False)
    parser.add_option('-h', '--help',
                      dest='help',
                      default=False,
                      action='store_true',
                      help='Print help message.')
    parser.add_option('-c', '--config-name',
                      dest='config_name',
                      metavar='{name}',
                      default=DEFAULT_CONFIG_NAME,
                      help='Set the configure. (name/file/yaml)')
    parser.add_option('-d', '--device-type',
                      dest='device_type',
                      metavar='{cpu/cuda}',
                      default=DEFAULT_DEVICE,
                      help='Device type. (cuda or cpu)')
    parser.add_option('-i', '--input',
                      dest='input',
                      metavar='{string}',
                      default=str(),
                      help='Input information.')
    parser.add_option('-o', '--output',
                      dest='output',
                      metavar='{string}',
                      default=str(),
                      help='Output information.')
    parser.add_option('--find-labels',
                      dest='find_labels',
                      metavar='{string}',
                      help='List of labels to find. (Comma separator)')
    parser.add_option('--device-index',
                      dest='device_index',
                      metavar='{integer}',
                      type=int,
                      default=0,
                      help='Device index.')
    parser.add_option('--min-image-size',
                      dest='min_image_size',
                      metavar='{integer}',
                      type=int,
                      default=DEFAULT_MIN_IMAGE_SIZE,
                      help='Device index.')
    parser.add_option('--confidence-threshold',
                      dest='confidence_threshold',
                      metavar='{float}',
                      type=float,
                      default=DEFAULT_CONFIDENCE_THRESHOLD,
                      help='Minimum score for the prediction to be shown.')
    parser.add_option('--label-names',
                      dest='label_names',
                      metavar='{file}',
                      help='List of printable labels file path. (LineFeed separator)')
    parser.add_option('--only-bbox',
                      dest='only_bbox',
                      default=False,
                      action='store_true',
                      help='Disable segmentation information.')
    parser.add_option('--disable-preview',
                      dest='disable_preview',
                      default=False,
                      action='store_true',
                      help='Disable preview window.')
    parser.add_option('--verbose',
                      dest='verbose',
                      default=False,
                      action='store_true',
                      help='Verbose message.')
    return parser


class Predictor(object):

    def __init__(self, config,
                 confidence_threshold=DEFAULT_CONFIDENCE_THRESHOLD,
                 show_mask_heatmaps=False,
                 masks_per_dim=2,
                 min_image_size=DEFAULT_MIN_IMAGE_SIZE,
                 label_names=COCO_CATEGORIES):

        self.config = config.clone()
        self.model = build_detection_model(config)
        self.model.eval()
        self.device = torch.device(config.MODEL.DEVICE)
        self.model.to(self.device)
        self.min_image_size = min_image_size
        self.label_names = label_names

        save_dir = config.OUTPUT_DIR
        check_pointer = DetectronCheckpointer(config, self.model, save_dir=save_dir)
        _ = check_pointer.load(config.MODEL.WEIGHT)

        self.transforms = self.build_transform()

        mask_threshold = -1 if show_mask_heatmaps else 0.5
        self.masker = Masker(threshold=mask_threshold, padding=1)

        # used to make colors for each class
        self.palette = torch.tensor([2 ** 25 - 1, 2 ** 15 - 1, 2 ** 21 - 1])

        self.cpu_device = torch.device("cpu")
        self.confidence_threshold = confidence_threshold
        self.show_mask_heatmaps = show_mask_heatmaps
        self.masks_per_dim = masks_per_dim

    @staticmethod
    def overlay_key_points(image, predictions):
        key_points = predictions.get_field("keypoints")
        kps = key_points.keypoints
        scores = key_points.get_field("logits")
        kps = torch.cat((kps[:, :, 0:2], scores[:, :, None]), dim=2).numpy()
        for region in kps:
            image = vis_keypoints(image, region.transpose((1, 0)))
        return image

    # noinspection PyUnresolvedReferences
    @staticmethod
    def overlay_class_names(image, predictions, label_names=COCO_CATEGORIES):
        """
        Adds detected class names and scores in the positions defined by the
        top-left corner of the predicted bounding box

        Arguments:
            image (np.ndarray): an image as returned by OpenCV
            predictions (BoxList): the result of the computation by the model.
                It should contain the field `scores` and `labels`.
            label_names (list): List of printable label text.
        """

        scores = predictions.get_field("scores").tolist()
        labels = predictions.get_field("labels").tolist()
        labels = [label_names[i] for i in labels]
        boxes = predictions.bbox

        template = "{}: {:.2f}"
        for box, score, label in zip(boxes, scores, labels):
            x, y = box[:2]
            s = template.format(label, score)
            cv2.putText(image, s, (x, y), cv2.FONT_HERSHEY_SIMPLEX, .5, (255, 255, 255), 1)
        return image

    def build_transform(self):
        """
        Creates a basic transformation that was used to train the models
        """
        config = self.config

        # we are loading images with OpenCV, so we don't need to convert them
        # to BGR, they are already! So all we need to do is to normalize
        # by 255 if we want to convert to BGR255 format, or flip the channels
        # if we want it to be in RGB in [0-1] range.
        if config.INPUT.TO_BGR255:
            to_bgr_transform = transforms.Lambda(lambda x: x * 255)
        else:
            to_bgr_transform = transforms.Lambda(lambda x: x[[2, 1, 0]])

        normalize_transform = transforms.Normalize(mean=config.INPUT.PIXEL_MEAN,
                                                   std=config.INPUT.PIXEL_STD)
        return transforms.Compose([transforms.ToPILImage(),
                                   transforms.Resize(self.min_image_size),
                                   transforms.ToTensor(),
                                   to_bgr_transform,
                                   normalize_transform])

    def run_on_opencv_image(self, image):
        """
        Arguments:
            image (np.ndarray): an image as returned by OpenCV

        Returns:
            prediction (BoxList): the detected objects. Additional information
                of the detection properties can be found in the fields of
                the BoxList via `prediction.fields()`
        """

        predictions = self.compute_prediction(image)
        top_predictions = self.select_top_predictions(predictions)

        result = image.copy()
        if self.show_mask_heatmaps:
            return self.create_mask_montage(result, top_predictions)
        result = self.overlay_boxes(result, top_predictions)
        if self.config.MODEL.MASK_ON:
            result = self.overlay_mask(result, top_predictions)
        if self.config.MODEL.KEYPOINT_ON:
            result = Predictor.overlay_key_points(result, top_predictions)
        result = Predictor.overlay_class_names(result, top_predictions, self.label_names)

        return result

    # noinspection PyUnresolvedReferences, PyTypeChecker
    def run_on_opencv_image_for_vehicle(self, image):
        """
        Arguments:
            image (np.ndarray): an image as returned by OpenCV

        Returns:
            prediction (BoxList): the detected objects. Additional information
                of the detection properties can be found in the fields of
                the BoxList via `prediction.fields()`
        """

        predictions = self.compute_prediction(image)
        top_predictions = self.select_top_predictions(predictions)

        labels = top_predictions.get_field('labels')
        keep0 = torch.nonzero(labels == COCO_PERSON_INDEX).squeeze(1)
        keep1 = torch.nonzero(labels == COCO_CAR_INDEX).squeeze(1)
        keep2 = torch.nonzero(labels == COCO_BUS_INDEX).squeeze(1)
        keep3 = torch.nonzero(labels == COCO_TRUCK_INDEX).squeeze(1)
        vehicle_count = len(keep1) + len(keep2) + len(keep3)
        keep = torch.cat([keep0, keep1, keep2, keep3])
        top_predictions = top_predictions[keep]

        result = image.copy()
        if self.show_mask_heatmaps:
            return self.create_mask_montage(result, top_predictions)
        result = self.overlay_boxes(result, top_predictions)
        if self.config.MODEL.MASK_ON:
            result = self.overlay_mask(result, top_predictions)
        result = Predictor.overlay_class_names(result, top_predictions, self.label_names)

        text = 'Vehicle: {}'.format(vehicle_count)
        text_pos = (10, 50)
        font_scale = 0.5
        color = (0, 0, 200)
        thickness = 2
        cv2.putText(result, text, text_pos, cv2.FONT_HERSHEY_SIMPLEX, font_scale, color, thickness, cv2.LINE_AA)

        return result

    def compute_prediction(self, original_image):
        """
        Arguments:
            original_image (np.ndarray): an image as returned by OpenCV

        Returns:
            prediction (BoxList): the detected objects. Additional information
                of the detection properties can be found in the fields of
                the BoxList via `prediction.fields()`
        """

        # apply pre-processing to image
        image = self.transforms(original_image)

        # convert to an ImageList, padded so that it is divisible by
        # config.DATALOADER.SIZE_DIVISIBILITY
        image_list = to_image_list(image, self.config.DATALOADER.SIZE_DIVISIBILITY)
        image_list = image_list.to(self.device)

        # compute predictions
        with torch.no_grad():
            predictions = self.model(image_list)
        predictions = [o.to(self.cpu_device) for o in predictions]

        # always single image is passed at a time
        prediction = predictions[0]

        # reshape prediction (a BoxList) into the original image size
        height, width = original_image.shape[:-1]
        prediction = prediction.resize((width, height))

        if prediction.has_field("mask"):
            # if we have masks, paste the masks in the right position
            # in the image, as defined by the bounding boxes
            masks = prediction.get_field("mask")
            # always single image is passed at a time
            masks = self.masker([masks], [prediction])[0]
            prediction.add_field("mask", masks)
        return prediction

    def select_top_predictions(self, predictions):
        """
        Select only predictions which have a `score` > self.confidence_threshold,
        and returns the predictions in descending order of score

        Arguments:
            predictions (BoxList): the result of the computation by the model.
                It should contain the field `scores`.

        Returns:
            prediction (BoxList): the detected objects. Additional information
                of the detection properties can be found in the fields of
                the BoxList via `prediction.fields()`
        """

        scores = predictions.get_field("scores")
        keep = torch.nonzero(scores > self.confidence_threshold).squeeze(1)
        predictions = predictions[keep]
        scores = predictions.get_field("scores")
        _, idx = scores.sort(0, descending=True)
        return predictions[idx]

    def compute_colors_for_labels(self, labels):
        """
        Simple function that adds fixed colors depending on the class
        """

        colors = labels[:, None] * self.palette
        colors = (colors % 255).numpy().astype("uint8")
        return colors

    # noinspection PyUnresolvedReferences
    def overlay_boxes(self, image, predictions):
        """
        Adds the predicted boxes on top of the image

        Arguments:
            image (np.ndarray): an image as returned by OpenCV
            predictions (BoxList): the result of the computation by the model.
                It should contain the field `labels`.
        """

        labels = predictions.get_field("labels")
        boxes = predictions.bbox

        colors = self.compute_colors_for_labels(labels).tolist()

        for box, color in zip(boxes, colors):
            box = box.to(torch.int64)
            top_left, bottom_right = box[:2].tolist(), box[2:].tolist()
            image = cv2.rectangle(image, tuple(top_left), tuple(bottom_right), tuple(color), 1)
        return image

    # noinspection PyUnresolvedReferences
    def overlay_mask(self, image, predictions):
        """
        Adds the instances contours for each predicted object.
        Each label has a different color.

        Arguments:
            image (np.ndarray): an image as returned by OpenCV
            predictions (BoxList): the result of the computation by the model.
                It should contain the field `mask` and `labels`.
        """

        masks = predictions.get_field("mask").numpy()
        labels = predictions.get_field("labels")

        colors = self.compute_colors_for_labels(labels).tolist()

        for mask, color in zip(masks, colors):
            thresh = mask[0, :, :, None]
            contours, hierarchy = cv2_util.findContours(thresh, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
            image = cv2.drawContours(image, contours, -1, color, 3)

        composite = image
        return composite

    # noinspection PyUnresolvedReferences
    def create_mask_montage(self, image, predictions):
        """
        Create a montage showing the probability heatmaps for each one one of the
        detected objects

        Arguments:
            image (np.ndarray): an image as returned by OpenCV
            predictions (BoxList): the result of the computation by the model.
                It should contain the field `mask`.
        """

        masks = predictions.get_field("mask")
        masks_per_dim = self.masks_per_dim
        masks = layers.interpolate(masks.float(), scale_factor=1 / masks_per_dim).byte()
        height, width = masks.shape[-2:]
        max_masks = masks_per_dim ** 2
        masks = masks[:max_masks]

        # handle case where we have less detections than max_masks
        if len(masks) < max_masks:
            masks_padded = torch.zeros(max_masks, 1, height, width, dtype=torch.uint8)
            masks_padded[: len(masks)] = masks
            masks = masks_padded
        masks = masks.reshape(masks_per_dim, masks_per_dim, height, width)
        result = torch.zeros((masks_per_dim * height, masks_per_dim * width), dtype=torch.uint8)
        for y in range(masks_per_dim):
            start_y = y * height
            end_y = (y + 1) * height
            for x in range(masks_per_dim):
                start_x = x * width
                end_x = (x + 1) * width
                result[start_y:end_y, start_x:end_x] = masks[y, x]

        image = cv2.applyColorMap(result.numpy(), cv2.COLORMAP_JET)
        return image


# noinspection PyUnresolvedReferences, PyUnreachableCode
def vis_keypoints(img, kps, kp_thresh=2, alpha=0.7):
    """Visualizes keypoints (adapted from vis_one_image).
    kps has shape (4, #keypoints) where 4 rows are (x, y, logit, prob).
    """

    dataset_keypoints = PersonKeypoints.NAMES
    kp_lines = PersonKeypoints.CONNECTIONS

    # Convert from plt 0-1 RGBA colors to 0-255 BGR colors for opencv.
    # cmap = plt.get_cmap('rainbow')
    # colors = [cmap(i) for i in np.linspace(0, 1, len(kp_lines) + 2)]
    # colors = [(c[2] * 255, c[1] * 255, c[0] * 255) for c in colors]
    colors = [(255, 255, 255)]

    # raise BaseException('Conflict libraries: wxWidgets and matplotlib')

    # Perform the drawing on a copy of the image, to allow for blending.
    kp_mask = np.copy(img)

    # Draw mid shoulder / mid hip first for better visualization.
    mid_shoulder = (kps[:2, dataset_keypoints.index('right_shoulder')] +
                    kps[:2, dataset_keypoints.index('left_shoulder')]) / 2.0
    sc_mid_shoulder = np.minimum(kps[2, dataset_keypoints.index('right_shoulder')],
                                 kps[2, dataset_keypoints.index('left_shoulder')])
    mid_hip = (kps[:2, dataset_keypoints.index('right_hip')] +
               kps[:2, dataset_keypoints.index('left_hip')]) / 2.0
    sc_mid_hip = np.minimum(kps[2, dataset_keypoints.index('right_hip')],
                            kps[2, dataset_keypoints.index('left_hip')])
    nose_idx = dataset_keypoints.index('nose')
    if sc_mid_shoulder > kp_thresh and kps[2, nose_idx] > kp_thresh:
        cv2.line(kp_mask, tuple(mid_shoulder), tuple(kps[:2, nose_idx]),
                 color=colors[len(kp_lines)], thickness=2, lineType=cv2.LINE_AA)
    if sc_mid_shoulder > kp_thresh and sc_mid_hip > kp_thresh:
        cv2.line(kp_mask, tuple(mid_shoulder), tuple(mid_hip),
                 color=colors[len(kp_lines) + 1], thickness=2, lineType=cv2.LINE_AA)

    # Draw the key-points.
    for l in range(len(kp_lines)):
        i1 = kp_lines[l][0]
        i2 = kp_lines[l][1]
        p1 = kps[0, i1], kps[1, i1]
        p2 = kps[0, i2], kps[1, i2]
        if kps[2, i1] > kp_thresh and kps[2, i2] > kp_thresh:
            cv2.line(kp_mask, p1, p2, color=colors[l], thickness=2, lineType=cv2.LINE_AA)
        if kps[2, i1] > kp_thresh:
            cv2.circle(kp_mask, p1, radius=3, color=colors[l], thickness=-1, lineType=cv2.LINE_AA)
        if kps[2, i2] > kp_thresh:
            cv2.circle(kp_mask, p2, radius=3, color=colors[l], thickness=-1, lineType=cv2.LINE_AA)

    # Blend the keypoints.
    return cv2.addWeighted(img, 1.0 - alpha, kp_mask, alpha, 0)


class DetectorObject:
    """
    Integrated object information that can be obtained as a result of detection.
    """

    name: str  # class name.
    source: str  # source name.
    score: float  # threshold score (0.0 ~ 1.0).
    bbox: list  # (x, y, w, h) object bounding box (pixel)
    size: list  # (width, height) original image size (pixel).
    points: list  # polygon information (pixel).

    def __init__(self, name=str(), source=str(), score=0.):
        self.name = name
        self.source = source
        self.score = score
        self.bbox = []
        self.size = []
        self.points = []

    def __getitem__(self, item):
        return self.points[item]

    def __iter__(self):
        return self.points

    def __str__(self):
        return self.name

    def to_dict(self):
        return {
            'name': self.name,
            'source': self.source,
            'score': self.score,
            'bbox': self.bbox,
            'size': self.size,
            'points': self.points,
        }

    def from_dict(self, obj):
        self.name = str(obj['name'])
        self.source = str(obj['source'])
        self.score = float(obj['score'])
        self.bbox = list(obj['bbox'])
        self.size = list(obj['size'])
        self.points = list(obj['points'])

    def to_json(self):
        return json.dumps(self.to_dict())

    def from_json(self, json_text):
        self.from_dict(json.loads(json_text))

    def clone(self):
        obj = DetectorObject()
        obj.name = copy.deepcopy(self.name)
        obj.source = copy.deepcopy(self.source)
        obj.score = self.score
        obj.bbox = copy.deepcopy(self.bbox)
        obj.size = copy.deepcopy(self.size)
        obj.points = copy.deepcopy(self.points)
        return obj

    @staticmethod
    def create_from_dict(obj):
        result = DetectorObject()
        result.from_dict(obj)
        return result

    @staticmethod
    def create_from_json(json_text):
        result = DetectorObject()
        result.from_json(json_text)
        return result


def detection_objects_to_json(objects: list):
    objects_list = []
    for obj in objects:
        objects_list.append(obj.to_dict())
    return json.dumps(objects_list)


# noinspection PyGlobalUndefined
def create_config(config_name='your_mask_rcnn_R_101_FPN_1x_caffe2', device_type='cpu', args=()):
    global GLOBAL_DICT  # Bind from c2deep.
    try:
        exists_global_dict = isinstance(GLOBAL_DICT, dict)
    except NameError:
        exists_global_dict = False

    if exists_global_dict and config_name in GLOBAL_DICT:
        print('Use global dictionary: {}'.format(config_name))
        config_content = GLOBAL_DICT[config_name]
    elif os.path.exists(config_name):
        print('Use config file: {}'.format(config_name))
        with open(config_name, 'r') as f:
            config_content = f.read()
    else:
        print('Use config text.')
        config_content = config_name

    if config_content:
        cfg.merge_from_other_cfg(cfg.load_cfg(config_content))
    if device_type == 'cuda' or device_type == 'cpu':
        cfg.merge_from_list(['MODEL.DEVICE', device_type])
    cfg.merge_from_list(args)
    cfg.freeze()
    return cfg.clone()


def create_predictor(config,
                     confidence_threshold=DEFAULT_CONFIDENCE_THRESHOLD,
                     min_image_size=DEFAULT_MIN_IMAGE_SIZE,
                     label_names=COCO_CATEGORIES):
    return Predictor(config,
                     confidence_threshold=confidence_threshold,
                     min_image_size=min_image_size,
                     label_names=label_names)


def invert_color(color):
    return 255-color[0], 255-color[1], 255-color[2]


def read_label_names(file):
    with open(file, 'r') as f:
        return f.read().splitlines()


def get_image_names(search_dir: str):
    image_files = []
    for root, dirs, files in os.walk(search_dir):
        for item in files:
            lower_item = item.lower()
            if lower_item.endswith('.jpeg'):
                image_files.append(item)
            if lower_item.endswith('.jpg'):
                image_files.append(item)
            if lower_item.endswith('.png'):
                image_files.append(item)
    return image_files


# noinspection PyUnresolvedReferences
def find_contours(*args, **kwargs):
    if cv2.__version__.startswith('4'):
        contours, hierarchy = cv2.findContours(*args, **kwargs)
    elif cv2.__version__.startswith('3'):
        _, contours, hierarchy = cv2.findContours(*args, **kwargs)
    else:
        raise AssertionError('cv2 must be either version 3 or 4 to call this method')
    return contours, hierarchy


def print_detection_objects(objects: list):
    for obj in objects:
        print('name: {}, score: {}'.format(obj.name, obj.score))


# noinspection PyUnresolvedReferences
def draw_detection_objects(image: np.ndarray, objects: list, color=None):
    font_scale = 0.5
    font = cv2.FONT_HERSHEY_SIMPLEX
    thickness = 1

    # font_scale = 0.4
    # font = cv2.FONT_HERSHEY_PLAIN
    # thickness = 1

    for obj in objects:
        x, y, w, h = obj.bbox
        foreground = color
        background = invert_color(foreground)
        cv2.rectangle(image, (x, y), (x + w, y + h), background, thickness, cv2.LINE_AA)

        text = '{} ({:.3f})'.format(obj.name, obj.score)
        text_size = cv2.getTextSize(text, font, font_scale, thickness)
        tw, th = text_size[0]
        base_line = text_size[1]
        cv2.rectangle(image, (x, y), (x+tw, y-th-base_line-(thickness*2)), background, cv2.FILLED)
        cv2.putText(image, text, (x, y-base_line), font, font_scale, foreground, thickness, cv2.LINE_AA)

        if obj.points:
            draw_array = []
            for points in obj.points:
                draw_array.append(np.asarray(points, dtype=np.int32))
            cv2.drawContours(image, draw_array, -1, background, 2, cv2.LINE_AA)
    return image


# noinspection PyUnresolvedReferences
def draw_simple_detection_objects(image: np.ndarray, objects: list, color, thickness, padding):
    for obj in objects:
        x, y, w, h = obj.bbox
        cv2.rectangle(image, (x-padding, y-padding), (x+w+padding, y+h+padding), color, thickness, cv2.LINE_AA)


# noinspection PyUnresolvedReferences
def inference_maskrcnn(predictor: Predictor,
                       image: np.ndarray,
                       source_name='',
                       find_labels=(),
                       threshold=0.0,
                       only_bbox=False):
    predictions = predictor.compute_prediction(image)
    top_predictions = predictor.select_top_predictions(predictions)

    bbox = top_predictions.bbox
    labels = top_predictions.get_field('labels')
    scores = top_predictions.get_field('scores')
    image_size = top_predictions.size

    if not only_bbox:
        masks = top_predictions.get_field('mask').numpy()
    else:
        masks = None

    result = []
    for i in range(len(bbox)):
        label_number = int(labels[i])
        try:
            label_name = predictor.label_names[labels[i]]
        except IndexError:
            # print('Label name error - out of range: {}'.format(label_number))
            label_name = str(label_number)

        if find_labels and label_name not in find_labels:
            continue

        score = float(scores[i])
        if threshold > 0.0 and score < threshold:
            continue

        obj = DetectorObject()

        x1 = bbox[i][0]
        y1 = bbox[i][1]
        x2 = bbox[i][2]
        y2 = bbox[i][3]

        w = int(fabs(float(x2 - x1)))
        h = int(fabs(float(y2 - y1)))

        obj.name = label_name
        obj.source = source_name
        obj.score = score
        obj.bbox = [int(x1), int(y1), w, h]
        obj.size = [int(image_size[0]), int(image_size[1])]

        if not only_bbox:
            assert masks is not None
            points_list, _ = find_contours(masks[i][0, :, :, None], cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
            obj.points = []
            for points in points_list:
                obj.points.append(points.tolist())

        result.append(obj)
    return result


# noinspection PyUnresolvedReferences
def init_locals_box(params: dict):
    config_name = str(params['config_name'])
    device_type = str(params['device_type'])
    args = str(params['args'])
    config = create_config(config_name,
                           device_type if device_type else 'cpu',
                           args.split(',') if args else ())

    confidence_threshold = float(params['confidence_threshold'])
    min_image_size = int(params['min_image_size'])
    label_names = params['label_names']

    global GLOBAL_PREDICTOR
    GLOBAL_PREDICTOR = create_predictor(config,
                                        confidence_threshold,
                                        min_image_size,
                                        label_names.split(',') if label_names else ())
    return GLOBAL_PREDICTOR is not None


# noinspection PyUnresolvedReferences
def run_locals_box(params: dict):
    box = params['frame']
    frame = np.array(box, copy=False)
    find_labels = params['find_labels']
    only_bbox = params['only_bbox'] is not '0'

    global GLOBAL_PREDICTOR
    objects = inference_maskrcnn(GLOBAL_PREDICTOR,
                                 frame,
                                 '',
                                 find_labels.split(',') if find_labels else (),
                                 0.0,
                                 only_bbox)
    return detection_objects_to_json(objects)


def response_json(code: int, msg: str, data: dict):
    return json.dumps({
        'code': code,
        'msg': msg,
        'data': data if data else dict()
    })


def response_ok_json(data=None):
    return response_json(200, 'OK', data)


def response_bad_request_json(data=None):
    return response_json(400, 'Bad request', data)


def response_not_found_json(data=None):
    return response_json(404, 'Not found', data)


def response_internal_server_error_json(data=None):
    return response_json(500, 'Internal Server Error', data)


def response_service_unavailable_json(data=None):
    return response_json(503, 'Service unavailable', data)


def response_debug_bad_request_json(msg: str):
    return response_bad_request_json({'debug': msg})


def response_debug_not_found_json(msg: str):
    return response_not_found_json({'debug': msg})


def response_debug_internal_server_error_json(msg: str):
    return response_internal_server_error_json({'debug': msg})


def response_debug_service_unavailable_json(msg: str):
    return response_service_unavailable_json({'debug': msg})


# noinspection PyUnresolvedReferences
def run_inference_web():
    try:
        from flask import Flask, request, make_response
    except ImportError as e:
        print(e)
        return

    parser = create_default_option_parser()
    options, args = parser.parse_args(sys.argv[1:])

    if options.help:
        parser.print_help()
        return True

    print('Options: {}'.format(options))
    print('Arguments: {}'.format(args))

    app_name = 'Inference'
    image_width = 1920
    image_height = 1080
    image_channels = 3
    max_content_length = (image_width*image_height*image_channels)+(1024*1024)
    app_debug = False
    http_host = "0.0.0.0"
    http_port = 8080

    app = Flask(app_name)
    app.config['MAX_CONTENT_LENGTH'] = max_content_length

    input_path = options.input
    find_labels = options.find_labels
    min_image_size = options.min_image_size
    confidence_threshold = options.confidence_threshold
    only_bbox = options.only_bbox
    verbose = options.verbose
    if options.label_names:
        label_names = read_label_names(options.label_names)
    else:
        label_names = COCO_CATEGORIES

    config = create_config(options.config_name, options.device_type, args)
    predictor = create_predictor(config, confidence_threshold, min_image_size, label_names)

    source_names = list()
    previews = dict()
    caps = dict()

    # Initialize default source.
    if input_path:
        default_source = 'default'
        source_names.append(default_source)
        caps[default_source] = cv2.VideoCapture(input_path)
    else:
        print('Empty input path')

    for name in source_names:
        if not caps[name].isOpened():
            print('Video open error: ' + name)
            return False
        code, _ = caps[name].read()
        if not code:
            print('Video read error: ' + name)
            return False

    import threading
    import time

    class ThreadDoneException(Exception):
        pass

    # noinspection PyUnresolvedReferences
    def update_video_image(sleep_time=1.0):
        print('Update thread - begin.')
        try:
            while True:
                for n in source_names:
                    if not caps[n].isOpened():
                        raise ThreadDoneException()
                    c, _ = caps[n].read()
                    if not c:
                        raise ThreadDoneException()
                if verbose:
                    print('Update thread - sleep {}sec'.format(sleep_time))
                time.sleep(sleep_time)
        except ThreadDoneException:
            pass
        print('Update thread - end.')

    if source_names:
        video_update_thread = threading.Thread(target=update_video_image, args=(0.001,))
        video_update_thread.start()

    @app.route('/')
    def index_main():
        return response_bad_request_json()

    @app.route('/version')
    def version_main():
        return response_ok_json({'major': SCRIPT_VERSION_MAJOR,
                                 'minor': SCRIPT_VERSION_MINOR,
                                 'patch': SCRIPT_VERSION_PATCH})

    @app.route('/heartbeat', methods=['GET'])
    def heartbeat_main():
        return response_ok_json()

    @app.route('/sources', methods=['GET'])
    def sources_main():
        return response_ok_json({'names': source_names})

    @app.route('/source/<name>', methods=['GET'])
    def source_name_main(name):
        if name not in source_names:
            return response_bad_request_json()
        return response_ok_json({'name': name,
                                 'type': 'camera',
                                 'status': 'active' if caps[name].isOpened() else 'inactive'})

    @app.route('/preview/<name>', methods=['GET'])
    def preview_name_main(name):
        if name not in previews:
            return response_not_found_json()

        result, image_buffer = cv2.imencode('.png', previews[name])
        if not result:
            return response_internal_server_error_json()

        response = make_response(image_buffer.tobytes())
        response.headers['Content-Type'] = 'image/png'
        return response

    def inference_and_response_ok(frame, source_name, request_find_labels, request_threshold):
        if request_find_labels:
            labels = list(map(lambda x: str(x), request_find_labels.split(';')))
        else:
            labels = request_find_labels

        try:
            threshold = float(request_threshold)
            if threshold < 0.0:
                threshold = 0.0
            elif threshold > 1.0:
                threshold = 1.0
        except:
            threshold = confidence_threshold

        begin_time = datetime.now()
        objects = inference_maskrcnn(predictor, frame, source_name, labels, threshold, only_bbox)
        delta = datetime.now() - begin_time
        milliseconds = int(delta.total_seconds() * 1000)
        print('Current inference time {}ms'.format(milliseconds))

        if source_name:
            previews[source_name] = frame.copy()
            draw_detection_objects(previews[source_name], objects, (255, 0, 0))

        objects_list = []
        for obj in objects:
            objects_list.append(obj.to_dict())
        return response_ok_json({'result': objects_list})

    @app.route('/inference', methods=['POST'])
    def inference_main():
        """
        Test:
        curl -F '[email protected]' -X POST "http://0.0.0.0:8080/inference?labels=car;&threshold=0.98&name=test"
        """

        assert request.method == 'POST'

        # check if the post request has the file part
        if 'image' not in request.files:
            return response_bad_request_json()

        image = cv2.imdecode(np.frombuffer(request.files['image'].read(), np.uint8), cv2.IMREAD_COLOR)
        labels = request.args.get('labels')
        threshold = request.args.get('threshold')
        name = request.args.get('name')
        return inference_and_response_ok(image, name, labels, threshold)

    @app.route('/inference/<name>', methods=['GET'])
    def inference_name_main(name):
        assert request.method == 'GET'

        if name not in source_names:
            return response_debug_bad_request_json('Unknown source name: ' + name)

        if not caps[name].isOpened():
            return response_debug_service_unavailable_json('The source is closed')

        code, frame = caps[name].read()
        if not code:
            return response_debug_internal_server_error_json('An error occurred while reading the frame')

        labels = request.args.get('labels')
        threshold = request.args.get('threshold')
        return inference_and_response_ok(frame, name, labels, threshold)

    # Start Flask Web Application.
    app.run(debug=app_debug, host=http_host, port=http_port)

    for name in source_names:
        while caps[name].isOpened():
            caps[name].release()

maskrcnn_excel_report.py

# -*- coding: utf-8 -*-

import sys
import os
import cv2
import numpy as np
from datetime import datetime

SCRIPT_PATH = os.path.abspath(__file__)
SCRIPT_DIR = os.path.dirname(SCRIPT_PATH)
STORAGE_DIR = os.path.abspath(os.path.join(SCRIPT_DIR, os.pardir))
SOURCE_DIR = os.path.abspath(os.path.join(STORAGE_DIR, os.pardir))
PRESET_DIR = os.path.join(SOURCE_DIR, 'libc2deep', 'script', 'python', 'py')
CONFIG_DIR = os.path.join(SOURCE_DIR, 'libc2deep', 'res', 'static', 'caffe2')
LABELS_DIR = os.path.join(SOURCE_DIR, 'storage', 'label')
IMAGES_DIR = os.path.join(SOURCE_DIR, 'tester', 'asset', 'image')
TEMP_DIR = os.path.join(SOURCE_DIR, 'tester', 'asset', 'temp')
COLORS_PATH = os.path.join(SOURCE_DIR, 'storage', 'color', 'default.color')
sys.path.append(PRESET_DIR)


def clamp(x):
    return max(0, min(x, 255))


# noinspection PyUnresolvedReferences
def fill_detection_ratio_objects(image: np.ndarray, objects: list, colors: dict):
    for obj in objects:
        assert obj.points
        draw_array = []
        for points in obj.points:
            draw_array.append(np.asarray(points, dtype=np.int32))
        cv2.drawContours(image, draw_array, -1, color=colors[obj.name], thickness=cv2.FILLED)
    return image


# noinspection PyUnresolvedReferences
def write_excel_report(excel_path: str,
                       image: np.ndarray,
                       image_path: str,
                       preview_image_path: str,
                       objects: list,
                       labels: list,
                       colors: dict):
    import openpyxl
    wb = openpyxl.Workbook()
    ws = wb.active

    height = image.shape[0]
    width = image.shape[1]
    total = height * width

    background_mask = np.all(image == colors['__background'], axis=-1)
    background_mask_count = np.count_nonzero(background_mask)
    assert background_mask_count <= total

    foreground_mask = np.any(image != colors['__background'], axis=-1)
    foreground_mask_count = np.count_nonzero(foreground_mask)
    assert foreground_mask_count <= total
    assert foreground_mask_count + background_mask_count == total

    ws['A1'] = 'Foreground pixel count'
    ws['B1'] = foreground_mask_count
    ws['A2'] = 'Background pixel count'
    ws['B2'] = background_mask_count

    ws['A4'] = 'Label'
    ws['B4'] = 'Pixel count'
    ws['C4'] = 'Ratio'  # Percentage (Pixel/Foreground)
    ws['D4'] = 'Count'
    ws['E4'] = 'Color'

    row_index = 5
    for label in filter(lambda i: i != '__background', labels):
        mask = np.all(image == colors[label], axis=-1)
        mask_count = np.count_nonzero(mask)
        assert mask_count <= total
        r = colors[label][2]
        g = colors[label][1]
        b = colors[label][0]
        hex_color = "{0:02x}{1:02x}{2:02x}".format(clamp(r), clamp(g), clamp(b))
        ws['A' + str(row_index)] = label
        ws['B' + str(row_index)] = mask_count
        ws['C' + str(row_index)] = '=B{}/B1'.format(row_index)
        ws['D' + str(row_index)] = sum(x.name == label for x in objects)
        ws['E' + str(row_index)].fill = openpyxl.styles.PatternFill(start_color=hex_color,
                                                                    end_color=hex_color,
                                                                    fill_type="solid")
        row_index += 1

    ws['F4'] = 'Label'
    ws['G4'] = 'Score'
    ws['H4'] = 'x'
    ws['I4'] = 'y'
    ws['J4'] = 'width'
    ws['K4'] = 'height'

    row_index = 5
    for obj in objects:
        ws['F' + str(row_index)] = obj.name
        ws['G' + str(row_index)] = obj.score
        ws['H' + str(row_index)] = obj.bbox[0]
        ws['I' + str(row_index)] = obj.bbox[1]
        ws['J' + str(row_index)] = obj.bbox[2]
        ws['K' + str(row_index)] = obj.bbox[3]
        row_index += 1

    img1 = openpyxl.drawing.image.Image(image_path)
    img1.anchor = 'L4'
    img1.width = 300
    img1.height = 300
    ws.add_image(img1)

    img2 = openpyxl.drawing.image.Image(preview_image_path)
    img2.anchor = 'L18'
    img2.width = 300
    img2.height = 300
    ws.add_image(img2)

    wb.save(excel_path)


# noinspection PyUnresolvedReferences
def run_maskrcnn_object_ratio():
    try:
        from inference_maskrcnn import COCO_CATEGORIES, Predictor, \
            create_default_option_parser, read_label_names, \
            create_config, create_predictor, get_image_names, \
            inference_maskrcnn, draw_detection_objects, \
            print_detection_objects
    except ImportError as e:
        print(e)
        return False

    parser = create_default_option_parser()
    options, args = parser.parse_args(sys.argv[1:])

    if options.help:
        parser.print_help()
        return True

    config_name = os.path.join(CONFIG_DIR, options.config_name + '.yaml')
    input_path = options.input if options.input else IMAGES_DIR
    output_path = options.output if options.output else TEMP_DIR
    find_labels = options.find_labels.split(',') if options.find_labels else ()
    min_image_size = options.min_image_size
    confidence_threshold = options.confidence_threshold
    only_bbox = False
    verbose = options.verbose

    label_names = list()
    if options.label_names:
        label_names = read_label_names(options.label_names)
        if not label_names:
            label_names = read_label_names(os.path.join(LABELS_DIR, options.label_names + '.label'))
    if not label_names:
        label_names = COCO_CATEGORIES

    if verbose:
        print('Config name: {}'.format(config_name))
        print('Input path: {}'.format(input_path))
        print('Output path: {}'.format(output_path))
        print('Find labels: {}'.format(find_labels))
        print('Min image size: {}'.format(min_image_size))
        print('Confidence threshold: {}'.format(confidence_threshold))
        print('Only bounding box: {}'.format(only_bbox))
        print('Label names: {}'.format(label_names))

    config = create_config(config_name, options.device_type, args)
    predictor = create_predictor(config, confidence_threshold, min_image_size, label_names)

    if os.path.isdir(input_path):
        print('Read directory: {}'.format(input_path))
        image_names = get_image_names(input_path)
    elif os.path.exists(input_path):
        print('Read file: {}'.format(input_path))
        image_names = [input_path]
    else:
        print('Unknown input source: {}'.format(input_path))
        return False

    if not os.path.isdir(output_path):
        print('The output is not a directory: {}'.format(output_path))
        return False

    ratio_output = os.path.join(output_path, 'ratio')
    if not os.path.isdir(ratio_output):
        print('Make the ratio directory: {}'.format(ratio_output))
        os.mkdir(ratio_output)

    colors = list()
    for color in open(COLORS_PATH).read().strip().split('\n'):
        colors.append(list(map(lambda x: int(x), color.split(','))))

    label_colors = dict()
    label_colors_index = 0
    for label in label_names:
        label_colors[label] = colors[label_colors_index]
        label_colors_index += 1

    image_names.sort()

    for i in range(len(image_names)):
        name = image_names[i]
        frame = cv2.imread(os.path.join(input_path, name))

        begin_time = datetime.now()
        objects = inference_maskrcnn(predictor, frame, input_path, find_labels,
                                     confidence_threshold, only_bbox)
        delta = datetime.now() - begin_time
        milliseconds = int(delta.total_seconds() * 1000)
        print('Current frame ({}/{}) {} inference time {}ms'.format(i, len(image_names), name, milliseconds))

        preview = frame.copy()
        draw_detection_objects(preview, objects, (255, 0, 0))
        if verbose:
            print_detection_objects(objects)
        preview_image_path = os.path.join(output_path, name)
        cv2.imwrite(preview_image_path, preview)

        # Image object ratio calculation:
        ratio_preview = np.zeros(shape=frame.shape, dtype=frame.dtype)
        objects = sorted(objects, key=lambda obj: obj.score, reverse=False)
        fill_detection_ratio_objects(ratio_preview, objects, label_colors)

        image_path = os.path.join(ratio_output, name)
        cv2.imwrite(image_path, ratio_preview)

        write_excel_report(os.path.join(ratio_output, name + '.xlsx'),
                           ratio_preview,
                           image_path,
                           preview_image_path,
                           objects,
                           label_names,
                           label_colors)

    return True


if __name__ == "__main__":
    run_maskrcnn_object_ratio()

See also