GradientDescent:Example
A Python example of linear regression trained with gradient descent.
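The script animates gradient descent on a two-weight linear model. For reference, the cost and the per-weight update rules the code implements are the standard least-squares ones (the constant factor from differentiating the square is absorbed into the learning rate $\alpha$, i.e. LR_INIT):

$$\hat{y} = w_1 x + w_2, \qquad E(w_1, w_2) = \frac{1}{n} \sum_{i=1}^{n} \left( w_1 x_i + w_2 - y_i \right)^2$$

$$w_1 \leftarrow w_1 - \frac{\alpha}{n} \sum_{i=1}^{n} \left( w_1 x_i + w_2 - y_i \right) x_i, \qquad w_2 \leftarrow w_2 - \frac{\alpha}{n} \sum_{i=1}^{n} \left( w_1 x_i + w_2 - y_i \right)$$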
Code
#!/usr/bin/env python3
import sys
import optparse
import math
import numpy as np
import matplotlib.pyplot as plt
W1_INIT = 0.1
W2_INIT = 1.0
LR_INIT = 0.5
W1_MIN = -0.1
W1_MAX = 1.5
W2_MIN = -0.5
W2_MAX = 0.5
ELEM_COUNT = 100
TIMER_MILLISEC = 1000
class Data:
"""
    An (x, y) pair of numpy arrays.
"""
DEFAULT_ELEM_COUNT = ELEM_COUNT
DATA_MIN = 0.0
DATA_MAX = 1.0
def __init__(self):
self.x = np.array([])
self.y = np.array([])
def random_linear0(self, elem_count: int=DEFAULT_ELEM_COUNT):
        self.x = np.arange(elem_count, dtype=float)  # np.float was removed from NumPy
self.y = 0.4 * self.x + 3.0 + np.random.uniform(-10, 10, size=(elem_count,))
self.x /= np.max(self.x)
self.y /= np.max(self.y)
def random_linear1(self, elem_count: int=DEFAULT_ELEM_COUNT):
self.x = np.linspace(1.0, 10.0, elem_count)[:, np.newaxis]
self.y = np.sin(self.x) + 0.1 * np.power(self.x, 2) + 0.5 * np.random.randn(elem_count, 1)
self.x /= np.max(self.x)
self.y /= np.max(self.y)
def random_linear(self, elem_count: int=DEFAULT_ELEM_COUNT, i: int=0):
if i == 0:
self.random_linear0(elem_count)
elif i == 1:
self.random_linear1(elem_count)
def calc_predicated(a, x, b):
return a * x + b
def calc_error(a, x, b, y):
"""
    Error value of the predicted result.
"""
return y - calc_predicated(a, x, b)
def calc_cost_w1(w1, w2, x: np.ndarray, y: np.ndarray):
    """
    Mean squared error of the model w1 * x + w2 over all samples.
    """
    s = 0
    elem_size = len(x)
    for i in range(elem_size):
        s += (w1 * x[i] + w2 - y[i]) ** 2
    return s / elem_size
def w1_points(w1, w2, data: Data, w_min: float=W1_MIN, w_max: float=W1_MAX, elem_count=ELEM_COUNT):
w_range = np.arange(w_min, w_max, math.fabs(w_max - w_min) / float(elem_count))
return w_range, np.array([calc_cost_w1(w, w2, data.x, data.y) for w in w_range])
def w2_points(w1, w2, data: Data, w_min: float=W2_MIN, w_max: float=W2_MAX, elem_count=ELEM_COUNT):
w_range = np.arange(w_min, w_max, math.fabs(w_max - w_min) / float(elem_count))
return w_range, np.array([calc_cost_w1(w1, w, data.x, data.y) for w in w_range])
# def w1_points_all(w2, data: Data, w_min: int=-1, w_max: int=1, elem_count=100):
# w_range = np.arange(w_min, w_max, math.fabs(w_max - w_min) / float(elem_count))
# return w_range, np.array([calc_cost(w, w2, data.x, data.y) for w in w_range])
#
#
# def w2_points_all(w1, data: Data, w_min: int=-1, w_max: int=1, elem_count=100):
# w_range = np.arange(w_min, w_max, math.fabs(w_max - w_min) / float(elem_count))
# return w_range, np.array([calc_cost(w1, w, data.x, data.y) for w in w_range])
def gradient_descent_w1(w1, w2, x: np.ndarray, y: np.ndarray, learning_rate: float):
    """
    One descent step for the slope w1. The constant 2 from differentiating
    the square is absorbed into the learning rate.
    """
    s = 0
    elem_size = len(x)
    for i in range(elem_size):
        s += (w1 * x[i] + w2 - y[i]) * x[i]
    return w1 - (learning_rate * s / elem_size)
def gradient_descent_w2(w1, w2, x: np.ndarray, y: np.ndarray, learning_rate: float):
    """
    One descent step for the intercept w2.
    """
    s = 0
    elem_size = len(x)
    for i in range(elem_size):
        s += (w1 * x[i] + w2 - y[i])
    return w2 - (learning_rate * s / elem_size)
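# Vectorized equivalent of the two update rules above (a sketch; numpy
# broadcasting replaces the explicit loops):
#     residual = w1 * x + w2 - y
#     w1_new = w1 - learning_rate * np.mean(residual * x)
#     w2_new = w2 - learning_rate * np.mean(residual)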
class Cost:
"""
Cost class.
"""
def __init__(self, w1=0.5, w2=0.5):
self.w1 = w1
self.w2 = w2
def __str__(self):
return 'Cost w1({}), w2({})'.format(self.w1, self.w2)
    def gradient_descent(self, data: Data, learning_rate: float):
        # Compute the w1 step first but apply it only after the w2 step has
        # also been computed from the same previous weights.
        w1 = gradient_descent_w1(self.w1, self.w2, data.x, data.y, learning_rate)
        self.w2 = gradient_descent_w2(self.w1, self.w2, data.x, data.y, learning_rate)
        self.w1 = w1
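# Usage sketch for the Cost class (names as defined above):
#     cost = Cost(W1_INIT, W2_INIT)
#     cost.gradient_descent(data, LR_INIT)  # one descent step over the data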
class Subplot:
"""
Subplot class.
"""
def __init__(self, axes: plt.Axes, **kwargs):
self.axes = axes
self.polyline = Data()
self.points = Data()
        self.plot_polyline, = self.axes.plot(self.polyline.x, self.polyline.y, kwargs.get('polyline_style', '-'))
        self.plot_points, = self.axes.plot(self.points.x, self.points.y, kwargs.get('points_style', 'o'))
        self.axes.set_title(kwargs.get('title', 'Subplot'))
def update(self):
self.plot_polyline.set_data(self.polyline.x, self.polyline.y)
self.plot_points.set_data(self.points.x, self.points.y)
self.axes.relim()
self.axes.autoscale_view()
def update_linear(self, cost: Cost):
self.polyline.x = self.points.x
self.polyline.y = np.array([calc_predicated(cost.w1, x, cost.w2) for x in self.polyline.x])
def update_w1(self, cost: Cost, data: Data):
self.polyline.x, self.polyline.y = w1_points(cost.w1, cost.w2, data)
#self.polyline.x, self.polyline.y = w1_points_all(cost.w2, data)
self.points.x = np.append(self.points.x, cost.w1)
self.points.y = np.append(self.points.y, calc_cost_w1(cost.w1, cost.w2, data.x, data.y))
#self.points.y = np.append(self.points.y, calc_cost(cost.w1, cost.w2, data.x, data.y))
    def update_w2(self, cost: Cost, data: Data):
        self.polyline.x, self.polyline.y = w2_points(cost.w1, cost.w2, data)
        #self.polyline.x, self.polyline.y = w2_points_all(cost.w1, data)
self.points.x = np.append(self.points.x, cost.w2)
self.points.y = np.append(self.points.y, calc_cost_w1(cost.w1, cost.w2, data.x, data.y))
#self.points.y = np.append(self.points.y, calc_cost(cost.w1, cost.w2, data.x, data.y))
# def calc_cost_func(w1, w2, x: np.ndarray, y: np.ndarray):
#     return (w1 * x + w2) ** 2
def calc_cost_func2(w1, w2, x: np.ndarray, y: np.ndarray):
    # Cost surface over a (w1, w2) grid; ravel makes the broadcast below
    # correct for both the 1-D and the (n, 1) shaped sample arrays.
    w1 = np.atleast_3d(np.asarray(w1))
    w2 = np.atleast_3d(np.asarray(w2))
    x, y = np.ravel(x), np.ravel(y)
    return np.average((y - (w1 * x + w2)) ** 2, axis=2) / 2
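# Shape sketch for the grid evaluation in ContourSubplot.init_first below:
#     w1: (n, 1, 1), w2: (1, n, 1), x and y raveled to (elem,)
#     broadcast residual: (n, n, elem) -> averaged over axis=2 -> (n, n)
#     result[i, j] is the cost at (w1[i], w2[j])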
class ContourSubplot:
"""
ContourSubplot class.
"""
def __init__(self, axes: plt.Axes, **kwargs):
self.axes = axes
self.points = Data()
#self.plot_points, = self.axes.plot(self.points.x, self.points.y, kwargs['points_sytle'] if 'points_sytle' in kwargs else 'o')
self.axes.set_title(kwargs['title'] if 'title' in kwargs else 'Subplot')
#self.axes.axis([-5, 5, -5, 5])
def update(self):
#self.plot_points.set_data(self.points.x, self.points.y)
#self.axes.relim()
#self.axes.autoscale_view()
pass
def init_first(self, data: Data):
n = 100
x = np.linspace(W1_MIN, W1_MAX, n)
y = np.linspace(W2_MIN, W2_MAX, n)
xx, yy = np.meshgrid(x, y)
zz = calc_cost_func2(x[:,np.newaxis,np.newaxis], y[np.newaxis,:,np.newaxis], data.x, data.y)
#zz = calc_cost_func(xx, yy, data.x, data.y)
#zz = np.array([calc_cost_w1(xx[i], yy[i], data.x, data.y) for i in range(len(x.tolist()))])
        # zz is indexed [w1, w2] while meshgrid output is [w2, w1], hence zz.T.
        self.axes.contourf(xx, yy, zz.T, alpha=.75, cmap='jet')
        self.axes.contour(xx, yy, zz.T, colors='black', linewidths=.5)
def update_gradient(self, w1_data: Data, w2_data: Data, data: Data):
self.init_first(data)
def update_w1_w2(self, cost: Cost):
self.points.x = np.append(self.points.x, cost.w1)
self.points.y = np.append(self.points.y, cost.w2)
self.axes.plot(self.points.x, self.points.y, 'o')
class Plot:
"""
Plot class.
"""
def __init__(self, w1=0.5, w2=0.5, learning_rate=0.001):
self.cost = Cost(w1, w2)
self.learning_rate = learning_rate
self.fig = plt.figure()
self.linear_subplot = Subplot(self.fig.add_subplot(2, 2, 1), title='Linear')
self.cost1_subplot = Subplot(self.fig.add_subplot(2, 2, 2), title='Cost/W1')
self.cost2_subplot = Subplot(self.fig.add_subplot(2, 2, 3), title='Cost/W2')
self.gradient_subplot = ContourSubplot(self.fig.add_subplot(2, 2, 4), title='W2/W1')
        self.timer = None
self.iter = 0
def add_count(self):
self.iter += 1
@staticmethod
def timer_cb(plot):
plot.update()
def update(self):
self.run()
self.draw()
def run(self, count=1):
for _ in range(count):
self.cost.gradient_descent(self.linear_subplot.points, self.learning_rate)
            print('Update iteration: {} -> {}'.format(self.iter, self.cost))
self.linear_subplot.update_linear(self.cost)
self.linear_subplot.update()
self.cost1_subplot.update_w1(self.cost, self.linear_subplot.points)
self.cost1_subplot.update()
self.cost2_subplot.update_w2(self.cost, self.linear_subplot.points)
self.cost2_subplot.update()
self.gradient_subplot.update_gradient(self.cost1_subplot.points, self.cost2_subplot.points, self.linear_subplot.points)
self.gradient_subplot.update_w1_w2(self.cost)
self.gradient_subplot.update()
self.add_count()
def start_timer(self, millisec=1000):
self.timer = self.fig.canvas.new_timer(interval=millisec,
callbacks=[(self.timer_cb, [self], {})])
self.timer.start()
def init_raws(self, elem_count, i):
self.linear_subplot.points.random_linear(int(elem_count), int(i))
#self.gradient_subplot.init_first(self.linear_subplot.points)
def draw(self):
self.fig.canvas.draw()
@staticmethod
def show():
plt.show()
def main(opts, args):
plot = Plot(W1_INIT, W2_INIT, LR_INIT)
plot.init_raws(opts.count, opts.random)
plot.start_timer(TIMER_MILLISEC)
plot.show()
if __name__ == '__main__':
parser = optparse.OptionParser()
parser.add_option('-r', '--random',
dest='random',
metavar='{num}',
default=0,
help='Random method number.')
parser.add_option('-c', '--count',
dest='count',
metavar='{num}',
default=100,
help='Sample element count.')
options, args = parser.parse_args(sys.argv)
main(options, args)
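The script can be run directly, e.g. `python3 example.py -r 0 -c 100` (the file name here is arbitrary); `-r` selects the random data method (0 or 1) and `-c` the sample count. For a quick numeric check without the matplotlib animation, a minimal standalone sketch of the same update loop, using the same normalized data as `random_linear0`, might look like this:

#!/usr/bin/env python3
# Standalone sketch: the same mean-squared-error descent, no plotting.
import numpy as np

def fit(x, y, w1=0.1, w2=1.0, lr=0.5, steps=100):
    # Defaults mirror W1_INIT, W2_INIT, and LR_INIT from the script above.
    for _ in range(steps):
        residual = w1 * x + w2 - y
        # Simultaneous update: both weights step from the same residual.
        w1 = w1 - lr * np.mean(residual * x)
        w2 = w2 - lr * np.mean(residual)
    return w1, w2

if __name__ == '__main__':
    # Same data generation and normalization as Data.random_linear0.
    x = np.arange(100, dtype=float)
    y = 0.4 * x + 3.0 + np.random.uniform(-10, 10, size=100)
    x, y = x / np.max(x), y / np.max(y)
    print('w1={:.4f}, w2={:.4f}'.format(*fit(x, y)))

Because the inputs are normalized to [0, 1], a learning rate of 0.5 is still small enough for the loop to converge.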