AsyncAPI & Event-Driven APIs
When REST is not enough: Kafka, WebSockets, SSE, and the AsyncAPI 3.0 specification
REST vs Event-Driven: When to Choose
REST is a request/response protocol — the client asks, the server answers. Event-driven communication inverts this: the server pushes data to clients when something happens. Neither is universally better; the right choice depends on your use case.
| Scenario | Best Choice | Why |
|---|---|---|
| CRUD operations on resources | REST | Simple, cacheable, universal tooling |
| Real-time chat / collaboration | WebSocket | Bidirectional, low latency |
| Live dashboards / feeds | SSE | One-way server push, simpler than WebSocket |
| High-throughput microservices | Kafka / message queue | Decoupled, durable, scalable |
| Webhook notifications | REST (push) or Webhooks | Simple, consumer controls the endpoint |
| Mobile apps (intermittent connection) | REST + polling | Reconnect-friendly |
What is AsyncAPI?
AsyncAPI is the "OpenAPI for event-driven APIs" — a specification for documenting message-driven interfaces including Kafka topics, WebSocket channels, MQTT topics, and AMQP queues. AsyncAPI 3.0 (released 2024) is the current stable version.
Just as OpenAPI 3.1 describes REST endpoints, AsyncAPI describes channels (topics/queues), messages (payloads), and operations (publish/subscribe).
AsyncAPI 3.0 Document Structure
asyncapi: "3.0.0"
info:
title: Order Events API
version: "1.0.0"
description: Events emitted by the order service
defaultContentType: application/json
servers:
production:
host: kafka.example.com:9092
protocol: kafka
description: Production Kafka broker
channels:
OrderCreated:
address: orders.created
description: Published when a new order is placed
messages:
OrderCreatedMessage:
$ref: '#/components/messages/OrderCreated'
OrderShipped:
address: orders.shipped
messages:
OrderShippedMessage:
$ref: '#/components/messages/OrderShipped'
operations:
PublishOrderCreated:
action: send
channel:
$ref: '#/channels/OrderCreated'
description: Order service publishes this when an order is confirmed
SubscribeOrderShipped:
action: receive
channel:
$ref: '#/channels/OrderShipped'
description: Fulfillment service subscribes to ship notifications
components:
messages:
OrderCreated:
payload:
type: object
required: [order_id, customer_id, total_cents, items]
properties:
order_id: { type: string, format: uuid }
customer_id: { type: string, format: uuid }
total_cents: { type: integer, minimum: 0 }
currency: { type: string, default: USD }
items:
type: array
items:
type: object
properties:
product_id: { type: string, format: uuid }
quantity: { type: integer, minimum: 1 }
headers:
type: object
properties:
correlation_id: { type: string, format: uuid }
event_version: { type: string, default: "1.0" }
AsyncAPI vs OpenAPI
| Property | OpenAPI 3.1 | AsyncAPI 3.0 |
|---|---|---|
| Protocol | HTTP/REST | Kafka, WebSocket, MQTT, AMQP, SSE |
| Interaction model | Request / Response | Publish / Subscribe, Send / Receive |
| Schema language | JSON Schema 2020-12 | JSON Schema 2020-12 (same!) |
| Servers | HTTP base URLs | Brokers (Kafka, MQTT broker, etc.) |
| Operations | HTTP methods on paths | send / receive on channels |
| Code generation | OpenAPI Generator | AsyncAPI Generator |
Kafka + AsyncAPI: Node.js Producer/Consumer
const { Kafka } = require('kafkajs');
const Ajv = require('ajv');
const kafka = new Kafka({ clientId: 'order-service', brokers: ['kafka:9092'] });
const producer = kafka.producer();
const ajv = new Ajv();
// Schema from AsyncAPI components.messages.OrderCreated.payload
const orderCreatedSchema = {
type: 'object',
required: ['order_id', 'customer_id', 'total_cents', 'items'],
properties: {
order_id: { type: 'string', format: 'uuid' },
customer_id: { type: 'string', format: 'uuid' },
total_cents: { type: 'integer', minimum: 0 },
items: { type: 'array', items: { type: 'object' } }
}
};
const validateOrderCreated = ajv.compile(orderCreatedSchema);
// Producer: publish validated event
async function publishOrderCreated(orderData) {
if (!validateOrderCreated(orderData)) {
throw new Error('Invalid order event: ' + JSON.stringify(validateOrderCreated.errors));
}
await producer.connect();
await producer.send({
topic: 'orders.created',
messages: [{
key: orderData.order_id,
value: JSON.stringify(orderData),
headers: {
'correlation-id': crypto.randomUUID(),
'event-version': '1.0',
'content-type': 'application/json'
}
}]
});
}
// Consumer: subscribe and process
const consumer = kafka.consumer({ groupId: 'fulfillment-service' });
async function startConsumer() {
await consumer.connect();
await consumer.subscribe({ topic: 'orders.created', fromBeginning: false });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const event = JSON.parse(message.value.toString());
// Idempotency check — Kafka may redeliver messages
const alreadyProcessed = await idempotencyStore.has(event.order_id);
if (alreadyProcessed) return;
await fulfillOrder(event);
await idempotencyStore.add(event.order_id);
}
});
}
WebSocket API Design
const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });
// Connection lifecycle
wss.on('connection', (ws, req) => {
const userId = authenticate(req); // extract from ?token= or cookie
if (!userId) return ws.close(4001, 'Unauthorized');
console.log('Client connected:', userId);
// Heartbeat to detect stale connections
ws.isAlive = true;
ws.on('pong', () => { ws.isAlive = true; });
// Message handler — define a message schema
ws.on('message', (data) => {
let msg;
try { msg = JSON.parse(data); }
catch { return ws.send(JSON.stringify({ error: 'Invalid JSON' })); }
// Dispatch by message type
switch (msg.type) {
case 'subscribe':
subscribeToChannel(ws, msg.channel, userId);
break;
case 'unsubscribe':
unsubscribeFromChannel(ws, msg.channel);
break;
default:
ws.send(JSON.stringify({ error: 'Unknown message type', received: msg.type }));
}
});
ws.on('close', () => cleanup(userId));
});
// Heartbeat interval — close stale connections
setInterval(() => {
wss.clients.forEach(ws => {
if (!ws.isAlive) return ws.terminate();
ws.isAlive = false;
ws.ping();
});
}, 30_000);
Server-Sent Events (SSE) — Simpler One-Way Push
For one-way server-to-client streams (live feeds, progress bars, notifications), SSE is simpler than WebSocket — it uses plain HTTP, works through proxies, and auto-reconnects:
app.get('/events', auth, (req, res) => {
// SSE headers
res.set({
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'X-Accel-Buffering': 'no' // disable Nginx buffering
});
res.flushHeaders();
// Send event
function sendEvent(event, data) {
res.write(`event: ${event}\n`);
res.write(`data: ${JSON.stringify(data)}\n\n`);
}
// Subscribe to changes
const unsubscribe = eventBus.subscribe(req.user.id, (event) => {
sendEvent(event.type, event.data);
});
// Heartbeat (prevent proxy timeouts)
const heartbeat = setInterval(() => res.write(':heartbeat\n\n'), 15_000);
req.on('close', () => {
unsubscribe();
clearInterval(heartbeat);
});
});
// Client (browser):
// const es = new EventSource('/events', { withCredentials: true });
// es.addEventListener('order.shipped', e => console.log(JSON.parse(e.data)));
AsyncAPI Tooling
| Tool | Purpose |
|---|---|
| AsyncAPI Studio | Visual editor for AsyncAPI documents |
| @asyncapi/generator | Code generation: Node.js, Python, Java, docs |
| @asyncapi/parser | Parse and validate AsyncAPI documents programmatically |
| Microcks | Mock event-driven APIs for testing |