Source code for allensdk.internal.brain_observatory.frame_stream
import subprocess as sp
import numpy as np
import logging
import sys, os
from collections import deque
import scipy.misc
import traceback
import signal
[docs]class FrameInputStream( object ):
def __init__(self, movie_path, num_frames=None, block_size=1, cache_frames=False, process_frame_cb=None):
self.movie_path = movie_path
self.num_frames = num_frames
self.block_size = block_size
self.cache_frames = cache_frames
self.process_frame_cb = process_frame_cb if process_frame_cb else lambda f: f[:,:,0].copy()
self.frames_read = 0
self.frame_cache = []
[docs] def close(self):
logging.debug("Read total frames %d", self.frames_read)
if self.num_frames is not None and self.frames_read != self.num_frames:
raise IOError("read incorrect number of frames: %d vs %d", self.frames_read, self.num_frames)
def _error(self):
pass
def _process_frame(self, frame):
return self.process_frame_cb(frame)
def _read_iter(self):
pass
def __enter__(self):
return self
def __iter__(self):
# if we're caching frames and the cache exists, return it
if self.cache_frames and self.frame_cache:
n = self.num_frames if self.num_frames is not None else len(self.frame_cache)
for i in range(n):
yield self.frame_cache[i]
else:
self.open()
self.frame_cache = []
for frame in self._read_iter():
self.frame_cache.append(self._process_frame(frame))
self.frames_read += 1
if (self.frames_read % 100) == 0:
logging.debug("Read frames %d", self.frames_read)
if self.block_size is None:
continue
if self.block_size == 1:
yield self.frame_cache[-1]
elif (self.frames_read % self.block_size) == 0:
for i in range(-self.block_size,0):
yield self.frame_cache[i]
if not self.cache_frames:
self.frame_cache = []
self.close()
for frame in self.frame_cache:
yield frame
if not self.cache_frames:
self.frame_cache = []
def __exit__(self, exc_type, exc_value, tb):
if exc_value:
traceback.print_tb(tb)
self._error()
raise exc_value
[docs] def create_images(self, output_directory, image_type):
for i, frame in enumerate(self):
file_name = os.path.join(output_directory, "input_frame-%06d." % i + image_type)
scipy.misc.imsave(file_name, frame)
[docs]class FfmpegInputStream( FrameInputStream ):
def __init__(self, movie_path, frame_shape, ffmpeg_bin='ffmpeg', num_frames=None, block_size=1, cache_frames=False, process_frame_cb=None):
super(FfmpegInputStream, self).__init__(movie_path=movie_path, num_frames=num_frames, block_size=block_size, cache_frames=cache_frames, process_frame_cb=process_frame_cb)
self.ffmpeg_bin = ffmpeg_bin
self.frame_shape = frame_shape
self.pipe = None
[docs] def open(self):
super(FfmpegInputStream, self).open()
if self.pipe:
raise IOError("pipe is open already")
command = [ self.ffmpeg_bin,
'-i', self.movie_path,
'-f', 'image2pipe',
'-pix_fmt', 'rgb24',
'-vcodec', 'rawvideo']
if self.num_frames is not None:
command += ['-vframes', str(self.num_frames)]
command += ['-']
frame_size = np.prod(self.frame_shape)
self.pipe = sp.Popen(command, stdout=sp.PIPE, bufsize=0)
logging.debug("opened pipe")
[docs] def close(self):
if self.pipe is None:
raise IOError("pipe is not open")
if self.pipe.poll() is None:
logging.debug("pipe is still open. terminating.")
self.pipe.terminate()
super(FfmpegInputStream, self).close()
rc = self.pipe.wait()
logging.debug("closed input pipe")
if rc:
raise Exception("input pipe returned with error code %d" % rc)
self.pipe = None
def _process_frame(self, frame):
frame = np.fromstring(frame, dtype=np.uint8)
frame.resize(self.frame_shape)
return self.process_frame_cb(frame)
def _read_iter(self):
if self.pipe is None:
raise IOError("pipe is not open")
frame_size = np.prod(self.frame_shape)
while self.pipe.poll() is None or self.pipe.stdout:
self.pipe.stdout.flush()
input_frame = self.pipe.stdout.read(frame_size)
bytes_read = len(input_frame)
if bytes_read == 0:
break
if bytes_read != frame_size:
raise IOError("pipe read wrong number of bytes (%d vs %d)" % (frame_size, bytes_read))
yield input_frame
def _error(self):
if self.pipe:
self.pipe.kill()
self.pipe = None
[docs] def create_images(self, output_directory, image_type):
cmd = self.ffmpeg_bin + ' -i ' + self.movie_path + ' ' + output_directory + '/input_frame-%06d.' + image_type
logging.debug("Calling ffmpeg with the command:")
logging.debug("\t"+cmd)
retcode = sp.call(cmd, shell=True)
if retcode != 0:
logging.debug(retcode)
raise Exception('Something went wrong with image creation')
[docs]class CvInputStream( object):
def __init__(self, movie_path, num_frames=None, block_size=1, cache_frames=False):
super(FfmpegInputStream, self).__init__(movie_path=movie_path, num_frames=num_frames, block_size=block_size, cache_frames=cache_frames)
self.cap = None
[docs] def open(self):
super(FfmpegInputStream, self).open()
if self.cap:
raise IOError("capture is open already")
self.frames_read = 0
import cv2
self.cap = cv2.VideoCapture(self.movie_path)
logging.debug("opened capture")
[docs] def close(self):
if self.cap is None:
return
self.cap.release()
self.cap = None
super(FfmpegInputStream, self).close()
def _read_iter(self):
if self.cap is None:
raise IOError("capture is not open")
while self.cap.isOpened():
ret, frame = self.cap.read()
yield frame
if self.frames_read == self.num_frames:
break
def _error(self):
self.cap.release()
self.cap = None
[docs]class FrameOutputStream( object ):
def __init__(self, block_size=1):
self.frames_processed = 0
self.block_frames = []
self.block_size = block_size
[docs] def open(self, movie_path):
self.frames_processed = 0
self.block_frames = []
self.movie_path = movie_path
def _write_frames(self, frames):
raise NotImplementedError()
[docs] def write(self, frame):
self.block_frames.append(frame)
if len(self.block_frames) == self.block_size:
self._write_frames(self.block_frames)
self.frames_processed += len(self.block_frames)
self.block_frames = []
[docs] def close(self):
if self.block_frames:
self._write_frames(self.block_frames)
self.frames_processed += len(self.block_frames)
self.block_frames = []
logging.debug("wrote %d frames", self.frames_processed)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, tb):
if exc_value:
raise exc_value
self.close()
[docs]class ImageOutputStream( FrameOutputStream ):
def _write_frames(frames):
for i, frame in enumerate(frames):
file_name = self.movie_path % i
scipy.misc.imsave(file_name, frame)
[docs]class FfmpegOutputStream( FrameOutputStream ):
def __init__(self, frame_shape, ffmpeg_bin='ffmpeg', block_size=1):
super(FfmpegOutputStream, self).__init__(block_size)
self.ffmpeg_bin = ffmpeg_bin
self.frame_shape = frame_shape
self.pipe = None
self.stopped = False
[docs] def open(self, movie_path):
super(FfmpegOutputStream, self).open(movie_path)
if self.pipe:
logging.warning("pipe is already open!")
return
command = [ self.ffmpeg_bin,
'-y',
'-f', 'rawvideo',
'-vcodec', 'rawvideo',
'-s', '%dx%d' % (self.frame_shape[1], self.frame_shape[0]),
'-pix_fmt', 'rgb24',
'-r', '30',
'-i', '-',
'-an',
'-vcodec', 'libx264',
self.movie_path]
self.pipe = sp.Popen(command, stdin=sp.PIPE)
os.kill(self.pipe.pid, signal.SIGSTOP)
self.stopped = True
logging.debug("opened output pipe")
def _write_frames(self, frames):
if self.pipe is None:
self.open(self.movie_path)
if self.stopped:
os.kill(self.pipe.pid, signal.SIGCONT)
self.stopped = False
for frame in frames:
sys.stdout.flush()
self.pipe.stdin.write( frame.tostring() )
[docs] def close(self):
super(FfmpegOutputStream, self).close()
if self.pipe is None:
raise IOError("pipe is closed")
self.pipe.stdin.close()
rc = self.pipe.wait()
if rc:
raise Exception("output pipe returned with error code %d" % rc)
logging.debug("closed output pipe")
self.pipe = None
def __exit__(self, exc_type, exc_value, tb):
if exc_value:
self.pipe.kill()
raise exc_value
self.close()