From 5b9e4fff634e1d5052cf78375f1387bfceca4cbd Mon Sep 17 00:00:00 2001 From: Robin Gareus Date: Tue, 30 Apr 2024 03:46:39 +0200 Subject: [PATCH] Parallelize Disk I/O and RegionFx processing --- libs/ardour/ardour/debug.h | 1 + libs/ardour/ardour/disk_reader.h | 6 +- libs/ardour/ardour/io_tasklist.h | 18 ++- libs/ardour/ardour/rc_configuration_vars.h | 1 + libs/ardour/ardour/utils.h | 1 + libs/ardour/butler.cc | 5 + libs/ardour/debug.cc | 1 + libs/ardour/disk_reader.cc | 6 +- libs/ardour/globals.cc | 9 +- libs/ardour/io_tasklist.cc | 126 ++++++++++++++++++++- libs/ardour/session.cc | 5 +- libs/ardour/session_state.cc | 1 - libs/ardour/utils.cc | 19 ++++ libs/pbd/pbd/pthread_utils.h | 3 +- libs/pbd/pthread_utils.cc | 7 ++ 15 files changed, 189 insertions(+), 20 deletions(-) diff --git a/libs/ardour/ardour/debug.h b/libs/ardour/ardour/debug.h index db63e17756..18aaa74ae7 100644 --- a/libs/ardour/ardour/debug.h +++ b/libs/ardour/ardour/debug.h @@ -58,6 +58,7 @@ namespace PBD { LIBARDOUR_API extern DebugBits FaderPort; LIBARDOUR_API extern DebugBits GenericMidi; LIBARDOUR_API extern DebugBits Graph; + LIBARDOUR_API extern DebugBits IOTaskList; LIBARDOUR_API extern DebugBits LTC; LIBARDOUR_API extern DebugBits LV2; LIBARDOUR_API extern DebugBits LV2Automate; diff --git a/libs/ardour/ardour/disk_reader.h b/libs/ardour/ardour/disk_reader.h index ed240c744a..63797e8685 100644 --- a/libs/ardour/ardour/disk_reader.h +++ b/libs/ardour/ardour/disk_reader.h @@ -229,9 +229,9 @@ private: int channel, bool reversed); - static Sample* _sum_buffer; - static Sample* _mixdown_buffer; - static gain_t* _gain_buffer; + static thread_local Sample* _sum_buffer; + static thread_local Sample* _mixdown_buffer; + static thread_local gain_t* _gain_buffer; int refill (Sample* sum_buffer, Sample* mixdown_buffer, float* gain_buffer, samplecnt_t fill_level, bool reversed); int refill_audio (Sample* sum_buffer, Sample* mixdown_buffer, float* gain_buffer, samplecnt_t fill_level, bool reversed); diff --git a/libs/ardour/ardour/io_tasklist.h b/libs/ardour/ardour/io_tasklist.h index daac29fdb0..7758b6c340 100644 --- a/libs/ardour/ardour/io_tasklist.h +++ b/libs/ardour/ardour/io_tasklist.h @@ -19,8 +19,12 @@ #ifndef _ardour_io_tasklist_h_ #define _ardour_io_tasklist_h_ +#include #include #include +#include + +#include "pbd/semutils.h" #include "ardour/libardour_visibility.h" @@ -30,7 +34,7 @@ namespace ARDOUR class LIBARDOUR_API IOTaskList { public: - IOTaskList (); + IOTaskList (uint32_t); ~IOTaskList (); /** process tasks in list in parallel, wait for them to complete */ @@ -38,9 +42,19 @@ public: void push_back (boost::function fn); private: + static void* _worker_thread (void*); + + void io_thread (); + std::vector> _tasks; - size_t _n_threads; + uint32_t _n_threads; + std::atomic _n_workers; + std::vector _workers; + std::atomic _terminate; + PBD::Semaphore _exec_sem; + PBD::Semaphore _idle_sem; + Glib::Threads::Mutex _tasks_mutex; }; } // namespace ARDOUR diff --git a/libs/ardour/ardour/rc_configuration_vars.h b/libs/ardour/ardour/rc_configuration_vars.h index d6f35e1807..9b3d4f6809 100644 --- a/libs/ardour/ardour/rc_configuration_vars.h +++ b/libs/ardour/ardour/rc_configuration_vars.h @@ -221,6 +221,7 @@ CONFIG_VARIABLE (std::string, sample_lib_path, "sample-lib-path", "") /* custom CONFIG_VARIABLE (bool, allow_special_bus_removal, "allow-special-bus-removal", false) CONFIG_VARIABLE (int32_t, processor_usage, "processor-usage", -1) CONFIG_VARIABLE (int32_t, cpu_dma_latency, "cpu-dma-latency", -1) /* >=0 to enable */ +CONFIG_VARIABLE (int32_t, io_thread_count, "io-thread-count", -2) CONFIG_VARIABLE (gain_t, max_gain, "max-gain", 2.0) /* +6.0dB */ CONFIG_VARIABLE (uint32_t, max_recent_sessions, "max-recent-sessions", 10) CONFIG_VARIABLE (uint32_t, max_recent_templates, "max-recent-templates", 10) diff --git a/libs/ardour/ardour/utils.h b/libs/ardour/ardour/utils.h index 0030ddead8..22a99c758f 100644 --- a/libs/ardour/ardour/utils.h +++ b/libs/ardour/ardour/utils.h @@ -109,6 +109,7 @@ LIBARDOUR_API const char* native_header_format_extension (ARDOUR::HeaderFormat, LIBARDOUR_API bool matching_unsuffixed_filename_exists_in (const std::string& dir, const std::string& name); LIBARDOUR_API uint32_t how_many_dsp_threads (); +LIBARDOUR_API uint32_t how_many_io_threads (); LIBARDOUR_API std::string compute_sha1_of_file (std::string path); diff --git a/libs/ardour/butler.cc b/libs/ardour/butler.cc index 039d1aef00..6c954e63b4 100644 --- a/libs/ardour/butler.cc +++ b/libs/ardour/butler.cc @@ -172,9 +172,14 @@ Butler::_thread_work (void* arg) { SessionEvent::create_per_thread_pool ("butler events", 4096); pthread_set_name (X_("butler")); + /* get thread buffers for RegionFx */ ARDOUR::ProcessThread* pt = new ProcessThread (); pt->get_buffers (); + DiskReader::allocate_working_buffers (); + void* rv = ((Butler*)arg)->thread_work (); + + DiskReader::free_working_buffers (); pt->drop_buffers (); delete pt; return rv; diff --git a/libs/ardour/debug.cc b/libs/ardour/debug.cc index e061c2840d..99a8dbce20 100644 --- a/libs/ardour/debug.cc +++ b/libs/ardour/debug.cc @@ -53,6 +53,7 @@ PBD::DebugBits PBD::DEBUG::FaderPort = PBD::new_debug_bit ("faderport"); PBD::DebugBits PBD::DEBUG::FaderPort8 = PBD::new_debug_bit ("faderport8"); PBD::DebugBits PBD::DEBUG::GenericMidi = PBD::new_debug_bit ("genericmidi"); PBD::DebugBits PBD::DEBUG::Graph = PBD::new_debug_bit ("graph"); +PBD::DebugBits PBD::DEBUG::IOTaskList = PBD::new_debug_bit ("iotasklist"); PBD::DebugBits PBD::DEBUG::LTC = PBD::new_debug_bit ("ltc"); PBD::DebugBits PBD::DEBUG::LV2 = PBD::new_debug_bit ("lv2"); PBD::DebugBits PBD::DEBUG::LV2Automate = PBD::new_debug_bit ("lv2automate"); diff --git a/libs/ardour/disk_reader.cc b/libs/ardour/disk_reader.cc index d672bf3da4..bf040cf8b3 100644 --- a/libs/ardour/disk_reader.cc +++ b/libs/ardour/disk_reader.cc @@ -49,9 +49,9 @@ using namespace std; ARDOUR::samplecnt_t DiskReader::_chunk_samples = default_chunk_samples (); PBD::Signal0 DiskReader::Underrun; -Sample* DiskReader::_sum_buffer = 0; -Sample* DiskReader::_mixdown_buffer = 0; -gain_t* DiskReader::_gain_buffer = 0; +thread_local Sample* DiskReader::_sum_buffer = 0; +thread_local Sample* DiskReader::_mixdown_buffer = 0; +thread_local gain_t* DiskReader::_gain_buffer = 0; std::atomic DiskReader::_no_disk_output (0); DiskReader::Declicker DiskReader::loop_declick_in; DiskReader::Declicker DiskReader::loop_declick_out; diff --git a/libs/ardour/globals.cc b/libs/ardour/globals.cc index 49afbe5cf6..3b8c4889aa 100644 --- a/libs/ardour/globals.cc +++ b/libs/ardour/globals.cc @@ -734,11 +734,14 @@ ARDOUR::init (bool try_optimization, const char* localedir, bool with_gui) * each cycle). Session Export uses one, and the GUI requires * buffers (for plugin-analysis, auditioner updates) but not * concurrently. - * Last but not least, the butler needs one for RegionFX. * - * In theory (hw + 4) should be sufficient, let's add one for luck. + * Last but not least, the butler needs one for RegionFX for + * each I/O thread (up to hardware_concurrency) and one for itself + * (butler's main thread). + * + * In theory (2 * hw + 4) should be sufficient, let's add one for luck. */ - BufferManager::init (hardware_concurrency () + 5); + BufferManager::init (hardware_concurrency () * 2 + 5); PannerManager::instance ().discover_panners (); diff --git a/libs/ardour/io_tasklist.cc b/libs/ardour/io_tasklist.cc index 8af4a7f4e2..ae86e9a640 100644 --- a/libs/ardour/io_tasklist.cc +++ b/libs/ardour/io_tasklist.cc @@ -16,19 +16,72 @@ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ +#ifdef HAVE_IOPRIO +#include +#endif + +#include "pbd/compose.h" +#include "pbd/cpus.h" +#include "pbd/debug.h" +#include "pbd/failed_constructor.h" #include "pbd/pthread_utils.h" +#include "temporal/tempo.h" + +#include "ardour/debug.h" +#include "ardour/disk_reader.h" #include "ardour/io_tasklist.h" +#include "ardour/process_thread.h" +#include "ardour/session_event.h" using namespace ARDOUR; -IOTaskList::IOTaskList () - : _n_threads (0) +IOTaskList::IOTaskList (uint32_t n_threads) + : _n_threads (n_threads) + , _terminate (false) + , _exec_sem ("io thread exec", 0) + , _idle_sem ("io thread idle", 0) { + assert (n_threads <= hardware_concurrency ()); + + if (n_threads < 2) { + return; + } + + pthread_attr_t attr; + struct sched_param parm; + parm.sched_priority = pbd_absolute_rt_priority (SCHED_RR, pbd_pthread_priority (THREAD_IO)); + + pthread_attr_init (&attr); +#ifdef PLATFORM_WINDOWS + pthread_attr_setschedpolicy (&attr, SCHED_OTHER); +#else + pthread_attr_setschedpolicy (&attr, SCHED_RR); +#endif + pthread_attr_setschedparam (&attr, &parm); + pthread_attr_setscope (&attr, PTHREAD_SCOPE_SYSTEM); + pthread_attr_setinheritsched (&attr, PTHREAD_EXPLICIT_SCHED); + + DEBUG_TRACE (PBD::DEBUG::IOTaskList, string_compose ("IOTaskList starting %1 threads with priority = %2\n", _n_threads, parm.sched_priority)); + + _workers.resize (_n_threads); + for (uint32_t i = 0; i < _n_threads; ++i) { + if (pthread_create (&_workers[i], &attr, &_worker_thread, this)) { + throw failed_constructor (); + } + } + pthread_attr_destroy (&attr); } IOTaskList::~IOTaskList () { + _terminate.store (true); + for (size_t i = 0; i < _workers.size (); ++i) { + _exec_sem.signal (); + } + for (auto const& t : _workers) { + pthread_join (t, NULL); + } } void @@ -41,13 +94,78 @@ void IOTaskList::process () { assert (strcmp (pthread_name (), "butler") == 0); - //std::cout << "IOTaskList::process " << pthread_name () << " " << _tasks.size () << "\n"; if (_n_threads > 1 && _tasks.size () > 2) { - // TODO + uint32_t wakeup = std::min (_n_threads, _tasks.size ()); + DEBUG_TRACE (PBD::DEBUG::IOTaskList, string_compose ("IOTaskList process wakeup %1 thread for %2 tasks.\n", wakeup, _tasks.size ())) + for (uint32_t i = 0; i < wakeup; ++i) { + _exec_sem.signal (); + } + for (uint32_t i = 0; i < wakeup; ++i) { + _idle_sem.wait (); + } } else { + DEBUG_TRACE (PBD::DEBUG::IOTaskList, string_compose ("IOTaskList process %1 task(s) in main thread.\n", _tasks.size ())) for (auto const& fn : _tasks) { fn (); } } _tasks.clear (); } + +void* +IOTaskList::_worker_thread (void* me) +{ + IOTaskList* self = static_cast (me); + + uint32_t id = self->_n_workers.fetch_add (1); + char name[64]; + snprintf (name, 64, "IO-%u-%p", id, (void*)DEBUG_THREAD_SELF); + pthread_set_name (name); + + SessionEvent::create_per_thread_pool (name, 64); + PBD::notify_event_loops_about_thread_creation (pthread_self (), name, 64); + + DiskReader::allocate_working_buffers (); + ARDOUR::ProcessThread* pt = new ProcessThread (); + pt->get_buffers (); + +#ifdef HAVE_IOPRIO + /* compare to Butler::_thread_work */ + // ioprio_set (IOPRIO_WHO_PROCESS, 0 /*calling thread*/, IOPRIO_PRIO_VALUE (IOPRIO_CLASS_RT, 4)) + syscall (SYS_ioprio_set, 1, 0, (1 << 13) | 4); +#endif + + self->io_thread (); + + DiskReader::free_working_buffers (); + pt->drop_buffers (); + delete pt; + return 0; +} + +void +IOTaskList::io_thread () +{ + while (1) { + _exec_sem.wait (); + if (_terminate.load ()) { + break; + } + + Temporal::TempoMap::fetch (); + + while (1) { + boost::function fn; + Glib::Threads::Mutex::Lock lm (_tasks_mutex); + if (_tasks.empty ()) { + break; + } + fn = _tasks.back (); + _tasks.pop_back (); + lm.release (); + + fn (); + } + _idle_sem.signal (); + } +} diff --git a/libs/ardour/session.cc b/libs/ardour/session.cc index af1438cbcf..8dab2704e3 100644 --- a/libs/ardour/session.cc +++ b/libs/ardour/session.cc @@ -590,7 +590,8 @@ Session::immediately_post_engine () _process_graph.reset (new Graph (*this)); _rt_tasklist.reset (new RTTaskList (_process_graph)); - _io_tasklist.reset (new IOTaskList ()); + + _io_tasklist.reset (new IOTaskList (how_many_io_threads ())); /* every time we reconnect, recompute worst case output latencies */ @@ -775,8 +776,6 @@ Session::destroy () _bundles.flush (); _io_plugins.flush (); - DiskReader::free_working_buffers(); - /* tell everyone who is still standing that we're about to die */ drop_references (); diff --git a/libs/ardour/session_state.cc b/libs/ardour/session_state.cc index a74ec00194..9f0bcd9ddf 100644 --- a/libs/ardour/session_state.cc +++ b/libs/ardour/session_state.cc @@ -272,7 +272,6 @@ Session::post_engine_init () _engine.GraphReordered.connect_same_thread (*this, boost::bind (&Session::graph_reordered, this, true)); _engine.MidiSelectionPortsChanged.connect_same_thread (*this, boost::bind (&Session::rewire_midi_selection_ports, this)); - DiskReader::allocate_working_buffers(); refresh_disk_space (); /* we're finally ready to call set_state() ... all objects have diff --git a/libs/ardour/utils.cc b/libs/ardour/utils.cc index 9e06652fc4..25b31f4984 100644 --- a/libs/ardour/utils.cc +++ b/libs/ardour/utils.cc @@ -696,6 +696,25 @@ ARDOUR::how_many_dsp_threads () return num_threads; } +uint32_t +ARDOUR::how_many_io_threads () +{ + int num_cpu = hardware_concurrency(); + int pu = Config->get_io_thread_count (); + uint32_t num_threads = max (num_cpu - 2, 2); + if (pu < 0) { + if (-pu < num_cpu) { + num_threads = num_cpu + pu; + } + } else if (pu == 0) { + num_threads = num_cpu; + + } else { + num_threads = min (num_cpu, pu); + } + return num_threads; +} + double ARDOUR::gain_to_slider_position_with_max (double g, double max_gain) { diff --git a/libs/pbd/pbd/pthread_utils.h b/libs/pbd/pbd/pthread_utils.h index ed6628123b..e1b18f2603 100644 --- a/libs/pbd/pbd/pthread_utils.h +++ b/libs/pbd/pbd/pthread_utils.h @@ -68,7 +68,8 @@ LIBPBD_API void pthread_set_name (const char* name); enum PBDThreadClass { THREAD_MAIN, // main audio I/O thread THREAD_MIDI, // MIDI I/O threads - THREAD_PROC // realtime worker + THREAD_PROC, // realtime worker + THREAD_IO // non-realtime I/O }; LIBPBD_API int pbd_pthread_priority (PBDThreadClass); diff --git a/libs/pbd/pthread_utils.cc b/libs/pbd/pthread_utils.cc index 2387bd947f..052d035535 100644 --- a/libs/pbd/pthread_utils.cc +++ b/libs/pbd/pthread_utils.cc @@ -280,6 +280,11 @@ pbd_pthread_priority (PBDThreadClass which) default: case THREAD_PROC: return -2; + case THREAD_IO: + /* https://github.com/mingw-w64/mingw-w64/blob/master/mingw-w64-libraries/winpthreads/src/sched.c + * -> THREAD_PRIORITY_HIGHEST + */ + return -13; } #else int base = -20; @@ -299,6 +304,8 @@ pbd_pthread_priority (PBDThreadClass which) default: case THREAD_PROC: return base - 2; + case THREAD_IO: + return base - 10; } #endif }