Skip to content
Snippets Groups Projects
Commit 6f4d4508 authored by Dave Casper's avatar Dave Casper
Browse files

Merge branch 'xrootd' into 'master'

Xrootd support

See merge request faser/calypso!107
parents bb9a31d6 1bd07511
No related branches found
No related tags found
No related merge requests found
################################################################################
# Package: FaserEventsPlugins
################################################################################
# Declare the package name:
atlas_subdir( FaserEventsPlugins )
# External dependencies:
find_package( tdaq-common COMPONENTS ers EventStorage )
find_package( Boost COMPONENTS system )
find_package( Xrootd COMPONENTS Posix PosixPreload )
find_package( Davix )
# Make sure that libraries are linked correctly:
atlas_disable_as_needed()
atlas_add_library( fReadXRootD
src/fReadXRootD.h src/fReadXRootD.cxx
NO_PUBLIC_HEADERS
PRIVATE_INCLUDE_DIRS FaserEventStorage ${TDAQ-COMMON_INCLUDE_DIRS}
${XROOTD_INCLUDE_DIRS} ${Boost_INCLUDE_DIRS}
PRIVATE_LINK_LIBRARIES FaserEventStorageLib ${TDAQ-COMMON_LIBRARIES}
${XROOTD_LIBRARIES} ${Boost_LIBRARIES}
PRIVATE_DEFINITIONS -D_LARGEFILE_SOURCE -D_LARGEFILE64_SOURCE
-D_FILE_OFFSET_BITS=64 )
atlas_add_library( fReadPlain
src/fReadPlain.h src/fReadPlain.cxx
NO_PUBLIC_HEADERS
PRIVATE_INCLUDE_DIRS FaserEventStorage ${TDAQ-COMMON_INCLUDE_DIRS}
${Boost_INCLUDE_DIRS}
PRIVATE_LINK_LIBRARIES FaserEventStorageLib ${TDAQ-COMMON_LIBRARIES}
${Boost_LIBRARIES})
atlas_add_library( fReadDavix
src/fReadDavix.h src/fReadDavix.cxx
NO_PUBLIC_HEADERS
PRIVATE_INCLUDE_DIRS FaserEventStorage ${TDAQ-COMMON_INCLUDE_DIRS}
${DAVIX_INCLUDE_DIRS} ${Boost_INCLUDE_DIRS}
PRIVATE_LINK_LIBRARIES FaserEventStorageLib ${TDAQ-COMMON_LIBRARIES}
${DAVIX_LIBRARIES} ${Boost_LIBRARIES} )
/*
Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
*/
#include "ers/ers.h"
#include <fcntl.h>
#include "fReadDavix.h"
#include "EventStorage/EventStorageIssues.h"
// http://dmc-docs.web.cern.ch/dmc-docs/docs/davix-epel/html/lib-examples.html
// https://root.cern.ch/doc/master/TDavixFile_8cxx_source.html
static int TDavixFile_http_authn_cert_X509(void *userdata, const Davix::SessionInfo &info, Davix::X509Credential *cert, Davix::DavixError **err) {
(void) userdata; // keep quiete compilation warnings
(void) info;
// user proxy
std::string ucert, ukey;
if ( std::getenv("X509_USER_PROXY")) {
ucert = ukey = std::getenv("X509_USER_PROXY");
}
if (ucert.empty() || ukey.empty()) {
Davix::DavixError::setupError(err, "fReadDavix.cxx",
Davix::StatusCode::AuthentificationError,
"Could not set the user's proxy or certificate");
return -1;
}
return cert->loadFromFilePEM(ukey, ucert, "", err);
}
fReadDavix::fReadDavix()
{
m_pfd = 0;
m_offset = 0;
m_fd = nullptr;
m_davixParam = new Davix::RequestParams();
m_err = NULL;
m_pos = new Davix::DavPosix(&m_c);
//m_pos = &m_c;
// enableGridMode
const char *env_var = NULL;
if( ( env_var = std::getenv("X509_CERT_DIR")) == NULL){
env_var = "/etc/grid-security/certificates/";
}
m_davixParam->addCertificateAuthorityPath(env_var);
m_davixParam->setTransparentRedirectionSupport(true);
m_cert = new Davix::X509Credential();
m_davixParam->setClientCertCallbackX509(&TDavixFile_http_authn_cert_X509, NULL);
}
fReadDavix::~fReadDavix()
{
this->closeFile();
}
bool fReadDavix::isOpen()
{
return m_pfd != 0;
}
bool fReadDavix::isEoF()
{
return false;
}
bool fReadDavix::fileExists(std::string fName) const
{
Davix::DavixError* err2 = NULL;
DAVIX_FD* pfd = m_pos->open(m_davixParam, fName.c_str(), O_RDONLY, &err2);
if(pfd == 0) return false;
m_pos->close(pfd, &err2);
return true;
}
void fReadDavix::openFile(std::string fName)
{
if(this->isOpen()) this->closeFile();
m_fd = m_pos->open(m_davixParam, fName.c_str(), O_RDONLY, &m_err);
m_offset = 0;
m_pfd = 1;
}
void fReadDavix::closeFile()
{
if(m_pfd != 0) m_pos->close(m_fd, &m_err);
m_pfd = 0;
}
void fReadDavix::readData(char *buffer, unsigned int sizeBytes)
{
if (sizeBytes==0) return;
if(this->isOpen())
{
unsigned int totalRead=0,ntry=0;
while(sizeBytes > totalRead)
{
ssize_t ret = m_pos->pread(m_fd, buffer, sizeBytes, m_offset, &m_err);
if (ret < 0) {
std::stringstream mystream;
mystream << "fReadDavix::readData: can not read data with davix " << m_err->getErrMsg().c_str() << " " << m_err->getStatus();
Davix::DavixError::clearError(&m_err);
EventStorage::ReadingIssue ci(ERS_HERE, mystream.str().c_str());
ers::warning(ci);
return;
} else {
m_offset += ret;
}
totalRead += ret; ++ntry;
if(ntry>5) {
std::stringstream mystream;
mystream << "Problem reading from the data file. "
<<"fReadDavix::readData asked to read "<<sizeBytes
<<" bytes and managed to read only "<<totalRead
<<" bytes.";
EventStorage::ReadingIssue ci(ERS_HERE, mystream.str().c_str());
ers::warning(ci);
return;
}
}
}
}
int64_t fReadDavix::getPosition()
{
if(this->isOpen()) return m_offset;
return -1;
}
void fReadDavix::setPosition(int64_t p)
{
if(this->isOpen()) m_offset = p;
}
void fReadDavix::setPositionFromEnd(int64_t p)
{
dav_off_t ret;
if(this->isOpen()) {
ret = m_pos->lseek64(m_fd, p, SEEK_END, &m_err);
m_offset = ret;
}
}
fRead * fReadDavix::newReader() const
{
fReadDavix * nfr = new fReadDavix();
return (fRead *)nfr;
}
extern "C" {
fRead * fReadFactory()
{
fReadDavix * nfr = new fReadDavix();
return (fRead *)nfr;
}
}
/*
Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
*/
#ifndef FREADDAVIX_H
#define FREADDAVIX_H
#include "FaserEventStorage/fRead.h"
#include "davix.hpp"
class fReadDavix : public fRead
{
public:
fReadDavix();
~fReadDavix();
bool isOpen();
bool isEoF();
bool fileExists(std::string fName) const;
void openFile(std::string fName);
void closeFile();
void readData(char *buffer, unsigned int sizeBytes);
int64_t getPosition();
void setPosition(int64_t p);
void setPositionFromEnd(int64_t p);
fRead * newReader() const;
private:
int m_pfd; // current file, used as bool to check if file is open
int64_t m_offset;
Davix::Context m_c;
Davix::RequestParams *m_davixParam;
Davix::DavixError* m_err;
Davix::DavPosix *m_pos;
Davix::X509Credential *m_cert;
DAVIX_FD* m_fd; // davix pointer to current file
};
#endif
/*
Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
*/
#include "ers/ers.h"
#include <fcntl.h>
#include "fReadXRootD.h"
#include "EventStorage/EventStorageIssues.h"
// external XRootD functions from ROOT net/xrootd/src/xrootd/src/XrdPosix/XrdPosixXrootd.hh
class XrdPosixCallBack;
class XrdPosixXrootd {
public:
static int Open(const char *path, int oflag, mode_t mode=0, XrdPosixCallBack *cbP=0);
static int Close(int fildes);
static size_t Read(int fildes, void *buf, size_t nbyte);
static off_t Lseek(int fildes, off_t offset, int whence);
};
fReadXRootD::fReadXRootD()
{
m_pfd = 0;
}
fReadXRootD::~fReadXRootD()
{
this->closeFile();
}
bool fReadXRootD::isOpen()
{
return m_pfd != 0;
}
bool fReadXRootD::isEoF()
{
//xrd eof??
return false;
}
bool fReadXRootD::fileExists(std::string fName) const
{
int pfd = XrdPosixXrootd::Open(fName.c_str(), O_RDONLY);
if(pfd == 0) return false;
XrdPosixXrootd::Close(pfd);
return true;
}
void fReadXRootD::openFile(std::string fName)
{
if(this->isOpen()) this->closeFile();
m_pfd = XrdPosixXrootd::Open(fName.c_str(), O_RDONLY);
}
void fReadXRootD::closeFile()
{
if(m_pfd != 0) XrdPosixXrootd::Close(m_pfd);
m_pfd = 0;
}
void fReadXRootD::readData(char *buffer, unsigned int sizeBytes)
{
if (sizeBytes==0) return;
if(this->isOpen())
{
unsigned int totalRead=0,ntry=0;
while(sizeBytes > totalRead)
{
int ret = XrdPosixXrootd::Read(m_pfd,buffer,sizeBytes);
totalRead += ret; ++ntry;
if(ntry>5) {
std::stringstream mystream;
mystream << "Problem reading from the data file. "
<<"fReadXRootD::readData asked to read "<<sizeBytes
<<" bytes and managed to read only "<<totalRead
<<" bytes.";
EventStorage::ReadingIssue ci(ERS_HERE, mystream.str().c_str());
ers::warning(ci);
return;
}
}
}
}
int64_t fReadXRootD::getPosition()
{
if(this->isOpen()) return XrdPosixXrootd::Lseek(m_pfd, 0, SEEK_CUR);
return -1;
}
void fReadXRootD::setPosition(int64_t p)
{
if(this->isOpen()) XrdPosixXrootd::Lseek(m_pfd, (long long)p, SEEK_SET);
}
void fReadXRootD::setPositionFromEnd(int64_t p)
{
if(this->isOpen()) XrdPosixXrootd::Lseek(m_pfd, (long long)p, SEEK_END);
}
fRead * fReadXRootD::newReader() const
{
fReadXRootD * nfr = new fReadXRootD();
return (fRead *)nfr;
}
extern "C" {
fRead * fReadFactory()
{
fReadXRootD * nfr = new fReadXRootD();
return (fRead *)nfr;
}
}
/*
Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
*/
#ifndef FREADXROOTD_H
#define FREADXROOTD_H
#include "FaserEventStorage/fRead.h"
class fReadXRootD : public fRead
{
public:
fReadXRootD();
~fReadXRootD();
bool isOpen();
bool isEoF();
bool fileExists(std::string fName) const;
void openFile(std::string fName);
void closeFile();
void readData(char *buffer, unsigned int sizeBytes);
int64_t getPosition();
void setPosition(int64_t p);
void setPositionFromEnd(int64_t p);
fRead * newReader() const;
private:
int m_pfd; // current file
};
#endif
...@@ -16,11 +16,12 @@ atlas_add_library( FaserEventStorageLib ...@@ -16,11 +16,12 @@ atlas_add_library( FaserEventStorageLib
LINK_LIBRARIES ${CMAKE_DL_LIBS} ${Boost_LIBRARIES} ${TDAQ-COMMON_LIBRARIES} EventFormats ) LINK_LIBRARIES ${CMAKE_DL_LIBS} ${Boost_LIBRARIES} ${TDAQ-COMMON_LIBRARIES} EventFormats )
# atlas_add_library(fReadPlain # atlas_add_library(fReadPlain
# src/fReadPlain.h src/fReadPlain.cxx # src/fReadPlain.h src/fReadPlain.cxx
# PUBLIC_HEADERS src # NO_PUBLIC_HEADERS
# INCLUDE_DIRS ${Boost_INCLUDE_DIRS} ${TDAQ-COMMON_INCLUDE_DIRS} # PRIVATE_INCLUDE_DIRS ${Boost_INCLUDE_DIRS} ${TDAQ-COMMON_INCLUDE_DIRS}
# LINK_LIBRARIES ${Boost_LIBRARIES} ${TDAQ-COMMON_LIBRARIES} -rdynamic) # PRIVATE_LINK_LIBRARIES ${Boost_LIBRARIES} ${TDAQ-COMMON_LIBRARIES} -rdynamic)
# Install files from the package: # Install files from the package:
#atlas_install_python_modules( python/*.py POST_BUILD_CMD ${ATLAS_FLAKE8} ) #atlas_install_python_modules( python/*.py POST_BUILD_CMD ${ATLAS_FLAKE8} )
......
...@@ -10,40 +10,28 @@ DataReader * pickFaserDataReader(std::string fileName) { ...@@ -10,40 +10,28 @@ DataReader * pickFaserDataReader(std::string fileName) {
ERS_DEBUG(1,"pickFaserDataReader(file) with file name "<<fileName); ERS_DEBUG(1,"pickFaserDataReader(file) with file name "<<fileName);
std::vector<std::string> fReadLibs; std::vector<std::string> fReadLibs;
fReadLibs.push_back("fReadPlain"); // Just use plain
/*
// Control plugins libraries via file name prefixes. // Control plugins libraries via file name prefixes.
// Take the prefixes away from the name, in most cases anyway. // Take the prefixes away from the name, in most cases anyway.
if(fileName.find("rfio:")==0) {
fileName.erase(0,std::string("rfio:").size()); if(fileName.find("root:")==0) {
fReadLibs.push_back("fReadCastor");
} else if(fileName.find("dcache:")==0) {
fileName.erase(0,std::string("dcache:").size());
fReadLibs.push_back("fReaddCache");
} else if(fileName.find("dcap:")==0) {
// Leave the prefix in the file name in this case.
fReadLibs.push_back("fReaddCache");
} else if(fileName.find("root:")==0) {
// Leave the prefix in the file name in this case. // Leave the prefix in the file name in this case.
fReadLibs.push_back("fReadXRootD"); fReadLibs.push_back("fReadXRootD");
} else if(fileName.find("disk:")==0) {
fileName.erase(0,std::string("disk:").size());
fReadLibs.push_back("fReadPlain");
} else if(fileName.find("https:")==0) { } else if(fileName.find("https:")==0) {
// Leave the prefix in the file name in this case. // Leave the prefix in the file name in this case.
fReadLibs.push_back("fReadDavix"); fReadLibs.push_back("fReadDavix");
} else if(fileName.find("davs:")==0) { } else if(fileName.find("davs:")==0) {
// Leave the prefix in the file name in this case. // Leave the prefix in the file name in this case.
fReadLibs.push_back("fReadDavix"); fReadLibs.push_back("fReadDavix");
} else if(fileName.find("disk:")==0) {
fileName.erase(0,std::string("disk:").size());
fReadLibs.push_back("fReadPlain");
} else { // by defaul all will be tried } else { // by defaul all will be tried
fReadLibs.push_back("fReadPlain"); fReadLibs.push_back("fReadPlain");
fReadLibs.push_back("fReadCastor");
fReadLibs.push_back("fReadXRootD"); fReadLibs.push_back("fReadXRootD");
fReadLibs.push_back("fReadDavix"); fReadLibs.push_back("fReadDavix");
fReadLibs.push_back("fReaddCache");
} }
*/
ERS_DEBUG(2,"After parsing the file name is "<<fileName); ERS_DEBUG(2,"After parsing the file name is "<<fileName);
ERS_DEBUG(2,"Number of fRead plugins to try is "<<fReadLibs.size()); ERS_DEBUG(2,"Number of fRead plugins to try is "<<fReadLibs.size());
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment