I modified the code given in this repo to what I think is a more generalized version (below) where the input is an array containing points generated by any sort of process.
It gives a perfect result on predicting sin functions, but on a constant linear trend gives absolutely terrible, nonsense performance.
By my understanding, that is simply the nature of reservoir computing, it can't handle a trend. Is that correct?
I would also appreciate any other insight you might have on the generalization of this function. Thanks!
import numpy as np
import pandas as pd
def load_linear(long=False, shape=None, start_date: str = "2021-01-01"):
"""Create a dataset of just zeroes for testing edge case."""
if shape is None:
shape = (500, 5)
df_wide = pd.DataFrame(
np.ones(shape), index=pd.date_range(start_date, periods=shape[0], freq="D")
)
df_wide = (df_wide * list(range(0, shape[1]))).cumsum()
if not long:
return df_wide
else:
df_wide.index.name = "datetime"
df_long = df_wide.reset_index(drop=False).melt(
id_vars=['datetime'], var_name='series_id', value_name='value'
)
return df_long
def load_sine(long=False, shape=None, start_date: str = "2021-01-01"):
"""Create a dataset of just zeroes for testing edge case."""
if shape is None:
shape = (500, 5)
df_wide = pd.DataFrame(
np.ones(shape),
index=pd.date_range(start_date, periods=shape[0], freq="D"),
columns=range(shape[1])
)
X = pd.to_numeric(df_wide.index, errors='coerce', downcast='integer').values
def sin_func(a, X):
return a * np.sin(1 * X) + a
for column in df_wide.columns:
df_wide[column] = sin_func(column, X)
if not long:
return df_wide
else:
df_wide.index.name = "datetime"
df_long = df_wide.reset_index(drop=False).melt(
id_vars=['datetime'], var_name='series_id', value_name='value'
)
return df_long
def predict_reservoir(df, forecast_length, warmup_pts, k=2, ridge_param=2.5e-6):
# k = # number of time delay taps
# pass in traintime_pts to limit as .tail() for huge datasets?
n_pts = df.shape[1]
# handle short data edge case
min_train_pts = 10
max_warmup_pts = n_pts - min_train_pts
if warmup_pts >= max_warmup_pts:
warmup_pts = max_warmup_pts if max_warmup_pts > 0 else 0
traintime_pts = n_pts - warmup_pts # round(traintime / dt)
warmtrain_pts = warmup_pts + traintime_pts
testtime_pts = forecast_length + 1 # round(testtime / dt)
maxtime_pts = n_pts # round(maxtime / dt)
# input dimension
d = df.shape[0]
# size of the linear part of the feature vector
dlin = k * d
# size of nonlinear part of feature vector
dnonlin = int(dlin * (dlin + 1) / 2)
# total size of feature vector: constant + linear + nonlinear
dtot = 1 + dlin + dnonlin
# create an array to hold the linear part of the feature vector
x = np.zeros((dlin, maxtime_pts))
# fill in the linear part of the feature vector for all times
for delay in range(k):
for j in range(delay, maxtime_pts):
x[d * delay : d * (delay + 1), j] = df[:, j - delay]
# create an array to hold the full feature vector for training time
# (use ones so the constant term is already 1)
out_train = np.ones((dtot, traintime_pts))
# copy over the linear part (shift over by one to account for constant)
out_train[1 : dlin + 1, :] = x[:, warmup_pts - 1 : warmtrain_pts - 1]
# fill in the non-linear part
cnt = 0
for row in range(dlin):
for column in range(row, dlin):
# shift by one for constant
out_train[dlin + 1 + cnt] = (
x[row, warmup_pts - 1 : warmtrain_pts - 1]
* x[column, warmup_pts - 1 : warmtrain_pts - 1]
)
cnt += 1
# ridge regression: train W_out to map out_train to Lorenz[t] - Lorenz[t - 1]
W_out = (
(x[0:d, warmup_pts:warmtrain_pts] - x[0:d, warmup_pts - 1 : warmtrain_pts - 1])
@ out_train[:, :].T
@ np.linalg.pinv(
out_train[:, :] @ out_train[:, :].T + ridge_param * np.identity(dtot)
)
)
# create a place to store feature vectors for prediction
out_test = np.ones(dtot) # full feature vector
x_test = np.zeros((dlin, testtime_pts)) # linear part
# copy over initial linear feature vector
x_test[:, 0] = x[:, warmtrain_pts - 1]
# do prediction
for j in range(testtime_pts - 1):
# copy linear part into whole feature vector
out_test[1 : dlin + 1] = x_test[:, j] # shift by one for constant
# fill in the non-linear part
cnt = 0
for row in range(dlin):
for column in range(row, dlin):
# shift by one for constant
out_test[dlin + 1 + cnt] = x_test[row, j] * x_test[column, j]
cnt += 1
# fill in the delay taps of the next state
x_test[d:dlin, j + 1] = x_test[0 : (dlin - d), j]
# do a prediction
x_test[0:d, j + 1] = x_test[0:d, j] + W_out @ out_test[:]
return x_test[0:d, 1:]
# note transposed from the opposite of my usual shape
data_pts = 7000
series = 3
forecast_length = 10
df_sine = load_sine(long=False, shape=(data_pts, series)).transpose().to_numpy()
df_sine_train = df_sine[:, :-10]
df_sine_test = df_sine[:, -10:]
prediction_sine = predict_reservoir(df_sine_train, forecast_length=forecast_length, warmup_pts=150, k=2, ridge_param=2.5e-6)
print(f"sine MAE {np.mean(np.abs(df_sine_test - prediction_sine))}")
df_linear = load_linear(long=False, shape=(data_pts, series)).transpose().to_numpy()
df_linear_train = df_linear[:, :-10]
df_linear_test = df_linear[:, -10:]
prediction_linear = predict_reservoir(df_linear_train, forecast_length=forecast_length, warmup_pts=150, k=2, ridge_param=2.5e-6)
print(f"linear MAE {np.mean(np.abs(df_linear_test - prediction_linear))}")