From 2599712cdff07b4a20a6d813f53fac9c3c768ce3 Mon Sep 17 00:00:00 2001 From: Rohit Date: Tue, 14 Jan 2025 20:13:14 +0530 Subject: [PATCH 01/21] feat: handle parallel pagination for scrape list --- maxun-core/src/interpret.ts | 233 +++++++++++++++++++++++++++++++++++- 1 file changed, 232 insertions(+), 1 deletion(-) diff --git a/maxun-core/src/interpret.ts b/maxun-core/src/interpret.ts index 70d425a18..13a5a991b 100644 --- a/maxun-core/src/interpret.ts +++ b/maxun-core/src/interpret.ts @@ -540,7 +540,7 @@ export default class Interpreter extends EventEmitter { } } - private async handlePagination(page: Page, config: { listSelector: string, fields: any, limit?: number, pagination: any }) { + private async handleSequentialPagination(page: Page, config: { listSelector: string, fields: any, limit?: number, pagination: any }) { let allResults: Record[] = []; let previousHeight = 0; // track unique items per page to avoid re-scraping @@ -767,6 +767,237 @@ export default class Interpreter extends EventEmitter { return allResults; } + private async handlePagination(page: Page, config: { listSelector: string, fields: any, limit?: number, pagination: any }) { + // Constants for parallel processing + const PARALLEL_THRESHOLD = 1000; + const BATCH_SIZE = 100; + + // If limit is under threshold or no limit specified, use existing sequential logic + if (!config.limit || config.limit <= PARALLEL_THRESHOLD) { + return await this.handleSequentialPagination(page, config); + } + + // For larger datasets, use parallel processing + const batches = []; + for (let i = 0; i < config.limit; i += BATCH_SIZE) { + batches.push({ + start: i, + end: Math.min(i + BATCH_SIZE, config.limit) + }); + } + + // Initialize result tracking + const results: Record[][] = []; + const processedItems = new Set(); + const visitedUrls = new Set(); + + switch (config.pagination.type) { + case 'scrollDown': + case 'scrollUp': { + // Handle parallel scroll-based pagination + for (const batch of batches) { + this.concurrency.addJob(async () => { + const newPage = await page.context().newPage(); + await newPage.goto(page.url()); + + const batchConfig = { + ...config, + limit: batch.end - batch.start + }; + + // Initialize scroll position based on pagination type + if (config.pagination.type === 'scrollDown') { + await newPage.evaluate((start) => window.scrollTo(0, start * 100), batch.start); + } else { + await newPage.evaluate( + (start) => window.scrollTo(0, document.body.scrollHeight - start * 100), + batch.start + ); + } + + let previousHeight = await newPage.evaluate(() => document.body.scrollHeight); + let batchResults: Record[] = []; + + while (batchResults.length < batchConfig.limit) { + // Perform scrolling based on pagination type + if (config.pagination.type === 'scrollDown') { + await newPage.evaluate(() => window.scrollTo(0, document.body.scrollHeight)); + } else { + await newPage.evaluate(() => window.scrollTo(0, 0)); + } + + await newPage.waitForTimeout(2000); + + const currentHeight = await newPage.evaluate(() => document.body.scrollHeight); + if (currentHeight === previousHeight) { + break; + } + previousHeight = currentHeight; + + const newResults = await newPage.evaluate((cfg) => window.scrapeList(cfg), batchConfig); + batchResults = newResults; + } + + // Store unique results for this batch + const uniqueBatchResults = batchResults.filter(item => { + const itemHash = JSON.stringify(item); + if (processedItems.has(itemHash)) return false; + processedItems.add(itemHash); + return true; + }); + + 
results[Math.floor(batch.start / BATCH_SIZE)] = uniqueBatchResults; + await newPage.close(); + }); + } + break; + } + + case 'clickNext': { + // Pre-discover navigation sequence + const navigationSequence: string[] = []; + const mainPage = await page.context().newPage(); + await mainPage.goto(page.url()); + navigationSequence.push(mainPage.url()); + + // Map out the navigation path first + let currentPage = 1; + while (currentPage * BATCH_SIZE < config.limit) { + const availableSelectors = config.pagination.selector.split(','); + let clicked = false; + + for (const selector of availableSelectors) { + try { + const nextButton = await mainPage.waitForSelector(selector, { + state: 'attached', + timeout: 5000 + }); + + if (nextButton) { + try { + await Promise.race([ + Promise.all([ + mainPage.waitForNavigation({ waitUntil: 'networkidle', timeout: 30000 }), + nextButton.click() + ]), + Promise.all([ + mainPage.waitForNavigation({ waitUntil: 'networkidle', timeout: 30000 }), + nextButton.dispatchEvent('click') + ]) + ]); + + const newUrl = mainPage.url(); + if (!visitedUrls.has(newUrl)) { + navigationSequence.push(newUrl); + visitedUrls.add(newUrl); + clicked = true; + break; + } + } catch (error) { + continue; + } + } + } catch (error) { + continue; + } + } + + if (!clicked) break; + currentPage++; + await mainPage.waitForTimeout(1000); + } + await mainPage.close(); + + // Process batches using discovered navigation sequence + for (const batch of batches) { + this.concurrency.addJob(async () => { + const newPage = await page.context().newPage(); + const targetPageIndex = Math.floor(batch.start / BATCH_SIZE); + + if (targetPageIndex < navigationSequence.length) { + await newPage.goto(navigationSequence[targetPageIndex]); + + const batchConfig = { + ...config, + limit: batch.end - batch.start + }; + + const batchResults = await newPage.evaluate((cfg) => window.scrapeList(cfg), batchConfig); + + const uniqueBatchResults = batchResults.filter(item => { + const itemHash = JSON.stringify(item); + if (processedItems.has(itemHash)) return false; + processedItems.add(itemHash); + return true; + }); + + results[targetPageIndex] = uniqueBatchResults; + } + + await newPage.close(); + }); + } + break; + } + + case 'clickLoadMore': { + for (const batch of batches) { + this.concurrency.addJob(async () => { + const newPage = await page.context().newPage(); + await newPage.goto(page.url()); + + // Click through to reach batch start position + for (let i = 0; i < Math.floor(batch.start / BATCH_SIZE); i++) { + const availableSelectors = config.pagination.selector.split(','); + let clicked = false; + + for (const selector of availableSelectors) { + try { + const loadMoreButton = await newPage.waitForSelector(selector, { timeout: 5000 }); + if (loadMoreButton) { + await Promise.race([ + loadMoreButton.click(), + loadMoreButton.dispatchEvent('click') + ]); + clicked = true; + break; + } + } catch (error) { + continue; + } + } + + if (!clicked) break; + await newPage.waitForTimeout(1000); + } + + const batchConfig = { + ...config, + limit: batch.end - batch.start + }; + + const batchResults = await newPage.evaluate((cfg) => window.scrapeList(cfg), batchConfig); + + const uniqueBatchResults = batchResults.filter(item => { + const itemHash = JSON.stringify(item); + if (processedItems.has(itemHash)) return false; + processedItems.add(itemHash); + return true; + }); + + results[Math.floor(batch.start / BATCH_SIZE)] = uniqueBatchResults; + await newPage.close(); + }); + } + break; + } + } + + // Wait for all parallel 
operations to complete + await this.concurrency.waitForCompletion(); + return results.flat(); + } + private getMatchingActionId(workflow: Workflow, pageState: PageState, usedActions: string[]) { for (let actionId = workflow.length - 1; actionId >= 0; actionId--) { const step = workflow[actionId]; From 0c655fc4e8b76a078ba84a6a4d056e91dda38ed0 Mon Sep 17 00:00:00 2001 From: Rohit Date: Wed, 15 Jan 2025 15:33:08 +0530 Subject: [PATCH 02/21] feat: add support to handle parallel pagination --- maxun-core/src/interpret.ts | 555 +++++++++++++++++++++--------------- 1 file changed, 322 insertions(+), 233 deletions(-) diff --git a/maxun-core/src/interpret.ts b/maxun-core/src/interpret.ts index 13a5a991b..6e17bed57 100644 --- a/maxun-core/src/interpret.ts +++ b/maxun-core/src/interpret.ts @@ -48,6 +48,30 @@ interface InterpreterOptions { }> } +interface SharedState { + totalScraped: number; + visitedUrls: Set; + results: any[]; +} + +interface ScrapeListConfig { + listSelector: string; + fields: any; + limit?: number; + pagination: any; +} + +interface WorkerConfig extends ScrapeListConfig { + workerIndex: number; + startIndex: number; + endIndex: number; + batchSize: number; + pagination: { + type: 'scrollDown' | 'scrollUp' | 'clickNext' | 'clickLoadMore'; + selector: string; + }; +} + /** * Class for running the Smart Workflows. */ @@ -451,7 +475,7 @@ export default class Interpreter extends EventEmitter { const scrapeResults: Record[] = await page.evaluate((cfg) => window.scrapeList(cfg), config); await this.options.serializableCallback(scrapeResults); } else { - const scrapeResults: Record[] = await this.handlePagination(page, config); + const scrapeResults: Record[] = await this.handleParallelPagination(page, config); await this.options.serializableCallback(scrapeResults); } }, @@ -540,7 +564,303 @@ export default class Interpreter extends EventEmitter { } } - private async handleSequentialPagination(page: Page, config: { listSelector: string, fields: any, limit?: number, pagination: any }) { + private async handleParallelPagination(page: Page, config: any): Promise { + // Only use parallel processing for large datasets + if (!config.limit || config.limit < 10000) { + return this.handlePagination(page, config); + } + + console.time('parallel-scraping'); + const context = page.context(); + const numWorkers = 4; // Number of parallel processes + const batchSize = Math.ceil(config.limit / numWorkers); + + // Create shared state for coordination + const sharedState: SharedState = { + totalScraped: 0, + visitedUrls: new Set(), + results: [] + }; + + console.log(`Starting parallel scraping with ${numWorkers} workers`); + console.log(`Each worker will handle ${batchSize} items`); + + // Create worker processes + const workers = await Promise.all(Array.from({ length: numWorkers }, async (_, index) => { + // Each worker gets its own browser context + const workerContext = await context.browser().newContext(); + const workerPage = await workerContext.newPage(); + + await this.ensureScriptsLoaded(workerPage); + + // Calculate this worker's range + const startIndex = index * batchSize; + const endIndex = Math.min((index + 1) * batchSize, config.limit); + + console.log(`Worker ${index + 1} starting: Items ${startIndex} to ${endIndex}`); + + try { + // Navigate to the same URL + await workerPage.goto(page.url()); + await workerPage.waitForLoadState('networkidle'); + + // Create worker-specific config + const workerConfig: WorkerConfig = { + ...config, + workerIndex: index, + startIndex, + endIndex, + 
batchSize + }; + + // Perform scraping for this worker's portion + const workerResults = await this.scrapeWorkerBatch( + workerPage, + workerConfig, + sharedState + ); + + return workerResults; + } finally { + await workerPage.close(); + await workerContext.close(); + } + })); + + // Combine and process results from all workers + const allResults = workers.flat().sort((a, b) => { + // Sort by any unique identifier in your data + // This example assumes items have an index or id field + return (a.index || 0) - (b.index || 0); + }); + + console.timeEnd('parallel-scraping'); + console.log(`Total items scraped: ${allResults.length}`); + + return allResults.slice(0, config.limit); +} + +// Worker-specific scraping implementation +private async scrapeWorkerBatch( + page: Page, + config: WorkerConfig, + sharedState: SharedState +): Promise { + let results: any[] = []; + let previousHeight = 0; + let scrapedItems = new Set(); + let consecutiveEmptyPages = 0; + const maxEmptyPages = 3; // Stop after this many pages with no new items + + const scrapeConfig: ScrapeListConfig = { + listSelector: config.listSelector, + fields: config.fields, + pagination: config.pagination, + // We set a smaller limit for each page scrape to avoid overwhelming the browser + limit: config.endIndex - config.startIndex - results.length + }; + + while (results.length < (config.endIndex - config.startIndex)) { + // Scrape current page + const pageResults = await page.evaluate((cfg) => window.scrapeList(cfg), scrapeConfig); + + // Filter duplicates + const newResults = pageResults.filter(item => { + const uniqueKey = JSON.stringify(item); + if (scrapedItems.has(uniqueKey)) return false; + scrapedItems.add(uniqueKey); + return true; + }); + + if (newResults.length === 0) { + consecutiveEmptyPages++; + if (consecutiveEmptyPages >= maxEmptyPages) { + console.log(`Worker ${config.workerIndex + 1}: Stopped after ${maxEmptyPages} empty pages`); + break; + } + } else { + consecutiveEmptyPages = 0; + } + + results = results.concat(newResults); + + // Handle pagination based on type + const paginationSuccess = await this.handleWorkerPagination( + page, + config, + previousHeight, + sharedState.visitedUrls + ); + + if (!paginationSuccess) { + console.log(`Worker ${config.workerIndex + 1}: Pagination ended`); + break; + } + + // Update progress + console.log(`Worker ${config.workerIndex + 1}: ${results.length}/${config.batchSize} items`); + + // Respect the batch limit + if (results.length >= config.batchSize) { + break; + } + + // Add small delay between pages to avoid overwhelming the server + await page.waitForTimeout(1000); + } + + return results; +} + +// Enhanced pagination handler for workers +private async handleWorkerPagination( + page: Page, + config: WorkerConfig, + previousHeight: number, + visitedUrls: Set +): Promise { + try { + switch (config.pagination.type) { + case 'scrollDown': + return await this.handleScrollDownPagination(page, previousHeight); + + case 'scrollUp': + return await this.handleScrollUpPagination(page, previousHeight); + + case 'clickNext': + return await this.handleClickNextPagination(page, config.pagination.selector, visitedUrls); + + case 'clickLoadMore': + return await this.handleLoadMorePagination(page, config.pagination.selector, previousHeight); + + default: + console.log(`Worker ${config.workerIndex + 1}: Unknown pagination type`); + return false; + } + } catch (error) { + console.error(`Pagination error in worker ${config.workerIndex + 1}:`, error); + return false; + } +} + +// Specific 
pagination type handlers +private async handleScrollDownPagination(page: Page, previousHeight: number): Promise { + await page.evaluate(() => window.scrollTo(0, document.body.scrollHeight)); + await page.waitForTimeout(2000); + + const currentHeight = await page.evaluate(() => document.body.scrollHeight); + return currentHeight !== previousHeight; +} + +private async handleScrollUpPagination(page: Page, previousHeight: number): Promise { + await page.evaluate(() => window.scrollTo(0, 0)); + await page.waitForTimeout(2000); + + const currentTopHeight = await page.evaluate(() => document.documentElement.scrollTop); + return currentTopHeight !== 0; +} + +private async handleClickNextPagination( + page: Page, + selectors: string, + visitedUrls: Set +): Promise { + const availableSelectors = selectors.split(','); + + for (const selector of availableSelectors) { + try { + const button = await page.waitForSelector(selector, { + state: 'attached', + timeout: 5000 + }); + + if (button) { + const currentUrl = page.url(); + if (visitedUrls.has(currentUrl)) { + continue; + } + + visitedUrls.add(currentUrl); + + try { + await Promise.race([ + Promise.all([ + page.waitForNavigation({ + waitUntil: 'networkidle', + timeout: 30000 + }), + button.click() + ]), + Promise.all([ + page.waitForNavigation({ + waitUntil: 'networkidle', + timeout: 30000 + }), + button.dispatchEvent('click') + ]) + ]); + + // Verify navigation succeeded + const newUrl = page.url(); + if (newUrl === currentUrl) { + console.log("Navigation failed - URL didn't change"); + continue; + } + + return true; + } catch (error) { + console.log(`Click navigation failed for selector ${selector}:`); + continue; + } + } + } catch (error) { + continue; + } + } + + return false; +} + +private async handleLoadMorePagination( + page: Page, + selectors: string, + previousHeight: number +): Promise { + const availableSelectors = selectors.split(','); + + for (const selector of availableSelectors) { + try { + const button = await page.waitForSelector(selector, { + state: 'attached', + timeout: 5000 + }); + + if (button) { + try { + await Promise.race([ + button.click(), + button.dispatchEvent('click') + ]); + + await page.waitForTimeout(2000); + await page.evaluate(() => window.scrollTo(0, document.body.scrollHeight)); + + const currentHeight = await page.evaluate(() => document.body.scrollHeight); + return currentHeight !== previousHeight; + } catch (error) { + console.log(`Load more action failed for selector ${selector}:`, error); + continue; + } + } + } catch (error) { + continue; + } + } + + return false; + } + + private async handlePagination(page: Page, config: { listSelector: string, fields: any, limit?: number, pagination: any }) { let allResults: Record[] = []; let previousHeight = 0; // track unique items per page to avoid re-scraping @@ -767,237 +1087,6 @@ export default class Interpreter extends EventEmitter { return allResults; } - private async handlePagination(page: Page, config: { listSelector: string, fields: any, limit?: number, pagination: any }) { - // Constants for parallel processing - const PARALLEL_THRESHOLD = 1000; - const BATCH_SIZE = 100; - - // If limit is under threshold or no limit specified, use existing sequential logic - if (!config.limit || config.limit <= PARALLEL_THRESHOLD) { - return await this.handleSequentialPagination(page, config); - } - - // For larger datasets, use parallel processing - const batches = []; - for (let i = 0; i < config.limit; i += BATCH_SIZE) { - batches.push({ - start: i, - end: Math.min(i + 
BATCH_SIZE, config.limit) - }); - } - - // Initialize result tracking - const results: Record[][] = []; - const processedItems = new Set(); - const visitedUrls = new Set(); - - switch (config.pagination.type) { - case 'scrollDown': - case 'scrollUp': { - // Handle parallel scroll-based pagination - for (const batch of batches) { - this.concurrency.addJob(async () => { - const newPage = await page.context().newPage(); - await newPage.goto(page.url()); - - const batchConfig = { - ...config, - limit: batch.end - batch.start - }; - - // Initialize scroll position based on pagination type - if (config.pagination.type === 'scrollDown') { - await newPage.evaluate((start) => window.scrollTo(0, start * 100), batch.start); - } else { - await newPage.evaluate( - (start) => window.scrollTo(0, document.body.scrollHeight - start * 100), - batch.start - ); - } - - let previousHeight = await newPage.evaluate(() => document.body.scrollHeight); - let batchResults: Record[] = []; - - while (batchResults.length < batchConfig.limit) { - // Perform scrolling based on pagination type - if (config.pagination.type === 'scrollDown') { - await newPage.evaluate(() => window.scrollTo(0, document.body.scrollHeight)); - } else { - await newPage.evaluate(() => window.scrollTo(0, 0)); - } - - await newPage.waitForTimeout(2000); - - const currentHeight = await newPage.evaluate(() => document.body.scrollHeight); - if (currentHeight === previousHeight) { - break; - } - previousHeight = currentHeight; - - const newResults = await newPage.evaluate((cfg) => window.scrapeList(cfg), batchConfig); - batchResults = newResults; - } - - // Store unique results for this batch - const uniqueBatchResults = batchResults.filter(item => { - const itemHash = JSON.stringify(item); - if (processedItems.has(itemHash)) return false; - processedItems.add(itemHash); - return true; - }); - - results[Math.floor(batch.start / BATCH_SIZE)] = uniqueBatchResults; - await newPage.close(); - }); - } - break; - } - - case 'clickNext': { - // Pre-discover navigation sequence - const navigationSequence: string[] = []; - const mainPage = await page.context().newPage(); - await mainPage.goto(page.url()); - navigationSequence.push(mainPage.url()); - - // Map out the navigation path first - let currentPage = 1; - while (currentPage * BATCH_SIZE < config.limit) { - const availableSelectors = config.pagination.selector.split(','); - let clicked = false; - - for (const selector of availableSelectors) { - try { - const nextButton = await mainPage.waitForSelector(selector, { - state: 'attached', - timeout: 5000 - }); - - if (nextButton) { - try { - await Promise.race([ - Promise.all([ - mainPage.waitForNavigation({ waitUntil: 'networkidle', timeout: 30000 }), - nextButton.click() - ]), - Promise.all([ - mainPage.waitForNavigation({ waitUntil: 'networkidle', timeout: 30000 }), - nextButton.dispatchEvent('click') - ]) - ]); - - const newUrl = mainPage.url(); - if (!visitedUrls.has(newUrl)) { - navigationSequence.push(newUrl); - visitedUrls.add(newUrl); - clicked = true; - break; - } - } catch (error) { - continue; - } - } - } catch (error) { - continue; - } - } - - if (!clicked) break; - currentPage++; - await mainPage.waitForTimeout(1000); - } - await mainPage.close(); - - // Process batches using discovered navigation sequence - for (const batch of batches) { - this.concurrency.addJob(async () => { - const newPage = await page.context().newPage(); - const targetPageIndex = Math.floor(batch.start / BATCH_SIZE); - - if (targetPageIndex < navigationSequence.length) { - 
await newPage.goto(navigationSequence[targetPageIndex]); - - const batchConfig = { - ...config, - limit: batch.end - batch.start - }; - - const batchResults = await newPage.evaluate((cfg) => window.scrapeList(cfg), batchConfig); - - const uniqueBatchResults = batchResults.filter(item => { - const itemHash = JSON.stringify(item); - if (processedItems.has(itemHash)) return false; - processedItems.add(itemHash); - return true; - }); - - results[targetPageIndex] = uniqueBatchResults; - } - - await newPage.close(); - }); - } - break; - } - - case 'clickLoadMore': { - for (const batch of batches) { - this.concurrency.addJob(async () => { - const newPage = await page.context().newPage(); - await newPage.goto(page.url()); - - // Click through to reach batch start position - for (let i = 0; i < Math.floor(batch.start / BATCH_SIZE); i++) { - const availableSelectors = config.pagination.selector.split(','); - let clicked = false; - - for (const selector of availableSelectors) { - try { - const loadMoreButton = await newPage.waitForSelector(selector, { timeout: 5000 }); - if (loadMoreButton) { - await Promise.race([ - loadMoreButton.click(), - loadMoreButton.dispatchEvent('click') - ]); - clicked = true; - break; - } - } catch (error) { - continue; - } - } - - if (!clicked) break; - await newPage.waitForTimeout(1000); - } - - const batchConfig = { - ...config, - limit: batch.end - batch.start - }; - - const batchResults = await newPage.evaluate((cfg) => window.scrapeList(cfg), batchConfig); - - const uniqueBatchResults = batchResults.filter(item => { - const itemHash = JSON.stringify(item); - if (processedItems.has(itemHash)) return false; - processedItems.add(itemHash); - return true; - }); - - results[Math.floor(batch.start / BATCH_SIZE)] = uniqueBatchResults; - await newPage.close(); - }); - } - break; - } - } - - // Wait for all parallel operations to complete - await this.concurrency.waitForCompletion(); - return results.flat(); - } - private getMatchingActionId(workflow: Workflow, pageState: PageState, usedActions: string[]) { for (let actionId = workflow.length - 1; actionId >= 0; actionId--) { const step = workflow[actionId]; From 641e18241ffd8df8a9950125942219e1ed80bd33 Mon Sep 17 00:00:00 2001 From: Rohit Date: Wed, 15 Jan 2025 19:43:41 +0530 Subject: [PATCH 03/21] feat: add parallel scraping logic for click next pagination --- maxun-core/src/interpret.ts | 392 +++++++++++++++++------------------- 1 file changed, 188 insertions(+), 204 deletions(-) diff --git a/maxun-core/src/interpret.ts b/maxun-core/src/interpret.ts index 6e17bed57..11a307f6c 100644 --- a/maxun-core/src/interpret.ts +++ b/maxun-core/src/interpret.ts @@ -70,6 +70,7 @@ interface WorkerConfig extends ScrapeListConfig { type: 'scrollDown' | 'scrollUp' | 'clickNext' | 'clickLoadMore'; selector: string; }; + pageUrls: string[]; } /** @@ -574,6 +575,125 @@ export default class Interpreter extends EventEmitter { const context = page.context(); const numWorkers = 4; // Number of parallel processes const batchSize = Math.ceil(config.limit / numWorkers); + + // First, let's analyze the first page to understand our pagination needs + console.log('Analyzing first page...'); + const firstPageResults = await page.evaluate((cfg) => window.scrapeList(cfg), { + listSelector: config.listSelector, + fields: config.fields, + pagination: config.pagination + }); + + const itemsPerPage = firstPageResults.length; + const estimatedPages = Math.ceil(config.limit / itemsPerPage); + console.log(`Items per page: ${itemsPerPage}`); + 
console.log(`Estimated pages needed: ${estimatedPages}`); + + const pageUrls: string[] = []; + + try { + let availableSelectors = config.pagination.selector.split(','); + let visitedUrls: string[] = []; + + while (true) { + pageUrls.push(page.url()) + + if (pageUrls.length >= estimatedPages) { + console.log('Reached estimated number of pages. Stopping pagination.'); + break; + } + + let checkButton = null; + let workingSelector = null; + + for (let i = 0; i < availableSelectors.length; i++) { + const selector = availableSelectors[i]; + try { + // Wait for selector with a short timeout + checkButton = await page.waitForSelector(selector, { state: 'attached' }); + if (checkButton) { + workingSelector = selector; + break; + } + } catch (error) { + console.log(`Selector failed: ${selector}`); + } + } + + if(!workingSelector) { + break; + } + + const nextButton = await page.$(workingSelector); + if (!nextButton) { + break; + } + + const selectorIndex = availableSelectors.indexOf(workingSelector!); + availableSelectors = availableSelectors.slice(selectorIndex); + + const previousUrl = page.url(); + visitedUrls.push(previousUrl); + + try { + // Try both click methods simultaneously + await Promise.race([ + Promise.all([ + page.waitForNavigation({ waitUntil: 'networkidle', timeout: 30000 }), + nextButton.click() + ]), + Promise.all([ + page.waitForNavigation({ waitUntil: 'networkidle', timeout: 30000 }), + nextButton.dispatchEvent('click') + ]) + ]); + } catch (error) { + // Verify if navigation actually succeeded + const currentUrl = page.url(); + if (currentUrl === previousUrl) { + console.log("Previous URL same as current URL. Navigation failed."); + } + } + + const currentUrl = page.url(); + if (visitedUrls.includes(currentUrl)) { + console.log(`Detected navigation to a previously visited URL: ${currentUrl}`); + + // Extract the current page number from the URL + const match = currentUrl.match(/\d+/); + if (match) { + const currentNumber = match[0]; + // Use visitedUrls.length + 1 as the next page number + const nextNumber = visitedUrls.length + 1; + + // Create new URL by replacing the current number with the next number + const nextUrl = currentUrl.replace(currentNumber, nextNumber.toString()); + + console.log(`Navigating to constructed URL: ${nextUrl}`); + + // Navigate to the next page + await Promise.all([ + page.waitForNavigation({ waitUntil: 'networkidle' }), + page.goto(nextUrl) + ]); + } + } + + await page.waitForTimeout(1000); + } + } catch (error) { + console.error('Error collecting page URLs:', error); + } + + console.log(`Collected ${pageUrls.length} unique page URLs`); + + // Distribute pages among workers + const pagesPerWorker = Math.ceil(pageUrls.length / numWorkers); + const workerPageAssignments = Array.from({ length: numWorkers }, (_, i) => { + const start = i * pagesPerWorker; + const end = Math.min(start + pagesPerWorker, pageUrls.length); + return pageUrls.slice(start, end); + }); // Create shared state for coordination const sharedState: SharedState = { @@ -610,9 +730,12 @@ export default class Interpreter extends EventEmitter { workerIndex: index, startIndex, endIndex, - batchSize + batchSize, + pageUrls: workerPageAssignments[index] }; + console.log(`Worker ${index} URLs: ${workerPageAssignments[index]}`); + // Perform scraping for this worker's portion const workerResults = await this.scrapeWorkerBatch( workerPage, @@ -640,224 +763,85 @@ export default class Interpreter extends EventEmitter { return allResults.slice(0, config.limit); } -// Worker-specific scraping 
implementation private async scrapeWorkerBatch( - page: Page, - config: WorkerConfig, - sharedState: SharedState + page: Page, + config: WorkerConfig, + sharedState: SharedState ): Promise { - let results: any[] = []; - let previousHeight = 0; - let scrapedItems = new Set(); - let consecutiveEmptyPages = 0; - const maxEmptyPages = 3; // Stop after this many pages with no new items - - const scrapeConfig: ScrapeListConfig = { - listSelector: config.listSelector, - fields: config.fields, - pagination: config.pagination, - // We set a smaller limit for each page scrape to avoid overwhelming the browser - limit: config.endIndex - config.startIndex - results.length - }; + const results: any[] = []; + const scrapedItems = new Set(); + + // Now we can access the page URLs directly from the config + console.log(`Worker ${config.workerIndex + 1}: Processing ${config.pageUrls.length} assigned pages`); - while (results.length < (config.endIndex - config.startIndex)) { - // Scrape current page - const pageResults = await page.evaluate((cfg) => window.scrapeList(cfg), scrapeConfig); - - // Filter duplicates - const newResults = pageResults.filter(item => { - const uniqueKey = JSON.stringify(item); - if (scrapedItems.has(uniqueKey)) return false; - scrapedItems.add(uniqueKey); - return true; - }); + // Process each assigned URL + for (const [pageIndex, pageUrl] of config.pageUrls.entries()) { + try { + const navigationResult = await page.goto(pageUrl, { + waitUntil: 'networkidle', + timeout: 30000 + }).catch(error => { + console.error( + `Worker ${config.workerIndex + 1}: Navigation failed for ${pageUrl}:`, + error + ); + return null; + }); - if (newResults.length === 0) { - consecutiveEmptyPages++; - if (consecutiveEmptyPages >= maxEmptyPages) { - console.log(`Worker ${config.workerIndex + 1}: Stopped after ${maxEmptyPages} empty pages`); - break; - } - } else { - consecutiveEmptyPages = 0; - } + if (!navigationResult) { + continue; + } - results = results.concat(newResults); - - // Handle pagination based on type - const paginationSuccess = await this.handleWorkerPagination( - page, - config, - previousHeight, - sharedState.visitedUrls - ); - - if (!paginationSuccess) { - console.log(`Worker ${config.workerIndex + 1}: Pagination ended`); - break; - } + await page.waitForLoadState('networkidle').catch(() => {}); - // Update progress - console.log(`Worker ${config.workerIndex + 1}: ${results.length}/${config.batchSize} items`); - - // Respect the batch limit - if (results.length >= config.batchSize) { - break; - } + const scrapeConfig = { + listSelector: config.listSelector, + fields: config.fields, + pagination: config.pagination, + limit: config.endIndex - config.startIndex - results.length + }; - // Add small delay between pages to avoid overwhelming the server - await page.waitForTimeout(1000); - } + console.log(`Worker ${config.workerIndex + 1}, Config limit: ${scrapeConfig.limit}`); - return results; -} + const pageResults = await page.evaluate((cfg) => window.scrapeList(cfg), scrapeConfig); -// Enhanced pagination handler for workers -private async handleWorkerPagination( - page: Page, - config: WorkerConfig, - previousHeight: number, - visitedUrls: Set -): Promise { - try { - switch (config.pagination.type) { - case 'scrollDown': - return await this.handleScrollDownPagination(page, previousHeight); - - case 'scrollUp': - return await this.handleScrollUpPagination(page, previousHeight); - - case 'clickNext': - return await this.handleClickNextPagination(page, config.pagination.selector, 
visitedUrls); - - case 'clickLoadMore': - return await this.handleLoadMorePagination(page, config.pagination.selector, previousHeight); - - default: - console.log(`Worker ${config.workerIndex + 1}: Unknown pagination type`); - return false; - } - } catch (error) { - console.error(`Pagination error in worker ${config.workerIndex + 1}:`, error); - return false; - } -} + // Process and filter results + const newResults = pageResults + .filter(item => { + const uniqueKey = JSON.stringify(item); + if (scrapedItems.has(uniqueKey)) return false; + scrapedItems.add(uniqueKey); + return true; + }) -// Specific pagination type handlers -private async handleScrollDownPagination(page: Page, previousHeight: number): Promise { - await page.evaluate(() => window.scrollTo(0, document.body.scrollHeight)); - await page.waitForTimeout(2000); + results.push(...newResults); + sharedState.totalScraped += newResults.length; + sharedState.visitedUrls.add(pageUrl); - const currentHeight = await page.evaluate(() => document.body.scrollHeight); - return currentHeight !== previousHeight; -} + console.log( + `Worker ${config.workerIndex + 1}: ` + + `Completed page ${pageIndex + 1}/${config.pageUrls.length} - ` + + `Total items: ${results.length}/${config.batchSize}` + ); -private async handleScrollUpPagination(page: Page, previousHeight: number): Promise { - await page.evaluate(() => window.scrollTo(0, 0)); - await page.waitForTimeout(2000); + if (results.length >= config.batchSize) { + console.log(`Worker ${config.workerIndex + 1}: Reached batch limit of ${config.batchSize}`); + break; + } - const currentTopHeight = await page.evaluate(() => document.documentElement.scrollTop); - return currentTopHeight !== 0; -} + await page.waitForTimeout(1000); -private async handleClickNextPagination( - page: Page, - selectors: string, - visitedUrls: Set -): Promise { - const availableSelectors = selectors.split(','); - - for (const selector of availableSelectors) { - try { - const button = await page.waitForSelector(selector, { - state: 'attached', - timeout: 5000 - }); - - if (button) { - const currentUrl = page.url(); - if (visitedUrls.has(currentUrl)) { - continue; - } - - visitedUrls.add(currentUrl); - - try { - await Promise.race([ - Promise.all([ - page.waitForNavigation({ - waitUntil: 'networkidle', - timeout: 30000 - }), - button.click() - ]), - Promise.all([ - page.waitForNavigation({ - waitUntil: 'networkidle', - timeout: 30000 - }), - button.dispatchEvent('click') - ]) - ]); - - // Verify navigation succeeded - const newUrl = page.url(); - if (newUrl === currentUrl) { - console.log("Navigation failed - URL didn't change"); - continue; - } - - return true; - } catch (error) { - console.log(`Click navigation failed for selector ${selector}:`); - continue; - } - } - } catch (error) { - continue; - } - } - - return false; -} + } catch (error) { + console.error( + `Worker ${config.workerIndex + 1}: Error processing page ${pageUrl}:`, + error + ); + continue; + } + } -private async handleLoadMorePagination( - page: Page, - selectors: string, - previousHeight: number -): Promise { - const availableSelectors = selectors.split(','); - - for (const selector of availableSelectors) { - try { - const button = await page.waitForSelector(selector, { - state: 'attached', - timeout: 5000 - }); - - if (button) { - try { - await Promise.race([ - button.click(), - button.dispatchEvent('click') - ]); - - await page.waitForTimeout(2000); - await page.evaluate(() => window.scrollTo(0, document.body.scrollHeight)); - - const currentHeight = 
await page.evaluate(() => document.body.scrollHeight); - return currentHeight !== previousHeight; - } catch (error) { - console.log(`Load more action failed for selector ${selector}:`, error); - continue; - } - } - } catch (error) { - continue; - } - } - - return false; + console.log(`Worker ${config.workerIndex + 1}: Completed with ${results.length} items scraped`); + return results; } private async handlePagination(page: Page, config: { listSelector: string, fields: any, limit?: number, pagination: any }) { From 352447bfa4a7c898670f48de5cd621b42fd37a6c Mon Sep 17 00:00:00 2001 From: Rohit Date: Wed, 15 Jan 2025 22:48:12 +0530 Subject: [PATCH 04/21] feat: add parallel scraping support for scroll pagination types --- maxun-core/src/interpret.ts | 656 +++++++++++++++++++++++++++--------- 1 file changed, 501 insertions(+), 155 deletions(-) diff --git a/maxun-core/src/interpret.ts b/maxun-core/src/interpret.ts index 11a307f6c..420a1c9c9 100644 --- a/maxun-core/src/interpret.ts +++ b/maxun-core/src/interpret.ts @@ -61,6 +61,17 @@ interface ScrapeListConfig { pagination: any; } +interface ScrollRange { + start: number; + end: number; +} + +interface LoadMoreConfig { + totalLoads: number; + startLoad: number; + endLoad: number; +} + interface WorkerConfig extends ScrapeListConfig { workerIndex: number; startIndex: number; @@ -70,7 +81,10 @@ interface WorkerConfig extends ScrapeListConfig { type: 'scrollDown' | 'scrollUp' | 'clickNext' | 'clickLoadMore'; selector: string; }; - pageUrls: string[]; + + pageUrls?: string[]; + scrollRange?: ScrollRange; + loadMoreConfig?: LoadMoreConfig; } /** @@ -575,195 +589,526 @@ export default class Interpreter extends EventEmitter { const context = page.context(); const numWorkers = 4; // Number of parallel processes const batchSize = Math.ceil(config.limit / numWorkers); + const pageUrls: string[] = []; - // First, let's analyze the first page to understand our pagination needs - console.log('Analyzing first page...'); - const firstPageResults = await page.evaluate((cfg) => window.scrapeList(cfg), { - listSelector: config.listSelector, - fields: config.fields, - pagination: config.pagination - }); - - const itemsPerPage = firstPageResults.length; - const estimatedPages = Math.ceil(config.limit / itemsPerPage); - console.log(`Items per page: ${itemsPerPage}`); - console.log(`Estimated pages needed: ${estimatedPages}`); + let workers: any = null; + let availableSelectors = config.pagination.selector.split(','); + let visitedUrls: string[] = []; - const pageUrls: string[] = []; + const sharedState: SharedState = { + totalScraped: 0, + visitedUrls: new Set(), + results: [] + }; - try { - let availableSelectors = config.pagination.selector.split(','); - let visitedUrls: string[] = []; - - while (true) { - pageUrls.push(page.url()) + switch (config.pagination.type) { + case "scrollDown": + case "scrollUp": { + console.log('Calculating total scrollable height...'); + + // Initialize variables to track scroll progress + let previousHeight = 0; + let totalHeight = await page.evaluate(() => document.body.scrollHeight); + let noChangeCount = 0; + const maxNoChanges = 3; // Number of times we'll allow no height change before concluding + + // Keep scrolling until we stop seeing height changes + while (true) { + await page.waitForTimeout(2000); - if (pageUrls.length >= estimatedPages) { - console.log('Reached estimated number of pages. 
Stopping pagination.'); - break; + // Scroll based on direction + if (config.pagination.type === 'scrollDown') { + await page.evaluate(() => window.scrollTo(0, document.body.scrollHeight)); + } else { + // For scroll up, we start at bottom and scroll up in chunks + await page.evaluate((height) => { + window.scrollTo(0, Math.max(0, height - window.innerHeight)); // Using coordinate syntax + }, previousHeight); + } + + // Give time for content to load + await page.waitForTimeout(2000); + + // Check new height + const currentHeight = await page.evaluate(() => document.body.scrollHeight); + console.log(`Current scroll height: ${currentHeight}px`); + + if (currentHeight === previousHeight) { + noChangeCount++; + console.log(`No height change detected (attempt ${noChangeCount}/${maxNoChanges})`); + + if (noChangeCount >= maxNoChanges) { + console.log('Reached stable scroll height'); + totalHeight = currentHeight; + break; + } + } else { + // Reset counter if height changed + noChangeCount = 0; + console.log(`Height increased by ${currentHeight - previousHeight}px`); + } + + previousHeight = currentHeight; } + + console.log(`Final total scrollable height: ${totalHeight}px`); + + const overlap = 200; // Pixels of overlap between workers + const effectiveHeight = totalHeight + (overlap * (numWorkers - 1)); + const heightPerWorker = Math.floor(effectiveHeight / numWorkers); + + workers = await Promise.all(Array.from({ length: numWorkers }, async (_, index) => { + const workerContext = await context.browser().newContext(); + const workerPage = await workerContext.newPage(); + await this.ensureScriptsLoaded(workerPage); - let checkButton = null; - let workingSelector = null; + // Calculate scroll ranges for this worker + const startHeight = Math.max(0, index * heightPerWorker - (index > 0 ? overlap : 0)); + const endHeight = (index === numWorkers - 1) ? 
totalHeight : (index + 1) * heightPerWorker; - for (let i = 0; i < availableSelectors.length; i++) { - const selector = availableSelectors[i]; try { - // Wait for selector with a short timeout - checkButton = await page.waitForSelector(selector, { state: 'attached' }); - if (checkButton) { - workingSelector = selector; - break; - } - } catch (error) { - console.log(`Selector failed: ${selector}`); + await workerPage.goto(page.url()); + await workerPage.waitForLoadState('networkidle'); + + const workerConfig: WorkerConfig = { + ...config, + workerIndex: index, + startIndex: index * batchSize, + endIndex: Math.min((index + 1) * batchSize, config.limit), + batchSize, + scrollRange: { start: startHeight, end: endHeight } + }; + + return await this.scrapeScrollWorkerBatch( + workerPage, + workerConfig, + sharedState + ); + } finally { + await workerPage.close(); + await workerContext.close(); } - } + })); + break; + } - if(!workingSelector) { - break; - } + case "clickLoadMore": { + // For clickLoadMore, each worker handles a portion of the total clicks + const firstPageResults = await page.evaluate((cfg) => window.scrapeList(cfg), { + listSelector: config.listSelector, + fields: config.fields, + pagination: config.pagination + }); - const nextButton = await page.$(workingSelector); - if (!nextButton) { - break; - } + const itemsPerLoad = firstPageResults.length; + const estimatedLoads = Math.ceil(config.limit / itemsPerLoad); + const loadsPerWorker = Math.ceil(estimatedLoads / numWorkers); - const selectorIndex = availableSelectors.indexOf(workingSelector!); - availableSelectors = availableSelectors.slice(selectorIndex); + workers = await Promise.all(Array.from({ length: numWorkers }, async (_, index) => { + const workerContext = await context.browser().newContext(); + const workerPage = await workerContext.newPage(); + await this.ensureScriptsLoaded(workerPage); - const previousUrl = page.url(); - visitedUrls.push(previousUrl); + try { + await workerPage.goto(page.url()); + await workerPage.waitForLoadState('networkidle'); + + const workerConfig: WorkerConfig = { + ...config, + workerIndex: index, + startIndex: index * batchSize, + endIndex: Math.min((index + 1) * batchSize, config.limit), + batchSize, + loadMoreConfig: { + totalLoads: loadsPerWorker, + startLoad: index * loadsPerWorker, + endLoad: Math.min((index + 1) * loadsPerWorker, estimatedLoads) + } + }; - try { - // Try both click methods simultaneously - await Promise.race([ - Promise.all([ - page.waitForNavigation({ waitUntil: 'networkidle', timeout: 30000 }), - nextButton.click() - ]), - Promise.all([ - page.waitForNavigation({ waitUntil: 'networkidle', timeout: 30000 }), - nextButton.dispatchEvent('click') - ]) - ]); - } catch (error) { - // Verify if navigation actually succeeded - const currentUrl = page.url(); - if (currentUrl === previousUrl) { - console.log("Previous URL same as current URL. 
Navigation failed."); + return await this.scrapeLoadMoreWorkerBatch( + workerPage, + workerConfig, + sharedState + ); + } finally { + await workerPage.close(); + await workerContext.close(); } - } + })); + break; + } - const currentUrl = page.url(); - if (visitedUrls.includes(currentUrl)) { - console.log(`Detected navigation to a previously visited URL: ${currentUrl}`); + case "clickNext": + const firstPageResults = await page.evaluate((cfg) => window.scrapeList(cfg), { + listSelector: config.listSelector, + fields: config.fields, + pagination: config.pagination + }); + + const itemsPerPage = firstPageResults.length; + const estimatedPages = Math.ceil(config.limit / itemsPerPage); + console.log(`Items per page: ${itemsPerPage}`); + console.log(`Estimated pages needed: ${estimatedPages}`); + + try { + while (true) { + pageUrls.push(page.url()) + + if (pageUrls.length >= estimatedPages) { + console.log('Reached estimated number of pages. Stopping pagination.'); + break; + } + + let checkButton = null; + let workingSelector = null; + + for (let i = 0; i < availableSelectors.length; i++) { + const selector = availableSelectors[i]; + try { + // Wait for selector with a short timeout + checkButton = await page.waitForSelector(selector, { state: 'attached' }); + if (checkButton) { + workingSelector = selector; + break; + } + } catch (error) { + console.log(`Selector failed: ${selector}`); + } + } - // Extract the current page number from the URL - const match = currentUrl.match(/\d+/); - if (match) { - const currentNumber = match[0]; - // Use visitedUrls.length + 1 as the next page number - const nextNumber = visitedUrls.length + 1; - - // Create new URL by replacing the current number with the next number - const nextUrl = currentUrl.replace(currentNumber, nextNumber.toString()); - - console.log(`Navigating to constructed URL: ${nextUrl}`); - - // Navigate to the next page - await Promise.all([ - page.waitForNavigation({ waitUntil: 'networkidle' }), - page.goto(nextUrl) + if(!workingSelector) { + break; + } + + const nextButton = await page.$(workingSelector); + if (!nextButton) { + break; + } + + const selectorIndex = availableSelectors.indexOf(workingSelector!); + availableSelectors = availableSelectors.slice(selectorIndex); + + const previousUrl = page.url(); + visitedUrls.push(previousUrl); + + try { + // Try both click methods simultaneously + await Promise.race([ + Promise.all([ + page.waitForNavigation({ waitUntil: 'networkidle', timeout: 30000 }), + nextButton.click() + ]), + Promise.all([ + page.waitForNavigation({ waitUntil: 'networkidle', timeout: 30000 }), + nextButton.dispatchEvent('click') + ]) ]); + } catch (error) { + // Verify if navigation actually succeeded + const currentUrl = page.url(); + if (currentUrl === previousUrl) { + console.log("Previous URL same as current URL. 
Navigation failed."); + } } + + const currentUrl = page.url(); + if (visitedUrls.includes(currentUrl)) { + console.log(`Detected navigation to a previously visited URL: ${currentUrl}`); + + // Extract the current page number from the URL + const match = currentUrl.match(/\d+/); + if (match) { + const currentNumber = match[0]; + // Use visitedUrls.length + 1 as the next page number + const nextNumber = visitedUrls.length + 1; + + // Create new URL by replacing the current number with the next number + const nextUrl = currentUrl.replace(currentNumber, nextNumber.toString()); + + console.log(`Navigating to constructed URL: ${nextUrl}`); + + // Navigate to the next page + await Promise.all([ + page.waitForNavigation({ waitUntil: 'networkidle' }), + page.goto(nextUrl) + ]); + } + } + + await page.waitForTimeout(1000); } - - await page.waitForTimeout(1000); + } catch (error) { + console.error('Error collecting page URLs:', error); } - } catch (error) { - console.error('Error collecting page URLs:', error); + + console.log(`Collected ${pageUrls.length} unique page URLs`); + + // Distribute pages among workers + const pagesPerWorker = Math.ceil(pageUrls.length / numWorkers); + const workerPageAssignments = Array.from({ length: numWorkers }, (_, i) => { + const start = i * pagesPerWorker; + const end = Math.min(start + pagesPerWorker, pageUrls.length); + return pageUrls.slice(start, end); + }); + + // Create shared state for coordination + + console.log(`Starting parallel scraping with ${numWorkers} workers`); + console.log(`Each worker will handle ${batchSize} items`); + + // Create worker processes + workers = await Promise.all(Array.from({ length: numWorkers }, async (_, index) => { + // Each worker gets its own browser context + const workerContext = await context.browser().newContext(); + const workerPage = await workerContext.newPage(); + + await this.ensureScriptsLoaded(workerPage); + + // Calculate this worker's range + const startIndex = index * batchSize; + const endIndex = Math.min((index + 1) * batchSize, config.limit); + + console.log(`Worker ${index + 1} starting: Items ${startIndex} to ${endIndex}`); + + try { + // Navigate to the same URL + await workerPage.goto(page.url()); + await workerPage.waitForLoadState('networkidle'); + + // Create worker-specific config + const workerConfig: WorkerConfig = { + ...config, + workerIndex: index, + startIndex, + endIndex, + batchSize, + pageUrls: workerPageAssignments[index] + }; + + console.log(`Worker ${index} URLs: ${workerPageAssignments[index]}`); + + // Perform scraping for this worker's portion + const workerResults = await this.scrapeClickNextWorkerBatch( + workerPage, + workerConfig, + sharedState + ); + + return workerResults; + } finally { + await workerPage.close(); + await workerContext.close(); + } + })); + break; + + default: + return this.handlePagination(page, config); } - console.log(`Collected ${pageUrls.length} unique page URLs`); + // Combine and process results from all workers + const allResults = workers.flat().sort((a, b) => { + // Sort by any unique identifier in your data + // This example assumes items have an index or id field + return (a.index || 0) - (b.index || 0); + }); - // Distribute pages among workers - const pagesPerWorker = Math.ceil(pageUrls.length / numWorkers); - const workerPageAssignments = Array.from({ length: numWorkers }, (_, i) => { - const start = i * pagesPerWorker; - const end = Math.min(start + pagesPerWorker, pageUrls.length); - return pageUrls.slice(start, end); - }); - - // Create shared 
state for coordination - const sharedState: SharedState = { - totalScraped: 0, - visitedUrls: new Set(), - results: [] - }; + console.timeEnd('parallel-scraping'); + console.log(`Total items scraped: ${allResults.length}`); - console.log(`Starting parallel scraping with ${numWorkers} workers`); - console.log(`Each worker will handle ${batchSize} items`); + return allResults.slice(0, config.limit); +} - // Create worker processes - const workers = await Promise.all(Array.from({ length: numWorkers }, async (_, index) => { - // Each worker gets its own browser context - const workerContext = await context.browser().newContext(); - const workerPage = await workerContext.newPage(); +private async scrapeScrollWorkerBatch( + page: Page, + config: WorkerConfig, + sharedState: SharedState +): Promise { + const results: any[] = []; + const scrapedItems = new Set(); + + console.log(`Worker ${config.workerIndex + 1}: Starting to process scroll range ${config.scrollRange.start} to ${config.scrollRange.end}`); - await this.ensureScriptsLoaded(workerPage); - - // Calculate this worker's range - const startIndex = index * batchSize; - const endIndex = Math.min((index + 1) * batchSize, config.limit); - - console.log(`Worker ${index + 1} starting: Items ${startIndex} to ${endIndex}`); - - try { - // Navigate to the same URL - await workerPage.goto(page.url()); - await workerPage.waitForLoadState('networkidle'); - - // Create worker-specific config - const workerConfig: WorkerConfig = { - ...config, - workerIndex: index, - startIndex, - endIndex, - batchSize, - pageUrls: workerPageAssignments[index] - }; + // First, ensure the page is loaded and ready + await page.waitForLoadState('networkidle'); - console.log(`Worker ${index} URLs: ${workerPageAssignments[index]}`); + // Initialize scrolling parameters with smaller steps for better coverage + const scrollStep = Math.floor(await page.evaluate(() => window.innerHeight * 0.8)); // 80% of viewport height + let currentPosition = config.pagination.type === 'scrollDown' + ? config.scrollRange.start + : config.scrollRange.end; - // Perform scraping for this worker's portion - const workerResults = await this.scrapeWorkerBatch( - workerPage, - workerConfig, - sharedState - ); + // Wait for page to be ready + await page.waitForTimeout(2000); - return workerResults; - } finally { - await workerPage.close(); - await workerContext.close(); - } - })); + // First, scroll to the worker's starting position + await page.evaluate((position) => { + window.scrollTo(0, position); + }, currentPosition); - // Combine and process results from all workers - const allResults = workers.flat().sort((a, b) => { - // Sort by any unique identifier in your data - // This example assumes items have an index or id field - return (a.index || 0) - (b.index || 0); - }); + // Allow time for initial content to load + await page.waitForTimeout(2000); + + let previousHeight = await page.evaluate(() => document.body.scrollHeight); + let noChangeCount = 0; + const maxNoChanges = 3; + + while (true) { + // Calculate next scroll position based on direction + const nextPosition = config.pagination.type === 'scrollDown' + ? 
Math.min(currentPosition + scrollStep, config.scrollRange.end) + : Math.max(currentPosition - scrollStep, config.scrollRange.start); + + // Perform scroll + await page.evaluate((scrollTo) => { + window.scrollTo(0, scrollTo); + }, nextPosition); + + // Allow time for new content to load + await page.waitForTimeout(2000); + + // Check if the page height has changed + const currentHeight = await page.evaluate(() => document.body.scrollHeight); + + // Scrape the entire visible range, not just the viewport + const pageResults = await page.evaluate((cfg) => { + // Ensure we capture all items in the current scroll range + return window.scrapeList({...cfg}); + }, { + ...config, + limit: config.endIndex - config.startIndex - results.length + }); + + // Process new results + const newResults = pageResults.filter(item => { + const uniqueKey = JSON.stringify(item); + if (!scrapedItems.has(uniqueKey)) { + scrapedItems.add(uniqueKey); + return true; + } + return false; + }); + + if (newResults.length > 0) { + console.log(`Worker ${config.workerIndex + 1}: Found ${newResults.length} new items at position ${nextPosition}`); + } + + results.push(...newResults); + sharedState.totalScraped += newResults.length; + + // Check if we've reached content boundaries + if (currentHeight === previousHeight) { + noChangeCount++; + if (noChangeCount >= maxNoChanges) { + console.log(`Worker ${config.workerIndex + 1}: No new content after ${maxNoChanges} attempts`); + break; + } + } else { + noChangeCount = 0; + previousHeight = currentHeight; + } + + // Update position and check boundaries + currentPosition = nextPosition; + + const reachedBoundary = config.pagination.type === 'scrollDown' + ? currentPosition >= config.scrollRange.end + : currentPosition <= config.scrollRange.start; - console.timeEnd('parallel-scraping'); - console.log(`Total items scraped: ${allResults.length}`); + if (reachedBoundary) { + console.log(`Worker ${config.workerIndex + 1}: Reached assigned boundary`); + break; + } + + // Check if we've reached our batch limit + if (results.length >= config.batchSize || + sharedState.totalScraped >= config.limit) { + console.log(`Worker ${config.workerIndex + 1}: Reached item limit`); + break; + } + } + + console.log(`Worker ${config.workerIndex + 1}: Completed with ${results.length} items`); + return results; +} + +private async scrapeLoadMoreWorkerBatch( + page: Page, + config: WorkerConfig, + sharedState: SharedState +): Promise { + const results: any[] = []; + const scrapedItems = new Set(); + let loadCount = 0; + let availableSelectors = config.pagination.selector.split(','); + + console.log(`Worker ${config.workerIndex + 1}: Processing loads ${config.loadMoreConfig.startLoad} to ${config.loadMoreConfig.endLoad}`); + + while (loadCount < config.loadMoreConfig.totalLoads) { + let workingSelector = null; + for (const selector of availableSelectors) { + try { + const checkButton = await page.waitForSelector(selector, { + state: 'attached', + timeout: 5000 + }); + if (checkButton) { + workingSelector = selector; + break; + } + } catch (error) { + continue; + } + } + + if (!workingSelector) { + break; + } + + const loadMoreButton = await page.$(workingSelector); + if (!loadMoreButton) { + break; + } + + try { + await Promise.race([ + loadMoreButton.click(), + loadMoreButton.dispatchEvent('click') + ]); + } catch (error) { + console.log(`Worker ${config.workerIndex + 1}: Click failed, trying next selector`); + continue; + } + + await page.waitForTimeout(2000); + loadCount++; + + const pageResults = await 
page.evaluate((cfg) => window.scrapeList(cfg), { + ...config, + limit: config.endIndex - config.startIndex - results.length + }); + + const newResults = pageResults.filter(item => { + const uniqueKey = JSON.stringify(item); + if (scrapedItems.has(uniqueKey)) return false; + scrapedItems.add(uniqueKey); + return true; + }); + + results.push(...newResults); + sharedState.totalScraped += newResults.length; + + if (results.length >= config.batchSize) { + break; + } + } - return allResults.slice(0, config.limit); + console.log(`Worker ${config.workerIndex + 1}: Completed with ${results.length} items scraped`); + return results; } -private async scrapeWorkerBatch( +private async scrapeClickNextWorkerBatch( page: Page, config: WorkerConfig, sharedState: SharedState @@ -860,6 +1205,7 @@ private async scrapeWorkerBatch( await page.waitForTimeout(2000); const currentHeight = await page.evaluate(() => document.body.scrollHeight); + console.log(`Current scroll height: ${currentHeight}`); if (currentHeight === previousHeight) { const finalResults = await page.evaluate((cfg) => window.scrapeList(cfg), config); allResults = allResults.concat(finalResults); From 4ffbcba30d3c01fe30522bd6196043e19cdf3726 Mon Sep 17 00:00:00 2001 From: Rohit Date: Thu, 16 Jan 2025 00:16:29 +0530 Subject: [PATCH 05/21] feat: rm parallelism for paginations except click next --- maxun-core/src/interpret.ts | 532 +++++++----------------------------- 1 file changed, 99 insertions(+), 433 deletions(-) diff --git a/maxun-core/src/interpret.ts b/maxun-core/src/interpret.ts index 420a1c9c9..455e98cda 100644 --- a/maxun-core/src/interpret.ts +++ b/maxun-core/src/interpret.ts @@ -580,189 +580,46 @@ export default class Interpreter extends EventEmitter { } private async handleParallelPagination(page: Page, config: any): Promise { - // Only use parallel processing for large datasets - if (!config.limit || config.limit < 10000) { - return this.handlePagination(page, config); - } - - console.time('parallel-scraping'); - const context = page.context(); - const numWorkers = 4; // Number of parallel processes - const batchSize = Math.ceil(config.limit / numWorkers); - const pageUrls: string[] = []; - - let workers: any = null; - let availableSelectors = config.pagination.selector.split(','); - let visitedUrls: string[] = []; - - const sharedState: SharedState = { - totalScraped: 0, - visitedUrls: new Set(), - results: [] - }; - - switch (config.pagination.type) { - case "scrollDown": - case "scrollUp": { - console.log('Calculating total scrollable height...'); - - // Initialize variables to track scroll progress - let previousHeight = 0; - let totalHeight = await page.evaluate(() => document.body.scrollHeight); - let noChangeCount = 0; - const maxNoChanges = 3; // Number of times we'll allow no height change before concluding - - // Keep scrolling until we stop seeing height changes - while (true) { - await page.waitForTimeout(2000); - - // Scroll based on direction - if (config.pagination.type === 'scrollDown') { - await page.evaluate(() => window.scrollTo(0, document.body.scrollHeight)); - } else { - // For scroll up, we start at bottom and scroll up in chunks - await page.evaluate((height) => { - window.scrollTo(0, Math.max(0, height - window.innerHeight)); // Using coordinate syntax - }, previousHeight); - } - - // Give time for content to load - await page.waitForTimeout(2000); - - // Check new height - const currentHeight = await page.evaluate(() => document.body.scrollHeight); - console.log(`Current scroll height: ${currentHeight}px`); 
- - if (currentHeight === previousHeight) { - noChangeCount++; - console.log(`No height change detected (attempt ${noChangeCount}/${maxNoChanges})`); - - if (noChangeCount >= maxNoChanges) { - console.log('Reached stable scroll height'); - totalHeight = currentHeight; - break; - } - } else { - // Reset counter if height changed - noChangeCount = 0; - console.log(`Height increased by ${currentHeight - previousHeight}px`); - } - - previousHeight = currentHeight; - } - - console.log(`Final total scrollable height: ${totalHeight}px`); - - const overlap = 200; // Pixels of overlap between workers - const effectiveHeight = totalHeight + (overlap * (numWorkers - 1)); - const heightPerWorker = Math.floor(effectiveHeight / numWorkers); - - workers = await Promise.all(Array.from({ length: numWorkers }, async (_, index) => { - const workerContext = await context.browser().newContext(); - const workerPage = await workerContext.newPage(); - await this.ensureScriptsLoaded(workerPage); - - // Calculate scroll ranges for this worker - const startHeight = Math.max(0, index * heightPerWorker - (index > 0 ? overlap : 0)); - const endHeight = (index === numWorkers - 1) ? totalHeight : (index + 1) * heightPerWorker; - - try { - await workerPage.goto(page.url()); - await workerPage.waitForLoadState('networkidle'); - - const workerConfig: WorkerConfig = { - ...config, - workerIndex: index, - startIndex: index * batchSize, - endIndex: Math.min((index + 1) * batchSize, config.limit), - batchSize, - scrollRange: { start: startHeight, end: endHeight } - }; - - return await this.scrapeScrollWorkerBatch( - workerPage, - workerConfig, - sharedState - ); - } finally { - await workerPage.close(); - await workerContext.close(); - } - })); - break; - } - - case "clickLoadMore": { - // For clickLoadMore, each worker handles a portion of the total clicks - const firstPageResults = await page.evaluate((cfg) => window.scrapeList(cfg), { - listSelector: config.listSelector, - fields: config.fields, - pagination: config.pagination - }); - - const itemsPerLoad = firstPageResults.length; - const estimatedLoads = Math.ceil(config.limit / itemsPerLoad); - const loadsPerWorker = Math.ceil(estimatedLoads / numWorkers); - - workers = await Promise.all(Array.from({ length: numWorkers }, async (_, index) => { - const workerContext = await context.browser().newContext(); - const workerPage = await workerContext.newPage(); - await this.ensureScriptsLoaded(workerPage); - - try { - await workerPage.goto(page.url()); - await workerPage.waitForLoadState('networkidle'); - - const workerConfig: WorkerConfig = { - ...config, - workerIndex: index, - startIndex: index * batchSize, - endIndex: Math.min((index + 1) * batchSize, config.limit), - batchSize, - loadMoreConfig: { - totalLoads: loadsPerWorker, - startLoad: index * loadsPerWorker, - endLoad: Math.min((index + 1) * loadsPerWorker, estimatedLoads) - } - }; - - return await this.scrapeLoadMoreWorkerBatch( - workerPage, - workerConfig, - sharedState - ); - } finally { - await workerPage.close(); - await workerContext.close(); - } - })); - break; - } + if (config.limit > 10000 && config.pagination.type === 'clickNext') { + console.time('parallel-scraping'); + const context = page.context(); + const numWorkers = 4; // Number of parallel processes + const batchSize = Math.ceil(config.limit / numWorkers); + const pageUrls: string[] = []; + + let workers: any = null; + let availableSelectors = config.pagination.selector.split(','); + let visitedUrls: string[] = []; + + const sharedState: SharedState 
= { + totalScraped: 0, + visitedUrls: new Set(), + results: [] + }; - case "clickNext": const firstPageResults = await page.evaluate((cfg) => window.scrapeList(cfg), { listSelector: config.listSelector, fields: config.fields, pagination: config.pagination }); - + const itemsPerPage = firstPageResults.length; const estimatedPages = Math.ceil(config.limit / itemsPerPage); console.log(`Items per page: ${itemsPerPage}`); console.log(`Estimated pages needed: ${estimatedPages}`); - + try { while (true) { pageUrls.push(page.url()) - + if (pageUrls.length >= estimatedPages) { console.log('Reached estimated number of pages. Stopping pagination.'); break; } - + let checkButton = null; let workingSelector = null; - + for (let i = 0; i < availableSelectors.length; i++) { const selector = availableSelectors[i]; try { @@ -780,18 +637,18 @@ export default class Interpreter extends EventEmitter { if(!workingSelector) { break; } - + const nextButton = await page.$(workingSelector); if (!nextButton) { break; } - + const selectorIndex = availableSelectors.indexOf(workingSelector!); availableSelectors = availableSelectors.slice(selectorIndex); - + const previousUrl = page.url(); visitedUrls.push(previousUrl); - + try { // Try both click methods simultaneously await Promise.race([ @@ -811,7 +668,7 @@ export default class Interpreter extends EventEmitter { console.log("Previous URL same as current URL. Navigation failed."); } } - + const currentUrl = page.url(); if (visitedUrls.includes(currentUrl)) { console.log(`Detected navigation to a previously visited URL: ${currentUrl}`); @@ -841,9 +698,9 @@ export default class Interpreter extends EventEmitter { } catch (error) { console.error('Error collecting page URLs:', error); } - + console.log(`Collected ${pageUrls.length} unique page URLs`); - + // Distribute pages among workers const pagesPerWorker = Math.ceil(pageUrls.length / numWorkers); const workerPageAssignments = Array.from({ length: numWorkers }, (_, i) => { @@ -853,16 +710,16 @@ export default class Interpreter extends EventEmitter { }); // Create shared state for coordination - + console.log(`Starting parallel scraping with ${numWorkers} workers`); console.log(`Each worker will handle ${batchSize} items`); - + // Create worker processes workers = await Promise.all(Array.from({ length: numWorkers }, async (_, index) => { // Each worker gets its own browser context const workerContext = await context.browser().newContext(); const workerPage = await workerContext.newPage(); - + await this.ensureScriptsLoaded(workerPage); // Calculate this worker's range @@ -885,296 +742,106 @@ export default class Interpreter extends EventEmitter { batchSize, pageUrls: workerPageAssignments[index] }; - + console.log(`Worker ${index} URLs: ${workerPageAssignments[index]}`); - + // Perform scraping for this worker's portion const workerResults = await this.scrapeClickNextWorkerBatch( workerPage, workerConfig, sharedState ); - + return workerResults; } finally { await workerPage.close(); await workerContext.close(); } })); - break; - - default: - return this.handlePagination(page, config); - } - - // Combine and process results from all workers - const allResults = workers.flat().sort((a, b) => { - // Sort by any unique identifier in your data - // This example assumes items have an index or id field - return (a.index || 0) - (b.index || 0); - }); - - console.timeEnd('parallel-scraping'); - console.log(`Total items scraped: ${allResults.length}`); - - return allResults.slice(0, config.limit); -} - -private async 
scrapeScrollWorkerBatch( - page: Page, - config: WorkerConfig, - sharedState: SharedState -): Promise { - const results: any[] = []; - const scrapedItems = new Set(); - - console.log(`Worker ${config.workerIndex + 1}: Starting to process scroll range ${config.scrollRange.start} to ${config.scrollRange.end}`); - - // First, ensure the page is loaded and ready - await page.waitForLoadState('networkidle'); - - // Initialize scrolling parameters with smaller steps for better coverage - const scrollStep = Math.floor(await page.evaluate(() => window.innerHeight * 0.8)); // 80% of viewport height - let currentPosition = config.pagination.type === 'scrollDown' - ? config.scrollRange.start - : config.scrollRange.end; - - // Wait for page to be ready - await page.waitForTimeout(2000); - - // First, scroll to the worker's starting position - await page.evaluate((position) => { - window.scrollTo(0, position); - }, currentPosition); - - // Allow time for initial content to load - await page.waitForTimeout(2000); - - let previousHeight = await page.evaluate(() => document.body.scrollHeight); - let noChangeCount = 0; - const maxNoChanges = 3; - - while (true) { - // Calculate next scroll position based on direction - const nextPosition = config.pagination.type === 'scrollDown' - ? Math.min(currentPosition + scrollStep, config.scrollRange.end) - : Math.max(currentPosition - scrollStep, config.scrollRange.start); - - // Perform scroll - await page.evaluate((scrollTo) => { - window.scrollTo(0, scrollTo); - }, nextPosition); - - // Allow time for new content to load - await page.waitForTimeout(2000); - - // Check if the page height has changed - const currentHeight = await page.evaluate(() => document.body.scrollHeight); - - // Scrape the entire visible range, not just the viewport - const pageResults = await page.evaluate((cfg) => { - // Ensure we capture all items in the current scroll range - return window.scrapeList({...cfg}); - }, { - ...config, - limit: config.endIndex - config.startIndex - results.length - }); - - // Process new results - const newResults = pageResults.filter(item => { - const uniqueKey = JSON.stringify(item); - if (!scrapedItems.has(uniqueKey)) { - scrapedItems.add(uniqueKey); - return true; - } - return false; - }); - - if (newResults.length > 0) { - console.log(`Worker ${config.workerIndex + 1}: Found ${newResults.length} new items at position ${nextPosition}`); - } - - results.push(...newResults); - sharedState.totalScraped += newResults.length; - - // Check if we've reached content boundaries - if (currentHeight === previousHeight) { - noChangeCount++; - if (noChangeCount >= maxNoChanges) { - console.log(`Worker ${config.workerIndex + 1}: No new content after ${maxNoChanges} attempts`); - break; - } - } else { - noChangeCount = 0; - previousHeight = currentHeight; - } - - // Update position and check boundaries - currentPosition = nextPosition; - - const reachedBoundary = config.pagination.type === 'scrollDown' - ? 
currentPosition >= config.scrollRange.end - : currentPosition <= config.scrollRange.start; - - if (reachedBoundary) { - console.log(`Worker ${config.workerIndex + 1}: Reached assigned boundary`); - break; - } - - // Check if we've reached our batch limit - if (results.length >= config.batchSize || - sharedState.totalScraped >= config.limit) { - console.log(`Worker ${config.workerIndex + 1}: Reached item limit`); - break; - } - } - - console.log(`Worker ${config.workerIndex + 1}: Completed with ${results.length} items`); - return results; -} - -private async scrapeLoadMoreWorkerBatch( - page: Page, - config: WorkerConfig, - sharedState: SharedState -): Promise { - const results: any[] = []; - const scrapedItems = new Set(); - let loadCount = 0; - let availableSelectors = config.pagination.selector.split(','); - - console.log(`Worker ${config.workerIndex + 1}: Processing loads ${config.loadMoreConfig.startLoad} to ${config.loadMoreConfig.endLoad}`); - - while (loadCount < config.loadMoreConfig.totalLoads) { - let workingSelector = null; - for (const selector of availableSelectors) { - try { - const checkButton = await page.waitForSelector(selector, { - state: 'attached', - timeout: 5000 - }); - if (checkButton) { - workingSelector = selector; - break; - } - } catch (error) { - continue; - } - } - - if (!workingSelector) { - break; - } - - const loadMoreButton = await page.$(workingSelector); - if (!loadMoreButton) { - break; - } - - try { - await Promise.race([ - loadMoreButton.click(), - loadMoreButton.dispatchEvent('click') - ]); - } catch (error) { - console.log(`Worker ${config.workerIndex + 1}: Click failed, trying next selector`); - continue; - } - - await page.waitForTimeout(2000); - loadCount++; - const pageResults = await page.evaluate((cfg) => window.scrapeList(cfg), { - ...config, - limit: config.endIndex - config.startIndex - results.length + // Combine and process results from all workers + const allResults = workers.flat().sort((a, b) => { + // Sort by any unique identifier in your data + // This example assumes items have an index or id field + return (a.index || 0) - (b.index || 0); }); - const newResults = pageResults.filter(item => { - const uniqueKey = JSON.stringify(item); - if (scrapedItems.has(uniqueKey)) return false; - scrapedItems.add(uniqueKey); - return true; - }); + console.timeEnd('parallel-scraping'); + console.log(`Total items scraped: ${allResults.length}`); - results.push(...newResults); - sharedState.totalScraped += newResults.length; + return allResults.slice(0, config.limit); + } - if (results.length >= config.batchSize) { - break; - } + return this.handlePagination(page, config); } - console.log(`Worker ${config.workerIndex + 1}: Completed with ${results.length} items scraped`); - return results; -} - -private async scrapeClickNextWorkerBatch( - page: Page, - config: WorkerConfig, - sharedState: SharedState -): Promise { - const results: any[] = []; - const scrapedItems = new Set(); - - // Now we can access the page URLs directly from the config - console.log(`Worker ${config.workerIndex + 1}: Processing ${config.pageUrls.length} assigned pages`); + private async scrapeClickNextWorkerBatch( + page: Page, + config: WorkerConfig, + sharedState: SharedState + ): Promise { + const results: any[] = []; + const scrapedItems = new Set(); + + // Now we can access the page URLs directly from the config + console.log(`Worker ${config.workerIndex + 1}: Processing ${config.pageUrls.length} assigned pages`); - // Process each assigned URL - for (const [pageIndex, 
pageUrl] of config.pageUrls.entries()) { + // Process each assigned URL + for (const [pageIndex, pageUrl] of config.pageUrls.entries()) { try { - const navigationResult = await page.goto(pageUrl, { - waitUntil: 'networkidle', - timeout: 30000 - }).catch(error => { - console.error( - `Worker ${config.workerIndex + 1}: Navigation failed for ${pageUrl}:`, - error - ); - return null; - }); + const navigationResult = await page.goto(pageUrl, { + waitUntil: 'networkidle', + timeout: 30000 + }).catch(error => { + console.error( + `Worker ${config.workerIndex + 1}: Navigation failed for ${pageUrl}:`, + error + ); + return null; + }); - if (!navigationResult) { - continue; - } + if (!navigationResult) { + continue; + } - await page.waitForLoadState('networkidle').catch(() => {}); + await page.waitForLoadState('networkidle').catch(() => {}); - const scrapeConfig = { - listSelector: config.listSelector, - fields: config.fields, - pagination: config.pagination, - limit: config.endIndex - config.startIndex - results.length - }; + const scrapeConfig = { + listSelector: config.listSelector, + fields: config.fields, + pagination: config.pagination, + limit: config.endIndex - config.startIndex - results.length + }; - console.log(`Worker ${config.workerIndex + 1}, Config limit: ${scrapeConfig.limit}`); + console.log(`Worker ${config.workerIndex + 1}, Config limit: ${scrapeConfig.limit}`); - const pageResults = await page.evaluate((cfg) => window.scrapeList(cfg), scrapeConfig); + const pageResults = await page.evaluate((cfg) => window.scrapeList(cfg), scrapeConfig); - // Process and filter results - const newResults = pageResults - .filter(item => { - const uniqueKey = JSON.stringify(item); - if (scrapedItems.has(uniqueKey)) return false; - scrapedItems.add(uniqueKey); - return true; - }) + // Process and filter results + const newResults = pageResults + .filter(item => { + const uniqueKey = JSON.stringify(item); + if (scrapedItems.has(uniqueKey)) return false; + scrapedItems.add(uniqueKey); + return true; + }) - results.push(...newResults); - sharedState.totalScraped += newResults.length; - sharedState.visitedUrls.add(pageUrl); + results.push(...newResults); + sharedState.totalScraped += newResults.length; + sharedState.visitedUrls.add(pageUrl); - console.log( - `Worker ${config.workerIndex + 1}: ` + - `Completed page ${pageIndex + 1}/${config.pageUrls.length} - ` + - `Total items: ${results.length}/${config.batchSize}` - ); + console.log( + `Worker ${config.workerIndex + 1}: ` + + `Completed page ${pageIndex + 1}/${config.pageUrls.length} - ` + + `Total items: ${results.length}/${config.batchSize}` + ); - if (results.length >= config.batchSize) { - console.log(`Worker ${config.workerIndex + 1}: Reached batch limit of ${config.batchSize}`); - break; - } + if (results.length >= config.batchSize) { + console.log(`Worker ${config.workerIndex + 1}: Reached batch limit of ${config.batchSize}`); + break; + } - await page.waitForTimeout(1000); + await page.waitForTimeout(1000); } catch (error) { console.error( @@ -1184,7 +851,6 @@ private async scrapeClickNextWorkerBatch( continue; } } - console.log(`Worker ${config.workerIndex + 1}: Completed with ${results.length} items scraped`); return results; } From 4c0eb300ab8f178908f6df65612642554db0a192 Mon Sep 17 00:00:00 2001 From: Rohit Date: Thu, 16 Jan 2025 12:13:58 +0530 Subject: [PATCH 06/21] feat: estimate number of items per page --- maxun-core/src/interpret.ts | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git 
a/maxun-core/src/interpret.ts b/maxun-core/src/interpret.ts
index 455e98cda..4f1bc1305 100644
--- a/maxun-core/src/interpret.ts
+++ b/maxun-core/src/interpret.ts
@@ -597,14 +597,17 @@ export default class Interpreter extends EventEmitter {
         results: []
       };
 
-      const firstPageResults = await page.evaluate((cfg) => window.scrapeList(cfg), {
-        listSelector: config.listSelector,
-        fields: config.fields,
-        pagination: config.pagination
-      });
-
-      const itemsPerPage = firstPageResults.length;
-      const estimatedPages = Math.ceil(config.limit / itemsPerPage);
+      const { itemsPerPage, estimatedPages } = await page.evaluate(
+        ({ listSelector, limit }) => {
+          const items = document.querySelectorAll(listSelector).length;
+          return {
+            itemsPerPage: items,
+            estimatedPages: Math.ceil(limit / items)
+          };
+        },
+        { listSelector: config.listSelector, limit: config.limit }
+      );
+
       console.log(`Items per page: ${itemsPerPage}`);
       console.log(`Estimated pages needed: ${estimatedPages}`);
 

From bd25950d99a1aa138326f50cd7cdd0d3673e4dd5 Mon Sep 17 00:00:00 2001
From: Rohit
Date: Fri, 17 Jan 2025 13:57:36 +0530
Subject: [PATCH 07/21] chore: include all files from src dir

---
 maxun-core/tsconfig.json | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/maxun-core/tsconfig.json b/maxun-core/tsconfig.json
index 68c3ead79..728e4da92 100644
--- a/maxun-core/tsconfig.json
+++ b/maxun-core/tsconfig.json
@@ -7,5 +7,5 @@
     "module": "commonjs",
     "esModuleInterop": true
   },
-  "include": ["src"]
+  "include": ["src/**/*"],
 }

From 8360d3de98d0ce9455071bb2d34e1221ae04ced9 Mon Sep 17 00:00:00 2001
From: Rohit
Date: Fri, 17 Jan 2025 13:58:41 +0530
Subject: [PATCH 08/21] feat: add worker pool to support parallelism for click next

---
 maxun-core/src/interpret.ts | 238 ++++++++----------------------------
 1 file changed, 50 insertions(+), 188 deletions(-)

diff --git a/maxun-core/src/interpret.ts b/maxun-core/src/interpret.ts
index 4f1bc1305..b5bfc740a 100644
--- a/maxun-core/src/interpret.ts
+++ b/maxun-core/src/interpret.ts
@@ -16,6 +16,10 @@ import Concurrency from './utils/concurrency';
 import Preprocessor from './preprocessor';
 import log, { Level } from './utils/logger';
 
+import { WorkerConfig } from './types/worker';
+import { WorkerPool } from './utils/worker-pool';
+import os from 'os';
+
 /**
  * Extending the Window interface for custom scraping functions.
  */
@@ -39,6 +43,7 @@ declare global {
 interface InterpreterOptions {
   maxRepeats: number;
   maxConcurrency: number;
+  maxWorkers: number;
   serializableCallback: (output: any) => (void | Promise);
   binaryCallback: (output: any, mimeType: string) => (void | Promise);
   debug: boolean;
@@ -48,45 +53,6 @@ interface InterpreterOptions {
   }>
 }
 
-interface SharedState {
-  totalScraped: number;
-  visitedUrls: Set;
-  results: any[];
-}
-
-interface ScrapeListConfig {
-  listSelector: string;
-  fields: any;
-  limit?: number;
-  pagination: any;
-}
-
-interface ScrollRange {
-  start: number;
-  end: number;
-}
-
-interface LoadMoreConfig {
-  totalLoads: number;
-  startLoad: number;
-  endLoad: number;
-}
-
-interface WorkerConfig extends ScrapeListConfig {
-  workerIndex: number;
-  startIndex: number;
-  endIndex: number;
-  batchSize: number;
-  pagination: {
-    type: 'scrollDown' | 'scrollUp' | 'clickNext' | 'clickLoadMore';
-    selector: string;
-  };
-
-  pageUrls?: string[];
-  scrollRange?: ScrollRange;
-  loadMoreConfig?: LoadMoreConfig;
-}
-
 /**
  * Class for running the Smart Workflows.
*/ @@ -107,6 +73,8 @@ export default class Interpreter extends EventEmitter { private cumulativeResults: Record[] = []; + private workerPool: WorkerPool; + constructor(workflow: WorkflowFile, options?: Partial) { super(); this.workflow = workflow.workflow; @@ -114,6 +82,7 @@ export default class Interpreter extends EventEmitter { this.options = { maxRepeats: 5, maxConcurrency: 5, + maxWorkers: Math.max(1, Math.min(os.cpus().length - 1, 4)), serializableCallback: (data) => { log(JSON.stringify(data), Level.WARN); }, @@ -124,6 +93,7 @@ export default class Interpreter extends EventEmitter { }; this.concurrency = new Concurrency(this.options.maxConcurrency); this.log = (...args) => log(...args); + this.workerPool = new WorkerPool(this.options.maxWorkers); const error = Preprocessor.validateWorkflow(workflow); if (error) { @@ -582,21 +552,16 @@ export default class Interpreter extends EventEmitter { private async handleParallelPagination(page: Page, config: any): Promise { if (config.limit > 10000 && config.pagination.type === 'clickNext') { console.time('parallel-scraping'); - const context = page.context(); - const numWorkers = 4; // Number of parallel processes + + const numWorkers = Math.max(1, Math.min(os.cpus().length - 1, 4)); const batchSize = Math.ceil(config.limit / numWorkers); + const workerPool = new WorkerPool(numWorkers); const pageUrls: string[] = []; let workers: any = null; let availableSelectors = config.pagination.selector.split(','); let visitedUrls: string[] = []; - const sharedState: SharedState = { - totalScraped: 0, - visitedUrls: new Set(), - results: [] - }; - const { itemsPerPage, estimatedPages } = await page.evaluate( ({ listSelector, limit }) => { const items = document.querySelectorAll(listSelector).length; @@ -704,158 +669,55 @@ export default class Interpreter extends EventEmitter { console.log(`Collected ${pageUrls.length} unique page URLs`); - // Distribute pages among workers - const pagesPerWorker = Math.ceil(pageUrls.length / numWorkers); - const workerPageAssignments = Array.from({ length: numWorkers }, (_, i) => { - const start = i * pagesPerWorker; - const end = Math.min(start + pagesPerWorker, pageUrls.length); - return pageUrls.slice(start, end); + workerPool.on('progress', (progress) => { + console.log( + `Worker ${progress.workerId}: ` + + `${progress.percentage.toFixed(2)}% complete, ` + + `${progress.scrapedItems} items scraped, ` + + `ETA: ${Math.round(progress.estimatedTimeRemaining / 1000)}s` + ); }); - - // Create shared state for coordination - - console.log(`Starting parallel scraping with ${numWorkers} workers`); - console.log(`Each worker will handle ${batchSize} items`); - // Create worker processes - workers = await Promise.all(Array.from({ length: numWorkers }, async (_, index) => { - // Each worker gets its own browser context - const workerContext = await context.browser().newContext(); - const workerPage = await workerContext.newPage(); - - await this.ensureScriptsLoaded(workerPage); - - // Calculate this worker's range - const startIndex = index * batchSize; - const endIndex = Math.min((index + 1) * batchSize, config.limit); - - console.log(`Worker ${index + 1} starting: Items ${startIndex} to ${endIndex}`); - - try { - // Navigate to the same URL - await workerPage.goto(page.url()); - await workerPage.waitForLoadState('networkidle'); - - // Create worker-specific config - const workerConfig: WorkerConfig = { - ...config, - workerIndex: index, - startIndex, - endIndex, - batchSize, - pageUrls: workerPageAssignments[index] - }; - - 
console.log(`Worker ${index} URLs: ${workerPageAssignments[index]}`); - - // Perform scraping for this worker's portion - const workerResults = await this.scrapeClickNextWorkerBatch( - workerPage, - workerConfig, - sharedState - ); - - return workerResults; - } finally { - await workerPage.close(); - await workerContext.close(); - } - })); - - // Combine and process results from all workers - const allResults = workers.flat().sort((a, b) => { - // Sort by any unique identifier in your data - // This example assumes items have an index or id field - return (a.index || 0) - (b.index || 0); + workerPool.on('globalProgress', (metrics) => { + // Global progress is automatically logged by the worker pool }); - console.timeEnd('parallel-scraping'); - console.log(`Total items scraped: ${allResults.length}`); - - return allResults.slice(0, config.limit); - } - - return this.handlePagination(page, config); - } - - private async scrapeClickNextWorkerBatch( - page: Page, - config: WorkerConfig, - sharedState: SharedState - ): Promise { - const results: any[] = []; - const scrapedItems = new Set(); - - // Now we can access the page URLs directly from the config - console.log(`Worker ${config.workerIndex + 1}: Processing ${config.pageUrls.length} assigned pages`); - - // Process each assigned URL - for (const [pageIndex, pageUrl] of config.pageUrls.entries()) { try { - const navigationResult = await page.goto(pageUrl, { - waitUntil: 'networkidle', - timeout: 30000 - }).catch(error => { - console.error( - `Worker ${config.workerIndex + 1}: Navigation failed for ${pageUrl}:`, - error - ); - return null; - }); - - if (!navigationResult) { - continue; - } - - await page.waitForLoadState('networkidle').catch(() => {}); - - const scrapeConfig = { - listSelector: config.listSelector, - fields: config.fields, - pagination: config.pagination, - limit: config.endIndex - config.startIndex - results.length - }; - - console.log(`Worker ${config.workerIndex + 1}, Config limit: ${scrapeConfig.limit}`); - - const pageResults = await page.evaluate((cfg) => window.scrapeList(cfg), scrapeConfig); - - // Process and filter results - const newResults = pageResults - .filter(item => { - const uniqueKey = JSON.stringify(item); - if (scrapedItems.has(uniqueKey)) return false; - scrapedItems.add(uniqueKey); - return true; - }) - - results.push(...newResults); - sharedState.totalScraped += newResults.length; - sharedState.visitedUrls.add(pageUrl); - - console.log( - `Worker ${config.workerIndex + 1}: ` + - `Completed page ${pageIndex + 1}/${config.pageUrls.length} - ` + - `Total items: ${results.length}/${config.batchSize}` + // Distribute pages among workers + const pagesPerWorker = Math.ceil(pageUrls.length / numWorkers); + const workerConfigs: WorkerConfig[] = Array.from( + { length: numWorkers }, + (_, i) => ({ + workerIndex: i, + startIndex: i * batchSize, + endIndex: Math.min((i + 1) * batchSize, config.limit), + batchSize, + pageUrls: pageUrls.slice( + i * pagesPerWorker, + Math.min((i + 1) * pagesPerWorker, pageUrls.length) + ), + listSelector: config.listSelector, + fields: config.fields, + pagination: config.pagination + }) ); - if (results.length >= config.batchSize) { - console.log(`Worker ${config.workerIndex + 1}: Reached batch limit of ${config.batchSize}`); - break; - } + const results = await workerPool.runWorkers(workerConfigs); - await page.waitForTimeout(1000); + // Process and sort results + const sortedResults = results.sort((a, b) => { + return (a.index || 0) - (b.index || 0); + }); + 
console.timeEnd('parallel-scraping'); + return sortedResults.slice(0, config.limit); } catch (error) { - console.error( - `Worker ${config.workerIndex + 1}: Error processing page ${pageUrl}:`, - error - ); - continue; + console.error('Parallel scraping failed!'); + return this.handlePagination(page, config); } - } - console.log(`Worker ${config.workerIndex + 1}: Completed with ${results.length} items scraped`); - return results; + } + + return this.handlePagination(page, config); } private async handlePagination(page: Page, config: { listSelector: string, fields: any, limit?: number, pagination: any }) { From 38539d5855df54cdde1cf3dd104159edcfa4a5c0 Mon Sep 17 00:00:00 2001 From: Rohit Date: Fri, 17 Jan 2025 15:16:59 +0530 Subject: [PATCH 09/21] feat: add worker pool for parallelization --- maxun-core/src/utils/worker-pool.ts | 282 ++++++++++++++++++++++++++++ 1 file changed, 282 insertions(+) create mode 100644 maxun-core/src/utils/worker-pool.ts diff --git a/maxun-core/src/utils/worker-pool.ts b/maxun-core/src/utils/worker-pool.ts new file mode 100644 index 000000000..b38ce5efa --- /dev/null +++ b/maxun-core/src/utils/worker-pool.ts @@ -0,0 +1,282 @@ +import { Worker } from 'worker_threads'; +import path from 'path'; +import os from 'os'; +import { Browser } from 'playwright'; +import { EventEmitter } from 'events'; +import { + WorkerConfig, SharedState, WorkerProgressData, + PerformanceMetrics, GlobalMetrics +} from '../types/worker'; + +interface WorkerMetrics { + workerId: number; + currentUrl: string; + processedUrls: number; + totalUrls: number; + scrapedItems: number; + failures: number; + startTime: number; + performance: PerformanceMetrics; + status: 'running' | 'completed' | 'failed'; +} + +export class WorkerPool extends EventEmitter { + private workers: Worker[] = []; + private readonly maxWorkers: number; + private isShuttingDown: boolean = false; + private browser: Browser | null = null; + private performanceMetrics: Map = new Map(); + private globalStartTime: number; + private progressInterval: NodeJS.Timeout | null = null; + + constructor(maxWorkers: number = Math.max(1, Math.min(os.cpus().length - 1, 4))) { + super(); + this.maxWorkers = maxWorkers; + this.globalStartTime = Date.now(); + } + + private createWorker(config: WorkerConfig, sharedState: SharedState): Worker { + const worker = new Worker(path.join(__dirname, 'worker.js'), { + workerData: { config, sharedState } + }); + + this.initializeWorkerMetrics(config.workerIndex, config.pageUrls.length); + + worker.on('message', (message) => { + switch (message.type) { + case 'progress': + this.handleWorkerProgress(message.data); + break; + case 'error': + this.handleWorkerError(message.data); + break; + case 'complete': + this.handleWorkerComplete(config.workerIndex, message.data); + break; + } + }); + + return worker; + } + + private handleWorkerProgress(data: any): void { + this.updateWorkerMetrics(data.workerId, { + currentUrl: data.currentUrl, + processedUrls: data.processedUrls, + scrapedItems: data.scrapedItems + }); + } + + private handleWorkerError(data: any): void { + this.updateWorkerMetrics(data.workerId, { + failures: (this.performanceMetrics.get(data.workerId)?.failures || 0) + 1 + }); + console.error(`Worker ${data.workerId} error:`, data.error); + } + + private handleWorkerComplete(workerId: number, results: any[]): void { + this.updateWorkerMetrics(workerId, { + status: 'completed', + scrapedItems: results.length, + performance: { + ...this.performanceMetrics.get(workerId)?.performance!, + endTime: 
Date.now(), + duration: Date.now() - this.performanceMetrics.get(workerId)?.startTime!, + itemsScraped: results.length + } + }); + } + + private initializeWorkerMetrics(workerId: number, totalUrls: number): void { + this.performanceMetrics.set(workerId, { + workerId, + currentUrl: '', + processedUrls: 0, + totalUrls, + scrapedItems: 0, + failures: 0, + startTime: Date.now(), + performance: { + startTime: Date.now(), + endTime: 0, + duration: 0, + pagesProcessed: 0, + itemsScraped: 0, + failedPages: 0, + averageTimePerPage: 0, + memoryUsage: process.memoryUsage(), + cpuUsage: process.cpuUsage() + }, + status: 'running' + }); + } + + private updateWorkerMetrics(workerId: number, update: Partial): void { + const currentMetrics = this.performanceMetrics.get(workerId); + if (currentMetrics) { + this.performanceMetrics.set(workerId, { ...currentMetrics, ...update }); + this.emitProgressUpdate(workerId); + } + } + + private emitProgressUpdate(workerId: number): void { + const metrics = this.performanceMetrics.get(workerId); + if (metrics) { + const progress: WorkerProgressData = { + percentage: (metrics.processedUrls / metrics.totalUrls) * 100, + currentUrl: metrics.currentUrl, + scrapedItems: metrics.scrapedItems, + timeElapsed: Date.now() - metrics.startTime, + estimatedTimeRemaining: this.calculateEstimatedTimeRemaining(metrics), + failures: metrics.failures, + performance: metrics.performance + }; + + this.emit('progress', { workerId, ...progress }); + } + } + + private calculateEstimatedTimeRemaining(metrics: WorkerMetrics): number { + const timeElapsed = Date.now() - metrics.startTime; + const itemsPerMs = metrics.processedUrls / timeElapsed; + const remainingItems = metrics.totalUrls - metrics.processedUrls; + return remainingItems / itemsPerMs; + } + + private startProgressMonitoring(): void { + this.progressInterval = setInterval(() => { + this.reportGlobalProgress(); + }, 5000); + } + + private reportGlobalProgress(): void { + const globalMetrics: GlobalMetrics = { + totalPagesProcessed: 0, + totalItemsScraped: 0, + totalFailures: 0, + workersActive: 0, + averageSpeed: 0, + timeElapsed: Date.now() - this.globalStartTime, + memoryUsage: process.memoryUsage(), + cpuUsage: process.cpuUsage() + }; + + for (const metrics of this.performanceMetrics.values()) { + globalMetrics.totalPagesProcessed += metrics.processedUrls; + globalMetrics.totalItemsScraped += metrics.scrapedItems; + globalMetrics.totalFailures += metrics.failures; + if (metrics.status === 'running') globalMetrics.workersActive++; + } + + globalMetrics.averageSpeed = globalMetrics.timeElapsed > 0 + ? 
(globalMetrics.totalItemsScraped / (globalMetrics.timeElapsed / 1000)) + : 0; + + this.emit('globalProgress', globalMetrics); + this.logProgressReport(globalMetrics); + } + + private logProgressReport(metrics: GlobalMetrics): void { + console.log('\n=== Scraping Progress Report ==='); + console.log(`Active Workers: ${metrics.workersActive}/${this.maxWorkers}`); + console.log(`Total Pages Processed: ${metrics.totalPagesProcessed}`); + console.log(`Total Items Scraped: ${metrics.totalItemsScraped}`); + console.log(`Scraping Speed: ${metrics.averageSpeed.toFixed(2)} items/second`); + console.log(`Failed Pages: ${metrics.totalFailures}`); + console.log(`Memory Usage: ${(metrics.memoryUsage.heapUsed / 1024 / 1024).toFixed(2)} MB`); + console.log('=============================\n'); + } + + public async runWorkers(configs: WorkerConfig[]): Promise { + const results: any[] = []; + const errors: Error[] = []; + const sharedState: SharedState = { + totalScraped: 0, + results: [] + }; + + this.globalStartTime = Date.now(); + this.startProgressMonitoring(); + + try { + const workerPromises = configs.map(config => + new Promise(async (resolve, reject) => { + if (this.isShuttingDown) { + reject(new Error('Worker pool is shutting down')); + return; + } + + const worker = this.createWorker(config, sharedState); + this.workers.push(worker); + + let workerResults: any[] = []; + + worker.on('message', (message) => { + if (message.type === 'complete') { + workerResults = message.data; + } + }); + + worker.on('error', (error) => { + errors.push(error); + this.updateWorkerMetrics(config.workerIndex, { status: 'failed' }); + reject(error); + }); + + worker.on('exit', (code) => { + if (code === 0) { + resolve(workerResults); + } else { + reject(new Error(`Worker stopped with exit code ${code}`)); + } + }); + }).catch(error => { + console.error('Worker error:', error); + return []; + }) + ); + + const workerResults = await Promise.all(workerPromises); + + if (errors.length === configs.length) { + throw new Error(`All workers failed: ${errors.map(e => e.message).join(', ')}`); + } + + results.push(...workerResults.flat()); + + } finally { + this.reportGlobalProgress(); // Final report + await this.cleanup(); + } + + return results; + } + + public async cleanup(): Promise { + this.isShuttingDown = true; + + await Promise.all( + this.workers.map(worker => + new Promise((resolve) => { + worker.terminate().then(() => resolve()); + }) + ) + ); + + this.workers = []; + if (this.progressInterval) { + clearInterval(this.progressInterval); + this.progressInterval = null; + } + + this.isShuttingDown = false; + } + + public getActiveWorkerCount(): number { + return this.browser ? 
1 : 0;
+  }
+
+  public isActive(): boolean {
+    return this.browser !== null && !this.isShuttingDown;
+  }
+}
\ No newline at end of file

From e0b52c146939f57e54eae7ac6c5aae3015d15537 Mon Sep 17 00:00:00 2001
From: Rohit
Date: Fri, 17 Jan 2025 15:18:05 +0530
Subject: [PATCH 10/21] feat: add worker types

---
 maxun-core/src/types/worker.ts |  59 +++++++++++
 maxun-core/src/utils/worker.ts | 157 +++++++++++++++++++++++++++++
 2 files changed, 216 insertions(+)
 create mode 100644 maxun-core/src/types/worker.ts
 create mode 100644 maxun-core/src/utils/worker.ts

diff --git a/maxun-core/src/types/worker.ts b/maxun-core/src/types/worker.ts
new file mode 100644
index 000000000..3520a2b3c
--- /dev/null
+++ b/maxun-core/src/types/worker.ts
@@ -0,0 +1,59 @@
+export interface WorkerConfig {
+  workerIndex: number;
+  startIndex: number;
+  endIndex: number;
+  batchSize: number;
+  pageUrls: string[];
+  listSelector: string;
+  fields: any;
+  pagination: {
+    type: string;
+    selector: string;
+  };
+}
+
+export interface SharedState {
+  totalScraped: number;
+  results: any[];
+}
+
+export interface WorkerProgressData {
+  percentage: number;
+  currentUrl: string;
+  scrapedItems: number;
+  timeElapsed: number;
+  estimatedTimeRemaining: number;
+  failures: number;
+  performance: PerformanceMetrics;
+}
+
+export interface PerformanceMetrics {
+  startTime: number;
+  endTime: number;
+  duration: number;
+  pagesProcessed: number;
+  itemsScraped: number;
+  failedPages: number;
+  averageTimePerPage: number;
+  memoryUsage: {
+    heapUsed: number;
+    heapTotal: number;
+    external: number;
+    rss: number;
+  };
+  cpuUsage: {
+    user: number;
+    system: number;
+  };
+}
+
+export interface GlobalMetrics {
+  totalPagesProcessed: number;
+  totalItemsScraped: number;
+  totalFailures: number;
+  workersActive: number;
+  averageSpeed: number;
+  timeElapsed: number;
+  memoryUsage: NodeJS.MemoryUsage;
+  cpuUsage: NodeJS.CpuUsage;
+}
\ No newline at end of file
diff --git a/maxun-core/src/utils/worker.ts b/maxun-core/src/utils/worker.ts
new file mode 100644
index 000000000..94d8accc6
--- /dev/null
+++ b/maxun-core/src/utils/worker.ts
@@ -0,0 +1,157 @@
+import { parentPort, workerData } from 'worker_threads';
+import { chromium, Browser, Page } from 'playwright';
+import path from 'path';
+import type { WorkerConfig, SharedState } from '../types/worker';
+
+async function initializeBrowser(): Promise {
+  return await chromium.launch({
+    headless: true,
+    args: [
+      "--disable-blink-features=AutomationControlled",
+      "--disable-web-security",
+      "--disable-features=IsolateOrigins,site-per-process",
+      "--disable-site-isolation-trials",
+      "--disable-extensions",
+      "--no-sandbox",
+      "--disable-dev-shm-usage",
+    ]
+  });
+}
+
+async function ensureScriptsLoaded(page: Page) {
+  const isScriptLoaded = await page.evaluate(() =>
+    typeof window.scrape === 'function' &&
+    typeof window.scrapeSchema === 'function' &&
+    typeof window.scrapeList === 'function' &&
+    typeof window.scrapeListAuto === 'function' &&
+    typeof window.scrollDown === 'function' &&
+    typeof window.scrollUp === 'function'
+  );
+
+  if (!isScriptLoaded) {
+    await page.addInitScript({
+      path: path.join(__dirname, '..', 'browserSide', 'scraper.js')
+    });
+  }
+}
+
+async function scrapeBatch(config: WorkerConfig, sharedState: SharedState) {
+  const results: any[] = [];
+  const scrapedItems = new Set();
+  let browser: Browser | null = null;
+  let page: Page | null = null;
+
+  try {
+    browser = await initializeBrowser();
+    const context = await browser.newContext();
+    page = await
context.newPage(); + await ensureScriptsLoaded(page); + + for (const [pageIndex, pageUrl] of config.pageUrls.entries()) { + const pageStartTime = Date.now(); + + try { + // Report progress to main thread + parentPort?.postMessage({ + type: 'progress', + data: { + workerId: config.workerIndex, + currentUrl: pageUrl, + processedUrls: pageIndex, + totalUrls: config.pageUrls.length, + timeElapsed: Date.now() - pageStartTime, + scrapedItems: results.length + } + }); + + const navigationResult = await page.goto(pageUrl, { + waitUntil: 'networkidle', + timeout: 30000 + }); + + if (!navigationResult) continue; + + await page.waitForLoadState('networkidle').catch(() => {}); + + const scrapeConfig = { + listSelector: config.listSelector, + fields: config.fields, + pagination: config.pagination, + limit: config.endIndex - config.startIndex - results.length + }; + + const pageResults = await page.evaluate( + (cfg) => window.scrapeList(cfg), + scrapeConfig + ); + + // Filter out duplicates + const newResults = pageResults.filter(item => { + const uniqueKey = JSON.stringify(item); + + // Check against local duplicates + if (scrapedItems.has(uniqueKey)) return false; + + // Check against shared state results + const isDuplicate = sharedState.results.some( + existingItem => JSON.stringify(existingItem) === uniqueKey + ); + + if (isDuplicate) return false; + scrapedItems.add(uniqueKey); + sharedState.results.push(item); + sharedState.totalScraped++; + return true; + }); + + results.push(...newResults); + + if (results.length >= config.batchSize) break; + + await page.waitForTimeout(1000); + + } catch (error) { + parentPort?.postMessage({ + type: 'error', + data: { + workerId: config.workerIndex, + url: pageUrl, + error: error.message + } + }); + continue; + } + } + + return results; + + } catch (error) { + throw error; + } finally { + if (page) await page.close(); + if (browser) await browser.close(); + } +} + +// Handle worker initialization +if (parentPort) { + const config: WorkerConfig = workerData.config; + const sharedState: SharedState = workerData.sharedState; + + scrapeBatch(config, sharedState) + .then(results => { + parentPort?.postMessage({ + type: 'complete', + data: results + }); + }) + .catch(error => { + parentPort?.postMessage({ + type: 'error', + data: { + workerId: config.workerIndex, + error: error.message + } + }); + }); +} \ No newline at end of file From b2e83324448e9862da55f99f8f1e8a5ab6a32557 Mon Sep 17 00:00:00 2001 From: Rohit Date: Fri, 17 Jan 2025 16:23:42 +0530 Subject: [PATCH 11/21] feat: rm worker pool logic --- maxun-core/src/interpret.ts | 57 ++----------------------------------- 1 file changed, 2 insertions(+), 55 deletions(-) diff --git a/maxun-core/src/interpret.ts b/maxun-core/src/interpret.ts index b5bfc740a..b19c8fa43 100644 --- a/maxun-core/src/interpret.ts +++ b/maxun-core/src/interpret.ts @@ -16,9 +16,7 @@ import Concurrency from './utils/concurrency'; import Preprocessor from './preprocessor'; import log, { Level } from './utils/logger'; -import { WorkerConfig } from './types/worker'; -import { WorkerPool } from './utils/worker-pool'; -import os from 'os'; +import os from 'os'; /** * Extending the Window interface for custom scraping functions. 
@@ -43,7 +41,6 @@ declare global { interface InterpreterOptions { maxRepeats: number; maxConcurrency: number; - maxWorkers: number; serializableCallback: (output: any) => (void | Promise); binaryCallback: (output: any, mimeType: string) => (void | Promise); debug: boolean; @@ -73,8 +70,6 @@ export default class Interpreter extends EventEmitter { private cumulativeResults: Record[] = []; - private workerPool: WorkerPool; - constructor(workflow: WorkflowFile, options?: Partial) { super(); this.workflow = workflow.workflow; @@ -82,7 +77,6 @@ export default class Interpreter extends EventEmitter { this.options = { maxRepeats: 5, maxConcurrency: 5, - maxWorkers: Math.max(1, Math.min(os.cpus().length - 1, 4)), serializableCallback: (data) => { log(JSON.stringify(data), Level.WARN); }, @@ -93,7 +87,6 @@ export default class Interpreter extends EventEmitter { }; this.concurrency = new Concurrency(this.options.maxConcurrency); this.log = (...args) => log(...args); - this.workerPool = new WorkerPool(this.options.maxWorkers); const error = Preprocessor.validateWorkflow(workflow); if (error) { @@ -555,7 +548,6 @@ export default class Interpreter extends EventEmitter { const numWorkers = Math.max(1, Math.min(os.cpus().length - 1, 4)); const batchSize = Math.ceil(config.limit / numWorkers); - const workerPool = new WorkerPool(numWorkers); const pageUrls: string[] = []; let workers: any = null; @@ -669,52 +661,7 @@ export default class Interpreter extends EventEmitter { console.log(`Collected ${pageUrls.length} unique page URLs`); - workerPool.on('progress', (progress) => { - console.log( - `Worker ${progress.workerId}: ` + - `${progress.percentage.toFixed(2)}% complete, ` + - `${progress.scrapedItems} items scraped, ` + - `ETA: ${Math.round(progress.estimatedTimeRemaining / 1000)}s` - ); - }); - - workerPool.on('globalProgress', (metrics) => { - // Global progress is automatically logged by the worker pool - }); - - try { - // Distribute pages among workers - const pagesPerWorker = Math.ceil(pageUrls.length / numWorkers); - const workerConfigs: WorkerConfig[] = Array.from( - { length: numWorkers }, - (_, i) => ({ - workerIndex: i, - startIndex: i * batchSize, - endIndex: Math.min((i + 1) * batchSize, config.limit), - batchSize, - pageUrls: pageUrls.slice( - i * pagesPerWorker, - Math.min((i + 1) * pagesPerWorker, pageUrls.length) - ), - listSelector: config.listSelector, - fields: config.fields, - pagination: config.pagination - }) - ); - - const results = await workerPool.runWorkers(workerConfigs); - - // Process and sort results - const sortedResults = results.sort((a, b) => { - return (a.index || 0) - (b.index || 0); - }); - - console.timeEnd('parallel-scraping'); - return sortedResults.slice(0, config.limit); - } catch (error) { - console.error('Parallel scraping failed!'); - return this.handlePagination(page, config); - } + } return this.handlePagination(page, config); From 7d0339a25e8e9c61662738515460fa988fee3d15 Mon Sep 17 00:00:00 2001 From: Rohit Date: Fri, 17 Jan 2025 16:23:42 +0530 Subject: [PATCH 12/21] feat: rm worker pool logic --- maxun-core/src/interpret.ts | 57 +----- maxun-core/src/utils/worker-pool.ts | 282 ---------------------------- maxun-core/src/utils/worker.ts | 157 ---------------- 3 files changed, 2 insertions(+), 494 deletions(-) delete mode 100644 maxun-core/src/utils/worker-pool.ts delete mode 100644 maxun-core/src/utils/worker.ts diff --git a/maxun-core/src/interpret.ts b/maxun-core/src/interpret.ts index b5bfc740a..b19c8fa43 100644 --- a/maxun-core/src/interpret.ts +++ 
b/maxun-core/src/interpret.ts @@ -16,9 +16,7 @@ import Concurrency from './utils/concurrency'; import Preprocessor from './preprocessor'; import log, { Level } from './utils/logger'; -import { WorkerConfig } from './types/worker'; -import { WorkerPool } from './utils/worker-pool'; -import os from 'os'; +import os from 'os'; /** * Extending the Window interface for custom scraping functions. @@ -43,7 +41,6 @@ declare global { interface InterpreterOptions { maxRepeats: number; maxConcurrency: number; - maxWorkers: number; serializableCallback: (output: any) => (void | Promise); binaryCallback: (output: any, mimeType: string) => (void | Promise); debug: boolean; @@ -73,8 +70,6 @@ export default class Interpreter extends EventEmitter { private cumulativeResults: Record[] = []; - private workerPool: WorkerPool; - constructor(workflow: WorkflowFile, options?: Partial) { super(); this.workflow = workflow.workflow; @@ -82,7 +77,6 @@ export default class Interpreter extends EventEmitter { this.options = { maxRepeats: 5, maxConcurrency: 5, - maxWorkers: Math.max(1, Math.min(os.cpus().length - 1, 4)), serializableCallback: (data) => { log(JSON.stringify(data), Level.WARN); }, @@ -93,7 +87,6 @@ export default class Interpreter extends EventEmitter { }; this.concurrency = new Concurrency(this.options.maxConcurrency); this.log = (...args) => log(...args); - this.workerPool = new WorkerPool(this.options.maxWorkers); const error = Preprocessor.validateWorkflow(workflow); if (error) { @@ -555,7 +548,6 @@ export default class Interpreter extends EventEmitter { const numWorkers = Math.max(1, Math.min(os.cpus().length - 1, 4)); const batchSize = Math.ceil(config.limit / numWorkers); - const workerPool = new WorkerPool(numWorkers); const pageUrls: string[] = []; let workers: any = null; @@ -669,52 +661,7 @@ export default class Interpreter extends EventEmitter { console.log(`Collected ${pageUrls.length} unique page URLs`); - workerPool.on('progress', (progress) => { - console.log( - `Worker ${progress.workerId}: ` + - `${progress.percentage.toFixed(2)}% complete, ` + - `${progress.scrapedItems} items scraped, ` + - `ETA: ${Math.round(progress.estimatedTimeRemaining / 1000)}s` - ); - }); - - workerPool.on('globalProgress', (metrics) => { - // Global progress is automatically logged by the worker pool - }); - - try { - // Distribute pages among workers - const pagesPerWorker = Math.ceil(pageUrls.length / numWorkers); - const workerConfigs: WorkerConfig[] = Array.from( - { length: numWorkers }, - (_, i) => ({ - workerIndex: i, - startIndex: i * batchSize, - endIndex: Math.min((i + 1) * batchSize, config.limit), - batchSize, - pageUrls: pageUrls.slice( - i * pagesPerWorker, - Math.min((i + 1) * pagesPerWorker, pageUrls.length) - ), - listSelector: config.listSelector, - fields: config.fields, - pagination: config.pagination - }) - ); - - const results = await workerPool.runWorkers(workerConfigs); - - // Process and sort results - const sortedResults = results.sort((a, b) => { - return (a.index || 0) - (b.index || 0); - }); - - console.timeEnd('parallel-scraping'); - return sortedResults.slice(0, config.limit); - } catch (error) { - console.error('Parallel scraping failed!'); - return this.handlePagination(page, config); - } + } return this.handlePagination(page, config); diff --git a/maxun-core/src/utils/worker-pool.ts b/maxun-core/src/utils/worker-pool.ts deleted file mode 100644 index b38ce5efa..000000000 --- a/maxun-core/src/utils/worker-pool.ts +++ /dev/null @@ -1,282 +0,0 @@ -import { Worker } from 
'worker_threads'; -import path from 'path'; -import os from 'os'; -import { Browser } from 'playwright'; -import { EventEmitter } from 'events'; -import { - WorkerConfig, SharedState, WorkerProgressData, - PerformanceMetrics, GlobalMetrics -} from '../types/worker'; - -interface WorkerMetrics { - workerId: number; - currentUrl: string; - processedUrls: number; - totalUrls: number; - scrapedItems: number; - failures: number; - startTime: number; - performance: PerformanceMetrics; - status: 'running' | 'completed' | 'failed'; -} - -export class WorkerPool extends EventEmitter { - private workers: Worker[] = []; - private readonly maxWorkers: number; - private isShuttingDown: boolean = false; - private browser: Browser | null = null; - private performanceMetrics: Map = new Map(); - private globalStartTime: number; - private progressInterval: NodeJS.Timeout | null = null; - - constructor(maxWorkers: number = Math.max(1, Math.min(os.cpus().length - 1, 4))) { - super(); - this.maxWorkers = maxWorkers; - this.globalStartTime = Date.now(); - } - - private createWorker(config: WorkerConfig, sharedState: SharedState): Worker { - const worker = new Worker(path.join(__dirname, 'worker.js'), { - workerData: { config, sharedState } - }); - - this.initializeWorkerMetrics(config.workerIndex, config.pageUrls.length); - - worker.on('message', (message) => { - switch (message.type) { - case 'progress': - this.handleWorkerProgress(message.data); - break; - case 'error': - this.handleWorkerError(message.data); - break; - case 'complete': - this.handleWorkerComplete(config.workerIndex, message.data); - break; - } - }); - - return worker; - } - - private handleWorkerProgress(data: any): void { - this.updateWorkerMetrics(data.workerId, { - currentUrl: data.currentUrl, - processedUrls: data.processedUrls, - scrapedItems: data.scrapedItems - }); - } - - private handleWorkerError(data: any): void { - this.updateWorkerMetrics(data.workerId, { - failures: (this.performanceMetrics.get(data.workerId)?.failures || 0) + 1 - }); - console.error(`Worker ${data.workerId} error:`, data.error); - } - - private handleWorkerComplete(workerId: number, results: any[]): void { - this.updateWorkerMetrics(workerId, { - status: 'completed', - scrapedItems: results.length, - performance: { - ...this.performanceMetrics.get(workerId)?.performance!, - endTime: Date.now(), - duration: Date.now() - this.performanceMetrics.get(workerId)?.startTime!, - itemsScraped: results.length - } - }); - } - - private initializeWorkerMetrics(workerId: number, totalUrls: number): void { - this.performanceMetrics.set(workerId, { - workerId, - currentUrl: '', - processedUrls: 0, - totalUrls, - scrapedItems: 0, - failures: 0, - startTime: Date.now(), - performance: { - startTime: Date.now(), - endTime: 0, - duration: 0, - pagesProcessed: 0, - itemsScraped: 0, - failedPages: 0, - averageTimePerPage: 0, - memoryUsage: process.memoryUsage(), - cpuUsage: process.cpuUsage() - }, - status: 'running' - }); - } - - private updateWorkerMetrics(workerId: number, update: Partial): void { - const currentMetrics = this.performanceMetrics.get(workerId); - if (currentMetrics) { - this.performanceMetrics.set(workerId, { ...currentMetrics, ...update }); - this.emitProgressUpdate(workerId); - } - } - - private emitProgressUpdate(workerId: number): void { - const metrics = this.performanceMetrics.get(workerId); - if (metrics) { - const progress: WorkerProgressData = { - percentage: (metrics.processedUrls / metrics.totalUrls) * 100, - currentUrl: metrics.currentUrl, - 
scrapedItems: metrics.scrapedItems, - timeElapsed: Date.now() - metrics.startTime, - estimatedTimeRemaining: this.calculateEstimatedTimeRemaining(metrics), - failures: metrics.failures, - performance: metrics.performance - }; - - this.emit('progress', { workerId, ...progress }); - } - } - - private calculateEstimatedTimeRemaining(metrics: WorkerMetrics): number { - const timeElapsed = Date.now() - metrics.startTime; - const itemsPerMs = metrics.processedUrls / timeElapsed; - const remainingItems = metrics.totalUrls - metrics.processedUrls; - return remainingItems / itemsPerMs; - } - - private startProgressMonitoring(): void { - this.progressInterval = setInterval(() => { - this.reportGlobalProgress(); - }, 5000); - } - - private reportGlobalProgress(): void { - const globalMetrics: GlobalMetrics = { - totalPagesProcessed: 0, - totalItemsScraped: 0, - totalFailures: 0, - workersActive: 0, - averageSpeed: 0, - timeElapsed: Date.now() - this.globalStartTime, - memoryUsage: process.memoryUsage(), - cpuUsage: process.cpuUsage() - }; - - for (const metrics of this.performanceMetrics.values()) { - globalMetrics.totalPagesProcessed += metrics.processedUrls; - globalMetrics.totalItemsScraped += metrics.scrapedItems; - globalMetrics.totalFailures += metrics.failures; - if (metrics.status === 'running') globalMetrics.workersActive++; - } - - globalMetrics.averageSpeed = globalMetrics.timeElapsed > 0 - ? (globalMetrics.totalItemsScraped / (globalMetrics.timeElapsed / 1000)) - : 0; - - this.emit('globalProgress', globalMetrics); - this.logProgressReport(globalMetrics); - } - - private logProgressReport(metrics: GlobalMetrics): void { - console.log('\n=== Scraping Progress Report ==='); - console.log(`Active Workers: ${metrics.workersActive}/${this.maxWorkers}`); - console.log(`Total Pages Processed: ${metrics.totalPagesProcessed}`); - console.log(`Total Items Scraped: ${metrics.totalItemsScraped}`); - console.log(`Scraping Speed: ${metrics.averageSpeed.toFixed(2)} items/second`); - console.log(`Failed Pages: ${metrics.totalFailures}`); - console.log(`Memory Usage: ${(metrics.memoryUsage.heapUsed / 1024 / 1024).toFixed(2)} MB`); - console.log('=============================\n'); - } - - public async runWorkers(configs: WorkerConfig[]): Promise { - const results: any[] = []; - const errors: Error[] = []; - const sharedState: SharedState = { - totalScraped: 0, - results: [] - }; - - this.globalStartTime = Date.now(); - this.startProgressMonitoring(); - - try { - const workerPromises = configs.map(config => - new Promise(async (resolve, reject) => { - if (this.isShuttingDown) { - reject(new Error('Worker pool is shutting down')); - return; - } - - const worker = this.createWorker(config, sharedState); - this.workers.push(worker); - - let workerResults: any[] = []; - - worker.on('message', (message) => { - if (message.type === 'complete') { - workerResults = message.data; - } - }); - - worker.on('error', (error) => { - errors.push(error); - this.updateWorkerMetrics(config.workerIndex, { status: 'failed' }); - reject(error); - }); - - worker.on('exit', (code) => { - if (code === 0) { - resolve(workerResults); - } else { - reject(new Error(`Worker stopped with exit code ${code}`)); - } - }); - }).catch(error => { - console.error('Worker error:', error); - return []; - }) - ); - - const workerResults = await Promise.all(workerPromises); - - if (errors.length === configs.length) { - throw new Error(`All workers failed: ${errors.map(e => e.message).join(', ')}`); - } - - results.push(...workerResults.flat()); - - 
} finally { - this.reportGlobalProgress(); // Final report - await this.cleanup(); - } - - return results; - } - - public async cleanup(): Promise { - this.isShuttingDown = true; - - await Promise.all( - this.workers.map(worker => - new Promise((resolve) => { - worker.terminate().then(() => resolve()); - }) - ) - ); - - this.workers = []; - if (this.progressInterval) { - clearInterval(this.progressInterval); - this.progressInterval = null; - } - - this.isShuttingDown = false; - } - - public getActiveWorkerCount(): number { - return this.browser ? 1 : 0; - } - - public isActive(): boolean { - return this.browser !== null && !this.isShuttingDown; - } -} \ No newline at end of file diff --git a/maxun-core/src/utils/worker.ts b/maxun-core/src/utils/worker.ts deleted file mode 100644 index 94d8accc6..000000000 --- a/maxun-core/src/utils/worker.ts +++ /dev/null @@ -1,157 +0,0 @@ -import { parentPort, workerData } from 'worker_threads'; -import { chromium, Browser, Page } from 'playwright'; -import path from 'path'; -import type { WorkerConfig, SharedState } from '../types/worker'; - -async function initializeBrowser(): Promise { - return await chromium.launch({ - headless: true, - args: [ - "--disable-blink-features=AutomationControlled", - "--disable-web-security", - "--disable-features=IsolateOrigins,site-per-process", - "--disable-site-isolation-trials", - "--disable-extensions", - "--no-sandbox", - "--disable-dev-shm-usage", - ] - }); -} - -async function ensureScriptsLoaded(page: Page) { - const isScriptLoaded = await page.evaluate(() => - typeof window.scrape === 'function' && - typeof window.scrapeSchema === 'function' && - typeof window.scrapeList === 'function' && - typeof window.scrapeListAuto === 'function' && - typeof window.scrollDown === 'function' && - typeof window.scrollUp === 'function' - ); - - if (!isScriptLoaded) { - await page.addInitScript({ - path: path.join(__dirname, '..', 'browserSide', 'scraper.js') - }); - } -} - -async function scrapeBatch(config: WorkerConfig, sharedState: SharedState) { - const results: any[] = []; - const scrapedItems = new Set(); - let browser: Browser | null = null; - let page: Page | null = null; - - try { - browser = await initializeBrowser(); - const context = await browser.newContext(); - page = await context.newPage(); - await ensureScriptsLoaded(page); - - for (const [pageIndex, pageUrl] of config.pageUrls.entries()) { - const pageStartTime = Date.now(); - - try { - // Report progress to main thread - parentPort?.postMessage({ - type: 'progress', - data: { - workerId: config.workerIndex, - currentUrl: pageUrl, - processedUrls: pageIndex, - totalUrls: config.pageUrls.length, - timeElapsed: Date.now() - pageStartTime, - scrapedItems: results.length - } - }); - - const navigationResult = await page.goto(pageUrl, { - waitUntil: 'networkidle', - timeout: 30000 - }); - - if (!navigationResult) continue; - - await page.waitForLoadState('networkidle').catch(() => {}); - - const scrapeConfig = { - listSelector: config.listSelector, - fields: config.fields, - pagination: config.pagination, - limit: config.endIndex - config.startIndex - results.length - }; - - const pageResults = await page.evaluate( - (cfg) => window.scrapeList(cfg), - scrapeConfig - ); - - // Filter out duplicates - const newResults = pageResults.filter(item => { - const uniqueKey = JSON.stringify(item); - - // Check against local duplicates - if (scrapedItems.has(uniqueKey)) return false; - - // Check against shared state results - const isDuplicate = sharedState.results.some( - 
existingItem => JSON.stringify(existingItem) === uniqueKey - ); - - if (isDuplicate) return false; - scrapedItems.add(uniqueKey); - sharedState.results.push(item); - sharedState.totalScraped++; - return true; - }); - - results.push(...newResults); - - if (results.length >= config.batchSize) break; - - await page.waitForTimeout(1000); - - } catch (error) { - parentPort?.postMessage({ - type: 'error', - data: { - workerId: config.workerIndex, - url: pageUrl, - error: error.message - } - }); - continue; - } - } - - return results; - - } catch (error) { - throw error; - } finally { - if (page) await page.close(); - if (browser) await browser.close(); - } -} - -// Handle worker initialization -if (parentPort) { - const config: WorkerConfig = workerData.config; - const sharedState: SharedState = workerData.sharedState; - - scrapeBatch(config, sharedState) - .then(results => { - parentPort?.postMessage({ - type: 'complete', - data: results - }); - }) - .catch(error => { - parentPort?.postMessage({ - type: 'error', - data: { - workerId: config.workerIndex, - error: error.message - } - }); - }); -} \ No newline at end of file From 5c6f478618c515c54b24b4703762a85f3aa3568e Mon Sep 17 00:00:00 2001 From: Rohit Date: Sat, 18 Jan 2025 12:30:27 +0530 Subject: [PATCH 13/21] feat: add kafka config --- maxun-core/src/config/kafka.ts | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 maxun-core/src/config/kafka.ts diff --git a/maxun-core/src/config/kafka.ts b/maxun-core/src/config/kafka.ts new file mode 100644 index 000000000..10f3cc96a --- /dev/null +++ b/maxun-core/src/config/kafka.ts @@ -0,0 +1,10 @@ +export const kafkaConfig = { + clientId: 'maxun-scraper', + brokers: ['localhost:29092'], + topics: { + SCRAPING_TASKS: 'scraping-tasks', + SCRAPING_RESULTS: 'scraping-results', + SCRAPING_DLQ: 'scraping-dlq' + }, + consumerGroup: 'scraping-group' +}; \ No newline at end of file From 5becc84d5e81e9502b962b1328895889a341af59 Mon Sep 17 00:00:00 2001 From: Rohit Date: Sun, 19 Jan 2025 23:02:10 +0530 Subject: [PATCH 14/21] feat: add kafka manager to create topics --- maxun-core/src/utils/kafka-manager.ts | 66 +++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 maxun-core/src/utils/kafka-manager.ts diff --git a/maxun-core/src/utils/kafka-manager.ts b/maxun-core/src/utils/kafka-manager.ts new file mode 100644 index 000000000..7aa78f831 --- /dev/null +++ b/maxun-core/src/utils/kafka-manager.ts @@ -0,0 +1,66 @@ +import { Kafka, Consumer, Producer } from 'kafkajs'; +import { kafkaConfig } from '../config/kafka'; +import { EventEmitter } from 'events'; + +export class KafkaManager extends EventEmitter { + private kafka: Kafka; + private producer: Producer; + private consumer: Consumer; + private metricsInterval: NodeJS.Timeout | null = null; + + constructor() { + super(); + this.kafka = new Kafka({ + clientId: kafkaConfig.clientId, + brokers: kafkaConfig.brokers + }); + + this.producer = this.kafka.producer(); + this.consumer = this.kafka.consumer({ + groupId: kafkaConfig.consumerGroup, + sessionTimeout: 30000 + }); + } + + async initialize() { + await this.producer.connect(); + await this.consumer.connect(); + await this.createTopics(); + this.startMetricsReporting(); + } + + private async createTopics() { + const admin = this.kafka.admin(); + await admin.createTopics({ + topics: [ + { topic: kafkaConfig.topics.SCRAPING_TASKS, numPartitions: 10 }, + { topic: kafkaConfig.topics.SCRAPING_RESULTS, numPartitions: 10 }, + { topic: kafkaConfig.topics.SCRAPING_DLQ, 
numPartitions: 1 } + ] + }); + await admin.disconnect(); + } + + private startMetricsReporting() { + this.metricsInterval = setInterval(async () => { + const admin = this.kafka.admin(); + const metrics = await admin.fetchTopicMetadata({ + topics: [ + kafkaConfig.topics.SCRAPING_TASKS, + kafkaConfig.topics.SCRAPING_RESULTS + ] + }); + + this.emit('metrics', metrics); + await admin.disconnect(); + }, 5000); + } + + async cleanup() { + if (this.metricsInterval) { + clearInterval(this.metricsInterval); + } + await this.producer.disconnect(); + await this.consumer.disconnect(); + } +} \ No newline at end of file From 5e8a6d1bafc70eb16523afc5912dfb04ef627fc9 Mon Sep 17 00:00:00 2001 From: Rohit Date: Sun, 19 Jan 2025 23:05:35 +0530 Subject: [PATCH 15/21] feat: add scraper to scrape data and store in kafka --- maxun-core/src/utils/scraping-consumer.ts | 152 ++++++++++++++++++++++ 1 file changed, 152 insertions(+) create mode 100644 maxun-core/src/utils/scraping-consumer.ts diff --git a/maxun-core/src/utils/scraping-consumer.ts b/maxun-core/src/utils/scraping-consumer.ts new file mode 100644 index 000000000..cbb54b5e6 --- /dev/null +++ b/maxun-core/src/utils/scraping-consumer.ts @@ -0,0 +1,152 @@ +import { Kafka, Consumer, Producer } from 'kafkajs'; +import { chromium, Browser, Page } from 'playwright'; +import { kafkaConfig } from '../config/kafka'; +import path from 'path'; + +declare global { + interface Window { + scrape: (selector: string | null) => Record[]; + scrapeSchema: ( + schema: Record + ) => Record; + scrapeList: (config: { listSelector: string; fields: any; limit?: number; pagination: any }) => Record[]; + scrapeListAuto: (listSelector: string) => { selector: string; innerText: string }[]; + scrollDown: (pages?: number) => void; + scrollUp: (pages?: number) => void; + } +} + +export class ScrapingConsumer { + private kafka: Kafka; + private consumer: Consumer; + private producer: Producer; + + constructor() { + this.kafka = new Kafka({ + clientId: `${kafkaConfig.clientId}-consumer`, + brokers: kafkaConfig.brokers + }); + + this.consumer = this.kafka.consumer({ + groupId: kafkaConfig.consumerGroup + }); + this.producer = this.kafka.producer(); + } + + async start() { + await this.consumer.connect(); + await this.producer.connect(); + await this.consumer.subscribe({ + topic: kafkaConfig.topics.SCRAPING_TASKS, + fromBeginning: true + }); + + await this.consumer.run({ + partitionsConsumedConcurrently: 3, + eachMessage: async ({ topic, partition, message }) => { + try { + const task = JSON.parse(message.value!.toString()); + const results = await this.processTask(task); + + await this.producer.send({ + topic: kafkaConfig.topics.SCRAPING_RESULTS, + messages: [{ + key: task.taskId, + value: JSON.stringify({ + taskId: task.taskId, + data: results + }) + }] + }); + } catch (error) { + await this.handleError(message, error); + } + } + }); + } + + private async ensureScriptsLoaded(page: Page) { + const isScriptLoaded = await page.evaluate(() => typeof window.scrape === 'function' && typeof window.scrapeSchema === 'function' && typeof window.scrapeList === 'function' && typeof window.scrapeListAuto === 'function' && typeof window.scrollDown === 'function' && typeof window.scrollUp === 'function'); + if (!isScriptLoaded) { + await page.addInitScript({ path: path.join(__dirname, '..', 'browserSide', 'scraper.js') }); + } +} + + private async processTask(task: any) { + let browser: Browser | null = null; + const results: any[] = []; + + try { + browser = await chromium.launch({ + headless: true, + 
args: [ + "--disable-blink-features=AutomationControlled", + "--disable-web-security", + "--disable-features=IsolateOrigins,site-per-process", + "--disable-site-isolation-trials", + "--disable-extensions", + "--no-sandbox", + "--disable-dev-shm-usage", + ] + }); + + const context = await browser.newContext(); + const page = await context.newPage(); + + await this.ensureScriptsLoaded(page); + + for (const url of task.urls) { + try { + await page.goto(url, { + waitUntil: 'networkidle', + timeout: 30000 + }); + + const pageResults = await page.evaluate( + (cfg) => window.scrapeList(cfg), + task.config + ); + + results.push(...pageResults); + } catch (error) { + console.error(`Error processing URL ${url}:`, error); + } + } + + await page.close(); + } finally { + if (browser) await browser.close(); + } + + return results; + } + + private async handleError(message: any, error: Error) { + const retryCount = parseInt(message.headers['retry-count'] || '0'); + + if (retryCount < 3) { + await this.producer.send({ + topic: kafkaConfig.topics.SCRAPING_TASKS, + messages: [{ + key: message.key, + value: message.value, + headers: { + 'retry-count': (retryCount + 1).toString(), + 'error': error.message + } + }] + }); + } else { + await this.producer.send({ + topic: kafkaConfig.topics.SCRAPING_DLQ, + messages: [{ + key: message.key, + value: message.value, + headers: { + 'final-error': error.message + } + }] + }); + } + } +} \ No newline at end of file From 73c2cc3c0db5cdd2056808eed8c795b0ae180345 Mon Sep 17 00:00:00 2001 From: Rohit Date: Mon, 20 Jan 2025 13:01:35 +0530 Subject: [PATCH 16/21] feat: add kafka util to consume task data and produce messages --- maxun-core/src/utils/scraping-consumer.ts | 115 +++++++++++++++++++--- 1 file changed, 104 insertions(+), 11 deletions(-) diff --git a/maxun-core/src/utils/scraping-consumer.ts b/maxun-core/src/utils/scraping-consumer.ts index cbb54b5e6..2e6e79edf 100644 --- a/maxun-core/src/utils/scraping-consumer.ts +++ b/maxun-core/src/utils/scraping-consumer.ts @@ -20,6 +20,13 @@ export class ScrapingConsumer { private kafka: Kafka; private consumer: Consumer; private producer: Producer; + private processedWorkflows: Map>; + private workflowStats: Map; constructor() { this.kafka = new Kafka({ @@ -28,9 +35,14 @@ export class ScrapingConsumer { }); this.consumer = this.kafka.consumer({ - groupId: kafkaConfig.consumerGroup + groupId: kafkaConfig.consumerGroup, + sessionTimeout: 30000, + heartbeatInterval: 3000, + maxWaitTimeInMs: 1000, }); this.producer = this.kafka.producer(); + this.processedWorkflows = new Map(); + this.workflowStats = new Map(); } async start() { @@ -38,26 +50,83 @@ export class ScrapingConsumer { await this.producer.connect(); await this.consumer.subscribe({ topic: kafkaConfig.topics.SCRAPING_TASKS, - fromBeginning: true + fromBeginning: false }); await this.consumer.run({ - partitionsConsumedConcurrently: 3, + partitionsConsumedConcurrently: 4, + autoCommit: false, eachMessage: async ({ topic, partition, message }) => { try { const task = JSON.parse(message.value!.toString()); + const workflowId = task.workflowId; + + // Initialize workflow tracking if needed + if (!this.processedWorkflows.has(workflowId)) { + this.processedWorkflows.set(workflowId, new Set()); + this.workflowStats.set(workflowId, { + startTime: Date.now(), + totalTasks: parseInt(message.headers['total-tasks']?.toString() || '0'), + processedTasks: 0, + totalItems: 0 + }); + } + + // Check if this task was already processed within its workflow + if 
(this.processedWorkflows.get(workflowId)?.has(task.taskId)) { + console.log(`Task ${task.taskId} from workflow ${workflowId} already processed`); + await this.consumer.commitOffsets([{ + topic, + partition, + offset: (Number(message.offset) + 1).toString() + }]); + return; + } + const results = await this.processTask(task); + + const stats = this.workflowStats.get(workflowId); + if (stats) { + stats.processedTasks += 1; + stats.totalItems += results.length; + + console.log( + `Workflow ${workflowId} progress: ` + + `${stats.processedTasks}/${stats.totalTasks} tasks, ` + + `${stats.totalItems} items collected` + ); + } + // Send results with workflow context await this.producer.send({ topic: kafkaConfig.topics.SCRAPING_RESULTS, messages: [{ key: task.taskId, value: JSON.stringify({ taskId: task.taskId, + workflowId: task.workflowId, data: results - }) + }), + // Add workflow headers for better tracking + headers: { + 'workflow-id': task.workflowId, + 'items-count': results.length.toString() + } }] }); + + // Mark task as processed within its workflow + this.processedWorkflows.get(workflowId)?.add(task.taskId); + + // Clean up old workflows periodically + this.cleanupOldWorkflows(); + + await this.consumer.commitOffsets([{ + topic, + partition, + offset: (Number(message.offset) + 1).toString() + }]); + } catch (error) { await this.handleError(message, error); } @@ -74,7 +143,8 @@ export class ScrapingConsumer { private async processTask(task: any) { let browser: Browser | null = null; - const results: any[] = []; + let scrapedItems: Set = new Set(); + let allResults: Record[] = []; try { browser = await chromium.launch({ @@ -102,12 +172,20 @@ export class ScrapingConsumer { timeout: 30000 }); - const pageResults = await page.evaluate( - (cfg) => window.scrapeList(cfg), - task.config - ); + await page.waitForTimeout(1000); - results.push(...pageResults); + const pageResults = await page.evaluate((cfg) => window.scrapeList(cfg), task.config); + + // Filter out already scraped items + const newResults = pageResults.filter(item => { + const uniqueKey = JSON.stringify(item); + if (scrapedItems.has(uniqueKey)) return false; // Ignore if already scraped + scrapedItems.add(uniqueKey); // Mark as scraped + return true; + }); + + allResults = allResults.concat(newResults); + console.log(`Results so far (${task.taskId}): ${allResults.length}`); } catch (error) { console.error(`Error processing URL ${url}:`, error); } @@ -118,11 +196,24 @@ export class ScrapingConsumer { if (browser) await browser.close(); } - return results; + return allResults; + } + + private async cleanupOldWorkflows() { + const ONE_HOUR = 60 * 60 * 1000; + const now = Date.now(); + + for (const [workflowId] of this.processedWorkflows) { + const workflowTimestamp = parseInt(workflowId.split('-')[1]); + if (now - workflowTimestamp > ONE_HOUR) { + this.processedWorkflows.delete(workflowId); + } + } } private async handleError(message: any, error: Error) { const retryCount = parseInt(message.headers['retry-count'] || '0'); + const task = JSON.parse(message.value!.toString()); if (retryCount < 3) { await this.producer.send({ @@ -131,6 +222,7 @@ export class ScrapingConsumer { key: message.key, value: message.value, headers: { + 'workflow-id': task.workflowId, 'retry-count': (retryCount + 1).toString(), 'error': error.message } @@ -143,6 +235,7 @@ export class ScrapingConsumer { key: message.key, value: message.value, headers: { + 'workflow-id': task.workflowId, 'final-error': error.message } }] From 
698440168c458a6463b2d2a12d9e04fbca11c5ce Mon Sep 17 00:00:00 2001 From: Rohit Date: Mon, 20 Jan 2025 13:10:55 +0530 Subject: [PATCH 17/21] feat: add parallel scraping support using kafka --- maxun-core/src/interpret.ts | 171 +++++++++++++++++++++++++++++++++++- 1 file changed, 169 insertions(+), 2 deletions(-) diff --git a/maxun-core/src/interpret.ts b/maxun-core/src/interpret.ts index b19c8fa43..31c1b4161 100644 --- a/maxun-core/src/interpret.ts +++ b/maxun-core/src/interpret.ts @@ -16,6 +16,9 @@ import Concurrency from './utils/concurrency'; import Preprocessor from './preprocessor'; import log, { Level } from './utils/logger'; +import { Kafka } from 'kafkajs'; +import { kafkaConfig } from './config/kafka'; + import os from 'os'; /** @@ -41,6 +44,7 @@ declare global { interface InterpreterOptions { maxRepeats: number; maxConcurrency: number; + maxWorkers: number; serializableCallback: (output: any) => (void | Promise); binaryCallback: (output: any, mimeType: string) => (void | Promise); debug: boolean; @@ -70,13 +74,31 @@ export default class Interpreter extends EventEmitter { private cumulativeResults: Record[] = []; + private kafka: Kafka; + + private producer: any; + + private async initializeKafka() { + this.producer = this.kafka.producer({ + allowAutoTopicCreation: true, + idempotent: true + }); + await this.producer.connect(); + } + constructor(workflow: WorkflowFile, options?: Partial) { super(); this.workflow = workflow.workflow; this.initializedWorkflow = null; + this.kafka = new Kafka({ + clientId: kafkaConfig.clientId, + brokers: kafkaConfig.brokers + }); + this.initializeKafka(); this.options = { maxRepeats: 5, maxConcurrency: 5, + maxWorkers: Math.max(1, Math.min(os.cpus().length - 1, 4)), serializableCallback: (data) => { log(JSON.stringify(data), Level.WARN); }, @@ -546,11 +568,14 @@ export default class Interpreter extends EventEmitter { if (config.limit > 10000 && config.pagination.type === 'clickNext') { console.time('parallel-scraping'); + const workflowId = `workflow-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`; + console.log(`Starting workflow with ID: ${workflowId}`); + const numWorkers = Math.max(1, Math.min(os.cpus().length - 1, 4)); const batchSize = Math.ceil(config.limit / numWorkers); + const tasks = []; const pageUrls: string[] = []; - let workers: any = null; let availableSelectors = config.pagination.selector.split(','); let visitedUrls: string[] = []; @@ -661,12 +686,154 @@ export default class Interpreter extends EventEmitter { console.log(`Collected ${pageUrls.length} unique page URLs`); - + for (let i = 0; i < numWorkers; i++) { + const startIndex = i * batchSize; + const endIndex = Math.min((i + 1) * batchSize, config.limit); + const workerUrls = pageUrls.slice( + i * Math.ceil(pageUrls.length / numWorkers), + (i + 1) * Math.ceil(pageUrls.length / numWorkers) + ); + + const task = { + taskId: `${workflowId}-task-${i}`, + workflowId, + urls: workerUrls, + config: { + listSelector: config.listSelector, + fields: config.fields, + pagination: config.pagination, + batchSize: endIndex - startIndex, + startIndex, + endIndex + } + }; + + await this.producer.send({ + topic: kafkaConfig.topics.SCRAPING_TASKS, + messages: [{ + key: task.taskId, + value: JSON.stringify(task), + headers: { + 'workflow-id': workflowId, + 'retry-count': '0', + 'total-tasks': numWorkers.toString() + } + }] + }); + + tasks.push(task); + } + + console.log("TASKS SENT TO KAFKA (Not stringified)", tasks); + + // Wait for results from Kafka + const results = await 
this.waitForScrapingResults(tasks); + console.timeEnd('parallel-scraping'); + return results; } return this.handlePagination(page, config); } + private async waitForScrapingResults(tasks: any[]): Promise { + // Create a map to store our workflow's results + const resultsMap = new Map(); + + // Extract the workflow ID from the first task - all tasks in this batch will share the same workflow ID + const workflowId = tasks[0].workflowId; + console.log(`Waiting for results from workflow: ${workflowId}`); + + // Create a Set of task IDs for quick lookup - these are the only tasks we care about + const expectedTaskIds = new Set(tasks.map(task => task.taskId)); + + // Create a consumer specifically for this workflow + const resultConsumer = this.kafka.consumer({ + groupId: `scraping-group-results-${workflowId}`, + maxWaitTimeInMs: 1000, + maxBytesPerPartition: 2097152 // 2MB + }); + + try { + await resultConsumer.connect(); + console.log('Result consumer connected successfully'); + + await resultConsumer.subscribe({ + topic: kafkaConfig.topics.SCRAPING_RESULTS, + fromBeginning: true + }); + console.log('Result consumer subscribed to topic successfully'); + + return new Promise((resolve, reject) => { + let isRunning = true; + + resultConsumer.run({ + eachMessage: async ({ topic, partition, message }) => { + if (!isRunning) return; + + try { + const result = JSON.parse(message.value!.toString()); + + // Verify both task ID and workflow ID match + if (result.workflowId === workflowId && expectedTaskIds.has(result.taskId)) { + // Store this task's results + if (!resultsMap.has(result.taskId)) { + resultsMap.set(result.taskId, result.data); + console.log(`Received results for task ${result.taskId}. ` + + `Got ${resultsMap.size} of ${tasks.length} tasks from workflow ${workflowId}`); + } + + // Check if we have all our workflow's results + if (resultsMap.size === tasks.length) { + isRunning = false; + + // Sort tasks by their numeric index (extract number from task ID) + const sortedTasks = [...tasks].sort((a, b) => { + const aIndex = parseInt(a.taskId.split('-').pop() || '0'); + const bIndex = parseInt(b.taskId.split('-').pop() || '0'); + return aIndex - bIndex; + }); + + // Combine results in the sorted task order + const allResults = sortedTasks + .map(task => { + const taskResults = resultsMap.get(task.taskId); + if (!taskResults) { + console.warn(`Missing results for task ${task.taskId} in workflow ${workflowId}`); + return []; + } + return taskResults; + }) + .flat(); + + console.log(`Successfully collected all results from workflow ${workflowId}`); + + resolve(allResults); + } + } + } catch (error) { + console.error(`Error processing message in workflow ${workflowId}:`, error); + reject(error); + } + } + }); + + // // Add a timeout to prevent hanging + // const timeout = setTimeout(() => { + // if (isRunning) { + // isRunning = false; + // console.error(`Timeout waiting for results from workflow ${workflowId}. 
` + + // `Received ${resultsMap.size} of ${tasks.length} expected results.`); + // reject(new Error(`Timeout waiting for results from workflow ${workflowId}`)); + // } + // }, 30000); // 30 second timeout + }); + + } catch (error) { + console.error(`Fatal error in waitForScrapingResults for workflow ${workflowId}:`, error); + throw error; + } + } + private async handlePagination(page: Page, config: { listSelector: string, fields: any, limit?: number, pagination: any }) { let allResults: Record[] = []; let previousHeight = 0; From 710574967839fef783d3895a7d905e561dd921c1 Mon Sep 17 00:00:00 2001 From: Rohit Date: Mon, 20 Jan 2025 13:12:25 +0530 Subject: [PATCH 18/21] feat: add initial kafka setup script --- maxun-core/src/scripts/setup-kafka.ts | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 maxun-core/src/scripts/setup-kafka.ts diff --git a/maxun-core/src/scripts/setup-kafka.ts b/maxun-core/src/scripts/setup-kafka.ts new file mode 100644 index 000000000..99b4a2796 --- /dev/null +++ b/maxun-core/src/scripts/setup-kafka.ts @@ -0,0 +1,23 @@ +import { KafkaManager } from '../utils/kafka-manager'; + +async function setupKafka() { + const manager = new KafkaManager(); + + try { + console.log('Initializing Kafka manager...'); + await manager.initialize(); + console.log('Kafka setup completed successfully'); + + // Keep monitoring for a while to verify setup + setTimeout(async () => { + await manager.cleanup(); + process.exit(0); + }, 10000); + + } catch (error) { + console.error('Failed to setup Kafka:', error); + process.exit(1); + } +} + +setupKafka().catch(console.error); \ No newline at end of file From a931a1387f66573736c3a322f6b148f025cd739b Mon Sep 17 00:00:00 2001 From: Rohit Date: Mon, 20 Jan 2025 13:12:53 +0530 Subject: [PATCH 19/21] feat: add start consumer kafka script --- maxun-core/src/scripts/start-consumer.ts | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 maxun-core/src/scripts/start-consumer.ts diff --git a/maxun-core/src/scripts/start-consumer.ts b/maxun-core/src/scripts/start-consumer.ts new file mode 100644 index 000000000..5b96ee6e9 --- /dev/null +++ b/maxun-core/src/scripts/start-consumer.ts @@ -0,0 +1,22 @@ +import { ScrapingConsumer } from '../utils/scraping-consumer'; + +async function main() { + const consumer = new ScrapingConsumer(); + + // Handle graceful shutdown + process.on('SIGINT', async () => { + console.log('Shutting down consumer...'); + process.exit(0); + }); + + try { + console.log('Starting scraping consumer...'); + await consumer.start(); + console.log('Consumer is running and waiting for tasks...'); + } catch (error) { + console.error('Failed to start consumer:', error); + process.exit(1); + } +} + +main().catch(console.error); \ No newline at end of file From 5411484e4c19b974f3024ab66ab6a875017c51d4 Mon Sep 17 00:00:00 2001 From: Rohit Date: Mon, 20 Jan 2025 13:29:17 +0530 Subject: [PATCH 20/21] feat: add limit in task config --- maxun-core/src/interpret.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/maxun-core/src/interpret.ts b/maxun-core/src/interpret.ts index 31c1b4161..cfef300f3 100644 --- a/maxun-core/src/interpret.ts +++ b/maxun-core/src/interpret.ts @@ -702,7 +702,7 @@ export default class Interpreter extends EventEmitter { listSelector: config.listSelector, fields: config.fields, pagination: config.pagination, - batchSize: endIndex - startIndex, + limit: endIndex - startIndex, startIndex, endIndex } From 76507943bf92fcde9f90258e6b6ecdd2ee4dd56c Mon Sep 17 
00:00:00 2001
From: Rohit
Date: Mon, 20 Jan 2025 14:23:01 +0530
Subject: [PATCH 21/21] chore: add kafka services

---
 docker-compose.yml | 60 +++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 59 insertions(+), 1 deletion(-)

diff --git a/docker-compose.yml b/docker-compose.yml
index b571cc6f2..0551d1d27 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -39,6 +39,60 @@ services:
     volumes:
       - minio_data:/data
 
+  zookeeper:
+    image: confluentinc/cp-zookeeper:7.4.0
+    environment:
+      ZOOKEEPER_CLIENT_PORT: ${ZOOKEEPER_PORT:-2181}
+      ZOOKEEPER_TICK_TIME: 2000
+    ports:
+      - "${ZOOKEEPER_PORT:-2181}:2181"
+    volumes:
+      - zookeeper_data:/var/lib/zookeeper/data
+      - zookeeper_log:/var/lib/zookeeper/log
+    healthcheck:
+      test: ["CMD-SHELL", "zookeeper-shell localhost:${ZOOKEEPER_PORT:-2181} ls /"]
+      interval: 10s
+      timeout: 5s
+      retries: 5
+
+  # Add Kafka
+  kafka:
+    image: confluentinc/cp-kafka:7.4.0
+    depends_on:
+      - zookeeper
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:${ZOOKEEPER_PORT:-2181}
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
+      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
+    ports:
+      - "${KAFKA_PORT:-9092}:9092"
+      - "${KAFKA_EXTERNAL_PORT:-29092}:29092"
+    volumes:
+      - kafka_data:/var/lib/kafka/data
+    healthcheck:
+      test: ["CMD-SHELL", "kafka-topics --bootstrap-server localhost:9092 --list"]
+      interval: 10s
+      timeout: 5s
+      retries: 5
+
+  kafka-ui:
+    image: provectuslabs/kafka-ui:latest
+    container_name: kafka-ui
+    ports:
+      - "${KAFKA_UI_PORT:-9080}:8080"
+    environment:
+      KAFKA_CLUSTERS_0_NAME: maxun-cluster
+      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
+      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
+    depends_on:
+      - kafka
+
   backend:
     #build:
       #context: .
@@ -63,6 +117,7 @@ services:
       - postgres
       - redis
       - minio
+      - kafka
     volumes:
       - /var/run/dbus:/var/run/dbus
 
@@ -83,4 +138,7 @@
 volumes:
   postgres_data:
   minio_data:
-  redis_data:
\ No newline at end of file
+  redis_data:
+  kafka_data:
+  zookeeper_data:
+  zookeeper_log:
\ No newline at end of file
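
Taken together, patches 13-21 define a small task/result contract over Kafka: the interpreter publishes task messages to the scraping-tasks topic with workflow-id, retry-count and total-tasks headers, and ScrapingConsumer replies on scraping-results keyed by task id. The standalone producer below is a minimal sketch of that contract for manual testing; it is not part of the patch series. The file path, example URL, selectors and the empty fields object are placeholder assumptions; only the message layout and headers mirror what the patches send and consume.

// maxun-core/src/scripts/send-test-task.ts (illustrative sketch, not part of the patches)
import { Kafka } from 'kafkajs';
import { kafkaConfig } from '../config/kafka';

async function sendTestTask() {
  const kafka = new Kafka({
    clientId: `${kafkaConfig.clientId}-test-producer`,
    brokers: kafkaConfig.brokers,
  });
  const producer = kafka.producer();
  await producer.connect();

  // Same id scheme as the interpreter: workflow-<timestamp>-<random>, task ids suffixed with an index.
  const workflowId = `workflow-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
  const task = {
    taskId: `${workflowId}-task-0`,
    workflowId,
    urls: ['https://example.com/listing?page=1'], // placeholder URL
    config: {
      listSelector: '.item',                      // placeholder selector
      fields: {},                                 // placeholder; real field definitions come from the recorded workflow
      pagination: { type: 'clickNext', selector: '.next' }, // placeholder pagination config
      limit: 100,
    },
  };

  await producer.send({
    topic: kafkaConfig.topics.SCRAPING_TASKS,
    messages: [{
      key: task.taskId,
      value: JSON.stringify(task),
      headers: {
        'workflow-id': workflowId,
        'retry-count': '0',
        'total-tasks': '1', // the consumer reads this to report workflow progress
      },
    }],
  });

  await producer.disconnect();
}

sendTestTask().catch(console.error);

With the zookeeper, kafka and kafka-ui services from the compose file running and a consumer started via start-consumer.ts (e.g. with ts-node), the scraped rows should appear on the scraping-results topic keyed by the task id, which can be inspected in kafka-ui (port 9080 by default in this compose setup).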