Coverage for binette/io_manager.py: 99%

101 statements  

coverage.py v7.6.1, created at 2025-01-06 19:22 +0000

from collections import defaultdict
import logging
import pyfastx
from typing import Iterable, List, Dict, Tuple, Set
import csv

from binette.bin_manager import Bin

from pathlib import Path


def get_paths_common_prefix_suffix(
    paths: List[Path],
) -> Tuple[List[str], List[str], List[str]]:
    """
    Determine the common prefix parts, the common suffix parts, and the common
    extensions of the last part of a list of pathlib.Path objects.

    :param paths: List of pathlib.Path objects.
    :return: A tuple containing three lists:
             - The common prefix parts.
             - The common suffix parts.
             - The common extensions of the last part of the paths.
    """
    # Extract parts for all paths
    parts = [list(path.parts) for path in paths]

    # Nothing to compare: no paths given
    if not parts:
        return [], [], []

    # Initialize common prefix and suffix lists
    common_prefix = list(parts[0])
    common_suffix = list(parts[0])

    # Determine common prefix
    for part_tuple in parts[1:]:
        common_prefix_length = min(len(common_prefix), len(part_tuple))
        common_prefix = [
            common_prefix[i]
            for i in range(common_prefix_length)
            if common_prefix[: i + 1] == part_tuple[: i + 1]
        ]
        if not common_prefix:
            break

    # Determine common suffix
    for part_tuple in parts[1:]:
        common_suffix_length = min(len(common_suffix), len(part_tuple))
        common_suffix = [
            common_suffix[-i]
            for i in range(1, common_suffix_length + 1)
            if common_suffix[-i:] == part_tuple[-i:]
        ]
        if not common_suffix:
            break
    if len(parts) > 1:
        common_suffix.reverse()

    # Determine common extensions of the last part of the paths
    if len(paths) == 1:
        common_extensions = paths[0].suffixes
    else:
        common_extensions = list(paths[0].suffixes)
        for path in paths[1:]:
            common_extension_length = min(len(common_extensions), len(path.suffixes))
            common_extensions = [
                common_extensions[i]
                for i in range(common_extension_length)
                if common_extensions[i] == path.suffixes[i]
            ]
            if not common_extensions:
                break

    return common_prefix, common_suffix, common_extensions
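# Illustrative usage sketch (not from the Binette source; the paths below are
# hypothetical). Only the leading "results" part and the trailing "bins" part are
# shared, and the last path components carry no file extension:
#
#   >>> get_paths_common_prefix_suffix(
#   ...     [Path("results/maxbin2/bins"), Path("results/metabat2/bins")]
#   ... )
#   (['results'], ['bins'], [])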

def infer_bin_set_names_from_input_paths(input_bins: List[Path]) -> Dict[str, Path]:
    """
    Infer bin set names from a list of bin input directories or files.

    :param input_bins: List of input bin directories or files.
    :return: Dictionary mapping inferred bin set names to their corresponding directories or files.
    """
    bin_name_to_bin_dir = {}

    common_prefix, common_suffix, common_extensions = get_paths_common_prefix_suffix(
        input_bins
    )

    for path in input_bins:
        specific_parts = path.parts[
            len(common_prefix) : len(path.parts) - len(common_suffix)
        ]

        if not common_suffix and common_extensions:
            last_specific_part = specific_parts[-1].split(".")[
                : -len(common_extensions)
            ]
            specific_parts = list(specific_parts[:-1]) + last_specific_part

        bin_set_name = "/".join(specific_parts)
        if bin_set_name == "":
            bin_set_name = path.as_posix()

        bin_name_to_bin_dir[bin_set_name] = path

    logging.debug(f"Input bins: {' '.join([path.as_posix() for path in input_bins])}")
    logging.debug(f"Common prefix to remove: {common_prefix}")
    logging.debug(f"Common suffix to remove: {common_suffix}")
    logging.debug(f"Common extensions to remove: {common_extensions}")
    logging.debug(f"bin_name_to_bin_dir: {bin_name_to_bin_dir}")

    return bin_name_to_bin_dir
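# Illustrative usage sketch (not from the Binette source; paths are hypothetical).
# The shared prefix and suffix computed above are stripped, leaving only the
# distinguishing part of each path as the bin set name. On a POSIX system:
#
#   >>> infer_bin_set_names_from_input_paths(
#   ...     [Path("results/maxbin2/bins"), Path("results/metabat2/bins")]
#   ... )
#   {'maxbin2': PosixPath('results/maxbin2/bins'), 'metabat2': PosixPath('results/metabat2/bins')}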

def write_bin_info(bins: Iterable[Bin], output: Path, add_contigs: bool = False):
    """
    Write bin information to a TSV file.

    :param bins: List of Bin objects.
    :param output: Output file path for writing the TSV.
    :param add_contigs: Flag indicating whether to include contig information.
    """

    header = [
        "bin_id",
        "origin",
        "name",
        "completeness",
        "contamination",
        "score",
        "size",
        "N50",
        "contig_count",
    ]
    if add_contigs:
        header.append("contigs")

    bin_infos = []
    for bin_obj in sorted(bins, key=lambda x: (x.score, x.N50, -x.id), reverse=True):
        bin_info = [
            bin_obj.id,
            ";".join(bin_obj.origin),
            bin_obj.name,
            bin_obj.completeness,
            bin_obj.contamination,
            bin_obj.score,
            bin_obj.length,
            bin_obj.N50,
            len(bin_obj.contigs),
        ]
        if add_contigs:
            bin_info.append(";".join(str(c) for c in bin_obj.contigs))

        bin_infos.append(bin_info)

    with open(output, "w", newline="") as fl:
        writer = csv.writer(fl, delimiter="\t")
        writer.writerow(header)
        writer.writerows(bin_infos)
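# Illustrative usage sketch (not from the Binette source; `final_bins` and the
# output file name are hypothetical). Rows are sorted by decreasing score, then
# decreasing N50, then increasing bin id; the header columns are bin_id, origin,
# name, completeness, contamination, score, size, N50, contig_count.
#
#   write_bin_info(final_bins, Path("final_bins_metrics.tsv"), add_contigs=True)
#
# With add_contigs=True an extra "contigs" column lists the contig names of each
# bin, separated by ";".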

def write_bins_fasta(selected_bins: List[Bin], contigs_fasta: Path, outdir: Path):
    """
    Write selected bins' contigs to separate FASTA files.

    :param selected_bins: List of Bin objects representing the selected bins.
    :param contigs_fasta: Path to the input FASTA file containing contig sequences.
    :param outdir: Output directory to save the individual bin FASTA files.
    """

    fa = pyfastx.Fasta(contigs_fasta.as_posix(), build_index=True)

    for sbin in selected_bins:
        outfile = outdir / f"bin_{sbin.id}.fa"

        with open(outfile, "w") as outfl:
            sequences = (f">{c}\n{fa[c]}" for c in sbin.contigs)
            outfl.write("\n".join(sequences) + "\n")
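# Illustrative usage sketch (not from the Binette source; file and variable names
# are hypothetical). One FASTA file per selected bin, named bin_<id>.fa, is
# written to the output directory; pyfastx indexes the assembly so each contig
# sequence can be fetched by name.
#
#   out_dir = Path("final_bins")
#   out_dir.mkdir(parents=True, exist_ok=True)
#   write_bins_fasta(selected_bins, Path("assembly.fasta"), out_dir)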

def check_contig_consistency(
    contigs_from_assembly: Iterable[str],
    contigs_from_elsewhere: Iterable[str],
    assembly_file: str,
    elsewhere_file: str,
):
    """
    Check the consistency of contig names between different sources.

    :param contigs_from_assembly: List of contig names from the assembly file.
    :param contigs_from_elsewhere: List of contig names from an external source.
    :param assembly_file: Path to the assembly file.
    :param elsewhere_file: Path to the file from an external source.
    :raises AssertionError: If inconsistencies in contig names are found.
    """
    logging.debug("check_contig_consistency.")
    are_contigs_consistent = len(
        set(contigs_from_elsewhere) | set(contigs_from_assembly)
    ) <= len(set(contigs_from_assembly))

    issue_contigs = len(set(contigs_from_elsewhere) - set(contigs_from_assembly))

    message = (
        f"{issue_contigs} contigs found in file '{elsewhere_file}' "
        f"were not found in assembly file '{assembly_file}'"
    )
    assert are_contigs_consistent, message
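# Illustrative usage sketch (not from the Binette source; names are hypothetical).
# The contig names seen elsewhere must be a subset of the assembly's contig names:
#
#   check_contig_consistency(
#       ["contig_1", "contig_2", "contig_3"],  # assembly contigs
#       ["contig_2", "contig_3"],              # e.g. contigs of one input bin -> passes
#       "assembly.fasta",
#       "bin_set/bin1.fa",
#   )
#
# Adding an unknown name such as "contig_4" to the second list would instead
# raise an AssertionError reporting how many contigs are missing from the assembly.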

def check_resume_file(faa_file: Path, diamond_result_file: Path) -> None:
    """
    Check the existence of files required for resuming the process.

    :param faa_file: Path to the protein file.
    :param diamond_result_file: Path to the Diamond result file.
    :raises FileNotFoundError: If the required files don't exist for resuming.
    """

    if faa_file.exists() and diamond_result_file.exists():
        return

    if not faa_file.exists():
        error_msg = (
            f"Protein file '{faa_file}' does not exist. Resuming is not possible."
        )
        logging.error(error_msg)
        raise FileNotFoundError(error_msg)

    if not diamond_result_file.exists():
        error_msg = f"Diamond result file '{diamond_result_file}' does not exist. Resuming is not possible."
        logging.error(error_msg)
        raise FileNotFoundError(error_msg)
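# Illustrative usage sketch (not from the Binette source; paths are hypothetical).
# When resuming a previous run, both intermediate files must already exist:
#
#   check_resume_file(Path("out/proteins.faa"), Path("out/diamond_result.tsv"))
#
# If either file is missing, the error is logged and a FileNotFoundError is raised.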

def write_original_bin_metrics(original_bins: Set[Bin], original_bin_report_dir: Path):
    """
    Write metrics of original input bins to a specified directory.

    This function writes the metrics for each bin set to a TSV file in the specified directory.
    Each bin set will have its own TSV file named according to its set name.

    :param original_bins: A set containing input bins
    :param original_bin_report_dir: The directory path (Path) where the bin metrics will be saved.
    """

    original_bin_report_dir.mkdir(parents=True, exist_ok=True)

    bin_set_name_to_bins = defaultdict(set)
    for bin_obj in original_bins:
        for origin in bin_obj.origin:
            bin_set_name_to_bins[origin].add(bin_obj)

    for i, (set_name, bins) in enumerate(sorted(bin_set_name_to_bins.items())):
        bins_metric_file = (
            original_bin_report_dir
            / f"input_bins_{i + 1}.{set_name.replace('/', '_')}.tsv"
        )

        logging.debug(
            f"Writing metrics for bin set '{set_name}' to file: {bins_metric_file}"
        )
        write_bin_info(bins, bins_metric_file)

    logging.debug("Completed writing all original input bin metrics.")
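# Illustrative usage sketch (not from the Binette source; names are hypothetical).
# Bins originating from two input sets called "maxbin2" and "metabat2" would yield
#   input_bins_quality_reports/input_bins_1.maxbin2.tsv
#   input_bins_quality_reports/input_bins_2.metabat2.tsv
# with one write_bin_info() table per set, sets numbered in sorted-name order.
#
#   write_original_bin_metrics(original_bins, Path("input_bins_quality_reports"))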