From a1ae47d632c287ec9e14678272a435a3002afbfe Mon Sep 17 00:00:00 2001 From: Carles Garcia Cabot <carles.garcia.cabot@cern.ch> Date: Wed, 12 Feb 2020 11:07:23 +0100 Subject: [PATCH 01/11] Migrate base (remove if version_info 2.6) --- src/fts3/cli/base.py | 178 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 178 insertions(+) create mode 100644 src/fts3/cli/base.py diff --git a/src/fts3/cli/base.py b/src/fts3/cli/base.py new file mode 100644 index 00000000..9022781a --- /dev/null +++ b/src/fts3/cli/base.py @@ -0,0 +1,178 @@ +# Copyright notice: +# Copyright Members of the EMI Collaboration, 2013. +# +# See www.eu-emi.eu for details on the copyright holders +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from configparser import ConfigParser +from optparse import OptionParser, IndentedHelpFormatter +import logging +import os +import socket +import sys + +from fts3.rest.client import Context + + +CONFIG_FILENAMES = ["/etc/fts3/fts3client.cfg", os.path.expanduser("~/.fts3client.cfg")] + +CONFIG_DEFAULTSECTION = "Main" +CONFIG_DEFAULTS = { + "verbose": "false", + "endpoint": "None", + "json": "false", + "ukey": "None", + "ucert": "None", +} + + +class _Formatter(IndentedHelpFormatter): + def format_epilog(self, epilog): + if not epilog: + return "" + else: + lines = ["Example:"] + indent = (self.current_indent + self.indent_increment) * " " + for l in epilog.splitlines(): + nl = l.strip() % {"prog": sys.argv[0]} + if len(nl) > 0: + lines.append(indent + nl) + return "\n" + "\n".join(lines) + "\n\n" + + +def _get_local_endpoint(): + """ + Generate an endpoint using the machine hostname + """ + return "https://%s:8446" % socket.getfqdn() + + +class Base(object): + def __init__(self, extra_args=None, description=None, example=None): + self.logger = logging.getLogger("fts3") + + # Common CLI options + usage = None + if extra_args: + usage = "usage: %prog [options] " + extra_args + + config = ConfigParser(defaults=CONFIG_DEFAULTS) + + section = CONFIG_DEFAULTSECTION + config.read(CONFIG_FILENAMES) + + # manually set the section in edge cases + if not config.has_section("Main"): + section = "DEFAULT" + + # manually get values for which we need to support None + opt_endpoint = config.get(section, "endpoint") + if opt_endpoint == "None": + opt_endpoint = None + opt_ukey = config.get(section, "ukey") + if opt_ukey == "None": + opt_ukey = None + opt_ucert = config.get(section, "ucert") + if opt_ucert == "None": + opt_ucert = None + + self.opt_parser = OptionParser( + usage=usage, description=description, epilog=example, formatter=_Formatter() + ) + + self.opt_parser.add_option( + "-v", + "--verbose", + dest="verbose", + action="store_true", + help="verbose output.", + default=config.getboolean(section, "verbose"), + ) + self.opt_parser.add_option( + "-s", + "--endpoint", + dest="endpoint", + help="FTS3 REST endpoint.", + default=opt_endpoint, + ) + self.opt_parser.add_option( + "-j", + "--json", + dest="json", + action="store_true", + help="print the output in JSON format.", + default=config.getboolean(section, "json"), + ) + self.opt_parser.add_option( + "--key", + dest="ukey", + help="the user certificate private key.", + default=opt_ukey, + ) + self.opt_parser.add_option( + "--cert", dest="ucert", help="the user certificate.", default=opt_ucert + ) + self.opt_parser.add_option( + "--capath", + dest="capath", + default="/etc/grid-security/certificates", + help="use the specified directory to verify the peer", + ) + self.opt_parser.add_option( + "--insecure", + dest="verify", + default=True, + action="store_false", + help="do not validate the server certificate", + ) + self.opt_parser.add_option( + "--access-token", + dest="access_token", + help="OAuth2 access token (supported only by some endpoints, takes precedence)", + default=None, + ) + + def __call__(self, argv=sys.argv[1:]): + (self.options, self.args) = self.opt_parser.parse_args(argv) + if self.options.endpoint is None: + self.options.endpoint = _get_local_endpoint() + if self.options.verbose: + self.logger.setLevel(logging.DEBUG) + self.validate() + return self.run() + + def validate(self): + """ + Should be implemented by inheriting classes to validate the command line arguments. + The implementation is assumed to call sys.exit() to abort if needed + """ + pass + + def run(self): + """ + Implementation of the command + """ + raise NotImplementedError( + "Run method not implemented in %s" % type(self).__name__ + ) + + def _create_context(self): + return Context( + self.options.endpoint, + ukey=self.options.ukey, + ucert=self.options.ucert, + verify=self.options.verify, + access_token=self.options.access_token, + capath=self.options.capath, + ) -- GitLab From e9efbefa935d7cfcd5444664d1993d0e140c45fd Mon Sep 17 00:00:00 2001 From: Carles Garcia Cabot <carles.garcia.cabot@cern.ch> Date: Wed, 12 Feb 2020 11:09:10 +0100 Subject: [PATCH 02/11] Migrate banning --- src/fts3/cli/banning.py | 115 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 115 insertions(+) create mode 100644 src/fts3/cli/banning.py diff --git a/src/fts3/cli/banning.py b/src/fts3/cli/banning.py new file mode 100644 index 00000000..e3ae0047 --- /dev/null +++ b/src/fts3/cli/banning.py @@ -0,0 +1,115 @@ +# Copyright notice: +# Copyright CERN, 2014. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +from .base import Base +from fts3.rest.client import Ban + + +class Banning(Base): + def __init__(self): + super(Banning, self).__init__( + description="Ban and unban storage elements and users", + example=""" + $ %(prog)s -s https://fts3-devel.cern.ch:8446 --storage gsiftp://sample + No jobs affected + $ %(prog)s -s https://fts3-devel.cern.ch:8446 --storage gsiftp://sample --unban + $ + """, + ) + + self.opt_parser.add_option("--storage", dest="storage", help="storage element") + self.opt_parser.add_option("--user", dest="user_dn", help="user dn") + self.opt_parser.add_option( + "--unban", + dest="unban", + default=False, + action="store_true", + help="unban instead of ban", + ) + + self.opt_parser.add_option( + "--status", + dest="status", + default="cancel", + help="status of the jobs that are already in the queue: cancel or wait", + ) + self.opt_parser.add_option( + "--timeout", + dest="timeout", + default=0, + help="the timeout for the jobs that are already in the queue if status is wait", + ) + self.opt_parser.add_option( + "--allow-submit", + dest="allow_submit", + default=False, + action="store_true", + help="allow submissions if status is wait", + ) + + def validate(self): + # Some sanity checks + # This are checked server side anyway (or so they should) but we can shorcurt here + self.options.status = self.options.status.lower() + if self.options.status not in ["cancel", "wait"]: + self.logger.critical("Status can only be cancel or wait") + sys.exit(1) + + if self.options.status == "cancel": + if self.options.allow_submit: + self.logger.critical( + "--allow-submit can only be used with --status=wait" + ) + sys.exit(1) + else: + if self.options.user_dn: + self.logger.critical("--user only accept cancel") + sys.exit(1) + + if (not self.options.storage and not self.options.user_dn) or ( + self.options.storage and self.options.user_dn + ): + self.logger.critical("Need to specify only one of --storage or --user") + sys.exit(1) + + def run(self): + context = self._create_context() + ban = Ban(context) + + affected_jobs = None + if self.options.storage: + if self.options.unban: + ban.unban_se(self.options.storage) + else: + affected_jobs = ban.ban_se( + self.options.storage, + self.options.status, + self.options.timeout, + self.options.allow_submit, + ) + else: + if self.options.unban: + ban.unban_dn(self.options.user_dn) + else: + affected_jobs = ban.ban_dn(self.options.user_dn) + + if affected_jobs: + self.logger.info("Affected jobs:") + for j in affected_jobs: + self.logger.info(j) + elif not self.options.unban: + self.logger.info("No jobs affected") -- GitLab From ad65629a9af6834d40363c296f697e846069572c Mon Sep 17 00:00:00 2001 From: Carles Garcia Cabot <carles.garcia.cabot@cern.ch> Date: Wed, 12 Feb 2020 11:10:27 +0100 Subject: [PATCH 03/11] Migrate delegator --- src/fts3/cli/delegator.py | 62 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 src/fts3/cli/delegator.py diff --git a/src/fts3/cli/delegator.py b/src/fts3/cli/delegator.py new file mode 100644 index 00000000..916168a4 --- /dev/null +++ b/src/fts3/cli/delegator.py @@ -0,0 +1,62 @@ +# Copyright notice: +# Copyright Members of the EMI Collaboration, 2013. +# +# See www.eu-emi.eu for details on the copyright holders +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import timedelta + +from .base import Base +from fts3.rest.client import Delegator as Deleg + + +class Delegator(Base): + def __init__(self): + super(Delegator, self).__init__( + description="This command can be used to (re)delegate your credentials to the FTS3 server", + example=""" + $ %(prog)s -s https://fts3-devel.cern.ch:8446 + Delegation id: 9a4257f435fa2010" + """, + ) + + self.opt_parser.add_option( + "-f", + "--force", + dest="force", + default=False, + action="store_true", + help="force the delegation", + ) + + self.opt_parser.add_option( + "-H", + "--hours", + dest="duration", + default=12, + type="int", + help="Duration of the delegation in hours (Default: 12)", + ) + + def run(self): + context = self._create_context() + delegator = Deleg(context) + delegation_id = delegator.delegate( + lifetime=timedelta(hours=self.options.duration), force=self.options.force + ) + self.logger.info("Delegation id: %s" % delegation_id) + self.logger.debug( + "Termination time: %s" % delegator.get_info()["termination_time"] + ) + return delegation_id -- GitLab From f5765b723a3925f4bd4d193dc6e9a30ff1971388 Mon Sep 17 00:00:00 2001 From: Carles Garcia Cabot <carles.garcia.cabot@cern.ch> Date: Wed, 12 Feb 2020 11:11:34 +0100 Subject: [PATCH 04/11] Migrate canceller --- src/fts3/cli/jobcanceller.py | 56 ++++++++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 src/fts3/cli/jobcanceller.py diff --git a/src/fts3/cli/jobcanceller.py b/src/fts3/cli/jobcanceller.py new file mode 100644 index 00000000..b36fbe68 --- /dev/null +++ b/src/fts3/cli/jobcanceller.py @@ -0,0 +1,56 @@ +# Copyright notice: +# Copyright Members of the EMI Collaboration, 2013. +# +# See www.eu-emi.eu for details on the copyright holders +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .base import Base +from fts3.rest.client import Submitter + + +class JobCanceller(Base): + def __init__(self): + super(JobCanceller, self).__init__( + description=""" + This command can be used to cancel a running job. It returns the final state of the canceled job. + Please, mind that if the job is already in a final state (FINISHEDDIRTY, FINISHED, FAILED), + this command will return this state. + You can additionally cancel only a subset appending a comma-separated list of file ids + """, + example=""" + $ %(prog)s -s https://fts3-devel.cern.ch:8446 c079a636-c363-11e3-b7e5-02163e009f5a + FINISHED + $ %(prog)s -s https://fts3-devel.cern.ch:8446 c079a636-c363-11e3-b7e5-02163e009f5a:5658,5659,5670 + CANCELED, CANCELED, CANCELED + """, + ) + + def run(self): + if ":" in self.args[0]: + job_id, file_ids = self.args[0].split(":") + file_ids = file_ids.split(",") + else: + job_id = self.args[0] + file_ids = None + + context = self._create_context() + submitter = Submitter(context) + result = submitter.cancel(job_id, file_ids) + if file_ids: + if isinstance(result, basestring): + self.logger.info(result) + else: + self.logger.info("\n".join(result)) + else: + self.logger.info(result["job_state"]) -- GitLab From f2668aa8d3317509870e6306c846c3fba71e4c4a Mon Sep 17 00:00:00 2001 From: Carles Garcia Cabot <carles.garcia.cabot@cern.ch> Date: Wed, 12 Feb 2020 11:18:25 +0100 Subject: [PATCH 05/11] Migrat jobdeletionsubmitter --- src/fts3/cli/jobcanceller.py | 2 +- src/fts3/cli/jobdeletionsubmitter.py | 198 +++++++++++++++++++++++++++ 2 files changed, 199 insertions(+), 1 deletion(-) create mode 100644 src/fts3/cli/jobdeletionsubmitter.py diff --git a/src/fts3/cli/jobcanceller.py b/src/fts3/cli/jobcanceller.py index b36fbe68..8bc02b75 100644 --- a/src/fts3/cli/jobcanceller.py +++ b/src/fts3/cli/jobcanceller.py @@ -48,7 +48,7 @@ class JobCanceller(Base): submitter = Submitter(context) result = submitter.cancel(job_id, file_ids) if file_ids: - if isinstance(result, basestring): + if isinstance(result, str): self.logger.info(result) else: self.logger.info("\n".join(result)) diff --git a/src/fts3/cli/jobdeletionsubmitter.py b/src/fts3/cli/jobdeletionsubmitter.py new file mode 100644 index 00000000..7f54b0ac --- /dev/null +++ b/src/fts3/cli/jobdeletionsubmitter.py @@ -0,0 +1,198 @@ +# Copyright notice: +# Copyright CERN, 2014. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import timedelta +import json +import logging +import sys +import time + +from .base import Base +from fts3.rest.client import Submitter, Delegator, Inquirer + + +def _metadata(data): + try: + return json.loads(data) + except Exception: + return str(data) + + +class JobDeletionSubmitter(Base): + def __init__(self): + super(JobDeletionSubmitter, self).__init__( + extra_args="SURL1 [SURL2] [SURL3] [...]", + description=""" + This command can be used to submit a deletion job to FTS3. It supports simple and bulk submissions. + """, + example=""" + $ %(prog)s -s https://fts3-devel.cern.ch:8446 gsiftp://source.host/file1 gsiftp://source.host/file2 + Job successfully submitted. + Job id: 9fee8c1e-c46d-11e3-8299-02163e00a17a + + $ %(prog)s -s https://fts3-devel.cern.ch:8446 -f bulk.list + Job successfully submitted. + Job id: 9fee8c1e-c46d-11e3-8299-02163e00a17a + """, + ) + + # Specific options + self.opt_parser.add_option( + "-b", + "--blocking", + dest="blocking", + default=False, + action="store_true", + help="blocking mode. Wait until the operation completes.", + ) + self.opt_parser.add_option( + "-i", + "--interval", + dest="poll_interval", + type="int", + default=30, + help="interval between two poll operations in blocking mode.", + ) + self.opt_parser.add_option( + "-e", + "--expire", + dest="proxy_lifetime", + type="int", + default=420, + help="expiration time of the delegation in minutes.", + ) + self.opt_parser.add_option( + "--job-metadata", dest="job_metadata", help="transfer job metadata." + ) + self.opt_parser.add_option( + "--file-metadata", dest="file_metadata", help="file metadata." + ) + self.opt_parser.add_option( + "-S", + "--spacetoken", + dest="spacetoken", + help="the space token or its description.", + ) + self.opt_parser.add_option( + "--dry-run", + dest="dry_run", + default=False, + action="store_true", + help="do not send anything, just print the JSON message.", + ) + self.opt_parser.add_option( + "-f", + "--file", + dest="bulk_file", + type="string", + help="Name of configuration file", + ) + self.opt_parser.add_option( + "--retry", + dest="retry", + type="int", + default=0, + help="Number of retries. If 0, the server default will be used." + "If negative, there will be no retries.", + ) + self.opt_parser.add_option( + "--cloud-credentials", + dest="cloud_cred", + default=None, + help="use cloud credentials for the job (i.e. dropbox).", + ) + + def validate(self): + if not self.options.bulk_file: + if len(self.args) < 1: + self.logger.critical("Need at least a surl") + sys.exit(1) + + if self.options.verbose: + self.logger.setLevel(logging.DEBUG) + + def _build_delete(self): + if self.options.bulk_file: + files = list(filter(len, open(self.options.bulk_file).readlines())) + if len(files): + return files + else: + self.logger.critical("Could not find any file to delete") + sys.exit(1) + else: + return self.args + + def _do_submit(self, context): + + delegator = Delegator(context) + delegator.delegate(timedelta(minutes=self.options.proxy_lifetime)) + + submitter = Submitter(context) + + job_id = submitter.submit( + delete=self._build_delete(), + spacetoken=self.options.spacetoken, + job_metadata=_metadata(self.options.job_metadata), + retry=self.options.retry, + credential=self.options.cloud_cred, + ) + + if self.options.json: + self.logger.info(json.dumps(job_id)) + else: + self.logger.info("Job successfully submitted.") + self.logger.info("Job id: %s" % job_id) + + if job_id and self.options.blocking: + inquirer = Inquirer(context) + job = inquirer.get_job_status(job_id) + while job["job_state"] in [ + "SUBMITTED", + "READY", + "STAGING", + "QOS_TRANSITION", + "ACTIVE", + "DELETE", + ]: + self.logger.info("Job in state %s" % job["job_state"]) + time.sleep(self.options.poll_interval) + job = inquirer.get_job_status(job_id) + + self.logger.info("Job finished with state %s" % job["job_state"]) + if job["reason"]: + self.logger.info("Reason: %s" % job["reason"]) + + return job_id + + def _do_dry_run(self, context): + + submitter = Submitter(context) + print( + submitter.build_submission( + delete=self._build_delete(), + spacetoken=self.options.spacetoken, + job_metadata=_metadata(self.options.job_metadata), + retry=self.options.retry, + credential=self.options.cloud_cred, + ) + ) + return None + + def run(self): + context = self._create_context() + if not self.options.dry_run: + return self._do_submit(context) + else: + return self._do_dry_run(context) -- GitLab From b9d65a42d5a28aef7c837833c65a8f17fb649131 Mon Sep 17 00:00:00 2001 From: Carles Garcia Cabot <carles.garcia.cabot@cern.ch> Date: Wed, 12 Feb 2020 11:20:04 +0100 Subject: [PATCH 06/11] Migrate utils --- src/fts3/cli/utils.py | 62 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 src/fts3/cli/utils.py diff --git a/src/fts3/cli/utils.py b/src/fts3/cli/utils.py new file mode 100644 index 00000000..cc20137f --- /dev/null +++ b/src/fts3/cli/utils.py @@ -0,0 +1,62 @@ +# Copyright notice: +# Copyright Members of the EMI Collaboration, 2013. +# +# See www.eu-emi.eu for details on the copyright holders +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json + + +def job_human_readable(job): + """ + Generates a human readable string for the given job. + """ + + s = ( + """Request ID: %(job_id)s +Status: %(job_state)s +Client DN: %(user_dn)s +Reason: %(reason)s +Submission time: %(submit_time)s +Priority: %(priority)d +VO Name: %(vo_name)s +""" + % job + ) + + return s + + +def job_list_human_readable(job_list): + """ + Generates a guman readable string for the given job list. + """ + jobstr = [] + for job in job_list: + jobstr.append(job_human_readable(job)) + return "\n".join(jobstr) + + +def job_list_as_json(job_list): + """ + Serializes a job list into JSON + """ + return json.dumps(job_list, indent=2) + + +def job_as_json(job): + """ + Serializes a job into JSON + """ + return json.dumps(job, indent=2) -- GitLab From e8f12b2c6ee378ba813fab5d3cfae71cc9b1fbee Mon Sep 17 00:00:00 2001 From: Carles Garcia Cabot <carles.garcia.cabot@cern.ch> Date: Wed, 12 Feb 2020 11:20:53 +0100 Subject: [PATCH 07/11] Migrate joblister --- src/fts3/cli/joblister.py | 76 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 src/fts3/cli/joblister.py diff --git a/src/fts3/cli/joblister.py b/src/fts3/cli/joblister.py new file mode 100644 index 00000000..32e565db --- /dev/null +++ b/src/fts3/cli/joblister.py @@ -0,0 +1,76 @@ +# Copyright notice: +# Copyright Members of the EMI Collaboration, 2013. +# +# See www.eu-emi.eu for details on the copyright holders +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from fts3.rest.client import Inquirer +from .base import Base +from .utils import * + + +class JobLister(Base): + def __init__(self): + super(JobLister, self).__init__( + description="This command can be used to list the running jobs, allowing to filter by user dn or vo name", + example=""" + $ %(prog)s -s https://fts3-devel.cern.ch:8446 -o atlas + Request ID: ff294db7-655a-4c0a-9efb-44a994677bb3 + Status: ACTIVE + Client DN: /DC=ch/DC=cern/OU=Organic Units/OU=Users/CN=ddmadmin/CN=531497/CN=Robot: ATLAS Data Management + Reason: None + Submission time: 2014-04-15T07:05:38 + Priority: 3 + VO Name: atlas + + Request ID: a2e4586c-760a-469e-8303-d0f3d5aadc73 + Status: READY + Client DN: /DC=ch/DC=cern/OU=Organic Units/OU=Users/CN=ddmadmin/CN=531497/CN=Robot: ATLAS Data Management + Reason: None + Submission time: 2014-04-15T07:07:33 + Priority: 3 + VO Name: atlas + """, + ) + # Specific options + self.opt_parser.add_option( + "-u", "--userdn", dest="user_dn", help="query only for the given user" + ) + self.opt_parser.add_option( + "-o", "--voname", dest="vo_name", help="query only for the given VO" + ) + self.opt_parser.add_option( + "--source", + dest="source_se", + help="query only for the given source storage element", + ) + self.opt_parser.add_option( + "--destination", + dest="dest_se", + help="query only for the given destination storage element", + ) + + def run(self): + context = self._create_context() + inquirer = Inquirer(context) + job_list = inquirer.get_job_list( + self.options.user_dn, + self.options.vo_name, + self.options.source_se, + self.options.dest_se, + ) + if not self.options.json: + self.logger.info(job_list_human_readable(job_list)) + else: + self.logger.info(job_list_as_json(job_list)) -- GitLab From d9cd1259208aecc2226438f5e99c3320affab691 Mon Sep 17 00:00:00 2001 From: Carles Garcia Cabot <carles.garcia.cabot@cern.ch> Date: Wed, 12 Feb 2020 11:26:45 +0100 Subject: [PATCH 08/11] Migrate jobshower --- src/fts3/cli/jobshower.py | 57 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 src/fts3/cli/jobshower.py diff --git a/src/fts3/cli/jobshower.py b/src/fts3/cli/jobshower.py new file mode 100644 index 00000000..7c30918f --- /dev/null +++ b/src/fts3/cli/jobshower.py @@ -0,0 +1,57 @@ +# Copyright notice: +# Copyright Members of the EMI Collaboration, 2013. +# +# See www.eu-emi.eu for details on the copyright holders +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +from fts3.rest.client import Inquirer +from .base import Base +from .utils import * + + +class JobShower(Base): + def __init__(self): + super(JobShower, self).__init__( + extra_args="JOB_ID", + description="This command can be used to check the current status of a given job", + example=""" + $ %(prog)s -s https://fts3-devel.cern.ch:8446 c079a636-c363-11e3-b7e5-02163e009f5a + Request ID: c079a636-c363-11e3-b7e5-02163e009f5a + Status: FINISHED + Client DN: /DC=ch/DC=cern/OU=Organic Units/OU=Users/CN=saketag/CN=678984/CN=Alejandro Alvarez Ayllon + Reason: + Submission time: 2014-04-13T23:31:34 + Priority: 3 + VO Name: dteam + """, + ) + + def validate(self): + if len(self.args) == 0: + self.logger.critical("Need a job id") + sys.exit(1) + + def run(self): + job_id = self.args[0] + context = self._create_context() + + inquirer = Inquirer(context) + job = inquirer.get_job_status(job_id, list_files=self.options.json) + + if not self.options.json: + self.logger.info(job_human_readable(job)) + else: + self.logger.info(job_as_json(job)) -- GitLab From 3a07ec336c3d31a138b2afb154b6b49573440242 Mon Sep 17 00:00:00 2001 From: Carles Garcia Cabot <carles.garcia.cabot@cern.ch> Date: Wed, 12 Feb 2020 11:30:31 +0100 Subject: [PATCH 09/11] Migrate jobsubmitter --- src/fts3/cli/jobsubmitter.py | 423 +++++++++++++++++++++++++++++++++++ 1 file changed, 423 insertions(+) create mode 100644 src/fts3/cli/jobsubmitter.py diff --git a/src/fts3/cli/jobsubmitter.py b/src/fts3/cli/jobsubmitter.py new file mode 100644 index 00000000..d9c5ad8d --- /dev/null +++ b/src/fts3/cli/jobsubmitter.py @@ -0,0 +1,423 @@ +# Copyright notice: +# Copyright Members of the EMI Collaboration, 2013. +# +# See www.eu-emi.eu for details on the copyright holders +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import timedelta +import json +import logging +import sys +import time + +from .base import Base +from fts3.rest.client import Submitter, Delegator, Inquirer + +DEFAULT_CHECKSUM = "ADLER32" + + +def _metadata(data): + try: + return json.loads(data) + except Exception: + return str(data) + + +class JobSubmitter(Base): + def __init__(self): + super(JobSubmitter, self).__init__( + extra_args="SOURCE DESTINATION [CHECKSUM]", + description=""" + This command can be used to submit new jobs to FTS3. It supports simple and bulk submissions. The bulk + format is as follows: + + ```json + { + "files": [ + { + "sources": [ + "gsiftp://source.host/file" + ], + "destinations": [ + "gsiftp://destination.host/file" + ], + "metadata": "file-metadata", + "checksum": "ADLER32:1234", + "filesize": 1024 + }, + { + "sources": [ + "gsiftp://source.host/file2" + ], + "destinations": [ + "gsiftp://destination.host/file2" + ], + "metadata": "file2-metadata", + "checksum": "ADLER32:4321", + "filesize": 2048, + "activity": "default" + } + ] + } + ``` + """, + example=""" + $ %(prog)s -s https://fts3-devel.cern.ch:8446 gsiftp://source.host/file gsiftp://destination.host/file + Job successfully submitted. + Job id: 9fee8c1e-c46d-11e3-8299-02163e00a17a + + $ %(prog)s -s https://fts3-devel.cern.ch:8446 -f bulk.json + Job successfully submitted. + Job id: 9fee8c1e-c46d-11e3-8299-02163e00a17a + """, + ) + + # Specific options + self.opt_parser.add_option( + "-b", + "--blocking", + dest="blocking", + default=False, + action="store_true", + help="blocking mode. Wait until the operation completes.", + ) + self.opt_parser.add_option( + "-i", + "--interval", + dest="poll_interval", + type="int", + default=30, + help="interval between two poll operations in blocking mode.", + ) + self.opt_parser.add_option( + "-e", + "--expire", + dest="proxy_lifetime", + type="int", + default=420, + help="expiration time of the delegation in minutes.", + ) + self.opt_parser.add_option( + "--delegate-when-lifetime-lt", + type=int, + default=120, + help="delegate the proxy when the remote lifetime is less than this value (in minutes)", + ) + self.opt_parser.add_option( + "-o", + "--overwrite", + dest="overwrite", + default=False, + action="store_true", + help="overwrite files.", + ) + self.opt_parser.add_option( + "-r", + "--reuse", + dest="reuse", + default=False, + action="store_true", + help="enable session reuse for the transfer job.", + ) + self.opt_parser.add_option( + "--job-metadata", dest="job_metadata", help="transfer job metadata." + ) + self.opt_parser.add_option( + "--file-metadata", dest="file_metadata", help="file metadata." + ) + self.opt_parser.add_option( + "--file-size", dest="file_size", type="long", help="file size (in Bytes)" + ) + self.opt_parser.add_option( + "-g", "--gparam", dest="gridftp_params", help="GridFTP parameters." + ) + self.opt_parser.add_option( + "-t", + "--dest-token", + dest="destination_token", + help="the destination space token or its description.", + ) + self.opt_parser.add_option( + "-S", + "--source-token", + dest="source_token", + help="the source space token or its description.", + ) + self.opt_parser.add_option( + "-K", + "--compare-checksum", + dest="compare_checksum", + default=False, + action="store_true", + help="deprecated: compare checksums between source and destination.", + ) + self.opt_parser.add_option( + "-C", + "--checksum-mode", + dest="checksum_mode", + type="string", + default="none", + help="compare checksums in source, target, both or none.", + ) + self.opt_parser.add_option( + "--copy-pin-lifetime", + dest="pin_lifetime", + type="long", + default=-1, + help="pin lifetime of the copy in seconds.", + ) + self.opt_parser.add_option( + "--bring-online", + dest="bring_online", + type="long", + default=None, + help="bring online timeout in seconds.", + ) + self.opt_parser.add_option( + "--timeout", + dest="timeout", + type="long", + default=None, + help="transfer timeout in seconds.", + ) + self.opt_parser.add_option( + "--fail-nearline", + dest="fail_nearline", + default=False, + action="store_true", + help="fail the transfer is the file is nearline.", + ) + self.opt_parser.add_option( + "--dry-run", + dest="dry_run", + default=False, + action="store_true", + help="do not send anything, just print the JSON message.", + ) + self.opt_parser.add_option( + "-f", + "--file", + dest="bulk_file", + type="string", + help="Name of configuration file", + ) + self.opt_parser.add_option( + "--retry", + dest="retry", + type="int", + default=0, + help="Number of retries. If 0, the server default will be used." + "If negative, there will be no retries.", + ) + self.opt_parser.add_option( + "-m", + "--multi-hop", + dest="multihop", + default=False, + action="store_true", + help="submit a multihop transfer.", + ) + self.opt_parser.add_option( + "--cloud-credentials", + dest="cloud_cred", + default=None, + help="use cloud credentials for the job (i.e. dropbox).", + ) + self.opt_parser.add_option( + "--nostreams", dest="nostreams", default=None, help="number of streams" + ) + self.opt_parser.add_option( + "--ipv4", dest="ipv4", default=False, action="store_true", help="force ipv4" + ) + self.opt_parser.add_option( + "--ipv6", dest="ipv6", default=False, action="store_true", help="force ipv6" + ) + self.opt_parser.add_option( + "--target-qos", + dest="target_qos", + type="string", + default=None, + help="define the target QoS for this transfer for CDMI endpoints", + ) + + def validate(self): + self.checksum = None + if not self.options.bulk_file: + if len(self.args) < 2: + self.logger.critical("Need a source and a destination") + sys.exit(1) + elif len(self.args) == 2: + (self.source, self.destination) = self.args + self.checksum = None + elif len(self.args) == 3: + (self.source, self.destination, self.checksum) = self.args + else: + self.logger.critical("Too many parameters") + sys.exit(1) + else: + self.checksum = None + + if self.options.ipv4 and self.options.ipv6: + self.opt_parser.error("ipv4 and ipv6 can not be used at the same time") + + if self.options.verbose: + self.logger.setLevel(logging.DEBUG) + + def _build_transfers(self): + if self.options.bulk_file: + filecontent = open(self.options.bulk_file).read() + bulk = json.loads(filecontent) + if "files" in bulk: + return bulk["files"] + elif "Files" in bulk: + return bulk["Files"] + else: + self.logger.critical("Could not find any transfers") + sys.exit(1) + else: + return [{"sources": [self.source], "destinations": [self.destination]}] + + def _do_submit(self, context): + # Backwards compatibility: compare_checksum parameter + if self.options.compare_checksum: + checksum_mode = "both" + else: + if self.checksum: + checksum_mode = "target" + else: + checksum = "none" + # Compare checksum has major priority than checksum_mode + if not self.options.compare_checksum: + if len(self.options.checksum_mode) > 0: + checksum_mode = self.options.checksum_mode + else: + checksum_mode = "none" + + if not self.checksum: + self.checksum = DEFAULT_CHECKSUM + + if not self.options.access_token: + delegator = Delegator(context) + delegator.delegate( + timedelta(minutes=self.options.proxy_lifetime), + delegate_when_lifetime_lt=timedelta( + minutes=self.options.delegate_when_lifetime_lt + ), + ) + + submitter = Submitter(context) + + job_id = submitter.submit( + self._build_transfers(), + checksum=self.checksum, + bring_online=self.options.bring_online, + timeout=self.options.timeout, + verify_checksum=checksum_mode[0], + spacetoken=self.options.destination_token, + source_spacetoken=self.options.source_token, + fail_nearline=self.options.fail_nearline, + file_metadata=_metadata(self.options.file_metadata), + filesize=self.options.file_size, + gridftp=self.options.gridftp_params, + job_metadata=_metadata(self.options.job_metadata), + overwrite=self.options.overwrite, + copy_pin_lifetime=self.options.pin_lifetime, + reuse=self.options.reuse, + retry=self.options.retry, + multihop=self.options.multihop, + credential=self.options.cloud_cred, + nostreams=self.options.nostreams, + ipv4=self.options.ipv4, + ipv6=self.options.ipv6, + target_qos=self.options.target_qos, + ) + + if self.options.json: + self.logger.info(json.dumps(job_id)) + else: + self.logger.info("Job successfully submitted.") + self.logger.info("Job id: %s" % job_id) + if job_id and self.options.blocking: + inquirer = Inquirer(context) + job = inquirer.get_job_status(job_id) + while job["job_state"] in [ + "SUBMITTED", + "READY", + "STAGING", + "ACTIVE", + "QOS_TRANSITION", + ]: + self.logger.info("Job in state %s" % job["job_state"]) + time.sleep(self.options.poll_interval) + job = inquirer.get_job_status(job_id) + + self.logger.info("Job finished with state %s" % job["job_state"]) + if job["reason"]: + self.logger.info("Reason: %s" % job["reason"]) + + return job_id + + def _do_dry_run(self, context): + # Backwards compatibility: compare_checksum parameter + if self.options.compare_checksum: + checksum_mode = "both" + else: + if self.checksum: + checksum_mode = "target" + else: + checksum = "none" + # Compare checksum has major priority than checksum_mode + if not self.options.compare_checksum: + if len(self.options.checksum_mode) > 0: + checksum_mode = self.options.checksum_mode + else: + checksum_mode = "none" + + if not self.checksum: + self.checksum = DEFAULT_CHECKSUM + + submitter = Submitter(context) + print( + submitter.build_submission( + self._build_transfers(), + checksum=self.checksum, + bring_online=self.options.bring_online, + timeout=self.options.timeout, + verify_checksum=checksum_mode, + spacetoken=self.options.destination_token, + source_spacetoken=self.options.source_token, + fail_nearline=self.options.fail_nearline, + file_metadata=_metadata(self.options.file_metadata), + filesize=self.options.file_size, + gridftp=self.options.gridftp_params, + job_metadata=_metadata(self.options.job_metadata), + overwrite=self.options.overwrite, + copy_pin_lifetime=self.options.pin_lifetime, + reuse=self.options.reuse, + retry=self.options.retry, + multihop=self.options.multihop, + credential=self.options.cloud_cred, + nostreams=self.options.nostreams, + ipv4=self.options.ipv4, + ipv6=self.options.ipv6, + ) + ) + return None + + def run(self): + context = self._create_context() + if not self.options.dry_run: + return self._do_submit(context) + else: + return self._do_dry_run(context) -- GitLab From 549abb258d9ea4cbb476ac91ade0795ef1ff8f55 Mon Sep 17 00:00:00 2001 From: Carles Garcia Cabot <carles.garcia.cabot@cern.ch> Date: Wed, 12 Feb 2020 11:35:03 +0100 Subject: [PATCH 10/11] Migrate serverstatus and whoami --- src/fts3/cli/serverstatus.py | 60 ++++++++++++++++++++++++++++++++++++ src/fts3/cli/whoami.py | 54 ++++++++++++++++++++++++++++++++ 2 files changed, 114 insertions(+) create mode 100644 src/fts3/cli/serverstatus.py create mode 100644 src/fts3/cli/whoami.py diff --git a/src/fts3/cli/serverstatus.py b/src/fts3/cli/serverstatus.py new file mode 100644 index 00000000..b7a2c04f --- /dev/null +++ b/src/fts3/cli/serverstatus.py @@ -0,0 +1,60 @@ +# Copyright notice: +# Copyright CERN, 2015. +# +# See www.eu-emi.eu for details on the copyright holders +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +from .base import Base + + +class ServerStatus(Base): + def __init__(self): + super(ServerStatus, self).__init__( + description="Use this command to check on the service status.", + ) + + self.opt_parser.add_option( + "-H", + "--host", + dest="host", + default=None, + help="limit the output to a given host", + ) + self.opt_parser.add_option( + "--is-active", + dest="is_active", + default=False, + action="store_true", + help="the tool will return < 0 on error, 0 if nothing is active, " + "1 if there are active transfers, 2 if there are active staging, 3 if there are both ", + ) + + def run(self): + context = self._create_context() + host_activity = json.loads(context.get("/status/hosts")) + hosts = [self.options.host] if self.options.host else host_activity.keys() + total_count = dict(active=0, staging=0) + for host in hosts: + self.logger.info(host) + for state, count in host_activity.get(host, {}).iteritems(): + self.logger.info("\t%s: %d" % (state, count)) + total_count[state] += count + + if self.options.is_active: + return ((total_count["active"] > 0) * 1) + ( + (total_count["staging"] > 0) * 2 + ) + else: + return 0 diff --git a/src/fts3/cli/whoami.py b/src/fts3/cli/whoami.py new file mode 100644 index 00000000..7499cad5 --- /dev/null +++ b/src/fts3/cli/whoami.py @@ -0,0 +1,54 @@ +# Copyright notice: +# Copyright Members of the EMI Collaboration, 2013. +# +# See www.eu-emi.eu for details on the copyright holders +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json + +from .base import Base +from fts3.rest.client import Inquirer + + +class WhoAmI(Base): + def __init__(self): + super(WhoAmI, self).__init__( + description=""" + This command exists for convenience. It can be used to check, as the name suggests, + who are we for the server. + """, + example=""" + $ %(prog)s -s https://fts3-pilot.cern.ch:8446 + User DN: /DC=ch/DC=cern/OU=Organic Units/OU=Users/CN=saketag/CN=678984/CN=Alejandro Alvarez Ayllon + VO: dteam + VO: dteam/cern + Delegation id: 9a4257f435fa2010 + """, + ) + + def run(self): + context = self._create_context() + inquirer = Inquirer(context) + whoami = inquirer.whoami() + + if self.options.json: + print(json.dumps(whoami, indent=2)) + else: + self.logger.info("User DN: %s" % whoami["dn"][0]) + for vo in whoami["vos"]: + self.logger.info("VO: %s" % vo) + for vo_id in whoami["vos_id"]: + self.logger.info("VO id: %s" % vo_id) + self.logger.info("Delegation id: %s" % whoami["delegation_id"]) + self.logger.info("Base id: %s" % whoami["base_id"]) -- GitLab From 6bd0cb206d0e13b6ed8cf656aceded9689d6ddaf Mon Sep 17 00:00:00 2001 From: Carles Garcia Cabot <carles.garcia.cabot@cern.ch> Date: Wed, 12 Feb 2020 11:37:43 +0100 Subject: [PATCH 11/11] Migrate init --- src/fts3/cli/__init__.py | 74 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) create mode 100644 src/fts3/cli/__init__.py diff --git a/src/fts3/cli/__init__.py b/src/fts3/cli/__init__.py new file mode 100644 index 00000000..f7cd3ec6 --- /dev/null +++ b/src/fts3/cli/__init__.py @@ -0,0 +1,74 @@ +# Copyright notice: +# Copyright Members of the EMI Collaboration, 2013. +# +# See www.eu-emi.eu for details on the copyright holders +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .banning import Banning +from .delegator import Delegator +from .jobcanceller import JobCanceller +from .joblister import JobLister +from .jobshower import JobShower +from .jobsubmitter import JobSubmitter +from .jobdeletionsubmitter import JobDeletionSubmitter +from .serverstatus import ServerStatus +from .whoami import WhoAmI +import logging +import sys + + +class FTS3CliFormatter(logging.Formatter): + def format(self, record): + + if record.levelno == logging.CRITICAL: + self._fmt = "Error: %(msg)s" + elif record.levelno == logging.WARNING: + self._fmt = "# Warning: %(msg)s" + elif record.levelno == logging.DEBUG: + self._fmt = "# %(msg)s" + else: + self._fmt = "%(msg)s" + + return logging.Formatter.format(self, record) + + +class FTS3CliFilter(object): + def __init__(self, includes): + self.includes = includes + + def filter(self, record): + return record.levelno in self.includes + + +class FTS3CliFilterExclude(object): + def __init__(self, excludes): + self.excludes = excludes + + def filter(self, record): + return record.levelno not in self.excludes + + +fmt = FTS3CliFormatter() +stdout_handler = logging.StreamHandler(sys.stdout) +stderr_handler = logging.StreamHandler(sys.stderr) + +stdout_handler.setFormatter(fmt) +stdout_handler.addFilter(FTS3CliFilterExclude([logging.WARNING, logging.DEBUG])) +stderr_handler.setFormatter(fmt) +stderr_handler.addFilter(FTS3CliFilter([logging.WARNING, logging.DEBUG])) + +logging.root.addHandler(stdout_handler) +logging.root.addHandler(stderr_handler) + +logging.root.setLevel(logging.INFO) -- GitLab