7 changes: 6 additions & 1 deletion .mlc_config.json
@@ -1,3 +1,8 @@
{
"aliveStatusCodes": [429, 200]
"aliveStatusCodes": [429, 200],
"ignorePatterns": [
{
"pattern": "^https://www\\.npmjs\\.com/"
Contributor Author

Just to fix a CI/CD failure: the link checker complains that the langsmith npm link returns 403, which isn't our fault.

}
]
}
180 changes: 142 additions & 38 deletions python/langsmith/client.py
@@ -4754,6 +4754,81 @@ def upload_examples_multipart(
dangerously_allow_filesystem=dangerously_allow_filesystem,
)

def _estimate_example_size(self, example: ls_schemas.ExampleCreate) -> int:
"""Estimate the size of an example in bytes for batching purposes."""
size = 1000 # Base overhead for JSON structure and boundaries

# Estimate JSON parts
if example.inputs:
size += len(_dumps_json(example.inputs))
if example.outputs:
size += len(_dumps_json(example.outputs))
if example.metadata:
size += len(_dumps_json(example.metadata))

# Estimate attachments (largest contributor)
if example.attachments:
for _, attachment in example.attachments.items():
if isinstance(attachment, dict):
attachment_data = attachment["data"]
else:
_, attachment_data = attachment

if isinstance(attachment_data, Path):
try:
size += os.path.getsize(attachment_data)
except (FileNotFoundError, OSError):
size += 1_000_000 # 1MB fallback estimate
else:
size += len(attachment_data)
size += 200 # Multipart headers overhead per attachment

return size
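
Illustrative only, not part of the diff: a sketch of the attachment value shapes the branches above handle. The "data" key and the (mime type, data) tuple order come from the code; the "mime_type" dict key and the concrete values are assumptions.

from pathlib import Path

attachments = {
    "image": ("image/png", b"\x89PNG..."),                 # tuple form: (mime type, bytes), sized via len()
    "doc": {"mime_type": "text/plain", "data": b"hello"},  # dict form: sized via len(attachment["data"])
    "report": ("application/pdf", Path("report.pdf")),     # Path data: sized via os.path.getsize(), 1MB fallback
}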

def _batch_examples_by_size_and_count(
self,
examples: list[ls_schemas.ExampleCreate],
max_batch_size_bytes: int = 20_000_000, # 20MB limit
max_batch_count: int = 200,
) -> list[list[ls_schemas.ExampleCreate]]:
"""Batch examples by both size and count limits."""
batches = []
current_batch = []
current_size = 0

for example in examples:
example_size = self._estimate_example_size(example)

# Handle oversized single examples
if example_size > max_batch_size_bytes:
# Flush current batch first
if current_batch:
batches.append(current_batch)
current_batch = []
current_size = 0
# Put oversized example in its own batch
batches.append([example])
continue

# Check if adding this example would exceed limits
size_exceeded = current_size + example_size > max_batch_size_bytes
count_exceeded = len(current_batch) >= max_batch_count

# Start new batch if current batch would be too large
if current_batch and (size_exceeded or count_exceeded):
batches.append(current_batch)
current_batch = [example]
current_size = example_size
else:
current_batch.append(example)
current_size += example_size

# Add final batch
if current_batch:
batches.append(current_batch)

return batches
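
A minimal usage sketch, assuming ls_schemas.ExampleCreate accepts inputs/outputs keyword arguments as used in the tests below; 300 small examples stay well under the 20MB size limit, so only the max_batch_count=200 cap triggers a split.

from langsmith import Client
from langsmith import schemas as ls_schemas

client = Client()  # assumes LANGSMITH_API_KEY is configured
uploads = [
    ls_schemas.ExampleCreate(inputs={"q": f"Q{i}"}, outputs={"a": f"A{i}"})
    for i in range(300)
]
batches = client._batch_examples_by_size_and_count(uploads)
assert [len(b) for b in batches] == [200, 100]  # split on count, not size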

def _upload_examples_multipart(
self,
*,
@@ -4994,35 +5069,56 @@ def create_examples(
)
]

if (self.info.instance_flags or {}).get(
"dataset_examples_multipart_enabled", False
):
return self._upload_examples_multipart(
dataset_id=cast(uuid.UUID, dataset_id),
uploads=uploads,
dangerously_allow_filesystem=dangerously_allow_filesystem,
)
else:
for upload in uploads:
if getattr(upload, "attachments") is not None:
upload.attachments = None
warnings.warn(
"Must upgrade your LangSmith version to use attachments."
)
# fallback to old method
response = self.request_with_retries(
"POST",
"/examples/bulk",
headers={**self._headers, "Content-Type": "application/json"},
data=_dumps_json(
[
{**dump_model(upload), "dataset_id": str(dataset_id)}
for upload in uploads
]
),
)
ls_utils.raise_for_status_with_text(response)
return response.json()
if not uploads:
return ls_schemas.UpsertExamplesResponse(example_ids=[], count=0)

# Batch uploads by size and count for optimal performance
all_example_ids = []
total_count = 0

# Use size-aware batching to prevent payload limit errors
batches = self._batch_examples_by_size_and_count(uploads)

for batch in batches:
if (self.info.instance_flags or {}).get(
"dataset_examples_multipart_enabled", False
):
response = self._upload_examples_multipart(
dataset_id=cast(uuid.UUID, dataset_id),
uploads=batch,
dangerously_allow_filesystem=dangerously_allow_filesystem,
)
all_example_ids.extend(response.example_ids or [])
total_count += response.count or 0
else:
# Strip attachments for legacy endpoint
for upload in batch:
if getattr(upload, "attachments") is not None:
upload.attachments = None
warnings.warn(
"Must upgrade your LangSmith version to use attachments."
)

response = self.request_with_retries(
"POST",
"/examples/bulk",
headers={**self._headers, "Content-Type": "application/json"},
data=_dumps_json(
[
{**dump_model(upload), "dataset_id": str(dataset_id)}
for upload in batch
]
),
)
ls_utils.raise_for_status_with_text(response)
response_data = response.json()
all_example_ids.extend(response_data.get("example_ids", []))
total_count += response_data.get("count", 0)

return ls_schemas.UpsertExamplesResponse(
example_ids=all_example_ids,
count=total_count,
)

@ls_utils.xor_args(("dataset_id", "dataset_name"))
def create_example(
@@ -5776,24 +5872,32 @@ def delete_example(self, example_id: ID_TYPE) -> None:
)
ls_utils.raise_for_status_with_text(response)

def delete_examples(self, example_ids: Sequence[ID_TYPE]) -> None:
def delete_examples(
self, example_ids: Sequence[ID_TYPE], hard_delete: bool = False
) -> None:
"""Delete multiple examples by ID.

Parameters
----------
example_ids : Sequence[ID_TYPE]
The IDs of the examples to delete.
hard_delete : bool, optional
Whether to permanently delete the examples. Default is False.
"""
params = {
"example_ids": [
str(_as_uuid(id_, f"example_ids[{i}]"))
for i, id_ in enumerate(example_ids)
]
}
if hard_delete:
params["hard_delete"] = hard_delete

response = self.request_with_retries(
"DELETE",
"/examples",
headers={**self._headers, "Content-Type": "application/json"},
params={
"example_ids": [
str(_as_uuid(id_, f"example_ids[{i}]"))
for i, id_ in enumerate(example_ids)
]
},
"/api/v1/examples",
headers=self._headers,
params=params,
)
ls_utils.raise_for_status_with_text(response)
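
A hedged usage sketch of the new flag; "client" and "dataset" are placeholders carried over from the tests elsewhere in this diff, and hard_delete=True requests a permanent delete per the parameter added above.

example_ids = [ex.id for ex in client.list_examples(dataset_id=dataset.id)]
client.delete_examples(example_ids=example_ids, hard_delete=True)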

62 changes: 62 additions & 0 deletions python/tests/integration_tests/test_client.py
@@ -2587,6 +2587,68 @@ def test_create_examples_errors(langchain_client: Client) -> None:
safe_delete_dataset(langchain_client, dataset_id=dataset.id)


def test_create_examples_batching(parameterized_multipart_client: Client) -> None:
"""Test create_examples batching with large numbers of examples."""
dataset_name = "__test_batching_" + uuid4().hex[:4]
dataset = _create_dataset(parameterized_multipart_client, dataset_name)

# Test batching with 250 examples (> default batch_size=200)
examples = [
{"inputs": {"q": f"Q{i}"}, "outputs": {"a": f"A{i}"}} for i in range(250)
]

result = parameterized_multipart_client.create_examples(
dataset_id=dataset.id, examples=examples
)

assert result["count"] == 250
assert len(result["example_ids"]) == 250

# Verify examples exist
listed = list(parameterized_multipart_client.list_examples(dataset_id=dataset.id))
assert len(listed) == 250

safe_delete_dataset(parameterized_multipart_client, dataset_id=dataset.id)


def test_create_examples_large_multipart_batching(
parameterized_multipart_client: Client,
) -> None:
"""Test create_examples batching with large multipart payloads."""
dataset_name = "__test_large_multipart_" + uuid4().hex[:4]
dataset = _create_dataset(parameterized_multipart_client, dataset_name)

# Create examples with large attachments to simulate >20MB payload
large_data = b"x" * 500_000 # 500KB per attachment
examples = [
{
"inputs": {"question": f"What's in image {i}?"},
"outputs": {"answer": f"Image {i} content"},
"attachments": {
f"image_{i}": ("image/png", large_data),
f"doc_{i}": ("text/plain", large_data),
},
}
for i in range(50) # ~50MB total payload
]

result = parameterized_multipart_client.create_examples(
dataset_id=dataset.id, examples=examples
)

assert result["count"] == 50
assert len(result["example_ids"]) == 50

# Verify attachments were uploaded
first_example = parameterized_multipart_client.read_example(
result["example_ids"][0]
)
if hasattr(first_example, "attachments") and first_example.attachments:
assert len(first_example.attachments) == 2

safe_delete_dataset(parameterized_multipart_client, dataset_id=dataset.id)


@pytest.mark.xfail(reason="Need to wait for backend changes to go endpoint")
def test_use_source_run_io_multiple_examples(langchain_client: Client) -> None:
dataset_name = "__test_use_source_run_io" + uuid4().hex[:4]