base_task
T = TypeVar('T', bound=BaseArtifact)
module-attribute
logger = logging.getLogger(Defaults.logging_config.logger_name)
module-attribute
Bases: FuturesExecutorMixin, SerializableMixin, RunnableMixin['BaseTask'], ABC, Generic[T]
Source Code in griptape/tasks/base_task.py
@define
class BaseTask(FuturesExecutorMixin, SerializableMixin, RunnableMixin["BaseTask"], ABC, Generic[T]):
    """Abstract base class for tasks, generic over the artifact type ``T`` they output.

    A task tracks its lifecycle ``state``, the ids of its parent and child
    tasks, and (optionally) the ``Structure`` it belongs to. Subclasses must
    implement the ``input`` property and ``try_run``.
    """

    class State(Enum):
        """Lifecycle states a task can be in."""

        PENDING = 1
        RUNNING = 2
        FINISHED = 3
        SKIPPED = 4

    # Unique identifier; defaults to a random UUID4 hex string.
    id: str = field(default=Factory(lambda: uuid.uuid4().hex), kw_only=True, metadata={"serializable": True})
    # Current lifecycle state; new tasks start as PENDING.
    state: State = field(default=State.PENDING, kw_only=True, metadata={"serializable": True})
    # Ids (not objects) of related tasks; resolved through the structure on demand.
    parent_ids: list[str] = field(factory=list, kw_only=True, metadata={"serializable": True})
    child_ids: list[str] = field(factory=list, kw_only=True, metadata={"serializable": True})
    # Cap applied by `meta_memories`; None disables the cap.
    max_meta_memory_entries: Optional[int] = field(default=20, kw_only=True, metadata={"serializable": True})
    # Owning structure, if any.
    structure: Optional[Structure] = field(default=None, kw_only=True)
    # Result of the most recent `run`; None until the task has run (or after `reset`).
    output: Optional[T] = field(default=None, init=False)
    # Extra key/value context merged into `full_context`.
    context: dict[str, Any] = field(factory=dict, kw_only=True, metadata={"serializable": True})
    # Positional arguments passed to the most recent `run` call.
    _execution_args: tuple = field(factory=tuple, init=False)

    @property
    def execution_args(self) -> tuple:
        """Positional arguments given to the most recent ``run`` call."""
        return self._execution_args

    def __rshift__(self, other: BaseTask | list[BaseTask]) -> BaseTask | list[BaseTask]:
        """Support ``task >> other``: add ``other`` (a task or list of tasks) as children.

        Returns ``other`` so that expressions can be chained.
        """
        if isinstance(other, list):
            self.add_children(other)
        else:
            self.add_child(other)

        return other

    def __lshift__(self, other: BaseTask | list[BaseTask]) -> BaseTask | list[BaseTask]:
        """Support ``task << other``: add ``other`` (a task or list of tasks) as parents.

        Returns ``other`` so that expressions can be chained.
        """
        if isinstance(other, list):
            self.add_parents(other)
        else:
            self.add_parent(other)

        return other

    def __attrs_post_init__(self) -> None:
        """Register the task with its structure when one was passed at construction."""
        if self.structure is not None:
            self.structure.add_task(self)

    @property
    @abstractmethod
    def input(self) -> BaseArtifact:
        """The artifact this task takes as input; provided by subclasses."""
        ...

    @property
    def parents(self) -> list[BaseTask]:
        """Parent tasks, looked up in the structure by id.

        Raises:
            ValueError: If no structure is set.
        """
        if self.structure is not None:
            return [self.structure.find_task(parent_id) for parent_id in self.parent_ids]
        raise ValueError("Structure must be set to access parents")

    @property
    def children(self) -> list[BaseTask]:
        """Child tasks, looked up in the structure by id.

        Raises:
            ValueError: If no structure is set.
        """
        if self.structure is not None:
            return [self.structure.find_task(child_id) for child_id in self.child_ids]
        raise ValueError("Structure must be set to access children")

    @property
    def parent_outputs(self) -> dict[str, BaseArtifact]:
        """Map of parent id to parent output, limited to parents with a truthy output."""
        return {parent.id: parent.output for parent in self.parents if parent.output}

    @property
    def parents_output_text(self) -> str:
        """Newline-joined text of every parent output that is truthy."""
        return "\n".join([parent.output.to_text() for parent in self.parents if parent.output])

    @property
    def meta_memories(self) -> list[BaseMetaEntry]:
        """Meta memory entries of the structure, capped at ``max_meta_memory_entries``.

        Returns an empty list when there is no structure or the structure has
        no (truthy) meta memory. A cap of ``None`` returns every entry; a cap
        of ``0`` returns no entries.
        """
        structure = self.structure
        if structure is None or not structure.meta_memory:
            return []

        entries = structure.meta_memory.entries
        # Compare against None rather than relying on truthiness: a cap of 0
        # must yield an empty list, not the uncapped list.
        if self.max_meta_memory_entries is not None:
            return entries[: self.max_meta_memory_entries]
        return entries

    def __str__(self) -> str:
        """Return the output's value as a string, or ``""`` when there is no output."""
        return str(self.output.value) if self.output is not None else ""

    def add_parents(self, parents: list[BaseTask]) -> None:
        """Add each task in ``parents`` as a parent via ``add_parent``."""
        for parent in parents:
            self.add_parent(parent)

    def add_parent(self, parent: BaseTask) -> BaseTask:
        """Link ``parent`` as a parent of this task and return this task.

        Both sides of the link are recorded without duplicating ids. When a
        structure is set and does not yet contain ``parent``, it is added.
        """
        if parent.id not in self.parent_ids:
            self.parent_ids.append(parent.id)

        if self.id not in parent.child_ids:
            parent.child_ids.append(self.id)

        if self.structure is not None and parent not in self.structure.tasks:
            self.structure.add_task(parent)

        return self

    def add_children(self, children: list[BaseTask]) -> None:
        """Add each task in ``children`` as a child via ``add_child``."""
        for child in children:
            self.add_child(child)

    def add_child(self, child: BaseTask) -> BaseTask:
        """Link ``child`` as a child of this task and return this task.

        Both sides of the link are recorded without duplicating ids. When a
        structure is set and does not yet contain ``child``, it is added.
        """
        if child.id not in self.child_ids:
            self.child_ids.append(child.id)

        if self.id not in child.parent_ids:
            child.parent_ids.append(self.id)

        if self.structure is not None and child not in self.structure.tasks:
            self.structure.add_task(child)

        return self

    def preprocess(self, structure: Structure) -> BaseTask:
        """Attach ``structure`` to this task and return this task."""
        self.structure = structure

        return self

    def is_pending(self) -> bool:
        """Return True when the state is PENDING."""
        return self.state == BaseTask.State.PENDING

    def is_finished(self) -> bool:
        """Return True when the state is FINISHED."""
        return self.state == BaseTask.State.FINISHED

    def is_running(self) -> bool:
        """Return True when the state is RUNNING."""
        return self.state == BaseTask.State.RUNNING

    def is_skipped(self) -> bool:
        """Return True when the state is SKIPPED."""
        return self.state == BaseTask.State.SKIPPED

    def before_run(self) -> None:
        """Run the inherited hook, then publish a ``StartTaskEvent`` if a structure is set."""
        super().before_run()
        if self.structure is not None:
            EventBus.publish_event(
                StartTaskEvent(
                    task_id=self.id,
                    task_parent_ids=self.parent_ids,
                    task_child_ids=self.child_ids,
                    task_input=self.input,
                    task_output=self.output,
                ),
            )

    def run(self, *args) -> T:
        """Execute the task and return its output.

        Stores ``args`` as the execution arguments, marks the task RUNNING and
        calls ``before_run``, ``try_run`` and ``after_run`` in that order. Any
        ``Exception`` raised along the way is logged and replaced by an
        ``ErrorArtifact`` output instead of propagating. The task is always
        marked FINISHED afterwards.
        """
        try:
            self._execution_args = args

            self.state = BaseTask.State.RUNNING

            self.before_run()

            self.output = self.try_run()

            self.after_run()
        except Exception as e:
            logger.exception("%s %s\n%s", self.__class__.__name__, self.id, e)

            self.output = cast("T", ErrorArtifact(str(e), exception=e))
        finally:
            self.state = BaseTask.State.FINISHED

        return self.output

    def after_run(self) -> None:
        """Run the inherited hook, then publish a ``FinishTaskEvent`` if a structure is set."""
        super().after_run()
        if self.structure is not None:
            EventBus.publish_event(
                FinishTaskEvent(
                    task_id=self.id,
                    task_parent_ids=self.parent_ids,
                    task_child_ids=self.child_ids,
                    task_input=self.input,
                    task_output=self.output,
                ),
            )

    def can_run(self) -> bool:
        """Decide whether this task is ready to run.

        Side effect: when the task has parents and all of them are skipped,
        the task itself is marked SKIPPED.

        Raises:
            ValueError: From ``parents`` when the task is pending and no
                structure is set.
        """
        # If this Task has been skipped or is not pending, it should not run
        if self.is_skipped() or not self.is_pending():
            return False

        # Resolve the parents once: the property performs a structure lookup
        # for every parent id on each access.
        parents = self.parents

        # If this Task has parents, and _all_ of them are skipped, it should not run
        if parents and all(parent.is_skipped() for parent in parents):
            self.state = BaseTask.State.SKIPPED
            return False

        # If _all_ this Task's unskipped parents are finished, it should run
        return all(parent.is_finished() for parent in parents if not parent.is_skipped())

    def reset(self) -> BaseTask:
        """Return the task to PENDING, clearing output and execution args; returns this task."""
        self.state = BaseTask.State.PENDING
        self.output = None
        self._execution_args = ()

        return self

    @abstractmethod
    def try_run(self) -> T:
        """Perform the task's work and return its output; provided by subclasses."""
        ...

    @property
    def full_context(self) -> dict[str, Any]:
        """A deep copy of ``context`` extended with run-time information.

        Without a structure the execution args are added under ``"args"``;
        otherwise the copy is updated with ``structure.context(self)``.
        """
        # Need to deep copy so that the serialized context doesn't contain non-serializable data
        context = deepcopy(self.context)
        if self.structure is None:
            context.update({"args": self._execution_args})
        else:
            context.update(self.structure.context(self))

        return context
