76 changes: 41 additions & 35 deletions pairtools/cli/dedup.py
@@ -383,7 +383,6 @@ def dedup_py(
n_proc,
**kwargs,
):

sep = ast.literal_eval('"""' + sep + '"""')
send_header_to_dedup = send_header_to in ["both", "dedup"]
send_header_to_dup = send_header_to in ["both", "dups"]
@@ -485,6 +484,34 @@ def dedup_py(
logger.warning(
"Pairs file appears not to be sorted, dedup might produce wrong results."
)

# Canonicalize column names for flexible matching
column_names = headerops.extract_column_names(header)
column_names = headerops.canonicalize_columns(column_names)

# Get column indices with fallbacks
try:
col1 = headerops.get_column_index(column_names, c1)
Review comment (Member): the names like "col1/col2/cols1" are not descriptive enough, so maybe use "col_c1/col_c2/col_s1" instead: (1) use underscore, (2) preserve the recognizable name that was used before (c1/c2/p1/p2/etc.).

col2 = headerops.get_column_index(column_names, c2)
colp1 = headerops.get_column_index(column_names, p1)
colp2 = headerops.get_column_index(column_names, p2)
cols1 = headerops.get_column_index(column_names, s1)
cols2 = headerops.get_column_index(column_names, s2)

# Handle extra column pairs
extra_cols1 = []
extra_cols2 = []
if extra_col_pair is not None:
for col1_spec, col2_spec in extra_col_pair:
try:
extra_cols1.append(headerops.get_column_index(column_names, col1_spec))
extra_cols2.append(headerops.get_column_index(column_names, col2_spec))
except ValueError:
logger.warning(f"Extra column pair ({col1_spec}, {col2_spec}) not found in header, skipping")
continue
except ValueError as e:
raise ValueError(f"Column error: {str(e)}") from e

header = headerops.append_new_pg(header, ID=UTIL_NAME, PN=UTIL_NAME)
dups_header = header.copy()
if keep_parent_id and len(dups_header) > 0:
@@ -502,38 +529,17 @@ def dedup_py(
):
outstream_unmapped.writelines((l + "\n" for l in header))

column_names = headerops.extract_column_names(header)
extra_cols1 = []
extra_cols2 = []
if extra_col_pair is not None:
for col1, col2 in extra_col_pair:
extra_cols1.append(column_names[col1] if col1.isnumeric() else col1)
extra_cols2.append(column_names[col2] if col2.isnumeric() else col2)

if backend == "cython":
# warnings.warn(
# "'cython' backend is deprecated and provided only"
# " for backwards compatibility",
# DeprecationWarning,
# )
extra_cols1 = [column_names.index(col) for col in extra_cols1]
extra_cols2 = [column_names.index(col) for col in extra_cols2]
c1 = column_names.index(c1)
c2 = column_names.index(c2)
p1 = column_names.index(p1)
p2 = column_names.index(p2)
s1 = column_names.index(s1)
s2 = column_names.index(s2)
streaming_dedup_cython(
method,
max_mismatch,
sep,
c1,
c2,
p1,
p2,
s1,
s2,
col1,
col2,
colp1,
colp2,
cols1,
cols2,
extra_cols1,
extra_cols2,
unmapped_chrom,
@@ -554,7 +560,7 @@
method=method,
mark_dups=mark_dups,
max_mismatch=max_mismatch,
extra_col_pairs=list(extra_col_pair),
extra_col_pairs=list(zip(extra_cols1, extra_cols2)) if extra_cols1 else [],
keep_parent_id=keep_parent_id,
unmapped_chrom=unmapped_chrom,
outstream=outstream,
@@ -563,12 +569,12 @@
out_stat=out_stat,
backend=backend,
n_proc=n_proc,
c1=c1,
c2=c2,
p1=p1,
p2=p2,
s1=s1,
s2=s2,
c1=col1,
c2=col2,
p1=colp1,
p2=colp2,
s1=cols1,
s2=cols2,
)
else:
raise ValueError("Unknown backend")
49 changes: 37 additions & 12 deletions pairtools/cli/sort.py
@@ -6,10 +6,12 @@
import subprocess
import shutil
import warnings
from .._logging import get_logger

from ..lib import fileio, pairsam_format, headerops
from . import cli, common_io_options

logger = get_logger()
UTIL_NAME = "pairtools_sort"


@@ -58,6 +60,7 @@
default=pairsam_format.COLUMNS_PAIRS[7],
help=f"Pair type column; default {pairsam_format.COLUMNS_PAIRS[7]}"
"[input format option]",
required=False,
)
@click.option(
"--extra-col",
@@ -157,7 +160,6 @@ def sort_py(
compress_program,
**kwargs,
):

instream = fileio.auto_open(
pairs_path,
mode="r",
@@ -176,7 +178,6 @@
header = headerops.mark_header_as_sorted(header)

outstream.writelines((l + "\n" for l in header))

outstream.flush()

if compress_program == "auto":
@@ -191,16 +192,42 @@
compress_program = "gzip"

column_names = headerops.extract_column_names(header)
columns = [c1, c2, p1, p2, pt] + list(extra_col)
# Now generating the "-k <i>,<i><mode>" expressions for all columns.
# If column name is in the default pairsam format and has an integer dtype there, do numerical sorting
column_names = headerops.canonicalize_columns(column_names)

# Get column indices with fallbacks
try:
col1 = headerops.get_column_index(column_names, c1)
col2 = headerops.get_column_index(column_names, c2)
colp1 = headerops.get_column_index(column_names, p1)
colp2 = headerops.get_column_index(column_names, p2)

# Make pair_type optional
try:
colpt = headerops.get_column_index(column_names, pt) if pt else None
except ValueError:
colpt = None

extra_cols = []
for col in extra_col:
try:
extra_cols.append(headerops.get_column_index(column_names, col))
except ValueError:
logger.warning(f"Extra column {col} not found in header, skipping")
continue
except ValueError as e:
raise ValueError(f"Column error: {str(e)}") from e

# Generate sort command columns
cols = []
for col in columns:
colindex = int(col) if col.isnumeric() else column_names.index(col) + 1
for i, col in enumerate([col1, colp1, col2, colp2, colpt] + extra_cols):
if col is None:
continue # Skip optional columns that weren't found
dtype = pairsam_format.DTYPES_PAIRSAM.get(column_names[col], str)
cols.append(
f"-k {colindex},{colindex}{'n' if issubclass(pairsam_format.DTYPES_PAIRSAM.get(column_names[colindex-1], str), int) else ''}"
f"-k {col+1},{col+1}{'n' if issubclass(dtype, int) else ''}"
)
cols = " ".join(cols)

command = rf"""
/bin/bash -c 'export LC_COLLATE=C; export LANG=C; sort
{cols}
@@ -210,9 +237,8 @@
{f'--temporary-directory={tmpdir}' if tmpdir else ''}
-S {memory}
{f'--compress-program={compress_program}' if compress_program else ''}'
""".replace(
"\n", " "
)
""".replace("\n", " ")

with subprocess.Popen(
command, stdin=subprocess.PIPE, bufsize=-1, shell=True, stdout=outstream
) as process:
@@ -224,7 +250,6 @@

if instream != sys.stdin:
instream.close()

if outstream != sys.stdout:
outstream.close()
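
As a sanity check on the key-generation loop in this diff, here is a small standalone sketch of the sort(1) key expressions it would emit for the default .pairs columns; treating pos1/pos2 as the only integer dtypes among the sorted columns is an assumption about DTYPES_PAIRSAM, and this snippet is illustrative only:

# Expected "-k" expressions for the default .pairs column order
column_names = ["readID", "chrom1", "pos1", "chrom2", "pos2",
                "strand1", "strand2", "pair_type"]
int_cols = {"pos1", "pos2"}  # assumed integer dtypes
keys = []
for col in [1, 2, 3, 4, 7]:  # chrom1, pos1, chrom2, pos2, pair_type
    suffix = "n" if column_names[col] in int_cols else ""
    keys.append(f"-k {col + 1},{col + 1}{suffix}")
print(" ".join(keys))  # -> "-k 2,2 -k 3,3n -k 4,4 -k 5,5n -k 8,8"
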

45 changes: 45 additions & 0 deletions pairtools/lib/headerops.py
@@ -22,6 +22,51 @@



def canonicalize_columns(columns):
Review comment (agalitsyna, Member, Jun 30, 2025): "canonicalize" -> "standardize"
Review comment (Member): Make it a function with a single argument.

"""Convert between common column name variants."""
canonical_map = {
Review comment (Member): Remove identities (when key equals the value)

'chr1': 'chrom1',
'chr2': 'chrom2',
'chrom1': 'chrom1', # Ensure identity mapping
'chrom2': 'chrom2',
'pt': 'pair_type',
'pair_type': 'pair_type'
}
return [canonical_map.get(col.lower(), col) for col in columns]
Review comment (Member): suggested change:
canonical_map.get(col.lower(), col.lower())
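
Taken together, the suggestions above would reshape the helper roughly as follows; this is a sketch only, not part of the diff, and note that the lowercase fallback also lowercases unknown names:

# Sketch: "standardize" naming, identity entries dropped, lowercase fallback
def standardize_columns(columns):
    """Map common column-name aliases onto the standard .pairs names."""
    alias_map = {"chr1": "chrom1", "chr2": "chrom2", "pt": "pair_type"}
    return [alias_map.get(col.lower(), col.lower()) for col in columns]

assert standardize_columns(["Chr1", "POS1", "pt"]) == ["chrom1", "pos1", "pair_type"]
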


def get_column_index(column_names, column_spec):
Review comment (Member): "column_spec" -> "column/col"

"""Get column index with flexible name matching."""
if isinstance(column_spec, int):
if -len(column_names) <= column_spec < len(column_names):
return column_spec % len(column_names) # Handle negative indices
raise ValueError(f"Column index {column_spec} out of range")

if not isinstance(column_spec, (str, int)):
raise AttributeError(f"Column spec must be string or integer, got {type(column_spec)}")

# Try direct match first
try:
return column_names.index(column_spec)
except ValueError:
pass

# Try canonical name (canonicalize both the query and the header columns,
# so that e.g. "chrom1" also matches a header that uses "chr1")
canonical = canonicalize_columns([column_spec])[0]
canonical_columns = canonicalize_columns(column_names)
try:
return canonical_columns.index(canonical)
except ValueError:
pass

# Try case-insensitive
Review comment (agalitsyna, Member, Jun 30, 2025): Remove the case-insensitive matching or explain why it's needed (we have not found the use case; Open2C mtg 30 June 2025).

lower_columns = [c.lower() for c in column_names]
try:
return lower_columns.index(canonical.lower())
except ValueError:
available = ', '.join(f"'{c}'" for c in column_names)
raise ValueError(
f"Column '{column_spec}' not found. Available columns: {available}"
)
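
A quick illustration of the lookup order this function implements (direct match, then canonical alias, then case-insensitive); the calls below are illustrative only, not part of the diff:

# cols = ["readID", "chrom1", "pos1"]
# get_column_index(cols, "chrom1")  # direct match                 -> 1
# get_column_index(cols, "chr1")    # resolved via the alias map   -> 1
# get_column_index(cols, "Chrom1")  # lowercased, then alias map   -> 1
# get_column_index(cols, "chrX")    # ValueError listing the available columns
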

def get_header(instream, comment_char=COMMENT_CHAR, ignore_warning=False):
"""Returns a header from the stream and an the reaminder of the stream
with the actual data.
98 changes: 98 additions & 0 deletions tests/test_headerops.py
@@ -4,6 +4,104 @@
import pytest


from pairtools.lib.headerops import (
canonicalize_columns,
get_column_index,
extract_column_names,
)

def test_canonicalize_columns():
# Test basic canonicalization
assert canonicalize_columns(['chr1', 'chr2']) == ['chrom1', 'chrom2']
assert canonicalize_columns(['chrom1', 'chrom2']) == ['chrom1', 'chrom2']
assert canonicalize_columns(['pt', 'other']) == ['pair_type', 'other']

# Test mixed case
assert canonicalize_columns(['Chr1', 'CHR2']) == ['chrom1', 'chrom2']
assert canonicalize_columns(['CHR1', 'chr2']) == ['chrom1', 'chrom2']

# Test no changes needed
assert canonicalize_columns(['readID', 'pos1']) == ['readID', 'pos1']

# Test empty input
assert canonicalize_columns([]) == []

# Test all known aliases
assert canonicalize_columns(['chr1', 'Chr2', 'PT']) == ['chrom1', 'chrom2', 'pair_type']

def test_get_column_index():
# Setup test columns
columns = ['readID', 'chr1', 'pos1', 'chr2', 'pos2', 'strand1', 'strand2', 'pair_type']

# Test string lookup - direct matches
assert get_column_index(columns, 'chr1') == 1
assert get_column_index(columns, 'pos2') == 4
assert get_column_index(columns, 'pair_type') == 7

# Test string lookup - canonicalized matches
assert get_column_index(columns, 'chrom1') == 1
assert get_column_index(columns, 'CHROM2') == 3
assert get_column_index(columns, 'PT') == 7

# Test case insensitive matches
assert get_column_index(columns, 'CHR1') == 1
assert get_column_index(columns, 'ChR2') == 3

# Test integer lookup
assert get_column_index(columns, 0) == 0
assert get_column_index(columns, 3) == 3
assert get_column_index(columns, 7) == 7

# Test error cases
with pytest.raises(ValueError, match="Column 'nonexistent' not found"):
get_column_index(columns, 'nonexistent')

with pytest.raises(ValueError, match="Column index 100 out of range"):
get_column_index(columns, 100)

with pytest.raises(AttributeError, match="Column spec must be string or integer"):
get_column_index(columns, 3.14)

def test_integration_with_extract_column_names():
# Test with actual header format
header = [
"## pairs format v1.0",
"#columns: readID chr1 pos1 chr2 pos2 strand1 strand2 pair_type",
"#chromsize: chr1 1000",
"#chromsize: chr2 800"
]

columns = extract_column_names(header)
assert columns == ['readID', 'chr1', 'pos1', 'chr2', 'pos2', 'strand1', 'strand2', 'pair_type']

# Test canonicalized column access
assert get_column_index(columns, 'chrom1') == 1
assert get_column_index(columns, 'chrom2') == 3
assert get_column_index(columns, 'pt') == 7

# Test with alternative header format
header2 = [
"## pairs format v1.0",
"#columns: readID chrom1 pos1 chrom2 pos2 strand1 strand2 pair_type",
]
columns2 = extract_column_names(header2)
assert get_column_index(columns2, 'chr1') == 1
assert get_column_index(columns2, 'chr2') == 3

def test_edge_cases():
# Test empty columns
with pytest.raises(ValueError):
get_column_index([], 'chrom1')

# Test invalid column spec type
with pytest.raises(AttributeError):
get_column_index(['a', 'b', 'c'], 3.14) # float not supported

# Test negative indices
assert get_column_index(['a', 'b', 'c'], -1) == 2 # Python-style negative indexing


def test_make_standard_header():
header = headerops.make_standard_pairsheader()

Expand Down