-
Notifications
You must be signed in to change notification settings - Fork 4
Add remote csv reading functions #29
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
23e4169
8838839
657f063
5401d09
6fb6a70
efaeac0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,9 +15,11 @@ | |
import pandas | ||
import omero | ||
import omero.gateway | ||
from omero.rtypes import unwrap | ||
from tqdm.auto import tqdm | ||
|
||
from omero2pandas.connect import OMEROConnection | ||
from omero2pandas.io_tools import get_annotation_reader, infer_compression | ||
from omero2pandas.upload import create_table | ||
if find_spec("tiledb"): | ||
from omero2pandas.remote import register_table | ||
|
@@ -277,7 +279,6 @@ def download_table(target_path, file_id=None, annotation_id=None, | |
returned. Cannot be used with the 'rows' parameter. | ||
:param variables: Dictionary containing variables to map onto the query | ||
string. | ||
:return: pandas.DataFrame object containing requested data | ||
""" | ||
if rows is not None and query is not None: | ||
raise ValueError("Running a query supersedes the rows argument. " | ||
|
@@ -349,6 +350,133 @@ def download_table(target_path, file_id=None, annotation_id=None, | |
LOGGER.info(f"Download complete, saved to {target_path}") | ||
|
||
|
||
def read_csv(file_id=None, annotation_id=None, column_names=None,
             chunk_size=1048576, omero_connector=None, server=None, port=4064,
             username=None, password=None):
    """
    Read a csv or csv.gz file stored as an OMERO OriginalFile/FileAnnotation
    into a pandas dataframe.
    Supply either a file or annotation ID.
    Convenience method for scenarios where data was uploaded as a raw CSV
    rather than an OMERO.tables object.
    :param file_id: ID of the OriginalFile to load
    :param annotation_id: ID of the FileAnnotation to load
    :param column_names: Optional list of column names to return
    :param chunk_size: BYTES to download in a single call (default 1024^2)
    :param omero_connector: OMERO.client object which is already connected
    to a server. Supersedes any other connection details.
    :param server: Address of the server
    :param port: Port the server runs on (default 4064)
    :param username: Username for server login
    :param password: Password for server login
    :return: pandas.DataFrame
    """
    object_id, object_type = _validate_requested_object(
        file_id=file_id, annotation_id=annotation_id)

    with OMEROConnection(server=server, username=username, password=password,
                         port=port, client=omero_connector) as connector:
        conn = connector.get_gateway()
        orig_file = _get_original_file(conn, object_type, object_id)
        file_id = unwrap(orig_file.id)
        file_name = unwrap(orig_file.name)
        file_mimetype = unwrap(orig_file.mimetype)

        # Also validates that the target looks like a (gzipped) CSV;
        # raises ValueError otherwise.
        compression = infer_compression(file_mimetype, file_name)

        LOGGER.info(f"Reading file {file_id} of "
                    f"mimetype '{file_mimetype}' from OMERO")
        bar_fmt = '{desc}: {percentage:3.0f}%|{bar}| ' \
                  '{n_fmt}/{total_fmt}, {elapsed} {postfix}'
        chunk_iter = tqdm(desc="Reading CSV from OMERO",
                          bar_format=bar_fmt, unit_scale=True)
        try:
            with get_annotation_reader(conn, file_id, chunk_size,
                                       reporter=chunk_iter) as reader:
                df = pandas.read_csv(reader,
                                     compression=compression,
                                     usecols=column_names)
        finally:
            # Ensure the progress bar is cleaned up even if parsing fails
            chunk_iter.close()
        return df
|
||
|
||
def download_csv(target_path, file_id=None, annotation_id=None,
                 chunk_size=1048576, check_type=True, omero_connector=None,
                 server=None, port=4064, username=None, password=None):
    """
    Downloads a CSV file stored as a csv or csv.gz file rather than an
    OMERO.table.
    Supply either a file or annotation ID.
    For the connection, supply either an active client object or server
    credentials (not both!). If neither are provided the program will search
    for an OMERO user token on the system.
    :param target_path: Path to the csv file where data will be saved.
    :param file_id: ID of an OriginalFile object
    :param annotation_id: ID of a FileAnnotation object
    :param chunk_size: BYTES to download in a single call (default 1024^2)
    :param check_type: [Boolean] Whether to check that the target file is
    actually a CSV
    :param omero_connector: OMERO.client object which is already connected
    to a server. Supersedes any other connection details.
    :param server: Address of the server
    :param port: Port the server runs on (default 4064)
    :param username: Username for server login
    :param password: Password for server login
    :raises FileExistsError: if target_path already exists
    """
    object_id, object_type = _validate_requested_object(
        file_id=file_id, annotation_id=annotation_id)

    # Raise explicitly rather than assert: asserts are stripped under -O
    if os.path.exists(target_path):
        raise FileExistsError(f"Target file {target_path} already exists")

    with OMEROConnection(server=server, username=username, password=password,
                         port=port, client=omero_connector) as connector:
        conn = connector.get_gateway()

        orig_file = _get_original_file(conn, object_type, object_id)
        file_id = unwrap(orig_file.id)
        file_name = unwrap(orig_file.name)
        file_mimetype = unwrap(orig_file.mimetype)
        if check_type:
            # Raises ValueError if the file is not a (gzipped) CSV
            infer_compression(file_mimetype, file_name)

        LOGGER.info(f"Downloading file {file_id} of "
                    f"mimetype '{file_mimetype}' from OMERO")
        bar_fmt = '{desc}: {percentage:3.0f}%|{bar}| ' \
                  '{n_fmt}/{total_fmt}B, {elapsed} {postfix}'
        chunk_iter = tqdm(desc="Downloading CSV from OMERO",
                          bar_format=bar_fmt, unit_scale=True)
        try:
            with get_annotation_reader(conn, file_id, chunk_size,
                                       reporter=chunk_iter) as reader:
                with open(target_path, "wb") as filehandle:
                    for chunk in reader:
                        filehandle.write(chunk)
        finally:
            # Close the bar even if the transfer fails partway
            chunk_iter.close()
        LOGGER.info(f"Download complete, saved to {target_path}")
|
||
|
||
def _get_original_file(conn, object_type, object_id):
    """
    Resolve the omero.model.OriginalFile backing the requested object.

    :param conn: BlitzGateway connection object
    :param object_type: "FileAnnotation" or "OriginalFile"
    :param object_id: ID of the target object
    :return: omero.model.OriginalFile instance
    :raises ValueError: if the type is unsupported or the object is missing
    """
    # Fetch the object from OMERO, searching across all groups ("-1")
    if object_type == "FileAnnotation":
        params = omero.sys.ParametersI()
        params.addId(object_id)
        target = conn.getQueryService().findByQuery(
            "SELECT fa.file from FileAnnotation fa where fa.id = :id",
            params, {"omero.group": "-1"})
    elif object_type == "OriginalFile":
        target = conn.getQueryService().find(object_type, object_id,
                                             {"omero.group": "-1"})
    else:
        # Single validation point; the old trailing NotImplementedError
        # branch was unreachable after this check.
        raise ValueError(f"Unsupported type '{object_type}'")
    if target is None:
        # Explicit raise instead of assert (asserts vanish under -O)
        raise ValueError(f"{object_type} with ID {object_id} not found")
    return target
|
||
|
||
def _get_table(conn, object_type, object_id):
    """
    Loads an OMERO.table remotely

    :param conn: BlitzGateway connection object
    :param object_type: "FileAnnotation" or "OriginalFile"; tables can be
    referenced by both a FileAnnotation and OriginalFile ID
    :param object_id: ID of the target object
    :return: Activated OMERO.table wrapper
    """
    orig_file = _get_original_file(conn, object_type, object_id)

    # Check that the OriginalFile has the expected mimetype
    if unwrap(orig_file.mimetype) != "OMERO.tables":
        raise ValueError(
            f"File {unwrap(orig_file.id)} is not a valid OMERO.tables object")

    # Load the table; group "-1" reads across group boundaries
    resources = conn.c.sf.sharedResources()
    data_table = resources.openTable(orig_file, {"omero.group": "-1"})
    return data_table
|
||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,149 @@ | ||
# encoding: utf-8 | ||
# | ||
# Copyright (c) 2025 Glencoe Software, Inc. All rights reserved. | ||
# | ||
# This software is distributed under the terms described by the LICENCE file | ||
# you can find at the root of the distribution bundle. | ||
# If the file is missing please request a copy by contacting | ||
# support@glencoesoftware.com. | ||
import os | ||
import io | ||
import logging | ||
|
||
import omero | ||
from omero.rtypes import unwrap | ||
from omero.gateway import BlitzGateway | ||
|
||
LOGGER = logging.getLogger(__name__) | ||
|
||
|
||
class OriginalFileIO(io.RawIOBase):
    """
    Reader for loading data from OMERO OriginalFile objects with a
    Python file-like interface

    Only a single RawFileStore instance can exist per BlitzGateway,
    loading multiple readers will change the target file for the FileStore.

    For optimal performance this class should be wrapped in a BufferedReader,
    use the get_annotation_reader convenience function for this.
    """
    def __init__(self, conn: "BlitzGateway", file_id: int, reporter=None):
        """
        :param conn: BlitzGateway connection object
        :param file_id: ID of the OriginalFile to read
        :param reporter: optional TQDM progress bar updated as bytes are read
        """
        LOGGER.debug(f"Creating new for file {file_id}")
        self._prx = conn.createRawFileStore()
        self._file_id = file_id
        self.open()
        if not (size := self._prx.size()):
            raise omero.ClientError(f"Invalid size for OriginalFile {file_id}")
        self._size = size
        self._offset = 0
        self._reporter = reporter
        if self._reporter is not None:
            reporter.reset(total=size)

    def __enter__(self):
        self.open()
        # Must return self so `with OriginalFileIO(...) as reader:` binds
        # the reader rather than None.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # The context manager protocol passes three positional arguments;
        # a `**kwargs`-only signature would raise TypeError on exit.
        self.close()
        if self._reporter is not None:
            self._reporter.close()

    def open(self):
        # The RawFileStore is shared per-gateway; re-point it at our file
        # if another reader has changed the active file in the meantime.
        current = unwrap(self._prx.getFileId())
        if current != self._file_id:
            LOGGER.debug(f"Setting IO file ID to {self._file_id}")
            self._prx.setFileId(self._file_id, {"omero.group": "-1"})

    def read(self, size=-1):
        # Read `size` bytes from the target object. -1 = 'read all'
        LOGGER.debug(f"Reading {size} bytes from file {self._file_id}")
        if size == -1:
            size = self._size - self._offset
        # Never read past the end of the file
        size = min(size, self._size - self._offset)
        if not size or self._offset >= self._size:
            LOGGER.debug("Nothing to read")
            return b""
        # Ensure we're reading the correct object
        self.open()
        data = self._prx.read(self._offset, size)
        self._offset += size
        if self._reporter is not None:
            self._reporter.update(size)
        return data

    def readinto(self, buffer):
        # Read bytes into the provided object and return number read
        if self._offset >= self._size:
            return 0
        data = self.read(len(buffer))
        count = len(data)
        buffer[:count] = data
        return count

    def seek(self, offset, whence=os.SEEK_SET):
        # Seek within the target file and return the new absolute offset,
        # per the io.IOBase contract. SEEK_END adds the (typically negative)
        # offset to the file size; the previous `size - offset` form
        # inverted the direction relative to standard file semantics.
        if whence == os.SEEK_SET:
            self._offset = offset
        elif whence == os.SEEK_CUR:
            self._offset += offset
        elif whence == os.SEEK_END:
            self._offset = self._size + offset
        else:
            raise ValueError(f"Invalid whence value: {whence}")
        # Clamp to the valid byte range of the file
        if self._offset > self._size:
            self._offset = self._size
        elif self._offset < 0:
            self._offset = 0
        return self._offset

    def close(self):
        file_id = unwrap(self._prx.getFileId())
        if file_id == self._file_id:
            # We only close if the current file is the active one; another
            # reader may have re-targeted the shared RawFileStore.
            LOGGER.debug("Closing reader")
            self._prx.close()

    def size(self):
        # Total size of the OriginalFile in bytes
        return self._size

    def tell(self):
        # Current read offset within the file
        return self._offset

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return False
|
||
|
||
def get_annotation_reader(conn, file_id, chunk_size, reporter=None):
    """
    Construct a buffered file-like reader over an OMERO OriginalFile.

    :param conn: BlitzGateway connection object
    :param file_id: OriginalFile ID to load
    :param chunk_size: Number of bytes to load per server call
    :param reporter: TQDM progressbar instance
    :return: BufferedReader instance
    """
    raw_reader = OriginalFileIO(conn, file_id, reporter=reporter)
    # Wrap the raw reader so data is fetched from the server in
    # chunk_size batches rather than per-call.
    return io.BufferedReader(raw_reader, buffer_size=chunk_size)
|
||
|
||
def infer_compression(mimetype, name):
    """
    Determine the pandas `compression` argument for a CSV-like file,
    validating that the file is actually some sort of CSV.

    :param mimetype: mimetype recorded on the OriginalFile (may be None)
    :param name: file name, used as a fallback when mimetype is missing
    :return: "gzip" for gzip-compressed CSVs, None for plain CSVs
    :raises ValueError: if the file does not look like a (gzipped) CSV
    """
    if mimetype is None:
        # No recorded mimetype; fall back to the file extension.
        # ".csv.gz" must be tested separately since it doesn't end ".csv".
        lowered = name.lower()
        if lowered.endswith(".csv"):
            mimetype = "text/csv"
        elif lowered.endswith(".csv.gz"):
            mimetype = "application/x-gzip"
        else:
            raise ValueError(f"Unsupported filetype: {name}")
    mimetype = mimetype.lower()
    # Accept both the legacy (x-gzip) and IANA-registered gzip mimetypes
    if mimetype in ("application/x-gzip", "application/gzip"):
        return "gzip"
    if mimetype == "text/csv":
        return None
    raise ValueError(f"Unsupported mimetype: {mimetype}")
Uh oh!
There was an error while loading. Please reload this page.