opt
/
hc_python
/
lib
/
python3.12
/
site-packages
/
sentry_sdk
/
integrations
/
Go to Home Directory
+
Upload
Create File
root@0UT1S:~$
Execute
By Order of Mr.0UT1S
[DIR] ..
N/A
[DIR] __pycache__
N/A
[DIR] celery
N/A
[DIR] django
N/A
[DIR] grpc
N/A
[DIR] opentelemetry
N/A
[DIR] redis
N/A
[DIR] spark
N/A
__init__.py
9.95 KB
Rename
Delete
_asgi_common.py
3.11 KB
Rename
Delete
_wsgi_common.py
7.38 KB
Rename
Delete
aiohttp.py
12.59 KB
Rename
Delete
anthropic.py
9.21 KB
Rename
Delete
argv.py
911 bytes
Rename
Delete
ariadne.py
5.70 KB
Rename
Delete
arq.py
7.67 KB
Rename
Delete
asgi.py
12.46 KB
Rename
Delete
asyncio.py
4.65 KB
Rename
Delete
asyncpg.py
6.37 KB
Rename
Delete
atexit.py
1.61 KB
Rename
Delete
aws_lambda.py
17.53 KB
Rename
Delete
beam.py
5.06 KB
Rename
Delete
boto3.py
4.31 KB
Rename
Delete
bottle.py
6.46 KB
Rename
Delete
chalice.py
4.59 KB
Rename
Delete
clickhouse_driver.py
5.12 KB
Rename
Delete
cloud_resource_context.py
7.60 KB
Rename
Delete
cohere.py
9.05 KB
Rename
Delete
dedupe.py
1.38 KB
Rename
Delete
dramatiq.py
5.45 KB
Rename
Delete
excepthook.py
2.35 KB
Rename
Delete
executing.py
1.95 KB
Rename
Delete
falcon.py
9.28 KB
Rename
Delete
fastapi.py
4.61 KB
Rename
Delete
flask.py
8.54 KB
Rename
Delete
gcp.py
8.08 KB
Rename
Delete
gnu_backtrace.py
2.83 KB
Rename
Delete
gql.py
4.08 KB
Rename
Delete
graphene.py
4.92 KB
Rename
Delete
httpx.py
5.73 KB
Rename
Delete
huey.py
5.32 KB
Rename
Delete
huggingface_hub.py
6.38 KB
Rename
Delete
langchain.py
17.30 KB
Rename
Delete
launchdarkly.py
1.90 KB
Rename
Delete
litestar.py
11.30 KB
Rename
Delete
logging.py
13.09 KB
Rename
Delete
loguru.py
3.77 KB
Rename
Delete
modules.py
820 bytes
Rename
Delete
openai.py
15.19 KB
Rename
Delete
openfeature.py
1.27 KB
Rename
Delete
pure_eval.py
4.47 KB
Rename
Delete
pymongo.py
6.23 KB
Rename
Delete
pyramid.py
7.19 KB
Rename
Delete
quart.py
7.26 KB
Rename
Delete
ray.py
4.06 KB
Rename
Delete
rq.py
5.18 KB
Rename
Delete
rust_tracing.py
8.87 KB
Rename
Delete
sanic.py
12.66 KB
Rename
Delete
serverless.py
1.76 KB
Rename
Delete
socket.py
3.09 KB
Rename
Delete
sqlalchemy.py
4.27 KB
Rename
Delete
starlette.py
25.69 KB
Rename
Delete
starlite.py
10.37 KB
Rename
Delete
statsig.py
1.20 KB
Rename
Delete
stdlib.py
8.62 KB
Rename
Delete
strawberry.py
13.79 KB
Rename
Delete
sys_exit.py
2.43 KB
Rename
Delete
threading.py
3.92 KB
Rename
Delete
tornado.py
7.05 KB
Rename
Delete
trytond.py
1.61 KB
Rename
Delete
typer.py
1.77 KB
Rename
Delete
unleash.py
1.05 KB
Rename
Delete
wsgi.py
10.50 KB
Rename
Delete
"""Sentry integration for LangChain.

Installs a :class:`SentryLangchainCallback` into langchain's callback
``manager._configure`` hook so that LLM calls, chat-model calls, chains,
agents and tools are traced as Sentry spans, with optional prompt/response
capture (PII-gated) and token accounting.
"""

from collections import OrderedDict
from functools import wraps

import sentry_sdk
from sentry_sdk.ai.monitoring import set_ai_pipeline_name, record_token_usage
from sentry_sdk.consts import OP, SPANDATA
from sentry_sdk.ai.utils import set_data_normalized
from sentry_sdk.scope import should_send_default_pii
from sentry_sdk.tracing import Span
from sentry_sdk.integrations import DidNotEnable, Integration
from sentry_sdk.utils import logger, capture_internal_exceptions

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from typing import Any, List, Callable, Dict, Union, Optional
    from uuid import UUID

try:
    from langchain_core.messages import BaseMessage
    from langchain_core.outputs import LLMResult
    from langchain_core.callbacks import (
        manager,
        BaseCallbackHandler,
    )
    from langchain_core.agents import AgentAction, AgentFinish
except ImportError:
    raise DidNotEnable("langchain not installed")


# Mapping of langchain invocation-parameter names to the Sentry span-data
# keys they are recorded under.
DATA_FIELDS = {
    "temperature": SPANDATA.AI_TEMPERATURE,
    "top_p": SPANDATA.AI_TOP_P,
    "top_k": SPANDATA.AI_TOP_K,
    "function_call": SPANDATA.AI_FUNCTION_CALL,
    "tool_calls": SPANDATA.AI_TOOL_CALLS,
    "tools": SPANDATA.AI_TOOLS,
    "response_format": SPANDATA.AI_RESPONSE_FORMAT,
    "logit_bias": SPANDATA.AI_LOGIT_BIAS,
    "tags": SPANDATA.AI_TAGS,
}

# To avoid double collecting tokens, we do *not* measure
# token counts for models for which we have an explicit integration
NO_COLLECT_TOKEN_MODELS = [
    "openai-chat",
    "anthropic-chat",
    "cohere-chat",
    "huggingface_endpoint",
]


class LangchainIntegration(Integration):
    identifier = "langchain"
    origin = f"auto.ai.{identifier}"

    # The most number of spans (e.g., LLM calls) that can be processed at the same time.
    max_spans = 1024

    def __init__(
        self, include_prompts=True, max_spans=1024, tiktoken_encoding_name=None
    ):
        # type: (LangchainIntegration, bool, int, Optional[str]) -> None
        self.include_prompts = include_prompts
        self.max_spans = max_spans
        self.tiktoken_encoding_name = tiktoken_encoding_name

    @staticmethod
    def setup_once():
        # type: () -> None
        # Monkey-patch langchain's callback configuration so our handler is
        # injected into every callback manager it builds.
        manager._configure = _wrap_configure(manager._configure)


class WatchedSpan:
    """Bookkeeping wrapper around a Sentry span created for a langchain run."""

    span = None  # type: Span
    num_completion_tokens = 0  # type: int
    num_prompt_tokens = 0  # type: int
    no_collect_tokens = False  # type: bool
    children = []  # type: List[WatchedSpan]
    is_pipeline = False  # type: bool

    def __init__(self, span):
        # type: (Span) -> None
        self.span = span
        # BUGFIX: `children` must be a per-instance list. Relying on the
        # class-level `children = []` would make every WatchedSpan share one
        # list, accumulating children across unrelated spans.
        self.children = []  # type: List[WatchedSpan]


class SentryLangchainCallback(BaseCallbackHandler):  # type: ignore[misc]
    """Base callback handler that can be used to handle callbacks from langchain."""

    span_map = OrderedDict()  # type: OrderedDict[UUID, WatchedSpan]

    max_span_map_size = 0

    def __init__(self, max_span_map_size, include_prompts, tiktoken_encoding_name=None):
        # type: (int, bool, Optional[str]) -> None
        self.max_span_map_size = max_span_map_size
        self.include_prompts = include_prompts

        self.tiktoken_encoding = None
        if tiktoken_encoding_name is not None:
            import tiktoken  # type: ignore

            self.tiktoken_encoding = tiktoken.get_encoding(tiktoken_encoding_name)

        # BUGFIX: give each handler its own run-id -> span map. The class-level
        # OrderedDict would be shared by every SentryLangchainCallback
        # instance, mixing spans between independent handlers.
        self.span_map = OrderedDict()  # type: OrderedDict[UUID, WatchedSpan]

    def count_tokens(self, s):
        # type: (str) -> int
        # Returns 0 when no tiktoken encoding was configured.
        if self.tiktoken_encoding is not None:
            return len(self.tiktoken_encoding.encode_ordinary(s))
        return 0

    def gc_span_map(self):
        # type: () -> None
        # Evict (and close) the oldest watched spans once the map exceeds
        # its configured capacity.
        while len(self.span_map) > self.max_span_map_size:
            run_id, watched_span = self.span_map.popitem(last=False)
            self._exit_span(watched_span, run_id)

    def _handle_error(self, run_id, error):
        # type: (UUID, Any) -> None
        """Capture `error` on the span for `run_id` and close that span."""
        if not run_id or run_id not in self.span_map:
            return

        span_data = self.span_map[run_id]
        if not span_data:
            return
        sentry_sdk.capture_exception(error, span_data.span.scope)
        span_data.span.__exit__(None, None, None)
        del self.span_map[run_id]

    def _normalize_langchain_message(self, message):
        # type: (BaseMessage) -> Any
        # Flatten a BaseMessage into a plain dict for span data.
        parsed = {"content": message.content, "role": message.type}
        parsed.update(message.additional_kwargs)
        return parsed

    def _create_span(self, run_id, parent_id, **kwargs):
        # type: (SentryLangchainCallback, UUID, Optional[Any], Any) -> WatchedSpan
        """Create (and enter) a span for `run_id`, parented when possible."""
        watched_span = None  # type: Optional[WatchedSpan]
        if parent_id:
            parent_span = self.span_map.get(parent_id)  # type: Optional[WatchedSpan]
            if parent_span:
                watched_span = WatchedSpan(parent_span.span.start_child(**kwargs))
                parent_span.children.append(watched_span)
        if watched_span is None:
            watched_span = WatchedSpan(sentry_sdk.start_span(**kwargs))

        if kwargs.get("op", "").startswith("ai.pipeline."):
            if kwargs.get("name"):
                set_ai_pipeline_name(kwargs.get("name"))
            watched_span.is_pipeline = True

        watched_span.span.__enter__()
        self.span_map[run_id] = watched_span
        self.gc_span_map()
        return watched_span

    def _exit_span(self, span_data, run_id):
        # type: (SentryLangchainCallback, WatchedSpan, UUID) -> None
        if span_data.is_pipeline:
            set_ai_pipeline_name(None)

        span_data.span.__exit__(None, None, None)
        del self.span_map[run_id]

    def on_llm_start(
        self,
        serialized,
        prompts,
        *,
        run_id,
        tags=None,
        parent_run_id=None,
        metadata=None,
        **kwargs,
    ):
        # type: (SentryLangchainCallback, Dict[str, Any], List[str], UUID, Optional[List[str]], Optional[UUID], Optional[Dict[str, Any]], Any) -> Any
        """Run when LLM starts running."""
        with capture_internal_exceptions():
            if not run_id:
                return
            all_params = kwargs.get("invocation_params", {})
            all_params.update(serialized.get("kwargs", {}))
            # BUGFIX: `parent_run_id` is a keyword-only parameter, so it never
            # appears in **kwargs; `kwargs.get("parent_run_id")` was always
            # None and LLM spans were never attached to their parent.
            watched_span = self._create_span(
                run_id,
                parent_run_id,
                op=OP.LANGCHAIN_RUN,
                name=kwargs.get("name") or "Langchain LLM call",
                origin=LangchainIntegration.origin,
            )
            span = watched_span.span
            if should_send_default_pii() and self.include_prompts:
                set_data_normalized(span, SPANDATA.AI_INPUT_MESSAGES, prompts)
            for k, v in DATA_FIELDS.items():
                if k in all_params:
                    set_data_normalized(span, v, all_params[k])

    def on_chat_model_start(self, serialized, messages, *, run_id, **kwargs):
        # type: (SentryLangchainCallback, Dict[str, Any], List[List[BaseMessage]], UUID, Any) -> Any
        """Run when Chat Model starts running."""
        with capture_internal_exceptions():
            if not run_id:
                return
            all_params = kwargs.get("invocation_params", {})
            all_params.update(serialized.get("kwargs", {}))
            watched_span = self._create_span(
                run_id,
                kwargs.get("parent_run_id"),
                op=OP.LANGCHAIN_CHAT_COMPLETIONS_CREATE,
                name=kwargs.get("name") or "Langchain Chat Model",
                origin=LangchainIntegration.origin,
            )
            span = watched_span.span
            model = all_params.get(
                "model", all_params.get("model_name", all_params.get("model_id"))
            )
            watched_span.no_collect_tokens = any(
                x in all_params.get("_type", "") for x in NO_COLLECT_TOKEN_MODELS
            )

            # BUGFIX: default "_type" to "" — `"anthropic" in None` raised
            # TypeError whenever invocation params carried no "_type" key
            # (the `any(...)` check above already used the "" default).
            if not model and "anthropic" in all_params.get("_type", ""):
                model = "claude-2"
            if model:
                span.set_data(SPANDATA.AI_MODEL_ID, model)
            if should_send_default_pii() and self.include_prompts:
                set_data_normalized(
                    span,
                    SPANDATA.AI_INPUT_MESSAGES,
                    [
                        [self._normalize_langchain_message(x) for x in list_]
                        for list_ in messages
                    ],
                )
            for k, v in DATA_FIELDS.items():
                if k in all_params:
                    set_data_normalized(span, v, all_params[k])
            if not watched_span.no_collect_tokens:
                for list_ in messages:
                    for message in list_:
                        self.span_map[run_id].num_prompt_tokens += self.count_tokens(
                            message.content
                        ) + self.count_tokens(message.type)

    def on_llm_new_token(self, token, *, run_id, **kwargs):
        # type: (SentryLangchainCallback, str, UUID, Any) -> Any
        """Run on new LLM token. Only available when streaming is enabled."""
        with capture_internal_exceptions():
            if not run_id or run_id not in self.span_map:
                return
            span_data = self.span_map[run_id]
            if not span_data or span_data.no_collect_tokens:
                return
            span_data.num_completion_tokens += self.count_tokens(token)

    def on_llm_end(self, response, *, run_id, **kwargs):
        # type: (SentryLangchainCallback, LLMResult, UUID, Any) -> Any
        """Run when LLM ends running."""
        with capture_internal_exceptions():
            # BUGFIX: guard membership like the other handlers — the span may
            # already have been evicted by gc_span_map(), and a bare
            # `self.span_map[run_id]` raised KeyError.
            if not run_id or run_id not in self.span_map:
                return

            token_usage = (
                response.llm_output.get("token_usage") if response.llm_output else None
            )

            span_data = self.span_map[run_id]
            if not span_data:
                return

            if should_send_default_pii() and self.include_prompts:
                set_data_normalized(
                    span_data.span,
                    SPANDATA.AI_RESPONSES,
                    [[x.text for x in list_] for list_ in response.generations],
                )

            if not span_data.no_collect_tokens:
                if token_usage:
                    # Prefer the provider-reported usage when available.
                    record_token_usage(
                        span_data.span,
                        token_usage.get("prompt_tokens"),
                        token_usage.get("completion_tokens"),
                        token_usage.get("total_tokens"),
                    )
                else:
                    # Fall back to our own (tiktoken-based) counters.
                    record_token_usage(
                        span_data.span,
                        span_data.num_prompt_tokens,
                        span_data.num_completion_tokens,
                    )

            self._exit_span(span_data, run_id)

    def on_llm_error(self, error, *, run_id, **kwargs):
        # type: (SentryLangchainCallback, Union[Exception, KeyboardInterrupt], UUID, Any) -> Any
        """Run when LLM errors."""
        with capture_internal_exceptions():
            self._handle_error(run_id, error)

    def on_chain_start(self, serialized, inputs, *, run_id, **kwargs):
        # type: (SentryLangchainCallback, Dict[str, Any], Dict[str, Any], UUID, Any) -> Any
        """Run when chain starts running."""
        with capture_internal_exceptions():
            if not run_id:
                return
            # A chain with no parent run is the root of an AI pipeline.
            watched_span = self._create_span(
                run_id,
                kwargs.get("parent_run_id"),
                op=(
                    OP.LANGCHAIN_RUN
                    if kwargs.get("parent_run_id") is not None
                    else OP.LANGCHAIN_PIPELINE
                ),
                name=kwargs.get("name") or "Chain execution",
                origin=LangchainIntegration.origin,
            )
            metadata = kwargs.get("metadata")
            if metadata:
                set_data_normalized(watched_span.span, SPANDATA.AI_METADATA, metadata)

    def on_chain_end(self, outputs, *, run_id, **kwargs):
        # type: (SentryLangchainCallback, Dict[str, Any], UUID, Any) -> Any
        """Run when chain ends running."""
        with capture_internal_exceptions():
            if not run_id or run_id not in self.span_map:
                return

            span_data = self.span_map[run_id]
            if not span_data:
                return
            self._exit_span(span_data, run_id)

    def on_chain_error(self, error, *, run_id, **kwargs):
        # type: (SentryLangchainCallback, Union[Exception, KeyboardInterrupt], UUID, Any) -> Any
        """Run when chain errors."""
        self._handle_error(run_id, error)

    def on_agent_action(self, action, *, run_id, **kwargs):
        # type: (SentryLangchainCallback, AgentAction, UUID, Any) -> Any
        with capture_internal_exceptions():
            if not run_id:
                return
            watched_span = self._create_span(
                run_id,
                kwargs.get("parent_run_id"),
                op=OP.LANGCHAIN_AGENT,
                name=action.tool or "AI tool usage",
                origin=LangchainIntegration.origin,
            )
            if action.tool_input and should_send_default_pii() and self.include_prompts:
                set_data_normalized(
                    watched_span.span, SPANDATA.AI_INPUT_MESSAGES, action.tool_input
                )

    def on_agent_finish(self, finish, *, run_id, **kwargs):
        # type: (SentryLangchainCallback, AgentFinish, UUID, Any) -> Any
        with capture_internal_exceptions():
            # BUGFIX: guard membership before indexing, matching the other
            # handlers; previously a missing run_id raised KeyError.
            if not run_id or run_id not in self.span_map:
                return

            span_data = self.span_map[run_id]
            if not span_data:
                return
            if should_send_default_pii() and self.include_prompts:
                set_data_normalized(
                    span_data.span, SPANDATA.AI_RESPONSES, finish.return_values.items()
                )
            self._exit_span(span_data, run_id)

    def on_tool_start(self, serialized, input_str, *, run_id, **kwargs):
        # type: (SentryLangchainCallback, Dict[str, Any], str, UUID, Any) -> Any
        """Run when tool starts running."""
        with capture_internal_exceptions():
            if not run_id:
                return
            watched_span = self._create_span(
                run_id,
                kwargs.get("parent_run_id"),
                op=OP.LANGCHAIN_TOOL,
                name=serialized.get("name") or kwargs.get("name") or "AI tool usage",
                origin=LangchainIntegration.origin,
            )
            if should_send_default_pii() and self.include_prompts:
                set_data_normalized(
                    watched_span.span,
                    SPANDATA.AI_INPUT_MESSAGES,
                    kwargs.get("inputs", [input_str]),
                )
            if kwargs.get("metadata"):
                set_data_normalized(
                    watched_span.span, SPANDATA.AI_METADATA, kwargs.get("metadata")
                )

    def on_tool_end(self, output, *, run_id, **kwargs):
        # type: (SentryLangchainCallback, str, UUID, Any) -> Any
        """Run when tool ends running."""
        with capture_internal_exceptions():
            if not run_id or run_id not in self.span_map:
                return

            span_data = self.span_map[run_id]
            if not span_data:
                return
            if should_send_default_pii() and self.include_prompts:
                set_data_normalized(span_data.span, SPANDATA.AI_RESPONSES, output)
            self._exit_span(span_data, run_id)

    def on_tool_error(self, error, *args, run_id, **kwargs):
        # type: (SentryLangchainCallback, Union[Exception, KeyboardInterrupt], UUID, Any) -> Any
        """Run when tool errors."""
        self._handle_error(run_id, error)


def _wrap_configure(f):
    # type: (Callable[..., Any]) -> Callable[..., Any]
    """Wrap langchain's `manager._configure` so that a single
    SentryLangchainCallback is appended to the local callbacks."""

    @wraps(f)
    def new_configure(*args, **kwargs):
        # type: (Any, Any) -> Any

        integration = sentry_sdk.get_client().get_integration(LangchainIntegration)
        if integration is None:
            return f(*args, **kwargs)

        with capture_internal_exceptions():
            new_callbacks = []  # type: List[BaseCallbackHandler]
            # Local callbacks may arrive either as a keyword or as the third
            # positional argument; swap in our (copied) list either way so the
            # caller's original list is never mutated.
            if "local_callbacks" in kwargs:
                existing_callbacks = kwargs["local_callbacks"]
                kwargs["local_callbacks"] = new_callbacks
            elif len(args) > 2:
                existing_callbacks = args[2]
                args = (
                    args[0],
                    args[1],
                    new_callbacks,
                ) + args[3:]
            else:
                existing_callbacks = []

            if existing_callbacks:
                if isinstance(existing_callbacks, list):
                    for cb in existing_callbacks:
                        new_callbacks.append(cb)
                elif isinstance(existing_callbacks, BaseCallbackHandler):
                    new_callbacks.append(existing_callbacks)
                else:
                    logger.debug("Unknown callback type: %s", existing_callbacks)

            # Only ever add one Sentry handler.
            already_added = False
            for callback in new_callbacks:
                if isinstance(callback, SentryLangchainCallback):
                    already_added = True

            if not already_added:
                new_callbacks.append(
                    SentryLangchainCallback(
                        integration.max_spans,
                        integration.include_prompts,
                        integration.tiktoken_encoding_name,
                    )
                )
        return f(*args, **kwargs)

    return new_configure
Save