def check_input_is_list(obj, var=None, list_element=0):
    """Return a per-run value from an attribute that is either a scalar or a per-file list.

    Parameters
    ----------
    obj: object
        instance holding the attribute `var` and a file attribute (`tsv_file` if present, otherwise `func_file`)
    var: str
        name of the attribute to read from `obj`
    list_element: int, optional
        index of the element to return if the attribute is a list/array, default is 0

    Returns
    ----------
    object
        `attr[list_element]` if the attribute is a list or numpy array, otherwise the attribute itself

    Raises
    ----------
    ValueError
        if `obj` has no attribute `var`, or if the attribute is a list/array whose length differs from the number of files
    """
    if not hasattr(obj, var):
        raise ValueError(f"Class does not have '{var}'-attribute")
    attr = getattr(obj, var)

    # scalars apply to every file; nothing to validate
    if not isinstance(attr, (list, np.ndarray)):
        return attr

    # the number of files is read from 'tsv_file' when available, else 'func_file'
    obj_attr = "tsv_file" if hasattr(obj, "tsv_file") else "func_file"
    reference = getattr(obj, obj_attr)

    # a single path given as a string counts as one file, not len(path) files
    n_files = 1 if isinstance(reference, str) else len(reference)
    if len(attr) != n_files:
        raise ValueError(f"Length of '{var}' ({len(attr)}) does not match number of func files ({n_files}). Either specify a list of equal lenghts or 1 integer value for all volumes")

    return attr[list_element]
