Coverage for binette/bin_manager.py: 97%
222 statements
« prev ^ index » next coverage.py v7.6.1, created at 2025-01-06 19:22 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2025-01-06 19:22 +0000
1import logging
2from collections import defaultdict
3from pathlib import Path
5import pyfastx
7import itertools
8import networkx as nx
9from typing import List, Dict, Iterable, Tuple, Set, Mapping
12class Bin:
13 counter = 0
15 def __init__(
16 self, contigs: Iterable[str], origin: str, name: str, is_original: bool = False
17 ) -> None:
18 """
19 Initialize a Bin object.
21 :param contigs: Iterable of contig names belonging to the bin.
22 :param origin: Origin/source of the bin.
23 :param name: Name of the bin.
24 """
25 Bin.counter += 1
27 self.origin = {origin}
28 self.name = name
29 self.id = Bin.counter
30 self.contigs = set(contigs)
31 self.hash = hash(str(sorted(self.contigs)))
33 self.length = None
34 self.N50 = None
36 self.completeness = None
37 self.contamination = None
38 self.score = None
40 self.is_original = is_original
42 def __eq__(self, other: "Bin") -> bool:
43 """
44 Compare the Bin object with another object for equality.
46 :param other: The object to compare with.
47 :return: True if the objects are equal, False otherwise.
48 """
49 return self.contigs == other.contigs
51 def __hash__(self) -> int:
52 """
53 Compute the hash value of the Bin object.
55 :return: The hash value.
56 """
57 return self.hash
59 def __str__(self) -> str:
60 """
61 Return a string representation of the Bin object.
63 :return: The string representation of the Bin object.
64 """
65 return (
66 f"Bin {self.id} from {';'.join(self.origin)} ({len(self.contigs)} contigs)"
67 )
69 def overlaps_with(self, other: "Bin") -> Set[str]:
70 """
71 Find the contigs that overlap between this bin and another bin.
73 :param other: The other Bin object.
74 :return: A set of contig names that overlap between the bins.
75 """
76 return self.contigs & other.contigs
78 # def __and__(self, other: 'Bin') -> 'Bin':
79 # """
80 # Perform a logical AND operation between this bin and another bin.
82 # :param other: The other Bin object.
83 # :return: A new Bin object representing the intersection of the bins.
84 # """
85 # contigs = self.contigs & other.contigs
86 # name = f"{self.name} & {other.name}"
87 # origin = "intersection"
89 # return Bin(contigs, origin, name)
91 def add_length(self, length: int) -> None:
92 """
93 Add the length attribute to the Bin object if the provided length is a positive integer.
95 :param length: The length value to add.
96 :return: None
97 """
98 if isinstance(length, int) and length > 0:
99 self.length = length
100 else:
101 raise ValueError("Length should be a positive integer.")
103 def add_N50(self, n50: int) -> None:
104 """
105 Add the N50 attribute to the Bin object.
107 :param n50: The N50 value to add.
108 :return: None
109 """
110 if isinstance(n50, int) and n50 >= 0:
111 self.N50 = n50
112 else:
113 raise ValueError("N50 should be a positive integer.")
115 def add_quality(
116 self, completeness: float, contamination: float, contamination_weight: float
117 ) -> None:
118 """
119 Set the quality attributes of the bin.
121 :param completeness: The completeness value.
122 :param contamination: The contamination value.
123 :param contamination_weight: The weight assigned to contamination in the score calculation.
124 """
125 self.completeness = completeness
126 self.contamination = contamination
127 self.score = completeness - contamination_weight * contamination
129 def intersection(self, *others: "Bin") -> "Bin":
130 """
131 Compute the intersection of the bin with other bins.
133 :param others: Other bins to compute the intersection with.
134 :return: A new Bin representing the intersection of the bins.
135 """
136 other_contigs = (o.contigs for o in others)
137 contigs = self.contigs.intersection(*other_contigs)
138 name = f"{self.id} & {' & '.join([str(other.id) for other in others])}"
139 origin = "intersec"
141 return Bin(contigs, origin, name)
143 def difference(self, *others: "Bin") -> "Bin":
144 """
145 Compute the difference between the bin and other bins.
147 :param others: Other bins to compute the difference with.
148 :return: A new Bin representing the difference between the bins.
149 """
150 other_contigs = (o.contigs for o in others)
151 contigs = self.contigs.difference(*other_contigs)
152 name = f"{self.id} - {' - '.join([str(other.id) for other in others])}"
153 origin = "diff"
155 return Bin(contigs, origin, name)
157 def union(self, *others: "Bin") -> "Bin":
158 """
159 Compute the union of the bin with other bins.
161 :param others: Other bins to compute the union with.
162 :return: A new Bin representing the union of the bins.
163 """
164 other_contigs = (o.contigs for o in others)
165 contigs = self.contigs.union(*other_contigs)
166 name = f"{self.id} | {' | '.join([str(other.id) for other in others])}"
167 origin = "union"
169 return Bin(contigs, origin, name)
171 def is_complete_enough(self, min_completeness: float) -> bool:
172 """
173 Determine if a bin is complete enough based on completeness threshold.
175 :param min_completeness: The minimum completeness required for a bin.
177 :raises ValueError: If completeness has not been set (is None).
179 :return: True if the bin meets the min_completeness threshold; False otherwise.
180 """
182 if self.completeness is None:
183 raise ValueError(
184 f"The bin '{self.name}' with ID '{self.id}' has not been evaluated for completeness or contamination, "
185 "and therefore cannot be assessed."
186 )
188 return self.completeness >= min_completeness
190 def is_high_quality(
191 self, min_completeness: float, max_contamination: float
192 ) -> bool:
193 """
194 Determine if a bin is considered high quality based on completeness and contamination thresholds.
196 :param min_completeness: The minimum completeness required for a bin to be considered high quality.
197 :param max_contamination: The maximum allowed contamination for a bin to be considered high quality.
199 :raises ValueError: If either completeness or contamination has not been set (is None).
201 :return: True if the bin meets the high quality criteria; False otherwise.
202 """
203 if self.completeness is None or self.contamination is None:
204 raise ValueError(
205 f"The bin '{self.name}' with ID '{self.id}' has not been evaluated for completeness or contamination, "
206 "and therefore cannot be assessed for high quality."
207 )
209 return (
210 self.completeness >= min_completeness
211 and self.contamination <= max_contamination
212 )
215def get_bins_from_directory(
216 bin_dir: Path, set_name: str, fasta_extensions: Set[str]
217) -> List[Bin]:
218 """
219 Retrieves a list of Bin objects from a directory containing bin FASTA files.
221 :param bin_dir: The directory path containing bin FASTA files.
222 :param set_name: The name of the set the bins belong to.
223 :fasta_extensions: Possible fasta extensions to look for in the bin directory.
225 :return: A list of Bin objects created from the bin FASTA files.
226 """
227 bins = []
228 fasta_extensions |= {
229 f".{ext}" for ext in fasta_extensions if not ext.startswith(".")
230 } # adding a dot in case given extension are lacking one
231 bin_fasta_files = (
232 fasta_file
233 for fasta_file in bin_dir.glob("*")
234 if set(fasta_file.suffixes) & fasta_extensions
235 )
237 for bin_fasta_path in bin_fasta_files:
239 bin_name = bin_fasta_path.name
241 contigs = {
242 name for name, _ in pyfastx.Fasta(str(bin_fasta_path), build_index=False)
243 }
245 bin_obj = Bin(contigs, set_name, bin_name)
247 bins.append(bin_obj)
249 return bins
252def parse_bin_directories(
253 bin_name_to_bin_dir: Dict[str, Path], fasta_extensions: Set[str]
254) -> Dict[str, Set[Bin]]:
255 """
256 Parses multiple bin directories and returns a dictionary mapping bin names to a list of Bin objects.
258 :param bin_name_to_bin_dir: A dictionary mapping bin names to their respective bin directories.
259 :fasta_extensions: Possible fasta extensions to look for in the bin directory.
261 :return: A dictionary mapping bin names to a list of Bin objects created from the bin directories.
262 """
263 bin_set_name_to_bins = {}
265 for name, bin_dir in bin_name_to_bin_dir.items():
266 bins = get_bins_from_directory(bin_dir, name, fasta_extensions)
267 set_of_bins = set(bins)
269 # Calculate the number of duplicates
270 num_duplicates = len(bins) - len(set_of_bins)
272 if num_duplicates > 0:
273 logging.warning(
274 f'{num_duplicates} bins with identical contig compositions detected in bin set "{name}". '
275 "These bins were merged to ensure uniqueness."
276 )
278 # Store the unique set of bins
279 bin_set_name_to_bins[name] = set_of_bins
281 return bin_set_name_to_bins
284def parse_contig2bin_tables(
285 bin_name_to_bin_tables: Dict[str, Path]
286) -> Dict[str, Set["Bin"]]:
287 """
288 Parses multiple contig-to-bin tables and returns a dictionary mapping bin names to a set of unique Bin objects.
290 Logs a warning if duplicate bins are detected within a bin set.
292 :param bin_name_to_bin_tables: A dictionary where keys are bin set names and values are file paths or identifiers
293 for contig-to-bin tables. Each table is parsed to extract Bin objects.
295 :return: A dictionary where keys are bin set names and values are sets of Bin objects. Duplicates are removed based
296 on contig composition.
297 """
298 bin_set_name_to_bins = {}
300 for name, contig2bin_table in bin_name_to_bin_tables.items():
301 bins = get_bins_from_contig2bin_table(contig2bin_table, name)
302 set_of_bins = set(bins)
304 # Calculate the number of duplicates
305 num_duplicates = len(bins) - len(set_of_bins)
307 if num_duplicates > 0:
308 logging.warning(
309 f'{num_duplicates*2} bins with identical contig compositions detected in bin set "{name}". '
310 "These bins were merged to ensure uniqueness."
311 )
313 # Store the unique set of bins
314 bin_set_name_to_bins[name] = set_of_bins
316 return bin_set_name_to_bins
319def get_bins_from_contig2bin_table(contig2bin_table: Path, set_name: str) -> List[Bin]:
320 """
321 Retrieves a list of Bin objects from a contig-to-bin table.
323 :param contig2bin_table: The path to the contig-to-bin table.
324 :param set_name: The name of the set the bins belong to.
326 :return: A list of Bin objects created from the contig-to-bin table.
327 """
328 bin_name2contigs = defaultdict(set)
329 with open(contig2bin_table) as fl:
330 for line in fl:
331 if line.startswith("#") or line.startswith("@"):
332 logging.debug(f"Ignoring a line from {contig2bin_table}: {line}")
333 continue
334 contig_name = line.strip().split()[0]
335 bin_name = line.strip().split("\t")[1]
336 bin_name2contigs[bin_name].add(contig_name)
338 bins = []
339 for bin_name, contigs in bin_name2contigs.items():
340 bin_obj = Bin(contigs, set_name, bin_name)
341 bins.append(bin_obj)
342 return bins
345def from_bins_to_bin_graph(bins) -> nx.Graph:
346 """
347 Creates a bin graph made of overlapping gram a set of bins.
349 :param bins: a set of bins
351 :return: A networkx Graph representing the bin graph of overlapping bins.
352 """
353 G = nx.Graph()
355 for bin1, bin2 in itertools.combinations(bins, 2):
356 if bin1.overlaps_with(bin2):
357 G.add_edge(bin1, bin2)
358 return G
361def get_all_possible_combinations(clique: List) -> Iterable[Tuple]:
362 """
363 Generates all possible combinations of elements from a given clique.
365 :param clique: An iterable representing a clique.
367 :return: An iterable of tuples representing all possible combinations of elements from the clique.
368 """
369 return (
370 c for r in range(2, len(clique) + 1) for c in itertools.combinations(clique, r)
371 )
374def get_intersection_bins(G: nx.Graph) -> Set[Bin]:
375 """
376 Retrieves the intersection bins from a given graph.
378 :param G: A networkx Graph representing the graph.
380 :return: A set of Bin objects representing the intersection bins.
381 """
382 intersect_bins = set()
384 for clique in nx.clique.find_cliques(G):
385 bins_combinations = get_all_possible_combinations(clique)
386 for bins in bins_combinations:
387 if max((b.completeness for b in bins)) < 40:
388 continue
390 intersec_bin = bins[0].intersection(*bins[1:])
392 if intersec_bin.contigs: # and intersec_bin not in clique:
394 intersect_bins.add(intersec_bin)
396 return intersect_bins
399def get_difference_bins(G: nx.Graph) -> Set[Bin]:
400 """
401 Retrieves the difference bins from a given graph.
403 :param G: A networkx Graph representing the graph.
405 :return: A set of Bin objects representing the difference bins.
406 """
407 difference_bins = set()
409 for clique in nx.clique.find_cliques(G):
411 bins_combinations = get_all_possible_combinations(clique)
412 for bins in bins_combinations:
414 for bin_a in bins:
415 if bin_a.completeness < 40:
416 continue
417 bin_diff = bin_a.difference(*(b for b in bins if b != bin_a))
419 if bin_diff.contigs: # and bin_diff not in clique:
420 difference_bins.add(bin_diff)
422 return difference_bins
425def get_union_bins(G: nx.Graph, max_conta: int = 50) -> Set[Bin]:
426 """
427 Retrieves the union bins from a given graph.
429 :param G: A networkx Graph representing the graph of bins.
430 :param max_conta: Maximum allowed contamination value for a bin to be included in the union.
432 :return: A set of Bin objects representing the union bins.
433 """
434 union_bins = set()
435 for clique in nx.clique.find_cliques(G):
436 bins_combinations = get_all_possible_combinations(clique)
437 for bins in bins_combinations:
438 if max((b.contamination for b in bins)) > 20:
439 continue
441 bins = set(bins)
442 bin_a = bins.pop()
443 bin_union = bin_a.union(*bins)
444 if bin_union.contigs: # and bin_union not in clique:
445 union_bins.add(bin_union)
447 return union_bins
450def select_best_bins(bins: Set[Bin]) -> List[Bin]:
451 """
452 Selects the best bins from a list of bins based on their scores, N50 values, and IDs.
454 :param bins: A list of Bin objects.
456 :return: A list of selected Bin objects.
457 """
459 logging.info("Sorting bins")
460 # Sort on score, N50, and ID. Smaller ID values are preferred to select original bins first.
461 sorted_bins = sorted(bins, key=lambda x: (x.score, x.N50, -x.id), reverse=True)
463 logging.info("Selecting bins")
464 selected_bins = []
465 for b in sorted_bins:
466 if b in bins:
467 overlapping_bins = {b2 for b2 in bins if b.overlaps_with(b2)}
468 bins -= overlapping_bins
470 selected_bins.append(b)
472 logging.info(f"Selected {len(selected_bins)} bins")
473 return selected_bins
476def group_identical_bins(bins: Iterable[Bin]) -> List[List[Bin]]:
477 """
478 Group identical bins together
480 :param bins: list of bins
482 return List of list of identical bins
483 """
484 binhash_to_bins = defaultdict(list)
486 # Collect bins by their hash values
487 for bin_obj in bins:
488 binhash_to_bins[bin_obj.hash].append(bin_obj)
490 return list(binhash_to_bins.values())
493def dereplicate_bin_sets(bin_sets: Iterable[Set["Bin"]]) -> Set["Bin"]:
494 """
495 Consolidate bins from multiple bin sets into a single set of non-redundant bins.
497 Bins with the same hash are considered duplicates. For each group of duplicates,
498 the origins are merged, and only one representative bin is kept.
500 :param bin_sets: An iterable of sets, where each set contains `Bin` objects. These sets are merged
501 into a single set of unique bins by consolidating bins with the same hash.
503 :return: A set of `Bin` objects with duplicates removed. Each `Bin` in the resulting set has
504 merged origins from the bins it was consolidated with.
505 """
506 all_bins = (bin_obj for bins in bin_sets for bin_obj in bins)
507 list_of_identical_bins = group_identical_bins(all_bins)
509 dereplicated_bins = set()
511 # Merge bins with the same hash
512 for identical_bins in list_of_identical_bins:
513 # Select the first bin as the representative
514 selected_bin = identical_bins[0]
515 for bin_obj in identical_bins[1:]:
516 # Merge origins of all bins with the same hash
517 selected_bin.origin |= bin_obj.origin
519 # Add the representative bin to the result set
520 dereplicated_bins.add(selected_bin)
522 return dereplicated_bins
525def get_contigs_in_bin_sets(bin_set_name_to_bins: Dict[str, Set[Bin]]) -> Set[str]:
526 """
527 Processes bin sets to check for duplicated contigs and logs detailed information about each bin set.
529 :param bin_set_name_to_bins: A dictionary where keys are bin set names and values are sets of Bin objects.
531 :return: A set of contig names found in bin sets
532 """
533 # To track all unique contigs across bin sets
534 all_contigs_in_bins = set()
536 for bin_set_name, bins in bin_set_name_to_bins.items():
537 list_contigs_in_bin_sets = get_contigs_in_bins(bins)
539 # Count duplicates
540 contig_counts = {
541 contig: list_contigs_in_bin_sets.count(contig)
542 for contig in list_contigs_in_bin_sets
543 }
544 duplicated_contigs = {
545 contig: count for contig, count in contig_counts.items() if count > 1
546 }
548 if duplicated_contigs:
549 logging.warning(
550 f"Bin set '{bin_set_name}' contains {len(duplicated_contigs)} duplicated contigs. "
551 "Details: "
552 + ", ".join(
553 f"{contig} (found {count} times)"
554 for contig, count in duplicated_contigs.items()
555 )
556 )
558 # Unique contigs in current bin set
559 unique_contigs_in_bin_set = set(list_contigs_in_bin_sets)
561 # Update global contig tracker
562 all_contigs_in_bins |= unique_contigs_in_bin_set
564 # Log summary for the current bin set
565 logging.debug(
566 f"Bin set '{bin_set_name}': {len(bins)} bins, {len(unique_contigs_in_bin_set)} unique contigs."
567 )
569 return all_contigs_in_bins
572def get_contigs_in_bins(bins: Iterable[Bin]) -> List[str]:
573 """
574 Retrieves all contigs present in the given list of bins.
576 :param bins: A list of Bin objects.
578 :return: A list of contigs present in the bins.
579 """
580 return [contig for b in bins for contig in b.contigs]
583def rename_bin_contigs(bins: Iterable[Bin], contig_to_index: dict):
584 """
585 Renames the contigs in the bins based on the provided mapping.
587 :param bins: A list of Bin objects.
588 :param contig_to_index: A dictionary mapping old contig names to new index names.
589 """
590 for b in bins:
591 b.contigs = {contig_to_index[contig] for contig in b.contigs}
592 b.hash = hash(str(sorted(b.contigs)))
595def create_intermediate_bins(original_bins: Set[Bin]) -> Set[Bin]:
596 """
597 Creates intermediate bins from a dictionary of bin sets.
599 :param original_bins: Set of input bins.
601 :return: A set of intermediate bins created from intersections, differences, and unions.
602 """
604 logging.info("Making bin graph...")
605 connected_bins_graph = from_bins_to_bin_graph(original_bins)
607 logging.info("Creating intersection bins...")
608 intersection_bins = get_intersection_bins(connected_bins_graph)
610 logging.info(f"{len(intersection_bins)} bins created on intersections.")
612 logging.info("Creating difference bins...")
613 difference_bins = get_difference_bins(connected_bins_graph)
615 logging.info(f"{len(difference_bins)} bins created based on symmetric difference.")
617 logging.info("Creating union bins...")
618 union_bins = get_union_bins(connected_bins_graph)
620 logging.info(f"{len(union_bins)} bins created on unions.")
622 new_bins = difference_bins | intersection_bins | union_bins
624 new_bins = dereplicate_bin_sets((difference_bins, intersection_bins, union_bins))
626 # dereplicate from the original bins
627 original_hashes = [b.hash for b in original_bins]
628 new_bins_not_in_original = set()
630 for b in new_bins:
631 if b.hash not in original_hashes:
632 new_bins_not_in_original.add(b)
634 return new_bins_not_in_original