孤立森林算法 python_用Python实现孤立森林


from random import sample, random, choice, randint

from math import ceil, log

from utils import run_time

class Node(object):

def __init__(self, size):

"""Node class to build tree leavesKeyword Arguments:size{int}-- Node size (default:{None})"""

# Node size

self.size = size

# Feature to split

self.split_feature = None

# Split point

self.split_point = None

# Left child node

self.left = None

# Right child node

self.right = None

class IsolationTree(object):

def __init__(self, X, n_samples, max_depth):

"""Isolation Tree classArguments:X{list}-- 2d list with int or floatn_samples{int}-- Subsample sizemax_depth{int}-- Maximum height of isolation tree"""

self.height = 0

# In case of n_samples is greater than n

n = len(X)

if n_samples > n:

n_samples = n

# Root node

self.root = Node(n_samples)

# Build isolation tree

self._build_tree(X, n_samples, max_depth)

def _get_split(self, X, idx, split_feature):

"""Randomly choose a split pointArguments:X{list}-- 2d list object with int or floatidx{list}-- 1d list object with intsplit_feature{int}-- Column index of XReturns:int -- split point"""

# The split point should be greater than min(X[feature])

unique = set(map(lambda i: X[i][split_feature], idx))

# Cannot split

if len(unique) == 1:

return None


x_min, x_max = min(unique), max(unique)

# Caution: random() -> x in the interval [0, 1).

return random() * (x_max - x_min) + x_min

def _build_tree(self, X, n_samples, max_depth):

"""The current node data space is divided into 2 sub space: less than thesplit point in the specified dimension on the left child of the current node,put greater than or equal to split point data on the current node's right child.Recursively construct new child nodes until the data cannot be splitted in thechild nodes or the child nodes have reached the max_depth.Arguments:X{list}-- 2d list object with int or floatn_samples{int}-- Subsample sizemax_depth{int}-- Maximum depth of IsolationTree"""

# Dataset shape

m = len(X[0])

n = len(X)

# Randomly selected sample points into the root node of the tree

idx = sample(range(n), n_samples)

# Depth, Node and idx

que = [[0, self.root, idx]]


while que and que[0][0] <= max_depth:

depth, nd, idx = que.pop(0)

# Stop split if X cannot be splitted

nd.split_feature = choice(range(m))

nd.split_point = self._get_split(X, idx, nd.split_feature)

if nd.split_point is None:


# Split

idx_left = []

idx_right = []

while idx:

i = idx.pop()

xi = X[i][nd.split_feature]

if xi < nd.split_point:




# Generate left and right child

nd.left = Node(len(idx_left))

nd.right = Node(len(idx_right))

# Put the left and child into the que and depth plus one

que.append([depth+1, nd.left, idx_left])

que.append([depth+1, nd.right, idx_right])

# Update the height of IsolationTree

self.height = depth

def _predict(self, xi):

"""Auxiliary function of predict.Arguments:xi{list}-- 1D list with int or floatReturns:int -- the depth of the node which the xi belongs to"""

# Search xi from the IsolationTree until xi is at an leafnode

nd = self.root

depth = 0

while nd.left and nd.right:

if xi[nd.split_feature] < nd.split_point:

nd = nd.left


nd = nd.right

depth += 1

return depth, nd.size

class IsolationForest(object):

def __init__(self):

"""IsolationForest, randomly build some IsolationTree instance,and the average score of each IsolationTreeAttributes:trees{list}-- 1d list with IsolationTree objectsajustment{float}"""

self.trees = None

self.adjustment = None # TBC

def fit(self, X, n_samples=100, max_depth=10, n_trees=256):

"""Build IsolationForest with dataset XArguments:X{list}-- 2d list with int or floatKeyword Arguments:n_samples{int}-- According to paper, set number of samples to 256 (default:{256})max_depth{int}-- Tree height limit (default:{10})n_trees{int}-- According to paper, set number of trees to 100 (default:{100})"""

self.adjustment = self._get_adjustment(n_samples)

self.trees = [IsolationTree(X, n_samples, max_depth)

for _ in range(n_trees)]

def _get_adjustment(self, node_size):

"""Calculate adjustment according to the formula in the paper.Arguments:node_size{int}-- Number of leaf nodesReturns:float -- ajustment"""

if node_size > 2:

i = node_size - 1

ret = 2 * (log(i) + 0.5772156649) - 2 * i / node_size

elif node_size == 2:

ret = 1


ret = 0

return ret

def _predict(self, xi):

"""Auxiliary function of predict.Arguments:xi{list}-- 1d list object with int or floatReturns:list -- 1d list object with float"""

# Calculate average score of xi at each tree

score = 0

n_trees = len(self.trees)

for tree in self.trees:

depth, node_size = tree._predict(xi)

score += (depth + self._get_adjustment(node_size))

score = score / n_trees

# Scale

return 2 ** -(score / self.adjustment)

def predict(self, X):

"""Get the prediction of y.Arguments:X{list}-- 2d list object with int or floatReturns:list -- 1d list object with float"""

return [self._predict(xi) for xi in X]


def main():

print("Comparing average score of X and outlier's score...")

# Generate a dataset randomly

n = 100

X = [[random() for _ in range(5)] for _ in range(n)]

# Add outliers


# Train model

clf = IsolationForest()

clf.fit(X, n_samples=500)

# Show result

print("Average score is%.2f" % (sum(clf.predict(X)) / len(X)))

print("Outlier's score is%.2f" % clf._predict(X[-1]))

if __name__ == "__main__":



