Node.js wrapper for the Salesforce Bulk API 2.0. Works with any auth library that gives you an access token — jsforce, @salesforce/sf-core, SF CLI, etc.
Written in TypeScript with full type definitions included.
npm install node-sf-bulk2

Create a connection object from any Salesforce auth source, then pass it to BulkAPI2:
import { BulkAPI2, BulkAPI2Connection } from 'node-sf-bulk2';
const connection: BulkAPI2Connection = {
  accessToken: '<your-access-token>',
  instanceUrl: 'https://yourorg.my.salesforce.com',
  apiVersion: '62.0',
};
const bulk = new BulkAPI2(connection);

For the Tooling API, set isTooling: true on the connection. To pass call options, add callOptions:
const connection: BulkAPI2Connection = {
  accessToken: '<token>',
  instanceUrl: 'https://yourorg.my.salesforce.com',
  apiVersion: '62.0',
  isTooling: true,
  callOptions: {
    client: 'myApp',
    defaultNamespace: 'ns1',
  },
};

Submit a query, poll until complete, and retrieve CSV results:
import { BulkAPI2, BulkAPI2Connection, QueryInput } from 'node-sf-bulk2';
const connection: BulkAPI2Connection = {
  accessToken: '<token>',
  instanceUrl: 'https://yourorg.my.salesforce.com',
  apiVersion: '62.0',
};
const bulk = new BulkAPI2(connection);
// 1. Submit the query job
const queryInput: QueryInput = {
  query: 'SELECT Id, Name FROM Account',
  operation: 'query',
};
const job = await bulk.submitBulkQueryJob(queryInput);
console.log('Job created:', job.id);
// 2. Poll until the job reaches a terminal state (Aborted is terminal too)
const terminalStates: string[] = ['JobComplete', 'Failed', 'Aborted'];
let info = await bulk.getBulkQueryJobInfo(job.id);
while (!terminalStates.includes(info.state)) {
  await new Promise((r) => setTimeout(r, 2000));
  info = await bulk.getBulkQueryJobInfo(job.id);
}
if (info.state !== 'JobComplete') {
  throw new Error(`Query job ended in state ${info.state}`);
}
// 3. Get results (CSV)
const response = await bulk.getBulkQueryResults(job.id);
console.log('Results:\n', response.data);
// 4. Clean up
await bulk.deleteBulkQueryJob(job.id);

For large result sets, page through with locator and maxRecords:
let locator: string | undefined;
do {
  const response = await bulk.getBulkQueryResults(job.id, locator, 10000);
  console.log(response.data);
  locator = response.headers['sforce-locator'];
  if (locator === 'null') locator = undefined;
} while (locator);

For large results that shouldn't be buffered in memory, stream directly to a file:
import { createWriteStream } from 'fs';
import { pipeline } from 'stream/promises';
const stream = await bulk.getBulkQueryResultsStream(job.id);
await pipeline(stream, createWriteStream('results.csv'));

Create an ingest job, upload CSV data, and retrieve results:
import { BulkAPI2, BulkAPI2Connection, JobUploadRequest } from 'node-sf-bulk2';
import { readFileSync } from 'fs';
const connection: BulkAPI2Connection = {
  accessToken: '<token>',
  instanceUrl: 'https://yourorg.my.salesforce.com',
  apiVersion: '62.0',
};
const bulk = new BulkAPI2(connection);
// 1. Create the job
const jobRequest: JobUploadRequest = {
  object: 'Account',
  operation: 'insert',
};
const job = await bulk.createDataUploadJob(jobRequest);
// 2. Upload CSV data
const csv = readFileSync('./accounts.csv', 'utf-8');
await bulk.uploadJobData(job.contentUrl, csv);
// 3. Close the job to begin processing
await bulk.closeOrAbortJob(job.id, 'UploadComplete');
// 4. Poll until the job reaches a terminal state (an aborted job never becomes JobComplete or Failed)
const terminalStates: string[] = ['JobComplete', 'Failed', 'Aborted'];
let info = await bulk.getIngestJobInfo(job.id);
while (!terminalStates.includes(info.state)) {
  await new Promise((r) => setTimeout(r, 2000));
  info = await bulk.getIngestJobInfo(job.id);
}
// 5. Get results
const successful = await bulk.getResults(job.id, 0); // successfulResults
const failed = await bulk.getResults(job.id, 1); // failedResults
console.log('Successful:\n', successful);
console.log('Failed:\n', failed);
// 6. Clean up
await bulk.deleteIngestJob(job.id);

You can also create a job and upload data in a single multipart request:
const csv = readFileSync('./accounts.csv', 'utf-8');
const job = await bulk.createDataUploadJobWithData(
  { object: 'Account', operation: 'insert' },
  csv,
);

Query job methods:

| Method | Description |
|---|---|
| submitBulkQueryJob(query) | Submit a new query job |
| getBulkQueryJobInfo(jobId) | Get status/info for a query job |
| getAllBulkQueryJobInfo(config?) | List all query jobs (with optional filters) |
| getBulkQueryResults(jobId, locator?, maxRecords?) | Get query results as CSV |
| getBulkQueryResultsStream(jobId, locator?, maxRecords?) | Stream query results as a Node.js Readable |
| getBulkQueryResultPages(jobId) | Get result page URLs for parallel download |
| abortBulkQueryJob(jobId) | Abort a query job |
| deleteBulkQueryJob(jobId) | Delete a query job |

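
The locator-based paging loop shown earlier can be wrapped in a small reusable function. The sketch below is a hypothetical helper, not something node-sf-bulk2 exports: `forEachResultPage`, `QueryResultsClient`, and `QueryResultsPage` are names made up for illustration, and the response shape (`data` plus `headers`) is assumed from the paging example in this README.

```typescript
// Structural stand-ins for the one method used here. The response shape
// (data plus headers) is assumed from the paging example in this README.
interface QueryResultsPage {
  data: string;
  headers: Record<string, unknown>;
}

interface QueryResultsClient {
  getBulkQueryResults(
    jobId: string,
    locator?: string,
    maxRecords?: number,
  ): Promise<QueryResultsPage>;
}

// Hypothetical helper, not part of node-sf-bulk2: calls onPage once per
// result page and resolves with the number of pages fetched.
async function forEachResultPage(
  client: QueryResultsClient,
  jobId: string,
  onPage: (csv: string, pageIndex: number) => void | Promise<void>,
  maxRecords = 10000,
): Promise<number> {
  let locator: string | undefined;
  let pageIndex = 0;
  do {
    const response = await client.getBulkQueryResults(jobId, locator, maxRecords);
    await onPage(response.data, pageIndex);
    pageIndex += 1;
    const next = response.headers['sforce-locator'];
    // The header carries the literal string "null" on the last page.
    locator = typeof next === 'string' && next !== 'null' ? next : undefined;
  } while (locator);
  return pageIndex;
}
```

Because only one page is held at a time, the callback can append each CSV chunk to a file or parse it before the next request is made. If the real response type differs from the assumed shape, the two interfaces would need adjusting.
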
Ingest job methods:

| Method | Description |
|---|---|
| createDataUploadJob(request) | Create an ingest job |
| createDataUploadJobWithData(request, csv) | Create an ingest job with CSV data in one request |
| uploadJobData(contentUrl, csv) | Upload CSV data to an open job |
| closeOrAbortJob(jobId, state) | Close (UploadComplete) or abort (Aborted) a job |
| getIngestJobInfo(jobId) | Get status/info for an ingest job |
| getAllIngestJobInfo(config?) | List all ingest jobs (with optional filters) |
| getResults(jobId, resultType) | Get results: 0 = successful, 1 = failed, 2 = unprocessed |
| deleteIngestJob(jobId) | Delete an ingest job |

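
Both the query and ingest walkthroughs poll a job-info method until the job finishes. A minimal sketch of that loop as a shared function follows. `waitForJob` and `PollOptions` are hypothetical names, not part of node-sf-bulk2, and the sketch assumes only that the info object has a string-like `state` field, as the examples above suggest. It treats JobComplete, Failed, and Aborted as terminal and adds a timeout so the loop cannot run forever.

```typescript
// States after which a Bulk API 2.0 job no longer changes.
const TERMINAL_STATES: string[] = ['JobComplete', 'Failed', 'Aborted'];

interface PollOptions {
  intervalMs?: number; // delay between status checks
  timeoutMs?: number; // stop waiting after this long
}

// Hypothetical helper, not part of node-sf-bulk2: calls getInfo repeatedly
// until the returned state is terminal, then resolves with the last info.
async function waitForJob<T extends { state: string }>(
  getInfo: () => Promise<T>,
  { intervalMs = 2000, timeoutMs = 10 * 60 * 1000 }: PollOptions = {},
): Promise<T> {
  const deadline = Date.now() + timeoutMs;
  let info = await getInfo();
  while (!TERMINAL_STATES.includes(info.state)) {
    if (Date.now() >= deadline) {
      throw new Error(`Timed out waiting for job (last state: ${info.state})`);
    }
    await new Promise<void>((resolve) => setTimeout(resolve, intervalMs));
    info = await getInfo();
  }
  return info;
}
```

With a `bulk` instance and a `job` as in the examples, usage would look like `const info = await waitForJob(() => bulk.getIngestJobInfo(job.id));`, or the same call with `getBulkQueryJobInfo` for query jobs. The caller still decides what to do when the final state is not JobComplete.
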
See the full API documentation for details on all request/response types.
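
The connection object from the first example can also be built from the SF_ACCESS_TOKEN and SF_INSTANCE_URL environment variables that the example scripts and integration tests read. The following is a rough sketch under that assumption. `connectionFromEnv` and `ConnectionFields` are hypothetical names, and the interface is a local copy of the three fields used in this README's examples rather than the library's own BulkAPI2Connection type.

```typescript
// Local copy of the connection fields used in this README's examples.
interface ConnectionFields {
  accessToken: string;
  instanceUrl: string;
  apiVersion: string;
}

// Hypothetical helper, not part of node-sf-bulk2: reads credentials from an
// environment-like map and fails early if either variable is missing.
function connectionFromEnv(
  env: Record<string, string | undefined>,
  apiVersion = '62.0',
): ConnectionFields {
  const accessToken = env['SF_ACCESS_TOKEN'];
  const instanceUrl = env['SF_INSTANCE_URL'];
  if (!accessToken || !instanceUrl) {
    throw new Error('Set SF_ACCESS_TOKEN and SF_INSTANCE_URL before running');
  }
  return { accessToken, instanceUrl, apiVersion };
}
```

In a script this would be called as `connectionFromEnv(process.env)`, with the result passed to `new BulkAPI2(...)`. Taking the map as a parameter keeps the function easy to test without touching the real environment.
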
The examples/ directory contains runnable scripts:
- query-job.ts — Submit a query, poll, retrieve CSV results
- ingest-job.ts — Create ingest job, upload CSV, poll, retrieve results
To run them, set your Salesforce credentials as environment variables:
export SF_ACCESS_TOKEN="your-access-token"
export SF_INSTANCE_URL="https://yourorg.my.salesforce.com"

Then run with ts-node:
npx ts-node examples/query-job.ts
npx ts-node examples/ingest-job.ts

Run the test suite with npm:
npm test # run all tests
npm run test:unit # unit tests only (mocked, no org needed)
npm run test:watch # watch mode

Integration tests require a Salesforce org:
SF_ACCESS_TOKEN="..." SF_INSTANCE_URL="..." npm run test:integration

License: ISC