diff --git a/.gitignore b/.gitignore index 2e36b5d9..6c4b60f9 100644 --- a/.gitignore +++ b/.gitignore @@ -163,3 +163,5 @@ cython_debug/ node_modules tsp-output/ .aider* + +demo_app/ \ No newline at end of file diff --git a/agent/api/agent_server/server.py b/agent/api/agent_server/server.py index eba8a93e..dd540eaf 100644 --- a/agent/api/agent_server/server.py +++ b/agent/api/agent_server/server.py @@ -18,13 +18,15 @@ from compiler.core import Compiler from fsm_core.llm_common import get_sync_client +from anthropic.types import Message + from .models import ( - AgentRequest, - AgentSseEvent, - AgentMessage, + AgentRequest, + AgentSseEvent, + AgentMessage, UserMessage, ConversationMessage, - AgentStatus, + AgentStatus, MessageKind, ErrorResponse, ) @@ -52,7 +54,7 @@ async def lifespan(app: FastAPI): class AgentSession: """Manages a single agent session and its state machine""" - + def __init__(self, chatbot_id: str, trace_id: str, settings: Optional[Dict[str, Any]] = None): """Initialize a new agent session""" self.chatbot_id = chatbot_id @@ -73,7 +75,7 @@ def __init__(self, chatbot_id: str, trace_id: str, settings: Optional[Dict[str, self.compiler = Compiler("botbuild/tsp_compiler", "botbuild/app_schema") self._initialize_app() - + def _initialize_app(self): """Initialize the application instance""" logger.info(f"Initializing application for trace {self.trace_id}") @@ -82,38 +84,89 @@ def _initialize_app(self): self.processor_instance = FSMToolProcessor(self.fsm_api) self.messages = [] logger.info(f"Application initialized for trace {self.trace_id}") - - + + + @staticmethod + def reconstruct_dialogue(input_messages: List[ConversationMessage]): + messages = [{ + "role": "user" if msg.role == "user" else "assistant", + "content": msg.content + } + for msg in input_messages] + return messages + + def initialize_fsm(self, messages: List[ConversationMessage], agent_state: Optional[Dict[str, Any]] = None): """Initialize the FSM with messages and optional state""" logger.info(f"Initializing FSM for trace {self.trace_id}") logger.debug(f"Agent state present: {agent_state is not None}") - + if agent_state: logger.info(f"Setting external state for trace {self.trace_id}") self.fsm_api.set_full_external_state(agent_state) # Extract user messages from the conversation history - user_messages = [msg.content for msg in messages if hasattr(msg, "role") and msg.role == "user"] - app_description = "\n".join(user_messages) - logger.debug(f"App description length: {len(app_description)}") - self.messages = [{"role": "user", "content": app_description}] - + initial_prompt = """You are a software engineering expert who can generate application code using a code generation framework. This framework uses a Finite State Machine (FSM) to guide the generation process. + +Your task is to control the FSM through the following stages of code generation: +1. TypeSpec schema (API specification) +2. Drizzle schema (database models) +3. TypeScript types and interfaces +4. Handler test files +5. Handler implementation files + +To successfully complete this task, follow these steps: + +1. Start a new FSM session using the start_fsm tool. +2. For each component generated by the FSM: + a. Carefully review the output. + b. Decide whether to confirm the output or provide feedback for improvement. + c. Use the appropriate tool (confirm_state or provide_feedback) based on your decision. +3. Repeat step 2 until all components have been generated and confirmed. +4. Use the complete_fsm tool to finalize the process and retrieve all artifacts. + +During your review process, consider the following questions: +- Does the code correctly implement the application requirements? +- Are there any errors or inconsistencies? +- Could anything be improved or clarified? +- Does it match other requirements mentioned in the dialogue? + +When providing feedback, be specific and actionable. If you're unsure about any aspect, ask for clarification before proceeding. +Do not ask too technical questions unless absolutely necessary. Instead, focus on the user requirements and delegate technical details to the FSM. Do not mention the FSM in your feedback, as it is an internal tool. + +Do not consider the work complete until all five components (TypeSpec schema, Drizzle schema, TypeScript types and interfaces, handler test files, and handler implementation files) have been generated and the complete_fsm tool has been called.""" + system_message = UserMessage( + content=initial_prompt, + role="user", + ) + self.messages = self.reconstruct_dialogue([system_message] + messages) logger.info(f"Starting FSM for trace {self.trace_id}") - result = self.processor_instance.tool_start_fsm(app_description) - logger.info(f"FSM started for trace {self.trace_id}") - return result - - + return + + @property + def user_answered(self) -> bool: + """Check if the user has answered""" + if not self.messages: + return False + last_message = self.messages[-1] + if isinstance(last_message, dict): + return last_message.get("role") == "user" + logger.warning(f"Last message : {last_message}") + return last_message.role == "user" + + @property + def work_in_progress(self) -> bool: + return self.processor_instance.work_in_progress.locked() + def get_state(self) -> Dict[str, Any]: """Get the current FSM state""" try: logger.debug(f"Getting state for trace {self.trace_id}") return self.fsm_api.get_full_external_state() - except Exception as e: - logger.error(f"Error getting state for trace {self.trace_id}: {str(e)}") + except Exception: + logger.info(f"Getting empty state for trace {self.trace_id} - FSM not initialized") return {} - + def bake_app_diff(self, app_out: Dict[str, Any]) -> str: """Bake the app diff""" interpolator = Interpolator() @@ -121,13 +174,13 @@ def bake_app_diff(self, app_out: Dict[str, Any]) -> str: patch_on_template = interpolator.bake(app_out, temp_dir) logger.info(f"Baked app successfully into {temp_dir}") return patch_on_template - + def process_step(self) -> Optional[AgentSseEvent]: """Process a single step and return an SSE event""" if not self.processor_instance: logger.warning(f"No processor instance found for trace {self.trace_id}") return None - + try: logger.info(f"Processing step for trace {self.trace_id}") new_message, is_complete, final_tool_result = run_with_claude(self.processor_instance, self.llm_client, self.messages) @@ -138,7 +191,7 @@ def process_step(self) -> Optional[AgentSseEvent]: if new_message: self.messages.append(new_message) - + app_diff = None if final_tool_result: logger.info(f"Final tool result for trace {self.trace_id}: {final_tool_result}") @@ -152,17 +205,16 @@ def process_step(self) -> Optional[AgentSseEvent]: message=AgentMessage( role="agent", kind=MessageKind.STAGE_RESULT, - content=new_message["content"] if new_message else final_tool_result, + content=new_message["content"] if new_message else final_tool_result, agent_state=self.get_state(), unified_diff=app_diff ) ) - logger.info(f"No new message generated for trace {self.trace_id}") return None - + except Exception as e: - logger.error(f"Error in process_step: {str(e)}") + logger.exception(f"Error in process_step for trace {self.trace_id}") self.is_complete = True return AgentSseEvent( status=AgentStatus.IDLE, @@ -177,23 +229,23 @@ def process_step(self) -> Optional[AgentSseEvent]: ) - def advance_fsm(self) -> bool: - """ - Advance the FSM state. Returns True if more steps are needed, - False if the FSM is complete or has reached a terminal state. - """ + def check_if_ready(self) -> bool: if self.is_complete: - logger.info(f"FSM is already complete for trace {self.trace_id}") - return False - - if not self.processor_instance: - logger.warning(f"No processor instance found for trace {self.trace_id}") + logger.info(f"FSM is already complete for trace {self.trace_id}") + return True + + if self.work_in_progress: + logger.info(f"FSM is still processing for trace {self.trace_id}") return False - + + if not self.user_answered: + logger.info(f"User has not answered for trace {self.trace_id}") + return True + logger.info(f"FSM should continue for trace {self.trace_id}") - return True + return False + - def cleanup(self): """Cleanup resources for this session""" logger.info(f"Cleaning up resources for trace {self.trace_id}") @@ -204,56 +256,61 @@ def cleanup(self): async def get_agent_session( - chatbot_id: str, - trace_id: str, + chatbot_id: str, + trace_id: str, settings: Optional[Dict[str, Any]] = None ) -> AgentSession: """Get or create an agent session""" session_key = f"{chatbot_id}:{trace_id}" - + if session_key not in active_agents: logger.info(f"Creating new agent session for {session_key}") active_agents[session_key] = AgentSession(chatbot_id, trace_id, settings) - + return active_agents[session_key] async def sse_event_generator(session: AgentSession, messages: List[ConversationMessage], agent_state: Optional[Dict[str, Any]] = None) -> AsyncGenerator[str, None]: """Generate SSE events for the agent session""" try: - logger.info(f"Initializing FSM for trace {session.trace_id}") await run_in_threadpool(session.initialize_fsm, messages, agent_state) - + logger.info(f"Processing initial step for trace {session.trace_id}") initial_event = await run_in_threadpool(session.process_step) if initial_event: logger.info(f"Sending initial event for trace {session.trace_id}") yield f"data: {initial_event.to_json()}\n\n" - + + waiting_for_user = False while True: logger.info(f"Checking if FSM should continue for trace {session.trace_id}") - should_continue = await run_in_threadpool(session.advance_fsm) - if not should_continue: - logger.info(f"FSM complete, processing final step for trace {session.trace_id}") - final_event = await run_in_threadpool(session.process_step) - if final_event: - logger.info(f"Sending final event for trace {session.trace_id}") - yield f"data: {final_event.to_json()}\n\n" - break - + is_ready = await run_in_threadpool(session.check_if_ready) + logger.info(f"FSM ready status for trace {session.trace_id}: {is_ready}") + if is_ready: + if not session.user_answered: + # waiting for user input, stop the stream and wait + break + else: + final_event = await run_in_threadpool(session.process_step) + if final_event: + logger.info(f"Sending final event for trace {session.trace_id}") + yield f"data: {final_event.to_json()}\n\n" + break + logger.info(f"Processing next step for trace {session.trace_id}") event = await run_in_threadpool(session.process_step) if event: logger.info(f"Sending event with status {event.status} for trace {session.trace_id}") yield f"data: {event.to_json()}\n\n" - + if event and event.status == AgentStatus.IDLE: logger.info(f"Agent is idle, stopping event stream for trace {session.trace_id}") break - + await asyncio.sleep(0.1) - + + except Exception as e: - logger.error(f"Error in SSE generator: {str(e)}") + logger.exception(f"Error in SSE generator, session {session.trace_id}") logger.debug(f"Creating error event with trace_id: {session.trace_id}") error_event = AgentSseEvent( status=AgentStatus.IDLE, @@ -277,7 +334,7 @@ async def sse_event_generator(session: AgentSession, messages: List[Conversation async def message(request: AgentRequest) -> StreamingResponse: """ Send a message to the agent and stream responses via SSE. - + The server responds with a stream of Server-Sent Events (SSE). Each event contains a JSON payload with status updates. """ @@ -286,18 +343,17 @@ async def message(request: AgentRequest) -> StreamingResponse: logger.debug(f"Request settings: {request.settings}") logger.debug(f"Number of messages: {len(request.all_messages)}") logger.debug(f"Request as JSON: {request.to_json()}") - session = await get_agent_session( - request.chatbot_id, - request.trace_id, + request.chatbot_id, + request.trace_id, request.settings ) - + logger.info(f"Starting SSE stream for chatbot {request.chatbot_id}, trace {request.trace_id}") return StreamingResponse( sse_event_generator( - session, - request.all_messages, + session, + request.all_messages, request.agent_state ), media_type="text/event-stream" @@ -318,4 +374,4 @@ async def message(request: AgentRequest) -> StreamingResponse: async def healthcheck(): """Health check endpoint""" logger.debug("Health check requested") - return {"status": "healthy"} \ No newline at end of file + return {"status": "healthy"} diff --git a/agent/api/agent_server/test_direct.py b/agent/api/agent_server/test_direct.py new file mode 100644 index 00000000..cbb6175b --- /dev/null +++ b/agent/api/agent_server/test_direct.py @@ -0,0 +1,81 @@ +import asyncio +import json +import uuid +import pytest +from httpx import AsyncClient, ASGITransport + +from api.agent_server.server import app + +def create_test_request(message: str) -> dict: + return { + "allMessages": [ + { + "role": "user", + "content": message + } + ], + "chatbotId": f"test-bot-{uuid.uuid4().hex[:8]}", + "traceId": uuid.uuid4().hex, + "settings": {"max-iterations": 3} + } + + +@pytest.mark.asyncio +async def test_agent_message_endpoint(): + test_request = create_test_request("hello") + + transport = ASGITransport(app=app) + + async with AsyncClient(transport=transport) as client: + # Post the test request to the /message endpoint expecting an SSE stream. + response = await client.post( + "http://test/message", + json=test_request, + headers={"Accept": "text/event-stream"}, + timeout=None # Disable timeout to allow for streaming events + ) + assert response.status_code == 200 + events = [] + async for line in response.aiter_lines(): + # SSE lines can be empty or have the "data:" prefix. + # We only care about lines starting with "data:". + if line.startswith("data:"): + # Remove the "data:" and any leading whitespace + data_str = line.split("data:", 1)[1].strip() + try: + event_json = json.loads(data_str) + events.append(event_json) + except json.JSONDecodeError: + # Skip lines that are not valid JSON + continue + + assert len(events) > 0, "No SSE events received" + print(f"Received {len(events)} events") + + for event in events: + assert "traceId" in event, "Missing traceId in SSE payload" + assert event["traceId"] == test_request["traceId"], "Trace IDs do not match" + + response = await client.post( + "http://test/message", + json=create_test_request("make me an app that tracks my lunches"), + headers={"Accept": "application/json"}, + ) + assert response.status_code == 200 + + events = [] + async for line in response.aiter_lines(): + if line.startswith("data:"): + data_str = line.split("data:", 1)[1].strip() + try: + event_json = json.loads(data_str) + events.append(event_json) + except json.JSONDecodeError: + continue + + assert len(events) > 0, "No SSE events received" + print(f"Received {len(events)} events") + + +if __name__ == "__main__": + asyncio.run(test_agent_message_endpoint()) diff --git a/agent/api/fsm_tools.py b/agent/api/fsm_tools.py index f7bdbc39..a01eb652 100644 --- a/agent/api/fsm_tools.py +++ b/agent/api/fsm_tools.py @@ -4,6 +4,7 @@ import sys from dataclasses import dataclass from anthropic.types import MessageParam +from threading import Lock from fire import Fire import jinja2 from fsm_core.llm_common import LLMClient, get_sync_client @@ -51,6 +52,7 @@ def __init__(self, fsm_api: FSMManager): fsm_api: FSM API implementation to use """ self.fsm_api = fsm_api + self.work_in_progress = Lock() # Define tool definitions for the AI agent self.tool_definitions = [ @@ -253,7 +255,9 @@ def run_with_claude(processor: FSMToolProcessor, client: LLMClient, match message.type: case "text": logger.info(f"[Claude Response] Message: {message.text}") + new_message = {"role": "assistant", "content": message.text} case "tool_use": + processor.work_in_progress.acquire() tool_use = message.to_dict() tool_params = tool_use['input'] @@ -261,7 +265,11 @@ def run_with_claude(processor: FSMToolProcessor, client: LLMClient, tool_method = processor.tool_mapping.get(tool_use['name']) if tool_method: - result: ToolResult = tool_method(**tool_params) + try: + result: ToolResult = tool_method(**tool_params) + except Exception as e: + logger.exception(f"[Claude Response] Error executing tool {tool_use['name']}: {str(e)}") + result = ToolResult(success=False, error=str(e)) logger.info(f"[Claude Response] Tool result: {result.to_dict()}") # Special cases for determining if the interaction is complete @@ -274,8 +282,12 @@ def run_with_claude(processor: FSMToolProcessor, client: LLMClient, "tool": tool_use['name'], "result": result }) + else: raise ValueError(f"Unexpected tool name: {tool_use['name']}") + + processor.work_in_progress.release() + case _: raise ValueError(f"Unexpected message type: {message.type}") @@ -315,7 +327,7 @@ def run_with_claude(processor: FSMToolProcessor, client: LLMClient, return new_message, is_complete, final_tool_result else: # No tools were used - return None, is_complete, final_tool_result + return new_message, is_complete, final_tool_result def main(initial_prompt: str = "A simple greeting app that says hello in five languages"): """ @@ -379,7 +391,7 @@ def main(initial_prompt: str = "A simple greeting app that says hello in five la client, current_messages ) - + logger.info(f"[Main] New message: {new_message}") if new_message: current_messages = current_messages + [new_message] diff --git a/agent/fullstack/README.md b/agent/fullstack/README.md index f9d5783f..2deeac75 100644 --- a/agent/fullstack/README.md +++ b/agent/fullstack/README.md @@ -14,7 +14,7 @@ Generates full stack apps using trpc + shadcn components. ### Running generated code -Chande directory: +Change directory: `cd demo_app` @@ -22,10 +22,6 @@ Configure postgres address: `export DATABASE_URL=postgres://postgres:postgres@postgres:5432/postgres` -Apply migrations: - -`bun run db:push` - Start the app: `bun run dev:all` @@ -36,4 +32,29 @@ Start the app: ### Running with docker - doesn't have hot reload +Change directory: + +`cd demo_app` + +Run through docker compose: `docker compose up --build` + +This will apply DB migrations and start the server/client. + +Just open the browser and go to `http://localhost:80` + +### Deploying to fly + +Change directory: + +`cd demo_app` + +Run the following command to deploy the app: + +`fly deploy` - this will use the Dockerfile in the root to build the app. + +If you don't have a fly.toml file, you can create one by running: + +`fly launch` + +This will create a new app and deploy it to fly. diff --git a/agent/fullstack/prefabs/trpc_fullstack/Dockerfile b/agent/fullstack/prefabs/trpc_fullstack/Dockerfile new file mode 100644 index 00000000..a96d0aa7 --- /dev/null +++ b/agent/fullstack/prefabs/trpc_fullstack/Dockerfile @@ -0,0 +1,48 @@ +# Build stage for both client and server +FROM oven/bun:1 as builder + +# Set working directory +WORKDIR /app + +# Copy package files +COPY package.json bun.lockb ./ +COPY client/package.json ./client/ +COPY server/package.json ./server/ + +# Install dependencies +RUN bun install --frozen-lockfile + +# Copy source code +COPY . . + +# Build client and server +RUN cd client && bun run build +RUN cd server && bun run build + +# Final stage - use oven/bun as base to avoid compatibility issues +FROM oven/bun:1 + +# Install nginx directly +RUN apt-get update && apt-get install -y nginx && apt-get clean + +# Copy frontend files +COPY --from=builder /app/client/dist /usr/share/nginx/html +COPY ./client/nginx/nginx.conf /etc/nginx/conf.d/default.conf + +# Modify nginx config - in fly it needs to be 8080 +RUN sed -i 's/listen 80;/listen 8080;/g' /etc/nginx/conf.d/default.conf +RUN sed -i 's/app-backend/localhost/g' /etc/nginx/conf.d/default.conf + +# Copy backend files +COPY --from=builder /app/server /app/server +COPY --from=builder /app/node_modules /app/node_modules + +# Create a simple start script +RUN echo '#!/bin/sh\nnginx &\ncd /app/server && bun run start:build' > /start.sh +RUN chmod +x /start.sh + +# Expose port +EXPOSE 8080 + +# Use the script +CMD ["/start.sh"] \ No newline at end of file diff --git a/agent/fullstack/prefabs/trpc_fullstack/server/package.json b/agent/fullstack/prefabs/trpc_fullstack/server/package.json index ed414633..4823e6c6 100644 --- a/agent/fullstack/prefabs/trpc_fullstack/server/package.json +++ b/agent/fullstack/prefabs/trpc_fullstack/server/package.json @@ -5,7 +5,8 @@ "build": "tsc", "dev": "bun --hot src/index.ts", "db:push": "drizzle-kit push --force", - "lint": "eslint --cache src/index.ts" + "lint": "eslint --cache src/index.ts", + "start:build": "drizzle-kit push --force && node dist/src/index.js" }, "dependencies": { "@trpc/server": "npm:@trpc/server@next", diff --git a/agent/fullstack/prefabs/trpc_fullstack/server/server.Dockerfile b/agent/fullstack/prefabs/trpc_fullstack/server/server.Dockerfile index dec1603e..fce47113 100644 --- a/agent/fullstack/prefabs/trpc_fullstack/server/server.Dockerfile +++ b/agent/fullstack/prefabs/trpc_fullstack/server/server.Dockerfile @@ -25,5 +25,5 @@ RUN cd server && bun run build # Expose the server port EXPOSE 2022 -# Run the server (adjust the path if needed) -CMD ["node", "server/dist/src/index.js"] \ No newline at end of file +# Run the server +CMD cd server && bun run start:build \ No newline at end of file