Coverage for binette/bin_manager.py: 96%

260 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-14 14:36 +0000

1import itertools 

2import logging 

3from collections import Counter, defaultdict 

4from collections.abc import Iterable 

5from functools import cached_property 

6from pathlib import Path 

7from typing import Any 

8 

9import networkx as nx 

10import numpy as np 

11import pyfastx 

12from pyroaring import BitMap 

13from rich.progress import Progress 

14 

# Module-level logger for this module (binette.bin_manager).
logger = logging.getLogger(__name__)

16 

17 

class Bin:
    """
    A bin of contigs, identified by the set of contig indices it contains.

    Two bins compare equal (and hash equal) iff they hold exactly the same
    contigs. Quality metrics (length, N50, completeness, contamination,
    score, coding density) are attached after creation via the ``add_*``
    methods.
    """

    # Recognised CheckM2 model names; the model is stored internally as an
    # index into this tuple (see add_model / checkm2_model).
    CHECKM2_MODELS = (
        "Neural Network (Specific Model)",
        "Gradient Boost (General Model)",
    )

    def __init__(
        self,
        contigs: BitMap,
        origin: str | None = None,
        name: str | None = None,
        is_original: bool = False,
    ) -> None:
        """
        Initialize a Bin object.

        :param contigs: BitMap of contig indices belonging to the bin.
        :param origin: Name of the bin set this bin comes from. Note this is
            a single label (annotation fixed: previously ``set[str] | None``
            although the value was wrapped into a set here).
        :param name: Name of the bin.
        :param is_original: Whether the bin is an input bin (as opposed to an
            intermediate bin created by binette).
        :raises TypeError: If ``contigs`` is not a BitMap.
        """
        if not isinstance(contigs, BitMap):
            raise TypeError("Contigs should be a BitMap object.")

        # ``origin`` accumulates every bin-set label this exact contig
        # composition was observed in.
        self.origin: set[str] = set() if origin is None else {origin}

        self.name = name
        self.is_original = is_original
        self.contigs = contigs

        # Metrics filled in later by the dedicated add_* methods.
        self.length: int | None = None
        self.N50: int | None = None
        self.completeness: float | None = None
        self.contamination: float | None = None
        self.score: float | None = None
        self.coding_density: float | None = None
        self.original_name: str | None = None
        self._checkm2_model_index: int | None = None

    @cached_property
    def contigs_key(self) -> bytes:
        """
        Serialized form of the contig BitMap; a stable identity used for
        comparison, hashing, and dict keys.
        """
        return self.contigs.serialize()

    def __eq__(self, other: object) -> bool:
        """
        Compare the Bin object with another for equality.

        :param other: The object to compare with.
        :return: True if both bins contain exactly the same contigs.
        """
        if not isinstance(other, Bin):
            # Let Python fall back to the other operand's __eq__ instead of
            # raising AttributeError on arbitrary objects.
            return NotImplemented
        return self.contigs_key == other.contigs_key

    def __hash__(self) -> int:
        # Defining __eq__ alone would set __hash__ to None and make Bin
        # unhashable; hash on the same key as equality so bins can be stored
        # in sets and used as dict keys.
        return hash(self.contigs_key)

    def __str__(self) -> str:
        """
        Return a human-readable representation of the Bin object.
        """
        return f"Bin {self.name} from {';'.join(self.origin)} ({len(self.contigs)} contigs)"

    def overlaps_with(self, other: "Bin") -> BitMap:
        """
        Find the contigs shared between this bin and another bin.

        :param other: The other Bin object.
        :return: A BitMap of the contig indices present in both bins
            (annotation fixed: this is a BitMap, not a set of names).
        """
        return self.contigs & other.contigs

    def add_length(self, length: int) -> None:
        """
        Set the total length of the bin.

        :param length: The length value to add.
        :raises ValueError: If ``length`` is not a positive integer.
        """
        if isinstance(length, int) and length > 0:
            self.length = length
        else:
            raise ValueError("Length should be a positive integer.")

    def add_N50(self, n50: int) -> None:
        """
        Set the N50 of the bin.

        :param n50: The N50 value to add (zero is accepted).
        :raises ValueError: If ``n50`` is not a non-negative integer.
        """
        if isinstance(n50, int) and n50 >= 0:
            self.N50 = n50
        else:
            raise ValueError("N50 should be a positive integer.")

    def add_quality(
        self, completeness: float, contamination: float, contamination_weight: float
    ) -> None:
        """
        Set the quality attributes of the bin.

        :param completeness: The completeness value.
        :param contamination: The contamination value.
        :param contamination_weight: Weight given to contamination in the
            score: ``score = completeness - weight * contamination``.
        """
        self.completeness = completeness
        self.contamination = contamination
        self.score = completeness - contamination_weight * contamination

    def add_model(self, model: str) -> None:
        """
        Record which CheckM2 model evaluated this bin.

        :param model: The model name to add (must be one of CHECKM2_MODELS).
        :raises ValueError: If the model name is not recognized.
        """
        try:
            self._checkm2_model_index = self.CHECKM2_MODELS.index(model)
        except ValueError as exc:
            raise ValueError(
                f"Unknown model '{model}' attempted to be added to bin '{self.name}'. "
                f"Valid models are: {', '.join(self.CHECKM2_MODELS)}"
            ) from exc
        # BUG FIX: drop any previously cached ``checkm2_model`` value so the
        # property reflects the newly assigned model instead of a stale one.
        self.__dict__.pop("checkm2_model", None)

    @cached_property
    def checkm2_model(self) -> str | None:
        """
        The CheckM2 model name assigned to this bin, or None if unset.
        """
        if self._checkm2_model_index is not None:
            return self.CHECKM2_MODELS[self._checkm2_model_index]
        return None

    def contig_intersection(self, *others: "Bin") -> BitMap:
        """
        Compute the intersection of this bin's contigs with other bins'.

        :param others: Other bins to intersect with.
        """
        return self.contigs.intersection(*(o.contigs for o in others))

    def contig_difference(self, *others: "Bin") -> BitMap:
        """
        Compute this bin's contigs minus those of the other bins.

        :param others: Other bins to subtract.
        """
        return self.contigs.difference(*(o.contigs for o in others))

    def contig_union(self, *others: "Bin") -> BitMap:
        """
        Compute the union of this bin's contigs with other bins'.

        :param others: Other bins to union with.
        """
        return self.contigs.union(*(o.contigs for o in others))

    def is_high_quality(
        self, min_completeness: float, max_contamination: float
    ) -> bool:
        """
        Determine whether the bin meets the high-quality thresholds.

        :param min_completeness: Minimum completeness required.
        :param max_contamination: Maximum contamination allowed.
        :raises ValueError: If completeness or contamination is unset (None).
        :return: True if the bin meets both criteria, False otherwise.
        """
        if self.completeness is None or self.contamination is None:
            # Message fix: it previously printed the same value twice
            # ("bin '{name}' with ID '{name}'").
            raise ValueError(
                f"The bin '{self.name}' has not been evaluated for completeness or contamination, "
                "and therefore cannot be assessed for high quality."
            )

        return (
            self.completeness >= min_completeness
            and self.contamination <= max_contamination
        )

    def add_coding_density(
        self, contig_to_coding_length: dict[int, int]
    ) -> float | None:
        """
        Compute and store the coding density of the bin.

        :param contig_to_coding_length: Mapping from contig index to its
            total coding length; contigs absent from the mapping count as 0.
        :return: The coding density (fix: previously the value was stored but
            never returned despite the ``float | None`` annotation), or None
            if the bin length is unset or zero.
        """
        if not self.length:
            return None

        total_coding = sum(
            contig_to_coding_length.get(contig_id, 0) for contig_id in self.contigs
        )

        self.coding_density = total_coding / self.length
        return self.coding_density

