Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions NEWS.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,21 @@
Release Notes
================================================================================

Version 0.7.7dev: Issue 101
-------------------------------------------------------------------------------

General:

+ Added ``--ordered`` flag to all multiprocessing tools for deterministic output
+ ordering. When enabled, this flag ensures that output sequences maintain the
+ same order as input sequences, producing reproducible results across multiple
+ runs. This is particularly useful for result comparison, testing, and when
+ downstream analysis depends on sequence order. However, enabling ``--ordered``
+ reduces processing speed and increases memory usage due to additional indexing
+ and sorting overhead, so it should only be used when deterministic output is
+ required. Tools with new ``--ordered`` support: AlignSets, AssemblePairs,
+ BuildConsensus, ClusterSets, EstimateError, FilterSeq, MaskPrimers, UnifyHeaders.

Version 0.7.6dev: Unreleased
-------------------------------------------------------------------------------

Expand Down
21 changes: 14 additions & 7 deletions bin/AlignSets.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from presto.Applications import runMuscle
from presto.Sequence import calculateDiversity, indexSeqSets
from presto.IO import readPrimerFile, getOutputHandle, printLog, printWarning, printError
from presto.Multiprocessing import SeqResult, manageProcesses, feedSeqQueue, \
from presto.Multiprocessing import SeqResult, manageProcesses, manageProcessesOrdered, feedSeqQueue, \
collectSeqQueue


Expand Down Expand Up @@ -276,7 +276,7 @@ def processQueue(alive, data_queue, result_queue, align_func, align_args={},

def alignSets(seq_file, align_func, align_args, barcode_field=default_barcode_field,
calc_div=False, out_file=None, out_args=default_out_args,
nproc=None, queue_size=None):
nproc=None, queue_size=None, ordered=False):
"""
Performs a multiple alignment on sets of sequences

Expand Down Expand Up @@ -316,7 +316,8 @@ def alignSets(seq_file, align_func, align_args, barcode_field=default_barcode_fi
feed_func = feedSeqQueue
feed_args = {'seq_file': seq_file,
'index_func': indexSeqSets,
'index_args': index_args}
'index_args': index_args,
'ordered': ordered}
# Define worker function and arguments
work_func = processQueue
work_args = {'align_func': align_func,
Expand All @@ -329,12 +330,18 @@ def alignSets(seq_file, align_func, align_args, barcode_field=default_barcode_fi
'label': 'align',
'out_file': out_file,
'out_args': out_args,
'index_field': barcode_field}
'index_field': barcode_field,
'ordered': ordered}

# Call process manager
result = manageProcesses(feed_func, work_func, collect_func,
feed_args, work_args, collect_args,
nproc, queue_size)
if ordered:
result = manageProcessesOrdered(feed_func, work_func, collect_func,
feed_args, work_args, collect_args,
nproc, queue_size)
else:
result = manageProcesses(feed_func, work_func, collect_func,
feed_args, work_args, collect_args,
nproc, queue_size)

# Print log
result['log']['END'] = 'AlignSets'
Expand Down
21 changes: 14 additions & 7 deletions bin/AssemblePairs.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from presto.IO import readReferenceFile, countSeqFile, printLog, printError
from presto.Sequence import reverseComplement, AssemblyStats, \
referenceAssembly, joinAssembly, alignAssembly, sequentialAssembly
from presto.Multiprocessing import SeqResult, manageProcesses, \
from presto.Multiprocessing import SeqResult, manageProcesses, manageProcessesOrdered, \
processSeqQueue, feedPairQueue, collectPairQueue

# Defaults
Expand Down Expand Up @@ -133,7 +133,7 @@ def assemblePairs(head_file, tail_file, assemble_func, assemble_args={},
coord_type=default_coord, rc='tail',
head_fields=None, tail_fields=None,
out_file=None, out_args=default_out_args,
nproc=None, queue_size=None):
nproc=None, queue_size=None, ordered=False):
"""
Generates consensus sequences

Expand Down Expand Up @@ -222,7 +222,8 @@ def assemblePairs(head_file, tail_file, assemble_func, assemble_args={},
feed_args = {'seq_file_1': head_file,
'seq_file_2': tail_file,
'coord_type': coord_type,
'delimiter': out_args['delimiter']}
'delimiter': out_args['delimiter'],
'ordered': ordered}
# Define worker function and arguments
process_args = {'assemble_func': assemble_func,
'assemble_args': assemble_args,
Expand All @@ -239,12 +240,18 @@ def assemblePairs(head_file, tail_file, assemble_func, assemble_args={},
'seq_file_2': tail_file,
'label': 'assemble',
'out_file': out_file,
'out_args': out_args}
'out_args': out_args,
'ordered': ordered}

# Call process manager
result = manageProcesses(feed_func, work_func, collect_func,
feed_args, work_args, collect_args,
nproc, queue_size)
if ordered:
result = manageProcessesOrdered(feed_func, work_func, collect_func,
feed_args, work_args, collect_args,
nproc, queue_size)
else:
result = manageProcesses(feed_func, work_func, collect_func,
feed_args, work_args, collect_args,
nproc, queue_size)

# Close reference database handle
if cmd_name in ('reference', 'sequential'):
Expand Down
21 changes: 14 additions & 7 deletions bin/BuildConsensus.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from presto.Sequence import subsetSeqSet, calculateDiversity, \
qualityConsensus, frequencyConsensus, indexSeqSets, \
calculateSetError, deleteSeqPositions, findGapPositions
from presto.Multiprocessing import SeqResult, manageProcesses, feedSeqQueue, \
from presto.Multiprocessing import SeqResult, manageProcesses, manageProcessesOrdered, feedSeqQueue, \
collectSeqQueue


Expand Down Expand Up @@ -224,7 +224,7 @@ def buildConsensus(seq_file, barcode_field=default_barcode_field,
min_qual=default_consensus_min_qual, primer_field=None, primer_freq=None,
max_gap=None, max_error=None, max_diversity=None,
copy_fields=None, copy_actions=None, dependent=False,
out_file=None, out_args=default_out_args, nproc=None, queue_size=None):
out_file=None, out_args=default_out_args, nproc=None, queue_size=None, ordered=False):
"""
Generates consensus sequences

Expand Down Expand Up @@ -296,7 +296,8 @@ def buildConsensus(seq_file, barcode_field=default_barcode_field,
feed_func = feedSeqQueue
feed_args = {'seq_file': seq_file,
'index_func': indexSeqSets,
'index_args': index_args}
'index_args': index_args,
'ordered': ordered}
# Define worker function and arguments
work_func = processQueue
work_args = {'cons_func': cons_func,
Expand All @@ -316,12 +317,18 @@ def buildConsensus(seq_file, barcode_field=default_barcode_field,
'label': 'consensus',
'out_file': out_file,
'out_args': out_args,
'index_field': barcode_field}
'index_field': barcode_field,
'ordered': ordered}

# Call process manager
result = manageProcesses(feed_func, work_func, collect_func,
feed_args, work_args, collect_args,
nproc, queue_size)
if ordered:
result = manageProcessesOrdered(feed_func, work_func, collect_func,
feed_args, work_args, collect_args,
nproc, queue_size)
else:
result = manageProcesses(feed_func, work_func, collect_func,
feed_args, work_args, collect_args,
nproc, queue_size)

# Print log
result['log']['END'] = 'BuildConsensus'
Expand Down
21 changes: 14 additions & 7 deletions bin/ClusterSets.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from presto.IO import countSeqFile, getFileType, getOutputHandle, printLog, printMessage, \
printProgress, readSeqFile, printError, printWarning
from presto.Sequence import indexSeqSets
from presto.Multiprocessing import SeqResult, manageProcesses, feedSeqQueue, \
from presto.Multiprocessing import SeqResult, manageProcesses, manageProcessesOrdered, feedSeqQueue, \
collectSeqQueue

# Defaults
Expand Down Expand Up @@ -137,7 +137,7 @@ def clusterSets(seq_file, ident=default_cluster_ident, length_ratio=default_leng
seq_start=0, seq_end=None, set_field=default_barcode_field, cluster_field=default_cluster_field,
cluster_prefix=default_cluster_prefix, cluster_memory=default_max_memory,
cluster_tool=default_cluster_tool, cluster_exec=default_cluster_exec,
out_file=None, out_args=default_out_args, nproc=None, queue_size=None):
out_file=None, out_args=default_out_args, nproc=None, queue_size=None, ordered=False):
"""
Performs clustering on sets of sequences

