DiskWriter.cc 28.6 KB
Newer Older
1
#include <errno.h>
2
#include <iomanip>
3
#include <memory>
4
#include <regex>
5
6
#include <stdio.h>
#include <sstream>
7
#include <string>
8
#include <sys/stat.h>
9
#include <sys/types.h>
10
11
12
13

#include "evb/BU.h"
#include "evb/bu/DiskWriter.h"
#include "evb/bu/ResourceManager.h"
14
#include "evb/bu/RUproxy.h"
15
16
#include "evb/Exception.h"
#include "toolbox/task/WorkLoopFactory.h"
17
18
#include "xdata/String.h"
#include "xdata/Vector.h"
19
20
21
22
23


evb::bu::DiskWriter::DiskWriter
(
  BU* bu,
24
  std::shared_ptr<ResourceManager> resourceManager
25
) :
26
27
28
29
  bu_(bu),
  resourceManager_(resourceManager),
  configuration_(bu->getConfiguration()),
  buInstance_(bu->getApplicationDescriptor()->getInstance()),
30
  tmpFileIndex_(0),
31
  eventFIFO_(bu,"eventFIFO"),
32
  fileInfoFIFO_(bu,"fileInfoFIFO"),
33
  doProcessing_(false),
34
  active_(false)
35
36
{
  resetMonitoringCounters();
37
  startFileAccounting();
38
  umask(0);
39
40
41
}


42
43
evb::bu::DiskWriter::~DiskWriter()
{
44
45
  streamHandlers_.clear();

46
47
  if ( fileAccountingWorkLoop_ && fileAccountingWorkLoop_->isActive() )
    fileAccountingWorkLoop_->cancel();
48
49
50
}


51
void evb::bu::DiskWriter::handleEvent(EventPtr&& event)
52
{
53
54
55
56
57
58
  if ( configuration_->dropEventData )
  {
    resourceManager_->discardEvent(event);
  }
  else
  {
59
    std::lock_guard<std::mutex> guard(eventFIFOmutex_);
60

61
62
63
64
65
66
67
    eventFIFO_.enqWait(std::move(event));
  }
}


bool evb::bu::DiskWriter::getNextFileInfo(FileInfoPtr& fileInfo)
{
68
  return ( getReadyFileInfo(fileInfo) || populateFileInfo(fileInfo) );
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
}


bool evb::bu::DiskWriter::getReadyFileInfo(FileInfoPtr& fileInfo)
{
    std::lock_guard<std::mutex> guard(fileInfoFIFOmutex_);

    return fileInfoFIFO_.deq(fileInfo);
}


bool evb::bu::DiskWriter::populateFileInfo(FileInfoPtr& fileInfo)
{
  std::lock_guard<std::mutex> guard(fileInfoMapMutex_);

  bool haveCompleteFileInfo = false;
  EventPtr event;

  while ( !haveCompleteFileInfo && eventFIFO_.deq(event) )
  {
89
    const uint32_t lumiSection = event->getEvBid().lumiSection();
90
    FileInfoMap::iterator pos = fileInfoMap_.find(lumiSection);
91

92
    if ( pos == fileInfoMap_.end() )
93
    {
94
95
96
97
98
99
100
      pos = fileInfoMap_.emplace_hint(pos,
                                      lumiSection,
                                      std::make_unique<FileInfo>(
                                        (runRawDataDir_ / std::to_string(tmpFileIndex_++)).string()
                                      ));
      pos->second->events.reserve(configuration_->maxEventsPerFile);
      pos->second->header.lumiSection = lumiSection;
101
102
103
104
105
106
#ifdef EVB_USE_LOCS_VECTOR
      iovec fileHeader;
      fileHeader.iov_base = &(pos->second->header);
      fileHeader.iov_len = sizeof(FileHeader);
      pos->second->dataLocations.emplace_back(std::move(fileHeader));
#endif
107
108
    }

109
110
    pos->second->header.eventCount++;
    pos->second->header.fileSize += sizeof(EventInfo) + event->getEventInfo()->eventSize();
111
112
113
114
115
116
117
118
119
#ifdef EVB_USE_LOCS_VECTOR
    iovec eventHeader;
    eventHeader.iov_base = event->getEventInfo().get();
    eventHeader.iov_len = sizeof(EventInfo);
    pos->second->dataLocations.emplace_back(std::move(eventHeader));

    const DataLocations& frags = event->getDataLocations();
    pos->second->dataLocations.insert(pos->second->dataLocations.end(),frags.begin(),frags.end());
#endif
120
    pos->second->events.emplace_back(std::move(event));
121

122
    if ( pos->second->header.eventCount == configuration_->maxEventsPerFile )
123
    {
124
125
126
      fileInfo = std::move(pos->second);
      fileInfoMap_.erase(pos);
      haveCompleteFileInfo = true;
127
    }
128
129
  }

130
  return haveCompleteFileInfo;
131
132
133
134
135
}


void evb::bu::DiskWriter::startProcessing(const uint32_t runNumber)
{
136
137
138
  if ( configuration_->closeOldRuns )
    closeAnyOldRuns();

139
140
  resetMonitoringCounters();
  runNumber_ = runNumber;
141
  tmpFileIndex_ = 0;
142

143
144
145
146
147
148
149
150
151
152
  std::ostringstream runDir;
  runDir << "run" << std::setfill('0') << std::setw(6) << runNumber_;

  boost::filesystem::path rawRunDir( configuration_->rawDataDir.value_ );
  rawRunDir /= runDir.str();
  runRawDataDir_ = rawRunDir / "open";

  runMetaDataDir_ = configuration_->metaDataDir.value_;
  runMetaDataDir_ /= runDir.str();

153
  fileInfoMap_.clear();
154
  streamHandlers_.clear();
155
  lumiStatistics_.clear();
156

157
158
159
160
  if ( ! configuration_->dropEventData )
  {
    createDir(runRawDataDir_);
    createDir(runMetaDataDir_);
161

162
    populateHltdDirectory(rawRunDir);
163
    createLockFile(rawRunDir);
164

165
166
167
168
169
    const boost::filesystem::path jsdDir = runMetaDataDir_ / configuration_->jsdDirName.value_;
    createDir(jsdDir);
    defineEoLS(jsdDir);
    defineEoR(jsdDir);

170
171
172
173
174
    for (uint16_t i=0; i < configuration_->numberOfDiskWriters; ++i)
    {
      streamHandlers_.emplace_back(std::make_unique<StreamHandler>(
                                     bu_,
                                     this,
175
                                     resourceManager_,
176
177
                                     std::to_string(i)));
    }
178
  }
179
180
181

  doProcessing_ = true;
  fileAccountingWorkLoop_->submit(fileAccountingAction_);
182
183
184
}


185
186
bool evb::bu::DiskWriter::idle() const
{
187
  if ( !eventFIFO_.empty() || !fileInfoFIFO_.empty() ) return false;
188
189
190
191
192
193
194
195
196
197

  for (auto const& handler : streamHandlers_)
  {
    if ( !handler->idle() ) return false;
  }

  return true;
}


198
void evb::bu::DiskWriter::drain()
199
{
200
  while ( !idle() || active_ ) ::usleep(1000);
201
}
202
203
204
205


void evb::bu::DiskWriter::stopProcessing()
{
206
207
  doProcessing_ = false;

208
  while ( active_ || !eventFIFO_.empty() ) ::usleep(1000);
209
210
211
212

  processLumiSections(false);

  if ( ! fileInfoMap_.empty() )
213
  {
214
215
216
217
218
219
220
    std::ostringstream msg;
    msg << "There are unaccounted events for the following lumi sections:";
    for (auto const& fileInfo : fileInfoMap_)
    {
      msg << " " << fileInfo.first;
    }
    XCEPT_RAISE(exception::DiskWriting, msg.str());
221
222
  }

223
  while ( ! fileInfoFIFO_.empty() ) ::usleep(1000);
224

225
226
227
228
  for (auto const& handler : streamHandlers_)
  {
    handler->stopProcessing();
  }
229

230
  moveFiles();
231
  closeLumiSections();
232

233
  if ( ! lumiStatistics_.empty() )
234
  {
235
236
    std::ostringstream msg;
    msg << "There are unaccounted files for the following lumi sections:";
237
    for (auto const& stat : lumiStatistics_)
238
    {
239
      msg << " " << stat.first;
240
    }
241
    XCEPT_RAISE(exception::DiskWriting, msg.str());
242
243
244
  }

  streamHandlers_.clear();
245

246
  if ( !configuration_->dropEventData )
247
  {
248
249
250
251
252
253
254
255
    writeEoR();
    removeDir(runRawDataDir_);

    if ( configuration_->deleteRawDataFiles )
    {
      removeDir(runRawDataDir_.parent_path());
      removeDir(runMetaDataDir_);
    }
256
  }
257
258
259
}


260
void evb::bu::DiskWriter::startFileAccounting()
261
262
263
{
  try
  {
264
265
    fileAccountingWorkLoop_ =
      toolbox::task::getWorkLoopFactory()->getWorkLoop(bu_->getIdentifier("fileAccounting"), "waiting");
266

267
268
    if ( !fileAccountingWorkLoop_->isActive() )
      fileAccountingWorkLoop_->activate();
269

270
    fileAccountingAction_ =
271
      toolbox::task::bind(this,
272
273
                          &evb::bu::DiskWriter::fileAccounting,
                          bu_->getIdentifier("fileAccountingAction"));
274
275
276
  }
  catch(xcept::Exception& e)
  {
277
    std::string msg = "Failed to start lumi accounting workloop";
278
279
280
281
282
    XCEPT_RETHROW(exception::WorkLoop, msg, e);
  }
}


283
bool evb::bu::DiskWriter::fileAccounting(toolbox::task::WorkLoop* wl)
284
{
285
286
  if ( ! doProcessing_ ) return false;

287
  active_ = true;
288
289
290

  try
  {
291
292
293
294
295
296
    while (
      processLumiSections(true) ||
      closeStaleFiles() ||
      moveFiles() ||
      closeLumiSections()
    ) {};
297
298
299
  }
  catch(xcept::Exception& e)
  {
300
    active_ = false;
301
    bu_->getStateMachine()->processFSMEvent( Fail(e) );
302
303
304
  }
  catch(std::exception& e)
  {
305
    active_ = false;
306
    XCEPT_DECLARE(exception::DiskWriting,
307
                  sentinelException, e.what());
308
    bu_->getStateMachine()->processFSMEvent( Fail(sentinelException) );
309
310
311
  }
  catch(...)
  {
312
    active_ = false;
313
    XCEPT_DECLARE(exception::DiskWriting,
314
                  sentinelException, "unkown exception");
315
    bu_->getStateMachine()->processFSMEvent( Fail(sentinelException) );
316
317
  }

318
  active_ = false;
319
320
321
322
323
324
325

  ::usleep(1000);

  return doProcessing_;
}


326
bool evb::bu::DiskWriter::processLumiSections(const bool completeLumiSectionsOnly)
327
{
328
  bool workDone = false;
329
330
  ResourceManager::LumiSectionAccountPtr lumiSectionAccount;

331
  while ( resourceManager_->getNextLumiSectionAccount(lumiSectionAccount,completeLumiSectionsOnly) )
332
  {
333
334
    workDone = true;

335
336
    if ( configuration_->dropEventData ) continue;

337
338
    LumiStatistics::iterator lumiStatistics = getLumiStatistics(lumiSectionAccount->lumiSection);
    if ( lumiStatistics->second->nbEvents > 0 )
339
340
341
342
343
    {
      std::ostringstream msg;
      msg << "Got a duplicated account for lumi section " << lumiSectionAccount->lumiSection;
      XCEPT_RAISE(exception::EventOrder, msg.str());
    }
344
345
    if ( lumiSectionAccount->nbEvents == 0 )
    {
346
      lumiStatistics->second->isEmpty = true;
347
348
349
    }
    else
    {
350
351
352
353
354
355
356
357
358
359
360
361
362
363
      lumiStatistics->second->nbEvents = lumiSectionAccount->nbEvents;
      lumiStatistics->second->nbIncompleteEvents = lumiSectionAccount->nbIncompleteEvents;
    }
    lumiStatistics->second->totalEvents = bu_->getTotalEventsInLumiSection(lumiSectionAccount->lumiSection);

    {
      std::lock_guard<std::mutex> guard(fileInfoMapMutex_);

      FileInfoMap::iterator fileInfo = fileInfoMap_.find(lumiSectionAccount->lumiSection);
      if ( fileInfo != fileInfoMap_.end() )
      {
        fileInfoFIFO_.enqWait(std::move(fileInfo->second));
        fileInfoMap_.erase(fileInfo);
      }
364
365
366
    }
  }

367
368
  return workDone;
}
369

370
371
372
373
374
375
376
377
378
379
380
381

bool evb::bu::DiskWriter::closeStaleFiles()
{
  bool workDone = false;

  std::lock_guard<std::mutex> guard(fileInfoMapMutex_);

  FileInfoMap::iterator fileInfo = fileInfoMap_.begin();

  while ( fileInfo != fileInfoMap_.end() )
  {
    if ( fileInfo->second->getFileAge() > configuration_->maxFileAgeSeconds )
382
    {
383
384
385
      fileInfoFIFO_.enqWait(std::move(fileInfo->second));
      fileInfoMap_.erase(fileInfo++);
      workDone = true;
386
    }
387
    else
388
    {
389
      ++fileInfo;
390
    }
391
392
393
394
  }

  return workDone;
}
395

396
397
398
399
400
401
402
403

