Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
schedule.py 5.07 KiB
#!/usr/bin/python

import nomad
from nomad.api import exceptions
import yaml
import json
import sys
from datetime import datetime
import time
import os
import configparser
import itertools

CONFIG = '/root/repos.yaml'
REPOPATH = '/etc/yum.repos.d/'
JOBNAME = 'reposync'
STATUS_CHECK_SECS = 5
REPOID = os.getenv('NOMAD_META_REPOID', None)

def log(**kwargs):
    """Write the given keyword arguments to stdout as a single JSON object."""
    record = json.dumps(kwargs)
    print(record)

def debug(**kwargs):
    """Log the given fields with ``message_type`` set to ``'debug'``.

    A ``message_type`` supplied by the caller is overwritten.
    """
    kwargs['message_type'] = 'debug'
    log(**kwargs)

def error(**kwargs):
    """Log the given fields with ``message_type`` set to ``'error'``.

    A ``message_type`` supplied by the caller is overwritten.
    """
    kwargs['message_type'] = 'error'
    log(**kwargs)

# A Nomad ACL token is mandatory: without one, report the problem and abort.
nomad_token = os.environ.get('NOMAD_TOKEN')
if nomad_token is None:
    error(comment='Error obtaining Nomad token!')
    sys.exit(1)

# NOMAD_ADDR is only echoed for troubleshooting; it is not passed to the
# client explicitly (presumably python-nomad reads it from the environment
# itself -- confirm).
debug(comment='Connecting to Nomad at: {}'.format(os.environ.get('NOMAD_ADDR')))

# NOTE(review): verify=False presumably turns off TLS certificate
# verification -- confirm this is intended.
n = nomad.Nomad(
    secure=True,
    token=nomad_token,
    timeout=5,
    verify=False,
)
# Debugging aid, left disabled:
#print('nomad self token: {}'.format(n.acl.get_self_token()))

# Load the optional per-repo-file overrides ('pathroot' / 'pathcut') that are
# looked up by repo file name when the jobs are dispatched.
#
# yaml.safe_load is used instead of a bare yaml.load(): calling yaml.load()
# without an explicit Loader is deprecated (and rejected by recent PyYAML
# releases), and the safe loader only builds plain Python data types, which
# is all a configuration file needs.
try:
    with open(CONFIG) as f:
        repoconf = yaml.safe_load(f)
except IOError as e:
    error(comment='Unable to read configuration file {}'.format(CONFIG), exception=str(e))
    sys.exit(1)
except yaml.YAMLError as e:
    # Report malformed YAML in the same structured way as a read failure
    # instead of dying with a raw traceback.
    error(comment='Unable to parse configuration file {}'.format(CONFIG), exception=str(e))
    sys.exit(1)

# Don't fail even with an empty config file
if not repoconf:
    repoconf = {}

# Dispatch one Nomad job per repository section found in the repo files under
# REPOPATH. `jobs` maps every dispatched job ID to its last known status.
jobs = {}
for repofile in os.listdir(REPOPATH):
    debug(comment='Syncing {}'.format(repofile))

    repofile_path = '{}/{}'.format(REPOPATH, repofile)
    try:
        # NOTE(review): ConfigParser's default interpolation treats '%'
        # specially, so values containing a literal '%' may fail when read
        # back; consider interpolation=None -- confirm against real repo files.
        config = configparser.ConfigParser()
        with open(repofile_path, 'r') as f:
            config.read_file(f)
    except (IOError, configparser.Error) as e:
        # A file that cannot be read *or parsed* is skipped, so a single
        # malformed file does not abort the whole run.
        error(comment='error while reading repo: {}'.format(repofile_path), exception=str(e))
        continue

    # Per-file overrides from the YAML config. `or {}` also covers a file name
    # that is listed in the config with an empty (null) value.
    overrides = repoconf.get(repofile) or {}
    PATHROOT = overrides.get('pathroot', '')
    PATHCUT = overrides.get('pathcut', '//')

    for rid in config.sections():
        # If we've been called as a parametrized job, only launch the job we've been asked for
        if REPOID and rid != REPOID:
            continue

        # The target path is derived from baseurl; a section without one
        # (e.g. mirrorlist-only) cannot be mapped to a path, so skip it
        # rather than crash after some jobs have already been dispatched.
        try:
            url = config[rid]['baseurl']
        except KeyError:
            error(comment='Repo section has no baseurl, skipping', repoid=rid, repofile=repofile)
            continue

        # Keep whatever follows the last PATHCUT occurrence in the URL and
        # optionally prefix it with PATHROOT.
        path = url.split(PATHCUT)[-1].lstrip('/')
        if PATHROOT:
            path = '{}/{}'.format(PATHROOT.strip('/'), path)

        # Recreate the repo file
        yumfile = '\n'.join('='.join(x) for x in config.items(rid))

        payload = {
            'REPOID': rid,
            'REPOPATH': path,
            'REPOFILE': yumfile,
        }

        try:
            job = n.job.dispatch_job(JOBNAME, meta={"PARENT_JOB": os.getenv('NOMAD_JOB_NAME', None)}, payload=payload)
        except (exceptions.URLNotFoundNomadException,
                exceptions.BaseNomadException) as e:
            # The wrapped response may lack a `.text` attribute; fall back to
            # the exception text so the handler cannot mask the real error.
            resp = getattr(e, 'nomad_resp', None)
            error(comment='Error dispatching job', exception=getattr(resp, 'text', str(e)))
            raise

        log(comment='dispatched job', repoid=rid, job=job, payload=payload)
        jobs[job['DispatchedJobID']] = 'created'

# Poll every dispatched job until it is 'dead', then collect the
# /alloc/output.json file of each of its allocations into `results`
# (job ID -> list of outputs).
log(comment='Waiting for jobs to finish')
results = {}
while jobs:
    # Iterate over a snapshot of the job IDs: finished jobs are deleted from
    # `jobs` inside the loop, and changing a dict's size while iterating over
    # it raises RuntimeError on Python 3.
    for j in list(jobs):
        state = n.job.get_job(j)
        log(comment='Job state', job=j, state=state['Status'])
        if state['Status'] != 'dead':
            jobs[j] = state['Status']
            continue

        output = []
        for a in n.job.get_allocations(j):
            try:
                o = n.client.cat.read_file(a['ID'], '/alloc/output.json')
            except nomad.api.exceptions.URLNotFoundNomadException as e:
                error(comment='File not found for allocation {}'.format(a['ID']), exception=str(e))
                continue
            try:
                o = json.loads(o)
            except (TypeError, ValueError):
                # Not valid JSON: keep the raw content as returned.
                pass
            output.append(o)
        results[j] = output

        del jobs[j]

    # Only wait when there is still something left to poll.
    if jobs:
        time.sleep(STATUS_CHECK_SECS)

# Log one 'Job result' record per finished job, after trimming each output.
log(comment='All done!')
# NOTE(review): `final_results` is never used in the visible code; kept in
# case something outside this view relies on the name.
final_results = []
for j, res in results.items():
    for r in res:
        # An output that could not be decoded as JSON is kept as raw text and
        # has no fields to trim or derive.
        if not isinstance(r, dict):
            continue
        # Filter some stuff that may be too large
        r.pop('changes', None)
        try:
            r['count_diff'] = r['post_count'] - r['pre_count']
        except (KeyError, TypeError):
            # Counts missing or not numeric: leave the result without a diff.
            pass

    if not res:
        # No allocation produced a readable output file for this job.
        log(comment='Job result', job=j, result=None)
    elif len(res) == 1:
        log(comment='Job result', job=j, result=res[0])
    else:
        log(comment='Job result', job=j, result=res[0], all_results=res)


# Aggregate the first output of every job into a final 'Job summary' record.
exit_codes = []
pre_counts = []
post_counts = []
for outputs in results.values():
    first = outputs[0] if outputs else None
    if not isinstance(first, dict):
        # No output at all, or output that was not valid JSON: still count
        # the job, with an unknown exit code.
        exit_codes.append(None)
        continue

    exit_codes.append(first.get('exit_code', None))
    # Only add the counts when both are present, so the two totals always
    # cover the same set of jobs.
    if 'pre_count' in first and 'post_count' in first:
        pre_counts.append(first['pre_count'])
        post_counts.append(first['post_count'])

total_pre_count = sum(pre_counts)
total_post_count = sum(post_counts)
# Number of jobs per distinct exit code.
total_results_by_exit = {code: exit_codes.count(code) for code in set(exit_codes)}

log(comment='Job summary',
    total_results              = len(exit_codes),
    total_results_by_exit_code = total_results_by_exit,
    total_pre_count            = total_pre_count,
    total_post_count           = total_post_count,
    total_count_diff           = total_post_count-total_pre_count,
    )