sd-bus: use "queue" message references for managing r/w message queues in connection objects

Let's make use of the new concept the previous commit added.

See: #4846
This commit is contained in:
Lennart Poettering 2019-01-17 18:31:59 +01:00
parent 1b3f9dd759
commit c1757a70ea
2 changed files with 32 additions and 34 deletions

View File

@ -1110,8 +1110,10 @@ static int bus_socket_make_message(sd_bus *bus, size_t size) {
bus->fds = NULL;
bus->n_fds = 0;
if (t)
bus->rqueue[bus->rqueue_size++] = t;
if (t) {
bus->rqueue[bus->rqueue_size++] = bus_message_ref_queued(t, bus);
sd_bus_message_unref(t);
}
return 1;
}

View File

@ -146,13 +146,13 @@ static void bus_reset_queues(sd_bus *b) {
assert(b);
while (b->rqueue_size > 0)
sd_bus_message_unref(b->rqueue[--b->rqueue_size]);
bus_message_unref_queued(b->rqueue[--b->rqueue_size], b);
b->rqueue = mfree(b->rqueue);
b->rqueue_allocated = 0;
while (b->wqueue_size > 0)
sd_bus_message_unref(b->wqueue[--b->wqueue_size]);
bus_message_unref_queued(b->wqueue[--b->wqueue_size], b);
b->wqueue = mfree(b->wqueue);
b->wqueue_allocated = 0;
@ -493,7 +493,7 @@ static int synthesize_connected_signal(sd_bus *bus) {
/* Insert at the very front */
memmove(bus->rqueue + 1, bus->rqueue, sizeof(sd_bus_message*) * bus->rqueue_size);
bus->rqueue[0] = TAKE_PTR(m);
bus->rqueue[0] = bus_message_ref_queued(m, bus);
bus->rqueue_size++;
return 0;
@ -1811,7 +1811,7 @@ static int dispatch_wqueue(sd_bus *bus) {
* anyway. */
bus->wqueue_size--;
sd_bus_message_unref(bus->wqueue[0]);
bus_message_unref_queued(bus->wqueue[0], bus);
memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
bus->windex = 0;
@ -1840,6 +1840,15 @@ int bus_rqueue_make_room(sd_bus *bus) {
return 0;
}
static void rqueue_drop_one(sd_bus *bus, size_t i) {
assert(bus);
assert(i < bus->rqueue_size);
bus_message_unref_queued(bus->rqueue[i], bus);
memmove(bus->rqueue + i, bus->rqueue + i + 1, sizeof(sd_bus_message*) * (bus->rqueue_size - i - 1));
bus->rqueue_size--;
}
static int dispatch_rqueue(sd_bus *bus, bool hint_priority, int64_t priority, sd_bus_message **m) {
int r, ret = 0;
@ -1854,10 +1863,8 @@ static int dispatch_rqueue(sd_bus *bus, bool hint_priority, int64_t priority, sd
for (;;) {
if (bus->rqueue_size > 0) {
/* Dispatch a queued message */
*m = bus->rqueue[0];
bus->rqueue_size--;
memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
*m = sd_bus_message_ref(bus->rqueue[0]);
rqueue_drop_one(bus, 0);
return 1;
}
@ -1935,7 +1942,7 @@ _public_ int sd_bus_send(sd_bus *bus, sd_bus_message *_m, uint64_t *cookie) {
* of the wqueue array is always allocated so
* that we always can remember how much was
* written. */
bus->wqueue[0] = sd_bus_message_ref(m);
bus->wqueue[0] = bus_message_ref_queued(m, bus);
bus->wqueue_size = 1;
bus->windex = idx;
}
@ -1949,7 +1956,7 @@ _public_ int sd_bus_send(sd_bus *bus, sd_bus_message *_m, uint64_t *cookie) {
if (!GREEDY_REALLOC(bus->wqueue, bus->wqueue_allocated, bus->wqueue_size + 1))
return -ENOMEM;
bus->wqueue[bus->wqueue_size++] = sd_bus_message_ref(m);
bus->wqueue[bus->wqueue_size++] = bus_message_ref_queued(m, bus);
}
finish:
@ -2169,37 +2176,30 @@ _public_ int sd_bus_call(
usec_t left;
while (i < bus->rqueue_size) {
sd_bus_message *incoming = NULL;
_cleanup_(sd_bus_message_unrefp) sd_bus_message *incoming = NULL;
incoming = bus->rqueue[i];
incoming = sd_bus_message_ref(bus->rqueue[i]);
if (incoming->reply_cookie == cookie) {
/* Found a match! */
memmove(bus->rqueue + i, bus->rqueue + i + 1, sizeof(sd_bus_message*) * (bus->rqueue_size - i - 1));
bus->rqueue_size--;
rqueue_drop_one(bus, i);
log_debug_bus_message(incoming);
if (incoming->header->type == SD_BUS_MESSAGE_METHOD_RETURN) {
if (incoming->n_fds <= 0 || bus->accept_fd) {
if (reply)
*reply = incoming;
else
sd_bus_message_unref(incoming);
*reply = TAKE_PTR(incoming);
return 1;
}
r = sd_bus_error_setf(error, SD_BUS_ERROR_INCONSISTENT_MESSAGE, "Reply message contained file descriptors which I couldn't accept. Sorry.");
sd_bus_message_unref(incoming);
return r;
return sd_bus_error_setf(error, SD_BUS_ERROR_INCONSISTENT_MESSAGE, "Reply message contained file descriptors which I couldn't accept. Sorry.");
} else if (incoming->header->type == SD_BUS_MESSAGE_METHOD_ERROR) {
r = sd_bus_error_copy(error, &incoming->error);
sd_bus_message_unref(incoming);
return r;
} else {
} else if (incoming->header->type == SD_BUS_MESSAGE_METHOD_ERROR)
return sd_bus_error_copy(error, &incoming->error);
else {
r = -EIO;
goto fail;
}
@ -2209,15 +2209,11 @@ _public_ int sd_bus_call(
incoming->sender &&
streq(bus->unique_name, incoming->sender)) {
memmove(bus->rqueue + i, bus->rqueue + i + 1, sizeof(sd_bus_message*) * (bus->rqueue_size - i - 1));
bus->rqueue_size--;
rqueue_drop_one(bus, i);
/* Our own message? Somebody is trying
* to send its own client a message,
* let's not dead-lock, let's fail
* immediately. */
/* Our own message? Somebody is trying to send its own client a message,
* let's not dead-lock, let's fail immediately. */
sd_bus_message_unref(incoming);
r = -ELOOP;
goto fail;
}