structures
__all__ = ['Agent', 'Pipeline', 'Structure', 'Workflow']
module-attribute
Agent
Bases: Structure
Source Code in griptape/structures/agent.py
@define class Agent(Structure): input: Union[str, list, tuple, BaseArtifact, Callable[[BaseTask], BaseArtifact]] = field( default=lambda task: task.full_context["args"][0] if task.full_context["args"] else TextArtifact(value=""), ) stream: Optional[bool] = field(default=None, kw_only=True) prompt_driver: Optional[BasePromptDriver] = field(default=None, kw_only=True) output_schema: Optional[Union[Schema, type[BaseModel]]] = field(default=None, kw_only=True) tools: list[BaseTool] = field(factory=list, kw_only=True) max_meta_memory_entries: Optional[int] = field(default=20, kw_only=True) fail_fast: bool = field(default=False, kw_only=True) _tasks: list[Union[BaseTask, list[BaseTask]]] = field( factory=list, kw_only=True, alias="tasks", metadata={"serializable": True} ) @fail_fast.validator # pyright: ignore[reportAttributeAccessIssue] def validate_fail_fast(self, _: Attribute, fail_fast: bool) -> None: # noqa: FBT001 if fail_fast: raise ValueError("Agents cannot fail fast, as they can only have 1 task.") @prompt_driver.validator # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess] def validate_prompt_driver(self, _: Attribute, prompt_driver: Optional[BasePromptDriver]) -> None: if prompt_driver is not None and self.stream is not None: warnings.warn( "`Agent.prompt_driver` is set, but `Agent.stream` was provided. `Agent.stream` will be ignored. This will be an error in the future.", UserWarning, stacklevel=2, ) @_tasks.validator # pyright: ignore[reportAttributeAccessIssue] def validate_tasks(self, _: Attribute, tasks: list) -> None: if tasks and self.prompt_driver is not None: warnings.warn( "`Agent.tasks` is set, but `Agent.prompt_driver` was provided. `Agent.prompt_driver` will be ignored. 
This will be an error in the future.", UserWarning, stacklevel=2, ) def __attrs_post_init__(self) -> None: super().__attrs_post_init__() if len(self.tasks) == 0: self._init_task() @property def task(self) -> BaseTask: return self.tasks[0] def add_task(self, task: BaseTask) -> BaseTask: self._tasks.clear() task.preprocess(self) self._tasks.append(task) return task def add_tasks(self, *tasks: BaseTask | list[BaseTask]) -> list[BaseTask]: if len(tasks) > 1: raise ValueError("Agents can only have one task.") return super().add_tasks(*tasks) @observable def try_run(self, *args) -> Agent: self.task.run() return self def _init_task(self) -> None: if self.stream is None: with validators.disabled(): self.stream = Defaults.drivers_config.prompt_driver.stream if self.prompt_driver is None: with validators.disabled(): prompt_driver = evolve(Defaults.drivers_config.prompt_driver, stream=self.stream) self.prompt_driver = prompt_driver else: prompt_driver = self.prompt_driver task = PromptTask( self.input, prompt_driver=prompt_driver, tools=self.tools, output_schema=self.output_schema, max_meta_memory_entries=self.max_meta_memory_entries, ) self.add_task(task)
_tasks = field(factory=list, kw_only=True, alias='tasks', metadata={'serializable': True})
class-attribute instance-attribute
fail_fast = field(default=False, kw_only=True)
class-attribute instance-attribute
input = field(default=lambda task: task.full_context['args'][0] if task.full_context['args'] else TextArtifact(value=''))
class-attribute instance-attribute
max_meta_memory_entries = field(default=20, kw_only=True)
class-attribute instance-attribute
output_schema = field(default=None, kw_only=True)
class-attribute instance-attribute
prompt_driver = field(default=None, kw_only=True)
class-attribute instance-attribute
stream = field(default=None, kw_only=True)
class-attribute instance-attribute
task
property
tools = field(factory=list, kw_only=True)
class-attribute instance-attribute
__attrs_post_init__()
Source Code in griptape/structures/agent.py
def __attrs_post_init__(self) -> None: super().__attrs_post_init__() if len(self.tasks) == 0: self._init_task()
_init_task()
Source Code in griptape/structures/agent.py
def _init_task(self) -> None: if self.stream is None: with validators.disabled(): self.stream = Defaults.drivers_config.prompt_driver.stream if self.prompt_driver is None: with validators.disabled(): prompt_driver = evolve(Defaults.drivers_config.prompt_driver, stream=self.stream) self.prompt_driver = prompt_driver else: prompt_driver = self.prompt_driver task = PromptTask( self.input, prompt_driver=prompt_driver, tools=self.tools, output_schema=self.output_schema, max_meta_memory_entries=self.max_meta_memory_entries, ) self.add_task(task)
add_task(task)
Source Code in griptape/structures/agent.py
def add_task(self, task: BaseTask) -> BaseTask: self._tasks.clear() task.preprocess(self) self._tasks.append(task) return task
add_tasks(*tasks)
Source Code in griptape/structures/agent.py
def add_tasks(self, *tasks: BaseTask | list[BaseTask]) -> list[BaseTask]: if len(tasks) > 1: raise ValueError("Agents can only have one task.") return super().add_tasks(*tasks)
try_run(*args)
Source Code in griptape/structures/agent.py
@observable def try_run(self, *args) -> Agent: self.task.run() return self
validate_fail_fast(_, fail_fast)
Source Code in griptape/structures/agent.py
@fail_fast.validator # pyright: ignore[reportAttributeAccessIssue] def validate_fail_fast(self, _: Attribute, fail_fast: bool) -> None: # noqa: FBT001 if fail_fast: raise ValueError("Agents cannot fail fast, as they can only have 1 task.")
validate_prompt_driver(_, prompt_driver)
Source Code in griptape/structures/agent.py
@prompt_driver.validator # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess] def validate_prompt_driver(self, _: Attribute, prompt_driver: Optional[BasePromptDriver]) -> None: if prompt_driver is not None and self.stream is not None: warnings.warn( "`Agent.prompt_driver` is set, but `Agent.stream` was provided. `Agent.stream` will be ignored. This will be an error in the future.", UserWarning, stacklevel=2, )
validate_tasks(_, tasks)
Source Code in griptape/structures/agent.py
@_tasks.validator # pyright: ignore[reportAttributeAccessIssue] def validate_tasks(self, _: Attribute, tasks: list) -> None: if tasks and self.prompt_driver is not None: warnings.warn( "`Agent.tasks` is set, but `Agent.prompt_driver` was provided. `Agent.prompt_driver` will be ignored. This will be an error in the future.", UserWarning, stacklevel=2, )
Pipeline
Bases: Structure
Source Code in griptape/structures/pipeline.py
@define class Pipeline(Structure): def add_task(self, task: BaseTask) -> BaseTask: if (existing_task := self.try_find_task(task.id)) is not None: return existing_task task.preprocess(self) if self.output_task: self.output_task.child_ids.append(task.id) task.parent_ids.append(self.output_task.id) self._tasks.append(task) return task def insert_task(self, parent_task: BaseTask, task: BaseTask) -> BaseTask: task.preprocess(self) if parent_task.children: child_task = parent_task.children[0] task.child_ids.append(child_task.id) child_task.parent_ids.append(task.id) child_task.parent_ids.remove(parent_task.id) parent_task.child_ids.remove(child_task.id) task.parent_ids.append(parent_task.id) parent_task.child_ids.append(task.id) parent_index = self.tasks.index(parent_task) self._tasks.insert(parent_index + 1, task) return task @observable def try_run(self, *args) -> Pipeline: self.__run_from_task(self.input_task) return self def context(self, task: BaseTask) -> dict[str, Any]: context = super().context(task) context.update( { "parent_output": task.parents[0].output if task.parents else None, "task_outputs": self.task_outputs, "parent": task.parents[0] if task.parents else None, "child": task.children[0] if task.children else None, }, ) return context def __run_from_task(self, task: Optional[BaseTask]) -> None: if task is None or isinstance(task.run(), ErrorArtifact) and self.fail_fast: return self.__run_from_task(next(iter(task.children), None))
__run_from_task(task)
Source Code in griptape/structures/pipeline.py
def __run_from_task(self, task: Optional[BaseTask]) -> None:
    """Run ``task`` and then each first child in turn until the chain ends.

    The chain is walked with a loop rather than one recursive call per task,
    so the length of a pipeline is not bounded by the interpreter's
    recursion limit.

    Args:
        task: The task to start from. ``None`` is a no-op.
    """
    while task is not None:
        # Stop at the first failing task when fail_fast is enabled.
        if isinstance(task.run(), ErrorArtifact) and self.fail_fast:
            return
        # Follow the first child only; ``None`` ends the walk.
        task = next(iter(task.children), None)
