Replace PTY chat with streaming AI chat and shared tool registry
Some checks failed
CI / lint-and-test (push) Successful in 39s
Deploy Production / deploy (push) Successful in 2m5s
CI / build (push) Has been cancelled

Replace the xterm/PTY-based chat tab with a direct-to-API streaming chat
UI that calls provider APIs (Anthropic, OpenAI) with SSE streaming and
inline tool execution.

- Extract 18 shared tools from MCP server into chat-tools.ts registry
  (knowledge, task, agent, model, web, shell, filesystem tools)
- Add streaming adapters for Anthropic and OpenAI APIs (raw fetch, no SDK)
- Add POST /api/chat SSE route with tool-use loop (max 10 rounds)
- Rewrite chat-tab.tsx as message-based UI with model selector,
  streaming text, and collapsible tool call blocks
- Refactor mcp-server.ts to consume shared tool registry
This commit is contained in:
Julia McGhee
2026-03-22 10:42:48 +00:00
parent c718b67772
commit 9e1491201b
6 changed files with 1778 additions and 563 deletions

View File

@@ -0,0 +1,197 @@
import { NextRequest } from "next/server";
import { z } from "zod";
import { getRawCredentialsByProvider, Provider } from "@/lib/credentials";
import { executeToolCall, toolsAsAnthropic, toolsAsOpenAI } from "@/lib/chat-tools";
import {
streamAnthropic,
streamOpenAI,
isAnthropicProvider,
getBaseUrl,
type ProviderMessages,
} from "@/lib/chat-providers";
import type { StreamEvent } from "@/lib/chat-types";
const MAX_TOOL_ROUNDS = 10;
const RequestSchema = z.object({
messages: z.array(
z.object({
role: z.enum(["user", "assistant"]),
content: z.string(),
toolCalls: z
.array(
z.object({
id: z.string(),
name: z.string(),
input: z.record(z.string(), z.unknown()),
output: z.string(),
}),
)
.optional(),
}),
),
model: z.string(),
provider: z.string(),
});
const SYSTEM_PROMPT = `You are the Harness control plane assistant. You help the user manage and direct autonomous coding agents, tasks, knowledge documents, and models within the Harness orchestration system.
You have access to tools for:
- Listing, reading, writing, and searching knowledge documents
- Listing, creating, starting, stopping, and inspecting tasks
- Listing configured agents and available models
Be concise and direct. When the user asks about tasks or agents, use the tools to get current state rather than guessing. When creating tasks, confirm the parameters with the user before proceeding unless they've been explicit.
Format your responses in markdown when helpful. Use code blocks for IDs, JSON, and technical values.`;
export async function POST(request: NextRequest) {
let body: z.infer<typeof RequestSchema>;
try {
body = RequestSchema.parse(await request.json());
} catch (err) {
return Response.json(
{ error: `Invalid request: ${err instanceof Error ? err.message : err}` },
{ status: 400 },
);
}
const { model, provider } = body;
// Look up credentials
const creds = await getRawCredentialsByProvider(provider as Provider);
if (creds.length === 0) {
return Response.json(
{ error: `No credentials configured for provider: ${provider}` },
{ status: 400 },
);
}
const apiKey = creds[0].token;
const baseUrl = getBaseUrl(provider, creds[0].baseUrl);
const useAnthropic = isAnthropicProvider(provider);
// Build conversation history in provider format
const providerMessages: ProviderMessages[] = [];
for (const msg of body.messages) {
if (msg.role === "user") {
providerMessages.push({ role: "user", content: msg.content });
} else if (msg.role === "assistant") {
if (msg.toolCalls && msg.toolCalls.length > 0) {
// Assistant message that made tool calls
providerMessages.push({
role: "assistant",
content: msg.content,
toolCalls: msg.toolCalls.map((tc) => ({
id: tc.id,
name: tc.name,
input: tc.input,
})),
});
// Followed by tool results as a user message
providerMessages.push({
role: "user",
content: "",
toolResults: msg.toolCalls.map((tc) => ({
id: tc.id,
content: tc.output,
})),
});
} else {
providerMessages.push({ role: "assistant", content: msg.content });
}
}
}
// SSE stream
const encoder = new TextEncoder();
const stream = new ReadableStream({
async start(controller) {
function send(event: StreamEvent) {
controller.enqueue(encoder.encode(`data: ${JSON.stringify(event)}\n\n`));
}
try {
let currentMessages = [...providerMessages];
let rounds = 0;
while (rounds < MAX_TOOL_ROUNDS) {
rounds++;
let textContent = "";
const toolCalls: Array<{ id: string; name: string; input: Record<string, unknown> }> = [];
let finalUsage: { input: number; output: number } | undefined;
// Stream from provider
const events = useAnthropic
? streamAnthropic(apiKey, model, SYSTEM_PROMPT, currentMessages, toolsAsAnthropic())
: streamOpenAI(apiKey, baseUrl, model, SYSTEM_PROMPT, currentMessages, toolsAsOpenAI());
for await (const event of events) {
if (event.type === "text_delta") {
textContent += event.content;
send(event);
} else if (event.type === "tool_call_start") {
toolCalls.push({ id: event.id, name: event.name, input: event.input });
send(event);
} else if (event.type === "message_end") {
finalUsage = event.usage;
} else if (event.type === "error") {
send(event);
controller.close();
return;
}
}
// No tool calls — we're done
if (toolCalls.length === 0) {
send({ type: "message_end", model, usage: finalUsage });
break;
}
// Execute tool calls and stream results
const toolResults: Array<{ id: string; content: string }> = [];
for (const tc of toolCalls) {
const start = Date.now();
const result = await executeToolCall(tc.name, tc.input);
const durationMs = Date.now() - start;
send({ type: "tool_call_end", id: tc.id, output: result.text, durationMs });
toolResults.push({ id: tc.id, content: result.text });
}
// Append assistant + tool results to conversation for next round
currentMessages.push({
role: "assistant",
content: textContent,
toolCalls,
});
currentMessages.push({
role: "user",
content: "",
toolResults,
});
// If this was the last allowed round, send message_end
if (rounds >= MAX_TOOL_ROUNDS) {
send({ type: "message_end", model, usage: finalUsage });
}
}
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
controller.enqueue(encoder.encode(`data: ${JSON.stringify({ type: "error", message: msg })}\n\n`));
} finally {
controller.close();
}
},
});
return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
},
});
}

View File