225 

226 

def make_bins_from_bins_info(
    bin_set_name_to_bins_info: dict[str, list[dict[str, Any]]],
    contig_to_index: dict[str, int],
    are_original_bins: bool,
) -> dict[bytes, Bin]:
    """
    Create Bin objects from the provided bin information.

    Bins with identical contig composition (possibly coming from different
    bin sets) are merged into a single Bin whose ``origin`` set records every
    bin set it was found in; the first-seen name is kept.

    :param bin_set_name_to_bins_info: A dictionary mapping bin set names to
        their lists of bin info dicts ("contigs", "bin_name", ...).
    :param contig_to_index: A mapping of contig names to their indices.
    :param are_original_bins: Whether the bins are original input bins.

    :return: A dictionary mapping serialized contig bitmaps (``contigs_key``)
        to their corresponding Bin objects.
    """
    contig_key_to_bin: dict[bytes, Bin] = {}

    for set_name, bins_info in bin_set_name_to_bins_info.items():
        for bin_info in bins_info:
            bitmap_contigs = BitMap(
                contig_to_index[contig] for contig in bin_info["contigs"]
            )

            bin_obj = Bin(
                contigs=bitmap_contigs,
                origin=set_name,
                name=bin_info["bin_name"],
                is_original=are_original_bins,
            )

            existing = contig_key_to_bin.get(bin_obj.contigs_key)
            if existing is None:
                contig_key_to_bin[bin_obj.contigs_key] = bin_obj
            else:
                # BUG FIX: record the extra origin on the bin we KEEP. The
                # previous code did ``bin_obj.origin.add(set_name)``, i.e. it
                # updated the duplicate that was immediately discarded, so
                # merged origins were silently lost.
                existing.origin.add(set_name)

    return contig_key_to_bin

