"""Guppy plugin module
Extension of pyguppy Caller that maintains a connection to the basecaller
"""
from __future__ import annotations
import logging
import os
import time
from collections import namedtuple
from pathlib import Path
from typing import Iterable, TYPE_CHECKING
from packaging.version import parse as parse_version
import numpy as np
import numpy.typing as npt
from minknow_api.protocol_pb2 import ProtocolRunInfo
try:
from pyguppy_client_lib.helper_functions import package_read
from pyguppy_client_lib.pyclient import PyGuppyClient
except ImportError:
pass
from readfish._loggers import setup_logger
from readfish.plugins.abc import CallerABC
from readfish.plugins.utils import Result
from readfish._utils import nice_join
if TYPE_CHECKING:
import minknow_api
__all__ = ["Caller"]
logger = logging.getLogger("RU_basecaller")
CALIBRATION = namedtuple("calibration", "scaling offset")
class DefaultDAQValues:
"""Provides default calibration values
Mimics the read_until_api calibration dict value from
https://github.com/nanoporetech/read_until_api/blob/2319bbe/read_until/base.py#L34
all keys return scaling=1.0 and offset=0.0
"""
calibration = CALIBRATION(1.0, 0.0)
def __getitem__(self, _):
return self.calibration
_DefaultDAQValues = DefaultDAQValues()
[docs]
class Caller(CallerABC):
def __init__(
self, run_information: ProtocolRunInfo = None, debug_log=None, **kwargs
):
self.logger = setup_logger("readfish_guppy_logger", log_file=debug_log)
self.supported_barcode_kits = None
self.supported_basecall_models = None
self.run_information = run_information
logging.warn(
"Deprecation warning - As ONT has moved fully to dorado, this plugin is no longer maintained, and will be deprecated in a future release of readfish."
)
if self.run_information:
self.guppy_version = (
self.run_information.software_versions.guppy_connected_version
)
if parse_version(self.guppy_version) < parse_version("7.3.9"):
logging.info(f"Connected to caller version {self.guppy_version}.")
else:
logging.info(
f"Trying to connect to minKNOW with caller version {self.guppy_version}. This plugin requires a version of Dorado or Guppy < 7.3.9. If this is stopping readfish from running try changing [caller_settings.guppy] to [caller_settings.dorado]. You should also check for any updates available to readfish."
)
# Set our own priority
self.guppy_params = kwargs
# Remove the sample rate from the guppy params as it isn't required for the PyGuppyClient
self.guppy_params.pop("sample_rate", None)
self.guppy_params["priority"] = PyGuppyClient.high_priority
# Set our own client name to appear in the guppy server logs
self.guppy_params["client_name"] = "Readfish_connection"
self.validate()
self.caller = PyGuppyClient(**self.guppy_params)
self.caller.connect()
[docs]
def validate(self) -> None:
"""Validate the parameters passed to Guppy to ensure they will initialise PyGuppy Client correctly
Currently checks:
1. That the socket file exists
2. That the Socket file has the correct permissions
3. That the version of py guppy client lib installed matches the system version
:return: None, if the parameters pass all the checks
"""
for key in ("address", "config"):
if key not in self.guppy_params:
raise KeyError(
f"Required `caller_settings.guppy` {key} was not found in provided TOML. Please add."
)
if self.guppy_params["address"].startswith("ipc://"):
# User is attempting to connect to an IPC socket
socket_path = Path(self.guppy_params["address"][6:])
if not socket_path.exists():
raise FileNotFoundError(
f"The provided guppy base-caller socket address doesn't appear to exist. Please check your Guppy Settings. {self.guppy_params['address']}"
)
# check user permissions:
if not os.access(socket_path, os.R_OK):
raise RuntimeError(
f"The user account running readfish doesn't appear to have permissions to read the guppy base-caller socket. Please check permissions on {self.guppy_params['address']}. See https://github.com/LooseLab/readfish/issues/221#issuecomment-1375673490 for more information."
)
if not os.access(socket_path, os.W_OK):
raise RuntimeError(
f"The user account running readfish doesn't appear to have permissions to write to the guppy base-caller socket. Please check permissions on {self.guppy_params['address']}. See https://github.com/LooseLab/readfish/issues/221#issuecomment-1375673490 for more information."
)
# If we are connected to a live run, test if the base-caller model is acceptable.
# Connected to a live run via the minknow_api - get supported basecall and barcoding kits from the run info.
# Check them against provided values
if self.run_information is not None:
tags = self.run_information.meta_info.tags
self.supported_basecall_models = tags[
"available basecall models"
].array_value
# Make a CSV str a list of strings, removing quotes and square brackets
if self.supported_basecall_models and isinstance(
self.supported_basecall_models, str
):
self.supported_basecall_models = (
tags["available basecall models"]
.array_value[1:-1]
.replace('"', "")
.split(",")
)
# Faff on with sorting out available barcoding kits
# See https://github.com/nanoporetech/minknow_api/blob/829dbe8ac8e49efdf268d385b50440c52473188b/python/minknow_api/tools/protocols.py#L97C1-L97C7
self.supported_barcode_kits = tags["barcoding kits"].array_value
# workaround for the set of barcoding kits being returned as a string rather
# that array of strings
if self.supported_barcode_kits and isinstance(
self.supported_barcode_kits, str
):
self.supported_barcode_kits = (
tags["barcoding kits"].array_value[1:-1].replace('"', "").split(",")
)
if tags["barcoding"].bool_value:
self.supported_barcode_kits.append(tags["kit"].string_value)
# If we are connected to a live run, and have suitable base calling models check the base-caller model is suitable for the flowcell and kit
if (
self.supported_basecall_models
and f"{self.guppy_params['config'].replace('.cfg', '')}.cfg"
not in self.supported_basecall_models
):
raise RuntimeError(
"""The {} base-calling config listed in the readfish config TOML is not suitable for this flowcell and kit combination.
Please check the guppy_config value in the caller_settings.guppy section of your TOML file.
The following models are are given by ONT as suitable for this flow cell/kit combo:\n\t{}""".format(
self.guppy_params["config"],
nice_join(
self.supported_basecall_models,
sep="\n\t",
conjunction="and",
),
)
)
# If we are barcoding and have connected to a live run - try checking the listed barcode kit works with the flowcell and kit
if (
barcoding_kits := self.guppy_params.get("barcode_kits", None)
) is not None:
barcoding_kits = barcoding_kits.split()
if barcoding_kits and not set(barcoding_kits).issubset(
self.supported_barcode_kits
):
raise RuntimeError(
"Barcoding kits specified in TOML {} not amongst those supported by the selected kit and protocol.\nSupported kits are:\n\t{}".format(
nice_join(barcoding_kits, conjunction="and"),
nice_join(
self.supported_barcode_kits, sep="\n\t", conjunction="and"
),
),
)
return None
[docs]
def disconnect(self) -> None:
"""Call the disconnect method on the PyGuppyClient"""
return self.caller.disconnect()
[docs]
def basecall(
self,
reads: Iterable[tuple[int, minknow_api.data_pb2.GetLiveReadsResponse.ReadData]],
signal_dtype: npt.DTypeLike,
daq_values: dict[int, namedtuple] = None,
):
"""Basecall live data from minknow RPC
:param reads: List or generator of tuples containing (channel, MinKNOW.rpc.Read)
:param signal_dtype: Numpy dtype of the raw data
:param daq_values: Dictionary mapping channel to offset and scaling values.
If not provided default values of 1.0 and 0.0 are used.
:yield:
:rtype: readfish.plugins.utils.Result
"""
# FIXME: Occasionally guppy can report a read as not sent when it is
# successfully sent. Therefore we capture not sent reads
cache, skipped = {}, {}
reads_received, reads_sent = 0, 0
daq_values = _DefaultDAQValues if daq_values is None else daq_values
for channel, read in reads:
# Attach the "RF-" prefix
read_id = f"RF-{read.id}"
t0 = time.time()
cache[read_id] = (channel, read.number, t0)
success = self.caller.pass_read(
package_read(
read_id=read_id,
raw_data=np.frombuffer(read.raw_data, signal_dtype),
daq_offset=daq_values[channel].offset,
daq_scaling=daq_values[channel].scaling,
)
)
if not success:
logging.warning(f"Could not send read {read_id!r} to Guppy")
# FIXME: This is resolved in later versions of guppy.
skipped[read_id] = cache.pop(read_id)
continue
else:
reads_sent += 1
sleep_time = self.caller.throttle - t0
if sleep_time > 0:
time.sleep(sleep_time)
while reads_received < reads_sent:
results = self.caller.get_completed_reads()
# TODO: incorporate time_received into logging?
# time_received = time.time()
if not results:
time.sleep(self.caller.throttle)
continue
for res_batch in results:
for res in res_batch:
read_id = res["metadata"]["read_id"]
try:
channel, read_number, time_sent = cache.pop(read_id)
except KeyError:
# FIXME: This is resolved in later versions of guppy.
channel, read_number, time_sent = skipped.pop(read_id)
reads_sent += 1
res["metadata"]["read_id"] = read_id[3:]
self.logger.debug(
"@%s ch=%s\n%s\n+\n%s",
res["metadata"]["read_id"],
channel,
res["datasets"]["sequence"],
res["datasets"]["qstring"],
)
barcode = res["metadata"].get("barcode_arrangement", None)
# TODO: Add Filter here
yield Result(
channel=channel,
read_number=read_number,
read_id=res["metadata"]["read_id"],
seq=res["datasets"]["sequence"],
barcode=barcode if barcode else None,
basecall_data=res,
)
reads_received += 1
[docs]
def describe(self) -> str:
"""
Describe the guppy Caller
:return: Description of parameters passed to this guppy Caller plugin
"""
description = ["Utilising the Guppy base-caller plugin:"]
for param in self.guppy_params.keys():
description.append(f"\t- {param}: {self.guppy_params[param]}")
return "\n".join(description)