def find_growth_features(
y: NDArray,
time_step: float,
first_growth: float,
first_frame: int = 1,
) -> dict:
"""
Extract some growth descriptors from a time zeries.
Parameters
----------
y : NDArray
Raw signal (one measurement per frame). A NumPy 1‑D array.
time_step : float
Temporal spacing between successive frames (the R script calls it
``time_step`` and later multiplies it by frame indices).
first_growth : float
Threshold used to locate the *pseudo‑peak* (the index where the
curve first rises above ``y[2] + first_growth`` for increasing
curves, or falls below it for decreasing curves).
If first_growth is negative the curve is treated as decreasing;
otherwise it is treated as increasing.
first_frame : int, default 1
1‑based index of the frame where analysis should start.
Frames before this are discarded.
Returns
-------
GrowthFeatures
A namedtuple with 14 fields; the order matches exactly the column
order of the data.frame produced by the R implementation.
If the data are insufficient for regression the numeric fields are
``np.nan`` and the two rupture‑related fields contain the string
``"censored"`` (again matching the R behaviour).
Notes
-----
* The wrapper calls a handful of internal, JIT‑compiled helpers:
`_slope_shifts`, `_fill_r_squared_matrices`, and `_linregress`.
Those helpers must be present in the same module – they are defined
in the first part of *curve_features.py*.
* the return type is a lightweight ``namedtuple``.
"""
if first_frame < 1:
raise ValueError("first_frame is 1‑based and must be >= 1")
y = y[first_frame - 1 :] # 0‑based slice
# Decide whether the curve is increasing or decreasing
if first_growth < 0:
shape = "decreasing"
shape_flag = -1
else:
shape = "increasing"
shape_flag = 1
# Remove a leading zero
if len(y) > 1 and y[0] == 0.0:
y = y[1:]
# Replace interior zeros by the smallest non‑zero value
zeros = y == 0.0
if np.any(zeros):
positive = y[~zeros]
if positive.size == 0: # completely zero vector
raise ValueError("All values are zero; cannot replace zeros")
smallest = np.min(positive)
y[zeros] = smallest
n_frame = len(y)
if n_frame == 0:
# Degenerate input – nothing to analyze
return default_features
# Basic statistics needed later
max_y = np.max(y)
min_y = np.min(y)
maximal_variation = np.abs(max_y - min_y) # abs(max-min)
mean_diff = np.mean(np.diff(y))
# Detect slope‑shift frames (possible rupture points)
# _slope_shifts returns **1‑based* frame numbers multiplied by the cluster length.
cluster_len = 5
shift_frames_onebased = _slope_shifts(y, cluster_len, shape_flag, mean_diff)
shift_frames = shift_frames_onebased - 1 # now 0‑based
# Determine where (if ever) a “rupture” occurs
# A rupture is the *first* slope‑shift that happens after the signal
# has moved through 90 % of its total amplitude.
if shape == "increasing":
high_thr = y[1] + 0.9 * maximal_variation
high_idx = np.where(y > high_thr)[0]
else:
high_thr = y[1] - 0.9 * maximal_variation
high_idx = np.where(y < high_thr)[0]
# Intersection of the two sets gives candidate rupture frames
cand = np.intersect1d(shift_frames, high_idx, assume_unique=True)
if cand.size:
rupture_time_idx = int(cand[0]) # 0‑based
rupture_time_min = time_step * rupture_time_idx
rupture_value = float(y[rupture_time_idx])
else:
rupture_time_min = "censored"
rupture_value = "censored"
rupture_time_idx = n_frame - 1 # we will search up to the end
max_end = rupture_time_idx # upper bound for the regression loops
# Determine the “growth” windows that bound the exhaustive search
if shape == "increasing":
max_start = int(np.where(y > (y[1] + 0.1 * maximal_variation))[0][0])
max_exp_start = int(np.where(y > (y[1] + 0.01 * maximal_variation))[0][0])
else:
max_start = int(np.where(y < (y[1] - 0.1 * maximal_variation))[0][0])
max_exp_start = int(np.where(y < (y[1] - 0.01 * maximal_variation))[0][0])
# Guard against pathological series where the thresholds are never crossed
if max_start < 0 or max_exp_start < 0:
return default_features
# Fill the two R‑squared matrices (the O(N²) part)
exp_r2_mat, lin_r2_mat = _fill_r_squared_matrices(
y,
time_step,
max_start,
max_exp_start,
max_end,
shape_flag,
max_y,
maximal_variation,
)
# Locate the windows with maximal R² for each model
exp_flat_idx = int(np.argmax(exp_r2_mat))
exp_i, exp_j = np.unravel_index(exp_flat_idx, exp_r2_mat.shape)
best_exp_r2 = float(exp_r2_mat[exp_i, exp_j])
lin_flat_idx = int(np.argmax(lin_r2_mat))
lin_i, lin_j = np.unravel_index(lin_flat_idx, lin_r2_mat.shape)
best_lin_r2 = float(lin_r2_mat[lin_i, lin_j])
# Perform the *final* regressions on the selected windows
# Exponential regression (log‑linear)
if shape == "decreasing":
# Transform the data exactly as the R code does:
# y_neg_exp = -y + 2*max_y → then take log
y_exp_window = -y[exp_i : exp_j + 1] + 2.0 * max_y
else:
y_exp_window = y[exp_i : exp_j + 1]
# Time vector for the selected exponential window
t_exp = np.arange(exp_i, exp_j + 1) * time_step
# Linear regression on log(y) – SciPy gives slope, intercept,
# rvalue, pvalue, stderr; we need only the first two.
log_y = np.log(y_exp_window)
exp_reg = stats.linregress(t_exp, log_y)
exp_intercept = exp_reg.intercept # this is the log‑intercept
exp_slope = exp_reg.slope # growth rate (per minute)
# Linear regression on raw data
t_lin = np.arange(lin_i, lin_j + 1) * time_step
lin_reg = stats.linregress(t_lin, y[lin_i : lin_j + 1])
lin_intercept = lin_reg.intercept
lin_slope = lin_reg.slope
# Assemble the final output (exactly the same order as R)
features = {"exp_intercept": exp_intercept, # intercept of best exponential fit (log‑scale)
"exp_growth_rate_mm2s": exp_slope, # slope of best exponential fit
"exp_start": time_step * exp_i, # start time (min) of that exponential window
"exp_end": time_step * exp_j, # end time (min) of that exponential window
"exp_r_squared": best_exp_r2, # Maximum R² among exponential windows
"lin_intercept": lin_intercept, # intercept of best linear fit
"lin_growth_rate_mm2s": lin_slope, # slope of best linear fit
"lin_start": time_step * lin_i, # start time (min) of that linear window
"lin_end": time_step * lin_j, # end time (min) of that linear window
"lin_r_squared": best_lin_r2, # Maximum R² among linear windows
"growth_rupture_time_min": rupture_time_min,
"growth_rupture_surface_mm2": rupture_value}
return features