"""
This file is part of APAV.
APAV is a python package for performing analysis and visualization on
atom probe tomography data sets.
Copyright (C) 2018 Jesse Smith
APAV is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
APAV is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with APAV. If not, see <http://www.gnu.org/licenses/>.
"""
from typing import Sequence, Tuple, List, Dict, Any, Union, Type, Optional, TYPE_CHECKING
from numbers import Real, Number
from numpy import ndarray
from apav.analysis.base import AnalysisBase
from apav.utils import validate
from apav import Roi, RangeCollection, Ion
from apav.core.histogram import histogram2d_binwidth
from apav.core.multipleevent import get_mass_indices
from apav.core.isotopic import Element
from scipy.ndimage import gaussian_filter
import numpy as n
import multiprocessing as mp
from apav.analysis.grid_transfer import transfer as _transfer
[docs]def ion_transfer(X: n.ndarray, Y: n.ndarray, Z: n.ndarray, pos: n.ndarray, stddev3: Number) -> ndarray:
"""
Transfer an array of ion positions to a binned grid.
:param X: 3D array of x-coordinates of grid
:param Y: 3D array of y-coordinates of grid
:param Y: 3D array of z-coordinates of grid
:param pos: 2D array of positions
:param stddev3: 3sigma standard deviation of gaussian distribution
:return: 3D array of counts
"""
if len(pos.shape) != 2:
raise ValueError("Positions must be a 2D array")
if pos.size == 0:
raise ValueError("At least one ion position must be provided")
if any(len(i.shape) != 3 for i in [X, Y, Z]):
raise ValueError("All grid coordinate arrays must be three-dimensional")
validate.positive_number(stddev3)
if n.isclose(stddev3, 0):
binx = X[1, 0, 0] - X[0, 0, 0]
biny = Y[0, 1, 0] - Y[0, 0, 0]
binz = Z[0, 0, 1] - Z[0, 0, 0]
x_edge = n.concatenate([X[:, 0, 0] - binx / 2, [X[-1, 0, 0] + binx / 2]])
y_edge = n.concatenate([Y[0, :, 0] - biny / 2, [Y[0, -1, 0] + biny / 2]])
z_edge = n.concatenate([Z[0, 0, :] - binz / 2, [Z[0, 0, -1] + binz / 2]])
counts, _ = n.histogramdd(pos, bins=(x_edge, y_edge, z_edge))
return counts
else:
return _transfer(
X.astype(n.double), Y.astype(n.double), Z.astype(n.double), pos.astype(n.double), float(stddev3)
)
[docs]def make_coordinate_grids(
extents: Sequence[Tuple[Number, Number]], bin_width: Union[Sequence[Number], Number], edges=False
) -> Tuple[ndarray, ndarray, ndarray]:
"""
Generate 3D x/y/z coordinate arrays for indexing into compositional grids
:param extents: The x/y/z extent to generate the grids for
:param bin_width: The bin width of each bin, a single number or sequence of numbers for each dimension
:param edges: Whether the coordinates represent the edges of the bins or centers
"""
assert len(extents) == 3
for i in extents:
validate.interval(i)
assert all(len(i) == 2 for i in extents)
if hasattr(bin_width, "__iter__"):
assert len(bin_width) == 3
if isinstance(bin_width, (float, int)):
bin_width = [
bin_width,
] * 3
bin_width = [float(i) for i in bin_width]
validate.all_positive_nonzero(bin_width)
ext_x, ext_y, ext_z = extents
dx = n.abs(n.diff(ext_x)[0])
dy = n.abs(n.diff(ext_y)[0])
dz = n.abs(n.diff(ext_z)[0])
nx = int(n.ceil(dx / bin_width[0]))
ny = int(n.ceil(dy / bin_width[1]))
nz = int(n.ceil(dz / bin_width[2]))
x = n.array([ext_x[0] + i * bin_width[0] for i in range(nx)])
y = n.array([ext_y[0] + i * bin_width[1] for i in range(ny)])
z = n.array([ext_z[0] + i * bin_width[2] for i in range(nz)])
if x[-1] % 1 == 0:
x = n.concatenate([x, x[-1] + [bin_width[0]]])
y = n.concatenate([y, y[-1] + [bin_width[1]]])
z = n.concatenate([z, z[-1] + [bin_width[2]]])
if edges is True:
x -= bin_width[0] / 2
y -= bin_width[1] / 2
z -= bin_width[2] / 2
x = n.concatenate([x, [x[-1] + bin_width[0]]])
y = n.concatenate([y, [y[-1] + bin_width[1]]])
z = n.concatenate([z, [z[-1] + bin_width[2]]])
return n.meshgrid(x, y, z, indexing="ij")
[docs]class RangedGrid(AnalysisBase):
"""
Compute the ionic and elemental composition spatially distributed among a structured grid
"""
def __init__(
self,
roi: Roi,
ranges: RangeCollection,
bin_width: Number = 1,
first_pass: bool = True,
delocalization: Union[Number, Sequence[Number]] = n.array([3, 3, 1.5]),
gauss_trunc: Number = 4,
):
"""
:param roi: Parent the RangedGrid is competed on
:param ranges: RangeCollection defining the ranges
:param bin_width: symmetric bin width size
:param first_pass: Whether the first pass delocalization is computed using a gaussian transfer function.
:param delocalization: The delocalization distances (as 3 standard deviations of a normal distribution)
:param gauss_trunc: Number of standard deviations to truncate the gaussian kernel for second pass delocalization
"""
super().__init__(roi)
self._ranges = validate.is_type(ranges, RangeCollection)
self._voxel = float(validate.positive_nonzero_number(bin_width))
if isinstance(delocalization, Real):
self._delocalization = n.array([delocalization])
else:
self._delocalization = n.array(delocalization)
if len(self._delocalization.shape) == 1 and self._delocalization.shape[0] == 1:
self._delocalization = n.ones(3) * self._delocalization[0]
if not all(i > 0 for i in self._delocalization):
raise ValueError("Delocalization distances must be positive and non-zero")
if self._delocalization.shape[0] != 3:
raise ValueError(f"Unexpected delocalization shape, expected 3 got {self._delocalization.shape[0]}")
self._gauss_trunc = validate.positive_nonzero_number(gauss_trunc)
self._X = ndarray([])
self._Y = ndarray([])
self._Z = ndarray([])
self._ion_counts = {}
self._elem_counts_array = ndarray([])
self._elem_frac = {}
self._elem_counts = {}
self._elem_cum_counts = None
self._first_pass = first_pass
self._calculate()
@property
def ranges(self) -> RangeCollection:
"""
The ranges used for ranging the mass spectrum
"""
return self._ranges
@property
def extents(self) -> Tuple[Tuple[float, float], Tuple[float, float], Tuple[float, float]]:
"""
Get the spatial extents (by center positions) of the grids
"""
return (
(self._X.min(), self._X.max()),
(self._Y.min(), self._Y.max()),
(self._Z.min(), self._Z.max()),
)
@property
def first_pass(self) -> bool:
"""
Whether to compute first pass delocalization
"""
return self._first_pass
@property
def centers(self) -> Tuple[ndarray, ndarray, ndarray]:
"""
The center positions of the structured grids
For MxNxP voxels this returns 3 arrays of dimensions: Mx1x1, 1xNx1, 1x1xP
"""
return self._X, self._Y, self._Z
@property
def bin_width(self) -> float:
"""
Bin width of the voxels
"""
return self._voxel
@property
def delocalization(self) -> ndarray:
"""
Amount of smoothing used during the delocalization process
"""
return self._delocalization
@property
def gauss_trunc(self) -> Number:
"""
Where to truncate the gaussian kernel for second pass delocalization
"""
return self._gauss_trunc
@property
def all_ionic_counts(self) -> Dict[Ion, ndarray]:
"""
Get all ionic count grids in a dict
"""
return self._ion_counts
@property
def all_elemental_frac(self) -> Dict[Element, ndarray]:
"""
Get all elemental fraction grids as a dict
"""
return self._elem_frac
@property
def all_elemental_frac_str(self) -> Dict[str, ndarray]:
"""
Get all elemental fraction grids as a dictionary (using elemental symbols)
"""
return {i.symbol: j for i, j in self._elem_frac.items()}
@property
def elemental_counts_total(self) -> Number:
"""
Get the total (sum) of all elemental counts
"""
return self._elem_cum_counts
@property
def elemental_counts_grid(self) -> ndarray:
"""
Get an array of the cumulative elemental counts in each bin
"""
return self._elem_counts_array
[docs] def ionic_counts(self, ion: Ion) -> ndarray:
"""
Get a single ionic counts grid
:param ion: The ion of the grid to return
"""
if ion not in self.all_ionic_counts.keys():
raise ValueError("Ion {} does not exist in the RangedGrid".format(ion.hill_formula))
return self.all_ionic_counts[ion]
[docs] def elemental_frac(self, element: Union[str, Element]) -> ndarray:
"""
Get a single elemental fraction grid
:param element: the elemental of the grid to return (Element or str)
"""
if isinstance(element, str):
el = None
for i, j in self.all_elemental_frac.items():
if i.symbol == element:
el = i
break
return self.all_elemental_frac[el]
elif isinstance(element, Element):
return self.all_elemental_frac[element]
else:
raise TypeError("Expected elemental symbol string or Element type, got {} instead".format(type(element)))
def _calculate(self):
"""
Compute the ranged grids
"""
dims = self.roi.dimensions
n_voxels = n.ceil(dims / self.bin_width).ravel().astype(int)
dx, dy, dz = self.roi.xyz_extents
range_elems = self.ranges.elements()
self._ion_counts = {i.ion: n.zeros(n_voxels) for i in self.ranges.ranges}
r = self.bin_width / 2
X, Y, Z = make_coordinate_grids(self.roi.xyz_extents, self.bin_width)
self._X = X
self._Y = Y
self._Z = Z
if not self.first_pass:
pass1_3sigma = 0
stddev = self.delocalization / 3
else:
pass1_3sigma = self.bin_width / 2
stddev = n.sqrt((self.delocalization / 3) ** 2 - n.tile(pass1_3sigma / 3, 3) ** 2)
stddev_vox = stddev / self.bin_width
init_counts = []
final_counts = []
def ranged_xyz(rng):
low, up = rng.interval
idx = n.argwhere((self.roi.mass >= low) & (self.roi.mass < up)).ravel()
init_counts.append(idx.shape[0])
return self.roi.xyz[idx].astype(float)
N = len(self.ranges)
nproc = min(N, mp.cpu_count())
if self.first_pass:
result = [ion_transfer(X, Y, Z, ranged_xyz(i), pass1_3sigma) for i in self.ranges]
else:
result = []
for i, rng in enumerate(self.ranges):
coords = ranged_xyz(rng)
counts, _ = n.histogramdd(coords, bins=n_voxels)
result.append(counts)
for i, data in zip(self.ranges, result):
final_counts.append(n.sum(data))
nan = n.count_nonzero(n.isnan(data))
if nan > 0:
raise ArithmeticError(
"NaNs encountered during first pass delocalization, try picking a different bin width"
)
self._ion_counts[i.ion] += gaussian_filter(
data,
sigma=stddev_vox,
# mode="constant",
truncate=self.gauss_trunc,
)
self._elem_frac = {i: 0 for i in range_elems}
self._elem_counts = {i: 0 for i in range_elems}
elem_counts = self._elem_counts
for ion, counts in self._ion_counts.items():
for elem, mult in ion.comp_dict.items():
elem_counts[elem] += mult * counts
self._elem_counts_array = sum(array for array in elem_counts.values())
norm = sum(i for i in elem_counts.values())
self._elem_cum_counts = norm
for key in elem_counts.keys():
ary = elem_counts[key]
self._elem_frac[key] = n.divide(ary, norm, where=ary > 0)
[docs]class DensityHistogram(AnalysisBase):
"""
Compute density histograms on an Roi
"""
def __init__(self, roi: Roi, bin_width=0.3, axis="z", multiplicity="all"):
"""
:param roi: region of interest
:param bin_width: width of the bin size in Daltons
:param axis: which axis the histogram should be computed on ("x", "y", or "z")
:param multiplicity: the multiplicity order to compute histogram with
"""
super().__init__(roi)
self.bin_width = validate.positive_nonzero_number(bin_width)
self._multiplicity = validate.multiplicity_any(multiplicity)
if multiplicity != "all":
roi.require_multihit_info()
self._histogram = None
self._histogram_extents = None
self._axis = validate.choice(axis, ("x", "y", "z"))
self._bin_vol = None
self._calculate_histogram()
@property
def multiplicity(self) -> Union[str, int]:
return self._multiplicity
@property
def bin_vol(self) -> Number:
return self._bin_vol
@property
def axis(self) -> str:
return self._axis
@property
def histogram(self) -> ndarray:
return self._histogram
@property
def histogram_extents(self) -> ndarray:
return self._histogram_extents
def _calculate_histogram(self):
orient_map = {"x": 0, "y": 1, "z": 2}
ax1, ax2 = (self.roi.xyz[:, val] for key, val in orient_map.items() if key != self.axis)
ext_ax1, ext_ax2 = (self.roi.xyz_extents[val] for key, val in orient_map.items() if key != self.axis)
ext = (ext_ax1, ext_ax2)
if self.multiplicity == "all":
self._histogram = histogram2d_binwidth(ax1, ax2, ext, self.bin_width)
else:
idx = get_mass_indices(self.roi.misc["ipp"], self.multiplicity)
self._histogram = histogram2d_binwidth(ax1[idx], ax2[idx], ext, self.bin_width)
self._histogram_extents = ext