utils
__all__ = ['J2', 'Chat', 'CommandRunner', 'Conversation', 'GriptapeCloudStructure', 'ManifestValidator', 'PythonRunner', 'Stream', 'StructureVisualizer', 'TokenCounter', 'add_key_in_dict_recursively', 'build_strict_schema', 'deprecation_warn', 'dict_merge', 'execute_futures_dict', 'execute_futures_list', 'execute_futures_list_dict', 'get_mime_type', 'import_optional_dependency', 'is_dependency_installed', 'load_artifact_from_memory', 'minify_json', 'references_from_artifacts', 'remove_key_in_dict_recursively', 'remove_null_values_in_dict_recursively', 'resolve_refs', 'str_to_hash', 'with_contextvars']
module-attribute
Utility for running a chat with a Structure.
Attributes
Name | Type | Description |
---|---|---|
structure | Structure | The Structure to run. |
exit_keywords | list[str] | Keywords that will exit the chat. |
exiting_text | str | Text to display when exiting the chat. |
processing_text | str | Text to display while processing the user's input. |
intro_text | Optional[str] | Text to display when the chat starts. |
prompt_prefix | str | Prefix for the user's input. |
response_prefix | str | Prefix for the assistant's response. |
handle_input | Callable[[str], str] | Function to get the user's input. |
handle_output | Callable[..., None] | Function to output text. Takes a text argument for the text to output. Also takes a stream argument which will be set to True when streaming Prompt Tasks are present. |
Source Code in griptape/utils/chat.py
@define
class Chat:
    """Utility for running a chat with a Structure.

    Attributes:
        structure: The Structure to run.
        exit_keywords: Keywords that will exit the chat.
        exiting_text: Text to display when exiting the chat.
        processing_text: Text to display while processing the user's input.
        intro_text: Text to display when the chat starts.
        prompt_prefix: Prefix for the user's input.
        response_prefix: Prefix for the assistant's response.
        handle_input: Function to get the user's input.
        handle_output: Function to output text. Takes a `text` argument for the text to output.
            Also takes a `stream` argument which will be set to True when streaming Prompt Tasks are present.
        logger_level: Level applied to the Gen AI Builder logger while the chat loop is running.
    """

    class ChatPrompt(Prompt):
        prompt_suffix = ""  # We don't want rich's default prompt suffix

    structure: Structure = field()
    # A Factory builds a fresh list per instance; a literal list default would be one
    # object shared (and mutated) across every Chat.
    exit_keywords: list[str] = field(default=Factory(lambda: ["exit"]), kw_only=True)
    exiting_text: str = field(default="Exiting...", kw_only=True)
    processing_text: str = field(default="Thinking...", kw_only=True)
    intro_text: Optional[str] = field(default=None, kw_only=True)
    prompt_prefix: str = field(default="User: ", kw_only=True)
    response_prefix: str = field(default="Assistant: ", kw_only=True)
    handle_input: Callable[[str], str] = field(
        default=Factory(lambda self: self.default_handle_input, takes_self=True), kw_only=True
    )
    handle_output: Callable[..., None] = field(
        default=Factory(lambda self: self.default_handle_output, takes_self=True),
        kw_only=True,
    )
    logger_level: int = field(default=logging.ERROR, kw_only=True)

    def default_handle_input(self, prompt_prefix: str) -> str:
        """Read one line of user input through ``Chat.ChatPrompt.ask``."""
        return Chat.ChatPrompt.ask(prompt_prefix)

    def default_handle_output(self, text: str, *, stream: bool = False) -> None:
        """Print ``text``; when ``stream`` is True, print without a newline and flush."""
        if stream:
            rprint(text, end="", flush=True)
        else:
            rprint(text)

    def start(self) -> None:
        """Run the chat loop until the user enters one of ``exit_keywords``.

        The Gen AI Builder logger is set to ``logger_level`` for the duration of the
        loop and restored afterwards, including when the loop exits with an exception.
        """
        from griptape.configs import Defaults

        logger = logging.getLogger(Defaults.logging_config.logger_name)
        # Hide Gen AI Builder's logging output except for errors.
        # Save the logger's own level (not the effective one) so that a logger which
        # inherited its level goes back to inheriting it.
        old_logger_level = logger.level
        logger.setLevel(self.logger_level)

        try:
            if self.intro_text:
                self._call_handle_output(self.intro_text)

            has_streaming_tasks = self._has_streaming_tasks()

            while True:
                question = self.handle_input(self.prompt_prefix)

                if question.lower() in self.exit_keywords:
                    self._call_handle_output(self.exiting_text)
                    break

                self._call_handle_output(self.processing_text)
                if has_streaming_tasks:
                    # Prefix only the first chunk. Iterating (instead of calling next())
                    # means an empty stream produces no output rather than StopIteration.
                    is_first_chunk = True
                    for chunk in Stream(self.structure).run(question):
                        text = self.response_prefix + chunk.value if is_first_chunk else chunk.value
                        self._call_handle_output(text, stream=True)
                        is_first_chunk = False
                else:
                    self._call_handle_output(f"{self.response_prefix}{self.structure.run(question).output.to_text()}")
        finally:
            # Restore the original logger level
            logger.setLevel(old_logger_level)

    def _has_streaming_tasks(self) -> bool:
        """Return True when any task is a PromptTask whose prompt driver has ``stream`` set."""
        from griptape.tasks.prompt_task import PromptTask

        return any(isinstance(task, PromptTask) and task.prompt_driver.stream for task in self.structure.tasks)

    def _call_handle_output(self, text: str, *, stream: bool = False) -> None:
        """Call ``handle_output``, passing ``stream`` only if the handler can accept it."""
        func_params = inspect.signature(self.handle_output).parameters

        # if there is a **kwargs parameter, we can safely
        # pass all the params to the function
        has_kwargs = any(param.kind == inspect.Parameter.VAR_KEYWORD for param in func_params.values())

        if "stream" in func_params or has_kwargs:
            self.handle_output(text, stream=stream)
        else:
            self.handle_output(text)
exit_keywords = field(default=['exit'], kw_only=True)
class-attribute instance-attribute
exiting_text = field(default='Exiting...', kw_only=True)
class-attribute instance-attribute
handle_input = field(default=Factory(lambda self: self.default_handle_input, takes_self=True), kw_only=True)
class-attribute instance-attribute
handle_output = field(default=Factory(lambda self: self.default_handle_output, takes_self=True), kw_only=True)
class-attribute instance-attribute
intro_text = field(default=None, kw_only=True)
class-attribute instance-attribute
logger_level = field(default=logging.ERROR, kw_only=True)
class-attribute instance-attribute
processing_text = field(default='Thinking...', kw_only=True)
class-attribute instance-attribute
prompt_prefix = field(default='User: ', kw_only=True)
class-attribute instance-attribute
response_prefix = field(default='Assistant: ', kw_only=True)
class-attribute instance-attribute
structure = field()
class-attribute instance-attribute
ChatPrompt
Bases: Prompt
Source Code in griptape/utils/chat.py
class ChatPrompt(Prompt):
    """Prompt subclass whose prompt suffix is overridden to an empty string."""

    prompt_suffix = ""  # We don't want rich's default prompt suffix
prompt_suffix = ''
class-attribute instance-attribute
_call_handle_output(text, *, stream=False)
Source Code in griptape/utils/chat.py
def _call_handle_output(self, text: str, *, stream: bool = False) -> None: func_params = inspect.signature(self.handle_output).parameters.copy() has_kwargs = False for param in func_params.values(): # if there is a **kwargs parameter, we can safely # pass all the params to the function if param.kind == inspect.Parameter.VAR_KEYWORD: has_kwargs = True break if "stream" in func_params or has_kwargs: self.handle_output(text, stream=stream) else: self.handle_output(text)
_has_streaming_tasks()
Source Code in griptape/utils/chat.py
def _has_streaming_tasks(self) -> bool:
    """Report whether any task of the structure is a PromptTask whose prompt driver has ``stream`` set."""
    from griptape.tasks.prompt_task import PromptTask

    for task in self.structure.tasks:
        if isinstance(task, PromptTask) and task.prompt_driver.stream:
            return True
    return False
default_handle_input(prompt_prefix)
Source Code in griptape/utils/chat.py
def default_handle_input(self, prompt_prefix: str) -> str:
    """Ask for user input via ``Chat.ChatPrompt.ask`` and return the answer.

    Args:
        prompt_prefix: Text passed to the prompt as its label.
    """
    prompt_cls = Chat.ChatPrompt
    return prompt_cls.ask(prompt_prefix)
default_handle_output(text, *, stream=False)
Source Code in griptape/utils/chat.py
def default_handle_output(self, text: str, *, stream: bool = False) -> None:
    """Print ``text`` with ``rprint``.

    Args:
        text: The text to print.
        stream: When True, print without a trailing newline and flush immediately.
    """
    if not stream:
        rprint(text)
        return
    rprint(text, end="", flush=True)
start()
Source Code in griptape/utils/chat.py
def start(self) -> None:
    """Run the chat loop until the user enters one of ``exit_keywords``.

    The Gen AI Builder logger is set to ``logger_level`` for the duration of the
    loop and restored afterwards, including when the loop exits with an exception.
    """
    from griptape.configs import Defaults

    logger = logging.getLogger(Defaults.logging_config.logger_name)
    # Hide Gen AI Builder's logging output except for errors.
    # Save the logger's own level (not the effective one) so that a logger which
    # inherited its level goes back to inheriting it.
    old_logger_level = logger.level
    logger.setLevel(self.logger_level)

    try:
        if self.intro_text:
            self._call_handle_output(self.intro_text)

        has_streaming_tasks = self._has_streaming_tasks()

        while True:
            question = self.handle_input(self.prompt_prefix)

            if question.lower() in self.exit_keywords:
                self._call_handle_output(self.exiting_text)
                break

            self._call_handle_output(self.processing_text)
            if has_streaming_tasks:
                # Prefix only the first chunk. Iterating (instead of calling next())
                # means an empty stream produces no output rather than StopIteration.
                is_first_chunk = True
                for chunk in Stream(self.structure).run(question):
                    text = self.response_prefix + chunk.value if is_first_chunk else chunk.value
                    self._call_handle_output(text, stream=True)
                    is_first_chunk = False
            else:
                self._call_handle_output(f"{self.response_prefix}{self.structure.run(question).output.to_text()}")
    finally:
        # Restore the original logger level
        logger.setLevel(old_logger_level)
CommandRunner
Source Code in griptape/utils/command_runner.py
@define
class CommandRunner:
    """Runs shell commands and wraps their output in artifacts."""

    def run(self, command: str) -> BaseArtifact:
        """Run ``command`` through the system shell.

        Args:
            command: The shell command line to execute.

        Returns:
            A TextArtifact with the stripped, decoded stdout when nothing was written
            to stderr; otherwise an ErrorArtifact whose message is ``error: `` followed
            by the stripped, decoded stderr.
        """
        # NOTE(security): shell=True hands the string to the shell verbatim;
        # never pass untrusted input here.
        process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        stdout, stderr = process.communicate()

        if not stderr:
            return TextArtifact(stdout.strip().decode())
        # Decode stderr so the message holds readable text instead of the repr of a
        # bytes object (b'...'); undecodable bytes are replaced rather than raising.
        return ErrorArtifact(f"error: {stderr.strip().decode(errors='replace')}")
run(command)
Source Code in griptape/utils/command_runner.py
def run(self, command: str) -> BaseArtifact:
    """Run ``command`` through the system shell.

    Args:
        command: The shell command line to execute.

    Returns:
        A TextArtifact with the stripped, decoded stdout when nothing was written
        to stderr; otherwise an ErrorArtifact whose message is ``error: `` followed
        by the stripped, decoded stderr.
    """
    # NOTE(security): shell=True hands the string to the shell verbatim;
    # never pass untrusted input here.
    process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    stdout, stderr = process.communicate()

    if not stderr:
        return TextArtifact(stdout.strip().decode())
    # Decode stderr so the message holds readable text instead of the repr of a
    # bytes object (b'...'); undecodable bytes are replaced rather than raising.
    return ErrorArtifact(f"error: {stderr.strip().decode(errors='replace')}")
Conversation
Source Code in griptape/utils/conversation.py
@define(frozen=True)
class Conversation:
    """Renders the contents of a conversation memory as lists of text lines."""

    memory: Optional[BaseConversationMemory] = field()

    @memory.validator  # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess]
    def validate_memory(self, attribute: Attribute, value: Optional[BaseConversationMemory]) -> None:
        """Reject a ``memory`` value of None with a ValueError."""
        if value is not None:
            return
        raise ValueError("Conversation memory must not be None.")

    def lines(self) -> list[str]:
        """Return a ``Q:``/``A:`` line pair per run, plus a ``Summary:`` line for summary memories."""
        from griptape.memory.structure import SummaryConversationMemory

        runs = [] if self.memory is None else self.memory.runs
        rendered = []
        for run in runs:
            question, answer = f"Q: {run.input}", f"A: {run.output}"
            rendered.append(question)
            rendered.append(answer)

        if isinstance(self.memory, SummaryConversationMemory):
            rendered.append(f"Summary: {self.memory.summary}")

        return rendered

    def prompt_stack(self) -> list[str]:
        """Return a ``role: text`` line per prompt-stack message, plus a ``Summary:`` line for summary memories."""
        from griptape.memory.structure import SummaryConversationMemory

        messages = [] if self.memory is None else self.memory.to_prompt_stack().messages
        rendered = [f"{message.role}: {message.to_text()}" for message in messages]

        if isinstance(self.memory, SummaryConversationMemory):
            rendered.append(f"Summary: {self.memory.summary}")

        return rendered

    def __str__(self) -> str:
        """Join the output of ``lines()`` with newlines."""
        return "\n".join(self.lines())
memory = field()
class-attribute instance-attribute
__str__()
Source Code in griptape/utils/conversation.py
def __str__(self) -> str: return str.join("\n", self.lines())
lines()
Source Code in griptape/utils/conversation.py
def lines(self) -> list[str]:
    """Return a ``Q:``/``A:`` line pair per run, plus a ``Summary:`` line for summary memories.

    A memory of None yields no run lines.
    """
    from griptape.memory.structure import SummaryConversationMemory

    runs = [] if self.memory is None else self.memory.runs
    rendered = []
    for run in runs:
        question, answer = f"Q: {run.input}", f"A: {run.output}"
        rendered.append(question)
        rendered.append(answer)

    if isinstance(self.memory, SummaryConversationMemory):
        rendered.append(f"Summary: {self.memory.summary}")

    return rendered
prompt_stack()
Source Code in griptape/utils/conversation.py
def prompt_stack(self) -> list[str]:
    """Return a ``role: text`` line per prompt-stack message, plus a ``Summary:`` line for summary memories.

    A memory of None yields no message lines.
    """
    from griptape.memory.structure import SummaryConversationMemory

    messages = [] if self.memory is None else self.memory.to_prompt_stack().messages
    rendered = [f"{message.role}: {message.to_text()}" for message in messages]

    if isinstance(self.memory, SummaryConversationMemory):
        rendered.append(f"Summary: {self.memory.summary}")

    return rendered
validate_memory(attribute, value)
Source Code in griptape/utils/conversation.py
@memory.validator  # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess]
def validate_memory(self, attribute: Attribute, value: Optional[BaseConversationMemory]) -> None:
    """Reject a ``memory`` value of None.

    Raises:
        ValueError: If ``value`` is None.
    """
    if value is not None:
        return
    raise ValueError("Conversation memory must not be None.")
GriptapeCloudStructure
Utility for working with Gen AI Builder Structures.
Attributes
Name | Type | Description |
---|---|---|
_event_listener | Optional[EventListener] | Event Listener to use. Defaults to an EventListener with a GriptapeCloudEventListenerDriver. |
_observability | Optional[Observability] | Observability to use. Defaults to an Observability with a GriptapeCloudObservabilityDriver. |
observe | bool | Whether to enable observability. Enabling requires the drivers-observability-griptape-cloud extra. |
Source Code in griptape/utils/griptape_cloud.py
@define()
class GriptapeCloudStructure:
    """Utility for working with Gen AI Builder Structures.

    Used as a context manager. When the ``GT_CLOUD_STRUCTURE_RUN_ID`` environment
    variable is present, entering registers the event listener on the EventBus and
    exiting publishes a FinishStructureRunEvent (if an output was set) before
    removing the listener again.

    Attributes:
        _event_listener: Event Listener to use. Defaults to an EventListener with a GriptapeCloudEventListenerDriver.
        _observability: Observability to use. Defaults to an Observability with a GriptapeCloudObservabilityDriver.
        observe: Whether to enable observability. Enabling requires the `drivers-observability-griptape-cloud` extra.
    """

    _event_listener: Optional[EventListener] = field(default=None, kw_only=True, alias="event_listener")
    _observability: Optional[Observability] = field(default=None, kw_only=True, alias="observability")
    observe: bool = field(default=False, kw_only=True)
    # Not an init argument; written through the ``output`` property setter.
    _output: Optional[BaseArtifact] = field(default=None, init=False)

    # NOTE(review): presumably lazy_property falls back to this factory only when
    # ``_event_listener`` was not supplied — confirm against the decorator.
    @lazy_property()
    def event_listener(self) -> EventListener:
        """Build an EventListener backed by a GriptapeCloudEventListenerDriver."""
        from griptape.drivers.event_listener.griptape_cloud import GriptapeCloudEventListenerDriver

        return EventListener(event_listener_driver=GriptapeCloudEventListenerDriver())

    # NOTE(review): presumably lazy_property falls back to this factory only when
    # ``_observability`` was not supplied — confirm against the decorator.
    @lazy_property()
    def observability(self) -> Observability:
        """Build an Observability backed by a GriptapeCloudObservabilityDriver."""
        from griptape.drivers.observability.griptape_cloud import GriptapeCloudObservabilityDriver
        from griptape.observability.observability import Observability

        return Observability(observability_driver=GriptapeCloudObservabilityDriver())

    @property
    def output(self) -> Optional[BaseArtifact]:
        """The artifact stored by the ``output`` setter, or None if none was set."""
        return self._output

    @output.setter
    def output(self, value: BaseArtifact | Any) -> None:
        """Store ``value`` as an artifact.

        Artifacts are stored as-is, lists become a ListArtifact of converted items,
        and anything else is converted with ``_to_artifact``.
        """
        if isinstance(value, BaseArtifact):
            self._output = value
        elif isinstance(value, list):
            self._output = ListArtifact([self._to_artifact(item) for item in value])
        else:
            self._output = self._to_artifact(value)

    @property
    def structure_run_id(self) -> str:
        """Value of ``GT_CLOUD_STRUCTURE_RUN_ID``; raises KeyError when it is not set."""
        return os.environ["GT_CLOUD_STRUCTURE_RUN_ID"]

    @property
    def in_managed_environment(self) -> bool:
        """True when ``GT_CLOUD_STRUCTURE_RUN_ID`` is present in the environment."""
        return "GT_CLOUD_STRUCTURE_RUN_ID" in os.environ

    def __enter__(self) -> Self:
        """Register the event listener (and, if ``observe``, the observability driver) when in a managed environment."""
        from griptape.observability.observability import Observability

        if self.in_managed_environment:
            EventBus.add_event_listener(self.event_listener)

            if self.observe:
                Observability.set_global_driver(self.observability.observability_driver)
                self.observability.observability_driver.__enter__()

        return self

    def __exit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc_value: Optional[BaseException],
        exc_traceback: Optional[TracebackType],
    ) -> None:
        """Publish the final output, then undo what ``__enter__`` registered."""
        from griptape.observability.observability import Observability

        if self.in_managed_environment:
            if self.output is not None:
                # flush=True is passed so the finish event is published before the listener is removed.
                EventBus.publish_event(FinishStructureRunEvent(output_task_output=self.output), flush=True)
            EventBus.remove_event_listener(self.event_listener)

            if self.observe:
                Observability.set_global_driver(None)
                self.observability.observability_driver.__exit__(exc_type, exc_value, exc_traceback)

    def _to_artifact(self, value: Any) -> BaseArtifact:
        """Wrap ``value`` in an artifact chosen by its Python type; unknown types become a GenericArtifact."""
        if isinstance(value, str):
            return TextArtifact(value)
        if isinstance(value, bool):
            return BooleanArtifact(value)
        if isinstance(value, dict):
            return JsonArtifact(value)
        if isinstance(value, bytes):
            return BlobArtifact(value)
        return GenericArtifact(value)
_event_listener = field(default=None, kw_only=True, alias='event_listener')
class-attribute instance-attribute
_observability = field(default=None, kw_only=True, alias='observability')
class-attribute instance-attribute
_output = field(default=None, init=False)
class-attribute instance-attribute
in_managed_environment
property
observe = field(default=False, kw_only=True)
class-attribute instance-attribute
output
property writable
structure_run_id
property
__enter__()
Source Code in griptape/utils/griptape_cloud.py
def __enter__(self) -> Self:
    """Register the event listener when running in a managed environment.

    When ``observe`` is also set, the observability driver is installed as the
    global driver and entered. Outside a managed environment nothing is registered.

    Returns:
        This instance.
    """
    from griptape.observability.observability import Observability

    if not self.in_managed_environment:
        return self

    EventBus.add_event_listener(self.event_listener)
    if self.observe:
        Observability.set_global_driver(self.observability.observability_driver)
        self.observability.observability_driver.__enter__()
    return self
