"""
This file contains useful classes and functions for profiling and integration
"""
import warnings
import numpy as np
import pandas as pd
from scipy.spatial import KDTree
class Profile:
    """
    Elliptical profile model for a single diffraction spot.

    Each spot is modeled as an elliptical foreground region sitting on a
    planar background. Foreground/background membership of a pixel is
    decided by its Mahalanobis distance from the profile centroid under
    the current covariance ("scale") estimate.
    """

    def __init__(
        self,
        x,
        y,
        counts,
        cen_x=None,
        cen_y=None,
        fg_cutoff=1.0,
        bg_cutoff=3.0,
        minfrac=0.10,
        eps=1e-5,
        frac_step_size=0.50,
    ):
        """
        Initialize a Profile instance.

        Parameters:
            x (np.array): x-coordinates.
            y (np.array): y-coordinates.
            counts (np.array): Counts associated with each coordinate.
                Assumed positive (inverse-variance weights are 1/counts).
            cen_x (Optional[float]): Center x-coordinate; defaults to the
                count-weighted mean of ``x``.
            cen_y (Optional[float]): Center y-coordinate; defaults to the
                count-weighted mean of ``y``.
            fg_cutoff (float): Mahalanobis distance at or below which a
                pixel is foreground.
            bg_cutoff (float): Mahalanobis distance at or above which a
                pixel is background.
            minfrac (float): Minimum fraction of pixels forced into each of
                the foreground/background groups.
            eps (float): Small positive epsilon used to clamp negative
                background-subtracted intensities.
            frac_step_size (float): Fractional step size (damping factor)
                for fitting iterations.
        """
        self.frac_step_size = frac_step_size
        self.eps = eps
        self.minfrac = minfrac
        self.fg_cutoff = fg_cutoff
        self.bg_cutoff = bg_cutoff
        self.counts = counts
        self.fg_mask = np.ones_like(counts, dtype=bool)
        self.bg_mask = np.ones_like(counts, dtype=bool)
        self.x = x
        self.y = y
        self.cen_x = cen_x
        if self.cen_x is None:
            self.cen_x = np.average(self.x, weights=self.counts)
        self.cen_y = cen_y
        if self.cen_y is None:
            self.cen_y = np.average(self.y, weights=self.counts)
        # BUG FIX: self.success was only ever assigned inside fit() or on a
        # failed update_mask(), so callers reading it after __init__ could hit
        # AttributeError. Initialize it optimistically here.
        self.success = True
        with warnings.catch_warnings():
            # np.cov warns on degenerate inputs; the resulting NaNs are
            # handled downstream by the LinAlgError path in update_mask().
            warnings.simplefilter("ignore")
            self.scale = np.cov(self.difference_vectors.T, aweights=counts)
        self.slope = np.zeros(2)
        self.intercept = 0.0
        self.update_mask()

    def set_profile_params(self, scale, slope, intercept, cen_x, cen_y):
        """
        Set profile parameters and re-run masking, background fit, and
        integration with them.

        Parameters:
            scale (np.array): 2x2 covariance ("scale") matrix.
            slope (np.array): Slope of the background plane.
            intercept (float): Intercept of the background plane.
            cen_x (float): Center x-coordinate.
            cen_y (float): Center y-coordinate.

        Returns:
            self (Profile): Updated Profile instance.
        """
        self.scale = scale
        self.slope = slope
        self.intercept = intercept
        self.cen_x = cen_x
        self.cen_y = cen_y
        self.update_mask()
        self.update_background_plane()
        self.integrate()
        return self

    def fit(self, nsteps=5):
        """
        Fit the profile by alternating damped background-plane and
        profile-parameter updates.

        Parameters:
            nsteps (int): Number of fitting steps.
        """
        self.success = True
        for i in range(nsteps):
            try:
                self.update_background_plane(alpha=self.frac_step_size)
                self.update_profile(alpha=self.frac_step_size)
                self.update_mask()
            except np.linalg.LinAlgError:
                # Degenerate spot (e.g. singular covariance): keep whatever
                # parameters we had and flag the failure.
                self.success = False
                break

    @classmethod
    def from_dataframe(cls, df, x_key="dx", y_key="dy", count_key="counts", **kwargs):
        """
        Create a Profile instance from a DataFrame.

        Parameters:
            df (pd.DataFrame): Input DataFrame.
            x_key (str): x-coordinate column name.
            y_key (str): y-coordinate column name.
            count_key (str): Counts column name.
            kwargs: Additional keyword arguments forwarded to ``cls``.

        Returns:
            Profile: Created Profile instance.
        """
        x = df[x_key].astype("float32")
        y = df[y_key].astype("float32")
        counts = df[count_key].astype("float32")
        return cls(x, y, counts, **kwargs)

    @property
    def difference_vectors(self):
        """
        Get the (n, 2) pixel offsets from the current centroid.

        Returns:
            np.array: Difference vectors.
        """
        return np.column_stack((self.x - self.cen_x, self.y - self.cen_y))

    @property
    def background(self):
        """
        Get the background-plane value at every pixel.

        Returns:
            np.array: Background values.
        """
        return self.difference_vectors @ self.slope + self.intercept

    def update_mask(self, fg_cutoff=None, bg_cutoff=None):
        """
        Update the foreground/background masks from Mahalanobis distances.

        Parameters:
            fg_cutoff (Optional[float]): Foreground cutoff; defaults to the
                value given at construction.
            bg_cutoff (Optional[float]): Background cutoff; defaults to the
                value given at construction.
        """
        if fg_cutoff is None:
            fg_cutoff = self.fg_cutoff
        if bg_cutoff is None:
            bg_cutoff = self.bg_cutoff
        dx = self.difference_vectors
        try:
            # Mahalanobis distance of each pixel from the centroid.
            Z = np.sqrt(
                np.squeeze(
                    dx[..., None, :] @ np.linalg.pinv(self.scale) @ dx[..., :, None]
                )
            )
        except np.linalg.LinAlgError:
            # BUG FIX: was a bare `except:` that swallowed everything
            # (including KeyboardInterrupt). pinv raises LinAlgError when
            # the SVD fails to converge (e.g. NaNs in self.scale).
            print(
                "SVD failed to converge. Mask could not be updated. Skipping reflection."
            )
            self.success = False
            return
        self.fg_mask = Z <= fg_cutoff
        self.bg_mask = Z >= bg_cutoff
        # There must be _some_ pixels in each group otherwise numerical issues happen
        minpix = np.round(len(self.counts) * self.minfrac).astype("int")
        # BUG FIX: for small spots minpix can round to 0, and the slice
        # zorder[-0:] then selects the WHOLE array — clearing fg_mask
        # completely and marking every pixel as background. Force at least
        # one pixel into each group instead.
        minpix = max(int(minpix), 1)
        zorder = np.argsort(Z)
        try:
            self.fg_mask[zorder[:minpix]] = True
            self.bg_mask[zorder[:minpix]] = False
            self.bg_mask[zorder[-minpix:]] = True
            self.fg_mask[zorder[-minpix:]] = False
        except (IndexError, TypeError):
            # BUG FIX: was a bare `except:`; fancy-index assignment on a
            # degenerate (scalar / empty) mask raises IndexError/TypeError.
            print("Spot has 1 or fewer pixels.")
            self.success = False
            return

    def update_background_plane(self, alpha=0.9):
        """
        Update the background plane by damped, weighted least squares over
        the background pixels.

        Parameters:
            alpha (float): Damping factor; the new estimate is
                ``(1 - alpha) * old + alpha * new``.
        """
        y = self.counts
        # Design matrix [dx, dy, 1] for a plane fit.
        X = np.pad(self.difference_vectors, [[0, 0], [0, 1]], constant_values=1.0)
        weights = np.reciprocal(self.counts)  # Inverse variance poisson because #stats
        weights = weights * (self.bg_mask).astype("float")
        weights = weights / weights.mean()
        beta = (
            np.linalg.pinv(X.T @ np.diag(weights) @ X) @ X.T @ (y * weights)
        )  # if desperate, break glass
        slope = beta[:2]
        intercept = beta[2]
        self.slope = (1.0 - alpha) * self.slope + alpha * slope
        self.intercept = (1.0 - alpha) * self.intercept + alpha * intercept

    def update_profile(self, alpha=0.9):
        """
        Update the scale matrix and centroid from background-subtracted
        foreground intensities.

        Parameters:
            alpha (float): Damping factor for the scale and centroid update.
        """
        weights = self.counts - self.background
        weights = np.maximum(
            weights, self.eps
        )  # These can be negative so we need to make them > zero(ish)
        weights = weights * self.fg_mask
        weights = weights / weights.mean()
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            scale = np.cov(self.difference_vectors.T, aweights=weights)
            cen_x = np.average(self.x, weights=weights)
            cen_y = np.average(self.y, weights=weights)
        self.scale = (1.0 - alpha) * self.scale + alpha * scale
        self.cen_x = (1.0 - alpha) * self.cen_x + alpha * cen_x
        self.cen_y = (1.0 - alpha) * self.cen_y + alpha * cen_y

    def integrate(self):
        """Integrate the profile, setting ``self.I`` and ``self.SigI``."""
        bg = self.background
        self.I = ((self.counts - bg) * self.fg_mask).sum()
        # Poisson-style error: variance of counts plus background estimate.
        self.SigI = np.sqrt(((self.counts + bg) * self.fg_mask).sum())
class SegmentedImage:
    """
    Segment an image into per-centroid pixel groups and fit a Profile to each.

    Pixels are assigned to their nearest centroid (KDTree query); pixels
    farther than ``radius`` from every centroid, or with non-positive
    values, are discarded.
    """

    def __init__(self, pixels, centroids, radius=20):
        """
        Initialize a SegmentedImage instance.

        Parameters:
            pixels (np.array): 2-D image of pixel values.
            centroids (np.array): (n, 2) array of spot centroids in (x, y).
            radius (int): Maximum pixel-to-centroid distance to keep.
        """
        super().__init__()
        self.radius = radius
        pixels = np.array(pixels).astype("float32")
        # (row, col) indices reversed on the last axis to (x, y) ordering so
        # they are comparable with the centroids.
        xy = np.dstack(np.indices(pixels.shape)).astype("float32")[:, :, ::-1]
        k = KDTree(centroids)
        distance, labels = k.query(xy)
        distance = distance.astype("float32")
        distance_vectors = centroids[labels] - xy
        # Keep only positive pixels within `radius` of their nearest centroid.
        mask = (distance <= radius) & (pixels > 0)
        self.data = pd.DataFrame(
            {
                "x": xy[mask][:, 0].astype("int"),
                "y": xy[mask][:, 1].astype("int"),
                "counts": pixels[mask],
                "label": labels[mask],
                "distance": distance[mask],
                "dx": distance_vectors[mask][:, 0],
                "dy": distance_vectors[mask][:, 1],
                "cen_x": centroids[labels[mask], 0],
                "cen_y": centroids[labels[mask], 1],
            }
        )
        self.profiles = self.data.groupby("label").apply(Profile.from_dataframe)
        # Plain loop (not a comprehension) — these calls are pure side effects.
        for p in self.profiles:
            p.fit()
            p.integrate()
        self.scale = np.stack([p.scale for p in self.profiles])
        self.loc = np.stack([(p.cen_x, p.cen_y) for p in self.profiles])
        g = self.data.groupby("label")
        # Boolean vector marking which input centroids received any pixels.
        self.used_reflections = np.isin(
            np.arange(len(centroids)), np.unique(self.data.label)
        )
        # Half-open bounding boxes [xmin, xmax+1, ymin, ymax+1] per spot.
        # Aggregate only the needed columns instead of g.min()/g.max() over
        # the whole frame.
        self.bboxes = np.column_stack(
            [
                g["x"].min(),
                g["x"].max() + 1,
                g["y"].min(),
                g["y"].max() + 1,
            ]
        )
        self.pixels = pixels
        self.centroids = centroids[np.unique(self.data.label)]
        self.labels = labels
        self.distance = distance
        self.distance_vectors = distance_vectors

    def integrate(self, isigi_cutoff=2.0, knn=5):
        """
        Re-integrate every spot using profile parameters averaged from
        nearby strong spots (k-nearest-neighbors on centroid position).

        Parameters:
            isigi_cutoff (float): I/SigI threshold above which a spot is
                considered strong.
            knn (int): Number of nearest strong spots to average over.

        Returns:
            strong_idx (np.array): Positional indices of strong spots.
        """
        # BUG FIX: force integer dtype — an empty np.array([]) is float64
        # and float fancy-indexing of self.centroids raises IndexError.
        strong_idx = np.array(
            [i for i, p in enumerate(self.profiles) if p.I / p.SigI > isigi_cutoff],
            dtype="int",
        )
        # NOTE(review): if no spot passes the cutoff, every profile is reset
        # to all-zero parameters below — confirm that is the desired fallback.
        strong_centroids = self.centroids[strong_idx]
        for i, p in enumerate(self.profiles):
            scale = np.zeros_like(p.scale)
            slope = np.zeros_like(p.slope)
            intercept = np.zeros_like(p.intercept)
            cen_x = np.zeros_like(p.cen_x)
            cen_y = np.zeros_like(p.cen_y)
            # Average the parameters of the knn nearest strong spots.
            neighbors = strong_idx[
                np.argsort(
                    np.linalg.norm(strong_centroids - self.centroids[i], axis=-1)
                )
            ][:knn]
            for idx in neighbors:
                nearest_neighbor = self.profiles.iloc[idx]
                scale += nearest_neighbor.scale / knn
                slope += nearest_neighbor.slope / knn
                intercept += nearest_neighbor.intercept / knn
                cen_x += nearest_neighbor.cen_x / knn
                cen_y += nearest_neighbor.cen_y / knn
            p.set_profile_params(
                scale,
                slope,
                intercept,
                cen_x,
                cen_y,
            )
        self.scale = np.stack([p.scale for p in self.profiles])
        self.loc = np.stack([(p.cen_x, p.cen_y) for p in self.profiles])
        return strong_idx