WebSocket Transport
The WebSocket transport provides full-duplex communication with JSON-RPC servers, including support for subscriptions.
Installation
npm i @rpckit/websocketBasic Usage
import { webSocket } from '@rpckit/websocket'
const transport = webSocket('wss://example.com:50004')
await transport.connect()
const result = await transport.request('server.version', 'client', '1.4')
await transport.close()Configuration
URL String
const transport = webSocket('wss://example.com:50004')Configuration Object
const transport = webSocket({
url: 'wss://example.com:50004',
timeout: 10000,
batch: { wait: 10, batchSize: 50 },
keepAlive: { interval: 30000, method: 'server.ping' },
reconnect: { delay: 1000, attempts: 5 },
headers: { 'Authorization': 'Bearer token' }
})Options
| Option | Type | Default | Description |
|---|---|---|---|
url | string | - | WebSocket URL (ws:// or wss://) |
timeout | number | 30000 | Request timeout in milliseconds |
connectTimeout | number | - | Connection timeout in milliseconds |
batch | BatchConfig | false | { batchSize: 100 } | Batching configuration |
keepAlive | KeepAliveConfig | - | Keep-alive ping configuration |
reconnect | { delay, attempts } | - | Auto-reconnect after disconnect |
headers | Record<string, string> | - | Headers for WebSocket handshake |
handshake | HandshakeConfig | - | Custom handshake after connection |
Keep-Alive
Send periodic pings to keep the connection alive:
const transport = webSocket('wss://example.com', {
keepAlive: {
interval: 30000, // Ping every 30 seconds
method: 'server.ping', // RPC method to call
params: [] // Optional params
}
})Handshake
Execute a custom handshake after connection:
const transport = webSocket('wss://example.com', {
handshake: {
method: 'server.version',
params: ['my-client', '1.4']
}
})Unsubscribe Callback
Configure how subscriptions are cleaned up on the server:
const transport = webSocket('wss://example.com', {
onUnsubscribe: ({ request, method, params }) => {
// Derive unsubscribe method from subscribe method
return request(method.replace('subscribe', 'unsubscribe'), ...params)
}
})Without onUnsubscribe, calling unsub() only removes the local listener. With it, the transport also notifies the server.
Electrum Cash Variant
For Electrum Cash servers, use the electrum-cash subpath which pre-configures protocol-specific defaults:
import { webSocket } from '@rpckit/websocket/electrum-cash'
const transport = webSocket('wss://electrum.example.com:50004', {
keepAlive: 30000, // Uses server.ping automatically
clientName: 'myapp', // Client name in handshake (default: 'rpckit')
protocolVersion: '1.6', // Default
})
// server.version handshake is sent automatically
// onUnsubscribe derives method from subscribe methodEthereum Variant
For Ethereum JSON-RPC nodes, use the ethereum subpath which handles eth_subscription notification routing:
import { webSocket } from '@rpckit/websocket/ethereum'
const transport = webSocket('wss://ethereum-rpc.publicnode.com')
// Standard requests
const blockNumber = await transport.request('eth_blockNumber')
// Subscriptions - notifications routed by subscription ID automatically
const unsub = await transport.subscribe('eth_subscribe', 'newHeads', (header) => {
console.log('New block:', header.number)
})
// eth_unsubscribe called automatically
await unsub()The Ethereum variant automatically:
- Routes
eth_subscriptionnotifications to the correct callback by subscription ID - Calls
eth_unsubscribeon cleanup - Suppresses subscription IDs from callbacks (handled internally)
Subscriptions
Subscribe to server notifications:
const unsubscribe = await transport.subscribe(
'events.subscribe',
'channel-1',
(data) => {
console.log('Event:', data)
}
)
// Later, unsubscribe
await unsubscribe()Subscription Behavior
- The subscribe method sends the subscription request
- The initial response is delivered to the callback
- Subsequent notifications for the same subscription are delivered to the callback
- Calling
unsubscribe()invokes theonUnsubscribecallback if configured
Subscription Sharing
Multiple callers subscribing to the same method+params share a single server subscription. New subscribers receive the most recent notification data (not stale initial data). The server unsubscribe is only sent when the last listener unsubscribes.
// Both callbacks share one server subscription
const unsub1 = await transport.subscribe('events', callback1)
const unsub2 = await transport.subscribe('events', callback2)
await unsub1() // callback1 removed, server subscription stays active
await unsub2() // callback2 removed, NOW server unsubscribe is sentAutomatic Resubscription
Subscriptions are automatically restored after reconnection. The transport tracks active subscriptions and re-sends them when the connection is re-established.
Extended Interface
The WebSocket transport extends the base Transport interface:
interface WebSocketTransport<S extends Schema> extends Transport<S> {
getSocket(): WebSocket | null
getSocketAsync(): Promise<WebSocket>
}getSocket()
Returns the current WebSocket instance, or null if not connected:
const socket = transport.getSocket()
if (socket) {
console.log('Ready state:', socket.readyState)
}getSocketAsync()
Waits for connection and returns the WebSocket:
const socket = await transport.getSocketAsync()
console.log('Connected!')Connection Pooling
Transports with identical configuration share the same WebSocket connection:
const t1 = webSocket('wss://example.com')
const t2 = webSocket('wss://example.com')
// t1 and t2 share the same underlying WebSocket
// The connection is reference-counted and closed when all references are closedExample: Full Application
import { webSocket } from '@rpckit/websocket/electrum-cash'
const transport = webSocket({
url: 'wss://electrum.example.com:50004',
keepAlive: 30000,
reconnect: {
delay: 1000,
attempts: 10,
}
})
// Connect (handshake sent automatically)
await transport.connect()
// Make typed requests
const tip = await transport.request('blockchain.headers.get_tip')
console.log(`Block height: ${tip.height}`)
// Subscribe to address updates (unsubscribe method derived automatically)
const unsubscribe = await transport.subscribe(
'blockchain.address.subscribe',
address,
(status) => {
console.log('Address status changed:', status)
}
)
// Keep running until shutdown
process.on('SIGINT', async () => {
await unsubscribe()
await transport.close()
})