Skip to content
Logo

Cluster Transport

The Cluster transport sends requests to multiple transports in parallel and requires M-of-N responses to agree (quorum) before returning.

Installation

npm i @rpckit/cluster

You'll also need at least one base transport:

npm i @rpckit/websocket

Basic Usage

import { webSocket } from '@rpckit/websocket'
import { cluster } from '@rpckit/cluster'
 
const transport = cluster([
  webSocket('wss://node1.example.com'),
  webSocket('wss://node2.example.com'),
  webSocket('wss://node3.example.com')
], { quorum: 2 })
 
// Waits for 2-of-3 nodes to return the same result
const result = await transport.request('blockchain.transaction.get', txid)

Configuration

Options

const transport = cluster(transports, {
  quorum: 2,       // Required number of matching responses
  timeout: 15000   // Max time to wait for quorum
})
OptionTypeDefaultDescription
quorumnumberrequiredMinimum matching responses needed
timeoutnumber10000Timeout in milliseconds

Quorum Rules

  • quorum must be at least 1
  • quorum must not exceed the number of transports
  • Responses are compared using deep equality

How It Works

  1. Request is sent to all transports in parallel
  2. As responses arrive, they're grouped by value (deep equality)
  3. When any group reaches quorum size, that value is returned
  4. If timeout occurs before quorum, an error is thrown
Request: getBalance(addr)

Node1 → { confirmed: 100 }  ─┐
Node2 → { confirmed: 100 }  ─┼─→ Quorum reached (2 matching) → Return { confirmed: 100 }
Node3 → { confirmed: 100 }  ─┘   (Node3 response ignored, already resolved)

Observability

onResponse

Monitor individual transport responses:

transport.onResponse((info) => {
  console.log(`${info.transport}: ${info.status}`)
  if (info.status === 'success') {
    console.log('Result:', info.response)
  } else {
    console.log('Error:', info.error)
  }
})

Extended Interface

interface ClusterTransport<S extends Schema> extends Transport<S> {
  transports: Transport<S>[]
  onResponse(callback: (info: TransportResponse) => void): Unsubscribe
}

Single Transport Optimization

When given a single transport, cluster() returns it unwrapped:

const single = cluster([webSocket('wss://example.com')], { quorum: 1 })
// single is the WebSocketTransport, not wrapped in ClusterTransport

Example: Byzantine Fault Tolerance

For a system that can tolerate f faulty nodes, use 3f + 1 nodes with quorum 2f + 1:

import { webSocket } from '@rpckit/websocket'
import { cluster } from '@rpckit/cluster'
 
// Tolerate 1 faulty node: need 4 nodes, quorum of 3
const transport = cluster([
  webSocket('wss://node1.example.com'),
  webSocket('wss://node2.example.com'),
  webSocket('wss://node3.example.com'),
  webSocket('wss://node4.example.com')
], { quorum: 3 })
 
// Even if one node returns incorrect data, the correct result wins
const balance = await transport.request('getBalance', address)

Example: Cross-Validation

Validate responses across different server implementations:

import { webSocket } from '@rpckit/websocket'
import { http } from '@rpckit/http'
import { cluster } from '@rpckit/cluster'
 
const transport = cluster([
  webSocket('wss://electrum-cash-server.com'),
  http('https://fulcrum-server.com/rpc'),
  webSocket('wss://other-server.com')
], { quorum: 2 })
 
// Request is validated across different implementations
const tx = await transport.request('blockchain.transaction.get', txid)

Subscriptions

Cluster transport supports subscriptions, but note that:

  1. Subscription is established on all transports
  2. Notifications from any transport trigger the callback
  3. There's no quorum check for notifications (first notification wins)
const unsubscribe = await transport.subscribe(
  'blockchain.headers.subscribe',
  (header) => {
    // Called when ANY node sends a notification
    console.log('New block:', header.height)
  }
)

For strict validation of subscription data, implement your own aggregation logic.