7 changes: 6 additions & 1 deletion .mlc_config.json
@@ -1,3 +1,8 @@
{
"aliveStatusCodes": [429, 200]
"aliveStatusCodes": [429, 200],
"ignorePatterns": [
{
"pattern": "^https://www\\.npmjs\\.com/"
Contributor Author: Just to fix a CI/CD failure where markdown-link-check complains that the langsmith npm link returns 403; not our fault.

}
]
}
138 changes: 132 additions & 6 deletions python/langsmith/client.py
@@ -39,6 +39,7 @@
from queue import PriorityQueue
from typing import (
TYPE_CHECKING,
Annotated,
Any,
Callable,
Literal,
@@ -49,6 +50,7 @@
from urllib import parse as urllib_parse

import requests
from pydantic import Field
from requests import adapters as requests_adapters
from requests_toolbelt import ( # type: ignore[import-untyped]
multipart as rqtb_multipart,
@@ -4754,6 +4756,77 @@ def upload_examples_multipart(
dangerously_allow_filesystem=dangerously_allow_filesystem,
)

def _estimate_example_size(self, example: ls_schemas.ExampleCreate) -> int:
"""Estimate the size of an example in bytes for batching purposes."""
size = 1000 # Base overhead for JSON structure and boundaries

if example.inputs:
size += len(_dumps_json(example.inputs))
if example.outputs:
size += len(_dumps_json(example.outputs))
if example.metadata:
size += len(_dumps_json(example.metadata))

# Estimate attachments
if example.attachments:
for _, attachment in example.attachments.items():
if isinstance(attachment, dict):
attachment_data = attachment["data"]
else:
_, attachment_data = attachment

if isinstance(attachment_data, Path):
try:
size += os.path.getsize(attachment_data)
except (FileNotFoundError, OSError):
size += 1_000_000 # 1MB fallback estimate
else:
size += len(attachment_data)
size += 200 # Multipart headers overhead per attachment

return size
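# Rough illustration of the estimate above: an example with ~1KB of JSON
# inputs/outputs and one 5MB file attachment comes out to about
# 1000 (base) + 1024 (JSON) + 5_000_000 (file) + 200 (multipart headers) bytes.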

def _batch_examples_by_size(
self,
examples: list[ls_schemas.ExampleCreate],
max_batch_size_bytes: int = 20_000_000, # 20MB limit per batch
) -> list[list[ls_schemas.ExampleCreate]]:
"""Batch examples by size limits."""
batches = []
current_batch: list[ls_schemas.ExampleCreate] = []
current_size = 0

for example in examples:
example_size = self._estimate_example_size(example)

# Handle oversized single examples
if example_size > max_batch_size_bytes:
# Flush current batch first
if current_batch:
batches.append(current_batch)
current_batch = []
current_size = 0
# Give the oversized example its own batch
batches.append([example])
continue

size_exceeded = current_size + example_size > max_batch_size_bytes

# Start a new batch when adding this example would exceed the cap
if current_batch and size_exceeded:
batches.append(current_batch)
current_batch = [example]
current_size = example_size
else:
current_batch.append(example)
current_size += example_size

# Flush the final partial batch
if current_batch:
batches.append(current_batch)

return batches
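# Worked example of the greedy packing above: with max_batch_size_bytes=10
# and estimated sizes [4, 4, 4, 12, 2], the result is
# [[4, 4], [4], [12], [2]] -- the 12-byte example exceeds the cap, so the
# open batch is flushed and the oversized example ships alone.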

def _upload_examples_multipart(
self,
*,
@@ -4862,6 +4935,7 @@ def create_examples(
dataset_id: Optional[ID_TYPE] = None,
examples: Optional[Sequence[ls_schemas.ExampleCreate | dict]] = None,
dangerously_allow_filesystem: bool = False,
max_concurrency: Annotated[int, Field(ge=1, le=3)] = 1,
**kwargs: Any,
) -> ls_schemas.UpsertExamplesResponse | dict[str, Any]:
"""Create examples in a dataset.
@@ -4930,6 +5004,9 @@ def create_examples(
response = client.create_examples(dataset_name="agent-qa", examples=examples)
# -> {"example_ids": [...
""" # noqa: E501
if not 1 <= max_concurrency <= 3:
raise ValueError("max_concurrency must be between 1 and 3")

EugeneJinXin (Contributor Author) commented on Oct 9, 2025: I think pydantic's @validate_call runtime validation is ugly, so I prefer this; it works as a simple and effective runtime check.

if kwargs and examples:
kwarg_keys = ", ".join([f"'{k}'" for k in kwargs])
raise ValueError(
@@ -4994,35 +5071,84 @@
)
]

if not uploads:
return ls_schemas.UpsertExamplesResponse(example_ids=[], count=0)

# Use size-aware batching to prevent payload limit errors
batches = self._batch_examples_by_size(uploads)

return self._upload_examples_batches_parallel(
batches, dataset_id, dangerously_allow_filesystem, max_concurrency
)

def _upload_examples_batches_parallel(
self,
batches: list[list[ls_schemas.ExampleCreate]],
dataset_id: Optional[ID_TYPE],
dangerously_allow_filesystem: bool,
max_concurrency: int,
) -> ls_schemas.UpsertExamplesResponse:
"""Upload batches of examples, running up to max_concurrency at once."""
from langsmith.utils import ContextThreadPoolExecutor

all_example_ids = []
total_count = 0

with ContextThreadPoolExecutor(max_workers=max_concurrency) as executor:
# submit all batch uploads to thread pool
futures = [
executor.submit(
self._upload_single_batch,
batch,
dataset_id,
dangerously_allow_filesystem,
)
for batch in batches
]
# collect results as they complete (IDs may not preserve input order)
for future in cf.as_completed(futures):
response = future.result()
all_example_ids.extend(response.get("example_ids", []))
total_count += response.get("count", 0)

return ls_schemas.UpsertExamplesResponse(
example_ids=all_example_ids, count=total_count
)

def _upload_single_batch(
self,
batch: list[ls_schemas.ExampleCreate],
dataset_id: Optional[ID_TYPE],
dangerously_allow_filesystem: bool,
) -> dict[str, Any]:
"""Upload a single batch of examples (used by both sequential and parallel)."""
if (self.info.instance_flags or {}).get(
"dataset_examples_multipart_enabled", False
):
- return self._upload_examples_multipart(
+ response = self._upload_examples_multipart(
dataset_id=cast(uuid.UUID, dataset_id),
- uploads=uploads,
+ uploads=batch,  # batch is a list of ExampleCreate objects
dangerously_allow_filesystem=dangerously_allow_filesystem,
)
return {
"example_ids": response.get("example_ids", []),
"count": response.get("count", 0),
}
else:
- for upload in uploads:
+ # Strip attachments for legacy endpoint
+ for upload in batch:
if getattr(upload, "attachments") is not None:
upload.attachments = None
warnings.warn(
"Must upgrade your LangSmith version to use attachments."
)
# Fall back to the legacy /examples/bulk endpoint

response = self.request_with_retries(
"POST",
"/examples/bulk",
headers={**self._headers, "Content-Type": "application/json"},
data=_dumps_json(
[
{**dump_model(upload), "dataset_id": str(dataset_id)}
- for upload in uploads
+ for upload in batch
]
),
)
ls_utils.raise_for_status_with_text(response)
- return response.json()
+ response_data = response.json()
return {
"example_ids": response_data.get("example_ids", []),
"count": response_data.get("count", 0),
}
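# For reference, the legacy path above serializes each example as
# {**dump_model(upload), "dataset_id": str(dataset_id)} and POSTs the whole
# batch as a single JSON array to /examples/bulk.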

@ls_utils.xor_args(("dataset_id", "dataset_name"))
def create_example(
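Taken together, the new upload path can be exercised roughly like this (a sketch: the dataset name and example data are illustrative, and a LangSmith API key is assumed in the environment):

```python
from langsmith import Client

client = Client()  # reads LANGSMITH_API_KEY from the environment
dataset = client.create_dataset("my-batching-demo")  # illustrative name

examples = [
    {"inputs": {"q": f"Q{i}"}, "outputs": {"a": f"A{i}"}}
    for i in range(500)
]

# Examples are packed into ~20MB batches and uploaded on up to 3 threads.
response = client.create_examples(
    dataset_id=dataset.id,
    examples=examples,
    max_concurrency=3,
)
print(response["count"])  # 500
```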
2 changes: 1 addition & 1 deletion python/langsmith/wrappers/_openai.py
@@ -37,7 +37,7 @@
@functools.lru_cache
def _get_omit_types() -> tuple[type, ...]:
"""Get NotGiven/Omit sentinel types used by OpenAI SDK."""
- types = []
+ types: list[type] = []
try:
from openai._types import NotGiven, Omit

169 changes: 169 additions & 0 deletions python/tests/integration_tests/test_client.py
@@ -2587,6 +2587,175 @@ def test_create_examples_errors(langchain_client: Client) -> None:
safe_delete_dataset(langchain_client, dataset_id=dataset.id)


def test_create_examples_batching(parameterized_multipart_client: Client) -> None:
"""Test create_examples batching with large numbers of examples."""
dataset_name = "__test_batching_" + uuid4().hex[:4]
dataset = _create_dataset(parameterized_multipart_client, dataset_name)

# Batch-create 250 examples in a single call
examples = [
{"inputs": {"q": f"Q{i}"}, "outputs": {"a": f"A{i}"}} for i in range(250)
]

result = parameterized_multipart_client.create_examples(
dataset_id=dataset.id, examples=examples
)

assert result["count"] == 250
assert len(result["example_ids"]) == 250

# Verify examples exist
listed = list(parameterized_multipart_client.list_examples(dataset_id=dataset.id))
assert len(listed) == 250

safe_delete_dataset(parameterized_multipart_client, dataset_id=dataset.id)


def test_create_examples_large_multipart_batching(
parameterized_multipart_client: Client,
) -> None:
"""Test create_examples batching with large multipart payloads."""
dataset_name = "__test_large_multipart_" + uuid4().hex[:4]
dataset = _create_dataset(parameterized_multipart_client, dataset_name)

# Create examples with large attachments to simulate >100MB payload
large_data = b"x" * 5_000_000 # 5MB per attachment
examples = [
{
"inputs": {"question": f"What's in image {i}?"},
"outputs": {"answer": f"Image {i} content"},
"attachments": {
f"image_{i}": ("image/png", large_data),
f"doc_{i}": ("text/plain", large_data),
},
}
for i in range(20)  # 2 attachments x 5MB x 20 examples = ~200MB total
]

result = parameterized_multipart_client.create_examples(
dataset_id=dataset.id, examples=examples
)

assert result["count"] == 20
assert len(result["example_ids"]) == 20

# Verify attachments were uploaded
first_example = parameterized_multipart_client.read_example(
result["example_ids"][0]
)
if hasattr(first_example, "attachments") and first_example.attachments:
assert len(first_example.attachments) == 2

safe_delete_dataset(parameterized_multipart_client, dataset_id=dataset.id)


def test_create_examples_large_multipart_batching_parallel(
parameterized_multipart_client: Client,
) -> None:
"""Test create_examples batching with large multipart payloads in parallel."""
dataset_name = "__test_large_multipart_" + uuid4().hex[:4]
dataset = _create_dataset(parameterized_multipart_client, dataset_name)

# Create examples with large attachments to simulate >100MB payload
large_data = b"x" * 5_000_000 # 5MB per attachment
examples = [
{
"inputs": {"question": f"What's in image {i}?"},
"outputs": {"answer": f"Image {i} content"},
"attachments": {
f"image_{i}": ("image/png", large_data),
f"doc_{i}": ("text/plain", large_data),
},
}
for i in range(20)  # 2 attachments x 5MB x 20 examples = ~200MB total
]

result = parameterized_multipart_client.create_examples(
dataset_id=dataset.id, examples=examples, max_concurrency=3
)

assert result["count"] == 20
assert len(result["example_ids"]) == 20

# Verify attachments were uploaded
first_example = parameterized_multipart_client.read_example(
result["example_ids"][0]
)
if hasattr(first_example, "attachments") and first_example.attachments:
assert len(first_example.attachments) == 2

safe_delete_dataset(parameterized_multipart_client, dataset_id=dataset.id)


def test_create_examples_invalid_max_concurrency(
parameterized_multipart_client: Client,
) -> None:
"""Test that invalid max_concurrency values raise errors."""
dataset_name = "__test_invalid_concurrency_" + uuid4().hex[:4]
dataset = _create_dataset(parameterized_multipart_client, dataset_name)
examples = [{"inputs": {"q": "Q1"}, "outputs": {"a": "A1"}}]

# Test max_concurrency < 1
with pytest.raises(ValueError, match="max_concurrency must be between 1 and 3"):
parameterized_multipart_client.create_examples(
dataset_id=dataset.id, examples=examples, max_concurrency=0
)

# Test max_concurrency > 3
with pytest.raises(ValueError, match="max_concurrency must be between 1 and 3"):
parameterized_multipart_client.create_examples(
dataset_id=dataset.id, examples=examples, max_concurrency=4
)

safe_delete_dataset(parameterized_multipart_client, dataset_id=dataset.id)


def test_create_examples_boundary_concurrency(
parameterized_multipart_client: Client,
) -> None:
"""Test max_concurrency boundary values (1 and 3)."""
dataset_name = "__test_boundary_" + uuid4().hex[:4]
dataset = _create_dataset(parameterized_multipart_client, dataset_name)
examples = [
{"inputs": {"q": f"Q{i}"}, "outputs": {"a": f"A{i}"}} for i in range(50)
]

# Test min value (sequential)
result1 = parameterized_multipart_client.create_examples(
dataset_id=dataset.id, examples=examples, max_concurrency=1
)
assert result1["count"] == 50
assert len(result1["example_ids"]) == 50

# Test max value (max parallelism)
examples2 = [
{"inputs": {"q": f"Q{i}_2"}, "outputs": {"a": f"A{i}_2"}} for i in range(50)
]
result2 = parameterized_multipart_client.create_examples(
dataset_id=dataset.id, examples=examples2, max_concurrency=3
)
assert result2["count"] == 50
assert len(result2["example_ids"]) == 50

# Verify all examples exist
listed = list(parameterized_multipart_client.list_examples(dataset_id=dataset.id))
assert len(listed) == 100

safe_delete_dataset(parameterized_multipart_client, dataset_id=dataset.id)


def test_create_examples_empty_list(parameterized_multipart_client: Client) -> None:
"""Test create_examples with empty list."""
dataset_name = "__test_empty_" + uuid4().hex[:4]
dataset = _create_dataset(parameterized_multipart_client, dataset_name)

# An empty examples list should raise, same as omitting `examples`
with pytest.raises(ValueError, match="Must specify either 'examples' or 'inputs.'"):
parameterized_multipart_client.create_examples(
dataset_id=dataset.id, examples=[]
)

safe_delete_dataset(parameterized_multipart_client, dataset_id=dataset.id)


@pytest.mark.xfail(reason="Need to wait for backend changes to go endpoint")
def test_use_source_run_io_multiple_examples(langchain_client: Client) -> None:
dataset_name = "__test_use_source_run_io" + uuid4().hex[:4]