Commit e81a4ed
feat: Add configurable timeout to FetchNode
- Add timeout parameter to FetchNode (default: 30 seconds)
- Apply timeout to requests.get() calls to prevent indefinite hangs
- Implement timeout for PDF parsing using ThreadPoolExecutor
- Propagate timeout to ChromiumLoader via loader_kwargs
- Add comprehensive unit tests for timeout functionality
- Fully backward compatible (timeout can be disabled with None)

Fixes issue with requests.get() and PDF parsing blocking indefinitely on slow/unresponsive servers or large documents.

Usage:
node_config={'timeout': 30}    # Custom timeout
node_config={'timeout': None}  # Disable timeout
node_config={}                 # Use default 30s timeout
1 parent 365761a commit e81a4ed
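For illustration, a minimal sketch of passing the new option, using the same FetchNode constructor arguments the tests below exercise (the URL is a placeholder):

from scrapegraphai.nodes.fetch_node import FetchNode

# Custom 10-second limit for the soup-based HTTP fetch and for PDF parsing.
node = FetchNode(
    input="url",
    output=["doc"],
    node_config={"use_soup": True, "timeout": 10},
)

# timeout=None disables the limit and restores the old blocking behaviour.
unbounded = FetchNode(
    input="url",
    output=["doc"],
    node_config={"timeout": None},
)

state = node.execute({"url": "https://example.com"})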

File tree

2 files changed: +270 -2 lines

scrapegraphai/nodes/fetch_node.py
Lines changed: 29 additions & 2 deletions

@@ -4,6 +4,7 @@

 import json
 from typing import List, Optional
+import concurrent.futures

 import requests
 from langchain_community.document_loaders import PyPDFLoader
@@ -68,6 +69,10 @@ def __init__(
             else node_config.get("openai_md_enabled", False)
         )

+        # Timeout in seconds for blocking operations (HTTP requests, PDF parsing, etc.).
+        # If set to None, no timeout will be applied.
+        self.timeout = None if node_config is None else node_config.get("timeout", 30)
+
         self.cut = False if node_config is None else node_config.get("cut", True)

         self.browser_base = (
@@ -174,7 +179,19 @@ def load_file_content(self, source, input_type):

         if input_type == "pdf":
             loader = PyPDFLoader(source)
-            return loader.load()
+            # PyPDFLoader.load() can be blocking for large PDFs. Run it in a thread and
+            # enforce the configured timeout if provided.
+            if self.timeout is None:
+                return loader.load()
+            else:
+                with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
+                    future = executor.submit(loader.load)
+                    try:
+                        return future.result(timeout=self.timeout)
+                    except concurrent.futures.TimeoutError:
+                        raise TimeoutError(
+                            f"PDF parsing exceeded timeout of {self.timeout} seconds"
+                        )
         elif input_type == "csv":
             try:
                 import pandas as pd
@@ -260,7 +277,12 @@ def handle_web_source(self, state, source):

         self.logger.info(f"--- (Fetching HTML from: {source}) ---")
         if self.use_soup:
-            response = requests.get(source)
+            # Apply configured timeout to blocking HTTP requests. If timeout is None,
+            # don't pass the timeout argument (requests will block until completion).
+            if self.timeout is None:
+                response = requests.get(source)
+            else:
+                response = requests.get(source, timeout=self.timeout)
             if response.status_code == 200:
                 if not response.text.strip():
                     raise ValueError("No HTML body content found in the response.")
@@ -286,6 +308,11 @@ def handle_web_source(self, state, source):
         if self.node_config:
             loader_kwargs = self.node_config.get("loader_kwargs", {})

+            # If a global timeout is configured on the node and no loader-specific timeout
+            # was provided, propagate it to ChromiumLoader so it can apply the same limit.
+            if "timeout" not in loader_kwargs and self.timeout is not None:
+                loader_kwargs["timeout"] = self.timeout
+
         if self.browser_base:
             try:
                 from ..docloaders.browser_base import browser_base_fetch
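As an aside, the PDF change above uses the plain standard-library concurrent.futures pattern for bounding a blocking call. A self-contained sketch of the same technique, with a hypothetical slow_task standing in for PyPDFLoader.load:

import concurrent.futures
import time

def slow_task():
    # Stand-in for a blocking call such as PyPDFLoader.load().
    time.sleep(2)
    return "done"

timeout = 0.5  # seconds
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_task)
    try:
        result = future.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        raise TimeoutError(f"task exceeded timeout of {timeout} seconds")

One caveat of this pattern (it applies to the commit's code as well): future.result(timeout=...) only abandons the wait. The worker thread keeps running, and leaving the with block calls ThreadPoolExecutor.shutdown(wait=True), so a truly stuck task still holds the thread until it finishes.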

tests/test_fetch_node_timeout.py
Lines changed: 241 additions & 0 deletions (entire file is new)

"""
Unit tests for FetchNode timeout functionality.

These tests verify that:
1. The timeout configuration is properly read and stored
2. HTTP requests use the configured timeout
3. PDF parsing respects the timeout
4. Timeout is propagated to ChromiumLoader via loader_kwargs
"""
import sys
import time
import unittest
from unittest.mock import Mock, patch, MagicMock
from pathlib import Path

# Add the project root to path to import modules
sys.path.insert(0, str(Path(__file__).parent.parent))


class TestFetchNodeTimeout(unittest.TestCase):
    """Test suite for FetchNode timeout configuration and usage."""

    def setUp(self):
        """Set up test fixtures."""
        # Mock all the heavy external dependencies at import time
        self.mock_modules = {}
        for module in ['langchain_core', 'langchain_core.documents',
                       'langchain_community', 'langchain_community.document_loaders',
                       'langchain_openai', 'minify_html', 'pydantic',
                       'langchain', 'langchain.prompts']:
            if module not in sys.modules:
                sys.modules[module] = MagicMock()

        # Create mock Document class
        class MockDocument:
            def __init__(self, page_content, metadata=None):
                self.page_content = page_content
                self.metadata = metadata or {}

        sys.modules['langchain_core.documents'].Document = MockDocument

        # Create mock PyPDFLoader
        class MockPyPDFLoader:
            def __init__(self, source):
                self.source = source

            def load(self):
                time.sleep(0.1)  # Simulate some work
                return [MockDocument(page_content=f"PDF content from {self.source}")]

        sys.modules['langchain_community.document_loaders'].PyPDFLoader = MockPyPDFLoader

        # Now import FetchNode
        from scrapegraphai.nodes.fetch_node import FetchNode
        self.FetchNode = FetchNode

    def tearDown(self):
        """Clean up after tests."""
        # Remove mocked modules
        for module in list(sys.modules.keys()):
            if 'langchain' in module or module in ['minify_html', 'pydantic']:
                if module in self.mock_modules or module.startswith('langchain'):
                    sys.modules.pop(module, None)

    def test_timeout_default_value(self):
        """Test that default timeout is set to 30 seconds."""
        node = self.FetchNode(
            input="url",
            output=["doc"],
            node_config={}
        )
        self.assertEqual(node.timeout, 30)

    def test_timeout_custom_value(self):
        """Test that custom timeout value is properly stored."""
        node = self.FetchNode(
            input="url",
            output=["doc"],
            node_config={"timeout": 10}
        )
        self.assertEqual(node.timeout, 10)

    def test_timeout_none_value(self):
        """Test that timeout can be disabled by setting to None."""
        node = self.FetchNode(
            input="url",
            output=["doc"],
            node_config={"timeout": None}
        )
        self.assertIsNone(node.timeout)

    def test_timeout_no_config(self):
        """Test that timeout defaults to 30 when no node_config provided."""
        node = self.FetchNode(
            input="url",
            output=["doc"],
            node_config=None
        )
        self.assertEqual(node.timeout, 30)

    @patch('scrapegraphai.nodes.fetch_node.requests')
    def test_requests_get_with_timeout(self, mock_requests):
        """Test that requests.get is called with timeout when use_soup=True."""
        mock_response = Mock()
        mock_response.status_code = 200
        mock_response.text = "<html><body>Test content</body></html>"
        mock_requests.get.return_value = mock_response

        node = self.FetchNode(
            input="url",
            output=["doc"],
            node_config={"use_soup": True, "timeout": 15}
        )

        # Execute with a URL
        state = {"url": "https://example.com"}
        node.execute(state)

        # Verify requests.get was called with timeout
        mock_requests.get.assert_called_once()
        call_args = mock_requests.get.call_args
        self.assertEqual(call_args[1].get('timeout'), 15)

    @patch('scrapegraphai.nodes.fetch_node.requests')
    def test_requests_get_without_timeout_when_none(self, mock_requests):
        """Test that requests.get is called without timeout argument when timeout=None."""
        mock_response = Mock()
        mock_response.status_code = 200
        mock_response.text = "<html><body>Test content</body></html>"
        mock_requests.get.return_value = mock_response

        node = self.FetchNode(
            input="url",
            output=["doc"],
            node_config={"use_soup": True, "timeout": None}
        )

        # Execute with a URL
        state = {"url": "https://example.com"}
        node.execute(state)

        # Verify requests.get was called without timeout
        mock_requests.get.assert_called_once()
        call_args = mock_requests.get.call_args
        self.assertNotIn('timeout', call_args[1])

    def test_pdf_parsing_with_timeout(self):
        """Test that PDF parsing completes within timeout."""
        node = self.FetchNode(
            input="pdf",
            output=["doc"],
            node_config={"timeout": 5}
        )

        # Execute with a PDF file
        state = {"pdf": "test.pdf"}
        result = node.execute(state)

        # Should complete successfully
        self.assertIn("doc", result)
        self.assertIsNotNone(result["doc"])

    def test_pdf_parsing_timeout_exceeded(self):
        """Test that PDF parsing raises TimeoutError when timeout is exceeded."""
        # Create a mock loader that takes longer than timeout
        class SlowPyPDFLoader:
            def __init__(self, source):
                self.source = source

            def load(self):
                time.sleep(2)  # Sleep longer than timeout
                return []

        with patch('scrapegraphai.nodes.fetch_node.PyPDFLoader', SlowPyPDFLoader):
            node = self.FetchNode(
                input="pdf",
                output=["doc"],
                node_config={"timeout": 0.5}  # Very short timeout
            )

            # Execute should raise TimeoutError
            state = {"pdf": "slow.pdf"}
            with self.assertRaises(TimeoutError) as context:
                node.execute(state)

            self.assertIn("PDF parsing exceeded timeout", str(context.exception))

    @patch('scrapegraphai.nodes.fetch_node.ChromiumLoader')
    def test_timeout_propagated_to_chromium_loader(self, mock_loader_class):
        """Test that timeout is propagated to ChromiumLoader via loader_kwargs."""
        mock_loader = Mock()
        mock_doc = Mock()
        mock_doc.page_content = "<html>Test</html>"
        mock_loader.load.return_value = [mock_doc]
        mock_loader_class.return_value = mock_loader

        node = self.FetchNode(
            input="url",
            output=["doc"],
            node_config={"timeout": 20, "headless": True}
        )

        # Execute with a URL (not using soup, so ChromiumLoader is used)
        state = {"url": "https://example.com"}
        node.execute(state)

        # Verify ChromiumLoader was instantiated with timeout in kwargs
        mock_loader_class.assert_called_once()
        call_kwargs = mock_loader_class.call_args[1]
        self.assertEqual(call_kwargs.get('timeout'), 20)

    @patch('scrapegraphai.nodes.fetch_node.ChromiumLoader')
    def test_timeout_not_overridden_in_loader_kwargs(self, mock_loader_class):
        """Test that existing timeout in loader_kwargs is not overridden."""
        mock_loader = Mock()
        mock_doc = Mock()
        mock_doc.page_content = "<html>Test</html>"
        mock_loader.load.return_value = [mock_doc]
        mock_loader_class.return_value = mock_loader

        node = self.FetchNode(
            input="url",
            output=["doc"],
            node_config={
                "timeout": 20,
                "loader_kwargs": {"timeout": 50}  # Explicit loader timeout
            }
        )

        # Execute with a URL
        state = {"url": "https://example.com"}
        node.execute(state)

        # Verify ChromiumLoader got the loader_kwargs timeout, not node timeout
        mock_loader_class.assert_called_once()
        call_kwargs = mock_loader_class.call_args[1]
        self.assertEqual(call_kwargs.get('timeout'), 50)


if __name__ == '__main__':
    unittest.main()
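Assuming the repository layout above, the suite can also be driven programmatically with the standard unittest loader (a sketch; the usual python -m unittest invocation works just as well):

import unittest

# Discover and run only the timeout suite added by this commit.
suite = unittest.defaultTestLoader.discover(
    "tests", pattern="test_fetch_node_timeout.py"
)
unittest.TextTestRunner(verbosity=2).run(suite)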
