Product Features
...
Machine Learning Models
Model Types

Machine Learning Classification

1min

Machine Learning Classification uses models created in TensorFlow (see Upload a Model). The TensorFlow Images Processor feeds images to an existing TensorFlow model, and the TensorFlow Processor feeds time-series data to an existing TensorFlow model.

The example below is a customized version of a CNN classification model from the TensorFlow website.

# cnn model
from numpy import mean
from numpy import std
from numpy import dstack
from pandas import read_csv
import numpy as np
import tensorflow as tf
from tensorflow import keras
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt


def load_file(filepath):
    """Load a single whitespace-delimited, header-less file as a numpy array."""
    # sep=r'\s+' is the documented replacement for the deprecated
    # delim_whitespace=True option of pandas.read_csv.
    dataframe = read_csv(filepath, header=None, sep=r'\s+')
    return dataframe.values


def load_group(filenames, prefix=''):
    """Load a list of files and return them stacked as a 3d numpy array.

    Each file is read with ``load_file(prefix + name)``; the per-file arrays
    are stacked so that the files (features) form the 3rd dimension.
    """
    loaded = [load_file(prefix + name) for name in filenames]
    # stack group so that features are the 3rd dimension
    return dstack(loaded)


def load_dataset_group(group, prefix=''):
    """Load one dataset group, such as 'train' or 'test'.

    Reads the nine signal files found under
    ``prefix + group + '/Inertial Signals/'`` and the class labels from
    ``prefix + group + '/y_<group>.txt'``.

    Returns:
        A tuple ``(X, y)`` of the stacked input signals and the class labels.
    """
    filepath = prefix + group + '/Inertial Signals/'
    # load all 9 files as a single array
    filenames = []
    # total acceleration
    filenames += ['total_acc_x_'+group+'.txt', 'total_acc_y_'+group+'.txt', 'total_acc_z_'+group+'.txt']
    # body acceleration
    filenames += ['body_acc_x_'+group+'.txt', 'body_acc_y_'+group+'.txt', 'body_acc_z_'+group+'.txt']
    # body gyroscope
    filenames += ['body_gyro_x_'+group+'.txt', 'body_gyro_y_'+group+'.txt', 'body_gyro_z_'+group+'.txt']
    # load input data
    X = load_group(filenames, filepath)
    # load class output
    y = load_file(prefix + group + '/y_'+group+'.txt')
    return X, y


def load_dataset(prefix=''):
    """Load the dataset from ``prefix + 'HARDataset/'``.

    Prints the array shapes as they are loaded, shifts the class values to be
    zero-based and one-hot encodes them.

    Returns:
        A tuple ``(trainX, trainy, testX, testy)``.
    """
    # load all train
    trainX, trainy = load_dataset_group('train', prefix + 'HARDataset/')
    print(trainX.shape, trainy.shape)
    # load all test
    testX, testy = load_dataset_group('test', prefix + 'HARDataset/')
    print(testX.shape, testy.shape)
    # zero-offset class values
    trainy = trainy - 1
    testy = testy - 1
    # one hot encode y
    trainy = tf.keras.utils.to_categorical(trainy)
    testy = tf.keras.utils.to_categorical(testy)
    print(trainX.shape, trainy.shape, testX.shape, testy.shape)
    return trainX, trainy, testX, testy


def scale_data(trainX, testX, standardize):
    """Optionally standardize the train and test inputs.

    When ``standardize`` is truthy, a ``StandardScaler`` is fitted on the
    second half of every training window and then applied to all train and
    test samples. When it is falsy, the data is returned with unchanged values.

    Returns:
        A tuple ``(trainX, testX)`` with the same shapes as the inputs.
    """
    # remove overlap: keep only the second half of each training window
    # NOTE(review): assumes consecutive windows overlap by 50% - confirm
    # against the dataset description.
    cut = int(trainX.shape[1] / 2)
    longX = trainX[:, -cut:, :]
    # flatten windows
    longX = longX.reshape((longX.shape[0] * longX.shape[1], longX.shape[2]))
    # flatten train and test
    flatTrainX = trainX.reshape((trainX.shape[0] * trainX.shape[1], trainX.shape[2]))
    flatTestX = testX.reshape((testX.shape[0] * testX.shape[1], testX.shape[2]))
    # standardize
    if standardize:
        s = StandardScaler()
        # fit on training data
        s.fit(longX)
        # apply to training and test data
        flatTrainX = s.transform(flatTrainX)
        flatTestX = s.transform(flatTestX)
    # reshape back to the original window layout
    flatTrainX = flatTrainX.reshape((trainX.shape))
    flatTestX = flatTestX.reshape((testX.shape))
    return flatTrainX, flatTestX


def evaluate_model(trainX, trainy, testX, testy, param, n_filters, kernal_size):
    """Fit and evaluate a 1D CNN classifier.

    The inputs are first passed through ``scale_data`` with ``param`` as the
    standardize flag. The network is trained for 10 epochs with a batch size
    of 32.

    Args:
        trainX, trainy: training inputs and one-hot encoded labels.
        testX, testy: test inputs and one-hot encoded labels.
        param: whether to standardize the inputs before training.
        n_filters: number of filters in each Conv1D layer.
        kernal_size: kernel size of each Conv1D layer (the spelling is kept
            because callers pass this argument by keyword).

    Returns:
        A tuple ``(accuracy, model)`` with the test accuracy and fitted model.
    """
    verbose, epochs, batch_size = 0, 10, 32
    n_timesteps, n_features, n_outputs = trainX.shape[1], trainX.shape[2], trainy.shape[1]
    # scale data
    trainX, testX = scale_data(trainX, testX, param)
    model = keras.Sequential()
    model.add(tf.keras.layers.Conv1D(filters=n_filters, kernel_size=kernal_size, activation='relu', input_shape=(n_timesteps, n_features)))
    model.add(tf.keras.layers.Conv1D(filters=n_filters, kernel_size=kernal_size, activation='relu'))
    model.add(tf.keras.layers.Dropout(0.5))
    model.add(tf.keras.layers.MaxPooling1D(pool_size=2))
    model.add(tf.keras.layers.Flatten())
    model.add(tf.keras.layers.Dense(100, activation='relu'))
    model.add(tf.keras.layers.Dense(n_outputs, activation='softmax'))
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    # fit network
    model.fit(trainX, trainy, epochs=epochs, batch_size=batch_size, verbose=verbose)
    # evaluate model
    _, accuracy = model.evaluate(testX, testy, batch_size=batch_size, verbose=0)
    return accuracy, model


def summarize_results(scores, params):
    """Print the raw scores, then the mean and standard deviation per parameter."""
    print(scores, params)
    # summarize mean and standard deviation
    for param, param_scores in zip(params, scores):
        m, s = mean(param_scores), std(param_scores)
        print('Param=%s: %.3f%% (+/-%.3f)' % (param, m, s))
    # boxplot of scores
    # plt.boxplot(scores, labels=params)
    # plt.savefig('exp_cnn_standardize.png')


def run_experiment(params, repeats=1):
    """Train and evaluate the model once per parameter value.

    Args:
        params: values passed as the standardize flag of ``evaluate_model``.
        repeats: how many times each parameter value is trained and scored.
    """
    # load data
    trainX, trainy, testX, testy = load_dataset()
    # test each parameter
    all_scores = []
    for p in params:
        # The model is trained on the output of scale_data, so the inputs used
        # for the predictions printed next to testy must be scaled the same way.
        _, scaled_testX = scale_data(trainX, testX, p)
        # repeat experiment
        scores = []
        for r in range(repeats):
            score, model = evaluate_model(trainX, trainy, testX, testy, p, n_filters=64, kernal_size=3)
            score = score * 100.0
            model.summary()
            # Uncomment to save the model trained on standardized data:
            # if p:
            #     model.save("/motionModel/")
            predictions = model.predict(scaled_testX)
            print(np.round(predictions, 3))
            print(testy)
            print('>p=%s #%d: %.3f' % (p, r+1, score))
            scores.append(score)
        all_scores.append(scores)
    # summarize results
    summarize_results(all_scores, params)


### run the experiment
n_params = [False, True]
run_experiment(n_params)
# plt.show()