re-engineer CrossThreadChannel to use C level API to avoid the mess caused by https://bugzilla.gnome.org/show_bug.cgi?id=561885

This commit is contained in:
Paul Davis 2014-11-26 18:10:47 +02:00
parent a42dd40660
commit a30c10e953
5 changed files with 114 additions and 155 deletions

View file

@ -51,16 +51,12 @@ BaseUI::BaseUI (const string& str)
: m_context(MainContext::get_default())
, run_loop_thread (0)
, _name (str)
#ifndef PLATFORM_WINDOWS
, request_channel (true)
#endif
{
base_ui_instance = this;
#ifndef PLATFORM_WINDOWS
request_channel.ios()->connect (sigc::mem_fun (*this, &BaseUI::request_handler));
#endif
request_channel.set_receive_handler (sigc::mem_fun (*this, &BaseUI::request_handler));
/* derived class must set _ok */
}
@ -172,11 +168,7 @@ void
BaseUI::signal_new_request ()
{
DEBUG_TRACE (DEBUG::EventLoop, "BaseUI::signal_new_request\n");
#ifdef PLATFORM_WINDOWS
// handled in timeout, how to signal...?
#else
request_channel.wakeup ();
#endif
}
/**
@ -186,13 +178,5 @@ void
BaseUI::attach_request_source ()
{
DEBUG_TRACE (DEBUG::EventLoop, "BaseUI::attach_request_source\n");
#ifdef PLATFORM_WINDOWS
GSource* request_source = g_timeout_source_new(200);
g_source_set_callback (request_source, &BaseUI::_request_handler, this, NULL);
g_source_attach (request_source, m_context->gobj());
#else
request_channel.ios()->attach (m_context);
/* glibmm hack - drop the refptr to the IOSource now before it can hurt */
request_channel.drop_ios ();
#endif
request_channel.attach (m_context);
}

View file

@ -17,7 +17,6 @@
*/
#ifndef PLATFORM_WINDOWS
#include <cstdlib>
#include <cerrno>
@ -25,6 +24,11 @@
#include <fcntl.h>
#include <unistd.h>
#ifdef PLATFORM_WINDOWS
#include <winsock2.h>
#include <ws2tcpip.h>
#endif
#include "pbd/error.h"
#include "pbd/crossthread.h"
@ -32,84 +36,29 @@ using namespace std;
using namespace PBD;
using namespace Glib;
CrossThreadChannel::CrossThreadChannel (bool non_blocking)
{
fds[0] = -1;
fds[1] = -1;
if (pipe (fds)) {
error << "cannot create x-thread pipe for read (%2)" << ::strerror (errno) << endmsg;
return;
}
if (non_blocking) {
if (fcntl (fds[0], F_SETFL, O_NONBLOCK)) {
error << "cannot set non-blocking mode for x-thread pipe (read) (" << ::strerror (errno) << ')' << endmsg;
return;
}
if (fcntl (fds[1], F_SETFL, O_NONBLOCK)) {
error << "cannot set non-blocking mode for x-thread pipe (write) (%2)" << ::strerror (errno) << ')' << endmsg;
return;
}
}
}
CrossThreadChannel::~CrossThreadChannel ()
{
/* glibmm hack */
drop_ios ();
if (fds[0] >= 0) {
close (fds[0]);
fds[0] = -1;
}
if (fds[1] >= 0) {
close (fds[1]);
fds[1] = -1;
}
}
void
CrossThreadChannel::wakeup ()
{
char c = 0;
(void) ::write (fds[1], &c, 1);
}
RefPtr<IOSource>
CrossThreadChannel::ios ()
{
if (!_ios) {
_ios = IOSource::create (fds[0], IOCondition(IO_IN|IO_PRI|IO_ERR|IO_HUP|IO_NVAL));
}
return _ios;
}
void
CrossThreadChannel::drop_ios ()
{
_ios.clear ();
}
void
CrossThreadChannel::drain ()
{
char buf[64];
while (::read (fds[0], buf, sizeof (buf)) > 0) {};
}
int
CrossThreadChannel::deliver (char msg)
{
return ::write (fds[1], &msg, 1);
}
int
CrossThreadChannel::receive (char& msg)
{
return ::read (fds[0], &msg, 1);
}
#ifndef PLATFORM_WINDOWS
#include "crossthread.posix.cc"
#else
#include "crossthread.win.cc"
#endif
gboolean
cross_thread_channel_call_receive_slot (GIOChannel*, GIOCondition condition, void *data)
{
CrossThreadChannel* ctc = static_cast<CrossThreadChannel*>(data);
return ctc->receive_slot (Glib::IOCondition (condition));
}
void
CrossThreadChannel::set_receive_handler (sigc::slot<bool,Glib::IOCondition> s)
{
receive_slot = s;
}
void
CrossThreadChannel::attach (Glib::RefPtr<Glib::MainContext> context)
{
receive_source = g_io_create_watch (receive_channel, GIOCondition(G_IO_IN|G_IO_PRI|G_IO_ERR|G_IO_HUP|G_IO_NVAL));
g_source_set_callback (receive_source, (GSourceFunc) cross_thread_channel_call_receive_slot, this, NULL);
g_source_attach (receive_source, context->gobj());
}

View file

@ -0,0 +1,68 @@
CrossThreadChannel::CrossThreadChannel (bool non_blocking)
: receive_channel (0)
{
fds[0] = -1;
fds[1] = -1;
if (pipe (fds)) {
error << "cannot create x-thread pipe for read (%2)" << ::strerror (errno) << endmsg;
return;
}
if (non_blocking) {
if (fcntl (fds[0], F_SETFL, O_NONBLOCK)) {
error << "cannot set non-blocking mode for x-thread pipe (read) (" << ::strerror (errno) << ')' << endmsg;
return;
}
if (fcntl (fds[1], F_SETFL, O_NONBLOCK)) {
error << "cannot set non-blocking mode for x-thread pipe (write) (%2)" << ::strerror (errno) << ')' << endmsg;
return;
}
}
receive_channel = g_io_channel_unix_new (fds[0]);
}
CrossThreadChannel::~CrossThreadChannel ()
{
if (receive_channel) {
g_io_channel_unref (receive_channel);
}
if (fds[0] >= 0) {
close (fds[0]);
fds[0] = -1;
}
if (fds[1] >= 0) {
close (fds[1]);
fds[1] = -1;
}
}
void
CrossThreadChannel::wakeup ()
{
char c = 0;
(void) ::write (fds[1], &c, 1);
}
void
CrossThreadChannel::drain ()
{
char buf[64];
while (::read (fds[0], buf, sizeof (buf)) > 0) {};
}
int
CrossThreadChannel::deliver (char msg)
{
return ::write (fds[1], &msg, 1);
}
int
CrossThreadChannel::receive (char& msg)
{
return ::read (fds[0], &msg, 1);
}

View file

@ -17,31 +17,11 @@
*/
#ifdef PLATFORM_WINDOWS
#include <cstdlib>
#include <cerrno>
#include <cstring>
#include <fcntl.h>
#include <unistd.h>
#include <csignal> // or signal.h if C code
#include <winsock2.h>
#include <ws2tcpip.h>
#include "pbd/error.h"
#include "pbd/crossthread.h"
using namespace std;
using namespace PBD;
using namespace Glib;
CrossThreadChannel::CrossThreadChannel (bool non_blocking)
: _ios()
, _send_socket()
, _receive_socket()
, _p_recv_channel(0)
, receive_channel(0)
{
WSADATA wsaData;
@ -107,14 +87,14 @@ CrossThreadChannel::CrossThreadChannel (bool non_blocking)
}
// construct IOChannel
_p_recv_channel = g_io_channel_win32_new_socket((gint)_receive_socket);
receive_channel = g_io_channel_win32_new_socket((gint)_receive_socket);
int flags = G_IO_FLAG_APPEND;
if (non_blocking) {
flags |= G_IO_FLAG_NONBLOCK;
}
GIOStatus g_status = g_io_channel_set_flags(_p_recv_channel, (GIOFlags)flags,
GIOStatus g_status = g_io_channel_set_flags(receive_channel, (GIOFlags)flags,
NULL);
if (G_IO_STATUS_NORMAL != g_status ) {
@ -127,7 +107,7 @@ CrossThreadChannel::~CrossThreadChannel ()
{
/* glibmm hack */
drop_ios ();
delete _p_recv_channel;
delete receive_channel;
closesocket(_send_socket);
closesocket(_receive_socket);
WSACleanup();
@ -142,22 +122,6 @@ CrossThreadChannel::wakeup ()
sendto(_send_socket, &c, sizeof(c), 0, (SOCKADDR*)&_recv_address, sizeof(_recv_address) );
}
RefPtr<IOSource>
CrossThreadChannel::ios ()
{
if (!_ios) {
_ios = IOSource::create (wrap(_p_recv_channel), IOCondition(IO_IN|IO_PRI|IO_ERR|IO_HUP|IO_NVAL));
}
return _ios;
}
void
CrossThreadChannel::drop_ios ()
{
_ios.reset ();
}
void
CrossThreadChannel::drain ()
{
@ -166,7 +130,7 @@ CrossThreadChannel::drain ()
gchar* buffer;
gsize read = 0;
g_io_channel_read_to_end (_p_recv_channel, &buffer, &read, &g_error);
g_io_channel_read_to_end (receive_channel, &buffer, &read, &g_error);
g_free(buffer);
}
@ -192,7 +156,7 @@ CrossThreadChannel::receive (char& msg)
GError *g_error = 0;
// fetch the message from the channel.
GIOStatus g_status = g_io_channel_read_chars (_p_recv_channel, &msg, sizeof(msg), &read, &g_error);
GIOStatus g_status = g_io_channel_read_chars (receive_channel, &msg, sizeof(msg), &read, &g_error);
if (G_IO_STATUS_NORMAL != g_status) {
read = -1;
@ -200,5 +164,3 @@ CrossThreadChannel::receive (char& msg)
return read;
}
#endif

View file

@ -79,25 +79,21 @@ class LIBPBD_API CrossThreadChannel {
*/
void drain ();
/* glibmm 2.22 and earlier has a terrifying bug that will
cause crashes whenever a Source is removed from
a MainContext (including the destruction of the MainContext),
because the Source is destroyed "out from under the nose of"
the RefPtr. I (Paul) have fixed this (https://bugzilla.gnome.org/show_bug.cgi?id=561885)
but in the meantime, we need a hack to get around the issue.
*/
Glib::RefPtr<Glib::IOSource> ios();
void set_receive_handler (sigc::slot<bool,Glib::IOCondition> s);
void attach (Glib::RefPtr<Glib::MainContext>);
void drop_ios ();
private:
Glib::RefPtr<Glib::IOSource> _ios; // lazily constructed
friend gboolean cross_thread_channel_call_receive_slot (GIOChannel*, GIOCondition condition, void *data);
GIOChannel* receive_channel;
GSource* receive_source;
sigc::slot<bool,Glib::IOCondition> receive_slot;
#ifndef PLATFORM_WINDOWS
int fds[2]; // current implementation uses a pipe/fifo
#else
SOCKET _send_socket;
SOCKET _receive_socket;
GIOChannel* _p_recv_channel;
struct sockaddr_in _recv_address;
#endif