SourceXtractorPlusPlus  0.19
SourceXtractor++, the next generation SExtractor
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
MultithreadedMeasurement.cpp
Go to the documentation of this file.
1 
17 /*
18  * MultithreadedMeasurement.cpp
19  *
20  * Created on: May 23, 2018
21  * Author: mschefer
22  */
23 
24 #include <chrono>
25 #include <ElementsKernel/Logging.h>
26 #include <csignal>
27 
30 
31 using namespace SourceXtractor;
32 
34 
35 
37  if (m_output_thread->joinable()) {
39  }
40 }
41 
43  m_output_thread = Euclid::make_unique<std::thread>(outputThreadStatic, this);
44 }
45 
47  m_input_done = true;
48  m_thread_pool->block();
50  logger.debug() << "All worker threads done!";
51 }
52 
54  // Wait until all worker threads are done
55  m_thread_pool->block();
56 
57  // Wait until the output queue is empty
58  while (true) {
59  {
61  if (m_output_queue.empty()) {
62  break;
63  }
64  else if (m_thread_pool->checkForException(false)) {
65  logger.fatal() << "An exception was thrown from a worker thread";
66  m_thread_pool->checkForException(true);
67  }
68  else if (m_thread_pool->activeThreads() == 0) {
69  throw Elements::Exception() << "No active threads and the queue is not empty! Please, report this as a bug";
70  }
71  }
73  }
74 }
75 
77  // Force computation of SourceID here, where the order is still deterministic
78  for (auto& source : *source_group) {
79  source.getProperty<SourceID>();
80  }
81 
82  // Put the new SourceGroup into the input queue
83  auto order_number = m_group_counter;
84  auto lambda = [this, order_number, source_group = std::move(source_group)]() mutable {
85  // Trigger measurements
86  for (auto& source : *source_group) {
87  m_source_to_row(source);
88  }
89  // Pass to the output thread
90  {
92  m_output_queue.emplace_back(order_number, std::move(source_group));
93  }
95  };
96  auto lambda_copyable = [lambda = std::make_shared<decltype(lambda)>(std::move(lambda))](){
97  (*lambda)();
98  };
99  m_thread_pool->submit(lambda_copyable);
100  ++m_group_counter;
101 }
102 
104  logger.debug() << "Starting output thread";
105  try {
106  measurement->outputThreadLoop();
107  }
108  catch (const Elements::Exception& e) {
109  logger.fatal() << "Output thread got an exception!";
110  logger.fatal() << e.what();
111  if (!measurement->m_abort_raised.exchange(true)) {
112  logger.fatal() << "Aborting the execution";
113  ::raise(SIGTERM);
114  }
115  }
116  logger.debug() << "Stopping output thread";
117 }
118 
120  while (m_thread_pool->activeThreads() > 0) {
122 
123  // Wait for something in the output queue
124  if (m_output_queue.empty()) {
126  }
127 
128  // Process the output queue
129  while (!m_output_queue.empty()) {
130  sendSource(std::move(m_output_queue.front().second));
131  m_output_queue.pop_front();
132  }
133 
134  if (m_input_done && m_thread_pool->running() + m_thread_pool->queued() == 0 &&
135  m_output_queue.empty()) {
136  break;
137  }
138  }
139 }
140 
142  sendProcessSignal(event);
143 }
static auto logger
Definition: WCS.cpp:41
void sendProcessSignal(const ProcessSourcesEvent &event) const
Definition: PipelineStage.h:92
T joinable(T...args)
Event received by SourceGrouping to request the processing of some of the Sources stored...
Definition: PipelineStage.h:33
T sleep_for(T...args)
std::list< std::pair< int, std::unique_ptr< SourceGroupInterface > > > m_output_queue
void receiveSource(std::unique_ptr< SourceGroupInterface > source_group) override
void receiveProcessSignal(const ProcessSourcesEvent &event) override
constexpr double e
T join(T...args)
T move(T...args)
std::shared_ptr< Euclid::ThreadPool > m_thread_pool
std::unique_ptr< std::thread > m_output_thread
STL class.
const char * what() const noexcept override
void sendSource(std::unique_ptr< SourceGroupInterface > source) const
Definition: PipelineStage.h:85
static Logging getLogger(const std::string &name="")
static void outputThreadStatic(MultithreadedMeasurement *measurement)