schedule.py
#!/usr/bin/python3
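"""Dispatch one parameterized 'reposync' Nomad job per yum repository defined under
/etc/yum.repos.d/, wait for all dispatched jobs to finish, and log their results
as JSON, followed by an aggregate summary."""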
import nomad
from nomad.api import exceptions
import yaml
import json
import sys
from datetime import datetime
import time
import os
import configparser
import itertools
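# Per-repo-file overrides (pathroot/pathcut) come from CONFIG; REPOID is set by Nomad
# when this script itself runs as a parameterized job for a single repository.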
CONFIG = '/root/repos.yaml'
REPOPATH = '/etc/yum.repos.d/'
JOBNAME = 'reposync'
STATUS_CHECK_SECS = 5
REPOID = os.getenv('NOMAD_META_REPOID', None)
def log(**kwargs):
print(json.dumps(kwargs))
def debug(**kwargs):
kwargs.update({'message_type': 'debug'})
log(**kwargs)
def error(**kwargs):
kwargs.update({'message_type': 'error'})
log(**kwargs)
try:
nomad_token = os.environ['NOMAD_TOKEN']
except KeyError:
error(comment='Error obtaining Nomad token!')
sys.exit(1)
debug(comment='Connecting to Nomad at: {}'.format(os.getenv('NOMAD_ADDR', None)))
n = nomad.Nomad(secure=True, token=nomad_token, timeout=5, verify=False)
#print('nomad self token: {}'.format(n.acl.get_self_token()))
try:
with open(CONFIG) as f:
        repoconf = yaml.safe_load(f)
except (IOError, yaml.YAMLError) as e:
    error(comment='Unable to read or parse configuration file {}'.format(CONFIG), exception=str(e))
sys.exit(1)
# Don't fail even with an empty config file
if not repoconf:
repoconf = {}
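# Map of dispatched job ID -> last known status, used to track completion below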
jobs = {}
for repofile in os.listdir(REPOPATH):
debug(comment='Syncing {}'.format(repofile))
try:
config = configparser.ConfigParser()
with open('{}/{}'.format(REPOPATH, repofile), 'r') as f:
config.read_file(f)
except IOError as e:
error(comment='error while reading repo: {}/{}'.format(REPOPATH, repofile), exception=str(e))
continue
for rid in config.sections():
        # If we've been called as a parameterized job, only launch the job we've been asked for
if REPOID and rid != REPOID:
continue
try:
PATHROOT = repoconf[repofile]['pathroot']
except KeyError:
PATHROOT = ''
try:
PATHCUT = repoconf[repofile]['pathcut']
except KeyError:
PATHCUT = '//'
url = config[rid]['baseurl']
path = url.split(PATHCUT)[-1].lstrip('/')
if PATHROOT:
path = '{}/{}'.format(PATHROOT.strip('/'), path)
# Recreate the repo file
yumfile = '\n'.join(['='.join(x) for x in config.items(rid)])
#print(' Dispatching job for {}'.format(rid))
payload = {
'REPOID': rid,
'REPOPATH': path,
'REPOFILE': yumfile,
}
#print(' payload: {}'.format(json.dumps(payload)))
try:
job = n.job.dispatch_job(JOBNAME, meta={"PARENT_JOB": os.getenv('NOMAD_JOB_NAME', None)}, payload=payload)
log(comment='dispatched job', repoid=rid, job=job, payload=payload)
jobs[job['DispatchedJobID']] = 'created'
except (exceptions.URLNotFoundNomadException,
exceptions.BaseNomadException) as e:
error(comment='Error dispatching job', exception=e.nomad_resp.text)
raise e
log(comment='Waiting for jobs to finish')
results = {}
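# Poll Nomad every STATUS_CHECK_SECS seconds; when a job goes 'dead', collect
# /alloc/output.json from each of its allocations and stop tracking it.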
while jobs:
    # Iterate over a copy of the keys so finished jobs can be removed while iterating
    for j in list(jobs.keys()):
state = n.job.get_job(j)
log(comment='Job state', job=j, state=state['Status'])
if state['Status'] == 'dead':
allocations = n.job.get_allocations(j)
output = []
for a in allocations:
try:
o = n.client.cat.read_file(a['ID'], '/alloc/output.json')
                    try:
                        o = json.loads(o)
                    except ValueError:
                        # Allocation output was not valid JSON; keep the raw contents
                        pass
output.append(o)
except nomad.api.exceptions.URLNotFoundNomadException as e:
error(comment='File not found for allocation {}'.format(a['ID']), exception=str(e))
results[j] = output
del jobs[j]
else:
jobs[j] = state['Status']
time.sleep(STATUS_CHECK_SECS)
log(comment='All done!')
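# Log each job's result, dropping fields that may be too large to log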
for j in results.keys():
# Filter some stuff that may be too large
res = results[j]
    for r in res:
        # Skip outputs that could not be parsed into JSON objects
        if not isinstance(r, dict):
            continue
        try:
            del r['changes']
        except KeyError:
            pass
        try:
            r['count_diff'] = r['post_count'] - r['pre_count']
        except KeyError:
            pass
    if not res:
        # No output could be read from any allocation of this job
        log(comment='Job result', job=j, result=None)
    elif len(res) == 1:
        log(comment='Job result', job=j, result=res[0])
    else:
        log(comment='Job result', job=j, result=res[0], all_results=res)
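# Aggregate exit codes and pre/post counts across all jobs for the final summary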
exit_codes = []
total_pre_count = []
total_post_count = []
for r in results.values():
    # Use the first allocation's output; skip jobs without a usable JSON result
    if not r or not isinstance(r[0], dict):
        continue
    r = r[0]
    exit_codes.append(r.get('exit_code', None))
try:
total_pre_count.append(r['pre_count'])
total_post_count.append(r['post_count'])
except KeyError:
pass
total_pre_count = sum(total_pre_count)
total_post_count = sum(total_post_count)
total_results_by_exit = {x: exit_codes.count(x) for x in set(exit_codes)}
log(comment='Job summary',
    total_results=len(exit_codes),
    total_results_by_exit_code=total_results_by_exit,
    total_pre_count=total_pre_count,
    total_post_count=total_post_count,
    total_count_diff=total_post_count - total_pre_count,
    )