bus: add sd_bus_process_priority() to support prioq mode of kdbus
This commit is contained in:
parent
ca7b42c816
commit
766c580959
|
@ -46,6 +46,7 @@ global:
|
|||
sd_bus_get_events;
|
||||
sd_bus_get_timeout;
|
||||
sd_bus_process;
|
||||
sd_bus_process_priority;
|
||||
sd_bus_wait;
|
||||
sd_bus_flush;
|
||||
sd_bus_get_current;
|
||||
|
|
|
@ -1039,7 +1039,7 @@ static int bus_kernel_translate_message(sd_bus *bus, struct kdbus_msg *k) {
|
|||
return translate[found->type - _KDBUS_ITEM_KERNEL_BASE](bus, k, found);
|
||||
}
|
||||
|
||||
int bus_kernel_read_message(sd_bus *bus) {
|
||||
int bus_kernel_read_message(sd_bus *bus, bool hint_priority, int64_t priority) {
|
||||
struct kdbus_cmd_recv recv = {};
|
||||
struct kdbus_msg *k;
|
||||
int r;
|
||||
|
@ -1050,6 +1050,11 @@ int bus_kernel_read_message(sd_bus *bus) {
|
|||
if (r < 0)
|
||||
return r;
|
||||
|
||||
if (hint_priority) {
|
||||
recv.flags |= KDBUS_RECV_USE_PRIORITY;
|
||||
recv.priority = priority;
|
||||
}
|
||||
|
||||
r = ioctl(bus->input_fd, KDBUS_CMD_MSG_RECV, &recv);
|
||||
if (r < 0) {
|
||||
if (errno == EAGAIN)
|
||||
|
|
|
@ -61,7 +61,7 @@ int bus_kernel_connect(sd_bus *b);
|
|||
int bus_kernel_take_fd(sd_bus *b);
|
||||
|
||||
int bus_kernel_write_message(sd_bus *bus, sd_bus_message *m, bool hint_sync_call);
|
||||
int bus_kernel_read_message(sd_bus *bus);
|
||||
int bus_kernel_read_message(sd_bus *bus, bool hint_priority, int64_t priority);
|
||||
|
||||
int bus_kernel_create_bus(const char *name, bool world, char **s);
|
||||
int bus_kernel_create_namespace(const char *name, char **s);
|
||||
|
|
|
@ -1458,11 +1458,11 @@ static int dispatch_wqueue(sd_bus *bus) {
|
|||
return ret;
|
||||
}
|
||||
|
||||
static int bus_read_message(sd_bus *bus) {
|
||||
static int bus_read_message(sd_bus *bus, bool hint_priority, int64_t priority) {
|
||||
assert(bus);
|
||||
|
||||
if (bus->is_kernel)
|
||||
return bus_kernel_read_message(bus);
|
||||
return bus_kernel_read_message(bus, hint_priority, priority);
|
||||
else
|
||||
return bus_socket_read_message(bus);
|
||||
}
|
||||
|
@ -1479,13 +1479,17 @@ int bus_rqueue_make_room(sd_bus *bus) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
|
||||
static int dispatch_rqueue(sd_bus *bus, bool hint_priority, int64_t priority, sd_bus_message **m) {
|
||||
int r, ret = 0;
|
||||
|
||||
assert(bus);
|
||||
assert(m);
|
||||
assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
|
||||
|
||||
/* Note that the priority logic is only available on kdbus,
|
||||
* where the rqueue is unused. We check the rqueue here
|
||||
* anyway, because it's simple... */
|
||||
|
||||
for (;;) {
|
||||
if (bus->rqueue_size > 0) {
|
||||
/* Dispatch a queued message */
|
||||
|
@ -1497,7 +1501,7 @@ static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
|
|||
}
|
||||
|
||||
/* Try to read a new message */
|
||||
r = bus_read_message(bus);
|
||||
r = bus_read_message(bus, hint_priority, priority);
|
||||
if (r < 0)
|
||||
return r;
|
||||
if (r == 0)
|
||||
|
@ -1837,7 +1841,7 @@ _public_ int sd_bus_call(
|
|||
i++;
|
||||
}
|
||||
|
||||
r = bus_read_message(bus);
|
||||
r = bus_read_message(bus, false, 0);
|
||||
if (r < 0) {
|
||||
if (r == -ENOTCONN || r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
|
||||
bus_enter_closing(bus);
|
||||
|
@ -2203,7 +2207,7 @@ finish:
|
|||
return r;
|
||||
}
|
||||
|
||||
static int process_running(sd_bus *bus, sd_bus_message **ret) {
|
||||
static int process_running(sd_bus *bus, bool hint_priority, int64_t priority, sd_bus_message **ret) {
|
||||
_cleanup_bus_message_unref_ sd_bus_message *m = NULL;
|
||||
int r;
|
||||
|
||||
|
@ -2218,7 +2222,7 @@ static int process_running(sd_bus *bus, sd_bus_message **ret) {
|
|||
if (r != 0)
|
||||
goto null_message;
|
||||
|
||||
r = dispatch_rqueue(bus, &m);
|
||||
r = dispatch_rqueue(bus, hint_priority, priority, &m);
|
||||
if (r < 0)
|
||||
return r;
|
||||
if (!m)
|
||||
|
@ -2344,7 +2348,7 @@ finish:
|
|||
return r;
|
||||
}
|
||||
|
||||
_public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
|
||||
static int bus_process_internal(sd_bus *bus, bool hint_priority, int64_t priority, sd_bus_message **ret) {
|
||||
BUS_DONT_DESTROY(bus);
|
||||
int r;
|
||||
|
||||
|
@ -2393,7 +2397,7 @@ _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
|
|||
|
||||
case BUS_RUNNING:
|
||||
case BUS_HELLO:
|
||||
r = process_running(bus, ret);
|
||||
r = process_running(bus, hint_priority, priority, ret);
|
||||
if (r == -ENOTCONN || r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
|
||||
bus_enter_closing(bus);
|
||||
r = 1;
|
||||
|
@ -2411,6 +2415,14 @@ _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
|
|||
assert_not_reached("Unknown state");
|
||||
}
|
||||
|
||||
_public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
|
||||
return bus_process_internal(bus, false, 0, ret);
|
||||
}
|
||||
|
||||
_public_ int sd_bus_process_priority(sd_bus *bus, int64_t priority, sd_bus_message **ret) {
|
||||
return bus_process_internal(bus, true, priority, ret);
|
||||
}
|
||||
|
||||
static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
|
||||
struct pollfd p[2] = {};
|
||||
int r, e, n;
|
||||
|
|
|
@ -110,6 +110,9 @@ int main(int argc, char *argv[]) {
|
|||
r = sd_bus_try_close(b);
|
||||
assert_se(r == -EBUSY);
|
||||
|
||||
r = sd_bus_process_priority(b, -10, &m);
|
||||
assert_se(r == -ENOMSG);
|
||||
|
||||
r = sd_bus_process(b, &m);
|
||||
assert_se(r > 0);
|
||||
assert_se(m);
|
||||
|
|
|
@ -139,6 +139,7 @@ int sd_bus_get_fd(sd_bus *bus);
|
|||
int sd_bus_get_events(sd_bus *bus);
|
||||
int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec);
|
||||
int sd_bus_process(sd_bus *bus, sd_bus_message **r);
|
||||
int sd_bus_process_priority(sd_bus *bus, int64_t max_priority, sd_bus_message **r);
|
||||
int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec);
|
||||
int sd_bus_flush(sd_bus *bus);
|
||||
|
||||
|
|
Loading…
Reference in New Issue