feat(ga): Overhaul GA for multi-root robustness and CPU performance
All checks were successful
Run Python Tests / test (3.12) (pull_request) Successful in 34s
Run Python Tests / test (3.8) (pull_request) Successful in 35s
Run Python Tests / test (3.10) (pull_request) Successful in 3m8s
Publish Python Package to PyPI / deploy (push) Successful in 12s

### 🚀 Performance (CPU)
* Replaces `np.polyval` with a parallel Numba JIT function (`_calculate_ranks_numba`).
* Replaces $O(N \log N)$ `np.argsort` with $O(N)$ `np.argpartition` in the GA loop.
* Adds `numba` as a core dependency.

### 🧠 Robustness (Algorithm)
* Implements Blend Crossover (BLX-$\alpha$) for better, extrapolative exploration.
* Uses a hybrid selection model (top X% for crossover, 100% for mutation) to preserve root niches.
* Adds `selection_percentile` and `blend_alpha` to `GA_Options` for tuning.
This commit was merged in pull request #21.
This commit is contained in:
2025-10-30 11:31:00 -04:00
parent 1318006959
commit 9d967210fa
4 changed files with 132 additions and 51 deletions

View File

@@ -1,5 +1,6 @@
import math
import numpy as np
import numba
from dataclasses import dataclass
from typing import List, Optional, Union
import warnings
@@ -37,6 +38,31 @@ extern "C" __global__ void fitness_kernel(
}
"""
@numba.jit(nopython=True, fastmath=True, parallel=True)
def _calculate_ranks_numba(solutions, coefficients, y_val, ranks):
"""
A Numba-jitted, parallel function to calculate fitness.
This replaces np.polyval and the rank calculation.
"""
num_coefficients = coefficients.shape[0]
data_size = solutions.shape[0]
# This prange will be run in parallel on all your CPU cores
for idx in numba.prange(data_size):
x_val = solutions[idx]
# Horner's method (same as np.polyval)
ans = coefficients[0]
for i in range(1, num_coefficients):
ans = ans * x_val + coefficients[i]
ans -= y_val
if ans == 0.0:
ranks[idx] = 1.7976931348623157e+308 # np.finfo(float).max
else:
ranks[idx] = abs(1.0 / ans)
@dataclass
class GA_Options:
"""
@@ -62,6 +88,16 @@ class GA_Options:
mutation_ratio (float): The percentage (e.g., 0.40 for 40%) of the next
generation to be created by mutating solutions
from the parent pool. Default: 0.40
selection_percentile (float): The top percentage (e.g., 0.66 for 66%)
of solutions to use as the parent pool
for crossover. A smaller value speeds
up single-root convergence; a larger
value helps find multiple roots.
Default: 0.66
blend_alpha (float): The expansion factor for Blend Crossover (BLX-alpha).
0.0 = average crossover (no expansion).
0.5 = 50% expansion beyond the parent range.
Default: 0.5
root_precision (int): The number of decimal places to round roots to
when clustering. A smaller number (e.g., 3)
groups roots more aggressively. A larger number
@@ -76,6 +112,8 @@ class GA_Options:
elite_ratio: float = 0.05
crossover_ratio: float = 0.45
mutation_ratio: float = 0.40
selection_percentile: float = 0.66
blend_alpha: float = 0.5
root_precision: int = 5
def __post_init__(self):
@@ -87,6 +125,14 @@ class GA_Options:
)
if any(r < 0 for r in [self.elite_ratio, self.crossover_ratio, self.mutation_ratio]):
raise ValueError("GA ratios cannot be negative.")
if not (0 < self.selection_percentile <= 1.0):
raise ValueError(
f"selection_percentile must be between 0 (exclusive) and 1.0 (inclusive), but got {self.selection_percentile}"
)
if self.blend_alpha < 0:
raise ValueError(
f"blend_alpha cannot be negative, but got {self.blend_alpha}"
)
def _get_cauchy_bound(coeffs: np.ndarray) -> float:
"""
@@ -320,40 +366,57 @@ class Function:
# Create initial random solutions
solutions = np.random.uniform(min_r, max_r, data_size)
# Pre-allocate ranks array
ranks = np.empty(data_size, dtype=np.float64)
for _ in range(options.num_of_generations):
# Calculate fitness for all solutions (vectorized)
y_calculated = np.polyval(self.coefficients, solutions)
error = y_calculated - y_val
_calculate_ranks_numba(solutions, self.coefficients, y_val, ranks)
parent_pool_size = int(data_size * options.selection_percentile)
# 1. Get indices for the elite solutions (O(N) operation)
# We find the 'elite_size'-th largest element.
elite_indices = np.argpartition(-ranks, elite_size)[:elite_size]
with np.errstate(divide='ignore'):
ranks = np.where(error == 0, np.finfo(float).max, np.abs(1.0 / error))
# Sort solutions by fitness (descending)
sorted_indices = np.argsort(-ranks)
solutions = solutions[sorted_indices]
# 2. Get indices for the parent pool (O(N) operation)
# We find the 'parent_pool_size'-th largest element.
parent_pool_indices = np.argpartition(-ranks, parent_pool_size)[:parent_pool_size]
# --- Create the next generation ---
# 1. Elitism: Keep the best solutions as-is
elite_solutions = solutions[:elite_size]
# Define a "parent pool" of the top 50% of solutions to breed from
parent_pool = solutions[:data_size // 2]
elite_solutions = solutions[elite_indices]
# 2. Crossover: Breed two parents to create a child
# Select from the full list (indices 0 to data_size-1)
parent1_indices = np.random.randint(0, data_size, crossover_size)
parent2_indices = np.random.randint(0, data_size, crossover_size)
parents1 = solutions[parent1_indices]
parents2 = solutions[parent2_indices]
# Simple "average" crossover
crossover_solutions = (parents1 + parents2) / 2.0
# Select from the fitter PARENT POOL
parents1_idx = np.random.choice(parent_pool_indices, crossover_size)
parents2_idx = np.random.choice(parent_pool_indices, crossover_size)
parents1 = solutions[parents1_idx]
parents2 = solutions[parents2_idx]
# Blend Crossover (BLX-alpha)
alpha = options.blend_alpha
# Find min/max for all parent pairs
p_min = np.minimum(parents1, parents2)
p_max = np.maximum(parents1, parents2)
# Calculate range (I)
parent_range = p_max - p_min
# Calculate new min/max for the expanded range
new_min = p_min - (alpha * parent_range)
new_max = p_max + (alpha * parent_range)
# Create a new random child within the expanded range
crossover_solutions = np.random.uniform(new_min, new_max, crossover_size)
# 3. Mutation:
# Select from the full list (indices 0 to data_size-1)
mutation_candidates = solutions[np.random.randint(0, data_size, mutation_size)]
# Use mutation_strength (the new name)
# Use mutation_strength
mutation_factors = np.random.uniform(
1 - options.mutation_strength,
1 + options.mutation_strength,
@@ -374,10 +437,7 @@ class Function:
# --- Final Step: Return the best results ---
# After all generations, do one last ranking to find the best solutions
y_calculated = np.polyval(self.coefficients, solutions)
error = y_calculated - y_val
with np.errstate(divide='ignore'):
ranks = np.where(error == 0, np.finfo(float).max, np.abs(1.0 / error))
_calculate_ranks_numba(solutions, self.coefficients, y_val, ranks)
# 1. Define quality based on the user's desired precision
# (e.g., precision=5 -> rank > 1e6, precision=8 -> rank > 1e9)
@@ -458,17 +518,31 @@ class Function:
# 1. Elitism
d_elite_solutions = d_solutions[:elite_size]
# Define a "parent pool" of the top 50% of solutions to breed from
d_parent_pool = d_solutions[:data_size // 2]
parent_pool_size = d_parent_pool.size
# 2. Crossover
# Select from the full list (indices 0 to data_size-1)
parent1_indices = cupy.random.randint(0, data_size, crossover_size)
parent2_indices = cupy.random.randint(0, data_size, crossover_size)
parent_pool_size = int(data_size * options.selection_percentile)
# Select from the fitter PARENT POOL
parent1_indices = cupy.random.randint(0, parent_pool_size, crossover_size)
parent2_indices = cupy.random.randint(0, parent_pool_size, crossover_size)
# Get parents directly from the sorted solutions array using the pool-sized indices
d_parents1 = d_solutions[parent1_indices]
d_parents2 = d_solutions[parent2_indices]
d_crossover_solutions = (d_parents1 + d_parents2) / 2.0
# Blend Crossover (BLX-alpha)
alpha = options.blend_alpha
# Find min/max for all parent pairs
d_p_min = cupy.minimum(d_parents1, d_parents2)
d_p_max = cupy.maximum(d_parents1, d_parents2)
# Calculate range (I)
d_parent_range = d_p_max - d_p_min
# Calculate new min/max for the expanded range
d_new_min = d_p_min - (alpha * d_parent_range)
d_new_max = d_p_max + (alpha * d_parent_range)
# Create a new random child within the expanded range
d_crossover_solutions = cupy.random.uniform(d_new_min, d_new_max, crossover_size)
# 3. Mutation
# Select from the full list (indices 0 to data_size-1)