Commit 7d18eba6 authored by Alejandro Alvarez Ayllon's avatar Alejandro Alvarez Ayllon
Browse files

FTS-817: Adapt to schema 3.0

parent 39df9d8c
......@@ -23,7 +23,7 @@ from sqlalchemy import Column, DateTime, Text, String
class CredentialCache(Base):
__tablename__ = 't_credential_cache'
dlg_id = Column(String(100), primary_key=True)
dlg_id = Column(String(16), primary_key=True)
dn = Column(String(255), primary_key=True)
cert_request = Column(Text)
priv_key = Column(Text)
......@@ -33,7 +33,7 @@ class CredentialCache(Base):
class Credential(Base):
__tablename__ = 't_credential'
dlg_id = Column(String(100), primary_key=True)
dlg_id = Column(String(16), primary_key=True)
dn = Column(String(255), primary_key=True)
proxy = Column(Text)
voms_attrs = Column(Text)
......
......@@ -36,24 +36,18 @@ class File(Base):
vo_name = Column(String(50))
source_se = Column(String(255))
dest_se = Column(String(255))
symbolicname = Column(String(255))
file_state = Column(String(32))
transferhost = Column(String(255))
transfer_host = Column(String(255))
source_surl = Column(String(1100))
dest_surl = Column(String(1100))
agent_dn = Column(String(1024))
error_scope = Column(String(32))
error_phase = Column(String(32))
reason_class = Column(String(32))
reason = Column(String(2048))
num_failures = Column(Integer)
recoverable = Column('current_failures', Boolean)
filesize = Column(Float)
checksum = Column(String(100))
finish_time = Column(DateTime)
start_time = Column(DateTime)
internal_file_params = Column(String(255))
job_finished = Column(DateTime)
pid = Column(Integer)
tx_duration = Column(Float)
throughput = Column(Float)
......@@ -89,17 +83,12 @@ class ArchivedFile(Base):
ForeignKey('t_job_backup.job_id'))
source_se = Column(String(255))
dest_se = Column(String(255))
symbolicname = Column(String(255))
file_state = Column(String(32))
transferhost = Column(String(255))
source_surl = Column(String(1100))
dest_surl = Column(String(1100))
agent_dn = Column(String(1024))
error_scope = Column(String(32))
error_phase = Column(String(32))
reason_class = Column(String(32))
reason = Column(String(2048))
num_failures = Column(Integer)
current_failures = Column(Integer)
filesize = Column(Float)
checksum = Column(String(100))
......
......@@ -33,19 +33,14 @@ class Job(Base):
source_se = Column(String(255))
dest_se = Column(String(255))
job_state = Column(String(32))
reuse_job = Column(String(1), default='N')
job_type = Column(String(1), default='N')
cancel_job = Column(Flag(negative=None))
job_params = Column(String(255))
submit_host = Column(String(255))
user_dn = Column(String(1024))
agent_dn = Column(String(1024))
user_cred = Column(String(255))
cred_id = Column(String(100))
vo_name = Column(String(50))
voms_cred = Column(Text)
reason = Column(String(2048))
submit_time = Column(DateTime)
finish_time = Column(DateTime)
priority = Column(Integer)
max_time_in_queue = Column(Integer)
space_token = Column(String(255))
......@@ -53,7 +48,6 @@ class Job(Base):
overwrite_flag = Column(Flag)
job_finished = Column(DateTime)
source_space_token = Column(String(255))
source_token_description = Column(String(255))
copy_pin_lifetime = Column(Integer)
verify_checksum = Column(TernaryFlag(positive='c'),
name='checksum_method')
......@@ -88,13 +82,10 @@ class ArchivedJob(Base):
submit_host = Column(String(255))
user_dn = Column(String(1024))
agent_dn = Column(String(1024))
user_cred = Column(String(255))
cred_id = Column(String(100))
vo_name = Column(String(50))
voms_cred = Column(Text)
reason = Column(String(2048))
submit_time = Column(DateTime)
finish_time = Column(DateTime)
priority = Column(Integer)
max_time_in_queue = Column(Integer)
space_token = Column(String(255))
......@@ -102,7 +93,6 @@ class ArchivedJob(Base):
overwrite_flag = Column(Flag)
job_finished = Column(DateTime)
source_space_token = Column(String(255))
source_token_description = Column(String(255))
copy_pin_lifetime = Column(Integer)
verify_checksum = Column(Flag(positive='c'),
name='checksum_method')
......
......@@ -128,12 +128,8 @@ def _cancel_transfers(storage=None, vo_name=None):
Session.query(Job).filter(Job.job_id == job_id).update({
'job_state': 'CANCELED',
'job_finished': now,
'finish_time': now,
'reason': reason
})
Session.query(File).filter(File.job_id == job_id).update({
'job_finished': now
})
Session.commit()
except Exception:
......@@ -156,12 +152,12 @@ def _cancel_jobs(dn):
Session.query(File).filter(File.job_id == job_id).filter(File.file_state.in_(FileActiveStates))\
.update({
'file_state': 'CANCELED', 'reason': 'User banned',
'job_finished': now, 'finish_time': now
'finish_time': now
}, synchronize_session=False)
Session.query(Job).filter(Job.job_id == job_id)\
.update({
'job_state': 'CANCELED', 'reason': 'User banned',
'job_finished': now, 'finish_time': now
'job_finished': now
}, synchronize_session=False)
Session.commit()
Session.expire_all()
......@@ -316,7 +312,6 @@ class BanningController(BaseController):
Session.delete(banned)
Session.query(File)\
.filter(File.file_state == 'SUBMITTED')\
.filter(File.job_finished == None)\
.filter((File.source_se == storage) | (File.dest_se == storage))\
.update({
'wait_timestamp': None,
......
......@@ -103,6 +103,6 @@ class FilesController(BaseController):
filter_not_before = datetime.utcnow() - filter_time
files = files.filter(File.job_finished >= filter_not_before)
else:
files = files.filter(File.job_finished == None)
files = files.filter(File.finish_time == None)
return files[:filter_limit]
......@@ -295,8 +295,8 @@ class JobsController(BaseController):
"""
job = self._get_job(job_id)
if job.reuse_job != 'N':
raise HTTPBadRequest('Multihop or reuse jobs must be cancelled at once (%s)' % str(job.reuse_job))
if job.job_type != 'N':
raise HTTPBadRequest('Multihop or reuse jobs must be cancelled at once (%s)' % str(job.job_type))
file_ids = file_ids.split(',')
changed_states = list()
......@@ -334,12 +334,6 @@ class JobsController(BaseController):
job.job_state = 'CANCELED'
job.cancel_job = True
job.job_finished = datetime.utcnow()
# Remember, job_finished must be NULL so FTS3 kills the fts_url_copy process
# Update job_finished for all the others
Session.query(File).filter(File.job_id == job_id).filter(~File.file_id.in_(file_ids))\
.update({
'job_finished': job.job_finished
}, synchronize_session=False)
else:
log.warning('Cancelling files within a job with others still active (%s)' % job_id)
......@@ -412,23 +406,22 @@ class JobsController(BaseController):
for job in cancellable_jobs:
job.job_state = 'CANCELED'
job.cancel_job = True
job.finish_time = now
job.job_finished = now
job.reason = 'Job canceled by the user'
# FTS3 daemon expects job_finished to be NULL in order to trigger the signal
# FTS3 daemon expects finish_time to be NULL in order to trigger the signal
# to fts_url_copy, but this only makes sense if pid is set
Session.query(File).filter(File.job_id == job.job_id)\
.filter(File.file_state.in_(FileActiveStates), File.pid != None)\
.update({
'file_state': 'CANCELED', 'reason': 'Job canceled by the user',
'finish_time': now, 'job_finished': None,
'finish_time': None
}, synchronize_session=False)
Session.query(File).filter(File.job_id == job.job_id)\
.filter(File.file_state.in_(FileActiveStates), File.pid == None) \
.update({
'file_state': 'CANCELED', 'reason': 'Job canceled by the user',
'finish_time': now, 'job_finished': now,
'finish_time': now
}, synchronize_session=False)
# However, for data management operations there is nothing to signal, so
# set job_finished
......@@ -626,13 +619,13 @@ class JobsController(BaseController):
)
try:
# FTS3 daemon expects job_finished to be NULL in order to trigger the signal
# FTS3 daemon expects finish_time to be NULL in order to trigger the signal
# to fts_url_copy
file_count = Session.query(File).filter(File.vo_name == vo_name)\
.filter(File.file_state.in_(FileActiveStates))\
.update({
'file_state': 'CANCELED', 'reason': 'Job canceled by the user',
'finish_time': now
'finish_time': None
}, synchronize_session=False)
# However, for data management operations there is nothing to signal, so
......@@ -648,7 +641,7 @@ class JobsController(BaseController):
.filter(Job.job_state.in_(JobActiveStates))\
.update({
'job_state': 'CANCELED', 'reason': 'Job canceled by the user',
'job_finished': now, 'finish_time': now
'job_finished': now
}, synchronize_session=False)
Session.commit()
Session.expire_all()
......@@ -679,12 +672,12 @@ class JobsController(BaseController):
)
try:
# FTS3 daemon expects job_finished to be NULL in order to trigger the signal
# FTS3 daemon expects finish_time to be NULL in order to trigger the signal
# to fts_url_copy
file_count = Session.query(File).filter(File.file_state.in_(FileActiveStates))\
.update({
'file_state': 'CANCELED', 'reason': 'Job canceled by the user',
'finish_time': now
'finish_time': None
}, synchronize_session=False)
# However, for data management operations there is nothing to signal, so
......@@ -699,7 +692,7 @@ class JobsController(BaseController):
job_count = Session.query(Job).filter(Job.job_state.in_(JobActiveStates))\
.update({
'job_state': 'CANCELED', 'reason': 'Job canceled by the user',
'job_finished': now, 'finish_time': now
'job_finished': now
}, synchronize_session=False)
Session.commit()
Session.expire_all()
......
......@@ -87,13 +87,13 @@ class SnapshotController(BaseController):
# Files for this pair+vo
files = Session.query(
File.file_state, File.tx_duration, File.throughput,
File.reason, Job.submit_time, File.start_time, File.job_finished, File.file_id
File.reason, Job.submit_time, File.start_time, File.finish_time, File.file_id
)\
.filter(File.job_id == Job.job_id)\
.filter(File.source_se == source)\
.filter(File.dest_se == destination)\
.filter(File.vo_name == vo)\
.filter((File.job_finished >= not_before) | (File.job_finished == None))\
.filter((File.finish_time >= not_before) | (File.finish_time == None))\
.all()
# Current number of active
......
......@@ -322,7 +322,7 @@ class JobBuilder(object):
'overall' job source and destination Storage Elements
"""
# Multihop
if self.job['reuse_job'] == 'H':
if self.job['job_type'] == 'H':
self.job['source_se'] = entries[0]['source_se']
self.job['dest_se'] = entries[-1]['dest_se']
# Regular transfers
......@@ -400,14 +400,14 @@ class JobBuilder(object):
Initializes the list of transfers
"""
reuse_flag = None
job_type = None
if self.params['multihop']:
reuse_flag = 'H'
job_type = 'H'
elif self.params['reuse'] is not None:
if _safe_flag(self.params['reuse']):
reuse_flag = 'Y'
job_type = 'Y'
else:
reuse_flag = 'N'
job_type = 'N'
self.is_bringonline = self.params['copy_pin_lifetime'] > 0 or self.params['bring_online'] > 0
......@@ -421,7 +421,7 @@ class JobBuilder(object):
self.job = dict(
job_id=self.job_id,
job_state=job_initial_state,
reuse_job=reuse_flag,
job_type=job_type,
retry=int(self.params['retry']),
retry_delay=int(self.params['retry_delay']),
job_params=self.params['gridftp'],
......@@ -450,7 +450,7 @@ class JobBuilder(object):
self.job['user_cred'] = self.params['credentials']
# If reuse is enabled, or it is a bring online job, generate one single "hash" for all files
if reuse_flag in ('H', 'Y') or self.is_bringonline:
if job_type in ('H', 'Y') or self.is_bringonline:
shared_hashed_id = _generate_hashed_id()
else:
shared_hashed_id = None
......@@ -479,12 +479,12 @@ class JobBuilder(object):
self.is_multiple, unique_files = _has_multiple_options(self.files)
if self.is_multiple:
# Multiple replicas can not use the reuse flag, nor multihop
if reuse_flag in ('H', 'Y'):
if job_type in ('H', 'Y'):
raise HTTPBadRequest('Can not specify reuse and multiple replicas at the same time')
# Only one unique file per multiple-replica job
if unique_files > 1:
raise HTTPBadRequest('Multiple replicas jobs can only have one unique file')
self.job['reuse_job'] = 'R'
self.job['job_type'] = 'R'
# Apply selection strategy
self._apply_selection_strategy()
......@@ -493,10 +493,10 @@ class JobBuilder(object):
# If reuse is enabled, source and destination SE must be the same
# for all entries
if reuse_flag == 'Y' and (not self.job['source_se'] or not self.job['dest_se']):
if job_type == 'Y' and (not self.job['source_se'] or not self.job['dest_se']):
raise HTTPBadRequest('Reuse jobs can only contain transfers for the same source and destination storage')
if (self.job['source_se'] and self.job['dest_se'] and (reuse_flag is None) and (len(self.files) > 1)) :
if (self.job['source_se'] and self.job['dest_se'] and (job_type is None) and (len(self.files) > 1)) :
small_files = 0
min_small_files = len(self.files) - 2
for file in self.files:
......@@ -504,11 +504,11 @@ class JobBuilder(object):
if file['user_filesize'] < 104857600:
small_files +=1
if small_files >= min_small_files:
self.job['reuse_job'] = 'Y'
self.job['job_type'] = 'Y'
log.debug("Reuse jobs with "+str(small_files)+" small files up to "+str(len(self.files))+" total files")
if self.job['reuse_job'] is None:
self.job['reuse_job'] = 'N'
if self.job['job_type'] is None:
self.job['job_type'] = 'N'
def _populate_deletion(self, deletion_dict):
"""
......@@ -517,7 +517,7 @@ class JobBuilder(object):
self.job = dict(
job_id=self.job_id,
job_state='DELETE',
reuse_job=None,
job_type=None,
retry=int(self.params['retry']),
retry_delay=int(self.params['retry_delay']),
job_params=self.params['gridftp'],
......
......@@ -126,11 +126,9 @@ class TestBanning(TestController):
files = Session.query(File).filter(File.job_id == job_id)
self.assertEqual('CANCELED', job.job_state)
self.assertNotEqual(None, job.job_finished)
self.assertNotEqual(None, job.finish_time)
self.assertEqual('User banned', job.reason)
for f in files:
self.assertEqual('CANCELED', f.file_state)
self.assertNotEqual(None, f.job_finished)
self.assertNotEqual(None, f.finish_time)
self.assertEqual('User banned', f.reason)
......@@ -227,10 +225,8 @@ class TestBanning(TestController):
files = Session.query(File).filter(File.job_id == job_id)
self.assertEqual('CANCELED', job.job_state)
self.assertNotEqual(None, job.job_finished)
self.assertNotEqual(None, job.finish_time)
for f in files:
self.assertEqual('CANCELED', f.file_state)
self.assertNotEqual(None, f.job_finished)
self.assertNotEqual(None, f.finish_time)
self.assertEqual('Storage banned', f.reason)
......@@ -260,7 +256,6 @@ class TestBanning(TestController):
job = Session.query(Job).get(job_id)
self.assertEqual('SUBMITTED', job.job_state)
self.assertEqual(None, job.job_finished)
self.assertEqual(None, job.finish_time)
files = Session.query(File).filter(File.job_id == job_id)
for f in files:
......@@ -269,7 +264,6 @@ class TestBanning(TestController):
self.assertNotEqual(None, f.finish_time)
else:
self.assertEqual('SUBMITTED', f.file_state)
self.assertEqual(None, f.job_finished)
def test_ban_se_cancel_vo(self):
"""
......@@ -328,10 +322,8 @@ class TestBanning(TestController):
files = Session.query(File).filter(File.job_id == job_id)
self.assertIn(job.job_state, ['ACTIVE', 'SUBMITTED'])
self.assertEqual(None, job.job_finished)
self.assertEqual(None, job.finish_time)
for f in files:
self.assertIn(f.file_state, ['ACTIVE', 'SUBMITTED'])
self.assertEqual(None, f.job_finished)
self.assertEqual(None, f.finish_time)
self.assertEqual(1234, f.wait_timeout)
self.assertGreater(f.wait_timestamp, datetime.utcnow() - timedelta(minutes=1))
......
......@@ -119,13 +119,11 @@ class TestJobCancel(TestController):
# Is it in the database?
job = Session.query(Job).get(job_id)
self.assertEqual(job.job_state, 'CANCELED')
self.assertEqual(job.reuse_job, 'N')
self.assertEqual(job.job_type, 'N')
self.assertNotEqual(None, job.job_finished)
self.assertNotEqual(None, job.finish_time)
for f in job.files:
self.assertEqual(f.file_state, 'CANCELED')
self.assertNotEqual(None, f.job_finished)
self.assertNotEqual(None, f.finish_time)
def test_cancel_running(self):
......@@ -150,11 +148,9 @@ class TestJobCancel(TestController):
job = Session.query(Job).get(job_id)
self.assertEqual(job.job_state, 'CANCELED')
self.assertNotEqual(None, job.job_finished)
self.assertNotEqual(None, job.finish_time)
for f in job.files:
self.assertEqual(f.file_state, 'CANCELED')
self.assertEqual(None, f.job_finished)
self.assertNotEqual(None, f.finish_time)
self.assertEqual(None, f.finish_time)
def test_cancel_terminal(self):
"""
......@@ -281,13 +277,8 @@ class TestJobCancel(TestController):
self.assertIsNotNone(job.job_finished)
self.assertEqual('CANCELED', job.files[0].file_state)
self.assertIsNotNone(job.files[0].finish_time)
self.assertIsNone(job.files[0].job_finished)
for f in job.files[1:]:
self.assertEqual(expect_files, f.file_state)
if expect_job in JobActiveStates:
self.assertIsNone(f.job_finished)
else:
self.assertIsNotNone(f.job_finished)
def test_cancel_file(self):
"""
......@@ -403,13 +394,11 @@ class TestJobCancel(TestController):
# Is it in the database?
job = Session.query(Job).get(job_id)
self.assertEqual(job.job_state, 'CANCELED')
self.assertEqual(job.reuse_job, 'N')
self.assertEqual(job.job_type, 'N')
self.assertNotEqual(None, job.job_finished)
self.assertNotEqual(None, job.finish_time)
for f in job.files:
self.assertEqual(f.file_state, 'CANCELED')
self.assertNotEqual(None, f.job_finished)
self.assertNotEqual(None, f.finish_time)
def test_cancel_reuse_small_files_and_big_files(self):
......@@ -426,13 +415,11 @@ class TestJobCancel(TestController):
# Is it in the database?
job = Session.query(Job).get(job_id)
self.assertEqual(job.job_state, 'CANCELED')
self.assertEqual(job.reuse_job, 'Y')
self.assertEqual(job.job_type, 'Y')
self.assertNotEqual(None, job.job_finished)
self.assertNotEqual(None, job.finish_time)
for f in job.files:
self.assertEqual(f.file_state, 'CANCELED')
self.assertNotEqual(None, f.job_finished)
self.assertNotEqual(None, f.finish_time)
def _become_root(self):
......
......@@ -98,7 +98,6 @@ class TestJobDeletion(TestController):
self.assertEqual('CANCELED', job.job_state)
self.assertEqual(job.reason, 'Job canceled by the user')
self.assertIsNotNone(job.finish_time)
self.assertIsNotNone(job.job_finished)
dm = Session.query(DataManagement).filter(DataManagement.job_id == job_id).all()
......
......@@ -49,7 +49,7 @@ class TestJobSubmission(TestController):
self.assertEqual(job.dest_se, 'root://dest.ch')
self.assertEqual(job.overwrite_flag, True)
self.assertEqual(job.verify_checksum, True)
self.assertEqual(job.reuse_job, 'N')
self.assertEqual(job.job_type, 'N')
self.assertEqual(job.priority, 3)
self.assertIsNone(job.max_time_in_queue)
......@@ -80,7 +80,6 @@ class TestJobSubmission(TestController):
# Validate submitter
self.assertEqual(socket.getfqdn(), job.submit_host)
self.assertEqual('rest', job.agent_dn)
def test_submit(self):
"""
......@@ -212,7 +211,7 @@ class TestJobSubmission(TestController):
self.assertGreater(len(job_id), 0)
job = Session.query(Job).get(job_id)
self.assertEqual(job.reuse_job, 'Y')
self.assertEqual(job.job_type, 'Y')
return job_id
......@@ -246,7 +245,7 @@ class TestJobSubmission(TestController):
self.assertGreater(len(job_id), 0)
job = Session.query(Job).get(job_id)
self.assertEqual(job.reuse_job, 'Y')
self.assertEqual(job.job_type, 'Y')
def test_submit_post(self):
"""
......@@ -698,7 +697,7 @@ class TestJobSubmission(TestController):
'filesize': 1024,
'metadata': {'mykey': 'myvalue'},
}],
'params': {'overwrite': True, 'verify_checksum': True, 'credential': 'dropbox'}
'params': {'overwrite': True, 'verify_checksum': True}
}
job_id = self.app.post(
......@@ -712,7 +711,6 @@ class TestJobSubmission(TestController):
self.assertEqual(1, len(job.files))
self.assertEqual('dropbox://dropbox.com/file', job.files[0].source_surl)
self.assertEqual('root://dest.ch:8447/file', job.files[0].dest_surl)
self.assertEqual('dropbox', job.user_cred)
def test_submit_protocol_params(self):
"""
......
......@@ -64,7 +64,7 @@ class TestMultiple(TestController):
# Validate job in the database
db_job = Session.query(Job).get(job_id)
self.assertEqual(db_job.reuse_job, 'R')
self.assertEqual(db_job.job_type, 'R')
self.assertEqual(len(db_job.files), 4)
......@@ -132,7 +132,7 @@ class TestMultiple(TestController):
job_id = json.loads(answer.body)['job_id']
db_job = Session.query(Job).get(job_id)
self.assertEqual(db_job.reuse_job, 'R')
self.assertEqual(db_job.job_type, 'R')
self.assertEqual(len(db_job.files), 2)
......@@ -193,7 +193,7 @@ class TestMultiple(TestController):
# Validate job in the database
db_job = Session.query(Job).get(job_id)
self.assertNotEqual(db_job.reuse_job, 'R')
self.assertNotEqual(db_job.job_type, 'R')
self.assertEqual(len(db_job.files), 2)
......@@ -308,7 +308,7 @@ class TestMultiple(TestController):
).json['job_id']
job = Session.query(Job).get(job_id)
self.assertEqual(job.reuse_job, 'Y')
self.assertEqual(job.job_type, 'Y')
# In a reuse job, the hashed ID must be the same for all files!
# Regression for FTS-20
......@@ -349,7 +349,7 @@ class TestMultiple(TestController):
# Also, the reuse flag must be 'H' in the database
job = Session.query(Job).get(job_id)
self.assertEqual(job.reuse_job, 'H')