dspeed package#

The dspeed signal processing framework is responsible for running a variety of discrete signal processors on data.

Subpackages#

Submodules#

dspeed.build_dsp module#

This module provides high-level routines for running signal processing chains on waveform data.

dspeed.build_dsp.build_dsp(f_raw: str, f_dsp: str, dsp_config: str | dict | None = None, lh5_tables: list[str] | str | None = None, database: str | dict | None = None, outputs: list[str] | None = None, n_max: int = inf, write_mode: str | None = None, buffer_len: int = 3200, block_width: int = 16, chan_config: dict[str, str] | None = None) None#

Convert raw-tier LH5 data into dsp-tier LH5 data by running a sequence of processors via the ProcessingChain.

Parameters:
  • f_raw (str) – name of raw-tier LH5 file to read from.

  • f_dsp (str) – name of dsp-tier LH5 file to write to.

  • dsp_config (str | dict | None) – dict or name of JSON file containing ProcessingChain config. See build_processing_chain() for details.

  • lh5_tables (list[str] | str | None) – list of LGDO groups to process in the input file. These tables should include all input variables for processing or contain a subgroup called raw that contains such a table. If None, process all valid groups. Note that wildcards are accepted (e.g. “ch*”).

  • database (str | dict | None) – dictionary or name of JSON file containing a parameter database. See build_processing_chain() for details.

  • outputs (list[str] | None) – list of parameter names to write to the output file. If not provided, use list provided under "outputs" in the DSP configuration file.

  • n_max (int) – number of waveforms to process.

  • write_mode (str | None) –

    • None – create new output file if it does not exist

    • ’r’ – delete existing output file with same name before writing

    • ’a’ – append to end of existing output file

    • ’u’ – update values in existing output file

  • buffer_len (int) – number of waveforms to read/write from/to disk at a time.

  • block_width (int) – number of waveforms to process at a time.

  • chan_config (dict[str, str] | None) – contains JSON DSP configuration file names for every table in lh5_tables.
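The interplay of n_max, buffer_len and block_width can be pictured with a small pure-Python sketch. It only illustrates the chunking arithmetic implied by the parameter descriptions above and is not dspeed's implementation; iter_blocks is a hypothetical helper, and the small sizes are chosen for readability.

```python
from typing import Iterator


def iter_blocks(
    n_wf: int, buffer_len: int = 3200, block_width: int = 16
) -> Iterator[tuple[int, int, int]]:
    """Yield (buffer_start, block_start, block_stop) index triples.

    Hypothetical helper: waveforms are read from disk buffer_len at a
    time, and each buffer is processed block_width waveforms at a time.
    """
    for buf_start in range(0, n_wf, buffer_len):
        buf_stop = min(buf_start + buffer_len, n_wf)
        for blk_start in range(buf_start, buf_stop, block_width):
            # The last block of a buffer may be shorter than block_width.
            yield buf_start, blk_start, min(blk_start + block_width, buf_stop)


# 40 waveforms, read 32 at a time, processed 16 at a time.
blocks = list(iter_blocks(n_wf=40, buffer_len=32, block_width=16))
for buf_start, blk_start, blk_stop in blocks:
    print(f"buffer starting at {buf_start}: process [{blk_start}, {blk_stop})")
```

With these numbers the second buffer holds only 8 waveforms, so its single block is shorter than block_width.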

dspeed.cli module#

dspeed’s command line interface utilities.

dspeed.cli.dspeed_cli()#

dspeed’s command line interface.

Defines the command line interface (CLI) of the package, which exposes some of the most used functions to the console. This function is added to the entry_points.console_scripts list and defines the dspeed executable (see setuptools’ documentation). To learn more about the CLI, have a look at the help section:

$ dspeed --help

dspeed.errors module#

exception dspeed.errors.DSPError#

Bases: Exception

Base class for errors raised by signal processors.

exception dspeed.errors.DSPFatal(*args)#

Bases: DSPError

Fatal error thrown by DSP processors that halts production.

Variables:
  • wf_range (range) – range of waveform indices. This will be set after the exception is caught, and appended to the error message.

  • processor (str) – string of processor and arguments. This will be set after the exception is caught, and appended to the error message
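The "set after the exception is caught" pattern described for wf_range and processor can be sketched with stand-in classes. The classes below are local stand-ins that mirror the documented hierarchy, not the real dspeed.errors classes. The message format, failing_processor and run_block are all assumptions made for illustration.

```python
class DSPError(Exception):
    """Stand-in for the documented base class (not dspeed's own)."""


class DSPFatal(DSPError):
    """Stand-in fatal error carrying context filled in by the caller."""

    def __init__(self, *args):
        super().__init__(*args)
        self.wf_range = None   # set by whoever catches the exception
        self.processor = None  # set by whoever catches the exception

    def __str__(self):
        # Assumed format: append the context only once it is known.
        msg = super().__str__()
        if self.processor is not None:
            msg += f"\nprocessor: {self.processor}"
        if self.wf_range is not None:
            msg += f"\nwaveforms: [{self.wf_range.start}, {self.wf_range.stop})"
        return msg


def failing_processor(block):
    # Hypothetical processor that cannot continue.
    raise DSPFatal("negative rise time")


def run_block(start, stop):
    try:
        failing_processor(None)
    except DSPFatal as err:
        # Attach context after catching, then re-raise to halt production.
        err.wf_range = range(start, stop)
        err.processor = "failing_processor(block)"
        raise


try:
    run_block(16, 32)
except DSPFatal as err:
    message = str(err)
    print(message)
```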

exception dspeed.errors.ProcessingChainError#

Bases: DSPError

Error thrown when there is a problem setting up a processing chain.

dspeed.logging module#

This module implements some helpers for setting up logging.

dspeed.logging.setup(level: int = 20, logger: Logger | None = None) None#

Setup a colorful logging output.

If logger is None, sets up only the dspeed logger.

Parameters:
  • level (int) – logging level (see logging module).

  • logger (Logger | None) – if not None, setup this logger.

Examples

>>> from dspeed import logging
>>> logging.setup(level=logging.DEBUG)

dspeed.processing_chain module#

This module provides routines for setting up and running signal processing chains on waveform data.

class dspeed.processing_chain.CoordinateGrid(period: Quantity | Unit | str, offset: Quantity | ProcChainVar | float | int = 0)#

Bases: object

Helper class that describes a system of units, consisting of a period and offset.

period is a unitted pint.Quantity, offset is a scalar in units of period, a pint.Unit or a ProcChainVar. In the last case, a ProcChainVar variable is used to store a different offset for each event.

get_offset(unit: str | Unit | None = None) float#

Get the offset converted to unit. If unit is None, use period.

Return type:

float

get_period(unit: str | Unit) float#
Return type:

float

offset: Quantity | ProcChainVar | float | int = 0#
period: Quantity | Unit | str#
unit_str() str#
Return type:

str

class dspeed.processing_chain.IOManager#

Bases: object

Base class.

IOManagers will be associated with a type of input/output buffer, and must define a read and write for each one. __init__() methods should update variable with any information from buffer, and check that buffer and variable are compatible.

_abc_impl = <_abc._abc_data object>#
abstract read(start: int, end: int) None#
abstract write(start: int, end: int) None#
class dspeed.processing_chain.LGDOArrayIOManager(io_array: Array, var: ProcChainVar)#

Bases: IOManager

IOManager for buffers that are lgdo.Arrays.

_abc_impl = <_abc._abc_data object>#
read(start: int, end: int) None#
write(start: int, end: int) None#
class dspeed.processing_chain.LGDOArrayOfEqualSizedArraysIOManager(io_array: np.ArrayOfEqualSizedArrays, var: ProcChainVar)#

Bases: IOManager

IOManager for buffers that are lgdo.ArrayOfEqualSizedArrays.

_abc_impl = <_abc._abc_data object>#
read(start: int, end: int) None#
write(start: int, end: int) None#
class dspeed.processing_chain.LGDOVectorOfVectorsIOManager(io_vov: VectorOfVectors, var: ProcChainVar)#

Bases: IOManager

IOManager for buffers that are lgdo.VectorOfVectors.

_abc_impl = <_abc._abc_data object>#
_vov2nda = <numba._GUFunc '_vov2nda'>#
read(start: int, end: int) None#
write(start: int, end: int) None#
class dspeed.processing_chain.LGDOWaveformIOManager(wf_table: WaveformTable, variable: ProcChainVar)#

Bases: IOManager

_abc_impl = <_abc._abc_data object>#
read(start: int, end: int) None#
write(start: int, end: int) None#
class dspeed.processing_chain.NumpyIOManager(io_buf: ndarray, var: ProcChainVar)#

Bases: IOManager

IOManager for buffers that are numpy.ndarrays.

_abc_impl = <_abc._abc_data object>#
read(start: int, end: int) None#
write(start: int, end: int) None#
class dspeed.processing_chain.ProcChainVar(proc_chain: ProcessingChain, name: str, shape: int | tuple[int, ...] = 'auto', dtype: dtype = 'auto', grid: CoordinateGrid = 'auto', unit: str | Unit = 'auto', is_coord: bool = 'auto', vector_len: str | ProcChainVar | None = None, is_const: bool = False)#

Bases: ProcChainVarBase

Helper data class with buffer and information for internal variables in ProcessingChain.

Members can be set to auto to attempt to deduce these when adding this variable to a processor for the first time.

Parameters:
  • proc_chain (ProcessingChain) – ProcessingChain that contains this variable.

  • name (str) – Name of variable used to look it up.

  • shape (int | tuple[int, ...]) – Shape of variable, without buffer_len dimension.

  • dtype (np.dtype) – Data type of variable.

  • grid (CoordinateGrid) – Coordinate grid associated with variable. This contains the period and offset of the variable. For variables where is_coord is True, use this to perform unit conversions.

  • unit (str | Unit) – Unit associated with variable during I/O.

  • is_coord (bool) – If True, variable represents an array index and can be converted into a unitted number using grid.

  • vector_len (str | ProcChainVar) – For VectorOfVectors variables, this points to the variable used to represent the length of each vector.

  • is_const (bool) – If True, variable is a constant. Variable will be set before executing, and will not be recomputed. Does not have outer dimension of size _block_width

_abc_impl = <_abc._abc_data object>#
_make_buffer() ndarray#
Return type:

ndarray

property buffer#
description() str#
Return type:

str

get_buffer(unit: str | Unit | None = None) ndarray#
Return type:

ndarray

property offset#
property period#
update_auto(shape: int | tuple[int, ...] = 'auto', dtype: np.dtype = 'auto', grid: CoordinateGrid = 'auto', unit: str | Unit = 'auto', is_coord: bool = 'auto', period: period = None, offset: offset = 0, vector_len: str | ProcChainVar = None) None#

Update any variables set to auto; leave the others alone. Emit a message only if anything was updated.

class dspeed.processing_chain.ProcessingChain(block_width: int = 8, buffer_len: int | None = None)#

Bases: object

A class to efficiently perform a sequence of digital signal processing (DSP) transforms.

It contains a list of DSP functions and a set of constant values and named variables contained in fixed memory locations. When executing the ProcessingChain, processors will act on the internal memory without allocating new memory in the process. Furthermore, the memory is allocated in blocks, enabling vectorized processing of many entries at once. To set up a ProcessingChain, use the following methods:

  • link_input_buffer(): bind a named variable to an external NumPy array to read data from

  • add_processor(): add a DSP function and bind its inputs to a set of named variables and constant values

  • link_output_buffer(): bind a named variable to an external NumPy array to write data into

When calling these methods, the ProcessingChain class will use available information to allocate buffers to the correct sizes and data types. For this reason, transforms will ideally implement the numpy.ufunc class, enabling broadcasting of array dimensions. If not enough information is available to correctly allocate memory, it can be provided through the named variable strings or by calling add_variable().

Parameters:
  • block_width (int) – number of entries to simultaneously process.

  • buffer_len (int) – length of input and output buffers. Should be a multiple of block_width.
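The block-wise, allocation-free execution described above can be illustrated with a toy stand-in written in plain Python. ToyChain is hypothetical. Its method names mirror the three setup steps, but the signatures are simplified and are not dspeed's. The real class works on NumPy arrays and ufuncs rather than Python lists.

```python
class ToyChain:
    """Toy stand-in: fixed block buffers that processors fill in place."""

    def __init__(self, block_width=8):
        self.block_width = block_width
        self.vars = {}     # name -> preallocated list of length block_width
        self.procs = []    # (func, input name, output name)
        self.inputs = {}   # name -> external input buffer
        self.outputs = {}  # name -> external output buffer

    def _var(self, name):
        # Allocate each named variable once; reuse it for every block.
        return self.vars.setdefault(name, [0.0] * self.block_width)

    def link_input_buffer(self, name, buff):
        self._var(name)
        self.inputs[name] = buff

    def add_processor(self, func, in_name, out_name):
        self._var(in_name)
        self._var(out_name)
        self.procs.append((func, in_name, out_name))

    def link_output_buffer(self, name, buff):
        self._var(name)
        self.outputs[name] = buff

    def execute(self, n_entries):
        for start in range(0, n_entries, self.block_width):
            stop = min(start + self.block_width, n_entries)
            width = stop - start
            for name, buff in self.inputs.items():
                self.vars[name][:width] = buff[start:stop]
            for func, in_name, out_name in self.procs:
                func(self.vars[in_name], self.vars[out_name], width)
            for name, buff in self.outputs.items():
                buff[start:stop] = self.vars[name][:width]


def double(src, dst, n):
    # Hypothetical processor: writes into dst without allocating.
    for i in range(n):
        dst[i] = 2 * src[i]


raw = list(range(10))
out = [0] * 10
chain = ToyChain(block_width=4)
chain.link_input_buffer("wf", raw)
chain.add_processor(double, "wf", "wf_x2")
chain.link_output_buffer("wf_x2", out)
chain.execute(len(raw))
print(out)
```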

_astype(dtype: str) ProcChainVar#
Return type:

ProcChainVar

_execute_procs(begin: int, end: int) str#

Copy from input buffers to variables, call all the processors on their paired arg tuples, copy from variables to list of output buffers.

Return type:

str

_length() int#
Return type:

int

_loadlh5(path_in_file: str) array#

Load data from an LH5 file.

Parameters:
  • path_to_file (str) – the path to the LH5 file.

  • path_in_file (str) – the path to the data within the LH5 file.

Returns:

list – the loaded data.

Return type:

array

_parse_expr(node: Any, expr: str, dry_run: bool, var_name_list: list[str]) Any#

Helper function for ProcessingChain.get_variable() that recursively evaluates the AST. Whenever we encounter a variable name, add it to var_name_list (which should begin as an empty list). Only add new variables and processors to the chain if dry_run is False. Based on a Stack Overflow answer.

Return type:

Any

_round(to_nearest: int | float | Unit | Quantity | CoordinateGrid = 1, dtype: str | None = None) float | Quantity | ProcChainVar#

Round a variable or value to nearest multiple of to_nearest. If var is a ProcChainVar, and to_nearest is a Unit or Quantity, return a new ProcChainVar with a period of to_nearest, and the underlying values and offset rounded. If var is a ProcChainVar and to_nearest is an int or a float, keep the unit and just round the underlying value.

Example usage:

  • round(tp_0, wf.grid) – convert tp_0 to nearest array index of wf

  • round(5*us, wf.period) – 5 us in wf clock ticks

Return type:

float | Quantity | ProcChainVar
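The arithmetic behind "round to the nearest multiple" and "duration in clock ticks" can be sketched with plain numbers. This sketch leaves out pint units and ProcChainVar handling entirely. round_to_multiple and to_clock_ticks are hypothetical helpers, and the 16 ns sampling period is an assumed example value.

```python
def round_to_multiple(value: float, to_nearest: float = 1) -> float:
    """Round value to the nearest multiple of to_nearest."""
    return round(value / to_nearest) * to_nearest


def to_clock_ticks(duration_ns: float, period_ns: float) -> int:
    """Express a duration as a whole number of sampling periods."""
    return round(duration_ns / period_ns)


# 37 lies between the multiples 32 and 48 of 16, and is closer to 32.
nearest = round_to_multiple(37, 16)

# 5.1 us at an assumed 16 ns sampling period: 5100 / 16 = 318.75 ticks.
ticks = to_clock_ticks(5100, 16)
print(nearest, ticks)
```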

_validate_name(name: str, raise_exception: bool = False) bool#

Check that name is alphanumeric, and not an already used keyword

Return type:

bool

add_processor(func: ufunc, *args, signature: str | None = None, types: list[str] | None = None) None#

Make a list of parameters from *args. Replace any strings in the list with NumPy objects from vars_dict, where able.

add_variable(name: str, dtype: np.dtype | str = 'auto', shape: int | tuple[int, ...] = 'auto', grid: CoordinateGrid = 'auto', unit: str | Unit = 'auto', is_coord: bool = 'auto', period: CoordinateGrid.period = None, offset: CoordinateGrid.offset = 0, vector_len: str | ProcChainVar = None) ProcChainVar#

Add a named variable containing a block of values or arrays.

Parameters:
  • name (str) – name of variable.

  • dtype (np.dtype | str) – default is 'auto', meaning dtype will be deduced later, if possible.

  • shape (int | tuple[int, ...]) – length or shape tuple of element. Default is 'auto', meaning length will be deduced later, if possible.

  • grid (CoordinateGrid) – for variable, containing period and offset.

  • unit (str | Unit) – unit of variable.

  • period (CoordinateGrid.period) – unit with period of waveform associated with object. Do not use if grid is provided.

  • offset (CoordinateGrid.offset) – unit with offset of waveform associated with object. Requires a period to be provided.

  • is_coord (bool) – if True, transform value based on period and offset.

Return type:

ProcChainVar

execute(start: int = 0, stop: int | None = None) None#

Execute the dsp chain on the entire input/output buffers.

func_list = {'astype': <function ProcessingChain._astype>, 'len': <function ProcessingChain._length>, 'loadlh5': <function ProcessingChain._loadlh5>, 'round': <function ProcessingChain._round>}#
get_variable(expr: str, get_names_only: bool = False, expr_only: bool = False) Any#

Parse string expr into a NumPy array or value, using the following syntax:

  • numeric values are parsed into ints or floats

  • unit names found in the pint package are parsed into pint units

  • other strings are parsed into variable names. If get_names_only is False, fetch the internal buffer (creating it as needed). Else, return a string of the name

  • if a string is followed by (...), try parsing into one of the following expressions:

    • len(expr): return the length of the array found with expr

    • round(expr): return the value found with expr, rounded to the nearest integer

    • varname(shape, type): allocate a new buffer with the specified shape and type, using varname. This is used if the automatic type and shape deduction for allocating variables fails

  • Unary and binary operators +, -, *, /, // are available. If a variable name is included in the expression, a processor will be added to the ProcessingChain and a new buffer allocated to store the output

  • varname[slice]: return the variable with a slice applied. Slice values can be floats, and will have round applied to them

  • keyword = expr: return a dict with a single element pointing from keyword to the parsed expr. This is used for kwargs. If expr_only is True, raise an exception if we see this.

If get_names_only is set to True, do not fetch or allocate new arrays, instead return a list of variable names found in the expression.

Return type:

Any
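The idea of walking an expression's syntax tree to collect variable names (the get_names_only behaviour) can be sketched with Python's standard ast module. This is only an illustration of the technique, not dspeed's parser. variable_names is a hypothetical helper. Unlike the real parser it does not recognise units, so a unit such as us would be reported as a name, and it does not handle the keyword = expr form.

```python
import ast


def variable_names(expr: str) -> list[str]:
    """Return the sorted, de-duplicated names used in an expression.

    Names that are being called, such as round in round(x), are
    treated as functions and skipped.
    """
    tree = ast.parse(expr, mode="eval")
    # Remember which Name nodes sit in the function position of a call.
    called = {
        id(node.func) for node in ast.walk(tree) if isinstance(node, ast.Call)
    }
    names = {
        node.id
        for node in ast.walk(tree)
        if isinstance(node, ast.Name) and id(node) not in called
    }
    return sorted(names)


print(variable_names("wf - baseline"))
print(variable_names("round(tp_0) + wf[0:len(wf)]"))
```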

link_input_buffer(varname: str, buff: ndarray | LGDO | None = None) ndarray | LGDO#

Link an input buffer to a variable.

Parameters:
  • varname (str) – name of internal variable to copy the buffer into at the start of processor execution. If variable does not yet exist, it will be created with a similar shape to the provided buffer.

  • buff (ndarray | LGDO | None) – object to use as input buffer. If None, create a new buffer with a similar shape to the variable.

Returns:

buffer – buff or newly allocated input buffer.

Return type:

ndarray | LGDO

link_output_buffer(varname: str, buff: ndarray | LGDO | None = None) ndarray | LGDO#

Link an output buffer to a variable.

Parameters:
  • varname (str) – name of internal variable to copy into buffer at the end of processor execution. If variable does not yet exist, it will be created with a similar shape to the provided buffer.

  • buff (ndarray | LGDO | None) – object to use as output buffer. If None, create a new buffer with a similar shape to the variable.

Returns:

buffer – buff or newly allocated output buffer.

Return type:

ndarray | LGDO

module_list = {'np': <module 'numpy'>, 'numpy': <module 'numpy'>}#
set_constant(varname: str, val: ndarray | int | float | Quantity, dtype: str | dtype | None = None, unit: str | Unit | Quantity | None = None) ProcChainVar#

Make a variable act as a constant and set it to val.

Parameters:
  • varname (str) – name of internal variable to set. If it does not exist, create it; otherwise, set existing variable to be constant

  • val (ndarray | int | float | Quantity) – value of constant

  • dtype (str | dtype | None) – dtype of constant

  • unit (str | Unit | Quantity | None) – unit of constant

Return type:

ProcChainVar

class dspeed.processing_chain.ProcessorManager(proc_chain: ProcessingChain, func: ufunc, params: list[str], kw_params: dict | None = None, signature: str | None = None, types: list[str] | None = None)#

Bases: object

The class that calls processors and makes sure variables are compatible.

class DimInfo(length: 'int', grid: 'CoordinateGrid')#

Bases: object

grid: CoordinateGrid#
length: int#
execute() None#
class dspeed.processing_chain.UnitConversionManager(var: ProcChainVar, unit: str | Unit | Quantity | CoordinateGrid, round=False)#

Bases: ProcessorManager

A special processor manager for handling converting variables between unit systems.

convert = <numba._DUFunc 'convert'>#
convert_int = <numba._DUFunc 'convert_int'>#
convert_round = <numba._DUFunc 'convert_round'>#
dspeed.processing_chain.build_processing_chain(lh5_in: Table, dsp_config: dict | str, db_dict: dict | None = None, outputs: list[str] | None = None, block_width: int = 16) tuple[ProcessingChain, list[str], Table]#

Produces a ProcessingChain object and an LH5 Table for output parameters from an input LH5 Table and a JSON recipe.

Parameters:
  • lh5_in (Table) – HDF5 table from which raw data is read. At least one row of entries should be read in prior to calling this!

  • dsp_config (dict | str) –

    A dictionary or JSON filename containing the recipes for computing DSP parameter from raw parameters. The format is as follows:

    {
       "outputs" : [ "par1", "par2" ],
       "processors" : {
          "name1, name2" : {
            "function" : "func1",
            "module" : "mod1",
            "args" : ["arg1", 3, "arg2"],
            "kwargs" : {"key1": "val1"},
            "init_args" : ["arg1", 3, "arg2"],
            "unit" : ["u1", "u2"],
            "defaults" : {"arg1": "defval1"}
          }
       }
    }
    
    • outputs – list of output parameters (strings) to compute by default. See outputs argument

    • processors – configuration dictionary

      • name1, name2 – dictionary. key contains comma-separated names of parameters computed

        • function – string, name of function to call. Function should implement the numpy.gufunc interface, be a factory function returning a gufunc, or be an arbitrary function that can be mapped onto a gufunc

        • module – string, name of module containing function

        • args – list of strings or numerical values. Contains list of names of computed and input parameters or constant values used as inputs to function. Note that outputs should be fed by reference as args! Arguments read from the database are prefixed with db.

        • kwargs – dictionary. Keyword arguments for ProcessingChain.add_processor().

        • init_args – list of strings or numerical values. List of names of computed and input parameters or constant values used to initialize a numpy.gufunc via a factory function

        • unit – list of strings. Units for parameters

        • defaults – dictionary. Default value to be used for arguments read from the database

  • db_dict (dict | None) – A nested dict pointing to values for database arguments. For instance, if a processor uses the argument db.trap.risetime, it will look up db_dict['trap']['risetime'] and use the found value. If no value is found, use the default defined in dsp_config.

  • outputs (list[str] | None) – List of parameters to put in the output LH5 table. If None, use the parameters in the "outputs" list from dsp_config.

  • block_width (int) – number of entries to process at once. To optimize performance, a multiple of 16 is preferred, but if performance is not an issue any value can be used.

Returns:

(proc_chain, field_mask, lh5_out)

  • proc_chain – ProcessingChain object that is executed

  • field_mask – list of input fields that are used

  • lh5_out – output Table containing processed values

Return type:

tuple[ProcessingChain, list[str], Table]
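The configuration layout and the database lookup with fallback described above can be sketched in plain Python. This is a minimal illustration, not dspeed's code. lookup_db_arg is a hypothetical helper. The argument names and values are placeholders, and keying defaults by the full db.-prefixed argument string is an assumption.

```python
import json

# Placeholder recipe following the documented layout.
dsp_config = {
    "outputs": ["par1", "par2"],
    "processors": {
        "par1, par2": {
            "function": "func1",
            "module": "mod1",
            "args": ["waveform", "db.trap.risetime", "par1", "par2"],
            "unit": ["u1", "u2"],
            # Assumption: defaults are keyed by the argument as written in args.
            "defaults": {"db.trap.risetime": "defval1"},
        }
    },
}


def lookup_db_arg(arg, db_dict, defaults):
    """Resolve 'db.a.b' as db_dict['a']['b'], else fall back to defaults[arg]."""
    node = db_dict or {}
    for key in arg.split(".")[1:]:  # drop the leading 'db'
        if not isinstance(node, dict) or key not in node:
            return defaults[arg]
        node = node[key]
    return node


# The dict serialises to valid JSON, so it could also live in a file.
config_text = json.dumps(dsp_config, indent=2)

defaults = dsp_config["processors"]["par1, par2"]["defaults"]
found = lookup_db_arg("db.trap.risetime", {"trap": {"risetime": "8*us"}}, defaults)
fallback = lookup_db_arg("db.trap.risetime", {}, defaults)
print(found, fallback)
```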

dspeed.units module#

dspeed.utils module#

class dspeed.utils.NumbaDefaults#

Bases: MutableMapping

Bare-bones class to store some Numba default options. Default values are set from environment variables.

Examples

Set all default option values for a processor at once by expanding the provided dictionary:

>>> from numba import guvectorize
>>> from dspeed.utils import numba_defaults_kwargs as nb_kwargs
>>> @guvectorize([], "", **nb_kwargs, nopython=True) # def proc(...): ...

Customize one argument but still set defaults for the others:

>>> from dspeed.utils import numba_defaults as nb_defaults
>>> @guvectorize([], "", **nb_defaults(cache=False)) # def proc(...): ...

Override global options at runtime:

>>> from dspeed.utils import numba_defaults
>>> from dspeed.build_dsp import build_dsp
>>> # must set options before explicitly importing dspeed.processors!
>>> numba_defaults.cache = False
>>> numba_defaults.boundscheck = True
>>> build_dsp(...) # if not explicit, processors imports happen here
_abc_impl = <_abc._abc_data object>#
class dspeed.utils.ProcChainVarBase#

Bases: object

Base class.

ProcChainVar implements this class. This base class is used by processors that use ProcChainVar in their constructors.

_abc_impl = <_abc._abc_data object>#
dspeed.utils.getenv_bool(name: str, default: bool = False) bool#

Get environment value as a boolean. Returns True for 1, t and true (case-insensitive), False for any other value, and default if the variable is undefined.

Return type:

bool
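The documented behaviour can be reproduced with the standard library alone. The function below is a sketch written from the description above, not a copy of dspeed's source. getenv_bool_sketch and the environment variable names are hypothetical.

```python
import os


def getenv_bool_sketch(name: str, default: bool = False) -> bool:
    """Interpret an environment variable as a boolean flag."""
    val = os.getenv(name)
    if val is None:
        # Variable is undefined: fall back to the caller's default.
        return default
    # Only 1, t and true (in any letter case) count as True.
    return val.lower() in ("1", "t", "true")


os.environ["DSPEED_SKETCH_FLAG"] = "TRUE"
print(getenv_bool_sketch("DSPEED_SKETCH_FLAG"))
print(getenv_bool_sketch("DSPEED_SKETCH_UNSET_FLAG", default=True))
```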