-
Notifications
You must be signed in to change notification settings - Fork 164
feat: add batching to create_examples #2047
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 13 commits
Commits
Show all changes
14 commits
Select commit
Hold shift + click to select a range
3fd732b
Add batching to create_examples
EugeneJinXin a55b706
lint
EugeneJinXin 2955f31
size based batch
EugeneJinXin 7a9a729
fix node/langsmith 403 ci/cd error
EugeneJinXin 34b020e
Merge branch 'main' into batch-upload-examples
EugeneJinXin de99bd1
batch by size only
EugeneJinXin 627f012
allow concurrency <= 4
EugeneJinXin 1beae0a
Merge branch 'main' into batch-upload-examples
EugeneJinXin a0106fc
linter
EugeneJinXin 5b9e2c0
fix lint
EugeneJinXin 5c1ca9c
fix mypy
EugeneJinXin 7e888a9
nit
EugeneJinXin be3a0b3
address comments
EugeneJinXin a9f897c
Merge branch 'main' into batch-upload-examples
EugeneJinXin File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,8 @@ | ||
| { | ||
| "aliveStatusCodes": [429, 200] | ||
| "aliveStatusCodes": [429, 200], | ||
| "ignorePatterns": [ | ||
| { | ||
| "pattern": "^https://www\\.npmjs\\.com/" | ||
| } | ||
| ] | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -39,6 +39,7 @@ | |
| from queue import PriorityQueue | ||
| from typing import ( | ||
| TYPE_CHECKING, | ||
| Annotated, | ||
| Any, | ||
| Callable, | ||
| Literal, | ||
|
|
@@ -49,6 +50,7 @@ | |
| from urllib import parse as urllib_parse | ||
|
|
||
| import requests | ||
| from pydantic import Field | ||
| from requests import adapters as requests_adapters | ||
| from requests_toolbelt import ( # type: ignore[import-untyped] | ||
| multipart as rqtb_multipart, | ||
|
|
@@ -4754,6 +4756,77 @@ def upload_examples_multipart( | |
| dangerously_allow_filesystem=dangerously_allow_filesystem, | ||
| ) | ||
|
|
||
| def _estimate_example_size(self, example: ls_schemas.ExampleCreate) -> int: | ||
| """Estimate the size of an example in bytes for batching purposes.""" | ||
| size = 1000 # Base overhead for JSON structure and boundaries | ||
|
|
||
| if example.inputs: | ||
| size += len(_dumps_json(example.inputs)) | ||
| if example.outputs: | ||
| size += len(_dumps_json(example.outputs)) | ||
| if example.metadata: | ||
| size += len(_dumps_json(example.metadata)) | ||
|
|
||
| # Estimate attachments | ||
| if example.attachments: | ||
| for _, attachment in example.attachments.items(): | ||
| if isinstance(attachment, dict): | ||
| attachment_data = attachment["data"] | ||
| else: | ||
| _, attachment_data = attachment | ||
|
|
||
| if isinstance(attachment_data, Path): | ||
| try: | ||
| size += os.path.getsize(attachment_data) | ||
| except (FileNotFoundError, OSError): | ||
| size += 1_000_000 # 1MB fallback estimate | ||
| else: | ||
| size += len(attachment_data) | ||
| size += 200 # Multipart headers overhead per attachment | ||
|
|
||
| return size | ||
|
|
||
| def _batch_examples_by_size( | ||
| self, | ||
| examples: list[ls_schemas.ExampleCreate], | ||
| max_batch_size_bytes: int = 20_000_000, # 20MB limit per batch | ||
| ) -> list[list[ls_schemas.ExampleCreate]]: | ||
| """Batch examples by size limits.""" | ||
| batches = [] | ||
| current_batch: list[ls_schemas.ExampleCreate] = [] | ||
| current_size = 0 | ||
|
|
||
| for example in examples: | ||
| example_size = self._estimate_example_size(example) | ||
|
|
||
| # Handle oversized single examples | ||
| if example_size > max_batch_size_bytes: | ||
| # Flush current batch first | ||
| if current_batch: | ||
| batches.append(current_batch) | ||
| current_batch = [] | ||
| current_size = 0 | ||
| # oversized example | ||
| batches.append([example]) | ||
| continue | ||
|
|
||
| size_exceeded = current_size + example_size > max_batch_size_bytes | ||
|
|
||
| # new batch | ||
| if current_batch and size_exceeded: | ||
| batches.append(current_batch) | ||
| current_batch = [example] | ||
| current_size = example_size | ||
| else: | ||
| current_batch.append(example) | ||
| current_size += example_size | ||
|
|
||
| # final batch | ||
| if current_batch: | ||
| batches.append(current_batch) | ||
|
|
||
| return batches | ||
|
|
||
| def _upload_examples_multipart( | ||
| self, | ||
| *, | ||
|
|
@@ -4862,6 +4935,7 @@ def create_examples( | |
| dataset_id: Optional[ID_TYPE] = None, | ||
| examples: Optional[Sequence[ls_schemas.ExampleCreate | dict]] = None, | ||
| dangerously_allow_filesystem: bool = False, | ||
| max_concurrency: Annotated[int, Field(ge=1, le=3)] = 1, | ||
| **kwargs: Any, | ||
| ) -> ls_schemas.UpsertExamplesResponse | dict[str, Any]: | ||
| """Create examples in a dataset. | ||
|
|
@@ -4930,6 +5004,9 @@ def create_examples( | |
| response = client.create_examples(dataset_name="agent-qa", examples=examples) | ||
| # -> {"example_ids": [... | ||
| """ # noqa: E501 | ||
| if not 1 <= max_concurrency <= 3: | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. think @validate_call pydantic runtime is ugly, so i prefer this since it works as simple and effective runtime check |
||
| raise ValueError("max_concurrency must be between 1 and 3") | ||
|
|
||
| if kwargs and examples: | ||
| kwarg_keys = ", ".join([f"'{k}'" for k in kwargs]) | ||
| raise ValueError( | ||
|
|
@@ -4994,35 +5071,84 @@ def create_examples( | |
| ) | ||
| ] | ||
|
|
||
| if not uploads: | ||
| return ls_schemas.UpsertExamplesResponse(example_ids=[], count=0) | ||
|
|
||
| # Use size-aware batching to prevent payload limit errors | ||
| batches = self._batch_examples_by_size(uploads) | ||
|
|
||
| return self._upload_examples_batches_parallel( | ||
| batches, dataset_id, dangerously_allow_filesystem, max_concurrency | ||
| ) | ||
|
|
||
| def _upload_examples_batches_parallel( | ||
| self, batches, dataset_id, dangerously_allow_filesystem, max_concurrency | ||
| ): | ||
| all_examples_ids = [] | ||
| total_count = 0 | ||
| from langsmith.utils import ContextThreadPoolExecutor | ||
|
|
||
| with ContextThreadPoolExecutor(max_workers=max_concurrency) as executor: | ||
| # submit all batch uploads to thread pool | ||
| futures = [ | ||
| executor.submit( | ||
| self._upload_single_batch, | ||
| batch, | ||
| dataset_id, | ||
| dangerously_allow_filesystem, | ||
| ) | ||
| for batch in batches | ||
| ] | ||
| # collect results as they complete | ||
| for future in cf.as_completed(futures): | ||
| response = future.result() | ||
| all_examples_ids.extend(response.get("example_ids", [])) | ||
| total_count += response.get("count", 0) | ||
|
|
||
| return ls_schemas.UpsertExamplesResponse( | ||
| example_ids=all_examples_ids, count=total_count | ||
| ) | ||
|
|
||
| def _upload_single_batch(self, batch, dataset_id, dangerously_allow_filesystem): | ||
| """Upload a single batch of examples (used by both sequential and parallel).""" | ||
| if (self.info.instance_flags or {}).get( | ||
| "dataset_examples_multipart_enabled", False | ||
| ): | ||
| return self._upload_examples_multipart( | ||
| response = self._upload_examples_multipart( | ||
| dataset_id=cast(uuid.UUID, dataset_id), | ||
| uploads=uploads, | ||
| uploads=batch, # batch is a list of ExampleCreate objects | ||
| dangerously_allow_filesystem=dangerously_allow_filesystem, | ||
| ) | ||
| return { | ||
| "example_ids": response.get("example_ids", []), | ||
| "count": response.get("count", 0), | ||
| } | ||
| else: | ||
| for upload in uploads: | ||
| # Strip attachments for legacy endpoint | ||
| for upload in batch: | ||
| if getattr(upload, "attachments") is not None: | ||
| upload.attachments = None | ||
| warnings.warn( | ||
| "Must upgrade your LangSmith version to use attachments." | ||
| ) | ||
| # fallback to old method | ||
|
|
||
| response = self.request_with_retries( | ||
| "POST", | ||
| "/examples/bulk", | ||
| headers={**self._headers, "Content-Type": "application/json"}, | ||
| data=_dumps_json( | ||
| [ | ||
| {**dump_model(upload), "dataset_id": str(dataset_id)} | ||
| for upload in uploads | ||
| for upload in batch | ||
| ] | ||
| ), | ||
| ) | ||
| ls_utils.raise_for_status_with_text(response) | ||
| return response.json() | ||
| response_data = response.json() | ||
| return { | ||
| "example_ids": response_data.get("example_ids", []), | ||
| "count": response_data.get("count", 0), | ||
| } | ||
|
|
||
| @ls_utils.xor_args(("dataset_id", "dataset_name")) | ||
| def create_example( | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just to fix ci/cd failure that complains langsmith npm link returns 403, not our fault