• __all__ = ['AudioTranscriptionTool', 'BaseImageGenerationTool', 'BaseTool', 'CalculatorTool', 'ComputerTool', 'DateTimeTool', 'EmailTool', 'ExtractionTool', 'FileManagerTool', 'GriptapeCloudToolTool', 'ImageQueryTool', 'InpaintingImageGenerationTool', 'OutpaintingImageGenerationTool', 'PromptImageGenerationTool', 'PromptSummaryTool', 'QueryTool', 'RagTool', 'RestApiTool', 'SqlTool', 'StructureRunTool', 'StructuredOutputTool', 'TextToSpeechTool', 'VariationImageGenerationTool', 'VectorStoreTool', 'WebScraperTool', 'WebSearchTool'] module-attribute

Bases: BaseTool

Source Code in griptape/tools/audio_transcription/tool.py
@define
class AudioTranscriptionTool(BaseTool):
    """A tool that can be used to generate transcriptions from input audio.

    Attributes:
        audio_transcription_driver: Driver whose `run` method receives the audio artifact to transcribe.
        audio_loader: Loader used to read an audio file from disk into an artifact.
    """

    audio_transcription_driver: BaseAudioTranscriptionDriver = field(kw_only=True)
    audio_loader: AudioLoader = field(default=Factory(lambda: AudioLoader()), kw_only=True)

    @activity(
        config={
            "description": "This tool can be used to generate transcriptions of audio files on disk.",
            "schema": Schema({Literal("path", description="The paths to an audio file on disk."): str}),
        },
    )
    def transcribe_audio_from_disk(self, params: dict) -> TextArtifact | ErrorArtifact:
        """Load the audio file at `params["values"]["path"]` and return the driver's transcription."""
        audio_path = params["values"]["path"]

        audio_artifact = self.audio_loader.load(audio_path)

        return self.audio_transcription_driver.run(audio_artifact)

    @activity(
        config={
            "description": "This tool can be used to generate the transcription of an audio artifact in memory.",
            # The keys are declared at the top level of the schema because the method reads them
            # directly from `params["values"]`; wrapping them in an extra "schema" key would make
            # every validated input fail the lookups below.
            "schema": Schema({"memory_name": str, "artifact_namespace": str, "artifact_name": str}),
        },
    )
    def transcribe_audio_from_memory(self, params: dict[str, Any]) -> TextArtifact | ErrorArtifact:
        """Transcribe an audio artifact that is stored in one of the tool's input memories.

        Args:
            params: Activity input; `params["values"]` must contain `memory_name`,
                `artifact_namespace` and `artifact_name`.

        Returns:
            The driver's transcription, or an ErrorArtifact when the named memory is not found.
        """
        memory = self.find_input_memory(params["values"]["memory_name"])
        artifact_namespace = params["values"]["artifact_namespace"]
        artifact_name = params["values"]["artifact_name"]

        if memory is None:
            return ErrorArtifact("memory not found")

        audio_artifact = cast(
            "AudioArtifact",
            load_artifact_from_memory(memory, artifact_namespace, artifact_name, AudioArtifact),
        )

        return self.audio_transcription_driver.run(audio_artifact)
  • audio_loader = field(default=Factory(lambda: AudioLoader()), kw_only=True) class-attribute instance-attribute

  • audio_transcription_driver = field(kw_only=True) class-attribute instance-attribute

transcribe_audio_from_disk(params)

Source Code in griptape/tools/audio_transcription/tool.py
@activity(
    config={
        "description": "This tool can be used to generate transcriptions of audio files on disk.",
        "schema": Schema({Literal("path", description="The paths to an audio file on disk."): str}),
    },
)
def transcribe_audio_from_disk(self, params: dict) -> TextArtifact | ErrorArtifact:
    """Load the audio file named by `params["values"]["path"]` and return the driver's transcription."""
    loaded_audio = self.audio_loader.load(params["values"]["path"])
    return self.audio_transcription_driver.run(loaded_audio)

transcribe_audio_from_memory(params)

Source Code in griptape/tools/audio_transcription/tool.py
@activity(
    config={
        "description": "This tool can be used to generate the transcription of an audio artifact in memory.",
        # The keys are declared at the top level of the schema because the method reads them
        # directly from `params["values"]`; wrapping them in an extra "schema" key would make
        # every validated input fail the lookups below.
        "schema": Schema({"memory_name": str, "artifact_namespace": str, "artifact_name": str}),
    },
)
def transcribe_audio_from_memory(self, params: dict[str, Any]) -> TextArtifact | ErrorArtifact:
    """Transcribe an audio artifact that is stored in one of the tool's input memories.

    Args:
        params: Activity input; `params["values"]` must contain `memory_name`,
            `artifact_namespace` and `artifact_name`.

    Returns:
        The driver's transcription, or an ErrorArtifact when the named memory is not found.
    """
    memory = self.find_input_memory(params["values"]["memory_name"])
    artifact_namespace = params["values"]["artifact_namespace"]
    artifact_name = params["values"]["artifact_name"]

    if memory is None:
        return ErrorArtifact("memory not found")

    audio_artifact = cast(
        "AudioArtifact",
        load_artifact_from_memory(memory, artifact_namespace, artifact_name, AudioArtifact),
    )

    return self.audio_transcription_driver.run(audio_artifact)

BaseImageGenerationTool

Bases: ArtifactFileOutputMixin , BaseTool

Source Code in griptape/tools/base_image_generation_tool.py
@define
class BaseImageGenerationTool(ArtifactFileOutputMixin, BaseTool):
    """Shared base for tools that turn text prompts into images.

    The two class-level constants hold description strings for the prompt and
    negative-prompt inputs.
    """

    # Description text for the positive prompt input.
    PROMPT_DESCRIPTION = (
        "Features and qualities to include in the generated image, "
        "descriptive and succinct."
    )
    # Description text for the negative prompt input.
    NEGATIVE_PROMPT_DESCRIPTION = (
        "Features and qualities to avoid in the generated image. "
        "Affirmatively describe what to avoid, for example: "
        "to avoid the color red, include 'red' rather than 'no red'."
    )
  • NEGATIVE_PROMPT_DESCRIPTION = "Features and qualities to avoid in the generated image. Affirmatively describe what to avoid, for example: to avoid the color red, include 'red' rather than 'no red'." class-attribute instance-attribute

  • PROMPT_DESCRIPTION = 'Features and qualities to include in the generated image, descriptive and succinct.' class-attribute instance-attribute

BaseTool

Bases: ActivityMixin , SerializableMixin , RunnableMixin['BaseTool'] , ABC

Attributes

NameTypeDescription
namestrTool name.
input_memoryOptional[list[TaskMemory]]TaskMemory available in tool activities. Gets automatically set if None.
output_memoryOptional[dict[str, list[TaskMemory]]]TaskMemory that activities write to by default. Gets automatically set if None.
install_dependencies_on_initboolDetermines whether dependencies from the tool requirements.txt file are installed in init.
dependencies_install_directoryOptional[str]Custom dependency install directory.
verboseboolDetermines whether tool operations (such as dependency installation) should be verbose.
off_promptboolDetermines whether tool activity output goes to the output memory.
Source Code in griptape/tools/base_tool.py
@define
class BaseTool(ActivityMixin, SerializableMixin, RunnableMixin["BaseTool"], ABC):
    """Abstract class for all tools to inherit from.

    Attributes:
        name: Tool name.
        input_memory: TaskMemory available in tool activities. Gets automatically set if None.
        output_memory: TaskMemory that activities write to by default. Gets automatically set if None.
        install_dependencies_on_init: Determines whether dependencies from the tool requirements.txt file are installed in init.
        dependencies_install_directory: Custom dependency install directory.
        verbose: Determines whether tool operations (such as dependency installation) should be verbose.
        off_prompt: Determines whether tool activity output goes to the output memory.
    """

    REQUIREMENTS_FILE = "requirements.txt"

    name: str = field(
        default=Factory(lambda self: self.__class__.__name__, takes_self=True),
        kw_only=True,
        metadata={"serializable": True},
    )
    input_memory: Optional[list[TaskMemory]] = field(default=None, kw_only=True, metadata={"serializable": True})
    output_memory: Optional[dict[str, list[TaskMemory]]] = field(
        default=None, kw_only=True, metadata={"serializable": True}
    )
    install_dependencies_on_init: bool = field(default=True, kw_only=True, metadata={"serializable": True})
    dependencies_install_directory: Optional[str] = field(default=None, kw_only=True, metadata={"serializable": True})
    verbose: bool = field(default=False, kw_only=True, metadata={"serializable": True})
    off_prompt: bool = field(default=False, kw_only=True, metadata={"serializable": True})

    def __attrs_post_init__(self) -> None:
        """Install the tool's requirements when enabled, present and not already satisfied."""
        if (
            self.install_dependencies_on_init
            and self.has_requirements
            and not self.are_requirements_met(self.requirements_path)
        ):
            self.install_dependencies(os.environ.copy())

    @output_memory.validator  # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess]
    def validate_output_memory(self, _: Attribute, output_memory: dict[str, Optional[list[TaskMemory]]]) -> None:
        """Check that every key names an existing activity and maps to a list of uniquely named memories.

        Raises:
            ValueError: If an activity doesn't exist, a memory list is None, or memory names repeat.
        """
        if output_memory:
            for activity_name, memory_list in output_memory.items():
                if not self.find_activity(activity_name):
                    raise ValueError(f"activity {activity_name} doesn't exist")
                if memory_list is None:
                    raise ValueError(f"memory list for activity '{activity_name}' can't be None")

                output_memory_names = [memory.name for memory in memory_list]

                if len(output_memory_names) > len(set(output_memory_names)):
                    raise ValueError(f"memory names have to be unique in activity '{activity_name}' output")

    @property
    def requirements_path(self) -> str:
        """Path of the requirements file located next to the tool's source file."""
        return os.path.join(self.abs_dir_path, self.REQUIREMENTS_FILE)

    @property
    def abs_file_path(self) -> str:
        """Absolute path of the file that defines this tool's class."""
        return os.path.abspath(inspect.getfile(self.__class__))

    @property
    def abs_dir_path(self) -> str:
        """Directory containing the file that defines this tool's class."""
        return os.path.dirname(self.abs_file_path)

    @property
    def has_requirements(self) -> bool:
        """Whether the requirements file exists on disk."""
        return os.path.exists(self.requirements_path)

    # This method has to remain a method and can't be decorated with @property because
    # of the max depth recursion issue in `self.activities`.
    def schema(self) -> dict:
        """Return the JSON schema that accepts any one of this tool's activity schemas."""
        full_schema = Schema(Or(*self.activity_schemas()), description=f"{self.name} action schema.")

        return full_schema.json_schema(f"{self.name} ToolAction Schema")

    def activity_schemas(self) -> list[Schema]:
        """Build one Schema per activity with `name`, `path` and `input` entries."""
        schemas = []

        for activity in self.activities():
            schema_dict: dict[Literal | schema.Optional, Any] = {
                Literal("name"): self.name,
                Literal("path", description=self.activity_description(activity)): self.activity_name(activity),
            }

            activity_schema = self.activity_schema(activity)
            # If no schema is defined, we just make `input` optional instead of omitting it.
            # This works better with lower-end models that may accidentally pass in an empty dict.
            if activity_schema is None:
                schema_dict[schema.Optional("input")] = {}
            else:
                schema_dict[Literal("input")] = activity_schema.schema

            schemas.append(Schema(schema_dict))

        return schemas

    def run(self, activity: Callable, subtask: ActionsSubtask, action: ToolAction) -> BaseArtifact:
        """Run the before/try/after pipeline, converting any exception into an ErrorArtifact."""
        try:
            output = self.before_run(activity, subtask, action)

            output = self.try_run(activity, subtask, action, output)

            output = self.after_run(activity, subtask, action, output)
        except Exception as e:
            logging.debug(traceback.format_exc())
            output = ErrorArtifact(str(e), exception=e)

        return output

    def before_run(self, activity: Callable, subtask: ActionsSubtask, action: ToolAction) -> Optional[dict]:
        """Invoke the parent `before_run` hook and return the action's input."""
        super().before_run()

        return action.input

    @observable(tags=["Tool.run()"])
    def try_run(
        self,
        activity: Callable,
        subtask: ActionsSubtask,
        action: ToolAction,
        value: Optional[dict],
    ) -> BaseArtifact:
        """Call the activity with a deep copy of `value` and make sure the result is an artifact."""
        activity_result = activity(deepcopy(value))

        if isinstance(activity_result, BaseArtifact):
            result = activity_result
        else:
            logging.warning("Activity result is not an artifact; converting result to InfoArtifact")

            if activity_result is None:
                result = InfoArtifact("Tool returned an empty value")
            else:
                result = InfoArtifact(activity_result)

        return result

    def after_run(
        self,
        activity: Callable,
        subtask: ActionsSubtask,
        action: ToolAction,
        value: BaseArtifact,
    ) -> BaseArtifact:
        """Invoke the parent `after_run` hook and pass `value` through the activity's output memories.

        Returns:
            The value after every configured memory has processed it, or the value unchanged
            when no output memory applies to this activity.
        """
        super().after_run()

        if not self.output_memory:
            return value

        # `output_memory` may only list some activities; an activity without an entry simply
        # has no memories to run instead of failing the whole call with a KeyError.
        for memory in self.output_memory.get(getattr(activity, "name")) or []:
            value = memory.process_output(activity, subtask, value)

        return value

    def validate(self) -> bool:
        """Return True when the requirements file exists.

        Raises:
            Exception: If the requirements file is not found.
        """
        if not os.path.exists(self.requirements_path):
            raise Exception(f"{self.REQUIREMENTS_FILE} not found")
        return True

    def tool_dir(self) -> str:
        """Return the directory containing the file that defines this tool's class."""
        class_file = inspect.getfile(self.__class__)

        return os.path.dirname(os.path.abspath(class_file))

    def install_dependencies(self, env: Optional[dict[str, str]] = None) -> None:
        """Install the tool's requirements with pip, run from the tool's directory.

        Args:
            env: Environment variables for the pip subprocess. An empty environment is used when omitted.
        """
        env = env or {}

        # Use the class constant so the installed file is the same one `requirements_path` checks for.
        command = [sys.executable, "-m", "pip", "install", "-r", self.REQUIREMENTS_FILE]

        if self.dependencies_install_directory is None:
            command.extend(["-U"])
        else:
            command.extend(["-t", self.dependencies_install_directory])

        subprocess.run(
            command,
            env=env,
            cwd=self.tool_dir(),
            stdout=None if self.verbose else subprocess.DEVNULL,
            stderr=None if self.verbose else subprocess.DEVNULL,
            check=False,
        )

    def find_input_memory(self, memory_name: str) -> Optional[TaskMemory]:
        """Return the first input memory whose name equals `memory_name`, or None."""
        if self.input_memory:
            return next((m for m in self.input_memory if m.name == memory_name), None)
        return None

    def to_native_tool_name(self, activity: Callable) -> str:
        """Converts a Tool's name and an Activity into a native tool name.

        The native tool name is a combination of the Tool's name and the Activity's name.
        The Tool's name may only contain letters and numbers, and the Activity's name may only contain letters, numbers, and underscores.

        Args:
            activity: Activity to convert

        Returns:
            str: Native tool name.

        Raises:
            ValueError: If the tool name or the activity name contains a disallowed character.
        """
        tool_name = self.name
        # fullmatch is required here: with `re.match`, `$` also matches just before a trailing newline.
        if re.fullmatch(r"[a-zA-Z0-9]+", tool_name) is None:
            raise ValueError("Tool name can only contain letters and numbers.")

        activity_name = self.activity_name(activity)
        if re.fullmatch(r"[a-zA-Z0-9_]+", activity_name) is None:
            raise ValueError("Activity name can only contain letters, numbers, and underscores.")

        return f"{tool_name}_{activity_name}"

    def are_requirements_met(self, requirements_path: str) -> bool:
        """Report whether every distribution listed in the requirements file is installed.

        Blank lines and `#` comment lines are ignored; every other line is looked up as a
        distribution name exactly as written (after stripping surrounding whitespace).

        Args:
            requirements_path: Path to the requirements file to read.

        Returns:
            True when all listed distributions are found, False as soon as one is missing.
        """
        requirements = Path(requirements_path).read_text().splitlines()

        try:
            for line in requirements:
                requirement = line.strip()
                # Blank and comment lines are not distributions; looking them up would fail.
                if not requirement or requirement.startswith("#"):
                    continue
                importlib.metadata.version(requirement)
            return True
        except importlib.metadata.PackageNotFoundError:
            return False
  • REQUIREMENTS_FILE = 'requirements.txt' class-attribute instance-attribute

  • abs_dir_path property

  • abs_file_path property

  • dependencies_install_directory = field(default=None, kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

  • has_requirements property

  • input_memory = field(default=None, kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

  • install_dependencies_on_init = field(default=True, kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

  • name = field(default=Factory(lambda self: self.__class__.__name__, takes_self=True), kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

  • off_prompt = field(default=False, kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

  • output_memory = field(default=None, kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

  • requirements_path property

  • verbose = field(default=False, kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

__attrs_post_init__()

Source Code in griptape/tools/base_tool.py
def __attrs_post_init__(self) -> None:
    """Install the tool's requirements unless installation is disabled, absent or unnecessary."""
    if not self.install_dependencies_on_init:
        return
    if not self.has_requirements:
        return
    if self.are_requirements_met(self.requirements_path):
        return

    # The subprocess gets its own copy of the current environment.
    self.install_dependencies(os.environ.copy())

activity_schemas()

Source Code in griptape/tools/base_tool.py
def activity_schemas(self) -> list[Schema]:
    """Return one Schema per activity, each carrying `name`, `path` and `input` entries."""

    def describe(current: Callable) -> Schema:
        # `name` pins the tool, `path` pins the activity (and carries its description).
        entries: dict[Literal | schema.Optional, Any] = {
            Literal("name"): self.name,
            Literal("path", description=self.activity_description(current)): self.activity_name(current),
        }

        declared = self.activity_schema(current)
        if declared is not None:
            entries[Literal("input")] = declared.schema
        else:
            # If no schema is defined, we just make `input` optional instead of omitting it.
            # This works better with lower-end models that may accidentally pass in an empty dict.
            entries[schema.Optional("input")] = {}

        return Schema(entries)

    return [describe(current) for current in self.activities()]

after_run(activity, subtask, action, value)

Source Code in griptape/tools/base_tool.py
def after_run(
    self,
    activity: Callable,
    subtask: ActionsSubtask,
    action: ToolAction,
    value: BaseArtifact,
) -> BaseArtifact:
    """Invoke the parent `after_run` hook and pass `value` through the activity's output memories.

    Args:
        activity: The activity that produced `value`; its `name` attribute selects the memories.
        subtask: Subtask forwarded to each memory's `process_output`.
        action: The action that was run (not used here).
        value: The artifact returned by the activity.

    Returns:
        The value after every configured memory has processed it, or the value unchanged
        when no output memory applies to this activity.
    """
    super().after_run()

    if not self.output_memory:
        return value

    # `output_memory` may only list some activities; an activity without an entry simply
    # has no memories to run instead of failing the whole call with a KeyError.
    for memory in self.output_memory.get(getattr(activity, "name")) or []:
        value = memory.process_output(activity, subtask, value)

    return value

are_requirements_met(requirements_path)

Source Code in griptape/tools/base_tool.py
def are_requirements_met(self, requirements_path: str) -> bool:
    """Report whether every distribution listed in the requirements file is installed.

    Blank lines and `#` comment lines are ignored; every other line is looked up as a
    distribution name exactly as written (after stripping surrounding whitespace).

    Args:
        requirements_path: Path to the requirements file to read.

    Returns:
        True when all listed distributions are found, False as soon as one is missing.
    """
    requirements = Path(requirements_path).read_text().splitlines()

    try:
        for line in requirements:
            requirement = line.strip()
            # Blank and comment lines are not distributions; looking them up would fail.
            if not requirement or requirement.startswith("#"):
                continue
            importlib.metadata.version(requirement)
        return True
    except importlib.metadata.PackageNotFoundError:
        return False

before_run(activity, subtask, action)

Source Code in griptape/tools/base_tool.py
def before_run(self, activity: Callable, subtask: ActionsSubtask, action: ToolAction) -> Optional[dict]:
    """Invoke the parent `before_run` hook, then hand back the action's input."""
    super().before_run()
    tool_input = action.input
    return tool_input

find_input_memory(memory_name)

Source Code in griptape/tools/base_tool.py
def find_input_memory(self, memory_name: str) -> Optional[TaskMemory]:
    """Return the first input memory named `memory_name`, or None when there is no match."""
    for candidate in self.input_memory or []:
        if candidate.name == memory_name:
            return candidate
    return None

install_dependencies(env=None)

Source Code in griptape/tools/base_tool.py
def install_dependencies(self, env: Optional[dict[str, str]] = None) -> None:
    """Install the tool's requirements with pip, run from the tool's directory.

    pip is invoked through the current interpreter. Without a custom install directory the
    packages are upgraded in place (`-U`); otherwise they are installed into that directory (`-t`).
    pip's output is discarded unless `verbose` is set, and a failing pip run does not raise.

    Args:
        env: Environment variables for the pip subprocess. An empty environment is used when omitted.
    """
    env = env or {}

    # Use the class constant so the installed file is the same one `requirements_path` checks for.
    command = [sys.executable, "-m", "pip", "install", "-r", self.REQUIREMENTS_FILE]

    if self.dependencies_install_directory is None:
        command.extend(["-U"])
    else:
        command.extend(["-t", self.dependencies_install_directory])

    subprocess.run(
        command,
        env=env,
        cwd=self.tool_dir(),
        stdout=None if self.verbose else subprocess.DEVNULL,
        stderr=None if self.verbose else subprocess.DEVNULL,
        check=False,
    )

run(activity, subtask, action)

Source Code in griptape/tools/base_tool.py
def run(self, activity: Callable, subtask: ActionsSubtask, action: ToolAction) -> BaseArtifact:
    """Run the before/try/after pipeline for an activity.

    Any exception raised along the way is logged at debug level and returned as an
    ErrorArtifact instead of propagating.
    """
    try:
        action_input = self.before_run(activity, subtask, action)
        raw_result = self.try_run(activity, subtask, action, action_input)
        return self.after_run(activity, subtask, action, raw_result)
    except Exception as e:
        logging.debug(traceback.format_exc())
        return ErrorArtifact(str(e), exception=e)

schema()

Source Code in griptape/tools/base_tool.py
def schema(self) -> dict:
    """Return the JSON schema that accepts any one of this tool's activity schemas."""
    alternatives = Or(*self.activity_schemas())
    combined = Schema(alternatives, description=f"{self.name} action schema.")
    schema_id = f"{self.name} ToolAction Schema"
    return combined.json_schema(schema_id)

to_native_tool_name(activity)

Converts a Tool's name and an Activity into a native tool name.

The native tool name is a combination of the Tool's name and the Activity's name. The Tool's name may only contain letters and numbers, and the Activity's name may only contain letters, numbers, and underscores.

Parameters

NameTypeDescriptionDefault
activityCallableActivity to convert
required

Returns

NameTypeDescription
strstrNative tool name.
Source Code in griptape/tools/base_tool.py
def to_native_tool_name(self, activity: Callable) -> str:
    """Converts a Tool's name and an Activity into a native tool name.

    The native tool name is a combination of the Tool's name and the Activity's name.
    The Tool's name may only contain letters and numbers, and the Activity's name may only contain letters, numbers, and underscores.

    Args:
        activity: Activity to convert

    Returns:
        str: Native tool name.

    Raises:
        ValueError: If the tool name or the activity name contains a disallowed character.
    """
    tool_name = self.name
    # fullmatch is required here: with `re.match`, `$` also matches just before a trailing newline.
    if re.fullmatch(r"[a-zA-Z0-9]+", tool_name) is None:
        raise ValueError("Tool name can only contain letters and numbers.")

    activity_name = self.activity_name(activity)
    if re.fullmatch(r"[a-zA-Z0-9_]+", activity_name) is None:
        raise ValueError("Activity name can only contain letters, numbers, and underscores.")

    return f"{tool_name}_{activity_name}"

tool_dir()

Source Code in griptape/tools/base_tool.py
def tool_dir(self) -> str:
    """Return the absolute directory of the file that defines this tool's class."""
    defining_file = os.path.abspath(inspect.getfile(self.__class__))
    directory, _ = os.path.split(defining_file)
    return directory

try_run(activity, subtask, action, value)

Source Code in griptape/tools/base_tool.py
@observable(tags=["Tool.run()"])
def try_run(
    self,
    activity: Callable,
    subtask: ActionsSubtask,
    action: ToolAction,
    value: Optional[dict],
) -> BaseArtifact:
    """Call the activity with a deep copy of `value` and make sure the result is an artifact.

    A result that is already a BaseArtifact is returned untouched. Anything else triggers a
    warning and is wrapped in an InfoArtifact, with a placeholder message standing in for None.
    """
    outcome = activity(deepcopy(value))

    if isinstance(outcome, BaseArtifact):
        return outcome

    logging.warning("Activity result is not an artifact; converting result to InfoArtifact")

    if outcome is None:
        return InfoArtifact("Tool returned an empty value")
    return InfoArtifact(outcome)

validate()

Source Code in griptape/tools/base_tool.py
def validate(self) -> bool:
    """Return True when the requirements file exists; raise an Exception when it is missing."""
    if os.path.exists(self.requirements_path):
        return True
    raise Exception(f"{self.REQUIREMENTS_FILE} not found")

validate_output_memory(_, output_memory)

Source Code in griptape/tools/base_tool.py
@output_memory.validator  # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess]
def validate_output_memory(self, _: Attribute, output_memory: dict[str, Optional[list[TaskMemory]]]) -> None:
    """Check that every key names an existing activity and maps to a list of uniquely named memories.

    Raises:
        ValueError: If an activity doesn't exist, a memory list is None, or memory names repeat.
    """
    if not output_memory:
        return

    for activity_name, memory_list in output_memory.items():
        if not self.find_activity(activity_name):
            raise ValueError(f"activity {activity_name} doesn't exist")
        if memory_list is None:
            raise ValueError(f"memory list for activity '{activity_name}' can't be None")

        # A name that appears twice makes the set smaller than the list.
        names = [memory.name for memory in memory_list]
        if len(set(names)) < len(names):
            raise ValueError(f"memory names have to be unique in activity '{activity_name}' output")

CalculatorTool

Bases: BaseTool

Source Code in griptape/tools/calculator/tool.py
class CalculatorTool(BaseTool):
    """Tool exposing a single activity that evaluates an arithmetic expression with numexpr."""

    @activity(
        config={
            "description": "Can be used for computing simple numerical or algebraic calculations in Python",
            "schema": Schema(
                {
                    Literal(
                        "expression",
                        description="Arithmetic expression parsable in pure Python. Single line only. "
                        "Don't use variables. Don't use any imports or external libraries",
                    ): str,
                },
            ),
        },
    )
    def calculate(self, params: dict) -> BaseArtifact:
        """Evaluate `params["values"]["expression"]` and wrap the result in a TextArtifact.

        Any failure, including a missing key, is reported as an ErrorArtifact.
        """
        # numexpr is imported lazily so the dependency is only needed when the activity runs.
        import numexpr  # pyright: ignore[reportMissingImports]

        try:
            evaluated = numexpr.evaluate(params["values"]["expression"])
            return TextArtifact(evaluated)
        except Exception as e:
            return ErrorArtifact(f"error calculating: {e}")

calculate(params)

Source Code in griptape/tools/calculator/tool.py
@activity(
    config={
        "description": "Can be used for computing simple numerical or algebraic calculations in Python",
        "schema": Schema(
            {
                Literal(
                    "expression",
                    description="Arithmetic expression parsable in pure Python. Single line only. "
                    "Don't use variables. Don't use any imports or external libraries",
                ): str,
            },
        ),
    },
)
def calculate(self, params: dict) -> BaseArtifact:
    """Evaluate `params["values"]["expression"]` and wrap the result in a TextArtifact.

    Any failure, including a missing key, is reported as an ErrorArtifact.
    """
    # numexpr is imported lazily so the dependency is only needed when the activity runs.
    import numexpr  # pyright: ignore[reportMissingImports]

    try:
        evaluated = numexpr.evaluate(params["values"]["expression"])
        return TextArtifact(evaluated)
    except Exception as e:
        return ErrorArtifact(f"error calculating: {e}")

ComputerTool

Bases: BaseTool

Source Code in griptape/tools/computer/tool.py
@define
class ComputerTool(BaseTool):
    """Tool that runs Python code and shell commands inside a Docker container.

    Attributes:
        local_workdir: Host directory mounted into the container. A temporary directory is created when unset.
        container_workdir: Path inside the container where `local_workdir` is mounted.
        env_vars: Environment variables passed to the container.
        dockerfile_path: Dockerfile used to build the image; defaults to `resources/Dockerfile` in the tool directory.
        requirements_txt_path: Requirements file copied into the image build context.
        docker_client: Docker client; defaults to one created from the environment.
    """

    local_workdir: Optional[str] = field(default=None, kw_only=True)
    container_workdir: str = field(default="/griptape", kw_only=True)
    env_vars: dict = field(factory=dict, kw_only=True)
    dockerfile_path: str = field(
        default=Factory(lambda self: f"{os.path.join(self.tool_dir(), 'resources/Dockerfile')}", takes_self=True),
        kw_only=True,
    )
    requirements_txt_path: str = field(
        default=Factory(lambda self: f"{os.path.join(self.tool_dir(), 'resources/requirements.txt')}", takes_self=True),
        kw_only=True,
    )
    docker_client: DockerClient = field(
        default=Factory(lambda self: self.default_docker_client(), takes_self=True),
        kw_only=True,
    )

    _tempdir: Optional[tempfile.TemporaryDirectory] = field(default=None, kw_only=True)

    def __attrs_post_init__(self) -> None:
        """Run the base initialisation, then make sure a local working directory exists."""
        super().__attrs_post_init__()

        if self.local_workdir:
            Path(self.local_workdir).mkdir(parents=True, exist_ok=True)
        else:
            # Keep a reference to the TemporaryDirectory so it can be cleaned up in `__del__`.
            self._tempdir = tempfile.TemporaryDirectory()
            self.local_workdir = self._tempdir.name

    @docker_client.validator  # pyright: ignore[reportAttributeAccessIssue]
    def validate_docker_client(self, _: Attribute, docker_client: DockerClient) -> None:
        """Reject a falsy client, which is what `default_docker_client` returns on failure.

        Raises:
            ValueError: If no Docker client is available.
        """
        if not docker_client:
            raise ValueError("Docker client can't be initialized: make sure the Docker daemon is running")

    def install_dependencies(self, env: Optional[dict[str, str]] = None) -> None:
        """Install Python requirements, then remove any stale container and rebuild the image."""
        super().install_dependencies(env)

        self.remove_existing_container(self.container_name(self))
        self.build_image(self)

    @activity(
        config={
            "description": "Can be used to execute Python code to solve any programmatic tasks and access and analyze"
            " files in the file system. If you need to use code output use `print` statements. "
            "You have access to the following external Python libraries: "
            "{{ _self.dependencies() }}",
            "schema": Schema(
                {
                    Literal("code", description="Python code to execute"): str,
                    Literal(
                        "filename",
                        description="name of the file to put the Python code in before executing it",
                    ): str,
                },
            ),
        },
    )
    def execute_code(self, params: dict) -> BaseArtifact:
        """Write the given code to the given filename and run it in the container."""
        code = params["values"]["code"]
        filename = params["values"]["filename"]

        return self.execute_code_in_container(filename, code)

    @activity(
        config={
            "description": "Can be used to execute shell commands in Linux",
            "schema": Schema({Literal("command", description="shell command to execute"): str}),
        },
    )
    def execute_command(self, params: dict) -> BaseArtifact:
        """Run the given shell command in the container."""
        command = params["values"]["command"]

        return self.execute_command_in_container(command)

    def execute_command_in_container(self, command: str) -> BaseArtifact:
        """Run `command` in a fresh container and return its output.

        Returns:
            An ErrorArtifact holding stderr when the command wrote to it, a TextArtifact holding
            stdout otherwise, or an ErrorArtifact describing the failure when anything raises.
        """
        from docker.models.containers import Container

        try:
            binds = {self.local_workdir: {"bind": self.container_workdir, "mode": "rw"}} if self.local_workdir else None

            container = self.docker_client.containers.run(  # pyright: ignore[reportCallIssue]
                self.image_name(self),
                environment=self.env_vars,
                command=command,
                name=self.container_name(self),
                volumes=binds,  # pyright: ignore[reportArgumentType] According to the [docs](https://docker-py.readthedocs.io/en/stable/containers.html), the type of `volumes` is dict[str, dict[str, str]].
                stdout=True,
                stderr=True,
                detach=True,
            )

            if not isinstance(container, Container):
                return ErrorArtifact("error running container")

            try:
                container.wait()

                stderr = container.logs(stdout=False, stderr=True).decode().strip()
                stdout = container.logs(stdout=True, stderr=False).decode().strip()
            finally:
                # The container name is fixed per tool, so a container left behind by a failed
                # wait/logs call would make every later run fail with a name conflict.
                container.remove(force=True)

            if stderr:
                return ErrorArtifact(stderr)
            return TextArtifact(stdout)
        except Exception as e:
            return ErrorArtifact(f"error executing command: {e}")

    def execute_code_in_container(self, filename: str, code: str) -> BaseArtifact:
        """Write `code` to `filename` in the local working directory and run it with `python` in the container."""
        container_file_path = os.path.join(self.container_workdir, filename)

        if self.local_workdir:
            tempdir = None
            local_workdir = self.local_workdir
        else:
            tempdir = tempfile.TemporaryDirectory()
            local_workdir = tempdir.name

        local_file_path = os.path.join(local_workdir, filename)

        try:
            Path(local_file_path).write_text(code)

            return self.execute_command_in_container(f"python {container_file_path}")
        except Exception as e:
            return ErrorArtifact(f"error executing code: {e}")
        finally:
            if tempdir:
                tempdir.cleanup()

    def default_docker_client(self) -> Optional[DockerClient]:
        """Create a Docker client from the environment, or log the error and return None."""
        import docker

        try:
            return docker.from_env()
        except Exception as e:
            logging.exception(e)

            return None

    def image_name(self, tool: BaseTool) -> str:
        """Return the snake-cased tool name suffixed with `_image`."""
        import stringcase  # pyright: ignore[reportMissingImports]

        return f"{stringcase.snakecase(tool.name)}_image"

    def container_name(self, tool: BaseTool) -> str:
        """Return the snake-cased tool name suffixed with `_container`."""
        import stringcase  # pyright: ignore[reportMissingImports]

        return f"{stringcase.snakecase(tool.name)}_container"

    def remove_existing_container(self, name: str) -> None:
        """Force-remove the container called `name`; do nothing when it doesn't exist."""
        from docker.errors import NotFound
        from docker.models.containers import Container

        try:
            existing_container = self.docker_client.containers.get(name)
            if isinstance(existing_container, Container):
                existing_container.remove(force=True)

                logging.info("Removed existing container %s", name)
        except NotFound:
            pass

    def build_image(self, tool: BaseTool) -> None:
        """Build the tool's image from a temporary context holding the Dockerfile and requirements file."""
        with tempfile.TemporaryDirectory() as temp_dir:
            shutil.copy(self.dockerfile_path, temp_dir)
            shutil.copy(self.requirements_txt_path, temp_dir)

            image = self.docker_client.images.build(path=temp_dir, tag=self.image_name(tool), rm=True, forcerm=True)

            if isinstance(image, tuple):
                logging.info("Built image: %s", image[0].short_id)

    def dependencies(self) -> list[str]:
        """Return the entries of the requirements file, skipping blank lines and `#` comment lines."""
        with open(self.requirements_txt_path) as file:
            stripped_lines = (line.strip() for line in file)
            return [line for line in stripped_lines if line and not line.startswith("#")]

    def __del__(self) -> None:
        # `_tempdir` is looked up defensively: the attribute does not exist if initialisation
        # stopped before it was assigned.
        tempdir = getattr(self, "_tempdir", None)
        if tempdir:
            tempdir.cleanup()
  • _tempdir = field(default=None, kw_only=True) class-attribute instance-attribute

  • container_workdir = field(default='/griptape', kw_only=True) class-attribute instance-attribute

  • docker_client = field(default=Factory(lambda self: self.default_docker_client(), takes_self=True), kw_only=True) class-attribute instance-attribute

  • dockerfile_path = field(default=Factory(lambda self: f'{os.path.join(self.tool_dir(), 'resources/Dockerfile')}', takes_self=True), kw_only=True) class-attribute instance-attribute

  • env_vars = field(factory=dict, kw_only=True) class-attribute instance-attribute

  • local_workdir = field(default=None, kw_only=True) class-attribute instance-attribute

  • requirements_txt_path = field(default=Factory(lambda self: f'{os.path.join(self.tool_dir(), 'resources/requirements.txt')}', takes_self=True), kw_only=True) class-attribute instance-attribute

__attrs_post_init__()

Source Code in griptape/tools/computer/tool.py
def __attrs_post_init__(self) -> None:
    """Ensure a local working directory exists after attrs initialisation.

    When `local_workdir` is unset, a temporary directory is created, kept on
    `_tempdir`, and its path becomes `local_workdir`. Otherwise the configured
    directory is created if missing.
    """
    super().__attrs_post_init__()

    if not self.local_workdir:
        self._tempdir = tempfile.TemporaryDirectory()
        self.local_workdir = self._tempdir.name
        return

    Path(self.local_workdir).mkdir(parents=True, exist_ok=True)

__del__()

Source Code in griptape/tools/computer/tool.py
def __del__(self) -> None:
    """Clean up the temporary working directory, if this instance created one."""
    owned_tempdir = self._tempdir
    if not owned_tempdir:
        return
    owned_tempdir.cleanup()

build_image(tool)

Source Code in griptape/tools/computer/tool.py
def build_image(self, tool: BaseTool) -> None:
    """Build the Docker image for `tool` from the configured Dockerfile.

    The Dockerfile and the requirements file are copied into a throwaway
    directory that is used as the build path. The image is tagged with
    `self.image_name(tool)`.

    Args:
        tool: Tool whose image name is used as the build tag.
    """
    with tempfile.TemporaryDirectory() as build_context:
        for resource in (self.dockerfile_path, self.requirements_txt_path):
            shutil.copy(resource, build_context)

        build_result = self.docker_client.images.build(
            path=build_context,
            tag=self.image_name(tool),
            rm=True,
            forcerm=True,
        )

        # Only a tuple result is logged; its first element carries the short id.
        if not isinstance(build_result, tuple):
            return
        logging.info("Built image: %s", build_result[0].short_id)

container_name(tool)

Source Code in griptape/tools/computer/tool.py
def container_name(self, tool: BaseTool) -> str:
    """Return the container name for `tool`: its snake-cased name plus `_container`."""
    import stringcase  # pyright: ignore[reportMissingImports]

    snake_name = stringcase.snakecase(tool.name)
    return f"{snake_name}_container"

default_docker_client()

Source Code in griptape/tools/computer/tool.py
def default_docker_client(self) -> Optional[DockerClient]:
    """Create a Docker client from the environment.

    Returns:
        The client from `docker.from_env()`, or `None` when creating it raises.
        The exception is logged, not re-raised.
    """
    import docker

    try:
        client = docker.from_env()
    except Exception as e:
        logging.exception(e)
        client = None

    return client

dependencies()

Source Code in griptape/tools/computer/tool.py
def dependencies(self) -> list[str]:
    """Return every line of the requirements file with surrounding whitespace removed.

    Blank lines are kept and appear as empty strings.
    """
    with open(self.requirements_txt_path) as requirements:
        return list(map(str.strip, requirements))

execute_code(params)

Source Code in griptape/tools/computer/tool.py
@activity(
    config={
        "description": "Can be used to execute Python code to solve any programmatic tasks and access and analyze"
        " files in the file system. If you need to use code output use `print` statements. "
        "You have access to the following external Python libraries: "
        "{{ _self.dependencies() }}",
        "schema": Schema(
            {
                Literal("code", description="Python code to execute"): str,
                Literal(
                    "filename",
                    description="name of the file to put the Python code in before executing it",
                ): str,
            },
        ),
    },
)
def execute_code(self, params: dict) -> BaseArtifact:
    """Activity entry point: run the supplied Python code inside the container.

    Args:
        params: Activity parameters; `params["values"]` must hold `code` and `filename`.

    Returns:
        Whatever `execute_code_in_container` returns for that file and code.
    """
    values = params["values"]
    code, filename = values["code"], values["filename"]

    return self.execute_code_in_container(filename, code)

execute_code_in_container(filename, code)

Source Code in griptape/tools/computer/tool.py
def execute_code_in_container(self, filename: str, code: str) -> BaseArtifact:
    """Write `code` to `filename` in the working directory and run it with `python`.

    The file is written as UTF-8 into the local working directory. The
    container is then asked to run `python <container_workdir>/<filename>`.

    `filename` is caller-supplied (typically model-generated), so it is
    resolved and must land inside the working directory. An absolute path or
    a `..` component would otherwise make the host-side write escape it.

    Args:
        filename: Name of the file to create, relative to the working directory.
        code: Python source to write into the file.

    Returns:
        The artifact returned by `execute_command_in_container`, or an
        `ErrorArtifact` if the filename is rejected or any step raises.
    """
    container_file_path = os.path.join(self.container_workdir, filename)

    if self.local_workdir:
        tempdir = None
        local_workdir = self.local_workdir
    else:
        tempdir = tempfile.TemporaryDirectory()
        local_workdir = tempdir.name

    try:
        workdir_root = Path(local_workdir).resolve()
        local_file_path = (workdir_root / filename).resolve()

        # Reject anything that is not strictly below the working directory.
        if workdir_root not in local_file_path.parents:
            raise ValueError(f"filename must resolve to a file inside the working directory: {filename}")

        # Python source is UTF-8 by default; do not depend on the host locale.
        local_file_path.write_text(code, encoding="utf-8")

        return self.execute_command_in_container(f"python {container_file_path}")
    except Exception as e:
        return ErrorArtifact(f"error executing code: {e}")
    finally:
        if tempdir:
            tempdir.cleanup()

execute_command(params)

Source Code in griptape/tools/computer/tool.py
@activity(
    config={
        "description": "Can be used to execute shell commands in Linux",
        "schema": Schema({Literal("command", description="shell command to execute"): str}),
    },
)
def execute_command(self, params: dict) -> BaseArtifact:
    """Activity entry point: run a shell command inside the container.

    Args:
        params: Activity parameters; `params["values"]["command"]` is the command to run.

    Returns:
        Whatever `execute_command_in_container` returns for that command.
    """
    shell_command = params["values"]["command"]
    result = self.execute_command_in_container(shell_command)

    return result

execute_command_in_container(command)

Source Code in griptape/tools/computer/tool.py
def execute_command_in_container(self, command: str) -> BaseArtifact:
    """Run `command` in a fresh detached container and return its output.

    The container is started from `self.image_name(self)` under the fixed name
    `self.container_name(self)`. When `local_workdir` is set it is bind-mounted
    read-write at `container_workdir`.

    The container is always removed in `finally`, including when waiting or
    reading logs fails. The name is fixed, so a leftover container would make
    every later run fail with a name conflict.

    Args:
        command: Command to run inside the container.

    Returns:
        `TextArtifact` with stdout when stderr is empty, or an `ErrorArtifact`
        carrying stderr or a description of the failure.
    """
    from docker.models.containers import Container

    container = None

    try:
        binds = {self.local_workdir: {"bind": self.container_workdir, "mode": "rw"}} if self.local_workdir else None

        container = self.docker_client.containers.run(  # pyright: ignore[reportCallIssue]
            self.image_name(self),
            environment=self.env_vars,
            command=command,
            name=self.container_name(self),
            volumes=binds,  # pyright: ignore[reportArgumentType] According to the [docs](https://docker-py.readthedocs.io/en/stable/containers.html), the type of `volumes` is dict[str, dict[str, str]].
            stdout=True,
            stderr=True,
            detach=True,
        )

        if not isinstance(container, Container):
            return ErrorArtifact("error running container")

        container.wait()

        stderr = container.logs(stdout=False, stderr=True).decode().strip()
        stdout = container.logs(stdout=True, stderr=False).decode().strip()

        if stderr:
            return ErrorArtifact(stderr)
        return TextArtifact(stdout)
    except Exception as e:
        return ErrorArtifact(f"error executing command: {e}")
    finally:
        if isinstance(container, Container):
            try:
                # force=True also covers a container that is still running.
                container.remove(force=True)
            except Exception:
                # A cleanup failure must not replace the command's result.
                logging.exception("Failed to remove container %s", self.container_name(self))

image_name(tool)

Source Code in griptape/tools/computer/tool.py
def image_name(self, tool: BaseTool) -> str:
    """Return the image name for `tool`: its snake-cased name plus `_image`."""
    import stringcase  # pyright: ignore[reportMissingImports]

    snake_name = stringcase.snakecase(tool.name)
    return f"{snake_name}_image"

install_dependencies(env=None)

Source Code in griptape/tools/computer/tool.py
def install_dependencies(self, env: Optional[dict[str, str]] = None) -> None:
    """Install dependencies via the parent class, then rebuild this tool's image.

    Any existing container with this tool's container name is removed before
    the image is built.

    Args:
        env: Passed through unchanged to the parent implementation.
    """
    super().install_dependencies(env)

    stale_container = self.container_name(self)
    self.remove_existing_container(stale_container)
    self.build_image(self)

remove_existing_container(name)

Source Code in griptape/tools/computer/tool.py
def remove_existing_container(self, name: str) -> None:
    """Force-remove the container called `name`, if there is one.

    A `NotFound` error from the Docker client is treated as "nothing to do".

    Args:
        name: Name of the container to look up and remove.
    """
    from docker.errors import NotFound
    from docker.models.containers import Container

    try:
        candidate = self.docker_client.containers.get(name)
        if not isinstance(candidate, Container):
            return

        candidate.remove(force=True)
        logging.info("Removed existing container %s", name)
    except NotFound:
        return

validate_docker_client(_, docker_client)

Source Code in griptape/tools/computer/tool.py
@docker_client.validator  # pyright: ignore[reportAttributeAccessIssue]
def validate_docker_client(self, _: Attribute, docker_client: DockerClient) -> None:
    """Reject a falsy Docker client.

    Raises:
        ValueError: If `docker_client` is falsy.
    """
    if docker_client:
        return

    raise ValueError("Docker client can't be initialized: make sure the Docker daemon is running")

DateTimeTool

Bases: BaseTool

Source Code in griptape/tools/date_time/tool.py
@define
class DateTimeTool(BaseTool):
    """Tool exposing date and time helpers as activities.

    Attributes:
        denylist: Activity names listed by default; contains `get_relative_datetime`.
    """

    denylist: list[str] = field(default=Factory(lambda: ["get_relative_datetime"]), kw_only=True)

    @activity(config={"description": "Can be used to return current date and time."})
    def get_current_datetime(self) -> BaseArtifact:
        """Return `datetime.now()` rendered with `str()` in a `TextArtifact`."""
        return TextArtifact(str(datetime.now()))

    @activity(
        config={
            "description": "Can be used to return a relative date and time.",
            "schema": schema.Schema(
                {
                    schema.Literal(
                        "relative_date_string",
                        description='Relative date in English compatible with the dateparser library. For example, "now EST", "20 minutes ago", '
                        '"in 2 days", "3 months, 1 week and 1 day ago", or "yesterday at 2pm"',
                    ): str,
                },
            ),
        },
    )
    def get_relative_datetime(self, params: dict) -> BaseArtifact:
        """Parse a relative date string with `dateparser` and return the result.

        Args:
            params: Activity parameters; `params["values"]["relative_date_string"]` is parsed.

        Returns:
            `TextArtifact` with the parsed datetime, or an `ErrorArtifact` when
            the string cannot be parsed or parsing raises.
        """
        from dateparser import parse

        try:
            date_string = params["values"]["relative_date_string"]
            relative_datetime = parse(date_string)

            if relative_datetime:
                return TextArtifact(str(relative_datetime))
            return ErrorArtifact("invalid date string")
        except Exception as e:
            # The message names this activity; it previously said "current datetime".
            return ErrorArtifact(f"error getting relative datetime: {e}")

    @activity(
        config={
            "description": "Can be used to add a timedelta to a datetime.",
            "schema": schema.Schema(
                {
                    schema.Literal(
                        "timedelta_kwargs",
                        description='A dictionary of keyword arguments to pass to the timedelta function. For example, {"days": -1, "hours": 2}',
                    ): dict,
                    schema.Optional(
                        schema.Literal(
                            "iso_datetime",
                            description='Datetime represented as a string in ISO 8601 format. For example, "2021-01-01T00:00:00". Defaults to the current datetime if not provided.',
                        )
                    ): str,
                },
            ),
        },
    )
    def add_timedelta(self, timedelta_kwargs: dict, iso_datetime: Optional[str] = None) -> BaseArtifact:
        """Add `timedelta(**timedelta_kwargs)` to a datetime.

        Args:
            timedelta_kwargs: Keyword arguments forwarded to `timedelta`.
            iso_datetime: ISO 8601 start datetime; the current datetime is used when omitted.

        Returns:
            `TextArtifact` holding the shifted datetime in ISO format.
        """
        if iso_datetime is None:
            iso_datetime = datetime.now().isoformat()
        return TextArtifact((datetime.fromisoformat(iso_datetime) + timedelta(**timedelta_kwargs)).isoformat())

    @activity(
        config={
            "description": "Can be used to calculate the difference between two datetimes. The difference is calculated as end_datetime - start_datetime.",
            "schema": schema.Schema(
                {
                    schema.Literal(
                        "start_datetime",
                        description='Datetime represented as a string in ISO 8601 format. For example, "2021-01-01T00:00:00"',
                    ): str,
                    schema.Literal(
                        "end_datetime",
                        description='Datetime represented as a string in ISO 8601 format. For example, "2021-01-02T00:00:00"',
                    ): str,
                }
            ),
        }
    )
    def get_datetime_diff(self, start_datetime: str, end_datetime: str) -> BaseArtifact:
        """Return `end_datetime - start_datetime` rendered with `str()`.

        Args:
            start_datetime: ISO 8601 datetime that is subtracted.
            end_datetime: ISO 8601 datetime that is subtracted from.
        """
        return TextArtifact(str(datetime.fromisoformat(end_datetime) - datetime.fromisoformat(start_datetime)))
  • denylist = field(default=Factory(lambda: ['get_relative_datetime']), kw_only=True) class-attribute instance-attribute

add_timedelta(timedelta_kwargs, iso_datetime=None)

Source Code in griptape/tools/date_time/tool.py
@activity(
    config={
        "description": "Can be used to add a timedelta to a datetime.",
        "schema": schema.Schema(
            {
                schema.Literal(
                    "timedelta_kwargs",
                    description='A dictionary of keyword arguments to pass to the timedelta function. For example, {"days": -1, "hours": 2}',
                ): dict,
                schema.Optional(
                    schema.Literal(
                        "iso_datetime",
                        description='Datetime represented as a string in ISO 8601 format. For example, "2021-01-01T00:00:00". Defaults to the current datetime if not provided.',
                    )
                ): str,
            },
        ),
    },
)
def add_timedelta(self, timedelta_kwargs: dict, iso_datetime: Optional[str] = None) -> BaseArtifact:
    """Add `timedelta(**timedelta_kwargs)` to a datetime.

    Args:
        timedelta_kwargs: Keyword arguments forwarded to `timedelta`.
        iso_datetime: ISO 8601 start datetime; the current datetime is used when omitted.

    Returns:
        `TextArtifact` holding the shifted datetime in ISO format.
    """
    start_iso = datetime.now().isoformat() if iso_datetime is None else iso_datetime
    shifted = datetime.fromisoformat(start_iso) + timedelta(**timedelta_kwargs)

    return TextArtifact(shifted.isoformat())

get_current_datetime()

Source Code in griptape/tools/date_time/tool.py
@activity(config={"description": "Can be used to return current date and time."})
def get_current_datetime(self) -> BaseArtifact:
    """Return `datetime.now()` rendered with `str()` in a `TextArtifact`."""
    current = datetime.now()
    return TextArtifact(str(current))

get_datetime_diff(start_datetime, end_datetime)

Source Code in griptape/tools/date_time/tool.py
@activity(
    config={
        "description": "Can be used to calculate the difference between two datetimes. The difference is calculated as end_datetime - start_datetime.",
        "schema": schema.Schema(
            {
                schema.Literal(
                    "start_datetime",
                    description='Datetime represented as a string in ISO 8601 format. For example, "2021-01-01T00:00:00"',
                ): str,
                schema.Literal(
                    "end_datetime",
                    description='Datetime represented as a string in ISO 8601 format. For example, "2021-01-02T00:00:00"',
                ): str,
            }
        ),
    }
)
def get_datetime_diff(self, start_datetime: str, end_datetime: str) -> BaseArtifact:
    """Return `end_datetime - start_datetime` rendered with `str()`.

    Args:
        start_datetime: ISO 8601 datetime that is subtracted.
        end_datetime: ISO 8601 datetime that is subtracted from.
    """
    # Parse the end first, matching the operand order of the subtraction.
    end = datetime.fromisoformat(end_datetime)
    start = datetime.fromisoformat(start_datetime)
    elapsed = end - start

    return TextArtifact(str(elapsed))

get_relative_datetime(params)

Source Code in griptape/tools/date_time/tool.py
@activity(
    config={
        "description": "Can be used to return a relative date and time.",
        "schema": schema.Schema(
            {
                schema.Literal(
                    "relative_date_string",
                    description='Relative date in English compatible with the dateparser library. For example, "now EST", "20 minutes ago", '
                    '"in 2 days", "3 months, 1 week and 1 day ago", or "yesterday at 2pm"',
                ): str,
            },
        ),
    },
)
def get_relative_datetime(self, params: dict) -> BaseArtifact:
    """Parse a relative date string with `dateparser` and return the result.

    Args:
        params: Activity parameters; `params["values"]["relative_date_string"]` is parsed.

    Returns:
        `TextArtifact` with the parsed datetime, or an `ErrorArtifact` when
        the string cannot be parsed or parsing raises.
    """
    from dateparser import parse

    try:
        date_string = params["values"]["relative_date_string"]
        relative_datetime = parse(date_string)

        if relative_datetime:
            return TextArtifact(str(relative_datetime))
        return ErrorArtifact("invalid date string")
    except Exception as e:
        # The message names this activity; it previously said "current datetime".
        return ErrorArtifact(f"error getting relative datetime: {e}")

EmailTool

Bases: BaseTool

Attributes

NameTypeDescription
usernameOptional[str]Username/email address used to send email via the SMTP protocol and retrieve email via the IMAP protocol. Example: bender@futurama.com.
passwordOptional[str]Password used to send email via the SMTP protocol and retrieve email via the IMAP protocol. If using gmail, this would be an App Password.
email_max_retrieve_countOptional[int]Used to limit the number of messages retrieved during any given activities.
smtp_hostOptional[str]Hostname or url of the SMTP server. Example: smtp.gmail.com. Required when using the send activity.
smtp_portOptional[int]Port of the SMTP server. Example: 465. Required when using the send activity.
smtp_use_sslboolWhether to use SSL when sending email via the SMTP protocol.
smtp_userOptional[str]Username/email address used to send email via the SMTP protocol. Overrides username for SMTP only. Required when using the send activity.
smtp_passwordOptional[str]Password to send email via the SMTP protocol. Overrides password for SMTP only. Required when using the send activity.
imap_urlOptional[str]Hostname or url of the IMAP server. Example: imap.gmail.com. Required when using the retrieve activity.
imap_userOptional[str]Username/email address used to retrieve email via the IMAP protocol. Overrides username for IMAP only. Required when using the retrieve activity.
imap_passwordOptional[str]Password to retrieve email via the IMAP protocol. Overrides password for IMAP only. Required when using the retrieve activity.
mailboxesOptional[dict[str, Optional[str]]]Descriptions of mailboxes available for retrieving email via the IMAP protocol. Required when using the retrieve activity. Example: {'INBOX': 'default mailbox for incoming email', 'SENT': 'default mailbox for sent email'}
email_loaderEmailLoaderInstance of EmailLoader.
Source Code in griptape/tools/email/tool.py
@define
class EmailTool(BaseTool):
    """Tool for sending email over SMTP and retrieving email over IMAP.

    Attributes:
        username: Username/email address used to send email via the SMTP protocol and retrieve email via the IMAP protocol.
            Example: bender@futurama.com.
        password: Password used to send email via the SMTP protocol and retrieve email via the IMAP protocol. If using gmail,
            this would be an App Password.
        email_max_retrieve_count: Used to limit the number of messages retrieved during any given activities.
        smtp_host: Hostname or url of the SMTP server. Example: smtp.gmail.com. Required when using the `send` activity.
        smtp_port: Port of the SMTP server. Example: 465. Required when using the `send` activity.
        smtp_use_ssl: Whether to use SSL when sending email via the SMTP protocol.
        smtp_user: Username/email address used to send email via the SMTP protocol. Overrides username for SMTP only. Required when using the `send` activity.
        smtp_password: Password to send email via the SMTP protocol. Overrides password for SMTP only. Required when using the `send` activity.
        imap_url: Hostname or url of the IMAP server. Example: imap.gmail.com. Required when using the `retrieve` activity.
        imap_user: Username/email address used to retrieve email via the IMAP protocol. Overrides username for IMAP only. Required when using the `retrieve` activity.
        imap_password: Password to retrieve email via the IMAP protocol.  Overrides password for IMAP only. Required when using the `retrieve` activity.
        mailboxes: Descriptions of mailboxes available for retrieving email via the IMAP protocol. Required when using the `retrieve` activity.
            Example: {'INBOX': 'default mailbox for incoming email', 'SENT': 'default mailbox for sent email'}
        email_loader: Instance of `EmailLoader`.
    """

    # Field order matters: the `takes_self` factories read fields declared above them.
    username: Optional[str] = field(default=None, kw_only=True)
    password: Optional[str] = field(default=None, kw_only=True)
    email_max_retrieve_count: Optional[int] = field(default=None, kw_only=True)
    smtp_host: Optional[str] = field(default=None, kw_only=True)
    smtp_port: Optional[int] = field(default=None, kw_only=True)
    smtp_use_ssl: bool = field(default=True, kw_only=True)
    smtp_user: Optional[str] = field(default=Factory(lambda self: self.username, takes_self=True), kw_only=True)
    smtp_password: Optional[str] = field(default=Factory(lambda self: self.password, takes_self=True), kw_only=True)
    imap_url: Optional[str] = field(default=None, kw_only=True)
    imap_user: Optional[str] = field(default=Factory(lambda self: self.username, takes_self=True), kw_only=True)
    imap_password: Optional[str] = field(default=Factory(lambda self: self.password, takes_self=True), kw_only=True)
    mailboxes: Optional[dict[str, Optional[str]]] = field(default=None, kw_only=True)
    email_loader: EmailLoader = field(
        default=Factory(
            lambda self: EmailLoader(imap_url=self.imap_url, username=self.imap_user, password=self.imap_password),
            takes_self=True,
        ),
        kw_only=True,
    )

    @activity(
        config={
            "description": "Can be used to retrieve emails."
            "{% if _self.mailboxes %} Available mailboxes: {{ _self.mailboxes }}{% endif %}",
            "schema": Schema(
                {
                    Literal("label", description="Label to retrieve emails from such as 'INBOX' or 'SENT'"): str,
                    schema.Optional(
                        Literal("key", description="Optional key for filtering such as 'FROM' or 'SUBJECT'"),
                    ): str,
                    schema.Optional(
                        Literal("search_criteria", description="Optional search criteria to filter emails by key"),
                    ): str,
                    schema.Optional(Literal("max_count", description="Optional max email count")): int,
                },
            ),
        },
    )
    def retrieve(self, params: dict) -> ListArtifact | ErrorArtifact:
        """Load emails through `email_loader` using the query in `params["values"]`.

        `max_count` from the request wins; otherwise `email_max_retrieve_count` is used.
        Returns an `ErrorArtifact` when `mailboxes` is not configured.
        """
        if self.mailboxes is None:
            return ErrorArtifact("mailboxes is required")

        values = params["values"]

        requested_count = values.get("max_count")
        if requested_count is None:
            max_count = self.email_max_retrieve_count
        else:
            max_count = int(requested_count)

        query = EmailLoader.EmailQuery(
            label=values["label"],
            key=values.get("key"),
            search_criteria=values.get("search_criteria"),
            max_count=max_count,
        )
        return self.email_loader.load(query)

    @activity(
        config={
            "description": "Can be used to send emails",
            "schema": Schema(
                {
                    Literal("to", description="Recipient's email address"): str,
                    Literal("subject", description="Email subject"): str,
                    Literal("body", description="Email body"): str,
                },
            ),
        },
    )
    def send(self, params: dict) -> InfoArtifact | ErrorArtifact:
        """Send a plain-text email built from `to`, `subject` and `body` in `params["values"]`.

        Returns an `ErrorArtifact` when an SMTP setting is missing or sending raises;
        the exception is logged in the latter case.
        """
        values = params["values"]

        user, password = self.smtp_user, self.smtp_password
        host, port = self.smtp_host, self.smtp_port

        if user is None:
            return ErrorArtifact("smtp_user is required")
        if password is None:
            return ErrorArtifact("smtp_password is required")
        if host is None:
            return ErrorArtifact("smtp_host is required")
        if port is None:
            return ErrorArtifact("smtp_port is required")

        message = MIMEText(values["body"])
        message["Subject"] = values["subject"]
        message["From"] = user
        message["To"] = values["to"]

        try:
            with self._create_smtp_client(host, port) as client:
                client.login(user, password)
                client.sendmail(message["From"], [message["To"]], message.as_string())
                return InfoArtifact("email was successfully sent")
        except Exception as e:
            logging.exception(e)
            return ErrorArtifact(f"error sending email: {e}")

    def _create_smtp_client(self, smtp_host: str, smtp_port: int) -> smtplib.SMTP | smtplib.SMTP_SSL:
        """Create an `SMTP_SSL` client when `smtp_use_ssl` is set, otherwise a plain `SMTP` client."""
        client_cls = smtplib.SMTP_SSL if self.smtp_use_ssl else smtplib.SMTP
        return client_cls(smtp_host, smtp_port)
  • email_loader = field(default=Factory(lambda self: EmailLoader(imap_url=self.imap_url, username=self.imap_user, password=self.imap_password), takes_self=True), kw_only=True) class-attribute instance-attribute

  • email_max_retrieve_count = field(default=None, kw_only=True) class-attribute instance-attribute

  • imap_password = field(default=Factory(lambda self: self.password, takes_self=True), kw_only=True) class-attribute instance-attribute

  • imap_url = field(default=None, kw_only=True) class-attribute instance-attribute

  • imap_user = field(default=Factory(lambda self: self.username, takes_self=True), kw_only=True) class-attribute instance-attribute

  • mailboxes = field(default=None, kw_only=True) class-attribute instance-attribute

  • password = field(default=None, kw_only=True) class-attribute instance-attribute

  • smtp_host = field(default=None, kw_only=True) class-attribute instance-attribute

  • smtp_password = field(default=Factory(lambda self: self.password, takes_self=True), kw_only=True) class-attribute instance-attribute

  • smtp_port = field(default=None, kw_only=True) class-attribute instance-attribute

  • smtp_use_ssl = field(default=True, kw_only=True) class-attribute instance-attribute

  • smtp_user = field(default=Factory(lambda self: self.username, takes_self=True), kw_only=True) class-attribute instance-attribute

  • username = field(default=None, kw_only=True) class-attribute instance-attribute

_create_smtp_client(smtp_host, smtp_port)

Source Code in griptape/tools/email/tool.py
def _create_smtp_client(self, smtp_host: str, smtp_port: int) -> smtplib.SMTP | smtplib.SMTP_SSL:
    """Create an `SMTP_SSL` client when `smtp_use_ssl` is set, otherwise a plain `SMTP` client."""
    client_cls = smtplib.SMTP_SSL if self.smtp_use_ssl else smtplib.SMTP
    return client_cls(smtp_host, smtp_port)

retrieve(params)

Source Code in griptape/tools/email/tool.py
@activity(
    config={
        "description": "Can be used to retrieve emails."
        "{% if _self.mailboxes %} Available mailboxes: {{ _self.mailboxes }}{% endif %}",
        "schema": Schema(
            {
                Literal("label", description="Label to retrieve emails from such as 'INBOX' or 'SENT'"): str,
                schema.Optional(
                    Literal("key", description="Optional key for filtering such as 'FROM' or 'SUBJECT'"),
                ): str,
                schema.Optional(
                    Literal("search_criteria", description="Optional search criteria to filter emails by key"),
                ): str,
                schema.Optional(Literal("max_count", description="Optional max email count")): int,
            },
        ),
    },
)
def retrieve(self, params: dict) -> ListArtifact | ErrorArtifact:
    """Load emails through `email_loader` using the query in `params["values"]`.

    `max_count` from the request wins; otherwise `email_max_retrieve_count` is used.
    Returns an `ErrorArtifact` when `mailboxes` is not configured.
    """
    if self.mailboxes is None:
        return ErrorArtifact("mailboxes is required")

    values = params["values"]

    requested_count = values.get("max_count")
    if requested_count is None:
        max_count = self.email_max_retrieve_count
    else:
        max_count = int(requested_count)

    query = EmailLoader.EmailQuery(
        label=values["label"],
        key=values.get("key"),
        search_criteria=values.get("search_criteria"),
        max_count=max_count,
    )
    return self.email_loader.load(query)

send(params)

Source Code in griptape/tools/email/tool.py
@activity(
    config={
        "description": "Can be used to send emails",
        "schema": Schema(
            {
                Literal("to", description="Recipient's email address"): str,
                Literal("subject", description="Email subject"): str,
                Literal("body", description="Email body"): str,
            },
        ),
    },
)
def send(self, params: dict) -> InfoArtifact | ErrorArtifact:
    """Send a plain-text email built from `to`, `subject` and `body` in `params["values"]`.

    Returns an `ErrorArtifact` when an SMTP setting is missing or sending raises;
    the exception is logged in the latter case.
    """
    values = params["values"]

    user, password = self.smtp_user, self.smtp_password
    host, port = self.smtp_host, self.smtp_port

    if user is None:
        return ErrorArtifact("smtp_user is required")
    if password is None:
        return ErrorArtifact("smtp_password is required")
    if host is None:
        return ErrorArtifact("smtp_host is required")
    if port is None:
        return ErrorArtifact("smtp_port is required")

    message = MIMEText(values["body"])
    message["Subject"] = values["subject"]
    message["From"] = user
    message["To"] = values["to"]

    try:
        with self._create_smtp_client(host, port) as client:
            client.login(user, password)
            client.sendmail(message["From"], [message["To"]], message.as_string())
            return InfoArtifact("email was successfully sent")
    except Exception as e:
        logging.exception(e)
        return ErrorArtifact(f"error sending email: {e}")

ExtractionTool

Bases: BaseTool , RuleMixin

Attributes

NameTypeDescription
extraction_engineBaseExtractionEngineExtractionEngine.
Source Code in griptape/tools/extraction/tool.py
@define(kw_only=True)
class ExtractionTool(BaseTool, RuleMixin):
    """Tool that feeds text or stored artifacts to an extraction engine.

    Attributes:
        extraction_engine: `ExtractionEngine`.
    """

    extraction_engine: BaseExtractionEngine = field()

    @activity(
        config={
            "description": "Can be used extract structured text from data.",
            "schema": Schema(
                {
                    Literal("data"): Or(
                        str,
                        Schema(
                            {
                                "memory_name": str,
                                "artifact_namespace": str,
                            }
                        ),
                    ),
                }
            ),
        },
    )
    def extract(self, params: dict) -> ListArtifact | InfoArtifact | ErrorArtifact:
        """Run the extraction engine over `params["values"]["data"]`.

        A string is wrapped in a single `TextArtifact`. Otherwise `data` names a
        memory and an artifact namespace, and the artifacts are loaded from
        there. Returns an `ErrorArtifact` when the memory cannot be found.
        """
        data = params["values"]["data"]

        if isinstance(data, str):
            inline_artifacts = ListArtifact([TextArtifact(data)])
            return self.extraction_engine.extract_artifacts(inline_artifacts)

        memory = self.find_input_memory(data["memory_name"])
        artifact_namespace = data["artifact_namespace"]

        if memory is None:
            return ErrorArtifact("memory not found")

        stored_artifacts = memory.load_artifacts(artifact_namespace)
        return self.extraction_engine.extract_artifacts(stored_artifacts)
  • extraction_engine = field() class-attribute instance-attribute

extract(params)

Source Code in griptape/tools/extraction/tool.py
@activity(
    config={
        "description": "Can be used extract structured text from data.",
        "schema": Schema(
            {
                Literal("data"): Or(
                    str,
                    Schema(
                        {
                            "memory_name": str,
                            "artifact_namespace": str,
                        }
                    ),
                ),
            }
        ),
    },
)
def extract(self, params: dict) -> ListArtifact | InfoArtifact | ErrorArtifact:
    """Run the extraction engine over `params["values"]["data"]`.

    A string is wrapped in a single `TextArtifact`. Otherwise `data` names a
    memory and an artifact namespace, and the artifacts are loaded from
    there. Returns an `ErrorArtifact` when the memory cannot be found.
    """
    data = params["values"]["data"]

    if isinstance(data, str):
        inline_artifacts = ListArtifact([TextArtifact(data)])
        return self.extraction_engine.extract_artifacts(inline_artifacts)

    memory = self.find_input_memory(data["memory_name"])
    artifact_namespace = data["artifact_namespace"]

    if memory is None:
        return ErrorArtifact("memory not found")

    stored_artifacts = memory.load_artifacts(artifact_namespace)
    return self.extraction_engine.extract_artifacts(stored_artifacts)

FileManagerTool

Bases: BaseTool

Attributes

NameTypeDescription
file_manager_driverBaseFileManagerDriverFile Manager Driver to use to list, load, and save files.
Source Code in griptape/tools/file_manager/tool.py
@define
class FileManagerTool(BaseTool):
    """FileManagerTool is a tool that can be used to list, load, and save files.

    Attributes:
        file_manager_driver: File Manager Driver to use to list, load, and save files.
        loaders: Loaders keyed by MIME type prefix. When loading a file, the first key (in dict order)
            that is a prefix of the file's MIME type selects the loader.
    """

    file_manager_driver: BaseFileManagerDriver = field(default=Factory(lambda: LocalFileManagerDriver()), kw_only=True)

    loaders: dict[str, loaders.BaseLoader] = field(
        default=Factory(
            lambda self: {
                "application/pdf": loaders.PdfLoader(file_manager_driver=self.file_manager_driver),
                "text/csv": loaders.CsvLoader(file_manager_driver=self.file_manager_driver),
                "text": loaders.TextLoader(file_manager_driver=self.file_manager_driver),
                "image": loaders.ImageLoader(file_manager_driver=self.file_manager_driver),
                "application/octet-stream": BlobLoader(file_manager_driver=self.file_manager_driver),
            },
            takes_self=True,
        ),
        kw_only=True,
    )

    @activity(
        config={
            "description": "Can be used to list files on disk",
            "schema": Schema(
                {Literal("path", description="Relative path in the POSIX format. For example, 'foo/bar'"): str},
            ),
        },
    )
    def list_files_from_disk(self, params: dict) -> TextArtifact | ErrorArtifact:
        """Lists the files under `params["values"]["path"]` using the file manager driver."""
        path = params["values"]["path"]
        return self.file_manager_driver.list_files(path)

    @activity(
        config={
            "description": "Can be used to load files from disk",
            "schema": Schema(
                {
                    Literal(
                        "paths",
                        description="Relative paths to files to be loaded in the POSIX format. For example, ['foo/bar/file.txt']",
                    ): Schema([str]),
                },
            ),
        },
    )
    def load_files_from_disk(self, params: dict) -> ListArtifact | ErrorArtifact:
        """Loads every file in `params["values"]["paths"]` with the loader matching its MIME type.

        Returns:
            A ListArtifact with the loaded artifacts (ListArtifact results are flattened into it),
            or an ErrorArtifact when no configured loader matches a file's MIME type.
        """
        paths = params["values"]["paths"]
        artifacts = []

        for path in paths:
            # Fetch the file to try and determine the appropriate loader
            abs_path = os.path.join(self.file_manager_driver.workdir, path)
            mime_type = get_mime_type(abs_path)
            # A default is required here: without it an unmatched MIME type surfaces as a bare,
            # message-less StopIteration instead of a readable error.
            loader = next((loader for key, loader in self.loaders.items() if mime_type.startswith(key)), None)
            if loader is None:
                return ErrorArtifact(f"Failed to load file '{path}' - no loader found for MIME type '{mime_type}'")

            artifact = loader.load(path)
            if isinstance(artifact, ListArtifact):
                artifacts.extend(artifact.value)
            else:
                artifacts.append(artifact)
        return ListArtifact(artifacts)

    @activity(
        config={
            "description": "Can be used to save memory artifacts to disk",
            "schema": Schema(
                {
                    Literal(
                        "dir_name",
                        description="Relative destination path name on disk in the POSIX format. For example, 'foo/bar'",
                    ): str,
                    Literal("file_name", description="Destination file name. For example, 'baz.txt'"): str,
                    "memory_name": str,
                    "artifact_namespace": str,
                },
            ),
        },
    )
    def save_memory_artifacts_to_disk(self, params: dict) -> ErrorArtifact | InfoArtifact:
        """Writes every artifact of a memory namespace to a file under `dir_name`.

        When the namespace holds more than one artifact, each file name is prefixed with the
        artifact's name so the files do not overwrite each other.

        Returns:
            An InfoArtifact on success, or an ErrorArtifact when the memory is missing, the
            namespace is empty, or a file cannot be saved.
        """
        dir_name = params["values"]["dir_name"]
        file_name = params["values"]["file_name"]
        memory_name = params["values"]["memory_name"]
        artifact_namespace = params["values"]["artifact_namespace"]

        memory = self.find_input_memory(memory_name)
        if not memory:
            return ErrorArtifact(f"Failed to save memory artifacts to disk - memory named '{memory_name}' not found")

        list_artifact = memory.load_artifacts(artifact_namespace)

        if len(list_artifact) == 0:
            return ErrorArtifact(
                f"Failed to save memory artifacts to disk - memory named '{memory_name}' does not contain any artifacts",
            )

        has_multiple_artifacts = len(list_artifact) > 1
        for artifact in list_artifact.value:
            formatted_file_name = f"{artifact.name}-{file_name}" if has_multiple_artifacts else file_name
            try:
                # str/bytes values are written as-is; anything else is converted with to_text().
                value = artifact.value if isinstance(artifact.value, (str, bytes)) else artifact.to_text()
                result = self.file_manager_driver.save_file(os.path.join(dir_name, formatted_file_name), value)
            except FileNotFoundError:
                return ErrorArtifact("Path not found")
            except IsADirectoryError:
                return ErrorArtifact("Path is a directory")
            except NotADirectoryError:
                return ErrorArtifact("Not a directory")
            except Exception as e:
                return ErrorArtifact(f"Failed to save file: {e!s}")

            # NOTE(review): save_content_to_file returns save_file's result typed as
            # ErrorArtifact | InfoArtifact, so the driver may report failure by return value
            # rather than by raising - confirm against the driver. Propagate it either way.
            if isinstance(result, ErrorArtifact):
                return result

        return InfoArtifact("Successfully saved memory artifacts to disk")

    @activity(
        config={
            "description": "Can be used to save content to a file",
            "schema": Schema(
                {
                    Literal(
                        "path",
                        description="Destination file path on disk in the POSIX format. For example, 'foo/bar/baz.txt'",
                    ): str,
                    "content": str,
                },
            ),
        },
    )
    def save_content_to_file(self, params: dict) -> ErrorArtifact | InfoArtifact:
        """Saves `params["values"]["content"]` to `params["values"]["path"]` and returns the driver's result."""
        path = params["values"]["path"]
        content = params["values"]["content"]
        return self.file_manager_driver.save_file(path, content)
  • file_manager_driver = field(default=Factory(lambda: LocalFileManagerDriver()), kw_only=True) class-attribute instance-attribute

  • loaders = field(default=Factory(lambda self: {'application/pdf': loaders.PdfLoader(file_manager_driver=self.file_manager_driver), 'text/csv': loaders.CsvLoader(file_manager_driver=self.file_manager_driver), 'text': loaders.TextLoader(file_manager_driver=self.file_manager_driver), 'image': loaders.ImageLoader(file_manager_driver=self.file_manager_driver), 'application/octet-stream': BlobLoader(file_manager_driver=self.file_manager_driver)}, takes_self=True), kw_only=True) class-attribute instance-attribute

list_files_from_disk(params)

Source Code in griptape/tools/file_manager/tool.py
@activity(
    config={
        "description": "Can be used to list files on disk",
        "schema": Schema(
            {
                Literal(
                    "path",
                    description="Relative path in the POSIX format. For example, 'foo/bar'",
                ): str,
            },
        ),
    },
)
def list_files_from_disk(self, params: dict) -> TextArtifact | ErrorArtifact:
    """Lists the files under the requested path.

    Args:
        params: Activity parameters; `params["values"]["path"]` is the path to list.

    Returns:
        The result of the file manager driver's `list_files` call.
    """
    target_dir = params["values"]["path"]
    listing = self.file_manager_driver.list_files(target_dir)
    return listing

load_files_from_disk(params)

Source Code in griptape/tools/file_manager/tool.py
@activity(
    config={
        "description": "Can be used to load files from disk",
        "schema": Schema(
            {
                Literal(
                    "paths",
                    description="Relative paths to files to be loaded in the POSIX format. For example, ['foo/bar/file.txt']",
                ): Schema([str]),
            },
        ),
    },
)
def load_files_from_disk(self, params: dict) -> ListArtifact | ErrorArtifact:
    """Loads every requested file with the loader matching its MIME type.

    Args:
        params: Activity parameters; `params["values"]["paths"]` is the list of paths to load.

    Returns:
        A ListArtifact with the loaded artifacts (ListArtifact results are flattened into it),
        or an ErrorArtifact when no configured loader matches a file's MIME type.
    """
    paths = params["values"]["paths"]
    artifacts = []

    for path in paths:
        # Fetch the file to try and determine the appropriate loader
        abs_path = os.path.join(self.file_manager_driver.workdir, path)
        mime_type = get_mime_type(abs_path)
        # Loader keys are MIME type prefixes; the first matching key (in dict order) wins.
        # A default is required here: without it an unmatched MIME type surfaces as a bare,
        # message-less StopIteration instead of a readable error.
        loader = next((loader for key, loader in self.loaders.items() if mime_type.startswith(key)), None)
        if loader is None:
            return ErrorArtifact(f"Failed to load file '{path}' - no loader found for MIME type '{mime_type}'")

        artifact = loader.load(path)
        if isinstance(artifact, ListArtifact):
            artifacts.extend(artifact.value)
        else:
            artifacts.append(artifact)
    return ListArtifact(artifacts)

save_content_to_file(params)

Source Code in griptape/tools/file_manager/tool.py
@activity(
    config={
        "description": "Can be used to save content to a file",
        "schema": Schema(
            {
                Literal(
                    "path",
                    description="Destination file path on disk in the POSIX format. For example, 'foo/bar/baz.txt'",
                ): str,
                "content": str,
            },
        ),
    },
)
def save_content_to_file(self, params: dict) -> ErrorArtifact | InfoArtifact:
    """Saves the given content to the given path.

    Args:
        params: Activity parameters; `params["values"]` carries `path` and `content`.

    Returns:
        The result of the file manager driver's `save_file` call.
    """
    values = params["values"]
    destination, text = values["path"], values["content"]
    return self.file_manager_driver.save_file(destination, text)

save_memory_artifacts_to_disk(params)

Source Code in griptape/tools/file_manager/tool.py
@activity(
    config={
        "description": "Can be used to save memory artifacts to disk",
        "schema": Schema(
            {
                Literal(
                    "dir_name",
                    description="Relative destination path name on disk in the POSIX format. For example, 'foo/bar'",
                ): str,
                Literal("file_name", description="Destination file name. For example, 'baz.txt'"): str,
                "memory_name": str,
                "artifact_namespace": str,
            },
        ),
    },
)
def save_memory_artifacts_to_disk(self, params: dict) -> ErrorArtifact | InfoArtifact:
    """Writes every artifact of a memory namespace to a file under `dir_name`.

    When the namespace holds more than one artifact, each file name is prefixed with the
    artifact's name so the files do not overwrite each other.

    Args:
        params: Activity parameters; `params["values"]` carries `dir_name`, `file_name`,
            `memory_name` and `artifact_namespace`.

    Returns:
        An InfoArtifact on success, or an ErrorArtifact when the memory is missing, the
        namespace is empty, or a file cannot be saved.
    """
    dir_name = params["values"]["dir_name"]
    file_name = params["values"]["file_name"]
    memory_name = params["values"]["memory_name"]
    artifact_namespace = params["values"]["artifact_namespace"]

    memory = self.find_input_memory(memory_name)
    if not memory:
        return ErrorArtifact(f"Failed to save memory artifacts to disk - memory named '{memory_name}' not found")

    list_artifact = memory.load_artifacts(artifact_namespace)

    if len(list_artifact) == 0:
        return ErrorArtifact(
            f"Failed to save memory artifacts to disk - memory named '{memory_name}' does not contain any artifacts",
        )

    has_multiple_artifacts = len(list_artifact) > 1
    for artifact in list_artifact.value:
        formatted_file_name = f"{artifact.name}-{file_name}" if has_multiple_artifacts else file_name
        try:
            # str/bytes values are written as-is; anything else is converted with to_text().
            value = artifact.value if isinstance(artifact.value, (str, bytes)) else artifact.to_text()
            result = self.file_manager_driver.save_file(os.path.join(dir_name, formatted_file_name), value)
        except FileNotFoundError:
            return ErrorArtifact("Path not found")
        except IsADirectoryError:
            return ErrorArtifact("Path is a directory")
        except NotADirectoryError:
            return ErrorArtifact("Not a directory")
        except Exception as e:
            return ErrorArtifact(f"Failed to save file: {e!s}")

        # NOTE(review): the driver's save_file may report failure by returning an ErrorArtifact
        # rather than raising - confirm against the driver. Propagate it instead of claiming success.
        if isinstance(result, ErrorArtifact):
            return result

    return InfoArtifact("Successfully saved memory artifacts to disk")

GriptapeCloudToolTool

Bases: BaseGriptapeCloudTool

Attributes

Name | Type | Description
tool_id | str | The ID of the tool to run.
Source Code in griptape/tools/griptape_cloud_tool/tool.py
@define()
class GriptapeCloudToolTool(BaseGriptapeCloudTool):
    """Runs a Gen AI Builder hosted Tool.

    On construction the tool downloads its OpenAPI document and attaches one bound activity
    method per POST operation found under the `/activities` paths.

    Attributes:
        tool_id: The ID of the tool to run.
    """

    tool_id: str = field(kw_only=True)

    def __attrs_post_init__(self) -> None:
        """Builds the activity methods once attrs has initialised the instance."""
        self._init_activities()

    def _init_activities(self) -> None:
        """Fetches and parses the tool schema, then binds one handler method per activity."""
        schema = self._get_schema()
        tool_name, activity_schemas = self._parse_schema(schema)

        # Only adopt the schema's title when the name still equals the class name.
        if self.name == self.__class__.__name__:
            self.name = tool_name

        for activity_name, (description, activity_schema) in activity_schemas.items():
            activity_handler = self._create_activity_handler(activity_name, description, activity_schema)

            setattr(self, activity_name, MethodType(activity_handler, self))

    def _get_schema(self) -> dict:
        """Downloads the tool's OpenAPI document.

        Raises:
            RuntimeError: If the response body is not a dict, or is an error payload
                (contains both `error` and `tool_run_id`).
        """
        response = requests.get(urljoin(self.base_url, f"/api/tools/{self.tool_id}/openapi"), headers=self.headers)

        response.raise_for_status()
        schema = response.json()

        if not isinstance(schema, dict):
            raise RuntimeError(f"Invalid schema for tool {self.tool_id}: {schema}")

        if "error" in schema and "tool_run_id" in schema:
            raise RuntimeError(f"Failed to retrieve schema for tool {self.tool_id}: {schema['error']}")

        return schema

    def _parse_schema(self, schema: dict) -> tuple[str, dict[str, tuple[str, Schema]]]:
        """Parses an openapi schema into a dictionary of activity names and their respective descriptions + schemas."""
        activities = {}

        name = schema.get("info", {}).get("title")

        for path, path_info in schema.get("paths", {}).items():
            # Only paths under /activities describe tool activities.
            if not path.startswith("/activities"):
                continue
            for method, method_info in path_info.items():
                if "post" in method.lower():
                    activity_name = method_info["operationId"]
                    description = method_info.get("description", "")

                    activity_schema = self.__extract_schema_from_ref(
                        schema,
                        method_info.get("requestBody", {})
                        .get("content", {})
                        .get("application/json", {})
                        .get("schema", {}),
                    )

                    activities[activity_name] = (description, activity_schema)

        return name, activities

    def __extract_schema_from_ref(self, schema: dict, schema_ref: dict) -> Schema:
        """Extracts a schema from a $ref if present, resolving it into native schema properties."""
        if "$ref" in schema_ref:
            # Resolve the reference and retrieve the schema data
            ref_path = schema_ref["$ref"].split("/")[-1]
            schema_data = schema["components"]["schemas"].get(ref_path, {})
        else:
            # Use the provided schema directly if no $ref is found
            schema_data = schema_ref

        # Convert the schema_data dictionary into a Schema with its properties
        properties = {}
        for prop, prop_info in schema_data.get("properties", {}).items():
            prop_type = prop_info.get("type", "string")
            prop_description = prop_info.get("description", "")
            schema_prop = Literal(prop, description=prop_description)
            # Properties not listed under "required" become optional schema keys.
            is_optional = prop not in schema_data.get("required", [])

            if is_optional:
                schema_prop = SchemaOptional(schema_prop)

            properties[schema_prop] = self._map_openapi_type_to_python(prop_type, prop_info)

        return Schema(properties)

    def _map_openapi_type_to_python(
        self, openapi_type: str, schema_info: Optional[dict] = None
    ) -> type | list[type] | Or:
        """Maps OpenAPI types to native Python types.

        Args:
            openapi_type: The OpenAPI `type` value of the property.
            schema_info: The property's schema dict, consulted for `items` and `enum`.

        Returns:
            For an array with `items`: the items' enum values, or a one-element list holding the
            mapped item type. For an enum property: an `Or` of the enum values. Otherwise the
            mapped Python type, defaulting to `str` for unknown types.
        """
        # "array" maps to list so that arrays without an "items" entry (and nested arrays, which
        # recurse without schema_info) are not mistaken for strings.
        type_mapping = {
            "string": str,
            "integer": int,
            "boolean": bool,
            "number": float,
            "object": dict,
            "array": list,
        }

        if openapi_type == "array" and schema_info is not None and "items" in schema_info:
            enum = schema_info["items"].get("enum")
            if enum:
                return enum
            items_type = schema_info["items"].get("type", "string")
            return [self._map_openapi_type_to_python(items_type)]  # pyright: ignore[reportReturnType]
        if schema_info is not None and schema_info.get("enum"):
            return Or(*schema_info["enum"])
        return type_mapping.get(openapi_type, str)

    def _create_activity_handler(self, activity_name: str, description: str, activity_schema: Schema) -> Callable:
        """Creates an activity handler method for the tool."""

        @activity(config={"name": activity_name, "description": description, "schema": activity_schema})
        def activity_handler(self: GriptapeCloudToolTool, values: dict) -> Any:
            return self._run_activity(activity_name, values)

        return activity_handler

    def _run_activity(self, activity_name: str, params: dict) -> BaseArtifact:
        """Runs an activity on the tool with the provided parameters.

        Returns:
            The artifact built from the JSON response, or a TextArtifact holding the raw
            response text when a ValueError is raised while decoding or converting it.
        """
        url = urljoin(self.base_url, f"/api/tools/{self.tool_id}/activities/{activity_name}")

        response = requests.post(url, json=params, headers=self.headers)

        response.raise_for_status()

        try:
            return BaseArtifact.from_dict(response.json())
        except ValueError:
            return TextArtifact(response.text)
  • tool_id = field(kw_only=True) class-attribute instance-attribute

__attrs_post_init__()

Source Code in griptape/tools/griptape_cloud_tool/tool.py
def __attrs_post_init__(self) -> None:
    """Builds the tool's activity methods by delegating to `_init_activities`."""
    self._init_activities()

__extract_schema_from_ref(schema, schema_ref)

Source Code in griptape/tools/griptape_cloud_tool/tool.py
def __extract_schema_from_ref(self, schema: dict, schema_ref: dict) -> Schema:
    """Builds a Schema from a request-body schema, following its $ref when one is present.

    Args:
        schema: The full OpenAPI document; `$ref` targets are looked up under
            `schema["components"]["schemas"]` by the last path segment of the reference.
        schema_ref: The request-body schema, either inline or a `{"$ref": ...}` pointer.

    Returns:
        A Schema with one key per property; properties not listed in `required` are optional.
    """
    if "$ref" not in schema_ref:
        # No reference: the inline schema is used as-is.
        definition = schema_ref
    else:
        # Only the final segment of the reference names the component.
        component_name = schema_ref["$ref"].rsplit("/", 1)[-1]
        definition = schema["components"]["schemas"].get(component_name, {})

    fields = {}
    for field_name, field_info in definition.get("properties", {}).items():
        field_type = field_info.get("type", "string")
        key = Literal(field_name, description=field_info.get("description", ""))

        if field_name not in definition.get("required", []):
            key = SchemaOptional(key)

        fields[key] = self._map_openapi_type_to_python(field_type, field_info)

    return Schema(fields)

_create_activity_handler(activity_name, description, activity_schema)

Source Code in griptape/tools/griptape_cloud_tool/tool.py
def _create_activity_handler(self, activity_name: str, description: str, activity_schema: Schema) -> Callable:
    """Creates an activity handler method for the tool.

    Args:
        activity_name: Name of the activity; passed to the `activity` config and to `_run_activity`.
        description: Description passed to the `activity` config.
        activity_schema: Schema passed to the `activity` config.

    Returns:
        The decorated handler function. It takes `self` explicitly and is not bound here.
    """

    # The handler closes over activity_name and forwards its `values` argument to _run_activity.
    @activity(config={"name": activity_name, "description": description, "schema": activity_schema})
    def activity_handler(self: GriptapeCloudToolTool, values: dict) -> Any:
        return self._run_activity(activity_name, values)

    return activity_handler

_get_schema()

Source Code in griptape/tools/griptape_cloud_tool/tool.py
def _get_schema(self) -> dict:
    """Downloads the tool's OpenAPI document.

    Returns:
        The decoded JSON body of the `/api/tools/{tool_id}/openapi` response.

    Raises:
        RuntimeError: If the body is not a dict, or is an error payload (contains both
            `error` and `tool_run_id`).
    """
    openapi_url = urljoin(self.base_url, f"/api/tools/{self.tool_id}/openapi")
    reply = requests.get(openapi_url, headers=self.headers)
    reply.raise_for_status()

    payload = reply.json()
    if not isinstance(payload, dict):
        raise RuntimeError(f"Invalid schema for tool {self.tool_id}: {payload}")

    is_error_payload = "error" in payload and "tool_run_id" in payload
    if is_error_payload:
        raise RuntimeError(f"Failed to retrieve schema for tool {self.tool_id}: {payload['error']}")

    return payload

_init_activities()

Source Code in griptape/tools/griptape_cloud_tool/tool.py
def _init_activities(self) -> None:
    """Fetches and parses the tool schema, then binds one handler method per activity.

    The schema's title replaces `self.name` only while the name still equals the class name.
    """
    remote_name, parsed_activities = self._parse_schema(self._get_schema())

    if self.name == self.__class__.__name__:
        self.name = remote_name

    for act_name, spec in parsed_activities.items():
        act_description, act_schema = spec
        handler = self._create_activity_handler(act_name, act_description, act_schema)
        # Bind the plain function to this instance so it can be called like a regular method.
        bound_handler = MethodType(handler, self)
        setattr(self, act_name, bound_handler)

_map_openapi_type_to_python(openapi_type, schema_info=None)

Source Code in griptape/tools/griptape_cloud_tool/tool.py
def _map_openapi_type_to_python(
    self, openapi_type: str, schema_info: Optional[dict] = None
) -> type | list[type] | Or:
    """Maps OpenAPI types to native Python types.

    Args:
        openapi_type: The OpenAPI `type` value of the property.
        schema_info: The property's schema dict, consulted for `items` and `enum`.

    Returns:
        For an array with `items`: the items' enum values, or a one-element list holding the
        mapped item type. For an enum property: an `Or` of the enum values. Otherwise the
        mapped Python type, defaulting to `str` for unknown types.
    """
    # "array" maps to list so that arrays without an "items" entry (and nested arrays, which
    # recurse without schema_info) are not mistaken for strings.
    type_mapping = {
        "string": str,
        "integer": int,
        "boolean": bool,
        "number": float,
        "object": dict,
        "array": list,
    }

    if openapi_type == "array" and schema_info is not None and "items" in schema_info:
        enum = schema_info["items"].get("enum")
        if enum:
            return enum
        items_type = schema_info["items"].get("type", "string")
        return [self._map_openapi_type_to_python(items_type)]  # pyright: ignore[reportReturnType]
    if schema_info is not None and schema_info.get("enum"):
        return Or(*schema_info["enum"])
    return type_mapping.get(openapi_type, str)

_parse_schema(schema)

Source Code in griptape/tools/griptape_cloud_tool/tool.py
def _parse_schema(self, schema: dict) -> tuple[str, dict[str, tuple[str, Schema]]]:
    """Collects the tool title and the activities described by an OpenAPI document.

    Only operations whose method key contains "post" (case-insensitive) on paths starting
    with `/activities` are considered.

    Returns:
        A `(title, activities)` pair, where `activities` maps each operation's `operationId`
        to a `(description, schema)` tuple.
    """
    parsed = {}
    title = schema.get("info", {}).get("title")

    for route, operations in schema.get("paths", {}).items():
        if route.startswith("/activities"):
            for verb, operation in operations.items():
                if "post" not in verb.lower():
                    continue

                operation_id = operation["operationId"]
                summary = operation.get("description", "")

                # Drill down to the JSON request-body schema; every missing level yields {}.
                request_body = operation.get("requestBody", {})
                json_content = request_body.get("content", {}).get("application/json", {})
                body_schema = self.__extract_schema_from_ref(schema, json_content.get("schema", {}))

                parsed[operation_id] = (summary, body_schema)

    return title, parsed

_run_activity(activity_name, params)

Source Code in griptape/tools/griptape_cloud_tool/tool.py
def _run_activity(self, activity_name: str, params: dict) -> BaseArtifact:
    """Posts the parameters to the tool's activity endpoint and converts the reply to an artifact.

    Returns:
        The artifact built from the JSON response, or a TextArtifact holding the raw response
        text when a ValueError is raised while decoding or converting it.
    """
    endpoint = urljoin(self.base_url, f"/api/tools/{self.tool_id}/activities/{activity_name}")
    reply = requests.post(endpoint, json=params, headers=self.headers)
    reply.raise_for_status()

    try:
        body = reply.json()
        return BaseArtifact.from_dict(body)
    except ValueError:
        return TextArtifact(reply.text)

ImageQueryTool

Bases: BaseTool

Source Code in griptape/tools/image_query/tool.py
@define
class ImageQueryTool(BaseTool):
    """A tool that answers questions about images by sending them to a prompt driver.

    Attributes:
        prompt_driver: The prompt driver that is run with the query and the images.
        image_loader: The loader used to turn image paths (or blob values) into image artifacts.
    """

    prompt_driver: BasePromptDriver = field(kw_only=True)
    image_loader: ImageLoader = field(default=Factory(lambda: ImageLoader()), kw_only=True)

    @activity(
        config={
            "description": "This tool can be used to query the contents of images on disk.",
            "schema": Schema(
                {
                    Literal(
                        "query",
                        description="A detailed question to be answered using the contents of the provided images.",
                    ): str,
                    Literal("image_paths", description="The paths to an image files on disk."): Schema([str]),
                },
            ),
        },
    )
    def query_image_from_disk(self, params: dict) -> TextArtifact | ErrorArtifact:
        """Loads the images at `image_paths` and runs the prompt driver with the query and the images."""
        query = params["values"]["query"]
        image_paths = params["values"]["image_paths"]

        image_artifacts = []
        for image_path in image_paths:
            image_artifacts.append(self.image_loader.load(image_path))

        return cast(
            "TextArtifact",
            self.prompt_driver.run(
                PromptStack.from_artifact(
                    ListArtifact([TextArtifact(query), *image_artifacts]),
                )
            ).to_artifact(),
        )

    @activity(
        config={
            "description": "This tool can be used to query the contents of images in memory.",
            "schema": Schema(
                {
                    Literal(
                        "query",
                        description="A detailed question to be answered using the contents of the provided images.",
                    ): str,
                    Literal("image_artifacts", description="Image artifact memory references."): [
                        {"image_artifact_namespace": str, "image_artifact_name": str},
                    ],
                    "memory_name": str,
                },
            ),
        },
    )
    def query_images_from_memory(self, params: dict[str, Any]) -> TextArtifact | ErrorArtifact:
        """Loads the referenced image artifacts from memory and runs the prompt driver with them.

        Returns:
            The prompt driver's answer, or an ErrorArtifact when the memory is missing or any
            referenced artifact cannot be loaded as an image.
        """
        query = params["values"]["query"]
        image_artifact_references = params["values"]["image_artifacts"]
        memory = self.find_input_memory(params["values"]["memory_name"])

        if memory is None:
            return ErrorArtifact("memory not found")

        image_artifacts = []
        for image_artifact_reference in image_artifact_references:
            # The fallback lives in a nested try: an exception raised inside an `except` clause
            # is not handled by sibling clauses of the same try, so without the outer try a
            # failing fallback would escape instead of being reported as an ErrorArtifact.
            try:
                try:
                    image_artifact = load_artifact_from_memory(
                        memory,
                        image_artifact_reference["image_artifact_namespace"],
                        image_artifact_reference["image_artifact_name"],
                        ImageArtifact,
                    )
                except ValueError:
                    # If we're unable to parse the artifact as an ImageArtifact, attempt to
                    # parse a BlobArtifact and load it as an ImageArtifact.
                    blob_artifact = load_artifact_from_memory(
                        memory,
                        image_artifact_reference["image_artifact_namespace"],
                        image_artifact_reference["image_artifact_name"],
                        BlobArtifact,
                    )
                    image_artifact = self.image_loader.load(blob_artifact.value)
            except Exception as e:
                return ErrorArtifact(str(e))

            image_artifacts.append(cast("ImageArtifact", image_artifact))

        return cast(
            "TextArtifact",
            self.prompt_driver.run(
                PromptStack.from_artifact(
                    ListArtifact([TextArtifact(query), *image_artifacts]),
                )
            ).to_artifact(),
        )
  • image_loader = field(default=Factory(lambda: ImageLoader()), kw_only=True) class-attribute instance-attribute

  • prompt_driver = field(kw_only=True) class-attribute instance-attribute

query_image_from_disk(params)

Source Code in griptape/tools/image_query/tool.py
@activity(
    config={
        "description": "This tool can be used to query the contents of images on disk.",
        "schema": Schema(
            {
                Literal(
                    "query",
                    description="A detailed question to be answered using the contents of the provided images.",
                ): str,
                Literal(
                    "image_paths",
                    description="The paths to an image files on disk.",
                ): Schema([str]),
            },
        ),
    },
)
def query_image_from_disk(self, params: dict) -> TextArtifact | ErrorArtifact:
    """Answers a question about images stored on disk.

    Args:
        params: Activity parameters; `params["values"]` carries `query` and `image_paths`.

    Returns:
        The prompt driver's output converted with `to_artifact()`.
    """
    question = params["values"]["query"]
    paths = params["values"]["image_paths"]

    loaded_images = [self.image_loader.load(image_path) for image_path in paths]

    # The query text comes first, followed by every loaded image.
    prompt_stack = PromptStack.from_artifact(ListArtifact([TextArtifact(question), *loaded_images]))
    answer = self.prompt_driver.run(prompt_stack).to_artifact()
    return cast("TextArtifact", answer)

query_images_from_memory(params)

Source Code in griptape/tools/image_query/tool.py
@activity(
    config={
        "description": "This tool can be used to query the contents of images in memory.",
        "schema": Schema(
            {
                Literal(
                    "query",
                    description="A detailed question to be answered using the contents of the provided images.",
                ): str,
                Literal("image_artifacts", description="Image artifact memory references."): [
                    {"image_artifact_namespace": str, "image_artifact_name": str},
                ],
                "memory_name": str,
            },
        ),
    },
)
def query_images_from_memory(self, params: dict[str, Any]) -> TextArtifact | ErrorArtifact:
    """Answers a question about image artifacts held in a memory.

    Args:
        params: Activity parameters; `params["values"]` carries `query`, `memory_name` and
            `image_artifacts` (a list of namespace/name references).

    Returns:
        The prompt driver's answer, or an ErrorArtifact when the memory is missing or any
        referenced artifact cannot be loaded as an image.
    """
    query = params["values"]["query"]
    image_artifact_references = params["values"]["image_artifacts"]
    memory = self.find_input_memory(params["values"]["memory_name"])

    if memory is None:
        return ErrorArtifact("memory not found")

    image_artifacts = []
    for image_artifact_reference in image_artifact_references:
        # The fallback lives in a nested try: an exception raised inside an `except` clause
        # is not handled by sibling clauses of the same try, so without the outer try a
        # failing fallback would escape instead of being reported as an ErrorArtifact.
        try:
            try:
                image_artifact = load_artifact_from_memory(
                    memory,
                    image_artifact_reference["image_artifact_namespace"],
                    image_artifact_reference["image_artifact_name"],
                    ImageArtifact,
                )
            except ValueError:
                # If we're unable to parse the artifact as an ImageArtifact, attempt to
                # parse a BlobArtifact and load it as an ImageArtifact.
                blob_artifact = load_artifact_from_memory(
                    memory,
                    image_artifact_reference["image_artifact_namespace"],
                    image_artifact_reference["image_artifact_name"],
                    BlobArtifact,
                )
                image_artifact = self.image_loader.load(blob_artifact.value)
        except Exception as e:
            return ErrorArtifact(str(e))

        image_artifacts.append(cast("ImageArtifact", image_artifact))

    return cast(
        "TextArtifact",
        self.prompt_driver.run(
            PromptStack.from_artifact(
                ListArtifact([TextArtifact(query), *image_artifacts]),
            )
        ).to_artifact(),
    )

InpaintingImageGenerationTool

Bases: BaseImageGenerationTool

Attributes

Name | Type | Description
image_generation_driver | BaseImageGenerationDriver | The image generation driver used to generate the image.
output_dir | Optional[str] | If provided, the generated image will be written to disk in output_dir.
output_file | Optional[str] | If provided, the generated image will be written to disk as output_file.
Source Code in griptape/tools/inpainting_image_generation/tool.py
@define
class InpaintingImageGenerationTool(BaseImageGenerationTool):
    """Tool that repaints the masked region of an image according to a prompt.

    Attributes:
        image_generation_driver: The image generation driver used to generate the image.
        image_loader: Loader used to read the image and mask files from disk.
        output_dir: If provided, the generated image will be written to disk in output_dir.
        output_file: If provided, the generated image will be written to disk as output_file.
    """

    image_generation_driver: BaseImageGenerationDriver = field(kw_only=True)
    image_loader: ImageLoader = field(default=ImageLoader(), kw_only=True)

    @activity(
        config={
            "description": "Modifies an image within a specified mask area using image and mask files.",
            "schema": Schema(
                {
                    Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                    Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                    Literal(
                        "image_file",
                        description="The path to an image file to be used as a base to generate variations from.",
                    ): str,
                    Literal("mask_file", description="The path to mask image file."): str,
                },
            ),
        },
    )
    def image_inpainting_from_file(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
        """Inpaint an image using an image file and a mask file read through ``image_loader``."""
        values = params["values"]
        prompt, negative_prompt = values["prompt"], values["negative_prompt"]
        image_path, mask_path = values["image_file"], values["mask_file"]

        source_image = cast("ImageArtifact", self.image_loader.load(image_path))
        mask_image = cast("ImageArtifact", self.image_loader.load(mask_path))

        return self._generate_inpainting(prompt, negative_prompt, source_image, mask_image)

    @activity(
        config={
            "description": "Modifies an image within a specified mask area using image and mask artifacts in memory.",
            "schema": Schema(
                {
                    Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                    Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                    "memory_name": str,
                    "image_artifact_namespace": str,
                    "image_artifact_name": str,
                    "mask_artifact_namespace": str,
                    "mask_artifact_name": str,
                },
            ),
        },
    )
    def image_inpainting_from_memory(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
        """Inpaint an image using image and mask artifacts looked up in a named memory.

        Returns an ErrorArtifact when the memory is unknown or when loading either
        artifact raises ``ValueError``.
        """
        values = params["values"]
        prompt = values["prompt"]
        negative_prompt = values["negative_prompt"]
        image_ref = (values["image_artifact_namespace"], values["image_artifact_name"])
        mask_ref = (values["mask_artifact_namespace"], values["mask_artifact_name"])
        memory = self.find_input_memory(values["memory_name"])

        if memory is None:
            return ErrorArtifact("memory not found")

        try:
            source_image = load_artifact_from_memory(memory, *image_ref, ImageArtifact)
            mask_image = load_artifact_from_memory(memory, *mask_ref, ImageArtifact)
        except ValueError as e:
            return ErrorArtifact(str(e))

        return self._generate_inpainting(
            prompt,
            negative_prompt,
            cast("ImageArtifact", source_image),
            cast("ImageArtifact", mask_image),
        )

    def _generate_inpainting(
        self, prompt: str, negative_prompt: str, image_artifact: ImageArtifact, mask_artifact: ImageArtifact
    ) -> ImageArtifact:
        """Call the driver's inpainting entry point and write the result to disk when configured."""
        generated = self.image_generation_driver.run_image_inpainting(
            prompts=[prompt],
            negative_prompts=[negative_prompt],
            image=image_artifact,
            mask=mask_artifact,
        )

        # Nothing to persist unless an output directory or an output file is configured.
        if not (self.output_dir or self.output_file):
            return generated

        self._write_to_file(generated)
        return generated
  • image_generation_driver = field(kw_only=True) class-attribute instance-attribute

  • image_loader = field(default=ImageLoader(), kw_only=True) class-attribute instance-attribute

_generate_inpainting(prompt, negative_prompt, image_artifact, mask_artifact)

Source Code in griptape/tools/inpainting_image_generation/tool.py
def _generate_inpainting(
    self, prompt: str, negative_prompt: str, image_artifact: ImageArtifact, mask_artifact: ImageArtifact
) -> ImageArtifact:
    """Run the inpainting driver for one prompt pair and optionally write the result to disk.

    Args:
        prompt: Passed to the driver as the only entry of ``prompts``.
        negative_prompt: Passed to the driver as the only entry of ``negative_prompts``.
        image_artifact: Passed to the driver as ``image``.
        mask_artifact: Passed to the driver as ``mask``.

    Returns:
        The artifact returned by ``run_image_inpainting``.
    """
    generated = self.image_generation_driver.run_image_inpainting(
        prompts=[prompt],
        negative_prompts=[negative_prompt],
        image=image_artifact,
        mask=mask_artifact,
    )

    # Nothing to persist unless an output directory or an output file is configured.
    if not (self.output_dir or self.output_file):
        return generated

    self._write_to_file(generated)
    return generated

image_inpainting_from_file(params)

Source Code in griptape/tools/inpainting_image_generation/tool.py
@activity(
    config={
        "description": "Modifies an image within a specified mask area using image and mask files.",
        "schema": Schema(
            {
                Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                Literal(
                    "image_file",
                    description="The path to an image file to be used as a base to generate variations from.",
                ): str,
                Literal("mask_file", description="The path to mask image file."): str,
            },
        ),
    },
)
def image_inpainting_from_file(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
    """Inpaint an image using an image file and a mask file read through ``image_loader``.

    Args:
        params: Activity parameters; ``params["values"]`` supplies ``prompt``,
            ``negative_prompt``, ``image_file`` and ``mask_file``.

    Returns:
        Whatever ``_generate_inpainting`` returns for the loaded image and mask.
    """
    values = params["values"]
    prompt, negative_prompt = values["prompt"], values["negative_prompt"]
    image_path, mask_path = values["image_file"], values["mask_file"]

    source_image = cast("ImageArtifact", self.image_loader.load(image_path))
    mask_image = cast("ImageArtifact", self.image_loader.load(mask_path))

    return self._generate_inpainting(prompt, negative_prompt, source_image, mask_image)

image_inpainting_from_memory(params)

Source Code in griptape/tools/inpainting_image_generation/tool.py
@activity(
    config={
        "description": "Modifies an image within a specified mask area using image and mask artifacts in memory.",
        "schema": Schema(
            {
                Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                "memory_name": str,
                "image_artifact_namespace": str,
                "image_artifact_name": str,
                "mask_artifact_namespace": str,
                "mask_artifact_name": str,
            },
        ),
    },
)
def image_inpainting_from_memory(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
    """Inpaint an image using image and mask artifacts looked up in a named memory.

    Args:
        params: Activity parameters; ``params["values"]`` supplies the prompts, the
            memory name and the namespace/name pair of both the image and the mask.

    Returns:
        The result of ``_generate_inpainting``, or an ErrorArtifact when the memory is
        unknown or loading either artifact raises ``ValueError``.
    """
    values = params["values"]
    prompt = values["prompt"]
    negative_prompt = values["negative_prompt"]
    image_ref = (values["image_artifact_namespace"], values["image_artifact_name"])
    mask_ref = (values["mask_artifact_namespace"], values["mask_artifact_name"])
    memory = self.find_input_memory(values["memory_name"])

    if memory is None:
        return ErrorArtifact("memory not found")

    try:
        source_image = load_artifact_from_memory(memory, *image_ref, ImageArtifact)
        mask_image = load_artifact_from_memory(memory, *mask_ref, ImageArtifact)
    except ValueError as e:
        return ErrorArtifact(str(e))

    return self._generate_inpainting(
        prompt,
        negative_prompt,
        cast("ImageArtifact", source_image),
        cast("ImageArtifact", mask_image),
    )

OutpaintingImageGenerationTool

Bases: BaseImageGenerationTool

Attributes

| Name | Type | Description |
| --- | --- | --- |
| image_generation_driver | BaseImageGenerationDriver | The image generation driver used to generate the image. |
| output_dir | Optional[str] | If provided, the generated image will be written to disk in output_dir. |
| output_file | Optional[str] | If provided, the generated image will be written to disk as output_file. |
Source Code in griptape/tools/outpainting_image_generation/tool.py
@define
class OutpaintingImageGenerationTool(BaseImageGenerationTool):
    """A tool that can be used to generate prompted outpaintings of an image.

    Attributes:
        image_generation_driver: The image generation driver used to generate the image.
        image_loader: Loader used to read the image and mask files from disk.
        output_dir: If provided, the generated image will be written to disk in output_dir.
        output_file: If provided, the generated image will be written to disk as output_file.
    """

    image_generation_driver: BaseImageGenerationDriver = field(kw_only=True)
    image_loader: ImageLoader = field(default=ImageLoader(), kw_only=True)

    @activity(
        config={
            "description": "Modifies an image outside a specified mask area using image and mask files.",
            "schema": Schema(
                {
                    Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                    Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                    Literal(
                        "image_file",
                        description="The path to an image file to be used as a base to generate variations from.",
                    ): str,
                    Literal("mask_file", description="The path to mask image file."): str,
                },
            ),
        },
    )
    def image_outpainting_from_file(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
        """Outpaint an image using an image file and a mask file read through ``image_loader``."""
        prompt = params["values"]["prompt"]
        negative_prompt = params["values"]["negative_prompt"]
        image_file = params["values"]["image_file"]
        mask_file = params["values"]["mask_file"]

        input_artifact = self.image_loader.load(image_file)
        mask_artifact = self.image_loader.load(mask_file)

        # The casts only narrow the static type, mirroring the memory-based activity.
        return self._generate_outpainting(
            prompt, negative_prompt, cast("ImageArtifact", input_artifact), cast("ImageArtifact", mask_artifact)
        )

    @activity(
        config={
            "description": "Modifies an image outside a specified mask area using image and mask artifacts in memory.",
            "schema": Schema(
                {
                    Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                    Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                    "memory_name": str,
                    "image_artifact_namespace": str,
                    # The artifact names are read by the method body, so the schema
                    # has to declare them alongside the namespaces.
                    "image_artifact_name": str,
                    "mask_artifact_namespace": str,
                    "mask_artifact_name": str,
                },
            ),
        },
    )
    def image_outpainting_from_memory(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
        """Outpaint an image using image and mask artifacts looked up in a named memory.

        Returns an ErrorArtifact when the memory is unknown or when loading either
        artifact raises ``ValueError``.
        """
        prompt = params["values"]["prompt"]
        negative_prompt = params["values"]["negative_prompt"]
        image_artifact_namespace = params["values"]["image_artifact_namespace"]
        image_artifact_name = params["values"]["image_artifact_name"]
        mask_artifact_namespace = params["values"]["mask_artifact_namespace"]
        mask_artifact_name = params["values"]["mask_artifact_name"]
        memory = self.find_input_memory(params["values"]["memory_name"])

        if memory is None:
            return ErrorArtifact("memory not found")

        try:
            image_artifact = load_artifact_from_memory(
                memory,
                image_artifact_namespace,
                image_artifact_name,
                ImageArtifact,
            )
            mask_artifact = load_artifact_from_memory(
                memory,
                mask_artifact_namespace,
                mask_artifact_name,
                ImageArtifact,
            )
        except ValueError as e:
            return ErrorArtifact(str(e))

        return self._generate_outpainting(
            prompt, negative_prompt, cast("ImageArtifact", image_artifact), cast("ImageArtifact", mask_artifact)
        )

    def _generate_outpainting(
        self, prompt: str, negative_prompt: str, image_artifact: ImageArtifact, mask_artifact: ImageArtifact
    ) -> ImageArtifact | ErrorArtifact:
        """Call the driver's outpainting entry point and write the result to disk when configured."""
        output_artifact = self.image_generation_driver.run_image_outpainting(
            prompts=[prompt], negative_prompts=[negative_prompt], image=image_artifact, mask=mask_artifact
        )

        if self.output_dir or self.output_file:
            self._write_to_file(output_artifact)

        return output_artifact
  • image_generation_driver = field(kw_only=True) class-attribute instance-attribute

  • image_loader = field(default=ImageLoader(), kw_only=True) class-attribute instance-attribute

_generate_outpainting(prompt, negative_prompt, image_artifact, mask_artifact)

Source Code in griptape/tools/outpainting_image_generation/tool.py
def _generate_outpainting(
    self, prompt: str, negative_prompt: str, image_artifact: ImageArtifact, mask_artifact: ImageArtifact
) -> ImageArtifact | ErrorArtifact:
    """Run the outpainting driver for one prompt pair and optionally write the result to disk.

    Args:
        prompt: Passed to the driver as the only entry of ``prompts``.
        negative_prompt: Passed to the driver as the only entry of ``negative_prompts``.
        image_artifact: Passed to the driver as ``image``.
        mask_artifact: Passed to the driver as ``mask``.

    Returns:
        The artifact returned by ``run_image_outpainting``.
    """
    generated = self.image_generation_driver.run_image_outpainting(
        prompts=[prompt],
        negative_prompts=[negative_prompt],
        image=image_artifact,
        mask=mask_artifact,
    )

    # Nothing to persist unless an output directory or an output file is configured.
    if not (self.output_dir or self.output_file):
        return generated

    self._write_to_file(generated)
    return generated

image_outpainting_from_file(params)

Source Code in griptape/tools/outpainting_image_generation/tool.py
@activity(
    config={
        "description": "Modifies an image outside a specified mask area using image and mask files.",
        "schema": Schema(
            {
                Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                Literal(
                    "image_file",
                    description="The path to an image file to be used as a base to generate variations from.",
                ): str,
                Literal("mask_file", description="The path to mask image file."): str,
            },
        ),
    },
)
def image_outpainting_from_file(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
    """Outpaint an image using an image file and a mask file read through ``image_loader``.

    Args:
        params: Activity parameters; ``params["values"]`` supplies ``prompt``,
            ``negative_prompt``, ``image_file`` and ``mask_file``.

    Returns:
        Whatever ``_generate_outpainting`` returns for the loaded image and mask.
    """
    values = params["values"]
    prompt, negative_prompt = values["prompt"], values["negative_prompt"]
    image_path, mask_path = values["image_file"], values["mask_file"]

    source_image = self.image_loader.load(image_path)
    mask_image = self.image_loader.load(mask_path)

    return self._generate_outpainting(prompt, negative_prompt, source_image, mask_image)

image_outpainting_from_memory(params)

Source Code in griptape/tools/outpainting_image_generation/tool.py
@activity(
    config={
        "description": "Modifies an image outside a specified mask area using image and mask artifacts in memory.",
        "schema": Schema(
            {
                Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                "memory_name": str,
                "image_artifact_namespace": str,
                # The artifact names are read by the method body, so the schema has
                # to declare them alongside the namespaces.
                "image_artifact_name": str,
                "mask_artifact_namespace": str,
                "mask_artifact_name": str,
            },
        ),
    },
)
def image_outpainting_from_memory(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
    """Outpaint an image using image and mask artifacts looked up in a named memory.

    Args:
        params: Activity parameters; ``params["values"]`` supplies the prompts, the
            memory name and the namespace/name pair of both the image and the mask.

    Returns:
        The result of ``_generate_outpainting``, or an ErrorArtifact when the memory is
        unknown or loading either artifact raises ``ValueError``.
    """
    prompt = params["values"]["prompt"]
    negative_prompt = params["values"]["negative_prompt"]
    image_artifact_namespace = params["values"]["image_artifact_namespace"]
    image_artifact_name = params["values"]["image_artifact_name"]
    mask_artifact_namespace = params["values"]["mask_artifact_namespace"]
    mask_artifact_name = params["values"]["mask_artifact_name"]
    memory = self.find_input_memory(params["values"]["memory_name"])

    if memory is None:
        return ErrorArtifact("memory not found")

    try:
        image_artifact = load_artifact_from_memory(
            memory,
            image_artifact_namespace,
            image_artifact_name,
            ImageArtifact,
        )
        mask_artifact = load_artifact_from_memory(
            memory,
            mask_artifact_namespace,
            mask_artifact_name,
            ImageArtifact,
        )
    except ValueError as e:
        return ErrorArtifact(str(e))

    return self._generate_outpainting(
        prompt, negative_prompt, cast("ImageArtifact", image_artifact), cast("ImageArtifact", mask_artifact)
    )

PromptImageGenerationTool

Bases: BaseImageGenerationTool

Attributes

| Name | Type | Description |
| --- | --- | --- |
| image_generation_driver | BaseImageGenerationDriver | The image generation driver used to generate the image. |
| output_dir | Optional[str] | If provided, the generated image will be written to disk in output_dir. |
| output_file | Optional[str] | If provided, the generated image will be written to disk as output_file. |
Source Code in griptape/tools/prompt_image_generation/tool.py
@define
class PromptImageGenerationTool(BaseImageGenerationTool):
    """Tool that turns a text prompt into an image.

    Attributes:
        image_generation_driver: The image generation driver used to generate the image.
        output_dir: If provided, the generated image will be written to disk in output_dir.
        output_file: If provided, the generated image will be written to disk as output_file.
    """

    image_generation_driver: BaseImageGenerationDriver = field(kw_only=True)

    @activity(
        config={
            "description": "Generates an image from text prompts.",
            "schema": Schema(
                {
                    Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                    Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                }
            ),
        },
    )
    def generate_image(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
        """Generate an image from ``prompt`` and ``negative_prompt`` and optionally write it to disk."""
        values = params["values"]
        prompt, negative_prompt = values["prompt"], values["negative_prompt"]

        generated = self.image_generation_driver.run_text_to_image(
            prompts=[prompt],
            negative_prompts=[negative_prompt],
        )

        # Nothing to persist unless an output directory or an output file is configured.
        if not (self.output_dir or self.output_file):
            return generated

        self._write_to_file(generated)
        return generated
  • image_generation_driver = field(kw_only=True) class-attribute instance-attribute

generate_image(params)

Source Code in griptape/tools/prompt_image_generation/tool.py
@activity(
    config={
        "description": "Generates an image from text prompts.",
        "schema": Schema(
            {
                Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
            }
        ),
    },
)
def generate_image(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
    """Generate an image from a prompt pair and optionally write it to disk.

    Args:
        params: Activity parameters; ``params["values"]`` supplies ``prompt`` and
            ``negative_prompt``.

    Returns:
        The artifact returned by the driver's ``run_text_to_image``.
    """
    values = params["values"]
    prompt, negative_prompt = values["prompt"], values["negative_prompt"]

    generated = self.image_generation_driver.run_text_to_image(
        prompts=[prompt],
        negative_prompts=[negative_prompt],
    )

    # Nothing to persist unless an output directory or an output file is configured.
    if not (self.output_dir or self.output_file):
        return generated

    self._write_to_file(generated)
    return generated

PromptSummaryTool

Bases: BaseTool, RuleMixin

Attributes

| Name | Type | Description |
| --- | --- | --- |
| prompt_summary_engine | PromptSummaryEngine | PromptSummaryEngine. |
Source Code in griptape/tools/prompt_summary/tool.py
@define(kw_only=True)
class PromptSummaryTool(BaseTool, RuleMixin):
    """Tool that summarizes text through a Prompt Summary Engine.

    Attributes:
        prompt_summary_engine: `PromptSummaryEngine`.
    """

    prompt_summary_engine: PromptSummaryEngine = field(kw_only=True, default=Factory(lambda: PromptSummaryEngine()))

    @activity(
        config={
            "description": "Can be used to summarize text content.",
            "schema": Schema(
                {
                    Literal("summary"): Or(
                        str,
                        Schema(
                            {
                                "memory_name": str,
                                "artifact_namespace": str,
                            }
                        ),
                    ),
                }
            ),
        },
    )
    def summarize(self, params: dict) -> BaseArtifact:
        """Summarize either inline text or the artifacts stored under a memory namespace.

        ``params["values"]["summary"]`` is either the text itself or a mapping with
        ``memory_name`` and ``artifact_namespace``. An ErrorArtifact is returned when
        the named memory cannot be found.
        """
        source = params["values"]["summary"]

        if not isinstance(source, str):
            memory = self.find_input_memory(source["memory_name"])
            namespace = source["artifact_namespace"]

            if memory is None:
                return ErrorArtifact("memory not found")

            to_summarize = memory.load_artifacts(namespace)
        else:
            to_summarize = ListArtifact([TextArtifact(source)])

        return self.prompt_summary_engine.summarize_artifacts(to_summarize, rulesets=self.rulesets)
  • prompt_summary_engine = field(kw_only=True, default=Factory(lambda: PromptSummaryEngine())) class-attribute instance-attribute

summarize(params)

Source Code in griptape/tools/prompt_summary/tool.py
@activity(
    config={
        "description": "Can be used to summarize text content.",
        "schema": Schema(
            {
                Literal("summary"): Or(
                    str,
                    Schema(
                        {
                            "memory_name": str,
                            "artifact_namespace": str,
                        }
                    ),
                ),
            }
        ),
    },
)
def summarize(self, params: dict) -> BaseArtifact:
    """Summarize either inline text or the artifacts stored under a memory namespace.

    Args:
        params: Activity parameters; ``params["values"]["summary"]`` is either the text
            itself or a mapping with ``memory_name`` and ``artifact_namespace``.

    Returns:
        The result of the summary engine's ``summarize_artifacts``, or an ErrorArtifact
        when the named memory cannot be found.
    """
    source = params["values"]["summary"]

    if not isinstance(source, str):
        memory = self.find_input_memory(source["memory_name"])
        namespace = source["artifact_namespace"]

        if memory is None:
            return ErrorArtifact("memory not found")

        to_summarize = memory.load_artifacts(namespace)
    else:
        to_summarize = ListArtifact([TextArtifact(source)])

    return self.prompt_summary_engine.summarize_artifacts(to_summarize, rulesets=self.rulesets)

QueryTool

Bases: BaseTool, RuleMixin

Source Code in griptape/tools/query/tool.py
@define(kw_only=True)
class QueryTool(BaseTool, RuleMixin):
    """Tool that answers a natural language query over text content."""

    prompt_driver: BasePromptDriver = field(default=Factory(lambda: Defaults.drivers_config.prompt_driver))

    _rag_engine: RagEngine = field(
        default=Factory(
            lambda self: RagEngine(
                response_stage=ResponseRagStage(
                    response_modules=[
                        PromptResponseRagModule(prompt_driver=self.prompt_driver, rulesets=self.rulesets)
                    ],
                ),
            ),
            takes_self=True,
        ),
        alias="_rag_engine",
    )

    @activity(
        config={
            "description": "Can be used to search through textual content.",
            "schema": Schema(
                {
                    Literal("query", description="A natural language search query"): str,
                    Literal("content"): Or(
                        str,
                        Schema(
                            {
                                "memory_name": str,
                                "artifact_namespace": str,
                            }
                        ),
                    ),
                }
            ),
        },
    )
    def query(self, params: dict) -> ListArtifact | ErrorArtifact:
        """Query inline text or the TextArtifacts stored under a memory namespace.

        Returns a ListArtifact of the RAG engine outputs, or an ErrorArtifact when the
        named memory cannot be found or the engine produces no outputs.
        """
        values = params["values"]
        search_query = values["query"]
        content = values["content"]

        if isinstance(content, str):
            text_chunks = [TextArtifact(content)]
        else:
            memory = self.find_input_memory(content["memory_name"])
            namespace = content["artifact_namespace"]

            if memory is None:
                return ErrorArtifact("memory not found")

            # Only text artifacts are forwarded to the RAG engine.
            text_chunks = [
                candidate for candidate in memory.load_artifacts(namespace) if isinstance(candidate, TextArtifact)
            ]

        rag_context = RagContext(query=search_query, text_chunks=text_chunks)
        outputs = self._rag_engine.process(rag_context).outputs

        if len(outputs) == 0:
            return ErrorArtifact("query output is empty")
        return ListArtifact(outputs)
  • _rag_engine = field(default=Factory(lambda self: RagEngine(response_stage=ResponseRagStage(response_modules=[PromptResponseRagModule(prompt_driver=self.prompt_driver, rulesets=self.rulesets)])), takes_self=True), alias='_rag_engine') class-attribute instance-attribute

  • prompt_driver = field(default=Factory(lambda: Defaults.drivers_config.prompt_driver)) class-attribute instance-attribute

query(params)

Source Code in griptape/tools/query/tool.py
@activity(
    config={
        "description": "Can be used to search through textual content.",
        "schema": Schema(
            {
                Literal("query", description="A natural language search query"): str,
                Literal("content"): Or(
                    str,
                    Schema(
                        {
                            "memory_name": str,
                            "artifact_namespace": str,
                        }
                    ),
                ),
            }
        ),
    },
)
def query(self, params: dict) -> ListArtifact | ErrorArtifact:
    """Query inline text or the TextArtifacts stored under a memory namespace.

    Args:
        params: Activity parameters; ``params["values"]`` supplies ``query`` and
            ``content``, where ``content`` is either the text itself or a mapping with
            ``memory_name`` and ``artifact_namespace``.

    Returns:
        A ListArtifact of the RAG engine outputs, or an ErrorArtifact when the named
        memory cannot be found or the engine produces no outputs.
    """
    values = params["values"]
    search_query = values["query"]
    content = values["content"]

    if isinstance(content, str):
        text_chunks = [TextArtifact(content)]
    else:
        memory = self.find_input_memory(content["memory_name"])
        namespace = content["artifact_namespace"]

        if memory is None:
            return ErrorArtifact("memory not found")

        # Only text artifacts are forwarded to the RAG engine.
        text_chunks = [
            candidate for candidate in memory.load_artifacts(namespace) if isinstance(candidate, TextArtifact)
        ]

    rag_context = RagContext(query=search_query, text_chunks=text_chunks)
    outputs = self._rag_engine.process(rag_context).outputs

    if len(outputs) == 0:
        return ErrorArtifact("query output is empty")
    return ListArtifact(outputs)

RagTool

Bases: BaseTool

Attributes

| Name | Type | Description |
| --- | --- | --- |
| description | str | LLM-friendly RAG engine description. |
| rag_engine | RagEngine | RagEngine. |
Source Code in griptape/tools/rag/tool.py
@define(kw_only=True)
class RagTool(BaseTool):
    """Tool that sends a query to a RAG engine and returns its outputs.

    Attributes:
        description: LLM-friendly RAG engine description.
        rag_engine: `RagEngine`.
    """

    description: str = field()
    rag_engine: RagEngine = field()

    @activity(
        config={
            "description": "{{ _self.description }}",
            "schema": Schema({Literal("query", description="A natural language search query"): str}),
        },
    )
    def search(self, params: dict) -> ListArtifact | ErrorArtifact:
        """Run the query through the RAG engine and flatten its outputs into one list.

        Returns an ErrorArtifact when there are no outputs or when any exception is
        raised while querying.
        """
        query = params["values"]["query"]

        try:
            flattened = []
            for produced in self.rag_engine.process_query(query).outputs:
                # ListArtifact outputs contribute their items; anything else is kept whole.
                flattened.extend(produced.value if isinstance(produced, ListArtifact) else [produced])

            if len(flattened) == 0:
                return ErrorArtifact("query output is empty")
            return ListArtifact(flattened)

        except Exception as e:
            return ErrorArtifact(f"error querying: {e}")
  • description = field() class-attribute instance-attribute

  • rag_engine = field() class-attribute instance-attribute

search(params)

Source Code in griptape/tools/rag/tool.py
@activity(
    config={
        "description": "{{ _self.description }}",
        "schema": Schema({Literal("query", description="A natural language search query"): str}),
    },
)
def search(self, params: dict) -> ListArtifact | ErrorArtifact:
    """Run the query through the RAG engine and flatten its outputs into one list.

    Args:
        params: Activity parameters; ``params["values"]["query"]`` is the search query.

    Returns:
        A ListArtifact of the flattened outputs, or an ErrorArtifact when there are no
        outputs or when any exception is raised while querying.
    """
    query = params["values"]["query"]

    try:
        flattened = []
        for produced in self.rag_engine.process_query(query).outputs:
            # ListArtifact outputs contribute their items; anything else is kept whole.
            flattened.extend(produced.value if isinstance(produced, ListArtifact) else [produced])

        if len(flattened) == 0:
            return ErrorArtifact("query output is empty")
        return ListArtifact(flattened)

    except Exception as e:
        return ErrorArtifact(f"error querying: {e}")

RestApiTool

Bases: BaseTool

Attributes

| Name | Type | Description |
| --- | --- | --- |
| base_url | str | The base url that will be used for the request. |
| path | Optional[str] | The resource path that will be appended to base_url. |
| description | str | A description of what the REST API does. |
| request_body_schema | Optional[str] | A JSON schema string describing the request body. Recommended for PUT, POST, and PATCH requests. |
| request_query_params_schema | Optional[str] | A JSON schema string describing the available query parameters. |
| request_path_params_schema | Optional[str] | A JSON schema string describing the available path parameters. The schema must describe an array of string values. |
| response_body_schema | Optional[str] | A JSON schema string describing the response body. |
| request_headers | Optional[dict[str, str]] | Headers to include in the requests. |
Source Code in griptape/tools/rest_api/tool.py
@define
class RestApiTool(BaseTool):
    """A tool for making REST API requests.

    Every activity builds its target URL from ``base_url``, ``path`` and any path parameters, sends
    the request with a 30 second timeout, and returns the response text. No status-code check is
    performed. Failures raised by ``requests`` are returned as ``ErrorArtifact`` values instead of
    propagating.

    Attributes:
        base_url: The base url that will be used for the request.
        path: The resource path that will be appended to base_url.
        description: A description of what the REST API does.
        request_body_schema: A JSON schema string describing the request body. Recommended for PUT, POST, and PATCH requests.
        request_query_params_schema: A JSON schema string describing the available query parameters.
        request_path_params_schema: A JSON schema string describing the available path parameters. The schema must describe an array of string values.
        response_body_schema: A JSON schema string describing the response body.
        request_headers: Headers to include in the requests.
    """

    base_url: str = field(kw_only=True)
    path: Optional[str] = field(default=None, kw_only=True)
    description: str = field(kw_only=True)
    request_path_params_schema: Optional[str] = field(default=None, kw_only=True)
    request_query_params_schema: Optional[str] = field(default=None, kw_only=True)
    request_body_schema: Optional[str] = field(default=None, kw_only=True)
    response_body_schema: Optional[str] = field(default=None, kw_only=True)
    request_headers: Optional[dict[str, str]] = field(default=None, kw_only=True)

    @property
    def full_url(self) -> str:
        """Return ``base_url`` joined with ``path`` (no path parameters)."""
        return self._build_url(self.base_url, path=self.path)

    @activity(
        config={
            "description": dedent(
                """
                This tool can be used to make a put request to the rest api url: {{ _self.full_url }}
                This rest api has the following description: {{ _self.description }}
                {% if _self.request_body_schema %}The request body must follow this JSON schema: {{ _self.request_body_schema }}{% endif %}
                {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %}
                """,
            ),
            "schema": Schema({Literal("body", description="The request body."): dict}),
        },
    )
    def put(self, params: dict) -> BaseArtifact:
        """Send a PUT request with ``params["values"]["body"]`` as the JSON body.

        Returns:
            A TextArtifact with the response text, or an ErrorArtifact if the request fails.
        """
        from requests import exceptions, put

        values = params["values"]
        base_url = self.base_url
        path = self.path
        body = values["body"]
        url = self._build_url(base_url, path=path)

        try:
            response = put(url, json=body, timeout=30, headers=self.request_headers)

            return TextArtifact(response.text)
        except exceptions.RequestException as err:
            return ErrorArtifact(str(err))

    @activity(
        config={
            "description": dedent(
                """
                This tool can be used to make a patch request to the rest api url: {{ _self.full_url }}
                This rest api has the following description: {{ _self.description }}
                {% if _self.request_path_params_schema %}The request path parameters must follow this JSON schema: {{ _self.request_path_params_schema }}{% endif %}
                {% if _self.request_body_schema %}The request body must follow this JSON schema: {{ _self.request_body_schema }}{% endif %}
                {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %}
                """,
            ),
            "schema": Schema(
                {
                    Literal("path_params", description="The request path parameters."): Schema([str]),
                    Literal("body", description="The request body."): dict,
                },
            ),
        },
    )
    def patch(self, params: dict) -> BaseArtifact:
        """Send a PATCH request to the URL extended by ``path_params``, with ``body`` as JSON.

        Returns:
            A TextArtifact with the response text, or an ErrorArtifact if the request fails.
        """
        from requests import exceptions, patch

        values = params["values"]
        base_url = self.base_url
        path = self.path
        body = values["body"]
        path_params = values["path_params"]
        url = self._build_url(base_url, path=path, path_params=path_params)

        try:
            response = patch(url, json=body, timeout=30, headers=self.request_headers)
            return TextArtifact(response.text)
        except exceptions.RequestException as err:
            return ErrorArtifact(str(err))

    @activity(
        config={
            "description": dedent(
                """
                This tool can be used to make a post request to the rest api url: {{ _self.full_url }}
                This rest api has the following description: {{ _self.description }}
                {% if _self.request_body_schema %}The request body must follow this JSON schema: {{ _self.request_body_schema }}{% endif %}
                {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %}
                """,
            ),
            "schema": Schema({Literal("body", description="The request body."): dict}),
        },
    )
    def post(self, params: dict) -> BaseArtifact:
        """Send a POST request with ``params["values"]["body"]`` as the JSON body.

        Returns:
            A TextArtifact with the response text, or an ErrorArtifact if the request fails.
        """
        from requests import exceptions, post

        values = params["values"]
        base_url = self.base_url
        path = self.path
        url = self._build_url(base_url, path=path)
        body = values["body"]

        try:
            response = post(url, json=body, timeout=30, headers=self.request_headers)
            return TextArtifact(response.text)
        except exceptions.RequestException as err:
            return ErrorArtifact(str(err))

    @activity(
        config={
            "description": dedent(
                """
                This tool can be used to make a get request to the rest api url: {{ _self.full_url }}
                This rest api has the following description: {{ _self.description }}
                {% if _self.request_path_params_schema %}The request path parameters must follow this JSON schema: {{ _self.request_path_params_schema }}{% endif %}
                {% if _self.request_query_params_schema %}The request query parameters must follow this JSON schema: {{ _self.request_query_params_schema }}{% endif %}
                {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %}
                """,
            ),
            "schema": schema.Optional(
                Schema(
                    {
                        schema.Optional(Literal("query_params", description="The request query parameters.")): dict,
                        schema.Optional(Literal("path_params", description="The request path parameters.")): Schema(
                            [str]
                        ),
                    },
                ),
            ),
        },
    )
    def get(self, params: dict) -> BaseArtifact:
        """Send a GET request, optionally with query and path parameters.

        The whole ``values`` payload is optional for this activity, so a falsy ``values`` is treated
        as "no query parameters and no path parameters".

        Returns:
            A TextArtifact with the response text, or an ErrorArtifact if the request fails.
        """
        from requests import exceptions, get

        values = params["values"]
        base_url = self.base_url
        path = self.path

        query_params = {}
        path_params = []
        if values:
            query_params = values.get("query_params", {})
            path_params = values.get("path_params", [])
        url = self._build_url(base_url, path=path, path_params=path_params)

        try:
            response = get(url, params=query_params, timeout=30, headers=self.request_headers)
            return TextArtifact(response.text)
        except exceptions.RequestException as err:
            return ErrorArtifact(str(err))

    @activity(
        config={
            "description": dedent(
                """
                This tool can be used to make a delete request to the rest api url: {{ _self.full_url }}
                This rest api has the following description: {{ _self.description }}
                {% if _self.request_path_params_schema %}The request path parameters must follow this JSON schema: {{ _self.request_path_params_schema }}{% endif %}
                {% if _self.request_query_params_schema %}The request query parameters must follow this JSON schema: {{ _self.request_query_params_schema }}{% endif %}
                """,
            ),
            "schema": Schema(
                {
                    schema.Optional(Literal("query_params", description="The request query parameters.")): dict,
                    schema.Optional(Literal("path_params", description="The request path parameters.")): Schema([str]),
                },
            ),
        },
    )
    def delete(self, params: dict) -> BaseArtifact:
        """Send a DELETE request, optionally with query and path parameters.

        Returns:
            A TextArtifact with the response text, or an ErrorArtifact if the request fails.
        """
        from requests import delete, exceptions

        values = params["values"]
        base_url = self.base_url
        path = self.path

        query_params = values.get("query_params", {})
        path_params = values.get("path_params", [])
        url = self._build_url(base_url, path=path, path_params=path_params)

        try:
            response = delete(url, params=query_params, timeout=30, headers=self.request_headers)
            return TextArtifact(response.text)
        except exceptions.RequestException as err:
            return ErrorArtifact(str(err))

    def _build_url(self, base_url: str, path: Optional[str] = None, path_params: Optional[list] = None) -> str:
        """Append ``path`` and ``path_params`` to ``base_url``.

        Segments are concatenated rather than resolved as a relative reference, so a path that is
        already part of ``base_url`` (for example ``https://host/api``) is kept instead of having its
        last segment replaced.

        Args:
            base_url: Root URL; surrounding slashes are stripped.
            path: Optional resource path; surrounding slashes are stripped.
            path_params: Optional values appended after ``path``, each converted with ``str``.

        Returns:
            The combined URL, or the stripped ``base_url`` when there is nothing to append.
        """
        base = base_url.strip("/")

        segments = []
        if path:
            trimmed_path = path.strip("/")
            # A path made only of slashes contributes nothing.
            if trimmed_path:
                segments.append(trimmed_path)
        if path_params:
            segments.extend(map(str, path_params))

        if not segments:
            return base
        return f"{base}/{'/'.join(segments)}"
  • base_url = field(kw_only=True) class-attribute instance-attribute

  • description = field(kw_only=True) class-attribute instance-attribute

  • full_url property

  • path = field(default=None, kw_only=True) class-attribute instance-attribute

  • request_body_schema = field(default=None, kw_only=True) class-attribute instance-attribute

  • request_headers = field(default=None, kw_only=True) class-attribute instance-attribute

  • request_path_params_schema = field(default=None, kw_only=True) class-attribute instance-attribute

  • request_query_params_schema = field(default=None, kw_only=True) class-attribute instance-attribute

  • response_body_schema = field(default=None, kw_only=True) class-attribute instance-attribute

_build_url(base_url, path=None, path_params=None)

Source Code in griptape/tools/rest_api/tool.py
def _build_url(self, base_url: str, path: Optional[str] = None, path_params: Optional[list] = None) -> str:
    url = ""

    if path:
        url += path.strip("/")

    if path_params:
        url += f"/{str.join('/', map(str, path_params))}"

    return urljoin(base_url.strip("/"), url)

delete(params)

Source Code in griptape/tools/rest_api/tool.py
@activity(
    config={
        "description": dedent(
            """
            This tool can be used to make a delete request to the rest api url: {{ _self.full_url }}
            This rest api has the following description: {{ _self.description }}
            {% if _self.request_path_params_schema %}The request path parameters must follow this JSON schema: {{ _self.request_path_params_schema }}{% endif %}
            {% if _self.request_query_params_schema %}The request query parameters must follow this JSON schema: {{ _self.request_query_params_schema }}{% endif %}
            """,
        ),
        "schema": Schema(
            {
                schema.Optional(Literal("query_params", description="The request query parameters.")): dict,
                schema.Optional(Literal("path_params", description="The request path parameters.")): Schema([str]),
            },
        ),
    },
)
def delete(self, params: dict) -> BaseArtifact:
    """Send a DELETE request, optionally with query and path parameters.

    Args:
        params: Activity parameters; ``params["values"]`` may contain ``query_params`` (dict) and
            ``path_params`` (list appended to the URL).

    Returns:
        A TextArtifact with the response text, or an ErrorArtifact if the request fails.
    """
    from requests import delete, exceptions

    values = params["values"]
    base_url = self.base_url
    path = self.path

    query_params = values.get("query_params", {})
    path_params = values.get("path_params", [])
    url = self._build_url(base_url, path=path, path_params=path_params)

    try:
        # 30 second timeout so the call cannot block indefinitely.
        response = delete(url, params=query_params, timeout=30, headers=self.request_headers)
        return TextArtifact(response.text)
    except exceptions.RequestException as err:
        return ErrorArtifact(str(err))

get(params)

Source Code in griptape/tools/rest_api/tool.py
@activity(
    config={
        "description": dedent(
            """
            This tool can be used to make a get request to the rest api url: {{ _self.full_url }}
            This rest api has the following description: {{ _self.description }}
            {% if _self.request_path_params_schema %}The request path parameters must follow this JSON schema: {{ _self.request_path_params_schema }}{% endif %}
            {% if _self.request_query_params_schema %}The request query parameters must follow this JSON schema: {{ _self.request_query_params_schema }}{% endif %}
            {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %}
            """,
        ),
        "schema": schema.Optional(
            Schema(
                {
                    schema.Optional(Literal("query_params", description="The request query parameters.")): dict,
                    schema.Optional(Literal("path_params", description="The request path parameters.")): Schema(
                        [str]
                    ),
                },
            ),
        ),
    },
)
def get(self, params: dict) -> BaseArtifact:
    """Send a GET request, optionally with query and path parameters.

    The whole ``values`` payload is optional for this activity, so a falsy ``values`` is treated as
    "no query parameters and no path parameters".

    Args:
        params: Activity parameters; ``params["values"]`` may contain ``query_params`` (dict) and
            ``path_params`` (list appended to the URL).

    Returns:
        A TextArtifact with the response text, or an ErrorArtifact if the request fails.
    """
    from requests import exceptions, get

    values = params["values"]
    base_url = self.base_url
    path = self.path

    query_params = {}
    path_params = []
    if values:
        query_params = values.get("query_params", {})
        path_params = values.get("path_params", [])
    url = self._build_url(base_url, path=path, path_params=path_params)

    try:
        # 30 second timeout so the call cannot block indefinitely.
        response = get(url, params=query_params, timeout=30, headers=self.request_headers)
        return TextArtifact(response.text)
    except exceptions.RequestException as err:
        return ErrorArtifact(str(err))

patch(params)

Source Code in griptape/tools/rest_api/tool.py
@activity(
    config={
        "description": dedent(
            """
            This tool can be used to make a patch request to the rest api url: {{ _self.full_url }}
            This rest api has the following description: {{ _self.description }}
            {% if _self.request_path_params_schema %}The request path parameters must follow this JSON schema: {{ _self.request_path_params_schema }}{% endif %}
            {% if _self.request_body_schema %}The request body must follow this JSON schema: {{ _self.request_body_schema }}{% endif %}
            {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %}
            """,
        ),
        "schema": Schema(
            {
                Literal("path_params", description="The request path parameters."): Schema([str]),
                Literal("body", description="The request body."): dict,
            },
        ),
    },
)
def patch(self, params: dict) -> BaseArtifact:
    """Send a PATCH request to the URL extended by ``path_params``, with ``body`` as JSON.

    Args:
        params: Activity parameters; ``params["values"]`` must contain ``body`` and ``path_params``.

    Returns:
        A TextArtifact with the response text, or an ErrorArtifact if the request fails.
    """
    from requests import exceptions, patch

    values = params["values"]
    base_url = self.base_url
    path = self.path
    body = values["body"]
    path_params = values["path_params"]
    url = self._build_url(base_url, path=path, path_params=path_params)

    try:
        # 30 second timeout so the call cannot block indefinitely.
        response = patch(url, json=body, timeout=30, headers=self.request_headers)
        return TextArtifact(response.text)
    except exceptions.RequestException as err:
        return ErrorArtifact(str(err))

post(params)

Source Code in griptape/tools/rest_api/tool.py
@activity(
    config={
        "description": dedent(
            """
            This tool can be used to make a post request to the rest api url: {{ _self.full_url }}
            This rest api has the following description: {{ _self.description }}
            {% if _self.request_body_schema %}The request body must follow this JSON schema: {{ _self.request_body_schema }}{% endif %}
            {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %}
            """,
        ),
        "schema": Schema({Literal("body", description="The request body."): dict}),
    },
)
def post(self, params: dict) -> BaseArtifact:
    """POST ``params["values"]["body"]`` as JSON to the tool's URL.

    Returns:
        A TextArtifact carrying the response text, or an ErrorArtifact describing the
        ``requests`` failure.
    """
    from requests import exceptions, post

    request_values = params["values"]
    endpoint = self._build_url(self.base_url, path=self.path)
    payload = request_values["body"]

    try:
        reply = post(endpoint, json=payload, timeout=30, headers=self.request_headers)
        outcome = TextArtifact(reply.text)
    except exceptions.RequestException as err:
        outcome = ErrorArtifact(str(err))
    return outcome

put(params)

Source Code in griptape/tools/rest_api/tool.py
@activity(
    config={
        "description": dedent(
            """
            This tool can be used to make a put request to the rest api url: {{ _self.full_url }}
            This rest api has the following description: {{ _self.description }}
            {% if _self.request_body_schema %}The request body must follow this JSON schema: {{ _self.request_body_schema }}{% endif %}
            {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %}
            """,
        ),
        "schema": Schema({Literal("body", description="The request body."): dict}),
    },
)
def put(self, params: dict) -> BaseArtifact:
    """PUT ``params["values"]["body"]`` as JSON to the tool's URL.

    Returns:
        A TextArtifact carrying the response text, or an ErrorArtifact describing the
        ``requests`` failure.
    """
    from requests import exceptions, put

    request_values = params["values"]
    payload = request_values["body"]
    endpoint = self._build_url(self.base_url, path=self.path)

    try:
        reply = put(endpoint, json=payload, timeout=30, headers=self.request_headers)
        outcome = TextArtifact(reply.text)
    except exceptions.RequestException as err:
        outcome = ErrorArtifact(str(err))
    return outcome

SqlTool

Bases: BaseTool

Source Code in griptape/tools/sql/tool.py
@define
class SqlTool(BaseTool):
    """Tool that runs SQL SELECT queries against a single table through a SqlLoader.

    Attributes:
        sql_loader: Loader used to execute queries; its ``sql_driver`` also supplies the table schema.
        schema_name: Optional database schema that qualifies ``table_name``.
        table_name: Name of the table this tool exposes.
        table_description: Optional free-text description included in the activity description.
        engine_name: Optional SQL engine name included in the activity description.
    """

    sql_loader: SqlLoader = field(kw_only=True)
    schema_name: Optional[str] = field(default=None, kw_only=True)
    table_name: str = field(kw_only=True)
    table_description: Optional[str] = field(default=None, kw_only=True)
    engine_name: Optional[str] = field(default=None, kw_only=True)

    @property
    def full_table_name(self) -> str:
        """Return ``schema_name.table_name``, or just ``table_name`` when no schema is set."""
        if self.schema_name:
            return f"{self.schema_name}.{self.table_name}"
        return self.table_name

    @property
    def table_schema(self) -> Optional[str]:
        """Return the table schema as reported by the loader's SQL driver."""
        driver = self.sql_loader.sql_driver
        return driver.get_table_schema(self.full_table_name, schema=self.schema_name)

    @activity(
        config={
            "description": "Can be used to execute{% if _self.engine_name %} {{ _self.engine_name }}{% endif %} SQL SELECT queries "
            "in table {{ _self.full_table_name }}. "
            "Make sure the `SELECT` statement contains enough columns to get an answer without knowing "
            "the original question. "
            "Be creative when you use `WHERE` statements: you can use wildcards, `LOWER()`, and other functions "
            "to get better results. "
            "You can use JOINs if more tables are available in other tools.\n"
            "{{ _self.table_name }} schema: {{ _self.table_schema }}\n"
            "{% if _self.table_description %}{{ _self.table_name }} description: {{ _self.table_description }}{% endif %}",
            "schema": Schema({"sql_query": str}),
        },
    )
    def execute_query(self, params: dict) -> ListArtifact | InfoArtifact | ErrorArtifact:
        """Execute the query in ``params["values"]["sql_query"]`` via the SQL loader.

        Returns:
            The loader result when it is non-empty, an InfoArtifact when it is empty, or an
            ErrorArtifact when reading the parameter or running the query raises.
        """
        try:
            result_rows = self.sql_loader.load(params["values"]["sql_query"])
        except Exception as e:
            return ErrorArtifact(f"error executing query: {e}")

        if len(result_rows) == 0:
            return InfoArtifact("No results found")
        return result_rows
  • engine_name = field(default=None, kw_only=True) class-attribute instance-attribute

  • full_table_name property

  • schema_name = field(default=None, kw_only=True) class-attribute instance-attribute

  • sql_loader = field(kw_only=True) class-attribute instance-attribute

  • table_description = field(default=None, kw_only=True) class-attribute instance-attribute

  • table_name = field(kw_only=True) class-attribute instance-attribute

  • table_schema property

execute_query(params)

Source Code in griptape/tools/sql/tool.py
@activity(
    config={
        "description": "Can be used to execute{% if _self.engine_name %} {{ _self.engine_name }}{% endif %} SQL SELECT queries "
        "in table {{ _self.full_table_name }}. "
        "Make sure the `SELECT` statement contains enough columns to get an answer without knowing "
        "the original question. "
        "Be creative when you use `WHERE` statements: you can use wildcards, `LOWER()`, and other functions "
        "to get better results. "
        "You can use JOINs if more tables are available in other tools.\n"
        "{{ _self.table_name }} schema: {{ _self.table_schema }}\n"
        "{% if _self.table_description %}{{ _self.table_name }} description: {{ _self.table_description }}{% endif %}",
        "schema": Schema({"sql_query": str}),
    },
)
def execute_query(self, params: dict) -> ListArtifact | InfoArtifact | ErrorArtifact:
    """Execute the query in ``params["values"]["sql_query"]`` via the SQL loader.

    Returns:
        The loader result when it is non-empty, an InfoArtifact when it is empty, or an
        ErrorArtifact when reading the parameter or running the query raises.
    """
    try:
        result_rows = self.sql_loader.load(params["values"]["sql_query"])
    except Exception as e:
        return ErrorArtifact(f"error executing query: {e}")

    if len(result_rows) == 0:
        return InfoArtifact("No results found")
    return result_rows

StructureRunTool

Bases: BaseTool

Attributes

| Name | Type | Description |
| --- | --- | --- |
| description | str | A description of what the Structure does. |
| structure_run_driver | BaseStructureRunDriver | Driver to run the Structure. |

Source Code in griptape/tools/structure_run/tool.py
@define
class StructureRunTool(BaseTool):
    """Tool that delegates to a Structure through a structure run driver.

    Attributes:
        description: A description of what the Structure does.
        structure_run_driver: Driver to run the Structure.
    """

    description: str = field(kw_only=True, metadata={"serializable": True})
    structure_run_driver: BaseStructureRunDriver = field(kw_only=True, metadata={"serializable": True})

    @activity(
        config={
            "description": "Can be used to run a Gen AI Builder Structure with the following description: {{ _self.description }}",
            "schema": Schema(
                {
                    Literal("args", description="A list of string arguments to submit to the Structure Run"): Schema(
                        [str]
                    )
                },
            ),
        },
    )
    def run_structure(self, params: dict) -> BaseArtifact:
        """Run the Structure with each string in ``params["values"]["args"]`` wrapped in a TextArtifact.

        Returns:
            Whatever the structure run driver returns for the submitted arguments.
        """
        submitted: list[str] = params["values"]["args"]

        # One TextArtifact per argument, passed positionally in the original order.
        return self.structure_run_driver.run(*map(TextArtifact, submitted))
  • description = field(kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

  • structure_run_driver = field(kw_only=True, metadata={'serializable': True}) class-attribute instance-attribute

run_structure(params)

Source Code in griptape/tools/structure_run/tool.py
@activity(
    config={
        "description": "Can be used to run a Gen AI Builder Structure with the following description: {{ _self.description }}",
        "schema": Schema(
            {
                Literal("args", description="A list of string arguments to submit to the Structure Run"): Schema(
                    [str]
                )
            },
        ),
    },
)
def run_structure(self, params: dict) -> BaseArtifact:
    """Run the Structure with each string in ``params["values"]["args"]`` wrapped in a TextArtifact.

    Returns:
        Whatever the structure run driver returns for the submitted arguments.
    """
    submitted: list[str] = params["values"]["args"]

    # One TextArtifact per argument, passed positionally in the original order.
    return self.structure_run_driver.run(*map(TextArtifact, submitted))

StructuredOutputTool

Bases: BaseTool

Source Code in griptape/tools/structured_output/tool.py
@define
class StructuredOutputTool(BaseTool):
    """Tool whose single activity returns its validated parameters as the final structured output.

    Attributes:
        output_schema: Schema (or pydantic model type) used as the activity's parameter schema.
    """

    output_schema: Union[Schema, type[BaseModel]] = field(kw_only=True)

    @activity(
        config={
            "description": "Used to provide the final response which ends this conversation.",
            # Resolved per instance so each tool exposes its own output_schema.
            "schema": lambda _self: _self.output_schema,
        }
    )
    def provide_output(self, params: dict) -> BaseArtifact:
        """Wrap ``params["values"]`` in a JsonArtifact and return it."""
        structured_values = params["values"]
        return JsonArtifact(structured_values)
  • output_schema = field(kw_only=True) class-attribute instance-attribute

provide_output(params)

Source Code in griptape/tools/structured_output/tool.py
@activity(
    config={
        "description": "Used to provide the final response which ends this conversation.",
        # Resolved per instance so each tool exposes its own output_schema.
        "schema": lambda _self: _self.output_schema,
    }
)
def provide_output(self, params: dict) -> BaseArtifact:
    """Wrap ``params["values"]`` in a JsonArtifact and return it."""
    structured_values = params["values"]
    return JsonArtifact(structured_values)

TextToSpeechTool

Bases: ArtifactFileOutputMixin, BaseTool

Attributes

| Name | Type | Description |
| --- | --- | --- |
| text_to_speech_driver | BaseTextToSpeechDriver | The text to audio generation driver used to generate the speech audio. |
| output_dir | Optional[str] | If provided, the generated audio will be written to disk in output_dir. |
| output_file | Optional[str] | If provided, the generated audio will be written to disk as output_file. |

Source Code in griptape/tools/text_to_speech/tool.py
@define
class TextToSpeechTool(ArtifactFileOutputMixin, BaseTool):
    """Tool that turns input text into speech audio.

    Attributes:
        text_to_speech_driver: The text to audio generation driver used to generate the speech audio.
        output_dir: If provided, the generated audio will be written to disk in output_dir.
        output_file: If provided, the generated audio will be written to disk as output_file.
    """

    text_to_speech_driver: BaseTextToSpeechDriver = field(kw_only=True)

    @activity(
        config={
            "description": "Can be used to generate speech from the provided input text.",
            "schema": Schema({Literal("text", description="The literal text to be converted to speech."): str}),
        },
    )
    def text_to_speech(self, params: dict[str, Any]) -> AudioArtifact | ErrorArtifact:
        """Generate speech for ``params["values"]["text"]``.

        The driver receives the text as a single-element prompt list. The resulting artifact is
        also written to disk when either ``output_dir`` or ``output_file`` is set.

        Returns:
            The artifact produced by the text to speech driver.
        """
        spoken_text = params["values"]["text"]

        speech = self.text_to_speech_driver.run_text_to_audio(prompts=[spoken_text])

        wants_file = self.output_dir or self.output_file
        if wants_file:
            self._write_to_file(speech)

        return speech
  • text_to_speech_driver = field(kw_only=True) class-attribute instance-attribute

text_to_speech(params)

Source Code in griptape/tools/text_to_speech/tool.py
@activity(
    config={
        "description": "Can be used to generate speech from the provided input text.",
        "schema": Schema({Literal("text", description="The literal text to be converted to speech."): str}),
    },
)
def text_to_speech(self, params: dict[str, Any]) -> AudioArtifact | ErrorArtifact:
    """Generate speech for ``params["values"]["text"]``.

    The driver receives the text as a single-element prompt list. The resulting artifact is also
    written to disk when either ``output_dir`` or ``output_file`` is set.

    Returns:
        The artifact produced by the text to speech driver.
    """
    spoken_text = params["values"]["text"]

    speech = self.text_to_speech_driver.run_text_to_audio(prompts=[spoken_text])

    wants_file = self.output_dir or self.output_file
    if wants_file:
        self._write_to_file(speech)

    return speech

VariationImageGenerationTool

Bases: BaseImageGenerationTool

Attributes

| Name | Type | Description |
| --- | --- | --- |
| image_generation_driver | BaseImageGenerationDriver | The image generation driver used to generate the image. |
| output_dir | Optional[str] | If provided, the generated image will be written to disk in output_dir. |
| output_file | Optional[str] | If provided, the generated image will be written to disk as output_file. |

Source Code in griptape/tools/variation_image_generation/tool.py
@define
class VariationImageGenerationTool(BaseImageGenerationTool):
    """Tool that produces prompted variations of an existing image.

    Attributes:
        image_generation_driver: The image generation driver used to generate the image.
        image_loader: Loader used to read the base image from a file path.
        output_dir: If provided, the generated image will be written to disk in output_dir.
        output_file: If provided, the generated image will be written to disk as output_file.
    """

    image_generation_driver: BaseImageGenerationDriver = field(kw_only=True)
    # NOTE(review): the default is one ImageLoader built when the class is defined, so every tool
    # that does not pass its own loader shares that instance.
    image_loader: ImageLoader = field(default=ImageLoader(), kw_only=True)

    @activity(
        config={
            "description": "Generates a variation of a given input image file.",
            "schema": Schema(
                {
                    Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                    Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                    Literal(
                        "image_file",
                        description="The path to an image file to be used as a base to generate variations from.",
                    ): str,
                },
            ),
        },
    )
    def image_variation_from_file(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
        """Load the image at ``image_file`` and generate a variation of it.

        Returns:
            The artifact produced by the image generation driver.
        """
        values = params["values"]
        positive, negative, source_path = values["prompt"], values["negative_prompt"], values["image_file"]

        base_image = self.image_loader.load(source_path)

        return self._generate_variation(positive, negative, base_image)

    @activity(
        config={
            "description": "Generates a variation of a given input image artifact in memory.",
            "schema": Schema(
                {
                    Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                    Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                    "memory_name": str,
                    "artifact_namespace": str,
                    "artifact_name": str,
                },
            ),
        },
    )
    def image_variation_from_memory(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
        """Fetch an image artifact from a named input memory and generate a variation of it.

        Returns:
            The generated artifact, or an ErrorArtifact when the memory is not found or loading the
            artifact raises ValueError.
        """
        values = params["values"]
        positive = values["prompt"]
        negative = values["negative_prompt"]
        namespace = values["artifact_namespace"]
        name = values["artifact_name"]
        source_memory = self.find_input_memory(values["memory_name"])

        if source_memory is None:
            return ErrorArtifact("memory not found")

        try:
            loaded = load_artifact_from_memory(source_memory, namespace, name, ImageArtifact)
        except ValueError as e:
            return ErrorArtifact(str(e))

        return self._generate_variation(positive, negative, cast("ImageArtifact", loaded))

    def _generate_variation(
        self, prompt: str, negative_prompt: str, image_artifact: ImageArtifact
    ) -> ImageArtifact | ErrorArtifact:
        """Ask the driver for a variation of ``image_artifact`` and optionally write it to disk.

        The prompt and negative prompt are each passed to the driver as a single-element list. The
        result is written to disk when either ``output_dir`` or ``output_file`` is set.
        """
        variation = self.image_generation_driver.run_image_variation(
            prompts=[prompt],
            negative_prompts=[negative_prompt],
            image=image_artifact,
        )

        wants_file = self.output_dir or self.output_file
        if wants_file:
            self._write_to_file(variation)

        return variation
  • image_generation_driver = field(kw_only=True) class-attribute instance-attribute

  • image_loader = field(default=ImageLoader(), kw_only=True) class-attribute instance-attribute

_generate_variation(prompt, negative_prompt, image_artifact)

Source Code in griptape/tools/variation_image_generation/tool.py
def _generate_variation(
    self, prompt: str, negative_prompt: str, image_artifact: ImageArtifact
) -> ImageArtifact | ErrorArtifact:
    """Ask the driver for a variation of ``image_artifact`` and optionally write it to disk.

    The prompt and negative prompt are each passed to the driver as a single-element list. The
    result is written to disk when either ``output_dir`` or ``output_file`` is set.
    """
    variation = self.image_generation_driver.run_image_variation(
        prompts=[prompt],
        negative_prompts=[negative_prompt],
        image=image_artifact,
    )

    wants_file = self.output_dir or self.output_file
    if wants_file:
        self._write_to_file(variation)

    return variation

image_variation_from_file(params)

Source Code in griptape/tools/variation_image_generation/tool.py
@activity(
    config={
        "description": "Generates a variation of a given input image file.",
        "schema": Schema(
            {
                Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                Literal(
                    "image_file",
                    description="The path to an image file to be used as a base to generate variations from.",
                ): str,
            },
        ),
    },
)
def image_variation_from_file(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
    """Load the image at ``image_file`` and generate a variation of it.

    Returns:
        The artifact produced by the image generation driver.
    """
    values = params["values"]
    positive, negative, source_path = values["prompt"], values["negative_prompt"], values["image_file"]

    base_image = self.image_loader.load(source_path)

    return self._generate_variation(positive, negative, base_image)

image_variation_from_memory(params)

Source Code in griptape/tools/variation_image_generation/tool.py
@activity(
    config={
        "description": "Generates a variation of a given input image artifact in memory.",
        "schema": Schema(
            {
                Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str,
                Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str,
                "memory_name": str,
                "artifact_namespace": str,
                "artifact_name": str,
            },
        ),
    },
)
def image_variation_from_memory(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact:
    """Fetch an image artifact from an input memory and generate a variation of it.

    Args:
        params: Activity input; ``params["values"]`` must provide ``prompt``,
            ``negative_prompt``, ``artifact_namespace``, ``artifact_name`` and ``memory_name``.

    Returns:
        The result of ``_generate_variation``, or an ``ErrorArtifact`` when the memory
        cannot be found or loading the artifact raises ``ValueError``.
    """
    values = params["values"]
    prompt = values["prompt"]
    negative_prompt = values["negative_prompt"]
    namespace = values["artifact_namespace"]
    name = values["artifact_name"]

    memory = self.find_input_memory(values["memory_name"])
    if memory is None:
        return ErrorArtifact("memory not found")

    try:
        loaded = load_artifact_from_memory(memory, namespace, name, ImageArtifact)
    except ValueError as err:
        return ErrorArtifact(str(err))

    # The loader's return type is broader than ImageArtifact; narrow it for the type checker.
    return self._generate_variation(prompt, negative_prompt, cast("ImageArtifact", loaded))

VectorStoreTool

Bases: BaseTool

Attributes

| Name | Type | Description |
| --- | --- | --- |
| description | str | LLM-friendly vector DB description. |
| vector_store_driver | BaseVectorStoreDriver | BaseVectorStoreDriver. |
| query_params | dict[str, Any] | Optional dictionary of vector store driver query parameters. |
| process_query_output | Callable[[list[Entry]], BaseArtifact] | Optional lambda for processing vector store driver query output `Entry`s. |
Source Code in griptape/tools/vector_store/tool.py
@define(kw_only=True)
class VectorStoreTool(BaseTool):
    """A tool for querying a vector database.

    Attributes:
        description: LLM-friendly vector DB description.
        vector_store_driver: `BaseVectorStoreDriver`.
        query_params: Optional dictionary of vector store driver query parameters.
        process_query_output: Optional lambda for processing vector store driver query output `Entry`s.
    """

    DEFAULT_TOP_N = 5

    description: str = field()
    vector_store_driver: BaseVectorStoreDriver = field()
    query_params: dict[str, Any] = field(factory=dict)
    # By default every returned entry is converted to an artifact and wrapped in a ListArtifact.
    process_query_output: Callable[[list[BaseVectorStoreDriver.Entry]], BaseArtifact] = field(
        default=Factory(lambda: lambda entries: ListArtifact([entry.to_artifact() for entry in entries])),
    )

    @activity(
        config={
            "description": "Can be used to search a database with the following description: {{ _self.description }}",
            "schema": Schema(
                {
                    Literal(
                        "query",
                        description="A natural language search query to run against the vector database",
                    ): str,
                },
            ),
        },
    )
    def search(self, params: dict) -> BaseArtifact:
        """Query the vector store and post-process the returned entries.

        Args:
            params: Activity input; ``params["values"]["query"]`` is the search text.

        Returns:
            The output of ``process_query_output``, or an ``ErrorArtifact`` if the
            query or the post-processing raises.
        """
        search_text = params["values"]["query"]

        try:
            entries = self.vector_store_driver.query(search_text, **self.query_params)
            return self.process_query_output(entries)
        except Exception as err:
            return ErrorArtifact(f"error querying vector store: {err}")
  • DEFAULT_TOP_N = 5 class-attribute instance-attribute

  • description = field() class-attribute instance-attribute

  • process_query_output = field(default=Factory(lambda: lambda es: ListArtifact([e.to_artifact() for e in es]))) class-attribute instance-attribute

  • query_params = field(factory=dict) class-attribute instance-attribute

  • vector_store_driver = field() class-attribute instance-attribute

search(params)

Source Code in griptape/tools/vector_store/tool.py
@activity(
    config={
        "description": "Can be used to search a database with the following description: {{ _self.description }}",
        "schema": Schema(
            {
                Literal(
                    "query",
                    description="A natural language search query to run against the vector database",
                ): str,
            },
        ),
    },
)
def search(self, params: dict) -> BaseArtifact:
    """Query the vector store and post-process the returned entries.

    Args:
        params: Activity input; ``params["values"]["query"]`` is the search text.

    Returns:
        The output of ``process_query_output``, or an ``ErrorArtifact`` if the
        query or the post-processing raises.
    """
    search_text = params["values"]["query"]

    try:
        entries = self.vector_store_driver.query(search_text, **self.query_params)
        return self.process_query_output(entries)
    except Exception as err:
        return ErrorArtifact(f"error querying vector store: {err}")

WebScraperTool

Bases: BaseTool

Source Code in griptape/tools/web_scraper/tool.py
@define
class WebScraperTool(BaseTool):
    """A tool that loads a web page and returns its content in chunks.

    Attributes:
        web_loader: Loader used to fetch the page; defaults to a new `WebLoader`.
        text_chunker: Chunker applied to the loaded page; defaults to `TextChunker(max_tokens=400)`.
    """

    web_loader: WebLoader = field(default=Factory(lambda: WebLoader()), kw_only=True)
    text_chunker: TextChunker = field(default=Factory(lambda: TextChunker(max_tokens=400)), kw_only=True)

    @activity(
        config={
            "description": "Can be used to browse a web page and load its content",
            "schema": Schema({Literal("url", description="Valid HTTP URL"): str}),
        },
    )
    def get_content(self, params: dict) -> ListArtifact | ErrorArtifact:
        """Load the page at the requested URL and split it into chunks.

        Args:
            params: Activity input; ``params["values"]["url"]`` is the page to load.

        Returns:
            A ``ListArtifact`` of the chunks, or an ``ErrorArtifact`` if loading,
            chunking or wrapping the result raises.
        """
        page_url = params["values"]["url"]

        try:
            page = self.web_loader.load(page_url)
            return ListArtifact(self.text_chunker.chunk(page))
        except Exception as err:
            return ErrorArtifact(f"Error getting page content: {err}")
  • text_chunker = field(default=Factory(lambda: TextChunker(max_tokens=400)), kw_only=True) class-attribute instance-attribute

  • web_loader = field(default=Factory(lambda: WebLoader()), kw_only=True) class-attribute instance-attribute

get_content(params)

Source Code in griptape/tools/web_scraper/tool.py
@activity(
    config={
        "description": "Can be used to browse a web page and load its content",
        "schema": Schema({Literal("url", description="Valid HTTP URL"): str}),
    },
)
def get_content(self, params: dict) -> ListArtifact | ErrorArtifact:
    """Load the page at the requested URL and split it into chunks.

    Args:
        params: Activity input; ``params["values"]["url"]`` is the page to load.

    Returns:
        A ``ListArtifact`` of the chunks, or an ``ErrorArtifact`` if loading,
        chunking or wrapping the result raises.
    """
    page_url = params["values"]["url"]

    try:
        page = self.web_loader.load(page_url)
        return ListArtifact(self.text_chunker.chunk(page))
    except Exception as err:
        return ErrorArtifact(f"Error getting page content: {err}")

WebSearchTool

Bases: BaseTool

Source Code in griptape/tools/web_search/tool.py
@define
class WebSearchTool(BaseTool):
    """A tool that searches the web through a web search driver.

    Attributes:
        web_search_driver: Driver whose `search` method performs the query.
    """

    web_search_driver: BaseWebSearchDriver = field(kw_only=True)

    @activity(
        config={
            "description": "Can be used for searching the web via the {{ _self.web_search_driver.__class__.__name__}}.",
            "schema": Schema(
                {
                    Literal(
                        "query",
                        description="Search engine request that returns a list of pages with titles, descriptions, and URLs",
                    ): str,
                },
            ),
        },
    )
    def search(self, values: dict) -> ListArtifact | ErrorArtifact:
        """Run a web search through the configured driver.

        Args:
            values: Activity values. Must contain `query`; every other entry is forwarded
                to the driver's `search` as a keyword argument. The dict is not modified.

        Returns:
            The driver's search result, or an `ErrorArtifact` describing the failure if
            the driver raises.

        Raises:
            KeyError: If `values` has no `query` entry.
        """
        # The query is passed to the driver's `search` positionally, so it must not also appear in the
        # forwarded keyword arguments. Work on a shallow copy so the caller's dict is left untouched.
        driver_kwargs = dict(values)
        query = driver_kwargs.pop("query")

        try:
            return self.web_search_driver.search(query, **driver_kwargs)
        except Exception as e:
            return ErrorArtifact(f"Error searching '{query}' with {self.web_search_driver.__class__.__name__}: {e}")
  • web_search_driver = field(kw_only=True) class-attribute instance-attribute

search(values)

Source Code in griptape/tools/web_search/tool.py
@activity(
    config={
        "description": "Can be used for searching the web via the {{ _self.web_search_driver.__class__.__name__}}.",
        "schema": Schema(
            {
                Literal(
                    "query",
                    description="Search engine request that returns a list of pages with titles, descriptions, and URLs",
                ): str,
            },
        ),
    },
)
def search(self, values: dict) -> ListArtifact | ErrorArtifact:
    """Run a web search through the configured driver.

    Args:
        values: Activity values. Must contain `query`; every other entry is forwarded
            to the driver's `search` as a keyword argument. The dict is not modified.

    Returns:
        The driver's search result, or an `ErrorArtifact` describing the failure if
        the driver raises.

    Raises:
        KeyError: If `values` has no `query` entry.
    """
    # The query is passed to the driver's `search` positionally, so it must not also appear in the
    # forwarded keyword arguments. Work on a shallow copy so the caller's dict is left untouched.
    driver_kwargs = dict(values)
    query = driver_kwargs.pop("query")

    try:
        return self.web_search_driver.search(query, **driver_kwargs)
    except Exception as e:
        return ErrorArtifact(f"Error searching '{query}' with {self.web_search_driver.__class__.__name__}: {e}")

Could this page be better? Report a problem or suggest an addition!