class ParseExpToolsFile(ParseEyetrackerFile):
"""ParseExpToolsFile()
Class for parsing tsv-files created during experiments with Exptools2. The class will read in the file, read when the experiment actually started, correct onset times for this start time and time deleted because of removing the first few volumes (to do this correctly, set the `TR` and `deleted_first_timepoints`). You can also provide a numpy array/file containing eye blinks that should be added to the onset times in real-world time (seconds). In principle, it will return a pandas DataFrame indexed by subject and run that can be easily concatenated over runs. This function relies on the naming used when programming the experiment. In the `session.py` file, you should have created `phase_names=['iti', 'stim']`; the class will use these things to parse the file.
Parameters
----------
tsv_file: str, list
path pointing to the output file of the experiment
subject: int
subject number in the returned pandas DataFrame (should start with 1, ..., n)
run: int
run number you'd like to have the onset times for
button: bool
boolean whether to include onset times of button responses (default is false). ['space'] will be ignored as response
blinks: str, np.ndarray
string or array containing the onset times of eye blinks as extracted with hedfpy
TR: float
repetition time to correct onset times for deleted volumes
deleted_first_timepoints: int
number of volumes to delete to correct onset times for deleted volumes. Can be specified for each individual run if `tsv_file` is a list
use_bids: bool, optional
If true, we'll read BIDS-components such as 'sub', 'run', 'task', etc from the input file and use those as indexers, rather than sequential 1,2,3.
funcs: str, list, optional
List of functional files that is passed down to :class:`linescanning.dataset.ParseEyetrackerFile`. Required for correct resampling to functional space
edfs: str, list, optional
List of eyetracking output files that is passed down to :class:`linescanning.dataset.ParseEyetrackerFile`.
verbose: bool, optional
Print details to the terminal, default is False
phase_onset: int, optional
Which phase of exptools-trial should be considered the actual stimulus trial. Usually, `phase_onset=0` means the interstimulus interval. Therefore, default = 1
stim_duration: str, int, optional
If desired, add stimulus duration to onset dataframe. Can be one of 'None', 'stim' (to use duration from exptools' log file) or any given integer
add_events: str, list, optional
Add additional events to onset dataframe. Must be an existing column in the exptools log file. For instance, `responses` and `event_type = stim` are read in by default, but if we have a separate column containing the onset of some target (e.g., 'target_onset'), we can add these times to the dataframe with `add_events='target_onset'`.
event_names: str, list, optional
Custom names for manually added events through `add_events` if the column names are not the names you want to use in the dataframe. E.g., if I find `target_onset` too long of a name, I can specify `event_names='target'`. If `add_events` is a list, then `event_names` must be a list of equal length if custom names are desired. By default we'll take the names from `add_events`
RTs: bool, optional
If we have a design that required some response to a stimulus, we can request the reaction times. Default = False
RT_relative_to: str, optional
If `RTs=True`, we need to know relative to what time the button response should be offset. Only correct responses are considered, as there's a conditional statement that requires the presence of the reference time (e.g., `target_onset`) and button response. If there's a response but no reference time, the reaction time cannot be calculated. If you do not have a separate reference time column, you can specify `RT_relative_to='start'` to calculate the reaction time relative to onset time. If `RT_relative_to != 'start'`, I'll assume you had a target in your experiment in X/n_trials. From this, we can calculate the accuracy and save that to `self.df_accuracy`, while reaction times are saved in `self.df_rts`
Examples
----------
>>> from linescanning.utils import ParseExpToolsFile
>>> file = 'some/path/to/exptoolsfile.tsv'
>>> parsed_file = ParseExpToolsFile(file, subject=1, run=1, button=True)
>>> onsets = parsed_file.get_onset_df()
>>> # If you want to get all your subjects and runs in 1 nideconv compatible dataframe, you can do something like this:
>>> onsets = []
>>> run_subjects = ['001','002','003']
>>> for sub in run_subjects:
>>> path_tsv_files = os.path.join(f'some/path/sub-{sub}')
>>> f = os.listdir(path_tsv_files)
>>> nr_runs = []; [nr_runs.append(os.path.join(path_tsv_files, r)) for r in f if "events.tsv" in r]
>>>
>>> for run in range(1,len(nr_runs)+1):
>>> sub_idx = run_subjects.index(sub)+1
>>> onsets.append(ParseExpToolsFile(nr_runs[run-1], subject=sub_idx, run=run).get_onset_df())
>>>
>>> onsets = pd.concat(onsets).set_index(['subject', 'run', 'event_type'])
"""
def __init__(
    self,
    tsv_file,
    subject=1,
    run=1,
    button=False,
    blinks=None,
    RTs=False,
    RT_relative_to=None,
    TR=0.105,
    deleted_first_timepoints=0,
    edfs=None,
    funcs=None,
    use_bids=True,
    verbose=False,
    phase_onset=1,
    stim_duration=None,
    add_events=None,
    event_names=None,
    **kwargs):
    """Store the settings and parse every file in `tsv_file`.

    After construction `self.df_onsets` holds the concatenated onsets indexed by
    ['subject', 'run', 'event_type']. `self.df_rts` and `self.df_accuracy` (indexed by
    ['subject', 'run']) are only set if at least one run produced them. Any extra keyword
    arguments are stored as attributes on the instance.
    """

    self.tsv_file = tsv_file
    self.sub = subject
    self.run = run
    self.TR = TR
    self.deleted_first_timepoints = deleted_first_timepoints
    self.button = button
    self.blinks = blinks
    self.funcs = funcs
    self.edfs = edfs
    self.use_bids = use_bids
    self.verbose = verbose
    self.phase_onset = phase_onset
    self.stim_duration = stim_duration
    self.RTs = RTs
    self.RT_relative_to = RT_relative_to
    self.add_events = add_events
    self.event_names = event_names
    self.__dict__.update(kwargs)

    # identity check: 'edfs != None' is ambiguous if an array-like is passed
    if self.edfs is not None:
        super().__init__(
            self.edfs,
            subject=self.sub,
            func_file=self.funcs,
            TR1=self.TR,
            use_bids=self.use_bids,
            verbose=self.verbose)
    else:
        # without eyetracker files there are no blinks to fetch per run
        self.include_blinks = False

    if self.verbose:
        print("\nEXPTOOLS")

    if isinstance(self.tsv_file, str):
        self.tsv_file = [self.tsv_file]

    if isinstance(self.tsv_file, list):
        df_onsets = []
        df_rts = []
        df_accuracy = []
        for run, onset_file in enumerate(self.tsv_file):

            if self.use_bids:
                # overwrite self.sub/self.run with the BIDS components of the file name
                bids_comps = utils.split_bids_components(onset_file)
                for el in ['sub', 'run']:
                    setattr(self, el, bids_comps[el])
            else:
                self.run = run+1

            # include eyeblinks?
            if self.include_blinks:
                self.blinks = self.fetch_blinks_run(run=self.run)

            # check if we got different nr of vols to delete per run
            delete_vols = check_input_is_list(self, "deleted_first_timepoints", list_element=run)

            # check if we got different stimulus durations per run
            duration = check_input_is_list(self, var="stim_duration", list_element=run)

            # read in the exptools-file
            self.preprocess_exptools_file(
                onset_file,
                run=self.run,
                delete_vols=delete_vols,
                phase_onset=self.phase_onset,
                duration=duration)

            # append to df
            df_onsets.append(self.get_onset_df(index=False))

            # RTs are optional: the attribute only exists if they were requested
            try:
                df_rts.append(self.get_rts_df(index=False))
            except AttributeError:
                pass

            # accuracy is optional too (only if RT_relative_to != 'start')
            try:
                df_accuracy.append(self.get_accuracy(index=False))
            except AttributeError:
                pass

        # concatenate df
        self.df_onsets = pd.concat(df_onsets).set_index(['subject', 'run', 'event_type'])

        # rts
        if df_rts:
            self.df_rts = pd.concat(df_rts).set_index(['subject', 'run'])

        # accuracy
        if df_accuracy:
            self.df_accuracy = pd.concat(df_accuracy).set_index(['subject', 'run'])

        # get events per run; NOTE: this replaces the bound method with its result (a dict),
        # which is what `events_single_run` indexes into
        self.events_per_run = self.events_per_run()
def events_per_run(self):
    """Map every run in `self.df_onsets` to the unique event types occurring in that run."""
    run_ids = np.unique(self.df_onsets.reset_index()['run'].values)
    return {
        run_id: np.unique(
            utils.select_from_df(
                self.df_onsets,
                expression=f"run = {run_id}",
                index=None)['event_type'].values)
        for run_id in run_ids
    }
def events_single_run(self, run=1):
    """Look up the event types of one run in the `events_per_run` mapping."""
    per_run = self.events_per_run
    return per_run[run]
def preprocess_exptools_file(self, tsv_file, run=1, delete_vols=0, phase_onset=1, duration=None):
    """Parse one exptools log file into onset (and optionally RT/accuracy) dataframes.

    Parameters
    ----------
    tsv_file: str
        path to the tab-separated log file
    run: int, optional
        run identifier written to the 'run' column of the output dataframes
    delete_vols: int, optional
        number of deleted volumes; `delete_vols*self.TR` is subtracted from all onsets
    phase_onset: int, optional
        value of the 'phase' column that marks the stimulus events to keep
    duration: int, float, str, optional
        None = no duration column; a number = that duration for every stimulus; anything
        else = read the 'duration' column from the log file

    Sets (among others) `self.data`, `self.start_time`, `self.trimmed`, `self.onset`,
    `self.onset_df`, and, if `self.RTs` is True, `self.rts`, `self.rts_df` and (if targets
    were found) `self.accuracy_df`.

    Raises
    ----------
    ValueError
        if the number of `event_names` differs from `add_events`, if `self.blinks` is a
        string that is not a '.npy' file, or if RTs are requested without `RT_relative_to`
    TypeError
        if durations are requested in combination with `add_events`
    """

    if self.verbose:
        print(f"Preprocessing {tsv_file}")

    with open(tsv_file) as f:
        self.data = pd.read_csv(f, delimiter='\t')

    # trim onsets to first 't'
    delete_time = delete_vols*self.TR
    is_first_pulse = (self.data['event_type'] == "pulse") & (self.data['phase'] == 0)
    self.start_time = float(self.data.loc[is_first_pulse]['onset'].values[0])

    # stimulus events of the requested phase; the first matching row is dropped
    is_stim = (self.data['event_type'] == "stim") & (self.data['phase'] == phase_onset)
    self.trimmed = self.data.loc[is_stim].iloc[1:,:]
    self.onset_times = self.trimmed['onset'].values[...,np.newaxis]

    skip_duration = duration is None
    if isinstance(duration, (float, int)):
        self.durations = np.full_like(self.onset_times, float(duration))
    elif not skip_duration:
        self.durations = self.trimmed['duration'].values[...,np.newaxis]

    self.condition = self.trimmed['condition'].values[..., np.newaxis]
    if self.verbose:
        print(f" 1st 't' @{round(self.start_time,2)}s")

    # add button presses
    if self.button:
        # get dataframe with responses
        self.response_df = self.data.loc[(self.data['event_type'] == "response") & (self.data['response'] != 'space')]

        # get the onset times
        self.response_times = self.response_df['onset'].values[...,np.newaxis]

        # stack onset times
        self.onset_times = np.vstack([self.onset_times, self.response_times])

        # make a condition column
        self.response_condition = self.response_df['response'].values[...,np.newaxis]

        # stack it onto existing condition array
        self.condition = np.vstack([self.condition, self.response_condition])

    # check if we should include other events
    if isinstance(self.add_events, str):
        self.add_events = [self.add_events]

    if isinstance(self.event_names, str):
        self.event_names = [self.event_names]

    if isinstance(self.add_events, list):
        if isinstance(self.event_names, list):
            if len(self.event_names) != len(self.add_events):
                raise ValueError(f"Length ({len(self.add_events)}) of added events {self.add_events} does not equal the length ({len(self.event_names)}) of requested event names {self.event_names}")
        else:
            # no custom names: use the column names themselves
            self.event_names = self.add_events.copy()

        for ev, ev_name in zip(self.add_events, self.event_names):
            ev_times = np.unique(self.data[ev].values)

            # filter for nan (https://stackoverflow.com/a/11620982)
            ev_times = ev_times[~np.isnan(ev_times)][...,np.newaxis]

            # create condition
            ev_names = np.full(ev_times.shape, ev_name)

            # add times and names to array
            self.onset_times = np.vstack((self.onset_times, ev_times))
            self.condition = np.vstack((self.condition, ev_names))

    # check if we should add duration (can't be used in combination with add_events)
    if not skip_duration:
        if isinstance(self.add_events, list):
            raise TypeError(f"Cannot do this operation because I don't know the durations for the added events. Please consider using 'stim_duration!={self.stim_duration}' or 'add_events=None'")

        self.onset = np.hstack((self.onset_times, self.condition, self.durations))
    else:
        self.onset = np.hstack((self.onset_times, self.condition))

    # sort array based on onset times (https://stackoverflow.com/a/2828121)
    self.onset = self.onset[self.onset[:,0].argsort()]

    # add eyeblinks
    if isinstance(self.blinks, (np.ndarray, str)):

        if self.verbose:
            print(" Including eyeblinks")

        if isinstance(self.blinks, np.ndarray):
            self.eye_blinks = self.blinks
        elif self.blinks.endswith(".npy"):
            self.eye_blinks = np.load(self.blinks)
        else:
            raise ValueError(f"Could not recognize type of {self.blinks}. Should be numpy array or string to numpy file")

        self.eye_blinks = self.eye_blinks.astype('object').flatten()

        # build one row per blink with the same columns as self.onset, so that the event
        # labels (and durations, if present) of the existing events are kept as they are
        blink_times = self.eye_blinks[...,np.newaxis]
        blink_columns = [blink_times, np.full(blink_times.shape, 'blink', dtype=object)]
        if not skip_duration:
            # blink durations are not known here
            blink_columns.append(np.full(blink_times.shape, np.nan, dtype=object))

        # combine and sort timings
        self.onset = np.vstack((self.onset.astype(object), np.hstack(blink_columns)))
        self.onset = self.onset[self.onset[:,0].astype(float).argsort(kind="stable")]

    # correct for start time of experiment and deleted time due to removal of inital volumes
    self.onset[:, 0] = self.onset[:, 0] - (self.start_time + delete_time)

    if self.verbose:
        print(f" Cutting {round(self.start_time + delete_time,2)}s from onsets")
        if not skip_duration:
            print(f" Avg duration = {round(self.durations.mean(),2)}s")

    # make dataframe
    if skip_duration:
        columns = ['onset', 'event_type']
    else:
        columns = ['onset', 'event_type', 'duration']

    self.onset_df = self.index_onset(self.onset, columns=columns, subject=self.sub, run=run)

    # check if we should do reaction times
    if self.RTs:
        if not isinstance(self.RT_relative_to, str):
            raise ValueError(f"Need a reference column to calculate reaction times (RTs), not '{self.RT_relative_to}'")

        # always rebuild the responses from the current file; reusing an existing
        # attribute would pick up the responses of a previously parsed run
        self.response_df = self.data.loc[(self.data['event_type'] == "response") & (self.data['response'] != 'space')]

        # fetch trials were target happened
        self.id_target = {}
        self.id_no_target = []
        self.false_alarms = []
        self.correct_rejection = []
        trial_ids = self.trimmed['trial_nr'].values
        use_ref_column = self.RT_relative_to != "start"

        if use_ref_column:
            for idx in trial_ids:

                # cross-check
                trial_overview = self.data.loc[(self.data['trial_nr'] == idx)]

                # get the values of reference column; only non-nan values mark a target
                ref_values = np.unique(trial_overview[self.RT_relative_to].values)
                targets = ref_values[~np.isnan(ref_values)]

                if len(targets) != 0:
                    # keep all targets to remain agnostic about design
                    self.id_target[idx] = targets
                else:
                    # append no targets
                    self.id_no_target.append(idx)

                    # false alarm = no target but response
                    if 'response' in list(trial_overview['event_type'].values):
                        self.false_alarms.append(idx)
                    else:
                        # correct rejection = no target and no response
                        self.correct_rejection.append(idx)

        # with a reference column only target trials are scored; relative to 'start'
        # every trial is scored against its own stimulus onset
        scored_trials = list(self.id_target.keys()) if use_ref_column else list(trial_ids)

        self.rts = []
        self.hits = []
        for target_trial in scored_trials:

            # get corresponding reference value
            if use_ref_column:
                ref_value = self.id_target[target_trial]
            else:
                ref_value = self.trimmed.loc[(self.trimmed['trial_nr'] == target_trial)]['onset'].values

            # get response value (length will be 0 if dataframe is empty)
            response_value = self.response_df.loc[(self.response_df['trial_nr'] == target_trial)]['onset'].values

            # reference and response must pair up one-to-one
            if len(ref_value) == 0 or len(response_value) != len(ref_value):
                continue

            rt = response_value - ref_value

            # ignore negative reaction times..
            if np.all(rt > 0):
                self.rts.append(rt)

        # count hits before the placeholder below is inserted, otherwise zero valid
        # responses would be counted as one hit
        n_hits = len(self.rts)
        if n_hits >= 1:
            self.rts = np.concatenate(self.rts)[...,np.newaxis]
        else:
            self.rts = np.array([0])

        has_accuracy = len(self.id_target) != 0
        if has_accuracy:
            n_targets = len(self.id_target)
            n_no_target = len(self.id_no_target)
            n_trials = len(trial_ids)

            self.hits = n_hits/n_targets
            self.miss = (n_targets-n_hits)/n_targets

            if n_no_target != 0:
                self.fa = len(self.false_alarms)/n_no_target
                self.cr = len(self.correct_rejection)/n_no_target
            else:
                # every trial had a target: no false alarms possible, CR undefined
                self.fa = 0.0
                self.cr = np.nan

            # set FA to something if 0 to avoid d' = inf
            if self.fa == 0:
                self.fa = 0.5*(1/n_trials)
                add_fa_txt = f"[added {round(self.fa,2)} to avoid d' = inf]"
            else:
                add_fa_txt = ""

            # set HITS to something if 0 to avoid d' = inf
            if self.hits == 0:
                self.hits = 0.5*(1/n_trials)
                add_hits_txt = f"[added {round(self.hits,2)} to avoid d' = inf]"
            else:
                add_hits_txt = ""

            # calculate d-prime
            self.hitZ = stats.norm.ppf(self.hits)
            self.faZ = stats.norm.ppf(self.fa)
            self.dPrime = self.hitZ-self.faZ

            # d-prime=0 is considered as pure guessing.
            # d-prime=1 is considered as good measure of signal sensitivity/detectability.
            # d-prime=2 is considered as awesome.
            self.accuracy_df = self.index_accuracy(np.array([self.hits, self.miss, self.fa, self.cr, self.dPrime], dtype=float)[np.newaxis,...], columns=['hits','miss','fa','cr','d_prime'], subject=self.sub, run=run)
        # NOTE(review): if this run has no targets, `self.accuracy_df` from an earlier
        # run is left untouched - confirm whether callers rely on that

        if self.verbose:
            # only report accuracy that was computed for this file
            if has_accuracy:
                print(f" Hits:\t{round(self.hits,2)}\t({n_hits}/{len(self.id_target)})\t{add_hits_txt}")
                print(f" Miss:\t{round(self.miss,2)}\t({(len(self.id_target)-n_hits)}/{len(self.id_target)})")
                print(f" FA:\t{round(self.fa,2)}\t({len(self.false_alarms)}/{len(self.id_no_target)})\t{add_fa_txt}")
                print(f" CR:\t{round(self.cr,2)}\t({len(self.correct_rejection)}/{len(self.id_no_target)})")
                print(f" D':\t{round(self.dPrime,2)}\t(0=guessing;1=good;2=awesome)")

            print(f" Average reaction time (RT) = {round(self.rts.mean(),2)}s (relative to '{self.RT_relative_to}').")

        # parse into dataframe
        self.rts_df = self.index_rts(self.rts, columns=["RTs"], subject=self.sub, run=run)
