Skip to content
Snippets Groups Projects

Reconnection & TJAlienFile fields

Merged Volodymyr Yurchenko requested to merge vyurchen/jalien-root:master into master
4 files
+ 146
138
Compare changes
  • Side-by-side
  • Inline
Files
4
+ 133
135
@@ -34,81 +34,14 @@ TJAlien::TJAlien (const char* gridUrl, const char* uId, const char* passwd,
{
if (gDebug > 1) Info("TJAlien", "Connecting to JBox");
UNUSED(gridUrl);
UNUSED(uId);
UNUSED(passwd);
UNUSED(options);
// Clear flags
destroy_flag = 0;
connection_flag = 0;
writeable_flag = 0;
receive_flag = 0;
readBuffer = "";
fGrid = "alien";
gGrid = this;
// Load certificate
if (getenv("HOME") == NULL)
homedir = "~";
else
homedir = getenv("HOME");
if (getenv("TMPDIR") == NULL)
tmpdir = P_tmpdir;
else
tmpdir = getenv("TMPDIR");
if (!uId[0]) {
if (gSystem->Getenv("alien_API_USER")) {
fUser = gSystem->Getenv("alien_API_USER");
} else {
if (gSystem->Getenv("LOGNAME")) {
// we try the LOGNAME env
fUser = gSystem->Getenv("LOGNAME");
} else {
// we set the USER env
fUser = gSystem->Getenv("USER");
}
}
} else {
fUser = uId;
}
std::string tokencert = tmpdir + "/tokencert.pem";
std::string tokenkey = tmpdir + "/tokenkey.pem";
std::string tokencertpath = std::getenv("JALIEN_TOKEN_CERT") ? : tokencert;
std::string tokenkeypath = std::getenv("JALIEN_TOKEN_KEY") ? : tokenkey;
FILE *tokencertfile = NULL;
FILE *tokenkeyfile = NULL;
// Try to connect with token certificate if it exists
if ((tokencertfile = fopen(tokencertpath.c_str(), "r")) &&
(tokenkeyfile = fopen(tokenkeypath.c_str(), "r")) )
{
fclose(tokencertfile);
fclose(tokenkeyfile);
ConnectJBox(tokencertpath, tokenkeypath);
}
// In not succeded, establish a connection with full user grid certificate
if (!connection_flag)
{
std::string usercert = sUsercert.Data()[0] != '\0' ? sUsercert.Data() : homedir + "/.globus/usercert.pem";
std::string userkey = sUserkey.Data()[0] != '\0' ? sUserkey.Data() : homedir + "/.globus/userkey.pem";
std::string usercertpath = std::getenv("X509_USER_CERT") ? : usercert;
std::string userkeypath = std::getenv("X509_USER_KEY") ? : userkey;
ConnectJBox(usercertpath, userkeypath);
}
// If connected directly to JCentral, immediately ask for token
if (connection_flag && fHost == default_server)
Token("", false);
if(connection_flag) {
fUser = Whoami();
}
CreateConnection();
}
@@ -121,6 +54,75 @@ TJAlien::~TJAlien()
lws_context_destroy(context);
}
//______________________________________________________________________________
void TJAlien::CreateConnection()
{
for (int i = 0; i < 5;)
{
// Clear flags
destroy_flag = 0;
connection_flag = 0;
writeable_flag = 0;
receive_flag = 0;
readBuffer = "";
// Load certificate
if (getenv("HOME") == NULL)
homedir = "~";
else
homedir = getenv("HOME");
if (getenv("TMPDIR") == NULL)
tmpdir = P_tmpdir;
else
tmpdir = getenv("TMPDIR");
std::string tokencert = tmpdir + "/tokencert.pem";
std::string tokenkey = tmpdir + "/tokenkey.pem";
std::string tokencertpath = std::getenv("JALIEN_TOKEN_CERT") ? : tokencert;
std::string tokenkeypath = std::getenv("JALIEN_TOKEN_KEY") ? : tokenkey;
FILE *tokencertfile = NULL;
FILE *tokenkeyfile = NULL;
// Try to connect with token certificate if it exists
if ((tokencertfile = fopen(tokencertpath.c_str(), "r")) &&
(tokenkeyfile = fopen(tokenkeypath.c_str(), "r")) )
{
fclose(tokencertfile);
fclose(tokenkeyfile);
ConnectJBox(tokencertpath, tokenkeypath);
}
// In not succeded, establish a connection with full user grid certificate
if (!connection_flag)
{
std::string usercert = sUsercert.Data()[0] != '\0' ? sUsercert.Data() : homedir + "/.globus/usercert.pem";
std::string userkey = sUserkey.Data()[0] != '\0' ? sUserkey.Data() : homedir + "/.globus/userkey.pem";
std::string usercertpath = std::getenv("X509_USER_CERT") ? : usercert;
std::string userkeypath = std::getenv("X509_USER_KEY") ? : userkey;
ConnectJBox(usercertpath, userkeypath);
}
if (connection_flag)
{
// If connected directly to JCentral, immediately ask for token
if (fHost == default_server)
Token("", false);
fUser = Whoami();
return;
}
else
{
Error("TJAlien", "Failed to connect to any server! Retrying in %d seconds...", ++i);
sleep(i);
}
}
Error("TJAlien", "Failed to connect to any server! Giving up");
}
//______________________________________________________________________________
void TJAlien::ConnectJBox(std::string certpath, std::string keypath)
{
@@ -146,6 +148,7 @@ void TJAlien::ConnectJBox(std::string certpath, std::string keypath)
fHost = default_server;
fPort = 8098;
fWSPort = 8097;
fUser = "";
fPw = "";
MakeWebsocketConnection(certpath, keypath);
}
@@ -301,6 +304,11 @@ TJAlienResult *TJAlien::RunJsonCommand(TString *command, TList *opt)
Info("TJAlien", "JSON command to be done: %s", json_object_to_json_string(jsonCommand) );
TJAlien::readBuffer = "";
if (!connection_flag)
{
Error("RunJsonCommand", "Connection is broken!");
return NULL;
}
websocket_write_back(wsi, json_object_to_json_string(jsonCommand), -1);
lws_callback_on_writable(wsi);
while (!receive_flag && !destroy_flag)
@@ -309,46 +317,12 @@ TJAlienResult *TJAlien::RunJsonCommand(TString *command, TList *opt)
// If connection is broken or closed by server, try to reconnect
if (destroy_flag)
{
Error("Command", "Connection is broken, retrying...");
// Load certificate
if (getenv("HOME") == NULL)
homedir = "~";
else
homedir = getenv("HOME");
std::string tokencert = tmpdir + "/tokencert.pem";
std::string tokenkey = tmpdir + "/tokenkey.pem";
std::string tokencertpath = std::getenv("JALIEN_TOKEN_CERT") ? : tokencert;
std::string tokenkeypath = std::getenv("JALIEN_TOKEN_KEY") ? : tokenkey;
FILE *tokencertfile = NULL;
FILE *tokenkeyfile = NULL;
// Try to connect with token certificate if it exists
if ((tokencertfile = fopen(tokencertpath.c_str(), "r")) &&
(tokenkeyfile = fopen(tokenkeypath.c_str(), "r")) )
{
fclose(tokencertfile);
fclose(tokenkeyfile);
ConnectJBox(tokencertpath, tokenkeypath);
if (connection_flag)
{
return RunJsonCommand(command, opt);
}
}
// Establish a connection with full user grid certificate
std::string usercert = sUsercert.Data()[0] != '\0' ? sUsercert.Data() : homedir + "/.globus/usercert.pem";
std::string userkey = sUserkey.Data()[0] != '\0' ? sUserkey.Data() : homedir + "/.globus/userkey.pem";
std::string usercertpath = std::getenv("X509_USER_CERT") ? : usercert;
std::string userkeypath = std::getenv("X509_USER_KEY") ? : userkey;
ConnectJBox(usercertpath, userkeypath);
Error("RunJsonCommand", "Connection is broken, retrying...");
CreateConnection();
sleep(1);
return RunJsonCommand(command, opt);
}
receive_flag = 0;
json_object_put(jsonCommand);
@@ -551,9 +525,12 @@ void TJAlien::Stdout()
for (i = 0; i < arraylen; i++)
{
json_object *jvalue = json_object_array_get_idx(js_results, i);
TString message_stream = json_object_get_string(json_object_object_get(jvalue, "message"));
if (message_stream != NULL && message_stream.Length() != 0)
printf("%s\n", message_stream.Data());
json_object_object_foreach(jvalue, key, val)
{
printf("%s\t", json_object_get_string(val));
UNUSED(key);
}
printf("\n");
}
}
}
@@ -653,10 +630,15 @@ TGridResult *TJAlien::Command(const char *command, bool interactive, UInt_t stre
delete tokens;
delete sCmd;
TObjString p("user");
TObjString *sUserMetadata = result->GetMetaData(&p);
if(sUserMetadata) {
fUser = sUserMetadata->GetString().Data();
if (result)
{
// Extract the username immediately after each command,
// since it could change
TObjString p("user");
TObjString *sUserMetadata = result->GetMetaData(&p);
if (sUserMetadata) {
fUser = sUserMetadata->GetString().Data();
}
}
return result;
@@ -669,6 +651,7 @@ void TJAlien::Token(Option_t* options, bool force_restart)
{
// Immediately break previous connection and start a new one with user grid vertificate
destroy_flag = 1;
connection_flag = 0;
if (context)
lws_context_destroy(context);
@@ -677,6 +660,11 @@ void TJAlien::Token(Option_t* options, bool force_restart)
std::string usercertpath = std::getenv("X509_USER_CERT") ? : usercert.c_str();
std::string userkeypath = std::getenv("X509_USER_KEY") ? : userkey.c_str();
ConnectJBox(usercertpath, userkeypath);
if (!connection_flag)
{
Info("TJAlien", "TJAlien::Token failed to establish the connection to the server");
return;
}
}
TString sCmd("token");
@@ -1657,11 +1645,18 @@ int TJAlien::ws_service_callback(struct lws *wsi, enum lws_callback_reasons reas
case LWS_CALLBACK_CLIENT_RECEIVE:
{
if (gDebug > 100) printf("[Websocket Callback] Client received:%s\n", (char *)in);
if (gDebug > 100)
{
printf("[Websocket Callback] Client received:%s\n", (char *)in);
printf("[Websocket Callback]: %4d (rpp %5d, last %d)\n",
(int)len, (int)lws_remaining_packet_payload(wsi),
lws_is_final_fragment(wsi));
}
readBuffer.append((char*)in);
if (len < 4096) {
if (lws_is_final_fragment(wsi) != 0)
receive_flag = 1;
}
len = 0;
break;
}
@@ -1677,13 +1672,13 @@ int TJAlien::ws_service_callback(struct lws *wsi, enum lws_callback_reasons reas
case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS:
{
if (gDebug > 1) printf("[Websocket Callback] LOAD_EXTRA_CLIENT_VERIFY_CERTS is called\n");
std::string homedir; // local home directory
if (getenv("HOME") == NULL)
homedir = "~";
else
homedir = getenv("HOME");
//SSL_CTX_set_verify((SSL_CTX*)user, SSL_VERIFY_PEER, NULL);
//SSL_set_verify(wsi->ssl, SSL_VERIFY_PEER, NULL);
@@ -1695,31 +1690,31 @@ int TJAlien::ws_service_callback(struct lws *wsi, enum lws_callback_reasons reas
// If capath contans two paths separated by ":"
while ((pos = capath.find(":")) != std::string::npos) {
token = capath.substr(0, pos);
if (!SSL_CTX_load_verify_locations((SSL_CTX*)user, NULL, token.c_str()))
if (!SSL_CTX_load_verify_locations((SSL_CTX*)user, NULL, token.c_str()))
{
if (gDebug > 1) printf("[Websocket Callback] LOAD_EXTRA_CLIENT_VERIFY_CERTS failed\n");
return 1;
}
capath.erase(0, pos + 1);
capath.erase(0, pos + 1);
}
// If capath is a single path
if (capath.length() != 0)
if (!SSL_CTX_load_verify_locations((SSL_CTX*)user, NULL, capath.c_str()))
if (!SSL_CTX_load_verify_locations((SSL_CTX*)user, NULL, capath.c_str()))
{
if (gDebug > 1) printf("[Websocket Callback] LOAD_EXTRA_CLIENT_VERIFY_CERTS failed\n");
return 1;
}
//int preverify_ok;
//int preverify_ok;
//SSL *ssl = X509_STORE_CTX_get_ex_data((SSL_CTX*)user, SSL_get_ex_data_X509_STORE_CTX_idx());
//user->protocols[0].callback(wsi, LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION, (SSL_CTX*)user, ssl, preverify_ok);
break;
}
case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION:
{
if (gDebug > 1)
if (gDebug > 1)
{
printf("[Websocket Callback] LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION is called\n");
printf("[Websocket Callback] x509_ctx: %s\n", ((SSL_CTX*)user)->client_CA);
@@ -1729,7 +1724,7 @@ int TJAlien::ws_service_callback(struct lws *wsi, enum lws_callback_reasons reas
break;
}
#endif
default:
@@ -1738,16 +1733,19 @@ int TJAlien::ws_service_callback(struct lws *wsi, enum lws_callback_reasons reas
return 0;
}
//______________________________________________________________________________
const char* TJAlien::Whoami() {
TJAlienResult* r = (TJAlienResult*)Command("whoami");
const char *username;
if(r && (username = r->GetKey(0, "message"))) {
return username;
} else {
Error("TJAlien", "Unable to determine username.");
return "";
}
TJAlienResult* r = (TJAlienResult*)Command("whoami");
const char *username;
if (r && (username = r->GetKey(0, "message")))
{
return username;
}
else
{
Error("TJAlien", "Unable to determine username.");
return "";
}
}
//______________________________________________________________________________
Loading