#   Copyright notice:
#   Copyright CERN, 2015.
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import logging
import random
import socket
import time
import types
import uuid
import pylons

from datetime import datetime
from urlparse import urlparse

from fts3.model import File, BannedSE
from fts3rest.lib.base import Session
from fts3rest.lib.http_exceptions import *

from fts3rest.lib.scheduler.schd import Scheduler
from fts3rest.lib.scheduler.db import Database
from fts3rest.lib.scheduler.Cache import ThreadLocalCache


log = logging.getLogger(__name__)

BASE_ID = uuid.UUID('urn:uuid:01874efb-4735-4595-bc9c-591aef8240c9')

DEFAULT_PARAMS = {
    'bring_online': -1,
    'verify_checksum': False,
    'copy_pin_lifetime': -1,
    'gridftp': '',
    'job_metadata': None,
    'overwrite': False,
    'reuse': None,
    'multihop': False,
    'source_spacetoken': '',
    'spacetoken': '',
    'retry': 0,
    'retry_delay': 0,
    'priority': 3,
    'max_time_in_queue': 0,
    's3alternate': False,
}


def get_base_id():
    return BASE_ID


def get_vo_id(vo_name):
    log.debug("VO name: " + vo_name)
    return uuid.uuid5(BASE_ID, vo_name.encode('utf-8'))


def get_storage_element(uri):
    """
    Returns the storage element of the given uri, which is the scheme +
    hostname without the port

    Args:
        uri: An urlparse instance
    """
    return "%s://%s" % (uri.scheme, uri.hostname)


def _validate_url(url):
    """
    Validates the format and content of the url
    """
    if not url.scheme:
        raise ValueError('Missing scheme (%s)' % url.geturl())
    if url.scheme == 'file':
        raise ValueError('Can not transfer local files (%s)' % url.geturl())
    if not url.path or (url.path == '/' and not url.query):
        raise ValueError('Missing path (%s)' % url.geturl())
    if not url.hostname:
        raise ValueError('Missing host (%s)' % url.geturl())


def _safe_flag(flag):
    """
    Translates different representations of flag values to True/False
    True/False => True/False
    1/0 => True/False
    'Y'/'N' => True/False
    """
    if isinstance(flag, types.StringType) or isinstance(flag,
                                                        types.UnicodeType):
        return len(flag) > 0 and flag[0].upper() == 'Y'
    else:
        return bool(flag)


def _safe_filesize(size):
    if isinstance(size, float):
        return size
    elif size is None:
        return 0.0
    else:
        return float(size)


def _generate_hashed_id():
    """
    Generates a uniformly distributed value between 0 and 2**16 - 1.
    This is intended to spread the load evenly across nodes.

    The name is an unfortunate legacy from when this used to
    be based on a hash of the job.
    """
    return random.randint(0, (2 ** 16) - 1)


def _has_multiple_options(files):
    """
    Returns a tuple (Boolean, Integer)
    Boolean is True if there are multiple replica entries, and Integer
    holds the number of unique files.
    """
    ids = map(lambda f: f['file_index'], files)
    id_count = len(ids)
    unique_id_count = len(set(ids))
    return unique_id_count != id_count, unique_id_count


def _select_best_replica(files, vo_name, entry_state, strategy):
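    """
    Rank the source storage elements of a multiple-replica submission with
    the requested strategy, and mark the best matching file with entry_state
    """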

    dst = files[0]['dest_se']
    activity = files[0]['activity']
    user_filesize = files[0]['user_filesize']

    queue_provider = Database(Session)
    cache_provider = ThreadLocalCache(queue_provider)
    # s = Scheduler(queue_provider)
    s = Scheduler(cache_provider)
    source_se_list = map(lambda f: f['source_se'], files)

    if strategy == "orderly":
        sorted_ses = source_se_list

    elif strategy == "queue" or strategy == "auto":
        sorted_ses = map(lambda x: x[0], s.rank_submitted(source_se_list,
                                                          dst,
                                                          vo_name))

    elif strategy == "success":
        sorted_ses = map(lambda x: x[0], s.rank_success_rate(source_se_list,
                                                             dst))

    elif strategy == "throughput":
        sorted_ses = map(lambda x: x[0], s.rank_throughput(source_se_list,
                                                           dst))

    elif strategy == "file-throughput":
        sorted_ses = map(lambda x: x[0], s.rank_per_file_throughput(
                                                             source_se_list,
                                                             dst))

    elif strategy == "pending-data":
        sorted_ses = map(lambda x: x[0], s.rank_pending_data(source_se_list,
                                                             dst,
                                                             vo_name,
                                                             activity))

    elif strategy == "waiting-time":
        sorted_ses = map(lambda x: x[0], s.rank_waiting_time(source_se_list,
                                                             dst,
                                                             vo_name,
                                                             activity))

    elif strategy == "waiting-time-with-error":
        sorted_ses = map(lambda x: x[0], s.rank_waiting_time_with_error(
                                                             source_se_list,
                                                             dst,
                                                             vo_name,
                                                             activity))

    elif strategy == "duration":
        sorted_ses = map(lambda x: x[0], s.rank_finish_time(source_se_list,
                                                            dst,
                                                            vo_name,
                                                            activity,
                                                            user_filesize))
    else:
        raise HTTPBadRequest(strategy + " algorithm is not supported by Scheduler")

    # We got the storages sorted from best to worst following
    # the chosen strategy.
    # We need to find the file whose source matches that best_se
    best_index = 0
    best_se = sorted_ses[0]
    for index, transfer in enumerate(files):
        if transfer['source_se'] == best_se:
            best_index = index
            break

    files[best_index]['file_state'] = entry_state


def _apply_banning(files):
    """
    Query the banning information for all pairs, reject the job
    as soon as one SE can not submit.
    Update wait_timeout and wait_timestamp if there is a hit
    """
    # Usually, banned SEs will be in the order of ~100 at most
    # Files may be several thousands
    # We get all banned SEs in memory so we avoid querying the DB too many times
    # We then build a dictionary to make the lookup easy
    banned_ses = dict()
    for b in Session.query(BannedSE):
        banned_ses[str(b.se)] = (b.vo, b.status)

    for f in files:
        source_banned = banned_ses.get(str(f['source_se']), None)
        dest_banned = banned_ses.get(str(f['dest_se']), None)
        banned = False

        if source_banned and (source_banned[0] == f['vo_name'] or source_banned[0] == '*'):
            if source_banned[1] != 'WAIT_AS':
                raise HTTPForbidden("%s is banned" % f['source_se'])
            banned = True

        if dest_banned and (dest_banned[0] == f['vo_name'] or dest_banned[0] == '*'):
            if dest_banned[1] != 'WAIT_AS':
                raise HTTPForbidden("%s is banned" % f['dest_se'])
            banned = True

        if banned:
            if f['file_state'] == 'SUBMITTED':
                f['file_state'] = 'ON_HOLD'
            elif f['file_state'] == 'STAGING':
                f['file_state'] = 'ON_HOLD_STAGING'
            elif f['file_state'] == 'DELETE':
                continue
            else:
                raise HTTPInternalServerError('Unexpected initial state: %s' % f['file_state'])


def _seconds_from_value(value):
    """
    Transform an interval value to seconds
    If value is an integer, assume it is hours (backwards compatibility)
    Otherwise, look at the suffix
    """
    if isinstance(value, int) and value != 0:
        return value * 3600
    elif not isinstance(value, basestring):
        return None

    try:
        suffix = value[-1].lower()
        value = value[:-1]
        if suffix == 's':
            return int(value)
        elif suffix == 'm':
            return int(value) * 60
        elif suffix == 'h':
            return int(value) * 3600
        else:
            return None
    except:
        return None


class JobBuilder(object):
    """
    From a dictionary, build the internal representation of Job, Files and
    Data Management
    """

    def _get_params(self, submitted_params):
        """
        Returns proper parameters applying defaults and user values
        """
        params = dict()
        params.update(DEFAULT_PARAMS)
        params.update(submitted_params)
        # Some parameters may be explicitly None to pick the default, so re-apply defaults
        for k, v in params.iteritems():
            if v is None and k in DEFAULT_PARAMS:
                params[k] = DEFAULT_PARAMS[k]
        return params

    def _build_internal_job_params(self):
        """
        Generates the value for job.internal_job_params depending on the
        received protocol parameters
        """
        param_list = list()
        if self.params.get('nostreams', None):
            param_list.append("nostreams:%d" % int(self.params['nostreams']))
        if self.params.get('timeout', None):
            param_list.append("timeout:%d" % int(self.params['timeout']))
        if self.params.get('buffer_size', None):
            param_list.append("buffersize:%d" % int(self.params['buffer_size']))
        if self.params.get('strict_copy', False):
            param_list.append("strict")
        if self.params.get('ipv4', False):
            param_list.append('ipv4')
        elif self.params.get('ipv6', False):
            param_list.append('ipv6')
        if self.params.get('s3alternate', False):
            param_list.append('s3alternate')

        if len(param_list) == 0:
            return None
        else:
            return ','.join(param_list)

    def _set_job_source_and_destination(self, entries):
        """
        Iterates through the files that belong to the job, and determines the
        'overall' job source and destination Storage Elements
        """
        # Multihop
        if self.job['job_type'] == 'H':
            self.job['source_se'] = entries[0]['source_se']
            self.job['dest_se'] = entries[-1]['dest_se']
        # Regular transfers
        else:
            self.job['source_se'] = entries[0]['source_se']
            self.job['dest_se'] = entries[0]['dest_se']
            for elem in entries:
                if elem['source_se'] != self.job['source_se']:
                    self.job['source_se'] = None
                if elem['dest_se'] != self.job['dest_se']:
                    self.job['dest_se'] = None

    def _populate_files(self, file_dict, f_index, shared_hashed_id):
        """
        From the dictionary file_dict, generate a list of transfers for a job
        """
        # Extract matching pairs
        pairs = []
        for source in file_dict['sources']:
            source_url = urlparse(source.strip())
            _validate_url(source_url)
            for destination in file_dict['destinations']:
                dest_url = urlparse(destination.strip())
                _validate_url(dest_url)
                pairs.append((source_url, dest_url))

        # Create one File entry per matching pair
        if self.is_bringonline:
            initial_file_state = 'STAGING'
        else:
            initial_file_state = 'SUBMITTED'

        # Multiple replica job or multihop? Then, the initial state is NOT_USED
        if len(file_dict['sources']) > 1 or self.params['multihop']:
            if self.is_bringonline:
                raise HTTPBadRequest('Staging with multiple replicas is not allowed')
            # On multiple replica job, we mark all files initially with NOT_USED
            initial_file_state = 'NOT_USED'
            # Multiple replicas, all must share the hashed-id
            if shared_hashed_id is None:
                shared_hashed_id = _generate_hashed_id()

        for source, destination in pairs:
            f = dict(
                job_id=self.job_id,
                file_index=f_index,
                file_state=initial_file_state,
                source_surl=source.geturl(),
                dest_surl=destination.geturl(),
                source_se=get_storage_element(source),
                dest_se=get_storage_element(destination),
                vo_name=None,
                user_filesize=_safe_filesize(file_dict.get('filesize', 0)),
                selection_strategy=file_dict.get('selection_strategy', 'auto'),
                checksum=file_dict.get('checksum', None),
                file_metadata=file_dict.get('metadata', None),
                activity=file_dict.get('activity', 'default'),
                hashed_id=shared_hashed_id if shared_hashed_id else _generate_hashed_id()
            )
            self.files.append(f)

    def _apply_selection_strategy(self):
        """
        On multiple-replica jobs, select the adequate file to go active
        """
        entry_state = "STAGING" if self.is_bringonline else "SUBMITTED"
        _select_best_replica(self.files, self.user.vos[0], entry_state,
                             self.files[0].get('selection_strategy', 'auto'))

    def _populate_transfers(self, files_list):
        """
        Initializes the list of transfers
        """
        job_type = None
        log.debug("job type is " + str(job_type) + ", reuse is " + str(self.params['reuse']))

        if self.params['multihop']:
            job_type = 'H'
        elif self.params['reuse'] is not None:
            if _safe_flag(self.params['reuse']):
                job_type = 'Y'
            else:
                job_type = 'N'
        log.debug("job type is " + str(job_type))

        self.is_bringonline = self.params['copy_pin_lifetime'] > 0 or self.params['bring_online'] > 0

        job_initial_state = 'STAGING' if self.is_bringonline else 'SUBMITTED'

        max_time_in_queue = _seconds_from_value(self.params.get('max_time_in_queue', None))
        expiration_time = None
        if max_time_in_queue is not None:
            expiration_time = time.time() + max_time_in_queue

        self.job = dict(
            job_id=self.job_id,
            job_state=job_initial_state,
            job_type=job_type,
            retry=int(self.params['retry']),
            retry_delay=int(self.params['retry_delay']),
            job_params=self.params['gridftp'],
            submit_host=socket.getfqdn(),
            user_dn=None,
            voms_cred=None,
            vo_name=None,
            cred_id=None,
            submit_time=datetime.utcnow(),
            priority=max(min(int(self.params['priority']), 5), 1),
            space_token=self.params['spacetoken'],
            overwrite_flag=_safe_flag(self.params['overwrite']),
            source_space_token=self.params['source_spacetoken'],
            copy_pin_lifetime=int(self.params['copy_pin_lifetime']),
            checksum_method=self.params['verify_checksum'],
            bring_online=self.params['bring_online'],
            job_metadata=self.params['job_metadata'],
            internal_job_params=self._build_internal_job_params(),
            max_time_in_queue=expiration_time
        )

        if 'credential' in self.params:
            self.job['user_cred'] = self.params['credential']
        elif 'credentials' in self.params:
            self.job['user_cred'] = self.params['credentials']

        # If reuse is enabled, or it is a bring online job, generate one single "hash" for all files
        if job_type in ('H', 'Y') or self.is_bringonline:
            shared_hashed_id = _generate_hashed_id()
        else:
            shared_hashed_id = None

        # Files
        for f_index, file_dict in enumerate(files_list):
            self._populate_files(file_dict, f_index, shared_hashed_id)

        if len(self.files) == 0:
            raise HTTPBadRequest('No valid pairs available')

        # If a per-file checksum is provided but checksum verification was not
        # requested, fall back to 'target' comparison
        # (Not nice, but need to keep functionality!) Also, by default all files
        # get the ADLER32 checksum type
        has_checksum = False
        for file_dict in self.files:
            if file_dict['checksum'] is not None:
                has_checksum = len(file_dict['checksum']) > 0
            else:
                file_dict['checksum'] = 'ADLER32'

        if isinstance(self.job['checksum_method'], bool):
            if self.job['checksum_method']:
                self.job['checksum_method'] = 'both'
            elif has_checksum:
                self.job['checksum_method'] = 'target'
            else:
                self.job['checksum_method'] = 'none'

        self.job['checksum_method'] = self.job['checksum_method'][0]
        # Validate that, if this is a multiple replica job, there is one single unique file
        self.is_multiple, unique_files = _has_multiple_options(self.files)
        if self.is_multiple:
            # Multiple replicas can not use the reuse flag, nor multihop
            if job_type in ('H', 'Y'):
                raise HTTPBadRequest('Can not specify reuse and multiple replicas at the same time')
            # Only one unique file per multiple-replica job
            if unique_files > 1:
                raise HTTPBadRequest('Multiple replicas jobs can only have one unique file')
            self.job['job_type'] = 'R'
            # Apply selection strategy
            self._apply_selection_strategy()
        # For multihop, mark the first as SUBMITTED
        elif self.params['multihop']:
            self.files[0]['file_state'] = 'SUBMITTED'

        self._set_job_source_and_destination(self.files)

        # If reuse is enabled, source and destination SE must be the same for all entries
        # Ignore for multiple replica jobs!
        if job_type == 'Y' and (not self.job['source_se'] or not self.job['dest_se']):
            raise HTTPBadRequest('Reuse jobs can only contain transfers for the same source and destination storage')

        if job_type == 'Y' and (self.job['source_se'] and self.job['dest_se']):
            self.job['job_type'] = 'Y'

        if job_type == 'N' and not self.is_multiple:
            self.job['job_type'] = 'N'

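        # AutoSessionReuse heuristic: if enabled via configuration and the job
        # consists of many small files between a single source and destination,
        # it is promoted to a session reuse job ('Y') automatically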
        auto_session_reuse = pylons.config.get('fts3.AutoSessionReuse', 'false')
        log.debug("AutoSessionReuse is " + str(auto_session_reuse) + ", job_type is " + str(job_type))
        max_reuse_files = int(pylons.config.get('fts3.AutoSessionReuseMaxFiles', 1000))
        max_size_small_file = int(pylons.config.get('fts3.AutoSessionReuseMaxSmallFileSize', 104857600))  # 100MB
        max_size_big_file = int(pylons.config.get('fts3.AutoSessionReuseMaxBigFileSize', 1073741824))  # 1GB
        max_big_files = int(pylons.config.get('fts3.AutoSessionReuseMaxBigFiles', 2))

        if (auto_session_reuse == 'true' and not self.is_multiple and
                self.job['source_se'] and self.job['dest_se'] and
                job_type is None and len(self.files) > 1):
            if len(self.files) > max_reuse_files:
                self.job['job_type'] = 'N'
                log.debug("The number of files " + str(len(self.files)) +
                          " is bigger than the auto maximum reuse files " + str(max_reuse_files))
            else:
                small_files = 0
                big_files = 0
                min_small_files = len(self.files) - max_big_files
                for file in self.files:
                    log.debug(str(file['user_filesize']))
                    if 0 < file['user_filesize'] <= max_size_small_file:
                        small_files += 1
                    elif max_size_small_file < file['user_filesize'] <= max_size_big_file:
                        big_files += 1
                if small_files > min_small_files and big_files <= max_big_files:
                    self.job['job_type'] = 'Y'
                    log.debug("Reuse jobs with " + str(small_files) + " small files up to " +
                              str(len(self.files)) + " total files")
                    # Need to reset their hashed_id so they land on the same machine
                    shared_hashed_id = _generate_hashed_id()
                    for file in self.files:
                        file['hashed_id'] = shared_hashed_id

        if self.job['job_type'] is None:
            self.job['job_type'] = 'N'

    def _populate_deletion(self, deletion_dict):
        """
        Initializes the list of deletions
        """
        self.job = dict(
            job_id=self.job_id,
            job_state='DELETE',
            job_type=None,
            retry=int(self.params['retry']),
            retry_delay=int(self.params['retry_delay']),
            job_params=self.params['gridftp'],
            submit_host=socket.getfqdn(),
            user_dn=None,
            voms_cred=None,
            vo_name=None,
            cred_id=None,
            submit_time=datetime.utcnow(),
            priority=3,
            space_token=self.params['spacetoken'],
            overwrite_flag='N',
            source_space_token=self.params['source_spacetoken'],
            copy_pin_lifetime=-1,
            checksum_method=None,
            bring_online=None,
            job_metadata=self.params['job_metadata'],
            internal_job_params=None,
            max_time_in_queue=self.params['max_time_in_queue']
        )

        if 'credential' in self.params:
            self.job['user_cred'] = self.params['credential']
        elif 'credentials' in self.params:
            self.job['user_cred'] = self.params['credentials']

        shared_hashed_id = _generate_hashed_id()

        # Avoid surl duplication
        unique_surls = []

        for dm in deletion_dict:
            if isinstance(dm, dict):
                entry = dm
            elif isinstance(dm, basestring):
                entry = dict(surl=dm)
            else:
                raise ValueError("Invalid type for the deletion item (%s)" % type(dm))

            surl = urlparse(entry['surl'])
            _validate_url(surl)

            if surl not in unique_surls:
                self.datamanagement.append(dict(
                    job_id=self.job_id,
                    vo_name=None,
                    file_state='DELETE',
                    source_surl=entry['surl'],
                    source_se=get_storage_element(surl),
                    dest_surl=None,
                    dest_se=None,
                    hashed_id=shared_hashed_id,
                    file_metadata=entry.get('metadata', None)
                ))
                unique_surls.append(surl)

        self._set_job_source_and_destination(self.datamanagement)

    def _set_user(self):
        """
        Set the user that triggered the action
        """
        self.job['user_dn'] = self.user.user_dn
        self.job['cred_id'] = self.user.delegation_id
        self.job['voms_cred'] = ' '.join(self.user.voms_cred)
        self.job['vo_name'] = self.user.vos[0]
        for file in self.files:
            file['vo_name'] = self.user.vos[0]
        for dm in self.datamanagement:
            dm['vo_name'] = self.user.vos[0]

    def __init__(self, user, **kwargs):
        """
        Constructor
        """
        try:
            self.user = user
            # Get the job parameters
            self.params = self._get_params(kwargs.pop('params', dict()))

            files_list = kwargs.pop('files', None)
            datamg_list = kwargs.pop('delete', None)

            if files_list is not None and datamg_list is not None:
                raise HTTPBadRequest('Simultaneous transfer and namespace operations not supported')
            if files_list is None and datamg_list is None:
                raise HTTPBadRequest('No transfers or namespace operations specified')
            id_generator = self.params.get('id_generator', 'standard')
            if id_generator == 'deterministic':
                log.debug("Deterministic")
                sid = self.params.get('sid', None)
                if sid is not None:
                    log.info("sid: " + sid)
                    vo_id = uuid.uuid5(BASE_ID, self.user.vos[0])
                    self.job_id = str(uuid.uuid5(vo_id, str(sid)))
                else:
                    raise HTTPBadRequest('Need sid for deterministic job id generation')
            else:
                self.job_id = str(uuid.uuid1())
            self.files = list()
            self.datamanagement = list()

            if files_list is not None:
                self._populate_transfers(files_list)
            elif datamg_list is not None:
                self._populate_deletion(datamg_list)

            self._set_user()

            # Reject for SE banning
            # If any SE does not accept submissions, reject the whole job
            # Update wait_timeout and wait_timestamp if WAIT_AS is set
            if self.files:
                _apply_banning(self.files)
            if self.datamanagement:
                _apply_banning(self.datamanagement)

        except ValueError, e:
            raise HTTPBadRequest('Invalid value within the request: %s' % str(e))
        except TypeError, e:
            raise HTTPBadRequest('Malformed request: %s' % str(e))
        except KeyError, e:
            raise HTTPBadRequest('Missing parameter: %s' % str(e))