Skip to content
Logo

BatchScheduler

The BatchScheduler class collects individual JSON-RPC requests and sends them as batches. It's used internally by transports but can be used directly for custom batching logic.

Installation

npm i @rpckit/core

Basic Usage

import { BatchScheduler } from '@rpckit/core'
 
const scheduler = new BatchScheduler(
  { batchSize: 100, wait: 10 },
  async (requests) => {
    // Send batch to server and return responses
    const response = await fetch('https://example.com/rpc', {
      method: 'POST',
      body: JSON.stringify(requests),
      headers: { 'Content-Type': 'application/json' }
    })
    return response.json()
  }
)
 
// Enqueue requests - they're batched automatically
const result1 = scheduler.enqueue({ jsonrpc: '2.0', id: 1, method: 'method1', params: [] })
const result2 = scheduler.enqueue({ jsonrpc: '2.0', id: 2, method: 'method2', params: [] })
 
// Both requests are sent together
const [r1, r2] = await Promise.all([result1, result2])

Configuration

Options

OptionTypeDefaultDescription
batchSizenumber100Maximum requests per batch. When reached, batch is sent immediately.
waitnumber0Maximum time (ms) to wait before sending a batch.
rawbooleanfalseReturn full RPC response objects instead of just results.
disabledCooldownnumber5000Cooldown in ms before re-enabling batching after a server rejection. Set to 0 to disable auto-recovery.
sendSinglefunction-Callback to send a single request individually. Enables auto-disable on batch rejection.
isBatchRejectionfunction-Custom predicate to detect batch rejection errors.

Batch Size

The batchSize option controls how many requests are collected before sending:

const scheduler = new BatchScheduler(
  { batchSize: 10 },
  sendBatch
)
 
// When 10 requests are enqueued, they're sent immediately
for (let i = 0; i < 10; i++) {
  scheduler.enqueue({ jsonrpc: '2.0', id: i, method: 'ping', params: [] })
}
// ^ Batch sent after 10th request

Wait Time

The wait option sets a timer after the first request is enqueued:

const scheduler = new BatchScheduler(
  { batchSize: 100, wait: 50 },
  sendBatch
)
 
scheduler.enqueue(request1)  // Timer starts (50ms)
scheduler.enqueue(request2)  // Added to batch
// ... 50ms later, batch is sent even if batchSize not reached

Raw Mode

By default, enqueue() resolves with response.result. In raw mode, it resolves with the full response:

const scheduler = new BatchScheduler(
  { raw: true },
  sendBatch
)
 
const response = await scheduler.enqueue(request)
// response: { jsonrpc: '2.0', id: 1, result: 'value' }
// or: { jsonrpc: '2.0', id: 1, error: { code: -32601, message: '...' } }

Methods

enqueue(request)

Adds a request to the batch queue. Returns a promise that resolves when the batch is sent and the response is received.

const promise = scheduler.enqueue({
  jsonrpc: '2.0',
  id: 1,
  method: 'getBalance',
  params: ['0x...']
})
 
const result = await promise

flush()

Immediately sends any pending requests:

scheduler.enqueue(request1)
scheduler.enqueue(request2)
 
// Don't wait for timer or batchSize - send now
await scheduler.flush()

Auto-Disable on Batch Rejection

When a server can't handle batch requests (e.g. the batch is too large, or the server doesn't support batching), the scheduler can automatically fall back to sending requests individually.

This behavior is enabled when a sendSingle callback is provided. The built-in WebSocket and TCP transports provide this automatically.

How It Works

  1. A batch is sent to the server
  2. The server rejects the batch (timeout, parse error, or invalid request)
  3. The scheduler detects the rejection and disables batching
  4. Failed requests are retried individually via sendSingle
  5. Subsequent requests bypass the batch queue and are sent individually
  6. After disabledCooldown ms (default: 5 seconds), batching is re-enabled

Detection

By default, the following errors trigger auto-disable:

  • Batch timeout — the server couldn't process the batch in time
  • Parse error (JSON-RPC code -32700) — the server couldn't parse the batch array
  • Invalid request (JSON-RPC code -32600) — the server rejected the batch format

You can provide a custom isBatchRejection predicate for other error patterns:

const scheduler = new BatchScheduler(
  {
    batchSize: 100,
    sendSingle: (req) => sendIndividualRequest(req),
    isBatchRejection: (error) => {
      // Custom detection logic
      return error instanceof Error && error.message.includes('rate limit')
    }
  },
  sendBatch
)

Checking Status

The disabled property indicates whether batching is currently disabled:

if (scheduler.disabled) {
  console.log('Batching is temporarily disabled')
}

Transport Integration

The WebSocket and TCP transports enable auto-disable by default. Configure the cooldown via the batch option:

import { webSocket } from '@rpckit/websocket/electrum-cash'
 
const transport = webSocket('wss://electrum.example.com', {
  batch: {
    batchSize: 100,
    wait: 10,
    disabledCooldown: 10_000   // Re-enable after 10 seconds
  }
})
 
// If the server rejects a batch, the transport transparently
// falls back to individual requests and recovers automatically

Error Handling

RPC Errors

By default, RPC errors are thrown:

try {
  const result = await scheduler.enqueue({
    jsonrpc: '2.0',
    id: 1,
    method: 'unknownMethod',
    params: []
  })
} catch (error) {
  // error: { code: -32601, message: 'Method not found' }
}

With raw: true, errors are returned instead of thrown:

const scheduler = new BatchScheduler({ raw: true }, sendBatch)
 
const response = await scheduler.enqueue(request)
if (response.error) {
  console.log('RPC error:', response.error)
} else {
  console.log('Result:', response.result)
}

Missing Responses

If the server doesn't return a response for a request, an error is thrown:

const result = await scheduler.enqueue(request)
// Error: No response for request id 1, try reducing batch size

Network Errors

Network errors reject all pending requests in the batch:

const results = await Promise.allSettled([
  scheduler.enqueue(request1),
  scheduler.enqueue(request2)
])
// Both rejected with the same network error

Example: Custom Transport

import { BatchScheduler } from '@rpckit/core'
import type { RpcRequest, RpcResponse } from '@rpckit/core'
 
function createBatchingTransport(url: string) {
  const scheduler = new BatchScheduler(
    { batchSize: 50, wait: 10 },
    async (requests: RpcRequest[]): Promise<RpcResponse[]> => {
      const response = await fetch(url, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(requests)
      })
 
      if (!response.ok) {
        throw new Error(`HTTP ${response.status}`)
      }
 
      return response.json()
    }
  )
 
  let requestId = 0
 
  return {
    async request(method: string, params: unknown[] = []) {
      return scheduler.enqueue({
        jsonrpc: '2.0',
        id: ++requestId,
        method,
        params
      })
    },
 
    async flush() {
      await scheduler.flush()
    }
  }
}
 
// Usage
const transport = createBatchingTransport('https://api.example.com/rpc')
 
const [a, b, c] = await Promise.all([
  transport.request('getA'),
  transport.request('getB'),
  transport.request('getC')
])
// All three sent in single HTTP request

How Batching Works

  1. First enqueue() call starts a timer (if wait > 0)
  2. Subsequent calls add to the queue
  3. Batch is sent when either:
    • Queue reaches batchSize
    • Timer expires (wait ms elapsed)
    • flush() is called manually
  4. Responses are matched to requests by id
  5. Each enqueued promise resolves/rejects with its response