Coverage for binette/cds.py: 100%

98 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-14 14:36 +0000

1import concurrent.futures as cf 

2import gzip 

3import logging 

4import multiprocessing.pool 

5from collections import Counter, defaultdict 

6from collections.abc import Iterator 

7from pathlib import Path 

8from typing import Any 

9 

10import numpy as np 

11import pyfastx 

12import pyrodigal 

13 

14logger = logging.getLogger(__name__) 

15 

16 

17def get_contig_from_cds_name(cds_name: str) -> str: 

18 """ 

19 Extract the contig name from a CDS name. 

20 

21 :param cds_name: The name of the CDS. 

22 :type cds_name: str 

23 :return: The name of the contig. 

24 :rtype: str 

25 """ 

26 return "_".join(cds_name.split("_")[:-1]) 

27 

28 

29def predict( 

30 contigs_iterator: Iterator, outfaa: str, threads: int = 1 

31) -> tuple[dict[str, list[str]], dict[str, int | None]]: 

32 """ 

33 Predict open reading frames with Pyrodigal. 

34 

35 :param contigs_iterator: An iterator of contig sequences. 

36 :param outfaa: The output file path for predicted protein sequences (in FASTA format). 

37 :param threads: Number of CPU threads to use (default is 1). 

38 

39 :return: A dictionary mapping contig names to predicted genes and a dictionary mapping contig names to coding lengths. 

40 """ 

41 

42 orf_finder = pyrodigal.GeneFinder(meta="meta") 

43 

44 logger.info(f"Predicting CDS sequences with Pyrodigal using {threads} threads") 

45 

46 with multiprocessing.pool.ThreadPool(processes=threads) as pool: 

47 contig_and_genes = pool.starmap( 

48 predict_genes, 

49 ((orf_finder.find_genes, name, seq) for name, seq in contigs_iterator), 

50 ) 

51 

52 write_faa(outfaa, contig_and_genes) 

53 

54 contig_to_genes = { 

55 contig_id: [gene.translate() for gene in pyrodigal_genes] 

56 for contig_id, pyrodigal_genes in contig_and_genes 

57 } 

58 

59 contig_to_coding_length = { 

60 contig_id: get_contig_coding_len(pyrodigal_genes, len(pyrodigal_genes.sequence)) 

61 for contig_id, pyrodigal_genes in contig_and_genes 

62 } 

63 

64 return contig_to_genes, contig_to_coding_length 

65 

66 

67def get_contig_coding_len( 

68 genes: list[pyrodigal.Gene], contig_length: int 

69) -> int | None: 

70 """ 

71 Compute the coding length of a contig. Use a mask to account for overlapping genes. 

72 

73 :param genes: A list of gene annotations for the contig. 

74 :param contig_length: The length of the contig in base pairs. 

75 :return: The coding length as a float, or None if contig_length is zero. 

76 """ 

77 if contig_length == 0: 

78 return None 

79 conding_base_mask = np.zeros(contig_length) 

80 for g in genes: 

81 conding_base_mask[g.begin - 1 : g.end] = 1 

82 return np.sum(conding_base_mask) 

83 

84 

85def predict_genes(find_genes, name, seq) -> tuple[str, pyrodigal.Genes]: 

86 return (name, find_genes(seq)) 

87 

88 

89def write_faa(outfaa: str, contig_to_genes: list[tuple[str, pyrodigal.Genes]]) -> None: 

90 """ 

91 Write predicted protein sequences to a FASTA file. 

92 

93 :param outfaa: The output file path for predicted protein sequences (in FASTA format). 

94 :param contig_to_genes: A dictionary mapping contig names to predicted genes. 

95 

96 """ 

97 logger.info("Writing predicted protein sequences") 

98 with gzip.open(outfaa, "wt") as fl: 

99 for contig_id, genes in contig_to_genes: 

100 genes.write_translations(fl, contig_id) 

101 

102 

103def is_nucleic_acid(sequence: str) -> bool: 

104 """ 

105 Determines whether the given sequence is a DNA or RNA sequence. 

106 

107 :param sequence: The sequence to check. 

108 :return: True if the sequence is a DNA or RNA sequence, False otherwise. 

109 """ 

110 # Define nucleotidic bases (DNA and RNA) 

111 nucleotidic_bases = set("ATCGNUatcgnu") 

112 

113 # Check if all characters in the sequence are valid nucleotidic bases (DNA or RNA) 

114 if all(base in nucleotidic_bases for base in sequence): 

115 return True 

116 

117 # If any character is invalid, return False 

118 return False 

119 

120 

121def parse_faa_file(faa_file: str) -> dict[str, list[str]]: 

122 """ 

123 Parse a FASTA file containing protein sequences and organize them by contig. 

124 

125 :param faa_file: Path to the input FASTA file. 

126 :return: A dictionary mapping contig names to lists of protein sequences. 

127 :raises ValueError: If the file contains nucleotidic sequences instead of protein sequences. 

128 """ 

129 contig_to_genes = defaultdict(list) 

130 checked_sequences = [] 

131 

132 # Iterate through the FASTA file and parse sequences 

133 for name, seq in pyfastx.Fastx(faa_file): 

134 contig = get_contig_from_cds_name(name) 

135 contig_to_genes[contig].append(seq) 

136 

137 # Concatenate up to the first 20 sequences for validation 

138 if len(checked_sequences) < 20: 

139 checked_sequences.append(seq) 

140 

141 # Concatenate all checked sequences for a more reliable nucleic acid check 

142 concatenated_seq = "".join(checked_sequences) 

143 

144 # Check if the concatenated sequence appears to be nucleic acid 

145 if is_nucleic_acid(concatenated_seq): 

146 raise ValueError( 

147 f"The file '{faa_file}' appears to contain nucleotide sequences. " 

148 "Ensure that the file contains valid protein sequences in FASTA format." 

149 ) 

150 

151 return dict(contig_to_genes) 

152 

153 

154def get_aa_composition(genes: list[str]) -> Counter: 

155 """ 

156 Calculate the amino acid composition of a list of protein sequences. 

157 

158 :param genes: A list of protein sequences. 

159 :return: A Counter object representing the amino acid composition. 

160 """ 

161 aa_counter = Counter() 

162 for gene in genes: 

163 aa_counter += Counter(gene) 

164 # remove * from Counter 

165 aa_counter.pop("*", None) 

166 return aa_counter 

167 

168 

169def get_contig_cds_metadata_flat( 

170 contig_to_genes: dict[str, list[str]], 

171) -> tuple[dict[str, int], dict[str, Counter], dict[str, int]]: 

172 """ 

173 Calculate metadata for contigs, including CDS count, amino acid composition, and total amino acid length. 

174 

175 :param contig_to_genes: A dictionary mapping contig names to lists of protein sequences. 

176 :return: A tuple containing dictionaries for CDS count, amino acid composition, and total amino acid length. 

177 """ 

178 contig_to_cds_count = { 

179 contig: len(genes) for contig, genes in contig_to_genes.items() 

180 } 

181 

182 contig_to_aa_counter = { 

183 contig: get_aa_composition(genes) for contig, genes in contig_to_genes.items() 

184 } 

185 logger.info("Calculating amino acid composition") 

186 

187 contig_to_aa_length = { 

188 contig: sum(counter.values()) 

189 for contig, counter in contig_to_aa_counter.items() 

190 } 

191 logger.info("Calculating total amino acid length") 

192 

193 return contig_to_cds_count, contig_to_aa_counter, contig_to_aa_length 

194 

195 

196def get_contig_cds_metadata( 

197 contig_to_genes: dict[int, Any | list[Any]], threads: int 

198) -> dict[str, dict]: 

