import sys
import ast
import csv
import signal
import faulthandler
import platform
import logging
import warnings
from argparse import ArgumentParser
from datetime import datetime
from astropy.table import Table
from .query import Query
from .cone import Cone
from .plugin_support import SmPluginSupport, AbstractResultWriter
class QueryRunner:
    """
    Run timed queries for a list of services and pass each query's
    statistics to the configured result-writer plug-ins.

    Two modes are supported, chosen by the parsed arguments:

    * Query mode (``args.services`` present): every service is queried once
      for every selected cone read from ``args.cone_file``.
    * Replay mode (no ``args.services``): the entries of
      ``args.file_to_replay`` are used as the services, and each one is run
      without cone parameters.

    ``start_index`` and ``cone_limit`` select which cones (query mode) or
    which replay entries (replay mode) are run.
    """

    # Shared by all instances: plug-ins are only loaded while this is True,
    # i.e. by the first QueryRunner created in the process.
    _first_instance = True

    def __init__(self, args):
        """
        args is assumed to be a Namespace object args with each attribute having a valid
        value (no defaults will be applied here). Using vars(args) to show the object as
        a dict would yield an object like this:

        {'cone_file': 'input/cones-10000-0_05-0_25.py',
         'cone_limit': 2,
         'load_plugins': None,
         'max_radius': 0.25,
         'min_radius': 0,
         'norun': True,
         'num_cones': None,
         'result_dir': 'output',
         'save_results': True,
         'services': 'input/stsci/ps_tap.py',
         'start_index': 14,
         'tap_mode': 'async',
         'verbose': False,
         'writers': ['csv_writer:outfile=output/ps-tap-2020-08-02-16:19:37.484191.csv',
                     'some_writer']}
        """
        # reset this flag for each instance
        self._first_stat = True
        self._args = args  # All the args rolled up. We can get rid of the others.

        services = getattr(args, 'services', None)
        if services is not None:
            self._services = self._read_if_file(services)
        else:
            # We must be in replay mode.
            self._services = self._read_if_file(args.file_to_replay)

        # Use getattr for cone_file since it won't exist for replay use cases.
        self._cones = self._read_if_file(getattr(args, 'cone_file', None))

        self._result_dir = args.result_dir
        self._starting_cone = int(args.start_index)
        self._cone_limit = int(args.cone_limit)
        self._tap_mode = args.tap_mode
        self._user_agent = args.user_agent
        self._save_results = args.save_results
        self._verbose = args.verbose
        self._writer_specs = args.writers
        self._writers_descs = []
        self._writers = []

        self._load_plugins(args)

        # Clear the flag on the class itself. Assigning self._first_instance
        # would only create an instance attribute, so every later instance
        # would still see True and load the plug-ins again.
        QueryRunner._first_instance = False

    def _load_plugins(self, args):
        """
        Load plug-ins (first instance only) and resolve the writer specs.

        Built-in plug-ins are loaded first, then user plug-ins, either from
        ``args.load_plugins`` or from the default location chosen by
        SmPluginSupport. Each writer spec may contain ``strftime`` format
        codes, which are expanded with the current time before the spec is
        turned into a plug-in description.
        """
        if self._first_instance:
            SmPluginSupport.load_builtin_plugins()

            # Load user plugins.
            if args.load_plugins is None:
                SmPluginSupport.load_plugins()
            else:
                SmPluginSupport.load_plugins(plugins=args.load_plugins)

        if self._writer_specs is None:
            # Default to the csv_writer and its default kwargs.
            self._writer_specs = ['csv_writer']

        # Substitute time formats in writer templates, such as
        # '%Y-%m-%d %H:%M:%S.%f'. One timestamp is used for all writers so
        # their time-stamped names agree.
        now = datetime.now()
        for spec in self._writer_specs:
            spec_with_time = now.strftime(spec)
            plugin = AbstractResultWriter.get_plugin_from_spec(spec_with_time)
            self._writers_descs.append(plugin)

    def run(self):
        """
        Validate the services, begin the writers, run all queries, and end
        the writers.

        ``end()`` is called in a ``finally`` clause, so every writer that was
        begun is also ended, even when a query loop raises.
        """
        self._validate_services(self._services)

        # Start from a fresh list so a repeated run() does not end() the
        # writers of a previous run a second time.
        self._writers = []
        try:
            for wdesc in self._writers_descs:
                writer = wdesc.cls()
                writer.begin(self._args, **wdesc.kwargs)
                self._writers.append(writer)

            if self._cones is not None:
                self._run_with_cones()
            else:
                self._run_services_only()
        finally:
            for writer in self._writers:
                writer.end()

    def _validate_services(self, services):
        """
        Warn when ``services`` is empty, or when its entries do not all share
        the first entry's 'service_type' (some result writers may then fail).
        """
        if len(services) == 0:
            warnings.warn('Service list is empty. Nothing will be timed.')
            return

        first_stype = self.getval(services[0], 'service_type')
        if any(self.getval(service, 'service_type') != first_stype
               for service in services[1:]):
            warnings.warn('Differing service_type values found in service list.'
                          ' Some result writers may fail.')

    def _iter_selected(self, items):
        """
        Yield the entries of ``items`` selected by start_index and cone_limit.

        Entries before index ``self._starting_cone`` are skipped. After that,
        at most ``self._cone_limit`` entries are yielded.
        """
        taken = 0
        for index, item in enumerate(items):
            if index < self._starting_cone:
                continue
            taken += 1
            if taken > self._cone_limit:
                return
            yield item

    def _run_with_cones(self):
        """Query every service for each selected cone."""
        for cone in self._iter_selected(self._cones):
            for service in self._services:
                self._run_one_query(service, cone)

    def _run_services_only(self):
        """Run each selected service (replay mode) without cone parameters."""
        for service in self._iter_selected(self._services):
            self._run_one_query(service)

    def _run_one_query(self, service, cone=None):
        """
        Run one query and send its statistics to the writers.

        Parameters
        ----------
        service
            One entry of the service list.
        cone
            A cone providing 'ra', 'dec' and 'radius' items, or None to run
            the service without position/radius (replay mode).

        Errors are reported rather than raised, so one failing query does not
        abort the remaining ones. If the Query object itself could not be
        created, there are no stats to collect and the error is only logged.
        """
        if cone is None:
            context = f'service {service}'
        else:
            context = f'cone {cone}, service {service}'

        # Fresh per call, so a failure never reuses a previous query's results.
        query = None
        try:
            if cone is None:
                position, radius = None, None
            else:
                position, radius = (cone['ra'], cone['dec']), cone['radius']
            query = Query(service, position, radius, self._result_dir,
                          tap_mode=self._tap_mode,
                          agent=self._user_agent,
                          save_results=self._save_results,
                          verbose=self._verbose)
            query.run()
        except Exception as e:
            msg = f'Query error for {context}: {repr(e)}'
            if query is None:
                # There is no Query object to report through; log and skip.
                logging.exception(msg)
                return
            query._handle_exc(msg, trace=True)

        try:
            self._collect_stats(query.stats)
        except Exception as e:
            msg = f'Unable to write stats for {context}: {repr(e)}'
            query._handle_exc(msg)

    def _collect_stats(self, stats):
        """Forward one query's stats to the output path."""
        self._output_stats_row(stats)

    def _output_stats_row(self, stats):
        """Pass ``stats`` to the ``one_result`` method of every active writer."""
        for writer in self._writers:
            writer.one_result(stats)

    def _output_stats_row_to_file(self, stats, stat_file):
        """
        Write ``stats`` as one CSV row to ``stat_file``.

        A header row is written before the first row this instance writes.
        """
        writer = csv.DictWriter(stat_file, dialect='excel',
                                fieldnames=stats.columns())
        if self._first_stat:
            self._first_stat = False
            writer.writeheader()
        writer.writerow(stats.row_values())

    def _read_if_file(self, obj):
        """
        Load ``obj`` into a Table when it is a file path; otherwise return it
        unchanged.

        '.py' files are parsed with ``ast.literal_eval`` and their rows are
        loaded into a Table. Any other path is read as CSV.
        """
        val = obj
        if isinstance(obj, str):
            if obj.endswith('.py'):
                # Read as a Python literal, then into a Table.
                with open(obj, 'r') as f:
                    data = ast.literal_eval(f.read())
                val = Table(rows=data)
            else:
                # Assume CSV and read into a Table.
                val = Table.read(obj, format='ascii.csv')
        return val

    def getval(self, obj, key, default=None):
        """
        Gets the value as either an attribute val or key val.

        The attribute ``key`` is tried first. If it is missing or None,
        ``obj[key]`` is tried. ``default`` is returned when that lookup fails,
        including when ``obj`` does not support indexing by ``key``.
        """
        val = getattr(obj, key, None)
        if val is None:
            try:
                val = obj[key]
            except (KeyError, IndexError, TypeError):
                val = default
        return val
################################################################################
# run time routines

# Fallback values for the random-cone options (--min_radius / --max_radius).
conegen_defaults = dict(
    min_radius=0,
    max_radius=0.25,
)

# Fallback values for the cone-selection options (--start_index / --cone_limit).
conelist_defaults = dict(
    start_index=0,
    cone_limit=100000000,
)
def sm_query(input_args=None):
    """
    Command entry point for timing queries.

    Parses ``input_args`` (``sys.argv`` when None), sets up logging, reports
    the arguments if requested, generates random cones when no cone file was
    given, and then runs the queries.
    """
    parsed = _parse_query(input_args)
    _init_logging(parsed)
    _print_arg_info(parsed)

    # Without a cone file, synthesize the requested number of random cones.
    if parsed.cone_file is None:
        parsed.cone_file = Cone.generate_random(
            parsed.num_cones, parsed.min_radius, parsed.max_radius)

    QueryRunner(parsed).run()
def _parse_query(input_args):
    """
    Parse the sm_query arguments, validate them and apply defaults.

    Parameters
    ----------
    input_args : list of str or None
        Arguments to parse. When None, argparse reads them from sys.argv.

    Returns
    -------
    argparse.Namespace
        The parsed arguments, with conegen/conelist defaults applied and
        ``writers`` defaulting to ['csv_writer'].

    Invalid combinations are reported through ``parser.error``, which prints
    usage and exits.
    """
    parser = _create_query_argparser()

    # Parse the arguments. If args is None, then the args implicitly come from sys.argv.
    args = parser.parse_args(input_args)

    # Catch SIGHUP, SIGQUIT and SIGTERM to allow running in the background.
    catch_signals()

    # Validate args. Flag names in these messages must match the parser's
    # real option strings (underscores, not dashes).
    radius_given = args.min_radius is not None or args.max_radius is not None
    if radius_given and args.num_cones is None:
        parser.error(message='argument --num_cones is required when '
                     '--min_radius or --max_radius are present.')
    if args.num_cones is None and args.cone_file is None:
        parser.error(message='Either --num_cones or --cone_file must be present\n'
                     ' to specify what values go into the service file templates.')

    # Apply defaults that couldn't be built in.
    apply_query_defaults(args, conegen_defaults)
    apply_query_defaults(args, conelist_defaults)

    if args.writers is None:
        # Default to the csv_writer and its default output file.
        args.writers = ['csv_writer']

    return args