bool evb::bu::DiskWriter::closeLumiSections()
{
  bool workDone = false;

  LumiStatistics::iterator it = lumiStatistics_.begin();
  while ( it != lumiStatistics_.end() )
  {
404
    if ( it->second->isComplete() )
405
    {
406
      workDone = true;
407
408
      writeEoLS(it->second);

409
      {
410
        std::lock_guard<std::mutex> guard(diskWriterMonitoringMutex_);
411
412

        if ( it->second->nbEventsWritten > 0 )
413
          diskWriterMonitoring_.nbLumiSections++;
414
415
416
        if ( diskWriterMonitoring_.currentLumiSection < it->second->lumiSection )
          diskWriterMonitoring_.currentLumiSection = it->second->lumiSection;
      }
417
418
419
420
421
422
423
424
425

      lumiStatistics_.erase(it++);
    }
    else
    {
      ++it;
    }
  }

426
  return workDone;
427
428
429
}


430
bool evb::bu::DiskWriter::moveFiles()
431
{
432
433
  FileInfoPtr fileInfo;
  bool workDone = false;
434

435
  for (auto const& handler : streamHandlers_)
436
  {
437
438
439
440
    while ( handler->getNextClosedFileInfo(fileInfo) )
    {
      workDone = true;
      handleRawDataFile(fileInfo);
441
    }
442
443
444
  }

  return workDone;
445
446
447
}


448
void evb::bu::DiskWriter::handleRawDataFile(const FileInfoPtr& fileInfo)
449
{
450
451
  const uint32_t lumiSection = fileInfo->header.lumiSection;
  const LumiStatistics::iterator lumiStatistics = getLumiStatistics(lumiSection);
452

453
  if ( configuration_->deleteRawDataFiles )
454
  {
455
    boost::filesystem::remove(fileInfo->fileName);
456
  }
457
  else
458
  {
459
460
461
462
463
464
    std::ostringstream fileNameStream;
    fileNameStream << std::setfill('0') <<
      "run"<< std::setw(6) << runNumber_ <<
      "_ls" << std::setw(4) << lumiSection <<
      "_index" << std::setw(6) << lumiStatistics->second->index++ <<
      ".raw";
465

466
    boost::filesystem::rename(fileInfo->fileName,runRawDataDir_.parent_path() / fileNameStream.str());
467
  }
468

469
  {
470
    std::lock_guard<std::mutex> guard(diskWriterMonitoringMutex_);
471

472
473
474
475
476
477
    diskWriterMonitoring_.nbFiles++;
    diskWriterMonitoring_.nbEventsWritten += fileInfo->header.eventCount;
    if ( diskWriterMonitoring_.currentLumiSection < lumiSection)
      diskWriterMonitoring_.currentLumiSection = lumiSection;
    if ( diskWriterMonitoring_.lastLumiSection < lumiSection )
      diskWriterMonitoring_.lastLumiSection = lumiSection;
478
  }
479

480
481
482
  lumiStatistics->second->fileCount++;
  lumiStatistics->second->nbEventsWritten += fileInfo->header.eventCount;
  lumiStatistics->second->nbBytesWritten += fileInfo->header.fileSize;
483
484
485
486
487
}


evb::bu::DiskWriter::LumiStatistics::iterator evb::bu::DiskWriter::getLumiStatistics(const uint32_t lumiSection)
{
488
  std::lock_guard<std::mutex> guard(lumiStatisticsMutex_);
489

490
  return lumiStatistics_.emplace(lumiSection,std::make_unique<LumiInfo>(lumiSection)).first;
491
492
493
}


494
495
496
497
void evb::bu::DiskWriter::appendMonitoringItems(InfoSpaceItems& items)
{
  nbFilesWritten_ = 0;
  nbLumiSections_ = 0;
498
  currentLumiSection_ = 0;
499
500
501

  items.add("nbFilesWritten", &nbFilesWritten_);
  items.add("nbLumiSections", &nbLumiSections_);
502
  items.add("currentLumiSection", &currentLumiSection_);
503
504
505
506
507
}


void evb::bu::DiskWriter::updateMonitoringItems()
{
508
  std::lock_guard<std::mutex> guard(diskWriterMonitoringMutex_);
509
510
511

  nbFilesWritten_ = diskWriterMonitoring_.nbFiles;
  nbLumiSections_ = diskWriterMonitoring_.nbLumiSections;
512
  currentLumiSection_ = diskWriterMonitoring_.currentLumiSection;
513
514
515
516
517
}