@staticmethod
def index_onset(array, columns=None, subject=1, run=1, TR=0.105, set_index=False):
if columns == None:
df = pd.DataFrame(array)
else:
df = pd.DataFrame(array, columns=columns)
df['subject'] = subject
df['run'] = run
df['event_type'] = df['event_type'].astype(str)
df['onset'] = df['onset'].astype(float)
# check if we got duration
try:
df['duration'] = df['duration'].astype(float)
except:
pass
if set_index:
return df.set_index(['subject', 'run', 'event_type'])
else:
return df
@staticmethod
def index_rts(array, columns=None, subject=1, run=1, set_index=False):
if columns == None:
df = pd.DataFrame(array)
else:
df = pd.DataFrame(array, columns=columns)
df['subject'] = subject
df['run'] = run
df['RTs'] = df['RTs'].astype(float)
if set_index:
return df.set_index(['subject', 'run'])
else:
return df
@staticmethod
def index_accuracy(array, columns=None, subject=1, run=1, set_index=False):
if columns == None:
df = pd.DataFrame(array)
else:
df = pd.DataFrame(array, columns=columns)
df['subject'] = subject
df['run'] = run
if set_index:
return df.set_index(['subject', 'run'])
else:
return df
def get_onset_df(self, index=False):
    """Return the onset DataFrame, indexed by subject/run/event_type if `index` is True"""
    onset_df = self.onset_df
    if not index:
        return onset_df
    return onset_df.set_index(['subject', 'run', 'event_type'])
def get_rts_df(self, index=False):
    """Return the reaction time DataFrame, indexed by subject/run if `index` is True"""
    rts_df = self.rts_df
    if not index:
        return rts_df
    return rts_df.set_index(['subject', 'run'])
def get_accuracy(self, index=False):
    """Return the accuracy DataFrame, indexed by subject/run if `index` is True"""
    accuracy_df = self.accuracy_df
    if not index:
        return accuracy_df
    return accuracy_df.set_index(['subject', 'run'])
def onsets_to_fsl(self, fmt='3-column', amplitude=1, output_base=None):
    """onsets_to_fsl

    Write one text file with onset times per subject, run and event type found in `self.df_onsets`. Such a file can be used for SPM or FSL modeling, but it should be noted that the onset times have been corrected for the deleted volumes at the beginning. So make sure you're inputting the correct functional data in these cases.

    Parameters
    ----------
    fmt: str, optional
        '3-column' (default) writes the columns onset, duration and amplitude; any other value writes a single column with onsets. If the onsets have no 'duration' column, a duration of 1 is written
    amplitude: int, float, optional
        value written in the third column if `fmt='3-column'`, default is 1
    output_base: str, optional
        prefix for the output files, which are named `<output_base><event number>_run-<run>.txt`. If None, files are named `<event>_run-<run>.txt` and written to the directory of the (first) `tsv_file`, or to the current working directory if `tsv_file` is neither a list nor a string

    Returns
    ----------
    None
        files are written to disk; nothing is returned
    """

    onsets = self.df_onsets.copy()

    # default output directory does not depend on subject/run/event
    outdir = None
    if output_base is None:
        if isinstance(self.tsv_file, list):
            outdir = os.path.dirname(self.tsv_file[0])
        elif isinstance(self.tsv_file, str):
            outdir = os.path.dirname(self.tsv_file)
        else:
            outdir = os.getcwd()

    subj_list = self.get_subjects(onsets)
    for sub in subj_list:
        df = utils.select_from_df(onsets, expression=f"subject = {sub}")

        n_runs = self.get_runs(df)
        for run in n_runs:
            onsets_per_run = utils.select_from_df(df, expression=f"run = {run}")
            events_per_run = self.get_events(onsets_per_run)

            for ix, ev in enumerate(events_per_run):
                onsets_per_event = utils.select_from_df(onsets_per_run, expression=f"event_type = {ev}")

                if output_base is None:
                    fname = opj(outdir, f"{ev}_run-{run}.txt")
                else:
                    fname = f"{output_base}{ix+1}_run-{run}.txt"

                # fetch the onsets
                event_onsets = onsets_per_event['onset'].values[..., np.newaxis]
                if fmt == "3-column":

                    # check if we got duration
                    if 'duration' in list(onsets_per_event.columns):
                        duration_arr = onsets_per_event['duration'].values[..., np.newaxis]
                    else:
                        # a single column of ones, shaped like the onsets (not like the
                        # full dataframe, which would add one column per dataframe column)
                        duration_arr = np.ones_like(event_onsets)

                    amplitude_arr = np.full_like(event_onsets, amplitude)
                    three_col = np.hstack((event_onsets, duration_arr, amplitude_arr))

                    print(f"Writing {fname}; {three_col.shape}")
                    np.savetxt(fname, three_col, delimiter='\t', fmt='%1.3f')
                else:
                    np.savetxt(fname, event_onsets, delimiter='\t', fmt='%1.3f')
@staticmethod
def get_subjects(df):
try:
df = df.reset_index()
except:
pass
return np.unique(df['subject'].values)
@staticmethod
def get_runs(df):
try:
df = df.reset_index()
except:
pass
return np.unique(df['run'].values)
@staticmethod
def get_events(df):
try:
df = df.reset_index()
except:
pass
return np.unique(df['event_type'].values)