# Source code for allensdk.internal.brain_observatory.frame_stream

import subprocess as sp
import numpy as np
import logging
import sys, os
from collections import deque
import scipy.misc
import traceback
import signal

class FrameInputStream(object):
    """Base class for iterating over the frames of a movie.

    Subclasses supply the raw frames by overriding ``_read_iter`` (and
    usually ``open`` / ``close``).  Iterating over the stream opens it, runs
    every raw frame through ``process_frame_cb`` and yields the processed
    frames, each exactly once and in read order.

    Parameters
    ----------
    movie_path
        Location of the movie; stored for use by subclasses.
    num_frames
        Expected number of frames, or None when unknown.  When set,
        ``close`` raises IOError if a different number was read.
    block_size
        Number of frames to accumulate before yielding them.  ``1`` yields
        every frame as soon as it is read; ``None`` yields nothing until the
        whole movie has been read and the stream closed.
    cache_frames
        When true, processed frames are kept in ``frame_cache`` and later
        iterations are served from that cache instead of re-reading.
    process_frame_cb
        Callable applied to every raw frame.  Defaults to copying
        ``frame[:, :, 0]``.
    """

    def __init__(self, movie_path, num_frames=None, block_size=1,
                 cache_frames=False, process_frame_cb=None):
        self.movie_path = movie_path
        self.num_frames = num_frames
        self.block_size = block_size
        self.cache_frames = cache_frames
        self.process_frame_cb = (process_frame_cb if process_frame_cb
                                 else lambda f: f[:, :, 0].copy())
        self.frames_read = 0
        self.frame_cache = []

    def open(self):
        """Prepare for reading by resetting the frame counter."""
        self.frames_read = 0

    def close(self):
        """Finish reading and validate the number of frames read.

        Raises
        ------
        IOError
            If ``num_frames`` was given and differs from ``frames_read``.
        """
        logging.debug("Read total frames %d", self.frames_read)

        if self.num_frames is not None and self.frames_read != self.num_frames:
            # Format the message explicitly: extra positional arguments to
            # IOError are not interpolated into the message.
            raise IOError("read incorrect number of frames: %d vs %d"
                          % (self.frames_read, self.num_frames))

    def _error(self):
        """Hook called from ``__exit__`` when the with-body raised."""
        pass

    def _process_frame(self, frame):
        """Convert one raw frame into the value yielded to the caller."""
        return self.process_frame_cb(frame)

    def _read_iter(self):
        """Return an iterable of raw frames; must be overridden."""
        raise NotImplementedError()

    def __enter__(self):
        return self

    def __iter__(self):
        """Yield processed frames, reading the movie unless a cache exists."""
        # If we're caching frames and the cache exists, replay it.
        if self.cache_frames and self.frame_cache:
            n = (self.num_frames if self.num_frames is not None
                 else len(self.frame_cache))
            for frame in self.frame_cache[:n]:
                yield frame
            return

        self.open()
        self.frame_cache = []

        # Number of frames at the tail of frame_cache that have not been
        # handed to the caller yet.  Tracking this (rather than replaying the
        # whole cache at the end) guarantees no frame is yielded twice.
        pending = 0

        for raw_frame in self._read_iter():
            self.frame_cache.append(self._process_frame(raw_frame))
            self.frames_read += 1
            pending += 1

            if (self.frames_read % 100) == 0:
                logging.debug("Read frames %d", self.frames_read)

            # block_size None means "hold everything until the end".
            if self.block_size is None or pending < self.block_size:
                continue

            block = self.frame_cache[-pending:]
            pending = 0
            if not self.cache_frames:
                self.frame_cache = []
            for frame in block:
                yield frame

        # Validate the frame count before releasing any remaining frames.
        self.close()

        tail = self.frame_cache[len(self.frame_cache) - pending:]
        if not self.cache_frames:
            self.frame_cache = []
        for frame in tail:
            yield frame

    def __exit__(self, exc_type, exc_value, tb):
        """On error, print the traceback and run the ``_error`` hook."""
        if exc_type is not None:
            traceback.print_tb(tb)
            self._error()
        # Returning False lets the original exception propagate unchanged.
        return False

    def create_images(self, output_directory, image_type):
        """Write every frame to ``output_directory`` as an image file.

        Files are named ``input_frame-NNNNNN.<image_type>`` with a
        zero-based, six-digit frame index.
        """
        # NOTE(review): scipy.misc.imsave is not available in recent SciPy
        # releases -- confirm the pinned SciPy version or migrate this call.
        for i, frame in enumerate(self):
            file_name = os.path.join(output_directory,
                                     "input_frame-%06d." % i + image_type)
            scipy.misc.imsave(file_name, frame)
