# BatchScheduler
The `BatchScheduler` class collects individual JSON-RPC requests and sends them as batches. Transports use it internally, but it can also be used directly for custom batching logic.
## Installation

```sh
npm i @rpckit/core
```

## Basic Usage
```ts
import { BatchScheduler } from '@rpckit/core'

const scheduler = new BatchScheduler(
  { batchSize: 100, wait: 10 },
  async (requests) => {
    // Send the batch to the server and return the responses
    const response = await fetch('https://example.com/rpc', {
      method: 'POST',
      body: JSON.stringify(requests),
      headers: { 'Content-Type': 'application/json' }
    })
    return response.json()
  }
)

// Enqueue requests - they're batched automatically
const result1 = scheduler.enqueue({ jsonrpc: '2.0', id: 1, method: 'method1', params: [] })
const result2 = scheduler.enqueue({ jsonrpc: '2.0', id: 2, method: 'method2', params: [] })

// Both requests are sent together
const [r1, r2] = await Promise.all([result1, result2])
```

## Configuration
### Options
| Option | Type | Default | Description |
|---|---|---|---|
| `batchSize` | `number` | `100` | Maximum requests per batch. When reached, the batch is sent immediately. |
| `wait` | `number` | `0` | Maximum time (ms) to wait before sending a batch. |
| `raw` | `boolean` | `false` | Return full RPC response objects instead of just results. |
| `disabledCooldown` | `number` | `5000` | Cooldown in ms before re-enabling batching after a server rejection. Set to `0` to disable auto-recovery. |
| `sendSingle` | `function` | - | Callback to send a single request individually. Enables auto-disable on batch rejection. |
| `isBatchRejection` | `function` | - | Custom predicate to detect batch rejection errors. |
### Batch Size
The `batchSize` option controls how many requests are collected before sending:
```ts
const scheduler = new BatchScheduler(
  { batchSize: 10 },
  sendBatch
)

// When 10 requests are enqueued, they're sent immediately
for (let i = 0; i < 10; i++) {
  scheduler.enqueue({ jsonrpc: '2.0', id: i, method: 'ping', params: [] })
}
// ^ Batch sent after 10th request
```

### Wait Time
The `wait` option sets a timer after the first request is enqueued:
```ts
const scheduler = new BatchScheduler(
  { batchSize: 100, wait: 50 },
  sendBatch
)

scheduler.enqueue(request1) // Timer starts (50ms)
scheduler.enqueue(request2) // Added to batch
// ... 50ms later, batch is sent even if batchSize not reached
```

### Raw Mode
By default, `enqueue()` resolves with `response.result`. In raw mode, it resolves with the full response:
```ts
const scheduler = new BatchScheduler(
  { raw: true },
  sendBatch
)

const response = await scheduler.enqueue(request)
// response: { jsonrpc: '2.0', id: 1, result: 'value' }
// or:       { jsonrpc: '2.0', id: 1, error: { code: -32601, message: '...' } }
```

## Methods
### `enqueue(request)`
Adds a request to the batch queue. Returns a promise that resolves when the batch is sent and the response is received.
```ts
const promise = scheduler.enqueue({
  jsonrpc: '2.0',
  id: 1,
  method: 'getBalance',
  params: ['0x...']
})

const result = await promise
```

### `flush()`
Immediately sends any pending requests:
```ts
scheduler.enqueue(request1)
scheduler.enqueue(request2)

// Don't wait for timer or batchSize - send now
await scheduler.flush()
```

## Auto-Disable on Batch Rejection
When a server can't handle batch requests (e.g. the batch is too large, or the server doesn't support batching), the scheduler can automatically fall back to sending requests individually.
This behavior is enabled when a `sendSingle` callback is provided. The built-in WebSocket and TCP transports provide this automatically.
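The fallback can be pictured as follows. This is a simplified, illustrative sketch of the flow, not the library's internals; `sendBatch`, `sendSingle`, and `isBatchRejection` stand in for the configured callbacks, and the cooldown/re-enable bookkeeping is omitted:

```ts
type Req = { jsonrpc: '2.0'; id: number; method: string; params: unknown[] }
type Res = { jsonrpc: '2.0'; id: number; result?: unknown; error?: unknown }

// Try the batch once; if the failure looks like a batch rejection,
// retry each request individually instead of failing the callers.
async function sendWithFallback(
  requests: Req[],
  sendBatch: (reqs: Req[]) => Promise<Res[]>,
  sendSingle: (req: Req) => Promise<Res>,
  isBatchRejection: (err: unknown) => boolean
): Promise<Res[]> {
  try {
    return await sendBatch(requests)
  } catch (err) {
    // Unrelated failures (e.g. network errors) propagate unchanged
    if (!isBatchRejection(err)) throw err
    // Batch rejected: fall back to one request at a time
    return Promise.all(requests.map((req) => sendSingle(req)))
  }
}
```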
### How It Works
1. A batch is sent to the server.
2. The server rejects the batch (timeout, parse error, or invalid request).
3. The scheduler detects the rejection and disables batching.
4. Failed requests are retried individually via `sendSingle`.
5. Subsequent requests bypass the batch queue and are sent individually.
6. After `disabledCooldown` ms (default: 5 seconds), batching is re-enabled.
### Detection
By default, the following errors trigger auto-disable:
- Batch timeout — the server couldn't process the batch in time
- Parse error (JSON-RPC code `-32700`) — the server couldn't parse the batch array
- Invalid request (JSON-RPC code `-32600`) — the server rejected the batch format
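As a rough sketch, the error-code part of that default check could be written as below. This is illustrative only; the actual built-in predicate may differ, and timeout detection is transport-specific, so it is omitted here:

```ts
// Treat the two JSON-RPC error codes named above as batch rejections:
// -32700 (parse error) and -32600 (invalid request).
const BATCH_REJECTION_CODES = new Set([-32700, -32600])

function looksLikeBatchRejection(error: unknown): boolean {
  if (typeof error === 'object' && error !== null && 'code' in error) {
    return BATCH_REJECTION_CODES.has((error as { code: number }).code)
  }
  return false
}
```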
You can provide a custom `isBatchRejection` predicate for other error patterns:
```ts
const scheduler = new BatchScheduler(
  {
    batchSize: 100,
    sendSingle: (req) => sendIndividualRequest(req),
    isBatchRejection: (error) => {
      // Custom detection logic
      return error instanceof Error && error.message.includes('rate limit')
    }
  },
  sendBatch
)
```

### Checking Status
The `disabled` property indicates whether batching is currently disabled:
```ts
if (scheduler.disabled) {
  console.log('Batching is temporarily disabled')
}
```

## Transport Integration
The WebSocket and TCP transports enable auto-disable by default. Configure the cooldown via the `batch` option:
```ts
import { webSocket } from '@rpckit/websocket/electrum-cash'

const transport = webSocket('wss://electrum.example.com', {
  batch: {
    batchSize: 100,
    wait: 10,
    disabledCooldown: 10_000 // Re-enable after 10 seconds
  }
})

// If the server rejects a batch, the transport transparently
// falls back to individual requests and recovers automatically
```

## Error Handling
### RPC Errors
By default, RPC errors are thrown:
```ts
try {
  const result = await scheduler.enqueue({
    jsonrpc: '2.0',
    id: 1,
    method: 'unknownMethod',
    params: []
  })
} catch (error) {
  // error: { code: -32601, message: 'Method not found' }
}
```

With `raw: true`, errors are returned instead of thrown:
```ts
const scheduler = new BatchScheduler({ raw: true }, sendBatch)

const response = await scheduler.enqueue(request)

if (response.error) {
  console.log('RPC error:', response.error)
} else {
  console.log('Result:', response.result)
}
```

### Missing Responses
If the server doesn't return a response for a request, an error is thrown:
```ts
const result = await scheduler.enqueue(request)
// Error: No response for request id 1, try reducing batch size
```

### Network Errors
Network errors reject all pending requests in the batch:
```ts
const results = await Promise.allSettled([
  scheduler.enqueue(request1),
  scheduler.enqueue(request2)
])
// Both rejected with the same network error
```

## Example: Custom Transport
```ts
import { BatchScheduler } from '@rpckit/core'
import type { RpcRequest, RpcResponse } from '@rpckit/core'

function createBatchingTransport(url: string) {
  const scheduler = new BatchScheduler(
    { batchSize: 50, wait: 10 },
    async (requests: RpcRequest[]): Promise<RpcResponse[]> => {
      const response = await fetch(url, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(requests)
      })
      if (!response.ok) {
        throw new Error(`HTTP ${response.status}`)
      }
      return response.json()
    }
  )

  let requestId = 0

  return {
    async request(method: string, params: unknown[] = []) {
      return scheduler.enqueue({
        jsonrpc: '2.0',
        id: ++requestId,
        method,
        params
      })
    },
    async flush() {
      await scheduler.flush()
    }
  }
}

// Usage
const transport = createBatchingTransport('https://api.example.com/rpc')

const [a, b, c] = await Promise.all([
  transport.request('getA'),
  transport.request('getB'),
  transport.request('getC')
])
// All three sent in a single HTTP request
```

## How Batching Works
1. The first `enqueue()` call starts a timer (if `wait > 0`).
2. Subsequent calls add to the queue.
3. The batch is sent when either:
   - the queue reaches `batchSize`,
   - the timer expires (`wait` ms elapsed), or
   - `flush()` is called manually.
4. Responses are matched to requests by `id`.
5. Each enqueued promise resolves/rejects with its response.
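That lifecycle can be sketched as a toy implementation. This is a simplified illustration of the algorithm only, not the real `BatchScheduler` (options such as `raw`, `sendSingle`, and the cooldown logic are left out):

```ts
type Req = { jsonrpc: '2.0'; id: number; method: string; params: unknown[] }
type Res = { jsonrpc: '2.0'; id: number; result?: unknown; error?: { code: number; message: string } }
type Pending = { req: Req; resolve: (v: unknown) => void; reject: (e: unknown) => void }

// Toy model: collect requests, send when batchSize is hit, the wait timer
// fires, or flush() is called; settle each promise by matching response id.
class MiniBatcher {
  private queue: Pending[] = []
  private timer: ReturnType<typeof setTimeout> | null = null

  constructor(
    private opts: { batchSize: number; wait: number },
    private sendBatch: (reqs: Req[]) => Promise<Res[]>
  ) {}

  enqueue(req: Req): Promise<unknown> {
    return new Promise<unknown>((resolve, reject) => {
      this.queue.push({ req, resolve, reject })
      if (this.queue.length >= this.opts.batchSize) {
        void this.flush() // size trigger
      } else if (this.timer === null && this.opts.wait > 0) {
        this.timer = setTimeout(() => void this.flush(), this.opts.wait) // timer trigger
      }
    })
  }

  async flush(): Promise<void> {
    if (this.timer !== null) {
      clearTimeout(this.timer)
      this.timer = null
    }
    const pending = this.queue.splice(0)
    if (pending.length === 0) return
    try {
      const responses = await this.sendBatch(pending.map((p) => p.req))
      const byId = new Map<number, Res>()
      for (const res of responses) byId.set(res.id, res)
      for (const p of pending) {
        const res = byId.get(p.req.id)
        if (!res) p.reject(new Error(`No response for request id ${p.req.id}`))
        else if (res.error) p.reject(res.error)
        else p.resolve(res.result)
      }
    } catch (err) {
      // A network error rejects every request in the batch
      for (const p of pending) p.reject(err)
    }
  }
}
```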