From 3fd732b729fa35048f61d3f5e6a4a542509f10da Mon Sep 17 00:00:00 2001 From: Xin Jin Date: Wed, 1 Oct 2025 17:47:00 -0700 Subject: [PATCH 01/11] Add batching to create_examples --- python/langsmith/client.py | 105 +++++++++++------- python/tests/integration_tests/test_client.py | 59 ++++++++++ 2 files changed, 126 insertions(+), 38 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index f78a58f84..0cc7f21f5 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -4854,6 +4854,7 @@ def create_examples( dataset_id: Optional[ID_TYPE] = None, examples: Optional[Sequence[ls_schemas.ExampleCreate | dict]] = None, dangerously_allow_filesystem: bool = False, + batch_size: int = 200, **kwargs: Any, ) -> ls_schemas.UpsertExamplesResponse | dict[str, Any]: """Create examples in a dataset. @@ -4869,6 +4870,8 @@ def create_examples( The examples to create. dangerously_allow_filesystem (bool): Whether to allow uploading files from the filesystem. + batch_size (int): + upload examples in batches of this size, default is 200. **kwargs (Any): Legacy keyword args. Should not be specified if 'examples' is specified. - inputs (Sequence[Mapping[str, Any]]): The input values for the examples. @@ -4986,35 +4989,53 @@ def create_examples( ) ] - if (self.info.instance_flags or {}).get( - "dataset_examples_multipart_enabled", False - ): - return self._upload_examples_multipart( - dataset_id=cast(uuid.UUID, dataset_id), - uploads=uploads, - dangerously_allow_filesystem=dangerously_allow_filesystem, - ) - else: - for upload in uploads: - if getattr(upload, "attachments") is not None: - upload.attachments = None - warnings.warn( - "Must upgrade your LangSmith version to use attachments." - ) - # fallback to old method - response = self.request_with_retries( - "POST", - "/examples/bulk", - headers={**self._headers, "Content-Type": "application/json"}, - data=_dumps_json( - [ + if not uploads: + return ls_schemas.UpsertExamplesResponse(example_ids=[], count=0) + + # Batch large uploads for memory efficiency + all_example_ids = [] + total_count = 0 + + for i in range(0, len(uploads), batch_size): + batch = uploads[i:i + batch_size] + + if (self.info.instance_flags or {}).get( + "dataset_examples_multipart_enabled", False + ): + response = self._upload_examples_multipart( + dataset_id=cast(uuid.UUID, dataset_id), + uploads=batch, + dangerously_allow_filesystem=dangerously_allow_filesystem, + ) + all_example_ids.extend(response.example_ids or []) + total_count += response.count or 0 + else: + # Strip attachments for legacy endpoint + for upload in batch: + if getattr(upload, "attachments") is not None: + upload.attachments = None + warnings.warn( + "Must upgrade your LangSmith version to use attachments." 
+ ) + + response = self.request_with_retries( + "POST", + "/examples/bulk", + headers={**self._headers, "Content-Type": "application/json"}, + data=_dumps_json([ {**dump_model(upload), "dataset_id": str(dataset_id)} - for upload in uploads - ] - ), - ) - ls_utils.raise_for_status_with_text(response) - return response.json() + for upload in batch + ]), + ) + ls_utils.raise_for_status_with_text(response) + response_data = response.json() + all_example_ids.extend(response_data.get("example_ids", [])) + total_count += response_data.get("count", 0) + + return ls_schemas.UpsertExamplesResponse( + example_ids=all_example_ids, + count=total_count, + ) @ls_utils.xor_args(("dataset_id", "dataset_name")) def create_example( @@ -5768,24 +5789,30 @@ def delete_example(self, example_id: ID_TYPE) -> None: ) ls_utils.raise_for_status_with_text(response) - def delete_examples(self, example_ids: Sequence[ID_TYPE]) -> None: + def delete_examples(self, example_ids: Sequence[ID_TYPE], hard_delete: bool = False) -> None: """Delete multiple examples by ID. Parameters ---------- example_ids : Sequence[ID_TYPE] The IDs of the examples to delete. + hard_delete : bool, optional + Whether to permanently delete the examples. Default is False. """ + params = { + "example_ids": [ + str(_as_uuid(id_, f"example_ids[{i}]")) + for i, id_ in enumerate(example_ids) + ] + } + if hard_delete: + params["hard_delete"] = hard_delete + response = self.request_with_retries( "DELETE", - "/examples", - headers={**self._headers, "Content-Type": "application/json"}, - params={ - "example_ids": [ - str(_as_uuid(id_, f"example_ids[{i}]")) - for i, id_ in enumerate(example_ids) - ] - }, + "/api/v1/examples", + headers=self._headers, + params=params, ) ls_utils.raise_for_status_with_text(response) @@ -8372,7 +8399,9 @@ def get_experiment_results( print(f"P50 latency: {results['stats'].latency_p50}") """ - if name and not project_id: + if not (name or project_id): + raise ValueError("Either name or project_id must be provided.") + elif not project_id: projects = list(self.list_projects(name=name)) if not projects: raise ValueError(f"No experiment found with name: '{name}'") diff --git a/python/tests/integration_tests/test_client.py b/python/tests/integration_tests/test_client.py index 3dcb3b6d8..ad9ea513e 100644 --- a/python/tests/integration_tests/test_client.py +++ b/python/tests/integration_tests/test_client.py @@ -2579,6 +2579,65 @@ def test_create_examples_errors(langchain_client: Client) -> None: safe_delete_dataset(langchain_client, dataset_id=dataset.id) +def test_create_examples_batching(parameterized_multipart_client: Client) -> None: + """Test create_examples batching with large numbers of examples.""" + dataset_name = "__test_batching_" + uuid4().hex[:4] + dataset = _create_dataset(parameterized_multipart_client, dataset_name) + + # Test batching with 250 examples (> default batch_size=200) + examples = [ + {"inputs": {"q": f"Q{i}"}, "outputs": {"a": f"A{i}"}} + for i in range(250) + ] + + result = parameterized_multipart_client.create_examples( + dataset_id=dataset.id, examples=examples, batch_size=100 + ) + + assert result["count"] == 250 + assert len(result["example_ids"]) == 250 + + # Verify examples exist + listed = list(parameterized_multipart_client.list_examples(dataset_id=dataset.id)) + assert len(listed) == 250 + + safe_delete_dataset(parameterized_multipart_client, dataset_id=dataset.id) + + +def test_create_examples_large_multipart_batching(parameterized_multipart_client: Client) -> None: + """Test create_examples 
batching with large multipart payloads.""" + dataset_name = "__test_large_multipart_" + uuid4().hex[:4] + dataset = _create_dataset(parameterized_multipart_client, dataset_name) + + # Create examples with large attachments to simulate >20MB payload + large_data = b"x" * 500_000 # 500KB per attachment + examples = [ + { + "inputs": {"question": f"What's in image {i}?"}, + "outputs": {"answer": f"Image {i} content"}, + "attachments": { + f"image_{i}": ("image/png", large_data), + f"doc_{i}": ("text/plain", large_data), + } + } + for i in range(50) # ~50MB total payload + ] + + result = parameterized_multipart_client.create_examples( + dataset_id=dataset.id, examples=examples, batch_size=15 # Force small batches + ) + + assert result["count"] == 50 + assert len(result["example_ids"]) == 50 + + # Verify attachments were uploaded + first_example = parameterized_multipart_client.read_example(result["example_ids"][0]) + if hasattr(first_example, 'attachments') and first_example.attachments: + assert len(first_example.attachments) == 2 + + safe_delete_dataset(parameterized_multipart_client, dataset_id=dataset.id) + + @pytest.mark.xfail(reason="Need to wait for backend changes to go endpoint") def test_use_source_run_io_multiple_examples(langchain_client: Client) -> None: dataset_name = "__test_use_source_run_io" + uuid4().hex[:4] From a55b706a7a223f7c3e83a7e24d3353303ad9ed04 Mon Sep 17 00:00:00 2001 From: Xin Jin Date: Wed, 1 Oct 2025 18:24:45 -0700 Subject: [PATCH 02/11] lint --- python/langsmith/client.py | 30 +++++++------- python/tests/integration_tests/test_client.py | 39 +++++++++++-------- 2 files changed, 39 insertions(+), 30 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 0cc7f21f5..fe187dad1 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -4991,14 +4991,14 @@ def create_examples( if not uploads: return ls_schemas.UpsertExamplesResponse(example_ids=[], count=0) - - # Batch large uploads for memory efficiency + + # Batch large uploads for memory efficiency all_example_ids = [] total_count = 0 - + for i in range(0, len(uploads), batch_size): - batch = uploads[i:i + batch_size] - + batch = uploads[i : i + batch_size] + if (self.info.instance_flags or {}).get( "dataset_examples_multipart_enabled", False ): @@ -5017,21 +5017,23 @@ def create_examples( warnings.warn( "Must upgrade your LangSmith version to use attachments." ) - + response = self.request_with_retries( "POST", "/examples/bulk", headers={**self._headers, "Content-Type": "application/json"}, - data=_dumps_json([ - {**dump_model(upload), "dataset_id": str(dataset_id)} - for upload in batch - ]), + data=_dumps_json( + [ + {**dump_model(upload), "dataset_id": str(dataset_id)} + for upload in batch + ] + ), ) ls_utils.raise_for_status_with_text(response) response_data = response.json() all_example_ids.extend(response_data.get("example_ids", [])) total_count += response_data.get("count", 0) - + return ls_schemas.UpsertExamplesResponse( example_ids=all_example_ids, count=total_count, @@ -5789,7 +5791,9 @@ def delete_example(self, example_id: ID_TYPE) -> None: ) ls_utils.raise_for_status_with_text(response) - def delete_examples(self, example_ids: Sequence[ID_TYPE], hard_delete: bool = False) -> None: + def delete_examples( + self, example_ids: Sequence[ID_TYPE], hard_delete: bool = False + ) -> None: """Delete multiple examples by ID. 
Parameters @@ -5807,7 +5811,7 @@ def delete_examples(self, example_ids: Sequence[ID_TYPE], hard_delete: bool = Fa } if hard_delete: params["hard_delete"] = hard_delete - + response = self.request_with_retries( "DELETE", "/api/v1/examples", diff --git a/python/tests/integration_tests/test_client.py b/python/tests/integration_tests/test_client.py index ad9ea513e..082c4c30c 100644 --- a/python/tests/integration_tests/test_client.py +++ b/python/tests/integration_tests/test_client.py @@ -2583,32 +2583,33 @@ def test_create_examples_batching(parameterized_multipart_client: Client) -> Non """Test create_examples batching with large numbers of examples.""" dataset_name = "__test_batching_" + uuid4().hex[:4] dataset = _create_dataset(parameterized_multipart_client, dataset_name) - + # Test batching with 250 examples (> default batch_size=200) examples = [ - {"inputs": {"q": f"Q{i}"}, "outputs": {"a": f"A{i}"}} - for i in range(250) + {"inputs": {"q": f"Q{i}"}, "outputs": {"a": f"A{i}"}} for i in range(250) ] - + result = parameterized_multipart_client.create_examples( dataset_id=dataset.id, examples=examples, batch_size=100 ) - + assert result["count"] == 250 assert len(result["example_ids"]) == 250 - + # Verify examples exist listed = list(parameterized_multipart_client.list_examples(dataset_id=dataset.id)) assert len(listed) == 250 - + safe_delete_dataset(parameterized_multipart_client, dataset_id=dataset.id) -def test_create_examples_large_multipart_batching(parameterized_multipart_client: Client) -> None: +def test_create_examples_large_multipart_batching( + parameterized_multipart_client: Client, +) -> None: """Test create_examples batching with large multipart payloads.""" dataset_name = "__test_large_multipart_" + uuid4().hex[:4] dataset = _create_dataset(parameterized_multipart_client, dataset_name) - + # Create examples with large attachments to simulate >20MB payload large_data = b"x" * 500_000 # 500KB per attachment examples = [ @@ -2618,23 +2619,27 @@ def test_create_examples_large_multipart_batching(parameterized_multipart_client "attachments": { f"image_{i}": ("image/png", large_data), f"doc_{i}": ("text/plain", large_data), - } + }, } for i in range(50) # ~50MB total payload ] - + result = parameterized_multipart_client.create_examples( - dataset_id=dataset.id, examples=examples, batch_size=15 # Force small batches + dataset_id=dataset.id, + examples=examples, + batch_size=15, # Force small batches ) - + assert result["count"] == 50 assert len(result["example_ids"]) == 50 - + # Verify attachments were uploaded - first_example = parameterized_multipart_client.read_example(result["example_ids"][0]) - if hasattr(first_example, 'attachments') and first_example.attachments: + first_example = parameterized_multipart_client.read_example( + result["example_ids"][0] + ) + if hasattr(first_example, "attachments") and first_example.attachments: assert len(first_example.attachments) == 2 - + safe_delete_dataset(parameterized_multipart_client, dataset_id=dataset.id) From 2955f317435b4cd19c6e638999cdb9956e489e2c Mon Sep 17 00:00:00 2001 From: Xin Jin Date: Thu, 2 Oct 2025 17:01:38 -0700 Subject: [PATCH 03/11] size based batch --- python/langsmith/client.py | 85 +++++++++++++++++-- python/tests/integration_tests/test_client.py | 6 +- 2 files changed, 81 insertions(+), 10 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index fe187dad1..e0254bcfb 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -4746,6 +4746,81 @@ def 
upload_examples_multipart( dangerously_allow_filesystem=dangerously_allow_filesystem, ) + def _estimate_example_size(self, example: ls_schemas.ExampleCreate) -> int: + """Estimate the size of an example in bytes for batching purposes.""" + size = 1000 # Base overhead for JSON structure and boundaries + + # Estimate JSON parts + if example.inputs: + size += len(_dumps_json(example.inputs)) + if example.outputs: + size += len(_dumps_json(example.outputs)) + if example.metadata: + size += len(_dumps_json(example.metadata)) + + # Estimate attachments (largest contributor) + if example.attachments: + for _, attachment in example.attachments.items(): + if isinstance(attachment, dict): + attachment_data = attachment["data"] + else: + _, attachment_data = attachment + + if isinstance(attachment_data, Path): + try: + size += os.path.getsize(attachment_data) + except (FileNotFoundError, OSError): + size += 1_000_000 # 1MB fallback estimate + else: + size += len(attachment_data) + size += 200 # Multipart headers overhead per attachment + + return size + + def _batch_examples_by_size_and_count( + self, + examples: list[ls_schemas.ExampleCreate], + max_batch_size_bytes: int = 20_000_000, # 20MB limit + max_batch_count: int = 200, + ) -> list[list[ls_schemas.ExampleCreate]]: + """Batch examples by both size and count limits.""" + batches = [] + current_batch = [] + current_size = 0 + + for example in examples: + example_size = self._estimate_example_size(example) + + # Handle oversized single examples + if example_size > max_batch_size_bytes: + # Flush current batch first + if current_batch: + batches.append(current_batch) + current_batch = [] + current_size = 0 + # Put oversized example in its own batch + batches.append([example]) + continue + + # Check if adding this example would exceed limits + size_exceeded = current_size + example_size > max_batch_size_bytes + count_exceeded = len(current_batch) >= max_batch_count + + # Start new batch if current batch would be too large + if current_batch and (size_exceeded or count_exceeded): + batches.append(current_batch) + current_batch = [example] + current_size = example_size + else: + current_batch.append(example) + current_size += example_size + + # Add final batch + if current_batch: + batches.append(current_batch) + + return batches + def _upload_examples_multipart( self, *, @@ -4854,7 +4929,6 @@ def create_examples( dataset_id: Optional[ID_TYPE] = None, examples: Optional[Sequence[ls_schemas.ExampleCreate | dict]] = None, dangerously_allow_filesystem: bool = False, - batch_size: int = 200, **kwargs: Any, ) -> ls_schemas.UpsertExamplesResponse | dict[str, Any]: """Create examples in a dataset. @@ -4870,8 +4944,6 @@ def create_examples( The examples to create. dangerously_allow_filesystem (bool): Whether to allow uploading files from the filesystem. - batch_size (int): - upload examples in batches of this size, default is 200. **kwargs (Any): Legacy keyword args. Should not be specified if 'examples' is specified. - inputs (Sequence[Mapping[str, Any]]): The input values for the examples. 
@@ -4992,13 +5064,14 @@ def create_examples( if not uploads: return ls_schemas.UpsertExamplesResponse(example_ids=[], count=0) - # Batch large uploads for memory efficiency + # Batch uploads by size and count for optimal performance all_example_ids = [] total_count = 0 - for i in range(0, len(uploads), batch_size): - batch = uploads[i : i + batch_size] + # Use size-aware batching to prevent payload limit errors + batches = self._batch_examples_by_size_and_count(uploads) + for batch in batches: if (self.info.instance_flags or {}).get( "dataset_examples_multipart_enabled", False ): diff --git a/python/tests/integration_tests/test_client.py b/python/tests/integration_tests/test_client.py index 082c4c30c..0de1811d3 100644 --- a/python/tests/integration_tests/test_client.py +++ b/python/tests/integration_tests/test_client.py @@ -2590,7 +2590,7 @@ def test_create_examples_batching(parameterized_multipart_client: Client) -> Non ] result = parameterized_multipart_client.create_examples( - dataset_id=dataset.id, examples=examples, batch_size=100 + dataset_id=dataset.id, examples=examples ) assert result["count"] == 250 @@ -2625,9 +2625,7 @@ def test_create_examples_large_multipart_batching( ] result = parameterized_multipart_client.create_examples( - dataset_id=dataset.id, - examples=examples, - batch_size=15, # Force small batches + dataset_id=dataset.id, examples=examples ) assert result["count"] == 50 From 7a9a7295587e3fe83edeae6a97239ceff8396172 Mon Sep 17 00:00:00 2001 From: Xin Jin Date: Tue, 7 Oct 2025 15:33:39 -0700 Subject: [PATCH 04/11] fix node/langsmith 403 ci/cd error --- .mlc_config.json | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/.mlc_config.json b/.mlc_config.json index 3fff32c22..44d430a31 100644 --- a/.mlc_config.json +++ b/.mlc_config.json @@ -1,3 +1,8 @@ { - "aliveStatusCodes": [429, 200] + "aliveStatusCodes": [429, 200], + "ignorePatterns": [ + { + "pattern": "^https://www\\.npmjs\\.com/" + } + ] } \ No newline at end of file From de99bd15b7219946599233925f5cdba56b51760d Mon Sep 17 00:00:00 2001 From: Xin Jin Date: Wed, 8 Oct 2025 13:24:22 -0700 Subject: [PATCH 05/11] batch by size only --- python/langsmith/client.py | 26 +++++++++++--------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index cd7da1208..943322246 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -4758,7 +4758,6 @@ def _estimate_example_size(self, example: ls_schemas.ExampleCreate) -> int: """Estimate the size of an example in bytes for batching purposes.""" size = 1000 # Base overhead for JSON structure and boundaries - # Estimate JSON parts if example.inputs: size += len(_dumps_json(example.inputs)) if example.outputs: @@ -4766,7 +4765,7 @@ def _estimate_example_size(self, example: ls_schemas.ExampleCreate) -> int: if example.metadata: size += len(_dumps_json(example.metadata)) - # Estimate attachments (largest contributor) + # Estimate attachments if example.attachments: for _, attachment in example.attachments.items(): if isinstance(attachment, dict): @@ -4785,13 +4784,12 @@ def _estimate_example_size(self, example: ls_schemas.ExampleCreate) -> int: return size - def _batch_examples_by_size_and_count( + def _batch_examples_by_size( self, examples: list[ls_schemas.ExampleCreate], - max_batch_size_bytes: int = 20_000_000, # 20MB limit - max_batch_count: int = 200, + max_batch_size_bytes: int = 20_000_000, # 20MB limit per batch ) -> list[list[ls_schemas.ExampleCreate]]: - 
"""Batch examples by both size and count limits.""" + """Batch examples by size limits.""" batches = [] current_batch = [] current_size = 0 @@ -4806,16 +4804,14 @@ def _batch_examples_by_size_and_count( batches.append(current_batch) current_batch = [] current_size = 0 - # Put oversized example in its own batch + # oversized example batches.append([example]) continue - # Check if adding this example would exceed limits size_exceeded = current_size + example_size > max_batch_size_bytes - count_exceeded = len(current_batch) >= max_batch_count - - # Start new batch if current batch would be too large - if current_batch and (size_exceeded or count_exceeded): + + # new batch + if current_batch and size_exceeded: batches.append(current_batch) current_batch = [example] current_size = example_size @@ -4823,7 +4819,7 @@ def _batch_examples_by_size_and_count( current_batch.append(example) current_size += example_size - # Add final batch + # final batch if current_batch: batches.append(current_batch) @@ -5088,8 +5084,8 @@ def create_examples( uploads=batch, dangerously_allow_filesystem=dangerously_allow_filesystem, ) - all_example_ids.extend(response.example_ids or []) - total_count += response.count or 0 + all_example_ids.extend(response["example_ids"] or []) + total_count += response["count"] or 0 else: # Strip attachments for legacy endpoint for upload in batch: From 627f012d86e55fb9e1e306bd1d33598e3d75f32b Mon Sep 17 00:00:00 2001 From: Xin Jin Date: Wed, 8 Oct 2025 17:57:48 -0700 Subject: [PATCH 06/11] allow concurrency <= 4 --- python/langsmith/client.py | 135 +++++++++++++----- python/tests/integration_tests/test_client.py | 117 ++++++++++++++- 2 files changed, 209 insertions(+), 43 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 943322246..adeaf35b1 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -39,6 +39,7 @@ from queue import PriorityQueue from typing import ( TYPE_CHECKING, + Annotated, Any, Callable, Literal, @@ -48,6 +49,7 @@ ) from urllib import parse as urllib_parse +from pydantic import Field import requests from requests import adapters as requests_adapters from requests_toolbelt import ( # type: ignore[import-untyped] @@ -4809,7 +4811,7 @@ def _batch_examples_by_size( continue size_exceeded = current_size + example_size > max_batch_size_bytes - + # new batch if current_batch and size_exceeded: batches.append(current_batch) @@ -4933,6 +4935,7 @@ def create_examples( dataset_id: Optional[ID_TYPE] = None, examples: Optional[Sequence[ls_schemas.ExampleCreate | dict]] = None, dangerously_allow_filesystem: bool = False, + max_concurrency: Annotated[int, Field(ge=1, le=3)] = 1, **kwargs: Any, ) -> ls_schemas.UpsertExamplesResponse | dict[str, Any]: """Create examples in a dataset. @@ -5001,6 +5004,9 @@ def create_examples( response = client.create_examples(dataset_name="agent-qa", examples=examples) # -> {"example_ids": [... 
""" # noqa: E501 + if not 1 <= max_concurrency <= 3: + raise ValueError("max_concurrency must be between 1 and 3") + if kwargs and examples: kwarg_keys = ", ".join([f"'{k}'" for k in kwargs]) raise ValueError( @@ -5068,48 +5074,101 @@ def create_examples( if not uploads: return ls_schemas.UpsertExamplesResponse(example_ids=[], count=0) + # Use size-aware batching to prevent payload limit errors + batches = self._batch_examples_by_size(uploads) + + # sequential upload + if len(batches) <= 1 or max_concurrency <= 1: + return self._upload_examples_batches_sequential( + batches, dataset_id, dangerously_allow_filesystem + ) + else: + return self._upload_examples_batches_parallel( + batches, dataset_id, dangerously_allow_filesystem, max_concurrency + ) + + def _upload_examples_batches_parallel( + self, batches, dataset_id, dangerously_allow_filesystem, max_concurrency + ): + all_examples_ids = [] + total_count = 0 + from langsmith.utils import ContextThreadPoolExecutor + + with ContextThreadPoolExecutor(max_workers=max_concurrency) as executor: + # submit all batch uploads to thread pool + futures = [ + executor.submit( + self._upload_single_batch, + batch, + dataset_id, + dangerously_allow_filesystem, + ) + for batch in batches + ] + # collect results as they complete + for future in cf.as_completed(futures): + response = future.result() + all_examples_ids.extend(response.get("example_ids", [])) + total_count += response.get("count", 0) + + return ls_schemas.UpsertExamplesResponse( + example_ids=all_examples_ids, count=total_count + ) + + def _upload_single_batch(self, batch, dataset_id, dangerously_allow_filesystem): + """Upload a single batch of examples (used by both sequential and parallel).""" + if (self.info.instance_flags or {}).get( + "dataset_examples_multipart_enabled", False + ): + response = self._upload_examples_multipart( + dataset_id=cast(uuid.UUID, dataset_id), + uploads=batch, # batch is a list of ExampleCreate objects + dangerously_allow_filesystem=dangerously_allow_filesystem, + ) + return { + "example_ids": response.get("example_ids", []), + "count": response.get("count", 0), + } + else: + # Strip attachments for legacy endpoint + for upload in batch: + if getattr(upload, "attachments") is not None: + upload.attachments = None + warnings.warn( + "Must upgrade your LangSmith version to use attachments." 
+                    )
+
+            response = self.request_with_retries(
+                "POST",
+                "/examples/bulk",
+                headers={**self._headers, "Content-Type": "application/json"},
+                data=_dumps_json(
+                    [
+                        {**dump_model(upload), "dataset_id": str(dataset_id)}
+                        for upload in batch
+                    ]
+                ),
+            )
+            ls_utils.raise_for_status_with_text(response)
+            response_data = response.json()
+            return {
+                "example_ids": response_data.get("example_ids", []),
+                "count": response_data.get("count", 0),
+            }
+
+    def _upload_examples_batches_sequential(
+        self, batches, dataset_id, dangerously_allow_filesystem
+    ):
         # Batch uploads by size and count for optimal performance
         all_example_ids = []
         total_count = 0
 
-        # Use size-aware batching to prevent payload limit errors
-        batches = self._batch_examples_by_size_and_count(uploads)
-
         for batch in batches:
-            if (self.info.instance_flags or {}).get(
-                "dataset_examples_multipart_enabled", False
-            ):
-                response = self._upload_examples_multipart(
-                    dataset_id=cast(uuid.UUID, dataset_id),
-                    uploads=batch,
-                    dangerously_allow_filesystem=dangerously_allow_filesystem,
-                )
-                all_example_ids.extend(response["example_ids"] or [])
-                total_count += response["count"] or 0
-            else:
-                # Strip attachments for legacy endpoint
-                for upload in batch:
-                    if getattr(upload, "attachments") is not None:
-                        upload.attachments = None
-                        warnings.warn(
-                            "Must upgrade your LangSmith version to use attachments."
-                        )
-
-                response = self.request_with_retries(
-                    "POST",
-                    "/examples/bulk",
-                    headers={**self._headers, "Content-Type": "application/json"},
-                    data=_dumps_json(
-                        [
-                            {**dump_model(upload), "dataset_id": str(dataset_id)}
-                            for upload in batch
-                        ]
-                    ),
-                )
-                ls_utils.raise_for_status_with_text(response)
-                response_data = response.json()
-                all_example_ids.extend(response_data.get("example_ids", []))
-                total_count += response_data.get("count", 0)
+            response = self._upload_single_batch(
+                batch, dataset_id, dangerously_allow_filesystem
+            )
+            all_example_ids.extend(response.get("example_ids", []))
+            total_count += response.get("count", 0)
 
         return ls_schemas.UpsertExamplesResponse(
             example_ids=all_example_ids,
             count=total_count,
         )

diff --git a/python/tests/integration_tests/test_client.py b/python/tests/integration_tests/test_client.py
index 514b083e7..c5a7f93a1 100644
--- a/python/tests/integration_tests/test_client.py
+++ b/python/tests/integration_tests/test_client.py
@@ -2618,8 +2618,8 @@ def test_create_examples_large_multipart_batching(
     dataset_name = "__test_large_multipart_" + uuid4().hex[:4]
    dataset = _create_dataset(parameterized_multipart_client, dataset_name)
 
-    # Create examples with large attachments to simulate >20MB payload
-    large_data = b"x" * 500_000  # 500KB per attachment
+    # Create examples with large attachments to simulate >100MB payload
+    large_data = b"x" * 5_000_000  # 5MB per attachment
     examples = [
         {
             "inputs": {"question": f"What's in image {i}?"},
@@ -2629,15 +2629,15 @@ def test_create_examples_large_multipart_batching(
             "attachments": {
                 f"image_{i}": ("image/png", large_data),
                 f"doc_{i}": ("text/plain", large_data),
             },
         }
-        for i in range(50)  # ~50MB total payload
+        for i in range(20)  # ~200MB total payload (two 5MB attachments per example)
     ]
 
     result = parameterized_multipart_client.create_examples(
         dataset_id=dataset.id, examples=examples
     )
 
-    assert result["count"] == 50
-    assert len(result["example_ids"]) == 50
+    assert result["count"] == 20
+    assert len(result["example_ids"]) == 20
 
     # Verify attachments were uploaded
     first_example = parameterized_multipart_client.read_example(
         result["example_ids"][0]
     )
     if hasattr(first_example, "attachments") and first_example.attachments:
         assert len(first_example.attachments) == 2
 
     safe_delete_dataset(parameterized_multipart_client,
dataset_id=dataset.id)
+
+
+def test_create_examples_large_multipart_batching_parallel(
+    parameterized_multipart_client: Client,
+) -> None:
+    """Test create_examples batching with large multipart payloads in parallel."""
+    dataset_name = "__test_large_multipart_" + uuid4().hex[:4]
+    dataset = _create_dataset(parameterized_multipart_client, dataset_name)
+
+    # Create examples with large attachments to simulate >100MB payload
+    large_data = b"x" * 5_000_000  # 5MB per attachment
+    examples = [
+        {
+            "inputs": {"question": f"What's in image {i}?"},
+            "outputs": {"answer": f"Image {i} content"},
+            "attachments": {
+                f"image_{i}": ("image/png", large_data),
+                f"doc_{i}": ("text/plain", large_data),
+            },
+        }
+        for i in range(20)  # ~200MB total payload (two 5MB attachments per example)
+    ]
+
+    result = parameterized_multipart_client.create_examples(
+        dataset_id=dataset.id, examples=examples, max_concurrency=3
+    )
+
+    assert result["count"] == 20
+    assert len(result["example_ids"]) == 20
+
+    # Verify attachments were uploaded
+    first_example = parameterized_multipart_client.read_example(
+        result["example_ids"][0]
+    )
+    if hasattr(first_example, "attachments") and first_example.attachments:
+        assert len(first_example.attachments) == 2
+
+    safe_delete_dataset(parameterized_multipart_client, dataset_id=dataset.id)
+
+
+def test_create_examples_invalid_max_concurrency(
+    parameterized_multipart_client: Client,
+) -> None:
+    """Test that invalid max_concurrency values raise errors."""
+    dataset_name = "__test_invalid_concurrency_" + uuid4().hex[:4]
+    dataset = _create_dataset(parameterized_multipart_client, dataset_name)
+    examples = [{"inputs": {"q": "Q1"}, "outputs": {"a": "A1"}}]
+
+    # Test max_concurrency < 1
+    with pytest.raises(ValueError, match="max_concurrency must be between 1 and 3"):
+        parameterized_multipart_client.create_examples(
+            dataset_id=dataset.id, examples=examples, max_concurrency=0
+        )
+
+    # Test max_concurrency > 3
+    with pytest.raises(ValueError, match="max_concurrency must be between 1 and 3"):
+        parameterized_multipart_client.create_examples(
+            dataset_id=dataset.id, examples=examples, max_concurrency=4
+        )
+
+    safe_delete_dataset(parameterized_multipart_client, dataset_id=dataset.id)
+
+
+def test_create_examples_boundary_concurrency(
+    parameterized_multipart_client: Client,
+) -> None:
+    """Test max_concurrency boundary values (1 and 3)."""
+    dataset_name = "__test_boundary_" + uuid4().hex[:4]
+    dataset = _create_dataset(parameterized_multipart_client, dataset_name)
+    examples = [
+        {"inputs": {"q": f"Q{i}"}, "outputs": {"a": f"A{i}"}} for i in range(50)
+    ]
+
+    # Test min value (sequential)
+    result1 = parameterized_multipart_client.create_examples(
+        dataset_id=dataset.id, examples=examples, max_concurrency=1
+    )
+    assert result1["count"] == 50
+    assert len(result1["example_ids"]) == 50
+
+    # Test max value (max parallelism)
+    examples2 = [
+        {"inputs": {"q": f"Q{i}_2"}, "outputs": {"a": f"A{i}_2"}} for i in range(50)
+    ]
+    result2 = parameterized_multipart_client.create_examples(
+        dataset_id=dataset.id, examples=examples2, max_concurrency=3
+    )
+    assert result2["count"] == 50
+    assert len(result2["example_ids"]) == 50
+
+    # Verify all examples exist
+    listed = list(parameterized_multipart_client.list_examples(dataset_id=dataset.id))
+    assert len(listed) == 100
+
+    safe_delete_dataset(parameterized_multipart_client, dataset_id=dataset.id)
+
+
+def test_create_examples_empty_list(parameterized_multipart_client: Client) -> None:
+    """Test create_examples with empty list."""
+    dataset_name = "__test_empty_"
+ uuid4().hex[:4]
+    dataset = _create_dataset(parameterized_multipart_client, dataset_name)
+
+    # An empty examples list should raise a ValueError
+    with pytest.raises(ValueError, match="Must specify either 'examples' or 'inputs.'"):
+        parameterized_multipart_client.create_examples(
+            dataset_id=dataset.id, examples=[]
+        )
+
+    safe_delete_dataset(parameterized_multipart_client, dataset_id=dataset.id)
+
+
 @pytest.mark.xfail(reason="Need to wait for backend changes to go endpoint")
 def test_use_source_run_io_multiple_examples(langchain_client: Client) -> None:
     dataset_name = "__test_use_source_run_io" + uuid4().hex[:4]

From a0106fcbeb6a44194e237cdbdc887f01ad9e37e4 Mon Sep 17 00:00:00 2001
From: Xin Jin
Date: Wed, 8 Oct 2025 21:56:46 -0700
Subject: [PATCH 07/11] linter

---
 python/langsmith/client.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/langsmith/client.py b/python/langsmith/client.py
index adeaf35b1..93cd74dd5 100644
--- a/python/langsmith/client.py
+++ b/python/langsmith/client.py
@@ -49,8 +49,8 @@
 )
 from urllib import parse as urllib_parse
 
-from pydantic import Field
 import requests
+from pydantic import Field
 from requests import adapters as requests_adapters
 from requests_toolbelt import (  # type: ignore[import-untyped]
     multipart as rqtb_multipart,

From 5b9e2c08c0b27fc50e7996da51d5f03c54e9aa0b Mon Sep 17 00:00:00 2001
From: Xin Jin
Date: Thu, 9 Oct 2025 12:09:17 -0700
Subject: [PATCH 08/11] fix lint

---
 python/langsmith/client.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/langsmith/client.py b/python/langsmith/client.py
index 93cd74dd5..db6b07c4b 100644
--- a/python/langsmith/client.py
+++ b/python/langsmith/client.py
@@ -4793,7 +4793,7 @@ def _batch_examples_by_size(
     ) -> list[list[ls_schemas.ExampleCreate]]:
         """Batch examples by size limits."""
         batches = []
-        current_batch = []
+        current_batch: list[ls_schemas.ExampleCreate] = []
         current_size = 0
 
         for example in examples:
@@ -5939,7 +5939,7 @@ def delete_examples(
         hard_delete : bool, optional
             Whether to permanently delete the examples. Default is False.
""" - params = { + params: dict[str, Any] = { "example_ids": [ str(_as_uuid(id_, f"example_ids[{i}]")) for i, id_ in enumerate(example_ids) From 5c1ca9c8d5d0c172691496d378bbc08cc5a8fe2d Mon Sep 17 00:00:00 2001 From: Xin Jin Date: Thu, 9 Oct 2025 13:43:45 -0700 Subject: [PATCH 09/11] fix mypy --- python/langsmith/wrappers/_openai.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/langsmith/wrappers/_openai.py b/python/langsmith/wrappers/_openai.py index 15b1e7a74..4e77ce929 100644 --- a/python/langsmith/wrappers/_openai.py +++ b/python/langsmith/wrappers/_openai.py @@ -37,7 +37,7 @@ @functools.lru_cache def _get_omit_types() -> tuple[type, ...]: """Get NotGiven/Omit sentinel types used by OpenAI SDK.""" - types = [] + types: list[type] = [] try: from openai._types import NotGiven, Omit From 7e888a9d8bbfa937cb7a07527ecf6c2f1456d058 Mon Sep 17 00:00:00 2001 From: Xin Jin Date: Thu, 9 Oct 2025 14:22:14 -0700 Subject: [PATCH 10/11] nit --- python/langsmith/client.py | 26 +++++++++----------------- 1 file changed, 9 insertions(+), 17 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index db6b07c4b..65f433b56 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -5927,32 +5927,24 @@ def delete_example(self, example_id: ID_TYPE) -> None: ) ls_utils.raise_for_status_with_text(response) - def delete_examples( - self, example_ids: Sequence[ID_TYPE], hard_delete: bool = False - ) -> None: + def delete_examples(self, example_ids: Sequence[ID_TYPE]) -> None: """Delete multiple examples by ID. Parameters ---------- example_ids : Sequence[ID_TYPE] The IDs of the examples to delete. - hard_delete : bool, optional - Whether to permanently delete the examples. Default is False. """ - params: dict[str, Any] = { - "example_ids": [ - str(_as_uuid(id_, f"example_ids[{i}]")) - for i, id_ in enumerate(example_ids) - ] - } - if hard_delete: - params["hard_delete"] = hard_delete - response = self.request_with_retries( "DELETE", - "/api/v1/examples", - headers=self._headers, - params=params, + "/examples", + headers={**self._headers, "Content-Type": "application/json"}, + params={ + "example_ids": [ + str(_as_uuid(id_, f"example_ids[{i}]")) + for i, id_ in enumerate(example_ids) + ] + }, ) ls_utils.raise_for_status_with_text(response) From be3a0b3ccfd98d085b84c7b4e978d0b14f9f540f Mon Sep 17 00:00:00 2001 From: Xin Jin Date: Thu, 9 Oct 2025 16:06:04 -0700 Subject: [PATCH 11/11] address comments --- python/langsmith/client.py | 31 +++---------------------------- 1 file changed, 3 insertions(+), 28 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 65f433b56..146ffc4b5 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -5077,15 +5077,9 @@ def create_examples( # Use size-aware batching to prevent payload limit errors batches = self._batch_examples_by_size(uploads) - # sequential upload - if len(batches) <= 1 or max_concurrency <= 1: - return self._upload_examples_batches_sequential( - batches, dataset_id, dangerously_allow_filesystem - ) - else: - return self._upload_examples_batches_parallel( - batches, dataset_id, dangerously_allow_filesystem, max_concurrency - ) + return self._upload_examples_batches_parallel( + batches, dataset_id, dangerously_allow_filesystem, max_concurrency + ) def _upload_examples_batches_parallel( self, batches, dataset_id, dangerously_allow_filesystem, max_concurrency @@ -5156,25 +5150,6 @@ def _upload_single_batch(self, batch, dataset_id, 
dangerously_allow_filesystem): "count": response_data.get("count", 0), } - def _upload_examples_batches_sequential( - self, batches, dataset_id, dangerously_allow_filesystem - ): - # Batch uploads by size and count for optimal performance - all_example_ids = [] - total_count = 0 - - for batch in batches: - response = self._upload_single_batch( - batch, dataset_id, dangerously_allow_filesystem - ) - all_example_ids.extend(response.get("example_ids", [])) - total_count += response.get("count", 0) - - return ls_schemas.UpsertExamplesResponse( - example_ids=all_example_ids, - count=total_count, - ) - @ls_utils.xor_args(("dataset_id", "dataset_name")) def create_example( self,
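
For reference, a usage sketch of create_examples as it stands at the end of this
series: batching is automatic and size-based (roughly 20MB per batch), and the new
max_concurrency argument (1-3, default 1) controls how many batches are uploaded in
parallel. The dataset name and payloads below are illustrative, and the dict-style
access to the response mirrors the integration tests above:

    from langsmith import Client

    client = Client()  # assumes LANGSMITH_API_KEY is set in the environment

    dataset = client.create_dataset("qa-with-attachments")  # illustrative name
    examples = [
        {
            "inputs": {"question": f"What's in image {i}?"},
            "outputs": {"answer": f"Image {i} content"},
            # Attachments are (mime_type, bytes) pairs; the bytes here are placeholders.
            "attachments": {"image": ("image/png", b"\x89PNG...")},
        }
        for i in range(500)
    ]

    # Batches are packed internally; up to max_concurrency batches are in flight at once.
    result = client.create_examples(
        dataset_id=dataset.id,
        examples=examples,
        max_concurrency=3,
    )
    assert result["count"] == 500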
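
The heart of the series is the greedy size-based packing behind _batch_examples_by_size:
walk the examples in order, accumulate until the next one would push the running total
past the 20MB cap, then start a new batch, and flush any single example that exceeds the
cap on its own into a batch of its own. A standalone sketch of that invariant
(pack_by_size and the estimate callable are hypothetical stand-ins for the private
method and _estimate_example_size):

    from typing import Callable, TypeVar

    T = TypeVar("T")

    def pack_by_size(
        items: list[T],
        estimate: Callable[[T], int],
        max_bytes: int = 20_000_000,
    ) -> list[list[T]]:
        batches: list[list[T]] = []
        current: list[T] = []
        size = 0
        for item in items:
            n = estimate(item)
            if n > max_bytes:
                # Oversized item: flush whatever is pending, then isolate it.
                if current:
                    batches.append(current)
                    current, size = [], 0
                batches.append([item])
                continue
            if current and size + n > max_bytes:
                # Adding this item would overflow the cap: start a new batch.
                batches.append(current)
                current, size = [item], n
            else:
                current.append(item)
                size += n
        if current:
            batches.append(current)
        return batches

    # Toy check: two 8MB items share a batch, the third spills into a new one,
    # and a 25MB item always rides alone.
    sizes = [8_000_000, 8_000_000, 8_000_000, 25_000_000]
    assert [len(b) for b in pack_by_size(sizes, lambda s: s)] == [2, 1, 1]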
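
The parallel path then fans those batches out over a small thread pool and folds the
per-batch responses back together as they complete (the client uses
langsmith.utils.ContextThreadPoolExecutor so context propagates to worker threads; the
plain concurrent.futures pool below is an equivalent sketch, with upload standing in
for _upload_single_batch):

    import concurrent.futures as cf
    from typing import Any, Callable

    def upload_all(
        batches: list[list[Any]],
        upload: Callable[[list[Any]], dict],
        max_concurrency: int = 3,
    ) -> dict:
        """Upload batches concurrently and merge the per-batch responses."""
        ids: list[Any] = []
        count = 0
        with cf.ThreadPoolExecutor(max_workers=max_concurrency) as executor:
            futures = [executor.submit(upload, batch) for batch in batches]
            for future in cf.as_completed(futures):
                response = future.result()  # re-raises any upload error here
                ids.extend(response.get("example_ids", []))
                count += response.get("count", 0)
        return {"example_ids": ids, "count": count}

One consequence of merging in completion order: with max_concurrency > 1 the returned
example_ids are not guaranteed to follow input order, which is why the tests above
assert on counts rather than ordering.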