class FfmpegInputStream(FrameInputStream):
    """Frame stream that decodes a movie through an ffmpeg subprocess.

    ffmpeg is asked to emit raw ``rgb24`` video on its stdout; each frame is
    read as ``prod(frame_shape)`` bytes and turned into a ``uint8`` array of
    shape ``frame_shape`` before ``process_frame_cb`` is applied.

    Parameters
    ----------
    movie_path
        Path handed to ffmpeg's ``-i`` option.
    frame_shape
        Shape each raw frame is reshaped to.  Presumably
        ``(height, width, 3)`` given the rgb24 pixel format -- TODO confirm.
    ffmpeg_bin
        Name or path of the ffmpeg executable.
    num_frames, block_size, cache_frames, process_frame_cb
        Forwarded unchanged to ``FrameInputStream``.
    """

    def __init__(self, movie_path, frame_shape, ffmpeg_bin='ffmpeg',
                 num_frames=None, block_size=1, cache_frames=False,
                 process_frame_cb=None):
        super(FfmpegInputStream, self).__init__(
            movie_path=movie_path,
            num_frames=num_frames,
            block_size=block_size,
            cache_frames=cache_frames,
            process_frame_cb=process_frame_cb)

        self.ffmpeg_bin = ffmpeg_bin
        self.frame_shape = frame_shape
        self.pipe = None

    def open(self):
        """Start the ffmpeg subprocess.

        Raises
        ------
        IOError
            If a subprocess is already attached to this stream.
        """
        super(FfmpegInputStream, self).open()

        if self.pipe:
            raise IOError("pipe is open already")

        command = [self.ffmpeg_bin,
                   '-i', self.movie_path,
                   '-f', 'image2pipe',
                   '-pix_fmt', 'rgb24',
                   '-vcodec', 'rawvideo']

        if self.num_frames is not None:
            command += ['-vframes', str(self.num_frames)]

        command += ['-']

        self.pipe = sp.Popen(command, stdout=sp.PIPE, bufsize=0)
        logging.debug("opened pipe")

    def close(self):
        """Stop the subprocess, validate the frame count and its exit code.

        The subprocess is always reaped and detached from the stream, even
        when the frame-count check or the exit-code check fails.

        Raises
        ------
        IOError
            If the stream is not open, or the frame count is wrong.
        Exception
            If ffmpeg exited with a non-zero return code.
        """
        if self.pipe is None:
            raise IOError("pipe is not open")

        if self.pipe.poll() is None:
            logging.debug("pipe is still open. terminating.")
            self.pipe.terminate()

        # Detach first so the stream can be reopened whatever happens below.
        pipe, self.pipe = self.pipe, None
        try:
            super(FfmpegInputStream, self).close()
        finally:
            if pipe.stdout is not None:
                pipe.stdout.close()
            rc = pipe.wait()
            logging.debug("closed input pipe")

        # NOTE(review): a process terminated above may also report a
        # non-zero code here; that is treated as an error, as before.
        if rc:
            raise Exception("input pipe returned with error code %d" % rc)

    def _process_frame(self, frame):
        """Decode one raw byte string into an array and apply the callback."""
        # frombuffer replaces the deprecated binary mode of np.fromstring;
        # the copy keeps the array writable, as fromstring's result was.
        frame = np.frombuffer(frame, dtype=np.uint8)
        frame = frame.reshape(self.frame_shape).copy()
        return self.process_frame_cb(frame)

    def _read_exact(self, size):
        """Read up to ``size`` bytes from the pipe, stopping early only at EOF."""
        # The pipe is opened unbuffered, so a single read() may return fewer
        # bytes than requested; keep reading until the frame is complete.
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = self.pipe.stdout.read(remaining)
            if not chunk:
                break
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def _read_iter(self):
        """Yield raw frames as byte strings until ffmpeg's output ends.

        Raises
        ------
        IOError
            If the stream is not open or the output ends mid-frame.
        """
        if self.pipe is None:
            raise IOError("pipe is not open")

        frame_size = int(np.prod(self.frame_shape))

        while True:
            input_frame = self._read_exact(frame_size)
            bytes_read = len(input_frame)

            if bytes_read == 0:
                break

            if bytes_read != frame_size:
                raise IOError("pipe read wrong number of bytes (%d vs %d)"
                              % (frame_size, bytes_read))

            yield input_frame

    def _error(self):
        """Kill and reap the subprocess after a failure in the with-body."""
        if self.pipe:
            pipe, self.pipe = self.pipe, None
            pipe.kill()
            if pipe.stdout is not None:
                pipe.stdout.close()
            pipe.wait()

    def create_images(self, output_directory, image_type):
        """Have ffmpeg write every frame of the movie as an image file.

        Files are named ``input_frame-%06d.<image_type>`` inside
        ``output_directory``; the ``%06d`` pattern is expanded by ffmpeg.

        Raises
        ------
        Exception
            If ffmpeg exits with a non-zero return code.
        """
        # Pass an argument list without a shell so paths containing spaces
        # or shell metacharacters are handled safely.
        cmd = [self.ffmpeg_bin,
               '-i', self.movie_path,
               output_directory + '/input_frame-%06d.' + image_type]
        logging.debug("Calling ffmpeg with the command:")
        logging.debug("\t" + " ".join(cmd))
        retcode = sp.call(cmd)
        if retcode != 0:
            logging.debug(retcode)
            raise Exception('Something went wrong with image creation')
