Forge connects to execution runtimes through Bridge adapters. Each adapter implements a single TypeScript interface — ExecutionAdapter — that normalizes the runtime’s execution model into a standard request/result contract. Bridge handles prompt assembly, tool translation, output normalization, and callback delivery. Your adapter’s job: accept a request, run it on the runtime, return a result. This guide walks through writing a new adapter from scratch.

The ExecutionAdapter Interface

Every adapter implements this interface. It lives in packages/bridge/src/execution-adapter.ts and is the foundational contract for all gateway communication.
import type { GatewayCapabilityProfile } from "@tangogroup/shared";

interface ExecutionAdapter {
  /** Unique provider identifier (e.g. "openclaw", "claude-agent-sdk", "langgraph"). */
  readonly provider: string;

  /** Initialize the adapter with gateway-specific configuration. */
  initialize(config: AdapterConfig): Promise<void>;

  /** Clean shutdown -- cancel active tasks, close connections. */
  shutdown(): Promise<void>;

  /** Health check -- can this adapter accept work right now? */
  checkHealth(): Promise<AdapterHealthStatus>;

  /** Declare what this gateway can and cannot do. */
  capabilities(): GatewayCapabilityProfile;

  /**
   * Execute a task. This is the core method.
   *
   * The bridge has already:
   * - Assembled the system prompt from workspace files
   * - Translated tool specs to normalized format
   * - Determined the output capture strategy
   *
   * The adapter's job: run it, return a result.
   */
  execute(request: ExecutionRequest): Promise<ExecutionResult>;

  /**
   * Cancel a running task. Best-effort -- not all gateways support this.
   * Returns true if cancellation was initiated.
   */
  cancel(taskHandle: string): Promise<boolean>;

  /**
   * Subscribe to execution events (tool calls, assistant messages, etc.).
   * The bridge translates these to SSE events for the Forge Console.
   * Optional -- adapters that don't support streaming just never call the listener.
   */
  onEvent?(listener: ExecutionEventListener): void;

  /** Remove an event listener. */
  offEvent?(listener: ExecutionEventListener): void;

  /**
   * Start an adapter-managed event relay (health monitors, WS relays, etc.).
   * Called by connectEventRelay() after the server is listening.
   * Optional -- thin adapters use onEvent/offEvent instead.
   */
  startEventRelay?(eventBus: EventBus): void;

  /** Stop the adapter-managed event relay. */
  stopEventRelay?(): void;
}

Method-by-Method Breakdown

| Method | Required | Purpose |
| --- | --- | --- |
| provider | Yes | Unique string ID. Used for adapter routing, logging, and the BRIDGE_ADAPTERS env var. |
| initialize() | Yes | Called once at startup. Load API keys, validate configuration, connect to external services. |
| shutdown() | Yes | Called on process exit. Cancel in-flight tasks, close connections, release resources. |
| checkHealth() | Yes | Called periodically by Bridge and Convex. Return "healthy", "degraded", or "offline". |
| capabilities() | Yes | Return a GatewayCapabilityProfile declaring what the runtime supports. Used for routing decisions. |
| execute() | Yes | The core method. Receives a normalized ExecutionRequest, runs it, returns an ExecutionResult. |
| cancel() | Yes | Attempt to cancel a running task by its handle. Return true if cancellation was initiated. |
| onEvent() | Optional | Register a listener for real-time execution events (streaming text, tool calls). |
| offEvent() | Optional | Remove a previously registered event listener. |
| startEventRelay() | Optional | For adapters with their own event infrastructure (e.g. WebSocket relays). |
| stopEventRelay() | Optional | Tear down the adapter-managed event relay. |

Adapter Lifecycle

An adapter goes through a predictable lifecycle managed by Bridge:
1. Registration: The adapter class is registered in packages/bridge/src/adapter-loader.ts with a case in the loadSingleAdapter() switch. Bridge discovers it via the GATEWAY_PROVIDER or BRIDGE_ADAPTERS environment variable.

2. Initialization: Bridge calls adapter.initialize(config) at startup, passing environment variables as the config object. The adapter validates its configuration (API keys, endpoints) and sets up internal state.

3. Health Checks: Bridge exposes a /api/health endpoint that calls adapter.checkHealth(). Convex polls this endpoint to determine whether the gateway can accept work. Return "degraded" (not "offline") if you can partially function.

4. Dispatch: For each workflow step, Bridge calls adapter.execute(request) with a fully assembled ExecutionRequest. The adapter translates this to its native format, runs the task, and returns an ExecutionResult.

5. Event Streaming: During execution, adapters that support streaming emit ExecutionEvent objects to registered listeners. Bridge forwards these as SSE events to connected Forge Console clients.

6. Shutdown: On process exit, Bridge calls adapter.shutdown(). Cancel all active tasks and release resources.
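
The ordering of these lifecycle calls can be sketched with a small driver. This is an illustration only, not Bridge's actual startup code: MiniAdapter, MiniRequest, MiniResult, EchoAdapter, and runLifecycle are hypothetical names, and the types are simplified stand-ins for the real ones in @tangogroup/bridge.

```typescript
// Simplified stand-ins for the Bridge types (assumptions for this sketch).
type AdapterHealthStatus = "healthy" | "degraded" | "offline";
interface MiniRequest { taskId: string; message: string }
interface MiniResult { status: "ok" | "error"; output?: string; error?: string }

interface MiniAdapter {
  readonly provider: string;
  initialize(config: Record<string, unknown>): Promise<void>;
  checkHealth(): Promise<AdapterHealthStatus>;
  execute(request: MiniRequest): Promise<MiniResult>;
  shutdown(): Promise<void>;
}

// Hypothetical adapter that echoes the message and records each lifecycle call.
class EchoAdapter implements MiniAdapter {
  readonly provider = "echo";
  readonly calls: string[] = [];
  private ready = false;

  async initialize(_config: Record<string, unknown>): Promise<void> {
    this.calls.push("initialize");
    this.ready = true;
  }

  async checkHealth(): Promise<AdapterHealthStatus> {
    this.calls.push("checkHealth");
    return this.ready ? "healthy" : "offline";
  }

  async execute(request: MiniRequest): Promise<MiniResult> {
    this.calls.push("execute");
    return { status: "ok", output: `echo: ${request.message}` };
  }

  async shutdown(): Promise<void> {
    this.calls.push("shutdown");
    this.ready = false;
  }
}

// Hypothetical driver: initialize, gate dispatch on health, always shut down.
async function runLifecycle(
  adapter: MiniAdapter,
  request: MiniRequest,
): Promise<MiniResult> {
  await adapter.initialize({});
  try {
    const health = await adapter.checkHealth();
    if (health === "offline") {
      return { status: "error", error: `${adapter.provider} is offline` };
    }
    return await adapter.execute(request);
  } finally {
    await adapter.shutdown();
  }
}
```

In this sketch a "degraded" adapter still receives work and only "offline" blocks dispatch. That choice is an assumption made for the example; the real gating happens on the Convex side.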

Dispatch Flow

When a .lobsterX workflow step is dispatched, it flows through the following path:

End-to-End Dispatch Flow

1. Convex Interpreter: Resolves the DAG, runs governance gates, and POSTs a dispatch payload to Bridge.
   Hand-off: HTTP POST /api/v2/dispatch

2. Bridge Dispatch Orchestrator: Validates the request, authenticates the token, and calls assembleExecutionRequest() to build a normalized ExecutionRequest from the dispatch payload.
   Hand-off: adapter.execute(request)

3. Your Adapter: Translates the ExecutionRequest to the runtime’s native format. Calls the runtime. Normalizes the response into an ExecutionResult.
   Hand-off: return ExecutionResult

4. Bridge Callback Delivery: Normalizes the output, then POSTs the result to Convex via the callback URL. Retries 3x with exponential backoff.
   Hand-off: HTTP POST callback_url

5. Convex Callback Handler: Records the step result, creates token events for cost attribution, and fires a completion event to the interpreter to advance the DAG.

Your adapter never calls Convex directly. Bridge handles all callback delivery, retry logic, and output normalization. The adapter just returns an ExecutionResult.
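
For orientation, the retry behaviour of callback delivery can be sketched as follows. This is not Bridge's implementation: deliverWithRetry, the 500 ms base delay, and the reading of "3x" as one initial attempt plus three retries are all assumptions made for illustration. The sleep function is injectable so the schedule can be checked without waiting.

```typescript
type Send = () => Promise<{ ok: boolean }>;
type Sleep = (ms: number) => Promise<void>;

const realSleep: Sleep = (ms) =>
  new Promise((resolve) => setTimeout(resolve, ms));

// Hypothetical helper: one initial attempt plus up to `maxRetries` retries,
// doubling the delay after every failed attempt.
async function deliverWithRetry(
  send: Send,
  maxRetries = 3,
  baseDelayMs = 500, // assumed value; the guide does not state the real delay
  sleep: Sleep = realSleep,
): Promise<boolean> {
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      const response = await send();
      if (response.ok) return true;
    } catch {
      // A thrown error (e.g. network failure) counts as a failed delivery.
    }
    if (attempt < maxRetries) {
      // Delays grow as base, 2x base, 4x base, ...
      await sleep(baseDelayMs * 2 ** attempt);
    }
  }
  return false;
}
```

Because Bridge owns this loop, an adapter that returns an ExecutionResult has nothing further to do, even if Convex is briefly unreachable.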

Self-Registration and Capabilities

Adapters declare their capabilities at runtime through the capabilities() method. There is no static capability map — capabilities are self-registered by each adapter and stored in the gatewayRegistry table in Convex.

The GatewayCapabilityProfile

// From packages/shared/src/runtime-types.ts

const GATEWAY_CAPABILITIES = [
  "streaming",          // Real-time event streaming during execution
  "session_resume",     // Resume a prior conversation session
  "file_io",            // Native filesystem read/write/edit
  "mcp_servers",        // Connect to MCP tool servers
  "sandboxing",         // Isolated execution environment
  "subagents",          // Delegate to child agents
  "cost_tracking",      // Report token usage and cost
  "structured_output",  // Return structured JSON output
  "management",         // Agent/skill/system CRUD (RuntimeManager)
  "native_tools",       // Runtime has built-in tools
] as const;

interface GatewayCapabilityProfile {
  capabilities: GatewayCapability[];          // Which capabilities this adapter supports
  nativeTools: string[];                      // Built-in tools (e.g. "Read", "Write", "Bash")
  maxConcurrent: number;                      // Maximum concurrent executions
  outputStrategies: ("file" | "stream" | "structured")[];  // Supported output modes
  costTier: "low" | "medium" | "high";        // Cost classification
  latencyProfile: "fast" | "moderate" | "variable";        // Latency classification
}

How Capabilities Are Checked

On the Convex side, the routing engine uses gatewaySupports() from convex/gateways/capabilities.ts to check whether a gateway supports a specific capability:
// From convex/gateways/capabilities.ts

function gatewaySupports(
  profile: { capabilities?: string[] } | undefined | null,
  capability: string,
): boolean {
  if (!profile || !Array.isArray(profile.capabilities)) return false;
  return profile.capabilities.includes(capability);
}
The routing system evaluates a layered stack in order: Layer 0 (pinned gateway), Layer 1 (capability matching), Layer 2 (cost/provider preferences), Layer 3 (deploy-aware), and Layer 4 (health ranking). Your adapter’s capabilities() return value feeds into Layer 1.
Never read capability boolean fields directly. Always use gatewaySupports(profile, "capability_name") for forward compatibility.
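
To make the capability check concrete, here is a rough sketch of how a matching step could filter candidate gateways. gatewaySupports() is copied from the excerpt above; GatewayEntry and matchGateways() are hypothetical names introduced for this example and are not part of the Forge routing engine.

```typescript
// Hypothetical registry row: a gateway id plus its self-registered profile.
interface GatewayEntry {
  id: string;
  profile?: { capabilities?: string[] } | null;
}

// Same logic as gatewaySupports() in convex/gateways/capabilities.ts.
function gatewaySupports(
  profile: { capabilities?: string[] } | undefined | null,
  capability: string,
): boolean {
  if (!profile || !Array.isArray(profile.capabilities)) return false;
  return profile.capabilities.includes(capability);
}

// Keep only the gateways that support every required capability.
function matchGateways(
  gateways: GatewayEntry[],
  required: string[],
): GatewayEntry[] {
  return gateways.filter((gateway) =>
    required.every((capability) => gatewaySupports(gateway.profile, capability)),
  );
}

const candidates: GatewayEntry[] = [
  { id: "gloo-ai", profile: { capabilities: ["streaming", "cost_tracking"] } },
  {
    id: "vercel-ai",
    profile: {
      capabilities: ["streaming", "mcp_servers", "cost_tracking", "structured_output"],
    },
  },
  { id: "unregistered", profile: null },
];

const matched = matchGateways(candidates, ["streaming", "structured_output"]);
console.log(matched.map((gateway) => gateway.id));
```

A gateway with a missing or malformed profile simply fails every check, so an adapter that has not registered its capabilities is never selected by capability matching.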

Implementing a Minimal Adapter

Here is a step-by-step walkthrough for adding a new adapter. We will use a hypothetical “Acme AI” runtime as the example.

Step 1: Create the Package

mkdir -p packages/adapter-acme-ai/src
cd packages/adapter-acme-ai
Create package.json:
{
  "name": "@tangogroup/adapter-acme-ai",
  "version": "0.0.1",
  "type": "module",
  "main": "dist/index.js",
  "types": "dist/index.d.ts",
  "scripts": {
    "build": "tsc",
    "test": "vitest run"
  },
  "dependencies": {
    "@tangogroup/bridge": "workspace:*",
    "@tangogroup/shared": "workspace:*"
  },
  "devDependencies": {
    "@tangogroup/tsconfig": "workspace:*",
    "vitest": "^3.0.0"
  }
}
Add the package to pnpm-workspace.yaml if it is not already covered by a glob pattern.

Step 2: Implement the Adapter

Create src/provider.ts:
import type {
  AdapterConfig,
  AdapterHealthStatus,
  ExecutionAdapter,
  ExecutionEvent,
  ExecutionEventListener,
  ExecutionRequest,
  ExecutionResult,
  NormalizedUsage,
} from "@tangogroup/bridge";
import type { GatewayCapabilityProfile } from "@tangogroup/shared";

export class AcmeAIProvider implements ExecutionAdapter {
  readonly provider = "acme-ai";

  private eventListeners: Set<ExecutionEventListener> = new Set();
  private activeAbortControllers: Map<string, AbortController> = new Map();
  private apiKey: string | undefined;

  // -- Lifecycle --

  async initialize(config: AdapterConfig): Promise<void> {
    this.apiKey =
      typeof config.ACME_AI_API_KEY === "string"
        ? config.ACME_AI_API_KEY
        : process.env.ACME_AI_API_KEY;

    if (!this.apiKey) {
      console.warn("[AcmeAIProvider] ACME_AI_API_KEY not set -- requests will fail");
    }

    console.log(`[${new Date().toISOString()}] AcmeAIProvider initialized`);
  }

  async shutdown(): Promise<void> {
    for (const [taskId, controller] of this.activeAbortControllers) {
      controller.abort();
      console.log(`[${new Date().toISOString()}] Cancelled task ${taskId} on shutdown`);
    }
    this.activeAbortControllers.clear();
    console.log(`[${new Date().toISOString()}] AcmeAIProvider shut down`);
  }

  // -- Health --

  async checkHealth(): Promise<AdapterHealthStatus> {
    return this.apiKey ? "healthy" : "degraded";
  }

  // -- Capabilities --

  capabilities(): GatewayCapabilityProfile {
    return {
      capabilities: ["streaming", "cost_tracking"],
      nativeTools: [],
      maxConcurrent: 20,
      outputStrategies: ["stream"],
      costTier: "low",
      latencyProfile: "fast",
    };
  }

  // -- Execute --

  async execute(request: ExecutionRequest): Promise<ExecutionResult> {
    const startedAt = Date.now();
    const abortController = new AbortController();
    this.activeAbortControllers.set(request.taskId, abortController);

    try {
      // Emit start event
      this.emitEvent({
        type: "execution.started",
        taskId: request.taskId,
        agentId: request.agentId,
        timestamp: new Date().toISOString(),
      });

      // Set up timeout
      const timeout = setTimeout(() => {
        abortController.abort();
      }, request.timeoutSeconds * 1000);

      try {
        // --- Your runtime call goes here ---
        // Translate request.message, request.systemPrompt, request.model,
        // request.tools to whatever your runtime expects.
        //
        // const response = await acmeAI.generate({
        //   prompt: request.message,
        //   system: request.systemPrompt,
        //   model: request.model,
        //   signal: abortController.signal,
        // });

        const output = ""; // response.text
        const usage: NormalizedUsage = {
          inputTokens: null,   // response.usage.input_tokens
          outputTokens: null,  // response.usage.output_tokens
          thinkingTokens: null,
          totalTokens: null,
          cacheReadTokens: null,
          cacheWriteTokens: null,
        };

        // Emit completion event
        this.emitEvent({
          type: "execution.completed",
          taskId: request.taskId,
          agentId: request.agentId,
          timestamp: new Date().toISOString(),
        });

        return {
          status: "ok",
          output,
          durationMs: Date.now() - startedAt,
          usage,
          model: request.model,
          modelProvider: "acme",
        };
      } finally {
        clearTimeout(timeout);
      }
    } catch (err) {
      const errorMsg = err instanceof Error ? err.message : String(err);
      console.error(
        `[${new Date().toISOString()}] Acme AI failed for task ${request.taskId}: ${errorMsg}`,
      );

      this.emitEvent({
        type: "execution.failed",
        taskId: request.taskId,
        agentId: request.agentId,
        timestamp: new Date().toISOString(),
        data: { error: errorMsg },
      });

      return {
        status: "error",
        error: errorMsg,
        durationMs: Date.now() - startedAt,
        // Check the signal rather than the message text. Both cancel() and the timeout abort it.
        errorCode: abortController.signal.aborted ? "cancelled" : undefined,
      };
    } finally {
      this.activeAbortControllers.delete(request.taskId);
    }
  }

  // -- Cancel --

  async cancel(taskHandle: string): Promise<boolean> {
    const controller = this.activeAbortControllers.get(taskHandle);
    if (controller) {
      controller.abort();
      this.activeAbortControllers.delete(taskHandle);
      return true;
    }
    return false;
  }

  // -- Events --

  onEvent(listener: ExecutionEventListener): void {
    this.eventListeners.add(listener);
  }

  offEvent(listener: ExecutionEventListener): void {
    this.eventListeners.delete(listener);
  }

  private emitEvent(event: ExecutionEvent): void {
    for (const listener of this.eventListeners) {
      try {
        listener(event);
      } catch {
        // Don't let listener errors break execution
      }
    }
  }
}
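
In the adapter above, a timeout and an explicit cancel() both abort the same controller, so the two cases look identical in the error path. If your runtime needs to tell them apart, one possible approach is sketched below. runWithDeadline and DeadlineOutcome are hypothetical names, and this guide does not specify an errorCode value for timeouts, so the "timeout" label is an assumption.

```typescript
type DeadlineOutcome<T> =
  | { ok: true; value: T }
  | { ok: false; reason: "timeout" | "cancelled" | "error"; message: string };

// Hypothetical helper: run `work` with a deadline and record who aborted it.
async function runWithDeadline<T>(
  work: (signal: AbortSignal) => Promise<T>,
  timeoutMs: number,
  controller: AbortController = new AbortController(),
): Promise<DeadlineOutcome<T>> {
  let timedOut = false;
  const timer = setTimeout(() => {
    timedOut = true; // set before aborting so the catch block can see it
    controller.abort();
  }, timeoutMs);

  try {
    const value = await work(controller.signal);
    return { ok: true, value };
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    if (controller.signal.aborted) {
      // Aborted by our timer means timeout; anything else is an external cancel.
      return { ok: false, reason: timedOut ? "timeout" : "cancelled", message };
    }
    return { ok: false, reason: "error", message };
  } finally {
    clearTimeout(timer);
  }
}
```

Passing in the controller that cancel() looks up in activeAbortControllers would keep cancellation working unchanged while the helper owns the timer.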
Create src/index.ts (barrel export):
export { AcmeAIProvider } from "./provider.js";
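
The usage placeholder in execute() leaves every token count as null. Once the runtime returns real numbers, a small mapping helper can fill them in. The sketch below is only an illustration: RawAcmeUsage mirrors the response.usage.input_tokens and response.usage.output_tokens comments in the adapter, the local NormalizedUsage interface assumes each field is number | null, and computing totalTokens as input plus output is an assumption rather than a documented Bridge rule.

```typescript
// Local stand-in for NormalizedUsage from @tangogroup/bridge (field types assumed).
interface NormalizedUsage {
  inputTokens: number | null;
  outputTokens: number | null;
  thinkingTokens: number | null;
  totalTokens: number | null;
  cacheReadTokens: number | null;
  cacheWriteTokens: number | null;
}

// Hypothetical usage shape returned by the Acme AI runtime.
interface RawAcmeUsage {
  input_tokens?: number;
  output_tokens?: number;
}

// Map the runtime's usage report to the normalized shape, keeping null for unknowns.
function toNormalizedUsage(raw: RawAcmeUsage | undefined): NormalizedUsage {
  const inputTokens = raw?.input_tokens ?? null;
  const outputTokens = raw?.output_tokens ?? null;
  return {
    inputTokens,
    outputTokens,
    thinkingTokens: null,
    // Only report a total when both parts are known.
    totalTokens:
      inputTokens !== null && outputTokens !== null
        ? inputTokens + outputTokens
        : null,
    cacheReadTokens: null,
    cacheWriteTokens: null,
  };
}
```

Reporting null instead of 0 for unknown counts keeps a missing measurement distinguishable from a genuine zero.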

Step 3: Register in the Adapter Loader

Add your adapter to the switch in packages/bridge/src/adapter-loader.ts:
case "acme-ai": {
  const mod = await import(/* @vite-ignore */ adapterDir("acme-ai"));
  return new mod.AcmeAIProvider() as ExecutionAdapter;
}

Step 4: Add to the Runtime Providers List

Add "acme-ai" to the RUNTIME_PROVIDERS array in packages/shared/src/runtime-types.ts:
export const RUNTIME_PROVIDERS = [
  "openclaw",
  "claude-agent-sdk",
  "vercel-ai",
  "langgraph",
  "convex-native",
  "gloo-ai",
  "hermes-agent",
  "acme-ai",     // <-- add here
  "custom",
] as const;

Step 5: Configure and Run

Set the environment variable and start Bridge:
BRIDGE_ADAPTERS=acme-ai ACME_AI_API_KEY=sk-... pnpm dev
Or run alongside other adapters in multi-adapter mode:
BRIDGE_ADAPTERS=claude-agent-sdk,acme-ai pnpm dev
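
As a rough illustration of how a comma-separated adapter list like the one above could be turned into provider names, consider the sketch below. parseAdapterList is a hypothetical helper, not the code in adapter-loader.ts, and the precedence of BRIDGE_ADAPTERS over GATEWAY_PROVIDER is an assumption made for the example.

```typescript
// Hypothetical parser for the adapter selection environment variables.
function parseAdapterList(env: Record<string, string | undefined>): string[] {
  // Assumption: BRIDGE_ADAPTERS wins when both variables are set.
  const raw = env.BRIDGE_ADAPTERS ?? env.GATEWAY_PROVIDER ?? "";
  const names = raw
    .split(",")
    .map((name) => name.trim())
    .filter((name) => name.length > 0);
  // Drop duplicates while keeping first-seen order.
  return Array.from(new Set(names));
}

console.log(parseAdapterList({ BRIDGE_ADAPTERS: "claude-agent-sdk, acme-ai" }));
```

Each resulting name would then correspond to one case in the loadSingleAdapter() switch.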

Existing Adapters

Forge ships with five adapters. Use these as reference implementations when building your own.
| Adapter | Package | Runtime | Capabilities | Cost Tier | Best For |
| --- | --- | --- | --- | --- | --- |
| OpenClaw | adapter-openclaw | Stateful daemon with WebSocket relay | streaming, session_resume, file_io, sandboxing, subagents, management | Medium | Full-featured agent execution with persistent state, workspace files, and agent CRUD |
| Claude Agent SDK | adapter-claude-sdk | Anthropic Claude Agent SDK | streaming, session_resume, file_io, mcp_servers, subagents, cost_tracking | High | Claude-native execution with tool use, subagent orchestration, and conversation capture |
| Vercel AI | adapter-vercel-ai | Vercel AI SDK (multi-provider) | streaming, mcp_servers, cost_tracking, structured_output | Low | Multi-provider model routing (Anthropic, OpenAI, Google), structured output, high-concurrency serverless |
| Gloo AI | adapter-gloo-ai | Gloo AI Completions v2 | streaming, cost_tracking | Low | Intelligent model routing, grounded completions (RAG), tradition-aware responses |
| Hermes Agent | adapter-hermes-agent | Hermes Agent runtime | streaming, file_io, mcp_servers, management, native_tools | Medium | Full management interface with rich native tool set (terminal, browser, vision, memory) |

Interfaces Implemented

Most adapters implement only ExecutionAdapter. Stateful runtimes that manage agent configurations additionally implement management sub-interfaces:
| Interface | Purpose | Implemented by |
| --- | --- | --- |
| ExecutionAdapter | Core execution contract | All adapters |
| RuntimeManager | Agent sync, skill sync, model discovery | OpenClaw |
| AgentManager | Agent CRUD, workspace files, channel bindings | OpenClaw |
| SkillManager | Skill CRUD, runtime status, dependency installation | OpenClaw |
| SystemManager | Config reload, model catalog, security posture | OpenClaw |
Bridge detects these interfaces via type guards (isAgentManager(), isSkillManager(), isSystemManager()) and exposes the corresponding gateway API routes only when the loaded adapter supports them. If your runtime manages agent state, implement the relevant sub-interfaces.

Optional: Management Sub-Interfaces

If your runtime stores agent configurations and supports management operations, you can implement the RuntimeManager interface alongside ExecutionAdapter:
import type {
  ExecutionAdapter,
  RuntimeManager,
  AgentSyncSpec,
  DiscoveredAgent,
  DiscoveredSkill,
  DiscoveredModel,
  SkillSyncSpec,
} from "@tangogroup/bridge";

export class AcmeAIProvider implements ExecutionAdapter, RuntimeManager {
  // ... ExecutionAdapter methods ...

  async pushAgent(spec: AgentSyncSpec): Promise<void> { /* ... */ }
  async removeAgent(agentId: string): Promise<void> { /* ... */ }
  async discoverAgents(): Promise<DiscoveredAgent[]> { /* ... */ }
  async pushFile(agentId: string, filename: string, content: string): Promise<void> { /* ... */ }
  async pushSkill(spec: SkillSyncSpec): Promise<void> { /* ... */ }
  async removeSkill(skillKey: string): Promise<void> { /* ... */ }
  async discoverSkills(): Promise<DiscoveredSkill[]> { /* ... */ }
  async discoverModels(): Promise<DiscoveredModel[]> { /* ... */ }
}
Most adapters do not need management interfaces. Stateless runtimes like Claude Agent SDK and Vercel AI define agents at execution time — no persistent state to manage.
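
Interface detection of this kind is usually done with a structural type guard. The sketch below shows what such a guard might look like for the RuntimeManager methods listed above. It is not Bridge's code: isRuntimeManagerLike and RuntimeManagerLike are hypothetical names, and the real guards (isAgentManager() and friends) may check different members.

```typescript
// Method names taken from the RuntimeManager example in this guide.
const RUNTIME_MANAGER_METHODS = [
  "pushAgent",
  "removeAgent",
  "discoverAgents",
  "pushFile",
  "pushSkill",
  "removeSkill",
  "discoverSkills",
  "discoverModels",
] as const;

type RuntimeManagerMethod = (typeof RUNTIME_MANAGER_METHODS)[number];

// Structural stand-in for the RuntimeManager interface (assumption).
type RuntimeManagerLike = Record<
  RuntimeManagerMethod,
  (...args: never[]) => Promise<unknown>
>;

// Hypothetical guard: true only if every management method is present.
function isRuntimeManagerLike(adapter: unknown): adapter is RuntimeManagerLike {
  if (typeof adapter !== "object" || adapter === null) return false;
  const candidate = adapter as Record<string, unknown>;
  return RUNTIME_MANAGER_METHODS.every(
    (name) => typeof candidate[name] === "function",
  );
}
```

A guard like this lets the host expose management routes only for adapters that actually provide the methods, which matches the behaviour described for Bridge above.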

Testing

Adapters are tested with Vitest. Tests go in src/__tests__/provider.test.ts within the adapter package.

What to Test

| Area | What to verify |
| --- | --- |
| Identity | provider returns the correct string |
| Capabilities | capabilities() returns the expected profile |
| Health | checkHealth() returns "degraded" without config, "healthy" with config |
| Cancellation | cancel() returns false for unknown tasks, true for active tasks |
| Shutdown | shutdown() aborts all active controllers and clears state |
| Event listeners | onEvent()/offEvent() correctly register and unregister listeners |
| Error isolation | A throwing event listener does not break emitEvent() for other listeners |

Example Test

import type { ExecutionEvent } from "@tangogroup/bridge";
import { afterEach, describe, expect, it, vi } from "vitest";
import { AcmeAIProvider } from "../provider.js";

describe("AcmeAIProvider", () => {
  afterEach(() => {
    vi.restoreAllMocks();
  });

  it("exposes provider identity", () => {
    const provider = new AcmeAIProvider();
    expect(provider.provider).toBe("acme-ai");
  });

  it("returns expected capabilities", () => {
    const provider = new AcmeAIProvider();
    expect(provider.capabilities()).toEqual({
      capabilities: ["streaming", "cost_tracking"],
      nativeTools: [],
      maxConcurrent: 20,
      outputStrategies: ["stream"],
      costTier: "low",
      latencyProfile: "fast",
    });
  });

  it("reports degraded health without API key", async () => {
    const provider = new AcmeAIProvider();
    await expect(provider.checkHealth()).resolves.toBe("degraded");
  });

  it("cancels active task and returns true", async () => {
    const provider = new AcmeAIProvider();
    // Access the private Map via Reflect to inject a controller
    const controllers = Reflect.get(provider, "activeAbortControllers") as Map<
      string,
      AbortController
    >;
    const controller = new AbortController();
    controllers.set("task-42", controller);

    await expect(provider.cancel("task-42")).resolves.toBe(true);
    expect(controller.signal.aborted).toBe(true);
    expect(controllers.has("task-42")).toBe(false);
  });

  it("returns false when cancelling unknown task", async () => {
    const provider = new AcmeAIProvider();
    await expect(provider.cancel("nonexistent")).resolves.toBe(false);
  });

  it("swallows listener errors without breaking other listeners", () => {
    const provider = new AcmeAIProvider();
    const events: ExecutionEvent[] = [];

    provider.onEvent(() => { throw new Error("boom"); });
    provider.onEvent((event) => { events.push(event); });

    const emitEvent = Reflect.get(provider, "emitEvent").bind(provider) as (
      event: ExecutionEvent,
    ) => void;

    expect(() =>
      emitEvent({
        type: "execution.completed",
        taskId: "t1",
        agentId: "a1",
        timestamp: new Date().toISOString(),
      }),
    ).not.toThrow();

    expect(events).toHaveLength(1);
  });
});
Run tests:
cd packages/adapter-acme-ai
pnpm test

Checklist

Before shipping a new adapter:
  • Implements all required ExecutionAdapter methods
  • provider string is unique
  • capabilities() accurately reflects what the runtime supports
  • initialize() validates configuration and logs startup
  • shutdown() cancels all active tasks and cleans up
  • checkHealth() returns meaningful status based on actual connectivity
  • execute() handles timeouts via AbortController
  • execute() returns NormalizedUsage with token counts when available
  • cancel() handles both known and unknown task handles
  • Error paths return status: "error" with a descriptive error field
  • Event listeners are isolated — a throwing listener never breaks execution
  • Adapter registered in packages/bridge/src/adapter-loader.ts
  • Provider name added to RUNTIME_PROVIDERS in packages/shared/src/runtime-types.ts
  • Barrel export in src/index.ts
  • Package added to pnpm-workspace.yaml if needed
  • @tangogroup/bridge listed as a dependency in package.json
  • Unit tests for identity, capabilities, health, cancellation, shutdown, event listeners
  • Tests pass with pnpm test
  • Verified end-to-end with BRIDGE_ADAPTERS=acme-ai pnpm dev
See also: Bridge & Adapters Architecture | Gateways | Workflow Execution Flow