Add remote csv reading functions #29

Merged (6 commits) on May 21, 2025
Changes from 1 commit
36 changes: 36 additions & 0 deletions README.md
@@ -113,6 +113,42 @@ my_dataframe.head()

Returned dataframes also come with a pandas index column, representing the original row numbers from the OMERO.table.


### Non-OMERO.tables Tables
Sometimes users attach tabular data as a FileAnnotation in CSV format rather
than uploading it as an OMERO.tables object. omero2pandas can still read these
files using dedicated methods:

```python
import omero2pandas
my_dataframe = omero2pandas.read_csv(file_id=101,
column_names=['object', 'intensity'])
my_dataframe.head()
# Returns dataframe with selected columns
```

Note that this interface supports fewer features than full OMERO.tables:
queries and row selection are unavailable. However, gzip-compressed CSV files
(.csv.gz) can be loaded directly with these methods.
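
For instance, a gzip-compressed attachment can be read in a single call. This is a minimal sketch; the annotation ID is hypothetical, and the compression is inferred from the file's mimetype or name:

```python
import omero2pandas

# Hypothetical FileAnnotation ID wrapping a .csv.gz attachment
my_dataframe = omero2pandas.read_csv(annotation_id=102)
my_dataframe.head()
```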

You can also directly download the OriginalFile as follows:

```python
import omero2pandas
omero2pandas.download_csv("/path/to/output.csv", file_id=201)
```

In both of these cases the `chunk_size` parameter controls the number of
**bytes** loaded in each server call rather than the row count. Take care when
setting this parameter: very small values (e.g. 10) will make the download
extremely slow.

By default the downloader will only accept csv/csv.gz files, but it can
technically be used with most OriginalFile objects. Supply the `check_type=False`
argument to bypass that restriction.
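
For example, both parameters can be combined when fetching an arbitrary OriginalFile. This is a minimal sketch with a hypothetical file ID and output path:

```python
import omero2pandas

# Hypothetical OriginalFile ID; chunk_size is in bytes (4 MiB here) and
# check_type=False skips the csv/csv.gz mimetype validation
omero2pandas.download_csv("/path/to/output.bin", file_id=202,
                          chunk_size=4 * 1024 * 1024,
                          check_type=False)
```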

N.B. OMERO.tables objects cannot be downloaded with this method; use `omero2pandas.download_table` instead.
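
A minimal sketch of that alternative, assuming the hypothetical file ID 301 refers to a genuine OMERO.tables object and that the table is exported as CSV:

```python
import omero2pandas

# Hypothetical OMERO.tables OriginalFile ID; output format assumed to be CSV
omero2pandas.download_table("/path/to/table.csv", file_id=301)
```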


## Writing data

Pandas dataframes can also be written back as new OMERO.tables.
152 changes: 133 additions & 19 deletions omero2pandas/__init__.py
@@ -15,9 +15,11 @@
import pandas
import omero
import omero.gateway
from omero.rtypes import unwrap
from tqdm.auto import tqdm

from omero2pandas.connect import OMEROConnection
from omero2pandas.io_tools import get_annotation_reader, infer_compression
from omero2pandas.upload import create_table
if find_spec("tiledb"):
from omero2pandas.remote import register_table
@@ -277,7 +279,6 @@ def download_table(target_path, file_id=None, annotation_id=None,
returned. Cannot be used with the 'rows' parameter.
:param variables: Dictionary containing variables to map onto the query
string.
:return: pandas.DataFrame object containing requested data
"""
if rows is not None and query is not None:
raise ValueError("Running a query supersedes the rows argument. "
@@ -349,6 +350,133 @@
LOGGER.info(f"Download complete, saved to {target_path}")


def read_csv(file_id=None, annotation_id=None, column_names=None,
chunk_size=1048576, omero_connector=None, server=None, port=4064,
username=None, password=None):
"""
Read a csv or csv.gz file stored as an OMERO OriginalFile/FileAnnotation
into a pandas dataframe.
Supply either a file or annotation ID.
Convenience method for scenarios where data was uploaded as a raw CSV
rather than an OMERO.tables object.
:param file_id: ID of the OriginalFile to load
:param annotation_id: ID of the FileAnnotation to load
:param column_names: Optional list of column names to return
:param omero_connector: OMERO.client object which is already connected
to a server. Supersedes any other connection details.
:param server: Address of the server
:param port: Port the server runs on (default 4064)
:param username: Username for server login
:param password: Password for server login
:param chunk_size: BYTES to download in a single call (default 1024^2)
:return: pandas.DataFrame
"""
object_id, object_type = _validate_requested_object(
file_id=file_id, annotation_id=annotation_id)

with OMEROConnection(server=server, username=username, password=password,
port=port, client=omero_connector) as connector:
conn = connector.get_gateway()
orig_file = _get_original_file(conn, object_type, object_id)
file_id = unwrap(orig_file.id)
file_name = unwrap(orig_file.name)
file_mimetype = unwrap(orig_file.mimetype)

# Check that the OriginalFile is actually some form of CSV
compression = infer_compression(file_mimetype, file_name)

LOGGER.info(f"Reading file {file_id} of "
f"mimetype '{file_mimetype}' from OMERO")
bar_fmt = '{desc}: {percentage:3.0f}%|{bar}| ' \
'{n_fmt}/{total_fmt}, {elapsed} {postfix}'
chunk_iter = tqdm(desc="Reading CSV from OMERO",
bar_format=bar_fmt, unit_scale=True)
with get_annotation_reader(conn, file_id,
chunk_size, reporter=chunk_iter) as reader:
df = pandas.read_csv(reader,
compression=compression,
usecols=column_names)
chunk_iter.close()
return df


def download_csv(target_path, file_id=None, annotation_id=None,
chunk_size=1048576, check_type=True, omero_connector=None,
server=None, port=4064, username=None, password=None):
"""
Downloads a CSV file stored as a csv or csv.gz file rather than an
OMERO.table.
Supply either a file or annotation ID.
For the connection, supply either an active client object or server
credentials (not both!). If neither are provided the program will search
for an OMERO user token on the system.
:param target_path: Path to the csv file where data will be saved.
:param file_id: ID of an OriginalFile object
:param annotation_id: ID of a FileAnnotation object
:param omero_connector: OMERO.client object which is already connected
to a server. Supersedes any other connection details.
:param server: Address of the server
:param port: Port the server runs on (default 4064)
:param username: Username for server login
:param password: Password for server login
:param chunk_size: BYTES to download in a single call (default 1024^2)
:param check_type: [Boolean] Whether to check that the target file is
actually a CSV
"""
object_id, object_type = _validate_requested_object(
file_id=file_id, annotation_id=annotation_id)

assert not os.path.exists(target_path), \
Comment (Contributor): I think an optional overwrite would be a useful parameter here.

Reply (Member Author): I agree, but debating whether to do that as a distinct PR, since we'd also want to apply that to the download_table function.

f"Target file {target_path} already exists"

with OMEROConnection(server=server, username=username, password=password,
port=port, client=omero_connector) as connector:
conn = connector.get_gateway()

orig_file = _get_original_file(conn, object_type, object_id)
file_id = unwrap(orig_file.id)
file_name = unwrap(orig_file.name)
file_mimetype = unwrap(orig_file.mimetype)
if check_type:
infer_compression(file_mimetype, file_name)

LOGGER.info(f"Downloading file {file_id} of "
f"mimetype '{file_mimetype}' from OMERO")
bar_fmt = '{desc}: {percentage:3.0f}%|{bar}| ' \
'{n_fmt}/{total_fmt}B, {elapsed} {postfix}'
chunk_iter = tqdm(desc="Downloading CSV from OMERO",
bar_format=bar_fmt, unit_scale=True)
with get_annotation_reader(conn, file_id,
chunk_size, reporter=chunk_iter) as reader:
with open(target_path, "wb") as filehandle:
for chunk in reader:
filehandle.write(chunk)
chunk_iter.close()
LOGGER.info(f"Download complete, saved to {target_path}")


def _get_original_file(conn, object_type, object_id):
if object_type not in ("FileAnnotation", "OriginalFile"):
raise ValueError(f"Unsupported type '{object_type}'")
# Fetch the object from OMERO
if object_type == "FileAnnotation":
params = omero.sys.ParametersI()
params.addId(object_id)
target = conn.getQueryService().findByQuery(
"SELECT fa.file from FileAnnotation fa where fa.id = :id",
params, {"omero.group": "-1"})
elif object_type == "OriginalFile":
target = conn.getQueryService().find(object_type, object_id,
{"omero.group": "-1"})
else:
raise NotImplementedError(
f"OMERO object of type {object_type} is not supported")
assert target is not None, f"{object_type} with ID" \
f" {object_id} not found"
return target


def _get_table(conn, object_type, object_id):
"""
Loads an OMERO.table remotely
@@ -359,30 +487,16 @@ def _get_table(conn, object_type, object_id):
by both a FileAnnotation and OriginalFile ID
:return: Activated OMERO.table wrapper
"""
orig_group = conn.SERVICE_OPTS.getOmeroGroup()
conn.SERVICE_OPTS.setOmeroGroup('-1')
# Fetch the object from OMERO
target = conn.getObject(object_type, object_id)
assert target is not None, f"{object_type} with ID" \
f" {object_id} not found"
# Get the file object containing the table.
if isinstance(target, omero.gateway.FileAnnotationWrapper):
orig_file = target.file
elif isinstance(target, omero.gateway.OriginalFileWrapper):
orig_file = target._obj
else:
raise NotImplementedError(
f"OMERO object of type {type(target)} is not supported")
orig_file = _get_original_file(conn, object_type, object_id)

# Check that the OriginalFile has the expected mimetype
if orig_file.mimetype is None or orig_file.mimetype.val != "OMERO.tables":
if unwrap(orig_file.mimetype) != "OMERO.tables":
raise ValueError(
f"File {orig_file.id.val} is not a valid OMERO.tables")
f"File {unwrap(orig_file.id)} is not a valid OMERO.tables object")

# Load the table
resources = conn.c.sf.sharedResources()
data_table = resources.openTable(orig_file, conn.SERVICE_OPTS)
conn.SERVICE_OPTS.setOmeroGroup(orig_group)
data_table = resources.openTable(orig_file, {"omero.group": "-1"})
return data_table


149 changes: 149 additions & 0 deletions omero2pandas/io_tools.py
@@ -0,0 +1,149 @@
# encoding: utf-8
#
# Copyright (c) 2025 Glencoe Software, Inc. All rights reserved.
#
# This software is distributed under the terms described by the LICENCE file
# you can find at the root of the distribution bundle.
# If the file is missing please request a copy by contacting
# support@glencoesoftware.com.
import os
import io
import logging

import omero
from omero.rtypes import unwrap
from omero.gateway import BlitzGateway

LOGGER = logging.getLogger(__name__)


class OriginalFileIO(io.RawIOBase):
"""
Reader for loading data from OMERO OriginalFile objects with a
Python file-like interface

Only a single RawFileStore instance can exist per BlitzGateway;
loading multiple readers will change the target file for the FileStore.

For optimal performance this class should be wrapped in a BufferedReader,
use the get_annotation_reader convenience function for this.
"""
def __init__(self, conn: BlitzGateway, file_id: int, reporter=None):
LOGGER.debug(f"Creating new for file {file_id}")
self._prx = conn.createRawFileStore()
self._file_id = file_id
self.open()
if not (size := self._prx.size()):
raise omero.ClientError(f"Invalid size for OriginalFile {file_id}")
self._size = size
self._offset = 0
self._reporter = reporter
if self._reporter is not None:
reporter.reset(total=size)

def __enter__(self):
self.open()
# Return the reader so it can be used as a context manager target
return self
Comment (Member): If open() is called in __init__ I don't think it needs to be called here as well.


def __exit__(self, exc_type, exc_value, traceback):
self.close()
if self._reporter is not None:
self._reporter.close()

def open(self):
current = unwrap(self._prx.getFileId())
if current != self._file_id:
LOGGER.debug(f"Setting IO file ID to {self._file_id}")
self._prx.setFileId(self._file_id, {"omero.group": "-1"})

def read(self, size=-1):
# Read `size` bytes from the target object. -1 = 'read all'
LOGGER.debug(f"Reading {size} bytes from file {self._file_id}")
# Ensure we're reading the correct object
if size == -1:
size = self._size - self._offset
size = min(size, self._size - self._offset)
if not size or self._offset >= self._size:
LOGGER.debug("Nothing to read")
return b""
self.open()
data = self._prx.read(self._offset, size)
self._offset += size
if self._reporter is not None:
self._reporter.update(size)
return data

def readinto(self, buffer):
# Read bytes into the provided object and return number read
if self._offset >= self._size:
return 0
data = self.read(len(buffer))
count = len(data)
buffer[:count] = data
return count

def seek(self, offset, whence=os.SEEK_SET):
# Seek within the target file
if whence == os.SEEK_SET:
self._offset = offset
elif whence == os.SEEK_CUR:
self._offset += offset
elif whence == os.SEEK_END:
# Offset is relative to the end of the file (typically negative)
self._offset = self._size + offset
else:
raise ValueError(f"Invalid whence value: {whence}")
# Clamp the resulting offset to the bounds of the file
if self._offset > self._size:
self._offset = self._size
elif self._offset < 0:
self._offset = 0
return self._offset

def close(self):
file_id = unwrap(self._prx.getFileId())
if file_id == self._file_id:
# We only close if the current file is the active one
Comment on lines +101 to +102 (Member): It's not clear to me how this check will ever fail.

Reply (Member Author, @DavidStirling, Apr 24, 2025): Thanks @kkoz

Much of this is down to odd behaviour in BlitzGateway. As far as I can tell the user can actually only create a single RawFileStore object; subsequent calls modify the existing instance. Therefore, if they were to access another file while this object exists, any subsequent reads would interact with whichever file they accessed last instead of the file our reader was made to interact with.

To solve this I've made the reader defensively check that the target file hasn't changed by calling self.open() before any reads. In most scenarios the active file ID will still be correct and nothing happens, but we don't want to risk reading the contents of a totally irrelevant file.

Reply (Member): Hmm - so is the idea that if the client hands you a BlitzGateway and you attempt to use the RawFileStore associated with it, they may also be using that RawFileStore and you'll be clobbering each other's setFileId calls? I would say that if that happens (i.e. the client calls setFileId after omero2pandas does) we should throw, not just overwrite again. Otherwise we'll screw up whatever the client was trying to do.

Reply (Member Author): For the most part we can probably expect users not to need to return to RawFileStore objects after the fact, but a challenge with this library is that we also want to support Jupyter notebook usage. In that scenario (and indeed during testing) it was possible to create an OriginalFileIO reader, load in a second file and then encounter errors when trying to use the first reader again. From a user perspective, if I create two file readers I'd expect them both to work seamlessly, so I opted for checking and updating the file ID as reads are requested.

The most notable use case I can imagine would be a user wanting to merge two large CSV files stored as FileAnnotations. To do this chunk-wise you might use two readers simultaneously, so throwing if you touch another reader would break this. Admittedly this is all rather messy and niche to begin with, but perhaps we should show a warning if the file ID was changed instead of throwing?

Reply (Member): My understanding is that the short answer here is that you can't really re-use the reader (or create multiple readers) at all. Once close() is called on a RawFileStore, it becomes unusable. See https://docs.openmicroscopy.org/omero-common/5.6.1/javadoc/ome/api/StatefulServiceInterface.html#close--
So if a user attempts to read two files at the same time, once __exit__() is called by the first file, the reads on the second file will begin to fail. I agree that that's not what users are expecting, but that's just how RawFileStore works, so I don't think there's much we can do about it right now. Maybe @chris-allan would have some ideas about how to make this work as desired.

LOGGER.debug("Closing reader")
self._prx.close()

def size(self):
return self._size

def tell(self):
return self._offset

def readable(self):
return True

def seekable(self):
return True

def writable(self):
return False


def get_annotation_reader(conn, file_id, chunk_size, reporter=None):
"""
Fetch a buffered reader for loading OriginalFile data
:param conn: BlitzGateway connection object
:param file_id: OriginalFile ID to load
:param chunk_size: Number of bytes to load per server call
:param reporter: TQDM progressbar instance
:return: BufferedReader instance
"""
reader = OriginalFileIO(conn, file_id, reporter=reporter)
return io.BufferedReader(reader, buffer_size=chunk_size)


def infer_compression(mimetype, name):
# Validate that the suggested file is actually some sort of CSV
if mimetype is None:
if name.lower().endswith(".csv"):
mimetype = "text/csv"
elif name.lower().endswith(".csv.gz"):
mimetype = "application/x-gzip"
else:
raise ValueError(f"Unsupported filetype: {name}")
mimetype = mimetype.lower()
if mimetype == "application/x-gzip":
return "gzip"
elif mimetype == "text/csv":
return None
raise ValueError(f"Unsupported mimetype: {mimetype}")