Skip to content
Snippets Groups Projects
Commit b1a49a35 authored by ddmusr01's avatar ddmusr01
Browse files

Enhancements and fixes to various tools, cleaning.py looks ready to replace...

Enhancements and fixes to various tools, cleaning.py looks ready to replace verify_gfal* and remove_gfal*
parent e114743d
No related branches found
No related tags found
No related merge requests found
......@@ -7,6 +7,7 @@ import os
import pprint
import requests
import sys
from functools import partial
fields = {
'is_tape': bool,
......@@ -50,7 +51,6 @@ fields = {
def parse_args(args):
parser = argparse.ArgumentParser(prog=args[0])
parser.add_argument('endpoint', default=None, metavar='ENDPOINT')
parser.add_argument('--pretty', action='store_true')
parser.add_argument('-f', '--file', default=None, metavar='FILENAME')
for field, type_ in fields.items():
......@@ -62,6 +62,29 @@ def parse_args(args):
return parser.parse_args(args[1:])
def build_filter_funcs(args):
    """Build one predicate per ``--filter-*`` option present in *args*.

    Each predicate takes an endpoint dict and returns True when the
    endpoint's field equals the requested value.  The source expression is
    attached as ``generated_as`` (a lazy ``str.format`` partial) for
    debugging, as in the original code.
    """
    filter_exprs = [(argument[7:], value)
                    for argument, value in args._get_kwargs()
                    if value is not None and argument.startswith('filter_')]
    filter_funcs = []
    for field, value in filter_exprs:
        # Bind field/value as default arguments: a plain closure late-binds,
        # so every predicate would have tested the loop's LAST (field, value).
        func = lambda endpoint, field=field, value=value: endpoint[field] == value
        # partial() captures field/value eagerly, so this part was already correct.
        func.generated_as = partial(str.format, 'lambda endpoint: endpoint[{0}] == {1}', field, value)
        filter_funcs.append(func)
    return filter_funcs
# Fixed ranks for fields that should lead the CSV output; everything else
# sorts alphabetically after them.
priorities = {
    'name': -3,
    'se': -2,
    'endpoint': 1,
}


def tweak_priorities(key):
    """Sort key that floats a few well-known fields to fixed positions.

    Returns a ``(rank, key)`` tuple: 'name', 'se' and 'endpoint' keep their
    hard-coded ranks, every other field gets rank 2 and sorts alphabetically.
    The original returned a bare int-or-str, which relied on Python 2's
    "numbers sort before strings" rule and raises TypeError on Python 3;
    the tuple reproduces the Python 2 ordering on both versions.
    """
    return (priorities.get(key, 2), key)
def match_all(filter_funcs, endpoint):
    """Return True when every filter accepts *endpoint*.

    Vacuously True when *filter_funcs* is empty, matching ``all()``.
    """
    for check in filter_funcs:
        if not check(endpoint):
            return False
    return True
def main(args):
if args.file is None or not os.path.exists(args.file):
req = requests.get('http://atlas-agis-api.cern.ch/request/ddmendpoint/query/list/?json')
......@@ -78,10 +101,8 @@ def main(args):
with open(args.file) as f:
endpoints = cPickle.load(f)
# If endpoint name is specified filter by name
if args.endpoint is not None:
endpoints = [endpoint for endpoint in endpoints if endpoint['name'] == args.endpoint]
filter_funcs = build_filter_funcs(args)
endpoints = (endpoint for endpoint in endpoints if match_all(filter_funcs, endpoint))
# If any field is specified only show specified fields
if args.show_fields:
......@@ -95,12 +116,12 @@ def main(args):
endpoints = new_list
if args.pretty:
pprint.pprint(endpoints)
pprint.pprint(list(endpoints))
else:
for endpoint in endpoints:
fields = []
for key, val in sorted(endpoint.items(), key=lambda t: t[0]):
fields.append(val)
for key, val in sorted(endpoint.items(), key=lambda t: tweak_priorities(t[0])):
fields.append(str(val))
print(','.join(fields))
......
#!/usr/bin/python
from functools import partial
from itertools import imap
from rucio.common.exception import SourceNotFound
......@@ -91,17 +92,17 @@ def process_input(gfal_ctx, read_proto, delete_proto, input_generator, full_urls
for command, output in zip(commands, output_files):
if command == 'ls':
if not full_urls:
path = read_full_prefix + path
output.write(formated_stat(gfal_stat(gfal_ctx, path), path))
ls_path = read_full_prefix + path
output.write(formated_stat(gfal_stat(gfal_ctx, ls_path), ls_path))
elif command == 'rm':
if not full_urls:
path = delete_full_prefix + path
rm_path = delete_full_prefix + path
try:
delete_proto.delete(path)
delete_proto.delete(rm_path)
except SourceNotFound:
output.write(RM_FORMAT.format('NOTFOUND', path))
output.write(RM_FORMAT.format('NOTFOUND', rm_path))
else:
output.write(RM_FORMAT.format('DELETED', path))
output.write(RM_FORMAT.format('DELETED', rm_path))
if __name__ == '__main__':
......
json2csv.py 100644 → 100755
#!/usr/bin/env python
import json
import sys
import pprint
......
......@@ -5,6 +5,9 @@ import sys
# rucio/user/something/xx/yy/name
# rucio/group/something/xx/yy/name
ignore_non_standard = '--ignore-non-standard' in sys.argv[1:]
for line in sys.stdin:
line = line.strip()
components = line.split('/')
......@@ -13,7 +16,7 @@ for line in sys.stdin:
components = components[-5:]
elif len(components) == 4:
components.insert(0, 'rucio')
else:
elif not ignore_non_standard:
raise Exception("Don't know how to translate {0}".format(line))
......@@ -22,5 +25,5 @@ for line in sys.stdin:
print('{0}:{1}'.format(components[1], components[-1]))
elif components[0] in ('user', 'group'):
print('{0}.{1}:{2}'.format(components[0], components[1], components[-1]))
else:
elif not ignore_non_standard:
raise Exception("Don't know how to translate {0}".format(line))
......@@ -36,7 +36,7 @@ def parse_paths(url, paths):
def build_paths(iterator, prefix):
    """Yield full, stripped storage paths for each relative *path*.

    *prefix* must end in 'rucio'.  Regular paths go under the rucio prefix;
    SAM and panda/destDB paths live one directory above it, so the trailing
    'rucio' component is dropped for those.

    Note: the diff residue in the original left two stacked ``if`` lines
    (a stale pre-change copy above the new condition), which is a syntax
    error; this restores the intended post-change condition only.
    """
    assert prefix.endswith('rucio'), prefix
    for path in iterator:
        if path.startswith(('SAM/', 'panda/destDB/')):
            yield ('/'.join(prefix.split('/')[:-1]) + '/' + path).strip()
        else:
            yield (prefix + '/' + path).strip()
......
#!/usr/bin/env python
import gfal
import itertools
import argparse
from datetime import datetime
#from rucio.rse.protocols.gfal import Default
import gfal
def chunks(iterable, size=1024):
it = iter(iterable)
......@@ -12,16 +15,6 @@ def chunks(iterable, size=1024):
chunk = list(itertools.islice(it, size))
def exists(urls):
    """Bulk-stat *urls* through the legacy gfal bindings.

    Returns an iterator of (url, result) pairs pairing each input URL with
    the corresponding gfal result entry.

    NOTE(review): assumes gfal_get_results returns one result per input
    SURL, in input order — that pairing is the whole point of the izip;
    confirm against the gfal API.  Python 2 only (itertools.izip).
    """
    # Initialise a gfal request handle covering every SURL at once.
    status, internal, desc = gfal.gfal_init({'surls': urls})
    assert status == 0, 'gfal_init returned: {0}, "{1}"'.format(status, desc)
    # Issue the bulk ls; results are collected from the handle afterwards.
    gfal.gfal_ls(internal)
    status, internal, desc = gfal.gfal_get_results(internal)
    # Here status is the result count, not an error code (contrast gfal_init).
    assert status == len(urls), 'gfal_get_results returned: {0}, "{1}"'.format(status, desc)
    return itertools.izip(urls, desc)
def parse_paths(url, paths):
    # Prefix each whitespace-stripped relative path with the base URL.
    for raw in paths:
        yield url + '/' + raw.strip()
......@@ -29,12 +22,15 @@ def parse_paths(url, paths):
if __name__ == '__main__':
import sys
# srm_proto = Default({}, {'deterministic': True})
# srm_proto.connect()
parser = argparse.ArgumentParser(
description='Verify if the input paths exists in any of the given DDM Endpoints'
)
parser.add_argument('--bulk', type=int, default=1024, help='Number of paths sent to the server in each request')
args = parser.parse_args()
for urls in chunks(sys.stdin.xreadlines(), size=args.bulk):
status, internal, desc = gfal.gfal_init({'surls': urls})
assert status == 0, 'gfal_init returned: {0}, "{1}"'.format(status, desc)
......@@ -44,6 +40,39 @@ if __name__ == '__main__':
status, internal, desc = gfal.gfal_get_results(internal)
assert status == len(urls), 'Every URL must have a status'
#assert all(entry['status'] == 0 for entry in desc), 'Removal of all files must succeed'
for entry in desc:
print('STATUS{0},{1}'.format(entry['status'], entry['surl']))
# for url in sys.stdin.xreadlines():
# try:
# srm_proto.delete(url)
# except Exception as e:
# print('FAILED,' + str(url) + ',' + e.__class__.__name__ + ':' + e.message)
# else:
# print('SUCCESS,' + str(url) + ',')
# for urls in chunks(sys.stdin.xreadlines(), size=args.bulk):
# try:
# srm_proto.delete(urls)
# except Exception as e:
# print('FAILED,' + str(urls) + ',' + e.__class__.__name__ + ':' + e.message)
# else:
# print('SUCCESS,' + str(urls) + ',')
# for urls in chunks(sys.stdin.xreadlines(), size=args.bulk):
# for url in urls:
# print url
# gfal.gfal_unlink(url)
# for urls in chunks(sys.stdin.xreadlines(), size=args.bulk):
# status, internal, desc = gfal.gfal_init({'surls': urls})
# assert status == 0, 'gfal_init returned: {0}, "{1}"'.format(status, desc)
#
# status, internal, desc = gfal.gfal_deletesurls(internal)
# assert status == 0, 'gfal_deletesurls returned: {0}, "{1}"'.format(status, desc)
#
# status, internal, desc = gfal.gfal_get_results(internal)
# assert status == len(urls), 'Every URL must have a status'
# for entry in desc:
# print('STATUS{0},{1}'.format(entry['status'], entry['surl']))
#!/usr/bin/env python
import itertools
import argparse
from datetime import datetime
from rucio.rse.protocols.gfal import Default
from rucio.rse import rsemanager as rsemgr
def chunks(iterable, size=1024):
    """Yield successive lists of at most *size* items from *iterable*."""
    source = iter(iterable)
    # Two-argument iter(): call the slicer until it produces the empty list.
    for batch in iter(lambda: list(itertools.islice(source, size)), []):
        yield batch
def parse_paths(url, paths):
    """Join *url* with each whitespace-stripped path, lazily."""
    return ('/'.join((url, p.strip())) for p in paths)
if __name__ == '__main__':
    import sys
    # srm_proto = Default({}, {'deterministic': True})
    # srm_proto.connect()
    arg_parser = argparse.ArgumentParser(
        description='Verify if the input paths exists in any of the given DDM Endpoints'
    )
    arg_parser.add_argument('--bulk', type=int, default=1024, help='Number of paths sent to the server in each request')
    arg_parser.add_argument('--rse', help='RSE name')
    options = arg_parser.parse_args()
    # Resolve the RSE once, then reuse a single SRM deletion protocol
    # for every SURL read from stdin.
    rse_info = rsemgr.get_rse_info(options.rse)
    protocol = rsemgr.create_protocol(rse_info, 'delete', scheme='srm')
    for raw_line in sys.stdin.xreadlines():
        surl = raw_line.strip()
        try:
            protocol.delete(surl)
        except Exception as err:  # best-effort: report the failure and keep going
            print(','.join(['FAILED', str(surl), str(err).strip()]))
        else:
            print(','.join(['SUCCESS', str(surl), '']))
# for urls in chunks(sys.stdin.xreadlines(), size=args.bulk):
# try:
# proto.delete([url.strip() for url in urls])
# except Exception as e:
# print('FAILED,' + str(urls) + ',' + str(e))
# else:
# print('SUCCESS,' + str(urls) + ',')
# sample = "srm://grid-cert-03.roma1.infn.it:8446/dpm/roma1.infn.it/home/atlas/atlasdatadisk/rucio/data11_7TeV/05/37/data11_7TeV.00187815.physics_Muons.merge.AOD.f396_m945._lb0071-lb0072._0001.1"
# sample = "srm://grid-cert-03.roma1.infn.it:8446/srm/managerv2?SFN=/dpm/roma1.infn.it/home/atlas/atlasdatadisk/rucio/data12_8TeV/7f/5a/data12_8TeV.00203335.physics_JetTauEtmiss.merge.AOD.f446_m1148._lb0063._0001.1"
# sample = "srm://grid-cert-03.roma1.infn.it:8446/srm/managerv2?SFN=/dpm/roma1.infn.it/home/atlas/atlasdatadisk/rucio/data12_8TeV/7f/61/data12_8TeV.00203258.physics_JetTauEtmiss.merge.AOD.f444_m1143._lb0118._0001.1"
# try:
# proto.delete(sample)
#except Exception as e:
# import pdb; pdb.set_trace()
# for url in sys.stdin.xreadlines():
# url = url.strip()
# try:
# proto.delete(url)
# except Exception as e:
# print('FAILED,' + str(url) + ',' + e.__class__.__name__ + ':' + e.message)
# else:
# print('SUCCESS,' + str(url) + ',')
# for urls in chunks(sys.stdin.xreadlines(), size=args.bulk):
# try:
# srm_proto.delete(urls)
# except Exception as e:
# print('FAILED,' + str(urls) + ',' + e.__class__.__name__ + ':' + e.message)
# else:
# print('SUCCESS,' + str(urls) + ',')
# for url in sys.stdin.xreadlines():
# try:
# srm_proto.delete(url)
# except Exception as e:
# print('FAILED,' + str(url) + ',' + e.__class__.__name__ + ':' + e.message)
# else:
# print('SUCCESS,' + str(url) + ',')
# for urls in chunks(sys.stdin.xreadlines(), size=args.bulk):
# for url in urls:
# print url
# gfal.gfal_unlink(url)
# for urls in chunks(sys.stdin.xreadlines(), size=args.bulk):
# status, internal, desc = gfal.gfal_init({'surls': urls})
# assert status == 0, 'gfal_init returned: {0}, "{1}"'.format(status, desc)
#
# status, internal, desc = gfal.gfal_deletesurls(internal)
# assert status == 0, 'gfal_deletesurls returned: {0}, "{1}"'.format(status, desc)
#
# status, internal, desc = gfal.gfal_get_results(internal)
# assert status == len(urls), 'Every URL must have a status'
# for entry in desc:
# print('STATUS{0},{1}'.format(entry['status'], entry['surl']))
......@@ -30,7 +30,7 @@ def url_for_endpoint(endpoint):
def parse_paths(url, paths):
    """Yield a full, stripped URL for each relative *path*.

    SAM and panda/destDB paths are rooted one directory above *url*
    (its last component dropped); everything else is joined to *url*.

    Note: the diff residue in the original left two stacked ``if`` lines
    (a stale pre-change copy above the new condition), which is a syntax
    error; this restores the intended post-change condition only.
    """
    sam = '/'.join(url.split('/')[:-1])
    for path in paths:
        if path.startswith(('SAM/', 'panda/destDB/')):
            yield '/'.join((sam, path)).strip()
        else:
            yield '/'.join((url, path)).strip()
......
......@@ -18,9 +18,14 @@ while read line; do
echo "rucio list-file-replicas $opts \"$did\" | tail -n+3" 1>&2
replicas=$(rucio list-file-replicas $opts "$did" | tail -n+3)
if [ -n "$replicas" ]; then
echo "FALSE_POSITIVE: $did"
echo "$replicas"
echo '--------------------------------------'
if [ -n "$(echo "$replicas" | grep -vF '???')" ]; then
echo "REPLICA,$did,$(echo "$replicas" | tr '\n' ';' | tr -s '[[:blank:]]')"
else
echo "???,$did,$(echo "$replicas" | tr '\n' ';' | tr -s '[[:blank:]]')"
fi
else
echo "NOTFOUND,$did,"
fi
done
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment