"""
Module with functions to import from and export to SONATA format
"""
import os
import sys
try:
import tables # requires installing hdf5 via brew and tables via pip!
from neuroml.hdf5.NeuroMLXMLParser import NeuroMLXMLParser
from neuroml.loaders import read_neuroml2_file
from pyneuroml import pynml
from . import neuromlFormat # import NetPyNEBuilder
from netpyne.support.nml_reader import NMLTree
except ModuleNotFoundError as error:
# soft-fail and suggest which packages to install
from neuron import h
pc = h.ParallelContext() # MPI: Initialize the ParallelContext class
if int(pc.id()) == 0: # only print for master node
needed = [error.name]
for pkg in ['tables', 'pyneuroml', 'neuroml']:
try:
if pkg not in needed:
__import__(pkg)
except ModuleNotFoundError as error:
needed.append(error.name)
print(
'Note: SONATA import failed; import/export functions for SONATA will not be available.\n'
+ ' To use this feature install these Python packages: ',
needed,
)
except ImportError as error:
from neuron import h
pc = h.ParallelContext() # MPI: Initialize the ParallelContext class
if int(pc.id()) == 0: # only print for master node
print('Note: SONATA import failed; import/export functions for SONATA will not be available.\n', error)
from . import neuronPyHoc
from .. import sim, specs
import neuron
from neuron import h
h.load_file('stdgui.hoc')
h.load_file('import3d.hoc')
import pprint
pp = pprint.PrettyPrinter(depth=6)
# ------------------------------------------------------------------------------------------------------------
# Helper functions (some adapted from https://github.com/NeuroML/NeuroMLlite/)
# ------------------------------------------------------------------------------------------------------------
def _parse_entry(w):
try:
return int(w)
except:
try:
return float(w)
except:
return w
[docs]
def load_csv_props(info_file):
"""
Load a generic csv file as used in Sonata
"""
info = {}
columns = {}
for line in open(info_file):
w = line.split()
if len(columns) == 0:
for i in range(len(w)):
columns[i] = _parse_entry(w[i])
else:
info[int(w[0])] = {}
for i in range(len(w)):
if i != 0:
info[int(w[0])][columns[i]] = _parse_entry(w[i])
return info
[docs]
def ascii_encode_dict(data):
ascii_encode = lambda x: x.encode('ascii') if (sys.version_info[0] == 2 and isinstance(x, unicode)) else x
return dict(map(ascii_encode, pair) for pair in data.items())
[docs]
def load_json(filename):
import json
with open(filename, 'r') as f:
data = json.load(f, object_hook=ascii_encode_dict)
return data
def _distributeCells(numCellsPop):
"""
Distribute cells across compute nodes using round-robin
"""
from .. import sim
hostCells = {}
for i in range(sim.nhosts):
hostCells[i] = []
for i in range(numCellsPop):
hostCells[sim.nextHost].append(i)
sim.nextHost += 1
if sim.nextHost >= sim.nhosts:
sim.nextHost = 0
if sim.cfg.verbose:
print(
(
"Distributed population of %i cells on %s hosts: %s, next: %s"
% (numCellsPop, sim.nhosts, hostCells, sim.nextHost)
)
)
return hostCells
# replace axon with AIS stub
[docs]
def fix_axon_peri(hobj):
"""Replace reconstructed axon with a stub
:param hobj: hoc object
"""
if hasattr(hobj, 'axon'):
for i, sec in enumerate(hobj.axon):
# h.delete_section(sec=sec)
hobj.axon[i] = None
for i, sec in enumerate(hobj.all):
if 'axon' in sec.name():
hobj.all[i] = None
hobj.all = [sec for sec in hobj.all if sec is not None]
hobj.axon = None
# h.execute('create axon[2]', hobj)
hobj.axon = [h.Section(name='axon[0]'), h.Section(name='axon[1]')]
hobj.axonal = []
for sec in hobj.axon:
sec.L = 30
sec.diam = 1
hobj.axonal.append(sec)
hobj.all.append(sec) # need to remove this comment
hobj.axon[0].connect(hobj.soma[0], 0.5, 0)
hobj.axon[1].connect(hobj.axon[0], 1, 0)
h.define_shape()
# replace axon with AIS stub (keep order)
[docs]
def fix_axon_peri_v2(hobj):
"""Replace reconstructed axon with a stub (keep order); BBP version
:param hobj: hoc object
"""
if hasattr(hobj, 'axon'):
for i, sec in enumerate(hobj.axon):
h.pt3dclear(sec=sec)
if i < 2:
sec.L = 30
sec.diam = 1
else:
sec.L = 1e-6
sec.diam = 1
h.define_shape()
[docs]
def fix_sec_nseg(secs, dL):
"""Set nseg of sections based on dL param: section.nseg = 1 + 2 * int(section.L / (2*dL))
:param secs: netpyne dictionary with all sections
:param dL: dL from config file
"""
for secName in secs:
secs[secName]['geom']['nseg'] = 1 + 2 * int(secs[secName]['geom']['L'] / (2 * dL))
[docs]
def swap_soma_xy(secs):
"""
Swap soma x and y coords so cylinder is vertical instead of horizontal
"""
for secName in [sec for sec in secs if 'soma' in sec]:
for i, pt in enumerate(secs[secName]['geom']['pt3d']):
secs[secName]['geom']['pt3d'][i] = (pt[1], pt[0], pt[2], pt[3])
# ------------------------------------------------------------------------------------------------------------
# Import SONATA
# ------------------------------------------------------------------------------------------------------------
[docs]
class SONATAImporter:
# ------------------------------------------------------------------------------------------------------------
# class constructor
# ------------------------------------------------------------------------------------------------------------
def __init__(self, **parameters):
print("Creating SONATAImporter %s..." % parameters)
self.parameters = parameters
self.current_node = None
self.current_node_group = None
self.current_edge = None
# check which are used
self.cell_info = {}
self.pop_comp_info = {}
self.syn_comp_info = {}
self.input_comp_info = {}
self.edges_info = {}
self.conn_info = {}
self.nodes_info = {}
self.pre_pop = None
self.post_pop = None
# added by salva
self.pop_id_from_type = {}
# ------------------------------------------------------------------------------------------------------------
# Import a network by reading all the SONATA files and creating the NetPyNE structures
# ------------------------------------------------------------------------------------------------------------
[docs]
def importNet(self, configFile, replaceAxon=True, setdLNseg=True, swapSomaXY=True):
self.configFile = configFile
self.replaceAxon = replaceAxon
self.setdLNseg = setdLNseg
self.swapSomaXY = swapSomaXY
# read config files
filename = os.path.abspath(configFile)
self.rootFolder = os.path.dirname(configFile)
self.config = load_json(filename)
self.substitutes = {
'../': '%s/../' % self.rootFolder,
'./': '%s/' % self.rootFolder,
'.': '%s/' % self.rootFolder,
'${configdir}': '%s' % self.rootFolder,
}
if 'network' in self.config:
self.network_config = load_json(self.subs(self.config['network']))
else:
self.network_config = self.config
if 'simulation' in self.config:
self.simulation_config = load_json(self.subs(self.config['simulation']))
else:
self.simulation_config = None
for m in self.network_config['manifest']:
path = self.subs(self.network_config['manifest'][m])
self.substitutes[m] = path
for m in self.simulation_config['manifest']:
path = self.subs(self.simulation_config['manifest'][m])
self.substitutes[m] = path
# create and initialize sim object
sim.initialize()
sim.cfg.verbose = 1
# create netpyne simConfig
self.createSimulationConfig()
# add compiled mod folder
if 'mechanisms_dir' in self.network_config['components']:
modFolder = self.subs(self.network_config['components']['mechanisms_dir']) + '/modfiles'
neuron.load_mechanisms(str(modFolder))
# create pops
self.createPops()
# create NetStims (before createCells since spkTimes added to NetStim pops)
self.createNetStims()
# create cells
self.createCells()
# create IClamps (after createCells so can can call sim.net.addStims())
self.createIClamps()
# create connections
self.createConns()
# print('STOP HERE TO AVOID SIMULATING')
# from IPython import embed; embed()
# ------------------------------------------------------------------------------------------------------------
# create simulation config
# ------------------------------------------------------------------------------------------------------------
[docs]
def createSimulationConfig(self):
print("\nCreating simulation configuration from %s" % (self.config['simulation']))
# set conditions required to replicate SONATA imported models
sim.cfg.pt3dRelativeToCellLocation = False # Make cell 3d points relative to the cell x,y,z location
sim.cfg.invertedYCoord = (
False # Make y-axis coordinate negative so they represent depth when visualized (0 at the top)
)
sim.cfg.allowSelfConns = True # allow self connections
# run
sim.cfg.duration = self.simulation_config['run']['tstop']
sim.cfg.dt = self.simulation_config['run']['dt']
sim.cfg.dL = self.simulation_config['run'][
'dL'
] # used to calculate nseg = 1 + 2int(L/(2dL)); for all sections?
sim.net.params.defaultThreshold = self.simulation_config['run']['spike_threshold']
sim.cfg.nsteps_block = self.simulation_config['run']['nsteps_block']
# conditions
sim.cfg.hParams = self.simulation_config['conditions']
# node sets
# import IPython; IPython.embed()
# try:
if 'node_sets_file' in self.simulation_config:
# print(self.substitutes)
# print(self.subs(self.rootFolder + '/' + self.simulation_config['node_sets_file']))
# TEMPORARY FIX - FIX!
sim.cfg.node_sets = load_json(
self.subs(self.rootFolder + '/' + self.simulation_config['node_sets_file']).replace('$BASE_DIR', '')
)
elif 'node_sets' in self.simulation_config:
sim.cfg.node_sets = self.simulation_config['node_sets']
# except:
# print('Could not load node_sets...')
# sim.cfg.node_sets = {}
# inputs - add as 'spkTimes' to external population
# output
sim.cfg.log_file = self.simulation_config['output']['log_file']
sim.cfg.simLabel = os.path.abspath(self.configFile)
sim.saveFolder = self.simulation_config['output']['output_dir']
sim.saveJson = True
# recording
for k, v in self.simulation_config['reports'].items():
try:
sim.cfg.recordTraces[k] = {'sec': v['sections'], 'loc': 0.5, 'var': v['variable_name']}
sim.cfg.analysis.plotTraces = {
'include': sim.cfg.node_sets[v['cells']].values()
} # use 'conds' so works for 'model_type' # UPDATE!
except:
pass
# ------------------------------------------------------------------------------------------------------------
# Create populations
# ------------------------------------------------------------------------------------------------------------
[docs]
def createPops(self):
# Get info from nodes files
for n in self.network_config['networks']['nodes']:
nodes_file = self.subs(n['nodes_file'])
node_types_file = self.subs(n['node_types_file'])
print("\nLoading nodes from %s and %s" % (nodes_file, node_types_file))
h5file = tables.open_file(nodes_file, mode='r')
self.parse_group(h5file.root.nodes)
h5file.close()
self.nodes_info[self.current_node] = load_csv_props(node_types_file)
self.current_node = None
pp.pprint(self.nodes_info)
# Use extracted node/cell info to create populations
for sonata_pop in self.cell_info:
# iterate over cell types -- will make one netpyne population per cell type
for type in self.cell_info[sonata_pop]['type_numbers']:
info = self.nodes_info[sonata_pop][type]
pop_name = info['pop_name'] if 'pop_name' in info else None
ref = info['model_name'] if 'model_name' in info else info['model_type']
model_type = info['model_type']
model_template = info['model_template'] if 'model_template' in info else '- None -'
if pop_name:
pop_id = '%s_%s' % (sonata_pop, pop_name)
else:
pop_id = '%s_%s_%s' % (sonata_pop, ref, type)
self.pop_id_from_type[(sonata_pop, type)] = pop_id
print(" - Adding population: %s which has model info: %s" % (pop_id, info))
size = self.cell_info[sonata_pop]['type_numbers'][type]
# create netpyne pop
# Note: alternatively could create sim.net.params.popParams and then call sim.createPops()
popTags = {}
popTags['cellModel'] = model_type
popTags['cellType'] = info['model_name'] if 'model_name' in info else pop_id
popTags['numCells'] = size
popTags['pop'] = pop_id
popTags['ei'] = info['ei'] if 'ei' in info else ''
sim.net.pops[pop_id] = sim.Pop(pop_id, popTags)
sim.net.params.popParams[pop_id] = popTags
# create population cell template (sections) from morphology and dynamics params files
if model_type == 'biophysical':
sim.net.pops[pop_id].cellModelClass = sim.CompartCell
# morphology
morphology_file = (
self.subs(self.network_config['components']['morphologies_dir'])
+ '/'
+ info['morphology']
+ '.swc'
)
cellMorph = EmptyCell()
swcData = h.Import3d_SWC_read()
swcData.input(morphology_file)
swcSecs = h.Import3d_GUI(swcData, 0)
swcSecs.instantiate(cellMorph)
# replace axon with AIS stub
if self.replaceAxon:
fix_axon_peri(cellMorph)
# extract netpyne parameters
secs, secLists, synMechs, globs = neuronPyHoc.getCellParams(cellMorph)
# remove sec vinits since imported temporary cell with morph
for secName in secs:
del secs[secName]['vinit']
# fix nseg based on dL
if self.setdLNseg:
fix_sec_nseg(secs, sim.cfg.dL)
# invert y coordinates
# if self.swapSomaXY:
# swap_soma_xy(secs)
# make soma mid segment (x,y,z) = (0,0,0)
# somaLabel = next((s for s in secs.keys() if 'soma' in s), None)
# somaPtFirst = secs[somaLabel]['geom']['pt3d'][0]
# somaPtLast = secs[somaLabel]['geom']['pt3d'][-1]
# somaPt = [(p1+p2)/2.0 for p1,p2 in zip(somaPtFirst, somaPtLast)]
# for secLabel in secs:
# for ipt3d in range(len(secs[secLabel]['geom']['pt3d'])):
# origPt = secs[secLabel]['geom']['pt3d'][ipt3d]
# offsetX = 0.0
# if 'apic' in secLabel:
# offsetX = 0.0
# newpt = (origPt[0] - somaPt[0] + offsetX, origPt[1] - somaPt[1], origPt[2] - somaPt[2], origPt[3])
# secs[secLabel]['geom']['pt3d'][ipt3d] = newpt
# create mapping of sec ids
secLists['SONATA_sec_id'] = [sim.conversion.getSecName(sec) for sec in cellMorph.all]
cellRule = {'conds': {'pop': pop_id}, 'secs': secs, 'secLists': secLists, 'globals': globs}
# dynamics params
if info['model_template'].startswith('nml'):
dynamics_params_file = self.subs(
self.network_config['components']['biophysical_neuron_models_dir']
+ '/'
+ info['model_template']
)
dynamics_params_file = dynamics_params_file.replace('nml:', '')
# nml_doc = read_neuroml2_file(dynamics_params_file)
# cell_dynamic_params = nml_doc.cells[0]
cell_dynamic_params = NMLTree(dynamics_params_file)
cellRule = self.setCellRuleDynamicParamsFromNeuroml(cell_dynamic_params, cellRule)
elif info['dynamics_params'].endswith('json'):
dynamics_params_file = self.subs(
self.network_config['components']['biophysical_neuron_models_dir']
+ '/'
+ info['dynamics_params']
)
cell_dynamic_params = load_json(dynamics_params_file)
cellRule = self.setCellRuleDynamicParamsFromJson(cell_dynamic_params, cellRule)
# set extracted cell params in cellParams rule
sim.net.params.cellParams[pop_id] = cellRule
# clean up before next import
del swcSecs, cellMorph
h.initnrn()
# create population of virtual cells (VecStims so can add spike times)
elif model_type == 'virtual':
popTags['cellModel'] = 'VecStim'
sim.net.pops[pop_id].cellModelClass = sim.PointCell
# ------------------------------------------------------------------------------------------------------------
# Create cells
# ------------------------------------------------------------------------------------------------------------
[docs]
def createCells(self):
for sonata_pop in self.cell_info:
# find unique groups in order
lookup = set() # a temporary lookup set
sonata_groups = [
str(x)
for x in self.cell_info[sonata_pop]['node_group_id'].values()
if str(x) not in lookup and lookup.add(str(x)) is None
]
cellLocs = {}
for sonata_group in sonata_groups:
cellLocs[sonata_group] = self.cell_info[sonata_pop][sonata_group]['locations']
cellTypes = self.cell_info[sonata_pop]['types']
numCells = len(self.cell_info[sonata_pop]['types'])
self.cell_info[sonata_pop]['gid_from_id'] = {} # keep track of gid as func of cell id
for icell in _distributeCells(numCells)[sim.rank]:
# set gid
gid = sim.net.lastGid + icell
# get node_group info
node_group_id = str(self.cell_info[sonata_pop]['node_group_id'][icell])
# get info from pop
cellTags = {}
cellType = cellTypes[icell]
pop_id = self.pop_id_from_type[(sonata_pop, cellType)]
pop = sim.net.pops[pop_id]
pop.cellGids.append(gid) # add gid list of cells belonging to this population - not needed?
self.cell_info[sonata_pop]['gid_from_id'][icell] = gid
model_type = pop.tags['cellModel']
# set cell tags
cellTags = {
k: v for (k, v) in pop.tags.items() if k in sim.net.params.popTagsCopiedToCells
} # copy all pop tags to cell tags, except those that are pop-specific
cellTags['pop'] = pop.tags['pop']
if model_type == 'biophysical':
cellTags['x'] = cellLocs[node_group_id][icell]['x'] # set x location (um)
cellTags['y'] = (
-1 * cellLocs[node_group_id][icell]['y']
) # set y location (um) (reversed since netpyne assumes depth)
cellTags['z'] = cellLocs[node_group_id][icell]['z'] # set z location (um)
cellTags['xnorm'] = cellTags['x'] / sim.net.params.sizeX # set x location (um)
cellTags['ynorm'] = cellTags['y'] / sim.net.params.sizeY # set y location (um)
cellTags['znorm'] = cellTags['z'] / sim.net.params.sizeZ # set z location (um)
if 'rotation_angle_yaxis' in cellLocs[node_group_id][icell]:
cellTags['rot_y'] = cellLocs[node_group_id][icell][
'rotation_angle_yaxis'
] # set y-axis rotation (implementation MISSING!)
if 'rotation_angle_zaxis' in cellLocs[node_group_id][icell]:
cellTags['rot_z'] = cellLocs[node_group_id][icell][
'rotation_angle_zaxis'
] # set z-axis rotation
# sim.net.cells[-1].randrandRotationAngle = cellTags['rot_z'] # rotate cell in z-axis (y-axis rot missing) MISSING!
elif model_type in ['virtual', 'VecStim', 'NetStim']:
if 'spkTimes' in pop.tags: # if VecStim, copy spike times to params
cellTags['params'] = {}
if isinstance(pop.tags['spkTimes'][0], list):
try:
cellTags['params']['spkTimes'] = pop.tags['spkTimes'][icell] # 2D list
except:
pass
else:
cellTags['params']['spkTimes'] = pop.tags['spkTimes'] # 1D list (same for all)
sim.net.cells.append(pop.cellModelClass(gid, cellTags)) # instantiate Cell object
print(('Cell %d/%d (gid=%d) of pop %s, on node %d, ' % (icell, numCells, gid, pop_id, sim.rank)))
sim.net.lastGid = sim.net.lastGid + numCells
# ------------------------------------------------------------------------------------------------------------
# Create connections
# ------------------------------------------------------------------------------------------------------------
[docs]
def createConns(self):
"""
SONATA method - works but same results as NeuroMLlite
"""
'''
from sonata.io import File, Edge
data = File(data_files=[self.subs('$NETWORK_DIR/excvirt_cortex_edges.h5')],
data_type_files=[self.subs('$NETWORK_DIR/excvirt_cortex_edge_types.csv')])
'''
# NeuroMLlite Method
self.edges_info = {}
self.conn_info = {}
synMechSubs = {'level_of_detail': 'mod', 'erev': 'e'}
if 'edges' in self.network_config['networks']:
for e in self.network_config['networks']['edges']:
edges_file = self.subs(e['edges_file'])
edge_types_file = self.subs(e['edge_types_file'])
print("\nLoading edges from %s and %s" % (edges_file, edge_types_file))
h5file = tables.open_file(edges_file, mode='r')
print("Opened HDF5 file: %s" % (h5file.filename))
self.parse_group(h5file.root.edges)
h5file.close()
self.edges_info[self.current_edge] = load_csv_props(edge_types_file)
self.current_edge = None
for conn in self.conn_info:
pre_node = self.conn_info[conn]['pre_node']
post_node = self.conn_info[conn]['post_node']
print(' Adding projection %s: %s -> %s ' % (conn, pre_node, post_node))
# add all synMechs in this projection to netParams.synMechParams
for type in self.edges_info[conn]:
syn_label = self.edges_info[conn][type]['dynamics_params'].split('.')[0]
if syn_label not in sim.net.params.synMechParams:
dynamics_params_file = (
self.subs(self.network_config['components']['synaptic_models_dir'])
+ '/'
+ self.edges_info[conn][type]['dynamics_params']
)
syn_dyn_params = load_json(dynamics_params_file)
synMechParams = dict(syn_dyn_params)
for k in synMechParams: # replace keys
if k in synMechSubs:
synMechParams[synMechSubs[k]] = synMechParams.pop(k)
synMechParams['mod'] = self.edges_info[conn][type]['model_template']
sim.net.params.synMechParams[syn_label] = synMechParams
print(' Added synMech %s ' % (syn_label))
# add individual connections in this projection
for i in range(len(self.conn_info[conn]['pre_id'])):
pre_id = self.conn_info[conn]['pre_id'][i]
post_id = self.conn_info[conn]['post_id'][i]
pre_gid = self.cell_info[pre_node]['gid_from_id'][pre_id]
post_gid = self.cell_info[post_node]['gid_from_id'][post_id]
if post_gid in sim.net.gid2lid:
type = self.conn_info[conn]['edge_type_id'][i]
print(
' Conn: type %s pop %s (id %s) -> pop %s (id %s) MAPPED TO: cell gid %s -> cell gid %s'
% (type, pre_node, pre_id, post_node, post_id, pre_gid, post_gid)
)
# print(self.edges_info[conn][type])
connParams = {}
postCell = sim.net.cells[sim.net.gid2lid[post_gid]]
# preGid
connParams['preGid'] = pre_gid
# synMech
connParams['synMech'] = self.edges_info[conn][type]['dynamics_params'].split('.')[0]
# weight
sign = syn_dyn_params['sign'] if 'sign' in syn_dyn_params else 1
try:
weight = self.conn_info[conn]['syn_weight'][i]
except:
weight = (
self.edges_info[conn][type]['syn_weight']
if 'syn_weight' in self.edges_info[conn][type]
else 1.0
)
connParams['weight'] = sign * weight
# delay
connParams['delay'] = (
self.edges_info[conn][type]['delay'] if 'delay' in self.edges_info[conn][type] else 0
)
# sec
sec_id = self.conn_info[conn]['sec_id'][i]
connParams['sec'] = postCell.secLists['SONATA_sec_id'][sec_id]
# loc
connParams['loc'] = self.conn_info[conn]['sec_x'][i]
# add connection
postCell.addConn(connParams)
# from IPython import embed; embed()
# ------------------------------------------------------------------------------------------------------------
# Create NetStims
# ------------------------------------------------------------------------------------------------------------
[docs]
def createNetStims(self):
for input in self.simulation_config['inputs']:
# get input info from sim config
info = self.simulation_config['inputs'][input]
if info['input_type'] == 'spikes':
print(" - Adding input: %s which has info: %s" % (input, info))
node_set = info['node_set']
# get cell type and pop_id
cellType = self.cell_info[node_set]['types'][0]
pop_id = self.pop_id_from_type[(node_set, cellType)]
# get stpikes
from pyneuroml.plot.PlotSpikes import read_sonata_spikes_hdf5_file
from pyneuroml.plot.PlotSpikes import POP_NAME_SPIKEFILE_WITH_GIDS
ids_times = read_sonata_spikes_hdf5_file(self.subs(info['input_file']))[POP_NAME_SPIKEFILE_WITH_GIDS]
spkTimes = [[spk for spk in spks] for k, spks in ids_times.items()]
# add spikes to vecstim pop
sim.net.pops[pop_id].tags['spkTimes'] = spkTimes
# ------------------------------------------------------------------------------------------------------------
# Create IClamps
# ------------------------------------------------------------------------------------------------------------
[docs]
def createIClamps(self):
for input in self.simulation_config['inputs']:
# get input info from sim config
info = self.simulation_config['inputs'][input]
if info['input_type'] == 'current_clamp':
print(" - Adding input: %s which has info: %s" % (input, info))
node_set = info['node_set']
sim.net.params.stimSourceParams[input] = {
'type': info['module'],
'delay': info['delay'],
'dur': info['duration'],
'amp': info['amp'],
}
sec = info.get(
'sec', 'soma_0'
) # fix this - default name for soma section? how does SONATA know where to stim?
loc = info.get('loc', 0.5)
conds_sonata = sim.cfg.node_sets[node_set]
if 'model_type' in conds_sonata:
conds = {'cellModel': conds_sonata['model_type']}
sim.net.params.stimTargetParams[input + '->' + node_set] = {
'source': input,
'conds': conds,
'sec': sec,
'loc': loc,
}
sim.net.addStims()
# ------------------------------------------------------------------------------------------------------------
# Set cell dynamic params into a cell rule (netParams.cellParams) from NeuroML
# ------------------------------------------------------------------------------------------------------------
[docs]
def setCellRuleDynamicParamsFromNeuroml(self, nml_params, cellRule):
# Iterate through the NML tree by section and use the properties to manually create cell mechanisms
section_lists = [(sec, sec.split('_')[0][:4]) for sec in cellRule['secs']]
for sec, sec_type in section_lists:
for prop_name, prop_obj in nml_params[sec_type].items():
if prop_obj.element_tag() == 'resistivity':
cellRule['secs'][sec]['geom']['Ra'] = prop_obj.value
elif prop_obj.element_tag() == 'specificCapacitance':
cellRule['secs'][sec]['geom']['cm'] = prop_obj.value
elif prop_obj.element_tag() == 'channelDensity' and prop_obj.ion_channel == 'pas':
cellRule['secs'][sec]['mechs']['pas'] = {'g': prop_obj.cond_density, 'e': prop_obj.erev}
elif prop_obj.element_tag() == 'channelDensity' or prop_obj.element_tag() == 'channelDensityNernst':
cellRule['secs'][sec]['mechs'][prop_obj.ion_channel] = {
prop_obj.id.split('_')[0]: prop_obj.cond_density
}
if 'ions' not in cellRule['secs'][sec]:
cellRule['secs'][sec]['ions'] = {}
if prop_obj.ion == 'na' and prop_obj:
cellRule['secs'][sec]['ions']['na'] = {'e': prop_obj.erev}
# sec.ena = prop_obj.erev
elif prop_obj.ion == 'k':
cellRule['secs'][sec]['ions']['k'] = {'e': prop_obj.erev}
# sec.ek = prop_obj.erev
elif prop_obj.element_tag() == 'concentrationModel':
cellRule['secs'][sec]['mechs'][prop_obj.id] = {'gamma': prop_obj.gamma, 'decay': prop_obj.decay}
# sec.insert(prop_obj.id)
# setattr(sec, 'gamma_' + prop_obj.type, prop_obj.gamma)
# setattr(sec, 'decay_' + prop_obj.type, prop_obj.decay)
return cellRule
# ------------------------------------------------------------------------------------------------------------
# Set cell dynamic params into a cell rule (netParams.cellParams) from NeuroML
# ------------------------------------------------------------------------------------------------------------
[docs]
def setCellRuleDynamicParamsFromNeuroml_old(self, cell, cellRule):
segGroupKeys = set([sec.split('_')[0] for sec in cellRule['secs']])
seg_grps_vs_nrn_sections = {
segGroup: [sec for sec in cellRule['secs'] if sec.startswith(segGroup)] for segGroup in segGroupKeys
}
seg_grps_vs_nrn_sections['all'] = list(cellRule['secs'])
inhomogeneous_parameters = {segGroup: [] for segGroup in segGroupKeys} # how to fill in this from swc file?
for cm in cell.biophysical_properties.membrane_properties.channel_densities:
group = 'all' if not cm.segment_groups else cm.segment_groups
for section_name in seg_grps_vs_nrn_sections[group]:
gmax = pynml.convert_to_units(cm.cond_density, 'S_per_cm2')
if cm.ion_channel == 'pas':
mech = {'g': gmax}
else:
mech = {'gbar': gmax}
erev = pynml.convert_to_units(cm.erev, 'mV')
cellRule['secs'][section_name]['mechs'][cm.ion_channel] = mech
ion = self._determine_ion(cm)
if ion == 'non_specific':
mech['e'] = erev
else:
if 'ions' not in cellRule['secs'][section_name]:
cellRule['secs'][section_name]['ions'] = {}
if ion not in cellRule['secs'][section_name]['ions']:
cellRule['secs'][section_name]['ions'][ion] = {}
cellRule['secs'][section_name]['ions'][ion]['e'] = erev
for cm in cell.biophysical_properties.membrane_properties.channel_density_v_shifts:
group = 'all' if not cm.segment_groups else cm.segment_groups
for section_name in seg_grps_vs_nrn_sections[group]:
gmax = pynml.convert_to_units(cm.cond_density, 'S_per_cm2')
if cm.ion_channel == 'pas':
mech = {'g': gmax}
else:
mech = {'gbar': gmax}
erev = pynml.convert_to_units(cm.erev, 'mV')
cellRule['secs'][section_name]['mechs'][cm.ion_channel] = mech
ion = self._determine_ion(cm)
if ion == 'non_specific':
mech['e'] = erev
else:
if 'ions' not in cellRule['secs'][section_name]:
cellRule['secs'][section_name]['ions'] = {}
if ion not in cellRule['secs'][section_name]['ions']:
cellRule['secs'][section_name]['ions'][ion] = {}
cellRule['secs'][section_name]['ions'][ion]['e'] = erev
mech['vShift'] = pynml.convert_to_units(cm.v_shift, 'mV')
for cm in cell.biophysical_properties.membrane_properties.channel_density_nernsts:
group = 'all' if not cm.segment_groups else cm.segment_groups
for section_name in seg_grps_vs_nrn_sections[group]:
gmax = pynml.convert_to_units(cm.cond_density, 'S_per_cm2')
if cm.ion_channel == 'pas':
mech = {'g': gmax}
else:
mech = {'gbar': gmax}
cellRule['secs'][section_name]['mechs'][cm.ion_channel] = mech
# TODO: erev!!
ion = self._determine_ion(cm)
if ion == 'non_specific':
pass
##mech['e'] = erev
else:
if 'ions' not in cellRule['secs'][section_name]:
cellRule['secs'][section_name]['ions'] = {}
if ion not in cellRule['secs'][section_name]['ions']:
cellRule['secs'][section_name]['ions'][ion] = {}
##cellRule['secs'][section_name]['ions'][ion]['e'] = erev
for cm in cell.biophysical_properties.membrane_properties.channel_density_ghk2s:
group = 'all' if not cm.segment_groups else cm.segment_groups
for section_name in seg_grps_vs_nrn_sections[group]:
gmax = pynml.convert_to_units(cm.cond_density, 'S_per_cm2')
if cm.ion_channel == 'pas':
mech = {'g': gmax}
else:
mech = {'gbar': gmax}
##erev = pynml.convert_to_units(cm.erev,'mV')
cellRule['secs'][section_name]['mechs'][cm.ion_channel] = mech
ion = self._determine_ion(cm)
if ion == 'non_specific':
pass
# mech['e'] = erev
else:
if 'ions' not in cellRule['secs'][section_name]:
cellRule['secs'][section_name]['ions'] = {}
if ion not in cellRule['secs'][section_name]['ions']:
cellRule['secs'][section_name]['ions'][ion] = {}
##cellRule['secs'][section_name]['ions'][ion]['e'] = erev
for cm in cell.biophysical_properties.membrane_properties.channel_density_non_uniforms:
for vp in cm.variable_parameters:
if vp.parameter == "condDensity":
iv = vp.inhomogeneous_value
grp = vp.segment_groups
path_vals = inhomogeneous_parameters[grp]
expr = iv.value.replace('exp(', 'math.exp(')
# print("variable_parameter: %s, %s, %s"%(grp,iv, expr))
for section_name in seg_grps_vs_nrn_sections[grp]:
path_start, path_end = inhomogeneous_parameters[grp][section_name]
p = path_start
gmax_start = pynml.convert_to_units('%s S_per_m2' % eval(expr), 'S_per_cm2')
p = path_end
gmax_end = pynml.convert_to_units('%s S_per_m2' % eval(expr), 'S_per_cm2')
nseg = (
cellRule['secs'][section_name]['geom']['nseg']
if 'nseg' in cellRule['secs'][section_name]['geom']
else 1
)
# print(" Cond dens %s: %s S_per_cm2 (%s um) -> %s S_per_cm2 (%s um); nseg = %s"%(section_name,gmax_start,path_start,gmax_end,path_end, nseg))
gmax = []
for fract in [(2 * i + 1.0) / (2 * nseg) for i in range(nseg)]:
p = path_start + fract * (path_end - path_start)
gmax_i = pynml.convert_to_units('%s S_per_m2' % eval(expr), 'S_per_cm2')
# print(" Point %s at %s = %s"%(p,fract, gmax_i))
gmax.append(gmax_i)
if cm.ion_channel == 'pas':
mech = {'g': gmax}
else:
mech = {'gbar': gmax}
erev = pynml.convert_to_units(cm.erev, 'mV')
cellRule['secs'][section_name]['mechs'][cm.ion_channel] = mech
ion = self._determine_ion(cm)
if ion == 'non_specific':
mech['e'] = erev
else:
if 'ions' not in cellRule['secs'][section_name]:
cellRule['secs'][section_name]['ions'] = {}
if ion not in cellRule['secs'][section_name]['ions']:
cellRule['secs'][section_name]['ions'][ion] = {}
cellRule['secs'][section_name]['ions'][ion]['e'] = erev
for cm in cell.biophysical_properties.membrane_properties.channel_density_ghks:
raise Exception("<channelDensityGHK> not yet supported!")
for cm in cell.biophysical_properties.membrane_properties.channel_density_non_uniform_nernsts:
raise Exception("<channelDensityNonUniformNernst> not yet supported!")
for cm in cell.biophysical_properties.membrane_properties.channel_density_non_uniform_ghks:
raise Exception("<channelDensityNonUniformGHK> not yet supported!")
for vi in cell.biophysical_properties.membrane_properties.init_memb_potentials:
group = 'all' if not vi.segment_groups else vi.segment_groups
for section_name in seg_grps_vs_nrn_sections[group]:
cellRule['secs'][section_name]['vinit'] = pynml.convert_to_units(vi.value, 'mV')
# remove default vinit if vi empty so the global h.v_init is used
if len(cell.biophysical_properties.membrane_properties.init_memb_potentials) == 0:
group = 'all'
for section_name in seg_grps_vs_nrn_sections[group]:
del cellRule['secs'][section_name]['vinit']
for sc in cell.biophysical_properties.membrane_properties.specific_capacitances:
group = 'all' if not sc.segment_groups else sc.segment_groups
for section_name in seg_grps_vs_nrn_sections[group]:
cellRule['secs'][section_name]['geom']['cm'] = pynml.convert_to_units(sc.value, 'uF_per_cm2')
if hasattr(cell.biophysical_properties.intracellular_properties, 'resistivities'):
for ra in cell.biophysical_properties.intracellular_properties.resistivities:
group = 'all' if not ra.segment_groups else ra.segment_groups
for section_name in seg_grps_vs_nrn_sections[group]:
cellRule['secs'][section_name]['geom']['Ra'] = pynml.convert_to_units(ra.value, 'ohm_cm')
concentrationModelParams = {}
excludeConcentrationModel = ['id', 'type', 'ion']
if hasattr(cell, 'concentrationModel'):
concentrationModelParams[cell.concentratrionModel.id] = {}
for param in cell.concentratrionModel:
if param not in excludeConcentrationModel:
concentrationModelParams[cell.concentratrionModel.id][param] = getattr(
cell.concentratrionModel, param
)
if hasattr(cell.biophysical_properties.intracellular_properties, 'species'):
for specie in cell.biophysical_properties.intracellular_properties.species:
group = 'all' if not specie.segment_groups else specie.segment_groups
for section_name in seg_grps_vs_nrn_sections[group]:
cellRule['secs'][section_name]['ions'][specie.ion]['o'] = pynml.convert_to_units(
specie.initial_ext_concentration, 'mM'
)
cellRule['secs'][section_name]['ions'][specie.ion]['i'] = pynml.convert_to_units(
specie.initial_concentration, 'mM'
)
# cellRule['secs'][section_name]['mechs'][cell.concentratrionModel] = concentrationModelParams
# print(cell.concentratrionModel)
print(concentrationModelParams)
return cellRule
def _determine_ion(self, channel_density):
ion = channel_density.ion
if not ion:
if 'na' in channel_density.ion_channel.lower():
ion = 'na'
elif 'k' in channel_density.ion_channel.lower():
ion = 'k'
elif 'ca' in channel_density.ion_channel.lower():
ion = 'ca'
else:
ion = 'non_specific'
return ion
# ------------------------------------------------------------------------------------------------------------
# Set cell dynamic params into a cell rule (netParams.cellParams) from Json
# ------------------------------------------------------------------------------------------------------------
[docs]
def setCellRuleDynamicParamsFromJson(self, cell_dynamic_params, cellRule):
passive = cell_dynamic_params['passive'][0]
conditions = cell_dynamic_params['conditions'][0]
genome = cell_dynamic_params['genome']
# Set passive properties
cm_dict = dict([(c['section'], c['cm']) for c in passive['cm']])
for secName, sec in cellRule['secs'].items():
sec['geom']['Ra'] = passive['ra']
sec['geom']['Ra'] = cm_dict[secName.split('_')[0]]
sec['mechs'] = {'pas': {'e': passive["e_pas"]}}
# Insert channels and set parameters
for p in genome:
sections = [s for s in cellRule['secs'] if s.split('_')[0] == p["section"]]
for sec in sections:
if p["mechanism"] != "":
cellRule['secs'][sec]['mechs'][p['mechanism']] = {p['name'].split('_')[0]: p['value']}
# Set reversal potentials
for erev in conditions['erev']:
sections = [s for s in cellRule['secs'] if s.split('_')[0] == erev["section"]]
for sec in sections:
for eion in erev:
if eion.startswith('e'):
if 'ions' not in cellRule['secs'][sec]:
print(sec, eion)
cellRule['secs'][sec]['ions'] = {}
cellRule['secs'][sec]['ions'][eion[1:]] = {'e': erev[eion]}
if 'v_init' in conditions:
for sec in cellRule['secs'].values():
sec['vinit'] = conditions['v_init']
return cellRule
# ------------------------------------------------------------------------------------------------------------
# Parse SONATA hdf5
# ------------------------------------------------------------------------------------------------------------
[docs]
def parse_group(self, g):
print("+++++++++++++++Parsing group: " + str(g) + ", name: " + g._v_name)
for node in g:
print(
" ------Sub node: %s, class: %s, name: %s (parent: %s)"
% (node, node._c_classid, node._v_name, g._v_name)
)
if node._c_classid == 'GROUP':
if g._v_name == 'nodes':
node_id = node._v_name.replace('-', '_')
self.current_node = node_id
print('# CURRENT NODE: %s' % (self.current_node))
self.cell_info[self.current_node] = {}
self.cell_info[self.current_node]['types'] = {}
self.cell_info[self.current_node]['type_numbers'] = {}
self.cell_info[self.current_node]['node_id'] = {}
self.cell_info[self.current_node]['node_group_id'] = {}
self.cell_info[self.current_node]['node_group_index'] = {}
# self.pop_locations[self.current_population] = {}
if g._v_name == self.current_node:
node_group = node._v_name
self.current_node_group = node_group
print('# CURRENT NODE GROUP: %s' % (self.current_node))
self.cell_info[self.current_node][self.current_node_group] = {}
self.cell_info[self.current_node][self.current_node_group]['locations'] = {}
if g._v_name == 'edges':
edge_id = node._v_name.replace('-', '_')
print(' Found edge: %s' % edge_id)
self.current_edge = edge_id
self.conn_info[self.current_edge] = {}
if g._v_name == self.current_edge:
self.current_pre_node = g._v_name.split('_to_')[0]
self.current_post_node = g._v_name.split('_to_')[1]
print(' Found edge %s -> %s' % (self.current_pre_node, self.current_post_node))
self.conn_info[self.current_edge]['pre_node'] = self.current_pre_node
self.conn_info[self.current_edge]['post_node'] = self.current_post_node
self.parse_group(node)
if self._is_dataset(node):
self.parse_dataset(node)
self.current_population = None
self.current_node_group = None
self.current_edge_group = None ## added to support multiple edge groups
def _is_dataset(self, node):
return node._c_classid == 'ARRAY' or node._c_classid == 'CARRAY'
[docs]
def parse_dataset(self, d):
print(
"Parsing dataset/array: %s; at node: %s, node_group %s"
% (str(d), self.current_node, self.current_node_group)
)
if self.current_node_group:
for i in range(0, d.shape[0]):
# index = 0 if d.name=='x' else (1 if d.name=='y' else 2)
if not i in self.cell_info[self.current_node][self.current_node_group]['locations']:
self.cell_info[self.current_node][self.current_node_group]['locations'][i] = {}
self.cell_info[self.current_node][self.current_node_group]['locations'][i][d.name] = d[i]
elif self.current_node:
if d.name == 'node_id':
for i in range(0, d.shape[0]):
self.cell_info[self.current_node]['node_id'][i] = d[i]
if d.name == 'node_group_id':
for i in range(0, d.shape[0]):
self.cell_info[self.current_node]['node_group_id'][i] = d[i]
if d.name == 'node_group_index':
for i in range(0, d.shape[0]):
self.cell_info[self.current_node]['node_group_index'][i] = d[i]
if d.name == 'node_type_id':
for i in range(0, d.shape[0]):
self.cell_info[self.current_node]['types'][i] = d[i]
if not d[i] in self.cell_info[self.current_node]['type_numbers']:
self.cell_info[self.current_node]['type_numbers'][d[i]] = 0
self.cell_info[self.current_node]['type_numbers'][d[i]] += 1
# TODO: adde here: 'elif self.current_edge_group:' -- to support multiple edge group
elif d.name == 'source_node_id':
self.conn_info[self.current_edge]['pre_id'] = [i for i in d]
elif d.name == 'edge_type_id':
self.conn_info[self.current_edge]['edge_type_id'] = [int(i) for i in d]
elif d.name == 'target_node_id':
self.conn_info[self.current_edge]['post_id'] = [i for i in d]
elif d.name == 'sec_id':
self.conn_info[self.current_edge]['sec_id'] = [i for i in d]
elif d.name == 'sec_x':
self.conn_info[self.current_edge]['sec_x'] = [i for i in d]
elif d.name == 'syn_weight':
self.conn_info[self.current_edge]['syn_weight'] = [i for i in d]
else:
print("Unhandled dataset: %s" % d.name)
# ------------------------------------------------------------------------------------------------------------
# Read simulation output from HDF5
# ------------------------------------------------------------------------------------------------------------
[docs]
def subs(self, path):
"""
Search the strings in a config file for a substitutable value, e.g.
"morphologies_dir": "$COMPONENT_DIR/morphologies",
"""
# print_v('Checking for %s in %s'%(substitutes.keys(),path))
for s in sorted(self.substitutes, key=lambda k: len(k), reverse=True):
if path.startswith(s):
path = path.replace(s, self.substitutes[s])
return path