-1

问题

我想在我的应用程序中以 60 FPS 的速度全屏显示图像(1920 x 1080),但我实际得到的接近 15 FPS。

我试过的

这是我能够获得的最简化的代码版本,它仍然会产生我上面描述的问题。

// my code

import os
import sys
import cv2
import time
import pygame
import random
import numpy as np
import importlib.util
from threading import Thread
from imutils.video import VideoStream

### CLASSES ###

# Define VideoStream class to handle streaming of video from webcam in separate processing thread
# NOTE: this definition replaces the VideoStream name imported from imutils.video at the top of the file.
class VideoStream:
    """Camera object that reads frames from an OpenCV capture device.

    start() launches a thread that keeps reading frames; read() returns the
    most recent successfully captured frame without waiting for the camera.
    """

    def __init__(self, resolution = (640, 360), framerate = 30):
        """Open capture device 0 and read the first frame.

        Args:
            resolution: Requested frame size as (width, height).
            framerate: Value stored in ``self.fps``. It is only recorded; this
                class does not pass it on to the capture device.
        """
        # Open the capture device (index 0)
        self.stream = cv2.VideoCapture(0)

        # Resolution of the video stream (w, h)
        self.resolution = int(resolution[0]), int(resolution[1])

        # Request MJPG frames at the configured resolution. set() only reports
        # success through its return value, which is not checked here.
        self.stream.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"MJPG"))
        self.stream.set(cv2.CAP_PROP_FRAME_WIDTH, self.resolution[0])
        self.stream.set(cv2.CAP_PROP_FRAME_HEIGHT, self.resolution[1])

        # Read first frame from the stream (self.frame is None if this read fails)
        self.grabbed, self.frame = self.stream.read()

        # Variable to control when the camera is stopped
        self.stopped = False

        # Variables to monitor frame rate
        self.fps = framerate
        self.frequency = cv2.getTickFrequency()
        self.t1 = 0
        self.t_delta = 0

    def start(self):
        """Start the thread that reads frames from the stream and return self."""
        Thread(target = self.update, args = ()).start()
        return self

    def update(self):
        """Read frames until stop() is called, then release the capture device."""
        while not self.stopped:
            grabbed, frame = self.stream.read()
            self.grabbed = grabbed
            # Keep the last good frame when a read fails, so read() never
            # replaces a valid image with None.
            if grabbed:
                self.frame = frame

        # Close camera resources
        self.stream.release()

    def read(self):
        """Return the most recent successfully captured frame."""
        return self.frame

    def stop(self):
        """Signal the reader thread to stop and release the camera."""
        self.stopped = True

