Commit c3fbbf5b authored by Joao Pedro Lopes's avatar Joao Pedro Lopes
Browse files
parent 804ef74c
Pipeline #3090217 passed with stages
in 11 minutes and 45 seconds
......@@ -22,7 +22,31 @@ import time
from .base import Base
from fts3.rest.client import Submitter, Delegator, Inquirer
DEFAULT_CHECKSUM = "ADLER32"
DEFAULT_PARAMS = {
"checksum": "ADLER32",
"overwrite": False,
"reuse": False,
"job_metadata": None,
"file_metadata": None,
"filesize": None,
"gridftp": None,
"spacetoken": None,
"source_spacetoken": None,
"verify_checksum": "n",
"copy_pin_lifetime": -1,
"bring_online": -1,
"archive_timeout": -1,
"timeout": None,
"fail_nearline": False,
"retry": 0,
"multihop": False,
"credential": None,
"nostreams": None,
"s3alternate": False,
"target_qos": None,
"ipv4": False,
"ipv6": False,
}
def _metadata(data):
......@@ -116,7 +140,6 @@ class JobSubmitter(Base):
"-o",
"--overwrite",
dest="overwrite",
default=False,
action="store_true",
help="overwrite files.",
)
......@@ -124,7 +147,6 @@ class JobSubmitter(Base):
"-r",
"--reuse",
dest="reuse",
default=False,
action="store_true",
help="enable session reuse for the transfer job.",
)
......@@ -165,21 +187,18 @@ class JobSubmitter(Base):
"--checksum-mode",
dest="checksum_mode",
type="string",
default="none",
help="compare checksums in source, target, both or none.",
)
self.opt_parser.add_option(
"--copy-pin-lifetime",
dest="pin_lifetime",
type="long",
default=-1,
help="pin lifetime of the copy in seconds.",
)
self.opt_parser.add_option(
"--bring-online",
dest="bring_online",
type="long",
default=None,
help="bring online timeout in seconds.",
)
self.opt_parser.add_option(
......@@ -192,13 +211,11 @@ class JobSubmitter(Base):
"--timeout",
dest="timeout",
type="long",
default=None,
help="transfer timeout in seconds.",
)
self.opt_parser.add_option(
"--fail-nearline",
dest="fail_nearline",
default=False,
action="store_true",
help="fail the transfer is the file is nearline.",
)
......@@ -220,7 +237,6 @@ class JobSubmitter(Base):
"--retry",
dest="retry",
type="int",
default=0,
help="Number of retries. If 0, the server default will be used."
"If negative, there will be no retries.",
)
......@@ -228,29 +244,26 @@ class JobSubmitter(Base):
"-m",
"--multi-hop",
dest="multihop",
default=False,
action="store_true",
help="submit a multihop transfer.",
)
self.opt_parser.add_option(
"--cloud-credentials",
dest="cloud_cred",
default=None,
help="use cloud credentials for the job (i.e. dropbox).",
)
self.opt_parser.add_option(
"--nostreams", dest="nostreams", default=None, help="number of streams"
"--nostreams", dest="nostreams", help="number of streams"
)
self.opt_parser.add_option(
"--ipv4", dest="ipv4", default=False, action="store_true", help="force ipv4"
"--ipv4", dest="ipv4", action="store_true", help="force ipv4"
)
self.opt_parser.add_option(
"--ipv6", dest="ipv6", default=False, action="store_true", help="force ipv6"
"--ipv6", dest="ipv6", action="store_true", help="force ipv6"
)
self.opt_parser.add_option(
"--s3alternate",
dest="s3alternate",
default=False,
action="store_true",
help="use S3 alternate URL",
)
......@@ -258,7 +271,6 @@ class JobSubmitter(Base):
"--target-qos",
dest="target_qos",
type="string",
default=None,
help="define the target QoS for this transfer for CDMI endpoints",
)
......@@ -270,25 +282,21 @@ class JobSubmitter(Base):
sys.exit(1)
elif len(self.args) == 2:
(self.source, self.destination) = self.args
self.checksum = None
elif len(self.args) == 3:
(self.source, self.destination, self.checksum) = self.args
else:
self.logger.critical("Too many parameters")
sys.exit(1)
else:
self.checksum = None
if self.options.ipv4 and self.options.ipv6:
self._prepare_options()
if self.params["ipv4"] and self.params["ipv6"]:
self.opt_parser.error("ipv4 and ipv6 can not be used at the same time")
if self.options.verbose:
self.logger.setLevel(logging.DEBUG)
def _build_transfers(self):
if self.options.bulk_file:
filecontent = open(self.options.bulk_file).read()
bulk = json.loads(filecontent)
with open(self.options.bulk_file, "r") as file:
filecontent = file.read()
bulk = json.loads(filecontent)
if "files" in bulk:
return bulk["files"]
elif "Files" in bulk:
......@@ -299,38 +307,39 @@ class JobSubmitter(Base):
else:
return [{"sources": [self.source], "destinations": [self.destination]}]
def _do_submit(self, context):
def _build_params(self, **kwargs):
params = dict()
params.update(DEFAULT_PARAMS)
if self.options.bulk_file:
with open(self.options.bulk_file, "r") as file:
filecontent = file.read()
bulk = json.loads(filecontent)
if "params" in bulk:
params.update(bulk["params"])
# Apply command-line parameters
for k, v in iter(kwargs.items()):
if v is not None:
params[k] = v
# JSONify metadata
params["job_metadata"] = _metadata(params["job_metadata"])
params["file_metadata"] = _metadata(params["file_metadata"])
return params
def _prepare_options(self):
# Backwards compatibility: compare_checksum parameter
# Note: compare_checksum has higher priority than checksum_mode
if self.options.compare_checksum:
checksum_mode = "both"
elif self.options.checksum_mode:
checksum_mode = self.options.checksum_mode
else:
if self.checksum:
checksum_mode = "target"
else:
checksum = "none"
# Compare checksum has major priority than checksum_mode
if not self.options.compare_checksum:
if len(self.options.checksum_mode) > 0:
checksum_mode = self.options.checksum_mode
else:
checksum_mode = "none"
if not self.checksum:
self.checksum = DEFAULT_CHECKSUM
if not self.options.access_token:
delegator = Delegator(context)
delegator.delegate(
timedelta(minutes=self.options.proxy_lifetime),
delegate_when_lifetime_lt=timedelta(
minutes=self.options.delegate_when_lifetime_lt
),
)
checksum_mode = "none"
submitter = Submitter(context)
job_id = submitter.submit(
self._build_transfers(),
self.transfers = self._build_transfers()
self.params = self._build_params(
checksum=self.checksum,
bring_online=self.options.bring_online,
archive_timeout=self.options.archive_timeout,
......@@ -356,6 +365,19 @@ class JobSubmitter(Base):
target_qos=self.options.target_qos,
)
def _do_submit(self, context):
if not self.options.access_token:
delegator = Delegator(context)
delegator.delegate(
timedelta(minutes=self.options.proxy_lifetime),
delegate_when_lifetime_lt=timedelta(
minutes=self.options.delegate_when_lifetime_lt
),
)
submitter = Submitter(context)
job_id = submitter.submit(transfers=self.transfers, params=self.params)
if self.options.json:
self.logger.info(json.dumps(job_id))
else:
......@@ -384,51 +406,8 @@ class JobSubmitter(Base):
return job_id
def _do_dry_run(self, context):
# Backwards compatibility: compare_checksum parameter
if self.options.compare_checksum:
checksum_mode = "both"
else:
if self.checksum:
checksum_mode = "target"
else:
checksum = "none"
# Compare checksum has major priority than checksum_mode
if not self.options.compare_checksum:
if len(self.options.checksum_mode) > 0:
checksum_mode = self.options.checksum_mode
else:
checksum_mode = "none"
if not self.checksum:
self.checksum = DEFAULT_CHECKSUM
submitter = Submitter(context)
print(
submitter.build_submission(
self._build_transfers(),
checksum=self.checksum,
bring_online=self.options.bring_online,
timeout=self.options.timeout,
verify_checksum=checksum_mode,
spacetoken=self.options.destination_token,
source_spacetoken=self.options.source_token,
fail_nearline=self.options.fail_nearline,
file_metadata=_metadata(self.options.file_metadata),
filesize=self.options.file_size,
gridftp=self.options.gridftp_params,
job_metadata=_metadata(self.options.job_metadata),
overwrite=self.options.overwrite,
copy_pin_lifetime=self.options.pin_lifetime,
reuse=self.options.reuse,
retry=self.options.retry,
multihop=self.options.multihop,
credential=self.options.cloud_cred,
nostreams=self.options.nostreams,
ipv4=self.options.ipv4,
ipv6=self.options.ipv6,
s3alternate=self.options.s3alternate,
)
)
print(submitter.build_submission(transfers=self.transfers, params=self.params))
return None
def run(self):
......
......@@ -21,10 +21,13 @@ class Submitter:
self.context = context
@staticmethod
def build_submission(transfers=None, delete=None, staging=None, **kwargs):
def build_submission(
transfers=None, delete=None, params=None, staging=None, **kwargs
):
job = dict()
job["params"] = dict()
if params:
job["params"].update(params)
job["params"].update(kwargs)
if delete:
......@@ -36,21 +39,23 @@ class Submitter:
if "checksum" in kwargs:
for f in job["files"]:
if "checksum" not in f:
f["checksum"] = kwargs["checksum"]
f["checksum"] = job["params"]["checksum"]
del job["params"]["checksum"]
if "filesize" in kwargs:
if "filesize" in job["params"]:
for f in job["files"]:
f["filesize"] = kwargs["filesize"]
f["filesize"] = job["params"]["filesize"]
del job["params"]["filesize"]
if "file_metadata" in kwargs:
if "file_metadata" in job["params"]:
for f in job["files"]:
f["metadata"] = kwargs["file_metadata"]
f["metadata"] = job["params"]["file_metadata"]
del job["params"]["file_metadata"]
return json.dumps(job, indent=2)
def submit(self, transfers=None, delete=None, **kwargs):
job = Submitter.build_submission(transfers, delete, **kwargs)
def submit(self, transfers=None, delete=None, params=None, **kwargs):
job = Submitter.build_submission(
transfers=transfers, delete=delete, params=params, **kwargs
)
r = json.loads(self.context.post_json("/jobs", job))
return r["job_id"]
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment