Commit 5564a3ba authored by Joao Pedro Lopes's avatar Joao Pedro Lopes
Browse files
parent 946e6d6a
Pipeline #3118321 passed with stages
in 11 minutes and 4 seconds
......@@ -25,6 +25,7 @@ from fts3.rest.client import Submitter, Delegator, Inquirer
DEFAULT_PARAMS = {
"checksum": "ADLER32",
"overwrite": False,
"overwrite_on_retry": False,
"reuse": False,
"job_metadata": None,
"file_metadata": None,
......@@ -145,6 +146,13 @@ class JobSubmitter(Base):
action="store_true",
help="overwrite files.",
)
self.opt_parser.add_option(
"-o",
"--overwrite--on-retry",
dest="overwrite_on_retry",
action="store_true",
help="overwrite files.",
)
self.opt_parser.add_option(
"-r",
"--reuse",
......@@ -307,6 +315,10 @@ class JobSubmitter(Base):
self._prepare_options()
if self.params["ipv4"] and self.params["ipv6"]:
self.opt_parser.error("ipv4 and ipv6 can not be used at the same time")
if self.params["overwrite"] and self.params["overwrite_on_retry"]:
self.opt_parser.error(
"overwrite and overwrite-on-retry can not be used at the same time"
)
def _build_transfers(self):
if self.options.bulk_file:
......@@ -372,6 +384,7 @@ class JobSubmitter(Base):
gridftp=self.options.gridftp_params,
job_metadata=_metadata(self.options.job_metadata),
overwrite=self.options.overwrite,
overwrite_on_retry=self.options.overwrite_on_retry,
copy_pin_lifetime=self.options.pin_lifetime,
reuse=self.options.reuse,
retry=self.options.retry,
......
......@@ -117,6 +117,7 @@ def new_job(
verify_checksum=False,
reuse=None,
overwrite=False,
overwrite_on_retry=False,
multihop=False,
source_spacetoken=None,
spacetoken=None,
......@@ -146,6 +147,7 @@ def new_job(
verify_checksum: Enable checksum verification: source, destination, both or none
reuse: Enable reuse (all transfers are handled by the same process)
overwrite: Overwrite the destinations if exist
overwrite_on_retry: Enable overwrite files only during FTS retries
multihop: Treat the transfer as a multihop transfer
source_spacetoken: Source space token
spacetoken: Destination space token
......@@ -176,6 +178,12 @@ def new_job(
raise ClientError(
"Bad request: verify_checksum does not contain a valid value"
)
if overwrite != False and overwrite_on_retry != False:
raise ClientError(
"Bad request: overwrite and overwrite-on-retry can not be used at the same time"
)
params = dict(
verify_checksum=verify_checksum,
reuse=reuse,
......@@ -187,6 +195,7 @@ def new_job(
job_metadata=metadata,
source_spacetoken=source_spacetoken,
overwrite=overwrite,
overwrite_on_retry=overwrite_on_retry,
multihop=multihop,
retry=retry,
retry_delay=retry_delay,
......
......@@ -340,6 +340,13 @@ class JobBuilder:
if max_time_in_queue is not None:
expiration_time = time.time() + max_time_in_queue
if self.params["overwrite"]:
overwrite_flag = "Y"
elif self.params["overwrite_on_retry"]:
overwrite_flag = "R"
else:
overwrite_flag = None
self.job = dict(
job_id=self.job_id,
job_state=job_initial_state,
......@@ -355,7 +362,7 @@ class JobBuilder:
submit_time=datetime.utcnow(),
priority=max(min(int(self.params["priority"]), 5), 1),
space_token=self.params["spacetoken"],
overwrite_flag=safe_flag(self.params["overwrite"]),
overwrite_flag=overwrite_flag,
dst_file_report=safe_flag(self.params["dst_file_report"]),
source_space_token=self.params["source_spacetoken"],
copy_pin_lifetime=int(self.params["copy_pin_lifetime"]),
......
......@@ -28,6 +28,7 @@ DEFAULT_PARAMS = {
"gridftp": "",
"job_metadata": None,
"overwrite": False,
"overwrite_on_retry": False,
"dst_file_report": False,
"reuse": None,
"multihop": False,
......
......@@ -15,7 +15,7 @@
from sqlalchemy import Column, DateTime, Integer, String, Enum
from sqlalchemy.orm import relation, backref
from .base import Base, Flag, Json
from .base import Base, Flag, TernaryFlag, Json
from .file import ArchivedFile
JobActiveStates = [
......@@ -51,7 +51,7 @@ class Job(Base):
space_token = Column(String(255))
internal_job_params = Column(String(255))
dst_file_report = Column(Flag)
overwrite_flag = Column(Flag)
overwrite_flag = Column(TernaryFlag)
job_finished = Column(DateTime)
source_space_token = Column(String(255))
copy_pin_lifetime = Column(Integer)
......@@ -96,7 +96,7 @@ class ArchivedJob(Base):
space_token = Column(String(255))
internal_job_params = Column(String(255))
dst_file_report = Column(Flag)
overwrite_flag = Column(Flag)
overwrite_flag = Column(TernaryFlag)
job_finished = Column(DateTime)
source_space_token = Column(String(255))
copy_pin_lifetime = Column(Integer)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment