Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,5 @@ cython_debug/
node_modules
tsp-output/
.aider*

demo_app/
196 changes: 126 additions & 70 deletions agent/api/agent_server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
from compiler.core import Compiler
from fsm_core.llm_common import get_sync_client

from anthropic.types import Message

from .models import (
AgentRequest,
AgentSseEvent,
AgentMessage,
AgentRequest,
AgentSseEvent,
AgentMessage,
UserMessage,
ConversationMessage,
AgentStatus,
AgentStatus,
MessageKind,
ErrorResponse,
)
Expand Down Expand Up @@ -52,7 +54,7 @@ async def lifespan(app: FastAPI):

class AgentSession:
"""Manages a single agent session and its state machine"""

def __init__(self, chatbot_id: str, trace_id: str, settings: Optional[Dict[str, Any]] = None):
"""Initialize a new agent session"""
self.chatbot_id = chatbot_id
Expand All @@ -73,7 +75,7 @@ def __init__(self, chatbot_id: str, trace_id: str, settings: Optional[Dict[str,
self.compiler = Compiler("botbuild/tsp_compiler", "botbuild/app_schema")
self._initialize_app()


def _initialize_app(self):
"""Initialize the application instance"""
logger.info(f"Initializing application for trace {self.trace_id}")
Expand All @@ -82,52 +84,103 @@ def _initialize_app(self):
self.processor_instance = FSMToolProcessor(self.fsm_api)
self.messages = []
logger.info(f"Application initialized for trace {self.trace_id}")




@staticmethod
def reconstruct_dialogue(input_messages: List[ConversationMessage]):
messages = [{
"role": "user" if msg.role == "user" else "assistant",
"content": msg.content
}
for msg in input_messages]
return messages


def initialize_fsm(self, messages: List[ConversationMessage], agent_state: Optional[Dict[str, Any]] = None):
"""Initialize the FSM with messages and optional state"""
logger.info(f"Initializing FSM for trace {self.trace_id}")
logger.debug(f"Agent state present: {agent_state is not None}")

if agent_state:
logger.info(f"Setting external state for trace {self.trace_id}")
self.fsm_api.set_full_external_state(agent_state)

# Extract user messages from the conversation history
user_messages = [msg.content for msg in messages if hasattr(msg, "role") and msg.role == "user"]
app_description = "\n".join(user_messages)
logger.debug(f"App description length: {len(app_description)}")
self.messages = [{"role": "user", "content": app_description}]

initial_prompt = """You are a software engineering expert who can generate application code using a code generation framework. This framework uses a Finite State Machine (FSM) to guide the generation process.

Your task is to control the FSM through the following stages of code generation:
1. TypeSpec schema (API specification)
2. Drizzle schema (database models)
3. TypeScript types and interfaces
4. Handler test files
5. Handler implementation files

To successfully complete this task, follow these steps:

1. Start a new FSM session using the start_fsm tool.
2. For each component generated by the FSM:
a. Carefully review the output.
b. Decide whether to confirm the output or provide feedback for improvement.
c. Use the appropriate tool (confirm_state or provide_feedback) based on your decision.
3. Repeat step 2 until all components have been generated and confirmed.
4. Use the complete_fsm tool to finalize the process and retrieve all artifacts.

During your review process, consider the following questions:
- Does the code correctly implement the application requirements?
- Are there any errors or inconsistencies?
- Could anything be improved or clarified?
- Does it match other requirements mentioned in the dialogue?

When providing feedback, be specific and actionable. If you're unsure about any aspect, ask for clarification before proceeding.
Do not ask too technical questions unless absolutely necessary. Instead, focus on the user requirements and delegate technical details to the FSM. Do not mention the FSM in your feedback, as it is an internal tool.

Do not consider the work complete until all five components (TypeSpec schema, Drizzle schema, TypeScript types and interfaces, handler test files, and handler implementation files) have been generated and the complete_fsm tool has been called."""
system_message = UserMessage(
content=initial_prompt,
role="user",
)
self.messages = self.reconstruct_dialogue([system_message] + messages)
logger.info(f"Starting FSM for trace {self.trace_id}")
result = self.processor_instance.tool_start_fsm(app_description)
logger.info(f"FSM started for trace {self.trace_id}")
return result


return

@property
def user_answered(self) -> bool:
"""Check if the user has answered"""
if not self.messages:
return False
last_message = self.messages[-1]
if isinstance(last_message, dict):
return last_message.get("role") == "user"
logger.warning(f"Last message : {last_message}")
return last_message.role == "user"

@property
def work_in_progress(self) -> bool:
return self.processor_instance.work_in_progress.locked()

def get_state(self) -> Dict[str, Any]:
"""Get the current FSM state"""
try:
logger.debug(f"Getting state for trace {self.trace_id}")
return self.fsm_api.get_full_external_state()
except Exception as e:
logger.error(f"Error getting state for trace {self.trace_id}: {str(e)}")
except Exception:
logger.info(f"Getting empty state for trace {self.trace_id} - FSM not initialized")
return {}

def bake_app_diff(self, app_out: Dict[str, Any]) -> str:
"""Bake the app diff"""
interpolator = Interpolator()
with tempfile.TemporaryDirectory() as temp_dir:
patch_on_template = interpolator.bake(app_out, temp_dir)
logger.info(f"Baked app successfully into {temp_dir}")
return patch_on_template

def process_step(self) -> Optional[AgentSseEvent]:
"""Process a single step and return an SSE event"""
if not self.processor_instance:
logger.warning(f"No processor instance found for trace {self.trace_id}")
return None

try:
logger.info(f"Processing step for trace {self.trace_id}")
new_message, is_complete, final_tool_result = run_with_claude(self.processor_instance, self.llm_client, self.messages)
Expand All @@ -138,7 +191,7 @@ def process_step(self) -> Optional[AgentSseEvent]:

if new_message:
self.messages.append(new_message)

app_diff = None
if final_tool_result:
logger.info(f"Final tool result for trace {self.trace_id}: {final_tool_result}")
Expand All @@ -152,17 +205,16 @@ def process_step(self) -> Optional[AgentSseEvent]:
message=AgentMessage(
role="agent",
kind=MessageKind.STAGE_RESULT,
content=new_message["content"] if new_message else final_tool_result,
content=new_message["content"] if new_message else final_tool_result,
agent_state=self.get_state(),
unified_diff=app_diff
)
)

logger.info(f"No new message generated for trace {self.trace_id}")
return None

except Exception as e:
logger.error(f"Error in process_step: {str(e)}")
logger.exception(f"Error in process_step for trace {self.trace_id}")
self.is_complete = True
return AgentSseEvent(
status=AgentStatus.IDLE,
Expand All @@ -177,23 +229,23 @@ def process_step(self) -> Optional[AgentSseEvent]:
)


def advance_fsm(self) -> bool:
"""
Advance the FSM state. Returns True if more steps are needed,
False if the FSM is complete or has reached a terminal state.
"""
def check_if_ready(self) -> bool:
if self.is_complete:
logger.info(f"FSM is already complete for trace {self.trace_id}")
return False
if not self.processor_instance:
logger.warning(f"No processor instance found for trace {self.trace_id}")
logger.info(f"FSM is already complete for trace {self.trace_id}")
return True

if self.work_in_progress:
logger.info(f"FSM is still processing for trace {self.trace_id}")
return False


if not self.user_answered:
logger.info(f"User has not answered for trace {self.trace_id}")
return True

logger.info(f"FSM should continue for trace {self.trace_id}")
return True
return False



def cleanup(self):
"""Cleanup resources for this session"""
logger.info(f"Cleaning up resources for trace {self.trace_id}")
Expand All @@ -204,56 +256,61 @@ def cleanup(self):


async def get_agent_session(
chatbot_id: str,
trace_id: str,
chatbot_id: str,
trace_id: str,
settings: Optional[Dict[str, Any]] = None
) -> AgentSession:
"""Get or create an agent session"""
session_key = f"{chatbot_id}:{trace_id}"

if session_key not in active_agents:
logger.info(f"Creating new agent session for {session_key}")
active_agents[session_key] = AgentSession(chatbot_id, trace_id, settings)

return active_agents[session_key]

async def sse_event_generator(session: AgentSession, messages: List[ConversationMessage], agent_state: Optional[Dict[str, Any]] = None) -> AsyncGenerator[str, None]:
"""Generate SSE events for the agent session"""
try:
logger.info(f"Initializing FSM for trace {session.trace_id}")
await run_in_threadpool(session.initialize_fsm, messages, agent_state)

logger.info(f"Processing initial step for trace {session.trace_id}")
initial_event = await run_in_threadpool(session.process_step)
if initial_event:
logger.info(f"Sending initial event for trace {session.trace_id}")
yield f"data: {initial_event.to_json()}\n\n"


waiting_for_user = False
while True:
logger.info(f"Checking if FSM should continue for trace {session.trace_id}")
should_continue = await run_in_threadpool(session.advance_fsm)
if not should_continue:
logger.info(f"FSM complete, processing final step for trace {session.trace_id}")
final_event = await run_in_threadpool(session.process_step)
if final_event:
logger.info(f"Sending final event for trace {session.trace_id}")
yield f"data: {final_event.to_json()}\n\n"
break

is_ready = await run_in_threadpool(session.check_if_ready)
logger.info(f"FSM ready status for trace {session.trace_id}: {is_ready}")
if is_ready:
if not session.user_answered:
# waiting for user input, stop the stream and wait
break
else:
final_event = await run_in_threadpool(session.process_step)
if final_event:
logger.info(f"Sending final event for trace {session.trace_id}")
yield f"data: {final_event.to_json()}\n\n"
break

logger.info(f"Processing next step for trace {session.trace_id}")
event = await run_in_threadpool(session.process_step)
if event:
logger.info(f"Sending event with status {event.status} for trace {session.trace_id}")
yield f"data: {event.to_json()}\n\n"

if event and event.status == AgentStatus.IDLE:
logger.info(f"Agent is idle, stopping event stream for trace {session.trace_id}")
break

await asyncio.sleep(0.1)



except Exception as e:
logger.error(f"Error in SSE generator: {str(e)}")
logger.exception(f"Error in SSE generator, session {session.trace_id}")
logger.debug(f"Creating error event with trace_id: {session.trace_id}")
error_event = AgentSseEvent(
status=AgentStatus.IDLE,
Expand All @@ -277,7 +334,7 @@ async def sse_event_generator(session: AgentSession, messages: List[Conversation
async def message(request: AgentRequest) -> StreamingResponse:
"""
Send a message to the agent and stream responses via SSE.

The server responds with a stream of Server-Sent Events (SSE).
Each event contains a JSON payload with status updates.
"""
Expand All @@ -286,18 +343,17 @@ async def message(request: AgentRequest) -> StreamingResponse:
logger.debug(f"Request settings: {request.settings}")
logger.debug(f"Number of messages: {len(request.all_messages)}")
logger.debug(f"Request as JSON: {request.to_json()}")

session = await get_agent_session(
request.chatbot_id,
request.trace_id,
request.chatbot_id,
request.trace_id,
request.settings
)

logger.info(f"Starting SSE stream for chatbot {request.chatbot_id}, trace {request.trace_id}")
return StreamingResponse(
sse_event_generator(
session,
request.all_messages,
session,
request.all_messages,
request.agent_state
),
media_type="text/event-stream"
Expand All @@ -318,4 +374,4 @@ async def message(request: AgentRequest) -> StreamingResponse:
async def healthcheck():
"""Health check endpoint"""
logger.debug("Health check requested")
return {"status": "healthy"}
return {"status": "healthy"}
Loading
Loading