# Source code for allensdk.internal.model.glif.glif_optimizer

import logging
import time
import json
from uuid import uuid4

import numpy as np
from scipy.optimize import fminbound, fmin, minimize

import allensdk.internal.model.glif.error_functions as error_functions

# TODO: clean up
# TODO: license
# TODO: document

class GlifOptimizer(object):
    def __init__(self, experiment, dt,
                 outer_iterations, inner_iterations,
                 sigma_outer, sigma_inner,
                 param_fit_names, stim,
                 xtol, ftol,
                 internal_iterations, bessel,
                 error_function=None,
                 error_function_data=None,
                 init_params=None):

        self.start_time = None
        self.rng = np.random.RandomState()

        self.experiment = experiment
        self.dt = dt
        self.outer_iterations = outer_iterations
        self.inner_iterations = inner_iterations
        self.init_params = init_params
        self.sigma_outer = sigma_outer
        self.sigma_inner = sigma_inner
        self.param_fit_names = param_fit_names
        self.stim = stim

        # use MLIN by default
        if error_function is None:
            error_function = error_functions.MLIN_list_error

        self.error_function = error_function
        self.error_function_data = error_function_data

        self.xtol = xtol
        self.ftol = ftol
        self.internal_iterations = internal_iterations
        self.bessel = bessel

        logging.info('internal_iterations: %s' % internal_iterations)
        logging.info('outer_iterations: %s' % outer_iterations)
        logging.info('inner_iterations: %s' % inner_iterations)

        self.iteration_info = []

        expected_param_count = experiment.neuron_parameter_count()
        if self.init_params is None:
            self.init_params = np.ones(expected_param_count)
        elif len(self.init_params) != expected_param_count:
            # warn with the given length before overwriting init_params
            logging.warning('optimizer init_params has wrong length '
                            '(given %d, expected %d); setting to all ones'
                            % (len(self.init_params), expected_param_count))
            self.init_params = np.ones(expected_param_count)
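
    # A minimal construction sketch (hypothetical, illustrative values;
    # `experiment` is assumed to be a GLIF experiment object exposing
    # neuron_parameter_count(), set_neuron_parameters(), and a `neuron`
    # attribute with a settable dt_multiplier, as used by the methods below):
    #
    #   optimizer = GlifOptimizer(experiment, dt=5e-5,
    #                             outer_iterations=1, inner_iterations=3,
    #                             sigma_outer=0.3, sigma_inner=0.01,
    #                             param_fit_names=['th_inf', 'coeff_a'],
    #                             stim=stimulus_array,
    #                             xtol=1e-4, ftol=1e-4,
    #                             internal_iterations=100, bessel=None)
    #   best_params, init_params = optimizer.run_many()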

    def to_dict(self):
        return {
            'outer_iterations': self.outer_iterations,
            'inner_iterations': self.inner_iterations,
            'init_params': self.init_params,
            'sigma_outer': self.sigma_outer,
            'sigma_inner': self.sigma_inner,
            'param_fit_names': self.param_fit_names,
            'xtol': self.xtol,
            'ftol': self.ftol,
            'internal_iterations': self.internal_iterations,
            'iteration_info': self.iteration_info,
            'bessel': self.bessel
        }
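
    # Note that 'init_params' may be a numpy array, which the json module
    # imported above cannot serialize directly. One sketch for writing the
    # dict out (a hedged suggestion, not part of the original API):
    #
    #   json.dumps(optimizer.to_dict(),
    #              default=lambda o: o.tolist() if hasattr(o, 'tolist') else o)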

    def randomize_parameter_values(self, values, sigma):
        # draw a new parameter vector from a normal distribution centered on
        # the current values
        values = np.array(self.rng.normal(values, sigma))

        # values might not have a shape if it's a single element long,
        # depending on your numpy version
        if not values.shape:
            values = np.array([values])

        return values
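
    # For example (illustrative, with a fixed seed), perturbing three unit
    # parameters with sigma=0.1 yields values near 1.0:
    #
    #   rng = np.random.RandomState(0)
    #   rng.normal([1.0, 1.0, 1.0], 0.1)
    #   # -> approximately array([1.176, 1.040, 1.098])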

    def initiate_unique_seed(self, seed=None):
        if seed is None:
            # get a uuid, turn it into an int, then into a string
            x1 = str(int(uuid4()))
            # break it up into chunks (a uuid int has at most 39 digits,
            # so the last chunk may be shorter than 8 digits)
            x2 = [x1[ii:ii + 8] for ii in range(0, 40, 8)]
            # turn the string chunks back into integers
            x3 = [int(ii) for ii in x2]
            print('seed', x3)
            self.rng.seed(x3)
        else:
            self.rng.seed(seed)
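
    # numpy's RandomState.seed accepts a sequence of ints, so the
    # uuid-derived chunks above form a valid (if unconventional) seed.
    # For example, a fixed seed of the same shape (illustrative values):
    #
    #   rng = np.random.RandomState()
    #   rng.seed([12345678, 90123456, 78901234, 56789012, 34567890])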

    def evaluate(self, x, dt_multiplier=100):
        # score a single candidate parameter value at the given (coarse by
        # default) simulation time step
        self.experiment.neuron.dt_multiplier = dt_multiplier
        return self.error_function([x], self.experiment,
                                   self.error_function_data)
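
    # For example (hypothetical candidate value):
    #
    #   err = optimizer.evaluate(0.95)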

    def run_many(self, iteration_finished_callback=None, seed=None):
        self.initiate_unique_seed(seed=seed)

        params_start = self.init_params
        self.start_time = time.time()

        params = params_start
        # params = self.randomize_parameter_values(params_start, self.sigma_outer)
        print('actual starting parameters', params)

        stop_flag = False

        # TODO: unhardcode this
        dt_multiplier_list = [100, 32, 10]
        # Note: the following may be useful when there are more inner
        # iterations, but it hasn't been tested:
        # dt_multiplier_list = np.ceil(np.logspace(1, 2, self.inner_iterations))[::-1].astype(int)
        print(dt_multiplier_list)
        # TODO: figure out the implications of this being an int versus a float
        # TODO: make this so that the dt multiplier actually gets set

        for outer in range(0, self.outer_iterations):  # outer loop
            for inner in range(0, self.inner_iterations):  # inner loop
                iteration_start_time = time.time()

                # run the optimizer once; the first run always starts from
                # the passed initial conditions. set the multiplier to 1 to
                # run at full temporal resolution (slow).
                self.experiment.neuron.dt_multiplier = dt_multiplier_list[inner]

                opt = self.run_once(params)
                xopt, fopt = opt[0], opt[1]

                elapsed = time.time() - iteration_start_time
                logging.info('fmin took %f secs, %f mins, %f hours'
                             % (elapsed, elapsed / 60, elapsed / 60 / 60))

                self.iteration_info.append({
                    'in_params': np.array(params).tolist(),
                    'out_params': xopt.tolist(),
                    'error': float(fopt),
                    'dt_multiplier': self.experiment.neuron.dt_multiplier
                })

                # a possible early-stopping criterion (disabled; stop_flag is
                # never set in the active code):
                # ER = [info['error'] for info in self.iteration_info]
                # ETOL = 1.e-4
                # if len(ER) >= 3:
                #     # TODO: fix this to use the actual parameters
                #     if (np.abs(ER[-1] - ER[-2]) < ETOL and
                #             np.abs(ER[-1] - ER[-3]) < ETOL and
                #             np.abs(ER[-2] - ER[-3]) < ETOL):
                #         stop_flag = True

                if iteration_finished_callback is not None:
                    iteration_finished_callback(self, outer, inner)

                if stop_flag is True:
                    break

                # randomize the best-fit parameters for the next inner iteration
                params = self.randomize_parameter_values(xopt, self.sigma_inner)

            # TODO: add other fitness functions here; this requires
            # recomputing the spike trains (e.g. via
            # self.experiment.run_base_model) for the current best parameters.

            if stop_flag is True:
                break

            # the outer loop uses the outer standard deviation to randomize
            # the initial values
            params = self.randomize_parameter_values(self.init_params,
                                                     self.sigma_outer)

        # pick the best iteration: among those run at the finest time step
        # (smallest dt_multiplier), choose the one with the lowest error
        min_error = float("inf")
        min_i = -1

        min_dt_multiplier = float("inf")
        for i, info in enumerate(self.iteration_info):
            if info['dt_multiplier'] < min_dt_multiplier:
                min_dt_multiplier = info['dt_multiplier']

        for i, info in enumerate(self.iteration_info):
            if (info['error'] < min_error and
                    info['dt_multiplier'] == min_dt_multiplier):
                min_error = info['error']
                min_i = i

        best_params = self.iteration_info[min_i]['out_params']
        self.experiment.set_neuron_parameters(best_params)

        logging.info('done optimizing')

        return best_params, self.init_params
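
    # A sketch of an iteration_finished_callback; the signature follows the
    # call above (it receives the optimizer and the loop indices):
    #
    #   def log_progress(optimizer, outer, inner):
    #       info = optimizer.iteration_info[-1]
    #       logging.info('outer %d inner %d error %f'
    #                    % (outer, inner, info['error']))
    #
    #   optimizer.run_many(iteration_finished_callback=log_progress)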

    def run_once_bound(self, low_bound, high_bound):
        '''
        @param low_bound: a scalar low bound for the optimizer
        @param high_bound: a scalar high bound for the optimizer
        @return: tuple including the parameter that optimizes the function
            and its value - see fminbound docs
        '''
        return fminbound(self.error_function,
                         low_bound, high_bound,
                         args=(self.experiment, self.error_function_data),
                         maxfun=200,
                         full_output=True)
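
    # fminbound's full_output tuple is (xopt, fval, ierr, numfunc). For
    # example, on a simple parabola:
    #
    #   xopt, fval, ierr, numfunc = fminbound(lambda x: (x - 2.0) ** 2,
    #                                         0.0, 5.0, full_output=True)
    #   # xopt ~= 2.0, fval ~= 0.0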

    def run_once(self, param0):
        '''
        @param param0: a list of the initial guesses for the optimizer
        @return: tuple including the parameters that optimize the function
            and its value - see fmin docs
        '''
        # fmin(func, x0, args=(), xtol=1e-4, ftol=1e-4, maxiter=None,
        #      maxfun=None, full_output=0, disp=1, retall=0, callback=None)
        print('self.error_function_data', self.error_function_data)
        xopt, fopt, _, _, _, _ = fmin(self.error_function, param0,
                                      args=(self.experiment,
                                            self.error_function_data),
                                      xtol=self.xtol, ftol=self.ftol,
                                      maxiter=self.internal_iterations,
                                      maxfun=self.internal_iterations,
                                      retall=1, full_output=1, disp=1)
        return xopt, fopt
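
    # With full_output=1 and retall=1, fmin returns
    # (xopt, fopt, iter, funcalls, warnflag, allvecs); only the first two
    # are kept above. For example, on a simple quadratic:
    #
    #   out = fmin(lambda p: (p[0] - 1.0) ** 2 + (p[1] + 2.0) ** 2,
    #              [0.0, 0.0], full_output=1, retall=1, disp=0)
    #   xopt, fopt = out[0], out[1]  # xopt ~= [1.0, -2.0]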

    # Alternative optimizers that were tried, kept for reference. Note that
    # the option names below follow older scipy conventions, and that
    # f_prime_constructor and the optimizer callbacks were defined in the
    # top-level script:
    #
    # res = minimize(self.error_function, param0,
    #                method='Nelder-Mead',
    #                args=(self.experiment, self.error_function_data),
    #                options={'maxiter': self.internal_iterations,
    #                         'xtol': self.xtol,
    #                         'ftol': self.ftol,
    #                         'disp': True})
    # return res.x, res.fun
    #
    # res = minimize(self.error_function, param0,
    #                jac=False,
    #                method='BFGS',
    #                args=(self.experiment, self.error_function_data),
    #                options={'maxiter': self.internal_iterations,
    #                         'eps': 1e-8,
    #                         'gtol': 1e-5,
    #                         'disp': True})
    # return res.x, res.fun
    #
    # A Newton-CG variant was also tried, passing a numerical gradient via
    # jac=f_prime_constructor(self.error_function) and printing the current
    # iterate from a callback.