API Reference

readfish._config module

class readfish._config.Conf(channels, caller_settings, mapper_settings, split_axis=1, regions=NOTHING, barcodes=NOTHING)[source]

Overall configuration for readfish experiments

The Conf class is the mother if the adaptive sampling experiment. It is constructed from the provided TOML file, via a call to from_file.

Parameters:
  • channels (int) – The number of channels on the flow cell

  • caller_settings (CallerSettings) – The caller settings as listed in the TOML

  • mapper_settings (MapperSettings) – The mapper settings as listed in the TOML

  • split_axis (int) – The axis on which to split a flowcell if there are multiple regions. 0 is horizontal, 1 is vertical.

  • regions (List[Region]) – The regions as listed in the Toml file.

  • barcodes (Dict[str, Barcode]) – A Dictionary of barcode names to Barcode Classes

  • _channel_map – A map of channels number (1 to flowcell size) to the index of the Region (in self.regions) they are part of.

describe_experiment()[source]

Describe the experiment from the given Conf class. For Barcodes we describe the targets and the conditions, but not the region.

Returns:

The description string, human readable.

Return type:

str

classmethod from_dict(dict_, channels)[source]

Create the Conf class from a Dictionary

Parameters:
  • dict – The dictionary that contains the parsed TOML file

  • channels (int) – The number of channels on the flow cell

Raises:

ValueError – If channel is present in the TOML file raise ValueError as it will overwrite something

Returns:

The constructed Conf class

Return type:

Conf

classmethod from_file(path, channels, logger=None)[source]

Create a Conf from a TOML file.

Loads the toml using rtoml then calls from_dict to create the class.

Parameters:
  • path (str | Path) – Path to the toml file

  • channels (int) – Number of channels on the flow cell

  • logger (Logger | None) – Logger to write out a base64 encoded compressed toml, defaults to None

Returns:

The Conf as constructed from this toml

Return type:

Conf

get_barcode(barcode)[source]

Get a barcode for a given barcode name

Parameters:

barcode (str | None) – The name of the barcode, example “barcode01”

Returns:

The barcode class instance for the given barcode name, if there is one

Return type:

Barcode | None

get_conditions(channel, barcode)[source]

Get the condition for this channel or barcode from the Conf TOML

The barcoder should return the barcode name e.g. barcode01 or unclassified if a barcode could not be assigned. If barcoding is not being done then the barcode should be None and channel will be used instead.

Parameters:
  • channel (int) – Channel number for this result

  • barcode (str | None) – Barcode classification from basecalling

Returns control:

Whether this channel/barcode combination is a control condition

Returns condition:

The Barcode or Region that this channel/barcode belongs to

Raises:

ValueError – In the event that the channel/barcode combination does not find a Region or a Barcode

Return type:

Tuple[bool, Barcode | Region]

get_region(channel)[source]

Get the region for a given channel

Parameters:

channel (int) – The channel number

Returns:

Returns a region, if there is one, otherwise None

Return type:

Region | None

get_targets(channel, barcode)[source]

Get the targets for a given channel or barcode, via its condition

Parameters:
  • channel (int) – The channel number

  • barcode (str | None) – The barcode name, optional

Returns:

The targets list for a given channel

Return type:

Targets

to_file(path)[source]

Write a conf to a TOML file

Parameters:

path (str | Path) – File path to create the TOML file for

write_channels_toml(out_dir)[source]

Write out a channels toml file to the given directory. This file is a map of each channel number to the corresponding region name.

Parameters:

out_dir (Path) – Read Until client we are connected to.

class readfish._config._Condition(name, single_on, single_off, multi_on, multi_off, no_map, no_seq, control=False, targets=NOTHING, min_chunks=1, max_chunks=2, below_min_chunks=Action.proceed, above_max_chunks=Action.unblock)[source]

Representation of an experimental condition. This can either be a Barcode or an experimental Region of the flow cell.

