Coverage for binette/cds.py: 100%
98 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-14 14:36 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-14 14:36 +0000
1import concurrent.futures as cf
2import gzip
3import logging
4import multiprocessing.pool
5from collections import Counter, defaultdict
6from collections.abc import Iterator
7from pathlib import Path
8from typing import Any
10import numpy as np
11import pyfastx
12import pyrodigal
14logger = logging.getLogger(__name__)
17def get_contig_from_cds_name(cds_name: str) -> str:
18 """
19 Extract the contig name from a CDS name.
21 :param cds_name: The name of the CDS.
22 :type cds_name: str
23 :return: The name of the contig.
24 :rtype: str
25 """
26 return "_".join(cds_name.split("_")[:-1])
29def predict(
30 contigs_iterator: Iterator, outfaa: str, threads: int = 1
31) -> tuple[dict[str, list[str]], dict[str, int | None]]:
32 """
33 Predict open reading frames with Pyrodigal.
35 :param contigs_iterator: An iterator of contig sequences.
36 :param outfaa: The output file path for predicted protein sequences (in FASTA format).
37 :param threads: Number of CPU threads to use (default is 1).
39 :return: A dictionary mapping contig names to predicted genes and a dictionary mapping contig names to coding lengths.
40 """
42 orf_finder = pyrodigal.GeneFinder(meta="meta")
44 logger.info(f"Predicting CDS sequences with Pyrodigal using {threads} threads")
46 with multiprocessing.pool.ThreadPool(processes=threads) as pool:
47 contig_and_genes = pool.starmap(
48 predict_genes,
49 ((orf_finder.find_genes, name, seq) for name, seq in contigs_iterator),
50 )
52 write_faa(outfaa, contig_and_genes)
54 contig_to_genes = {
55 contig_id: [gene.translate() for gene in pyrodigal_genes]
56 for contig_id, pyrodigal_genes in contig_and_genes
57 }
59 contig_to_coding_length = {
60 contig_id: get_contig_coding_len(pyrodigal_genes, len(pyrodigal_genes.sequence))
61 for contig_id, pyrodigal_genes in contig_and_genes
62 }
64 return contig_to_genes, contig_to_coding_length
67def get_contig_coding_len(
68 genes: list[pyrodigal.Gene], contig_length: int
69) -> int | None:
70 """
71 Compute the coding length of a contig. Use a mask to account for overlapping genes.
73 :param genes: A list of gene annotations for the contig.
74 :param contig_length: The length of the contig in base pairs.
75 :return: The coding length as a float, or None if contig_length is zero.
76 """
77 if contig_length == 0:
78 return None
79 conding_base_mask = np.zeros(contig_length)
80 for g in genes:
81 conding_base_mask[g.begin - 1 : g.end] = 1
82 return np.sum(conding_base_mask)
85def predict_genes(find_genes, name, seq) -> tuple[str, pyrodigal.Genes]:
86 return (name, find_genes(seq))
89def write_faa(outfaa: str, contig_to_genes: list[tuple[str, pyrodigal.Genes]]) -> None:
90 """
91 Write predicted protein sequences to a FASTA file.
93 :param outfaa: The output file path for predicted protein sequences (in FASTA format).
94 :param contig_to_genes: A dictionary mapping contig names to predicted genes.
96 """
97 logger.info("Writing predicted protein sequences")
98 with gzip.open(outfaa, "wt") as fl:
99 for contig_id, genes in contig_to_genes:
100 genes.write_translations(fl, contig_id)
103def is_nucleic_acid(sequence: str) -> bool:
104 """
105 Determines whether the given sequence is a DNA or RNA sequence.
107 :param sequence: The sequence to check.
108 :return: True if the sequence is a DNA or RNA sequence, False otherwise.
109 """
110 # Define nucleotidic bases (DNA and RNA)
111 nucleotidic_bases = set("ATCGNUatcgnu")
113 # Check if all characters in the sequence are valid nucleotidic bases (DNA or RNA)
114 if all(base in nucleotidic_bases for base in sequence):
115 return True
117 # If any character is invalid, return False
118 return False
121def parse_faa_file(faa_file: str) -> dict[str, list[str]]:
122 """
123 Parse a FASTA file containing protein sequences and organize them by contig.
125 :param faa_file: Path to the input FASTA file.
126 :return: A dictionary mapping contig names to lists of protein sequences.
127 :raises ValueError: If the file contains nucleotidic sequences instead of protein sequences.
128 """
129 contig_to_genes = defaultdict(list)
130 checked_sequences = []
132 # Iterate through the FASTA file and parse sequences
133 for name, seq in pyfastx.Fastx(faa_file):
134 contig = get_contig_from_cds_name(name)
135 contig_to_genes[contig].append(seq)
137 # Concatenate up to the first 20 sequences for validation
138 if len(checked_sequences) < 20:
139 checked_sequences.append(seq)
141 # Concatenate all checked sequences for a more reliable nucleic acid check
142 concatenated_seq = "".join(checked_sequences)
144 # Check if the concatenated sequence appears to be nucleic acid
145 if is_nucleic_acid(concatenated_seq):
146 raise ValueError(
147 f"The file '{faa_file}' appears to contain nucleotide sequences. "
148 "Ensure that the file contains valid protein sequences in FASTA format."
149 )
151 return dict(contig_to_genes)
154def get_aa_composition(genes: list[str]) -> Counter:
155 """
156 Calculate the amino acid composition of a list of protein sequences.
158 :param genes: A list of protein sequences.
159 :return: A Counter object representing the amino acid composition.
160 """
161 aa_counter = Counter()
162 for gene in genes:
163 aa_counter += Counter(gene)
164 # remove * from Counter
165 aa_counter.pop("*", None)
166 return aa_counter
169def get_contig_cds_metadata_flat(
170 contig_to_genes: dict[str, list[str]],
171) -> tuple[dict[str, int], dict[str, Counter], dict[str, int]]:
172 """
173 Calculate metadata for contigs, including CDS count, amino acid composition, and total amino acid length.
175 :param contig_to_genes: A dictionary mapping contig names to lists of protein sequences.
176 :return: A tuple containing dictionaries for CDS count, amino acid composition, and total amino acid length.
177 """
178 contig_to_cds_count = {
179 contig: len(genes) for contig, genes in contig_to_genes.items()
180 }
182 contig_to_aa_counter = {
183 contig: get_aa_composition(genes) for contig, genes in contig_to_genes.items()
184 }
185 logger.info("Calculating amino acid composition")
187 contig_to_aa_length = {
188 contig: sum(counter.values())
189 for contig, counter in contig_to_aa_counter.items()
190 }
191 logger.info("Calculating total amino acid length")
193 return contig_to_cds_count, contig_to_aa_counter, contig_to_aa_length
196def get_contig_cds_metadata(
197 contig_to_genes: dict[int, Any | list[Any]], threads: int
198) -> dict[str, dict]:
199 """
200 Calculate metadata for contigs in parallel, including CDS count, amino acid composition, and total amino acid length.
202 :param contig_to_genes: A dictionary mapping contig names to lists of protein sequences.
203 :param threads: Number of CPU threads to use.
204 :return: A tuple containing dictionaries for CDS count, amino acid composition, and total amino acid length.
205 """
206 contig_to_cds_count = {
207 contig: len(genes) for contig, genes in contig_to_genes.items()
208 }
210 contig_to_future = {}
211 logger.info(f"Collecting contig amino acid composition using {threads} threads")
212 with cf.ProcessPoolExecutor(max_workers=threads) as tpe:
213 for contig, genes in contig_to_genes.items():
214 contig_to_future[contig] = tpe.submit(get_aa_composition, genes)
216 contig_to_aa_counter = {
217 contig: future.result() for contig, future in contig_to_future.items()
218 }
219 logger.info("Calculating amino acid composition in parallel")
221 contig_to_aa_length = {
222 contig: sum(counter.values())
223 for contig, counter in contig_to_aa_counter.items()
224 }
225 logger.info("Calculating total amino acid length in parallel")
227 contig_info = {
228 "contig_to_cds_count": contig_to_cds_count,
229 "contig_to_aa_counter": contig_to_aa_counter,
230 "contig_to_aa_length": contig_to_aa_length,
231 }
233 return contig_info
236def filter_faa_file(
237 contigs_to_keep: list[str],
238 input_faa_file: Path,
239 filtered_faa_file: Path,
240):
241 """
242 Filters a FASTA file containing protein sequences to only include sequences
243 from contigs present in the provided set of contigs (`contigs_to_keep`).
245 This function processes the input FASTA file, identifies protein sequences
246 originating from contigs listed in `contigs_to_keep`, and writes the filtered
247 sequences to a new FASTA file. The output file supports optional `.gz` compression.
249 :param contigs_to_keep: A set of contig names to retain in the output FASTA file.
250 :param input_faa_file: Path to the input FASTA file containing protein sequences.
251 :param filtered_faa_file: Path to the output FASTA file for filtered sequences.
252 If the filename ends with `.gz`, the output will be compressed.
253 """
254 # Determine whether the output file should be compressed
255 proper_open = gzip.open if str(filtered_faa_file).endswith(".gz") else open
257 # Initialize tracking sets for metrics
258 contigs_with_genes = set()
259 contigs_parsed = set()
261 # Process the input FASTA file and filter sequences based on contigs_to_keep
262 with proper_open(filtered_faa_file, "wt") as fl:
263 for name, seq in pyfastx.Fastx(input_faa_file):
264 contig = get_contig_from_cds_name(name)
265 contigs_parsed.add(contig)
266 if contig in contigs_to_keep:
267 contigs_with_genes.add(contig)
268 fl.write(f">{name}\n{seq}\n")
270 # Calculate metrics
271 total_contigs = len(contigs_to_keep)
272 contigs_with_no_genes = total_contigs - len(contigs_with_genes)
273 contigs_not_in_keep_list = len(contigs_parsed - set(contigs_to_keep))
275 # Log the computed metrics
276 logger.info(f"Processing protein sequences from '{input_faa_file}'")
277 logger.info(
278 f"Filtered '{input_faa_file}' to retain genes from {total_contigs} contigs that are included in the input bins"
279 )
280 logger.debug(
281 f"Found {contigs_with_no_genes}/{total_contigs} contigs ({contigs_with_no_genes / total_contigs:.2%}) with no genes"
282 )
283 logger.debug(
284 f"{contigs_not_in_keep_list} contigs from the input FASTA file are not in the keep list"
285 )