Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -1795,7 +1795,9 @@ async def aspirate(
immersion_depth_direction = immersion_depth_direction or [
0 if (id_ >= 0) else 1 for id_ in immersion_depth
]
immersion_depth = [im * (-1 if immersion_depth_direction[i] else 1) for i, im in enumerate(immersion_depth)]
immersion_depth = [
im * (-1 if immersion_depth_direction[i] else 1) for i, im in enumerate(immersion_depth)
]
surface_following_distance = _fill_in_defaults(surface_following_distance, [0.0] * n)
flow_rates = [
op.flow_rate or (hlc.aspiration_flow_rate if hlc is not None else 100.0)
Expand Down Expand Up @@ -2101,7 +2103,9 @@ async def dispense(
immersion_depth_direction = immersion_depth_direction or [
0 if (id_ >= 0) else 1 for id_ in immersion_depth
]
immersion_depth = [im * (-1 if immersion_depth_direction[i] else 1) for i, im in enumerate(immersion_depth)]
immersion_depth = [
im * (-1 if immersion_depth_direction[i] else 1) for i, im in enumerate(immersion_depth)
]
surface_following_distance = _fill_in_defaults(surface_following_distance, [0.0] * n)
flow_rates = [
op.flow_rate or (hlc.dispense_flow_rate if hlc is not None else 120.0)
Expand Down
199 changes: 127 additions & 72 deletions pylabrobot/liquid_handling/liquid_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
import logging
import threading
import warnings
from itertools import zip_longest
from typing import (
Any,
AsyncIterator,
Awaitable,
Callable,
Dict,
Expand Down Expand Up @@ -1263,93 +1265,146 @@ async def dispense(

async def transfer(
self,
source: Well,
targets: List[Well],
source_vol: Optional[float] = None,
ratios: Optional[List[float]] = None,
target_vols: Optional[List[float]] = None,
aspiration_flow_rate: Optional[float] = None,
dispense_flow_rates: Optional[List[Optional[float]]] = None,
**backend_kwargs,
source_resources: Sequence[Container],
dest_resources: Sequence[Container],
vols: List[float],
tip_spots: Union[List[TipSpot], AsyncIterator[TipSpot]],
aspiration_kwargs: Optional[Dict[str, Any]] = None,
dispense_kwargs: Optional[Dict[str, Any]] = None,
tip_drop_method: Literal["return", "discard"] = "discard",
):
"""Transfer liquid from one well to another.
"""Transfer liquid from one set of resources to another. Each input resource matches to exactly one output resource.

Examples:
Transfer liquid from one column to another column:
>>> await lh.transfer(
... source_resources=plate1["A1":"H8"],
... dest_resources=plate2["A1":"H8"],
... vols=[50] * 8,
... tip_spots=tip_rack["A1":"H1"],
... )
"""

Transfer 50 uL of liquid from the first well to the second well:

>>> await lh.transfer(plate["A1"], plate["B1"], source_vol=50)

Transfer 80 uL of liquid from the first well equally to the first column:
if not (len(source_resources) == len(dest_resources) == len(vols)):
raise ValueError(
"Number of source and destination resources must match, but got "
f"{len(source_resources)} source resources, {len(dest_resources)} destination resources, "
f"and {len(vols)} volumes."
)

>>> await lh.transfer(plate["A1"], plate["A1:H1"], source_vol=80)
if isinstance(tip_spots, list) and len(tip_spots) < len(source_resources):
raise ValueError(
"Number of tip spots must be at least the number of channels, "
f"but got {len(tip_spots)} tip spots and {len(source_resources)} transfers."
)
if hasattr(tip_spots, "__aiter__") and hasattr(tip_spots, "__anext__"):
tip_spots = [await tip_spots.__anext__() for _ in source_resources] # type: ignore
assert isinstance(tip_spots, list)

for batch in range(0, len(source_resources), self.backend.num_channels):
batch_sources = source_resources[batch : batch + self.backend.num_channels]
batch_destinations = dest_resources[batch : batch + self.backend.num_channels]
batch_vols = vols[batch : batch + self.backend.num_channels]
batch_tip_spots = tip_spots[batch : batch + self.backend.num_channels]

await self.pick_up_tips(batch_tip_spots)

await self.aspirate(
resources=batch_sources,
vols=batch_vols,
**(aspiration_kwargs or {}),
)

Transfer 60 uL of liquid from the first well in a 1:2 ratio to 2 other wells:
await self.dispense(
resources=batch_destinations,
vols=batch_vols,
**(dispense_kwargs or {}),
)

>>> await lh.transfer(plate["A1"], plate["B1:C1"], source_vol=60, ratios=[2, 1])
if tip_drop_method == "return":
await self.return_tips()
else:
await self.discard_tips()

Transfer arbitrary volumes to the first column:
async def distribute(
self,
operations: Dict[Container, List[Tuple[Union[Container, List[Container]], float]]],
tip_spots: Union[List[TipSpot], AsyncIterator[TipSpot]],
dead_volume: float = 10,
tip_drop_method: Literal["return", "discard"] = "discard",
aspiration_kwargs: Optional[Dict[str, Any]] = None,
dispense_kwargs: Optional[Dict[str, Any]] = None,
):
"""
Distribute liquid from one resource to multiple resources.

>>> await lh.transfer(plate["A1"], plate["A1:H1"], target_vols=[3, 1, 4, 1, 5, 9, 6, 2])
Examples:
Distribute liquid from one well to multiple wells:
>>> await lh.distribute({
... plate1["A1"]: [(plate2["A1"], 50), (plate2["A2"], 50)],
... plate1["A2"]: [(plate2["B1"], 100), (plate2["B2"], 100), (plate2["B3"], 100)],
... })

Args:
source: The source well.
targets: The target wells.
source_vol: The volume to transfer from the source well.
ratios: The ratios to use when transferring liquid to the target wells. If not specified, then
the volumes will be distributed equally.
target_vols: The volumes to transfer to the target wells. If specified, `source_vols` and
`ratios` must be `None`.
aspiration_flow_rate: The flow rate to use when aspirating, in ul/s. If `None`, the backend
default will be used.
dispense_flow_rates: The flow rates to use when dispensing, in ul/s. If `None`, the backend
default will be used. Either a single flow rate for all channels, or a list of flow rates,
one for each target well.

Raises:
RuntimeError: If the setup has not been run. See :meth:`~LiquidHandler.setup`.
operations: A dictionary mapping source resources to a list of tuples, each containing a
destination resource and the volume to dispense to that resource.
"""

self._log_command(
"transfer",
source=source,
targets=targets,
source_vol=source_vol,
ratios=ratios,
target_vols=target_vols,
aspiration_flow_rate=aspiration_flow_rate,
dispense_flow_rates=dispense_flow_rates,
)

if target_vols is not None:
if ratios is not None:
raise TypeError("Cannot specify ratios and target_vols at the same time")
if source_vol is not None:
raise TypeError("Cannot specify source_vol and target_vols at the same time")
else:
if source_vol is None:
raise TypeError("Must specify either source_vol or target_vols")

if ratios is None:
ratios = [1] * len(targets)
if isinstance(tip_spots, list) and len(tip_spots) < len(operations):
raise ValueError(
"Number of tip spots must be at least the number of channels, "
f"but got {len(tip_spots)} tip spots and {len(operations)} distributions."
)
if hasattr(tip_spots, "__aiter__") and hasattr(tip_spots, "__anext__"):
tip_spots = [await tip_spots.__anext__() for _ in operations] # type: ignore
assert isinstance(tip_spots, list)

for source, dests in operations.items():
for i, (dest, vol) in enumerate(dests):
if isinstance(dest, list):
if len(dest) > 1:
raise ValueError("Only one destination per dispense operation is supported.")
if len(dest) == 0:
raise ValueError("Destination list cannot be empty.")
operations[source][i] = (dest[0], vol)

operations_list = list(operations.items())
for batch in range(0, len(operations_list), self.backend.num_channels):
batch_operations = operations_list[batch : batch + self.backend.num_channels]
batch_tips = tip_spots[batch : batch + self.backend.num_channels]
batch_sources = [src for src, _ in batch_operations]
batch_destinations = [dest for _, dest in batch_operations]
batch_volumes = [sum(v for _, v in dests) + dead_volume for dests in batch_destinations]
use_channels = list(range(len(batch_operations)))

await self.pick_up_tips(batch_tips)

# Aspirate from all source resources
await self.aspirate(
resources=batch_sources,
vols=batch_volumes,
**(aspiration_kwargs or {}),
)

target_vols = [source_vol * r / sum(ratios) for r in ratios]
for group in zip_longest(*batch_destinations):
dest, vols, channels = zip(
*(
(pair[0], pair[1], ch)
for pair, ch in zip_longest(group, use_channels)
if pair is not None
)
)
await self.dispense(
resources=list(dest),
vols=list(vols),
use_channels=list(channels),
**(dispense_kwargs or {}),
)

await self.aspirate(
resources=[source],
vols=[sum(target_vols)],
flow_rates=[aspiration_flow_rate],
**backend_kwargs,
)
dispense_flow_rates = dispense_flow_rates or [None] * len(targets)
for target, vol, dfr in zip(targets, target_vols, dispense_flow_rates):
await self.dispense(
resources=[target],
vols=[vol],
flow_rates=[dfr],
use_channels=[0],
**backend_kwargs,
)
if tip_drop_method == "return":
await self.return_tips()
else:
await self.discard_tips()

@contextlib.contextmanager
def use_channels(self, channels: List[int]):
Expand Down
Loading