|
|
|
|
@@ -25,11 +25,10 @@ extern "C" __global__ void fitness_kernel(
|
|
|
|
|
int idx = threadIdx.x + blockIdx.x * blockDim.x;
|
|
|
|
|
if (idx < size)
|
|
|
|
|
{
|
|
|
|
|
double ans = 0;
|
|
|
|
|
int lrgst_expo = num_coefficients - 1;
|
|
|
|
|
for (int i = 0; i < num_coefficients; ++i)
|
|
|
|
|
double ans = coefficients[0];
|
|
|
|
|
for (int i = 1; i < num_coefficients; ++i)
|
|
|
|
|
{
|
|
|
|
|
ans += coefficients[i] * pow(x_vals[idx], (double)(lrgst_expo - i));
|
|
|
|
|
ans = ans * x_vals[idx] + coefficients[i];
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ans -= y_val;
|
|
|
|
|
@@ -37,31 +36,6 @@ extern "C" __global__ void fitness_kernel(
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
"""
|
|
|
|
|
_FITNESS_KERNEL_INT = """
|
|
|
|
|
extern "C" __global__ void fitness_kernel(
|
|
|
|
|
const long long* coefficients,
|
|
|
|
|
int num_coefficients,
|
|
|
|
|
const double* x_vals,
|
|
|
|
|
double* ranks,
|
|
|
|
|
int size,
|
|
|
|
|
double y_val)
|
|
|
|
|
{
|
|
|
|
|
int idx = threadIdx.x + blockIdx.x * blockDim.x;
|
|
|
|
|
if (idx < size)
|
|
|
|
|
{
|
|
|
|
|
double ans = 0;
|
|
|
|
|
int lrgst_expo = num_coefficients - 1;
|
|
|
|
|
for (int i = 0; i < num_coefficients; ++i)
|
|
|
|
|
{
|
|
|
|
|
ans += coefficients[i] * pow(x_vals[idx], (double)(lrgst_expo - i));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ans -= y_val;
|
|
|
|
|
ranks[idx] = (ans == 0) ? 1.7976931348623157e+308 : fabs(1.0 / ans);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
|
|
class GA_Options:
|
|
|
|
|
@@ -70,18 +44,51 @@ class GA_Options:
|
|
|
|
|
|
|
|
|
|
Attributes:
|
|
|
|
|
min_range (float): The minimum value for the initial random solutions.
|
|
|
|
|
Default: -100.0
|
|
|
|
|
max_range (float): The maximum value for the initial random solutions.
|
|
|
|
|
Default: 100.0
|
|
|
|
|
num_of_generations (int): The number of iterations the algorithm will run.
|
|
|
|
|
sample_size (int): The number of top solutions to keep and return.
|
|
|
|
|
data_size (int): The total number of solutions generated in each generation.
|
|
|
|
|
mutation_percentage (float): The amount by which top solutions are mutated each generation.
|
|
|
|
|
Default: 10
|
|
|
|
|
sample_size (int): The number of top solutions to *return* at the end.
|
|
|
|
|
Default: 1000
|
|
|
|
|
data_size (int): The total number of solutions (population size)
|
|
|
|
|
generated in each generation. Default: 100000
|
|
|
|
|
mutation_strength (float): The percentage (e.g., 0.01 for 1%) by which
|
|
|
|
|
a solution is mutated. Default: 0.01
|
|
|
|
|
elite_ratio (float): The percentage (e.g., 0.05 for 5%) of the *best*
|
|
|
|
|
solutions to carry over to the next generation
|
|
|
|
|
unchanged (elitism). Default: 0.05
|
|
|
|
|
crossover_ratio (float): The percentage (e.g., 0.45 for 45%) of the next
|
|
|
|
|
generation to be created by "breeding" two
|
|
|
|
|
solutions from the parent pool. Default: 0.45
|
|
|
|
|
mutation_ratio (float): The percentage (e.g., 0.40 for 40%) of the next
|
|
|
|
|
generation to be created by mutating solutions
|
|
|
|
|
from the parent pool. Default: 0.40
|
|
|
|
|
"""
|
|
|
|
|
min_range: float = -100.0
|
|
|
|
|
max_range: float = 100.0
|
|
|
|
|
num_of_generations: int = 10
|
|
|
|
|
sample_size: int = 1000
|
|
|
|
|
data_size: int = 100000
|
|
|
|
|
mutation_percentage: float = 0.01
|
|
|
|
|
mutation_strength: float = 0.01
|
|
|
|
|
elite_ratio: float = 0.05
|
|
|
|
|
crossover_ratio: float = 0.45
|
|
|
|
|
mutation_ratio: float = 0.40
|
|
|
|
|
|
|
|
|
|
def __post_init__(self):
|
|
|
|
|
"""Validates the GA options after initialization."""
|
|
|
|
|
total_ratio = self.elite_ratio + self.crossover_ratio + self.mutation_ratio
|
|
|
|
|
if total_ratio > 1.0:
|
|
|
|
|
raise ValueError(
|
|
|
|
|
f"The sum of elite_ratio, crossover_ratio, and mutation_ratio must be <= 1.0, but got {total_ratio}"
|
|
|
|
|
)
|
|
|
|
|
if any(r < 0 for r in [self.elite_ratio, self.crossover_ratio, self.mutation_ratio]):
|
|
|
|
|
raise ValueError("GA ratios cannot be negative.")
|
|
|
|
|
if self.data_size < self.sample_size:
|
|
|
|
|
warnings.warn(
|
|
|
|
|
f"data_size ({self.data_size}) is less than sample_size ({self.sample_size}). "
|
|
|
|
|
"The number of returned solutions will be limited to data_size."
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
class Function:
|
|
|
|
|
"""
|
|
|
|
|
@@ -176,7 +183,7 @@ class Function:
|
|
|
|
|
if self._largest_exponent == 0:
|
|
|
|
|
raise ValueError("Cannot differentiate a constant (Function of degree 0).")
|
|
|
|
|
|
|
|
|
|
return self.derivitive()
|
|
|
|
|
return self.derivative()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def derivative(self) -> 'Function':
|
|
|
|
|
@@ -269,8 +276,19 @@ class Function:
|
|
|
|
|
|
|
|
|
|
def _solve_x_numpy(self, y_val: float, options: GA_Options) -> np.ndarray:
|
|
|
|
|
"""Genetic algorithm implementation using NumPy (CPU)."""
|
|
|
|
|
elite_ratio = options.elite_ratio
|
|
|
|
|
crossover_ratio = options.crossover_ratio
|
|
|
|
|
mutation_ratio = options.mutation_ratio
|
|
|
|
|
|
|
|
|
|
data_size = options.data_size
|
|
|
|
|
|
|
|
|
|
elite_size = int(data_size * elite_ratio)
|
|
|
|
|
crossover_size = int(data_size * crossover_ratio)
|
|
|
|
|
mutation_size = int(data_size * mutation_ratio)
|
|
|
|
|
random_size = data_size - elite_size - crossover_size - mutation_size
|
|
|
|
|
|
|
|
|
|
# Create initial random solutions
|
|
|
|
|
solutions = np.random.uniform(options.min_range, options.max_range, options.data_size)
|
|
|
|
|
solutions = np.random.uniform(options.min_range, options.max_range, data_size)
|
|
|
|
|
|
|
|
|
|
for _ in range(options.num_of_generations):
|
|
|
|
|
# Calculate fitness for all solutions (vectorized)
|
|
|
|
|
@@ -283,40 +301,75 @@ class Function:
|
|
|
|
|
sorted_indices = np.argsort(-ranks)
|
|
|
|
|
solutions = solutions[sorted_indices]
|
|
|
|
|
|
|
|
|
|
# Keep only the top solutions
|
|
|
|
|
top_solutions = solutions[:options.sample_size]
|
|
|
|
|
|
|
|
|
|
# For the next generation, start with the mutated top solutions
|
|
|
|
|
# and fill the rest with new random values.
|
|
|
|
|
mutation_factors = np.random.uniform(
|
|
|
|
|
1 - options.mutation_percentage,
|
|
|
|
|
1 + options.mutation_percentage,
|
|
|
|
|
options.sample_size
|
|
|
|
|
)
|
|
|
|
|
mutated_solutions = top_solutions * mutation_factors
|
|
|
|
|
|
|
|
|
|
new_random_solutions = np.random.uniform(
|
|
|
|
|
options.min_range, options.max_range, options.data_size - options.sample_size
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
solutions = np.concatenate([mutated_solutions, new_random_solutions])
|
|
|
|
|
# --- Create the next generation ---
|
|
|
|
|
|
|
|
|
|
# Final sort of the best solutions from the last generation
|
|
|
|
|
final_solutions = np.sort(solutions[:options.sample_size])
|
|
|
|
|
return final_solutions
|
|
|
|
|
# 1. Elitism: Keep the best solutions as-is
|
|
|
|
|
elite_solutions = solutions[:elite_size]
|
|
|
|
|
|
|
|
|
|
# Define a "parent pool" of the top 50% of solutions to breed from
|
|
|
|
|
parent_pool = solutions[:data_size // 2]
|
|
|
|
|
|
|
|
|
|
# 2. Crossover: Breed two parents to create a child
|
|
|
|
|
# Select from the full list (indices 0 to data_size-1)
|
|
|
|
|
parent1_indices = np.random.randint(0, data_size, crossover_size)
|
|
|
|
|
parent2_indices = np.random.randint(0, data_size, crossover_size)
|
|
|
|
|
parents1 = solutions[parent1_indices]
|
|
|
|
|
parents2 = solutions[parent2_indices]
|
|
|
|
|
# Simple "average" crossover
|
|
|
|
|
crossover_solutions = (parents1 + parents2) / 2.0
|
|
|
|
|
|
|
|
|
|
# 3. Mutation:
|
|
|
|
|
# Select from the full list (indices 0 to data_size-1)
|
|
|
|
|
mutation_candidates = solutions[np.random.randint(0, data_size, mutation_size)]
|
|
|
|
|
|
|
|
|
|
# Use mutation_strength (the new name)
|
|
|
|
|
mutation_factors = np.random.uniform(
|
|
|
|
|
1 - options.mutation_strength,
|
|
|
|
|
1 + options.mutation_strength,
|
|
|
|
|
mutation_size
|
|
|
|
|
)
|
|
|
|
|
mutated_solutions = mutation_candidates * mutation_factors
|
|
|
|
|
|
|
|
|
|
# 4. New Randoms: Add new blood to prevent getting stuck
|
|
|
|
|
random_solutions = np.random.uniform(options.min_range, options.max_range, random_size)
|
|
|
|
|
|
|
|
|
|
# Assemble the new generation
|
|
|
|
|
solutions = np.concatenate([
|
|
|
|
|
elite_solutions,
|
|
|
|
|
crossover_solutions,
|
|
|
|
|
mutated_solutions,
|
|
|
|
|
random_solutions
|
|
|
|
|
])
|
|
|
|
|
|
|
|
|
|
# --- Final Step: Return the best results ---
|
|
|
|
|
# After all generations, do one last ranking to find the best solutions
|
|
|
|
|
y_calculated = np.polyval(self.coefficients, solutions)
|
|
|
|
|
error = y_calculated - y_val
|
|
|
|
|
ranks = np.where(error == 0, np.finfo(float).max, np.abs(1.0 / error))
|
|
|
|
|
sorted_indices = np.argsort(-ranks)
|
|
|
|
|
|
|
|
|
|
# Get the top 'sample_size' solutions the user asked for
|
|
|
|
|
best_solutions = solutions[sorted_indices][:options.sample_size]
|
|
|
|
|
|
|
|
|
|
return np.sort(best_solutions)
|
|
|
|
|
|
|
|
|
|
def _solve_x_cuda(self, y_val: float, options: GA_Options) -> np.ndarray:
|
|
|
|
|
"""Genetic algorithm implementation using CuPy (GPU/CUDA)."""
|
|
|
|
|
|
|
|
|
|
elite_ratio = options.elite_ratio
|
|
|
|
|
crossover_ratio = options.crossover_ratio
|
|
|
|
|
mutation_ratio = options.mutation_ratio
|
|
|
|
|
|
|
|
|
|
# Check the dtype of our coefficients array
|
|
|
|
|
if self.coefficients.dtype == np.float64:
|
|
|
|
|
fitness_gpu = cupy.RawKernel(_FITNESS_KERNEL_FLOAT, 'fitness_kernel')
|
|
|
|
|
d_coefficients = cupy.array(self.coefficients, dtype=cupy.float64)
|
|
|
|
|
elif self.coefficients.dtype == np.int64:
|
|
|
|
|
fitness_gpu = cupy.RawKernel(_FITNESS_KERNEL_INT, 'fitness_kernel')
|
|
|
|
|
d_coefficients = cupy.array(self.coefficients, dtype=cupy.int64)
|
|
|
|
|
else:
|
|
|
|
|
raise TypeError(f"Unsupported dtype for CUDA solver: {self.coefficients.dtype}")
|
|
|
|
|
data_size = options.data_size
|
|
|
|
|
|
|
|
|
|
elite_size = int(data_size * elite_ratio)
|
|
|
|
|
crossover_size = int(data_size * crossover_ratio)
|
|
|
|
|
mutation_size = int(data_size * mutation_ratio)
|
|
|
|
|
random_size = data_size - elite_size - crossover_size - mutation_size
|
|
|
|
|
|
|
|
|
|
# ALWAYS cast coefficients to float64 for the kernel.
|
|
|
|
|
fitness_gpu = cupy.RawKernel(_FITNESS_KERNEL_FLOAT, 'fitness_kernel')
|
|
|
|
|
d_coefficients = cupy.array(self.coefficients, dtype=cupy.float64)
|
|
|
|
|
|
|
|
|
|
# Create initial random solutions on the GPU
|
|
|
|
|
d_solutions = cupy.random.uniform(
|
|
|
|
|
@@ -339,27 +392,62 @@ class Function:
|
|
|
|
|
sorted_indices = cupy.argsort(-d_ranks)
|
|
|
|
|
d_solutions = d_solutions[sorted_indices]
|
|
|
|
|
|
|
|
|
|
if i + 1 == options.num_of_generations:
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
# Get top solutions
|
|
|
|
|
d_top_solutions = d_solutions[:options.sample_size]
|
|
|
|
|
|
|
|
|
|
# Mutate top solutions on the GPU
|
|
|
|
|
mutation_factors = cupy.random.uniform(
|
|
|
|
|
1 - options.mutation_percentage, 1 + options.mutation_percentage, options.sample_size
|
|
|
|
|
)
|
|
|
|
|
d_mutated = d_top_solutions * mutation_factors
|
|
|
|
|
# --- Create the next generation ---
|
|
|
|
|
|
|
|
|
|
# Create new random solutions for the rest
|
|
|
|
|
d_new_random = cupy.random.uniform(
|
|
|
|
|
options.min_range, options.max_range, options.data_size - options.sample_size
|
|
|
|
|
# 1. Elitism
|
|
|
|
|
d_elite_solutions = d_solutions[:elite_size]
|
|
|
|
|
|
|
|
|
|
# Define a "parent pool" of the top 50% of solutions to breed from
|
|
|
|
|
d_parent_pool = d_solutions[:data_size // 2]
|
|
|
|
|
parent_pool_size = d_parent_pool.size
|
|
|
|
|
|
|
|
|
|
# 2. Crossover
|
|
|
|
|
# Select from the full list (indices 0 to data_size-1)
|
|
|
|
|
parent1_indices = cupy.random.randint(0, data_size, crossover_size)
|
|
|
|
|
parent2_indices = cupy.random.randint(0, data_size, crossover_size)
|
|
|
|
|
d_parents1 = d_solutions[parent1_indices]
|
|
|
|
|
d_parents2 = d_solutions[parent2_indices]
|
|
|
|
|
d_crossover_solutions = (d_parents1 + d_parents2) / 2.0
|
|
|
|
|
|
|
|
|
|
# 3. Mutation
|
|
|
|
|
# Select from the full list (indices 0 to data_size-1)
|
|
|
|
|
mutation_indices = cupy.random.randint(0, data_size, mutation_size)
|
|
|
|
|
d_mutation_candidates = d_solutions[mutation_indices]
|
|
|
|
|
|
|
|
|
|
# Use mutation_strength (the new name)
|
|
|
|
|
d_mutation_factors = cupy.random.uniform(
|
|
|
|
|
1 - options.mutation_strength,
|
|
|
|
|
1 + options.mutation_strength,
|
|
|
|
|
mutation_size
|
|
|
|
|
)
|
|
|
|
|
d_mutated_solutions = d_mutation_candidates * d_mutation_factors
|
|
|
|
|
|
|
|
|
|
# 4. New Randoms
|
|
|
|
|
d_random_solutions = cupy.random.uniform(
|
|
|
|
|
options.min_range, options.max_range, random_size, dtype=cupy.float64
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
d_solutions = cupy.concatenate([d_mutated, d_new_random])
|
|
|
|
|
# Assemble the new generation
|
|
|
|
|
d_solutions = cupy.concatenate([
|
|
|
|
|
d_elite_solutions,
|
|
|
|
|
d_crossover_solutions,
|
|
|
|
|
d_mutated_solutions,
|
|
|
|
|
d_random_solutions
|
|
|
|
|
])
|
|
|
|
|
|
|
|
|
|
# --- Final Step: Return the best results ---
|
|
|
|
|
# After all generations, do one last ranking to find the best solutions
|
|
|
|
|
fitness_gpu(
|
|
|
|
|
(blocks_per_grid,), (threads_per_block,),
|
|
|
|
|
(d_coefficients, d_coefficients.size, d_solutions, d_ranks, d_solutions.size, y_val)
|
|
|
|
|
)
|
|
|
|
|
sorted_indices = cupy.argsort(-d_ranks)
|
|
|
|
|
|
|
|
|
|
# Get the top 'sample_size' solutions
|
|
|
|
|
d_best_solutions = d_solutions[sorted_indices][:options.sample_size]
|
|
|
|
|
|
|
|
|
|
# Get the final sample, sort it, and copy back to CPU
|
|
|
|
|
final_solutions_gpu = cupy.sort(d_solutions[:options.sample_size])
|
|
|
|
|
final_solutions_gpu = cupy.sort(d_best_solutions)
|
|
|
|
|
return final_solutions_gpu.get()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@@ -481,36 +569,52 @@ class Function:
|
|
|
|
|
def __imul__(self, other: Union['Function', int, float]) -> 'Function':
|
|
|
|
|
"""Performs in-place multiplication by a scalar (func *= 3)."""
|
|
|
|
|
|
|
|
|
|
self.coefficients *= other
|
|
|
|
|
self._check_initialized()
|
|
|
|
|
|
|
|
|
|
if isinstance(other, (int, float)):
|
|
|
|
|
if other == 0:
|
|
|
|
|
self.coefficients = np.array([0], dtype=self.coefficients.dtype)
|
|
|
|
|
self._largest_exponent = 0
|
|
|
|
|
else:
|
|
|
|
|
self.coefficients *= other
|
|
|
|
|
|
|
|
|
|
elif isinstance(other, self.__class__):
|
|
|
|
|
other._check_initialized()
|
|
|
|
|
self.coefficients = np.polymul(self.coefficients, other.coefficients)
|
|
|
|
|
self._largest_exponent = len(self.coefficients) - 1
|
|
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
return NotImplemented
|
|
|
|
|
|
|
|
|
|
return self
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def quadratic_solve(f: Function) -> Optional[List[float]]:
|
|
|
|
|
"""
|
|
|
|
|
Calculates the real roots of a quadratic function using the quadratic formula.
|
|
|
|
|
def quadratic_solve(self) -> Optional[List[float]]:
|
|
|
|
|
"""
|
|
|
|
|
Calculates the real roots of a quadratic function using the quadratic formula.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
f (Function): A Function object of degree 2.
|
|
|
|
|
Args:
|
|
|
|
|
f (Function): A Function object of degree 2.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
Optional[List[float]]: A list containing the two real roots, or None if there are no real roots.
|
|
|
|
|
"""
|
|
|
|
|
f._check_initialized()
|
|
|
|
|
if f.largest_exponent != 2:
|
|
|
|
|
raise ValueError("Input function must be quadratic (degree 2).")
|
|
|
|
|
Returns:
|
|
|
|
|
Optional[List[float]]: A list containing the two real roots, or None if there are no real roots.
|
|
|
|
|
"""
|
|
|
|
|
self._check_initialized()
|
|
|
|
|
if self.largest_exponent != 2:
|
|
|
|
|
raise ValueError("Input function must be quadratic (degree 2) to use quadratic_solve.")
|
|
|
|
|
|
|
|
|
|
a, b, c = f.coefficients
|
|
|
|
|
a, b, c = self.coefficients
|
|
|
|
|
|
|
|
|
|
discriminant = (b**2) - (4*a*c)
|
|
|
|
|
discriminant = (b**2) - (4*a*c)
|
|
|
|
|
|
|
|
|
|
if discriminant < 0:
|
|
|
|
|
return None # No real roots
|
|
|
|
|
|
|
|
|
|
sqrt_discriminant = math.sqrt(discriminant)
|
|
|
|
|
root1 = (-b + sqrt_discriminant) / (2 * a)
|
|
|
|
|
root2 = (-b - sqrt_discriminant) / (2 * a)
|
|
|
|
|
if discriminant < 0:
|
|
|
|
|
return None # No real roots
|
|
|
|
|
|
|
|
|
|
return [root1, root2]
|
|
|
|
|
sqrt_discriminant = math.sqrt(discriminant)
|
|
|
|
|
root1 = (-b + sqrt_discriminant) / (2 * a)
|
|
|
|
|
root2 = (-b - sqrt_discriminant) / (2 * a)
|
|
|
|
|
|
|
|
|
|
return [root1, root2]
|
|
|
|
|
|
|
|
|
|
# Example Usage
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
|
|