262 

263 

def get_bins_from_directory(
    bin_dir: Path, set_name: str, fasta_extensions: set[str]
) -> list[dict[str, Any]]:
    """
    Retrieve bin info dicts from a directory containing bin FASTA files.

    :param bin_dir: The directory path containing bin FASTA files.
    :param set_name: The name of the set the bins belong to.
    :param fasta_extensions: Possible FASTA extensions to look for, given
        with or without a leading dot.

    :return: A list of bin info dicts ("contigs", "set_name", "bin_name"),
        one per matching FASTA file (annotation fixed: the function returns
        dicts, not Bin objects).
    """
    # Normalize extensions WITHOUT mutating the caller's set (the previous
    # ``fasta_extensions |= ...`` updated the argument in place): accept
    # entries lacking a leading dot.
    extensions = set(fasta_extensions)
    extensions |= {f".{ext}" for ext in extensions if not ext.startswith(".")}

    bin_fasta_files = (
        fasta_file
        for fasta_file in bin_dir.glob("*")
        if set(fasta_file.suffixes) & extensions
    )

    bins = []
    for bin_fasta_path in bin_fasta_files:
        # The file name minus its final suffix serves as the bin name.
        bin_name = bin_fasta_path.with_suffix("").name

        contigs = {name for name, _ in pyfastx.Fastx(str(bin_fasta_path))}

        bins.append({"contigs": contigs, "set_name": set_name, "bin_name": bin_name})

    return bins

296 

297 

def parse_bin_directories(
    bin_name_to_bin_dir: dict[str, Path], fasta_extensions: set[str]
) -> dict[str, list[dict[str, Any]]]:
    """
    Parse several bin directories into per-set lists of bin info dicts.

    Bin sets are processed in sorted name order for determinism.

    :param bin_name_to_bin_dir: Mapping of bin set names to their directories.
    :param fasta_extensions: Possible FASTA extensions to look for in each
        directory.

    :return: Mapping of bin set names to the list of bin info dicts parsed
        from the corresponding directory.
    """
    # TODO: reinstate the duplicate-bin detection/warning that used to live
    # here (bins with identical contig composition within one set).
    return {
        set_name: get_bins_from_directory(directory, set_name, fasta_extensions)
        for set_name, directory in sorted(bin_name_to_bin_dir.items())
    }

330 

331 

def parse_contig2bin_tables(
    bin_name_to_bin_tables: dict[str, Path],
) -> dict[str, list[dict[str, Any]]]:
    """
    Parse several contig-to-bin tables into per-set lists of bin info dicts.

    Bin sets are processed in sorted name order for determinism.

    :param bin_name_to_bin_tables: Mapping of bin set names to the paths of
        their contig-to-bin tables.

    :return: Mapping of bin set names to the list of bin info dicts parsed
        from the corresponding table.
    """
    # TODO: reinstate the duplicate-bin detection/warning that used to live
    # here (bins with identical contig composition within one set).
    return {
        set_name: get_bins_from_contig2bin_table(table_path, set_name)
        for set_name, table_path in sorted(bin_name_to_bin_tables.items())
    }

367 

368 

def get_bins_from_contig2bin_table(
    contig2bin_table: Path, set_name: str
) -> list[dict[str, Any]]:
    """
    Read a contig-to-bin table and group contigs per bin.

    Lines starting with '#' or '@' are treated as headers/comments and
    ignored, as are blank lines. Each remaining line must contain a contig
    name and a bin name separated by a tab.

    :param contig2bin_table: The path to the contig-to-bin table.
    :param set_name: The name of the set the bins belong to.

    :return: A list of bin info dicts ("contigs", "set_name", "bin_name").
    """
    bin_name2contigs = defaultdict(set)
    with open(contig2bin_table) as fl:
        for line in fl:
            stripped = line.strip()
            # ROBUSTNESS FIX: blank lines previously raised an IndexError
            # (``"".split()[0]``); skip them explicitly.
            if not stripped:
                continue
            if stripped.startswith(("#", "@")):
                logger.debug(f"Ignoring a line from {contig2bin_table}: {line}")
                continue
            # Split once on tabs instead of splitting the line twice with
            # different separators; the contig name is the first whitespace
            # token of the first field (matches the previous behavior).
            fields = stripped.split("\t")
            contig_name = fields[0].split()[0]
            bin_name = fields[1]
            bin_name2contigs[bin_name].add(contig_name)

    return [
        {"contigs": contigs, "set_name": set_name, "bin_name": bin_name}
        for bin_name, contigs in bin_name2contigs.items()
    ]

395 

396 

def from_bins_to_bin_graph(bins: Iterable[Bin]) -> nx.Graph:
    """
    Build a graph of overlapping bins from a set of bins.

    Nodes are bin ``contigs_key`` values; an edge links every pair of bins
    that share at least one contig. Bins with no overlap appear nowhere in
    the graph.

    :param bins: The bins to compare pairwise.

    :return: A networkx Graph representing the bin graph of overlapping bins.
    """
    graph = nx.Graph()
    for first, second in itertools.combinations(bins, 2):
        if first.overlaps_with(second):
            graph.add_edge(first.contigs_key, second.contigs_key)
    return graph

411 

412 

def get_all_possible_combinations(clique: list) -> Iterable[tuple]:
    """
    Yield every combination of at least two elements from a clique.

    :param clique: A list representing the clique.

    :return: An iterable of tuples, from all pairs up to the full clique,
        in ``itertools.combinations`` order.
    """
    for size in range(2, len(clique) + 1):
        yield from itertools.combinations(clique, size)

424 

425 

def build_contig_index(bins_dict: dict[bytes, Bin]) -> dict[int, set[bytes]]:
    """
    Build an inverted index: contig id -> set of contigs_key of the bins
    containing that contig.

    :param bins_dict: Mapping from contigs_key -> Bin.
    :return: Inverted index (contig id -> set of contigs_key).
    """
    index: dict[int, set[bytes]] = defaultdict(set)
    for contigs_key, bin_obj in bins_dict.items():
        for contig_id in bin_obj.contigs:
            index[contig_id].add(contigs_key)
    return index

437 

438 

def remove_bins_from_index(
    bin_keys: set[bytes],
    bins_dict: dict[bytes, Bin],
    contig_to_bins: dict[int, set[bytes]],
) -> None:
    """
    Remove a group of bins from the inverted contig index.

    :param bin_keys: The contigs_key values of the bins to remove.
    :param bins_dict: Mapping from contigs_key -> Bin.
    :param contig_to_bins: Inverted index (contig id -> set of contigs_key).
    """
    for bin_key in bin_keys:
        removed_bin = bins_dict.get(bin_key)
        if removed_bin is None:
            # Unknown key: nothing to clean up in the index.
            continue
        for contig_id in removed_bin.contigs:
            contig_to_bins[contig_id].discard(bin_key)

456 

457 

def select_best_bins(
    bins_dict: dict[bytes, Bin],
    min_completeness: float,
    max_contamination: float,
    prefix: str = "binette",
) -> list[Bin]:
    """
    Greedily select the best non-overlapping bins.

    Bins failing the completeness/contamination thresholds are filtered out;
    the rest are walked in order of decreasing score, then decreasing N50,
    then original bins first, with the (sortable bytes) contigs_key as a
    final deterministic tie-breaker. Each selected bin discards every
    remaining bin sharing a contig with it. Selected bins are renamed
    ``{prefix}_bin{i}``.

    :param bins_dict: Mapping from contigs_key -> Bin.
    :param min_completeness: Minimum completeness threshold for a bin to be considered.
    :param max_contamination: Maximum contamination threshold for a bin to be considered.
    :param prefix: Prefix to use for naming selected bins.

    :return: The selected bins.
    """
    logger.info("Selecting best bins")

    logger.info(
        f"Filtering bins: only bins with completeness >= {min_completeness} "
        f"and contamination <= {max_contamination}"
    )
    candidates = {
        key: candidate
        for key, candidate in bins_dict.items()
        if candidate.completeness >= min_completeness
        and candidate.contamination <= max_contamination
    }

    logger.info("Sorting bins")

    def ranking(key: bytes):
        candidate = candidates[key]
        return (-candidate.score, -candidate.N50, -candidate.is_original, key)

    ordered_keys = sorted(candidates, key=ranking)

    logger.info("Building contig index")
    contig_index = build_contig_index(candidates)

    logger.info("Selecting bins")
    selected: list[Bin] = []
    discarded: set[bytes] = set()

    for key in ordered_keys:
        if key in discarded:
            continue

        chosen = candidates[key]
        selected.append(chosen)

        # Every bin sharing a contig with the chosen one (itself included)
        # conflicts and must be discarded.
        conflicting: set[bytes] = set()
        for contig_id in chosen.contigs:
            conflicting |= contig_index[contig_id]
        discarded |= conflicting

        # Prune the discarded bins from the index to shrink future lookups.
        remove_bins_from_index(conflicting, candidates, contig_index)

    logger.info(f"Selected {len(selected)} bins")

    for rank, chosen in enumerate(selected, start=1):
        if not chosen.origin:
            chosen.origin = {"binette"}
        if chosen.name is not None:
            chosen.original_name = chosen.name
        chosen.name = f"{prefix}_bin{rank}"
    return selected

532 

533 

def get_contigs_in_bin_sets(bin_set_name_to_bins: dict[str, list[dict[str, Any]]]) -> list[str]:
    """
    Collect all contig names used across bin sets, warning about contigs that
    appear in more than one bin of the same set.

    :param bin_set_name_to_bins: A dictionary where keys are bin set names and
        values are lists of bin info dicts (each holding a "contigs"
        collection). Annotation fixed: the values are bin info dicts, not Bin
        objects.

    :return: A list of the unique contig names found across all bin sets
        (doc fixed: a list is returned, not a set).
    """
    # To track all unique contigs across bin sets
    all_contigs_in_bins = set()

    for bin_set_name, bins_info in bin_set_name_to_bins.items():
        # Flatten every contig occurrence in this bin set, keeping duplicates
        # so they can be counted below.
        list_contigs_in_bin_sets = [
            contig for bin_info in bins_info for contig in bin_info["contigs"]
        ]

        contig_counts = Counter(list_contigs_in_bin_sets)
        duplicated_contigs = {
            contig: count for contig, count in contig_counts.items() if count > 1
        }

        # A contig should belong to at most one bin within a set; warn about
        # any contig seen in several bins of the same set.
        if duplicated_contigs:
            logger.warning(
                f"Bin set '{bin_set_name}' contains {len(duplicated_contigs)} duplicated contigs. "
                "Details: "
                + ", ".join(
                    f"{contig} (found {count} times)"
                    for contig, count in duplicated_contigs.items()
                )
            )

        # Unique contigs in current bin set
        unique_contigs_in_bin_set = set(list_contigs_in_bin_sets)

        # Update global contig tracker
        all_contigs_in_bins |= unique_contigs_in_bin_set

        # Log summary for the current bin set
        logger.debug(
            f"Bin set '{bin_set_name}': {len(bins_info)} bins, {len(unique_contigs_in_bin_set)} unique contigs."
        )

    return list(all_contigs_in_bins)

577 

578 

def sum_contig_lengths(
    bm_contigs: BitMap,
    contig_lengths: np.ndarray,
    cache: dict[bytes, int] | None = None,
    key: bytes | None = None,
) -> int:
    """
    Sum the lengths of the contigs in ``bm_contigs``, memoizing per contig set.

    :param bm_contigs: Contig ids whose lengths are summed (iterated as ints
        and used to index ``contig_lengths``).
    :param contig_lengths: Array of contig lengths indexed by contig id.
    :param cache: Optional memoization dict mapping serialized contig sets to
        their total length. When None, a throwaway dict is used and nothing
        is memoized across calls.
    :param key: Precomputed ``bm_contigs.serialize()`` value; computed here
        when not provided.

    :return: The total length of the contigs, as a plain int.
    """
    if cache is None:
        cache = {}
    if key is None:
        key = bm_contigs.serialize()
    if key not in cache:
        # np.fromiter avoids materializing an intermediate Python list before
        # the fancy-indexing sum.
        cache[key] = int(contig_lengths[np.fromiter(bm_contigs, dtype=np.int32)].sum())
    return cache[key]

592 

593 

