Coverage for binette/cds.py: 100%

90 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2025-01-06 19:22 +0000

1import concurrent.futures as cf 

2import multiprocessing.pool 

3import logging 

4from collections import Counter, defaultdict 

5from typing import Dict, List, Iterator, Tuple, Any, Union, Set 

6 

7import pyfastx 

8import pyrodigal 

9from tqdm import tqdm 

10from pathlib import Path 

11import gzip 

12 

13 

14def get_contig_from_cds_name(cds_name: str) -> str: 

15 """ 

16 Extract the contig name from a CDS name. 

17 

18 :param cds_name: The name of the CDS. 

19 :type cds_name: str 

20 :return: The name of the contig. 

21 :rtype: str 

22 """ 

23 return "_".join(cds_name.split("_")[:-1]) 

24 

25 

26def predict( 

27 contigs_iterator: Iterator, outfaa: str, threads: int = 1 

28) -> Dict[str, List[str]]: 

29 """ 

30 Predict open reading frames with Pyrodigal. 

31 

32 :param contigs_iterator: An iterator of contig sequences. 

33 :param outfaa: The output file path for predicted protein sequences (in FASTA format). 

34 :param threads: Number of CPU threads to use (default is 1). 

35 

36 :return: A dictionary mapping contig names to predicted genes. 

37 """ 

38 try: 

39 # for version >=3 of pyrodigal 

40 orf_finder = pyrodigal.GeneFinder(meta="meta") # type: ignore 

41 except AttributeError: 

42 orf_finder = pyrodigal.OrfFinder(meta="meta") # type: ignore 

43 

44 logging.info(f"Predicting cds sequences with Pyrodigal using {threads} threads.") 

45 

46 with multiprocessing.pool.ThreadPool(processes=threads) as pool: 

47 contig_and_genes = pool.starmap( 

48 predict_genes, ((orf_finder.find_genes, seq) for seq in contigs_iterator) 

49 ) 

50 

51 write_faa(outfaa, contig_and_genes) 

52 

53 contig_to_genes = { 

54 contig_id: [gene.translate() for gene in pyrodigal_genes] 

55 for contig_id, pyrodigal_genes in contig_and_genes 

56 } 

57 

58 return contig_to_genes 

59 

60 

61def predict_genes(find_genes, seq) -> Tuple[str, pyrodigal.Genes]: 

62 

63 return (seq.name, find_genes(seq.seq)) 

64 

65 

66def write_faa(outfaa: str, contig_to_genes: List[Tuple[str, pyrodigal.Genes]]) -> None: 

67 """ 

68 Write predicted protein sequences to a FASTA file. 

69 

70 :param outfaa: The output file path for predicted protein sequences (in FASTA format). 

71 :param contig_to_genes: A dictionary mapping contig names to predicted genes. 

72 

73 """ 

74 logging.info("Writing predicted protein sequences.") 

75 with gzip.open(outfaa, "wt") as fl: 

76 for contig_id, genes in contig_to_genes: 

77 genes.write_translations(fl, contig_id) 

78 

79 

80def is_nucleic_acid(sequence: str) -> bool: 

81 """ 

82 Determines whether the given sequence is a DNA or RNA sequence. 

83 

84 :param sequence: The sequence to check. 

85 :return: True if the sequence is a DNA or RNA sequence, False otherwise. 

86 """ 

87 # Define nucleotidic bases (DNA and RNA) 

88 nucleotidic_bases = set("ATCGNUatcgnu") 

89 

90 # Check if all characters in the sequence are valid nucleotidic bases (DNA or RNA) 

91 if all(base in nucleotidic_bases for base in sequence): 

92 return True 

93 

94 # If any character is invalid, return False 

95 return False 

96 

97 

98def parse_faa_file(faa_file: str) -> Dict[str, List[str]]: 

99 """ 

100 Parse a FASTA file containing protein sequences and organize them by contig. 

101 

102 :param faa_file: Path to the input FASTA file. 

103 :return: A dictionary mapping contig names to lists of protein sequences. 

104 :raises ValueError: If the file contains nucleotidic sequences instead of protein sequences. 

105 """ 

106 contig_to_genes = defaultdict(list) 

107 checked_sequences = [] 

108 

109 # Iterate through the FASTA file and parse sequences 

110 for name, seq in pyfastx.Fastx(faa_file): 

111 contig = get_contig_from_cds_name(name) 

112 contig_to_genes[contig].append(seq) 

113 

114 # Concatenate up to the first 20 sequences for validation 

115 if len(checked_sequences) < 20: 

116 checked_sequences.append(seq) 

117 

118 # Concatenate all checked sequences for a more reliable nucleic acid check 

119 concatenated_seq = "".join(checked_sequences) 

120 

121 # Check if the concatenated sequence appears to be nucleic acid 

122 if is_nucleic_acid(concatenated_seq): 

123 raise ValueError( 

124 f"The file '{faa_file}' appears to contain nucleotide sequences. " 

125 "Ensure that the file contains valid protein sequences in FASTA format." 

126 ) 

127 

128 return dict(contig_to_genes) 

129 

130 

131def get_aa_composition(genes: List[str]) -> Counter: 

132 """ 

133 Calculate the amino acid composition of a list of protein sequences. 

134 

135 :param genes: A list of protein sequences. 

136 :return: A Counter object representing the amino acid composition. 

137 """ 

138 aa_counter = Counter() 

139 for gene in genes: 

140 aa_counter += Counter(gene) 

141 

