| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791 |
- #!/usr/bin/env python3
- """
- ABOUTME: Shared token estimation utilities for audit scripts
- ABOUTME: XML sanitization helpers for document processing
- """
- import json
- import os
- import re
- try:
- from google import genai
- from google.genai import types
- HAS_GEMINI = True
- except ImportError: # pragma: no cover - optional dependency
- genai = None
- types = None
- HAS_GEMINI = False
- try:
- import openai
- HAS_OPENAI = True
- except ImportError: # pragma: no cover - optional dependency
- openai = None
- HAS_OPENAI = False
- def estimate_tokens(text: str) -> int:
- """
- Estimate token count for LLM context management.
- Uses a weighted formula based on character types:
- - Chinese characters: ~0.75 tokens per character (subword tokenization)
- - JSON structural characters (brackets, quotes, commas): ~1 tokens per character
- - Other characters (English, numbers, symbols): ~0.4 tokens per character (~3 chars/token)
- Includes 5% buffer and safety offset for special formatting and system prompt overhead.
- Args:
- text: Input text to estimate tokens for
- Returns:
- int: Estimated token count
- """
- if not text:
- return 0
- chinese_count = len(re.findall(r"[\u4e00-\u9fa5]", text))
- json_chars_count = len(re.findall(r'[\[\]",{}]', text))
- other_count = len(text) - chinese_count - json_chars_count
- base_estimate = (
- (chinese_count * 0.75) + (json_chars_count * 1) + (other_count * 0.4)
- )
- final_tokens = int(base_estimate * 1.05) + 2
- return final_tokens
- def sanitize_xml_string(text: str) -> str:
- """
- Remove control characters that are illegal in XML 1.0.
- XML 1.0 allows: #x9 (tab), #xA (LF), #xD (CR), and #x20-#xD7FF, #xE000-#xFFFD, #x10000-#x10FFFF
- This function removes all other control characters (0x00-0x08, 0x0B, 0x0C, 0x0E-0x1F).
- Args:
- text: Text that may contain control characters
- Returns:
- Sanitized text safe for XML. Returns input unchanged if not a non-empty string.
- """
- if not text or not isinstance(text, str):
- return text
- # Build a translation table to remove illegal control characters
- # Keep: \t (0x09), \n (0x0A), \r (0x0D)
- # Remove: 0x00-0x08, 0x0B, 0x0C, 0x0E-0x1F
- illegal_chars = "".join(chr(c) for c in range(0x20) if c not in (0x09, 0x0A, 0x0D))
- return text.translate(str.maketrans("", "", illegal_chars))
- def is_vertex_ai_mode() -> bool:
- """
- Check if Vertex AI mode is enabled via environment variable.
- Returns:
- True if GOOGLE_GENAI_USE_VERTEXAI is set to 'true', False otherwise
- """
- return os.getenv("GOOGLE_GENAI_USE_VERTEXAI", "").lower() == "true"
- def create_gemini_client(use_async: bool = False):
- """
- Create Gemini client for AI Studio or Vertex AI.
- Supports two modes:
- - AI Studio (default): Uses GOOGLE_API_KEY for authentication
- - Vertex AI: Uses ADC (GOOGLE_APPLICATION_CREDENTIALS or gcloud auth)
- Environment variables for Vertex AI mode:
- - GOOGLE_GENAI_USE_VERTEXAI: Set to 'true' to enable Vertex AI mode
- - GOOGLE_CLOUD_PROJECT: Required GCP project ID
- - GOOGLE_CLOUD_LOCATION: Optional region (default: us-central1)
- - GOOGLE_VERTEX_BASE_URL: Optional custom API endpoint (for API gateway proxies)
- - GOOGLE_APPLICATION_CREDENTIALS: Path to service account JSON (or use gcloud auth)
- Args:
- use_async: If True, return the async client (.aio), otherwise return sync client
- Returns:
- Gemini client instance (sync or async based on use_async parameter)
- Raises:
- ValueError: If required environment variables are not set
- """
- use_vertex = is_vertex_ai_mode()
- if use_vertex:
- # Vertex AI mode - uses ADC (GOOGLE_APPLICATION_CREDENTIALS or gcloud auth)
- project = os.getenv("GOOGLE_CLOUD_PROJECT")
- location = os.getenv("GOOGLE_CLOUD_LOCATION", "us-central1")
- base_url = os.getenv("GOOGLE_VERTEX_BASE_URL")
- if not project:
- raise ValueError(
- "GOOGLE_CLOUD_PROJECT is required for Vertex AI mode. "
- "Set GOOGLE_GENAI_USE_VERTEXAI=false to use AI Studio mode instead."
- )
- # Build http_options only if custom base_url is specified
- http_options = None
- if base_url:
- http_options = {"base_url": base_url}
- # Note: ADC handles authentication automatically
- # via GOOGLE_APPLICATION_CREDENTIALS env var or gcloud auth
- client = genai.Client(
- vertexai=True, project=project, location=location, http_options=http_options
- )
- else:
- # AI Studio mode - requires API key
- api_key = os.getenv("GOOGLE_API_KEY")
- if not api_key:
- raise ValueError(
- "GOOGLE_API_KEY is required for AI Studio mode. "
- "Set GOOGLE_GENAI_USE_VERTEXAI=true and configure GCP credentials for Vertex AI mode."
- )
- client = genai.Client(api_key=api_key)
- # Return async or sync client based on parameter
- return client.aio if use_async else client
- def get_gemini_provider_name() -> str:
- """
- Get the Gemini provider name based on current mode.
- Returns:
- Provider name string for display purposes
- """
- if is_vertex_ai_mode():
- project = os.getenv("GOOGLE_CLOUD_PROJECT", "unknown")
- location = os.getenv("GOOGLE_CLOUD_LOCATION", "us-central1")
- return f"Google Gemini (Vertex AI: {project}/{location})"
- return "Google Gemini (AI Studio)"
- def create_openai_client(use_async: bool = True):
- """
- Create OpenAI client with optional custom base URL.
- Environment variables:
- - OPENAI_API_KEY: Required API key
- - OPENAI_BASE_URL: Optional custom API endpoint (for proxies, Azure, etc.)
- Args:
- use_async: If True, return AsyncOpenAI, otherwise return OpenAI
- Returns:
- OpenAI client instance (async or sync based on use_async parameter)
- Raises:
- ValueError: If OPENAI_API_KEY is not set
- """
- if not HAS_OPENAI:
- raise ValueError("openai library is not installed.")
- api_key = os.getenv("OPENAI_API_KEY")
- if not api_key:
- raise ValueError("OPENAI_API_KEY is required for OpenAI mode.")
- base_url = os.getenv("OPENAI_BASE_URL")
- if use_async:
- return openai.AsyncOpenAI(base_url=base_url)
- return openai.OpenAI(base_url=base_url)
- def get_openai_provider_name() -> str:
- """
- Get the OpenAI provider name, including custom endpoint if configured.
- Returns:
- Provider name string for display purposes
- """
- base_url = os.getenv("OPENAI_BASE_URL")
- if base_url:
- return f"OpenAI (Custom: {base_url})"
- return "OpenAI"
- def is_openai_reasoning_model(model_name: str) -> bool:
- """
- Check if the OpenAI model supports reasoning_effort parameter.
- Models that support reasoning_effort:
- - o-series: o1, o3, o4 and their variants (o1-mini, o1-2024-12-17, etc.)
- - gpt-5 series: gpt-5, gpt-5.2, gpt-5-turbo, etc.
- Non-reasoning models like gpt-4.1, gpt-4o, etc. will reject this parameter.
- Handles proxy/router prefixes like "openai/o1-mini" or "openrouter/gpt-5.2".
- Args:
- model_name: The OpenAI model name (may include path prefix)
- Returns:
- True if the model supports reasoning_effort, False otherwise
- """
- model_lower = model_name.lower()
- # Handle proxy/router prefixes like "openai/o1-mini", "openrouter/gpt-5.2"
- # Extract the base model name after the last "/"
- if "/" in model_lower:
- model_lower = model_lower.rsplit("/", 1)[-1]
- # Match o-series and gpt-5 series
- return model_lower.startswith(("o1", "o3", "o4", "gpt-5"))
- def is_openai_retryable(error: Exception) -> bool:
- """
- Determine if an OpenAI error should be retried.
- Non-retryable errors:
- - AuthenticationError (401): Invalid API key
- - PermissionDeniedError (403): No access to resource
- - BadRequestError (400): Invalid request format
- - NotFoundError (404): Model or resource not found
- Retryable errors:
- - RateLimitError (429): Rate limit exceeded
- - APIConnectionError: Network issues
- - InternalServerError (500): Server errors
- - APIStatusError with 502, 503, 504: Gateway/service errors
- Args:
- error: The exception from OpenAI API call
- Returns:
- True if the error should be retried, False otherwise
- """
- if not HAS_OPENAI:
- return True
- # Authentication error - invalid API key (401)
- if isinstance(error, openai.AuthenticationError):
- return False
- # Permission denied - no access to resource (403)
- if isinstance(error, openai.PermissionDeniedError):
- return False
- # Bad request - invalid request format (400)
- if isinstance(error, openai.BadRequestError):
- return False
- # Not found - model or resource doesn't exist (404)
- if isinstance(error, openai.NotFoundError):
- return False
- # Rate limit exceeded - should retry with backoff (429)
- if isinstance(error, openai.RateLimitError):
- return True
- # API connection error - network issues, should retry
- if isinstance(error, openai.APIConnectionError):
- return True
- # Internal server error - should retry (500)
- if isinstance(error, openai.InternalServerError):
- return True
- # For other APIStatusError, check HTTP status code
- if isinstance(error, openai.APIStatusError):
- # Retryable server-side errors
- return error.status_code in (429, 500, 502, 503, 504)
- # For unknown errors, default to retry (network issues, timeouts, etc.)
- return True
- def is_gemini_retryable(error: Exception) -> bool:
- """
- Determine if a Gemini error should be retried.
- Uses string matching on error messages since google-genai may not have
- well-defined exception types for all error cases.
- Non-retryable errors:
- - API key errors
- - Authentication/permission errors
- - Invalid request errors
- - Model not found errors
- - Billing/quota permanently exceeded
- Retryable errors:
- - Rate limit (429)
- - Server errors (500, 502, 503, 504)
- - Timeout/connection errors
- Args:
- error: The exception from Gemini API call
- Returns:
- True if the error should be retried, False otherwise
- """
- error_str = str(error).lower()
- # API key / authentication errors - do not retry
- if "api_key" in error_str or "api key" in error_str:
- return False
- if "authentication" in error_str or "authenticate" in error_str:
- return False
- if "invalid_api_key" in error_str or "invalid api key" in error_str:
- return False
- # Permission / forbidden errors - do not retry
- if "permission" in error_str and "denied" in error_str:
- return False
- if "forbidden" in error_str or "403" in error_str:
- return False
- # Invalid request errors - do not retry
- if "invalid" in error_str and ("request" in error_str or "argument" in error_str):
- return False
- if "400" in error_str and "bad request" in error_str:
- return False
- # Model not found - do not retry
- if "model" in error_str and ("not found" in error_str or "not exist" in error_str):
- return False
- if "404" in error_str:
- return False
- # Billing / permanent quota errors - do not retry
- if "billing" in error_str:
- return False
- if "quota" in error_str and ("exceeded" in error_str or "exhausted" in error_str):
- # Check if it mentions billing which indicates permanent quota issue
- if "billing" in error_str or "payment" in error_str:
- return False
- # Temporary quota (rate limit) - should retry
- return True
- # Rate limit errors - should retry (429)
- if "rate" in error_str and "limit" in error_str:
- return True
- if "429" in error_str or "resource_exhausted" in error_str:
- return True
- # Server errors - should retry (500, 502, 503, 504)
- if any(code in error_str for code in ["500", "502", "503", "504"]):
- return True
- if "internal" in error_str and ("error" in error_str or "server" in error_str):
- return True
- if "service" in error_str and "unavailable" in error_str:
- return True
- if "gateway" in error_str:
- return True
- # Timeout / connection errors - should retry
- if "timeout" in error_str or "timed out" in error_str:
- return True
- if "connection" in error_str:
- return True
- if "network" in error_str:
- return True
- # Unknown errors - default to retry with limited attempts
- return True
- # JSON Schema for LLM structured output
- AUDIT_RESULT_SCHEMA = {
- "type": "object",
- "additionalProperties": False,
- "properties": {
- "is_violation": {
- "type": "boolean",
- "description": "Whether any violations were found",
- },
- "violations": {
- "type": "array",
- "description": "List of violations found",
- "items": {
- "type": "object",
- "additionalProperties": False,
- "properties": {
- "rule_id": {
- "type": "string",
- "description": "ID of the violated rule (e.g., R001)",
- },
- "violation_text": {
- "type": "string",
- "description": "The problematic text directly verbatim quote from the source content, and not span multiple cells",
- },
- "violation_reason": {
- "type": "string",
- "description": "Explanation of why this violates the rule",
- },
- "fix_action": {
- "type": "string",
- "enum": ["replace", "manual"],
- "description": "Action type: replace substitutes text (including deletion-via-replace), manual requires human review",
- },
- "revised_text": {
- "type": "string",
- "description": "For replace: complete replacement text (including deletion-via-replace). For manual: additional guidance for human reviewer",
- },
- },
- "required": [
- "rule_id",
- "violation_text",
- "violation_reason",
- "fix_action",
- "revised_text",
- ],
- },
- },
- },
- "required": ["is_violation", "violations"],
- }
- # JSON Schema for global extraction output
- GLOBAL_EXTRACT_SCHEMA = {
- "type": "object",
- "additionalProperties": False,
- "properties": {
- "results": {
- "type": "array",
- "items": {
- "type": "object",
- "additionalProperties": False,
- "properties": {
- "rule_id": {"type": "string"},
- "extracted_results": {
- "type": "array",
- "items": {
- "type": "object",
- "additionalProperties": False,
- "properties": {
- "entity": {"type": "string"},
- "fields": {
- "type": "array",
- "items": {
- "type": "object",
- "additionalProperties": False,
- "properties": {
- "name": {"type": "string"},
- "value": {"type": "string"},
- "evidence": {"type": "string"},
- },
- "required": ["name", "value", "evidence"],
- },
- },
- },
- "required": ["entity", "fields"],
- },
- },
- },
- "required": ["rule_id", "extracted_results"],
- },
- }
- },
- "required": ["results"],
- }
- # JSON Schema for global verification output
- GLOBAL_VERIFY_SCHEMA = {
- "type": "object",
- "additionalProperties": False,
- "properties": {
- "violations": {
- "type": "array",
- "items": {
- "type": "object",
- "additionalProperties": False,
- "properties": {
- "rule_id": {"type": "string"},
- "uuid": {"type": "string"},
- "uuid_end": {"type": "string"},
- "violation_text": {"type": "string"},
- "violation_reason": {"type": "string"},
- "fix_action": {"type": "string", "enum": ["replace", "manual"]},
- "revised_text": {"type": "string"},
- },
- "required": [
- "rule_id",
- "uuid",
- "uuid_end",
- "violation_text",
- "violation_reason",
- "fix_action",
- "revised_text",
- ],
- },
- }
- },
- "required": ["violations"],
- }
- async def global_extract_gemini_async(
- user_prompt: str,
- system_prompt: str,
- model_name: str,
- client,
- thinking_level: str = None,
- thinking_budget: int = None,
- ) -> dict:
- thinking_config = None
- if thinking_level and thinking_level.upper() in (
- "MINIMAL",
- "LOW",
- "MEDIUM",
- "HIGH",
- ):
- level_map = {
- "MINIMAL": types.ThinkingLevel.MINIMAL,
- "LOW": types.ThinkingLevel.LOW,
- "MEDIUM": types.ThinkingLevel.MEDIUM,
- "HIGH": types.ThinkingLevel.HIGH,
- }
- thinking_config = types.ThinkingConfig(
- thinking_level=level_map[thinking_level.upper()]
- )
- elif thinking_budget is not None:
- thinking_config = types.ThinkingConfig(thinking_budget=int(thinking_budget))
- config_params = {
- "system_instruction": system_prompt,
- "response_mime_type": "application/json",
- "response_schema": GLOBAL_EXTRACT_SCHEMA,
- }
- if thinking_config:
- config_params["thinking_config"] = thinking_config
- response = await client.models.generate_content(
- model=model_name,
- contents=user_prompt,
- config=types.GenerateContentConfig(**config_params),
- )
- return json.loads(response.text)
- async def global_extract_openai_async(
- user_prompt: str,
- system_prompt: str,
- model_name: str,
- client,
- reasoning_effort: str = None,
- ) -> dict:
- request_params = {
- "model": model_name,
- "messages": [
- {"role": "system", "content": system_prompt},
- {"role": "user", "content": user_prompt},
- ],
- "response_format": {
- "type": "json_schema",
- "json_schema": {
- "name": "global_extract",
- "strict": True,
- "schema": GLOBAL_EXTRACT_SCHEMA,
- },
- },
- }
- if (
- reasoning_effort
- and reasoning_effort.lower() in ("low", "medium", "high")
- and is_openai_reasoning_model(model_name)
- ):
- request_params["reasoning_effort"] = reasoning_effort.lower()
- response = await client.chat.completions.create(**request_params)
- return json.loads(response.choices[0].message.content)
- async def global_verify_gemini_async(
- user_prompt: str,
- system_prompt: str,
- model_name: str,
- client,
- thinking_level: str = None,
- thinking_budget: int = None,
- ) -> dict:
- thinking_config = None
- if thinking_level and thinking_level.upper() in (
- "MINIMAL",
- "LOW",
- "MEDIUM",
- "HIGH",
- ):
- level_map = {
- "MINIMAL": types.ThinkingLevel.MINIMAL,
- "LOW": types.ThinkingLevel.LOW,
- "MEDIUM": types.ThinkingLevel.MEDIUM,
- "HIGH": types.ThinkingLevel.HIGH,
- }
- thinking_config = types.ThinkingConfig(
- thinking_level=level_map[thinking_level.upper()]
- )
- elif thinking_budget is not None:
- thinking_config = types.ThinkingConfig(thinking_budget=int(thinking_budget))
- config_params = {
- "system_instruction": system_prompt,
- "response_mime_type": "application/json",
- "response_schema": GLOBAL_VERIFY_SCHEMA,
- }
- if thinking_config:
- config_params["thinking_config"] = thinking_config
- response = await client.models.generate_content(
- model=model_name,
- contents=user_prompt,
- config=types.GenerateContentConfig(**config_params),
- )
- return json.loads(response.text)
- async def global_verify_openai_async(
- user_prompt: str,
- system_prompt: str,
- model_name: str,
- client,
- reasoning_effort: str = None,
- ) -> dict:
- request_params = {
- "model": model_name,
- "messages": [
- {"role": "system", "content": system_prompt},
- {"role": "user", "content": user_prompt},
- ],
- "response_format": {
- "type": "json_schema",
- "json_schema": {
- "name": "global_verify",
- "strict": True,
- "schema": GLOBAL_VERIFY_SCHEMA,
- },
- },
- }
- if (
- reasoning_effort
- and reasoning_effort.lower() in ("low", "medium", "high")
- and is_openai_reasoning_model(model_name)
- ):
- request_params["reasoning_effort"] = reasoning_effort.lower()
- response = await client.chat.completions.create(**request_params)
- return json.loads(response.choices[0].message.content)
- async def audit_block_gemini_async(
- user_prompt: str,
- system_prompt: str,
- model_name: str,
- client,
- thinking_level: str = None,
- thinking_budget: int = None,
- ) -> dict:
- """
- Audit a text block using Google Gemini with strict JSON mode (async version).
- Args:
- user_prompt: User prompt to audit
- system_prompt: Cached system prompt with rules and instructions
- model_name: Gemini model to use
- client: Gemini async client instance (client.aio)
- thinking_level: Thinking level for Gemini 3 models (MINIMAL, LOW, MEDIUM, HIGH)
- thinking_budget: Thinking token budget for Gemini 2.5 models (integer)
- Returns:
- Audit result dictionary
- """
- # Build thinking config based on model and parameters
- thinking_config = None
- if thinking_level and thinking_level.upper() in (
- "MINIMAL",
- "LOW",
- "MEDIUM",
- "HIGH",
- ):
- # For Gemini 3 models
- level_map = {
- "MINIMAL": types.ThinkingLevel.MINIMAL,
- "LOW": types.ThinkingLevel.LOW,
- "MEDIUM": types.ThinkingLevel.MEDIUM,
- "HIGH": types.ThinkingLevel.HIGH,
- }
- thinking_config = types.ThinkingConfig(
- thinking_level=level_map[thinking_level.upper()]
- )
- elif thinking_budget is not None:
- # For Gemini 2.5 models
- thinking_config = types.ThinkingConfig(thinking_budget=int(thinking_budget))
- config_params = {
- "system_instruction": system_prompt,
- "response_mime_type": "application/json",
- "response_schema": AUDIT_RESULT_SCHEMA,
- }
- # Only add thinking_config if it's configured
- if thinking_config:
- config_params["thinking_config"] = thinking_config
- response = await client.models.generate_content(
- model=model_name,
- contents=user_prompt,
- config=types.GenerateContentConfig(**config_params),
- )
- # With structured output, response is guaranteed to be valid JSON
- result = json.loads(response.text)
- return result
- async def audit_block_openai_async(
- user_prompt: str,
- system_prompt: str,
- model_name: str,
- client,
- reasoning_effort: str = None,
- ) -> dict:
- """
- Audit a text block using OpenAI with strict JSON mode (async version).
- Args:
- user_prompt: User prompt to audit
- system_prompt: Cached system prompt with rules and instructions
- model_name: OpenAI model to use
- client: AsyncOpenAI client instance
- reasoning_effort: Reasoning effort for o-series models (low, medium, high)
- Returns:
- Audit result dictionary
- """
- request_params = {
- "model": model_name,
- "messages": [
- {"role": "system", "content": system_prompt},
- {"role": "user", "content": user_prompt},
- ],
- "response_format": {
- "type": "json_schema",
- "json_schema": {
- "name": "audit_result",
- "strict": True,
- "schema": AUDIT_RESULT_SCHEMA,
- },
- },
- }
- # Add reasoning_effort only for o-series models that support it
- if (
- reasoning_effort
- and reasoning_effort.lower() in ("low", "medium", "high")
- and is_openai_reasoning_model(model_name)
- ):
- request_params["reasoning_effort"] = reasoning_effort.lower()
- response = await client.chat.completions.create(**request_params)
- # With structured output, response is guaranteed to be valid JSON
- result = json.loads(response.choices[0].message.content)
- return result
|