WebSocket Transport

The WebSocket transport provides full-duplex communication with JSON-RPC servers, including support for subscriptions.

Installation

npm i @rpckit/websocket

Basic Usage

import { webSocket } from '@rpckit/websocket'
 
const transport = webSocket('wss://example.com:50004')
 
await transport.connect()
const result = await transport.request('server.version', 'client', '1.4')
await transport.close()

Configuration

URL String

const transport = webSocket('wss://example.com:50004')

Configuration Object

const transport = webSocket({
  url: 'wss://example.com:50004',
  timeout: 10000,
  batch: { wait: 10, batchSize: 50 },
  keepAlive: { interval: 30000, method: 'server.ping' },
  reconnect: { delay: 1000, attempts: 5 },
  headers: { 'Authorization': 'Bearer token' }
})

Options

Option          Type                     Default             Description
url             string                   -                   WebSocket URL (ws:// or wss://)
timeout         number                   30000               Request timeout in milliseconds
connectTimeout  number                   -                   Connection timeout in milliseconds
batch           BatchConfig | false      { batchSize: 100 }  Batching configuration
keepAlive       KeepAliveConfig          -                   Keep-alive ping configuration
reconnect       { delay, attempts }      -                   Auto-reconnect after disconnect
headers         Record<string, string>   -                   Headers for WebSocket handshake
handshake       HandshakeConfig          -                   Custom handshake after connection
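
To make the two accepted forms and the listed defaults concrete, here is a minimal sketch of how a URL string or a configuration object could be normalized. The interface shapes and the normalizeOptions helper are hypothetical. They are inferred from the examples on this page and are not the library's exported types or internals.

```typescript
// Hypothetical shapes inferred from the examples on this page;
// the types actually exported by the package may differ.
interface BatchConfig { wait?: number; batchSize?: number }
interface KeepAliveConfig { interval: number; method: string; params?: unknown[] }
interface HandshakeConfig { method: string; params?: unknown[] }

interface WebSocketOptions {
  url: string
  timeout?: number
  connectTimeout?: number
  batch?: BatchConfig | false
  keepAlive?: KeepAliveConfig
  reconnect?: { delay: number; attempts: number }
  headers?: Record<string, string>
  handshake?: HandshakeConfig
}

// Accepts either form (URL string or object) and fills in the two
// defaults listed in the table: timeout 30000 and batchSize 100.
function normalizeOptions(input: string | WebSocketOptions): WebSocketOptions {
  const options: WebSocketOptions = typeof input === 'string' ? { url: input } : input
  if (!/^wss?:\/\//.test(options.url)) {
    throw new Error(`Expected a ws:// or wss:// URL, got: ${options.url}`)
  }
  return {
    ...options,
    timeout: options.timeout ?? 30000,
    // `batch: false` is passed through unchanged
    batch: options.batch === false ? false : { batchSize: 100, ...options.batch },
  }
}
```

Options without a listed default (connectTimeout, keepAlive, reconnect, headers, handshake) are passed through untouched in this sketch.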

Keep-Alive

Send periodic pings to keep the connection alive:

const transport = webSocket('wss://example.com', {
  keepAlive: {
    interval: 30000,        // Ping every 30 seconds
    method: 'server.ping',  // RPC method to call
    params: []              // Optional params
  }
})
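
Conceptually, a keep-alive of this kind is a timer that issues the configured RPC call at a fixed interval. The sketch below illustrates that idea only. startKeepAlive and the RpcRequest type are hypothetical names, and the transport's real handling of failed pings is not documented here.

```typescript
// Hypothetical request signature, modeled on transport.request(method, ...params).
type RpcRequest = (method: string, ...params: unknown[]) => Promise<unknown>

interface KeepAliveConfig { interval: number; method: string; params?: unknown[] }

// Starts a periodic ping and returns a function that stops it.
function startKeepAlive(request: RpcRequest, config: KeepAliveConfig): () => void {
  const timer = setInterval(() => {
    // Assumption: ping failures are ignored in this sketch; a real transport
    // would more likely treat them as a sign of a dead connection.
    request(config.method, ...(config.params ?? [])).catch(() => {})
  }, config.interval)
  return () => clearInterval(timer)
}
```

Calling the returned function clears the timer, which is what a transport would need to do on close().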

Handshake

Execute a custom handshake after connection:

const transport = webSocket('wss://example.com', {
  handshake: {
    method: 'server.version',
    params: ['my-client', '1.4']
  }
})

Unsubscribe Callback

Configure how subscriptions are cleaned up on the server:

const transport = webSocket('wss://example.com', {
  onUnsubscribe: ({ request, method, params }) => {
    // Derive unsubscribe method from subscribe method
    return request(method.replace('subscribe', 'unsubscribe'), ...params)
  }
})

Without onUnsubscribe, calling the function returned by subscribe() only removes the local listener and the server is not told. With it, the transport also notifies the server.
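
The name derivation used in the callback above can be checked in isolation. The following sketch uses a hypothetical fakeRequest that records calls instead of talking to a server. Note that String.prototype.replace with a string pattern only replaces the first occurrence, and that a method name without the substring "subscribe" is returned unchanged, so such methods need an explicit mapping.

```typescript
type RpcRequest = (method: string, ...params: unknown[]) => Promise<unknown>

// Same derivation as in the onUnsubscribe example, isolated for clarity.
function deriveUnsubscribeMethod(subscribeMethod: string): string {
  return subscribeMethod.replace('subscribe', 'unsubscribe')
}

// Hypothetical stand-in for the transport's request function: it only records calls.
const sent: Array<{ method: string; params: unknown[] }> = []
const fakeRequest: RpcRequest = async (method, ...params) => {
  sent.push({ method, params })
  return true
}

const onUnsubscribe = (ctx: { request: RpcRequest; method: string; params: unknown[] }) =>
  ctx.request(deriveUnsubscribeMethod(ctx.method), ...ctx.params)

// Records a call to 'blockchain.address.unsubscribe' with the same params.
void onUnsubscribe({ request: fakeRequest, method: 'blockchain.address.subscribe', params: ['addr'] })
```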

Electrum Cash Variant

For Electrum Cash servers, use the electrum-cash subpath which pre-configures protocol-specific defaults:

import { webSocket } from '@rpckit/websocket/electrum-cash'
 
const transport = webSocket('wss://electrum.example.com:50004', {
  keepAlive: 30000,        // Uses server.ping automatically
  clientName: 'myapp',     // Client name in handshake (default: 'rpckit')
  protocolVersion: '1.6',  // Default
})
 
// server.version handshake is sent automatically
// onUnsubscribe derives method from subscribe method

Ethereum Variant

For Ethereum JSON-RPC nodes, use the ethereum subpath which handles eth_subscription notification routing:

import { webSocket } from '@rpckit/websocket/ethereum'
 
const transport = webSocket('wss://ethereum-rpc.publicnode.com')
 
// Standard requests
const blockNumber = await transport.request('eth_blockNumber')
 
// Subscriptions - notifications routed by subscription ID automatically
const unsub = await transport.subscribe('eth_subscribe', 'newHeads', (header) => {
  console.log('New block:', header.number)
})
 
// eth_unsubscribe called automatically
await unsub()

The Ethereum variant automatically:

  • Routes eth_subscription notifications to the correct callback by subscription ID
  • Calls eth_unsubscribe on cleanup
  • Suppresses subscription IDs from callbacks (handled internally)
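
The routing described above can be pictured as a lookup from subscription ID to callback. The sketch below is an illustration of that idea, not the variant's actual implementation. SubscriptionRouter and isNotification are hypothetical names. The notification shape (method eth_subscription, with params.subscription and params.result) is the one Ethereum nodes send for eth_subscribe.

```typescript
// Shape of a pub/sub notification sent by Ethereum JSON-RPC nodes.
interface EthSubscriptionNotification {
  jsonrpc: '2.0'
  method: 'eth_subscription'
  params: { subscription: string; result: unknown }
}

type Listener = (result: unknown) => void

function isNotification(message: unknown): message is EthSubscriptionNotification {
  if (typeof message !== 'object' || message === null) return false
  const m = message as { method?: unknown; params?: { subscription?: unknown } }
  return m.method === 'eth_subscription' && typeof m.params?.subscription === 'string'
}

// Hypothetical router: maps the ID returned by eth_subscribe to a callback.
class SubscriptionRouter {
  private listeners = new Map<string, Listener>()

  register(subscriptionId: string, listener: Listener): void {
    this.listeners.set(subscriptionId, listener)
  }

  unregister(subscriptionId: string): void {
    this.listeners.delete(subscriptionId)
  }

  // Returns true when the message was a notification for a known ID.
  // The callback receives only `result`; the subscription ID stays internal.
  route(message: unknown): boolean {
    if (!isNotification(message)) return false
    const listener = this.listeners.get(message.params.subscription)
    if (!listener) return false
    listener(message.params.result)
    return true
  }
}
```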

Subscriptions

Subscribe to server notifications:

const unsubscribe = await transport.subscribe(
  'events.subscribe',
  'channel-1',
  (data) => {
    console.log('Event:', data)
  }
)
 
// Later, unsubscribe
await unsubscribe()

Subscription Behavior

  1. The subscribe method sends the subscription request
  2. The initial response is delivered to the callback
  3. Subsequent notifications for the same subscription are delivered to the callback
  4. Calling unsubscribe() invokes the onUnsubscribe callback if configured

Subscription Sharing

Callers that subscribe with the same method and params share a single server subscription. A subscriber that joins later receives the most recent notification data rather than the stale initial response. The server unsubscribe is sent only when the last listener unsubscribes.

// Both callbacks share one server subscription
const unsub1 = await transport.subscribe('events', callback1)
const unsub2 = await transport.subscribe('events', callback2)
 
await unsub1() // callback1 removed, server subscription stays active
await unsub2() // callback2 removed, NOW server unsubscribe is sent
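
The sharing rules can be modeled with a small in-memory registry. This is a simplified, synchronous sketch under stated assumptions, not the transport's code. SubscriptionRegistry is a hypothetical name, subscriptions are keyed by JSON.stringify of method and params, and the server calls are injected as plain functions.

```typescript
type Listener = (data: unknown) => void
type Send = (method: string, params: unknown[]) => void

interface SharedSubscription {
  listeners: Set<Listener>
  hasData: boolean
  lastData?: unknown
}

class SubscriptionRegistry {
  private shared = new Map<string, SharedSubscription>()
  private readonly sendSubscribe: Send
  private readonly sendUnsubscribe: Send

  constructor(sendSubscribe: Send, sendUnsubscribe: Send) {
    this.sendSubscribe = sendSubscribe
    this.sendUnsubscribe = sendUnsubscribe
  }

  subscribe(method: string, params: unknown[], listener: Listener): () => void {
    const key = JSON.stringify([method, params])
    let entry = this.shared.get(key)
    if (!entry) {
      // First listener: create the entry and contact the server once.
      entry = { listeners: new Set<Listener>(), hasData: false }
      this.shared.set(key, entry)
      this.sendSubscribe(method, params)
    } else if (entry.hasData) {
      // Late joiner: replay the most recent data instead of re-subscribing.
      listener(entry.lastData)
    }
    entry.listeners.add(listener)
    const joined = entry
    return () => {
      joined.listeners.delete(listener)
      // Only the last listener leaving triggers the server unsubscribe.
      if (joined.listeners.size === 0 && this.shared.get(key) === joined) {
        this.shared.delete(key)
        this.sendUnsubscribe(method, params)
      }
    }
  }

  // Called when data arrives for a subscription (response or notification).
  notify(method: string, params: unknown[], data: unknown): void {
    const entry = this.shared.get(JSON.stringify([method, params]))
    if (!entry) return
    entry.lastData = data
    entry.hasData = true
    entry.listeners.forEach((listener) => listener(data))
  }
}
```

Calling the returned function twice is harmless in this sketch: the second call finds the entry already removed and sends nothing.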

Automatic Resubscription

Subscriptions are automatically restored after reconnection. The transport tracks active subscriptions and re-sends them when the connection is re-established.
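
One way to picture this tracking is a table of active subscriptions that is replayed whenever the socket reopens. The sketch below assumes that design. ResubscribeTracker and its method names are hypothetical and the real transport may track more state (for example listeners and last-seen data).

```typescript
interface ActiveSubscription { method: string; params: unknown[] }

class ResubscribeTracker {
  private active = new Map<string, ActiveSubscription>()

  // Record a subscription once the server has accepted it.
  track(method: string, params: unknown[]): void {
    this.active.set(JSON.stringify([method, params]), { method, params })
  }

  // Forget a subscription after the last listener has unsubscribed.
  untrack(method: string, params: unknown[]): void {
    this.active.delete(JSON.stringify([method, params]))
  }

  // Call from the (hypothetical) reconnect handler: re-sends every active
  // subscription and returns how many were restored.
  restore(send: (method: string, params: unknown[]) => void): number {
    this.active.forEach(({ method, params }) => send(method, params))
    return this.active.size
  }
}
```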

Extended Interface

The WebSocket transport extends the base Transport interface:

interface WebSocketTransport<S extends Schema> extends Transport<S> {
  getSocket(): WebSocket | null
  getSocketAsync(): Promise<WebSocket>
}

getSocket()

Returns the current WebSocket instance, or null if not connected:

const socket = transport.getSocket()
if (socket) {
  console.log('Ready state:', socket.readyState)
}

getSocketAsync()

Waits for connection and returns the WebSocket:

const socket = await transport.getSocketAsync()
console.log('Connected!')

Connection Pooling

Transports with identical configuration share the same WebSocket connection:

const t1 = webSocket('wss://example.com')
const t2 = webSocket('wss://example.com')
 
// t1 and t2 share the same underlying WebSocket
// The connection is reference-counted and closed once every transport sharing it has been closed
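
Reference counting of this kind can be sketched with a map keyed by configuration. The code below is an illustration under assumptions, not the library's pool. acquire, release and configKey are hypothetical names, and "identical configuration" is approximated by comparing top-level options after sorting their keys.

```typescript
interface PooledConnection { url: string; refs: number; closed: boolean }
type PoolConfig = { url: string; [option: string]: unknown }

const pool = new Map<string, PooledConnection>()

// Assumption: two configs are "identical" when their top-level options
// serialize to the same JSON, regardless of property order.
function configKey(config: PoolConfig): string {
  return JSON.stringify(Object.keys(config).sort().map((key) => [key, config[key]]))
}

// Returns the shared connection for this config, creating it on first use.
function acquire(config: PoolConfig): PooledConnection {
  const key = configKey(config)
  let connection = pool.get(key)
  if (!connection) {
    connection = { url: config.url, refs: 0, closed: false }
    pool.set(key, connection)
  }
  connection.refs += 1
  return connection
}

// Drops one reference; the connection is closed when none remain.
function release(config: PoolConfig): void {
  const key = configKey(config)
  const connection = pool.get(key)
  if (!connection) return
  connection.refs -= 1
  if (connection.refs === 0) {
    connection.closed = true
    pool.delete(key)
  }
}
```

In this model, closing one of two transports created from the same URL leaves the shared connection open, and closing the second one shuts it down.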

Example: Full Application

import { webSocket } from '@rpckit/websocket/electrum-cash'
 
const transport = webSocket({
  url: 'wss://electrum.example.com:50004',
  keepAlive: 30000,
  reconnect: {
    delay: 1000,
    attempts: 10,
  }
})
 
// Connect (handshake sent automatically)
await transport.connect()
 
// Make typed requests
const tip = await transport.request('blockchain.headers.get_tip')
console.log(`Block height: ${tip.height}`)
 
// Placeholder: replace with the address to watch
const address = '<address>'
 
// Subscribe to address updates (unsubscribe method derived automatically)
const unsubscribe = await transport.subscribe(
  'blockchain.address.subscribe',
  address,
  (status) => {
    console.log('Address status changed:', status)
  }
)
 
// Keep running until shutdown
process.on('SIGINT', async () => {
  await unsubscribe()
  await transport.close()
})