add_task(task)
Source Code in griptape/structures/pipeline.py
def add_task(self, task: BaseTask) -> BaseTask: if (existing_task := self.try_find_task(task.id)) is not None: return existing_task task.preprocess(self) if self.output_task: self.output_task.child_ids.append(task.id) task.parent_ids.append(self.output_task.id) self._tasks.append(task) return task
context(task)
Source Code in griptape/structures/pipeline.py
def context(self, task: BaseTask) -> dict[str, Any]: context = super().context(task) context.update( { "parent_output": task.parents[0].output if task.parents else None, "task_outputs": self.task_outputs, "parent": task.parents[0] if task.parents else None, "child": task.children[0] if task.children else None, }, ) return context
insert_task(parent_task, task)
Source Code in griptape/structures/pipeline.py
def insert_task(self, parent_task: BaseTask, task: BaseTask) -> BaseTask:
    """Insert ``task`` directly after ``parent_task``.

    If ``parent_task`` already has a child, the first child is re-parented
    onto ``task`` so the chain becomes ``parent_task -> task -> child``.

    Args:
        parent_task: Task already in this pipeline that will precede ``task``.
        task: The task to insert.

    Returns:
        The inserted task.

    Raises:
        ValueError: If ``parent_task`` is not part of this pipeline. No
            parent/child links are modified in that case.
    """
    task.preprocess(self)

    # Locate the parent before touching any links, so a failed insert cannot
    # leave the relationships half-rewired.
    try:
        parent_index = self.tasks.index(parent_task)
    except ValueError as exc:
        raise ValueError(f"Parent task {parent_task.id} not found in pipeline.") from exc

    existing_children = parent_task.children
    if existing_children:
        child_task = existing_children[0]

        # Splice the new task between the parent and its current first child.
        task.child_ids.append(child_task.id)
        child_task.parent_ids.append(task.id)

        child_task.parent_ids.remove(parent_task.id)
        parent_task.child_ids.remove(child_task.id)

    task.parent_ids.append(parent_task.id)
    parent_task.child_ids.append(task.id)

    self._tasks.insert(parent_index + 1, task)

    return task