199 """ 

200 Calculate metadata for contigs in parallel, including CDS count, amino acid composition, and total amino acid length. 

201 

202 :param contig_to_genes: A dictionary mapping contig names to lists of protein sequences. 

203 :param threads: Number of CPU threads to use. 

204 :return: A tuple containing dictionaries for CDS count, amino acid composition, and total amino acid length. 

205 """ 

206 contig_to_cds_count = { 

207 contig: len(genes) for contig, genes in contig_to_genes.items() 

208 } 

209 

210 contig_to_future = {} 

211 logger.info(f"Collecting contig amino acid composition using {threads} threads") 

212 with cf.ProcessPoolExecutor(max_workers=threads) as tpe: 

213 for contig, genes in contig_to_genes.items(): 

214 contig_to_future[contig] = tpe.submit(get_aa_composition, genes) 

215 

216 contig_to_aa_counter = { 

217 contig: future.result() for contig, future in contig_to_future.items() 

218 } 

219 logger.info("Calculating amino acid composition in parallel") 

220 

221 contig_to_aa_length = { 

222 contig: sum(counter.values()) 

223 for contig, counter in contig_to_aa_counter.items() 

224 } 

225 logger.info("Calculating total amino acid length in parallel") 

226 

227 contig_info = { 

228 "contig_to_cds_count": contig_to_cds_count, 

229 "contig_to_aa_counter": contig_to_aa_counter, 

230 "contig_to_aa_length": contig_to_aa_length, 

231 } 

232 

233 return contig_info 

234 

235 

236def filter_faa_file( 

237 contigs_to_keep: list[str], 

238 input_faa_file: Path, 

239 filtered_faa_file: Path, 

240): 

241 """ 

242 Filters a FASTA file containing protein sequences to only include sequences 

243 from contigs present in the provided set of contigs (`contigs_to_keep`). 

244 

245 This function processes the input FASTA file, identifies protein sequences 

246 originating from contigs listed in `contigs_to_keep`, and writes the filtered 

247 sequences to a new FASTA file. The output file supports optional `.gz` compression. 

248 

249 :param contigs_to_keep: A set of contig names to retain in the output FASTA file. 

250 :param input_faa_file: Path to the input FASTA file containing protein sequences. 

251 :param filtered_faa_file: Path to the output FASTA file for filtered sequences. 

252 If the filename ends with `.gz`, the output will be compressed. 

253 """ 

254 # Determine whether the output file should be compressed 

255 proper_open = gzip.open if str(filtered_faa_file).endswith(".gz") else open 

256 

257 # Initialize tracking sets for metrics 

258 contigs_with_genes = set() 

259 contigs_parsed = set() 

260 

261 # Process the input FASTA file and filter sequences based on contigs_to_keep 

262 with proper_open(filtered_faa_file, "wt") as fl: 

263 for name, seq in pyfastx.Fastx(input_faa_file): 

264 contig = get_contig_from_cds_name(name) 

265 contigs_parsed.add(contig) 

266 if contig in contigs_to_keep: 

267 contigs_with_genes.add(contig) 

268 fl.write(f">{name}\n{seq}\n") 

269 

270 # Calculate metrics 

271 total_contigs = len(contigs_to_keep) 

272 contigs_with_no_genes = total_contigs - len(contigs_with_genes) 

273 contigs_not_in_keep_list = len(contigs_parsed - set(contigs_to_keep)) 

274 

275 # Log the computed metrics 

276 logger.info(f"Processing protein sequences from '{input_faa_file}'") 

277 logger.info( 

278 f"Filtered '{input_faa_file}' to retain genes from {total_contigs} contigs that are included in the input bins" 

279 ) 

280 logger.debug( 

281 f"Found {contigs_with_no_genes}/{total_contigs} contigs ({contigs_with_no_genes / total_contigs:.2%}) with no genes" 

282 ) 

283 logger.debug( 

284 f"{contigs_not_in_keep_list} contigs from the input FASTA file are not in the keep list" 

285 )