Optical flow

물체를 추적할 때 사용하는 가장 간단한 방법은 해당 블록(영역)이 다음 프레임에서 어디에 존재하는지 찾는 것이다.

calcOpticalFlowPyrLK example

import numpy as np
import cv2

# Sparse optical-flow demo: detect Shi-Tomasi corners in the first frame and
# follow them through the video with pyramidal Lucas-Kanade, drawing each
# point's trail on top of the live frame.
cap = cv2.VideoCapture('tram_20190930_135346.mp4')

# params for ShiTomasi corner detection
feature_params = dict(maxCorners=100,
                      qualityLevel=0.3,
                      minDistance=7,
                      blockSize=7)

# Parameters for lucas kanade optical flow
lk_params = dict(winSize=(15, 15),
                 maxLevel=2,
                 criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

# Create some random colors (one row per possible corner, see maxCorners)
color = np.random.randint(0, 255, (100, 3))

# Take first frame and find corners in it
ret, old_frame = cap.read()
if not ret:
    cap.release()
    raise SystemExit('Could not read the first frame from the video source')
old_gray = cv2.cvtColor(old_frame, cv2.COLOR_BGR2GRAY)
p0 = cv2.goodFeaturesToTrack(old_gray, mask=None, **feature_params)

# Create a mask image for drawing purposes; trails accumulate here
mask = np.zeros_like(old_frame)

while True:
    ret, frame = cap.read()
    if not ret:
        # End of the stream (or a read error): nothing more to process.
        break
    if p0 is None or len(p0) == 0:
        # No corners were found, or every tracked point has been lost.
        break
    frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    # calculate optical flow
    p1, st, err = cv2.calcOpticalFlowPyrLK(old_gray, frame_gray, p0, None, **lk_params)
    if p1 is None:
        break

    # Select good points (status 1 means the point was found in the new frame)
    good_new = p1[st == 1]
    good_old = p0[st == 1]

    # draw the tracks
    for i, (new, old) in enumerate(zip(good_new, good_old)):
        # Tracked positions are floats; the drawing calls need integer pixels.
        a, b = (int(v) for v in new.ravel())
        c, d = (int(v) for v in old.ravel())
        mask = cv2.line(mask, (a, b), (c, d), color[i].tolist(), 2)
        frame = cv2.circle(frame, (a, b), 5, color[i].tolist(), -1)
    img = cv2.add(frame, mask)

    cv2.imshow('frame', img)
    k = cv2.waitKey(30) & 0xff
    if k == 27:
        # ESC pressed
        break

    # Now update the previous frame and previous points
    old_gray = frame_gray.copy()
    p0 = good_new.reshape(-1, 1, 2)

cv2.destroyAllWindows()
cap.release()


calcOpticalFlowFarneback example

#!/usr/bin/env python

example to show optical flow

USAGE: opt_flow.py [<video_source>]

Keys:
 1      - toggle HSV flow visualization
 2      - toggle glitch
 ESC    - exit

# Python 2/3 compatibility
from __future__ import print_function

import numpy as np
import cv2 as cv
import time
import farneback3d

def draw_flow(img: np.ndarray, flow: np.ndarray, step=16):
    """Draw a sparse grid of flow vectors over a grayscale image.

    The flow field is sampled every ``step`` pixels (starting at ``step/2``).
    For each sample a green segment is drawn from the grid point to the grid
    point displaced by the flow, and a green dot marks the grid point itself.
    The mean of all values in ``flow`` is printed as a side effect.

    Args:
        img: Single-channel image; it is converted with ``COLOR_GRAY2BGR``
            before drawing, and the input itself is not drawn on.
        flow: Flow field indexed as ``flow[y, x]`` and yielding two
            components, used as the x and y displacement respectively.
        step: Grid spacing in pixels between sampled flow vectors.

    Returns:
        A BGR copy of ``img`` with the flow vectors drawn on it.
    """
    h, w = img.shape[:2]
    # Coordinates of the sampling grid, flattened to 1-D index arrays.
    y, x = np.mgrid[step/2:h:step, step/2:w:step].reshape(2,-1).astype(int)
    fx, fy = flow[y,x].T
    mean_flow = np.mean(flow)
    print('Mean: {}'.format(mean_flow))
    # One (start, end) point pair per sampled vector.
    lines = np.vstack([x, y, x+fx, y+fy]).T.reshape(-1, 2, 2)
    # Adding 0.5 before the integer cast rounds non-negative coordinates.
    lines = np.int32(lines + 0.5)
    vis = cv.cvtColor(img, cv.COLOR_GRAY2BGR)
    cv.polylines(vis, lines, 0, (0, 255, 0))
    for (x1, y1), (_x2, _y2) in lines:
        # Mark the origin of each vector; plain ints keep the call portable
        # across OpenCV builds that are strict about coordinate types.
        cv.circle(vis, (int(x1), int(y1)), 1, (0, 255, 0), -1)
    return vis

def draw_hsv(flow):
    """Render a two-component flow field as a colour image.

    Direction is encoded in the hue channel, saturation is fixed at 255 and
    the value channel carries four times the vector magnitude, capped at 255.
    The HSV buffer is converted to BGR before being returned.
    """
    rows, cols = flow.shape[:2]
    dx = flow[:, :, 0]
    dy = flow[:, :, 1]

    canvas = np.zeros((rows, cols, 3), np.uint8)
    # arctan2 yields [-pi, pi]; shift to [0, 2*pi] and scale to [0, 180].
    canvas[..., 0] = (np.arctan2(dy, dx) + np.pi) * (180/np.pi/2)
    canvas[..., 1] = 255
    canvas[..., 2] = np.minimum(np.sqrt(dx*dx + dy*dy) * 4, 255)

    return cv.cvtColor(canvas, cv.COLOR_HSV2BGR)

def warp_flow(img, flow):
    """Resample ``img`` along the negated flow field.

    A sampling map is built by negating ``flow`` and adding each pixel's own
    column index to component 0 and row index to component 1; ``img`` is then
    remapped through it with linear interpolation. Negation creates a new
    array, so the caller's ``flow`` is left untouched.
    """
    rows, cols = flow.shape[:2]
    sample_map = -flow
    # Turn relative offsets into absolute source coordinates.
    sample_map[:, :, 0] += np.arange(cols)
    sample_map[:, :, 1] += np.arange(rows)[:, np.newaxis]
    return cv.remap(img, sample_map, None, cv.INTER_LINEAR)

def main():
    """Run the interactive dense optical-flow (Farneback) demo.

    Frames are read from the video source, cropped to a fixed region,
    converted to grayscale, and the flow between consecutive frames is
    computed with ``cv.calcOpticalFlowFarneback``. The time taken by each flow
    computation is printed. The flow is shown in the 'flow' window; key '1'
    toggles the HSV view, key '2' toggles the glitch (accumulated warp) view,
    and ESC exits. The loop also ends when no further frame can be read.

    The video source is the first command-line argument when one is given;
    otherwise the recorded demo file below is used.
    """
    import sys

    default_source = '/home/wtram/Videos/demo/save-20191127_110845.avi'
    try:
        fn = sys.argv[1]
    except IndexError:
        fn = default_source

    # Region of interest, in pixel coordinates of the full frame.
    crop_x1 = 220
    crop_y1 = 410
    crop_x2 = 905
    crop_y2 = 715

    cam = cv.VideoCapture(fn)
    _ret, prev = cam.read()
    if not _ret:
        cam.release()
        print('Could not read a frame from', fn)
        return
    prev = prev[crop_y1:crop_y2, crop_x1:crop_x2, :]
    prevgray = cv.cvtColor(prev, cv.COLOR_BGR2GRAY)
    show_hsv = False
    show_glitch = False
    cur_glitch = prev.copy()

    # Alternative flow backend, kept for reference:
    # optflow = farneback3d.Farneback(
    #     pyr_scale=0.8,  # Scaling between multi-scale pyramid levels
    #     levels=6,  # Number of multi-scale levels
    #     num_iterations=5,  # Iterations on each multi-scale level
    #     winsize=9,  # Window size for Gaussian filtering of polynomial coefficients
    #     poly_n=5,  # Size of window for weighted least-square estimation of polynomial coefficients
    #     poly_sigma=1.2,  # Sigma for Gaussian weighting of least-square estimation of polynomial coefficients
    # )

    try:
        while True:
            _ret, img = cam.read()
            if not _ret:
                # End of the stream (or a read error): stop instead of
                # failing on the crop below.
                break
            img = img[crop_y1:crop_y2, crop_x1:crop_x2, :]
            gray = cv.cvtColor(img, cv.COLOR_BGR2GRAY)

            start_time = time.time()

            flow = cv.calcOpticalFlowFarneback(prevgray, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
            # vol0 = prev.astype(np.float32) / 255
            # vol1 = img.astype(np.float32) / 255
            # flow = optflow.calc_flow(vol0, vol1)

            end_time = time.time()
            elapsed_time = end_time - start_time
            print('calcOpticalFlowFarneback time: {:.2f}s'.format(elapsed_time))

            # The current frame becomes the reference for the next iteration
            # (prev is only needed by the commented-out farneback3d path).
            prev = img
            prevgray = gray

            # flow = 255 * flow  # Now scale by 255
            # flow = flow.astype(np.uint8)

            cv.imshow('flow', draw_flow(gray, flow))
            if show_hsv:
                cv.imshow('flow HSV', draw_hsv(flow))
            if show_glitch:
                cur_glitch = warp_flow(cur_glitch, flow)
                cv.imshow('glitch', cur_glitch)

            # Mask to the low byte so key codes compare reliably.
            ch = cv.waitKey(1) & 0xFF
            if ch == 27:
                # ESC pressed
                break
            if ch == ord('1'):
                show_hsv = not show_hsv
                print('HSV flow visualization is', ['off', 'on'][show_hsv])
            if ch == ord('2'):
                show_glitch = not show_glitch
                if show_glitch:
                    # Restart the accumulated warp from the current frame.
                    cur_glitch = img.copy()
                print('glitch is', ['off', 'on'][show_glitch])
    finally:
        cam.release()


if __name__ == '__main__':
    # Run the dense optical-flow demo, then close any OpenCV windows it opened.
    main()
    cv.destroyAllWindows()


A CUDA implementation of the Farneback optical flow algorithm for the calculation of dense volumetric flow fields.
FlowNet: Learning Optical Flow with Convolutional Networks
Optical Flow를 구하기 위해 최초로 딥러닝 접근법을 도입한 논문이다. 비록 real-world 문제에는 적용하기 어려웠지만, 뛰어난 성능을 보였고 end-to-end라는 점에서 주목받았다.
FlowNet을 연구한 팀에서 기존의 FlowNetC와 FlowNetS를 결합하고, 학습 데이터의 순서를 조정하는 등 조정을 거쳐 정확도를 높인 모델이다.
Real-world 자료에도 높은 정확도를 보이지만 모델이 복잡하여 계산 시간이 길기 때문에 real-time 적용은 어렵다.
FlowNet2의 각 부분을 하나씩 제거해 보는 등의 실험을 통해 필요 없는 부분을 제거하고 더 간단한 구조로 대체하여, 동일한 성능을 내면서도 가볍고 빠르게 optical flow를 구할 수 있는 모델이다.
기존의 딥러닝 접근 방식들은 모두 supervised였는데, 이 논문에서는 unsupervised 방식을 고안했다.
현재(2020년 9월 27일 기준) SOTA 모델로, 빠르고 가벼우면서도 최고의 성능을 낸다. 아직 읽어보지는 못했다.

