Coverage for binette/diamond.py: 100%

50 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-14 14:36 +0000

1import logging 

2import re 

3import shutil 

4import subprocess 

5import sys 

6from collections import Counter 

7 

8import pandas as pd 

9from checkm2 import keggData 

10 

11logger = logging.getLogger(__name__) 

12 

13 

14def get_checkm2_db() -> str: 

15 """ 

16 Get the path to the CheckM2 database. 

17 

18 :return: The path to the CheckM2 database. 

19 """ 

20 if shutil.which("checkm2") is None: 

21 logger.error("Make sure checkm2 is on your system path") 

22 sys.exit(1) 

23 

24 checkm2_database_raw = subprocess.run( 

25 ["checkm2", "database", "--current"], text=True, stderr=subprocess.PIPE 

26 ) 

27 

28 if checkm2_database_raw.returncode != 0: 

29 logger.error( 

30 f"Something went wrong with checkm2:\n=======\n{checkm2_database_raw.stderr}========" 

31 ) 

32 sys.exit(1) 

33 

34 reg_result = re.search("INFO: (/.*.dmnd)", checkm2_database_raw.stderr) 

35 

36 if reg_result is None: 

37 logger.error( 

38 f"Something went wrong when retrieving checkm2 db path:\n{checkm2_database_raw.stderr}" 

39 ) 

40 sys.exit(1) 

41 else: 

42 db_path = reg_result.group(1) 

43 

44 return db_path 

45 

46 

47def check_tool_exists(tool_name: str): 

48 """ 

49 Check if a specified tool is on the system's PATH. 

50 

51 :param tool_name: The name of the tool to check for. 

52 :type tool_name: str 

53 :raises FileNotFoundError: If the tool is not found on the system's PATH. 

54 """ 

55 if shutil.which(tool_name) is None: 

56 raise FileNotFoundError( 

57 f"The '{tool_name}' tool is not found on your system PATH." 

58 ) 

59 

60 

61def run( 

62 faa_file: str, 

63 output: str, 

64 db: str, 

65 log: str, 

66 threads: int = 1, 

67 query_cover: int = 80, 

68 subject_cover: int = 80, 

69 percent_id: int = 30, 

70 evalue: float = 1e-05, 

71 low_mem: bool = False, 

72): 

73 """ 

74 Run Diamond with specified parameters. 

75 

76 :param faa_file: Path to the input protein sequence file (FASTA format). 

77 :param output: Path to the Diamond output file. 

78 :param db: Path to the Diamond database. 

79 :param log: Path to the log file. 

80 :param threads: Number of CPU threads to use (default is 1). 

81 :param query_cover: Minimum query coverage percentage (default is 80). 

82 :param subject_cover: Minimum subject coverage percentage (default is 80). 

83 :param percent_id: Minimum percent identity (default is 30). 

84 :param evalue: Maximum e-value threshold (default is 1e-05). 

85 :param low_mem: Use low memory mode if True (default is False). 

86 """ 

87 check_tool_exists("diamond") 

88 

89 blocksize = 0.5 if low_mem else 2 

90 

91 cmd = ( 

92 "diamond blastp --outfmt 6 --max-target-seqs 1 " 

93 f"--query {faa_file} " 

94 f"-o {output} " 

95 f"--threads {threads} " 

96 f"--db {db} " 

97 f"--compress 1 " 

98 f"--query-cover {query_cover} " 

99 f"--subject-cover {subject_cover} " 

100 f"--id {percent_id} " 

101 f"--evalue {evalue} --block-size {blocksize} 2> {log}" 

102 ) 

103 

104 logger.info("Running diamond") 

105 logger.info(f"Command: {cmd}") 

106 

107 run = subprocess.run(cmd, shell=True) 

108 

109 if run.returncode != 0: 

110 logger.error(f"An error occurred while running DIAMOND. Check log file '{log}'") 

111 sys.exit(1) 

112 

113 logger.info("Finished running DIAMOND") 

114 

115 

116def get_contig_to_kegg_id(diamond_result_file: str) -> dict: 

117 """ 

118 Get a dictionary mapping contig IDs to KEGG annotations from a Diamond result file. 

119 

120 :param diamond_result_file: Path to the Diamond result file. 

121 :return: A dictionary mapping contig IDs to KEGG annotations. 

122 """ 

123 diamond_results_df = pd.read_csv( 

124 diamond_result_file, sep="\t", usecols=[0, 1], names=["ProteinID", "annotation"] 

125 ) 

126 

127 if diamond_results_df.empty: 

128 logger.error( 

129 f"DIAMOND result file '{diamond_result_file}' is empty. " 

130 "This can happen with low-quality assemblies where DIAMOND produces no hits." 

131 ) 

132 sys.exit(3) 

133 

134 diamond_results_df[["Ref100_hit", "Kegg_annotation"]] = diamond_results_df[ 

135 "annotation" 

136 ].str.split("~", n=1, expand=True) 

137 

138 KeggCalc = keggData.KeggCalculator() 

139 defaultKOs = KeggCalc.return_default_values_from_category("KO_Genes") 

140 

141 diamond_results_df = diamond_results_df.loc[ 

142 diamond_results_df["Kegg_annotation"].isin(defaultKOs.keys()) 

143 ] 

144 diamond_results_df["contig"] = ( 

145 diamond_results_df["ProteinID"].str.split("_", n=-1).str[:-1].str.join("_") 

146 ) 

147 

148 contig_to_kegg_counter = ( 

149 diamond_results_df.groupby("contig") 

150 .agg({"Kegg_annotation": Counter}) 

151 .reset_index() 

152 ) 

153 

154 contig_to_kegg_counter = dict( 

155 zip( 

156 contig_to_kegg_counter["contig"], 

157 contig_to_kegg_counter["Kegg_annotation"], 

158 strict=False, 

159 ) 

160 ) 

161 

162 return contig_to_kegg_counter