Commit 6cb44830 authored by Volodymyr Yurchenko's avatar Volodymyr Yurchenko
Browse files

Fix the reconnection and make WS detect that the server is gone

Let TJAlien decide if it should reconnect - remove recursive reconnect
from the connection manager.
Poll for connection status in IsConnected().
Use the right callback reason for the connection error event.
parent 1ae675d8
...@@ -84,7 +84,7 @@ public: ...@@ -84,7 +84,7 @@ public:
// Format command to Json structure // Format command to Json structure
json_object *CreateJsonCommand(TString *command, TList *options); json_object *CreateJsonCommand(TString *command, TList *options);
virtual Bool_t IsConnected() const { return connection_flag; } virtual Bool_t IsConnected() const;
ClassDef(TJAlienConnectionManager, 0) ClassDef(TJAlienConnectionManager, 0)
}; };
......
...@@ -177,24 +177,18 @@ TGridResult *TJAlien::Command(const char *command, bool interactive, UInt_t stre ...@@ -177,24 +177,18 @@ TGridResult *TJAlien::Command(const char *command, bool interactive, UInt_t stre
if (interactive) Info("Command", "Received full command =\"%s\"", command); if (interactive) Info("Command", "Received full command =\"%s\"", command);
UNUSED(stream); UNUSED(stream);
TString *sCmd = new TString(command);
TObjArray *tokens = sCmd->Tokenize(" ");
TJAlienResult *result; TJAlienResult *result;
std::map<std::string, TString> metadata; std::map<std::string, TString> metadata;
if (tokens->GetEntries() == 1) TString *sCmd = new TString(command);
{ TObjArray *tokens = sCmd->Tokenize(" ");
if (interactive) Info("Command", "Received only command"); TObjString *sObjCommand = (TObjString *) tokens->At(0);
result = (TJAlienResult*) connection.RunJsonCommand(sCmd, 0, &metadata, &readBuffer); TString *sCommand = new TString(sObjCommand->GetString()); // Bare command
if (result) TList *options = new TList();
TJAlienResultRewriter().Rewrite(sCmd->Data(), result);
}
else
{
TList *options = new TList();
TObjString *sCommand = (TObjString *) tokens->At(0);
if (interactive) Info("Command", "Command = \"%s\"", sCommand->GetString().Data()); // Parse command options
if (tokens->GetEntries() > 1) {
if (interactive) Info("Command", "Command = \"%s\"", sCommand->Data());
TString opt; TString opt;
bool append = false; bool append = false;
for (int i = 1; i < tokens->GetEntries(); i++) for (int i = 1; i < tokens->GetEntries(); i++)
...@@ -234,31 +228,43 @@ TGridResult *TJAlien::Command(const char *command, bool interactive, UInt_t stre ...@@ -234,31 +228,43 @@ TGridResult *TJAlien::Command(const char *command, bool interactive, UInt_t stre
if (interactive) Info("Command", "Option = \"%s\"", opt.Data()); if (interactive) Info("Command", "Option = \"%s\"", opt.Data());
options->Add(new TObjString(opt)); options->Add(new TObjString(opt));
} }
}
TString *s = new TString(sCommand->GetString()); // Execute the command
result = (TJAlienResult*) connection.RunJsonCommand(s, options, &metadata, &readBuffer); int retries_left = 3;
if (result) while (retries_left > 0) {
TJAlienResultRewriter().Rewrite(sCommand->GetString().Data(), result); result = (TJAlienResult*) connection.RunJsonCommand(sCommand, options, &metadata, &readBuffer);
if (result) {
if (interactive) TJAlienResultRewriter().Rewrite(sCmd->Data(), result);
{ break;
Stdout();
Stderr();
} }
delete s; if (--retries_left == 0)
Error("Command", "Failed to execute the command");
else {
Error("Command", "Trying to reconnect in 1 sec");
sleep(1);
Connect();
}
}
delete options; if (interactive) {
Stdout();
Stderr();
} }
delete sObjCommand;
delete sCommand;
delete options;
delete tokens; delete tokens;
delete sCmd; delete sCmd;
fHost = metadata["fHost"]; // Set metadata
fPort = metadata["fPort"].Atoi(); fHost = metadata["fHost"];
if (result) { fPort = metadata["fPort"].Atoi();
fUser = metadata["fUser"]; if (result) {
fPwd = metadata["fPwd"]; fUser = metadata["fUser"];
} fPwd = metadata["fPwd"];
}
return result; return result;
} }
......
...@@ -318,9 +318,9 @@ int TJAlienConnectionManager::ws_service_callback(struct lws *wsi, enum lws_call ...@@ -318,9 +318,9 @@ int TJAlienConnectionManager::ws_service_callback(struct lws *wsi, enum lws_call
break; break;
} }
case LWS_CALLBACK_CLOSED: case LWS_CALLBACK_CLIENT_CLOSED:
{ {
if (gDebug > 1) printf("[Websocket Callback] LWS_CALLBACK_CLOSED\n"); if (gDebug > 1) printf("[Websocket Callback] LWS_CALLBACK_CLIENT_CLOSED\n");
destroy_flag = 1; destroy_flag = 1;
connection_flag = 0; connection_flag = 0;
if (!lws_get_context(wsi)) if (!lws_get_context(wsi))
...@@ -359,9 +359,6 @@ int TJAlienConnectionManager::ws_service_callback(struct lws *wsi, enum lws_call ...@@ -359,9 +359,6 @@ int TJAlienConnectionManager::ws_service_callback(struct lws *wsi, enum lws_call
{ {
if (gDebug > 1) printf("[Websocket Callback] LOAD_EXTRA_CLIENT_VERIFY_CERTS is called\n"); if (gDebug > 1) printf("[Websocket Callback] LOAD_EXTRA_CLIENT_VERIFY_CERTS is called\n");
//SSL_CTX_set_verify((SSL_CTX*)user, SSL_VERIFY_PEER, nullptr);
//SSL_set_verify(wsi->ssl, SSL_VERIFY_PEER, nullptr);
std::string location = "/etc/grid-security/certificates/"; std::string location = "/etc/grid-security/certificates/";
struct stat info; struct stat info;
if (stat("/cvmfs/alice.cern.ch", &info) == 0) if (stat("/cvmfs/alice.cern.ch", &info) == 0)
...@@ -390,9 +387,6 @@ int TJAlienConnectionManager::ws_service_callback(struct lws *wsi, enum lws_call ...@@ -390,9 +387,6 @@ int TJAlienConnectionManager::ws_service_callback(struct lws *wsi, enum lws_call
return 1; return 1;
} }
//int preverify_ok;
//SSL *ssl = X509_STORE_CTX_get_ex_data((SSL_CTX*)user, SSL_get_ex_data_X509_STORE_CTX_idx());
//user->protocols[0].callback(wsi, LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION, (SSL_CTX*)user, ssl, preverify_ok);
break; break;
} }
...@@ -401,9 +395,6 @@ int TJAlienConnectionManager::ws_service_callback(struct lws *wsi, enum lws_call ...@@ -401,9 +395,6 @@ int TJAlienConnectionManager::ws_service_callback(struct lws *wsi, enum lws_call
if (gDebug > 1) if (gDebug > 1)
{ {
printf("[Websocket Callback] LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION is called\n"); printf("[Websocket Callback] LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION is called\n");
//printf("[Websocket Callback] x509_ctx: %s\n", ((SSL_CTX*)user)->client_CA);
//printf("[Websocket Callback] ssl: %s\n", in);
//printf("[Websocket Callback] SSL_CTX_get_client_CA_list: %s\n", SSL_CTX_get_client_CA_list((SSL_CTX*)user));
} }
break; break;
...@@ -500,12 +491,10 @@ TJAlienResult *TJAlienConnectionManager::RunJsonCommand(TString *command, TList ...@@ -500,12 +491,10 @@ TJAlienResult *TJAlienConnectionManager::RunJsonCommand(TString *command, TList
lws_service(context, 500); lws_service(context, 500);
// If connection is broken or closed by server, try to reconnect // If connection is broken or closed by server, try to reconnect
if (destroy_flag) // The connection is broken or closed by server, notify TJAlien it has to reconnect
{ if (destroy_flag) {
Error("TJAlienConnectionManager", "Connection is broken, retrying..."); Error("TJAlienConnectionManager", "Connection is broken");
CreateConnection(); return nullptr;
sleep(1);
return RunJsonCommand(command, opt);
} }
receive_flag = 0; receive_flag = 0;
...@@ -590,3 +579,10 @@ TJAlienResult* TJAlienConnectionManager::GetCommandResult(json_object *json_resp ...@@ -590,3 +579,10 @@ TJAlienResult* TJAlienConnectionManager::GetCommandResult(json_object *json_resp
} }
return gridResult; return gridResult;
} }
//______________________________________________________________________________
Bool_t TJAlienConnectionManager::IsConnected() const {
// Poll the connection to get it's status
lws_service(context,0);
return connection_flag;
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment