Commit 4a55f8d5 authored by Mihai Patrascoiu
Browse files

Merge branch 'archive_monitoring' into develop

parents 5bd06e9a 3d8cf1f2
Pipeline #1997642 passed with stage
in 2 minutes and 13 seconds
......@@ -118,7 +118,7 @@ class JobDeletionSubmitter(Base):
if job_id and self.options.blocking:
inquirer = Inquirer(context)
job = inquirer.get_job_status(job_id)
while job['job_state'] in ['SUBMITTED', 'READY', 'STAGING', 'QOS_TRANSITION', 'ACTIVE', 'DELETE']:
while job['job_state'] in ['SUBMITTED', 'READY', 'STAGING', 'ACTIVE', 'DELETE', 'ARCHIVING', 'QOS_TRANSITION', 'QOS_REQUEST_SUBMITTED']:
self.logger.info("Job in state %s" % job['job_state'])
time.sleep(self.options.poll_interval)
job = inquirer.get_job_status(job_id)
......
......@@ -119,6 +119,8 @@ class JobSubmitter(Base):
help='pin lifetime of the copy in seconds.')
self.opt_parser.add_option('--bring-online', dest='bring_online', type='long', default=None,
help='bring online timeout in seconds.')
self.opt_parser.add_option('--archive-timeout', dest='archive_timeout', type='long', default=None,
help='archive timeout in seconds.')
self.opt_parser.add_option('--timeout', dest='timeout', type='long', default=None,
help='transfer timeout in seconds.')
self.opt_parser.add_option('--fail-nearline', dest='fail_nearline', default=False, action='store_true',
......@@ -214,7 +216,8 @@ class JobSubmitter(Base):
self._build_transfers(),
checksum=self.checksum,
bring_online=self.options.bring_online,
timeout=self.options.timeout,
archive_timeout=self.options.archive_timeout,
timeout = self.options.timeout,
verify_checksum=checksum_mode[0],
spacetoken=self.options.destination_token,
source_spacetoken=self.options.source_token,
......@@ -244,7 +247,7 @@ class JobSubmitter(Base):
if job_id and self.options.blocking:
inquirer = Inquirer(context)
job = inquirer.get_job_status(job_id)
while job['job_state'] in ['SUBMITTED', 'READY', 'STAGING', 'ACTIVE', 'QOS_TRANSITION']:
while job['job_state'] in ['SUBMITTED', 'READY', 'STAGING', 'ACTIVE', 'ARCHIVING', 'QOS_TRANSITION', 'QOS_REQUEST_SUBMITTED']:
self.logger.info("Job in state %s" % job['job_state'])
time.sleep(self.options.poll_interval)
job = inquirer.get_job_status(job_id)
......@@ -279,7 +282,8 @@ class JobSubmitter(Base):
self._build_transfers(),
checksum=self.checksum,
bring_online=self.options.bring_online,
timeout=self.options.timeout,
archive_timeout = self.options.archive_timeout,
timeout = self.options.timeout,
verify_checksum=checksum_mode,
spacetoken=self.options.destination_token,
source_spacetoken=self.options.source_token,
......
......@@ -21,7 +21,7 @@ from base import Base
class CloudStorage(Base):
__tablename__ = 't_cloudStorage'
storage_name = Column(String(50), primary_key=True, name='cloudStorage_name')
storage_name = Column(String(150), primary_key=True, name='cloudStorage_name')
app_key = Column(String(255))
app_secret = Column(String(255))
service_api_url = Column(String(1024))
......@@ -31,7 +31,7 @@ class CloudStorageUser(Base):
__tablename__ = 't_cloudStorageUser'
user_dn = Column(String(700), primary_key=True)
storage_name = Column(String(36), ForeignKey('t_cloudStorage.cloudStorage_name'),
storage_name = Column(String(150), ForeignKey('t_cloudStorage.cloudStorage_name'),
primary_key=True, name='cloudStorage_name')
access_token = Column(String(255))
access_token_secret = Column(String(255))
......
......@@ -22,7 +22,7 @@ from sqlalchemy.orm import relation, backref
from base import Base, Json
FileActiveStates = ['STAGING', 'STARTED', 'SUBMITTED', 'READY', 'ACTIVE', 'QOS_TRANSITION']
FileActiveStates = ['STAGING', 'STARTED', 'SUBMITTED', 'READY', 'ACTIVE', 'ARCHIVING', 'QOS_TRANSITION', 'QOS_REQUEST_SUBMITTED']
FileTerminalStates = ['FINISHED', 'FAILED', 'CANCELED']
# NOT_USED is not terminal, nor not-terminal
FileOnHoldStates = ['NOT_USED', 'ON_HOLD', 'ON_HOLD_STAGING']
......@@ -55,6 +55,8 @@ class File(Base):
checksum = Column(String(100))
finish_time = Column(DateTime)
start_time = Column(DateTime)
archive_start_time = Column(DateTime)
archive_finish_time = Column(DateTime)
internal_file_params = Column(String(255))
pid = Column(Integer)
tx_duration = Column(Float)
......@@ -102,6 +104,8 @@ class ArchivedFile(Base):
checksum = Column(String(100))
finish_time = Column(DateTime)
start_time = Column(DateTime)
archive_start_time = Column(DateTime)
archive_finish_time = Column(DateTime)
internal_file_params = Column(String(255))
job_finished = Column(DateTime)
pid = Column(Integer)
......
......@@ -22,7 +22,7 @@ from sqlalchemy.orm import relation, backref
from base import Base, Flag, TernaryFlag, Json
JobActiveStates = ['STAGING', 'SUBMITTED', 'READY', 'ACTIVE', 'DELETE', 'QOS_TRANSITION', 'QOS_REQUEST_SUBMITTED']
JobActiveStates = ['STAGING', 'SUBMITTED', 'READY', 'ACTIVE', 'DELETE', 'ARCHIVING', 'QOS_TRANSITION', 'QOS_REQUEST_SUBMITTED']
JobTerminalStates = ['FINISHED', 'FAILED', 'FINISHEDDIRTY', 'CANCELED']
......@@ -52,6 +52,7 @@ class Job(Base):
verify_checksum = Column(String(1),
name='checksum_method')
bring_online = Column(Integer)
archive_timeout = Column(Integer)
target_qos = Column(String(255))
job_metadata = Column(Json(255))
retry = Column(Integer)
......@@ -97,6 +98,7 @@ class ArchivedJob(Base):
verify_checksum = Column(String(1),
name='checksum_method')
bring_online = Column(Integer)
archive_timeout = Column(Integer)
target_qos = Column(String(255))
job_metadata = Column(Json(255))
retry = Column(Integer)
......
......@@ -105,7 +105,7 @@ def add_alternative_source(transfer, alt_source):
def new_job(transfers=None, deletion=None, verify_checksum=False, reuse=None, overwrite=False, multihop=False,
source_spacetoken=None, spacetoken=None,
bring_online=None, copy_pin_lifetime=None,
bring_online=None, archive_timeout=None, copy_pin_lifetime=None,
retry=-1, retry_delay=0, metadata=None, priority=None, strict_copy=False,
max_time_in_queue=None, timeout=None,
id_generator=JobIdGenerator.standard, sid=None, s3alternate=False, nostreams=1):
......@@ -122,6 +122,7 @@ def new_job(transfers=None, deletion=None, verify_checksum=False, reuse=None, ov
source_spacetoken: Source space token
spacetoken: Destination space token
bring_online: Bring online timeout
archive_timeout: Archive timeout
copy_pin_lifetime: Pin lifetime
retry: Number of retries: <0 is no retries, 0 is server default, >0 is whatever value is passed
metadata: Metadata to bind to the job
......@@ -148,6 +149,7 @@ def new_job(transfers=None, deletion=None, verify_checksum=False, reuse=None, ov
reuse=reuse,
spacetoken=spacetoken,
bring_online=bring_online,
archive_timeout=archive_timeout,
copy_pin_lifetime=copy_pin_lifetime,
job_metadata=metadata,
source_spacetoken=source_spacetoken,
......
......@@ -40,6 +40,7 @@ BASE_ID = uuid.UUID('urn:uuid:01874efb-4735-4595-bc9c-591aef8240c9')
DEFAULT_PARAMS = {
'bring_online': -1,
'archive_timeout': -1,
'verify_checksum': False,
'copy_pin_lifetime': -1,
'gridftp': '',
......@@ -100,6 +101,12 @@ def _validate_url(url):
if not url.hostname:
raise ValueError('Missing host (%s)' % url.geturl())
def _metadata(data):
try:
return json.loads(data)
except:
return {"label": str(data)}
def _safe_flag(flag):
"""
......@@ -305,6 +312,11 @@ class JobBuilder(object):
for k, v in params.iteritems():
if v is None and k in DEFAULT_PARAMS:
params[k] = DEFAULT_PARAMS[k]
# Enforce JSON type for 'job_metadata'
if params['job_metadata'] is not None:
params['job_metadata'] = _metadata(params['job_metadata'])
return params
def _build_internal_job_params(self):
......@@ -493,6 +505,7 @@ class JobBuilder(object):
copy_pin_lifetime=int(self.params['copy_pin_lifetime']),
checksum_method=self.params['verify_checksum'],
bring_online=self.params['bring_online'],
archive_timeout=self.params['archive_timeout'],
job_metadata=self.params['job_metadata'],
internal_job_params=self._build_internal_job_params(),
max_time_in_queue=expiration_time,
......@@ -629,6 +642,7 @@ class JobBuilder(object):
copy_pin_lifetime=-1,
checksum_method=None,
bring_online=None,
archive_timeout=None,
job_metadata=self.params['job_metadata'],
internal_job_params=None,
max_time_in_queue=self.params['max_time_in_queue']
......
Markdown is supported
Attach a file by drag & drop or click to upload.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment