# banning.py 14.4 KB (repository viewer header: Newer / Older)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
#   Copyright notice:
#   Copyright CERN, 2014.
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
15
from fts3rest.controllers.config import audit_configuration
16

17
18
19
20
# Prefer simplejson when it is installed, fall back to the standard library.
# Only a failed import should trigger the fallback; a bare except would also
# hide unrelated errors (including KeyboardInterrupt/SystemExit).
try:
    import simplejson as json
except ImportError:
    import json
21
import logging
22
23
from datetime import datetime
from pylons import request
24
from sqlalchemy import distinct, func, and_
25
26
27
28
29
30
31
32
33

from fts3.model import BannedDN, BannedSE, Job, File, JobActiveStates, FileActiveStates
from fts3rest.lib.api import doc
from fts3rest.lib.base import BaseController, Session
from fts3rest.lib.helpers import jsonify
from fts3rest.lib.http_exceptions import *
from fts3rest.lib.middleware.fts3auth import authorize
from fts3rest.lib.middleware.fts3auth.constants import *

34
35
# Module-level logger, named after this module
log = logging.getLogger(__name__)

36

37
def _ban_se(storage, vo_name, allow_submit, status, message):
    """
    Mark in the db the given storage as banned

    :param storage:      Storage element to ban
    :param vo_name:      VO stored together with the ban
    :param allow_submit: If true and status is 'WAIT', the ban is stored as 'WAIT_AS'
    :param status:       Requested status; stored unchanged unless the case above applies
    :param message:      Explanatory message stored with the ban
    :raises Exception:   Whatever the session raises; the session is rolled back first
    """
    user = request.environ['fts3.User.Credentials']
    banned = BannedSE()
    banned.se = storage
    banned.addition_time = datetime.utcnow()
    banned.admin_dn = user.user_dn
    banned.vo = vo_name
    banned.message = message
    if allow_submit and status == 'WAIT':
        banned.status = 'WAIT_AS'
    else:
        banned.status = status
    try:
        # merge() rather than add(): the instance is reconciled with whatever the
        # session already knows under the same primary key
        Session.merge(banned)
        Session.commit()
    except Exception:
        Session.rollback()
        raise
58
59


60
def _ban_dn(dn, message):
    """
    Mark in the db the given DN as banned

    :param dn:         User DN to ban
    :param message:    Explanatory message stored with the ban
    :raises Exception: Whatever the session raises; the session is rolled back first
    """
    user = request.environ['fts3.User.Credentials']
    banned = BannedDN()
    banned.dn = dn
    banned.addition_time = datetime.utcnow()
    banned.admin_dn = user.user_dn
    banned.message = message
    try:
        # merge() rather than add(): the instance is reconciled with whatever the
        # session already knows under the same primary key
        Session.merge(banned)
        Session.commit()
    except Exception:
        Session.rollback()
        raise
76
77
78
79
80
81
82
83
84


def _cancel_transfers(storage=None, vo_name=None):
    """
    Cancels the transfers that have the given storage either in source or destination,
    and belong to the given VO.
    Returns the set of affected jobs ids.

    Files in any of FileActiveStates, or in NOT_USED, are set to CANCELED. For every
    canceled file, a NOT_USED file with the same job_id and file_index is set to
    SUBMITTED. Afterwards, each affected job whose files are all CANCELED, FINISHED
    or FAILED is marked as CANCELED.

    :param storage:    Storage element, compared against source_se and dest_se
    :param vo_name:    If set and different from '*', only files of this VO are touched
    :return:           Set with the job ids owning the canceled files
    :raises Exception: Whatever the session raises; the session is rolled back first
    """
    affected_job_ids = set()

    # job_id and file_index are fetched together with file_id, so there is no need
    # for an extra query per file inside the loop
    files = Session.query(File.file_id, File.job_id, File.file_index).filter(
        and_(
            (File.source_se == storage) | (File.dest_se == storage),
            File.file_state.in_(FileActiveStates + ['NOT_USED'])
        )
    )
    if vo_name and vo_name != '*':
        files = files.filter(File.vo_name == vo_name)

    now = datetime.utcnow()

    # LIMIT is only appended to the UPDATE for MySQL
    # NOTE(review): presumably because other backends reject UPDATE ... LIMIT - confirm
    if Session.bind.dialect.name == 'mysql':
        limit = " LIMIT 1"
    else:
        limit = ''

    try:
        for file_id, job_id, file_index in files.all():
            affected_job_ids.add(job_id)
            # Cancel the affected file
            Session.query(File).filter(File.file_id == file_id)\
                .update({
                    'file_state': 'CANCELED', 'reason': 'Storage banned',
                    'finish_time': now
                }, synchronize_session=False)
            # If there are alternatives, enable them
            Session.execute(
                "UPDATE t_file SET"
                "   file_state = 'SUBMITTED' "
                "WHERE"
                "  job_id = :job_id AND file_index = :file_index AND file_state = 'NOT_USED' " + limit,
                dict(job_id=job_id, file_index=file_index)
            )

        Session.commit()
        # The bulk updates bypassed the session, so drop any cached state
        Session.expire_all()
    except Exception:
        Session.rollback()
        raise

    # Set each job terminal state if needed
    try:
        for job_id in affected_job_ids:
            n_files = Session.query(func.count(distinct(File.file_id)))\
                .filter(File.job_id == job_id).scalar()
            # A file is in exactly one state, so one IN (...) count equals the sum
            # of the separate CANCELED / FINISHED / FAILED counts
            n_terminal = Session.query(func.count(distinct(File.file_id)))\
                .filter(File.job_id == job_id)\
                .filter(File.file_state.in_(['CANCELED', 'FINISHED', 'FAILED'])).scalar()

            # Job finished!
            if n_terminal == n_files:
                Session.query(Job).filter(Job.job_id == job_id).update({
                    'job_state': 'CANCELED',
                    'job_finished': now,
                    'reason': None
                })

        Session.commit()
    except Exception:
        Session.rollback()
        raise
    return affected_job_ids


def _cancel_jobs(dn):
    """
    Cancel all jobs that belong to dn.
    Returns the list of affected jobs ids.

    Only jobs in one of JobActiveStates and without job_finished set are selected.
    Their files in one of FileActiveStates, and the jobs themselves, are set to
    CANCELED with reason 'User banned'.

    :param dn:         User DN owning the jobs
    :return:           List with the ids of the canceled jobs
    :raises Exception: Whatever the session raises; the session is rolled back first
    """
    jobs = Session.query(Job.job_id).filter(
        # '== None' is intentional: SQLAlchemy renders it as IS NULL
        Job.job_state.in_(JobActiveStates), Job.user_dn == dn, Job.job_finished == None  # noqa: E711
    )
    # A real list: it is iterated below and then returned to a caller that takes len()
    job_ids = [row[0] for row in jobs]

    try:
        now = datetime.utcnow()
        for job_id in job_ids:
            Session.query(File).filter(File.job_id == job_id).filter(File.file_state.in_(FileActiveStates))\
                .update({
                    'file_state': 'CANCELED', 'reason': 'User banned',
                    'finish_time': now
                }, synchronize_session=False)
            Session.query(Job).filter(Job.job_id == job_id)\
                .update({
                    'job_state': 'CANCELED', 'reason': 'User banned',
                    'job_finished': now
                }, synchronize_session=False)
        Session.commit()
        # The bulk updates bypassed the session, so drop any cached state
        Session.expire_all()
        return job_ids
    except Exception:
        Session.rollback()
        raise


186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
def _set_to_wait_helper(storage, vo_name, from_state, to_state):
    """
    Helper for _set_to_wait

    Moves from from_state to to_state the files that have the given storage as
    source or destination. Neither commits nor rolls back: that is left to the caller.

    :param storage:    Storage element, compared against source_se and dest_se
    :param vo_name:    If set and different from '*', only files of this VO are touched
    :param from_state: Only files currently in this state are selected
    :param to_state:   State written to the selected files
    :return:           Set with the job ids owning the updated files
    """
    # Both criteria belong inside and_(); job_id is selected along with file_id so
    # the loop does not need to load each File again just to read its job
    rows = Session.query(File.file_id, File.job_id).filter(
        and_(
            File.file_state == from_state,
            (File.source_se == storage) | (File.dest_se == storage)
        )
    )
    if vo_name and vo_name != '*':
        rows = rows.filter(File.vo_name == vo_name)

    job_ids = set()
    for file_id, job_id in rows.all():
        Session.query(File).filter(File.file_id == file_id).update({'file_state': to_state}, synchronize_session=False)
        job_ids.add(job_id)
    return job_ids


206
def _set_to_wait(storage, vo_name):
    """
    Updates the transfers that have the given storage either in source or destination,
    and belong to the given VO.

    SUBMITTED files become ON_HOLD and STAGING files become ON_HOLD_STAGING.

    :param storage:    Storage element, compared against source_se and dest_se
    :param vo_name:    If set and different from '*', only files of this VO are touched
    :return:           Set with the job ids owning the updated files
    :raises Exception: Whatever the session raises; the session is rolled back first
    """
    try:
        job_ids = _set_to_wait_helper(storage, vo_name, 'SUBMITTED', 'ON_HOLD')
        job_ids.update(_set_to_wait_helper(storage, vo_name, 'STAGING', 'ON_HOLD_STAGING'))
        Session.commit()
        # The bulk updates bypassed the session, so drop any cached state
        Session.expire_all()
    except Exception:
        Session.rollback()
        raise
    return job_ids

def _reenter_queue(storage, vo_name):
    """
    Resets to SUBMITTED or STAGING those transfers that were set ON_HOLD with a previous banning
    Returns the list of affected job ids.

    This function does not commit; on error it rolls back and re-raises.

    :param storage:    Storage element, compared against source_se and dest_se
    :param vo_name:    If set and different from '*', only files of this VO are considered
    :return:           List with the ids of the jobs that had files on hold
    :raises Exception: Whatever the session raises; the session is rolled back first
    """
    held = Session.query(distinct(File.job_id))\
        .filter((File.source_se == storage) | (File.dest_se == storage))\
        .filter(File.file_state.in_(['ON_HOLD', 'ON_HOLD_STAGING']))
    if vo_name and vo_name != '*':
        held = held.filter(File.vo_name == vo_name)
    # A real list: it is iterated below and then returned to the caller
    job_ids = [row[0] for row in held.all()]

    try:
        # NOTE(review): every held file of a matching job is released, not only the
        # ones on this storage - confirm this is intended
        for job_id in job_ids:
            Session.query(File).filter(File.job_id == job_id, File.file_state == 'ON_HOLD_STAGING')\
                .update({'file_state': 'STAGING'}, synchronize_session=False)
            Session.query(File).filter(File.job_id == job_id, File.file_state == 'ON_HOLD')\
                .update({'file_state': 'SUBMITTED'}, synchronize_session=False)
    except Exception:
        Session.rollback()
        raise

    return job_ids

244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262

def _parse_bool(value):
    """
    Interpret a request value as a boolean.

    Form/query parameters arrive as strings, so bool() alone would turn 'false'
    or '0' into True. Strings are false when empty or one of 0/false/no/off
    (case-insensitive); any other string is true. Non-string values (i.e. decoded
    from JSON) go through bool().
    """
    if hasattr(value, 'lower'):
        return value.strip().lower() not in ('', '0', 'false', 'no', 'off')
    return bool(value)


def _get_input_dict():
    """
    Return the parameters of the current request.

    For application/json requests the body is decoded and must be a JSON object;
    otherwise the regular request parameters are returned.

    :raises HTTPBadRequest: If the JSON body can not be decoded or is not an object
    """
    if request.content_type != 'application/json':
        return request.params
    try:
        input_dict = json.loads(request.body)
    except Exception:
        raise HTTPBadRequest('Malformed input')
    # Callers use .get(), so anything but an object (list, string, number) is rejected
    if not isinstance(input_dict, dict):
        raise HTTPBadRequest('Malformed input')
    return input_dict


class BanningController(BaseController):
    """
    Banning API
    """

    @doc.query_arg('storage', 'Storage to ban', required=True)
    @doc.query_arg('vo_name', 'Limit the banning to a given VO', required=False)
    @doc.query_arg('allow_submit', 'If true, transfers will not run, but submissions will be accepted', required=False)
    @doc.query_arg(
        'status',
        'What to do with the queued jobs: cancel (default, cancel immediately) or wait(wait for some time)',
        required=False
    )
    @doc.query_arg(
        'timeout',
        'If status==wait, timeout for the queued jobs. 0 = will not timeout (default)',
        required=False
    )
    @doc.query_arg('message', 'Explanatory message if desired', required=False)
    @doc.response(400, 'storage is missing, or any of the others have an invalid value')
    @doc.response(403, 'The user is not allowed to change the configuration')
    @doc.return_type(array_of=str)
    @authorize(CONFIG)
    @jsonify
    def ban_se(self):
        """
        Ban a storage element. Returns affected jobs ids.
        """
        input_dict = _get_input_dict()

        storage = input_dict.get('storage', None)
        if not storage:
            raise HTTPBadRequest('Missing storage parameter')

        user = request.environ['fts3.User.Credentials']
        # The VO is taken from the credentials of the caller; the 'vo_name' and
        # 'timeout' arguments documented above are not read by this method
        vo_name = user.vos[0]
        allow_submit = _parse_bool(input_dict.get('allow_submit', False))

        status = input_dict.get('status', 'cancel')
        try:
            status = status.upper()
        except AttributeError:
            # A JSON body may carry a non-string status (null, number...)
            raise HTTPBadRequest('status can only be cancel or wait')

        if status not in ['CANCEL', 'WAIT']:
            raise HTTPBadRequest('status can only be cancel or wait')

        if allow_submit and status == 'CANCEL':
            raise HTTPBadRequest('allow_submit and status = CANCEL can not be combined')

        _ban_se(storage, vo_name, allow_submit, status, input_dict.get('message', ''))
        audit_configuration('ban-se', "Storage %s for %s banned (%s)" % (storage, vo_name, status))

        if status == 'CANCEL':
            affected = _cancel_transfers(storage=storage, vo_name=vo_name)
        else:
            affected = _set_to_wait(storage=storage, vo_name=vo_name)

        log.warning("Storage %s banned (%s), %d jobs affected", storage, status, len(affected))
        return affected

    @doc.query_arg('user_dn', 'User DN to ban', required=True)
    @doc.query_arg('message', 'Explanatory message if desired', required=False)
    @doc.response(400, 'dn is missing')
    @doc.response(403, 'The user is not allowed to change the configuration')
    @doc.response(409, 'The user tried to ban (her|his)self')
    @authorize(CONFIG)
    @jsonify
    def ban_dn(self):
        """
        Ban a user. Returns the ids of the canceled jobs.
        """
        input_dict = _get_input_dict()

        user = request.environ['fts3.User.Credentials']
        dn = input_dict.get('user_dn', None)

        if not dn:
            raise HTTPBadRequest('Missing dn parameter')
        if dn == user.user_dn:
            raise HTTPConflict('The user tried to ban (her|his)self')

        _ban_dn(dn, input_dict.get('message', ''))
        affected = _cancel_jobs(dn=dn)

        audit_configuration('ban-dn', "User %s banned" % (dn))
        log.warning("User %s banned, %d jobs affected", dn, len(affected))

        return affected

    @doc.query_arg('storage', 'The storage to unban', required=True)
    @doc.response(204, 'Success')
    @doc.response(400, 'storage is empty or missing')
    @doc.response(403, 'The user is not allowed to perform configuration actions')
    @authorize(CONFIG)
    @jsonify
    def unban_se(self, start_response):
        """
        Unban a storage element

        The ban is removed for the first VO of the caller, and the transfers put
        on hold by it re-enter the queue.
        """
        storage = request.params.get('storage', None)
        if not storage:
            raise HTTPBadRequest('Missing storage parameter')

        job_ids = []
        try:
            user = request.environ['fts3.User.Credentials']
            vo_name = user.vos[0]
            Session.query(BannedSE).filter(BannedSE.se == storage, BannedSE.vo == vo_name).delete()
            job_ids = _reenter_queue(storage, vo_name)
            Session.commit()
        except Exception:
            Session.rollback()
            # The client gets a generic 400; keep the real cause in the log
            log.exception("Could not unban storage %s", storage)
            raise HTTPBadRequest('Storage not found')

        log.warning("Storage %s unbanned", storage)
        audit_configuration('unban-se', "Storage %s unbanned" % storage)
        start_response('204 No Content', [])
        return job_ids

    @doc.query_arg('user_dn', 'User DN to unban', required=True)
    @doc.response(204, 'Success')
    @doc.response(400, 'user_dn is empty or missing')
    @doc.response(403, 'The user is not allowed to perform configuration actions')
    @authorize(CONFIG)
    def unban_dn(self, start_response):
        """
        Unban a user
        """
        dn = request.params.get('user_dn', None)
        if not dn:
            raise HTTPBadRequest('Missing user_dn parameter')

        banned = Session.query(BannedDN).get(dn)
        if banned:
            try:
                Session.delete(banned)
                Session.commit()
            except Exception:
                Session.rollback()
                # Do not report (log, audit, 204) an unban that did not happen
                raise
            log.warning("User %s unbanned", dn)
        else:
            log.warning("Unban of user %s without effect", dn)

        audit_configuration('unban-dn', "User %s unbanned" % (dn))
        start_response('204 No Content', [])
        return ['']

    @doc.response(403, 'The user is not allowed to check the configuration')
    @authorize(CONFIG)
    @jsonify
    def list_banned_dn(self, start_response):
        """
        List banned users
        """
        return Session.query(BannedDN).all()

    @doc.response(403, 'The user is not allowed to check the configuration')
    @authorize(CONFIG)
    @jsonify
    def list_banned_se(self, start_response):
        """
        List banned storage elements
        """
        return Session.query(BannedSE).all()

417
418

# Only the controller is exported by 'from ... import *'
__all__ = ['BanningController']