submission.py 9.07 KB
Newer Older
Michal Simon's avatar
Michal Simon committed
1
#   Copyright notice:
2
3
#   Copyright CERN, 2014.
#
Michal Simon's avatar
Michal Simon committed
4
5
6
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
7
#
Michal Simon's avatar
Michal Simon committed
8
#       http://www.apache.org/licenses/LICENSE-2.0
9
#
Michal Simon's avatar
Michal Simon committed
10
11
12
13
14
15
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

16
from datetime import timedelta
17
from fts3.rest.client import Submitter
18
from delegate import delegate
19
from fts3.rest.client import ClientError
20

21
22
23
24
25
26

class JobIdGenerator:
    standard = 'standard' #Default algorithm using uuid1
    deterministic = 'deterministic' #Deterministic algorithm using uuid5 with base_id+vo+sid given by the user


27
def cancel(context, job_id, file_ids=None):
28
29
30
31
32
33
34
35
36
37
38
39
    """
    Cancels a job

    Args:
        context: fts3.rest.client.context.Context instance
        job_id:  The job to cancel

    Returns:
        The terminal state in which the job has been left.
        Note that it may not be CANCELED if the job finished already!
    """
    submitter = Submitter(context)
40
    return submitter.cancel(job_id, file_ids)
41
42


43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
def cancel_all(context, vo_name=None):
    """
    Cancel all jobs within a given VO or FTS3 (needs enough privileges)

    Args:
        context: fts3.rest.client.context.Context instance
        vo_name: The VO name, or None to cancell all jobs

    Returns:
        None
    """
    submitter = Submitter(context)
    return submitter.cancel_all(vo_name)


Andrea Manzi's avatar
Andrea Manzi committed
58
def new_transfer(source, destination, checksum='ADLER32', filesize=None, metadata=None, activity=None,selection_strategy='auto'):
59
60
61
62
    """
    Creates a new transfer pair

    Args:
63
64
65
66
67
68
        source:             Source SURL
        destination:        Destination SURL
        checksum:           Checksum
        filesize:           File size
        metadata:           Metadata to bind to the transfer
        selection_strategy: selection Strategy to implement for multiple replica Jobs
69
70
71
72
73
74
75
76
77
78
79
80
81
82

    Returns:
        An initialized transfer
    """
    transfer = dict(
        sources=[source],
        destinations=[destination],
    )
    if checksum:
        transfer['checksum'] = checksum
    if filesize:
        transfer['filesize'] = filesize
    if metadata:
        transfer['metadata'] = metadata
83
84
    if activity:
        transfer['activity'] = activity
85
86
87
    if selection_strategy:
        transfer['selection_strategy'] = selection_strategy

88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
    return transfer


def add_alternative_source(transfer, alt_source):
    """
    Adds an alternative source to a transfer

    Args:
        transfer:   A dictionary created with new_transfer
        alt_source: Alternative source

    Returns:
        For convenience, transfer
    """
    transfer['sources'].push_back(alt_source)
    return transfer


106
def new_job(transfers=None, deletion=None, verify_checksum=False, reuse=None, overwrite=False, multihop=False,
107
108
            source_spacetoken=None, spacetoken=None,
            bring_online=None, copy_pin_lifetime=None,
109
            retry=-1, retry_delay=0, metadata=None, priority=None, strict_copy=False,
110
            max_time_in_queue=None, timeout=None,
111
            id_generator=JobIdGenerator.standard, sid=None, s3alternate=False):
112
113
114
115
116
    """
    Creates a new dictionary representing a job

    Args:
        transfers:         Initial list of transfers
117
        deletion:          Delete files
118
        verify_checksum:   Enable checksum verification: source, destination, both or none
119
120
121
122
123
124
125
126
127
        reuse:             Enable reuse (all transfers are handled by the same process)
        overwrite:         Overwrite the destinations if exist
        multihop:          Treat the transfer as a multihop transfer
        source_spacetoken: Source space token
        spacetoken:        Destination space token
        bring_online:      Bring online timeout
        copy_pin_lifetime: Pin lifetime
        retry:             Number of retries: <0 is no retries, 0 is server default, >0 is whatever value is passed
        metadata:          Metadata to bind to the job
Alejandro Alvarez Ayllon's avatar
Alejandro Alvarez Ayllon committed
128
        priority:          Job priority
129
        max_time_in_queue: Maximum number
130
        id_generator:      Job id generator algorithm
131
        sid:               Specific id given by the client
132
        s3alternate:       Use S3 alternate url schema
133
134
135

    Returns:
        An initialized dictionary representing a job
136
    """
137
138
    if transfers is None and deletion is None:
        raise ClientError('Bad request: No transfers or deletion jobs are provided')
139
140
    if transfers is None:
        transfers = []
141
        
142
    if isinstance(verify_checksum, basestring):
143
144
            if not verify_checksum in ('source','target','both', 'none'):
                raise ClientError('Bad request: verify_checksum does not contain a valid value')
145
146
147
148
149
150
151
152
153
154
    params = dict(
        verify_checksum=verify_checksum,
        reuse=reuse,
        spacetoken=spacetoken,
        bring_online=bring_online,
        copy_pin_lifetime=copy_pin_lifetime,
        job_metadata=metadata,
        source_spacetoken=source_spacetoken,
        overwrite=overwrite,
        multihop=multihop,
155
        retry=retry,
Alejandro Alvarez Ayllon's avatar
Alejandro Alvarez Ayllon committed
156
        retry_delay=retry_delay,
157
        priority=priority,
158
        strict_copy=strict_copy,
159
        max_time_in_queue=max_time_in_queue,
160
161
        timeout=timeout,
        id_generator=id_generator,
162
163
        sid=sid,
        s3alternate=s3alternate
164
165
166
    )
    job = dict(
        files=transfers,
167
        delete=deletion,
168
169
170
171
        params=params
    )
    return job

172
173
def new_staging_job(files, bring_online=None, copy_pin_lifetime=None, source_spacetoken=None,
                    spacetoken=None, metadata=None, priority=None, id_generator=JobIdGenerator.standard, sid=None):
174
175
    """
        Creates a new dictionary representing a staging job
176

177
    Args:
178
        files:  Array of surls to stage. Each item can be either a string or a dictionary with keys surl and metadata
179
180
181
182
183
184
        bring_online:      Bring online timeout
        copy_pin_lifetime: Pin lifetime
        source_spacetoken: Source space token
        spacetoken: Deletion spacetoken
        metadata:   Metadata to bind to the job
        priority:          Job priority
185
186
        id_generator:      Job id generator algorithm
        sid:               Specific id given by the client
187

188
189
190
191
192
    Returns:
        An initialized dictionary representing a staging job
    """
    if bring_online <= 0 and copy_pin_lifetime <= 0:
        raise ClientError('Bad request: bring_online and copy_pin_lifetime are not positive numbers')
193

194
    transfers = []
195
196
197
198
199
200
201
202
203
204
205
    for trans in files:
        if isinstance(trans, dict):
            surl=trans['surl']
            meta=trans['metadata']
        elif isinstance(trans, basestring):
            surl=trans
            meta=None
        else:
            raise AttributeError("Unexpected input type %s"%type(files))

        transfers.append(new_transfer(source=surl, destination=surl, metadata=meta))
206

207
208
209
210
211
212
    params = dict(
        source_spacetoken=source_spacetoken,
        spacetoken=spacetoken,
        bring_online=bring_online,
        copy_pin_lifetime=copy_pin_lifetime,
        job_metadata=metadata,
213
214
215
        priority=priority,
        id_generator=id_generator,
        sid=sid
216
217
218
219
220
221
    )
    job = dict(
       files=transfers,
       params=params
    )
    return job
222

223
def new_delete_job(files, spacetoken=None, metadata=None, priority=None, id_generator=JobIdGenerator.standard, sid=None):
224
225
226
227
228
229
230
    """
    Creates a new dictionary representing a deletion job

    Args:
        files:      Array of surls to delete. Each item can be either a string or a dictionary with keys surl and metadata
        spacetoken: Deletion spacetoken
        metadata:   Metadata to bind to the job
231
232
        id_generator:    Job id generator algorithm
        sid:    Specific id given by the client
233

234
    Return
235
236
237
238
        An initialized dictionary representing a deletion job
    """
    params = dict(
        source_spacetoken=spacetoken,
239
240
241
242
        job_metadata=metadata,
        priority=priority,
        id_generator=id_generator,
        sid=sid
243
244
245
246
247
248
249
250
    )
    job = dict(
        delete=files,
        params=params
    )
    return job


251
def submit(context, job, delegation_lifetime=timedelta(hours=7), force_delegation=False, delegate_when_lifetime_lt=timedelta(hours=2)):
252
253
254
255
256
257
    """
    Submits a job

    Args:
        context: fts3.rest.client.context.Context instance
        job:     Dictionary representing the job
258
259
        delegation_lifetime: Delegation lifetime
        force_delegation:    Force delegation even if there is a valid proxy
260
261
        delegate_when_lifetime_lt: If the remaining lifetime on the delegated proxy is less than this interval,
                  do a new delegation
262
263

    Returns:
264
        The job id
265
    """
266
    delegate(context, delegation_lifetime, force_delegation, delegate_when_lifetime_lt)
267
    submitter = Submitter(context)
268
    params = job.get('params', {})
269
270
271
272
    return submitter.submit(
        transfers=job.get('files', None), delete=job.get('delete', None), staging=job.get('staging', None),
        **params
    )