__exit__(exc_type, exc_value, exc_traceback)
Source Code in griptape/utils/griptape_cloud.py
def __exit__(
    self,
    exc_type: Optional[type[BaseException]],
    exc_value: Optional[BaseException],
    exc_traceback: Optional[TracebackType],
) -> None:
    """Publish the final output and unregister what ``__enter__`` registered.

    Does nothing outside a managed environment. A FinishStructureRunEvent is
    published only when an output has been set.
    """
    from griptape.observability.observability import Observability

    if not self.in_managed_environment:
        return

    if self.output is not None:
        finish_event = FinishStructureRunEvent(output_task_output=self.output)
        EventBus.publish_event(finish_event, flush=True)
    EventBus.remove_event_listener(self.event_listener)

    if not self.observe:
        return
    Observability.set_global_driver(None)
    self.observability.observability_driver.__exit__(exc_type, exc_value, exc_traceback)
_to_artifact(value)
Source Code in griptape/utils/griptape_cloud.py
def _to_artifact(self, value: Any) -> BaseArtifact:
    """Wrap ``value`` in an artifact chosen by its Python type.

    The checks run in order: str, bool, dict, bytes. Any other value becomes a
    GenericArtifact.
    """
    conversions = (
        (str, TextArtifact),
        (bool, BooleanArtifact),
        (dict, JsonArtifact),
        (bytes, BlobArtifact),
    )
    for python_type, artifact_cls in conversions:
        if isinstance(value, python_type):
            return artifact_cls(value)
    return GenericArtifact(value)
event_listener()
Source Code in griptape/utils/griptape_cloud.py
@lazy_property()
def event_listener(self) -> EventListener:
    """Build an EventListener backed by a GriptapeCloudEventListenerDriver."""
    from griptape.drivers.event_listener.griptape_cloud import GriptapeCloudEventListenerDriver

    driver = GriptapeCloudEventListenerDriver()
    return EventListener(event_listener_driver=driver)
observability()
Source Code in griptape/utils/griptape_cloud.py
@lazy_property()
def observability(self) -> Observability:
    """Build an Observability backed by a GriptapeCloudObservabilityDriver."""
    from griptape.drivers.observability.griptape_cloud import GriptapeCloudObservabilityDriver
    from griptape.observability.observability import Observability

    driver = GriptapeCloudObservabilityDriver()
    return Observability(observability_driver=driver)
J2
Source Code in griptape/utils/j2.py
@define(frozen=True)
class J2:
    """Renders Jinja templates, either loaded by name from ``templates_dir`` or given as a string."""

    template_name: Optional[str] = field(default=None)
    templates_dir: str = field(default=abs_path("templates"), kw_only=True)
    environment: Environment = field(
        default=Factory(
            lambda self: Environment(loader=FileSystemLoader(self.templates_dir), trim_blocks=True, lstrip_blocks=True),
            takes_self=True,
        ),
        kw_only=True,
    )

    def render(self, **kwargs) -> str:
        """Render the template named ``template_name`` and strip trailing whitespace.

        Raises:
            ValueError: If ``template_name`` is None.
        """
        if self.template_name is None:
            raise ValueError("template_name is required.")

        template = self.environment.get_template(self.template_name)
        rendered = template.render(kwargs)
        return rendered.rstrip()

    def render_from_string(self, value: str, **kwargs) -> str:
        """Render the template source ``value`` with ``kwargs`` as its context."""
        template = self.environment.from_string(value)
        return template.render(kwargs)
environment = field(default=Factory(lambda self: Environment(loader=FileSystemLoader(self.templates_dir), trim_blocks=True, lstrip_blocks=True), takes_self=True), kw_only=True)
class-attribute instance-attribute
template_name = field(default=None)
class-attribute instance-attribute
templates_dir = field(default=abs_path('templates'), kw_only=True)
class-attribute instance-attribute
render(**kwargs)
Source Code in griptape/utils/j2.py
def render(self, **kwargs) -> str:
    """Render the template named ``template_name`` and strip trailing whitespace.

    Args:
        **kwargs: Context passed to the template as a single dict.

    Raises:
        ValueError: If ``template_name`` is None.
    """
    if self.template_name is None:
        raise ValueError("template_name is required.")

    template = self.environment.get_template(self.template_name)
    rendered = template.render(kwargs)
    return rendered.rstrip()
render_from_string(value, **kwargs)
Source Code in griptape/utils/j2.py
def render_from_string(self, value: str, **kwargs) -> str:
    """Render the template source ``value`` with ``kwargs`` as its context.

    The rendered text is returned as produced by the template, without stripping.
    """
    template = self.environment.from_string(value)
    return template.render(kwargs)
ManifestValidator
Source Code in griptape/utils/manifest_validator.py
class ManifestValidator:
    """Validates manifest dictionaries against a fixed schema."""

    def validate(self, manifest: dict) -> dict:
        """Validate ``manifest`` against ``schema()`` and return the validation result."""
        manifest_schema = self.schema()
        return manifest_schema.validate(manifest)

    def schema(self) -> Schema:
        """Build the manifest Schema: ``version`` fixed to ``"v1"`` plus four string fields."""
        expected_shape = {
            "version": "v1",
            "name": str,
            "description": str,
            "contact_email": str,
            "legal_info_url": str,
        }
        return Schema(expected_shape)
schema()
Source Code in griptape/utils/manifest_validator.py
def schema(self) -> Schema:
    """Build the manifest Schema: ``version`` fixed to ``"v1"`` plus four string fields."""
    expected_shape = {
        "version": "v1",
        "name": str,
        "description": str,
        "contact_email": str,
        "legal_info_url": str,
    }
    return Schema(expected_shape)
validate(manifest)
Source Code in griptape/utils/manifest_validator.py
def validate(self, manifest: dict) -> dict:
    """Validate ``manifest`` against ``schema()`` and return the validation result."""
    manifest_schema = self.schema()
    return manifest_schema.validate(manifest)
PythonRunner
Source Code in griptape/utils/python_runner.py
@define
class PythonRunner:
    """Evaluates Python expressions with a configurable set of imported libraries.

    Attributes:
        libs: Mapping of module name to the alias it is bound to while the code runs.
    """

    libs: dict[str, str] = field(factory=dict, kw_only=True)

    def run(self, code: str) -> str:
        """Evaluate ``code`` as an expression and return what printing it writes to stdout.

        Args:
            code: A Python expression; it is wrapped as ``print(<code>)`` before execution.

        Returns:
            The captured, stripped stdout, or the exception message when importing a
            library or evaluating the code raises an Exception.
        """
        # NOTE(security): ``code`` is executed with exec(); only run trusted input.
        global_stdout = sys.stdout
        sys.stdout = local_stdout = StringIO()

        try:
            # Bind each library to its alias in a namespace private to this call,
            # instead of writing into this module's globals() and reading the
            # aliases back with eval().
            namespace = {alias: importlib.import_module(lib) for lib, alias in self.libs.items()}

            # One namespace serves as both globals and locals, so lambdas and other
            # nested scopes inside ``code`` can resolve the aliases too.
            exec(f"print({code})", namespace)

            output = local_stdout.getvalue()
        except Exception as e:
            output = str(e)
        finally:
            sys.stdout = global_stdout

        return output.strip()
libs = field(factory=dict, kw_only=True)
class-attribute instance-attribute
run(code)
Source Code in griptape/utils/python_runner.py
def run(self, code: str) -> str:
    """Evaluate ``code`` as an expression and return what printing it writes to stdout.

    Args:
        code: A Python expression; it is wrapped as ``print(<code>)`` before execution.

    Returns:
        The captured, stripped stdout, or the exception message when importing a
        library or evaluating the code raises an Exception.
    """
    # NOTE(security): ``code`` is executed with exec(); only run trusted input.
    global_stdout = sys.stdout
    sys.stdout = local_stdout = StringIO()

    try:
        # Bind each library to its alias in a namespace private to this call,
        # instead of writing into this module's globals() and reading the
        # aliases back with eval().
        namespace = {alias: importlib.import_module(lib) for lib, alias in self.libs.items()}

        # One namespace serves as both globals and locals, so lambdas and other
        # nested scopes inside ``code`` can resolve the aliases too.
        exec(f"print({code})", namespace)

        output = local_stdout.getvalue()
    except Exception as e:
        output = str(e)
    finally:
        sys.stdout = global_stdout

    return output.strip()
Stream
A wrapper for Structures that filters Events relevant to text output and converts them to TextArtifacts.
Attributes
Source Code in griptape/utils/stream.py
@define
class Stream:
    """A wrapper for Structures that filters Events relevant to text output and converts them to TextArtifacts.

    Attributes:
        structure: The Structure to wrap.
        event_types: The Event types requested from the Structure's ``run_stream``.
    """

    structure: Structure = field()
    event_types: list[type[BaseEvent]] = field(
        default=Factory(lambda: [TextChunkEvent, ActionChunkEvent, FinishPromptEvent, FinishStructureRunEvent])
    )

    def run(self, *args) -> Iterator[TextArtifact]:
        """Run the Structure and yield its streamed output as TextArtifacts.

        Args:
            *args: Forwarded to the Structure's ``run_stream``.

        Yields:
            A newline for each FinishPromptEvent, the token of each TextChunkEvent,
            and for ActionChunkEvents a ``name.tag (path)`` header followed by the
            action input, pretty-printed once the buffered fragments parse as JSON.
        """
        action_str = ""

        for event in self.structure.run_stream(*args, event_types=self.event_types):
            if isinstance(event, FinishPromptEvent):
                yield TextArtifact(value="\n")
            elif isinstance(event, TextChunkEvent):
                yield TextArtifact(value=event.token)
            elif isinstance(event, ActionChunkEvent):
                if event.tag is not None and event.name is not None and event.path is not None:
                    yield TextArtifact(f"{event.name}.{event.tag} ({event.path})")
                if event.partial_input is not None:
                    action_str += event.partial_input
                    try:
                        parsed_input = json.loads(action_str)
                    except json.JSONDecodeError:
                        # The buffered input is not complete JSON yet; keep accumulating.
                        continue
                    yield TextArtifact(json.dumps(parsed_input, indent=2))
                    action_str = ""
event_types = field(default=Factory(lambda: [TextChunkEvent, ActionChunkEvent, FinishPromptEvent, FinishStructureRunEvent]))
class-attribute instance-attribute
structure = field()
class-attribute instance-attribute
run(*args)
Source Code in griptape/utils/stream.py
def run(self, *args) -> Iterator[TextArtifact]:
    """Run the Structure and yield its streamed output as TextArtifacts.

    Args:
        *args: Forwarded to the Structure's ``run_stream``.

    Yields:
        A newline for each FinishPromptEvent, the token of each TextChunkEvent,
        and for ActionChunkEvents a ``name.tag (path)`` header followed by the
        action input, pretty-printed once the buffered fragments parse as JSON.
    """
    action_str = ""

    for event in self.structure.run_stream(*args, event_types=self.event_types):
        if isinstance(event, FinishPromptEvent):
            yield TextArtifact(value="\n")
        elif isinstance(event, TextChunkEvent):
            yield TextArtifact(value=event.token)
        elif isinstance(event, ActionChunkEvent):
            if event.tag is not None and event.name is not None and event.path is not None:
                yield TextArtifact(f"{event.name}.{event.tag} ({event.path})")
            if event.partial_input is not None:
                action_str += event.partial_input
                try:
                    parsed_input = json.loads(action_str)
                except json.JSONDecodeError:
                    # The buffered input is not complete JSON yet; keep accumulating.
                    continue
                yield TextArtifact(json.dumps(parsed_input, indent=2))
                action_str = ""
StructureVisualizer
Source Code in griptape/utils/structure_visualizer.py
@define
class StructureVisualizer:
    """Builds a URL that renders a Structure's tasks as a Mermaid flowchart."""

    structure: Structure = field()
    header: str = field(default="graph TD;", kw_only=True)
    build_node_id: Callable[[BaseTask], str] = field(default=lambda task: task.id.title(), kw_only=True)
    query_params: dict[str, str] = field(factory=dict, kw_only=True)
    base_url: str = field(default="https://mermaid.ink", kw_only=True)

    def to_url(self) -> str:
        """Generates a url that renders the Workflow structure as a Mermaid flowchart.

        Reference: https://mermaid.js.org/ecosystem/tutorials#jupyter-integration-with-mermaid-js.

        Returns:
            str: URL to the rendered image
        """
        self.structure.resolve_relationships()

        rendered_tasks = self.__render_tasks(self.structure.tasks)
        encoded_graph = base64.b64encode(f"{self.header}{rendered_tasks}".encode("utf-8")).decode("utf-8")

        request = PreparedRequest()
        request.prepare_url(urllib.parse.urljoin(self.base_url, f"svg/{encoded_graph}"), self.query_params)

        if request.url is None:
            raise ValueError("Failed to generate the URL")
        return request.url

    def __render_tasks(self, tasks: list[BaseTask]) -> str:
        """Render each task on its own line, each prefixed by a newline and a tab."""
        separator = "\n\t"
        rendered = [self.__render_task(task) for task in tasks]
        return separator + separator.join(rendered)

    def __render_task(self, task: BaseTask) -> str:
        """Render one task as Mermaid text: its edges to children and, for local structure runs, a subgraph."""
        from griptape.drivers.structure_run.local import LocalStructureRunDriver
        from griptape.tasks import BranchTask, StructureRunTask

        if not task.children:
            rendered = [f"{self.build_node_id(task)};"]
        else:
            children = " & ".join([f"{self.build_node_id(child)}" for child in task.children])
            if isinstance(task, BranchTask):
                rendered = [f"{self.build_node_id(task)}{{ {self.build_node_id(task)} }}-.-> {children};"]
            else:
                rendered = [f"{self.build_node_id(task)}--> {children};"]

        if isinstance(task, StructureRunTask) and isinstance(task.structure_run_driver, LocalStructureRunDriver):
            nested_structure = task.structure_run_driver.create_structure()
            nested_tasks = self.__render_tasks(nested_structure.tasks)
            rendered.append(f"subgraph {self.build_node_id(task)}{nested_tasks}\n\tend")

        return "\n\t".join(rendered)
base_url = field(default='https://mermaid.ink', kw_only=True)
class-attribute instance-attribute
build_node_id = field(default=lambda task: task.id.title(), kw_only=True)
class-attribute instance-attribute
header = field(default='graph TD;', kw_only=True)
class-attribute instance-attribute
query_params = field(factory=dict, kw_only=True)
class-attribute instance-attribute
structure = field()
class-attribute instance-attribute
__render_task(task)
Source Code in griptape/utils/structure_visualizer.py
def __render_task(self, task: BaseTask) -> str:
    """Render one task as Mermaid text.

    A task with children becomes an edge to them (a dotted edge from a braced node
    for a BranchTask); a task without children becomes a bare node. A
    StructureRunTask using a LocalStructureRunDriver additionally gets a subgraph
    holding the tasks of the structure it creates.
    """
    from griptape.drivers.structure_run.local import LocalStructureRunDriver
    from griptape.tasks import BranchTask, StructureRunTask

    if not task.children:
        rendered = [f"{self.build_node_id(task)};"]
    else:
        children = " & ".join([f"{self.build_node_id(child)}" for child in task.children])
        if isinstance(task, BranchTask):
            rendered = [f"{self.build_node_id(task)}{{ {self.build_node_id(task)} }}-.-> {children};"]
        else:
            rendered = [f"{self.build_node_id(task)}--> {children};"]

    if isinstance(task, StructureRunTask) and isinstance(task.structure_run_driver, LocalStructureRunDriver):
        nested_structure = task.structure_run_driver.create_structure()
        nested_tasks = self.__render_tasks(nested_structure.tasks)
        rendered.append(f"subgraph {self.build_node_id(task)}{nested_tasks}\n\tend")

    return "\n\t".join(rendered)
__render_tasks(tasks)
Source Code in griptape/utils/structure_visualizer.py
def __render_tasks(self, tasks: list[BaseTask]) -> str: return "\n\t" + "\n\t".join([self.__render_task(task) for task in tasks])
to_url()
Generates a url that renders the Workflow structure as a Mermaid flowchart.
Reference: https://mermaid.js.org/ecosystem/tutorials#jupyter-integration-with-mermaid-js.
Returns
Name | Type | Description |
---|---|---|
str | str | URL to the rendered image |
Source Code in griptape/utils/structure_visualizer.py
def to_url(self) -> str:
    """Generates a url that renders the Workflow structure as a Mermaid flowchart.

    Reference: https://mermaid.js.org/ecosystem/tutorials#jupyter-integration-with-mermaid-js.

    Returns:
        str: URL to the rendered image

    Raises:
        ValueError: If the prepared request ends up without a URL.
    """
    self.structure.resolve_relationships()

    rendered_tasks = self.__render_tasks(self.structure.tasks)
    encoded_graph = base64.b64encode(f"{self.header}{rendered_tasks}".encode("utf-8")).decode("utf-8")

    request = PreparedRequest()
    request.prepare_url(urllib.parse.urljoin(self.base_url, f"svg/{encoded_graph}"), self.query_params)

    if request.url is None:
        raise ValueError("Failed to generate the URL")
    return request.url
TokenCounter
Source Code in griptape/utils/token_counter.py
@define
class TokenCounter:
    """Keeps a running total of tokens.

    Attributes:
        tokens: The current total.
    """

    tokens: int = field(default=0, kw_only=True)

    def add_tokens(self, new_tokens: float) -> int:
        """Add ``new_tokens``, truncated to an int, to the total and return the new total."""
        increment = int(new_tokens)
        self.tokens = self.tokens + increment
        return self.tokens
tokens = field(default=0, kw_only=True)
class-attribute instance-attribute
add_tokens(new_tokens)
Source Code in griptape/utils/token_counter.py
def add_tokens(self, new_tokens: float) -> int:
    """Add ``new_tokens``, truncated to an int, to the total and return the new total."""
    increment = int(new_tokens)
    self.tokens = self.tokens + increment
    return self.tokens
add_key_in_dict_recursively(d, key, value, criteria=None)
Add a key in a dictionary recursively.
Parameters
Name | Type | Description | Default |
---|---|---|---|
d | Any | The dictionary to add the key to. | required |
key | str | The key to add. | required |
value | Any | The value to add. | required |
criteria | Optional[Callable[[dict], bool]] | An optional function to determine if the key should be added. | None |
Source Code in griptape/utils/dict_utils.py
def add_key_in_dict_recursively(
    d: Any, key: str, value: Any, criteria: Optional[Callable[[dict], bool]] = None
) -> dict:
    """Add a key to a dictionary and to every dictionary nested inside its values.

    The key is written into the dictionaries that are passed in, and a new
    dictionary with the same contents is returned. Values that are not
    dictionaries (including lists) are returned unchanged and are not searched.

    Args:
        d: The dictionary to add the key to.
        key: The key to add.
        value: The value to add.
        criteria: An optional function to determine if the key should be added.
    """
    if not isinstance(d, dict):
        return d

    if criteria is None or criteria(d):
        d[key] = value

    processed = {}
    for name, child in d.items():
        processed[name] = add_key_in_dict_recursively(child, key, value, criteria)
    return processed
build_strict_schema(json_schema, schema_id)
Performs a series of post-processing steps to ensure a JSON schema is compatible with LLMs.
- Adds the `$id` and `$schema` keys.
- Sets `additionalProperties` to `False` for objects without this key.
- Resolves `$ref`s and removes `$defs`.
Parameters
Name | Type | Description | Default |
---|---|---|---|
json_schema | dict | The JSON schema to ensure is strict. | required |
schema_id | str | The ID of the schema. | required |
Source Code in griptape/utils/json_schema_utils.py
def build_strict_schema(json_schema: dict, schema_id: str) -> dict:
    """Post-processes a JSON schema so that it is compatible with LLMs.

    1. Adds the `$id` and `$schema` keys when they are missing.
    2. Sets `additionalProperties` to `False` for objects without this key.
    3. Resolves `$ref`s and removes `$defs`.

    Args:
        json_schema: The JSON schema to ensure is strict.
        schema_id: The ID of the schema.

    Returns:
        The strict JSON schema.
    """
    from griptape.utils.dict_utils import add_key_in_dict_recursively

    def _is_open_object(node: dict) -> bool:
        # An object schema that does not yet say anything about extra properties.
        return node.get("type") == "object" and "additionalProperties" not in node

    json_schema.setdefault("$id", schema_id)
    json_schema.setdefault("$schema", "http://json-schema.org/draft-07/schema#")

    strict_schema = add_key_in_dict_recursively(
        json_schema,
        key="additionalProperties",
        value=False,
        criteria=_is_open_object,
    )
    strict_schema = resolve_refs(strict_schema)
    strict_schema.pop("$defs", None)

    return strict_schema
deprecation_warn(message, stacklevel=2)
Source Code in griptape/utils/deprecation.py
def deprecation_warn(message: str, stacklevel: int = 2) -> None:
    """Emit a DeprecationWarning that is always shown.

    The "always" filter is applied only for the duration of this call; the
    warning filters that were configured before the call are restored afterwards
    instead of being left overridden by a new DeprecationWarning filter.

    Args:
        message: The warning text.
        stacklevel: Passed to ``warnings.warn``; the default of 2 attributes the
            warning to the caller of this function.
    """
    # NOTE: catch_warnings saves and restores module-level state, so like the
    # warnings filters themselves this is not thread-safe.
    with warnings.catch_warnings():
        warnings.simplefilter("always", DeprecationWarning)
        warnings.warn(message, category=DeprecationWarning, stacklevel=stacklevel)
