Source code for pylops_distributed.basicoperators.BlockDiag

import numpy as np
import dask.array as da

from scipy.sparse.linalg.interface import _get_dtype
from pylops_distributed import LinearOperator


class BlockDiag(LinearOperator):
    r"""Block-diagonal operator.

    Create a block-diagonal operator from N linear operators. The forward
    pass applies each operator to its own contiguous slice of the model
    vector and concatenates the results; the adjoint does the same with
    each operator's adjoint on slices of the data vector.

    Parameters
    ----------
    ops : :obj:`list`
        Linear operators to be placed along the diagonal. Each must expose
        ``shape``, ``matvec`` and ``rmatvec``.
    chunks : :obj:`tuple`, optional
        Chunks for data and model, in that order: ``chunks[0]`` is used to
        rechunk the output of the forward and ``chunks[1]`` the output of
        the adjoint (an array with a single chunk is created if ``chunks``
        is not provided)
    compute : :obj:`tuple`, optional
        Compute the outcome of forward and adjoint or simply define the graph
        and return a :obj:`dask.array`
    todask : :obj:`tuple`, optional
        Apply :func:`dask.array.from_array` to model and data before applying
        forward and adjoint respectively
    dtype : :obj:`str`, optional
        Type of elements in input array. If ``None``, it is inferred from
        ``ops``.

    Attributes
    ----------
    shape : :obj:`tuple`
        Operator shape
    explicit : :obj:`bool`
        Operator contains a matrix that can be solved explicitly
        (``True``) or not (``False``)

    Notes
    -----
    Refer to :class:`pylops.basicoperators.BlockDiag` for implementation
    details.

    """
    def __init__(self, ops, chunks=None, compute=(False, False),
                 todask=(False, False), dtype=None):
        self.ops = ops

        # Rows (data size) and columns (model size) of every block.
        # ``np.int`` was removed in NumPy 1.24; the builtin ``int`` is the
        # type it used to alias.
        nops = np.array([oper.shape[0] for oper in ops], dtype=int)
        mops = np.array([oper.shape[1] for oper in ops], dtype=int)
        self.nops = nops.sum()
        self.mops = mops.sum()

        # Cumulative offsets (with a leading 0) delimiting each block's
        # slice of the data vector (nnops) and of the model vector (mmops).
        self.nnops = np.insert(np.cumsum(nops), 0, 0)
        self.mmops = np.insert(np.cumsum(mops), 0, 0)

        self.shape = (self.nops, self.mops)
        if dtype is None:
            self.dtype = _get_dtype(ops)
        else:
            self.dtype = np.dtype(dtype)

        # Default: one single chunk for the data and one for the model.
        self.chunks = (self.nops, self.mops) if chunks is None else chunks
        self.compute = compute
        self.todask = todask
        self.Op = None
        self.explicit = False

    def _matvec(self, x):
        """Apply every block to its slice of the model vector ``x``.

        Returns the concatenated data vector, rechunked with the data
        chunking ``self.chunks[0]``.
        """
        y = [
            oper.matvec(x[self.mmops[iop]:self.mmops[iop + 1]]).squeeze()
            for iop, oper in enumerate(self.ops)
        ]
        y = da.concatenate(y)
        y = y.rechunk(self.chunks[0])
        return y

    def _rmatvec(self, x):
        """Apply every block's adjoint to its slice of the data vector ``x``.

        Returns the concatenated model vector, rechunked with the model
        chunking ``self.chunks[1]``.
        """
        y = [
            oper.rmatvec(x[self.nnops[iop]:self.nnops[iop + 1]]).squeeze()
            for iop, oper in enumerate(self.ops)
        ]
        y = da.concatenate(y)
        # The adjoint output lives in model space, so it must follow the
        # model chunking rather than the data chunking.
        y = y.rechunk(self.chunks[1])
        return y