Coverage for binette/bin_manager.py: 97%

222 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2025-01-06 19:22 +0000

1import logging 

2from collections import defaultdict 

3from pathlib import Path 

4 

5import pyfastx 

6 

7import itertools 

8import networkx as nx 

9from typing import List, Dict, Iterable, Tuple, Set, Mapping 

10 

11 

12class Bin: 

13 counter = 0 

14 

15 def __init__( 

16 self, contigs: Iterable[str], origin: str, name: str, is_original: bool = False 

17 ) -> None: 

18 """ 

19 Initialize a Bin object. 

20 

21 :param contigs: Iterable of contig names belonging to the bin. 

22 :param origin: Origin/source of the bin. 

23 :param name: Name of the bin. 

24 """ 

25 Bin.counter += 1 

26 

27 self.origin = {origin} 

28 self.name = name 

29 self.id = Bin.counter 

30 self.contigs = set(contigs) 

31 self.hash = hash(str(sorted(self.contigs))) 

32 

33 self.length = None 

34 self.N50 = None 

35 

36 self.completeness = None 

37 self.contamination = None 

38 self.score = None 

39 

40 self.is_original = is_original 

41 

42 def __eq__(self, other: "Bin") -> bool: 

43 """ 

44 Compare the Bin object with another object for equality. 

45 

46 :param other: The object to compare with. 

47 :return: True if the objects are equal, False otherwise. 

48 """ 

49 return self.contigs == other.contigs 

50 

51 def __hash__(self) -> int: 

52 """ 

53 Compute the hash value of the Bin object. 

54 

55 :return: The hash value. 

56 """ 

57 return self.hash 

58 

59 def __str__(self) -> str: 

60 """ 

61 Return a string representation of the Bin object. 

62 

63 :return: The string representation of the Bin object. 

64 """ 

65 return ( 

66 f"Bin {self.id} from {';'.join(self.origin)} ({len(self.contigs)} contigs)" 

67 ) 

68 

69 def overlaps_with(self, other: "Bin") -> Set[str]: 

70 """ 

71 Find the contigs that overlap between this bin and another bin. 

72 

73 :param other: The other Bin object. 

74 :return: A set of contig names that overlap between the bins. 

75 """ 

76 return self.contigs & other.contigs 

77 

78 # def __and__(self, other: 'Bin') -> 'Bin': 

79 # """ 

80 # Perform a logical AND operation between this bin and another bin. 

81 

82 # :param other: The other Bin object. 

83 # :return: A new Bin object representing the intersection of the bins. 

84 # """ 

85 # contigs = self.contigs & other.contigs 

86 # name = f"{self.name} & {other.name}" 

87 # origin = "intersection" 

88 

89 # return Bin(contigs, origin, name) 

90 

91 def add_length(self, length: int) -> None: 

92 """ 

93 Add the length attribute to the Bin object if the provided length is a positive integer. 

94 

95 :param length: The length value to add. 

96 :return: None 

97 """ 

98 if isinstance(length, int) and length > 0: 

99 self.length = length 

100 else: 

101 raise ValueError("Length should be a positive integer.") 

102 

103 def add_N50(self, n50: int) -> None: 

104 """ 

105 Add the N50 attribute to the Bin object. 

106 

107 :param n50: The N50 value to add. 

108 :return: None 

109 """ 

110 if isinstance(n50, int) and n50 >= 0: 

111 self.N50 = n50 

112 else: 

113 raise ValueError("N50 should be a positive integer.") 

114 

115 def add_quality( 

116 self, completeness: float, contamination: float, contamination_weight: float 

117 ) -> None: 

118 """ 

119 Set the quality attributes of the bin. 

120 

121 :param completeness: The completeness value. 

122 :param contamination: The contamination value. 

123 :param contamination_weight: The weight assigned to contamination in the score calculation. 

124 """ 

125 self.completeness = completeness 

