Asynchronous Filter

This chapter of the tutorial shows you a pattern for performing expensive processing asynchronously while keeping the video stream smooth and fast.

We start with the result of the previous chapter and modify the filter to use OpenCV's default HOG-based people detector for person detection.

Because it is so simple, and because you can test it with your webcam by running around in front of it yourself, it serves as a perfect example for this chapter. Person detection is also computationally expensive enough on current hardware to expose the problem we get with the synchronous approach.

The code examples included in this chapter are available as ready-to-run examples under examples/tutorial/03_*.py.

Try the synchronous filter first

Apart from a bit of renaming, we modify the filter to do two things:

  • Detect people in the input image
  • Draw the detected people on a copy of the input image (this is our output image)

#!/usr/bin/env python3
# PYTHON_ARGCOMPLETE_OK
import cv2

from async2v.application import Application
from async2v.cli import ApplicationLauncher
from async2v.components.base import EventDrivenComponent
from async2v.components.opencv.video import VideoSource, Frame
from async2v.components.pygame.display import OpenCvDebugDisplay, OpenCvDisplay
from async2v.components.pygame.main import MainWindow
from async2v.event import OPENCV_FRAME_EVENT
from async2v.fields import Latest, Output


class PersonDetectFilter(EventDrivenComponent):

    def __init__(self, input_key: str, output_key: str):
        self.input: Latest[Frame] = Latest(key=input_key, trigger=True)
        self.output: Output[Frame] = Output(key=output_key)
        self.debug_output: Output[Frame] = Output(key=OPENCV_FRAME_EVENT)
        self.hog = cv2.HOGDescriptor()
        self.hog.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())

    async def process(self) -> None:
        # Run the HOG person detector on the latest frame. This call is
        # expensive and blocks the thread shared by all components.
        people, weights = self.hog.detectMultiScale(self.input.value.image, scale=1.01)
        output_image = self.input.value.image.copy()
        for (x, y, w, h) in people:
            cv2.rectangle(output_image, (x, y), (x + w, y + h), (255, 255, 255), 2)
        output_frame = Frame(output_image, source=self.id)
        self.output.push(output_frame)
        self.debug_output.push(output_frame)


class Launcher(ApplicationLauncher):

    def __init__(self):
        super().__init__()
        self.add_configurator(MainWindow.configurator())
        self.add_configurator(VideoSource.configurator())

    def register_application_components(self, args, app: Application):
        displays = [
            OpenCvDisplay('drawn_people'),
            OpenCvDebugDisplay(),
        ]
        main_window = MainWindow(displays, config=MainWindow.configurator().config_from_args(args))
        video_source = VideoSource(config=VideoSource.configurator().config_from_args(args))
        person_detect_filter = PersonDetectFilter('source', 'drawn_people')
        app.register(main_window, video_source, person_detect_filter)


def main():
    Launcher().main()


if __name__ == '__main__':
    main()

As of now, this is not so different from our previous example. But if you run it, you should notice that the video stream is no longer running smoothly. When you switch to debug mode, you can see the reason: the processing time of the PersonDetectFilter is very high. You should also notice that the processing times of the other components increase – all components share the same thread, so every computation done directly in one of the process() methods eats into the CPU time available to all components.
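To see this effect in isolation, here is a minimal, self-contained asyncio sketch (independent of async2v). The heartbeat coroutine cannot print a single tick until the blocking call has returned, because time.sleep() never yields control back to the event loop:

import asyncio
import time


async def blocking_worker():
    # Simulates an expensive synchronous computation: it occupies the
    # event loop for two full seconds without yielding.
    time.sleep(2)


async def heartbeat():
    for _ in range(5):
        print('tick')
        await asyncio.sleep(0.5)  # yields control back to the loop


async def main():
    # All ticks appear only after blocking_worker has finished.
    await asyncio.gather(blocking_worker(), heartbeat())


asyncio.run(main())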

To solve this issue, we need to do two things.

Run the expensive operation in a separate thread

First, we need to run the expensive detectMultiScale call in a separate thread, allowing the asyncio main loop to continue and process the other components. For this, async2v has a convenience utility method run_in_executor, which is basically a thin wrapper around the corresponding asyncio method.
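Conceptually, the wrapper boils down to something like the following sketch (an illustration only, assuming it delegates to asyncio's default thread pool executor – the actual async2v implementation may differ in detail):

import asyncio
import functools


async def run_in_executor(func, *args, **kwargs):
    # Hand the blocking call over to the default thread pool executor
    # and suspend this coroutine until the result is available.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, functools.partial(func, *args, **kwargs))

Offloading to a thread works well for OpenCV calls such as detectMultiScale, which run in native code and typically release Python's GIL, so the worker thread can compute in parallel with the event loop. With run_in_executor, the filter looks like this: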

#!/usr/bin/env python3
# PYTHON_ARGCOMPLETE_OK
import cv2

from async2v.application import Application
from async2v.cli import ApplicationLauncher
from async2v.components.base import EventDrivenComponent
from async2v.components.opencv.video import VideoSource, Frame
from async2v.components.pygame.display import OpenCvDebugDisplay, OpenCvDisplay
from async2v.components.pygame.main import MainWindow
from async2v.event import OPENCV_FRAME_EVENT
from async2v.fields import Latest, Output
from async2v.util import run_in_executor


class PersonDetectFilter(EventDrivenComponent):

    def __init__(self, input_key: str, output_key: str):
        self.input: Latest[Frame] = Latest(key=input_key, trigger=True)
        self.output: Output[Frame] = Output(key=output_key)
        self.debug_output: Output[Frame] = Output(key=OPENCV_FRAME_EVENT)
        self.hog = cv2.HOGDescriptor()
        self.hog.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())

    async def process(self) -> None:
        # Offload the expensive detection to a worker thread, so the
        # asyncio main loop can keep serving the other components.
        people, weights = await run_in_executor(self.detect, self.input.value.image)
        output_image = self.input.value.image.copy()
        for (x, y, w, h) in people:
            cv2.rectangle(output_image, (x, y), (x + w, y + h), (255, 255, 255), 2)
        output_frame = Frame(output_image, source=self.id)
        self.output.push(output_frame)
        self.debug_output.push(output_frame)

    def detect(self, image):
        return self.hog.detectMultiScale(image, scale=1.01)


class Launcher(ApplicationLauncher):

    def __init__(self):
        super().__init__()
        self.add_configurator(MainWindow.configurator())
        self.add_configurator(VideoSource.configurator())

    def register_application_components(self, args, app: Application):
        displays = [
            OpenCvDisplay('drawn_people'),
            OpenCvDebugDisplay(),
        ]
        main_window = MainWindow(displays, config=MainWindow.configurator().config_from_args(args))
        video_source = VideoSource(config=VideoSource.configurator().config_from_args(args))
        person_detect_filter = PersonDetectFilter('source', 'drawn_people')
        app.register(main_window, video_source, person_detect_filter)


def main():
    Launcher().main()


if __name__ == '__main__':
    main()

This should already be a huge improvement: the application can now continue to run the process steps of the other components while the person detection is running – in particular, it keeps updating the main window and reading from the video source.

But we can do better.

Extract the expensive operation into an asynchronous component

The framerate of the main display is still limited by the framerate of the PersonDetectFilter. We can bypass this bottleneck by reusing the last known detection result as long as no new one has arrived, and keep drawing it on the current video frames as they come in. For this, we separate the detection and the drawing of persons into two components – an asynchronous PersonDetectFilter and a synchronous PersonDrawFilter.
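The intended event flow looks like this (with the event keys used in the code below):

VideoSource --source--> PersonDetectFilter --people--> PersonDrawFilter
VideoSource --source--> PersonDrawFilter --drawn_people--> MainWindow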

#!/usr/bin/env python3
# PYTHON_ARGCOMPLETE_OK
from typing import List

import cv2

from async2v.application import Application
from async2v.cli import ApplicationLauncher
from async2v.components.base import EventDrivenComponent
from async2v.components.opencv.video import VideoSource, Frame
from async2v.components.pygame.display import OpenCvDebugDisplay, OpenCvDisplay
from async2v.components.pygame.main import MainWindow
from async2v.event import OPENCV_FRAME_EVENT
from async2v.fields import Latest, Output
from async2v.util import run_in_executor


class PersonDetectFilter(EventDrivenComponent):

    def __init__(self, input_key: str, output_key: str):
        self.input: Latest[Frame] = Latest(key=input_key, trigger=True)
        self.output: Output[List] = Output(key=output_key)
        self.hog = cv2.HOGDescriptor()
        self.hog.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())

    async def process(self) -> None:
        people, weights = await run_in_executor(self.detect, self.input.value.image)
        # Publish only the detection result; drawing happens in PersonDrawFilter.
        self.output.push(people)

    def detect(self, image):
        return self.hog.detectMultiScale(image, scale=1.01)


class PersonDrawFilter(EventDrivenComponent):

    def __init__(self, input_key: str, people_key: str, output_key: str):
        # Triggering input: every new frame schedules a process() run.
        self.input: Latest[Frame] = Latest(key=input_key, trigger=True)
        # Non-triggering input: just remembers the latest detection result.
        self.people: Latest[List] = Latest(key=people_key)
        self.output: Output[Frame] = Output(key=output_key)
        self.debug_output: Output[Frame] = Output(key=OPENCV_FRAME_EVENT)

    async def process(self) -> None:
        output_image = self.input.value.image.copy()
        # The first frames can arrive before any detection result;
        # until then, self.people.value is None.
        if self.people.value is not None:
            for (x, y, w, h) in self.people.value:
                cv2.rectangle(output_image, (x, y), (x + w, y + h), (255, 255, 255), 2)
        output_frame = Frame(output_image, source=self.id)
        self.output.push(output_frame)
        self.debug_output.push(output_frame)


class Launcher(ApplicationLauncher):

    def __init__(self):
        super().__init__()
        self.add_configurator(MainWindow.configurator())
        self.add_configurator(VideoSource.configurator())

    def register_application_components(self, args, app: Application):
        displays = [
            OpenCvDisplay('drawn_people'),
            OpenCvDebugDisplay(),
        ]
        main_window = MainWindow(displays, config=MainWindow.configurator().config_from_args(args))
        video_source = VideoSource(config=VideoSource.configurator().config_from_args(args))
        person_detect_filter = PersonDetectFilter('source', 'people')
        person_draw_filter = PersonDrawFilter('source', 'people', 'drawn_people')
        app.register(main_window, video_source, person_detect_filter, person_draw_filter)


def main():
    Launcher().main()


if __name__ == '__main__':
    main()

Now we get a smooth video stream, no matter how slowly the person detector runs. Every time a detection finishes, a new list of people is pushed to the people event key. From then on, the PersonDrawFilter uses that list to draw the rectangles on the incoming video frames. The only downside is that slightly outdated rectangles are drawn onto the video stream. But usually, a smooth video stream is preferable.
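If the outdated rectangles are a problem for your use case, one possible mitigation is to track the age of the last detection result and stop drawing once it exceeds a threshold. Here is a sketch (not part of the tutorial code – the _last_people and _last_time attributes are hypothetical additions that would have to be initialized in __init__):

import time

MAX_DETECTION_AGE = 1.0  # seconds; tune this to your detector's latency


class PersonDrawFilter(EventDrivenComponent):
    # ... fields as before, plus in __init__:
    #     self._last_people = None
    #     self._last_time = 0.0

    async def process(self) -> None:
        output_image = self.input.value.image.copy()
        people = self.people.value
        if people is not None and people is not self._last_people:
            # A fresh detection result has arrived; remember when we got it.
            self._last_people = people
            self._last_time = time.monotonic()
        if people is not None and time.monotonic() - self._last_time < MAX_DETECTION_AGE:
            for (x, y, w, h) in people:
                cv2.rectangle(output_image, (x, y), (x + w, y + h), (255, 255, 255), 2)
        output_frame = Frame(output_image, source=self.id)
        self.output.push(output_frame)
        self.debug_output.push(output_frame)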

There are a few more things to notice:

  • The PersonDrawFilter now has two input fields – a triggering one and a non-triggering one. This is intentional: we always want to process every input image as it arrives, but we don't need to render an extra frame whenever a detection step completes.
  • There is a None check for the people input. It is necessary because the processing step can be triggered before the first people event has arrived; in that case, people.value is None.
  • The PersonDetectFilter no longer pushes frames to the debug event key, as it does not produce frames anymore – the debug display would not know how to handle the list of people pushed to the people event key.

Draw a component diagram

[Figure: component diagram of the final application (../_images/tutorial_03_03_graph.png)]

We see that the synchronous PersonDrawFilter lies on the path of our live video stream, while the asynchronous PersonDetectFilter sits off that path – it can lag behind without stalling the stream.