async2v.fields module

Summary

Classes:

AveragingOutput Averaging output field
Buffer Provides all events received since the last processing step
DoubleBufferedField Abstract base class for most input fields
History Provides the last maxlen events
InputField Abstract base class for all input fields
InputQueue Unmanaged input field appending incoming events to a queue
Latest Provides the latest received event to the component
LatestBy Provides the latest received event to the component by category
Output All-purpose output field

Reference

class InputField(key)

Bases: typing.Generic

Abstract base class for all input fields

set(new)

Push new event into the field.

This method is used by the framework to push events to components. You should not need to call this method from your production code.

Return type:None
class DoubleBufferedField(key, trigger=False)

Bases: async2v.fields.InputField, typing.Generic

Abstract base class for most input fields

Implements double buffering so that the content of a component's input fields stays stable during a processing step, even if new events arrive in the meantime.

Parameters:
  • key (str) – Address that connects output to input fields
  • trigger (bool) – Set to True to trigger a processing step when a new event arrives. Only allowed within an EventDrivenComponent.
switch()

Switch the double buffer.

This method is used by the framework to make new values available for the next processing step. You should not need to call this method from your production code.

Return type:None
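
The interplay of set() and switch() can be illustrated with a small, self-contained sketch. It does not use async2v itself: LatestSketch is a hypothetical stand-in written only for this illustration, and the real implementation may differ in detail (for example in how it handles locking or timestamps).

```python
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


class LatestSketch(Generic[T]):
    """Hypothetical stand-in for a double-buffered 'latest value' field."""

    def __init__(self) -> None:
        self._incoming: Optional[T] = None  # written by set(), hidden from the component
        self._visible: Optional[T] = None   # what the component reads during a step

    def set(self, new: T) -> None:
        # Called whenever an event arrives, possibly in the middle of a step.
        self._incoming = new

    def switch(self) -> None:
        # Called between processing steps to publish what has arrived so far.
        self._visible = self._incoming

    @property
    def value(self) -> Optional[T]:
        return self._visible


field = LatestSketch[int]()
field.set(1)
field.switch()                 # step 1 begins: the component sees 1
seen_at_start = field.value
field.set(2)                   # arrives while step 1 is still running
seen_at_end = field.value      # still 1, the visible buffer was not touched
field.switch()                 # step 2 begins: the component now sees 2
seen_next_step = field.value
```

The value read at the start and at the end of the first step is the same, which is the stability guarantee described above.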
class Latest(key, trigger=False)

Bases: async2v.fields.DoubleBufferedField, typing.Generic

Provides the latest received event to the component

Parameters:
  • key (str) – Address that connects output to input fields
  • trigger (bool) – Set to True to trigger a processing step when a new event arrives. Only allowed within an EventDrivenComponent.
event

Latest received event

None if no event has been received.

Return type:Event[~T]
value

Value of latest received event

None if no event has been received.

Return type:Optional[~T]
timestamp

Timestamp of the latest received event.

Return type:float
class LatestBy(key, classifier, trigger=False)

Bases: async2v.fields.DoubleBufferedField

Provides the latest received event to the component by category

Incoming events are assigned to categories by the given classifier function. For each category, the latest event is retained (the same way as in the Latest field). The latest events per category can be accessed as a dict.

Example storing the latest message of a specific length:
>>> message_by_length = LatestBy[str, int]('message', lambda m: len(m))
>>> # The following 4 lines simulate the framework delivering events
>>> message_by_length.set(Event[str]('message', 'Hello'))
>>> message_by_length.set(Event[str]('message', 'World'))
>>> message_by_length.set(Event[str]('message', 'Hello World!'))
>>> message_by_length.switch()
>>> # The latest values are available by category
>>> message_by_length.value_dict
{5: 'World', 12: 'Hello World!'}
Parameters:
  • key (str) – Address that connects output to input fields
  • classifier (Callable[[~T], ~K]) – Function to sort input events of type Event[T] into categories of type K.
  • trigger (bool) – Set to True to trigger a processing step when a new event arrives. Only allowed within an EventDrivenComponent.
events

Latest received events per category

Empty dict if no event has been received.

Return type:Dict[~K, Event[~T]]
value_dict

Values of latest received events per category

Empty dict if no event has been received.

Return type:Dict[~K, ~T]
timestamps

Timestamps of latest received events per category

Empty dict if no event has been received.

Return type:Dict[~K, float]
class Buffer(key, maxlen=None, trigger=False)

Bases: async2v.fields.DoubleBufferedField, typing.Generic

Provides all events received since the last processing step

In each processing step, the buffer contains only the events received since the previous step.

Parameters:
  • key (str) – Address that connects output to input fields
  • trigger (bool) – Set to True to trigger a processing step when a new event arrives. Only allowed within an EventDrivenComponent.
events

All events received since the last processing step in received order.

Empty list if no event has been received.

Return type:List[Event[~T]]
values

Values of all events received since the last processing step in received order.

Empty list if no event has been received.

Return type:List[~T]
timestamps

Timestamps of all events received since the last processing step in received order.

Empty list if no event has been received.

Return type:List[float]
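
The clearing behaviour of Buffer can be sketched without the framework. BufferSketch below is a hypothetical stand-in, not the async2v class; it ignores maxlen, timestamps and thread safety and only shows which values are visible in which step.

```python
from typing import List


class BufferSketch:
    """Hypothetical stand-in: exposes the values received since the last switch."""

    def __init__(self) -> None:
        self._incoming: List[str] = []
        self._visible: List[str] = []

    def set(self, new: str) -> None:
        # Arriving values are collected out of sight of the component.
        self._incoming.append(new)

    def switch(self) -> None:
        # Publish everything collected so far and start a fresh collection.
        self._visible = self._incoming
        self._incoming = []

    @property
    def values(self) -> List[str]:
        return self._visible


buf = BufferSketch()
buf.set("a")
buf.set("b")
buf.switch()
first_step = list(buf.values)    # both values, in received order
buf.set("c")
buf.switch()
second_step = list(buf.values)   # only the value that arrived after step 1
buf.switch()
third_step = list(buf.values)    # nothing arrived, so the list is empty
```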
class History(key, maxlen, trigger=False)

Bases: async2v.fields.DoubleBufferedField, typing.Generic

Provides the last maxlen events

Unlike the Buffer field, a History field may again provide events that have already been processed, because events are not cleared between processing steps. It works like a Latest field that can store more than one historic value.

Parameters:
  • key (str) – Address that connects output to input fields
  • maxlen (int) – Maximum number of events to retain
  • trigger (bool) – Set to True to trigger a processing step when a new event arrives. Only allowed within an EventDrivenComponent.
events

The last maxlen events in received order.

Empty list if no event has been received.

Return type:List[Event[~T]]
values

The values of the last maxlen events in received order.

Empty list if no event has been received.

Return type:List[~T]
timestamps

The timestamps of the last maxlen events in received order.

Empty list if no event has been received.

Return type:List[float]
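
To make the difference from Buffer concrete, here is a minimal sketch of the retention behaviour of History. HistorySketch is a hypothetical stand-in built on collections.deque; the actual async2v implementation is not shown in this reference and may be structured differently.

```python
from collections import deque
from typing import Deque, List


class HistorySketch:
    """Hypothetical stand-in: exposes the last maxlen values, never cleared."""

    def __init__(self, maxlen: int) -> None:
        self._incoming: Deque[int] = deque(maxlen=maxlen)
        self._visible: List[int] = []

    def set(self, new: int) -> None:
        # A bounded deque silently drops its oldest entry once it is full.
        self._incoming.append(new)

    def switch(self) -> None:
        # Publish a snapshot; the collected values are kept for later steps.
        self._visible = list(self._incoming)

    @property
    def values(self) -> List[int]:
        return self._visible


hist = HistorySketch(maxlen=3)
for v in (1, 2):
    hist.set(v)
hist.switch()
first_step = hist.values     # the two values received so far
for v in (3, 4):
    hist.set(v)
hist.switch()
second_step = hist.values    # the last three values, including 2 from step 1
```

The value 2 is visible in both steps, which a Buffer field would not allow.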
class InputQueue(key, maxlen=None)

Bases: async2v.fields.InputField, typing.Generic

Unmanaged input field appending incoming events to a queue

This input field is useful for components that don’t rely on the framework to trigger processing steps (subclasses of BareComponent). Internally, a deque is used, which is safe for threading scenarios (as long as you use popleft() to read events).

queue

Get the input deque, which contains incoming events of type Event. New events are appended at the end of the queue by the framework.

Return type:deque
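
The consumption pattern recommended above (reading with popleft() while another thread appends) can be sketched with the standard library alone. In this illustration, pending stands in for the deque returned by the queue property, the producer loop stands in for the framework, and plain integers stand in for Event objects; all of these names are assumptions made for the example.

```python
import threading
from collections import deque

pending: deque = deque()      # stand-in for the field's queue
received = []                 # what the consumer has read so far
done = threading.Event()      # signals that the producer has finished


def consume() -> None:
    # Keep reading until the producer is done and the deque is drained.
    while not (done.is_set() and not pending):
        try:
            # popleft() removes and returns the oldest entry atomically.
            received.append(pending.popleft())
        except IndexError:
            # Deque is currently empty: wait briefly instead of spinning.
            done.wait(0.001)


consumer = threading.Thread(target=consume)
consumer.start()
for i in range(100):
    pending.append(i)         # the framework appends at the right end
done.set()
consumer.join()
```

Because entries are appended on the right and removed on the left, the consumer sees them in arrival order.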
class Output(key)

Bases: typing.Generic

All-purpose output field

Push events here during processing. They will be routed to all relevant input fields (matched by key).

Parameters:key (str) – Address that connects output to input fields
set_queue(queue_)

This method is used by the framework to inject the central application queue. You should not need to call this method from your production code.

push(value, timestamp=None)

Push a new output value

Parameters:
  • value (~T) – Payload
  • timestamp (Optional[float]) – Will be set to current time if not given. Set this field if you want to propagate the timestamp of a source event.
Return type:None

class AveragingOutput(key, count=None, interval=None)

Bases: async2v.fields.Output, typing.Generic

Averaging output field

Collects pushed values and emits their average after every count values or once the given interval has elapsed, whichever happens first. The value type T needs to support __add__(T, T) and __truediv__(T, int) to compute a meaningful average.

An event can only be emitted during a call to push. If the interval has expired but no further values are pushed, no output is generated.

This field is intended for special use cases like metrics. For example, it is used by the framework to report performance indicators (fps, processing time).

Parameters:
  • key (str) – Address that connects output to input fields
  • count (Optional[int]) – Maximum number of values to average over
  • interval (Optional[float]) – Maximum time in seconds to collect values to average over
push(value, timestamp=None)

Push a new output value

Depending on the configuration of this field, not all pushed values cause new events to be emitted.

Parameters:
  • value (~T) – Payload
  • timestamp (Optional[float]) – Will be set to current time if not given. Set this field if you want to propagate the timestamp of a source event.
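
The count and interval rules of AveragingOutput can be approximated by the following self-contained sketch. AveragingSketch, its emit callback and its injectable clock are hypothetical and exist only for this illustration. In particular, measuring the interval from the last emission is an assumption, not something this reference states.

```python
import time
from typing import Callable, List, Optional


class AveragingSketch:
    """Hypothetical stand-in: emits an average every count values or interval seconds."""

    def __init__(self, emit: Callable[[float], None],
                 count: Optional[int] = None,
                 interval: Optional[float] = None,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._emit = emit
        self._count = count
        self._interval = interval
        self._clock = clock
        self._values: List[float] = []
        self._started = clock()   # assumption: interval runs from the last emission

    def push(self, value: float) -> None:
        self._values.append(value)
        now = self._clock()
        count_reached = self._count is not None and len(self._values) >= self._count
        expired = self._interval is not None and now - self._started >= self._interval
        if count_reached or expired:
            total = self._values[0]
            for v in self._values[1:]:
                total = total + v                   # relies on __add__
            self._emit(total / len(self._values))   # relies on __truediv__
            self._values = []
            self._started = now


# Count-based: an average is emitted on every third push.
emitted: List[float] = []
by_count = AveragingSketch(emitted.append, count=3)
for v in (1.0, 2.0, 3.0, 10.0):
    by_count.push(v)          # 10.0 stays pending until two more values arrive

# Interval-based, driven by a fake clock so the example is deterministic.
fake_now = [0.0]
timed: List[float] = []
by_time = AveragingSketch(timed.append, interval=1.0, clock=lambda: fake_now[0])
by_time.push(4.0)             # interval not yet over, nothing emitted
fake_now[0] = 1.5
by_time.push(6.0)             # interval over: the average of 4.0 and 6.0 is emitted
```

As in the description above, nothing is emitted when the interval runs out unless another value is pushed afterwards.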