tools
__all__ = ['AudioTranscriptionTool', 'BaseImageGenerationTool', 'BaseTool', 'CalculatorTool', 'ComputerTool', 'DateTimeTool', 'EmailTool', 'ExtractionTool', 'FileManagerTool', 'GriptapeCloudToolTool', 'ImageQueryTool', 'InpaintingImageGenerationTool', 'OutpaintingImageGenerationTool', 'PromptImageGenerationTool', 'PromptSummaryTool', 'QueryTool', 'RagTool', 'RestApiTool', 'SqlTool', 'StructureRunTool', 'StructuredOutputTool', 'TextToSpeechTool', 'VariationImageGenerationTool', 'VectorStoreTool', 'WebScraperTool', 'WebSearchTool']
module-attribute
Bases:
BaseTool
Source Code in griptape/tools/audio_transcription/tool.py
@define class AudioTranscriptionTool(BaseTool): """A tool that can be used to generate transcriptions from input audio.""" audio_transcription_driver: BaseAudioTranscriptionDriver = field(kw_only=True) audio_loader: AudioLoader = field(default=Factory(lambda: AudioLoader()), kw_only=True) @activity( config={ "description": "This tool can be used to generate transcriptions of audio files on disk.", "schema": Schema({Literal("path", description="The paths to an audio file on disk."): str}), }, ) def transcribe_audio_from_disk(self, params: dict) -> TextArtifact | ErrorArtifact: audio_path = params["values"]["path"] audio_artifact = self.audio_loader.load(audio_path) return self.audio_transcription_driver.run(audio_artifact) @activity( config={ "description": "This tool can be used to generate the transcription of an audio artifact in memory.", "schema": Schema({"schema": Schema({"memory_name": str, "artifact_namespace": str, "artifact_name": str})}), }, ) def transcribe_audio_from_memory(self, params: dict[str, Any]) -> TextArtifact | ErrorArtifact: memory = self.find_input_memory(params["values"]["memory_name"]) artifact_namespace = params["values"]["artifact_namespace"] artifact_name = params["values"]["artifact_name"] if memory is None: return ErrorArtifact("memory not found") audio_artifact = cast( "AudioArtifact", load_artifact_from_memory(memory, artifact_namespace, artifact_name, AudioArtifact), ) return self.audio_transcription_driver.run(audio_artifact)
audio_loader = field(default=Factory(lambda: AudioLoader()), kw_only=True)
class-attribute instance-attributeaudio_transcription_driver = field(kw_only=True)
class-attribute instance-attribute
transcribe_audio_from_disk(params)
Source Code in griptape/tools/audio_transcription/tool.py
@activity( config={ "description": "This tool can be used to generate transcriptions of audio files on disk.", "schema": Schema({Literal("path", description="The paths to an audio file on disk."): str}), }, ) def transcribe_audio_from_disk(self, params: dict) -> TextArtifact | ErrorArtifact: audio_path = params["values"]["path"] audio_artifact = self.audio_loader.load(audio_path) return self.audio_transcription_driver.run(audio_artifact)
transcribe_audio_from_memory(params)
Source Code in griptape/tools/audio_transcription/tool.py
@activity( config={ "description": "This tool can be used to generate the transcription of an audio artifact in memory.", "schema": Schema({"schema": Schema({"memory_name": str, "artifact_namespace": str, "artifact_name": str})}), }, ) def transcribe_audio_from_memory(self, params: dict[str, Any]) -> TextArtifact | ErrorArtifact: memory = self.find_input_memory(params["values"]["memory_name"]) artifact_namespace = params["values"]["artifact_namespace"] artifact_name = params["values"]["artifact_name"] if memory is None: return ErrorArtifact("memory not found") audio_artifact = cast( "AudioArtifact", load_artifact_from_memory(memory, artifact_namespace, artifact_name, AudioArtifact), ) return self.audio_transcription_driver.run(audio_artifact)
BaseImageGenerationTool
Bases:
ArtifactFileOutputMixin
, BaseTool
Source Code in griptape/tools/base_image_generation_tool.py
@define class BaseImageGenerationTool(ArtifactFileOutputMixin, BaseTool): """A base class for tools that generate images from text prompts.""" PROMPT_DESCRIPTION = "Features and qualities to include in the generated image, descriptive and succinct." NEGATIVE_PROMPT_DESCRIPTION = ( "Features and qualities to avoid in the generated image. Affirmatively describe " "what to avoid, for example: to avoid the color red, include 'red' " "rather than 'no red'." )
NEGATIVE_PROMPT_DESCRIPTION = "Features and qualities to avoid in the generated image. Affirmatively describe what to avoid, for example: to avoid the color red, include 'red' rather than 'no red'."
class-attribute instance-attributePROMPT_DESCRIPTION = 'Features and qualities to include in the generated image, descriptive and succinct.'
class-attribute instance-attribute
BaseTool
Bases:
ActivityMixin
, SerializableMixin
, RunnableMixin['BaseTool']
, ABC
Attributes
Name | Type | Description |
---|---|---|
name | str | Tool name. |
input_memory | Optional[list[TaskMemory]] | TaskMemory available in tool activities. Gets automatically set if None. |
output_memory | Optional[dict[str, list[TaskMemory]]] | TaskMemory that activities write to be default. Gets automatically set if None. |
install_dependencies_on_init | bool | Determines whether dependencies from the tool requirements.txt file are installed in init. |
dependencies_install_directory | Optional[str] | Custom dependency install directory. |
verbose | bool | Determines whether tool operations (such as dependency installation) should be verbose. |
off_prompt | bool | Determines whether tool activity output goes to the output memory. |
Source Code in griptape/tools/base_tool.py
@define class BaseTool(ActivityMixin, SerializableMixin, RunnableMixin["BaseTool"], ABC): """Abstract class for all tools to inherit from for. Attributes: name: Tool name. input_memory: TaskMemory available in tool activities. Gets automatically set if None. output_memory: TaskMemory that activities write to be default. Gets automatically set if None. install_dependencies_on_init: Determines whether dependencies from the tool requirements.txt file are installed in init. dependencies_install_directory: Custom dependency install directory. verbose: Determines whether tool operations (such as dependency installation) should be verbose. off_prompt: Determines whether tool activity output goes to the output memory. """ REQUIREMENTS_FILE = "requirements.txt" name: str = field( default=Factory(lambda self: self.__class__.__name__, takes_self=True), kw_only=True, metadata={"serializable": True}, ) input_memory: Optional[list[TaskMemory]] = field(default=None, kw_only=True, metadata={"serializable": True}) output_memory: Optional[dict[str, list[TaskMemory]]] = field( default=None, kw_only=True, metadata={"serializable": True} ) install_dependencies_on_init: bool = field(default=True, kw_only=True, metadata={"serializable": True}) dependencies_install_directory: Optional[str] = field(default=None, kw_only=True, metadata={"serializable": True}) verbose: bool = field(default=False, kw_only=True, metadata={"serializable": True}) off_prompt: bool = field(default=False, kw_only=True, metadata={"serializable": True}) def __attrs_post_init__(self) -> None: if ( self.install_dependencies_on_init and self.has_requirements and not self.are_requirements_met(self.requirements_path) ): self.install_dependencies(os.environ.copy()) @output_memory.validator # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess] def validate_output_memory(self, _: Attribute, output_memory: dict[str, Optional[list[TaskMemory]]]) -> None: if output_memory: for activity_name, memory_list in output_memory.items(): if not self.find_activity(activity_name): raise ValueError(f"activity {activity_name} doesn't exist") if memory_list is None: raise ValueError(f"memory list for activity '{activity_name}' can't be None") output_memory_names = [memory.name for memory in memory_list] if len(output_memory_names) > len(set(output_memory_names)): raise ValueError(f"memory names have to be unique in activity '{activity_name}' output") @property def requirements_path(self) -> str: return os.path.join(self.abs_dir_path, self.REQUIREMENTS_FILE) @property def abs_file_path(self) -> str: return os.path.abspath(inspect.getfile(self.__class__)) @property def abs_dir_path(self) -> str: return os.path.dirname(self.abs_file_path) @property def has_requirements(self) -> bool: return os.path.exists(self.requirements_path) # This method has to remain a method and can't be decorated with @property because # of the max depth recursion issue in `self.activities`. def schema(self) -> dict: full_schema = Schema(Or(*self.activity_schemas()), description=f"{self.name} action schema.") return full_schema.json_schema(f"{self.name} ToolAction Schema") def activity_schemas(self) -> list[Schema]: schemas = [] for activity in self.activities(): schema_dict: dict[Literal | schema.Optional, Any] = { Literal("name"): self.name, Literal("path", description=self.activity_description(activity)): self.activity_name(activity), } activity_schema = self.activity_schema(activity) # If no schema is defined, we just make `input` optional instead of omitting it. # This works better with lower-end models that may accidentally pass in an empty dict. if activity_schema is None: schema_dict[schema.Optional("input")] = {} else: schema_dict[Literal("input")] = activity_schema.schema schemas.append(Schema(schema_dict)) return schemas def run(self, activity: Callable, subtask: ActionsSubtask, action: ToolAction) -> BaseArtifact: try: output = self.before_run(activity, subtask, action) output = self.try_run(activity, subtask, action, output) output = self.after_run(activity, subtask, action, output) except Exception as e: logging.debug(traceback.format_exc()) output = ErrorArtifact(str(e), exception=e) return output def before_run(self, activity: Callable, subtask: ActionsSubtask, action: ToolAction) -> Optional[dict]: super().before_run() return action.input @observable(tags=["Tool.run()"]) def try_run( self, activity: Callable, subtask: ActionsSubtask, action: ToolAction, value: Optional[dict], ) -> BaseArtifact: activity_result = activity(deepcopy(value)) if isinstance(activity_result, BaseArtifact): result = activity_result else: logging.warning("Activity result is not an artifact; converting result to InfoArtifact") if activity_result is None: result = InfoArtifact("Tool returned an empty value") else: result = InfoArtifact(activity_result) return result def after_run( self, activity: Callable, subtask: ActionsSubtask, action: ToolAction, value: BaseArtifact, ) -> BaseArtifact: super().after_run() if self.output_memory: output_memories = self.output_memory[getattr(activity, "name")] or [] for memory in output_memories: value = memory.process_output(activity, subtask, value) return value return value def validate(self) -> bool: if not os.path.exists(self.requirements_path): raise Exception(f"{self.REQUIREMENTS_FILE} not found") return True def tool_dir(self) -> str: class_file = inspect.getfile(self.__class__) return os.path.dirname(os.path.abspath(class_file)) def install_dependencies(self, env: Optional[dict[str, str]] = None) -> None: env = env or {} command = [sys.executable, "-m", "pip", "install", "-r", "requirements.txt"] if self.dependencies_install_directory is None: command.extend(["-U"]) else: command.extend(["-t", self.dependencies_install_directory]) subprocess.run( command, env=env, cwd=self.tool_dir(), stdout=None if self.verbose else subprocess.DEVNULL, stderr=None if self.verbose else subprocess.DEVNULL, check=False, ) def find_input_memory(self, memory_name: str) -> Optional[TaskMemory]: if self.input_memory: return next((m for m in self.input_memory if m.name == memory_name), None) return None def to_native_tool_name(self, activity: Callable) -> str: """Converts a Tool's name and an Activity into to a native tool name. The native tool name is a combination of the Tool's name and the Activity's name. The Tool's name may only contain letters and numbers, and the Activity's name may only contain letters, numbers, and underscores. Args: activity: Activity to convert Returns: str: Native tool name. """ tool_name = self.name if re.match(r"^[a-zA-Z0-9]+$", tool_name) is None: raise ValueError("Tool name can only contain letters and numbers.") activity_name = self.activity_name(activity) if re.match(r"^[a-zA-Z0-9_]+$", activity_name) is None: raise ValueError("Activity name can only contain letters, numbers, and underscores.") return f"{tool_name}_{activity_name}" def are_requirements_met(self, requirements_path: str) -> bool: requirements = Path(requirements_path).read_text().splitlines() try: for requirement in requirements: importlib.metadata.version(requirement) return True except importlib.metadata.PackageNotFoundError: return False
REQUIREMENTS_FILE = 'requirements.txt'
class-attribute instance-attributeabs_dir_path
propertyabs_file_path
propertydependencies_install_directory = field(default=None, kw_only=True, metadata={'serializable': True})
class-attribute instance-attributehas_requirements
propertyinput_memory = field(default=None, kw_only=True, metadata={'serializable': True})
class-attribute instance-attributeinstall_dependencies_on_init = field(default=True, kw_only=True, metadata={'serializable': True})
class-attribute instance-attributename = field(default=Factory(lambda self: self.__class__.__name__, takes_self=True), kw_only=True, metadata={'serializable': True})
class-attribute instance-attributeoff_prompt = field(default=False, kw_only=True, metadata={'serializable': True})
class-attribute instance-attributeoutput_memory = field(default=None, kw_only=True, metadata={'serializable': True})
class-attribute instance-attributerequirements_path
propertyverbose = field(default=False, kw_only=True, metadata={'serializable': True})
class-attribute instance-attribute
attrs_post_init()
Source Code in griptape/tools/base_tool.py
def __attrs_post_init__(self) -> None: if ( self.install_dependencies_on_init and self.has_requirements and not self.are_requirements_met(self.requirements_path) ): self.install_dependencies(os.environ.copy())
activity_schemas()
Source Code in griptape/tools/base_tool.py
def activity_schemas(self) -> list[Schema]: schemas = [] for activity in self.activities(): schema_dict: dict[Literal | schema.Optional, Any] = { Literal("name"): self.name, Literal("path", description=self.activity_description(activity)): self.activity_name(activity), } activity_schema = self.activity_schema(activity) # If no schema is defined, we just make `input` optional instead of omitting it. # This works better with lower-end models that may accidentally pass in an empty dict. if activity_schema is None: schema_dict[schema.Optional("input")] = {} else: schema_dict[Literal("input")] = activity_schema.schema schemas.append(Schema(schema_dict)) return schemas
after_run(activity, subtask, action, value)
Source Code in griptape/tools/base_tool.py
def after_run( self, activity: Callable, subtask: ActionsSubtask, action: ToolAction, value: BaseArtifact, ) -> BaseArtifact: super().after_run() if self.output_memory: output_memories = self.output_memory[getattr(activity, "name")] or [] for memory in output_memories: value = memory.process_output(activity, subtask, value) return value return value
are_requirements_met(requirements_path)
Source Code in griptape/tools/base_tool.py
def are_requirements_met(self, requirements_path: str) -> bool: requirements = Path(requirements_path).read_text().splitlines() try: for requirement in requirements: importlib.metadata.version(requirement) return True except importlib.metadata.PackageNotFoundError: return False
before_run(activity, subtask, action)
Source Code in griptape/tools/base_tool.py
def before_run(self, activity: Callable, subtask: ActionsSubtask, action: ToolAction) -> Optional[dict]: super().before_run() return action.input
find_input_memory(memory_name)
Source Code in griptape/tools/base_tool.py
def find_input_memory(self, memory_name: str) -> Optional[TaskMemory]: if self.input_memory: return next((m for m in self.input_memory if m.name == memory_name), None) return None
install_dependencies(env=None)
Source Code in griptape/tools/base_tool.py
def install_dependencies(self, env: Optional[dict[str, str]] = None) -> None: env = env or {} command = [sys.executable, "-m", "pip", "install", "-r", "requirements.txt"] if self.dependencies_install_directory is None: command.extend(["-U"]) else: command.extend(["-t", self.dependencies_install_directory]) subprocess.run( command, env=env, cwd=self.tool_dir(), stdout=None if self.verbose else subprocess.DEVNULL, stderr=None if self.verbose else subprocess.DEVNULL, check=False, )
run(activity, subtask, action)
Source Code in griptape/tools/base_tool.py
def run(self, activity: Callable, subtask: ActionsSubtask, action: ToolAction) -> BaseArtifact: try: output = self.before_run(activity, subtask, action) output = self.try_run(activity, subtask, action, output) output = self.after_run(activity, subtask, action, output) except Exception as e: logging.debug(traceback.format_exc()) output = ErrorArtifact(str(e), exception=e) return output
schema()
Source Code in griptape/tools/base_tool.py
def schema(self) -> dict: full_schema = Schema(Or(*self.activity_schemas()), description=f"{self.name} action schema.") return full_schema.json_schema(f"{self.name} ToolAction Schema")
to_native_tool_name(activity)
Converts a Tool's name and an Activity into to a native tool name.
The native tool name is a combination of the Tool's name and the Activity's name. The Tool's name may only contain letters and numbers, and the Activity's name may only contain letters, numbers, and underscores.
Parameters
Name | Type | Description | Default |
---|---|---|---|
activity | Callable | Activity to convert | required |
Returns
Name | Type | Description |
---|---|---|
str | str | Native tool name. |
Source Code in griptape/tools/base_tool.py
def to_native_tool_name(self, activity: Callable) -> str: """Converts a Tool's name and an Activity into to a native tool name. The native tool name is a combination of the Tool's name and the Activity's name. The Tool's name may only contain letters and numbers, and the Activity's name may only contain letters, numbers, and underscores. Args: activity: Activity to convert Returns: str: Native tool name. """ tool_name = self.name if re.match(r"^[a-zA-Z0-9]+$", tool_name) is None: raise ValueError("Tool name can only contain letters and numbers.") activity_name = self.activity_name(activity) if re.match(r"^[a-zA-Z0-9_]+$", activity_name) is None: raise ValueError("Activity name can only contain letters, numbers, and underscores.") return f"{tool_name}_{activity_name}"
tool_dir()
Source Code in griptape/tools/base_tool.py
def tool_dir(self) -> str: class_file = inspect.getfile(self.__class__) return os.path.dirname(os.path.abspath(class_file))
try_run(activity, subtask, action, value)
Source Code in griptape/tools/base_tool.py
@observable(tags=["Tool.run()"]) def try_run( self, activity: Callable, subtask: ActionsSubtask, action: ToolAction, value: Optional[dict], ) -> BaseArtifact: activity_result = activity(deepcopy(value)) if isinstance(activity_result, BaseArtifact): result = activity_result else: logging.warning("Activity result is not an artifact; converting result to InfoArtifact") if activity_result is None: result = InfoArtifact("Tool returned an empty value") else: result = InfoArtifact(activity_result) return result
validate()
Source Code in griptape/tools/base_tool.py
def validate(self) -> bool: if not os.path.exists(self.requirements_path): raise Exception(f"{self.REQUIREMENTS_FILE} not found") return True
validateoutput_memory(, output_memory)
Source Code in griptape/tools/base_tool.py
@output_memory.validator # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess] def validate_output_memory(self, _: Attribute, output_memory: dict[str, Optional[list[TaskMemory]]]) -> None: if output_memory: for activity_name, memory_list in output_memory.items(): if not self.find_activity(activity_name): raise ValueError(f"activity {activity_name} doesn't exist") if memory_list is None: raise ValueError(f"memory list for activity '{activity_name}' can't be None") output_memory_names = [memory.name for memory in memory_list] if len(output_memory_names) > len(set(output_memory_names)): raise ValueError(f"memory names have to be unique in activity '{activity_name}' output")
CalculatorTool
Bases:
BaseTool
Source Code in griptape/tools/calculator/tool.py
class CalculatorTool(BaseTool): @activity( config={ "description": "Can be used for computing simple numerical or algebraic calculations in Python", "schema": Schema( { Literal( "expression", description="Arithmetic expression parsable in pure Python. Single line only. " "Don't use variables. Don't use any imports or external libraries", ): str, }, ), }, ) def calculate(self, params: dict) -> BaseArtifact: import numexpr # pyright: ignore[reportMissingImports] try: expression = params["values"]["expression"] return TextArtifact(numexpr.evaluate(expression)) except Exception as e: return ErrorArtifact(f"error calculating: {e}")
calculate(params)
Source Code in griptape/tools/calculator/tool.py
@activity( config={ "description": "Can be used for computing simple numerical or algebraic calculations in Python", "schema": Schema( { Literal( "expression", description="Arithmetic expression parsable in pure Python. Single line only. " "Don't use variables. Don't use any imports or external libraries", ): str, }, ), }, ) def calculate(self, params: dict) -> BaseArtifact: import numexpr # pyright: ignore[reportMissingImports] try: expression = params["values"]["expression"] return TextArtifact(numexpr.evaluate(expression)) except Exception as e: return ErrorArtifact(f"error calculating: {e}")
ComputerTool
Bases:
BaseTool
Source Code in griptape/tools/computer/tool.py
@define class ComputerTool(BaseTool): local_workdir: Optional[str] = field(default=None, kw_only=True) container_workdir: str = field(default="/griptape", kw_only=True) env_vars: dict = field(factory=dict, kw_only=True) dockerfile_path: str = field( default=Factory(lambda self: f"{os.path.join(self.tool_dir(), 'resources/Dockerfile')}", takes_self=True), kw_only=True, ) requirements_txt_path: str = field( default=Factory(lambda self: f"{os.path.join(self.tool_dir(), 'resources/requirements.txt')}", takes_self=True), kw_only=True, ) docker_client: DockerClient = field( default=Factory(lambda self: self.default_docker_client(), takes_self=True), kw_only=True, ) _tempdir: Optional[tempfile.TemporaryDirectory] = field(default=None, kw_only=True) def __attrs_post_init__(self) -> None: super().__attrs_post_init__() if self.local_workdir: Path(self.local_workdir).mkdir(parents=True, exist_ok=True) else: self._tempdir = tempfile.TemporaryDirectory() self.local_workdir = self._tempdir.name @docker_client.validator # pyright: ignore[reportAttributeAccessIssue] def validate_docker_client(self, _: Attribute, docker_client: DockerClient) -> None: if not docker_client: raise ValueError("Docker client can't be initialized: make sure the Docker daemon is running") def install_dependencies(self, env: Optional[dict[str, str]] = None) -> None: super().install_dependencies(env) self.remove_existing_container(self.container_name(self)) self.build_image(self) @activity( config={ "description": "Can be used to execute Python code to solve any programmatic tasks and access and analyze" " files in the file system. If you need to use code output use `print` statements. " "You have access to the following external Python libraries: " "{{ _self.dependencies() }}", "schema": Schema( { Literal("code", description="Python code to execute"): str, Literal( "filename", description="name of the file to put the Python code in before executing it", ): str, }, ), }, ) def execute_code(self, params: dict) -> BaseArtifact: code = params["values"]["code"] filename = params["values"]["filename"] return self.execute_code_in_container(filename, code) @activity( config={ "description": "Can be used to execute shell commands in Linux", "schema": Schema({Literal("command", description="shell command to execute"): str}), }, ) def execute_command(self, params: dict) -> BaseArtifact: command = params["values"]["command"] return self.execute_command_in_container(command) def execute_command_in_container(self, command: str) -> BaseArtifact: from docker.models.containers import Container try: binds = {self.local_workdir: {"bind": self.container_workdir, "mode": "rw"}} if self.local_workdir else None container = self.docker_client.containers.run( # pyright: ignore[reportCallIssue] self.image_name(self), environment=self.env_vars, command=command, name=self.container_name(self), volumes=binds, # pyright: ignore[reportArgumentType] According to the [docs](https://docker-py.readthedocs.io/en/stable/containers.html), the type of `volumes` is dict[str, dict[str, str]]. stdout=True, stderr=True, detach=True, ) if isinstance(container, Container): container.wait() stderr = container.logs(stdout=False, stderr=True).decode().strip() stdout = container.logs(stdout=True, stderr=False).decode().strip() container.stop() container.remove() if stderr: return ErrorArtifact(stderr) return TextArtifact(stdout) return ErrorArtifact("error running container") except Exception as e: return ErrorArtifact(f"error executing command: {e}") def execute_code_in_container(self, filename: str, code: str) -> BaseArtifact: container_file_path = os.path.join(self.container_workdir, filename) if self.local_workdir: tempdir = None local_workdir = self.local_workdir else: tempdir = tempfile.TemporaryDirectory() local_workdir = tempdir.name local_file_path = os.path.join(local_workdir, filename) try: Path(local_file_path).write_text(code) return self.execute_command_in_container(f"python {container_file_path}") except Exception as e: return ErrorArtifact(f"error executing code: {e}") finally: if tempdir: tempdir.cleanup() def default_docker_client(self) -> Optional[DockerClient]: import docker try: return docker.from_env() except Exception as e: logging.exception(e) return None def image_name(self, tool: BaseTool) -> str: import stringcase # pyright: ignore[reportMissingImports] return f"{stringcase.snakecase(tool.name)}_image" def container_name(self, tool: BaseTool) -> str: import stringcase # pyright: ignore[reportMissingImports] return f"{stringcase.snakecase(tool.name)}_container" def remove_existing_container(self, name: str) -> None: from docker.errors import NotFound from docker.models.containers import Container try: existing_container = self.docker_client.containers.get(name) if isinstance(existing_container, Container): existing_container.remove(force=True) logging.info("Removed existing container %s", name) except NotFound: pass def build_image(self, tool: BaseTool) -> None: with tempfile.TemporaryDirectory() as temp_dir: shutil.copy(self.dockerfile_path, temp_dir) shutil.copy(self.requirements_txt_path, temp_dir) image = self.docker_client.images.build(path=temp_dir, tag=self.image_name(tool), rm=True, forcerm=True) if isinstance(image, tuple): logging.info("Built image: %s", image[0].short_id) def dependencies(self) -> list[str]: with open(self.requirements_txt_path) as file: return [line.strip() for line in file] def __del__(self) -> None: if self._tempdir: self._tempdir.cleanup()
_tempdir = field(default=None, kw_only=True)
class-attribute instance-attributecontainer_workdir = field(default='/griptape', kw_only=True)
class-attribute instance-attributedocker_client = field(default=Factory(lambda self: self.default_docker_client(), takes_self=True), kw_only=True)
class-attribute instance-attributedockerfile_path = field(default=Factory(lambda self: f'{os.path.join(self.tool_dir(), 'resources/Dockerfile')}', takes_self=True), kw_only=True)
class-attribute instance-attributeenv_vars = field(factory=dict, kw_only=True)
class-attribute instance-attributelocal_workdir = field(default=None, kw_only=True)
class-attribute instance-attributerequirements_txt_path = field(default=Factory(lambda self: f'{os.path.join(self.tool_dir(), 'resources/requirements.txt')}', takes_self=True), kw_only=True)
class-attribute instance-attribute
attrs_post_init()
Source Code in griptape/tools/computer/tool.py
def __attrs_post_init__(self) -> None: super().__attrs_post_init__() if self.local_workdir: Path(self.local_workdir).mkdir(parents=True, exist_ok=True) else: self._tempdir = tempfile.TemporaryDirectory() self.local_workdir = self._tempdir.name
del()
Source Code in griptape/tools/computer/tool.py
def __del__(self) -> None: if self._tempdir: self._tempdir.cleanup()
build_image(tool)
Source Code in griptape/tools/computer/tool.py
def build_image(self, tool: BaseTool) -> None: with tempfile.TemporaryDirectory() as temp_dir: shutil.copy(self.dockerfile_path, temp_dir) shutil.copy(self.requirements_txt_path, temp_dir) image = self.docker_client.images.build(path=temp_dir, tag=self.image_name(tool), rm=True, forcerm=True) if isinstance(image, tuple): logging.info("Built image: %s", image[0].short_id)
container_name(tool)
Source Code in griptape/tools/computer/tool.py
def container_name(self, tool: BaseTool) -> str: import stringcase # pyright: ignore[reportMissingImports] return f"{stringcase.snakecase(tool.name)}_container"
default_docker_client()
Source Code in griptape/tools/computer/tool.py
def default_docker_client(self) -> Optional[DockerClient]: import docker try: return docker.from_env() except Exception as e: logging.exception(e) return None
dependencies()
Source Code in griptape/tools/computer/tool.py
def dependencies(self) -> list[str]: with open(self.requirements_txt_path) as file: return [line.strip() for line in file]
execute_code(params)
Source Code in griptape/tools/computer/tool.py
@activity( config={ "description": "Can be used to execute Python code to solve any programmatic tasks and access and analyze" " files in the file system. If you need to use code output use `print` statements. " "You have access to the following external Python libraries: " "{{ _self.dependencies() }}", "schema": Schema( { Literal("code", description="Python code to execute"): str, Literal( "filename", description="name of the file to put the Python code in before executing it", ): str, }, ), }, ) def execute_code(self, params: dict) -> BaseArtifact: code = params["values"]["code"] filename = params["values"]["filename"] return self.execute_code_in_container(filename, code)
execute_code_in_container(filename, code)
Source Code in griptape/tools/computer/tool.py
def execute_code_in_container(self, filename: str, code: str) -> BaseArtifact: container_file_path = os.path.join(self.container_workdir, filename) if self.local_workdir: tempdir = None local_workdir = self.local_workdir else: tempdir = tempfile.TemporaryDirectory() local_workdir = tempdir.name local_file_path = os.path.join(local_workdir, filename) try: Path(local_file_path).write_text(code) return self.execute_command_in_container(f"python {container_file_path}") except Exception as e: return ErrorArtifact(f"error executing code: {e}") finally: if tempdir: tempdir.cleanup()
execute_command(params)
Source Code in griptape/tools/computer/tool.py
@activity( config={ "description": "Can be used to execute shell commands in Linux", "schema": Schema({Literal("command", description="shell command to execute"): str}), }, ) def execute_command(self, params: dict) -> BaseArtifact: command = params["values"]["command"] return self.execute_command_in_container(command)
execute_command_in_container(command)
Source Code in griptape/tools/computer/tool.py
def execute_command_in_container(self, command: str) -> BaseArtifact: from docker.models.containers import Container try: binds = {self.local_workdir: {"bind": self.container_workdir, "mode": "rw"}} if self.local_workdir else None container = self.docker_client.containers.run( # pyright: ignore[reportCallIssue] self.image_name(self), environment=self.env_vars, command=command, name=self.container_name(self), volumes=binds, # pyright: ignore[reportArgumentType] According to the [docs](https://docker-py.readthedocs.io/en/stable/containers.html), the type of `volumes` is dict[str, dict[str, str]]. stdout=True, stderr=True, detach=True, ) if isinstance(container, Container): container.wait() stderr = container.logs(stdout=False, stderr=True).decode().strip() stdout = container.logs(stdout=True, stderr=False).decode().strip() container.stop() container.remove() if stderr: return ErrorArtifact(stderr) return TextArtifact(stdout) return ErrorArtifact("error running container") except Exception as e: return ErrorArtifact(f"error executing command: {e}")
image_name(tool)
Source Code in griptape/tools/computer/tool.py
def image_name(self, tool: BaseTool) -> str: import stringcase # pyright: ignore[reportMissingImports] return f"{stringcase.snakecase(tool.name)}_image"
install_dependencies(env=None)
Source Code in griptape/tools/computer/tool.py
def install_dependencies(self, env: Optional[dict[str, str]] = None) -> None: super().install_dependencies(env) self.remove_existing_container(self.container_name(self)) self.build_image(self)
remove_existing_container(name)
Source Code in griptape/tools/computer/tool.py
def remove_existing_container(self, name: str) -> None: from docker.errors import NotFound from docker.models.containers import Container try: existing_container = self.docker_client.containers.get(name) if isinstance(existing_container, Container): existing_container.remove(force=True) logging.info("Removed existing container %s", name) except NotFound: pass
validatedocker_client(, docker_client)
Source Code in griptape/tools/computer/tool.py
@docker_client.validator # pyright: ignore[reportAttributeAccessIssue] def validate_docker_client(self, _: Attribute, docker_client: DockerClient) -> None: if not docker_client: raise ValueError("Docker client can't be initialized: make sure the Docker daemon is running")
DateTimeTool
Bases:
BaseTool
Source Code in griptape/tools/date_time/tool.py
@define class DateTimeTool(BaseTool): denylist: list[str] = field(default=Factory(lambda: ["get_relative_datetime"]), kw_only=True) @activity(config={"description": "Can be used to return current date and time."}) def get_current_datetime(self) -> BaseArtifact: return TextArtifact(str(datetime.now())) @activity( config={ "description": "Can be used to return a relative date and time.", "schema": schema.Schema( { schema.Literal( "relative_date_string", description='Relative date in English compatible with the dateparser library. For example, "now EST", "20 minutes ago", ' '"in 2 days", "3 months, 1 week and 1 day ago", or "yesterday at 2pm"', ): str, }, ), }, ) def get_relative_datetime(self, params: dict) -> BaseArtifact: from dateparser import parse try: date_string = params["values"]["relative_date_string"] relative_datetime = parse(date_string) if relative_datetime: return TextArtifact(str(relative_datetime)) return ErrorArtifact("invalid date string") except Exception as e: return ErrorArtifact(f"error getting current datetime: {e}") @activity( config={ "description": "Can be used to add a timedelta to a datetime.", "schema": schema.Schema( { schema.Literal( "timedelta_kwargs", description='A dictionary of keyword arguments to pass to the timedelta function. For example, {"days": -1, "hours": 2}', ): dict, schema.Optional( schema.Literal( "iso_datetime", description='Datetime represented as a string in ISO 8601 format. For example, "2021-01-01T00:00:00". Defaults to the current datetime if not provided.', ) ): str, }, ), }, ) def add_timedelta(self, timedelta_kwargs: dict, iso_datetime: Optional[str] = None) -> BaseArtifact: if iso_datetime is None: iso_datetime = datetime.now().isoformat() return TextArtifact((datetime.fromisoformat(iso_datetime) + timedelta(**timedelta_kwargs)).isoformat()) @activity( config={ "description": "Can be used to calculate the difference between two datetimes. The difference is calculated as end_datetime - start_datetime.", "schema": schema.Schema( { schema.Literal( "start_datetime", description='Datetime represented as a string in ISO 8601 format. For example, "2021-01-01T00:00:00"', ): str, schema.Literal( "end_datetime", description='Datetime represented as a string in ISO 8601 format. For example, "2021-01-02T00:00:00"', ): str, } ), } ) def get_datetime_diff(self, start_datetime: str, end_datetime: str) -> BaseArtifact: return TextArtifact(str(datetime.fromisoformat(end_datetime) - datetime.fromisoformat(start_datetime)))
denylist = field(default=Factory(lambda: ['get_relative_datetime']), kw_only=True)
class-attribute instance-attribute
add_timedelta(timedelta_kwargs, iso_datetime=None)
Source Code in griptape/tools/date_time/tool.py
@activity( config={ "description": "Can be used to add a timedelta to a datetime.", "schema": schema.Schema( { schema.Literal( "timedelta_kwargs", description='A dictionary of keyword arguments to pass to the timedelta function. For example, {"days": -1, "hours": 2}', ): dict, schema.Optional( schema.Literal( "iso_datetime", description='Datetime represented as a string in ISO 8601 format. For example, "2021-01-01T00:00:00". Defaults to the current datetime if not provided.', ) ): str, }, ), }, ) def add_timedelta(self, timedelta_kwargs: dict, iso_datetime: Optional[str] = None) -> BaseArtifact: if iso_datetime is None: iso_datetime = datetime.now().isoformat() return TextArtifact((datetime.fromisoformat(iso_datetime) + timedelta(**timedelta_kwargs)).isoformat())
get_current_datetime()
Source Code in griptape/tools/date_time/tool.py
@activity(config={"description": "Can be used to return current date and time."}) def get_current_datetime(self) -> BaseArtifact: return TextArtifact(str(datetime.now()))
get_datetime_diff(start_datetime, end_datetime)
Source Code in griptape/tools/date_time/tool.py
@activity( config={ "description": "Can be used to calculate the difference between two datetimes. The difference is calculated as end_datetime - start_datetime.", "schema": schema.Schema( { schema.Literal( "start_datetime", description='Datetime represented as a string in ISO 8601 format. For example, "2021-01-01T00:00:00"', ): str, schema.Literal( "end_datetime", description='Datetime represented as a string in ISO 8601 format. For example, "2021-01-02T00:00:00"', ): str, } ), } ) def get_datetime_diff(self, start_datetime: str, end_datetime: str) -> BaseArtifact: return TextArtifact(str(datetime.fromisoformat(end_datetime) - datetime.fromisoformat(start_datetime)))
get_relative_datetime(params)
Source Code in griptape/tools/date_time/tool.py
@activity( config={ "description": "Can be used to return a relative date and time.", "schema": schema.Schema( { schema.Literal( "relative_date_string", description='Relative date in English compatible with the dateparser library. For example, "now EST", "20 minutes ago", ' '"in 2 days", "3 months, 1 week and 1 day ago", or "yesterday at 2pm"', ): str, }, ), }, ) def get_relative_datetime(self, params: dict) -> BaseArtifact: from dateparser import parse try: date_string = params["values"]["relative_date_string"] relative_datetime = parse(date_string) if relative_datetime: return TextArtifact(str(relative_datetime)) return ErrorArtifact("invalid date string") except Exception as e: return ErrorArtifact(f"error getting current datetime: {e}")
EmailTool
Bases:
BaseTool
Attributes
Name | Type | Description |
---|---|---|
username | Optional[str] | Username/email address used to send email via the SMTP protocol and retrieve email via the IMAP protocol. Example: bender@futurama.com. |
password | Optional[str] | Password used to send email via the SMTP protocol and retrieve email via the IMAP protocol. If using gmail, this would be an App Password. |
email_max_retrieve_count | Optional[int] | Used to limit the number of messages retrieved during any given activities. |
smtp_host | Optional[str] | Hostname or url of the SMTP server. Example: smtp.gmail.com. Required when using the send activity. |
smtp_port | Optional[int] | Port of the SMTP server. Example: 465. Required when using the send activity. |
smtp_use_ssl | bool | Whether to use SSL when sending email via the SMTP protocol. |
smtp_user | Optional[str] | Username/email address used to send email via the SMTP protocol. Overrides username for SMTP only. Required when using the send activity. |
smtp_password | Optional[str] | Password to send email via the SMTP protocol. Overrides password for SMTP only. Required when using the send activity. |
imap_url | Optional[str] | Hostname or url of the IMAP server. Example: imap.gmail.com. Required when using the retrieve activity. |
imap_user | Optional[str] | Username/email address used to retrieve email via the IMAP protocol. Overrides username for IMAP only. Required when using the retrieve activity. |
imap_password | Optional[str] | Password to retrieve email via the IMAP protocol. Overrides password for IMAP only. Required when using the retrieve activity. |
mailboxes | Optional[dict[str, Optional[str]]] | Descriptions of mailboxes available for retrieving email via the IMAP protocol. Required when using the retrieve activity. Example: {'INBOX': 'default mailbox for incoming email', 'SENT': 'default mailbox for sent email'} |
email_loader | EmailLoader | Instance of EmailLoader . |
Source Code in griptape/tools/email/tool.py
@define class EmailTool(BaseTool): """Tool for working with email. Attributes: username: Username/email address used to send email via the SMTP protocol and retrieve email via the IMAP protocol. Example: bender@futurama.com. password: Password used to send email via the SMTP protocol and retrieve email via the IMAP protocol. If using gmail, this would be an App Password. email_max_retrieve_count: Used to limit the number of messages retrieved during any given activities. smtp_host: Hostname or url of the SMTP server. Example: smtp.gmail.com. Required when using the `send` activity. smtp_port: Port of the SMTP server. Example: 465. Required when using the `send` activity. smtp_use_ssl: Whether to use SSL when sending email via the SMTP protocol. smtp_user: Username/email address used to send email via the SMTP protocol. Overrides username for SMTP only. Required when using the `send` activity. smtp_password: Password to send email via the SMTP protocol. Overrides password for SMTP only. Required when using the `send` activity. imap_url: Hostname or url of the IMAP server. Example: imap.gmail.com. Required when using the `retrieve` activity. imap_user: Username/email address used to retrieve email via the IMAP protocol. Overrides username for IMAP only. Required when using the `retrieve` activity. imap_password: Password to retrieve email via the IMAP protocol. Overrides password for IMAP only. Required when using the `retrieve` activity. mailboxes: Descriptions of mailboxes available for retrieving email via the IMAP protocol. Required when using the `retrieve` activity. Example: {'INBOX': 'default mailbox for incoming email', 'SENT': 'default mailbox for sent email'} email_loader: Instance of `EmailLoader`. """ username: Optional[str] = field(default=None, kw_only=True) password: Optional[str] = field(default=None, kw_only=True) email_max_retrieve_count: Optional[int] = field(default=None, kw_only=True) smtp_host: Optional[str] = field(default=None, kw_only=True) smtp_port: Optional[int] = field(default=None, kw_only=True) smtp_use_ssl: bool = field(default=True, kw_only=True) smtp_user: Optional[str] = field(default=Factory(lambda self: self.username, takes_self=True), kw_only=True) smtp_password: Optional[str] = field(default=Factory(lambda self: self.password, takes_self=True), kw_only=True) imap_url: Optional[str] = field(default=None, kw_only=True) imap_user: Optional[str] = field(default=Factory(lambda self: self.username, takes_self=True), kw_only=True) imap_password: Optional[str] = field(default=Factory(lambda self: self.password, takes_self=True), kw_only=True) mailboxes: Optional[dict[str, Optional[str]]] = field(default=None, kw_only=True) email_loader: EmailLoader = field( default=Factory( lambda self: EmailLoader(imap_url=self.imap_url, username=self.imap_user, password=self.imap_password), takes_self=True, ), kw_only=True, ) @activity( config={ "description": "Can be used to retrieve emails." "{% if _self.mailboxes %} Available mailboxes: {{ _self.mailboxes }}{% endif %}", "schema": Schema( { Literal("label", description="Label to retrieve emails from such as 'INBOX' or 'SENT'"): str, schema.Optional( Literal("key", description="Optional key for filtering such as 'FROM' or 'SUBJECT'"), ): str, schema.Optional( Literal("search_criteria", description="Optional search criteria to filter emails by key"), ): str, schema.Optional(Literal("max_count", description="Optional max email count")): int, }, ), }, ) def retrieve(self, params: dict) -> ListArtifact | ErrorArtifact: if self.mailboxes is None: return ErrorArtifact("mailboxes is required") values = params["values"] max_count = int(values["max_count"]) if values.get("max_count") is not None else self.email_max_retrieve_count return self.email_loader.load( EmailLoader.EmailQuery( label=values["label"], key=values.get("key"), search_criteria=values.get("search_criteria"), max_count=max_count, ), ) @activity( config={ "description": "Can be used to send emails", "schema": Schema( { Literal("to", description="Recipient's email address"): str, Literal("subject", description="Email subject"): str, Literal("body", description="Email body"): str, }, ), }, ) def send(self, params: dict) -> InfoArtifact | ErrorArtifact: values = params["values"] if self.smtp_user is None: return ErrorArtifact("smtp_user is required") if self.smtp_password is None: return ErrorArtifact("smtp_password is required") if self.smtp_host is None: return ErrorArtifact("smtp_host is required") if self.smtp_port is None: return ErrorArtifact("smtp_port is required") msg = MIMEText(values["body"]) msg["Subject"] = values["subject"] msg["From"] = self.smtp_user msg["To"] = values["to"] try: with self._create_smtp_client(self.smtp_host, self.smtp_port) as client: client.login(self.smtp_user, self.smtp_password) client.sendmail(msg["From"], [msg["To"]], msg.as_string()) return InfoArtifact("email was successfully sent") except Exception as e: logging.exception(e) return ErrorArtifact(f"error sending email: {e}") def _create_smtp_client(self, smtp_host: str, smtp_port: int) -> smtplib.SMTP | smtplib.SMTP_SSL: if self.smtp_use_ssl: return smtplib.SMTP_SSL(smtp_host, smtp_port) return smtplib.SMTP(smtp_host, smtp_port)
email_loader = field(default=Factory(lambda self: EmailLoader(imap_url=self.imap_url, username=self.imap_user, password=self.imap_password), takes_self=True), kw_only=True)
class-attribute instance-attributeemail_max_retrieve_count = field(default=None, kw_only=True)
class-attribute instance-attributeimap_password = field(default=Factory(lambda self: self.password, takes_self=True), kw_only=True)
class-attribute instance-attributeimap_url = field(default=None, kw_only=True)
class-attribute instance-attributeimap_user = field(default=Factory(lambda self: self.username, takes_self=True), kw_only=True)
class-attribute instance-attributemailboxes = field(default=None, kw_only=True)
class-attribute instance-attributepassword = field(default=None, kw_only=True)
class-attribute instance-attributesmtp_host = field(default=None, kw_only=True)
class-attribute instance-attributesmtp_password = field(default=Factory(lambda self: self.password, takes_self=True), kw_only=True)
class-attribute instance-attributesmtp_port = field(default=None, kw_only=True)
class-attribute instance-attributesmtp_use_ssl = field(default=True, kw_only=True)
class-attribute instance-attributesmtp_user = field(default=Factory(lambda self: self.username, takes_self=True), kw_only=True)
class-attribute instance-attributeusername = field(default=None, kw_only=True)
class-attribute instance-attribute
_create_smtp_client(smtp_host, smtp_port)
Source Code in griptape/tools/email/tool.py
def _create_smtp_client(self, smtp_host: str, smtp_port: int) -> smtplib.SMTP | smtplib.SMTP_SSL: if self.smtp_use_ssl: return smtplib.SMTP_SSL(smtp_host, smtp_port) return smtplib.SMTP(smtp_host, smtp_port)
retrieve(params)
Source Code in griptape/tools/email/tool.py
@activity( config={ "description": "Can be used to retrieve emails." "{% if _self.mailboxes %} Available mailboxes: {{ _self.mailboxes }}{% endif %}", "schema": Schema( { Literal("label", description="Label to retrieve emails from such as 'INBOX' or 'SENT'"): str, schema.Optional( Literal("key", description="Optional key for filtering such as 'FROM' or 'SUBJECT'"), ): str, schema.Optional( Literal("search_criteria", description="Optional search criteria to filter emails by key"), ): str, schema.Optional(Literal("max_count", description="Optional max email count")): int, }, ), }, ) def retrieve(self, params: dict) -> ListArtifact | ErrorArtifact: if self.mailboxes is None: return ErrorArtifact("mailboxes is required") values = params["values"] max_count = int(values["max_count"]) if values.get("max_count") is not None else self.email_max_retrieve_count return self.email_loader.load( EmailLoader.EmailQuery( label=values["label"], key=values.get("key"), search_criteria=values.get("search_criteria"), max_count=max_count, ), )
send(params)
Source Code in griptape/tools/email/tool.py
@activity( config={ "description": "Can be used to send emails", "schema": Schema( { Literal("to", description="Recipient's email address"): str, Literal("subject", description="Email subject"): str, Literal("body", description="Email body"): str, }, ), }, ) def send(self, params: dict) -> InfoArtifact | ErrorArtifact: values = params["values"] if self.smtp_user is None: return ErrorArtifact("smtp_user is required") if self.smtp_password is None: return ErrorArtifact("smtp_password is required") if self.smtp_host is None: return ErrorArtifact("smtp_host is required") if self.smtp_port is None: return ErrorArtifact("smtp_port is required") msg = MIMEText(values["body"]) msg["Subject"] = values["subject"] msg["From"] = self.smtp_user msg["To"] = values["to"] try: with self._create_smtp_client(self.smtp_host, self.smtp_port) as client: client.login(self.smtp_user, self.smtp_password) client.sendmail(msg["From"], [msg["To"]], msg.as_string()) return InfoArtifact("email was successfully sent") except Exception as e: logging.exception(e) return ErrorArtifact(f"error sending email: {e}")
ExtractionTool
Attributes
Name | Type | Description |
---|---|---|
extraction_engine | BaseExtractionEngine | ExtractionEngine . |
Source Code in griptape/tools/extraction/tool.py
@define(kw_only=True) class ExtractionTool(BaseTool, RuleMixin): """Tool for using an Extraction Engine. Attributes: extraction_engine: `ExtractionEngine`. """ extraction_engine: BaseExtractionEngine = field() @activity( config={ "description": "Can be used extract structured text from data.", "schema": Schema( { Literal("data"): Or( str, Schema( { "memory_name": str, "artifact_namespace": str, } ), ), } ), }, ) def extract(self, params: dict) -> ListArtifact | InfoArtifact | ErrorArtifact: data = params["values"]["data"] if isinstance(data, str): artifacts = ListArtifact([TextArtifact(data)]) else: memory = self.find_input_memory(data["memory_name"]) artifact_namespace = data["artifact_namespace"] if memory is not None: artifacts = memory.load_artifacts(artifact_namespace) else: return ErrorArtifact("memory not found") return self.extraction_engine.extract_artifacts(artifacts)
extraction_engine = field()
class-attribute instance-attribute
extract(params)
Source Code in griptape/tools/extraction/tool.py
@activity( config={ "description": "Can be used extract structured text from data.", "schema": Schema( { Literal("data"): Or( str, Schema( { "memory_name": str, "artifact_namespace": str, } ), ), } ), }, ) def extract(self, params: dict) -> ListArtifact | InfoArtifact | ErrorArtifact: data = params["values"]["data"] if isinstance(data, str): artifacts = ListArtifact([TextArtifact(data)]) else: memory = self.find_input_memory(data["memory_name"]) artifact_namespace = data["artifact_namespace"] if memory is not None: artifacts = memory.load_artifacts(artifact_namespace) else: return ErrorArtifact("memory not found") return self.extraction_engine.extract_artifacts(artifacts)
FileManagerTool
Bases:
BaseTool
Attributes
Name | Type | Description |
---|---|---|
file_manager_driver | BaseFileManagerDriver | File Manager Driver to use to list, load, and save files. |
Source Code in griptape/tools/file_manager/tool.py
@define class FileManagerTool(BaseTool): """FileManagerTool is a tool that can be used to list, load, and save files. Attributes: file_manager_driver: File Manager Driver to use to list, load, and save files. """ file_manager_driver: BaseFileManagerDriver = field(default=Factory(lambda: LocalFileManagerDriver()), kw_only=True) loaders: dict[str, loaders.BaseLoader] = field( default=Factory( lambda self: { "application/pdf": loaders.PdfLoader(file_manager_driver=self.file_manager_driver), "text/csv": loaders.CsvLoader(file_manager_driver=self.file_manager_driver), "text": loaders.TextLoader(file_manager_driver=self.file_manager_driver), "image": loaders.ImageLoader(file_manager_driver=self.file_manager_driver), "application/octet-stream": BlobLoader(file_manager_driver=self.file_manager_driver), }, takes_self=True, ), kw_only=True, ) @activity( config={ "description": "Can be used to list files on disk", "schema": Schema( {Literal("path", description="Relative path in the POSIX format. For example, 'foo/bar'"): str}, ), }, ) def list_files_from_disk(self, params: dict) -> TextArtifact | ErrorArtifact: path = params["values"]["path"] return self.file_manager_driver.list_files(path) @activity( config={ "description": "Can be used to load files from disk", "schema": Schema( { Literal( "paths", description="Relative paths to files to be loaded in the POSIX format. For example, ['foo/bar/file.txt']", ): Schema([str]), }, ), }, ) def load_files_from_disk(self, params: dict) -> ListArtifact | ErrorArtifact: paths = params["values"]["paths"] artifacts = [] for path in paths: # Fetch the file to try and determine the appropriate loader abs_path = os.path.join(self.file_manager_driver.workdir, path) mime_type = get_mime_type(abs_path) loader = next((loader for key, loader in self.loaders.items() if mime_type.startswith(key))) artifact = loader.load(path) if isinstance(artifact, ListArtifact): artifacts.extend(artifact.value) else: artifacts.append(artifact) return ListArtifact(artifacts) @activity( config={ "description": "Can be used to save memory artifacts to disk", "schema": Schema( { Literal( "dir_name", description="Relative destination path name on disk in the POSIX format. For example, 'foo/bar'", ): str, Literal("file_name", description="Destination file name. For example, 'baz.txt'"): str, "memory_name": str, "artifact_namespace": str, }, ), }, ) def save_memory_artifacts_to_disk(self, params: dict) -> ErrorArtifact | InfoArtifact: dir_name = params["values"]["dir_name"] file_name = params["values"]["file_name"] memory_name = params["values"]["memory_name"] artifact_namespace = params["values"]["artifact_namespace"] memory = self.find_input_memory(params["values"]["memory_name"]) if not memory: return ErrorArtifact(f"Failed to save memory artifacts to disk - memory named '{memory_name}' not found") list_artifact = memory.load_artifacts(artifact_namespace) if len(list_artifact) == 0: return ErrorArtifact( f"Failed to save memory artifacts to disk - memory named '{memory_name}' does not contain any artifacts", ) for artifact in list_artifact.value: formatted_file_name = f"{artifact.name}-{file_name}" if len(list_artifact) > 1 else file_name try: value = artifact.value if isinstance(artifact.value, (str, bytes)) else artifact.to_text() self.file_manager_driver.save_file(os.path.join(dir_name, formatted_file_name), value) except FileNotFoundError: return ErrorArtifact("Path not found") except IsADirectoryError: return ErrorArtifact("Path is a directory") except NotADirectoryError: return ErrorArtifact("Not a directory") except Exception as e: return ErrorArtifact(f"Failed to load file: {e!s}") return InfoArtifact("Successfully saved memory artifacts to disk") @activity( config={ "description": "Can be used to save content to a file", "schema": Schema( { Literal( "path", description="Destination file path on disk in the POSIX format. For example, 'foo/bar/baz.txt'", ): str, "content": str, }, ), }, ) def save_content_to_file(self, params: dict) -> ErrorArtifact | InfoArtifact: path = params["values"]["path"] content = params["values"]["content"] return self.file_manager_driver.save_file(path, content)
file_manager_driver = field(default=Factory(lambda: LocalFileManagerDriver()), kw_only=True)
class-attribute instance-attributeloaders = field(default=Factory(lambda self: {'application/pdf': loaders.PdfLoader(file_manager_driver=self.file_manager_driver), 'text/csv': loaders.CsvLoader(file_manager_driver=self.file_manager_driver), 'text': loaders.TextLoader(file_manager_driver=self.file_manager_driver), 'image': loaders.ImageLoader(file_manager_driver=self.file_manager_driver), 'application/octet-stream': BlobLoader(file_manager_driver=self.file_manager_driver)}, takes_self=True), kw_only=True)
class-attribute instance-attribute
list_files_from_disk(params)
Source Code in griptape/tools/file_manager/tool.py
@activity( config={ "description": "Can be used to list files on disk", "schema": Schema( {Literal("path", description="Relative path in the POSIX format. For example, 'foo/bar'"): str}, ), }, ) def list_files_from_disk(self, params: dict) -> TextArtifact | ErrorArtifact: path = params["values"]["path"] return self.file_manager_driver.list_files(path)
load_files_from_disk(params)
Source Code in griptape/tools/file_manager/tool.py
@activity( config={ "description": "Can be used to load files from disk", "schema": Schema( { Literal( "paths", description="Relative paths to files to be loaded in the POSIX format. For example, ['foo/bar/file.txt']", ): Schema([str]), }, ), }, ) def load_files_from_disk(self, params: dict) -> ListArtifact | ErrorArtifact: paths = params["values"]["paths"] artifacts = [] for path in paths: # Fetch the file to try and determine the appropriate loader abs_path = os.path.join(self.file_manager_driver.workdir, path) mime_type = get_mime_type(abs_path) loader = next((loader for key, loader in self.loaders.items() if mime_type.startswith(key))) artifact = loader.load(path) if isinstance(artifact, ListArtifact): artifacts.extend(artifact.value) else: artifacts.append(artifact) return ListArtifact(artifacts)
save_content_to_file(params)
Source Code in griptape/tools/file_manager/tool.py
@activity( config={ "description": "Can be used to save content to a file", "schema": Schema( { Literal( "path", description="Destination file path on disk in the POSIX format. For example, 'foo/bar/baz.txt'", ): str, "content": str, }, ), }, ) def save_content_to_file(self, params: dict) -> ErrorArtifact | InfoArtifact: path = params["values"]["path"] content = params["values"]["content"] return self.file_manager_driver.save_file(path, content)
save_memory_artifacts_to_disk(params)
Source Code in griptape/tools/file_manager/tool.py
@activity( config={ "description": "Can be used to save memory artifacts to disk", "schema": Schema( { Literal( "dir_name", description="Relative destination path name on disk in the POSIX format. For example, 'foo/bar'", ): str, Literal("file_name", description="Destination file name. For example, 'baz.txt'"): str, "memory_name": str, "artifact_namespace": str, }, ), }, ) def save_memory_artifacts_to_disk(self, params: dict) -> ErrorArtifact | InfoArtifact: dir_name = params["values"]["dir_name"] file_name = params["values"]["file_name"] memory_name = params["values"]["memory_name"] artifact_namespace = params["values"]["artifact_namespace"] memory = self.find_input_memory(params["values"]["memory_name"]) if not memory: return ErrorArtifact(f"Failed to save memory artifacts to disk - memory named '{memory_name}' not found") list_artifact = memory.load_artifacts(artifact_namespace) if len(list_artifact) == 0: return ErrorArtifact( f"Failed to save memory artifacts to disk - memory named '{memory_name}' does not contain any artifacts", ) for artifact in list_artifact.value: formatted_file_name = f"{artifact.name}-{file_name}" if len(list_artifact) > 1 else file_name try: value = artifact.value if isinstance(artifact.value, (str, bytes)) else artifact.to_text() self.file_manager_driver.save_file(os.path.join(dir_name, formatted_file_name), value) except FileNotFoundError: return ErrorArtifact("Path not found") except IsADirectoryError: return ErrorArtifact("Path is a directory") except NotADirectoryError: return ErrorArtifact("Not a directory") except Exception as e: return ErrorArtifact(f"Failed to load file: {e!s}") return InfoArtifact("Successfully saved memory artifacts to disk")
GriptapeCloudToolTool
Bases:
BaseGriptapeCloudTool
Attributes
Name | Type | Description |
---|---|---|
tool_id | str | The ID of the tool to run. |
Source Code in griptape/tools/griptape_cloud_tool/tool.py
@define() class GriptapeCloudToolTool(BaseGriptapeCloudTool): """Runs a Gen AI Builder hosted Tool. Attributes: tool_id: The ID of the tool to run. """ tool_id: str = field(kw_only=True) def __attrs_post_init__(self) -> None: self._init_activities() def _init_activities(self) -> None: schema = self._get_schema() tool_name, activity_schemas = self._parse_schema(schema) if self.name == self.__class__.__name__: self.name = tool_name for activity_name, (description, activity_schema) in activity_schemas.items(): activity_handler = self._create_activity_handler(activity_name, description, activity_schema) setattr(self, activity_name, MethodType(activity_handler, self)) def _get_schema(self) -> dict: response = requests.get(urljoin(self.base_url, f"/api/tools/{self.tool_id}/openapi"), headers=self.headers) response.raise_for_status() schema = response.json() if not isinstance(schema, dict): raise RuntimeError(f"Invalid schema for tool {self.tool_id}: {schema}") if "error" in schema and "tool_run_id" in schema: raise RuntimeError(f"Failed to retrieve schema for tool {self.tool_id}: {schema['error']}") return schema def _parse_schema(self, schema: dict) -> tuple[str, dict[str, tuple[str, Schema]]]: """Parses an openapi schema into a dictionary of activity names and their respective descriptions + schemas.""" activities = {} name = schema.get("info", {}).get("title") for path, path_info in schema.get("paths", {}).items(): if not path.startswith("/activities"): continue for method, method_info in path_info.items(): if "post" in method.lower(): activity_name = method_info["operationId"] description = method_info.get("description", "") activity_schema = self.__extract_schema_from_ref( schema, method_info.get("requestBody", {}) .get("content", {}) .get("application/json", {}) .get("schema", {}), ) activities[activity_name] = (description, activity_schema) return name, activities def __extract_schema_from_ref(self, schema: dict, schema_ref: dict) -> Schema: """Extracts a schema from a $ref if present, resolving it into native schema properties.""" if "$ref" in schema_ref: # Resolve the reference and retrieve the schema data ref_path = schema_ref["$ref"].split("/")[-1] schema_data = schema["components"]["schemas"].get(ref_path, {}) else: # Use the provided schema directly if no $ref is found schema_data = schema_ref # Convert the schema_data dictionary into a Schema with its properties properties = {} for prop, prop_info in schema_data.get("properties", {}).items(): prop_type = prop_info.get("type", "string") prop_description = prop_info.get("description", "") schema_prop = Literal(prop, description=prop_description) is_optional = prop not in schema_data.get("required", []) if is_optional: schema_prop = SchemaOptional(schema_prop) properties[schema_prop] = self._map_openapi_type_to_python(prop_type, prop_info) return Schema(properties) def _map_openapi_type_to_python( self, openapi_type: str, schema_info: Optional[dict] = None ) -> type | list[type] | Or: """Maps OpenAPI types to native Python types.""" type_mapping = {"string": str, "integer": int, "boolean": bool, "number": float, "object": dict} if openapi_type == "array" and schema_info is not None and "items" in schema_info: enum = schema_info["items"].get("enum") if enum: return enum items_type = schema_info["items"].get("type", "string") return [self._map_openapi_type_to_python(items_type)] # pyright: ignore[reportReturnType] if schema_info is not None and schema_info.get("enum"): return Or(*schema_info["enum"]) return type_mapping.get(openapi_type, str) def _create_activity_handler(self, activity_name: str, description: str, activity_schema: Schema) -> Callable: """Creates an activity handler method for the tool.""" @activity(config={"name": activity_name, "description": description, "schema": activity_schema}) def activity_handler(self: GriptapeCloudToolTool, values: dict) -> Any: return self._run_activity(activity_name, values) return activity_handler def _run_activity(self, activity_name: str, params: dict) -> BaseArtifact: """Runs an activity on the tool with the provided parameters.""" url = urljoin(self.base_url, f"/api/tools/{self.tool_id}/activities/{activity_name}") response = requests.post(url, json=params, headers=self.headers) response.raise_for_status() try: return BaseArtifact.from_dict(response.json()) except ValueError: return TextArtifact(response.text)
tool_id = field(kw_only=True)
class-attribute instance-attribute
attrs_post_init()
Source Code in griptape/tools/griptape_cloud_tool/tool.py
def __attrs_post_init__(self) -> None: self._init_activities()
__extract_schema_from_ref(schema, schema_ref)
Source Code in griptape/tools/griptape_cloud_tool/tool.py
def __extract_schema_from_ref(self, schema: dict, schema_ref: dict) -> Schema: """Extracts a schema from a $ref if present, resolving it into native schema properties.""" if "$ref" in schema_ref: # Resolve the reference and retrieve the schema data ref_path = schema_ref["$ref"].split("/")[-1] schema_data = schema["components"]["schemas"].get(ref_path, {}) else: # Use the provided schema directly if no $ref is found schema_data = schema_ref # Convert the schema_data dictionary into a Schema with its properties properties = {} for prop, prop_info in schema_data.get("properties", {}).items(): prop_type = prop_info.get("type", "string") prop_description = prop_info.get("description", "") schema_prop = Literal(prop, description=prop_description) is_optional = prop not in schema_data.get("required", []) if is_optional: schema_prop = SchemaOptional(schema_prop) properties[schema_prop] = self._map_openapi_type_to_python(prop_type, prop_info) return Schema(properties)
_create_activity_handler(activity_name, description, activity_schema)
Source Code in griptape/tools/griptape_cloud_tool/tool.py
def _create_activity_handler(self, activity_name: str, description: str, activity_schema: Schema) -> Callable: """Creates an activity handler method for the tool.""" @activity(config={"name": activity_name, "description": description, "schema": activity_schema}) def activity_handler(self: GriptapeCloudToolTool, values: dict) -> Any: return self._run_activity(activity_name, values) return activity_handler
_get_schema()
Source Code in griptape/tools/griptape_cloud_tool/tool.py
def _get_schema(self) -> dict: response = requests.get(urljoin(self.base_url, f"/api/tools/{self.tool_id}/openapi"), headers=self.headers) response.raise_for_status() schema = response.json() if not isinstance(schema, dict): raise RuntimeError(f"Invalid schema for tool {self.tool_id}: {schema}") if "error" in schema and "tool_run_id" in schema: raise RuntimeError(f"Failed to retrieve schema for tool {self.tool_id}: {schema['error']}") return schema
_init_activities()
Source Code in griptape/tools/griptape_cloud_tool/tool.py
def _init_activities(self) -> None: schema = self._get_schema() tool_name, activity_schemas = self._parse_schema(schema) if self.name == self.__class__.__name__: self.name = tool_name for activity_name, (description, activity_schema) in activity_schemas.items(): activity_handler = self._create_activity_handler(activity_name, description, activity_schema) setattr(self, activity_name, MethodType(activity_handler, self))
_map_openapi_type_to_python(openapi_type, schema_info=None)
Source Code in griptape/tools/griptape_cloud_tool/tool.py
def _map_openapi_type_to_python( self, openapi_type: str, schema_info: Optional[dict] = None ) -> type | list[type] | Or: """Maps OpenAPI types to native Python types.""" type_mapping = {"string": str, "integer": int, "boolean": bool, "number": float, "object": dict} if openapi_type == "array" and schema_info is not None and "items" in schema_info: enum = schema_info["items"].get("enum") if enum: return enum items_type = schema_info["items"].get("type", "string") return [self._map_openapi_type_to_python(items_type)] # pyright: ignore[reportReturnType] if schema_info is not None and schema_info.get("enum"): return Or(*schema_info["enum"]) return type_mapping.get(openapi_type, str)
_parse_schema(schema)
Source Code in griptape/tools/griptape_cloud_tool/tool.py
def _parse_schema(self, schema: dict) -> tuple[str, dict[str, tuple[str, Schema]]]: """Parses an openapi schema into a dictionary of activity names and their respective descriptions + schemas.""" activities = {} name = schema.get("info", {}).get("title") for path, path_info in schema.get("paths", {}).items(): if not path.startswith("/activities"): continue for method, method_info in path_info.items(): if "post" in method.lower(): activity_name = method_info["operationId"] description = method_info.get("description", "") activity_schema = self.__extract_schema_from_ref( schema, method_info.get("requestBody", {}) .get("content", {}) .get("application/json", {}) .get("schema", {}), ) activities[activity_name] = (description, activity_schema) return name, activities
_run_activity(activity_name, params)
Source Code in griptape/tools/griptape_cloud_tool/tool.py
def _run_activity(self, activity_name: str, params: dict) -> BaseArtifact: """Runs an activity on the tool with the provided parameters.""" url = urljoin(self.base_url, f"/api/tools/{self.tool_id}/activities/{activity_name}") response = requests.post(url, json=params, headers=self.headers) response.raise_for_status() try: return BaseArtifact.from_dict(response.json()) except ValueError: return TextArtifact(response.text)
ImageQueryTool
Bases:
BaseTool
Source Code in griptape/tools/image_query/tool.py
@define class ImageQueryTool(BaseTool): prompt_driver: BasePromptDriver = field(kw_only=True) image_loader: ImageLoader = field(default=Factory(lambda: ImageLoader()), kw_only=True) @activity( config={ "description": "This tool can be used to query the contents of images on disk.", "schema": Schema( { Literal( "query", description="A detailed question to be answered using the contents of the provided images.", ): str, Literal("image_paths", description="The paths to an image files on disk."): Schema([str]), }, ), }, ) def query_image_from_disk(self, params: dict) -> TextArtifact | ErrorArtifact: query = params["values"]["query"] image_paths = params["values"]["image_paths"] image_artifacts = [] for image_path in image_paths: image_artifacts.append(self.image_loader.load(image_path)) return cast( "TextArtifact", self.prompt_driver.run( PromptStack.from_artifact( ListArtifact([TextArtifact(query), *image_artifacts]), ) ).to_artifact(), ) @activity( config={ "description": "This tool can be used to query the contents of images in memory.", "schema": Schema( { Literal( "query", description="A detailed question to be answered using the contents of the provided images.", ): str, Literal("image_artifacts", description="Image artifact memory references."): [ {"image_artifact_namespace": str, "image_artifact_name": str}, ], "memory_name": str, }, ), }, ) def query_images_from_memory(self, params: dict[str, Any]) -> TextArtifact | ErrorArtifact: query = params["values"]["query"] image_artifact_references = params["values"]["image_artifacts"] memory = self.find_input_memory(params["values"]["memory_name"]) if memory is None: return ErrorArtifact("memory not found") image_artifacts = [] for image_artifact_reference in image_artifact_references: try: image_artifact = load_artifact_from_memory( memory, image_artifact_reference["image_artifact_namespace"], image_artifact_reference["image_artifact_name"], ImageArtifact, ) image_artifacts.append(cast("ImageArtifact", image_artifact)) except ValueError: # If we're unable to parse the artifact as an ImageArtifact, attempt to # parse a BlobArtifact and load it as an ImageArtifact. blob_artifact = load_artifact_from_memory( memory, image_artifact_reference["image_artifact_namespace"], image_artifact_reference["image_artifact_name"], BlobArtifact, ) image_artifacts.append(self.image_loader.load(blob_artifact.value)) except Exception as e: return ErrorArtifact(str(e)) return cast( "TextArtifact", self.prompt_driver.run( PromptStack.from_artifact( ListArtifact([TextArtifact(query), *image_artifacts]), ) ).to_artifact(), )
image_loader = field(default=Factory(lambda: ImageLoader()), kw_only=True)
class-attribute instance-attributeprompt_driver = field(kw_only=True)
class-attribute instance-attribute
query_image_from_disk(params)
Source Code in griptape/tools/image_query/tool.py
@activity( config={ "description": "This tool can be used to query the contents of images on disk.", "schema": Schema( { Literal( "query", description="A detailed question to be answered using the contents of the provided images.", ): str, Literal("image_paths", description="The paths to an image files on disk."): Schema([str]), }, ), }, ) def query_image_from_disk(self, params: dict) -> TextArtifact | ErrorArtifact: query = params["values"]["query"] image_paths = params["values"]["image_paths"] image_artifacts = [] for image_path in image_paths: image_artifacts.append(self.image_loader.load(image_path)) return cast( "TextArtifact", self.prompt_driver.run( PromptStack.from_artifact( ListArtifact([TextArtifact(query), *image_artifacts]), ) ).to_artifact(), )
query_images_from_memory(params)
Source Code in griptape/tools/image_query/tool.py
@activity( config={ "description": "This tool can be used to query the contents of images in memory.", "schema": Schema( { Literal( "query", description="A detailed question to be answered using the contents of the provided images.", ): str, Literal("image_artifacts", description="Image artifact memory references."): [ {"image_artifact_namespace": str, "image_artifact_name": str}, ], "memory_name": str, }, ), }, ) def query_images_from_memory(self, params: dict[str, Any]) -> TextArtifact | ErrorArtifact: query = params["values"]["query"] image_artifact_references = params["values"]["image_artifacts"] memory = self.find_input_memory(params["values"]["memory_name"]) if memory is None: return ErrorArtifact("memory not found") image_artifacts = [] for image_artifact_reference in image_artifact_references: try: image_artifact = load_artifact_from_memory( memory, image_artifact_reference["image_artifact_namespace"], image_artifact_reference["image_artifact_name"], ImageArtifact, ) image_artifacts.append(cast("ImageArtifact", image_artifact)) except ValueError: # If we're unable to parse the artifact as an ImageArtifact, attempt to # parse a BlobArtifact and load it as an ImageArtifact. blob_artifact = load_artifact_from_memory( memory, image_artifact_reference["image_artifact_namespace"], image_artifact_reference["image_artifact_name"], BlobArtifact, ) image_artifacts.append(self.image_loader.load(blob_artifact.value)) except Exception as e: return ErrorArtifact(str(e)) return cast( "TextArtifact", self.prompt_driver.run( PromptStack.from_artifact( ListArtifact([TextArtifact(query), *image_artifacts]), ) ).to_artifact(), )
InpaintingImageGenerationTool
Bases:
BaseImageGenerationTool
Attributes
Name | Type | Description |
---|---|---|
image_generation_driver | BaseImageGenerationDriver | The image generation driver used to generate the image. |
output_dir | Optional[str] | If provided, the generated image will be written to disk in output_dir. |
output_file | Optional[str] | If provided, the generated image will be written to disk as output_file. |
Source Code in griptape/tools/inpainting_image_generation/tool.py
@define class InpaintingImageGenerationTool(BaseImageGenerationTool): """A tool that can be used to generate prompted inpaintings of an image. Attributes: image_generation_driver: The image generation driver used to generate the image. output_dir: If provided, the generated image will be written to disk in output_dir. output_file: If provided, the generated image will be written to disk as output_file. """ image_generation_driver: BaseImageGenerationDriver = field(kw_only=True) image_loader: ImageLoader = field(default=ImageLoader(), kw_only=True) @activity( config={ "description": "Modifies an image within a specified mask area using image and mask files.", "schema": Schema( { Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str, Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str, Literal( "image_file", description="The path to an image file to be used as a base to generate variations from.", ): str, Literal("mask_file", description="The path to mask image file."): str, }, ), }, ) def image_inpainting_from_file(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact: prompt = params["values"]["prompt"] negative_prompt = params["values"]["negative_prompt"] image_file = params["values"]["image_file"] mask_file = params["values"]["mask_file"] input_artifact = self.image_loader.load(image_file) mask_artifact = self.image_loader.load(mask_file) return self._generate_inpainting( prompt, negative_prompt, cast("ImageArtifact", input_artifact), cast("ImageArtifact", mask_artifact) ) @activity( config={ "description": "Modifies an image within a specified mask area using image and mask artifacts in memory.", "schema": Schema( { Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str, Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str, "memory_name": str, "image_artifact_namespace": str, "image_artifact_name": str, "mask_artifact_namespace": str, "mask_artifact_name": str, }, ), }, ) def image_inpainting_from_memory(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact: prompt = params["values"]["prompt"] negative_prompt = params["values"]["negative_prompt"] image_artifact_namespace = params["values"]["image_artifact_namespace"] image_artifact_name = params["values"]["image_artifact_name"] mask_artifact_namespace = params["values"]["mask_artifact_namespace"] mask_artifact_name = params["values"]["mask_artifact_name"] memory = self.find_input_memory(params["values"]["memory_name"]) if memory is None: return ErrorArtifact("memory not found") try: image_artifact = load_artifact_from_memory( memory, image_artifact_namespace, image_artifact_name, ImageArtifact, ) mask_artifact = load_artifact_from_memory( memory, mask_artifact_namespace, mask_artifact_name, ImageArtifact, ) except ValueError as e: return ErrorArtifact(str(e)) return self._generate_inpainting( prompt, negative_prompt, cast("ImageArtifact", image_artifact), cast("ImageArtifact", mask_artifact) ) def _generate_inpainting( self, prompt: str, negative_prompt: str, image_artifact: ImageArtifact, mask_artifact: ImageArtifact ) -> ImageArtifact: output_artifact = self.image_generation_driver.run_image_inpainting( prompts=[prompt], negative_prompts=[negative_prompt], image=image_artifact, mask=mask_artifact ) if self.output_dir or self.output_file: self._write_to_file(output_artifact) return output_artifact
image_generation_driver = field(kw_only=True)
class-attribute instance-attributeimage_loader = field(default=ImageLoader(), kw_only=True)
class-attribute instance-attribute
_generate_inpainting(prompt, negative_prompt, image_artifact, mask_artifact)
Source Code in griptape/tools/inpainting_image_generation/tool.py
def _generate_inpainting( self, prompt: str, negative_prompt: str, image_artifact: ImageArtifact, mask_artifact: ImageArtifact ) -> ImageArtifact: output_artifact = self.image_generation_driver.run_image_inpainting( prompts=[prompt], negative_prompts=[negative_prompt], image=image_artifact, mask=mask_artifact ) if self.output_dir or self.output_file: self._write_to_file(output_artifact) return output_artifact
image_inpainting_from_file(params)
Source Code in griptape/tools/inpainting_image_generation/tool.py
@activity( config={ "description": "Modifies an image within a specified mask area using image and mask files.", "schema": Schema( { Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str, Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str, Literal( "image_file", description="The path to an image file to be used as a base to generate variations from.", ): str, Literal("mask_file", description="The path to mask image file."): str, }, ), }, ) def image_inpainting_from_file(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact: prompt = params["values"]["prompt"] negative_prompt = params["values"]["negative_prompt"] image_file = params["values"]["image_file"] mask_file = params["values"]["mask_file"] input_artifact = self.image_loader.load(image_file) mask_artifact = self.image_loader.load(mask_file) return self._generate_inpainting( prompt, negative_prompt, cast("ImageArtifact", input_artifact), cast("ImageArtifact", mask_artifact) )
image_inpainting_from_memory(params)
Source Code in griptape/tools/inpainting_image_generation/tool.py
@activity( config={ "description": "Modifies an image within a specified mask area using image and mask artifacts in memory.", "schema": Schema( { Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str, Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str, "memory_name": str, "image_artifact_namespace": str, "image_artifact_name": str, "mask_artifact_namespace": str, "mask_artifact_name": str, }, ), }, ) def image_inpainting_from_memory(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact: prompt = params["values"]["prompt"] negative_prompt = params["values"]["negative_prompt"] image_artifact_namespace = params["values"]["image_artifact_namespace"] image_artifact_name = params["values"]["image_artifact_name"] mask_artifact_namespace = params["values"]["mask_artifact_namespace"] mask_artifact_name = params["values"]["mask_artifact_name"] memory = self.find_input_memory(params["values"]["memory_name"]) if memory is None: return ErrorArtifact("memory not found") try: image_artifact = load_artifact_from_memory( memory, image_artifact_namespace, image_artifact_name, ImageArtifact, ) mask_artifact = load_artifact_from_memory( memory, mask_artifact_namespace, mask_artifact_name, ImageArtifact, ) except ValueError as e: return ErrorArtifact(str(e)) return self._generate_inpainting( prompt, negative_prompt, cast("ImageArtifact", image_artifact), cast("ImageArtifact", mask_artifact) )
OutpaintingImageGenerationTool
Bases:
BaseImageGenerationTool
Attributes
Name | Type | Description |
---|---|---|
image_generation_driver | BaseImageGenerationDriver | The image generation driver used to generate the image. |
output_dir | Optional[str] | If provided, the generated image will be written to disk in output_dir. |
output_file | Optional[str] | If provided, the generated image will be written to disk as output_file. |
Source Code in griptape/tools/outpainting_image_generation/tool.py
@define class OutpaintingImageGenerationTool(BaseImageGenerationTool): """A tool that can be used to generate prompted outpaintings of an image. Attributes: image_generation_driver: The image generation driver used to generate the image. output_dir: If provided, the generated image will be written to disk in output_dir. output_file: If provided, the generated image will be written to disk as output_file. """ image_generation_driver: BaseImageGenerationDriver = field(kw_only=True) image_loader: ImageLoader = field(default=ImageLoader(), kw_only=True) @activity( config={ "description": "Modifies an image outside a specified mask area using image and mask files.", "schema": Schema( { Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str, Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str, Literal( "image_file", description="The path to an image file to be used as a base to generate variations from.", ): str, Literal("mask_file", description="The path to mask image file."): str, }, ), }, ) def image_outpainting_from_file(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact: prompt = params["values"]["prompt"] negative_prompt = params["values"]["negative_prompt"] image_file = params["values"]["image_file"] mask_file = params["values"]["mask_file"] input_artifact = self.image_loader.load(image_file) mask_artifact = self.image_loader.load(mask_file) return self._generate_outpainting(prompt, negative_prompt, input_artifact, mask_artifact) @activity( config={ "description": "Modifies an image outside a specified mask area using image and mask artifacts in memory.", "schema": Schema( { Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str, Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str, "memory_name": str, "image_artifact_namespace": str, "mask_artifact_namespace": str, }, ), }, ) def image_outpainting_from_memory(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact: prompt = params["values"]["prompt"] negative_prompt = params["values"]["negative_prompt"] image_artifact_namespace = params["values"]["image_artifact_namespace"] image_artifact_name = params["values"]["image_artifact_name"] mask_artifact_namespace = params["values"]["mask_artifact_namespace"] mask_artifact_name = params["values"]["mask_artifact_name"] memory = self.find_input_memory(params["values"]["memory_name"]) if memory is None: return ErrorArtifact("memory not found") try: image_artifact = load_artifact_from_memory( memory, image_artifact_namespace, image_artifact_name, ImageArtifact, ) mask_artifact = load_artifact_from_memory( memory, mask_artifact_namespace, mask_artifact_name, ImageArtifact, ) except ValueError as e: return ErrorArtifact(str(e)) return self._generate_outpainting( prompt, negative_prompt, cast("ImageArtifact", image_artifact), cast("ImageArtifact", mask_artifact) ) def _generate_outpainting( self, prompt: str, negative_prompt: str, image_artifact: ImageArtifact, mask_artifact: ImageArtifact ) -> ImageArtifact | ErrorArtifact: output_artifact = self.image_generation_driver.run_image_outpainting( prompts=[prompt], negative_prompts=[negative_prompt], image=image_artifact, mask=mask_artifact ) if self.output_dir or self.output_file: self._write_to_file(output_artifact) return output_artifact
image_generation_driver = field(kw_only=True)
class-attribute instance-attributeimage_loader = field(default=ImageLoader(), kw_only=True)
class-attribute instance-attribute
_generate_outpainting(prompt, negative_prompt, image_artifact, mask_artifact)
Source Code in griptape/tools/outpainting_image_generation/tool.py
def _generate_outpainting( self, prompt: str, negative_prompt: str, image_artifact: ImageArtifact, mask_artifact: ImageArtifact ) -> ImageArtifact | ErrorArtifact: output_artifact = self.image_generation_driver.run_image_outpainting( prompts=[prompt], negative_prompts=[negative_prompt], image=image_artifact, mask=mask_artifact ) if self.output_dir or self.output_file: self._write_to_file(output_artifact) return output_artifact
image_outpainting_from_file(params)
Source Code in griptape/tools/outpainting_image_generation/tool.py
@activity( config={ "description": "Modifies an image outside a specified mask area using image and mask files.", "schema": Schema( { Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str, Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str, Literal( "image_file", description="The path to an image file to be used as a base to generate variations from.", ): str, Literal("mask_file", description="The path to mask image file."): str, }, ), }, ) def image_outpainting_from_file(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact: prompt = params["values"]["prompt"] negative_prompt = params["values"]["negative_prompt"] image_file = params["values"]["image_file"] mask_file = params["values"]["mask_file"] input_artifact = self.image_loader.load(image_file) mask_artifact = self.image_loader.load(mask_file) return self._generate_outpainting(prompt, negative_prompt, input_artifact, mask_artifact)
image_outpainting_from_memory(params)
Source Code in griptape/tools/outpainting_image_generation/tool.py
@activity( config={ "description": "Modifies an image outside a specified mask area using image and mask artifacts in memory.", "schema": Schema( { Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str, Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str, "memory_name": str, "image_artifact_namespace": str, "mask_artifact_namespace": str, }, ), }, ) def image_outpainting_from_memory(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact: prompt = params["values"]["prompt"] negative_prompt = params["values"]["negative_prompt"] image_artifact_namespace = params["values"]["image_artifact_namespace"] image_artifact_name = params["values"]["image_artifact_name"] mask_artifact_namespace = params["values"]["mask_artifact_namespace"] mask_artifact_name = params["values"]["mask_artifact_name"] memory = self.find_input_memory(params["values"]["memory_name"]) if memory is None: return ErrorArtifact("memory not found") try: image_artifact = load_artifact_from_memory( memory, image_artifact_namespace, image_artifact_name, ImageArtifact, ) mask_artifact = load_artifact_from_memory( memory, mask_artifact_namespace, mask_artifact_name, ImageArtifact, ) except ValueError as e: return ErrorArtifact(str(e)) return self._generate_outpainting( prompt, negative_prompt, cast("ImageArtifact", image_artifact), cast("ImageArtifact", mask_artifact) )
PromptImageGenerationTool
Bases:
BaseImageGenerationTool
Attributes
Name | Type | Description |
---|---|---|
image_generation_driver | BaseImageGenerationDriver | The image generation driver used to generate the image. |
output_dir | Optional[str] | If provided, the generated image will be written to disk in output_dir. |
output_file | Optional[str] | If provided, the generated image will be written to disk as output_file. |
Source Code in griptape/tools/prompt_image_generation/tool.py
@define class PromptImageGenerationTool(BaseImageGenerationTool): """A tool that can be used to generate an image from a text prompt. Attributes: image_generation_driver: The image generation driver used to generate the image. output_dir: If provided, the generated image will be written to disk in output_dir. output_file: If provided, the generated image will be written to disk as output_file. """ image_generation_driver: BaseImageGenerationDriver = field(kw_only=True) @activity( config={ "description": "Generates an image from text prompts.", "schema": Schema( { Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str, Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str, } ), }, ) def generate_image(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact: prompt = params["values"]["prompt"] negative_prompt = params["values"]["negative_prompt"] output_artifact = self.image_generation_driver.run_text_to_image( prompts=[prompt], negative_prompts=[negative_prompt] ) if self.output_dir or self.output_file: self._write_to_file(output_artifact) return output_artifact
image_generation_driver = field(kw_only=True)
class-attribute instance-attribute
generate_image(params)
Source Code in griptape/tools/prompt_image_generation/tool.py
@activity( config={ "description": "Generates an image from text prompts.", "schema": Schema( { Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str, Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str, } ), }, ) def generate_image(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact: prompt = params["values"]["prompt"] negative_prompt = params["values"]["negative_prompt"] output_artifact = self.image_generation_driver.run_text_to_image( prompts=[prompt], negative_prompts=[negative_prompt] ) if self.output_dir or self.output_file: self._write_to_file(output_artifact) return output_artifact
PromptSummaryTool
Attributes
Name | Type | Description |
---|---|---|
prompt_summary_engine | PromptSummaryEngine | PromptSummaryEngine . |
Source Code in griptape/tools/prompt_summary/tool.py
@define(kw_only=True) class PromptSummaryTool(BaseTool, RuleMixin): """Tool for using a Prompt Summary Engine. Attributes: prompt_summary_engine: `PromptSummaryEngine`. """ prompt_summary_engine: PromptSummaryEngine = field(kw_only=True, default=Factory(lambda: PromptSummaryEngine())) @activity( config={ "description": "Can be used to summarize text content.", "schema": Schema( { Literal("summary"): Or( str, Schema( { "memory_name": str, "artifact_namespace": str, } ), ), } ), }, ) def summarize(self, params: dict) -> BaseArtifact: summary = params["values"]["summary"] if isinstance(summary, str): artifacts = ListArtifact([TextArtifact(summary)]) else: memory = self.find_input_memory(summary["memory_name"]) artifact_namespace = summary["artifact_namespace"] if memory is not None: artifacts = memory.load_artifacts(artifact_namespace) else: return ErrorArtifact("memory not found") return self.prompt_summary_engine.summarize_artifacts(artifacts, rulesets=self.rulesets)
prompt_summary_engine = field(kw_only=True, default=Factory(lambda: PromptSummaryEngine()))
class-attribute instance-attribute
summarize(params)
Source Code in griptape/tools/prompt_summary/tool.py
@activity( config={ "description": "Can be used to summarize text content.", "schema": Schema( { Literal("summary"): Or( str, Schema( { "memory_name": str, "artifact_namespace": str, } ), ), } ), }, ) def summarize(self, params: dict) -> BaseArtifact: summary = params["values"]["summary"] if isinstance(summary, str): artifacts = ListArtifact([TextArtifact(summary)]) else: memory = self.find_input_memory(summary["memory_name"]) artifact_namespace = summary["artifact_namespace"] if memory is not None: artifacts = memory.load_artifacts(artifact_namespace) else: return ErrorArtifact("memory not found") return self.prompt_summary_engine.summarize_artifacts(artifacts, rulesets=self.rulesets)
QueryTool
Source Code in griptape/tools/query/tool.py
@define(kw_only=True) class QueryTool(BaseTool, RuleMixin): """Tool for performing a query against data.""" prompt_driver: BasePromptDriver = field(default=Factory(lambda: Defaults.drivers_config.prompt_driver)) _rag_engine: RagEngine = field( default=Factory( lambda self: RagEngine( response_stage=ResponseRagStage( response_modules=[ PromptResponseRagModule(prompt_driver=self.prompt_driver, rulesets=self.rulesets) ], ), ), takes_self=True, ), alias="_rag_engine", ) @activity( config={ "description": "Can be used to search through textual content.", "schema": Schema( { Literal("query", description="A natural language search query"): str, Literal("content"): Or( str, Schema( { "memory_name": str, "artifact_namespace": str, } ), ), } ), }, ) def query(self, params: dict) -> ListArtifact | ErrorArtifact: query = params["values"]["query"] content = params["values"]["content"] if isinstance(content, str): text_artifacts = [TextArtifact(content)] else: memory = self.find_input_memory(content["memory_name"]) artifact_namespace = content["artifact_namespace"] if memory is not None: artifacts = memory.load_artifacts(artifact_namespace) else: return ErrorArtifact("memory not found") text_artifacts = [artifact for artifact in artifacts if isinstance(artifact, TextArtifact)] outputs = self._rag_engine.process(RagContext(query=query, text_chunks=text_artifacts)).outputs if len(outputs) > 0: return ListArtifact(outputs) return ErrorArtifact("query output is empty")
_rag_engine = field(default=Factory(lambda self: RagEngine(response_stage=ResponseRagStage(response_modules=[PromptResponseRagModule(prompt_driver=self.prompt_driver, rulesets=self.rulesets)])), takes_self=True), alias='_rag_engine')
class-attribute instance-attributeprompt_driver = field(default=Factory(lambda: Defaults.drivers_config.prompt_driver))
class-attribute instance-attribute
query(params)
Source Code in griptape/tools/query/tool.py
@activity( config={ "description": "Can be used to search through textual content.", "schema": Schema( { Literal("query", description="A natural language search query"): str, Literal("content"): Or( str, Schema( { "memory_name": str, "artifact_namespace": str, } ), ), } ), }, ) def query(self, params: dict) -> ListArtifact | ErrorArtifact: query = params["values"]["query"] content = params["values"]["content"] if isinstance(content, str): text_artifacts = [TextArtifact(content)] else: memory = self.find_input_memory(content["memory_name"]) artifact_namespace = content["artifact_namespace"] if memory is not None: artifacts = memory.load_artifacts(artifact_namespace) else: return ErrorArtifact("memory not found") text_artifacts = [artifact for artifact in artifacts if isinstance(artifact, TextArtifact)] outputs = self._rag_engine.process(RagContext(query=query, text_chunks=text_artifacts)).outputs if len(outputs) > 0: return ListArtifact(outputs) return ErrorArtifact("query output is empty")
RagTool
Bases:
BaseTool
Attributes
Name | Type | Description |
---|---|---|
description | str | LLM-friendly RAG engine description. |
rag_engine | RagEngine | RagEngine . |
Source Code in griptape/tools/rag/tool.py
@define(kw_only=True) class RagTool(BaseTool): """Tool for querying a RAG engine. Attributes: description: LLM-friendly RAG engine description. rag_engine: `RagEngine`. """ description: str = field() rag_engine: RagEngine = field() @activity( config={ "description": "{{ _self.description }}", "schema": Schema({Literal("query", description="A natural language search query"): str}), }, ) def search(self, params: dict) -> ListArtifact | ErrorArtifact: query = params["values"]["query"] try: artifacts = self.rag_engine.process_query(query).outputs outputs = [] for artifact in artifacts: if isinstance(artifact, ListArtifact): outputs.extend(artifact.value) else: outputs.append(artifact) if len(outputs) > 0: return ListArtifact(outputs) return ErrorArtifact("query output is empty") except Exception as e: return ErrorArtifact(f"error querying: {e}")
description = field()
class-attribute instance-attributerag_engine = field()
class-attribute instance-attribute
search(params)
Source Code in griptape/tools/rag/tool.py
@activity( config={ "description": "{{ _self.description }}", "schema": Schema({Literal("query", description="A natural language search query"): str}), }, ) def search(self, params: dict) -> ListArtifact | ErrorArtifact: query = params["values"]["query"] try: artifacts = self.rag_engine.process_query(query).outputs outputs = [] for artifact in artifacts: if isinstance(artifact, ListArtifact): outputs.extend(artifact.value) else: outputs.append(artifact) if len(outputs) > 0: return ListArtifact(outputs) return ErrorArtifact("query output is empty") except Exception as e: return ErrorArtifact(f"error querying: {e}")
RestApiTool
Bases:
BaseTool
Attributes
Name | Type | Description |
---|---|---|
base_url | str | The base url that will be used for the request. |
path | Optional[str] | The resource path that will be appended to base_url. |
description | str | A description of what the REST API does. |
request_body_schema | Optional[str] | A JSON schema string describing the request body. Recommended for PUT, POST, and PATCH requests. |
request_query_params_schema | Optional[str] | A JSON schema string describing the available query parameters. |
request_path_params_schema | Optional[str] | A JSON schema string describing the available path parameters. The schema must describe an array of string values. |
response_body_schema | Optional[str] | A JSON schema string describing the response body. |
request_headers | Optional[dict[str, str]] | Headers to include in the requests. |
Source Code in griptape/tools/rest_api/tool.py
@define class RestApiTool(BaseTool): """A tool for making REST API requests. Attributes: base_url: The base url that will be used for the request. path: The resource path that will be appended to base_url. description: A description of what the REST API does. request_body_schema: A JSON schema string describing the request body. Recommended for PUT, POST, and PATCH requests. request_query_params_schema: A JSON schema string describing the available query parameters. request_path_params_schema: A JSON schema string describing the available path parameters. The schema must describe an array of string values. response_body_schema: A JSON schema string describing the response body. request_headers: Headers to include in the requests. """ base_url: str = field(kw_only=True) path: Optional[str] = field(default=None, kw_only=True) description: str = field(kw_only=True) request_path_params_schema: Optional[str] = field(default=None, kw_only=True) request_query_params_schema: Optional[str] = field(default=None, kw_only=True) request_body_schema: Optional[str] = field(default=None, kw_only=True) response_body_schema: Optional[str] = field(default=None, kw_only=True) request_headers: Optional[dict[str, str]] = field(default=None, kw_only=True) @property def full_url(self) -> str: return self._build_url(self.base_url, path=self.path) @activity( config={ "description": dedent( """ This tool can be used to make a put request to the rest api url: {{ _self.full_url }} This rest api has the following description: {{ _self.description }} {% if _self.request_body_schema %}The request body must follow this JSON schema: {{ _self.request_body_schema }}{% endif %} {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %} """, ), "schema": Schema({Literal("body", description="The request body."): dict}), }, ) def put(self, params: dict) -> BaseArtifact: from requests import exceptions, put values = params["values"] base_url = self.base_url path = self.path body = values["body"] url = self._build_url(base_url, path=path) try: response = put(url, json=body, timeout=30, headers=self.request_headers) return TextArtifact(response.text) except exceptions.RequestException as err: return ErrorArtifact(str(err)) @activity( config={ "description": dedent( """ This tool can be used to make a patch request to the rest api url: {{ _self.full_url }} This rest api has the following description: {{ _self.description }} {% if _self.request_path_parameters %}The request path parameters must follow this JSON schema: {{ _self.request_path_params_schema }}{% endif %} {% if _self.request_body_schema %}The request body must follow this JSON schema: {{ _self.request_body_schema }}{% endif %} {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %} """, ), "schema": Schema( { Literal("path_params", description="The request path parameters."): Schema([str]), Literal("body", description="The request body."): dict, }, ), }, ) def patch(self, params: dict) -> BaseArtifact: from requests import exceptions, patch values = params["values"] base_url = self.base_url path = self.path body = values["body"] path_params = values["path_params"] url = self._build_url(base_url, path=path, path_params=path_params) try: response = patch(url, json=body, timeout=30, headers=self.request_headers) return TextArtifact(response.text) except exceptions.RequestException as err: return ErrorArtifact(str(err)) @activity( config={ "description": dedent( """ This tool can be used to make a post request to the rest api url: {{ _self.full_url }} This rest api has the following description: {{ _self.description }} {% if _self.request_body_schema %}The request body must follow this JSON schema: {{ _self.request_body_schema }}{% endif %} {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %} """, ), "schema": Schema({Literal("body", description="The request body."): dict}), }, ) def post(self, params: dict) -> BaseArtifact: from requests import exceptions, post values = params["values"] base_url = self.base_url path = self.path url = self._build_url(base_url, path=path) body = values["body"] try: response = post(url, json=body, timeout=30, headers=self.request_headers) return TextArtifact(response.text) except exceptions.RequestException as err: return ErrorArtifact(str(err)) @activity( config={ "description": dedent( """ This tool can be used to make a get request to the rest api url: {{ _self.full_url }} This rest api has the following description: {{ _self.description }} {% if _self.request_path_parameters %}The request path parameters must follow this JSON schema: {{ _self.request_path_params_schema }}{% endif %} {% if _self.request_query_parameters %}The request query parameters must follow this JSON schema: {{ _self.request_path_params_schema }}{% endif %} {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %} """, ), "schema": schema.Optional( Schema( { schema.Optional(Literal("query_params", description="The request query parameters.")): dict, schema.Optional(Literal("path_params", description="The request path parameters.")): Schema( [str] ), }, ), ), }, ) def get(self, params: dict) -> BaseArtifact: from requests import exceptions, get values = params["values"] base_url = self.base_url path = self.path query_params = {} path_params = [] if values: query_params = values.get("query_params", {}) path_params = values.get("path_params", []) url = self._build_url(base_url, path=path, path_params=path_params) try: response = get(url, params=query_params, timeout=30, headers=self.request_headers) return TextArtifact(response.text) except exceptions.RequestException as err: return ErrorArtifact(str(err)) @activity( config={ "description": dedent( """ This tool can be used to make a delete request to the rest api url: {{ _self.full_url }} This rest api has the following description: {{ _self.description }} {% if _self.request_path_parameters %}The request path parameters must follow this JSON schema: {{ _self.request_path_params_schema }}{% endif %} {% if _self.request_query_parameters %}The request query parameters must follow this JSON schema: {{ _self.request_path_params_schema }}{% endif %} """, ), "schema": Schema( { schema.Optional(Literal("query_params", description="The request query parameters.")): dict, schema.Optional(Literal("path_params", description="The request path parameters.")): Schema([str]), }, ), }, ) def delete(self, params: dict) -> BaseArtifact: from requests import delete, exceptions values = params["values"] base_url = self.base_url path = self.path query_params = values.get("query_params", {}) path_params = values.get("path_params", []) url = self._build_url(base_url, path=path, path_params=path_params) try: response = delete(url, params=query_params, timeout=30, headers=self.request_headers) return TextArtifact(response.text) except exceptions.RequestException as err: return ErrorArtifact(str(err)) def _build_url(self, base_url: str, path: Optional[str] = None, path_params: Optional[list] = None) -> str: url = "" if path: url += path.strip("/") if path_params: url += f"/{str.join('/', map(str, path_params))}" return urljoin(base_url.strip("/"), url)
base_url = field(kw_only=True)
class-attribute instance-attributedescription = field(kw_only=True)
class-attribute instance-attributefull_url
propertypath = field(default=None, kw_only=True)
class-attribute instance-attributerequest_body_schema = field(default=None, kw_only=True)
class-attribute instance-attributerequest_headers = field(default=None, kw_only=True)
class-attribute instance-attributerequest_path_params_schema = field(default=None, kw_only=True)
class-attribute instance-attributerequest_query_params_schema = field(default=None, kw_only=True)
class-attribute instance-attributeresponse_body_schema = field(default=None, kw_only=True)
class-attribute instance-attribute
_build_url(base_url, path=None, path_params=None)
Source Code in griptape/tools/rest_api/tool.py
def _build_url(self, base_url: str, path: Optional[str] = None, path_params: Optional[list] = None) -> str: url = "" if path: url += path.strip("/") if path_params: url += f"/{str.join('/', map(str, path_params))}" return urljoin(base_url.strip("/"), url)
delete(params)
Source Code in griptape/tools/rest_api/tool.py
@activity( config={ "description": dedent( """ This tool can be used to make a delete request to the rest api url: {{ _self.full_url }} This rest api has the following description: {{ _self.description }} {% if _self.request_path_parameters %}The request path parameters must follow this JSON schema: {{ _self.request_path_params_schema }}{% endif %} {% if _self.request_query_parameters %}The request query parameters must follow this JSON schema: {{ _self.request_path_params_schema }}{% endif %} """, ), "schema": Schema( { schema.Optional(Literal("query_params", description="The request query parameters.")): dict, schema.Optional(Literal("path_params", description="The request path parameters.")): Schema([str]), }, ), }, ) def delete(self, params: dict) -> BaseArtifact: from requests import delete, exceptions values = params["values"] base_url = self.base_url path = self.path query_params = values.get("query_params", {}) path_params = values.get("path_params", []) url = self._build_url(base_url, path=path, path_params=path_params) try: response = delete(url, params=query_params, timeout=30, headers=self.request_headers) return TextArtifact(response.text) except exceptions.RequestException as err: return ErrorArtifact(str(err))
get(params)
Source Code in griptape/tools/rest_api/tool.py
@activity( config={ "description": dedent( """ This tool can be used to make a get request to the rest api url: {{ _self.full_url }} This rest api has the following description: {{ _self.description }} {% if _self.request_path_parameters %}The request path parameters must follow this JSON schema: {{ _self.request_path_params_schema }}{% endif %} {% if _self.request_query_parameters %}The request query parameters must follow this JSON schema: {{ _self.request_path_params_schema }}{% endif %} {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %} """, ), "schema": schema.Optional( Schema( { schema.Optional(Literal("query_params", description="The request query parameters.")): dict, schema.Optional(Literal("path_params", description="The request path parameters.")): Schema( [str] ), }, ), ), }, ) def get(self, params: dict) -> BaseArtifact: from requests import exceptions, get values = params["values"] base_url = self.base_url path = self.path query_params = {} path_params = [] if values: query_params = values.get("query_params", {}) path_params = values.get("path_params", []) url = self._build_url(base_url, path=path, path_params=path_params) try: response = get(url, params=query_params, timeout=30, headers=self.request_headers) return TextArtifact(response.text) except exceptions.RequestException as err: return ErrorArtifact(str(err))
patch(params)
Source Code in griptape/tools/rest_api/tool.py
@activity( config={ "description": dedent( """ This tool can be used to make a patch request to the rest api url: {{ _self.full_url }} This rest api has the following description: {{ _self.description }} {% if _self.request_path_parameters %}The request path parameters must follow this JSON schema: {{ _self.request_path_params_schema }}{% endif %} {% if _self.request_body_schema %}The request body must follow this JSON schema: {{ _self.request_body_schema }}{% endif %} {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %} """, ), "schema": Schema( { Literal("path_params", description="The request path parameters."): Schema([str]), Literal("body", description="The request body."): dict, }, ), }, ) def patch(self, params: dict) -> BaseArtifact: from requests import exceptions, patch values = params["values"] base_url = self.base_url path = self.path body = values["body"] path_params = values["path_params"] url = self._build_url(base_url, path=path, path_params=path_params) try: response = patch(url, json=body, timeout=30, headers=self.request_headers) return TextArtifact(response.text) except exceptions.RequestException as err: return ErrorArtifact(str(err))
post(params)
Source Code in griptape/tools/rest_api/tool.py
@activity( config={ "description": dedent( """ This tool can be used to make a post request to the rest api url: {{ _self.full_url }} This rest api has the following description: {{ _self.description }} {% if _self.request_body_schema %}The request body must follow this JSON schema: {{ _self.request_body_schema }}{% endif %} {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %} """, ), "schema": Schema({Literal("body", description="The request body."): dict}), }, ) def post(self, params: dict) -> BaseArtifact: from requests import exceptions, post values = params["values"] base_url = self.base_url path = self.path url = self._build_url(base_url, path=path) body = values["body"] try: response = post(url, json=body, timeout=30, headers=self.request_headers) return TextArtifact(response.text) except exceptions.RequestException as err: return ErrorArtifact(str(err))
put(params)
Source Code in griptape/tools/rest_api/tool.py
@activity( config={ "description": dedent( """ This tool can be used to make a put request to the rest api url: {{ _self.full_url }} This rest api has the following description: {{ _self.description }} {% if _self.request_body_schema %}The request body must follow this JSON schema: {{ _self.request_body_schema }}{% endif %} {% if _self.response_body_schema %}The response body must follow this JSON schema: {{ _self.response_body_schema }}{% endif %} """, ), "schema": Schema({Literal("body", description="The request body."): dict}), }, ) def put(self, params: dict) -> BaseArtifact: from requests import exceptions, put values = params["values"] base_url = self.base_url path = self.path body = values["body"] url = self._build_url(base_url, path=path) try: response = put(url, json=body, timeout=30, headers=self.request_headers) return TextArtifact(response.text) except exceptions.RequestException as err: return ErrorArtifact(str(err))
SqlTool
Bases:
BaseTool
Source Code in griptape/tools/sql/tool.py
@define class SqlTool(BaseTool): sql_loader: SqlLoader = field(kw_only=True) schema_name: Optional[str] = field(default=None, kw_only=True) table_name: str = field(kw_only=True) table_description: Optional[str] = field(default=None, kw_only=True) engine_name: Optional[str] = field(default=None, kw_only=True) @property def full_table_name(self) -> str: return f"{self.schema_name}.{self.table_name}" if self.schema_name else self.table_name @property def table_schema(self) -> Optional[str]: return self.sql_loader.sql_driver.get_table_schema(self.full_table_name, schema=self.schema_name) @activity( config={ "description": "Can be used to execute{% if _self.engine_name %} {{ _self.engine_name }}{% endif %} SQL SELECT queries " "in table {{ _self.full_table_name }}. " "Make sure the `SELECT` statement contains enough columns to get an answer without knowing " "the original question. " "Be creative when you use `WHERE` statements: you can use wildcards, `LOWER()`, and other functions " "to get better results. " "You can use JOINs if more tables are available in other tools.\n" "{{ _self.table_name }} schema: {{ _self.table_schema }}\n" "{% if _self.table_description %}{{ _self.table_name }} description: {{ _self.table_description }}{% endif %}", "schema": Schema({"sql_query": str}), }, ) def execute_query(self, params: dict) -> ListArtifact | InfoArtifact | ErrorArtifact: try: query = params["values"]["sql_query"] rows = self.sql_loader.load(query) except Exception as e: return ErrorArtifact(f"error executing query: {e}") if len(rows) > 0: return rows return InfoArtifact("No results found")
engine_name = field(default=None, kw_only=True)
class-attribute instance-attributefull_table_name
propertyschema_name = field(default=None, kw_only=True)
class-attribute instance-attributesql_loader = field(kw_only=True)
class-attribute instance-attributetable_description = field(default=None, kw_only=True)
class-attribute instance-attributetable_name = field(kw_only=True)
class-attribute instance-attributetable_schema
property
execute_query(params)
Source Code in griptape/tools/sql/tool.py
@activity( config={ "description": "Can be used to execute{% if _self.engine_name %} {{ _self.engine_name }}{% endif %} SQL SELECT queries " "in table {{ _self.full_table_name }}. " "Make sure the `SELECT` statement contains enough columns to get an answer without knowing " "the original question. " "Be creative when you use `WHERE` statements: you can use wildcards, `LOWER()`, and other functions " "to get better results. " "You can use JOINs if more tables are available in other tools.\n" "{{ _self.table_name }} schema: {{ _self.table_schema }}\n" "{% if _self.table_description %}{{ _self.table_name }} description: {{ _self.table_description }}{% endif %}", "schema": Schema({"sql_query": str}), }, ) def execute_query(self, params: dict) -> ListArtifact | InfoArtifact | ErrorArtifact: try: query = params["values"]["sql_query"] rows = self.sql_loader.load(query) except Exception as e: return ErrorArtifact(f"error executing query: {e}") if len(rows) > 0: return rows return InfoArtifact("No results found")
StructureRunTool
Bases:
BaseTool
Attributes
Name | Type | Description |
---|---|---|
description | str | A description of what the Structure does. |
structure_run_driver | BaseStructureRunDriver | Driver to run the Structure. |
Source Code in griptape/tools/structure_run/tool.py
@define class StructureRunTool(BaseTool): """Tool for running a Structure. Attributes: description: A description of what the Structure does. structure_run_driver: Driver to run the Structure. """ description: str = field(kw_only=True, metadata={"serializable": True}) structure_run_driver: BaseStructureRunDriver = field(kw_only=True, metadata={"serializable": True}) @activity( config={ "description": "Can be used to run a Gen AI Builder Structure with the following description: {{ _self.description }}", "schema": Schema( { Literal("args", description="A list of string arguments to submit to the Structure Run"): Schema( [str] ) }, ), }, ) def run_structure(self, params: dict) -> BaseArtifact: args: list[str] = params["values"]["args"] return self.structure_run_driver.run(*[TextArtifact(arg) for arg in args])
description = field(kw_only=True, metadata={'serializable': True})
class-attribute instance-attributestructure_run_driver = field(kw_only=True, metadata={'serializable': True})
class-attribute instance-attribute
run_structure(params)
Source Code in griptape/tools/structure_run/tool.py
@activity( config={ "description": "Can be used to run a Gen AI Builder Structure with the following description: {{ _self.description }}", "schema": Schema( { Literal("args", description="A list of string arguments to submit to the Structure Run"): Schema( [str] ) }, ), }, ) def run_structure(self, params: dict) -> BaseArtifact: args: list[str] = params["values"]["args"] return self.structure_run_driver.run(*[TextArtifact(arg) for arg in args])
StructuredOutputTool
Bases:
BaseTool
Source Code in griptape/tools/structured_output/tool.py
@define class StructuredOutputTool(BaseTool): output_schema: Union[Schema, type[BaseModel]] = field(kw_only=True) @activity( config={ "description": "Used to provide the final response which ends this conversation.", "schema": lambda _self: _self.output_schema, } ) def provide_output(self, params: dict) -> BaseArtifact: return JsonArtifact(params["values"])
output_schema = field(kw_only=True)
class-attribute instance-attribute
provide_output(params)
Source Code in griptape/tools/structured_output/tool.py
@activity( config={ "description": "Used to provide the final response which ends this conversation.", "schema": lambda _self: _self.output_schema, } ) def provide_output(self, params: dict) -> BaseArtifact: return JsonArtifact(params["values"])
TextToSpeechTool
Bases:
ArtifactFileOutputMixin
, BaseTool
Attributes
Name | Type | Description |
---|---|---|
text_to_speech_driver | BaseTextToSpeechDriver | The text to audio generation driver used to generate the speech audio. |
output_dir | Optional[str] | If provided, the generated audio will be written to disk in output_dir. |
output_file | Optional[str] | If provided, the generated audio will be written to disk as output_file. |
Source Code in griptape/tools/text_to_speech/tool.py
@define class TextToSpeechTool(ArtifactFileOutputMixin, BaseTool): """A tool that can be used to generate speech from input text. Attributes: text_to_speech_driver: The text to audio generation driver used to generate the speech audio. output_dir: If provided, the generated audio will be written to disk in output_dir. output_file: If provided, the generated audio will be written to disk as output_file. """ text_to_speech_driver: BaseTextToSpeechDriver = field(kw_only=True) @activity( config={ "description": "Can be used to generate speech from the provided input text.", "schema": Schema({Literal("text", description="The literal text to be converted to speech."): str}), }, ) def text_to_speech(self, params: dict[str, Any]) -> AudioArtifact | ErrorArtifact: text = params["values"]["text"] output_artifact = self.text_to_speech_driver.run_text_to_audio(prompts=[text]) if self.output_dir or self.output_file: self._write_to_file(output_artifact) return output_artifact
text_to_speech_driver = field(kw_only=True)
class-attribute instance-attribute
text_to_speech(params)
Source Code in griptape/tools/text_to_speech/tool.py
@activity( config={ "description": "Can be used to generate speech from the provided input text.", "schema": Schema({Literal("text", description="The literal text to be converted to speech."): str}), }, ) def text_to_speech(self, params: dict[str, Any]) -> AudioArtifact | ErrorArtifact: text = params["values"]["text"] output_artifact = self.text_to_speech_driver.run_text_to_audio(prompts=[text]) if self.output_dir or self.output_file: self._write_to_file(output_artifact) return output_artifact
VariationImageGenerationTool
Bases:
BaseImageGenerationTool
Attributes
Name | Type | Description |
---|---|---|
image_generation_driver | BaseImageGenerationDriver | The image generation driver used to generate the image. |
output_dir | Optional[str] | If provided, the generated image will be written to disk in output_dir. |
output_file | Optional[str] | If provided, the generated image will be written to disk as output_file. |
Source Code in griptape/tools/variation_image_generation/tool.py
@define class VariationImageGenerationTool(BaseImageGenerationTool): """A tool that can be used to generate prompted variations of an image. Attributes: image_generation_driver: The image generation driver used to generate the image. output_dir: If provided, the generated image will be written to disk in output_dir. output_file: If provided, the generated image will be written to disk as output_file. """ image_generation_driver: BaseImageGenerationDriver = field(kw_only=True) image_loader: ImageLoader = field(default=ImageLoader(), kw_only=True) @activity( config={ "description": "Generates a variation of a given input image file.", "schema": Schema( { Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str, Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str, Literal( "image_file", description="The path to an image file to be used as a base to generate variations from.", ): str, }, ), }, ) def image_variation_from_file(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact: prompt = params["values"]["prompt"] negative_prompt = params["values"]["negative_prompt"] image_file = params["values"]["image_file"] image_artifact = self.image_loader.load(image_file) return self._generate_variation(prompt, negative_prompt, image_artifact) @activity( config={ "description": "Generates a variation of a given input image artifact in memory.", "schema": Schema( { Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str, Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str, "memory_name": str, "artifact_namespace": str, "artifact_name": str, }, ), }, ) def image_variation_from_memory(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact: prompt = params["values"]["prompt"] negative_prompt = params["values"]["negative_prompt"] artifact_namespace = params["values"]["artifact_namespace"] artifact_name = params["values"]["artifact_name"] memory = self.find_input_memory(params["values"]["memory_name"]) if memory is None: return ErrorArtifact("memory not found") try: image_artifact = load_artifact_from_memory(memory, artifact_namespace, artifact_name, ImageArtifact) except ValueError as e: return ErrorArtifact(str(e)) return self._generate_variation(prompt, negative_prompt, cast("ImageArtifact", image_artifact)) def _generate_variation( self, prompt: str, negative_prompt: str, image_artifact: ImageArtifact ) -> ImageArtifact | ErrorArtifact: output_artifact = self.image_generation_driver.run_image_variation( prompts=[prompt], negative_prompts=[negative_prompt], image=image_artifact ) if self.output_dir or self.output_file: self._write_to_file(output_artifact) return output_artifact
image_generation_driver = field(kw_only=True)
class-attribute instance-attributeimage_loader = field(default=ImageLoader(), kw_only=True)
class-attribute instance-attribute
_generate_variation(prompt, negative_prompt, image_artifact)
Source Code in griptape/tools/variation_image_generation/tool.py
def _generate_variation( self, prompt: str, negative_prompt: str, image_artifact: ImageArtifact ) -> ImageArtifact | ErrorArtifact: output_artifact = self.image_generation_driver.run_image_variation( prompts=[prompt], negative_prompts=[negative_prompt], image=image_artifact ) if self.output_dir or self.output_file: self._write_to_file(output_artifact) return output_artifact
image_variation_from_file(params)
Source Code in griptape/tools/variation_image_generation/tool.py
@activity( config={ "description": "Generates a variation of a given input image file.", "schema": Schema( { Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str, Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str, Literal( "image_file", description="The path to an image file to be used as a base to generate variations from.", ): str, }, ), }, ) def image_variation_from_file(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact: prompt = params["values"]["prompt"] negative_prompt = params["values"]["negative_prompt"] image_file = params["values"]["image_file"] image_artifact = self.image_loader.load(image_file) return self._generate_variation(prompt, negative_prompt, image_artifact)
image_variation_from_memory(params)
Source Code in griptape/tools/variation_image_generation/tool.py
@activity( config={ "description": "Generates a variation of a given input image artifact in memory.", "schema": Schema( { Literal("prompt", description=BaseImageGenerationTool.PROMPT_DESCRIPTION): str, Literal("negative_prompt", description=BaseImageGenerationTool.NEGATIVE_PROMPT_DESCRIPTION): str, "memory_name": str, "artifact_namespace": str, "artifact_name": str, }, ), }, ) def image_variation_from_memory(self, params: dict[str, dict[str, str]]) -> ImageArtifact | ErrorArtifact: prompt = params["values"]["prompt"] negative_prompt = params["values"]["negative_prompt"] artifact_namespace = params["values"]["artifact_namespace"] artifact_name = params["values"]["artifact_name"] memory = self.find_input_memory(params["values"]["memory_name"]) if memory is None: return ErrorArtifact("memory not found") try: image_artifact = load_artifact_from_memory(memory, artifact_namespace, artifact_name, ImageArtifact) except ValueError as e: return ErrorArtifact(str(e)) return self._generate_variation(prompt, negative_prompt, cast("ImageArtifact", image_artifact))
VectorStoreTool
Bases:
BaseTool
Attributes
Name | Type | Description |
---|---|---|
description | str | LLM-friendly vector DB description. |
vector_store_driver | BaseVectorStoreDriver | BaseVectorStoreDriver . |
query_params | dict[str, Any] | Optional dictionary of vector store driver query parameters. |
process_query_output | Callable[[list[Entry]], BaseArtifact] | Optional lambda for processing vector store driver query output Entry s. |
Source Code in griptape/tools/vector_store/tool.py
@define(kw_only=True) class VectorStoreTool(BaseTool): """A tool for querying a vector database. Attributes: description: LLM-friendly vector DB description. vector_store_driver: `BaseVectorStoreDriver`. query_params: Optional dictionary of vector store driver query parameters. process_query_output: Optional lambda for processing vector store driver query output `Entry`s. """ DEFAULT_TOP_N = 5 description: str = field() vector_store_driver: BaseVectorStoreDriver = field() query_params: dict[str, Any] = field(factory=dict) process_query_output: Callable[[list[BaseVectorStoreDriver.Entry]], BaseArtifact] = field( default=Factory(lambda: lambda es: ListArtifact([e.to_artifact() for e in es])), ) @activity( config={ "description": "Can be used to search a database with the following description: {{ _self.description }}", "schema": Schema( { Literal( "query", description="A natural language search query to run against the vector database", ): str, }, ), }, ) def search(self, params: dict) -> BaseArtifact: query = params["values"]["query"] try: return self.process_query_output(self.vector_store_driver.query(query, **self.query_params)) except Exception as e: return ErrorArtifact(f"error querying vector store: {e}")
DEFAULT_TOP_N = 5
class-attribute instance-attributedescription = field()
class-attribute instance-attributeprocess_query_output = field(default=Factory(lambda: lambda es: ListArtifact([e.to_artifact() for e in es])))
class-attribute instance-attributequery_params = field(factory=dict)
class-attribute instance-attributevector_store_driver = field()
class-attribute instance-attribute
search(params)
Source Code in griptape/tools/vector_store/tool.py
@activity( config={ "description": "Can be used to search a database with the following description: {{ _self.description }}", "schema": Schema( { Literal( "query", description="A natural language search query to run against the vector database", ): str, }, ), }, ) def search(self, params: dict) -> BaseArtifact: query = params["values"]["query"] try: return self.process_query_output(self.vector_store_driver.query(query, **self.query_params)) except Exception as e: return ErrorArtifact(f"error querying vector store: {e}")
WebScraperTool
Bases:
BaseTool
Source Code in griptape/tools/web_scraper/tool.py
@define class WebScraperTool(BaseTool): web_loader: WebLoader = field(default=Factory(lambda: WebLoader()), kw_only=True) text_chunker: TextChunker = field(default=Factory(lambda: TextChunker(max_tokens=400)), kw_only=True) @activity( config={ "description": "Can be used to browse a web page and load its content", "schema": Schema({Literal("url", description="Valid HTTP URL"): str}), }, ) def get_content(self, params: dict) -> ListArtifact | ErrorArtifact: url = params["values"]["url"] try: result = self.web_loader.load(url) chunks = self.text_chunker.chunk(result) return ListArtifact(chunks) except Exception as e: return ErrorArtifact("Error getting page content: " + str(e))
text_chunker = field(default=Factory(lambda: TextChunker(max_tokens=400)), kw_only=True)
class-attribute instance-attributeweb_loader = field(default=Factory(lambda: WebLoader()), kw_only=True)
class-attribute instance-attribute
get_content(params)
Source Code in griptape/tools/web_scraper/tool.py
@activity( config={ "description": "Can be used to browse a web page and load its content", "schema": Schema({Literal("url", description="Valid HTTP URL"): str}), }, ) def get_content(self, params: dict) -> ListArtifact | ErrorArtifact: url = params["values"]["url"] try: result = self.web_loader.load(url) chunks = self.text_chunker.chunk(result) return ListArtifact(chunks) except Exception as e: return ErrorArtifact("Error getting page content: " + str(e))
WebSearchTool
Bases:
BaseTool
Source Code in griptape/tools/web_search/tool.py
@define class WebSearchTool(BaseTool): web_search_driver: BaseWebSearchDriver = field(kw_only=True) @activity( config={ "description": "Can be used for searching the web via the {{ _self.web_search_driver.__class__.__name__}}.", "schema": Schema( { Literal( "query", description="Search engine request that returns a list of pages with titles, descriptions, and URLs", ): str, }, ), }, ) def search(self, values: dict) -> ListArtifact | ErrorArtifact: # `BaseWebSearchDriver.query` already has a parameter named `query`, so we need to pop it from the values # to avoid passing it twice. query = values.pop("query") try: return self.web_search_driver.search(query, **values) except Exception as e: return ErrorArtifact(f"Error searching '{query}' with {self.web_search_driver.__class__.__name__}: {e}")
web_search_driver = field(kw_only=True)
class-attribute instance-attribute
search(values)
Source Code in griptape/tools/web_search/tool.py
@activity( config={ "description": "Can be used for searching the web via the {{ _self.web_search_driver.__class__.__name__}}.", "schema": Schema( { Literal( "query", description="Search engine request that returns a list of pages with titles, descriptions, and URLs", ): str, }, ), }, ) def search(self, values: dict) -> ListArtifact | ErrorArtifact: # `BaseWebSearchDriver.query` already has a parameter named `query`, so we need to pop it from the values # to avoid passing it twice. query = values.pop("query") try: return self.web_search_driver.search(query, **values) except Exception as e: return ErrorArtifact(f"Error searching '{query}' with {self.web_search_driver.__class__.__name__}: {e}")
- On this page
- BaseImageGenerationTool
- BaseTool
- CalculatorTool
- ComputerTool
- DateTimeTool
- EmailTool
- ExtractionTool
- FileManagerTool
- GriptapeCloudToolTool
- ImageQueryTool
- InpaintingImageGenerationTool
- OutpaintingImageGenerationTool
- PromptImageGenerationTool
- PromptSummaryTool
- QueryTool
- RagTool
- RestApiTool
- SqlTool
- StructureRunTool
- StructuredOutputTool
- TextToSpeechTool
- VariationImageGenerationTool
- VectorStoreTool
- WebScraperTool
- WebSearchTool
Could this page be better? Report a problem or suggest an addition!