#!/usr/bin/env python2
#
# Script for extracting file information from dpm database and converting
# the result into text, json or xml storage dump
#
# Erming Pei, 2009/11/13
# Tomas Kouba, 2012/11/16
# Dennis van Dok, 2015/07/03
# Alessandra Forti, 2015/10/14, 2015/11/18
# Eygene Ryabinkin, 2016
# Georgios Bitzes + Fabrizio Furano, 2016
# Petr Vokac, 2018/12/31
#
from __future__ import absolute_import
from __future__ import print_function
from __future__ import division
import sys,os
import string
import datetime
import time
import collections
import logging, logging.handlers
import re
import stat
import socket
import hashlib
import tempfile
import optparse
import getpass
import inspect
import atexit
try: import simplejson as json
except ImportError: import json
# compatibility with python 2 and 3
try: import urllib.parse as urlparse
except ImportError: import urlparse
try: import http.client as httplib
except ImportError: import httplib
# compatibility for existing SLC6, CentOS7, CentOS8 packages
try:
import pymysql
import pymysql.cursors as pymysql_cursors
except ImportError:
import MySQLdb as pymysql
import MySQLdb.cursors as pymysql_cursors
__version__ = '0.1.1'
_log = logging.getLogger('DPMDUMP')
DEFAULT_CNS_DB = 'cns_db'
DEFAULT_DPM_DB = 'dpm_db'
# We could get column names directly from the DB,
# but the DPM schema is stable enough to just use predefined constants
# SHOW columns FROM Cns_file_metadata
METADATA_COLUMNS = [
'metadata_rowid', 'metadata_fileid', 'metadata_parent_fileid',
'metadata_guid', 'metadata_name', 'metadata_filemode',
'metadata_nlink', 'metadata_owner_uid', 'metadata_gid',
'metadata_filesize', 'metadata_atime', 'metadata_mtime',
'metadata_ctime', 'metadata_fileclass', 'metadata_status',
'metadata_csumtype', 'metadata_csumvalue', 'metadata_acl',
'metadata_xattr',
]
# SHOW columns FROM Cns_file_replica
REPLICA_COLUMNS = [
'replica_rowid', 'replica_fileid', 'replica_nbaccesses',
'replica_atime', 'replica_ptime', 'replica_status',
'replica_f_type', 'replica_poolname', 'replica_host',
'replica_fs', 'replica_sfn', 'replica_ctime', 'replica_ltime',
'replica_r_type', 'replica_setname', 'replica_xattr',
]
# global list of temporary files used by exit handler
tempfiles = []
@atexit.register
def temp_files_cleanup():
global tempfiles
for filename in tempfiles:
if not os.path.exists(filename): continue
os.unlink(filename)
def guess_config_files():
""" Guesses the location of DPM namespace configuration file """
try:
possible_nsconfigs = ['/opt/lcg/etc/NSCONFIG', '/usr/etc/NSCONFIG']
if 'LCG_LOCATION' in os.environ:
possible_nsconfigs.insert(0, os.environ['LCG_LOCATION'].rstrip('/') + '/etc/NSCONFIG')
for f in possible_nsconfigs:
if os.path.exists(f):
return f
except Exception as e:
_log.warn("failed to guess DB config file location: %s", str(e))
return None
def get_conn_data(nsconfig):
""" Returns connection data from NSCONFIG"""
retval = {}
_log.debug("getting connection info from %s", nsconfig)
try:
nsconfig_line = open(nsconfig).readline().strip()
except Exception as e:
_log.error("Cannot open DPM config file %s: %s", nsconfig, str(e))
sys.exit(-1)
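    # NSCONFIG is expected to contain a single line of the form
    #   username/password@dbhost[/cns_db_name]
    # which is what the regular expression below extracts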
nsre = re.compile(r"(.*?)/(.*?)@([^/]*)(?:/(.*))?")
m = nsre.match(nsconfig_line)
if m == None:
_log.error("Bad line in DPM config '%s', doesn't match re '%s'", nsconfig, nsre)
sys.exit(-1)
retval['user'] = m.group(1)
retval['pass'] = m.group(2)
retval['host'] = m.group(3)
if m.group(4):
retval['cns_db'] = m.group(4)
else:
retval['cns_db'] = DEFAULT_CNS_DB
retval['dpm_db'] = DEFAULT_DPM_DB
_log.debug("database connection: host=%s, user=%s, cns_db=%s, dpm_db=%s", retval['host'], retval['user'], retval['cns_db'], retval['dpm_db'])
return retval
def get_connection(conn_data, db, cclass=pymysql_cursors.Cursor):
return pymysql.connect(
host=conn_data['host'], user=conn_data['user'],
passwd=conn_data['pass'], db=conn_data[db],
cursorclass=cclass)
class BaseFormatter(object):
""" Interface for all formatters """
def __init__(self, fp, base, opts):
""" Initializes formatter """
self._fp = fp
self._base = base
self._opts = opts
self._columns = []
def __str__(self):
return "{0}[{1}]".format(self.__class__.__name__, self._base)
def write(self, data):
""" Writes single record """
raise NotImplementedError
def finish(self):
""" Finalize formatter """
raise NotImplementedError
@property
def base(self):
""" Base path used to construct relative paths {rpath} """
return self._base
@property
def columns(self):
""" Database columns used by specific formatter """
return self._columns
class PlaintextFormatter(BaseFormatter):
def __init__(self, fp, base, opts):
super(PlaintextFormatter, self).__init__(fp, base, opts)
self._header = opts.get('header')
self._footer = opts.get('footer')
self._format = opts.get('format', '{rpath}{nl}')
self._records = 0
self._sumsize = 0
# parse database columns necessary to format record data
self._columns = []
fmt = string.Formatter()
        for literal_text, field_name, format_spec, conversion in fmt.parse(self._format):
            if field_name is None: continue  # trailing literal text without a replacement field
            field_name = field_name.split('[', 1)[0]  # strip item access, e.g. metadata_xattr[checksum_adler32]
            if field_name in self._columns: continue
            if field_name.startswith('metadata_'):
                self._columns.append(field_name)
            if field_name.startswith('replica_'):
                self._columns.append(field_name)
if self._header == None:
return
data = {
'version': __version__,
'sysversion': str(sys.version_info),
'command': " ".join(sys.argv),
'script': os.path.abspath(inspect.getfile(inspect.currentframe())),
'sha256': hashlib.sha256(open(__file__).read().encode('utf-8')).hexdigest(),
'user': getpass.getuser(),
'host': socket.gethostname(),
'filename': getattr(self._fp, 'name', '<unknown>'),
'base': base,
'vo': opts.get('vo', ''),
'curtime': opts.get('curtime'),
'timestamp': int(time.time()),
'datetime': datetime.datetime.now(),
'time': time.ctime(),
'format': self._format,
'nl': '\n', # make it easy to specify newline on command line
'tab': '\t', # make it easy to specify tabulator on command line
'comma': ',', # make it easy to specify ',' on command line
}
#if _log.getEffectiveLevel() < logging.DEBUG:
# _log.debug("{0} {1}".format(self._header, str(data)))
# _log.debug(self._header.replace('{nl}', ';').format(**data))
self._fp.write(self._header.format(**data))
def write(self, data):
self._records += 1
self._sumsize += data.get('metadata_filesize', 0)
#if _log.getEffectiveLevel() < logging.DEBUG:
# _log.debug("{0} {1}".format(self._format, str(data)))
# _log.debug(self._format.replace('{nl}', ';').format(**data))
self._fp.write(self._format.format(**data))
def finish(self):
if self._footer == None:
return
data = {
'timestamp': int(time.time()),
'datetime': datetime.datetime.now(),
'time': time.ctime(),
'records': self._records,
'sumsize': self._sumsize,
'nl': '\n', # make it easy to specify newline on command line
'tab': '\t', # make it easy to specify tabulator on command line
'comma': ',', # make it easy to specify ',' on command line
}
#if _log.getEffectiveLevel() < logging.DEBUG:
# _log.debug("{0} {1}".format(self._footer, str(data)))
# _log.debug(self._footer.replace('{nl}', ';').format(**data))
self._fp.write(self._footer.format(**data))
class XmlFormatter(BaseFormatter):
def __init__(self, fp, base, opts):
super(XmlFormatter, self).__init__(fp, base, opts)
self._columns = [ 'metadata_filesize', 'metadata_atime', 'metadata_mtime', 'metadata_csumtype', 'metadata_csumvalue', 'metadata_xattr' ]
self._fp.write('<?xml version="1.0" encoding="iso-8859-1"?>\n')
self._fp.write('<dump recorded="{0}"><for>vo:{1}</for>\n<entry-set>\n'.format(opts.get('curtime'), opts.get('vo', 'none')))
def _escape(self, astr):
# avoid dependency on xml.sax.saxutils.escape
if type(astr) != str:
return astr
astr = astr.replace("&", "&")
astr = astr.replace("<", "<")
astr = astr.replace(">", ">")
astr = astr.replace('"', """)
astr = astr.replace("'", "'")
return astr
def write(self, data):
rpath = self._escape(data['path'][len(self._base):])
xattr = data.get('metadata_xattr', {})
csumtype = ''
checksum = ''
if 'checksum_adler32' in xattr:
# Cns_file_metadata csumtype & csumvalue columns are deprecated
# use adler32 checksum from xattr column if available in json data
csumtype = 'AD'
checksum = xattr['checksum_adler32']
elif data.get('metadata_csumtype') == 'AD':
            # don't use anything other than the adler32 checksum, because DPM < 1.10.x
            # did not provide other algorithms and all tools that use these
            # dumps expect either an adler32 or an empty checksum
csumtype = 'AD'
checksum = data['metadata_csumvalue']
        self._fp.write('<entry name="{rpath}" ctype="{ctype}" cs="{cs}"'.format(rpath=rpath, ctype=csumtype, cs=checksum))
self._fp.write(' size="{metadata_filesize}" atime="{metadata_atime}" mtime="{metadata_mtime}"'.format(**data))
for k, v in sorted(xattr.items()):
self._fp.write(' {0}="{1}"'.format(k, self._escape(v)))
self._fp.write(' />\n')
def finish(self):
self._fp.write("</entry-set>\n")
self._fp.write('<complete>"{0}"</complete>\n'.format(datetime.datetime.isoformat(datetime.datetime.now())))
self._fp.write("</dump>\n")
class JsonFormatter(BaseFormatter):
def __init__(self, fp, base, opts):
super(JsonFormatter, self).__init__(fp, base, opts)
self._columns = [ 'metadata_filesize', 'metadata_atime', 'metadata_mtime', 'metadata_csumtype', 'metadata_csumvalue', 'metadata_xattr' ]
self._fp.write('{{ "recorded" : "{0}", "for" : "vo:{1}", "entries" : [\n'.format(opts.get('curtime'), opts.get('vo', 'none')))
self.first_line = True
def _escape(self, astr):
if type(astr) != str:
return astr
return astr.replace('"', '\\"')
def write(self, data):
rpath = self._escape(data['path'][len(self._base):])
xattr = data.get('metadata_xattr', {})
csumtype = ''
checksum = ''
if 'checksum_adler32' in xattr:
# Cns_file_metadata csumtype & csumvalue columns are deprecated
# use adler32 checksum from xattr column if available in json data
csumtype = 'AD'
checksum = xattr['checksum_adler32']
elif data.get('metadata_csumtype') == 'AD':
            # don't use anything other than the adler32 checksum, because DPM < 1.10.x
            # did not provide other algorithms and all tools that use these
            # dumps expect either an adler32 or an empty checksum
csumtype = 'AD'
checksum = data['metadata_csumvalue']
if self.first_line:
self.first_line = False
else:
self._fp.write(',\n')
self._fp.write(' {')
self._fp.write(' "name": "{rpath}, "ctype": "{ctype}", "cs": "{cs}"'.format(rpath=rpath, ctype=csumtype, cs=checksum))
self._fp.write(', "size": "{metadata_filesize}", "atime": "{metadata_atime}", "mtime": "{metadata_mtime}"'.format(**data))
for k, v in sorted(xattr.items()):
self._fp.write(', "{0}": "{1}"'.format(k, self._escape(v)))
self._fp.write(' }')
def finish(self):
if not self.first_line:
self._fp.write('\n')
self._fp.write(' ],\n')
self._fp.write(' "dump_complete" : "{0}"\n'.format(datetime.datetime.isoformat(datetime.datetime.now())))
self._fp.write('}\n')
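# Sketch of the JSON dump this formatter produces (values below are illustrative only):
#   { "recorded" : "2018-12-31T12:00:00", "for" : "vo:atlas", "entries" : [
#    { "name": "rucio/path/file1", "ctype": "AD", "cs": "01234567", "size": "1024", "atime": "1546214400", "mtime": "1546214400" },
#    ...
#    ],
#    "dump_complete" : "2018-12-31T12:34:56"
#   }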
class CachedFullPath(object):
"""DPM file metadata stored in database have pointer just
to parent directory and to build whole path it is necessary
to recursively query all parent directories to the root "/".
Althought these queries are pretty cheap they are done for
each file and even with small latencies (especially with
database on dedicated server) they can take quite a time.
This class not only caches past queries, but also limit
size of cached data not to exhaust whole memory while
dumping big DPM database with a lot of files.
"""
def __init__(self, conn=None, conn_data=None, maxsize=1000000):
self._close = False
if conn != None:
self._conn = conn
elif conn_data != None:
self._conn = get_connection(conn_data, 'cns_db')
self._close = True
else:
raise Exception("CachedFullPath called without required argument conn or conn_data")
self._maxsize = maxsize
self._cache = {}
self._ntotal = 0
self._ncached = 0
self._nqueries = 0
self._ncleanup = 0
def __del__(self):
if _log:
_log.info("path lookup cache usage: total %i, cached %i, queries %i, cleanup %i", self._ntotal, self._ncached, self._nqueries, self._ncleanup)
if self._close:
self._conn.close()
def get(self, fileid):
"""Recursively get full path for given fileid"""
self._ntotal += 1
if fileid == 0:
return ""
if fileid in self._cache:
self._ncached += 1
return self._cache[fileid]
if len(self._cache) >= self._maxsize:
_log.debug("fullpath cache too big (%i), dropping cached records...", len(self._cache))
self._ncleanup += 1
del(self._cache)
self._cache = {}
sql = "SELECT parent_fileid, name FROM Cns_file_metadata WHERE fileid=%s"
cursor = self._conn.cursor()
cursor.execute(sql, (fileid, ))
res = cursor.fetchone()
cursor.close()
self._nqueries += 1
if _log.getEffectiveLevel() < logging.DEBUG:
_log.debug("query parent directory '%s': %s", sql, res)
fullname = None
if res:
(parentid, name) = res
if parentid == 0:
fullname = ''
else:
prefix = self.get(parentid)
if prefix != None:
fullname = prefix + "/" + name
else:
_log.warn("db inconsistency: could not find fileid %i (most likely the entry is orphan).", fileid)
self._cache[fileid] = fullname
return fullname
def get_multi(self, fileids):
"""Reduce impact of query latency by resolving paths for multiple
fileids. Max number of queries still corresponds to the max path
depth, but all fileids are resolved at the same time."""
self._ntotal += len(fileids)
if len(self._cache) + len(fileids) >= self._maxsize:
_log.debug("fullpath cache too big (%i+%i), dropping cached records...", len(self._cache), len(fileids))
self._ncleanup += 1
del(self._cache)
self._cache = {}
tores = set()
id2name = {}
id2parent = {}
for fileid in fileids:
if fileid in self._cache:
self._ncached += 1
else:
tores.add(str(fileid))
if len(tores) > 0:
cursor = self._conn.cursor()
while len(tores) > 0:
sql = "SELECT fileid, parent_fileid, name FROM Cns_file_metadata WHERE fileid IN ({0})".format(','.join(tores))
cursor.execute(sql)
self._nqueries += 1
tores = set()
for row in cursor:
(fileid, parentid, name) = row
if _log.getEffectiveLevel() < logging.DEBUG:
_log.debug("query parent directory '%s': %s", sql, row)
if parentid == 0:
id2name[fileid] = ''
else:
id2name[fileid] = name
id2parent[fileid] = parentid
if parentid == 0:
pass
elif parentid in self._cache:
self._ncached += 1
elif parentid not in id2parent:
tores.add(str(parentid))
cursor.close()
ret = {}
for fileid in fileids:
if fileid in self._cache:
ret[fileid] = self._cache[fileid]
else:
currid = fileid
revname = []
while True:
if currid in self._cache:
revname.append((currid, self._cache[currid]))
self._ncached += 1
break
elif currid in id2parent:
revname.append((currid, id2name[currid]))
currid = id2parent[currid]
if currid == 0:
break
else:
_log.warn("db inconsistency: could not find fileid %i (most likely the entry is orphan).", fileid)
revname = []
break
if len(revname) > 0:
revname.reverse()
for i in range(len(revname)):
currid = revname[i][0]
if currid in self._cache: continue
fullname = '/'.join([ x[1] for x in revname[:i+1] ])
self._cache[currid] = fullname
ret[fileid] = self._cache[fileid]
return ret
def filterByDisknode(replica_hostfs, replica_table = 'replica'):
where = []
for host, fss in replica_hostfs.items():
if None in fss:
where.append('{0}.host = "{1}"'.format(replica_table, host))
else:
for fs in fss:
where.append('{0}.host = "{1}" AND {0}.fs = "{2}"'.format(replica_table, host, fs))
return "({0})".format(') OR ('.join(where))
def dump_data(conn_data, formatters, filters = {}, timestamp = 0):
_log.debug("dump_data")
# mandatory columns
cols = [ 'metadata.fileid', 'metadata.parent_fileid', 'metadata.name', 'metadata.mtime' ]
# union of columns required by all formatters
ucols = []
for formatter in formatters:
for col in formatter.columns:
dbcol = col.replace('_', '.', 1)
if dbcol in cols: continue
if col in METADATA_COLUMNS + REPLICA_COLUMNS:
cols.append(dbcol)
else:
ucols.append(col)
_log.error("%s use uknown column %s", str(formatter), dbcol)
if len(ucols) > 0:
raise Exception("unknown columns: {0}".format(', '.join(ucols)))
has_replica_column = any([ col.startswith('replica.') for col in cols ])
where = []
where.append('(metadata.filemode & {0}) = {0}'.format(stat.S_IFREG)) # only files
if 'replica_pool' in filters:
where.append('replica.poolname IN ("{0}")'.format('","'.join(filters['replica_pool'])))
if 'replica_spacetoken' in filters:
st2uuid = {}
conn_dpm = get_connection(conn_data, 'dpm_db')
cursor = conn_dpm.cursor()
cursor.execute('SELECT s_token, u_token, poolname, path FROM dpm_space_reserv')
while True:
row = cursor.fetchone()
if row == None: break
s_token, u_token, poolname, path = row
st2uuid[u_token.lower()] = s_token
cursor.close()
conn_dpm.close()
replica_setnames = [ st2uuid.get(x.lower(), x) for x in filters['replica_spacetoken'] ]
where.append('replica.setname IN ("{0}")'.format('","'.join(replica_setnames)))
if 'metadata_status' in filters:
where.append('metadata.status = "{0}"'.format(filters['metadata_status']))
if 'replica_status' in filters:
where.append('replica.status = "{0}"'.format(filters['replica_status']))
if len(filters.get('replica_hostfs', [])) > 0:
where.append(filterByDisknode(filters['replica_hostfs']))
if len(filters.get('only_replica_hostfs', [])) > 0:
where.append(filterByDisknode(filters['only_replica_hostfs']))
        # exclude files that also have an "Available" replica on a diskserver
        # or filesystem that is not selected by the current hostfs filter,
        # i.e. keep only files stored exclusively on the selected diskservers/filesystems
onlyReplicaDisknodeFilter = filterByDisknode(filters['only_replica_hostfs'], replica_table='r')
excludedReplicaIds = 'SELECT r.fileid FROM Cns_file_replica r WHERE r.status = "-" AND NOT ({0})'.format(onlyReplicaDisknodeFilter)
#if filter_replica_status: # commented out - we really want only "Available" replicas
# excludedReplicaIds = '{0} AND r.status = "{1}"'.format(excludedReplicaIds, filter_replica_status)
where.append('replica.fileid NOT IN ({0})'.format(excludedReplicaIds))
colstr = ', '.join([ "{0} AS `{1}`".format(x, x.replace('.', '_', 1)) for x in cols ])
sql = 'SELECT SQL_BIG_RESULT {0} FROM Cns_file_metadata AS metadata JOIN Cns_file_replica AS replica ON metadata.fileid = replica.fileid'.format(colstr)
if len(where) > 0:
sql += ' WHERE ({0})'.format(') AND ('.join(where))
if not has_replica_column:
# don't produce duplicate replica records unless result contains column from replica table
sql += ' GROUP BY replica.fileid'
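    # Shape of the generated query (sketch only - selected columns and filters vary with options):
    #   SELECT SQL_BIG_RESULT metadata.fileid AS `metadata_fileid`, ...
    #   FROM Cns_file_metadata AS metadata
    #   JOIN Cns_file_replica AS replica ON metadata.fileid = replica.fileid
    #   WHERE ((metadata.filemode & 32768) = 32768) AND ... GROUP BY replica.fileid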
# fileid => fullpath
pathname = CachedFullPath(conn_data=conn_data)
cnt_rows = 0
cnt_files = 0
conn = get_connection(conn_data, 'cns_db', cclass=pymysql_cursors.SSDictCursor)
cursor = conn.cursor()
try:
_log.debug("query file metadata with '%s'", sql)
cursor.execute(sql)
while True:
            # retrieving data in chunks dramatically increases performance,
            # mostly because of the latency associated with each DB query
rows = cursor.fetchmany(1000)
if len(rows) == 0: break
if _log.getEffectiveLevel() < logging.DEBUG:
_log.debug("fetched %i rows", len(rows))
fileids = [ row['metadata_parent_fileid'] for row in rows ]
pathnames = pathname.get_multi(fileids)
for row in rows:
cnt_rows += 1
if _log.getEffectiveLevel() < logging.DEBUG:
_log.debug("row %i: %s", cnt_rows, str(row))
fileid = row['metadata_fileid']
parent_fileid = row['metadata_parent_fileid']
name = row['metadata_name']
mtime = row['metadata_mtime']
                # NOTE: unfortunately mtime & ctime probably don't contain
                # the timestamps you would expect, because at least DPM in legacy
                # mode updates both timestamps in case of checksum recalculation
                # (done on the first SRM download when in legacy mode). That means
                # [cm]time can be set to a value that is much more recent than
                # the time when the file was uploaded to the DPM - more details in
                # https://groups.cern.ch/group/dpm-users-forum/Lists/Archive/Flat.aspx?RootFolder=%2Fgroup%2Fdpm-users-forum%2FLists%2FArchive%2FStorage%20dumps%20%20filter%20by%20mtime
                # In a pure DPM DOME configuration (without legacy SRM support)
                # these timestamps should not be modified unless you modify the file
                # content (e.g. adding a checksum should not cause a [cm]time update)
if timestamp != 0 and mtime > timestamp:
continue
prefix = pathnames.get(parent_fileid)
if prefix is None:
_log.error("skipping fileid %i with name '%s', unable to reconstruct path of parent fileid %i", fileid, name, parent_fileid)
continue
result_enhanced = False
metadata_xattr_parsing_required = 'metadata_xattr' in row
replica_xattr_parsing_required = 'replica_xattr' in row
fullpath = prefix + "/" + name
for formatter in formatters:
# filter by base path
if not fullpath.startswith(formatter.base):
continue
# parse metadata xattr json value into dictionary
if metadata_xattr_parsing_required:
metadata_xattr_parsing_required = False
xattr = row.get('metadata_xattr')
xattr_dict = collections.defaultdict(lambda: '')
if xattr not in [ None, '', '{}' ]:
try:
# replace '.' in dictionary key to be able to easily access
# xattr values using string.Template formatting
xattr_dict = collections.defaultdict(lambda: '', [ (k.replace('.', '_'), v) for k, v in json.loads(xattr).items() ])
except Exception as e:
_log.error("unable to parse metadata_xattr for fileid %i: %s", fileid, str(xattr))
row['metadata_xattr'] = xattr_dict
# parse replica xattr json value into dictionary
if replica_xattr_parsing_required:
replica_xattr_parsing_required = False
xattr = row.get('replica_xattr')
xattr_dict = collections.defaultdict(lambda: '')
if xattr not in [ None, '', '{}' ]:
try:
# replace '.' in dictionary key to be able to easily access
# xattr values using string.Template formatting
xattr_dict = collections.defaultdict(lambda: '', [ (k.replace('.', '_'), v) for k, v in json.loads(xattr).items() ])
except Exception as e:
_log.error("unable to parse replica_xattr for fileid %i: %s", fileid, str(xattr))
row['replica_xattr'] = xattr_dict
# add necessary metadata and formatting strings in the 'row' dictionary
if not result_enhanced:
result_enhanced = True
#row['bpath'] = prefix
row['path'] = fullpath
row['tab'] = '\t'
row['nl'] = '\n'
row['comma'] = ','
# relative path with respect to formatter base
row['rpath'] = fullpath[len(formatter.base):]
formatter.write(row)
                # file info processed by at least one formatter
if result_enhanced:
cnt_files += 1
if cnt_rows % 1000000 == 0:
_log.debug("processed %i records (%i files formatted)", cnt_rows, cnt_files)
except Exception as e:
        # a query in progress that uses SSDictCursor can only be killed by terminating the DB connection
        # (closing / deleting the cursor leads to retrieval of all selected entries from the DB)
conn.close()
del(pathname)
raise
# Close cursor and connections
cursor.close()
conn.close()
del(pathname)
_log.info("processed %i records (%i files)", cnt_rows, cnt_files)
def publish_https(filename, location, cert, key):
_log.debug("publish DPM dump %s to %s", filename, location)
try:
import pycurl
except ImportError as e:
raise Exception("unable to import pycurl module (install python-pycurl package): {0}".format(str(e)))
c = pycurl.Curl()
if _log.getEffectiveLevel() < logging.DEBUG:
c.setopt(pycurl.VERBOSE, True)
c.setopt(c.SSLCERT, cert)
c.setopt(c.SSLKEY, key)
c.setopt(c.SSL_VERIFYPEER, 0)
c.setopt(c.SSL_VERIFYHOST, 2)
c.setopt(c.FOLLOWLOCATION, 1)
# check path exists
lurl = urlparse.urlparse(location)
burl = lurl._replace(path=os.path.dirname(lurl.path))
c.setopt(c.URL, burl.geturl())
c.setopt(c.NOBODY, True)
c.setopt(c.CUSTOMREQUEST, "HEAD")
c.perform()
if (c.getinfo(c.HTTP_CODE) != 200):
raise Exception("path {0} not found".format(burl.geturl()))
# delete existing file
c.setopt(c.URL, location)
c.setopt(c.NOBODY, True)
c.setopt(c.CUSTOMREQUEST, "DELETE")
c.perform()
if (c.getinfo(c.HTTP_CODE) != 204 and c.getinfo(c.HTTP_CODE) != 404):
raise Exception("can't delete {0}".format(location))
# put the new file
c.setopt(c.CUSTOMREQUEST, "PUT")
c.setopt(c.NOBODY, False)
# suppress the response body
c.setopt(c.WRITEFUNCTION, lambda x: None)
c.setopt(pycurl.UPLOAD, 1)
c.setopt(pycurl.READFUNCTION, open(filename, 'rb').read)
c.setopt(pycurl.INFILESIZE, os.path.getsize(filename))
c.perform()
if (c.getinfo(c.HTTP_CODE) == 201):
_log.info("uploaded %s", location)
else:
_log.error("upload %s error: %i", location, c.getinfo(c.HTTP_CODE))
c.close()
def publish_xrootd(datalist, cert, key):
_log.debug("publish %i DPM dumps with xrootd protocol", len(datalist))
# set environment for XRootD transfers
    # XRD_* env variables must be set before importing the XRootD module
if _log.getEffectiveLevel() < logging.DEBUG:
os.putenv('XRD_LOGLEVEL', 'Dump')
#os.putenv('XRD_LOGFILE', '/tmp/xrootd.debug')
os.putenv('XRD_LOGMASK', 'All')
os.putenv('XRD_CONNECTIONWINDOW', '10') # by default connection timeouts after 300s
#os.putenv('XRD_REQUESTTIMEOUT', '10') # can be set individually for each operation
# set X509_* env variable used by XRootD authentication
if os.getenv('X509_USER_CERT', cert) != cert:
_log.info("overwriting X509_USER_CERT (%s) with %s", os.getenv('X509_USER_CERT'), cert)
os.putenv('X509_USER_CERT', cert)
if os.getenv('X509_USER_KEY', key) != key:
_log.info("overwriting X509_USER_KEY (%s) with %s", os.getenv('X509_USER_KEY'), key)
os.putenv('X509_USER_KEY', key)
try:
import XRootD.client
except ImportError as e:
raise Exception("unable to import XRootD module (install python2-xrootd or python34-xrootd package): {0}".format(str(e)))
process = XRootD.client.CopyProcess()
for filename, location in datalist:
process.add_job(filename, location, force=True)
status = process.prepare()
if not status.ok:
raise Exception("unable to prepare XRootD transfer ({0})".format(str(status)))
status, details = process.run()
if not status.ok:
succeeded = 0
failed = 0
for i in range(len(datalist)):
filename, location = datalist[i]
tstatus = None
if len(details) > i:
tstatus = details[i].get('status')
if tstatus == None:
failed += 1
_log.error("unknown transfer status from %s to %s", filename, location)
else:
if not tstatus.ok:
failed += 1
_log.error("transfer status from %s to %s: %s", filename, location, str(tstatus))
else:
succeeded += 1
_log.debug("transfer succeeded from %s to %s", filename, location)
raise Exception("xrootd transfers failed (succeeded: %i, failed: %i)", succeeded, failed)
#=====================================================================
# main
#=====================================================================
if __name__ == '__main__':
# basic logging configuration
streamHandler = logging.StreamHandler(sys.stderr)
streamHandler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s](%(module)s:%(lineno)d) %(message)s", "%d %b %H:%M:%S"))
_log.addHandler(streamHandler)
_log.setLevel(logging.WARN)
# parse options from command line
def opt_set_loglevel(option, opt, value, parser):
loglevel = option.default
if value != None:
loglevel = int({
'CRITICAL': logging.CRITICAL,
'DEBUG': logging.DEBUG,
'ERROR': logging.ERROR,
'FATAL': logging.FATAL,
'INFO': logging.INFO,
'NOTSET': logging.NOTSET,
'WARN': logging.WARN,
'WARNING': logging.WARNING,
}.get(value, value))
_log.setLevel(loglevel)
setattr(parser.values, option.dest, loglevel)
class IndentedHelpFormatterWithEpilogExamples(optparse.IndentedHelpFormatter):
def format_epilog(self, epilog):
import textwrap
if not epilog:
return ""
text_width = self.width - self.current_indent
indent = " "*self.current_indent
sections = []
if type(epilog) == str:
                sections.append(textwrap.fill(epilog, text_width, initial_indent=indent, subsequent_indent=indent))
else:
example_section = False
for epilog_section in epilog:
if not epilog_section.startswith('EXAMPLE '):
sections.append(textwrap.fill(epilog_section, text_width, initial_indent=indent, subsequent_indent=indent))
sections.append('')
example_section = False
else:
if not example_section:
sections.append('Examples:')
example_section = True
sections.append(" {0}{1}".format(indent, epilog_section[len('EXAMPLE '):].replace("\n", "\n{0}".format(indent))))
return "\n{0}\n".format("\n".join(sections))
# default config values
guess_nsconfig = guess_config_files()
# command line arguments
usage = "usage: %prog [options]"
epilog = []
epilog.append(
"WARNING: be aware that \"age\" and \"date\" configuration option doesn't produce expected results for DPM in legacy (SRM) mode. "
"File modification time (ctime & mtime) is not handled consistently in DPM implementation, "
"e.g. both timestamps are updated on first file download during checksum calculation.")
epilog.append(
"DUMPPATH dpm_path[,output_path[,key=val]] must contain lfn path that is used as a filter "
"to retrieve only files within specific subdirectory in DPM namespace. "
"Optionally you can specify output_path that points to the file or http URL "
"where to store output data, otherwise everything is printed to the standard output. "
"Output file is automatically compressed with gzip, bzip or lzma according "
"suffix (.gz, .bz2, .lzma). Each formater can have their own specific "
"configuratin options that can be passed as additional key=value arguments.")
epilog.append(
"HOSTFS diskserver_fqdn[:/filesystem] specifies filter for replica on given diskserver "
"and optionally filesystem (e.g. diskserver1.example.com or diskserver1.example.com:/mnt/fs1)")
epilog.append(
"POOL name of existing DPM pool")
epilog.append(
"SPACETOKEN name of existing spacetoken or its uuid")
epilog.append(
"Plain text formatter (txt) can use customized header, footer and per file output format. "
"Standard python str.format function is called and fomatting string can use all features provides by this function. "
"List of available attributes for header/footer formatting can be found in PlaintextFormatter.__init__, PlaintextFormatter.finish method. "
"File record formatting can use 'path', 'rpath' (relative path with respect to formatter base path) "
"and all available columns from file metadata and file replica table ({0}). "
"Columns 'metadata_xattr' and 'replica_xattr' contains dictionary, "
"to access \"xattr\" data you have to use e.g. {{metadata_xattr[checksum_adler32]}} "
"(all '.' characters in xattr dictionary keys are replaced with '_').".format(', '.join(METADATA_COLUMNS + REPLICA_COLUMNS)))
epilog.append("EXAMPLE # dump everything in stdout in plain text format")
epilog.append("EXAMPLE python /usr/bin/dpm-dump --txt-path=")
epilog.append("EXAMPLE # dump everything in compressed file in plain text format")
epilog.append("EXAMPLE python /usr/bin/dpm-dump --txt-path=,/tmp/dump.txt.gz")
epilog.append("EXAMPLE # dump everything in compressed file in plain json format")
epilog.append("EXAMPLE python /usr/bin/dpm-dump --json-path=,/tmp/dump.json.bz2,vo=vo_name")
epilog.append("EXAMPLE # dump everything in compressed file in plain xml format")
epilog.append("EXAMPLE python /usr/bin/dpm-dump --xml-path=,/tmp/dump.xml.lzma,vo=vo_name")
epilog.append("EXAMPLE # dump several directories in different output files in different formats")
epilog.append("EXAMPLE python /usr/bin/dpm-dump --txt-path=/dpm/fqdn/home/atlas,/tmp/dump.atlas.txt --json-path=/dpm/fqdn/home/cms,/tmp/dump.cms.json,vo=cms")
epilog.append("EXAMPLE # upload dump output to the storage directory using WebDAV")
epilog.append("EXAMPLE python /usr/bin/dpm-dump --txt-path=/dpm/fqdn/home/atlas,davs://dpmheadnode.fqdn:443/dpm/fqdn/home/atlas/dump.txt")
epilog.append("EXAMPLE # upload dump output to the storage directory using XRootD")
epilog.append("EXAMPLE python /usr/bin/dpm-dump --txt-path=/dpm/fqdn/home/atlas,root://dpmheadnode.fqdn:1094//dpm/fqdn/home/atlas/dump.txt")
epilog.append("EXAMPLE # plain text output format is configurable (header, line, footer)")
epilog.append("EXAMPLE python /usr/bin/dpm-dump --txt-header='# command: {command}{nl}# ...{nl}' --txt-format='{path}{nl}' --txt-footer='# records: {records}{nl}# ...{nl}# finished OK{nl}' --txt-path=/dpm/fqdn/home,/tmp/dump1.txt")
epilog.append("EXAMPLE # each plain text output can also have individualy configured (header, line, footer)")
epilog.append("EXAMPLE python /usr/bin/dpm-dump --txt-path='/dpm/fqdn/home,/tmp/dump2.txt,header=# header{nl},format={path};{metadata_fileid}{tab}{metadata_parent_fileid}{comma}{metadata_mtime}{nl},footer=# footer{nl}'")
epilog.append("EXAMPLE # enable logging with given log level to the log file rotated at given size")
epilog.append("EXAMPLE python /usr/bin/dpm-dump --log-level=DEBUG --log-file=/var/log/dpm-dump.log --log-size=1048576 --log-backup=2 --txt-path=")
epilog.append("EXAMPLE # dump data from DPM pools (mypool1, mypool2)")
epilog.append("EXAMPLE python /usr/bin/dpm-dump --filter-replica-pool=mypool1 --filter-replica-pool=mypool2 --txt-path=")
epilog.append("EXAMPLE # dump data from DPM spacetokens (dteam, ops)")
epilog.append("EXAMPLE python /usr/bin/dpm-dump --filter-replica-spacetoken=dteam --filter-replica-spacetoken=ops --txt-path=")
epilog.append("EXAMPLE # dump data from one diskserver plus one filesystem on the other diskserver")
epilog.append("EXAMPLE python /usr/bin/dpm-dump --filter-replica-hostfs=dpmpoolxx.fqdn --filter-replica-hostfs=dpmpoolyy.fqdn:/mnt/fs1 --txt-path=")
epilog.append("EXAMPLE # dump pfn name from a diskserver plus one filesystem on different diskserver")
epilog.append("EXAMPLE # with no replicas elsewhere (to declare temporarily unavailable Rucio files)")
epilog.append("EXAMPLE python /usr/bin/dpm-dump --filter-only-replica-hostfs=dpmpoolxx.fqdn --filter-only-replica-hostfs=dpmpoolyy.fqdn:/mnt/fs1 --txt-format='root://dpmheadnode.fqdn:1094/{path}{nl}' --txt-path=/dpm/fqdn/home/vo,file:///tmp/dump.vo-temporarily-unavailable.txt")
epilog.append("EXAMPLE # dump all pfn and replicaid stored only on one diskserver including files")
epilog.append("EXAMPLE # with special file/replicas status (useful to cleanup DPM namespace after")
epilog.append("EXAMPLE # you e.g. loose completely data on one diskserver)")
epilog.append("EXAMPLE python /usr/bin/dpm-dump --filter-replica-hostfs=dpmpoolxx.fqdn --txt-format=\"dmlite-shell -e 'replicadel {path} {replica_rowid}' # {replica_host}:{replica_fs}{nl}\" --txt-path=,file:///tmp/dump.dpmpoolxx.txt")
epilog.append("EXAMPLE ")
epilog.append("EXAMPLE # ATLAS storage dumps")
epilog.append("EXAMPLE # ===================")
epilog.append("EXAMPLE # storage dump for PRAGUELCG2 data and scratch disk")
epilog.append("EXAMPLE python /usr/bin/dpm-dump --log-level=INFO --log-file=/var/log/dpm-dump.log --txt-path=/dpm/farm.particle.cz/home/atlas/atlasdatadisk/rucio/,davs://golias100.farm.particle.cz:443/dpm/farm.particle.cz/home/atlas/atlasdatadisk/dumps/dump_20181228 --txt-path=/dpm/farm.particle.cz/home/atlas/atlasscratchdisk/rucio/,davs://golias100.farm.particle.cz:443/dpm/farm.particle.cz/home/atlas/atlasscratchdisk/dumps/dump_20181228")
epilog.append("EXAMPLE # legacy (deprecated) dump for PRAGUELCG2 data disk")
epilog.append("EXAMPLE python /usr/bin/dpm-dump -t /tmp/ATLASDATADISK-dump_20181228 -p /dpm/farm.particle.cz/home/atlas/atlasdatadisk/rucio -a -1")
epilog.append("EXAMPLE # temporary unavailable PRAGUELCG2 pfn dump for rucio, one diskserver and one specific filesystem from second diskserver")
epilog.append("EXAMPLE python /usr/bin/dpm-dump --log-level=INFO --log-file=/var/log/dpm-dump.log --txt-path=/dpm/farm.particle.cz/home/atlas --txt-format='root://golias100.farm.particle.cz/{path}{nl}' --filter-only-replica-hostfs=dpmpool1.farm.particle.cz --filter-only-replica-hostfs=dpmpool2.farm.particle.cz:/mnt/fs7")
description = "Dumps the content of DPM storage element into a file that can be used for consistency checks (e.g. with Rucio database)."
parser = optparse.OptionParser(usage=usage, description=description, version="%prog", epilog=epilog, formatter=IndentedHelpFormatterWithEpilogExamples())
parser.add_option("-v", "--verbose", dest="loglevel", action="callback", callback=opt_set_loglevel, default=logging.DEBUG, help="set log level to DEBUG")
parser.add_option("-q", "--quiet", dest="loglevel", action="callback", callback=opt_set_loglevel, default=logging.ERROR, help="set log level to ERROR")
parser.add_option("--log-level", dest="loglevel", action="callback", callback=opt_set_loglevel, type="string", help="set log level, default: %default")
parser.add_option("--log-file", dest="logfile", metavar="FILE", help="set log file (default: %default)")
parser.add_option("--log-size", dest="logsize", type="int", default=10*1024*1024, help="maximum size of log file, default: %default")
parser.add_option("--log-backup", dest="logbackup", type="int", default=4, help="number of log backup files, default: %default")
# db command line options
parser.add_option("-c", "--nsconfig", help="NSCONFIG file with sql connection info. Default: {0}".format(guess_nsconfig), default=guess_nsconfig)
parser.add_option('--dbhost', dest='dbhost', default=None, help="database host (overwrite host from NSCONFIG)")
parser.add_option('--dbuser', dest='dbuser', default=None, help="database user (overwrite user from NSCONFIG)")
parser.add_option('--dbpwd', dest='dbpwd', default=None, help="database password (overwrite password from NSCONFIG)")
parser.add_option('--dbname', dest='dbname', default=None, help="database name (overwrite cns database from NSCONFIG)")
    parser.add_option('--dbdpm', dest='dbdpm', default=None, help="DPM database name (overwrite dpm database from NSCONFIG)")
# legacy command line options
parser.add_option("-x", "--xml", help="create output file in XML format (deprecated).", metavar="XMLFILE")
parser.add_option("-j", "--json", help="create output file in JSON format (deprecated).", metavar="JSONFILE")
parser.add_option("-t", "--txt", help="create output file in TXT format (deprecated).", metavar="TXTFILE")
parser.add_option("-p", "--path", help="dump only files within this DPNS path (deprecated), default: %default", default="/", metavar="PATH")
parser.add_option("-a", "--age", help="dump only files older than AGE days (warning), default: %default", metavar="AGE")
parser.add_option("-D", "--date", help="dump only files up to the day before date, format YYYYMMDD (warning)", metavar="DATE")
parser.add_option("-V", "--vo", help="VO information used by JSON and XML formatters, default: %default", default="none")
parser.add_option("-F", "--filter", help="filter dpm diskserver or diskserver filesystem, default: %default", default=None)
# new command line options
parser.add_option("--xml-path", action="append", help="dump files within this DPNS path in XML format to stdout, local file or upload to DPM", default=[], metavar="DUMPPATH")
parser.add_option("--json-path", action="append", help="dump files within this DPNS path in JSON format to stdout, local file or upload to DPM", default=[], metavar="DUMPPATH")
parser.add_option("--txt-path", action="append", help="dump files within this DPNS path in TXT format to stdout, local file or upload to DPM", default=[], metavar="DUMPPATH")
parser.add_option("--txt-header", help="TXT output file header format, default: %default", default=None, metavar="FORMAT")
parser.add_option("--txt-footer", help="TXT output file footer format, default: %default", default=None, metavar="FORMAT")
parser.add_option("--txt-format", help="TXT output file line format, default: %default", default="{rpath}{nl}", metavar="FORMAT")
parser.add_option("--filter-metadata-status", help="filter file metadata statuses (None, Online, Migrated), default: %default", default="Online")
parser.add_option("--filter-replica-status", help="filter file replica statuses (None, Available, BeingPopulated, ToBeDeleted), default: %default", default="Available")
parser.add_option("--filter-replica-pool", action="append", help="", default=[], metavar="POOL")
parser.add_option("--filter-replica-spacetoken", action="append", help="", default=[], metavar="SPACETOKEN")
parser.add_option("--filter-replica-hostfs", action="append", help="retrive data for specific diskserver and optionally filesystem", default=[], metavar="HOSTFS")
parser.add_option("--filter-only-replica-hostfs", action="append", help="retrive data for replicas on given diskserver and optionally filesystem with no other replica on other diskservers and/or filesystems", default=[], metavar="HOSTFS")
parser.add_option("--cert", help="path to host certificate used for data uploads, default: %default", default="/etc/grid-security/hostcert.pem")
parser.add_option("--key", help="path to host key used for data uploads, default: %default", default="/etc/grid-security/hostkey.pem")
parser.add_option("--tmp", help="custom directory for temporary dump files", default=None, metavar="PATH")
parser.add_option("--keep-failed", action="store_true", help="keep temporary files in case output transfer fails, default: %default", default=False)
(options, args) = parser.parse_args()
if options.logfile == '-':
_log.removeHandler(streamHandler)
streamHandler = logging.StreamHandler(sys.stdout)
streamHandler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s](%(module)s:%(lineno)d) %(message)s", "%d %b %H:%M:%S"))
_log.addHandler(streamHandler)
elif options.logfile != None and options.logfile != '':
#fileHandler = logging.handlers.TimedRotatingFileHandler(options.logfile, 'midnight', 1, 4)
fileHandler = logging.handlers.RotatingFileHandler(options.logfile, maxBytes=options.logsize, backupCount=options.logbackup)
fileHandler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s](%(module)s:%(lineno)d) %(message)s", "%d %b %H:%M:%S"))
_log.addHandler(fileHandler)
_log.removeHandler(streamHandler)
_log.info("command: %s", " ".join(sys.argv))
_log.info("script: %s", os.path.abspath(inspect.getfile(inspect.currentframe())))
_log.info("version: %s", __version__)
_log.info("sha256: %s", hashlib.sha256(open(__file__).read().encode('utf-8')).hexdigest())
_log.info("python: %s", str(sys.version_info))
_log.info("user: %s@%s", getpass.getuser(), socket.gethostname())
_log.info("system load: %s", str(os.getloadavg()))
# validate command line options
timestamp = 0
if options.date != None:
timestamp = int(time.mktime(datetime.datetime.strptime(str(options.date), "%Y%m%d").timetuple()))
elif options.age != None:
now = int(time.time())
age = int(options.age)
timestamp = now-86400*age
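        # e.g. --age 7 results in a timestamp one week in the past, so only files
        # with mtime older than 7 days are kept by the mtime filter in dump_data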
if timestamp != 0 and timestamp < int(time.time()):
_log.info("filtering older data according ctime or mtime doesn't produce expected results for DPM in legacy (SRM) mode")
# database connection data
conn_data = {}
if options.nsconfig not in [ None, '' ]: conn_data = get_conn_data(options.nsconfig)
if options.dbhost != None: conn_data['host'] = options.dbhost
if options.dbuser != None: conn_data['user'] = options.dbuser
if options.dbpwd != None: conn_data['pass'] = options.dbpwd
if options.dbname != None: conn_data['cns_db'] = options.dbname
if options.dbdpm != None: conn_data['dpm_db'] = options.dbdpm
if not ('host' in conn_data and 'user' in conn_data and 'pass' in conn_data and 'cns_db' in conn_data and 'dpm_db' in conn_data):
_log.error("incomplete database connection data %s, use either nsconfig or db* command line options", conn_data)
sys.exit(1)
filters = {}
# filters for pool and spacetokens
if len(options.filter_replica_pool) > 0:
filters['replica_pool'] = options.filter_replica_pool
if len(options.filter_replica_spacetoken) > 0:
filters['replica_spacetoken'] = options.filter_replica_spacetoken
# filters for disknodes and filesystems
    if options.filter != None:
        host, fs = list(options.filter.split(':', 1)) if options.filter.find(':') != -1 else (options.filter, None)
        filters.setdefault('replica_hostfs', {}).setdefault(host.lower(), []).append(fs)
    if len(options.filter_replica_hostfs) > 0:
        filters.setdefault('replica_hostfs', {})
        for hostfs in options.filter_replica_hostfs:
            host, fs = list(hostfs.split(':', 1)) if hostfs.find(':') != -1 else (hostfs, None)
            filters['replica_hostfs'].setdefault(host.lower(), []).append(fs)
if len(options.filter_only_replica_hostfs) > 0:
filters['only_replica_hostfs'] = {}
for hostfs in options.filter_only_replica_hostfs:
host, fs = list(hostfs.split(':', 1)) if hostfs.find(':') != -1 else (hostfs, None)
filters['only_replica_hostfs'].setdefault(host.lower(), []).append(fs)
# file metadata status filter (pydmlite.FileStatus)
if options.filter_metadata_status.lower() == 'none':
pass # no filtering on metadata file status
elif options.filter_metadata_status.lower() in ('o', 'online', '-'):
filters['metadata_status'] = '-' # pydmlite.FileStatus.kOnline
elif options.filter_metadata_status.lower() in ('m', 'migrated'):
filters['metadata_status'] = 'm' # pydmlite.FileStatus.kMigrated
else:
_log.error("invalid file metadata status \"%s\"", options.filter_metadata_status)
sys.exit(1)
# file replica status filter (pydmlite.ReplicaStatus)
if options.filter_replica_status.lower() == 'none':
pass # no filtering on replica status
elif options.filter_replica_status.lower() in ('a', 'available', '-'):
filters['replica_status'] = '-' # pydmlite.ReplicaStatus.kAvailable
elif options.filter_replica_status.lower() in ('p', 'beingpopulated'):
filters['replica_status'] = 'p' # pydmlite.ReplicaStatus.kBeingPopulated
elif options.filter_replica_status.lower() in ('d', 'tobedeleted'):
filters['replica_status'] = 'd' # pydmlite.ReplicaStatus.kToBeDeleted
else:
_log.error("invalid file replica status \"%s\"", options.filter_replica_status)
sys.exit(1)
# Configure requested dumps
oconfig = []
curtime = datetime.datetime.isoformat(datetime.datetime.now())
for format in [ 'xml', 'json', 'txt' ]:
# legacy command line options
if getattr(options, format) != None:
opts = { 'curtime': curtime, 'vo': options.vo }
output = getattr(options, format)
if output.startswith('/'):
opts['output'] = "file://{0}".format(output)
else:
opts['output'] = "file://{0}".format(os.path.realpath(output))
oconfig.append((format, options.path, opts))
# new command line options
for params in getattr(options, "{0}_path".format(format), []):
path = None
opts = { 'curtime': curtime }
for pos, param in enumerate(params.split(',')):
if pos == 0: path = param
elif pos == 1: opts['output'] = param
else:
# formatter specific key=value options
key, val = param.split('=', 1)
opts[key] = val
if opts.get('output', '') == '':
opts['output'] = 'stdout://'
elif opts['output'].startswith('/'):
opts['output'] = 'file://{0}'.format(opts['output'])
            elif '://' not in opts['output']:
opts['output'] = "file://{0}".format(os.path.realpath(opts['output']))
oconfig.append((format, path, opts))
if len(oconfig) == 0:
_log.error("no output defined")
sys.exit(1)
fclazz = {
'xml': XmlFormatter,
'json': JsonFormatter,
'txt': PlaintextFormatter,
}
formatters = []
outputs = []
for format, path, opts in oconfig:
_log.debug("requested dump: format=%s, path=%s, opts=%s", format, path, opts)
if path != '' and not path.endswith('/'):
path = "{0}/".format(path)
ourl = urlparse.urlparse(opts.get('output', 'stdout://'))
if ourl.scheme not in [ 'stdout', 'file', 'davs', 'https', 'root' ]:
_log.warn("skipping output with unsupported protocol scheme: %s", ourl.geturl())
continue
# open output file
fp = None
ofile = None
if ourl.scheme == 'stdout':
fp = sys.stdout
else:
if ourl.scheme == 'file':
ofile = ourl.path
else:
bfile = os.path.basename(ourl.path)
fp, ofile = tempfile.mkstemp(suffix=".{0}".format(bfile), prefix='dpm-dump.', dir=options.tmp)
                # python < 3.3 doesn't support bz2 opened from a file object - always use the filename directly
os.close(fp)
tempfiles.append(ofile)
if ofile.endswith('.gz'):
import gzip
fp = gzip.open(ofile, 'wb')
elif ofile.endswith('.bz2'):
import bz2
fp = bz2.BZ2File(ofile, 'wb')
elif ofile.endswith('.lzma'):
import lzma
fp = lzma.LZMAFile(ofile, "wb")
else:
fp = open(ofile, 'wb')
# allow to write strings (instead of bytes) also in python3
if sys.version_info >= (3,):
import io
fp = io.TextIOWrapper(fp)
# parse command line options for given output format
for k in list(options.__dict__.keys()):
prefix = "{0}_".format(format)
if not k.startswith(prefix): continue
key = k[len(prefix):]
if key in opts: continue
opts[key] = getattr(options, k)
formatter = fclazz[format](fp, path, opts)
formatters.append(formatter)
outputs.append((formatter, ourl, ofile, fp))
_log.info("dump data from host=%s, database=%s (%s)", conn_data['host'], conn_data['cns_db'], conn_data['dpm_db'])
dump_start = time.time()
dump_data(conn_data, formatters, filters=filters, timestamp=timestamp)
dump_end = time.time()
_log.info("all database entries processed")
published_ok = True
xroot_uploads = []
for formatter, ourl, ofile, fp in outputs:
formatter.finish()
if ourl.scheme not in [ 'stdout' ]:
# close all files (including temporary used later for davs/root upload)
fp.close()
if ourl.scheme in [ 'stdout', 'file' ]:
pass
elif ourl.scheme in [ 'davs', 'https' ]:
try:
location = ourl.geturl()
if ourl.scheme == 'davs':
location = ourl._replace(scheme='https').geturl()
publish_https(ofile, location, options.cert, options.key)
except Exception as e:
_log.error("unable to upload %s to %s: %s", ofile, ourl.geturl(), str(e))
published_ok = False
if options.keep_failed and ofile in tempfiles:
_log.info("keep %s for manual upload (location: %s)", ofile, ourl.geturl())
tempfiles.remove(ofile)
elif ourl.scheme in [ 'root' ]:
# XRootD can deal with parallel uploads
xroot_uploads.append((ofile, ourl.geturl()))
else:
_log.info("unknow output scheme '%s' for %s", ourl.scheme, ourl.geturl())
# XRootD parallel upload
if len(xroot_uploads) > 0:
try:
publish_xrootd(xroot_uploads, options.cert, options.key)
except Exception as e:
_log.error("unable to upload files with xrootd protocol: %s", str(e))
published_ok = False
if options.keep_failed:
for ofile, location in xroot_uploads:
if ofile in tempfiles:
_log.info("keep %s for manual upload (location: %s)", ofile, location)
tempfiles.remove(ofile)
_log.info("done (dump: %s, upload: %s)", int(dump_end-dump_start), int(time.time()-dump_end))
if not published_ok:
sys.exit(1)