Machine Learning Classification
Machine Learning Classification uses models created in TensorFlow (see Upload a Model). The TensorFlow Images Processor feeds images to an already created TensorFlow Model, while the TensorFlow Processor feeds timeseries data to one.
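As an illustration of what feeding timeseries data to a model amounts to in plain TensorFlow terms, the minimal sketch below loads a saved Keras model and classifies one window of data. The model path and the (128 timesteps, 9 features) window shape are assumptions taken from the example further down, not the processor's actual configuration.

# Illustrative sketch only: load a saved Keras model and classify one
# window of timeseries data. The path and shapes are assumptions.
import numpy as np
import tensorflow as tf

model = tf.keras.models.load_model('/motionModel/')  # hypothetical saved-model path
window = np.zeros((1, 128, 9), dtype=np.float32)     # (batch, timesteps, features)
probs = model.predict(window)                        # per-class probabilities
print(probs.round(3))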
This use case is a customized version of a CNN classification model from the TensorFlow website. It trains a 1D convolutional network on the UCI Human Activity Recognition (HAR) dataset, which the script below expects to find extracted into a HARDataset/ directory.
# cnn model
from numpy import mean
from numpy import std
from numpy import dstack
from pandas import read_csv
import numpy as np
import tensorflow as tf
from tensorflow import keras
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
# load a single file as a numpy array
def load_file(filepath):
    dataframe = read_csv(filepath, header=None, sep=r'\s+')
    return dataframe.values
# load a list of files and return as a 3d numpy array
def load_group(filenames, prefix=''):
    loaded = list()
    for name in filenames:
        data = load_file(prefix + name)
        loaded.append(data)
    # stack group so that features are the 3rd dimension
    loaded = dstack(loaded)
    return loaded
# load a dataset group, such as train or test
def load_dataset_group(group, prefix=''):
    filepath = prefix + group + '/Inertial Signals/'
    # load all 9 files as a single array
    filenames = list()
    # total acceleration
    filenames += ['total_acc_x_'+group+'.txt', 'total_acc_y_'+group+'.txt', 'total_acc_z_'+group+'.txt']
    # body acceleration
    filenames += ['body_acc_x_'+group+'.txt', 'body_acc_y_'+group+'.txt', 'body_acc_z_'+group+'.txt']
    # body gyroscope
    filenames += ['body_gyro_x_'+group+'.txt', 'body_gyro_y_'+group+'.txt', 'body_gyro_z_'+group+'.txt']
    # load input data
    X = load_group(filenames, filepath)
    # load class output
    y = load_file(prefix + group + '/y_'+group+'.txt')
    return X, y
# load the dataset, returns train and test X and y elements
def load_dataset(prefix=''):
    # load all train
    trainX, trainy = load_dataset_group('train', prefix + 'HARDataset/')
    print(trainX.shape, trainy.shape)
    # load all test
    testX, testy = load_dataset_group('test', prefix + 'HARDataset/')
    print(testX.shape, testy.shape)
    # zero-offset class values
    trainy = trainy - 1
    testy = testy - 1
    # one hot encode y
    trainy = tf.keras.utils.to_categorical(trainy)
    testy = tf.keras.utils.to_categorical(testy)
    print(trainX.shape, trainy.shape, testX.shape, testy.shape)
    return trainX, trainy, testX, testy
# standardize data
def scale_data(trainX, testX, standardize):
    # remove overlap: windows overlap by 50%, so fit statistics on the
    # second half of each training window only
    cut = int(trainX.shape[1] / 2)
    longX = trainX[:, -cut:, :]
    # flatten windows
    longX = longX.reshape((longX.shape[0] * longX.shape[1], longX.shape[2]))
    # flatten train and test
    flatTrainX = trainX.reshape((trainX.shape[0] * trainX.shape[1], trainX.shape[2]))
    flatTestX = testX.reshape((testX.shape[0] * testX.shape[1], testX.shape[2]))
    # standardize
    if standardize:
        s = StandardScaler()
        # fit on training data
        s.fit(longX)
        # apply to training and test data
        longX = s.transform(longX)
        flatTrainX = s.transform(flatTrainX)
        flatTestX = s.transform(flatTestX)
    # reshape back to (samples, timesteps, features)
    flatTrainX = flatTrainX.reshape((trainX.shape))
    flatTestX = flatTestX.reshape((testX.shape))
    return flatTrainX, flatTestX
# fit and evaluate a model
def evaluate_model(trainX, trainy, testX, testy, standardize, n_filters, kernel_size):
    verbose, epochs, batch_size = 0, 10, 32
    n_timesteps, n_features, n_outputs = trainX.shape[1], trainX.shape[2], trainy.shape[1]
    # scale data
    trainX, testX = scale_data(trainX, testX, standardize)
    model = keras.Sequential()
    model.add(tf.keras.layers.Conv1D(filters=n_filters, kernel_size=kernel_size, activation='relu', input_shape=(n_timesteps, n_features)))
    model.add(tf.keras.layers.Conv1D(filters=n_filters, kernel_size=kernel_size, activation='relu'))
    model.add(tf.keras.layers.Dropout(0.5))
    model.add(tf.keras.layers.MaxPooling1D(pool_size=2))
    model.add(tf.keras.layers.Flatten())
    model.add(tf.keras.layers.Dense(100, activation='relu'))
    model.add(tf.keras.layers.Dense(n_outputs, activation='softmax'))
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    # fit network
    model.fit(trainX, trainy, epochs=epochs, batch_size=batch_size, verbose=verbose)
    # evaluate model
    _, accuracy = model.evaluate(testX, testy, batch_size=batch_size, verbose=0)
    return accuracy, model
# summarize scores
def summarize_results(scores, params):
    print(scores, params)
    # summarize mean and standard deviation
    for i in range(len(scores)):
        m, s = mean(scores[i]), std(scores[i])
        print('Param=%s: %.3f%% (+/-%.3f)' % (params[i], m, s))
    # boxplot of scores
    # plt.boxplot(scores, labels=params)
    # plt.savefig('exp_cnn_standardize.png')
# run an experiment
def run_experiment(params, repeats=1):
    # load data
    trainX, trainy, testX, testy = load_dataset()
    # test each parameter
    all_scores = list()
    for p in params:
        # repeat experiment
        scores = list()
        for r in range(repeats):
            score, model = evaluate_model(trainX, trainy, testX, testy, p, n_filters=64, kernel_size=3)
            score = score * 100.0
            model.summary()
            # if p:
            #     model.save("/motionModel/")
            # compare predictions on the test set against the true labels
            yy = model.predict(testX)
            print(np.round(yy, 3))
            print(testy)
            print('>p=%s #%d: %.3f' % (p, r+1, score))
            scores.append(score)
        all_scores.append(scores)
    # summarize results
    summarize_results(all_scores, params)
# run the experiment
n_params = [False, True]
run_experiment(n_params)
# plt.show()
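To persist the trained network so it can be uploaded (see Upload a Model), the commented model.save call inside run_experiment can be enabled. A minimal sketch, assuming the same /motionModel/ path used in the commented call above:

# inside run_experiment, after evaluate_model returns (sketch only; the
# /motionModel/ path mirrors the commented call above and is an example):
if p:
    model.save("/motionModel/")  # TensorFlow 2.x Keras writes a SavedModel directory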