import logging
import numpy as np
import dask.array as da
from math import sqrt
from pylops.waveeqprocessing.mdd import _MDC
from pylops_distributed import LinearOperator
from pylops_distributed.utils import dottest as Dottest
from pylops_distributed import Identity, Transpose
from pylops_distributed.signalprocessing import FFT, Fredholm1
from pylops_distributed.optimization.cg import cgls
logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.WARNING)
[docs]class MDC(LinearOperator):
r"""Multi-dimensional convolution.
Apply multi-dimensional convolution between two datasets.
Model and data should be provided after flattening 2- or 3-dimensional
arrays of size :math:`[n_t \times n_r (\times n_{vs})]` and
:math:`[n_t \times n_s (\times n_{vs})]` (or :math:`2*n_t-1` for
``twosided=True``), respectively.
Parameters
----------
G : :obj:`dask.array.ndarray`
Multi-dimensional convolution kernel in frequency domain of size
:math:`[n_{fmax} \times n_s \times n_r]`
nt : :obj:`int`
Number of samples along time axis
nv : :obj:`int`
Number of samples along virtual source axis
dt : :obj:`float`, optional
Sampling of time integration axis
dr : :obj:`float`, optional
Sampling of receiver integration axis
twosided : :obj:`bool`, optional
MDC operator has both negative and positive time (``True``) or
only positive (``False``)
saveGt : :obj:`bool`, optional
Save ``G`` and ``G^H`` to speed up the computation of adjoint of
:class:`pylops_distributed.signalprocessing.Fredholm1` (``True``) or create
``G^H`` on-the-fly (``False``) Note that ``saveGt=True`` will be
faster but double the amount of required memory
conj : :obj:`str`, optional
Perform Fredholm integral computation with complex conjugate of ``G``
prescaled : :obj:`bool`, optional
Apply scaling to kernel (``False``) or not (``False``) when performing
spatial and temporal summations. In case ``prescaled=True``, the
kernel is assumed to have been pre-scaled when passed to the MDC
routine.
compute : :obj:`tuple`, optional
Compute the outcome of forward and adjoint or simply define the graph
and return a :obj:`dask.array`
todask : :obj:`tuple`, optional
Apply :func:`dask.array.from_array` to model and data before applying
forward and adjoint respectively
dtype : :obj:`str`, optional
Type of elements in input array. If ``None``, automatically inferred
from ``G``
Notes
-----
Refer to :class:`pylops.waveeqprocessing.MDC` for implementation
details.
"""
def __init__(self, G, nt, nv, dt=1., dr=1., twosided=True,
saveGt=False, conj=False, prescaled=False,
chunks=(None, None), compute=(False, False),
todask=(False, False), dtype=None):
if twosided and nt % 2 == 0:
raise ValueError('nt must be odd number')
# store G
self.G = G
self.nfmax, self.ns, self.nr = self.G.shape
self.saveGt = saveGt
if self.saveGt:
self.GT = (G.transpose((0, 2, 1)).conj()).persist()
# ensure that nfmax is not bigger than allowed
self.nfft = int(np.ceil((nt + 1) / 2))
if self.nfmax > self.nfft:
self.nfmax = self.nfft
logging.warning('nfmax set equal to ceil[(nt+1)/2=%d]' % self.nfmax)
# store other input parameters
self.nt, self.nv = nt, nv
self.dt, self.dr = dt, dr
self.twosided = twosided
self.conj = conj
self.prescaled = prescaled
self.dims = (self.nt, self.nr, self.nv)
self.dimsd = (self.nt, self.ns, self.nv)
self.dimsdf = (self.nfft, self.ns, self.nv)
# find out dtype of G
self.cdtype = self.G[0, 0, 0].dtype
if dtype is None:
self.dtype = np.real(np.ones(1, dtype=self.cdtype)).dtype
else:
self.dtype = dtype
self.shape = (np.prod(self.dimsd), np.prod(self.dims))
self.compute = compute
self.chunks = chunks
self.todask = todask
self.Op = None
self.explicit = False
def _matvec(self, x):
# apply forward fft
x = da.reshape(x, self.dims)
if self.twosided:
x = da.fft.ifftshift(x, axes=0)
y = sqrt(1. / self.nt) * da.fft.rfft(x, n=self.nt, axis=0)
y = y.astype(self.cdtype)
y = y[:self.nfmax]
# apply batched matrix mult
y = y.rechunk((self.G.chunks[0], self.nr, self.nv))
if self.conj:
y = y.conj()
y = da.matmul(self.G, y)
if self.conj:
y = y.conj()
if not self.prescaled:
y *= self.dr * self.dt * np.sqrt(self.nt)
# apply inverse fft
y = da.pad(y, ((0, self.nfft - self.nfmax), (0, 0), (0, 0)), mode='constant')
y = y.rechunk(self.dimsdf)
y = sqrt(self.nt) * da.fft.irfft(y, n=self.nt, axis=0)
y = y.astype(self.dtype)
y = da.real(y)
return y.ravel()
def _rmatvec(self, x):
# apply forward fft
x = da.reshape(x, self.dimsd)
y = sqrt(1. / self.nt) * da.fft.rfft(x, n=self.nt, axis=0)
y = y.astype(self.cdtype)
y = y[:self.nfmax]
# apply batched matrix mult
y = y.rechunk((self.G.chunks[0], self.nr, self.nv))
if self.saveGt:
if self.conj:
y = y.conj()
y = da.matmul(self.GT, y)
if self.conj:
y = y.conj()
else:
if self.conj:
y = da.matmul(y.transpose(0, 2, 1), self.G).transpose(0, 2, 1)
else:
y = da.matmul(y.transpose(0, 2, 1).conj(), self.G).transpose(0, 2, 1).conj()
if not self.prescaled:
y *= self.dr * self.dt * np.sqrt(self.nt)
# apply inverse fft
y = da.pad(y, ((0, self.nfft - self.nfmax), (0, 0), (0, 0)), mode='constant')
y = y.rechunk(self.dimsdf)
y = sqrt(self.nt) * da.fft.irfft(y, n=self.nt, axis=0)
if self.twosided:
y = da.fft.fftshift(y, axes=0)
y = y.astype(self.dtype)
y = da.real(y)
return y.ravel()
[docs]def MDD(G, d, dt=0.004, dr=1., nfmax=None, wav=None,
twosided=True, adjoint=False, dottest=False,
saveGt=False, add_negative=True, **kwargs_cgls):
r"""Multi-dimensional deconvolution.
Solve multi-dimensional deconvolution problem using
:py:func:`scipy.sparse.linalg.lsqr` iterative solver.
Parameters
----------
G : :obj:`dask.array.ndarray`
Multi-dimensional convolution kernel in frequency domain of size
:math:`[n_{f,max} \times n_s \times n_r]`
d : :obj:`dask.array.ndarray`
Data in time domain :math:`[n_t \times n_s (\times n_vs)]` if
``twosided=False`` or ``twosided=True`` and ``add_negative=True``
(with only positive times) or size
:math:`[2*n_t-1 \times n_s (\times n_vs)]` if ``twosided=True``
dt : :obj:`float`, optional
Sampling of time integration axis
dr : :obj:`float`, optional
Sampling of receiver integration axis
nfmax : :obj:`int`, optional
Index of max frequency to include in deconvolution process
wav : :obj:`numpy.ndarray`, optional
Wavelet to convolve to the inverted model and psf
(must be centered around its index in the middle of the array).
If ``None``, the outputs of the inversion are returned directly.
twosided : :obj:`bool`, optional
MDC operator and data both negative and positive time (``True``)
or only positive (``False``)
add_negative : :obj:`bool`, optional
Add negative side to MDC operator and data (``True``) or not
(``False``)- operator and data are already provided with both positive
and negative sides. To be used only with ``twosided=True``.
adjoint : :obj:`bool`, optional
Compute and return adjoint(s)
dottest : :obj:`bool`, optional
Apply dot-test
saveGt : :obj:`bool`, optional
Save ``G`` and ``G^H`` to speed up the computation of adjoint of
:class:`pylops_distributed.signalprocessing.Fredholm1` (``True``) or
create ``G^H`` on-the-fly (``False``) Note that ``saveGt=True`` will be
faster but double the amount of required memory
**kwargs_cgls
Arbitrary keyword arguments for
:py:func:`pylops_distributed.optimization.cg.cgls` solver
Returns
-------
minv : :obj:`dask.array.ndarray`
Inverted model of size :math:`[n_t \times n_r (\times n_{vs})]`
for ``twosided=False`` or
:math:`[2*n_t-1 \times n_r (\times n_vs)]` for ``twosided=True``
madj : :obj:`dask.array.ndarray`
Adjoint model of size :math:`[n_t \times n_r (\times n_{vs})]`
for ``twosided=False`` or
:math:`[2*n_t-1 \times n_r (\times n_r) ]` for ``twosided=True``
See Also
--------
MDC : Multi-dimensional convolution
Notes
-----
Refer to :class:`pylops.waveeqprocessing.MDD` for implementation
details. Note that this implementation is currently missing the ``wav``
and ``causality_precond=False`` options.
"""
nf, ns, nr = G.shape
nt = d.shape[0]
if len(d.shape) == 2:
nv = 1
else:
nv = d.shape[2]
if twosided:
if add_negative:
nt2 = 2 * nt - 1
else:
nt2 = nt
nt = (nt2 + 1) // 2
nfmax_allowed = int(np.ceil((nt2+1)/2))
else:
nt2 = nt
nfmax_allowed = nt
# Fix nfmax to be at maximum equal to half of the size of fft samples
if nfmax is None or nfmax > nfmax_allowed:
nfmax = nfmax_allowed
logging.warning('nfmax set equal to ceil[(nt+1)/2=%d]' % nfmax)
# Add negative part to data and model
if twosided and add_negative:
if nv == 1:
d = da.concatenate((da.zeros((nt - 1, ns)), d), axis=0)
else:
d = da.concatenate((da.zeros((nt - 1, ns, nv)), d), axis=0)
d = d.rechunk(d.shape)
# Define MDC linear operator
MDCop = MDC(G, nt2, nv=nv, dt=dt, dr=dr,
twosided=twosided, saveGt=saveGt)
if dottest:
Dottest(MDCop, nt2 * ns * nv, nt2 * nr * nv, verb=True)
# Adjoint
if adjoint:
madj = MDCop.H * d.flatten()
madj = da.squeeze(madj.reshape(nt2, nr, nv))
# Inverse
minv = cgls(MDCop, d.flatten(), **kwargs_cgls)[0]
minv = da.squeeze(minv.reshape(nt2, nr, nv))
#if wav is not None:
# minv = sp_convolve1d(minv, wav, axis=-1)
if adjoint:
return minv, madj
else:
return minv