_execution_args = field(factory=tuple, init=False)
class-attribute instance-attribute
child_ids = field(factory=list, kw_only=True, metadata={'serializable': True})
class-attribute instance-attribute
children
property
context = field(factory=dict, kw_only=True, metadata={'serializable': True})
class-attribute instance-attribute
execution_args
property
full_context
property
id = field(default=Factory(lambda: uuid.uuid4().hex), kw_only=True, metadata={'serializable': True})
class-attribute instance-attribute
input
abstractmethod property
max_meta_memory_entries = field(default=20, kw_only=True, metadata={'serializable': True})
class-attribute instance-attribute
meta_memories
property
output = field(default=None, init=False)
class-attribute instance-attribute
parent_ids = field(factory=list, kw_only=True, metadata={'serializable': True})
class-attribute instance-attribute
parent_outputs
property
parents
property
parents_output_text
property
state = field(default=State.PENDING, kw_only=True, metadata={'serializable': True})
class-attribute instance-attribute
structure = field(default=None, kw_only=True)
class-attribute instance-attribute
State
Bases: Enum
Source Code in griptape/tasks/base_task.py
class State(Enum):
    """Lifecycle states a task can be in."""

    # Not started yet.
    PENDING = 1
    # Currently executing.
    RUNNING = 2
    # Execution has ended.
    FINISHED = 3
    # Will not be executed.
    SKIPPED = 4
FINISHED = 3
class-attribute instance-attribute
PENDING = 1
class-attribute instance-attribute
RUNNING = 2
class-attribute instance-attribute
SKIPPED = 4
class-attribute instance-attribute
__attrs_post_init__()
Source Code in griptape/tasks/base_task.py
def __attrs_post_init__(self) -> None:
    """Register this task with its structure, if one is set.

    Does nothing when ``self.structure`` is None.
    """
    owner = self.structure
    if owner is None:
        return
    owner.add_task(self)
__lshift__(other)
Source Code in griptape/tasks/base_task.py
def __lshift__(self, other: BaseTask | list[BaseTask]) -> BaseTask | list[BaseTask]:
    """Support ``task << other``: register ``other`` as parent(s) of this task.

    A list is passed to ``add_parents``; any other value to ``add_parent``.

    Returns:
        ``other`` itself, so the expression can be chained.
    """
    attach = self.add_parents if isinstance(other, list) else self.add_parent
    attach(other)
    return other
__rshift__(other)
Source Code in griptape/tasks/base_task.py
def __rshift__(self, other: BaseTask | list[BaseTask]) -> BaseTask | list[BaseTask]:
    """Support ``task >> other``: register ``other`` as child(ren) of this task.

    A list is passed to ``add_children``; any other value to ``add_child``.

    Returns:
        ``other`` itself, so the expression can be chained.
    """
    attach = self.add_children if isinstance(other, list) else self.add_child
    attach(other)
    return other
__str__()
Source Code in griptape/tasks/base_task.py
def __str__(self) -> str:
    """Return the output's ``value`` converted to ``str``, or ``""`` without output."""
    result = self.output
    if result is None:
        return ""
    return str(result.value)
add_child(child)
Source Code in griptape/tasks/base_task.py
def add_child(self, child: BaseTask) -> BaseTask:
    """Link ``child`` as a child of this task.

    Records the child's id on this task and this task's id on the child,
    skipping ids that are already present. When a structure is set and
    ``child`` is not among its tasks, the child is added to the structure.

    Returns:
        This task.
    """
    child_id, own_id = child.id, self.id

    if child_id not in self.child_ids:
        self.child_ids.append(child_id)
    if own_id not in child.parent_ids:
        child.parent_ids.append(own_id)

    owner = self.structure
    if owner is not None and child not in owner.tasks:
        owner.add_task(child)

    return self
add_children(children)
Source Code in griptape/tasks/base_task.py
def add_children(self, children: list[BaseTask]) -> None:
    """Add every task in ``children`` as a child, in order, via ``add_child``."""
    for task in children:
        self.add_child(task)
add_parent(parent)
Source Code in griptape/tasks/base_task.py
def add_parent(self, parent: BaseTask) -> BaseTask:
    """Link ``parent`` as a parent of this task.

    Records the parent's id on this task and this task's id on the parent,
    skipping ids that are already present. When a structure is set and
    ``parent`` is not among its tasks, the parent is added to the structure.

    Returns:
        This task.
    """
    parent_id, own_id = parent.id, self.id

    if parent_id not in self.parent_ids:
        self.parent_ids.append(parent_id)
    if own_id not in parent.child_ids:
        parent.child_ids.append(own_id)

    owner = self.structure
    if owner is not None and parent not in owner.tasks:
        owner.add_task(parent)

    return self
add_parents(parents)
Source Code in griptape/tasks/base_task.py
def add_parents(self, parents: list[BaseTask]) -> None:
    """Add every task in ``parents`` as a parent, in order, via ``add_parent``."""
    for task in parents:
        self.add_parent(task)
after_run()
Source Code in griptape/tasks/base_task.py
def after_run(self) -> None:
    """Run the inherited ``after_run`` hook, then announce that the task finished.

    When a structure is set, a ``FinishTaskEvent`` carrying the task id, the
    parent and child ids, and the task's input and output is published on
    the ``EventBus``. Without a structure no event is published.
    """
    super().after_run()

    if self.structure is None:
        return

    event = FinishTaskEvent(
        task_id=self.id,
        task_parent_ids=self.parent_ids,
        task_child_ids=self.child_ids,
        task_input=self.input,
        task_output=self.output,
    )
    EventBus.publish_event(event)
before_run()
Source Code in griptape/tasks/base_task.py
def before_run(self) -> None:
    """Run the inherited ``before_run`` hook, then announce that the task started.

    When a structure is set, a ``StartTaskEvent`` carrying the task id, the
    parent and child ids, and the task's input and output is published on
    the ``EventBus``. Without a structure no event is published.
    """
    super().before_run()

    if self.structure is None:
        return

    event = StartTaskEvent(
        task_id=self.id,
        task_parent_ids=self.parent_ids,
        task_child_ids=self.child_ids,
        task_input=self.input,
        task_output=self.output,
    )
    EventBus.publish_event(event)
