Ardour  9.0-pre0-582-g084a23a80d
threader.h
Go to the documentation of this file.
1 #ifndef AUDIOGRAPHER_THREADER_H
2 #define AUDIOGRAPHER_THREADER_H
3 
4 #include <atomic>
5 #include <vector>
6 #include <algorithm>
7 
8 #include "glibmm/threads.h"
9 #include <glibmm/threadpool.h>
10 #include <glibmm/timeval.h>
11 #include <sigc++/slot.h>
12 
13 #include <glib.h>
14 
15 #include "pbd/atomic.h"
16 #include "pbd/compose.h"
17 
19 #include "audiographer/source.h"
20 #include "audiographer/sink.h"
21 #include "audiographer/exception.h"
22 
23 namespace AudioGrapher
24 {
25 
27 class /*LIBAUDIOGRAPHER_API*/ ThreaderException : public Exception
28 {
29  public:
30  template<typename T>
31  ThreaderException (T const & thrower, std::exception const & e)
32  : Exception (thrower, string_compose ("\n\t- Dynamic type: %1\n\t- what(): %2", DebugUtils::demangled_name (e), e.what()))
33  { }
34 };
35 
37 template <typename T = DefaultSampleType>
38 class /*LIBAUDIOGRAPHER_API*/ Threader : public Source<T>, public Sink<T>
39 {
40  private:
41  typedef std::vector<typename Source<T>::SinkPtr> OutputVec;
42 
43  public:
44 
50  Threader (Glib::ThreadPool & thread_pool, long wait_timeout_milliseconds = 500)
52  , wait_timeout (wait_timeout_milliseconds)
53  {
54  readers.store (0);
55  }
56 
57  virtual ~Threader () {}
58 
60  void add_output (typename Source<T>::SinkPtr output) { outputs.push_back (output); }
61 
63  void clear_outputs () { outputs.clear (); }
64 
66  void remove_output (typename Source<T>::SinkPtr output) {
67  typename OutputVec::iterator new_end = std::remove(outputs.begin(), outputs.end(), output);
68  outputs.erase (new_end, outputs.end());
69  }
70 
72  void process (ProcessContext<T> const & c)
73  {
74  wait_mutex.lock();
75 
76  exception.reset();
77 
78  unsigned int outs = outputs.size();
79  (void) readers.fetch_add (outs);
80  for (unsigned int i = 0; i < outs; ++i) {
81  thread_pool.push (sigc::bind (sigc::mem_fun (this, &Threader::process_output), c, i));
82  }
83 
84  wait();
85  }
86 
87  using Sink<T>::process;
88 
89  private:
90 
91  void wait()
92  {
93  while (readers.load () != 0) {
94  gint64 end_time = g_get_monotonic_time () + (wait_timeout * G_TIME_SPAN_MILLISECOND);
95  wait_cond.wait_until(wait_mutex, end_time);
96  }
97 
98  wait_mutex.unlock();
99 
100  if (exception) {
101  throw *exception;
102  }
103  }
104 
105  void process_output(ProcessContext<T> const & c, unsigned int output)
106  {
107  try {
108  outputs[output]->process (c);
109  } catch (std::exception const & e) {
110  // Only first exception will be passed on
111  exception_mutex.lock();
112  if(!exception) { exception.reset (new ThreaderException (*this, e)); }
113  exception_mutex.unlock();
114  }
115 
117  wait_cond.signal();
118  }
119  }
120 
122 
123  Glib::ThreadPool& thread_pool;
124  Glib::Threads::Mutex wait_mutex;
125  Glib::Threads::Cond wait_cond;
126 
127  std::atomic<int> readers;
129 
130  Glib::Threads::Mutex exception_mutex;
131  std::shared_ptr<ThreaderException> exception;
132 
133 };
134 
135 } // namespace
136 
137 #endif //AUDIOGRAPHER_THREADER_H
const char * what() const
Definition: exception.h:28
std::shared_ptr< Sink< T > > SinkPtr
Class that stores exceptions thrown from different threads.
Definition: threader.h:28
ThreaderException(T const &thrower, std::exception const &e)
Definition: threader.h:31
Class for distributing processing across several threads.
Definition: threader.h:39
Glib::ThreadPool & thread_pool
Definition: threader.h:123
Glib::Threads::Mutex wait_mutex
Definition: threader.h:124
void add_output(typename Source< T >::SinkPtr output)
Adds an output. RT safe.
Definition: threader.h:60
Glib::Threads::Mutex exception_mutex
Definition: threader.h:130
void process(ProcessContext< T > const &c)
Processes context concurrently by scheduling each output separately to the given thread pool.
Definition: threader.h:72
Threader(Glib::ThreadPool &thread_pool, long wait_timeout_milliseconds=500)
Definition: threader.h:50
virtual ~Threader()
Definition: threader.h:57
std::vector< typename Source< T >::SinkPtr > OutputVec
Definition: threader.h:41
void clear_outputs()
Clears all outputs. RT safe.
Definition: threader.h:63
void remove_output(typename Source< T >::SinkPtr output)
Removes a specific output. RT safe.
Definition: threader.h:66
std::shared_ptr< ThreaderException > exception
Definition: threader.h:131
void process_output(ProcessContext< T > const &c, unsigned int output)
Definition: threader.h:105
Glib::Threads::Cond wait_cond
Definition: threader.h:125
std::atomic< int > readers
Definition: threader.h:127
std::string string_compose(const std::string &fmt, const T1 &o1)
Definition: compose.h:246
bool atomic_dec_and_test(std::atomic< T > &aval)
Definition: atomic.h:26
std::string demangled_name(T const &obj)
Definition: demangle.h:45
Utilities for debugging.
Definition: debug_utils.h:21