dspeed package

The dspeed signal processing framework is responsible for running a variety of discrete signal processors on data.

Subpackages

Submodules

dspeed.build_dsp module

This module provides high-level routines for running signal processing chains on waveform data.

dspeed.build_dsp.build_dsp(raw_in, dsp_out=None, dsp_config=None, lh5_tables=None, base_group=None, database=None, outputs=None, write_mode=None, entry_list=None, entry_mask=None, i_start=0, n_entries=None, buffer_len=3200, block_width=16, chan_config=None)

Convert raw-tier LH5 data into dsp-tier LH5 data by running a sequence of processors via the ProcessingChain.

Parameters:
  • raw_in (str | LGDO) – raw data to process. Can be the name of a raw-tier LH5 file to read from, an LH5Iterator, or an LGDO Table.

  • dsp_out (str | None) – name of file in which to output data. If None, return an lgdo.Struct or lgdo.Table.

  • dsp_config (str | Mapping | None) –

    dict or name of JSON or YAML file containing the recipe for computing DSP parameters. If chan_config is provided, this is the default configuration to use. Can only be None if chan_config is provided, in which case channels not found in chan_config are skipped. The format is as follows:

    {
       "inputs" : [
         { "file": "fname", "group": "gname", "prefix": "pre_" }
       ],
       "outputs" : [ "par1", "par2" ],
       "processors" : {
          ...
       }
    }
    
    • inputs (optional) – list of files/LH5 table names to read input data from. These will be friended to any input data provided to build_processing_chain.

        • file – file path

        • group – LH5 table group name

        • prefix (optional) – prefix to disambiguate variable names

        • suffix (optional) – suffix to disambiguate variable names

    • outputs (optional) – list of output parameters (strings) to compute by default. This will be used if no argument is provided for outputs.

    • processors – configuration for ProcessingChain. See build_processing_chain() for details.

  • lh5_tables (Collection[str] | str | None) – list of LGDO groups to process in the input file. These tables should include all input variables for processing or contain a subgroup called raw that contains such a table. If None, process all valid groups. Note that wildcards are accepted (e.g. “ch*”). Not a valid argument if raw_in is an lgdo.Table.

  • base_group (str | None) – name of group in which to find tables listed in lh5_tables. By default, check if there is a base group called raw, otherwise use no base.

  • database (str | Mapping | None) – dictionary or name of JSON or YAML file containing a parameter database. See build_processing_chain() for details.

  • outputs (Collection[str] | None) – list of parameter names to write to the output file. If not provided, use list provided under "outputs" in the DSP configuration file.

  • n_entries – number of waveforms to process.

  • write_mode (str | None) –

    • None – create new output file if it does not exist

    • 'r' – delete existing output file with same name before writing

    • 'a' – append to end of existing output file

    • 'u' – update values in existing output file

  • buffer_len (int) – number of waveforms to read/write from/to disk at a time.

  • block_width (int) – number of waveforms to process at a time.

  • chan_config (str | Mapping[str, str] | None) –

    an ordered mapping, or a JSON file containing such a mapping, from a channel or wildcard pattern to a DSP config. Loop over channels in lh5_tables and match each one to a DSP config. If no matching channel or pattern is found, use dsp_config as a default. If a channel matches several patterns, use the first one found; an ordered mapping can be used to override certain patterns. For example:

    {
        "ch1*": "config1.json",
        "ch2000000": "config2.json",
        "ch2*": "config3.json"
    }
    

    This example processes all channels beginning with ch2, except for ch2000000, with config3.json.
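The first-match rule for chan_config can be sketched in a few lines. This is a minimal illustration, assuming shell-style wildcards as implemented by Python's fnmatch module; pick_config is a hypothetical helper, not part of dspeed, and dspeed's actual matching code may differ.

```python
from fnmatch import fnmatchcase


def pick_config(channel, chan_config, default=None):
    """Return the config for the first pattern in chan_config that matches channel.

    Hypothetical helper: assumes fnmatch-style wildcards.
    """
    # dicts keep insertion order, so earlier patterns take precedence
    for pattern, config in chan_config.items():
        if fnmatchcase(channel, pattern):
            return config
    # no pattern matched: fall back to the default (the role played by dsp_config)
    return default


chan_config = {
    "ch1*": "config1.json",
    "ch2000000": "config2.json",  # listed before "ch2*", so it wins for this channel
    "ch2*": "config3.json",
}

for ch in ["ch1057", "ch2000000", "ch2000001", "ch3000"]:
    print(ch, "->", pick_config(ch, chan_config, default="dsp_config.json"))
```

Putting the exact channel name ahead of the broader ch2* pattern is what lets it override that pattern.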

dspeed.cli module

dspeed’s command line interface utilities.

dspeed.cli.dspeed_cli()

dspeed’s command line interface.

Defines the command line interface (CLI) of the package, which exposes some of the most used functions to the console. This function is added to the entry_points.console_scripts list and defines the dspeed executable (see setuptools’ documentation). To learn more about the CLI, have a look at the help section:

$ dspeed --help

dspeed.errors module

exception dspeed.errors.DSPError

Bases: Exception

Base class for signal processing errors.

exception dspeed.errors.DSPFatal(*args)

Bases: DSPError

Fatal error thrown by DSP processors that halts production.

Variables:
  • wf_range (range) – range of wf indices. This will be set after the exception is caught, and appended to the error message

  • processor (str) – string of processor and arguments. This will be set after the exception is caught, and appended to the error message
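The pattern described for DSPFatal (context attributes filled in after the exception is caught, then appended to the message) can be sketched as below. This uses local stand-in classes rather than importing dspeed; the message format and the run_block and failing_proc helpers are hypothetical.

```python
class DSPError(Exception):
    """Hypothetical local stand-in for dspeed.errors.DSPError."""


class DSPFatal(DSPError):
    """Hypothetical stand-in: context attributes are filled in by the caller."""

    def __init__(self, *args):
        super().__init__(*args)
        self.wf_range = None   # set after the exception is caught
        self.processor = None  # set after the exception is caught

    def __str__(self):
        msg = super().__str__()
        if self.wf_range is not None:
            msg += f" (wf indices {self.wf_range.start}-{self.wf_range.stop - 1})"
        if self.processor is not None:
            msg += f" in {self.processor}"
        return msg


def run_block(proc, description, start, end):
    """Run one processor over entries [start, end), annotating fatal errors."""
    try:
        proc(start, end)
    except DSPFatal as err:
        err.wf_range = range(start, end)
        err.processor = description
        raise  # re-raise the same exception object, now carrying context


def failing_proc(start, end):
    # hypothetical processor that always aborts
    raise DSPFatal("baseline out of range")
```

Because the same exception object is re-raised, the low-level processor does not need to know which block or call it was part of.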

exception dspeed.errors.ProcessingChainError

Bases: DSPError

Error thrown when there is a problem setting up a processing chain.

dspeed.logging module

This module implements some helpers for setting up logging.

dspeed.logging.setup(level=20, logger=None)

Set up colorful logging output.

If logger is None, set up only the dspeed logger.

Parameters:
  • level (int) – logging level (see logging module).

  • logger (Logger | None) – if not None, set up this logger.

Examples

>>> from dspeed import logging
>>> logging.setup(level=logging.DEBUG)

dspeed.processing_chain module

This module provides routines for setting up and running signal processing chains on waveform data.

class dspeed.processing_chain.CoordinateGrid(period, offset=0)

Bases: object

Helper class that describes a system of units, consisting of a period and offset.

period is a pint.Quantity with units; offset is a scalar in units of period, a pint.Quantity, or a ProcChainVar. In the last case, the ProcChainVar stores a different offset for each event.

get_offset(unit=None)

Get the offset converted to unit. If unit is None, use period.

Return type:

float

get_period(unit)
Return type:

float

offset: Quantity | ProcChainVar | Real = 0
period: Quantity | Unit | str
unit_str()
Return type:

str
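To make the period/offset bookkeeping concrete, here is a simplified sketch with plain floats in nanoseconds instead of pint quantities. It assumes that sample index i maps to time (i + offset) * period, i.e. that offset is the start time of sample 0 expressed in units of period. SimpleGrid is a hypothetical class, not dspeed's CoordinateGrid.

```python
from dataclasses import dataclass


@dataclass
class SimpleGrid:
    """Hypothetical, unit-free analogue of CoordinateGrid (times in ns)."""

    period_ns: float     # spacing between samples, in ns
    offset: float = 0.0  # time of sample 0, in units of period

    def index_to_time(self, index: float) -> float:
        # assumed convention: time = (index + offset) * period
        return (index + self.offset) * self.period_ns

    def time_to_index(self, time_ns: float) -> float:
        # inverse of index_to_time
        return time_ns / self.period_ns - self.offset

    def get_offset(self, unit_ns=None) -> float:
        # offset expressed in another unit; defaults to the period,
        # mirroring get_offset(unit=None)
        if unit_ns is None:
            unit_ns = self.period_ns
        return self.offset * self.period_ns / unit_ns


grid = SimpleGrid(period_ns=16.0, offset=2.0)
print(grid.index_to_time(10))  # sample 10 of a 16 ns grid offset by 2 ticks
```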

class dspeed.processing_chain.IOManager

Bases: object

Abstract base class for input/output buffer managers.

IOManagers are associated with a type of input/output buffer and must define a read and write for each one. __init__() methods should update the variable with any information from the buffer and check that the buffer and variable are compatible.

abstract read(start, end)

Read from IO buffer into variable buffer

abstract set_buffer(io_buf)

Set IO buffer to read from/write to

abstract write(start, end)

Write from variable buffer to IO buffer
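The read/write contract can be illustrated with a small abstract class. This sketch uses Python lists instead of NumPy arrays or LGDO objects; IOManagerSketch and ListIOManager are hypothetical names, and the assumption that the variable buffer holds one block starting at index 0 is for illustration only.

```python
from abc import ABC, abstractmethod


class IOManagerSketch(ABC):
    """Hypothetical minimal analogue of the IOManager interface."""

    @abstractmethod
    def set_buffer(self, io_buf): ...

    @abstractmethod
    def read(self, start, end): ...

    @abstractmethod
    def write(self, start, end): ...


class ListIOManager(IOManagerSketch):
    """Copies between an external list (IO buffer) and a block-sized variable buffer."""

    def __init__(self, io_buf, var_buf):
        self.var_buf = var_buf
        self.set_buffer(io_buf)

    def set_buffer(self, io_buf):
        # a real manager would check shape/dtype compatibility here
        if not isinstance(io_buf, list):
            raise TypeError("ListIOManager only handles lists")
        self.io_buf = io_buf

    def read(self, start, end):
        # IO buffer -> variable buffer (entries [start, end) fill the block)
        self.var_buf[: end - start] = self.io_buf[start:end]

    def write(self, start, end):
        # variable buffer -> IO buffer
        self.io_buf[start:end] = self.var_buf[: end - start]
```

Each concrete buffer type gets its own subclass, which is why the documented library has separate managers for NumPy arrays, Arrays, ArrayOfEqualSizedArrays, VectorOfVectors, and waveform tables.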

class dspeed.processing_chain.LGDOArrayIOManager(io_array, var)

Bases: IOManager

IO Manager for buffers that are lgdo.Arrays.

read(start, end)

Read from IO buffer into variable buffer

set_buffer(io_array)

Set IO buffer to read from/write to

write(start, end)

Write from variable buffer to IO buffer

class dspeed.processing_chain.LGDOArrayOfEqualSizedArraysIOManager(io_array, var)

Bases: IOManager

IOManager for buffers that are lgdo.ArrayOfEqualSizedArrays.

read(start, end)

Read from IO buffer into variable buffer

set_buffer(io_array)

Set IO buffer to read from/write to

write(start, end)

Write from variable buffer to IO buffer

class dspeed.processing_chain.LGDOVectorOfVectorsIOManager(io_vov, var)

Bases: IOManager

IOManager for buffers that are lgdo.VectorOfVectors.

_vov2nda = <numba._GUFunc '_vov2nda'>
read(start, end)

Read from IO buffer into variable buffer

set_buffer(io_vov)

Set IO buffer to read from/write to

write(start, end)

Write from variable buffer to IO buffer

class dspeed.processing_chain.LGDOWaveformIOManager(wf_table, variable)

Bases: IOManager

IOManager for buffers that are lgdo.WaveformTables.

read(start, end)

Read from IO buffer into variable buffer

set_buffer(wf_table)

Set IO buffer to read from/write to

write(start, end)

Write from variable buffer to IO buffer

class dspeed.processing_chain.NumpyIOManager(io_buf, var)

Bases: IOManager

IOManager for buffers that are numpy.ndarrays.

read(start, end)

Read from IO buffer into variable buffer

set_buffer(io_buf)

Set IO buffer to read from/write to

write(start, end)

Write from variable buffer to IO buffer

class dspeed.processing_chain.ProcChainVar(proc_chain, name, shape='auto', dtype='auto', grid='auto', unit='auto', is_coord='auto', vector_len=None, is_const=False)

Bases: ProcChainVarBase

Helper data class with buffer and information for internal variables in ProcessingChain.

Members can be set to auto to attempt to deduce these when adding this variable to a processor for the first time.

Parameters:
  • proc_chain (ProcessingChain) – ProcessingChain that contains this variable.

  • name (str) – Name of variable used to look it up.

  • shape (int | tuple[int, ...]) – Shape of variable, without buffer_len dimension.

  • dtype (np.dtype) – Data type of variable.

  • grid (CoordinateGrid) – Coordinate grid associated with variable. This contains the period and offset of the variable. For variables where is_coord is True, use this to perform unit conversions.

  • unit (str | Unit) – Unit associated with variable during I/O.

  • is_coord (bool) – If True, variable represents an array index and can be converted into a unitted number using grid.

  • vector_len (str | ProcChainVar) – For VectorOfVectors variables, this points to the variable used to represent the length of each vector.

  • is_const (bool) – If True, variable is a constant. Variable will be set before executing and will not be recomputed. Does not have an outer dimension of size _block_width.

_make_buffer()
Return type:

ndarray

property buffer
description()
Return type:

str

get_buffer(unit=None)
Return type:

ndarray

property offset
property period
update_auto(shape='auto', dtype='auto', grid='auto', unit='auto', is_coord='auto', period=None, offset=0, vector_len=None)

Update any variables set to auto; leave the others alone. Emit a message only if anything was updated.
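The "auto" deduction rule (only fields still set to auto are filled in, and a message is emitted only when something changes) can be sketched with a dataclass. VarSketch is a hypothetical, much-reduced stand-in for ProcChainVar; it prints instead of logging and returns a bool so the behavior is easy to check.

```python
from dataclasses import dataclass
from typing import Any

AUTO = "auto"


def is_auto(value: Any) -> bool:
    # only strings are compared, so tuples, dtypes, etc. are never equated with "auto"
    return isinstance(value, str) and value == AUTO


@dataclass
class VarSketch:
    """Hypothetical, much-reduced stand-in for ProcChainVar."""

    name: str
    shape: Any = AUTO
    dtype: Any = AUTO
    unit: Any = AUTO

    def update_auto(self, **kwargs: Any) -> bool:
        """Fill in fields still set to 'auto'; return True if anything changed."""
        changed = []
        for field, value in kwargs.items():
            if not is_auto(value) and is_auto(getattr(self, field)):
                setattr(self, field, value)
                changed.append(field)
        if changed:
            # the documented method emits a message here; this sketch prints
            print(f"{self.name}: set {', '.join(changed)}")
        return bool(changed)
```

Once a field is set, later processors cannot silently change it; a mismatch would have to be detected and reported elsewhere.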

class dspeed.processing_chain.ProcessingChain(block_width=8, buffer_len=None)

Bases: object

A class to efficiently perform a sequence of digital signal processing (DSP) transforms.

It contains a list of DSP functions and a set of constant values and named variables contained in fixed memory locations. When executing the ProcessingChain, processors will act on the internal memory without allocating new memory in the process. Furthermore, the memory is allocated in blocks, enabling vectorized processing of many entries at once. To set up a ProcessingChain, use the following methods:

  • link_input_buffer() – bind a named variable to an external NumPy array to read data from

  • add_processor() – add a DSP function and bind its inputs to a set of named variables and constant values

  • link_output_buffer() – bind a named variable to an external NumPy array to write data into

When calling these methods, the ProcessingChain class will use available information to allocate buffers to the correct sizes and data types. For this reason, transforms will ideally implement the numpy.ufunc class, enabling broadcasting of array dimensions. If not enough information is available to correctly allocate memory, it can be provided through the named variable strings or by calling add_variable().

Parameters:
  • block_width (int) – number of entries to simultaneously process.

  • buffer_len (int) – length of input and output buffers. Should be a multiple of block_width.
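The block-wise execution model (fixed buffers allocated once, then read, process, write for each block of block_width entries) can be sketched in pure Python. Lists stand in for the NumPy arrays and gufuncs the real ProcessingChain uses; process_in_blocks and square are hypothetical names.

```python
def process_in_blocks(inputs, proc, block_width=16):
    """Run proc over inputs one block at a time, reusing fixed-size buffers."""
    n = len(inputs)
    in_buf = [0.0] * block_width   # allocated once, reused for every block
    out_buf = [0.0] * block_width
    outputs = [0.0] * n
    for begin in range(0, n, block_width):
        end = min(begin + block_width, n)
        k = end - begin
        in_buf[:k] = inputs[begin:end]    # read: copy one block in
        proc(in_buf, out_buf, k)          # process in place, no new allocation
        outputs[begin:end] = out_buf[:k]  # write: copy results out
    return outputs


def square(w_in, w_out, k):
    # toy "processor" writing into a preallocated output buffer
    for i in range(k):
        w_out[i] = w_in[i] * w_in[i]


print(process_in_blocks([float(x) for x in range(10)], square, block_width=4))
```

With NumPy, the inner loop in square would be replaced by a single vectorized call over the whole block, which is where the speedup from larger block widths comes from.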

_astype(var, dtype)
Return type:

ProcChainVar

_execute_procs(begin, end)

Copy from input buffers to variables, call all the processors on their paired arg tuples, copy from variables to list of output buffers.

Return type:

str

_isfinite(var)

Return True if value is finite (i.e. not NaN or infinite).

_isnan(var)

Return True if value is NaN.

_length(var)
Return type:

int

_loadlh5(path_to_file, path_in_file)

Load data from an LH5 file.

Parameters:
  • path_to_file (str) – path to the LH5 file.

  • path_in_file (str) – path to the data within the LH5 file.

Returns:

The loaded data.

Return type:

array

_parse_expr(node, expr, dry_run, var_name_list)

Helper function for ProcessingChain.get_variable() that recursively evaluates the AST. Whenever a variable name is encountered, add it to var_name_list (which should begin as an empty list). Only add new variables and processors to the chain if dry_run is False. Based on this Stack Overflow answer.

Return type:

Any

_round(var, to_nearest=1, dtype=None, mode='round')

Round a variable or value to the nearest multiple of to_nearest. If var is a ProcChainVar and to_nearest is a Unit or Quantity, return a new ProcChainVar with a period of to_nearest, and the underlying values and offset rounded. If var is a ProcChainVar and to_nearest is an int or a float, keep the unit and just round the underlying value. Set mode to “floor”, “ceil”, or “trunc” to change the rounding behavior.

Example usage:

  • round(tp_0, wf.grid) – convert tp_0 to the nearest array index of wf

  • round(5*us, wf.period) – 5 us in wf clock ticks

Return type:

float | Quantity | ProcChainVar
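For plain numbers, the four rounding modes reduce to applying a rounding function to value / to_nearest and scaling back. The sketch below shows only that scalar arithmetic; round_to is a hypothetical helper and ignores the unit and ProcChainVar handling described above.

```python
import math

# rounding function used for each mode
_MODES = {"round": round, "floor": math.floor, "ceil": math.ceil, "trunc": math.trunc}


def round_to(value, to_nearest=1, mode="round"):
    """Round value to a multiple of to_nearest using the given mode."""
    return _MODES[mode](value / to_nearest) * to_nearest


for mode in ("round", "floor", "ceil", "trunc"):
    print(mode, round_to(-7.3, 2, mode))
```

Note that Python's built-in round sends exact halves to the nearest even integer, so round_to(2.5) gives 2; other implementations may break ties differently.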

_validate_name(name, raise_exception=False)

Check that name is alphanumeric, and not an already used keyword

Return type:

bool

_where(condition, a, b, dtype='auto')

Select value from a or b depending on whether condition is True or False. Used for the where function and the a if b else c pattern.

Return type:

ProcChainVar

add_processor(func, *args, signature=None, types=None, coord_grid=None)

Add a processor to the chain. Make a list of parameters from *args, replacing any strings in the list with NumPy objects from vars_dict where possible.

add_variable(name, dtype='auto', shape='auto', grid='auto', unit='auto', is_coord='auto', period=None, offset=0, vector_len=None)

Add a named variable containing a block of values or arrays.

Parameters:
  • name (str) – name of variable.

  • dtype (np.dtype | str) – default is 'auto', meaning dtype will be deduced later, if possible.

  • shape (int | tuple[int, ...]) – length or shape tuple of element. Default is 'auto', meaning shape will be deduced later, if possible.

  • grid (CoordinateGrid) – coordinate grid for the variable, containing period and offset.

  • unit (str | Unit) – unit of variable.

  • period (CoordinateGrid.period) – unit with period of waveform associated with object. Do not use if grid is provided.

  • offset (CoordinateGrid.offset) – unit with offset of waveform associated with object. Requires a period to be provided.

  • is_coord (bool) – if True, transform value based on period and offset.

Return type:

ProcChainVar

execute(start=0, stop=None)

Execute the dsp chain on the entire input/output buffers.

func_list – mapping from the function names usable in get_variable() expressions to their implementations:

  • 'astype' → ProcessingChain._astype

  • 'ceil', 'floor', 'round', 'trunc' → ProcessingChain._round, with mode set to the same name

  • 'isfinite' → ProcessingChain._isfinite

  • 'isnan' → ProcessingChain._isnan

  • 'len' → ProcessingChain._length

  • 'loadlh5' → ProcessingChain._loadlh5

  • 'where' → ProcessingChain._where

get_timing()

Get the timing of each processor in the processing chain.

Return type:

dict[str, float]

get_variable(expr, get_names_only=False, expr_only=False)

Parse string expr into a NumPy array or value, using the following syntax:

  • numeric values are parsed into ints or floats

  • units found in the pint package are recognized as units

  • other strings are parsed into variable names. If get_names_only is False, fetch the internal buffer (creating it as needed). Else, return a string of the name

  • if a string is followed by (...), try parsing into one of the following expressions:

    • len(expr): return the length of the array found with expr

    • astype(expr, dtype): cast expr to dtype

    • round(expr, to_nearest = 1, [dtype]): return the value found with expr rounded to the nearest multiple of to_nearest

    • floor(expr, to_nearest = 1, [dtype]): return the value found with expr rounded down to the nearest multiple of to_nearest

    • ceil(expr, to_nearest = 1, [dtype]): return the value found with expr rounded up to the nearest multiple of to_nearest

    • trunc(expr, to_nearest = 1, [dtype]): return the value found with expr rounded toward zero to a multiple of to_nearest

    • where(condition, a, b, [dtype]): if condition is True, return the value held in a, else b

    • isnan(expr): return True if expr is NaN

    • isfinite(expr): return True if expr is not NaN, inf, or -inf

    • varname(shape, type): allocate a new buffer with the specified shape and type, using varname. This is used if the automatic type and shape deduction for allocating variables fails

    • loadlh5(file, group): load the LH5 object held in group of the LH5 file. The returned object will be treated as a constant.

  • Unary and binary operators +, -, *, /, // are available. If a variable name is included in the expression, a processor will be added to the ProcessingChain and a new buffer allocated to store the output

  • varname[slice]: return the variable with a slice applied. Slice values can be floats, and will have round applied to them

  • keyword = expr: return a dict with a single element pointing from keyword to the parsed expr. This is used for kwargs. If expr_only is True, raise an exception if we see this.

  • a if b else c: see where; return value held in a if b is True, else c

If get_names_only is set to True, do not fetch or allocate new arrays, instead return a list of variable names found in the expression.

Return type:

Any
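The get_names_only mode amounts to walking the expression's syntax tree and collecting names without allocating anything. The sketch below uses Python's ast module on expressions that are also valid Python; variable_names and its small set of unit names are hypothetical, and the real parser also handles pint units, keyword syntax, and buffer allocation.

```python
import ast

# function names that get_variable() treats specially (from the list above)
FUNCTIONS = {"len", "astype", "round", "floor", "ceil", "trunc",
             "where", "isnan", "isfinite", "loadlh5"}


def variable_names(expr, units=frozenset({"ns", "us", "ms", "s"})):
    """Return variable names referenced in expr, without duplicates.

    Hypothetical helper: skips the special function names and a small,
    assumed set of unit names; every other Python name is kept.
    """
    tree = ast.parse(expr, mode="eval")
    names = []
    for node in ast.walk(tree):
        if (isinstance(node, ast.Name)
                and node.id not in FUNCTIONS
                and node.id not in units
                and node.id not in names):
            names.append(node.id)
    return names


print(variable_names("round(tp_0, wf.period) + a*2"))
```

Attribute access such as wf.period contributes only the base name wf, which is the variable that would need to be read in.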

See link_io_buffer()

Return type:

ndarray | LGDO

Link an IO buffer to a variable.

Parameters:
  • varname (str) – name of internal variable to copy into buffer at the end of processor execution. If variable does not yet exist, it will be created with a similar shape to the provided buffer.

  • buff (ndarray | LGDO | None) – object to use as input buffer. If None, create a new buffer with a similar shape to the variable.

  • output (bool) – if True, link as an output buffer; if False (default) link as input

Returns:

buffer – buff or newly allocated input buffer.

Return type:

ndarray | LGDO

See link_io_buffer()

Return type:

ndarray | LGDO

module_list = {'np': <module 'numpy'>, 'numpy': <module 'numpy'>}
set_constant(varname, val, dtype=None, unit=None)

Make a variable act as a constant and set it to val.

Parameters:
  • varname (str) – name of internal variable to set. If it does not exist, create it; otherwise, set existing variable to be constant

  • val (ndarray | Real | Quantity) – value of constant

  • dtype (str | dtype | None) – dtype of constant

  • unit (str | Unit | Quantity | None) – unit of constant

Return type:

ProcChainVar

class dspeed.processing_chain.ProcessorManager(proc_chain, func, params, kw_params=None, signature=None, types=None, grid=None)

Bases: object

The class that calls processors and makes sure variables are compatible.

class DimInfo(length: 'int', grid: 'CoordinateGrid')

Bases: object

grid: CoordinateGrid
length: int
execute()
class dspeed.processing_chain.UnitConversionManager(var, unit, mode=None)

Bases: ProcessorManager

A special processor manager for handling converting variables between unit systems.

dspeed.processing_chain.build_processing_chain(processors, tb_in=None, db_dict=None, outputs=None, block_width=16)

Produces a ProcessingChain object and an LGDO Table for output parameters from an input LGDO Table and a JSON or YAML recipe.

Parameters:
  • processors (dict | str) –

    A dictionary or YAML/JSON filename containing the recipes for computing DSP parameters from raw parameters. The format is as follows:

    outputs: [par1, par2]
    processors:
      "name1, name2":
        function: func1
        module: mod1
        args: [arg1, 3, arg2]
        kwargs:
          key1: val1
        init_args: [arg1, 3, arg2]
        unit: [u1, u2]
        defaults:
          arg1: defval1
    
      • name1, name2 – dictionary. key contains comma-separated names of parameters computed

        • function – string, name of function to call. The function should implement the numpy.gufunc interface, be a factory function returning a gufunc, or be an arbitrary function that can be mapped onto a gufunc.

        • module – string, name of module containing function

        • args – list of strings or numerical values. Contains names of computed and input parameters or constant values used as inputs to function. Note that outputs should be fed by reference as args! Arguments read from the database are prefixed with db.

        • kwargs – dictionary. Keyword arguments for ProcessingChain.add_processor().

        • init_args – list of strings or numerical values. List of names of computed and input parameters or constant values used to initialize a numpy.gufunc via a factory function

        • unit – list of strings. Units for parameters

        • defaults – dictionary. Default value to be used for arguments read from the database

  • tb_in (Table | None) – input table. This table will be linked to use as inputs when executing processors. Can be empty (for now), as long as fields and attrs are set.

  • db_dict (dict | None) – A nested dict pointing to values for database arguments. For instance, if a processor uses the argument db.trap.risetime, it will look up db_dict['trap']['risetime'] and use the found value. If no value is found, use the default defined in processors.

  • outputs (list[str] | None) – List of parameters to put in the output LGDO table.

  • block_width (int) – number of entries to process at once. To optimize performance, a multiple of 16 is preferred, but if performance is not an issue any value can be used.

Returns:

(proc_chain, field_mask, tb_out)

  • proc_chain – ProcessingChain object that is executed

  • field_mask – list of names of input fields that will be used. This can be used to ensure only needed values are read in.

  • tb_out – output Table with size 0, with fields and attrs set up to contain outputs

Return type:

tuple[ProcessingChain, list[str], Table]
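Two small pieces of the recipe handling can be sketched independently: splitting a "name1, name2" processor key into output names, and resolving a db.* argument against db_dict with a fallback to defaults. Both helpers are hypothetical, and keying defaults by the argument string exactly as it appears in args is an assumption based on the YAML example above.

```python
def split_names(key):
    """Split a processor key like "name1, name2" into output names."""
    return [name.strip() for name in key.split(",")]


def resolve_db_arg(arg, db_dict, defaults):
    """Resolve an argument like 'db.trap.risetime' against a nested db_dict.

    Hypothetical helper; assumes defaults is keyed by the full argument string.
    """
    if not (isinstance(arg, str) and arg.startswith("db.")):
        return arg  # not a database argument: pass through unchanged
    node = db_dict
    for key in arg.split(".")[1:]:
        if isinstance(node, dict) and key in node:
            node = node[key]
        elif arg in defaults:
            return defaults[arg]
        else:
            raise KeyError(f"no database value or default for {arg!r}")
    return node


db = {"trap": {"risetime": "10*us"}}
print(resolve_db_arg("db.trap.risetime", db, {}))
```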

dspeed.processing_chain.is_in_pint(unit)

dspeed.units module

dspeed.utils module

class dspeed.utils.GUFuncWrapper(fun, signature, types, name=None, vectorized=False, copy_out=True, doc_string=None)

Bases: object

A wrapper class to create a ufunc-like object from an arbitrary function. This class is callable and is intended for processors that require setup with persistent state information; these processors are generated using the “factory” method and typically use “init_args”.

Example 1:

# set up some object 'obj' that has a function we want to call on w_in
gufunc = GUFuncWrapper(
    lambda w_in: obj.execute(w_in, args...),
    "(n)->()",
    "ff"
)

Example 2:

# fun is a vectorized python function, but we want to use ufunc interface
gufunc = GUFuncWrapper(
    lambda w_in, a, w_out: fun(w_in, a, out=w_out, ...more kwargs),
    "(n),()->(n)",
    "fff",
    vectorized=True,
    copy_out=False
)
param fun:

python function to be wrapped

param signature:

gufunction signature (see https://numpy.org/doc/2.1/reference/c-api/generalized-ufuncs.html)

param types:

string of type chars, e.g. fi->f

param name:

name of function. By default use fun.__name__ (this can be very unhelpful, e.g. “<lambda>”)

param vectorized:

if False, use np.vectorize to loop over function. Set to True if fun is already vectorized

param copy_out:

set to False if function does in-place calculation for outputs. Cannot be False if vectorized is also False

param doc_string:

manually set doc string. If None, use docstring of fun if it exists. Else use this docstring.
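With vectorized=False the wrapper is documented to loop via np.vectorize. The snippet below sketches what that looping amounts to for a stateful object, in the spirit of Example 1; it is not GUFuncWrapper itself, and BaselineMean with its n_samples setting is a hypothetical processor.

```python
import numpy as np


class BaselineMean:
    """Hypothetical stateful processor: mean of the first n_samples of a waveform."""

    def __init__(self, n_samples):
        self.n_samples = n_samples  # persistent state, set once (like init_args)

    def execute(self, w_in):
        return float(np.mean(w_in[: self.n_samples]))


obj = BaselineMean(2)

# np.vectorize with a gufunc signature loops over the outer dimension,
# calling the wrapped function once per waveform (n) and collecting scalars ().
gufunc_like = np.vectorize(lambda w_in: obj.execute(w_in), signature="(n)->()")

wfs = np.array([[1.0, 1.0, 3.0, 3.0],
                [2.0, 4.0, 6.0, 8.0]])
print(gufunc_like(wfs))
```

np.vectorize is a Python-level loop, so this route is convenient rather than fast; the vectorized=True and copy_out=False options in the signature exist for functions that already operate on whole arrays.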

class dspeed.utils.NumbaDefaults

Bases: MutableMapping

Bare-bones class to store some Numba default options. Default values are set from environment variables.

Examples

Set all default option values for a processor at once by expanding the provided dictionary:

>>> from numba import guvectorize
>>> from dspeed.utils import numba_defaults_kwargs as nb_kwargs
>>> @guvectorize([], "", **nb_kwargs, nopython=True)
... def proc(...): ...

Customize one argument but still set defaults for the others:

>>> from dspeed.utils import numba_defaults as nb_defaults
>>> @guvectorize([], "", **nb_defaults(cache=False))
... def proc(...): ...

Override global options at runtime:

>>> from dspeed.utils import numba_defaults
>>> from dspeed import build_dsp
>>> # must set options before explicitly importing dspeed.processors!
>>> numba_defaults.cache = False
>>> numba_defaults.boundscheck = True
>>> build_dsp(...)  # if not explicit, processors imports happen here
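The behavior shown above (a mapping that can be unpacked with ** and called with overrides, seeded from environment variables) can be sketched with collections.abc.MutableMapping. DefaultsSketch is a hypothetical analogue, the environment variable names are made up for illustration, and the boolean parsing follows the rule documented for getenv_bool.

```python
import os
from collections.abc import MutableMapping


def _env_bool(name, default=False):
    # True for "1", "t", "true" (case-insensitive), False otherwise,
    # default if the variable is undefined
    value = os.environ.get(name)
    if value is None:
        return default
    return value.lower() in ("1", "t", "true")


class DefaultsSketch(MutableMapping):
    """Hypothetical analogue of NumbaDefaults; env var names are made up."""

    def __init__(self):
        self.cache = _env_bool("EXAMPLE_NUMBA_CACHE", default=True)
        self.boundscheck = _env_bool("EXAMPLE_NUMBA_BOUNDSCHECK", default=False)

    # the mapping view is backed by the instance attributes
    def __getitem__(self, key):
        return self.__dict__[key]

    def __setitem__(self, key, value):
        self.__dict__[key] = value

    def __delitem__(self, key):
        del self.__dict__[key]

    def __iter__(self):
        return iter(self.__dict__)

    def __len__(self):
        return len(self.__dict__)

    def __call__(self, **overrides):
        # return a plain dict of defaults with some entries overridden
        merged = dict(self)
        merged.update(overrides)
        return merged
```

Backing the mapping with instance attributes is what allows both attribute assignment (numba_defaults.cache = False) and ** unpacking to work on the same object.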
class dspeed.utils.ProcChainVarBase

Bases: object

Base class.

ProcChainVar implements this class. This base class is used by processors that use ProcChainVar in their constructors.

dspeed.utils.dspeed_guvectorize(*args, **kwargs)

Decorator to create a callable object implementing the gufunc interface. See arguments in GUFuncWrapper initializer

dspeed.utils.getenv_bool(name, default=False)

Get an environment variable as a boolean: return True for 1, t, and true (case-insensitive), False for any other value, and default if the variable is undefined.

Return type:

bool