DiskWriter.h 6.24 KB
Newer Older
1
2
3
4
5
#ifndef _evb_bu_DiskWriter_h_
#define _evb_bu_DiskWriter_h_

#include <boost/filesystem/convenience.hpp>

6
7
#include <atomic>
#include <cstdint>
8
#include <curl/curl.h>
9
#include <map>
10
11
#include <memory>
#include <mutex>
12
#include <string>
13
#include <vector>
14

15
#include "cgicc/HTMLClasses.h"
16
#include "evb/bu/Configuration.h"
17
#include "evb/bu/Event.h"
18
#include "evb/bu/FileInfo.h"
19
20
#include "evb/bu/StreamHandler.h"
#include "evb/InfoSpaceItems.h"
21
#include "evb/OneToOneQueue.h"
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#include "toolbox/lang/Class.h"
#include "toolbox/task/Action.h"
#include "toolbox/task/WorkLoop.h"
#include "xdata/UnsignedInteger32.h"


namespace evb {

  class BU;

  namespace bu { // namespace evb::bu

    class ResourceManager;
    class StateMachine;

    /**
     * \ingroup xdaqApps
     * \brief Write events to disk
     */

    class DiskWriter : public toolbox::lang::Class
    {
    public:

      DiskWriter
      (
        BU*,
49
        std::shared_ptr<ResourceManager>
50
51
      );

52
53
      ~DiskWriter();

54
      /**
55
       * Process the event
56
       */
57
58
59
      void handleEvent(EventPtr&&);

      /**
60
       * Get the next file info.
61
62
       * Return false if no event is available
       */
63
      bool getNextFileInfo(FileInfoPtr&);
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99

      /**
       * Append the info space items to be published in the
       * monitoring info space to the InfoSpaceItems
       */
      void appendMonitoringItems(InfoSpaceItems&);

      /**
       * Update all values of the items put into the monitoring
       * info space. The caller has to make sure that the info
       * space where the items reside is locked and properly unlocked
       * after the call.
       */
      void updateMonitoringItems();

      /**
       * Configure
       */
      void configure();

      /**
       * Start processing messages
       */
      void startProcessing(const uint32_t runNumber);

      /**
       * Drain messages
       */
      void drain();

      /**
       * Stop processing messages
       */
      void stopProcessing();

      /**
100
       * Return monitoring information as cgicc snipped
101
       */
102
103
      cgicc::div getHtmlSnipped() const;

104
105
106
107
      /**
       * Return the number of lumi sections since the start of the run
       */
      uint32_t getNbLumiSections() const;
108
109


110
    private:
111

112
113
114
      struct LumiInfo
      {
        const uint32_t lumiSection;
115
        uint32_t totalEvents;
116
117
        uint32_t nbEvents;
        uint32_t nbEventsWritten;
118
        uint64_t nbBytesWritten;
119
        uint32_t nbIncompleteEvents;
120
121
        uint32_t fileCount;
        uint32_t index;
122
        bool isEmpty;
123

124
        LumiInfo(const uint32_t ls)
125
126
          : lumiSection(ls),totalEvents(0),nbEvents(0),nbEventsWritten(0),nbBytesWritten(0),
            nbIncompleteEvents(0),fileCount(0),index(0),isEmpty(false) {};
127
128
129

