observability.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. """
  2. Observability demo showing OpenAI (built-in), Langfuse and AgentOps tracing.
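  3. Make sure the required environment variables are set before running the script.
  4. Langfuse and AgentOps runs are skipped automatically when their package or API keys are missing, so unused tracking options do not need to be commented out.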
  5. OpenAI does not require extra setup, results can be found here: https://platform.openai.com/traces
  6. For Langfuse and AgentOps, follow the setup guides below.
  7. Langfuse setup guide: https://langfuse.com/integrations/model-providers/openai-py
  8. AgentOps setup guide: https://docs.agentops.ai/v2/integrations/openai_agents_python
  9. Results can be found in the platform's respective dashboards.
  10. Run with: python examples/observability.py
  11. """
  12. import asyncio
  13. import importlib.util
  14. import logging
  15. import os
  16. import sys
  17. from statistics import mean, stdev
  18. from typing import Any
  19. sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src")))
  20. from agency_swarm import ( # noqa: E402
  21. Agency,
  22. Agent,
  23. RunContextWrapper,
  24. function_tool,
  25. trace,
  26. )
  27. logging.basicConfig(level=logging.INFO)
  28. # ────────────────────────────────
  29. # Tool definition
  30. # ────────────────────────────────
  31. @function_tool
  32. async def analyze_dataset(ctx: RunContextWrapper[Any], dataset: list[int]) -> str:
  33. """Analyze a dataset and return basic statistics."""
  34. print(f"DATASET ANALYZED by {ctx.context.current_agent_name}: {dataset}")
  35. return f"Mean: {mean(dataset):.1f}, Standard Deviation: {stdev(dataset):.1f}"
  36. # ────────────────────────────────
  37. # Agency definition (agents + flows)
  38. # ────────────────────────────────
  39. def create_agency() -> Agency:
  40. """Create agency with CEO, Developer, and Analyst."""
  41. ceo = Agent(
  42. name="CEO",
  43. instructions="You are the CEO. NEVER execute tasks yourself. Instead, DELEGATE all tasks: coding tasks to Developer and data analysis to DataAnalyst.",
  44. description="Manages projects and coordinates between team members",
  45. model="gpt-5.4-mini",
  46. )
  47. developer = Agent(
  48. name="Developer",
  49. instructions="You are the Developer. Solve coding problems by implementing solutions and writing code.",
  50. description="Implements technical solutions and writes code",
  51. model="gpt-5.4-mini",
  52. )
  53. analyst = Agent(
  54. name="DataAnalyst",
  55. instructions="You are the Data Analyst. Analyze data and provide insights. Always use analyze_dataset in your response to process the dataset.",
  56. description="Analyzes data and provides insights",
  57. model="gpt-5.4-mini",
  58. tools=[analyze_dataset],
  59. )
  60. return Agency(
  61. ceo,
  62. communication_flows=[
  63. ceo > developer,
  64. ceo > analyst,
  65. ],
  66. )
  67. # ────────────────────────────────
  68. # Example tracing wrappers
  69. # ────────────────────────────────
  70. async def openai_tracing(input_message: str) -> str:
  71. agency_instance = create_agency()
  72. with trace("OpenAI tracing"):
  73. response = await agency_instance.get_response(message=input_message)
  74. return response.final_output
  75. async def langfuse_tracing(input_message: str) -> str:
  76. if os.getenv("LANGFUSE_SECRET_KEY") is None or os.getenv("LANGFUSE_PUBLIC_KEY") is None:
  77. raise ValueError("LANGFUSE api keys are not set")
  78. from langfuse import observe
  79. agency_instance = create_agency()
  80. @observe()
  81. async def get_response_wrapper(message: str):
  82. return await agency_instance.get_response(
  83. message=message,
  84. )
  85. response = await get_response_wrapper(input_message)
  86. return response.final_output
  87. async def agentops_tracing(input_message: str) -> str:
  88. if os.getenv("AGENTOPS_API_KEY") is None:
  89. raise ValueError("AGENTOPS_API_KEY is not set")
  90. import agentops
  91. agentops.init(auto_start_session=True, trace_name="Agentops tracing", tags=["openai", "agentops-example"])
  92. tracer = agentops.start_trace(trace_name="Agentops tracing", tags=["openai", "agentops-example"])
  93. agency_instance = create_agency()
  94. response = await agency_instance.get_response(
  95. message=input_message,
  96. )
  97. agentops.end_trace(tracer, end_state="Success")
  98. return response.final_output
  99. # ────────────────────────────────
  100. # Entry point
  101. # ────────────────────────────────
  102. if __name__ == "__main__":
  103. test_message = "Create a function to calculate factorial and analyze the dataset [10, 25, 15, 30, 20]."
  104. print("Running OpenAI tracing...")
  105. print(asyncio.run(openai_tracing(test_message)))
  106. if importlib.util.find_spec("langfuse") and os.getenv("LANGFUSE_SECRET_KEY") and os.getenv("LANGFUSE_PUBLIC_KEY"):
  107. print("\nRunning Langfuse tracing...")
  108. print(asyncio.run(langfuse_tracing(test_message)))
  109. else:
  110. print("\nSkipping Langfuse tracing: install langfuse and set LANGFUSE_SECRET_KEY/LANGFUSE_PUBLIC_KEY.")
  111. if importlib.util.find_spec("agentops") and os.getenv("AGENTOPS_API_KEY"):
  112. print("\nRunning AgentOps tracing...")
  113. print(asyncio.run(agentops_tracing(test_message)))
  114. else:
  115. print("\nSkipping AgentOps tracing: install agentops and set AGENTOPS_API_KEY.")