Parameters:
  • name (str) – The name of the condition.

  • single_on (Action) – The Action to perform when a read has a single, on-target, alignment

  • single_off (Action) – The Action to perform when a read has a single, off-target, alignment

  • multi_on (Action) – The Action to perform when a read has multiple alignments, with at least one on-target

  • multi_off (Action) – The Action to perform when a read has multiple aligments, with all off-target

  • no_map (Action) – The Action to perform when a read has no aligments

  • no_seq (Action) – The Action to perform when a read did not basecall

  • control (bool) – Whether the region should be treated as a control. Defaults to False

  • targets (Targets) – The target sequences for the condition. See Targets for details

  • min_chunks (int) – The minimum number of chunks required before a decision will be made. Defaults to 1

  • max_chunks (int) – The maximum number of chunks that readfish will assess for any single read. Defaults to 2

  • below_min_chunks (Action) – The Action to take when we haven’t evaluated at least min_chunks. Defaults to Action.proceed

  • above_max_chunks (Action) – The Action to take when we have exceeded max_chunks. Defaults to Action.unblock

get_action(decision)[source]

Get the Action that corresponds to decision.

Parameters:

decision (Decision) – Decision for a read

pretty_print()[source]

Pretty print how the conditions match up to the resulting Actions. Used in the describe function.

Returns:

A pretty format string containing all the variables

Return type:

str

class readfish._config.Barcode(...)[source]

See _Condition for details

class readfish._config.Region(...)[source]

See _Condition for details

class readfish._config._PluginModule(name, parameters)[source]

A plugin module

Parameters:
  • name (str) – The name of the plugin module.

  • parameters (dict) – A dictionary of parameters to be passed to the plugin module.

classmethod from_dict(params)[source]

Creates an instance of the _PluginModule class from a dictionary.

Parameters:

params (Dict[str, Dict]) – A dictionary containing a single key-value pair, where the key is the name of the plugin module and the value is a dictionary of parameters to be passed to the plugin module.

Raises:

ValueError – If more than one key value pair is provided in the params

Returns:

An instance of the _PluginModule class with the specified name and parameters.

Return type:

_PluginModule

load_module(override=False)[source]

Load a plugin module with the given name.

If the module is a built-in plugin (as specified in the builtins dictionary), it is loaded from the readfish.plugins package. Otherwise, it is loaded using the importlib library.

Parameters:

override – If True, the built-in module names are ignored. Default is False.

Returns:

The loaded module.

Raises:

ModuleNotFoundError – If the plugin module cannot be found or loaded.

Note that this method is intended to be used as part of a plugin system, where plugin modules are loaded dynamically at runtime. The builtins dictionary maps the names of built-in plugins to the actual module names, and is used to avoid having to specify the full module name when loading a built-in plugin. If override=True, the builtin module names are ignored.

load_object(obj, *, init=True, override=False, **kwargs)[source]

Load a specified object from a plugin module.

First load_module is called to load the plugin module, then the specified object is retreived from the module.

Parameters:
  • obj (str) – The name of the object to load from the plugin module.

  • init (bool) – If True, the returned object is initialized with the parameters provided to the constructor of the parent class, as well as any additional keyword arguments passed in via the **kwargs parameter.

  • override (bool) – If True, ignore builtin readfish plugins.

  • kwargs – Additional keyword arguments to pass to the constructor of the loaded object.

Returns:

The specified object from the plugin module. If init=True, the object is initialized with the provided parameters and returned.

Raises:
  • ModuleNotFoundError – If the plugin module cannot be found or loaded.

  • AttributeError – If the specified object cannot be found in the plugin module.

  • TypeError – If the runtime **kwargs conflict with the module parameters from the TOML file.

class readfish._config.CallerSettings(...)[source]

See _PluginModule for details

class readfish._config.MapperSettings(...)[source]

See _PluginModule for details


readfish._loggers module

readfish._loggers.setup_logger(name, header=None, log_format='%(message)s', log_file=None, log_console=False, mode='a', level=10, propagate=False, queue_bound=100000)[source]

Configures and returns a logging.Logger object with handlers specified by the values set in log_file and log_format, specified format, and level.

A custom header can be included if logging to a file. Log messages will be formatted using the provided format string.

