Commit 18c9a9ad authored by Volodymyr Yurchenko's avatar Volodymyr Yurchenko
Browse files

Refactor TJAlien: move websockets-related functions to TJAlienConnectionManager

parent 47b1d3cb
......@@ -25,4 +25,5 @@
#pragma link C++ class TJAlienCredentials;
#pragma link C++ class TJClientFile;
#pragma link C++ class TJAlienDNSResolver;
#pragma link C++ class TJAlienConnectionManager;
#endif
......@@ -15,6 +15,10 @@
#include <unistd.h>
#include <map>
#include <fstream>
#include <stdlib.h>
#include <string>
#include <signal.h>
#include <sstream>
#ifdef _DEBUG
#define DEBUGMSG(msg) ( (void) (std::cerr<<msg<<std::endl) )
......@@ -34,9 +38,20 @@
#include "TArrayI.h"
#include "THashList.h"
#include <TLockFile.h>
#include "TObjArray.h"
#include "TString.h"
#include "TJClientFile.h"
#include "TJAlienJDL.h"
#include "TJAlienCollection.h"
#include "TJAlienFile.h"
#include "TJAlienJob.h"
#include "TJAlienJobStatus.h"
#include "TJAlienJobStatusList.h"
#include "TJAlienResultRewriter.h"
#include "TJAlienSAXHandler.h"
#include "TJAlienResult.h"
#include <TLockFile.h>
#include "TJAlienConnectionManager.h"
#if !defined(__CINT__) && !defined(__MAKECINT__) && !defined(__ROOTCLING__) && !defined(__CLING__)
#include <json-c/json.h>
......@@ -49,25 +64,6 @@ struct json_object;
typedef char __signed;
typedef char int8_t;
#endif
//#include "picojson.h"
//#include "json.hpp"
//#include "rapidjson/document.h"
//#include <cstdint>
//#include <jsoncons/json.hpp>
//#include <tao/json.hpp>
#include <stdlib.h>
#include <string>
#include <signal.h>
#include <sys/stat.h>
#if !defined(__CINT__) && !defined(__MAKECINT__) && !defined(__ROOTCLING__) && !defined(__CLING__)
#include <libwebsockets.h>
#include "lws_config.h"
#else
struct lws_context;
struct lws_context_creation_info;
#endif
#if (SSLEAY_VERSION_NUMBER >= 0x0907000L)
#include <openssl/conf.h>
......@@ -81,10 +77,6 @@ struct lws_context_creation_info;
#endif
#define UNUSED(x) (void)(x)
#define DEFAULT_JCENTRAL_SERVER "alice-jcentral.cern.ch"
#include "TJClientFile.h"
#include "TJAlienCredentials.h"
using std::string;
......@@ -97,60 +89,20 @@ public:
private:
TString fPwd; // working directory
TString fHome; // home directory with alien:// prefix
int fWSPort; // websocket port
TString sUsercert; // location of user certificate
TString sUserkey; // location of user private key
static std::string readBuffer;
std::string homedir; // local home directory
std::string tmpdir; // tmp directory
const int default_WSport = 8097;
const std::string default_server = DEFAULT_JCENTRAL_SERVER;
Bool_t WriteTokenFile();
static size_t WriteCallback(void *contents, size_t size, size_t nmemb);
void CreateConnection();
void ConnectJBox();
void ConnectJCentral(TJAlienCredentialsObject c, string host = DEFAULT_JCENTRAL_SERVER);
void MakeWebsocketConnection(TJAlienCredentialsObject creds, string host, int WSPort);
// Format command to Json structure
json_object *CreateJsonCommand(TString *command, TList *options);
// Parse the result from Json structure
TJAlienResult *GetCommandResult(json_object *json_response);
// Libwebsockets
static int destroy_flag; // Flags to know connection status
static int connection_flag;
static int writeable_flag;
static int receive_flag;
struct lws_context *context; // Context contains all information about connection
/* struct lws_context_creation_info creation_info; // Info to create logical connection */
struct lws *wsi; // WebSocket Instance - real connection object, created basing on context
#if !defined(__CINT__) && !defined(__MAKECINT__) && !defined(__ROOTCLING__) && !defined(__CLING__)
static int ws_service_callback( // Callback to handle connection
struct lws *wsi,
enum lws_callback_reasons reason, void *user,
void *in, size_t len);
static int websocket_write_back(struct lws *wsi_in, const char *str, int str_size_in);
#endif
virtual TGridResult *OpenDataset(const char *lfn, const char *options = "");
const char* Whoami();
void clearFlags();
TJAlienCredentials creds;
TJAlienConnectionManager connection;
public:
TJAlien(const char *gridUrl, const char *uId=0, const char *passwd=0,
const char *options=0);
virtual ~TJAlien();
TJAlienResult *RunJsonCommand(TString *command, TList *options);
TGridResult *Command(const char * command, bool interactive = kFALSE, UInt_t stream = kOUTPUT);
Int_t GetExitCode(TJAlienResult *result, TObjString* &message);
......@@ -158,14 +110,12 @@ public:
virtual TGridResult *Query(const char *path, const char *pattern, const char *conditions = "", const char *options = "");
void Stdout(); // print the stdout of the last executed command
void Stderr(); // print the stderr of the last executed command
void Token(Option_t* options = "", bool force_restart = true);
//--- Redefinition of superclass methods
virtual Bool_t IsConnected() const { return connection_flag; }
virtual Bool_t IsConnected() const { return connection.IsConnected(); }
//--- Catalogue Interface
virtual TGridResult *Ls(const char *ldn = "", Option_t *options = "", Bool_t verbose = kFALSE);
......
// @(#)root/net:$Id$
// Author: Volodymyr Yurchenko 27/06/2019
#ifndef ROOT_TJAlienConnectionManager
#define ROOT_TJAlienConnectionManager
#if !defined(__CINT__) && !defined(__MAKECINT__) && !defined(__ROOTCLING__) && !defined(__CLING__)
#include <json-c/json.h>
#else
struct json_object;
#endif
#include <sys/stat.h>
#if !defined(__CINT__) && !defined(__MAKECINT__) && !defined(__ROOTCLING__) && !defined(__CLING__)
#include <libwebsockets.h>
#include "lws_config.h"
#else
struct lws_context;
struct lws_context_creation_info;
#endif
#include "TJAlienResult.h"
#include "TJAlienCredentials.h"
#include "TJAlienDNSResolver.h"
#include "TError.h"
#include "TGrid.h"
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <unistd.h>
#include <fstream>
#define DEFAULT_JCENTRAL_SERVER "alice-jcentral.cern.ch"
#define UNUSED(x) (void)(x)
class TJAlienConnectionManager {
private:
std::string homedir; // local home directory
std::string tmpdir; // tmp directory
const int default_WSport = 8097;
const std::string default_server = DEFAULT_JCENTRAL_SERVER;
int fWSPort; // websocket port
TString sUsercert; // location of user certificate
TString sUserkey; // location of user private key
// Libwebsockets
static int destroy_flag; // Flags to know connection status
static int connection_flag;
static int writeable_flag;
static int receive_flag;
struct lws_context *context; // Context contains all information about connection
/* struct lws_context_creation_info creation_info; // Info to create logical connection */
struct lws *wsi; // WebSocket Instance - real connection object, created basing on context
#if !defined(__CINT__) && !defined(__MAKECINT__) && !defined(__ROOTCLING__) && !defined(__CLING__)
static int ws_service_callback( // Callback to handle connection
struct lws *wsi,
enum lws_callback_reasons reason, void *user,
void *in, size_t len);
static int websocket_write_back(struct lws *wsi_in, const char *str, int str_size_in);
#endif
static size_t WriteCallback(void *contents, size_t size, size_t nmemb);
void clearFlags();
TJAlienCredentials creds;
public:
TJAlienConnectionManager();
~TJAlienConnectionManager();
void CreateConnection();
void ConnectJBox();
void ConnectJCentral(TJAlienCredentialsObject c, string host = DEFAULT_JCENTRAL_SERVER);
void MakeWebsocketConnection(TJAlienCredentialsObject creds, string host, int WSPort);
void ForceRestart();
TJAlienResult *RunJsonCommand(TString *command, TList *options);
// Parse the result from Json structure
TJAlienResult *GetCommandResult(json_object *json_response);
// Format command to Json structure
json_object *CreateJsonCommand(TString *command, TList *options);
virtual Bool_t IsConnected() const { return connection_flag; }
static std::string readBuffer;
ClassDef(TJAlienConnectionManager, 0)
};
#endif
This diff is collapsed.
// @(#)root/net:$Id$
// Author: Volodymyr Yurchenko 27/06/2019
#include "TJAlienConnectionManager.h"
ClassImp(TJAlienConnectionManager)
using std::string;
int TJAlienConnectionManager::destroy_flag = 0;
int TJAlienConnectionManager::connection_flag = 0;
int TJAlienConnectionManager::writeable_flag = 0;
int TJAlienConnectionManager::receive_flag = 0;
string TJAlienConnectionManager::readBuffer = "";
TJAlienConnectionManager::TJAlienConnectionManager() {
creds.loadCredentials();
}
TJAlienConnectionManager::~TJAlienConnectionManager() {
if (context)
lws_context_destroy(context);
}
//______________________________________________________________________________
void TJAlienConnectionManager::CreateConnection()
{
TJAlienCredentialsObject co;
TJAlienDNSResolver dns_jcentral(default_server, default_WSport);
string current_host;
clearFlags();
while (creds.count() > 0) {
if (creds.has(cJOB_TOKEN)) {
co = creds.get(cJOB_TOKEN);
} else if (creds.has(cJBOX_TOKEN)) {
co = creds.get(cJBOX_TOKEN);
} else if (creds.has(cFULL_GRID_CERT)) {
co = creds.get(cFULL_GRID_CERT);
if (co.password.empty())
co.readPassword();
} else {
Error("TJAlienConnectionManager", "Failed to get any credentials");
return;
}
if (co.kind == cJBOX_TOKEN || co.kind == cJOB_TOKEN) {
ConnectJBox();
}
if (connection_flag) {
Info("TJAlienConnectionManager", "Successfully connected to JBox");
co.password = "";
return;
}
for (int i = 0; i < dns_jcentral.lenght(); i++)
{
current_host = dns_jcentral.get_next_host();
ConnectJCentral(co, current_host);
if (connection_flag)
{
// If connected directly to JCentral, immediately ask for token
Info("TJAlienConnectionManager", "Successfully connected to %s", current_host.c_str());
co.password = "";
return;
}
else
{
if (gDebug > 0) {
Error("TJAlienConnectionManager", "Failed to connect to %s - retrying...", current_host.c_str());
}
sleep(1);
}
}
creds.removeCredentials(co.kind);
}
Error("TJAlienConnectionManager", "Failed to connect to any server! Giving up");
}
void TJAlienConnectionManager::clearFlags()
{
destroy_flag = 0;
connection_flag = 0;
writeable_flag = 0;
receive_flag = 0;
readBuffer = "";
}
//______________________________________________________________________________
void TJAlienConnectionManager::ConnectJBox()
{
if(!creds.has(cJBOX_TOKEN)) {
return;
}
TJAlienCredentialsObject c = creds.get(cJBOX_TOKEN);
TJClientFile jcf;
if(jcf.isValid) {
MakeWebsocketConnection(c, (string)jcf.fHost, jcf.fWSPort);
} else {
if(gDebug >= 1) Info("TJAlienConnectionManager", "The JClient file is not valid - not connecting to JBox!");
}
}
void TJAlienConnectionManager::ConnectJCentral(TJAlienCredentialsObject c, string host)
{
if (gDebug > 1) Info("TJAlienConnectionManager", "Trying to connect to server %s", host.c_str());
MakeWebsocketConnection(c, host, default_WSport);
}
//______________________________________________________________________________
void TJAlienConnectionManager::MakeWebsocketConnection(TJAlienCredentialsObject creds, string host, int WSPort)
{
// Create the connection to JBox using the parameters read from the token
// returns true if the connection was established
if(gDebug > 0) {
Info("TJAlienConnectionManager", "Connecting to Server %s:%d", host.c_str(), WSPort);
Info("TJAlienConnectionManager", "Using cert %s and %s", creds.certpath.c_str(), creds.keypath.c_str());
}
// Use this for debugging
//lws_set_log_level(LLL_EXT | LLL_USER | LLL_PARSER | LLL_INFO | LLL_ERR | LLL_NOTICE, nullptr);
lws_set_log_level(gDebug, nullptr);
// Reset context variables
context = nullptr;
wsi = nullptr;
clearFlags();
// libwebsockets variables
struct lws_client_connect_info connect_info;
struct lws_context_creation_info creation_info; // Info to create logical connection
memset(&connect_info, 0, sizeof connect_info );
memset(&creation_info, 0, sizeof creation_info);
// SSL options
int use_ssl = LCCSCF_USE_SSL | LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK; // SSL, no selfsigned, don't check server hostname
// Define protocol
static const struct lws_protocols protocols[] = {
{
"jalien-protocol",
ws_service_callback,
0, 0, 1, nullptr
},
{ nullptr, nullptr, 0, 0, 0, nullptr } /* end */
};
// Create the websockets context. This tracks open connections and
// knows how to route any traffic and which protocol version to use,
// and if each connection is client or server side.
creation_info.port = CONTEXT_PORT_NO_LISTEN; // NO_LISTEN - we are client
creation_info.iface = nullptr;
creation_info.protocols = protocols;
creation_info.extensions = nullptr;
creation_info.gid = -1;
creation_info.uid = -1;
creation_info.options = 0;
creation_info.vhost_name = "tjalien-root";
creation_info.options |= LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
// TODO (nhardi): switch to explicit key/cert contents
// see the context_creation_info
creation_info.client_ssl_cert_filepath = creds.certpath.c_str();
creation_info.client_ssl_private_key_filepath = creds.keypath.c_str();
//creation_info.client_ssl_cert_mem = creds.getCertificate().c_str();
//creation_info.client_ssl_cert_mem_len = creds.getCertificate().length();
// TODO (yuw): switch to client_ssl_private_key_password starting from libwebsockets 3.1.0 and onward
creation_info.ssl_private_key_password = creds.password.c_str();
// Create context - only logical connection, no real connection yet
context = lws_create_context(&creation_info);
if (context == nullptr) {
Error("TJAlienConnectionManager", "Context creation failure");
destroy_flag = 1;
return;
}
if (gDebug > 1)
{
Info("TJAlienConnectionManager", "context created");
}
connect_info.address = host.c_str();
connect_info.port = WSPort;
connect_info.path = "/websocket/json";
connect_info.context = context;
connect_info.ssl_connection = use_ssl;
connect_info.host = host.c_str();
connect_info.origin = host.c_str();
connect_info.ietf_version_or_minus_one = -1;
connect_info.protocol = protocols[0].name;
connect_info.pwsi = &wsi;
// Create wsi - WebSocket Instance
lws_client_connect_via_info(&connect_info);
if (wsi == nullptr) {
if(gDebug > 0) {
Error("TJAlienConnectionManager", "WebSocket instance creation error");
}
return;
}
if (gDebug > 1)
Info("TJAlienConnectionManager", "WebSocket instance creation successfull");
// Wait for server responce "connection established"
while (!connection_flag)
{
lws_service(context, 500);
if (destroy_flag)
{
if (gDebug > 1) Error("TJAlienConnectionManager", "Websocket connection failure");
return;
}
}
creation_info.ssl_private_key_password = "";
return;
}
//_____________________________________________________________________________
size_t TJAlienConnectionManager::WriteCallback(void *contents, size_t size, size_t nmemb)
{
size_t realsize = size * nmemb;
readBuffer.append((const char*)contents, realsize);
return realsize;
}
//_____________________________________________________________________________
int TJAlienConnectionManager::websocket_write_back(struct lws *wsi_in, const char *str, int str_size_in)
{
if (str == nullptr || wsi_in == nullptr)
return -1;
int n;
int len;
char *out = nullptr;
if (str_size_in < 1)
len = strlen(str);
else
len = str_size_in;
out = (char *)malloc(sizeof(char)*(LWS_SEND_BUFFER_PRE_PADDING + len + LWS_SEND_BUFFER_POST_PADDING));
// setup the buffer
memcpy (out + LWS_SEND_BUFFER_PRE_PADDING, str, len );
// write out
n = lws_write(wsi_in, (unsigned char*)out + LWS_SEND_BUFFER_PRE_PADDING, len, LWS_WRITE_TEXT);
// free the buffer
free(out);
return n;
}
//_____________________________________________________________________________
int TJAlienConnectionManager::ws_service_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
{
// Websocket callback handler
UNUSED(len);
switch (reason)
{
case LWS_CALLBACK_CLIENT_ESTABLISHED:
{
if (gDebug > 1) printf("[Websocket Callback] Connect with server success\n");
connection_flag = 1;
break;
}
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
{
if (gDebug > 1) printf("[Websocket Callback] Connect with server error\n");
destroy_flag = 1;
connection_flag = 0;
if (!lws_get_context(wsi))
lws_context_destroy(lws_get_context(wsi));
wsi = nullptr;
break;
}
case LWS_CALLBACK_CLOSED:
{
if (gDebug > 1) printf("[Websocket Callback] LWS_CALLBACK_CLOSED\n");
destroy_flag = 1;
connection_flag = 0;
if (!lws_get_context(wsi))
lws_context_destroy(lws_get_context(wsi));
wsi = nullptr;
break;
}
case LWS_CALLBACK_CLIENT_RECEIVE:
{
if (gDebug > 100)
{
printf("[Websocket Callback] Client received:%s\n", (char *)in);
printf("[Websocket Callback]: %4d (rpp %5d, last %d)\n",
(int)len, (int)lws_remaining_packet_payload(wsi),
lws_is_final_fragment(wsi));
}
readBuffer.append((char*)in);
if (lws_is_final_fragment(wsi) != 0)
receive_flag = 1;
len = 0;
break;
}
case LWS_CALLBACK_CLIENT_WRITEABLE:
{
if (gDebug > 1) printf("[Websocket Callback] On writeable is called\n");
writeable_flag = 1;
break;
}
#if defined(LWS_OPENSSL_SUPPORT)
case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS:
{
if (gDebug > 1) printf("[Websocket Callback] LOAD_EXTRA_CLIENT_VERIFY_CERTS is called\n");
//SSL_CTX_set_verify((SSL_CTX*)user, SSL_VERIFY_PEER, nullptr);
//SSL_set_verify(wsi->ssl, SSL_VERIFY_PEER, nullptr);
std::string location = "/etc/grid-security/certificates/";
struct stat info;
if (stat("/cvmfs/alice.cern.ch", &info) == 0)
location = "/cvmfs/alice.cern.ch" + location;
std::string capath = std::getenv("X509_CERT_DIR") ? : location;
size_t pos = 0;
std::string token;
// If capath contans two paths separated by ":"
while ((pos = capath.find(":")) != std::string::npos) {
token = capath.substr(0, pos);
if (!SSL_CTX_load_verify_locations((SSL_CTX*)user, nullptr, token.c_str()))
{
if (gDebug > 1) printf("[Websocket Callback] LOAD_EXTRA_CLIENT_VERIFY_CERTS failed\n");
return 1;
}
capath.erase(0, pos + 1);
}
// If capath is a single path
if (capath.length() != 0)
if (!SSL_CTX_load_verify_locations((SSL_CTX*)user, nullptr, capath.c_str()))
{
if (gDebug > 1) printf("[Websocket Callback] LOAD_EXTRA_CLIENT_VERIFY_CERTS failed\n");
return 1;
}
//int preverify_ok;
//SSL *ssl = X509_STORE_CTX_get_ex_data((SSL_CTX*)user, SSL_get_ex_data_X509_STORE_CTX_idx());
//user->protocols[0].callback(wsi, LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION, (SSL_CTX*)user, ssl, preverify_ok);
break;
}
case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION:
{
if (gDebug > 1)
{
printf("[Websocket Callback] LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION is called\n");
//printf("[Websocket Callback] x509_ctx: %s\n", ((SSL_CTX*)user)->client_CA);
//printf("[Websocket Callback] ssl: %s\n", in);
//printf("[Websocket Callback] SSL_CTX_get_client_CA_list: %s\n", SSL_CTX_get_client_CA_list((SSL_CTX*)user));
}