Azure ML Workspace - [TensorFlow] Time Series Example
Okay, so I kind of live in Azure ML Workspace these days... which led me to want to put together a small notebook using it here. It's been changing pretty rapidly, roughly every ~6 months, so I'm going to include the versions I'm working with.
If your company is going down the Azure road for public cloud, Azure ML Workspace (or AWS SageMaker) is probably the best solution for scaling easy access to compute, datasets, experiments, etc. to different data science teams across a large organization.
%load_ext autoreload
%autoreload 2
%matplotlib inline
import os
import datetime as dt
from pathlib import Path
from dotenv import load_dotenv
import pandas as pd
import numpy as np
# import tensorflow as tf
import matplotlib as mpl
import matplotlib.pyplot as plt
print(f"pandas version {pd.__version__}")
print(f"numpy version {np.__version__}")
# print(f"tensorflow version {tf.__version__}")
import azureml.core as aml
from azureml.core import Workspace, ScriptRunConfig, Environment, Experiment, Run
from azureml.core import Datastore, Dataset
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core import Model
from azureml.core.resource_configuration import ResourceConfiguration
from azureml.pipeline.core import Pipeline, PipelineParameter
from azureml.pipeline.steps import PythonScriptStep
from azureml.widgets import RunDetails
print(f"azureml version {aml.__version__}")
import twelvedata
from twelvedata import TDClient
print(f"twelvedata version {twelvedata.__version__}")
It's a personal preference of mine to make a .env file per project to keep tokens/secrets/etc. outside of notebooks.
In this case I created a file named .env, with a single variable apikey=(api key), in the same directory as my experiment.
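The file itself is just one line, something like this (the value is a placeholder for your actual key):
apikey=YOUR_TWELVE_DATA_API_KEY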
env_path = Path(".env")
assert env_path.exists()
_ = load_dotenv(env_path)
mpl.rcParams['figure.figsize'] = (12, 8)
mpl.rcParams['axes.grid'] = False
Azure ML Workspace
To set up an Azure ML Workspace you will need an Azure account (with a credit card attached). To spin one up, simply go to https://portal.azure.com/, type machine learning in the search bar, and create a workspace.
Once you have a workspace, you will need to download its config.json before going to https://ml.azure.com/ to access your workspace.
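If you haven't seen it before, the downloaded config.json is a small JSON file along these lines (values are placeholders):
{
    "subscription_id": "<your-subscription-id>",
    "resource_group": "<your-resource-group>",
    "workspace_name": "<your-workspace-name>"
}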
workspace_config_path = Path("config.json")
assert workspace_config_path.exists()
ws = Workspace.from_config(path=workspace_config_path)
Twelve Data Client
I set up an account at https://twelvedata.com/ to get a free API key to try it out. I had not heard of it before, but it was the first thing that came up in my Google search for free market data...
apikey = os.environ.get("apikey")
td = TDClient(apikey=apikey)
compute_name = "aml-compute"
vm_size = "Standard_NC6"
# vm_size = "Standard_NC6s_v3"
if compute_name in ws.compute_targets:
compute_target = ws.compute_targets[compute_name]
if compute_target and type(compute_target) is AmlCompute:
print('Found compute target: ' + compute_name)
else:
print('Creating a new compute target...')
provisioning_config = AmlCompute.provisioning_configuration(vm_size=vm_size, # STANDARD_NC6 is GPU-enabled
min_nodes=0,
max_nodes=4)
# create the compute target
compute_target = ComputeTarget.create(
ws, compute_name, provisioning_config)
# Can poll for a minimum number of nodes and for a specific timeout.
# If no min node count is provided it will use the scale settings for the cluster
compute_target.wait_for_completion(
show_output=True, min_node_count=None, timeout_in_minutes=20)
# For a more detailed view of current cluster status, use the 'status' property
print(compute_target.status.serialize())
etf_data = td.get_etf_list()
etf_list = etf_data.as_json()
etf_df = pd.DataFrame(etf_list)
etf_df.head()
end_date = pd.Timestamp(dt.datetime.today())
start_date = end_date - pd.tseries.offsets.BDay(252)
start_date.to_pydatetime().date(), end_date.to_pydatetime().date()
ticker = "VOO"
ts = td.time_series(
symbol=ticker,
interval="1day",
start_date=start_date,
end_date=end_date,
outputsize=300
)
df = ts.with_ema().as_pandas()
df.describe()
df.head().reset_index()
data_store = ws.get_default_datastore()
def get_or_upload_df(ws, data_store, df, ticker):
dataset_name = f'{ticker.lower()}_ds'
try:
ds = Dataset.get_by_name(workspace=ws, name=dataset_name)
df = ds.to_pandas_dataframe()
except:
Dataset.Tabular.register_pandas_dataframe(df, data_store, dataset_name)
ds = Dataset.get_by_name(workspace=ws, name=dataset_name)
df = ds.to_pandas_dataframe()
return df
aml_df = get_or_upload_df(ws, data_store, df.reset_index(), ticker)
aml_df.head()
src_dir = 'aml-exp'
aml_exp = Path(src_dir)
if not aml_exp.exists(): aml_exp.mkdir()
%%writefile aml-exp/train.py
# Standard Libraries
import argparse
import json
import os
import datetime as dt
# 3rd Party Libraries
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib as mpl
import matplotlib.pyplot as plt
from azureml.core import Run
from azureml.core import Dataset
from azureml.core import Model
from azureml.pipeline.core import Pipeline, PipelineParameter
from azureml.pipeline.steps import PythonScriptStep
from azureml.interpret import ExplanationClient
from sklearn.metrics import confusion_matrix
# Classes
class WindowGenerator():
def __init__(self, input_width, label_width, shift,
train_df, val_df, test_df,
label_columns=None):
# Store the raw data.
self.train_df = train_df
self.val_df = val_df
self.test_df = test_df
# Work out the label column indices.
self.label_columns = label_columns
if label_columns is not None:
self.label_columns_indices = {name: i for i, name in
enumerate(label_columns)}
self.column_indices = {name: i for i, name in
enumerate(train_df.columns)}
# Work out the window parameters.
self.input_width = input_width
self.label_width = label_width
self.shift = shift
self.total_window_size = input_width + shift
self.input_slice = slice(0, input_width)
self.input_indices = np.arange(self.total_window_size)[self.input_slice]
self.label_start = self.total_window_size - self.label_width
self.labels_slice = slice(self.label_start, None)
self.label_indices = np.arange(self.total_window_size)[self.labels_slice]
def __repr__(self):
return '\n'.join([
f'Total window size: {self.total_window_size}',
f'Input indices: {self.input_indices}',
f'Label indices: {self.label_indices}',
f'Label column name(s): {self.label_columns}'])
@property
def train(self):
return self.make_dataset(self.train_df)
@property
def val(self):
return self.make_dataset(self.val_df)
@property
def test(self):
return self.make_dataset(self.test_df)
@property
def example(self):
"""Get and cache an example batch of `inputs, labels` for plotting."""
result = getattr(self, '_example', None)
if result is None:
# No example batch was found, so get one from the `.train` dataset
result = next(iter(self.train))
# And cache it for next time
self._example = result
return result
def split_window(self, features):
inputs = features[:, self.input_slice, :]
labels = features[:, self.labels_slice, :]
if self.label_columns is not None:
labels = tf.stack(
[labels[:, :, self.column_indices[name]] for name in self.label_columns],
axis=-1)
# Slicing doesn't preserve static shape information, so set the shapes
# manually. This way the `tf.data.Datasets` are easier to inspect.
inputs.set_shape([None, self.input_width, None])
labels.set_shape([None, self.label_width, None])
return inputs, labels
def plot(self, plot_col, model=None, max_subplots=3):
plt.figure(figsize=(12, 8))
plot_col_index = self.column_indices[plot_col]
inputs, labels = self.example
max_n = min(max_subplots, len(inputs))
for n in range(max_n):
plt.subplot(max_n, 1, n+1)
plt.ylabel(f'{plot_col} [normed]')
plt.plot(self.input_indices, inputs[n, :, plot_col_index],
label='Inputs', marker='.', zorder=-10)
if self.label_columns:
label_col_index = self.label_columns_indices.get(plot_col, None)
else:
label_col_index = plot_col_index
if label_col_index is None:
continue
plt.scatter(self.label_indices, labels[n, :, label_col_index],
edgecolors='k', label='Labels', c='#2ca02c', s=64)
if model is not None:
predictions = model(inputs)
plt.scatter(self.label_indices, predictions[n, :, label_col_index],
marker='X', edgecolors='k', label='Predictions',
c='#ff7f0e', s=64)
if n == 0:
plt.legend()
plt.xlabel('Time')
def make_dataset(self, data):
data = np.array(data, dtype=np.float32)
ds = tf.keras.preprocessing.timeseries_dataset_from_array(
data=data,
targets=None,
sequence_length=self.total_window_size,
sequence_stride=1,
shuffle=True,
batch_size=32,)
ds = ds.map(self.split_window)
return ds
class Baseline(tf.keras.Model):
def __init__(self, label_index=None):
super().__init__()
self.label_index = label_index
def call(self, inputs):
if self.label_index is None:
return inputs
result = inputs[:, :, self.label_index]
return result[:, :, tf.newaxis]
# Global Variables
MAX_EPOCHS = 20
CONV_WIDTH = 3
# Read in Args
parser = argparse.ArgumentParser(description='Train')
parser.add_argument('--dataset_name', type=str, dest='dataset_name')
args = parser.parse_args()
# Paths
os.makedirs('./outputs', exist_ok=True)
os.makedirs('./outputs/model', exist_ok=True)
# ML Run
run = Run.get_context()
workspace = run.experiment.workspace
# ML Dataset
ds = Dataset.get_by_name(workspace=workspace, name=args.dataset_name)
df = ds.to_pandas_dataframe()
# Date Feature Prep
day = 24*60*60
year = (365.2425)*day
date_time = pd.to_datetime(df.datetime)
timestamp_s = date_time.map(dt.datetime.timestamp)
df['day_sin'] = np.sin(timestamp_s * (2 * np.pi / day))
df['day_cos'] = np.cos(timestamp_s * (2 * np.pi / day))
df['year_sin'] = np.sin(timestamp_s * (2 * np.pi / year))
df['year_cos'] = np.cos(timestamp_s * (2 * np.pi / year))
# Data Filter
features = ['day_sin', 'day_cos', 'ema']
target = 'close'
columns = features + [target]
df = df[columns]
# Data Splitting
n = len(df)
train_df = df[0:int(n*0.7)]
val_df = df[int(n*0.7):int(n*0.9)]
test_df = df[int(n*0.9):]
# Data Normalization
# TODO - normalize step based on train_df
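# One possible way to fill in the TODO above (left commented out so this script
# matches the original run): scale every split using statistics computed on
# train_df only, so nothing leaks from the validation/test periods.
# train_mean = train_df.mean()
# train_std = train_df.std()
# train_df = (train_df - train_mean) / train_std
# val_df = (val_df - train_mean) / train_std
# test_df = (test_df - train_mean) / train_std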
# Data Windows
single_step_window = WindowGenerator(
input_width=1, label_width=1, shift=1,
train_df=train_df, val_df=val_df, test_df=test_df,
label_columns=[target])
wide_window = WindowGenerator(
input_width=24, label_width=24, shift=1,
train_df=train_df, val_df=val_df, test_df=test_df,
label_columns=[target])
conv_window = WindowGenerator(
input_width=CONV_WIDTH,
label_width=1, shift=1,
train_df=train_df, val_df=val_df, test_df=test_df,
label_columns=[target])
# Train Baseline
baseline = Baseline(label_index=single_step_window.column_indices.get(target))
baseline.compile(loss=tf.losses.MeanSquaredError(),
metrics=[tf.metrics.MeanAbsoluteError()])
val_performance, tst_performance = {}, {}
val_performance['baseline'] = baseline.evaluate(single_step_window.val)
tst_performance['baseline'] = baseline.evaluate(single_step_window.test, verbose=0)
wide_window.plot(target, baseline)
run.log_image('baseline_pred', plot=plt)
# Train Models
def compile_and_fit(model, window, patience=4):
early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss',
patience=patience,
mode='min')
model.compile(loss=tf.losses.MeanSquaredError(),
optimizer=tf.optimizers.Adam(),
metrics=[tf.metrics.MeanAbsoluteError()])
history = model.fit(window.train, epochs=MAX_EPOCHS,
validation_data=window.val,
callbacks=[early_stopping])
return history
# Train Linear Model
linear = tf.keras.Sequential([
tf.keras.layers.Dense(units=1)
])
history = compile_and_fit(linear, single_step_window)
val_performance['linear'] = linear.evaluate(single_step_window.val)
tst_performance['linear'] = linear.evaluate(single_step_window.test, verbose=0)
tf.saved_model.save(linear, './outputs/model/linear')
fig1 = plt.figure()
ax = fig1.add_subplot(111)
ax.bar(x = range(len(train_df.columns)),
height=linear.layers[0].kernel[:,0].numpy())
ax.set_xticks(range(len(train_df.columns)))
_ = ax.set_xticklabels(train_df.columns, rotation=90)
run.log_image('linear_coef', plot=plt)
wide_window.plot(target, linear)
run.log_image('linear_pred', plot=plt)
# Train Single Step Dense Model
single_step_dense = tf.keras.Sequential([
tf.keras.layers.Dense(units=64, activation='relu'),
tf.keras.layers.Dense(units=64, activation='relu'),
tf.keras.layers.Dense(units=1)
])
history = compile_and_fit(single_step_dense, single_step_window)
val_performance['single_step_dense'] = single_step_dense.evaluate(single_step_window.val)
tst_performance['single_step_dense'] = single_step_dense.evaluate(single_step_window.test, verbose=0)
tf.saved_model.save(single_step_dense, './outputs/model/single_step_dense')
wide_window.plot(target, single_step_dense)
run.log_image('single_step_dense_pred', plot=plt)
# Train Multi Step Dense Model
multi_step_dense = tf.keras.Sequential([
# Shape: (time, features) => (time*features)
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(units=32, activation='relu'),
tf.keras.layers.Dense(units=32, activation='relu'),
tf.keras.layers.Dense(units=1),
# Add back the time dimension.
# Shape: (outputs) => (1, outputs)
tf.keras.layers.Reshape([1, -1]),
])
history = compile_and_fit(multi_step_dense, conv_window)
val_performance['multi_step_dense'] = multi_step_dense.evaluate(conv_window.val)
tst_performance['multi_step_dense'] = multi_step_dense.evaluate(conv_window.test, verbose=0)
tf.saved_model.save(multi_step_dense, './outputs/model/multi_step_dense')
# wide_window.plot(target, multi_step_dense)
# run.log_image('multi_step_dense_pred', plot=plt)
# Train Conv Model
conv = tf.keras.Sequential([
tf.keras.layers.Conv1D(filters=32,
kernel_size=(CONV_WIDTH,),
activation='relu'),
tf.keras.layers.Dense(units=32, activation='relu'),
tf.keras.layers.Dense(units=1),
])
history = compile_and_fit(conv, conv_window)
val_performance['conv'] = conv.evaluate(conv_window.val)
tst_performance['conv'] = conv.evaluate(conv_window.test, verbose=0)
tf.saved_model.save(conv, './outputs/model/conv')
# Train LSTM Model
lstm = tf.keras.models.Sequential([
# Shape [batch, time, features] => [batch, time, lstm_units]
tf.keras.layers.LSTM(10, return_sequences=True),
# Shape => [batch, time, features]
tf.keras.layers.Dense(units=1)
])
history = compile_and_fit(lstm, wide_window)
val_performance['lstm'] = lstm.evaluate(wide_window.val)
tst_performance['lstm'] = lstm.evaluate(wide_window.test, verbose=0)
tf.saved_model.save(lstm, './outputs/model/lstm')
# Performance
x = np.arange(len(val_performance))
width = 0.3
metric_name = 'mean_absolute_error'
metric_index = lstm.metrics_names.index('mean_absolute_error')
val_mae = [v[metric_index] for v in val_performance.values()]
test_mae = [v[metric_index] for v in tst_performance.values()]
fig2 = plt.figure()
ax = fig2.add_subplot(111)
b1 = ax.bar(x - 0.2, val_mae, width, label='validation')
# b2 = ax.bar(x + 0.2, test_mae, width, label='test')
ax.set_xticks(range(len(val_mae)))
_ = ax.set_xticklabels(val_performance.keys(), rotation=90)
run.log_image('performance_mae', plot=plt)
# Log Results & Select Best Model
best_model, best_score = None, None
if run is not None:
for k, v in val_performance.items():
run.log_list(f'val_{k}', v)
for k, v in tst_performance.items():
run.log_list(f'tst_{k}', v)
try:
mae = float(v[1])
if best_score is None and best_model is None:
best_model = k
best_score = mae
elif best_score > mae:
best_model = k
best_score = mae
except:
continue
run.log('best_model', best_model)
run.log('best_score', best_score)
if best_model != "baseline": model = run.register_model(model_name=best_model, model_path=f'outputs/model/{best_model}')
Setup Training Environment
aml_run_config = RunConfiguration()
aml_run_config.target = compute_target
aml_run_config.environment.python.user_managed_dependencies = False
# Add some packages relied on by data prep step
deps = CondaDependencies.create(
conda_packages=['pandas','scikit-learn', 'matplotlib'],
pip_packages=['azureml-sdk', 'azureml-dataprep[fuse,pandas]', 'azureml-pipeline', 'azureml-interpret'],
python_version='3.6.2',
pin_sdk_version=True)
deps.add_tensorflow_pip_package(core_type='gpu', version='2.3.1')
aml_run_config.environment.python.conda_dependencies = deps
src = ScriptRunConfig(source_directory=src_dir,
script='train.py',
arguments=['--dataset_name', f'{ticker.lower()}_ds'],
run_config=aml_run_config)
%%capture
experiment = Experiment(ws, 'aml_exp')
script_run = experiment.submit(src)
script_run.wait_for_completion(show_output=False)
RunDetails(script_run).show()
I find it best to simply go to the experiment's portal URL and review from the GUI. It contains all the runs from your experiment and makes it easy to review changes from a central location.
script_run.get_portal_url()
However, you can choose to do the model review inside the notebook too.
The first place to look when doing this is the experiment's metrics. In this example I'm logging the MSE and MAE for the validation & test datasets for each model.
metrics = script_run.get_metrics()
metrics
mae_metrics = []
for name, value in metrics.items():
if isinstance(value, list) and len(value) >= 2:
splits = name.split("_")
grp, model = splits[0], "_".join(splits[1:])
mae_metrics.append((model, grp, value[1]))
for model, grp, mae in sorted(mae_metrics, key=lambda o: o[0]):
name = f'{model}_{grp}'
print(f'{name:25s}: {mae:0.4f}')
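Since the training script registers the best non-baseline model, you can also pull it back out of the workspace model registry from here. A minimal sketch, assuming a non-baseline best_model was logged:
best_model_name = metrics.get("best_model")
if best_model_name and best_model_name != "baseline":
    registered_model = Model(ws, name=best_model_name)
    print(registered_model.name, registered_model.version)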
It can also be useful to review the log files to figure out wtf is constantly going wrong...
files = script_run.get_file_names()
files
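To actually read one of them you can download it locally first; a quick sketch (the exact log name will vary by run):
log_name = files[0]  # e.g. 'azureml-logs/70_driver_log.txt'
script_run.download_file(name=log_name, output_file_path="driver_log.txt")
print(Path("driver_log.txt").read_text()[:2000])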
print("starting compute cleanup")
for name, compute in ws.compute_targets.items():
print(f"deleting {name} instance")
compute.delete()
import time
while len(ws.compute_targets) != 0:
    time.sleep(10)  # poll periodically instead of busy-waiting against the API
print("compute cleanup complete")