Parameters:
  • name (str) – Name to assign to the logger.

  • header (str | None) – Optional header to write at the top of the log file.

  • log_format (str) – Format string for log messages using % formatting, default is “%(message)s”.

  • log_file (str | None) – Path to the file where logs should be written.

  • log_console (bool) – Whether to log to console. If True, a console StreamHandler is added.

  • mode (str) – Mode to use when opening the log file, default is ‘a’ (append).

  • level (int) – Logging level, where logging.LEVEL is one of (DEBUG, INFO, WARNING, ERROR, CRITICAL). Default is logging.DEBUG.

  • propagate (bool) – Whether the logger should propagate messages to higher-level loggers, default is False.

  • queue_bound (int) – Maximum number of log messages to store in the queue, default is 100_000. If full, adding to queue will block until space is available.

Returns:

Configured logging.Logger instance.

Example:
>>> logger = setup_logger('my_logger', log_console=True, level=logging.INFO)
>>> logger.info('This is an info message')
>>> import tempfile
>>> with tempfile.NamedTemporaryFile(mode='w+', delete=True) as tmpfile:
...     logger = setup_logger('my_logger', log_file=tmpfile.name, header='Time  Message', level=logging.INFO)
Raises:

IOError – If an I/O error occurs while opening or writing to the file.

Note:
  • If log_file is specified, a QueueHandler and QueueListener will be used to send logs to the specified file.

    The Queue will be bounded, with a default size of 100_000. Putting to queue will block if full.

  • If log_file is specified and log_console is False, logs will only be recorded to the specified file.

  • If log_console is True, logs will be sent to console irrespective of whether log_file is specified.

  • If log_file is None and log_console is False, logs will be sent to a logging.NullHandler instance.

  • If header is provided and the file specified by filename already exists,

    the header will not be written to the file.

Return type:

Logger

readfish._loggers.print_args(args, printer, exclude=None)[source]

Prints and formats all arguments from the command line. Takes all entirely lowercase attributes of args and prints them using the provided printer function (expected to be print or logging.Logger.info). The exclude parameter can be used to exclude certain attributes from being printed.

Parameters:
  • args (Namespace) – The command line arguments object to print.

  • printer (Callable) – A function to print the arguments. This function should take a single string argument.

  • exclude (list | None) – A list of attribute names to exclude from the output.


readfish._utils module

utils.py functions and utilities used internally.

class readfish._utils.ChunkTracker(channels)[source]

Bases: object

seen(channel, read_id)[source]
class readfish._utils.Severity(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: IntEnum

Severity states for messaging to MinKNOW

Parameters:
  • INFO – Info level

  • WARN – Warn level

  • ERROR – Error level

ERROR = 3
INFO = 1
WARN = 2
readfish._utils.compress_and_encode_string(original_str)[source]

Compresses a string, encodes it in base-64, and returns an ASCII string representation of the compressed blob.

Parameters:

original_str (str) – The string to be compressed and encoded.

Returns:

An ASCII string representation of the compressed and base-64 encoded blob.

Return type:

str

readfish._utils.decode_and_decompress_string(encoded_str)[source]

Decodes an ASCII string representation of a compressed blob, decompresses it, and returns the original string. This is the reverse of compress_and_encode_string.

Parameters:

encoded_str (str) – An ASCII string representation of the compressed and base-64 encoded blob.

Returns:

The original string that was compressed and encoded.

Return type:

str

readfish._utils.draw_flowcell_split(flowcell_size, split=1, axis=1, index=0, prefix='\t')[source]

Draw unicode representation of the flowcell. If the flowcell is split more than once, and index is passed, the region of the flowcell represented by the index is highlighted solid, whilst the rest is filled with Xs

Rather than representing all the possible channels, we draw a 32 column wide flowcell for gridion and 120 for promethion and divide accordingly

Example

draw_flowcell_split(512)

XXXX XXXX

draw_flowcell_split(512, split = 2) XX00 XX00

draw_flowcell_split(512, split = 2, index = 1) 00XX 00XX

>>> print(draw_flowcell_split(126, 13, index=1, axis=1, prefix=""))

.#...........
.#...........
.#...........
.#...........
.#...........
>>> print(draw_flowcell_split(126, 5, index=1, axis=0, prefix=""))

.............
.............
.............
#############
.............
Parameters:
  • flowcell_size (int) – Number of channels on the flow cell

  • split (int) – The number of regions to split into, defaults to 1

  • index (int) – The index of the region to highlight, defaults to 0

  • prefix (str) – Any leading string character to put on the row. Defaults to

Returns:

String representation of the flowcell in ASCII art

Return type:

str

readfish._utils.escape_message_to_minknow(message, chars)[source]

Escape characters in the chars list if they are in message

>>> escape_message_to_minknow("20%", ["%"])
'20\\%'
>>> escape_message_to_minknow("20\\%", ["%"])
'20\\%'
>>> escape_message_to_minknow("20", ["%"])
'20'
Parameters:
  • message (str) – The message that is being sent

  • chars (list[str] | str) – The characters to escape

Returns:

message that has been escaped

Return type:

str

readfish._utils.format_bases(num, factor=1000, suffix='B')[source]

Return a human readable string of a large number using SI unit prefixes

Pararm num:

A number to convert to decimal form

Parameters:
  • factor (int) – The SI factor, use 1000 for SI units and 1024 for binary multiples

  • suffix (str) – The suffix to place after the SI prefix, for example use B for SI units and iB for binary multiples

Returns:

The input number formatted to two decimal places with the SI unit and suffix

Example:

Return type:

str

>>> format_bases(1_000)
'1.00 kB'
>>> format_bases(1_000_000)
'1.00 MB'
>>> format_bases(1_630_000)
'1.63 MB'
>>> format_bases(1_000_000_000)
'1.00 GB'
readfish._utils.generate_flowcell(flowcell_size, split=1, axis=1, odd_even=False)[source]

Return an list of lists with channels to use in conditions

Representations generated by this method are evenly split based on the physical layout of the flowcell. Each sub-list is the same size. Axis determines whether the flowcell divisions will go left-right (0) or top-bottom (1). As flongle has a shape of (10, 13) the top-bottom axis cannot be split evenly.

Parameters:
  • flowcell_size (int) – The total number of channels on the flowcell; 126 for Flongle, 512 for MinION, and 3000 for PromethION

  • split (int) – The number of sections to split the flowcell into, must be a positive factor of the flowcell dimension, defaults to 1

  • axis (int) – The axis along which to split, see: https://docs.scipy.org/doc/numpy/glossary.html?highlight=axis, defaults to 1

  • odd_even (bool) – Return a list of two lists split into odd-even channels, ignores split and axis, defaults to False

Raises:
  • ValueError – Raised when split is not a positive integer

  • ValueError – Raised when the value for split is not a factor on the axis provided

Returns:

A list of lists with channels divided equally

Return type:

list[list[int]]

>>> len(generate_flowcell(512))
1
>>> len(generate_flowcell(512)[0])
512
>>> len(generate_flowcell(512, split=4))
4
>>> for x in generate_flowcell(512, split=4):
...     print(len(x))
128
128
128
128
>>> generate_flowcell(512, split=5)
Traceback (most recent call last):
    ...
ValueError: The flowcell cannot be split evenly
>>> for x in generate_flowcell(126, 5, axis=0):
...     print(len(x))
26
26
26
26
26
readfish._utils.get_coords(channel, flowcell_size)[source]

Return a channel’s coordinates given a flowcell size

Parameters:
  • channel (int) – The channel to retrieve the coordinates for

  • flowcell_size (int) – The flowcell size, this is used to determine the flowcell layout

Returns:

The column and row of a channel number in the flowcell

Raises:
  • ValueError – channel cannot be below 0 or above flowcell_size

  • ValueError – Raised if flowcell_size not one of [128, 512, 3000]

Return type:

tuple[int, int]

readfish._utils.get_device(device, host='127.0.0.1', port=None)[source]

Get a position for a specific device over the minknow API

Parameters:
  • device (str) – The device name - example X1 or MS00000

  • host (str) – The host the RPC is listening on, defaults to “127.0.0.1”

  • port (int) – The port the RPC is listening on, defaults to None

Raises:

ValueError – If their is no match on any of the positions for the given device name

Returns:

The position representation from the MinkKNOW API

Return type:

FlowCellPosition

readfish._utils.get_flowcell_array(flowcell_size)[source]

Return a numpy.ndarray in the shape of a flowcell

Parameters:

flowcell_size (int) – The total number of channels on the flowcell; 126 for Flongle, 512 for MinION, and 3000 for PromethION

Returns:

An N-dimensional array representation of the flowcell

Return type:

_SupportsArray[dtype[Any]] | _NestedSequence[_SupportsArray[dtype[Any]]] | bool | int | float | complex | str | bytes | _NestedSequence[bool | int | float | complex | str | bytes]

>>> get_flowcell_array(126).shape
(10, 13)
>>> get_flowcell_array(512).shape
(16, 32)
>>> get_flowcell_array(3000).shape
(25, 120)
>>> get_flowcell_array(128)
Traceback (most recent call last):
    ...
ValueError: flowcell_size is not recognised
>>> get_flowcell_array(126)[9][-1]
0
>>> get_flowcell_array(512)[15][-1]
1
readfish._utils.iter_exception_group(exc, level=0)[source]

Traverses an exception tree, yielding formatted strings for each exception encountered

Parameters:
  • exc (BaseExceptionGroup) – The exception group to traverse

  • level (int) – The current indentation level, defaults to 0, defaults to 0

Yield:

Formatted (and indented) string representation of each exception encountered in the tree.

>>> exc = BaseExceptionGroup(
...     "level 1.0",
...     [
...         BaseExceptionGroup(
...             "level 2.0",
...             [
...                 BaseExceptionGroup(
...                     "level 3.0",
...                     [
...                         ValueError("abc"),
...                         KeyError("99"),
...                         BaseExceptionGroup("level 4.0", [TabError("nu uh")]),
...                     ],
...                 )
...             ],
...         ),
...         BaseExceptionGroup("level 2.1", [ValueError("345")]),
...     ],
... )
>>> print("\n".join(iter_exception_group(exc)))
level 1.0 (2 sub-exceptions):
 level 2.0 (1 sub-exception):
  level 3.0 (3 sub-exceptions):
   - ValueError('abc')
   - KeyError('99')
   level 4.0 (1 sub-exception):
    - TabError('nu uh')
 level 2.1 (1 sub-exception):
  - ValueError('345')
readfish._utils.nested_get(obj, key, default=None, *, delim='.')[source]

Get a value from a nested structure

>>> class C:
...     def __init__(self, x=None):
...         self.x = x
...     def __repr__(self): return f"C(x={self.x!r})"
>>> data = {"a": {"b": {"c": "d", "e": C(999)}}}
>>> cls = C(C(data))
>>> nested_get(data, "a.b.c")
'd'
>>> nested_get(data, "a.b.c", 0)
'd'
>>> nested_get(data, "a.b.c.d.e", 0)
0
>>> nested_get(cls, "x.x")
{'a': {'b': {'c': 'd', 'e': C(x=999)}}}
>>> nested_get(cls, "x.x.a.b.e.x")
999
>>> nested_get(cls, "missing", "MISSING")
'MISSING'
Parameters:
  • obj (Mapping) – Any with a __get_item__ method

  • key (Any) – The key to get from the Mapping

  • default (Any) – The default value to return if the key is not present, defaults to None

  • delim (str) – Split a string by given the delimiter, to access the Mapping using each key in turn, defaults to “.”

readfish._utils.nice_join(seq, sep=', ', conjunction='or')[source]

Join lists nicely

Parameters:
  • seq (Sequence[Any]) – A sequence of objects that have a __str__ method.

  • sep (str) – The separator for the join, defaults to “, “

  • conjunction (str) – A conjunction between the joined list and the last element, defaults to “or”

Returns:

The nicely joined string

Return type:

str

readfish._utils.send_message(rpc_connection, message, severity)[source]

Send a message to MinKNOW

Parameters:
  • rpc_connection (Connection) – An instance of the rpc.Connection

  • message (str) – The message to send

  • severity (int) – The severity to use for the message: 1=info, 2=warning, 3=error

readfish._utils.stringify_grid(grid)[source]

Convert a nested list of characters into a 2d grid.

Parameters:

grid (list[list[str]]) – The grid to convert. Represents the flowcell array.

Returns:

String representation of the flowcell in ASCII art

Return type:

str


readfish._cli_args module

Store for command line arguments and defaults, these are used by readfish entry points.

These are held here in an agnostic format and the actual CLI is generated by readfish._cli_base. The two primary items that are exported are BASE_ARGS and DEVICE_BASE_ARGS which define different sets of command line arguments for different purposes. BASE_ARGS are the minimal required arguments for _all_ entry points as they used for initialising loggers. DEVICE_BASE_ARGS are the set of arguments that are used for connecting to a sequencer (device) and some other related settings for selective sequencing scripts.

class readfish._cli_args.Chemistry(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

DUPLEX = 'duplex'

For the “smarter” version of duplex - does this read map to the previous reads opposite strand on the same contig. Won’t work for no map based decisions

DUPLEX_SIMPLE = 'duplex_simple'

Simple duplex - if we are going to unblock a read given the previous read on the same channel was stop receiving, sequence the current read instead.

SIMPLEX = 'simplex'

Normal simplex chemistry - no duplex override shenanigans


readfish._statistics module

This module contains the ReadfishStatistics class, which is designed to track and manage statistics pertaining to a single Readfish run. The ReadfishStatistics class is able to update and query various statistics and counters regarding the performance, decisions, actions, and conditions of Readfish runs.

The ReadfishStatistics class has the ability to compute and return averages related to chunks per second, batch time, and batch size, and it maintains various counters to keep track of the number of chunks processed, actions taken, decisions made, and conditions met. The class also facilitates the addition of new performance and read records to the existing statistics.

Example:
>>> from readfish._statistics import ReadfishStatistics, DEBUG_STATS_LOG_FIELDS
>>> stats = ReadfishStatistics(None)
>>> stats.add_batch_performance(1,1)
>>> stats.log_read(**dict(zip(DEBUG_STATS_LOG_FIELDS, (1, 2, "test_read_id", 7, 100, 3, "single_on", "stop_receiving", "exp_region", None, None, False, 0.0))), region_name="naff", overridden_action_name=None)
>>> print(stats.get_batch_performance())
0001R/1.0000s; Avg: 0001R/1.0000s; Seq:1; Unb:0; Pro:0; Slow batches (>1.00s): 0/1
>>> print(stats.decisions)
Counter({'single_on': 1})
class readfish._statistics.ReadfishStatistics(log_file, break_reads_seconds=1.0, total_chunks=0, actions=NOTHING, conditions=NOTHING, actions_conditions=NOTHING, decisions=NOTHING, first_read_overrides=NOTHING, batch_statistics=NOTHING, lock=NOTHING)[source]

Bases: object

A class for tracking and managing statistics for individual Readfish runs.

The ReadfishStatistics class is designed to manage and present statistics from individual Readfish runs, providing insights into performance, decisions, actions, and conditions encountered during the runs.

Variables:
  • break_reads_seconds – The number of seconds between each collection of chunk signal. Default 1.0.

  • log_file – The name of the log file to write to. If None, no file is output.

  • total_chunks – The total number of chunks processed.

  • actions – A counter tracking the number of Actions sent.

  • conditions – A counter tracking the number of reads seen for each Condition.

  • actions_conditions – A counter tracking number of condition/action/decision combinations.

  • decisions – A counter tracking the number of decisions made, globally.

  • first_read_overrides – A counter tracking whether the first read was sequences readfish was started during sequencing or unblocked if it was not.

  • batch_statistics – A counter tracking performance metrics such as summed batch times, total chunks in batches, and number of batches seen.

Example:

>>> stats = ReadfishStatistics(None)
>>> stats.add_batch_performance(1, 1)
>>> stats.log_read(**dict(zip(DEBUG_STATS_LOG_FIELDS, (1, 2, "test_read_id",7, 100, 3, "single_on", "stop_receiving", "exp_region", None, None,False, 0.0))), region_name="naff", overridden_action_name=None)
>>> print(stats.get_batch_performance())
0001R/1.0000s; Avg: 0001R/1.0000s; Seq:1; Unb:0; Pro:0; Slow batches (>1.00s): 0/1
>>> print(stats.decisions)
Counter({'single_on': 1})

Example with log file

>>> import tempfile
>>> import os
>>> import time
>>> from pprint import pformat
>>> from readfish._statistics import ReadfishStatistics, DEBUG_STATS_LOG_FIELDS
>>> with tempfile.TemporaryDirectory() as tmpdir:
...     # Change the current working directory to the temporary directory
...     os.chdir(tmpdir)
...     # Use the current directory for the log file
...     log_file_name = "readfish.log"
...     # Create an instance of ReadfishStatistics with the log file in the temporary directory
...     stats = ReadfishStatistics(log_file=log_file_name)
...     # Use the log_read method to log a sample read
...     stats.log_read(**dict(zip(DEBUG_STATS_LOG_FIELDS,(1, 2, "test_read_id", 7, 100, 3, "single_on", "stop_receiving", "exp_region",None, None, False, 0.0))), region_name="naff", overridden_action_name=None)
...     # in this test, we need a small amount of time to allow the logger to write the file
...     time.sleep(0.1)
...     # Read the content of the file
...     with open(log_file_name, 'r') as log_file:
...         content = log_file.read()
...     # Prepare the expected content
...     header = " ".join(DEBUG_STATS_LOG_FIELDS)
...     expected_line = " ".join(map(str, (1, 2, "test_read_id", 7, 100, 3, "single_on","stop_receiving", "exp_region", None, None, False, 0.0)))
...     expected = f"{header}\n{expected_line}"
...     # Check that the content matches, don't ask about the replaces, it was the only way
...     expected.replace("  ", " ") == content.replace("\t", " ").strip()
True
log_file: str | None
break_reads_seconds: float
total_chunks: int
actions: Counter
conditions: Counter
actions_conditions: Counter
decisions: Counter
first_read_overrides: Counter
batch_statistics: Counter
debug_logger: Logger
property average_chunks_per_second: float

Calculate and return the average number of chunks processed per second.

Returns:

Average number of chunks processed per second.

Given: batch_statistics = {“batch_count”: 2, “batch_size”: 100, “batch_time”: 50}

>>> stats = ReadfishStatistics(None)
>>> stats.add_batch_performance(number_of_reads=10, batch_time=5)
>>> stats.average_chunks_per_second
2.0

More complex example:

>>> stats = ReadfishStatistics(None)
>>> stats.add_batch_performance(number_of_reads=10, batch_time=5)
>>> stats.add_batch_performance(number_of_reads=10, batch_time=5)
>>> stats.add_batch_performance(number_of_reads=40, batch_time=5)
>>> stats.average_chunks_per_second
4.0

When batch_count is 0, the result will be 0.

>>> stats.batch_statistics["batch_count"] = 0
>>> stats.average_chunks_per_second
0
property average_batch_time: float

Calculate and return the average time taken per batch.

Examples:

Given: batch_statistics = {“batch_count”: 3, “cumulative_batch_size”: 150, “cumulative_batch_time”: 60}

>>> stats = ReadfishStatistics(None)
>>> stats.batch_statistics = {"batch_count": 3, "cumulative_batch_size": 150, "cumulative_batch_time": 60}
>>> stats.average_batch_time
20.0

When batch_count is 0, the result should be 0.

>>> stats.batch_statistics["batch_count"] = 0
>>> stats.average_batch_time
0
property average_batch_size: float

Calculate and return the average size of processed batches.

The method computes the average batch size by dividing the total number of chunks processed by the number of batches seen. If no batches have been processed, the method returns 0.

Returns:

Average number of reads processed per batch.

Example:

>>> stats = ReadfishStatistics(None)
>>> stats.average_batch_size
0
>>> stats.add_batch_performance(50, 20.0)
>>> stats.add_batch_performance(100, 20.0)
>>> stats.average_batch_size
75.0
get_batch_performance()[source]

Generate and return a formatted string representing batch performance.

If no batches have been processed, a placeholder message is returned.

Returns:

String summary of the current performance metrics.

Return type:

str

Examples:

When no batches have been processed:

>>> stats = ReadfishStatistics(None)
>>> stats.batch_statistics = {"batch_count": 0, "cumulative_batch_size": 0,             "cumulative_batch_time": 0, "batch_size": 0, "batch_time": 0}
>>> stats.get_batch_performance()
'No performance data yet'

When 100 chunks is processed in 10 seconds and it has been lagging for 6 consecutive batches:

>>> stats = ReadfishStatistics(None)
>>> stats.batch_statistics.update({"batch_count": 6, "cumulative_batch_size": 100,             "cumulative_batch_time": 10, "batch_size": 10, "batch_time": 10,            "cumulative_lagging_batches": 6, "consecutive_lagging_batches": 6})
>>> stats.get_batch_performance()
'0010R/10.0000s; Avg: 0016R/1.6667s; Seq:0; Unb:0; Pro:0; Slow batches (>1.00s): 6/6'

When three batches of total 300 chunks are processed in a total of 45 seconds:

>>> stats = ReadfishStatistics(None)
>>> stats.batch_statistics.update({"batch_count": 3, "cumulative_batch_size": 300,             "cumulative_batch_time": 45, "batch_size": 300, "batch_time": 45})
>>> stats.get_batch_performance()
'0300R/45.0000s; Avg: 0100R/15.0000s; Seq:0; Unb:0; Pro:0; Slow batches (>1.00s): 0/3'

When five batches of total 500 chunks are processed in a total of 120 seconds:

>>> stats = ReadfishStatistics(None)
>>> stats.batch_statistics.update({"batch_count": 5, "cumulative_batch_size": 500,             "cumulative_batch_time": 120,  "batch_size": 500, "batch_time": 120})
>>> stats.get_batch_performance()
'0500R/120.0000s; Avg: 0100R/24.0000s; Seq:0; Unb:0; Pro:0; Slow batches (>1.00s): 0/5'
add_batch_performance(number_of_reads, batch_time)[source]

Update the collected statistics with new batch performance data.

This method integrates a new set of chunk batch performance metrics into the class’s statistics, specifically updating the batch size, batch time, and batch count based on the provided number of reads and the time taken.

Parameters:
  • number_of_reads (int) – The number of reads processed in the current batch.

  • batch_time (float) – The time taken to process the current batch in seconds.

Example:

>>> stats = ReadfishStatistics(None)
>>> stats.add_batch_performance(100, 10.5)
>>> stats.batch_statistics
Counter({'cumulative_batch_size': 100, 'batch_size': 100, 'cumulative_batch_time': 10.5, 'batch_time': 10.5, 'batch_count': 1, 'cumulative_lagging_batches': 1, 'consecutive_lagging_batches': 1})
>>> stats.add_batch_performance(100, 10.5)
>>> stats.batch_statistics
Counter({'cumulative_batch_size': 200, 'batch_size': 100, 'cumulative_batch_time': 21.0, 'batch_time': 10.5, 'batch_count': 2, 'cumulative_lagging_batches': 2, 'consecutive_lagging_batches': 2})
log_read(region_name, overridden_action_name, **kwargs)[source]

Add a new read chunk record into the collected statistics, and log it to the debug logger.

The following terms are used in this function: decision is expected to be one of Unblock, stop_receiving etc. mode is expected to be one of single_on, single_off, multi_on etc.

The term “action” is used to describe what the sequencer actually did. #ToDo: see and address issue #298 :param region_name: The name of the region on the flow cell. :param overridden_action_name: Optional, if the originally determined action was overridden, the name of the NEW action.


readfish._cli_base module

Main entry point for command line read until scripts.

Set as entrypoint in pyproject.toml

readfish._cli_base.main(argv=None)[source]

Main function for entry point of the read until scripts.

Parameters:

argv (list[str] | None) – used in tests default None

Raises:

SystemExit – Raises a system exit when the command function exits.