# BatchScheduler
The BatchScheduler class collects individual JSON-RPC requests and sends them as batches. It's used internally by transports but can be used directly for custom batching logic.
## Installation

```sh
npm i @rpckit/core
```

## Basic Usage
```ts
import { BatchScheduler } from '@rpckit/core'

const scheduler = new BatchScheduler(
  { batchSize: 100, wait: 10 },
  async (requests) => {
    // Send the batch to the server and return the array of responses
    const response = await fetch('https://example.com/rpc', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(requests)
    })
    return response.json()
  }
)

// Enqueue requests - they're batched automatically
const result1 = scheduler.enqueue({ jsonrpc: '2.0', id: 1, method: 'method1', params: [] })
const result2 = scheduler.enqueue({ jsonrpc: '2.0', id: 2, method: 'method2', params: [] })

// Both requests are sent together in a single batch
const [r1, r2] = await Promise.all([result1, result2])
```

## Configuration
### Options

| Option | Type | Default | Description |
|---|---|---|---|
| `batchSize` | `number` | `100` | Maximum requests per batch. When reached, the batch is sent immediately. |
| `wait` | `number` | `0` | Maximum time (ms) to wait before sending a batch. |
| `raw` | `boolean` | `false` | Resolve with the full RPC response object instead of just the result. |
### Batch Size

The `batchSize` option controls how many requests are collected before sending:

```ts
const scheduler = new BatchScheduler(
  { batchSize: 10 },
  sendBatch
)

// When 10 requests are enqueued, the batch is sent immediately
for (let i = 0; i < 10; i++) {
  scheduler.enqueue({ jsonrpc: '2.0', id: i, method: 'ping', params: [] })
}
// ^ Batch sent after the 10th request
```

### Wait Time
The `wait` option starts a timer when the first request is enqueued:

```ts
const scheduler = new BatchScheduler(
  { batchSize: 100, wait: 50 },
  sendBatch
)

scheduler.enqueue(request1) // Timer starts (50ms)
scheduler.enqueue(request2) // Added to the pending batch
// ... 50ms later, the batch is sent even if batchSize hasn't been reached
```

### Raw Mode
By default, `enqueue()` resolves with `response.result`. In raw mode, it resolves with the full response object:

```ts
const scheduler = new BatchScheduler(
  { raw: true },
  sendBatch
)

const response = await scheduler.enqueue(request)
// response: { jsonrpc: '2.0', id: 1, result: 'value' }
// or:       { jsonrpc: '2.0', id: 1, error: { code: -32601, message: '...' } }
```

## Methods
### enqueue(request)

Adds a request to the batch queue. Returns a promise that resolves once the batch has been sent and its response received.

```ts
const promise = scheduler.enqueue({
  jsonrpc: '2.0',
  id: 1,
  method: 'getBalance',
  params: ['0x...']
})
const result = await promise
```

### flush()
Immediately sends any pending requests without waiting for the timer or `batchSize`:

```ts
scheduler.enqueue(request1)
scheduler.enqueue(request2)

// Don't wait for the timer or batchSize - send now
await scheduler.flush()
```

## Error Handling
### RPC Errors

By default, RPC errors are thrown:

```ts
try {
  const result = await scheduler.enqueue({
    jsonrpc: '2.0',
    id: 1,
    method: 'unknownMethod',
    params: []
  })
} catch (error) {
  // error: { code: -32601, message: 'Method not found' }
}
```

With `raw: true`, errors are returned instead of thrown:
```ts
const scheduler = new BatchScheduler({ raw: true }, sendBatch)

const response = await scheduler.enqueue(request)
if (response.error) {
  console.log('RPC error:', response.error)
} else {
  console.log('Result:', response.result)
}
```

### Missing Responses
If the server doesn't return a response for a request, an error is thrown:

```ts
const result = await scheduler.enqueue(request)
// Error: No response for request id 1, try reducing batch size
```

### Network Errors
Network errors reject all pending requests in the batch:

```ts
const results = await Promise.allSettled([
  scheduler.enqueue(request1),
  scheduler.enqueue(request2)
])
// Both rejected with the same network error
```

## Example: Custom Transport
```ts
import { BatchScheduler } from '@rpckit/core'
import type { RpcRequest, RpcResponse } from '@rpckit/core'

function createBatchingTransport(url: string) {
  const scheduler = new BatchScheduler(
    { batchSize: 50, wait: 10 },
    async (requests: RpcRequest[]): Promise<RpcResponse[]> => {
      const response = await fetch(url, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(requests)
      })
      if (!response.ok) {
        throw new Error(`HTTP ${response.status}`)
      }
      return response.json()
    }
  )

  let requestId = 0

  return {
    async request(method: string, ...params: unknown[]) {
      return scheduler.enqueue({
        jsonrpc: '2.0',
        id: ++requestId,
        method,
        params
      })
    },
    async flush() {
      await scheduler.flush()
    }
  }
}

// Usage
const transport = createBatchingTransport('https://api.example.com/rpc')

const [a, b, c] = await Promise.all([
  transport.request('getA'),
  transport.request('getB'),
  transport.request('getC')
])
// All three sent in a single HTTP request
```

## How Batching Works
1. The first `enqueue()` call starts a timer (if `wait > 0`).
2. Subsequent calls add requests to the queue.
3. The batch is sent when any of the following occurs:
   - the queue reaches `batchSize`
   - the timer expires (`wait` ms elapsed)
   - `flush()` is called manually
4. Responses are matched to requests by `id`.
5. Each enqueued promise resolves or rejects with its own response.
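
The steps above can be sketched as a miniature scheduler. This is only an illustration of the queue/timer/flush logic under the assumptions stated in this page, not the actual `@rpckit/core` implementation; `MiniBatcher` and its simplified request shape are hypothetical names, and `wait: 0` here simply means "no timer" (send on size or explicit flush):

```typescript
type Req = { id: number; method: string };
type Res = { id: number; result: unknown };

class MiniBatcher {
  private queue: { req: Req; resolve: (v: unknown) => void; reject: (e: unknown) => void }[] = [];
  private timer: ReturnType<typeof setTimeout> | null = null;

  constructor(
    private opts: { batchSize: number; wait: number },
    private send: (reqs: Req[]) => Promise<Res[]>
  ) {}

  enqueue(req: Req): Promise<unknown> {
    return new Promise((resolve, reject) => {
      this.queue.push({ req, resolve, reject });
      if (this.queue.length >= this.opts.batchSize) {
        void this.flush(); // size trigger: send immediately
      } else if (this.timer === null && this.opts.wait > 0) {
        // timer trigger: first request in an empty queue starts the clock
        this.timer = setTimeout(() => void this.flush(), this.opts.wait);
      }
    });
  }

  async flush(): Promise<void> {
    if (this.timer !== null) {
      clearTimeout(this.timer);
      this.timer = null;
    }
    const batch = this.queue.splice(0); // take everything pending
    if (batch.length === 0) return;
    try {
      const responses = await this.send(batch.map((p) => p.req));
      const byId = new Map<number, Res>();
      for (const r of responses) byId.set(r.id, r);
      for (const p of batch) {
        // match responses to requests by id; missing response is an error
        const res = byId.get(p.req.id);
        if (res !== undefined) p.resolve(res.result);
        else p.reject(new Error(`No response for request id ${p.req.id}`));
      }
    } catch (err) {
      // network error rejects every request in the batch
      for (const p of batch) p.reject(err);
    }
  }
}
```

The key design point the real class shares: one queue, one timer, and a single `flush` path, so all three triggers (size, timer, manual) converge on identical send-and-match logic.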