Commit 4e85c7ff authored by Alejandro Alvarez Ayllon

FTS-277: Multireplicas go into CANDIDATE

parent b81e3b01
......@@ -22,7 +22,7 @@ from sqlalchemy.orm import relation, backref
from base import Base, Json
FileActiveStates = ['SUBMITTED', 'READY', 'STARTED', 'ACTIVE', 'STAGING']
FileActiveStates = ['SUBMITTED', 'CANDIDATE', 'READY', 'STARTED', 'ACTIVE', 'STAGING']
FileTerminalStates = ['FINISHED', 'FAILED', 'CANCELED']
......
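Adding CANDIDATE to FileActiveStates means that any code testing membership in this list to decide whether a file is still in flight now counts CANDIDATE replicas too. A minimal sketch of such a check (plain dicts stand in here for the actual ORM objects):
def count_active(files):
    # With this commit, a fresh multi-replica job reports every replica as active
    return sum(1 for f in files if f['file_state'] in FileActiveStates)
replicas = [{'file_state': 'CANDIDATE'}, {'file_state': 'CANDIDATE'}]
assert count_active(replicas) == 2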
......@@ -60,7 +60,7 @@ def get_base_id():
def get_vo_id(vo_name):
log.debug("VO name: " + vo_name)
return uuid.uuid5(BASE_ID, vo_name.encode('utf-8'))
def get_storage_element(uri):
"""
Returns the storage element of the given uri, which is the scheme +
......@@ -131,79 +131,6 @@ def _has_multiple_options(files):
return unique_id_count != id_count, unique_id_count
def _select_best_replica(files, vo_name, entry_state, strategy):
dst = files[0]['dest_se']
activity = files[0]['activity']
user_filesize = files[0]['user_filesize']
queue_provider = Database(Session)
cache_provider = ThreadLocalCache(queue_provider)
# s = Scheduler(queue_provider)
s = Scheduler(cache_provider)
source_se_list = map(lambda f: f['source_se'], files)
if strategy == "orderly":
sorted_ses = source_se_list
elif strategy == "queue" or strategy == "auto":
sorted_ses = map(lambda x: x[0], s.rank_submitted(source_se_list,
dst,
vo_name))
elif strategy == "success":
sorted_ses = map(lambda x: x[0], s.rank_success_rate(source_se_list,
dst))
elif strategy == "throughput":
sorted_ses = map(lambda x: x[0], s.rank_throughput(source_se_list,
dst))
elif strategy == "file-throughput":
sorted_ses = map(lambda x: x[0], s.rank_per_file_throughput(
source_se_list,
dst))
elif strategy == "pending-data":
sorted_ses = map(lambda x: x[0], s.rank_pending_data(source_se_list,
dst,
vo_name,
activity))
elif strategy == "waiting-time":
sorted_ses = map(lambda x: x[0], s.rank_waiting_time(source_se_list,
dst,
vo_name,
activity))
elif strategy == "waiting-time-with-error":
sorted_ses = map(lambda x: x[0], s.rank_waiting_time_with_error(
source_se_list,
dst,
vo_name,
activity))
elif strategy == "duration":
sorted_ses = map(lambda x: x[0], s.rank_finish_time(source_se_list,
dst,
vo_name,
activity,
user_filesize))
else:
raise HTTPBadRequest(strategy + " algorithm is not supported by Scheduler")
# We got the storages sorted from better to worst following
# the chosen strategy.
# We need to find the file with the source matching that best_se
best_index = 0
best_se = sorted_ses[0]
for index, transfer in enumerate(files):
if transfer['source_se'] == best_se:
best_index = index
break
files[best_index]['file_state'] = entry_state
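The removed if/elif ladder maps each strategy name onto one of the Scheduler ranking methods. A table-driven sketch of the same dispatch, written against the ranking methods exactly as they appear above (an illustrative rewrite, not FTS3 code):
def rank_sources(s, strategy, source_se_list, dst, vo_name, activity, user_filesize):
    # Each entry defers its call behind a lambda so only the chosen ranking runs
    rankers = {
        'queue': lambda: s.rank_submitted(source_se_list, dst, vo_name),
        'auto': lambda: s.rank_submitted(source_se_list, dst, vo_name),
        'success': lambda: s.rank_success_rate(source_se_list, dst),
        'throughput': lambda: s.rank_throughput(source_se_list, dst),
        'file-throughput': lambda: s.rank_per_file_throughput(source_se_list, dst),
        'pending-data': lambda: s.rank_pending_data(source_se_list, dst, vo_name, activity),
        'waiting-time': lambda: s.rank_waiting_time(source_se_list, dst, vo_name, activity),
        'waiting-time-with-error': lambda: s.rank_waiting_time_with_error(source_se_list, dst, vo_name, activity),
        'duration': lambda: s.rank_finish_time(source_se_list, dst, vo_name, activity, user_filesize),
    }
    if strategy == 'orderly':
        return source_se_list  # keep submission order
    if strategy not in rankers:
        raise HTTPBadRequest(strategy + " algorithm is not supported by Scheduler")
    # The ranking methods return (se, score) pairs sorted best-first
    return [pair[0] for pair in rankers[strategy]()]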
def _apply_banning(files):
"""
Query the banning information for all pairs, reject the job
......@@ -359,8 +286,9 @@ class JobBuilder(object):
if len(file_dict['sources']) > 1:
if self.is_bringonline:
raise HTTPBadRequest('Staging with multiple replicas is not allowed')
# On multiple replica jobs, we mark all files initially with NOT_USED
initial_file_state = 'NOT_USED'
# On multiple replica jobs, we mark all files initially with CANDIDATE, which
# behaves like SUBMITTED, except that these files can also transition into NOT_USED
initial_file_state = 'CANDIDATE'
# Multiple replicas, all must share the hashed-id
if shared_hashed_id is None:
shared_hashed_id = _generate_hashed_id()
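This defers replica selection: rather than the server marking one file SUBMITTED and the rest NOT_USED at submission time, every replica starts as CANDIDATE and the pick happens later. A minimal sketch of that later transition (promote_replica is a hypothetical helper, not the actual FTS3 scheduler code):
def promote_replica(files, best_index, entry_state='SUBMITTED'):
    # Promote one CANDIDATE replica; the rest retire into NOT_USED.
    # entry_state would be STAGING instead of SUBMITTED for bring-online jobs.
    for index, f in enumerate(files):
        f['file_state'] = entry_state if index == best_index else 'NOT_USED'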
......@@ -387,14 +315,6 @@ class JobBuilder(object):
)
self.files.append(f)
def _apply_selection_strategy(self):
"""
On multiple-replica jobs, select the appropriate file to go active
"""
entry_state = "STAGING" if self.is_bringonline else "SUBMITTED"
_select_best_replica(self.files, self.user.vos[0], entry_state,
self.files[0].get('selection_strategy', 'auto'))
def _populate_transfers(self, files_list):
"""
Initializes the list of transfers
......@@ -480,8 +400,6 @@ class JobBuilder(object):
if unique_files > 1:
raise HTTPBadRequest('Multiple replicas jobs can only have one unique file')
self.job['reuse_job'] = 'R'
# Apply selection strategy
self._apply_selection_strategy()
self._set_job_source_and_destination(self.files)
......
......@@ -73,28 +73,28 @@ class TestMultiple(TestController):
self.assertEqual(db_job.files[0].dest_surl, 'http://dest.ch:8447/file')
self.assertEqual(db_job.files[0].activity, 'something something')
self.assertEqual(db_job.files[0].file_metadata['mykey'], 'myvalue')
self.assertEqual(db_job.files[0].file_state, 'SUBMITTED')
self.assertEqual(db_job.files[0].file_state, 'CANDIDATE')
self.assertEqual(db_job.files[1].file_index, 0)
self.assertEqual(db_job.files[1].source_surl, 'http://source.es:8446/file')
self.assertEqual(db_job.files[1].dest_surl, 'root://dest.ch/file')
self.assertEqual(db_job.files[1].activity, 'something something')
self.assertEqual(db_job.files[1].file_metadata['mykey'], 'myvalue')
self.assertEqual(db_job.files[1].file_state, 'NOT_USED')
self.assertEqual(db_job.files[1].file_state, 'CANDIDATE')
self.assertEqual(db_job.files[2].file_index, 0)
self.assertEqual(db_job.files[2].source_surl, 'root://source.es/file')
self.assertEqual(db_job.files[2].dest_surl, 'http://dest.ch:8447/file')
self.assertEqual(db_job.files[2].activity, 'something something')
self.assertEqual(db_job.files[2].file_metadata['mykey'], 'myvalue')
self.assertEqual(db_job.files[2].file_state, 'NOT_USED')
self.assertEqual(db_job.files[2].file_state, 'CANDIDATE')
self.assertEqual(db_job.files[3].file_index, 0)
self.assertEqual(db_job.files[3].source_surl, 'root://source.es/file')
self.assertEqual(db_job.files[3].dest_surl, 'root://dest.ch/file')
self.assertEqual(db_job.files[3].activity, 'something something')
self.assertEqual(db_job.files[3].file_metadata['mykey'], 'myvalue')
self.assertEqual(db_job.files[3].file_state, 'NOT_USED')
self.assertEqual(db_job.files[3].file_state, 'CANDIDATE')
# Same file index, same hashed id
uniq_hashes = set(map(lambda f: f.hashed_id, db_job.files))
......@@ -141,14 +141,14 @@ class TestMultiple(TestController):
self.assertEqual(db_job.files[0].dest_surl, 'http://dest.ch/file')
self.assertEqual(db_job.files[0].activity, 'something something')
self.assertEqual(db_job.files[0].file_metadata['mykey'], 'myvalue')
self.assertEqual(db_job.files[0].file_state, 'SUBMITTED')
self.assertEqual(db_job.files[0].file_state, 'CANDIDATE')
self.assertEqual(db_job.files[1].file_index, 0)
self.assertEqual(db_job.files[1].source_surl, 'http://source.fr/file')
self.assertEqual(db_job.files[1].dest_surl, 'http://dest.ch/file')
self.assertEqual(db_job.files[1].activity, 'something something')
self.assertEqual(db_job.files[1].file_metadata['mykey'], 'myvalue')
self.assertEqual(db_job.files[1].file_state, 'NOT_USED')
self.assertEqual(db_job.files[1].file_state, 'CANDIDATE')
# Same file index, same hashed id
uniq_hashes = set(map(lambda f: f.hashed_id, db_job.files))
......
# Copyright notice:
# Copyright Members of the EMI Collaboration, 2013.
#
# See www.eu-emi.eu for details on the copyright holders
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import datetime
import logging
from fts3rest.tests import TestController
from fts3rest.lib.base import Session
from fts3rest.lib.scheduler.Cache import ThreadLocalCache
from fts3.model import Job, File, OptimizerEvolution, ActivityShare
log = logging.getLogger(__name__)
class TestScheduler(TestController):
"""
Test different selection strategies at submission time
"""
def setUp(self):
Session.query(OptimizerEvolution).delete()
Session.commit()
def tearDown(self):
Session.query(Job).delete()
Session.query(File).delete()
Session.query(OptimizerEvolution).delete()
Session.query(ActivityShare).delete()
Session.commit()
def fill_file_queue(self):
for i in range(0, 15):
self.app.post(
url="/jobs",
content_type='application/json',
params=json.dumps({
'files': [{
'sources': ['http://site01.es/file%d' % i],
'destinations': ['http://dest.ch/file%d' % i],
'selection_strategy': 'orderly',
'filesize': 4096,
'success': 90
}]
}),
status=200
)
for i in range(0, 10):
self.app.post(
url="/jobs",
content_type='application/json',
params=json.dumps({
'files': [{
'sources': ['http://site02.ch/file%d' % i],
'destinations': ['http://dest.ch/file%d' % i],
'selection_strategy': 'orderly',
'filesize': 2048,
'success': 95
}]
}),
status=200
)
for i in range(0, 5):
self.app.post(
url="/jobs",
content_type='application/json',
params=json.dumps({
'files': [{
'sources': ['http://site03.fr/file%d' % i],
'destinations': ['http://dest.ch/file%d' % i],
'selection_strategy': 'orderly',
'filesize': 1024,
'success': 100
}]
}),
status=200
)
@staticmethod
def fill_optimizer():
evolution = OptimizerEvolution(
datetime=datetime.datetime.utcnow(),
source_se='http://site01.es',
dest_se='http://dest.ch',
success=90,
active=10,
throughput=10
)
Session.add(evolution)
evolution = OptimizerEvolution(
datetime=datetime.datetime.utcnow(),
source_se='http://site02.ch',
dest_se='http://dest.ch',
success=95,
active=10,
throughput=15
)
Session.add(evolution)
evolution = OptimizerEvolution(
datetime=datetime.datetime.utcnow(),
source_se='http://site03.fr',
dest_se='http://dest.ch',
success=100,
active=10,
throughput=20
)
Session.add(evolution)
Session.commit()
@staticmethod
def fill_activities():
activity = ActivityShare(
vo='testvo',
activity_share=json.dumps({
"data brokering": 0.3,
"data consolidation": 0.4,
"default": 0.02,
"express": 0.4,
"functional test": 0.2,
"production": 0.5,
"production input": 0.25,
"production output": 0.25,
"recovery": 0.4,
"staging": 0.5,
"t0 export": 0.7,
"t0 tape": 0.7,
"user subscriptions": 0.1
})
)
Session.add(activity)
Session.commit()
def submit_job(self, strategy):
job = {
'files': [
{
'sources': [
'http://site01.es/file',
'http://site02.ch/file',
'http://site03.fr/file'
],
'destinations': ['http://dest.ch/file'],
'selection_strategy': strategy,
'checksum': 'adler32:1234',
'filesize': 1024,
'metadata': {'mykey': 'myvalue'}
}
],
'params': {'overwrite': True}
}
job_id = self.app.post(
url="/jobs",
content_type='application/json',
params=json.dumps(job),
status=200
).json['job_id']
return job_id
def validate(self, job_id, expected_submitted='http://site03.fr/file'):
db_job = Session.query(Job).get(job_id)
self.assertEqual(db_job.reuse_job, 'R')
self.assertEqual(len(db_job.files), 3)
for f in db_job.files:
self.assertEqual(f.file_index, 0)
self.assertEqual(f.dest_surl, 'http://dest.ch/file')
if f.source_surl == expected_submitted:
self.assertEqual(f.file_state, 'SUBMITTED')
else:
self.assertEqual(f.file_state, 'NOT_USED')
self.assertEqual(db_job.files[0].source_surl, 'http://site01.es/file')
# Same file index, same hashed id
uniq_hashes = set(map(lambda f: f.hashed_id, db_job.files))
self.assertEqual(len(uniq_hashes), 1)
def test_queue(self):
"""
Test the 'queue' algorithm
This algorithm must choose the pair with the fewest pending transfers
"""
self.setup_gridsite_environment()
self.push_delegation()
TestScheduler.fill_file_queue(self)
job_id = self.submit_job("queue")
self.validate(job_id)
job_id = self.submit_job("queue")
self.validate(job_id)
# Trigger a cache expiration
ThreadLocalCache.cache_cleanup()
job_id = self.submit_job("queue")
self.validate(job_id)
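With the fill_file_queue fixture, the expected pick is easy to verify by hand: site01.es has 15 transfers queued towards dest.ch, site02.ch has 10 and site03.fr only 5, so the least-loaded pair is site03.fr, matching the default expected_submitted in validate().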
def test_success(self):
"""
Test the 'success' algorithm
This algorithm must choose the pair with the highest success rate
"""
self.setup_gridsite_environment()
self.push_delegation()
TestScheduler.fill_optimizer()
job_id = self.submit_job("success")
self.validate(job_id)
def test_throughput(self):
"""
Test the 'throughput' algorithm
This algorithm must choose the pair with the highest total throughput
"""
self.setup_gridsite_environment()
self.push_delegation()
TestScheduler.fill_optimizer()
job_id = self.submit_job("throughput")
self.validate(job_id)
def test_file_throughput(self):
"""
Test the 'file-throughput' algorithm
This algorithm must choose the pair with the highest throughput _per file_
"""
self.setup_gridsite_environment()
self.push_delegation()
TestScheduler.fill_optimizer()
job_id = self.submit_job("file-throughput")
self.validate(job_id)
def test_pending_data(self):
"""
Test the 'pending-data' algorithm
This algorithm must choose the pair with the least data to be transferred
(sum of the file sizes of the queued transfers)
"""
self.setup_gridsite_environment()
self.push_delegation()
TestScheduler.fill_activities()
TestScheduler.fill_file_queue(self)
job_id = self.submit_job("pending-data")
self.validate(job_id)
def test_waiting_time(self):
"""
Test the 'waiting-time' algorithm
This algorithm must choose the pair with the least estimated waiting time
(pending data / total throughput)
"""
self.setup_gridsite_environment()
self.push_delegation()
TestScheduler.fill_activities()
TestScheduler.fill_optimizer()
TestScheduler.fill_file_queue(self)
job_id = self.submit_job("waiting-time")
self.validate(job_id)
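With both fixtures loaded, the expected winner can be checked by hand (a rough estimate that ignores activity weighting):
# waiting time ~ pending data / total throughput
# site01.es: 15 * 4096 / 10 = 6144.0
# site02.ch: 10 * 2048 / 15 ~ 1365.3
# site03.fr:  5 * 1024 / 20 =  256.0   <- least waiting time, so site03.fr wins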
def test_waiting_time_with_error(self):
"""
Test the 'waiting-time-with-error' algorithm
This algorithm must choose the pair with the least estimated waiting time,
penalized by its failure rate
"""
self.setup_gridsite_environment()
self.push_delegation()
TestScheduler.fill_activities()
TestScheduler.fill_optimizer()
TestScheduler.fill_file_queue(self)
job_id = self.submit_job("waiting-time-with-error")
self.validate(job_id)
def test_duration(self):
"""
Test the 'duration' algorithm
Similar to the 'waiting-time-with-error' algorithm, but accounting for the file size
of the submitted transfer
"""
self.setup_gridsite_environment()
self.push_delegation()
TestScheduler.fill_activities()
TestScheduler.fill_optimizer()
TestScheduler.fill_file_queue(self)
job_id = self.submit_job("duration")
self.validate(job_id)
def test_invalid_strategy(self):
"""
Test a random strategy name, which must fail
"""
self.setup_gridsite_environment()
self.push_delegation()
job = {
'files': [
{
'sources': [
'http://site01.es/file',
'http://site02.ch/file',
'http://site03.fr/file'
],
'destinations': ['http://dest.ch/file'],
'selection_strategy': "YOLO",
'checksum': 'adler32:1234',
'filesize': 1024,
'metadata': {'mykey': 'myvalue'}
}
],
'params': {'overwrite': True}
}
self.app.post(
url="/jobs",
content_type='application/json',
params=json.dumps(job),
status=400
)
def test_orderly(self):
"""
Test the 'orderly' algorithm
This isn't really an algorithm: it just chooses the first pair as submitted.
"""
self.setup_gridsite_environment()
self.push_delegation()
TestScheduler.fill_activities()
TestScheduler.fill_optimizer()
TestScheduler.fill_file_queue(self)
job_id = self.submit_job("orderly")
self.validate(job_id, expected_submitted='http://site01.es/file')
def test_orderly_same_sources(self):
"""
Test the 'orderly' algorithm when the same source appears more than once
This is a regression test for FTS-323
"""
self.setup_gridsite_environment()
self.push_delegation()
TestScheduler.fill_activities()
TestScheduler.fill_optimizer()
TestScheduler.fill_file_queue(self)
job = {
'files': [
{
'sources': [
'http://site01.es/file',
'http://site02.ch/file',
'http://site01.es/file'
],
'destinations': ['http://dest.ch/file'],
'selection_strategy': 'orderly',
'checksum': 'adler32:1234',
'filesize': 1024,
'metadata': {'mykey': 'myvalue'}
}
],
'params': {'overwrite': True}
}
job_id = self.app.post(
url="/jobs",
content_type='application/json',
params=json.dumps(job),
status=200
).json['job_id']
files = Session.query(File).filter(File.job_id == job_id)
self.assertEqual('SUBMITTED', files[0].file_state)
self.assertEqual('NOT_USED', files[1].file_state)
self.assertEqual('NOT_USED', files[2].file_state)