Coverage for binette/diamond.py: 100%
46 statements
« prev ^ index » next coverage.py v7.6.1, created at 2025-01-06 19:22 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2025-01-06 19:22 +0000
1import subprocess
2import logging
3import sys
4import shutil
5import re
6import pandas as pd
7from collections import Counter
9from checkm2 import keggData
12def get_checkm2_db() -> str:
13 """
14 Get the path to the CheckM2 database.
16 :return: The path to the CheckM2 database.
17 """
18 if shutil.which("checkm2") is None:
19 logging.error("Make sure checkm2 is on your system path.")
20 sys.exit(1)
22 checkm2_database_raw = subprocess.run(
23 ["checkm2", "database", "--current"], text=True, stderr=subprocess.PIPE
24 )
26 if checkm2_database_raw.returncode != 0:
27 logging.error(
28 f"Something went wrong with checkm2:\n=======\n{checkm2_database_raw.stderr}========"
29 )
30 sys.exit(1)
32 reg_result = re.search("INFO: (/.*.dmnd)", checkm2_database_raw.stderr)
34 if reg_result is None:
35 logging.error(
36 f"Something went wrong when retrieving checkm2 db path:\n{checkm2_database_raw.stderr}"
37 )
38 sys.exit(1)
39 else:
40 db_path = reg_result.group(1)
42 return db_path
45def check_tool_exists(tool_name: str):
46 """
47 Check if a specified tool is on the system's PATH.
49 :param tool_name: The name of the tool to check for.
50 :type tool_name: str
51 :raises FileNotFoundError: If the tool is not found on the system's PATH.
52 """
53 if shutil.which(tool_name) is None:
54 raise FileNotFoundError(
55 f"The '{tool_name}' tool is not found on your system PATH."
56 )
59def run(
60 faa_file: str,
61 output: str,
62 db: str,
63 log: str,
64 threads: int = 1,
65 query_cover: int = 80,
66 subject_cover: int = 80,
67 percent_id: int = 30,
68 evalue: float = 1e-05,
69 low_mem: bool = False,
70):
71 """
72 Run Diamond with specified parameters.
74 :param faa_file: Path to the input protein sequence file (FASTA format).
75 :param output: Path to the Diamond output file.
76 :param db: Path to the Diamond database.
77 :param log: Path to the log file.
78 :param threads: Number of CPU threads to use (default is 1).
79 :param query_cover: Minimum query coverage percentage (default is 80).
80 :param subject_cover: Minimum subject coverage percentage (default is 80).
81 :param percent_id: Minimum percent identity (default is 30).
82 :param evalue: Maximum e-value threshold (default is 1e-05).
83 :param low_mem: Use low memory mode if True (default is False).
84 """
85 check_tool_exists("diamond")
87 blocksize = 0.5 if low_mem else 2
89 cmd = (
90 "diamond blastp --outfmt 6 --max-target-seqs 1 "
91 f"--query {faa_file} "
92 f"-o {output} "
93 f"--threads {threads} "
94 f"--db {db} "
95 f"--compress 1 "
96 f"--query-cover {query_cover} "
97 f"--subject-cover {subject_cover} "
98 f"--id {percent_id} "
99 f"--evalue {evalue} --block-size {blocksize} 2> {log}"
100 )
102 logging.info("Running diamond")
103 logging.info(cmd)
105 run = subprocess.run(cmd, shell=True)
107 if run.returncode != 0:
108 logging.error(f"An error occurred while running DIAMOND. Check log file: {log}")
109 sys.exit(1)
111 logging.info("Finished Running DIAMOND")
114def get_contig_to_kegg_id(diamond_result_file: str) -> dict:
115 """
116 Get a dictionary mapping contig IDs to KEGG annotations from a Diamond result file.
118 :param diamond_result_file: Path to the Diamond result file.
119 :return: A dictionary mapping contig IDs to KEGG annotations.
120 """
121 diamon_results_df = pd.read_csv(
122 diamond_result_file, sep="\t", usecols=[0, 1], names=["ProteinID", "annotation"]
123 )
124 diamon_results_df[["Ref100_hit", "Kegg_annotation"]] = diamon_results_df[
125 "annotation"
126 ].str.split("~", n=1, expand=True)
128 KeggCalc = keggData.KeggCalculator()
129 defaultKOs = KeggCalc.return_default_values_from_category("KO_Genes")
131 diamon_results_df = diamon_results_df.loc[
132 diamon_results_df["Kegg_annotation"].isin(defaultKOs.keys())
133 ]
134 diamon_results_df["contig"] = (
135 diamon_results_df["ProteinID"].str.split("_", n=-1).str[:-1].str.join("_")
136 )
138 contig_to_kegg_counter = (
139 diamon_results_df.groupby("contig")
140 .agg({"Kegg_annotation": Counter})
141 .reset_index()
142 )
144 contig_to_kegg_counter = dict(
145 zip(contig_to_kegg_counter["contig"], contig_to_kegg_counter["Kegg_annotation"])
146 )
148 return contig_to_kegg_counter