// @(#)root/net:$Id$ // Author: Volodymyr Yurchenko 01/12/2016 /************************************************************************* * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers. * * All rights reserved. * * * * For the licensing terms see $ROOTSYS/LICENSE. * * For the list of contributors see $ROOTSYS/README/CREDITS. * *************************************************************************/ #include "TObjArray.h" #include "TString.h" #include "TJAlien.h" #include "TJAlienJDL.h" #include "TJAlienCollection.h" #include "TJAlienFile.h" #include "TJAlienJob.h" #include "TJAlienJobStatus.h" #include "TJAlienJobStatusList.h" #include "TJAlienResultRewriter.h" #include "TJAlienDNSResolver.h" #include ClassImp(TJAlien) int TJAlien::destroy_flag = 0; int TJAlien::connection_flag = 0; int TJAlien::writeable_flag = 0; int TJAlien::receive_flag = 0; std::string TJAlien::readBuffer = ""; using std::string; //______________________________________________________________________________ TJAlien::TJAlien (const char* gridUrl, const char* uId, const char* passwd, const char* options) { if (gDebug > 1) Info("TJAlien", "Connecting to JBox"); UNUSED(gridUrl); UNUSED(uId); UNUSED(passwd); UNUSED(options); fGrid = "alien"; gGrid = this; if (getenv("HOME") != NULL) homedir = getenv("HOME"); else homedir = "~"; if (getenv("TMPDIR") != NULL) tmpdir = getenv("TMPDIR"); else if (getenv("TMP") != NULL) tmpdir = getenv("TMP"); else if (getenv("TEMP") != NULL) tmpdir = getenv("TEMP"); else tmpdir = P_tmpdir; CreateConnection(); } //______________________________________________________________________________ TJAlien::~TJAlien() { if (gDebug > 1) Info("TJAlien", "Destructor called"); if (!context) lws_context_destroy(context); } //______________________________________________________________________________ void TJAlien::CreateConnection() { TJAlienCredentialsObject co; TJAlienDNSResolver dns_jcentral(default_server, default_WSport); string current_host; clearFlags(); creds.loadCredentials(); if(creds.has(cJOB_TOKEN)) { co = creds.get(cJOB_TOKEN); } else if(creds.has(cJBOX_TOKEN)) { co = creds.get(cJBOX_TOKEN); } else if (creds.has(cFULL_GRID_CERT)) { co = creds.get(cFULL_GRID_CERT); } else { return; } if(co.kind == cJBOX_TOKEN || co.kind == cJOB_TOKEN) { ConnectJBox(); } if(connection_flag) return; for (int i = 0; i < dns_jcentral.lenght(); i++) { current_host = dns_jcentral.get_next_host(); ConnectJCentral(co, current_host); if (connection_flag) { // If connected directly to JCentral, immediately ask for token if(gDebug > 0) { Info("TJAlien", "Successfully connected to %s", current_host.c_str()); } Token("", false); fUser = Whoami(); return; } else { Error("TJAlien", "Failed to connect to %s - retrying...", current_host.c_str()); sleep(1); } } Error("TJAlien", "Failed to connect to any server! Giving up"); } void TJAlien::clearFlags() { destroy_flag = 0; connection_flag = 0; writeable_flag = 0; receive_flag = 0; readBuffer = ""; } //______________________________________________________________________________ void TJAlien::ConnectJBox() { if(!creds.has(cJBOX_TOKEN)) { return; } TJAlienCredentialsObject c = creds.get(cJBOX_TOKEN); TJClientFile jcf; if(jcf.isValid) { MakeWebsocketConnection(c, (string)jcf.fHost, jcf.fWSPort); } else { if(gDebug >= 1) Info("TJAlien", "The JClient file is not valid - not connecting to JBox!"); } } void TJAlien::ConnectJCentral(TJAlienCredentialsObject c, string host) { if (gDebug > 1) Info("TJAlien", "Trying to connect to server %s", host.c_str()); MakeWebsocketConnection(c, host, default_WSport); } //______________________________________________________________________________ void TJAlien::MakeWebsocketConnection(TJAlienCredentialsObject creds, string host, int WSPort) { // Create the connection to JBox using the parameters read from the token // returns true if the connection was established if(gDebug > 0) { Info("TJAlien", "Connecting to Server %s:%d", host.c_str(), WSPort); Info("TJAlien", "Using cert %s and %s", creds.certpath.c_str(), creds.keypath.c_str()); } // Use this for debugging // lws_set_log_level(1023, NULL);//LLL_DEBUG | LLL_INFO | LLL_ERR | LLL_NOTICE, NULL); lws_set_log_level(gDebug, NULL); // Reset context variables context = NULL; wsi = NULL; clearFlags(); // libwebsockets variables struct lws_client_connect_info connect_info; memset(&connect_info, 0, sizeof connect_info ); memset(&creation_info, 0, sizeof creation_info); // SSL options int use_ssl = LCCSCF_USE_SSL | LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK; // SSL, no selfsigned, don't check server hostname // Define protocol static const struct lws_protocols protocols[] = { { "jalien-protocol", ws_service_callback, 0, 0, 1, NULL }, { NULL, NULL, 0, 0, 0, NULL } /* end */ }; // Create the websockets context. This tracks open connections and // knows how to route any traffic and which protocol version to use, // and if each connection is client or server side. creation_info.port = CONTEXT_PORT_NO_LISTEN; // NO_LISTEN - we are client creation_info.iface = NULL; creation_info.protocols = protocols; creation_info.extensions = NULL; creation_info.gid = -1; creation_info.uid = -1; creation_info.options = 0; creation_info.vhost_name = "tjalien-root"; // TODO (nhardi): switch to explicit key/cert contents // see the context_creation_info creation_info.client_ssl_cert_filepath = creds.certpath.c_str(); creation_info.client_ssl_private_key_filepath = creds.keypath.c_str(); // creation_info.client_ssl_cert_mem = creds.getCertificate().c_str(); // creation_info.client_ssl_cert_mem_len = creds.getCertificate().length(); if (use_ssl) { creation_info.options |= LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT; } // Create context - only logical connection, no real connection yet context = lws_create_context(&creation_info); if (context == NULL) { Error("TJAlien", "Context creation failure"); destroy_flag = 1; return; } if (gDebug > 1) { Info("TJAlien", "context created"); } connect_info.address = host.c_str(); connect_info.port = WSPort; connect_info.path = "/websocket/json"; connect_info.context = context; connect_info.ssl_connection = use_ssl; connect_info.host = host.c_str(); connect_info.origin = host.c_str(); connect_info.ietf_version_or_minus_one = -1; connect_info.protocol = protocols[0].name; connect_info.pwsi = &wsi; // Create wsi - WebSocket Instance lws_client_connect_via_info(&connect_info); if (wsi == NULL) { if(gDebug > 0) { Error("TJAlien", "WebSocket instance creation error"); } return; } if (gDebug > 1) Info("TJAlien", "WebSocket instance creation successfull"); // Wait for server responce "connection established" while (!connection_flag) { lws_service(context, 500); if (destroy_flag) { if (gDebug > 1) Error("TJAlien", "Websocket connection failure"); return; } } return; } //______________________________________________________________________________ json_object *TJAlien::CreateJsonCommand(TString *command, TList *opt) { // Create the command in Json format. if (command == NULL || command->Length() == 0) { Error("TJAlien::CreateJsonCommand", "Received empty command"); return NULL; } json_object *jobj = json_object_new_object(); json_object *jstringcmd = json_object_new_string(command->Data()); json_object_object_add(jobj, "command", jstringcmd); if (opt != NULL && opt->GetEntries() != 0) { json_object *jarray = json_object_new_array(); for(int i = 0; i < opt->GetSize(); i++) { TObjString *ovalue = (TObjString*) opt->At(i); TString svalue = ovalue->GetString(); json_object *jstring1 = json_object_new_string(svalue); json_object_array_add(jarray, jstring1); } json_object_object_add(jobj, "options", jarray); } return jobj; } //______________________________________________________________________________ TJAlienResult *TJAlien::RunJsonCommand(TString *command, TList *opt) { json_object *jsonCommand = CreateJsonCommand(command, opt); if (jsonCommand == NULL) { Error("TJAlien", "JSON command build failed: %s", command->Data()); return NULL; } if (gDebug > 1) Info("TJAlien", "JSON command to be done: %s", json_object_to_json_string(jsonCommand) ); TJAlien::readBuffer = ""; if (!connection_flag) { Error("RunJsonCommand", "Connection is broken!"); return NULL; } websocket_write_back(wsi, json_object_to_json_string(jsonCommand), -1); lws_callback_on_writable(wsi); while (!receive_flag && !destroy_flag) lws_service(context, 500); // If connection is broken or closed by server, try to reconnect if (destroy_flag) { Error("RunJsonCommand", "Connection is broken, retrying..."); CreateConnection(); sleep(1); return RunJsonCommand(command, opt); } receive_flag = 0; json_object_put(jsonCommand); json_object * jobj_res = json_tokener_parse(TJAlien::readBuffer.c_str()); // get response TJAlienResult *GridResult = this->GetCommandResult( jobj_res ); return GridResult; } //______________________________________________________________________________ TJAlienResult* TJAlien::GetCommandResult(json_object *json_response) { // JSON parser of the JBox result // - parsing the meta information if (json_response == NULL) { Error("GetCommandResult", "Result JSON is null"); return NULL; } TJAlienResult *gridResult = new TJAlienResult(); // get data from metadata json_object *js_metadata; json_object_object_get_ex(json_response, "metadata", &js_metadata); if (js_metadata != NULL && json_object_is_type(js_metadata, json_type_object)) { json_object_object_foreach(js_metadata, key, val) { gridResult->SetMetaData(new TObjString(key), new TObjString(json_object_get_string(val))); } } // get data from results json_object *js_results; json_object_object_get_ex(json_response, "results", &js_results); if (js_results != NULL && json_object_is_type(js_results, json_type_array)) { int arraylen = json_object_array_length(js_results); int i; for (i = 0; i< arraylen; i++) { json_object *jvalue = json_object_array_get_idx(js_results, i); TMap *t = new TMap(); json_object_object_foreach(jvalue, key, val) { t->Add(new TObjString(key), new TObjString(json_object_get_string(val))); } gridResult->Add(t); } } return gridResult; } //______________________________________________________________________________ void TJAlien::Stderr() { if (TJAlien::readBuffer != NULL) { json_object * jobj_res = json_tokener_parse(TJAlien::readBuffer.c_str()); // get metadata json_object *js_results; json_object_object_get_ex(jobj_res, "metadata", &js_results); if (js_results != NULL && json_object_is_type(js_results, json_type_object)) { TString error_stream = json_object_get_string(json_object_object_get(js_results, "error")); if (error_stream != NULL && error_stream.Length() != 0) printf("%s\n", error_stream.Data()); } } } //______________________________________________________________________________ void TJAlien::Stdout() { if (TJAlien::readBuffer != NULL) { json_object * jobj_res = json_tokener_parse(TJAlien::readBuffer.c_str()); //Info("Stdout", "%s", TJAlien::readBuffer.c_str()); // get data from results json_object *js_results; json_object_object_get_ex(jobj_res, "results", &js_results); if (js_results != NULL && json_object_is_type(js_results, json_type_array)) { int arraylen = json_object_array_length(js_results); int i; for (i = 0; i < arraylen; i++) { json_object *jvalue = json_object_array_get_idx(js_results, i); json_object_object_foreach(jvalue, key, val) { printf("%s\t", json_object_get_string(val)); UNUSED(key); } printf("\n"); } } } } //______________________________________________________________________________ unsigned int TJAlien::ReadTags(int column, std::map &tags) const { /** Fills the key-value pairs of a response from the server into the the associative array @tags. @column is the column of the response you are interested in. The return value is the number of tags found. */ UNUSED(column); if (TJAlien::readBuffer != NULL) { json_object * jobj_res = json_tokener_parse(TJAlien::readBuffer.c_str()); // get data from results json_object *js_results; json_object_object_get_ex(jobj_res, "results", &js_results); if (js_results != NULL && json_object_is_type(js_results, json_type_array)) { int arraylen = json_object_array_length(js_results); int i; for (i = 0; i < arraylen; i++) { Info("TJAlien", "==================="); json_object *jvalue = json_object_array_get_idx(js_results, i); json_object_object_foreach(jvalue, key, val) { tags[key] = json_object_get_string(val); } } } } return tags.size(); } //______________________________________________________________________________ TGridResult *TJAlien::Command(const char *command, bool interactive, UInt_t stream) { // TGrid Command method implementation // Command can be called directly by the user // Command format is one line, ex : ls -la // For internal usage follow RunJsonCommand method // Do not forget to delete the result after usage if (command == NULL || std::string(command).find_first_not_of(' ') == std::string::npos) { Error("Command", "Please, specify the command"); return NULL; } if (interactive) Info("Command", "Received full command =\"%s\"", command); UNUSED(stream); TString *sCmd = new TString(command); TObjArray *tokens = sCmd->Tokenize(" "); TJAlienResult *result; TJAlien::readBuffer = ""; if (tokens->GetEntries() == 1) { if (interactive) Info("Command", "Received only command"); result = (TJAlienResult*) RunJsonCommand(sCmd, 0); TJAlienResultRewriter().Rewrite((std::string)*sCmd, result); } else { TList *options = new TList(); TObjString *sCommand = (TObjString *) tokens->At(0); if (interactive) Info("Command", "Command = \"%s\"", sCommand->GetString().Data()); for(int i=1; iGetEntries(); i++) { TObjString *opt = (TObjString*) tokens->At(i); if (interactive) Info("Command", "Option = \"%s\"", opt->GetString().Data()); options->Add(opt); } TString *s = new TString(sCommand->GetString()); result = (TJAlienResult*) RunJsonCommand(s, options); TJAlienResultRewriter().Rewrite((std::string)sCommand->GetString(), result); if (interactive) { Stdout(); Stderr(); } delete s; delete options; } delete tokens; delete sCmd; if (result) { // Extract the username immediately after each command, // since it could change TObjString p("user"); TObjString *sUserMetadata = result->GetMetaData(&p); if (sUserMetadata) { fUser = sUserMetadata->GetString().Data(); } } return result; } //______________________________________________________________________________ void TJAlien::Token(Option_t* options, bool force_restart) { if (force_restart) { // Immediately break previous connection and start a new one with user grid vertificate destroy_flag = 1; connection_flag = 0; if (context) lws_context_destroy(context); std::string usercert = sUsercert.Data()[0] != '\0' ? sUsercert.Data() : homedir + "/.globus/usercert.pem"; std::string userkey = sUserkey.Data()[0] != '\0' ? sUserkey.Data() : homedir + "/.globus/userkey.pem"; std::string usercertpath = std::getenv("X509_USER_CERT") ? : usercert.c_str(); std::string userkeypath = std::getenv("X509_USER_KEY") ? : userkey.c_str(); // ConnectJBox(usercertpath, userkeypath); TJAlienCredentialsObject co(usercertpath, userkeypath); ConnectJCentral(co); if (!connection_flag) { Info("TJAlien", "TJAlien::Token failed to establish the connection to the server"); return; } } TString sCmd("token"); TString sOptions(options); if (sOptions.Length() > 0) sCmd += TString(" ") + sOptions; if (gDebug > 1) Info("Token", "Full command = \"%s\"", sCmd.Data()); TJAlienResult *result = (TJAlienResult*) Command(sCmd.Data()); if (result) { TObjString *errorMessage = 0; Int_t exitcode = GetExitCode(result, errorMessage); if (exitcode != 0) { Error("Token", "%s", errorMessage->GetString().Data()); delete result; } std::stringstream tokencert_s, tokenkey_s, tokenlock_s; tokencert_s << tmpdir << "/tokencert_" << getuid() << ".pem"; tokenkey_s << tmpdir << "/tokenkey_" << getuid() << ".pem"; tokenlock_s << tmpdir << "/jalien_token_" << getuid() << ".lock"; std::string tokencert = tokencert_s.str(); std::string tokenkey = tokenkey_s.str(); std::string tokenlock = tokenlock_s.str(); std::string tokencertpath = std::getenv("JALIEN_TOKEN_CERT") ? : tokencert; std::string tokenkeypath = std::getenv("JALIEN_TOKEN_KEY") ? : tokenkey; { // Create a lock file to block other TJAlien-ROOT instances from writing to tokencert file // If a lock exists that is older than 300 seconds, the file is removed and created again. TLockFile lock(tokenlock.c_str(), 300); FILE *tokencertfile = NULL; FILE *tokenkeyfile = NULL; // First modify permissions if files already exist if ((tokencertfile = fopen(tokencertpath.c_str(), "r")) && (tokenkeyfile = fopen(tokenkeypath.c_str(), "r")) ) { // TODO: add a validity check here if (system(("chmod 755 " + tokencertpath).c_str())) Error("Token", "Error while accessing token files"); if (system(("chmod 755 " + tokenkeypath).c_str())) Error("Token", "Error while accessing token files"); fclose(tokencertfile); fclose(tokenkeyfile); } // Write files and restrict permissions back if ((tokencertfile = fopen(tokencertpath.c_str(), "w")) && (tokenkeyfile = fopen(tokenkeypath.c_str(), "w")) ) { fprintf(tokencertfile, "%s", result->GetKey(0, "tokencert")); fprintf(tokenkeyfile, "%s", result->GetKey(0, "tokenkey")); if (system(("chmod 440 " + tokencertpath).c_str())) Error("Token", "Error while accessing token files"); if (system(("chmod 400 " + tokenkeypath).c_str())) Error("Token", "Error while accessing token files"); fclose(tokencertfile); fclose(tokenkeyfile); } } } else { Error("Token", "RequestTokenCert: error while running command, no return result"); } return; } //______________________________________________________________________________ TGridResult *TJAlien::Ls(const char* lfn, Option_t* options, Bool_t verbose) { if (verbose) Info("Ls", "Ls command received with lfn = \"%s\" and options = \"%s\"", lfn, options); TString cmdline("ls"); TString sLfn(lfn); TString sOptions(options); if (sLfn.Length() > 0 && std::string(lfn).find_first_not_of(' ') != std::string::npos) cmdline += TString(" ") + sLfn; if (sOptions.Length() > 0) cmdline += TString(" ") + sOptions; if (gDebug > 1) Info("Ls", "Full command = \"%s\"", cmdline.Data()); TJAlienResult *result = (TJAlienResult*) Command(cmdline.Data()); if (verbose) { Stdout(); Stderr(); } if (result) { TObjString *errorMessage = 0; Int_t exitcode = GetExitCode(result, errorMessage); if (exitcode != 0) { if (gDebug > 1) Error("Ls", "%s", errorMessage->GetString().Data()); delete result; return NULL; } } else { if (gDebug > 1) Error("Ls", "Ls: error while running command, no return result"); return NULL; } if (gDebug > 1) Info("Ls", "Ls command successful"); return result; } //______________________________________________________________________________ Bool_t TJAlien::Cd(const char* lfn, Bool_t verbose) { if (verbose) Info("Cd", "\"Cd\" command with argument = \"%s\"", lfn); TString cmdline("cd"); TString sLfn(lfn); if (sLfn == NULL || sLfn.Length() == 0 || std::string(lfn).find_first_not_of(' ') == std::string::npos) sLfn = GetHomeDirectory(); cmdline += TString(" ") + sLfn; TJAlienResult *result = (TJAlienResult*) Command(cmdline.Data(), kFALSE); if (verbose) { Stdout(); Stderr(); } if (result) { TObjString *errorMessage = new TObjString(); Int_t exitcode = GetExitCode(result, errorMessage); if (exitcode != 0) { if (gDebug > 1) Error("Cd", "%s", errorMessage->GetString().Data()); delete result; return kFALSE; } else { if (gDebug > 1) Info("Cd", "Cd command successful, changed to \"%s\"", sLfn.Data()); delete result; return kTRUE; } } else { Error("Cd", "Cd: error while running command, no return result"); return kFALSE; } } //______________________________________________________________________________ const char *TJAlien::Pwd(Bool_t verbose) { Info("Pwd", "Pwd command received"); TString cmdline = TString("pwd"); TJAlienResult* result = (TJAlienResult*) Command(cmdline.Data(), kFALSE, kENVIR); if (verbose) { Stdout(); Stderr(); } if (result) { TObjString *errorMessage = 0; Int_t exitcode = GetExitCode(result, errorMessage); if (exitcode != 0) { Error("Pwd", "Pwd command failed with error message \"%s\"", errorMessage->GetString().Data()); delete result; return NULL; } if (result) { TMap* resultmap = ((TMap*)result->At(0)); if (resultmap) { TObjString* pwd = (TObjString*)resultmap->GetValue("pwd"); if (pwd) { fPwd = pwd->GetName(); Info("Pwd", "Pwd, setting pwd to \"%s\"", pwd->GetString().Data()); delete resultmap; return fPwd; } else { delete resultmap; return 0; } } } delete result; } else { Error("Pwd", "Pwd: error while running command, no return result"); } return NULL; } //______________________________________________________________________________ Int_t TJAlien::Mkdir(const char* ldn, Option_t* option, Bool_t verbose) { // returns exitcode, what means: // 0 - if success // not 0 with error message - if sth went wrong if (ldn == NULL || std::string(ldn).find_first_not_of(' ') == std::string::npos) { Error("Mkdir", "Command requires an argument"); return -1; } TString cmdline("mkdir"); TString sOption(option); if (sOption.Length() > 0) cmdline += TString(" ") + sOption; cmdline += (TString(" ") + TString(ldn)); TGridResult *result = Command(cmdline, kFALSE); if (verbose) { Stdout(); Stderr(); } if (result) { TJAlienResult * jalienresult = (TJAlienResult*) result; TObjString *errorMessage = 0; Int_t exitcode = GetExitCode(jalienresult, errorMessage); if (exitcode != 0) { if (gDebug > 1) Error("Mkdir", "Mkdir command failed with error message \"%s\"", errorMessage->GetString().Data()); delete result; return exitcode; } if (gDebug > 1) Info("Mkdir", "Mkdir command successful, created directory \"%s\"", ldn); delete result; return exitcode; } Error("Mkdir", "Cannot create directory %s", ldn); if (!verbose) Stdout(); return 1; } //______________________________________________________________________________ Bool_t TJAlien::Rmdir(const char* ldn, Option_t* options, Bool_t verbose) { if (ldn == NULL || std::string(ldn).find_first_not_of(' ') == std::string::npos) { Error("Rmdir", "Command requires an argument"); return -1; } TString cmdline = TString("rmdir "); if (strlen(options)) { cmdline += TString(options); } else { cmdline += TString(ldn); } TGridResult *result = Command(cmdline, kFALSE); if (verbose) { Stdout(); Stderr(); } if (result) { TJAlienResult *jalienresult = (TJAlienResult *) result; TObjString *errorMessage = 0; Int_t exitcode = GetExitCode(jalienresult, errorMessage); if (exitcode != 0) { if (gDebug > 1) Error("Rmdir", "Rmdir command failed with error message \"%s\"", errorMessage->GetString().Data()); delete result; return kFALSE; } else { if (gDebug > 1) Info("Rmdir", "Rmdir command successful, removed directory \"%s\"", ldn); delete result; return kTRUE; } } if (gDebug > 1) Error("Rmdir","Cannot remove directory %s",ldn); return kFALSE; } //______________________________________________________________________________ Bool_t TJAlien::Register(const char* lfn, const char* turl, Long_t size, const char* se, const char* guid, Bool_t verbose) { UNUSED(lfn); UNUSED(turl); UNUSED(size); UNUSED(se); UNUSED(guid); UNUSED(verbose); (dynamic_cast (gGrid))->NotImplemented(__func__, __FILE__, __LINE__); return kFALSE; /* if (lfn == NULL || std::string(lfn).find_first_not_of(' ') == std::string::npos) { Error("Register", "Command requires an argument"); return -1; } TString cmdline = TString("register ") + TString(lfn) + TString(" ") + TString(turl); if (se) { cmdline += (TString(" ") + size + TString(" ") + TString(se)); if (guid) { cmdline += (TString(" ") + TString(guid)); } } TGridResult *result = Command(cmdline, kFALSE); if (verbose) { Stdout(); Stderr(); } if (result) { TJAlienResult *jalienresult = (TJAlienResult *) result; TObjString *errorMessage = 0; Int_t exitcode = GetExitCode(jalienresult, errorMessage); if (exitcode != 0) { if (gDebug > 1) Error("Register", "Register command failed with error message \"%s\"", errorMessage->GetString().Data()); delete result; return kFALSE; } else { if (gDebug > 1) Info("Register", "Register command successful, registered file \"%s\"", lfn); delete result; return kTRUE; } } if (gDebug > 1) Error("Register", "Unable to register file %s", lfn); return kFALSE;*/ } //______________________________________________________________________________ TGridResult* TJAlien::ListPackages(const char* alienpackagedir) { if (!alienpackagedir) { alienpackagedir = "/alice/packages"; } TGridResult* gr = (TGridResult*) new TJAlienResult(); TGridResult* result = Ls(alienpackagedir); if (result) { Int_t i = 0; while (result->GetFileName(i)) { TString pname=result->GetFileName(i); TGridResult* version = Ls(Form("%s/%s",alienpackagedir,pname.Data())); if (version) { Int_t j=0; while (version->GetFileName(j)) { TString pversion=version->GetFileName(j); if (!pversion.Contains("post_")) { TGridResult* platform = Ls(Form("%s/%s/%s", alienpackagedir, pname.Data(), pversion.Data())); if (platform) { Int_t k = 0; TString allplatform = ""; while (platform->GetFileName(k)) { TString pplatform = platform->GetFileName(k); allplatform += pplatform; allplatform += " "; TMap* grmap = new TMap(); grmap->Add((TObject*) new TObjString("name"), (TObject*) new TObjString(pplatform.Data())); grmap->Add((TObject*) new TObjString("path"), new TObjString( Form ( "%s/%s/%s/%s" , alienpackagedir, pname.Data(), pversion.Data(), pplatform.Data()))); gr->Add(grmap); k++; } Info("ListPackages","Package: %-16s Version: %-20s Platform: [ %s ]", pname.Data(), pversion.Data(), allplatform.Data()); delete platform; } } j++; } delete version; } i++; } delete result; } return gr; } //______________________________________________________________________________ Bool_t TJAlien::Rm(const char* lfn, Option_t* options, Bool_t verbose) { if (lfn == NULL || std::string(lfn).find_first_not_of(' ') == std::string::npos) { Error("Rm", "Command requires an argument"); return kFALSE; } TString cmdline; TString sOption(options); if (sOption.Length() > 0) cmdline = TString("rm ") + sOption + TString(" ") + TString(lfn); else cmdline = TString("rm ") + TString(lfn); TGridResult *result = Command(cmdline, kFALSE); if (verbose) { Stdout(); Stderr(); } if (result) { TJAlienResult *jalienresult = (TJAlienResult *) result; TObjString *errorMessage = 0; Int_t exitcode = GetExitCode(jalienresult, errorMessage); if (exitcode != 0) { if (gDebug > 1) Error("Rm", "Rm command failed with error message \"%s\"", errorMessage->GetString().Data()); delete result; return kFALSE; } else { if (gDebug > 1) Info("Rm", "Rm command successful"); delete result; return kTRUE; } } if (gDebug > 1) Error("Rm", "Cannot remove %s", lfn); return kFALSE; } //______________________________________________________________________________ TJAlien::CatalogType TJAlien::Type(const char* lfn, Option_t* option, Bool_t verbose) { // returns the type of the given lfn if (lfn == NULL || std::string(lfn).find_first_not_of(' ') == std::string::npos) { Error("Type", "Command requires an argument"); return kFailed; } TString cmdline; TString sOption(option); if (sOption.Length() > 0) { cmdline = TString("type ") + sOption + TString(" ") + TString(lfn); } else { cmdline = TString("type ") + TString(lfn); } TJAlienResult *result = (TJAlienResult*) Command(cmdline, kFALSE); if (verbose) { Stdout(); Stderr(); } if (!result) { Error("Type", "Did not receive TGridResult from query %s", cmdline.Data()); return kFailed; } //check exitcode, if command was executed successfully TObjString *errorMessage = 0; Int_t exitcode = GetExitCode(result, errorMessage); if (exitcode != 0) { Error("Type", "Type command failed with error message \"%s\"", errorMessage->GetString().Data()); delete result; return kFailed; } const char* typeStr = result->GetKey(0, "type"); if (!typeStr || strlen(typeStr) == 0) { Error("Type", "Could not get type of %s", lfn); delete result; return kFailed; } TJAlien::CatalogType type = kFailed; if (strcmp(typeStr, "file") == 0) { type = kFile; } else if (strcmp(typeStr, "directory") == 0) { type = kDirectory; } else if (strcmp(typeStr, "collection") == 0) { type = kCollection; } else Error("Type", "Unknown type %s", typeStr); delete result; return type; } //______________________________________________________________________________ TGridJob *TJAlien::Submit(const char *jdl) { // Submit a command to JAliEn. Returns 0 in case of error. if (!jdl) return 0; TString command = TString("submit "); command += jdl; if (gDebug > 1) Info("TJAlien", "command: %s", command.Data()); TGridResult* result = Command(command, kFALSE, kOUTPUT); TJAlienResult* jalienResult = dynamic_cast (result); TList* list = dynamic_cast (jalienResult); if (!list) { if (result) delete result; return 0; } //check exitcode, if command was executed successfully if (jalienResult) { TObjString *errorMessage = 0; Int_t exitcode = GetExitCode(jalienResult, errorMessage); if (exitcode != 0) { Error("Submit", "Submit command failed with error message \"%s\"", errorMessage->GetString().Data()); delete result; return 0; } } jalienResult->DumpResult(); TString jobID = "0"; TIterator* iter = list->MakeIterator(); TObject* object = 0; while ((object = iter->Next()) != 0) { TMap* map = dynamic_cast(object); TObject* jobIDObject = map->GetValue("jobId"); TObjString* jobIDStr = dynamic_cast (jobIDObject); if (jobIDStr) jobID = jobIDStr->GetString(); } delete iter; delete result; if (jobID == "0") { Error("Submit", "Error submitting job"); return 0; } Info("Submit", "Your job was submitted with the ID = %s", jobID.Data()); return dynamic_cast (new TJAlienJob(jobID)); } //______________________________________________________________________________ TGridJDL *TJAlien::GetJDLGenerator() { return new TJAlienJDL(); } // TODO not implemented in java, TEST!!!! //______________________________________________________________________________ Bool_t TJAlien::ResubmitById(TString jobid) { // Resubmit a specific job. TString cmdline = TString("resubmit ") + jobid; TGridResult *result = Command(cmdline, kFALSE); if (result) { TJAlienResult *jalienresult = (TJAlienResult *) result; TObjString *errorMessage = 0; Int_t exitcode = GetExitCode(jalienresult, errorMessage); if (exitcode != 0) { Error("ResubmitById", "ResubmitByIt command failed with error message \"%s\"", errorMessage->GetString().Data()); delete result; return kFALSE; } else { Info("ResubmitById", "ResubmitById command successful, submited job \"%s\"", jobid.Data()); delete result; return kTRUE; } } return kFALSE; } // TODO not implemented in java, TEST!!!! //______________________________________________________________________________ Bool_t TJAlien::KillById(TString jobid) { // Kill a specific job. TString cmdline = TString("kill ") + jobid; TGridResult *result = Command(cmdline, kFALSE); if (result) { TJAlienResult *jalienresult = (TJAlienResult *) result; TObjString *errorMessage = 0; Int_t exitcode = GetExitCode(jalienresult, errorMessage); if (exitcode != 0) { Error("KillById", "KillById command failed with error message \"%s\"", errorMessage->GetString().Data()); delete result; return kFALSE; } else { Info("KillById", "KillById command successful, killed job \"%s\"", jobid.Data()); delete result; return kTRUE; } } return kFALSE; } //______________________________________________________________________________ TGridJobStatusList *TJAlien::Ps(const char* options, Bool_t verbose) { UNUSED(verbose); TString cmdline("ps"); if (options != NULL && std::string(options).find_first_not_of(' ') != std::string::npos) { TString sOptions(options); cmdline += TString(" ") + sOptions; } TJAlienResult *jalienresult = (TJAlienResult*) Command(cmdline, kFALSE); Stdout(); Stderr(); if (jalienresult) { TObjString *errorMessage = 0; Int_t exitcode = GetExitCode(jalienresult, errorMessage); if (exitcode != 0) { Error("Ps", "%s", errorMessage->GetString().Data()); delete jalienresult; return NULL; } } else { Error("Ps", "Ps: error while running command, no return result"); return NULL; } //jalienresult->DumpResult(); TList *list = dynamic_cast (jalienresult); if (!list) { if (jalienresult) delete jalienresult; Error("Ps", "Ps: error while running command, no empty result returned"); return NULL; } TJAlienJobStatusList *joblist = new TJAlienJobStatusList(); TIterator *it = list->MakeIterator(); TObject *object = it->Next(); while (object != 0) { TMap *status = dynamic_cast (object); TJAlienJobStatus *jobstatus = new TJAlienJobStatus(status); //if (verbose) jobstatus->Print(""); joblist->Add((TGridJobStatus*)jobstatus); object = it->Next(); } delete jalienresult; return joblist; } //______________________________________________________________________________ TGridResult* TJAlien::GetCollection(const char* lfn, Option_t* option, Bool_t verbose) { if (lfn == NULL || std::string(lfn).find_first_not_of(' ') == std::string::npos) lfn = GetHomeDirectory(); TString cmdline; TString sOption(option); if (sOption != 0 && sOption.Length() > 0) { cmdline = TString("listFilesFromCollection ") + sOption + TString(" ") + TString(lfn); } else { cmdline = TString("listFilesFromCollection ") + TString(lfn); } TGridResult* gridResult = Command(cmdline, kFALSE); if (verbose) { Stdout(); Stderr(); } return gridResult; } //______________________________________________________________________________ TGridCollection *TJAlien::OpenCollection(const char *collectionfile, UInt_t maxentries) { // Factory function for a TJAlienCollection based on an XML file. TString path(collectionfile); if (path.BeginsWith("alien://", TString::kIgnoreCase)) { TJAlien* jalien = dynamic_cast (gGrid); if (!jalien) { Error("OpenCollection", "Trying to read a collection, but gGrid is not initialized with JAliEn"); return 0; } TString lfn = path(strlen("alien://"), path.Length()); if (jalien->Type(lfn) == kCollection) { // it is a collection TGridResult* gridResult = jalien->GetCollection(lfn, 0, kFALSE); if (!gridResult) { Error("OpenCollection", "Could not retrieve collection %s from the catalog", collectionfile); return 0; } //check exitcode, if command was executed successfully TJAlienResult *jalienresult = (TJAlienResult *) gridResult; TObjString *errorMessage = 0; Int_t exitcode = GetExitCode(jalienresult, errorMessage); if (exitcode != 0) { Error("OpenCollection", "OpenCollection command failed with error message \"%s\"", errorMessage->GetString().Data()); delete gridResult; return 0; } return TJAlienCollection::OpenJAliEnCollection(gridResult); } } return TJAlienCollection::Open(collectionfile, maxentries); } //______________________________________________________________________________ TGridCollection *TJAlien::OpenCollectionQuery(TGridResult *queryresult, Bool_t nogrouping) { // Factory function fo a TJAlienCollection based on a gGrid Query. return (TGridCollection*)TJAlienCollection::OpenQuery(queryresult, nogrouping); } //______________________________________________________________________________ TGridResult *TJAlien::OpenDataset(const char *lfn, const char *options) { UNUSED(lfn); UNUSED(options); (dynamic_cast (gGrid))->NotImplemented(__func__, __FILE__, __LINE__); return NULL; } //______________________________________________________________________________ TMap *TJAlien::GetColumn(UInt_t stream, UInt_t column) { UNUSED(stream); UNUSED(column); (dynamic_cast (gGrid))->NotImplemented(__func__, __FILE__, __LINE__); return NULL; } //______________________________________________________________________________ const char *TJAlien::GetStreamFieldValue(UInt_t stream, UInt_t column, UInt_t row) { UNUSED(stream); UNUSED(column); UNUSED(row); (dynamic_cast (gGrid))->NotImplemented(__func__, __FILE__, __LINE__); return NULL; } //______________________________________________________________________________ const char *TJAlien::GetStreamFieldKey(UInt_t stream, UInt_t column, UInt_t row) { UNUSED(stream); UNUSED(column); UNUSED(row); (dynamic_cast (gGrid))->NotImplemented(__func__, __FILE__, __LINE__); return NULL; } //______________________________________________________________________________ UInt_t TJAlien::GetNColumns(UInt_t stream) { UNUSED(stream); (dynamic_cast (gGrid))->NotImplemented(__func__, __FILE__, __LINE__); return -1; } //______________________________________________________________________________ TGridResult *TJAlien::Query(const char *path, const char *pattern, const char *conditions, const char *options) { // this command should create collection using 'find -c' // in TJAlienResult should be path to this created collection // BUT change of java part is needed, this does not work now TODO TString cmdline = TString("find"); TString sPath(path); TString sPattern(pattern); TString sConditions(conditions); TString sOptions(options); if (sOptions.Length() > 0) cmdline += TString(" ") + sOptions; if (sPath.Length() > 0) cmdline += TString(" ") + sPath; if (sPattern.Length() > 0) cmdline += TString(" ") + sPattern; if (sConditions.Length() > 0) cmdline += TString(" ") + sConditions; return Command(cmdline); } //______________________________________________________________________________ Int_t TJAlien::GetExitCode(TJAlienResult *result, TObjString* &message) { // Extracting the exit code and error message from a TJAlien result if (!result) { Error("GetExitCode", "Could not retrieve the exit code, the result is null"); return -1; } TObjString *key = new TObjString(); key->SetString("exitcode"); TObjString *ecStr = result->GetMetaData(key); Int_t exitcode = 0; if (ecStr != NULL) exitcode = ecStr->GetString().Atoi(); if (exitcode != 0) { key->SetString("error"); //message->SetString(result->GetMetaData(key)->GetString().Data()); message = result->GetMetaData(key); } delete key; return exitcode; } //_____________________________________________________________________________ size_t TJAlien::WriteCallback(void *contents, size_t size, size_t nmemb) { size_t realsize = size * nmemb; readBuffer.append((const char*)contents, realsize); return realsize; } //_____________________________________________________________________________ int TJAlien::websocket_write_back(struct lws *wsi_in, const char *str, int str_size_in) { if (str == NULL || wsi_in == NULL) return -1; int n; int len; char *out = NULL; if (str_size_in < 1) len = strlen(str); else len = str_size_in; out = (char *)malloc(sizeof(char)*(LWS_SEND_BUFFER_PRE_PADDING + len + LWS_SEND_BUFFER_POST_PADDING)); // setup the buffer memcpy (out + LWS_SEND_BUFFER_PRE_PADDING, str, len ); // write out n = lws_write(wsi_in, (unsigned char*)out + LWS_SEND_BUFFER_PRE_PADDING, len, LWS_WRITE_TEXT); // free the buffer free(out); return n; } //_____________________________________________________________________________ int TJAlien::ws_service_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { // Websocket callback handler UNUSED(len); switch (reason) { case LWS_CALLBACK_CLIENT_ESTABLISHED: { if (gDebug > 1) printf("[Websocket Callback] Connect with server success\n"); connection_flag = 1; break; } case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: { if (gDebug > 1) printf("[Websocket Callback] Connect with server error\n"); destroy_flag = 1; connection_flag = 0; if (!lws_get_context(wsi)) lws_context_destroy(lws_get_context(wsi)); wsi = NULL; break; } case LWS_CALLBACK_CLOSED: { if (gDebug > 1) printf("[Websocket Callback] LWS_CALLBACK_CLOSED\n"); destroy_flag = 1; connection_flag = 0; if (!lws_get_context(wsi)) lws_context_destroy(lws_get_context(wsi)); wsi = NULL; break; } case LWS_CALLBACK_CLIENT_RECEIVE: { if (gDebug > 100) { printf("[Websocket Callback] Client received:%s\n", (char *)in); printf("[Websocket Callback]: %4d (rpp %5d, last %d)\n", (int)len, (int)lws_remaining_packet_payload(wsi), lws_is_final_fragment(wsi)); } readBuffer.append((char*)in); if (lws_is_final_fragment(wsi) != 0) receive_flag = 1; len = 0; break; } case LWS_CALLBACK_CLIENT_WRITEABLE: { if (gDebug > 1) printf("[Websocket Callback] On writeable is called\n"); writeable_flag = 1; break; } #if defined(LWS_OPENSSL_SUPPORT) case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS: { if (gDebug > 1) printf("[Websocket Callback] LOAD_EXTRA_CLIENT_VERIFY_CERTS is called\n"); //SSL_CTX_set_verify((SSL_CTX*)user, SSL_VERIFY_PEER, NULL); //SSL_set_verify(wsi->ssl, SSL_VERIFY_PEER, NULL); std::string location = "/etc/grid-security/certificates/"; struct stat info; if (stat("/cvmfs/alice.cern.ch", &info) == 0) location = "/cvmfs/alice.cern.ch" + location; std::string capath = std::getenv("X509_CERT_DIR") ? : location; size_t pos = 0; std::string token; // If capath contans two paths separated by ":" while ((pos = capath.find(":")) != std::string::npos) { token = capath.substr(0, pos); if (!SSL_CTX_load_verify_locations((SSL_CTX*)user, NULL, token.c_str())) { if (gDebug > 1) printf("[Websocket Callback] LOAD_EXTRA_CLIENT_VERIFY_CERTS failed\n"); return 1; } capath.erase(0, pos + 1); } // If capath is a single path if (capath.length() != 0) if (!SSL_CTX_load_verify_locations((SSL_CTX*)user, NULL, capath.c_str())) { if (gDebug > 1) printf("[Websocket Callback] LOAD_EXTRA_CLIENT_VERIFY_CERTS failed\n"); return 1; } //int preverify_ok; //SSL *ssl = X509_STORE_CTX_get_ex_data((SSL_CTX*)user, SSL_get_ex_data_X509_STORE_CTX_idx()); //user->protocols[0].callback(wsi, LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION, (SSL_CTX*)user, ssl, preverify_ok); break; } case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION: { if (gDebug > 1) { printf("[Websocket Callback] LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION is called\n"); printf("[Websocket Callback] x509_ctx: %s\n", ((SSL_CTX*)user)->client_CA); printf("[Websocket Callback] ssl: %s\n", in); printf("[Websocket Callback] SSL_CTX_get_client_CA_list: %s\n", SSL_CTX_get_client_CA_list((SSL_CTX*)user)); } break; } #endif default: break; } return 0; } //______________________________________________________________________________ const char* TJAlien::Whoami() { TJAlienResult* r = (TJAlienResult*)Command("whoami"); const char *username; if (r && (username = r->GetKey(0, "message"))) { return username; } else { Error("TJAlien", "Unable to determine username."); return ""; } } //______________________________________________________________________________ void TJAlien::NotImplemented(const char *func, const char *file, int line) { Error("TJAlien", "You are trying to call:"); Error("TJAlien", " %s", func); Error("TJAlien", " in %s:%d", file, line); Error("TJAlien", "that is NOT IMPLEMENTED."); Error("TJAlien", "If you need this method please contact JAliEn support "); }