From 23e41694f156a5702c68af5153d851e669387ea4 Mon Sep 17 00:00:00 2001 From: David Stirling Date: Thu, 17 Apr 2025 13:47:11 +0100 Subject: [PATCH 1/5] Add remote csv reading mode --- README.md | 36 ++++++++++ omero2pandas/__init__.py | 152 ++++++++++++++++++++++++++++++++++----- omero2pandas/io_tools.py | 149 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 318 insertions(+), 19 deletions(-) create mode 100644 omero2pandas/io_tools.py diff --git a/README.md b/README.md index 5929c73..7d095ee 100644 --- a/README.md +++ b/README.md @@ -113,6 +113,42 @@ my_dataframe.head() Returned dataframes also come with a pandas index column, representing the original row numbers from the OMERO.table. + +### Non-OMERO.tables Tables +Sometimes users attach a CSV file as a FileAnnotation in CSV format rather than +uploading as an OMERO.tables object. omero2pandas can still try to read these +using dedicated methods: + +```python +import omero2pandas +my_dataframe = omero2pandas.read_csv(file_id=101, + column_names=['object', 'intensity']) +my_dataframe.head() +# Returns dataframe with selected columns +``` + +Note that this interface supports less features than using full OMERO.tables, +so queries and row selection are unavailable. However, it is also possible to +load gzip-compressed CSV files (.csv.gz) directly with these methods. + +You can also directly download the OriginalFile as follows: + +```python +import omero2pandas +omero2pandas.download_csv("/path/to/output.csv", file_id=201) +``` + +In both these cases the `chunk_size` parameter controls the number of **bytes** +loaded in each server call rather than the row count. Take care when specifying +this parameter as using small values (e.g. 10) will make the download very slow. + +By default the downloader will only accept csv/csv.gz files, but it can +technically be used with most OriginalFile objects. Supply the `check_type=False` +argument to bypass that restriction. + +N.b. OMERO.tables cannot be downloaded with this method, use `omero2pandas.download_table` instead. + + ## Writing data Pandas dataframes can also be written back as new OMERO.tables. diff --git a/omero2pandas/__init__.py b/omero2pandas/__init__.py index 938b90a..9e2a028 100644 --- a/omero2pandas/__init__.py +++ b/omero2pandas/__init__.py @@ -15,9 +15,11 @@ import pandas import omero import omero.gateway +from omero.rtypes import unwrap from tqdm.auto import tqdm from omero2pandas.connect import OMEROConnection +from omero2pandas.io_tools import get_annotation_reader, infer_compression from omero2pandas.upload import create_table if find_spec("tiledb"): from omero2pandas.remote import register_table @@ -277,7 +279,6 @@ def download_table(target_path, file_id=None, annotation_id=None, returned. Cannot be used with the 'rows' parameter. :param variables: Dictionary containing variables to map onto the query string. - :return: pandas.DataFrame object containing requested data """ if rows is not None and query is not None: raise ValueError("Running a query supersedes the rows argument. " @@ -349,6 +350,133 @@ def download_table(target_path, file_id=None, annotation_id=None, LOGGER.info(f"Download complete, saved to {target_path}") +def read_csv(file_id=None, annotation_id=None, column_names=None, + chunk_size=1048576, omero_connector=None, server=None, port=4064, + username=None, password=None): + """ + Read a csv or csv.gz file stored as an OMERO OriginalFile/FileAnnotation + into a pandas dataframe. + Supply either a file or annotation ID. 
+ Convenience method for scenarios where data was uploaded as a raw CSV + rather than an OMERO.tables object. + :param file_id: ID of the Original to load + :param annotation_id: ID of the FileAnnotation to load + :param column_names: Optional list of column names to return + :param omero_connector: OMERO.client object which is already connected + to a server. Supersedes any other connection details. + :param server: Address of the server + :param port: Port the server runs on (default 4064) + :param username: Username for server login + :param password: Password for server login + :param chunk_size: BYTES to download in a single call (default 1024^2) + :return: pandas.DataFrame + """ + object_id, object_type = _validate_requested_object( + file_id=file_id, annotation_id=annotation_id) + + with OMEROConnection(server=server, username=username, password=password, + port=port, client=omero_connector) as connector: + conn = connector.get_gateway() + orig_file = _get_original_file(conn, object_type, object_id) + file_id = unwrap(orig_file.id) + file_name = unwrap(orig_file.name) + file_mimetype = unwrap(orig_file.mimetype) + + compression = infer_compression(file_mimetype, file_name) + + # Check that the OriginalFile has the expected mimetype + + LOGGER.info(f"Reading file {file_id} of " + f"mimetype '{file_mimetype}' from OMERO") + bar_fmt = '{desc}: {percentage:3.0f}%|{bar}| ' \ + '{n_fmt}/{total_fmt}, {elapsed} {postfix}' + chunk_iter = tqdm(desc="Reading CSV from OMERO", + bar_format=bar_fmt, unit_scale=True) + with get_annotation_reader(conn, file_id, + chunk_size, reporter=chunk_iter) as reader: + df = pandas.read_csv(reader, + compression=compression, + usecols=column_names) + chunk_iter.close() + return df + + +def download_csv(target_path, file_id=None, annotation_id=None, + chunk_size=1048576, check_type=True, omero_connector=None, + server=None, port=4064, username=None, password=None): + """ + Downloads a CSV file stored as a csv or csv.gz file rather than an + OMERO.table. + Supply either a file or annotation ID. + For the connection, supply either an active client object or server + credentials (not both!). If neither are provided the program will search + for an OMERO user token on the system. + :param target_path: Path to the csv file where data will be saved. + :param file_id: ID of an OriginalFile object + :param annotation_id: ID of a FileAnnotation object + :param omero_connector: OMERO.client object which is already connected + to a server. Supersedes any other connection details. 
+ :param server: Address of the server + :param port: Port the server runs on (default 4064) + :param username: Username for server login + :param password: Password for server login + :param chunk_size: BYTES to download in a single call (default 1024^2) + :param check_type: [Boolean] Whether to check that the target file is + actually a CSV + """ + object_id, object_type = _validate_requested_object( + file_id=file_id, annotation_id=annotation_id) + + assert not os.path.exists(target_path), \ + f"Target file {target_path} already exists" + + with (OMEROConnection(server=server, username=username, password=password, + port=port, client=omero_connector) as connector): + conn = connector.get_gateway() + + orig_file = _get_original_file(conn, object_type, object_id) + file_id = unwrap(orig_file.id) + file_name = unwrap(orig_file.name) + file_mimetype = unwrap(orig_file.mimetype) + if check_type: + infer_compression(file_mimetype, file_name) + + LOGGER.info(f"Downloading file {file_id} of " + f"mimetype '{file_mimetype}' from OMERO") + bar_fmt = '{desc}: {percentage:3.0f}%|{bar}| ' \ + '{n_fmt}/{total_fmt}B, {elapsed} {postfix}' + chunk_iter = tqdm(desc="Downloading CSV from OMERO", + bar_format=bar_fmt, unit_scale=True) + with get_annotation_reader(conn, file_id, + chunk_size, reporter=chunk_iter) as reader: + with open(target_path, "wb") as filehandle: + for chunk in reader: + filehandle.write(chunk) + chunk_iter.close() + LOGGER.info(f"Download complete, saved to {target_path}") + + +def _get_original_file(conn, object_type, object_id): + if object_type not in ("FileAnnotation", "OriginalFile"): + raise ValueError(f"Unsupported type '{object_type}'") + # Fetch the object from OMERO + if object_type == "FileAnnotation": + params = omero.sys.ParametersI() + params.addId(object_id) + target = conn.getQueryService().findByQuery( + "SELECT fa.file from FileAnnotation fa where fa.id = :id", + params, {"omero.group": "-1"}) + elif object_type == "OriginalFile": + target = conn.getQueryService().find(object_type, object_id, + {"omero.group": "-1"}) + else: + raise NotImplementedError( + f"OMERO object of type {object_type} is not supported") + assert target is not None, f"{object_type} with ID" \ + f" {object_id} not found" + return target + + def _get_table(conn, object_type, object_id): """ Loads an OMERO.table remotely @@ -359,30 +487,16 @@ def _get_table(conn, object_type, object_id): by both a FileAnnotation and OriginalFile ID :return: Activated OMERO.table wrapper """ - orig_group = conn.SERVICE_OPTS.getOmeroGroup() - conn.SERVICE_OPTS.setOmeroGroup('-1') - # Fetch the object from OMERO - target = conn.getObject(object_type, object_id) - assert target is not None, f"{object_type} with ID" \ - f" {object_id} not found" - # Get the file object containing the table. 
- if isinstance(target, omero.gateway.FileAnnotationWrapper): - orig_file = target.file - elif isinstance(target, omero.gateway.OriginalFileWrapper): - orig_file = target._obj - else: - raise NotImplementedError( - f"OMERO object of type {type(target)} is not supported") + orig_file = _get_original_file(conn, object_type, object_id) # Check that the OriginalFile has the expected mimetype - if orig_file.mimetype is None or orig_file.mimetype.val != "OMERO.tables": + if unwrap(orig_file.mimetype) != "OMERO.tables": raise ValueError( - f"File {orig_file.id.val} is not a valid OMERO.tables") + f"File {unwrap(orig_file.id)} is not a valid OMERO.tables object") # Load the table resources = conn.c.sf.sharedResources() - data_table = resources.openTable(orig_file, conn.SERVICE_OPTS) - conn.SERVICE_OPTS.setOmeroGroup(orig_group) + data_table = resources.openTable(orig_file, {"omero.group": "-1"}) return data_table diff --git a/omero2pandas/io_tools.py b/omero2pandas/io_tools.py new file mode 100644 index 0000000..03d2467 --- /dev/null +++ b/omero2pandas/io_tools.py @@ -0,0 +1,149 @@ +# encoding: utf-8 +# +# Copyright (c) 2025 Glencoe Software, Inc. All rights reserved. +# +# This software is distributed under the terms described by the LICENCE file +# you can find at the root of the distribution bundle. +# If the file is missing please request a copy by contacting +# support@glencoesoftware.com. +import os +import io +import logging + +import omero +from omero.rtypes import unwrap +from omero.gateway import BlitzGateway + +LOGGER = logging.getLogger(__name__) + + +class OriginalFileIO(io.RawIOBase): + """ + Reader for loading data from OMERO OriginalFile objects with a + Python file-like interface + + Only a single RawFileStore instance can exist per BlitzGateway, + loading multiple readers will change the target file for the FileStore. + + For optimal performance this class should be wrapped in a BufferedReader, + use the get_annotation_reader convenience function for this. + """ + def __init__(self, conn: BlitzGateway, file_id: int, reporter=None): + LOGGER.debug(f"Creating new for file {file_id}") + self._prx = conn.createRawFileStore() + self._file_id = file_id + self.open() + if not (size := self._prx.size()): + raise omero.ClientError(f"Invalid size for OriginalFile {file_id}") + self._size = size + self._offset = 0 + self._reporter = reporter + if self._reporter is not None: + reporter.reset(total=size) + + def __enter__(self): + self.open() + + def __exit__(self, **kwargs): + self.close() + if self._reporter is not None: + self._reporter.close() + + def open(self): + current = unwrap(self._prx.getFileId()) + if current != self._file_id: + LOGGER.debug(f"Setting IO file ID to {self._file_id}") + self._prx.setFileId(self._file_id, {"omero.group": "-1"}) + + def read(self, size=-1): + # Read `size` bytes from the target object. 
-1 = 'read all' + LOGGER.debug(f"Reading {size} bytes from file {self._file_id}") + # Ensure we're reading the correct object + if size == -1: + size = self._size - self._offset + size = min(size, self._size - self._offset) + if not size or self._offset >= self._size: + LOGGER.debug("Nothing to read") + return b"" + self.open() + data = self._prx.read(self._offset, size) + self._offset += size + if self._reporter is not None: + self._reporter.update(size) + return data + + def readinto(self, buffer): + # Read bytes into the provided object and return number read + if self._offset >= self._size: + return 0 + data = self.read(len(buffer)) + count = len(data) + buffer[:count] = data + return count + + def seek(self, offset, whence=os.SEEK_SET): + # Seek within the target file + if whence == os.SEEK_SET: + self._offset = offset + elif whence == os.SEEK_CUR: + self._offset += offset + elif whence == os.SEEK_END: + self._offset = self._size - offset + else: + raise ValueError(f"Invalid whence value: {whence}") + if self._offset > self._size: + self._offset = self._size + elif self._offset < 0: + self._offset = 0 + + def close(self): + file_id = unwrap(self._prx.getFileId()) + if file_id == self._file_id: + # We only close if the current file is the active one + LOGGER.debug("Closing reader") + self._prx.close() + + def size(self): + return self._size + + def tell(self): + return self._offset + + def readable(self): + return True + + def seekable(self): + return True + + def writable(self): + return False + + +def get_annotation_reader(conn, file_id, chunk_size, reporter=None): + """ + Fetch a buffered reader for loading OriginalFile data + :param conn: BlitzGateway connection object + :param file_id: OriginalFile ID to load + :param chunk_size: Number of bytes to load per server call + :param reporter: TQDM progressbar instance + :return: BufferedReader instance + """ + reader = OriginalFileIO(conn, file_id, reporter=reporter) + return io.BufferedReader(reader, buffer_size=chunk_size) + + +def infer_compression(mimetype, name): + # Validate that the suggested file is actually some sort of CSV + if mimetype is None: + if name.lower().endswith(".csv"): + mimetype = "text/csv" + elif name.lower().endswith(".csv.gz"): + mimetype = "application/x-gzip" + else: + raise ValueError(f"Unsupported filetype: {name}") + mimetype = mimetype.lower() + if mimetype == "application/x-gzip": + return "gzip" + elif mimetype == "text/csv": + return None + raise ValueError(f"Unsupported mimetype: {mimetype}") From 8838839995a631245a6c16b5ca77de4434cf1736 Mon Sep 17 00:00:00 2001 From: David Stirling Date: Fri, 25 Apr 2025 08:43:05 +0100 Subject: [PATCH 2/5] Typo --- omero2pandas/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/omero2pandas/__init__.py b/omero2pandas/__init__.py index 9e2a028..cc44dce 100644 --- a/omero2pandas/__init__.py +++ b/omero2pandas/__init__.py @@ -359,7 +359,7 @@ def read_csv(file_id=None, annotation_id=None, column_names=None, Supply either a file or annotation ID. Convenience method for scenarios where data was uploaded as a raw CSV rather than an OMERO.tables object. 
- :param file_id: ID of the Original to load + :param file_id: ID of the OriginalFile to load :param annotation_id: ID of the FileAnnotation to load :param column_names: Optional list of column names to return :param omero_connector: OMERO.client object which is already connected From 657f0632b51052f54a0439c39c5335f20a0f3861 Mon Sep 17 00:00:00 2001 From: David Stirling Date: Fri, 25 Apr 2025 08:55:47 +0100 Subject: [PATCH 3/5] Permit supplying read_csv args --- omero2pandas/__init__.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/omero2pandas/__init__.py b/omero2pandas/__init__.py index cc44dce..ff2ab52 100644 --- a/omero2pandas/__init__.py +++ b/omero2pandas/__init__.py @@ -352,13 +352,15 @@ def download_table(target_path, file_id=None, annotation_id=None, def read_csv(file_id=None, annotation_id=None, column_names=None, chunk_size=1048576, omero_connector=None, server=None, port=4064, - username=None, password=None): + username=None, password=None, **kwargs): """ Read a csv or csv.gz file stored as an OMERO OriginalFile/FileAnnotation into a pandas dataframe. Supply either a file or annotation ID. Convenience method for scenarios where data was uploaded as a raw CSV rather than an OMERO.tables object. + Additional keyword arguments will be forwarded to the pandas.read_csv + method :param file_id: ID of the OriginalFile to load :param annotation_id: ID of the FileAnnotation to load :param column_names: Optional list of column names to return @@ -373,6 +375,9 @@ def read_csv(file_id=None, annotation_id=None, column_names=None, """ object_id, object_type = _validate_requested_object( file_id=file_id, annotation_id=annotation_id) + if "usecols" in kwargs: + raise ValueError( + "Provide 'column_names' for column selection, not 'usecols'") with OMEROConnection(server=server, username=username, password=password, port=port, client=omero_connector) as connector: @@ -381,8 +386,8 @@ def read_csv(file_id=None, annotation_id=None, column_names=None, file_id = unwrap(orig_file.id) file_name = unwrap(orig_file.name) file_mimetype = unwrap(orig_file.mimetype) - - compression = infer_compression(file_mimetype, file_name) + if "compression" not in kwargs: + compression = infer_compression(file_mimetype, file_name) # Check that the OriginalFile has the expected mimetype @@ -396,7 +401,7 @@ def read_csv(file_id=None, annotation_id=None, column_names=None, chunk_size, reporter=chunk_iter) as reader: df = pandas.read_csv(reader, compression=compression, - usecols=column_names) + usecols=column_names, **kwargs) chunk_iter.close() return df From 5401d0935304959e71ef0fac97abdce3b1eab94d Mon Sep 17 00:00:00 2001 From: David Stirling Date: Fri, 25 Apr 2025 16:52:23 +0100 Subject: [PATCH 4/5] Don't use Blitz to get RawFileStore objects --- omero2pandas/io_tools.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/omero2pandas/io_tools.py b/omero2pandas/io_tools.py index 03d2467..c988b27 100644 --- a/omero2pandas/io_tools.py +++ b/omero2pandas/io_tools.py @@ -22,15 +22,12 @@ class OriginalFileIO(io.RawIOBase): Reader for loading data from OMERO OriginalFile objects with a Python file-like interface - Only a single RawFileStore instance can exist per BlitzGateway, - loading multiple readers will change the target file for the FileStore. - For optimal performance this class should be wrapped in a BufferedReader, use the get_annotation_reader convenience function for this. 
""" def __init__(self, conn: BlitzGateway, file_id: int, reporter=None): LOGGER.debug(f"Creating new for file {file_id}") - self._prx = conn.createRawFileStore() + self._prx = conn.c.getSession().createRawFileStore() self._file_id = file_id self.open() if not (size := self._prx.size()): @@ -51,9 +48,13 @@ def __exit__(self, **kwargs): def open(self): current = unwrap(self._prx.getFileId()) - if current != self._file_id: + if current is None: + # RawFileStore is new or was previously closed LOGGER.debug(f"Setting IO file ID to {self._file_id}") self._prx.setFileId(self._file_id, {"omero.group": "-1"}) + elif current != self._file_id: + # Something else is messing with our filestore + raise Exception("RawFileStore pointed to incorrect object") def read(self, size=-1): # Read `size` bytes from the target object. -1 = 'read all' @@ -65,7 +66,6 @@ def read(self, size=-1): if not size or self._offset >= self._size: LOGGER.debug("Nothing to read") return b"" - self.open() data = self._prx.read(self._offset, size) self._offset += size if self._reporter is not None: From efaeac06b3e818509c25d48b8a4eae3664a8980b Mon Sep 17 00:00:00 2001 From: David Stirling Date: Tue, 20 May 2025 15:57:58 +0100 Subject: [PATCH 5/5] Use get_connection with new methods --- omero2pandas/__init__.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/omero2pandas/__init__.py b/omero2pandas/__init__.py index 0b18ee4..81becac 100644 --- a/omero2pandas/__init__.py +++ b/omero2pandas/__init__.py @@ -389,7 +389,7 @@ def read_csv(file_id=None, annotation_id=None, column_names=None, raise ValueError( "Provide 'column_names' for column selection, not 'usecols'") - with OMEROConnection(server=server, username=username, password=password, + with _get_connection(server=server, username=username, password=password, port=port, client=omero_connector) as connector: conn = connector.get_gateway() orig_file = _get_original_file(conn, object_type, object_id) @@ -445,8 +445,8 @@ def download_csv(target_path, file_id=None, annotation_id=None, assert not os.path.exists(target_path), \ f"Target file {target_path} already exists" - with (OMEROConnection(server=server, username=username, password=password, - port=port, client=omero_connector) as connector): + with _get_connection(server=server, username=username, password=password, + port=port, client=omero_connector) as connector: conn = connector.get_gateway() orig_file = _get_original_file(conn, object_type, object_id)