353 lines
10 KiB
Python
353 lines
10 KiB
Python
from csv import reader
|
|
from random import seed
|
|
from random import randrange
|
|
from math import sqrt
|
|
from math import exp
|
|
from math import pi
|
|
|
|
def separator():
    """Print a blank line followed by a dashed rule to start a new output section."""
    banner = "\n* --------------------------------"
    print(banner)
|
|
|
|
def read_csv(filename):
    """Load a CSV file into a list of rows, skipping blank lines.

    Args:
        filename: path of the CSV file to read.

    Returns:
        A list with one list of string fields per non-empty row, in file order.

    Raises:
        OSError: if the file cannot be opened.
    """
    # newline='' leaves line-ending handling to the csv module, so newlines
    # embedded in quoted fields reach the caller unchanged.
    with open(filename, 'r', newline='') as file:
        return [row for row in reader(file) if row]
|
|
|
|
# convert string column to float
def str_column_to_float(dataset, column):
    """Replace the text in one column of every row with its float value, in place.

    Args:
        dataset: list of mutable rows; each row holds a string at ``column``.
        column: index of the column to convert.

    Raises:
        ValueError: if a stripped cell is not a valid float literal.
    """
    for record in dataset:
        text = record[column]
        record[column] = float(text.strip())
|
|
|
|
# convert string column to integer
def str_column_to_int(dataset, column):
    """Encode the distinct values of one column as consecutive integer ids, in place.

    Ids are handed out in order of first appearance in ``dataset``, so the
    mapping is the same on every run (iterating a ``set`` of strings is not).
    The mapping is also printed, one ``[value] => id`` line per class.

    Args:
        dataset: list of mutable rows.
        column: index of the column to encode.

    Returns:
        Dict mapping each original value to the integer id that replaced it.
    """
    lookup = dict()
    print("Class IDs:")
    # dict.fromkeys de-duplicates while keeping first-appearance order
    for i, value in enumerate(dict.fromkeys(row[column] for row in dataset)):
        lookup[value] = i
        print('[%s] => %d' % (value, i))
    for row in dataset:
        row[column] = lookup[row[column]]
    return lookup
|
|
|
|
|
|
####################
|
|
# Naive Bayes iris #
|
|
####################
|
|
|
|
# split a dataset into n folds
def cross_validation_split(dataset, n_folds):
    """Randomly partition ``dataset`` into ``n_folds`` equally sized folds.

    Rows are drawn without replacement, so no row appears in more than one
    fold. Rows left over when ``len(dataset)`` is not a multiple of
    ``n_folds`` are not assigned to any fold. ``dataset`` is not modified.

    Args:
        dataset: list of rows.
        n_folds: number of folds to build.

    Returns:
        List of ``n_folds`` lists, each holding ``len(dataset) // n_folds`` rows.

    Raises:
        ZeroDivisionError: if ``n_folds`` is 0.
    """
    dataset_split = list()
    # Draw from one shared working copy: popping from a fresh copy on every
    # draw would sample with replacement and repeat rows across folds.
    remaining = list(dataset)
    fold_size = len(dataset) // n_folds
    for _ in range(n_folds):
        fold = list()
        while len(fold) < fold_size:
            index = randrange(len(remaining))
            fold.append(remaining.pop(index))
        dataset_split.append(fold)
    return dataset_split
|
|
|
|
# calculate accuracy (in per cent)
def accuracy_metric(actual, predicted):
    """Return the percentage of positions where ``predicted`` equals ``actual``.

    Args:
        actual: sequence of true class values.
        predicted: sequence of predicted class values, at least as long as
            ``actual``; extra trailing predictions are ignored.

    Returns:
        Accuracy as a float in [0.0, 100.0]; 0.0 when ``actual`` is empty.

    Raises:
        IndexError: if ``predicted`` is shorter than ``actual``.
    """
    total = len(actual)
    if total == 0:
        # nothing to score (e.g. an empty fold): report 0% rather than divide by zero
        return 0.0
    correct = sum(1 for i, truth in enumerate(actual) if truth == predicted[i])
    return correct / float(total) * 100.0
