Cluster Transport
The Cluster transport sends requests to multiple transports in parallel and requires M-of-N responses to agree (quorum) before returning.
Installation
npm i @rpckit/cluster
You'll also need at least one base transport:
npm i @rpckit/websocket
Basic Usage
import { webSocket } from '@rpckit/websocket'
import { cluster } from '@rpckit/cluster'
const transport = cluster([
  webSocket('wss://node1.example.com'),
  webSocket('wss://node2.example.com'),
  webSocket('wss://node3.example.com')
], { quorum: 2 })
// Waits for 2-of-3 nodes to return the same result
const result = await transport.request('blockchain.transaction.get', txid)
Configuration
Options
const transport = cluster(transports, {
  quorum: 2, // Required number of matching responses
  timeout: 15000 // Max time to wait for quorum
})
| Option | Type | Default | Description |
|---|---|---|---|
| quorum | number | required | Minimum matching responses needed |
| timeout | number | 10000 | Timeout in milliseconds |
Quorum Rules
- quorum must be at least 1
- quorum must not exceed the number of transports
- Responses are compared using deep equality
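The first two rules can be checked before any request is sent. A minimal sketch, assuming a hypothetical helper named assertValidQuorum that is not part of @rpckit/cluster; requiring an integer is an extra assumption, and the error the library itself raises for an invalid quorum is not specified here:

```typescript
// Hypothetical helper (not a @rpckit/cluster export): mirrors the documented quorum rules.
function assertValidQuorum(quorum: number, transportCount: number): void {
  // Rule 1: quorum must be at least 1 (the integer check is an added assumption).
  if (!Number.isInteger(quorum) || quorum < 1) {
    throw new RangeError(`quorum must be an integer of at least 1, got ${quorum}`)
  }
  // Rule 2: quorum must not exceed the number of transports.
  if (quorum > transportCount) {
    throw new RangeError(
      `quorum (${quorum}) must not exceed the number of transports (${transportCount})`
    )
  }
}

// 2-of-3 is valid; 4-of-3 could never be satisfied and is rejected.
assertValidQuorum(2, 3)
```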
How It Works
- Request is sent to all transports in parallel
- As responses arrive, they're grouped by value (deep equality)
- When any group reaches quorum size, that value is returned
- If timeout occurs before quorum, an error is thrown
Request: getBalance(addr)
Node1 → { confirmed: 100 } ─┐
Node2 → { confirmed: 100 } ─┼─→ Quorum reached (2 matching) → Return { confirmed: 100 }
Node3 → { confirmed: 100 } ─┘ (Node3 response ignored, already resolved)
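The steps above can be sketched as follows. This is an illustration only, not the library's implementation: deepEqual, createTally and firstQuorum are hypothetical names, responses are assumed to be JSON-like values, and treating a failed transport as contributing no vote is an assumption.

```typescript
type Json = null | boolean | number | string | Json[] | { [key: string]: Json }

// Structural comparison for JSON-like values; object key order is ignored.
function deepEqual(a: Json, b: Json): boolean {
  if (a === b) return true
  if (a === null || b === null || typeof a !== 'object' || typeof b !== 'object') return false
  if (Array.isArray(a) !== Array.isArray(b)) return false
  // Arrays are compared through their index keys, objects through their own keys.
  const objA = a as { [key: string]: Json }
  const objB = b as { [key: string]: Json }
  const keysA = Object.keys(objA)
  if (keysA.length !== Object.keys(objB).length) return false
  for (const key of keysA) {
    if (!Object.prototype.hasOwnProperty.call(objB, key)) return false
    if (!deepEqual(objA[key], objB[key])) return false
  }
  return true
}

// Groups responses by value; the returned function yields the value of the
// first group whose size reaches `quorum`, and undefined until then.
function createTally<T extends Json>(quorum: number): (value: T) => T | undefined {
  const groups: { value: T; count: number }[] = []
  return (value) => {
    let group = groups.find((g) => deepEqual(g.value, value))
    if (!group) {
      group = { value, count: 0 }
      groups.push(group)
    }
    group.count += 1
    return group.count >= quorum ? group.value : undefined
  }
}

// Resolves with the first value that reaches quorum, or rejects on timeout.
function firstQuorum<T extends Json>(
  responses: Promise<T>[],
  quorum: number,
  timeoutMs: number
): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const add = createTally<T>(quorum)
    const timer = setTimeout(
      () => reject(new Error('quorum not reached before timeout')),
      timeoutMs
    )
    for (const response of responses) {
      response.then(
        (value) => {
          const winner = add(value)
          if (winner !== undefined) {
            clearTimeout(timer)
            resolve(winner) // later responses are ignored once resolved
          }
        },
        () => {
          // Assumption: a failed transport simply contributes no vote.
        }
      )
    }
  })
}
```

Comparing each response against existing groups is linear in the number of distinct values, which is fine for a handful of transports.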
Observability
onResponse
Monitor individual transport responses:
transport.onResponse((info) => {
  console.log(`${info.transport}: ${info.status}`)
  if (info.status === 'success') {
    console.log('Result:', info.response)
  } else {
    console.log('Error:', info.error)
  }
})
Extended Interface
interface ClusterTransport<S extends Schema> extends Transport<S> {
  transports: Transport<S>[]
  onResponse(callback: (info: TransportResponse) => void): Unsubscribe
}
Single Transport Optimization
When given a single transport, cluster() returns it unwrapped:
const single = cluster([webSocket('wss://example.com')], { quorum: 1 })
// single is the WebSocketTransport, not wrapped in ClusterTransport
Example: Byzantine Fault Tolerance
For a system that can tolerate f faulty nodes, use 3f + 1 nodes with quorum 2f + 1:
import { webSocket } from '@rpckit/websocket'
import { cluster } from '@rpckit/cluster'
// Tolerate 1 faulty node: need 4 nodes, quorum of 3
const transport = cluster([
  webSocket('wss://node1.example.com'),
  webSocket('wss://node2.example.com'),
  webSocket('wss://node3.example.com'),
  webSocket('wss://node4.example.com')
], { quorum: 3 })
// Even if one node returns incorrect data, the correct result wins
const balance = await transport.request('getBalance', address)
Example: Cross-Validation
Validate responses across different server implementations:
import { webSocket } from '@rpckit/websocket'
import { http } from '@rpckit/http'
import { cluster } from '@rpckit/cluster'
const transport = cluster([
  webSocket('wss://electrum-cash-server.com'),
  http('https://fulcrum-server.com/rpc'),
  webSocket('wss://other-server.com')
], { quorum: 2 })
// Request is validated across different implementations
const tx = await transport.request('blockchain.transaction.get', txid)
Subscriptions
Cluster transport supports subscriptions, but note that:
- Subscription is established on all transports
- Notifications from any transport trigger the callback
- There's no quorum check for notifications (first notification wins)
const unsubscribe = await transport.subscribe(
  'blockchain.headers.subscribe',
  (header) => {
    // Called when ANY node sends a notification
    console.log('New block:', header.height)
  }
)
For strict validation of subscription data, implement your own aggregation logic.
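One possible shape for such aggregation logic is sketched below. It forwards a notification only after a given number of distinct sources have reported an equal payload. The names canonicalKey and createQuorumNotifier are hypothetical, payloads are assumed to be JSON-serializable, and the wiring shown in the trailing comment assumes each entry of transport.transports accepts the same subscribe call as the cluster transport.

```typescript
// Serializes a payload with object keys sorted, so equal payloads share one key
// regardless of the order in which a node emitted the fields.
function canonicalKey(value: unknown): string {
  return JSON.stringify(value, (_key, v) => {
    if (v === null || typeof v !== 'object' || Array.isArray(v)) return v
    const sorted: Record<string, unknown> = {}
    for (const k of Object.keys(v).sort()) {
      sorted[k] = (v as Record<string, unknown>)[k]
    }
    return sorted
  })
}

// Hypothetical helper: calls onAgreed once per payload, after `quorum`
// distinct sources have reported it.
function createQuorumNotifier<T>(
  quorum: number,
  onAgreed: (payload: T) => void
): (sourceId: number, payload: T) => void {
  const votes = new Map<string, Set<number>>() // payload key -> reporting sources
  const delivered = new Set<string>() // grows without bound; prune in real use
  return (sourceId, payload) => {
    const key = canonicalKey(payload)
    if (delivered.has(key)) return // late duplicates are dropped
    let sources = votes.get(key)
    if (!sources) {
      sources = new Set<number>()
      votes.set(key, sources)
    }
    sources.add(sourceId) // a source repeating itself still counts once
    if (sources.size >= quorum) {
      delivered.add(key)
      votes.delete(key)
      onAgreed(payload)
    }
  }
}

// Assumed wiring (not executed here): subscribe per underlying transport so
// that the source of each notification is known.
// const notify = createQuorumNotifier(2, (header) => console.log('Agreed:', header))
// transport.transports.forEach((t, i) =>
//   t.subscribe('blockchain.headers.subscribe', (header) => notify(i, header)))
```

Keying on a canonical string keeps each vote a constant-time map lookup; the trade-off is that payloads must survive JSON serialization unchanged.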