void evb::bu::DiskWriter::resetMonitoringCounters()
{
518
  std::lock_guard<std::mutex> guard(diskWriterMonitoringMutex_);
519
520
521
522
523

  diskWriterMonitoring_.nbFiles = 0;
  diskWriterMonitoring_.nbEventsWritten = 0;
  diskWriterMonitoring_.nbLumiSections = 0;
  diskWriterMonitoring_.currentLumiSection = 0;
524
  diskWriterMonitoring_.lastLumiSection = 0;
525
526
527
}


528
529
uint32_t evb::bu::DiskWriter::getNbLumiSections() const
{
530
  std::lock_guard<std::mutex> guard(diskWriterMonitoringMutex_);
531
532
533
534
  return diskWriterMonitoring_.nbLumiSections;
}


535
536
void evb::bu::DiskWriter::configure()
{
537
  eventFIFO_.clear();
538
  fileInfoFIFO_.clear();
539
  streamHandlers_.clear();
540
  lumiStatistics_.clear();
541

542
  if ( ! configuration_->dropEventData )
543
  {
544
    eventFIFO_.resize(configuration_->maxEvtsUnderConstruction);
545
    fileInfoFIFO_.resize(configuration_->fileInfoFIFOCapacity);
546
    createDir(configuration_->rawDataDir.value_);
547
    createDir(configuration_->metaDataDir.value_);
548
549
550
551
  }
}


552
void evb::bu::DiskWriter::createDir(const boost::filesystem::path& path) const
553
554
{
  if ( ! boost::filesystem::exists(path) &&
555
       ( ! boost::filesystem::create_directories(path) ) )
556
  {
557
558
559
    std::ostringstream msg;
    msg << "Failed to create directory " << path.string();
    XCEPT_RAISE(exception::DiskWriting, msg.str());
560
561
562
563
  }
}


564
void evb::bu::DiskWriter::removeDir(const boost::filesystem::path& path) const
565
{
566
  if ( boost::filesystem::exists(path) )
567
  {
568
569
570
571
572
573
574
575
576
577
578
    try
    {
      boost::filesystem::remove_all(path);
    }
    catch(boost::filesystem::filesystem_error& e)
    {
      std::ostringstream msg;
      msg << "Failed to remove directory " << path.string();
      msg << ": " << e.what();
      XCEPT_RAISE(exception::DiskWriting, msg.str());
    }
579
580
581
582
  }
}


583
cgicc::div evb::bu::DiskWriter::getHtmlSnipped() const
584
{
585
586
  using namespace cgicc;

587
588
  cgicc::div div;
  div.add(p("DiskWriter"));
589
590

  {
591
592
593
    table table;
    table.set("title","Statistics for files generated. This numbers are updated only then a file is closed. Thus, they lag a bit behind. If 'dropEventData' is true, these counters stay at 0.");

594
    std::lock_guard<std::mutex> guard(diskWriterMonitoringMutex_);
595

596
    table.add(tr()
597
              .add(td("# files written"))
598
              .add(td(std::to_string(diskWriterMonitoring_.nbFiles))));
599
    table.add(tr()
600
              .add(td("# events written"))
601
              .add(td(std::to_string(diskWriterMonitoring_.nbEventsWritten))));
602
    table.add(tr()
603
              .add(td("# finished lumi sections with files"))
604
              .add(td(std::to_string(diskWriterMonitoring_.nbLumiSections))));
605
    table.add(tr()
606
              .add(td("last lumi section with files"))
607
              .add(td(std::to_string(diskWriterMonitoring_.lastLumiSection))));
608
    table.add(tr()
609
              .add(td("current lumi section"))
610
              .add(td(std::to_string(diskWriterMonitoring_.currentLumiSection))));
611
    div.add(table);
612
613
  }

614
615
  div.add(eventFIFO_.getHtmlSnipped());

616
  div.add(fileInfoFIFO_.getHtmlSnipped());
617

Remi Mommsen's avatar
Remi Mommsen committed
618
  {
619
620
621
622
    cgicc::table table;
    table.set("title","List of writer threads. Each thread writes events independently.");

    table.add(tr()
623
              .add(th("Event writers").set("colspan","5")));
624
625
626
    table.add(tr()
              .add(td("writer"))
              .add(td("active"))
627
628
629
              .add(td("#files ready"))
              .add(td("#events"))
              .add(td("MB written")));
630
631
632

    for (auto const& handler : streamHandlers_)
    {
633
      table.add(handler->getWriterTableRow());
634
635
636
637
638
    }

    div.add(table);
  }

639
  return div;
640
641
642
}


643
644
645
646
647
648
649
650
651
652
void evb::bu::DiskWriter::closeAnyOldRuns() const
{
  const boost::filesystem::path rawDataDir( configuration_->rawDataDir.value_ );

  if ( ! boost::filesystem::exists(rawDataDir) ) return;

  boost::filesystem::directory_iterator dirIter(rawDataDir);

  while ( dirIter != boost::filesystem::directory_iterator() )
  {
653
    const std::string fileName = dirIter->path().string();
654
655
656
    size_t pos = fileName.rfind("run");
    if ( pos != std::string::npos )
    {
657
658
659
      boost::filesystem::path eorPath = *dirIter;
      eorPath /= boost::filesystem::path( "run" + fileName.substr(pos+3) + "_ls0000_EoR.jsn" );
      if ( ! boost::filesystem::exists(eorPath) )
660
      {
661
662
663
664
665
666
        try
        {
          std::ofstream json(eorPath.string().c_str());
          json.close();
        }
        catch (...) {} // Ignore any failures in case that the run directory is removed while we try to write
667
668
669
670
671
672
673
      }
    }
    ++dirIter;
  }
}


674
void evb::bu::DiskWriter::populateHltdDirectory(const boost::filesystem::path& runDir) const
675
{
676
  const boost::filesystem::path tmpPath = runDir / "tmp";
677
  createDir(tmpPath);
678
  getHLTmenu(tmpPath);
679
  writeHLTinfo(tmpPath);
680
  writeBlacklist(tmpPath);
681
  writeWhitelist(tmpPath);
682
683
684
685
686
687

  const boost::filesystem::path hltPath( runDir / configuration_->hltDirName.value_  );
  boost::filesystem::rename(tmpPath,hltPath);
}


688
void evb::bu::DiskWriter::getHLTmenu(const boost::filesystem::path& tmpDir) const
689
690
{
  if ( configuration_->hltParameterSetURL.value_.empty() ) return;
691

692
693
694
695
696
  std::string url(configuration_->hltParameterSetURL.value_);

  CURL* curl = curl_easy_init();
  if ( ! curl )
  {
697
    XCEPT_RAISE(exception::DiskWriting, "Could not initialize curl for retrieving the HLT menu");
698
699
700
701
702
703
704
  }

  try
  {
    char lastChar = *url.rbegin();
    if ( lastChar != '/' ) url += '/';

705
    for (auto const& hltFile : configuration_->hltFiles)
706
    {
707
      const std::string fileName = hltFile.toString();
708
      retrieveFromURL(curl, url+fileName, tmpDir/fileName);
709
    }
710
711
712
713
714
715
716
717
  }
  catch(xcept::Exception& e)
  {
    curl_easy_cleanup(curl);
    throw(e);
  }

  curl_easy_cleanup(curl);
718
}
719

720

721
void evb::bu::DiskWriter::writeHLTinfo(const boost::filesystem::path& tmpDir) const
722
{
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
  const boost::filesystem::path hltInfoPath( tmpDir / configuration_->hltinfoName.value_ );

  std::ofstream json(hltInfoPath.string().c_str());

  json << "{"                                                                     << std::endl;
  json << "   \"daqSystem\": \"" << configuration_->daqSystem.value_ << "\","     << std::endl;
  json << "   \"daqInstance\": \"" << configuration_->daqInstance.value_ << "\"," << std::endl;
  json << "   \"fuGroup\": \"" << configuration_->fuGroup.value_ << "\""          << std::endl;
  json << "}"                                                                     << std::endl;
  json.close();
}


void evb::bu::DiskWriter::writeBlacklist(const boost::filesystem::path& tmpDir) const
{
  const boost::filesystem::path blacklistPath( tmpDir / configuration_->blacklistName.value_ );
739
740
741
742
  writeHostList(blacklistPath, configuration_->fuBlacklist);
}


743
void evb::bu::DiskWriter::writeWhitelist(const boost::filesystem::path& tmpDir) const
744
{
745
  const boost::filesystem::path whitelistPath( tmpDir / configuration_->whitelistName.value_ );
746
747
748
749
750
751
752
  writeHostList(whitelistPath, configuration_->fuWhitelist);
}


void evb::bu::DiskWriter::writeHostList(const boost::filesystem::path& path, const xdata::String& hosts) const
{
  std::ofstream hostlist(path.string().c_str());
753

754
  const std::regex regex("[a-zA-Z0-9]+-[a-zA-Z0-9-]+");
755
756
  std::sregex_iterator res(hosts.value_.begin(), hosts.value_.end(), regex);

757
  const std::sregex_iterator end;
758
759
  if ( res == end )
  {
760
    hostlist << "[]" << std::endl;
761
762
763
  }
  else
  {
764
    hostlist << "[\"" << res->str()  << "\"";
765
    while (++res != end) {
766
      hostlist << ", \"" << res->str() << "\"";
767
    }
768
    hostlist << "]" << std::endl;
769
  }
770
  hostlist.close();
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
}


void evb::bu::DiskWriter::retrieveFromURL(CURL* curl, const std::string& url, const boost::filesystem::path& output) const
{
  const char* path = output.string().c_str();
  FILE* file = fopen(path,"w");
  if ( ! file )
  {
    std::ostringstream msg;
    msg << "Failed to open file " << path
      << ": " << strerror(errno);
    XCEPT_RAISE(exception::DiskWriting, msg.str());
  }

  curl_easy_setopt(curl, CURLOPT_WRITEDATA, file);
  curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
  curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); //allow libcurl to follow redirection
789
  curl_easy_setopt(curl, CURLOPT_FAILONERROR, true);
790

791
  const CURLcode result = curl_easy_perform(curl);
792

793
  if ( result != CURLE_OK )
794
795
796
797
798
  {
    fclose(file);

    std::ostringstream msg;
    msg << "Failed to retrieve the HLT information from " << url
799
      << ": " << curl_easy_strerror(result);
800
801
802
803
804
805
806
    XCEPT_RAISE(exception::DiskWriting, msg.str());
  }

  fclose(file);
}


807
808
void evb::bu::DiskWriter::createLockFile(const boost::filesystem::path& runDir) const
{
809
  const boost::filesystem::path fulockPath( runDir / configuration_->fuLockName.value_ );
810
811
812
813
814
815
816
  const char* path = fulockPath.string().c_str();
  std::ofstream fulock(path);
  fulock << "1 0";
  fulock.close();
}


817
void evb::bu::DiskWriter::writeEoLS(const LumiInfoPtr& lumiInfo) const
818
819
820
{
  std::ostringstream fileNameStream;
  fileNameStream << std::setfill('0') <<
821
822
823
    "run" << std::setw(6) << runNumber_ <<
    "_ls"  << std::setw(4) << lumiInfo->lumiSection <<
    "_EoLS.jsn";
824
825
826
827
  const boost::filesystem::path jsonFile = runMetaDataDir_ / fileNameStream.str();

  if ( boost::filesystem::exists(jsonFile) )
  {
828
829
830
    std::ostringstream msg;
    msg << "The JSON file " << jsonFile.string() << " already exists";
    XCEPT_RAISE(exception::DiskWriting, msg.str());
831
832
  }

833
834
  const std::string path = jsonFile.string() + ".tmp";
  std::ofstream json(path.c_str());
835
  json << "{"                                                              << std::endl;
836
837
838
  json << "   \"data\" : [ \""
    << lumiInfo->nbEventsWritten << "\", \""
    << lumiInfo->fileCount << "\", \""
839
    << lumiInfo->totalEvents << "\", \""
840
841
    << lumiInfo->nbIncompleteEvents << "\", \""
    << lumiInfo->nbBytesWritten << "\" ],"                                 << std::endl;
842
843
844
  json << "   \"definition\" : \"" << eolsDefFile_.string() << "\","       << std::endl;
  json << "   \"source\" : \"BU-"  << buInstance_ << "\""                  << std::endl;
  json << "}"                                                              << std::endl;
845
  json.close();
846
847

  boost::filesystem::rename(path,jsonFile);
848
849
850
851
852
853
854
}


void evb::bu::DiskWriter::writeEoR() const
{
  std::ostringstream fileNameStream;
  fileNameStream << std::setfill('0') <<
855
856
    "run" << std::setw(6) << runNumber_ <<
    "_ls0000_EoR.jsn";
857
858
859
860
  const boost::filesystem::path jsonFile = runMetaDataDir_ / fileNameStream.str();

  if ( boost::filesystem::exists(jsonFile) )
  {
861
862
863
    std::ostringstream msg;
    msg << "The JSON file " << jsonFile.string() << " already exists";
    XCEPT_RAISE(exception::DiskWriting, msg.str());
864
865
  }

866
  std::lock_guard<std::mutex> guard(diskWriterMonitoringMutex_);
867
868
  const std::string path = jsonFile.string() + ".tmp";
  std::ofstream json(path.c_str());
869
  json << "{"                                                                           << std::endl;
870
871
872
873
  json << "   \"data\" : [ \""
    << diskWriterMonitoring_.nbEventsWritten << "\", \""
    << diskWriterMonitoring_.nbFiles         << "\", \""
    << diskWriterMonitoring_.nbLumiSections  << "\", \""
874
    << diskWriterMonitoring_.lastLumiSection << "\" ],"                                 << std::endl;
875
  json << "   \"definition\" : \"" << eorDefFile_.string() << "\","                     << std::endl;
876
877
878
  json << "   \"source\" : \"BU-"  << buInstance_   << "\""                             << std::endl;
  json << "}"                                                                           << std::endl;
  json.close();
879
880

  boost::filesystem::rename(path,jsonFile);
881
882
883
}


