Stem Docs

stem.descriptor.bandwidth_file

Source code for stem.descriptor.bandwidth_file

# Copyright 2019, Damian Johnson and The Tor Project
# See LICENSE for licensing information

"""
Parsing for Bandwidth Authority metrics as described in Tor's
`bandwidth-file-spec <https://gitweb.torproject.org/torspec.git/tree/bandwidth-file-spec.txt>`_.

**Module Overview:**

::

  BandwidthFile - Tor bandwidth authority measurements.

.. versionadded:: 1.8.0
"""

import datetime
import io
import time

import stem.util.str_tools

from stem.descriptor import (
  _mappings_for,
  Descriptor,
)

try:
  # added in python 2.7
  from collections import OrderedDict
except ImportError:
  from stem.util.ordereddict import OrderedDict

# Four character dividers are allowed for backward compatibility, but five is
# preferred.

HEADER_DIV = b'====='
HEADER_DIV_ALT = b'===='


[docs]class RecentStats(object): """ Statistical information collected over the last 'data_period' (by default five days). :var int consensus_count: number of consensuses published during this period :var int prioritized_relays: number of relays prioritized to be measured :var int prioritized_relay_lists: number of times a set of relays were prioritized to be measured :var int measurement_attempts: number of relay measurements we attempted :var int measurement_failures: number of measurement attempts that failed :var RelayFailures relay_failures: number of relays we failed to measure """ def __init__(self): self.consensus_count = None self.prioritized_relays = None self.prioritized_relay_lists = None self.measurement_attempts = None self.measurement_failures = None self.relay_failures = RelayFailures()
class RelayFailures(object):
  """
  Summary of the number of relays we were unable to measure.

  :var int no_measurement: number of relays that did not have any successful
    measurements
  :var int insuffient_period: number of relays whose measurements were
    collected over a period that was too small (1 day by default)
  :var int insufficient_measurements: number of relays we did not collect
    enough measurements for (2 by default)
  :var int stale: number of relays whose latest measurement is too old (5 days
    by default)
  """

  def __init__(self):
    # NOTE: 'insuffient_period' (sic) is a typo, but it's part of the public
    # attribute API so it must be kept as-is.

    self.no_measurement = self.insuffient_period = None
    self.insufficient_measurements = self.stale = None


