Source code for flexsweep.data

import glob
import os
import re
from collections import OrderedDict
from itertools import chain

# from subprocess import run
# from typing import Any, cast
from warnings import filterwarnings

from allel import GenotypeArray, index_windows, read_vcf, read_vcf_headers
from scipy.interpolate import interp1d

from . import Parallel, delayed, np, pl

filterwarnings("ignore", message="invalid INFO header", module="allel.io.vcf_read")


def _chrom_id(s):
    """
    Canonical chromosome extractor.

    Normalizes:
      chr01, chr1, contig_1, SUPER__01 -> "1"
      chrX -> "x"
      chrMT / chrM -> "mt"
    """
    s = s.lower()

    # unify mitochondrial naming
    s = re.sub(r"chr?m(t)?", "mt", s)

    # extract first meaningful token
    # Use lookahead instead of \b: in Python regex '_' is \w so \b fails for
    # tokens like "chr1_polarized" where '1' is followed by '_'.
    # (?=[^a-z0-9]|$) requires the token to be followed by a separator or EOS,
    # which also prevents 'y' in 'yri' from matching the Y chromosome.
    m = re.search(r"(?:chr|contig|super)?[^a-z0-9]*([0-9]+|x|y|mt)(?=[^a-z0-9]|$)", s)
    if not m:
        return None

    chrom = m.group(1)

    # normalize numeric chromosomes: "01" -> "1"
    if chrom.isdigit():
        chrom = str(int(chrom))

    return chrom


def get_contig_from_vcf_filename(path):
    """
    Returns:
        (contig_name, contig_length)
    """

    # Read headers
    headers = read_vcf_headers(path)

    # Parse contigs
    contig_pattern = re.compile(r"##contig=<ID=([^,]+),length=(\d+)>")

    contigs = {
        m.group(1): int(m.group(2))
        for line in headers.headers
        if (m := contig_pattern.match(line))
    }

    # Extract chrom from filename
    fname = os.path.basename(path)
    file_chr = _chrom_id(fname)

    if file_chr is None:
        return None, None

    # Match against header contigs
    matches = []

    for contig_name, length in contigs.items():
        contig_chr = _chrom_id(contig_name)

        if contig_chr == file_chr:
            matches.append((contig_name, length))

    # Resolve
    if len(matches) == 1:
        return matches[0]

    if len(matches) > 1:
        raise ValueError(f"Ambiguous contig match: {matches}")

    return None, None


[docs] class Data: def __init__( self, data, region=None, samples=None, recombination_map=None, window_size=int(1.2e6), step=int(1e4), nthreads=1, sequence_length=int(1.2e6), in_memory=False, ): """ Utilities for loading VCF/ms-style simulations and producing windowed inputs. Args: data (str | Any): Path to input data: - Path containing VCF/BCF files - Path contiaining discoal simulations region (str, default=None): Optional genomic region string 'CHR:START-END' to read from VCF/BCF. samples (list[str] | np.ndarray | None, default=None): Optional subset of samples (names or indices) to read from VCF/BCF. recombination_map (str | None, default=None): Optional path to recombination map CSV with columns: [Chr, Begin, End, cMperMb, cM]. If None, assumes physical distance ~ genetic distance. window_size (int, default=1_200_000): Genomic window size in base pairs when scanning a VCF. step (int, default=10_000): Sliding step in base pairs between adjacent windows when scanning a VCF. nthreads (int, default=1): Number of parallel workers for joblib. sequence_length (int, default=1_200_000): Sequence length used to scale ms/discoal 'positions' (0..1) into bp. Notes: - The class handle both functions to read VCF files and discoal simulations. - Randomness is not used here; outputs are deterministic given inputs. """ self.data = data self.region = region self.samples = samples self.recombination_map = recombination_map self.window_size = window_size self.step = step self.nthreads = nthreads self.sequence_length = sequence_length self.in_memory = in_memory
[docs] def genome_reader(self, region, samples): """ Read a genomic region from a VCF/BCF and return haplotypes and rec_map given the region interval. Args: region (str): Region string 'CHR:START-END'. samples (list[str] | np.ndarray): Sample subset to read. Returns: dict[str, tuple[np.ndarray, np.ndarray] | None]: {region: (hap, rec_map_subset)} or {region: None} if no biallelic sites. - hap: (S x N) int8 array of phased haplotypes (derived-allele indicators). - rec_map_subset: (S x 4) array [chrom, idx, genetic_pos(cm or proxy), physical_pos]. Notes: - Filters to biallelic variants via scikit-allel's AlleleCounts.is_biallelic_01. - If no recombination map is supplied, genetic distance is set to physical pos and later remapped to a relative [1..sequence_length] coordinate system. """ filterwarnings( "ignore", message="invalid INFO header", module="allel.io.vcf_read" ) raw_data = read_vcf(self.data, region=region, samples=samples) try: gt = GenotypeArray(raw_data["calldata/GT"]) except Exception: return {region: None} pos = raw_data["variants/POS"] np_chrom = np.char.replace(raw_data["variants/CHROM"].astype(str), "chr", "") try: np_chrom = np_chrom.astype(int) except Exception: pass ac = gt.count_alleles() biallelic_filter = ac.is_biallelic_01() hap = gt.to_haplotypes() hap = hap.values[biallelic_filter] pos = pos[biallelic_filter] np_chrom = np_chrom[biallelic_filter] if hap.shape[0] == 0: return {region: None} tmp = list(map(int, region.split(":")[-1].split("-"))) d_pos = dict( zip(np.arange(tmp[0], tmp[1] + 1), np.arange(self.sequence_length) + 1) ) if self.recombination_map is None: rec_map = pl.DataFrame( {"chrom": np_chrom, "idx": np.arange(pos.size), "pos": pos, "cm": pos} ).to_numpy() for r in rec_map: r[-1] = d_pos[r[-1]] r[-2] = d_pos[r[-2]] else: df_recombination_map = pl.read_csv(self.recombination_map, separator=",") genetic_distance = self.get_cm(df_recombination_map, pos) rec_map = pl.DataFrame( [np_chrom, np.arange(pos.size), pos, genetic_distance] ).to_numpy() for r in rec_map: r[-2] = d_pos[r[-2]] if np.all(rec_map[:, -1] == 0): rec_map[:, -1] = rec_map[:, -2] # # physical position to relative physical position (1,1.2e6) # # this way we do not perform any change on summary_statistics center/windows combinations # f = interp1d(w, [1, int(self.window_size) + 1]) # rec_map = np.column_stack([rec_map, f(rec_map[:, 2]).astype(int)]) # return hap return {region: (hap, rec_map[:, [0, 1, -1, 2]])}
[docs] def read_vcf(self): """ Slide windows across a single-contig VCF/BCF and collect per-window data haplotypes Returns: dict | OrderedDict: If in_memory=True: {'sweep': list[[hap, rec_map, zeros(4)]], 'region': list[str]} where each element corresponds to a window with data. If in_memory=False: OrderedDict({'sweep': <vcf_path>, 'region': list[str]}) with region strings to be used downstream for lazy reading. Raises: IOError: If the file extension is not one of [.vcf, .vcf.gz, .bcf, .bcf.gz]. FileNotFoundError: If the VCF/BCF is not tabix-indexed (<path>.tbi is missing). Notes: - Determines contig name/length by reading last line of the vcf file. - Generates overlapping windows of size `window_size` and step `step`. - Uses multiprocessing when in_memory=True to read per-window haplotypes. """ if ( not self.data.endswith(".vcf") and not self.data.endswith(".vcf.gz") and not self.data.endswith(".bcf") and not self.data.endswith(".bcf.gz") ): raise IOError("VCF file must be vcf or bcf format") if not os.path.exists(self.data + ".tbi"): raise FileNotFoundError( f"Please index the vcf/bcf file before continue using: tabix -p vcf {self.data}" ) # check_contig_length = ( # f"{'zcat' if '.gz' in self.data else 'cat'} {self.data} | tail -n 1" # ) # contig_name, contig_length = run( # check_contig_length, shell=True, capture_output=True, text=True # ).stdout.split("\t")[:2] contig_name, contig_length = get_contig_from_vcf_filename(self.data) # check_contig_start = ( # f'zgrep -v "#" {self.data} | head -n 1' # if self.data.endswith(".gz") # else f'fgrep -v "^#" {self.data} | head -n 1' # ) # contig_start = run( # check_contig_start, shell=True, capture_output=True, text=True # ).stdout.split("\t")[1] # if (int(contig_start) - 6e5) < 0: # contig_start = 1 # if self.step is None: # step = None # else: # step = int(self.step) window_iter = list( index_windows( np.arange(1, int(contig_length)), int(self.window_size - 1), 1, int(contig_length) + int(self.window_size - 1), int(self.step), ) ) if self.in_memory: with Parallel( n_jobs=self.nthreads, backend="multiprocessing", verbose=5 ) as parallel: region_data = parallel( delayed(self.genome_reader)( contig_name + ":" + str(w[0]) + "-" + str(w[1]), self.samples ) for w in window_iter ) out_dict = dict(chain.from_iterable(d.items() for d in region_data)) sims = {"sweep": [], "region": []} for k, v in out_dict.items(): if v is not None: tmp = list(v) tmp.append(np.zeros(4)) sims["sweep"].append(tmp) sims["region"].append(k) else: sims = OrderedDict({"sweep": [], "region": []}) sims["sweep"] = self.data for w in window_iter: # if v is not None: sims["region"].append(contig_name + ":" + str(w[0]) + "-" + str(w[1])) return sims
[docs] def read_simulations(self): """ Load paths (or parsed data if in_memory=True) for discoal simulations. Expects a directory structure: <self.data>/neutral/*.ms.gz <self.data>/sweep/*.ms.gz <self.data>/params.txt.gz Returns: tuple[OrderedDict, pl.DataFrame]: sims: OrderedDict({'neutral': <list|array>, 'sweep': <list|array>}) params: polars DataFrame with columns ['model', 's', 't', 'saf', 'eaf']. Raises: AssertionError: If self.data is not a string. ValueError: If required 'neutral' or 'sweep' subfolders are missing or empty. """ assert isinstance(self.data, str) for folder in ("sweep", "neutral"): folder_path = os.path.join(self.data, folder) if not os.path.exists(folder_path): raise ValueError(f"Required directory not found: {folder_path}") if not glob.glob(os.path.join(folder_path, "*")): raise ValueError(f"Directory is empty: {folder_path}") df_params = pl.read_csv(self.data + "/params.txt.gz") params = df_params.select(["model", "s", "t", "saf", "eaf", "mu", "r"]) df_neutral = df_params.filter(pl.col("model") == "neutral") df_sweeps = df_params.filter(pl.col("model") == "sweep") sweeps = ( (self.data + "/sweep/sweep_" + (df_sweeps.select("iter") + ".ms.gz")) .to_numpy() .flatten() ) neutral = ( (self.data + "/neutral/neutral_" + (df_neutral.select("iter") + ".ms.gz")) .to_numpy() .flatten() ) sims = OrderedDict( { "neutral": neutral, "sweep": sweeps, } ) return sims, params
[docs] def get_cm(self, df_rec_map, positions): """ Interpolate genetic distances (cM) for given physical positions. Args: df_rec_map (pl.DataFrame): Recombination map with numeric columns where: columns[1] holds physical coordinate (bp) and columns[-1] holds cumulative genetic distance (cM). positions (np.ndarray[int]): Physical positions to interpolate. Returns: np.ndarray[float]: Interpolated cumulative cM values (negative values clamped to 0). Notes: Uses linear interpolation with extrapolation at ends. """ interp_func = interp1d( df_rec_map.select(df_rec_map.columns[1]).to_numpy().flatten(), df_rec_map.select(df_rec_map.columns[-1]).to_numpy().flatten(), kind="linear", fill_value="extrapolate", ) # Interpolate the cM values at the interval positions rr1 = interp_func(positions) # rr2 = interp_func(positions[:, 1]) rr1[rr1 < 0] = 0 # Calculate the recombination rate in cM/Mb # rate = (rr2 - rr1) / ((positions[:, 1] - positions[:, 0]) / 1e6) return rr1