Coverage for binette/main.py: 96%

161 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2025-01-06 19:22 +0000

1#!/usr/bin/env python 

2""" 

3Module : Main 

4Description : The main entry point for the program. 

5Copyright : (c) Jean Mainguy, 28 nov. 2022  

6License : MIT 

7Maintainer : Jean Mainguy 

8Portability : POSIX 

9""" 

10 

11from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter, Action, Namespace 

12 

13import sys 

14import logging 

15import os 

16 

17import binette 

18from binette import ( 

19 contig_manager, 

20 cds, 

21 diamond, 

22 bin_quality, 

23 bin_manager, 

24 io_manager as io, 

25) 

26from typing import List, Dict, Optional, Set, Tuple, Union, Sequence, Any 

27from pathlib import Path 

28 

29 

30def init_logging(verbose, debug): 

31 """Initialise logging.""" 

32 if debug: 

33 level = logging.DEBUG 

34 elif verbose: 

35 level = logging.INFO 

36 else: 

37 level = logging.WARNING 

38 

39 logging.basicConfig( 

40 level=level, 

41 format="%(asctime)s %(levelname)s - %(message)s", 

42 datefmt="[%Y-%m-%d %H:%M:%S]", 

43 ) 

44 

45 logging.info("Program started") 

46 logging.info( 

47 f'command line: {" ".join(sys.argv)}', 

48 ) 

49 

50 

51class UniqueStore(Action): 

52 """ 

53 Custom argparse action to ensure an argument is provided only once. 

54 """ 

55 

56 def __call__( 

57 self, 

58 parser: ArgumentParser, 

59 namespace: Namespace, 

60 values: Union[str, Sequence[Any], None], 

61 option_string: Optional[str] = None, 

62 ) -> None: 

63 """ 

64 Ensures the argument is only used once. Raises an error if the argument appears multiple times. 

65 

66 :param parser: The argparse parser instance. 

67 :param namespace: The namespace object that will contain the parsed arguments. 

68 :param values: The value associated with the argument. 

69 :param option_string: The option string that was used to invoke this action. 

70 """ 

71 # Check if the argument has already been set 

72 if getattr(namespace, self.dest, self.default) is not self.default: 

73 parser.error( 

74 f"Error: The argument {option_string} can only be specified once." 

75 ) 

76 

77 # Set the argument value 

78 setattr(namespace, self.dest, values) 

79 

80 

81def is_valid_file(parser: ArgumentParser, arg: str) -> Path: 

82 """ 

83 Validates that the provided input file exists. 

84 

85 :param parser: The ArgumentParser instance handling command-line arguments. 

86 :param arg: The path to the file provided as an argument. 

87 :return: A Path object representing the valid file. 

88 """ 

89 path_arg = Path(arg) 

90 

91 # Check if the file exists at the provided path 

92 if not path_arg.exists(): 

93 parser.error(f"Error: The specified file '{arg}' does not exist.") 

94 

95 return path_arg 

96 

97 

98def parse_arguments(args): 

99 """Parse script arguments.""" 

100 

101 parser = ArgumentParser( 

102 description=f"Binette version={binette.__version__}", 

103 formatter_class=ArgumentDefaultsHelpFormatter, 

104 ) 

105 

106 # Input arguments category 

107 input_group = parser.add_argument_group("Input Arguments") 

108 input_arg = input_group.add_mutually_exclusive_group(required=True) 

109 

110 input_arg.add_argument( 

111 "-d", 

112 "--bin_dirs", 

113 nargs="+", 

114 type=lambda x: is_valid_file(parser, x), 

115 action=UniqueStore, 

116 help="List of bin folders containing each bin in a fasta file.", 

117 ) 

118 

119 input_arg.add_argument( 

120 "-b", 

121 "--contig2bin_tables", 

122 nargs="+", 

123 action=UniqueStore, 

124 type=lambda x: is_valid_file(parser, x), 

125 help="List of contig2bin table with two columns separated\ 

126 with a tabulation: contig, bin", 

127 ) 

128 

129 input_group.add_argument( 

130 "-c", 

131 "--contigs", 

132 required=True, 

133 type=lambda x: is_valid_file(parser, x), 

134 help="Contigs in fasta format.", 

135 ) 

136 

137 input_group.add_argument( 

138 "-p", 

139 "--proteins", 

140 type=lambda x: is_valid_file(parser, x), 

141 help="FASTA file of predicted proteins in Prodigal format (>contigID_geneID). " 

142 "Skips the gene prediction step if provided.", 

143 ) 

144 

145 # Other parameters category 

146 other_group = parser.add_argument_group("Other Arguments") 

147 

148 other_group.add_argument( 

149 "-m", 

150 "--min_completeness", 

151 default=40, 

152 type=int, 

153 help="Minimum completeness required for final bin selections.", 

154 ) 

155 

156 other_group.add_argument( 

157 "-t", "--threads", default=1, type=int, help="Number of threads to use." 

158 ) 

159 

160 other_group.add_argument( 

161 "-o", "--outdir", default=Path("results"), type=Path, help="Output directory." 

162 ) 

163 

164 other_group.add_argument( 

165 "-w", 

166 "--contamination_weight", 

167 default=2, 

168 type=float, 

169 help="Bin are scored as follow: completeness - weight * contamination. " 

170 "A low contamination_weight favor complete bins over low contaminated bins.", 

171 ) 

172 

173 other_group.add_argument( 

174 "-e", 

175 "--fasta_extensions", 

176 nargs="+", 

177 default={".fasta", ".fa", ".fna"}, 

178 type=str, 

179 help="Specify the FASTA file extensions to search for in bin directories when using the --bin_dirs option.", 

180 ) 

181 

182 other_group.add_argument( 

183 "--checkm2_db", 

184 type=Path, 

185 help="Provide a path for the CheckM2 diamond database. " 

186 "By default the database set via <checkm2 database> is used.", 

187 ) 

188 

189 other_group.add_argument( 

190 "--low_mem", help="Use low mem mode when running diamond", action="store_true" 

191 ) 

192 

193 other_group.add_argument( 

194 "-v", "--verbose", help="increase output verbosity", action="store_true" 

195 ) 

196 

197 other_group.add_argument("--debug", help="Activate debug mode", action="store_true") 

198 

199 other_group.add_argument( 

200 "--resume", 

201 action="store_true", 

202 help="Activate resume mode. Binette will examine the 'temporary_files' directory " 

203 "within the output directory and reuse any existing files if possible.", 

204 ) 

205 

206 other_group.add_argument("--version", action="version", version=binette.__version__) 

207 

208 args = parser.parse_args(args) 

209 return args 

210 

211 

212def parse_input_files( 

213 bin_dirs: List[Path], 

214 contig2bin_tables: List[Path], 

215 contigs_fasta: Path, 

216 temporary_dir: Path, 

217 fasta_extensions: Set[str] = {".fasta", ".fna", ".fa"}, 

218) -> Tuple[ 

219 Dict[str, Set[bin_manager.Bin]], Set[bin_manager.Bin], Set[str], Dict[str, int] 

220]: 

221 """ 

222 Parses input files to retrieve information related to bins and contigs. 

223 

224 :param bin_dirs: List of paths to directories containing bin FASTA files. 

225 :param contig2bin_tables: List of paths to contig-to-bin tables. 

226 :param contigs_fasta: Path to the contigs FASTA file. 

227 :fasta_extensions: Possible fasta extensions to look for in the bin directory. 

228 

229 :return: A tuple containing: 

230 - List of original bins. 

231 - Dictionary mapping bins to lists of contigs. 

232 - Dictionary mapping contig names to their lengths. 

233 """ 

234 

235 if bin_dirs: 

236 logging.info("Parsing bin directories.") 

237 bin_name_to_bin_dir = io.infer_bin_set_names_from_input_paths(bin_dirs) 

238 bin_set_name_to_bins = bin_manager.parse_bin_directories( 

239 bin_name_to_bin_dir, fasta_extensions 

240 ) 

241 else: 

242 logging.info("Parsing bin2contig files.") 

243 bin_name_to_bin_table = io.infer_bin_set_names_from_input_paths( 

244 contig2bin_tables 

245 ) 

246 bin_set_name_to_bins = bin_manager.parse_contig2bin_tables( 

247 bin_name_to_bin_table 

248 ) 

249 

250 logging.info(f"Processing {len(bin_set_name_to_bins)} bin sets.") 

251 for bin_set_id, bins in bin_set_name_to_bins.items(): 

252 logging.info(f" {bin_set_id} - {len(bins)} bins") 

253 

254 contigs_in_bins = bin_manager.get_contigs_in_bin_sets(bin_set_name_to_bins) 

255 original_bins = bin_manager.dereplicate_bin_sets(bin_set_name_to_bins.values()) 

256 

257 logging.info(f"Parsing contig fasta file: {contigs_fasta}") 

258 index_file = temporary_dir / f"{contigs_fasta.name}.fxi" 

259 contigs_object = contig_manager.parse_fasta_file( 

260 contigs_fasta.as_posix(), index_file=index_file.as_posix() 

261 ) 

262 

263 unexpected_contigs = { 

264 contig for contig in contigs_in_bins if contig not in contigs_object 

265 } 

266 

267 if len(unexpected_contigs): 

268 raise ValueError( 

269 f"{len(unexpected_contigs)} contigs from the input bins were not found in the contigs file '{contigs_fasta}'. " 

270 f"The missing contigs are: {', '.join(unexpected_contigs)}. Please ensure all contigs from input bins are present in contig file." 

271 ) 

272 

273 contig_to_length = { 

274 seq.name: len(seq) for seq in contigs_object if seq.name in contigs_in_bins 

275 } 

276 

277 return original_bins, contigs_in_bins, contig_to_length 

278 

279 

280def manage_protein_alignement( 

281 faa_file: Path, 

282 contigs_fasta: Path, 

283 temporary_dir: Path, 

284 contig_to_length: Dict[str, int], 

285 contigs_in_bins: Set[str], 

286 diamond_result_file: Path, 

287 checkm2_db: Optional[Path], 

288 threads: int, 

289 use_existing_protein_file: bool, 

290 resume_diamond: bool, 

291 low_mem: bool, 

292) -> Tuple[Dict[str, int], Dict[str, List[str]]]: 

293 """ 

294 Predicts or reuses proteins prediction and runs diamond on them. 

295 

296 :param faa_file: The path to the .faa file. 

297 :param contigs_fasta: The path to the contigs FASTA file. 

298 :param contig_to_length: Dictionary mapping contig names to their lengths. 

299 :param contigs_in_bins: Dictionary mapping bin names to lists of contigs. 

300 :param diamond_result_file: The path to the diamond result file. 

301 :param checkm2_db: The path to the CheckM2 database. 

302 :param threads: Number of threads for parallel processing. 

303 :param use_existing_protein_file: Boolean indicating whether to use an existing protein file. 

304 :param resume_diamond: Boolean indicating whether to resume diamond alignement. 

305 :param low_mem: Boolean indicating whether to use low memory mode. 

306 

307 :return: A tuple containing dictionaries - contig_to_kegg_counter and contig_to_genes. 

308 """ 

309 

310 # Predict or reuse proteins prediction and run diamond on them 

311 if use_existing_protein_file: 

312 logging.info(f"Parsing faa file: {faa_file}.") 

313 contig_to_genes = cds.parse_faa_file(faa_file.as_posix()) 

314 io.check_contig_consistency( 

315 contig_to_length, 

316 contig_to_genes, 

317 contigs_fasta.as_posix(), 

318 faa_file.as_posix(), 

319 ) 

320 

321 else: 

322 index_file = temporary_dir / f"{contigs_fasta.name}.fxi" 

323 contigs_iterator = ( 

324 seq 

325 for seq in contig_manager.parse_fasta_file( 

326 contigs_fasta.as_posix(), index_file=index_file.as_posix() 

327 ) 

328 if seq.name in contigs_in_bins 

329 ) 

330 contig_to_genes = cds.predict(contigs_iterator, faa_file.as_posix(), threads) 

331 

332 if not resume_diamond: 

333 if checkm2_db is None: 

334 # get checkm2 db stored in checkm2 install 

335 diamond_db_path = diamond.get_checkm2_db() 

336 elif checkm2_db.exists(): 

337 diamond_db_path = checkm2_db.as_posix() 

338 else: 

339 raise FileNotFoundError(checkm2_db) 

340 

341 diamond_log = ( 

342 diamond_result_file.parents[0] 

343 / f"{diamond_result_file.stem.split('.')[0]}.log" 

344 ) 

345 

346 diamond.run( 

347 faa_file.as_posix(), 

348 diamond_result_file.as_posix(), 

349 diamond_db_path, 

350 diamond_log.as_posix(), 

351 threads, 

352 low_mem=low_mem, 

353 ) 

354 

355 logging.info("Parsing diamond results.") 

356 contig_to_kegg_counter = diamond.get_contig_to_kegg_id( 

357 diamond_result_file.as_posix() 

358 ) 

359 

360 # Check contigs from diamond vs input assembly consistency 

361 io.check_contig_consistency( 

362 contig_to_length, 

363 contig_to_kegg_counter, 

364 contigs_fasta.as_posix(), 

365 diamond_result_file.as_posix(), 

366 ) 

