Coverage for binette/io_manager.py: 99% (137 statements)

import logging
from collections import defaultdict
from collections.abc import Iterable
from pathlib import Path

import pandas as pd
import pyfastx

from binette.bin_manager import Bin

logger = logging.getLogger(__name__)


def get_paths_common_prefix_suffix(
    paths: list[Path],
) -> tuple[list[str], list[str], list[str]]:
    """
    Determine the common prefix parts, common suffix parts, and the common
    extensions of the last part of a list of pathlib.Path objects.

    :param paths: List of pathlib.Path objects.
    :return: A tuple containing three lists:
        - The common prefix parts.
        - The common suffix parts.
        - The common extensions of the last part of the paths.
    """
    # Extract parts for all paths
    parts = [list(path.parts) for path in paths]

    # Nothing to compare: no paths were given
    if not parts:
        return [], [], []

    # Initialize common prefix and suffix lists
    common_prefix = list(parts[0])
    common_suffix = list(parts[0])

    # Determine common prefix
    for part_tuple in parts[1:]:
        common_prefix_length = min(len(common_prefix), len(part_tuple))
        common_prefix = [
            common_prefix[i]
            for i in range(common_prefix_length)
            if common_prefix[: i + 1] == part_tuple[: i + 1]
        ]
        if not common_prefix:
            break

    # Determine common suffix
    for part_tuple in parts[1:]:
        common_suffix_length = min(len(common_suffix), len(part_tuple))
        common_suffix = [
            common_suffix[-i]
            for i in range(1, common_suffix_length + 1)
            if common_suffix[-i:] == part_tuple[-i:]
        ]
        if not common_suffix:
            break
    if len(parts) > 1:
        common_suffix.reverse()

    # Determine common extensions of the last part of the paths
    if len(paths) == 1:
        common_extensions = paths[0].suffixes
    else:
        common_extensions = list(paths[0].suffixes)
        for path in paths[1:]:
            common_extension_length = min(len(common_extensions), len(path.suffixes))
            common_extensions = [
                common_extensions[i]
                for i in range(common_extension_length)
                if common_extensions[i] == path.suffixes[i]
            ]
            if not common_extensions:
                break

    return common_prefix, common_suffix, common_extensions
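
# Illustrative sketch of the helper above (hypothetical paths, not taken from
# the repository):
#
#     get_paths_common_prefix_suffix(
#         [Path("results/run1/bin1.fa"), Path("results/run1/bin2.fa")]
#     )
#     # -> (["results", "run1"], [], [".fa"])
#
# "results/run1" is the shared prefix, the final components share no whole
# part, and ".fa" is the common extension.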


def infer_bin_set_names_from_input_paths(input_bins: list[Path]) -> dict[str, Path]:
    """
    Infer bin set names from a list of bin input directories or files.

    :param input_bins: List of input bin directories or files.
    :return: Dictionary mapping inferred bin set names to their corresponding directories or files.
    """
    bin_name_to_bin_dir = {}

    common_prefix, common_suffix, common_extensions = get_paths_common_prefix_suffix(
        input_bins
    )

    for path in input_bins:
        specific_parts = path.parts[
            len(common_prefix) : len(path.parts) - len(common_suffix)
        ]

        if not common_suffix and common_extensions:
            last_specific_part = specific_parts[-1].split(".")[
                : -len(common_extensions)
            ]
            specific_parts = list(specific_parts[:-1]) + last_specific_part

        bin_set_name = "/".join(specific_parts)
        if bin_set_name == "":
            bin_set_name = path.as_posix()

        bin_name_to_bin_dir[bin_set_name] = path

    logger.debug(f"Input bins: {' '.join([path.as_posix() for path in input_bins])}")
    logger.debug(f"Common prefix to remove: {common_prefix}")
    logger.debug(f"Common suffix to remove: {common_suffix}")
    logger.debug(f"Common extension to remove: {common_extensions}")
    logger.debug(f"bin_name_to_bin_dir: {bin_name_to_bin_dir}")

    return bin_name_to_bin_dir
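
# Hypothetical example: with bin directories that differ only in the tool
# name, the tool name becomes the bin set name.
#
#     infer_bin_set_names_from_input_paths(
#         [Path("out/maxbin/bins"), Path("out/metabat/bins")]
#     )
#     # -> {"maxbin": Path("out/maxbin/bins"), "metabat": Path("out/metabat/bins")}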


def write_bin_info(bins: Iterable[Bin], output: Path, add_contigs: bool = False):
    """
    Write bin information to a TSV file.

    :param bins: Iterable of Bin objects.
    :param output: Output file path for writing the TSV.
    :param add_contigs: Flag indicating whether to include contig information.
    """

    # Define columns for the DataFrame
    columns = [
        "name",
        "origin",
        "is_original",
        "original_name",
        "completeness",
        "contamination",
        "score",
        "checkm2_model",
        "size",
        "N50",
        "coding_density",
        "contig_count",
    ]
    if add_contigs:
        columns.append("contigs")

    # Create a list of dictionaries to build the DataFrame
    data = []
    for bin_obj in sorted(
        bins, key=lambda x: (-x.score, -x.N50, -x.is_original, x.contigs_key)
    ):
        original_name = bin_obj.original_name if bin_obj.original_name else bin_obj.name
        origins = bin_obj.origin if bin_obj.is_original else {"binette"}

        bin_info = {
            "name": bin_obj.name,
            "origin": ";".join(origins),
            "is_original": bin_obj.is_original,
            "original_name": original_name,
            "completeness": bin_obj.completeness,
            "contamination": bin_obj.contamination,
            "score": round(bin_obj.score, 2),
            "checkm2_model": bin_obj.checkm2_model,
            "size": bin_obj.length,
            "N50": bin_obj.N50,
            "coding_density": round(bin_obj.coding_density, 4)
            if bin_obj.coding_density is not None
            else None,
            "contig_count": len(bin_obj.contigs),
        }

        if add_contigs:
            bin_info["contigs"] = ";".join(str(c) for c in bin_obj.contigs)

        data.append(bin_info)

    # Create pandas DataFrame and write to TSV
    df = pd.DataFrame(data, columns=columns)
    df.to_csv(output, sep="\t", index=False)
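
# Hypothetical usage, assuming `bins` holds scored Bin objects:
#
#     write_bin_info(bins, Path("final_bins_quality_reports.tsv"), add_contigs=True)
#
# With add_contigs=True, the extra "contigs" column lists each bin's contig
# IDs joined by ';'. Rows are sorted by decreasing score, then decreasing N50,
# with original bins ranked ahead of equivalent merged ones.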


def write_bins_fasta(
    selected_bins: list[Bin],
    contigs_fasta: Path,
    outdir: Path,
    contigs_names: list[str],
    max_buffer_size: int = 50_000_000,
):
    """
    Write selected bins' contigs to separate FASTA files using pyfastx.Fastx (no index).
    Buffer entries by total character size, not just number of sequences.

    :param selected_bins: List of Bin objects with .name and .contigs.
    :param contigs_fasta: Path to the input FASTA file.
    :param outdir: Directory to save bin FASTA files.
    :param contigs_names: List of contig names where index corresponds to contig ID.
    :param max_buffer_size: Maximum total character size to buffer before flushing.
    """
    outdir.mkdir(parents=True, exist_ok=True)

    # Clear existing files for selected bins
    for sbin in selected_bins:
        out_path = outdir / f"{sbin.name}.fa"
        if out_path.exists():
            out_path.unlink()  # remove the file

    # Map each contig name to its bin name
    contig_to_bins = {}
    for sbin in selected_bins:
        for contig_id in sbin.contigs:
            contig_name = contigs_names[contig_id]
            contig_to_bins[contig_name] = sbin.name

    assert len(contig_to_bins) == sum(len(sbin.contigs) for sbin in selected_bins), (
        "Some contigs are present in multiple bins but should be unique."
    )

    buffer = defaultdict(list)
    buffer_size = 0

    def flush_buffer():
        nonlocal buffer_size
        for bin_name, seqs in buffer.items():
            if seqs:
                with open(outdir / f"{bin_name}.fa", "a") as f:
                    f.writelines(seqs)
        buffer.clear()
        buffer_size = 0

    for name, seq in pyfastx.Fastx(contigs_fasta.as_posix()):
        bin_name = contig_to_bins.get(name)
        if not bin_name:
            continue

        fasta_entry = f">{name}\n{seq}\n"
        entry_size = len(fasta_entry)

        buffer[bin_name].append(fasta_entry)
        buffer_size += entry_size

        if buffer_size >= max_buffer_size:
            flush_buffer()

    flush_buffer()
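
# Hypothetical usage, assuming `selected_bins` and `contigs_names` come from
# the earlier bin selection step:
#
#     write_bins_fasta(
#         selected_bins,
#         contigs_fasta=Path("assembly.fasta"),
#         outdir=Path("final_bins"),
#         contigs_names=contigs_names,
#     )
#
# Each bin ends up in final_bins/<bin.name>.fa. The assembly is streamed once;
# entries are buffered and flushed whenever ~50 MB of characters accumulate.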


def check_contig_consistency(
    contigs_from_assembly: Iterable[str],
    contigs_from_elsewhere: Iterable[str],
    assembly_file: str,
    elsewhere_file: str,
):
    """
    Check the consistency of contig names between different sources.

    :param contigs_from_assembly: Iterable of contig names from the assembly file.
    :param contigs_from_elsewhere: Iterable of contig names from an external source.
    :param assembly_file: Path to the assembly file.
    :param elsewhere_file: Path to the file from an external source.
    :raises AssertionError: If inconsistencies in contig names are found.
    """
    logger.debug("Checking contig consistency")

    # Materialize the iterables once so each one is only consumed once
    assembly_contigs = set(contigs_from_assembly)
    elsewhere_contigs = set(contigs_from_elsewhere)

    are_contigs_consistent = len(elsewhere_contigs | assembly_contigs) <= len(
        assembly_contigs
    )
    issue_contigs = len(elsewhere_contigs - assembly_contigs)

    message = (
        f"{issue_contigs} contigs found in file '{elsewhere_file}' "
        f"were not found in assembly file '{assembly_file}'"
    )
    assert are_contigs_consistent, message
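
# Minimal sketch: passes silently when the external names are a subset of the
# assembly's, and raises AssertionError otherwise.
#
#     check_contig_consistency(["c1", "c2"], ["c1"], "assembly.fa", "bins.tsv")  # OK
#     check_contig_consistency(["c1", "c2"], ["c3"], "assembly.fa", "bins.tsv")  # AssertionError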


def check_resume_file(faa_file: Path, diamond_result_file: Path) -> None:
    """
    Check the existence of files required for resuming the process.

    :param faa_file: Path to the protein file.
    :param diamond_result_file: Path to the Diamond result file.
    :raises FileNotFoundError: If the required files don't exist for resuming.
    """
    if faa_file.exists() and diamond_result_file.exists():
        return

    if not faa_file.exists():
        error_msg = (
            f"Protein file '{faa_file}' does not exist. Resuming is not possible."
        )
        logger.error(error_msg)
        raise FileNotFoundError(error_msg)

    if not diamond_result_file.exists():
        error_msg = f"Diamond result file '{diamond_result_file}' does not exist. Resuming is not possible."
        logger.error(error_msg)
        raise FileNotFoundError(error_msg)
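
# Hypothetical resume check (file names are illustrative):
#
#     check_resume_file(Path("out/proteins.faa"), Path("out/diamond_result.tsv"))
#
# Returns None when both files exist; otherwise the missing file is logged and
# a FileNotFoundError aborts the resume.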


def write_contig2bin_table(
    selected_bins: list[Bin],
    output_file: Path,
    contigs_names: list[str],
):
    """
    Write a simple TSV file mapping contig names to bin names.

    :param selected_bins: List of selected Bin objects.
    :param output_file: Path to the output TSV file.
    :param contigs_names: List of contig names where index corresponds to contig ID.
    """
    logger.info(f"Writing contig2bin table to '{output_file}'")

    # Ensure output directory exists
    output_file.parent.mkdir(parents=True, exist_ok=True)

    with open(output_file, "w") as f:
        # Write contig to bin mappings
        for bin_obj in selected_bins:
            for contig_index in bin_obj.contigs:
                contig_name = contigs_names[contig_index]
                f.write(f"{contig_name}\t{bin_obj.name}\n")

    total_entries = sum(len(bin_obj.contigs) for bin_obj in selected_bins)
    logger.debug(f"Successfully wrote contig2bin table with {total_entries} entries")
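
# For example, if contigs_names == ["c0", "c1", "c2"] and a single selected
# bin named "bin_1" holds contig IDs 0 and 2, the output file contains exactly
# two tab-separated rows and no header line:
#
#     c0\tbin_1
#     c2\tbin_1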


def write_original_bin_metrics(original_bins: list[Bin], original_bin_report_dir: Path):
    """
    Write metrics of original input bins to a specified directory.

    This function writes the metrics for each bin set to a TSV file in the
    specified directory. Each bin set gets its own TSV file, named after its
    set name.

    :param original_bins: List of original input bins.
    :param original_bin_report_dir: The directory path (Path) where the bin metrics will be saved.
    """

    original_bin_report_dir.mkdir(parents=True, exist_ok=True)

    bin_set_name_to_bins = defaultdict(list)
    for bin_obj in original_bins:
        for origin in bin_obj.origin:
            bin_set_name_to_bins[origin].append(bin_obj)

    for i, (set_name, bins) in enumerate(sorted(bin_set_name_to_bins.items())):
        bins_metric_file = (
            original_bin_report_dir
            / f"input_bins_{i + 1}.{set_name.replace('/', '_')}.tsv"
        )

        logger.debug(
            f"Writing metrics for bin set '{set_name}' to file '{bins_metric_file}'"
        )
        write_bin_info(bins, bins_metric_file)

    logger.debug("Completed writing all original input bin metrics")
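
# Sketch: bins drawn from two input sets named "maxbin" and "metabat" would be
# reported (sorted by set name, with '/' replaced by '_') as:
#
#     <original_bin_report_dir>/input_bins_1.maxbin.tsv
#     <original_bin_report_dir>/input_bins_2.metabat.tsv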