Source code for botiverse.models.NN.NN

#!/usr/bin/env python
# coding: utf-8

# # <font color="cyan">Feedforward Neural Network </font> From Scratch Implementation
# In this notebook, we will implement a feedforward neural network with arbitrary structure, loss and activations. You can find a full tutorial on this notebook [here](hhttps://medium.com/towards-data-science/backpropagation-the-natural-proof-946c5abf63b1).

# In[1]:


'''
This module implements and provides an interface for an arbitrary neural network architecture.
'''
try:
    from botiverse.models.NN.utils import split_data, batchify
except:
    pass
# check if running on notebook
from tqdm import tqdm
import numpy as np
import copy 
import pickle


# ### Define the Architecture

# In[2]:


[docs]class NeuralNet(): ''' Defines the hypothesis set and learning algorithm for a neural network. ''' def __init__(self, structure, activation='sigmoid', optimizer='ADAM'): ''' Initialize the hyperparameters, weights and biases for the network. :param structure: A list of integers representing the number of neurons in each layer. For example, [2, 3, 1] is a network with 2 neurons in the input layer, 3 in the hidden layer, and 1 in the output layer. :type structure: list :param activation: The activation function to use. Can be 'sigmoid' or 'relu'. :type activation: str :param optimizer: The optimizer to use. Can be 'ADAM' or 'SGD'. :type optimizer: str ''' ### Hyperparameters self.structure = structure self.num_layers = len(structure) self.activation = activation self.optimizer = optimizer if activation == 'sigmoid': def σ(z): return 1.0/(1.0+np.exp(-z)) # activation function def σࠤ(z): return σ(z)*(1-σ(z)) # derivative of the activation function elif activation == 'relu': def σ(z): return np.maximum(0, z) def σࠤ(z): return np.greater(z, 0).astype(int) self.h = σ self.dh = σࠤ def softmax(z): return np.exp(z-np.max(z))/np.sum(np.exp(z-np.max(z)), axis=0) self.g = softmax ### Parameters # Xaiver initialization #for each layer except the first y neurons randomize a (y, 1) vector for the bias vector: self.Bₙ = [np.random.randn(l, 1) * np.sqrt(2/l) for l in structure[1:]] #for each two consecutive layers with x and y neurons respectively randomize a (y,x) matrix for the weight matrix: self.Wₙ = [np.random.randn(l, next_l) * np.sqrt(2/l) for l, next_l in zip(structure[:-1], structure[1:])] ### Loss def cross_entropy(aᴺ, y): return -np.sum(y * np.log(aᴺ)) / aᴺ.shape[1] def cross_entropyࠤ(aᴺ, y): return (aᴺ-y) self.J = cross_entropy self.ᐁJ = cross_entropyࠤ ### Misc self.optimal_epochs = None
NNClass = lambda func: setattr(NeuralNet, func.__name__, func) or func # ### Backpropagation # In[3]:
[docs]@NNClass def backprop(self, xₛ , yₛ ): ''' Compute the loss gradients მJⳆმBₙₛ, მJⳆმWₙₛ given an observation (xₛ , yₛ ) where xₛ and yₛ are column vectors. :param xₛ: The input vector of shape (d, 1). :type xₛ: numpy.ndarray :param yₛ: The output vector of shape (K, 1). :type yₛ: numpy.ndarray :return: A tuple of lists (მJⳆმBₙₛ, მJⳆმWₙₛ) where მJⳆმBₙₛ is a list of the gradients of the loss function with respect to the biases and მJⳆმWₙₛ is a list of the gradients of the loss function with respect to the weights. :rtype: tuple of lists ''' h, dh, ᐁJ, g = self.h, self.dh, self.ᐁJ, self.g მJⳆმBₙₛ = [np.zeros(b.shape) for b in self.Bₙ] მJⳆმWₙₛ = [np.zeros(W.shape) for W in self.Wₙ] # forward pass (computing z for all layers) Zₙ = [] # list to store all the z vectors, layer by layer Aₙ = [] # list to store all the a vectors layer by layer for i, (b, W) in enumerate(zip(self.Bₙ, self.Wₙ)): z = W.T @ a + b if Zₙ else W.T @ xₛ + b a = h(z) if i != self.num_layers-2 else g(z) Zₙ.append(z) Aₙ.append(a) #Zₙ and Aₙ are now ready. # backward pass (computing δ and consequently მJⳆმBₙₛ and მJⳆმWₙₛ layer by layer ) H = self.num_layers-2 for L in range(H, -1, -1): δ = dh(Zₙ[L]) * (self.Wₙ[L+1] @ δ) if L != H else ᐁJ(Aₙ[L], yₛ ) მJⳆმBₙₛ[L] = δ მJⳆმWₙₛ[L] = Aₙ[L-1] @ δ.T if L != 0 else xₛ @ δ.T return (მJⳆმBₙₛ, მJⳆმWₙₛ)
# ### Gradient Descent # In[4]:
[docs]@NNClass def SGD(self, x_batch, y_batch, λ, α=0.01): ''' Given a minibatch (a list/numpy array of tuples (xₛ, yₛ )) this will update Bₙ and Wₙ by applying SGD with L2 regularization. :param x_batch: A list/numpy array of input vectors of shape (d, 1). :type x_batch: numpy.ndarray or list :param y_batch: A list/numpy array of output vectors of shape (K, 1). :type y_batch: numpy.ndarray or list :param λ: The learning rate. :type λ: float :param α: The regularization parameter. :type α: float ''' მJⳆმBₙ = [np.zeros(b.shape) for b in self.Bₙ] მJⳆმWₙ = [np.zeros(W.shape) for W in self.Wₙ] for x, y in zip(x_batch, y_batch): მJⳆმBₙₛ, მJⳆმWₙₛ = self.backprop(x, y) მJⳆმBₙ = [მJⳆმb + მJⳆმbₛ for მJⳆმb, მJⳆმbₛ in zip(მJⳆმBₙ, მJⳆმBₙₛ)] მJⳆმWₙ = [მJⳆმW + მJⳆმWₛ for მJⳆმW, მJⳆმWₛ in zip(მJⳆმWₙ, მJⳆმWₙₛ)] d = len(x_batch) self.Wₙ = [(1 - λ * α / d) * W - λ / d * მJⳆმW / np.linalg.norm(მJⳆმW) for W, მJⳆმW in zip(self.Wₙ, მJⳆმWₙ)] self.Bₙ = [(1 - λ * α / d) * b - λ / d * მJⳆმb / np.linalg.norm(მJⳆმb) for b, მJⳆმb in zip(self.Bₙ, მJⳆმBₙ)]
# In[5]:
[docs]@NNClass def ADAM(self, x_batch, y_batch, λ, α=0.01, beta1=0.9, beta2=0.999, epsilon=1e-8): ''' Given a minibatch (a list of tuples (xₛ, yₛ )) this will update Bₙ and Wₙ by applying Adam optimizer with L2 regularization. :param x_batch: A list/numpy array of input vectors of shape (d, 1). :type x_batch: numpy.ndarray or list :param y_batch: A list/numpy array of output vectors of shape (K, 1). :type y_batch: numpy.ndarray or list :param λ: The learning rate. :type λ: float :param α: The regularization parameter. :type α: float :param beta1: The exponential decay rate for the first moment estimates. :type beta1: float :param beta2: The exponential decay rate for the second-moment estimates. :type beta2: float :param epsilon: A small constant for numerical stability. :type epsilon: float ''' mB = [np.zeros(b.shape) for b in self.Bₙ] # momentum for Bₙ vB = [np.zeros(b.shape) for b in self.Bₙ] # RMSprop for Bₙ mW = [np.zeros(W.shape) for W in self.Wₙ] # momentum for Wₙ vW = [np.zeros(W.shape) for W in self.Wₙ] # RMSprop for Wₙ for x, y in zip(x_batch, y_batch): mJdB, mJdW = self.backprop(x, y) # update the momentum and RMSprop for Wₙ and Bₙ mB = [beta1 * mb + (1 - beta1) * mJdb for mb, mJdb in zip(mB, mJdB)] vB = [beta2 * vb + (1 - beta2) * (mJdb ** 2) for vb, mJdb in zip(vB, mJdB)] mW = [beta1 * mw + (1 - beta1) * mJdw for mw, mJdw in zip(mW, mJdW)] vW = [beta2 * vw + (1 - beta2) * (mJdw ** 2) for vw, mJdw in zip(vW, mJdW)] # update the parameters Wₙ and Bₙ d = len(x_batch) self.Wₙ = [(1 - α * λ / d) * W - λ * mb / (np.sqrt(vb) + epsilon) for W, mb, vb in zip(self.Wₙ, mW, vW)] self.Bₙ = [(1 - α * λ / d) * b - λ * mb / (np.sqrt(vb) + epsilon) for b, mb, vb in zip(self.Bₙ, mB, vB)]
# ### Feedforward # In[6]:
[docs]@NNClass def feedforward(self, x ): h, g = self.h, self.g ''' The forward pass of the network. Given an input x this will return the output of the network. :param x: The input vector of shape (d, 1) where d is the number of input features. :type x: numpy.ndarray :return: The output vector of shape (K, 1) where K is the number of classes. :rtype: numpy.ndarray ''' a = x for i, (b, W) in enumerate(zip(self.Bₙ, self.Wₙ)): z = W.T @ a + b a = h(z) if i != self.num_layers-2 else g(z) = a return
# ### Training Method # In[7]:
[docs]@NNClass def fit(self, X, y, batch_size=32, epochs=200, λ=30, α=0.01, optimizer='ADAM', val_split=0.0, patience=50, eval_train=False): ''' For each epoch, go over each minibatch and perform a gradient descent update accordingly and evaluate the model on the training and validation sets if needed. :param X: The training data arranged as a float numpy array of shape (N, d) :type X: numpy.ndarray :param y: The training labels arranged as a numpy array of shape (N,) where each element is an integer between 0 and k-1 :type y: numpy.ndarray :param batch_size: The size of the minibatches to use. :type batch_size: int :param epochs: The number of epochs to run. :type epochs: int :param λ: The learning rate. :type λ: float :param α: The regularization parameter. :type α: float :param optimizer: The optimizer to use. Can be 'ADAM' or 'SGD'. :type optimizer: str :param val_split: The ratio of the validation set to the training set. If given, per-epoch validation will be performed. :type val_split: float :param eval_train: Whether to evaluate the model on the training set. :type eval_train: bool ''' if val_split: X, y, X_v, y_v = split_data(X, y, val_split) Xc, yc = copy.deepcopy(X), copy.deepcopy(y) X, y = batchify(X, y, batch_size) if self.optimal_epochs is not None: print(f"Using optimal epochs: {self.optimal_epochs} found from an earlier run.") epochs = self.optimal_epochs # training loop self.gradient_descent = self.SGD if optimizer == 'SGD' else self.ADAM train_acc, val_acc = '', '' pbar = tqdm(range(epochs)) best_loss = np.inf val_loss = 0 bad_epochs = 0 for epoch in pbar: # randomly shuffle X, y p = np.random.permutation(len(X)) X, y = X[p], y[p] for x_batch, y_batch in zip(X, y): self.gradient_descent(x_batch, y_batch, λ) #update the parameters after learning from the mini_batch. if eval_train: train_acc = self.evaluate(Xc, yc) # update bar desc1, desc2 = f"Train Acc: {train_acc}" if eval_train else '', f" | Val Acc: {val_acc}" if val_split else '' desc3 = f" | Val Loss: {val_loss}" if val_split else '' desc = desc1 + desc2 + desc3 pbar.set_description(desc) # early stopping if val_split: val_loss = self.evaluate(X_v, y_v, loss=True) if val_loss <= best_loss: best_loss = val_loss bad_epochs = 0 else: bad_epochs += 1 if bad_epochs == patience: print(f"{patience} epochs have passed without improvement. Early stopping... \n") self.optimal_epochs = epoch - patience + 2 self.optimal_epochs = int(self.optimal_epochs * (1 + val_split)) break val_acc = self.evaluate(X_v, y_v)
# In[8]:
[docs]@NNClass def predict(self, X): ''' Predict the class of each example in X. :param X: The test data arranged as a float numpy array of shape (N, d) :type X: numpy.ndarray ''' if len(X.shape) == 2: X = X[..., np.newaxis] # return the predicted class and the probability of the predicted class return np.array([np.argmax(self.feedforward(x)) for x in X]), np.array([np.max(self.feedforward(x)) for x in X])
[docs]@NNClass def evaluate(self, X,y, loss=False): ''' Compare the one-hot vector y with the networks output yᴺ and calculate the accuracy. :param X: The test data arranged as a float numpy array of shape (N, d) :type X: numpy.ndarray :param y: The test labels arranged as a numpy array of shape (N,) where each element is an integer between 0 and k-1 :type y: numpy.ndarray ''' if len(X.shape) == 2: X = X[..., np.newaxis] if loss: return np.sum([self.J(self.feedforward(x), y) for x, y in zip(X, y)]) / len(X) else: validation_results = [(np.argmax(self.feedforward(x)), y) for (x, y) in zip(X, y)] #the index is the number itself accuracy = sum(int( == y) for (, y) in validation_results) / len(X) return round(accuracy, 2)
# #### Saving and Loading Model # In[9]:
[docs]@NNClass def save(self, path): ''' Save the model to the given path. :param path: The path to save the model to. :type path: str ''' # store self.Bₙ, self.Wₙ, self.structure, self.activation, self.optimizer with open(path, 'wb') as f: # save self.Bₙ, self.Wₙ, self.structure, self.activation, self.optimizer class_dict = {key: value for key, value in self.__dict__.items() if key in ['Bn', 'Wn', 'structure', 'activation', 'optimizer']} f.write(pickle.dumps(class_dict))
@staticmethod @NNClass def load(path): ''' Load the model from the given path. :param path: The path to load the model from. :type path: str ''' with open(path, 'rb') as f: class_dict = pickle.loads(f.read()) model = NeuralNet(class_dict['structure'], class_dict['activation'], class_dict['optimizer']) # now use setattr to set Bₙ and Wₙ for key, value in class_dict.items(): if key in ['Bn', 'Wn']: setattr(model, key, value) return model # In[10]: # if running from notebook if __name__ == '__main__': get_ipython().system('jupyter nbconvert --to script NN.ipynb')