Example of aggregating files with associated metadata #40
As another example, what is the best practice for aggregating multiple sets of files? For example, a per-file job:
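For illustration, a sketch of the kind of per-file process meant here (the process, tool, and file names are made up):

```nextflow
// hypothetical per-file process producing two tables per sample
process PER_FILE {
    input:
    tuple val(sample_id), path(reads)

    output:
    path("${sample_id}.stats.tsv"),   emit: stats
    path("${sample_id}.summary.tsv"), emit: summary

    script:
    """
    some_tool ${reads} --stats ${sample_id}.stats.tsv --summary ${sample_id}.summary.tsv
    """
}
```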
For one output channel, one could just do the following:
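A sketch for illustration (the input channel and collected file name are hypothetical; `PER_FILE` is the sketch above):

```nextflow
workflow {
    // hypothetical input channel: one (sample_id, file) pair per sample
    samples_ch = Channel.fromPath('data/*.fastq')
        .map { f -> tuple(f.baseName, f) }

    PER_FILE(samples_ch)

    // concatenate one output channel into a single table,
    // keeping only the first file's header
    PER_FILE.out.stats
        .collectFile(name: 'all_stats.tsv', keepHeader: true, skip: 1)
        .view()
}
```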
...but for multiple output channels, the same approach doesn't work.
Must one create a […]?
Assuming that one must provide metadata values via the file paths when aggregating multiple files in Nextflow, I created the following script. It aggregates files (structured table files are assumed), extracts values from the input file names (e.g., the sample name from files named "{sample_name}.tsv"), and adds those extracted values as extra columns in the aggregated output table. I just used the same Python code that is used in Snakemake for the same purpose.

concat_tables.py:

```python
#!/usr/bin/env python
from __future__ import print_function
import os
import re
import sys
import argparse
import logging
import collections
from itertools import chain
from glob import glob
# logging
logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.DEBUG)
# argparse
class CustomFormatter(argparse.ArgumentDefaultsHelpFormatter,
argparse.RawDescriptionHelpFormatter):
pass
desc = 'Concatenate tables with the same headers'
epi = """DESCRIPTION:
Aggregate >=1 structured table.
The columns are assumed to be the same across all of the tables.
Values can be extracted from the input file names (e.g., sample_id from sample1.txt)
and added as extra columns in the aggregated output table.
Use a glob pattern to identify >=1 input file.
Use --pattern to pull out values from the file path.
For example, use "{sample_id}.txt" to get the sample
id from files named "sample1.txt", "sample2.txt", etc.
Output written to STDOUT.
"""
parser = argparse.ArgumentParser(description=desc, epilog=epi,
formatter_class=CustomFormatter)
parser.add_argument('glob', type=str,
help='Glob pattern to find the input files, Example: "*.stats.tsv"')
parser.add_argument('-p', '--pattern', type=str, default=None,
help='Matching pattern in file naming scheme. Example: "{sample_id}_taxid-{taxid}"')
parser.add_argument('-f', '--fields', type=str, default=None, nargs='+',
help='Pattern matching fields to use in output. Example: "taxid"')
parser.add_argument('-d', '--delimiter', type=str, default='\t',
help='Field delimiter')
parser.add_argument('-H', '--no-header', action='store_true', default=False,
help='No headers?')
parser.add_argument('--version', action='version', version='0.0.1')
_wildcard_regex = re.compile(
r"""
\{
(?=( # This lookahead assertion emulates an 'atomic group'
# which is required for performance
\s*(?P<name>\w+) # wildcard name
(\s*,\s*
(?P<constraint> # an optional constraint
([^{}]+ | \{\d+(,\d+)?\})* # allow curly braces to nest one level
) # ... as in '{w,a{3,5}}'
)?\s*
))\1
\}
""",
re.VERBOSE,
)
def regex(filepattern):
"""
# Function taken from snakemake repo
"""
f = []
last = 0
wildcards = set()
for match in _wildcard_regex.finditer(filepattern):
f.append(re.escape(filepattern[last : match.start()]))
wildcard = match.group("name")
if wildcard in wildcards:
if match.group("constraint"):
raise ValueError(
"Constraint regex must be defined only in the first "
"occurence of the wildcard in a string."
)
f.append("(?P={})".format(wildcard))
else:
wildcards.add(wildcard)
f.append(
"(?P<{}>{})".format(
wildcard,
match.group("constraint") if match.group("constraint") else ".+",
)
)
last = match.end()
f.append(re.escape(filepattern[last:]))
f.append("$") # ensure that the match spans the whole file
return "".join(f)
def glob_wildcards(pattern, files=None, followlinks=False):
"""
# Function taken from snakemake repo
Glob the values of the wildcards by matching the given pattern to the filesystem.
Returns a named tuple with a list of values for each wildcard.
"""
pattern = os.path.normpath(pattern)
first_wildcard = re.search("{[^{]", pattern)
dirname = (
os.path.dirname(pattern[: first_wildcard.start()])
if first_wildcard
else os.path.dirname(pattern)
)
if not dirname:
dirname = "."
names = [match.group("name") for match in _wildcard_regex.finditer(pattern)]
Wildcards = collections.namedtuple("Wildcards", names)
wildcards = Wildcards(*[list() for name in names])
pattern = re.compile(regex(pattern))
if files is None:
files = (
os.path.normpath(os.path.join(dirpath, f))
for dirpath, dirnames, filenames in os.walk(
dirname, followlinks=followlinks
)
for f in chain(filenames, dirnames)
)
for f in files:
match = re.match(pattern, f)
if match:
for name, value in match.groupdict().items():
getattr(wildcards, name).append(value)
return wildcards
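# Illustrative example (hypothetical file name), showing what glob_wildcards returns:
#   glob_wildcards("{sample_id}_taxid-{taxid}.tsv", files=["sample1_taxid-9606.tsv"])
#   => Wildcards(sample_id=['sample1'], taxid=['9606'])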
def main(args):
# listing files
infiles = glob(args.glob)
logging.info(f'No. of files found: {len(infiles)}')
# reading
header_written = False
for F in infiles:
# file path pattern matching
if args.pattern is not None:
wcs = glob_wildcards(args.pattern, files=[F])
else:
wcs = None
        ## select just certain fields
        if args.fields is not None:
            wcs_fields = args.fields
        elif wcs is not None:
            wcs_fields = list(wcs._fields)
        else:
            wcs_fields = []
## pattern didn't match?
if (wcs is not None and
any([len(getattr(wcs, x)) == 0 for x in wcs_fields])):
logging.warning(f'>=1 pattern field is empty for file: {F}')
wcs = None
# reading file
with open(F) as inF:
logging.info(f'Reading file: {F}')
skip_cnt = 0
for i,line in enumerate(inF):
# parsing
line = line.rstrip().split(args.delimiter)
# skipping lines
if line[0] == '' or line[0].startswith('#'):
skip_cnt += 1
continue
# header
if not args.no_header and i - skip_cnt == 0:
if header_written:
continue
header_written = True
# adding extra fields
if wcs is not None:
line = wcs_fields + line
else:
# adding extra fields
if wcs is not None:
line = [getattr(wcs, x)[0] for x in wcs_fields] + line
# printing line to STDOUT
print(args.delimiter.join(line))
## script main
if __name__ == '__main__':
    args = parser.parse_args()
    main(args)
```

Is this the "best" way of aggregating files and associated file metadata?
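For reference, a minimal sketch of how a helper script like this might be invoked from an aggregation process (the process name, file extensions, and upstream wiring are hypothetical):

```nextflow
process CONCAT_TABLES {
    input:
    // per-file tables, e.g. sample1.stats.tsv, sample2.stats.tsv,
    // gathered upstream with .collect()
    path(tables)

    output:
    path("combined.tsv")

    script:
    // assumes concat_tables.py is on the PATH (e.g. in the pipeline's bin/ directory)
    """
    concat_tables.py "*.stats.tsv" --pattern "{sample_id}.stats.tsv" > combined.tsv
    """
}
```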
Currently, there is no example of aggregating files AND associated metadata. For instance, in many/most nf-core pipelines the process outputs are something like:
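Schematically, something like this (the process and tool names are hypothetical; the point is that the per-sample information travels only in the `meta` map):

```nextflow
process EXAMPLE_TOOL {
    input:
    tuple val(meta), path(infile)

    output:
    // every task produces a file with the same name;
    // the per-sample information is carried only by `meta`
    tuple val(meta), path("file.txt"), emit: txt

    script:
    """
    example_tool ${infile} > file.txt
    """
}
```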
...but what if one wants to then aggregate all of the `file.txt` outputs into one table AND include the `meta` metadata in that output table? As far as I can tell from scouring the Nextflow Slack channel, one must "embed" the metadata in the file paths and then parse the file paths in the aggregation step. For example:
Per-file process:
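A sketch for illustration (`example_tool` is hypothetical; `meta.id` follows the usual nf-core convention of keeping the sample id in the meta map):

```nextflow
process PER_SAMPLE {
    input:
    tuple val(meta), path(infile)

    output:
    // the sample id is embedded in the output file name so it can be
    // recovered later, during aggregation
    path("${meta.id}.tsv"), emit: tsv

    script:
    """
    example_tool ${infile} > ${meta.id}.tsv
    """
}
```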
Aggregation process:
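Again a sketch, with the file-name parsing done in plain shell (header handling omitted for brevity):

```nextflow
process AGGREGATE {
    input:
    path(tables)          // all per-sample tables, e.g. sample1.tsv sample2.tsv ...

    output:
    path("combined.tsv")

    script:
    """
    for f in ${tables}; do
        sample=\$(basename "\$f" .tsv)
        # re-attach the sample id (parsed from the file name) as a leading column
        awk -v s="\$sample" 'BEGIN { OFS = FS = "\\t" } { print s, \$0 }' "\$f"
    done > combined.tsv
    """
}

workflow {
    // gather every per-sample table into a single aggregation task
    AGGREGATE( PER_SAMPLE.out.tsv.collect() )
}
```

The only thing tying each row back to its sample is the id that was baked into the file name, which is exactly where the limitations below come from.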
Is there a better way, especially given the substantial limitations of trying to embed metadata into a file path (e.g., dealing with multiple values and special characters in the metadata values)?

I'm sure a lot of pipeline developers would like a best-practices example of how to deal with this situation (without having to decipher how `meta` is dealt with in the aggregation steps of nf-core pipelines).