This notebook follows the same approach as the previous notebook. The goal is simply to update it with a few enhancements that became apparent while making the first notebook.

Enhancements

  • Implementing a multi-step prediction model
  • Splitting the pipeline into two steps: prep_data and train
  • Rendering logged plots
  • Switching to the table interface for logging scoring metrics instead of a list

Notebook Setup

%load_ext autoreload
%autoreload 2
%matplotlib inline

%config Completer.use_jedi = False

Libraries

import os 

import datetime as dt

from PIL import Image
from pathlib import Path
from dotenv import load_dotenv
import pandas as pd
import numpy as np
import tensorflow as tf

import seaborn as sns
import matplotlib as mpl
import matplotlib.pyplot as plt


print(f"pandas version {pd.__version__}")
print(f"numpy version {np.__version__}")
print(f"tensorflow version {tf.__version__}")
pandas version 1.2.3
numpy version 1.19.2
tensorflow version 2.3.0
import azureml.core as aml

from azureml.core import Workspace, ScriptRunConfig, Environment, Experiment, Run
from azureml.core import Datastore, Dataset
from azureml.core.compute import ComputeTarget, AmlCompute

from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies

from azureml.core import Model
from azureml.core.resource_configuration import ResourceConfiguration

from azureml.data import OutputFileDatasetConfig

from azureml.pipeline.core import Pipeline, PipelineParameter
from azureml.pipeline.steps import PythonScriptStep

from azureml.widgets import RunDetails


print(f"azureml version {aml.__version__}")
azureml version 1.25.0
import tensorboard

from azureml.tensorboard import Tensorboard

print(f"tensorboard version {tensorboard.__version__}")
tensorboard version 2.4.0
import twelvedata
from twelvedata import TDClient

print(f"twelvedata version {twelvedata.__version__}")
twelvedata version 1.1.8

Project Environment Variables

This is a personal preference of mine: I make a .env file per project to keep tokens, secrets, etc. outside of notebooks.

In this case I created a file named .env with a single variable apikey=(api key) in the same directory as my experiment.

env_path = Path("../.env")
assert env_path.exists()
_ = load_dotenv(env_path)
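Because os.environ.get returns None for a missing key, a misnamed variable in .env would only surface later as an authentication error. Below is a minimal sketch of a fail-fast lookup. The helper name require_env and the demo variable demo_apikey are hypothetical, not part of python-dotenv or this project.

```python
import os


def require_env(name):
    """Return the value of an environment variable or raise a clear error."""
    value = os.environ.get(name)
    if not value:
        raise KeyError(f"environment variable {name!r} is not set; check the .env file")
    return value


# Placeholder variable standing in for the real key loaded by load_dotenv.
os.environ["demo_apikey"] = "demo-key"
print(require_env("demo_apikey"))
```

In the notebook this would be called as require_env("apikey") right after load_dotenv.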

Matplotlib

It's useful to set a few global plotting defaults rather than repeating them for every plot in a notebook.

mpl.rcParams['figure.figsize'] = (12, 8)
mpl.rcParams['axes.grid'] = False

Azure ML Workspace

To set up an Azure ML Workspace you will need an Azure account (with a credit card). To spin it up, go to https://portal.azure.com/, type machine learning in the search bar, and create a workspace.

Once you have a workspace, you will need to download its config.json before going to https://ml.azure.com/ to access the workspace.

workspace_config_path = Path("../config.json")
assert workspace_config_path.exists()
ws = Workspace.from_config(path=workspace_config_path)

Twelve Data Client

I set up an account at https://twelvedata.com/ to get a free API key to try it out. I had not heard of it before, but it was the first thing that came up in my Google search for free market data...

apikey = os.environ.get("apikey")
td = TDClient(apikey=apikey)

ML Workspace Compute

Get existing compute cluster or create one

compute_name = "aml-compute"
vm_size = "Standard_NC6"
# vm_size = "Standard_NC6s_v3"

if compute_name in ws.compute_targets:
    compute_target = ws.compute_targets[compute_name]
    if compute_target and type(compute_target) is AmlCompute:
        print('Found compute target: ' + compute_name)
else:
    print('Creating a new compute target...')
    provisioning_config = AmlCompute.provisioning_configuration(vm_size=vm_size,  # STANDARD_NC6 is GPU-enabled
                                                                min_nodes=0,
                                                                max_nodes=4)
    # create the compute target
    compute_target = ComputeTarget.create(
        ws, compute_name, provisioning_config)

    # Can poll for a minimum number of nodes and for a specific timeout.
    # If no min node count is provided it will use the scale settings for the cluster
    compute_target.wait_for_completion(
        show_output=True, min_node_count=None, timeout_in_minutes=20)

    # For a more detailed view of current cluster status, use the 'status' property
    print(compute_target.status.serialize())
Creating a new compute target...
Creating...
SucceededProvisioning operation finished, operation "Succeeded"
Succeeded
AmlCompute wait for completion finished

Minimum number of nodes requested have been provisioned
{'currentNodeCount': 0, 'targetNodeCount': 0, 'nodeStateCounts': {'preparingNodeCount': 0, 'runningNodeCount': 0, 'idleNodeCount': 0, 'unusableNodeCount': 0, 'leavingNodeCount': 0, 'preemptedNodeCount': 0}, 'allocationState': 'Steady', 'allocationStateTransitionTime': '2021-04-06T17:41:16.106000+00:00', 'errors': None, 'creationTime': '2021-04-06T17:41:13.362593+00:00', 'modifiedTime': '2021-04-06T17:41:28.964771+00:00', 'provisioningState': 'Succeeded', 'provisioningStateTransitionTime': None, 'scaleSettings': {'minNodeCount': 0, 'maxNodeCount': 4, 'nodeIdleTimeBeforeScaleDown': 'PT120S'}, 'vmPriority': 'Dedicated', 'vmSize': 'STANDARD_NC6'}

ML Workspace Data

TwelveData

List ETFs Available

etf_data = td.get_etf_list()
etf_list = etf_data.as_json()
etf_df = pd.DataFrame(etf_list)
etf_df.head()
symbol name currency exchange
0 8PSG Invesco Physical Gold ETC EUR XETR
1 AAA BetaShares Australian High Interest Cash ETF AUD ASX
2 AAAU Perth Mint Physical Gold ETF USD NYSE
3 AADR AdvisorShares Dorsey Wright ADR ETF USD NYSE
4 AASF Airlie Australian Share Fund -- ETF Feeder AUD ASX

Get ETF Time Series

end_date = pd.Timestamp(dt.datetime.today())
start_date = end_date - pd.tseries.offsets.BDay(252) * 2

start_date.to_pydatetime().date(), end_date.to_pydatetime().date()
(datetime.date(2019, 5, 1), datetime.date(2021, 4, 6))
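The start date above comes from stepping back 252 * 2 = 504 business days, roughly two trading years. As a cross-check, here is a minimal standard-library sketch that counts weekdays only; this mirrors a plain business-day offset, which does not know about market holidays. The function name minus_business_days is hypothetical, and the end date is fixed to the one printed above.

```python
import datetime as dt


def minus_business_days(day, n):
    """Step back n weekdays (Mon-Fri) from day, ignoring market holidays."""
    while n > 0:
        day -= dt.timedelta(days=1)
        if day.weekday() < 5:  # 0-4 are Monday to Friday
            n -= 1
    return day


start = minus_business_days(dt.date(2021, 4, 6), 252 * 2)
print(start)
```

This lands on the same start date as the cell above, 2019-05-01.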
ticker = "IVOL"
ts = td.time_series(
    symbol=ticker, 
    interval="1day",
    start_date=start_date,
    end_date=end_date,
    outputsize=500
)
df = ts.with_ema().with_vwap().as_pandas()
df.describe()
open high low close volume ema vwap
count 475.00000 475.000000 475.000000 475.000000 4.750000e+02 475.000000 475.000000
mean 26.60264 26.671625 26.540325 26.608292 4.910146e+06 26.156760 26.606747
std 1.12284 1.116020 1.137535 1.123807 9.894917e+07 3.595103 1.123732
min 24.05000 24.727000 24.050000 24.440000 0.000000e+00 0.000000 24.405670
25% 25.50000 25.579500 25.450000 25.530000 1.120000e+04 25.547135 25.519500
50% 26.58000 26.750000 26.510000 26.600000 4.390000e+04 26.538130 26.592400
75% 27.40000 27.465000 27.370000 27.410000 4.084390e+05 27.423495 27.421665
max 28.95000 28.950000 28.820000 28.945000 2.156860e+09 28.736060 28.905000
df.head().reset_index()
datetime open high low close volume ema vwap
0 2021-04-06 28.4200 28.4500 28.42 28.420 703070 28.51697 28.43000
1 2021-04-05 28.4295 28.4800 28.40 28.400 1569462 28.54121 28.42667
2 2021-04-01 28.4861 28.5125 28.41 28.420 3013907 28.57651 28.44750
3 2021-03-31 28.5841 28.6294 28.53 28.590 2156860498 28.61564 28.58313
4 2021-03-30 28.5000 28.5500 28.42 28.505 5507839 28.62205 28.49167

Azure

Azure Workspace Datastore

data_store = ws.get_default_datastore()

Upload ETF Dataset

def get_or_upload_df(ws, data_store, df, ticker):
    
    dataset_name = f'{ticker.lower()}_ds'
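    try:
        # Reuse the dataset if it is already registered in the workspace.
        ds = Dataset.get_by_name(workspace=ws, name=dataset_name)
        df = ds.to_pandas_dataframe()
    except Exception:
        # Not registered yet: upload the DataFrame, then read it back.
        Dataset.Tabular.register_pandas_dataframe(df, data_store, dataset_name)
        ds = Dataset.get_by_name(workspace=ws, name=dataset_name)
        df = ds.to_pandas_dataframe()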
    
    return df
    

aml_df = get_or_upload_df(ws, data_store, df.reset_index(), ticker)
aml_df.head()
datetime open high low close volume ema vwap
0 2021-04-01 04:00:00 28.4861 28.5125 28.4100 28.420 3013907 28.57651 28.44750
1 2021-03-31 04:00:00 28.5841 28.6294 28.5300 28.590 2156860498 28.61564 28.58313
2 2021-03-30 04:00:00 28.5000 28.5500 28.4200 28.505 5507839 28.62205 28.49167
3 2021-03-29 04:00:00 28.7000 28.7200 28.6300 28.650 949622 28.65131 28.66667
4 2021-03-26 04:00:00 28.7153 28.7790 28.7153 28.740 1108546 28.65164 28.74477

Training

Create Training Script

src_dir = 'aml-exp-multi'
aml_exp = Path(src_dir)
aml_exp.mkdir(exist_ok=True)
%%writefile aml-exp-multi/data-prep.py

# Standard Libraries
import argparse

import datetime as dt

from pathlib import Path

# 3rd Party Libraries
import numpy as np
import pandas as pd

# Plotting Libraries
import seaborn as sns
import matplotlib as mpl
import matplotlib.pyplot as plt

# Azure ML Libraries
from azureml.core import Run
from azureml.core import Dataset

# ML Run
run = Run.get_context()
workspace = run.experiment.workspace

# Read in Args
parser = argparse.ArgumentParser()
parser.add_argument('--input_dataset', dest='input_dataset', required=True)
parser.add_argument('--train_dataset', dest='train_dataset', required=True)
parser.add_argument('--val_dataset', dest='val_dataset', required=True)
parser.add_argument('--test_dataset', dest='test_dataset', required=True)
args = parser.parse_args()

# Dataset & Prep
ds = run.input_datasets['input_dataset']
df = ds.to_pandas_dataframe()

# Date Feature Prep
day = 24*60*60
year = (365.2425)*day

date_time = pd.to_datetime(df.datetime)
timestamp_s = date_time.map(dt.datetime.timestamp)

df['day_sin'] = np.sin(timestamp_s * (2 * np.pi / day))
df['day_cos'] = np.cos(timestamp_s * (2 * np.pi / day))
df['year_sin'] = np.sin(timestamp_s * (2 * np.pi / year))
df['year_cos'] = np.cos(timestamp_s * (2 * np.pi / year))

# Data Filter
features = ['day_sin', 'day_cos', 'low', 'high', 'volume', 'ema', 'vwap']
target = 'close'
columns = features + [target]
df = df[columns]
num_features = df.shape[1]

# Data Splitting
n = len(df)
train_df = df[0:int(n*0.6)]
val_df = df[int(n*0.6):int(n*0.8)]
test_df = df[int(n*0.8):]

# Data Normalization
train_mean = train_df.mean()
train_std = train_df.std()

train_df = (train_df - train_mean) / train_std
val_df = (val_df - train_mean) / train_std
test_df = (test_df - train_mean) / train_std


# Data Distribution Check
df_std = (df - train_mean) / train_std
df_std = df_std.melt(var_name='Column', value_name='Normalized')
plt.figure(figsize=(12, 6))
ax = sns.violinplot(x='Column', y='Normalized', data=df_std)
_ = ax.set_xticklabels(df.keys(), rotation=45)
run.log_image('feature_distribution_check', plot=plt)

print(run.output_datasets)
print(type(args.train_dataset), args.train_dataset)
with (Path(args.train_dataset) / 'train.csv').open('w') as f:
    train_df.to_csv(f)
with (Path(args.val_dataset)/ 'val.csv').open('w') as f:
    val_df.to_csv(f)
with (Path(args.test_dataset) / 'test.csv').open('w') as f:
    test_df.to_csv(f)
Overwriting aml-exp-multi/data-prep.py
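The data-prep script splits by row position (60/20/20) and normalises every split with the training mean and standard deviation. Positional slicing only follows time if the rows are ordered oldest first, and the previews above show the rows newest first, so it may be worth sorting by datetime before splitting. Below is a minimal sketch of that idea on toy data; the rows and the scale helper are made up for illustration, and this is not the code the pipeline ran.

```python
import statistics

# Toy rows in the newest-first order shown in the previews above: (date, close).
rows = [(f"2021-03-{day:02d}", float(day)) for day in range(10, 0, -1)]

# Sort oldest first so the positional split follows time.
rows.sort(key=lambda r: r[0])
values = [close for _, close in rows]

n = len(values)
train = values[: int(n * 0.6)]
val = values[int(n * 0.6) : int(n * 0.8)]
test = values[int(n * 0.8) :]

# Normalise every split with statistics from the training split only,
# so nothing from the validation or test periods leaks into training.
mean = statistics.mean(train)
std = statistics.stdev(train)  # sample standard deviation, like pandas' default


def scale(xs):
    return [(x - mean) / std for x in xs]


train_n, val_n, test_n = scale(train), scale(val), scale(test)
print(len(train_n), len(val_n), len(test_n))
```

With a pandas DataFrame the equivalent of the sort step would be a sort on the datetime column before the slices are taken.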
%%writefile aml-exp-multi/train.py

# Standard Libraries
import argparse
import json
import os

import datetime as dt

from functools import partial

# 3rd Party Libraries
import numpy as np
import pandas as pd
import tensorflow as tf

# Plotting Libraries
import seaborn as sns
import matplotlib as mpl
import matplotlib.pyplot as plt

# Azure ML Libraries
from azureml.core import Run
from azureml.core import Dataset, Datastore
from azureml.data.datapath import DataPath

from azureml.core import Model

from azureml.tensorboard.export import export_to_tensorboard

from sklearn.metrics import confusion_matrix

# Classes 
class WindowGenerator():
    def __init__(self, input_width, label_width, shift,
               train_df, val_df, test_df,
               label_columns=None):
        # Store the raw data.
        self.train_df = train_df
        self.val_df = val_df
        self.test_df = test_df

        # Work out the label column indices.
        self.label_columns = label_columns
        if label_columns is not None:
            self.label_columns_indices = {name: i for i, name in
                                        enumerate(label_columns)}
        self.column_indices = {name: i for i, name in
                               enumerate(train_df.columns)}

        # Work out the window parameters.
        self.input_width = input_width
        self.label_width = label_width
        self.shift = shift

        self.total_window_size = input_width + shift

        self.input_slice = slice(0, input_width)
        self.input_indices = np.arange(self.total_window_size)[self.input_slice]

        self.label_start = self.total_window_size - self.label_width
        self.labels_slice = slice(self.label_start, None)
        self.label_indices = np.arange(self.total_window_size)[self.labels_slice]

    def __repr__(self):
        return '\n'.join([
            f'Total window size: {self.total_window_size}',
            f'Input indices: {self.input_indices}',
            f'Label indices: {self.label_indices}',
            f'Label column name(s): {self.label_columns}'])
    
    @property
    def train(self):
        return self.make_dataset(self.train_df)

    @property
    def val(self):
        return self.make_dataset(self.val_df)

    @property
    def test(self):
        return self.make_dataset(self.test_df)

    @property
    def example(self):
        """Get and cache an example batch of `inputs, labels` for plotting."""
        result = getattr(self, '_example', None)
        if result is None:
            # No example batch was found, so get one from the `.train` dataset
            result = next(iter(self.train))
            # And cache it for next time
            self._example = result
        return result
    
    def split_window(self, features):
        inputs = features[:, self.input_slice, :]
        labels = features[:, self.labels_slice, :]
        if self.label_columns is not None:
            labels = tf.stack(
                [labels[:, :, self.column_indices[name]] for name in self.label_columns],
                axis=-1)

        # Slicing doesn't preserve static shape information, so set the shapes
        # manually. This way the `tf.data.Datasets` are easier to inspect.
        inputs.set_shape([None, self.input_width, None])
        labels.set_shape([None, self.label_width, None])

        return inputs, labels
          
    def make_dataset(self, data):
        data = np.array(data, dtype=np.float32)
        ds = tf.keras.preprocessing.timeseries_dataset_from_array(
          data=data,
          targets=None,
          sequence_length=self.total_window_size,
          sequence_stride=1,
          shuffle=True,
          batch_size=32,)

        ds = ds.map(self.split_window)

        return ds
    
    def plot(self, plot_col, model=None, max_subplots=3):
        plt.figure(figsize=(12, 8))
        plot_col_index = self.column_indices[plot_col]
        inputs, labels = self.example
        max_n = min(max_subplots, len(inputs))
        for n in range(max_n):
            plt.subplot(max_n, 1, n+1)
            plt.ylabel(f'{plot_col} [normed]')
            plt.plot(self.input_indices, inputs[n, :, plot_col_index],
                     label='Inputs', marker='.', zorder=-10)

            if self.label_columns:
                label_col_index = self.label_columns_indices.get(plot_col, None)
            else:
                label_col_index = plot_col_index

            if label_col_index is None:
                continue

            plt.scatter(self.label_indices, labels[n, :, label_col_index],
                edgecolors='k', label='Labels', c='#2ca02c', s=64)
            if model is not None:
                predictions = model(inputs)
                plt.scatter(self.label_indices, predictions[n, :, label_col_index],
                          marker='X', edgecolors='k', label='Predictions',
                          c='#ff7f0e', s=64)

            if n == 0:
                plt.legend()

        plt.xlabel('Time')

class MultiStepLastBaseline(tf.keras.Model):
    def __init__(self, label_index=None):
        super().__init__()
        self.label_index = label_index

    def call(self, inputs):
        if self.label_index is None:
            return tf.tile(inputs[:, -1:, :], [1, OUT_STEPS, 1])
        return tf.tile(tf.expand_dims(inputs[:, -1:, self.label_index], -1), [1, OUT_STEPS, 1])
    
class RepeatBaseline(tf.keras.Model):
    def __init__(self, label_index=None):
        super().__init__()
        self.label_index = label_index
        
    def call(self, inputs):
        if self.label_index is None:
            return inputs
        
        result = inputs[:, :, self.label_index]
        return result[:, :, tf.newaxis]
    
class FeedBack(tf.keras.Model):
    def __init__(self, units, out_steps, num_features):
        super().__init__()
        self.out_steps = out_steps
        self.units = units
        self.lstm_cell = tf.keras.layers.LSTMCell(units)
        # Also wrap the LSTMCell in an RNN to simplify the `warmup` method.
        self.lstm_rnn = tf.keras.layers.RNN(self.lstm_cell, return_state=True)
        self.dense = tf.keras.layers.Dense(num_features)
        
    def warmup(self, inputs):
        # inputs.shape => (batch, time, features)
        # x.shape => (batch, lstm_units)
        x, *state = self.lstm_rnn(inputs)

        # predictions.shape => (batch, features)
        prediction = self.dense(x)
        return prediction, state

    def call(self, inputs, training=None):
        # Collect the unrolled outputs in a plain Python list.
        predictions = []
        # Initialize the lstm state
        prediction, state = self.warmup(inputs)

        # Insert the first prediction
        predictions.append(prediction)

        # Run the rest of the prediction steps
        for n in range(1, self.out_steps):
            # Use the last prediction as input.
            x = prediction
            # Execute one lstm step.
            x, state = self.lstm_cell(x, states=state,
                                      training=training)
            # Convert the lstm output to a prediction.
            prediction = self.dense(x)
            # Add the prediction to the output
            predictions.append(prediction)

        # predictions.shape => (time, batch, features)
        predictions = tf.stack(predictions)
        # predictions.shape => (batch, time, features)
        predictions = tf.transpose(predictions, [1, 0, 2])
        return predictions
    
# Global Variables
MAX_EPOCHS = 20
OUT_STEPS = 24
CONV_WIDTH = 3

# Paths
os.makedirs('./outputs', exist_ok=True)
os.makedirs('./outputs/model', exist_ok=True)
os.makedirs('./outputs/log', exist_ok=True)

# Read in Args
parser = argparse.ArgumentParser()
parser.add_argument('--train_dataset', dest='train_dataset', required=True)
parser.add_argument('--val_dataset', dest='val_dataset', required=True)
parser.add_argument('--test_dataset', dest='test_dataset', required=True)
args = parser.parse_args()

# ML Run
run = Run.get_context()
workspace = run.experiment.workspace
datastore = workspace.get_default_datastore()

train_ds = run.input_datasets['train_dataset']
val_ds = run.input_datasets['val_dataset']
test_ds = run.input_datasets['test_dataset']

train_df = train_ds.to_pandas_dataframe()
val_df = val_ds.to_pandas_dataframe()
test_df = test_ds.to_pandas_dataframe()

target = train_df.columns[-1]
num_features = train_df.shape[1]

# Data Windows
multi_window = WindowGenerator(input_width=24,
                               label_width=OUT_STEPS, shift=OUT_STEPS,
                               train_df=train_df, val_df=val_df, test_df=test_df,
                               label_columns=[target])
multi_window.plot(target)
run.log_image(f'{target}_variable', plot=plt)


# Baseline
val_performance, tst_performance = {}, {}
def log_result(name, model, window, target, vals, tsts):
    vals[name] = model.evaluate(window.val)
    tsts[name] = model.evaluate(window.test, verbose=0)
    window.plot(target, model)
    run.log_image(f'{name}_pred', plot=plt)
    tf.saved_model.save(model, f'outputs/model/{name}')   
log_model = partial(log_result, window=multi_window, target=target, vals=val_performance, tsts=tst_performance)

print(f"target index: {multi_window.column_indices.get(target)}")
last_baseline = MultiStepLastBaseline(-1)
last_baseline.compile(loss=tf.losses.MeanSquaredError(),
                      metrics=[tf.metrics.MeanAbsoluteError()])        
log_model('last_baseline', last_baseline)

repeat_baseline = RepeatBaseline(-1)
repeat_baseline.compile(loss=tf.losses.MeanSquaredError(),
                        metrics=[tf.metrics.MeanAbsoluteError()])
log_model('repeat_baseline', repeat_baseline)


# Train Models
def compile_and_fit(model, window, patience=4):
    early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss',
                                                    patience=patience,
                                                    mode='min')
    model.compile(loss=tf.losses.MeanSquaredError(),
                optimizer=tf.optimizers.Adam(),
                metrics=[tf.metrics.MeanAbsoluteError()])
    history = model.fit(window.train, epochs=MAX_EPOCHS,
                      validation_data=window.val,
                      callbacks=[early_stopping])  
    model.summary()
    return history
compile_and_fit_multi = partial(compile_and_fit, window=multi_window)


# Train Linear Model
linear = tf.keras.Sequential([
    # Take the last time-step.
    # Shape [batch, time, features] => [batch, 1, features]
    tf.keras.layers.Lambda(lambda x: x[:, -1:, :]),
    # Shape => [batch, 1, out_steps*features]
    tf.keras.layers.Dense(OUT_STEPS*num_features,
                          kernel_initializer=tf.initializers.zeros()),
    # Shape => [batch, out_steps, features]
    tf.keras.layers.Reshape([OUT_STEPS, num_features])
])
history = compile_and_fit_multi(linear)
log_model('linear', linear)

# Train Dense Model
dense = tf.keras.Sequential([
    # Take the last time step.
    # Shape [batch, time, features] => [batch, 1, features]
    tf.keras.layers.Lambda(lambda x: x[:, -1:, :]),
    # Shape => [batch, 1, dense_units]
    tf.keras.layers.Dense(512, activation='relu'),
    # Shape => [batch, out_steps*features]
    tf.keras.layers.Dense(OUT_STEPS*num_features,
                          kernel_initializer=tf.initializers.zeros()),
    # Shape => [batch, out_steps, features]
    tf.keras.layers.Reshape([OUT_STEPS, num_features])
])
history = compile_and_fit_multi(dense)
log_model('dense', dense)

# Train Conv Model
conv = tf.keras.Sequential([
    # Shape [batch, time, features] => [batch, CONV_WIDTH, features]
    tf.keras.layers.Lambda(lambda x: x[:, -CONV_WIDTH:, :]),
    # Shape => [batch, 1, conv_units]
    tf.keras.layers.Conv1D(256, activation='relu', kernel_size=(CONV_WIDTH)),
    # Shape => [batch, 1,  out_steps*features]
    tf.keras.layers.Dense(OUT_STEPS*num_features,
                          kernel_initializer=tf.initializers.zeros()),
    # Shape => [batch, out_steps, features]
    tf.keras.layers.Reshape([OUT_STEPS, num_features])
])

history = compile_and_fit_multi(conv)
log_model('conv', conv)

# Train LSTM Model
lstm = tf.keras.Sequential([
    # Shape [batch, time, features] => [batch, lstm_units]
    # Adding more `lstm_units` just overfits more quickly.
    tf.keras.layers.LSTM(32, return_sequences=False),
    # Shape => [batch, out_steps*features]
    tf.keras.layers.Dense(OUT_STEPS*num_features,
                          kernel_initializer=tf.initializers.zeros()),
    # Shape => [batch, out_steps, features]
    tf.keras.layers.Reshape([OUT_STEPS, num_features])
])

history = compile_and_fit_multi(lstm)
log_model("lstm", lstm)

# Train AR RNN Feedback Model
feedback = FeedBack(units=32, out_steps=OUT_STEPS, num_features=num_features)
prediction, state = feedback.warmup(multi_window.example[0])

history = compile_and_fit_multi(feedback)
log_model('lstm_feedback', feedback)

# Log Results & Select Best Model
best_model, best_score = None, None
if run is not None:
    
    # log results
    run.log_table('mae_val', {k: v[1] for (k, v) in val_performance.items()})
    run.log_table('mse_val', {k: v[0] for (k, v) in val_performance.items()})
    run.log_table('mae_tst', {k: v[1] for (k, v) in tst_performance.items()})
    run.log_table('mse_tst', {k: v[0] for (k, v) in tst_performance.items()})
    
    # select best
    for k, v in tst_performance.items():
        try:
            mae = float(v[1])    
            if best_score is None and best_model is None: 
                best_model = k
                best_score = mae
            elif best_score > mae:
                best_model = k
                best_score = mae   
        except Exception:
            continue

    run.log('best_model', best_model)
    run.log('best_score', best_score)

# best_model_path = f'outputs/model/{best_model}'
# model = run.register_model(model_name=best_model, model_path=best_model_path)
Overwriting aml-exp-multi/train.py
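The window arithmetic in WindowGenerator.__init__ is easier to follow with concrete numbers. Below is a minimal sketch that repeats the same index calculations with plain Python lists, using the values passed to multi_window (input_width=24, label_width=24, shift=24). The function name window_indices is hypothetical.

```python
def window_indices(input_width, label_width, shift):
    """Mirror the index arithmetic in WindowGenerator.__init__ with plain lists."""
    total_window_size = input_width + shift
    input_indices = list(range(total_window_size))[slice(0, input_width)]
    label_start = total_window_size - label_width
    label_indices = list(range(total_window_size))[slice(label_start, None)]
    return total_window_size, input_indices, label_indices


total, inputs, labels = window_indices(input_width=24, label_width=24, shift=24)
print(total, inputs[0], inputs[-1], labels[0], labels[-1])
```

Each window therefore spans 48 rows: the first 24 are inputs and the last 24 are the labels the models are asked to predict.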

Setup Training Environment

aml_run_config = RunConfiguration()
aml_run_config.target = compute_target

aml_run_config.environment.python.user_managed_dependencies = False

# Add some packages relied on by data prep step
deps = CondaDependencies.create(
    conda_packages=['pandas','scikit-learn', 'matplotlib', 'seaborn'], 
    pip_packages=['azureml-sdk', 'azureml-dataprep[fuse,pandas]', 
                  'azureml-pipeline', 'azureml.tensorboard', 'azureml-interpret'], 
    python_version='3.6.2',
    pin_sdk_version=True)
deps.add_tensorflow_pip_package(core_type='gpu', version='2.3.1')
aml_run_config.environment.python.conda_dependencies = deps

Build Data Prep Step

train_data = OutputFileDatasetConfig(name="train_data", destination=(data_store, 'train/tabular/')).read_delimited_files()
val_data = OutputFileDatasetConfig(name="val_data", destination=(data_store, 'val/tabular/')).read_delimited_files()
test_data = OutputFileDatasetConfig(name="test_data", destination=(data_store, 'test/tabular/')).read_delimited_files()

dataset_name = f'{ticker.lower()}_ds'
input_dataset = Dataset.get_by_name(ws, dataset_name)

prep_data_step = PythonScriptStep(
    name="prep_data_step",
    source_directory=src_dir,
    script_name="data-prep.py",
    arguments=[
        "--input_dataset", input_dataset.as_named_input("input_dataset"), 
        "--train_dataset", train_data, 
        "--val_dataset", val_data, 
        "--test_dataset", test_data
    ],
    runconfig=aml_run_config,
    allow_reuse=True
)

Build Train Step

train_step = PythonScriptStep(
    name="train_step",
    source_directory=src_dir,
    script_name="train.py", 
    arguments=[
        "--train_dataset", train_data.as_input(name="train_dataset"), 
        "--val_dataset", val_data.as_input(name="val_dataset"), 
        "--test_dataset", test_data.as_input(name="test_dataset")
    ],
    runconfig=aml_run_config
)

Build Pipeline

steps = [prep_data_step, train_step]
pipeline = Pipeline(workspace=ws, steps=steps)

Run Experiment

%%capture
experiment = Experiment(ws, 'aml_exp_multi')
script_run = experiment.submit(pipeline)
script_run.wait_for_completion(show_output=False)
RunDetails(script_run).show()

Review

Azure Portal

I find it best to simply go to the experiment portal URL and review from the GUI. It contains all the runs from your experiment and makes it easy to review changes from a central location.

script_run.get_portal_url()
'https://ml.azure.com/runs/b3e487ab-146d-4d0c-a47c-1fb9e3e534ce?wsid=/subscriptions/f3b5840b-706e-44ba-8aa1-6fd3fc8aaab0/resourcegroups/ds-workspace/workspaces/minion-lab&tid=e6777dcd-6f87-4dd0-92e5-e98312157dac'

Data Preprocessing

step_name = 'prep_data_step'
step = script_run.find_step_run(step_name)[0]
step.get_portal_url()
'https://ml.azure.com/runs/fff72717-d728-4ebb-8065-c3fdcaf22876?wsid=/subscriptions/f3b5840b-706e-44ba-8aa1-6fd3fc8aaab0/resourcegroups/ds-workspace/workspaces/minion-lab&tid=e6777dcd-6f87-4dd0-92e5-e98312157dac'

What I find most useful is displaying the images logged while a step was running.

imgs = [f for f in step.get_file_names() if f.endswith('.png')]
for img in imgs:
    step.download_file(img)
    display(img)
    display(Image.open(img))
    Path(img).unlink()
'feature_distribution_check_1617743006.png'

We can see above that volume and ema have wide tails / outliers to review and potentially deal with.
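The summary statistics printed by df.describe() earlier give a rough sense of how far those extremes sit from the bulk of the data. Below is a minimal sketch using the values copied from that output. These are whole-dataset statistics rather than the training-split statistics used in data-prep.py, so treat the scores as approximate.

```python
# Summary values copied from the df.describe() output earlier in the notebook.
stats = {
    "volume": {"mean": 4.910146e06, "std": 9.894917e07, "extreme": 2.156860e09},  # max
    "ema": {"mean": 26.156760, "std": 3.595103, "extreme": 0.0},  # min
}


def z_score(value, mean, std):
    """Distance of a value from the mean, in standard deviations."""
    return (value - mean) / std


z = {name: z_score(s["extreme"], s["mean"], s["std"]) for name, s in stats.items()}
for name, score in z.items():
    print(f"{name}: {score:.1f} standard deviations from the mean")
```

Both extremes lie many standard deviations out, which is consistent with the long tails in the violin plot.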

Model Metrics

However, you can also do the model review inside the notebook.

The first place to look is the experiment's metrics. In this example I'm logging the MSE and MAE on the validation and test datasets for each model.

step_name = 'train_step'
step = script_run.find_step_run(step_name)[0]
step.get_portal_url()
'https://ml.azure.com/runs/ddaf55c1-11fb-4a9c-8b19-987e7d61422b?wsid=/subscriptions/f3b5840b-706e-44ba-8aa1-6fd3fc8aaab0/resourcegroups/ds-workspace/workspaces/minion-lab&tid=e6777dcd-6f87-4dd0-92e5-e98312157dac'
metrics_dict = step.get_metrics()

scores_df = pd.DataFrame()
for k, v in metrics_dict.items():
    if isinstance(v, dict):
        scores_df = scores_df.append(pd.DataFrame.from_dict(v, orient='index', columns=[k]).T)
        
scores_df.T.sort_values('mae_tst')
                  mae_val   mse_val   mae_tst   mse_tst
last_baseline    0.107446  0.023849  0.216114  0.091769
dense            0.913333  0.903850  0.269590  0.126594
repeat_baseline  0.128062  0.029194  0.276632  0.119011
lstm             0.816010  0.706341  0.288581  0.131727
linear           1.056055  1.173291  0.318440  0.144961
conv             0.225789  0.099458  0.431668  0.297038
lstm_feedback    0.633682  0.755861  0.457924  0.367349
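The selection loop at the end of train.py keeps the model with the lowest test MAE. As a sketch, the same choice can be expressed with min over a dict, here using the mae_tst values copied from the table above.

```python
# Test MAE per model, copied from the mae_tst column above.
mae_tst = {
    "last_baseline": 0.216114,
    "dense": 0.269590,
    "repeat_baseline": 0.276632,
    "lstm": 0.288581,
    "linear": 0.318440,
    "conv": 0.431668,
    "lstm_feedback": 0.457924,
}

# Pick the key whose value (the MAE) is smallest.
best_model = min(mae_tst, key=mae_tst.get)
best_score = mae_tst[best_model]
print(best_model, best_score)
```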

Our baseline is performing really well here. Proof that you should always start simple...
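For reference, MultiStepLastBaseline does nothing more than repeat the last observed time step for every future step. Below is a minimal sketch of that idea without TensorFlow. The numbers are made up for illustration only, and the helper names are hypothetical.

```python
def last_value_forecast(history, out_steps):
    """Repeat the final observed value for every future step."""
    return [history[-1]] * out_steps


def mean_absolute_error(actual, predicted):
    """Average absolute difference between two equal-length sequences."""
    return sum(abs(a - p) for a, p in zip(actual, predicted)) / len(actual)


# Made-up normalised closes, for illustration only.
history = [0.10, 0.12, 0.11, 0.13]
future = [0.14, 0.12, 0.13]

forecast = last_value_forecast(history, out_steps=len(future))
print(forecast, mean_absolute_error(future, forecast))
```

A series that moves slowly from one day to the next makes this kind of forecast hard to beat, which is one plausible reading of the table above.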

imgs = [f for f in step.get_file_names() if f.endswith('.png')]
for img in imgs:
    display(img)
    step.download_file(img)
    display(Image.open(img))
    Path(img).unlink()
'close_variable_1617743085.png'
'conv_pred_1617743096.png'
'dense_pred_1617743092.png'
'last_baseline_pred_1617743086.png'
'linear_pred_1617743089.png'
'lstm_feedback_pred_1617743118.png'
'lstm_pred_1617743105.png'
'repeat_baseline_pred_1617743087.png'

Clean up

Delete Compute Cluster

This is an important step if you want to save some money 😉

print("starting compute cleanup")

for name, compute in ws.compute_targets.items():
    print(f"deleting {name} instance")
    compute.delete()
    
import time

# Poll until the workspace reports no compute targets, sleeping between checks.
while len(ws.compute_targets) != 0:
    time.sleep(10)

print("compute cleanup complete")
starting compute cleanup
deleting aml-compute instance
compute cleanup complete
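The wait loop above runs until the workspace reports no compute targets, so it would hang if a deletion never completes. Below is a minimal sketch of a polling helper with a timeout. The name wait_until and its parameters are hypothetical, and the demo uses a stand-in predicate instead of a real workspace call.

```python
import time


def wait_until(predicate, timeout_s=600.0, interval_s=10.0):
    """Poll predicate until it returns True or the timeout elapses."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(interval_s)
    return False


# Stand-in for `lambda: len(ws.compute_targets) == 0`: succeeds on the third check.
checks = iter([False, False, True])
done = wait_until(lambda: next(checks), timeout_s=5.0, interval_s=0.01)
print(done)
```

A False return value signals that the timeout was reached, at which point the cluster state can be checked in the portal.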