126 self.contamination = contamination 

127 self.score = completeness - contamination_weight * contamination 

128 

129 def intersection(self, *others: "Bin") -> "Bin": 

130 """ 

131 Compute the intersection of the bin with other bins. 

132 

133 :param others: Other bins to compute the intersection with. 

134 :return: A new Bin representing the intersection of the bins. 

135 """ 

136 other_contigs = (o.contigs for o in others) 

137 contigs = self.contigs.intersection(*other_contigs) 

138 name = f"{self.id} & {' & '.join([str(other.id) for other in others])}" 

139 origin = "intersec" 

140 

141 return Bin(contigs, origin, name) 

142 

143 def difference(self, *others: "Bin") -> "Bin": 

144 """ 

145 Compute the difference between the bin and other bins. 

146 

147 :param others: Other bins to compute the difference with. 

148 :return: A new Bin representing the difference between the bins. 

149 """ 

150 other_contigs = (o.contigs for o in others) 

151 contigs = self.contigs.difference(*other_contigs) 

152 name = f"{self.id} - {' - '.join([str(other.id) for other in others])}" 

153 origin = "diff" 

154 

155 return Bin(contigs, origin, name) 

156 

157 def union(self, *others: "Bin") -> "Bin": 

158 """ 

159 Compute the union of the bin with other bins. 

160 

161 :param others: Other bins to compute the union with. 

162 :return: A new Bin representing the union of the bins. 

163 """ 

164 other_contigs = (o.contigs for o in others) 

165 contigs = self.contigs.union(*other_contigs) 

166 name = f"{self.id} | {' | '.join([str(other.id) for other in others])}" 

167 origin = "union" 

168 

169 return Bin(contigs, origin, name) 

170 

171 def is_complete_enough(self, min_completeness: float) -> bool: 

172 """ 

173 Determine if a bin is complete enough based on completeness threshold. 

174 

175 :param min_completeness: The minimum completeness required for a bin. 

176 

177 :raises ValueError: If completeness has not been set (is None). 

178 

179 :return: True if the bin meets the min_completeness threshold; False otherwise. 

180 """ 

181 

182 if self.completeness is None: 

183 raise ValueError( 

184 f"The bin '{self.name}' with ID '{self.id}' has not been evaluated for completeness or contamination, " 

185 "and therefore cannot be assessed." 

186 ) 

187 

188 return self.completeness >= min_completeness 

189 

190 def is_high_quality( 

191 self, min_completeness: float, max_contamination: float 

192 ) -> bool: 

193 """ 

194 Determine if a bin is considered high quality based on completeness and contamination thresholds. 

195 

196 :param min_completeness: The minimum completeness required for a bin to be considered high quality. 

197 :param max_contamination: The maximum allowed contamination for a bin to be considered high quality. 

198 

199 :raises ValueError: If either completeness or contamination has not been set (is None). 

200 

201 :return: True if the bin meets the high quality criteria; False otherwise. 

202 """ 

203 if self.completeness is None or self.contamination is None: 

204 raise ValueError( 

205 f"The bin '{self.name}' with ID '{self.id}' has not been evaluated for completeness or contamination, " 

206 "and therefore cannot be assessed for high quality." 

207 ) 

208 

209 return ( 

210 self.completeness >= min_completeness 

211 and self.contamination <= max_contamination 

212 ) 

213 

214 

215def get_bins_from_directory( 

216 bin_dir: Path, set_name: str, fasta_extensions: Set[str] 

217) -> List[Bin]: 

218 """ 

219 Retrieves a list of Bin objects from a directory containing bin FASTA files. 

220 

221 :param bin_dir: The directory path containing bin FASTA files. 

222 :param set_name: The name of the set the bins belong to. 

223 :fasta_extensions: Possible fasta extensions to look for in the bin directory. 

224 

225 :return: A list of Bin objects created from the bin FASTA files. 

226 """ 

227 bins = [] 

228 fasta_extensions |= { 

229 f".{ext}" for ext in fasta_extensions if not ext.startswith(".") 

230 } # adding a dot in case given extension are lacking one 

231 bin_fasta_files = ( 

232 fasta_file 

233 for fasta_file in bin_dir.glob("*") 

234 if set(fasta_file.suffixes) & fasta_extensions 

235 ) 

236 

237 for bin_fasta_path in bin_fasta_files: 

238 

239 bin_name = bin_fasta_path.name 

240 

241 contigs = { 

242 name for name, _ in pyfastx.Fasta(str(bin_fasta_path), build_index=False) 

243 } 

244 

245 bin_obj = Bin(contigs, set_name, bin_name) 

246 

247 bins.append(bin_obj) 

248 

249 return bins 

250 

251 

252def parse_bin_directories( 

253 bin_name_to_bin_dir: Dict[str, Path], fasta_extensions: Set[str] 

254) -> Dict[str, Set[Bin]]: 

255 """ 

256 Parses multiple bin directories and returns a dictionary mapping bin names to a list of Bin objects. 

257 

258 :param bin_name_to_bin_dir: A dictionary mapping bin names to their respective bin directories. 

259 :fasta_extensions: Possible fasta extensions to look for in the bin directory. 

260 

261 :return: A dictionary mapping bin names to a list of Bin objects created from the bin directories. 

262 """ 

263 bin_set_name_to_bins = {} 

264 

265 for name, bin_dir in bin_name_to_bin_dir.items(): 

266 bins = get_bins_from_directory(bin_dir, name, fasta_extensions) 

267 set_of_bins = set(bins) 

268 

269 # Calculate the number of duplicates 

270 num_duplicates = len(bins) - len(set_of_bins) 

271 

272 if num_duplicates > 0: 

273 logging.warning( 

274 f'{num_duplicates} bins with identical contig compositions detected in bin set "{name}". ' 

275 "These bins were merged to ensure uniqueness." 

276 ) 

277 

278 # Store the unique set of bins 

279 bin_set_name_to_bins[name] = set_of_bins 

280 

281 return bin_set_name_to_bins 

282 

283 

284def parse_contig2bin_tables( 

285 bin_name_to_bin_tables: Dict[str, Path] 

286) -> Dict[str, Set["Bin"]]: 

287 """ 

288 Parses multiple contig-to-bin tables and returns a dictionary mapping bin names to a set of unique Bin objects. 

289 

290 Logs a warning if duplicate bins are detected within a bin set. 

291 

292 :param bin_name_to_bin_tables: A dictionary where keys are bin set names and values are file paths or identifiers 

293 for contig-to-bin tables. Each table is parsed to extract Bin objects. 

294 

295 :return: A dictionary where keys are bin set names and values are sets of Bin objects. Duplicates are removed based 

296 on contig composition. 

297 """ 

298 bin_set_name_to_bins = {} 

299 

300 for name, contig2bin_table in bin_name_to_bin_tables.items(): 

301 bins = get_bins_from_contig2bin_table(contig2bin_table, name) 

302 set_of_bins = set(bins) 

303 

304 # Calculate the number of duplicates 

305 num_duplicates = len(bins) - len(set_of_bins) 

306 

307 if num_duplicates > 0: 

308 logging.warning( 

309 f'{num_duplicates*2} bins with identical contig compositions detected in bin set "{name}". ' 

310 "These bins were merged to ensure uniqueness." 

311 ) 

312 

313 # Store the unique set of bins 

314 bin_set_name_to_bins[name] = set_of_bins 

315 

316 return bin_set_name_to_bins 

317 

318 

319def get_bins_from_contig2bin_table(contig2bin_table: Path, set_name: str) -> List[Bin]: 

320 """ 

321 Retrieves a list of Bin objects from a contig-to-bin table. 

322 

323 :param contig2bin_table: The path to the contig-to-bin table. 

324 :param set_name: The name of the set the bins belong to. 

325 

326 :return: A list of Bin objects created from the contig-to-bin table. 

327 """ 

328 bin_name2contigs = defaultdict(set) 

329 with open(contig2bin_table) as fl: 

330 for line in fl: 

331 if line.startswith("#") or line.startswith("@"): 

332 logging.debug(f"Ignoring a line from {contig2bin_table}: {line}") 

333 continue 

334 contig_name = line.strip().split()[0] 

335 bin_name = line.strip().split("\t")[1] 

336 bin_name2contigs[bin_name].add(contig_name) 

337 

338 bins = [] 

339 for bin_name, contigs in bin_name2contigs.items(): 

340 bin_obj = Bin(contigs, set_name, bin_name) 

341 bins.append(bin_obj) 

342 return bins 

343 

344 

345def from_bins_to_bin_graph(bins) -> nx.Graph: 

346 """ 

347 Creates a bin graph made of overlapping gram a set of bins. 

348 

349 :param bins: a set of bins 

350 

351 :return: A networkx Graph representing the bin graph of overlapping bins. 

352 """ 

353 G = nx.Graph() 

354 

355 for bin1, bin2 in itertools.combinations(bins, 2): 

356 if bin1.overlaps_with(bin2): 

357 G.add_edge(bin1, bin2) 

358 return G 

359 

360 

361def get_all_possible_combinations(clique: List) -> Iterable[Tuple]: 

362 """ 

363 Generates all possible combinations of elements from a given clique. 

364 

365 :param clique: An iterable representing a clique. 

366 

367 :return: An iterable of tuples representing all possible combinations of elements from the clique. 

368 """ 

369 return ( 

370 c for r in range(2, len(clique) + 1) for c in itertools.combinations(clique, r) 

371 ) 

372 

373 

374def get_intersection_bins(G: nx.Graph) -> Set[Bin]: 

375 """ 

376 Retrieves the intersection bins from a given graph. 

377 

378 :param G: A networkx Graph representing the graph. 

379 

380 :return: A set of Bin objects representing the intersection bins. 

381 """ 

382 intersect_bins = set() 

383 

384 for clique in nx.clique.find_cliques(G): 

385 bins_combinations = get_all_possible_combinations(clique) 

386 for bins in bins_combinations: 

387 if max((b.completeness for b in bins)) < 40: 

388 continue 

389 

390 intersec_bin = bins[0].intersection(*bins[1:]) 

391 

392 if intersec_bin.contigs: # and intersec_bin not in clique: 

393 

394 intersect_bins.add(intersec_bin) 

395 

396 return intersect_bins 

397 

398 

399def get_difference_bins(G: nx.Graph) -> Set[Bin]: 

400 """ 

401 Retrieves the difference bins from a given graph. 

402 

403 :param G: A networkx Graph representing the graph. 

404 

405 :return: A set of Bin objects representing the difference bins. 

406 """ 

407 difference_bins = set() 

408 

409 for clique in nx.clique.find_cliques(G): 

410 

411 bins_combinations = get_all_possible_combinations(clique) 

412 for bins in bins_combinations: 

413 

414 for bin_a in bins: 

415 if bin_a.completeness < 40: 

416 continue 

417 bin_diff = bin_a.difference(*(b for b in bins if b != bin_a)) 

418 

419 if bin_diff.contigs: # and bin_diff not in clique: 

420 difference_bins.add(bin_diff) 

421 

422 return difference_bins 

423 

424 

425def get_union_bins(G: nx.Graph, max_conta: int = 50) -> Set[Bin]: 

426 """ 

427 Retrieves the union bins from a given graph. 

428 

429 :param G: A networkx Graph representing the graph of bins. 

430 :param max_conta: Maximum allowed contamination value for a bin to be included in the union. 

431 

432 :return: A set of Bin objects representing the union bins. 

433 """ 

