Asynchronous Filter
This chapter of the tutorial shows a pattern for performing expensive processing asynchronously while keeping the video stream smooth and fast.
We start with the result of the previous chapter and modify the filter to use one of the OpenCV example HOG detectors for person detection.
The detector is simple, and you can test it with your webcam by moving around in front of it, which makes it a good example for this chapter. Person detection is also expensive enough on current hardware to expose the problem with the synchronous approach.
The code examples included in this chapter are available as ready-to-run examples under examples/tutorial/03_*.py.
Try synchronous filter first
Apart from a bit of renaming, we modify the filter to do two things:
- Detect people on the input image
- Draw the detected people on a copy of the input image (this is our output image)
#!/usr/bin/env python3
# PYTHON_ARGCOMPLETE_OK
import cv2

from async2v.application import Application
from async2v.cli import ApplicationLauncher
from async2v.components.base import EventDrivenComponent
from async2v.components.opencv.video import VideoSource, Frame
from async2v.components.pygame.display import OpenCvDebugDisplay, OpenCvDisplay
from async2v.components.pygame.main import MainWindow
from async2v.event import OPENCV_FRAME_EVENT
from async2v.fields import Latest, Output


class PersonDetectFilter(EventDrivenComponent):

    def __init__(self, input_key: str, output_key: str):
        self.input: Latest[Frame] = Latest(key=input_key, trigger=True)
        self.output: Output[Frame] = Output(key=output_key)
        self.debug_output: Output[Frame] = Output(key=OPENCV_FRAME_EVENT)
        self.hog = cv2.HOGDescriptor()
        self.hog.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())

    async def process(self) -> None:
        # The detection runs directly in process(), i.e. synchronously
        people, weights = self.hog.detectMultiScale(self.input.value.image, scale=1.01)
        # Draw on a copy, so the input image stays untouched
        output_image = self.input.value.image.copy()
        for (x, y, w, h) in people:
            cv2.rectangle(output_image, (x, y), (x + w, y + h), (255, 255, 255), 2)
        output_frame = Frame(output_image, source=self.id)
        self.output.push(output_frame)
        self.debug_output.push(output_frame)


class Launcher(ApplicationLauncher):

    def __init__(self):
        super().__init__()
        self.add_configurator(MainWindow.configurator())
        self.add_configurator(VideoSource.configurator())

    def register_application_components(self, args, app: Application):
        displays = [
            OpenCvDisplay('drawn_people'),
            OpenCvDebugDisplay(),
        ]
        main_window = MainWindow(displays, config=MainWindow.configurator().config_from_args(args))
        video_source = VideoSource(config=VideoSource.configurator().config_from_args(args))
        person_detect_filter = PersonDetectFilter('source', 'drawn_people')
        app.register(main_window, video_source, person_detect_filter)


def main():
    Launcher().main()


if __name__ == '__main__':
    main()
So far, this is not very different from our previous example. But if you run the example, you should notice that the video stream no longer runs smoothly. When you switch to debug mode, you should see the reason: the processing time of the PersonDetectFilter is very high. You should also notice that the processing times of the other components increase. All components share the same thread, so every computation done directly in one of the process() methods takes a bite out of the CPU time available to all components.
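The effect can be reproduced without async2v or OpenCV. The following is a minimal sketch using plain asyncio, where `time.sleep` stands in for the expensive detection call and a hypothetical `ticker` coroutine stands in for the other components. None of these names are part of async2v.

```python
import asyncio
import time


async def ticker(timestamps, stop):
    # Stand-in for a lightweight component: records every time it gets to run.
    while not stop.is_set():
        timestamps.append(time.monotonic())
        await asyncio.sleep(0.01)


async def blocking_component():
    # Stand-in for an expensive synchronous call made directly in process().
    time.sleep(0.2)  # blocks the event loop thread, not just this coroutine


async def measure_max_gap():
    timestamps = []
    stop = asyncio.Event()
    task = asyncio.create_task(ticker(timestamps, stop))
    await asyncio.sleep(0.05)    # let the ticker run undisturbed first
    await blocking_component()   # no other coroutine can run during this call
    await asyncio.sleep(0.05)
    stop.set()
    await task
    # Longest pause between two consecutive ticks, in seconds
    return max(b - a for a, b in zip(timestamps, timestamps[1:]))


if __name__ == "__main__":
    print(f"longest pause between ticks: {asyncio.run(measure_max_gap()):.3f}s")
```

Although the ticker asks to run every 10 ms, its longest pause is at least as long as the blocking call, because both coroutines share one thread.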
To solve this issue, we need to do two things.
Run the expensive operation in a separate thread
First, we need to run the expensive detectMultiScale call in a separate thread, allowing the asyncio main loop to continue and process the other components. For this, async2v has a convenience utility method run_in_executor, which is basically just a very thin wrapper around the corresponding asyncio method:
#!/usr/bin/env python3
# PYTHON_ARGCOMPLETE_OK
import cv2

from async2v.application import Application
from async2v.cli import ApplicationLauncher
from async2v.components.base import EventDrivenComponent
from async2v.components.opencv.video import VideoSource, Frame
from async2v.components.pygame.display import OpenCvDebugDisplay, OpenCvDisplay
from async2v.components.pygame.main import MainWindow
from async2v.event import OPENCV_FRAME_EVENT
from async2v.fields import Latest, Output
from async2v.util import run_in_executor


class PersonDetectFilter(EventDrivenComponent):

    def __init__(self, input_key: str, output_key: str):
        self.input: Latest[Frame] = Latest(key=input_key, trigger=True)
        self.output: Output[Frame] = Output(key=output_key)
        self.debug_output: Output[Frame] = Output(key=OPENCV_FRAME_EVENT)
        self.hog = cv2.HOGDescriptor()
        self.hog.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())

    async def process(self) -> None:
        # Hand the expensive call over to a separate thread and wait for the result
        people, weights = await run_in_executor(self.detect, self.input.value.image)
        output_image = self.input.value.image.copy()
        for (x, y, w, h) in people:
            cv2.rectangle(output_image, (x, y), (x + w, y + h), (255, 255, 255), 2)
        output_frame = Frame(output_image, source=self.id)
        self.output.push(output_frame)
        self.debug_output.push(output_frame)

    def detect(self, image):
        return self.hog.detectMultiScale(image, scale=1.01)


class Launcher(ApplicationLauncher):

    def __init__(self):
        super().__init__()
        self.add_configurator(MainWindow.configurator())
        self.add_configurator(VideoSource.configurator())

    def register_application_components(self, args, app: Application):
        displays = [
            OpenCvDisplay('drawn_people'),
            OpenCvDebugDisplay(),
        ]
        main_window = MainWindow(displays, config=MainWindow.configurator().config_from_args(args))
        video_source = VideoSource(config=VideoSource.configurator().config_from_args(args))
        person_detect_filter = PersonDetectFilter('source', 'drawn_people')
        app.register(main_window, video_source, person_detect_filter)


def main():
    Launcher().main()


if __name__ == '__main__':
    main()
This should already be a huge improvement: while the person detection is running, the application can now keep running the process steps of the other components, in particular updating the main window and reading from the video source.
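The underlying mechanism can be sketched with plain asyncio. This assumes that the asyncio method being wrapped is `loop.run_in_executor`. The sketch calls it directly and is not async2v's implementation. `expensive_detection` and `ticker` are hypothetical stand-ins for the detector and for the other components.

```python
import asyncio
import time


def expensive_detection(image):
    # Stand-in for hog.detectMultiScale: blocks the thread it runs on.
    time.sleep(0.2)
    return [(10, 20, 30, 40)], [0.9]


async def ticker(ticks, stop):
    # Stand-in for the other components that should keep running.
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def detect_without_blocking():
    loop = asyncio.get_running_loop()
    ticks = []
    stop = asyncio.Event()
    task = asyncio.create_task(ticker(ticks, stop))
    # Passing None selects the loop's default thread pool executor.
    people, weights = await loop.run_in_executor(None, expensive_detection, "image")
    stop.set()
    await task
    return people, len(ticks)


if __name__ == "__main__":
    people, tick_count = asyncio.run(detect_without_blocking())
    print(f"detected {people}, ticker ran {tick_count} times in the meantime")
```

Here the ticker keeps running many times while the detection is in progress, because the blocking call runs on a worker thread and the coroutine only awaits its result.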
But we can do better.
Extract the expensive operation into an asynchronous component
The framerate of the main display is still limited to the framerate of the PersonDetectFilter.
This bottleneck can be bypassed by reusing the last known detected persons until new ones arrive, while still drawing them on the current video images as they arrive. For this, we need to separate detection and drawing of persons into two components: an asynchronous PersonDetectFilter and a synchronous PersonDrawFilter.
#!/usr/bin/env python3
# PYTHON_ARGCOMPLETE_OK
from typing import List

import cv2

from async2v.application import Application
from async2v.cli import ApplicationLauncher
from async2v.components.base import EventDrivenComponent
from async2v.components.opencv.video import VideoSource, Frame
from async2v.components.pygame.display import OpenCvDebugDisplay, OpenCvDisplay
from async2v.components.pygame.main import MainWindow
from async2v.event import OPENCV_FRAME_EVENT
from async2v.fields import Latest, Output
from async2v.util import run_in_executor


class PersonDetectFilter(EventDrivenComponent):

    def __init__(self, input_key: str, output_key: str):
        self.input: Latest[Frame] = Latest(key=input_key, trigger=True)
        self.output: Output[List] = Output(key=output_key)
        self.hog = cv2.HOGDescriptor()
        self.hog.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())

    async def process(self) -> None:
        people, weights = await run_in_executor(self.detect, self.input.value.image)
        # Only the detections are pushed, no frame
        self.output.push(people)

    def detect(self, image):
        return self.hog.detectMultiScale(image, scale=1.01)


class PersonDrawFilter(EventDrivenComponent):

    def __init__(self, input_key: str, people_key: str, output_key: str):
        self.input: Latest[Frame] = Latest(key=input_key, trigger=True)
        # Non-triggering input: holds the last known detections
        self.people: Latest[List] = Latest(key=people_key)
        self.output: Output[Frame] = Output(key=output_key)
        self.debug_output: Output[Frame] = Output(key=OPENCV_FRAME_EVENT)

    async def process(self) -> None:
        output_image = self.input.value.image.copy()
        if self.people.value is not None:
            for (x, y, w, h) in self.people.value:
                cv2.rectangle(output_image, (x, y), (x + w, y + h), (255, 255, 255), 2)
        output_frame = Frame(output_image, source=self.id)
        self.output.push(output_frame)
        self.debug_output.push(output_frame)


class Launcher(ApplicationLauncher):

    def __init__(self):
        super().__init__()
        self.add_configurator(MainWindow.configurator())
        self.add_configurator(VideoSource.configurator())

    def register_application_components(self, args, app: Application):
        displays = [
            OpenCvDisplay('drawn_people'),
            OpenCvDebugDisplay(),
        ]
        main_window = MainWindow(displays, config=MainWindow.configurator().config_from_args(args))
        video_source = VideoSource(config=VideoSource.configurator().config_from_args(args))
        person_detect_filter = PersonDetectFilter('source', 'people')
        person_draw_filter = PersonDrawFilter('source', 'people', 'drawn_people')
        app.register(main_window, video_source, person_detect_filter, person_draw_filter)


def main():
    Launcher().main()


if __name__ == '__main__':
    main()
Now we get a smooth video stream, no matter how slow the person detector runs. Every time the detection finishes, a new list of people is pushed to the people event key. From then on, the PersonDrawFilter uses that list to draw the rectangles on the video frames. The only downside is that outdated rectangles are drawn onto the video stream. But usually, a smooth video stream is preferable.
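The trade-off between smoothness and staleness can be illustrated with a small simulation in plain asyncio. This is only a sketch under stated assumptions: the timings are arbitrary, `run_pipeline` is a hypothetical name, and starting a new detection only once the previous one has finished is an assumption about scheduling, not a statement about async2v internals.

```python
import asyncio


async def run_pipeline(num_frames=20, frame_interval=0.01, detect_time=0.05):
    latest_detection_frame = None   # frame number the last finished detection is based on
    detection_running = False
    drawn = []                      # (frame number, frame number of the detection drawn on it)

    async def detect(frame_number):
        nonlocal latest_detection_frame, detection_running
        await asyncio.sleep(detect_time)        # stand-in for the slow detector
        latest_detection_frame = frame_number   # publish the new result
        detection_running = False

    tasks = []
    for frame_number in range(num_frames):
        # Assumption: a new detection starts only when the previous one is done.
        if not detection_running:
            detection_running = True
            tasks.append(asyncio.create_task(detect(frame_number)))
        # Every frame is drawn immediately with whatever result is known so far.
        drawn.append((frame_number, latest_detection_frame))
        await asyncio.sleep(frame_interval)     # wait for the next frame
    await asyncio.gather(*tasks)
    return drawn


if __name__ == "__main__":
    for frame_number, detection_frame in asyncio.run(run_pipeline()):
        print(f"frame {frame_number}: rectangles from frame {detection_frame}")
```

Every frame produces an output, but the rectangles drawn on it always stem from an earlier frame, and the first frames have no rectangles at all.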
There are a few more things to notice:
- The PersonDrawFilter now has two input fields: a triggering one and a non-triggering one. This is intentional: we always want to process every input image when it arrives, but we don’t need to send an extra frame when a detection step is complete.
- There is a None check for the people input. This is necessary, as the processing step can be triggered before the first people event has arrived. In that case, people.value is None.
- The PersonDetectFilter does not push frames to the debug event key, as it does not produce frames directly. The debug display would not know how to handle the list of people pushed to the people event key.
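The first two points can be sketched with a toy model in plain Python. `ToyDrawComponent` and its methods are hypothetical and only mimic the behavior described above. They are not async2v's `Latest` field or its event dispatching.

```python
class ToyDrawComponent:
    """Toy stand-in for a component with one triggering and one non-triggering input."""

    def __init__(self):
        self.frame = None    # triggering input
        self.people = None   # non-triggering input, None until the first event arrives
        self.outputs = []

    def on_frame(self, frame):
        # A new frame is stored and triggers a processing step.
        self.frame = frame
        self.process()

    def on_people(self, people):
        # New detections are only stored; no processing step is triggered.
        self.people = people

    def process(self):
        # The None check covers frames that arrive before the first detection.
        boxes = self.people if self.people is not None else []
        self.outputs.append((self.frame, list(boxes)))


if __name__ == "__main__":
    component = ToyDrawComponent()
    component.on_frame("frame 0")            # processed without rectangles
    component.on_people([(10, 20, 30, 40)])  # stored, produces no output
    component.on_frame("frame 1")            # processed with the stored rectangles
    print(component.outputs)
```

Two frames yield exactly two outputs. The detection event in between changes what is drawn on the next frame, but does not emit a frame of its own.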
Draw a component diagram
We see that the synchronous PersonDrawFilter lies on the path of our video live stream, while the asynchronous PersonDetectFilter is bypassed.