std::atomic - 2nd batch of changes to convert from glib to std::atomic

This commit is contained in:
Paul Davis 2023-02-17 11:34:35 -07:00
parent c304edd253
commit a486fba3e9
30 changed files with 179 additions and 233 deletions

View file

@ -38,7 +38,7 @@ public:
atomic_counter (gint value = 0)
{
g_atomic_int_set (&m_value, value);
m_value.store (value);
}
gint get() const
@ -48,7 +48,7 @@ public:
void set (gint new_value)
{
g_atomic_int_set (&m_value, new_value);
m_value.store (new_value);
}
void increment ()
@ -73,12 +73,7 @@ public:
bool compare_and_exchange (gint old_value, gint new_value)
{
return g_atomic_int_compare_and_exchange
(
&m_value,
old_value,
new_value
);
return m_value.compare_exchange_strong (old_value, new_value);
}
/**

View file

@ -20,21 +20,12 @@
#ifndef _pbd_mpc_queue_h_
#define _pbd_mpc_queue_h_
#if defined(__cplusplus) && __cplusplus >= 201103L
# define MPMC_USE_STD_ATOMIC 1
#endif
#include <cassert>
#include <stdint.h>
#include <stdlib.h>
#ifdef MPMC_USE_STD_ATOMIC
# include <atomic>
# define MPMC_QUEUE_TYPE std::atomic<size_t>
#else
# include <glib.h>
# define MPMC_QUEUE_TYPE std::atomic<unsigned int>
#endif
namespace PBD {
@ -88,64 +79,37 @@ public:
void
clear ()
{
#ifdef MPMC_USE_STD_ATOMIC
for (size_t i = 0; i <= _buffer_mask; ++i) {
_buffer[i]._sequence.store (i, std::memory_order_relaxed);
}
_enqueue_pos.store (0, std::memory_order_relaxed);
_dequeue_pos.store (0, std::memory_order_relaxed);
#else
for (size_t i = 0; i <= _buffer_mask; ++i) {
g_atomic_int_set (&_buffer[i]._sequence, i);
}
_enqueue_pos.store (0);
_dequeue_pos.store (0);
#endif
}
bool
push_back (T const& data)
{
cell_t* cell;
#ifdef MPMC_USE_STD_ATOMIC
size_t pos = _enqueue_pos.load (std::memory_order_relaxed);
#else
guint pos = _enqueue_pos.load ();
#endif
for (;;) {
cell = &_buffer[pos & _buffer_mask];
#ifdef MPMC_USE_STD_ATOMIC
size_t seq = cell->_sequence.load (std::memory_order_acquire);
#else
guint seq = g_atomic_int_get (&cell->_sequence);
#endif
intptr_t dif = (intptr_t)seq - (intptr_t)pos;
if (dif == 0) {
#ifdef MPMC_USE_STD_ATOMIC
if (_enqueue_pos.compare_exchange_weak (pos, pos + 1, std::memory_order_relaxed))
#else
if (g_atomic_int_compare_and_exchange (&_enqueue_pos, pos, pos + 1))
#endif
{
break;
}
} else if (dif < 0) {
return false;
} else {
#ifdef MPMC_USE_STD_ATOMIC
pos = _enqueue_pos.load (std::memory_order_relaxed);
#else
pos = _enqueue_pos.load ();
#endif
}
}
cell->_data = data;
#ifdef MPMC_USE_STD_ATOMIC
cell->_sequence.store (pos + 1, std::memory_order_release);
#else
g_atomic_int_set (&cell->_sequence, pos + 1);
#endif
return true;
}
@ -154,45 +118,26 @@ public:
pop_front (T& data)
{
cell_t* cell;
#ifdef MPMC_USE_STD_ATOMIC
size_t pos = _dequeue_pos.load (std::memory_order_relaxed);
#else
guint pos = _dequeue_pos.load ();
#endif
for (;;) {
cell = &_buffer[pos & _buffer_mask];
#ifdef MPMC_USE_STD_ATOMIC
size_t seq = cell->_sequence.load (std::memory_order_acquire);
#else
guint seq = g_atomic_int_get (&cell->_sequence);
#endif
intptr_t dif = (intptr_t)seq - (intptr_t) (pos + 1);
if (dif == 0) {
#ifdef MPMC_USE_STD_ATOMIC
if (_dequeue_pos.compare_exchange_weak (pos, pos + 1, std::memory_order_relaxed))
#else
if (g_atomic_int_compare_and_exchange (&_dequeue_pos, pos, pos + 1))
#endif
{
break;
}
} else if (dif < 0) {
return false;
} else {
#ifdef MPMC_USE_STD_ATOMIC
pos = _dequeue_pos.load (std::memory_order_relaxed);
#else
pos = _dequeue_pos.load ();
#endif
}
}
data = cell->_data;
#ifdef MPMC_USE_STD_ATOMIC
cell->_sequence.store (pos + _buffer_mask + 1, std::memory_order_release);
#else
g_atomic_int_set (&cell->_sequence, pos + _buffer_mask + 1);
#endif
return true;
}
@ -214,7 +159,6 @@ private:
} // namespace PBD
#undef MPMC_USE_STD_ATOMIC
#undef MPMC_QUEUE_TYPE
#endif

View file

@ -35,13 +35,13 @@ template<class T>
class /*LIBPBD_API*/ PlaybackBuffer
{
public:
static guint power_of_two_size (guint sz) {
static size_t power_of_two_size (size_t sz) {
int32_t power_of_two;
for (power_of_two = 1; 1U << power_of_two < sz; ++power_of_two);
return 1U << power_of_two;
}
PlaybackBuffer (guint sz, guint res = 8191)
PlaybackBuffer (size_t sz, size_t res = 8191)
: reservation (res)
{
sz += reservation;
@ -60,7 +60,7 @@ public:
/* init (mlock) */
T *buffer () { return buf; }
/* init (mlock) */
guint bufsize () const { return size; }
size_t bufsize () const { return size; }
/* write-thread */
void reset () {
@ -75,20 +75,20 @@ public:
/* called from rt (reader) thread for new buffers */
void align_to (PlaybackBuffer const& other) {
Glib::Threads::Mutex::Lock lm (_reset_lock);
g_atomic_int_set (&read_idx, g_atomic_int_get (&other.read_idx));
g_atomic_int_set (&write_idx, g_atomic_int_get (&other.write_idx));
g_atomic_int_set (&reserved, g_atomic_int_get (&other.reserved));
read_idx.store (other.read_idx.load());
write_idx.store (other.write_idx.load());
reserved.store (other.reserved.load());
memset (buf, 0, size * sizeof (T));
}
/* write-thread */
guint write_space () const {
guint w, r;
size_t write_space () const {
size_t w, r;
w = write_idx.load ();
r = read_idx.load ();
guint rv;
size_t rv;
if (w > r) {
rv = ((r + size) - w) & size_mask;
@ -111,8 +111,8 @@ public:
}
/* read-thread */
guint read_space () const {
guint w, r;
size_t read_space () const {
size_t w, r;
w = write_idx.load ();
r = read_idx.load ();
@ -125,8 +125,8 @@ public:
}
/* write thread */
guint overwritable_at (guint r) const {
guint w;
size_t overwritable_at (size_t r) const {
size_t w;
w = write_idx.load ();
@ -137,26 +137,26 @@ public:
}
/* read-thead */
guint read (T *dest, guint cnt, bool commit = true, guint offset = 0);
size_t read (T *dest, size_t cnt, bool commit = true, size_t offset = 0);
/* write-thead */
guint write (T const * src, guint cnt);
size_t write (T const * src, size_t cnt);
/* write-thead */
guint write_zero (guint cnt);
size_t write_zero (size_t cnt);
/* read-thead */
guint increment_write_ptr (guint cnt)
size_t increment_write_ptr (size_t cnt)
{
cnt = std::min (cnt, write_space ());
g_atomic_int_set (&write_idx, (write_idx.load () + cnt) & size_mask);
write_idx.store ((write_idx.load () + cnt) & size_mask);
return cnt;
}
/* read-thead */
guint decrement_read_ptr (guint cnt)
size_t decrement_read_ptr (size_t cnt)
{
SpinLock sl (_reservation_lock);
guint r = read_idx.load ();
guint res = reserved.load ();
size_t r = read_idx.load ();
size_t res = reserved.load ();
cnt = std::min (cnt, res);
@ -164,19 +164,19 @@ public:
res -= cnt;
read_idx.store (r);
g_atomic_int_set (&reserved, res);
reserved.store (res);
return cnt;
}
/* read-thead */
guint increment_read_ptr (guint cnt)
size_t increment_read_ptr (size_t cnt)
{
cnt = std::min (cnt, read_space ());
SpinLock sl (_reservation_lock);
g_atomic_int_set (&read_idx, (read_idx.load () + cnt) & size_mask);
g_atomic_int_set (&reserved, std::min (reservation, reserved.load () + cnt));
read_idx.store ((read_idx.load () + cnt) & size_mask);
reserved.store (std::min (reservation, reserved.load () + cnt));
return cnt;
}
@ -184,28 +184,28 @@ public:
/* read-thead */
bool can_seek (int64_t cnt) {
if (cnt > 0) {
return read_space() >= cnt;
return read_space() >= (size_t) cnt;
} else if (cnt < 0) {
return reserved.load () >= -cnt;
return reserved.load () >= (size_t) -cnt;
} else {
return true;
}
}
guint read_ptr() const { return read_idx.load (); }
guint write_ptr() const { return write_idx.load (); }
guint reserved_size() const { return reserved.load (); }
guint reservation_size() const { return reservation; }
size_t read_ptr() const { return read_idx.load (); }
size_t write_ptr() const { return write_idx.load (); }
size_t reserved_size() const { return reserved.load (); }
size_t reservation_size() const { return reservation; }
private:
T *buf;
const guint reservation;
guint size;
guint size_mask;
const size_t reservation;
size_t size;
size_t size_mask;
mutable std::atomic<int> write_idx;
mutable std::atomic<int> read_idx;
mutable std::atomic<int> reserved;
mutable std::atomic<size_t> write_idx;
mutable std::atomic<size_t> read_idx;
mutable std::atomic<size_t> reserved;
/* spinlock will be used to update write_idx and reserved in sync */
spinlock_t _reservation_lock;
@ -213,20 +213,20 @@ private:
Glib::Threads::Mutex _reset_lock;
};
template<class T> /*LIBPBD_API*/ guint
PlaybackBuffer<T>::write (T const *src, guint cnt)
template<class T> /*LIBPBD_API*/ size_t
PlaybackBuffer<T>::write (T const *src, size_t cnt)
{
guint w = write_idx.load ();
const guint free_cnt = write_space ();
size_t w = write_idx.load ();
const size_t free_cnt = write_space ();
if (free_cnt == 0) {
return 0;
}
const guint to_write = cnt > free_cnt ? free_cnt : cnt;
const guint cnt2 = w + to_write;
const size_t to_write = cnt > free_cnt ? free_cnt : cnt;
const size_t cnt2 = w + to_write;
guint n1, n2;
size_t n1, n2;
if (cnt2 > size) {
n1 = size - w;
n2 = cnt2 & size_mask;
@ -247,20 +247,20 @@ PlaybackBuffer<T>::write (T const *src, guint cnt)
return to_write;
}
template<class T> /*LIBPBD_API*/ guint
PlaybackBuffer<T>::write_zero (guint cnt)
template<class T> /*LIBPBD_API*/ size_t
PlaybackBuffer<T>::write_zero (size_t cnt)
{
guint w = write_idx.load ();
const guint free_cnt = write_space ();
size_t w = write_idx.load ();
const size_t free_cnt = write_space ();
if (free_cnt == 0) {
return 0;
}
const guint to_write = cnt > free_cnt ? free_cnt : cnt;
const guint cnt2 = w + to_write;
const size_t to_write = cnt > free_cnt ? free_cnt : cnt;
const size_t cnt2 = w + to_write;
guint n1, n2;
size_t n1, n2;
if (cnt2 > size) {
n1 = size - w;
n2 = cnt2 & size_mask;
@ -281,8 +281,8 @@ PlaybackBuffer<T>::write_zero (guint cnt)
return to_write;
}
template<class T> /*LIBPBD_API*/ guint
PlaybackBuffer<T>::read (T *dest, guint cnt, bool commit, guint offset)
template<class T> /*LIBPBD_API*/ size_t
PlaybackBuffer<T>::read (T *dest, size_t cnt, bool commit, size_t offset)
{
Glib::Threads::Mutex::Lock lm (_reset_lock, Glib::Threads::TRY_LOCK);
if (!lm.locked ()) {
@ -290,10 +290,10 @@ PlaybackBuffer<T>::read (T *dest, guint cnt, bool commit, guint offset)
return 0;
}
guint r = read_idx.load ();
const guint w = write_idx.load ();
size_t r = read_idx.load ();
const size_t w = write_idx.load ();
guint free_cnt = (w > r) ? (w - r) : ((w - r + size) & size_mask);
size_t free_cnt = (w > r) ? (w - r) : ((w - r + size) & size_mask);
if (!commit && offset > 0) {
if (offset > free_cnt) {
@ -303,11 +303,11 @@ PlaybackBuffer<T>::read (T *dest, guint cnt, bool commit, guint offset)
r = (r + offset) & size_mask;
}
const guint to_read = cnt > free_cnt ? free_cnt : cnt;
const size_t to_read = cnt > free_cnt ? free_cnt : cnt;
const guint cnt2 = r + to_read;
const size_t cnt2 = r + to_read;
guint n1, n2;
size_t n1, n2;
if (cnt2 > size) {
n1 = size - r;
n2 = cnt2 & size_mask;
@ -327,7 +327,7 @@ PlaybackBuffer<T>::read (T *dest, guint cnt, bool commit, guint offset)
if (commit) {
SpinLock sl (_reservation_lock);
read_idx.store (r);
g_atomic_int_set (&reserved, std::min (reservation, reserved.load () + to_read));
reserved.store (std::min (reservation, reserved.load () + to_read));
}
return to_read;
}

View file

@ -72,15 +72,15 @@ class /*LIBPBD_API*/ RingBufferNPT
void get_write_vector (rw_vector *);
void decrement_read_ptr (size_t cnt) {
g_atomic_int_set (&read_ptr, (read_ptr.load () - cnt) % size);
read_ptr.store ((read_ptr.load () - cnt) % size);
}
void increment_read_ptr (size_t cnt) {
g_atomic_int_set (&read_ptr, (read_ptr.load () + cnt) % size);
read_ptr.store ((read_ptr.load () + cnt) % size);
}
void increment_write_ptr (size_t cnt) {
g_atomic_int_set (&write_ptr, (write_ptr.load () + cnt) % size);
write_ptr.store ((write_ptr.load () + cnt) % size);
}
size_t write_space () {
@ -161,7 +161,7 @@ RingBufferNPT<T>::read (T *dest, size_t cnt)
priv_read_ptr = n2;
}
g_atomic_int_set (&read_ptr, priv_read_ptr);
read_ptr.store (priv_read_ptr);
return to_read;
}
@ -200,7 +200,7 @@ RingBufferNPT<T>::write (const T *src, size_t cnt)
priv_write_ptr = n2;
}
g_atomic_int_set (&write_ptr, priv_write_ptr);
write_ptr.store (priv_write_ptr);
return to_write;
}

View file

@ -105,7 +105,7 @@ class LIBPBD_API Stateful {
virtual void suspend_property_changes ();
virtual void resume_property_changes ();
bool property_changes_suspended() const { return g_atomic_int_get (const_cast<std::atomic<int>*> (&_stateful_frozen)) > 0; }
bool property_changes_suspended() const { return _stateful_frozen.load() > 0; }
protected: