import os
import time
import datetime
from dateutil.relativedelta import relativedelta
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Flatten, Conv1D, AveragePooling1D
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import TensorBoard
import tensorflow as tf
print(tf.__version__)
if tf.test.is_gpu_available():
from tensorflow.keras.layers import CuDNNLSTM as LSTM, CuDNNGRU as GRU
else:
print("GPU is not available")
from tensorflow.keras.layers import LSTM, GRU
DATA_DIR = "/content/data"
FILE = "NFLX.csv"
PATH = os.path.join(DATA_DIR, FILE)
WINDOW = 100
FEATURE_NAMES = ["High", "Low", "Open", "Close", "Volume", "10ma", "30ma"]
BATCH_SIZE = 64
EPOCHS = 20
# download and / or import data
from pandas_datareader.data import DataReader
YEARS = 5 # how many years of data to download
NAME = "NFLX"
DATA_SOURCE = "yahoo"
if os.path.exists(PATH):
df = pd.read_csv(PATH)
df.set_index("Date", inplace=True)
else:
dt = datetime.datetime.now()
df = DataReader(NAME, DATA_SOURCE, start=dt-relativedelta(years=YEARS), end=dt)
if not os.path.isdir(DATA_DIR):
os.makedirs(DATA_DIR)
df.to_csv(PATH)
df.head()
# preprocessing pipeline
from sklearn.base import BaseEstimator, TransformerMixin
class FeatureUpdater(BaseEstimator, TransformerMixin):
def fit(self, X, y=None):
return self
def transform(self, X, y=None):
assert isinstance(X, pd.DataFrame)
assert "Close" in X.columns
if "Adj Close" in X.columns:
X_copy = X.drop("Adj Close", axis=1)
else:
X_copy = X.copy()
X_copy["10ma"] = X_copy["Close"].rolling(window=10).mean()
X_copy["30ma"] = X_copy["Close"].rolling(window=30).mean()
return X_copy
class NanDropper(BaseEstimator, TransformerMixin):
def fit(self, X, y=None):
return self
def transform(self, X, y=None):
if isinstance(X, pd.DataFrame):
return X.dropna(axis=0)
return pd.DataFrame(X).dropna(axis=0)
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MinMaxScaler
pipeline = Pipeline([
("feature_updater", FeatureUpdater()),
("nan_dropper", NanDropper()),
("scaler", MinMaxScaler())
])
df_tr = pd.DataFrame(pipeline.fit_transform(df), columns=FEATURE_NAMES)
df_tr.head()
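As a quick sanity check (a minimal addition, not part of the original pipeline), the transformed frame should have 29 fewer rows than the raw data, since the 30-day moving average leaves 29 leading NaN rows that the NanDropper removes, and every feature should now lie in [0, 1]:
# sanity check: 29 rows are lost to the 30-day moving average and all features are scaled to [0, 1]
print("raw shape:", df.shape, "transformed shape:", df_tr.shape)
print("min:", df_tr.values.min(), "max:", df_tr.values.max())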
# plot data
def plot_data(df):
ax1 = plt.subplot2grid(shape=(6, 1), loc=(0, 0), rowspan=5)
ax2 = plt.subplot2grid(shape=(6, 1), loc=(5, 0), sharex=ax1)
df.plot(y=["High", "Low", "Open", "Close", "10ma", "30ma"], ax=ax1)
df.plot(y="Volume", ax=ax2, title="Volume")
plt.show()
plot_data(df_tr)
# split data into training, validation and test sets
# note: the data is split chronologically; a random split such as sklearn's train_test_split
# (shown commented out below) would leak future prices into the training set
"""
from sklearn.model_selection import train_test_split
train_X, test_X, train_y, test_y = train_test_split(X, y, test_size=TEST_RATIO)
train_X, val_X, train_y, val_y = train_test_split(train_X, train_y, test_size=VALIDATION_RATIO)
"""
TRAIN_RATIO = 0.7
VALIDATION_RATIO = 0.15
def split_input_output(df, target_feature="Open"):
if not isinstance(df, pd.DataFrame):
df = pd.DataFrame(df, columns=FEATURE_NAMES)
X = np.array([df.iloc[i : i + WINDOW, :].values for i in range(len(df) - WINDOW)])
y = np.expand_dims(df[target_feature][WINDOW:].values, axis=1)
return X, y
def split_train_val_test(X, y):
assert X.shape[0] == y.shape[0]
train_val_split = int(TRAIN_RATIO * X.shape[0])
val_test_split = int((TRAIN_RATIO + VALIDATION_RATIO) * X.shape[0])
train_X = X[:train_val_split]
train_y = y[:train_val_split]
val_X = X[train_val_split : val_test_split]
val_y = y[train_val_split : val_test_split]
test_X = X[val_test_split:]
test_y = y[val_test_split:]
return train_X, train_y, val_X, val_y, test_X, test_y
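To make the sliding-window construction concrete, here is a minimal self-contained sketch (toy values and a window of 3, purely illustrative) showing how each input sample of consecutive rows maps to the target value that immediately follows it, which is what split_input_output does with WINDOW rows:
# toy illustration of the windowing logic used by split_input_output
toy = pd.DataFrame({"Open": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]})
toy_window = 3
toy_X = np.array([toy.iloc[i : i + toy_window].values for i in range(len(toy) - toy_window)])
toy_y = toy["Open"][toy_window:].values
print(toy_X.shape)  # (3, 3, 1): three samples of three consecutive rows each
print(toy_y)        # [4. 5. 6.]: the value immediately after each window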
X, y = split_input_output(df_tr)
train_X, train_y, val_X, val_y, test_X, test_y = split_train_val_test(X, y)
print("Training input shape: ", train_X.shape)
print("Training output shape: ", train_y.shape)
print("Validation input shape: ", val_X.shape)
print("Validation ouput shape: ", val_y.shape)
print("Test input shape: ", test_X.shape)
print("Test output shape: ", test_y.shape)
# the models and tensorboard logs will be saved to the following folders
LOG_DIR = "/content/logs"
MODEL_DIR = "/content/models"
if not os.path.isdir(LOG_DIR):
os.makedirs(LOG_DIR)
if not os.path.isdir(MODEL_DIR):
os.makedirs(MODEL_DIR)
# feedforward neural network
name_ffnn = "FFNN-" + str(int(time.time()))
model_ffnn = Sequential([
Flatten(input_shape=train_X.shape[1:]),
Dense(128, activation="relu"),
Dense(64, activation="relu"),
Dense(32, activation="relu"),
Dense(16, activation="relu"),
Dense(1)
])
model_ffnn.compile(optimizer="adam", loss="mse")
model_ffnn.summary()
history_ffnn = model_ffnn.fit(train_X, train_y, validation_data=(val_X, val_y), batch_size=BATCH_SIZE, epochs=EPOCHS, callbacks=[TensorBoard(log_dir=os.path.join(LOG_DIR, name_ffnn))])
model_ffnn.evaluate(test_X, test_y)
model_ffnn.save(os.path.join(MODEL_DIR, name_ffnn + ".h5"))
# lstm
name_lstm = "LSTM-" + str(int(time.time()))
model_lstm = Sequential([
    LSTM(128, return_sequences=True, input_shape=(None, train_X.shape[2])), # accept input sequences of arbitrary length
Dropout(0.2),
LSTM(128, return_sequences=True),
Dropout(0.2),
LSTM(128),
Dropout(0.2),
Flatten(),
Dense(64),
Dropout(0.2),
Dense(1)
])
model_lstm.compile(optimizer=Adam(lr=0.001, decay=1e-6), loss="mse")
model_lstm.summary()
history_lstm = model_lstm.fit(train_X, train_y, validation_data=(val_X, val_y), batch_size=BATCH_SIZE, epochs=EPOCHS, callbacks=[TensorBoard(log_dir=os.path.join(LOG_DIR, name_lstm))])
model_lstm.evaluate(test_X, test_y)
model_lstm.save(os.path.join(MODEL_DIR, name_lstm + ".h5"))
# gru
name_gru = "GRU-" + str(int(time.time()))
model_gru = Sequential([
    GRU(128, return_sequences=True, input_shape=(None, train_X.shape[2])), # accept input sequences of arbitrary length
Dropout(0.2),
GRU(128, return_sequences=True),
Dropout(0.2),
GRU(128),
Dropout(0.2),
Flatten(),
Dense(64),
Dropout(0.2),
Dense(1)
])
model_gru.compile(optimizer=Adam(lr=0.001, decay=1e-6), loss="mse")
model_gru.summary()
history_gru = model_gru.fit(train_X, train_y, validation_data=(val_X, val_y), batch_size=BATCH_SIZE, epochs=EPOCHS, callbacks=[TensorBoard(log_dir=os.path.join(LOG_DIR, name_gru))])
model_gru.evaluate(test_X, test_y)
model_gru.save(os.path.join(MODEL_DIR, name_gru + ".h5"))
# 1d cnn
name_cnn = "CNN-" + str(int(time.time()))
model_cnn = Sequential([
Conv1D(128, 2, use_bias=True, activation="relu", kernel_initializer="VarianceScaling", input_shape=train_X.shape[1:]),
AveragePooling1D(strides=1),
Conv1D(64, 2, use_bias=True, activation="relu", kernel_initializer="VarianceScaling"),
AveragePooling1D(strides=1),
Conv1D(32, 2, use_bias=True, activation="relu", kernel_initializer="VarianceScaling"),
AveragePooling1D(strides=1),
Flatten(),
Dense(256, activation="relu"),
Dense(1)
])
model_cnn.compile(optimizer="adam", loss="mse")
model_cnn.summary()
history_cnn = model_cnn.fit(train_X, train_y, validation_data=(val_X, val_y), batch_size=BATCH_SIZE, epochs=EPOCHS, callbacks=[TensorBoard(log_dir=os.path.join(LOG_DIR, name_cnn))])
model_cnn.evaluate(test_X, test_y)
model_cnn.save(os.path.join(MODEL_DIR, name_cnn + ".h5"))
# visualise model predictions
def visualize_predictions(days, actual, models, X, labels):
assert len(models) == len(labels)
plt.plot(days, actual, label="Actual Price")
for i, model in enumerate(models):
plt.plot(days, model.predict(X), label=labels[i])
plt.legend()
plt.show()
visualize_predictions(
days=df_tr.index[WINDOW:].values,
actual=y,
models=[model_ffnn, model_lstm, model_gru, model_cnn],
X=X,
labels=["FFNN", "LSTM", "GRU", "CNN"]
)
# plot training and validation losses for each model
def plot_losses(histories, labels):
assert len(histories) == len(labels)
ax_loss = plt.subplot2grid(shape=(1, 2), loc=(0, 0))
ax_val_loss = plt.subplot2grid(shape=(1, 2), loc=(0, 1))
for i, history in enumerate(histories):
ax_loss.plot(range(EPOCHS), history.history["loss"], label=labels[i])
ax_val_loss.plot(range(EPOCHS), history.history["val_loss"], label=labels[i])
ax_loss.legend()
ax_val_loss.legend()
plt.show()
plot_losses(
histories=[history_ffnn, history_lstm, history_gru, history_cnn],
labels=["FFNN", "LSTM", "GRU", "CNN"]
)
# experimenting with different architectures for the lstm
lstm_sizes = [256, 512]
lstm_layer_nums = [1, 2, 3]
dropout_rates = [0, 0.2, 0.4]
dense_sizes = [256, 512]
for lstm_size in lstm_sizes:
for lstm_layer_num in lstm_layer_nums:
for dropout_rate in dropout_rates:
for dense_size in dense_sizes:
name = f"LSTM-{lstm_size}-{lstm_layer_num}-{dropout_rate}-{dense_size}-{int(time.time())}"
model = Sequential()
if lstm_layer_num == 1:
model.add(LSTM(lstm_size, input_shape=(None, train_X.shape[2])))
model.add(Dropout(dropout_rate))
else:
model.add(LSTM(lstm_size, return_sequences=True, input_shape=(None, train_X.shape[2])))
model.add(Dropout(dropout_rate))
for _ in range(lstm_layer_num - 2):
model.add(LSTM(lstm_size, return_sequences=True))
model.add(Dropout(dropout_rate))
model.add(LSTM(lstm_size))
model.add(Dropout(dropout_rate))
model.add(Flatten())
model.add(Dense(dense_size))
model.add(Dropout(dropout_rate))
model.add(Dense(1))
model.compile(optimizer=Adam(lr=0.001, decay=1e-6), loss="mse")
model.fit(
train_X,
train_y,
validation_data=(val_X, val_y),
batch_size=BATCH_SIZE,
epochs=EPOCHS,
callbacks=[TensorBoard(log_dir=os.path.join(LOG_DIR, name))],
verbose=0
)
loss = model.evaluate(test_X, test_y, verbose=0)
print(name.ljust(37), loss)
model.save(os.path.join(MODEL_DIR, name + ".h5"))
We will now use TensorBoard to compare the models. This notebook was run on Google Colab to train the models on a hosted GPU, so localtunnel is used to expose the TensorBoard server.
# install localtunnel
!npm install -g localtunnel
# tensorboard
get_ipython().system_raw(f'tensorboard --logdir {LOG_DIR} --host 0.0.0.0 --port 6006 &')
get_ipython().system_raw('lt --port 6006 >> url.txt 2>&1 &')
!cat url.txt
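Alternatively, newer Colab runtimes ship a TensorBoard notebook extension (TensorBoard >= 1.14), which avoids localtunnel entirely; a minimal sketch, assuming the extension is available in the runtime:
# optional alternative: the TensorBoard notebook extension (uncomment to use instead of localtunnel)
# %load_ext tensorboard
# %tensorboard --logdir /content/logs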
It is apparent that the model with 256 LSTM units, a single LSTM layer, a dropout rate of 0.4 and a 512-unit dense layer produces the best results, with the lowest test and validation losses (0.00042 and 0.00100 respectively).
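To avoid picking the winner by eyeballing the TensorBoard curves, here is a minimal sketch (assuming all candidate models were saved as .h5 files in MODEL_DIR and the current test split targets the opening price) that re-evaluates every saved model on the test set and prints the best one:
# optional: rank the saved models by test loss instead of selecting one visually
results = []
for fname in os.listdir(MODEL_DIR):
    if fname.endswith(".h5"):
        m = tf.keras.models.load_model(os.path.join(MODEL_DIR, fname))
        results.append((m.evaluate(test_X, test_y, verbose=0), fname))
results.sort()
print("lowest test loss:", results[0])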
BEST_MODEL = os.path.join(MODEL_DIR, "LSTM-256-1-0.4-512-1568463412.h5") # change the model name if the timestamp is different
# download the best model
from google.colab import files
files.download(BEST_MODEL)
# test the final model on other companies' stocks
def test_company(name):
dt = datetime.datetime.now()
df = DataReader(name, "yahoo", start=dt-relativedelta(years=5), end=dt)
df_tr = pd.DataFrame(pipeline.fit_transform(df), columns=FEATURE_NAMES)
X, y = split_input_output(df_tr)
model = tf.keras.models.load_model(BEST_MODEL)
model.evaluate(X, y)
visualize_predictions(
days=df_tr.index[WINDOW:].values,
actual=y,
models=[model],
X=X,
labels=["LSTM Predictions"],
)
# apple stocks
test_company("AAPL")
# google stocks
test_company("GOOGL")
# nvidia stocks
test_company("NVDA")
# disney stocks
test_company("DIS")
# dbs stocks
test_company("D05.SI")
It is apparent that the model generates accurate predictions even for the stock prices of other companies it was not trained on.
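To put a number on that claim rather than judge from the plots alone, a small sketch (same pipeline and best model, evaluating over each company's full five-year series in scaled units) that collects the MSE per ticker into a table:
# optional: tabulate the scaled MSE of the best model on each company's data
best = tf.keras.models.load_model(BEST_MODEL)
def company_mse(ticker):
    dt = datetime.datetime.now()
    data = DataReader(ticker, "yahoo", start=dt - relativedelta(years=5), end=dt)
    data_tr = pd.DataFrame(pipeline.fit_transform(data), columns=FEATURE_NAMES)
    X_c, y_c = split_input_output(data_tr)
    return best.evaluate(X_c, y_c, verbose=0)
pd.DataFrame({"MSE (scaled)": {t: company_mse(t) for t in ["AAPL", "GOOGL", "NVDA", "DIS", "D05.SI"]}})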
# get the actual price and predictions
def unscale_price(data, feature="Open"):
"""data is a 1-D array of opening prices"""
scaler = pipeline.named_steps["scaler"]
scaler_copy = MinMaxScaler()
col_no = FEATURE_NAMES.index(feature)
scaler_copy.min_, scaler_copy.scale_ = scaler.min_[col_no], scaler.scale_[col_no]
return scaler_copy.inverse_transform(data)
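A quick way to verify that unscale_price inverts the scaling (a sanity check, not part of the original notebook) is to refit the pipeline on the NFLX frame, since the test_company calls above refit it on other tickers, and then round-trip the scaled "Open" column back to the raw prices:
# sanity check: unscaling the scaled "Open" column should recover the raw NFLX prices
pipeline.fit(df)  # restore the scaler parameters for the NFLX frame that produced df_tr
roundtrip = unscale_price(df_tr[["Open"]].values).flatten()
print(np.allclose(roundtrip, df["Open"].values[29:]))  # the first 29 rows were dropped by the 30-day MA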
def test_company_table(name):
dt = datetime.datetime.now()
df = DataReader(name, "yahoo", start=dt-relativedelta(years=5), end=dt)
df_tr = pd.DataFrame(pipeline.fit_transform(df), columns=FEATURE_NAMES)
X, y = split_input_output(df_tr)
model = tf.keras.models.load_model(BEST_MODEL) # change the model name if the timestamp is different
predictions_unscaled = unscale_price(model.predict(X)).flatten()
actual_unscaled = unscale_price(y).flatten()
    table = pd.DataFrame({"Date": df.index[WINDOW + 29:].values, "Predicted": predictions_unscaled, "Actual": actual_unscaled})  # +29 accounts for the rows dropped by the 30-day moving average
table.set_index("Date", inplace=True)
return table
nflx_table = test_company_table("NFLX")
nflx_table.head()
# mean absolute error
np.mean(np.abs(nflx_table["Predicted"] - nflx_table["Actual"]))
As you can see, the model's error in money terms is around 20 cents, which is too high to be practical for stock trading.
However, predicting the range of prices for the day (the high and low prices) would be more useful, since valuable information can still be obtained even if those predictions are not perfectly accurate.
We will now modify the LSTM model to predict the highest and lowest prices of the day, training a separate model for each target.
# model to predict day high
X, y = split_input_output(df_tr, target_feature="High")
train_X, train_y, val_X, val_y, test_X, test_y = split_train_val_test(X, y)
model_high = Sequential([
LSTM(256, input_shape=(None, train_X.shape[2])),
Dropout(0.4),
Flatten(),
Dense(512),
Dropout(0.4),
Dense(1)
])
model_high.compile(optimizer=Adam(lr=0.001, decay=1e-6), loss="mse")
model_high.fit(train_X, train_y, validation_data=(val_X, val_y), batch_size=BATCH_SIZE, epochs=EPOCHS)
model_high.evaluate(test_X, test_y)
model_high.save(os.path.join(MODEL_DIR, "LSTM-high.h5"))
# plot predictions
visualize_predictions(
days=df_tr.index[WINDOW:].values,
actual=y,
models=[model_high],
X=X,
labels=["LSTM"]
)
# model to predict day low
X, y = split_input_output(df_tr, target_feature="Low")
train_X, train_y, val_X, val_y, test_X, test_y = split_train_val_test(X, y)
model_low = Sequential([
LSTM(256, input_shape=(None, train_X.shape[2])),
Dropout(0.4),
Flatten(),
Dense(512),
Dropout(0.4),
Dense(1)
])
model_low.compile(optimizer=Adam(lr=0.001, decay=1e-6), loss="mse")
model_low.fit(train_X, train_y, validation_data=(val_X, val_y), batch_size=BATCH_SIZE, epochs=EPOCHS)
model_low.evaluate(test_X, test_y)
model_low.save(os.path.join(MODEL_DIR, "LSTM-low.h5"))
# plot predictions
visualize_predictions(
days=df_tr.index[WINDOW:].values,
actual=y,
models=[model_low],
X=X,
labels=["LSTM"]
)
plt.plot(df_tr.index[WINDOW:].values, model_high.predict(X), label="Predicted high")
plt.plot(df_tr.index[WINDOW:].values, model_low.predict(X), label="Predicted low")
plt.legend()
plt.show()
# model to predict the high and low of the next 10 days
# the number of output neurons is now NEXT * 2 = 20 (high and low for each of the next 10 days), which may affect the accuracy of the model
NEXT = 10
X = np.array([df_tr.iloc[i : i + WINDOW, :].values for i in range(len(df_tr) - WINDOW - NEXT)])
y = np.array([df_tr.loc[i + WINDOW : i + WINDOW + NEXT - 1, ["High", "Low"]].values for i in range(len(df_tr) - WINDOW - NEXT)])
print(X[:5])
print(y[:5])
train_X, train_y, val_X, val_y, test_X, test_y = split_train_val_test(X, y.reshape(y.shape[0], -1))
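Note how the targets are flattened: each y[i] has shape (NEXT, 2) with columns (High, Low), and reshape(y.shape[0], -1) flattens it row-major, so the 20 outputs come out interleaved as [high_0, low_0, high_1, low_1, ...]. A tiny sketch (toy values only) of that ordering:
# toy illustration of the row-major flattening applied to each (NEXT, 2) target
toy_target = np.array([[10.0, 9.0], [11.0, 8.5], [12.0, 9.5]])  # three days of (high, low)
print(toy_target.reshape(-1))  # [10.  9.  11.  8.5 12.  9.5] -> interleaved high/low pairs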
model_next = Sequential([
LSTM(256, input_shape=(None, train_X.shape[2])),
Dropout(0.4),
Flatten(),
Dense(512),
Dropout(0.4),
Dense(NEXT * 2)
])
model_next.compile(optimizer=Adam(lr=0.001, decay=1e-6), loss="mse")
model_next.fit(train_X, train_y, validation_data=(val_X, val_y), batch_size=BATCH_SIZE, epochs=EPOCHS)
model_next.evaluate(test_X, test_y)
model_next.save(os.path.join(MODEL_DIR, f"LSTM-next-{NEXT}-high-low.h5"))
As you can see, the accuracy of the model clearly suffers from the larger number of predictions and hence the greater complexity of the task, so a larger model may be required to make accurate predictions further into the future.
# the training targets were flattened row-major from shape (NEXT, 2), so the model's outputs
# are interleaved as [high_0, low_0, high_1, low_1, ...]; reshape back before splitting
predicted = model_next.predict(np.expand_dims(df_tr.iloc[-WINDOW:, :].values, axis=0)).reshape(NEXT, 2)
high = predicted[:, 0]
low = predicted[:, 1]
plt.plot(df_tr.index[WINDOW:].values, df_tr["High"][WINDOW:].values, label="Actual High")
plt.plot(df_tr.index[WINDOW:].values, df_tr["Low"][WINDOW:].values, label="Actual Low")
plt.plot(range(df_tr.index[-1], df_tr.index[-1] + NEXT), high, label="Predicted High")
plt.plot(range(df_tr.index[-1], df_tr.index[-1] + NEXT), low, label="Predicted Low")
plt.legend()
plt.show()
pipeline.fit(df)  # refit the scaler on the NFLX data so unscale_price uses NFLX's scaling parameters
high_low_predictions_df = pd.DataFrame({"High": unscale_price(np.expand_dims(high, axis=1), feature="High").flatten(),
"Low": unscale_price(np.expand_dims(low, axis=1), feature="Low").flatten()})
high_low_predictions_df
# predict the next day's opening price
model_lstm = tf.keras.models.load_model(BEST_MODEL)
pipeline.fit(df)
unscale_price(model_lstm.predict(np.expand_dims(df_tr.iloc[-100:, :].values, axis=0)))
# sentiment analysis
import tweepy
from textblob import TextBlob
CONSUMER_KEY = "" # missing; the code therefore cannot run
CONSUMER_SECRET = ""
ACCESS_TOKEN = ""
ACCESS_TOKEN_SECRET = ""
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
api = tweepy.API(auth)
THRESHOLD = 0
score = 0
public_tweets = api.search("Facebook")
for tweet in public_tweets:
analysis = TextBlob(tweet.text)
if analysis.sentiment.polarity >= THRESHOLD:
score += 1
else:
score -= 1
score
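The raw score is simply the number of positive-leaning tweets minus the number of negative ones; a minimal follow-up sketch (purely illustrative) that normalises it by the number of tweets so that queries returning different numbers of tweets are comparable:
# optional: normalise the score to the range [-1, 1]
normalized_score = score / max(len(public_tweets), 1)
normalized_score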