Azure ML Workspace - [Tensorflow] Time Series Example | Continuation 2
This notebook is the same as the previous notebook. The goals is simply to update it to make a few enhancements that became apparent during the process of making the first notebook.
Enhancements
- Implement multiple step prediciton model
- splitting pipeline into two steps: prep_data and train
- rendering logged plots
- switching to table interface for logging scoring metrics instead of list
%load_ext autoreload
%autoreload 2
%matplotlib inline
%config Completer.use_jedi = False
import os
import datetime as dt
from PIL import Image
from pathlib import Path
from dotenv import load_dotenv
import pandas as pd
import numpy as np
import tensorflow as tf
import seaborn as sns
import matplotlib as mpl
import matplotlib.pyplot as plt
print(f"pandas version {pd.__version__}")
print(f"numpy version {np.__version__}")
print(f"tensorflow version {tf.__version__}")
import azureml.core as aml
from azureml.core import Workspace, ScriptRunConfig, Environment, Experiment, Run
from azureml.core import Datastore, Dataset
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core import Model
from azureml.core.resource_configuration import ResourceConfiguration
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.core import Pipeline, PipelineParameter
from azureml.pipeline.steps import PythonScriptStep
from azureml.widgets import RunDetails
print(f"azureml version {aml.__version__}")
import tensorboard
from azureml.tensorboard import Tensorboard
print(f"tensorboard version {tensorboard.__version__}")
import twelvedata
from twelvedata import TDClient
print(f"twelvedata version {twelvedata.__version__}")
This is a personal preference of mine to make a .env file per project to encapsulate tokens/secrets/etc outside of notebooks.
In this case I created a file named .env with a single variable apikey=(api key) in the same directory as my experiment.
env_path = Path("../.env")
assert env_path.exists()
_ = load_dotenv(env_path)
mpl.rcParams['figure.figsize'] = (12, 8)
mpl.rcParams['axes.grid'] = False
Azure ML Workspace
To setup an Azure ML Workspace you will need an azure account (with credit card). To spin it up simply go to https://portal.azure.com/ and type machine learning in the search bar and create a workspace.
Once you have a workspace you will need to download the config.json prior to going to https://ml.azure.com/ to access your workspace
workspace_config_path = Path("../config.json")
assert workspace_config_path.exists()
ws = Workspace.from_config(path=workspace_config_path)
Twelve Data Client
I setup an account at https://twelvedata.com/ to get a free api key to try it out. I had not heard of it before, but it was the first thing that came up in my google search for free market data...
apikey = os.environ.get("apikey")
td = TDClient(apikey=apikey)
compute_name = "aml-compute"
vm_size = "Standard_NC6"
# vm_size = "Standard_NC6s_v3"
if compute_name in ws.compute_targets:
compute_target = ws.compute_targets[compute_name]
if compute_target and type(compute_target) is AmlCompute:
print('Found compute target: ' + compute_name)
else:
print('Creating a new compute target...')
provisioning_config = AmlCompute.provisioning_configuration(vm_size=vm_size, # STANDARD_NC6 is GPU-enabled
min_nodes=0,
max_nodes=4)
# create the compute target
compute_target = ComputeTarget.create(
ws, compute_name, provisioning_config)
# Can poll for a minimum number of nodes and for a specific timeout.
# If no min node count is provided it will use the scale settings for the cluster
compute_target.wait_for_completion(
show_output=True, min_node_count=None, timeout_in_minutes=20)
# For a more detailed view of current cluster status, use the 'status' property
print(compute_target.status.serialize())
etf_data = td.get_etf_list()
etf_list = etf_data.as_json()
etf_df = pd.DataFrame(etf_list)
etf_df.head()
end_date = pd.Timestamp(dt.datetime.today())
start_date = end_date - pd.tseries.offsets.BDay(252) * 2
start_date.to_pydatetime().date(), end_date.to_pydatetime().date()
ticker = "IVOL"
ts = td.time_series(
symbol=ticker,
interval="1day",
start_date=start_date,
end_date=end_date,
outputsize=500
)
df = ts.with_ema().with_vwap().as_pandas()
df.describe()
df.head().reset_index()
data_store = ws.get_default_datastore()
def get_or_upload_df(ws, data_store, df, ticker):
dataset_name = f'{ticker.lower()}_ds'
try:
ds = Dataset.get_by_name(workspace=ws, name=dataset_name)
df = ds.to_pandas_dataframe()
except:
Dataset.Tabular.register_pandas_dataframe(df, data_store, dataset_name)
ds = Dataset.get_by_name(workspace=ws, name=dataset_name)
df = ds.to_pandas_dataframe()
return df
aml_df = get_or_upload_df(ws, data_store, df.reset_index(), ticker)
aml_df.head()
src_dir = 'aml-exp-multi'
aml_exp = Path(src_dir)
if not aml_exp.exists(): aml_exp.mkdir()
%%writefile aml-exp-multi/data-prep.py
# Standard Libraries
import argparse
import datetime as dt
from pathlib import Path
# 3rd Party Libraries
import numpy as np
import pandas as pd
# Plotting Libraries
import seaborn as sns
import matplotlib as mpl
import matplotlib.pyplot as plt
# Azure ML Libraries
from azureml.core import Run
from azureml.core import Dataset
# ML Run
run = Run.get_context()
workspace = run.experiment.workspace
# Read in Args
parser = argparse.ArgumentParser()
parser.add_argument('--input_dataset', dest='input_dataset', required=True)
parser.add_argument('--train_dataset', dest='train_dataset', required=True)
parser.add_argument('--val_dataset', dest='val_dataset', required=True)
parser.add_argument('--test_dataset', dest='test_dataset', required=True)
args = parser.parse_args()
# Dataset & Prep
ds = run.input_datasets['input_dataset']
df = ds.to_pandas_dataframe()
# Date Feature Prep
day = 24*60*60
year = (365.2425)*day
date_time = pd.to_datetime(df.datetime)
timestamp_s = date_time.map(dt.datetime.timestamp)
df['day_sin'] = np.sin(timestamp_s * (2 * np.pi / day))
df['day_cos'] = np.cos(timestamp_s * (2 * np.pi / day))
df['year_sin'] = np.sin(timestamp_s * (2 * np.pi / year))
df['year_cos'] = np.cos(timestamp_s * (2 * np.pi / year))
# Data Filter
features = ['day_sin', 'day_cos', 'low', 'high', 'volume', 'ema', 'vwap']
target = 'close'
columns = features + [target]
df = df[columns]
num_features = df.shape[1]
# Data Splitting
n = len(df)
train_df = df[0:int(n*0.6)]
val_df = df[int(n*0.6):int(n*0.8)]
test_df = df[int(n*0.8):]
# Data Normalization
train_mean = train_df.mean()
train_std = train_df.std()
train_df = (train_df - train_mean) / train_std
val_df = (val_df - train_mean) / train_std
test_df = (test_df - train_mean) / train_std
# Data Distribution Check
df_std = (df - train_mean) / train_std
df_std = df_std.melt(var_name='Column', value_name='Normalized')
plt.figure(figsize=(12, 6))
ax = sns.violinplot(x='Column', y='Normalized', data=df_std)
_ = ax.set_xticklabels(df.keys(), rotation=45)
run.log_image('feature_distribution_check', plot=plt)
print(run.output_datasets)
print(type(args.train_dataset), args.train_dataset)
with (Path(args.train_dataset) / 'train.csv').open('w') as f:
train_df.to_csv(f)
with (Path(args.val_dataset)/ 'val.csv').open('w') as f:
val_df.to_csv(f)
with (Path(args.test_dataset) / 'test.csv').open('w') as f:
test_df.to_csv(f)
%%writefile aml-exp-multi/train.py
# Standard Libraries
import argparse
import json
import os
import datetime as dt
from functools import partial
# 3rd Party Libraries
import numpy as np
import pandas as pd
import tensorflow as tf
# Plotting Libraries
import seaborn as sns
import matplotlib as mpl
import matplotlib.pyplot as plt
# Azure ML Libraries
from azureml.core import Run
from azureml.core import Dataset, Datastore
from azureml.data.datapath import DataPath
from azureml.core import Model
from azureml.tensorboard.export import export_to_tensorboard
from sklearn.metrics import confusion_matrix
# Classes
class WindowGenerator():
def __init__(self, input_width, label_width, shift,
train_df, val_df, test_df,
label_columns=None):
# Store the raw data.
self.train_df = train_df
self.val_df = val_df
self.test_df = test_df
# Work out the label column indices.
self.label_columns = label_columns
if label_columns is not None:
self.label_columns_indices = {name: i for i, name in
enumerate(label_columns)}
self.column_indices = {name: i for i, name in
enumerate(train_df.columns)}
# Work out the window parameters.
self.input_width = input_width
self.label_width = label_width
self.shift = shift
self.total_window_size = input_width + shift
self.input_slice = slice(0, input_width)
self.input_indices = np.arange(self.total_window_size)[self.input_slice]
self.label_start = self.total_window_size - self.label_width
self.labels_slice = slice(self.label_start, None)
self.label_indices = np.arange(self.total_window_size)[self.labels_slice]
def __repr__(self):
return '\n'.join([
f'Total window size: {self.total_window_size}',
f'Input indices: {self.input_indices}',
f'Label indices: {self.label_indices}',
f'Label column name(s): {self.label_columns}'])
@property
def train(self):
return self.make_dataset(self.train_df)
@property
def val(self):
return self.make_dataset(self.val_df)
@property
def test(self):
return self.make_dataset(self.test_df)
@property
def example(self):
"""Get and cache an example batch of `inputs, labels` for plotting."""
result = getattr(self, '_example', None)
if result is None:
# No example batch was found, so get one from the `.train` dataset
result = next(iter(self.train))
# And cache it for next time
self._example = result
return result
def split_window(self, features):
inputs = features[:, self.input_slice, :]
labels = features[:, self.labels_slice, :]
if self.label_columns is not None:
labels = tf.stack(
[labels[:, :, self.column_indices[name]] for name in self.label_columns],
axis=-1)
# Slicing doesn't preserve static shape information, so set the shapes
# manually. This way the `tf.data.Datasets` are easier to inspect.
inputs.set_shape([None, self.input_width, None])
labels.set_shape([None, self.label_width, None])
return inputs, labels
def make_dataset(self, data):
data = np.array(data, dtype=np.float32)
ds = tf.keras.preprocessing.timeseries_dataset_from_array(
data=data,
targets=None,
sequence_length=self.total_window_size,
sequence_stride=1,
shuffle=True,
batch_size=32,)
ds = ds.map(self.split_window)
return ds
def plot(self, plot_col, model=None, max_subplots=3):
plt.figure(figsize=(12, 8))
plot_col_index = self.column_indices[plot_col]
inputs, labels = self.example
max_n = min(max_subplots, len(inputs))
for n in range(max_n):
plt.subplot(max_n, 1, n+1)
plt.ylabel(f'{plot_col} [normed]')
plt.plot(self.input_indices, inputs[n, :, plot_col_index],
label='Inputs', marker='.', zorder=-10)
if self.label_columns:
label_col_index = self.label_columns_indices.get(plot_col, None)
else:
label_col_index = plot_col_index
if label_col_index is None:
continue
plt.scatter(self.label_indices, labels[n, :, label_col_index],
edgecolors='k', label='Labels', c='#2ca02c', s=64)
if model is not None:
predictions = model(inputs)
plt.scatter(self.label_indices, predictions[n, :, label_col_index],
marker='X', edgecolors='k', label='Predictions',
c='#ff7f0e', s=64)
if n == 0:
plt.legend()
plt.xlabel('Time')
class MultiStepLastBaseline(tf.keras.Model):
def __init__(self, label_index=None):
super().__init__()
self.label_index = label_index
def call(self, inputs):
if self.label_index is None:
return tf.tile(inputs[:, -1:, :], [1, OUT_STEPS, 1])
return tf.tile(tf.expand_dims(inputs[:, -1:, self.label_index], -1), [1, OUT_STEPS, 1])
class RepeatBaseline(tf.keras.Model):
def __init__(self, label_index=None):
super().__init__()
self.label_index = label_index
def call(self, inputs):
if self.label_index is None:
return inputs
result = inputs[:, :, self.label_index]
return result[:, :, tf.newaxis]
class FeedBack(tf.keras.Model):
def __init__(self, units, out_steps, num_features):
super().__init__()
self.out_steps = out_steps
self.units = units
self.lstm_cell = tf.keras.layers.LSTMCell(units)
# Also wrap the LSTMCell in an RNN to simplify the `warmup` method.
self.lstm_rnn = tf.keras.layers.RNN(self.lstm_cell, return_state=True)
self.dense = tf.keras.layers.Dense(num_features)
def warmup(self, inputs):
# inputs.shape => (batch, time, features)
# x.shape => (batch, lstm_units)
x, *state = self.lstm_rnn(inputs)
# predictions.shape => (batch, features)
prediction = self.dense(x)
return prediction, state
def call(self, inputs, training=None):
# Use a TensorArray to capture dynamically unrolled outputs.
predictions = []
# Initialize the lstm state
prediction, state = self.warmup(inputs)
# Insert the first prediction
predictions.append(prediction)
# Run the rest of the prediction steps
for n in range(1, self.out_steps):
# Use the last prediction as input.
x = prediction
# Execute one lstm step.
x, state = self.lstm_cell(x, states=state,
training=training)
# Convert the lstm output to a prediction.
prediction = self.dense(x)
# Add the prediction to the output
predictions.append(prediction)
# predictions.shape => (time, batch, features)
predictions = tf.stack(predictions)
# predictions.shape => (batch, time, features)
predictions = tf.transpose(predictions, [1, 0, 2])
return predictions
# Global Variables
MAX_EPOCHS = 20
OUT_STEPS = 24
CONV_WIDTH = 3
# Paths
os.makedirs('./outputs', exist_ok=True)
os.makedirs('./outputs/model', exist_ok=True)
os.makedirs('./outputs/log', exist_ok=True)
# Read in Args
parser = argparse.ArgumentParser()
parser.add_argument('--train_dataset', dest='train_dataset', required=True)
parser.add_argument('--val_dataset', dest='val_dataset', required=True)
parser.add_argument('--test_dataset', dest='test_dataset', required=True)
args = parser.parse_args()
# ML Run
run = Run.get_context()
workspace = run.experiment.workspace
datastore = workspace.get_default_datastore()
train_ds = run.input_datasets['train_dataset']
val_ds = run.input_datasets['val_dataset']
test_ds = run.input_datasets['test_dataset']
train_df = train_ds.to_pandas_dataframe()
val_df = val_ds.to_pandas_dataframe()
test_df = test_ds.to_pandas_dataframe()
target = train_df.columns[-1]
num_features = train_df.shape[1]
# Data Windows
multi_window = WindowGenerator(input_width=24,
label_width=OUT_STEPS, shift=OUT_STEPS,
train_df=train_df, val_df=val_df, test_df=test_df,
label_columns=[target])
multi_window.plot(target)
run.log_image(f'{target}_variable', plot=plt)
# Baseline
val_performance, tst_performance = {}, {}
def log_result(name, model, window, target, vals, tsts):
vals[name] = model.evaluate(window.val)
tsts[name] = model.evaluate(window.test, verbose=0)
window.plot(target, model)
run.log_image(f'{name}_pred', plot=plt)
tf.saved_model.save(model, f'outputs/model/{name}')
log_model = partial(log_result, window=multi_window, target=target, vals=val_performance, tsts=tst_performance)
print(f"target indice: {multi_window.column_indices.get(target)}")
last_baseline = MultiStepLastBaseline(-1)
last_baseline.compile(loss=tf.losses.MeanSquaredError(),
metrics=[tf.metrics.MeanAbsoluteError()])
log_model('last_baseline', last_baseline)
repeat_baseline = RepeatBaseline(-1)
repeat_baseline.compile(loss=tf.losses.MeanSquaredError(),
metrics=[tf.metrics.MeanAbsoluteError()])
log_model('repeat_baseline', repeat_baseline)
# Train Models
def compile_and_fit(model, window, patience=4):
early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss',
patience=patience,
mode='min')
model.compile(loss=tf.losses.MeanSquaredError(),
optimizer=tf.optimizers.Adam(),
metrics=[tf.metrics.MeanAbsoluteError()])
history = model.fit(window.train, epochs=MAX_EPOCHS,
validation_data=window.val,
callbacks=[early_stopping])
model.summary()
return history
compile_and_fit_multi = partial(compile_and_fit, window=multi_window)
# Train Linear Model
linear = tf.keras.Sequential([
# Take the last time-step.
# Shape [batch, time, features] => [batch, 1, features]
tf.keras.layers.Lambda(lambda x: x[:, -1:, :]),
# Shape => [batch, 1, out_steps*features]
tf.keras.layers.Dense(OUT_STEPS*num_features,
kernel_initializer=tf.initializers.zeros()),
# Shape => [batch, out_steps, features]
tf.keras.layers.Reshape([OUT_STEPS, num_features])
])
history = compile_and_fit_multi(linear)
log_model('linear', linear)
# Train Dense Model
dense = tf.keras.Sequential([
# Take the last time step.
# Shape [batch, time, features] => [batch, 1, features]
tf.keras.layers.Lambda(lambda x: x[:, -1:, :]),
# Shape => [batch, 1, dense_units]
tf.keras.layers.Dense(512, activation='relu'),
# Shape => [batch, out_steps*features]
tf.keras.layers.Dense(OUT_STEPS*num_features,
kernel_initializer=tf.initializers.zeros()),
# Shape => [batch, out_steps, features]
tf.keras.layers.Reshape([OUT_STEPS, num_features])
])
history = compile_and_fit_multi(dense)
log_model('dense', dense)
# Train Conv Model
conv = tf.keras.Sequential([
# Shape [batch, time, features] => [batch, CONV_WIDTH, features]
tf.keras.layers.Lambda(lambda x: x[:, -CONV_WIDTH:, :]),
# Shape => [batch, 1, conv_units]
tf.keras.layers.Conv1D(256, activation='relu', kernel_size=(CONV_WIDTH)),
# Shape => [batch, 1, out_steps*features]
tf.keras.layers.Dense(OUT_STEPS*num_features,
kernel_initializer=tf.initializers.zeros()),
# Shape => [batch, out_steps, features]
tf.keras.layers.Reshape([OUT_STEPS, num_features])
])
history = compile_and_fit_multi(conv)
log_model('conv', conv)
# Train LSTM Model
lstm = tf.keras.Sequential([
# Shape [batch, time, features] => [batch, lstm_units]
# Adding more `lstm_units` just overfits more quickly.
tf.keras.layers.LSTM(32, return_sequences=False),
# Shape => [batch, out_steps*features]
tf.keras.layers.Dense(OUT_STEPS*num_features,
kernel_initializer=tf.initializers.zeros()),
# Shape => [batch, out_steps, features]
tf.keras.layers.Reshape([OUT_STEPS, num_features])
])
history = compile_and_fit_multi(lstm)
log_model("lstm", lstm)
# Train AR RNN Feedback Model
feedback = FeedBack(units=32, out_steps=OUT_STEPS, num_features=num_features)
prediction, state = feedback.warmup(multi_window.example[0])
history = compile_and_fit_multi(feedback)
log_model('lstm_feedback', feedback)
# Log Results & Select Best Model
best_model, best_score = None, None
if run is not None:
# log results
run.log_table('mae_val', {k: v[1] for (k, v) in val_performance.items()})
run.log_table('mse_val', {k: v[0] for (k, v) in val_performance.items()})
run.log_table('mae_tst', {k: v[1] for (k, v) in tst_performance.items()})
run.log_table('mse_tst', {k: v[0] for (k, v) in tst_performance.items()})
# select best
for k, v in tst_performance.items():
try:
mae = float(v[1])
if best_score is None and best_model is None:
best_model = k
best_score = mae
elif best_score > mae:
best_model = k
best_score = mae
except:
continue
run.log('best_model', best_model)
run.log('best_score', best_score)
# best_model_path = f'outputs/model/{best_model}'
# model = run.register_model(model_name=best_model, model_path=best_model_path)
aml_run_config = RunConfiguration()
aml_run_config.target = compute_target
aml_run_config.environment.python.user_managed_dependencies = False
# Add some packages relied on by data prep step
deps = CondaDependencies.create(
conda_packages=['pandas','scikit-learn', 'matplotlib', 'seaborn'],
pip_packages=['azureml-sdk', 'azureml-dataprep[fuse,pandas]',
'azureml-pipeline', 'azureml.tensorboard', 'azureml-interpret'],
python_version='3.6.2',
pin_sdk_version=True)
deps.add_tensorflow_pip_package(core_type='gpu', version='2.3.1')
aml_run_config.environment.python.conda_dependencies = deps
train_data = OutputFileDatasetConfig(name="train_data", destination=(data_store, 'train/tabular/')).read_delimited_files()
val_data = OutputFileDatasetConfig(name="val_data", destination=(data_store, 'val/tabular/')).read_delimited_files()
test_data = OutputFileDatasetConfig(name="test_data", destination=(data_store, 'test/tabular/')).read_delimited_files()
dataset_name = f'{ticker.lower()}_ds'
input_dataset = Dataset.get_by_name(ws, dataset_name)
prep_data_step = PythonScriptStep(
name="prep_data_step",
source_directory=src_dir,
script_name="data-prep.py",
arguments=[
"--input_dataset", input_dataset.as_named_input("input_dataset"),
"--train_dataset", train_data,
"--val_dataset", val_data,
"--test_dataset", test_data
],
runconfig=aml_run_config,
allow_reuse=True
)
train_step = PythonScriptStep(
name="train_step",
source_directory=src_dir,
script_name="train.py",
arguments=[
"--train_dataset", train_data.as_input(name="train_dataset"),
"--val_dataset", val_data.as_input(name="val_dataset"),
"--test_dataset", test_data.as_input(name="test_dataset")
],
runconfig=aml_run_config
)
steps = [prep_data_step, train_step]
pipeline = Pipeline(workspace=ws, steps=steps)
%%capture
experiment = Experiment(ws, 'aml_exp_multi')
script_run = experiment.submit(pipeline)
script_run.wait_for_completion(show_output=False)
RunDetails(script_run).show()
I find it best to simply go to the experiment portal url to review from the gui. It contains all the runs from your experiment and makes it easy to review changes from a central location.
script_run.get_portal_url()
step_name = 'prep_data_step'
step = script_run.find_step_run(step_name)[0]
step.get_portal_url()
The most useful thing for myself is to display the images stored during the running of a step.
imgs = [f for f in step.get_file_names() if f.endswith('.png')]
for img in imgs:
step.download_file(img)
display(img)
display(Image.open(img))
Path(img).unlink()
We can see that volumne and ema have wide tails / outliers to review and potentially deal with above.
However, you can choose to do the model review inside the notebook too.
The first place to look when doing this is the experiments metrics. In this example I'm logging the mse and mae for validation & test datasets for each model
step_name = 'train_step'
step = script_run.find_step_run(step_name)[0]
step.get_portal_url()
metrics_dict = step.get_metrics()
scores_df = pd.DataFrame()
for k, v in metrics_dict.items():
if isinstance(v, dict):
scores_df = scores_df.append(pd.DataFrame.from_dict(v, orient='index', columns=[k]).T)
scores_df.T.sort_values('mae_tst')
Our baseline is performing really well here. Proof that you should always start simple...
imgs = [f for f in step.get_file_names() if f.endswith('.png')]
for img in imgs:
display(img)
step.download_file(img)
display(Image.open(img))
Path(img).unlink()
print("starting compute cleanup")
for name, compute in ws.compute_targets.items():
print(f"deleting {name} instance")
compute.delete()
while len(ws.compute_targets.items()) != 0:
continue
print("compute cleanup complete")