Coverage for binette/diamond.py: 100%

46 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2025-01-06 19:22 +0000

1import subprocess 

2import logging 

3import sys 

4import shutil 

5import re 

6import pandas as pd 

7from collections import Counter 

8 

9from checkm2 import keggData 

10 

11 

12def get_checkm2_db() -> str: 

13 """ 

14 Get the path to the CheckM2 database. 

15 

16 :return: The path to the CheckM2 database. 

17 """ 

18 if shutil.which("checkm2") is None: 

19 logging.error("Make sure checkm2 is on your system path.") 

20 sys.exit(1) 

21 

22 checkm2_database_raw = subprocess.run( 

23 ["checkm2", "database", "--current"], text=True, stderr=subprocess.PIPE 

24 ) 

25 

26 if checkm2_database_raw.returncode != 0: 

27 logging.error( 

28 f"Something went wrong with checkm2:\n=======\n{checkm2_database_raw.stderr}========" 

29 ) 

30 sys.exit(1) 

31 

32 reg_result = re.search("INFO: (/.*.dmnd)", checkm2_database_raw.stderr) 

33 

34 if reg_result is None: 

35 logging.error( 

36 f"Something went wrong when retrieving checkm2 db path:\n{checkm2_database_raw.stderr}" 

37 ) 

38 sys.exit(1) 

39 else: 

40 db_path = reg_result.group(1) 

41 

42 return db_path 

43 

44 

45def check_tool_exists(tool_name: str): 

46 """ 

47 Check if a specified tool is on the system's PATH. 

48 

49 :param tool_name: The name of the tool to check for. 

50 :type tool_name: str 

51 :raises FileNotFoundError: If the tool is not found on the system's PATH. 

52 """ 

53 if shutil.which(tool_name) is None: 

54 raise FileNotFoundError( 

55 f"The '{tool_name}' tool is not found on your system PATH." 

56 ) 

57 

58 

59def run( 

60 faa_file: str, 

61 output: str, 

62 db: str, 

63 log: str, 

64 threads: int = 1, 

65 query_cover: int = 80, 

66 subject_cover: int = 80, 

67 percent_id: int = 30, 

68 evalue: float = 1e-05, 

69 low_mem: bool = False, 

70): 

71 """ 

72 Run Diamond with specified parameters. 

73 

74 :param faa_file: Path to the input protein sequence file (FASTA format). 

75 :param output: Path to the Diamond output file. 

76 :param db: Path to the Diamond database. 

77 :param log: Path to the log file. 

78 :param threads: Number of CPU threads to use (default is 1). 

79 :param query_cover: Minimum query coverage percentage (default is 80). 

80 :param subject_cover: Minimum subject coverage percentage (default is 80). 

81 :param percent_id: Minimum percent identity (default is 30). 

82 :param evalue: Maximum e-value threshold (default is 1e-05). 

83 :param low_mem: Use low memory mode if True (default is False). 

84 """ 

85 check_tool_exists("diamond") 

86 

87 blocksize = 0.5 if low_mem else 2 

88 

89 cmd = ( 

90 "diamond blastp --outfmt 6 --max-target-seqs 1 " 

91 f"--query {faa_file} " 

92 f"-o {output} " 

93 f"--threads {threads} " 

94 f"--db {db} " 

95 f"--compress 1 " 

96 f"--query-cover {query_cover} " 

97 f"--subject-cover {subject_cover} " 

98 f"--id {percent_id} " 

99 f"--evalue {evalue} --block-size {blocksize} 2> {log}" 

100 ) 

101 

102 logging.info("Running diamond") 

103 logging.info(cmd) 

104 

105 run = subprocess.run(cmd, shell=True) 

106 

107 if run.returncode != 0: 

108 logging.error(f"An error occurred while running DIAMOND. Check log file: {log}") 

109 sys.exit(1) 

110 

111 logging.info("Finished Running DIAMOND") 

112 

113 

114def get_contig_to_kegg_id(diamond_result_file: str) -> dict: 

115 """ 

116 Get a dictionary mapping contig IDs to KEGG annotations from a Diamond result file. 

117 

118 :param diamond_result_file: Path to the Diamond result file. 

119 :return: A dictionary mapping contig IDs to KEGG annotations. 

120 """ 

121 diamon_results_df = pd.read_csv( 

122 diamond_result_file, sep="\t", usecols=[0, 1], names=["ProteinID", "annotation"] 

123 ) 

124 diamon_results_df[["Ref100_hit", "Kegg_annotation"]] = diamon_results_df[ 

125 "annotation" 

126 ].str.split("~", n=1, expand=True) 

127 

128 KeggCalc = keggData.KeggCalculator() 

129 defaultKOs = KeggCalc.return_default_values_from_category("KO_Genes") 

130 

131 diamon_results_df = diamon_results_df.loc[ 

132 diamon_results_df["Kegg_annotation"].isin(defaultKOs.keys()) 

133 ] 

134 diamon_results_df["contig"] = ( 

135 diamon_results_df["ProteinID"].str.split("_", n=-1).str[:-1].str.join("_") 

136 ) 

137 

138 contig_to_kegg_counter = ( 

139 diamon_results_df.groupby("contig") 

140 .agg({"Kegg_annotation": Counter}) 

141 .reset_index() 

142 ) 

143 

144 contig_to_kegg_counter = dict( 

145 zip(contig_to_kegg_counter["contig"], contig_to_kegg_counter["Kegg_annotation"]) 

146 ) 

147 

148 return contig_to_kegg_counter