griptape_cloud_observability_driver
Bases:
OpenTelemetryObservabilityDriver
Source Code in griptape/drivers/observability/griptape_cloud_observability_driver.py
@define
class GriptapeCloudObservabilityDriver(OpenTelemetryObservabilityDriver):
    """Observability driver that POSTs OpenTelemetry spans to a structure-run spans endpoint.

    Finished spans are handed to a ``BatchSpanProcessor`` whose exporter serializes them to
    JSON and POSTs them to ``api/structure-runs/{structure_run_id}/spans`` under ``base_url``.

    Attributes:
        base_url: Root URL of the service. Defaults to the ``GT_CLOUD_BASE_URL`` environment
            variable, falling back to ``https://cloud.griptape.ai``.
        api_key: Token used for the default ``Authorization`` header. Defaults to the
            ``GT_CLOUD_API_KEY`` environment variable; a ``KeyError`` is raised when it is unset.
        headers: HTTP headers sent with every export request. Defaults to a bearer
            ``Authorization`` header built from ``api_key``.
        structure_run_id: Identifier interpolated into the spans URL. Defaults to the
            ``GT_CLOUD_STRUCTURE_RUN_ID`` environment variable and must not be ``None``.
        span_processor: Processor that receives spans. Defaults to a ``BatchSpanProcessor``
            wrapping the exporter returned by ``build_span_exporter``.
    """

    # NOTE: declaration order matters — the ``takes_self`` factories below read
    # attributes that must already have been initialized.
    base_url: str = field(
        default=Factory(lambda: os.getenv("GT_CLOUD_BASE_URL", "https://cloud.griptape.ai")), kw_only=True
    )
    api_key: str = field(default=Factory(lambda: os.environ["GT_CLOUD_API_KEY"]), kw_only=True)
    headers: dict = field(
        default=Factory(lambda self: {"Authorization": f"Bearer {self.api_key}"}, takes_self=True), kw_only=True
    )
    structure_run_id: Optional[str] = field(
        default=Factory(lambda: os.getenv("GT_CLOUD_STRUCTURE_RUN_ID")), kw_only=True
    )
    span_processor: SpanProcessor = field(
        default=Factory(
            lambda self: import_optional_dependency("opentelemetry.sdk.trace.export").BatchSpanProcessor(
                GriptapeCloudObservabilityDriver.build_span_exporter(
                    base_url=self.base_url,
                    api_key=self.api_key,
                    headers=self.headers,
                    structure_run_id=self.structure_run_id,
                )
            ),
            takes_self=True,
        ),
        kw_only=True,
    )

    @structure_run_id.validator  # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess]
    def validate_run_id(self, _: Attribute, structure_run_id: Optional[str]) -> None:
        """Reject a missing structure run id.

        Raises:
            ValueError: If ``structure_run_id`` is ``None``.
        """
        if structure_run_id is None:
            raise ValueError(
                "structure_run_id must be set either in the constructor or as an environment variable (GT_CLOUD_STRUCTURE_RUN_ID)."
            )

    @staticmethod
    def format_trace_id(trace_id: int) -> str:
        """Return the integer trace id rendered as a hyphenated UUID string."""
        return str(UUID(int=trace_id))

    @staticmethod
    def format_span_id(span_id: int) -> str:
        """Return the integer span id rendered as a hyphenated UUID string."""
        return str(UUID(int=span_id))

    @staticmethod
    def build_span_exporter(
        base_url: str, api_key: str, headers: dict, structure_run_id: str, timeout: float = 30.0
    ) -> SpanExporter:
        """Build the exporter that POSTs span batches as JSON.

        The exporter class is defined inside this function, so its OpenTelemetry base
        class is only imported when the function is called.

        Args:
            base_url: Root URL passed to ``griptape_cloud_url``.
            api_key: Stored on the exporter; the request itself authenticates via ``headers``.
            headers: HTTP headers sent with every POST.
            structure_run_id: Identifier interpolated into the spans URL path.
            timeout: Seconds to wait for the HTTP request before ``requests`` raises a
                timeout error. Without a timeout a stalled connection would block the
                export call indefinitely.

        Returns:
            An exporter instance configured with the given values.
        """

        @define
        class SpanExporter(import_optional_dependency("opentelemetry.sdk.trace.export").SpanExporter):
            base_url: str = field(kw_only=True)
            api_key: str = field(kw_only=True)
            headers: dict = field(kw_only=True)
            structure_run_id: str = field(kw_only=True)
            timeout: float = field(kw_only=True)

            def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
                """POST ``spans`` as a JSON list; SUCCESS only on an HTTP 200 response."""
                opentelemetry_util = import_optional_dependency("opentelemetry.sdk.util")
                opentelemetry_trace_export = import_optional_dependency("opentelemetry.sdk.trace.export")

                url = griptape_cloud_url(self.base_url, f"api/structure-runs/{self.structure_run_id}/spans")
                # Spans without a context are skipped: they carry no trace/span id to report.
                payload = [
                    {
                        "trace_id": GriptapeCloudObservabilityDriver.format_trace_id(span.context.trace_id),
                        "span_id": GriptapeCloudObservabilityDriver.format_span_id(span.context.span_id),
                        "parent_id": GriptapeCloudObservabilityDriver.format_span_id(span.parent.span_id)
                        if span.parent
                        else None,
                        "name": span.name,
                        "start_time": opentelemetry_util.ns_to_iso_str(span.start_time) if span.start_time else None,
                        "end_time": opentelemetry_util.ns_to_iso_str(span.end_time) if span.end_time else None,
                        "status": span.status.status_code.name,
                        "attributes": {**span.attributes} if span.attributes else {},
                        "events": [
                            {
                                "name": event.name,
                                "timestamp": opentelemetry_util.ns_to_iso_str(event.timestamp)
                                if event.timestamp
                                else None,
                                "attributes": {**event.attributes} if event.attributes else {},
                            }
                            for event in span.events
                        ],
                    }
                    for span in spans
                    if span.context is not None
                ]
                # Bound the request so an unresponsive server cannot hang the caller.
                response = requests.post(url=url, json=payload, headers=self.headers, timeout=self.timeout)
                return (
                    opentelemetry_trace_export.SpanExportResult.SUCCESS
                    if response.status_code == 200
                    else opentelemetry_trace_export.SpanExportResult.FAILURE
                )

        return SpanExporter(
            base_url=base_url,
            api_key=api_key,
            headers=headers,
            structure_run_id=structure_run_id,
            timeout=timeout,
        )

    def get_span_id(self) -> Optional[str]:
        """Return the current OpenTelemetry span's id as a UUID string, or ``None`` for the invalid span."""
        opentelemetry_trace = import_optional_dependency("opentelemetry.trace")
        span = opentelemetry_trace.get_current_span()
        if span is opentelemetry_trace.INVALID_SPAN:
            return None
        return GriptapeCloudObservabilityDriver.format_span_id(span.get_span_context().span_id)
api_key = field(default=Factory(lambda: os.environ['GT_CLOUD_API_KEY']), kw_only=True)
class-attribute instance-attribute
base_url = field(default=Factory(lambda: os.getenv('GT_CLOUD_BASE_URL', 'https://cloud.griptape.ai')), kw_only=True)
class-attribute instance-attribute
headers = field(default=Factory(lambda self: {'Authorization': f'Bearer {self.api_key}'}, takes_self=True), kw_only=True)
class-attribute instance-attribute
span_processor = field(default=Factory(lambda self: import_optional_dependency('opentelemetry.sdk.trace.export').BatchSpanProcessor(GriptapeCloudObservabilityDriver.build_span_exporter(base_url=self.base_url, api_key=self.api_key, headers=self.headers, structure_run_id=self.structure_run_id)), takes_self=True), kw_only=True)
class-attribute instance-attribute
structure_run_id = field(default=Factory(lambda: os.getenv('GT_CLOUD_STRUCTURE_RUN_ID')), kw_only=True)
class-attribute instance-attribute
build_span_exporter(base_url, api_key, headers, structure_run_id) staticmethod
Source Code in griptape/drivers/observability/griptape_cloud_observability_driver.py
@staticmethod
def build_span_exporter(
    base_url: str, api_key: str, headers: dict, structure_run_id: str, timeout: float = 30.0
) -> SpanExporter:
    """Build the exporter that POSTs span batches as JSON.

    The exporter class is defined inside this function, so its OpenTelemetry base
    class is only imported when the function is called.

    Args:
        base_url: Root URL passed to ``griptape_cloud_url``.
        api_key: Stored on the exporter; the request itself authenticates via ``headers``.
        headers: HTTP headers sent with every POST.
        structure_run_id: Identifier interpolated into the spans URL path.
        timeout: Seconds to wait for the HTTP request before ``requests`` raises a
            timeout error. Without a timeout a stalled connection would block the
            export call indefinitely.

    Returns:
        An exporter instance configured with the given values.
    """

    @define
    class SpanExporter(import_optional_dependency("opentelemetry.sdk.trace.export").SpanExporter):
        base_url: str = field(kw_only=True)
        api_key: str = field(kw_only=True)
        headers: dict = field(kw_only=True)
        structure_run_id: str = field(kw_only=True)
        timeout: float = field(kw_only=True)

        def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
            """POST ``spans`` as a JSON list; SUCCESS only on an HTTP 200 response."""
            opentelemetry_util = import_optional_dependency("opentelemetry.sdk.util")
            opentelemetry_trace_export = import_optional_dependency("opentelemetry.sdk.trace.export")

            url = griptape_cloud_url(self.base_url, f"api/structure-runs/{self.structure_run_id}/spans")
            # Spans without a context are skipped: they carry no trace/span id to report.
            payload = [
                {
                    "trace_id": GriptapeCloudObservabilityDriver.format_trace_id(span.context.trace_id),
                    "span_id": GriptapeCloudObservabilityDriver.format_span_id(span.context.span_id),
                    "parent_id": GriptapeCloudObservabilityDriver.format_span_id(span.parent.span_id)
                    if span.parent
                    else None,
                    "name": span.name,
                    "start_time": opentelemetry_util.ns_to_iso_str(span.start_time) if span.start_time else None,
                    "end_time": opentelemetry_util.ns_to_iso_str(span.end_time) if span.end_time else None,
                    "status": span.status.status_code.name,
                    "attributes": {**span.attributes} if span.attributes else {},
                    "events": [
                        {
                            "name": event.name,
                            "timestamp": opentelemetry_util.ns_to_iso_str(event.timestamp)
                            if event.timestamp
                            else None,
                            "attributes": {**event.attributes} if event.attributes else {},
                        }
                        for event in span.events
                    ],
                }
                for span in spans
                if span.context is not None
            ]
            # Bound the request so an unresponsive server cannot hang the caller.
            response = requests.post(url=url, json=payload, headers=self.headers, timeout=self.timeout)
            return (
                opentelemetry_trace_export.SpanExportResult.SUCCESS
                if response.status_code == 200
                else opentelemetry_trace_export.SpanExportResult.FAILURE
            )

    return SpanExporter(
        base_url=base_url,
        api_key=api_key,
        headers=headers,
        structure_run_id=structure_run_id,
        timeout=timeout,
    )
format_span_id(span_id) staticmethod
Source Code in griptape/drivers/observability/griptape_cloud_observability_driver.py
@staticmethod def format_span_id(span_id: int) -> str: return str(UUID(int=span_id))
format_trace_id(trace_id) staticmethod
Source Code in griptape/drivers/observability/griptape_cloud_observability_driver.py
@staticmethod def format_trace_id(trace_id: int) -> str: return str(UUID(int=trace_id))
get_span_id()
Source Code in griptape/drivers/observability/griptape_cloud_observability_driver.py
def get_span_id(self) -> Optional[str]:
    """Return the current OpenTelemetry span's id as a UUID string.

    Returns:
        The formatted span id, or ``None`` when the current span is the
        ``INVALID_SPAN`` sentinel.
    """
    otel_trace = import_optional_dependency("opentelemetry.trace")
    current = otel_trace.get_current_span()
    if current is not otel_trace.INVALID_SPAN:
        raw_id = current.get_span_context().span_id
        return GriptapeCloudObservabilityDriver.format_span_id(raw_id)
    return None
validate_run_id(_, structure_run_id)
Source Code in griptape/drivers/observability/griptape_cloud_observability_driver.py
@structure_run_id.validator  # pyright: ignore[reportAttributeAccessIssue, reportOptionalMemberAccess]
def validate_run_id(self, _: Attribute, structure_run_id: Optional[str]) -> None:
    """Validator for ``structure_run_id`` that rejects a missing value.

    Args:
        _: The attribute being validated (unused).
        structure_run_id: The value assigned to the field.

    Raises:
        ValueError: If ``structure_run_id`` is ``None``.
    """
    if structure_run_id is not None:
        return
    message = "structure_run_id must be set either in the constructor or as an environment variable (GT_CLOUD_STRUCTURE_RUN_ID)."
    raise ValueError(message)
Could this page be better? Report a problem or suggest an addition!