434 union_bins = set() 

435 for clique in nx.clique.find_cliques(G): 

436 bins_combinations = get_all_possible_combinations(clique) 

437 for bins in bins_combinations: 

438 if max((b.contamination for b in bins)) > 20: 

439 continue 

440 

441 bins = set(bins) 

442 bin_a = bins.pop() 

443 bin_union = bin_a.union(*bins) 

444 if bin_union.contigs: # and bin_union not in clique: 

445 union_bins.add(bin_union) 

446 

447 return union_bins 

448 

449 

450def select_best_bins(bins: Set[Bin]) -> List[Bin]: 

451 """ 

452 Selects the best bins from a list of bins based on their scores, N50 values, and IDs. 

453 

454 :param bins: A list of Bin objects. 

455 

456 :return: A list of selected Bin objects. 

457 """ 

458 

459 logging.info("Sorting bins") 

460 # Sort on score, N50, and ID. Smaller ID values are preferred to select original bins first. 

461 sorted_bins = sorted(bins, key=lambda x: (x.score, x.N50, -x.id), reverse=True) 

462 

463 logging.info("Selecting bins") 

464 selected_bins = [] 

465 for b in sorted_bins: 

466 if b in bins: 

467 overlapping_bins = {b2 for b2 in bins if b.overlaps_with(b2)} 

468 bins -= overlapping_bins 

469 

470 selected_bins.append(b) 

471 

472 logging.info(f"Selected {len(selected_bins)} bins") 

473 return selected_bins 

474 

475 

476def group_identical_bins(bins: Iterable[Bin]) -> List[List[Bin]]: 

477 """ 

478 Group identical bins together 

479 

480 :param bins: list of bins 

481 

482 return List of list of identical bins 

483 """ 

484 binhash_to_bins = defaultdict(list) 

485 

486 # Collect bins by their hash values 

487 for bin_obj in bins: 

488 binhash_to_bins[bin_obj.hash].append(bin_obj) 

489 

490 return list(binhash_to_bins.values()) 

491 

492 

493def dereplicate_bin_sets(bin_sets: Iterable[Set["Bin"]]) -> Set["Bin"]: 

494 """ 

495 Consolidate bins from multiple bin sets into a single set of non-redundant bins. 

496 

497 Bins with the same hash are considered duplicates. For each group of duplicates, 

498 the origins are merged, and only one representative bin is kept. 

499 

500 :param bin_sets: An iterable of sets, where each set contains `Bin` objects. These sets are merged 

501 into a single set of unique bins by consolidating bins with the same hash. 

502 

503 :return: A set of `Bin` objects with duplicates removed. Each `Bin` in the resulting set has 

504 merged origins from the bins it was consolidated with. 

505 """ 

506 all_bins = (bin_obj for bins in bin_sets for bin_obj in bins) 

507 list_of_identical_bins = group_identical_bins(all_bins) 

508 

509 dereplicated_bins = set() 

510 

511 # Merge bins with the same hash 

512 for identical_bins in list_of_identical_bins: 

513 # Select the first bin as the representative 

514 selected_bin = identical_bins[0] 

515 for bin_obj in identical_bins[1:]: 

516 # Merge origins of all bins with the same hash 

517 selected_bin.origin |= bin_obj.origin 

518 

519 # Add the representative bin to the result set 

520 dereplicated_bins.add(selected_bin) 

521 

522 return dereplicated_bins 

523 

524 

525def get_contigs_in_bin_sets(bin_set_name_to_bins: Dict[str, Set[Bin]]) -> Set[str]: 

526 """ 

527 Processes bin sets to check for duplicated contigs and logs detailed information about each bin set. 

528 

529 :param bin_set_name_to_bins: A dictionary where keys are bin set names and values are sets of Bin objects. 

530 

531 :return: A set of contig names found in bin sets 

532 """ 

