Commit 67d83ce0 authored by Maria Arsuaga Rios's avatar Maria Arsuaga Rios
Browse files

FTS-792:Drop snapshot API

parent 58a7d78d
Pipeline #74250 passed with stage
in 1 minute and 19 seconds
#!/usr/bin/env python
from fts3.cli import Snapshot
import logging
import sys
import traceback
if __name__ == "__main__":
try:
snapshot = Snapshot()
snapshot(sys.argv[1:])
except Exception, e:
logging.critical(str(e))
if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
traceback.print_exc()
sys.exit(1)
......@@ -23,7 +23,6 @@ from jobshower import JobShower
from jobsubmitter import JobSubmitter
from jobdeletionsubmitter import JobDeletionSubmitter
from serverstatus import ServerStatus
from snapshot import Snapshot
from whoami import WhoAmI
import logging
import sys
......
# Copyright notice:
# Copyright CERN, 2014.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
import simplejson as json
except:
import json
from base import Base
from fts3.rest.client import Inquirer
def _human_readable_snapshot(logger, snapshot):
for entry in snapshot:
logger.info("Source: %s" % entry.get('source_se'))
logger.info("Destination: %s" % entry.get('dest_se'))
logger.info("VO: %s" % entry.get('vo_name'))
logger.info("Max. Active: %d" % entry.get('max_active', 0))
logger.info("Active: %d" % entry.get('active', 0))
logger.info("Submitted: %d" % entry.get('submitted', 0))
logger.info("Finished: %d" % entry.get('finished', 0))
logger.info("Failed: %d" % entry.get('failed', 0))
ratio = entry.get('success_ratio', None)
if ratio:
logger.info("Success ratio: %.2f%%" % ratio)
else:
logger.info("Success ratio: -")
avg_thr = entry.get('avg_throughput', None)
if isinstance(avg_thr, float):
logger.info("Avg. Throughput: %.2f MB/s" % avg_thr)
elif isinstance(avg_thr, dict):
for interval, thr in sorted(avg_thr.iteritems(), key=lambda p: int(p[0])):
if thr is not None:
logger.info("Avg. Throughput (%2d): %.2f MB/s" % (int(interval), thr))
else:
logger.info("Avg. Throughput (%2d): -" % int(interval))
else:
logger.info("Avg. Throughput: -")
avg_queued = entry.get('avg_queued')
if avg_queued is not None:
logger.info("Avg. Queued: %d seconds" % avg_queued)
else:
logger.info("Avg. Queued: -")
frequent_error = entry.get('frequent_error', None)
if frequent_error and 'count' in frequent_error and 'reason' in frequent_error:
logger.info("Most frequent error: [%d] %s" % (frequent_error['count'], frequent_error['reason']))
else:
logger.info("Most frequent error: -")
limits = entry.get('limits', None)
if isinstance(limits, dict):
if limits.get('source', None):
logger.info("Max. Source Thr: %.2f" % limits['source'])
if limits.get('destination', None):
logger.info("Max. Dest. Thr: %.2f" % limits['destination'])
logger.info("\n")
class Snapshot(Base):
def __init__(self):
super(Snapshot, self).__init__(
description="""
This command can be used to retrieve the internal status FTS3 has on all pairs with ACTIVE transfers.
It allows to filter by VO, source SE and destination SE
""",
example="""
$ %(prog)s -s https://fts3-devel.cern.ch:8446
Source: gsiftp://whatever
Destination: gsiftp://whatnot
VO: dteam
Max. Active: 5
Active: 1
Submitted: 0
Finished: 0
Failed: 0
Success ratio: -
Avg. Throughput: -
Avg. Duration: -
Avg. Queued: 0 seconds
Most frequent error: -
"""
)
# Specific options
self.opt_parser.add_option('--vo', dest='vo',
help='filter by VO')
self.opt_parser.add_option('--source', dest='source',
help='filter by source SE')
self.opt_parser.add_option('--destination', dest='destination',
help='filter by destination SE')
def run(self):
context = self._create_context()
inquirer = Inquirer(context)
snapshot = inquirer.get_snapshot(self.options.vo, self.options.source, self.options.destination)
if self.options.json:
self.logger.info(json.dumps(snapshot, indent=2))
else:
_human_readable_snapshot(self.logger, snapshot)
......@@ -16,7 +16,6 @@
from ban import *
from delegate import *
from state import *
from snapshot import *
from submission import *
from whoami import *
from fts3.rest.client.context import Context
......
# Copyright notice:
# Copyright CERN, 2014.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from fts3.rest.client import Inquirer
def get_snapshot(context, vo=None, source=None, destination=None):
"""
Get a snapshot of the server
Args:
context: fts3.rest.client.context.Context instance
vo: Filter by vo. Can be left empty.
source: Filter by source SE. Can be left empty
destination: Filter by destination SE. Can be left empty.
Returns:
Decoded JSON message returned by the server (server snapshot)
"""
inquirer = Inquirer(context)
return inquirer.get_snapshot(vo, source, destination)
......@@ -64,19 +64,18 @@ class Inquirer(object):
def whoami(self):
return json.loads(self.context.get("/whoami"))
def get_snapshot(self, vo=None, source=None, destination=None):
if vo:
vo = urllib.quote(vo, '')
else:
vo = ''
if source:
source = urllib.quote(source, '')
else:
source = ''
if destination:
destination = urllib.quote(destination, '')
else:
destination = ''
return json.loads(
self.context.get("/snapshot?vo_name=%s&source_se=%s&dest_se=%s" % (vo, source, destination))
)
......@@ -223,10 +223,6 @@ def do_connect(config, map):
map.connect('/dm/rename', controller='datamanagement', action='rename',
conditions=dict(method=['POST']))
# Snapshot
map.connect('/snapshot', controller='snapshot', action='snapshot',
conditions=dict(method=['GET']))
# Banning
map.connect('/ban/se', controller='banning', action='ban_se',
conditions=dict(method=['POST']))
......
# Copyright notice:
# Copyright CERN, 2014.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
from datetime import datetime, timedelta
from pylons import request
from sqlalchemy import distinct
from fts3.model import Job, File, OptimizerActive, Optimize
from fts3rest.lib.api import doc
from fts3rest.lib.base import BaseController, Session
from fts3rest.lib.helpers import jsonify, misc
def _get_limits(source, destination):
source_thr = Session.query(Optimize.throughput)\
.filter(Optimize.source_se == source).filter(Optimize.throughput != None).all()
dest_thr = Session.query(Optimize.throughput)\
.filter(Optimize.dest_se == destination).filter(Optimize.throughput != None).all()
limits = dict()
if len(source_thr):
limits['source'] = source_thr[0][0]
if len(dest_thr):
limits['destination'] = dest_thr[0][0]
return limits
class SnapshotController(BaseController):
"""
Snapshot API
"""
@doc.query_arg('vo_name', 'Filter by VO name', required=False)
@doc.query_arg('source_se', 'Filter by source SE', required=False)
@doc.query_arg('dest_se', 'Filter by destination SE', required=False)
@jsonify
def snapshot(self):
"""
Get the current status of the server
"""
filter_source = request.params.get('source_se', None)
filter_dest = request.params.get('dest_se', None)
filter_vo = request.params.get('vo_name', None)
if filter_vo:
vos = [filter_vo]
else:
vos = map(lambda r: r[0], Session.query(distinct(File.vo_name)))
snapshot_list = list()
not_before = datetime.utcnow() - timedelta(hours=1)
for vo in vos:
pairs = Session.query(File.source_se, File.dest_se)\
.filter(File.vo_name == vo)\
.distinct()
if filter_source:
pairs = pairs.filter(File.source_se == filter_source)
if filter_dest:
pairs = pairs.filter(File.dest_se == filter_dest)
for source, destination in pairs:
pair_info = dict(vo_name=vo, source_se=source, dest_se=destination)
# Maximum allowed number of active
max_active = Session.query(OptimizerActive.active)\
.filter(OptimizerActive.source_se == source)\
.filter(OptimizerActive.dest_se == destination).all()
max_active = max_active[0] if len(max_active) else None
if max_active:
pair_info['max_active'] = max_active[0]
else:
pair_info['max_active'] = None
# Files for this pair+vo
files = Session.query(
File.file_state, File.tx_duration, File.throughput,
File.reason, Job.submit_time, File.start_time, File.finish_time, File.file_id
)\
.filter(File.job_id == Job.job_id)\
.filter(File.source_se == source)\
.filter(File.dest_se == destination)\
.filter(File.vo_name == vo)\
.filter((File.finish_time >= not_before) | (File.finish_time == None))\
.all()
# Current number of active
n_active = len(filter(lambda f: f[0] == 'ACTIVE', files))
pair_info['active'] = n_active
# Filter finished and failed
failed = filter(lambda f: f[0] == 'FAILED', files)
finished = filter(lambda f: f[0] == 'FINISHED', files)
# Number of queued
n_queued = sum(map(lambda f: 1 if f[0] == 'SUBMITTED' else 0, files))
pair_info['submitted'] = n_queued
# Average queue time
queued_times = map(
# start_time - submit_time
lambda f: (f[5] - f[4]),
filter(lambda f: f[6] is None and f[5] is not None, files)
)
avg_queued = misc.average(queued_times, timedelta(), misc.timedelta_to_seconds)
pair_info['avg_queued'] = avg_queued
# Success rate
n_failed = len(failed)
n_finished = len(finished)
n_total = float(n_failed + n_finished)
if n_total:
pair_info['success_ratio'] = n_finished / n_total
else:
pair_info['success_ratio'] = None
pair_info['finished'] = n_finished
pair_info['failed'] = n_failed
# Average throughput for last 60, 30, 15 and 5 minutes
avg_thr = dict()
now = datetime.utcnow()
for minutes in 60, 30, 15, 5:
tail = filter(
lambda f: f[2] and f[6] is None or f[6] >= now - timedelta(minutes=minutes),
finished
)
avg_thr[str(minutes)] = misc.average(map(lambda f: f[2], tail))
pair_info['avg_throughput'] = avg_thr
# Most frequent error
reasons = map(lambda f: f[3], failed)
reasons_count = [(reason, len(list(grouped))) for reason, grouped in itertools.groupby(reasons)]
reasons_count = sorted(reasons_count, key=lambda p: p[1], reverse=True)
if len(reasons_count) > 0:
pair_info['frequent_error'] = dict(
count=reasons_count[0][1],
reason=reasons_count[0][0]
)
else:
pair_info['frequent_error'] = None
# Limits
pair_info['limits'] = _get_limits(source, destination)
snapshot_list.append(pair_info)
return snapshot_list
......@@ -108,15 +108,6 @@ class TestOptions(TestController):
response = self.app.options('/optimizer/evolution', status=200)
self.assertItemsEqual(['GET', 'OPTIONS'], response.allow)
def test_options_snapshot(self):
"""
Test OPTIONS on snapshot urls
"""
self.setup_gridsite_environment()
response = self.app.options('/snapshot', status=200)
self.assertItemsEqual(['GET', 'OPTIONS'], response.allow)
def test_options_404(self):
"""
Test OPTIONS on a non-existing url
......
# Copyright notice:
# Copyright CERN, 2014.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import urllib
from insert_job import insert_job
from fts3rest.tests import TestController
def _group_by_triplet(snapshot):
new_snapshot = dict()
for triplet_info in snapshot:
triplet = (
str(triplet_info['source_se']),
str(triplet_info['dest_se']),
str(triplet_info['vo_name'])
)
new_snapshot[triplet] = triplet_info
return new_snapshot
class TestSnapshot(TestController):
"""
Test the snapshot API
"""
def setUp(self):
"""
Insert some registers into the tables
"""
TestController.setUp(self)
# Insert some values into the DB
insert_job('dteam', 'srm://source.se', 'srm://dest.es', 'ACTIVE')
insert_job('dteam', 'srm://source.se', 'srm://dest.es', 'SUBMITTED')
insert_job('dteam', 'srm://source.se', 'srm://dest.es', 'SUBMITTED')
insert_job(
'dteam', 'srm://source.se', 'srm://dest.es', 'FINISHED', duration=55, queued=10, thr=10
)
insert_job(
'atlas', 'srm://source.se', 'srm://dest.es', 'FINISHED', duration=100, queued=20, thr=100
)
insert_job(
'atlas', 'srm://source.se', 'srm://dest.es', 'FAILED', duration=150, queued=30, thr=200,
reason='DESTINATION Something something'
)
insert_job(
'atlas', 'gsiftp://source.se', 'gsiftp://dest.es', 'FAILED', duration=5000, queued=0,
reason='SOURCE Blah'
)
insert_job(
'atlas', 'srm://source.se', 'gsiftp://dest.es', 'FINISHED', duration=100, queued=20, thr=50
)
def test_query_all(self):
"""
Get snapshot information for all pairs and vos
"""
self.setup_gridsite_environment()
snapshot_raw = self.app.get(url="/snapshot", status=200).json
snapshot = _group_by_triplet(snapshot_raw)
self.assertEqual(4, len(snapshot_raw))
self.assertEqual(1, snapshot[('srm://source.se', 'srm://dest.es', 'dteam')]['active'])
self.assertEqual(2, snapshot[('srm://source.se', 'srm://dest.es', 'dteam')]['submitted'])
self.assertEqual(1, snapshot[('srm://source.se', 'srm://dest.es', 'dteam')]['finished'])
self.assertEqual(0, snapshot[('srm://source.se', 'srm://dest.es', 'dteam')]['failed'])
self.assertEqual(None, snapshot[('srm://source.se', 'srm://dest.es', 'dteam')]['frequent_error'])
self.assertEqual(10, snapshot[('srm://source.se', 'srm://dest.es', 'dteam')]['avg_queued'])
self.assertEqual(10, snapshot[('srm://source.se', 'srm://dest.es', 'dteam')]['avg_throughput']['60'])
self.assertEqual(1.0, snapshot[('srm://source.se', 'srm://dest.es', 'dteam')]['success_ratio'])
self.assertEqual(0, snapshot[('srm://source.se', 'srm://dest.es', 'atlas')]['active'])
self.assertEqual(0, snapshot[('srm://source.se', 'srm://dest.es', 'atlas')]['submitted'])
self.assertEqual(1, snapshot[('srm://source.se', 'srm://dest.es', 'atlas')]['finished'])
self.assertEqual(1, snapshot[('srm://source.se', 'srm://dest.es', 'atlas')]['failed'])
self.assertEqual(1, snapshot[('srm://source.se', 'srm://dest.es', 'atlas')]['frequent_error']['count'])
self.assertEqual(
'DESTINATION Something something',
snapshot[('srm://source.se', 'srm://dest.es', 'atlas')]['frequent_error']['reason']
)
# Note that only FINISHED must be count
self.assertEqual(25, snapshot[('srm://source.se', 'srm://dest.es', 'atlas')]['avg_queued'])
self.assertEqual(100, snapshot[('srm://source.se', 'srm://dest.es', 'atlas')]['avg_throughput']['30'])
self.assertEqual(0.5, snapshot[('srm://source.se', 'srm://dest.es', 'atlas')]['success_ratio'])
self.assertEqual(1, snapshot[('gsiftp://source.se', 'gsiftp://dest.es', 'atlas')]['frequent_error']['count'])
self.assertEqual(
'SOURCE Blah', snapshot[('gsiftp://source.se', 'gsiftp://dest.es', 'atlas')]['frequent_error']['reason']
)
def test_query_vo(self):
"""
Get snapshot for one specific VO
"""
self.setup_gridsite_environment()
snapshot_raw = self.app.get(url="/snapshot?vo_name=dteam", status=200).json
snapshot = _group_by_triplet(snapshot_raw)
self.assertEqual(1, len(snapshot_raw))
self.assertEqual(('srm://source.se', 'srm://dest.es', 'dteam'), snapshot.keys()[0])
self.assertEqual(1, snapshot[('srm://source.se', 'srm://dest.es', 'dteam')]['active'])
self.assertEqual(2, snapshot[('srm://source.se', 'srm://dest.es', 'dteam')]['submitted'])
self.assertEqual(1, snapshot[('srm://source.se', 'srm://dest.es', 'dteam')]['finished'])
self.assertEqual(None, snapshot[('srm://source.se', 'srm://dest.es', 'dteam')]['frequent_error'])
self.assertEqual(10, snapshot[('srm://source.se', 'srm://dest.es', 'dteam')]['avg_queued'])
self.assertEqual(10, snapshot[('srm://source.se', 'srm://dest.es', 'dteam')]['avg_throughput']['5'])
self.assertEqual(1.0, snapshot[('srm://source.se', 'srm://dest.es', 'dteam')]['success_ratio'])
def test_query_source(self):
"""
Snapshot filtering by source only
"""
self.setup_gridsite_environment()
snapshot_raw = self.app.get(
url="/snapshot?source_se=%s" % urllib.quote("srm://source.se", ""),
status=200
).json
self.assertEqual(3, len(snapshot_raw))
def test_query_destination(self):
"""
Snapshot filtering by destination only
"""
self.setup_gridsite_environment()
snapshot_raw = self.app.get(
url="/snapshot?dest_se=%s" % urllib.quote("srm://dest.es", ""),
status=200
).json
self.assertEqual(2, len(snapshot_raw))
def test_query_pair(self):
"""
Snapshot filtering by pair
"""
self.setup_gridsite_environment()
snapshot_raw = self.app.get(
url="/snapshot?source_se=%s&dest_se=%s" % (
urllib.quote("srm://source.se", ""), urllib.quote("srm://dest.es", "")
),
status=200
).json
self.assertEqual(2, len(snapshot_raw))
def test_query_triplet(self):
"""
Snapshot filtering by source, destination and vo
"""
self.setup_gridsite_environment()
snapshot_raw = self.app.get(
url="/snapshot?source_se=%s&dest_se=%s&vo_name=%s" % (
urllib.quote("srm://source.se", ""), urllib.quote("srm://dest.es", ""), "atlas"
),
status=200
).json
snapshot = _group_by_triplet(snapshot_raw)
self.assertEqual(1, len(snapshot_raw))
self.assertEqual(('srm://source.se', 'srm://dest.es', 'atlas'), snapshot.keys()[0])
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment