#   Copyright notice:
#   Copyright  Members of the EMI Collaboration, 2013.
#
#   See www.eu-emi.eu for details on the copyright holders
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

18
from datetime import timedelta
19
20
try:
    import simplejson as json
21
22
except:
    import json
23
24
25
26
import logging
import sys
import time

27
from base import Base
28
from fts3.rest.client import Submitter, Delegator, Inquirer
29

30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# Baseline job parameters. JobSubmitter._build_params starts from a copy of
# this mapping, then overlays the "params" section of a bulk file (if any)
# and finally every command-line value that is not None.
DEFAULT_PARAMS = {
    "checksum": "ADLER32",
    "overwrite": False,
    "reuse": False,
    "job_metadata": None,
    "file_metadata": None,
    "filesize": None,
    "gridftp": None,
    "spacetoken": None,
    "source_spacetoken": None,
    # Single-letter checksum mode; 'n' matches the first letter of the
    # 'none' fallback used by JobSubmitter._prepare_options.
    "verify_checksum": "n",
    "copy_pin_lifetime": -1,
    "bring_online": -1,
    "archive_timeout": -1,
    "timeout": None,
    "fail_nearline": False,
    "retry": 0,
    "multihop": False,
    "credential": None,
    "nostreams": None,
    "s3alternate": False,
    "target_qos": None,
    "ipv4": False,
    "ipv6": False,
}
55

56
57
58
59
60
61
62
def _metadata(data):
    try:
        return json.loads(data)
    except:
        return str(data)