|
|
|
|
# evaluate using a cross validation split
def evaluate_algorithm(dataset, n_folds, algorithm, *args):
    """Score ``algorithm`` on every fold of a cross validation split.

    Each fold is held out once; the algorithm is trained on the rows of all
    other folds and asked to predict the held-out rows.

    Args:
        dataset: list of rows whose last element is the class value.
        n_folds: number of folds passed to ``cross_validation_split``.
        algorithm: callable ``algorithm(train_set, test_set, *args)`` that
            returns one prediction per test row.
        *args: extra positional arguments forwarded to ``algorithm``.

    Returns:
        List with one ``accuracy_metric`` result per fold.
    """
    folds = cross_validation_split(dataset, n_folds)
    scores = list()
    for fold in folds:
        # train on the rows of every fold except the held-out one
        other_folds = list(folds)
        other_folds.remove(fold)
        train_set = [row for other in other_folds for row in other]
        # test rows are copies whose class slot is blanked: we don't know the class yet
        test_set = list()
        for row in fold:
            hidden = list(row)
            hidden[-1] = None
            test_set.append(hidden)
        predicted = algorithm(train_set, test_set, *args)
        actual = [row[-1] for row in fold]
        scores.append(accuracy_metric(actual, predicted))
    return scores
|
|
|
|
# split the dataset by class values, return a dictionary
def separate_by_class(dataset):
    """Group rows by their last element.

    Args:
        dataset: list of rows whose last element is the class value.

    Returns:
        Dict mapping each class value to the list of rows carrying it, in
        their original order.
    """
    grouped = dict()
    for vector in dataset:
        grouped.setdefault(vector[-1], list()).append(vector)
    return grouped
|
|
|
|
# mean of a list of numbers
def mean(numbers):
    """Return the arithmetic mean of ``numbers`` as a float.

    Raises:
        ZeroDivisionError: if ``numbers`` is empty.
    """
    total = sum(numbers)
    count = len(numbers)
    return total / float(count)
|
|
|
|
# calculate the standard deviation (sigma) of a list of numbers
def stdev(numbers):
    """Return the sample standard deviation (n - 1 denominator) of ``numbers``.

    Args:
        numbers: sequence of at least two numbers.

    Returns:
        The standard deviation as a float.

    Raises:
        ZeroDivisionError: if ``numbers`` has fewer than two elements.
    """
    count = len(numbers)
    # the average is computed once up front, not once per element
    average = sum(numbers) / float(count)
    variance = sum([(x - average) ** 2 for x in numbers]) / float(count - 1)
    return sqrt(variance)
|
|
|
|
# summarize - the mean, standard deviation (sigma) and count for each column in a dataset
def summarize_dataset(dataset):
    """Return ``(mean, stdev, count)`` for every column except the last one.

    The last column (the class value) is dropped before any statistic is
    computed, so it does not have to be numeric.

    Args:
        dataset: list of equally long rows whose last element is the class value.

    Returns:
        List of ``(mean, stdev, count)`` tuples, one per feature column;
        an empty list for an empty dataset.
    """
    feature_columns = list(zip(*dataset))[:-1]
    return [(mean(column), stdev(column), len(column)) for column in feature_columns]
|
|
|
|
# calculate per-column statistics of the dataset, separately for each class
def summarize_by_class(dataset):
    """Return the ``summarize_dataset`` result for each class in ``dataset``.

    Args:
        dataset: list of rows whose last element is the class value.

    Returns:
        Dict mapping each class value to the summaries of the rows in that class.
    """
    return {
        class_id: summarize_dataset(rows)
        for class_id, rows in separate_by_class(dataset).items()
    }
|
|
|
|
# Gaussian probability distribution function for x
def gaussian_probability(x, mean, stdev):
    """Return the normal density at ``x`` for the given mean and standard deviation.

    Raises:
        ZeroDivisionError: if ``stdev`` is 0.
    """
    exponent = -((x-mean)**2 / (2 * stdev**2 ))
    coefficient = 1 / (sqrt(2 * pi) * stdev)
    return coefficient * exp(exponent)
