• __all__ = ['J2', 'Chat', 'CommandRunner', 'Conversation', 'GriptapeCloudStructure', 'ManifestValidator', 'PythonRunner', 'Stream', 'StructureVisualizer', 'TokenCounter', 'add_key_in_dict_recursively', 'build_strict_schema', 'deprecation_warn', 'dict_merge', 'execute_futures_dict', 'execute_futures_list', 'execute_futures_list_dict', 'get_mime_type', 'import_optional_dependency', 'is_dependency_installed', 'load_artifact_from_memory', 'minify_json', 'references_from_artifacts', 'remove_key_in_dict_recursively', 'remove_null_values_in_dict_recursively', 'resolve_refs', 'str_to_hash', 'with_contextvars'] module-attribute

Utility for running a chat with a Structure.

Attributes

Name | Type | Description
structure | Structure | The Structure to run.
exit_keywords | list[str] | Keywords that will exit the chat.
exiting_text | str | Text to display when exiting the chat.
processing_text | str | Text to display while processing the user's input.
intro_text | Optional[str] | Text to display when the chat starts.
prompt_prefix | str | Prefix for the user's input.
response_prefix | str | Prefix for the assistant's response.
handle_input | Callable[[str], str] | Function to get the user's input.
handle_output | Callable[..., None] | Function to output text. Takes a text argument for the text to output. Also takes a stream argument which will be set to True when streaming Prompt Tasks are present.
Source Code in griptape/utils/chat.py
@define
class Chat:
    """Utility for running a chat with a Structure.

    Attributes:
        structure: The Structure to run.
        exit_keywords: Keywords that will exit the chat.
        exiting_text: Text to display when exiting the chat.
        processing_text: Text to display while processing the user's input.
        intro_text: Text to display when the chat starts.
        prompt_prefix: Prefix for the user's input.
        response_prefix: Prefix for the assistant's response.
        handle_input: Function to get the user's input.
        handle_output: Function to output text. Takes a `text` argument for the text to output.
                   Also takes a `stream` argument which will be set to True when streaming Prompt Tasks are present.
        logger_level: Level applied to the logger named by `Defaults.logging_config.logger_name`
                   for the duration of `start`.
    """

    class ChatPrompt(Prompt):
        prompt_suffix = ""  # We don't want rich's default prompt suffix

    structure: Structure = field()
    # A Factory gives every instance its own list instead of one shared mutable default.
    exit_keywords: list[str] = field(default=Factory(lambda: ["exit"]), kw_only=True)
    exiting_text: str = field(default="Exiting...", kw_only=True)
    processing_text: str = field(default="Thinking...", kw_only=True)
    intro_text: Optional[str] = field(default=None, kw_only=True)
    prompt_prefix: str = field(default="User: ", kw_only=True)
    response_prefix: str = field(default="Assistant: ", kw_only=True)
    handle_input: Callable[[str], str] = field(
        default=Factory(lambda self: self.default_handle_input, takes_self=True), kw_only=True
    )
    handle_output: Callable[..., None] = field(
        default=Factory(lambda self: self.default_handle_output, takes_self=True),
        kw_only=True,
    )
    logger_level: int = field(default=logging.ERROR, kw_only=True)

    def default_handle_input(self, prompt_prefix: str) -> str:
        """Ask for input through `ChatPrompt`, showing `prompt_prefix` as the prompt."""
        return Chat.ChatPrompt.ask(prompt_prefix)

    def default_handle_output(self, text: str, *, stream: bool = False) -> None:
        """Print `text`; when `stream` is set, print without a trailing newline and flush."""
        if stream:
            rprint(text, end="", flush=True)
        else:
            rprint(text)

    def start(self) -> None:
        """Run the interactive loop until the user enters one of `exit_keywords`.

        The configured logger is switched to `logger_level` while the loop runs and is
        always switched back afterwards, including when the loop exits with an exception.
        """
        from griptape.configs import Defaults

        # Hide Gen AI Builder's logging output except for errors
        logger = logging.getLogger(Defaults.logging_config.logger_name)
        # Remember the logger's own level (not the effective one) so that a logger which
        # inherits its level keeps inheriting after the restore.
        old_logger_level = logger.level
        logger.setLevel(self.logger_level)

        try:
            if self.intro_text:
                self._call_handle_output(self.intro_text)

            has_streaming_tasks = self._has_streaming_tasks()
            while True:
                question = self.handle_input(self.prompt_prefix)

                if question.lower() in self.exit_keywords:
                    self._call_handle_output(self.exiting_text)
                    break

                self._call_handle_output(self.processing_text)
                if has_streaming_tasks:
                    stream = Stream(self.structure).run(question)
                    # A stream that yields nothing must not abort the chat with StopIteration.
                    first_chunk = next(stream, None)
                    if first_chunk is not None:
                        self._call_handle_output(self.response_prefix + first_chunk.value, stream=True)
                    for chunk in stream:
                        self._call_handle_output(chunk.value, stream=True)
                else:
                    self._call_handle_output(f"{self.response_prefix}{self.structure.run(question).output.to_text()}")
        finally:
            # Restore the original logger level
            logger.setLevel(old_logger_level)

    def _has_streaming_tasks(self) -> bool:
        """Return True when any task of the structure is a PromptTask whose prompt driver streams."""
        from griptape.tasks.prompt_task import PromptTask

        return any(isinstance(task, PromptTask) and task.prompt_driver.stream for task in self.structure.tasks)

    def _call_handle_output(self, text: str, *, stream: bool = False) -> None:
        """Call `handle_output`, passing `stream` only if the callable can accept it."""
        func_params = inspect.signature(self.handle_output).parameters
        # if there is a **kwargs parameter, we can safely
        # pass all the params to the function
        has_kwargs = any(param.kind == inspect.Parameter.VAR_KEYWORD for param in func_params.values())

        if "stream" in func_params or has_kwargs:
            self.handle_output(text, stream=stream)
        else:
            self.handle_output(text)
  • exit_keywords = field(default=['exit'], kw_only=True) class-attribute instance-attribute

  • exiting_text = field(default='Exiting...', kw_only=True) class-attribute instance-attribute

  • handle_input = field(default=Factory(lambda self: self.default_handle_input, takes_self=True), kw_only=True) class-attribute instance-attribute

  • handle_output = field(default=Factory(lambda self: self.default_handle_output, takes_self=True), kw_only=True) class-attribute instance-attribute

  • intro_text = field(default=None, kw_only=True) class-attribute instance-attribute

  • logger_level = field(default=logging.ERROR, kw_only=True) class-attribute instance-attribute

  • processing_text = field(default='Thinking...', kw_only=True) class-attribute instance-attribute

  • prompt_prefix = field(default='User: ', kw_only=True) class-attribute instance-attribute

  • response_prefix = field(default='Assistant: ', kw_only=True) class-attribute instance-attribute

  • structure = field() class-attribute instance-attribute

ChatPrompt

Bases:

Prompt
Source Code in griptape/utils/chat.py
class ChatPrompt(Prompt):
    """Prompt used for chat input, configured with an empty prompt suffix."""

    # Replace rich's default prompt suffix with nothing.
    prompt_suffix = ""
  • prompt_suffix = '' class-attribute instance-attribute

_call_handle_output(text, *, stream=False)

Source Code in griptape/utils/chat.py
def _call_handle_output(self, text: str, *, stream: bool = False) -> None:
    """Call `handle_output` with `text`, adding `stream` only when the callable can take it.

    `stream` is forwarded when the callable declares a `stream` parameter or a
    `**kwargs` catch-all; otherwise only `text` is passed.
    """
    accepted = inspect.signature(self.handle_output).parameters
    takes_any_keyword = any(spec.kind is inspect.Parameter.VAR_KEYWORD for spec in accepted.values())

    if not (takes_any_keyword or "stream" in accepted):
        self.handle_output(text)
        return

    self.handle_output(text, stream=stream)

_has_streaming_tasks()

Source Code in griptape/utils/chat.py
def _has_streaming_tasks(self) -> bool:
    """Report whether the structure contains a PromptTask whose prompt driver has `stream` set."""
    from griptape.tasks.prompt_task import PromptTask

    for candidate in self.structure.tasks:
        if isinstance(candidate, PromptTask) and candidate.prompt_driver.stream:
            return True
    return False

default_handle_input(prompt_prefix)

Source Code in griptape/utils/chat.py
def default_handle_input(self, prompt_prefix: str) -> str:
    """Ask for a line of input through `Chat.ChatPrompt`, using `prompt_prefix` as the prompt."""
    answer = Chat.ChatPrompt.ask(prompt_prefix)
    return answer

default_handle_output(text, *, stream=False)

Source Code in griptape/utils/chat.py
def default_handle_output(self, text: str, *, stream: bool = False) -> None:
    """Print `text` with `rprint`.

    Args:
        text: The text to print.
        stream: When True, print without a trailing newline and flush immediately.
    """
    if not stream:
        rprint(text)
        return

    rprint(text, end="", flush=True)

start()

Source Code in griptape/utils/chat.py
def start(self) -> None:
    """Run the interactive loop until the user enters one of `exit_keywords`.

    The configured logger is switched to `logger_level` while the loop runs and is
    always switched back afterwards, including when the loop exits with an exception.
    """
    from griptape.configs import Defaults

    # Hide Gen AI Builder's logging output except for errors
    logger = logging.getLogger(Defaults.logging_config.logger_name)
    # Remember the logger's own level (not the effective one) so that a logger which
    # inherits its level keeps inheriting after the restore.
    old_logger_level = logger.level
    logger.setLevel(self.logger_level)

    try:
        if self.intro_text:
            self._call_handle_output(self.intro_text)

        has_streaming_tasks = self._has_streaming_tasks()
        while True:
            question = self.handle_input(self.prompt_prefix)

            if question.lower() in self.exit_keywords:
                self._call_handle_output(self.exiting_text)
                break

            self._call_handle_output(self.processing_text)
            if has_streaming_tasks:
                stream = Stream(self.structure).run(question)
                # A stream that yields nothing must not abort the chat with StopIteration.
                first_chunk = next(stream, None)
                if first_chunk is not None:
                    self._call_handle_output(self.response_prefix + first_chunk.value, stream=True)
                for chunk in stream:
                    self._call_handle_output(chunk.value, stream=True)
            else:
                self._call_handle_output(f"{self.response_prefix}{self.structure.run(question).output.to_text()}")
    finally:
        # Restore the original logger level
        logger.setLevel(old_logger_level)

CommandRunner

Source Code in griptape/utils/command_runner.py
@define
class CommandRunner:
    """Runs a shell command and wraps what it printed in an artifact."""

    def run(self, command: str) -> BaseArtifact:
        """Execute `command` through the shell and capture its output.

        Args:
            command: The command line to execute.

        Returns:
            A TextArtifact holding the stripped, decoded stdout when nothing was written
            to stderr; otherwise an ErrorArtifact holding the stripped, decoded stderr.
        """
        # NOTE: shell=True hands `command` to the shell verbatim; do not pass untrusted input.
        process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        stdout, stderr = process.communicate()

        if not stderr:
            return TextArtifact(stdout.strip().decode())
        # stderr is bytes: decode it so the message shows the text rather than a b'...' repr.
        # Undecodable bytes are replaced so that reporting an error cannot itself raise.
        return ErrorArtifact(f"error: {stderr.strip().decode(errors='replace')}")

run(command)

Source Code in griptape/utils/command_runner.py
def run(self, command: str) -> BaseArtifact:
    """Execute `command` through the shell and capture its output.

    Args:
        command: The command line to execute.

    Returns:
        A TextArtifact holding the stripped, decoded stdout when nothing was written
        to stderr; otherwise an ErrorArtifact holding the stripped, decoded stderr.
    """
    # NOTE: shell=True hands `command` to the shell verbatim; do not pass untrusted input.
    process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    stdout, stderr = process.communicate()

    if not stderr:
        return TextArtifact(stdout.strip().decode())
    # stderr is bytes: decode it so the message shows the text rather than a b'...' repr.
    # Undecodable bytes are replaced so that reporting an error cannot itself raise.
    return ErrorArtifact(f"error: {stderr.strip().decode(errors='replace')}")

Conversation

Source Code in griptape/utils/conversation.py
@define(frozen=True)
class Conversation:
    """Read-only, line-oriented text view over a conversation memory.

    Attributes:
        memory: The conversation memory to render. Validation rejects None.
    """

    memory: Optional[BaseConversationMemory] = field()

    @memory.validator  # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess]
    def validate_memory(self, attribute: Attribute, value: Optional[BaseConversationMemory]) -> None:
        """Raise ValueError when no memory is supplied."""
        if value is not None:
            return
        raise ValueError("Conversation memory must not be None.")

    def lines(self) -> list[str]:
        """Return a "Q: ..." / "A: ..." pair of lines per run, plus a summary line for summary memories."""
        from griptape.memory.structure import SummaryConversationMemory

        memory = self.memory
        rendered = []

        if memory is not None:
            for run in memory.runs:
                rendered.extend((f"Q: {run.input}", f"A: {run.output}"))

        if isinstance(memory, SummaryConversationMemory):
            rendered.append(f"Summary: {memory.summary}")

        return rendered

    def prompt_stack(self) -> list[str]:
        """Return one "role: text" line per prompt stack message, plus a summary line for summary memories."""
        from griptape.memory.structure import SummaryConversationMemory

        memory = self.memory
        rendered = []

        if memory is not None:
            rendered.extend(f"{message.role}: {message.to_text()}" for message in memory.to_prompt_stack().messages)

        if isinstance(memory, SummaryConversationMemory):
            rendered.append(f"Summary: {memory.summary}")

        return rendered

    def __str__(self) -> str:
        """Join the output of `lines()` with newlines."""
        return "\n".join(self.lines())
  • memory = field() class-attribute instance-attribute

__str__()

Source Code in griptape/utils/conversation.py
def __str__(self) -> str:
    """Return the conversation lines joined by newline characters."""
    rendered_lines = self.lines()
    return "\n".join(rendered_lines)

lines()

Source Code in griptape/utils/conversation.py
def lines(self) -> list[str]:
    """Return a "Q: ..." / "A: ..." pair of lines per run, plus a summary line for summary memories."""
    from griptape.memory.structure import SummaryConversationMemory

    memory = self.memory
    rendered = []

    if memory is not None:
        for run in memory.runs:
            rendered.extend((f"Q: {run.input}", f"A: {run.output}"))

    if isinstance(memory, SummaryConversationMemory):
        rendered.append(f"Summary: {memory.summary}")

    return rendered

