Add event-driven tasks via Gitea webhooks
Some checks failed
Deploy Production / deploy (push) Failing after 35s
CI / lint-and-test (push) Successful in 33s
CI / build (push) Has been cancelled

Webhook endpoint at /api/webhooks/gitea receives Gitea status events,
matches them against configurable event triggers with conditions
(event type, repo glob, state, context), renders task templates with
{{variable}} substitution, and creates harness tasks automatically.

Includes circuit breaker: after N consecutive task failures from the
same trigger (default 3), the trigger auto-disables. Re-enable
manually via PATCH /api/event-triggers/:id.

New tables: harness_event_triggers (rules + circuit breaker state),
harness_event_log (audit trail + dedup via X-Gitea-Delivery).
This commit is contained in:
Julia McGhee
2026-03-21 21:15:15 +00:00
parent ccebbc4015
commit eeb87018d7
19 changed files with 2368 additions and 35 deletions

View File

@@ -4,13 +4,14 @@
"private": true,
"scripts": {
"dev": "next dev --port 3100",
"build": "next build",
"build": "node scripts/build-mcp.mjs && next build",
"start": "next start",
"lint": "next lint",
"test": "echo \"no tests yet\""
},
"dependencies": {
"@homelab/db": "workspace:^",
"@modelcontextprotocol/sdk": "^1.27.1",
"@xterm/addon-fit": "^0.11.0",
"@xterm/xterm": "^6.0.0",
"drizzle-orm": "^0.36.0",
@@ -19,14 +20,17 @@
"postgres": "^3.4.0",
"react": "^19.0.0",
"react-dom": "^19.0.0",
"tsx": "^4.19.0",
"ws": "^8.20.0",
"yaml": "^2.7.0"
"yaml": "^2.7.0",
"zod": "^4.3.6"
},
"devDependencies": {
"@types/node": "^22.10.0",
"@types/react": "^19.0.0",
"@types/react-dom": "^19.0.0",
"@types/ws": "^8.18.1",
"esbuild": "^0.27.4",
"typescript": "^5.7.0"
}
}

View File

@@ -0,0 +1,19 @@
// Bundles the MCP server entry point into a single self-contained ESM
// script at dist/mcp-server.mjs, so it can be spawned over stdio without
// a node_modules tree next to it. Invoked by `npm run build` (see the
// package.json "build" script).
import { build } from "esbuild";
import { resolve, dirname } from "node:path";
import { fileURLToPath } from "node:url";
// ESM has no __dirname; reconstruct it from import.meta.url.
const __dirname = dirname(fileURLToPath(import.meta.url));
await build({
entryPoints: [resolve(__dirname, "../src/mcp-server.ts")],
bundle: true,
platform: "node",
target: "node20",
format: "esm",
outfile: resolve(__dirname, "../dist/mcp-server.mjs"),
external: [],
// Shebang so the bundled file is directly executable.
banner: { js: "#!/usr/bin/env node" },
// Inline all deps — the output is a self-contained script
});
console.log("Built dist/mcp-server.mjs");

View File

@@ -0,0 +1,64 @@
import { NextRequest, NextResponse } from "next/server";
import {
getEventTrigger,
updateEventTrigger,
deleteEventTrigger,
getEventLogByTrigger,
} from "@/lib/event-store";
/**
 * GET /api/event-triggers/:id — returns the trigger together with its
 * 20 most recent event-log entries (short audit trail for the UI).
 */
export async function GET(
  _request: NextRequest,
  { params }: { params: Promise<{ id: string }> },
) {
  const { id } = await params;
  const found = await getEventTrigger(id);
  if (!found) {
    return NextResponse.json({ error: "not found" }, { status: 404 });
  }
  const recentLogs = await getEventLogByTrigger(id, 20);
  return NextResponse.json({ ...found, recentLogs });
}
/**
 * PATCH /api/event-triggers/:id — partial update of a trigger.
 *
 * Only a fixed allow-list of fields may be updated; anything else in the
 * body is silently ignored. Setting `enabled: true` also resets the
 * circuit-breaker state (consecutiveFailures / disabledReason) so a
 * previously auto-disabled trigger starts fresh.
 */
export async function PATCH(
  request: NextRequest,
  { params }: { params: Promise<{ id: string }> },
) {
  const { id } = await params;
  let body: Record<string, unknown>;
  try {
    body = await request.json();
  } catch {
    // Malformed JSON is a client error, not an unhandled 500.
    return NextResponse.json({ error: "invalid JSON body" }, { status: 400 });
  }
  // Allow-list of patchable fields.
  const PATCHABLE = [
    "name",
    "enabled",
    "eventType",
    "repoFilter",
    "stateFilter",
    "contextFilter",
    "taskTemplate",
    "maxConsecutiveFailures",
    "webhookSecret",
  ] as const;
  const updates: Record<string, unknown> = {};
  for (const key of PATCHABLE) {
    if (body[key] !== undefined) updates[key] = body[key];
  }
  // Re-enable: reset circuit breaker
  if (body.enabled === true) {
    updates.consecutiveFailures = 0;
    updates.disabledReason = null;
  }
  const updated = await updateEventTrigger(id, updates);
  if (!updated) {
    return NextResponse.json({ error: "not found" }, { status: 404 });
  }
  return NextResponse.json(updated);
}
/**
 * DELETE /api/event-triggers/:id — removes the trigger (and, via the
 * store, its event-log rows). 404 when the id is unknown.
 */
export async function DELETE(
  _request: NextRequest,
  { params }: { params: Promise<{ id: string }> },
) {
  const { id } = await params;
  const removed = await deleteEventTrigger(id);
  return removed
    ? NextResponse.json({ ok: true })
    : NextResponse.json({ error: "not found" }, { status: 404 });
}

View File

@@ -0,0 +1,53 @@
import { NextRequest, NextResponse } from "next/server";
import {
getAllEventTriggers,
createEventTrigger,
} from "@/lib/event-store";
/** GET /api/event-triggers — lists every configured trigger. */
export async function GET() {
  const triggers = await getAllEventTriggers();
  return NextResponse.json(triggers);
}
/**
 * POST /api/event-triggers — creates a new trigger.
 *
 * Requires name, eventType, and a taskTemplate carrying at least agentId
 * and goal; remaining fields fall back to defaults. Returns 201 with the
 * stored trigger row.
 */
