AsyncAPI & Event-Driven APIs

When REST is not enough: Kafka, WebSockets, SSE, and the AsyncAPI 3.0 specification

Last Updated:

REST vs Event-Driven: When to Choose

REST is a request/response protocol — the client asks, the server answers. Event-driven communication inverts this: the server pushes data to clients when something happens. Neither is universally better; the right choice depends on your use case.

ScenarioBest ChoiceWhy
CRUD operations on resourcesRESTSimple, cacheable, universal tooling
Real-time chat / collaborationWebSocketBidirectional, low latency
Live dashboards / feedsSSEOne-way server push, simpler than WebSocket
High-throughput microservicesKafka / message queueDecoupled, durable, scalable
Webhook notificationsREST (push) or WebhooksSimple, consumer controls the endpoint
Mobile apps (intermittent connection)REST + pollingReconnect-friendly

What is AsyncAPI?

AsyncAPI is the "OpenAPI for event-driven APIs" — a specification for documenting message-driven interfaces including Kafka topics, WebSocket channels, MQTT topics, and AMQP queues. AsyncAPI 3.0 (released 2024) is the current stable version.

Just as OpenAPI 3.1 describes REST endpoints, AsyncAPI describes channels (topics/queues), messages (payloads), and operations (publish/subscribe).

AsyncAPI 3.0 Document Structure

asyncapi: "3.0.0"
info:
  title: Order Events API
  version: "1.0.0"
  description: Events emitted by the order service

defaultContentType: application/json

servers:
  production:
    host: kafka.example.com:9092
    protocol: kafka
    description: Production Kafka broker

channels:
  OrderCreated:
    address: orders.created
    description: Published when a new order is placed
    messages:
      OrderCreatedMessage:
        $ref: '#/components/messages/OrderCreated'

  OrderShipped:
    address: orders.shipped
    messages:
      OrderShippedMessage:
        $ref: '#/components/messages/OrderShipped'

operations:
  PublishOrderCreated:
    action: send
    channel:
      $ref: '#/channels/OrderCreated'
    description: Order service publishes this when an order is confirmed

  SubscribeOrderShipped:
    action: receive
    channel:
      $ref: '#/channels/OrderShipped'
    description: Fulfillment service subscribes to ship notifications

components:
  messages:
    OrderCreated:
      payload:
        type: object
        required: [order_id, customer_id, total_cents, items]
        properties:
          order_id:    { type: string, format: uuid }
          customer_id: { type: string, format: uuid }
          total_cents: { type: integer, minimum: 0 }
          currency:    { type: string, default: USD }
          items:
            type: array
            items:
              type: object
              properties:
                product_id: { type: string, format: uuid }
                quantity:   { type: integer, minimum: 1 }
      headers:
        type: object
        properties:
          correlation_id: { type: string, format: uuid }
          event_version:  { type: string, default: "1.0" }

AsyncAPI vs OpenAPI

PropertyOpenAPI 3.1AsyncAPI 3.0
ProtocolHTTP/RESTKafka, WebSocket, MQTT, AMQP, SSE
Interaction modelRequest / ResponsePublish / Subscribe, Send / Receive
Schema languageJSON Schema 2020-12JSON Schema 2020-12 (same!)
ServersHTTP base URLsBrokers (Kafka, MQTT broker, etc.)
OperationsHTTP methods on pathssend / receive on channels
Code generationOpenAPI GeneratorAsyncAPI Generator

Kafka + AsyncAPI: Node.js Producer/Consumer

const { Kafka } = require('kafkajs');
const Ajv = require('ajv');

const kafka = new Kafka({ clientId: 'order-service', brokers: ['kafka:9092'] });
const producer = kafka.producer();
const ajv = new Ajv();

// Schema from AsyncAPI components.messages.OrderCreated.payload
const orderCreatedSchema = {
  type: 'object',
  required: ['order_id', 'customer_id', 'total_cents', 'items'],
  properties: {
    order_id:    { type: 'string', format: 'uuid' },
    customer_id: { type: 'string', format: 'uuid' },
    total_cents: { type: 'integer', minimum: 0 },
    items:       { type: 'array', items: { type: 'object' } }
  }
};
const validateOrderCreated = ajv.compile(orderCreatedSchema);

// Producer: publish validated event
async function publishOrderCreated(orderData) {
  if (!validateOrderCreated(orderData)) {
    throw new Error('Invalid order event: ' + JSON.stringify(validateOrderCreated.errors));
  }

  await producer.connect();
  await producer.send({
    topic: 'orders.created',
    messages: [{
      key: orderData.order_id,
      value: JSON.stringify(orderData),
      headers: {
        'correlation-id': crypto.randomUUID(),
        'event-version': '1.0',
        'content-type': 'application/json'
      }
    }]
  });
}

// Consumer: subscribe and process
const consumer = kafka.consumer({ groupId: 'fulfillment-service' });

async function startConsumer() {
  await consumer.connect();
  await consumer.subscribe({ topic: 'orders.created', fromBeginning: false });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const event = JSON.parse(message.value.toString());

      // Idempotency check — Kafka may redeliver messages
      const alreadyProcessed = await idempotencyStore.has(event.order_id);
      if (alreadyProcessed) return;

      await fulfillOrder(event);
      await idempotencyStore.add(event.order_id);
    }
  });
}

WebSocket API Design

const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });

// Connection lifecycle
wss.on('connection', (ws, req) => {
  const userId = authenticate(req);  // extract from ?token= or cookie
  if (!userId) return ws.close(4001, 'Unauthorized');

  console.log('Client connected:', userId);

  // Heartbeat to detect stale connections
  ws.isAlive = true;
  ws.on('pong', () => { ws.isAlive = true; });

  // Message handler — define a message schema
  ws.on('message', (data) => {
    let msg;
    try { msg = JSON.parse(data); }
    catch { return ws.send(JSON.stringify({ error: 'Invalid JSON' })); }

    // Dispatch by message type
    switch (msg.type) {
      case 'subscribe':
        subscribeToChannel(ws, msg.channel, userId);
        break;
      case 'unsubscribe':
        unsubscribeFromChannel(ws, msg.channel);
        break;
      default:
        ws.send(JSON.stringify({ error: 'Unknown message type', received: msg.type }));
    }
  });

  ws.on('close', () => cleanup(userId));
});

// Heartbeat interval — close stale connections
setInterval(() => {
  wss.clients.forEach(ws => {
    if (!ws.isAlive) return ws.terminate();
    ws.isAlive = false;
    ws.ping();
  });
}, 30_000);

Server-Sent Events (SSE) — Simpler One-Way Push

For one-way server-to-client streams (live feeds, progress bars, notifications), SSE is simpler than WebSocket — it uses plain HTTP, works through proxies, and auto-reconnects:

app.get('/events', auth, (req, res) => {
  // SSE headers
  res.set({
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'X-Accel-Buffering': 'no'  // disable Nginx buffering
  });
  res.flushHeaders();

  // Send event
  function sendEvent(event, data) {
    res.write(`event: ${event}\n`);
    res.write(`data: ${JSON.stringify(data)}\n\n`);
  }

  // Subscribe to changes
  const unsubscribe = eventBus.subscribe(req.user.id, (event) => {
    sendEvent(event.type, event.data);
  });

  // Heartbeat (prevent proxy timeouts)
  const heartbeat = setInterval(() => res.write(':heartbeat\n\n'), 15_000);

  req.on('close', () => {
    unsubscribe();
    clearInterval(heartbeat);
  });
});

// Client (browser):
// const es = new EventSource('/events', { withCredentials: true });
// es.addEventListener('order.shipped', e => console.log(JSON.parse(e.data)));

AsyncAPI Tooling

ToolPurpose
AsyncAPI StudioVisual editor for AsyncAPI documents
@asyncapi/generatorCode generation: Node.js, Python, Java, docs
@asyncapi/parserParse and validate AsyncAPI documents programmatically
MicrocksMock event-driven APIs for testing