diff --git a/libs/ardour/ardour/rt_tasklist.h b/libs/ardour/ardour/rt_tasklist.h index 2ba78e9c82..373e9e7d61 100644 --- a/libs/ardour/ardour/rt_tasklist.h +++ b/libs/ardour/ardour/rt_tasklist.h @@ -19,30 +19,29 @@ #ifndef _ardour_rt_tasklist_h_ #define _ardour_rt_tasklist_h_ -#include #include +#include -#include "pbd/semutils.h" #include "pbd/g_atomic_compat.h" +#include "pbd/mpmc_queue.h" +#include "pbd/semutils.h" -#include "ardour/libardour_visibility.h" -#include "ardour/types.h" #include "ardour/audio_backend.h" +#include "ardour/libardour_visibility.h" #include "ardour/session_handle.h" +#include "ardour/types.h" -namespace ARDOUR { - +namespace ARDOUR +{ class LIBARDOUR_API RTTaskList { public: RTTaskList (); ~RTTaskList (); - // TODO use dedicated allocator of a boost::intrusive::list - typedef std::list > TaskList; - /** process tasks in list in parallel, wait for them to complete */ - void process (TaskList const&); + void process (); + void push_back (boost::function fn); private: GATOMIC_QUAL gint _threads_active; @@ -50,18 +49,18 @@ private: void reset_thread_list (); void drop_threads (); - - void process_tasklist (); - - static void* _thread_run (void *arg); void run (); - Glib::Threads::Mutex _process_mutex; - Glib::Threads::Mutex _tasklist_mutex; + static void* _thread_run (void* arg); + PBD::Semaphore _task_run_sem; PBD::Semaphore _task_end_sem; - TaskList _tasklist; + size_t _n_tasks; + size_t _m_tasks; + size_t _queue_size; + + PBD::MPMCQueue> _tasks; }; } // namespace ARDOUR diff --git a/libs/ardour/port_manager.cc b/libs/ardour/port_manager.cc index ff8fe7e270..6bb74f786a 100644 --- a/libs/ardour/port_manager.cc +++ b/libs/ardour/port_manager.cc @@ -1112,15 +1112,18 @@ PortManager::cycle_start (pframes_t nframes, Session* s) * A single external source-port may be connected to many ardour * input-ports. Currently re-sampling is per input. */ - if (s && s->rt_tasklist () && fabs (Port::speed_ratio ()) != 1.0) { - RTTaskList::TaskList tl; + boost::shared_ptr tl; + if (s) { + tl = s->rt_tasklist (); + } + if (tl && fabs (Port::speed_ratio ()) != 1.0) { for (Ports::iterator p = _cycle_ports->begin (); p != _cycle_ports->end (); ++p) { if (!(p->second->flags () & TransportSyncPort)) { - tl.push_back (boost::bind (&Port::cycle_start, p->second, nframes)); + tl->push_back (boost::bind (&Port::cycle_start, p->second, nframes)); } } - tl.push_back (boost::bind (&PortManager::run_input_meters, this, nframes, s ? s->nominal_sample_rate () : 0)); - s->rt_tasklist ()->process (tl); + tl->push_back (boost::bind (&PortManager::run_input_meters, this, nframes, s ? s->nominal_sample_rate () : 0)); + tl->process (); } else { for (Ports::iterator p = _cycle_ports->begin (); p != _cycle_ports->end (); ++p) { if (!(p->second->flags () & TransportSyncPort)) { @@ -1135,14 +1138,17 @@ void PortManager::cycle_end (pframes_t nframes, Session* s) { // see optimzation note in ::cycle_start() - if (0 && s && s->rt_tasklist () && fabs (Port::speed_ratio ()) != 1.0) { - RTTaskList::TaskList tl; + boost::shared_ptr tl; + if (s) { + tl = s->rt_tasklist (); + } + if (tl && fabs (Port::speed_ratio ()) != 1.0) { for (Ports::iterator p = _cycle_ports->begin (); p != _cycle_ports->end (); ++p) { if (!(p->second->flags () & TransportSyncPort)) { - tl.push_back (boost::bind (&Port::cycle_end, p->second, nframes)); + tl->push_back (boost::bind (&Port::cycle_end, p->second, nframes)); } } - s->rt_tasklist ()->process (tl); + tl->process (); } else { for (Ports::iterator p = _cycle_ports->begin (); p != _cycle_ports->end (); ++p) { if (!(p->second->flags () & TransportSyncPort)) { @@ -1250,14 +1256,17 @@ void PortManager::cycle_end_fade_out (gain_t base_gain, gain_t gain_step, pframes_t nframes, Session* s) { // see optimzation note in ::cycle_start() - if (0 && s && s->rt_tasklist () && fabs (Port::speed_ratio ()) != 1.0) { - RTTaskList::TaskList tl; + boost::shared_ptr tl; + if (s) { + tl = s->rt_tasklist (); + } + if (tl && fabs (Port::speed_ratio ()) != 1.0) { for (Ports::iterator p = _cycle_ports->begin (); p != _cycle_ports->end (); ++p) { if (!(p->second->flags () & TransportSyncPort)) { - tl.push_back (boost::bind (&Port::cycle_end, p->second, nframes)); + tl->push_back (boost::bind (&Port::cycle_end, p->second, nframes)); } } - s->rt_tasklist ()->process (tl); + tl->process (); } else { for (Ports::iterator p = _cycle_ports->begin (); p != _cycle_ports->end (); ++p) { if (!(p->second->flags () & TransportSyncPort)) { diff --git a/libs/ardour/rt_tasklist.cc b/libs/ardour/rt_tasklist.cc index 448ef6d81d..9a9b4c41c5 100644 --- a/libs/ardour/rt_tasklist.cc +++ b/libs/ardour/rt_tasklist.cc @@ -18,6 +18,7 @@ #include +#include "pbd/g_atomic_compat.h" #include "pbd/pthread_utils.h" #include "ardour/audioengine.h" @@ -32,6 +33,10 @@ using namespace ARDOUR; RTTaskList::RTTaskList () : _task_run_sem ("rt_task_run", 0) , _task_end_sem ("rt_task_done", 0) + , _n_tasks (0) + , _m_tasks (0) + , _queue_size (1024) + , _tasks (_queue_size) { g_atomic_int_set (&_threads_active, 0); reset_thread_list (); @@ -45,15 +50,14 @@ RTTaskList::~RTTaskList () void RTTaskList::drop_threads () { - Glib::Threads::Mutex::Lock pm (_process_mutex); g_atomic_int_set (&_threads_active, 0); uint32_t nt = _threads.size (); for (uint32_t i = 0; i < nt; ++i) { _task_run_sem.signal (); } - for (std::vector::const_iterator i = _threads.begin (); i != _threads.end (); ++i) { - pthread_join (*i, NULL); + for (auto const& i : _threads) { + pthread_join (i, NULL); } _threads.clear (); _task_run_sem.reset (); @@ -61,9 +65,9 @@ RTTaskList::drop_threads () } /*static*/ void* -RTTaskList::_thread_run (void *arg) +RTTaskList::_thread_run (void* arg) { - RTTaskList *d = static_cast(arg); + RTTaskList* d = static_cast (arg); char name[64]; snprintf (name, 64, "RTTask-%p", (void*)DEBUG_THREAD_SELF); @@ -84,20 +88,18 @@ RTTaskList::reset_thread_list () return; } - Glib::Threads::Mutex::Lock pm (_process_mutex); - g_atomic_int_set (&_threads_active, 1); for (uint32_t i = 0; i < num_threads; ++i) { + int rv = 1; pthread_t thread_id; - int rv = 1; - if (AudioEngine::instance()->is_realtime ()) { - rv = pbd_realtime_pthread_create (PBD_SCHED_FIFO, AudioEngine::instance()->client_real_time_priority(), PBD_RT_STACKSIZE_HELP, &thread_id, _thread_run, this); + if (AudioEngine::instance ()->is_realtime ()) { + rv = pbd_realtime_pthread_create (PBD_SCHED_FIFO, AudioEngine::instance ()->client_real_time_priority (), PBD_RT_STACKSIZE_HELP, &thread_id, _thread_run, this); } if (rv) { rv = pbd_pthread_create (PBD_RT_STACKSIZE_HELP, &thread_id, _thread_run, this); } if (rv) { - PBD::fatal << _("Cannot create thread for TaskList!") << " (" << strerror(rv) << ")" << endmsg; + PBD::fatal << _("Cannot create thread for TaskList!") << " (" << strerror (rv) << ")" << endmsg; /* NOT REACHED */ } pbd_mach_set_realtime_policy (thread_id, 5. * 1e-5, false); @@ -108,7 +110,6 @@ RTTaskList::reset_thread_list () void RTTaskList::run () { - Glib::Threads::Mutex::Lock tm (_tasklist_mutex, Glib::Threads::NOT_LOCK); bool wait = true; while (true) { @@ -124,14 +125,7 @@ RTTaskList::run () wait = false; boost::function to_run; - tm.acquire (); - if (!_tasklist.empty ()) { - to_run = _tasklist.front(); - _tasklist.pop_front (); - } - tm.release (); - - if (!to_run.empty ()) { + if (_tasks.pop_front (to_run)) { to_run (); continue; } @@ -145,42 +139,31 @@ RTTaskList::run () } void -RTTaskList::process (TaskList const& tl) +RTTaskList::push_back (boost::function fn) { - Glib::Threads::Mutex::Lock pm (_process_mutex); - -#ifndef NDEBUG - /* must not be called while processing is already running */ - Glib::Threads::Mutex::Lock tm (_tasklist_mutex, Glib::Threads::NOT_LOCK); - tm.acquire (); - assert (_tasklist.empty ()); - tm.release (); -#endif - - _tasklist = tl; - process_tasklist (); - -#ifndef NDEBUG - /* ensure that all tasks are processed, and threads are in wait state */ - tm.acquire (); - assert (_tasklist.empty ()); - tm.release (); -#endif + if (!_tasks.push_back (fn)) { + fn (); + } else { + ++_n_tasks; + } + ++_m_tasks; } void -RTTaskList::process_tasklist () +RTTaskList::process () { if (0 == g_atomic_int_get (&_threads_active) || _threads.size () == 0) { - - for (TaskList::iterator i = _tasklist.begin (); i != _tasklist.end(); ++i) { - (*i)(); + boost::function to_run; + while (_tasks.pop_front (to_run)) { + to_run (); + --_n_tasks; } - _tasklist.clear (); + assert (_n_tasks == 0); + _n_tasks = 0; return; } - uint32_t nt = std::min (_threads.size (), _tasklist.size ()); + uint32_t nt = std::min (_threads.size (), _n_tasks); for (uint32_t i = 0; i < nt; ++i) { _task_run_sem.signal (); @@ -188,4 +171,12 @@ RTTaskList::process_tasklist () for (uint32_t i = 0; i < nt; ++i) { _task_end_sem.wait (); } + + /* re-allocate queue if needed */ + if (_m_tasks >= _queue_size) { + _queue_size = _tasks.power_of_two_size (_m_tasks + 1); + _tasks.reserve (_queue_size); + } + _n_tasks = 0; + _m_tasks = 0; }