|
|
|
|
# calculate the probabilities of predicting each class for a given row
def class_probability_predictions(summaries, row):
    """Return an unnormalised score for every class given one row.

    For each class the score is its share of the training rows multiplied by
    the Gaussian density of every feature value of ``row``.

    Args:
        summaries: dict of class value -> list of ``(mean, stdev, count)``
            tuples, one per feature.
        row: sequence holding at least one value per summarised feature.

    Returns:
        Dict mapping each class value to its score.
    """
    total_rows = sum([summaries[label][0][2] for label in summaries])
    probabilities = dict()
    for class_id, class_summaries in summaries.items():
        # start from the class share of all rows (count of its first column)
        score = class_summaries[0][2]/float(total_rows)
        for i, (mu, sigma, _) in enumerate(class_summaries):
            score *= gaussian_probability(row[i], mu, sigma)
        probabilities[class_id] = score
    return probabilities
|
|
|
|
# predict the class for a given row
def predict(summaries, row):
    """Return the class with the highest score for ``row``.

    Args:
        summaries: per-class feature summaries, as passed to
            ``class_probability_predictions``.
        row: the feature values to classify.

    Returns:
        The class value whose score is largest (the first one on ties), or
        None when ``summaries`` is empty.
    """
    scores = class_probability_predictions(summaries, row)
    best_label, best_probability = None, -1
    for class_id, probability in scores.items():
        # skip candidates that do not beat the current best
        if best_label is not None and not probability > best_probability:
            continue
        best_label = class_id
        best_probability = probability
    return best_label
|
|
|
|
# Naive Bayes: fit on the training rows, then classify every test row
def naive_bayes(train, test):
    """Return one predicted class per row of ``test``.

    Args:
        train: list of rows whose last element is the class value.
        test: list of rows to classify.

    Returns:
        List of predicted class values, in the order of ``test``.
    """
    model = summarize_by_class(train)
    return [predict(model, row) for row in test]
|
|
|
|
|
|
|
|
|
|
separator()
print('-- Naive Bayes on iris dataset\n')

# run naive bayes on the example dataset; the fixed seed makes the fold split repeatable
seed(1)
filename = 'iris.txt'
dataset = read_csv(filename)

# every column but the last is converted to float
for i in range(len(dataset[0])-1):
    str_column_to_float(dataset, i)

# convert class column (the last one) to integers
str_column_to_int(dataset, len(dataset[0])-1)

# evaluate algorithm
n_folds = 10
scores = evaluate_algorithm(dataset, n_folds, naive_bayes)
print('Scores: %s' % scores)
print('Mean Accuracy: %.3f%%' % (sum(scores)/float(len(scores))))

# NOTE(review): the class column was already encoded above; this second call
# re-maps the integer ids and prints the class table again - confirm it is needed.
str_column_to_int(dataset, len(dataset[0])-1)

# fit model on the full dataset
model = summarize_by_class(dataset)

# define a new record and predict its label
row = [1.7,0.8,5.3,0.2]
label = predict(model, row)
print('Data=%s, Predicted: %s' % (row, label))
|
|
|
|
|
|
|
|
############
|
|
# kNN iris #
|
|
############
|
|
|
|
def dataset_minmax(dataset):
    """Return ``[min, max]`` for every column of ``dataset``.

    The number of columns is taken from the first row.

    Args:
        dataset: non-empty list of rows.

    Returns:
        List of ``[min, max]`` pairs, one per column.

    Raises:
        IndexError: if ``dataset`` is empty.
    """
    columns = ([row[i] for row in dataset] for i in range(len(dataset[0])))
    return [[min(values), max(values)] for values in columns]
|
|
|
|
def normalize_dataset(dataset, minmax):
    """Rescale every value of ``dataset`` to the range 0-1, in place.

    Args:
        dataset: list of mutable rows of numbers.
        minmax: list of ``[min, max]`` pairs, one per column (for example
            the result of ``dataset_minmax``).

    A column whose min equals its max carries no spread to scale; its values
    are set to 0.0 instead of dividing by zero.
    """
    for row in dataset:
        for i in range(len(row)):
            low = minmax[i][0]
            span = minmax[i][1] - low
            row[i] = (row[i] - low) / span if span != 0 else 0.0
