Coverage for binette/io_manager.py: 99%
101 statements
coverage.py v7.6.1, created at 2025-01-06 19:22 +0000
from collections import defaultdict
import logging
import pyfastx
from typing import Iterable, List, Dict, Tuple, Set
import csv

from binette.bin_manager import Bin

from pathlib import Path


def get_paths_common_prefix_suffix(
    paths: List[Path],
) -> Tuple[List[str], List[str], List[str]]:
    """
    Determine the common prefix parts, suffix parts, and common extensions of the last part of a list of pathlib.Path objects.

    :param paths: List of pathlib.Path objects.
    :return: A tuple containing three lists:
             - The common prefix parts.
             - The common suffix parts.
             - The common extensions of the last part of the paths.
    """
    # Extract parts for all paths
    parts = [list(path.parts) for path in paths]

    # Find the common prefix
    if not parts:
        return [], [], []

    # Initialize common prefix and suffix lists
    common_prefix = list(parts[0])
    common_suffix = list(parts[0])

    # Determine common prefix
    for part_tuple in parts[1:]:
        common_prefix_length = min(len(common_prefix), len(part_tuple))
        common_prefix = [
            common_prefix[i]
            for i in range(common_prefix_length)
            if common_prefix[: i + 1] == part_tuple[: i + 1]
        ]
        if not common_prefix:
            break

    # Determine common suffix
    for part_tuple in parts[1:]:
        common_suffix_length = min(len(common_suffix), len(part_tuple))
        common_suffix = [
            common_suffix[-i]
            for i in range(1, common_suffix_length + 1)
            if common_suffix[-i:] == part_tuple[-i:]
        ]
        if not common_suffix:
            break
    if len(parts) > 1:
        common_suffix.reverse()

    # Determine common extensions of the last part of the paths
    if len(paths) == 1:
        common_extensions = paths[0].suffixes
    else:
        common_extensions = list(paths[0].suffixes)
        for path in paths[1:]:
            common_extension_length = min(len(common_extensions), len(path.suffixes))
            common_extensions = [
                common_extensions[i]
                for i in range(common_extension_length)
                if common_extensions[i] == path.suffixes[i]
            ]
            if not common_extensions:
                break

    return common_prefix, common_suffix, common_extensions
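
# Illustrative usage (not part of the module; the paths below are hypothetical):
# given two paths that share a leading directory and end in the same file name,
# the helper returns the shared leading parts, trailing parts, and extensions.
#
#   >>> get_paths_common_prefix_suffix(
#   ...     [Path("results/set1/bins.fasta"), Path("results/set2/bins.fasta")]
#   ... )
#   (['results'], ['bins.fasta'], ['.fasta'])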


def infer_bin_set_names_from_input_paths(input_bins: List[Path]) -> Dict[str, Path]:
    """
    Infer bin set names from a list of bin input directories or files.

    :param input_bins: List of input bin directories or files.
    :return: Dictionary mapping inferred bin names to their corresponding directories or files.
    """
    bin_name_to_bin_dir = {}

    common_prefix, common_suffix, common_extensions = get_paths_common_prefix_suffix(
        input_bins
    )

    for path in input_bins:

        specific_parts = path.parts[
            len(common_prefix) : len(path.parts) - len(common_suffix)
        ]

        if not common_suffix and common_extensions:
            last_specific_part = specific_parts[-1].split(".")[
                : -len(common_extensions)
            ]
            specific_parts = list(specific_parts[:-1]) + last_specific_part

        bin_set_name = "/".join(specific_parts)
        if bin_set_name == "":
            bin_set_name = path.as_posix()

        bin_name_to_bin_dir[bin_set_name] = path

    logging.debug(f"Input bins: {' '.join([path.as_posix() for path in input_bins])}")
    logging.debug(f"Common prefix to remove: {common_prefix}")
    logging.debug(f"Common suffix to remove: {common_suffix}")
    logging.debug(f"Common extension to remove: {common_extensions}")
    logging.debug(f"bin_name_to_bin_dir: {bin_name_to_bin_dir}")

    return bin_name_to_bin_dir
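
# Illustrative usage (hypothetical paths): the non-shared part of each path
# becomes its bin set name.
#
#   >>> names = infer_bin_set_names_from_input_paths(
#   ...     [Path("binning/maxbin/bins"), Path("binning/metabat/bins")]
#   ... )
#   >>> sorted(names)
#   ['maxbin', 'metabat']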


def write_bin_info(bins: Iterable[Bin], output: Path, add_contigs: bool = False):
    """
    Write bin information to a TSV file.

    :param bins: List of Bin objects.
    :param output: Output file path for writing the TSV.
    :param add_contigs: Flag indicating whether to include contig information.
    """

    header = [
        "bin_id",
        "origin",
        "name",
        "completeness",
        "contamination",
        "score",
        "size",
        "N50",
        "contig_count",
    ]
    if add_contigs:
        header.append("contigs")

    bin_infos = []
    for bin_obj in sorted(bins, key=lambda x: (x.score, x.N50, -x.id), reverse=True):
        bin_info = [
            bin_obj.id,
            ";".join(bin_obj.origin),
            bin_obj.name,
            bin_obj.completeness,
            bin_obj.contamination,
            bin_obj.score,
            bin_obj.length,
            bin_obj.N50,
            len(bin_obj.contigs),
        ]
        if add_contigs:
            bin_info.append(";".join(str(c) for c in bin_obj.contigs))

        bin_infos.append(bin_info)

    with open(output, "w", newline="") as fl:
        writer = csv.writer(fl, delimiter="\t")
        writer.writerow(header)
        writer.writerows(bin_infos)
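
# Illustrative usage (hypothetical output path; assumes `bins` is an iterable of
# Bin objects that already carry completeness, contamination and score):
#
#   write_bin_info(bins, Path("bins_quality.tsv"), add_contigs=True)
#
# Rows are sorted by score and N50 (descending), with ties broken by ascending
# bin id; the optional "contigs" column lists contig names joined with ";".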


def write_bins_fasta(selected_bins: List[Bin], contigs_fasta: Path, outdir: Path):
    """
    Write selected bins' contigs to separate FASTA files.

    :param selected_bins: List of Bin objects representing the selected bins.
    :param contigs_fasta: Path to the input FASTA file containing contig sequences.
    :param outdir: Output directory to save the individual bin FASTA files.
    """

    fa = pyfastx.Fasta(contigs_fasta.as_posix(), build_index=True)

    for sbin in selected_bins:
        outfile = outdir / f"bin_{sbin.id}.fa"

        with open(outfile, "w") as outfl:
            sequences = (f">{c}\n{fa[c]}" for c in sbin.contigs)
            outfl.write("\n".join(sequences) + "\n")
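
# Illustrative usage (hypothetical paths; `outdir` must already exist, and
# pyfastx indexes the assembly on first use):
#
#   write_bins_fasta(selected_bins, Path("assembly.fasta"), Path("final_bins"))
#
# Each selected bin is written to <outdir>/bin_<id>.fa, with its contig
# sequences pulled from the indexed assembly.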


def check_contig_consistency(
    contigs_from_assembly: Iterable[str],
    contigs_from_elsewhere: Iterable[str],
    assembly_file: str,
    elsewhere_file: str,
):
    """
    Check the consistency of contig names between different sources.

    :param contigs_from_assembly: List of contig names from the assembly file.
    :param contigs_from_elsewhere: List of contig names from an external source.
    :param assembly_file: Path to the assembly file.
    :param elsewhere_file: Path to the file from an external source.
    :raises AssertionError: If inconsistencies in contig names are found.
    """
    logging.debug("check_contig_consistency.")
    are_contigs_consistent = len(
        set(contigs_from_elsewhere) | set(contigs_from_assembly)
    ) <= len(set(contigs_from_assembly))

    issue_contigs = len(set(contigs_from_elsewhere) - set(contigs_from_assembly))

    message = (
        f"{issue_contigs} contigs found in file '{elsewhere_file}' "
        f"were not found in assembly_file '{assembly_file}'"
    )
    assert are_contigs_consistent, message
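
# Illustrative check (hypothetical contig names): passes silently when every
# contig seen elsewhere exists in the assembly, raises AssertionError otherwise.
#
#   >>> check_contig_consistency(["c1", "c2", "c3"], ["c1", "c2"],
#   ...                          "assembly.fasta", "bins.tsv")
#   >>> check_contig_consistency(["c1"], ["c1", "c4"],
#   ...                          "assembly.fasta", "bins.tsv")
#   Traceback (most recent call last):
#       ...
#   AssertionError: 1 contigs found in file 'bins.tsv' were not found in assembly_file 'assembly.fasta'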


def check_resume_file(faa_file: Path, diamond_result_file: Path) -> None:
    """
    Check the existence of files required for resuming the process.

    :param faa_file: Path to the protein file.
    :param diamond_result_file: Path to the Diamond result file.
    :raises FileNotFoundError: If the required files don't exist for resuming.
    """

    if faa_file.exists() and diamond_result_file.exists():
        return

    if not faa_file.exists():
        error_msg = (
            f"Protein file '{faa_file}' does not exist. Resuming is not possible."
        )
        logging.error(error_msg)
        raise FileNotFoundError(error_msg)

    if not diamond_result_file.exists():
        error_msg = f"Diamond result file '{diamond_result_file}' does not exist. Resuming is not possible."
        logging.error(error_msg)
        raise FileNotFoundError(error_msg)
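
# Illustrative usage (hypothetical paths, as checked when resuming a previous run):
#
#   check_resume_file(Path("proteins.faa"), Path("diamond_results.tsv"))
#
# Returns silently when both files exist; logs and raises FileNotFoundError when
# either one is missing.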


def write_original_bin_metrics(original_bins: Set[Bin], original_bin_report_dir: Path):
    """
    Write metrics of original input bins to a specified directory.

    This function writes the metrics for each bin set to a TSV file in the specified directory.
    Each bin set will have its own TSV file named according to its set name.

    :param original_bins: A set containing input bins.
    :param original_bin_report_dir: The directory path (Path) where the bin metrics will be saved.
    """

    original_bin_report_dir.mkdir(parents=True, exist_ok=True)

    bin_set_name_to_bins = defaultdict(set)
    for bin_obj in original_bins:
        for origin in bin_obj.origin:
            bin_set_name_to_bins[origin].add(bin_obj)

    for i, (set_name, bins) in enumerate(sorted(bin_set_name_to_bins.items())):
        bins_metric_file = (
            original_bin_report_dir
            / f"input_bins_{i + 1}.{set_name.replace('/', '_')}.tsv"
        )

        logging.debug(
            f"Writing metrics for bin set '{set_name}' to file: {bins_metric_file}"
        )
        write_bin_info(bins, bins_metric_file)

    logging.debug("Completed writing all original input bin metrics.")
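
# Illustrative usage (hypothetical directory name; assumes `original_bins` is the
# set of input Bin objects already assessed for completeness and contamination):
#
#   write_original_bin_metrics(original_bins, Path("input_bins_quality_reports"))
#
# One TSV per input bin set is written to the directory (created if missing),
# named input_bins_<n>.<set name>.tsv, using write_bin_info for each set.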