sd-bus: optionally, use inotify to wait for bus sockets to appear

This adds a "watch-bind" feature to sd-bus connections. If set and the
AF_UNIX socket we are connecting to doesn't exist yet, we'll establish
an inotify watch instead, and wait for the socket to appear. In other
words, a missing AF_UNIX just makes connecting slower.

This is useful for daemons such as networkd or resolved that shall be
able to run during early-boot, before dbus-daemon is up, and want to
connect to dbus-daemon as soon as it becomes ready.
This commit is contained in:
Lennart Poettering 2017-12-15 22:24:52 +01:00
parent 5b5e6deabb
commit 8a5cd31e5f
8 changed files with 760 additions and 123 deletions

View File

@ -530,3 +530,9 @@ global:
sd_bus_message_new;
sd_bus_message_seal;
} LIBSYSTEMD_234;
LIBSYSTEMD_237 {
global:
sd_bus_set_watch_bind;
sd_bus_get_watch_bind;
} LIBSYSTEMD_236;

View File

@ -157,6 +157,7 @@ struct sd_bus_slot {
enum bus_state {
BUS_UNSET,
BUS_WATCH_BIND, /* waiting for the socket to appear via inotify */
BUS_OPENING,
BUS_AUTHENTICATING,
BUS_HELLO,
@ -188,6 +189,7 @@ struct sd_bus {
enum bus_state state;
int input_fd, output_fd;
int inotify_fd;
int message_version;
int message_endian;
@ -210,6 +212,7 @@ struct sd_bus {
bool exited:1;
bool exit_triggered:1;
bool is_local:1;
bool watch_bind:1;
int use_memfd;
@ -293,6 +296,7 @@ struct sd_bus {
sd_event_source *output_io_event_source;
sd_event_source *time_event_source;
sd_event_source *quit_event_source;
sd_event_source *inotify_event_source;
sd_event *event;
int event_priority;
@ -312,6 +316,9 @@ struct sd_bus {
LIST_HEAD(sd_bus_slot, slots);
LIST_HEAD(sd_bus_track, tracks);
int *inotify_watches;
size_t n_inotify_watches;
};
/* For method calls we time-out at 25s, like in the D-Bus reference implementation */
@ -367,6 +374,12 @@ bool bus_pid_changed(sd_bus *bus);
char *bus_address_escape(const char *v);
int bus_attach_io_events(sd_bus *b);
int bus_attach_inotify_event(sd_bus *b);
void bus_close_inotify_fd(sd_bus *b);
void bus_close_io_fds(sd_bus *b);
#define OBJECT_PATH_FOREACH_PREFIX(prefix, path) \
for (char *_slash = ({ strcpy((prefix), (path)); streq((prefix), "/") ? NULL : strrchr((prefix), '/'); }) ; \
_slash && !(_slash[(_slash) == (prefix)] = 0); \

View File

@ -32,9 +32,12 @@
#include "bus-socket.h"
#include "fd-util.h"
#include "format-util.h"
#include "fs-util.h"
#include "hexdecoct.h"
#include "io-util.h"
#include "macro.h"
#include "missing.h"
#include "path-util.h"
#include "selinux-util.h"
#include "signal-util.h"
#include "stdio-util.h"
@ -688,30 +691,249 @@ int bus_socket_start_auth(sd_bus *b) {
return bus_socket_start_auth_client(b);
}
static int bus_socket_inotify_setup(sd_bus *b) {
_cleanup_free_ int *new_watches = NULL;
_cleanup_free_ char *absolute = NULL;
size_t n_allocated = 0, n = 0, done = 0, i;
unsigned max_follow = 32;
const char *p;
int wd, r;
assert(b);
assert(b->watch_bind);
assert(b->sockaddr.sa.sa_family == AF_UNIX);
assert(b->sockaddr.un.sun_path[0] != 0);
/* Sets up an inotify fd in case watch_bind is enabled: wait until the configured AF_UNIX file system socket
* appears before connecting to it. The implemented is pretty simplistic: we just subscribe to relevant changes
* to all prefix components of the path, and every time we get an event for that we try to reconnect again,
* without actually caring what precisely the event we got told us. If we still can't connect we re-subscribe
* to all relevant changes of anything in the path, so that our watches include any possibly newly created path
* components. */
if (b->inotify_fd < 0) {
b->inotify_fd = inotify_init1(IN_NONBLOCK|IN_CLOEXEC);
if (b->inotify_fd < 0)
return -errno;
}
/* Make sure the path is NUL terminated */
p = strndupa(b->sockaddr.un.sun_path, sizeof(b->sockaddr.un.sun_path));
/* Make sure the path is absolute */
r = path_make_absolute_cwd(p, &absolute);
if (r < 0)
goto fail;
/* Watch all parent directories, and don't mind any prefix that doesn't exist yet. For the innermost directory
* that exists we want to know when files are created or moved into it. For all parents of it we just care if
* they are removed or renamed. */
if (!GREEDY_REALLOC(new_watches, n_allocated, n + 1)) {
r = -ENOMEM;
goto fail;
}
/* Start with the top-level directory, which is a bit simpler than the rest, since it can't be a symlink, and
* always exists */
wd = inotify_add_watch(b->inotify_fd, "/", IN_CREATE|IN_MOVED_TO);
if (wd < 0) {
r = log_debug_errno(errno, "Failed to add inotify watch on /: %m");
goto fail;
} else
new_watches[n++] = wd;
for (;;) {
_cleanup_free_ char *component = NULL, *prefix = NULL, *destination = NULL;
size_t n_slashes, n_component;
char *c = NULL;
n_slashes = strspn(absolute + done, "/");
n_component = n_slashes + strcspn(absolute + done + n_slashes, "/");
if (n_component == 0) /* The end */
break;
component = strndup(absolute + done, n_component);
if (!component) {
r = -ENOMEM;
goto fail;
}
/* A trailing slash? That's a directory, and not a socket then */
if (path_equal(component, "/")) {
r = -EISDIR;
goto fail;
}
/* A single dot? Let's eat this up */
if (path_equal(component, "/.")) {
done += n_component;
continue;
}
prefix = strndup(absolute, done + n_component);
if (!prefix) {
r = -ENOMEM;
goto fail;
}
if (!GREEDY_REALLOC(new_watches, n_allocated, n + 1)) {
r = -ENOMEM;
goto fail;
}
wd = inotify_add_watch(b->inotify_fd, prefix, IN_DELETE_SELF|IN_MOVE_SELF|IN_ATTRIB|IN_CREATE|IN_MOVED_TO|IN_DONT_FOLLOW);
log_debug("Added inotify watch for %s on bus %s: %i", prefix, strna(b->description), wd);
if (wd < 0) {
if (IN_SET(errno, ENOENT, ELOOP))
break; /* This component doesn't exist yet, or the path contains a cyclic symlink right now */
r = log_debug_errno(errno, "Failed to add inotify watch on %s: %m", isempty(prefix) ? "/" : prefix);
goto fail;
} else
new_watches[n++] = wd;
/* Check if this is possibly a symlink. If so, let's follow it and watch it too. */
r = readlink_malloc(prefix, &destination);
if (r == -EINVAL) { /* not a symlink */
done += n_component;
continue;
}
if (r < 0)
goto fail;
if (isempty(destination)) { /* Empty symlink target? Yuck! */
r = -EINVAL;
goto fail;
}
if (max_follow <= 0) { /* Let's make sure we don't follow symlinks forever */
r = -ELOOP;
goto fail;
}
if (path_is_absolute(destination)) {
/* For absolute symlinks we build the new path and start anew */
c = strjoin(destination, absolute + done + n_component);
done = 0;
} else {
_cleanup_free_ char *t = NULL;
/* For relative symlinks we replace the last component, and try again */
t = strndup(absolute, done);
if (!t)
return -ENOMEM;
c = strjoin(t, "/", destination, absolute + done + n_component);
}
if (!c) {
r = -ENOMEM;
goto fail;
}
free(absolute);
absolute = c;
max_follow--;
}
/* And now, let's remove all watches from the previous iteration we don't need anymore */
for (i = 0; i < b->n_inotify_watches; i++) {
bool found = false;
size_t j;
for (j = 0; j < n; j++)
if (new_watches[j] == b->inotify_watches[i]) {
found = true;
break;
}
if (found)
continue;
(void) inotify_rm_watch(b->inotify_fd, b->inotify_watches[i]);
}
free_and_replace(b->inotify_watches, new_watches);
b->n_inotify_watches = n;
return 0;
fail:
bus_close_inotify_fd(b);
return r;
}
int bus_socket_connect(sd_bus *b) {
bool inotify_done = false;
int r;
assert(b);
assert(b->input_fd < 0);
assert(b->output_fd < 0);
assert(b->sockaddr.sa.sa_family != AF_UNSPEC);
b->input_fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
if (b->input_fd < 0)
return -errno;
for (;;) {
assert(b->input_fd < 0);
assert(b->output_fd < 0);
assert(b->sockaddr.sa.sa_family != AF_UNSPEC);
b->output_fd = b->input_fd;
b->input_fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
if (b->input_fd < 0)
return -errno;
bus_socket_setup(b);
b->output_fd = b->input_fd;
bus_socket_setup(b);
r = connect(b->input_fd, &b->sockaddr.sa, b->sockaddr_size);
if (r < 0) {
if (errno == EINPROGRESS)
return 1;
if (connect(b->input_fd, &b->sockaddr.sa, b->sockaddr_size) < 0) {
if (errno == EINPROGRESS) {
return -errno;
/* If we have any inotify watches open, close them now, we don't need them anymore, as
* we have successfully initiated a connection */
bus_close_inotify_fd(b);
/* Note that very likely we are already in BUS_OPENING state here, as we enter it when
* we start parsing the address string. The only reason we set the state explicitly
* here, is to undo BUS_WATCH_BIND, in case we did the inotify magic. */
b->state = BUS_OPENING;
return 1;
}
if (IN_SET(errno, ENOENT, ECONNREFUSED) && /* ENOENT → unix socket doesn't exist at all; ECONNREFUSED → unix socket stale */
b->watch_bind &&
b->sockaddr.sa.sa_family == AF_UNIX &&
b->sockaddr.un.sun_path[0] != 0) {
/* This connection attempt failed, let's release the socket for now, and start with a
* fresh one when reconnecting. */
bus_close_io_fds(b);
if (inotify_done) {
/* inotify set up already, don't do it again, just return now, and remember
* that we are waiting for inotify events now. */
b->state = BUS_WATCH_BIND;
return 1;
}
/* This is a file system socket, and the inotify logic is enabled. Let's create the necessary inotify fd. */
r = bus_socket_inotify_setup(b);
if (r < 0)
return r;
/* Let's now try to connect a second time, because in theory there's otherwise a race
* here: the socket might have been created in the time between our first connect() and
* the time we set up the inotify logic. But let's remember that we set up inotify now,
* so that we don't do the connect() more than twice. */
inotify_done = true;
} else
return -errno;
} else
break;
}
/* Yay, established, we don't need no inotify anymore! */
bus_close_inotify_fd(b);
return bus_socket_start_auth(b);
}
@ -1069,3 +1291,34 @@ int bus_socket_process_authenticating(sd_bus *b) {
return bus_socket_read_auth(b);
}
int bus_socket_process_watch_bind(sd_bus *b) {
int r, q;
assert(b);
assert(b->state == BUS_WATCH_BIND);
assert(b->inotify_fd >= 0);
r = flush_fd(b->inotify_fd);
if (r <= 0)
return r;
log_debug("Got inotify event on bus %s.", strna(b->description));
/* We flushed events out of the inotify fd. In that case, maybe the socket is valid now? Let's try to connect
* to it again */
r = bus_socket_connect(b);
if (r < 0)
return r;
q = bus_attach_io_events(b);
if (q < 0)
return q;
q = bus_attach_inotify_event(b);
if (q < 0)
return q;
return r;
}

View File

@ -34,5 +34,6 @@ int bus_socket_read_message(sd_bus *bus);
int bus_socket_process_opening(sd_bus *b);
int bus_socket_process_authenticating(sd_bus *b);
int bus_socket_process_watch_bind(sd_bus *b);
bool bus_socket_auth_needs_write(sd_bus *b);

View File

@ -72,23 +72,33 @@
} while (false)
static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
static int attach_io_events(sd_bus *b);
static void detach_io_events(sd_bus *b);
static void bus_detach_io_events(sd_bus *b);
static void bus_detach_inotify_event(sd_bus *b);
static thread_local sd_bus *default_system_bus = NULL;
static thread_local sd_bus *default_user_bus = NULL;
static thread_local sd_bus *default_starter_bus = NULL;
static void bus_close_fds(sd_bus *b) {
void bus_close_io_fds(sd_bus *b) {
assert(b);
detach_io_events(b);
bus_detach_io_events(b);
if (b->input_fd != b->output_fd)
safe_close(b->output_fd);
b->output_fd = b->input_fd = safe_close(b->input_fd);
}
void bus_close_inotify_fd(sd_bus *b) {
assert(b);
bus_detach_inotify_event(b);
b->inotify_fd = safe_close(b->inotify_fd);
b->inotify_watches = mfree(b->inotify_watches);
b->n_inotify_watches = 0;
}
static void bus_reset_queues(sd_bus *b) {
assert(b);
@ -132,7 +142,8 @@ static void bus_free(sd_bus *b) {
if (b->default_bus_ptr)
*b->default_bus_ptr = NULL;
bus_close_fds(b);
bus_close_io_fds(b);
bus_close_inotify_fd(b);
free(b->label);
free(b->groups);
@ -182,6 +193,7 @@ _public_ int sd_bus_new(sd_bus **ret) {
r->n_ref = REFCNT_INIT;
r->input_fd = r->output_fd = -1;
r->inotify_fd = -1;
r->message_version = 1;
r->creds_mask |= SD_BUS_CREDS_WELL_KNOWN_NAMES|SD_BUS_CREDS_UNIQUE_NAME;
r->hello_flags |= KDBUS_HELLO_ACCEPT_FD;
@ -379,6 +391,22 @@ _public_ int sd_bus_get_allow_interactive_authorization(sd_bus *bus) {
return bus->allow_interactive_authorization;
}
_public_ int sd_bus_set_watch_bind(sd_bus *bus, int b) {
assert_return(bus, -EINVAL);
assert_return(bus->state == BUS_UNSET, -EPERM);
assert_return(!bus_pid_changed(bus), -ECHILD);
bus->watch_bind = b;
return 0;
}
_public_ int sd_bus_get_watch_bind(sd_bus *bus) {
assert_return(bus, -EINVAL);
assert_return(!bus_pid_changed(bus), -ECHILD);
return bus->watch_bind;
}
static int hello_callback(sd_bus_message *reply, void *userdata, sd_bus_error *error) {
const char *s;
sd_bus *bus;
@ -901,7 +929,8 @@ static int bus_start_address(sd_bus *b) {
assert(b);
for (;;) {
bus_close_fds(b);
bus_close_io_fds(b);
bus_close_inotify_fd(b);
/* If you provide multiple different bus-addresses, we
* try all of them in order and use the first one that
@ -909,20 +938,25 @@ static int bus_start_address(sd_bus *b) {
if (b->exec_path)
r = bus_socket_exec(b);
else if ((b->nspid > 0 || b->machine) && b->sockaddr.sa.sa_family != AF_UNSPEC)
r = bus_container_connect_socket(b);
else if (b->sockaddr.sa.sa_family != AF_UNSPEC)
r = bus_socket_connect(b);
else
goto next;
if (r >= 0) {
r = attach_io_events(b);
if (r >= 0)
return r;
int q;
q = bus_attach_io_events(b);
if (q < 0)
return q;
q = bus_attach_inotify_event(b);
if (q < 0)
return q;
return r;
}
b->last_connect_error = -r;
@ -1305,7 +1339,8 @@ _public_ void sd_bus_close(sd_bus *bus) {
* the bus object and the bus may be freed */
bus_reset_queues(bus);
bus_close_fds(bus);
bus_close_io_fds(bus);
bus_close_inotify_fd(bus);
}
_public_ sd_bus* sd_bus_flush_close_unref(sd_bus *bus) {
@ -1322,7 +1357,7 @@ _public_ sd_bus* sd_bus_flush_close_unref(sd_bus *bus) {
static void bus_enter_closing(sd_bus *bus) {
assert(bus);
if (!IN_SET(bus->state, BUS_OPENING, BUS_AUTHENTICATING, BUS_HELLO, BUS_RUNNING))
if (!IN_SET(bus->state, BUS_WATCH_BIND, BUS_OPENING, BUS_AUTHENTICATING, BUS_HELLO, BUS_RUNNING))
return;
bus->state = BUS_CLOSING;
@ -1973,7 +2008,16 @@ _public_ int sd_bus_get_fd(sd_bus *bus) {
assert_return(bus->input_fd == bus->output_fd, -EPERM);
assert_return(!bus_pid_changed(bus), -ECHILD);
return bus->input_fd;
if (bus->state == BUS_CLOSED)
return -ENOTCONN;
if (bus->inotify_fd >= 0)
return bus->inotify_fd;
if (bus->input_fd >= 0)
return bus->input_fd;
return -ENOTCONN;
}
_public_ int sd_bus_get_events(sd_bus *bus) {
@ -1982,23 +2026,40 @@ _public_ int sd_bus_get_events(sd_bus *bus) {
assert_return(bus, -EINVAL);
assert_return(!bus_pid_changed(bus), -ECHILD);
if (!BUS_IS_OPEN(bus->state) && bus->state != BUS_CLOSING)
switch (bus->state) {
case BUS_UNSET:
case BUS_CLOSED:
return -ENOTCONN;
if (bus->state == BUS_OPENING)
flags |= POLLOUT;
else if (bus->state == BUS_AUTHENTICATING) {
case BUS_WATCH_BIND:
flags |= POLLIN;
break;
case BUS_OPENING:
flags |= POLLOUT;
break;
case BUS_AUTHENTICATING:
if (bus_socket_auth_needs_write(bus))
flags |= POLLOUT;
flags |= POLLIN;
break;
} else if (IN_SET(bus->state, BUS_RUNNING, BUS_HELLO)) {
case BUS_RUNNING:
case BUS_HELLO:
if (bus->rqueue_size <= 0)
flags |= POLLIN;
if (bus->wqueue_size > 0)
flags |= POLLOUT;
break;
case BUS_CLOSING:
break;
default:
assert_not_reached("Unknown state");
}
return flags;
@ -2019,39 +2080,45 @@ _public_ int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
return 1;
}
if (bus->state == BUS_CLOSING) {
*timeout_usec = 0;
return 1;
}
switch (bus->state) {
if (bus->state == BUS_AUTHENTICATING) {
case BUS_AUTHENTICATING:
*timeout_usec = bus->auth_timeout;
return 1;
}
if (!IN_SET(bus->state, BUS_RUNNING, BUS_HELLO)) {
*timeout_usec = (uint64_t) -1;
return 0;
}
case BUS_RUNNING:
case BUS_HELLO:
if (bus->rqueue_size > 0) {
*timeout_usec = 0;
return 1;
}
if (bus->rqueue_size > 0) {
c = prioq_peek(bus->reply_callbacks_prioq);
if (!c) {
*timeout_usec = (uint64_t) -1;
return 0;
}
if (c->timeout == 0) {
*timeout_usec = (uint64_t) -1;
return 0;
}
*timeout_usec = c->timeout;
return 1;
case BUS_CLOSING:
*timeout_usec = 0;
return 1;
}
c = prioq_peek(bus->reply_callbacks_prioq);
if (!c) {
case BUS_WATCH_BIND:
case BUS_OPENING:
*timeout_usec = (uint64_t) -1;
return 0;
}
if (c->timeout == 0) {
*timeout_usec = (uint64_t) -1;
return 0;
default:
assert_not_reached("Unknown or unexpected stat");
}
*timeout_usec = c->timeout;
return 1;
}
static int process_timeout(sd_bus *bus) {
@ -2114,8 +2181,8 @@ static int process_timeout(sd_bus *bus) {
sd_bus_slot_unref(slot);
/* When this is the hello message and it failed, then make sure to propagate the error up, don't just log and
* ignore the callback handler's return value. */
/* When this is the hello message and it timed out, then make sure to propagate the error up, don't just log
* and ignore the callback handler's return value. */
if (is_hello)
return r;
@ -2219,8 +2286,8 @@ static int process_reply(sd_bus *bus, sd_bus_message *m) {
sd_bus_slot_unref(slot);
/* When this is the hello message and it timed out, then make sure to propagate the error up, don't just log
* and ignore the callback handler's return value. */
/* When this is the hello message and it failed, then make sure to propagate the error up, don't just log and
* ignore the callback handler's return value. */
if (is_hello)
return r;
@ -2656,48 +2723,44 @@ static int bus_process_internal(sd_bus *bus, bool hint_priority, int64_t priorit
case BUS_CLOSED:
return -ECONNRESET;
case BUS_WATCH_BIND:
r = bus_socket_process_watch_bind(bus);
break;
case BUS_OPENING:
r = bus_socket_process_opening(bus);
if (IN_SET(r, -ENOTCONN, -ECONNRESET, -EPIPE, -ESHUTDOWN)) {
bus_enter_closing(bus);
r = 1;
} else if (r < 0)
return r;
if (ret)
*ret = NULL;
return r;
break;
case BUS_AUTHENTICATING:
r = bus_socket_process_authenticating(bus);
if (IN_SET(r, -ENOTCONN, -ECONNRESET, -EPIPE, -ESHUTDOWN)) {
bus_enter_closing(bus);
r = 1;
} else if (r < 0)
return r;
if (ret)
*ret = NULL;
return r;
break;
case BUS_RUNNING:
case BUS_HELLO:
r = process_running(bus, hint_priority, priority, ret);
if (IN_SET(r, -ENOTCONN, -ECONNRESET, -EPIPE, -ESHUTDOWN)) {
bus_enter_closing(bus);
r = 1;
if (r >= 0)
return r;
if (ret)
*ret = NULL;
}
return r;
/* This branch initializes *ret, hence we don't use the generic error checking below */
break;
case BUS_CLOSING:
return process_closing(bus, ret);
default:
assert_not_reached("Unknown state");
}
assert_not_reached("Unknown state");
if (IN_SET(r, -ENOTCONN, -ECONNRESET, -EPIPE, -ESHUTDOWN)) {
bus_enter_closing(bus);
r = 1;
} else if (r < 0)
return r;
if (ret)
*ret = NULL;
return r;
}
_public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
@ -2710,7 +2773,7 @@ _public_ int sd_bus_process_priority(sd_bus *bus, int64_t priority, sd_bus_messa
static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
struct pollfd p[2] = {};
int r, e, n;
int r, n;
struct timespec ts;
usec_t m = USEC_INFINITY;
@ -2722,45 +2785,52 @@ static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
if (!BUS_IS_OPEN(bus->state))
return -ENOTCONN;
e = sd_bus_get_events(bus);
if (e < 0)
return e;
if (bus->state == BUS_WATCH_BIND) {
assert(bus->inotify_fd >= 0);
if (need_more)
/* The caller really needs some more data, he doesn't
* care about what's already read, or any timeouts
* except its own. */
e |= POLLIN;
else {
usec_t until;
/* The caller wants to process if there's something to
* process, but doesn't care otherwise */
p[0].events = POLLIN;
p[0].fd = bus->inotify_fd;
n = 1;
} else {
int e;
r = sd_bus_get_timeout(bus, &until);
if (r < 0)
return r;
if (r > 0) {
usec_t nw;
nw = now(CLOCK_MONOTONIC);
m = until > nw ? until - nw : 0;
e = sd_bus_get_events(bus);
if (e < 0)
return e;
if (need_more)
/* The caller really needs some more data, he doesn't
* care about what's already read, or any timeouts
* except its own. */
e |= POLLIN;
else {
usec_t until;
/* The caller wants to process if there's something to
* process, but doesn't care otherwise */
r = sd_bus_get_timeout(bus, &until);
if (r < 0)
return r;
if (r > 0)
m = usec_sub_unsigned(until, now(CLOCK_MONOTONIC));
}
p[0].fd = bus->input_fd;
if (bus->output_fd == bus->input_fd) {
p[0].events = e;
n = 1;
} else {
p[0].events = e & POLLIN;
p[1].fd = bus->output_fd;
p[1].events = e & POLLOUT;
n = 2;
}
}
if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
if (timeout_usec != (uint64_t) -1 && (m == USEC_INFINITY || timeout_usec < m))
m = timeout_usec;
p[0].fd = bus->input_fd;
if (bus->output_fd == bus->input_fd) {
p[0].events = e;
n = 1;
} else {
p[0].events = e & POLLIN;
p[1].fd = bus->output_fd;
p[1].events = e & POLLOUT;
n = 2;
}
r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
r = ppoll(p, n, m == USEC_INFINITY ? NULL : timespec_store(&ts, m), NULL);
if (r < 0)
return -errno;
@ -2796,6 +2866,10 @@ _public_ int sd_bus_flush(sd_bus *bus) {
if (!BUS_IS_OPEN(bus->state))
return -ENOTCONN;
/* We never were connected? Don't hang in inotify for good, as there's no timeout set for it */
if (bus->state == BUS_WATCH_BIND)
return -EUNATCH;
r = bus_ensure_running(bus);
if (r < 0)
return r;
@ -2966,6 +3040,8 @@ static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userd
assert(bus);
/* Note that this is called both on input_fd, output_fd as well as inotify_fd events */
r = sd_bus_process(bus, NULL);
if (r < 0) {
log_debug_errno(r, "Processing of bus failed, closing down: %m");
@ -3053,7 +3129,7 @@ static int quit_callback(sd_event_source *event, void *userdata) {
return 1;
}
static int attach_io_events(sd_bus *bus) {
int bus_attach_io_events(sd_bus *bus) {
int r;
assert(bus);
@ -3107,7 +3183,7 @@ static int attach_io_events(sd_bus *bus) {
return 0;
}
static void detach_io_events(sd_bus *bus) {
static void bus_detach_io_events(sd_bus *bus) {
assert(bus);
if (bus->input_io_event_source) {
@ -3121,6 +3197,44 @@ static void detach_io_events(sd_bus *bus) {
}
}
int bus_attach_inotify_event(sd_bus *bus) {
int r;
assert(bus);
if (bus->inotify_fd < 0)
return 0;
if (!bus->event)
return 0;
if (!bus->inotify_event_source) {
r = sd_event_add_io(bus->event, &bus->inotify_event_source, bus->inotify_fd, EPOLLIN, io_callback, bus);
if (r < 0)
return r;
r = sd_event_source_set_priority(bus->inotify_event_source, bus->event_priority);
if (r < 0)
return r;
r = sd_event_source_set_description(bus->inotify_event_source, "bus-inotify");
} else
r = sd_event_source_set_io_fd(bus->inotify_event_source, bus->inotify_fd);
if (r < 0)
return r;
return 0;
}
static void bus_detach_inotify_event(sd_bus *bus) {
assert(bus);
if (bus->inotify_event_source) {
sd_event_source_set_enabled(bus->inotify_event_source, SD_EVENT_OFF);
bus->inotify_event_source = sd_event_source_unref(bus->inotify_event_source);
}
}
_public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
int r;
@ -3161,7 +3275,11 @@ _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
if (r < 0)
goto fail;
r = attach_io_events(bus);
r = bus_attach_io_events(bus);
if (r < 0)
goto fail;
r = bus_attach_inotify_event(bus);
if (r < 0)
goto fail;
@ -3178,7 +3296,8 @@ _public_ int sd_bus_detach_event(sd_bus *bus) {
if (!bus->event)
return 0;
detach_io_events(bus);
bus_detach_io_events(bus);
bus_detach_inotify_event(bus);
if (bus->time_event_source) {
sd_event_source_set_enabled(bus->time_event_source, SD_EVENT_OFF);

View File

@ -0,0 +1,239 @@
/* SPDX-License-Identifier: LGPL-2.1+ */
/***
This file is part of systemd.
Copyright 2017 Lennart Poettering
systemd is free software; you can redistribute it and/or modify it
under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 2.1 of the License, or
(at your option) any later version.
systemd is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with systemd; If not, see <http://www.gnu.org/licenses/>.
***/
#include <pthread.h>
#include "sd-bus.h"
#include "sd-event.h"
#include "sd-id128.h"
#include "alloc-util.h"
#include "fd-util.h"
#include "fileio.h"
#include "fs-util.h"
#include "mkdir.h"
#include "path-util.h"
#include "random-util.h"
#include "rm-rf.h"
#include "socket-util.h"
#include "string-util.h"
static int method_foobar(sd_bus_message *m, void *userdata, sd_bus_error *ret_error) {
log_info("Got Foobar() call.");
assert_se(sd_event_exit(sd_bus_get_event(sd_bus_message_get_bus(m)), 0) >= 0);
return sd_bus_reply_method_return(m, NULL);
}
static int method_exit(sd_bus_message *m, void *userdata, sd_bus_error *ret_error) {
log_info("Got Exit() call");
assert_se(sd_event_exit(sd_bus_get_event(sd_bus_message_get_bus(m)), 1) >= 0);
return sd_bus_reply_method_return(m, NULL);
}
static const sd_bus_vtable vtable[] = {
SD_BUS_VTABLE_START(0),
SD_BUS_METHOD("Foobar", NULL, NULL, method_foobar, SD_BUS_VTABLE_UNPRIVILEGED),
SD_BUS_METHOD("Exit", NULL, NULL, method_exit, SD_BUS_VTABLE_UNPRIVILEGED),
SD_BUS_VTABLE_END,
};
static void* thread_server(void *p) {
_cleanup_free_ char *suffixed = NULL, *suffixed2 = NULL, *d = NULL;
_cleanup_close_ int fd = -1;
union sockaddr_union u = {
.un.sun_family = AF_UNIX,
};
const char *path = p;
log_debug("Initializing server");
/* Let's play some games, by slowly creating the socket directory, and renaming it in the middle */
(void) usleep(100 * USEC_PER_MSEC);
assert_se(mkdir_parents(path, 0755) >= 0);
(void) usleep(100 * USEC_PER_MSEC);
d = dirname_malloc(path);
assert_se(d);
assert_se(asprintf(&suffixed, "%s.%" PRIx64, d, random_u64()) >= 0);
assert_se(rename(d, suffixed) >= 0);
(void) usleep(100 * USEC_PER_MSEC);
assert_se(asprintf(&suffixed2, "%s.%" PRIx64, d, random_u64()) >= 0);
assert_se(symlink(suffixed2, d) >= 0);
(void) usleep(100 * USEC_PER_MSEC);
assert_se(symlink(basename(suffixed), suffixed2) >= 0);
(void) usleep(100 * USEC_PER_MSEC);
strncpy(u.un.sun_path, path, sizeof(u.un.sun_path));
fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0);
assert_se(fd >= 0);
assert_se(bind(fd, &u.sa, SOCKADDR_UN_LEN(u.un)) >= 0);
usleep(100 * USEC_PER_MSEC);
assert_se(listen(fd, SOMAXCONN) >= 0);
usleep(100 * USEC_PER_MSEC);
assert_se(touch(path) >= 0);
usleep(100 * USEC_PER_MSEC);
log_debug("Initialized server");
for (;;) {
_cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
_cleanup_(sd_event_unrefp) sd_event *event = NULL;
sd_id128_t id;
int bus_fd, code;
assert_se(sd_id128_randomize(&id) >= 0);
assert_se(sd_event_new(&event) >= 0);
bus_fd = accept4(fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
assert_se(bus_fd >= 0);
log_debug("Accepted server connection");
assert_se(sd_bus_new(&bus) >= 0);
assert_se(sd_bus_set_description(bus, "server") >= 0);
assert_se(sd_bus_set_fd(bus, bus_fd, bus_fd) >= 0);
assert_se(sd_bus_set_server(bus, true, id) >= 0);
/* assert_se(sd_bus_set_anonymous(bus, true) >= 0); */
assert_se(sd_bus_attach_event(bus, event, 0) >= 0);
assert_se(sd_bus_add_object_vtable(bus, NULL, "/foo", "foo.TestInterface", vtable, NULL) >= 0);
assert_se(sd_bus_start(bus) >= 0);
assert_se(sd_event_loop(event) >= 0);
assert_se(sd_event_get_exit_code(event, &code) >= 0);
if (code > 0)
break;
}
log_debug("Server done");
return NULL;
}
static void* thread_client1(void *p) {
_cleanup_(sd_bus_error_free) sd_bus_error error = SD_BUS_ERROR_NULL;
_cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
const char *path = p, *t;
int r;
log_debug("Initializing client1");
assert_se(sd_bus_new(&bus) >= 0);
assert_se(sd_bus_set_description(bus, "client1") >= 0);
t = strjoina("unix:path=", path);
assert_se(sd_bus_set_address(bus, t) >= 0);
assert_se(sd_bus_set_watch_bind(bus, true) >= 0);
assert_se(sd_bus_start(bus) >= 0);
r = sd_bus_call_method(bus, "foo.bar", "/foo", "foo.TestInterface", "Foobar", &error, NULL, NULL);
assert_se(r >= 0);
log_debug("Client1 done");
return NULL;
}
static int client2_callback(sd_bus_message *m, void *userdata, sd_bus_error *ret_error) {
assert_se(sd_bus_message_is_method_error(m, NULL) == 0);
assert_se(sd_event_exit(sd_bus_get_event(sd_bus_message_get_bus(m)), 0) >= 0);
return 0;
}
static void* thread_client2(void *p) {
_cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
_cleanup_(sd_event_unrefp) sd_event *event = NULL;
const char *path = p, *t;
log_debug("Initializing client2");
assert_se(sd_event_new(&event) >= 0);
assert_se(sd_bus_new(&bus) >= 0);
assert_se(sd_bus_set_description(bus, "client2") >= 0);
t = strjoina("unix:path=", path);
assert_se(sd_bus_set_address(bus, t) >= 0);
assert_se(sd_bus_set_watch_bind(bus, true) >= 0);
assert_se(sd_bus_attach_event(bus, event, 0) >= 0);
assert_se(sd_bus_start(bus) >= 0);
assert_se(sd_bus_call_method_async(bus, NULL, "foo.bar", "/foo", "foo.TestInterface", "Foobar", client2_callback, NULL, NULL) >= 0);
assert_se(sd_event_loop(event) >= 0);
log_debug("Client2 done");
return NULL;
}
static void request_exit(const char *path) {
_cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
const char *t;
assert_se(sd_bus_new(&bus) >= 0);
t = strjoina("unix:path=", path);
assert_se(sd_bus_set_address(bus, t) >= 0);
assert_se(sd_bus_set_watch_bind(bus, true) >= 0);
assert_se(sd_bus_set_description(bus, "request-exit") >= 0);
assert_se(sd_bus_start(bus) >= 0);
assert_se(sd_bus_call_method(bus, "foo.bar", "/foo", "foo.TestInterface", "Exit", NULL, NULL, NULL) >= 0);
}
int main(int argc, char *argv[]) {
_cleanup_(rm_rf_physical_and_freep) char *d = NULL;
pthread_t server, client1, client2;
char *path;
log_set_max_level(LOG_DEBUG);
/* We use /dev/shm here rather than /tmp, since some weird distros might set up /tmp as some weird fs that
* doesn't support inotify properly. */
assert_se(mkdtemp_malloc("/dev/shm/systemd-watch-bind-XXXXXX", &d) >= 0);
path = strjoina(d, "/this/is/a/socket");
assert_se(pthread_create(&server, NULL, thread_server, path) == 0);
assert_se(pthread_create(&client1, NULL, thread_client1, path) == 0);
assert_se(pthread_create(&client2, NULL, thread_client2, path) == 0);
assert_se(pthread_join(client1, NULL) == 0);
assert_se(pthread_join(client2, NULL) == 0);
request_exit(path);
assert_se(pthread_join(server, NULL) == 0);
return 0;
}

View File

@ -150,8 +150,10 @@ int sd_bus_set_allow_interactive_authorization(sd_bus *bus, int b);
int sd_bus_get_allow_interactive_authorization(sd_bus *bus);
int sd_bus_set_exit_on_disconnect(sd_bus *bus, int b);
int sd_bus_get_exit_on_disconnect(sd_bus *bus);
int sd_bus_set_watch_bind(sd_bus *bus, int b);
int sd_bus_get_watch_bind(sd_bus *bus);
int sd_bus_start(sd_bus *ret);
int sd_bus_start(sd_bus *bus);
int sd_bus_try_close(sd_bus *bus);
void sd_bus_close(sd_bus *bus);

View File

@ -762,6 +762,10 @@ tests += [
[],
[threads]],
[['src/libsystemd/sd-bus/test-bus-watch-bind.c'],
[],
[threads], '', 'timeout=120'],
[['src/libsystemd/sd-bus/test-bus-chat.c'],
[],
[threads]],