Source code for caput.pfb

"""Tools for calculating the effects of the CASPER tools PFB.

This module can:
- Evaluate the typical window functions used
- Evaluate a python model of the PFB
- Calculate the decorrelation effect for signals offset by a known time delay.

Window functions
================
- :py:meth:`sinc_window`
- :py:meth:`sinc_hanning`
- :py:meth:`sinc_hamming`

PFB
===
- :py:meth:`pfb`
- :py:meth:`decorrelation_ratio`
"""

import numpy as np
from scipy.interpolate import interp1d


[docs]def sinc_window(ntap, lblock): """Sinc window function. Parameters ---------- ntap : integer Number of taps. lblock: integer Length of block. Returns ------- window : np.ndarray[ntap * lblock] """ # Sampling locations of sinc function X = np.linspace(-ntap / 2, ntap / 2, ntap * lblock, endpoint=False) # np.sinc function is sin(pi*x)/pi*x, not sin(x)/x, so we can just X return np.sinc(X)
[docs]def sinc_hann(ntap, lblock): """Hann-sinc window function. Parameters ---------- ntap : integer Number of taps. lblock: integer Length of block. Returns ------- window : np.ndarray[ntap * lblock] """ return sinc_window(ntap, lblock) * np.hanning(ntap * lblock)
[docs]def sinc_hamming(ntap, lblock): """Hamming-sinc window function. Parameters ---------- ntap : integer Number of taps. lblock: integer Length of block. Returns ------- window : np.ndarray[ntap * lblock] """ return sinc_window(ntap, lblock) * np.hamming(ntap * lblock)
[docs]class PFB: """Model for the CASPER PFB. This is the PFB used in CHIME and other experiments. Parameters ---------- ntap : int Number of taps (i.e. blocks) used in one step of the PFB. lblock : int The length of a block that gets transformed. This is twice the number of output frequencies. window : function, optional The window function being used. If not set, use a Sinc-Hamming window. oversample : int, optional The amount to oversample when calculating the decorrelation ratio. This will improve accuracy. """ def __init__(self, ntap, lblock, window=None, oversample=4): self.ntap = ntap self.lblock = lblock self.window = sinc_hamming if window is None else window self.oversample = oversample
[docs] def apply(self, timestream): """Apply the PFB to a timestream. Parameters ---------- timestream : np.ndarray Timestream to process. Returns ------- pfb : np.ndarray[:, lblock // 2] Array of PFB frequencies. """ # Number of blocks nblock = timestream.size // self.lblock - (self.ntap - 1) # Initialise array for spectrum spec = np.zeros((nblock, self.lblock // 2), dtype=np.complex128) # Window function w = self.window(self.ntap, self.lblock) # Iterate over blocks and perform the PFB for bi in range(nblock): # Cut out the correct timestream section ts_sec = timestream[(bi * self.lblock) : ((bi + self.ntap) * self.lblock)] # Perform a real FFT (with applied window function) ft = np.fft.rfft(ts_sec * w) # Choose every n-th frequency spec[bi] = ft[: ((self.lblock // 2) * self.ntap) : self.ntap] return spec
_decorr_interp = None
[docs] def decorrelation_ratio(self, delay): """Calculates the decorrelation caused by a relative relay of two timestreams. This is caused by the fact that the PFB is generated from a finite time window of data. Parameters ---------- delay : array_like The relative delay between the correlated streams in units of samples (not required to be an integer). Returns ------- decorrelation : array_like The decorrelation ratio. """ if self._decorr_interp is None: N = self.ntap * self.lblock # Calculate the window and zero pad the array by a factor of oversample window_extended = np.zeros(N * self.oversample) window_extended[:N] = self.window(self.ntap, self.lblock) # Calculate the FFT and copy into an array over padded by another factor of # oversample. As we are doing real/inverse-real FFTs the actual length of # this array has the usual 1/2 N + 1 sizing. wf = np.fft.rfft(window_extended) wfpad = np.zeros(N * self.oversample**2 // 2 + 1, dtype=np.complex128) wfpad[: wf.size] = np.abs(wf) ** 2 # Calculate the ratio and the effective delays it is available at decorrelation_ratio = np.fft.irfft(wfpad) tau = np.fft.fftfreq( N * self.oversample**2, d=(1.0 / (N * self.oversample)) ) # Extract only the relevant range of time tau_r = tau[np.abs(tau) <= N] dc_r = decorrelation_ratio[np.abs(tau) <= N] / decorrelation_ratio[0] self._decorr_interp = interp1d( tau_r, dc_r, kind="linear", fill_value=0, assume_sorted=False, bounds_error=False, ) return self._decorr_interp(delay)