Descriptor Reader

Utilities for reading descriptors from local directories and archives. This is mostly done through the DescriptorReader class, which is an iterator for the descriptor data in a series of destinations. For example...

from stem.descriptor.reader import DescriptorReader

my_descriptors = [
  '/tmp/server-descriptors-2012-03.tar.bz2',
  '/tmp/archived_descriptors/',
]

# prints the contents of all the descriptor files
with DescriptorReader(my_descriptors) as reader:
  for descriptor in reader:
    print(descriptor)

This ignores files that cannot be processed due to read errors or unparsable content. To be notified of skipped files you can register a listener with register_skip_listener().

The DescriptorReader keeps track of the last modified timestamps for descriptor files that it has read so it can skip unchanged files if run again. This listing of processed files can also be persisted and applied to other DescriptorReader instances. For example, the following prints descriptors as they're changed over the course of a minute, and picks up where it left off if run again...

import time

from stem.descriptor.reader import DescriptorReader, load_processed_files, save_processed_files

reader = DescriptorReader(['/tmp/descriptor_data'])

try:
  processed_files = load_processed_files('/tmp/used_descriptors')
  reader.set_processed_files(processed_files)
except (IOError, TypeError):
  pass  # could not load, maybe this is the first run

start_time = time.time()

while (time.time() - start_time) < 60:
  # prints any descriptors that have changed since last checked
  with reader:
    for descriptor in reader:
      print(descriptor)

  time.sleep(1)

# remember what we have read so the next run can pick up where we left off
save_processed_files('/tmp/used_descriptors', reader.get_processed_files())

Module Overview:

load_processed_files - Loads a listing of processed files
save_processed_files - Saves a listing of processed files

DescriptorReader - Iterator for descriptor data on the local file system
  |- get_processed_files - provides the listing of files that we've processed
  |- set_processed_files - sets our tracking of the files we have processed
  |- register_read_listener - adds a listener for when files are read
  |- register_skip_listener - adds a listener that's notified of skipped files
  |- start - begins reading descriptor data
  |- stop - stops reading descriptor data
  |- __enter__ / __exit__ - manages the descriptor reader thread in the context
  +- __iter__ - iterates over descriptor data in unread files

FileSkipped - Base exception for a file that was skipped
  |- AlreadyRead - We've already read a file with this last modified timestamp
  |- ParsingFailure - Contents can't be parsed as descriptor data
  |- UnrecognizedType - File extension indicates non-descriptor data
  +- ReadFailed - Wraps an error that was raised while reading the file
     +- FileMissing - File does not exist

exception stem.descriptor.reader.FileSkipped

Bases: exceptions.Exception

Base error when we can't provide descriptor data from a file.

exception stem.descriptor.reader.AlreadyRead(last_modified, last_modified_when_read)

Bases: stem.descriptor.reader.FileSkipped

Already read a file with this 'last modified' timestamp or later.

Parameters:
  • last_modified (int) -- unix timestamp for when the file was last modified
  • last_modified_when_read (int) -- unix timestamp for the modification time when we last read this file
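
The comparison behind this exception can be sketched with the standard library alone. The helper name `already_read` is hypothetical, and the dictionary layout is assumed to mirror the 'path => last modified timestamp' mapping described for get_processed_files(). This is an illustration of the idea, not stem's actual implementation.

```python
import os
import tempfile

def already_read(path, processed_files):
    """Return True if path has not changed since it was last recorded.

    processed_files maps absolute paths to the unix timestamp (int) of the
    modification time seen when the file was last read. Hypothetical helper.
    """
    last_modified = int(os.stat(path).st_mtime)
    last_modified_when_read = processed_files.get(os.path.abspath(path))

    if last_modified_when_read is None:
        return False  # this file has never been read

    # skip when we already read this 'last modified' timestamp or a later one
    return last_modified_when_read >= last_modified

# Demonstration against a throwaway file
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b'placeholder')

demo_path = os.path.abspath(tmp.name)
mtime = int(os.stat(demo_path).st_mtime)

unread = already_read(demo_path, {})                       # no record yet
seen = already_read(demo_path, {demo_path: mtime})         # same timestamp
stale = already_read(demo_path, {demo_path: mtime - 10})   # file changed since
os.remove(demo_path)
```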

exception stem.descriptor.reader.ParsingFailure(parsing_exception)

Bases: stem.descriptor.reader.FileSkipped

File contents could not be parsed as descriptor data.

Parameters: parsing_exception (ValueError) -- issue that arose when parsing

exception stem.descriptor.reader.UnrecognizedType(mime_type)

Bases: stem.descriptor.reader.FileSkipped

File doesn't contain descriptor data. This could either be due to its file type or because it doesn't conform to a recognizable descriptor type.

Parameters: mime_type (tuple) -- the (type, encoding) tuple provided by mimetypes.guess_type()
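
To make the shape of that tuple concrete, the snippet below calls mimetypes.guess_type() directly on two file names. The file names are only examples, and the exact type string can vary with the system's MIME tables, so treat the values as typical rather than guaranteed. How stem acts on these values is not shown here.

```python
import mimetypes

# A compressed archive: the type describes the tar container and the
# encoding describes the compression (typically 'application/x-tar', 'bzip2').
archive_type = mimetypes.guess_type('server-descriptors-2012-03.tar.bz2')

# A file name without an extension gives the module nothing to go on.
plain_type = mimetypes.guess_type('cached-consensus')

print(archive_type)
print(plain_type)
```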

exception stem.descriptor.reader.ReadFailed(read_exception)

Bases: stem.descriptor.reader.FileSkipped

An IOError occurred while trying to read the file.

Parameters: read_exception (IOError) -- issue that arose when reading the file, or None if the file was not present

exception stem.descriptor.reader.FileMissing

Bases: stem.descriptor.reader.ReadFailed

File does not exist.

stem.descriptor.reader.load_processed_files(path)

Loads a dictionary of 'path => last modified timestamp' mappings, as persisted by save_processed_files(), from a file.

Parameters: path (str) -- location to load the processed files dictionary from

Returns: dict of 'path (str) => last modified unix timestamp (int)' mappings

Raises:
  • IOError if unable to read the file
  • TypeError if unable to parse the file's contents

stem.descriptor.reader.save_processed_files(path, processed_files)

Persists a dictionary of 'path => last modified timestamp' mappings (as provided by the DescriptorReader's get_processed_files() method) so that they can be loaded later and applied to another DescriptorReader.

Parameters:
  • path (str) -- location to save the processed files dictionary to
  • processed_files (dict) -- 'path => last modified' mappings
Raises:
  • IOError if unable to write to the file
  • TypeError if processed_files is of the wrong type

class stem.descriptor.reader.DescriptorReader(target, validate=False, follow_links=False, buffer_size=100, persistence_path=None, document_handler='ENTRIES', **kwargs)

Bases: object

Iterator for the descriptor data on the local file system. This can process text files and tarball archives (gzip or bzip2), and it recurses into directories.

By default this limits the number of descriptors that we'll read ahead before waiting for our caller to fetch some of them. This is included to avoid unbounded memory usage.
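
The read-ahead limit described above is the classic bounded producer/consumer pattern. The sketch below shows that pattern with the standard library's queue module. The function name `read_ahead` is hypothetical, and this is an illustration of the general technique rather than stem's own code.

```python
import queue
import threading

def read_ahead(items, buffer_size):
    """Yield items produced by a background thread through a bounded buffer."""
    # maxsize=0 makes the queue unbounded, like a buffer_size of zero
    pending = queue.Queue(maxsize=buffer_size)
    done = object()  # sentinel marking the end of the input

    def produce():
        for item in items:
            pending.put(item)  # blocks while the buffer is full
        pending.put(done)

    threading.Thread(target=produce, daemon=True).start()

    while True:
        item = pending.get()
        if item is done:
            return
        yield item

# The producer never gets more than 100 items ahead of this consumer.
results = list(read_ahead(range(250), buffer_size=100))
```

Because the producer blocks on a full buffer, memory use stays proportional to buffer_size no matter how much input there is.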

The persistence_path argument is a convenient way to persist the listing of processed files between runs, but it doesn't allow for error handling. If you need that, use the load_processed_files() and save_processed_files() functions instead.

Parameters:
  • target (str,list) -- path or list of paths for files or directories to be read from
  • validate (bool) -- checks the validity of the descriptor's content if True, skips these checks otherwise
  • follow_links (bool) -- determines if we'll follow symlinks when traversing directories (requires Python 2.6)
  • buffer_size (int) -- number of descriptors we'll buffer before waiting for some to be read; unbounded if zero
  • persistence_path (str) -- if set, we load and save processed file listings from this path; errors are ignored
  • document_handler (stem.descriptor.DocumentHandler) -- method in which to parse NetworkStatusDocument
  • kwargs (dict) -- additional arguments for the descriptor constructor

get_processed_files()

For each file that we have read descriptor data from this provides a mapping of the form...

absolute path (str) => last modified unix timestamp (int)

This includes entries set through the set_processed_files() method. Each run resets this to only the files that were present during that run.

Returns: dict with the absolute paths and unix timestamp for the last modified times of the files we have processed

set_processed_files(processed_files)

Sets the listing of the files we have processed. Most often this is used with a newly created DescriptorReader to pre-populate the listing of descriptor files that we have seen.

Parameters: processed_files (dict) -- mapping of absolute paths (str) to unix timestamps for the last modified time (int)
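
A mapping of the expected shape can be built by hand, for instance to mark everything currently on disk as already seen. The helper name `snapshot_processed_files` and the demo paths are hypothetical; only the dictionary layout comes from the description above.

```python
import os
import tempfile

def snapshot_processed_files(directory):
    """Map each file's absolute path to its last modified unix timestamp (int)."""
    processed_files = {}

    for dirpath, _dirnames, filenames in os.walk(directory):
        for filename in filenames:
            path = os.path.abspath(os.path.join(dirpath, filename))
            processed_files[path] = int(os.stat(path).st_mtime)

    return processed_files

# Demonstration with a throwaway directory holding a single file
demo_dir = tempfile.mkdtemp()
demo_file = os.path.join(demo_dir, 'example_descriptor')

with open(demo_file, 'w') as handle:
    handle.write('placeholder')

snapshot = snapshot_processed_files(demo_dir)
```

Passing such a snapshot to set_processed_files() on a new DescriptorReader should, going by the AlreadyRead description, cause files that are unchanged since the snapshot to be skipped.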

register_read_listener(listener)

Registers a listener for when files are read. This is executed prior to processing files. Listeners are expected to be of the form...

my_listener(path)

Parameters: listener (functor) -- functor to be notified when files are read
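
Any callable that accepts a single path fits this form, including an instance of a class that defines `__call__`. The `ReadLog` class below is a hypothetical example of such a functor. It is invoked by hand so the snippet runs without stem; with a real reader it would be passed to register_read_listener().

```python
class ReadLog(object):
    """Remembers every path it is called with (hypothetical helper)."""

    def __init__(self):
        self.paths = []

    def __call__(self, path):
        # matches the my_listener(path) form
        self.paths.append(path)

read_log = ReadLog()

# Stand-in for the call a reader would make before processing a file
read_log('/tmp/descriptor_data/example')
```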

register_skip_listener(listener)

Registers a listener for files that are skipped. This listener is expected to be a functor of the form...

my_listener(path, exception)

Parameters: listener (functor) -- functor to be notified of files that are skipped due to read errors or because they couldn't be parsed as valid descriptor data
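
One use for a skip listener is to summarize why files were skipped. The sketch below tallies skips by exception class name. The name `on_skip` and the sample paths are hypothetical, and built-in exceptions stand in for the FileSkipped subclasses a real reader would pass, so the snippet runs without stem.

```python
import collections

skip_counts = collections.Counter()

def on_skip(path, exception):
    # matches the my_listener(path, exception) form
    skip_counts[type(exception).__name__] += 1

# Stand-in calls; with stem this would instead be registered through
# reader.register_skip_listener(on_skip)
on_skip('/tmp/archived_descriptors/a', ValueError('unparsable'))
on_skip('/tmp/archived_descriptors/b', TypeError('wrong type'))
on_skip('/tmp/archived_descriptors/c', ValueError('unparsable'))
```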

get_buffered_descriptor_count()

Provides the number of descriptors that are waiting to be iterated over. This is limited to the buffer_size that we were constructed with.

Returns: int for the estimated number of currently enqueued descriptors; this is not entirely reliable

start()

Starts reading our descriptor files.

Raises: ValueError if we're already reading the descriptor files

stop()

Stops further reading of descriptor files.
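
When a with block is inconvenient, start() and stop() can be paired by hand. The sketch below wraps the pair in try/finally so the reader is stopped even if iteration raises. The function `read_all` and the `StubReader` class are hypothetical; the stub only exists so the snippet runs without stem, and a real DescriptorReader would be passed in its place.

```python
def read_all(reader):
    """Collect every descriptor from reader, always stopping it afterwards."""
    descriptors = []
    reader.start()

    try:
        for descriptor in reader:
            descriptors.append(descriptor)
    finally:
        reader.stop()  # runs even if iteration raises

    return descriptors

class StubReader(object):
    """Stand-in with the same start/stop/iterate surface; not part of stem."""

    def __init__(self, items):
        self.items, self.running, self.calls = items, False, []

    def start(self):
        if self.running:
            raise ValueError('already reading')
        self.running = True
        self.calls.append('start')

    def stop(self):
        self.running = False
        self.calls.append('stop')

    def __iter__(self):
        return iter(self.items)

stub = StubReader(['descriptor 1', 'descriptor 2'])
collected = read_all(stub)
```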