def sm_replay(input_args=None):
    """
    Command entry point for replaying a previous set of query timings.

    Parses ``input_args`` (``sys.argv`` when None), sets up logging, reports
    the arguments if requested, and then runs the entries of the replay file.
    """
    replay_args = _parse_replay(input_args)
    _init_logging(replay_args)
    _print_arg_info(replay_args)

    runner = QueryRunner(replay_args)
    runner.run()
def _print_arg_info(args):
# Print arg info and exit if --norun specified
if args.verbose or args.norun:
# print arg info
import pprint
pp = pprint.PrettyPrinter(width=100, stream=sys.stdout, compact=True)
arg_values = pp.pformat(vars(args))
msg = f'Argument values after parsing and applying defaults:\n{arg_values}'
if args.norun:
print(msg)
exit(0)
else:
logging.info(msg)
def _parse_replay(input_args):
    """
    Parse the sm_replay arguments and apply defaults.

    ``input_args`` is a list of argument strings, or None to let argparse
    read sys.argv. The returned namespace has the conelist defaults applied,
    and ``writers`` falls back to ['csv_writer'].
    """
    # When input_args is None, argparse takes the arguments from sys.argv.
    args = _create_replay_argparser().parse_args(input_args)

    # Catch SIGHUP, SIGQUIT and SIGTERM to allow running in the background.
    catch_signals()

    # Fill in the defaults the parser itself leaves as None.
    apply_query_defaults(args, conelist_defaults)

    # Without any -w option, use the csv_writer with its default output file.
    if args.writers is None:
        args.writers = ['csv_writer']
    return args
