CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-imutils

A series of convenience functions to make basic image processing functions such as translation, rotation, resizing, skeletonization, displaying Matplotlib images, sorting contours, detecting edges, and much more easier with OpenCV and both Python 2.7 and Python 3.

91

1.33x
Overview
Eval results
Files

video-processing.mddocs/

Video Processing

Threaded video capture classes and utilities for efficient real-time video processing. These classes provide consistent APIs across different video sources (webcams, files, Pi cameras) and include performance monitoring capabilities.

Capabilities

Unified Video Stream Interface

The VideoStream class provides a unified interface over the available camera backends: it wraps a webcam by default and switches to the Raspberry Pi camera when `usePiCamera=True` is passed.

class VideoStream:
    def __init__(self, src=0, usePiCamera=False, resolution=(320, 240), framerate=32, **kwargs):
        """
        Create a video stream behind a single, camera-independent interface.

        Args:
            src (int): Index of the webcam to open. Defaults to 0.
            usePiCamera (bool): Pass True to capture from the Raspberry Pi
                camera. Defaults to False.
            resolution (tuple): Requested camera size as (width, height).
                Defaults to (320, 240).
            framerate (int): Frame rate to aim for. Defaults to 32.
            **kwargs: Extra parameters specific to the chosen camera.
        """

    def start(self):
        """
        Launch the thread that drives the video stream.

        Returns:
            VideoStream: This same object, so calls can be chained.
        """

    def update(self):
        """Grab the next frame from the video stream."""

    def read(self):
        """
        Return the frame currently held by the stream.

        Returns:
            np.ndarray: The current frame.
        """

    def stop(self):
        """Halt the video stream and free the resources it holds."""

Usage Example:

from imutils.video import VideoStream
import cv2
import time

# Open webcam 0. usePiCamera defaults to False; pass usePiCamera=True
# to capture from the Raspberry Pi camera instead.
vs = VideoStream(src=0).start()
time.sleep(2.0)  # Allow camera to warm up