dict_merge(dct, merge_dct, *, add_keys=True)
Recursive dict merge.
Inspired by :meth:dict.update()
, instead of updating only top-level keys, dict_merge recurses down into dicts nested to an arbitrary depth, updating keys. The merge_dct
is merged into dct
.
This version will return a copy of the dictionary and leave the original arguments untouched.
The optional argument add_keys determines whether keys which are present in merge_dct but not in dct should be included in the new dict.
Parameters
Name | Type | Description | Default |
---|---|---|---|
dct | Optional[dict] | onto which the merge is executed | required |
merge_dct | Optional[dict] | dict merged into dct | required |
add_keys | bool | whether to add new keys | True |
Returns
Name | Type | Description |
---|---|---|
dict | dict | updated dict |
Source Code in griptape/utils/dict_utils.py
def dict_merge(dct: Optional[dict], merge_dct: Optional[dict], *, add_keys: bool = True) -> dict:
    """Recursive dict merge.

    Inspired by :meth:``dict.update()``, instead of updating only top-level keys,
    dict_merge recurses down into dicts nested to an arbitrary depth, updating keys.
    The ``merge_dct`` is merged into ``dct``.

    This version returns a new dictionary and leaves the original arguments untouched.
    Nested dicts that are merged are copied as well; values that are not touched by the
    merge are carried over as-is.

    The optional argument ``add_keys`` determines whether keys which are present in
    ``merge_dct`` but not ``dct`` should be included in the new dict.

    Args:
        dct: dict onto which the merge is executed. ``None`` is treated as an empty dict.
        merge_dct: dict merged into ``dct``. ``None`` is treated as an empty dict, also when
            it appears as a nested value opposite an existing nested dict.
        add_keys: whether to add new keys

    Returns:
        dict: updated dict
    """
    merged = {} if dct is None else dct.copy()
    incoming = {} if merge_dct is None else merge_dct

    for key, value in incoming.items():
        if key not in merged:
            # New keys are only accepted when the caller asked for them.
            if add_keys:
                merged[key] = value
            continue

        current = merged[key]
        # Recurse only when both sides can be merged as dicts. A non-dict incoming value
        # (scalar, list, ...) replaces the existing dict instead of being iterated as if it
        # were a mapping, which used to raise or produce nonsense keys.
        if isinstance(current, dict) and (value is None or isinstance(value, dict)):
            merged[key] = dict_merge(current, value, add_keys=add_keys)
        else:
            merged[key] = value

    return merged
execute_futures_dict(fs_dict)
Source Code in griptape/utils/futures.py
def execute_futures_dict(fs_dict: dict[str, futures.Future[T]]) -> dict[str, T]:
    """Block until every future in the mapping has finished, then collect the results.

    Args:
        fs_dict: Futures keyed by name.

    Returns:
        A dict with the same keys, each mapped to the result of its future. Calling
        `result()` re-raises whatever exception a future finished with.
    """
    futures.wait(fs_dict.values(), timeout=None, return_when=futures.ALL_COMPLETED)

    results = {}
    for name, finished in fs_dict.items():
        results[name] = finished.result()
    return results
execute_futures_list(fs_list)
Source Code in griptape/utils/futures.py
def execute_futures_list(fs_list: list[futures.Future[T]]) -> list[T]:
    """Block until every future in the list has finished, then collect the results.

    Args:
        fs_list: The futures to wait for.

    Returns:
        The results in the same order as `fs_list`. Calling `result()` re-raises whatever
        exception a future finished with.
    """
    futures.wait(fs_list, timeout=None, return_when=futures.ALL_COMPLETED)

    results = []
    for finished in fs_list:
        results.append(finished.result())
    return results
execute_futures_list_dict(fs_dict)
Source Code in griptape/utils/futures.py
def execute_futures_list_dict(fs_dict: dict[str, list[futures.Future[T]]]) -> dict[str, list[T]]:
    """Wait for every future in a mapping of future lists and collect the results per key.

    Args:
        fs_dict: Lists of futures keyed by name.

    Returns:
        A dict with the same keys, each mapped to the results of its futures in list order.
    """
    # Flatten first so that all futures are awaited together through the list helper.
    flattened = []
    for group in fs_dict.values():
        flattened.extend(group)
    execute_futures_list(flattened)

    results = {}
    for name, group in fs_dict.items():
        results[name] = [finished.result() for finished in group]
    return results
get_mime_type(file_path_or_bytes)
Attempt to determine the MIME type of a file or bytes.
If the input is a file path, we use the built-in mimetypes
package to guess the MIME type.
If the input is bytes, we use the filetype
library to determine the MIME type. If the library cannot determine the MIME type (data missing magic bytes), we use a few heuristics to guess the type.
Parameters
Name | Type | Description | Default |
---|---|---|---|
file_path_or_bytes | str \| bytes | The path to the file or the bytes to check. | required |
Source Code in griptape/utils/file_utils.py
def get_mime_type(file_path_or_bytes: str | bytes) -> str:
    """Attempt to determine the MIME type of a file or bytes.

    A file path is resolved with the built-in `mimetypes` package.

    Bytes are first handed to the `filetype` library. When it cannot identify the data
    (no recognisable magic bytes), the content is checked for being text and, if so, for
    being JSON or CSV before falling back to plain text.

    Args:
        file_path_or_bytes: The path to the file or the bytes to check.

    Returns:
        The MIME type of the file or bytes. `application/octet-stream` is returned when
        nothing more specific can be determined.
    """
    fallback = "application/octet-stream"

    # Path input: rely on the extension-based guess only.
    if not isinstance(file_path_or_bytes, bytes):
        guessed, _ = mimetypes.guess_type(file_path_or_bytes)
        return fallback if guessed is None else guessed

    detected = filetype.guess(file_path_or_bytes)
    if detected is not None:
        return detected.mime

    # No magic bytes matched: fall back to content heuristics.
    if not _is_text(file_path_or_bytes):
        return fallback
    if _is_json(file_path_or_bytes):
        return "application/json"
    if _is_csv(file_path_or_bytes):
        return "text/csv"
    return "text/plain"
import_optional_dependency(name)
Import an optional dependency.
If a dependency is missing, an ImportError with a nice message will be raised.
Parameters
Name | Type | Description | Default |
---|---|---|---|
name | str | The module name. | required |
Returns
Type | Description |
---|---|
ModuleType | The imported module, when found. |
Source Code in griptape/utils/import_utils.py
def import_optional_dependency(name: str) -> ModuleType:
    """Import an optional dependency.

    A missing dependency is reported as an ImportError whose message names the package to
    install and links to the extras documentation.

    Args:
        name: The module name.

    Returns:
        The imported module, when found.

    Raises:
        ImportError: If the module cannot be imported. The original error is chained as the cause.
    """
    # The installable package may be named differently from the importable module.
    install_name = INSTALL_MAPPING.get(name)
    if install_name is None:
        install_name = name

    msg = (
        f"Missing optional dependency: '{install_name}'. "
        "Please install the appropriate extra: https://docs.griptape.ai/stable/griptape-framework/#extras."
    )

    try:
        return import_module(name)
    except ImportError as exc:
        raise ImportError(msg) from exc
is_dependency_installed(name)
Check if an optional dependency is available.
Parameters
Name | Type | Description | Default |
---|---|---|---|
name | str | The module name. | required |
Returns
Type | Description |
---|---|
bool | True if the dependency is available. |
bool | False if the dependency is not available. |
Source Code in griptape/utils/import_utils.py
def is_dependency_installed(name: str) -> bool:
    """Check if an optional dependency is available.

    Args:
        name: The module name.

    Returns:
        True if the dependency can be imported, False if importing it raises ImportError.
    """
    try:
        import_optional_dependency(name)
    except ImportError:
        installed = False
    else:
        installed = True
    return installed
minify_json(value)
Source Code in griptape/utils/__init__.py
def minify_json(value: str) -> str:
    """Re-serialize a JSON document without whitespace between tokens.

    Args:
        value: A string containing valid JSON.

    Returns:
        The same document dumped with `,` and `:` as the only separators.
    """
    parsed = json.loads(value)
    compact_separators = (",", ":")
    return json.dumps(parsed, separators=compact_separators)
references_from_artifacts(artifacts)
Source Code in griptape/utils/reference_utils.py
def references_from_artifacts(artifacts: list[TextArtifact]) -> list[Reference]:
    """Collect the distinct references carried by a list of artifacts.

    Args:
        artifacts: The artifacts to inspect.

    Returns:
        The references in first-seen order, skipping artifacts whose reference is None
        and references that compare equal to one already collected.
    """
    unique_references = []
    for artifact in artifacts:
        candidate = artifact.reference
        if candidate is None:
            continue
        if candidate in unique_references:
            continue
        unique_references.append(candidate)
    return unique_references
remove_key_in_dict_recursively(d, key)
Source Code in griptape/utils/dict_utils.py
def remove_key_in_dict_recursively(d: dict, key: str) -> dict:
    """Return a copy of a nested dict with every occurrence of a key removed.

    Only dict values are descended into; any other value (including lists) is kept as-is.

    Args:
        d: The dict to clean. A non-dict value is returned unchanged.
        key: The key to drop at every nesting level.

    Returns:
        A new dict without `key`.
    """
    if not isinstance(d, dict):
        return d

    pruned = {}
    for name, child in d.items():
        if name != key:
            pruned[name] = remove_key_in_dict_recursively(child, key)
    return pruned
remove_null_values_in_dict_recursively(d)
Source Code in griptape/utils/dict_utils.py
def remove_null_values_in_dict_recursively(d: dict) -> dict:
    """Return a copy of a nested dict with every None-valued entry removed.

    Only dict values are descended into; any other value (including lists) is kept as-is.

    Args:
        d: The dict to clean. A non-dict value is returned unchanged.

    Returns:
        A new dict without entries whose value is None.
    """
    if not isinstance(d, dict):
        return d

    cleaned = {}
    for name, child in d.items():
        if child is not None:
            cleaned[name] = remove_null_values_in_dict_recursively(child)
    return cleaned
resolve_refs(schema)
Recursively resolve all local $refs in the given JSON Schema using $defs as the source.
Required since pydantic does not support nested schemas without $refs. https://github.com/pydantic/pydantic/issues/889
Parameters
Name | Type | Description | Default |
---|---|---|---|
schema | dict[str, Any] | A JSON Schema as a dictionary, which may contain "$refs" and "$defs". | required |
Source Code in griptape/utils/json_schema_utils.py
def resolve_refs(schema: dict[str, Any]) -> dict[str, Any]:
    """Recursively resolve all local $refs in the given JSON Schema using $defs as the source.

    Required since pydantic does not support nested schemas without $refs. https://github.com/pydantic/pydantic/issues/889

    A node holding a local `$ref` is replaced entirely by the referenced definition, so any
    sibling keys next to the `$ref` are not carried over. The `$defs` entry itself is kept
    in the result (with its own references resolved); callers drop it if it is not wanted.

    NOTE(review): definitions that reference themselves, directly or indirectly, are not
    detected here and would recurse until Python's recursion limit is hit.

    Args:
        schema: A JSON Schema as a dictionary, which may contain "$refs" and "$defs".

    Returns:
        A new dictionary with all local $refs resolved against $defs.

    Raises:
        KeyError: If a local `$ref` names a definition that is missing from `$defs`.
    """
    ref_prefix = "#/$defs/"
    defs = schema.get("$defs", {})

    # Work on a deep copy so we don't mutate the original schema.
    schema_copy = deepcopy(schema)

    def _resolve(node: Any) -> Any:
        """Recursively walk through the node, resolving any local $refs to the corresponding definitions in 'defs'."""
        if isinstance(node, dict):
            # If this node is a reference to something in #/$defs/...
            ref = node.get("$ref")
            if isinstance(ref, str) and ref.startswith(ref_prefix):
                # Strip only the leading prefix. `str.replace` would also remove later
                # occurrences of the same substring and corrupt such definition names.
                def_name = ref[len(ref_prefix) :]
                if def_name in defs:
                    # Replace the entire node with the referenced definition.
                    return _resolve(deepcopy(defs[def_name]))
                raise KeyError(f"Definition '{def_name}' not found in $defs.")

            # If not a ref, or doesn't start with #/$defs/, just walk deeper.
            return {key: _resolve(value) for key, value in node.items()}

        if isinstance(node, list):
            # Recurse into each item of the list
            return [_resolve(item) for item in node]

        # For scalars (str, int, bool, None, etc.), return them as-is.
        return node

    return _resolve(schema_copy)
str_to_hash(text, hash_algorithm='sha256')
Source Code in griptape/utils/hash.py
def str_to_hash(text: str, hash_algorithm: str = "sha256") -> str:
    """Hash a string and return the digest as hexadecimal text.

    Args:
        text: The string to hash; it is encoded with `str.encode()` defaults before hashing.
        hash_algorithm: Any algorithm name accepted by `hashlib.new`.

    Returns:
        The hexadecimal digest.
    """
    encoded = text.encode()
    return hashlib.new(hash_algorithm, encoded).hexdigest()
with_contextvars(wrapped)
Source Code in griptape/utils/contextvars_utils.py
def with_contextvars(wrapped: Callable) -> Callable:
    """Bind a callable to a copy of the context that is current when this function is called.

    Args:
        wrapped: The callable to run inside the captured context.

    Returns:
        A wrapper that forwards its arguments to `wrapped` via `Context.run` on the captured
        copy, so context variables read inside `wrapped` have the values captured here rather
        than those of the context the wrapper is later called from.
    """
    # Snapshot taken once, at wrap time; every call of the wrapper reuses this same copy.
    captured_context = contextvars.copy_context()

    def wrapper(*call_args, **call_kwargs) -> Any:
        return captured_context.run(wrapped, *call_args, **call_kwargs)

    return wrapper
- On this page
- CommandRunner
- Conversation
- GriptapeCloudStructure
- J2
- ManifestValidator
- PythonRunner
- Stream
- StructureVisualizer
- TokenCounter
- add_key_in_dict_recursively(d, key, value, criteria=None)
- build_strict_schema(json_schema, schema_id)
- deprecation_warn(message, stacklevel=2)
- dict_merge(dct, merge_dct, *, add_keys=True)
- execute_futures_dict(fs_dict)
- execute_futures_list(fs_list)
- execute_futures_list_dict(fs_dict)
- get_mime_type(file_path_or_bytes)
- import_optional_dependency(name)
- is_dependency_installed(name)
- minify_json(value)
- references_from_artifacts(artifacts)
- remove_key_in_dict_recursively(d, key)
- remove_null_values_in_dict_recursively(d)
- resolve_refs(schema)
- str_to_hash(text, hash_algorithm='sha256')
- with_contextvars(wrapped)
Could this page be better? Report a problem or suggest an addition!