142 return aa_counter 

143 

144 

145def get_contig_cds_metadata_flat( 

146 contig_to_genes: Dict[str, List[str]] 

147) -> Tuple[Dict[str, int], Dict[str, Counter], Dict[str, int]]: 

148 """ 

149 Calculate metadata for contigs, including CDS count, amino acid composition, and total amino acid length. 

150 

151 :param contig_to_genes: A dictionary mapping contig names to lists of protein sequences. 

152 :return: A tuple containing dictionaries for CDS count, amino acid composition, and total amino acid length. 

153 """ 

154 contig_to_cds_count = { 

155 contig: len(genes) for contig, genes in contig_to_genes.items() 

156 } 

157 

158 contig_to_aa_counter = { 

159 contig: get_aa_composition(genes) 

160 for contig, genes in tqdm(contig_to_genes.items(), unit="contig") 

161 } 

162 logging.info("Calculating amino acid composition.") 

163 

164 contig_to_aa_length = { 

165 contig: sum(counter.values()) 

166 for contig, counter in tqdm(contig_to_aa_counter.items(), unit="contig") 

167 } 

168 logging.info("Calculating total amino acid length.") 

169 

170 return contig_to_cds_count, contig_to_aa_counter, contig_to_aa_length 

171 

172 

173def get_contig_cds_metadata( 

174 contig_to_genes: Dict[int, Union[Any, List[Any]]], threads: int 

175) -> Dict[str, Dict]: 

176 """ 

177 Calculate metadata for contigs in parallel, including CDS count, amino acid composition, and total amino acid length. 

178 

179 :param contig_to_genes: A dictionary mapping contig names to lists of protein sequences. 

180 :param threads: Number of CPU threads to use. 

181 :return: A tuple containing dictionaries for CDS count, amino acid composition, and total amino acid length. 

182 """ 

183 contig_to_cds_count = { 

184 contig: len(genes) for contig, genes in contig_to_genes.items() 

185 } 

186 

187 contig_to_future = {} 

188 logging.info(f"Collecting contig amino acid composition using {threads} threads.") 

189 with cf.ProcessPoolExecutor(max_workers=threads) as tpe: 

190 for contig, genes in tqdm(contig_to_genes.items()): 

191 contig_to_future[contig] = tpe.submit(get_aa_composition, genes) 

192 

193 contig_to_aa_counter = { 

194 contig: future.result() 

195 for contig, future in tqdm(contig_to_future.items(), unit="contig") 

196 } 

197 logging.info("Calculating amino acid composition in parallel.") 

198 

199 contig_to_aa_length = { 

200 contig: sum(counter.values()) 

201 for contig, counter in tqdm(contig_to_aa_counter.items(), unit="contig") 

202 } 

203 logging.info("Calculating total amino acid length in parallel.") 

204 

205 contig_info = { 

206 "contig_to_cds_count": contig_to_cds_count, 

207 "contig_to_aa_counter": contig_to_aa_counter, 

208 "contig_to_aa_length": contig_to_aa_length, 

209 } 

210 

211 return contig_info 

212 

213 

214def filter_faa_file( 

215 contigs_to_keep: Set[str], 

216 input_faa_file: Path, 

217 filtered_faa_file: Path, 

218): 

219 """ 

220 Filters a FASTA file containing protein sequences to only include sequences 

221 from contigs present in the provided set of contigs (`contigs_to_keep`). 

222 

223 This function processes the input FASTA file, identifies protein sequences 

224 originating from contigs listed in `contigs_to_keep`, and writes the filtered 

225 sequences to a new FASTA file. The output file supports optional `.gz` compression. 

226 

227 :param contigs_to_keep: A set of contig names to retain in the output FASTA file. 

228 :param input_faa_file: Path to the input FASTA file containing protein sequences. 

229 :param filtered_faa_file: Path to the output FASTA file for filtered sequences. 

230 If the filename ends with `.gz`, the output will be compressed. 

231 """ 

232 # Determine whether the output file should be compressed 

233 proper_open = gzip.open if str(filtered_faa_file).endswith(".gz") else open 

234 

235 # Initialize tracking sets for metrics 

236 contigs_with_genes = set() 

237 contigs_parsed = set() 

238 

239 # Process the input FASTA file and filter sequences based on contigs_to_keep 

240 with proper_open(filtered_faa_file, "wt") as fl: 

241 for name, seq in pyfastx.Fastx(input_faa_file): 

242 contig = get_contig_from_cds_name(name) 

243 contigs_parsed.add(contig) 

244 if contig in contigs_to_keep: 

245 contigs_with_genes.add(contig) 

246 fl.write(f">{name}\n{seq}\n") 

247 

248 # Calculate metrics 

249 total_contigs = len(contigs_to_keep) 

250 contigs_with_no_genes = total_contigs - len(contigs_with_genes) 

251 contigs_not_in_keep_list = len(contigs_parsed - contigs_to_keep) 

252 

253 # Log the computed metrics 

254 logging.info(f"Processing protein sequences from '{input_faa_file}'.") 

255 logging.info( 

256 f"Filtered {input_faa_file} to retain genes from {total_contigs} contigs that are included in the input bins." 

257 ) 

258 logging.debug( 

259 f"Found {contigs_with_no_genes}/{total_contigs} contigs ({contigs_with_no_genes / total_contigs:.2%}) with no genes." 

260 ) 

261 logging.debug( 

262 f"{contigs_not_in_keep_list} contigs from the input FASTA file are not in the keep list." 

263 )