prompt_stack()

Source Code in griptape/utils/conversation.py
def prompt_stack(self) -> list[str]:
    """Return one "role: text" line per prompt stack message, plus a summary line for summary memories."""
    from griptape.memory.structure import SummaryConversationMemory

    memory = self.memory
    rendered = []

    if memory is not None:
        rendered.extend(f"{message.role}: {message.to_text()}" for message in memory.to_prompt_stack().messages)

    if isinstance(memory, SummaryConversationMemory):
        rendered.append(f"Summary: {memory.summary}")

    return rendered

validate_memory(attribute, value)

Source Code in griptape/utils/conversation.py
@memory.validator  # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess]
def validate_memory(self, attribute: Attribute, value: Optional[BaseConversationMemory]) -> None:
    """Reject a missing conversation memory.

    Raises:
        ValueError: If `value` is None.
    """
    if value is not None:
        return
    raise ValueError("Conversation memory must not be None.")

GriptapeCloudStructure

Utility for working with Gen AI Builder Structures.

Attributes

Name | Type | Description
_event_listener | Optional[EventListener] | Event Listener to use. Defaults to an EventListener with a GriptapeCloudEventListenerDriver.
_observability | Optional[Observability] | Observability to use. Defaults to an Observability with a GriptapeCloudObservabilityDriver.
observe | bool | Whether to enable observability. Enabling requires the drivers-observability-griptape-cloud extra.
Source Code in griptape/utils/griptape_cloud.py
@define()
class GriptapeCloudStructure:
    """Utility for working with Gen AI Builder Structures.

    Intended to be used as a context manager. The enter/exit hooks only act when the
    `GT_CLOUD_STRUCTURE_RUN_ID` environment variable is present.

    Attributes:
        _event_listener: Event Listener to use. Defaults to an EventListener with a GriptapeCloudEventListenerDriver.
        _observability: Observability to use. Defaults to an Observability with a GriptapeCloudObservabilityDriver.
        observe: Whether to enable observability. Enabling requires the `drivers-observability-griptape-cloud` extra.
    """

    _event_listener: Optional[EventListener] = field(default=None, kw_only=True, alias="event_listener")
    _observability: Optional[Observability] = field(default=None, kw_only=True, alias="observability")
    observe: bool = field(default=False, kw_only=True)
    # Set through the `output` property; not accepted by the constructor (init=False).
    _output: Optional[BaseArtifact] = field(default=None, init=False)

    # NOTE(review): `lazy_property` is defined outside this view; presumably it returns the
    # matching private field when one was supplied and otherwise stores the value built
    # below — confirm against its implementation.
    @lazy_property()
    def event_listener(self) -> EventListener:
        """Build the default EventListener, backed by a GriptapeCloudEventListenerDriver."""
        from griptape.drivers.event_listener.griptape_cloud import GriptapeCloudEventListenerDriver

        return EventListener(event_listener_driver=GriptapeCloudEventListenerDriver())

    @lazy_property()
    def observability(self) -> Observability:
        """Build the default Observability, backed by a GriptapeCloudObservabilityDriver."""
        from griptape.drivers.observability.griptape_cloud import GriptapeCloudObservabilityDriver
        from griptape.observability.observability import Observability

        return Observability(observability_driver=GriptapeCloudObservabilityDriver())

    @property
    def output(self) -> Optional[BaseArtifact]:
        """The artifact stored by the `output` setter, or None if nothing was assigned."""
        return self._output

    @output.setter
    def output(self, value: BaseArtifact | Any) -> None:
        """Store `value` as an artifact.

        A BaseArtifact is stored unchanged, a list becomes a ListArtifact of its converted
        items, and anything else is converted with `_to_artifact`.
        """
        if isinstance(value, BaseArtifact):
            self._output = value
        elif isinstance(value, list):
            self._output = ListArtifact([self._to_artifact(item) for item in value])
        else:
            self._output = self._to_artifact(value)

    @property
    def structure_run_id(self) -> str:
        """Value of the `GT_CLOUD_STRUCTURE_RUN_ID` environment variable.

        Raises:
            KeyError: If the variable is not set.
        """
        return os.environ["GT_CLOUD_STRUCTURE_RUN_ID"]

    @property
    def in_managed_environment(self) -> bool:
        """True when the `GT_CLOUD_STRUCTURE_RUN_ID` environment variable is set."""
        return "GT_CLOUD_STRUCTURE_RUN_ID" in os.environ

    def __enter__(self) -> Self:
        """Register the event listener and, if `observe` is set, start observability.

        Does nothing outside a managed environment. Always returns `self`.
        """
        from griptape.observability.observability import Observability

        if self.in_managed_environment:
            EventBus.add_event_listener(self.event_listener)

            if self.observe:
                Observability.set_global_driver(self.observability.observability_driver)
                self.observability.observability_driver.__enter__()

        return self

    def __exit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc_value: Optional[BaseException],
        exc_traceback: Optional[TracebackType],
    ) -> None:
        """Publish the final output, unregister the listener and stop observability.

        Does nothing outside a managed environment. Returns None, so an exception raised
        inside the `with` body is not suppressed.
        """
        from griptape.observability.observability import Observability

        if self.in_managed_environment:
            # Only report a finished run when an output was actually assigned.
            if self.output is not None:
                EventBus.publish_event(FinishStructureRunEvent(output_task_output=self.output), flush=True)
            EventBus.remove_event_listener(self.event_listener)

            if self.observe:
                Observability.set_global_driver(None)
                self.observability.observability_driver.__exit__(exc_type, exc_value, exc_traceback)

    def _to_artifact(self, value: Any) -> BaseArtifact:
        """Wrap a plain value: str, bool, dict and bytes get dedicated artifacts, anything else a GenericArtifact."""
        if isinstance(value, str):
            return TextArtifact(value)
        if isinstance(value, bool):
            return BooleanArtifact(value)
        if isinstance(value, dict):
            return JsonArtifact(value)
        if isinstance(value, bytes):
            return BlobArtifact(value)
        return GenericArtifact(value)
  • _event_listener = field(default=None, kw_only=True, alias='event_listener') class-attribute instance-attribute

  • _observability = field(default=None, kw_only=True, alias='observability') class-attribute instance-attribute

  • _output = field(default=None, init=False) class-attribute instance-attribute

  • in_managed_environment property

  • observe = field(default=False, kw_only=True) class-attribute instance-attribute

  • output property writable

  • structure_run_id property

__enter__()

Source Code in griptape/utils/griptape_cloud.py
def __enter__(self) -> Self:
    """Register the event listener and, if `observe` is set, start observability.

    Does nothing outside a managed environment. Always returns `self`.
    """
    from griptape.observability.observability import Observability

    if not self.in_managed_environment:
        return self

    EventBus.add_event_listener(self.event_listener)

    if not self.observe:
        return self

    Observability.set_global_driver(self.observability.observability_driver)
    self.observability.observability_driver.__enter__()
    return self

__exit__(exc_type, exc_value, exc_traceback)

Source Code in griptape/utils/griptape_cloud.py
def __exit__(
    self,
    exc_type: Optional[type[BaseException]],
    exc_value: Optional[BaseException],
    exc_traceback: Optional[TracebackType],
) -> None:
    """Publish the final output, unregister the listener and stop observability.

    Does nothing outside a managed environment. Returns None, so an exception raised
    inside the `with` body is not suppressed.
    """
    from griptape.observability.observability import Observability

    if not self.in_managed_environment:
        return

    final_output = self.output
    if final_output is not None:
        finish_event = FinishStructureRunEvent(output_task_output=final_output)
        EventBus.publish_event(finish_event, flush=True)
    EventBus.remove_event_listener(self.event_listener)

    if not self.observe:
        return

    Observability.set_global_driver(None)
    self.observability.observability_driver.__exit__(exc_type, exc_value, exc_traceback)

_to_artifact(value)

Source Code in griptape/utils/griptape_cloud.py
def _to_artifact(self, value: Any) -> BaseArtifact:
    """Wrap a plain Python value in the artifact type that matches it.

    The checks run in the order str, bool, dict, bytes; a value matching none of them
    is wrapped in a GenericArtifact.
    """
    conversions = (
        (str, TextArtifact),
        (bool, BooleanArtifact),
        (dict, JsonArtifact),
        (bytes, BlobArtifact),
    )
    for python_type, artifact_type in conversions:
        if isinstance(value, python_type):
            return artifact_type(value)
    return GenericArtifact(value)

event_listener()

Source Code in griptape/utils/griptape_cloud.py
@lazy_property()
def event_listener(self) -> EventListener:
    """Build the default EventListener, backed by a GriptapeCloudEventListenerDriver."""
    from griptape.drivers.event_listener.griptape_cloud import GriptapeCloudEventListenerDriver

    cloud_driver = GriptapeCloudEventListenerDriver()
    return EventListener(event_listener_driver=cloud_driver)

observability()

Source Code in griptape/utils/griptape_cloud.py
@lazy_property()
def observability(self) -> Observability:
    """Build the default Observability, backed by a GriptapeCloudObservabilityDriver."""
    from griptape.drivers.observability.griptape_cloud import GriptapeCloudObservabilityDriver
    from griptape.observability.observability import Observability

    cloud_driver = GriptapeCloudObservabilityDriver()
    return Observability(observability_driver=cloud_driver)

J2

Source Code in griptape/utils/j2.py
@define(frozen=True)
class J2:
    """Renders Jinja2 templates, either by name or from an inline string.

    Attributes:
        template_name: Name of the template loaded by `render`. May be left unset when
            only `render_from_string` is used.
        templates_dir: Directory given to the default environment's FileSystemLoader.
        environment: The Jinja2 Environment used for all rendering.
    """

    template_name: Optional[str] = field(default=None)
    templates_dir: str = field(default=abs_path("templates"), kw_only=True)
    environment: Environment = field(
        default=Factory(
            lambda instance: Environment(
                loader=FileSystemLoader(instance.templates_dir),
                trim_blocks=True,
                lstrip_blocks=True,
            ),
            takes_self=True,
        ),
        kw_only=True,
    )

    def render(self, **kwargs) -> str:
        """Render the template called `template_name` with `kwargs` as context.

        Returns:
            The rendered text with trailing whitespace stripped.

        Raises:
            ValueError: If `template_name` is None.
        """
        if self.template_name is None:
            raise ValueError("template_name is required.")

        template = self.environment.get_template(self.template_name)
        rendered = template.render(kwargs)
        return rendered.rstrip()

    def render_from_string(self, value: str, **kwargs) -> str:
        """Render the template source `value` with `kwargs` as context, without stripping the result."""
        template = self.environment.from_string(value)
        return template.render(kwargs)
  • environment = field(default=Factory(lambda self: Environment(loader=FileSystemLoader(self.templates_dir), trim_blocks=True, lstrip_blocks=True), takes_self=True), kw_only=True) class-attribute instance-attribute

  • template_name = field(default=None) class-attribute instance-attribute

  • templates_dir = field(default=abs_path('templates'), kw_only=True) class-attribute instance-attribute

render(**kwargs)

Source Code in griptape/utils/j2.py
def render(self, **kwargs) -> str:
    """Render the template called `template_name` with `kwargs` as context.

    Returns:
        The rendered text with trailing whitespace stripped.

    Raises:
        ValueError: If `template_name` is None.
    """
    if self.template_name is None:
        raise ValueError("template_name is required.")

    template = self.environment.get_template(self.template_name)
    rendered = template.render(kwargs)
    return rendered.rstrip()

render_from_string(value, **kwargs)

Source Code in griptape/utils/j2.py
def render_from_string(self, value: str, **kwargs) -> str:
    """Render the template source `value` with `kwargs` as context, without stripping the result."""
    template = self.environment.from_string(value)
    return template.render(kwargs)

ManifestValidator

Source Code in griptape/utils/manifest_validator.py
class ManifestValidator:
    """Validates manifest dictionaries against a fixed schema."""

    def validate(self, manifest: dict) -> dict:
        """Validate `manifest` against `schema()` and return what the schema's `validate` returns."""
        manifest_schema = self.schema()
        return manifest_schema.validate(manifest)

    def schema(self) -> Schema:
        """Build the manifest schema: `version` must equal "v1", the other keys must be strings."""
        expected_shape = {
            "version": "v1",
            "name": str,
            "description": str,
            "contact_email": str,
            "legal_info_url": str,
        }
        return Schema(expected_shape)

schema()

Source Code in griptape/utils/manifest_validator.py
def schema(self) -> Schema:
    """Build the manifest schema: `version` must equal "v1", the other keys must be strings."""
    expected_shape = {
        "version": "v1",
        "name": str,
        "description": str,
        "contact_email": str,
        "legal_info_url": str,
    }
    return Schema(expected_shape)

validate(manifest)

Source Code in griptape/utils/manifest_validator.py
def validate(self, manifest: dict) -> dict:
    """Validate `manifest` against `schema()` and return what the schema's `validate` returns."""
    manifest_schema = self.schema()
    return manifest_schema.validate(manifest)

PythonRunner

Source Code in griptape/utils/python_runner.py
@define
class PythonRunner:
    """Evaluates a Python expression and returns what it prints.

    Attributes:
        libs: Maps an importable module name to the alias under which that module is
            made available to the evaluated code.
    """

    libs: dict[str, str] = field(factory=dict, kw_only=True)

    def run(self, code: str) -> str:
        """Execute `print(<code>)` and return the captured output.

        Args:
            code: A Python expression.

        Returns:
            The stripped captured stdout, or the exception message if importing a
            library or executing the code raised.
        """
        global_stdout = sys.stdout
        sys.stdout = local_stdout = StringIO()

        try:
            # Keep the imported modules in a private namespace. Writing them into this
            # module's globals would let an alias such as "importlib" or "sys" overwrite
            # the names this method itself depends on.
            namespace = {alias: importlib.import_module(lib) for lib, alias in self.libs.items()}

            # NOTE: this executes arbitrary code; only pass input from a trusted source.
            exec(f"print({code})", {}, namespace)

            output = local_stdout.getvalue()
        except Exception as e:
            output = str(e)
        finally:
            sys.stdout = global_stdout

        return output.strip()
  • libs = field(factory=dict, kw_only=True) class-attribute instance-attribute

run(code)

