delegate signal emission to dedicated thread.

This works around abysmal performance (~.15ms) of
boost::function and boost::bind (in PBD::Signal).

The overall load is probably higher but the realtime
thread remains unaffected.
This commit is contained in:
Robin Gareus 2015-04-28 22:18:30 +02:00
parent 1df7b4ffb6
commit f410705c3a
7 changed files with 174 additions and 14 deletions

View file

@ -72,6 +72,7 @@ public:
*/
void reflect_inputs (const ChanCount& in);
void emit_configuration_changed ();
/** Compute peaks */
void run (BufferSet& bufs, framepos_t start_frame, framepos_t end_frame, pframes_t nframes, bool);

View file

@ -182,7 +182,8 @@ class LIBARDOUR_API Route : public SessionObject, public Automatable, public Rou
bool denormal_protection() const;
void set_meter_point (MeterPoint, bool force = false);
void apply_processor_changes_rt ();
bool apply_processor_changes_rt ();
void emit_pending_signals ();
MeterPoint meter_point() const { return _pending_meter_point; }
void meter ();
@ -522,8 +523,16 @@ class LIBARDOUR_API Route : public SessionObject, public Automatable, public Rou
boost::shared_ptr<MonitorProcessor> _monitor_control;
boost::shared_ptr<Pannable> _pannable;
enum {
EmitNone = 0x00,
EmitMeterChanged = 0x01,
EmitMeterVisibilityChange = 0x02,
EmitRtProcessorChange = 0x04
};
ProcessorList _pending_processor_order;
gint _pending_process_reorder; // atomic
gint _pending_signals; // atomic
Flag _flags;
int _pending_declick;
@ -604,7 +613,7 @@ class LIBARDOUR_API Route : public SessionObject, public Automatable, public Rou
bool _initial_io_setup;
int configure_processors_unlocked (ProcessorStreams*);
void set_meter_point_unlocked ();
bool set_meter_point_unlocked ();
void apply_processor_order (const ProcessorList& new_order);
std::list<std::pair<ChanCount, ChanCount> > try_configure_processors (ChanCount, ProcessorStreams *);

View file

@ -1267,6 +1267,21 @@ class LIBARDOUR_API Session : public PBD::StatefulDestructible, public PBD::Scop
void *do_work();
/* Signal Forwarding */
void emit_route_signals () const;
void emit_thread_run ();
static void *emit_thread (void *);
void emit_thread_start ();
void emit_thread_terminate ();
pthread_t _rt_emit_thread;
bool _rt_thread_active;
pthread_mutex_t _rt_emit_mutex;
pthread_cond_t _rt_emit_cond;
bool _rt_emit_pending;
/* SessionEventManager interface */
void process_event (SessionEvent*);

View file

@ -209,7 +209,12 @@ PeakMeter::reflect_inputs (const ChanCount& in)
current_meters = in;
reset_max();
ConfigurationChanged (in, in); /* EMIT SIGNAL */
// ConfigurationChanged() postponed
}
void
PeakMeter::emit_configuration_changed () {
ConfigurationChanged (current_meters, current_meters); /* EMIT SIGNAL */
}
void

View file

@ -3382,15 +3382,21 @@ Route::flush_processors ()
#ifdef __clang__
__attribute__((annotate("realtime")))
#endif
void
bool
Route::apply_processor_changes_rt ()
{
int emissions = EmitNone;
if (_pending_meter_point != _meter_point) {
Glib::Threads::RWLock::WriterLock pwl (_processor_lock, Glib::Threads::TRY_LOCK);
if (pwl.locked()) {
/* meters always have buffers for 'processor_max_streams'
* they can be re-positioned without re-allocation */
set_meter_point_unlocked();
if (set_meter_point_unlocked()) {
emissions |= EmitMeterChanged | EmitMeterVisibilityChange;;
} else {
emissions |= EmitMeterChanged;
}
}
}
@ -3401,15 +3407,38 @@ Route::apply_processor_changes_rt ()
if (pwl.locked()) {
apply_processor_order (_pending_processor_order);
setup_invisible_processors ();
changed = true;
g_atomic_int_set (&_pending_process_reorder, 0);
emissions |= EmitRtProcessorChange;
}
}
if (changed) {
processors_changed (RouteProcessorChange (RouteProcessorChange::RealTimeChange)); /* EMIT SIGNAL */
set_processor_positions ();
}
if (emissions != 0) {
g_atomic_int_set (&_pending_signals, emissions);
return true;
}
return false;
}
void
Route::emit_pending_signals ()
{
int sig = g_atomic_int_and (&_pending_signals, 0);
if (sig & EmitMeterChanged) {
_meter->emit_configuration_changed();
meter_change (); /* EMIT SIGNAL */
if (sig & EmitMeterVisibilityChange) {
processors_changed (RouteProcessorChange (RouteProcessorChange::MeterPointChange, true)); /* EMIT SIGNAL */
} else {
processors_changed (RouteProcessorChange (RouteProcessorChange::MeterPointChange, false)); /* EMIT SIGNAL */
}
}
if (sig & EmitRtProcessorChange) {
processors_changed (RouteProcessorChange (RouteProcessorChange::RealTimeChange)); /* EMIT SIGNAL */
}
}
void
@ -3423,7 +3452,13 @@ Route::set_meter_point (MeterPoint p, bool force)
Glib::Threads::Mutex::Lock lx (AudioEngine::instance()->process_lock ());
Glib::Threads::RWLock::WriterLock lm (_processor_lock);
_pending_meter_point = p;
set_meter_point_unlocked();
_meter->emit_configuration_changed();
meter_change (); /* EMIT SIGNAL */
if (set_meter_point_unlocked()) {
processors_changed (RouteProcessorChange (RouteProcessorChange::MeterPointChange, true)); /* EMIT SIGNAL */
} else {
processors_changed (RouteProcessorChange (RouteProcessorChange::MeterPointChange, false)); /* EMIT SIGNAL */
}
} else {
_pending_meter_point = p;
}
@ -3433,7 +3468,7 @@ Route::set_meter_point (MeterPoint p, bool force)
#ifdef __clang__
__attribute__((annotate("realtime")))
#endif
void
bool
Route::set_meter_point_unlocked ()
{
#ifndef NDEBUG
@ -3502,9 +3537,7 @@ Route::set_meter_point_unlocked ()
* but all those signals are subscribed to with gui_thread()
* so we're safe.
*/
meter_change (); /* EMIT SIGNAL */
bool const meter_visibly_changed = (_meter->display_to_user() != meter_was_visible_to_user);
processors_changed (RouteProcessorChange (RouteProcessorChange::MeterPointChange, meter_visibly_changed)); /* EMIT SIGNAL */
return (_meter->display_to_user() != meter_was_visible_to_user);
}
void