@@ -6,299 +6,570 @@ import {
Label,
Mono,
Btn,
Panel,
PanelHeader,
SearchableDropdown,
DropdownOption,
type DropdownOption,
} from "./harness-design-system";
import type { ChatMessage, ToolCallRecord, StreamEvent } from "@/lib/chat-types";
type SessionState = "idle" | "connecting" | "connected" | "disconnected";
interface Agent {
interface CuratedModel {
id: string;
name: string;
runtime: string;
modelId: string;
provider: string;
enabled: boolean;
}
export default function ChatTab({ mobile }: { mobile: boolean }) {
const [agents, setAgents] = useState<Agent[]>([]);
const [selectedAgentId, setSelectedAgentId] = useState<string>("");
const [state, setState] = useState<SessionState>("idle");
const [models, setModels] = useState<CuratedModel[]>([]);
const [selectedModel, setSelectedModel] = useState("");
const [messages, setMessages] = useState<ChatMessage[]>([]);
const [input, setInput] = useState("");
const [streaming, setStreaming] = useState(false);
const [streamText, setStreamText] = useState("");
const [streamToolCalls, setStreamToolCalls] = useState<ToolCallRecord[]>([]);
const [pendingToolIds, setPendingToolIds] = useState<Set<string>>(new Set());
const termRef = useRef<HTMLDivElement>(null);
const xtermRef = useRef<any>(null);
const fitRef = useRef<any>(null);
const wsRef = useRef<WebSocket | null>(null);
const abortRef = useRef<AbortController | null>(null);
const messagesEndRef = useRef<HTMLDivElement>(null);
const inputRef = useRef<HTMLTextAreaElement>(null);
// Fetch agents list
// Fetch curated models
useEffect(() => {
fetch("/api/agents")
fetch("/api/models/curated")
.then((r) => r.json())
.then((data) => {
setAgents(data.configs || []);
const enabled = (data.models || []).filter((m: CuratedModel) => m.enabled);
setModels(enabled);
})
.catch(() => {});
}, []);
const disconnect = useCallback(() => {
if (wsRef.current) {
wsRef.current.close();
wsRef.current = null;
}
setState("disconnected");
}, []);
// Auto-scroll on new content
useEffect(() => {
messagesEndRef.current?.scrollIntoView({ behavior: "smooth" });
}, [messages, streamText, streamToolCalls]);
const connect = useCallback(async () => {
if (!selectedAgentId || !termRef.current) return;
const selectedProvider = models.find((m) => m.id === selectedModel)?.provider || "";
setState("connecting");
const sendMessage = useCallback(async () => {
const text = input.trim();
if (!text || !selectedModel || streaming) return;
// Dynamic import xterm (SSR-safe)
const { Terminal } = await import("@xterm/xterm");
const { FitAddon } = await import("@xterm/addon-fit");
// CSS is imported at module level below
await import("@xterm/xterm/css/xterm.css" as any);
// Clean up previous terminal
if (xtermRef.current) {
xtermRef.current.dispose();
}
termRef.current.innerHTML = "";
const term = new Terminal({
cursorBlink: true,
fontFamily: tokens.font.mono,
fontSize: 14,
theme: {
background: tokens.color.bg1,
foreground: tokens.color.text0,
cursor: tokens.color.accent,
selectionBackground: "#374151",
},
});
const fit = new FitAddon();
term.loadAddon(fit);
term.open(termRef.current);
fit.fit();
xtermRef.current = term;
fitRef.current = fit;
const cols = term.cols;
const rows = term.rows;
const proto = location.protocol === "https:" ? "wss:" : "ws:";
const ws = new WebSocket(
`${proto}//${location.host}/ws/pty?agentId=${encodeURIComponent(selectedAgentId)}&cols=${cols}&rows=${rows}`,
);
wsRef.current = ws;
ws.onopen = () => {
setState("connected");
const userMsg: ChatMessage = {
id: `msg-${Date.now()}`,
role: "user",
content: text,
timestamp: Date.now(),
};
ws.onmessage = (e) => {
const data = e.data;
// Check for JSON control messages
if (typeof data === "string" && data.startsWith("{")) {
try {
const msg = JSON.parse(data);
if (msg.type === "exit") {
term.writeln(`\r\n\x1b[90m[session exited with code ${msg.code}]\x1b[0m`);
setState("disconnected");
return;
const newMessages = [...messages, userMsg];
setMessages(newMessages);
setInput("");
setStreaming(true);
setStreamText("");
setStreamToolCalls([]);
setPendingToolIds(new Set());
const abort = new AbortController();
abortRef.current = abort;
try {
const res = await fetch("/api/chat", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
messages: newMessages.map((m) => ({
role: m.role,
content: m.content,
toolCalls: m.toolCalls?.map((tc) => ({
id: tc.id,
name: tc.name,
input: tc.input,
output: tc.output,
})),
})),
model: selectedModel,
provider: selectedProvider,
}),
signal: abort.signal,
});
if (!res.ok) {
const err = await res.json().catch(() => ({ error: res.statusText }));
throw new Error(err.error || `HTTP ${res.status}`);
}
const reader = res.body!.getReader();
const decoder = new TextDecoder();
let buf = "";
let accText = "";
let accToolCalls: ToolCallRecord[] = [];
let pending = new Set<string>();
let msgModel = selectedModel;
while (true) {
const { done, value } = await reader.read();
if (done) break;
buf += decoder.decode(value, { stream: true });
const lines = buf.split("\n");
buf = lines.pop()!;
for (const line of lines) {
if (!line.startsWith("data: ")) continue;
const json = line.slice(6).trim();
if (!json) continue;
let event: StreamEvent;
try {
event = JSON.parse(json);
} catch {
continue;
}
if (msg.type === "error") {
term.writeln(`\r\n\x1b[31m[error: ${msg.message}]\x1b[0m`);
setState("disconnected");
return;
switch (event.type) {
case "text_delta":
accText += event.content;
setStreamText(accText);
break;
case "tool_call_start":
pending.add(event.id);
setPendingToolIds(new Set(pending));
accToolCalls = [
...accToolCalls,
{
id: event.id,
name: event.name,
input: event.input,
output: "",
durationMs: 0,
},
];
setStreamToolCalls([...accToolCalls]);
break;
case "tool_call_end":
pending.delete(event.id);
setPendingToolIds(new Set(pending));
accToolCalls = accToolCalls.map((tc) =>
tc.id === event.id
? { ...tc, output: event.output, durationMs: event.durationMs }
: tc,
);
setStreamToolCalls([...accToolCalls]);
break;
case "message_end":
msgModel = event.model;
break;
case "error":
accText += `\n\n**Error:** ${event.message}`;
setStreamText(accText);
break;
}
if (msg.type === "connected") {
return;
}
} catch {
// Not JSON, render as terminal output
}
}
term.write(data);
};
ws.onclose = () => {
setState("disconnected");
};
ws.onerror = () => {
setState("disconnected");
};
// Terminal → WebSocket
term.onData((data: string) => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(data);
}
});
// Resize handling
term.onResize(({ cols, rows }: { cols: number; rows: number }) => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({ type: "resize", cols, rows }));
}
});
}, [selectedAgentId]);
// ResizeObserver for terminal container
useEffect(() => {
const el = termRef.current;
if (!el) return;
const observer = new ResizeObserver(() => {
if (fitRef.current) {
try {
fitRef.current.fit();
} catch {
// Terminal may not be ready
// Commit the streamed message
const assistantMsg: ChatMessage = {
id: `msg-${Date.now()}`,
role: "assistant",
content: accText,
toolCalls: accToolCalls.length > 0 ? accToolCalls : undefined,
model: msgModel,
timestamp: Date.now(),
};
setMessages((prev) => [...prev, assistantMsg]);
} catch (err: any) {
if (err.name === "AbortError") {
// User cancelled
if (streamText) {
setMessages((prev) => [
...prev,
{
id: `msg-${Date.now()}`,
role: "assistant",
content: streamText + "\n\n*(cancelled)*",
timestamp: Date.now(),
},
]);
}
} else {
setMessages((prev) => [
...prev,
{
id: `msg-${Date.now()}`,
role: "assistant",
content: `**Error:** ${err.message}`,
timestamp: Date.now(),
},
]);
}
});
observer.observe(el);
return () => observer.disconnect();
}, []);
} finally {
setStreaming(false);
setStreamText("");
setStreamToolCalls([]);
setPendingToolIds(new Set());
abortRef.current = null;
}
}, [input, selectedModel, selectedProvider, streaming, messages]);
// Cleanup on unmount
useEffect(() => {
return () => {
if (wsRef.current) wsRef.current.close();
if (xtermRef.current) xtermRef.current.dispose();
};
}, []);
const handleKeyDown = (e: React.KeyboardEvent) => {
if (e.key === "Enter" && !e.shiftKey) {
e.preventDefault();
sendMessage();
}
};
const agentOptions: DropdownOption[] = agents.map((a) => ({
value: a.id,
label: `${a.name}`,
sub: `${a.runtime} · ${a.modelId}`,
const clearConversation = () => {
setMessages([]);
setStreamText("");
setStreamToolCalls([]);
};
const stopStreaming = () => {
abortRef.current?.abort();
};
// Model dropdown options grouped by provider
const modelOptions: DropdownOption[] = models.map((m) => ({
value: m.id,
label: m.name,
detail: m.provider,
}));
const statusColor =
state === "connected"
? tokens.color.pass
: state === "connecting"
? tokens.color.warn
: state === "disconnected"
? tokens.color.fail
: tokens.color.muted;
const statusLabel =
state === "connected"
? "CONNECTED"
: state === "connecting"
? "CONNECTING"
: state === "disconnected"
? "DISCONNECTED"
: "IDLE";
return (
<div
style={{
flex: 1,
display: "flex",
flexDirection: "column",
padding: mobile ? tokens.space[3] : tokens.space[5],
gap: tokens.space[4],
overflow: "hidden",
}}
>
{/* Header bar */}
{/* Header */}
<div
style={{
display: "flex",
alignItems: "center",
gap: tokens.space[4],
gap: tokens.space[3],
padding: `${tokens.space[3]}px ${mobile ? tokens.space[3] : tokens.space[5]}px`,
borderBottom: `1px solid ${tokens.color.border0}`,
flexWrap: "wrap",
}}
>
<div style={{ minWidth: 240, flex: mobile ? 1 : undefined }}>
<div style={{ minWidth: 220, flex: mobile ? 1 : undefined, maxWidth: 360 }}>
<SearchableDropdown
options={agentOptions}
value={selectedAgentId}
onChange={(v) => setSelectedAgentId(Array.isArray(v) ? v[0] : v)}
placeholder="Select agent..."
options={modelOptions}
value={selectedModel}
onChange={(v) => setSelectedModel(Array.isArray(v) ? v[0] : v)}
placeholder="Select model..."
/>
</div>
{state === "connected" ? (
<Btn onClick={disconnect} style={{ background: tokens.color.failDim, borderColor: tokens.color.fail }}>
DISCONNECT
</Btn>
) : (
<Btn
onClick={connect}
disabled={!selectedAgentId || state === "connecting"}
>
{state === "connecting" ? "CONNECTING..." : "CONNECT"}
</Btn>
{selectedProvider && (
<Label color={tokens.color.text2}>{selectedProvider}</Label>
)}
<div
style={{
display: "flex",
alignItems: "center",
gap: tokens.space[2],
marginLeft: "auto",
}}
>
<div
style={{
width: 7,
height: 7,
borderRadius: "50%",
background: statusColor,
boxShadow: state === "connected" ? tokens.color.accentGlow : undefined,
}}
/>
<Label color={statusColor} style={{ fontSize: tokens.size.xs }}>
{statusLabel}
</Label>
<div style={{ marginLeft: "auto", display: "flex", gap: tokens.space[2] }}>
{messages.length > 0 && (
<Btn variant="ghost" onClick={clearConversation} disabled={streaming}>
CLEAR
</Btn>
)}
</div>
</div>
{/* Terminal area */}
{/* Messages */}
<div
style={{
flex: 1,
background: tokens.color.bg1,
border: `1px solid ${tokens.color.border0}`,
borderRadius: 6,
overflow: "hidden",
position: "relative",
minHeight: 0,
overflowY: "auto",
padding: `${tokens.space[4]}px ${mobile ? tokens.space[3] : tokens.space[5]}px`,
display: "flex",
flexDirection: "column",
gap: tokens.space[4],
}}
>
<div
ref={termRef}
style={{
width: "100%",
height: "100%",
padding: 4,
}}
/>
{state === "idle" && (
{messages.length === 0 && !streaming && (
<div
style={{
position: "absolute",
inset: 0,
flex: 1,
display: "flex",
alignItems: "center",
justifyContent: "center",
}}
>
<Mono size={tokens.size.sm} color={tokens.color.text3}>
Select an agent and click CONNECT to start an interactive session
Select a model and start a conversation
</Mono>
</div>
)}
{messages.map((msg) => (
<MessageBubble key={msg.id} message={msg} />
))}
{/* Streaming message */}
{streaming && (streamText || streamToolCalls.length > 0) && (
<div style={{ display: "flex", flexDirection: "column", gap: tokens.space[2] }}>
<div
style={{
background: tokens.color.bg1,
border: `1px solid ${tokens.color.border0}`,
padding: `${tokens.space[3]}px ${tokens.space[4]}px`,
maxWidth: "85%",
}}
>
{streamToolCalls.length > 0 && (
<div style={{ marginBottom: streamText ? tokens.space[3] : 0 }}>
{streamToolCalls.map((tc) => (
<ToolCallBlock
key={tc.id}
toolCall={tc}
pending={pendingToolIds.has(tc.id)}
/>
))}
</div>
)}
{streamText && (
<div
style={{
fontFamily: tokens.font.mono,
fontSize: tokens.size.sm,
color: tokens.color.text1,
whiteSpace: "pre-wrap",
wordBreak: "break-word",
lineHeight: 1.6,
}}
>
{streamText}
<span
style={{
display: "inline-block",
width: 6,
height: 14,
background: tokens.color.accent,
marginLeft: 2,
animation: "hpulse 1s infinite",
}}
/>
</div>
)}
</div>
</div>
)}
<div ref={messagesEndRef} />
</div>
{/* Input area */}
<div
style={{
borderTop: `1px solid ${tokens.color.border0}`,
padding: `${tokens.space[3]}px ${mobile ? tokens.space[3] : tokens.space[5]}px`,
display: "flex",
gap: tokens.space[3],
alignItems: "flex-end",
}}
>
<textarea
ref={inputRef}
value={input}
onChange={(e) => setInput(e.target.value)}
onKeyDown={handleKeyDown}
placeholder={
selectedModel ? "Message the control plane..." : "Select a model first"
}
disabled={!selectedModel}
rows={1}
style={{
flex: 1,
background: tokens.color.bg0,
border: `1px solid ${tokens.color.border0}`,
color: tokens.color.text0,
fontFamily: tokens.font.mono,
fontSize: tokens.size.base,
padding: `${tokens.space[3]}px`,
outline: "none",
resize: "none",
borderRadius: 0,
minHeight: tokens.touch.min,
maxHeight: 160,
boxSizing: "border-box",
opacity: selectedModel ? 1 : 0.4,
}}
/>
{streaming ? (
<Btn variant="danger" onClick={stopStreaming}>
STOP
</Btn>
) : (
<Btn
variant="primary"
onClick={sendMessage}
disabled={!input.trim() || !selectedModel}
>
SEND
</Btn>
)}
</div>
</div>
);
}
// ── Message Bubble ───────────────────────────────────────────
function MessageBubble({ message }: { message: ChatMessage }) {
const isUser = message.role === "user";
return (
<div
style={{
display: "flex",
flexDirection: "column",
alignItems: isUser ? "flex-end" : "flex-start",
gap: tokens.space[1],
}}
>
<div
style={{
background: isUser ? tokens.color.bg2 : tokens.color.bg1,
border: `1px solid ${tokens.color.border0}`,
padding: `${tokens.space[3]}px ${tokens.space[4]}px`,
maxWidth: "85%",
}}
>
{message.toolCalls && message.toolCalls.length > 0 && (
<div style={{ marginBottom: message.content ? tokens.space[3] : 0 }}>
{message.toolCalls.map((tc) => (
<ToolCallBlock key={tc.id} toolCall={tc} pending={false} />
))}
</div>
)}
<div
style={{
fontFamily: tokens.font.mono,
fontSize: tokens.size.sm,
color: tokens.color.text1,
whiteSpace: "pre-wrap",
wordBreak: "break-word",
lineHeight: 1.6,
}}
>
{message.content}
</div>
</div>
{message.model && (
<Label color={tokens.color.text3} style={{ fontSize: 11 }}>
{message.model}
</Label>
)}
</div>
);
}
// ── Tool Call Block ──────────────────────────────────────────
function ToolCallBlock({
toolCall,
pending,
}: {
toolCall: ToolCallRecord;
pending: boolean;
}) {
const [expanded, setExpanded] = useState(false);
return (
<div
style={{
border: `1px solid ${pending ? tokens.color.warnDim : tokens.color.border0}`,
background: tokens.color.bg0,
marginBottom: tokens.space[2],
}}
>
<div
onClick={() => setExpanded(!expanded)}
style={{
display: "flex",
alignItems: "center",
gap: tokens.space[2],
padding: `${tokens.space[1]}px ${tokens.space[3]}px`,
cursor: "pointer",
minHeight: 32,
}}
>
<span
style={{
color: tokens.color.text3,
fontSize: tokens.size.xs,
fontFamily: tokens.font.mono,
}}
>
{expanded ? "▾" : "▸"}
</span>
<Label
color={pending ? tokens.color.warn : tokens.color.purple}
>
{toolCall.name}
</Label>
{pending && (
<Label color={tokens.color.warn} style={{ fontSize: 11 }}>
running...
</Label>
)}
{!pending && toolCall.durationMs > 0 && (
<Label color={tokens.color.text3} style={{ fontSize: 11 }}>
{toolCall.durationMs}ms
</Label>
)}
</div>
{expanded && (
<div
style={{
borderTop: `1px solid ${tokens.color.border0}`,
padding: `${tokens.space[2]}px ${tokens.space[3]}px`,
}}
>
{Object.keys(toolCall.input).length > 0 && (
<div style={{ marginBottom: tokens.space[2] }}>
<Label color={tokens.color.text3}>Input</Label>
<pre
style={{
fontFamily: tokens.font.mono,
fontSize: tokens.size.xs,
color: tokens.color.text2,
margin: `${tokens.space[1]}px 0`,
whiteSpace: "pre-wrap",
wordBreak: "break-word",
}}
>
{JSON.stringify(toolCall.input, null, 2)}
</pre>
</div>
)}
{toolCall.output && (
<div>
<Label color={tokens.color.text3}>Output</Label>
<pre
style={{
fontFamily: tokens.font.mono,
fontSize: tokens.size.xs,
color: tokens.color.text2,
margin: `${tokens.space[1]}px 0`,
whiteSpace: "pre-wrap",
wordBreak: "break-word",
maxHeight: 200,
overflowY: "auto",
}}
>
{toolCall.output}
</pre>
</div>
)}
</div>
)}
</div>
);
}

View File

@@ -0,0 +1,340 @@
import type { StreamEvent } from "./chat-types";
// ── SSE line parser ──────────────────────────────────────────
async function* sseLines(
body: ReadableStream<Uint8Array>,
): AsyncGenerator<{ event?: string; data: string }> {
const reader = body.getReader();
const decoder = new TextDecoder();
let buf = "";
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
buf += decoder.decode(value, { stream: true });
const parts = buf.split("\n");
buf = parts.pop()!;
let event: string | undefined;
let dataLines: string[] = [];
for (const line of parts) {
if (line.startsWith("event:")) {
event = line.slice(6).trim();
} else if (line.startsWith("data:")) {
dataLines.push(line.slice(5).trim());
} else if (line === "") {
if (dataLines.length > 0) {
yield { event, data: dataLines.join("\n") };
event = undefined;
dataLines = [];
}
}
}
}
// Flush remaining
if (buf.startsWith("data:")) {
yield { data: buf.slice(5).trim() };
}
} finally {
reader.releaseLock();
}
}
// ── Anthropic ────────────────────────────────────────────────
interface AnthropicMessage {
role: "user" | "assistant";
content: string | Array<Record<string, unknown>>;
}
export interface ProviderMessages {
role: "user" | "assistant";
content: string;
toolCalls?: Array<{ id: string; name: string; input: Record<string, unknown> }>;
toolResults?: Array<{ id: string; content: string }>;
}
function toAnthropicMessages(messages: ProviderMessages[]): AnthropicMessage[] {
const out: AnthropicMessage[] = [];
for (const msg of messages) {
if (msg.role === "user" && msg.toolResults) {
// Tool results go as user message with tool_result blocks
const blocks: Array<Record<string, unknown>> = msg.toolResults.map((r) => ({
type: "tool_result",
tool_use_id: r.id,
content: r.content,
}));
out.push({ role: "user", content: blocks });
} else if (msg.role === "assistant" && msg.toolCalls) {
// Assistant message with tool use blocks
const blocks: Array<Record<string, unknown>> = [];
if (msg.content) {
blocks.push({ type: "text", text: msg.content });
}
for (const tc of msg.toolCalls) {
blocks.push({ type: "tool_use", id: tc.id, name: tc.name, input: tc.input });
}
out.push({ role: "assistant", content: blocks });
} else {
out.push({ role: msg.role, content: msg.content });
}
}
return out;
}
export async function* streamAnthropic(
apiKey: string,
model: string,
systemPrompt: string,
messages: ProviderMessages[],
tools: Array<{ name: string; description: string; input_schema: Record<string, unknown> }>,
): AsyncGenerator<StreamEvent> {
const res = await fetch("https://api.anthropic.com/v1/messages", {
method: "POST",
headers: {
"Content-Type": "application/json",
"x-api-key": apiKey,
"anthropic-version": "2023-06-01",
},
body: JSON.stringify({
model,
max_tokens: 8192,
system: systemPrompt,
messages: toAnthropicMessages(messages),
tools,
stream: true,
}),
});
if (!res.ok) {
const text = await res.text();
yield { type: "error", message: `Anthropic API error ${res.status}: ${text}` };
return;
}
let currentToolId = "";
let currentToolName = "";
let toolInputJson = "";
let usage = { input: 0, output: 0 };
for await (const { event, data } of sseLines(res.body!)) {
if (data === "[DONE]") break;
let parsed: Record<string, any>;
try {
parsed = JSON.parse(data);
} catch {
continue;
}
if (event === "message_start" && parsed.message?.usage) {
usage.input = parsed.message.usage.input_tokens ?? 0;
}
if (event === "content_block_start") {
const block = parsed.content_block;
if (block?.type === "tool_use") {
currentToolId = block.id;
currentToolName = block.name;
toolInputJson = "";
}
}
if (event === "content_block_delta") {
const delta = parsed.delta;
if (delta?.type === "text_delta") {
yield { type: "text_delta", content: delta.text };
} else if (delta?.type === "input_json_delta") {
toolInputJson += delta.partial_json;
}
}
if (event === "content_block_stop" && currentToolId) {
let input: Record<string, unknown> = {};
try {
input = toolInputJson ? JSON.parse(toolInputJson) : {};
} catch { /* empty */ }
yield {
type: "tool_call_start",
id: currentToolId,
name: currentToolName,
input,
};
currentToolId = "";
currentToolName = "";
toolInputJson = "";
}
if (event === "message_delta" && parsed.usage) {
usage.output = parsed.usage.output_tokens ?? 0;
}
}
yield { type: "message_end", model, usage };
}
// ── OpenAI ───────────────────────────────────────────────────
interface OpenAIMessage {
role: "user" | "assistant" | "system" | "tool";
content?: string | null;
tool_calls?: Array<{ id: string; type: "function"; function: { name: string; arguments: string } }>;
tool_call_id?: string;
}
function toOpenAIMessages(
systemPrompt: string,
messages: ProviderMessages[],
): OpenAIMessage[] {
const out: OpenAIMessage[] = [{ role: "system", content: systemPrompt }];
for (const msg of messages) {
if (msg.role === "assistant" && msg.toolCalls) {
out.push({
role: "assistant",
content: msg.content || null,
tool_calls: msg.toolCalls.map((tc) => ({
id: tc.id,
type: "function" as const,
function: { name: tc.name, arguments: JSON.stringify(tc.input) },
})),
});
} else if (msg.role === "user" && msg.toolResults) {
for (const r of msg.toolResults) {
out.push({ role: "tool", content: r.content, tool_call_id: r.id });
}
} else {
out.push({ role: msg.role, content: msg.content });
}
}
return out;
}
export async function* streamOpenAI(
apiKey: string,
baseUrl: string,
model: string,
systemPrompt: string,
messages: ProviderMessages[],
tools: Array<{
type: "function";
function: { name: string; description: string; parameters: Record<string, unknown> };
}>,
): AsyncGenerator<StreamEvent> {
const url = `${baseUrl.replace(/\/$/, "")}/v1/chat/completions`;
const res = await fetch(url, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${apiKey}`,
},
body: JSON.stringify({
model,
messages: toOpenAIMessages(systemPrompt, messages),
tools,
stream: true,
stream_options: { include_usage: true },
}),
});
if (!res.ok) {
const text = await res.text();
yield { type: "error", message: `OpenAI API error ${res.status}: ${text}` };
return;
}
// Accumulate tool calls across deltas
const pendingTools = new Map<number, { id: string; name: string; args: string }>();
let usage = { input: 0, output: 0 };
for await (const { data } of sseLines(res.body!)) {
if (data === "[DONE]") break;
let parsed: Record<string, any>;
try {
parsed = JSON.parse(data);
} catch {
continue;
}
if (parsed.usage) {
usage.input = parsed.usage.prompt_tokens ?? 0;
usage.output = parsed.usage.completion_tokens ?? 0;
}
const choice = parsed.choices?.[0];
if (!choice) continue;
const delta = choice.delta;
if (!delta) continue;
// Text content
if (delta.content) {
yield { type: "text_delta", content: delta.content };
}
// Tool call deltas
if (delta.tool_calls) {
for (const tc of delta.tool_calls) {
const idx = tc.index ?? 0;
if (!pendingTools.has(idx)) {
pendingTools.set(idx, { id: tc.id || "", name: tc.function?.name || "", args: "" });
}
const pending = pendingTools.get(idx)!;
if (tc.id) pending.id = tc.id;
if (tc.function?.name) pending.name = tc.function.name;
if (tc.function?.arguments) pending.args += tc.function.arguments;
}
}
// When finish_reason is "tool_calls", emit all accumulated tool calls
if (choice.finish_reason === "tool_calls") {
for (const [, tc] of pendingTools) {
let input: Record<string, unknown> = {};
try {
input = tc.args ? JSON.parse(tc.args) : {};
} catch { /* empty */ }
yield { type: "tool_call_start", id: tc.id, name: tc.name, input };
}
pendingTools.clear();
}
}
// Emit any remaining tool calls (some providers use "stop" instead of "tool_calls")
for (const [, tc] of pendingTools) {
let input: Record<string, unknown> = {};
try {
input = tc.args ? JSON.parse(tc.args) : {};
} catch { /* empty */ }
yield { type: "tool_call_start", id: tc.id, name: tc.name, input };
}
yield { type: "message_end", model, usage };
}
// ── Provider router ──────────────────────────────────────────
const PROVIDER_BASE_URLS: Record<string, string> = {
openai: "https://api.openai.com",
openrouter: "https://openrouter.ai/api",
google: "https://generativelanguage.googleapis.com",
};
export type ChatProvider = "anthropic" | "openai" | "openrouter" | "google";
export function isAnthropicProvider(provider: string): boolean {
return provider === "anthropic";
}
export function getBaseUrl(provider: string, credentialBaseUrl?: string): string {
return credentialBaseUrl || PROVIDER_BASE_URLS[provider] || "https://api.openai.com";
}

View File

@@ -0,0 +1,683 @@
import { z } from "zod";
import { eq, and } from "drizzle-orm";
import { db } from "./db";
import {
tasks as tasksTable,
iterations as iterationsTable,
agentConfigs as agentTable,
curatedModels as modelsTable,
orchestratorState as orchTable,
} from "@homelab/db";
import { readFile, writeFile, readdir, mkdir, stat, access } from "node:fs/promises";
import { execFile } from "node:child_process";
import path from "node:path";
// ── Types ────────────────────────────────────────────────────
export interface ToolResult {
text: string;
isError?: boolean;
}
export interface ToolDef {
name: string;
description: string;
inputSchema?: z.ZodType;
execute: (input: Record<string, unknown>) => Promise<ToolResult>;
}
// ── Helpers ──────────────────────────────────────────────────
const KNOWLEDGE_DIR = () => process.env.HARNESS_KNOWLEDGE_DIR || "";
function taskSummary(row: typeof tasksTable.$inferSelect) {
return {
id: row.id,
slug: row.slug,
goal: row.goal,
status: row.status,
project: row.project,
iteration: row.iteration,
maxIterations: row.maxIterations,
startedAt: row.startedAt,
completedAt: row.completedAt,
};
}
// ── Tool Implementations ─────────────────────────────────────
const knowledgeList: ToolDef = {
name: "knowledge_list",
description: "List all knowledge documents available in the harness knowledge base",
async execute() {
const dir = KNOWLEDGE_DIR();
if (!dir) return { text: "HARNESS_KNOWLEDGE_DIR not configured" };
try {
const entries = await readdir(dir, { withFileTypes: true });
const files = entries.filter((e) => e.isFile()).map((e) => e.name);
return { text: files.length > 0 ? files.join("\n") : "(empty)" };
} catch (err) {
return { text: `Error listing knowledge dir: ${err}`, isError: true };
}
},
};
const knowledgeRead: ToolDef = {
name: "knowledge_read",
description: "Read a specific knowledge document by filename",
inputSchema: z.object({
filename: z.string().describe("Filename of the knowledge document to read"),
}),
async execute({ filename }) {
const dir = KNOWLEDGE_DIR();
if (!dir) return { text: "HARNESS_KNOWLEDGE_DIR not configured", isError: true };
const filePath = path.resolve(dir, filename as string);
if (!filePath.startsWith(path.resolve(dir))) {
return { text: "Invalid path", isError: true };
}
try {
const content = await readFile(filePath, "utf-8");
return { text: content };
} catch (err) {
return { text: `Error reading ${filename}: ${err}`, isError: true };
}
},
};
const knowledgeWrite: ToolDef = {
name: "knowledge_write",
description: "Create or update a knowledge document in the harness knowledge base",
inputSchema: z.object({
filename: z.string().describe("Filename for the knowledge document (e.g. 'findings.md')"),
content: z.string().describe("Content to write to the knowledge document"),
}),
async execute({ filename, content }) {
const dir = KNOWLEDGE_DIR();
if (!dir) return { text: "HARNESS_KNOWLEDGE_DIR not configured", isError: true };
const filePath = path.resolve(dir, filename as string);
if (!filePath.startsWith(path.resolve(dir))) {
return { text: "Invalid path", isError: true };
}
try {
await mkdir(path.dirname(filePath), { recursive: true });
await writeFile(filePath, content as string, "utf-8");
return { text: `Wrote ${filename} (${(content as string).length} bytes)` };
} catch (err) {
return { text: `Error writing ${filename}: ${err}`, isError: true };
}
},
};
const knowledgeSearch: ToolDef = {
name: "knowledge_search",
description: "Search across all knowledge documents for a text pattern (case-insensitive substring match)",
inputSchema: z.object({
query: z.string().describe("Text to search for across all knowledge documents"),
}),
async execute({ query }) {
const dir = KNOWLEDGE_DIR();
if (!dir) return { text: "HARNESS_KNOWLEDGE_DIR not configured" };
try {
const entries = await readdir(dir, { withFileTypes: true });
const files = entries.filter((e) => e.isFile());
const results: string[] = [];
const lowerQuery = (query as string).toLowerCase();
for (const file of files) {
const filePath = path.join(dir, file.name);
const content = await readFile(filePath, "utf-8");
const lines = content.split("\n");
const matches = lines
.map((line, i) => ({ line, lineNum: i + 1 }))
.filter(({ line }) => line.toLowerCase().includes(lowerQuery));
if (matches.length > 0) {
results.push(
`## ${file.name}\n` +
matches
.slice(0, 10)
.map(({ line, lineNum }) => ` L${lineNum}: ${line.trim()}`)
.join("\n") +
(matches.length > 10 ? `\n ... and ${matches.length - 10} more matches` : ""),
);
}
}
return {
text: results.length > 0 ? results.join("\n\n") : `No matches for "${query}"`,
};
} catch (err) {
return { text: `Error searching: ${err}`, isError: true };
}
},
};
const taskList: ToolDef = {
name: "task_list",
description: "List all harness tasks with their current status and evaluation results",
async execute() {
const rows = await db.select().from(tasksTable);
return { text: JSON.stringify(rows.map(taskSummary), null, 2) };
},
};
const taskGet: ToolDef = {
name: "task_get",
description: "Get full details for a harness task including iteration history and evaluations",
inputSchema: z.object({
taskId: z.string().describe("Task ID to look up"),
}),
async execute({ taskId }) {
const [taskRow] = await db.select().from(tasksTable).where(eq(tasksTable.id, taskId as string));
if (!taskRow) return { text: `Task ${taskId} not found`, isError: true };
const iters = await db
.select()
.from(iterationsTable)
.where(eq(iterationsTable.taskId, taskId as string));
const result = {
...taskSummary(taskRow),
spec: taskRow.spec,
evals: taskRow.evals,
pr: taskRow.pr,
iterations: iters
.sort((a, b) => a.n - b.n)
.map((i) => ({
n: i.n,
status: i.status,
diagnosis: i.diagnosis,
evals: i.evals,
diffStats: i.diffStats,
agentOutput: i.agentOutput ? i.agentOutput.slice(-4000) : null,
startedAt: i.startedAt,
completedAt: i.completedAt,
})),
};
return { text: JSON.stringify(result, null, 2) };
},
};
const taskCreate: ToolDef = {
name: "task_create",
description:
"Create a new harness task. The task will be created in 'pending' status and can be started with task_start.",
inputSchema: z.object({
slug: z.string().describe("Unique short identifier for the task (e.g. 'fix-auth-bug')"),
goal: z.string().describe("High-level description of what the task should accomplish"),
project: z.string().describe("Repository in 'owner/repo' format"),
agentId: z.string().describe("ID of the agent configuration to use"),
maxIterations: z.number().optional().describe("Maximum iterations before giving up (default: 6)"),
criteria: z
.array(
z.object({
label: z.string().describe("Criterion name"),
target: z.string().describe("Evaluation target DSL (e.g. 'exitCode:0', 'filesChanged:>0')"),
}),
)
.optional()
.describe("Success criteria for evaluation"),
constraints: z.array(z.string()).optional().describe("Implementation constraints"),
knowledgeRefs: z.array(z.string()).optional().describe("Knowledge document filenames to include in prompt"),
gitProvider: z.enum(["github", "gitlab", "gitea"]).optional().describe("Git provider (default: github)"),
gitBaseUrl: z.string().optional().describe("Base URL for the git provider API"),
}),
async execute(args) {
const spec = {
slug: args.slug as string,
goal: args.goal as string,
project: args.project as string,
agentId: args.agentId as string,
maxIterations: (args.maxIterations as number) ?? 6,
criteria: (args.criteria as Array<{ label: string; target: string }>) ?? [],
constraints: (args.constraints as string[]) ?? [],
knowledgeRefs: (args.knowledgeRefs as string[]) ?? [],
gitProvider: args.gitProvider as string | undefined,
gitBaseUrl: args.gitBaseUrl as string | undefined,
};
const taskId = `task-${Date.now()}`;
await db.insert(tasksTable).values({
id: taskId,
slug: spec.slug,
goal: spec.goal,
project: spec.project,
status: "pending",
iteration: 0,
maxIterations: spec.maxIterations,
startedAt: null,
evals: {},
spec,
});
return { text: JSON.stringify({ id: taskId, status: "pending", slug: spec.slug }) };
},
};
const taskStart: ToolDef = {
name: "task_start",
description:
"Ensure the orchestrator is running so it will pick up pending tasks. Sets orchestrator state to running in the database.",
async execute() {
await db
.insert(orchTable)
.values({ id: "singleton", running: false })
.onConflictDoNothing();
await db
.update(orchTable)
.set({ running: true, updatedAt: new Date() })
.where(eq(orchTable.id, "singleton"));
return { text: JSON.stringify({ ok: true, message: "Orchestrator set to running — pending tasks will be picked up" }) };
},
};
const taskStop: ToolDef = {
name: "task_stop",
description: "Request cancellation of a running harness task. Sets cancel_requested flag which the orchestrator polls.",
inputSchema: z.object({
taskId: z.string().describe("ID of the running task to cancel"),
}),
async execute({ taskId }) {
const result = await db
.update(tasksTable)
.set({ cancelRequested: true, updatedAt: new Date() })
.where(and(eq(tasksTable.id, taskId as string), eq(tasksTable.status, "running")));
const rowCount = (result as unknown as { rowCount: number }).rowCount;
if (rowCount === 0) {
return { text: `Task ${taskId} is not running or not found`, isError: true };
}
return { text: JSON.stringify({ ok: true, message: "Cancellation requested" }) };
},
};
const agentList: ToolDef = {
name: "agent_list",
description: "List all configured agent runtimes (agent configs with runtime, model, and provider)",
async execute() {
const rows = await db.select().from(agentTable);
const agents = rows.map((r) => ({
id: r.id,
name: r.name,
runtime: r.runtime,
modelId: r.modelId,
provider: r.provider,
}));
return { text: JSON.stringify(agents, null, 2) };
},
};
const modelList: ToolDef = {
name: "model_list",
description: "List available AI models with pricing information",
async execute() {
const rows = await db.select().from(modelsTable).where(eq(modelsTable.enabled, true));
const models = rows.map((r) => ({
id: r.id,
name: r.name,
provider: r.provider,
contextWindow: r.contextWindow,
costPer1kInput: r.costPer1kInput,
costPer1kOutput: r.costPer1kOutput,
}));
return { text: JSON.stringify(models, null, 2) };
},
};
// ── Web Tools ────────────────────────────────────────────────
const webFetch: ToolDef = {
name: "web_fetch",
description:
"Fetch a URL and return its content. Supports HTML pages (returns text), JSON APIs, and plain text. Useful for checking documentation, API responses, or service status pages.",
inputSchema: z.object({
url: z.string().describe("The URL to fetch"),
headers: z
.record(z.string(), z.string())
.optional()
.describe("Optional HTTP headers to include"),
}),
async execute({ url, headers }) {
try {
const res = await fetch(url as string, {
headers: (headers as Record<string, string>) || {},
signal: AbortSignal.timeout(15_000),
});
const contentType = res.headers.get("content-type") || "";
let body: string;
if (contentType.includes("json")) {
body = JSON.stringify(await res.json(), null, 2);
} else {
body = await res.text();
// Truncate very large HTML pages
if (body.length > 32_000) {
body = body.slice(0, 32_000) + "\n... (truncated)";
}
}
return { text: `HTTP ${res.status} ${res.statusText}\nContent-Type: ${contentType}\n\n${body}` };
} catch (err) {
return { text: `Fetch error: ${err}`, isError: true };
}
},
};
const webSearch: ToolDef = {
name: "web_search",
description:
"Search the web using DuckDuckGo. Returns a list of results with titles, URLs, and snippets. Good for finding documentation, error solutions, or current information.",
inputSchema: z.object({
query: z.string().describe("Search query"),
maxResults: z.number().optional().describe("Maximum results to return (default: 5)"),
}),
async execute({ query, maxResults }) {
const max = (maxResults as number) || 5;
try {
// Use DuckDuckGo HTML search (no API key needed)
const encoded = encodeURIComponent(query as string);
const res = await fetch(`https://html.duckduckgo.com/html/?q=${encoded}`, {
headers: { "User-Agent": "Harness/1.0" },
signal: AbortSignal.timeout(10_000),
});
const html = await res.text();
// Parse results from DDG HTML
const results: Array<{ title: string; url: string; snippet: string }> = [];
const resultRegex = /<a[^>]+class="result__a"[^>]*href="([^"]*)"[^>]*>(.*?)<\/a>[\s\S]*?<a[^>]+class="result__snippet"[^>]*>(.*?)<\/a>/g;
let match;
while ((match = resultRegex.exec(html)) !== null && results.length < max) {
const rawUrl = match[1];
// DDG wraps URLs in a redirect — extract the actual URL
const urlMatch = rawUrl.match(/uddg=([^&]+)/);
const url = urlMatch ? decodeURIComponent(urlMatch[1]) : rawUrl;
results.push({
title: match[2].replace(/<[^>]+>/g, "").trim(),
url,
snippet: match[3].replace(/<[^>]+>/g, "").trim(),
});
}
if (results.length === 0) {
return { text: `No results found for "${query}"` };
}
const formatted = results
.map((r, i) => `${i + 1}. **${r.title}**\n ${r.url}\n ${r.snippet}`)
.join("\n\n");
return { text: formatted };
} catch (err) {
return { text: `Search error: ${err}`, isError: true };
}
},
};
// ── Shell Tools ──────────────────────────────────────────────
const SHELL_TIMEOUT_MS = 30_000;
const SHELL_MAX_OUTPUT = 64_000;
function runCommand(
command: string,
args: string[],
cwd: string,
timeoutMs: number,
): Promise<{ stdout: string; stderr: string; exitCode: number }> {
return new Promise((resolve) => {
const child = execFile(command, args, {
cwd,
timeout: timeoutMs,
maxBuffer: SHELL_MAX_OUTPUT * 2,
shell: true,
env: { ...process.env, TERM: "dumb" },
}, (err, stdout, stderr) => {
let exitCode = 0;
if (err) {
exitCode = (err as any).code === "ERR_CHILD_PROCESS_STDIO_MAXBUFFER" ? 1 : (err as any).status ?? 1;
}
let out = stdout || "";
let errOut = stderr || "";
if (out.length > SHELL_MAX_OUTPUT) out = out.slice(0, SHELL_MAX_OUTPUT) + "\n... (truncated)";
if (errOut.length > SHELL_MAX_OUTPUT) errOut = errOut.slice(0, SHELL_MAX_OUTPUT) + "\n... (truncated)";
resolve({ stdout: out, stderr: errOut, exitCode });
});
});
}
const WORK_DIR = () => process.env.HARNESS_WORK_DIR || "/data/harness";
const shellExec: ToolDef = {
name: "shell_exec",
description:
"Execute a shell command and return its stdout, stderr, and exit code. Commands run in the harness work directory. Use for git operations, checking service status, running scripts, etc. Timeout is 30 seconds.",
inputSchema: z.object({
command: z.string().describe("The shell command to execute (e.g. 'git status', 'ls -la', 'kubectl get pods')"),
cwd: z.string().optional().describe("Working directory (defaults to HARNESS_WORK_DIR)"),
}),
async execute({ command, cwd }) {
const workDir = (cwd as string) || WORK_DIR();
try {
const { stdout, stderr, exitCode } = await runCommand(
"/bin/sh",
["-c", command as string],
workDir,
SHELL_TIMEOUT_MS,
);
const parts: string[] = [];
parts.push(`exit code: ${exitCode}`);
if (stdout) parts.push(`stdout:\n${stdout}`);
if (stderr) parts.push(`stderr:\n${stderr}`);
return { text: parts.join("\n\n") };
} catch (err) {
return { text: `Shell error: ${err}`, isError: true };
}
},
};
// ── Filesystem Tools ─────────────────────────────────────────
const fsRead: ToolDef = {
name: "fs_read",
description:
"Read a file from the filesystem. Returns the file content as text. For large files, use the offset and limit parameters to read a portion.",
inputSchema: z.object({
path: z.string().describe("Absolute path to the file to read"),
offset: z.number().optional().describe("Line number to start reading from (1-based, default: 1)"),
limit: z.number().optional().describe("Maximum number of lines to return (default: all)"),
}),
async execute({ path: filePath, offset, limit }) {
try {
const content = await readFile(filePath as string, "utf-8");
const lines = content.split("\n");
const start = Math.max(0, ((offset as number) || 1) - 1);
const end = limit ? start + (limit as number) : lines.length;
const slice = lines.slice(start, end);
const numbered = slice
.map((line, i) => `${String(start + i + 1).padStart(5)}${line}`)
.join("\n");
const header = `${filePath} (${lines.length} lines total)`;
if (start > 0 || end < lines.length) {
return { text: `${header}, showing lines ${start + 1}-${Math.min(end, lines.length)}:\n${numbered}` };
}
return { text: `${header}:\n${numbered}` };
} catch (err) {
return { text: `Error reading file: ${err}`, isError: true };
}
},
};
const fsWrite: ToolDef = {
name: "fs_write",
description:
"Write content to a file. Creates the file and parent directories if they don't exist. Overwrites existing content.",
inputSchema: z.object({
path: z.string().describe("Absolute path to the file to write"),
content: z.string().describe("Content to write to the file"),
}),
async execute({ path: filePath, content }) {
try {
await mkdir(path.dirname(filePath as string), { recursive: true });
await writeFile(filePath as string, content as string, "utf-8");
return { text: `Wrote ${filePath} (${(content as string).length} bytes)` };
} catch (err) {
return { text: `Error writing file: ${err}`, isError: true };
}
},
};
const fsList: ToolDef = {
name: "fs_list",
description:
"List files and directories at a given path. Returns names with type indicators (/ for directories). Non-recursive by default.",
inputSchema: z.object({
path: z.string().describe("Absolute directory path to list"),
recursive: z.boolean().optional().describe("List recursively (default: false, max depth 3)"),
}),
async execute({ path: dirPath, recursive }) {
try {
const entries = await readdir(dirPath as string, { withFileTypes: true });
if (!recursive) {
const items = entries
.map((e) => (e.isDirectory() ? `${e.name}/` : e.name))
.sort();
return { text: items.join("\n") || "(empty directory)" };
}
// Recursive with depth limit
const results: string[] = [];
async function walk(dir: string, prefix: string, depth: number) {
if (depth > 3) return;
const ents = await readdir(dir, { withFileTypes: true });
for (const e of ents.sort((a, b) => a.name.localeCompare(b.name))) {
if (e.name.startsWith(".") || e.name === "node_modules") continue;
const rel = prefix ? `${prefix}/${e.name}` : e.name;
if (e.isDirectory()) {
results.push(`${rel}/`);
await walk(path.join(dir, e.name), rel, depth + 1);
} else {
results.push(rel);
}
if (results.length > 500) return;
}
}
await walk(dirPath as string, "", 0);
const text = results.join("\n") || "(empty directory)";
return { text: results.length > 500 ? text + "\n... (truncated at 500 entries)" : text };
} catch (err) {
return { text: `Error listing directory: ${err}`, isError: true };
}
},
};
const fsSearch: ToolDef = {
name: "fs_search",
description:
"Search for files by name pattern (glob-style) or search file contents with grep. Useful for finding files across a project.",
inputSchema: z.object({
path: z.string().describe("Root directory to search in"),
pattern: z.string().describe("Search pattern — either a filename glob (e.g. '*.ts') or a content grep pattern"),
mode: z.enum(["filename", "content"]).optional().describe("Search mode: 'filename' for glob matching, 'content' for grep (default: filename)"),
maxResults: z.number().optional().describe("Maximum results (default: 20)"),
}),
async execute({ path: searchPath, pattern, mode, maxResults }) {
const max = (maxResults as number) || 20;
const searchMode = (mode as string) || "filename";
try {
if (searchMode === "content") {
// Use grep
const { stdout, exitCode } = await runCommand(
"grep",
["-rn", "--include=*", "-m", String(max), pattern as string, searchPath as string],
searchPath as string,
10_000,
);
if (!stdout.trim()) return { text: `No content matches for "${pattern}"` };
return { text: stdout.trim() };
} else {
// Use find for filename matching
const { stdout } = await runCommand(
"find",
[searchPath as string, "-name", pattern as string, "-not", "-path", "*/node_modules/*", "-not", "-path", "*/.git/*"],
searchPath as string,
10_000,
);
const files = stdout.trim().split("\n").filter(Boolean).slice(0, max);
if (files.length === 0) return { text: `No files matching "${pattern}"` };
return { text: files.join("\n") };
}
} catch (err) {
return { text: `Search error: ${err}`, isError: true };
}
},
};
// ── Registry ─────────────────────────────────────────────────
export const toolRegistry: ToolDef[] = [
// Harness knowledge
knowledgeList,
knowledgeRead,
knowledgeWrite,
knowledgeSearch,
// Task orchestration
taskList,
taskGet,
taskCreate,
taskStart,
taskStop,
// Agent & model discovery
agentList,
modelList,
// Web
webFetch,
webSearch,
// Shell
shellExec,
// Filesystem
fsRead,
fsWrite,
fsList,
fsSearch,
];
export async function executeToolCall(
name: string,
input: Record<string, unknown>,
): Promise<ToolResult> {
const tool = toolRegistry.find((t) => t.name === name);
if (!tool) return { text: `Unknown tool: ${name}`, isError: true };
return tool.execute(input);
}
// ── Provider format conversions ──────────────────────────────
export function toolsAsAnthropic(): Array<{
name: string;
description: string;
input_schema: Record<string, unknown>;
}> {
return toolRegistry.map((t) => ({
name: t.name,
description: t.description,
input_schema: t.inputSchema
? (z.toJSONSchema(t.inputSchema) as Record<string, unknown>)
: { type: "object", properties: {} },
}));
}
export function toolsAsOpenAI(): Array<{
type: "function";
function: { name: string; description: string; parameters: Record<string, unknown> };
}> {
return toolRegistry.map((t) => ({
type: "function" as const,
function: {
name: t.name,
description: t.description,
parameters: t.inputSchema
? (z.toJSONSchema(t.inputSchema) as Record<string, unknown>)
: { type: "object", properties: {} },
},
}));
}

