event_bus
Adapted from the Griptape AI Framework documentation.
EventBus = _EventBus()
module-attribute
_event_listeners = ContextVar('event_listeners', default=None)
module-attribute
Bases:
SingletonMixin
Source Code in griptape/events/event_bus.py
@define
class _EventBus(SingletonMixin):
    """Event bus whose listener list is kept in the ``_event_listeners`` context variable.

    The instance holds no listener state of its own: every read and write of
    ``event_listeners`` goes through ``_event_listeners.get()`` / ``.set()``.
    """

    @property
    def event_listeners(self) -> list[EventListener]:
        """Return the listener list stored in the context variable.

        If the variable still holds ``None``, a new empty list is stored
        first and that same list is returned.
        """
        current = _event_listeners.get()
        if current is not None:
            return current
        current = []
        _event_listeners.set(current)
        return current

    @event_listeners.setter
    def event_listeners(self, event_listeners: list[EventListener]) -> None:
        """Store ``event_listeners`` as the new listener list."""
        _event_listeners.set(event_listeners)

    def add_event_listeners(self, event_listeners: list[EventListener]) -> list[EventListener]:
        """Register each listener in order and return them as a list."""
        return list(map(self.add_event_listener, event_listeners))

    def remove_event_listeners(self, event_listeners: list[EventListener]) -> None:
        """Unregister each of the given listeners in order."""
        for listener in event_listeners:
            self.remove_event_listener(listener)

    def add_event_listener(self, event_listener: EventListener) -> EventListener:
        """Register ``event_listener`` unless it is already present; return it either way."""
        current = self.event_listeners
        if event_listener in current:
            return event_listener
        # Rebind to a new list rather than appending in place, so the list
        # object that was stored before is left unmodified.
        self.event_listeners = current + [event_listener]
        return event_listener

    def remove_event_listener(self, event_listener: EventListener) -> None:
        """Unregister ``event_listener``; do nothing when it is not registered."""
        current = self.event_listeners
        if event_listener not in current:
            return
        # Rebind to a filtered copy; every entry equal to the listener is dropped.
        self.event_listeners = [kept for kept in current if kept != event_listener]

    def publish_event(self, event: BaseEvent, *, flush: bool = False) -> None:
        """Forward ``event`` (and the ``flush`` flag) to every registered listener."""
        listeners = self.event_listeners
        for listener in listeners:
            listener.publish_event(event, flush=flush)

    def clear_event_listeners(self) -> None:
        """Replace the listener list with a new empty list."""
        self.event_listeners = list()
event_listeners
property writable
add_event_listener(event_listener)
Source Code in griptape/events/event_bus.py
def add_event_listener(self, event_listener: EventListener) -> EventListener:
    """Register ``event_listener`` unless it is already present.

    Returns:
        The listener that was passed in, whether or not it was newly added.
    """
    current = self.event_listeners
    if event_listener in current:
        return event_listener
    # Rebind to a new list rather than appending in place, so the previously
    # stored list object is left unmodified.
    self.event_listeners = current + [event_listener]
    return event_listener
add_event_listeners(event_listeners)
Source Code in griptape/events/event_bus.py
def add_event_listeners(self, event_listeners: list[EventListener]) -> list[EventListener]:
    """Register each listener in order via ``add_event_listener``.

    Returns:
        A list of the values returned by ``add_event_listener``, one per input.
    """
    return list(map(self.add_event_listener, event_listeners))
clear_event_listeners()
Source Code in griptape/events/event_bus.py
def clear_event_listeners(self) -> None:
    """Replace the listener list with a new empty list."""
    self.event_listeners = list()
publish_event(event, *, flush=False)
Source Code in griptape/events/event_bus.py
def publish_event(self, event: BaseEvent, *, flush: bool = False) -> None:
    """Forward ``event`` to every registered listener, in list order.

    Args:
        event: The event handed to each listener's ``publish_event``.
        flush: Passed through unchanged to each listener as a keyword argument.
    """
    listeners = self.event_listeners
    for listener in listeners:
        listener.publish_event(event, flush=flush)
remove_event_listener(event_listener)
Source Code in griptape/events/event_bus.py
def remove_event_listener(self, event_listener: EventListener) -> None:
    """Unregister ``event_listener``; do nothing when it is not registered."""
    current = self.event_listeners
    if event_listener not in current:
        return
    # Rebind to a filtered copy; every entry equal to the listener is dropped
    # and the previously stored list object is left unmodified.
    self.event_listeners = [kept for kept in current if kept != event_listener]
remove_event_listeners(event_listeners)
Source Code in griptape/events/event_bus.py
def remove_event_listeners(self, event_listeners: list[EventListener]) -> None:
    """Unregister each of the given listeners, in order, via ``remove_event_listener``."""
    for listener in event_listeners:
        self.remove_event_listener(listener)
Could this page be better? Report a problem or suggest an addition!