def create_intermediate_bins(
    contig_key_to_initial_bin: dict[bytes, Bin],
    contig_lengths: np.ndarray,
    min_comp: float,
    max_conta: float,
    min_len: int,
    max_len: int,
    disable_progress_bar: bool = False,
) -> dict[bytes, Bin]:
    """
    Create intermediate bins (intersections, differences, unions) from the
    input bins.

    Overlapping input bins are grouped into cliques; within each clique,
    every combination of bins is combined by intersection, per-bin
    difference, and union. A candidate is kept only when its contig
    composition is new (not an input bin, not already created, not already
    discarded) and its total length lies within [min_len, max_len].

    :param contig_key_to_initial_bin: Mapping from contigs_key -> input Bin.
    :param contig_lengths: Array of contig lengths indexed by contig id.
    :param min_comp: Minimum completeness required of source bins for
        intersection/difference candidates.
    :param max_conta: Maximum contamination allowed on source bins for union
        candidates.
    :param min_len: Minimum total length of an intermediate bin.
    :param max_len: Maximum total length of an intermediate bin.
    :param disable_progress_bar: Disable the rich progress bar.

    :return: Mapping from contigs_key -> newly created intermediate Bin.
    """
    bin_length_cache: dict[bytes, int] = {}

    logger.info("Making bin graph")
    connected_bins_graph = from_bins_to_bin_graph(contig_key_to_initial_bin.values())

    # Sort cliques (and their members) for a deterministic processing order.
    cliques_of_bins = sorted(
        [sorted(clique) for clique in nx.clique.find_cliques(connected_bins_graph)]
    )

    logger.info("Creating union, difference, and intersection bins")
    logger.debug(f"{min_comp} min completeness for intersection and difference bins")
    logger.debug(f"{max_conta} max contamination for intersection and difference bins")
    logger.debug(f"{min_len} min length for intersection and difference bins")
    # BUG FIX: the next two messages previously hard-coded "50,632" instead
    # of reporting the actual max_len parameter (one was even an f-string
    # with no placeholder).
    logger.debug(f"{max_len} max length for intersection and difference bins")
    logger.info(
        f"Intermediate bins filtered by minimum length of {min_len} and maximum length of {max_len}."
    )

    intersec_count = 0
    union_count = 0
    diff_count = 0

    intersec_size_discarded_count = 0
    diff_size_discarded_count = 0
    union_size_discarded_count = 0

    contig_key_to_new_contigs_set: dict[bytes, BitMap] = {}
    discarded_contig_set_keys: set[bytes] = set()

    def _register_candidate(candidate: BitMap) -> int:
        """Try to keep a candidate contig set.

        Returns 1 if kept, -1 if discarded for size, 0 if empty or already seen.
        """
        if not candidate:
            return 0
        contig_key = candidate.serialize()
        if (
            contig_key in contig_key_to_initial_bin
            or contig_key in contig_key_to_new_contigs_set
            or contig_key in discarded_contig_set_keys
        ):
            return 0
        contigs_length = sum_contig_lengths(
            candidate, contig_lengths, cache=bin_length_cache, key=contig_key
        )
        if min_len <= contigs_length <= max_len:
            contig_key_to_new_contigs_set[contig_key] = candidate
            return 1
        discarded_contig_set_keys.add(contig_key)
        return -1

    with Progress(disable=disable_progress_bar) as progress:
        task = progress.add_task(
            f"Processing {len(cliques_of_bins)} cliques of bins",
            total=len(cliques_of_bins),
        )
        for clique in cliques_of_bins:
            progress.update(task, advance=1)

            for bin_contig_keys in get_all_possible_combinations(clique):
                bins = [contig_key_to_initial_bin[ck] for ck in bin_contig_keys]

                # Intersection: only from bins that are complete and long enough.
                if all(
                    b.completeness >= min_comp and b.length >= min_len for b in bins
                ):
                    outcome = _register_candidate(
                        bins[0].contig_intersection(*bins[1:])
                    )
                    if outcome == 1:
                        intersec_count += 1
                    elif outcome == -1:
                        intersec_size_discarded_count += 1

                # Difference: each eligible bin minus all the others.
                for bin_a in bins:
                    if bin_a.completeness >= min_comp and bin_a.length >= min_len:
                        outcome = _register_candidate(
                            bin_a.contig_difference(*(b for b in bins if b != bin_a))
                        )
                        if outcome == 1:
                            diff_count += 1
                        elif outcome == -1:
                            diff_size_discarded_count += 1

                # Union: only from bins that are clean and small enough.
                if all(
                    b.contamination <= max_conta and b.length <= max_len for b in bins
                ):
                    outcome = _register_candidate(bins[0].contig_union(*bins[1:]))
                    if outcome == 1:
                        union_count += 1
                    elif outcome == -1:
                        union_size_discarded_count += 1

    logger.info(
        f"Intersection: {intersec_count} bins created, {intersec_size_discarded_count} discarded due to size constraints."
    )
    # NOTE(review): contig_difference is a plain (asymmetric) difference; the
    # "Symmetric Difference" label below looks inaccurate — confirm intent
    # before changing the user-facing message.
    logger.info(
        f"Symmetric Difference: {diff_count} bins created, {diff_size_discarded_count} discarded due to size constraints."
    )
    logger.info(
        f"Union: {union_count} bins created, {union_size_discarded_count} discarded due to size constraints."
    )

    contig_key_to_new_bin: dict[bytes, Bin] = {
        contig_key: Bin(contigs, is_original=False)
        for contig_key, contigs in contig_key_to_new_contigs_set.items()
    }

    logger.info(
        f"{len(contig_key_to_new_bin)} new bins created from {len(contig_key_to_initial_bin)} input bins."
    )

    return contig_key_to_new_bin