|
|
|
|
def cross_validation_split(dataset, n_folds):
    """Randomly partition ``dataset`` into ``n_folds`` equally sized folds.

    Rows are popped at random from a working copy, so no row is used twice
    and ``dataset`` itself is left untouched. Rows left over when the size
    is not a multiple of ``n_folds`` are not assigned to any fold.

    Args:
        dataset: list of rows.
        n_folds: number of folds to build.

    Returns:
        List of ``n_folds`` lists of rows.
    """
    pool = list(dataset)
    folds = list()
    for _ in range(n_folds):
        fold_size = int(len(dataset) / n_folds)
        folds.append([pool.pop(randrange(len(pool))) for _ in range(fold_size)])
    return folds
|
|
|
|
def accuracy_metric(actual, predicted):
    """Return the percentage of positions where ``predicted`` equals ``actual``.

    Args:
        actual: sequence of true class values.
        predicted: sequence of predicted class values, at least as long as
            ``actual``; extra trailing predictions are ignored.

    Returns:
        Accuracy as a float in [0.0, 100.0]; 0.0 when ``actual`` is empty.

    Raises:
        IndexError: if ``predicted`` is shorter than ``actual``.
    """
    total = len(actual)
    if total == 0:
        # an empty fold has nothing to score: report 0% rather than divide by zero
        return 0.0
    hits = sum(1 for i, truth in enumerate(actual) if truth == predicted[i])
    return hits / float(total) * 100.0
|
|
|
|
def evaluate_knn_algorithm(dataset, algorithm, n_folds, *args):
    """Score ``algorithm`` on every fold of a cross validation split.

    Note the parameter order: ``algorithm`` comes before ``n_folds`` here.

    Args:
        dataset: list of rows whose last element is the class value.
        algorithm: callable ``algorithm(train_set, test_set, *args)`` that
            returns one prediction per test row.
        n_folds: number of folds passed to ``cross_validation_split``.
        *args: extra positional arguments forwarded to ``algorithm``.

    Returns:
        List with one ``accuracy_metric`` result per fold.
    """
    folds = cross_validation_split(dataset, n_folds)
    scores = list()
    for fold in folds:
        # everything outside the held-out fold becomes training data
        training_folds = list(folds)
        training_folds.remove(fold)
        train_set = [row for part in training_folds for row in part]
        # held-out rows are copied and their class slot is blanked
        test_set = list()
        for row in fold:
            masked = list(row)
            masked[-1] = None
            test_set.append(masked)
        predicted = algorithm(train_set, test_set, *args)
        expected = [row[-1] for row in fold]
        scores.append(accuracy_metric(expected, predicted))
    return scores
|
|
|
|
def euclidean_distance(row1, row2):
    """Return the Euclidean distance between two rows, ignoring the last slot of ``row1``.

    Only the first ``len(row1) - 1`` positions are compared; the last
    element of ``row1`` is never read.

    Args:
        row1: sequence whose last element is skipped.
        row2: sequence with at least ``len(row1) - 1`` numeric elements.

    Returns:
        The distance as a float.
    """
    squared_sum = 0.0
    feature_count = len(row1)-1
    for idx in range(feature_count):
        delta = row1[idx] - row2[idx]
        squared_sum += delta**2
    return sqrt(squared_sum)
|
|
|
|
# find neighbours
def get_neighbours(train_set, test_row, num_neighbours):
    """Return the ``num_neighbours`` training rows closest to ``test_row``.

    Args:
        train_set: list of training rows.
        test_row: the row to compare against; passed as the first argument
            of ``euclidean_distance``.
        num_neighbours: how many rows to return.

    Returns:
        List of training rows ordered from nearest to farthest.

    Raises:
        IndexError: if ``num_neighbours`` exceeds ``len(train_set)``.
    """
    ranked = sorted(
        ((train_row, euclidean_distance(test_row, train_row)) for train_row in train_set),
        key=lambda pair: pair[1],
    )
    return [ranked[i][0] for i in range(num_neighbours)]
|
|
|
|
# try to make a prediction with neighbours
def predict_classification(train_set, test_row, num_neighbours):
    """Return the most common class among the nearest neighbours of ``test_row``.

    Args:
        train_set: list of training rows whose last element is the class value.
        test_row: the row to classify.
        num_neighbours: number of neighbours that vote.

    Returns:
        The class value with the most votes; on a tie, whichever tied class
        the set of votes yields first.
    """
    neighbours = get_neighbours(train_set, test_row, num_neighbours)
    votes = [row[-1] for row in neighbours]
    return max(set(votes), key=votes.count)
