Coverage for binette/io_manager.py: 99%

137 statements  

coverage.py v7.10.7, created at 2025-10-14 14:36 +0000

import logging

from collections import defaultdict
from collections.abc import Iterable
from pathlib import Path

import pandas as pd
import pyfastx

from binette.bin_manager import Bin

logger = logging.getLogger(__name__)


def get_paths_common_prefix_suffix(
    paths: list[Path],
) -> tuple[list[str], list[str], list[str]]:
    """
    Determine the common prefix parts, common suffix parts, and the common
    extensions of the last part of a list of pathlib.Path objects.

    :param paths: List of pathlib.Path objects.
    :return: A tuple containing three lists:
        - The common prefix parts.
        - The common suffix parts.
        - The common extensions of the last part of the paths.
    """
    # Extract parts for all paths
    parts = [list(path.parts) for path in paths]

    # Handle empty input
    if not parts:
        return [], [], []

    # Initialize common prefix and suffix lists
    common_prefix = list(parts[0])
    common_suffix = list(parts[0])
    # Determine common prefix
    for part_tuple in parts[1:]:
        common_prefix_length = min(len(common_prefix), len(part_tuple))
        common_prefix = [
            common_prefix[i]
            for i in range(common_prefix_length)
            if common_prefix[: i + 1] == part_tuple[: i + 1]
        ]
        if not common_prefix:
            break

    # Determine common suffix (built back-to-front, hence the reverse below)
    for part_tuple in parts[1:]:
        common_suffix_length = min(len(common_suffix), len(part_tuple))
        common_suffix = [
            common_suffix[-i]
            for i in range(1, common_suffix_length + 1)
            if common_suffix[-i:] == part_tuple[-i:]
        ]
        if not common_suffix:
            break
    if len(parts) > 1:
        common_suffix.reverse()

    # Determine common extensions of the last part of the paths
    if len(paths) == 1:
        common_extensions = paths[0].suffixes
    else:
        common_extensions = list(paths[0].suffixes)
        for path in paths[1:]:
            common_extension_length = min(len(common_extensions), len(path.suffixes))
            common_extensions = [
                common_extensions[i]
                for i in range(common_extension_length)
                if common_extensions[i] == path.suffixes[i]
            ]
            if not common_extensions:
                break

    return common_prefix, common_suffix, common_extensions
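
# Illustrative example (hypothetical paths, not part of the module): for
#   [Path("out/maxbin/bins.fa.gz"), Path("out/metabat/bins.fa.gz")]
# the function returns (["out"], ["bins.fa.gz"], [".fa", ".gz"]): the paths
# share the leading part "out" and the trailing part "bins.fa.gz", and their
# final components share the extensions ".fa" and ".gz".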


def infer_bin_set_names_from_input_paths(input_bins: list[Path]) -> dict[str, Path]:
    """
    Infer bin set names from a list of bin input directories or files.

    :param input_bins: List of input bin directories or files.
    :return: Dictionary mapping inferred bin set names to their corresponding directories or files.
    """
    bin_name_to_bin_dir = {}

    common_prefix, common_suffix, common_extensions = get_paths_common_prefix_suffix(
        input_bins
    )

    for path in input_bins:
        specific_parts = path.parts[
            len(common_prefix) : len(path.parts) - len(common_suffix)
        ]

        if not common_suffix and common_extensions:
            last_specific_part = specific_parts[-1].split(".")[
                : -len(common_extensions)
            ]
            specific_parts = list(specific_parts[:-1]) + last_specific_part

        bin_set_name = "/".join(specific_parts)
        if bin_set_name == "":
            bin_set_name = path.as_posix()

        bin_name_to_bin_dir[bin_set_name] = path

    logger.debug(f"Input bins: {' '.join([path.as_posix() for path in input_bins])}")
    logger.debug(f"Common prefix to remove: {common_prefix}")
    logger.debug(f"Common suffix to remove: {common_suffix}")
    logger.debug(f"Common extensions to remove: {common_extensions}")
    logger.debug(f"bin_name_to_bin_dir: {bin_name_to_bin_dir}")

    return bin_name_to_bin_dir
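
# Illustrative example (hypothetical inputs, not part of the module): for
#   [Path("results/maxbin/bins"), Path("results/metabat/bins")]
# the common prefix ["results"] and common suffix ["bins"] are stripped,
# yielding {"maxbin": Path("results/maxbin/bins"),
#           "metabat": Path("results/metabat/bins")}.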


def write_bin_info(bins: Iterable[Bin], output: Path, add_contigs: bool = False):
    """
    Write bin information to a TSV file.

    :param bins: Iterable of Bin objects.
    :param output: Output file path for writing the TSV.
    :param add_contigs: Flag indicating whether to include contig information.
    """

    # Define columns for the DataFrame
    columns = [
        "name",
        "origin",
        "is_original",
        "original_name",
        "completeness",
        "contamination",
        "score",
        "checkm2_model",
        "size",
        "N50",
        "coding_density",
        "contig_count",
    ]
    if add_contigs:
        columns.append("contigs")

    # Create a list of dictionaries to build the DataFrame
    data = []
    for bin_obj in sorted(
        bins, key=lambda x: (-x.score, -x.N50, -x.is_original, x.contigs_key)
    ):
        original_name = bin_obj.original_name if bin_obj.original_name else bin_obj.name
        origins = bin_obj.origin if bin_obj.is_original else {"binette"}

        bin_info = {
            "name": bin_obj.name,
            "origin": ";".join(origins),
            "is_original": bin_obj.is_original,
            "original_name": original_name,
            "completeness": bin_obj.completeness,
            "contamination": bin_obj.contamination,
            "score": round(bin_obj.score, 2),
            "checkm2_model": bin_obj.checkm2_model,
            "size": bin_obj.length,
            "N50": bin_obj.N50,
            "coding_density": round(bin_obj.coding_density, 4)
            if bin_obj.coding_density is not None
            else None,
            "contig_count": len(bin_obj.contigs),
        }

        if add_contigs:
            bin_info["contigs"] = ";".join(str(c) for c in bin_obj.contigs)

        data.append(bin_info)

    # Create pandas DataFrame and write to TSV
    df = pd.DataFrame(data, columns=columns)
    df.to_csv(output, sep="\t", index=False)
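
# Note on ordering (describing the sort key above): bins are written
# best-first, by descending score, then descending N50; among ties, original
# bins come before Binette-generated ones (since -is_original sorts True
# first), and contigs_key acts as a final deterministic tie-breaker.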


def write_bins_fasta(
    selected_bins: list[Bin],
    contigs_fasta: Path,
    outdir: Path,
    contigs_names: list[str],
    max_buffer_size: int = 50_000_000,
):
    """
    Write selected bins' contigs to separate FASTA files using pyfastx.Fastx (no index).
    Buffer entries by total character size, not just number of sequences.

    :param selected_bins: List of Bin objects with .name and .contigs.
    :param contigs_fasta: Path to the input FASTA file.
    :param outdir: Directory to save bin FASTA files.
    :param contigs_names: List of contig names where the index corresponds to the contig ID.
    :param max_buffer_size: Maximum total character size to buffer before flushing.
    """
    outdir.mkdir(parents=True, exist_ok=True)

    # Clear existing files for selected bins
    for sbin in selected_bins:
        out_path = outdir / f"{sbin.name}.fa"
        if out_path.exists():
            out_path.unlink()  # remove the file

    # Map contig name to bin name
    contig_to_bins = {}
    for sbin in selected_bins:
        for contig_id in sbin.contigs:
            contig_name = contigs_names[contig_id]
            contig_to_bins[contig_name] = sbin.name

    assert len(contig_to_bins) == sum(len(sbin.contigs) for sbin in selected_bins), (
        "Some contigs are present in multiple bins but should be unique."
    )

    buffer = defaultdict(list)
    buffer_size = 0

    def flush_buffer():
        nonlocal buffer_size
        for bin_name, seqs in buffer.items():
            if seqs:
                with open(outdir / f"{bin_name}.fa", "a") as f:
                    f.writelines(seqs)
        buffer.clear()
        buffer_size = 0

    for name, seq in pyfastx.Fastx(contigs_fasta.as_posix()):
        bin_name = contig_to_bins.get(name)
        if not bin_name:
            continue

        fasta_entry = f">{name}\n{seq}\n"
        entry_size = len(fasta_entry)

        buffer[bin_name].append(fasta_entry)

        buffer_size += entry_size
        if buffer_size >= max_buffer_size:
            flush_buffer()

    flush_buffer()
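
# Design note (describing the code above): because the FASTA is streamed
# once without building an index, entries are buffered per bin and flushed
# in append mode whenever the accumulated text exceeds max_buffer_size.
# This bounds peak memory use at roughly max_buffer_size characters
# regardless of assembly size, at the cost of reopening bin files on each
# flush.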


def check_contig_consistency(
    contigs_from_assembly: Iterable[str],
    contigs_from_elsewhere: Iterable[str],
    assembly_file: str,
    elsewhere_file: str,
):
    """
    Check the consistency of contig names between different sources.

    :param contigs_from_assembly: List of contig names from the assembly file.
    :param contigs_from_elsewhere: List of contig names from an external source.
    :param assembly_file: Path to the assembly file.
    :param elsewhere_file: Path to the file from an external source.
    :raises AssertionError: If inconsistencies in contig names are found.
    """
    logger.debug("Checking contig consistency")
    are_contigs_consistent = len(
        set(contigs_from_elsewhere) | set(contigs_from_assembly)
    ) <= len(set(contigs_from_assembly))

    issue_contigs = len(set(contigs_from_elsewhere) - set(contigs_from_assembly))

    message = (
        f"{issue_contigs} contigs found in file '{elsewhere_file}' "
        f"were not found in assembly_file '{assembly_file}'"
    )
    assert are_contigs_consistent, message
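
# Worked example (hypothetical names): with assembly contigs {"c1", "c2",
# "c3"} and external contigs {"c1", "c4"}, the union has 4 names versus 3
# in the assembly, so the check fails and reports 1 problem contig ("c4").
# The union-size test is equivalent to requiring that the external names
# form a subset of the assembly names.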


def check_resume_file(faa_file: Path, diamond_result_file: Path) -> None:
    """
    Check the existence of files required for resuming the process.

    :param faa_file: Path to the protein file.
    :param diamond_result_file: Path to the Diamond result file.
    :raises FileNotFoundError: If the required files don't exist for resuming.
    """

    if faa_file.exists() and diamond_result_file.exists():
        return

    if not faa_file.exists():
        error_msg = (
            f"Protein file '{faa_file}' does not exist. Resuming is not possible."
        )
        logger.error(error_msg)
        raise FileNotFoundError(error_msg)

    if not diamond_result_file.exists():
        error_msg = f"Diamond result file '{diamond_result_file}' does not exist. Resuming is not possible."
        logger.error(error_msg)
        raise FileNotFoundError(error_msg)


def write_contig2bin_table(
    selected_bins: list[Bin],
    output_file: Path,
    contigs_names: list[str],
):
    """
    Write a simple TSV file mapping contig names to bin names.

    :param selected_bins: List of selected Bin objects.
    :param output_file: Path to the output TSV file.
    :param contigs_names: List of contig names where the index corresponds to the contig ID.
    """
    logger.info(f"Writing contig2bin table to '{output_file}'")

    # Ensure output directory exists
    output_file.parent.mkdir(parents=True, exist_ok=True)

    with open(output_file, "w") as f:
        # Write contig to bin mappings
        for bin_obj in selected_bins:
            for contig_index in bin_obj.contigs:
                contig_name = contigs_names[contig_index]
                f.write(f"{contig_name}\t{bin_obj.name}\n")

    total_entries = sum(len(bin_obj.contigs) for bin_obj in selected_bins)
    logger.debug(f"Successfully wrote contig2bin table with {total_entries} entries")
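
# Illustrative output (hypothetical names; columns are tab-separated):
#   contig_1    bin_12
#   contig_2    bin_12
#   contig_3    bin_7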


def write_original_bin_metrics(original_bins: list[Bin], original_bin_report_dir: Path):
    """
    Write metrics of original input bins to a specified directory.

    This function writes the metrics for each bin set to a TSV file in the
    specified directory. Each bin set gets its own TSV file named after its
    set name.

    :param original_bins: List of input Bin objects.
    :param original_bin_report_dir: The directory path (Path) where the bin metrics will be saved.
    """

    original_bin_report_dir.mkdir(parents=True, exist_ok=True)

    bin_set_name_to_bins = defaultdict(list)
    for bin_obj in original_bins:
        for origin in bin_obj.origin:
            bin_set_name_to_bins[origin].append(bin_obj)

    for i, (set_name, bins) in enumerate(sorted(bin_set_name_to_bins.items())):
        bins_metric_file = (
            original_bin_report_dir
            / f"input_bins_{i + 1}.{set_name.replace('/', '_')}.tsv"
        )

        logger.debug(
            f"Writing metrics for bin set '{set_name}' to file '{bins_metric_file}'"
        )
        write_bin_info(bins, bins_metric_file)

    logger.debug("Completed writing all original input bin metrics")
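
# Illustrative file naming (hypothetical set names): bin sets "maxbin" and
# "semibin/run2" would be written to "input_bins_1.maxbin.tsv" and
# "input_bins_2.semibin_run2.tsv" inside original_bin_report_dir, with "/"
# replaced by "_" so each set name stays a single file-name component.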