SourceXtractorPlusPlus
0.19
SourceXtractor++, the next generation SExtractor
Main Page
Related Pages
Namespaces
Classes
Files
File List
File Members
All
Classes
Namespaces
Files
Functions
Variables
Typedefs
Enumerations
Enumerator
Friends
Macros
Groups
Pages
SEImplementation
SEImplementation
Measurement
MultithreadedMeasurement.h
Go to the documentation of this file.
1
17
/*
18
* MultithreadedMeasurement.h
19
*
20
* Created on: May 17, 2018
21
* Author: mschefer
22
*/
23
24
#ifndef _SEIMPLEMENTATION_OUTPUT_MULTITHREADEDMEASUREMENT_H_
25
#define _SEIMPLEMENTATION_OUTPUT_MULTITHREADEDMEASUREMENT_H_
26
27
#include <atomic>
#include <condition_variable>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>

#include "AlexandriaKernel/ThreadPool.h"
#include "AlexandriaKernel/Semaphore.h"
#include "SEFramework/Pipeline/Measurement.h"
35
36
namespace
SourceXtractor {
37
38
class
MultithreadedMeasurement
:
public
Measurement
{
39
public
:
40
41
using
SourceToRowConverter
=
std::function<Euclid::Table::Row(const SourceInterface&)>
;
42
MultithreadedMeasurement
(
SourceToRowConverter
source_to_row,
const
std::shared_ptr<Euclid::ThreadPool>
& thread_pool,
43
unsigned
max_queue_size)
44
:
m_source_to_row
(source_to_row),
45
m_thread_pool
(thread_pool),
46
m_group_counter
(0),
47
m_input_done
(false),
m_abort_raised
(false),
m_semaphore
(max_queue_size) {}
48
49
~MultithreadedMeasurement
()
override
;
50
51
void
receiveSource
(
std::unique_ptr<SourceGroupInterface>
source_group)
override
;
52
void
receiveProcessSignal
(
const
ProcessSourcesEvent
& event)
override
;
53
54
void
startThreads
()
override
;
55
void
stopThreads
()
override
;
56
void
synchronizeThreads
()
override
;
57
58
private
:
59
static
void
outputThreadStatic
(
MultithreadedMeasurement
* measurement);
60
void
outputThreadLoop
();
61
62
SourceToRowConverter
m_source_to_row
;
63
std::shared_ptr<Euclid::ThreadPool>
m_thread_pool
;
64
std::unique_ptr<std::thread>
m_output_thread
;
65
66
int
m_group_counter
;
67
std::atomic_bool
m_input_done
,
m_abort_raised
;
68
69
std::condition_variable
m_new_output
;
70
std::list<std::pair<int, std::unique_ptr<SourceGroupInterface>
>>
m_output_queue
;
71
std::mutex
m_output_queue_mutex
;
72
Euclid::Semaphore
m_semaphore
;
73
};
74
75
}
76
77
#endif
/* _SEIMPLEMENTATION_OUTPUT_MULTITHREADEDMEASUREMENT_H_ */
SourceXtractor::MultithreadedMeasurement::m_source_to_row
SourceToRowConverter m_source_to_row
Definition:
MultithreadedMeasurement.h:62
ThreadPool.h
std::shared_ptr< Euclid::ThreadPool >
SourceXtractor::MultithreadedMeasurement::m_abort_raised
std::atomic_bool m_abort_raised
Definition:
MultithreadedMeasurement.h:67
std::function< Euclid::Table::Row(const SourceInterface &)>
SourceXtractor::MultithreadedMeasurement::MultithreadedMeasurement
MultithreadedMeasurement(SourceToRowConverter source_to_row, const std::shared_ptr< Euclid::ThreadPool > &thread_pool, unsigned max_queue_size)
Definition:
MultithreadedMeasurement.h:42
SourceXtractor::ProcessSourcesEvent
Event received by SourceGrouping to request the processing of some of the Sources stored...
Definition:
PipelineStage.h:33
SourceXtractor::MultithreadedMeasurement::m_output_queue
std::list< std::pair< int, std::unique_ptr< SourceGroupInterface > > > m_output_queue
Definition:
MultithreadedMeasurement.h:70
std::condition_variable
SourceXtractor::MultithreadedMeasurement::~MultithreadedMeasurement
~MultithreadedMeasurement() override
Definition:
MultithreadedMeasurement.cpp:36
SourceXtractor::MultithreadedMeasurement::receiveSource
void receiveSource(std::unique_ptr< SourceGroupInterface > source_group) override
Definition:
MultithreadedMeasurement.cpp:76
SourceXtractor::MultithreadedMeasurement::receiveProcessSignal
void receiveProcessSignal(const ProcessSourcesEvent &event) override
Definition:
MultithreadedMeasurement.cpp:141
std::mutex
SourceXtractor::MultithreadedMeasurement::m_new_output
std::condition_variable m_new_output
Definition:
MultithreadedMeasurement.h:69
Measurement.h
SourceXtractor::Measurement
Definition:
Measurement.h:34
Euclid::Semaphore
std::list
STL class.
SourceXtractor::MultithreadedMeasurement::m_thread_pool
std::shared_ptr< Euclid::ThreadPool > m_thread_pool
Definition:
MultithreadedMeasurement.h:63
SourceXtractor::MultithreadedMeasurement::m_output_thread
std::unique_ptr< std::thread > m_output_thread
Definition:
MultithreadedMeasurement.h:64
SourceXtractor::MultithreadedMeasurement::m_semaphore
Euclid::Semaphore m_semaphore
Definition:
MultithreadedMeasurement.h:72
std::unique_ptr
STL class.
SourceXtractor::MultithreadedMeasurement::m_group_counter
int m_group_counter
Definition:
MultithreadedMeasurement.h:66
SourceXtractor::MultithreadedMeasurement::m_output_queue_mutex
std::mutex m_output_queue_mutex
Definition:
MultithreadedMeasurement.h:71
SourceXtractor::MultithreadedMeasurement
Definition:
MultithreadedMeasurement.h:38
SourceXtractor::MultithreadedMeasurement::stopThreads
void stopThreads() override
Definition:
MultithreadedMeasurement.cpp:46
SourceXtractor::MultithreadedMeasurement::m_input_done
std::atomic_bool m_input_done
Definition:
MultithreadedMeasurement.h:67
SourceXtractor::MultithreadedMeasurement::startThreads
void startThreads() override
Definition:
MultithreadedMeasurement.cpp:42
SourceXtractor::MultithreadedMeasurement::outputThreadLoop
void outputThreadLoop()
Definition:
MultithreadedMeasurement.cpp:119
SourceXtractor::MultithreadedMeasurement::synchronizeThreads
void synchronizeThreads() override
Definition:
MultithreadedMeasurement.cpp:53
SourceXtractor::MultithreadedMeasurement::outputThreadStatic
static void outputThreadStatic(MultithreadedMeasurement *measurement)
Definition:
MultithreadedMeasurement.cpp:103
Semaphore.h
Generated by
1.8.5