Source Code in griptape/utils/python_runner.py
def run(self, code: str) -> str:
    """Execute `print(<code>)` and return the captured output.

    Args:
        code: A Python expression.

    Returns:
        The stripped captured stdout, or the exception message if importing a
        library or executing the code raised.
    """
    global_stdout = sys.stdout
    sys.stdout = local_stdout = StringIO()

    try:
        # Keep the imported modules in a private namespace. Writing them into this
        # module's globals would let an alias such as "importlib" or "sys" overwrite
        # the names this method itself depends on.
        namespace = {alias: importlib.import_module(lib) for lib, alias in self.libs.items()}

        # NOTE: this executes arbitrary code; only pass input from a trusted source.
        exec(f"print({code})", {}, namespace)

        output = local_stdout.getvalue()
    except Exception as e:
        output = str(e)
    finally:
        sys.stdout = global_stdout

    return output.strip()

Stream

A wrapper for Structures that filters Events relevant to text output and converts them to TextArtifacts.

Attributes

Name | Type | Description
structure | Structure | The Structure to wrap.
Source Code in griptape/utils/stream.py
@define
class Stream:
    """A wrapper for Structures that filters Events relevant to text output and converts them to TextArtifacts.

    Attributes:
        structure: The Structure to wrap.
        event_types: Event types passed to `structure.run_stream`.
    """

    structure: Structure = field()
    event_types: list[type[BaseEvent]] = field(
        default=Factory(lambda: [TextChunkEvent, ActionChunkEvent, FinishPromptEvent, FinishStructureRunEvent])
    )

    def run(self, *args) -> Iterator[TextArtifact]:
        """Run the structure and yield a TextArtifact for each piece of text output.

        A FinishPromptEvent yields a newline and a TextChunkEvent yields its token. An
        ActionChunkEvent yields a "name.tag (path)" header when all three are set, and its
        partial input is buffered until the buffer parses as JSON, which is then yielded
        pretty-printed.
        """
        pending_json = ""

        for event in self.structure.run_stream(*args, event_types=self.event_types):
            if isinstance(event, FinishPromptEvent):
                yield TextArtifact(value="\n")
                continue

            if isinstance(event, TextChunkEvent):
                yield TextArtifact(value=event.token)
                continue

            if not isinstance(event, ActionChunkEvent):
                continue

            if event.tag is not None and event.name is not None and event.path is not None:
                yield TextArtifact(f"{event.name}.{event.tag} ({event.path})")

            if event.partial_input is None:
                continue

            pending_json += event.partial_input
            try:
                # Succeeds only once the buffered fragments form a complete JSON document.
                yield TextArtifact(json.dumps(json.loads(pending_json), indent=2))
                pending_json = ""
            except Exception:
                pass
  • event_types = field(default=Factory(lambda: [TextChunkEvent, ActionChunkEvent, FinishPromptEvent, FinishStructureRunEvent])) class-attribute instance-attribute

  • structure = field() class-attribute instance-attribute

run(*args)

Source Code in griptape/utils/stream.py
def run(self, *args) -> Iterator[TextArtifact]:
    """Run the structure and yield a TextArtifact for each piece of text output.

    A FinishPromptEvent yields a newline and a TextChunkEvent yields its token. An
    ActionChunkEvent yields a "name.tag (path)" header when all three are set, and its
    partial input is buffered until the buffer parses as JSON, which is then yielded
    pretty-printed.
    """
    pending_json = ""

    for event in self.structure.run_stream(*args, event_types=self.event_types):
        if isinstance(event, FinishPromptEvent):
            yield TextArtifact(value="\n")
            continue

        if isinstance(event, TextChunkEvent):
            yield TextArtifact(value=event.token)
            continue

        if not isinstance(event, ActionChunkEvent):
            continue

        if event.tag is not None and event.name is not None and event.path is not None:
            yield TextArtifact(f"{event.name}.{event.tag} ({event.path})")

        if event.partial_input is None:
            continue

        pending_json += event.partial_input
        try:
            # Succeeds only once the buffered fragments form a complete JSON document.
            yield TextArtifact(json.dumps(json.loads(pending_json), indent=2))
            pending_json = ""
        except Exception:
            pass

StructureVisualizer

Source Code in griptape/utils/structure_visualizer.py
@define
class StructureVisualizer:
    """Utility class that renders a Structure as a Mermaid flowchart URL.

    Attributes:
        structure: The Structure to visualize.
        header: Text placed at the start of the Mermaid graph definition.
        build_node_id: Maps a task to the node id used in the graph. Defaults to the
            title-cased task id.
        query_params: Query parameters added to the generated URL.
        base_url: Base URL of the rendering service.
    """

    structure: Structure = field()
    header: str = field(default="graph TD;", kw_only=True)
    build_node_id: Callable[[BaseTask], str] = field(default=lambda task: task.id.title(), kw_only=True)
    query_params: dict[str, str] = field(factory=dict, kw_only=True)
    base_url: str = field(default="https://mermaid.ink", kw_only=True)

    def to_url(self) -> str:
        """Generates a url that renders the Workflow structure as a Mermaid flowchart.

        Reference: https://mermaid.js.org/ecosystem/tutorials#jupyter-integration-with-mermaid-js.

        Returns:
            str: URL to the rendered image

        Raises:
            ValueError: If the prepared request ends up without a URL.
        """
        self.structure.resolve_relationships()

        rendered_tasks = self.__render_tasks(self.structure.tasks)
        graph = f"{self.header}{rendered_tasks}"

        # The graph definition travels in the URL path as base64 text.
        encoded_graph = base64.b64encode(graph.encode("utf-8")).decode("utf-8")
        url = urllib.parse.urljoin(self.base_url, f"svg/{encoded_graph}")

        request = PreparedRequest()
        request.prepare_url(url, self.query_params)

        if request.url is None:
            raise ValueError("Failed to generate the URL")

        return request.url

    def __render_tasks(self, tasks: list[BaseTask]) -> str:
        """Render each task and put every rendering on its own tab-indented line."""
        separator = "\n\t"
        return separator + separator.join(self.__render_task(task) for task in tasks)

    def __render_task(self, task: BaseTask) -> str:
        """Render one task as Mermaid statements.

        Produces the task's edge to its children (a dotted edge from a braced node for a
        BranchTask), or the bare node when it has no children. A StructureRunTask driven
        by a LocalStructureRunDriver additionally gets a subgraph of the nested structure.
        """
        from griptape.drivers.structure_run.local import LocalStructureRunDriver
        from griptape.tasks import BranchTask, StructureRunTask

        if not task.children:
            statement = f"{self.build_node_id(task)};"
        else:
            targets = " & ".join(f"{self.build_node_id(child)}" for child in task.children)
            if isinstance(task, BranchTask):
                statement = f"{self.build_node_id(task)}{{ {self.build_node_id(task)} }}-.-> {targets};"
            else:
                statement = f"{self.build_node_id(task)}--> {targets};"
        statements = [statement]

        if isinstance(task, StructureRunTask) and isinstance(task.structure_run_driver, LocalStructureRunDriver):
            nested_structure = task.structure_run_driver.create_structure()
            nested_tasks = self.__render_tasks(nested_structure.tasks)
            statements.append(f"subgraph {self.build_node_id(task)}{nested_tasks}\n\tend")

        return "\n\t".join(statements)
  • base_url = field(default='https://mermaid.ink', kw_only=True) class-attribute instance-attribute

  • build_node_id = field(default=lambda task: task.id.title(), kw_only=True) class-attribute instance-attribute

  • header = field(default='graph TD;', kw_only=True) class-attribute instance-attribute

  • query_params = field(factory=dict, kw_only=True) class-attribute instance-attribute

  • structure = field() class-attribute instance-attribute