367 

368 return contig_to_kegg_counter, contig_to_genes 

369 

370 

371def select_bins_and_write_them( 

372 all_bins: Set[bin_manager.Bin], 

373 contigs_fasta: Path, 

374 final_bin_report: Path, 

375 min_completeness: float, 

376 index_to_contig: dict, 

377 outdir: Path, 

378 debug: bool, 

379) -> List[bin_manager.Bin]: 

380 """ 

381 Selects and writes bins based on specific criteria. 

382 

383 :param all_bins: Set of Bin objects. 

384 :param contigs_fasta: Path to the contigs FASTA file. 

385 :param final_bin_report: Path to write the final bin report. 

386 :param min_completeness: Minimum completeness threshold for bin selection. 

387 :param index_to_contig: Dictionary mapping indices to contig names. 

388 :param outdir: Output directory to save final bins and reports. 

389 :param debug: Debug mode flag. 

390 :return: Selected bins that meet the completeness threshold. 

391 """ 

392 

393 outdir_final_bin_set = outdir / "final_bins" 

394 os.makedirs(outdir_final_bin_set, exist_ok=True) 

395 

396 logging.info( 

397 f"Filtering bins: only bins with completeness >= {min_completeness} are kept" 

398 ) 

399 all_bins_complete_enough = { 

400 b for b in all_bins if b.is_complete_enough(min_completeness) 

401 } 

402 

403 logging.info("Selecting best bins") 

404 selected_bins = bin_manager.select_best_bins(all_bins_complete_enough) 

405 

406 logging.info(f"Bin Selection: {len(selected_bins)} selected bins") 

407 

408 logging.info(f"Writing selected bins in {final_bin_report}") 

409 

410 for b in selected_bins: 

411 b.contigs = {index_to_contig[c_index] for c_index in b.contigs} 

412 

413 io.write_bin_info(selected_bins, final_bin_report) 

414 

415 io.write_bins_fasta(selected_bins, contigs_fasta, outdir_final_bin_set) 

416 

417 if debug: 

418 all_bin_compo_file = outdir / "all_bins_quality_reports.tsv" 

419 

420 logging.info(f"Writing all bins in {all_bin_compo_file}") 

421 

422 io.write_bin_info(all_bins, all_bin_compo_file, add_contigs=True) 

423 

424 with open(os.path.join(outdir, "index_to_contig.tsv"), "w") as flout: 

425 flout.write("\n".join((f"{i}\t{c}" for i, c in index_to_contig.items()))) 

426 

427 return selected_bins 

428 

429 

430def log_selected_bin_info( 

431 selected_bins: List[bin_manager.Bin], 

432 hq_min_completeness: float, 

433 hq_max_conta: float, 

434): 

435 """ 

436 Log information about selected bins based on quality thresholds. 

437 

438 :param selected_bins: List of Bin objects to analyze. 

439 :param hq_min_completeness: Minimum completeness threshold for high-quality bins. 

440 :param hq_max_conta: Maximum contamination threshold for high-quality bins. 

441 

442 This function logs information about selected bins that meet specified quality thresholds. 

443 It counts the number of high-quality bins based on completeness and contamination values. 

444 """ 

445 

446 # Log completeness and contamination in debug log 

447 logging.debug("High quality bins:") 

448 for sb in selected_bins: 

449 if sb.is_high_quality( 

450 min_completeness=hq_min_completeness, max_contamination=hq_max_conta 

451 ): 

452 logging.debug( 

453 f"> {sb} completeness={sb.completeness}, contamination={sb.contamination}" 

454 ) 

455 

456 # Count high-quality bins and single-contig high-quality bins 

457 hq_bins = len( 

458 [ 

459 sb 

460 for sb in selected_bins 

461 if sb.is_high_quality( 

462 min_completeness=hq_min_completeness, max_contamination=hq_max_conta 

463 ) 

464 ] 

465 ) 

466 

467 # Log information about high-quality bins 

468 thresholds = ( 

469 f"(completeness >= {hq_min_completeness} and contamination <= {hq_max_conta})" 

470 ) 

471 logging.info( 

472 f"{hq_bins}/{len(selected_bins)} selected bins have a high quality {thresholds}." 

473 ) 

474 

475 

476def main(): 

477 "Orchestrate the execution of the program" 

478 

479 args = parse_arguments( 

480 sys.argv[1:] 

481 ) # sys.argv is passed in order to be able to test the function parse_arguments 

482 

483 init_logging(args.verbose, args.debug) 

484 

485 # High quality threshold used just to log number of high quality bins. 

486 hq_max_conta = 5 

487 hq_min_completeness = 90 

488 

489 # Temporary files # 

490 out_tmp_dir: Path = args.outdir / "temporary_files" 

491 os.makedirs(out_tmp_dir, exist_ok=True) 

492 

493 use_existing_protein_file = False 

494 

495 faa_file = out_tmp_dir / "assembly_proteins.faa.gz" 

496 

497 diamond_result_file = out_tmp_dir / "diamond_result.tsv.gz" 

498 

499 # Output files # 

500 final_bin_report: Path = args.outdir / "final_bins_quality_reports.tsv" 

501 original_bin_report_dir: Path = args.outdir / "input_bins_quality_reports" 

502 

503 if args.resume: 

504 io.check_resume_file(faa_file, diamond_result_file) 

505 use_existing_protein_file = True 

506 

507 original_bins, contigs_in_bins, contig_to_length = parse_input_files( 

508 args.bin_dirs, 

509 args.contig2bin_tables, 

510 args.contigs, 

511 fasta_extensions=set(args.fasta_extensions), 

512 temporary_dir=out_tmp_dir, 

513 ) 

514 

515 if args.proteins and not args.resume: 

516 logging.info(f"Using the provided protein sequences file: {args.proteins}") 

517 use_existing_protein_file = True 

518 

519 cds.filter_faa_file( 

520 contigs_in_bins, 

521 input_faa_file=args.proteins, 

522 filtered_faa_file=faa_file, 

523 ) 

524 

525 contig_to_kegg_counter, contig_to_genes = manage_protein_alignement( 

526 faa_file=faa_file, 

527 contigs_fasta=args.contigs, 

528 temporary_dir=out_tmp_dir, 

529 contig_to_length=contig_to_length, 

530 contigs_in_bins=contigs_in_bins, 

531 diamond_result_file=diamond_result_file, 

532 checkm2_db=args.checkm2_db, 

533 threads=args.threads, 

534 use_existing_protein_file=use_existing_protein_file, 

535 resume_diamond=args.resume, 

536 low_mem=args.low_mem, 

537 ) 

538 

539 # Use contig index instead of contig name to save memory 

540 contig_to_index, index_to_contig = contig_manager.make_contig_index(contigs_in_bins) 

541 

542 contig_to_kegg_counter = contig_manager.apply_contig_index( 

543 contig_to_index, contig_to_kegg_counter 

544 ) 

545 contig_to_genes = contig_manager.apply_contig_index( 

546 contig_to_index, contig_to_genes 

547 ) 

548 contig_to_length = contig_manager.apply_contig_index( 

549 contig_to_index, contig_to_length 

550 ) 

551 

552 bin_manager.rename_bin_contigs(original_bins, contig_to_index) 

553 

554 # Extract cds metadata ## 

555 logging.info("Compute cds metadata.") 

556 contig_metadat = cds.get_contig_cds_metadata(contig_to_genes, args.threads) 

557 

558 contig_metadat["contig_to_kegg_counter"] = contig_to_kegg_counter 

559 contig_metadat["contig_to_length"] = contig_to_length 

560 

561 logging.info("Add size and assess quality of input bins") 

562 bin_quality.add_bin_metrics( 

563 original_bins, contig_metadat, args.contamination_weight, args.threads 

564 ) 

565 

566 logging.info( 

567 f"Writting original input bin metrics to directory: {original_bin_report_dir}" 

568 ) 

569 io.write_original_bin_metrics(original_bins, original_bin_report_dir) 

570 

571 logging.info("Create intermediate bins:") 

572 new_bins = bin_manager.create_intermediate_bins(original_bins) 

573 

574 logging.info(f"Assess quality for {len(new_bins)} intermediate bins.") 

575 bin_quality.add_bin_metrics( 

576 new_bins, 

577 contig_metadat, 

578 args.contamination_weight, 

579 args.threads, 

580 ) 

581 

582 logging.info("Dereplicating input bins and new bins") 

583 all_bins = original_bins | new_bins 

584 

585 selected_bins = select_bins_and_write_them( 

586 all_bins, 

587 args.contigs, 

588 final_bin_report, 

589 args.min_completeness, 

590 index_to_contig, 

591 args.outdir, 

592 args.debug, 

593 ) 

594 

595 log_selected_bin_info(selected_bins, hq_min_completeness, hq_max_conta) 

596 

597 return 0