SourceXtractorPlusPlus  0.19
SourceXtractor++, the next generation SExtractor
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
SourceXtractor.cpp
Go to the documentation of this file.
1 /*
2  * Copyright © 2019-2022 Université de Genève, LMU Munich - Faculty of Physics, IAP-CNRS/Sorbonne Université
3  *
4  * This library is free software; you can redistribute it and/or modify it under
5  * the terms of the GNU Lesser General Public License as published by the Free
6  * Software Foundation; either version 3.0 of the License, or (at your option)
7  * any later version.
8  *
9  * This library is distributed in the hope that it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11  * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
12  * details.
13  *
14  * You should have received a copy of the GNU Lesser General Public License
15  * along with this library; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17  */
24 #include <dlfcn.h>
25 #include <iomanip>
26 #include <map>
27 #include <string>
28 #include <typeinfo>
29 
30 #include <boost/program_options.hpp>
31 #include <boost/algorithm/string/predicate.hpp>
33 
34 #include "ElementsKernel/Main.h"
35 #include "ElementsKernel/System.h"
37 
39 #include "Configuration/Utils.h"
40 
42 
50 
52 
55 
79 
81 #include "SEMain/PluginConfig.h"
82 #include "SEMain/Sorter.h"
83 
84 
85 namespace po = boost::program_options;
86 namespace fs = boost::filesystem;
87 using namespace SourceXtractor;
88 using namespace Euclid::Configuration;
89 
91 
// Long names of the command-line switches that this program registers itself (as boolean switches).
// List the possible output properties and exit.
92 static const std::string LIST_OUTPUT_PROPERTIES {"list-output-properties"};
// Show the columns created for each property.
93 static const std::string PROPERTY_COLUMN_MAPPING_ALL {"property-column-mapping-all"};
// Show the columns created for each property, restricted to the given configuration.
94 static const std::string PROPERTY_COLUMN_MAPPING {"property-column-mapping"};
// Dump the parameters with their default values as a configuration file.
95 static const std::string DUMP_CONFIG {"dump-default-config"};
96 
// Observer of std::shared_ptr<SourceGroupInterface> messages that keeps every group it is notified with.
// NOTE(review): the declaration of m_list is not visible in this listing — confirm its type
// against the original file.
97 class GroupObserver : public Observer<std::shared_ptr<SourceGroupInterface>> {
98 public:
  // Appends the received group to m_list.
99  virtual void handleMessage(const std::shared_ptr<SourceGroupInterface>& group) override {
100  m_list.push_back(group);
101  }
102 
104 };
105 
// Observer of std::shared_ptr<SourceWithOnDemandProperties> messages that keeps every source it is notified with.
// NOTE(review): the declaration of m_list is not visible in this listing — confirm its type
// against the original file.
106 class SourceObserver : public Observer<std::shared_ptr<SourceWithOnDemandProperties>> {
107 public:
  // Appends the received source to m_list.
108  virtual void handleMessage(const std::shared_ptr<SourceWithOnDemandProperties>& source) override {
109  m_list.push_back(source);
110  }
111 
113 };
114 
116 
// Guards against an invalid locale configured in the environment.
//
// Some parts of boost (boost::filesystem among them) can throw when the locale
// requested through the environment cannot be constructed, see
// https://svn.boost.org/trac10/ticket/10205. The environment locale is probed
// once and, if constructing it fails, LC_ALL is overridden with "C".
static void setupEnvironment(void) {
  const auto environment_locale_is_usable = []() -> bool {
    try {
      std::locale("");
      return true;
    } catch (...) {
      return false;
    }
  };

  if (!environment_locale_is_usable()) {
    ::setenv("LC_ALL", "C", 1);
  }
}
129 
// Body of the routine that switches off MKL's own multithreading.
// NOTE(review): the line declaring this function is not visible in this listing — presumably
// `static void disableBlasMultithreading()`; confirm against the original file.
// Nothing is changed when any of the OMP_*/MKL_* environment variables checked below is set.
137  bool omp_env_present = getenv("OMP_NUM_THREADS") || getenv("OMP_DYNAMIC");
138  bool mkl_env_present = getenv("MKL_NUM_THREADS") || getenv("MKL_DYNAMIC");
139  if (!omp_env_present && !mkl_env_present) {
140  // Despite the documentation, the methods following C ABI are capitalized
  // The symbols are looked up at run time with dlsym(RTLD_DEFAULT, ...); each one is only
  // called when the lookup returned a non-null pointer.
141  void (*set_num_threads)(int) = reinterpret_cast<void (*)(int)>(dlsym(RTLD_DEFAULT, "MKL_Set_Num_Threads"));
142  void (*set_dynamic)(int) = reinterpret_cast<void (*)(int)>(dlsym(RTLD_DEFAULT, "MKL_Set_Dynamic"));
143  if (set_num_threads) {
144  logger.debug() << "Disabling multithreading";
145  set_num_threads(1);
146  }
147  if (set_dynamic) {
148  logger.debug() << "Disabling dynamic multithreading";
149  set_dynamic(0);
150  }
151  }
152 }
153 
// Main program class: an Elements::Program that owns the registries and the factories
// used to build each stage of the processing pipeline.
154 class SEMain : public Elements::Program {
155 
  // The task provider is built on top of the task factory registry; most factories below
  // receive either the task provider or the output registry.
156  std::shared_ptr<TaskFactoryRegistry> task_factory_registry = std::make_shared<TaskFactoryRegistry>();
157  std::shared_ptr<TaskProvider> task_provider = std::make_shared<TaskProvider>(task_factory_registry);
158  std::shared_ptr<OutputRegistry> output_registry = std::make_shared<OutputRegistry>();
159  SegmentationFactory segmentation_factory {task_provider};
160  OutputFactory output_factory { output_registry };
  // NOTE(review): the next two lines are initializers whose declarations are not visible in this
  // listing — presumably those of `source_factory` and `group_factory`, which are used right
  // after; confirm against the original file.
163  std::make_shared<SourceWithOnDemandPropertiesFactory>(task_provider);
165  std::make_shared<SourceGroupWithOnDemandPropertiesFactory>(task_provider);
166  PartitionFactory partition_factory {source_factory};
167  GroupingFactory grouping_factory {group_factory};
168  DeblendingFactory deblending_factory {source_factory};
169  MeasurementFactory measurement_factory { output_registry };
170  ProgressReporterFactory progress_printer_factory {};
171 
  // config_initialized is set once getConfigParameters() has filled config_parameters.
172  bool config_initialized = false;
173  po::options_description config_parameters;
174 
175 public:
176 
  // Initializes plugin_manager with the task factory registry, the output registry, the plugin
  // path and the list of plugins.
  // NOTE(review): the declaration of plugin_manager is not visible in this listing.
177  SEMain(const std::string& plugin_path, const std::vector<std::string>& plugin_list)
178  : plugin_manager { task_factory_registry, output_registry, plugin_path, plugin_list } {
179  }
180 
// Returns the options description with the configuration parameters of the program.
// The first call registers the configurations, loads the plugins, lets the task factory registry
// and every stage factory report their configuration dependencies, and closes the registration;
// the result is stored in config_parameters. Later calls only return the stored description.
184  po::options_description getConfigParameters() {
185  if (!config_initialized) {
186  auto& config_manager = ConfigManager::getInstance(config_manager_id);
187  config_manager.registerConfiguration<SourceXtractorConfig>();
188  config_manager.registerConfiguration<BackgroundConfig>();
189  config_manager.registerConfiguration<SE2BackgroundConfig>();
190  config_manager.registerConfiguration<MemoryConfig>();
191  config_manager.registerConfiguration<BackgroundAnalyzerFactory>();
192  config_manager.registerConfiguration<SamplingConfig>();
193  config_manager.registerConfiguration<DetectionFrameConfig>();
194 
196 
197  //plugins need to be registered before reportConfigDependencies()
198  plugin_manager.loadPlugins();
199  task_factory_registry->reportConfigDependencies(config_manager);
200  segmentation_factory.reportConfigDependencies(config_manager);
201  partition_factory.reportConfigDependencies(config_manager);
202  grouping_factory.reportConfigDependencies(config_manager);
203  deblending_factory.reportConfigDependencies(config_manager);
204  measurement_factory.reportConfigDependencies(config_manager);
205  output_factory.reportConfigDependencies(config_manager);
206 
  // closeRegistration() returns the options collected by the manager; they are added to the stored description.
207  config_parameters.add(config_manager.closeRegistration());
208  config_initialized = true;
209  }
210  return config_parameters;
211  }
212 
// Body of the method that returns the arguments the program accepts: the configuration
// parameters plus the program-specific switches, and the positional arguments.
// NOTE(review): the signature line is not visible in this listing — presumably
// `defineProgramArguments() override`, returning a pair of options and positional descriptions; confirm.
215  auto options = getConfigParameters();
216 
  // Program-specific boolean switches, added on top of the configuration parameters.
217  options.add_options() (LIST_OUTPUT_PROPERTIES.c_str(), po::bool_switch(),
218  "List the possible output properties for the given input parameters and exit");
219  options.add_options() (PROPERTY_COLUMN_MAPPING_ALL.c_str(), po::bool_switch(),
220  "Show the columns created for each property");
221  options.add_options() (PROPERTY_COLUMN_MAPPING.c_str(), po::bool_switch(),
222  "Show the columns created for each property, for the given configuration");
223  options.add_options() (DUMP_CONFIG.c_str(), po::bool_switch(),
224  "Dump parameters with default values into a configuration file");
  // The progress printer factory contributes its own options.
225  progress_printer_factory.addOptions(options);
226 
227  // Allow to pass Python options as positional following --
  // A count of -1 lets "python-arg" take an unlimited number of positional arguments.
228  po::positional_options_description p;
229  p.add("python-arg", -1);
230 
231  return {options, p};
232  }
233 
235  template <typename T>
236  static void writeDefault(std::ostream& out, const po::option_description& opt, const boost::any& default_value) {
237  out << opt.long_name() << '=' << boost::any_cast<T>(default_value) << std::endl;
238  }
239 
241  template <typename T>
242  static void writeDefaultMultiple(std::ostream& out, const po::option_description& opt, const boost::any& default_value) {
243  auto values = boost::any_cast<std::vector<T>>(default_value);
244  if (values.empty()) {
245  out << "# " << opt.long_name() << '=' << std::endl;
246  }
247  else {
248  for (const auto& v : values)
249  out << opt.long_name() << '=' << v << std::endl;
250  }
251  }
252 
// Print a configuration file populated with defaults to the standard output.
// For every configuration parameter the description is written as a comment, followed by
// `name=value` when a default exists and its type has a printer, or by a commented-out line otherwise.
254  void printDefaults() {
  // NOTE(review): the entries below belong to the `printers` lookup table (type of the default
  // value -> function that writes it); the line declaring the table is not visible in this listing.
257  {typeid(bool), &writeDefault<bool>},
258  {typeid(int), &writeDefault<int>},
259  {typeid(double), &writeDefault<double>},
260  {typeid(std::string), &writeDefault<std::string>},
261  {typeid(std::vector<std::string>), &writeDefaultMultiple<std::string>}
262  };
263  decltype(printers)::const_iterator printer;
264 
265  auto config_parameters = getConfigParameters();
266  for (const auto& p : config_parameters.options()) {
267  boost::any default_value;
268 
269  std::cout << "# " << p->description() << std::endl;
  // apply_default() returning false is handled as "no default": the option is written commented out.
270  if (!p->semantic()->apply_default(default_value)) {
271  std::cout << '#' << p->long_name() << "=" << std::endl;
272  }
  // A default whose type has no printer is reported with its type name.
273  else if ((printer = printers.find(default_value.type())) == printers.end()) {
274  std::cout << '#' << p->long_name() << "=<Unknown type " << default_value.type().name() << '>' << std::endl;
275  }
276  else {
277  printer->second(std::cout, *p, default_value);
278  }
279  std::cout << std::endl;
280  }
281 
282  // We need to print the log options manually, as that is set up by Elements
283  std::cout << "# Log level: FATAL, ERROR, WARN, INFO, DEBUG" << std::endl;
284  std::cout << "log-level=INFO" << std::endl;
285  std::cout << "# Log file" << std::endl;
286  std::cout << "#log-file" << std::endl;
287  }
288 
290 
// Body of the program's main method: handles the informational switches, configures every
// component, links the pipeline stages together and processes each detection frame.
// NOTE(review): the signature line is not visible in this listing — presumably
// `Elements::ExitCode mainMethod(std::map<std::string, po::variable_value>& args) override`; confirm.
291  // If the user just requested to see the possible output columns we show
292  // them and we do nothing else
293 
294  if (args.at(LIST_OUTPUT_PROPERTIES).as<bool>()) {
295  for (auto& name : output_registry->getOutputPropertyNames()) {
296  std::cout << name << std::endl;
297  }
298  return Elements::ExitCode::OK;
299  }
300 
301  if (args.at(PROPERTY_COLUMN_MAPPING_ALL).as<bool>()) {
302  output_registry->printPropertyColumnMap();
303  return Elements::ExitCode::OK;
304  }
305 
306  if (args.at(DUMP_CONFIG).as<bool>()) {
307  printDefaults();
308  return Elements::ExitCode::OK;
309  }
310 
311  // Make sure the BLAS multithreading does not interfere
  // NOTE(review): the statement that followed this comment is not visible in this listing.
313 
314  // Elements does not verify that the config-file exists. It will just not read it.
315  // We verify that it does exist here.
316  if (args.find("config-file") != args.end()) {
317  auto cfg_file = args.at("config-file").as<fs::path>();
318  if (cfg_file != "" && !fs::exists(cfg_file)) {
319  throw Elements::Exception() << "The configuration file '" << cfg_file << "' does not exist";
320  }
321  }
322 
323  // Create the progress listener and printer ASAP
324  progress_printer_factory.configure(args);
325  auto progress_mediator = progress_printer_factory.createProgressMediator();
326 
327  // Initialize the rest of the components
328  auto& config_manager = ConfigManager::getInstance(config_manager_id);
329  config_manager.initialize(args);
330 
331  // Configure TileManager
  // The configured tile size is passed for both of the first two arguments.
332  auto memory_config = config_manager.getConfiguration<MemoryConfig>();
333  TileManager::getInstance()->setOptions(memory_config.getTileSize(),
334  memory_config.getTileSize(), memory_config.getTileMaxMemory());
335 
336  CheckImages::getInstance().configure(config_manager);
337 
338  task_factory_registry->configure(config_manager);
339  task_factory_registry->registerPropertyInstances(*output_registry);
340 
341  segmentation_factory.configure(config_manager);
342  partition_factory.configure(config_manager);
343  grouping_factory.configure(config_manager);
344  deblending_factory.configure(config_manager);
345  measurement_factory.configure(config_manager);
346  output_factory.configure(config_manager);
347 
  // This switch is handled only after configuration because it prints the mapping for the
  // output properties selected by the configuration.
348  if (args.at(PROPERTY_COLUMN_MAPPING).as<bool>()) {
349  output_registry->printPropertyColumnMap(config_manager.getConfiguration<OutputConfig>().getOutputProperties());
350  return Elements::ExitCode::OK;
351  }
352 
353  auto segmentation = segmentation_factory.createSegmentation();
354 
355  // Multithreading
356  auto multithreading_config = config_manager.getConfiguration<MultiThreadingConfig>();
357  auto thread_pool = multithreading_config.getThreadPool();
358 
359  // Rest of the stages
360  auto partition = partition_factory.getPartition();
361  auto source_grouping = grouping_factory.createGrouping();
362 
363  std::shared_ptr<Deblending> deblending = deblending_factory.createDeblending();
364  std::shared_ptr<Measurement> measurement = measurement_factory.getMeasurement();
365  std::shared_ptr<Output> output = output_factory.createOutput();
366 
367  // Prefetcher
  // Only created when there is a thread pool and the grouping or deblending stages
  // declare required properties.
368  std::shared_ptr<Prefetcher> prefetcher;
369  if (thread_pool) {
370  auto prefetch = source_grouping->requiredProperties();
371  auto deblending_prefetch = deblending->requiredProperties();
372  prefetch.insert(deblending_prefetch.begin(), deblending_prefetch.end());
373  if (!prefetch.empty()) {
374  prefetcher = std::make_shared<Prefetcher>(thread_pool, multithreading_config.getMaxQueueSize());
375  prefetcher->requestProperties(prefetch);
376  }
377  }
378 
379  // Link together the pipeline's steps
380  segmentation->setNextStage(partition);
381 
  // When present, the prefetcher sits between the partition and the grouping stages.
382  if (prefetcher) {
383  partition->setNextStage(prefetcher);
384  prefetcher->setNextStage(source_grouping);
385  }
386  else {
387  partition->setNextStage(source_grouping);
388  }
389 
390  source_grouping->setNextStage(deblending);
391  deblending->setNextStage(measurement);
392 
  // Unless unsorted output was requested, a Sorter is placed between measurement and output.
393  if (config_manager.getConfiguration<OutputConfig>().getOutputUnsorted()) {
394  logger.info() << "Writing output following measure order";
395  measurement->setNextStage(output);
396  } else {
397  logger.info() << "Writing output following segmentation order";
398  auto sorter = std::make_shared<Sorter>();
399  measurement->setNextStage(sorter);
400  sorter->setNextStage(output);
401  }
402 
  // Progress reporting: the mediator observes segmentation, detection, deblending and measurement.
403  segmentation->Observable<SegmentationProgress>::addObserver(progress_mediator->getSegmentationObserver());
404  segmentation->Observable<SourceInterface>::addObserver(progress_mediator->getDetectionObserver());
405  deblending->Observable<SourceGroupInterface>::addObserver(progress_mediator->getDeblendingObserver());
406  measurement->Observable<SourceGroupInterface>::addObserver(progress_mediator->getMeasurementObserver());
407 
408  // Add observers for CheckImages
  // Each observer is attached only when the matching check image for index 0 is not null.
409  if (CheckImages::getInstance().getSegmentationImage(0) != nullptr) {
410  segmentation->Observable<SourceInterface>::addObserver(std::make_shared<DetectionIdCheckImage>());
411  }
412  if (CheckImages::getInstance().getPartitionImage(0) != nullptr) {
413  measurement->Observable<SourceGroupInterface>::addObserver(std::make_shared<SourceIdCheckImage>());
414  }
415  if (CheckImages::getInstance().getGroupImage(0) != nullptr) {
416  measurement->Observable<SourceGroupInterface>::addObserver(std::make_shared<GroupIdCheckImage>());
417  }
418  if (CheckImages::getInstance().getMoffatImage(0) != nullptr) {
419  measurement->Observable<SourceGroupInterface>::addObserver(std::make_shared<MoffatCheckImage>());
420  }
421  const auto& detection_frames = config_manager.getConfiguration<DetectionFrameConfig>().getDetectionFrames();
422 
423  // Perform measurements (multi-threaded part)
424  measurement->startThreads();
425 
  // prev_writen_rows holds the value returned by output->flush() for the previous frame, so the
  // difference logged below is the number of rows written for the current frame.
426  size_t prev_writen_rows = 0;
427  size_t frame_number = 0;
428  for (auto& detection_frame : detection_frames) {
429  frame_number++;
430  try {
431  // Process the image
432  logger.info() << "Processing frame "
433  << frame_number << " / " << detection_frames.size() << " : " << detection_frame->getLabel();
434  segmentation->processFrame(detection_frame);
435  }
436  catch (const std::exception &e) {
437  logger.error() << "Failed to process the frame! " << e.what();
438  measurement->stopThreads();
  // NOTE(review): the statement that followed stopThreads() in this handler is not visible in this listing.
440  }
441 
  // Synchronize the prefetcher (if any) and the measurement threads before flushing the output.
442  if (prefetcher) {
443  prefetcher->synchronize();
444  }
445  measurement->synchronizeThreads();
446 
447  size_t nb_writen_rows = output->flush();
448  output->nextPart();
449 
450  logger.info() << (nb_writen_rows - prev_writen_rows) << " sources detected in frame, " << nb_writen_rows << " total";
451 
452  prev_writen_rows = nb_writen_rows;
453  }
454 
455  if (prefetcher) {
456  prefetcher->wait();
457  }
458  measurement->stopThreads();
459 
461  TileManager::getInstance()->flush();
462  progress_mediator->done();
463 
464  if (prev_writen_rows > 0) {
465  logger.info() << "total " << prev_writen_rows << " sources detected";
466  } else {
467  logger.info() << "NO SOURCES DETECTED";
468  }
469 
470  return Elements::ExitCode::OK;
471  }
472 };
473 
474 
476 
// Visible part of the helper program whose only job is to read the plugin path and the plugin
// list from the configuration and store them through the references received at construction.
// NOTE(review): the class header, the constructor signature, the mainMethod signature and the
// declarations of m_plugin_path / m_plugin_list are not visible in this listing; confirm
// against the original file.
477 public:
  // Constructor initializer list: binds the two output members to the caller's variables.
479  m_plugin_path(plugin_path), m_plugin_list(plugin_list) {
480  }
481 
482  virtual ~PluginOptionsMain() = default;
483 
  // Registers only PluginConfig with this program's own configuration manager and returns the
  // resulting options.
484  boost::program_options::options_description defineSpecificProgramOptions() override {
485  auto& config_manager = ConfigManager::getInstance(conf_man_id);
486  config_manager.registerConfiguration<PluginConfig>();
487  auto options = config_manager.closeRegistration();
488  // The following will consume any extra options in the configuration file
489  options.add_options()("*", po::value<std::vector<std::string>>());
490  return options;
491  }
492 
  // Body of the main method: initializes the configuration from the parsed arguments and copies
  // the plugin path and plugin list into the output members.
494  auto& config_manager = ConfigManager::getInstance(conf_man_id);
495  config_manager.initialize(args);
496  auto& conf = config_manager.getConfiguration<PluginConfig>();
497  m_plugin_path = conf.getPluginPath();
498  m_plugin_list = conf.getPluginList();
499  return Elements::ExitCode::OK;
500  }
501 
502 private:
503 
  // Identifier of the configuration manager used by this helper, obtained from getUniqueManagerId().
504  long conf_man_id = getUniqueManagerId();
507 
508 };
509 
510 
/// Copies the plugin-related options from the command line into plugin_options_input.
///
/// Only `--config-file`, `--plugin-directory` and `--plugin` are forwarded, both
/// in the `--name value` form (two entries are appended) and in the
/// `--name=value` form (one entry is appended). Every other argument is ignored.
///
/// An option in the `--name value` form that is the last argument has no value
/// to forward: it is skipped instead of reading past the end of argv.
///
/// @param argc                  number of entries in argv
/// @param argv                  command-line arguments
/// @param plugin_options_input  vector the forwarded options are appended to
static void forwardOptions(int argc, char *const *argv, std::vector<std::string>& plugin_options_input) {
  static const std::string forwarded_options[] = {"--config-file", "--plugin-directory", "--plugin"};

  for (int i = 0; i < argc; ++i) {
    const std::string option{argv[i]};
    for (const auto& name : forwarded_options) {
      if (option == name) {
        // The value is the next argument; it only exists if this is not the last one.
        if (i + 1 < argc) {
          plugin_options_input.emplace_back(name);
          plugin_options_input.emplace_back(argv[i + 1]);
        }
      } else if (option.size() > name.size() && option[name.size()] == '=' &&
                 option.compare(0, name.size(), name) == 0) {
        // "--name=value": requiring '=' right after the name keeps "--plugin" from
        // matching "--plugin-directory=...".
        plugin_options_input.emplace_back(option);
      }
    }
  }
}
537 
538 
// Program entry point. A first helper program (PluginOptionsMain) is run on a reduced command
// line to obtain the plugin path and plugin list; the real program (SEMain) is then created with
// them and run on the full command line. Its exit code is returned.
// NOTE(review): several lines of this function are not visible in this listing (two before the
// try block and one at the end of each catch handler); confirm against the original file.
539 ELEMENTS_API int main(int argc, char* argv[]) {
540  std::string plugin_path {};
541  std::vector<std::string> plugin_list {};
542 
543  // This adds the current directory as a valid location for the default "sourcextractor++.conf" configuration
  // "." is prepended and "/etc" appended to any existing ELEMENTS_CONF_PATH value.
544  Elements::TempEnv local_env;
545  if (local_env["ELEMENTS_CONF_PATH"].empty()) {
546  local_env["ELEMENTS_CONF_PATH"] = ".:/etc";
547  } else {
548  local_env["ELEMENTS_CONF_PATH"] = ".:" + local_env["ELEMENTS_CONF_PATH"] + ":/etc";
549  }
550 
552 
553  // Try to be reasonably graceful with unhandled exceptions
555 
556  try {
557  // First we create a program whose sole purpose is to get the options for
558  // the plugin paths. Note that we do not want this helper program to
559  // handle any options other than config-file, plugin-directory and plugin,
560  // so we create a subset of the given options with only the necessary ones.
561  // We also restrict its logging to the ERROR level.
  // NOTE(review): masked_indices is not used anywhere in the visible code.
562  std::vector<int> masked_indices{};
563  std::vector<std::string> plugin_options_input{};
564  plugin_options_input.emplace_back("DummyProgram");
565  plugin_options_input.emplace_back("--log-level");
566  plugin_options_input.emplace_back("ERROR");
567  forwardOptions(argc, argv, plugin_options_input);
568 
  // Build an argv-style array pointing into plugin_options_input, which must stay alive and
  // unmodified while the helper program runs.
569  int argc_tmp = plugin_options_input.size();
570  std::vector<const char *> argv_tmp(argc_tmp);
571  for (unsigned int i = 0; i < plugin_options_input.size(); ++i) {
572  auto& option_str = plugin_options_input[i];
573  argv_tmp[i] = option_str.data();
574  }
575 
  // The helper fills plugin_path and plugin_list, which are then handed to the real program.
576  CREATE_MANAGER_WITH_ARGS(plugin_options_program, PluginOptionsMain, plugin_path, plugin_list);
577  plugin_options_program.run(argc_tmp, const_cast<char **>(argv_tmp.data()));
578 
579  CREATE_MANAGER_WITH_ARGS(main, SEMain, plugin_path, plugin_list);
580  Elements::ExitCode exit_code = main.run(argc, argv);
581  return static_cast<Elements::ExitCodeType>(exit_code);
582  }
583  catch (const std::exception &e) {
584  logger.fatal() << e.what();
586  }
587  catch (...) {
588  logger.fatal() << "Unknown exception type!";
589  logger.fatal() << "Please, report this as a bug";
591  }
592 }
static long config_manager_id
T set_terminate(T...args)
std::list< std::shared_ptr< SourceWithOnDemandProperties > > m_list
PluginManager plugin_manager
static auto logger
Definition: WCS.cpp:41
static const std::string LIST_OUTPUT_PROPERTIES
SEMain(const std::string &plugin_path, const std::vector< std::string > &plugin_list)
static const std::string PROPERTY_COLUMN_MAPPING
std::string & m_plugin_path
static ConfigManager & getInstance(long id)
void printDefaults()
Print a configuration file populated with defaults.
static void onTerminate() noexcept
long getUniqueManagerId() noexcept
T endl(T...args)
T end(T...args)
std::pair< po::options_description, po::positional_options_description > defineProgramArguments() override
Return the arguments that the program accepts.
STL class.
T partition(T...args)
STL class.
T getenv(T...args)
T at(T...args)
std::list< std::shared_ptr< SourceGroupInterface > > m_list
PluginOptionsMain(std::string &plugin_path, std::vector< std::string > &plugin_list)
static void setupEnvironment(void)
constexpr double e
T data(T...args)
virtual void handleMessage(const std::shared_ptr< SourceWithOnDemandProperties > &source) override
po::options_description config_parameters
boost::program_options::options_description defineSpecificProgramOptions() override
T what(T...args)
const std::vector< std::string > getOutputProperties()
static CheckImages & getInstance()
Definition: CheckImages.h:150
Elements::ExitCode mainMethod(std::map< std::string, po::variable_value > &args) override
The SegmentationFactory will provide a Segmentation implementation based on the current configuration...
std::underlying_type< ExitCode >::type ExitCodeType
po::options_description getConfigParameters()
std::shared_ptr< WriteableImage< int > > getSegmentationImage(size_t index) const
Definition: CheckImages.h:54
void configure(Euclid::Configuration::ConfigManager &manager) override
Method which should initialize the object.
Definition: CheckImages.cpp:84
#define ELEMENTS_API
static void writeDefaultMultiple(std::ostream &out, const po::option_description &opt, const boost::any &default_value)
Print a multiple-value option.
STL class.
STL class.
ELEMENTS_API int main(int argc, char *argv[])
void reportConfigDependencies(Euclid::Configuration::ConfigManager &manager) const override
Registers all the Configuration dependencies.
Definition: CheckImages.cpp:58
T find(T...args)
static void writeDefault(std::ostream &out, const po::option_description &opt, const boost::any &default_value)
Print a simple option.
T c_str(T...args)
Elements::ExitCode mainMethod(std::map< std::string, boost::program_options::variable_value > &args) override
Observer interface to be used with Observable to implement the Observer pattern.
Definition: Observable.h:38
static const std::string DUMP_CONFIG
const std::shared_ptr< Euclid::ThreadPool > & getThreadPool() const
static void disableBlasMultithreading()
virtual void handleMessage(const std::shared_ptr< SourceGroupInterface > &group) override
PluginManager handles the loading of plugins and calls their registration function, providing them with with a PluginAPI implementation.
Definition: PluginManager.h:54
static std::shared_ptr< TileManager > getInstance()
std::string getPluginPath() const
static void forwardOptions(int argc, char *const *argv, std::vector< std::string > &plugin_options_input)
static Logging getLogger(const std::string &name="")
STL class.
Provides combined detection frame.
std::vector< std::string > & m_plugin_list
static const std::string PROPERTY_COLUMN_MAPPING_ALL
#define CREATE_MANAGER_WITH_ARGS(MANAGER, ELEMENTS_PROGRAM,...)
T emplace_back(T...args)