class ObjectDetector:
    """Computer vision object that detects shuffleboard pucks in video frames.

    Frames are pulled from a VideoStream and run through a TensorFlow Lite
    model on a separate thread. The latest frame and the latest
    "<blue>-<red>" score string are available through read().
    """

    def __init__(self, model_path: str = None, graph_name: str = "detect.tflite", labelmap_name: str = "labelmap.txt",
                 use_TPU = False, minimum_confidence_threshold: float = 0.5, videostream: "VideoStream" = None):
        """Store the configuration and load the model.

        Args:
            model_path: Directory holding the model and labelmap files. It is
                joined onto the current working directory; None means the
                working directory itself.
            graph_name: File name of the *.tflite model.
            labelmap_name: File name of the labelmap (one label per line).
            use_TPU: Load the model through the Edge TPU delegate.
            minimum_confidence_threshold: Detections must score strictly above
                this value to be counted.
            videostream: Frame source; must provide start(), read() and stop().
        """
        # Setup default properties
        self.model_path = model_path
        self.graph_name = graph_name
        self.labelmap_name = labelmap_name
        self.use_TPU = use_TPU
        self.minimum_confidence_threshold = minimum_confidence_threshold
        self.videostream = videostream

        self.frame = None
        self.score_string = ""

        self.stopped = False

        self.setup()

    def setup(self):
        """Load the labelmap and the TensorFlow Lite interpreter."""
        print("[INFO] Preparing 'ShuffleboardPucks_TFLite' model...")

        # Import TensorFlow libraries, preferring the standalone tflite_runtime package
        if importlib.util.find_spec("tflite_runtime"):
            from tflite_runtime.interpreter import Interpreter
            if self.use_TPU:
                from tflite_runtime.interpreter import load_delegate
        else:
            from tensorflow.lite.python.interpreter import Interpreter
            if self.use_TPU:
                from tensorflow.lite.python.interpreter import load_delegate

        # If using Edge TPU and the model name was left at its default,
        # switch to the default Edge TPU model name
        if self.use_TPU and self.graph_name == "detect.tflite":
            self.graph_name = "edgetpu.tflite"

        # os.path.join() raises TypeError on None, so treat a missing
        # model_path as "the current working directory"
        model_dir = os.path.join(os.getcwd(), self.model_path or "")
        # Path to *.tflite file, which contains the model that is used for object detection
        ckpt_path = os.path.join(model_dir, self.graph_name)
        # Path to labelmap file
        label_path = os.path.join(model_dir, self.labelmap_name)

        # Load the labelmap
        with open(label_path, "r") as f:
            self.labels = [line.strip() for line in f]

        # Have to do a weird fix for label map if using the COCO "starter model" from
        # https://www.tensorflow.org/lite/models/object_detection/overview
        # First label is '???', which has to be removed.
        if self.labels and self.labels[0] == "???":
            del self.labels[0]

        # Load the Tensorflow Lite model
        # If using Edge TPU, use special load_delegate argument
        if self.use_TPU:
            self.interpreter = Interpreter(model_path = ckpt_path, experimental_delegates = [load_delegate("libedgetpu.so.1.0")])
        else:
            self.interpreter = Interpreter(model_path = ckpt_path)

        self.interpreter.allocate_tensors()

        # Get model details
        self.input_details = self.interpreter.get_input_details()
        self.output_details = self.interpreter.get_output_details()
        self.model_height = self.input_details[0]["shape"][1]
        self.model_width = self.input_details[0]["shape"][2]

        # A float32 input tensor means pixel values are normalized before inference
        self.is_floating_model = (self.input_details[0]["dtype"] == np.float32)

        self.input_mean = 127.5
        self.input_std = 127.5

    def detect(self):
        """Start the video stream and the detection thread, then return self."""
        print("[INFO] Starting video stream...")

        self.videostream.start()
        # time.sleep(2) # TODO Determine whether this is necessary

        Thread(target = self.update, args = ()).start()
        return self

    def update(self):
        """Run detection on the latest frame in a loop until stop() is called."""
        while not self.stopped:
            # Grab frame from video stream
            frame = self.videostream.read()
            if frame is None:
                # No frame captured yet; wait briefly instead of crashing in
                # cv2.cvtColor or spinning at full speed.
                time.sleep(0.005)
                continue
            self.frame = frame

            # Recolor and resize frame to expected shape [1xHxWx3]
            frame_recolored = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame_resized = cv2.resize(frame_recolored, (self.model_width, self.model_height))
            input_data = np.expand_dims(frame_resized, axis = 0)

            # Normalize pixel values if using a floating model (i.e. if model is non-quantized)
            if self.is_floating_model:
                input_data = (np.float32(input_data) - self.input_mean) / self.input_std

            # Perform the actual detection by running the model with the image as input
            self.interpreter.set_tensor(self.input_details[0]["index"], input_data)
            self.interpreter.invoke()

            # Retrieve detection results
            classes = self.interpreter.get_tensor(self.output_details[1]["index"])[0] # Class index of detected objects
            scores = self.interpreter.get_tensor(self.output_details[2]["index"])[0] # Confidence of detected objects

            # This captures the latest score based on object detection results
            blue_score, red_score = self._count_pucks(classes, scores)
            self.score_string = str(blue_score) + "-" + str(red_score)

        # Do a bit of cleanup once the user has asked to stop
        cv2.destroyAllWindows()
        self.videostream.stop()

    def _count_pucks(self, classes, scores):
        """Return (blue, red) counts of sufficiently confident detections.

        A detection counts when its score is above the minimum confidence
        threshold and at most 1.0. Class indices that do not exist in the
        labelmap are ignored.
        """
        blue_score, red_score = 0, 0
        label_count = len(self.labels)

        for class_index, score in zip(classes, scores):
            if not (self.minimum_confidence_threshold < score <= 1.0):
                continue

            # Skip indices outside the labelmap rather than raising IndexError
            # (or silently wrapping around on a negative index)
            label_index = int(class_index)
            if not 0 <= label_index < label_count:
                continue

            object_name = self.labels[label_index]
            if object_name == "Blue Shuffleboard Puck":
                blue_score += 1
            elif object_name == "Red Shuffleboard Puck":
                red_score += 1

        return blue_score, red_score

    def stop(self):
        """Signal the detection loop and thread to stop."""
        self.stopped = True

    def read(self):
        """Return the most recent frame and score string as a tuple."""
        return self.frame, self.score_string


### HELPER FUNCTIONS ###

# Distance between two values, as an absolute difference
def calculateDistance(ptA, ptB):
    """Return the absolute difference between ``ptA`` and ``ptB``.

    No unit conversion is applied: the result is in the same unit as the inputs.
    """
    difference = ptA - ptB
    return abs(difference)

