#   Copyright notice:
#   Copyright CERN, 2015.
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import logging
import random
import socket
import time
import types
import uuid
import pylons
import json

from datetime import datetime
from urlparse import urlparse, parse_qsl, ParseResult
from urllib import urlencode

from fts3.model import File, BannedSE
from fts3rest.lib.base import Session
from fts3rest.lib.http_exceptions import *

from fts3rest.lib.scheduler.schd import Scheduler
from fts3rest.lib.scheduler.db import Database
from fts3rest.lib.scheduler.Cache import ThreadLocalCache


log = logging.getLogger(__name__)

BASE_ID = uuid.UUID('urn:uuid:01874efb-4735-4595-bc9c-591aef8240c9')

DEFAULT_PARAMS = {
    'bring_online': -1,
    'archive_timeout': -1,
    'verify_checksum': False,
    'copy_pin_lifetime': -1,
    'gridftp': '',
    'job_metadata': None,
    'overwrite': False,
    'dst_file_report': False,
    'overwrite_on_retry': False,
    'reuse': None,
    'multihop': False,
    'source_spacetoken': '',
    'spacetoken': '',
    'retry': 0,
    'retry_delay': 0,
    'priority': 3,
    'max_time_in_queue': 0,
    's3alternate': False,
}

def get_base_id():
    return BASE_ID


def get_vo_id(vo_name):
    log.debug("VO name: " + vo_name)
    return uuid.uuid5(BASE_ID, vo_name.encode('utf-8'))

def get_storage_element(uri):
    """
    Returns the storage element of the given uri, which is the scheme +
    hostname without the port

    Args:
        uri: An urlparse instance
    """
    return "%s://%s" % (uri.scheme, uri.hostname)
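# Illustrative example (hypothetical URL, not from the original code):
#   get_storage_element(urlparse('gsiftp://storage.example.org:2811/path/file'))
#   returns 'gsiftp://storage.example.org' (the port is dropped).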

def _is_dest_surl_uuid_enabled(vo_name):
    """
    Returns True if the given vo_name allows dest_surl_uuid.

    Args:
        vo_name: Name of the vo
    """
    list_of_vos = pylons.config.get('fts3.CheckDuplicates', 'None')
    if not list_of_vos:
        return False
    if vo_name in list_of_vos or "*" in list_of_vos:
        return True
    return False
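# Illustrative behaviour (assuming fts3.CheckDuplicates holds a plain string such as
# 'atlas;cms'): the membership checks above are substring tests, so 'atlas' or '*'
# anywhere in the configured value enables dest_surl_uuid generation for that VO.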

def _validate_url(url):
    """
    Validates the format and content of the url
    """
    if not url.scheme:
        raise ValueError('Missing scheme (%s)' % url.geturl())
    if url.scheme == 'file':
        raise ValueError('Can not transfer local files (%s)' % url.geturl())
    if not url.path or (url.path == '/' and not url.query):
        raise ValueError('Missing path (%s)' % url.geturl())
    if not url.hostname:
        raise ValueError('Missing host (%s)' % url.geturl())
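# Illustrative examples (hypothetical URLs): _validate_url(urlparse('file:///tmp/data'))
# raises ValueError('Can not transfer local files ...'), while a URL without a path,
# e.g. 'gsiftp://storage.example.org', fails the "Missing path" check.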

def _metadata(data):
    if isinstance(data, dict):
        return data
    try:
        return json.loads(data)
    except:
        return {"label": str(data)}
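# Illustrative examples: _metadata({'a': 1}) returns the dict unchanged,
# _metadata('{"a": 1}') parses the JSON string, and a non-JSON string such as
# 'my-label' becomes {"label": "my-label"}.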


def _safe_flag(flag):
    """
    Translates different representations of flag values to True/False
    True/False => True/False
    1/0 => True/False
    'Y'/'N' => True/False
    """
    if isinstance(flag, types.StringType) or isinstance(flag, types.UnicodeType):
        return len(flag) > 0 and flag[0].upper() == 'Y'
    else:
        return bool(flag)
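# Illustrative examples: _safe_flag('Y') and _safe_flag('yes') return True,
# _safe_flag('N') and _safe_flag('') return False, while non-string values fall
# back to bool(), e.g. _safe_flag(1) is True and _safe_flag(0) is False.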


def _safe_filesize(size):
    if isinstance(size, float):
        return size
    elif size is None:
        return 0.0
    else:
        return float(size)


def _generate_hashed_id():
    """
    Generates a uniformly distributed value between 0 and 2**16 - 1
    This is intended to split the load evenly across nodes

    The name is an unfortunate legacy from when this used to
    be based on a hash of the job
    """
    return random.randint(0, (2 ** 16) - 1)
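# Descriptive note (not in the original): the returned value, e.g. 41273, is stored as
# the transfer's hashed_id; FTS hosts are assumed to pick up work by ranges of this
# [0, 65535] space, which is what spreads the load evenly across the cluster.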


def _has_multiple_options(files):
    """
    Returns a tuple (Boolean, Integer)
    Boolean is True if there are multiple replica entries, and Integer
    holds the number of unique files.
    """
    ids = map(lambda f: f['file_index'], files)
    id_count = len(ids)
    unique_id_count = len(set(ids))
    return unique_id_count != id_count, unique_id_count
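# Illustrative example: for three entries with file_index values [0, 0, 1]
# (two replicas of file 0 plus file 1), this returns (True, 2): there are
# duplicated indexes and two unique files.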


def _select_best_replica(files, vo_name, entry_state, strategy):

    dst = files[0]['dest_se']
    activity = files[0]['activity']
    user_filesize = files[0]['user_filesize']

    queue_provider = Database(Session)
    cache_provider = ThreadLocalCache(queue_provider)
    # s = Scheduler(queue_provider)
    s = Scheduler(cache_provider)
    source_se_list = map(lambda f: f['source_se'], files)

    if strategy == "orderly":
        sorted_ses = source_se_list

    elif strategy == "queue" or strategy == "auto":
        sorted_ses = map(lambda x: x[0], s.rank_submitted(source_se_list,
                                                          dst,
                                                          vo_name))

    elif strategy == "success":
        sorted_ses = map(lambda x: x[0], s.rank_success_rate(source_se_list,
                                                             dst))

    elif strategy == "throughput":
        sorted_ses = map(lambda x: x[0], s.rank_throughput(source_se_list,
                                                           dst))

    elif strategy == "file-throughput":
        sorted_ses = map(lambda x: x[0], s.rank_per_file_throughput(
            source_se_list,
            dst))

    elif strategy == "pending-data":
        sorted_ses = map(lambda x: x[0], s.rank_pending_data(source_se_list,
                                                             dst,
                                                             vo_name,
                                                             activity))

    elif strategy == "waiting-time":
        sorted_ses = map(lambda x: x[0], s.rank_waiting_time(source_se_list,
                                                             dst,
                                                             vo_name,
                                                             activity))

    elif strategy == "waiting-time-with-error":
        sorted_ses = map(lambda x: x[0], s.rank_waiting_time_with_error(
            source_se_list,
            dst,
            vo_name,
            activity))

    elif strategy == "duration":
        sorted_ses = map(lambda x: x[0], s.rank_finish_time(source_se_list,
                                                            dst,
                                                            vo_name,
                                                            activity,
                                                            user_filesize))
    else:
        raise HTTPBadRequest(strategy + " algorithm is not supported by Scheduler")

    # We got the storages sorted from best to worst following
    # the chosen strategy.
    # We need to find the file with the source matching that best_se
    best_index = 0
    best_se = sorted_ses[0]
    for index, transfer in enumerate(files):
        if transfer['source_se'] == best_se:
            best_index = index
            break

    files[best_index]['file_state'] = entry_state
    if _is_dest_surl_uuid_enabled(vo_name):
        files[best_index]['dest_surl_uuid'] = str(uuid.uuid5(BASE_ID, files[best_index]['dest_surl'].encode('utf-8')))
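# Illustrative behaviour: with the 'orderly' strategy the replicas keep their
# submission order, so the entry whose source_se matches the first source listed is
# flipped to entry_state; the remaining replicas keep the NOT_USED state assigned
# in _populate_files.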


def _apply_banning(files):
    """
    Query the banning information for all pairs, reject the job
    as soon as one SE can not submit.
    Update wait_timeout and wait_timestamp if there is a hit
    """
    # Usually, banned SEs will be in the order of ~100 max
    # Files may be several thousands
    # We get all banned SEs in memory so we avoid querying the DB too many times
    # We then build a dictionary to make lookups easy
    banned_ses = dict()
    for b in Session.query(BannedSE):
        banned_ses[str(b.se)] = (b.vo, b.status)

    for f in files:
        source_banned = banned_ses.get(str(f['source_se']), None)
        dest_banned = banned_ses.get(str(f['dest_se']), None)
        banned = False

        if source_banned and (source_banned[0] == f['vo_name'] or source_banned[0] == '*'):
            if source_banned[1] != 'WAIT_AS':
                raise HTTPForbidden("%s is banned" % f['source_se'])
            banned = True

        if dest_banned and (dest_banned[0] == f['vo_name'] or dest_banned[0] == '*'):
            if dest_banned[1] != 'WAIT_AS':
                raise HTTPForbidden("%s is banned" % f['dest_se'])
            banned = True

        if banned:
            if f['file_state'] == 'SUBMITTED':
                f['file_state'] = 'ON_HOLD'
            elif f['file_state'] == 'STAGING':
                f['file_state'] = 'ON_HOLD_STAGING'
            elif f['file_state'] == 'DELETE':
                continue
            else:
                raise HTTPInternalServerError('Unexpected initial state: %s' % f['file_state'])
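# Illustrative behaviour: if a BannedSE row matches the file's VO (or '*') with
# status 'WAIT_AS', a SUBMITTED transfer is parked as ON_HOLD instead of being
# rejected; any other ban status makes the whole submission fail with 403 Forbidden.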


def _seconds_from_value(value):
    """
    Transform an interval value to seconds
    If value is an integer, assume it is hours (backwards compatibility)
    Otherwise, look at the suffix
    """
    if isinstance(value, int) and value != 0:
        return value * 3600
    elif not isinstance(value, basestring):
        return None

    try:
        suffix = value[-1].lower()
        value = value[:-1]
        if suffix == 's':
            return int(value)
        elif suffix == 'm':
            return int(value) * 60
        elif suffix == 'h':
            return int(value) * 3600
        else:
            return None
    except:
        return None
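# Illustrative examples: _seconds_from_value(2) -> 7200 (plain integers are hours),
# _seconds_from_value('30m') -> 1800, _seconds_from_value('45s') -> 45, and an
# unrecognised suffix such as '4d' returns None.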


class JobBuilder(object):
    """
    From a dictionary, build the internal representation of Job, Files and
    Data Management
    """

    def _get_params(self, submitted_params):
        """
        Returns proper parameters applying defaults and user values
        """
        params = dict()
        params.update(DEFAULT_PARAMS)
        params.update(submitted_params)
        # Some parameters may be explicitly None to pick the default, so re-apply defaults
        for k, v in params.iteritems():
            if v is None and k in DEFAULT_PARAMS:
                params[k] = DEFAULT_PARAMS[k]

        # Enforce JSON type for 'job_metadata'
        if params['job_metadata'] is not None:
            params['job_metadata'] = _metadata(params['job_metadata'])

        return params
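    # Illustrative behaviour: submitting params={'priority': None, 'overwrite': True}
    # yields priority=3 (an explicit None falls back to the default) while overwrite
    # keeps the user-supplied True.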

    def _build_internal_job_params(self):
        """
        Generates the value for job.internal_job_params depending on the
        received protocol parameters
        """
        param_list = list()
        if self.params.get('nostreams', None):
            param_list.append("nostreams:%d" % int(self.params['nostreams']))
        if self.params.get('timeout', None):
            param_list.append("timeout:%d" % int(self.params['timeout']))
        if self.params.get('buffer_size', None):
            param_list.append("buffersize:%d" % int(self.params['buffer_size']))
        if self.params.get('strict_copy', False):
            param_list.append("strict")
        if self.params.get('ipv4', False):
            param_list.append('ipv4')
        elif self.params.get('ipv6', False):
            param_list.append('ipv6')
        if self.params.get('s3alternate', False):
            param_list.append('s3alternate')

        if len(param_list) == 0:
            return None
        else:
            return ','.join(param_list)
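    # Illustrative result: a submission with nostreams=4, timeout=3600 and ipv6=True
    # would produce the internal_job_params string 'nostreams:4,timeout:3600,ipv6'.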

    def _set_job_source_and_destination(self, entries):
        """
        Iterates through the files that belong to the job, and determines the
        'overall' job source and destination Storage Elements
        """
        # Multihop
        if self.job['job_type'] == 'H':
            self.job['source_se'] = entries[0]['source_se']
            self.job['dest_se'] = entries[-1]['dest_se']
        # Regular transfers
        else:
            self.job['source_se'] = entries[0]['source_se']
            self.job['dest_se'] = entries[0]['dest_se']
            for elem in entries:
                if elem['source_se'] != self.job['source_se']:
                    self.job['source_se'] = None
                if elem['dest_se'] != self.job['dest_se']:
                    self.job['dest_se'] = None
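    # Illustrative behaviour: for a multihop job A -> B -> C the job-level pair is
    # (A, C); for a regular job whose entries mix several storages, the ambiguous
    # side is left as None.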

    def _set_activity_query_string(self, url, file_dict):
        """
        Set the activity query string in the given url
        """
        query_p = parse_qsl(url.query)
        query_p.append(('activity', file_dict.get('activity', 'default')))
        query_str = urlencode(query_p)
        return ParseResult(scheme=url.scheme,
                           netloc=url.netloc,
                           path=url.path,
                           params=url.params,
                           query=query_str,
                           fragment=url.fragment)
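    # Illustrative example (hypothetical URL): for 'root://eos.example.org//f?x=1'
    # and activity 'default', the returned URL carries the query 'x=1&activity=default'.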
        
    def _populate_files(self, file_dict, f_index, shared_hashed_id):
        """
        From the dictionary file_dict, generate a list of transfers for a job
        """
        # Extract matching pairs
        pairs = []
        for source in file_dict['sources']:
            source_url = urlparse(source.strip())
            _validate_url(source_url)
            for destination in file_dict['destinations']:
                dest_url = urlparse(destination.strip())
                _validate_url(dest_url)
                pairs.append((source_url, dest_url))

        # Create one File entry per matching pair
        if self.is_bringonline:
            initial_file_state = 'STAGING'
        elif self.is_qos_cdmi_transfer:
            initial_file_state = 'QOS_TRANSITION'
        else:
            initial_file_state = 'SUBMITTED'

        # Multiple replica job or multihop? Then, the initial state is NOT_USED
        if len(file_dict['sources']) > 1 or self.params['multihop']:
            #if self.is_bringonline:
                #set the first as STAGING and the rest as 'NOT_USED'
                #staging_and_multihop = True
                #raise HTTPBadRequest('Staging with multiple replicas is not allowed')
            # On multiple replica job, we mark all files initially with NOT_USED
            initial_file_state = 'NOT_USED'
            # Multiple replicas, all must share the hashed-id
            if shared_hashed_id is None:
                shared_hashed_id = _generate_hashed_id()
        vo_name = self.user.vos[0]

        for source, destination in pairs:
            if len(file_dict['sources']) > 1 or not _is_dest_surl_uuid_enabled(vo_name):
                dest_uuid = None
            else:
                dest_uuid = str(uuid.uuid5(BASE_ID, destination.geturl().encode('utf-8')))
            if self.is_bringonline:
                # add the new query parameter only for root -> EOS-CTA for now
                if source.scheme == "root":
                    if source == destination:
                        destination = self._set_activity_query_string(destination, file_dict)
                    source = self._set_activity_query_string(source, file_dict)

            f = dict(
                job_id=self.job_id,
                file_index=f_index,
                dest_surl_uuid=dest_uuid,
                file_state=initial_file_state,
                source_surl=source.geturl(),
                dest_surl=destination.geturl(),
                source_se=get_storage_element(source),
                dest_se=get_storage_element(destination),
                vo_name=None,
                priority=self.job['priority'],
                user_filesize=_safe_filesize(file_dict.get('filesize', 0)),
                selection_strategy=file_dict.get('selection_strategy', 'auto'),
                checksum=file_dict.get('checksum', None),
                file_metadata=file_dict.get('metadata', None),
                activity=file_dict.get('activity', 'default'),
                hashed_id=shared_hashed_id if shared_hashed_id else _generate_hashed_id()
            )
            if f['file_metadata'] is not None:
                f['file_metadata'] = _metadata(f['file_metadata'])
            self.files.append(f)

    def _apply_selection_strategy(self):
        """
        On multiple-replica jobs, select the adequate file to go active
        """
        entry_state = "STAGING" if self.is_bringonline else "SUBMITTED"
        _select_best_replica(self.files, self.user.vos[0], entry_state,
                             self.files[0].get('selection_strategy', 'auto'))

    def _populate_transfers(self, files_list):
        """
        Initializes the list of transfers
        """
        job_type = None
        log.debug("job_type=" + str(job_type) + " reuse=" + str(self.params['reuse']))

        if self.params['multihop']:
            job_type = 'H'
        elif self.params['reuse'] is not None:
            if _safe_flag(self.params['reuse']):
                job_type = 'Y'
            else:
                job_type = 'N'
        log.debug("job_type=" + str(job_type))

        self.is_bringonline = self.params['copy_pin_lifetime'] > 0 or self.params['bring_online'] > 0

        self.is_qos_cdmi_transfer = self.params.get('target_qos', None) is not None

        if self.is_bringonline:
            job_initial_state = 'STAGING'
        elif self.is_qos_cdmi_transfer:
            job_initial_state = 'QOS_TRANSITION'
        else:
            job_initial_state = 'SUBMITTED'

        max_time_in_queue = _seconds_from_value(self.params.get('max_time_in_queue', None))
        expiration_time = None
        if max_time_in_queue is not None:
            expiration_time = time.time() + max_time_in_queue

        if self.params['overwrite']:
            overwrite_flag = 'Y'
        elif self.params['overwrite_on_retry']:
            overwrite_flag = 'R'
        else:
            overwrite_flag = False

        self.job = dict(
            job_id=self.job_id,
            job_state=job_initial_state,
            job_type=job_type,
            retry=int(self.params['retry']),
            retry_delay=int(self.params['retry_delay']),
            job_params=self.params['gridftp'],
            submit_host=socket.getfqdn(),
            user_dn=None,
            voms_cred=None,
            vo_name=None,
            cred_id=None,
            submit_time=datetime.utcnow(),
            priority=max(min(int(self.params['priority']), 5), 1),
            space_token=self.params['spacetoken'],
            dst_file_report=_safe_flag(self.params['dst_file_report']),
            overwrite_flag=overwrite_flag,
            source_space_token=self.params['source_spacetoken'],
            copy_pin_lifetime=int(self.params['copy_pin_lifetime']),
            checksum_method=self.params['verify_checksum'],
            bring_online=self.params['bring_online'],
            archive_timeout=self.params['archive_timeout'],
            job_metadata=self.params['job_metadata'],
            internal_job_params=self._build_internal_job_params(),
            max_time_in_queue=expiration_time,
            target_qos=self.params.get('target_qos', None)
        )

        if 'credential' in self.params:
            self.job['user_cred'] = self.params['credential']
        elif 'credentials' in self.params:
            self.job['user_cred'] = self.params['credentials']

        # If reuse is enabled, or it is a bring online job, generate one single "hash" for all files
        if job_type in ('H', 'Y') or self.is_bringonline:
            shared_hashed_id = _generate_hashed_id()
        else:
            shared_hashed_id = None

        # Files
        f_index = 0
        for file_dict in files_list:
            self._populate_files(file_dict, f_index, shared_hashed_id)
            f_index += 1

        if len(self.files) == 0:
            raise HTTPBadRequest('No valid pairs available')

        # If checksums are provided but no checksum verification was requested, fall
        # back to 'target' comparison (not nice, but needed to keep functionality!).
        # By default all files get the ADLER32 checksum type.
        has_checksum = False
        for file_dict in self.files:
            if file_dict['checksum'] is not None:
                has_checksum = len(file_dict['checksum']) > 0
            else:
                file_dict['checksum'] = 'ADLER32'

        if type(self.job['checksum_method']) == bool:
            if not self.job['checksum_method'] and has_checksum:
                self.job['checksum_method'] = 'target'
            else:
                if not self.job['checksum_method'] and not has_checksum:
                    self.job['checksum_method'] = 'none'
                else: 
                    self.job['checksum_method'] = 'both'

        self.job['checksum_method'] = self.job['checksum_method'][0]
        # Validate that, if this is a multiple replica job, there is one single unique file
        self.is_multiple, unique_files = _has_multiple_options(self.files)
        if self.is_multiple:
            # Multiple replicas can not use the reuse flag, nor multihop
            if job_type in ('H', 'Y'):
                raise HTTPBadRequest('Can not specify reuse and multiple replicas at the same time')
            # Only one unique file per multiple-replica job
            if unique_files > 1:
                raise HTTPBadRequest('Multiple replicas jobs can only have one unique file')
            self.job['job_type'] = 'R'
            # Apply selection strategy
            self._apply_selection_strategy()
        # For multihop + staging mark the first as STAGING
        elif self.params['multihop'] and self.is_bringonline:
            self.files[0]['file_state'] = 'STAGING'
        # For multihop, mark the first as SUBMITTED
        elif self.params['multihop']:
            self.files[0]['file_state'] = 'SUBMITTED'

        self._set_job_source_and_destination(self.files)

        # If reuse is enabled, source and destination SE must be the same for all entries
        # Ignore for multiple replica jobs!
        min_reuse_files = int(pylons.config.get('fts3.SessionReuseMinFiles', 5))
        if job_type == 'Y' and (not self.job['source_se'] or not self.job['dest_se']):
            raise HTTPBadRequest('Reuse jobs can only contain transfers for the same source and destination storage')

        if job_type == 'Y' and (self.job['source_se'] and self.job['dest_se']) and len(self.files) > min_reuse_files:
            self.job['job_type'] = 'Y'

        if job_type == 'N' and not self.is_multiple:
            self.job['job_type'] = 'N'

        auto_session_reuse = pylons.config.get('fts3.AutoSessionReuse', 'false')
        log.debug("AutoSessionReuse=" + str(auto_session_reuse) + " job_type=" + str(job_type))
        max_reuse_files = int(pylons.config.get('fts3.AutoSessionReuseMaxFiles', 1000))
        max_size_small_file = int(pylons.config.get('fts3.AutoSessionReuseMaxSmallFileSize', 104857600))  # 100MB
        max_size_big_file = int(pylons.config.get('fts3.AutoSessionReuseMaxBigFileSize', 1073741824))  # 1GB
        max_big_files = int(pylons.config.get('fts3.AutoSessionReuseMaxBigFiles', 2))

        if auto_session_reuse == 'true' and not self.is_multiple and not self.is_bringonline and len(self.files) > min_reuse_files:
            if self.job['source_se'] and self.job['dest_se'] and job_type is None and len(self.files) > 1:
                if len(self.files) > max_reuse_files:
                    self.job['job_type'] = 'N'
                    log.debug("The number of files " + str(len(self.files)) + " is bigger than the auto maximum reuse files " + str(max_reuse_files))
                else:
                    small_files = 0
                    big_files = 0
                    min_small_files = len(self.files) - max_big_files
                    for file in self.files:
                        log.debug(str(file['user_filesize']))
                        if file['user_filesize'] <= max_size_small_file and file['user_filesize'] > 0:
                            small_files += 1
                        else:
                            if file['user_filesize'] > max_size_small_file and file['user_filesize'] <= max_size_big_file:
                                big_files += 1
                    if small_files > min_small_files and big_files <= max_big_files:
                        self.job['job_type'] = 'Y'
                        log.debug("Reuse jobs with " + str(small_files) + " small files up to " + str(len(self.files)) + " total files")
                        # Need to reset their hashed_id so they land on the same machine
                        shared_hashed_id = _generate_hashed_id()
                        for file in self.files:
                            file['hashed_id'] = shared_hashed_id
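        # Illustrative outcome (with the default thresholds above): 50 files of ~10MB
        # each between a single source/destination pair would flip job_type to 'Y',
        # while a job containing more than 2 files above 100MB stays non-reuse.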

        if self.job['job_type'] is None:
            self.job['job_type'] = 'N'

    def _populate_deletion(self, deletion_dict):
        """
        Initializes the list of deletions
        """
        self.job = dict(
            job_id=self.job_id,
            job_state='DELETE',
            job_type=None,
            retry=int(self.params['retry']),
            retry_delay=int(self.params['retry_delay']),
            job_params=self.params['gridftp'],
            submit_host=socket.getfqdn(),
            user_dn=None,
            voms_cred=None,
            vo_name=None,
            cred_id=None,
            submit_time=datetime.utcnow(),
            priority=3,
            space_token=self.params['spacetoken'],
            overwrite_flag='N',
            dst_file_report='N',
            source_space_token=self.params['source_spacetoken'],
            copy_pin_lifetime=-1,
            checksum_method=None,
            bring_online=None,
            archive_timeout=None,
            job_metadata=self.params['job_metadata'],
            internal_job_params=None,
            max_time_in_queue=self.params['max_time_in_queue']
        )

        if 'credential' in self.params:
            self.job['user_cred'] = self.params['credential']
        elif 'credentials' in self.params:
            self.job['user_cred'] = self.params['credentials']

        shared_hashed_id = _generate_hashed_id()

        # Avoid surl duplication
        unique_surls = []

        for dm in deletion_dict:
            if isinstance(dm, dict):
                entry = dm
            elif isinstance(dm, str) or isinstance(dm, unicode):
                entry = dict(surl=dm)
            else:
                raise ValueError("Invalid type for the deletion item (%s)" % type(dm))

            surl = urlparse(entry['surl'])
            _validate_url(surl)

            if surl not in unique_surls:
                self.datamanagement.append(dict(
                    job_id=self.job_id,
                    vo_name=None,
                    file_state='DELETE',
                    source_surl=entry['surl'],
                    source_se=get_storage_element(surl),
                    dest_surl=None,
                    dest_se=None,
                    hashed_id=shared_hashed_id,
                    file_metadata=entry.get('metadata', None)
                ))
                unique_surls.append(surl)

        self._set_job_source_and_destination(self.datamanagement)

    def _set_user(self):
        """
        Set the user that triggered the action
        """
        self.job['user_dn'] = self.user.user_dn
        self.job['cred_id'] = self.user.delegation_id
        self.job['voms_cred'] = ' '.join(self.user.voms_cred)
        self.job['vo_name'] = self.user.vos[0]
        for file in self.files:
            file['vo_name'] = self.user.vos[0]
        for dm in self.datamanagement:
            dm['vo_name'] = self.user.vos[0]

    def _add_auth_method_on_job_metadata(self):
        if self.params['job_metadata'] is not None and self.params['job_metadata'] != 'None':
            self.params['job_metadata'].update({"auth_method": self.user.method})
        else:
            self.params['job_metadata'] = {"auth_method": self.user.method}
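    # Illustrative result: after this call the job metadata carries, for example,
    # {"auth_method": "certificate", ...}; the exact value comes from the
    # authentication layer (self.user.method).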

    def __init__(self, user, **kwargs):
        """
        Constructor
        """
        try:
            self.user = user
            # Get the job parameters
            self.params = self._get_params(kwargs.pop('params', dict()))

            # Update auth method used
            self._add_auth_method_on_job_metadata()

            files_list = kwargs.pop('files', None)
            datamg_list = kwargs.pop('delete', None)

            if files_list is not None and datamg_list is not None:
                raise HTTPBadRequest('Simultaneous transfer and namespace operations not supported')
            if files_list is None and datamg_list is None:
                raise HTTPBadRequest('No transfers or namespace operations specified')
            id_generator = self.params.get('id_generator', 'standard')
            if id_generator == 'deterministic':
                log.debug("Deterministic")
                sid = self.params.get('sid', None)
                if sid is not None:
                    log.info("sid: " + sid)
                    vo_id = uuid.uuid5(BASE_ID, self.user.vos[0])
                    self.job_id = str(uuid.uuid5(vo_id, str(sid)))
                else:
                    raise HTTPBadRequest('Need sid for deterministic job id generation')
            else:
                self.job_id = str(uuid.uuid1())
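            # Illustrative behaviour: with id_generator='deterministic' and a
            # hypothetical sid such as 'my-transfer-001', the same VO always maps to
            # the same job_id (uuid5 is deterministic); presumably this lets clients
            # resubmit without creating duplicate jobs.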
            self.files = list()
            self.datamanagement = list()

            if files_list is not None:
                self._populate_transfers(files_list)
            elif datamg_list is not None:
                self._populate_deletion(datamg_list)

            self._set_user()

            # Reject for SE banning
            # If any SE does not accept submissions, reject the whole job
            # Update wait_timeout and wait_timestamp if WAIT_AS is set
            if self.files:
                _apply_banning(self.files)
            if self.datamanagement:
                _apply_banning(self.datamanagement)

        except ValueError, e:
            raise HTTPBadRequest('Invalid value within the request: %s' % str(e))
        except TypeError, e:
            raise HTTPBadRequest('Malformed request: %s' % str(e))
        except KeyError, e:
            raise HTTPBadRequest('Missing parameter: %s' % str(e))