# Converts header attributes to a given type. Malformed fields should be
# ignored according to the spec.
def _str(val): return val # already a str def _int(val): return int(val) if (val and val.isdigit()) else None def _date(val): try: return stem.util.str_tools._parse_iso_timestamp(val) except ValueError: return None # not an iso formatted date def _csv(val): return list(map(lambda v: v.strip(), val.split(','))) if val is not None else None # mapping of attributes => (header, type) HEADER_ATTR = { # version 1.1.0 introduced headers 'version': ('version', _str), 'software': ('software', _str), 'software_version': ('software_version', _str), 'earliest_bandwidth': ('earliest_bandwidth', _date), 'latest_bandwidth': ('latest_bandwidth', _date), 'created_at': ('file_created', _date), 'generated_at': ('generator_started', _date), # version 1.2.0 additions 'consensus_size': ('number_consensus_relays', _int), 'eligible_count': ('number_eligible_relays', _int), 'eligible_percent': ('percent_eligible_relays', _int), 'min_count': ('minimum_number_eligible_relays', _int), 'min_percent': ('minimum_percent_eligible_relays', _int), # version 1.3.0 additions 'scanner_country': ('scanner_country', _str), 'destinations_countries': ('destinations_countries', _csv), # version 1.4.0 additions 'time_to_report_half_network': ('time_to_report_half_network', _int), 'recent_stats.consensus_count': ('recent_consensus_count', _int), 'recent_stats.prioritized_relay_lists': ('recent_priority_list_count', _int), 'recent_stats.prioritized_relays': ('recent_priority_relay_count', _int), 'recent_stats.measurement_attempts': ('recent_measurement_attempt_count', _int), 'recent_stats.measurement_failures': ('recent_measurement_failure_count', _int), 'recent_stats.relay_failures.no_measurement': ('recent_measurements_excluded_error_count', _int), 'recent_stats.relay_failures.insuffient_period': ('recent_measurements_excluded_near_count', _int), 'recent_stats.relay_failures.insufficient_measurements': ('recent_measurements_excluded_few_count', _int), 'recent_stats.relay_failures.stale': 
('recent_measurements_excluded_old_count', _int), } HEADER_DEFAULT = { 'version': '1.0.0', # version field was added in 1.1.0 } def _parse_file(descriptor_file, validate = False, **kwargs): """ Iterates over the bandwidth authority metrics in a file. :param file descriptor_file: file with descriptor content :param bool validate: checks the validity of the descriptor's content if **True**, skips these checks otherwise :param dict kwargs: additional arguments for the descriptor constructor :returns: :class:`stem.descriptor.bandwidth_file.BandwidthFile` object :raises: * **ValueError** if the contents is malformed and validate is **True** * **IOError** if the file can't be read """ yield BandwidthFile(descriptor_file.read(), validate, **kwargs) def _parse_header(descriptor, entries): header = OrderedDict() content = io.BytesIO(descriptor.get_bytes()) content.readline() # skip the first line, which should be the timestamp index = 1 version_index = None while True: line = content.readline().strip() if not line: break # end of the content elif line in (HEADER_DIV, HEADER_DIV_ALT): break # end of header elif not header and b'node_id=' in line: break # version 1.0 doesn't have any headers if b'=' in line: key, value = stem.util.str_tools._to_unicode(line).split('=', 1) header[key] = value if key == 'version': version_index = index else: raise ValueError("Header expected to be key=value pairs, but had '%s'" % line) index += 1 descriptor.header = header descriptor.recent_stats = RecentStats() for full_attr, (keyword, cls) in HEADER_ATTR.items(): obj = descriptor for attr in full_attr.split('.')[:-1]: obj = getattr(obj, attr) setattr(obj, full_attr.split('.')[-1], cls(header.get(keyword, HEADER_DEFAULT.get(full_attr)))) if version_index is not None and version_index != 1: raise ValueError("The 'version' header must be in the second position") def _parse_timestamp(descriptor, entries): first_line = io.BytesIO(descriptor.get_bytes()).readline().strip() if first_line.isdigit(): 
descriptor.timestamp = datetime.datetime.utcfromtimestamp(int(first_line)) else: raise ValueError("First line should be a unix timestamp, but was '%s'" % first_line) def _parse_body(descriptor, entries): # In version 1.0.0 the body is everything after the first line. Otherwise # it's everything after the header's divider. content = io.BytesIO(descriptor.get_bytes()) if descriptor.version == '1.0.0': content.readline() # skip the first line else: while content.readline().strip() not in ('', HEADER_DIV, HEADER_DIV_ALT): pass # skip the header measurements = {} for line in content.readlines(): line = stem.util.str_tools._to_unicode(line.strip()) attr = dict(_mappings_for('measurement', line)) fingerprint = attr.get('node_id', '').lstrip('$') # bwauths prefix fingerprints with '$' if not fingerprint: raise ValueError("Every meaurement must include 'node_id': %s" % line) elif fingerprint in measurements: raise ValueError('Relay %s is listed multiple times. It should only be present once.' % fingerprint) measurements[fingerprint] = attr descriptor.measurements = measurements
class BandwidthFile(Descriptor):
  """
  Tor bandwidth authority measurements.

  :var dict measurements: **\\*** mapping of relay fingerprints to their
    bandwidth measurement metadata

  :var dict header: **\\*** header metadata
  :var datetime timestamp: **\\*** time when these metrics were published
  :var str version: **\\*** document format version

  :var str software: application that generated these metrics
  :var str software_version: version of the application that generated these metrics

  :var datetime earliest_bandwidth: time of the first sampling
  :var datetime latest_bandwidth: time of the last sampling
  :var datetime created_at: time when this file was created
  :var datetime generated_at: time when collection of these metrics started

  :var int consensus_size: number of relays in the consensus
  :var int eligible_count: relays with enough measurements to be included
  :var int eligible_percent: percentage of consensus with enough measurements
  :var int min_count: minimum eligible relays for results to be provided
  :var int min_percent: minimum measured percentage of the consensus

  :var str scanner_country: country code where this scan took place
  :var list destinations_countries: all country codes that were scanned

  :var int time_to_report_half_network: estimated number of seconds required
    to measure half the network, given recent measurements

  :var RecentStats recent_stats: statistical information collected over the
    last 'data_period' (by default five days)

  **\\*** attribute is either required when we're parsed with validation or
  has a default value, others are left as **None** if undefined
  """

  TYPE_ANNOTATION_NAME = 'bandwidth-file'

  ATTRIBUTES = {
    'timestamp': (None, _parse_timestamp),
    'header': ({}, _parse_header),
    'measurements': ({}, _parse_body),
  }

  # every typed header field is also lazily populated by _parse_header

  ATTRIBUTES.update({header_attr: (None, _parse_header) for header_attr in HEADER_ATTR})

  @classmethod
  def content(cls, attr = None, exclude = (), sign = False):
    """
    Creates descriptor content with the given attributes. This descriptor
    type differs somewhat from others and treats our attr/exclude attributes
    as follows...

      * 'timestamp' is a reserved key for our mandatory header unix timestamp.

      * 'content' is a reserved key for our bandwidth measurement lines.

      * All other keys are treated as header fields.

    For example...

    ::

      BandwidthFile.content({
        'timestamp': '12345',
        'version': '1.2.0',
        'content': [],
      })
    """

    if sign:
      raise NotImplementedError('Signing of %s not implemented' % cls.__name__)

    fields = OrderedDict() if attr is None else OrderedDict(attr)
    timestamp = fields.pop('timestamp', str(int(time.time())))
    measurements = fields.pop('content', [])
    version = fields.get('version', HEADER_DEFAULT.get('version'))

    output = []

    if 'timestamp' not in exclude:
      output.append(stem.util.str_tools._to_bytes(timestamp))

    if version == '1.0.0':
      if fields:
        raise ValueError('Headers require BandwidthFile version 1.1 or later')
    else:
      # the 'version' header must come immediately after the timestamp

      if 'version' not in exclude:
        output.append(stem.util.str_tools._to_bytes('version=%s' % fields.pop('version')))

      output.extend(stem.util.str_tools._to_bytes('%s=%s' % entry) for entry in fields.items())
      output.append(HEADER_DIV)

    output.extend(stem.util.str_tools._to_bytes(line) for line in measurements)

    return b'\n'.join(output)

  def __init__(self, raw_content, validate = False):
    super(BandwidthFile, self).__init__(raw_content, lazy_load = not validate)

    if validate:
      for parser in (_parse_timestamp, _parse_header, _parse_body):
        parser(self, None)