BatchScheduler

The BatchScheduler class collects individual JSON-RPC requests and sends them as batches. It's used internally by transports but can be used directly for custom batching logic.

Installation

npm i @rpckit/core

Basic Usage

import { BatchScheduler } from '@rpckit/core'
 
const scheduler = new BatchScheduler(
  { batchSize: 100, wait: 10 },
  async (requests) => {
    // Send batch to server and return responses
    const response = await fetch('https://example.com/rpc', {
      method: 'POST',
      body: JSON.stringify(requests),
      headers: { 'Content-Type': 'application/json' }
    })
    return response.json()
  }
)
 
// Enqueue requests - they're batched automatically
const result1 = scheduler.enqueue({ jsonrpc: '2.0', id: 1, method: 'method1', params: [] })
const result2 = scheduler.enqueue({ jsonrpc: '2.0', id: 2, method: 'method2', params: [] })
 
// Both requests are sent together
const [r1, r2] = await Promise.all([result1, result2])

Configuration

Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| batchSize | number | 100 | Maximum requests per batch. When reached, batch is sent immediately. |
| wait | number | 0 | Maximum time (ms) to wait before sending a batch. |
| raw | boolean | false | Return full RPC response objects instead of just results. |

Batch Size

The batchSize option controls how many requests are collected before sending:

const scheduler = new BatchScheduler(
  { batchSize: 10 },
  sendBatch
)
 
// When 10 requests are enqueued, they're sent immediately
for (let i = 0; i < 10; i++) {
  scheduler.enqueue({ jsonrpc: '2.0', id: i, method: 'ping', params: [] })
}
// ^ Batch sent after 10th request
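
To make the size threshold concrete, here is a minimal sketch of how a size-only trigger would group requests. `chunkRequests` is a hypothetical helper written for illustration, not part of @rpckit/core, and it ignores the wait timer entirely.

```typescript
// Hypothetical helper, not part of @rpckit/core: splits items into groups of
// at most `batchSize`, mirroring a size-only trigger (the timer is ignored).
function chunkRequests<T>(items: T[], batchSize: number): T[][] {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError('batchSize must be a positive integer')
  }
  const groups: T[][] = []
  for (let i = 0; i < items.length; i += batchSize) {
    groups.push(items.slice(i, i + batchSize))
  }
  return groups
}

// 25 requests with a batch size of 10 form two full groups and one partial group.
const requestIds = Array.from({ length: 25 }, (_, i) => i)
const groups = chunkRequests(requestIds, 10)
console.log(groups.map((g) => g.length)) // [ 10, 10, 5 ]
```

In terms of the scheduler described here, the trailing group of 5 corresponds to requests that would stay queued until the wait timer expires or flush() is called.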

Wait Time

The wait option sets a timer after the first request is enqueued:

const scheduler = new BatchScheduler(
  { batchSize: 100, wait: 50 },
  sendBatch
)
 
scheduler.enqueue(request1)  // Timer starts (50ms)
scheduler.enqueue(request2)  // Added to batch
// ... 50ms later, batch is sent even if batchSize not reached

Raw Mode

By default, enqueue() resolves with response.result. In raw mode, it resolves with the full response:

const scheduler = new BatchScheduler(
  { raw: true },
  sendBatch
)
 
const response = await scheduler.enqueue(request)
// response: { jsonrpc: '2.0', id: 1, result: 'value' }
// or: { jsonrpc: '2.0', id: 1, error: { code: -32601, message: '...' } }
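
When working in raw mode, a small type guard can make the two response shapes easier to handle. The sketch below assumes responses look like the two examples above; the local types and the `isRpcFailure` and `unwrap` helpers are hypothetical and are not exported by @rpckit/core.

```typescript
// Assumed response shapes, modelled on the two raw-mode examples above.
// These local types are illustrative stand-ins for the library's RpcResponse.
type RpcSuccess = { jsonrpc: '2.0'; id: number; result: unknown }
type RpcFailure = {
  jsonrpc: '2.0'
  id: number
  error: { code: number; message: string }
}
type RawResponse = RpcSuccess | RpcFailure

// Narrow a raw response to the failure branch.
function isRpcFailure(response: RawResponse): response is RpcFailure {
  return 'error' in response
}

// Return the result, or throw if the response carries an error.
function unwrap(response: RawResponse): unknown {
  if (isRpcFailure(response)) {
    throw new Error(`RPC error ${response.error.code}: ${response.error.message}`)
  }
  return response.result
}

console.log(unwrap({ jsonrpc: '2.0', id: 1, result: 'value' })) // value
```

A helper like `unwrap` lets calling code opt back into throw-on-error behaviour for individual requests while the scheduler stays in raw mode.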

Methods

enqueue(request)

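Adds a request to the batch queue and returns a promise. The promise resolves once the batch has been sent and the matching response received: with the result by default, or with the full response object when raw is true. It rejects on RPC errors (unless raw is true), missing responses, and network errors.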

const promise = scheduler.enqueue({
  jsonrpc: '2.0',
  id: 1,
  method: 'getBalance',
  params: ['0x...']
})
 
const result = await promise

flush()

Immediately sends any pending requests:

scheduler.enqueue(request1)
scheduler.enqueue(request2)
 
// Don't wait for timer or batchSize - send now
await scheduler.flush()

Error Handling

RPC Errors

By default, RPC errors are thrown:

try {
  const result = await scheduler.enqueue({
    jsonrpc: '2.0',
    id: 1,
    method: 'unknownMethod',
    params: []
  })
} catch (error) {
  // error: { code: -32601, message: 'Method not found' }
}

With raw: true, errors are returned instead of thrown:

const scheduler = new BatchScheduler({ raw: true }, sendBatch)
 
const response = await scheduler.enqueue(request)
if (response.error) {
  console.log('RPC error:', response.error)
} else {
  console.log('Result:', response.result)
}

Missing Responses

If the server's batch response contains no entry for a request, that request's promise rejects with an error:

const result = await scheduler.enqueue(request)
// Rejects with: Error: No response for request id 1, try reducing batch size
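
One way such a check could work is sketched below, assuming every response carries the same id as its request. JSON-RPC 2.0 allows a server to return batch responses in any order, so the lookup goes through a map rather than relying on array position. `matchById` and the `Req` and `Res` types are hypothetical and do not describe the library's internals.

```typescript
// Minimal assumed shapes: only the id matters for matching.
type Req = { id: number }
type Res = { id: number; result?: unknown; error?: unknown }

// Pair each request with its response by id and collect ids with no response.
function matchById(requests: Req[], responses: Res[]) {
  const byId = new Map<number, Res>()
  for (const res of responses) byId.set(res.id, res)

  const matched: Array<{ request: Req; response: Res }> = []
  const missing: number[] = []
  for (const request of requests) {
    const response = byId.get(request.id)
    if (response) matched.push({ request, response })
    else missing.push(request.id)
  }
  return { matched, missing }
}

// Responses arrive out of order and the one for id 2 is absent.
const outcome = matchById(
  [{ id: 1 }, { id: 2 }, { id: 3 }],
  [{ id: 3, result: 'c' }, { id: 1, result: 'a' }]
)
console.log(outcome.missing) // [ 2 ]
```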

Network Errors

Network errors reject all pending requests in the batch:

const results = await Promise.allSettled([
  scheduler.enqueue(request1),
  scheduler.enqueue(request2)
])
// Both rejected with the same network error
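
Callers that use Promise.allSettled usually want to separate successes from failures afterwards. The following sketch shows one way to do that; `partitionSettled` is a hypothetical helper, and the rejected promises only simulate a failed batch rather than calling the scheduler.

```typescript
// Hypothetical helper: split Promise.allSettled output into values and reasons.
function partitionSettled<T>(settled: PromiseSettledResult<T>[]) {
  const values: T[] = []
  const reasons: unknown[] = []
  for (const entry of settled) {
    if (entry.status === 'fulfilled') values.push(entry.value)
    else reasons.push(entry.reason)
  }
  return { values, reasons }
}

// Simulate two requests from one batch failing with the same network error.
const networkError = new Error('fetch failed')
Promise.allSettled([Promise.reject(networkError), Promise.reject(networkError)]).then(
  (settled) => {
    const { values, reasons } = partitionSettled(settled)
    console.log(values.length, reasons.length, reasons[0] === reasons[1]) // 0 2 true
  }
)
```

Because every request in the failed batch shares one error, comparing the rejection reasons by identity is enough to tell a batch-level failure from unrelated per-request errors.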

Example: Custom Transport

import { BatchScheduler } from '@rpckit/core'
import type { RpcRequest, RpcResponse } from '@rpckit/core'
 
function createBatchingTransport(url: string) {
  const scheduler = new BatchScheduler(
    { batchSize: 50, wait: 10 },
    async (requests: RpcRequest[]): Promise<RpcResponse[]> => {
      const response = await fetch(url, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(requests)
      })
 
      if (!response.ok) {
        throw new Error(`HTTP ${response.status}`)
      }
 
      return response.json()
    }
  )
 
  let requestId = 0
 
  return {
    async request(method: string, ...params: unknown[]) {
      return scheduler.enqueue({
        jsonrpc: '2.0',
        id: ++requestId,
        method,
        params
      })
    },
 
    async flush() {
      await scheduler.flush()
    }
  }
}
 
// Usage
const transport = createBatchingTransport('https://api.example.com/rpc')
 
const [a, b, c] = await Promise.all([
  transport.request('getA'),
  transport.request('getB'),
  transport.request('getC')
])
// All three are sent in a single HTTP request

How Batching Works

  1. First enqueue() call starts a timer (if wait > 0)
  2. Subsequent calls add to the queue
  3. Batch is sent when either:
    • Queue reaches batchSize
    • Timer expires (wait ms elapsed)
    • flush() is called manually
  4. Responses are matched to requests by id
  5. Each enqueued promise resolves/rejects with its response
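
The steps above can be sketched as a small standalone class. This is a simplified illustration, not the @rpckit/core implementation: `MiniBatchScheduler`, its types, and its error message are assumptions, raw mode is omitted, and the timer is always scheduled (with wait set to 0 it fires on the next timer tick).

```typescript
// Hypothetical types and class; NOT the @rpckit/core implementation.
type SketchRequest = { jsonrpc: '2.0'; id: number; method: string; params: unknown[] }
type SketchResponse = { jsonrpc: '2.0'; id: number; result?: unknown; error?: unknown }
type SendBatch = (requests: SketchRequest[]) => Promise<SketchResponse[]>
type Pending = {
  request: SketchRequest
  resolve: (value: unknown) => void
  reject: (reason: unknown) => void
}

class MiniBatchScheduler {
  private queue: Pending[] = []
  private timer: ReturnType<typeof setTimeout> | undefined
  private options: { batchSize: number; wait: number }
  private send: SendBatch

  constructor(options: { batchSize: number; wait: number }, send: SendBatch) {
    this.options = options
    this.send = send
  }

  enqueue(request: SketchRequest): Promise<unknown> {
    return new Promise((resolve, reject) => {
      // Step 2: every call adds to the queue.
      this.queue.push({ request, resolve, reject })
      if (this.queue.length >= this.options.batchSize) {
        // Step 3: the queue reached batchSize, so send right away.
        void this.flush()
      } else if (this.timer === undefined) {
        // Step 1: the first request of a batch starts the timer.
        this.timer = setTimeout(() => void this.flush(), this.options.wait)
      }
    })
  }

  async flush(): Promise<void> {
    if (this.timer !== undefined) {
      clearTimeout(this.timer)
      this.timer = undefined
    }
    const batch = this.queue
    this.queue = []
    if (batch.length === 0) return

    try {
      const responses = await this.send(batch.map((p) => p.request))
      // Step 4: match responses to requests by id.
      const byId = new Map(responses.map((r) => [r.id, r] as const))
      // Step 5: settle each promise with its own response.
      for (const { request, resolve, reject } of batch) {
        const response = byId.get(request.id)
        if (!response) reject(new Error(`No response for request id ${request.id}`))
        else if (response.error !== undefined) reject(response.error)
        else resolve(response.result)
      }
    } catch (error) {
      // A transport failure rejects every request in the batch.
      for (const { reject } of batch) reject(error)
    }
  }
}

// Usage with a fake sender that echoes the method name back as the result.
const demo = new MiniBatchScheduler({ batchSize: 2, wait: 50 }, async (requests) =>
  requests.map((r) => ({ jsonrpc: '2.0' as const, id: r.id, result: `${r.method}-ok` }))
)
Promise.all([
  demo.enqueue({ jsonrpc: '2.0', id: 1, method: 'ping', params: [] }),
  demo.enqueue({ jsonrpc: '2.0', id: 2, method: 'pong', params: [] }),
]).then((results) => console.log(results)) // [ 'ping-ok', 'pong-ok' ]
```

Swapping the queue out before awaiting the send keeps requests enqueued during an in-flight batch from being mixed into it; they start a new batch with its own timer.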