Source code for allensdk.internal.model.biophysical.passive_fitting.output_grabber

import os, sys, threading, time

[docs]class OutputGrabber(object): """ Class used to grab standard output or another stream. """ escape_char = "\b" def __init__(self, stream=None, threaded=False): self.origstream = stream self.threaded = threaded if self.origstream is None: self.origstream = sys.stdout self.origstreamfd = self.origstream.fileno() self.capturedtext = "" # Create a pipe so the stream can be captured: self.pipe_out, self.pipe_in = os.pipe()
[docs] def start(self): """ Start capturing the stream data. """ self.capturedtext = "" # Save a copy of the stream: self.streamfd = os.dup(self.origstreamfd) # Replace the Original stream with our write pipe os.dup2(self.pipe_in, self.origstreamfd) if self.threaded: # Start thread that will read the stream: self.workerThread = threading.Thread(target=self.readOutput) self.workerThread.start() # Make sure that the thread is running and os.read is executed: time.sleep(0.01)
[docs] def stop(self): """ Stop capturing the stream data and save the text in `capturedtext`. """ # Flush the stream to make sure all our data goes in before # the escape character. self.origstream.flush() # Print the escape character to make the readOutput method stop: self.origstream.write(self.escape_char) self.origstream.flush() if self.threaded: # wait until the thread finishes so we are sure that # we have until the last character: self.workerThread.join() else: self.readOutput() # Close the pipe: os.close(self.pipe_out) # Restore the original stream: os.dup2(self.streamfd, self.origstreamfd)
[docs] def readOutput(self): """ Read the stream data (one byte at a time) and save the text in `capturedtext`. """ while True: data = os.read(self.pipe_out, 1) # Read One Byte Only if self.escape_char in data: break if not data: break self.capturedtext += data