diff --git a/src/agent.rs b/src/agent.rs index 805e785..405e01e 100644 --- a/src/agent.rs +++ b/src/agent.rs @@ -20,7 +20,7 @@ pub struct ServiceInfo { #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "type")] pub enum AgentEvent { - NewRequirement { workflow_id: String, requirement: String, template_id: Option }, + NewRequirement { workflow_id: String, requirement: String, template_id: Option, #[serde(default)] worker: Option }, Comment { workflow_id: String, content: String }, } @@ -113,7 +113,7 @@ impl AgentManager { /// Dispatch an event to a worker. pub async fn send_event(self: &Arc, project_id: &str, event: AgentEvent) { match event { - AgentEvent::NewRequirement { workflow_id, requirement, template_id } => { + AgentEvent::NewRequirement { workflow_id, requirement, template_id, worker } => { // Generate title (heuristic) let title = generate_title_heuristic(&requirement); let _ = sqlx::query("UPDATE projects SET name = ? WHERE id = ?") @@ -154,12 +154,12 @@ impl AgentManager { }; // Retry dispatch up to 3 times (worker might be reconnecting) - let mut dispatch_result = self.worker_mgr.assign_workflow(assign.clone()).await; + let mut dispatch_result = self.worker_mgr.assign_workflow(assign.clone(), worker.as_deref()).await; for attempt in 1..3 { if dispatch_result.is_ok() { break; } tracing::warn!("Dispatch attempt {} failed, retrying in 5s...", attempt); tokio::time::sleep(std::time::Duration::from_secs(5)).await; - dispatch_result = self.worker_mgr.assign_workflow(assign.clone()).await; + dispatch_result = self.worker_mgr.assign_workflow(assign.clone(), worker.as_deref()).await; } match dispatch_result { @@ -247,7 +247,7 @@ impl AgentManager { .clone() }; - match self.worker_mgr.assign_workflow(assign).await { + match self.worker_mgr.assign_workflow(assign, None).await { Ok(name) => { let _ = sqlx::query("UPDATE workflows SET status = 'executing', status_reason = '' WHERE id = ?") .bind(&workflow_id).execute(&self.pool).await; diff --git a/src/api/workers.rs b/src/api/workers.rs index 7fbb982..9311a6f 100644 --- a/src/api/workers.rs +++ b/src/api/workers.rs @@ -6,6 +6,7 @@ use crate::worker::WorkerInfo; async fn list_workers(State(state): State>) -> Json> { let workers = state.agent_mgr.worker_mgr.list().await; + // WorkerInfo already contains `name` field from registration let entries: Vec = workers.into_iter().map(|(_, info)| info).collect(); Json(entries) } diff --git a/src/api/workflows.rs b/src/api/workflows.rs index ad47203..0e5ca15 100644 --- a/src/api/workflows.rs +++ b/src/api/workflows.rs @@ -25,6 +25,8 @@ pub struct CreateWorkflow { pub requirement: String, #[serde(default)] pub template_id: Option, + #[serde(default)] + pub worker: Option, } #[derive(Deserialize)] @@ -78,6 +80,7 @@ async fn create_workflow( workflow_id: workflow.id.clone(), requirement: workflow.requirement.clone(), template_id: input.template_id, + worker: input.worker, }).await; Ok(Json(workflow)) diff --git a/src/main.rs b/src/main.rs index fd44172..f78a002 100644 --- a/src/main.rs +++ b/src/main.rs @@ -211,6 +211,7 @@ async fn resume_workflows(pool: SqlitePool, agent_mgr: Arc) workflow_id, requirement, template_id: None, + worker: None, }).await; } } diff --git a/src/timer.rs b/src/timer.rs index ebdab7e..20d08fe 100644 --- a/src/timer.rs +++ b/src/timer.rs @@ -69,6 +69,7 @@ async fn check_timers(pool: &SqlitePool, agent_mgr: &Arc) -> anyho workflow_id, requirement: timer.requirement.clone(), template_id: None, + worker: None, }).await; } diff --git a/src/worker.rs b/src/worker.rs index 878cb50..863488c 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -118,17 +118,22 @@ impl WorkerManager { .collect() } - /// Assign a workflow to the first available worker. Returns worker name. - pub async fn assign_workflow(&self, assign: ServerToWorker) -> Result { + /// Assign a workflow to a worker. If `preferred` is specified, use that worker; + /// otherwise pick the first available. + pub async fn assign_workflow(&self, assign: ServerToWorker, preferred: Option<&str>) -> Result { let workflow_id = match &assign { ServerToWorker::WorkflowAssign { workflow_id, .. } => workflow_id.clone(), _ => return Err("Not a workflow assignment".into()), }; let workers = self.workers.read().await; - // Pick first worker (simple strategy for now) - let (name, worker) = workers.iter().next() - .ok_or_else(|| "No workers available".to_string())?; + let (name, worker) = if let Some(pref) = preferred { + workers.get_key_value(pref) + .ok_or_else(|| format!("Worker '{}' not available", pref))? + } else { + workers.iter().next() + .ok_or_else(|| "No workers available".to_string())? + }; worker.tx.send(assign).await.map_err(|_| { format!("Worker '{}' disconnected", name) diff --git a/web/src/api.ts b/web/src/api.ts index 92197d8..9250aad 100644 --- a/web/src/api.ts +++ b/web/src/api.ts @@ -45,10 +45,10 @@ export const api = { listWorkflows: (projectId: string) => request(`/projects/${projectId}/workflows`), - createWorkflow: (projectId: string, requirement: string, templateId?: string) => + createWorkflow: (projectId: string, requirement: string, templateId?: string, worker?: string) => request(`/projects/${projectId}/workflows`, { method: 'POST', - body: JSON.stringify({ requirement, template_id: templateId || undefined }), + body: JSON.stringify({ requirement, template_id: templateId || undefined, worker: worker || undefined }), }), listTemplates: () => @@ -93,6 +93,9 @@ export const api = { deleteTimer: (timerId: string) => request(`/timers/${timerId}`, { method: 'DELETE' }), + listWorkers: () => + request<{ name: string; cpu: string; memory: string; gpu: string }[]>('/workers'), + getKb: () => request<{ content: string }>('/kb'), listArticles: () => request('/kb/articles'), diff --git a/web/src/components/RequirementSection.vue b/web/src/components/RequirementSection.vue index a547973..dc1c686 100644 --- a/web/src/components/RequirementSection.vue +++ b/web/src/components/RequirementSection.vue @@ -1,5 +1,6 @@ @@ -47,7 +55,13 @@ function submit() { rows="8" @keydown.ctrl.enter="submit" /> - +
+ + +
@@ -148,8 +162,23 @@ function submit() { border-color: var(--accent); } +.submit-row { + display: flex; + align-items: center; + justify-content: flex-end; + gap: 8px; +} + +.worker-select { + padding: 6px 10px; + font-size: 12px; + background: var(--bg-secondary); + border: 1px solid var(--border); + border-radius: 6px; + color: var(--text-primary); +} + .btn-submit { - align-self: flex-end; background: var(--accent); color: var(--bg-primary); font-weight: 600; diff --git a/web/src/components/WorkflowView.vue b/web/src/components/WorkflowView.vue index d862ba8..c4ee9dd 100644 --- a/web/src/components/WorkflowView.vue +++ b/web/src/components/WorkflowView.vue @@ -147,9 +147,9 @@ watch(() => props.projectId, () => { setupWs() }) -async function onSubmitRequirement(text: string) { +async function onSubmitRequirement(text: string, worker?: string) { try { - const wf = await api.createWorkflow(props.projectId, text) + const wf = await api.createWorkflow(props.projectId, text, undefined, worker) workflow.value = wf logEntries.value = [] planSteps.value = []