Expand Down Expand Up @@ -202,7 +202,8 @@ def clusterSets(seq_file, ident=default_cluster_ident, length_ratio=default_leng
feed_func = feedSeqQueue
feed_args = {'seq_file': seq_file,
'index_func': indexSeqSets,
'index_args': index_args}
'index_args': index_args,
'ordered': ordered}
# Define worker function and arguments
work_func = processQueue
work_args = {'cluster_func': cluster_func,
Expand All @@ -216,12 +217,18 @@ def clusterSets(seq_file, ident=default_cluster_ident, length_ratio=default_leng
'label': 'cluster',
'out_file': out_file,
'out_args': out_args,
'index_field': set_field}
'index_field': set_field,
'ordered': ordered}

# Call process manager
result = manageProcesses(feed_func, work_func, collect_func,
feed_args, work_args, collect_args,
nproc, queue_size)
if ordered:
result = manageProcessesOrdered(feed_func, work_func, collect_func,
feed_args, work_args, collect_args,
nproc, queue_size)
else:
result = manageProcesses(feed_func, work_func, collect_func,
feed_args, work_args, collect_args,
nproc, queue_size)

# Print log
log = OrderedDict()
Expand Down
21 changes: 14 additions & 7 deletions bin/EstimateError.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
printLog, printProgress, printError, printWarning
from presto.Sequence import getDNAScoreDict, calculateDiversity, qualityConsensus, \
frequencyConsensus, indexSeqSets
from presto.Multiprocessing import SeqResult, manageProcesses, feedSeqQueue
from presto.Multiprocessing import SeqResult, manageProcesses, manageProcessesOrdered, feedSeqQueue
from presto.Annotation import parseAnnotation

# Defaults
Expand Down Expand Up @@ -530,7 +530,7 @@ def writeResults(results, seq_file, out_args):

def estimateSets(seq_file, cons_func=frequencyConsensus, cons_args={},
set_field=default_barcode_field, min_count=default_min_count, max_diversity=None,
out_args=default_out_args, nproc=None, queue_size=None):
out_args=default_out_args, nproc=None, queue_size=None, ordered=False):
"""
Calculates error rates of sequence sets

Expand Down Expand Up @@ -575,7 +575,8 @@ def estimateSets(seq_file, cons_func=frequencyConsensus, cons_args={},
feed_func = feedSeqQueue
feed_args = {'seq_file': seq_file,
'index_func': indexSeqSets,
'index_args': index_args}
'index_args': index_args,
'ordered': ordered}
# Define worker function and arguments
work_func = processEEQueue
work_args = {'cons_func': cons_func,
Expand All @@ -586,12 +587,18 @@ def estimateSets(seq_file, cons_func=frequencyConsensus, cons_args={},
collect_func = collectEEQueue
collect_args = {'seq_file': seq_file,
'out_args': out_args,
'set_field': set_field}
'set_field': set_field,
'ordered': ordered}

# Call process manager
result = manageProcesses(feed_func, work_func, collect_func,
feed_args, work_args, collect_args,
nproc, queue_size)
if ordered:
result = manageProcessesOrdered(feed_func, work_func, collect_func,
feed_args, work_args, collect_args,
nproc, queue_size)
else:
result = manageProcesses(feed_func, work_func, collect_func,
feed_args, work_args, collect_args,
nproc, queue_size)

# Print log
result['log']['END'] = 'EstimateError'
Expand Down
20 changes: 13 additions & 7 deletions bin/FilterSeq.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@
default_filter_max_repeat, default_filter_window
from presto.Commandline import CommonHelpFormatter, checkArgs, getCommonArgParser, parseCommonArgs
from presto.IO import getFileType, printLog, printError
from presto.Multiprocessing import manageProcesses, feedSeqQueue, \
from presto.Multiprocessing import manageProcesses, manageProcessesOrdered, feedSeqQueue, \
processSeqQueue, collectSeqQueue
from presto.Sequence import filterLength, filterMissing, filterRepeats, filterQuality, \
trimQuality, maskQuality


def filterSeq(seq_file, filter_func, filter_args={},
out_file=None, out_args=default_out_args,
nproc=None, queue_size=None):
nproc=None, queue_size=None, ordered=False):
"""
Filters sequences by fraction of ambiguous nucleotides

