# Source code for MLT.testrunners.single_benchmark

# pylint: disable=C0103,C0301,C0326,W0703
"""This runner implements the main benchmark for qualitative analysis based on the full training and test sets."""
import os
import warnings
import numpy as np

from MLT.implementations import Autoencoder, HBOS, IsolationForest, LSTM_2_Multiclass, RandomForest, XGBoost

from MLT.metrics import metrics
from MLT.tools import dataset_tools, result_mail, toolbelt

# supress deprecation warning. sklearn is currently built against an older numpy version.
warnings.filterwarnings(
    module='sklearn*', action='ignore',
    category=DeprecationWarning,
    message='The truth value of an empty array is ambiguous*'
)

# supress future warning, as this is in the responsibility of pyod.
warnings.filterwarnings(
    module='scipy*', action='ignore',
    category=FutureWarning,
    message='Using a non-tuple sequence for multidimensional*'
)

def run_benchmark(train_data, train_labels, test_data, test_labels, result_path, model_savepath, args):
    """Run the full benchmark.

    As this is the full benchmark, it needs a train and a test partition.
    Besides that, it is mostly similar to the kfold_runner.

    Args:
        train_data (numpy.ndarray): Training partition
        train_labels (numpy.ndarray): According labels for supervised learning
        test_data (numpy.ndarray): Test partition
        test_labels (numpy.ndarray): According labels for supervised learning
        result_path (str): Where to save the results
        model_savepath (str): Where to store the trained models
        args (argparse.Namespace): Parsed CMD arguments that contain all the switches and settings

    Returns:
        result_path (str): The path where to find the final results
    """
    # Per-model CMD switches: each is either falsy (model disabled) or a
    # sequence of hyperparameters for that model.
    withXGBoost = args.XGBoost
    withRandomForest = args.RandomForest
    withLSTM2 = args.LSTM2
    withHBOS = args.HBOS
    withAutoEnc = args.AutoEncoder
    withIForest = args.IsolationForest

    # One result list per model; each training pass appends its stats.
    xgboost_stats = []
    random_forest_stats = []
    lstm2_stats = []
    hbos_stats = []
    autoenc_stats = []
    iforest_stats = []

    # Contamination estimate for the outlier detectors (HBOS, AutoEncoder,
    # IsolationForest): fraction of non-zero (attack) labels in the test set.
    outliers_fraction = np.count_nonzero(test_labels) / len(test_labels)
    outliers_percentage = round(outliers_fraction * 100, ndigits=4)
    print("Outlier Percentage:", outliers_percentage)

    if args.anomaly:
        # Novelty-detection setup: train only on benign traffic by dropping
        # every entry labeled as attack (label == 1) from the train partition.
        print("Anomaly/Novelty mode! Dropping all attacks from training partition.")
        before = dict(zip(*np.unique(train_labels, return_counts=True)))
        print("Before:\n\tLabel Count: {}\n\tEntry Count: {}".format(before, len(train_data)))
        train_data = train_data[train_labels != 1]
        train_labels = train_labels[train_labels != 1]
        print("Entry Count After: {}".format(len(train_data)))

    if args.unsupervised:
        train_labels = None  # Pass empty train labels

    if withXGBoost:
        print("Training XGBoost")
        full_filename = os.path.join(model_savepath, "XGBoost")
        xgb_train_pass = XGBoost.train_model(
            withXGBoost[0],  # n_estimators
            withXGBoost[1],  # max_depth
            withXGBoost[2],  # learning_rate
            train_data, train_labels,
            test_data, test_labels,
            full_filename
        )
        xgboost_stats.append(xgb_train_pass)

    if withRandomForest:
        print("Training Random Forest")
        full_filename = os.path.join(model_savepath, "RandomForest")
        random_forest_pass = RandomForest.train_model(
            withRandomForest[0],  # n_estimators
            withRandomForest[1],  # max_depth
            train_data, train_labels,
            test_data, test_labels,
            full_filename
        )
        random_forest_stats.append(random_forest_pass)

    if withLSTM2:
        print("Training 2-Class LSTM")
        tensorboard_logdir = os.path.join(result_path, 'LSTM2C')
        lstm_model_savepath = os.path.join(model_savepath, 'LSTM2C')
        lstm2_pass = LSTM_2_Multiclass.train_model(
            withLSTM2[0],  # batch_size
            withLSTM2[1],  # epochs
            withLSTM2[2],  # learning_rate
            train_data, train_labels,
            test_data, test_labels,
            tensorboard_logdir,
            lstm_model_savepath
        )
        lstm2_stats.append(lstm2_pass)

    if withHBOS:
        print("Training HBOS")
        full_filename = os.path.join(model_savepath, "HBOS")
        hbos_pass = HBOS.train_model(
            withHBOS[0],  # n_bins
            withHBOS[1],  # alpha
            withHBOS[2],  # tol
            outliers_fraction,  # contamination
            train_data, train_labels,
            test_data, test_labels,
            full_filename
        )
        hbos_stats.append(hbos_pass)

    if withAutoEnc:
        print("Training AutoEncoder")
        full_filename = os.path.join(model_savepath, "AutoEncoder")
        autoenc_pass = Autoencoder.train_model(
            train_data, train_labels,
            test_data, test_labels,
            full_filename,
            batch_size=withAutoEnc[0],
            epochs=withAutoEnc[1],
            dropout_rate=withAutoEnc[2],
            contamination=outliers_fraction,
            learning_rate=withAutoEnc[3]
        )
        autoenc_stats.append(autoenc_pass)

    if withIForest:
        print("Training Isolation Forest")
        full_filename = os.path.join(model_savepath, "IsolationForest")
        iforest_pass = IsolationForest.train_model(
            train_data, train_labels,
            test_data, test_labels,
            full_filename,
            n_estimators=withIForest[0],
            contamination=outliers_fraction,
            max_features=withIForest[1],
            bootstrap=withIForest[2]
        )
        iforest_stats.append(iforest_pass)

    # Persist the metrics of every enabled model. Saving is best-effort: a
    # failure for one model must not prevent the remaining results from
    # being written, so each save is wrapped individually.
    # Tuples: (enabled switch, collected stats, save name, display name)
    save_jobs = [
        (withXGBoost, xgboost_stats, 'XGBoost', 'XGBoost'),
        (withRandomForest, random_forest_stats, 'RandomForest', 'Random Forest'),
        (withLSTM2, lstm2_stats, 'LSTM2C', 'LSTM2C'),
        (withHBOS, hbos_stats, 'HBOS', 'HBOS'),
        (withAutoEnc, autoenc_stats, 'AutoEncoder', 'AutoEncoder'),
        (withIForest, iforest_stats, 'IsolationForest', 'IsolationForest'),
    ]
    for enabled, stats, save_name, display_name in save_jobs:
        try:
            if enabled:
                metrics.calc_metrics_and_save_to_disk(stats, save_name, result_path)
        except Exception:
            print('Ran into exception while saving {} results to disk'.format(display_name))

    if args.ResultMail:
        result_mail.prepare_and_send_results(result_path, args)

    full_respath = os.path.abspath(result_path)
    print('Results have been saved to {}'.format(full_respath))

    # Return the full path for other runners to use
    return full_respath