Implement remote table registration calls #28


Merged · 9 commits · May 20, 2025
91 changes: 81 additions & 10 deletions README.md
@@ -271,23 +271,22 @@ These should match the relevant column type. Mapped variables are substituted in
A `variables` map usually isn't needed for simple queries. The basic condition string should automatically get converted to a meaningful type, but when this fails, replacing tricky elements with a variable may help.

### Remote registration

For **OMERO Plus** installations which support TileDB as the OMERO.tables backend,
it is possible to register tables in-place, in a similar manner to in-place image
imports (otherwise table data is stored in the ManagedRepository).

This is a two-step process:
1) Convert the dataframe into a TileDB file
2) Register the converted table with the OMERO server

If you don't know what table backend your OMERO Plus server is using, you
probably don't have this feature available. If you have access to the server
machine you can check by running `omero config get omero.tables.module`;
if the response is `omero_plus.run_tables_pytables_or_tiledb` then TileDB is
available.

For this mode to be available, extra dependencies must also be installed (see the
project's installation instructions for the relevant `pip` extra).

Similar to regular table uploads, the input can be a dataframe in memory or a
csv file on disk. The input will be converted into a TileDB database and
registered to OMERO in-place.

To perform this kind of registration, provide the `local_path` argument to the
standard `omero2pandas.upload_table` function (alongside the parameters required for
a "normal" upload, e.g. server connection details). The local path is where
the TileDB file will be written and then registered to OMERO from.
If you provide a directory instead, the TileDB file will be named based on the `table_name` argument.
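A minimal sketch of such a call is shown below (all paths, IDs and credentials are illustrative placeholders):

```python
import omero2pandas

# Placeholder values throughout; the call mirrors a normal upload but adds
# local_path so the table is written as TileDB and registered in-place.
ann_id = omero2pandas.upload_table(
    "/path/to/my_data.csv", "Name for table",
    parent_id=142, parent_type="Image",
    local_path="/path/to/tables/my_table.tiledb",
    server="omero.example.org", username="user", password="pass",
)
print(f"Registered as FileAnnotation {ann_id}")
```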

Naturally, the OMERO server needs to be able to access the resulting TileDB file
in order to register it. If the `local_path` is also visible from the server machine
(e.g. you're running the upload on the server itself), that's sufficient. Otherwise,
a `remote_path` argument is available to tell the server where it should
find the table. This is typically needed when the TileDB file is mounted at a
different location on the local machine than on the OMERO server.

For example, if registering from a Windows machine with a network drive to an OMERO server on Linux:
```python
omero2pandas.upload_table(
    df, "My Custom Table",
    local_path="J:\\data\\tables\\my_omero_table.tiledb",
    remote_path="/network_data/tables/my_omero_table.tiledb",
)
```

Effectively, `local_path` is where the current machine should write the data, while `remote_path`
is where that file appears from the OMERO server's point of view. Omitting the remote path
implies that both machines will see the file at the local path.

Note that when a table is registered remotely it is not part of the Managed Repository
used to store OMERO data. This means that it becomes the user's responsibility to
update the table object on the OMERO server if the file is moved/deleted.

#### How it works

Remote registration is a two-step process: conversion to TileDB format followed
by registration using an HTTP API.

The TileDB conversion is handled automatically by omero2pandas. This largely involves
creating a TileDB database from your dataframe and adding a few details to
the converted array's metadata. Most native pandas column types are supported.
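In outline, the conversion resembles the sketch below (a simplified illustration rather than the exact omero2pandas code; the dataframe and output path are placeholders):

```python
import secrets
import time

import pandas as pd
import tiledb

df = pd.DataFrame({"Image": [1, 2, 3], "Area": [10.5, 20.1, 7.8]})

# Write the dataframe as a sparse TileDB array, then attach the metadata
# expected for OMERO registration.
tiledb.from_pandas("/tmp/example.tiledb", df, sparse=True, full_domain=True)
with tiledb.open("/tmp/example.tiledb", mode="w") as array:
    array.meta["__version"] = "3"             # OMERO table implementation version
    array.meta["__initialized"] = time.time()
    array.meta["SecretToken"] = secrets.token_urlsafe()
```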

The actual registration involves telling the server that we'd like to register a
remote table and providing it with the tiledb file location. There is then a
validation process to ensure that the table seen by the server is the same one that
the user has requested the API to register. This is achieved by writing a "SecretToken"
key to the tiledb array metadata. The tiledb file seen by the server must have a key
matching the one provided in the registration call managed by omero2pandas.
Member
The point of the SecretToken is not really validation - it's to ensure the client attempting to register a file has read access to that file.


While it is possible to manually create and register tables without a security key,
this is strongly discouraged, as other users could potentially register and access
the same table without permission. With that in mind, the implementation within
omero2pandas can be considered an example of "best practice" for handling
remote table registration.

If the registration succeeds, the tables API will create all the necessary OMERO
objects and return a FileAnnotation ID, just as if we'd uploaded the table normally.
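For reference, the request flow implemented by `omero2pandas.remote.register_table` looks roughly like the sketch below (server URL, session ID, token values and target object are placeholders):

```python
import requests

server = "https://omero.example.org"       # placeholder server URL
session_id = "abc123"                      # active OMERO session ID
secret_token = "token-from-create_tiledb"  # SecretToken stored in the array

# Step 1: fetch a CSRF token (and cookie) from the server.
token_response = requests.get(f"{server}/api/v0/token")
csrf_token = token_response.json()["data"]

# Step 2: POST the registration request, proving read access to the file
# via the SecretToken header and linking the table to a target object.
response = requests.post(
    f"{server}/omero_plus/api/v0/table?bsession={session_id}",
    json={
        "uri": "/network_data/tables/my_omero_table.tiledb",
        "name": "My Custom Table",
        "targets": ["Image:142"],
    },
    headers={
        "Content-Type": "application/json",
        "X-SecretToken": secret_token,
        "X-CSRFToken": csrf_token,
        "Referer": server,
    },
    cookies=token_response.cookies,
    allow_redirects=False,
)
annotation_id = response.json()["data"]["file_annotation"]
```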

#### Converting to TileDB format without registration

While the processes of tiledb conversion and remote registration are intended to
be used together, it is possible to only convert a table to an OMERO Plus-compatible
TileDB file. This can be achieved as follows:

```python
import pandas as pd
from omero2pandas.remote import create_tiledb
df = pd.read_csv("/path/to/table.csv")
security_key = create_tiledb(df, "/path/to/output.tiledb")
```

This will convert an input dataframe or CSV file path into a TileDB file with
the appropriate metadata for remote registration.

For convenience, the creation function returns the SecretToken needed to perform
remote registration securely. That token can also be retrieved from the TileDB
file metadata if necessary.
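For example, the token could be read back with something like the following (the path is a placeholder; the metadata key matches `SEC_TOKEN_METADATA_KEY` in `omero2pandas.remote`):

```python
import tiledb

# Recover the SecretToken from an already-converted table.
with tiledb.open("/path/to/output.tiledb") as array:
    security_key = array.meta["SecretToken"]
```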
44 changes: 30 additions & 14 deletions omero2pandas/__init__.py
@@ -20,9 +20,9 @@
from omero2pandas.connect import OMEROConnection
from omero2pandas.upload import create_table
if find_spec("tiledb"):
from omero2pandas.remote import create_remote_table
else:
create_remote_table = None

LOGGER = logging.getLogger(__name__)

@@ -48,7 +48,7 @@ def get_table_size(file_id=None, annotation_id=None, omero_connector=None,
object_id, object_type = _validate_requested_object(
file_id=file_id, annotation_id=annotation_id)

with _get_connection(server=server, username=username, password=password,
port=port, client=omero_connector) as connector:
conn = connector.get_gateway()
data_table = _get_table(conn, object_type, object_id)
@@ -78,7 +78,7 @@ def get_table_columns(file_id=None, annotation_id=None,
object_id, object_type = _validate_requested_object(
file_id=file_id, annotation_id=annotation_id)

with _get_connection(server=server, username=username, password=password,
port=port, client=omero_connector) as connector:
conn = connector.get_gateway()

@@ -124,7 +124,7 @@ def read_table(file_id=None, annotation_id=None, column_names=(), rows=None,
object_id, object_type = _validate_requested_object(
file_id=file_id, annotation_id=annotation_id)

with _get_connection(server=server, username=username, password=password,
port=port, client=omero_connector) as connector:
conn = connector.get_gateway()

@@ -186,7 +186,7 @@ def read_table(file_id=None, annotation_id=None, column_names=(), rows=None,
def upload_table(source, table_name, parent_id=None, parent_type='Image',
links=None, chunk_size=None, omero_connector=None,
server=None, port=4064, username=None, password=None,
local_path=None, remote_path=None, prefix=""):
"""
Upload a pandas dataframe to a new OMERO table.
For the connection, supply either an active client object or server
@@ -210,6 +210,11 @@ def upload_table(source, table_name, parent_id=None, parent_type='Image',
register remotely
:param remote_path: [TileDB only], mapping for local_path on the server
(if different from local system)
:param prefix: [TileDB only], API prefix for your OMERO server,
relative to server URL. Use this if your OMERO server
is not at the top-level URL of the server.
e.g. for my.omero.server/custom_omero
supply prefix="custom_omero"
:param password: Password for server login
:return: File Annotation ID of the new table
"""
@@ -230,17 +235,21 @@ def upload_table(source, table_name, parent_id=None, parent_type='Image',
elif not isinstance(links, Iterable):
raise ValueError(f"Links should be an iterable list of "
f"type/id pairs, not {type(links)}")
with _get_connection(server=server, username=username, password=password,
port=port, client=omero_connector) as connector:
if local_path or remote_path:
if not create_remote_table:
raise ValueError("Remote table support is not installed")
ann_id = create_remote_table(source, local_path,
remote_path=remote_path,
table_name=table_name,
links=links,
chunk_size=chunk_size,
connector=connector,
prefix=prefix)
else:
conn = connector.get_gateway()
conn.SERVICE_OPTS.setOmeroGroup('-1')
ann_id = create_table(source, table_name, links, conn, chunk_size)
if ann_id is None:
LOGGER.warning("Failed to create OMERO table")
@@ -288,7 +297,7 @@ def download_table(target_path, file_id=None, annotation_id=None,
assert not os.path.exists(target_path), \
f"Target file {target_path} already exists"

with _get_connection(server=server, username=username, password=password,
port=port, client=omero_connector) as connector:
conn = connector.get_gateway()

@@ -433,3 +442,10 @@ def connect_to_omero(client=None, server=None, port=4064,
allow_token=allow_token)
connector.connect(interactive=interactive, keep_alive=keep_alive)
return connector


def _get_connection(client=None, **kwargs):
"""Create an OMEROConnection instance or use existing if supplied"""
if client is not None and isinstance(client, OMEROConnection):
return client
return OMEROConnection(client=client, **kwargs)
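As a usage sketch (server details are placeholders), the helper lets callers reuse one `OMEROConnection` across several omero2pandas calls rather than opening a fresh session each time:

```python
import omero2pandas

# Placeholder credentials; any call accepting omero_connector will reuse
# this connector via _get_connection instead of reconnecting.
connector = omero2pandas.connect_to_omero(
    server="omero.example.org", username="user", password="pass")

columns = omero2pandas.get_table_columns(
    annotation_id=101, omero_connector=connector)
df = omero2pandas.read_table(annotation_id=101, omero_connector=connector)
```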
106 changes: 96 additions & 10 deletions omero2pandas/remote.py
@@ -7,30 +7,59 @@
# If the file is missing please request a copy by contacting
# support@glencoesoftware.com.
import logging
import secrets
from pathlib import Path, PurePosixPath
import time

import pandas as pd
import requests
import tiledb
from requests import HTTPError
from tqdm.auto import tqdm

LOGGER = logging.getLogger(__name__)

OMERO_TILEDB_VERSION = '3' # Version of the omero table implementation
CSRF_TOKEN_HEADER = "X-CSRFToken"
SEC_TOKEN_HEADER = "X-SecretToken"
SEC_TOKEN_METADATA_KEY = 'SecretToken' # Metadata key for secret token
TOKEN_ENDPOINT = "/api/v0/token"
REGISTER_ENDPOINT = "/omero_plus/api/v0/table"


def create_remote_table(source, local_path, remote_path=None, table_name=None,
links=(), chunk_size=1000, connector=None,
prefix=""):
LOGGER.info("Registering remote table")
# Default filters from tiledb.from_pandas()
write_path = Path(local_path)
if write_path.is_dir():
write_path = (write_path / table_name).with_suffix(".tiledb")
# Assume the server will be running on Linux
if remote_path is None:
remote_path = PurePosixPath(write_path)
else:
remote_path = PurePosixPath(remote_path)
if remote_path.suffix != '.tiledb':
remote_path = remote_path / write_path.name
LOGGER.debug(f"Remote path would be {str(remote_path)}")
token = create_tiledb(source, write_path, chunk_size=chunk_size)
if not write_path.exists():
raise ValueError(f"Table {write_path} appears to be missing?")
ann_id = register_table(connector, remote_path, table_name, links, token,
prefix=prefix)
return ann_id


def create_tiledb(source, output_path, chunk_size=1000):
if not isinstance(output_path, Path):
# Convert strings to proper path objects
output_path = Path(output_path)
if output_path.exists():
raise ValueError(f"Table file {output_path} already exists")
output_path.parent.mkdir(parents=True, exist_ok=True)
# path.as_uri() exists but mangles any spaces in the path!
output_path = str(output_path)
# Use a default chunk size if not set
chunk_size = chunk_size or 1000
Member
Is the default chunk_size in the function definition sufficient here?

Member Author
Functions above this one permit explicitly supplying chunk_size=None which is interpreted as "determine automatically". For most modes this process involves inspecting the input table to figure out how many rows the API can process at once, but this is largely irrelevant for local TileDB conversion.

It'd be possible to resolve this in the calls to create_tiledb by not passing chunk_size at all if it was None, but having to unpack this created more mess than just having a fallback here.

LOGGER.info("Writing data to TileDB")
@@ -49,7 +78,7 @@ def register_table(source, local_path, remote_path=None, chunk_size=1000):
'{n_fmt}/{total_fmt} rows, {elapsed} {postfix}')
row_idx = 0
for chunk in data_iterator:
tiledb.from_pandas(output_path, chunk, sparse=True, full_domain=True,
tile=10000, attr_filters=None,
row_start_idx=row_idx, allows_duplicates=False,
mode="append" if row_idx else "ingest")
@@ -58,8 +87,65 @@ def register_table(source, local_path, remote_path=None, chunk_size=1000):
progress_monitor.close()
LOGGER.debug("Appending metadata to TileDB")
# Append omero metadata
security_token = secrets.token_urlsafe()
with tiledb.open(output_path, mode="w") as array:
array.meta['__version'] = OMERO_TILEDB_VERSION
array.meta['__initialized'] = time.time()
array.meta[SEC_TOKEN_METADATA_KEY] = security_token
LOGGER.info("Table saved successfully")
return security_token


def register_table(connector, remote_path, table_name, links, token,
prefix=""):
if not connector.server:
raise ValueError("Unknown server? This should never happen!")
server = f"https://{connector.server}"
# Fix malformed prefix arguments if provided
if prefix and not prefix.startswith("/"):
prefix = f"/{prefix}"
if prefix.endswith("/"):
prefix = prefix[:-1]
# Determine endpoint URLs to use. Must be HTTPS
token_url = f"{server}{prefix}{TOKEN_ENDPOINT}"
target_url = (f"{server}{prefix}{REGISTER_ENDPOINT}"
f"?bsession={connector.getSessionId()}")
# We first need to get a CSRF security token and cookie from the server
LOGGER.debug(f"Fetching CSRF token from {connector.server}")
token_result = requests.get(token_url)
token_data = _check_response(token_result)
# Now that we have the token, construct the POST to do registration
payload = {
"uri": str(remote_path),
"name": table_name,
"targets": [f"{kind}:{ob_id}" for kind, ob_id in links],
}
headers = {
"Content-Type": "application/json",
SEC_TOKEN_HEADER: token,
CSRF_TOKEN_HEADER: token_data["data"],
"Referer": server,
}
LOGGER.info(f"Registering table to {connector.server}")
LOGGER.debug(f"Request params: {payload=}, {headers=}, url={target_url}")
result = requests.post(url=target_url, json=payload, headers=headers,
cookies=token_result.cookies, allow_redirects=False)
content = _check_response(result)
ann_id = content["data"]["file_annotation"]
LOGGER.info(f"Registered table successfully as FileAnnotation {ann_id}")
return ann_id
Member
Is there a reason we return the annotation ID instead of the Original File ID? In most places we use Original File ID for table identification.

Contributor @mabruce (Apr 15, 2025)

We use the annotation ID in omero table register

Member Author
We also do this elsewhere in the library. This was chosen on the basis that getting an OriginalFile given the FileAnnotation is easier than figuring out the FileAnnotation starting from the OriginalFile.



def _check_response(response):
# Check response from an OMERO HTTP request and show error messages
if 200 <= response.status_code < 300:
return response.json()
error_message = "<No further message>"
if response.headers.get("content-type") == "application/json":
error_message = response.json()
if "message" in error_message:
error_message = error_message["message"]
LOGGER.error(
f"Request returned HTTP code {response.status_code}: {error_message}")
response.raise_for_status()
raise HTTPError(f"Unhandled response code: {response.status_code}")