
Commit 6183ec2

fix: inline
1 parent 4be66cf commit 6183ec2

File tree

2 files changed (+54, -139 lines)


specs/search/helpers/chunkedPush.yml

Lines changed: 0 additions & 55 deletions
This file was deleted.

templates/javascript/clients/algoliasearch/builds/definition.mustache

Lines changed: 54 additions & 84 deletions
@@ -8,7 +8,7 @@ import { {{{dependencyName}}}Client } from '{{{dependencyPackage}}}';
 import type { {{#lambda.titlecase}}{{{dependencyName}}}{{/lambda.titlecase}}Client } from '{{{dependencyPackage}}}';
 {{/dependencies}}
 
-import type { ChunkedBatchOptions, ReplaceAllObjectsOptions, ReplaceAllObjectsWithTransformationResponse } from '@algolia/client-search';
+import type { ReplaceAllObjectsOptions, ReplaceAllObjectsWithTransformationResponse } from '@algolia/client-search';
 import type { PartialUpdateObjectsOptions, SaveObjectsOptions } from '@algolia/client-search';
 import type { PushTaskRecords, WatchResponse } from '@algolia/ingestion';
 
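The only symbol dropped from the imports is ChunkedBatchOptions, the options type of the helper removed below. As a rough orientation, its relevant shape is sketched here; the field list is inferred from how the deleted chunkedPush helper destructures its options in this diff, not copied from the published @algolia/client-search typings.

// Illustrative sketch only: fields inferred from the destructuring in this diff.
type ChunkedBatchOptionsSketch = {
  indexName: string;                        // index to write to
  objects: Array<Record<string, unknown>>;  // records to push
  action?: string;                          // batch action, defaults to 'addObject'
  waitForTasks?: boolean;                   // wait for each push event to be processed
  batchSize?: number;                       // chunk size, defaults to 1000
};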
@@ -75,20 +75,6 @@ export type Algoliasearch = SearchClient & {
     options: ReplaceAllObjectsOptions,
     requestOptions?: RequestOptions | undefined,
   ) => Promise<ReplaceAllObjectsWithTransformationResponse>;
-
-  /**
-   * Helper: Chunks the given `objects` list in subset of 1000 elements max in order to make it fit in `push` requests by leveraging the Transformation pipeline setup in the Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/).
-   *
-   * @summary Helper: Chunks the given `objects` list in subset of 1000 elements max in order to make it fit in `batch` requests.
-   * @param chunkedPush - The `chunkedPush` object.
-   * @param chunkedPush.indexName - The `indexName` to replace `objects` in.
-   * @param chunkedPush.objects - The array of `objects` to store in the given Algolia `indexName`.
-   * @param chunkedPush.action - The `batch` `action` to perform on the given array of `objects`, defaults to `addObject`.
-   * @param chunkedPush.waitForTasks - Whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable.
-   * @param chunkedPush.batchSize - The size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000.
-   * @param requestOptions - The requestOptions to send along with the query, they will be forwarded to the `getTask` method and merged with the transporter requestOptions.
-   */
-  chunkedPush: (options: ChunkedBatchOptions, requestOptions?: RequestOptions) => Promise<Array<WatchResponse>>;
 };
 
 export type TransformationOptions = {
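For context, chunkedPush was part of the public Algoliasearch type, so this hunk is an API change, not just an internal cleanup. A caller-side sketch of how the helper was used before this commit, assuming the usual algoliasearch(appId, apiKey, options) entry point; the credentials, index name, and records below are made up for illustration, and only the option names and the transformation.region requirement come from this diff.

// Hypothetical pre-commit usage of the removed helper.
import { algoliasearch } from 'algoliasearch';

async function example(): Promise<void> {
  const client = algoliasearch('YourAppID', 'YourWriteAPIKey', {
    // The helper threw unless a transformation region was configured.
    transformation: { region: 'eu' },
  });

  const responses = await client.chunkedPush({
    indexName: 'products',                          // index to replace objects in
    objects: [{ objectID: '1', name: 'sneaker' }],  // records to store
    action: 'addObject',                            // default action
    waitForTasks: true,                             // poll each push event until processed
    batchSize: 1000,                                // max records per push call (default)
  });

  console.log(`${responses.length} push request(s) sent`);
}

After this commit, callers that need this behaviour go through replaceAllObjectsWithTransformation instead, which inlines the same logic (see the last hunk below).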
@@ -174,71 +160,6 @@ export function algoliasearch(
       );
     },
 
-    async chunkedPush(
-      { indexName, objects, action = 'addObject', waitForTasks, batchSize = 1000 }: ChunkedBatchOptions,
-      requestOptions?: RequestOptions,
-    ): Promise<Array<WatchResponse>> {
-      if (!ingestionTransporter) {
-        throw new Error('`transformation.region` must be provided at client instantiation before calling this method.');
-      }
-
-      if (!options?.transformation?.region) {
-        throw new Error('`region` must be provided when leveraging the transformation pipeline');
-      }
-
-      let records: Array<PushTaskRecords> = [];
-      const responses: Array<WatchResponse> = [];
-
-      const objectEntries = objects.entries();
-      for (const [i, obj] of objectEntries) {
-        records.push(obj as PushTaskRecords);
-        if (records.length === batchSize || i === objects.length - 1) {
-          responses.push(
-            await ingestionTransporter.push(
-              { indexName, pushTaskPayload: { action, records }, watch: waitForTasks },
-              requestOptions,
-            ),
-          );
-          records = [];
-        }
-      }
-
-      let retryCount = 0;
-
-      if (waitForTasks) {
-        for (const resp of responses) {
-          if (resp.eventID === undefined || !resp.eventID) {
-            throw new Error('received unexpected response from the push endpoint, eventID must not be undefined');
-          }
-
-          await createIterablePromise({
-            func: async () => {
-              if (resp.eventID === undefined || !resp.eventID) {
-                throw new Error('received unexpected response from the push endpoint, eventID must not be undefined');
-              }
-
-              return ingestionTransporter.getEvent({ runID: resp.runID, eventID: resp.eventID }).catch((error: ApiError) => {
-                if (error.status === 404) {
-                  return undefined;
-                }
-
-                throw error;
-              })
-            },
-            validate: (response) => response !== undefined,
-            aggregator: () => (retryCount += 1),
-            error: {
-              validate: () => retryCount >= 50,
-              message: () => `The maximum number of retries exceeded. (${retryCount}/${50})`,
-            },
-            timeout: (): number => Math.min(retryCount * 200, 5000),
-          });
-        }
-      }
-
-      return responses;
-    },
-
     async replaceAllObjectsWithTransformation(
       { indexName, objects, batchSize, scopes }: ReplaceAllObjectsOptions,
       requestOptions?: RequestOptions | undefined,
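The body deleted here is the same chunking-plus-polling logic that the last hunk re-adds inside replaceAllObjectsWithTransformation. The chunking part flushes the accumulated records whenever they reach batchSize or the last element is seen. A minimal standalone sketch of that grouping rule follows; the chunk helper is illustrative and not part of the client, and unlike the real code (which pushes each batch immediately), the sketch only collects the batches.

// Group items into batches of at most `batchSize`, flushing on the last element,
// mirroring the loop deleted above.
function chunk<T>(items: T[], batchSize = 1000): T[][] {
  const batches: T[][] = [];
  let current: T[] = [];

  for (const [i, item] of items.entries()) {
    current.push(item);
    if (current.length === batchSize || i === items.length - 1) {
      batches.push(current);
      current = [];
    }
  }

  return batches;
}

// chunk([1, 2, 3, 4, 5], 2) -> [[1, 2], [3, 4], [5]]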
@@ -271,10 +192,59 @@ export function algoliasearch(
         requestOptions,
       );
 
-      const watchResponses = await this.chunkedPush(
-        { indexName: tmpIndexName, objects, waitForTasks: true, batchSize },
-        requestOptions,
-      );
+      let records: Array<PushTaskRecords> = [];
+      const watchResponses: Array<WatchResponse> = [];
+
+      const objectEntries = objects.entries();
+      for (const [i, obj] of objectEntries) {
+        records.push(obj as PushTaskRecords);
+        if (records.length === batchSize || i === objects.length - 1) {
+          watchResponses.push(
+            await ingestionTransporter.push(
+              {
+                indexName: tmpIndexName,
+                pushTaskPayload: { action: 'addObject', records },
+                referenceIndexName: indexName,
+              },
+              requestOptions,
+            ),
+          );
+          records = [];
+        }
+      }
+
+      let retryCount = 0;
+
+      for (const resp of watchResponses) {
+        if (resp.eventID === undefined || !resp.eventID) {
+          throw new Error('received unexpected response from the push endpoint, eventID must not be undefined');
+        }
+
+        await createIterablePromise({
+          func: async () => {
+            if (resp.eventID === undefined || !resp.eventID) {
+              throw new Error('received unexpected response from the push endpoint, eventID must not be undefined');
+            }
+
+            return ingestionTransporter
+              .getEvent({ runID: resp.runID, eventID: resp.eventID })
+              .catch((error: ApiError) => {
+                if (error.status === 404) {
+                  return undefined;
+                }
+
+                throw error;
+              });
+          },
+          validate: (response) => response !== undefined,
+          aggregator: () => (retryCount += 1),
+          error: {
+            validate: () => retryCount >= 50,
+            message: () => `The maximum number of retries exceeded. (${retryCount}/${50})`,
+          },
+          timeout: (): number => Math.min(retryCount * 200, 5000),
+        });
+      }
 
       await this.waitForTask({
         indexName: tmpIndexName,
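The inlined polling loop above waits for every push event: a 404 from getEvent is treated as "not indexed yet", each attempt bumps retryCount, the wait between attempts grows linearly (200 ms per attempt, capped at 5 s), and it gives up after 50 attempts. For readers unfamiliar with createIterablePromise, here is an equivalent plain-loop sketch; the getEvent parameter stands in for ingestionTransporter.getEvent, and this is an illustration of the retry schedule, not the actual implementation.

// Hypothetical equivalent of the retry schedule above, written as a plain loop.
type PushEvent = Record<string, unknown>;

async function waitForPushEvent(
  getEvent: () => Promise<PushEvent>, // stand-in for ingestionTransporter.getEvent
  maxRetries = 50,
): Promise<PushEvent> {
  let retryCount = 0;

  while (retryCount < maxRetries) {
    try {
      // Success as soon as the event can be fetched.
      return await getEvent();
    } catch (error) {
      // 404 means the event is not available yet; anything else is fatal.
      if ((error as { status?: number }).status !== 404) {
        throw error;
      }
    }

    retryCount += 1;
    // Same backoff as the diff: min(retryCount * 200, 5000) milliseconds.
    await new Promise((resolve) => setTimeout(resolve, Math.min(retryCount * 200, 5000)));
  }

  throw new Error(`The maximum number of retries exceeded. (${retryCount}/${maxRetries})`);
}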