can_run()
Source Code in griptape/tasks/base_task.py
def can_run(self) -> bool:
    """Decide whether this task is ready to run.

    Side effect: when the task has parents and all of them are skipped, the
    task itself is marked SKIPPED.

    Returns:
        False when the task is skipped or not pending, or when all of its
        parents are skipped. Otherwise True exactly when every unskipped
        parent is finished (which includes the case of no parents).
    """
    # If this Task has been skipped or is not pending, it should not run
    if self.is_skipped() or not self.is_pending():
        return False

    # Resolve the parents once and reuse the list for every check below
    # instead of re-reading the `parents` property each time.
    parents = self.parents

    # If this Task has parents, and _all_ of them are skipped, it should not run
    if parents and all(parent.is_skipped() for parent in parents):
        self.state = BaseTask.State.SKIPPED
        return False

    # If _all_ this Task's unskipped parents are finished, it should run
    return all(parent.is_finished() for parent in parents if not parent.is_skipped())
is_finished()
Source Code in griptape/tasks/base_task.py
def is_finished(self) -> bool:
    """Return True when this task's state equals ``BaseTask.State.FINISHED``."""
    current = self.state
    return current == BaseTask.State.FINISHED
is_pending()
Source Code in griptape/tasks/base_task.py
def is_pending(self) -> bool:
    """Return True when this task's state equals ``BaseTask.State.PENDING``."""
    current = self.state
    return current == BaseTask.State.PENDING
is_running()
Source Code in griptape/tasks/base_task.py
def is_running(self) -> bool:
    """Return True when this task's state equals ``BaseTask.State.RUNNING``."""
    current = self.state
    return current == BaseTask.State.RUNNING
is_skipped()
Source Code in griptape/tasks/base_task.py
def is_skipped(self) -> bool:
    """Return True when this task's state equals ``BaseTask.State.SKIPPED``."""
    current = self.state
    return current == BaseTask.State.SKIPPED
preprocess(structure)
Source Code in griptape/tasks/base_task.py
def preprocess(self, structure: Structure) -> BaseTask:
    """Attach ``structure`` to this task.

    Args:
        structure: The structure to store on ``self.structure``.

    Returns:
        This task.
    """
    self.structure = structure

    return self
reset()
Source Code in griptape/tasks/base_task.py
def reset(self) -> BaseTask:
    """Put the task back into its pre-run condition.

    Sets the state to PENDING, clears the output and empties the stored
    execution arguments.

    Returns:
        This task.
    """
    # Targets are assigned left to right, matching the original statement order.
    self.state, self.output, self._execution_args = BaseTask.State.PENDING, None, ()

    return self
run(*args)
Source Code in griptape/tasks/base_task.py
def run(self, *args) -> T:
    """Execute the task and return its output.

    Args:
        *args: Positional arguments stored as the task's execution arguments.

    Returns:
        The value produced by ``try_run``, or an ``ErrorArtifact`` wrapping
        the exception when any step raised an ``Exception``.
    """
    try:
        # Store the arguments first so they are available to the steps below.
        self._execution_args = args

        self.state = BaseTask.State.RUNNING

        self.before_run()

        self.output = self.try_run()

        self.after_run()
    except Exception as e:
        # Failures are logged and turned into an error output rather than re-raised.
        logger.exception("%s %s\n%s", self.__class__.__name__, self.id, e)

        self.output = cast("T", ErrorArtifact(str(e), exception=e))
    finally:
        # The task ends up FINISHED whether it succeeded or failed.
        self.state = BaseTask.State.FINISHED

    return self.output
try_run()
abstractmethod
Source Code in griptape/tasks/base_task.py
@abstractmethod
def try_run(self) -> T:
    """Perform the task's work and return its output.

    Abstract: concrete subclasses supply the implementation. ``run`` stores
    the returned value as the task's output.
    """
Could this page be better? Report a problem or suggest an addition!