In this blog post, I will try rewriting the graphs that were created with matplotlib as plotly graphs.
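To make the conversion concrete before getting into the stock-forecasting code, here is a minimal, self-contained sketch (using made-up data, not taken from this post) of the same line chart drawn first with matplotlib and then with plotly:

import matplotlib.pyplot as plt
import plotly.graph_objs as go
from plotly.offline import plot

y = [1, 3, 2, 4]  # made-up data for illustration only

# matplotlib: renders a static image
plt.plot(y, label='series')
plt.legend()
plt.show()

# plotly: writes an interactive HTML chart
fig = dict(data=[go.Scatter(y=y, name='series')], layout=dict(title='series'))
plot(fig, filename='minimal_example.html', show_link=False)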
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from datetime import datetime, timedelta
from tqdm import tqdm
from pandas_datareader import data as pdr
import yfinance as yf

sns.set()
# The model code below uses the TensorFlow 1.x API (tf.placeholder, tf.contrib).
tf.compat.v1.random.set_random_seed(1234)

# Route pandas_datareader's Yahoo calls through yfinance;
# without this the yfinance import above goes unused.
yf.pdr_override()

tickers = ['AMD']

stocks_start = datetime(2018, 1, 1)
stocks_end = datetime(2019, 8, 20)
def get(tickers, startdate, enddate):
    def data(ticker):
        return pdr.get_data_yahoo(ticker, start=startdate, end=enddate)
    datas = map(data, tickers)
    return pd.concat(datas, keys=tickers, names=['Ticker', 'Date'])
all_data = get(tickers, stocks_start, stocks_end)
df = all_data[['Open','High','Low','Close','Adj Close','Volume']]
df.reset_index(level='Ticker',drop=True,inplace=True)
df.reset_index(inplace=True)
df.tail()
minmax = MinMaxScaler().fit(df.iloc[:, 4:5].astype('float32')) # Close index
df_log = minmax.transform(df.iloc[:, 4:5].astype('float32')) # Close index
df_log = pd.DataFrame(df_log)
df_log.head()
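For reference, MinMaxScaler rescales the Close column linearly into [0, 1] via (x - min) / (max - min), and the fitted minmax object is kept so the predictions can be mapped back to prices later. A quick illustration with made-up numbers:

# Illustration only (made-up numbers, not part of the original post).
demo = MinMaxScaler().fit_transform(np.array([[10.0], [15.0], [20.0]]))
print(demo.ravel())  # -> [0.  0.5 1. ]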
test_size = 30
simulation_size = 10
df_train = df_log.iloc[:-test_size]
df_test = df_log.iloc[-test_size:]
df.shape, df_train.shape, df_test.shape
class Model:
    def __init__(
        self,
        learning_rate,
        num_layers,
        size,
        size_layer,
        output_size,
        forget_bias = 0.1,
    ):
        # Note: forget_bias is actually used below as the dropout keep
        # probability (output_keep_prob), not as a GRU forget bias.
        def lstm_cell(size_layer):
            return tf.nn.rnn_cell.GRUCell(size_layer)

        # Encoder: stacked GRU cells for the backward and forward directions.
        backward_rnn_cells = tf.nn.rnn_cell.MultiRNNCell(
            [lstm_cell(size_layer) for _ in range(num_layers)],
            state_is_tuple = False,
        )
        forward_rnn_cells = tf.nn.rnn_cell.MultiRNNCell(
            [lstm_cell(size_layer) for _ in range(num_layers)],
            state_is_tuple = False,
        )
        self.X = tf.placeholder(tf.float32, (None, None, size))
        self.Y = tf.placeholder(tf.float32, (None, output_size))
        drop_backward = tf.contrib.rnn.DropoutWrapper(
            backward_rnn_cells, output_keep_prob = forget_bias
        )
        forward_backward = tf.contrib.rnn.DropoutWrapper(
            forward_rnn_cells, output_keep_prob = forget_bias
        )
        self.backward_hidden_layer = tf.placeholder(
            tf.float32, shape = (None, num_layers * size_layer)
        )
        self.forward_hidden_layer = tf.placeholder(
            tf.float32, shape = (None, num_layers * size_layer)
        )
        _, last_state = tf.nn.bidirectional_dynamic_rnn(
            forward_backward,
            drop_backward,
            self.X,
            initial_state_fw = self.forward_hidden_layer,
            initial_state_bw = self.backward_hidden_layer,
            dtype = tf.float32,
        )

        # Decoder: a second bidirectional GRU initialised with the encoder's
        # final forward/backward states.
        with tf.variable_scope('decoder', reuse = False):
            backward_rnn_cells_decoder = tf.nn.rnn_cell.MultiRNNCell(
                [lstm_cell(size_layer) for _ in range(num_layers)],
                state_is_tuple = False,
            )
            forward_rnn_cells_decoder = tf.nn.rnn_cell.MultiRNNCell(
                [lstm_cell(size_layer) for _ in range(num_layers)],
                state_is_tuple = False,
            )
            drop_backward_decoder = tf.contrib.rnn.DropoutWrapper(
                backward_rnn_cells_decoder, output_keep_prob = forget_bias
            )
            forward_backward_decoder = tf.contrib.rnn.DropoutWrapper(
                forward_rnn_cells_decoder, output_keep_prob = forget_bias
            )
            self.outputs, self.last_state = tf.nn.bidirectional_dynamic_rnn(
                forward_backward_decoder, drop_backward_decoder, self.X,
                initial_state_fw = last_state[0],
                initial_state_bw = last_state[1],
                dtype = tf.float32
            )
        # Concatenate forward/backward outputs and project to the output size.
        self.outputs = tf.concat(self.outputs, 2)
        self.logits = tf.layers.dense(self.outputs[-1], output_size)
        self.cost = tf.reduce_mean(tf.square(self.Y - self.logits))
        self.optimizer = tf.train.AdamOptimizer(learning_rate).minimize(
            self.cost
        )
def calculate_accuracy(real, predict):
    real = np.array(real) + 1
    predict = np.array(predict) + 1
    percentage = 1 - np.sqrt(np.mean(np.square((real - predict) / real)))
    return percentage * 100
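This metric is essentially 100 * (1 - RMSPE); both series are shifted by +1, presumably so the division stays well-behaved when the function is applied to min-max-scaled values near zero. A quick sanity check with hypothetical numbers:

# Hypothetical values: roughly a 1% average relative error gives about 99% "accuracy".
print(calculate_accuracy([100.0, 102.0, 101.0], [101.0, 101.0, 102.0]))  # ~99.0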
def anchor(signal, weight):
    buffer = []
    last = signal[0]
    for i in signal:
        smoothed_val = last * weight + (1 - weight) * i
        buffer.append(smoothed_val)
        last = smoothed_val
    return buffer
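anchor() is a plain exponential smoother: each output is weight times the previous smoothed value plus (1 - weight) times the new value, and forecast() uses it with weight=0.3 to damp jitter in the predicted curve. A small illustration with made-up numbers:

# Made-up numbers for illustration only.
print(anchor([1.0, 2.0, 3.0, 4.0], 0.3))  # -> [1.0, 1.7, 2.61, 3.583]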
num_layers = 1        # stacked GRU layers per direction
size_layer = 128      # hidden units per GRU layer
timestamp = 5         # time steps per training window
epoch = 300           # training epochs per simulation
dropout_rate = 0.8    # passed to Model as forget_bias, i.e. the dropout keep probability
future_day = test_size
learning_rate = 0.01
def forecast():
    tf.reset_default_graph()
    modelnn = Model(
        learning_rate, num_layers, df_log.shape[1], size_layer, df_log.shape[1], dropout_rate
    )
    sess = tf.InteractiveSession()
    sess.run(tf.global_variables_initializer())
    date_ori = pd.to_datetime(df.iloc[:, 0]).tolist()

    # Train loop: feed the scaled series in windows of `timestamp` steps,
    # carrying the recurrent state across windows within an epoch.
    pbar = tqdm(range(epoch), desc = 'train loop')
    for i in pbar:
        init_value_forward = np.zeros((1, num_layers * size_layer))
        init_value_backward = np.zeros((1, num_layers * size_layer))
        total_loss, total_acc = [], []
        for k in range(0, df_train.shape[0] - 1, timestamp):
            index = min(k + timestamp, df_train.shape[0] - 1)
            batch_x = np.expand_dims(
                df_train.iloc[k : index, :].values, axis = 0
            )
            batch_y = df_train.iloc[k + 1 : index + 1, :].values
            logits, last_state, _, loss = sess.run(
                [modelnn.logits, modelnn.last_state, modelnn.optimizer, modelnn.cost],
                feed_dict = {
                    modelnn.X: batch_x,
                    modelnn.Y: batch_y,
                    modelnn.backward_hidden_layer: init_value_backward,
                    modelnn.forward_hidden_layer: init_value_forward,
                },
            )
            init_value_forward = last_state[0]
            init_value_backward = last_state[1]
            total_loss.append(loss)
            total_acc.append(calculate_accuracy(batch_y[:, 0], logits[:, 0]))
        pbar.set_postfix(cost = np.mean(total_loss), acc = np.mean(total_acc))

    future_day = test_size

    # In-sample predictions over the training range.
    output_predict = np.zeros((df_train.shape[0] + future_day, df_train.shape[1]))
    output_predict[0] = df_train.iloc[0]
    upper_b = (df_train.shape[0] // timestamp) * timestamp
    init_value_forward = np.zeros((1, num_layers * size_layer))
    init_value_backward = np.zeros((1, num_layers * size_layer))

    for k in range(0, (df_train.shape[0] // timestamp) * timestamp, timestamp):
        out_logits, last_state = sess.run(
            [modelnn.logits, modelnn.last_state],
            feed_dict = {
                modelnn.X: np.expand_dims(
                    df_train.iloc[k : k + timestamp], axis = 0
                ),
                modelnn.backward_hidden_layer: init_value_backward,
                modelnn.forward_hidden_layer: init_value_forward,
            },
        )
        init_value_forward = last_state[0]
        init_value_backward = last_state[1]
        output_predict[k + 1 : k + timestamp + 1] = out_logits

    # Handle the remainder that does not fill a full window.
    if upper_b != df_train.shape[0]:
        out_logits, last_state = sess.run(
            [modelnn.logits, modelnn.last_state],
            feed_dict = {
                modelnn.X: np.expand_dims(df_train.iloc[upper_b:], axis = 0),
                modelnn.backward_hidden_layer: init_value_backward,
                modelnn.forward_hidden_layer: init_value_forward,
            },
        )
        output_predict[upper_b + 1 : df_train.shape[0] + 1] = out_logits
        future_day -= 1
        date_ori.append(date_ori[-1] + timedelta(days = 1))

    init_value_forward = last_state[0]
    init_value_backward = last_state[1]

    # Roll the model forward, feeding its own predictions back in,
    # to forecast the remaining future days.
    for i in range(future_day):
        o = output_predict[-future_day - timestamp + i:-future_day + i]
        out_logits, last_state = sess.run(
            [modelnn.logits, modelnn.last_state],
            feed_dict = {
                modelnn.X: np.expand_dims(o, axis = 0),
                modelnn.backward_hidden_layer: init_value_backward,
                modelnn.forward_hidden_layer: init_value_forward,
            },
        )
        init_value_forward = last_state[0]
        init_value_backward = last_state[1]
        output_predict[-future_day + i] = out_logits[-1]
        date_ori.append(date_ori[-1] + timedelta(days = 1))

    # Map the scaled predictions back to prices and smooth them.
    output_predict = minmax.inverse_transform(output_predict)
    deep_future = anchor(output_predict[:, 0], 0.3)

    return deep_future[-test_size:]
Prediction Simulation
results = []
for i in range(simulation_size):
    print('simulation %d'%(i + 1))
    results.append(forecast())
Discard forecast runs that swing too far up or down.
accepted_results = []
for r in results:
    if (np.array(r[-test_size:]) < np.min(df['Close'])).sum() == 0 and \
       (np.array(r[-test_size:]) > np.max(df['Close']) * 2).sum() == 0:
        accepted_results.append(r)
len(accepted_results)
Discard forecast runs whose accuracy is 88 or below.
accepted_results2 = []
for r in accepted_results:
    if calculate_accuracy(df['Close'].iloc[-test_size:].values, r) > 88:
        accepted_results2.append(r)
len(accepted_results2)

accuracies = [calculate_accuracy(df['Close'].iloc[-test_size:].values, r) for r in accepted_results2]
plt.figure(figsize = (25, 15))
for no, r in enumerate(accepted_results2):
    plt.plot(r, label = 'forecast %d'%(no + 1))
plt.plot(df['Close'].iloc[-test_size:].values, label = 'true trend', c = 'black', lw = 2)
plt.legend(prop = {'size': 25})
plt.rc('xtick', labelsize = 30)
plt.rc('ytick', labelsize = 25)
plt.title('average accuracy: %.4f'%(np.mean(accuracies)), fontsize = 25)
plt.show()
I think it is obvious at a glance that the matplotlib chart is clearly inferior in visual impact. That said, matplotlib is easier to plot with than plotly, which is presumably why it is still so widely used.
from plotly.offline import plot, iplot
import plotly.graph_objs as go

plotly_data = []
for no, r in enumerate(accepted_results2):
    plotly_data.append(go.Scatter(y = r, name = 'forecast %d'%(no + 1)))
trace = go.Scatter(
y = df['Close'].iloc[-test_size:].values,
name = 'true trend',
line = dict(
color = ('Black'),
width = 2)
)
layout = dict(title = 'average accuracy: %.4f'%(np.mean(accuracies)),
title_font=dict(size=24, family='Courier', color='black'),
yaxis=dict(title='株価',title_font=dict(size=22)
,tickfont=dict(size=20)),
xaxis=dict(title='Last 30 days',title_font=dict(size=22),tickfont=dict(size=20)),
autosize=False,width=800, height=640,
hovermode= 'x',
hoverlabel=dict(font=dict(size=24)),
legend=dict(x=-.001,y=1,font=dict(size=21,color='black'),bgcolor='rgba(0,0,0,0)'),
legend_orientation="v"
)
plotly_data.append(trace)
fig = dict(data=plotly_data, layout=layout)
plot(fig, show_link=False, filename="stock_price.html", include_plotlyjs=False)
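Incidentally, iplot is imported above but never used; inside a Jupyter notebook the same figure can be rendered inline instead of being written out to an HTML file:

# Optional: render inline in a Jupyter notebook instead of calling plot().
iplot(fig)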
As the saying goes, seeing is believing: compared with the chart drawn with matplotlib, the one drawn with plotly is, I think you will agree, overwhelmingly more effective visually.