def _create_query_argparser():
    """
    Build the command-line parser for sm_query.

    The cone options (--min_radius, --max_radius, --start_index, --cone_limit)
    have no parser default, so they parse as None when omitted. _parse_query
    then validates them and fills in conegen_defaults / conelist_defaults,
    whose values are quoted in the help text here.
    """
    parser = ArgumentParser(description='Measure query performance.')

    # Add positional args.
    parser.add_argument(
        'services', help='File containing list of services to query')

    # Add general args.
    parser.add_argument('-r', '--result_dir', dest='result_dir', default='results',
                        help='The directory in which to put query result files.'
                        ' Unless --save_results is specified, each query result file'
                        ' will be deleted after statistics are gathered for the query.',
                        metavar='result_dir')
    parser.add_argument('-l', '--load_plugins', dest='load_plugins',
                        metavar='plugin_dir_or_file',
                        help='Directory or file from which to load user plug-ins. '
                        'If not specified, and there is a "plugins" subdirectory, plugin '
                        'files will be loaded from there.')
    # Each continuation string begins with a space so sentences stay separated.
    parser.add_argument('-w', '--writer', dest='writers', action='append',
                        help='Name and kwargs of a writer plug-in to use.'
                        ' Format is writer_name[:arg1=val1[,arg2=val2...]].'
                        ' May appear multiple times to specify multiple writers.'
                        ' May contain Python datetime format elements which will be'
                        ' substituted with appropriate elements of the current time'
                        " (e.g., results-'%%m-%%d-%%H:%%M:%%S'.py)",
                        metavar='writer')
    parser.add_argument('-s', '--save_results', dest='save_results', action='store_true',
                        help='Save the query result data files. Without this argument, '
                        'the query result file will be deleted after metadata is gathered '
                        'for the query.')
    # A tuple (not a set) keeps the choices in a stable order in help/errors.
    parser.add_argument('-t', '--tap_mode', dest='tap_mode',
                        choices=('sync', 'async'), default='async',
                        help='How to run TAP queries (default=async)')
    parser.add_argument('-u', '--user_agent', dest='user_agent',
                        default=None,
                        help='Override the User-Agent used for queries (default=None)')
    parser.add_argument('-n', '--norun', dest='norun', action='store_true',
                        help='Display summary of command arguments without '
                        'performing any actions')
    parser.add_argument('-v', '--verbose', dest='verbose',
                        action='store_true',
                        help='Print additional information to stderr')

    # Add cone arguments. The cones either come from a file or are generated.
    cone_types = parser.add_mutually_exclusive_group()
    cone_types.add_argument('--num_cones', type=int, metavar='num_cones',
                            help='Number of cones to generate')
    cone_types.add_argument('--cone_file', metavar='cone_file',
                            help='Path of the file containing the individual query inputs.')

    cone_random = parser.add_argument_group()
    cone_random.add_argument(
        '--min_radius', type=float, help='Minimum radius (deg).'
        f' Default={conegen_defaults["min_radius"]}')
    cone_random.add_argument(
        '--max_radius', type=float, help='Maximum radius (deg).'
        f' Default={conegen_defaults["max_radius"]}')

    cone_file = parser.add_argument_group()
    cone_file.add_argument(
        '--start_index', type=int, metavar='start_index',
        help='Start with this cone in cone file.'
        f' Default={conelist_defaults["start_index"]}')
    cone_file.add_argument(
        '--cone_limit', type=int, metavar='cone_limit',
        help='Maximum number of cones to query.'
        f' Default={conelist_defaults["cone_limit"]}')

    return parser
def _create_replay_argparser():
    """
    Build the command-line parser for sm_replay.

    --start_index and --cone_limit have no parser default, so they parse as
    None when omitted. _parse_replay fills them in from conelist_defaults,
    whose values are quoted in the help text here.
    """
    parser = ArgumentParser(description='Measure query replay performance.')

    # Add positional args.
    parser.add_argument('file_to_replay',
                        help='File containing the results of a previous set of query timings.')

    # Add general args.
    parser.add_argument('-r', '--result_dir', dest='result_dir', default='results',
                        help='The directory in which to put query result files.',
                        metavar='result_dir')
    parser.add_argument('-l', '--load_plugins', dest='load_plugins',
                        metavar='plugin_dir_or_file',
                        help='Directory or file from which to load user plug-ins. '
                        'If not specified, and there is a "plugins" subdirectory, plugin '
                        'files will be loaded from there.')
    # Each continuation string begins with a space so sentences stay separated.
    parser.add_argument('-w', '--writer', dest='writers', action='append',
                        help='Name and kwargs of a writer plug-in to use.'
                        ' Format is writer_name[:arg1=val1[,arg2=val2...]].'
                        ' May appear multiple times to specify multiple writers.'
                        ' May contain Python datetime format elements which will be'
                        ' substituted with appropriate elements of the current time'
                        " (e.g., results-'%%m-%%d-%%H:%%M:%%S'.py)",
                        metavar='writer')
    parser.add_argument('-s', '--save_results', dest='save_results', action='store_true',
                        help='Save the query result data files. Without this argument, '
                        'the query result file will be deleted after metadata is gathered '
                        'for the query.')
    # A tuple (not a set) keeps the choices in a stable order in help/errors.
    parser.add_argument('-t', '--tap_mode', dest='tap_mode',
                        choices=('sync', 'async'), default='async',
                        help='How to run TAP queries (default=async)')
    parser.add_argument('-u', '--user_agent', dest='user_agent',
                        default=None,
                        help='Override the User-Agent used for queries (default=None)')
    parser.add_argument('-n', '--norun', dest='norun', action='store_true',
                        help='Display summary of command arguments without '
                        'performing any actions')
    parser.add_argument('-v', '--verbose', dest='verbose',
                        action='store_true',
                        help='Print additional information to stderr')

    # Select which entries of the replay file are run.
    parser.add_argument(
        '--start_index', type=int, metavar='start_index',
        help='Start with this entry of the replay file.'
        f' Default={conelist_defaults["start_index"]}')
    parser.add_argument(
        '--cone_limit', type=int, metavar='cone_limit',
        help='Maximum number of replay file entries to query.'
        f' Default={conelist_defaults["cone_limit"]}')

    return parser
def receiveSignal(signalNumber, frame):
    """
    Signal handler that logs a warning with the signal number and the time
    it was received. The program then continues running.

    ``frame`` is the current stack frame passed by the signal module; it is
    not used.
    """
    now = datetime.now()
    dtstr = now.strftime('%Y-%m-%d %H:%M:%S.%f')
    # logging.warning() does not accept print()-style file=/flush= keywords
    # (passing them raised TypeError inside the handler). Where the message
    # goes is decided by the logging configuration.
    logging.warning(f'Received signal {signalNumber} at {dtstr}')
def receiveSIGTERM(signalNumber, frame):
    """
    SIGTERM handler: log a warning with the signal number and the time it
    was received, then dump the Python traceback of all threads to stderr
    via faulthandler. The program then continues running.

    ``frame`` is the current stack frame passed by the signal module; it is
    not used.
    """
    now = datetime.now()
    dtstr = now.strftime('%Y-%m-%d %H:%M:%S.%f')
    # logging.warning() does not accept print()-style file=/flush= keywords
    # (passing them raised TypeError inside the handler).
    logging.warning(f'Received signal {signalNumber} at {dtstr}')
    faulthandler.dump_traceback()
def catch_signals():
    """
    Catch SIGHUP, SIGQUIT and SIGTERM to allow running in the background.

    When one of these signals is caught, a warning is logged (to stderr with
    the default logging setup). SIGTERM also causes a stack trace to be sent
    to stderr. SIGHUP and SIGQUIT handlers are only installed on non-Windows
    platforms.

    If a handler cannot be installed, a warning is logged instead of raising.
    This covers a signal missing on the platform, and a call made from a
    thread other than the main thread.
    """
    handlers = []
    if platform.system() != 'Windows':
        handlers.append(('SIGHUP', receiveSignal))
        handlers.append(('SIGQUIT', receiveSignal))
    # SIGTERM should be available on Windows. It gets the handler that also
    # dumps a stack trace.
    handlers.append(('SIGTERM', receiveSIGTERM))

    for name, handler in handlers:
        try:
            signal.signal(getattr(signal, name), handler)
        except (AttributeError, ValueError, OSError) as e:
            # AttributeError: the signal is not defined on this platform.
            # ValueError: signal.signal was called outside the main thread.
            logging.warning(f'Warning: unable to add signal.{name} handler: {repr(e)}')
def apply_query_defaults(parsed_args, defaults):
    """
    Fill in defaults for arguments that were left unset.

    For every key in ``defaults`` whose attribute on ``parsed_args`` is None,
    the attribute is set to the default value. Attributes that already hold
    a value are left alone.
    """
    for name, value in defaults.items():
        if getattr(parsed_args, name) is None:
            setattr(parsed_args, name, value)
def _enable_requests_logging():
import http.client as http_client
http_client.HTTPConnection.debuglevel = 1
requests_log = logging.getLogger("requests.packages.urllib3")
requests_log.setLevel(logging.DEBUG)
requests_log.propagate = True
def _init_logging(args):
level = logging.WARNING
if (args.verbose):
level = logging.DEBUG
_enable_requests_logging()
logging.basicConfig(format='%(levelname)s: (%(asctime)s) %(message)s',
level=level,
datefmt='%Y-%m-%d %H:%M:%S')