use std::sync::Arc; use axum::{ extract::{Path, State, WebSocketUpgrade, ws::{Message, WebSocket}}, response::Response, routing::get, Router, }; use futures::{SinkExt, StreamExt}; use crate::agent::{AgentEvent, AgentManager}; pub fn router(agent_mgr: Arc) -> Router { Router::new() .route("/{project_id}", get(ws_handler)) .with_state(agent_mgr) } async fn ws_handler( ws: WebSocketUpgrade, State(agent_mgr): State>, Path(project_id): Path, ) -> Response { ws.on_upgrade(move |socket| handle_socket(socket, agent_mgr, project_id)) } async fn handle_socket(socket: WebSocket, agent_mgr: Arc, project_id: String) { let (mut sender, mut receiver) = socket.split(); let mut broadcast_rx = agent_mgr.get_broadcast(&project_id).await; let send_task = tokio::spawn(async move { while let Ok(msg) = broadcast_rx.recv().await { if let Ok(json) = serde_json::to_string(&msg) { if sender.send(Message::Text(json.into())).await.is_err() { break; } } } }); let mgr = agent_mgr.clone(); let pid = project_id.clone(); let recv_task = tokio::spawn(async move { while let Some(Ok(msg)) = receiver.next().await { match msg { Message::Text(text) => { if let Ok(event) = serde_json::from_str::(&text) { mgr.send_event(&pid, event).await; } } Message::Close(_) => break, _ => {} } } }); tokio::select! { _ = send_task => {}, _ = recv_task => {}, } }