Commit 638d248e authored by Luciano Orsini's avatar Luciano Orsini
Browse files

references #117: improve fault tolerance when cluster is not accessible

parent cf9aef1e
......@@ -485,7 +485,7 @@ json_t * elastic::api::Stream::request(XMLURL & xmlUrl, elastic::api::HTTPMethod
if(res != CURLE_OK)
{
std::stringstream msg;
msg << " Failed to perform request: " << curl_easy_strerror(res);
msg << " Failed to perform request to: " << url << " with error: " << curl_easy_strerror(res);
XCEPT_RAISE(elastic::api::exception::Exception, msg.str());
}
//long http_code;
......
......@@ -272,70 +272,84 @@ void elastic::timestream::Application::actionPerformed(xdata::Event& event)
member_ = new elastic::api::Member(this, properties);
if ( dynamicMetadata_ )
try
{
//curl -XPOST -H 'Content-Type: application/json' 'http://cmsos-iaas-cdaq.cms:9200/cmsos-meta-development-tags/_doc/_search?pretty' -d '{ "size": 1, "sort": { "timestamp": "desc"}, "query": { "match_all": {}}}'
std::string zone = this->getApplicationContext()->getDefaultZoneName();
elastic::api::Cluster& cluster = member_->joinCluster(elasticsearchClusterUrl_.toString());
if ( autoTag_ != "" ) // retrieve tag from elasticsearch
if ( dynamicMetadata_ )
{
tag_ = "";
std::string tagsIndex = elastic::api::ElasticMetaPrefix + zone + "-tags-registry";
json_t *tquery = json_pack("{s:i,s:{s: s},s:{s:{s:s}}}", "size", 1, "sort", "timestamp", "desc", "query", "term", "autotagtype", autoTag_.toString().c_str());
char * strQuery = json_dumps(tquery, 0);
LOG4CPLUS_DEBUG(this->getApplicationLogger(), "Tags index: '" << tagsIndex << "' auto tag query: '" << strQuery << "'");
free(strQuery);
json_t * tj = cluster.search(tagsIndex, "_doc", "", tquery);
nlohmann::json tags = elastic::api::janson2nlohmann(tj);
json_decref(tj);
json_decref(tquery);
nlohmann::json hits = tags["hits"]["hits"];
if ( hits.empty() )
//curl -XPOST -H 'Content-Type: application/json' 'http://cmsos-iaas-cdaq.cms:9200/cmsos-meta-development-tags/_doc/_search?pretty' -d '{ "size": 1, "sort": { "timestamp": "desc"}, "query": { "match_all": {}}}'
std::string zone = this->getApplicationContext()->getDefaultZoneName();
elastic::api::Cluster& cluster = member_->joinCluster(elasticsearchClusterUrl_.toString());
if ( autoTag_ != "" ) // retrieve tag from elasticsearch
{
std::stringstream msg;
msg << "Cannot find autotag '" << autoTag_.toString() << "'";
XCEPT_DECLARE(elastic::timestream::exception::Exception, q, msg.str());
this->notifyQualified("fatal", q);
return;
}
//std::cout << "dump tags json: " << hits.dump(4) << std::endl;
for (auto && hit:hits)
{
//std::cout << "dump doc json: " << (*itt).dump(4) << std::endl;
nlohmann::json entry = hit["_source"];
if ( entry.find("tag") != entry.end() )
{
std::string tag = entry["tag"];
tag_ = tag;
LOG4CPLUS_INFO(this->getApplicationLogger(), "Found auto tag '" << tag_.toString() << "'");
break;
}
else
tag_ = "";
std::string tagsIndex = elastic::api::ElasticMetaPrefix + zone + "-tags-registry";
json_t *tquery = json_pack("{s:i,s:{s: s},s:{s:{s:s}}}", "size", 1, "sort", "timestamp", "desc", "query", "term", "autotagtype", autoTag_.toString().c_str());
char * strQuery = json_dumps(tquery, 0);
LOG4CPLUS_DEBUG(this->getApplicationLogger(), "Tags index: '" << tagsIndex << "' auto tag query: '" << strQuery << "'");
free(strQuery);
json_t * tj = cluster.search(tagsIndex, "_doc", "", tquery);
nlohmann::json tags = elastic::api::janson2nlohmann(tj);
json_decref(tj);
json_decref(tquery);
nlohmann::json hits = tags["hits"]["hits"];
if ( hits.empty() )
{
std::stringstream msg;
msg << "Failed to retrieve auto tag";
msg << "Cannot find autotag '" << autoTag_.toString() << "'";
XCEPT_DECLARE(elastic::timestream::exception::Exception, q, msg.str());
this->notifyQualified("fatal", q);
return;
}
//std::cout << "dump tags json: " << hits.dump(4) << std::endl;
for (auto && hit:hits)
{
//std::cout << "dump doc json: " << (*itt).dump(4) << std::endl;
nlohmann::json entry = hit["_source"];
if ( entry.find("tag") != entry.end() )
{
std::string tag = entry["tag"];
tag_ = tag;
LOG4CPLUS_INFO(this->getApplicationLogger(), "Found auto tag '" << tag_.toString() << "'");
break;
}
else
{
std::stringstream msg;
msg << "Failed to retrieve auto tag";
XCEPT_DECLARE(elastic::timestream::exception::Exception, q, msg.str());
this->notifyQualified("fatal", q);
return;
}
}
}
}
}
if ( enable_ )
{
if ( dynamicMetadata_ )
{
this->applyCollectorSettingsJSON();
}
else
if ( enable_ )
{
this->applyCollectorSettingsXML();
if ( dynamicMetadata_ )
{
this->applyCollectorSettingsJSON();
}
else
{
this->applyCollectorSettingsXML();
}
}
} // trycatch
catch (elastic::api::exception::Exception & e )
{
std::stringstream msg;
msg << "Failed to configure";
XCEPT_DECLARE_NESTED(elastic::timestream::exception::Exception, q, msg.str(),e);
this->notifyQualified("fatal", q);
return;
}
toolbox::task::Timer * timer = 0;
// Create timer for refreshing subscriptions
if ( !toolbox::task::getTimerFactory()->hasTimer("urn:elastic:statistics-timer") )
......
......@@ -233,125 +233,136 @@ void xmas::sensord::Application::actionPerformed (xdata::Event& event)
member_ = new elastic::api::Member(this, properties);
if ( dynamicMetadata_)
try
{
//curl -XPOST -H 'Content-Type: application/json' 'http://cmsos-iaas-cdaq.cms:9200/cmsos-meta-development-tags/_doc/_search?pretty' -d '{ "size": 1, "sort": { "timestamp": "desc"}, "query": { "match_all": {}}}'
std::string zone = this->getApplicationContext()->getDefaultZoneName();
elastic::api::Cluster& cluster = member_->joinCluster(elasticsearchClusterUrl_.toString());
if ( autoTag_ != "" ) // retrieve tag from elasticsearch
if ( dynamicMetadata_)
{
tag_ = "";
std::string tagsIndex = elastic::api::ElasticMetaPrefix + zone + "-tags-registry";
json_t *tquery = json_pack("{s:i,s:{s: s},s:{s:{s:s}}}", "size", 1, "sort", "timestamp", "desc", "query", "term", "autotagtype", autoTag_.toString().c_str());
char * strQuery = json_dumps(tquery, 0);
LOG4CPLUS_DEBUG(this->getApplicationLogger(), "Tags index: '" << tagsIndex << "' auto tag query: '" << strQuery << "'");
free(strQuery);
json_t * tj = cluster.search(tagsIndex, "_doc", "", tquery);
nlohmann::json tags = elastic::api::janson2nlohmann(tj);
json_decref(tj);
json_decref(tquery);
nlohmann::json hits = tags["hits"]["hits"];
if ( hits.empty() )
{
std::stringstream msg;
msg << "Cannot find autotag '" << autoTag_.toString() << "'";
XCEPT_DECLARE(xmas::sensord::exception::Exception, q, msg.str());
this->notifyQualified("fatal", q);
return;
}
//std::cout << "dump tags json: " << hits.dump(4) << std::endl;
for (auto && hit:hits)
//curl -XPOST -H 'Content-Type: application/json' 'http://cmsos-iaas-cdaq.cms:9200/cmsos-meta-development-tags/_doc/_search?pretty' -d '{ "size": 1, "sort": { "timestamp": "desc"}, "query": { "match_all": {}}}'
std::string zone = this->getApplicationContext()->getDefaultZoneName();
elastic::api::Cluster& cluster = member_->joinCluster(elasticsearchClusterUrl_.toString());
if ( autoTag_ != "" ) // retrieve tag from elasticsearch
{
//std::cout << "dump doc json: " << (*itt).dump(4) << std::endl;
nlohmann::json entry = hit["_source"];
if ( entry.find("tag") != entry.end() )
{
std::string tag = entry["tag"];
tag_ = tag;
LOG4CPLUS_INFO(this->getApplicationLogger(), "Found auto tag '" << tag_.toString() << "'");
break;
}
else
tag_ = "";
std::string tagsIndex = elastic::api::ElasticMetaPrefix + zone + "-tags-registry";
json_t *tquery = json_pack("{s:i,s:{s: s},s:{s:{s:s}}}", "size", 1, "sort", "timestamp", "desc", "query", "term", "autotagtype", autoTag_.toString().c_str());
char * strQuery = json_dumps(tquery, 0);
LOG4CPLUS_DEBUG(this->getApplicationLogger(), "Tags index: '" << tagsIndex << "' auto tag query: '" << strQuery << "'");
free(strQuery);
json_t * tj = cluster.search(tagsIndex, "_doc", "", tquery);
nlohmann::json tags = elastic::api::janson2nlohmann(tj);
json_decref(tj);
json_decref(tquery);
nlohmann::json hits = tags["hits"]["hits"];
if ( hits.empty() )
{
std::stringstream msg;
msg << "Failed to retrieve auto tag";
msg << "Cannot find autotag '" << autoTag_.toString() << "'";
XCEPT_DECLARE(xmas::sensord::exception::Exception, q, msg.str());
this->notifyQualified("fatal", q);
return;
}
}
}
std::string index = elastic::api::ElasticMetaPrefix + zone + "-" + tag_.toString() + "-flashlists-registry";
if (! cluster.exists(index,""))
{
std::stringstream msg;
msg << "Invalid cmsos tag '" << tag_.toString() << "' does not exist";
XCEPT_DECLARE(xmas::sensord::exception::Exception, q, msg.str());
this->notifyQualified("fatal", q);
return;
}
// cache flashlists json format
json_t *fquery = json_pack("{'query': {'match_all': {}}}");
json_t * fj = cluster.search(index, "_doc", "size=2048", fquery);
nlohmann::json flashlistsJSONCache = elastic::api::janson2nlohmann(fj);
//std::cout << "dump flashlists json: " << fj["hits"]["hits"].dump(4) << std::endl;
json_decref(fj);
json_decref(fquery);
this->applySensorSettingsJSON(flashlistsJSONCache);
}
else
{
std::string path = autoConfSearchPath_.toString();
std::vector < std::string > files;
if (path.find("http") != std::string::npos)
{
try
{
files = this->getFileListing(path);
for (std::vector<std::string>::iterator j = files.begin(); j != files.end(); j++)
//std::cout << "dump tags json: " << hits.dump(4) << std::endl;
for (auto && hit:hits)
{
(*j) = path + "/" + (*j);
//std::cout << "dump doc json: " << (*itt).dump(4) << std::endl;
nlohmann::json entry = hit["_source"];
if ( entry.find("tag") != entry.end() )
{
std::string tag = entry["tag"];
tag_ = tag;
LOG4CPLUS_INFO(this->getApplicationLogger(), "Found auto tag '" << tag_.toString() << "'");
break;
}
else
{
std::stringstream msg;
msg << "Failed to retrieve auto tag";
XCEPT_DECLARE(xmas::sensord::exception::Exception, q, msg.str());
this->notifyQualified("fatal", q);
return;
}
}
}
catch (xmas::sensord::exception::Exception & e)
std::string index = elastic::api::ElasticMetaPrefix + zone + "-" + tag_.toString() + "-flashlists-registry";
if (! cluster.exists(index,""))
{
XCEPT_DECLARE_NESTED(xmas::sensord::exception::Exception, q, "Failed to get sensor settings", e);
std::stringstream msg;
msg << "Invalid cmsos tag '" << tag_.toString() << "' does not exist";
XCEPT_DECLARE(xmas::sensord::exception::Exception, q, msg.str());
this->notifyQualified("fatal", q);
return;
}
// cache flashlists json format
json_t *fquery = json_pack("{'query': {'match_all': {}}}");
json_t * fj = cluster.search(index, "_doc", "size=2048", fquery);
nlohmann::json flashlistsJSONCache = elastic::api::janson2nlohmann(fj);
//std::cout << "dump flashlists json: " << fj["hits"]["hits"].dump(4) << std::endl;
json_decref(fj);
json_decref(fquery);
this->applySensorSettingsJSON(flashlistsJSONCache);
}
else
{
// if no search path provided, rely on runtime information
if (path != "")
std::string path = autoConfSearchPath_.toString();
std::vector < std::string > files;
if (path.find("http") != std::string::npos)
{
try
{
path = path + "/*.sensor";
files = toolbox::getRuntime()->expandPathName(path);
files = this->getFileListing(path);
for (std::vector<std::string>::iterator j = files.begin(); j != files.end(); j++)
{
(*j) = path + "/" + (*j);
}
}
catch (toolbox::exception::Exception& e)
catch (xmas::sensord::exception::Exception & e)
{
std::stringstream msg;
msg << "Failed to import flashlist definitions from '" << path << "', ";
XCEPT_DECLARE_NESTED(xmas::sensord::exception::Exception, q, msg.str(), e);
this->notifyQualified("fatal",q);
XCEPT_DECLARE_NESTED(xmas::sensord::exception::Exception, q, "Failed to get sensor settings", e);
this->notifyQualified("fatal", q);
return;
}
}
}
else
{
// if no search path provided, rely on runtime information
if (path != "")
{
try
{
path = path + "/*.sensor";
files = toolbox::getRuntime()->expandPathName(path);
}
catch (toolbox::exception::Exception& e)
{
std::stringstream msg;
msg << "Failed to import flashlist definitions from '" << path << "', ";
XCEPT_DECLARE_NESTED(xmas::sensord::exception::Exception, q, msg.str(), e);
this->notifyQualified("fatal",q);
return;
}
}
}
LOG4CPLUS_INFO(this->getApplicationLogger(), "Found " << files.size() << " sensor files");
LOG4CPLUS_INFO(this->getApplicationLogger(), "Found " << files.size() << " sensor files");
this->applySensorSettings(files);
this->applySensorSettings(files);
}
}
catch (elastic::api::exception::Exception & e )
{
std::stringstream msg;
msg << "Failed to configure";
XCEPT_DECLARE_NESTED(xmas::sensord::exception::Exception, q, msg.str(),e);
this->notifyQualified("fatal", q);
return;
}
}
else
......
......@@ -238,6 +238,8 @@ void xmas::slash2g::Application::actionPerformed( xdata::Event& event)
member_ = new elastic::api::Member(this, properties);
try
{
// dynamicMetadata
if ( dynamicMetadata_ )
{
......@@ -296,6 +298,15 @@ void xmas::slash2g::Application::actionPerformed( xdata::Event& event)
// Read the configuration file
this->applySettings( settings_.toString() );
}
}
catch (elastic::api::exception::Exception & e )
{
std::stringstream msg;
msg << "Failed to configure";
XCEPT_DECLARE_NESTED(xmas::slash2g::exception::Exception, q, msg.str(),e);
this->notifyQualified("fatal", q);
return;
}
try
{
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment