Coverage for binette/diamond.py: 100%
50 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-14 14:36 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-14 14:36 +0000
1import logging
2import re
3import shutil
4import subprocess
5import sys
6from collections import Counter
8import pandas as pd
9from checkm2 import keggData
11logger = logging.getLogger(__name__)
14def get_checkm2_db() -> str:
15 """
16 Get the path to the CheckM2 database.
18 :return: The path to the CheckM2 database.
19 """
20 if shutil.which("checkm2") is None:
21 logger.error("Make sure checkm2 is on your system path")
22 sys.exit(1)
24 checkm2_database_raw = subprocess.run(
25 ["checkm2", "database", "--current"], text=True, stderr=subprocess.PIPE
26 )
28 if checkm2_database_raw.returncode != 0:
29 logger.error(
30 f"Something went wrong with checkm2:\n=======\n{checkm2_database_raw.stderr}========"
31 )
32 sys.exit(1)
34 reg_result = re.search("INFO: (/.*.dmnd)", checkm2_database_raw.stderr)
36 if reg_result is None:
37 logger.error(
38 f"Something went wrong when retrieving checkm2 db path:\n{checkm2_database_raw.stderr}"
39 )
40 sys.exit(1)
41 else:
42 db_path = reg_result.group(1)
44 return db_path
47def check_tool_exists(tool_name: str):
48 """
49 Check if a specified tool is on the system's PATH.
51 :param tool_name: The name of the tool to check for.
52 :type tool_name: str
53 :raises FileNotFoundError: If the tool is not found on the system's PATH.
54 """
55 if shutil.which(tool_name) is None:
56 raise FileNotFoundError(
57 f"The '{tool_name}' tool is not found on your system PATH."
58 )
61def run(
62 faa_file: str,
63 output: str,
64 db: str,
65 log: str,
66 threads: int = 1,
67 query_cover: int = 80,
68 subject_cover: int = 80,
69 percent_id: int = 30,
70 evalue: float = 1e-05,
71 low_mem: bool = False,
72):
73 """
74 Run Diamond with specified parameters.
76 :param faa_file: Path to the input protein sequence file (FASTA format).
77 :param output: Path to the Diamond output file.
78 :param db: Path to the Diamond database.
79 :param log: Path to the log file.
80 :param threads: Number of CPU threads to use (default is 1).
81 :param query_cover: Minimum query coverage percentage (default is 80).
82 :param subject_cover: Minimum subject coverage percentage (default is 80).
83 :param percent_id: Minimum percent identity (default is 30).
84 :param evalue: Maximum e-value threshold (default is 1e-05).
85 :param low_mem: Use low memory mode if True (default is False).
86 """
87 check_tool_exists("diamond")
89 blocksize = 0.5 if low_mem else 2
91 cmd = (
92 "diamond blastp --outfmt 6 --max-target-seqs 1 "
93 f"--query {faa_file} "
94 f"-o {output} "
95 f"--threads {threads} "
96 f"--db {db} "
97 f"--compress 1 "
98 f"--query-cover {query_cover} "
99 f"--subject-cover {subject_cover} "
100 f"--id {percent_id} "
101 f"--evalue {evalue} --block-size {blocksize} 2> {log}"
102 )
104 logger.info("Running diamond")
105 logger.info(f"Command: {cmd}")
107 run = subprocess.run(cmd, shell=True)
109 if run.returncode != 0:
110 logger.error(f"An error occurred while running DIAMOND. Check log file '{log}'")
111 sys.exit(1)
113 logger.info("Finished running DIAMOND")
116def get_contig_to_kegg_id(diamond_result_file: str) -> dict:
117 """
118 Get a dictionary mapping contig IDs to KEGG annotations from a Diamond result file.
120 :param diamond_result_file: Path to the Diamond result file.
121 :return: A dictionary mapping contig IDs to KEGG annotations.
122 """
123 diamond_results_df = pd.read_csv(
124 diamond_result_file, sep="\t", usecols=[0, 1], names=["ProteinID", "annotation"]
125 )
127 if diamond_results_df.empty:
128 logger.error(
129 f"DIAMOND result file '{diamond_result_file}' is empty. "
130 "This can happen with low-quality assemblies where DIAMOND produces no hits."
131 )
132 sys.exit(3)
134 diamond_results_df[["Ref100_hit", "Kegg_annotation"]] = diamond_results_df[
135 "annotation"
136 ].str.split("~", n=1, expand=True)
138 KeggCalc = keggData.KeggCalculator()
139 defaultKOs = KeggCalc.return_default_values_from_category("KO_Genes")
141 diamond_results_df = diamond_results_df.loc[
142 diamond_results_df["Kegg_annotation"].isin(defaultKOs.keys())
143 ]
144 diamond_results_df["contig"] = (
145 diamond_results_df["ProteinID"].str.split("_", n=-1).str[:-1].str.join("_")
146 )
148 contig_to_kegg_counter = (
149 diamond_results_df.groupby("contig")
150 .agg({"Kegg_annotation": Counter})
151 .reset_index()
152 )
154 contig_to_kegg_counter = dict(
155 zip(
156 contig_to_kegg_counter["contig"],
157 contig_to_kegg_counter["Kegg_annotation"],
158 strict=False,
159 )
160 )
162 return contig_to_kegg_counter