class CvInputStream(FrameInputStream):
    """Frame stream that reads a movie through ``cv2.VideoCapture``.

    Parameters
    ----------
    movie_path
        Path handed to ``cv2.VideoCapture``.
    num_frames, block_size, cache_frames, process_frame_cb
        Forwarded unchanged to ``FrameInputStream``.
    """

    def __init__(self, movie_path, num_frames=None, block_size=1,
                 cache_frames=False, process_frame_cb=None):
        # This class must derive from FrameInputStream and name itself in
        # super(); referring to FfmpegInputStream here raises TypeError.
        super(CvInputStream, self).__init__(
            movie_path=movie_path,
            num_frames=num_frames,
            block_size=block_size,
            cache_frames=cache_frames,
            process_frame_cb=process_frame_cb)
        self.cap = None

    def open(self):
        """Open the video capture.

        Raises
        ------
        IOError
            If a capture is already open on this stream.
        """
        super(CvInputStream, self).open()

        if self.cap:
            raise IOError("capture is open already")

        # Imported lazily so the module can be used without OpenCV installed.
        import cv2
        self.cap = cv2.VideoCapture(self.movie_path)

        logging.debug("opened capture")

    def close(self):
        """Release the capture, then validate the number of frames read.

        Does nothing when no capture is open.
        """
        if self.cap is None:
            return

        self.cap.release()
        self.cap = None

        super(CvInputStream, self).close()

    def _read_iter(self):
        """Yield frames from the capture until it ends or ``num_frames`` is hit.

        Raises
        ------
        IOError
            If the capture is not open.
        """
        if self.cap is None:
            raise IOError("capture is not open")

        while self.cap.isOpened():
            ret, frame = self.cap.read()
            # read() reports failure (e.g. end of video) through ``ret``;
            # stop instead of yielding an invalid frame forever.
            if not ret:
                break

            yield frame

            # frames_read is advanced by the consumer between yields.
            if self.frames_read == self.num_frames:
                break

    def _error(self):
        """Release the capture after a failure in the with-body."""
        if self.cap is not None:
            self.cap.release()
            self.cap = None
class FrameOutputStream(object):
    """Base class for writing frames to a movie in blocks.

    Frames passed to ``write`` are buffered; once ``block_size`` frames have
    accumulated they are handed to ``_write_frames``, which subclasses must
    implement.  ``close`` flushes whatever is still buffered.

    Parameters
    ----------
    block_size
        Number of frames to buffer before writing.  With ``None`` frames are
        only written when the stream is closed.
    """

    def __init__(self, block_size=1):
        self.frames_processed = 0
        self.block_frames = []
        self.block_size = block_size
        # Defined up front so subclasses can read it before open() is called.
        self.movie_path = None

    def open(self, movie_path):
        """Reset the counters and remember the output location."""
        self.frames_processed = 0
        self.block_frames = []
        self.movie_path = movie_path

    def _write_frames(self, frames):
        """Write a list of frames to the output; must be overridden."""
        raise NotImplementedError()

    def _flush(self):
        """Write the buffered frames and count them as processed."""
        self._write_frames(self.block_frames)
        self.frames_processed += len(self.block_frames)
        self.block_frames = []

    def write(self, frame):
        """Buffer one frame, writing the block out once it is full."""
        self.block_frames.append(frame)

        if (self.block_size is not None
                and len(self.block_frames) >= self.block_size):
            self._flush()

    def close(self):
        """Write any frames still buffered and log the total written."""
        if self.block_frames:
            self._flush()

        logging.debug("wrote %d frames", self.frames_processed)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        """Close the stream unless the with-body raised."""
        if exc_type is None:
            self.close()
        # Returning False lets any exception propagate unchanged.
        return False