        bool isComplete() const
        { return isEmpty || (nbEvents > 0 && nbEvents == nbEventsWritten+nbIncompleteEvents); }
130
      };
131
      using LumiInfoPtr = std::unique_ptr<LumiInfo>;
132
      using LumiStatistics = std::map<uint32_t,LumiInfoPtr>;
133
      LumiStatistics lumiStatistics_;
134
      std::mutex lumiStatisticsMutex_;
135

136
137
      bool getReadyFileInfo(FileInfoPtr&);
      bool populateFileInfo(FileInfoPtr&);
138
      void resetMonitoringCounters();
139
      void startFileAccounting();
140
      bool idle() const;
141
142
143
144
145
146
      bool fileAccounting(toolbox::task::WorkLoop*);
      bool processLumiSections(const bool completeLumiSectionsOnly);
      bool closeStaleFiles();
      bool closeLumiSections();
      bool moveFiles();
      void handleRawDataFile(const FileInfoPtr&);
147
148
149
      LumiStatistics::iterator getLumiStatistics(const uint32_t lumiSection);
      void createDir(const boost::filesystem::path&) const;
      void removeDir(const boost::filesystem::path&) const;
150
      void closeAnyOldRuns() const;
151
152
      void populateHltdDirectory(const boost::filesystem::path& runDir) const;
      void getHLTmenu(const boost::filesystem::path& tmpDir) const;
153
      void writeHLTinfo(const boost::filesystem::path& tmpDir) const;
154
      void writeBlacklist(const boost::filesystem::path& tmpDir) const;
155
      void writeWhitelist(const boost::filesystem::path& tmpDir) const;
156
      uint16_t writeHostList(const boost::filesystem::path&, const xdata::String& hosts) const;
157
      void retrieveFromURL(CURL*, const std::string& url, const boost::filesystem::path& output) const;
158
      void createLockFile(const boost::filesystem::path&) const;
159
160
161
162
      void writeEoLS(const LumiInfoPtr&) const;
      void writeEoR() const;
      void defineEoLS(const boost::filesystem::path& jsdDir);
      void defineEoR(const boost::filesystem::path& jsdDir);
163
164

      BU* bu_;
165
      std::shared_ptr<ResourceManager> resourceManager_;
166
      const ConfigurationPtr& configuration_;
167
168
169

      const uint32_t buInstance_;
      uint32_t runNumber_;
170
      uint64_t tmpFileIndex_;
171
172
173

      boost::filesystem::path runRawDataDir_;
      boost::filesystem::path runMetaDataDir_;
174
      boost::filesystem::path rawDataDefFile_;
175
176
177
      boost::filesystem::path eolsDefFile_;
      boost::filesystem::path eorDefFile_;

178
179
180
181
182
      using EventFIFO = OneToOneQueue<EventPtr>;
      EventFIFO eventFIFO_;
      mutable std::mutex eventFIFOmutex_;

      using FileInfoMap = std::unordered_map<uint32_t,FileInfoPtr>;
183
184
185
186
187
      FileInfoMap fileInfoMap_;
      mutable std::mutex fileInfoMapMutex_;

      using FileInfoFIFO = OneToOneQueue<FileInfoPtr>;
      FileInfoFIFO fileInfoFIFO_;
188
      mutable std::mutex fileInfoFIFOmutex_;
189
190

      using StreamHandlers = std::vector<StreamHandlerPtr>;
191
192
      StreamHandlers streamHandlers_;

193
194
      toolbox::task::WorkLoop* fileAccountingWorkLoop_;
      toolbox::task::ActionSignature* fileAccountingAction_;
195
      volatile std::atomic<bool> doProcessing_;
196
      volatile std::atomic<bool> active_;
197
198
199
200
201
202

      struct DiskWriterMonitoring
      {
        uint32_t nbFiles;
        uint32_t nbEventsWritten;
        uint32_t nbLumiSections;
203
        uint32_t lastLumiSection;
204
205
        uint32_t currentLumiSection;
      } diskWriterMonitoring_;
206
      mutable std::mutex diskWriterMonitoringMutex_;
207
208
209

      xdata::UnsignedInteger32 nbFilesWritten_;
      xdata::UnsignedInteger32 nbLumiSections_;
210
      xdata::UnsignedInteger32 currentLumiSection_;
211
212
213
214
215

    };

  } } // namespace evb::bu

216

217
218
219
220
221
222
223
224
225
#endif // _evb_bu_DiskWriter_h_


/// emacs configuration
/// Local Variables: -
/// mode: c++ -
/// c-basic-offset: 2 -
/// indent-tabs-mode: nil -
/// End: -