#!/usr/bin/env python2
#
# Script for extracting file information from dpm database and converting
# the result into text, json or xml storage dump
#
# Erming Pei, 2009/11/13
# Tomas Kouba, 2012/11/16
# Dennis van Dok, 2015/07/03
# Alessandra Forti, 2015/10/14, 2015/11/18
# Eygene Ryabinkin, 2016
# Georgios Bitzes + Fabrizio Furano, 2016
# Petr Vokac, 2018/12/31
#
from __future__ import absolute_import
from __future__ import print_function
from __future__ import division

import sys,os
import string
import datetime
import time
import collections
import logging, logging.handlers
import re
import stat
import socket
import hashlib
import tempfile
import optparse
import getpass
import inspect
import atexit

try: import simplejson as json
except ImportError: import json

# compatibility with python 2 and 3
try: import urllib.parse as urlparse
except ImportError: import urlparse
try: import http.client as httplib
except ImportError: import httplib

# compatibility for existing SLC6, CentOS7, CentOS8 packages
try:
    import pymysql
    import pymysql.cursors as pymysql_cursors
except ImportError:
    import MySQLdb as pymysql
    import MySQLdb.cursors as pymysql_cursors

__version__ = '0.1.1'

_log = logging.getLogger('DPMDUMP')

DEFAULT_CNS_DB = 'cns_db'
DEFAULT_DPM_DB = 'dpm_db'
# We could get column names directly from DB,
# but DPM schema is stable enough to just use predefined constants
# SHOW columns FROM Cns_file_metadata
METADATA_COLUMNS = [
    'metadata_rowid', 'metadata_fileid', 'metadata_parent_fileid',
    'metadata_guid', 'metadata_name', 'metadata_filemode',
    'metadata_nlink', 'metadata_owner_uid', 'metadata_gid',
    'metadata_filesize', 'metadata_atime', 'metadata_mtime',
    'metadata_ctime', 'metadata_fileclass', 'metadata_status',
    'metadata_csumtype', 'metadata_csumvalue', 'metadata_acl',
    'metadata_xattr',
]
# SHOW columns FROM Cns_file_replica
REPLICA_COLUMNS = [
    'replica_rowid', 'replica_fileid', 'replica_nbaccesses',
    'replica_atime', 'replica_ptime', 'replica_status',
    'replica_f_type', 'replica_poolname', 'replica_host',
    'replica_fs', 'replica_sfn', 'replica_ctime', 'replica_ltime',
    'replica_r_type', 'replica_setname', 'replica_xattr',
]
# global list of temporary files used by exit handler
tempfiles = []



@atexit.register
def temp_files_cleanup():
    global tempfiles
    for filename in tempfiles:
        if not os.path.exists(filename): continue
        os.unlink(filename)



def guess_config_files():
    """ Guesses the location of DPM namespace configuration file """
    try:
        possible_nsconfigs = ['/opt/lcg/etc/NSCONFIG', '/usr/etc/NSCONFIG']
        if 'LCG_LOCATION' in os.environ:
            possible_nsconfigs.insert(0, os.environ['LCG_LOCATION'].rstrip('/') + '/etc/NSCONFIG')

        for f in possible_nsconfigs:
            if os.path.exists(f):
                return f

    except Exception as e:
        _log.warn("failed to guess DB config file location: %s", str(e))

    return None



def get_conn_data(nsconfig):
    """ Returns connection data from NSCONFIG"""
    retval = {}

    _log.debug("getting connection info from %s", nsconfig)
    try:
        nsconfig_line = open(nsconfig).readline().strip()
    except Exception as e:
        _log.error("Cannot open DPM config file %s: %s", nsconfig, str(e))
        sys.exit(-1)

    nsre = re.compile(r"(.*?)/(.*?)@([^/]*)(?:/(.*))?")
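    # NSCONFIG is expected to contain a single line "user/password@host[/cns_db_name]";
    # an illustrative (made-up) example: dpmmgr/secret@dpmhead.example.com/cns_db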
    m = nsre.match(nsconfig_line)
    if m == None:
        _log.error("Bad line in DPM config '%s', doesn't match re '%s'", nsconfig, nsre)
        sys.exit(-1)
    retval['user'] = m.group(1)
    retval['pass'] = m.group(2)
    retval['host'] = m.group(3)
    if m.group(4):
        retval['cns_db'] = m.group(4)
    else:
        retval['cns_db'] = DEFAULT_CNS_DB
    retval['dpm_db'] = DEFAULT_DPM_DB

    _log.debug("database connection: host=%s, user=%s, cns_db=%s, dpm_db=%s", retval['host'], retval['user'], retval['cns_db'], retval['dpm_db'])

    return retval



def get_connection(conn_data, db, cclass=pymysql_cursors.Cursor):
    return pymysql.connect(
            host=conn_data['host'], user=conn_data['user'],
            passwd=conn_data['pass'], db=conn_data[db],
            cursorclass=cclass)



class BaseFormatter(object):
    """ Interface for all formatters """

    def __init__(self, fp, base, opts):
        """ Initializes formatter """
        self._fp = fp
        self._base = base
        self._opts = opts
        self._columns = []

    def __str__(self):
        return "{0}[{1}]".format(self.__class__.__name__, self._base)

    def write(self, data):
        """ Writes single record """
        raise NotImplementedError

    def finish(self):
        """ Finalize formatter """
        raise NotImplementedError

    @property
    def base(self):
        """ Base path used to construct relative paths {rpath} """
        return self._base

    @property
    def columns(self):
        """ Database columns used by specific formatter """
        return self._columns


class PlaintextFormatter(BaseFormatter):
    def __init__(self, fp, base, opts):
        super(PlaintextFormatter, self).__init__(fp, base, opts)
        self._header = opts.get('header')
        self._footer = opts.get('footer')
        self._format = opts.get('format', '{rpath}{nl}')
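        # the per-record format uses python str.format fields; an illustrative custom
        # format: '{rpath}{tab}{metadata_filesize}{tab}{metadata_xattr[checksum_adler32]}{nl}'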

        self._records = 0
        self._sumsize = 0

        # parse database columns necessary to format record data
        self._columns = []
        fmt = string.Formatter()
        for literal_text, field_name, format_spec, conversion in fmt.parse(self._format):
            # field_name is None for trailing literal text; strip item access like
            # {metadata_xattr[checksum_adler32]} down to the bare column name
            if field_name is None: continue
            field_name = field_name.split('[')[0].split('.')[0]
            if field_name in self._columns: continue
            if field_name.startswith('metadata_'):
                self._columns.append(field_name)
            if field_name.startswith('replica_'):
                self._columns.append(field_name)

        if self._header == None:
            return

        data = {
            'version': __version__,
            'sysversion': str(sys.version_info),
            'command': " ".join(sys.argv),
            'script': os.path.abspath(inspect.getfile(inspect.currentframe())),
            'sha256': hashlib.sha256(open(__file__).read().encode('utf-8')).hexdigest(),
            'user': getpass.getuser(),
            'host': socket.gethostname(),
            'filename': getattr(self._fp, 'name', '<unknown>'),
            'base': base,
            'vo': opts.get('vo', ''),
            'curtime': opts.get('curtime'),
            'timestamp': int(time.time()),
            'datetime': datetime.datetime.now(),
            'time': time.ctime(),
            'format': self._format,
            'nl': '\n', # make it easy to specify newline on command line
            'tab': '\t', # make it easy to specify tabulator on command line
            'comma': ',', # make it easy to specify ',' on command line
        }
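        # an illustrative custom header format using these attributes:
        #   '# dump of {base} created {datetime} by {user}@{host}{nl}'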

        #if _log.getEffectiveLevel() < logging.DEBUG:
        #    _log.debug("{0} {1}".format(self._header, str(data)))
        #    _log.debug(self._header.replace('{nl}', ';').format(**data))

        self._fp.write(self._header.format(**data))

    def write(self, data):
        self._records += 1
        self._sumsize += data.get('metadata_filesize', 0)

        #if _log.getEffectiveLevel() < logging.DEBUG:
        #    _log.debug("{0} {1}".format(self._format, str(data)))
        #    _log.debug(self._format.replace('{nl}', ';').format(**data))

        self._fp.write(self._format.format(**data))

    def finish(self):
        if self._footer == None:
            return

        data = {
            'timestamp': int(time.time()),
            'datetime': datetime.datetime.now(),
            'time': time.ctime(),
            'records': self._records,
            'sumsize': self._sumsize,
            'nl': '\n', # make it easy to specify newline on command line
            'tab': '\t', # make it easy to specify tabulator on command line
            'comma': ',', # make it easy to specify ',' on command line
        }

        #if _log.getEffectiveLevel() < logging.DEBUG:
        #    _log.debug("{0} {1}".format(self._footer, str(data)))
        #    _log.debug(self._footer.replace('{nl}', ';').format(**data))

        self._fp.write(self._footer.format(**data))


class XmlFormatter(BaseFormatter):
    def __init__(self, fp, base, opts):
        super(XmlFormatter, self).__init__(fp, base, opts)
        self._columns = [ 'metadata_filesize', 'metadata_atime', 'metadata_mtime', 'metadata_csumtype', 'metadata_csumvalue', 'metadata_xattr' ]
        self._fp.write('<?xml version="1.0" encoding="iso-8859-1"?>\n')
        self._fp.write('<dump recorded="{0}"><for>vo:{1}</for>\n<entry-set>\n'.format(opts.get('curtime'), opts.get('vo', 'none')))

    def _escape(self, astr):
        # avoid dependency on xml.sax.saxutils.escape
        if type(astr) != str:
            return astr
        astr = astr.replace("&", "&amp;")
        astr = astr.replace("<", "&lt;")
        astr = astr.replace(">", "&gt;")
        astr = astr.replace('"', "&quot;")
        astr = astr.replace("'", "&apos;")
        return astr

    def write(self, data):
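        # writes one XML entry per file, e.g. (illustrative values):
        #   <entry name="/rucio/path/file1" ctype="AD" cs="0a1b2c3d" size="1024" atime="1546300800" mtime="1546300800" />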
        rpath = self._escape(data['path'][len(self._base):])
        xattr = data.get('metadata_xattr', {})
        csumtype = ''
        checksum = ''
        if 'checksum_adler32' in xattr:
            # Cns_file_metadata csumtype & csumvalue columns are deprecated;
            # use the adler32 checksum from the xattr column if available
            csumtype = 'AD'
            checksum = xattr['checksum_adler32']
        elif data.get('metadata_csumtype') == 'AD':
            # don't use any checksum other than adler32, because DPM < 1.10.x
            # did not provide other algorithms and all tools that use these
            # dumps expect either an adler32 or an empty checksum
            csumtype = 'AD'
            checksum = data['metadata_csumvalue']

        self._fp.write('<entry name="{rpath}" ctype="{ctype}" cs="{cs}"'.format(rpath=rpath, ctype=csumtype, cs=checksum))
        self._fp.write(' size="{metadata_filesize}" atime="{metadata_atime}" mtime="{metadata_mtime}"'.format(**data))
        for k, v in sorted(xattr.items()):
            self._fp.write(' {0}="{1}"'.format(k, self._escape(v)))
        self._fp.write(' />\n')

    def finish(self):
        self._fp.write("</entry-set>\n")
        self._fp.write('<complete>"{0}"</complete>\n'.format(datetime.datetime.isoformat(datetime.datetime.now())))
        self._fp.write("</dump>\n")


class JsonFormatter(BaseFormatter):
    def __init__(self, fp, base, opts):
        super(JsonFormatter, self).__init__(fp, base, opts)
        self._columns = [ 'metadata_filesize', 'metadata_atime', 'metadata_mtime', 'metadata_csumtype', 'metadata_csumvalue', 'metadata_xattr' ]
        self._fp.write('{{ "recorded" : "{0}", "for" : "vo:{1}", "entries" : [\n'.format(opts.get('curtime'), opts.get('vo', 'none')))
        self.first_line = True

    def _escape(self, astr):
        if type(astr) != str:
            return astr
        return astr.replace('"', '\\"')

    def write(self, data):
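        # writes one JSON entry per file, e.g. (illustrative values):
        #   { "name": "/rucio/path/file1", "ctype": "AD", "cs": "0a1b2c3d", "size": "1024", "atime": "1546300800", "mtime": "1546300800" }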
        rpath = self._escape(data['path'][len(self._base):])
        xattr = data.get('metadata_xattr', {})
        csumtype = ''
        checksum = ''
        if 'checksum_adler32' in xattr:
            # Cns_file_metadata csumtype & csumvalue columns are deprecated;
            # use the adler32 checksum from the xattr column if available
            csumtype = 'AD'
            checksum = xattr['checksum_adler32']
        elif data.get('metadata_csumtype') == 'AD':
            # don't use any checksum other than adler32, because DPM < 1.10.x
            # did not provide other algorithms and all tools that use these
            # dumps expect either an adler32 or an empty checksum
            csumtype = 'AD'
            checksum = data['metadata_csumvalue']

        if self.first_line:
            self.first_line = False
        else:
            self._fp.write(',\n')

        self._fp.write('  {')
        self._fp.write(' "name": "{rpath}, "ctype": "{ctype}", "cs": "{cs}"'.format(rpath=rpath, ctype=csumtype, cs=checksum))
        self._fp.write(', "size": "{metadata_filesize}", "atime": "{metadata_atime}", "mtime": "{metadata_mtime}"'.format(**data))
        for k, v in sorted(xattr.items()):
            self._fp.write(', "{0}": "{1}"'.format(k, self._escape(v)))
        self._fp.write(' }')

    def finish(self):
        if not self.first_line:
            self._fp.write('\n')
        self._fp.write(' ],\n')
        self._fp.write(' "dump_complete" : "{0}"\n'.format(datetime.datetime.isoformat(datetime.datetime.now())))
        self._fp.write('}\n')



class CachedFullPath(object):
    """DPM file metadata stored in database have pointer just
    to parent directory and to build whole path it is necessary
    to recursively query all parent directories to the root "/".

    Althought these queries are pretty cheap they are done for
    each file and even with small latencies (especially with
    database on dedicated server) they can take quite a time.

    This class not only caches past queries, but also limit
    size of cached data not to exhaust whole memory while
    dumping big DPM database with a lot of files.
    """

    def __init__(self, conn=None, conn_data=None, maxsize=1000000):
        self._close = False
        if conn != None:
            self._conn = conn
        elif conn_data != None:
            self._conn = get_connection(conn_data, 'cns_db')
            self._close = True
        else:
            raise Exception("CachedFullPath called without required argument conn or conn_data")

        self._maxsize = maxsize

        self._cache = {}
        self._ntotal = 0
        self._ncached = 0
        self._nqueries = 0
        self._ncleanup = 0

    def __del__(self):
        if _log:
            _log.info("path lookup cache usage: total %i, cached %i, queries %i, cleanup %i", self._ntotal, self._ncached, self._nqueries, self._ncleanup)
        if self._close:
            self._conn.close()

    def get(self, fileid):
        """Recursively get full path for given fileid"""
        self._ntotal += 1
        if fileid == 0:
            return ""

        if fileid in self._cache:
            self._ncached += 1
            return self._cache[fileid]

        if len(self._cache) >= self._maxsize:
            _log.debug("fullpath cache too big (%i), dropping cached records...", len(self._cache))
            self._ncleanup += 1
            del(self._cache)
            self._cache = {}

        sql = "SELECT parent_fileid, name FROM Cns_file_metadata WHERE fileid=%s"
        cursor = self._conn.cursor()
        cursor.execute(sql, (fileid, ))
        res = cursor.fetchone()
        cursor.close()

        self._nqueries += 1

        if _log.getEffectiveLevel() < logging.DEBUG:
            _log.debug("query parent directory '%s': %s", sql, res)

        fullname = None
        if res:
            (parentid, name) = res
            if parentid == 0:
                fullname = ''
            else:
                prefix = self.get(parentid)
                if prefix != None:
                    fullname = prefix + "/" + name
        else:
            _log.warn("db inconsistency: could not find fileid %i (most likely the entry is orphan).", fileid)

        self._cache[fileid] = fullname
        return fullname

    def get_multi(self, fileids):
        """Reduce impact of query latency by resolving paths for multiple
        fileids. Max number of queries still corresponds to the max path
        depth, but all fileids are resolved at the same time."""
        self._ntotal += len(fileids)

        if len(self._cache) + len(fileids) >= self._maxsize:
            _log.debug("fullpath cache too big (%i+%i), dropping cached records...", len(self._cache), len(fileids))
            self._ncleanup += 1
            del(self._cache)
            self._cache = {}

        tores = set()
        id2name = {}
        id2parent = {}
        for fileid in fileids:
            if fileid in self._cache:
                self._ncached += 1
            else:
                tores.add(str(fileid))

        if len(tores) > 0:
            cursor = self._conn.cursor()

            while len(tores) > 0:

                sql = "SELECT fileid, parent_fileid, name FROM Cns_file_metadata WHERE fileid IN ({0})".format(','.join(tores))
                cursor.execute(sql)

                self._nqueries += 1
                tores = set()

                for row in cursor:
                    (fileid, parentid, name) = row

                    if _log.getEffectiveLevel() < logging.DEBUG:
                        _log.debug("query parent directory '%s': %s", sql, row)

                    if parentid == 0:
                        id2name[fileid] = ''
                    else:
                        id2name[fileid] = name
                    id2parent[fileid] = parentid

                    if parentid == 0:
                        pass
                    elif parentid in self._cache:
                        self._ncached += 1
                    elif parentid not in id2parent:
                        tores.add(str(parentid))

            cursor.close()

        ret = {}
        for fileid in fileids:
            if fileid in self._cache:
                ret[fileid] = self._cache[fileid]
            else:
                currid = fileid
                revname = []

                while True:
                    if currid in self._cache:
                        revname.append((currid, self._cache[currid]))
                        self._ncached += 1
                        break
                    elif currid in id2parent:
                        revname.append((currid, id2name[currid]))
                        currid = id2parent[currid]
                        if currid == 0:
                            break
                    else:
                        _log.warn("db inconsistency: could not find fileid %i (most likely the entry is orphan).", fileid)
                        revname = []
                        break

                if len(revname) > 0:
                    revname.reverse()
                    for i in range(len(revname)):
                        currid = revname[i][0]
                        if currid in self._cache: continue
                        fullname = '/'.join([ x[1] for x in revname[:i+1] ])
                        self._cache[currid] = fullname

                    ret[fileid] = self._cache[fileid]

        return ret



def filterByDisknode(replica_hostfs, replica_table = 'replica'):
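    """Build the SQL WHERE condition for a replica host/filesystem filter.

    Illustrative example: {'disk1.example.com': [None], 'disk2.example.com': ['/mnt/fs1']}
    may produce '(replica.host = "disk1.example.com") OR (replica.host = "disk2.example.com" AND replica.fs = "/mnt/fs1")'
    """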
    where = []

    for host, fss in replica_hostfs.items():
        if None in fss:
            where.append('{0}.host = "{1}"'.format(replica_table, host))
        else:
            for fs in fss:
                where.append('{0}.host = "{1}" AND {0}.fs = "{2}"'.format(replica_table, host, fs))

    return "({0})".format(') OR ('.join(where))



def dump_data(conn_data, formatters, filters = {}, timestamp = 0):
    _log.debug("dump_data")

    # mandatory columns
    cols = [ 'metadata.fileid', 'metadata.parent_fileid', 'metadata.name', 'metadata.mtime' ]
    # union of columns required by all formatters
    ucols = []
    for formatter in formatters:
        for col in formatter.columns:
            dbcol = col.replace('_', '.', 1)
            if dbcol in cols: continue
            if col in METADATA_COLUMNS + REPLICA_COLUMNS:
                cols.append(dbcol)
            else:
                ucols.append(col)
                _log.error("%s use uknown column %s", str(formatter), dbcol)
    if len(ucols) > 0:
        raise Exception("unknown columns: {0}".format(', '.join(ucols)))

    has_replica_column = any([ col.startswith('replica.') for col in cols ])

    where = []
    where.append('(metadata.filemode & {0}) = {0}'.format(stat.S_IFREG)) # only files
    if 'replica_pool' in filters:
        where.append('replica.poolname IN ("{0}")'.format('","'.join(filters['replica_pool'])))
    if 'replica_spacetoken' in filters:
        st2uuid = {}
        conn_dpm = get_connection(conn_data, 'dpm_db')
        cursor = conn_dpm.cursor()
        cursor.execute('SELECT s_token, u_token, poolname, path FROM dpm_space_reserv')
        while True:
            row = cursor.fetchone()
            if row == None: break
            s_token, u_token, poolname, path = row
            st2uuid[u_token.lower()] = s_token
        cursor.close()
        conn_dpm.close()
        replica_setnames = [ st2uuid.get(x.lower(), x) for x in filters['replica_spacetoken'] ]
        where.append('replica.setname IN ("{0}")'.format('","'.join(replica_setnames)))
    if 'metadata_status' in filters:
        where.append('metadata.status = "{0}"'.format(filters['metadata_status']))
    if 'replica_status' in filters:
        where.append('replica.status = "{0}"'.format(filters['replica_status']))
    if len(filters.get('replica_hostfs', [])) > 0:
        where.append(filterByDisknode(filters['replica_hostfs']))
    if len(filters.get('only_replica_hostfs', [])) > 0:
        where.append(filterByDisknode(filters['only_replica_hostfs']))
        # exclude files that have multiple replicas where a replica
        # with "Available" status is on a diskserver or filesystem
        # that is not selected by the current hostfs filter
        onlyReplicaDisknodeFilter = filterByDisknode(filters['only_replica_hostfs'], replica_table='r')
        excludedReplicaIds = 'SELECT r.fileid FROM Cns_file_replica r WHERE r.status = "-" AND NOT ({0})'.format(onlyReplicaDisknodeFilter)
        #if filter_replica_status: # commented out - we really want only "Available" replicas
        #    excludedReplicaIds = '{0} AND r.status = "{1}"'.format(excludedReplicaIds, filter_replica_status)
        where.append('replica.fileid NOT IN ({0})'.format(excludedReplicaIds))

    colstr = ', '.join([ "{0} AS `{1}`".format(x, x.replace('.', '_', 1)) for x in cols ])
    sql = 'SELECT SQL_BIG_RESULT {0} FROM Cns_file_metadata AS metadata JOIN Cns_file_replica AS replica ON metadata.fileid = replica.fileid'.format(colstr)
    if len(where) > 0:
        sql += ' WHERE ({0})'.format(') AND ('.join(where))
    if not has_replica_column:
        # don't produce duplicate replica records unless result contains column from replica table
        sql += ' GROUP BY replica.fileid'
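
    # an illustrative final query with the default filters and a txt formatter:
    #   SELECT SQL_BIG_RESULT metadata.fileid AS `metadata_fileid`, ... FROM Cns_file_metadata AS metadata
    #   JOIN Cns_file_replica AS replica ON metadata.fileid = replica.fileid
    #   WHERE ((metadata.filemode & 32768) = 32768) AND (metadata.status = "-") AND (replica.status = "-")
    #   GROUP BY replica.fileid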

    # fileid => fullpath
    pathname = CachedFullPath(conn_data=conn_data)

    cnt_rows = 0
    cnt_files = 0
    conn = get_connection(conn_data, 'cns_db', cclass=pymysql_cursors.SSDictCursor)
    cursor = conn.cursor()
    try:
        _log.debug("query file metadata with '%s'", sql)
        cursor.execute(sql)

        while True:
            # retrieving data in chunks dramatically increases performance,
            # mostly because of latency associated with each DB query
            rows = cursor.fetchmany(1000)
            if len(rows) == 0: break

            if _log.getEffectiveLevel() < logging.DEBUG:
                _log.debug("fetched %i rows", len(rows))

            fileids = [ row['metadata_parent_fileid'] for row in rows ]
            pathnames = pathname.get_multi(fileids)

            for row in rows:
                cnt_rows += 1
                if _log.getEffectiveLevel() < logging.DEBUG:
                    _log.debug("row %i: %s", cnt_rows, str(row))

                fileid = row['metadata_fileid']
                parent_fileid = row['metadata_parent_fileid']
                name = row['metadata_name']
                mtime = row['metadata_mtime']

                # NOTE: unfortunately mtime & ctime probably don't contain the
                # timestamp that you would expect, because DPM in legacy mode
                # updates both timestamps when the checksum is recalculated
                # (done on the first SRM download in legacy mode). That means
                # [cm]time can be set to a value that is much more recent than
                # the time when the file was uploaded to DPM - more details in
                # https://groups.cern.ch/group/dpm-users-forum/Lists/Archive/Flat.aspx?RootFolder=%2Fgroup%2Fdpm-users-forum%2FLists%2FArchive%2FStorage%20dumps%20%20filter%20by%20mtime
                # In a pure DPM DOME configuration (without legacy SRM support)
                # these timestamps should not be modified unless you modify file
                # content (e.g. adding a checksum should not cause a [cm]time update)
                if timestamp != 0 and mtime > timestamp:
                    continue

                prefix = pathnames.get(parent_fileid)
                if prefix is None:
                    _log.error("skipping fileid %i with name '%s', unable to reconstruct path of parent fileid %i", fileid, name, parent_fileid)
                    continue

                result_enhanced = False
                metadata_xattr_parsing_required = 'metadata_xattr' in row
                replica_xattr_parsing_required = 'replica_xattr' in row
                fullpath = prefix + "/" + name
                for formatter in formatters:
                    # filter by base path
                    if not fullpath.startswith(formatter.base):
                        continue

                    # parse metadata xattr json value into dictionary
                    if metadata_xattr_parsing_required:
                        metadata_xattr_parsing_required = False

                        xattr = row.get('metadata_xattr')
                        xattr_dict = collections.defaultdict(lambda: '')
                        if xattr not in [ None, '', '{}' ]:
                            try:
                                # replace '.' in dictionary keys to be able to easily
                                # access xattr values with str.format field references
                                xattr_dict = collections.defaultdict(lambda: '', [ (k.replace('.', '_'), v) for k, v in json.loads(xattr).items() ])
                            except Exception as e:
                                _log.error("unable to parse metadata_xattr for fileid %i: %s", fileid, str(xattr))
                        row['metadata_xattr'] = xattr_dict

                    # parse replica xattr json value into dictionary
                    if replica_xattr_parsing_required:
                        replica_xattr_parsing_required = False

                        xattr = row.get('replica_xattr')
                        xattr_dict = collections.defaultdict(lambda: '')
                        if xattr not in [ None, '', '{}' ]:
                            try:
                                # replace '.' in dictionary keys to be able to easily
                                # access xattr values with str.format field references
                                xattr_dict = collections.defaultdict(lambda: '', [ (k.replace('.', '_'), v) for k, v in json.loads(xattr).items() ])
                            except Exception as e:
                                _log.error("unable to parse replica_xattr for fileid %i: %s", fileid, str(xattr))
                        row['replica_xattr'] = xattr_dict

                    # add necessary metadata and formatting strings to the 'row' dictionary
                    if not result_enhanced:
                        result_enhanced = True

                        #row['bpath'] = prefix
                        row['path'] = fullpath
                        row['tab'] = '\t'
                        row['nl'] = '\n'
                        row['comma'] = ','

                    # relative path with respect to formatter base
                    row['rpath'] = fullpath[len(formatter.base):]

                    formatter.write(row)

                # file info processed by at least one formatter
                if result_enhanced:
                    cnt_files += 1

                if cnt_rows % 1000000 == 0:
                    _log.debug("processed %i records (%i files formatted)", cnt_rows, cnt_files)

    except Exception as e:
        # a query in progress that uses SSDictCursor can only be killed by terminating the DB connection
        # (closing / deleting the cursor leads to retrieval of all selected entries from the DB)
        conn.close()
        del(pathname)
        raise

    # Close cursor and connections
    cursor.close()
    conn.close()
    del(pathname)

    _log.info("processed %i records (%i files)", cnt_rows, cnt_files)



def publish_https(filename, location, cert, key):
    _log.debug("publish DPM dump %s to %s", filename, location)
    try:
        import pycurl
    except ImportError as e:
        raise Exception("unable to import pycurl module (install python-pycurl package): {0}".format(str(e)))

    c = pycurl.Curl()
    if _log.getEffectiveLevel() < logging.DEBUG:
        c.setopt(pycurl.VERBOSE, True)
    c.setopt(c.SSLCERT, cert)
    c.setopt(c.SSLKEY, key)
    c.setopt(c.SSL_VERIFYPEER, 0)
    c.setopt(c.SSL_VERIFYHOST, 2)
    c.setopt(c.FOLLOWLOCATION, 1)

    # check path exists
    lurl = urlparse.urlparse(location)
    burl = lurl._replace(path=os.path.dirname(lurl.path))
    c.setopt(c.URL, burl.geturl())
    c.setopt(c.NOBODY, True)
    c.setopt(c.CUSTOMREQUEST, "HEAD")
    c.perform()
    if (c.getinfo(c.HTTP_CODE) != 200):
        raise Exception("path {0} not found".format(burl.geturl()))

    # delete existing file
    c.setopt(c.URL, location)
    c.setopt(c.NOBODY, True)
    c.setopt(c.CUSTOMREQUEST, "DELETE")
    c.perform()
    if (c.getinfo(c.HTTP_CODE) != 204 and c.getinfo(c.HTTP_CODE) != 404):
        raise Exception("can't delete {0}".format(location))

    # put the new file
    c.setopt(c.CUSTOMREQUEST, "PUT")
    c.setopt(c.NOBODY, False)
    # suppress the response body
    c.setopt(c.WRITEFUNCTION, lambda x: None)
    c.setopt(pycurl.UPLOAD, 1)
    c.setopt(pycurl.READFUNCTION, open(filename, 'rb').read)
    c.setopt(pycurl.INFILESIZE, os.path.getsize(filename))
    c.perform()
    if (c.getinfo(c.HTTP_CODE) == 201):
        _log.info("uploaded %s", location)
    else:
        _log.error("upload %s error: %i", location, c.getinfo(c.HTTP_CODE))

    c.close()



def publish_xrootd(datalist, cert, key):
    _log.debug("publish %i DPM dumps with xrootd protocol", len(datalist))

    # set environment for XRootD transfers
    # XRD_* env variables must be set before importing the XRootD module
    if _log.getEffectiveLevel() < logging.DEBUG:
        os.putenv('XRD_LOGLEVEL', 'Dump')
        #os.putenv('XRD_LOGFILE', '/tmp/xrootd.debug')
        os.putenv('XRD_LOGMASK', 'All')
    os.putenv('XRD_CONNECTIONWINDOW', '10') # by default the connection times out after 300s
    #os.putenv('XRD_REQUESTTIMEOUT', '10') # can be set individually for each operation

    # set X509_* env variable used by XRootD authentication
    if os.getenv('X509_USER_CERT', cert) != cert:
        _log.info("overwriting X509_USER_CERT (%s) with %s", os.getenv('X509_USER_CERT'), cert)
    os.putenv('X509_USER_CERT', cert)
    if os.getenv('X509_USER_KEY', key) != key:
        _log.info("overwriting X509_USER_KEY (%s) with %s", os.getenv('X509_USER_KEY'), key)
    os.putenv('X509_USER_KEY', key)

    try:
        import XRootD.client
    except ImportError as e:
        raise Exception("unable to import XRootD module (install python2-xrootd or python34-xrootd package): {0}".format(str(e)))

    process = XRootD.client.CopyProcess()
    for filename, location in datalist:
        process.add_job(filename, location, force=True)

    status = process.prepare()
    if not status.ok:
        raise Exception("unable to prepare XRootD transfer ({0})".format(str(status)))

    status, details = process.run()
    if not status.ok:
        succeeded = 0
        failed = 0
        for i in range(len(datalist)):
            filename, location = datalist[i]
            tstatus = None
            if len(details) > i:
                tstatus = details[i].get('status')
            if tstatus == None:
                failed += 1
                _log.error("unknown transfer status from %s to %s", filename, location)
            else:
                if not tstatus.ok:
                    failed += 1
                    _log.error("transfer status from %s to %s: %s", filename, location, str(tstatus))
                else:
                    succeeded += 1
                    _log.debug("transfer succeeded from %s to %s", filename, location)

        raise Exception("xrootd transfers failed (succeeded: %i, failed: %i)", succeeded, failed)



#=====================================================================
# main
#=====================================================================
if __name__ == '__main__':
    # basic logging configuration
    streamHandler = logging.StreamHandler(sys.stderr)
    streamHandler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s](%(module)s:%(lineno)d) %(message)s", "%d %b %H:%M:%S"))
    _log.addHandler(streamHandler)
    _log.setLevel(logging.WARN)

    # parse options from command line
    def opt_set_loglevel(option, opt, value, parser):
        loglevel = option.default
        if value != None:
            loglevel = int({
                'CRITICAL': logging.CRITICAL,
                'DEBUG': logging.DEBUG,
                'ERROR': logging.ERROR,
                'FATAL': logging.FATAL,
                'INFO': logging.INFO,
                'NOTSET': logging.NOTSET,
                'WARN': logging.WARN,
                'WARNING': logging.WARNING,
            }.get(value, value))

        _log.setLevel(loglevel)
        setattr(parser.values, option.dest, loglevel)

    class IndentedHelpFormatterWithEpilogExamples(optparse.IndentedHelpFormatter):
        def format_epilog(self, epilog):
            import textwrap
            if not epilog:
                return ""
            text_width = self.width - self.current_indent
            indent = " "*self.current_indent
            sections = []
            if type(epilog) == str:
                sections.append(textwrap.fill(epilog, text_width, initial_indent=indent, subsequent_indent=indent))
            else:
                example_section = False
                for epilog_section in epilog:
                    if not epilog_section.startswith('EXAMPLE '):
                        sections.append(textwrap.fill(epilog_section, text_width, initial_indent=indent, subsequent_indent=indent))
                        sections.append('')
                        example_section = False
                    else:
                        if not example_section:
                            sections.append('Examples:')
                            example_section = True
                        sections.append("  {0}{1}".format(indent, epilog_section[len('EXAMPLE '):].replace("\n", "\n{0}".format(indent))))
            return "\n{0}\n".format("\n".join(sections))

    # default config values
    guess_nsconfig = guess_config_files()

    # command line arguments
    usage = "usage: %prog [options]"
    epilog = []
    epilog.append(
        "WARNING: be aware that \"age\" and \"date\" configuration option doesn't produce expected results for DPM in legacy (SRM) mode. "
        "File modification time (ctime & mtime) is not handled consistently in DPM implementation, "
        "e.g. both timestamps are updated on first file download during checksum calculation.")
    epilog.append(
        "DUMPPATH dpm_path[,output_path[,key=val]] must contain lfn path that is used as a filter "
        "to retrieve only files within specific subdirectory in DPM namespace. "
        "Optionally you can specify output_path that points to the file or http URL "
        "where to store output data, otherwise everything is printed to the standard output. "
        "Output file is automatically compressed with gzip, bzip or lzma according "
        "suffix (.gz, .bz2, .lzma). Each formater can have their own specific "
        "configuratin options that can be passed as additional key=value arguments.")
    epilog.append(
        "HOSTFS diskserver_fqdn[:/filesystem] specifies filter for replica on given diskserver "
        "and optionally filesystem (e.g. diskserver1.example.com or diskserver1.example.com:/mnt/fs1)")
    epilog.append(
        "POOL name of existing DPM pool")
    epilog.append(
        "SPACETOKEN name of existing spacetoken or its uuid")
    epilog.append(
        "Plain text formatter (txt) can use customized header, footer and per file output format. "
        "Standard python str.format function is called and fomatting string can use all features provides by this function. "
        "List of available attributes for header/footer formatting can be found in PlaintextFormatter.__init__, PlaintextFormatter.finish method. "
        "File record formatting can use 'path', 'rpath' (relative path with respect to formatter base path) "
        "and all available columns from file metadata and file replica table ({0}). "
        "Columns 'metadata_xattr' and 'replica_xattr' contains dictionary, "
        "to access \"xattr\" data you have to use e.g. {{metadata_xattr[checksum_adler32]}} "
        "(all '.' characters in xattr dictionary keys are replaced with '_').".format(', '.join(METADATA_COLUMNS + REPLICA_COLUMNS)))
    epilog.append("EXAMPLE # dump everything in stdout in plain text format")
    epilog.append("EXAMPLE python /usr/bin/dpm-dump --txt-path=")
    epilog.append("EXAMPLE # dump everything in compressed file in plain text format")
    epilog.append("EXAMPLE python /usr/bin/dpm-dump --txt-path=,/tmp/dump.txt.gz")
    epilog.append("EXAMPLE # dump everything in compressed file in plain json format")
    epilog.append("EXAMPLE python /usr/bin/dpm-dump --json-path=,/tmp/dump.json.bz2,vo=vo_name")
    epilog.append("EXAMPLE # dump everything in compressed file in plain xml format")
    epilog.append("EXAMPLE python /usr/bin/dpm-dump --xml-path=,/tmp/dump.xml.lzma,vo=vo_name")
    epilog.append("EXAMPLE # dump several directories in different output files in different formats")
    epilog.append("EXAMPLE python /usr/bin/dpm-dump --txt-path=/dpm/fqdn/home/atlas,/tmp/dump.atlas.txt --json-path=/dpm/fqdn/home/cms,/tmp/dump.cms.json,vo=cms")
    epilog.append("EXAMPLE # upload dump output to the storage directory using WebDAV")
    epilog.append("EXAMPLE python /usr/bin/dpm-dump --txt-path=/dpm/fqdn/home/atlas,davs://dpmheadnode.fqdn:443/dpm/fqdn/home/atlas/dump.txt")
    epilog.append("EXAMPLE # upload dump output to the storage directory using XRootD")
    epilog.append("EXAMPLE python /usr/bin/dpm-dump --txt-path=/dpm/fqdn/home/atlas,root://dpmheadnode.fqdn:1094//dpm/fqdn/home/atlas/dump.txt")
    epilog.append("EXAMPLE # plain text output format is configurable (header, line, footer)")
    epilog.append("EXAMPLE python /usr/bin/dpm-dump --txt-header='# command: {command}{nl}# ...{nl}' --txt-format='{path}{nl}' --txt-footer='# records: {records}{nl}# ...{nl}# finished OK{nl}' --txt-path=/dpm/fqdn/home,/tmp/dump1.txt")
    epilog.append("EXAMPLE # each plain text output can also have individualy configured (header, line, footer)")
    epilog.append("EXAMPLE python /usr/bin/dpm-dump --txt-path='/dpm/fqdn/home,/tmp/dump2.txt,header=# header{nl},format={path};{metadata_fileid}{tab}{metadata_parent_fileid}{comma}{metadata_mtime}{nl},footer=# footer{nl}'")
    epilog.append("EXAMPLE # enable logging with given log level to the log file rotated at given size")
    epilog.append("EXAMPLE python /usr/bin/dpm-dump --log-level=DEBUG --log-file=/var/log/dpm-dump.log --log-size=1048576 --log-backup=2 --txt-path=")
    epilog.append("EXAMPLE # dump data from DPM pools (mypool1, mypool2)")
    epilog.append("EXAMPLE python /usr/bin/dpm-dump --filter-replica-pool=mypool1 --filter-replica-pool=mypool2 --txt-path=")
    epilog.append("EXAMPLE # dump data from DPM spacetokens (dteam, ops)")
    epilog.append("EXAMPLE python /usr/bin/dpm-dump --filter-replica-spacetoken=dteam --filter-replica-spacetoken=ops --txt-path=")
    epilog.append("EXAMPLE # dump data from one diskserver plus one filesystem on the other diskserver")
    epilog.append("EXAMPLE python /usr/bin/dpm-dump --filter-replica-hostfs=dpmpoolxx.fqdn --filter-replica-hostfs=dpmpoolyy.fqdn:/mnt/fs1 --txt-path=")
    epilog.append("EXAMPLE # dump pfn name from a diskserver plus one filesystem on different diskserver")
    epilog.append("EXAMPLE # with no replicas elsewhere (to declare temporarily unavailable Rucio files)")
    epilog.append("EXAMPLE python /usr/bin/dpm-dump --filter-only-replica-hostfs=dpmpoolxx.fqdn --filter-only-replica-hostfs=dpmpoolyy.fqdn:/mnt/fs1 --txt-format='root://dpmheadnode.fqdn:1094/{path}{nl}' --txt-path=/dpm/fqdn/home/vo,file:///tmp/dump.vo-temporarily-unavailable.txt")
    epilog.append("EXAMPLE # dump all pfn and replicaid stored only on one diskserver including files")
    epilog.append("EXAMPLE # with special file/replicas status (useful to cleanup DPM namespace after")
    epilog.append("EXAMPLE # you e.g. loose completely data on one diskserver)")
    epilog.append("EXAMPLE python /usr/bin/dpm-dump --filter-replica-hostfs=dpmpoolxx.fqdn --txt-format=\"dmlite-shell -e 'replicadel {path} {replica_rowid}' # {replica_host}:{replica_fs}{nl}\" --txt-path=,file:///tmp/dump.dpmpoolxx.txt")
    epilog.append("EXAMPLE ")
    epilog.append("EXAMPLE # ATLAS storage dumps")
    epilog.append("EXAMPLE # ===================")
    epilog.append("EXAMPLE # storage dump for PRAGUELCG2 data and scratch disk")
    epilog.append("EXAMPLE python /usr/bin/dpm-dump --log-level=INFO --log-file=/var/log/dpm-dump.log --txt-path=/dpm/farm.particle.cz/home/atlas/atlasdatadisk/rucio/,davs://golias100.farm.particle.cz:443/dpm/farm.particle.cz/home/atlas/atlasdatadisk/dumps/dump_20181228 --txt-path=/dpm/farm.particle.cz/home/atlas/atlasscratchdisk/rucio/,davs://golias100.farm.particle.cz:443/dpm/farm.particle.cz/home/atlas/atlasscratchdisk/dumps/dump_20181228")
    epilog.append("EXAMPLE # legacy (deprecated) dump for PRAGUELCG2 data disk")
    epilog.append("EXAMPLE python /usr/bin/dpm-dump -t /tmp/ATLASDATADISK-dump_20181228 -p /dpm/farm.particle.cz/home/atlas/atlasdatadisk/rucio -a -1")
    epilog.append("EXAMPLE # temporary unavailable PRAGUELCG2 pfn dump for rucio, one diskserver and one specific filesystem from second diskserver")
    epilog.append("EXAMPLE python /usr/bin/dpm-dump --log-level=INFO --log-file=/var/log/dpm-dump.log --txt-path=/dpm/farm.particle.cz/home/atlas --txt-format='root://golias100.farm.particle.cz/{path}{nl}' --filter-only-replica-hostfs=dpmpool1.farm.particle.cz --filter-only-replica-hostfs=dpmpool2.farm.particle.cz:/mnt/fs7")
    description = "Dumps the content of DPM storage element into a file that can be used for consistency checks (e.g. with Rucio database)."
    parser = optparse.OptionParser(usage=usage, description=description, version="%prog", epilog=epilog, formatter=IndentedHelpFormatterWithEpilogExamples())
    parser.add_option("-v", "--verbose", dest="loglevel", action="callback", callback=opt_set_loglevel, default=logging.DEBUG, help="set log level to DEBUG")
    parser.add_option("-q", "--quiet", dest="loglevel", action="callback", callback=opt_set_loglevel, default=logging.ERROR, help="set log level to ERROR")
    parser.add_option("--log-level", dest="loglevel", action="callback", callback=opt_set_loglevel, type="string", help="set log level, default: %default")
    parser.add_option("--log-file", dest="logfile", metavar="FILE", help="set log file (default: %default)")
    parser.add_option("--log-size", dest="logsize", type="int", default=10*1024*1024, help="maximum size of log file, default: %default")
    parser.add_option("--log-backup", dest="logbackup", type="int", default=4, help="number of log backup files, default: %default")
    # db command line options
    parser.add_option("-c", "--nsconfig", help="NSCONFIG file with sql connection info. Default: {0}".format(guess_nsconfig), default=guess_nsconfig)
    parser.add_option('--dbhost', dest='dbhost', default=None, help="database host (overwrite host from NSCONFIG)")
    parser.add_option('--dbuser', dest='dbuser', default=None, help="database user (overwrite user from NSCONFIG)")
    parser.add_option('--dbpwd', dest='dbpwd', default=None, help="database password (overwrite password from NSCONFIG)")
    parser.add_option('--dbname', dest='dbname', default=None, help="database name (overwrite cns database from NSCONFIG)")
    parser.add_option('--dbdpm', dest='dbdpm', default=None, help="DPM database name (overwrite dpm database from NSCONFIG)")
    # legacy command line options
    parser.add_option("-x", "--xml", help="create output file in XML format (deprecated).", metavar="XMLFILE")
    parser.add_option("-j", "--json", help="create output file in JSON format (deprecated).", metavar="JSONFILE")
    parser.add_option("-t", "--txt", help="create output file in TXT format (deprecated).", metavar="TXTFILE")
    parser.add_option("-p", "--path", help="dump only files within this DPNS path (deprecated), default: %default", default="/", metavar="PATH")
    parser.add_option("-a", "--age", help="dump only files older than AGE days (warning), default: %default", metavar="AGE")
    parser.add_option("-D", "--date", help="dump only files up to the day before date, format YYYYMMDD (warning)", metavar="DATE")
    parser.add_option("-V", "--vo", help="VO information used by JSON and XML formatters, default: %default", default="none")
    parser.add_option("-F", "--filter", help="filter dpm diskserver or diskserver filesystem, default: %default", default=None)
    # new command line options
    parser.add_option("--xml-path", action="append", help="dump files within this DPNS path in XML format to stdout, local file or upload to DPM", default=[], metavar="DUMPPATH")
    parser.add_option("--json-path", action="append", help="dump files within this DPNS path in JSON format to stdout, local file or upload to DPM", default=[], metavar="DUMPPATH")
    parser.add_option("--txt-path", action="append", help="dump files within this DPNS path in TXT format to stdout, local file or upload to DPM", default=[], metavar="DUMPPATH")
    parser.add_option("--txt-header", help="TXT output file header format, default: %default", default=None, metavar="FORMAT")
    parser.add_option("--txt-footer", help="TXT output file footer format, default: %default", default=None, metavar="FORMAT")
    parser.add_option("--txt-format", help="TXT output file line format, default: %default", default="{rpath}{nl}", metavar="FORMAT")
    parser.add_option("--filter-metadata-status", help="filter file metadata statuses (None, Online, Migrated), default: %default", default="Online")
    parser.add_option("--filter-replica-status", help="filter file replica statuses (None, Available, BeingPopulated, ToBeDeleted), default: %default", default="Available")
    parser.add_option("--filter-replica-pool", action="append", help="", default=[], metavar="POOL")
    parser.add_option("--filter-replica-spacetoken", action="append", help="", default=[], metavar="SPACETOKEN")
    parser.add_option("--filter-replica-hostfs", action="append", help="retrive data for specific diskserver and optionally filesystem", default=[], metavar="HOSTFS")
    parser.add_option("--filter-only-replica-hostfs", action="append", help="retrive data for replicas on given diskserver and optionally filesystem with no other replica on other diskservers and/or filesystems", default=[], metavar="HOSTFS")
    parser.add_option("--cert", help="path to host certificate used for data uploads, default: %default", default="/etc/grid-security/hostcert.pem")
    parser.add_option("--key", help="path to host key used for data uploads, default: %default", default="/etc/grid-security/hostkey.pem")
    parser.add_option("--tmp", help="custom directory for temporary dump files", default=None, metavar="PATH")
    parser.add_option("--keep-failed", action="store_true", help="keep temporary files in case output transfer fails, default: %default", default=False)

    (options, args) = parser.parse_args()

    if options.logfile == '-':
        _log.removeHandler(streamHandler)
        streamHandler = logging.StreamHandler(sys.stdout)
        streamHandler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s](%(module)s:%(lineno)d) %(message)s", "%d %b %H:%M:%S"))
        _log.addHandler(streamHandler)
    elif options.logfile != None and options.logfile != '':
        #fileHandler = logging.handlers.TimedRotatingFileHandler(options.logfile, 'midnight', 1, 4)
        fileHandler = logging.handlers.RotatingFileHandler(options.logfile, maxBytes=options.logsize, backupCount=options.logbackup)
        fileHandler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s](%(module)s:%(lineno)d) %(message)s", "%d %b %H:%M:%S"))
        _log.addHandler(fileHandler)
        _log.removeHandler(streamHandler)

    _log.info("command: %s", " ".join(sys.argv))
    _log.info("script: %s", os.path.abspath(inspect.getfile(inspect.currentframe())))
    _log.info("version: %s", __version__)
    _log.info("sha256: %s", hashlib.sha256(open(__file__).read().encode('utf-8')).hexdigest())
    _log.info("python: %s", str(sys.version_info))
    _log.info("user: %s@%s", getpass.getuser(), socket.gethostname())
    _log.info("system load: %s", str(os.getloadavg()))

    # validate command line options
    timestamp = 0
    if options.date != None:
        timestamp = int(time.mktime(datetime.datetime.strptime(str(options.date), "%Y%m%d").timetuple()))
    elif options.age != None:
        now = int(time.time())
        age = int(options.age)
        timestamp = now-86400*age

    if timestamp != 0 and timestamp < int(time.time()):
        _log.info("filtering older data according ctime or mtime doesn't produce expected results for DPM in legacy (SRM) mode")

    # database connection data
    conn_data = {}
    if options.nsconfig not in [ None, '' ]: conn_data = get_conn_data(options.nsconfig)
    if options.dbhost != None: conn_data['host'] = options.dbhost
    if options.dbuser != None: conn_data['user'] = options.dbuser
    if options.dbpwd != None: conn_data['pass'] = options.dbpwd
    if options.dbname != None: conn_data['cns_db'] = options.dbname
    if options.dbdpm != None: conn_data['dpm_db'] = options.dbdpm
    if not ('host' in conn_data and 'user' in conn_data and 'pass' in conn_data and 'cns_db' in conn_data and 'dpm_db' in conn_data):
        _log.error("incomplete database connection data %s, use either nsconfig or db* command line options", conn_data)
        sys.exit(1)

    filters = {}

    # filters for pool and spacetokens
    if len(options.filter_replica_pool) > 0:
        filters['replica_pool'] = options.filter_replica_pool
    if len(options.filter_replica_spacetoken) > 0:
        filters['replica_spacetoken'] = options.filter_replica_spacetoken

    # filters for disknodes and filesystems
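    # HOSTFS values like "dpmpool1.example.com" or "dpmpool1.example.com:/mnt/fs1" (illustrative)
    # are parsed into a {host: [filesystem, ...]} dictionary, where None means any filesystem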
    if options.filter != None:
        host, fs = list(options.filter.split(':', 1)) if options.filter.find(':') != -1 else (options.filter, None)
        filters.setdefault('replica_hostfs', {}).setdefault(host.lower(), []).append(fs)
    if len(options.filter_replica_hostfs) > 0:
        filters.setdefault('replica_hostfs', {})
        for hostfs in options.filter_replica_hostfs:
            host, fs = list(hostfs.split(':', 1)) if hostfs.find(':') != -1 else (hostfs, None)
            filters['replica_hostfs'].setdefault(host.lower(), []).append(fs)
    if len(options.filter_only_replica_hostfs) > 0:
        filters['only_replica_hostfs'] = {}
        for hostfs in options.filter_only_replica_hostfs:
            host, fs = list(hostfs.split(':', 1)) if hostfs.find(':') != -1 else (hostfs, None)
            filters['only_replica_hostfs'].setdefault(host.lower(), []).append(fs)

    # file metadata status filter (pydmlite.FileStatus)
    if options.filter_metadata_status.lower() == 'none':
        pass # no filtering on metadata file status
    elif options.filter_metadata_status.lower() in ('o', 'online', '-'):
        filters['metadata_status'] = '-' # pydmlite.FileStatus.kOnline
    elif options.filter_metadata_status.lower() in ('m', 'migrated'):
        filters['metadata_status'] = 'm' # pydmlite.FileStatus.kMigrated
    else:
        _log.error("invalid file metadata status \"%s\"", options.filter_metadata_status)
        sys.exit(1)

    # file replica status filter (pydmlite.ReplicaStatus)
    if options.filter_replica_status.lower() == 'none':
        pass # no filtering on replica status
    elif options.filter_replica_status.lower() in ('a', 'available', '-'):
        filters['replica_status'] = '-' # pydmlite.ReplicaStatus.kAvailable
    elif options.filter_replica_status.lower() in ('p', 'beingpopulated'):
        filters['replica_status'] = 'p' # pydmlite.ReplicaStatus.kBeingPopulated
    elif options.filter_replica_status.lower() in ('d', 'tobedeleted'):
        filters['replica_status'] = 'd' # pydmlite.ReplicaStatus.kToBeDeleted
    else:
        _log.error("invalid file replica status \"%s\"", options.filter_replica_status)
        sys.exit(1)

    # Configure requested dumps
    oconfig = []
    curtime = datetime.datetime.isoformat(datetime.datetime.now())
    for format in [ 'xml', 'json', 'txt' ]:

        # legacy command line options
        if getattr(options, format) != None:
            opts = { 'curtime': curtime, 'vo': options.vo }
            output = getattr(options, format)
            if output.startswith('/'):
                opts['output'] = "file://{0}".format(output)
            else:
                opts['output'] = "file://{0}".format(os.path.realpath(output))
            oconfig.append((format, options.path, opts))

        # new command line options
        for params in getattr(options, "{0}_path".format(format), []):
            path = None
            opts = { 'curtime': curtime }
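            # a DUMPPATH value like "/dpm/fqdn/home/atlas,/tmp/dump.txt.gz,vo=atlas" (illustrative)
            # splits into the namespace path, the output location and formatter specific key=value options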
            for pos, param in enumerate(params.split(',')):
                if pos == 0: path = param
                elif pos == 1: opts['output'] = param
                else:
                    # formatter specific key=value options
                    key, val = param.split('=', 1)
                    opts[key] = val

            if opts.get('output', '') == '':
                opts['output'] = 'stdout://'
            elif opts['output'].startswith('/'):
                opts['output'] = 'file://{0}'.format(opts['output'])
            elif opts['output'].find('://') == -1:
                # relative local path without a URL scheme
                opts['output'] = "file://{0}".format(os.path.realpath(opts['output']))

            oconfig.append((format, path, opts))

    if len(oconfig) == 0:
        _log.error("no output defined")
        sys.exit(1)

    fclazz = {
        'xml': XmlFormatter,
        'json': JsonFormatter,
        'txt': PlaintextFormatter,
    }

    formatters = []
    outputs = []
    for format, path, opts in oconfig:
        _log.debug("requested dump: format=%s, path=%s, opts=%s", format, path, opts)
        if path != '' and not path.endswith('/'):
            path = "{0}/".format(path)
        ourl = urlparse.urlparse(opts.get('output', 'stdout://'))
        if ourl.scheme not in [ 'stdout', 'file', 'davs', 'https', 'root' ]:
            _log.warn("skipping output with unsupported protocol scheme: %s", ourl.geturl())
            continue

        # open output file
        fp = None
        ofile = None
        if ourl.scheme == 'stdout':
            fp = sys.stdout

        else:
            if ourl.scheme == 'file':
                ofile = ourl.path
            else:
                bfile = os.path.basename(ourl.path)
                fp, ofile = tempfile.mkstemp(suffix=".{0}".format(bfile), prefix='dpm-dump.', dir=options.tmp)
                # python < 3.3 doesn't support bz2 opened from a file object - always use the filename directly
                os.close(fp)
                tempfiles.append(ofile)

            if ofile.endswith('.gz'):
                import gzip
                fp = gzip.open(ofile, 'wb')
            elif ofile.endswith('.bz2'):
                import bz2
                fp = bz2.BZ2File(ofile, 'wb')
            elif ofile.endswith('.lzma'):
                import lzma
                fp = lzma.LZMAFile(ofile, "wb")
            else:
                fp = open(ofile, 'wb')

            # allow writing strings (instead of bytes) also in python3
            if sys.version_info >= (3,):
                import io
                fp = io.TextIOWrapper(fp)

        # parse command line options for given output format
        for k in list(options.__dict__.keys()):
            prefix = "{0}_".format(format)
            if not k.startswith(prefix): continue
            key = k[len(prefix):]
            if key in opts: continue
            opts[key] = getattr(options, k)

        formatter = fclazz[format](fp, path, opts)
        formatters.append(formatter)
        outputs.append((formatter, ourl, ofile, fp))

    _log.info("dump data from host=%s, database=%s (%s)", conn_data['host'], conn_data['cns_db'], conn_data['dpm_db'])
    dump_start = time.time()
    dump_data(conn_data, formatters, filters=filters, timestamp=timestamp)
    dump_end = time.time()
    _log.info("all database entries processed")

    published_ok = True
    xroot_uploads = []
    for formatter, ourl, ofile, fp in outputs:
        formatter.finish()

        if ourl.scheme not in [ 'stdout' ]:
            # close all files (including temporary files used later for davs/root upload)
            fp.close()

        if ourl.scheme in [ 'stdout', 'file' ]:
            pass

        elif ourl.scheme in [ 'davs', 'https' ]:
            try:
                location = ourl.geturl()
                if ourl.scheme == 'davs':
                    location = ourl._replace(scheme='https').geturl()
                publish_https(ofile, location, options.cert, options.key)
            except Exception as e:
                _log.error("unable to upload %s to %s: %s", ofile, ourl.geturl(), str(e))
                published_ok = False
                if options.keep_failed and ofile in tempfiles:
                    _log.info("keep %s for manual upload (location: %s)", ofile, ourl.geturl())
                    tempfiles.remove(ofile)

        elif ourl.scheme in [ 'root' ]:
            # XRootD can deal with parallel uploads
            xroot_uploads.append((ofile, ourl.geturl()))

        else:
            _log.info("unknow output scheme '%s' for %s", ourl.scheme, ourl.geturl())

    # XRootD parallel upload
    if len(xroot_uploads) > 0:
        try:
            publish_xrootd(xroot_uploads, options.cert, options.key)
        except Exception as e:
            _log.error("unable to upload files with xrootd protocol: %s", str(e))
            published_ok = False
            if options.keep_failed:
                for ofile, location in xroot_uploads:
                    if ofile in tempfiles:
                        _log.info("keep %s for manual upload (location: %s)", ofile, location)
                        tempfiles.remove(ofile)

    _log.info("done (dump: %s, upload: %s)", int(dump_end-dump_start), int(time.time()-dump_end))

    if not published_ok:
        sys.exit(1)