Skip to content
Snippets Groups Projects
Commit 50ddd877 authored by Martin Barisits's avatar Martin Barisits
Browse files

Merge branch 'hotfix-2814-estimated_at_is_not_propagated_in_requests_history' into 'master'

[RUCIO-2814] estimated_at is not propagated in requests_history

See merge request !1933
parents 1bb4a504 f4922a72
No related branches found
No related tags found
No related merge requests found
# Copyright European Organization for Nuclear Research (CERN) """
# Copyright European Organization for Nuclear Research (CERN)
# Licensed under the Apache License, Version 2.0 (the "License");
# You may not use this file except in compliance with the License. Licensed under the Apache License, Version 2.0 (the "License");
# You may obtain a copy of the License at You may not use this file except in compliance with the License.
# http://www.apache.org/licenses/LICENSE-2.0 You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Authors:
# - Mario Lassnig, <mario.lassnig@cern.ch>, 2013-2015, 2017 Authors:
# - Vincent Garonne, <vincent.garonne@cern.ch>, 2015-2017 - Mario Lassnig, <mario.lassnig@cern.ch>, 2013-2015, 2017
# - Martin Barisits, <martin.barisits@cern.ch>, 2014-2017 - Vincent Garonne, <vincent.garonne@cern.ch>, 2015-2017
# - Wen Guan, <wen.guan@cern.ch>, 2014-2016 - Martin Barisits, <martin.barisits@cern.ch>, 2014-2017
# - Joaquin Bogado, <jbogadog@cern.ch>, 2016 - Wen Guan, <wen.guan@cern.ch>, 2014-2016
# - Thomas Beermann, <thomas.beermann@cern.ch>, 2016 - Joaquin Bogado, <jbogadog@cern.ch>, 2016
- Thomas Beermann, <thomas.beermann@cern.ch>, 2016
- Cedric Serfon, <cedric.serfon@cern.ch>, 2017
"""
import datetime import datetime
import json import json
...@@ -441,8 +444,8 @@ def set_request_state(request_id, new_state, transfer_id=None, transferred_at=No ...@@ -441,8 +444,8 @@ def set_request_state(request_id, new_state, transfer_id=None, transferred_at=No
logging.error("Request %s should not be updated to 'Failed' or 'Done' without external transfer_id" % request_id) logging.error("Request %s should not be updated to 'Failed' or 'Done' without external transfer_id" % request_id)
else: else:
rowcount = session.query(models.Request).filter_by(id=request_id).update(update_items, synchronize_session=False) rowcount = session.query(models.Request).filter_by(id=request_id).update(update_items, synchronize_session=False)
except IntegrityError, e: except IntegrityError as error:
raise RucioException(e.args) raise RucioException(error.args)
if not rowcount: if not rowcount:
raise UnsupportedOperation("Request %s state cannot be updated." % request_id) raise UnsupportedOperation("Request %s state cannot be updated." % request_id)
...@@ -463,8 +466,8 @@ def set_requests_state(request_ids, new_state, session=None): ...@@ -463,8 +466,8 @@ def set_requests_state(request_ids, new_state, session=None):
try: try:
for request_id in request_ids: for request_id in request_ids:
set_request_state(request_id, new_state, session=session) set_request_state(request_id, new_state, session=session)
except IntegrityError, e: except IntegrityError as error:
raise RucioException(e.args) raise RucioException(error.args)
@transactional_session @transactional_session
...@@ -484,8 +487,8 @@ def touch_requests_by_rule(rule_id, session=None): ...@@ -484,8 +487,8 @@ def touch_requests_by_rule(rule_id, session=None):
.filter(models.Request.state.in_([RequestState.FAILED, RequestState.DONE, RequestState.LOST, RequestState.NO_SOURCES, RequestState.ONLY_TAPE_SOURCES]))\ .filter(models.Request.state.in_([RequestState.FAILED, RequestState.DONE, RequestState.LOST, RequestState.NO_SOURCES, RequestState.ONLY_TAPE_SOURCES]))\
.filter(models.Request.updated_at < datetime.datetime.utcnow())\ .filter(models.Request.updated_at < datetime.datetime.utcnow())\
.update({'updated_at': datetime.datetime.utcnow() + datetime.timedelta(minutes=20)}, synchronize_session=False) .update({'updated_at': datetime.datetime.utcnow() + datetime.timedelta(minutes=20)}, synchronize_session=False)
except IntegrityError, e: except IntegrityError as error:
raise RucioException(e.args) raise RucioException(error.args)
@read_session @read_session
...@@ -507,8 +510,8 @@ def get_request(request_id, session=None): ...@@ -507,8 +510,8 @@ def get_request(request_id, session=None):
tmp = dict(tmp) tmp = dict(tmp)
tmp.pop('_sa_instance_state') tmp.pop('_sa_instance_state')
return tmp return tmp
except IntegrityError, e: except IntegrityError as error:
raise RucioException(e.args) raise RucioException(error.args)
@read_session @read_session
...@@ -535,8 +538,8 @@ def get_requests_by_transfer(external_host, transfer_id, session=None): ...@@ -535,8 +538,8 @@ def get_requests_by_transfer(external_host, transfer_id, session=None):
result.append(t2) result.append(t2)
return result return result
return return
except IntegrityError, e: except IntegrityError as error:
raise RucioException(e.args) raise RucioException(error.args)
@read_session @read_session
...@@ -573,8 +576,8 @@ def get_request_by_did(scope, name, rse, rse_id=None, request_type=None, session ...@@ -573,8 +576,8 @@ def get_request_by_did(scope, name, rse, rse_id=None, request_type=None, session
tmp = dict(tmp) tmp = dict(tmp)
tmp.pop('_sa_instance_state') tmp.pop('_sa_instance_state')
return tmp return tmp
except IntegrityError, e: except IntegrityError as error:
raise RucioException(e.args) raise RucioException(error.args)
@transactional_session @transactional_session
...@@ -613,6 +616,7 @@ def archive_request(request_id, session=None): ...@@ -613,6 +616,7 @@ def archive_request(request_id, session=None):
dest_url=req['dest_url'], dest_url=req['dest_url'],
submitted_at=req['submitted_at'], submitted_at=req['submitted_at'],
started_at=req['started_at'], started_at=req['started_at'],
estimated_at=req['estimated_at'],
transferred_at=req['transferred_at']) transferred_at=req['transferred_at'])
hist_request.save(session=session) hist_request.save(session=session)
try: try:
...@@ -621,8 +625,8 @@ def archive_request(request_id, session=None): ...@@ -621,8 +625,8 @@ def archive_request(request_id, session=None):
record_timer('core.request.archive_request.%s' % req['activity'].replace(' ', '_'), time_diff_s) record_timer('core.request.archive_request.%s' % req['activity'].replace(' ', '_'), time_diff_s)
session.query(models.Source).filter_by(request_id=request_id).delete() session.query(models.Source).filter_by(request_id=request_id).delete()
session.query(models.Request).filter_by(id=request_id).delete() session.query(models.Request).filter_by(id=request_id).delete()
except IntegrityError, e: except IntegrityError as error:
raise RucioException(e.args) raise RucioException(error.args)
@transactional_session @transactional_session
...@@ -649,16 +653,16 @@ def cancel_request_did(scope, name, dest_rse_id, request_type=RequestType.TRANSF ...@@ -649,16 +653,16 @@ def cancel_request_did(scope, name, dest_rse_id, request_type=RequestType.TRANSF
request_type=request_type).all() request_type=request_type).all()
if not reqs: if not reqs:
logging.warn('Tried to cancel non-existant request for DID %s:%s at RSE ID %s' % (scope, name, dest_rse_id)) logging.warn('Tried to cancel non-existant request for DID %s:%s at RSE ID %s' % (scope, name, dest_rse_id))
except IntegrityError, e: except IntegrityError as error:
raise RucioException(e.args) raise RucioException(error.args)
for req in reqs: for req in reqs:
# is there a transfer already in FTS3? if so, try to cancel it # is there a transfer already in FTS3? if so, try to cancel it
if req[1] is not None: if req[1] is not None:
try: try:
fts3.cancel(req[1], req[2]) fts3.cancel(req[1], req[2])
except Exception, e: except Exception as error:
logging.warn('Could not cancel FTS3 transfer %s on %s: %s' % (req[1], req[2], str(e))) logging.warn('Could not cancel FTS3 transfer %s on %s: %s' % (req[1], req[2], str(error)))
archive_request(request_id=req[0], session=session) archive_request(request_id=req[0], session=session)
...@@ -811,8 +815,8 @@ def get_sources(request_id, rse_id=None, session=None): ...@@ -811,8 +815,8 @@ def get_sources(request_id, rse_id=None, session=None):
result.append(t2) result.append(t2)
return result return result
except IntegrityError, e: except IntegrityError as error:
raise RucioException(e.args) raise RucioException(error.args)
@read_session @read_session
...@@ -840,8 +844,8 @@ def get_heavy_load_rses(threshold, session=None): ...@@ -840,8 +844,8 @@ def get_heavy_load_rses(threshold, session=None):
result.append(t2) result.append(t2)
return result return result
except IntegrityError, e: except IntegrityError as error:
raise RucioException(e.args) raise RucioException(error.args)
@read_session @read_session
...@@ -876,8 +880,8 @@ def get_stats_by_activity_dest_state(state, session=None): ...@@ -876,8 +880,8 @@ def get_stats_by_activity_dest_state(state, session=None):
.with_hint(models.RSE, "INDEX(RSES RSES_PK)", 'oracle')\ .with_hint(models.RSE, "INDEX(RSES RSES_PK)", 'oracle')\
.filter(models.RSE.id == subquery.c.dest_rse_id).all() .filter(models.RSE.id == subquery.c.dest_rse_id).all()
except IntegrityError, e: except IntegrityError as error:
raise RucioException(e.args) raise RucioException(error.args)
@transactional_session @transactional_session
...@@ -919,8 +923,8 @@ def release_waiting_requests(rse, activity=None, rse_id=None, count=None, accoun ...@@ -919,8 +923,8 @@ def release_waiting_requests(rse, activity=None, rse_id=None, count=None, accoun
.update({'state': RequestState.QUEUED}, .update({'state': RequestState.QUEUED},
synchronize_session=False) synchronize_session=False)
return rowcount return rowcount
except IntegrityError, e: except IntegrityError as error:
raise RucioException(e.args) raise RucioException(error.args)
@read_session @read_session
...@@ -957,8 +961,8 @@ def update_requests_priority(priority, filter, session=None): ...@@ -957,8 +961,8 @@ def update_requests_priority(priority, filter, session=None):
logging.debug("Failed to boost request %s priority: %s" % (item[0], traceback.format_exc())) logging.debug("Failed to boost request %s priority: %s" % (item[0], traceback.format_exc()))
else: else:
logging.debug("Update request %s priority to %s: %s" % (item[0], priority, res['http_message'])) logging.debug("Update request %s priority to %s: %s" % (item[0], priority, res['http_message']))
except IntegrityError, e: except IntegrityError as error:
raise RucioException(e.args) raise RucioException(error.args)
@transactional_session @transactional_session
...@@ -1140,8 +1144,8 @@ def __touch_request(request_id, session=None): ...@@ -1140,8 +1144,8 @@ def __touch_request(request_id, session=None):
try: try:
rowcount = session.query(models.Request).filter_by(id=request_id).update({'updated_at': datetime.datetime.utcnow()}, synchronize_session=False) rowcount = session.query(models.Request).filter_by(id=request_id).update({'updated_at': datetime.datetime.utcnow()}, synchronize_session=False)
except IntegrityError, e: except IntegrityError as error:
raise RucioException(e.args) raise RucioException(error.args)
if not rowcount: if not rowcount:
raise UnsupportedOperation("Request %s cannot be touched." % request_id) raise UnsupportedOperation("Request %s cannot be touched." % request_id)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment