diff --git a/src/configs/input-hw.json b/src/configs/input-hw.json index 4f1ebafe8f3be3749388ef10b5f6ca35563ff667..6ec89242c2db6ca4fd9e796a713e6767506ac191 100644 --- a/src/configs/input-hw.json +++ b/src/configs/input-hw.json @@ -6,7 +6,11 @@ "datasinks":{ "Console":{ "sinktype": "ConsoleSink" - } + }, + "File":{ + "sinktype": "CSVSink", + "directory": "myOutputData" + } }, "devices": { "PS" : { diff --git a/src/libDataSink/CMakeLists.txt b/src/libDataSink/CMakeLists.txt index 602e2bd35b0a8a71bccbd0c8a473eac1e5ffb5fb..fa757053b22e29e4e7526b18658c62cc74f02b6e 100644 --- a/src/libDataSink/CMakeLists.txt +++ b/src/libDataSink/CMakeLists.txt @@ -3,6 +3,7 @@ target_sources(DataSink PRIVATE IDataSink.cpp ConsoleSink.cpp + CSVSink.cpp DataSinkRegistry.cpp ) target_link_libraries(DataSink PRIVATE Utils) diff --git a/src/libDataSink/CSVSink.cpp b/src/libDataSink/CSVSink.cpp new file mode 100644 index 0000000000000000000000000000000000000000..414ab539d3cc215583870b34881e70d97db36722 --- /dev/null +++ b/src/libDataSink/CSVSink.cpp @@ -0,0 +1,265 @@ +#include "CSVSink.h" + +#include <algorithm> +#include <iostream> +#include <iomanip> + +#include "DataSinkRegistry.h" +REGISTER_DATASINK(CSVSink) + +CSVSink::CSVSink(const std::string& name) + : IDataSink(name) +{ } + +void CSVSink::setConfiguration(const nlohmann::json& config) +{ + for (const auto &kv : config.items()) + { + if(kv.key()=="directory") + { + m_directory=kv.value(); + } + } +} + +void CSVSink::setTag(const std::string& name, const std::string& value) +{ + std::string newTag = checkCSVEncoding(name); + checkTag(newTag); + m_tagsString[newTag]=checkCSVEncoding(value); +} + +void CSVSink::setTag(const std::string& name, int8_t value) +{ + std::string newTag = checkCSVEncoding(name); + checkTag(newTag); + m_tagsInt8[newTag]=value; +} + +void CSVSink::setTag(const std::string& name, int32_t value) +{ + std::string newTag = checkCSVEncoding(name); + checkTag(newTag); + m_tagsInt32[newTag]=value; +} + +void CSVSink::setTag(const std::string& name, int64_t value) +{ + std::string newTag = checkCSVEncoding(name); + checkTag(newTag); + m_tagsInt64[newTag]=value; +} + +void CSVSink::setTag(const std::string& name, double value) +{ + std::string newTag = checkCSVEncoding(name); + checkTag(newTag); + m_tagsDouble[newTag]=value; +} + +void CSVSink::startMeasurement(const std::string& measurement, std::chrono::time_point<std::chrono::system_clock> time) +{ + // Clean-up last measurement + m_fields.clear(); + m_fieldsString.clear(); + m_fieldsInt8 .clear(); + m_fieldsInt32 .clear(); + m_fieldsInt64 .clear(); + m_fieldsDouble.clear(); + + // State + m_finalSchema=false; + m_inMeasurement=true; + + m_measurement=measurement; + + // Convert the timestamp to ms + m_timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(time.time_since_epoch()).count(); + + std::string fileName = m_directory+"/"+m_measurement+".csv"; + + // Check if file exists + std::ifstream rfile; + rfile.open(fileName); + m_fileExist = !(rfile.fail()); + rfile.close(); + + // Open CSV file + m_ofile.open (fileName, std::ios_base::app); + if (!(m_ofile.is_open())) + throw std::runtime_error("Could not open file: "+fileName); +} + +void CSVSink::setField(const std::string& name, const std::string& value) +{ + std::string newField = checkCSVEncoding(name); + addField(newField); + m_fieldsString[newField]=checkCSVEncoding(value); +} + +void CSVSink::setField(const std::string& name, int8_t value) +{ + std::string newField = checkCSVEncoding(name); + addField(newField); + m_fieldsInt8[newField]=value; +} + +void CSVSink::setField(const std::string& name, int32_t value) +{ + std::string newField = checkCSVEncoding(name); + addField(newField); + m_fieldsInt32[newField]=value; +} + +void CSVSink::setField(const std::string& name, int64_t value) +{ + std::string newField = checkCSVEncoding(name); + addField(newField); + m_fieldsInt64[newField]=value; +} + +void CSVSink::setField(const std::string& name, double value) +{ + std::string newField = checkCSVEncoding(name); + addField(newField); + m_fieldsDouble[newField]=value; +} + +void CSVSink::recordPoint() +{ + // + // Print the header + if(!(m_fileExist)) + { // First time recording point in CSV file, print header + printHeader(m_ofile); + } + + // Print timestamp + m_ofile << m_timestamp; + + // + // Print the tags + for(const std::string& tag : m_tags) + { + m_ofile << ","; + + if(m_tagsString.count(tag)>0) + m_ofile << m_tagsString[tag]; + + if(m_tagsInt8.count(tag)>0) + m_ofile << m_tagsInt8 [tag]; + + if(m_tagsInt32.count(tag)>0) + m_ofile << m_tagsInt32 [tag]; + + if(m_tagsInt64.count(tag)>0) + m_ofile << m_tagsInt64 [tag]; + + if(m_tagsDouble.count(tag)>0) + m_ofile << m_tagsDouble[tag]; + + } + + // + // Print the fields + for(const std::string& field : m_fields) + { + m_ofile << ","; + + if(m_fieldsString.count(field)>0) + m_ofile << m_fieldsString[field]; + + if(m_fieldsInt8.count(field)>0) + m_ofile << m_fieldsInt8[field]; + + if(m_fieldsInt32.count(field)>0) + m_ofile << m_fieldsInt32[field]; + + if(m_fieldsInt64.count(field)>0) + m_ofile << m_fieldsInt64[field]; + + if(m_fieldsDouble.count(field)>0) + m_ofile << m_fieldsDouble[field]; + } + m_ofile << std::endl; + + m_finalSchema=true; +} + +void CSVSink::endMeasurement() +{ + m_inMeasurement=false; + m_ofile.close(); +} + +std::string CSVSink::checkCSVEncoding(const std::string& name) +{ + std::string newstr = name; + + // check if the name contains a comma + if (name.find(',') != std::string::npos) + { + size_t start_pos = 0; + static const std::string sold = "\""; + static const std::string snew = "\"\""; + + // check if string has quotation mark, if so replace by double quotation marks + while((start_pos = newstr.find(sold, start_pos)) != std::string::npos) { + newstr.replace(start_pos, sold.length(), snew); + start_pos += snew.length(); + } + + // enclose the string in double quotes + newstr = "\""+newstr+"\""; + } + return newstr; +} + + +void CSVSink::checkTag(const std::string& name) +{ + if(checkReserved(name)) + throw std::runtime_error("Tag name \""+name+"\" is reserved."); + + + if(std::find(m_tags.begin(), m_tags.end(), name)==m_tags.end()) + { + if(m_inMeasurement) + throw std::runtime_error("CSVSink: Cannot change tag during measurement."); + + m_tags.push_back(name); + } + +} + +void CSVSink::addField(const std::string& name) +{ + if(checkReserved(name)) + throw std::runtime_error("Tag name \""+name+"\" is reserved."); + + if(std::find(m_fields.begin(), m_fields.end(), name)==m_fields.end()) + { + if(m_finalSchema) + throw std::runtime_error("CSVSink: Cannot add new fields after the first recordPoint."); + + m_fields.push_back(name); + } +} + +void CSVSink::printHeader(std::ofstream& ofile) +{ + ofile << "time"; + + // Print out the tags + for(const std::string& tag : m_tags) + { + ofile << "," << tag ; + } + + // Print field names + for(const std::string& field : m_fields) + { + ofile << "," << field; + } + ofile << std::endl; +} diff --git a/src/libDataSink/CSVSink.h b/src/libDataSink/CSVSink.h new file mode 100644 index 0000000000000000000000000000000000000000..fe795cfcdd159e9ef226d928de8827b85d1f43c1 --- /dev/null +++ b/src/libDataSink/CSVSink.h @@ -0,0 +1,146 @@ +#ifndef CSVSINK_H +#define CSVSINK_H + +#include <vector> +#include <unordered_map> +#include <fstream> + +#include "IDataSink.h" + +//! \brief Data sink that saves to CSV files with comma delimiter +/** + * When a data point is recorded, the following are saved in this order, separated by a comma: + * - timestamp (under column heading time) + * - tags + * - fields + * + * The header (time, tag names, and field names) are written to a file when a file is first created. + */ +class CSVSink : public IDataSink +{ +public: + CSVSink(const std::string& name); + + /** \brief Configure file sink based on JSON object + * + * Valid keys: + * - `directory`: output directory where files are saved + * + * \param config JSON configuration + */ + virtual void setConfiguration(const nlohmann::json& config); + + virtual void setTag(const std::string& name, const std::string& value); + virtual void setTag(const std::string& name, int8_t value); + virtual void setTag(const std::string& name, int32_t value); + virtual void setTag(const std::string& name, int64_t value); + virtual void setTag(const std::string& name, double value); + + virtual void startMeasurement(const std::string& measurement, std::chrono::time_point<std::chrono::system_clock> time); + virtual void setField(const std::string& name, const std::string& value); + virtual void setField(const std::string& name, int8_t value); + virtual void setField(const std::string& name, int32_t value); + virtual void setField(const std::string& name, int64_t value); + virtual void setField(const std::string& name, double value); + + //! \brief Save data point to CSV + /** + * Saves the fields in the current data points in csv file. If the header + * needs to be written to file, it is also done here. + * The timestamp will be uploaded with milliseconds precision. + */ + virtual void recordPoint(); + virtual void endMeasurement(); + +private: + //! \brief Check if string contains CSV character + /** + * Check if the string contains a comma or double quote. + * + * If the string contains a comma, return the string in double-quotes. + * If the string contains quotes and a comma, the quotation mark will be escaped. + * + * \param name string to be check. + * + * return encoded string if string contains a comma. + * else return original name. + */ + std::string checkCSVEncoding(const std::string& name); + + //! \brief Checks whether a tag can be added + /** + * If tag is not defined, then it is added to the list. + * + * Marks tags as dirty. + * + * Error checking is performed. An exception is thrown if + * - Currently performing a measurement + * + * \param name Field name + */ + void checkTag(const std::string& name); + + //! \brief Adds a new field column if it already does not exists. + /** + * If field is already defined, nothing is done. + * + * Error checking is performed. An exception is thrown if + * - called after the first `recordPoint` in a measurement + * + * \param name Field name + */ + void addField(const std::string& name); + + //! \brief write the header + /** + * write the header in the format. + * Tag1 Tag 2 FIELD1 FIELD2 FIELD3 + * + * \param ofile file to save the measurement + */ + void printHeader(std::ofstream& ofile); + + // + // Directory where files are saved + std::string m_directory; + + // + // State + + // timestamp + unsigned long long m_timestamp; + + // Output CSV file + std::ofstream m_ofile; + + // Name of the current measurement (valid onyl if performing one) + std::string m_measurement; + + // Currently performing a measurement + bool m_inMeasurement=false; + + // File exists + bool m_fileExist; + + // + // Data store for fields + bool m_finalSchema=false; // The measurement schema is locked + std::vector<std::string> m_fields; + std::unordered_map<std::string, std::string> m_fieldsString; + std::unordered_map<std::string, int8_t > m_fieldsInt8 ; + std::unordered_map<std::string, int32_t > m_fieldsInt32; + std::unordered_map<std::string, int64_t > m_fieldsInt64; + std::unordered_map<std::string, double > m_fieldsDouble; + + // + // Data store for tags + std::vector<std::string> m_tags; + std::unordered_map<std::string, std::string> m_tagsString; + std::unordered_map<std::string, int8_t > m_tagsInt8 ; + std::unordered_map<std::string, int32_t > m_tagsInt32; + std::unordered_map<std::string, int64_t > m_tagsInt64; + std::unordered_map<std::string, double > m_tagsDouble; + +}; + +#endif // CSVSINK_H diff --git a/src/tools/ps_monitor.cpp b/src/tools/ps_monitor.cpp index 3fd3a58b8353e0982aa211e566aa6b52879ee94b..525b0729e8623f035b83a0ed8195a0f7bdb68419 100644 --- a/src/tools/ps_monitor.cpp +++ b/src/tools/ps_monitor.cpp @@ -85,8 +85,8 @@ int main(int argc, char*argv[]) logger(logDEBUG) << " Config file: " << configFile; logger(logDEBUG) << " sink name: " << sinkName; logger(logDEBUG) << " PS channel: " << channelName; - - // Crease sink + + // Create sink DataSinkConf ds; ds.setHardwareConfig(configFile); std::shared_ptr<IDataSink> sink = ds.getDataSink(sinkName);