533 # To track all unique contigs across bin sets 

534 all_contigs_in_bins = set() 

535 

536 for bin_set_name, bins in bin_set_name_to_bins.items(): 

537 list_contigs_in_bin_sets = get_contigs_in_bins(bins) 

538 

539 # Count duplicates 

540 contig_counts = { 

541 contig: list_contigs_in_bin_sets.count(contig) 

542 for contig in list_contigs_in_bin_sets 

543 } 

544 duplicated_contigs = { 

545 contig: count for contig, count in contig_counts.items() if count > 1 

546 } 

547 

548 if duplicated_contigs: 

549 logging.warning( 

550 f"Bin set '{bin_set_name}' contains {len(duplicated_contigs)} duplicated contigs. " 

551 "Details: " 

552 + ", ".join( 

553 f"{contig} (found {count} times)" 

554 for contig, count in duplicated_contigs.items() 

555 ) 

556 ) 

557 

558 # Unique contigs in current bin set 

559 unique_contigs_in_bin_set = set(list_contigs_in_bin_sets) 

560 

561 # Update global contig tracker 

562 all_contigs_in_bins |= unique_contigs_in_bin_set 

563 

564 # Log summary for the current bin set 

565 logging.debug( 

566 f"Bin set '{bin_set_name}': {len(bins)} bins, {len(unique_contigs_in_bin_set)} unique contigs." 

567 ) 

568 

569 return all_contigs_in_bins 

570 

571 

572def get_contigs_in_bins(bins: Iterable[Bin]) -> List[str]: 

573 """ 

574 Retrieves all contigs present in the given list of bins. 

575 

576 :param bins: A list of Bin objects. 

577 

578 :return: A list of contigs present in the bins. 

579 """ 

580 return [contig for b in bins for contig in b.contigs] 

581 

582 

583def rename_bin_contigs(bins: Iterable[Bin], contig_to_index: dict): 

584 """ 

585 Renames the contigs in the bins based on the provided mapping. 

586 

587 :param bins: A list of Bin objects. 

588 :param contig_to_index: A dictionary mapping old contig names to new index names. 

589 """ 

590 for b in bins: 

591 b.contigs = {contig_to_index[contig] for contig in b.contigs} 

592 b.hash = hash(str(sorted(b.contigs))) 

593 

594 

595def create_intermediate_bins(original_bins: Set[Bin]) -> Set[Bin]: 

596 """ 

597 Creates intermediate bins from a dictionary of bin sets. 

598 

599 :param original_bins: Set of input bins. 

600 

601 :return: A set of intermediate bins created from intersections, differences, and unions. 

602 """ 

603 

604 logging.info("Making bin graph...") 

605 connected_bins_graph = from_bins_to_bin_graph(original_bins) 

606 

607 logging.info("Creating intersection bins...") 

608 intersection_bins = get_intersection_bins(connected_bins_graph) 

609 

610 logging.info(f"{len(intersection_bins)} bins created on intersections.") 

611 

612 logging.info("Creating difference bins...") 

613 difference_bins = get_difference_bins(connected_bins_graph) 

614 

615 logging.info(f"{len(difference_bins)} bins created based on symmetric difference.") 

616 

617 logging.info("Creating union bins...") 

618 union_bins = get_union_bins(connected_bins_graph) 

619 

620 logging.info(f"{len(union_bins)} bins created on unions.") 

621 

622 new_bins = difference_bins | intersection_bins | union_bins 

623 

624 new_bins = dereplicate_bin_sets((difference_bins, intersection_bins, union_bins)) 

625 

626 # dereplicate from the original bins 

627 original_hashes = [b.hash for b in original_bins] 

628 new_bins_not_in_original = set() 

629 

630 for b in new_bins: 

631 if b.hash not in original_hashes: 

632 new_bins_not_in_original.add(b) 

633 

634 return new_bins_not_in_original