Coverage for binette/cds.py: 100%
90 statements
« prev ^ index » next coverage.py v7.6.1, created at 2025-01-06 19:22 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2025-01-06 19:22 +0000
1import concurrent.futures as cf
2import multiprocessing.pool
3import logging
4from collections import Counter, defaultdict
5from typing import Dict, List, Iterator, Tuple, Any, Union, Set
7import pyfastx
8import pyrodigal
9from tqdm import tqdm
10from pathlib import Path
11import gzip
14def get_contig_from_cds_name(cds_name: str) -> str:
15 """
16 Extract the contig name from a CDS name.
18 :param cds_name: The name of the CDS.
19 :type cds_name: str
20 :return: The name of the contig.
21 :rtype: str
22 """
23 return "_".join(cds_name.split("_")[:-1])
26def predict(
27 contigs_iterator: Iterator, outfaa: str, threads: int = 1
28) -> Dict[str, List[str]]:
29 """
30 Predict open reading frames with Pyrodigal.
32 :param contigs_iterator: An iterator of contig sequences.
33 :param outfaa: The output file path for predicted protein sequences (in FASTA format).
34 :param threads: Number of CPU threads to use (default is 1).
36 :return: A dictionary mapping contig names to predicted genes.
37 """
38 try:
39 # for version >=3 of pyrodigal
40 orf_finder = pyrodigal.GeneFinder(meta="meta") # type: ignore
41 except AttributeError:
42 orf_finder = pyrodigal.OrfFinder(meta="meta") # type: ignore
44 logging.info(f"Predicting cds sequences with Pyrodigal using {threads} threads.")
46 with multiprocessing.pool.ThreadPool(processes=threads) as pool:
47 contig_and_genes = pool.starmap(
48 predict_genes, ((orf_finder.find_genes, seq) for seq in contigs_iterator)
49 )
51 write_faa(outfaa, contig_and_genes)
53 contig_to_genes = {
54 contig_id: [gene.translate() for gene in pyrodigal_genes]
55 for contig_id, pyrodigal_genes in contig_and_genes
56 }
58 return contig_to_genes
61def predict_genes(find_genes, seq) -> Tuple[str, pyrodigal.Genes]:
63 return (seq.name, find_genes(seq.seq))
66def write_faa(outfaa: str, contig_to_genes: List[Tuple[str, pyrodigal.Genes]]) -> None:
67 """
68 Write predicted protein sequences to a FASTA file.
70 :param outfaa: The output file path for predicted protein sequences (in FASTA format).
71 :param contig_to_genes: A dictionary mapping contig names to predicted genes.
73 """
74 logging.info("Writing predicted protein sequences.")
75 with gzip.open(outfaa, "wt") as fl:
76 for contig_id, genes in contig_to_genes:
77 genes.write_translations(fl, contig_id)
80def is_nucleic_acid(sequence: str) -> bool:
81 """
82 Determines whether the given sequence is a DNA or RNA sequence.
84 :param sequence: The sequence to check.
85 :return: True if the sequence is a DNA or RNA sequence, False otherwise.
86 """
87 # Define nucleotidic bases (DNA and RNA)
88 nucleotidic_bases = set("ATCGNUatcgnu")
90 # Check if all characters in the sequence are valid nucleotidic bases (DNA or RNA)
91 if all(base in nucleotidic_bases for base in sequence):
92 return True
94 # If any character is invalid, return False
95 return False
98def parse_faa_file(faa_file: str) -> Dict[str, List[str]]:
99 """
100 Parse a FASTA file containing protein sequences and organize them by contig.
102 :param faa_file: Path to the input FASTA file.
103 :return: A dictionary mapping contig names to lists of protein sequences.
104 :raises ValueError: If the file contains nucleotidic sequences instead of protein sequences.
105 """
106 contig_to_genes = defaultdict(list)
107 checked_sequences = []
109 # Iterate through the FASTA file and parse sequences
110 for name, seq in pyfastx.Fastx(faa_file):
111 contig = get_contig_from_cds_name(name)
112 contig_to_genes[contig].append(seq)
114 # Concatenate up to the first 20 sequences for validation
115 if len(checked_sequences) < 20:
116 checked_sequences.append(seq)
118 # Concatenate all checked sequences for a more reliable nucleic acid check
119 concatenated_seq = "".join(checked_sequences)
121 # Check if the concatenated sequence appears to be nucleic acid
122 if is_nucleic_acid(concatenated_seq):
123 raise ValueError(
124 f"The file '{faa_file}' appears to contain nucleotide sequences. "
125 "Ensure that the file contains valid protein sequences in FASTA format."
126 )
128 return dict(contig_to_genes)
131def get_aa_composition(genes: List[str]) -> Counter:
132 """
133 Calculate the amino acid composition of a list of protein sequences.
135 :param genes: A list of protein sequences.
136 :return: A Counter object representing the amino acid composition.
137 """
138 aa_counter = Counter()
139 for gene in genes:
140 aa_counter += Counter(gene)
142 return aa_counter
145def get_contig_cds_metadata_flat(
146 contig_to_genes: Dict[str, List[str]]
147) -> Tuple[Dict[str, int], Dict[str, Counter], Dict[str, int]]:
148 """
149 Calculate metadata for contigs, including CDS count, amino acid composition, and total amino acid length.
151 :param contig_to_genes: A dictionary mapping contig names to lists of protein sequences.
152 :return: A tuple containing dictionaries for CDS count, amino acid composition, and total amino acid length.
153 """
154 contig_to_cds_count = {
155 contig: len(genes) for contig, genes in contig_to_genes.items()
156 }
158 contig_to_aa_counter = {
159 contig: get_aa_composition(genes)
160 for contig, genes in tqdm(contig_to_genes.items(), unit="contig")
161 }
162 logging.info("Calculating amino acid composition.")
164 contig_to_aa_length = {
165 contig: sum(counter.values())
166 for contig, counter in tqdm(contig_to_aa_counter.items(), unit="contig")
167 }
168 logging.info("Calculating total amino acid length.")
170 return contig_to_cds_count, contig_to_aa_counter, contig_to_aa_length
173def get_contig_cds_metadata(
174 contig_to_genes: Dict[int, Union[Any, List[Any]]], threads: int
175) -> Dict[str, Dict]:
176 """
177 Calculate metadata for contigs in parallel, including CDS count, amino acid composition, and total amino acid length.
179 :param contig_to_genes: A dictionary mapping contig names to lists of protein sequences.
180 :param threads: Number of CPU threads to use.
181 :return: A tuple containing dictionaries for CDS count, amino acid composition, and total amino acid length.
182 """
183 contig_to_cds_count = {
184 contig: len(genes) for contig, genes in contig_to_genes.items()
185 }
187 contig_to_future = {}
188 logging.info(f"Collecting contig amino acid composition using {threads} threads.")
189 with cf.ProcessPoolExecutor(max_workers=threads) as tpe:
190 for contig, genes in tqdm(contig_to_genes.items()):
191 contig_to_future[contig] = tpe.submit(get_aa_composition, genes)
193 contig_to_aa_counter = {
194 contig: future.result()
195 for contig, future in tqdm(contig_to_future.items(), unit="contig")
196 }
197 logging.info("Calculating amino acid composition in parallel.")
199 contig_to_aa_length = {
200 contig: sum(counter.values())
201 for contig, counter in tqdm(contig_to_aa_counter.items(), unit="contig")
202 }
203 logging.info("Calculating total amino acid length in parallel.")
205 contig_info = {
206 "contig_to_cds_count": contig_to_cds_count,
207 "contig_to_aa_counter": contig_to_aa_counter,
208 "contig_to_aa_length": contig_to_aa_length,
209 }
211 return contig_info
214def filter_faa_file(
215 contigs_to_keep: Set[str],
216 input_faa_file: Path,
217 filtered_faa_file: Path,
218):
219 """
220 Filters a FASTA file containing protein sequences to only include sequences
221 from contigs present in the provided set of contigs (`contigs_to_keep`).
223 This function processes the input FASTA file, identifies protein sequences
224 originating from contigs listed in `contigs_to_keep`, and writes the filtered
225 sequences to a new FASTA file. The output file supports optional `.gz` compression.
227 :param contigs_to_keep: A set of contig names to retain in the output FASTA file.
228 :param input_faa_file: Path to the input FASTA file containing protein sequences.
229 :param filtered_faa_file: Path to the output FASTA file for filtered sequences.
230 If the filename ends with `.gz`, the output will be compressed.
231 """
232 # Determine whether the output file should be compressed
233 proper_open = gzip.open if str(filtered_faa_file).endswith(".gz") else open
235 # Initialize tracking sets for metrics
236 contigs_with_genes = set()
237 contigs_parsed = set()
239 # Process the input FASTA file and filter sequences based on contigs_to_keep
240 with proper_open(filtered_faa_file, "wt") as fl:
241 for name, seq in pyfastx.Fastx(input_faa_file):
242 contig = get_contig_from_cds_name(name)
243 contigs_parsed.add(contig)
244 if contig in contigs_to_keep:
245 contigs_with_genes.add(contig)
246 fl.write(f">{name}\n{seq}\n")
248 # Calculate metrics
249 total_contigs = len(contigs_to_keep)
250 contigs_with_no_genes = total_contigs - len(contigs_with_genes)
251 contigs_not_in_keep_list = len(contigs_parsed - contigs_to_keep)
253 # Log the computed metrics
254 logging.info(f"Processing protein sequences from '{input_faa_file}'.")
255 logging.info(
256 f"Filtered {input_faa_file} to retain genes from {total_contigs} contigs that are included in the input bins."
257 )
258 logging.debug(
259 f"Found {contigs_with_no_genes}/{total_contigs} contigs ({contigs_with_no_genes / total_contigs:.2%}) with no genes."
260 )
261 logging.debug(
262 f"{contigs_not_in_keep_list} contigs from the input FASTA file are not in the keep list."
263 )