class ImageOutputStream(FrameOutputStream):
    """Output stream that saves every frame as a separate image file.

    ``movie_path`` is used as a ``%``-format pattern that receives the
    zero-based index of the frame, e.g. ``"frame-%06d.png"``.
    """

    def _write_frames(self, frames):
        """Save each frame in ``frames`` to its own image file."""
        # frames_processed is only advanced after this method returns, so it
        # is the index of the first frame in this block.  Using it as the
        # offset keeps later blocks from overwriting earlier images.
        # NOTE(review): scipy.misc.imsave is not available in recent SciPy
        # releases -- confirm the pinned SciPy version or migrate this call.
        for i, frame in enumerate(frames, self.frames_processed):
            file_name = self.movie_path % i
            scipy.misc.imsave(file_name, frame)
class FfmpegOutputStream(FrameOutputStream):
    """Output stream that encodes frames to a movie via an ffmpeg subprocess.

    Frames are written as raw ``rgb24`` bytes to ffmpeg's stdin and encoded
    with libx264 at 30 frames per second.

    Parameters
    ----------
    frame_shape
        Shape of the frames; ``frame_shape[1]`` and ``frame_shape[0]`` are
        passed to ffmpeg as the ``WxH`` frame size.
    ffmpeg_bin
        Name or path of the ffmpeg executable.
    block_size
        Forwarded unchanged to ``FrameOutputStream``.
    """

    def __init__(self, frame_shape, ffmpeg_bin='ffmpeg', block_size=1):
        super(FfmpegOutputStream, self).__init__(block_size)

        self.ffmpeg_bin = ffmpeg_bin
        self.frame_shape = frame_shape
        self.pipe = None
        self.stopped = False

    def open(self, movie_path):
        """Start ffmpeg writing to ``movie_path`` and suspend it.

        The subprocess is stopped with SIGSTOP right after launch and is
        resumed when the first block of frames is written.  If a subprocess
        is already attached, a warning is logged and no new one is started.
        """
        super(FfmpegOutputStream, self).open(movie_path)

        if self.pipe:
            logging.warning("pipe is already open!")
            return

        command = [self.ffmpeg_bin,
                   '-y',
                   '-f', 'rawvideo',
                   '-vcodec', 'rawvideo',
                   '-s', '%dx%d' % (self.frame_shape[1], self.frame_shape[0]),
                   '-pix_fmt', 'rgb24',
                   '-r', '30',
                   '-i', '-',
                   '-an',
                   '-vcodec', 'libx264',
                   self.movie_path]

        self.pipe = sp.Popen(command, stdin=sp.PIPE)
        os.kill(self.pipe.pid, signal.SIGSTOP)
        self.stopped = True

        logging.debug("opened output pipe")

    def _resume(self):
        """Send SIGCONT to the subprocess if it is currently suspended."""
        if self.stopped:
            os.kill(self.pipe.pid, signal.SIGCONT)
            self.stopped = False

    def _write_frames(self, frames):
        """Send the raw bytes of each frame to ffmpeg's stdin."""
        if self.pipe is None:
            self.open(self.movie_path)

        self._resume()

        for frame in frames:
            # tobytes() replaces the deprecated ndarray.tostring().
            self.pipe.stdin.write(frame.tobytes())

    def close(self):
        """Flush buffered frames, close ffmpeg's stdin and wait for it.

        The subprocess is always reaped and detached from the stream.

        Raises
        ------
        IOError
            If no subprocess is attached to the stream.
        Exception
            If ffmpeg exited with a non-zero return code.
        """
        super(FfmpegOutputStream, self).close()

        if self.pipe is None:
            raise IOError("pipe is closed")

        try:
            # A process that is still suspended (no frame was ever written)
            # would never exit, so wake it before waiting on it.
            self._resume()
            self.pipe.stdin.close()
        finally:
            rc = self.pipe.wait()
            self.pipe = None
            self.stopped = False

        if rc:
            raise Exception("output pipe returned with error code %d" % rc)

        logging.debug("closed output pipe")

    def __exit__(self, exc_type, exc_value, tb):
        """Close normally, or kill the subprocess if the with-body raised."""
        if exc_type is None:
            self.close()
            return False

        if self.pipe is not None:
            pipe, self.pipe = self.pipe, None
            self.stopped = False
            pipe.kill()
            try:
                pipe.stdin.close()
            except (IOError, OSError):
                # The process is already dead; a failed flush is expected.
                pass
            pipe.wait()

        # Returning False lets the original exception propagate unchanged.
        return False