View file

@ -212,6 +212,8 @@ Session::Session (AudioEngine &eng,
, rf_scale (1.0)
, _locations (new Locations (*this))
, _ignore_skips_updates (false)
, _rt_thread_active (false)
, _rt_emit_pending (false)
, step_speed (0)
, outbound_mtc_timecode_frame (0)
, next_quarter_frame_to_send (-1)
@ -275,6 +277,9 @@ Session::Session (AudioEngine &eng,
{
uint32_t sr = 0;
pthread_mutex_init (&_rt_emit_mutex, 0);
pthread_cond_init (&_rt_emit_cond, 0);
pre_engine_init (fullpath);
if (_is_new) {
@ -353,6 +358,8 @@ Session::Session (AudioEngine &eng,
_is_new = false;
emit_thread_start ();
/* hook us up to the engine since we are now completely constructed */
BootMessage (_("Connect to engine"));
@ -570,6 +577,11 @@ Session::destroy ()
/* not strictly necessary, but doing it here allows the shared_ptr debugging to work */
playlists.reset ();
emit_thread_terminate ();
pthread_cond_destroy (&_rt_emit_cond);
pthread_mutex_destroy (&_rt_emit_mutex);
delete _scene_changer; _scene_changer = 0;
delete midi_control_ui; midi_control_ui = 0;

View file

@ -75,7 +75,7 @@ Session::process (pframes_t nframes)
(this->*process_function) (nframes);
/* realtime-safe meter-position changes
/* realtime-safe meter-position and processor-order changes
*
* ideally this would be done in
* Route::process_output_buffers() but various functions
@ -83,7 +83,19 @@ Session::process (pframes_t nframes)
*/
boost::shared_ptr<RouteList> r = routes.reader ();
for (RouteList::const_iterator i = r->begin(); i != r->end(); ++i) {
(*i)->apply_processor_changes_rt();
if ((*i)->apply_processor_changes_rt()) {
_rt_emit_pending = true;
}
}
if (_rt_emit_pending) {
if (!_rt_thread_active) {
emit_route_signals ();
}
if (pthread_mutex_trylock (&_rt_emit_mutex) == 0) {
pthread_cond_signal (&_rt_emit_cond);
pthread_mutex_unlock (&_rt_emit_mutex);
_rt_emit_pending = false;
}
}
_engine.main_thread()->drop_buffers ();
@ -1226,3 +1238,76 @@ Session::compute_stop_limit () const
return current_end_frame ();
}
/* dedicated thread for signal emission.
*
* while sending cross-thread signals from the process thread
* is fine in general, PBD::Signal's use of boost::function and
* boost:bind can produce a vast overhead which is not
* acceptable for low latency.
*
* This works around the issue by moving the boost overhead
* out of the RT thread. The overall load is probably higher but
* the realtime thread remains unaffected.
*/
void
Session::emit_route_signals () const
{
boost::shared_ptr<RouteList> r = routes.reader ();
for (RouteList::const_iterator ci = r->begin(); ci != r->end(); ++ci) {
(*ci)->emit_pending_signals ();
}
}
void
Session::emit_thread_start ()
{
if (_rt_thread_active) {
return;
}
_rt_thread_active = true;
if (pthread_create (&_rt_emit_thread, NULL, emit_thread, this)) {
_rt_thread_active = false;
}
}
void
Session::emit_thread_terminate ()
{
if (!_rt_thread_active) {
return;
}
_rt_thread_active = false;
if (pthread_mutex_lock (&_rt_emit_mutex) == 0) {
pthread_cond_signal (&_rt_emit_cond);
pthread_mutex_unlock (&_rt_emit_mutex);
}
void *status;
pthread_join (_rt_emit_thread, &status);
}
void *
Session::emit_thread (void *arg)
{
Session *s = static_cast<Session *>(arg);
s->emit_thread_run ();
pthread_exit (0);
return 0;
}
void
Session::emit_thread_run ()
{
pthread_mutex_lock (&_rt_emit_mutex);
while (_rt_thread_active) {
emit_route_signals();
pthread_cond_wait (&_rt_emit_cond, &_rt_emit_mutex);
}
pthread_mutex_unlock (&_rt_emit_mutex);
}