varlink: fix support for more/continues method calls
This commit is contained in:
parent
bdf2357c12
commit
45a6c96598
|
@ -29,6 +29,7 @@ typedef enum VarlinkState {
|
||||||
/* Client side states */
|
/* Client side states */
|
||||||
VARLINK_IDLE_CLIENT,
|
VARLINK_IDLE_CLIENT,
|
||||||
VARLINK_AWAITING_REPLY,
|
VARLINK_AWAITING_REPLY,
|
||||||
|
VARLINK_AWAITING_REPLY_MORE,
|
||||||
VARLINK_CALLING,
|
VARLINK_CALLING,
|
||||||
VARLINK_CALLED,
|
VARLINK_CALLED,
|
||||||
VARLINK_PROCESSING_REPLY,
|
VARLINK_PROCESSING_REPLY,
|
||||||
|
@ -39,7 +40,6 @@ typedef enum VarlinkState {
|
||||||
VARLINK_PROCESSING_METHOD_MORE,
|
VARLINK_PROCESSING_METHOD_MORE,
|
||||||
VARLINK_PROCESSING_METHOD_ONEWAY,
|
VARLINK_PROCESSING_METHOD_ONEWAY,
|
||||||
VARLINK_PROCESSED_METHOD,
|
VARLINK_PROCESSED_METHOD,
|
||||||
VARLINK_PROCESSED_METHOD_MORE,
|
|
||||||
VARLINK_PENDING_METHOD,
|
VARLINK_PENDING_METHOD,
|
||||||
VARLINK_PENDING_METHOD_MORE,
|
VARLINK_PENDING_METHOD_MORE,
|
||||||
|
|
||||||
|
@ -63,6 +63,7 @@ typedef enum VarlinkState {
|
||||||
IN_SET(state, \
|
IN_SET(state, \
|
||||||
VARLINK_IDLE_CLIENT, \
|
VARLINK_IDLE_CLIENT, \
|
||||||
VARLINK_AWAITING_REPLY, \
|
VARLINK_AWAITING_REPLY, \
|
||||||
|
VARLINK_AWAITING_REPLY_MORE, \
|
||||||
VARLINK_CALLING, \
|
VARLINK_CALLING, \
|
||||||
VARLINK_CALLED, \
|
VARLINK_CALLED, \
|
||||||
VARLINK_PROCESSING_REPLY, \
|
VARLINK_PROCESSING_REPLY, \
|
||||||
|
@ -71,7 +72,6 @@ typedef enum VarlinkState {
|
||||||
VARLINK_PROCESSING_METHOD_MORE, \
|
VARLINK_PROCESSING_METHOD_MORE, \
|
||||||
VARLINK_PROCESSING_METHOD_ONEWAY, \
|
VARLINK_PROCESSING_METHOD_ONEWAY, \
|
||||||
VARLINK_PROCESSED_METHOD, \
|
VARLINK_PROCESSED_METHOD, \
|
||||||
VARLINK_PROCESSED_METHOD_MORE, \
|
|
||||||
VARLINK_PENDING_METHOD, \
|
VARLINK_PENDING_METHOD, \
|
||||||
VARLINK_PENDING_METHOD_MORE)
|
VARLINK_PENDING_METHOD_MORE)
|
||||||
|
|
||||||
|
@ -185,6 +185,7 @@ struct VarlinkServer {
|
||||||
static const char* const varlink_state_table[_VARLINK_STATE_MAX] = {
|
static const char* const varlink_state_table[_VARLINK_STATE_MAX] = {
|
||||||
[VARLINK_IDLE_CLIENT] = "idle-client",
|
[VARLINK_IDLE_CLIENT] = "idle-client",
|
||||||
[VARLINK_AWAITING_REPLY] = "awaiting-reply",
|
[VARLINK_AWAITING_REPLY] = "awaiting-reply",
|
||||||
|
[VARLINK_AWAITING_REPLY_MORE] = "awaiting-reply-more",
|
||||||
[VARLINK_CALLING] = "calling",
|
[VARLINK_CALLING] = "calling",
|
||||||
[VARLINK_CALLED] = "called",
|
[VARLINK_CALLED] = "called",
|
||||||
[VARLINK_PROCESSING_REPLY] = "processing-reply",
|
[VARLINK_PROCESSING_REPLY] = "processing-reply",
|
||||||
|
@ -193,7 +194,6 @@ static const char* const varlink_state_table[_VARLINK_STATE_MAX] = {
|
||||||
[VARLINK_PROCESSING_METHOD_MORE] = "processing-method-more",
|
[VARLINK_PROCESSING_METHOD_MORE] = "processing-method-more",
|
||||||
[VARLINK_PROCESSING_METHOD_ONEWAY] = "processing-method-oneway",
|
[VARLINK_PROCESSING_METHOD_ONEWAY] = "processing-method-oneway",
|
||||||
[VARLINK_PROCESSED_METHOD] = "processed-method",
|
[VARLINK_PROCESSED_METHOD] = "processed-method",
|
||||||
[VARLINK_PROCESSED_METHOD_MORE] = "processed-method-more",
|
|
||||||
[VARLINK_PENDING_METHOD] = "pending-method",
|
[VARLINK_PENDING_METHOD] = "pending-method",
|
||||||
[VARLINK_PENDING_METHOD_MORE] = "pending-method-more",
|
[VARLINK_PENDING_METHOD_MORE] = "pending-method-more",
|
||||||
[VARLINK_PENDING_DISCONNECT] = "pending-disconnect",
|
[VARLINK_PENDING_DISCONNECT] = "pending-disconnect",
|
||||||
|
@ -405,7 +405,7 @@ static int varlink_test_disconnect(Varlink *v) {
|
||||||
goto disconnect;
|
goto disconnect;
|
||||||
|
|
||||||
/* If we are waiting for incoming data but the read side is shut down, disconnect. */
|
/* If we are waiting for incoming data but the read side is shut down, disconnect. */
|
||||||
if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING, VARLINK_IDLE_SERVER) && v->read_disconnected)
|
if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_IDLE_SERVER) && v->read_disconnected)
|
||||||
goto disconnect;
|
goto disconnect;
|
||||||
|
|
||||||
/* Similar, if are a client that hasn't written anything yet but the write side is dead, also
|
/* Similar, if are a client that hasn't written anything yet but the write side is dead, also
|
||||||
|
@ -478,7 +478,7 @@ static int varlink_read(Varlink *v) {
|
||||||
|
|
||||||
assert(v);
|
assert(v);
|
||||||
|
|
||||||
if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING, VARLINK_IDLE_SERVER))
|
if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_IDLE_SERVER))
|
||||||
return 0;
|
return 0;
|
||||||
if (v->connecting) /* read() on a socket while we are in connect() will fail with EINVAL, hence exit early here */
|
if (v->connecting) /* read() on a socket while we are in connect() will fail with EINVAL, hence exit early here */
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -596,7 +596,7 @@ static int varlink_parse_message(Varlink *v) {
|
||||||
static int varlink_test_timeout(Varlink *v) {
|
static int varlink_test_timeout(Varlink *v) {
|
||||||
assert(v);
|
assert(v);
|
||||||
|
|
||||||
if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING))
|
if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING))
|
||||||
return 0;
|
return 0;
|
||||||
if (v->timeout == USEC_INFINITY)
|
if (v->timeout == USEC_INFINITY)
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -673,7 +673,7 @@ static int varlink_dispatch_reply(Varlink *v) {
|
||||||
|
|
||||||
assert(v);
|
assert(v);
|
||||||
|
|
||||||
if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING))
|
if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING))
|
||||||
return 0;
|
return 0;
|
||||||
if (!v->current)
|
if (!v->current)
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -715,6 +715,11 @@ static int varlink_dispatch_reply(Varlink *v) {
|
||||||
goto invalid;
|
goto invalid;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Replies with 'continue' set are only OK if we set 'more' when the method call was initiated */
|
||||||
|
if (v->state != VARLINK_AWAITING_REPLY_MORE && FLAGS_SET(flags, VARLINK_REPLY_CONTINUES))
|
||||||
|
goto invalid;
|
||||||
|
|
||||||
|
/* An error is final */
|
||||||
if (error && FLAGS_SET(flags, VARLINK_REPLY_CONTINUES))
|
if (error && FLAGS_SET(flags, VARLINK_REPLY_CONTINUES))
|
||||||
goto invalid;
|
goto invalid;
|
||||||
|
|
||||||
|
@ -722,7 +727,7 @@ static int varlink_dispatch_reply(Varlink *v) {
|
||||||
if (r < 0)
|
if (r < 0)
|
||||||
goto invalid;
|
goto invalid;
|
||||||
|
|
||||||
if (v->state == VARLINK_AWAITING_REPLY) {
|
if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE)) {
|
||||||
varlink_set_state(v, VARLINK_PROCESSING_REPLY);
|
varlink_set_state(v, VARLINK_PROCESSING_REPLY);
|
||||||
|
|
||||||
if (v->reply_callback) {
|
if (v->reply_callback) {
|
||||||
|
@ -734,17 +739,18 @@ static int varlink_dispatch_reply(Varlink *v) {
|
||||||
v->current = json_variant_unref(v->current);
|
v->current = json_variant_unref(v->current);
|
||||||
|
|
||||||
if (v->state == VARLINK_PROCESSING_REPLY) {
|
if (v->state == VARLINK_PROCESSING_REPLY) {
|
||||||
assert(v->n_pending > 0);
|
|
||||||
v->n_pending--;
|
|
||||||
|
|
||||||
varlink_set_state(v, v->n_pending == 0 ? VARLINK_IDLE_CLIENT : VARLINK_AWAITING_REPLY);
|
assert(v->n_pending > 0);
|
||||||
|
|
||||||
|
if (!FLAGS_SET(flags, VARLINK_REPLY_CONTINUES))
|
||||||
|
v->n_pending--;
|
||||||
|
|
||||||
|
varlink_set_state(v,
|
||||||
|
FLAGS_SET(flags, VARLINK_REPLY_CONTINUES) ? VARLINK_AWAITING_REPLY_MORE :
|
||||||
|
v->n_pending == 0 ? VARLINK_IDLE_CLIENT : VARLINK_AWAITING_REPLY);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
assert(v->state == VARLINK_CALLING);
|
assert(v->state == VARLINK_CALLING);
|
||||||
|
|
||||||
if (FLAGS_SET(flags, VARLINK_REPLY_CONTINUES))
|
|
||||||
goto invalid;
|
|
||||||
|
|
||||||
varlink_set_state(v, VARLINK_CALLED);
|
varlink_set_state(v, VARLINK_CALLED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -878,7 +884,6 @@ static int varlink_dispatch_method(Varlink *v) {
|
||||||
varlink_set_state(v, VARLINK_PENDING_METHOD);
|
varlink_set_state(v, VARLINK_PENDING_METHOD);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case VARLINK_PROCESSED_METHOD_MORE: /* One reply for a "more" message was sent, more to come */
|
|
||||||
case VARLINK_PROCESSING_METHOD_MORE: /* No reply for a "more" message was sent, more to come */
|
case VARLINK_PROCESSING_METHOD_MORE: /* No reply for a "more" message was sent, more to come */
|
||||||
varlink_set_state(v, VARLINK_PENDING_METHOD_MORE);
|
varlink_set_state(v, VARLINK_PENDING_METHOD_MORE);
|
||||||
break;
|
break;
|
||||||
|
@ -1073,7 +1078,7 @@ int varlink_get_events(Varlink *v) {
|
||||||
return EPOLLOUT;
|
return EPOLLOUT;
|
||||||
|
|
||||||
if (!v->read_disconnected &&
|
if (!v->read_disconnected &&
|
||||||
IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING, VARLINK_IDLE_SERVER) &&
|
IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_IDLE_SERVER) &&
|
||||||
!v->current &&
|
!v->current &&
|
||||||
v->input_buffer_unscanned <= 0)
|
v->input_buffer_unscanned <= 0)
|
||||||
ret |= EPOLLIN;
|
ret |= EPOLLIN;
|
||||||
|
@ -1091,7 +1096,7 @@ int varlink_get_timeout(Varlink *v, usec_t *ret) {
|
||||||
if (v->state == VARLINK_DISCONNECTED)
|
if (v->state == VARLINK_DISCONNECTED)
|
||||||
return -ENOTCONN;
|
return -ENOTCONN;
|
||||||
|
|
||||||
if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING) &&
|
if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING) &&
|
||||||
v->timeout != USEC_INFINITY) {
|
v->timeout != USEC_INFINITY) {
|
||||||
if (ret)
|
if (ret)
|
||||||
*ret = usec_add(v->timestamp, v->timeout);
|
*ret = usec_add(v->timestamp, v->timeout);
|
||||||
|
@ -1259,6 +1264,8 @@ int varlink_send(Varlink *v, const char *method, JsonVariant *parameters) {
|
||||||
|
|
||||||
if (v->state == VARLINK_DISCONNECTED)
|
if (v->state == VARLINK_DISCONNECTED)
|
||||||
return -ENOTCONN;
|
return -ENOTCONN;
|
||||||
|
|
||||||
|
/* We allow enqueuing multiple method calls at once! */
|
||||||
if (!IN_SET(v->state, VARLINK_IDLE_CLIENT, VARLINK_AWAITING_REPLY))
|
if (!IN_SET(v->state, VARLINK_IDLE_CLIENT, VARLINK_AWAITING_REPLY))
|
||||||
return -EBUSY;
|
return -EBUSY;
|
||||||
|
|
||||||
|
@ -1308,6 +1315,8 @@ int varlink_invoke(Varlink *v, const char *method, JsonVariant *parameters) {
|
||||||
|
|
||||||
if (v->state == VARLINK_DISCONNECTED)
|
if (v->state == VARLINK_DISCONNECTED)
|
||||||
return -ENOTCONN;
|
return -ENOTCONN;
|
||||||
|
|
||||||
|
/* We allow enqueing multiple method calls at once! */
|
||||||
if (!IN_SET(v->state, VARLINK_IDLE_CLIENT, VARLINK_AWAITING_REPLY))
|
if (!IN_SET(v->state, VARLINK_IDLE_CLIENT, VARLINK_AWAITING_REPLY))
|
||||||
return -EBUSY;
|
return -EBUSY;
|
||||||
|
|
||||||
|
@ -1349,6 +1358,60 @@ int varlink_invokeb(Varlink *v, const char *method, ...) {
|
||||||
return varlink_invoke(v, method, parameters);
|
return varlink_invoke(v, method, parameters);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int varlink_observe(Varlink *v, const char *method, JsonVariant *parameters) {
|
||||||
|
_cleanup_(json_variant_unrefp) JsonVariant *m = NULL;
|
||||||
|
int r;
|
||||||
|
|
||||||
|
assert_return(v, -EINVAL);
|
||||||
|
assert_return(method, -EINVAL);
|
||||||
|
|
||||||
|
if (v->state == VARLINK_DISCONNECTED)
|
||||||
|
return -ENOTCONN;
|
||||||
|
/* Note that we don't allow enqueuing multiple method calls when we are in more/continues mode! We
|
||||||
|
* thus insist on an idle client here. */
|
||||||
|
if (v->state != VARLINK_IDLE_CLIENT)
|
||||||
|
return -EBUSY;
|
||||||
|
|
||||||
|
r = varlink_sanitize_parameters(¶meters);
|
||||||
|
if (r < 0)
|
||||||
|
return r;
|
||||||
|
|
||||||
|
r = json_build(&m, JSON_BUILD_OBJECT(
|
||||||
|
JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method)),
|
||||||
|
JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters)),
|
||||||
|
JSON_BUILD_PAIR("more", JSON_BUILD_BOOLEAN(true))));
|
||||||
|
if (r < 0)
|
||||||
|
return r;
|
||||||
|
|
||||||
|
r = varlink_enqueue_json(v, m);
|
||||||
|
if (r < 0)
|
||||||
|
return r;
|
||||||
|
|
||||||
|
|
||||||
|
varlink_set_state(v, VARLINK_AWAITING_REPLY_MORE);
|
||||||
|
v->n_pending++;
|
||||||
|
v->timestamp = now(CLOCK_MONOTONIC);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int varlink_observeb(Varlink *v, const char *method, ...) {
|
||||||
|
_cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL;
|
||||||
|
va_list ap;
|
||||||
|
int r;
|
||||||
|
|
||||||
|
assert_return(v, -EINVAL);
|
||||||
|
|
||||||
|
va_start(ap, method);
|
||||||
|
r = json_buildv(¶meters, ap);
|
||||||
|
va_end(ap);
|
||||||
|
|
||||||
|
if (r < 0)
|
||||||
|
return r;
|
||||||
|
|
||||||
|
return varlink_observe(v, method, parameters);
|
||||||
|
}
|
||||||
|
|
||||||
int varlink_call(
|
int varlink_call(
|
||||||
Varlink *v,
|
Varlink *v,
|
||||||
const char *method,
|
const char *method,
|
||||||
|
|
|
@ -86,6 +86,10 @@ int varlink_callb(Varlink *v, const char *method, JsonVariant **ret_parameters,
|
||||||
int varlink_invoke(Varlink *v, const char *method, JsonVariant *parameters);
|
int varlink_invoke(Varlink *v, const char *method, JsonVariant *parameters);
|
||||||
int varlink_invokeb(Varlink *v, const char *method, ...);
|
int varlink_invokeb(Varlink *v, const char *method, ...);
|
||||||
|
|
||||||
|
/* Enqueue method call, expect a reply now, and possibly more later, which are all delivered to the reply callback */
|
||||||
|
int varlink_observe(Varlink *v, const char *method, JsonVariant *parameters);
|
||||||
|
int varlink_observeb(Varlink *v, const char *method, ...);
|
||||||
|
|
||||||
/* Enqueue a final reply */
|
/* Enqueue a final reply */
|
||||||
int varlink_reply(Varlink *v, JsonVariant *parameters);
|
int varlink_reply(Varlink *v, JsonVariant *parameters);
|
||||||
int varlink_replyb(Varlink *v, ...);
|
int varlink_replyb(Varlink *v, ...);
|
||||||
|
|
Loading…
Reference in a new issue