# Build a Pygame image from an image array
def convertToPygameImage(frame):
    """Return a Pygame surface created from the bytes of ``frame``.

    The first two entries of ``frame.shape`` are passed to Pygame in reversed
    order, and the pixel data is declared as "RGB".
    Assumes ``frame`` is a (height, width, 3) array already in RGB channel
    order -- TODO confirm against callers.
    """
    pixel_bytes = frame.tobytes()
    size = frame.shape[1::-1]
    return pygame.image.frombuffer(pixel_bytes, size, "RGB")


### MAIN THREAD ###

# Initialize the video stream using the PiCamera
resolution = (640, 360)

videostream = VideoStream(resolution = resolution, framerate = 60) # NOTE: Might have to use 30 instead of 60

# Initialize the object detection with the video stream's images
model_path = "/home/pi/tflite1/ShuffleboardPucks_TFLite_model_(ssd_mobilenet_v2_quantized_300x300_coco)"
graph_name = "detect.tflite"
labelmap_name = "labelmap.txt"

shuffleboard_detector = ObjectDetector(model_path, graph_name, labelmap_name, use_TPU = True, minimum_confidence_threshold = 0.5, videostream = videostream)
shuffleboard_detector.detect()


# Initialize Pygame
print("[INFO] Initializing Pygame...")

pygame.init()
clock = pygame.time.Clock()

# Setup Window
win = pygame.display.set_mode((0, 0), pygame.RESIZABLE)
win_W, win_H = pygame.display.get_surface().get_size()
pygame.display.set_caption("Prototype")

# Loop indefinitely over object detection frames to display live shuffleboard score
playing_shuffleboard = True
while playing_shuffleboard:

    # Handle the Pygame events: closing the window or pressing 'q' stops the
    # object detection thread, the videostream thread, and this loop
    for event in pygame.event.get():
        quit_requested = event.type == pygame.QUIT or (event.type == pygame.KEYDOWN and event.key == pygame.K_q)
        if quit_requested:
            playing_shuffleboard = False
            shuffleboard_detector.stop()
            break

    # Do not render another frame once shutdown has been requested
    if not playing_shuffleboard:
        break

    clock.tick(60)

    # Get the latest camera frame and the score for both red and blue teams
    detection_frame, score_string = shuffleboard_detector.read()

    # The detector has no frame until its first read succeeds
    if detection_frame is None:
        continue

    # Convert the color order on the small camera frame first, then upscale:
    # converting the small frame is cheaper than converting the upscaled one.
    frame_recolored = cv2.cvtColor(detection_frame, cv2.COLOR_BGR2RGB)
    # Scale to the size of the display surface created above
    frame_resized = cv2.resize(frame_recolored, (win_W, win_H))
    pygame_image = convertToPygameImage(frame_resized)
    win.blit(pygame_image, (0, 0))

    pygame.display.update()

# Do a bit of cleanup
pygame.quit()
sys.exit()

基本上,代码所做的是(1)创建一个线程,用于通过 cv2.VideoCapture() 以 640 x 360 分辨率从我的 Raspberry Pi 4 PiCamera 捕获视频流,(2)然后,创建另一个线程,对这些视频流图像执行对象检测并输出带注释的图像,(3)最后,使用 cv2.resize() 将该图像的大小调整为全屏 (1920 x 1080),然后通过主线程中的 Pygame blit() 函数显示。

与 1920 x 1080 相比,将最终输出分辨率更改为 640 x 360 可以把 FPS 提高到 40 左右;但是,我希望生成的图像全屏显示,而不是 640 x 360。

我的研究

《在 OpenCV Python 中更快地调整图像大小》这个问题描述了类似的情况,但我相信我的问题有所不同,因为我的任务处于一个 while 循环中,该循环旨在无限期地运行直到关闭。

问题

所以我的基本问题是,有没有一种替代方法可以用来拍摄最初的小图像(640 x 360),对其执行对象检测,并将其调整为更大(1920 x 1080)以全屏显示,同时实现接近 60 的 FPS?

4

1 回答 1

2

也许 cv.pyrUp 比一般的 cv.resize 要快。

但是,您的代码可能很慢,因为您不只是在升级……您正在运行神经网络!

你应该预料到你做的所有其他事情也需要一些时间。

您还应该预料到其他代码的耗时与输入的大小成正比,也就是说,如果您将输入放大 9 倍(640x360 -> 1920x1080),那么这本身就足以解释速度的下降。

您应该寻找“分析”(profiling)您的代码的方法。分析意味着测量每条指令的执行时间。一种不太严格的方法是在不同的代码块前后调用 time.perf_counter() 并计算时间差。

于 2021-08-26T16:15:42.360 回答