try_run(*args)
Source Code in griptape/structures/pipeline.py
@observable def try_run(self, *args) -> Pipeline: self.__run_from_task(self.input_task) return self
Structure
Bases: RuleMixin, SerializableMixin, RunnableMixin['Structure'], ABC
Source Code in griptape/structures/structure.py
@define class Structure(RuleMixin, SerializableMixin, RunnableMixin["Structure"], ABC): id: str = field(default=Factory(lambda: uuid.uuid4().hex), kw_only=True, metadata={"serializable": True}) _tasks: list[Union[BaseTask, list[BaseTask]]] = field( factory=list, kw_only=True, alias="tasks", metadata={"serializable": True} ) conversation_memory: Optional[BaseConversationMemory] = field( default=Factory(lambda: ConversationMemory()), kw_only=True, metadata={"serializable": True}, ) conversation_memory_strategy: Literal["per_structure", "per_task"] = field( default="per_structure", kw_only=True, metadata={"serializable": True} ) task_memory: TaskMemory = field( default=Factory(lambda self: TaskMemory(), takes_self=True), kw_only=True, ) meta_memory: MetaMemory = field(default=Factory(lambda: MetaMemory()), kw_only=True) fail_fast: bool = field(default=True, kw_only=True, metadata={"serializable": True}) _execution_args: tuple = () _event_queue: Queue[BaseEvent] = field(default=Factory(lambda: Queue()), init=False) def __attrs_post_init__(self) -> None: tasks = self._tasks.copy() self._tasks.clear() self.add_tasks(*tasks) def __add__(self, other: BaseTask | list[BaseTask | list[BaseTask]]) -> list[BaseTask]: return self.add_tasks(*other) if isinstance(other, list) else self.add_tasks(other) @property def tasks(self) -> list[BaseTask]: tasks = [] for task in self._tasks: if isinstance(task, list): tasks.extend(task) else: tasks.append(task) return tasks @property def execution_args(self) -> tuple: return self._execution_args @property def input_task(self) -> Optional[BaseTask]: return self.tasks[0] if self.tasks else None @property def output_task(self) -> Optional[BaseTask]: return self.tasks[-1] if self.tasks else None @property def output(self) -> BaseArtifact: if self.output_task is None: raise ValueError("Structure has no output Task. 
Add a Task to the Structure to generate output.") if self.output_task.output is None: raise ValueError("Structure's output Task has no output. Run the Structure to generate output.") return self.output_task.output @property def task_outputs(self) -> dict[str, Optional[BaseArtifact]]: return {task.id: task.output for task in self.tasks} @property def finished_tasks(self) -> list[BaseTask]: return [s for s in self.tasks if s.is_finished()] def is_finished(self) -> bool: return all(not s.can_run() for s in self.tasks) def is_running(self) -> bool: return any(s for s in self.tasks if s.is_running()) def find_task(self, task_id: str) -> BaseTask: if (task := self.try_find_task(task_id)) is not None: return task raise ValueError(f"Task with id {task_id} doesn't exist.") def try_find_task(self, task_id: str) -> Optional[BaseTask]: for task in self.tasks: if task.id == task_id: return task return None def add_tasks(self, *tasks: BaseTask | list[BaseTask]) -> list[BaseTask]: added_tasks = [] for task in tasks: if isinstance(task, list): added_tasks.extend(self.add_tasks(*task)) else: added_tasks.append(self.add_task(task)) return added_tasks def context(self, task: BaseTask) -> dict[str, Any]: return {"args": self.execution_args, "structure": self} def resolve_relationships(self) -> None: task_by_id = {} for task in self.tasks: if task.id in task_by_id: raise ValueError(f"Duplicate task with id {task.id} found.") task_by_id[task.id] = task for task in self.tasks: # Ensure parents include this task as a child for parent_id in task.parent_ids: if parent_id not in task_by_id: raise ValueError(f"Task with id {parent_id} doesn't exist.") parent = task_by_id[parent_id] if task.id not in parent.child_ids: parent.child_ids.append(task.id) # Ensure children include this task as a parent for child_id in task.child_ids: if child_id not in task_by_id: raise ValueError(f"Task with id {child_id} doesn't exist.") child = task_by_id[child_id] if task.id not in child.parent_ids: 
child.parent_ids.append(task.id) @observable def before_run(self, args: Any) -> None: super().before_run(args) self._execution_args = args [task.reset() for task in self.tasks] if self.input_task is not None: EventBus.publish_event( StartStructureRunEvent( structure_id=self.id, input_task_input=self.input_task.input, input_task_output=self.input_task.output, ), ) self.resolve_relationships() @observable def after_run(self) -> None: super().after_run() if self.output_task is not None: if ( self.conversation_memory_strategy == "per_structure" and self.conversation_memory is not None and self.input_task is not None and self.output_task.output is not None ): run = Run(input=self.input_task.input, output=self.output_task.output) self.conversation_memory.add_run(run) EventBus.publish_event( FinishStructureRunEvent( structure_id=self.id, output_task_input=self.output_task.input, output_task_output=self.output_task.output, ), flush=True, ) @abstractmethod def add_task(self, task: BaseTask) -> BaseTask: ... @observable def run(self, *args) -> Structure: self.before_run(args) result = self.try_run(*args) self.after_run() return result @observable def run_stream(self, *args, event_types: Optional[list[type[BaseEvent]]] = None) -> Iterator[BaseEvent]: if event_types is None: event_types = [BaseEvent] elif FinishStructureRunEvent not in event_types: event_types = [*event_types, FinishStructureRunEvent] with EventListener(self._event_queue.put, event_types=event_types): t = Thread(target=with_contextvars(self.run), args=args) t.start() while True: event = self._event_queue.get() if isinstance(event, FinishStructureRunEvent) and event.structure_id == self.id: break else: yield event t.join() @abstractmethod def try_run(self, *args) -> Structure: ...
_event_queue = field(default=Factory(lambda: Queue()), init=False)
class-attribute instance-attribute
_execution_args = ()
class-attribute instance-attribute
_tasks = field(factory=list, kw_only=True, alias='tasks', metadata={'serializable': True})
class-attribute instance-attribute
conversation_memory = field(default=Factory(lambda: ConversationMemory()), kw_only=True, metadata={'serializable': True})
class-attribute instance-attribute
conversation_memory_strategy = field(default='per_structure', kw_only=True, metadata={'serializable': True})
class-attribute instance-attribute
execution_args
property
fail_fast = field(default=True, kw_only=True, metadata={'serializable': True})
class-attribute instance-attribute
finished_tasks
property
id = field(default=Factory(lambda: uuid.uuid4().hex), kw_only=True, metadata={'serializable': True})
class-attribute instance-attribute
input_task
property
meta_memory = field(default=Factory(lambda: MetaMemory()), kw_only=True)
class-attribute instance-attribute
output
property
output_task
property
task_memory = field(default=Factory(lambda self: TaskMemory(), takes_self=True), kw_only=True)
class-attribute instance-attribute
task_outputs
property
tasks
property
__add__(other)
Source Code in griptape/structures/structure.py
def __add__(self, other: BaseTask | list[BaseTask | list[BaseTask]]) -> list[BaseTask]: return self.add_tasks(*other) if isinstance(other, list) else self.add_tasks(other)
__attrs_post_init__()
Source Code in griptape/structures/structure.py
def __attrs_post_init__(self) -> None: tasks = self._tasks.copy() self._tasks.clear() self.add_tasks(*tasks)
add_task(task) abstractmethod
Source Code in griptape/structures/structure.py
@abstractmethod def add_task(self, task: BaseTask) -> BaseTask: ...
add_tasks(*tasks)
Source Code in griptape/structures/structure.py
def add_tasks(self, *tasks: BaseTask | list[BaseTask]) -> list[BaseTask]:
    """Add every given task, flattening nested lists, and return what was added.

    Args:
        *tasks: Tasks or (possibly nested) lists of tasks. Lists are expanded
            through a recursive ``add_tasks`` call; everything else is handed
            to ``add_task``.

    Returns:
        The values returned by ``add_task``, in the order the tasks were given.
    """
    collected = []
    for entry in tasks:
        batch = self.add_tasks(*entry) if isinstance(entry, list) else [self.add_task(entry)]
        collected += batch
    return collected
after_run()
Source Code in griptape/structures/structure.py
@observable def after_run(self) -> None: super().after_run() if self.output_task is not None: if ( self.conversation_memory_strategy == "per_structure" and self.conversation_memory is not None and self.input_task is not None and self.output_task.output is not None ): run = Run(input=self.input_task.input, output=self.output_task.output) self.conversation_memory.add_run(run) EventBus.publish_event( FinishStructureRunEvent( structure_id=self.id, output_task_input=self.output_task.input, output_task_output=self.output_task.output, ), flush=True, )
before_run(args)
Source Code in griptape/structures/structure.py
@observable
def before_run(self, args: Any) -> None:
    """Prepare the structure for a run.

    Stores the execution args, resets every task, publishes a
    ``StartStructureRunEvent`` when there is an input task, and finally
    resolves parent/child relationships.

    Args:
        args: The positional arguments the structure is being run with.
    """
    super().before_run(args)
    self._execution_args = args

    # A plain loop: reset() is called for its side effect, so there is no
    # reason to build a throwaway list.
    for task in self.tasks:
        task.reset()

    # Read the property once instead of three times.
    input_task = self.input_task
    if input_task is not None:
        EventBus.publish_event(
            StartStructureRunEvent(
                structure_id=self.id,
                input_task_input=input_task.input,
                input_task_output=input_task.output,
            ),
        )

    self.resolve_relationships()
context(task)
Source Code in griptape/structures/structure.py
def context(self, task: BaseTask) -> dict[str, Any]: return {"args": self.execution_args, "structure": self}
find_task(task_id)
Source Code in griptape/structures/structure.py
def find_task(self, task_id: str) -> BaseTask:
    """Return the task with the given id, or fail if there is none.

    Args:
        task_id: Id of the task to look up.

    Returns:
        Whatever ``try_find_task`` returns for ``task_id``.

    Raises:
        ValueError: If ``try_find_task`` returns ``None``.
    """
    found = self.try_find_task(task_id)
    if found is None:
        raise ValueError(f"Task with id {task_id} doesn't exist.")
    return found
is_finished()
Source Code in griptape/structures/structure.py
def is_finished(self) -> bool:
    """Return True when no task reports that it can still run.

    A structure with no tasks counts as finished.
    """
    return not any(task.can_run() for task in self.tasks)
is_running()
Source Code in griptape/structures/structure.py
def is_running(self) -> bool:
    """Return True when at least one task reports that it is running.

    The result depends only on each task's ``is_running()`` value. The
    previous form, ``any(s for s in tasks if s.is_running())``, evaluated the
    truthiness of the task object itself and would return False for a running
    task whose type is falsy (for example one defining ``__bool__`` or
    ``__len__``).
    """
    return any(task.is_running() for task in self.tasks)
resolve_relationships()
Source Code in griptape/structures/structure.py
def resolve_relationships(self) -> None:
    """Make every parent/child link between tasks bidirectional.

    Each id in a task's ``parent_ids`` gets this task added to that parent's
    ``child_ids``, and each id in ``child_ids`` gets this task added to that
    child's ``parent_ids``, unless the link is already present.

    Raises:
        ValueError: If two tasks share an id, or a task references an id that
            does not belong to any task in the structure.
    """
    all_tasks = self.tasks

    registry = {}
    for current in all_tasks:
        if current.id in registry:
            raise ValueError(f"Duplicate task with id {current.id} found.")
        registry[current.id] = current

    def lookup(task_id):
        # Registry values are task objects, so None can only mean "missing".
        found = registry.get(task_id)
        if found is None:
            raise ValueError(f"Task with id {task_id} doesn't exist.")
        return found

    for current in all_tasks:
        # Parents must list this task among their children.
        for parent_id in current.parent_ids:
            parent_children = lookup(parent_id).child_ids
            if current.id not in parent_children:
                parent_children.append(current.id)

        # Children must list this task among their parents.
        for child_id in current.child_ids:
            child_parents = lookup(child_id).parent_ids
            if current.id not in child_parents:
                child_parents.append(current.id)
run(*args)
Source Code in griptape/structures/structure.py
@observable def run(self, *args) -> Structure: self.before_run(args) result = self.try_run(*args) self.after_run() return result
run_stream(*args, event_types=None)
Source Code in griptape/structures/structure.py
@observable
def run_stream(self, *args, event_types: Optional[list[type[BaseEvent]]] = None) -> Iterator[BaseEvent]:
    """Run the structure on a worker thread and yield events as they arrive.

    Args:
        *args: Positional arguments forwarded to ``run``.
        event_types: Event types to listen for. ``None`` listens for
            ``BaseEvent``. ``FinishStructureRunEvent`` is always added,
            because it is what ends the stream.

    Yields:
        Every queued event up to, but not including, this structure's own
        ``FinishStructureRunEvent``.

    Note:
        If the worker thread ends without publishing the finish event (for
        example ``run`` raised before ``after_run``), the stream ends once
        the queue is drained instead of blocking forever. The worker's
        exception is not re-raised here.
    """
    from queue import Empty

    if event_types is None:
        event_types = [BaseEvent]
    elif FinishStructureRunEvent not in event_types:
        event_types = [*event_types, FinishStructureRunEvent]

    with EventListener(self._event_queue.put, event_types=event_types):
        worker = Thread(target=with_contextvars(self.run), args=args)
        worker.start()

        while True:
            # Sample liveness BEFORE polling: if the worker was already dead
            # and the queue is still empty, it cannot enqueue anything more.
            worker_done = not worker.is_alive()
            try:
                event = self._event_queue.get(timeout=0.1)
            except Empty:
                if worker_done:
                    break
                continue

            if isinstance(event, FinishStructureRunEvent) and event.structure_id == self.id:
                break
            yield event

        worker.join()
try_find_task(task_id)
Source Code in griptape/structures/structure.py
def try_find_task(self, task_id: str) -> Optional[BaseTask]:
    """Return the first task whose ``id`` equals ``task_id``, or ``None``.

    Args:
        task_id: Id of the task to look up.
    """
    matches = (candidate for candidate in self.tasks if candidate.id == task_id)
    return next(matches, None)
try_run(*args) abstractmethod
Source Code in griptape/structures/structure.py
@abstractmethod def try_run(self, *args) -> Structure: ...
Workflow
Bases: Structure, FuturesExecutorMixin
Source Code in griptape/structures/workflow.py
@define class Workflow(Structure, FuturesExecutorMixin): @property def input_task(self) -> Optional[BaseTask]: return self.order_tasks()[0] if self.tasks else None @property def output_task(self) -> Optional[BaseTask]: return self.order_tasks()[-1] if self.tasks else None @property def input_tasks(self) -> list[BaseTask]: return [task for task in self.tasks if not task.parents] @property def output_tasks(self) -> list[BaseTask]: return [task for task in self.tasks if not task.children] @property def outputs(self) -> list[BaseArtifact]: return [task.output for task in self.output_tasks if task.output is not None] def add_task(self, task: BaseTask) -> BaseTask: if (existing_task := self.try_find_task(task.id)) is not None: return existing_task task.preprocess(self) self._tasks.append(task) return task def insert_tasks( self, parent_tasks: BaseTask | list[BaseTask], tasks: BaseTask | list[BaseTask], child_tasks: BaseTask | list[BaseTask], *, preserve_relationship: bool = False, ) -> list[BaseTask]: """Insert tasks between parent and child tasks in the workflow. Args: parent_tasks: The tasks that will be the parents of the new tasks. tasks: The tasks to insert between the parent and child tasks. child_tasks: The tasks that will be the children of the new tasks. preserve_relationship: Whether to preserve the parent/child relationship when inserting between parent and child tasks. 
""" if not isinstance(parent_tasks, list): parent_tasks = [parent_tasks] if not isinstance(tasks, list): tasks = [tasks] if not isinstance(child_tasks, list): child_tasks = [child_tasks] for task in tasks: self.insert_task(parent_tasks, task, child_tasks, preserve_relationship=preserve_relationship) return tasks def insert_task( self, parent_tasks: list[BaseTask], task: BaseTask, child_tasks: list[BaseTask], *, preserve_relationship: bool = False, ) -> BaseTask: task.preprocess(self) self.__link_task_to_children(task, child_tasks) if not preserve_relationship: self.__remove_old_parent_child_relationships(parent_tasks, child_tasks) last_parent_index = self.__link_task_to_parents(task, parent_tasks) # Insert the new task once, just after the last parent task self._tasks.insert(last_parent_index + 1, task) return task @observable def try_run(self, *args) -> Workflow: exit_loop = False with self.create_futures_executor() as futures_executor: while not self.is_finished() and not exit_loop: futures_list = {} ordered_tasks = self.order_tasks() for task in ordered_tasks: if task.can_run(): future = futures_executor.submit(with_contextvars(task.run)) futures_list[future] = task # Wait for all tasks to complete for future in futures.as_completed(futures_list): if isinstance(future.result(), ErrorArtifact) and self.fail_fast: exit_loop = True break return self def context(self, task: BaseTask) -> dict[str, Any]: context = super().context(task) context.update( { "task_outputs": self.task_outputs, "parent_outputs": task.parent_outputs, "parents_output_text": task.parents_output_text, "parents": {parent.id: parent for parent in task.parents}, "children": {child.id: child for child in task.children}, }, ) return context def to_graph(self) -> dict[str, set[str]]: graph: dict[str, set[str]] = {} for key_task in self.tasks: graph[key_task.id] = set() for value_task in self.tasks: if key_task.id in value_task.child_ids: graph[key_task.id].add(value_task.id) return graph def 
order_tasks(self) -> list[BaseTask]: return [self.find_task(task_id) for task_id in TopologicalSorter(self.to_graph()).static_order()] def __link_task_to_children(self, task: BaseTask, child_tasks: list[BaseTask]) -> None: for child_task in child_tasks: # Link the new task to the child task if child_task.id not in task.child_ids: task.child_ids.append(child_task.id) if task.id not in child_task.parent_ids: child_task.parent_ids.append(task.id) def __remove_old_parent_child_relationships( self, parent_tasks: list[BaseTask], child_tasks: list[BaseTask], ) -> None: for parent_task in parent_tasks: for child_task in child_tasks: # Remove the old parent/child relationship if child_task.id in parent_task.child_ids: parent_task.child_ids.remove(child_task.id) if parent_task.id in child_task.parent_ids: child_task.parent_ids.remove(parent_task.id) def __link_task_to_parents(self, task: BaseTask, parent_tasks: list[BaseTask]) -> int: last_parent_index = -1 for parent_task in parent_tasks: # Link the new task to the parent task if parent_task.id not in task.parent_ids: task.parent_ids.append(parent_task.id) if task.id not in parent_task.child_ids: parent_task.child_ids.append(task.id) try: parent_index = self.tasks.index(parent_task) except ValueError as exc: raise ValueError(f"Parent task {parent_task.id} not found in workflow.") from exc else: last_parent_index = max(last_parent_index, parent_index) return last_parent_index
input_task
property
input_tasks
property
output_task
property
output_tasks
property
outputs
property
__link_task_to_children(task, child_tasks)
Source Code in griptape/structures/workflow.py
def __link_task_to_children(self, task: BaseTask, child_tasks: list[BaseTask]) -> None: for child_task in child_tasks: # Link the new task to the child task if child_task.id not in task.child_ids: task.child_ids.append(child_task.id) if task.id not in child_task.parent_ids: child_task.parent_ids.append(task.id)
__link_task_to_parents(task, parent_tasks)
Source Code in griptape/structures/workflow.py
def __link_task_to_parents(self, task: BaseTask, parent_tasks: list[BaseTask]) -> int: last_parent_index = -1 for parent_task in parent_tasks: # Link the new task to the parent task if parent_task.id not in task.parent_ids: task.parent_ids.append(parent_task.id) if task.id not in parent_task.child_ids: parent_task.child_ids.append(task.id) try: parent_index = self.tasks.index(parent_task) except ValueError as exc: raise ValueError(f"Parent task {parent_task.id} not found in workflow.") from exc else: last_parent_index = max(last_parent_index, parent_index) return last_parent_index
__remove_old_parent_child_relationships(parent_tasks, child_tasks)
Source Code in griptape/structures/workflow.py
def __remove_old_parent_child_relationships( self, parent_tasks: list[BaseTask], child_tasks: list[BaseTask], ) -> None: for parent_task in parent_tasks: for child_task in child_tasks: # Remove the old parent/child relationship if child_task.id in parent_task.child_ids: parent_task.child_ids.remove(child_task.id) if parent_task.id in child_task.parent_ids: child_task.parent_ids.remove(parent_task.id)
add_task(task)
Source Code in griptape/structures/workflow.py
def add_task(self, task: BaseTask) -> BaseTask: if (existing_task := self.try_find_task(task.id)) is not None: return existing_task task.preprocess(self) self._tasks.append(task) return task
context(task)
Source Code in griptape/structures/workflow.py
def context(self, task: BaseTask) -> dict[str, Any]: context = super().context(task) context.update( { "task_outputs": self.task_outputs, "parent_outputs": task.parent_outputs, "parents_output_text": task.parents_output_text, "parents": {parent.id: parent for parent in task.parents}, "children": {child.id: child for child in task.children}, }, ) return context
insert_task(parent_tasks, task, child_tasks, *, preserve_relationship=False)
Source Code in griptape/structures/workflow.py
def insert_task( self, parent_tasks: list[BaseTask], task: BaseTask, child_tasks: list[BaseTask], *, preserve_relationship: bool = False, ) -> BaseTask: task.preprocess(self) self.__link_task_to_children(task, child_tasks) if not preserve_relationship: self.__remove_old_parent_child_relationships(parent_tasks, child_tasks) last_parent_index = self.__link_task_to_parents(task, parent_tasks) # Insert the new task once, just after the last parent task self._tasks.insert(last_parent_index + 1, task) return task
insert_tasks(parent_tasks, tasks, child_tasks, *, preserve_relationship=False)
Insert tasks between parent and child tasks in the workflow.
Parameters
Name | Type | Description | Default |
---|---|---|---|
parent_tasks | BaseTask | list[BaseTask] | The tasks that will be the parents of the new tasks. | required |
tasks | BaseTask | list[BaseTask] | The tasks to insert between the parent and child tasks. | required |
child_tasks | BaseTask | list[BaseTask] | The tasks that will be the children of the new tasks. | required |
preserve_relationship | bool | Whether to preserve the parent/child relationship when inserting between parent and child tasks. | False |
Source Code in griptape/structures/workflow.py
def insert_tasks( self, parent_tasks: BaseTask | list[BaseTask], tasks: BaseTask | list[BaseTask], child_tasks: BaseTask | list[BaseTask], *, preserve_relationship: bool = False, ) -> list[BaseTask]: """Insert tasks between parent and child tasks in the workflow. Args: parent_tasks: The tasks that will be the parents of the new tasks. tasks: The tasks to insert between the parent and child tasks. child_tasks: The tasks that will be the children of the new tasks. preserve_relationship: Whether to preserve the parent/child relationship when inserting between parent and child tasks. """ if not isinstance(parent_tasks, list): parent_tasks = [parent_tasks] if not isinstance(tasks, list): tasks = [tasks] if not isinstance(child_tasks, list): child_tasks = [child_tasks] for task in tasks: self.insert_task(parent_tasks, task, child_tasks, preserve_relationship=preserve_relationship) return tasks
order_tasks()
Source Code in griptape/structures/workflow.py
def order_tasks(self) -> list[BaseTask]:
    """Return the workflow's tasks in topological order.

    The order comes from ``TopologicalSorter`` over ``to_graph()``. Ids are
    resolved through a single id -> task index instead of a linear
    ``find_task`` scan per id.

    Returns:
        The tasks, ordered so that each task follows its predecessors in the
        graph.
    """
    # First occurrence wins, matching what a linear search would return.
    task_by_id = {}
    for task in self.tasks:
        task_by_id.setdefault(task.id, task)

    return [
        # Fall back to find_task for an id missing from the index, so the
        # error raised for an unknown id stays the same.
        task_by_id[task_id] if task_id in task_by_id else self.find_task(task_id)
        for task_id in TopologicalSorter(self.to_graph()).static_order()
    ]
to_graph()
Source Code in griptape/structures/workflow.py
def to_graph(self) -> dict[str, set[str]]:
    """Build a predecessor graph of the workflow's tasks.

    Returns:
        A dict with one key per task id, in task order. Each value is the set
        of ids of tasks that list that key in their ``child_ids``. Ids in
        ``child_ids`` that do not belong to any task are ignored.
    """
    tasks = self.tasks
    graph: dict[str, set[str]] = {task.id: set() for task in tasks}

    # One pass over the declared edges instead of comparing every task pair.
    for task in tasks:
        for child_id in task.child_ids:
            if child_id in graph:
                graph[child_id].add(task.id)

    return graph
try_run(*args)
Source Code in griptape/structures/workflow.py
@observable
def try_run(self, *args) -> Workflow:
    """Run the workflow in rounds until it is finished or fails fast.

    Each round submits every task that ``can_run()`` (taken in
    ``order_tasks()`` order) to the futures executor, then waits for the
    round's futures. With ``fail_fast`` enabled, the first ``ErrorArtifact``
    result stops the workflow after the current round.

    Returns:
        This workflow.
    """
    stop_requested = False

    with self.create_futures_executor() as pool:
        while not (self.is_finished() or stop_requested):
            # Future -> task for everything runnable in this round.
            in_flight = {
                pool.submit(with_contextvars(runnable.run)): runnable
                for runnable in self.order_tasks()
                if runnable.can_run()
            }

            # any() stops consuming completed futures at the first failure
            # that should abort the run.
            stop_requested = any(
                isinstance(done.result(), ErrorArtifact) and self.fail_fast
                for done in futures.as_completed(in_flight)
            )

    return self
Could this page be better? Report a problem or suggest an addition!