View File

@@ -0,0 +1,35 @@
export type StreamEvent =
| { type: "text_delta"; content: string }
| { type: "tool_call_start"; id: string; name: string; input: Record<string, unknown> }
| { type: "tool_call_end"; id: string; output: string; durationMs: number }
| { type: "message_end"; model: string; usage?: { input: number; output: number } }
| { type: "error"; message: string };
export interface ChatMessage {
id: string;
role: "user" | "assistant";
content: string;
toolCalls?: ToolCallRecord[];
model?: string;
timestamp: number;
}
export interface ToolCallRecord {
id: string;
name: string;
input: Record<string, unknown>;
output: string;
durationMs: number;
}
/** Wire format for messages sent to/from the API route */
export interface ApiMessage {
role: "user" | "assistant" | "tool_result";
content: string;
toolCallId?: string;
toolCalls?: Array<{
id: string;
name: string;
input: Record<string, unknown>;
}>;
}

View File

@@ -13,364 +13,53 @@
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { z } from "zod";
import { drizzle } from "drizzle-orm/postgres-js";
import { eq, and } from "drizzle-orm";
import postgres from "postgres";
import {
tasks as tasksTable,
iterations as iterationsTable,
agentConfigs as agentTable,
curatedModels as modelsTable,
orchestratorState as orchTable,
} from "@homelab/db";
import { readFile, writeFile, readdir, mkdir, stat } from "node:fs/promises";
import path from "node:path";
import { toolRegistry } from "./lib/chat-tools";
// ── Database ────────────────────────────────────────────────────
// ── Database check ───────────────────────────────────────────
const connectionString = process.env.DATABASE_URL;
if (!connectionString) {
if (!process.env.DATABASE_URL) {
console.error("DATABASE_URL is required");
process.exit(1);
}
const client = postgres(connectionString);
const db = drizzle(client);
// ── Knowledge dir ───────────────────────────────────────────────
const KNOWLEDGE_DIR = process.env.HARNESS_KNOWLEDGE_DIR || "";
// ── Helpers ─────────────────────────────────────────────────────
function taskSummary(row: typeof tasksTable.$inferSelect) {
return {
id: row.id,
slug: row.slug,
goal: row.goal,
status: row.status,
project: row.project,
iteration: row.iteration,
maxIterations: row.maxIterations,
startedAt: row.startedAt,
completedAt: row.completedAt,
};
}
// ── Server ──────────────────────────────────────────────────────
// ── Server ──────────────────────────────────────────────────
const server = new McpServer({
name: "harness",
version: "1.0.0",
});
// ── Knowledge Tools ─────────────────────────────────────────────
// Register all tools from the shared registry
for (const tool of toolRegistry) {
if (tool.inputSchema) {
server.tool(
tool.name,
tool.description,
tool.inputSchema as any,
async (input: Record<string, unknown>) => {
const result = await tool.execute(input);
return {
content: [{ type: "text" as const, text: result.text }],
isError: result.isError,
};
},
);
} else {
server.tool(
tool.name,
tool.description,
async () => {
const result = await tool.execute({});
return {
content: [{ type: "text" as const, text: result.text }],
isError: result.isError,
};
},
);
}
}
server.tool(
"knowledge_list",
"List all knowledge documents available in the harness knowledge base",
async () => {
if (!KNOWLEDGE_DIR) {
return { content: [{ type: "text", text: "HARNESS_KNOWLEDGE_DIR not configured" }] };
}
try {
const entries = await readdir(KNOWLEDGE_DIR, { withFileTypes: true });
const files = entries
.filter((e) => e.isFile())
.map((e) => e.name);
return {
content: [{ type: "text", text: files.length > 0 ? files.join("\n") : "(empty)" }],
};
} catch (err) {
return { content: [{ type: "text", text: `Error listing knowledge dir: ${err}` }], isError: true };
}
},
);
server.tool(
"knowledge_read",
"Read a specific knowledge document by filename",
{ filename: z.string().describe("Filename of the knowledge document to read") },
async ({ filename }) => {
if (!KNOWLEDGE_DIR) {
return { content: [{ type: "text", text: "HARNESS_KNOWLEDGE_DIR not configured" }], isError: true };
}
const filePath = path.resolve(KNOWLEDGE_DIR, filename);
// Prevent path traversal
if (!filePath.startsWith(path.resolve(KNOWLEDGE_DIR))) {
return { content: [{ type: "text", text: "Invalid path" }], isError: true };
}
try {
const content = await readFile(filePath, "utf-8");
return { content: [{ type: "text", text: content }] };
} catch (err) {
return { content: [{ type: "text", text: `Error reading ${filename}: ${err}` }], isError: true };
}
},
);
server.tool(
"knowledge_write",
"Create or update a knowledge document in the harness knowledge base",
{
filename: z.string().describe("Filename for the knowledge document (e.g. 'findings.md')"),
content: z.string().describe("Content to write to the knowledge document"),
},
async ({ filename, content }) => {
if (!KNOWLEDGE_DIR) {
return { content: [{ type: "text", text: "HARNESS_KNOWLEDGE_DIR not configured" }], isError: true };
}
const filePath = path.resolve(KNOWLEDGE_DIR, filename);
if (!filePath.startsWith(path.resolve(KNOWLEDGE_DIR))) {
return { content: [{ type: "text", text: "Invalid path" }], isError: true };
}
try {
await mkdir(path.dirname(filePath), { recursive: true });
await writeFile(filePath, content, "utf-8");
return { content: [{ type: "text", text: `Wrote ${filename} (${content.length} bytes)` }] };
} catch (err) {
return { content: [{ type: "text", text: `Error writing ${filename}: ${err}` }], isError: true };
}
},
);
server.tool(
"knowledge_search",
"Search across all knowledge documents for a text pattern (case-insensitive substring match)",
{ query: z.string().describe("Text to search for across all knowledge documents") },
async ({ query }) => {
if (!KNOWLEDGE_DIR) {
return { content: [{ type: "text", text: "HARNESS_KNOWLEDGE_DIR not configured" }], isError: true };
}
try {
const entries = await readdir(KNOWLEDGE_DIR, { withFileTypes: true });
const files = entries.filter((e) => e.isFile());
const results: string[] = [];
const lowerQuery = query.toLowerCase();
for (const file of files) {
const filePath = path.join(KNOWLEDGE_DIR, file.name);
const content = await readFile(filePath, "utf-8");
const lines = content.split("\n");
const matches = lines
.map((line, i) => ({ line, lineNum: i + 1 }))
.filter(({ line }) => line.toLowerCase().includes(lowerQuery));
if (matches.length > 0) {
results.push(
`## ${file.name}\n` +
matches
.slice(0, 10)
.map(({ line, lineNum }) => ` L${lineNum}: ${line.trim()}`)
.join("\n") +
(matches.length > 10 ? `\n ... and ${matches.length - 10} more matches` : ""),
);
}
}
return {
content: [{
type: "text",
text: results.length > 0 ? results.join("\n\n") : `No matches for "${query}"`,
}],
};
} catch (err) {
return { content: [{ type: "text", text: `Error searching: ${err}` }], isError: true };
}
},
);
// ── Task Tools ──────────────────────────────────────────────────
server.tool(
"task_list",
"List all harness tasks with their current status and evaluation results",
async () => {
const rows = await db.select().from(tasksTable);
const tasks = rows.map(taskSummary);
return {
content: [{ type: "text", text: JSON.stringify(tasks, null, 2) }],
};
},
);
server.tool(
"task_get",
"Get full details for a harness task including iteration history and evaluations",
{ taskId: z.string().describe("Task ID to look up") },
async ({ taskId }) => {
const [taskRow] = await db.select().from(tasksTable).where(eq(tasksTable.id, taskId));
if (!taskRow) {
return { content: [{ type: "text", text: `Task ${taskId} not found` }], isError: true };
}
const iters = await db
.select()
.from(iterationsTable)
.where(eq(iterationsTable.taskId, taskId));
const result = {
...taskSummary(taskRow),
spec: taskRow.spec,
evals: taskRow.evals,
pr: taskRow.pr,
iterations: iters
.sort((a, b) => a.n - b.n)
.map((i) => ({
n: i.n,
status: i.status,
diagnosis: i.diagnosis,
evals: i.evals,
diffStats: i.diffStats,
agentOutput: i.agentOutput ? i.agentOutput.slice(-4000) : null,
startedAt: i.startedAt,
completedAt: i.completedAt,
})),
};
return {
content: [{ type: "text", text: JSON.stringify(result, null, 2) }],
};
},
);
server.tool(
"task_create",
"Create a new harness task. The task will be created in 'pending' status and can be started with task_start.",
{
slug: z.string().describe("Unique short identifier for the task (e.g. 'fix-auth-bug')"),
goal: z.string().describe("High-level description of what the task should accomplish"),
project: z.string().describe("Repository in 'owner/repo' format"),
agentId: z.string().describe("ID of the agent configuration to use"),
maxIterations: z.number().optional().describe("Maximum iterations before giving up (default: 6)"),
criteria: z
.array(
z.object({
label: z.string().describe("Criterion name"),
target: z.string().describe("Evaluation target DSL (e.g. 'exitCode:0', 'filesChanged:>0')"),
}),
)
.optional()
.describe("Success criteria for evaluation"),
constraints: z.array(z.string()).optional().describe("Implementation constraints"),
knowledgeRefs: z.array(z.string()).optional().describe("Knowledge document filenames to include in prompt"),
gitProvider: z.enum(["github", "gitlab", "gitea"]).optional().describe("Git provider (default: github)"),
gitBaseUrl: z.string().optional().describe("Base URL for the git provider API"),
},
async (args) => {
const spec = {
slug: args.slug,
goal: args.goal,
project: args.project,
agentId: args.agentId,
maxIterations: args.maxIterations ?? 6,
criteria: args.criteria ?? [],
constraints: args.constraints ?? [],
knowledgeRefs: args.knowledgeRefs ?? [],
gitProvider: args.gitProvider,
gitBaseUrl: args.gitBaseUrl,
};
const taskId = `task-${Date.now()}`;
await db.insert(tasksTable).values({
id: taskId,
slug: spec.slug,
goal: spec.goal,
project: spec.project,
status: "pending",
iteration: 0,
maxIterations: spec.maxIterations,
startedAt: null,
evals: {},
spec,
});
return {
content: [{ type: "text", text: JSON.stringify({ id: taskId, status: "pending", slug: spec.slug }) }],
};
},
);
server.tool(
"task_start",
"Ensure the orchestrator is running so it will pick up pending tasks. Sets orchestrator state to running in the database.",
async () => {
// Ensure the singleton row exists
await db
.insert(orchTable)
.values({ id: "singleton", running: false })
.onConflictDoNothing();
// Set running
await db
.update(orchTable)
.set({ running: true, updatedAt: new Date() })
.where(eq(orchTable.id, "singleton"));
return {
content: [{ type: "text", text: JSON.stringify({ ok: true, message: "Orchestrator set to running — pending tasks will be picked up" }) }],
};
},
);
server.tool(
"task_stop",
"Request cancellation of a running harness task. Sets cancel_requested flag which the orchestrator polls.",
{ taskId: z.string().describe("ID of the running task to cancel") },
async ({ taskId }) => {
const result = await db
.update(tasksTable)
.set({ cancelRequested: true, updatedAt: new Date() })
.where(and(eq(tasksTable.id, taskId), eq(tasksTable.status, "running")));
const rowCount = (result as unknown as { rowCount: number }).rowCount;
if (rowCount === 0) {
return { content: [{ type: "text", text: `Task ${taskId} is not running or not found` }], isError: true };
}
return {
content: [{ type: "text", text: JSON.stringify({ ok: true, message: "Cancellation requested" }) }],
};
},
);
// ── Agent & Model Tools ─────────────────────────────────────────
server.tool(
"agent_list",
"List all configured agent runtimes (agent configs with runtime, model, and provider)",
async () => {
const rows = await db.select().from(agentTable);
const agents = rows.map((r) => ({
id: r.id,
name: r.name,
runtime: r.runtime,
modelId: r.modelId,
provider: r.provider,
}));
return {
content: [{ type: "text", text: JSON.stringify(agents, null, 2) }],
};
},
);
server.tool(
"model_list",
"List available AI models with pricing information",
async () => {
const rows = await db.select().from(modelsTable).where(eq(modelsTable.enabled, true));
const models = rows.map((r) => ({
id: r.id,
name: r.name,
provider: r.provider,
contextWindow: r.contextWindow,
costPer1kInput: r.costPer1kInput,
costPer1kOutput: r.costPer1kOutput,
}));
return {
content: [{ type: "text", text: JSON.stringify(models, null, 2) }],
};
},
);
// ── Start ───────────────────────────────────────────────────────
// ── Start ───────────────────────────────────────────────────
async function main() {
const transport = new StdioServerTransport();