63
class JobSubmitter(Base):
    """
    Command-line front end that submits a transfer job to FTS3.

    A job is described either by positional arguments
    (SOURCE DESTINATION [CHECKSUM]) or by a JSON bulk file given with
    -f/--file. The option parser, parsed options/arguments, logger and
    REST context factory are used through ``self.opt_parser``,
    ``self.options``, ``self.args``, ``self.logger`` and
    ``self._create_context()``, which are expected to come from ``Base``.
    """

    # Blocking mode keeps polling while the job is in one of these states.
    _POLLED_STATES = (
        'SUBMITTED', 'READY', 'STAGING', 'ACTIVE', 'ARCHIVING',
        'QOS_TRANSITION', 'QOS_REQUEST_SUBMITTED',
    )

    def __init__(self):
        """Initialise the base command and register the submission options."""
        super(JobSubmitter, self).__init__(
            extra_args='SOURCE DESTINATION [CHECKSUM]',
            description="""
            This command can be used to submit new jobs to FTS3. It supports simple and bulk submissions. The bulk
            format is as follows:

            ```json
            {
              "files": [
                {
                  "sources": [
                    "gsiftp://source.host/file"
                  ],
                  "destinations": [
                    "gsiftp://destination.host/file"
                  ],
                  "metadata": "file-metadata",
                  "checksum": "ADLER32:1234",
                  "filesize": 1024
                },
                {
                  "sources": [
                    "gsiftp://source.host/file2"
                  ],
                  "destinations": [
                    "gsiftp://destination.host/file2"
                  ],
                  "metadata": "file2-metadata",
                  "checksum": "ADLER32:4321",
                  "filesize": 2048,
                  "activity": "default"
                }
              ]
            }
            ```
            """,
            example="""
            $ %(prog)s -s https://fts3-devel.cern.ch:8446 gsiftp://source.host/file gsiftp://destination.host/file
            Job successfully submitted.
            Job id: 9fee8c1e-c46d-11e3-8299-02163e00a17a

            $ %(prog)s -s https://fts3-devel.cern.ch:8446 -f bulk.json
            Job successfully submitted.
            Job id: 9fee8c1e-c46d-11e3-8299-02163e00a17a
            """
        )

        # Specific options.
        # Flags that feed job parameters deliberately have no default, so an
        # unset flag stays None and does not override the defaults or the
        # parameters coming from a bulk file (see _build_params).
        self.opt_parser.add_option('-b', '--blocking', dest='blocking', default=False, action='store_true',
                                   help='blocking mode. Wait until the operation completes.')
        self.opt_parser.add_option('-i', '--interval', dest='poll_interval', type='int', default=30,
                                   help='interval between two poll operations in blocking mode.')
        self.opt_parser.add_option('-e', '--expire', dest='proxy_lifetime', type='int', default=420,
                                   help='expiration time of the delegation in minutes.')
        self.opt_parser.add_option('--delegate-when-lifetime-lt', type='int', default=120,
                                   help='delegate the proxy when the remote lifetime is less than this value (in minutes)')
        self.opt_parser.add_option('-o', '--overwrite', dest='overwrite', action='store_true',
                                   help='overwrite files.')
        self.opt_parser.add_option('-r', '--reuse', dest='reuse', action='store_true',
                                   help='enable session reuse for the transfer job.')
        self.opt_parser.add_option('--job-metadata', dest='job_metadata',
                                   help='transfer job metadata.')
        self.opt_parser.add_option('--file-metadata', dest='file_metadata',
                                   help='file metadata.')
        self.opt_parser.add_option('--file-size', dest='file_size', type='long',
                                   help='file size (in Bytes)')
        self.opt_parser.add_option('-g', '--gparam', dest='gridftp_params',
                                   help='GridFTP parameters.')
        self.opt_parser.add_option('-t', '--dest-token', dest='destination_token',
                                   help='the destination space token or its description.')
        self.opt_parser.add_option('-S', '--source-token', dest='source_token',
                                   help='the source space token or its description.')
        self.opt_parser.add_option('-K', '--compare-checksum', dest='compare_checksum', default=False, action='store_true',
                                   help='deprecated: compare checksums between source and destination.')
        self.opt_parser.add_option('-C', '--checksum-mode', dest='checksum_mode', type='string',
                                   help='compare checksums in source, target, both or none.')
        self.opt_parser.add_option('--copy-pin-lifetime', dest='pin_lifetime', type='long',
                                   help='pin lifetime of the copy in seconds.')
        self.opt_parser.add_option('--bring-online', dest='bring_online', type='long',
                                   help='bring online timeout in seconds.')
        self.opt_parser.add_option('--archive-timeout', dest='archive_timeout', type='long',
                                   help='archive timeout in seconds.')
        self.opt_parser.add_option('--timeout', dest='timeout', type='long',
                                   help='transfer timeout in seconds.')
        self.opt_parser.add_option('--fail-nearline', dest='fail_nearline', action='store_true',
                                   help='fail the transfer if the file is nearline.')
        self.opt_parser.add_option('--dry-run', dest='dry_run', default=False, action='store_true',
                                   help='do not send anything, just print the JSON message.')
        self.opt_parser.add_option('-f', '--file', dest='bulk_file', type='string',
                                   help='Name of configuration file')
        self.opt_parser.add_option('--retry', dest='retry', type='int',
                                   help='Number of retries. If 0, the server default will be used. '
                                        'If negative, there will be no retries.')
        self.opt_parser.add_option('-m', '--multi-hop', dest='multihop', action='store_true',
                                   help='submit a multihop transfer.')
        self.opt_parser.add_option('--cloud-credentials', dest='cloud_cred',
                                   help='use cloud credentials for the job (i.e. dropbox).')
        self.opt_parser.add_option('--nostreams', dest='nostreams',
                                   help='number of streams')
        self.opt_parser.add_option('--ipv4', dest='ipv4', action='store_true',
                                   help='force ipv4')
        self.opt_parser.add_option('--ipv6', dest='ipv6', action='store_true',
                                   help='force ipv6')
        self.opt_parser.add_option('--s3alternate', dest='s3alternate', action='store_true',
                                   help='use S3 alternate URL')
        self.opt_parser.add_option('--target-qos', dest='target_qos', type='string',
                                   help='define the target QoS for this transfer for CDMI endpoints')

    def validate(self):
        """
        Check the positional arguments and build the submission payload.

        Without a bulk file, exactly two or three positional arguments are
        accepted (source, destination and an optional checksum); otherwise a
        critical message is logged and the process exits with status 1.
        With a bulk file the positional arguments are not inspected.

        Sets ``self.checksum``, ``self.transfers`` and ``self.params`` (and
        ``self.source`` / ``self.destination`` in non-bulk mode), then
        reports an option-parser error if both ipv4 and ipv6 are requested.
        """
        self.checksum = None

        if not self.options.bulk_file:
            nargs = len(self.args)
            if nargs < 2:
                self.logger.critical("Need a source and a destination")
                sys.exit(1)
            if nargs > 3:
                self.logger.critical("Too many parameters")
                sys.exit(1)
            self.source, self.destination = self.args[:2]
            if nargs == 3:
                self.checksum = self.args[2]

        self._prepare_options()
        if self.params['ipv4'] and self.params['ipv6']:
            self.opt_parser.error('ipv4 and ipv6 can not be used at the same time')

    def _read_bulk_file(self):
        """Read the file named by -f/--file and return its decoded JSON content."""
        with open(self.options.bulk_file, 'r') as bulk_fd:
            return json.loads(bulk_fd.read())

    def _build_transfers(self):
        """
        Return the list of transfers to submit.

        In bulk mode this is the value stored under "files" (or, failing
        that, "Files") in the bulk document; if neither key is present a
        critical message is logged and the process exits with status 1.
        Otherwise a single transfer is built from ``self.source`` and
        ``self.destination``.
        """
        if not self.options.bulk_file:
            return [{"sources": [self.source], "destinations": [self.destination]}]

        bulk = self._read_bulk_file()
        for key in ("files", "Files"):
            if key in bulk:
                return bulk[key]

        self.logger.critical("Could not find any transfers")
        sys.exit(1)

    def _build_params(self, **kwargs):
        """
        Assemble the job parameters.

        Layers, lowest to highest priority: DEFAULT_PARAMS, the "params"
        section of the bulk file (when one is used and has it), then every
        keyword argument whose value is not None.

        :param kwargs: command-line derived parameters; None means "not set".
        :return: a new dict with the merged parameters, with 'job_metadata'
                 and 'file_metadata' passed through _metadata().
        """
        params = dict(DEFAULT_PARAMS)

        if self.options.bulk_file:
            bulk = self._read_bulk_file()
            if 'params' in bulk:
                params.update(bulk['params'])

        # Apply command-line parameters (items() works on Python 2 and 3)
        for key, value in kwargs.items():
            if value is not None:
                params[key] = value

        # JSONify metadata
        params['job_metadata'] = _metadata(params['job_metadata'])
        params['file_metadata'] = _metadata(params['file_metadata'])
        return params

    def _prepare_options(self):
        """
        Translate the parsed options into ``self.transfers`` and ``self.params``.

        Expects ``self.checksum`` to be set already (validate() does this).
        """
        # Backwards compatibility: compare_checksum parameter
        # Note: compare_checksum has higher priority than checksum_mode
        if self.options.compare_checksum:
            checksum_mode = 'both'
        elif self.options.checksum_mode:
            checksum_mode = self.options.checksum_mode
        else:
            checksum_mode = 'none'

        self.transfers = self._build_transfers()
        # NOTE(review): verify_checksum is never None here, so it always
        # overrides a 'verify_checksum' set in the bulk file's params --
        # confirm this is intended.
        self.params = self._build_params(
            checksum=self.checksum,
            bring_online=self.options.bring_online,
            archive_timeout=self.options.archive_timeout,
            timeout=self.options.timeout,
            # Only the first letter of the mode is passed on
            verify_checksum=checksum_mode[0],
            spacetoken=self.options.destination_token,
            source_spacetoken=self.options.source_token,
            fail_nearline=self.options.fail_nearline,
            file_metadata=self.options.file_metadata,
            filesize=self.options.file_size,
            gridftp=self.options.gridftp_params,
            job_metadata=self.options.job_metadata,
            overwrite=self.options.overwrite,
            copy_pin_lifetime=self.options.pin_lifetime,
            reuse=self.options.reuse,
            retry=self.options.retry,
            multihop=self.options.multihop,
            credential=self.options.cloud_cred,
            nostreams=self.options.nostreams,
            ipv4=self.options.ipv4,
            ipv6=self.options.ipv6,
            s3alternate=self.options.s3alternate,
            target_qos=self.options.target_qos
        )

    def _do_submit(self, context):
        """
        Delegate credentials if needed, submit the job and optionally wait.

        Delegation is skipped when an access token is configured. In
        blocking mode the job status is polled every ``poll_interval``
        seconds until the job leaves the states in ``_POLLED_STATES``.

        :param context: the REST context handed to Delegator, Submitter and
                        Inquirer.
        :return: the job id returned by the submitter.
        """
        if not self.options.access_token:
            delegator = Delegator(context)
            delegator.delegate(
                timedelta(minutes=self.options.proxy_lifetime),
                delegate_when_lifetime_lt=timedelta(minutes=self.options.delegate_when_lifetime_lt)
            )

        submitter = Submitter(context)
        job_id = submitter.submit(
            transfers=self.transfers,
            params=self.params
        )

        if self.options.json:
            self.logger.info(json.dumps(job_id))
        else:
            self.logger.info("Job successfully submitted.")
            self.logger.info("Job id: %s" % job_id)

        if job_id and self.options.blocking:
            inquirer = Inquirer(context)
            job = inquirer.get_job_status(job_id)
            while job['job_state'] in self._POLLED_STATES:
                self.logger.info("Job in state %s" % job['job_state'])
                time.sleep(self.options.poll_interval)
                job = inquirer.get_job_status(job_id)

            self.logger.info("Job finished with state %s" % job['job_state'])
            if job['reason']:
                self.logger.info("Reason: %s" % job['reason'])

        return job_id

    def _do_dry_run(self, context):
        """
        Print the submission message that would be sent, without sending it.

        :param context: the REST context handed to Submitter.
        :return: None, since no job is created.
        """
        submitter = Submitter(context)
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print(submitter.build_submission(
            transfers=self.transfers,
            params=self.params
        ))
        return None

    def run(self):
        """
        Create the REST context and either submit the job or do a dry run.

        :return: the job id on submission, None for --dry-run.
        """
        context = self._create_context()
        if self.options.dry_run:
            return self._do_dry_run(context)
        return self._do_submit(context)