|
|
|
|
# kNN Algorithm
def k_nearest_neighbours(train_set, test, num_neighbours):
    """Return one predicted class per row of ``test``.

    Args:
        train_set: list of training rows whose last element is the class value.
        test: list of rows to classify.
        num_neighbours: number of neighbours that vote on each prediction.

    Returns:
        List of predicted class values, in the order of ``test``.
    """
    return [predict_classification(train_set, row, num_neighbours) for row in test]
|
|
|
|
|
|
separator()
print('-- kNN on iris dataset\n')

# load the iris data: feature columns to float, class column (last) to integer ids
filename = 'iris.txt'
dataset = read_csv(filename)
for i in range(len(dataset[0])-1):
    str_column_to_float(dataset, i)
# convert class column to integers
str_column_to_int(dataset, len(dataset[0])-1)

# evaluate kNN with cross validation
n_folds = 10
num_neighbours = 3
eval_scores = evaluate_knn_algorithm(dataset, k_nearest_neighbours, n_folds, num_neighbours)
print('Scores: %s' % eval_scores)
print('Mean Accuracy: %.3f%%' % (sum(eval_scores)/float(len(eval_scores))))

# predict the label of a new record
row = [2.8,0.7,1.2,0.3]
# euclidean_distance skips the last element of its first argument (the class
# slot), so the query gets a placeholder there; without it the fourth
# measurement would be left out of every distance.
label = predict_classification(dataset, row + [None], num_neighbours)
print('Data=%s, Predicted: %s' % (row, label))
|
|
|
|
|
|
separator()
print('-- Naive Bayes on weather dataset\n')
# NOTE(review): only the heading is printed; no Naive Bayes run exists for this dataset yet.

separator()
print('-- kNN on weather dataset\n')

# columns that are left out of the kNN run
dontwanna = ['Date', 'Location', 'Evaporation', 'Sunshine', 'WindGustDir', 'WindDir9am', 'WindDir3pm']

filename = 'weatherAUS.csv'

import numpy as np
import pandas as pd

# read only the header line to learn the column names
with open(filename, 'r', newline='') as f:
    header = next(reader(f))

# rm unneeded columns: load the file once, keeping every column not listed above
dataset = pd.read_csv(filename, usecols=[name for name in header if name not in dontwanna])
print(dataset.head())

# we need proper boolean values: Yes -> 1, No -> 0 (missing values stay missing).
# Assigning the mapped column back updates the frame itself; an inplace
# replace on a column selection is not guaranteed to.
yes_no = {'Yes': 1, 'No': 0}
for col in ('RainToday', 'RainTomorrow'):
    dataset[col] = dataset[col].map(yes_no)

print(dataset.head())

# work on an independent copy of the first 20 rows
ds = dataset.head(20).copy()
# get rid of empty fields (fill them up with column means)
for col in ds.columns:
    ds[col] = ds[col].fillna(np.mean(ds[col]))
print(ds.shape)
print(ds.head())

num_neighbours = 3
n_folds = 10
# The kNN helpers copy, pop and index rows as plain lists; iterating a
# DataFrame yields column labels instead, so convert to a list of row lists.
# NOTE(review): the helpers treat the last column as the class - presumably
# RainTomorrow is the last column of the file; confirm against the CSV.
weather_rows = ds.values.tolist()
eval_scores = evaluate_knn_algorithm(weather_rows, k_nearest_neighbours, n_folds, num_neighbours)
print('Scores: %s' % eval_scores)
print('Mean Accuracy: %.3f%%' % (sum(eval_scores)/float(len(eval_scores))))

# predict stuff
# NOTE(review): confirm this record has one value per kept column, in file
# order, with the class slot last (euclidean_distance ignores that last slot).
row = [4,17.5,32.3,1.0,41.0,7.0,20.0,8.0,1,3,5,7,17.8,29.7,0.0,0.2,0]
# predict the label
label = predict_classification(weather_rows, row, num_neighbours)
print('Data=%s, Predicted: %s' % (row, label))