export async function POST(request: NextRequest) {
  let body: Record<string, any>;
  try {
    body = await request.json();
  } catch {
    // Malformed JSON is a client error, not an unhandled 500.
    return NextResponse.json({ error: "invalid JSON body" }, { status: 400 });
  }
  if (!body.name || !body.eventType || !body.taskTemplate) {
    return NextResponse.json(
      { error: "name, eventType, and taskTemplate are required" },
      { status: 400 },
    );
  }
  if (!body.taskTemplate.agentId || !body.taskTemplate.goal) {
    return NextResponse.json(
      { error: "taskTemplate must include agentId and goal" },
      { status: 400 },
    );
  }
  // Random suffix avoids id collisions when two triggers are created
  // within the same millisecond (bare Date.now() is not unique).
  const generatedId = `evt-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
  const trigger = await createEventTrigger({
    id: body.id || generatedId,
    name: body.name,
    enabled: body.enabled ?? true,
    eventType: body.eventType,
    repoFilter: body.repoFilter || null,
    stateFilter: body.stateFilter || null,
    contextFilter: body.contextFilter || null,
    taskTemplate: {
      slug: body.taskTemplate.slug || "event-{{sha_short}}",
      goal: body.taskTemplate.goal,
      project: body.taskTemplate.project || "{{repo}}",
      gitProvider: body.taskTemplate.gitProvider,
      gitBaseUrl: body.taskTemplate.gitBaseUrl,
      agentId: body.taskTemplate.agentId,
      maxIterations: body.taskTemplate.maxIterations || 6,
      criteria: body.taskTemplate.criteria || [],
      constraints: body.taskTemplate.constraints || [],
      knowledgeRefs: body.taskTemplate.knowledgeRefs || [],
    },
    maxConsecutiveFailures: body.maxConsecutiveFailures ?? 3,
    webhookSecret: body.webhookSecret || null,
  });
  return NextResponse.json(trigger, { status: 201 });
}

View File

@@ -8,8 +8,8 @@ import {
export async function GET() {
return NextResponse.json({
running: isRunning(),
currentTaskId: currentRunningTaskId(),
running: await isRunning(),
currentTaskId: await currentRunningTaskId(),
});
}
@@ -18,12 +18,12 @@ export async function POST(request: NextRequest) {
const action = body.action as string;
if (action === "start") {
startOrchestrator();
await startOrchestrator();
return NextResponse.json({ ok: true, running: true });
}
if (action === "stop") {
stopOrchestrator();
await stopOrchestrator();
return NextResponse.json({ ok: true, running: false });
}

View File

@@ -20,7 +20,7 @@ export async function POST(
);
}
startOrchestrator();
await startOrchestrator();
return NextResponse.json({ ok: true, message: "Orchestrator started, task will be picked up" });
}

View File

@@ -1,6 +1,5 @@
import { NextRequest, NextResponse } from "next/server";
import { getTask } from "@/lib/store";
import { cancelTask } from "@/lib/orchestrator";
import { getTask, requestTaskCancel } from "@/lib/store";
export async function POST(
_request: NextRequest,
@@ -20,11 +19,11 @@ export async function POST(
);
}
const cancelled = cancelTask(id);
const cancelled = await requestTaskCancel(id);
if (!cancelled) {
return NextResponse.json(
{ error: "Task is not the currently executing task" },
{ status: 400 },
{ error: "Failed to request cancellation" },
{ status: 500 },
);
}

View File

@@ -0,0 +1,158 @@
import { NextRequest, NextResponse } from "next/server";
import { createHmac, timingSafeEqual } from "node:crypto";
import { createTask } from "@/lib/store";
import { extractVariables, renderTaskTemplate } from "@/lib/template";
import { findMatchingTriggers } from "@/lib/event-matching";
import {
createEventLogEntry,
getEventLogByDeliveryId,
getEventTrigger,
} from "@/lib/event-store";
import { startOrchestrator } from "@/lib/orchestrator";
import type { Task } from "@/lib/types";
/**
 * Constant-time check of a Gitea HMAC-SHA256 webhook signature.
 *
 * @param rawBody   Exact raw request body the signature was computed over.
 * @param signature Hex-encoded digest from the X-Gitea-Signature header.
 * @param secret    Shared webhook secret.
 * @returns true only when the signature is a well-formed 64-char hex
 *          digest AND matches the locally computed HMAC.
 */
function verifySignature(
  rawBody: string,
  signature: string,
  secret: string,
): boolean {
  // Reject anything that is not exactly a 64-char hex digest up front:
  // Buffer.from(s, "hex") silently truncates at the first invalid char,
  // so malformed input would otherwise only fail via a length-mismatch
  // throw inside timingSafeEqual.
  if (!/^[0-9a-fA-F]{64}$/.test(signature)) return false;
  const computed = createHmac("sha256", secret).update(rawBody).digest();
  const provided = Buffer.from(signature, "hex");
  // Lengths are equal by the regex above; keep the guard as defense in
  // depth because timingSafeEqual throws on mismatched lengths.
  if (computed.length !== provided.length) return false;
  return timingSafeEqual(computed, provided);
}
/**
 * POST /api/webhooks/gitea — entry point for Gitea webhook deliveries.
 *
 * Pipeline: verify HMAC (global secret) → dedup by X-Gitea-Delivery →
 * extract template variables → find matching triggers → per trigger,
 * verify the optional per-trigger secret, render the task template,
 * create the task, and write an audit-log entry. Finally ensures the
 * orchestrator is running when at least one task was created.
 */
export async function POST(request: NextRequest) {
  const deliveryId = request.headers.get("x-gitea-delivery") || "";
  const eventType = request.headers.get("x-gitea-event") || "";
  const signature = request.headers.get("x-gitea-signature") || "";
  // Read the raw body first: the HMAC is computed over the exact bytes.
  const rawBody = await request.text();
  let payload: Record<string, unknown>;
  try {
    payload = JSON.parse(rawBody);
  } catch {
    return NextResponse.json({ error: "invalid JSON" }, { status: 400 });
  }
  // HMAC validation — only enforced when both a secret and a signature
  // are present. NOTE(review): a configured secret with a missing
  // signature header is currently accepted; confirm that is intended.
  const globalSecret = process.env.GITEA_WEBHOOK_SECRET;
  if (globalSecret && signature) {
    if (!verifySignature(rawBody, signature, globalSecret)) {
      return NextResponse.json({ error: "invalid signature" }, { status: 401 });
    }
  }
  // Dedup: Gitea retries deliveries; X-Gitea-Delivery identifies the event.
  if (deliveryId) {
    const existing = await getEventLogByDeliveryId(deliveryId);
    if (existing) {
      return NextResponse.json({ status: "duplicate", deliveryId });
    }
  }
  // Parse event into flat template variables
  const event = extractVariables(eventType, payload);
  // Find matching triggers
  const triggers = await findMatchingTriggers(event);
  if (triggers.length === 0) {
    return NextResponse.json({
      status: "no_match",
      eventType,
      repo: event.repo,
      state: event.state,
    });
  }
  const results: { triggerId: string; taskId: string | null; status: string }[] = [];
  for (const trigger of triggers) {
    // Suffix the delivery id per trigger when one delivery fans out to
    // several triggers, so each log row stays unique.
    // NOTE(review): suffixed ids are never found by the dedup lookup
    // above, so multi-trigger deliveries may not dedup on retry — confirm
    // this is acceptable.
    const logDeliveryId = triggers.length > 1
      ? `${deliveryId || Date.now()}-${trigger.id}`
      : deliveryId || String(Date.now());
    try {
      // Check per-trigger secret if set
      if (trigger.webhookSecret && signature) {
        if (!verifySignature(rawBody, signature, trigger.webhookSecret)) {
          await createEventLogEntry({
            triggerId: trigger.id,
            deliveryId: logDeliveryId,
            eventType,
            repo: event.repo,
            commitSha: event.sha,
            branch: event.branch,
            status: "skipped",
            skipReason: "Per-trigger signature validation failed",
            payload,
          });
          results.push({ triggerId: trigger.id, taskId: null, status: "skipped" });
          continue;
        }
      }
      // Render task spec from template and create the pending task.
      const spec = renderTaskTemplate(trigger.taskTemplate, event);
      const taskId = `task-${Date.now()}-${trigger.id.slice(-4)}`;
      const task: Task = {
        id: taskId,
        slug: spec.slug,
        goal: spec.goal,
        project: spec.project,
        status: "pending",
        iteration: 0,
        maxIterations: spec.maxIterations,
        startedAt: null,
        evals: {},
        iterations: [],
        spec,
      };
      await createTask(task);
      await createEventLogEntry({
        triggerId: trigger.id,
        deliveryId: logDeliveryId,
        eventType,
        repo: event.repo,
        commitSha: event.sha,
        branch: event.branch,
        status: "task_created",
        taskId,
        payload,
      });
      results.push({ triggerId: trigger.id, taskId, status: "task_created" });
    } catch (err) {
      const errorMsg = err instanceof Error ? err.message : String(err);
      // Best-effort audit logging — a log-write failure must not mask the
      // original error result for this trigger.
      await createEventLogEntry({
        triggerId: trigger.id,
        deliveryId: logDeliveryId,
        eventType,
        repo: event.repo,
        commitSha: event.sha,
        branch: event.branch,
        status: "error",
        error: errorMsg,
        payload,
      }).catch(() => {});
      results.push({ triggerId: trigger.id, taskId: null, status: "error" });
    }
  }
  // Ensure the orchestrator is running to pick up new tasks.
  // startOrchestrator is async (DB-backed state) — await it so a failure
  // surfaces in this request instead of as an unhandled rejection.
  if (results.some(r => r.status === "task_created")) {
    await startOrchestrator();
  }
  return NextResponse.json({ status: "processed", results });
}

View File

@@ -0,0 +1,34 @@
// Match webhook events against event trigger conditions.
import { getEnabledEventTriggers, type EventTrigger } from "./event-store";
import { type ParsedEvent } from "./template";
// Backslash-escape every character that is significant inside a regex.
function escapeRegex(s: string): string {
  return s.replace(/[.*+?^${}()|[\]\\]/g, (ch) => `\\${ch}`);
}
// Minimal glob: '*' is the only wildcard (becomes '.*'); every other
// character is matched literally. The whole value must match.
function globMatch(pattern: string, value: string): boolean {
  const source = pattern
    .split("*")
    .map(escapeRegex)
    .join(".*");
  return new RegExp(`^${source}$`).test(value);
}
/**
 * Does this trigger's condition set accept the event?
 * Null/empty filters act as wildcards; eventType "*" matches any event.
 */
export function matchesTrigger(trigger: EventTrigger, event: ParsedEvent): boolean {
  const { enabled, eventType, repoFilter, stateFilter, contextFilter } = trigger;
  if (!enabled) return false;
  const typeOk = eventType === "*" || eventType === event.eventType;
  const repoOk = !repoFilter || globMatch(repoFilter, event.repo);
  const stateOk = !stateFilter || stateFilter === event.state;
  const contextOk = !contextFilter || event.context.includes(contextFilter);
  return typeOk && repoOk && stateOk && contextOk;
}
/** All enabled triggers whose conditions accept the given event. */
export async function findMatchingTriggers(event: ParsedEvent): Promise<EventTrigger[]> {
  const enabled = await getEnabledEventTriggers();
  return enabled.filter((trigger) => matchesTrigger(trigger, event));
}

View File

@@ -0,0 +1,123 @@
import { eq, desc } from "drizzle-orm";
import { db } from "./db";
import { eventTriggers, eventLog } from "@homelab/db";
// ─── TYPES ──────────────────────────────────────────────────
export type EventTrigger = typeof eventTriggers.$inferSelect;
export type EventTriggerInsert = typeof eventTriggers.$inferInsert;
export type EventLogEntry = typeof eventLog.$inferSelect;
// ─── EVENT TRIGGERS ─────────────────────────────────────────
/** All triggers, enabled or not (admin/listing view). */
export async function getAllEventTriggers(): Promise<EventTrigger[]> {
return db.select().from(eventTriggers);
}
/** Only enabled triggers — the candidate set for webhook matching. */
export async function getEnabledEventTriggers(): Promise<EventTrigger[]> {
return db.select().from(eventTriggers).where(eq(eventTriggers.enabled, true));
}
/** Single trigger by primary key, or undefined when absent. */
export async function getEventTrigger(id: string): Promise<EventTrigger | undefined> {
const [row] = await db.select().from(eventTriggers).where(eq(eventTriggers.id, id));
return row;
}
/** Inserts a trigger and returns the stored row (DB defaults applied). */
export async function createEventTrigger(trigger: EventTriggerInsert): Promise<EventTrigger> {
const [row] = await db.insert(eventTriggers).values(trigger).returning();
return row;
}
/**
 * Applies a partial update and bumps updatedAt.
 * Returns the updated row, or undefined when no trigger has this id.
 */
export async function updateEventTrigger(
id: string,
updates: Partial<EventTriggerInsert>,
): Promise<EventTrigger | undefined> {
const [row] = await db
.update(eventTriggers)
.set({ ...updates, updatedAt: new Date() })
.where(eq(eventTriggers.id, id))
.returning();
return row;
}
/**
 * Deletes a trigger and its event-log rows (log rows reference the
 * trigger, so they are removed first).
 * NOTE(review): the two deletes are not in a transaction — a crash after
 * the first delete loses the log rows while the trigger survives; confirm
 * that is acceptable.
 * NOTE(review): the rowCount cast assumes the postgres-js driver's result
 * shape — verify against the drizzle-orm version in use.
 */
export async function deleteEventTrigger(id: string): Promise<boolean> {
await db.delete(eventLog).where(eq(eventLog.triggerId, id));
const result = await db.delete(eventTriggers).where(eq(eventTriggers.id, id));
return (result as unknown as { rowCount: number }).rowCount > 0;
}
// ─── EVENT LOG ──────────────────────────────────────────────
/** Appends an audit-trail row; deliveryId doubles as the dedup key. */
export async function createEventLogEntry(
entry: typeof eventLog.$inferInsert,
): Promise<EventLogEntry> {
const [row] = await db.insert(eventLog).values(entry).returning();
return row;
}
/** Dedup lookup: has this X-Gitea-Delivery id already been processed? */
export async function getEventLogByDeliveryId(
deliveryId: string,
): Promise<EventLogEntry | undefined> {
const [row] = await db
.select()
.from(eventLog)
.where(eq(eventLog.deliveryId, deliveryId));
return row;
}
/**
 * Reverse lookup from a created task to the log entry (and hence the
 * trigger) that spawned it — used by the orchestrator circuit breaker.
 */
export async function getEventLogByTaskId(
taskId: string,
): Promise<EventLogEntry | undefined> {
const [row] = await db
.select()
.from(eventLog)
.where(eq(eventLog.taskId, taskId));
return row;
}
/** Newest-first page of log entries for one trigger. */
export async function getEventLogByTrigger(
triggerId: string,
limit = 50,
offset = 0,
): Promise<EventLogEntry[]> {
return db
.select()
.from(eventLog)
.where(eq(eventLog.triggerId, triggerId))
.orderBy(desc(eventLog.createdAt))
.limit(limit)
.offset(offset);
}
// ─── CIRCUIT BREAKER ────────────────────────────────────────
/**
 * Circuit-breaker bookkeeping, called after an event-driven task finishes.
 *
 * Success resets consecutiveFailures to 0. Failure increments it, and once
 * the count reaches maxConsecutiveFailures the trigger is disabled with a
 * human-readable reason (re-enable via PATCH /api/event-triggers/:id,
 * which resets the counter).
 *
 * NOTE(review): read-modify-write without a transaction or atomic SQL
 * increment — concurrent completions for the same trigger could lose an
 * increment. Likely fine while the orchestrator runs one task at a time;
 * confirm before allowing parallel tasks.
 */
export async function recordTaskOutcome(
triggerId: string,
passed: boolean,
): Promise<void> {
const trigger = await getEventTrigger(triggerId);
if (!trigger) return;
if (passed) {
await db
.update(eventTriggers)
.set({ consecutiveFailures: 0, updatedAt: new Date() })
.where(eq(eventTriggers.id, triggerId));
return;
}
const newCount = trigger.consecutiveFailures + 1;
const shouldDisable = newCount >= trigger.maxConsecutiveFailures;
await db
.update(eventTriggers)
.set({
consecutiveFailures: newCount,
// Preserve existing enabled/disabledReason when below the threshold.
enabled: shouldDisable ? false : trigger.enabled,
disabledReason: shouldDisable
? `Circuit breaker: ${newCount} consecutive failures`
: trigger.disabledReason,
updatedAt: new Date(),
})
.where(eq(eventTriggers.id, triggerId));
}

View File

@@ -5,9 +5,15 @@ import {
updateIteration,
getFirstPendingTask,
getRunningTasks,
getOrchestratorState,
setOrchestratorRunning,
setOrchestratorCurrentTask,
updateOrchestratorHeartbeat,
isTaskCancelRequested,
} from "./store";
import { recordUsage } from "./model-store";
import { getAgentConfig } from "./agents";
import { getEventLogByTaskId, recordTaskOutcome } from "./event-store";
import { getRawCredentialsByProvider } from "./credentials";
import {
ensureBareClone,
@@ -25,47 +31,48 @@ import { evaluate } from "./evaluator";
import { Task, Iteration } from "./types";
const POLL_INTERVAL_MS = 2000;
const CANCEL_CHECK_INTERVAL_MS = 3000;
// Local process state — only the AbortController and poll timer live in memory.
// Everything else (running, currentTaskId) is in Postgres.
let pollTimer: ReturnType<typeof setInterval> | null = null;
let running = false;
let currentTaskId: string | null = null;
let currentAbort: AbortController | null = null;
export function isRunning(): boolean {
return running;
// ─── Public API (all DB-backed) ─────────────────────────────
export async function isRunning(): Promise<boolean> {
const state = await getOrchestratorState();
return state.running;
}
export function currentRunningTaskId(): string | null {
return currentTaskId;
export async function currentRunningTaskId(): Promise<string | null> {
const state = await getOrchestratorState();
return state.currentTaskId;
}
export function startOrchestrator(): void {
if (running) return;
running = true;
export async function startOrchestrator(): Promise<void> {
const state = await getOrchestratorState();
if (state.running && pollTimer) return;
recoverCrashedTasks();
await setOrchestratorRunning(true);
await recoverCrashedTasks();
pollTimer = setInterval(() => {
if (currentTaskId) return;
poll();
}, POLL_INTERVAL_MS);
poll();
}
export function stopOrchestrator(): void {
running = false;
export async function stopOrchestrator(): Promise<void> {
await setOrchestratorRunning(false);
if (pollTimer) {
clearInterval(pollTimer);
pollTimer = null;
}
}
export function cancelTask(taskId: string): boolean {
if (currentTaskId !== taskId) return false;
currentAbort?.abort();
return true;
}
// ─── Internals ──────────────────────────────────────────────
async function recoverCrashedTasks(): Promise<void> {
const runningTasks = await getRunningTasks();
@@ -84,15 +91,17 @@ async function recoverCrashedTasks(): Promise<void> {
completedAt: Date.now(),
});
}
await setOrchestratorCurrentTask(null);
}
async function poll(): Promise<void> {
if (!running || currentTaskId) return;
const state = await getOrchestratorState();
if (!state.running || state.currentTaskId) return;
const task = await getFirstPendingTask();
if (!task) return;
currentTaskId = task.id;
await setOrchestratorCurrentTask(task.id);
currentAbort = new AbortController();
try {
@@ -104,11 +113,43 @@ async function poll(): Promise<void> {
completedAt: Date.now(),
});
} finally {
currentTaskId = null;
// Circuit breaker: update event trigger if this task was event-driven
try {
const finalTask = await getTask(task.id);
if (finalTask && (finalTask.status === "completed" || finalTask.status === "failed")) {
const logEntry = await getEventLogByTaskId(task.id);
if (logEntry) {
await recordTaskOutcome(logEntry.triggerId, finalTask.status === "completed");
}
}
} catch (err) {
console.error("[orchestrator] Circuit breaker update failed:", err);
}
await setOrchestratorCurrentTask(null);
currentAbort = null;
}
}
/**
 * Periodically polls the cancel_requested flag in Postgres and fires the
 * local AbortController when it is set. The cancel request may originate
 * in another process, so it has to round-trip through the DB.
 *
 * The interval stops itself after aborting, but callers must also invoke
 * the returned cleanup function on every exit path so the watcher stops
 * when the task ends without cancellation.
 */
function startCancelWatcher(taskId: string, abort: AbortController): () => void {
const timer = setInterval(async () => {
try {
if (await isTaskCancelRequested(taskId)) {
abort.abort();
clearInterval(timer);
}
} catch {
// DB read failure — don't crash the watcher, just retry next tick
}
}, CANCEL_CHECK_INTERVAL_MS);
return () => clearInterval(timer);
}
async function runTask(task: Task): Promise<void> {
const agentConfig = await getAgentConfig(task.spec.agentId);
if (!agentConfig) {
@@ -138,11 +179,15 @@ async function runTask(task: Task): Promise<void> {
startedAt: Date.now(),
});
// Start watching for cancel_requested in DB
const stopCancelWatcher = startCancelWatcher(task.id, currentAbort!);
let bareClone: string;
try {
bareClone = await ensureBareClone(repoUrl, task.slug);
} catch (err) {
console.error(`[orchestrator] Failed to clone repo for task ${task.id}:`, err);
stopCancelWatcher();
await updateTask(task.id, {
status: "failed",
completedAt: Date.now(),
@@ -159,11 +204,17 @@ async function runTask(task: Task): Promise<void> {
status: "failed",
completedAt: Date.now(),
});
stopCancelWatcher();
return;
}
await updateOrchestratorHeartbeat();
const result = await runIteration(task, n, bareClone, branchName);
if (!result) return;
if (!result) {
stopCancelWatcher();
return;
}
if (result.allPassed) {
converged = true;
@@ -171,6 +222,8 @@ async function runTask(task: Task): Promise<void> {
}
}
stopCancelWatcher();
if (converged) {
try {
const finalTask = await getTask(task.id);

View File

@@ -1,8 +1,77 @@
import { eq, and } from "drizzle-orm";
import { db } from "./db";
import { tasks as tasksTable, iterations as iterationsTable } from "@homelab/db";
import {
tasks as tasksTable,
iterations as iterationsTable,
orchestratorState as orchTable,
} from "@homelab/db";
import { Task, Iteration } from "./types";
// ─── ORCHESTRATOR STATE ─────────────────────────────────────
// Durable orchestrator state, stored in a single-row table so it survives
// process restarts and is visible across Next.js workers.
export interface OrchestratorState {
running: boolean; // whether the poll loop should be active
currentTaskId: string | null; // task currently executing, if any
heartbeat: number | null; // ms-epoch of the last liveness update
}
/** Inserts the singleton row if missing; safe to call repeatedly. */
async function ensureOrchestratorRow(): Promise<void> {
await db
.insert(orchTable)
.values({ id: "singleton", running: false })
.onConflictDoNothing();
}
/** Reads the singleton state row, creating it on first access. */
export async function getOrchestratorState(): Promise<OrchestratorState> {
await ensureOrchestratorRow();
const [row] = await db.select().from(orchTable).where(eq(orchTable.id, "singleton"));
return {
running: row.running,
currentTaskId: row.currentTaskId ?? null,
heartbeat: row.heartbeat ?? null,
};
}
/** Flips the running flag; poll loops observe it on their next tick. */
export async function setOrchestratorRunning(running: boolean): Promise<void> {
await ensureOrchestratorRow();
await db
.update(orchTable)
.set({ running, updatedAt: new Date() })
.where(eq(orchTable.id, "singleton"));
}
/**
 * Records which task is executing (null = idle) and refreshes heartbeat.
 * NOTE(review): unlike the functions above this does not call
 * ensureOrchestratorRow — it relies on a prior get/setOrchestratorRunning
 * having created the row; confirm all call paths guarantee that.
 */
export async function setOrchestratorCurrentTask(taskId: string | null): Promise<void> {
await db
.update(orchTable)
.set({ currentTaskId: taskId, heartbeat: Date.now(), updatedAt: new Date() })
.where(eq(orchTable.id, "singleton"));
}
/** Liveness ping — presumably consumed by crash recovery; verify usage. */
export async function updateOrchestratorHeartbeat(): Promise<void> {
await db
.update(orchTable)
.set({ heartbeat: Date.now(), updatedAt: new Date() })
.where(eq(orchTable.id, "singleton"));
}
// ─── TASK CANCEL FLAG ───────────────────────────────────────
/**
 * Marks a task for cancellation. Only succeeds while the task status is
 * 'running'; returns false when the task is absent or already finished.
 * The orchestrator's cancel watcher polls this flag and aborts locally.
 * NOTE(review): the rowCount cast assumes the postgres-js driver's result
 * shape — verify against the drizzle-orm version in use.
 */
export async function requestTaskCancel(taskId: string): Promise<boolean> {
const result = await db
.update(tasksTable)
.set({ cancelRequested: true, updatedAt: new Date() })
.where(and(eq(tasksTable.id, taskId), eq(tasksTable.status, "running")));
return (result as unknown as { rowCount: number }).rowCount > 0;
}
/** Reads the cancel flag; false when the task row does not exist. */
export async function isTaskCancelRequested(taskId: string): Promise<boolean> {
const [row] = await db
.select({ cancelRequested: tasksTable.cancelRequested })
.from(tasksTable)
.where(eq(tasksTable.id, taskId));
return row?.cancelRequested ?? false;
}
function rowToTask(
row: typeof tasksTable.$inferSelect,
iters: (typeof iterationsTable.$inferSelect)[],

View File

@@ -0,0 +1,95 @@
// Template variable extraction and substitution for event-driven tasks.
/** Flat variable set available to {{variable}} template substitution. */
export interface ParsedEvent {
  eventType: string;
  repo: string;
  repoName: string;
  owner: string;
  sha: string;
  shaShort: string;
  branch: string;
  state: string;
  context: string;
  targetUrl: string;
  commitMessage: string;
  timestamp: string;
}
/**
 * Flattens a Gitea webhook payload into the variable set used by task
 * templates. Missing fields become "" (never the string "undefined"), so
 * templates degrade gracefully.
 *
 * NOTE(review): field locations (sha, state, context, branches) match
 * Gitea *status* event payloads; other event types may carry their data
 * elsewhere (e.g. push uses `after`/`ref`) — confirm coverage.
 */
export function extractVariables(
  eventType: string,
  payload: Record<string, unknown>,
): ParsedEvent {
  // Falsy values (undefined, null, "", 0) all render as "".
  const asStr = (v: unknown): string => String(v || "");
  const repository = payload.repository as Record<string, unknown> | undefined;
  const repoOwner = repository?.owner as Record<string, unknown> | undefined;
  const commitObj = payload.commit as Record<string, unknown> | undefined;
  const branchList = payload.branches as { name: string }[] | undefined;
  const fullSha = asStr(payload.sha);
  return {
    eventType,
    repo: asStr(repository?.full_name),
    repoName: asStr(repository?.name),
    owner: asStr(repoOwner?.login),
    sha: fullSha,
    shaShort: fullSha.slice(0, 7),
    branch: branchList?.[0]?.name || "",
    state: asStr(payload.state),
    context: asStr(payload.context),
    targetUrl: asStr(payload.target_url),
    commitMessage: asStr(commitObj?.message || payload.description),
    timestamp: new Date().toISOString(),
  };
}
/**
 * Replaces {{name}} placeholders with values from the parsed event.
 * Unknown placeholder names are left verbatim, so typos stay visible in
 * the rendered output instead of being silently erased.
 */
export function renderTemplate(
  template: string,
  vars: ParsedEvent,
): string {
  const substitutions: Record<string, string> = {
    repo: vars.repo,
    repo_name: vars.repoName,
    owner: vars.owner,
    sha: vars.sha,
    sha_short: vars.shaShort,
    branch: vars.branch,
    state: vars.state,
    context: vars.context,
    target_url: vars.targetUrl,
    commit_message: vars.commitMessage,
    timestamp: vars.timestamp,
  };
  return template.replace(/\{\{(\w+)\}\}/g, (placeholder, name) =>
    substitutions[name] ?? placeholder,
  );
}
/**
 * Renders every templated string field of a trigger's task template
 * against the parsed event; non-templated fields pass through unchanged.
 */
export function renderTaskTemplate(
  template: {
    slug: string;
    goal: string;
    project: string;
    gitProvider?: string;
    gitBaseUrl?: string;
    agentId: string;
    maxIterations: number;
    criteria: { label: string; target: string }[];
    constraints: string[];
    knowledgeRefs: string[];
  },
  vars: ParsedEvent,
) {
  const render = (s: string) => renderTemplate(s, vars);
  return {
    slug: render(template.slug),
    goal: render(template.goal),
    project: render(template.project),
    gitProvider: template.gitProvider as "github" | "gitlab" | "gitea" | undefined,
    gitBaseUrl: template.gitBaseUrl,
    agentId: template.agentId,
    maxIterations: template.maxIterations,
    criteria: template.criteria.map((criterion) => ({
      label: render(criterion.label),
      target: render(criterion.target),
    })),
    constraints: template.constraints.map((c) => render(c)),
    knowledgeRefs: template.knowledgeRefs.map((k) => render(k)),
  };
}

View File

@@ -0,0 +1,383 @@
#!/usr/bin/env node
/**
* Harness MCP Server
*
* Stdio MCP server that exposes harness knowledge management and task
* orchestration tools to agents spawned by the harness orchestrator.
*
* Environment variables:
* DATABASE_URL — Postgres connection string (required)
* HARNESS_KNOWLEDGE_DIR — Path to knowledge documents directory
*/
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { z } from "zod";
import { drizzle } from "drizzle-orm/postgres-js";
import { eq, and } from "drizzle-orm";
import postgres from "postgres";
import {
tasks as tasksTable,
iterations as iterationsTable,
agentConfigs as agentTable,
curatedModels as modelsTable,
orchestratorState as orchTable,
} from "@homelab/db";
import { readFile, writeFile, readdir, mkdir, stat } from "node:fs/promises";
import path from "node:path";
// ── Database ────────────────────────────────────────────────────
const connectionString = process.env.DATABASE_URL;
if (!connectionString) {
console.error("DATABASE_URL is required");
process.exit(1);
}
const client = postgres(connectionString);
const db = drizzle(client);
// ── Knowledge dir ───────────────────────────────────────────────
const KNOWLEDGE_DIR = process.env.HARNESS_KNOWLEDGE_DIR || "";
// ── Helpers ─────────────────────────────────────────────────────
/** Projects a full task row down to the fields needed for listings. */
function taskSummary(row: typeof tasksTable.$inferSelect) {
  const { id, slug, goal, status, project, iteration, maxIterations, startedAt, completedAt } = row;
  return { id, slug, goal, status, project, iteration, maxIterations, startedAt, completedAt };
}
// ── Server ──────────────────────────────────────────────────────
const server = new McpServer({
name: "harness",
version: "1.0.0",
});
// ── Knowledge Tools ─────────────────────────────────────────────
server.tool(
  "knowledge_list",
  "List all knowledge documents available in the harness knowledge base",
  async () => {
    if (!KNOWLEDGE_DIR) {
      return { content: [{ type: "text", text: "HARNESS_KNOWLEDGE_DIR not configured" }] };
    }
    try {
      // Only plain files count as documents; subdirectories are ignored.
      const entries = await readdir(KNOWLEDGE_DIR, { withFileTypes: true });
      const names = entries.filter((entry) => entry.isFile()).map((entry) => entry.name);
      const text = names.length > 0 ? names.join("\n") : "(empty)";
      return { content: [{ type: "text", text }] };
    } catch (err) {
      return { content: [{ type: "text", text: `Error listing knowledge dir: ${err}` }], isError: true };
    }
  },
);
server.tool(
  "knowledge_read",
  "Read a specific knowledge document by filename",
  { filename: z.string().describe("Filename of the knowledge document to read") },
  async ({ filename }) => {
    if (!KNOWLEDGE_DIR) {
      return { content: [{ type: "text", text: "HARNESS_KNOWLEDGE_DIR not configured" }], isError: true };
    }
    const base = path.resolve(KNOWLEDGE_DIR);
    const filePath = path.resolve(base, filename);
    // Prevent path traversal: the resolved path must be strictly inside
    // the base directory. Requiring the trailing separator also rejects
    // sibling directories that merely share the base as a string prefix
    // (e.g. /data/knowledge-evil when base is /data/knowledge).
    if (filePath === base || !filePath.startsWith(base + path.sep)) {
      return { content: [{ type: "text", text: "Invalid path" }], isError: true };
    }
    try {
      const content = await readFile(filePath, "utf-8");
      return { content: [{ type: "text", text: content }] };
    } catch (err) {
      // Include the offending filename so agents can self-correct.
      return { content: [{ type: "text", text: `Error reading ${filename}: ${err}` }], isError: true };
    }
  },
);
server.tool(
  "knowledge_write",
  "Create or update a knowledge document in the harness knowledge base",
  {
    filename: z.string().describe("Filename for the knowledge document (e.g. 'findings.md')"),
    content: z.string().describe("Content to write to the knowledge document"),
  },
  async ({ filename, content }) => {
    if (!KNOWLEDGE_DIR) {
      return { content: [{ type: "text", text: "HARNESS_KNOWLEDGE_DIR not configured" }], isError: true };
    }
    const base = path.resolve(KNOWLEDGE_DIR);
    const filePath = path.resolve(base, filename);
    // Prevent path traversal: require the resolved path to be strictly
    // inside the base dir (a bare prefix check would accept siblings such
    // as /data/knowledge-evil when base is /data/knowledge).
    if (filePath === base || !filePath.startsWith(base + path.sep)) {
      return { content: [{ type: "text", text: "Invalid path" }], isError: true };
    }
    try {
      // Create intermediate directories so nested document paths work.
      await mkdir(path.dirname(filePath), { recursive: true });
      await writeFile(filePath, content, "utf-8");
      return { content: [{ type: "text", text: `Wrote ${filename} (${content.length} bytes)` }] };
    } catch (err) {
      return { content: [{ type: "text", text: `Error writing ${filename}: ${err}` }], isError: true };
    }
  },
);
server.tool(
  "knowledge_search",
  "Search across all knowledge documents for a text pattern (case-insensitive substring match)",
  { query: z.string().describe("Text to search for across all knowledge documents") },
  async ({ query }) => {
    if (!KNOWLEDGE_DIR) {
      return { content: [{ type: "text", text: "HARNESS_KNOWLEDGE_DIR not configured" }], isError: true };
    }
    try {
      const needle = query.toLowerCase();
      const entries = await readdir(KNOWLEDGE_DIR, { withFileTypes: true });
      const sections: string[] = [];
      for (const entry of entries) {
        if (!entry.isFile()) continue;
        const text = await readFile(path.join(KNOWLEDGE_DIR, entry.name), "utf-8");
        // Collect matching lines with their 1-based line numbers.
        const hits: { line: string; lineNum: number }[] = [];
        text.split("\n").forEach((line, idx) => {
          if (line.toLowerCase().includes(needle)) hits.push({ line, lineNum: idx + 1 });
        });
        if (hits.length === 0) continue;
        // Cap at 10 shown matches per file to keep responses bounded.
        const shown = hits
          .slice(0, 10)
          .map(({ line, lineNum }) => ` L${lineNum}: ${line.trim()}`)
          .join("\n");
        const overflow = hits.length > 10 ? `\n ... and ${hits.length - 10} more matches` : "";
        sections.push(`## ${entry.name}\n${shown}${overflow}`);
      }
      return {
        content: [{
          type: "text",
          text: sections.length > 0 ? sections.join("\n\n") : `No matches for "${query}"`,
        }],
      };
    } catch (err) {
      return { content: [{ type: "text", text: `Error searching: ${err}` }], isError: true };
    }
  },
);
// ── Task Tools ──────────────────────────────────────────────────
server.tool(
  "task_list",
  "List all harness tasks with their current status and evaluation results",
  async () => {
    // Summaries only — full detail (iterations, evals) comes from task_get.
    const rows = await db.select().from(tasksTable);
    const text = JSON.stringify(rows.map(taskSummary), null, 2);
    return { content: [{ type: "text", text }] };
  },
);
server.tool(
  "task_get",
  "Get full details for a harness task including iteration history and evaluations",
  { taskId: z.string().describe("Task ID to look up") },
  async ({ taskId }) => {
    const matching = await db.select().from(tasksTable).where(eq(tasksTable.id, taskId));
    const task = matching[0];
    if (!task) {
      return { content: [{ type: "text", text: `Task ${taskId} not found` }], isError: true };
    }
    const iterationRows = await db
      .select()
      .from(iterationsTable)
      .where(eq(iterationsTable.taskId, taskId));
    // Present iterations in ascending order of their iteration number.
    const ordered = [...iterationRows].sort((a, b) => a.n - b.n);
    const detail = {
      ...taskSummary(task),
      spec: task.spec,
      evals: task.evals,
      pr: task.pr,
      iterations: ordered.map((iter) => ({
        n: iter.n,
        status: iter.status,
        diagnosis: iter.diagnosis,
        evals: iter.evals,
        diffStats: iter.diffStats,
        // Only the tail of the agent transcript is returned to bound payload size.
        agentOutput: iter.agentOutput ? iter.agentOutput.slice(-4000) : null,
        startedAt: iter.startedAt,
        completedAt: iter.completedAt,
      })),
    };
    return {
      content: [{ type: "text", text: JSON.stringify(detail, null, 2) }],
    };
  },
);
server.tool(
  "task_create",
  "Create a new harness task. The task will be created in 'pending' status and can be started with task_start.",
  {
    slug: z.string().describe("Unique short identifier for the task (e.g. 'fix-auth-bug')"),
    goal: z.string().describe("High-level description of what the task should accomplish"),
    project: z.string().describe("Repository in 'owner/repo' format"),
    agentId: z.string().describe("ID of the agent configuration to use"),
    maxIterations: z.number().optional().describe("Maximum iterations before giving up (default: 6)"),
    criteria: z
      .array(
        z.object({
          label: z.string().describe("Criterion name"),
          target: z.string().describe("Evaluation target DSL (e.g. 'exitCode:0', 'filesChanged:>0')"),
        }),
      )
      .optional()
      .describe("Success criteria for evaluation"),
    constraints: z.array(z.string()).optional().describe("Implementation constraints"),
    knowledgeRefs: z.array(z.string()).optional().describe("Knowledge document filenames to include in prompt"),
    gitProvider: z.enum(["github", "gitlab", "gitea"]).optional().describe("Git provider (default: github)"),
    gitBaseUrl: z.string().optional().describe("Base URL for the git provider API"),
  },
  async (args) => {
    // Normalize the incoming arguments into a full task spec, filling defaults
    // for every optional field so downstream code never sees undefined arrays.
    const spec = {
      slug: args.slug,
      goal: args.goal,
      project: args.project,
      agentId: args.agentId,
      maxIterations: args.maxIterations ?? 6,
      criteria: args.criteria ?? [],
      constraints: args.constraints ?? [],
      knowledgeRefs: args.knowledgeRefs ?? [],
      gitProvider: args.gitProvider,
      gitBaseUrl: args.gitBaseUrl,
    };
    // Date.now() alone can collide when two tasks are created within the same
    // millisecond, which would make the insert fail on the primary key; append
    // a short random suffix so IDs are unique in practice.
    const taskId = `task-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
    await db.insert(tasksTable).values({
      id: taskId,
      slug: spec.slug,
      goal: spec.goal,
      project: spec.project,
      status: "pending",
      iteration: 0,
      maxIterations: spec.maxIterations,
      startedAt: null,
      evals: {},
      spec,
    });
    return {
      content: [{ type: "text", text: JSON.stringify({ id: taskId, status: "pending", slug: spec.slug }) }],
    };
  },
);
server.tool(
  "task_start",
  "Ensure the orchestrator is running so it will pick up pending tasks. Sets orchestrator state to running in the database.",
  async () => {
    // Create the singleton row if it doesn't exist yet (no-op when present)…
    await db
      .insert(orchTable)
      .values({ id: "singleton", running: false })
      .onConflictDoNothing();
    // …then flip it to running so the orchestrator loop picks up pending tasks.
    await db
      .update(orchTable)
      .set({ running: true, updatedAt: new Date() })
      .where(eq(orchTable.id, "singleton"));
    const payload = { ok: true, message: "Orchestrator set to running — pending tasks will be picked up" };
    return {
      content: [{ type: "text", text: JSON.stringify(payload) }],
    };
  },
);
server.tool(
  "task_stop",
  "Request cancellation of a running harness task. Sets cancel_requested flag which the orchestrator polls.",
  { taskId: z.string().describe("ID of the running task to cancel") },
  async ({ taskId }) => {
    // Only flag tasks that are currently running; anything else is rejected below.
    const updated = await db
      .update(tasksTable)
      .set({ cancelRequested: true, updatedAt: new Date() })
      .where(and(eq(tasksTable.id, taskId), eq(tasksTable.status, "running")));
    // NOTE(review): the update result type doesn't expose rowCount directly,
    // hence the cast — assumes the underlying driver populates it; verify.
    const affected = (updated as unknown as { rowCount: number }).rowCount;
    if (affected === 0) {
      return { content: [{ type: "text", text: `Task ${taskId} is not running or not found` }], isError: true };
    }
    return {
      content: [{ type: "text", text: JSON.stringify({ ok: true, message: "Cancellation requested" }) }],
    };
  },
);
// ── Agent & Model Tools ─────────────────────────────────────────
server.tool(
  "agent_list",
  "List all configured agent runtimes (agent configs with runtime, model, and provider)",
  async () => {
    const configs = await db.select().from(agentTable);
    // Project each config row down to the fields callers care about.
    const summaries = configs.map(({ id, name, runtime, modelId, provider }) => ({
      id,
      name,
      runtime,
      modelId,
      provider,
    }));
    return {
      content: [{ type: "text", text: JSON.stringify(summaries, null, 2) }],
    };
  },
);
server.tool(
  "model_list",
  "List available AI models with pricing information",
  async () => {
    // Only enabled models are surfaced to clients.
    const enabledModels = await db.select().from(modelsTable).where(eq(modelsTable.enabled, true));
    const summaries = enabledModels.map((m) => ({
      id: m.id,
      name: m.name,
      provider: m.provider,
      contextWindow: m.contextWindow,
      costPer1kInput: m.costPer1kInput,
      costPer1kOutput: m.costPer1kOutput,
    }));
    return {
      content: [{ type: "text", text: JSON.stringify(summaries, null, 2) }],
    };
  },
);
// ── Start ───────────────────────────────────────────────────────
async function main(): Promise<void> {
  // Serve MCP requests over stdio — the standard transport for
  // editor/CLI-hosted MCP servers.
  await server.connect(new StdioServerTransport());
}
main().catch((err) => {
  console.error("Harness MCP server failed:", err);
  process.exit(1);
});