bus: introduce concept of a default bus for each thread and make use of it everywhere

We want to emphasize bus connections as per-thread communication
primitives, hence introduce a concept of a per-thread default bus, and
make use of it everywhere.
This commit is contained in:
Lennart Poettering 2013-11-11 22:00:48 +01:00
parent afc6adb5ec
commit 76b543756e
14 changed files with 94 additions and 20 deletions

View File

@ -579,7 +579,7 @@ static int connect_bus(Context *c, sd_event *event, sd_bus **_bus) {
assert(event);
assert(_bus);
r = sd_bus_open_system(&bus);
r = sd_bus_default_system(&bus);
if (r < 0) {
log_error("Failed to get system bus connection: %s", strerror(-r));
return r;

View File

@ -746,7 +746,7 @@ static int get_virtualization(char **v) {
char *b;
int r;
r = sd_bus_open_system(&bus);
r = sd_bus_default_system(&bus);
if (r < 0)
return r;

View File

@ -249,6 +249,9 @@ struct sd_bus {
sd_event *event;
sd_bus_message *current;
sd_bus **default_bus_ptr;
pid_t tid;
};
#define BUS_DEFAULT_TIMEOUT ((usec_t) (25 * USEC_PER_SEC))

View File

@ -944,9 +944,9 @@ int bus_open_transport(BusTransport transport, const char *host, bool user, sd_b
case BUS_TRANSPORT_LOCAL:
if (user)
r = sd_bus_open_user(bus);
r = sd_bus_default_user(bus);
else
r = sd_bus_open_system(bus);
r = sd_bus_default_system(bus);
break;

View File

@ -12,6 +12,8 @@ global:
/* Same order as in sd-bus.h should be used */
/* Connections */
sd_bus_default_user;
sd_bus_default_system;
sd_bus_open_user;
sd_bus_open_system;
sd_bus_open_system_remote;
@ -49,6 +51,7 @@ global:
sd_bus_wait;
sd_bus_flush;
sd_bus_get_current;
sd_bus_get_tid;
sd_bus_attach_event;
sd_bus_detach_event;
sd_bus_add_filter;

View File

@ -2370,16 +2370,21 @@ _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
int r;
assert_return(bus, -EINVAL);
assert_return(event, -EINVAL);
assert_return(!bus->event, -EBUSY);
assert(!bus->input_io_event_source);
assert(!bus->output_io_event_source);
assert(!bus->time_event_source);
bus->event = sd_event_ref(event);
if (event)
bus->event = sd_event_ref(event);
else {
r = sd_event_default(&bus->event);
if (r < 0)
return r;
}
r = sd_event_add_io(event, bus->input_fd, 0, io_callback, bus, &bus->input_io_event_source);
r = sd_event_add_io(bus->event, bus->input_fd, 0, io_callback, bus, &bus->input_io_event_source);
if (r < 0)
goto fail;
@ -2388,7 +2393,7 @@ _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
goto fail;
if (bus->output_fd != bus->input_fd) {
r = sd_event_add_io(event, bus->output_fd, 0, io_callback, bus, &bus->output_io_event_source);
r = sd_event_add_io(bus->event, bus->output_fd, 0, io_callback, bus, &bus->output_io_event_source);
if (r < 0)
goto fail;
@ -2401,7 +2406,7 @@ _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
if (r < 0)
goto fail;
r = sd_event_add_monotonic(event, 0, 0, time_callback, bus, &bus->time_event_source);
r = sd_event_add_monotonic(bus->event, 0, 0, time_callback, bus, &bus->time_event_source);
if (r < 0)
goto fail;
@ -2409,7 +2414,7 @@ _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
if (r < 0)
goto fail;
r = sd_event_add_quit(event, quit_callback, bus, &bus->quit_event_source);
r = sd_event_add_quit(bus->event, quit_callback, bus, &bus->quit_event_source);
if (r < 0)
goto fail;
@ -2442,8 +2447,63 @@ _public_ int sd_bus_detach_event(sd_bus *bus) {
return 0;
}
sd_bus_message* sd_bus_get_current(sd_bus *bus) {
_public_ sd_bus_message* sd_bus_get_current(sd_bus *bus) {
assert_return(bus, NULL);
return bus->current;
}
static int bus_default(int (*bus_open)(sd_bus **), sd_bus **default_bus, sd_bus **ret) {
sd_bus *b = NULL;
int r;
assert(bus_open);
assert(default_bus);
if (!ret)
return !!*default_bus;
if (*default_bus) {
*ret = sd_bus_ref(*default_bus);
return 0;
}
r = bus_open(&b);
if (r < 0)
return r;
b->default_bus_ptr = default_bus;
b->tid = gettid();
*default_bus = b;
*ret = b;
return 1;
}
_public_ int sd_bus_default_system(sd_bus **ret) {
static __thread sd_bus *default_system_bus = NULL;
return bus_default(sd_bus_open_system, &default_system_bus, ret);
}
_public_ int sd_bus_default_user(sd_bus **ret) {
static __thread sd_bus *default_user_bus = NULL;
return bus_default(sd_bus_open_user, &default_user_bus, ret);
}
_public_ int sd_bus_get_tid(sd_bus *b, pid_t *tid) {
assert_return(b, -EINVAL);
assert_return(tid, -EINVAL);
assert_return(!bus_pid_changed(b), -ECHILD);
if (b->tid != 0) {
*tid = b->tid;
return 0;
}
if (b->event)
return sd_event_get_tid(b->event, tid);
return -ENXIO;
}

View File

@ -1906,8 +1906,12 @@ _public_ int sd_event_default(sd_event **ret) {
_public_ int sd_event_get_tid(sd_event *e, pid_t *tid) {
assert_return(e, -EINVAL);
assert_return(tid, -EINVAL);
assert_return(e->tid != 0, -ENXIO);
assert_return(!event_pid_changed(e), -ECHILD);
*tid = e->tid;
return 0;
if (e->tid != 0) {
*tid = e->tid;
return 0;
}
return -ENXIO;
}

View File

@ -1061,7 +1061,7 @@ static int connect_bus(Context *c, sd_event *event, sd_bus **_bus) {
assert(event);
assert(_bus);
r = sd_bus_open_system(&bus);
r = sd_bus_default_system(&bus);
if (r < 0) {
log_error("Failed to get system bus connection: %s", strerror(-r));
return r;

View File

@ -233,7 +233,7 @@ int main(int argc, char *argv[]) {
if (r == 0)
return EXIT_SUCCESS;
r = sd_bus_open_system(&bus);
r = sd_bus_default_system(&bus);
if (r < 0) {
log_error("Failed to connect to bus: %s", strerror(-r));
return EXIT_FAILURE;

View File

@ -605,7 +605,7 @@ static int manager_connect_bus(Manager *m) {
assert(m);
assert(!m->bus);
r = sd_bus_open_system(&m->bus);
r = sd_bus_default_system(&m->bus);
if (r < 0) {
log_error("Failed to connect to system bus: %s", strerror(-r));
return r;

View File

@ -129,7 +129,7 @@ static int manager_connect_bus(Manager *m) {
assert(m);
assert(!m->bus);
r = sd_bus_open_system(&m->bus);
r = sd_bus_default_system(&m->bus);
if (r < 0) {
log_error("Failed to connect to system bus: %s", strerror(-r));
return r;

View File

@ -965,7 +965,7 @@ static int terminate_machine(pid_t pid) {
const char *path;
int r;
r = sd_bus_open_system(&bus);
r = sd_bus_default_system(&bus);
if (r < 0) {
log_error("Failed to open system bus: %s", strerror(-r));
return r;

View File

@ -57,6 +57,9 @@ typedef int (*sd_bus_node_enumerator_t) (sd_bus *bus, const char *path, char ***
/* Connections */
int sd_bus_default_user(sd_bus **ret);
int sd_bus_default_system(sd_bus **ret);
int sd_bus_open_user(sd_bus **ret);
int sd_bus_open_system(sd_bus **ret);
int sd_bus_open_system_remote(const char *host, sd_bus **ret);
@ -101,6 +104,7 @@ int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec);
int sd_bus_flush(sd_bus *bus);
sd_bus_message* sd_bus_get_current(sd_bus *bus);
int sd_bus_get_tid(sd_bus *bus, pid_t *tid);
int sd_bus_attach_event(sd_bus *bus, sd_event *e, int priority);
int sd_bus_detach_event(sd_bus *bus);

View File

@ -795,7 +795,7 @@ static int connect_bus(Context *c, sd_event *event, sd_bus **_bus) {
assert(event);
assert(_bus);
r = sd_bus_open_system(&bus);
r = sd_bus_default_system(&bus);
if (r < 0) {
log_error("Failed to get system bus connection: %s", strerror(-r));
return r;