Faster R-CNN to detect objects on the road

7 minute read

Faster Region-Based CNN (Faster R-CNN)

The goal of this post is to show how to use a pretrained Faster R-CNN model to classify and localize objects on the road.

Introduction

A self-driving car makes use of computer vision to perceive its environment. Multiple object detection refers to detecting and localizing several objects within a single image.

TensorFlow Object Detection API

TensorFlow’s Object Detection API provides the pretrained models and helper utilities that make this analysis possible.

import numpy as np
import os
import sys
import tarfile
import zipfile
import time
import six.moves.urllib as urllib
import tensorflow as tf
import pandas as pd
import cv2
from PIL import Image
from distutils.version import StrictVersion
from collections import defaultdict
from io import StringIO
from matplotlib import pyplot as plt

#!pip install unipath
from unipath import Path
ObjectDetectionDirPath = Path(os.getcwd())
ResearchFolderPath = Path(ObjectDetectionDirPath.parent)
sys.path.append(ResearchFolderPath)

from object_detection.utils import ops as utils_ops

if StrictVersion(tf.__version__) < StrictVersion('1.9.0'):
    raise ImportError('Please upgrade your TensorFlow installation to v1.9.* or later!')

# arial.ttf is used to render legible class labels on the output images.
from PIL import ImageFont
font = ImageFont.truetype('arial.ttf', 20)

# This is needed to display the images.
%matplotlib inline

This assumes you already have a working Object Detection API setup on your machine. If not, check out my other post, where I list the steps to configure your development environment.
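
As a quick sanity check (a minimal sketch of my own, not part of the original pipeline), you can confirm that the API and its compiled protos are importable before going any further:

# Sanity check: confirm the Object Detection API is importable. If this fails,
# compile the protos from models/research with
#   protoc object_detection/protos/*.proto --python_out=.
# and make sure models/research (and models/research/slim) are on PYTHONPATH.
try:
    from object_detection.utils import label_map_util
    print("Object Detection API is ready.")
except ImportError as err:
    print("Object Detection API not found:", err)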

# Import tensorflow object detection libs
from tensorflow.python.client import device_lib
from object_detection.utils import label_map_util
from object_detection.utils import visualization_utils as vis_util

Download the models from the TensorFlow Model Zoo

# What model to download.
# https://github.com/tensorflow/models/blob/master/research/object_detection/g3doc/detection_model_zoo.md
# Add more names here to compare models, e.g. 'faster_rcnn_resnet50_coco',
# 'ssd_inception_v2_coco', 'rfcn_resnet101_coco', 'mask_rcnn_inception_v2_coco',
# 'mask_rcnn_resnet101_atrous_coco'.
myModels = ['faster_rcnn_inception_v2_coco']

# List of the strings that are used to add the correct label to each box.
PATH_TO_LABELS = os.path.join(ObjectDetectionDirPath, 'data/mscoco_label_map.pbtxt')

category_index = label_map_util.create_category_index_from_labelmap(PATH_TO_LABELS, use_display_name=True)

# Download and extract a frozen model from the model zoo
def Download_and_loadFrozen_model(current_model = myModels[0]):
    MODEL_NAME = current_model+'_2018_01_28'
    MODEL_FILE = MODEL_NAME + '.tar.gz'
    DOWNLOAD_BASE = 'http://download.tensorflow.org/models/object_detection/'

    # Path to frozen detection graph. This is the actual model that is used for the object detection.
    PATH_TO_FROZEN_GRAPH = MODEL_NAME + '/frozen_inference_graph.pb'

    #download model
    opener = urllib.request.URLopener()
    opener.retrieve(DOWNLOAD_BASE + MODEL_FILE, MODEL_FILE)
    tar_file = tarfile.open(MODEL_FILE)
    for file in tar_file.getmembers():
      file_name = os.path.basename(file.name)
      if 'frozen_inference_graph.pb' in file_name:
        tar_file.extract(file, os.getcwd())
    print(current_model + " model has been downloaded...")

def LoadFrozenModel(current_model = myModels[0]):
    MODEL_NAME = current_model+'_2018_01_28'
    # Path to frozen detection graph. This is the actual model that is used for the object detection.
    PATH_TO_FROZEN_GRAPH = MODEL_NAME + '/frozen_inference_graph.pb'
    #load frozen model
    detection_graph = tf.Graph()
    with detection_graph.as_default():
      od_graph_def = tf.GraphDef()
      with tf.gfile.GFile(PATH_TO_FROZEN_GRAPH, 'rb') as fid:
        serialized_graph = fid.read()
        od_graph_def.ParseFromString(serialized_graph)
        tf.import_graph_def(od_graph_def, name='')
    #print(current_model+" model loaded...")
    return detection_graph

# Uncomment this section to download the models listed in myModels above.
#for i in myModels:
#    Download_and_loadFrozen_model(i)  # downloads and extracts each frozen graph

Export the COCO labels to a CSV file for later use.

coco_labels = pd.DataFrame(category_index).T
coco_labels.to_csv("coco_object_labels.csv")
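
For reference, category_index is simply a dictionary keyed by class id. As a small illustrative check (with the standard mscoco_label_map.pbtxt, id 1 is 'person' and id 3 is 'car'):

# Look up a class entry by its COCO id.
print(category_index[3])   # e.g. {'id': 3, 'name': 'car'}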

Inference and Object Detection

def load_image_into_numpy_array(image):
  (im_width, im_height) = image.size
  return np.array(image.getdata()).reshape(
      (im_height, im_width, 3)).astype(np.uint8)
def run_inference_for_single_image(image, graph):
  with graph.as_default():
    #############################################
    # Uncomment the tf.device line below to pin inference to a specific GPU.
    #############################################
    #with tf.device('/device:GPU:0'):
    with tf.Session() as sess:
      # Get handles to input and output tensors
      ops = tf.get_default_graph().get_operations()
      all_tensor_names = {output.name for op in ops for output in op.outputs}
      tensor_dict = {}
      for key in [
          'num_detections', 'detection_boxes', 'detection_scores',
          'detection_classes', 'detection_masks'
      ]:
        tensor_name = key + ':0'
        if tensor_name in all_tensor_names:
          tensor_dict[key] = tf.get_default_graph().get_tensor_by_name(
              tensor_name)
      if 'detection_masks' in tensor_dict:
        # The following processing is only for a single image.
        detection_boxes = tf.squeeze(tensor_dict['detection_boxes'], [0])
        detection_masks = tf.squeeze(tensor_dict['detection_masks'], [0])
        # Reframe is required to translate mask from box coordinates to image coordinates and fit the image size.
        real_num_detection = tf.cast(tensor_dict['num_detections'][0], tf.int32)
        detection_boxes = tf.slice(detection_boxes, [0, 0], [real_num_detection, -1])
        detection_masks = tf.slice(detection_masks, [0, 0, 0], [real_num_detection, -1, -1])
        detection_masks_reframed = utils_ops.reframe_box_masks_to_image_masks(
            detection_masks, detection_boxes, image.shape[0], image.shape[1])
        detection_masks_reframed = tf.cast(
            tf.greater(detection_masks_reframed, 0.5), tf.uint8)
        # Follow the convention by adding back the batch dimension
        tensor_dict['detection_masks'] = tf.expand_dims(
            detection_masks_reframed, 0)
      image_tensor = tf.get_default_graph().get_tensor_by_name('image_tensor:0')

      # Run inference
      output_dict = sess.run(tensor_dict,
                             feed_dict={image_tensor: np.expand_dims(image, 0)})

      # all outputs are float32 numpy arrays, so convert types as appropriate
      output_dict['num_detections'] = int(output_dict['num_detections'][0])
      output_dict['detection_classes'] = output_dict[
          'detection_classes'][0].astype(np.uint8)
      output_dict['detection_boxes'] = output_dict['detection_boxes'][0]
      output_dict['detection_scores'] = output_dict['detection_scores'][0]
      if 'detection_masks' in output_dict:
        output_dict['detection_masks'] = output_dict['detection_masks'][0]
    return output_dict
# Size, in inches, of the output images.
IMAGE_SIZE = (20, 10)
def InferNowXXX(modelName =myModels[0], detection_graph=LoadFrozenModel()):
    Temp = pd.DataFrame()
    dataX = pd.DataFrame()
    result = pd.DataFrame()

    for i in range(0,len(folders)):
        Imageslist = os.listdir(folders[i]+'images')
        Imageslist.sort()

        PATH_TO_TEST_IMAGES_DIR =folders[i]+'images'
        imagePATH = [ os.path.join(PATH_TO_TEST_IMAGES_DIR, Imageslist[j]) for j in range(0,len(Imageslist)) ]

        for image_path in imagePATH:

            image = Image.open(image_path)
            width, height = image.size
            # the array based representation of the image will be used later in order to prepare the
            # result image with boxes and labels on it.
            image_np = load_image_into_numpy_array(image)
            # Expand dimensions since the model expects images to have shape: [1, None, None, 3]
            image_np_expanded = np.expand_dims(image_np, axis=0)
            # Actual detection.
            t0 = time.time()
            output_dict =run_inference_for_single_image(image_np, detection_graph)
            t1 = time.time()
            total = t1-t0
            t0 =t1=0
            # Visualization of the results of a detection.
            vis_util.visualize_boxes_and_labels_on_image_array(
               image_np,
               output_dict['detection_boxes'],
               output_dict['detection_classes'],
               output_dict['detection_scores'],
               category_index,
               instance_masks=output_dict.get('detection_masks'),
               use_normalized_coordinates=True,
               line_thickness=2)
            plt.figure(figsize=IMAGE_SIZE)
            plt.imshow(image_np)
            plt.title(modelName)
            #df = pd.DataFrame([[modelName, total, condition[i], image_path]], columns=['Model','Inference Time (Sec)','Driving Condition', 'ImageID'])

            print(image_path.split('/')[4], ": ", modelName, ":", round(total, 3), "Seconds")
            #print(output_dict['detection_classes'],output_dict['detection_scores'])
            detection_classes = pd.DataFrame(output_dict['detection_classes'])
            detection_scores = pd.DataFrame(output_dict['detection_scores'])
            detection_boxes  = pd.DataFrame(output_dict['detection_boxes'])
            #<class_name> <confidence> <left> <top> <right> <bottom>
            #dataX["Class"] = detection_classes
            #pd.concat([df1, df3], join="inner")
            result = pd.concat([detection_classes, detection_scores], axis=1, sort=False)
            result = pd.concat([result, detection_boxes], axis=1, sort=False)
            # detection_boxes are returned as [ymin, xmin, ymax, xmax], i.e. [top, left, bottom, right]
            result.columns = ['Class', 'Confidence', 'top', 'left', 'bottom', 'right']
            result['Width'] =width
            result['Height'] =height
            result['Model']= modelName
            result['Inference Time (Sec)']= total
            result['Driving Condition'] = condition[i]
            result['ImageID'] = image_path.split('/')[4]
            Temp = Temp.append(result)
            #print(str(output_dict))
    return(Temp)
def Run_inference():
    temp = pd.DataFrame()
    print("******************************************")
    print("*            Inference Time              *")
    print("******************************************")
    for i in myModels:
        #download and load a frozen model
        detgraph = LoadFrozenModel(i)
        #detection_graph =Download_and_loadFrozen_model(i) #You can pass model name here
        temp = temp.append(InferNowXXX(i, detgraph))
        #InferNow(i, detgraph)
    print("******************************************")
    temp = temp[temp.Confidence != 0]
    temp.to_csv('InferenceData_sk.csv', index=False)
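
The CSV written above has one row per retained detection. A quick way to inspect it (the column names follow the dataframe built in InferNowXXX):

# Peek at the per-detection CSV produced by Run_inference().
df = pd.read_csv('InferenceData_sk.csv')
print(df.columns.tolist())
# ['Class', 'Confidence', 'top', 'left', 'bottom', 'right', 'Width', 'Height',
#  'Model', 'Inference Time (Sec)', 'Driving Condition', 'ImageID']
print(df.head())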

Generate Predicted Labels .txt Files

# Generate predicted labels .txt files
# https://github.com/Cartucho/mAP#create-the-ground-truth-files
# This code block generates prediction files in the format below for individual images:
# <class_name> <confidence> <left> <top> <right> <bottom>

def PredictionsFiles():
    temp = pd.DataFrame()
    temp = pd.read_csv('InferenceData_sk.csv')
    #temp = tempX[tempX.Confidence != 0]
    models = pd.unique(temp['Model'])
    Drivingcondition = pd.unique(temp['Driving Condition'])
    col_list = ['Class', 'Confidence', 'left', 'top', 'right', 'bottom'] # updated

    for i in models:
        ModelData = temp[temp['Model']==i]  #subset data by model
        #display(ModelData)
        for j in Drivingcondition:
            #print(j)
            DrivingConditionData = ModelData[ModelData['Driving Condition']==j] #subset data by driving condition
            #print(DrivingConditionData.shape)
            templist =pd.unique(DrivingConditionData['ImageID'])
            #print(len(templist))
            for k in templist:
                ImageData = DrivingConditionData[DrivingConditionData['ImageID']==k]

                mod=list(pd.unique(ImageData.Model))[0]# model
                weather =list(pd.unique(ImageData['Driving Condition']))[0]#driving Condition
                #weather = "Day1"
                pic = list(pd.unique(ImageData.ImageID))[0].split('.')[0] #image
                directory ="samples-1k/Driving_condition/"+weather+"/predicted/"+mod+"/"
                #create the directory if it doesn't exist
                import os, errno
                try:
                    os.makedirs(directory)
                except OSError as e:
                    if e.errno != errno.EEXIST:
                        raise
                #save_address ="/sample-1k/Driving_condition/Predictions/"+mod+"/"+weather+"/"+pic+".txt" #prediction file
                ImageData[col_list].to_csv(directory+pic+".txt", header=None, index=None, sep=' ')
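
Each generated file contains one line per detection for that image. Purely as an illustration (the numbers are invented), a prediction file such as s4.txt could look like:

3 0.98 0.41 0.62 0.55 0.83
1 0.87 0.10 0.58 0.14 0.71

Here the first column is the numeric COCO class id that this code writes, followed by the confidence and the normalized box coordinates in the order left, top, right, bottom.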

Generate Ground-Truth Files

import os
import glob
import pandas as pd
import xml.etree.ElementTree as ET
# This function takes a folder of ground-truth XML files and returns the data in a dataframe.
def xml_to_csv(path):
    xml_list = []
    for xml_file in glob.glob(path + '/*.xml'):
        tree = ET.parse(xml_file)
        root = tree.getroot()
        for member in root.findall('object'):
            value = (root.find('filename').text,
                     int(root.find('size')[0].text),
                     int(root.find('size')[1].text),
                     member[0].text,
                     int(member[4][0].text),
                     int(member[4][1].text),
                     int(member[4][2].text),
                     int(member[4][3].text)
                     )
            xml_list.append(value)
    column_name = ['filename', 'width', 'height', 'class', 'xmin', 'ymin', 'xmax', 'ymax']
    xml_df = pd.DataFrame(xml_list, columns=column_name)
    classes = pd.read_csv('coco_object_labels.csv')
    classid =[]
    # add class id from the coco labels
    for i in list(xml_df['class']):
        #print(int(classes[classes.name==i].id))
        classid =classid+[int(classes[classes.name==i].id)]

    xml_df['id'] =classid
    # scaling the x and y coordinates for the ground truth file by the image dimensions
    xml_df["xmin_scaled"] = xml_df.xmin/xml_df.width
    xml_df["ymin_scaled"] = xml_df.ymin/xml_df.height
    xml_df["xmax_scaled"] = xml_df.xmax/xml_df.width
    xml_df["ymax_scaled"] = xml_df.ymax/xml_df.height
    return xml_df
# https://github.com/Cartucho/mAP#create-the-ground-truth-files
# This code block generates ground-truth files in the format below for individual images:
# <class_id> <left> <top> <right> <bottom>

def GroundTruth2txt(GroundTruth,folder= "samples-1k/Driving_condition/Snowy"):
    directory = folder+'/ground-truth/'
    temp = pd.DataFrame(GroundTruth.copy())
    #temp = tempX[tempX.Confidence != 0]
    imageslist = pd.unique(temp['filename']) #unique images
    col_list = ['id', 'xmin_scaled', 'ymin_scaled','xmax_scaled','ymax_scaled'] # updated

    for i in imageslist:
        imageData = temp[temp['filename']==i]  #subset data by image
        filename = i.split('.jpg')[0]+'.txt' #get the ground truth filename
        import os, errno
        try:
            os.makedirs(directory)
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise
        imageData[col_list].to_csv(directory+filename, header=None, index=None, sep=' ')
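
# As an illustration (values invented), a ground-truth file such as
# ground-truth/s4.txt written by this function contains one line per labeled object:
#
#   3 0.40 0.60 0.56 0.85
#   1 0.09 0.57 0.15 0.70
#
# i.e. <class_id> <xmin_scaled> <ymin_scaled> <xmax_scaled> <ymax_scaled>,
# matching the column order in col_list above.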
###################################################################
# This is the main function; it runs all the functions in the notebook.
###################################################################
import glob

folders = ['samples-1k/Driving_condition/custom/']

condition = ['custom']#, 'Night', 'Snowy']

def main(folder =folders[0]):
    Run_inference() # Run inference
    PredictionsFiles()
    image_path = os.path.join(os.getcwd(), folder+'/ground-truth-xml')
    GroundTruth = xml_to_csv(image_path)
    foldername = str(folder.split('/')[2])
    GroundTruth.to_csv(folder+'/'+foldername+'_groundTruthlabels.csv', index=None)
    print('Successfully converted groundtruth xml to txt. You are ready to run mAP')
    GroundTruth2txt(GroundTruth, folder) #

########################    
main(folders[0])

******************************************
*            Inference Time              *
******************************************
s4.jpg :  faster_rcnn_inception_v2_coco : 7.896 Seconds
s40.jpg :  faster_rcnn_inception_v2_coco : 7.635 Seconds
s41.jpg :  faster_rcnn_inception_v2_coco : 7.728 Seconds
s42.jpg :  faster_rcnn_inception_v2_coco : 8.06 Seconds
s43.jpg :  faster_rcnn_inception_v2_coco : 7.636 Seconds
s44.jpg :  faster_rcnn_inception_v2_coco : 7.558 Seconds
s45.jpg :  faster_rcnn_inception_v2_coco : 7.813 Seconds
s46.jpg :  faster_rcnn_inception_v2_coco : 7.488 Seconds
s47.jpg :  faster_rcnn_inception_v2_coco : 8.033 Seconds
s48.jpg :  faster_rcnn_inception_v2_coco : 7.611 Seconds
s49.jpg :  faster_rcnn_inception_v2_coco : 7.818 Seconds
******************************************
Successfully converted groundtruth xml to txt. You are ready to run mAP

(The notebook renders each test image here with the predicted bounding boxes, class labels, and confidence scores overlaid.)