// @(#)root/net:$Id$ // Author: Volodymyr Yurchenko 27/06/2019 #include "TJAlienConnectionManager.h" #include ClassImp(TJAlienConnectionManager) int TJAlienConnectionManager::destroy_flag = 0; int TJAlienConnectionManager::connection_flag = 0; int TJAlienConnectionManager::writeable_flag = 0; int TJAlienConnectionManager::receive_flag = 0; std::string TJAlienConnectionManager::readBuffer = ""; TJAlienConnectionManager::TJAlienConnectionManager() { creds.loadCredentials(); } TJAlienConnectionManager::~TJAlienConnectionManager() { if (context) lws_context_destroy(context); } //______________________________________________________________________________ int TJAlienConnectionManager::CreateConnection() { TJAlienCredentialsObject co; TJAlienDNSResolver dns_jcentral(default_server, default_WSport); std::string current_host; clearFlags(); while (creds.count() > 0) { if (creds.has(cJOB_TOKEN)) { co = creds.get(cJOB_TOKEN); } else if (creds.has(cJBOX_TOKEN)) { co = creds.get(cJBOX_TOKEN); } else if (creds.has(cFULL_GRID_CERT)) { co = creds.get(cFULL_GRID_CERT); if (co.password.empty()) co.readPassword(); } else { Error("TJAlienConnectionManager", "Failed to get any credentials"); return -1; } if (co.kind == cJBOX_TOKEN || co.kind == cJOB_TOKEN) { ConnectJBox(co); } if (connection_flag) { Info("TJAlienConnectionManager", "Successfully connected to JBox"); co.password = ""; fWSHost = "localhost"; return 0; } std::cout << "\r" << "Opening connection to JCentral. Please wait" << std::flush; for (int i = 0; i < dns_jcentral.lenght(); i++) { std::cout << "." << std::flush; current_host = dns_jcentral.get_next_host(); ConnectJCentral(co, current_host); if (connection_flag) { std::cout << "\r" << std::flush; Info("TJAlienConnectionManager", "Successfully connected to %s", current_host.c_str()); co.password = ""; fWSHost = default_server; if (co.kind == cFULL_GRID_CERT) return 1; else return 2; } else { if (gDebug > 0) { std::cout << "\r" << std::flush; Error("TJAlienConnectionManager", "Failed to connect to %s - retrying...", current_host.c_str()); } sleep(1); } } creds.removeCredentials(co.kind); } std::cout << "\r" << std::flush; Error("TJAlienConnectionManager", "Failed to connect to any server! Giving up"); return -1; } void TJAlienConnectionManager::clearFlags() { destroy_flag = 0; connection_flag = 0; writeable_flag = 0; receive_flag = 0; readBuffer = ""; } //______________________________________________________________________________ void TJAlienConnectionManager::ConnectJBox(TJAlienCredentialsObject c) { TJClientFile jcf; if (jcf.isValid) { MakeWebsocketConnection(c, (std::string)jcf.fHost, jcf.fWSPort); } else { if (gDebug >= 1) Info("TJAlienConnectionManager", "The JClient file is not valid - not connecting to JBox!"); } } void TJAlienConnectionManager::ConnectJCentral(TJAlienCredentialsObject c, string host) { if (gDebug > 1) Info("TJAlienConnectionManager", "Trying to connect to server %s", host.c_str()); MakeWebsocketConnection(c, host, default_WSport); } //______________________________________________________________________________ void TJAlienConnectionManager::MakeWebsocketConnection(TJAlienCredentialsObject creds, string host, int WSPort) { // Create the connection to JBox using the parameters read from the token // returns true if the connection was established if (gDebug > 0) { Info("TJAlienConnectionManager", "Connecting to Server %s:%d", host.c_str(), WSPort); Info("TJAlienConnectionManager", "Using cert %s and %s", creds.certpath.c_str(), creds.keypath.c_str()); } // Use this for debugging //lws_set_log_level(LLL_EXT | LLL_USER | LLL_PARSER | LLL_INFO | LLL_ERR | LLL_NOTICE, nullptr); lws_set_log_level(gDebug, nullptr); // Reset context variables context = nullptr; wsi = nullptr; clearFlags(); // libwebsockets variables struct lws_client_connect_info connect_info; struct lws_context_creation_info creation_info; // Info to create logical connection memset(&connect_info, 0, sizeof connect_info ); memset(&creation_info, 0, sizeof creation_info); // SSL options int use_ssl = LCCSCF_USE_SSL | LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK; // SSL, no selfsigned, don't check server hostname // Define protocol static const struct lws_protocols protocols[] = { { "jalien-protocol", ws_service_callback, 0, 0, 1, nullptr }, { nullptr, nullptr, 0, 0, 0, nullptr } /* end */ }; // Create the websockets context. This tracks open connections and // knows how to route any traffic and which protocol version to use, // and if each connection is client or server side. creation_info.port = CONTEXT_PORT_NO_LISTEN; // NO_LISTEN - we are client creation_info.iface = nullptr; creation_info.protocols = protocols; creation_info.extensions = nullptr; creation_info.gid = -1; creation_info.uid = -1; creation_info.options = 0; creation_info.vhost_name = "tjalien-root"; creation_info.options |= LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT; creation_info.ws_ping_pong_interval = 10; creation_info.timeout_secs = 5; // TODO (nhardi): switch to explicit key/cert contents // see the context_creation_info creation_info.client_ssl_cert_filepath = creds.certpath.c_str(); creation_info.client_ssl_private_key_filepath = creds.keypath.c_str(); //creation_info.client_ssl_cert_mem = creds.getCertificate().c_str(); //creation_info.client_ssl_cert_mem_len = creds.getCertificate().length(); // TODO (yuw): switch to client_ssl_private_key_password starting from libwebsockets 3.1.0 and onward creation_info.ssl_private_key_password = creds.password.c_str(); // Create context - only logical connection, no real connection yet context = lws_create_context(&creation_info); if (context == nullptr) { Error("TJAlienConnectionManager", "Context creation failure"); destroy_flag = 1; return; } if (gDebug > 1) { Info("TJAlienConnectionManager", "context created"); } connect_info.address = host.c_str(); connect_info.port = WSPort; connect_info.path = "/websocket/json"; connect_info.context = context; connect_info.ssl_connection = use_ssl; connect_info.host = host.c_str(); connect_info.origin = host.c_str(); connect_info.ietf_version_or_minus_one = -1; connect_info.protocol = protocols[0].name; connect_info.pwsi = &wsi; // Create wsi - WebSocket Instance lws_client_connect_via_info(&connect_info); if (wsi == nullptr) { if(gDebug > 0) { Error("TJAlienConnectionManager", "WebSocket instance creation error"); } return; } if (gDebug > 1) Info("TJAlienConnectionManager", "WebSocket instance creation successfull"); // Wait for server responce "connection established" while (!connection_flag) { lws_service(context, 500); if (destroy_flag) { if (gDebug > 1) Error("TJAlienConnectionManager", "Websocket connection failure"); return; } } creation_info.ssl_private_key_password = ""; fWSPort = WSPort; return; } //_____________________________________________________________________________ size_t TJAlienConnectionManager::WriteCallback(void *contents, size_t size, size_t nmemb) { size_t realsize = size * nmemb; readBuffer.append((const char*)contents, realsize); return realsize; } //_____________________________________________________________________________ int TJAlienConnectionManager::websocket_write_back(struct lws *wsi_in, const char *str, int str_size_in) { if (str == nullptr || wsi_in == nullptr) return -1; int n; int len; char *out = nullptr; if (str_size_in < 1) len = strlen(str); else len = str_size_in; out = (char *)malloc(sizeof(char)*(LWS_SEND_BUFFER_PRE_PADDING + len + LWS_SEND_BUFFER_POST_PADDING)); // setup the buffer memcpy (out + LWS_SEND_BUFFER_PRE_PADDING, str, len ); // write out n = lws_write(wsi_in, (unsigned char*)out + LWS_SEND_BUFFER_PRE_PADDING, len, LWS_WRITE_TEXT); // free the buffer free(out); return n; } //_____________________________________________________________________________ int TJAlienConnectionManager::ws_service_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { // Websocket callback handler UNUSED(len); switch (reason) { case LWS_CALLBACK_CLIENT_ESTABLISHED: { if (gDebug > 1) printf("[Websocket Callback] Connect with server success\n"); connection_flag = 1; break; } case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: { if (gDebug > 1) printf("[Websocket Callback] Connect with server error\n"); destroy_flag = 1; connection_flag = 0; if (!lws_get_context(wsi)) lws_context_destroy(lws_get_context(wsi)); wsi = nullptr; break; } case LWS_CALLBACK_CLOSED: { if (gDebug > 1) printf("[Websocket Callback] LWS_CALLBACK_CLOSED\n"); destroy_flag = 1; connection_flag = 0; if (!lws_get_context(wsi)) lws_context_destroy(lws_get_context(wsi)); wsi = nullptr; break; } case LWS_CALLBACK_CLIENT_RECEIVE: { if (gDebug > 100) { printf("[Websocket Callback] Client received:%s\n", (char *)in); printf("[Websocket Callback]: %4d (rpp %5d, last %d)\n", (int)len, (int)lws_remaining_packet_payload(wsi), lws_is_final_fragment(wsi)); } readBuffer.append((char*)in); if (lws_is_final_fragment(wsi) != 0) receive_flag = 1; len = 0; break; } case LWS_CALLBACK_CLIENT_WRITEABLE: { if (gDebug > 1) printf("[Websocket Callback] On writeable is called\n"); writeable_flag = 1; break; } #if defined(LWS_OPENSSL_SUPPORT) case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS: { if (gDebug > 1) printf("[Websocket Callback] LOAD_EXTRA_CLIENT_VERIFY_CERTS is called\n"); //SSL_CTX_set_verify((SSL_CTX*)user, SSL_VERIFY_PEER, nullptr); //SSL_set_verify(wsi->ssl, SSL_VERIFY_PEER, nullptr); std::string location = "/etc/grid-security/certificates/"; struct stat info; if (stat("/cvmfs/alice.cern.ch", &info) == 0) location = "/cvmfs/alice.cern.ch" + location; std::string capath = std::getenv("X509_CERT_DIR") ? : location; size_t pos = 0; std::string token; // If capath contans two paths separated by ":" while ((pos = capath.find(":")) != std::string::npos) { token = capath.substr(0, pos); if (!SSL_CTX_load_verify_locations((SSL_CTX*)user, nullptr, token.c_str())) { if (gDebug > 1) printf("[Websocket Callback] LOAD_EXTRA_CLIENT_VERIFY_CERTS failed\n"); return 1; } capath.erase(0, pos + 1); } // If capath is a single path if (capath.length() != 0) if (!SSL_CTX_load_verify_locations((SSL_CTX*)user, nullptr, capath.c_str())) { if (gDebug > 1) printf("[Websocket Callback] LOAD_EXTRA_CLIENT_VERIFY_CERTS failed\n"); return 1; } //int preverify_ok; //SSL *ssl = X509_STORE_CTX_get_ex_data((SSL_CTX*)user, SSL_get_ex_data_X509_STORE_CTX_idx()); //user->protocols[0].callback(wsi, LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION, (SSL_CTX*)user, ssl, preverify_ok); break; } case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION: { if (gDebug > 1) { printf("[Websocket Callback] LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION is called\n"); //printf("[Websocket Callback] x509_ctx: %s\n", ((SSL_CTX*)user)->client_CA); //printf("[Websocket Callback] ssl: %s\n", in); //printf("[Websocket Callback] SSL_CTX_get_client_CA_list: %s\n", SSL_CTX_get_client_CA_list((SSL_CTX*)user)); } break; } #endif default: break; } return 0; } //______________________________________________________________________________ void TJAlienConnectionManager::ForceRestart() { // Immediately break previous connection and start a new one with user grid vertificate destroy_flag = 1; connection_flag = 0; if (context) lws_context_destroy(context); ConnectJCentral(creds.get(cFULL_GRID_CERT)); if (!IsConnected()) { Info("TJAlienConnectionManager", "Failed to establish the connection to the server"); return; } } //______________________________________________________________________________ void TJAlienConnectionManager::GetHostAndPort(TString &fHost, Int_t &fPort) { fHost = fWSHost; fPort = fWSPort; }; //______________________________________________________________________________ json_object *TJAlienConnectionManager::CreateJsonCommand(TString *command, TList *opt) { // Create the command in Json format. if (command == nullptr || command->Length() == 0) { Error("TJAlienConnectionManager::CreateJsonCommand", "Received empty command"); return nullptr; } json_object *jobj = json_object_new_object(); json_object *jstringcmd = json_object_new_string(command->Data()); json_object_object_add(jobj, "command", jstringcmd); if (opt != nullptr && opt->GetEntries() != 0) { json_object *jarray = json_object_new_array(); for(int i = 0; i < opt->GetSize(); i++) { TObjString *ovalue = (TObjString*) opt->At(i); TString svalue = ovalue->GetString(); json_object *jstring1 = json_object_new_string(svalue); json_object_array_add(jarray, jstring1); } json_object_object_add(jobj, "options", jarray); } return jobj; } //______________________________________________________________________________ TJAlienResult *TJAlienConnectionManager::RunJsonCommand(TString *command, TList *opt) { json_object *jsonCommand = CreateJsonCommand(command, opt); if (jsonCommand == nullptr) { Error("TJAlienConnectionManager", "JSON command build failed: %s", command->Data()); return nullptr; } if (gDebug > 1) Info("TJAlienConnectionManager", "JSON command to be done: %s", json_object_to_json_string(jsonCommand) ); readBuffer = ""; if (!connection_flag) { Error("TJAlienConnectionManager", "Connection is broken!"); return nullptr; } websocket_write_back(wsi, json_object_to_json_string(jsonCommand), -1); lws_callback_on_writable(wsi); while (!receive_flag && !destroy_flag) lws_service(context, 500); // If connection is broken or closed by server, try to reconnect if (destroy_flag) { Error("TJAlienConnectionManager", "Connection is broken, retrying..."); CreateConnection(); sleep(1); return RunJsonCommand(command, opt); } receive_flag = 0; json_object_put(jsonCommand); json_object * jobj_res = json_tokener_parse(readBuffer.c_str()); // get response TJAlienResult *GridResult = this->GetCommandResult(jobj_res, *command == "find"); return GridResult; } //______________________________________________________________________________ TJAlienResult* TJAlienConnectionManager::GetCommandResult(json_object *json_response, bool expand_find) { // JSON parser of the JBox result // - parsing the meta information if (json_response == nullptr) { Error("GetCommandResult", "Result JSON is nullptr"); return nullptr; } TJAlienResult *gridResult = new TJAlienResult(); // get data from metadata json_object *js_metadata; json_object_object_get_ex(json_response, "metadata", &js_metadata); if (js_metadata != nullptr && json_object_is_type(js_metadata, json_type_object)) { json_object_object_foreach(js_metadata, key, val) { gridResult->SetMetaData(new TObjString(key), new TObjString(json_object_get_string(val))); } } // get data from results json_object *js_results; json_object_object_get_ex(json_response, "results", &js_results); if (js_results != nullptr && json_object_is_type(js_results, json_type_array)) { int arraylen = json_object_array_length(js_results); int i; for (i = 0; i< arraylen; i++) { json_object *jvalue = json_object_array_get_idx(js_results, i); TMap *t = new TMap(); json_object_object_foreach(jvalue, key, val) { TString sValue = json_object_get_string(val); TString sKey = key; t->Add(new TObjString(key), new TObjString(sValue)); if(expand_find && sKey == "lfn") { t->Add(new TObjString("turl"), new TObjString("alien://" + sValue)); } } gridResult->Add(t); } } return gridResult; }