Source code for netpyne.batch.grid

"""
Module for grid search parameter optimization and exploration

"""

# required to make json saving work in Python 2/3
try:
    to_unicode = unicode
except NameError:
    to_unicode = str

import pandas as pd
import os, sys
import glob
from time import sleep
from itertools import product
from subprocess import Popen, PIPE
import importlib, types

from neuron import h
from .utils import jobStringHPCSlurm, jobStringHPCTorque, jobStringHPCSGE
from .utils import createFolder

pc = h.ParallelContext()  # use bulletin board master/slave


# -------------------------------------------------------------------------------
# function to run single job using ParallelContext bulletin board (master/slave)
# -------------------------------------------------------------------------------

# func needs to be outside of class
def runJob(script, cfgSavePath, netParamsSavePath, processes, jobName):
    """
    Run a single simulation job by launching nrniv as a subprocess; used by the
    ParallelContext bulletin board (master/worker) scheduler.

    Parameters
    ----------
    script : str
        Path to the simulation script (e.g. 'init.py').
        **Default:** *required*

    cfgSavePath : str
        Path to the saved simConfig JSON file.
        **Default:** *required*

    netParamsSavePath : str
        Path to the saved netParams file.
        **Default:** *required*

    processes : list
        List to which the spawned subprocess is appended.
        **Default:** *required*

    jobName : str
        Base path for the job's '.run' (stdout) and '.err' (stderr) files.
        **Default:** *required*
    """

    print('\nJob in rank id: ', pc.id())
    command = 'nrniv %s simConfig=%s netParams=%s' % (script, cfgSavePath, netParamsSavePath)
    print(command + '\n')

    proc = Popen(command.split(' '), stdout=PIPE, stderr=PIPE)
    # mirror the subprocess output to per-job files, closing them when done
    with open(jobName + '.run', 'w') as stdout, open(jobName + '.err', 'w') as stderr:
        stdout.write(proc.stdout.read().decode())
        stderr.write(proc.stderr.read().decode())
    processes.append(proc)
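# Illustrative note (not part of the module): runJob is dispatched via pc.submit()
# by gridSubmit() below, so each worker rank spawns one nrniv process and mirrors
# its stdout/stderr to <jobName>.run / <jobName>.err. Assuming a script 'init.py'
# and a save folder 'batch_data' (placeholder paths), the command it builds looks like:
#
#   nrniv init.py simConfig=batch_data/grid_0_0_cfg.json netParams=batch_data/grid_netParams.py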
# -------------------------------------------------------------------------------
# Get parameter combinations
# -------------------------------------------------------------------------------
def getParamCombinations(batch):

    indices = []
    values = []
    filenames = []

    # generate param combinations
    (
        groupedParams,
        ungroupedParams,
        indexCombGroups,
        valueCombGroups,
        indexCombinations,
        valueCombinations,
        labelList,
        valuesList,
    ) = generateParamCombinations(batch)

    for iCombG, pCombG in zip(indexCombGroups, valueCombGroups):
        for iCombNG, pCombNG in zip(indexCombinations, valueCombinations):
            if groupedParams and ungroupedParams:  # temporary hack - improve
                iComb = iCombG + iCombNG
                pComb = pCombG + pCombNG
            elif ungroupedParams:
                iComb = iCombNG
                pComb = pCombNG
            elif groupedParams:
                iComb = iCombG
                pComb = pCombG
            else:
                iComb = []
                pComb = []

            # set simLabel and jobName
            simLabel = batch.batchLabel + ''.join(['_' + str(i) for i in iComb])
            jobName = batch.saveFolder + '/' + simLabel

            indices.append(iComb)
            values.append(pComb)
            filenames.append(jobName)

    return {'indices': indices, 'values': values, 'labels': labelList, 'filenames': filenames}
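# Illustrative sketch (assumed param names, not part of the module): for a batch
# with two ungrouped params, getParamCombinations() returns the full Cartesian
# product of their values:
#
#   batch.params = [{'label': 'tau', 'values': [3.0, 5.0]},
#                   {'label': 'weight', 'values': [0.01, 0.05]}]
#   getParamCombinations(batch)
#   # -> {'indices':   [(0, 0), (0, 1), (1, 0), (1, 1)],
#   #     'values':    [(3.0, 0.01), (3.0, 0.05), (5.0, 0.01), (5.0, 0.05)],
#   #     'labels':    ('tau', 'weight'),
#   #     'filenames': ['<saveFolder>/<batchLabel>_0_0', '<saveFolder>/<batchLabel>_0_1',
#   #                   '<saveFolder>/<batchLabel>_1_0', '<saveFolder>/<batchLabel>_1_1']}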
# -------------------------------------------------------------------------------
# Generate parameter combinations
# -------------------------------------------------------------------------------
def generateParamCombinations(batch):

    # iterate over all param combinations
    groupedParams = False
    ungroupedParams = False
    for p in batch.params:
        if 'group' not in p:
            p['group'] = False
            ungroupedParams = True
        elif p['group'] == True:
            groupedParams = True

    if ungroupedParams:
        labelList, valuesList = zip(*[(p['label'], p['values']) for p in batch.params if p['group'] == False])
        valueCombinations = list(product(*(valuesList)))
        indexCombinations = list(product(*[range(len(x)) for x in valuesList]))
    else:
        valueCombinations = [(0,)]  # this is a hack -- improve!
        indexCombinations = [(0,)]
        labelList = ()
        valuesList = ()

    if groupedParams:
        labelListGroup, valuesListGroup = zip(*[(p['label'], p['values']) for p in batch.params if p['group'] == True])
        valueCombGroups = zip(*(valuesListGroup))
        indexCombGroups = zip(*[range(len(x)) for x in valuesListGroup])
        labelList = labelListGroup + labelList
    else:
        valueCombGroups = [(0,)]  # this is a hack -- improve!
        indexCombGroups = [(0,)]

    return (
        groupedParams,
        ungroupedParams,
        indexCombGroups,
        valueCombGroups,
        indexCombinations,
        valueCombinations,
        labelList,
        valuesList,
    )
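# Illustrative sketch (assumed param names): params marked with 'group': True are
# varied together (zipped element-wise) rather than crossed, so their value lists
# must have equal length:
#
#   batch.params = [{'label': 'ampaWeight', 'values': [0.1, 0.2], 'group': True},
#                   {'label': 'nmdaWeight', 'values': [0.01, 0.02], 'group': True}]
#   # valueCombGroups -> [(0.1, 0.01), (0.2, 0.02)]   (2 combinations, not 4)
#   # indexCombGroups -> [(0, 0), (1, 1)]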
# -------------------------------------------------------------------------------
# Run grid search / parameter exploration
# -------------------------------------------------------------------------------
def gridSearch(batch, pc):
    """
    Run a parameter exploration, submitting one job per parameter combination
    (batch.method == 'grid') or per row of a parameter list file (batch.method == 'list').

    Parameters
    ----------
    batch : netpyne.batch.Batch
        The batch object defining the params, cfg, and runCfg to use.
        **Default:** *required*

    pc : h.ParallelContext
        NEURON ParallelContext used for bulletin-board job scheduling.
        **Default:** *required*
    """

    # create main sim directory and save scripts
    batch.saveScripts()

    netParamsSavePath = batch.netParamsSavePath

    # set initial cfg from initCfg
    if len(batch.initCfg) > 0:
        for paramLabel, paramVal in batch.initCfg.items():
            batch.setCfgNestedParam(paramLabel, paramVal)

    if batch.runCfg.get('type', None) == 'mpi_bulletin':
        pc.runworker()  # only 1 runworker needed in rank0

    processes, processFiles = [], []

    if batch.method == 'list':
        paramListFile = batch.runCfg.get('paramListFile', 'params.csv')
        paramLines = pd.read_csv(paramListFile)
        paramLabels = list(paramLines.columns)
        print(f'Running {len(paramLines)} simulations from {paramListFile}')

        for row in paramLines.itertuples():
            for paramLabel, paramVal in zip(paramLabels, row[1:]):  # 0th element is Index
                batch.setCfgNestedParam(paramLabel, paramVal)
                print(f'{paramLabel} = {paramVal}')

            # set simLabel and jobName
            simLabel = f'{batch.batchLabel}{row.Index}'
            jobName = f'{batch.saveFolder}/{simLabel}'

            gridSubmit(batch, pc, netParamsSavePath, jobName, simLabel, processes, processFiles)

    elif batch.method == 'grid':
        # iterate over all param combinations
        combinationsData = getParamCombinations(batch)

        for jobName, iComb, comb in zip(
            combinationsData['filenames'], combinationsData['indices'], combinationsData['values']
        ):
            print(iComb, comb)

            for i, paramVal in enumerate(comb):
                paramLabel = combinationsData['labels'][i]
                batch.setCfgNestedParam(paramLabel, paramVal)
                print(str(paramLabel) + ' = ' + str(paramVal))

            simLabel = jobName.split('/')[-1]

            gridSubmit(batch, pc, netParamsSavePath, jobName, simLabel, processes, processFiles)

    else:
        print("Error: invalid batch.method selected; valid methods are 'list', 'grid'")

    print("-" * 80)
    print("   Finished creating jobs for parameter exploration   ")
    print("-" * 80)

    if batch.runCfg.get('type', None) == 'mpi_bulletin':
        while pc.working():
            pass

    outfiles = []
    for procFile in processFiles:
        outfiles.append(open(procFile, 'r'))

    # note: while a process is running, its poll() method returns None.
    # Depending on the platform or how the source file is executed (e.g. via mpiexec),
    # the stored process ids may correspond to already-completed processes
    # and therefore return 1 (even though nrniv processes are still running)
    while any([proc.poll() is None for proc in processes]):
        for i, proc in enumerate(processes):
            newline = outfiles[i].readline()
            if len(newline) > 1:
                print(newline, end='')
        # sleep(sleepInterval)

    # attempt to terminate completed processes
    for proc in processes:
        try:
            proc.terminate()
        except:
            pass

    pc.done()
    # TODO: line below was commented out due to issue on Netpyne-UI
    # (https://dura-bernallab.slack.com/archives/C02UT6WECEL/p1671724489096569?thread_ts=1671646899.368969&cid=C02UT6WECEL)
    # Needs to be re-visited.
    # h.quit()
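# Typical usage (a minimal sketch following the netpyne batch tutorials; 'cfg.py',
# 'netParams.py' and 'init.py' are assumed to exist in the working directory --
# gridSearch() itself is normally invoked indirectly via Batch.run()):
#
#   from netpyne.batch import Batch
#
#   batch = Batch(cfgFile='cfg.py', netParamsFile='netParams.py',
#                 params={'synMechTau2': [3.0, 5.0, 7.0],
#                         'connWeight': [0.005, 0.01, 0.15]})
#   batch.batchLabel = 'tauWeight'
#   batch.saveFolder = 'tut_data'
#   batch.method = 'grid'
#   batch.runCfg = {'type': 'mpi_bulletin', 'script': 'init.py', 'skip': True}
#   batch.run()  # dispatches to gridSearch(batch, pc)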
def gridSubmit(batch, pc, netParamsSavePath, jobName, simLabel, processes, processFiles):

    # skip if output file already exists
    if batch.runCfg.get('skip', False) and glob.glob(jobName + '_data.json'):
        print('Skipping job %s since output file already exists...' % (jobName))
        return
    elif batch.runCfg.get('skipCfg', False) and glob.glob(jobName + '_cfg.json'):
        print('Skipping job %s since cfg file already exists...' % (jobName))
        return
    elif batch.runCfg.get('skipCustom', None) and glob.glob(jobName + batch.runCfg['skipCustom']):
        print('Skipping job %s since %s file already exists...' % (jobName, batch.runCfg['skipCustom']))
        return

    # save simConfig json to saveFolder
    batch.cfg.simLabel = simLabel
    batch.cfg.saveFolder = batch.saveFolder
    cfgSavePath = batch.saveFolder + '/' + simLabel + '_cfg.json'
    batch.cfg.save(cfgSavePath)

    # read params or set defaults
    sleepInterval = batch.runCfg.get('sleepInterval', 1)
    nodes = batch.runCfg.get('nodes', 1)
    script = batch.runCfg.get('script', 'init.py')
    walltime = batch.runCfg.get('walltime', '00:30:00')
    folder = batch.runCfg.get('folder', '.')
    custom = batch.runCfg.get('custom', '')
    printOutput = batch.runCfg.get('printOutput', False)

    # hpc torque job submission
    if batch.runCfg.get('type', None) == 'hpc_torque':
        ppn = batch.runCfg.get('ppn', 1)
        mpiCommand = batch.runCfg.get('mpiCommand', 'mpiexec')
        queueName = batch.runCfg.get('queueName', 'default')
        numproc = nodes * ppn

        command = '%s -n %d nrniv -python -mpi %s simConfig=%s netParams=%s' % (
            mpiCommand,
            numproc,
            script,
            cfgSavePath,
            netParamsSavePath,
        )

        jobString = jobStringHPCTorque(jobName, walltime, queueName, nodes, ppn, jobName, custom, command)

        # Send job_string to qsub
        print('Submitting job ', jobName)
        print(jobString + '\n')

        batchfile = '%s.pbs' % (jobName)
        with open(batchfile, 'w') as text_file:
            text_file.write("%s" % jobString)

        proc = Popen(['qsub', batchfile], stdin=PIPE, stdout=PIPE)  # Open a pipe to the qsub command.
        (output, input) = (proc.stdin, proc.stdout)

    # hpc sge job submission
    elif batch.runCfg.get('type', None) == 'hpc_sge':
        # arguments for the SGE submission script, with defaults
        # (see jobStringHPCSGE(jobName, walltime, vmem, queueName, cores, custom, command))
        sge_args = {
            'jobName': simLabel,
            'walltime': walltime,
            'vmem': '32G',
            'queueName': 'cpu.q',
            'cores': 2,
            'pre': '',
            'post': '',
            'mpiCommand': 'mpiexec',
            # 'log': "~/qsub/{}".format(jobName)
            'log': "{}/{}".format(os.getcwd(), jobName),
        }

        # override defaults with values from runCfg
        sge_args.update(batch.runCfg)

        sge_args['command'] = '%s -n $NSLOTS -hosts $(hostname) nrniv -python -mpi %s simConfig=%s netParams=%s' % (
            sge_args['mpiCommand'],
            script,
            cfgSavePath,
            netParamsSavePath,
        )

        jobString = jobStringHPCSGE(**sge_args)

        # Send job_string to qsub
        print('Submitting job ', jobName)
        print(jobString + '\n')

        batchfile = '%s.sh' % (jobName)
        with open(batchfile, 'w') as text_file:
            text_file.write("%s" % jobString)

        proc = Popen(['qsub', batchfile], stdin=PIPE, stdout=PIPE)  # Open a pipe to the qsub command.
        (output, input) = (proc.stdin, proc.stdout)

    # hpc slurm job submission
    elif batch.runCfg.get('type', None) == 'hpc_slurm':
        # read params or set defaults
        allocation = batch.runCfg.get('allocation', 'csd403')  # NSG account
        coresPerNode = batch.runCfg.get('coresPerNode', 1)
        email = batch.runCfg.get('email', 'a@b.c')
        mpiCommand = batch.runCfg.get('mpiCommand', 'ibrun')
        reservation = batch.runCfg.get('reservation', None)
        numproc = nodes * coresPerNode

        command = '%s -n %d nrniv -python -mpi %s simConfig=%s netParams=%s' % (
            mpiCommand,
            numproc,
            script,
            cfgSavePath,
            netParamsSavePath,
        )

        jobString = jobStringHPCSlurm(
            simLabel, allocation, walltime, nodes, coresPerNode, jobName, email, reservation, custom, folder, command
        )

        # Send job_string to sbatch
        print('Submitting job ', jobName)
        print(jobString + '\n')

        batchfile = '%s.sbatch' % (jobName)
        with open(batchfile, 'w') as text_file:
            text_file.write("%s" % jobString)

        proc = Popen(['sbatch', batchfile], stdin=PIPE, stdout=PIPE)  # Open a pipe to the sbatch command.
        (output, input) = (proc.stdin, proc.stdout)

    # run mpi jobs directly, e.g. if 16 cores are available, can run 4 jobs x 4 cores in parallel
    # eg. usage: python batch.py
    elif batch.runCfg.get('type', None) == 'mpi_direct':
        jobName = batch.saveFolder + '/' + simLabel
        print('Running job ', jobName)
        cores = batch.runCfg.get('cores', 1)
        mpiCommand = batch.runCfg.get('mpiCommand', 'mpirun')

        command = '%s -n %d nrniv -python -mpi %s simConfig=%s netParams=%s' % (
            mpiCommand,
            cores,
            script,
            cfgSavePath,
            netParamsSavePath,
        )

        print(command + '\n')
        proc = Popen(command.split(' '), stdout=open(jobName + '.run', 'w'), stderr=open(jobName + '.err', 'w'))
        processes.append(proc)
        processFiles.append(jobName + '.run')

    # pc bulletin board job submission (master/slave) via mpi
    # eg. usage: mpiexec -n 4 nrniv -mpi batch.py
    elif batch.runCfg.get('type', None) == 'mpi_bulletin':
        script = batch.runCfg.get('script', 'init.py')
        jobName = batch.saveFolder + '/' + simLabel
        print('Submitting job ', jobName)

        # master/slave bulletin board scheduling of jobs
        pc.submit(runJob, script, cfgSavePath, netParamsSavePath, processes, jobName)
        print('Saving output to: ', jobName + '.run')
        print('Saving errors to: ', jobName + '.err')
        print('')

    else:
        print(batch.runCfg)
        print(
            "Error: invalid runCfg 'type' selected; valid types are 'mpi_bulletin', 'mpi_direct', 'hpc_slurm', 'hpc_torque', 'hpc_sge'"
        )
        sys.exit(0)
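# Example runCfg for SLURM submission (a hedged sketch; allocation, email and paths
# are placeholders -- the keys shown are the ones read by gridSubmit() above):
#
#   batch.runCfg = {
#       'type': 'hpc_slurm',
#       'allocation': 'csd403',       # account/allocation name
#       'walltime': '01:00:00',
#       'nodes': 1,
#       'coresPerNode': 8,
#       'email': 'user@example.com',
#       'mpiCommand': 'ibrun',
#       'script': 'init.py',
#       'folder': '.',
#       'custom': '',
#       'skip': True,                 # skip jobs whose _data.json already exists
#   }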