GradientDescent:Example

A Python example using linear regression and gradient descent.

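The model is the line y = w1*x + w2, fitted by minimizing the mean squared error J(w1, w2) = (1/2n) * sum((w1*xi + w2 - yi)^2). Each timer tick takes one gradient-descent step, w1 <- w1 - lr * dJ/dw1 and w2 <- w2 - lr * dJ/dw2, where lr is the learning rate. The four subplots show the fitted line, the cost as a function of each weight, and the descent path over the (w1, w2) cost contour.
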
Code

#!/usr/bin/env python3

import argparse
import math

import numpy as np
import matplotlib.pyplot as plt


W1_INIT = 0.1
W2_INIT = 1.0
LR_INIT = 0.5

W1_MIN = -0.1
W1_MAX = 1.5

W2_MIN = -0.5
W2_MAX = 0.5

ELEM_COUNT = 100

TIMER_MILLISEC = 1000


class Data:
    """
    Numpy-array that is an x, y data pair.
    """

    DEFAULT_ELEM_COUNT = ELEM_COUNT
    DATA_MIN = 0.0
    DATA_MAX = 1.0

    def __init__(self):
        self.x = np.array([])
        self.y = np.array([])

    def random_linear0(self, elem_count: int = DEFAULT_ELEM_COUNT):
        self.x = np.arange(elem_count, dtype=float)
        self.y = 0.4 * self.x + 3.0 + np.random.uniform(-10, 10, size=(elem_count,))
        self.x /= np.max(self.x)
        self.y /= np.max(self.y)

    def random_linear1(self, elem_count: int = DEFAULT_ELEM_COUNT):
        self.x = np.linspace(1.0, 10.0, elem_count)[:, np.newaxis]
        self.y = np.sin(self.x) + 0.1 * np.power(self.x, 2) + 0.5 * np.random.randn(elem_count, 1)
        self.x /= np.max(self.x)
        self.y /= np.max(self.y)

    def random_linear(self, elem_count: int = DEFAULT_ELEM_COUNT, i: int = 0):
        if i == 0:
            self.random_linear0(elem_count)
        elif i == 1:
            self.random_linear1(elem_count)


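# The hypothesis: a straight line with slope a and intercept b.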
def calc_predicted(a, x, b):
    return a * x + b


def calc_error(a, x, b, y):
    """
    Error of the predicted value against the target y.
    """
    return y - calc_predicted(a, x, b)


def calc_cost(w1, w2, x: np.ndarray, y: np.ndarray):
    """
    Mean squared error of the line (w1 * x + w2) over all samples.
    """
    s = 0
    elem_size = len(x)
    for i in range(elem_size):
        s += (w1 * x[i] + w2 - y[i]) ** 2
    return s / elem_size


def w1_points(w1, w2, data: Data, w_min: float = W1_MIN, w_max: float = W1_MAX, elem_count=ELEM_COUNT):
    w_range = np.arange(w_min, w_max, math.fabs(w_max - w_min) / float(elem_count))
    return w_range, np.array([calc_cost(w, w2, data.x, data.y) for w in w_range])


def w2_points(w1, w2, data: Data, w_min: float = W2_MIN, w_max: float = W2_MAX, elem_count=ELEM_COUNT):
    w_range = np.arange(w_min, w_max, math.fabs(w_max - w_min) / float(elem_count))
    return w_range, np.array([calc_cost(w1, w, data.x, data.y) for w in w_range])


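# Gradient-descent steps for each weight. With the cost
# J = (1/2n) * sum((w1*x + w2 - y) ** 2), the partial derivatives are
# dJ/dw1 = (1/n) * sum((w1*x + w2 - y) * x) and
# dJ/dw2 = (1/n) * sum(w1*x + w2 - y).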
def gradient_descent_w1(w1, w2, x: np.ndarray, y: np.ndarray, learning_rate: float):
    s = 0
    elem_size = len(x)
    for i in range(elem_size):
        s += (w1 * x[i] + w2 - y[i]) * x[i]
    return w1 - (learning_rate * s / elem_size)


def gradient_descent_w2(w1, w2, x: np.ndarray, y: np.ndarray, learning_rate: float):
    s = 0
    elem_size = len(x)
    for i in range(elem_size):
        s += (w1 * x[i] + w2 - y[i])
    return w2 - (learning_rate * s / elem_size)

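# A vectorized equivalent of the two update steps above (a sketch; this example
# keeps the explicit loops for readability):
#
#     error = w1 * x + w2 - y
#     w1 -= learning_rate * np.mean(error * x)
#     w2 -= learning_rate * np.mean(error)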

class Cost:
    """
    Cost class.
    """

    def __init__(self, w1=0.5, w2=0.5):
        self.w1 = w1
        self.w2 = w2

    def __str__(self):
        return 'Cost w1({}), w2({})'.format(self.w1, self.w2)

    def gradient_descent(self, data: Data, learning_rate: float):
        # Evaluate both gradients at the current (w1, w2) before assigning, so
        # the two weights are updated simultaneously.
        w1 = gradient_descent_w1(self.w1, self.w2, data.x, data.y, learning_rate)
        w2 = gradient_descent_w2(self.w1, self.w2, data.x, data.y, learning_rate)
        self.w1, self.w2 = w1, w2


class Subplot:
    """
    Subplot class.
    """

    def __init__(self, axes: plt.Axes, **kwargs):
        self.axes = axes
        self.polyline = Data()
        self.points = Data()
        self.plot_polyline, = self.axes.plot(self.polyline.x, self.polyline.y, kwargs.get('polyline_style', '-'))
        self.plot_points, = self.axes.plot(self.points.x, self.points.y, kwargs.get('points_style', 'o'))
        self.axes.set_title(kwargs.get('title', 'Subplot'))

    def update(self):
        self.plot_polyline.set_data(self.polyline.x, self.polyline.y)
        self.plot_points.set_data(self.points.x, self.points.y)
        self.axes.relim()
        self.axes.autoscale_view()

    def update_linear(self, cost: Cost):
        self.polyline.x = self.points.x
        self.polyline.y = calc_predicted(cost.w1, self.polyline.x, cost.w2)

    def update_w1(self, cost: Cost, data: Data):
        self.polyline.x, self.polyline.y = w1_points(cost.w1, cost.w2, data)
        self.points.x = np.append(self.points.x, cost.w1)
        self.points.y = np.append(self.points.y, calc_cost(cost.w1, cost.w2, data.x, data.y))

    def update_w2(self, cost: Cost, data: Data):
        self.polyline.x, self.polyline.y = w2_points(cost.w1, cost.w2, data)
        self.points.x = np.append(self.points.x, cost.w2)
        self.points.y = np.append(self.points.y, calc_cost(cost.w1, cost.w2, data.x, data.y))


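# Cost surface over a grid of (w1, w2) pairs. The weights arrive with extra
# axes (see ContourSubplot.init_first) so that broadcasting produces a 3-D
# array of per-sample squared errors, averaged here over the sample axis.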
def calc_cost_func2(w1, w2, x: np.ndarray, y: np.ndarray):
    w1 = np.atleast_3d(np.asarray(w1))
    w2 = np.atleast_3d(np.asarray(w2))
    return np.average((y - (w1 * x + w2)) ** 2, axis=2) / 2


class ContourSubplot:
    """
    ContourSubplot class.
    """

    def __init__(self, axes: plt.Axes, **kwargs):
        self.axes = axes
        self.points = Data()
        self.axes.set_title(kwargs.get('title', 'Subplot'))

    def update(self):
        pass

    def init_first(self, data: Data):
        n = 100
        x = np.linspace(W1_MIN, W1_MAX, n)
        y = np.linspace(W2_MIN, W2_MAX, n)
        xx, yy = np.meshgrid(x, y)

        # The meshgrid is indexed [row, column] = [w2, w1], so w1 must vary
        # along axis 1 and w2 along axis 0 of the cost array.
        zz = calc_cost_func2(x[np.newaxis, :, np.newaxis], y[:, np.newaxis, np.newaxis], data.x, data.y)

        self.axes.contourf(xx, yy, zz, alpha=.75, cmap='jet')
        self.axes.contour(xx, yy, zz, colors='black', linewidths=.5)

    def update_gradient(self, w1_data: Data, w2_data: Data, data: Data):
        self.init_first(data)

    def update_w1_w2(self, cost: Cost):
        self.points.x = np.append(self.points.x, cost.w1)
        self.points.y = np.append(self.points.y, cost.w2)
        self.axes.plot(self.points.x, self.points.y, 'o')


class Plot:
    """
    Plot class.
    """

    def __init__(self, w1=0.5, w2=0.5, learning_rate=0.001):
        self.cost = Cost(w1, w2)
        self.learning_rate = learning_rate
        self.fig = plt.figure()
        self.linear_subplot = Subplot(self.fig.add_subplot(2, 2, 1), title='Linear')
        self.cost1_subplot = Subplot(self.fig.add_subplot(2, 2, 2), title='Cost/W1')
        self.cost2_subplot = Subplot(self.fig.add_subplot(2, 2, 3), title='Cost/W2')
        self.gradient_subplot = ContourSubplot(self.fig.add_subplot(2, 2, 4), title='W2/W1')

        self.timer = None
        self.iter = 0

    def add_count(self):
        self.iter += 1

    @staticmethod
    def timer_cb(plot):
        plot.update()

    def update(self):
        self.run()
        self.draw()

    def run(self, count=1):
        for _ in range(count):
            self.cost.gradient_descent(self.linear_subplot.points, self.learning_rate)
            print('Update iteration: {} -> {}'.format(self.iter, self.cost))

            self.linear_subplot.update_linear(self.cost)
            self.linear_subplot.update()

            self.cost1_subplot.update_w1(self.cost, self.linear_subplot.points)
            self.cost1_subplot.update()

            self.cost2_subplot.update_w2(self.cost, self.linear_subplot.points)
            self.cost2_subplot.update()

            self.gradient_subplot.update_gradient(self.cost1_subplot.points, self.cost2_subplot.points, self.linear_subplot.points)
            self.gradient_subplot.update_w1_w2(self.cost)
            self.gradient_subplot.update()

            self.add_count()

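    # fig.canvas.new_timer() takes its callbacks as (func, args, kwargs) tuples
    # and fires every `interval` milliseconds; each tick runs one
    # gradient-descent step and redraws the figure.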
    def start_timer(self, millisec=1000):
        self.timer = self.fig.canvas.new_timer(interval=millisec,
                                               callbacks=[(self.timer_cb, [self], {})])
        self.timer.start()

    def init_raws(self, elem_count, i):
        self.linear_subplot.points.random_linear(int(elem_count), int(i))
        #self.gradient_subplot.init_first(self.linear_subplot.points)

    def draw(self):
        self.fig.canvas.draw()

    @staticmethod
    def show():
        plt.show()


def main(opts):
    plot = Plot(W1_INIT, W2_INIT, LR_INIT)
    plot.init_raws(opts.count, opts.random)
    plot.start_timer(TIMER_MILLISEC)
    plot.show()


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-r', '--random',
                        dest='random',
                        metavar='{num}',
                        type=int,
                        default=0,
                        help='Random method number.')
    parser.add_argument('-c', '--count',
                        dest='count',
                        metavar='{num}',
                        type=int,
                        default=100,
                        help='Sample element count.')
    main(parser.parse_args())

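To run the example (assuming the script is saved as gradient_descent_example.py, a hypothetical filename), pick a data generator and a sample count on the command line:

    python3 gradient_descent_example.py -r 0 -c 100

Each timer tick (every TIMER_MILLISEC milliseconds) performs one gradient-descent step and refreshes all four subplots.
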
References


  1. Visualizing_the_gradient_descent_method.pdf