__render_task(task)

Source Code in griptape/utils/structure_visualizer.py
def __render_task(self, task: BaseTask) -> str:
    """Render one task as Mermaid statements.

    Produces the task's edge to its children (a dotted edge from a braced node for a
    BranchTask), or the bare node when it has no children. A StructureRunTask driven
    by a LocalStructureRunDriver additionally gets a subgraph of the nested structure.
    """
    from griptape.drivers.structure_run.local import LocalStructureRunDriver
    from griptape.tasks import BranchTask, StructureRunTask

    if not task.children:
        statement = f"{self.build_node_id(task)};"
    else:
        targets = " & ".join(f"{self.build_node_id(child)}" for child in task.children)
        if isinstance(task, BranchTask):
            statement = f"{self.build_node_id(task)}{{ {self.build_node_id(task)} }}-.-> {targets};"
        else:
            statement = f"{self.build_node_id(task)}--> {targets};"
    statements = [statement]

    if isinstance(task, StructureRunTask) and isinstance(task.structure_run_driver, LocalStructureRunDriver):
        nested_structure = task.structure_run_driver.create_structure()
        nested_tasks = self.__render_tasks(nested_structure.tasks)
        statements.append(f"subgraph {self.build_node_id(task)}{nested_tasks}\n\tend")

    return "\n\t".join(statements)

__render_tasks(tasks)

Source Code in griptape/utils/structure_visualizer.py
def __render_tasks(self, tasks: list[BaseTask]) -> str:
    """Render each task and put every rendering on its own tab-indented line."""
    separator = "\n\t"
    return separator + separator.join(self.__render_task(task) for task in tasks)

to_url()

Generates a url that renders the Workflow structure as a Mermaid flowchart.

Reference: https://mermaid.js.org/ecosystem/tutorials#jupyter-integration-with-mermaid-js.

Returns

Name | Type | Description
str | str | URL to the rendered image
Source Code in griptape/utils/structure_visualizer.py
def to_url(self) -> str:
    """Generates a url that renders the Workflow structure as a Mermaid flowchart.

    Reference: https://mermaid.js.org/ecosystem/tutorials#jupyter-integration-with-mermaid-js.

    Returns:
        str: URL to the rendered image

    Raises:
        ValueError: If the prepared request ends up without a URL.
    """
    self.structure.resolve_relationships()

    rendered_tasks = self.__render_tasks(self.structure.tasks)
    graph = f"{self.header}{rendered_tasks}"

    # The graph definition travels in the URL path as base64 text.
    encoded_graph = base64.b64encode(graph.encode("utf-8")).decode("utf-8")
    url = urllib.parse.urljoin(self.base_url, f"svg/{encoded_graph}")

    request = PreparedRequest()
    request.prepare_url(url, self.query_params)

    if request.url is None:
        raise ValueError("Failed to generate the URL")

    return request.url

TokenCounter

Source Code in griptape/utils/token_counter.py
@define
class TokenCounter:
    """Keeps a running total of tokens.

    Attributes:
        tokens: The current total.
    """

    tokens: int = field(default=0, kw_only=True)

    def add_tokens(self, new_tokens: float) -> int:
        """Add `new_tokens`, converted with `int()`, to the total and return the new total."""
        updated_total = self.tokens + int(new_tokens)
        self.tokens = updated_total
        return updated_total
  • tokens = field(default=0, kw_only=True) class-attribute instance-attribute

add_tokens(new_tokens)

Source Code in griptape/utils/token_counter.py
def add_tokens(self, new_tokens: float) -> int:
    """Add tokens to the running total.

    Args:
        new_tokens: Amount to add; converted with ``int()`` before adding.

    Returns:
        The updated total.
    """
    increment = int(new_tokens)
    self.tokens += increment
    return self.tokens

add_key_in_dict_recursively(d, key, value, criteria=None)

Add a key in a dictionary recursively.

Parameters

NameTypeDescriptionDefault
dAnyThe dictionary to add the key to.
required
keystrThe key to add.
required
valueAnyThe value to add.
required
criteriaOptional[Callable[[dict], bool]]An optional function to determine if the key should be added.
None
Source Code in griptape/utils/dict_utils.py
def add_key_in_dict_recursively(
    d: Any, key: str, value: Any, criteria: Optional[Callable[[dict], bool]] = None
) -> dict:
    """Add a key to a dictionary and to every dictionary nested in its values.

    Dictionaries that match are updated in place before a new dictionary is
    built from their (recursively processed) items. Only dictionary values are
    descended into; lists and other containers are returned untouched.

    Args:
        d: The dictionary to add the key to. Non-dict inputs are returned as-is.
        key: The key to add.
        value: The value to add.
        criteria: An optional function to determine if the key should be added.
            When omitted, the key is added to every dictionary.

    Returns:
        A new dictionary with the key added where applicable, or ``d`` itself
        when it is not a dictionary.
    """
    if not isinstance(d, dict):
        return d

    should_add = criteria is None or criteria(d)
    if should_add:
        d[key] = value

    result = {}
    for name, child in d.items():
        result[name] = add_key_in_dict_recursively(child, key, value, criteria)
    return result

build_strict_schema(json_schema, schema_id)

Performs a series of post-processing steps to ensure a JSON schema is compatible with LLMs.

  1. Adds the $id and $schema keys.
  2. Sets additionalProperties to False for objects without this key.
  3. Resolves $refs and removes $defs.

Parameters

NameTypeDescriptionDefault
json_schemadictThe JSON schema to ensure is strict.
required
schema_idstrThe ID of the schema.
required
Source Code in griptape/utils/json_schema_utils.py
def build_strict_schema(json_schema: dict, schema_id: str) -> dict:
    """Performs a series of post-processing steps to ensure a JSON schema is compatible with LLMs.

    1. Adds the `$id` and `$schema` keys (only when they are not already present).
    2. Sets `additionalProperties` to `False` for objects without this key.
    3. Resolves `$ref`s and removes `$defs`.

    Note that the `$id` and `$schema` defaults are set directly on the
    `json_schema` that is passed in.

    Args:
        json_schema: The JSON schema to ensure is strict.
        schema_id: The ID of the schema.

    Returns:
        The strict JSON schema.
    """
    from griptape.utils.dict_utils import add_key_in_dict_recursively

    def _is_open_object(node: dict) -> bool:
        # An "object" schema that does not say anything about additionalProperties yet.
        return node.get("type") == "object" and "additionalProperties" not in node

    json_schema.setdefault("$id", schema_id)
    json_schema.setdefault("$schema", "http://json-schema.org/draft-07/schema#")

    closed_schema = add_key_in_dict_recursively(
        json_schema,
        key="additionalProperties",
        value=False,
        criteria=_is_open_object,
    )

    strict_schema = resolve_refs(closed_schema)
    # Once references are inlined the definitions section is dropped.
    strict_schema.pop("$defs", None)

    return strict_schema

deprecation_warn(message, stacklevel=2)

Source Code in griptape/utils/deprecation.py
def deprecation_warn(message: str, stacklevel: int = 2) -> None:
    """Emit a ``DeprecationWarning`` that is reported even if it would normally be filtered.

    The "always" filter is installed only for the duration of the call.
    ``warnings.catch_warnings()`` restores the previous filter list afterwards,
    so warning filters configured by the application (for example "ignore" or
    "error" rules for ``DeprecationWarning``) are not overridden once this
    function returns.

    Args:
        message: The warning text.
        stacklevel: Stack level passed to ``warnings.warn``; the default of 2
            attributes the warning to the caller of this function.
    """
    with warnings.catch_warnings():
        warnings.simplefilter("always", DeprecationWarning)
        warnings.warn(message, category=DeprecationWarning, stacklevel=stacklevel)

dict_merge(dct, merge_dct, *, add_keys=True)

Recursive dict merge.

Inspired by :meth:dict.update(), instead of updating only top-level keys, dict_merge recurses down into dicts nested to an arbitrary depth, updating keys. The merge_dct is merged into dct.

This version will return a copy of the dictionary and leave the original arguments untouched.

The optional argument add_keys determines whether keys that are present in merge_dct but not in dct should be included in the new dict.

Parameters

NameTypeDescriptionDefault
dctOptional[dict]onto which the merge is executed
required
merge_dctOptional[dict]dct merged into dct
required
add_keysboolwhether to add new keys
True

Returns

NameTypeDescription
dictdictupdated dict
Source Code in griptape/utils/dict_utils.py
def dict_merge(dct: Optional[dict], merge_dct: Optional[dict], *, add_keys: bool = True) -> dict:
    """Recursive dict merge.

    Inspired by :meth:``dict.update()``, instead of
    updating only top-level keys, dict_merge recurses down into dicts nested
    to an arbitrary depth, updating keys. The ``merge_dct`` is merged into
    ``dct``.

    This version returns a new dictionary; ``dct`` and ``merge_dct`` themselves
    are not modified (the top level and every merged nested dict are copied).

    The optional argument ``add_keys`` determines whether keys which are
    present in ``merge_dct`` but not ``dct`` should be included in the
    new dict.

    When both sides hold a dict under the same key the two are merged
    recursively. When the existing value is a dict and the incoming value is
    ``None``, the existing dict is kept. Any other incoming value replaces the
    existing one, including a non-dict value replacing a dict.

    Args:
        dct: onto which the merge is executed. ``None`` is treated as empty.
        merge_dct: dct merged into dct. ``None`` is treated as empty.
        add_keys: whether to add new keys

    Returns:
        dict: updated dict
    """
    dct = {} if dct is None else dct
    merge_dct = {} if merge_dct is None else merge_dct

    merged = dct.copy()

    if not add_keys:
        # Only keys that already exist in the target may be updated.
        merge_dct = {k: v for k, v in merge_dct.items() if k in merged}

    for key, incoming in merge_dct.items():
        existing_is_dict = key in merged and isinstance(merged[key], dict)
        # Recurse only when the incoming value can actually be merged: a dict,
        # or None (which the recursive call treats as an empty dict). Anything
        # else overwrites, instead of being iterated as if it were a mapping.
        if existing_is_dict and (incoming is None or isinstance(incoming, dict)):
            merged[key] = dict_merge(merged[key], incoming, add_keys=add_keys)
        else:
            merged[key] = incoming

    return merged

execute_futures_dict(fs_dict)

Source Code in griptape/utils/futures.py
def execute_futures_dict(fs_dict: dict[str, futures.Future[T]]) -> dict[str, T]:
    """Wait for every future in a dictionary and collect the results.

    Args:
        fs_dict: Futures keyed by name.

    Returns:
        A dictionary with the same keys, mapping each key to its future's result.
    """
    # Block (without a timeout) until all futures have completed.
    futures.wait(fs_dict.values(), timeout=None, return_when=futures.ALL_COMPLETED)

    results = {}
    for name, pending in fs_dict.items():
        results[name] = pending.result()
    return results

execute_futures_list(fs_list)

Source Code in griptape/utils/futures.py
def execute_futures_list(fs_list: list[futures.Future[T]]) -> list[T]:
    """Wait for every future in a list and collect the results.

    Args:
        fs_list: The futures to wait for.

    Returns:
        The results, in the same order as the input futures.
    """
    # Block (without a timeout) until all futures have completed.
    futures.wait(fs_list, timeout=None, return_when=futures.ALL_COMPLETED)

    results = []
    for pending in fs_list:
        results.append(pending.result())
    return results

execute_futures_list_dict(fs_dict)

Source Code in griptape/utils/futures.py
def execute_futures_list_dict(fs_dict: dict[str, list[futures.Future[T]]]) -> dict[str, list[T]]:
    """Wait for every future in a dictionary of lists and collect the results.

    Args:
        fs_dict: Lists of futures keyed by name.

    Returns:
        A dictionary with the same keys, mapping each key to the list of results
        of its futures.
    """
    # Flatten all futures so they are awaited together in one call.
    all_futures = []
    for group in fs_dict.values():
        for pending in group:
            all_futures.append(pending)
    execute_futures_list(all_futures)

    results = {}
    for name, group in fs_dict.items():
        results[name] = [pending.result() for pending in group]
    return results

get_mime_type(file_path_or_bytes)

Attempt to determine the MIME type of a file or bytes.

If the input is a file path, we use the built-in mimetypes package to guess the MIME type.

If the input is bytes, we use the filetype library to determine the MIME type. If the library cannot determine the MIME type (data missing magic bytes), we use a few heuristics to guess the type.

Parameters

NameTypeDescriptionDefault
file_path_or_bytesstr | bytesThe path to the file or the bytes to check.
required
Source Code in griptape/utils/file_utils.py
def get_mime_type(file_path_or_bytes: str | bytes) -> str:
    """Attempt to determine the MIME type of a file or bytes.

    If the input is a file path, we use the built-in `mimetypes` package to guess the MIME type.

    If the input is bytes, we use the `filetype` library to determine the MIME type.
    If the library cannot determine the MIME type (data missing magic bytes), we use a few heuristics to guess the type.

    Args:
        file_path_or_bytes: The path to the file or the bytes to check.

    Returns: The MIME type of the file or bytes.
    """
    if isinstance(file_path_or_bytes, bytes):
        filetype_guess = filetype.guess(file_path_or_bytes)

        if filetype_guess is None:
            if _is_text(file_path_or_bytes):
                if _is_json(file_path_or_bytes):
                    return "application/json"
                if _is_csv(file_path_or_bytes):
                    return "text/csv"
                return "text/plain"
            return "application/octet-stream"
        return filetype_guess.mime
    type_, _ = mimetypes.guess_type(file_path_or_bytes)
    if type_ is None:
        return "application/octet-stream"
    return type_

import_optional_dependency(name)

Import an optional dependency.

If a dependency is missing, an ImportError with a nice message will be raised.

Parameters

NameTypeDescriptionDefault
namestrThe module name.
required

Returns

TypeDescription
ModuleTypeThe imported module, when found.
Source Code in griptape/utils/import_utils.py
def import_optional_dependency(name: str) -> ModuleType:
    """Import an optional dependency.

    If a dependency is missing, an ImportError with a nice message will be raised.

    Args:
        name: The module name.

    Returns:
        The imported module, when found.

    Raises:
        ImportError: If the module cannot be imported. The message names the
            package from ``INSTALL_MAPPING`` when there is one for ``name``,
            otherwise ``name`` itself.
    """
    install_name = INSTALL_MAPPING.get(name)
    if install_name is None:
        install_name = name

    msg = (
        f"Missing optional dependency: '{install_name}'. "
        f"Please install the appropriate extra: https://docs.griptape.ai/stable/griptape-framework/#extras."
    )

    try:
        return import_module(name)
    except ImportError as exc:
        raise ImportError(msg) from exc

is_dependency_installed(name)

Check if an optional dependency is available.

Parameters

NameTypeDescriptionDefault
namestrThe module name.
required

Returns

TypeDescription
boolTrue if the dependency is available.
boolFalse if the dependency is not available.
Source Code in griptape/utils/import_utils.py
def is_dependency_installed(name: str) -> bool:
    """Check if an optional dependency is available.

    Args:
        name: The module name.

    Returns:
        True if the dependency can be imported, False if importing it raises
        an ImportError.
    """
    try:
        import_optional_dependency(name)
    except ImportError:
        installed = False
    else:
        installed = True

    return installed

minify_json(value)

Source Code in griptape/utils/__init__.py
def minify_json(value: str) -> str:
    """Re-serialize a JSON string without whitespace between tokens.

    Args:
        value: A JSON document as text.

    Returns:
        The same document serialized with ``,`` and ``:`` as separators.
    """
    parsed = json.loads(value)
    return json.dumps(parsed, separators=(",", ":"))

references_from_artifacts(artifacts)

Source Code in griptape/utils/reference_utils.py
def references_from_artifacts(artifacts: list[TextArtifact]) -> list[Reference]:
    """Collect the distinct references attached to a list of artifacts.

    Args:
        artifacts: The artifacts to inspect.

    Returns:
        The references in first-seen order, skipping artifacts without a
        reference and references that were already collected.
    """
    unique_references = []

    for artifact in artifacts:
        if artifact.reference is None:
            continue
        if artifact.reference in unique_references:
            continue
        unique_references.append(artifact.reference)

    return unique_references

remove_key_in_dict_recursively(d, key)

Source Code in griptape/utils/dict_utils.py
def remove_key_in_dict_recursively(d: dict, key: str) -> dict:
    """Return a copy of a dictionary with a key removed at every nesting level.

    Only dictionary values are descended into; lists and other values are kept
    as they are.

    Args:
        d: The dictionary to process. Non-dict inputs are returned as-is.
        key: The key to remove.

    Returns:
        A new dictionary without ``key``, or ``d`` itself when it is not a dictionary.
    """
    if not isinstance(d, dict):
        return d

    cleaned = {}
    for name, child in d.items():
        if name != key:
            cleaned[name] = remove_key_in_dict_recursively(child, key)
    return cleaned

remove_null_values_in_dict_recursively(d)

Source Code in griptape/utils/dict_utils.py
def remove_null_values_in_dict_recursively(d: dict) -> dict:
    """Return a copy of a dictionary without entries whose value is ``None``.

    Only dictionary values are descended into; lists and other values are kept
    as they are.

    Args:
        d: The dictionary to process. Non-dict inputs are returned as-is.

    Returns:
        A new dictionary without ``None`` values, or ``d`` itself when it is not
        a dictionary.
    """
    if not isinstance(d, dict):
        return d

    cleaned = {}
    for name, child in d.items():
        if child is not None:
            cleaned[name] = remove_null_values_in_dict_recursively(child)
    return cleaned

resolve_refs(schema)

Recursively resolve all local $refs in the given JSON Schema using $defs as the source.

Required since pydantic does not support nested schemas without $refs. https://github.com/pydantic/pydantic/issues/889

Parameters

NameTypeDescriptionDefault
schemadict[str, Any]A JSON Schema as a dictionary, which may contain "$refs" and "$defs".
required
Source Code in griptape/utils/json_schema_utils.py
def resolve_refs(schema: dict[str, Any]) -> dict[str, Any]:
    """Recursively resolve all local $refs in the given JSON Schema using $defs as the source.

    Required since pydantic does not support nested schemas without $refs. https://github.com/pydantic/pydantic/issues/889

    A node holding a local ``$ref`` is replaced entirely by a resolved copy of
    the referenced definition. References that do not start with ``#/$defs/``
    are left untouched. Resolution recurses into definitions without tracking
    what has been visited, so circular references are not supported.

    Args:
        schema: A JSON Schema as a dictionary, which may contain "$refs" and "$defs".

    Returns:
        A new dictionary with all local $refs resolved against $defs.

    Raises:
        KeyError: If a local $ref names a definition that is not in $defs.
    """
    local_prefix = "#/$defs/"
    defs = schema.get("$defs", {})
    # Work on a deep copy so we don't mutate the original schema.
    schema_copy = deepcopy(schema)

    def _resolve(node: Any) -> Any:
        """Recursively walk through the node, resolving any local $refs to the corresponding definitions in 'defs'."""
        if isinstance(node, dict):
            # If this node is a reference to something in #/$defs/...
            ref = node.get("$ref")
            if isinstance(ref, str) and ref.startswith(local_prefix):
                # Strip only the leading prefix; the rest is the definition name verbatim.
                def_name = ref[len(local_prefix) :]
                if def_name in defs:
                    # Replace the entire node with the referenced definition.
                    return _resolve(deepcopy(defs[def_name]))
                raise KeyError(f"Definition '{def_name}' not found in $defs.")

            # If not a ref, or doesn't start with #/$defs/, just walk deeper.
            return {key: _resolve(value) for key, value in node.items()}

        if isinstance(node, list):
            # Recurse into each item of the list
            return [_resolve(item) for item in node]

        # For scalars (str, int, bool, None, etc.), return them as-is.
        return node

    return _resolve(schema_copy)

str_to_hash(text, hash_algorithm='sha256')

Source Code in griptape/utils/hash.py
def str_to_hash(text: str, hash_algorithm: str = "sha256") -> str:
    """Hash a string and return the hexadecimal digest.

    Args:
        text: The text to hash; it is encoded with ``str.encode()`` defaults first.
        hash_algorithm: Algorithm name passed to ``hashlib.new``.

    Returns:
        The hexadecimal digest of the encoded text.
    """
    hasher = hashlib.new(hash_algorithm, text.encode())
    return hasher.hexdigest()

with_contextvars(wrapped)

Source Code in griptape/utils/contextvars_utils.py
def with_contextvars(wrapped: Callable) -> Callable:
    """Bind a callable to a copy of the context that is current at wrap time.

    The context is copied once, when this function is called. Every call of the
    returned wrapper runs ``wrapped`` inside that same copied context.

    Args:
        wrapped: The callable to run inside the captured context.

    Returns:
        A wrapper that forwards its arguments to ``wrapped`` and returns its result.
    """
    captured_context = contextvars.copy_context()

    def run_in_captured_context(*call_args, **call_kwargs) -> Any:
        return captured_context.run(wrapped, *call_args, **call_kwargs)

    return run_in_captured_context

Could this page be better? Report a problem or suggest an addition!