diff --git a/README.md b/README.md index f3d1ebd..ffbf5aa 100644 --- a/README.md +++ b/README.md @@ -113,6 +113,42 @@ my_dataframe.head() Returned dataframes also come with a pandas index column, representing the original row numbers from the OMERO.table. + +### Non-OMERO.tables Tables +Sometimes users attach a CSV file as a FileAnnotation rather than +uploading it as an OMERO.tables object. omero2pandas can still try to read these +using dedicated methods: + +```python +import omero2pandas +my_dataframe = omero2pandas.read_csv(file_id=101, + column_names=['object', 'intensity']) +my_dataframe.head() +# Returns dataframe with selected columns +``` + +Note that this interface supports fewer features than full OMERO.tables, +so queries and row selection are unavailable. However, it is also possible to +load gzip-compressed CSV files (.csv.gz) directly with these methods. + +You can also directly download the OriginalFile as follows: + +```python +import omero2pandas +omero2pandas.download_csv("/path/to/output.csv", file_id=201) +``` + +In both these cases the `chunk_size` parameter controls the number of **bytes** +loaded in each server call rather than the row count. Take care when specifying +this parameter as using small values (e.g. 10) will make the download very slow. + +By default the downloader will only accept csv/csv.gz files, but it can +technically be used with most OriginalFile objects. Supply the `check_type=False` +argument to bypass that restriction. + +N.B. OMERO.tables cannot be downloaded with this method, use `omero2pandas.download_table` instead. + + ## Writing data Pandas dataframes can also be written back as new OMERO.tables. 
def read_csv(file_id=None, annotation_id=None, column_names=None,
             chunk_size=1048576, omero_connector=None, server=None, port=4064,
             username=None, password=None, **kwargs):
    """
    Read a csv or csv.gz file stored as an OMERO OriginalFile/FileAnnotation
    into a pandas dataframe.
    Supply either a file or annotation ID.
    Convenience method for scenarios where data was uploaded as a raw CSV
    rather than an OMERO.tables object.
    Additional keyword arguments are forwarded to pandas.read_csv.
    :param file_id: ID of the OriginalFile to load
    :param annotation_id: ID of the FileAnnotation to load
    :param column_names: Optional list of column names to return
    :param chunk_size: BYTES to download in a single call (default 1024^2)
    :param omero_connector: OMERO.client object which is already connected
    to a server. Supersedes any other connection details.
    :param server: Address of the server
    :param port: Port the server runs on (default 4064)
    :param username: Username for server login
    :param password: Password for server login
    :return: pandas.DataFrame
    """
    object_id, object_type = _validate_requested_object(
        file_id=file_id, annotation_id=annotation_id)
    if "usecols" in kwargs:
        raise ValueError(
            "Provide 'column_names' for column selection, not 'usecols'")
    # Pop any user-supplied compression so it isn't handed to
    # pandas.read_csv twice. The original code hit a NameError when the
    # caller supplied 'compression' (the local variable was never
    # assigned) and would otherwise have passed a duplicate kwarg.
    explicit_compression = "compression" in kwargs
    compression = kwargs.pop("compression", None)

    with _get_connection(server=server, username=username, password=password,
                         port=port, client=omero_connector) as connector:
        conn = connector.get_gateway()
        orig_file = _get_original_file(conn, object_type, object_id)
        file_id = unwrap(orig_file.id)
        file_name = unwrap(orig_file.name)
        file_mimetype = unwrap(orig_file.mimetype)
        if not explicit_compression:
            # Also validates that the file is a supported CSV variant
            compression = infer_compression(file_mimetype, file_name)

        LOGGER.info(f"Reading file {file_id} of "
                    f"mimetype '{file_mimetype}' from OMERO")
        bar_fmt = '{desc}: {percentage:3.0f}%|{bar}| ' \
                  '{n_fmt}/{total_fmt}, {elapsed} {postfix}'
        chunk_iter = tqdm(desc="Reading CSV from OMERO",
                          bar_format=bar_fmt, unit_scale=True)
        with get_annotation_reader(conn, file_id,
                                   chunk_size, reporter=chunk_iter) as reader:
            df = pandas.read_csv(reader,
                                 compression=compression,
                                 usecols=column_names, **kwargs)
        chunk_iter.close()
        return df


def download_csv(target_path, file_id=None, annotation_id=None,
                 chunk_size=1048576, check_type=True, omero_connector=None,
                 server=None, port=4064, username=None, password=None):
    """
    Downloads a CSV file stored as a csv or csv.gz file rather than an
    OMERO.table.
    Supply either a file or annotation ID.
    For the connection, supply either an active client object or server
    credentials (not both!). If neither are provided the program will search
    for an OMERO user token on the system.
    :param target_path: Path to the csv file where data will be saved.
    :param file_id: ID of an OriginalFile object
    :param annotation_id: ID of a FileAnnotation object
    :param omero_connector: OMERO.client object which is already connected
    to a server. Supersedes any other connection details.
    :param server: Address of the server
    :param port: Port the server runs on (default 4064)
    :param username: Username for server login
    :param password: Password for server login
    :param chunk_size: BYTES to download in a single call (default 1024^2)
    :param check_type: [Boolean] Whether to check that the target file is
    actually a CSV
    :raises FileExistsError: if target_path already exists
    """
    object_id, object_type = _validate_requested_object(
        file_id=file_id, annotation_id=annotation_id)

    # Explicit raise rather than `assert`: asserts are stripped when
    # Python runs with -O and this is user-input validation.
    if os.path.exists(target_path):
        raise FileExistsError(f"Target file {target_path} already exists")

    with _get_connection(server=server, username=username, password=password,
                         port=port, client=omero_connector) as connector:
        conn = connector.get_gateway()

        orig_file = _get_original_file(conn, object_type, object_id)
        file_id = unwrap(orig_file.id)
        file_name = unwrap(orig_file.name)
        file_mimetype = unwrap(orig_file.mimetype)
        if check_type:
            # Raises ValueError if the file doesn't look like a csv/csv.gz
            infer_compression(file_mimetype, file_name)

        LOGGER.info(f"Downloading file {file_id} of "
                    f"mimetype '{file_mimetype}' from OMERO")
        bar_fmt = '{desc}: {percentage:3.0f}%|{bar}| ' \
                  '{n_fmt}/{total_fmt}B, {elapsed} {postfix}'
        chunk_iter = tqdm(desc="Downloading CSV from OMERO",
                          bar_format=bar_fmt, unit_scale=True)
        with get_annotation_reader(conn, file_id,
                                   chunk_size, reporter=chunk_iter) as reader:
            with open(target_path, "wb") as filehandle:
                for chunk in reader:
                    filehandle.write(chunk)
        chunk_iter.close()
        LOGGER.info(f"Download complete, saved to {target_path}")


def _get_original_file(conn, object_type, object_id):
    """
    Resolve an OriginalFile from a FileAnnotation or OriginalFile ID.
    :param conn: BlitzGateway connection object
    :param object_type: "FileAnnotation" or "OriginalFile"
    :param object_id: ID of the object to fetch
    :return: omero.model.OriginalFile instance
    :raises ValueError: for unsupported types or when the object is missing
    """
    # Fetch the object from OMERO. Queries run cross-group
    # ({"omero.group": "-1"}) so files outside the user's current
    # group are still found.
    if object_type == "FileAnnotation":
        params = omero.sys.ParametersI()
        params.addId(object_id)
        target = conn.getQueryService().findByQuery(
            "SELECT fa.file from FileAnnotation fa where fa.id = :id",
            params, {"omero.group": "-1"})
    elif object_type == "OriginalFile":
        target = conn.getQueryService().find(object_type, object_id,
                                             {"omero.group": "-1"})
    else:
        # Single validation point; the original duplicated this check
        # with an unreachable NotImplementedError branch below it.
        raise ValueError(f"Unsupported type '{object_type}'")
    if target is None:
        # Explicit raise rather than `assert` (stripped under -O)
        raise ValueError(f"{object_type} with ID {object_id} not found")
    return target


def _get_table(conn, object_type, object_id):
    """
    Loads an OMERO.table remotely
    :param conn: BlitzGateway connection object
    :param object_type: "FileAnnotation" or "OriginalFile"
    :param object_id: ID of the object. OMERO.tables are referenced
    by both a FileAnnotation and OriginalFile ID
    :return: Activated OMERO.table wrapper
    """
    orig_file = _get_original_file(conn, object_type, object_id)
    # Check that the OriginalFile has the expected mimetype
    if unwrap(orig_file.mimetype) != "OMERO.tables":
        raise ValueError(
            f"File {unwrap(orig_file.id)} is not a valid OMERO.tables object")
    # Open the table via SharedResources, cross-group
    resources = conn.c.sf.sharedResources()
    data_table = resources.openTable(orig_file, {"omero.group": "-1"})
    return data_table
class OriginalFileIO(io.RawIOBase):
    """
    Reader for loading data from OMERO OriginalFile objects with a
    Python file-like interface

    For optimal performance this class should be wrapped in a BufferedReader,
    use the get_annotation_reader convenience function for this.
    """
    def __init__(self, conn: BlitzGateway, file_id: int, reporter=None):
        """
        :param conn: BlitzGateway connection object
        :param file_id: ID of the OriginalFile to read
        :param reporter: Optional TQDM progress bar to report reads to
        """
        LOGGER.debug(f"Creating new reader for file {file_id}")
        self._prx = conn.c.getSession().createRawFileStore()
        self._file_id = file_id
        self.open()
        # NOTE(review): a genuinely empty (0-byte) file is also rejected
        # here — confirm that is intentional.
        if not (size := self._prx.size()):
            raise omero.ClientError(f"Invalid size for OriginalFile {file_id}")
        self._size = size
        self._offset = 0
        self._reporter = reporter
        if self._reporter is not None:
            reporter.reset(total=size)

    def __enter__(self):
        self.open()
        # Must return the reader itself, otherwise
        # `with OriginalFileIO(...) as f:` binds f to None.
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # The context manager protocol passes three positional arguments;
        # the previous `**kwargs`-only signature raised TypeError on exit.
        self.close()
        if self._reporter is not None:
            self._reporter.close()

    def open(self):
        # Attach the RawFileStore to our file if it isn't already
        current = unwrap(self._prx.getFileId())
        if current is None:
            # RawFileStore is new or was previously closed
            LOGGER.debug(f"Setting IO file ID to {self._file_id}")
            self._prx.setFileId(self._file_id, {"omero.group": "-1"})
        elif current != self._file_id:
            # Something else is messing with our filestore
            raise Exception("RawFileStore pointed to incorrect object")

    def read(self, size=-1):
        # Read `size` bytes from the target object. -1 = 'read all'
        LOGGER.debug(f"Reading {size} bytes from file {self._file_id}")
        if size == -1:
            size = self._size - self._offset
        # Never read beyond the end of the file
        size = min(size, self._size - self._offset)
        if not size or self._offset >= self._size:
            LOGGER.debug("Nothing to read")
            return b""
        data = self._prx.read(self._offset, size)
        self._offset += size
        if self._reporter is not None:
            self._reporter.update(size)
        return data

    def readinto(self, buffer):
        # Read bytes into the provided buffer and return the number read
        if self._offset >= self._size:
            return 0
        data = self.read(len(buffer))
        count = len(data)
        buffer[:count] = data
        return count

    def seek(self, offset, whence=os.SEEK_SET):
        # Seek within the target file and return the new absolute position
        # (io.IOBase.seek must return the new offset; BufferedReader
        # relies on it — the original returned None).
        if whence == os.SEEK_SET:
            self._offset = offset
        elif whence == os.SEEK_CUR:
            self._offset += offset
        elif whence == os.SEEK_END:
            # Standard semantics add the (usually negative) offset to the
            # file size; the original subtracted it, inverting direction.
            self._offset = self._size + offset
        else:
            raise ValueError(f"Invalid whence value: {whence}")
        # Clamp the resulting position into [0, size]
        self._offset = max(0, min(self._offset, self._size))
        return self._offset

    def close(self):
        file_id = unwrap(self._prx.getFileId())
        if file_id == self._file_id:
            # We only close if the current file is the active one
            LOGGER.debug("Closing reader")
            self._prx.close()

    def size(self):
        # Total size of the OriginalFile in bytes
        return self._size

    def tell(self):
        # Current read position within the file
        return self._offset

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return False


def get_annotation_reader(conn, file_id, chunk_size, reporter=None):
    """
    Fetch a buffered reader for loading OriginalFile data
    :param conn: BlitzGateway connection object
    :param file_id: OriginalFile ID to load
    :param chunk_size: Number of bytes to load per server call
    :param reporter: TQDM progressbar instance
    :return: BufferedReader instance
    """
    reader = OriginalFileIO(conn, file_id, reporter=reporter)
    return io.BufferedReader(reader, buffer_size=chunk_size)
def infer_compression(mimetype, name):
    """
    Determine the pandas 'compression' argument for a CSV OriginalFile,
    validating that the suggested file is actually some sort of CSV.
    :param mimetype: mimetype string from the OriginalFile (may be None)
    :param name: file name, used as a fallback when mimetype is missing
    :return: 'gzip' for gzip-compressed CSVs, None for plain CSVs
    :raises ValueError: if the file does not look like a supported CSV
    """
    if mimetype is None:
        # Fall back to the file extension when no mimetype was registered
        lowered = name.lower()
        if lowered.endswith(".csv.gz"):
            mimetype = "application/x-gzip"
        elif lowered.endswith(".csv"):
            mimetype = "text/csv"
        else:
            raise ValueError(f"Unsupported filetype: {name}")
    mimetype = mimetype.lower()
    # Gzip data is registered under either of these mimetypes depending
    # on the uploading client, so accept both.
    if mimetype in ("application/x-gzip", "application/gzip"):
        return "gzip"
    if mimetype == "text/csv":
        return None
    raise ValueError(f"Unsupported mimetype: {mimetype}")