# ============================================================================
# ============================================================================
# Copyright (c) 2022 Nghia T. Vo. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
# Author: Nghia T. Vo
# E-mail:
# Description: Python module of correlation-related methods.
# Publication date: 20th June 2022
# Contributors:
# ============================================================================
"""
Module of correlation-based methods for finding shifts between images or
stacks of images. The methods are designed to be flexible to:
- Run on multicore CPU or GPU.
- Use small/large RAM or small/large GPU memory.
- Work with small/large size of data.
- Find shifts locally or globally.
"""
import math
import warnings
import multiprocessing as mp
import numpy as np
from numba import jit, cuda
from joblib import Parallel, delayed
from scipy.signal import correlate
from algotom.rec.reconstruction import make_smoothing_window
[docs]def normalize_image(mat):
"""
Normalize an image.
Parameters
----------
mat : array_like
2D or 3D array.
Returns
-------
array_like
2D or 3D array. Normalized image.
"""
if len(mat.shape) == 3:
mat1 = (mat - np.mean(mat)) / np.std(mat)
mat1 = mat1 - np.min(mat1)
else:
rlist = np.sum(mat, axis=0)
clist = np.transpose([np.sum(mat, axis=1)])
mat1 = (mat / rlist) / clist
mat1 = (mat1 - np.mean(mat1)) / np.std(mat1)
return mat1
@jit(nopython=True, parallel=False, cache=True)
def _generate_correlation_map_2d_input_cpu(ref_mat, mat): # pragma: no cover
"""
CPU-function to generate a correlation map (Pearson coefficients) by
shifting the second image around the first image.
Parameters
----------
ref_mat : array_like
2D array. Reference image (size of height0 x width0).
mat : array_like
2D array. Image to be translated (size of height1 x width1). Its
size should be smaller than the reference image.
Returns
-------
array_like
2D array with the size of (height0-height1+1) x (width0-width1+1)
"""
(height0, width0) = ref_mat.shape
(height1, width1) = mat.shape
height2, width2 = height0 - height1 + 1, width0 - width1 + 1
mat1 = mat - np.mean(mat)
num = np.sum(mat1 * mat1)
coef_mat = np.zeros((height2, width2), dtype=np.float32)
for i in range(height2):
for j in range(width2):
row0, row1 = i, i + height1
col0, col1 = j, j + width1
ref_mat1 = ref_mat[row0:row1, col0:col1]
ref_mat1 = ref_mat1 - np.mean(ref_mat1)
ref_num = np.sum(ref_mat1 * ref_mat1)
num1 = math.sqrt(num * ref_num)
if num1 != 0.0:
coef_mat[i, j] = np.sum(mat1 * ref_mat1) / num1
return coef_mat
@jit(nopython=True, parallel=False, cache=True)
def _generate_correlation_map_3d_input_cpu(ref_mat, mat): # pragma: no cover
"""
CPU-function to generate a correlation map (Pearson coefficients) by
shifting the second 3d-image around the first 3d-image along the axes of
1 and 2.
Parameters
----------
ref_mat : array_like
3D array. Reference image (size of depth0 x height0 x width0).
mat : array_like
3D array. Image to be translated (size of depth0 x height1 x width1).
Its size (height1, width1) must be smaller than the reference image.
Returns
-------
array_like
2D array with the size of (height0-height1+1) x (width0-width1+1)
"""
(height0, width0) = ref_mat.shape[-2:]
(height1, width1) = mat.shape[-2:]
height2, width2 = height0 - height1 + 1, width0 - width1 + 1
coef_mat = np.zeros((height2, width2), dtype=np.float32)
mat1 = mat - np.mean(mat)
num = np.sum(mat1 * mat1)
for i in range(height2):
for j in range(width2):
row0, row1 = i, i + height1
col0, col1 = j, j + width1
ref_mat1 = ref_mat[:, row0:row1, col0:col1]
ref_mat1 = ref_mat1 - np.mean(ref_mat1)
ref_num = np.sum(ref_mat1 * ref_mat1)
num1 = math.sqrt(num * ref_num)
if num1 != 0.0:
coef_mat[i, j] = np.sum(mat1 * ref_mat1) / num1
return coef_mat
@cuda.jit(device=True)
def __mean_2d(mat): # pragma: no cover
"""
GPU-kernel function to calculate mean of a 2d-array.
"""
(height, width) = mat.shape
num = 0.0
for i in range(height):
for j in range(width):
num += mat[i, j]
return num / (height * width)
@cuda.jit(device=True)
def __mean_3d(mat): # pragma: no cover
"""
GPU-kernel function to calculate mean of a 3d-array.
"""
(depth, height, width) = mat.shape
num = 0.0
for i in range(depth):
for j in range(height):
for k in range(width):
num += mat[i, j, k]
return num / (depth * height * width)
@cuda.jit(device=True)
def __sum_square_2d(mat, mean): # pragma: no cover
"""
GPU-kernel function to calculate the sum of squares of a normed 2d-array.
Parameters
----------
mat : array_like
2D array.
mean : float
Mean of the array.
Returns
-------
float
"""
(height, width) = mat.shape
sum_sqr = 0.0
for i in range(height):
for j in range(width):
val = mat[i, j] - mean
sum_sqr += val * val
return sum_sqr
@cuda.jit(device=True)
def __sum_square_3d(mat, mean): # pragma: no cover
"""
GPU-kernel function to calculate the sum of squares of a normed 3d-array.
Parameters
----------
mat : array_like
3D array.
mean : float
Mean of the array.
Returns
-------
float
"""
(depth, height, width) = mat.shape
sum_sqr = 0.0
for i in range(depth):
for j in range(height):
for k in range(width):
val = mat[i, j, k] - mean
sum_sqr += val * val
return sum_sqr
@cuda.jit(device=True)
def __sum_multiply_2d(ref_mat, mat, ref_mean, mat_mean): # pragma: no cover
"""
GPU-kernel function to calculate the sum of multiplies of two normed
2d-arrays.
Parameters
----------
ref_mat : array_like
2D array. The first image.
mat : array_like
2D array. The second image.
ref_mean : float
Mean of the first image.
mat_mean : float
Mean of the second image.
Returns
-------
float
"""
(height, width) = ref_mat.shape
sum_mul = 0.0
for i in range(height):
for j in range(width):
val = (ref_mat[i, j] - ref_mean) * (mat[i, j] - mat_mean)
sum_mul += val
return sum_mul
@cuda.jit(device=True)
def __sum_multiply_3d(ref_mat, mat, ref_mean, mat_mean): # pragma: no cover
"""
GPU-kernel function to calculate the sum of multiplies of two normed
2d-arrays.
Parameters
----------
ref_mat : array_like
2D array. The first image.
mat : array_like
2D array. The second image.
ref_mean : float
Mean of the first image.
mat_mean : float
Mean of the second image.
Returns
-------
float
"""
(depth, height, width) = ref_mat.shape
sum_mul = 0.0
for i in range(depth):
for j in range(height):
for k in range(width):
val = (ref_mat[i, j, k] - ref_mean) * (mat[i, j, k] - mat_mean)
sum_mul += val
return sum_mul
@cuda.jit(device=True)
def __pearson_coefficient_2d(ref_mat, mat): # pragma: no cover
"""
Supplementary method. GPU-kernel function to calculate the Pearson
correlation coefficient between two images.
Parameters
----------
ref_mat : array_like
2D array. The first image.
mat : array_like
2D array. The second image.
Returns
-------
float
Correlation coefficient.
"""
mat_mean = __mean_2d(mat)
mat_sqr = __sum_square_2d(mat, mat_mean)
ref_mean = __mean_2d(ref_mat)
ref_sqr = __sum_square_2d(ref_mat, ref_mean)
sum_mul = __sum_multiply_2d(ref_mat, mat, ref_mean, mat_mean)
num = math.sqrt(ref_sqr * mat_sqr)
coef_mat = 0.0
if num != 0.0:
coef_mat = sum_mul / num
return coef_mat
@cuda.jit(device=True)
def __pearson_coefficient_3d(ref_mat, mat): # pragma: no cover
"""
Supplementary method. GPU-kernel function to calculate the Pearson
correlation coefficient between two 3d-images.
Parameters
----------
ref_mat : array_like
3D array. The first 3d-image.
mat : array_like
3D array. The second 3d-image.
Returns
-------
float
Correlation coefficient.
"""
mat_mean = __mean_3d(mat)
mat_sqr = __sum_square_3d(mat, mat_mean)
ref_mean = __mean_3d(ref_mat)
ref_sqr = __sum_square_3d(ref_mat, ref_mean)
sum_mul = __sum_multiply_3d(ref_mat, mat, ref_mean, mat_mean)
num = math.sqrt(ref_sqr * mat_sqr)
coef_mat = 0.0
if num != 0.0:
coef_mat = sum_mul / num
return coef_mat
@cuda.jit
def _generate_correlation_map_2d_input_gpu(coef_mat, ref_mat, mat, sum_sqr,
height1, width1, height2,
width2): # pragma: no cover
"""
GPU-CPU function to generate correlation map between two 2d-images.
Parameters
----------
coef_mat : array_like
2D array. Coefficient map which is initialized and passed from CPU.
In GPU global-memory.
ref_mat : array_like
2D array. Reference image, passed from CPU.
mat : array_like
2D array. The second image, passed from CPU.
sum_sqr : float
Sum of squares of values of the second image.
height1 : int
Height of the second image.
width1 : int
Width of the second image.
height2 : int
Height of the coefficient map.
width2 : int
Width of the coefficient map.
Returns
-------
Update of the coefficient map passed from CPU.
"""
(x_index, y_index) = cuda.grid(2)
if (y_index < height2) and (x_index < width2):
ref_mean = __mean_2d(
ref_mat[y_index:y_index + height1, x_index:x_index + width1])
num_sqr = 0.0
num_mul = 0.0
for i in range(y_index, y_index + height1):
for j in range(x_index, x_index + width1):
i1 = i - y_index
j1 = j - x_index
val = ref_mat[i, j] - ref_mean
num_sqr += val * val
num_mul += mat[i1, j1] * val
num = math.sqrt(num_sqr * sum_sqr)
if num != 0.0:
coef_mat[y_index, x_index] = num_mul / num
@cuda.jit
def _generate_correlation_map_3d_input_gpu(coef_mat, ref_mat, mat, sum_sqr,
depth, height1, width1, height2,
width2): # pragma: no cover
"""
GPU-CPU function to generate correlation map between two 3d-images.
Parameters
----------
coef_mat : array_like
3D array. Coefficient map which is initialized and passed from CPU.
In GPU global-memory.
ref_mat : array_like
3D array. Reference image, passed from CPU.
mat : array_like
3D array. The second image, passed from CPU.
sum_sqr : float
Sum of squares of values of the second image.
depth : int
Number of images, or the size of the 3d-arrays along axis 0.
height1 : int
Height of the second image.
width1 : int
Width of the second image.
height2 : int
Height of the coefficient map.
width2 : int
Width of the coefficient map.
Returns
-------
Update of the coefficient map passed from CPU.
"""
(x_index, y_index) = cuda.grid(2)
if (y_index < height2) and (x_index < width2):
ref_mean = __mean_3d(
ref_mat[:, y_index:y_index + height1, x_index:x_index + width1])
num_sqr = 0.0
num_mul = 0.0
for i in range(depth):
for j in range(y_index, y_index + height1):
for k in range(x_index, x_index + width1):
j1 = j - y_index
k1 = k - x_index
val = ref_mat[i, j, k] - ref_mean
num_sqr += val * val
num_mul += mat[i, j1, k1] * val
num = math.sqrt(num_sqr * sum_sqr)
if num != 0.0:
coef_mat[y_index, x_index] = num_mul / num
[docs]def generate_correlation_map(ref_mat, mat, gpu=False, block=(16, 16)):
"""
Generate the correlation map (Pearson coefficients) between two images by
shifting the second image over the reference image.
Parameters
----------
ref_mat : array_like
2D or 3D array. The reference image (e.g. with height0 x width0).
mat : array_like
2D or 3D array. The second image (e.g. with height1 x width1). If 3D,
the size of the first dimension (i.e. depth) must be the same as the
reference image.
gpu : bool, optional
Use GPU for computing if True.
block : tuple of two integer-values, optional
Size of a GPU block. E.g. (4,4), (8, 8), ...
Returns
-------
array_like
2D array with the size of (height0-height1+1) x (width0-width1+1).
"""
if gpu is True:
if cuda.is_available() is False:
warnings.warn("!!!No Nvidia GPU found!!!Run with CPU instead!!!")
gpu = False
dim = len(ref_mat.shape)
if dim != 2 and dim != 3:
raise ValueError("Inputs must be 2d or 3d arrays !!!")
(height0, width0) = ref_mat.shape[-2:]
(height1, width1) = mat.shape[-2:]
height2, width2 = height0 - height1 + 1, width0 - width1 + 1
if (height1 > height0) or (width1 > width0):
raise ValueError("Size of the image must be equal or smaller than the "
"reference image !!!")
if dim == 3:
if ref_mat.shape[0] != mat.shape[0]:
raise ValueError("Size of axis-0 must be the same !!!")
if gpu is True:
coef_mat = np.zeros((height2, width2), dtype=np.float32)
if not isinstance(block, tuple):
raise ValueError("Block must be a tuple of two integers !!!")
block = block[-2:]
size = max(height2, width2)
grid = (int(np.ceil(1.0 * size / block[0])),
int(np.ceil(1.0 * size / block[1])))
if dim == 2:
mat1 = mat - np.mean(mat)
sum_sqr = np.sum(mat1 * mat1)
ref_mat1 = np.float32(np.ascontiguousarray(ref_mat))
mat1 = np.float32(np.ascontiguousarray(mat1))
f_alias = _generate_correlation_map_2d_input_gpu
f_alias[grid, block](coef_mat, ref_mat1, mat1, np.float32(sum_sqr),
np.int32(height1), np.int32(width1),
np.int32(height2), np.int32(width2))
else:
mat1 = mat - np.mean(mat)
sum_sqr = np.sum(mat1 * mat1)
ref_mat1 = np.float32(np.ascontiguousarray(ref_mat))
mat1 = np.float32(np.ascontiguousarray(mat1))
f_alias = _generate_correlation_map_3d_input_gpu
f_alias[grid, block](coef_mat, ref_mat1, mat1, np.float32(sum_sqr),
np.int32(ref_mat1.shape[0]),
np.int32(height1),
np.int32(width1), np.int32(height2),
np.int32(width2))
else:
if dim == 2:
coef_mat = _generate_correlation_map_2d_input_cpu(ref_mat, mat)
else:
coef_mat = _generate_correlation_map_3d_input_cpu(ref_mat, mat)
return coef_mat
def _locate_1d_peak_subpixel(list_data, method="diff"):
"""
Locate the position of the maximum value of a 1d-array with sub-pixel
accuracy.
Parameters
----------
list_data : array_like
1D array.
method : {"diff", "poly_fit"}
Method for finding sub-pixel shift. Two options: a differential
method or a polynomial method.
Returns
-------
float
Sub-pixel position of the maximum value.
"""
size = len(list_data)
pos = np.argmax(list_data)
pos_b, pos_f, size1 = pos - 1, pos + 1, size - 1
if method == "poly_fit":
if size > 9:
warnings.warn("Large array can cause numerical error!!!")
afact, bfact = np.polyfit(np.arange(size), list_data, 2)[:2]
if afact != 0.0:
sub_pos = - bfact / (2 * afact)
if size1 > sub_pos > 0:
pos = sub_pos
else:
if size1 > pos > 0:
d1fx = (list_data[pos_f] - list_data[pos_b]) / 2.0
d2fx = (list_data[pos_f] + list_data[pos_b] - 2 * list_data[pos])
if d2fx != 0.0:
x_off = - d1fx / d2fx
pos = pos + x_off
return pos
def _locate_2d_peak_subpixel(mat, method="diff"):
"""
Locate the position of the maximum value of a 2d-array with sub-pixel
accuracy.
Parameters
----------
mat : array_like
2D array.
method : {"diff", "poly_fit"}
Method for finding sub-pixel shift. Two options: a differential
method (Ref. [1]) or a polynomial method (Ref. [2]).
Returns
-------
list of two floats
Sub-pixel position (x, y), i.e. (column, row), of the maximum value.
References
----------
[1] : https://doi.org/10.48550/arXiv.0712.4289
[2] : https://doi.org/10.1088/0957-0233/17/6/045
"""
(height, width) = mat.shape
(y0, x0) = np.unravel_index(np.argmax(mat, axis=None), mat.shape)
y_pos, x_pos = 1.0 * y0, 1.0 * x0
height1, width1 = height - 1, width - 1
y_check = False if (y0 == 0 or y0 == height1) else True
x_check = False if (x0 == 0 or x0 == width1) else True
if (method == "poly_fit") and (y_check or x_check):
size = min(height, width)
if size > 9:
warnings.warn("Large array can cause numerical error!!!")
xlist, ylist = np.arange(0.0, width), np.arange(0.0, height)
x_mat, y_mat = np.meshgrid(xlist, ylist)
x, y = np.ndarray.flatten(x_mat), np.ndarray.flatten(y_mat)
Amatrix = []
for i in range(len(x)):
Amatrix.append(
[1.0, x[i], y[i], x[i] * x[i], x[i] * y[i], y[i] * y[i]])
Amatrix = np.asarray(Amatrix)
Bmatrix = np.transpose(np.ndarray.flatten(mat))
coefs = np.linalg.lstsq(Amatrix, Bmatrix, rcond=1e-64)[0]
a0, a1, a2, a3, a4, a5 = coefs
num = a4 * a4 - 4 * a3 * a5
if num != 0:
x_pos1 = (2 * a1 * a5 - a2 * a4) / num
y_pos1 = (2 * a2 * a3 - a1 * a4) / num
if height1 > y_pos1 > 0:
y_pos = y_pos1
if width1 > x_pos1 > 0:
x_pos = x_pos1
else:
d1fx, d2fx, d1fy, d2fy, d1fxy = 0.0, 0.0, 0.0, 0.0, 0.0
if y_check and not x_check:
d1fy = (mat[y0 + 1, x0] - mat[y0 - 1, x0]) / 2.0
d2fy = (mat[y0 + 1, x0] + mat[y0 - 1, x0] - 2 * mat[y0, x0])
if not y_check and x_check:
d1fx = (mat[y0, x0 + 1] - mat[y0, x0 - 1]) / 2.0
d2fx = (mat[y0, x0 + 1] + mat[y0, x0 - 1] - 2 * mat[y0, x0])
if y_check and x_check:
d1fx = 0.5 * (mat[y0, x0 + 1] - mat[y0, x0 - 1])
d1fy = 0.5 * (mat[y0 + 1, x0] - mat[y0 - 1, x0])
d2fx = (mat[y0, x0 + 1] + mat[y0, x0 - 1] - 2 * mat[y0, x0])
d2fy = (mat[y0 + 1, x0] + mat[y0 - 1, x0] - 2 * mat[y0, x0])
d1fxy = 0.25 * (mat[y0 + 1, x0 + 1] + mat[y0 - 1, x0 - 1] - mat[
y0 + 1, x0 - 1] - mat[y0 - 1, x0 + 1])
num = d1fxy ** 2 - d2fx * d2fy
if num != 0:
x_off = (d2fy * d1fx - d1fxy * d1fy) / num
y_off = (d2fx * d1fy - d1fxy * d1fx) / num
if 1.0 > abs(x_off) > 0.0:
x_pos = x0 + x_off
if 1.0 > abs(y_off) > 0.0:
y_pos = y0 + y_off
return x_pos, y_pos
[docs]def locate_peak(mat, sub_pixel=True, method="diff", dim=2, size=3,
max_peak=True):
"""
Locate the position of the maximum value of a 2d-array with sub-pixel
accuracy.
Parameters
----------
mat : array_like
2D array.
sub_pixel : bool, optional
Enable sub-pixel location.
method : {"diff", "poly_fit"}
Method for finding sub-pixel shift. Two options: a differential
method (Ref. [1]) or a polynomial method (Ref. [2]).
dim : {1, 2}
Searching dimension for sub-pixel location.
size : int
Window size around the integer location of the maximum value used for
sub-pixel searching.
max_peak : bool, optional
Used to locate the minimum value if False.
Returns
-------
list of two floats
Sub-pixel position (x, y), i.e. (column, row), of the maximum value.
References
----------
[1] : https://doi.org/10.48550/arXiv.0712.4289
[2] : https://doi.org/10.1088/0957-0233/17/6/045
"""
(height, width) = mat.shape
size = 3 if method == "diff" else 2 * (size // 2) + 1
size = np.clip(size, 3, 9)
if not max_peak:
mat = np.max(mat) - mat
(y_pos, x_pos) = np.unravel_index(np.argmax(mat, axis=None), mat.shape)
if sub_pixel is True:
height1, width1 = height - 1, width - 1
radius = size // 2
if ((height1 > y_pos > 0) and (width1 > x_pos > 0)
and (width >= size) and (height >= size)):
row = np.clip(y_pos - radius, 0, height - size)
col = np.clip(x_pos - radius, 0, width - size)
if dim == 1:
x_vals = mat[y_pos, col:col + size]
y_vals = mat[row:row + size, x_pos]
x_pos1 = _locate_1d_peak_subpixel(x_vals, method=method)
y_pos1 = _locate_1d_peak_subpixel(y_vals, method=method)
x_pos = col + x_pos1
y_pos = row + y_pos1
else:
sub_mat = mat[row:row + size, col:col + size]
(x_pos1, y_pos1) = _locate_2d_peak_subpixel(sub_mat,
method=method)
x_pos = col + x_pos1
y_pos = row + y_pos1
if ((y_pos == 0 or y_pos == height1)
and (width1 > x_pos > 0) and (width >= size)):
col = np.clip(x_pos - radius, 0, width - size)
x_vals = mat[y_pos, col:col + size]
x_pos1 = _locate_1d_peak_subpixel(x_vals, method=method)
x_pos = col + x_pos1
if ((x_pos == 0 or x_pos == width1)
and (height1 > y_pos > 0) and (height >= size)):
row = np.clip(y_pos - radius, 0, height - size)
y_vals = mat[row:row + size, x_pos]
y_pos1 = _locate_1d_peak_subpixel(y_vals, method=method)
y_pos = row + y_pos1
return x_pos, y_pos
[docs]def find_shift_based_correlation_map(ref_mat, mat, margin=10, axis=None,
sub_pixel=True, method="diff", dim=2,
size=3, gpu=False, block=(16, 16)):
"""
Find the relative translations of the second image against the first image
using the correlation map generated by sliding the 2nd image over the 1st
one. If the inputs are 3d-arrays, the size of the first axis must be the
same.
Parameters
----------
ref_mat : array_like
2D or 3D array. Reference image.
mat : array_like
2D or 3D array. The second image. If 3D, the size of the first
dimension (i.e. depth) must be the same as the reference image.
margin : int, optional
If the second image and the first image are the same size, the second
image will be cropped with the margin amount from the edges before
sliding. Basically, this value defines the sliding range.
axis : {0, 1, None}
To select the axis for sliding. If the inputs are 3d-arrays, 0 and 1
corresponding to axis-1 and axis-2 of a 3d-array.
sub_pixel : bool, optional
Enable sub-pixel location.
method : {"diff", "poly_fit"}
Method for finding sub-pixel shift. Two options: a differential
method (Ref. [1]) or a polynomial method (Ref. [2]).
dim : {1, 2}
Searching dimension for sub-pixel location.
size : int
Window size around the integer location of the maximum value used for
sub-pixel searching.
gpu : bool, optional
Use GPU for computing if True.
block : tuple of two integer-values, optional
Size of a GPU block. E.g. (4,4), (8, 8), ...
Returns
-------
list of 2 floats
The shifts in x and y-direction of the second image referred to the
middle of the reference image.
References
----------
[1] : https://doi.org/10.48550/arXiv.0712.4289
[2] : https://doi.org/10.1088/0957-0233/17/6/045
"""
(height0, width0) = ref_mat.shape[-2:]
(height1, width1) = mat.shape[-2:]
check_3d = True if len(mat.shape) == 3 else False
check1, check2 = True, True
if (height0 == height1) and (width0 == width1):
if axis == 0:
if (height1 - 2 * margin) > 3:
if check_3d:
mat = mat[:, margin:-margin, :]
height1 = mat.shape[1]
else:
mat = mat[margin:-margin, :]
height1 = mat.shape[0]
else:
check1 = False
elif axis == 1:
if (width1 - 2 * margin) > 3:
if check_3d:
mat = mat[:, :, margin:-margin]
width1 = mat.shape[-1]
else:
mat = mat[:, margin:-margin]
width1 = mat.shape[-1]
else:
check1 = False
else:
if (height1 - 2 * margin) > 3 and (width1 - 2 * margin) > 3:
if check_3d:
mat = mat[:, margin:-margin, margin:-margin]
(height1, width1) = mat.shape[-2:]
else:
mat = mat[margin:-margin, margin:-margin]
(height1, width1) = mat.shape
else:
check1 = False
if check1 is True:
if axis == 0:
check2 = False if (height0 - 2 * margin < height1) else True
elif axis == 1:
check2 = False if (width0 - 2 * margin < width1) else True
else:
check2 = False if ((height0 - 2 * margin < height1) and (
width0 - 2 * margin < width1)) else True
if check2:
match_img = generate_correlation_map(ref_mat, mat, gpu, block)
(height2, width2) = match_img.shape
y_mid, x_mid = height2 // 2, width2 // 2
if axis is None:
(x_peak, y_peak) = locate_peak(match_img, sub_pixel=sub_pixel,
method=method, dim=dim, size=size)
else:
(x_peak, y_peak) = locate_peak(match_img, sub_pixel=sub_pixel,
method=method, dim=1, size=size)
x_peak = x_mid if axis == 0 else x_peak
y_peak = y_mid if axis == 1 else y_peak
y_shift, x_shift = y_peak - 1.0 * y_mid, x_peak - 1.0 * x_mid
else:
raise ValueError("The image shapes and the selected margin value "
"don't match !!!")
return x_shift, y_shift
def _get_1d_shift_single_row_2d_input(ref_mat, mat, win_size=7, margin=10,
sub_pixel=True, method="diff", size=3,
gpu=False, block=(16, 16), pad=True,
norm=True):
"""
To find local 1d-shifts of the second image against the reference image
where their shapes are the same. Each shifting value is associated with a
column-index of the second image. The value is determined by selecting a
slab of image-columns of the second image, defined by the 'win_size'
parameter, and sliding over a slightly larger area of the reference image,
defined by the 'win_size' and 'margin' parameters.
Parameters
----------
ref_mat : array_like
2D array. Reference image.
mat : array_like
2D array. The 2nd image. Must be the same size as the reference image.
win_size : int
To define the size of the slab of image-columns around the selected
column of the 2nd image.
margin : int
To define the size of the selected area of the reference image for
sliding, i.e. size = 2 * margin + win_size
sub_pixel : bool, optional
Enable sub-pixel location.
method : {"diff", "poly_fit"}
Method for finding 1d sub-pixel position. Two options: a
differential method or a polynomial method.
size : int
Window size around the integer location of the maximum value used for
sub-pixel searching.
gpu : bool, optional
Use GPU for computing if True.
block : tuple of two integer-values, optional
Size of a GPU block. E.g. (4,4), (8, 8), ...
pad : bool, optional
Padding the result to the same width-size as the original images.
norm : bool, optional
Normalize the input images if True.
Returns
-------
array_like
1D array. X-shifting values associated with columns of the 2nd image.
Starting index is at (margin + win_size // 2). Using the pad option
to make it easier to find which value corresponding to which column.
"""
(height, width) = ref_mat.shape
win_size = 2 * (win_size // 2) + 1
radi = win_size // 2
start = radi + margin
if ref_mat.shape != mat.shape:
raise ValueError("Shapes of the inputs are not the same !!!")
if width < (win_size + 2 * margin):
raise ValueError("Width {0} of the inputs is smaller than the "
"requested size (win_size + 2*margin) = "
"{1}".format(width, win_size + 2 * margin))
stop, start1, radi1 = width - start, start + 1, radi + 1
f_alias = find_shift_based_correlation_map
if norm:
ref_mat = normalize_image(ref_mat)
mat = normalize_image(mat)
shifts = [f_alias(ref_mat[:, j - start:j + start1],
mat[:, j - radi:j + radi1], margin, 1, sub_pixel, method,
1, size, gpu, block)
for j in range(start, stop)]
shifting_line = np.asarray(shifts)[:, 0]
if pad:
shifting_line = np.pad(shifting_line, radi + margin, mode="constant")
return shifting_line
def _get_1d_shift_multi_rows_3d_input(ref_mat, mat, direction="x", win_size=7,
margin=10, sub_pixel=True, method="diff",
size=3, gpu=False, block=(16, 16),
pad=True, ncore=None, norm=True):
"""
To find local 1d-shifts of the second 3d-image against the reference
3d-image where their shapes are the same. Each shifting value is associated
with a row- or column-index of the image. The value is determined by
selecting a slab of columns or rows across the axis-0 (stack) of the
second image, defined by the 'win_size' parameter, and sliding over a
slightly larger area of the reference image, defined by the 'win_size' and
'margin' parameters.
Parameters
----------
ref_mat : array_like
3D array. Reference image.
mat : array_like
3D array. The 2nd image. Must be the same size as the reference image.
direction : {"x", "y"}
Select the direction for finding 1d-shift.
win_size : int
To define the size of the area around the selected pixel of the 2nd
image.
margin : int
To define the size of the area of the reference image for
sliding, i.e. size = 2 * margin + win_size
sub_pixel : bool, optional
Enable sub-pixel location.
method : {"diff", "poly_fit"}
Method for finding 1d sub-pixel position. Two options: a differential
method or a polynomial method.
size : int
Window size around the integer location of the maximum value used for
sub-pixel searching.
gpu : bool, optional
Use GPU for computing if True.
block : tuple of two integer-values, optional
Size of a GPU block. E.g. (4,4), (8, 8), ...
pad : bool, optional
Padding the result to the same as the original images.
ncore : int or None
Number of cpu-cores used for computing. Automatically selected if None.
norm : bool, optional
Normalize the input images if True.
Returns
-------
array_like:
2D array. Shifting values associated with the pixel-positions of the
2nd image. Starting index is at (margin + win_size // 2). Using the pad
option to make it easier to find which value corresponding to which
pixel-position.
"""
if len(ref_mat.shape) != 3 or len(mat.shape) != 3:
raise ValueError("Inputs must be 3d arrays !!!")
if direction != "x" and direction != "y":
raise ValueError("Only two options: 'x' or 'y''")
(depth, height, width) = ref_mat.shape
if ncore is None:
ncore = mp.cpu_count() - 1
f_alias = _get_1d_shift_single_row_2d_input
if direction == "x":
if ncore == 1:
shifts = np.asarray(
[f_alias(ref_mat[:, i, :], mat[:, i, :], win_size, margin,
sub_pixel, method, size, gpu, block, pad, norm)
for i in range(height)])
else:
shifts = np.asarray(Parallel(n_jobs=ncore)(
delayed(f_alias)(ref_mat[:, i, :], mat[:, i, :], win_size,
margin, sub_pixel, method, size, False, block,
pad, norm) for i in range(height)))
else:
if ncore == 1:
shifts = np.asarray(
[f_alias(ref_mat[:, :, j], mat[:, :, j], win_size, margin,
sub_pixel, method, size, gpu, block, pad, norm)
for j in range(width)])
else:
shifts = np.asarray(Parallel(n_jobs=ncore)(
delayed(f_alias)(ref_mat[:, :, j], mat[:, :, j], win_size,
margin, sub_pixel, method, size, False, block,
pad, norm) for j in range(width)))
shifts = np.transpose(shifts)
return shifts
def _get_1d_shift_full_image_3d_input_cpu(ref_mat, mat, direction="x",
chunk_size=None, win_size=7,
margin=10, sub_pixel=True,
method="diff", size=3, ncore=None,
norm=True, norm_global=False):
"""
CPU-function to find local 1d-shifts of the second 3d-image against the
reference 3d-image where their shapes are the same and their size may be
too big for memory. Each shifting value is associated with a row- or
column-index of the image. The value is determined by selecting a slab of
columns or rows across the axis-0 (stack) of the second image, defined by
the 'win_size' parameter, and sliding over a slightly larger area of the
reference image, defined by the 'win_size' and 'margin' parameters.
Parameters
----------
ref_mat : array_like
3D array, can be a numpy array or hdf object. Reference image.
mat : array_like
3D array, can be a numpy array or hdf object. The 2nd image. Must be
the same size as the reference image.
direction : {"x", "y"}
Select the direction for finding 1d-shift.
chunk_size : int or None
Size of each chunk extracted along the axis-1 (height) of the 3d-image.
win_size : int
To define the size of the area around the selected pixel of the 2nd
image.
margin : int
To define the size of the area of the reference image for
sliding, i.e. size = 2 * margin + win_size
sub_pixel : bool, optional
Enable sub-pixel location.
method : {"diff", "poly_fit"}
Method for finding 1d sub-pixel position. Two options: a
differential method or a polynomial method.
size : int
Window size around the integer location of the maximum value used for
sub-pixel searching.
ncore : int or None
Number of cpu-cores used for computing. Automatically selected if None.
norm : bool, optional
Normalize the input images if True.
norm_global : bool, optional
Normalize by using the full size of the inputs if True.
Returns
-------
array_like:
2D array. Shifting values along the x-direction or y-direction.
"""
if ref_mat.shape != mat.shape:
raise ValueError("Data shape must be the same !!!")
if len(ref_mat.shape) != 3:
raise ValueError("Inputs must be 3d-arrays !!!")
(depth, height, width) = ref_mat.shape
if chunk_size is None:
chunk_size = height + 1
else:
chunk_size = np.clip(chunk_size, 1, height)
num_chunk = np.clip(height // chunk_size + 1, 1, height)
f_alias = _get_1d_shift_multi_rows_3d_input
shifts = np.zeros((height, width), dtype=np.float32)
edge = margin + win_size // 2
if norm_global:
ref_mat = normalize_image(ref_mat)
mat = normalize_image(mat)
if direction == "x":
list_index = np.array_split(np.arange(height), num_chunk)
for pos in list_index:
bindex, eindex = pos[0], pos[-1] + 1
ref_mat1 = ref_mat[:, bindex:eindex, :]
mat1 = mat[:, bindex:eindex, :]
shift_chunk = f_alias(ref_mat1, mat1, direction, win_size, margin,
sub_pixel, method, size, False, None,
True, ncore, norm)
shifts[bindex:eindex] = shift_chunk
else:
list_index = np.array_split(np.arange(edge, height - edge), num_chunk)
for pos in list_index:
bindex, eindex = pos[0], pos[-1] + 1
ref_mat1 = ref_mat[:, bindex - edge:eindex + edge, :]
mat1 = mat[:, bindex - edge:eindex + edge, :]
shift_chunk = f_alias(ref_mat1, mat1, direction, win_size, margin,
sub_pixel, method, size, False, None,
False, ncore, norm)
shifts[bindex:eindex] = shift_chunk
return shifts
def _get_2d_shift_full_image_2d_input(ref_mat, mat, win_size=7, margin=10,
sub_pixel=True, method="diff", size=3,
gpu=False, block=(16, 16), ncore=None,
norm=True):
"""
To find local (y,x)-shifts of the second image against the reference
image where their shapes are the same. Each (y,x)-shifting value is
associated with a pixel-location of the second image. The value is
determined by selecting a small area of the second image, defined by
the 'win_size' parameter, and sliding (y,x-direction) over a slightly
larger area of the reference image, defined by the 'win_size' and 'margin'
parameters.
Parameters
----------
ref_mat : array_like
2D array. Reference image.
mat : array_like
2D array. The 2nd image. Must be the same size as the reference image.
win_size : int
To define the size of the area around the selected pixel of the 2nd
image.
margin : int
To define the size of the area of the reference image for sliding,
i.e. size = 2 * margin + win_size
sub_pixel : bool, optional
Enable sub-pixel location.
method : {"diff", "poly_fit"}
Method for finding sub-pixel shift. Two options: a differential
method (Ref. [1]) or a polynomial method (Ref. [2]).
size : int
Window size around the integer location of the maximum value used for
sub-pixel searching.
gpu : bool, optional
Use GPU for computing if True.
block : tuple of two integer-values, optional
Size of a GPU block. E.g. (4,4), (8, 8), ...
ncore : int or None
Number of cpu-cores used for computing. Automatically selected if None.
norm : bool, optional
Normalizing the inputs.
Returns
-------
list of two 2d-arrays
x-shift array and y-shift array. Zeros at the outer area of the size of
(margin + win_size // 2)
References
----------
[1] : https://doi.org/10.48550/arXiv.0712.4289
[2] : https://doi.org/10.1088/0957-0233/17/6/045
"""
if ref_mat.shape != mat.shape:
raise ValueError("Data shape must be the same !!!")
if len(ref_mat.shape) != 2:
raise ValueError("Inputs must be 2d-arrays !!!")
(height, width) = ref_mat.shape
win_size = 2 * (win_size // 2) + 1
radi = win_size // 2
start = radi + margin
als_size = win_size + 2 * margin
if width < als_size or height < als_size:
raise ValueError("Shapes of the inputs {0} are smaller than the "
"requested size (win_size + 2*margin) = "
"{1}".format(ref_mat.shape, als_size))
stop_col, stop_row = width - start, height - start
start1, radi1 = start + 1, radi + 1
if norm:
ref_mat = normalize_image(ref_mat)
mat = normalize_image(mat)
f_alias = find_shift_based_correlation_map
if (gpu is True) or (ncore == 1):
shifts = [
[f_alias(ref_mat[i - start:i + start1, j - start:j + start1],
mat[i - radi:i + radi1, j - radi:j + radi1],
margin, None, sub_pixel, method, 2, size, gpu, block)
for j in range(start, stop_col)]
for i in range(start, stop_row)]
shifts = np.asarray(shifts)
else:
if ncore is None:
ncore = np.clip(mp.cpu_count() - 1, 1, None)
shifts = np.asarray(Parallel(n_jobs=ncore)(
delayed(f_alias)(
ref_mat[i - start:i + start1, j - start:j + start1],
mat[i - radi:i + radi1, j - radi:j + radi1],
margin, None, sub_pixel, method, 2, size, False, block)
for i in range(start, stop_row) for j in range(start, stop_col)))
shifts = np.reshape(np.asarray(shifts),
(stop_row - start, stop_col - start, 2))
(x_shifts, y_shifts) = np.copy(np.moveaxis(shifts, 2, 0))
x_shifts = np.pad(x_shifts, radi + margin, mode="constant")
y_shifts = np.pad(y_shifts, radi + margin, mode="constant")
return x_shifts, y_shifts
def _get_2d_shift_multi_rows_3d_input(ref_mat, mat, win_size=7, margin=10,
sub_pixel=True, method="diff", size=3,
gpu=False, block=(16, 16), pad=True,
ncore=None, norm=False):
"""
To find local (y,x)-shifts of the second 3d-image against the reference
3d-image where their shapes are the same. Each (y,x)-shifting value is
associated with a pixel-location of the second image. The value is
determined by selecting a small volume of the second image, defined by
the 'win_size' parameter, and sliding (y,x only) over a slightly larger
volume of the reference image, defined by the 'win_size' and 'margin'
parameters. Note that this function is computational expensive.
Parameters
----------
ref_mat : array_like
3D array. Reference image.
mat : array_like
3D array. The 2nd image. Must be the same size as the reference image.
win_size : int
To define the size of the image volume around the selected pixel of the
2nd image.
margin : int
To define the size of the selected volume of the reference image for
sliding i.e. size = 2 * margin + win_size
sub_pixel : bool, optional
Enable sub-pixel location.
method : {"diff", "poly_fit"}
Method for finding sub-pixel shift. Two options: a differential
method (Ref. [1]) or a polynomial method (Ref. [2]).
size : int
Window size around the integer location of the maximum value used for
sub-pixel searching.
gpu : bool, optional
Use GPU for computing if True.
block : tuple of two integer-values, optional
Size of a GPU block. E.g. (4,4), (8, 8), ...
pad : bool, optional
Padding the result to the same as the original images.
ncore : int or None
Number of cpu-cores used for computing. Automatically selected if None.
norm : bool, optional
Normalizing the inputs.
Returns
-------
list of two 2d-arrays
x-shift array and y-shift array, corresponding to the pixel-locations
in the 2nd image where the starting location is at
(margin + win_size/2, margin + win_size/2). Using the pad option to
make it easier to find which value corresponding to which
pixel-location.
References
----------
[1] : https://doi.org/10.48550/arXiv.0712.4289
[2] : https://doi.org/10.1088/0957-0233/17/6/045
"""
if ref_mat.shape != mat.shape:
raise ValueError("Data shape must be the same !!!")
if len(ref_mat.shape) != 3:
raise ValueError("Inputs must be 3d-arrays !!!")
(height, width) = ref_mat.shape[-2:]
win_size = 2 * (win_size // 2) + 1
radi = win_size // 2
start = radi + margin
als_size = win_size + 2 * margin
if width < als_size or height < als_size:
raise ValueError("Shapes of the inputs {0} are smaller than the "
"requested size (win_size + 2*margin) = "
"{1} x {1}".format((height, width), als_size))
if norm:
ref_mat = normalize_image(ref_mat)
mat = normalize_image(mat)
stop_col, stop_row = width - start, height - start
start1, radi1 = start + 1, radi + 1
f_alias = find_shift_based_correlation_map
if (gpu is True) or (ncore == 1):
shifts = [
[f_alias(ref_mat[:, i - start:i + start1, j - start:j + start1],
mat[:, i - radi:i + radi1, j - radi:j + radi1],
margin, None, sub_pixel, method, 2, size, gpu, block)
for j in range(start, stop_col)]
for i in range(start, stop_row)]
shifts = np.asarray(shifts)
else:
if ncore is None:
ncore = np.clip(mp.cpu_count() - 1, 1, None)
shifts = np.asarray(
Parallel(n_jobs=ncore)(delayed(f_alias)(
ref_mat[:, i - start:i + start1, j - start:j + start1],
mat[:, i - radi:i + radi1, j - radi:j + radi1],
margin, None, sub_pixel, method, 2, size, False, None)
for i in range(start, stop_row)
for j in range(start, stop_col)))
shifts = np.reshape(np.asarray(shifts),
(stop_row - start, stop_col - start, 2))
(x_shifts, y_shifts) = np.copy(np.moveaxis(shifts, 2, 0))
if pad:
x_shifts = np.pad(x_shifts, radi + margin, mode="constant")
y_shifts = np.pad(y_shifts, radi + margin, mode="constant")
return x_shifts, y_shifts
def _get_2d_shift_full_image_3d_input_cpu(ref_mat, mat, chunk_size=None,
win_size=7, margin=10,
sub_pixel=True, method="diff",
size=3, ncore=None, norm=False,
norm_global=False):
"""
CPU function to find local (y,x)-shifts of the second 3d-image against the
reference 3d-image where their shapes are the same and their size may be
too big for memory. Each (y,x)-shifting value is associated with a
pixel-location of the second image. The value is determined by selecting a
small volume of the second image, defined by the 'win_size' parameter, and
sliding (y,x only) over a slightly larger volume of the reference image,
defined by the 'win_size' and 'margin' parameters. Note that this function
is computational expensive.
Parameters
----------
ref_mat : array_like
3D array, can be a numpy array or hdf object. Reference image.
mat : array_like
3D array, can be a numpy array or hdf object. The 2nd image. Must be
the same size as the reference image.
chunk_size : int or None
Size of each chunk extracted along the axis-1 (height) of the 3d-image.
win_size : int
To define the size of the area around the selected pixel of the 2nd
image.
margin : int
To define the size of the area of the reference image for
sliding, i.e. size = 2 * margin + win_size
sub_pixel : bool, optional
Enable sub-pixel location.
method : {"diff", "poly_fit"}
Method for finding sub-pixel position. Two options: a differential
method or a polynomial method.
size : int
Window size around the integer location of the maximum value used for
sub-pixel searching.
ncore : int or None
Number of cpu-cores used for computing. Automatically selected if None.
norm : bool, optional
Normalize the input images if True.
norm_global : bool, optional
Normalize by using the full size of the inputs if True.
Returns
-------
list of two 2d-arrays
x-shift array and y-shift array. Zeros at the outer area of the size of
(margin + win_size // 2).
"""
if ref_mat.shape != mat.shape:
raise ValueError("Data shape must be the same !!!")
if len(ref_mat.shape) != 3:
raise ValueError("Inputs must be 3d-arrays !!!")
(height, width) = ref_mat.shape[-2:]
if chunk_size is None:
chunk_size = height + 1
else:
chunk_size = np.clip(chunk_size, 1, height)
num_chunk = np.clip(height // chunk_size + 1, 1, height)
edge = margin + win_size // 2
if height < (2 * edge + 1):
raise ValueError("Height of the inputs {0} are smaller than the "
"requested size (win_size + 2*margin + 1) = "
"{1} x {1}".format(height, 2 * edge + 1))
list_index = np.array_split(np.arange(edge, height - edge), num_chunk)
x_shifts = np.zeros((height, width), dtype=np.float32)
y_shifts = np.zeros((height, width), dtype=np.float32)
f_alias = _get_2d_shift_multi_rows_3d_input
if norm_global:
ref_mat = normalize_image(ref_mat)
mat = normalize_image(mat)
for pos in list_index:
bindex, eindex = pos[0], pos[-1] + 1
ref_mat1 = ref_mat[:, bindex - edge:eindex + edge, :]
mat1 = mat[:, bindex - edge:eindex + edge, :]
shift_chunk = f_alias(ref_mat1, mat1, win_size, margin, sub_pixel,
method, size, False, None, False, ncore, norm)
x_shifts[bindex:eindex, edge:-edge] = shift_chunk[0]
y_shifts[bindex:eindex, edge:-edge] = shift_chunk[1]
return x_shifts, y_shifts
@cuda.jit(device=True)
def __gen_1d_corr_map_kernel(ref_mat, mat, list_coef): # pragma: no cover
"""
GPU-kernel function to generate a list of local correlation-coefficients
between two images having the same height.
Parameters
----------
ref_mat : array_like
2D array. The first image.
mat : array_like
2D array. The second image.
list_coef : array_like
list of zeros, passed from GPU global memory.
Returns
-------
array_like
Calculated coefficients.
"""
(height0, width0) = ref_mat.shape
(height1, width1) = mat.shape
width2 = width0 - width1 + 1
mat_mean = __mean_2d(mat)
mat_sqr = __sum_square_2d(mat, mat_mean)
for j in range(width2):
ref_mat1 = ref_mat[:, j: j + width1]
ref_mean = __mean_2d(ref_mat1)
ref_sqr = __sum_square_2d(ref_mat1, ref_mean)
sum_mul = __sum_multiply_2d(ref_mat1, mat, ref_mean, mat_mean)
num = math.sqrt(ref_sqr * mat_sqr)
if num != 0.0:
list_coef[j] = sum_mul / num
return list_coef
@cuda.jit(device=True)
def __locate_1d_peak_kernel(list_data): # pragma: no cover
"""
GPU-kernel function to locate the position of the maximum value of a
1d-array with sub-pixel accuracy.
Parameters
----------
list_data : array_like
1D array.
Returns
-------
float
Sub-pixel position of the maximum value.
"""
num_point = len(list_data)
pos_max = 0
val_max = list_data[pos_max]
for i in range(1, num_point):
val = list_data[i]
if val > val_max:
val_max = val
pos_max = i
if (num_point - 1) > pos_max > 0:
val1 = list_data[pos_max - 1]
val2 = list_data[pos_max + 1]
d1fx = (val2 - val1) / 2.0
d2fx = (val2 + val1 - 2.0 * val_max)
if d2fx != 0:
x_off = - d1fx / d2fx
pos_max = pos_max + x_off
return pos_max
@cuda.jit
def _get_1d_shift_multi_rows_3d_input_kernel(shift_mat, ref_mat, mat,
list_coef, height, width, radi,
margin): # pragma: no cover
"""
GPU-CPU function to find local 1d-shifts of the second 3d-image against
the reference 3d-image where their shapes are the same.
Parameters
----------
shift_mat : array_like
2D array of zeros, initialized at CPU.
ref_mat : array_like
3D array. Reference image.
mat : array_like
3D array. The 2nd image. Must be the same size as the reference image.
list_coef : array_like
list of the list of zeros, initialized at CPU.
height : int
Height of the image.
width : int
Width of the image.
radi : int
Radius of the window to select a local area of the image.
margin : int
To define the size of the area of the reference image for
sliding.
Returns
-------
Update of the shifting image passed from CPU.
"""
(x_index, y_index) = cuda.grid(2)
if (y_index < height) and (x_index < width):
edge = radi + margin
pos = x_index + edge
ref_mat1 = ref_mat[y_index, :, pos - edge: pos + edge + 1]
mat1 = mat[y_index, :, pos - radi: pos + radi + 1]
list_coef1 = list_coef[y_index, x_index, :]
list_coef1 = __gen_1d_corr_map_kernel(ref_mat1, mat1, list_coef1)
pos_max = __locate_1d_peak_kernel(list_coef1)
shift_mat[y_index, x_index] = pos_max - margin
def _get_1d_shift_multi_rows_3d_input_gpu(ref_mat, mat, direction="x",
win_size=7, margin=10,
block=(16, 16), pad=True, norm=True):
"""
GPU function to find local 1d-shifts of the second 3d-image against the
reference 3d-image where their shapes are the same. Each shifting value is
associated with a row- or column-index of the image. The value is
determined by selecting a slab of columns or rows across the axis-0 (stack)
of the second image, defined by the 'win_size' parameter, and sliding over
a slightly larger area of the reference image, defined by the 'win_size'
and 'margin' parameters.
Parameters
----------
ref_mat : array_like
3D array. Reference image.
mat : array_like
3D array. The 2nd image. Must be the same size as the reference image.
direction : {"x", "y"}
Select the direction for finding 1d-shift.
win_size : int
To define the size of the area around the selected pixel of the 2nd
image.
margin : int
To define the size of the area of the reference image for
sliding, i.e. size = 2 * margin + win_size
block : tuple of two integer-values, optional
Size of a GPU block. E.g. (4,4), (8, 8), ...
pad : bool, optional
Padding the result to the same as the original images.
norm : bool, optional
Normalize the inputs if True.
Returns
-------
array_like:
2D array. Shifting values associated with the pixel-positions of the
2nd image. Starting index is at (margin + win_size // 2). Using the pad
option to make it easier to find which value corresponding to which
pixel-position.
"""
if ref_mat.shape != mat.shape:
raise ValueError("Data shape must be the same !!!")
if len(ref_mat.shape) == 2:
ref_mat = np.expand_dims(ref_mat, axis=0)
mat = np.expand_dims(mat, axis=0)
if len(ref_mat.shape) != 3:
raise ValueError("Inputs must be 3d arrays !!!")
if direction != "x" and direction != "y":
raise ValueError("Only two options: 'x' or 'y''")
if direction != "x":
ref_mat = np.transpose(ref_mat, axes=(0, 2, 1))
mat = np.transpose(mat, axes=(0, 2, 1))
ref_mat = np.copy(np.moveaxis(ref_mat, 1, 0))
mat = np.copy(np.moveaxis(mat, 1, 0))
(height, depth, width) = ref_mat.shape
if norm:
ref_mat = np.asarray(
[normalize_image(ref_mat[i]) for i in range(height)])
mat = np.asarray([normalize_image(mat[i]) for i in range(height)])
win_size = 2 * (win_size // 2) + 1
radi = win_size // 2
edge = radi + margin
width1 = width - 2 * edge
if width1 < 1:
raise ValueError("Width {0} of the inputs is smaller than the "
"requested size (win_size + 2*margin) = "
"{1}".format(width, win_size + 2 * margin))
shifts = np.zeros((height, width1), dtype=np.float32)
grid = (int(np.ceil(1.0 * width1 / block[0])),
int(np.ceil(1.0 * height / block[1])))
f_alias = _get_1d_shift_multi_rows_3d_input_kernel
ref_mat1 = np.float32(np.ascontiguousarray(ref_mat))
mat1 = np.float32(np.ascontiguousarray(mat))
list_coef = np.zeros((height, width1, 2 * margin + 1), dtype=np.float32)
f_alias[grid, block](shifts, ref_mat1, mat1, list_coef, np.int32(height),
np.int32(width1), np.int32(radi), np.int32(margin))
if pad:
shifts = np.pad(shifts, ((0, 0), (edge, edge)), mode="constant")
if direction != "x":
shifts = np.transpose(shifts)
return shifts
def _get_1d_shift_full_image_3d_input_gpu(ref_mat, mat, direction="x",
chunk_size=None, win_size=7,
margin=10, block=(16, 16), norm=True,
norm_global=True):
"""
GPU function to find local 1d-shifts of the second 3d-image against the
reference 3d-image where their shapes are the same and their size may be
too big for memory. Each shifting value is associated with a row- or
column-index of the image. The value is determined by selecting a slab of
columns or rows across the axis-0 (stack) of the second image, defined by
the 'win_size' parameter, and sliding over a slightly larger area of the
reference image, defined by the 'win_size' and 'margin' parameters.
Parameters
----------
ref_mat : array_like
3D array, can be a numpy array or hdf object. Reference image.
mat : array_like
3D array, can be a numpy array or hdf object. The 2nd image. Must be
the same size as the reference image.
direction : {"x", "y"}
Select the direction for finding 1d-shift.
chunk_size : int or None
Size of each chunk extracted along the axis-1 (height) of the 3d-image.
win_size : int
To define the size of the area around the selected pixel of the 2nd
image.
margin : int
To define the size of the area of the reference image for sliding,
i.e. size = 2 * margin + win_size.
block : tuple of two integer-values, optional
Size of a GPU block. E.g. (4,4), (8, 8), ...
norm : bool, optional
Normalize the input images if True.
norm_global : bool, optional
Normalize by using the full size of the inputs if True.
Returns
-------
array_like:
2D array. Shifting values along the x-direction or y-direction.
"""
if ref_mat.shape != mat.shape:
raise ValueError("Data shape must be the same !!!")
if len(ref_mat.shape) != 3:
raise ValueError("Inputs must be 3d-arrays !!!")
(depth, height, width) = ref_mat.shape
if chunk_size is None:
chunk_size = height + 1
else:
chunk_size = np.clip(chunk_size, 1, height)
num_chunk = np.clip(height // chunk_size + 1, 1, height)
f_alias = _get_1d_shift_multi_rows_3d_input_gpu
shifts = np.zeros((height, width), dtype=np.float32)
edge = margin + win_size // 2
if norm_global:
ref_mat = normalize_image(ref_mat)
mat = normalize_image(mat)
if direction == "x":
list_index = np.array_split(np.arange(height), num_chunk)
for pos in list_index:
bindex, eindex = pos[0], pos[-1] + 1
ref_mat1 = ref_mat[:, bindex:eindex, :]
mat1 = mat[:, bindex:eindex, :]
shift_chunk = f_alias(ref_mat1, mat1, direction, win_size, margin,
block, True, norm)
shifts[bindex:eindex] = shift_chunk
else:
list_index = np.array_split(np.arange(edge, height - edge), num_chunk)
for pos in list_index:
bindex, eindex = pos[0], pos[-1] + 1
ref_mat1 = ref_mat[:, bindex - edge:eindex + edge, :]
mat1 = mat[:, bindex - edge:eindex + edge, :]
shift_chunk = f_alias(ref_mat1, mat1, direction, win_size, margin,
block, False, norm)
shifts[bindex:eindex] = shift_chunk
return shifts
@cuda.jit(device=True)
def __gen_2d_corr_map_2d_input(ref_mat, mat, coef_mat): # pragma: no cover
"""
GPU-kernel function to generate a correlation map between two images where
the second image is smaller than the first image.
Parameters
----------
ref_mat : array_like
2D array. The first image.
mat : array_like
2D array. The second image.
coef_mat : array_like
2D array of zeros, loaded from GPU global memory
Returns
-------
array_like
2D array. Calculated coefficient map.
"""
(height0, width0) = ref_mat.shape
(height1, width1) = mat.shape
height2, width2 = height0 - height1 + 1, width0 - width1 + 1
mat_mean = __mean_2d(mat)
mat_sqr = __sum_square_2d(mat, mat_mean)
# coef_mat is at GPU global memory with the size of (height2, width2)
for i in range(height2):
for j in range(width2):
row0, row1 = i, i + height1
col0, col1 = j, j + width1
ref_mat1 = ref_mat[row0:row1, col0:col1]
ref_mean = __mean_2d(ref_mat1)
ref_sqr = __sum_square_2d(ref_mat1, ref_mean)
sum_mul = __sum_multiply_2d(ref_mat1, mat, ref_mean, mat_mean)
num = math.sqrt(ref_sqr * mat_sqr)
if num != 0.0:
coef_mat[i, j] = sum_mul / num
return coef_mat
@cuda.jit(device=True)
def __gen_2d_corr_map_3d_input(ref_mat, mat, coef_mat): # pragma: no cover
"""
GPU-kernel function to generate a correlation map between two stacks of
images where the number of images are the same but the image size of the
second stack is smaller than the first one.
Parameters
----------
ref_mat : array_like
3D array. The first image.
mat : array_like
3D array. The second image.
coef_mat : array_like
2D array, loaded from GPU global memory.
Returns
-------
array_like
2D array. Calculated coefficient map.
"""
(depth, height0, width0) = ref_mat.shape
(_, height1, width1) = mat.shape
height2, width2 = height0 - height1 + 1, width0 - width1 + 1
mat_mean = __mean_3d(mat)
mat_sqr = __sum_square_3d(mat, mat_mean)
# coef_mat is at GPU global memory with the size of (height2, width2)
for i in range(height2):
for j in range(width2):
row0, row1 = i, i + height1
col0, col1 = j, j + width1
ref_mat1 = ref_mat[:, row0:row1, col0:col1]
ref_mean = __mean_3d(ref_mat1)
ref_sqr = __sum_square_3d(ref_mat1, ref_mean)
sum_mul = __sum_multiply_3d(ref_mat1, mat, ref_mean, mat_mean)
num = math.sqrt(ref_sqr * mat_sqr)
if num != 0.0:
coef_mat[i, j] = sum_mul / num
return coef_mat
@cuda.jit(device=True)
def __locate_max_value(mat): # pragma: no cover
"""
GPU-kernel function to find the indices of the maximum value of a 2D array.
Parameters
----------
mat : array_like
2D array.
Returns
-------
list of two floats.
(column-index, row-index)
"""
(height, width) = mat.shape
x_max = 0
y_max = 0
val_max = mat[y_max, x_max]
for i in range(height):
for j in range(width):
val = mat[i, j]
if val > val_max:
y_max = i
x_max = j
val_max = val
return x_max, y_max
@cuda.jit(device=True)
def __get_max_value(mat): # pragma: no cover
"""
GPU-kernel function to find the maximum value of a 2D array.
Parameters
----------
mat : array_like
2D array.
Returns
-------
list of two floats.
(column-index, row-index)
"""
(height, width) = mat.shape
val_max = mat[0, 0]
for i in range(height):
for j in range(width):
val = mat[i, j]
if val > val_max:
val_max = val
return val_max
@cuda.jit(device=True)
def __inverse_values(mat, val_max): # pragma: no cover
"""
GPU-kernel function to inverse values of a 2D array.
Parameters
----------
mat : array_like
2D array.
val_max : float
Maximum value of the array.
Returns
-------
array_like
"""
(height, width) = mat.shape
for i in range(height):
for j in range(width):
mat[i, j] = val_max - mat[i, j]
return mat
@cuda.jit(device=True)
def __locate_2d_peak_kernel(mat, x0, y0): # pragma: no cover
"""
GPU-kernel function to locate the position of the maximum value of a
2d-array with sub-pixel accuracy (Ref. [1]).
Parameters
----------
mat : array_like
2D array.
x0 : int
Column index of the maximum value.
y0 : int
Row index of the maximum value.
Returns
-------
list of two floats
Sub-pixel position (x, y), i.e. (column, row), of the maximum value.
References
----------
[1] : https://doi.org/10.48550/arXiv.0712.4289
"""
(height, width) = mat.shape
height1 = height - 1
width1 = width - 1
d1fx, d2fx, d1fy, d2fy, d1fxy = 0.0, 0.0, 0.0, 0.0, 0.0
x_pos = x0
y_pos = y0
if (x0 == 0 or x0 == width1) and (height1 > y0 > 0):
d1fy = (mat[y0 + 1, x0] - mat[y0 - 1, x0]) / 2.0
d2fy = (mat[y0 + 1, x0] + mat[y0 - 1, x0] - 2 * mat[y0, x0])
if (y0 == 0 or y0 == height1) and (width1 > x0 > 0):
d1fx = (mat[y0, x0 + 1] - mat[y0, x0 - 1]) / 2.0
d2fx = (mat[y0, x0 + 1] + mat[y0, x0 - 1] - 2 * mat[y0, x0])
if (height1 > y0 > 0) and (width1 > x0 > 0):
d1fx = 0.5 * (mat[y0, x0 + 1] - mat[y0, x0 - 1])
d1fy = 0.5 * (mat[y0 + 1, x0] - mat[y0 - 1, x0])
d2fx = (mat[y0, x0 + 1] + mat[y0, x0 - 1] - 2 * mat[y0, x0])
d2fy = (mat[y0 + 1, x0] + mat[y0 - 1, x0] - 2 * mat[y0, x0])
d1fxy = 0.25 * (mat[y0 + 1, x0 + 1] + mat[y0 - 1, x0 - 1] - mat[
y0 + 1, x0 - 1] - mat[y0 - 1, x0 + 1])
num = d1fxy ** 2 - d2fx * d2fy
if num != 0:
x_off = (d2fy * d1fx - d1fxy * d1fy) / num
y_off = (d2fx * d1fy - d1fxy * d1fx) / num
if 1.0 > abs(x_off) > 0.0:
x_pos = x0 + x_off
if 1.0 > abs(y_off) > 0.0:
y_pos = y0 + y_off
return x_pos, y_pos
@cuda.jit
def _get_2d_shift_multi_rows_2d_input_kernel(shifts, ref_mat, mat, coef_4d,
height, width, radi,
margin): # pragma: no cover
"""
GPU-CPU function to find local (y,x)-shifts of the second image against
the reference image.
Parameters
----------
shifts : array_like
Two of 2D arrays of zeros, initialized at CPU.
ref_mat : array_like
2D array. Reference image.
mat : array_like
2D array. The 2nd image. Must be the same size as the reference image.
coef_4d : array_like
4D array of zeros, initialized at CPU.
height : int
Height of the image.
width : int
Width of the image.
radi : int
Radius of the window to select a local area of the image.
margin : int
To define the size of the area of the reference image for sliding.
Returns
-------
Update of the (y,x)-shifting images passed from CPU.
"""
(x_index, y_index) = cuda.grid(2)
if (y_index < height) and (x_index < width):
radi_ref = radi + margin
j = x_index + radi_ref
i = y_index + radi_ref
ref_mat1 = ref_mat[i - radi_ref:i + radi_ref + 1,
j - radi_ref: j + radi_ref + 1]
mat1 = mat[i - radi:i + radi + 1, j - radi: j + radi + 1]
coef_mat1 = coef_4d[y_index, x_index, :, :]
coef_mat1 = __gen_2d_corr_map_2d_input(ref_mat1, mat1, coef_mat1)
x_max, y_max = __locate_max_value(coef_mat1)
x_pos, y_pos = __locate_2d_peak_kernel(coef_mat1, x_max, y_max)
shifts[0, y_index, x_index] = x_pos - margin
shifts[1, y_index, x_index] = y_pos - margin
def _get_2d_shift_multi_rows_2d_input_gpu(ref_mat, mat, win_size=7, margin=10,
block=(16, 16), pad=True,
norm=False):
"""
Using gpu to find local (y,x)-shifts of the second image against the
reference image where their shapes are the same. Each (y,x)-shifting value
is associated with a pixel-location of the second image. The value is
determined by selecting a small area of the second image, defined by the
'win_size' parameter, and sliding (y,x-direction) over a slightly larger
area of the reference image, defined by the 'win_size' and 'margin'
parameters.
Parameters
----------
ref_mat : array_like
2D array. Reference image.
mat : array_like
2D array. The 2nd image. Must be the same size as the reference image.
win_size : int
To define the size of the area around the selected pixel of the 2nd
image.
margin : int
To define the size of the area of the reference image for sliding,
i.e. size = 2 * margin + win_size
block : tuple of two integer-values, optional
Size of a GPU block. E.g. (4,4), (8, 8), ...
pad : bool, optional
Pad the result to the same as the original images.
norm : bool, optional
Normalize the input images if True.
Returns
-------
list of two 2d-arrays
x-shift array and y-shift array. Zeros at the outer area of the size of
(margin + win_size // 2) if using the pad option.
"""
if ref_mat.shape != mat.shape:
raise ValueError("Data shape must be the same !!!")
if len(ref_mat.shape) != 2:
raise ValueError("Inputs must be 2d-arrays !!!")
(height, width) = ref_mat.shape
win_size = 2 * (win_size // 2) + 1
radi = win_size // 2
edge, size = radi + margin, 2 * margin + 1
height1, width1 = height - 2 * edge, width - 2 * edge
if width1 < 1 or height1 < 1:
raise ValueError("Shapes of the inputs {0} are smaller than the "
"requested size (win_size + 2*margin) = "
"{1}".format(ref_mat.shape, edge))
if norm:
ref_mat = normalize_image(ref_mat)
mat = normalize_image(mat)
shifts = np.zeros((2, height1, width1), dtype=np.float32)
grid = (int(np.ceil(1.0 * width1 / block[0])),
int(np.ceil(1.0 * height1 / block[1])))
f_alias = _get_2d_shift_multi_rows_2d_input_kernel
ref_mat1 = np.float32(np.ascontiguousarray(ref_mat))
mat1 = np.float32(np.ascontiguousarray(mat))
coef_4d = np.zeros((height1, width1, size, size), dtype=np.float32)
f_alias[grid, block](shifts, ref_mat1, mat1, coef_4d, np.int32(height1),
np.int32(width1), np.int32(radi), np.int32(margin))
if pad:
shifts = np.pad(shifts, ((0, 0), (edge, edge), (edge, edge)),
mode="constant")
return shifts[0], shifts[1]
def _get_2d_shift_full_image_2d_input_gpu(ref_mat, mat, chunk_size=None,
win_size=7, margin=10,
block=(16, 16), norm=False,
norm_global=False):
"""
GPU function to find local (y,x)-shifts of the second image against the
reference image where their shapes are the same and their size may be
too big for GPU memory. Each (y,x)-shifting value is associated with a
pixel-location of the second image. The value is determined by selecting
a small area of the second image, defined by the 'win_size' parameter, and
sliding (y,x-direction) over a slightly larger area of the reference image,
defined by the 'win_size' and 'margin' parameters.
Parameters
----------
ref_mat : array_like
2D array, can be a numpy array or hdf object. Reference image.
mat : array_like
2D array, can be a numpy array or hdf object. The 2nd image. Must be
the same size as the reference image.
chunk_size : int or None
Size of each chunk extracted along the height of the image.
win_size : int
To define the size of the area around the selected pixel of the 2nd
image.
margin : int
To define the size of the area of the reference image for
sliding, i.e. size = 2 * margin + win_size
block : tuple of two integer-values, optional
Size of a GPU block. E.g. (4,4), (8, 8), ...
norm : bool, optional
Normalize the input images if True.
norm_global : bool, optional
Normalize by using the full size of the inputs if True.
Returns
-------
list of two 2d-arrays
x-shift array and y-shift array. Zeros at the outer area of the size of
(margin + win_size // 2).
"""
if ref_mat.shape != mat.shape:
raise ValueError("Data shape must be the same !!!")
if len(ref_mat.shape) != 2:
raise ValueError("Inputs must be 2d-arrays !!!")
(height, width) = ref_mat.shape
if chunk_size is None:
chunk_size = height + 1
else:
chunk_size = np.clip(chunk_size, 1, height)
num_chunk = np.clip(height // chunk_size + 1, 1, height)
edge = margin + win_size // 2
if height < (2 * edge + 1):
raise ValueError("Height of the inputs {0} are smaller than the "
"requested size (win_size + 2*margin + 1) = "
"{1} x {1}".format(height, 2 * edge + 1))
list_index = np.array_split(np.arange(edge, height - edge), num_chunk)
x_shifts = np.zeros((height, width), dtype=np.float32)
y_shifts = np.zeros((height, width), dtype=np.float32)
f_alias = _get_2d_shift_multi_rows_2d_input_gpu
if norm_global:
ref_mat = normalize_image(ref_mat)
mat = normalize_image(mat)
for pos in list_index:
bindex, eindex = pos[0], pos[-1] + 1
ref_mat1 = ref_mat[bindex - edge:eindex + edge, :]
mat1 = mat[bindex - edge:eindex + edge, :]
shift_chunk = f_alias(ref_mat1, mat1, win_size, margin, block,
False, norm)
x_shifts[bindex:eindex, edge:-edge] = shift_chunk[0]
y_shifts[bindex:eindex, edge:-edge] = shift_chunk[1]
return x_shifts, y_shifts
@cuda.jit
def _get_2d_shift_multi_rows_3d_input_kernel(shifts, ref_mat, mat, coef_4d,
height, width, radi,
margin): # pragma: no cover
"""
GPU-CPU function to find local (y,x)-shifts of the second 3d-image against
the reference 3d-image.
Parameters
----------
shifts : array_like
Two of 2D arrays of zeros, initialized at CPU.
ref_mat : array_like
3D array. Reference image.
mat : array_like
3D array. The 2nd image. Must be the same size as the reference image.
coef_4d : array_like
4D array of zeros, initialized at CPU.
height : int
Height of the image.
width : int
Width of the image.
radi : int
Radius of the window to select a local area of the image.
margin : int
To define the size of the area of the reference image for sliding.
Returns
-------
Update of the (y,x)-shifting images passed from CPU.
"""
(x_index, y_index) = cuda.grid(2)
if (y_index < height) and (x_index < width):
radi_ref = margin + radi
j = x_index + radi_ref
i = y_index + radi_ref
ref_mat1 = ref_mat[:, i - radi_ref:i + radi_ref + 1,
j - radi_ref: j + radi_ref + 1]
mat1 = mat[:, i - radi:i + radi + 1, j - radi: j + radi + 1]
coef_mat1 = coef_4d[y_index, x_index, :, :]
coef_mat1 = __gen_2d_corr_map_3d_input(ref_mat1, mat1, coef_mat1)
x_max, y_max = __locate_max_value(coef_mat1)
x_pos, y_pos = __locate_2d_peak_kernel(coef_mat1, x_max, y_max)
shifts[0, y_index, x_index] = x_pos - margin
shifts[1, y_index, x_index] = y_pos - margin
def _get_2d_shift_multi_rows_3d_input_gpu(ref_mat, mat, win_size=7, margin=10,
block=(16, 16), pad=True,
norm=False):
"""
GPU function to find local (y,x)-shifts of the second 3d-image against the
reference 3d-image where their shapes are the same. Each (y,x)-shifting
value is associated with a pixel-location of the second image. The value is
determined by selecting a small volume of the second image, defined by
the 'win_size' parameter, and sliding (y,x only) over a slightly larger
volume of the reference image, defined by the 'win_size' and 'margin'
parameters. This function may be computational expensive.
Parameters
----------
ref_mat : array_like
3D array. Reference image.
mat : array_like
3D array. The 2nd image. Must be the same size as the reference image.
win_size : int
To define the size of the image volume around the selected pixel of the
2nd image.
margin : int
To define the size of the selected volume of the reference image for
sliding i.e. size = 2 * margin + win_size
block : tuple of two integer-values, optional
Size of a GPU block. E.g. (4,4), (8, 8), ...
pad : bool, optional
Padding the result to the same as the original images.
norm : bool, optional
Normalizing the inputs.
Returns
-------
list of two 2d-arrays
x-shift array and y-shift array, corresponding to the pixel-locations
in the 2nd image where the starting location is at
(margin + win_size/2, margin + win_size/2). Using the pad option to
make it easier to find which value corresponding to
which pixel-location.
"""
if ref_mat.shape != mat.shape:
raise ValueError("Data shape must be the same !!!")
if len(ref_mat.shape) != 3:
raise ValueError("Inputs must be 3d-arrays !!!")
(height, width) = ref_mat.shape[-2:]
win_size = 2 * (win_size // 2) + 1
radi = win_size // 2
edge, size = radi + margin, 2 * margin + 1
height1, width1 = height - 2 * edge, width - 2 * edge
if width1 < 1 or height1 < 1:
if width1 < 1 or height1 < 1:
raise ValueError("Shapes of the inputs {0} are smaller than the "
"requested size (win_size + 2*margin) = "
"{1}".format(ref_mat.shape, edge))
if norm:
ref_mat = normalize_image(ref_mat)
mat = normalize_image(mat)
shifts = np.zeros((2, height1, width1), dtype=np.float32)
grid = (int(np.ceil(1.0 * width1 / block[0])),
int(np.ceil(1.0 * height1 / block[1])))
f_alias = _get_2d_shift_multi_rows_3d_input_kernel
ref_mat1 = np.float32(np.ascontiguousarray(ref_mat))
mat1 = np.float32(np.ascontiguousarray(mat))
coef_4d = np.zeros((height1, width1, size, size), dtype=np.float32)
f_alias[grid, block](shifts, ref_mat1, mat1, coef_4d, np.int32(height1),
np.int32(width1), np.int32(radi), np.int32(margin))
if pad:
shifts = np.pad(shifts, ((0, 0), (edge, edge), (edge, edge)),
mode="constant")
return shifts[0], shifts[1]
def _get_2d_shift_full_image_3d_input_gpu(ref_mat, mat, chunk_size=None,
win_size=7, margin=10,
block=(16, 16), norm=False,
norm_global=False):
"""
GPU function to find local (y,x)-shifts of the second 3d-image against the
reference 3d-image where their shapes are the same and their size may be
too big for memory. Each (y,x)-shifting value is associated with a
pixel-location of the second image. The value is determined by selecting a
small volume of the second image, defined by the 'win_size' parameter, and
sliding (y,x only) over a slightly larger volume of the reference image,
defined by the 'win_size' and 'margin' parameters. This function may be
computational expensive.
Parameters
----------
ref_mat : array_like
3D array, can be a numpy array or hdf object. Reference image.
mat : array_like
3D array, can be a numpy array or hdf object. The 2nd image. Must be
the same size as the reference image.
chunk_size : int or None
Size of each chunk extracted along the height of the image.
win_size : int
To define the size of the area around the selected pixel of the 2nd
image.
margin : int
To define the size of the area of the reference image for
sliding, i.e. size = 2 * margin + win_size
block : tuple of two integer-values, optional
Size of a GPU block. E.g. (4,4), (8, 8), ...
norm : bool, optional
Normalize the input images if True.
norm_global : bool, optional
Normalize by using the full size of the inputs if True.
Returns
-------
list of two 2d-arrays
x-shift array and y-shift array. Zeros at the outer area of the size of
(margin + win_size // 2).
"""
if ref_mat.shape != mat.shape:
raise ValueError("Data shape must be the same !!!")
if len(ref_mat.shape) != 3:
raise ValueError("Inputs must be 3d-arrays !!!")
(height, width) = ref_mat.shape[-2:]
if chunk_size is None:
chunk_size = height + 1
else:
chunk_size = np.clip(chunk_size, 1, height)
num_chunk = np.clip(height // chunk_size + 1, 1, height)
edge = margin + win_size // 2
if height < (2 * edge + 1):
raise ValueError("Height of the inputs {0} are smaller than the "
"requested size (win_size + 2*margin + 1) = "
"{1} x {1}".format(height, 2 * edge + 1))
list_index = np.array_split(np.arange(edge, height - edge), num_chunk)
x_shifts = np.zeros((height, width), dtype=np.float32)
y_shifts = np.zeros((height, width), dtype=np.float32)
f_alias = _get_2d_shift_multi_rows_3d_input_gpu
if norm_global:
ref_mat = normalize_image(ref_mat)
mat = normalize_image(mat)
norm = False
for pos in list_index:
bindex, eindex = pos[0], pos[-1] + 1
ref_mat1 = ref_mat[:, bindex - edge:eindex + edge, :]
mat1 = mat[:, bindex - edge:eindex + edge, :]
shift_chunk = f_alias(ref_mat1, mat1, win_size, margin, block,
False, norm)
x_shifts[bindex:eindex, edge:-edge] = shift_chunk[0]
y_shifts[bindex:eindex, edge:-edge] = shift_chunk[1]
return x_shifts, y_shifts
@cuda.jit
def _generate_4d_correlation_map_3d_input_kernel(coef_4d, ref_mat, mat, height,
width, radi,
margin): # pragma: no cover
"""
GPU-CPU function to calculate 2D correlation maps for each pixel location
in height and width -> 4D array.
Parameters
----------
coef_4d : array_like
4D array of zeros, initialized at CPU.
ref_mat : array_like
3D array. Reference image.
mat : array_like
3D array. The 2nd image. Must be the same size as the reference image.
height : int
Height of the image.
width : int
Width of the image.
radi : int
Radius of the window to select a local area of the image.
margin : int
To define the size of the area of the reference image for sliding.
Returns
-------
Updated of 4D correlation map.
"""
(x_index, y_index) = cuda.grid(2)
if (y_index < height) and (x_index < width):
radi_ref = margin + radi
size = 2 * margin + 1
j = x_index + radi_ref
i = y_index + radi_ref
ref_mat1 = ref_mat[:, i - radi_ref:i + radi_ref + 1,
j - radi_ref: j + radi_ref + 1]
mat1 = mat[:, i - radi:i + radi + 1, j - radi: j + radi + 1]
coef_mat1 = coef_4d[y_index, x_index, :, :]
coef_mat1 = __gen_2d_corr_map_3d_input(ref_mat1, mat1, coef_mat1)
for i in range(size):
for j in range(size):
coef_4d[y_index, x_index, i, j] = coef_mat1[i, j]
def _get_2d_shift_multi_rows_3d_input_cpu_gpu(ref_mat, mat, win_size=7,
margin=10, method="diff", size=3,
block=(16, 16), pad=True,
ncore=None, norm=False):
"""
Combine GPU and CPU to find local (y,x)-shifts of the second 3d-image
against the reference 3d-image where their shapes are the same.
Each (y,x)-shifting value is associated with a pixel-location of the second
image. The value is determined by selecting a small volume of the second
image, defined by the 'win_size' parameter, and sliding (y,x only) over a
slightly larger volume of the reference image, defined by the 'win_size'
and 'margin' parameters. This function may be computational expensive.
Parameters
----------
ref_mat : array_like
3D array. Reference image.
mat : array_like
3D array. The 2nd image. Must be the same size as the reference image.
win_size : int
To define the size of the image volume around the selected pixel of the
2nd image.
margin : int
To define the size of the selected volume of the reference image for
sliding i.e. size = 2 * margin + win_size
method : {"diff", "poly_fit"}
Method for finding sub-pixel shift. Two options: a differential
method (Ref. [1]) or a polynomial method (Ref. [2]).
size : int
Window size around the integer location of the maximum value used for
sub-pixel searching.
block : tuple of two integer-values, optional
Size of a GPU block. E.g. (4,4), (8, 8), ...
pad : bool, optional
Padding the result to the same as the original images.
ncore : int or None
Number of cpu-cores used for computing. Automatically selected if None.
norm : bool, optional
Normalizing the inputs.
Returns
-------
list of two 2d-arrays
x-shift array and y-shift array, corresponding to the pixel-locations
in the 2nd image where the starting location is at
(margin + win_size/2, margin + win_size/2). Using the pad option to
make it easier to find which value corresponding to
which pixel-location.
References
----------
[1] : https://doi.org/10.48550/arXiv.0712.4289
[2] : https://doi.org/10.1088/0957-0233/17/6/045
"""
if ref_mat.shape != mat.shape:
raise ValueError("Data shape must be the same !!!")
if len(ref_mat.shape) != 3:
raise ValueError("Inputs must be 3d-arrays !!!")
(height, width) = ref_mat.shape[-2:]
win_size = 2 * (win_size // 2) + 1
radi = win_size // 2
edge, size_mag = radi + margin, 2 * margin + 1
height1, width1 = height - 2 * edge, width - 2 * edge
if width1 < 1 or height1 < 1:
if width1 < 1 or height1 < 1:
raise ValueError("Shapes of the inputs {0} are smaller than the "
"requested size (win_size + 2*margin) = "
"{1}".format(ref_mat.shape, edge))
if norm:
ref_mat = normalize_image(ref_mat)
mat = normalize_image(mat)
grid = (int(np.ceil(1.0 * width1 / block[0])),
int(np.ceil(1.0 * height1 / block[1])))
f_alias = _generate_4d_correlation_map_3d_input_kernel
ref_mat1 = np.float32(np.ascontiguousarray(ref_mat))
mat1 = np.float32(np.ascontiguousarray(mat))
coef_4d = np.zeros((height1, width1, size_mag, size_mag), dtype=np.float32)
f_alias[grid, block](coef_4d, ref_mat1, mat1, np.int32(height1),
np.int32(width1), np.int32(radi), np.int32(margin))
if ncore is None:
ncore = mp.cpu_count() - 1
shifts = np.asarray(Parallel(n_jobs=ncore)(
delayed(locate_peak)(coef_4d[i, j, :, :], True, method, 2, size)
for i in range(height1) for j in range(width1)))
shifts = np.moveaxis(np.reshape(
np.asarray(shifts), (height1, width1, 2)), 2, 0) - margin
if pad:
shifts = np.pad(shifts, ((0, 0), (edge, edge), (edge, edge)),
mode="constant")
return shifts[0], shifts[1]
def _get_2d_shift_full_image_3d_input_cpu_gpu(ref_mat, mat, chunk_size=None,
win_size=7, margin=10,
method="diff", size=3,
block=(16, 16), ncore=None,
norm=False, norm_global=False):
"""
Combine GPU and CPU to find local (y,x)-shifts of the second 3d-image
against the reference 3d-image where their shapes are the same and their
size may be too big for memory. Each (y,x)-shifting value is
associated with a pixel-location of the second image. The value is
determined by selecting a small volume of the second image, defined by
the 'win_size' parameter, and sliding (y,x only) over a slightly larger
volume of the reference image, defined by the 'win_size' and 'margin'
parameters. This function may be computational expensive.
Parameters
----------
ref_mat : array_like
3D array, can be a numpy array or hdf object. Reference image.
mat : array_like
3D array, can be a numpy array or hdf object. The 2nd image. Must be
the same size as the reference image.
chunk_size : int or None
Size of each chunk extracted along the height of the image.
win_size : int
To define the size of the area around the selected pixel of the 2nd
image.
margin : int
To define the size of the area of the reference image for
sliding, i.e. size = 2 * margin + win_size
method : {"diff", "poly_fit"}
Method for finding sub-pixel shift. Two options: a differential
method (Ref. [1]) or a polynomial method (Ref. [2]).
size : int
Window size around the integer location of the maximum value used for
sub-pixel searching.
block : tuple of two integer-values, optional
Size of a GPU block. E.g. (4,4), (8, 8), ...
ncore : int or None
Number of cpu-cores used for computing. Automatically selected if None.
norm : bool, optional
Normalizing the inputs if True.
norm_global : bool, optional
Normalize by using the full size of the inputs if True.
Returns
-------
list of two 2d-arrays
x-shift array and y-shift array. Zeros at the outer area of the size of
(margin + win_size // 2).
References
----------
[1] : https://doi.org/10.48550/arXiv.0712.4289
[2] : https://doi.org/10.1088/0957-0233/17/6/045
"""
if ref_mat.shape != mat.shape:
raise ValueError("Data shape must be the same !!!")
if len(ref_mat.shape) != 3:
raise ValueError("Inputs must be 3d-arrays !!!")
(height, width) = ref_mat.shape[-2:]
if chunk_size is None:
chunk_size = height + 1
else:
chunk_size = np.clip(chunk_size, 1, height)
num_chunk = np.clip(height // chunk_size + 1, 1, height)
edge = margin + win_size // 2
if height < (2 * edge + 1):
raise ValueError("Height of the inputs {0} are smaller than the "
"requested size (win_size + 2*margin + 1) = "
"{1} x {1}".format(height, 2 * edge + 1))
list_index = np.array_split(np.arange(edge, height - edge), num_chunk)
x_shifts = np.zeros((height, width), dtype=np.float32)
y_shifts = np.zeros((height, width), dtype=np.float32)
f_alias = _get_2d_shift_multi_rows_3d_input_cpu_gpu
if norm_global:
ref_mat = normalize_image(ref_mat)
mat = normalize_image(mat)
norm = False
for pos in list_index:
bindex, eindex = pos[0], pos[-1] + 1
ref_mat1 = ref_mat[:, bindex - edge:eindex + edge, :]
mat1 = mat[:, bindex - edge:eindex + edge, :]
shift_chunk = f_alias(ref_mat1, mat1, win_size, margin, method, size,
block, False, ncore, norm)
x_shifts[bindex:eindex, edge:-edge] = shift_chunk[0]
y_shifts[bindex:eindex, edge:-edge] = shift_chunk[1]
return x_shifts, y_shifts
[docs]def find_local_shifts(ref_mat, mat, dim=1, win_size=7, margin=10,
method="diff", size=3, gpu=False, block=(16, 16),
ncore=None, norm=True, norm_global=False,
chunk_size=100):
"""
To find local shifts (in x and y direction) between two images by selecting
a small area/volume of the second image and sliding over a slightly larger
area/volume of the reference image.
Parameters
----------
ref_mat : array_like
2D/3D array, can be a numpy array or hdf object. Reference image.
mat : array_like
2D/3D array, can be a numpy array or hdf object. The second image, must
be the same size as the reference image.
dim : {1, 2}
To find the shifts (in x and y) separately or together.
win_size : int
Size of local areas in the second image.
margin : int
To define the sliding range of the second image.
method : {"diff", "poly_fit"}
Method for finding sub-pixel shift. Two options: a differential
method (Ref. [1]) or a polynomial method (Ref. [2]). The "poly_fit"
option is not available if using GPU.
size : int
Window size around the integer location of the maximum value used for
sub-pixel location. Adjustable if using the polynomial method.
gpu : {False, True, "hybrid"}
Use GPU for computing if True or in "hybrid" mode.
block : tuple of two integer-values, optional
Size of a GPU block. E.g. (8, 8), (16, 16), (32, 32), ...
ncore : int or None
Number of cpu-cores used for computing. Automatically selected if None.
norm : bool, optional
Normalizing the inputs if True.
norm_global : bool, optional
Normalize by using the full size of the inputs if True.
chunk_size : int or None
Size of each chunk extracted along the height of the image.
Returns
-------
list of two 2d-arrays
x-shift array and y-shift array. Zeros at the outer area of the size of
(margin + win_size // 2).
References
----------
[1] : https://doi.org/10.48550/arXiv.0712.4289
[2] : https://doi.org/10.1088/0957-0233/17/6/045
"""
if gpu is True:
if cuda.is_available() is False:
warnings.warn("!!!No Nvidia GPU found!!!Run with CPU instead!!!")
gpu = False
if len(ref_mat.shape) == 2:
if gpu is False:
f_alias = _get_2d_shift_full_image_2d_input
(x_shifts, y_shifts) = f_alias(ref_mat, mat, win_size=win_size,
margin=margin, sub_pixel=True,
method=method, size=size, gpu=False,
ncore=ncore, norm=norm)
else:
f_alias = _get_2d_shift_full_image_2d_input_gpu
(x_shifts, y_shifts) = f_alias(ref_mat, mat, chunk_size=chunk_size,
win_size=win_size, margin=margin,
block=block, norm=norm)
else:
if gpu is False:
if dim == 1:
f_alias = _get_1d_shift_full_image_3d_input_cpu
x_shifts = f_alias(ref_mat, mat, direction="x",
chunk_size=chunk_size, win_size=win_size,
margin=margin, sub_pixel=True,
method=method, size=size, ncore=ncore,
norm=norm, norm_global=norm_global)
y_shifts = f_alias(ref_mat, mat, direction="y",
chunk_size=chunk_size, win_size=win_size,
margin=margin, sub_pixel=True,
method=method, size=size, ncore=ncore,
norm=norm, norm_global=norm_global)
else:
f_alias = _get_2d_shift_full_image_3d_input_cpu
(x_shifts, y_shifts) = f_alias(ref_mat, mat,
chunk_size=chunk_size,
win_size=win_size,
margin=margin,
sub_pixel=True, method=method,
size=size, ncore=ncore,
norm=norm,
norm_global=norm_global)
elif gpu == "Hybrid" or gpu == "hybrid":
f_alias = _get_2d_shift_full_image_3d_input_cpu_gpu
(x_shifts, y_shifts) = f_alias(ref_mat, mat, chunk_size=chunk_size,
win_size=win_size, margin=margin,
method=method, size=size,
block=block, ncore=ncore,
norm=norm, norm_global=norm_global)
else:
if dim == 1:
f_alias = _get_1d_shift_full_image_3d_input_gpu
x_shifts = f_alias(ref_mat, mat, direction="x",
chunk_size=chunk_size, win_size=win_size,
margin=margin, block=block, norm=norm,
norm_global=norm_global)
y_shifts = f_alias(ref_mat, mat, direction="y",
chunk_size=chunk_size, win_size=win_size,
margin=margin, block=block, norm=norm,
norm_global=norm_global)
else:
f_alias = _get_2d_shift_full_image_3d_input_gpu
(x_shifts, y_shifts) = f_alias(ref_mat, mat,
chunk_size=chunk_size,
win_size=win_size,
margin=margin,
block=block, norm=norm,
norm_global=norm_global)
return np.float32(x_shifts), np.float32(y_shifts)
def __get_input_list(list_ij, height, width, gap):
"""
Supplementary method for methods of finding global_shift based on
local_shifts.
"""
input_format = "Please use the format of [i, j] for a single " \
"point or [list_i, list_j] for multiple points"
box = [[gap, height - gap], [gap, width - gap]]
msg = "The given list of points are not inside the box: {}".format(box)
if isinstance(list_ij, list) or isinstance(list_ij, tuple) or \
isinstance(list_ij, np.ndarray):
if len(list_ij) != 2:
raise ValueError(input_format)
else:
list_i = np.int32(np.asarray(list_ij[0]))
list_j = np.int32(np.asarray(list_ij[1]))
if isinstance(list_i, np.integer) and \
isinstance(list_j, np.integer):
if ((height - gap) > list_i > gap) and \
((width - gap) > list_j > gap):
list_i = [list_i]
list_j = [list_j]
else:
raise ValueError(msg)
else:
if isinstance(list_i, np.integer) or \
isinstance(list_j, np.integer):
raise ValueError(input_format)
else:
if len(list_i) != len(list_j):
raise ValueError("Size of each list of indices is not "
"the same")
else:
i_tmp, j_tmp = [], []
for k in range(len(list_i)):
if ((height - gap) > list_i[k] > gap) and \
((width - gap) > list_j[k] > gap):
i_tmp.append(list_i[k])
j_tmp.append(list_j[k])
if len(i_tmp) == 0:
raise ValueError(msg)
else:
list_i, list_j = i_tmp, j_tmp
else:
raise ValueError(input_format)
return list_i, list_j
def _find_global_shift_based_local_shifts_cpu(ref_mat, mat, win_size, margin,
list_ij=None, num_point=None,
global_value="mixed",
sub_pixel=True, method="diff",
size=3, ncore=None, norm=False,
return_list=False):
"""
CPU function to find global shift between two images based on finding
local shifts.
Parameters
----------
ref_mat : array_like
2D array. Reference image.
mat : array_like
2D array. The 2nd image. Must be the same size as the reference image.
win_size : int
To define the size of the area around the selected pixel of the 2nd
image. E.g. 41, 61, ..
margin : int
To define the searching range (in pixel) for finding shift.
E.g. 20, 30,...
list_ij : list of lists of int or None
List of indices of points used for local search. Accept the value of
[i_index, j_index] for a single point or
[[i_index0, i_index1,...], [j_index0, j_index1,...]]
for multiple points. Automatically generated if None.
num_point : int or None
Number of points used for local search if list_ij is None.
global_value : {"median", "mean", "mixed"}
Method for calculating the global value from local values.
sub_pixel : bool, optional
Enable sub-pixel location.
method : {"diff", "poly_fit"}
Method for finding 1d sub-pixel position. Two options: a differential
method (Ref. [1]) or a polynomial method (Ref. [2]).
size : int, optional
Window size around the integer location of the maximum value used for
sub-pixel searching.
ncore : int or None
Number of cpu-cores used for computing. Automatically selected if None.
norm : bool, optional
Normalize the input images if True.
return_list : bool
Return all local values if True.
Returns
-------
float or list of float
Shift in x-direction. Return a list of float if return_list is True.
float or list of float
Shift in y-direction. Return a list of float if return_list is True.
References
----------
[1] : https://doi.org/10.48550/arXiv.0712.4289
[2] : https://doi.org/10.1088/0957-0233/17/6/045
"""
if ref_mat.shape != mat.shape:
raise ValueError("Data shape must be the same !!!")
if len(ref_mat.shape) != 2:
raise ValueError("Inputs must be 2d-arrays !!!")
(height, width) = ref_mat.shape
win_size = 2 * (win_size // 2) + 1
radi = win_size // 2
start = radi + margin
als_size = win_size + 2 * margin
if width < als_size or height < als_size:
raise ValueError("Shapes of the inputs {0} are smaller than the "
"requested size (win_size + 2*margin) = "
"{1}".format(ref_mat.shape, als_size))
start1, radi1 = start + 1, radi + 1
if norm:
ref_mat = normalize_image(ref_mat)
mat = normalize_image(mat)
if num_point is None:
num_point = 30
if list_ij is None:
list_i = np.random.randint(start, height - start, size=num_point)
list_j = np.random.randint(start, width - start, size=num_point)
else:
list_i, list_j = __get_input_list(list_ij, height, width, start)
num_point = len(list_i)
f_alias = find_shift_based_correlation_map
if ncore == 1:
shifts = [f_alias(ref_mat[list_i[k] - start:list_i[k] + start1,
list_j[k] - start:list_j[k] + start1],
mat[list_i[k] - radi:list_i[k] + radi1,
list_j[k] - radi:list_j[k] + radi1],
margin, None, sub_pixel, method, 2, size,
False, None) for k in range(num_point)]
shifts = np.asarray(shifts)
else:
if ncore is None:
ncore = np.clip(mp.cpu_count() - 1, 1, None)
shifts = np.asarray(Parallel(n_jobs=ncore)(
delayed(f_alias)(
ref_mat[list_i[k] - start:list_i[k] + start1,
list_j[k] - start:list_j[k] + start1],
mat[list_i[k] - radi:list_i[k] + radi1,
list_j[k] - radi:list_j[k] + radi1],
margin, None, sub_pixel, method, 2, size, False, None)
for k in range(num_point)))
x_shifts, y_shifts = shifts[:, 0], shifts[:, 1]
if return_list:
return x_shifts, y_shifts
else:
if global_value == "median":
global_xshift = np.median(x_shifts)
global_yshift = np.median(y_shifts)
elif global_value == "mixed":
mid = num_point // 2
begin = np.clip(mid - 3, 0, None)
end = np.clip(mid + 3, mid, num_point)
if end > begin:
global_xshift = np.mean(np.sort(x_shifts)[begin:end])
global_yshift = np.mean(np.sort(y_shifts)[begin:end])
else:
global_xshift = np.median(x_shifts)
global_yshift = np.median(y_shifts)
else:
if num_point > 7:
global_xshift = np.mean(np.sort(x_shifts)[2:-2])
global_yshift = np.mean(np.sort(y_shifts)[2:-2])
else:
global_xshift = np.mean(x_shifts)
global_yshift = np.mean(y_shifts)
return global_xshift, global_yshift
@cuda.jit
def _get_local_shifts_gpu_kernel(shifts, ref_mat, mat, list_i, list_j,
list_2d_coef, radi, margin,
num_point): # pragma: no cover
"""
GPU-CPU function to find local (y,x)-shifts of the second image against
the reference image.
Parameters
----------
shifts : array_like
2D array of zeros, initialized at CPU.
ref_mat : array_like
2D array. Reference image.
list_i : array_like
1D array. i-index of points using for search.
list_j : array_like
1D array. j-index of points using for search.
list_2d_coef : array_like
List of 2D array of zeros, initialized at CPU.
radi : int
Radius of the window to select a local area of the image.
margin : int
To define the size of the area of the reference image for sliding.
num_point : int
Number of points using for search.
Returns
-------
Update of the (y,x)-shifting images passed from CPU.
"""
idx = cuda.grid(1)
if idx < num_point:
radi_ref = radi + margin
i = list_i[idx]
j = list_j[idx]
ref_mat1 = ref_mat[i - radi_ref:i + radi_ref + 1,
j - radi_ref: j + radi_ref + 1]
mat1 = mat[i - radi:i + radi + 1, j - radi: j + radi + 1]
coef_mat1 = list_2d_coef[idx, :, :]
coef_mat1 = __gen_2d_corr_map_2d_input(ref_mat1, mat1, coef_mat1)
x_max, y_max = __locate_max_value(coef_mat1)
x_pos, y_pos = __locate_2d_peak_kernel(coef_mat1, x_max, y_max)
shifts[0, idx] = x_pos - margin
shifts[1, idx] = y_pos - margin
def _find_global_shift_based_local_shifts_gpu(ref_mat, mat, win_size, margin,
list_ij=None, num_point=None,
global_value="mixed", block=32,
norm=False, return_list=False):
"""
GPU function to find global shift between two images based on finding
local shifts.
Parameters
----------
ref_mat : array_like
2D array. Reference image.
mat : array_like
2D array. The 2nd image. Must be the same size as the reference image.
win_size : int
To define the size of the area around the selected pixel of the 2nd
image. E.g. 41, 61, ..
margin : int
To define the searching range (in pixel) for finding shift.
E.g. 20, 30,...
list_ij : list of lists of int or None
List of indices of points used for local search. Accept the value of
[i_index, j_index] for a single point or
[[i_index0, i_index1,...], [j_index0, j_index1,...]]
for multiple points. Automatically generated if None.
num_point : int or None
Number of points used for local search if list_ij is None.
global_value : {"median", "mean", "mixed"}
Method for calculating the global value from local values.
block : int
Size of a GPU block. E.g. 16, 32, 64, ...
norm : bool, optional
Normalize the input images if True.
return_list : bool
Return all local values if True.
Returns
-------
float or list of float
Shift in x-direction. Return a list of float if return_list is True.
float or list of float
Shift in y-direction. Return a list of float if return_list is True.
"""
if ref_mat.shape != mat.shape:
raise ValueError("Data shape must be the same !!!")
if len(ref_mat.shape) != 2:
raise ValueError("Inputs must be 2d-arrays !!!")
(height, width) = ref_mat.shape
win_size = 2 * (win_size // 2) + 1
radi = win_size // 2
edge, size = radi + margin, 2 * margin + 1
height1, width1 = height - 2 * edge, width - 2 * edge
if width1 < 1 or height1 < 1:
raise ValueError("Shapes of the inputs {0} are smaller than the "
"requested size (win_size + 2*margin) = "
"{1}".format(ref_mat.shape, edge))
if norm:
ref_mat = normalize_image(ref_mat)
mat = normalize_image(mat)
if num_point is None:
num_point = 30
if list_ij is None:
list_i = np.random.randint(edge, height - edge, size=num_point)
list_j = np.random.randint(edge, width - edge, size=num_point)
else:
list_i, list_j = __get_input_list(list_ij, height, width, edge)
num_point = len(list_i)
shifts = np.zeros((2, num_point), dtype=np.float32)
grid = int(np.ceil(1.0 * num_point / block))
f_alias = _get_local_shifts_gpu_kernel
ref_mat1 = np.float32(np.ascontiguousarray(ref_mat))
mat1 = np.float32(np.ascontiguousarray(mat))
list_2d_coef = np.zeros((num_point, size, size), dtype=np.float32)
list_i, list_j = np.int32(list_i), np.int32(list_j)
f_alias[grid, block](shifts, ref_mat1, mat1, list_i, list_j, list_2d_coef,
np.int32(radi), np.int32(margin), np.int32(num_point))
x_shifts, y_shifts = shifts[0], shifts[1]
if return_list:
return x_shifts, y_shifts
else:
if global_value == "median":
global_xshift = np.median(x_shifts)
global_yshift = np.median(y_shifts)
elif global_value == "mixed":
mid = num_point // 2
begin = np.clip(mid - 3, 0, None)
end = np.clip(mid + 3, mid, num_point)
if end > begin:
global_xshift = np.mean(np.sort(x_shifts)[begin:end])
global_yshift = np.mean(np.sort(y_shifts)[begin:end])
else:
global_xshift = np.median(x_shifts)
global_yshift = np.median(y_shifts)
else:
if num_point > 7:
global_xshift = np.mean(np.sort(x_shifts)[2:-2])
global_yshift = np.mean(np.sort(y_shifts)[2:-2])
else:
global_xshift = np.mean(x_shifts)
global_yshift = np.mean(y_shifts)
return global_xshift, global_yshift
[docs]def find_global_shift_based_local_shifts(ref_mat, mat, win_size, margin,
list_ij=None, num_point=None,
global_value="mixed", gpu=False,
block=32, sub_pixel=True,
method="diff", size=3, ncore=None,
norm=False, return_list=False):
"""
Find global shift between two images based on finding local shifts.
Parameters
----------
ref_mat : array_like
2D array. Reference image.
mat : array_like
2D array. The 2nd image. Must be the same size as the reference image.
win_size : int
To define the size of the area around the selected pixel of the 2nd
image. E.g. 41, 61, ..
margin : int
To define the searching range (in pixel) for finding shift.
E.g. 20, 30,...
list_ij : list of lists of int or None
List of indices of points used for local search. Accept the value of
[i_index, j_index] for a single point or
[[i_index0, i_index1,...], [j_index0, j_index1,...]]
for multiple points. Automatically generated if None.
num_point : int or None
Number of points used for local search if list_ij is None.
global_value : {"median", "mean", "mixed"}
Method for calculating the global value from local values.
gpu : bool, optional
Use GPU for computing if True. If win_size and image size is large
(e.g. > 201 x 2k x 2k), using CPU may be better.
block : int, optional
Size of a GPU block if using GPU. E.g. 16, 32, 64, ...
sub_pixel : bool, optional
Enable sub-pixel location.
method : {"diff", "poly_fit"}
Method for finding sub-pixel shift. Two options: a differential
method (Ref. [1]) or a polynomial method (Ref. [2]). The "poly_fit"
option is not available if using GPU.
size : int, optional
Window size around the integer location of the maximum value used for
sub-pixel searching.
ncore : int or None
Number of cpu-cores used for computing. Automatically selected if None.
norm : bool, optional
Normalize the input images if True.
return_list : bool
Return all local values if True.
Returns
-------
float or list of float
Shift in x-direction. Return a list of float if return_list is True.
float or list of float
Shift in y-direction. Return a list of float if return_list is True.
References
----------
[1] : https://doi.org/10.48550/arXiv.0712.4289
[2] : https://doi.org/10.1088/0957-0233/17/6/045
"""
if gpu is True:
if cuda.is_available() is False:
warnings.warn("!!!No Nvidia GPU found!!!Run with CPU instead!!!")
gpu = False
if gpu is True:
f_alias = _find_global_shift_based_local_shifts_gpu
x_out, y_out = f_alias(ref_mat, mat, win_size, margin, list_ij=list_ij,
num_point=num_point, global_value=global_value,
block=block, norm=norm, return_list=return_list)
else:
f_alias = _find_global_shift_based_local_shifts_cpu
x_out, y_out = f_alias(ref_mat, mat, win_size, margin, list_ij=list_ij,
num_point=num_point, global_value=global_value,
sub_pixel=sub_pixel, method=method, size=size,
ncore=ncore, norm=norm, return_list=return_list)
return x_out, y_out
def __calc_shift_umpa(ref_mat, mat, window, margin, t1, t3, t2, t4, t6,
dark_signal=False, method="poly_fit", size=3):
"""
Supplementary CPU-function for finding local shifts using the UMPA
approach.
"""
t5 = np.zeros((2 * margin + 1, 2 * margin + 1), dtype=np.float32)
for i in range(len(ref_mat)):
t5 += correlate(ref_mat[i], window * mat[i], mode='valid')
if dark_signal:
mat_tmp = (t2 * t3 - t6 ** 2)
K = (t2 * t5 - t4 * t6) / mat_tmp
beta = (t3 * t4 - t5 * t6) / mat_tmp
A = beta + K
V = K / A
else:
K = t5 / t3
beta = 0.0
D = t1 + (beta ** 2) * t2 + (K ** 2) * t3 \
- 2 * beta * t4 - 2 * K * t5 + 2 * beta * K * t6
x_pos, y_pos = locate_peak(D, sub_pixel=True, method=method, dim=2,
size=size, max_peak=False)
j, i = int(np.round(x_pos)), int(np.round(y_pos))
x_shift, y_shift = x_pos - margin, y_pos - margin
if dark_signal:
return x_shift, y_shift, np.abs(A[i, j]), np.abs(V[i, j])
else:
return x_shift, y_shift
def __get_2d_shift_multi_rows_3d_input_umpa_cpu(ref_mat, mat, win_size, margin,
window, L1, L3, L2, L4, L6,
method="poly_fit", size=3,
ncore=None, dark_signal=False):
"""
Supplementary CPU-function for finding local shifts using the UMPA
approach.
"""
(height, width) = ref_mat.shape[-2:]
win_size = 2 * (win_size // 2) + 1
radi = win_size // 2
start = radi + margin
stop_col, stop_row = width - start, height - start
start1, radi1, margin1 = start + 1, radi + 1, margin + 1
f_alias = __calc_shift_umpa
if ncore is None:
ncore = np.clip(mp.cpu_count() - 1, 1, None)
if dark_signal:
results = np.asarray(
Parallel(n_jobs=ncore)(delayed(f_alias)(
ref_mat[:, i - start:i + start1, j - start:j + start1],
mat[:, i - radi:i + radi1, j - radi:j + radi1], window, margin,
L1[i, j], L3[i - margin:i + margin1, j - margin:j + margin1],
L2, L4[i, j],
L6[i - margin:i + margin1, j - margin:j + margin1],
dark_signal, method=method, size=size)
for i in range(start, stop_row)
for j in range(start, stop_col)))
results = np.reshape(np.asarray(results),
(stop_row - start, stop_col - start, 4))
x_shifts, y_shifts = results[:, :, 0], results[:, :, 1]
trans, dark = results[:, :, 2], results[:, :, 3]
else:
results = np.asarray(
Parallel(n_jobs=ncore)(delayed(f_alias)(
ref_mat[:, i - start:i + start1, j - start:j + start1],
mat[:, i - radi:i + radi1, j - radi:j + radi1], window, margin,
L1[i, j], L3[i - margin:i + margin1, j - margin:j + margin1],
0.0, 0.0, 0.0, dark_signal, method=method, size=size)
for i in range(start, stop_row)
for j in range(start, stop_col)))
results = np.reshape(np.asarray(results),
(stop_row - start, stop_col - start, 2))
x_shifts, y_shifts = results[:, :, 0], results[:, :, 1]
if dark_signal:
return x_shifts, y_shifts, trans, dark
else:
return x_shifts, y_shifts
@cuda.jit(device=True)
def __sum_multiply_no_norm_2d(ref_mat, mat, window): # pragma: no cover
"""
GPU-kernel function to calculate the sum of multiplies of 2d-arrays.
Parameters
----------
ref_mat : array_like
2D array. The first image.
mat : array_like
2D array. The second image.
window : array_like
2D array. Smoothing window.
Returns
-------
float
"""
(height, width) = ref_mat.shape
sum_mul = 0.0
for i in range(height):
for j in range(width):
val = ref_mat[i, j] * mat[i, j] * window[i, j]
sum_mul += val
return sum_mul
@cuda.jit(device=True)
def __accu_correlate(ref_mat, mat, window, coef_mat): # pragma: no cover
"""
GPU-kernel function to calculate cross-correlation between two images.
Parameters
----------
ref_mat : array_like
2D array. The first image.
mat : array_like
2D array. The second image.
window : array_like
2D array. Smoothing window.
coef_mat : array_like
2D array. Resulting map.
Returns
-------
array_like
"""
(height0, width0) = ref_mat.shape
(height1, width1) = mat.shape
height2, width2 = height0 - height1 + 1, width0 - width1 + 1
# coef_mat is at GPU global memory with the size of (height2, width2)
for i in range(height2):
for j in range(width2):
row0, row1 = i, i + height1
col0, col1 = j, j + width1
ref_mat1 = ref_mat[row0:row1, col0:col1]
sum_mul = __sum_multiply_no_norm_2d(ref_mat1, mat, window)
coef_mat[i, j] += sum_mul
return coef_mat
@cuda.jit
def __calc_shift_umpa_gpu_kernel(shifts, trans, dark, ref_mat, mat, coef_4d,
depth, height, width, radi, margin, window,
L1, L3, L2, L4, L6, A0, V0, D0,
get_dark): # pragma: no cover
"""
Supplementary GPU-CPU function for finding local shifts using the UMPA
approach.
"""
(x_index, y_index) = cuda.grid(2)
if (y_index < height) and (x_index < width):
radi_ref = margin + radi
radi_ref1 = radi_ref + 1
margin1 = margin + 1
radi1 = radi + 1
size = 2 * margin + 1
j = x_index + radi_ref
i = y_index + radi_ref
ref_mat1 = ref_mat[:, i - radi_ref:i + radi_ref1,
j - radi_ref: j + radi_ref1]
mat1 = mat[:, i - radi:i + radi1, j - radi: j + radi1]
A = A0[y_index, x_index, :, :]
V = V0[y_index, x_index, :, :]
D = D0[y_index, x_index, :, :]
t5 = coef_4d[y_index, x_index, :, :]
for k in range(depth):
__accu_correlate(ref_mat1[k], mat1[k], window, t5)
t1 = L1[i, j]
t3 = L3[i - margin:i + margin1, j - margin:j + margin1]
t2, t4 = L2, L4[i, j]
t6 = L6[i - margin:i + margin1, j - margin:j + margin1]
for u in range(size):
for v in range(size):
if get_dark == 1:
num = t2 * t3[u, v] - t6[u, v] ** 2
K = (t2 * t5[u, v] - t4 * t6[u, v]) / num
beta = (t3[u, v] * t4 - t5[u, v] * t6[u, v]) / num
A[u, v] = beta + K
if A[u, v] != 0.0:
V[u, v] = K / A[u, v]
num1 = t1 + ((beta ** 2) * t2 + (K ** 2) * t3[u, v]
- 2 * beta * t4 - 2 * K * t5[u, v]
+ 2 * beta * K * t6[u, v])
else:
K = t5[u, v] / t3[u, v]
num1 = t1 + (K ** 2) * t3[u, v] - 2 * K * t5[u, v]
D[u, v] = num1
val_max = __get_max_value(D)
D = __inverse_values(D, val_max)
x_max, y_max = __locate_max_value(D)
x_pos, y_pos = __locate_2d_peak_kernel(D, x_max, y_max)
j1, i1 = int(round(x_pos)), int(round(y_pos))
shifts[0, y_index, x_index] = x_pos - margin
shifts[1, y_index, x_index] = y_pos - margin
if get_dark == 1:
trans[y_index, x_index] = abs(A[i1, j1])
dark[y_index, x_index] = abs(V[i1, j1])
def __get_2d_shift_multi_rows_3d_input_umpa_gpu(ref_mat, mat, win_size, margin,
window, L1, L3, L2, L4, L6,
block=(16, 16),
dark_signal=True):
"""
Supplementary GPU-function for finding local shifts using the UMPA
approach.
"""
if ref_mat.shape != mat.shape:
raise ValueError("Data shape must be the same !!!")
if len(ref_mat.shape) != 3:
raise ValueError("Inputs must be 3d-arrays !!!")
(depth, height, width) = ref_mat.shape
win_size = 2 * (win_size // 2) + 1
radi = win_size // 2
edge, size = radi + margin, 2 * margin + 1
height1, width1 = height - 2 * edge, width - 2 * edge
if width1 < 1 or height1 < 1:
if width1 < 1 or height1 < 1:
raise ValueError("Shapes of the inputs {0} are smaller than the "
"requested size (win_size + 2*margin) = "
"{1}".format(ref_mat[0].shape, edge))
shifts = np.zeros((2, height1, width1), dtype=np.float32)
trans = np.ones((height1, width1), dtype=np.float32)
dark = np.ones_like(trans)
ref_mat1 = np.float32(np.ascontiguousarray(ref_mat))
mat1 = np.float32(np.ascontiguousarray(mat))
coef_4d = np.zeros((height1, width1, size, size), dtype=np.float32)
A0 = np.zeros((height1, width1, size, size), dtype=np.float32)
V0 = np.zeros((height1, width1, size, size), dtype=np.float32)
D0 = np.zeros((height1, width1, size, size), dtype=np.float32)
L1, L3 = np.float32(L1), np.float32(L3)
L2, L4, L6 = np.float32(L2), np.float32(L4), np.float32(L6)
window = np.float32(window)
grid = (int(np.ceil(1.0 * width1 / block[0])),
int(np.ceil(1.0 * height1 / block[1])))
f_alias = __calc_shift_umpa_gpu_kernel
if dark_signal:
get_dark = 1
else:
get_dark = 0
f_alias[grid, block](shifts, trans, dark, ref_mat1, mat1, coef_4d,
np.int32(depth), np.int32(height1),
np.int32(width1), np.int32(radi), np.int32(margin),
window, L1, L3, L2, L4, L6, A0, V0, D0,
np.int32(get_dark))
return shifts[0], shifts[1], trans, dark
[docs]def find_local_shifts_umpa(ref_mat, mat, win_size=7, margin=10,
method="diff", size=3, gpu=True, block=(16, 16),
ncore=None, chunk_size=100, filter_name="hamming",
dark_signal=False):
"""
To find local shifts (in x and y direction) of each pixel between
two 3d-images by selecting a small volume of the second image and sliding
over a slightly larger volume of the reference image. The cost function
uses the formula in Ref. [1], known as UMPA.
Parameters
----------
ref_mat : array_like
3D array, can be a numpy array or hdf object. Reference image.
mat : array_like
3D array, can be a numpy array or hdf object. The second image, must
be the same size as the reference image.
win_size : int
Size of local areas in the second image.
margin : int
To define the sliding range of the second image.
method : {"diff", "poly_fit"}
Method for finding sub-pixel shift. Two options: a differential
method (Ref. [2]) or a polynomial method (Ref. [3]). The "poly_fit"
option is not available if using GPU.
size : int
Window size around the integer location of the maximum value used for
sub-pixel location. Adjustable if using the polynomial method.
gpu : bool
Use GPU for computing if True.
block : tuple of two integer-values, optional
Size of a GPU block. E.g. (8, 8), (16, 16), (32, 32), ...
ncore : int or None
Number of cpu-cores used for computing. Automatically selected if None.
chunk_size : int or None
Size of each chunk extracted along the height of the image. Use to
avoid the out of memory problem.
filter_name : {None, "hann", "bartlett", "blackman", "hamming",\
"nuttall", "parzen", "triang"}
To select a smoothing filter.
dark_signal : bool
Return both dark-signal image and transmission-signal image if True
Returns
-------
list of two 2d-arrays or four 2d-arrays
x-shift image and y-shift image. Zeros at the outer area of the size of
(margin + win_size // 2). And/or dark-signal image and
transmission-signal image If the 'dark_signal' option is True.
References
----------
[1] : https://doi.org/10.1103/PhysRevLett.118.203903
[2] : https://doi.org/10.48550/arXiv.0712.4289
[3] : https://doi.org/10.1088/0957-0233/17/6/045
"""
if gpu is True:
if cuda.is_available() is False:
warnings.warn("!!!No Nvidia GPU found!!!Run with CPU instead!!!")
gpu = False
if ref_mat.shape != mat.shape:
raise ValueError("Data shape must be the same !!!")
if len(ref_mat.shape) != 3:
raise ValueError("Inputs must be 3d-arrays !!!")
(depth, height, width) = ref_mat.shape
if chunk_size is None:
chunk_size = height + 1
else:
chunk_size = np.clip(chunk_size, 1, height)
num_chunk = np.clip(height // chunk_size + 1, 1, height)
win_size = 2 * (win_size // 2) + 1
edge = margin + win_size // 2
als_size = win_size + 2 * margin
if width < als_size or height < als_size:
raise ValueError("Shapes of the inputs {0} are smaller than the "
"requested size (win_size + 2*margin) = "
"{1} x {1}".format((height, width), als_size))
win_1d = make_smoothing_window(filter_name, win_size)
window = np.multiply.outer(win_1d, win_1d)
window = window / np.sum(window)
S2 = np.sum(mat ** 2, axis=0)
R2 = np.sum(ref_mat ** 2, axis=0)
L1 = correlate(S2, window, mode="same")
L3 = correlate(R2, window, mode="same")
S1 = np.sum(mat, axis=0)
R1 = np.sum(ref_mat, axis=0)
Im = np.mean(R1) / depth
L2 = (Im ** 2) * depth
L4 = Im * correlate(S1, window, mode="same")
L6 = Im * correlate(R1, window, mode="same")
x_shifts = np.zeros((height, width), dtype=np.float32)
y_shifts = np.zeros_like(x_shifts)
trans = np.ones_like(x_shifts)
dark = np.ones_like(x_shifts)
list_index = np.array_split(np.arange(edge, height - edge), num_chunk)
f_alias1 = __get_2d_shift_multi_rows_3d_input_umpa_gpu
f_alias2 = __get_2d_shift_multi_rows_3d_input_umpa_cpu
for pos in list_index:
b, e = pos[0], pos[-1] + 1
b1, e1 = b - edge, e + edge
ref_mat1 = ref_mat[:, b1:e1, :]
mat1 = mat[:, b1:e1, :]
if gpu:
results = f_alias1(ref_mat1, mat1, win_size, margin, window,
L1[b1:e1], L3[b1:e1], L2, L4[b1:e1], L6[b1:e1],
block=block, dark_signal=dark_signal)
else:
if dark_signal:
results = f_alias2(ref_mat1, mat1, win_size, margin, window,
L1[b1:e1], L3[b1:e1], L2, L4[b1:e1],
L6[b1:e1], method=method, size=size,
ncore=ncore, dark_signal=dark_signal)
else:
results = f_alias2(ref_mat1, mat1, win_size, margin, window,
L1[b1:e1], L3[b1:e1], 0.0, 0.0, 0.0,
method=method, size=size, ncore=ncore,
dark_signal=dark_signal)
x_shifts[b:e, edge:-edge] = results[0]
y_shifts[b:e, edge:-edge] = results[1]
if dark_signal:
trans[b:e, edge:-edge] = results[2]
dark[b:e, edge:-edge] = results[3]
if dark_signal:
trans = np.pad(trans[edge:-edge, edge:-edge], edge, mode="edge")
dark = np.pad(dark[edge:-edge, edge:-edge], edge, mode="edge")
return x_shifts, y_shifts, trans, dark
else:
return x_shifts, y_shifts