try:
    while True:
        frame = vs.read()

        # No frame available: leave the loop rather than display None
        if frame is None:
            break

        # Process frame here
        cv2.imshow("Frame", frame)

        # Quit when the user presses 'q'
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Runs even if the loop exits through an exception (for example
    # Ctrl-C), so the stream is always stopped and the window closed
    vs.stop()
    cv2.destroyAllWindows()

Webcam Video Stream

Threaded webcam video capture using OpenCV's VideoCapture.

class WebcamVideoStream:
    def __init__(self, src=0, name="WebcamVideoStream"):
        """
        Create a webcam video stream that captures on its own thread.

        Args:
            src (int): Index of the camera to open. Defaults to 0.
            name (str): Name given to the capture thread. Defaults to
                "WebcamVideoStream".
        """

    def start(self):
        """
        Begin capturing video on the background thread.

        Returns:
            WebcamVideoStream: This same object, so calls can be chained.
        """

    def update(self):
        """Read frames continuously; intended for internal use."""

    def read(self):
        """
        Return the latest frame that was captured.

        Returns:
            np.ndarray: The current frame.
        """

    def stop(self):
        """Halt the video capture thread."""

Usage Example:

from imutils.video import WebcamVideoStream
import cv2
import time

# Start webcam stream
webcam = WebcamVideoStream(src=0).start()
time.sleep(2.0)  # Allow camera to warm up

try:
    while True:
        frame = webcam.read()

        # No frame available: leave the loop rather than pass None to
        # cv2.imshow, which cannot display it
        if frame is None:
            break

        # Show frame
        cv2.imshow("Webcam", frame)

        # Quit when the user presses 'q'
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Runs even if the loop exits through an exception, so the capture
    # thread is always stopped and the window closed
    webcam.stop()
    cv2.destroyAllWindows()

File Video Stream

Threaded video file reader with frame queuing for smooth playback.

class FileVideoStream:
    def __init__(self, path, transform=None, queue_size=128):
        """
        Create a video file reader that decodes on its own thread.

        Args:
            path (str): Location of the video file.
            transform (callable, optional): Function used to transform
                frames.
            queue_size (int): Upper bound on the number of queued frames.
                Defaults to 128.
        """

    def start(self):
        """
        Begin reading the video file on the background thread.

        Returns:
            FileVideoStream: This same object, so calls can be chained.
        """

    def update(self):
        """Read frames into the queue continuously; intended for internal use."""

    def read(self):
        """
        Take the next frame off the queue.

        Returns:
            np.ndarray: The next queued frame.
        """

    def running(self):
        """
        Report whether the stream is still running.

        Returns:
            bool: True when the stream has frames or has not been stopped.
        """

    def more(self):
        """
        Report whether the queue still holds frames.

        Returns:
            bool: True when at least one frame is available in the queue.
        """

    def stop(self):
        """Halt the video stream and wait for its thread to join."""

Usage Example:

from imutils.video import FileVideoStream
import cv2
import time

# Launch the threaded file reader, then pause for a second before reading
stream = FileVideoStream("example_video.mp4").start()
time.sleep(1.0)

quit_key = ord('q')

while True:
    # Stop once the queue reports no further frames
    if not stream.more():
        break

    frame = stream.read()
    if frame is None:
        break

    cv2.imshow("Video", frame)

    # Wait 25 ms between frames; leave the loop when 'q' is pressed
    pressed = cv2.waitKey(25) & 0xFF
    if pressed == quit_key:
        break

stream.stop()
cv2.destroyAllWindows()

Raspberry Pi Camera Stream

Threaded Raspberry Pi camera capture using the picamera library.

class PiVideoStream:
    def __init__(self, resolution=(320, 240), framerate=32, **kwargs):
        """
        Create a Raspberry Pi camera stream that captures on its own thread.

        Args:
            resolution (tuple): Requested camera size as (width, height).
                Defaults to (320, 240).
            framerate (int): Frame rate to aim for. Defaults to 32.
            **kwargs: Extra PiCamera parameters.
        """

    def start(self):
        """
        Begin capturing from the Pi camera on the background thread.

        Returns:
            PiVideoStream: This same object, so calls can be chained.
        """

    def update(self):
        """Capture frames continuously; intended for internal use."""

    def read(self):
        """
        Return the latest frame that was captured.

        Returns:
            np.ndarray: The current frame.
        """

    def stop(self):
        """Halt the camera capture and free the resources it holds."""

Usage Example:

# PiVideoStream is imported from its own submodule: imutils.video does not
# re-export it, because it depends on the Pi-only picamera library.
from imutils.video.pivideostream import PiVideoStream
import cv2
import time

# Start Pi camera stream (requires picamera library)
pi_camera = PiVideoStream(resolution=(640, 480), framerate=30).start()
time.sleep(2.0)  # Allow camera to warm up

try:
    while True:
        frame = pi_camera.read()

        # No frame available: leave the loop rather than pass None to
        # cv2.imshow, which cannot display it
        if frame is None:
            break

        cv2.imshow("Pi Camera", frame)

        # Quit when the user presses 'q'
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Runs even if the loop exits through an exception, so the camera is
    # always stopped and the window closed
    pi_camera.stop()
    cv2.destroyAllWindows()

FPS Monitoring

Frame rate monitoring utility for performance measurement.

class FPS:
    def __init__(self):
        """Set up a new FPS counter."""

    def start(self):
        """
        Begin timing.

        Returns:
            FPS: This same object, so calls can be chained.
        """

    def stop(self):
        """End timing."""

    def update(self):
        """Add one to the count of frames seen."""

    def elapsed(self):
        """
        Report how much time passed between start and stop.

        Returns:
            float: The elapsed time, in seconds.
        """

    def fps(self):
        """
        Compute the frame rate.

        Returns:
            float: The number of frames per second.
        """

Usage Example:

from imutils.video import VideoStream, FPS
import cv2
import time

# Initialize the video stream and let the camera warm up
vs = VideoStream(src=0).start()
time.sleep(2.0)

# Start the FPS timer only after the warm-up sleep; starting it earlier
# would count the 2-second pause as elapsed time and understate the FPS
fps = FPS().start()

try:
    while True:
        frame = vs.read()

        # No frame available: leave the loop rather than process None
        if frame is None:
            break

        # Process frame (add your processing here)
        processed = cv2.flip(frame, 1)

        cv2.imshow("Frame", processed)
        fps.update()  # Count this frame

        # Quit when the user presses 'q'
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Runs even if the loop exits through an exception, so the timer and
    # stream are always stopped and the window closed
    fps.stop()
    vs.stop()
    cv2.destroyAllWindows()

# Show results
print(f"Elapsed time: {fps.elapsed():.2f} seconds")
print(f"Approx. FPS: {fps.fps():.2f}")

Frame Counting Utilities

Utilities for counting frames in video files.

def count_frames(path, override=False):
    """
    Return how many frames a video file contains.

    Args:
        path (str): Location of the video file.
        override (bool): Pass True to force manual counting.
            Defaults to False.

    Returns:
        int: The total frame count.

    Note:
        OpenCV's CAP_PROP_FRAME_COUNT property is tried first. Manual
        counting is used when override=True or the property is unreliable.
    """

def count_frames_manual(video):
    """
    Count frames by stepping through the whole video.

    Args:
        video (cv2.VideoCapture): The OpenCV VideoCapture object to iterate.

    Returns:
        int: The total frame count.
    """

Usage Example:

from imutils.video import count_frames
# count_frames_manual is not exported from imutils.video itself; import it
# from the submodule that defines it so the call below resolves
from imutils.video.count_frames import count_frames_manual
import cv2

# Count frames using OpenCV property (fast)
total_frames = count_frames("example_video.mp4")
print(f"Video has {total_frames} frames")

# Force manual counting (slower but more reliable)
total_frames_manual = count_frames("example_video.mp4", override=True)
print(f"Manual count: {total_frames_manual} frames")

# Manual counting with VideoCapture object
cap = cv2.VideoCapture("example_video.mp4")
try:
    frame_count = count_frames_manual(cap)
finally:
    # Release the capture even if counting raises
    cap.release()
print(f"Frame count: {frame_count}")

Complete Video Processing Example

Here's a comprehensive example combining multiple video processing capabilities:

from imutils.video import VideoStream, FPS
import imutils
import cv2
import numpy as np
import time

def process_video_stream():
    """
    Show live webcam frames side by side with their edge maps.

    Reads frames from camera 0, resizes each to a width of 400 pixels,
    runs automatic Canny edge detection on the grayscale image, and
    displays the original and edge images stacked horizontally until the
    stream yields no frame or the user presses 'q'. The elapsed time and
    approximate FPS are printed at the end.
    """
    # Initialize video stream with higher resolution
    print("Starting video stream...")
    vs = VideoStream(src=0, resolution=(640, 480), framerate=30).start()
    time.sleep(2.0)  # Allow camera to warm up

    # Start the FPS timer only after the warm-up sleep; starting it earlier
    # would count the 2-second pause as elapsed time and understate the FPS
    fps = FPS().start()

    try:
        while True:
            # Read frame from stream
            frame = vs.read()

            if frame is None:
                print("Failed to read frame")
                break

            # Resize frame for faster processing
            frame = imutils.resize(frame, width=400)

            # Convert to grayscale for edge detection
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            edges = imutils.auto_canny(gray)

            # Stack original and edge images; the single-channel edge map
            # is converted to BGR so both arrays have matching channels
            output = np.hstack([frame, cv2.cvtColor(edges, cv2.COLOR_GRAY2BGR)])

            # Display the output
            cv2.imshow("Original vs Edges", output)

            # Update FPS counter
            fps.update()

            # Break on 'q' key press
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Cleanup runs even if the loop exits through an exception, so the
        # timer and stream are always stopped and the window closed
        fps.stop()
        vs.stop()
        cv2.destroyAllWindows()

    # Display performance info
    print(f"Elapsed time: {fps.elapsed():.2f} seconds")
    print(f"Approx. FPS: {fps.fps():.2f}")

if __name__ == "__main__":
    process_video_stream()

Install with Tessl CLI

npx tessl i tessl/pypi-imutils

docs

core-processing.md

face-analysis.md

feature-detection.md

index.md

utilities.md

video-processing.md

tile.json