884
void evb::bu::DiskWriter::defineEoLS(const boost::filesystem::path& jsdDir)
885
886
887
{
  eolsDefFile_ = jsdDir / "EoLS.jsd";

888
889
  const std::string path = eolsDefFile_.string() + ".tmp";
  std::ofstream json(path.c_str());
890
891
892
893
  json << "{"                                                 << std::endl;
  json << "   \"legend\" : ["                                 << std::endl;
  json << "      {"                                           << std::endl;
  json << "         \"name\" : \"NEvents\","                  << std::endl;
894
895
  json << "         \"operation\" : \"sum\","                 << std::endl;
  json << "         \"type\" : \"integer\""                   << std::endl;
896
897
898
  json << "      },"                                          << std::endl;
  json << "      {"                                           << std::endl;
  json << "         \"name\" : \"NFiles\","                   << std::endl;
899
900
  json << "         \"operation\" : \"sum\","                 << std::endl;
  json << "         \"type\" : \"integer\""                   << std::endl;
901
902
903
  json << "      },"                                          << std::endl;
  json << "      {"                                           << std::endl;
  json << "         \"name\" : \"TotalEvents\","              << std::endl;
904
905
906
907
908
909
910
  json << "         \"operation\" : \"max\","                 << std::endl;
  json << "         \"type\" : \"integer\""                   << std::endl;
  json << "      },"                                          << std::endl;
  json << "      {"                                           << std::endl;
  json << "         \"name\" : \"NLostEvents\","              << std::endl;
  json << "         \"operation\" : \"sum\","                 << std::endl;
  json << "         \"type\" : \"integer\""                   << std::endl;
911
912
913
914
915
  json << "      },"                                          << std::endl;
  json << "      {"                                           << std::endl;
  json << "         \"name\" : \"NBytes\","                   << std::endl;
  json << "         \"operation\" : \"sum\","                 << std::endl;
  json << "         \"type\" : \"integer\""                   << std::endl;
916
917
918
919
  json << "      }"                                           << std::endl;
  json << "   ]"                                              << std::endl;
  json << "}"                                                 << std::endl;
  json.close();
920
921

  boost::filesystem::rename(path,eolsDefFile_);
922
923
924
}


925
void evb::bu::DiskWriter::defineEoR(const boost::filesystem::path& jsdDir)
926
927
928
{
  eorDefFile_ = jsdDir / "EoR.jsd";

929
930
  const std::string path = eorDefFile_.string() + ".tmp";
  std::ofstream json(path.c_str());
931
932
933
934
  json << "{"                                                 << std::endl;
  json << "   \"legend\" : ["                                 << std::endl;
  json << "      {"                                           << std::endl;
  json << "         \"name\" : \"NEvents\","                  << std::endl;
935
936
  json << "         \"operation\" : \"sum\","                 << std::endl;
  json << "         \"type\" : \"integer\""                   << std::endl;
937
938
939
  json << "      },"                                          << std::endl;
  json << "      {"                                           << std::endl;
  json << "         \"name\" : \"NFiles\","                   << std::endl;
940
941
  json << "         \"operation\" : \"sum\","                 << std::endl;
  json << "         \"type\" : \"integer\""                   << std::endl;
942
943
944
  json << "      },"                                          << std::endl;
  json << "      {"                                           << std::endl;
  json << "         \"name\" : \"NLumis\","                   << std::endl;
945
946
  json << "         \"operation\" : \"max\","                 << std::endl;
  json << "         \"type\" : \"integer\""                   << std::endl;
947
948
949
  json << "      },"                                          << std::endl;
  json << "      {"                                           << std::endl;
  json << "         \"name\" : \"LastLumi\","                 << std::endl;
950
951
  json << "         \"operation\" : \"max\","                 << std::endl;
  json << "         \"type\" : \"integer\""                   << std::endl;
952
953
954
955
  json << "      }"                                           << std::endl;
  json << "   ]"                                              << std::endl;
  json << "}"                                                 << std::endl;
  json.close();
956
957

  boost::filesystem::rename(path,eorDefFile_);
958
959
960
961
962
963
964
965
966
}


/// emacs configuration
/// Local Variables: -
/// mode: c++ -
/// c-basic-offset: 2 -
/// indent-tabs-mode: nil -
/// End: -