Skip to content
Snippets Groups Projects
Commit 3acbd40f authored by Konstantina Skovola's avatar Konstantina Skovola
Browse files

Add cta-admin version command impl

parent 1727bf62
No related branches found
No related tags found
No related merge requests found
...@@ -195,7 +195,10 @@ int main(const int argc, char *const *const argv) { ...@@ -195,7 +195,10 @@ int main(const int argc, char *const *const argv) {
builder.RegisterService(&svc); builder.RegisterService(&svc);
// I need to also register the CtaRpcStream service // I need to also register the CtaRpcStream service
// but need to make it so it has its own catalogue? logger etc? // but need to make it so it has its own catalogue? logger etc?
frontend::grpc::CtaRpcStreamImpl streamSvc(svc.getFrontendService().getCatalogue(), svc.getFrontendService().getScheduler(), svc.getFrontendService().getLogContext()); frontend::grpc::CtaRpcStreamImpl streamSvc(svc.getFrontendService().getCatalogue(),
svc.getFrontendService().getScheduler(),
svc.getFrontendService().getCatalogueConnString(),
svc.getFrontendService().getLogContext());
builder.RegisterService(&streamSvc); builder.RegisterService(&streamSvc);
std::unique_ptr <Server> server(builder.BuildAndStart()); std::unique_ptr <Server> server(builder.BuildAndStart());
......
...@@ -81,6 +81,9 @@ public: ...@@ -81,6 +81,9 @@ public:
case cta::admin::HeaderType::ADMIN_LS: case cta::admin::HeaderType::ADMIN_LS:
m_textFormatter.printAdminLsHeader(); m_textFormatter.printAdminLsHeader();
break; break;
case cta::admin::HeaderType::VERSION_CMD:
m_textFormatter.printVersionHeader();
break;
default: default:
// keep compiler happy // keep compiler happy
break; break;
...@@ -144,6 +147,12 @@ public: ...@@ -144,6 +147,12 @@ public:
m_textFormatter.print(adminLsItem); m_textFormatter.print(adminLsItem);
break; break;
} }
case cta::xrd::Data::kVersionItem:
{
const cta::admin::VersionItem& versionItem = m_response.data().version_item();
m_textFormatter.print(versionItem);
break;
}
default: default:
// keep compiler happy // keep compiler happy
break; break;
......
...@@ -28,6 +28,7 @@ ...@@ -28,6 +28,7 @@
#include "ServerDiskInstanceLs.hpp" #include "ServerDiskInstanceLs.hpp"
#include "ServerDriveLs.hpp" #include "ServerDriveLs.hpp"
#include "ServerAdminLs.hpp" #include "ServerAdminLs.hpp"
#include "ServerVersion.hpp"
#include <grpcpp/grpcpp.h> #include <grpcpp/grpcpp.h>
...@@ -51,10 +52,11 @@ class CtaRpcStreamImpl : public cta::xrd::CtaRpcStream::CallbackService { ...@@ -51,10 +52,11 @@ class CtaRpcStreamImpl : public cta::xrd::CtaRpcStream::CallbackService {
public: public:
cta::log::LogContext getLogContext() const { return m_lc; } cta::log::LogContext getLogContext() const { return m_lc; }
// CtaRpcStreamImpl() = delete; // CtaRpcStreamImpl() = delete;
CtaRpcStreamImpl(cta::catalogue::Catalogue &catalogue, cta::Scheduler &scheduler, cta::log::LogContext logContext) : CtaRpcStreamImpl(cta::catalogue::Catalogue &catalogue, cta::Scheduler &scheduler, std::string connstr, cta::log::LogContext logContext) :
m_lc(logContext), m_lc(logContext),
m_catalogue(catalogue), m_catalogue(catalogue),
m_scheduler(scheduler) {} m_scheduler(scheduler),
m_catalogueConnString(connstr) {}
/* CtaAdminServerWriteReactor is what the type of GenericAdminStream could be */ /* CtaAdminServerWriteReactor is what the type of GenericAdminStream could be */
::grpc::ServerWriteReactor<cta::xrd::StreamResponse>* GenericAdminStream(::grpc::CallbackServerContext* context, const cta::xrd::Request* request); ::grpc::ServerWriteReactor<cta::xrd::StreamResponse>* GenericAdminStream(::grpc::CallbackServerContext* context, const cta::xrd::Request* request);
// ::grpc::ServerWriteReactor<cta::xrd::StreamResponse>* TapeLs(::grpc::CallbackServerContext* context, const cta::xrd::Request* request); // ::grpc::ServerWriteReactor<cta::xrd::StreamResponse>* TapeLs(::grpc::CallbackServerContext* context, const cta::xrd::Request* request);
...@@ -64,6 +66,7 @@ class CtaRpcStreamImpl : public cta::xrd::CtaRpcStream::CallbackService { ...@@ -64,6 +66,7 @@ class CtaRpcStreamImpl : public cta::xrd::CtaRpcStream::CallbackService {
cta::log::LogContext m_lc; // <! Provided by the frontendService cta::log::LogContext m_lc; // <! Provided by the frontendService
cta::catalogue::Catalogue &m_catalogue; //!< Reference to CTA Catalogue cta::catalogue::Catalogue &m_catalogue; //!< Reference to CTA Catalogue
cta::Scheduler &m_scheduler; //!< Reference to CTA Scheduler cta::Scheduler &m_scheduler; //!< Reference to CTA Scheduler
std::string m_catalogueConnString; //!< Provided by frontendService
// I do not think a reactor could be a member of this class because it must be reinitialized upon each call // I do not think a reactor could be a member of this class because it must be reinitialized upon each call
// CtaAdminServerWriteReactor *m_reactor; // this will have to be initialized to TapeLs or StorageClassLs or whatever... // CtaAdminServerWriteReactor *m_reactor; // this will have to be initialized to TapeLs or StorageClassLs or whatever...
}; };
...@@ -113,6 +116,8 @@ CtaRpcStreamImpl::GenericAdminStream(::grpc::CallbackServerContext* context, con ...@@ -113,6 +116,8 @@ CtaRpcStreamImpl::GenericAdminStream(::grpc::CallbackServerContext* context, con
return new DriveLsWriteReactor(m_catalogue, m_scheduler, m_lc, request); return new DriveLsWriteReactor(m_catalogue, m_scheduler, m_lc, request);
case cmd_pair(cta::admin::AdminCmd::CMD_ADMIN, cta::admin::AdminCmd::SUBCMD_LS): case cmd_pair(cta::admin::AdminCmd::CMD_ADMIN, cta::admin::AdminCmd::SUBCMD_LS):
return new AdminLsWriteReactor(m_catalogue, m_scheduler, request); return new AdminLsWriteReactor(m_catalogue, m_scheduler, request);
case cmd_pair(cta::admin::AdminCmd::CMD_VERSION, cta::admin::AdminCmd::SUBCMD_NONE):
return new VersionWriteReactor(m_catalogue, m_scheduler, m_catalogueConnString, request);
default: default:
// make the compiler happy maybe and return // make the compiler happy maybe and return
return new TapeLsWriteReactor(m_catalogue, m_scheduler, request); return new TapeLsWriteReactor(m_catalogue, m_scheduler, request);
......
// #include "CtaAdminServer.hpp" // need this for the class CtaAdminServerWriteReactor, nothing else
#include <catalogue/Catalogue.hpp>
#include <scheduler/Scheduler.hpp>
#include "cta_frontend.pb.h"
#include "cta_frontend.grpc.pb.h"
#include <grpcpp/grpcpp.h>
#include "common/dataStructures/LabelFormatSerDeser.hpp"
#include "frontend/common/Version.hpp"
#include "catalogue/SchemaVersion.hpp"
#include "version.h"
namespace cta::frontend::grpc {
class VersionWriteReactor : public ::grpc::ServerWriteReactor<cta::xrd::StreamResponse> /* CtaAdminServerWriteReactor */ {
public:
VersionWriteReactor(cta::catalogue::Catalogue &catalogue, cta::Scheduler &scheduler, const std::string &catalogueConnString, const cta::xrd::Request* request);
void OnWriteDone(bool ok) override {
std::cout << "In VersionWriteReactor, we are inside OnWriteDone" << std::endl;
if (!ok) {
std::cout << "Unexpected failure in OnWriteDone" << std::endl;
Finish(Status(::grpc::StatusCode::UNKNOWN, "Unexpected Failure in OnWriteDone"));
}
std::cout << "Calling NextWrite inside server's OnWriteDone" << std::endl;
NextWrite();
}
void OnDone() override;
void NextWrite();
private:
bool m_isHeaderSent;
frontend::Version m_client_versions;
frontend::Version m_server_versions;
std::string m_catalogue_conn_string;
std::string m_catalogue_version;
std::optional<std::string> m_schedulerBackendName;
bool m_is_upgrading;
cta::xrd::StreamResponse m_response;
};
void VersionWriteReactor::OnDone() {
std::cout << "In VersionWriteReactor::OnDone(), about to delete this object" << std::endl;
delete this;
}
VersionWriteReactor::VersionWriteReactor(cta::catalogue::Catalogue &catalogue, cta::Scheduler &scheduler, const std::string& catalogueConnString, const cta::xrd::Request* request)
: m_isHeaderSent(false),
m_catalogue_conn_string(catalogueConnString),
m_catalogue_version(catalogue.Schema()->getSchemaVersion().getSchemaVersion<std::string>()),
m_schedulerBackendName(scheduler.getSchedulerBackendName()),
m_is_upgrading(catalogue.Schema()->getSchemaVersion().getStatus<catalogue::SchemaVersion::Status>() ==
catalogue::SchemaVersion::Status::UPGRADING) {
std::cout << "In VersionWriteReactor constructor, just entered!" << std::endl;
m_server_versions.ctaVersion = CTA_VERSION;
m_server_versions.protobufTag = XROOTD_SSI_PROTOBUF_INTERFACE_VERSION;
m_client_versions.ctaVersion = request->admincmd().client_version();
m_client_versions.protobufTag = request->admincmd().protobuf_tag();
NextWrite();
}
void VersionWriteReactor::NextWrite() {
std::cout << "In VersionWriteReactor::NextWrite(), just entered!" << std::endl;
m_response.Clear();
// is this the first item? Then write the header
if (!m_isHeaderSent) {
cta::xrd::Response *header = new cta::xrd::Response();
std::cout << "header is not sent, sending the header" << std::endl;
header->set_type(cta::xrd::Response::RSP_SUCCESS);
header->set_show_header(cta::admin::HeaderType::VERSION_CMD);
m_response.set_allocated_header(header); // now the message takes ownership of the allocated object, we don't need to free header
m_isHeaderSent = true;
std::cout << "about to call StartWrite on the server side" << std::endl;
StartWrite(&m_response); // this will trigger the OnWriteDone method
std::cout << "called StartWrite on the server" << std::endl;
return; // because we'll be called in a loop by OnWriteDone
} else {
cta::xrd::Data* data = new cta::xrd::Data();
cta::admin::VersionItem *version = data->mutable_version_item();
auto client_version = version->mutable_client_version();
client_version->set_cta_version(m_client_versions.ctaVersion);
client_version->set_xrootd_ssi_protobuf_interface_version(m_client_versions.protobufTag);
auto server_version = version->mutable_server_version();
server_version->set_cta_version(m_server_versions.ctaVersion);
server_version->set_xrootd_ssi_protobuf_interface_version(m_server_versions.protobufTag);
version->set_catalogue_connection_string(m_catalogue_conn_string);
version->set_catalogue_version(m_catalogue_version);
version->set_is_upgrading(m_is_upgrading);
version->set_scheduler_backend_name(m_schedulerBackendName.value());
std::cout << "Calling StartWrite on the server, with some data this time" << std::endl;
m_response.set_allocated_data(data);
StartWrite(&m_response);
std::cout << "Finishing the call on the server side" << std::endl;
// Finish the call
Finish(::grpc::Status::OK);
}
}
} // namespace cta::frontend::grpc
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment