Ardour  9.0-pre0-582-g084a23a80d
mpmc_queue.h
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2010-2011 Dmitry Vyukov
3  * Copyright (C) 2019 Robin Gareus <robin@gareus.org>
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18  */
19 
20 #ifndef _pbd_mpc_queue_h_
21 #define _pbd_mpc_queue_h_
22 
23 #include <cassert>
24 #include <stdint.h>
25 #include <stdlib.h>
26 
27 # include <atomic>
28 # define MPMC_QUEUE_TYPE std::atomic<size_t>
29 
30 namespace PBD {
31 
32 /* Lock free multiple producer, multiple consumer queue
33  *
34  * Inspired by http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
35  * Kudos to Dmitry Vyukov who licensed that code in terms of a 2-clause BSD license.
36  */
37 template <typename T>
38 class /*LIBPBD_API*/ MPMCQueue
39 {
40 public:
41  MPMCQueue (size_t buffer_size = 8)
42  : _buffer (0)
43  , _buffer_mask (0)
44  {
45  reserve (buffer_size);
46  }
47 
49  {
50  delete[] _buffer;
51  }
52 
53  size_t capacity () const {
54  return _buffer_mask + 1;
55  }
56 
57  static size_t
58  power_of_two_size (size_t sz)
59  {
60  int32_t power_of_two;
61  for (power_of_two = 1; 1U << power_of_two < sz; ++power_of_two) ;
62  return 1U << power_of_two;
63  }
64 
65  void
66  reserve (size_t buffer_size)
67  {
68  buffer_size = power_of_two_size (buffer_size);
69  assert ((buffer_size >= 2) && ((buffer_size & (buffer_size - 1)) == 0));
70  if (_buffer_mask >= buffer_size - 1) {
71  return;
72  }
73  delete[] _buffer;
74  _buffer = new cell_t[buffer_size];
75  _buffer_mask = buffer_size - 1;
76  clear ();
77  }
78 
79  void
80  clear ()
81  {
82  for (size_t i = 0; i <= _buffer_mask; ++i) {
83  _buffer[i]._sequence.store (i, std::memory_order_relaxed);
84  }
85  _enqueue_pos.store (0, std::memory_order_relaxed);
86  _dequeue_pos.store (0, std::memory_order_relaxed);
87  }
88 
89  bool
90  push_back (T const& data)
91  {
92  cell_t* cell;
93  size_t pos = _enqueue_pos.load (std::memory_order_relaxed);
94 
95  for (;;) {
96  cell = &_buffer[pos & _buffer_mask];
97  size_t seq = cell->_sequence.load (std::memory_order_acquire);
98  intptr_t dif = (intptr_t)seq - (intptr_t)pos;
99  if (dif == 0) {
100  if (_enqueue_pos.compare_exchange_weak (pos, pos + 1, std::memory_order_relaxed))
101  {
102  break;
103  }
104  } else if (dif < 0) {
105  return false;
106  } else {
107  pos = _enqueue_pos.load (std::memory_order_relaxed);
108  }
109  }
110 
111  cell->_data = data;
112  cell->_sequence.store (pos + 1, std::memory_order_release);
113 
114  return true;
115  }
116 
117  bool
118  pop_front (T& data)
119  {
120  cell_t* cell;
121  size_t pos = _dequeue_pos.load (std::memory_order_relaxed);
122 
123  for (;;) {
124  cell = &_buffer[pos & _buffer_mask];
125  size_t seq = cell->_sequence.load (std::memory_order_acquire);
126  intptr_t dif = (intptr_t)seq - (intptr_t) (pos + 1);
127  if (dif == 0) {
128  if (_dequeue_pos.compare_exchange_weak (pos, pos + 1, std::memory_order_relaxed))
129  {
130  break;
131  }
132  } else if (dif < 0) {
133  return false;
134  } else {
135  pos = _dequeue_pos.load (std::memory_order_relaxed);
136  }
137  }
138 
139  data = cell->_data;
140  cell->_sequence.store (pos + _buffer_mask + 1, std::memory_order_release);
141  return true;
142  }
143 
144 private:
145  struct cell_t {
147  T _data;
148  };
149 
150  char _pad0[64];
152  size_t _buffer_mask;
153  char _pad1[64 - sizeof (cell_t*) - sizeof (size_t)];
155  char _pad2[64 - sizeof (size_t)];
157  char _pad3[64 - sizeof (size_t)];
158 };
159 
160 } // namespace PBD
161 
162 #undef MPMC_QUEUE_TYPE
163 
164 #endif
void clear()
Definition: mpmc_queue.h:80
char _pad2[64 - sizeof(size_t)]
Definition: mpmc_queue.h:155
std::atomic< size_t > _enqueue_pos
Definition: mpmc_queue.h:154
size_t _buffer_mask
Definition: mpmc_queue.h:152
static size_t power_of_two_size(size_t sz)
Definition: mpmc_queue.h:58
void reserve(size_t buffer_size)
Definition: mpmc_queue.h:66
bool push_back(T const &data)
Definition: mpmc_queue.h:90
MPMCQueue(size_t buffer_size=8)
Definition: mpmc_queue.h:41
bool pop_front(T &data)
Definition: mpmc_queue.h:118
cell_t * _buffer
Definition: mpmc_queue.h:151
std::atomic< size_t > _dequeue_pos
Definition: mpmc_queue.h:156
char _pad1[64 - sizeof(cell_t *) - sizeof(size_t)]
Definition: mpmc_queue.h:153
char _pad0[64]
Definition: mpmc_queue.h:150
char _pad3[64 - sizeof(size_t)]
Definition: mpmc_queue.h:157
size_t capacity() const
Definition: mpmc_queue.h:53
#define MPMC_QUEUE_TYPE
Definition: mpmc_queue.h:28
Definition: axis_view.h:42
std::atomic< size_t > _sequence
Definition: mpmc_queue.h:146