Expand Down Expand Up @@ -65,7 +65,7 @@ def filterSeq(seq_file, filter_func, filter_args={},

# Define feeder function and arguments
feed_func = feedSeqQueue
feed_args = {'seq_file': seq_file}
feed_args = {'seq_file': seq_file, 'ordered': ordered}
# Define worker function and arguments
work_func = processSeqQueue
work_args = {'process_func': filter_func,
Expand All @@ -75,12 +75,18 @@ def filterSeq(seq_file, filter_func, filter_args={},
collect_args = {'seq_file': seq_file,
'label': cmd_dict[filter_func],
'out_file': out_file,
'out_args': out_args}
'out_args': out_args,
'ordered': ordered}

# Call process manager
result = manageProcesses(feed_func, work_func, collect_func,
feed_args, work_args, collect_args,
nproc, queue_size)
if ordered:
result = manageProcessesOrdered(feed_func, work_func, collect_func,
feed_args, work_args, collect_args,
nproc, queue_size)
else:
result = manageProcesses(feed_func, work_func, collect_func,
feed_args, work_args, collect_args,
nproc, queue_size)

# Print log
result['log']['END'] = 'FilterSeq'
Expand Down
21 changes: 14 additions & 7 deletions bin/MaskPrimers.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from presto.Sequence import localAlignment, compilePrimers, extractAlignment, getDNAScoreDict, \
maskSeq, reverseComplement, scoreAlignment
from presto.IO import readPrimerFile, printLog
from presto.Multiprocessing import SeqResult, manageProcesses, feedSeqQueue, \
from presto.Multiprocessing import SeqResult, manageProcesses, manageProcessesOrdered, feedSeqQueue, \
processSeqQueue, collectSeqQueue

def extractPrimers(data, start, length, rev_primer=False, mode='mask', barcode=False,
Expand Down Expand Up @@ -215,7 +215,7 @@ def scorePrimers(data, primers, max_error=default_primer_max_error, start=defaul

def maskPrimers(seq_file, primer_file, align_func, align_args={},
out_file=None, out_args=default_out_args,
nproc=None, queue_size=None):
nproc=None, queue_size=None, ordered=False):
"""
Masks or cuts primers from sample sequences using local alignment

Expand All @@ -230,6 +230,7 @@ def maskPrimers(seq_file, primer_file, align_func, align_args={},
if None defaults to the number of CPUs.
queue_size : maximum size of the argument queue;
if None defaults to 2*nproc.
ordered : if True maintain deterministic output order for reproducible results.

Returns:
list: a list of successful output file names.
Expand Down Expand Up @@ -276,7 +277,7 @@ def maskPrimers(seq_file, primer_file, align_func, align_args={},

# Define feeder function and arguments
feed_func = feedSeqQueue
feed_args = {'seq_file': seq_file}
feed_args = {'seq_file': seq_file, 'ordered': ordered}
# Define worker function and arguments
work_func = processSeqQueue
work_args = {'process_func': align_func,
Expand All @@ -286,12 +287,18 @@ def maskPrimers(seq_file, primer_file, align_func, align_args={},
collect_args = {'seq_file': seq_file,
'label': 'primers',
'out_file': out_file,
'out_args': out_args}
'out_args': out_args,
'ordered': ordered}

# Call process manager
result = manageProcesses(feed_func, work_func, collect_func,
feed_args, work_args, collect_args,
nproc, queue_size)
if ordered:
result = manageProcessesOrdered(feed_func, work_func, collect_func,
feed_args, work_args, collect_args,
nproc, queue_size)
else:
result = manageProcesses(feed_func, work_func, collect_func,
feed_args, work_args, collect_args,
nproc, queue_size)

# Print log
result['log']['END'] = 'MaskPrimers'
Expand Down
Loading