Motion detection in a video footage has always been an interesting topic among the Machine Learning enthusiasts. It has many purposes. Motion detection can be used for automatic video analysis in video surveillance systems. It can be used to capture movements on a wildlife camera. Another application is to detect number of vehicles passing by on a highway.
In this article, we will build a fully working motion detector. It will not only detect motion, but will also be able to track motion of different objects. By this process, we will learn a lot about image processing techniques in OpenCV. By the end, you will have all the codes required to build your own motion detector.
Problem Statement
We will build a Computer Vision model for motion detection. It will be able to detect moving people in a video footage. Each person detected will have a different colored bounding box. The motion’s path will be shown with the same color as that of the bounding box.
We will achieve motion detection by comparing each video frame to the previous one. The spots which are different from the last frame are detected. For the predicting motion’s path, Kalman Filter is used. The final result will look like this:
Coding Our Motion Detector with Python
Firstly, we look at and understand the code for the motion detection. Then, we write scripts for tracking the path of moving objects. We will use the scripts and draw path of the objects on the frame. So, let’s begin-
1. Reading the video from location
We’ve used cv2.VideoCapture to capture the video from a file location. You can change it to your own location.
import cv2
import imutils
import numpy as np
cap = cv2.VideoCapture('video.mp4')
2. Preparing the frames
while cap.isOpened():
# Capture frame-by-frame
ret, frame = cap.read()
if ret is None:
break
try:
frame = imutils.resize(frame, width=750)
except:
break
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
gray = cv2.GaussianBlur(gray, (21, 21), 0)
In this block of code, we capture each frame of the video and resize it. Then, we convert it to gray scale image. Now the frame’s pixels have values in the range of 0-255. Later, the frame is blurred to smooth it out a little.
3. Detecting motion in the frames
We begin with setting the first frame if it is None. If the number of frames exceed 5, we reset the delay counter and set the first frame with next frame.Motion is detected by comparing first frame with the next frame. The difference between frames is calculated using absdiff method.
if first_frame is None:
first_frame = gray
next_frame = gray
delay_counter += 1
if delay_counter > 5:
delay_counter = 0
first_frame = next_frame
frame_delta = cv2.absdiff(first_frame, next_frame)
thresh = cv2.threshold(frame_delta, 25, 255, cv2.THRESH_BINARY)[1]
thresh = cv2.dilate(thresh, None, iterations=2)
cnts, _ = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
threshold() method is used to make the pixels white whose values are greater than threshold value 25. The frames are dilated and we finally find the contours where motion is detected.
This is what it looks like-
5. Finding centroids of the objects in motion
For each contour with contour area greater than 1000, we find its centroid and store it. Simultaneously, we also store the coordinates of bounding box the centroid belongs to.
centers = []
coords = []
for c in cnts:
try:
(x, y, w, h) = cv2.boundingRect(c)
if cv2.contourArea(c) > 1000:
b = np.array([[x + w // 2], [y + h // 2]])
centers.append(np.round(b))
coords.append([x, y, x + w, y + h])
except ZeroDivisionError:
pass
We have successfully detected bounding boxes and their centroids for the motion detected in the video footage. So, the first half of our task is completed. Now, we proceed towards tracking the objects in motion.
Kalman Filter
Kalman filter algorithm estimates the states of a system given the observations or measurements. It is a useful tool for a variety of different applications including object tracking and autonomous navigation systems, economics prediction, etc.
The Kalman filter is essentially a set of mathematical equations that implement a predictor-corrector type estimator that is optimal in the sense that it minimizes the estimated error covariance when some presumed conditions are met [1].
Mathematical Equation [1]:
The Kalman filter addresses the general problem of trying to estimate the state x ∈ ℜn of a discrete-time controlled process that is governed by the linear stochastic difference equation
xk = Axk-1 + Buk + wk-1
with a measurement m y ∈ℜ that is
yk = Hxk +vk
The random variables wk and kv represent the process and measurement noise respectively. They are assumed to be independent of each other, white, and with normal probability distributions p(w) ≈ N ,0( Q) (3) p(v) ≈ N ,0( R)
The more detailed working of Kalman filter can be found in [1].
Tracking Path of Moving Object
We provide you with two python scripts named tracker.py and kalman_filter.py. The scripts contain codes to track the object with its centroid. It makes use of Kalman filter algorithm. The codes are explained within the scripts with the help of comments.
tracker.py
# Import python libraries
import numpy as np
from kalman_filter import KalmanFilter
from common import dprint
from scipy.optimize import linear_sum_assignment
class Track(object):
"""Track class for every object to be tracked
Attributes:
None
"""
def __init__(self, prediction, trackIdCount):
"""Initialize variables used by Track class
Args:
prediction: predicted centroids of object to be tracked
trackIdCount: identification of each track object
Return:
None
"""
self.track_id = trackIdCount # identification of each track object
self.KF = KalmanFilter() # KF instance to track this object
self.prediction = np.asarray(prediction) # predicted centroids (x,y)
self.skipped_frames = 0 # number of frames skipped undetected
self.trace = [] # trace path
class Tracker(object):
"""Tracker class that updates track vectors of object tracked
Attributes:
None
"""
def __init__(self, dist_thresh, max_frames_to_skip, max_trace_length,
trackIdCount):
"""Initialize variable used by Tracker class
Args:
dist_thresh: distance threshold. When exceeds the threshold,
track will be deleted and new track is created
max_frames_to_skip: maximum allowed frames to be skipped for
the track object undetected
max_trace_lenght: trace path history length
trackIdCount: identification of each track object
Return:
None
"""
self.dist_thresh = dist_thresh
self.max_frames_to_skip = max_frames_to_skip
self.max_trace_length = max_trace_length
self.tracks = []
self.trackIdCount = trackIdCount
def Update(self, detections):
"""Update tracks vector using following steps:
- Create tracks if no tracks vector found
- Calculate cost using sum of square distance
between predicted vs detected centroids
- Using Hungarian Algorithm assign the correct
detected measurements to predicted tracks
https://en.wikipedia.org/wiki/Hungarian_algorithm
- Identify tracks with no assignment, if any
- If tracks are not detected for long time, remove them
- Now look for un_assigned detects
- Start new tracks
- Update KalmanFilter state, lastResults and tracks trace
Args:
detections: detected centroids of object to be tracked
Return:
None
"""
# Create tracks if no tracks vector found
if (len(self.tracks) == 0):
for i in range(len(detections)):
track = Track(detections[i], self.trackIdCount)
self.trackIdCount += 1
self.tracks.append(track)
# Calculate cost using sum of square distance between
# predicted vs detected centroids
N = len(self.tracks)
M = len(detections)
cost = np.zeros(shape=(N, M)) # Cost matrix
for i in range(len(self.tracks)):
for j in range(len(detections)):
try:
diff = self.tracks[i].prediction - detections[j]
distance = np.sqrt(diff[0][0]*diff[0][0] +
diff[1][0]*diff[1][0])
cost[i][j] = distance
except:
pass
# Let's average the squared ERROR
cost = (0.5) * cost
# Using Hungarian Algorithm assign the correct detected measurements
# to predicted tracks
assignment = []
for _ in range(N):
assignment.append(-1)
row_ind, col_ind = linear_sum_assignment(cost)
for i in range(len(row_ind)):
assignment[row_ind[i]] = col_ind[i]
# Identify tracks with no assignment, if any
un_assigned_tracks = []
for i in range(len(assignment)):
if (assignment[i] != -1):
# check for cost distance threshold.
# If cost is very high then un_assign (delete) the track
if (cost[i][assignment[i]] > self.dist_thresh):
assignment[i] = -1
un_assigned_tracks.append(i)
pass
else:
self.tracks[i].skipped_frames += 1
# If tracks are not detected for long time, remove them
del_tracks = []
for i in range(len(self.tracks)):
if (self.tracks[i].skipped_frames > self.max_frames_to_skip):
del_tracks.append(i)
if len(del_tracks) > 0: # only when skipped frame exceeds max
for id in del_tracks:
if id < len(self.tracks):
del self.tracks[id]
del assignment[id]
else:
dprint("ERROR: id is greater than length of tracks")
# Now look for un_assigned detects
un_assigned_detects = []
for i in range(len(detections)):
if i not in assignment:
un_assigned_detects.append(i)
# Start new tracks
if(len(un_assigned_detects) != 0):
for i in range(len(un_assigned_detects)):
track = Track(detections[un_assigned_detects[i]],
self.trackIdCount)
self.trackIdCount += 1
self.tracks.append(track)
# Update KalmanFilter state, lastResults and tracks trace
for i in range(len(assignment)):
self.tracks[i].KF.predict()
if(assignment[i] != -1):
self.tracks[i].skipped_frames = 0
self.tracks[i].prediction = self.tracks[i].KF.correct(
detections[assignment[i]], 1)
else:
self.tracks[i].prediction = self.tracks[i].KF.correct(
np.array([[0], [0]]), 0)
if(len(self.tracks[i].trace) > self.max_trace_length):
for j in range(len(self.tracks[i].trace) -
self.max_trace_length):
del self.tracks[i].trace[j]
self.tracks[i].trace.append(self.tracks[i].prediction)
self.tracks[i].KF.lastResult = self.tracks[i].prediction
kalman_filter.py
import numpy as np
class KalmanFilter(object):
"""Kalman Filter class keeps track of the estimated state of
the system and the variance or uncertainty of the estimate.
Predict and Correct methods implement the functionality
Reference: https://en.wikipedia.org/wiki/Kalman_filter
Attributes: None
"""
def __init__(self):
"""Initialize variable used by Kalman Filter class
Args:
None
Return:
None
"""
self.dt = 0.005 # delta time
self.A = np.array([[1, 0], [0, 1]]) # matrix in observation equations
self.u = np.zeros((2, 1)) # previous state vector
# (x,y) tracking object center
self.b = np.array([[0], [255]]) # vector of observations
self.P = np.diag((3.0, 3.0)) # covariance matrix
self.F = np.array([[1.0, self.dt], [0.0, 1.0]]) # state transition mat
self.Q = np.eye(self.u.shape[0]) # process noise matrix
self.R = np.eye(self.b.shape[0]) # observation noise matrix
self.lastResult = np.array([[0], [255]])
def predict(self):
"""Predict state vector u and variance of uncertainty P (covariance).
where,
u: previous state vector
P: previous covariance matrix
F: state transition matrix
Q: process noise matrix
Equations:
u'_{k|k-1} = Fu'_{k-1|k-1}
P_{k|k-1} = FP_{k-1|k-1} F.T + Q
where,
F.T is F transpose
Args:
None
Return:
vector of predicted state estimate
"""
# Predicted state estimate
self.u = np.round(np.dot(self.F, self.u))
# Predicted estimate covariance
self.P = np.dot(self.F, np.dot(self.P, self.F.T)) + self.Q
self.lastResult = self.u # same last predicted result
return self.u
def correct(self, b, flag):
"""Correct or update state vector u and variance of uncertainty P (covariance).
where,
u: predicted state vector u
A: matrix in observation equations
b: vector of observations
P: predicted covariance matrix
Q: process noise matrix
R: observation noise matrix
Equations:
C = AP_{k|k-1} A.T + R
K_{k} = P_{k|k-1} A.T(C.Inv)
u'_{k|k} = u'_{k|k-1} + K_{k}(b_{k} - Au'_{k|k-1})
P_{k|k} = P_{k|k-1} - K_{k}(CK.T)
where,
A.T is A transpose
C.Inv is C inverse
Args:
b: vector of observations
flag: if "true" prediction result will be updated else detection
Return:
predicted state vector u
"""
if not flag: # update using prediction
self.b = self.lastResult
else: # update using detection
self.b = b
C = np.dot(self.A, np.dot(self.P, self.A.T)) + self.R
K = np.dot(self.P, np.dot(self.A.T, np.linalg.inv(C)))
self.u = np.round(self.u + np.dot(K, (self.b - np.dot(self.A,
self.u))))
self.P = self.P - np.dot(K, np.dot(C, K.T))
self.lastResult = self.u
return self.u
Now, in the final part of the article, we will first draw tracking lines. Then, we draw bounding boxes for each object. The color of bounding box and tracking lines will be same and unique for each object.
We begin with importing Tracker class from the tracker.py script. We write the following code before the while loop i.e. before while cap.isOpened():-
from tracker import Tracker
traces = []
tracker = Tracker(200, 30, 20, 100)
# Variables initialization
skip_frame_count = 0
track_colors = [(255, 0, 0), (0, 255, 0), (0, 0, 255), (255, 255, 0),
(0, 255, 255), (255, 0, 255), (255, 127, 255),
(127, 0, 255), (127, 0, 127)]
After adding this code, we proceed to write code within the while loop i.e. within while cap.isOpened() loop.
# If centroids are detected then track them
if len(centers) > 0:
# Track object using Kalman Filter
tracker.Update(centers)
# For identified object tracks draw tracking line
# Use various colors to indicate different track_id
colors = {}
for i in range(len(tracker.tracks)):
if len(tracker.tracks[i].trace) > 1:
for j in range(len(tracker.tracks[i].trace) - 1):
# Draw trace line
x1 = tracker.tracks[i].trace[j][0][0]
y1 = tracker.tracks[i].trace[j][1][0]
x2 = tracker.tracks[i].trace[j + 1][0][0]
y2 = tracker.tracks[i].trace[j + 1][1][0]
clr = tracker.tracks[i].track_id % 9
traces.append([x1,y1,x2,y2,clr])
for trace in traces:
frame = cv2.line(frame, (int(trace[0]), int(trace[1])), (int(trace[2]), int(trace[3])),
track_colors[trace[4]], 2)
if clr not in colors.keys():
colors[clr] = [x2, y2]
try:
c = []
for z in range(len(coords)):
minv = np.linalg.norm(np.array(centers[z]) - np.array(list(colors.values())[0]))
v = list(colors.keys())[0]
for val in range(1, len(colors.keys())):
dist = np.linalg.norm(np.array(centers[z]) - np.array(list(colors.values())[val]))
if dist < minv:
minv = dist
v = list(colors.keys())[val]
if v in c:
c.append(v + 1)
else:
c.append(v)
for z in range(len(coords)):
cv2.rectangle(frame, (coords[z][0], coords[z][1]), (coords[z][2], coords[z][3]),
track_colors[c[z]], 2)
except:
pass
The above code block successfully draws tracking paths and bounding boxes on the frame. The frame can be shown using imshow() method.
Conclusion
In this article, we’ve examined how motion detection works in a video. We also worked with image processing techniques like blurring, diluting, finding differencesbetween frames. In addition, we saw how Kalman filter can be used to track the moving objects. At last, we learned about drawing bounding boxes and lines on the image with unique colors.
References
[1]. Mohamed LAARAIEDH. “Implementation of Kalman Filter with Python Language“. IETR Labs, University of Rennes 1. https://arxiv.org/ftp/arxiv/papers/1204/1204.0375.pdf