Commit b9d92084 authored by Luciano Orsini's avatar Luciano Orsini
Browse files

references #118: improving elasticsearch coding in context

parent 638d248e
......@@ -234,7 +234,6 @@ void elastic::timestream::Application::actionPerformed(xdata::Event& event)
this->notifyQualified("fatal", e);
}
try
{
toolbox::net::URN urn("toolbox-mem-pool", "timestream");
......@@ -251,32 +250,32 @@ void elastic::timestream::Application::actionPerformed(xdata::Event& event)
adispatcher_ = new toolbox::task::AsynchronousEventDispatcher ("elastic-timestream-dispatcher", "waiting", 0.6, committedQueueSize_);
adispatcher_->addActionListener(this);
toolbox::Properties properties;
if ( httpVerbose_ )
{
properties.setProperty("urn:es-api-stream:CURLOPT_VERBOSE","true");
}
if ( tcpNoDelay_ )
{
properties.setProperty("urn:es-api-stream:CURLOPT_TCP_NODELAY","true");
}
LOG4CPLUS_INFO(this->getApplicationLogger(), "Attaching to elastic search...");
if ( (bool)(elasticsearchConnectionForbidReuse_) )
try
{
properties.setProperty("urn:es-api-stream:CURLOPT_FORBID_REUSE", "true");
}
if ( dynamicMetadata_ )
{
toolbox::Properties properties;
if ( httpVerbose_ )
{
properties.setProperty("urn:es-api-stream:CURLOPT_VERBOSE","true");
}
if ( tcpNoDelay_ )
{
properties.setProperty("urn:es-api-stream:CURLOPT_TCP_NODELAY","true");
}
properties.setProperty("urn:es-api-cluster:number-of-channels", numberOfChannels_.toString());
LOG4CPLUS_INFO(this->getApplicationLogger(), "Attaching to elastic search...");
if ( (bool)(elasticsearchConnectionForbidReuse_) )
{
properties.setProperty("urn:es-api-stream:CURLOPT_FORBID_REUSE", "true");
}
member_ = new elastic::api::Member(this, properties);
properties.setProperty("urn:es-api-cluster:number-of-channels", numberOfChannels_.toString());
member_ = new elastic::api::Member(this, properties);
try
{
if ( dynamicMetadata_ )
{
//curl -XPOST -H 'Content-Type: application/json' 'http://cmsos-iaas-cdaq.cms:9200/cmsos-meta-development-tags/_doc/_search?pretty' -d '{ "size": 1, "sort": { "timestamp": "desc"}, "query": { "match_all": {}}}'
std::string zone = this->getApplicationContext()->getDefaultZoneName();
elastic::api::Cluster& cluster = member_->joinCluster(elasticsearchClusterUrl_.toString());
......
......@@ -212,31 +212,32 @@ void xmas::sensord::Application::actionPerformed (xdata::Event& event)
}
toolbox::Properties properties;
if ( httpVerbose_ )
{
properties.setProperty("urn:es-api-stream:CURLOPT_VERBOSE","true");
}
if ( tcpNoDelay_ )
try
{
properties.setProperty("urn:es-api-stream:CURLOPT_TCP_NODELAY","true");
}
if ( dynamicMetadata_)
{
toolbox::Properties properties;
if ( httpVerbose_ )
{
properties.setProperty("urn:es-api-stream:CURLOPT_VERBOSE","true");
}
if ( tcpNoDelay_ )
{
properties.setProperty("urn:es-api-stream:CURLOPT_TCP_NODELAY","true");
}
LOG4CPLUS_INFO(this->getApplicationLogger(), "Attaching to elastic search...");
LOG4CPLUS_INFO(this->getApplicationLogger(), "Attaching to elastic search...");
if ( (bool)(elasticsearchConnectionForbidReuse_) )
{
properties.setProperty("urn:es-api-stream:CURLOPT_FORBID_REUSE", "true");
}
if ( (bool)(elasticsearchConnectionForbidReuse_) )
{
properties.setProperty("urn:es-api-stream:CURLOPT_FORBID_REUSE", "true");
}
properties.setProperty("urn:es-api-cluster:number-of-channels", numberOfChannels_.toString());
properties.setProperty("urn:es-api-cluster:number-of-channels", numberOfChannels_.toString());
member_ = new elastic::api::Member(this, properties);
try
{
if ( dynamicMetadata_)
{
member_ = new elastic::api::Member(this, properties);
//curl -XPOST -H 'Content-Type: application/json' 'http://cmsos-iaas-cdaq.cms:9200/cmsos-meta-development-tags/_doc/_search?pretty' -d '{ "size": 1, "sort": { "timestamp": "desc"}, "query": { "match_all": {}}}'
std::string zone = this->getApplicationContext()->getDefaultZoneName();
elastic::api::Cluster& cluster = member_->joinCluster(elasticsearchClusterUrl_.toString());
......
......@@ -217,96 +217,96 @@ void xmas::slash2g::Application::actionPerformed( xdata::Event& event)
if (event.type() == "urn:xdaq-event:setDefaultValues")
{
toolbox::Properties properties;
if ( httpVerbose_ )
{
properties.setProperty("urn:es-api-stream:CURLOPT_VERBOSE","true");
}
if ( tcpNoDelay_ )
try
{
properties.setProperty("urn:es-api-stream:CURLOPT_TCP_NODELAY","true");
}
// dynamicMetadata
if ( dynamicMetadata_ )
{
toolbox::Properties properties;
if ( httpVerbose_ )
{
properties.setProperty("urn:es-api-stream:CURLOPT_VERBOSE","true");
}
if ( tcpNoDelay_ )
{
properties.setProperty("urn:es-api-stream:CURLOPT_TCP_NODELAY","true");
}
LOG4CPLUS_INFO(this->getApplicationLogger(), "Attaching to elastic search...");
LOG4CPLUS_INFO(this->getApplicationLogger(), "Attaching to elastic search...");
if ( (bool)(elasticsearchConnectionForbidReuse_) )
{
properties.setProperty("urn:es-api-stream:CURLOPT_FORBID_REUSE", "true");
}
if ( (bool)(elasticsearchConnectionForbidReuse_) )
{
properties.setProperty("urn:es-api-stream:CURLOPT_FORBID_REUSE", "true");
}
properties.setProperty("urn:es-api-cluster:number-of-channels", numberOfChannels_.toString());
properties.setProperty("urn:es-api-cluster:number-of-channels", numberOfChannels_.toString());
member_ = new elastic::api::Member(this, properties);
member_ = new elastic::api::Member(this, properties);
try
{
// dynamicMetadata
if ( dynamicMetadata_ )
{
//curl -XPOST -H 'Content-Type: application/json' 'http://cmsos-iaas-cdaq.cms:9200/cmsos-meta-development-tags/_doc/_search?pretty' -d '{ "size": 1, "sort": { "timestamp": "desc"}, "query": { "match_all": {}}}'
std::string zone = this->getApplicationContext()->getDefaultZoneName();
elastic::api::Cluster& cluster = member_->joinCluster(elasticsearchClusterUrl_.toString());
if ( autoTag_ != "" ) // retrieve tag from elasticsearch
{
tag_ = "";
std::string tagsIndex = elastic::api::ElasticMetaPrefix + zone + "-tags-registry";
json_t *tquery = json_pack("{s:i,s:{s: s},s:{s:{s:s}}}", "size", 1, "sort", "timestamp", "desc", "query", "term", "autotagtype", autoTag_.toString().c_str());
char * strQuery = json_dumps(tquery, 0);
LOG4CPLUS_DEBUG(this->getApplicationLogger(), "Tags index: '" << tagsIndex << "' auto tag query: '" << strQuery << "'");
free(strQuery);
json_t * tj = cluster.search(tagsIndex, "_doc", "", tquery);
nlohmann::json tags = elastic::api::janson2nlohmann(tj);
json_decref(tj);
json_decref(tquery);
nlohmann::json hits = tags["hits"]["hits"];
if ( hits.empty() )
//curl -XPOST -H 'Content-Type: application/json' 'http://cmsos-iaas-cdaq.cms:9200/cmsos-meta-development-tags/_doc/_search?pretty' -d '{ "size": 1, "sort": { "timestamp": "desc"}, "query": { "match_all": {}}}'
std::string zone = this->getApplicationContext()->getDefaultZoneName();
elastic::api::Cluster& cluster = member_->joinCluster(elasticsearchClusterUrl_.toString());
if ( autoTag_ != "" ) // retrieve tag from elasticsearch
{
std::stringstream msg;
msg << "Cannot find autotag '" << autoTag_.toString() << "'";
XCEPT_DECLARE(xmas::slash2g::exception::Exception, q, msg.str());
this->notifyQualified("fatal", q);
return;
}
//std::cout << "dump tags json: " << hits.dump(4) << std::endl;
for (auto && hit:hits)
{
//std::cout << "dump doc json: " << (*itt).dump(4) << std::endl;
nlohmann::json entry = hit["_source"];
if ( entry.find("tag") != entry.end() )
{
std::string tag = entry["tag"];
tag_ = tag;
LOG4CPLUS_INFO(this->getApplicationLogger(), "Found auto tag '" << tag_.toString() << "'");
break;
}
else
tag_ = "";
std::string tagsIndex = elastic::api::ElasticMetaPrefix + zone + "-tags-registry";
json_t *tquery = json_pack("{s:i,s:{s: s},s:{s:{s:s}}}", "size", 1, "sort", "timestamp", "desc", "query", "term", "autotagtype", autoTag_.toString().c_str());
char * strQuery = json_dumps(tquery, 0);
LOG4CPLUS_DEBUG(this->getApplicationLogger(), "Tags index: '" << tagsIndex << "' auto tag query: '" << strQuery << "'");
free(strQuery);
json_t * tj = cluster.search(tagsIndex, "_doc", "", tquery);
nlohmann::json tags = elastic::api::janson2nlohmann(tj);
json_decref(tj);
json_decref(tquery);
nlohmann::json hits = tags["hits"]["hits"];
if ( hits.empty() )
{
std::stringstream msg;
msg << "Failed to retrieve auto tag";
msg << "Cannot find autotag '" << autoTag_.toString() << "'";
XCEPT_DECLARE(xmas::slash2g::exception::Exception, q, msg.str());
this->notifyQualified("fatal", q);
return;
}
//std::cout << "dump tags json: " << hits.dump(4) << std::endl;
for (auto && hit:hits)
{
//std::cout << "dump doc json: " << (*itt).dump(4) << std::endl;
nlohmann::json entry = hit["_source"];
if ( entry.find("tag") != entry.end() )
{
std::string tag = entry["tag"];
tag_ = tag;
LOG4CPLUS_INFO(this->getApplicationLogger(), "Found auto tag '" << tag_.toString() << "'");
break;
}
else
{
std::stringstream msg;
msg << "Failed to retrieve auto tag";
XCEPT_DECLARE(xmas::slash2g::exception::Exception, q, msg.str());
this->notifyQualified("fatal", q);
return;
}
}
}
this->applyJSONSettings();
}
else
{
// Read the configuration file
this->applySettings( settings_.toString() );
}
this->applyJSONSettings();
}
else
catch (elastic::api::exception::Exception & e )
{
// Read the configuration file
this->applySettings( settings_.toString() );
}
std::stringstream msg;
msg << "Failed to configure";
XCEPT_DECLARE_NESTED(xmas::slash2g::exception::Exception, q, msg.str(),e);
this->notifyQualified("fatal", q);
return;
}
catch (elastic::api::exception::Exception & e )
{
std::stringstream msg;
msg << "Failed to configure";
XCEPT_DECLARE_NESTED(xmas::slash2g::exception::Exception, q, msg.str(),e);
this->notifyQualified("fatal", q);
return;
}
try
{
......@@ -342,37 +342,6 @@ void xmas::slash2g::Application::actionPerformed( xdata::Event& event)
interval.fromString("PT1S");
timer->scheduleAtFixedRate( start, this,interval , 0, "urn:xmas:slash2g-check" );
/*
// Load collector senttings for keys
//
LOG4CPLUS_INFO (this->getApplicationLogger(), "Load keys from '" << brokerCollectorSettings_.toString() << "'");
DOMDocument* doc = xoap::getDOMParserFactory()->get("configure")->loadXML( brokerCollectorSettings_.toString() );
DOMNodeList* flashlistList = doc->getElementsByTagNameNS (xoap::XStr("http://xdaq.web.cern.ch/xdaq/xsd/2006/xmas-10"), xoap::XStr("flashlist"));
for (XMLSize_t i = 0; i < flashlistList->getLength(); i++)
{
DOMNode * flashlistNode = flashlistList->item(i);
std::string flashlistName = xoap::getNodeAttribute (flashlistNode, "name");
DOMNodeList* collectorList = flashlistNode->getChildNodes();
for (XMLSize_t j = 0 ; j < collectorList->getLength() ; j++)
{
DOMNode * collectorNode = collectorList->item(j);
std::string nodeName = xoap::XMLCh2String(collectorNode->getLocalName());
if(nodeName == "collector")
{
std::string haskKey = xoap::getNodeAttribute (collectorNode, "hashkey");
hashKeys_[flashlistName] = toolbox::parseTokenSet(haskKey,",");
LOG4CPLUS_DEBUG( this->getApplicationLogger(), "Loaded haskey:[" << haskKey << "] for flashlist:" << flashlistName );
}
}
}
*/
//
//
}
else
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment