open_telemetry
Adapted from the Griptape AI Framework documentation.
__all__ = ['OpenTelemetryObservabilityDriver']
module-attribute
Bases: BaseObservabilityDriver
Source Code in griptape/drivers/observability/open_telemetry_observability_driver.py
@define
class OpenTelemetryObservabilityDriver(BaseObservabilityDriver):
    """Observability driver that reports spans through OpenTelemetry.

    Used as a context manager, the driver installs the OpenTelemetry threading
    instrumentation and opens a root span named "main" on entry. On exit it
    sets the status of the current span, closes the root span, flushes the
    trace provider and removes the threading instrumentation again.

    Attributes:
        service_name: Name passed to `get_tracer`; also the "service.name" resource
            attribute when the default trace provider is built.
        span_processor: Span processor added to `trace_provider` after initialization.
        service_version: If not None, the "service.version" resource attribute of the
            default trace provider.
        deployment_env: If not None, the "deployment.environment" resource attribute of
            the default trace provider.
        trace_provider: Tracer provider to use. Defaults to the result of
            `_trace_provider_factory`.
    """

    service_name: str = field(default="griptape", kw_only=True)
    span_processor: SpanProcessor = field(kw_only=True)
    service_version: Optional[str] = field(default=None, kw_only=True)
    deployment_env: Optional[str] = field(default=None, kw_only=True)
    trace_provider: TracerProvider = field(
        default=Factory(
            lambda self: self._trace_provider_factory(),
            takes_self=True,
        ),
        kw_only=True,
    )
    # Tracer created in __attrs_post_init__; not an __init__ argument.
    _tracer: Tracer = field(init=False)
    # Context manager of the root "main" span while the driver is entered, else None.
    _root_span_context_manager: Any = None

    def _trace_provider_factory(self) -> TracerProvider:
        """Build the default tracer provider from the service metadata fields.

        Returns:
            A `TracerProvider` whose resource carries "service.name" and, when the
            corresponding fields are not None, "service.version" and
            "deployment.environment".
        """
        sdk_trace = import_optional_dependency("opentelemetry.sdk.trace")

        resource_attributes = {"service.name": self.service_name}
        optional_attributes = {
            "service.version": self.service_version,
            "deployment.environment": self.deployment_env,
        }
        # Only attributes that were actually configured end up on the resource.
        resource_attributes.update(
            {key: value for key, value in optional_attributes.items() if value is not None}
        )

        return sdk_trace.TracerProvider(
            resource=import_optional_dependency("opentelemetry.sdk.resources").Resource(
                attributes=resource_attributes
            )
        )  # pyright: ignore[reportArgumentType]

    def __attrs_post_init__(self) -> None:
        """Register the span processor on the provider and create the tracer."""
        otel_trace = import_optional_dependency("opentelemetry.trace")

        provider = self.trace_provider
        provider.add_span_processor(self.span_processor)
        self._tracer = otel_trace.get_tracer(self.service_name, tracer_provider=provider)

    def __enter__(self) -> None:
        """Install threading instrumentation and open the root span named "main".

        Returns None, so `with driver as x` binds `x` to None.
        """
        threading_instrumentation = import_optional_dependency("opentelemetry.instrumentation.threading")
        instrumentor = threading_instrumentation.ThreadingInstrumentor()
        instrumentor.instrument()

        root_context_manager = self._tracer.start_as_current_span("main")  # pyright: ignore[reportCallIssue]
        # Stored so that __exit__ can close the same span later.
        self._root_span_context_manager = root_context_manager
        root_context_manager.__enter__()

    def __exit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc_value: Optional[BaseException],
        exc_traceback: Optional[TracebackType],
    ) -> bool:
        """Set the current span's status, close the root span and clean up.

        Args:
            exc_type: Type of the exception leaving the `with` body, if any.
            exc_value: The exception leaving the `with` body, if any.
            exc_traceback: Traceback of that exception, if any.

        Returns:
            Always False, so an exception from the `with` body is never suppressed.
        """
        opentelemetry_trace = import_optional_dependency("opentelemetry.trace")
        opentelemetry_instrumentation_threading = import_optional_dependency(
            "opentelemetry.instrumentation.threading"
        )

        try:
            root_span = opentelemetry_trace.get_current_span()
            if exc_value is not None:
                root_span.set_status(opentelemetry_trace.Status(opentelemetry_trace.StatusCode.ERROR))
                root_span.record_exception(exc_value)
            else:
                root_span.set_status(opentelemetry_trace.Status(opentelemetry_trace.StatusCode.OK))

            root_context_manager = self._root_span_context_manager
            if root_context_manager:
                # Clear the reference first so it is reset even if closing the span raises.
                self._root_span_context_manager = None
                root_context_manager.__exit__(exc_type, exc_value, exc_traceback)
        finally:
            # Always flush and undo the instrumentation installed by __enter__,
            # even when closing the span failed.
            self.trace_provider.force_flush()
            opentelemetry_instrumentation_threading.ThreadingInstrumentor().uninstrument()
        return False

    def observe(self, call: Observable.Call) -> Any:
        """Run `call` inside a span and record its outcome on that span.

        The span is named "<ClassName>.<function>()" when the call has an instance and
        "<function>()" otherwise. If `call.tags` is not None it is stored in the
        "tags" span attribute.

        Args:
            call: The observed call; its `func`, `instance` and `tags` are read and it
                is invoked with no arguments.

        Returns:
            Whatever `call()` returns.

        Raises:
            Exception: Any exception raised by `call()` is recorded on the span, the
                span status is set to ERROR, and the exception is re-raised.
        """
        open_telemetry_trace = import_optional_dependency("opentelemetry.trace")
        func = call.func
        instance = call.instance
        tags = call.tags

        # Identity check: an instance that happens to be falsy still has a class name.
        class_name = f"{instance.__class__.__name__}." if instance is not None else ""
        span_name = f"{class_name}{func.__name__}()"
        with self._tracer.start_as_current_span(span_name) as span:  # pyright: ignore[reportCallIssue]
            if tags is not None:
                span.set_attribute("tags", tags)

            try:
                result = call()
            except Exception as e:
                span.set_status(open_telemetry_trace.Status(open_telemetry_trace.StatusCode.ERROR))
                span.record_exception(e)
                # Bare raise keeps the original traceback untouched.
                raise
            else:
                span.set_status(open_telemetry_trace.Status(open_telemetry_trace.StatusCode.OK))
                return result

    def get_span_id(self) -> Optional[str]:
        """Return the formatted id of the current span, or None if there is none.

        Returns:
            The result of `format_span_id` for the current span's id, or None when the
            current span is `INVALID_SPAN`.
        """
        otel_trace = import_optional_dependency("opentelemetry.trace")
        current_span = otel_trace.get_current_span()
        if current_span is not otel_trace.INVALID_SPAN:
            return otel_trace.format_span_id(current_span.get_span_context().span_id)
        return None
_root_span_context_manager = None
class-attribute instance-attribute
_tracer = field(init=False)
class-attribute instance-attribute
deployment_env = field(default=None, kw_only=True)
class-attribute instance-attribute
service_name = field(default='griptape', kw_only=True)
class-attribute instance-attribute
service_version = field(default=None, kw_only=True)
class-attribute instance-attribute
span_processor = field(kw_only=True)
class-attribute instance-attribute
trace_provider = field(default=Factory(lambda self: self._trace_provider_factory(), takes_self=True), kw_only=True)
class-attribute instance-attribute
__attrs_post_init__()
Source Code in griptape/drivers/observability/open_telemetry_observability_driver.py
def __attrs_post_init__(self) -> None:
    """Register the span processor on the provider and create the tracer."""
    otel_trace = import_optional_dependency("opentelemetry.trace")

    provider = self.trace_provider
    provider.add_span_processor(self.span_processor)
    self._tracer = otel_trace.get_tracer(self.service_name, tracer_provider=provider)
__enter__()
Source Code in griptape/drivers/observability/open_telemetry_observability_driver.py
def __enter__(self) -> None:
    """Install threading instrumentation and open the root span named "main".

    Returns None, so `with driver as x` binds `x` to None.
    """
    threading_instrumentation = import_optional_dependency("opentelemetry.instrumentation.threading")
    instrumentor = threading_instrumentation.ThreadingInstrumentor()
    instrumentor.instrument()

    root_context_manager = self._tracer.start_as_current_span("main")  # pyright: ignore[reportCallIssue]
    # Stored so that __exit__ can close the same span later.
    self._root_span_context_manager = root_context_manager
    root_context_manager.__enter__()
__exit__(exc_type, exc_value, exc_traceback)
Source Code in griptape/drivers/observability/open_telemetry_observability_driver.py
def __exit__(
    self,
    exc_type: Optional[type[BaseException]],
    exc_value: Optional[BaseException],
    exc_traceback: Optional[TracebackType],
) -> bool:
    """Set the current span's status, close the root span and clean up.

    Args:
        exc_type: Type of the exception leaving the `with` body, if any.
        exc_value: The exception leaving the `with` body, if any.
        exc_traceback: Traceback of that exception, if any.

    Returns:
        Always False, so an exception from the `with` body is never suppressed.
    """
    opentelemetry_trace = import_optional_dependency("opentelemetry.trace")
    opentelemetry_instrumentation_threading = import_optional_dependency(
        "opentelemetry.instrumentation.threading"
    )

    try:
        root_span = opentelemetry_trace.get_current_span()
        if exc_value is not None:
            root_span.set_status(opentelemetry_trace.Status(opentelemetry_trace.StatusCode.ERROR))
            root_span.record_exception(exc_value)
        else:
            root_span.set_status(opentelemetry_trace.Status(opentelemetry_trace.StatusCode.OK))

        root_context_manager = self._root_span_context_manager
        if root_context_manager:
            # Clear the reference first so it is reset even if closing the span raises.
            self._root_span_context_manager = None
            root_context_manager.__exit__(exc_type, exc_value, exc_traceback)
    finally:
        # Always flush and undo the threading instrumentation, even when
        # closing the span failed.
        self.trace_provider.force_flush()
        opentelemetry_instrumentation_threading.ThreadingInstrumentor().uninstrument()
    return False
_trace_provider_factory()
Source Code in griptape/drivers/observability/open_telemetry_observability_driver.py
def _trace_provider_factory(self) -> TracerProvider:
    """Build the default tracer provider from the service metadata fields.

    Returns:
        A `TracerProvider` whose resource carries "service.name" and, when the
        corresponding fields are not None, "service.version" and
        "deployment.environment".
    """
    sdk_trace = import_optional_dependency("opentelemetry.sdk.trace")

    resource_attributes = {"service.name": self.service_name}
    optional_attributes = {
        "service.version": self.service_version,
        "deployment.environment": self.deployment_env,
    }
    # Only attributes that were actually configured end up on the resource.
    resource_attributes.update(
        {key: value for key, value in optional_attributes.items() if value is not None}
    )

    return sdk_trace.TracerProvider(
        resource=import_optional_dependency("opentelemetry.sdk.resources").Resource(
            attributes=resource_attributes
        )
    )  # pyright: ignore[reportArgumentType]
get_span_id()
Source Code in griptape/drivers/observability/open_telemetry_observability_driver.py
def get_span_id(self) -> Optional[str]:
    """Return the formatted id of the current span, or None if there is none.

    Returns:
        The result of `format_span_id` for the current span's id, or None when the
        current span is `INVALID_SPAN`.
    """
    otel_trace = import_optional_dependency("opentelemetry.trace")
    current_span = otel_trace.get_current_span()
    if current_span is not otel_trace.INVALID_SPAN:
        return otel_trace.format_span_id(current_span.get_span_context().span_id)
    return None
observe(call)
Source Code in griptape/drivers/observability/open_telemetry_observability_driver.py
def observe(self, call: Observable.Call) -> Any:
    """Run `call` inside a span and record its outcome on that span.

    The span is named "<ClassName>.<function>()" when the call has an instance and
    "<function>()" otherwise. If `call.tags` is not None it is stored in the
    "tags" span attribute.

    Args:
        call: The observed call; its `func`, `instance` and `tags` are read and it
            is invoked with no arguments.

    Returns:
        Whatever `call()` returns.

    Raises:
        Exception: Any exception raised by `call()` is recorded on the span, the
            span status is set to ERROR, and the exception is re-raised.
    """
    open_telemetry_trace = import_optional_dependency("opentelemetry.trace")
    func = call.func
    instance = call.instance
    tags = call.tags

    # Identity check: an instance that happens to be falsy still has a class name.
    class_name = f"{instance.__class__.__name__}." if instance is not None else ""
    span_name = f"{class_name}{func.__name__}()"
    with self._tracer.start_as_current_span(span_name) as span:  # pyright: ignore[reportCallIssue]
        if tags is not None:
            span.set_attribute("tags", tags)

        try:
            result = call()
        except Exception as e:
            span.set_status(open_telemetry_trace.Status(open_telemetry_trace.StatusCode.ERROR))
            span.record_exception(e)
            # Bare raise keeps the original traceback untouched.
            raise
        else:
            span.set_status(open_telemetry_trace.Status(open_telemetry_trace.StatusCode.OK))
            return result
Could this page be better? Report a problem or suggest an addition!