stream
Adapted from the Griptape AI Framework documentation.
A wrapper for Structures that filters Events relevant to text output and converts them to TextArtifacts.
Attributes
Source Code in griptape/utils/stream.py
@define
class Stream:
    """A wrapper for Structures that filters Events relevant to text output and converts them to TextArtifacts.

    Attributes:
        structure: The Structure to wrap.
        event_types: The Event types passed to the Structure's `run_stream`. Defaults to
            TextChunkEvent, ActionChunkEvent, FinishPromptEvent and FinishStructureRunEvent.
    """

    structure: Structure = field()
    event_types: list[type[BaseEvent]] = field(
        default=Factory(lambda: [TextChunkEvent, ActionChunkEvent, FinishPromptEvent, FinishStructureRunEvent])
    )

    def run(self, *args) -> Iterator[TextArtifact]:
        """Run the wrapped Structure as a stream and yield its text output piece by piece.

        Args:
            *args: Positional arguments forwarded unchanged to `structure.run_stream`.

        Yields:
            A TextArtifact holding a newline for every FinishPromptEvent, the token of
            every TextChunkEvent, and for ActionChunkEvents a "name.tag (path)" header
            (when all three fields are set) followed by the pretty-printed action input
            once the accumulated partial input parses as JSON.
        """
        # Buffer of action-input fragments that do not yet form a complete JSON document.
        action_str = ""

        for event in self.structure.run_stream(*args, event_types=self.event_types):
            if isinstance(event, FinishPromptEvent):
                yield TextArtifact(value="\n")
            elif isinstance(event, TextChunkEvent):
                yield TextArtifact(value=event.token)
            elif isinstance(event, ActionChunkEvent):
                if event.tag is not None and event.name is not None and event.path is not None:
                    yield TextArtifact(f"{event.name}.{event.tag} ({event.path})")

                if event.partial_input is not None:
                    action_str += event.partial_input

                    # Only the parse is guarded: an incomplete document is expected while
                    # chunks are still arriving. The yield stays outside the try so that
                    # exceptions thrown into the generator by the consumer are not swallowed.
                    try:
                        parsed_input = json.loads(action_str)
                    except json.JSONDecodeError:
                        # Not valid JSON yet; keep buffering until more chunks arrive.
                        pass
                    else:
                        action_str = ""
                        yield TextArtifact(json.dumps(parsed_input, indent=2))
event_types = field(default=Factory(lambda: [TextChunkEvent, ActionChunkEvent, FinishPromptEvent, FinishStructureRunEvent]))
class-attribute instance-attribute
structure = field()
class-attribute instance-attribute
run(*args)
Source Code in griptape/utils/stream.py
def run(self, *args) -> Iterator[TextArtifact]:
    """Run the wrapped Structure as a stream and yield its text output piece by piece.

    Args:
        *args: Positional arguments forwarded unchanged to `structure.run_stream`.

    Yields:
        A TextArtifact holding a newline for every FinishPromptEvent, the token of
        every TextChunkEvent, and for ActionChunkEvents a "name.tag (path)" header
        (when all three fields are set) followed by the pretty-printed action input
        once the accumulated partial input parses as JSON.
    """
    # Buffer of action-input fragments that do not yet form a complete JSON document.
    action_str = ""

    for event in self.structure.run_stream(*args, event_types=self.event_types):
        if isinstance(event, FinishPromptEvent):
            yield TextArtifact(value="\n")
        elif isinstance(event, TextChunkEvent):
            yield TextArtifact(value=event.token)
        elif isinstance(event, ActionChunkEvent):
            if event.tag is not None and event.name is not None and event.path is not None:
                yield TextArtifact(f"{event.name}.{event.tag} ({event.path})")

            if event.partial_input is not None:
                action_str += event.partial_input

                # Only the parse is guarded: an incomplete document is expected while
                # chunks are still arriving. The yield stays outside the try so that
                # exceptions thrown into the generator by the consumer are not swallowed.
                try:
                    parsed_input = json.loads(action_str)
                except json.JSONDecodeError:
                    # Not valid JSON yet; keep buffering until more chunks arrive.
                    pass
                else:
                    action_str = ""
                    yield TextArtifact(json.dumps(parsed_input, indent=2))
- On this page
- Attributes
- run(*args)
Could this page be better? Report a problem or suggest an addition!