Commit 60f6e45b authored by hamza, committed by Alejandro Alvarez Ayllon

FTS-316: New replica selection at submission time

parent 196fb9b2
......@@ -212,6 +212,10 @@ cp --preserve=timestamps -r src/fts3 %{buildroot}/%{python_sitelib}
%{python_sitelib}/fts3rest/lib/middleware/fts3auth/*.py*
%{python_sitelib}/fts3rest/lib/middleware/fts3auth/methods/__init__.py*
%{python_sitelib}/fts3rest/lib/middleware/fts3auth/methods/ssl.py*
%{python_sitelib}/fts3rest/lib/scheduler/schd.py*
%{python_sitelib}/fts3rest/lib/scheduler/db.py*
%{python_sitelib}/fts3rest/lib/scheduler/Cache.py*
%{python_sitelib}/fts3rest/lib/scheduler/__init__.py*
%{python_sitelib}/fts3rest/model/
......
......@@ -17,6 +17,8 @@ import random
import socket
import types
import uuid
import logging
from datetime import datetime
from sqlalchemy import func
from urlparse import urlparse
......@@ -25,6 +27,11 @@ from fts3.model import File, BannedSE
from fts3rest.lib.base import Session
from fts3rest.lib.http_exceptions import *
from fts3rest.lib.scheduler.schd import Scheduler
from fts3rest.lib.scheduler.db import Database
from fts3rest.lib.scheduler.Cache import ThreadLocalCache
log = logging.getLogger(__name__)
DEFAULT_PARAMS = {
'bring_online': -1,
......@@ -46,7 +53,8 @@ DEFAULT_PARAMS = {
def get_storage_element(uri):
"""
Returns the storage element of the given uri, which is the scheme + hostname without the port
Returns the storage element of the given uri, which is the scheme +
hostname without the port
Args:
uri: An urlparse instance
......@@ -75,7 +83,8 @@ def _safe_flag(flag):
1/0 => True/False
'Y'/'N' => True/False
"""
if isinstance(flag, types.StringType) or isinstance(flag, types.UnicodeType):
if isinstance(flag, types.StringType) or isinstance(flag,
types.UnicodeType):
return len(flag) > 0 and flag[0].upper() == 'Y'
else:
return bool(flag)
......@@ -94,7 +103,8 @@ def _generate_hashed_id():
"""
Generates a uniformly distributed value between 0 and 2**16
This is intended to split the load evenly across nodes
The name is an unfortunate legacy from when this used to be based on a hash of the job
The name is an unfortunate legacy from when this used to
be based on a hash of the job
"""
return random.randint(0, (2 ** 16) - 1)
......@@ -102,8 +112,8 @@ def _generate_hashed_id():
def _has_multiple_options(files):
"""
Returns a tuple (Boolean, Integer)
Boolean is True if there are multiple replica entries, and Integer holds the number
of unique files.
Boolean is True if there are multiple replica entries, and Integer
holds the number of unique files.
"""
ids = map(lambda f: f['file_index'], files)
id_count = len(ids)
......@@ -111,27 +121,72 @@ def _has_multiple_options(files):
return unique_id_count != id_count, unique_id_count
def _select_best_replica(files, vo_name, entry_state='SUBMITTED'):
"""
Given a list of files (that must be multiple replicas of the same file),
mark the best one as submitted
"""
def _select_best_replica(files, vo_name, entry_state, strategy):
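"""
Given a list of files that must be replicas of the same file, rank the
source storage elements with the requested strategy and mark the best
replica with entry_state
"""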
dst = files[0]['dest_se']
activity = files[0]['activity']
user_filesize = files[0]['user_filesize']
queue_provider = Database(Session)
cache_provider = ThreadLocalCache(queue_provider)
# Alternative: query the database directly with s = Scheduler(queue_provider)
s = Scheduler(cache_provider)
source_se_list = map(lambda f: f['source_se'], files)
queue_sizes = Session.query(File.source_se, func.count(File.source_se))\
.filter(File.vo_name == vo_name)\
.filter(File.file_state == 'SUBMITTED')\
.filter(File.dest_se == files[0]['dest_se'])\
.filter(File.source_se.in_(source_se_list))\
.group_by(File.source_se)
best_ses = map(lambda elem: elem[0], sorted(queue_sizes, key=lambda elem: elem[1]))
if strategy == "orderly":
best_ses = source_se_list
elif strategy == "queue" or strategy == "auto":
best_ses = map(lambda x: x[0], s.rank_submitted(source_se_list,
dst,
vo_name))
elif strategy == "success":
best_ses = map(lambda x: x[0], s.rank_success_rate(source_se_list,
dst))
elif strategy == "throughput":
best_ses = map(lambda x: x[0], s.rank_throughput(source_se_list,
dst))
elif strategy == "file-throughput":
best_ses = map(lambda x: x[0], s.rank_per_file_throughput(
source_se_list,
dst))
elif strategy == "pending-data":
best_ses = map(lambda x: x[0], s.rank_pending_data(source_se_list,
dst,
vo_name,
activity))
elif strategy == "waiting-time":
best_ses = map(lambda x: x[0], s.rank_waiting_time(source_se_list,
dst,
vo_name,
activity))
elif strategy == "waiting-time-with-error":
best_ses = map(lambda x: x[0], s.rank_waiting_time_with_error(
source_se_list,
dst,
vo_name,
activity))
elif strategy == "duration":
best_ses = map(lambda x: x[0], s.rank_finish_time(source_se_list,
dst,
vo_name,
activity,
user_filesize))
else:
log.info("Strategy '%s' is not supported by the scheduler; falling back to 'auto'" % strategy)
best_ses = map(lambda x: x[0], s.rank_submitted(source_se_list,
dst,
vo_name))
best_index = 0
for index, transfer in enumerate(files):
# If not in the result set, the queue is empty, so finish here
if transfer['source_se'] not in best_ses:
best_index = index
break
# So far this looks good, but keep looking in case some other source has nothing queued at all
if transfer['source_se'] == best_ses[0]:
best_index = index
files[best_index]['file_state'] = entry_state
......@@ -293,10 +348,9 @@ class JobBuilder(object):
"""
On multiple-replica jobs, select the appropriate file to go active
"""
if self.files[0].get('selection_strategy', 'auto') == 'auto':
_select_best_replica(self.files, self.user.vos[0])
else:
self.files[0]['file_state'] = 'STAGING' if self.is_bringonline else 'SUBMITTED'
entry_state = "STAGING" if self.is_bringonline else "SUBMITTED"
_select_best_replica(self.files, self.user.vos[0], entry_state,
self.files[0].get('selection_strategy', 'auto'))
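# Example (hypothetical submission snippet): a files entry carrying
#   "selection_strategy": "waiting-time"
# selects the waiting-time ranking; when the key is absent, the 'auto'
# (queue-based) ranking is used.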
def _populate_transfers(self, files_list):
"""
......@@ -510,4 +564,3 @@ class JobBuilder(object):
raise HTTPBadRequest('Malformed request: %s' % str(e))
except KeyError, e:
raise HTTPBadRequest('Missing parameter: %s' % str(e))
import threading
import hashlib
import logging
from datetime import datetime
log = logging.getLogger(__name__)
threadLocal = threading.local()
class ThreadLocalCache:
"""
ThreadLocalCache provides an in-memory cache for each thread
"""
initialized = False
# Run cache_cleanup at most once every 30 mins (1800 secs)
cache_refresh_time = 1800
# Expire cache entry after 5 mins (300 secs)
cache_entry_life = 300
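# Lifecycle note (see cache_wrapper/cache_cleanup below): an entry older
# than cache_entry_life is refreshed lazily on access, and a full sweep
# of expired entries runs at most once per cache_refresh_time per thread.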
def __init__(self, queue_provider):
self.queue_provider = queue_provider
if getattr(threadLocal, 'initialized', None) is None:
ThreadLocalCache.init_cache()
threadLocal.initialized = True
@staticmethod
def init_cache():
"""
Maintain separate caches for submitted, success, throughput, etc.
"""
threadLocal.creation_time = datetime.utcnow()
threadLocal.submitted_dict = {}
threadLocal.success_dict = {}
threadLocal.throughput_dict = {}
threadLocal.per_file_throughput_dict = {}
threadLocal.pending_data_dict = {}
@staticmethod
def get_seconds_elapsed(time_diff):
seconds = (time_diff.days * 86400) + time_diff.seconds
return seconds
@staticmethod
def check_expiry(t, entry_life):
secs = ThreadLocalCache.get_seconds_elapsed(datetime.utcnow() - t)
return secs > entry_life
@staticmethod
def get_key(*args):
key = 0
for x in args:
key += hash(x)
return key
@staticmethod
def cache_cleanup():
threadLocal.creation_time = datetime.utcnow()
# Get dictionaries from threadLocal
dict_list = []
for attr in dir(threadLocal):
if attr.endswith("_dict"):
dict_list.append(getattr(threadLocal, attr, None))
# Remove expired entries from cache
for _dict in dict_list:
for key in _dict.keys():
if ThreadLocalCache.check_expiry(_dict[key][1],
ThreadLocalCache.cache_entry_life):
del _dict[key]
@staticmethod
def cache_wrapper(dict_name, func, *args):
"""
cache_wrapper returns a value from the cache; if the entry is missing
or expired, the FTS database is queried and the cache is updated.
Expired entries are eventually removed from the cache once
cache_refresh_time has elapsed
"""
val = []
thread_dict = getattr(threadLocal, dict_name, None)
key = ThreadLocalCache.get_key(*args)
if ThreadLocalCache.check_expiry(threadLocal.creation_time,
ThreadLocalCache.cache_refresh_time):
ThreadLocalCache.cache_cleanup()
if key not in thread_dict:
val.append(func(*args))
val.append(datetime.utcnow())
thread_dict[key] = val
else:
val = thread_dict[key]
if ThreadLocalCache.check_expiry(val[1],
ThreadLocalCache.cache_entry_life):
val = []
val.append(func(*args))
val.append(datetime.utcnow())
thread_dict[key] = val
return val[0]
def get_submitted(self, src, dst, vo):
return ThreadLocalCache.cache_wrapper('submitted_dict',
self.queue_provider.get_submitted,
src, dst, vo)
def get_success_rate(self, src, dst):
return ThreadLocalCache.cache_wrapper('success_dict',
self.queue_provider.get_success_rate,
src, dst)
def get_throughput(self, src, dst):
return ThreadLocalCache.cache_wrapper('throughput_dict',
self.queue_provider.get_throughput,
src, dst)
def get_per_file_throughput(self, src, dst):
return ThreadLocalCache.cache_wrapper('per_file_throughput_dict',
self.queue_provider.get_per_file_throughput,
src, dst)
def get_pending_data(self, src, dst, vo, user_activity):
return ThreadLocalCache.cache_wrapper('pending_data_dict',
self.queue_provider.get_pending_data,
src, dst, vo, user_activity)
import urllib
import json
import operator
import logging
from fts3.model import File
from fts3.model import OptimizerEvolution
from fts3.model import ActivityShare
from sqlalchemy import func
from datetime import datetime
from datetime import timedelta
log = logging.getLogger(__name__)
class Database:
"""
The Database class queries information from the FTS3 database using SQLAlchemy
"""
def __init__(self, session):
self.session = session
def get_submitted(self, src, dst, vo):
"""
Returns the number of submitted files for a given src, dst and vo.
"""
queue = self.session.query(func.count(File.source_se))\
.filter(File.vo_name == vo)\
.filter(File.file_state == 'SUBMITTED')\
.filter(File.dest_se == dst)\
.filter(File.source_se == src)
submitted = 0 if queue is None else queue[0][0]
return submitted
def get_success_rate(self, src, dst):
"""
Returns the success rate for a given src, dst pair in the last hour
"""
total = 0
arr = self.session.query(OptimizerEvolution.success)\
.filter(OptimizerEvolution.source_se == src)\
.filter(OptimizerEvolution.dest_se == dst)\
.filter(OptimizerEvolution.datetime >=
(datetime.utcnow() - timedelta(hours=1)))
size = 0
for x in arr:
total += x[0]
size += 1
return 100 if (total == 0) else (total / size)
def get_throughput(self, src, dst):
"""
Returns the throughput information for a src, dst pair in the last hour.
"""
total_throughput = 0
size = 0
for tp, active in self.session.query\
(OptimizerEvolution.throughput, OptimizerEvolution.active)\
.filter(OptimizerEvolution.source_se == src)\
.filter(OptimizerEvolution.dest_se == dst)\
.filter(OptimizerEvolution.datetime >=
(datetime.utcnow() - timedelta(hours=1))):
total_throughput += tp * active
size += 1
if size == 0:
return 0
else:
return (total_throughput/size)
def get_per_file_throughput(self, src, dst):
"""
Returns the per-file throughput for a given src, dst pair in the
last hour
"""
throughput = 0
size = 0
for per_file_throughput in self.session.query\
(OptimizerEvolution.throughput)\
.filter(OptimizerEvolution.source_se == src)\
.filter(OptimizerEvolution.dest_se == dst)\
.filter(OptimizerEvolution.datetime >=
(datetime.utcnow() - timedelta(hours=1))):
throughput += per_file_throughput[0]
size += 1
if size == 0:
return 0
else:
return (throughput/size)
def get_pending_data(self, src, dst, vo, user_activity):
"""
Returns the pending data in the queue for a given src, dst pair.
Pending data is aggregated from all activities with a priority >= the
user_activity's priority. Currently only the ATLAS VO defines an ActivityShare.
"""
share = self.session.query(ActivityShare).get(vo)
total_pending_data = 0
if share is None:
for data in self.session.query(File.user_filesize)\
.filter(File.source_se == src)\
.filter(File.dest_se == dst)\
.filter(File.vo_name == vo)\
.filter(File.file_state == 'SUBMITTED'):
total_pending_data += data[0]
else:
activities = json.loads(share.activity_share)
for key in activities.keys():
if activities.get(key) >= activities.get(user_activity):
for data in self.session.query(File.user_filesize)\
.filter(File.source_se == src)\
.filter(File.dest_se == dst)\
.filter(File.vo_name == vo)\
.filter(File.activity == key)\
.filter(File.file_state == 'SUBMITTED'):
total_pending_data += data[0]
return total_pending_data
import operator
import logging
log = logging.getLogger(__name__)
class Scheduler:
"""
The Scheduler class ranks the source sites based on a number of
factors, e.g. queued files, success rate, etc.
If the throughput is 0 for a given src, dst pair, that source site is
selected immediately; this way the network is probed and throughput
information becomes available for future transfers
"""
def __init__(self, cls):
"""
cls is the querying mechanism; it can be either a cache or a database
implementation.
Using a caching implementation with the scheduler:
queue_provider = Database(Session)
cache_provider = ThreadLocalCache(queue_provider)
s = Scheduler(cache_provider)
Using a direct database implementation with the scheduler:
queue_provider = Database(Session)
s = Scheduler(queue_provider)
"""
self.cls = cls
@staticmethod
def select_source(source, throughput):
return [(source, throughput)]
def rank_submitted(self, sources, dst, vo):
"""
Ranks the source sites based on the number of pending files
in the queue
"""
ranks = []
for src in sources:
ranks.append((src, self.cls.get_submitted(src, dst, vo)))
return sorted(ranks, key=operator.itemgetter(1))
def rank_success_rate(self, sources, dst):
"""
Ranks the source sites based on the success rate of the transfers
in the last hour
"""
ranks = []
for src in sources:
ranks.append((src, self.cls.get_success_rate(src, dst)))
return sorted(ranks, key=operator.itemgetter(1), reverse=True)
def rank_throughput(self, sources, dst):
"""
Ranks the source sites based on the total throughput rate between
a source, destination pair in the last hour
"""
ranks = []
for src in sources:
throughput = self.cls.get_throughput(src, dst)
if throughput == 0:
return Scheduler.select_source(src, throughput)
ranks.append((src, throughput))
return sorted(ranks, key=operator.itemgetter(1), reverse=True)
def rank_per_file_throughput(self, sources, dst):
"""
Ranks the source sites based on the per file throughput rate between
a source, destination pair in the last hour
"""
ranks = []
for src in sources:
per_file_throughput = self.cls.get_per_file_throughput(src, dst)
if per_file_throughput == 0:
return Scheduler.select_source(src, per_file_throughput)
ranks.append((src, per_file_throughput))
return sorted(ranks, key=operator.itemgetter(1), reverse=True)
def rank_pending_data(self, sources, dst, vo, user_activity):
"""
Ranks the source sites based on the total pending data in the queue
between a source destination pair. Pending data is the aggregated
amount of data from all activities with a priority >= the
user_activity's priority
"""
ranks = []
for src in sources:
ranks.append((src, self.cls.get_pending_data(src, dst, vo,
user_activity)))
return sorted(ranks, key=operator.itemgetter(1))
def rank_waiting_time(self, sources, dst, vo, user_activity):
"""
Ranks the source sites based on the waiting time for the incoming
job in the queue
"""
ranks = []
for src in sources:
pending_data = self.cls.get_pending_data(src, dst, vo,
user_activity)
throughput = self.cls.get_throughput(src, dst)
if throughput == 0:
return Scheduler.select_source(src, throughput)
waiting_time = pending_data / throughput
ranks.append((src, waiting_time))
return sorted(ranks, key=operator.itemgetter(1))
def rank_waiting_time_with_error(self, sources, dst, vo, user_activity):
"""
Using the failure rate info, calculate the amount of data that will
be resent. Rank based on the waiting time plus the time for resending
failed data
"""
ranks = []
for src in sources:
pending_data = self.cls.get_pending_data(src, dst, vo,
user_activity)
throughput = self.cls.get_throughput(src, dst)
if throughput == 0:
return Scheduler.select_source(src, throughput)
waiting_time = pending_data / throughput
failure_rate = 100 - self.cls.get_success_rate(src, dst)
error = failure_rate * waiting_time / 100
wait_time_with_error = waiting_time + error
ranks.append((src, wait_time_with_error))
return sorted(ranks, key=operator.itemgetter(1))
def rank_finish_time(self, sources, dst, vo, user_activity,
user_file_size):
"""
Ranks the source sites based on the waiting time with error plus the
time required to transfer the file
"""
ranks = []
for src in sources:
pending_data = self.cls.get_pending_data(src, dst, vo,
user_activity)
throughput = self.cls.get_throughput(src, dst)
if throughput == 0:
return Scheduler.select_source(src, throughput)
waiting_time = pending_data / throughput
failure_rate = 100 - self.cls.get_success_rate(src, dst)
error = failure_rate * waiting_time / 100
wait_time_with_error = waiting_time + error
file_throughput = self.cls.get_per_file_throughput(src, dst)
file_transfer_time = (user_file_size/1024/1024) / file_throughput
finish_time = wait_time_with_error + file_transfer_time
ranks.append((src, finish_time))
return sorted(ranks, key=operator.itemgetter(1))
......@@ -91,88 +91,6 @@ class TestMultiple(TestController):
uniq_hashes = set(map(lambda f: f.hashed_id, db_job.files))
self.assertEqual(len(uniq_hashes), 1)