Commit fe75817d authored by Alejandro Alvarez Ayllon's avatar Alejandro Alvarez Ayllon
Browse files

FTS-468: Allow suffixes for max_time_in_queue

s => seconds
m => minutes
h => hours
parent fcdcc719
......@@ -18,10 +18,6 @@ from fts3.rest.client import Submitter
from delegate import delegate
from fts3.rest.client import ClientError
class JobIdGenerator:
standard = 'standard' #Default algorithm using uuid1
deterministic = 'deterministic' #Deterministic algorithm using uuid5 with base_id+vo+sid given by the user
def cancel(context, job_id, file_ids=None):
"""
Cancels a job
......@@ -38,6 +34,21 @@ def cancel(context, job_id, file_ids=None):
return submitter.cancel(job_id, file_ids)
def cancel_all(context, vo_name=None):
"""
Cancel all jobs within a given VO or FTS3 (needs enough privileges)
Args:
context: fts3.rest.client.context.Context instance
vo_name: The VO name, or None to cancell all jobs
Returns:
None
"""
submitter = Submitter(context)
return submitter.cancel_all(vo_name)
def new_transfer(source, destination, checksum=None, filesize=None, metadata=None, activity=None):
"""
Creates a new transfer pair
......@@ -85,8 +96,7 @@ def add_alternative_source(transfer, alt_source):
def new_job(transfers=None, deletion=None, verify_checksum=True, reuse=False, overwrite=False, multihop=False,
source_spacetoken=None, spacetoken=None,
bring_online=None, copy_pin_lifetime=None,
retry=-1, retry_delay=0, metadata=None, priority=None, strict_copy=False,
id_generator=JobIdGenerator.standard, sid=None):
retry=-1, retry_delay=0, metadata=None, priority=None, strict_copy=False):
"""
Creates a new dictionary representing a job
......@@ -104,8 +114,6 @@ def new_job(transfers=None, deletion=None, verify_checksum=True, reuse=False, ov
retry: Number of retries: <0 is no retries, 0 is server default, >0 is whatever value is passed
metadata: Metadata to bind to the job
priority: Job priority
id_generator: Job id generator algorithm
sid: Specific id given by the client
Returns:
An initialized dictionary representing a job
......@@ -127,9 +135,7 @@ def new_job(transfers=None, deletion=None, verify_checksum=True, reuse=False, ov
retry=retry,
retry_delay=retry_delay,
priority=priority,
strict_copy=strict_copy,
id_generator=id_generator,
sid=sid
strict_copy=strict_copy
)
job = dict(
files=transfers,
......@@ -138,20 +144,18 @@ def new_job(transfers=None, deletion=None, verify_checksum=True, reuse=False, ov
)
return job
def new_staging_job(files, bring_online=None, copy_pin_lifetime=None, source_spacetoken=None, spacetoken=None, metadata=None, priority=None, id_generator=JobIdGenerator.standard, sid=None):
def new_staging_job(files, bring_online=None, copy_pin_lifetime=None, source_spacetoken=None, spacetoken=None, metadata=None, priority=None):
"""
Creates a new dictionary representing a staging job
Args:
files: Array of surls to stage. Each item can be either a string or a dictionary with keys surl and metadata
files: Array of surls to stage.
bring_online: Bring online timeout
copy_pin_lifetime: Pin lifetime
source_spacetoken: Source space token
spacetoken: Deletion spacetoken
metadata: Metadata to bind to the job
priority: Job priority
id_generator: Job id generator algorithm
sid: Specific id given by the client
Returns:
An initialized dictionary representing a staging job
......@@ -160,17 +164,8 @@ def new_staging_job(files, bring_online=None, copy_pin_lifetime=None, source_spa
raise ClientError('Bad request: bring_online and copy_pin_lifetime are not positive numbers')
transfers = []
for trans in files:
if isinstance(trans, dict):
surl=trans['surl']
meta=trans['metadata']
elif isinstance(trans, basestring):
surl=trans
meta=None
else:
raise AttributeError("Unexpected input type %s"%type(files))
transfers.append(new_transfer(source=surl, destination=surl, metadata=meta))
for i in range (0, len(files)-1):
transfers.append(new_transfer(source=files[i], destination=files[i]))
params = dict(
source_spacetoken=source_spacetoken,
......@@ -178,9 +173,7 @@ def new_staging_job(files, bring_online=None, copy_pin_lifetime=None, source_spa
bring_online=bring_online,
copy_pin_lifetime=copy_pin_lifetime,
job_metadata=metadata,
priority=priority,
id_generator=id_generator,
sid=sid
)
job = dict(
files=transfers,
......@@ -188,7 +181,7 @@ def new_staging_job(files, bring_online=None, copy_pin_lifetime=None, source_spa
)
return job
def new_delete_job(files, spacetoken=None, metadata=None, priority=None, id_generator=JobIdGenerator.standard, sid=None):
def new_delete_job(files, spacetoken=None, metadata=None):
"""
Creates a new dictionary representing a deletion job
......@@ -196,17 +189,13 @@ def new_delete_job(files, spacetoken=None, metadata=None, priority=None, id_gene
files: Array of surls to delete. Each item can be either a string or a dictionary with keys surl and metadata
spacetoken: Deletion spacetoken
metadata: Metadata to bind to the job
id_generator: Job id generator algorithm
sid: Specific id given by the client
Return
An initialized dictionary representing a deletion job
"""
params = dict(
source_spacetoken=spacetoken,
job_metadata=metadata,
priority=priority,
id_generator=id_generator,
sid=sid
job_metadata=metadata
)
job = dict(
delete=files,
......@@ -232,4 +221,3 @@ def submit(context, job, delegation_lifetime=timedelta(hours=7), force_delegatio
submitter = Submitter(context)
params = job.get('params', {})
return submitter.submit(transfers=job.get('files', None), delete=job.get('delete', None), staging=job.get('staging', None), **params)
......@@ -19,7 +19,8 @@ try:
import simplejson as json
except:
import json
class Submitter(object):
def __init__(self, context):
......@@ -65,3 +66,9 @@ class Submitter(object):
return json.loads(self.context.delete('/jobs/%s/files/%s' % (job_id, file_ids_str)))
else:
return json.loads(self.context.delete('/jobs/%s' % job_id))
def cancel_all(self, vo = None):
if vo is None:
return json.loads(self.context.delete('/jobs/all'))
else:
return json.loads(self.context.delete('/jobs/vo/%s' % vo))
......@@ -241,6 +241,32 @@ def _apply_banning(files):
f['wait_timeout'] = timeout
def _seconds_from_value(value):
"""
Transform an interval value to seconds
If value is an integer, assume it is hours (backwards compatibility)
Otherwise, look at the suffix
"""
if isinstance(value, int) and value != 0:
return value * 3600
elif not isinstance(value, basestring):
return None
try:
suffix = value[-1].lower()
value = value[:-1]
if suffix == 's':
return int(value)
elif suffix == 'm':
return int(value) * 60
elif suffix == 'h':
return int(value) * 3600
else:
return None
except:
return None
class JobBuilder(object):
"""
From a dictionary, build the internal representation of Job, Files and
......@@ -377,10 +403,10 @@ class JobBuilder(object):
job_initial_state = 'STAGING' if self.is_bringonline else 'SUBMITTED'
max_time_in_queue = int(self.params['max_time_in_queue'])
max_time_in_queue = _seconds_from_value(self.params.get('max_time_in_queue', None))
expiration_time = None
if max_time_in_queue > 0:
expiration_time = time.time() + (max_time_in_queue * 60 * 60)
if max_time_in_queue is not None:
expiration_time = time.time() + max_time_in_queue
self.job = dict(
job_id=self.job_id,
......
......@@ -742,6 +742,62 @@ class TestJobSubmission(TestController):
self.assertGreater(job.max_time_in_queue, time.time())
self.assertLessEqual(job.max_time_in_queue, (8*60*60) + time.time())
def test_submit_max_time_in_queue_suffix(self):
"""
Submits a job specifying the maximum time it should stay in the queue.
Use a suffix.
"""
self.setup_gridsite_environment()
self.push_delegation()
job = {
'files': [{
'sources': ['http://source.es:8446/file'],
'destinations': ['root://dest.ch:8447/file'],
}],
'params': {
'max_time_in_queue': '4s'
}
}
job_id = self.app.post(
url="/jobs",
content_type='application/json',
params=json.dumps(job),
status=200
).json['job_id']
job = Session.query(Job).get(job_id)
self.assertGreater(job.max_time_in_queue, time.time())
self.assertLessEqual(job.max_time_in_queue, 8 + time.time())
def test_submit_max_time_in_queue_suffix2(self):
"""
Submits a job specifying the maximum time it should stay in the queue.
Use a suffix.
"""
self.setup_gridsite_environment()
self.push_delegation()
job = {
'files': [{
'sources': ['http://source.es:8446/file'],
'destinations': ['root://dest.ch:8447/file'],
}],
'params': {
'max_time_in_queue': '2m'
}
}
job_id = self.app.post(
url="/jobs",
content_type='application/json',
params=json.dumps(job),
status=200
).json['job_id']
job = Session.query(Job).get(job_id)
self.assertGreater(job.max_time_in_queue, time.time())
self.assertLessEqual(job.max_time_in_queue, 120 + time.time())
def test_submit_ipv4(self):
"""
Submit a job with IPv4 only
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment