Merge pull request #11472 from poettering/sd-bus-ref-tweak

try harder to detect when a bus and its queued messages are fully unreffed and free everything then
This commit is contained in:
Zbigniew Jędrzejewski-Szmek 2019-03-01 17:52:57 +01:00 committed by GitHub
commit bdc8f36bfb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 187 additions and 56 deletions

View file

@ -219,11 +219,11 @@ struct sd_bus {
size_t rbuffer_size;
sd_bus_message **rqueue;
unsigned rqueue_size;
size_t rqueue_size;
size_t rqueue_allocated;
sd_bus_message **wqueue;
unsigned wqueue_size;
size_t wqueue_size;
size_t windex;
size_t wqueue_allocated;

View file

@ -118,7 +118,8 @@ static sd_bus_message* message_free(sd_bus_message *m) {
message_reset_parts(m);
sd_bus_unref(m->bus);
/* Note that we don't unref m->bus here. That's already done by sd_bus_message_unref() as each user
* reference to the bus message also is considered a reference to the bus connection itself. */
if (m->free_fds) {
close_many(m->fds, m->n_fds);
@ -136,8 +137,6 @@ static sd_bus_message* message_free(sd_bus_message *m) {
return mfree(m);
}
DEFINE_TRIVIAL_CLEANUP_FUNC(sd_bus_message*, message_free);
static void *message_extend_fields(sd_bus_message *m, size_t align, size_t sz, bool add_offset) {
void *op, *np;
size_t old_size, new_size, start;
@ -459,7 +458,6 @@ int bus_message_from_header(
if (!m)
return -ENOMEM;
m->n_ref = 1;
m->sealed = true;
m->header = header;
m->header_accessible = header_accessible;
@ -513,7 +511,9 @@ int bus_message_from_header(
m->creds.mask |= SD_BUS_CREDS_SELINUX_CONTEXT;
}
m->n_ref = 1;
m->bus = sd_bus_ref(bus);
*ret = TAKE_PTR(m);
return 0;
@ -528,7 +528,7 @@ int bus_message_from_malloc(
const char *label,
sd_bus_message **ret) {
_cleanup_(message_freep) sd_bus_message *m = NULL;
_cleanup_(sd_bus_message_unrefp) sd_bus_message *m = NULL;
size_t sz;
int r;
@ -585,13 +585,13 @@ _public_ int sd_bus_message_new(
return -ENOMEM;
t->n_ref = 1;
t->bus = sd_bus_ref(bus);
t->header = (struct bus_header*) ((uint8_t*) t + ALIGN(sizeof(struct sd_bus_message)));
t->header->endian = BUS_NATIVE_ENDIAN;
t->header->type = type;
t->header->version = bus->message_version;
t->allow_fds = bus->can_fds || !IN_SET(bus->state, BUS_HELLO, BUS_RUNNING);
t->root_container.need_offsets = BUS_MESSAGE_IS_GVARIANT(t);
t->bus = sd_bus_ref(bus);
if (bus->allow_interactive_authorization)
t->header->flags |= BUS_MESSAGE_ALLOW_INTERACTIVE_AUTHORIZATION;
@ -647,7 +647,7 @@ _public_ int sd_bus_message_new_method_call(
const char *interface,
const char *member) {
_cleanup_(message_freep) sd_bus_message *t = NULL;
_cleanup_(sd_bus_message_unrefp) sd_bus_message *t = NULL;
int r;
assert_return(bus, -ENOTCONN);
@ -692,7 +692,7 @@ static int message_new_reply(
uint8_t type,
sd_bus_message **m) {
_cleanup_(message_freep) sd_bus_message *t = NULL;
_cleanup_(sd_bus_message_unrefp) sd_bus_message *t = NULL;
uint64_t cookie;
int r;
@ -743,7 +743,7 @@ _public_ int sd_bus_message_new_method_error(
sd_bus_message **m,
const sd_bus_error *e) {
_cleanup_(message_freep) sd_bus_message *t = NULL;
_cleanup_(sd_bus_message_unrefp) sd_bus_message *t = NULL;
int r;
assert_return(sd_bus_error_is_set(e), -EINVAL);
@ -846,7 +846,7 @@ int bus_message_new_synthetic_error(
const sd_bus_error *e,
sd_bus_message **m) {
_cleanup_(message_freep) sd_bus_message *t = NULL;
_cleanup_(sd_bus_message_unrefp) sd_bus_message *t = NULL;
int r;
assert(bus);
@ -890,7 +890,80 @@ int bus_message_new_synthetic_error(
return 0;
}
DEFINE_PUBLIC_TRIVIAL_REF_UNREF_FUNC(sd_bus_message, sd_bus_message, message_free);
_public_ sd_bus_message* sd_bus_message_ref(sd_bus_message *m) {
if (!m)
return NULL;
/* We are fine if this message so far was either explicitly reffed or not reffed but queued into at
* least one bus connection object. */
assert(m->n_ref > 0 || m->n_queued > 0);
m->n_ref++;
/* Each user reference to a bus message shall also be considered a ref on the bus */
sd_bus_ref(m->bus);
return m;
}
_public_ sd_bus_message* sd_bus_message_unref(sd_bus_message *m) {
if (!m)
return NULL;
assert(m->n_ref > 0);
sd_bus_unref(m->bus); /* Each regular ref is also a ref on the bus connection. Let's hence drop it
* here. Note we have to do this before decrementing our own n_ref here, since
* otherwise, if this message is currently queued sd_bus_unref() might call
* bus_message_unref_queued() for this which might then destroy the message
* while we are still processing it. */
m->n_ref--;
if (m->n_ref > 0 || m->n_queued > 0)
return NULL;
/* Unset the bus field if neither the user has a reference nor this message is queued. We are careful
* to reset the field only after the last reference to the bus is dropped, after all we might keep
* multiple references to the bus, once for each reference kept on outselves. */
m->bus = NULL;
return message_free(m);
}
sd_bus_message* bus_message_ref_queued(sd_bus_message *m, sd_bus *bus) {
if (!m)
return NULL;
/* If this is a different bus than the message is associated with, then implicitly turn this into a
* regular reference. This means that you can create a memory leak by enqueuing a message generated
* on one bus onto another at the same time as enqueueing a message from the second one on the first,
* as we'll not detect the cyclic references there. */
if (bus != m->bus)
return sd_bus_message_ref(m);
assert(m->n_ref > 0 || m->n_queued > 0);
m->n_queued++;
return m;
}
sd_bus_message* bus_message_unref_queued(sd_bus_message *m, sd_bus *bus) {
if (!m)
return NULL;
if (bus != m->bus)
return sd_bus_message_unref(m);
assert(m->n_queued > 0);
m->n_queued--;
if (m->n_ref > 0 || m->n_queued > 0)
return NULL;
m->bus = NULL;
return message_free(m);
}
_public_ int sd_bus_message_get_type(sd_bus_message *m, uint8_t *type) {
assert_return(m, -EINVAL);

View file

@ -48,7 +48,16 @@ struct bus_body_part {
};
struct sd_bus_message {
unsigned n_ref;
/* Caveat: a message can be referenced in two different ways: the main (user-facing) way will also
* pin the bus connection object the message is associated with. The secondary way ("queued") is used
* when a message is in the read or write queues of the bus connection object, which will not pin the
* bus connection object. This is necessary so that we don't have to have a pair of cyclic references
* between a message that is queued and its connection: as soon as a message is only referenced by
* the connection (by means of being queued) and the connection itself has no other references it
* will be freed. */
unsigned n_ref; /* Counter of references that pin the connection */
unsigned n_queued; /* Counter of references that do not pin the connection */
sd_bus *bus;
@ -211,3 +220,6 @@ int bus_message_remarshal(sd_bus *bus, sd_bus_message **m);
void bus_message_set_sender_driver(sd_bus *bus, sd_bus_message *m);
void bus_message_set_sender_local(sd_bus *bus, sd_bus_message *m);
sd_bus_message* bus_message_ref_queued(sd_bus_message *m, sd_bus *bus);
sd_bus_message* bus_message_unref_queued(sd_bus_message *m, sd_bus *bus);

View file

@ -1110,8 +1110,10 @@ static int bus_socket_make_message(sd_bus *bus, size_t size) {
bus->fds = NULL;
bus->n_fds = 0;
if (t)
bus->rqueue[bus->rqueue_size++] = t;
if (t) {
bus->rqueue[bus->rqueue_size++] = bus_message_ref_queued(t, bus);
sd_bus_message_unref(t);
}
return 1;
}

View file

@ -146,13 +146,13 @@ static void bus_reset_queues(sd_bus *b) {
assert(b);
while (b->rqueue_size > 0)
sd_bus_message_unref(b->rqueue[--b->rqueue_size]);
bus_message_unref_queued(b->rqueue[--b->rqueue_size], b);
b->rqueue = mfree(b->rqueue);
b->rqueue_allocated = 0;
while (b->wqueue_size > 0)
sd_bus_message_unref(b->wqueue[--b->wqueue_size]);
bus_message_unref_queued(b->wqueue[--b->wqueue_size], b);
b->wqueue = mfree(b->wqueue);
b->wqueue_allocated = 0;
@ -248,12 +248,12 @@ _public_ int sd_bus_new(sd_bus **ret) {
.close_on_exit = true,
};
assert_se(pthread_mutex_init(&b->memfd_cache_mutex, NULL) == 0);
/* We guarantee that wqueue always has space for at least one entry */
if (!GREEDY_REALLOC(b->wqueue, b->wqueue_allocated, 1))
return -ENOMEM;
assert_se(pthread_mutex_init(&b->memfd_cache_mutex, NULL) == 0);
*ret = TAKE_PTR(b);
return 0;
}
@ -493,7 +493,7 @@ static int synthesize_connected_signal(sd_bus *bus) {
/* Insert at the very front */
memmove(bus->rqueue + 1, bus->rqueue, sizeof(sd_bus_message*) * bus->rqueue_size);
bus->rqueue[0] = TAKE_PTR(m);
bus->rqueue[0] = bus_message_ref_queued(m, bus);
bus->rqueue_size++;
return 0;
@ -1811,7 +1811,7 @@ static int dispatch_wqueue(sd_bus *bus) {
* anyway. */
bus->wqueue_size--;
sd_bus_message_unref(bus->wqueue[0]);
bus_message_unref_queued(bus->wqueue[0], bus);
memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
bus->windex = 0;
@ -1840,6 +1840,15 @@ int bus_rqueue_make_room(sd_bus *bus) {
return 0;
}
static void rqueue_drop_one(sd_bus *bus, size_t i) {
assert(bus);
assert(i < bus->rqueue_size);
bus_message_unref_queued(bus->rqueue[i], bus);
memmove(bus->rqueue + i, bus->rqueue + i + 1, sizeof(sd_bus_message*) * (bus->rqueue_size - i - 1));
bus->rqueue_size--;
}
static int dispatch_rqueue(sd_bus *bus, bool hint_priority, int64_t priority, sd_bus_message **m) {
int r, ret = 0;
@ -1854,10 +1863,8 @@ static int dispatch_rqueue(sd_bus *bus, bool hint_priority, int64_t priority, sd
for (;;) {
if (bus->rqueue_size > 0) {
/* Dispatch a queued message */
*m = bus->rqueue[0];
bus->rqueue_size--;
memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
*m = sd_bus_message_ref(bus->rqueue[0]);
rqueue_drop_one(bus, 0);
return 1;
}
@ -1865,8 +1872,10 @@ static int dispatch_rqueue(sd_bus *bus, bool hint_priority, int64_t priority, sd
r = bus_read_message(bus, hint_priority, priority);
if (r < 0)
return r;
if (r == 0)
if (r == 0) {
*m = NULL;
return ret;
}
ret = 1;
}
@ -1933,7 +1942,7 @@ _public_ int sd_bus_send(sd_bus *bus, sd_bus_message *_m, uint64_t *cookie) {
* of the wqueue array is always allocated so
* that we always can remember how much was
* written. */
bus->wqueue[0] = sd_bus_message_ref(m);
bus->wqueue[0] = bus_message_ref_queued(m, bus);
bus->wqueue_size = 1;
bus->windex = idx;
}
@ -1947,7 +1956,7 @@ _public_ int sd_bus_send(sd_bus *bus, sd_bus_message *_m, uint64_t *cookie) {
if (!GREEDY_REALLOC(bus->wqueue, bus->wqueue_allocated, bus->wqueue_size + 1))
return -ENOMEM;
bus->wqueue[bus->wqueue_size++] = sd_bus_message_ref(m);
bus->wqueue[bus->wqueue_size++] = bus_message_ref_queued(m, bus);
}
finish:
@ -2125,7 +2134,7 @@ _public_ int sd_bus_call(
_cleanup_(sd_bus_message_unrefp) sd_bus_message *m = sd_bus_message_ref(_m);
usec_t timeout;
uint64_t cookie;
unsigned i;
size_t i;
int r;
bus_assert_return(m, -EINVAL, error);
@ -2167,37 +2176,30 @@ _public_ int sd_bus_call(
usec_t left;
while (i < bus->rqueue_size) {
sd_bus_message *incoming = NULL;
_cleanup_(sd_bus_message_unrefp) sd_bus_message *incoming = NULL;
incoming = bus->rqueue[i];
incoming = sd_bus_message_ref(bus->rqueue[i]);
if (incoming->reply_cookie == cookie) {
/* Found a match! */
memmove(bus->rqueue + i, bus->rqueue + i + 1, sizeof(sd_bus_message*) * (bus->rqueue_size - i - 1));
bus->rqueue_size--;
rqueue_drop_one(bus, i);
log_debug_bus_message(incoming);
if (incoming->header->type == SD_BUS_MESSAGE_METHOD_RETURN) {
if (incoming->n_fds <= 0 || bus->accept_fd) {
if (reply)
*reply = incoming;
else
sd_bus_message_unref(incoming);
*reply = TAKE_PTR(incoming);
return 1;
}
r = sd_bus_error_setf(error, SD_BUS_ERROR_INCONSISTENT_MESSAGE, "Reply message contained file descriptors which I couldn't accept. Sorry.");
sd_bus_message_unref(incoming);
return r;
return sd_bus_error_setf(error, SD_BUS_ERROR_INCONSISTENT_MESSAGE, "Reply message contained file descriptors which I couldn't accept. Sorry.");
} else if (incoming->header->type == SD_BUS_MESSAGE_METHOD_ERROR) {
r = sd_bus_error_copy(error, &incoming->error);
sd_bus_message_unref(incoming);
return r;
} else {
} else if (incoming->header->type == SD_BUS_MESSAGE_METHOD_ERROR)
return sd_bus_error_copy(error, &incoming->error);
else {
r = -EIO;
goto fail;
}
@ -2207,15 +2209,11 @@ _public_ int sd_bus_call(
incoming->sender &&
streq(bus->unique_name, incoming->sender)) {
memmove(bus->rqueue + i, bus->rqueue + i + 1, sizeof(sd_bus_message*) * (bus->rqueue_size - i - 1));
bus->rqueue_size--;
rqueue_drop_one(bus, i);
/* Our own message? Somebody is trying
* to send its own client a message,
* let's not dead-lock, let's fail
* immediately. */
/* Our own message? Somebody is trying to send its own client a message,
* let's not dead-lock, let's fail immediately. */
sd_bus_message_unref(incoming);
r = -ELOOP;
goto fail;
}
@ -2673,7 +2671,6 @@ static int process_builtin(sd_bus *bus, sd_bus_message *m) {
SD_BUS_ERROR_UNKNOWN_METHOD,
"Unknown method '%s' on interface '%s'.", m->member, m->interface);
}
if (r < 0)
return r;
@ -2797,7 +2794,6 @@ static int process_running(sd_bus *bus, bool hint_priority, int64_t priority, sd
return r;
*ret = TAKE_PTR(m);
return 1;
}

View file

@ -40,8 +40,8 @@ static void test_bus_set_address_system_remote(char **args) {
-EINVAL, NULL);
test_one_address(b, "user@host",
0, "unixexec:path=ssh,argv1=-xT,argv2=--,argv3=user%40host,argv4=systemd-stdio-bridge");
test_one_address(b, "user@host@host",
-EINVAL, NULL);
test_one_address(b, "user@host@host",
-EINVAL, NULL);
test_one_address(b, "[::1]",
0, "unixexec:path=ssh,argv1=-xT,argv2=--,argv3=%3a%3a1,argv4=systemd-stdio-bridge");
test_one_address(b, "user@[::1]",

View file

@ -0,0 +1,44 @@
/* SPDX-License-Identifier: LGPL-2.1+ */
#include "main-func.h"
#include "sd-bus.h"
#include "tests.h"
static int run(int argc, char *argv[]) {
sd_bus_message *m = NULL;
sd_bus *bus = NULL;
int r;
/* This test will result in a memory leak in <= v240, but not on v241. Hence to be really useful it
* should be run through a leak tracker such as valgrind. */
r = sd_bus_open_system(&bus);
if (r < 0)
return log_tests_skipped("Failed to connect to bus");
/* Create a message and enqueue it (this shouldn't send it though as the connection setup is not complete yet) */
assert_se(sd_bus_message_new_method_call(bus, &m, "foo.bar", "/foo", "quux.quux", "waldo") >= 0);
assert_se(sd_bus_send(bus, m, NULL) >= 0);
/* Let's now unref the message first and the bus second. */
m = sd_bus_message_unref(m);
bus = sd_bus_unref(bus);
/* We should have a memory leak now on <= v240. Let's do this again, but destory in the opposite
* order. On v240 that too should be a leak. */
r = sd_bus_open_system(&bus);
if (r < 0)
return log_tests_skipped("Failed to connect to bus");
assert_se(sd_bus_message_new_method_call(bus, &m, "foo.bar", "/foo", "quux.quux", "waldo") >= 0);
assert_se(sd_bus_send(bus, m, NULL) >= 0);
/* Let's now unref things in the opposite order */
bus = sd_bus_unref(bus);
m = sd_bus_message_unref(m);
return 0;
}
DEFINE_MAIN_FUNCTION(run);

View file

@ -867,6 +867,10 @@ tests += [
[],
[threads]],
[['src/libsystemd/sd-bus/test-bus-queue-ref-cycle.c'],
[],
[threads]],
[['src/libsystemd/sd-bus/test-bus-watch-bind.c'],
[],
[threads], '', 'timeout=120'],