Merge pull request #17703 from poettering/event-ratelimit

sd-event: add a concept of ratelimiting
This commit is contained in:
Zbigniew Jędrzejewski-Szmek 2020-12-01 21:47:43 +01:00 committed by GitHub
commit f319b2b1b0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 694 additions and 109 deletions

View File

@ -582,6 +582,10 @@ manpages = [
'SD_EVENT_PRIORITY_NORMAL',
'sd_event_source_get_priority'],
''],
['sd_event_source_set_ratelimit',
'3',
['sd_event_source_get_ratelimit', 'sd_event_source_is_ratelimited'],
''],
['sd_event_source_set_userdata', '3', ['sd_event_source_get_userdata'], ''],
['sd_event_source_unref',
'3',

View File

@ -56,6 +56,7 @@
<citerefentry><refentrytitle>sd_event_source_get_pending</refentrytitle><manvolnum>3</manvolnum></citerefentry>,
<citerefentry><refentrytitle>sd_event_source_set_description</refentrytitle><manvolnum>3</manvolnum></citerefentry>,
<citerefentry><refentrytitle>sd_event_source_set_prepare</refentrytitle><manvolnum>3</manvolnum></citerefentry>,
<citerefentry><refentrytitle>sd_event_source_set_ratelimit</refentrytitle><manvolnum>3</manvolnum></citerefentry>,
<citerefentry><refentrytitle>sd_event_wait</refentrytitle><manvolnum>3</manvolnum></citerefentry>,
<citerefentry><refentrytitle>sd_event_get_fd</refentrytitle><manvolnum>3</manvolnum></citerefentry>,
<citerefentry><refentrytitle>sd_event_set_watchdog</refentrytitle><manvolnum>3</manvolnum></citerefentry>,
@ -147,6 +148,7 @@
<citerefentry><refentrytitle>sd_event_source_get_pending</refentrytitle><manvolnum>3</manvolnum></citerefentry>,
<citerefentry><refentrytitle>sd_event_source_set_description</refentrytitle><manvolnum>3</manvolnum></citerefentry>,
<citerefentry><refentrytitle>sd_event_source_set_prepare</refentrytitle><manvolnum>3</manvolnum></citerefentry>,
<citerefentry><refentrytitle>sd_event_source_set_ratelimit</refentrytitle><manvolnum>3</manvolnum></citerefentry>,
<citerefentry><refentrytitle>sd_event_wait</refentrytitle><manvolnum>3</manvolnum></citerefentry>,
<citerefentry><refentrytitle>sd_event_get_fd</refentrytitle><manvolnum>3</manvolnum></citerefentry>,
<citerefentry><refentrytitle>sd_event_set_watchdog</refentrytitle><manvolnum>3</manvolnum></citerefentry>,

View File

@ -147,7 +147,8 @@
<citerefentry><refentrytitle>sd_event_add_child</refentrytitle><manvolnum>3</manvolnum></citerefentry>,
<citerefentry><refentrytitle>sd_event_add_inotify</refentrytitle><manvolnum>3</manvolnum></citerefentry>,
<citerefentry><refentrytitle>sd_event_add_defer</refentrytitle><manvolnum>3</manvolnum></citerefentry>,
<citerefentry><refentrytitle>sd_event_source_unref</refentrytitle><manvolnum>3</manvolnum></citerefentry>
<citerefentry><refentrytitle>sd_event_source_unref</refentrytitle><manvolnum>3</manvolnum></citerefentry>,
<citerefentry><refentrytitle>sd_event_source_set_ratelimit</refentrytitle><manvolnum>3</manvolnum></citerefentry>
</para>
</refsect1>

View File

@ -0,0 +1,148 @@
<?xml version='1.0'?>
<!DOCTYPE refentry PUBLIC "-//OASIS//DTD DocBook XML V4.5//EN"
"http://www.oasis-open.org/docbook/xml/4.2/docbookx.dtd">
<!-- SPDX-License-Identifier: LGPL-2.1-or-later -->
<refentry id="sd_event_source_set_ratelimit" xmlns:xi="http://www.w3.org/2001/XInclude">
<refentryinfo>
<title>sd_event_source_set_ratelimit</title>
<productname>systemd</productname>
</refentryinfo>
<refmeta>
<refentrytitle>sd_event_source_set_ratelimit</refentrytitle>
<manvolnum>3</manvolnum>
</refmeta>
<refnamediv>
<refname>sd_event_source_set_ratelimit</refname>
<refname>sd_event_source_get_ratelimit</refname>
<refname>sd_event_source_is_ratelimited</refname>
<refpurpose>Configure rate limiting on event sources</refpurpose>
</refnamediv>
<refsynopsisdiv>
<funcsynopsis>
<funcsynopsisinfo>#include &lt;systemd/sd-event.h&gt;</funcsynopsisinfo>
<funcprototype>
<funcdef>int <function>sd_event_source_set_ratelimit</function></funcdef>
<paramdef>sd_event_source *<parameter>source</parameter></paramdef>
<paramdef>uint64_t <parameter>interval_usec</parameter></paramdef>
<paramdef>unsigned <parameter>burst</parameter></paramdef>
</funcprototype>
<funcprototype>
<funcdef>int <function>sd_event_source_get_ratelimit</function></funcdef>
<paramdef>sd_event_source *<parameter>source</parameter></paramdef>
<paramdef>uint64_t* <parameter>ret_interval_usec</parameter></paramdef>
<paramdef>unsigned* <parameter>ret_burst</parameter></paramdef>
</funcprototype>
<funcprototype>
<funcdef>int <function>sd_event_source_is_ratelimited</function></funcdef>
<paramdef>sd_event_source *<parameter>source</parameter></paramdef>
</funcprototype>
</funcsynopsis>
</refsynopsisdiv>
<refsect1>
<title>Description</title>
<para><function>sd_event_source_set_ratelimit()</function> may be used to enforce rate limiting on an
event source. When used an event source will be temporarily turned off when it fires more often then a
specified burst number within a specified time interval. This is useful as simple mechanism to avoid
event source starvation if high priority event sources fire very frequently.</para>
<para>Pass the event source to operate on as first argument, a time interval in microseconds as second
argument and a maximum dispatch limit ("burst") as third parameter. Whenever the event source is
dispatched more often than the specified burst within the specified interval it is placed in a mode
similar to being disabled with
<citerefentry><refentrytitle>sd_event_source_set_enabled</refentrytitle><manvolnum>3</manvolnum></citerefentry>
and the <constant>SD_EVENT_OFF</constant> parameter. However it is disabled only temporarily once the
specified interval is over regular operation resumes. It is again disabled temporarily once the specified rate
limiting is hit the next time. If either the interval or the burst value are specified as zero, rate
limiting is turned off. By default event sources do not have rate limiting enabled. Note that rate
limiting and disabling via <function>sd_event_source_set_enabled()</function> are independent of each
other, and an event source will only effect event loop wake-ups and is dispatched while it both is
enabled and rate limiting is not in effect.</para>
<para><function>sd_event_source_get_ratelimit()</function> may be used to query the current rate limiting
parameters set on the event source object <parameter>source</parameter>. The previously set interval and
burst vales are returned in the second and third argument.</para>
<para><function>sd_event_source_is_ratelimited()</function> may be used to query whether the event source
is currently affected by rate limiting, i.e. it has recently hit the rate limit and is currently
temporarily disabled due to that.</para>
<para>Rate limiting is currently implemented for I/O, timer, signal, defer and inotify event
sources.</para>
</refsect1>
<refsect1>
<title>Return Value</title>
<para>On success, <function>sd_event_source_set_ratelimit()</function> and
<function>sd_event_source_get_ratelimit()</function> return a non-negative integer. On failure, they
return a negative errno-style error code. <function>sd_event_source_is_ratelimited</function> returns
zero if rate limiting is currently not in effect and greater than zero if it is in effect; it returns a
negative errno-style error code on failure.</para>
<refsect2>
<title>Errors</title>
<para>Returned errors may indicate the following problems:</para>
<variablelist>
<varlistentry>
<term><constant>-EINVAL</constant></term>
<listitem><para><parameter>source</parameter> is not a valid pointer to an
<structname>sd_event_source</structname> object.
</para></listitem>
</varlistentry>
<varlistentry>
<term><constant>-ECHILD</constant></term>
<listitem><para>The event loop has been created in a different process.</para></listitem>
</varlistentry>
<varlistentry>
<term><constant>-EDOM</constant></term>
<listitem><para>It was attempted to use the rate limiting feature on an event source type that does
not support rate limiting.</para></listitem>
</varlistentry>
<varlistentry>
<term><constant>-ENOEXEC</constant></term>
<listitem><para><function>sd_event_source_get_ratelimit()</function> was called on a event source
that doesn't have rate limiting configured.</para></listitem>
</varlistentry>
</variablelist>
</refsect2>
</refsect1>
<xi:include href="libsystemd-pkgconfig.xml" />
<refsect1>
<title>See Also</title>
<para>
<citerefentry><refentrytitle>sd-event</refentrytitle><manvolnum>3</manvolnum></citerefentry>,
<citerefentry><refentrytitle>sd_event_add_io</refentrytitle><manvolnum>3</manvolnum></citerefentry>,
<citerefentry><refentrytitle>sd_event_add_time</refentrytitle><manvolnum>3</manvolnum></citerefentry>,
<citerefentry><refentrytitle>sd_event_add_signal</refentrytitle><manvolnum>3</manvolnum></citerefentry>,
<citerefentry><refentrytitle>sd_event_add_inotify</refentrytitle><manvolnum>3</manvolnum></citerefentry>,
<citerefentry><refentrytitle>sd_event_add_defer</refentrytitle><manvolnum>3</manvolnum></citerefentry>,
<citerefentry><refentrytitle>sd_event_source_set_enabled</refentrytitle><manvolnum>3</manvolnum></citerefentry>
</para>
</refsect1>
</refentry>

View File

@ -1855,6 +1855,12 @@ static void mount_enumerate(Manager *m) {
goto fail;
}
r = sd_event_source_set_ratelimit(m->mount_event_source, 1 * USEC_PER_SEC, 5);
if (r < 0) {
log_error_errno(r, "Failed to enable rate limit for mount events: %m");
goto fail;
}
(void) sd_event_source_set_description(m->mount_event_source, "mount-monitor-dispatch");
}

View File

@ -736,3 +736,10 @@ global:
sd_device_has_current_tag;
sd_device_set_sysattr_valuef;
} LIBSYSTEMD_246;
LIBSYSTEMD_248 {
global:
sd_event_source_set_ratelimit;
sd_event_source_get_ratelimit;
sd_event_source_is_ratelimited;
} LIBSYSTEMD_246;

View File

@ -11,6 +11,7 @@
#include "hashmap.h"
#include "list.h"
#include "prioq.h"
#include "ratelimit.h"
typedef enum EventSourceType {
SOURCE_IO,
@ -61,6 +62,7 @@ struct sd_event_source {
bool dispatching:1;
bool floating:1;
bool exit_on_failure:1;
bool ratelimited:1;
int64_t priority;
unsigned pending_index;
@ -72,6 +74,13 @@ struct sd_event_source {
LIST_FIELDS(sd_event_source, sources);
RateLimit rate_limit;
/* These are primarily fields relevant for time event sources, but since any event source can
* effectively become one when rate-limited, this is part of the common fields. */
unsigned earliest_index;
unsigned latest_index;
union {
struct {
sd_event_io_handler_t callback;
@ -84,8 +93,6 @@ struct sd_event_source {
struct {
sd_event_time_handler_t callback;
usec_t next, accuracy;
unsigned earliest_index;
unsigned latest_index;
} time;
struct {
sd_event_signal_handler_t callback;

View File

@ -37,6 +37,16 @@ static bool EVENT_SOURCE_WATCH_PIDFD(sd_event_source *s) {
s->child.options == WEXITED;
}
static bool event_source_is_online(sd_event_source *s) {
assert(s);
return s->enabled != SD_EVENT_OFF && !s->ratelimited;
}
static bool event_source_is_offline(sd_event_source *s) {
assert(s);
return s->enabled == SD_EVENT_OFF || s->ratelimited;
}
static const char* const event_source_type_table[_SOURCE_EVENT_SOURCE_TYPE_MAX] = {
[SOURCE_IO] = "io",
[SOURCE_TIME_REALTIME] = "realtime",
@ -55,7 +65,25 @@ static const char* const event_source_type_table[_SOURCE_EVENT_SOURCE_TYPE_MAX]
DEFINE_PRIVATE_STRING_TABLE_LOOKUP_TO_STRING(event_source_type, int);
#define EVENT_SOURCE_IS_TIME(t) IN_SET((t), SOURCE_TIME_REALTIME, SOURCE_TIME_BOOTTIME, SOURCE_TIME_MONOTONIC, SOURCE_TIME_REALTIME_ALARM, SOURCE_TIME_BOOTTIME_ALARM)
#define EVENT_SOURCE_IS_TIME(t) \
IN_SET((t), \
SOURCE_TIME_REALTIME, \
SOURCE_TIME_BOOTTIME, \
SOURCE_TIME_MONOTONIC, \
SOURCE_TIME_REALTIME_ALARM, \
SOURCE_TIME_BOOTTIME_ALARM)
#define EVENT_SOURCE_CAN_RATE_LIMIT(t) \
IN_SET((t), \
SOURCE_IO, \
SOURCE_TIME_REALTIME, \
SOURCE_TIME_BOOTTIME, \
SOURCE_TIME_MONOTONIC, \
SOURCE_TIME_REALTIME_ALARM, \
SOURCE_TIME_BOOTTIME_ALARM, \
SOURCE_SIGNAL, \
SOURCE_DEFER, \
SOURCE_INOTIFY)
struct sd_event {
unsigned n_ref;
@ -81,7 +109,7 @@ struct sd_event {
Hashmap *signal_data; /* indexed by priority */
Hashmap *child_sources;
unsigned n_enabled_child_sources;
unsigned n_online_child_sources;
Set *post_sources;
@ -120,7 +148,7 @@ struct sd_event {
LIST_HEAD(sd_event_source, sources);
usec_t last_run, last_log;
usec_t last_run_usec, last_log_usec;
unsigned delays[sizeof(usec_t) * 8];
};
@ -146,6 +174,11 @@ static int pending_prioq_compare(const void *a, const void *b) {
if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
return 1;
/* Non rate-limited ones first. */
r = CMP(!!x->ratelimited, !!y->ratelimited);
if (r != 0)
return r;
/* Lower priority values first */
r = CMP(x->priority, y->priority);
if (r != 0)
@ -168,6 +201,11 @@ static int prepare_prioq_compare(const void *a, const void *b) {
if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
return 1;
/* Non rate-limited ones first. */
r = CMP(!!x->ratelimited, !!y->ratelimited);
if (r != 0)
return r;
/* Move most recently prepared ones last, so that we can stop
* preparing as soon as we hit one that has already been
* prepared in the current iteration */
@ -179,12 +217,30 @@ static int prepare_prioq_compare(const void *a, const void *b) {
return CMP(x->priority, y->priority);
}
static usec_t time_event_source_next(const sd_event_source *s) {
assert(s);
/* We have two kinds of event sources that have elapsation times associated with them: the actual
* time based ones and the ones for which a ratelimit can be in effect (where we want to be notified
* once the ratelimit time window ends). Let's return the next elapsing time depending on what we are
* looking at here. */
if (s->ratelimited) { /* If rate-limited the next elapsation is when the ratelimit time window ends */
assert(s->rate_limit.begin != 0);
assert(s->rate_limit.interval != 0);
return usec_add(s->rate_limit.begin, s->rate_limit.interval);
}
/* Otherwise this must be a time event source, if not ratelimited */
if (EVENT_SOURCE_IS_TIME(s->type))
return s->time.next;
return USEC_INFINITY;
}
static int earliest_time_prioq_compare(const void *a, const void *b) {
const sd_event_source *x = a, *y = b;
assert(EVENT_SOURCE_IS_TIME(x->type));
assert(x->type == y->type);
/* Enabled ones first */
if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
return -1;
@ -198,19 +254,30 @@ static int earliest_time_prioq_compare(const void *a, const void *b) {
return 1;
/* Order by time */
return CMP(x->time.next, y->time.next);
return CMP(time_event_source_next(x), time_event_source_next(y));
}
static usec_t time_event_source_latest(const sd_event_source *s) {
return usec_add(s->time.next, s->time.accuracy);
assert(s);
if (s->ratelimited) { /* For ratelimited stuff the earliest and the latest time shall actually be the
* same, as we should avoid adding additional inaccuracy on an inaccuracy time
* window */
assert(s->rate_limit.begin != 0);
assert(s->rate_limit.interval != 0);
return usec_add(s->rate_limit.begin, s->rate_limit.interval);
}
/* Must be a time event source, if not ratelimited */
if (EVENT_SOURCE_IS_TIME(s->type))
return usec_add(s->time.next, s->time.accuracy);
return USEC_INFINITY;
}
static int latest_time_prioq_compare(const void *a, const void *b) {
const sd_event_source *x = a, *y = b;
assert(EVENT_SOURCE_IS_TIME(x->type));
assert(x->type == y->type);
/* Enabled ones first */
if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
return -1;
@ -380,7 +447,7 @@ static void source_io_unregister(sd_event_source *s) {
return;
if (epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, s->io.fd, NULL) < 0)
log_debug_errno(errno, "Failed to remove source %s (type %s) from epoll: %m",
log_debug_errno(errno, "Failed to remove source %s (type %s) from epoll, ignoring: %m",
strna(s->description), event_source_type_to_string(s->type));
s->io.registered = false;
@ -422,7 +489,7 @@ static void source_child_pidfd_unregister(sd_event_source *s) {
if (EVENT_SOURCE_WATCH_PIDFD(s))
if (epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, s->child.pidfd, NULL) < 0)
log_debug_errno(errno, "Failed to remove source %s (type %s) from epoll: %m",
log_debug_errno(errno, "Failed to remove source %s (type %s) from epoll, ignoring: %m",
strna(s->description), event_source_type_to_string(s->type));
s->child.registered = false;
@ -661,12 +728,12 @@ static void event_gc_signal_data(sd_event *e, const int64_t *priority, int sig)
* and possibly drop the signalfd for it. */
if (sig == SIGCHLD &&
e->n_enabled_child_sources > 0)
e->n_online_child_sources > 0)
return;
if (e->signal_sources &&
e->signal_sources[sig] &&
e->signal_sources[sig]->enabled != SD_EVENT_OFF)
event_source_is_online(e->signal_sources[sig]))
return;
/*
@ -713,13 +780,32 @@ static void event_source_time_prioq_reshuffle(sd_event_source *s) {
struct clock_data *d;
assert(s);
assert(EVENT_SOURCE_IS_TIME(s->type));
/* Called whenever the event source's timer ordering properties changed, i.e. time, accuracy,
* pending, enable state. Makes sure the two prioq's are ordered properly again. */
assert_se(d = event_get_clock_data(s->event, s->type));
prioq_reshuffle(d->earliest, s, &s->time.earliest_index);
prioq_reshuffle(d->latest, s, &s->time.latest_index);
if (s->ratelimited)
d = &s->event->monotonic;
else {
assert(EVENT_SOURCE_IS_TIME(s->type));
assert_se(d = event_get_clock_data(s->event, s->type));
}
prioq_reshuffle(d->earliest, s, &s->earliest_index);
prioq_reshuffle(d->latest, s, &s->latest_index);
d->needs_rearm = true;
}
static void event_source_time_prioq_remove(
sd_event_source *s,
struct clock_data *d) {
assert(s);
assert(d);
prioq_remove(d->earliest, s, &s->earliest_index);
prioq_remove(d->latest, s, &s->latest_index);
s->earliest_index = s->latest_index = PRIOQ_IDX_NULL;
d->needs_rearm = true;
}
@ -745,17 +831,18 @@ static void source_disconnect(sd_event_source *s) {
case SOURCE_TIME_BOOTTIME:
case SOURCE_TIME_MONOTONIC:
case SOURCE_TIME_REALTIME_ALARM:
case SOURCE_TIME_BOOTTIME_ALARM: {
struct clock_data *d;
case SOURCE_TIME_BOOTTIME_ALARM:
/* Only remove this event source from the time event source here if it is not ratelimited. If
* it is ratelimited, we'll remove it below, separately. Why? Because the clock used might
* differ: ratelimiting always uses CLOCK_MONOTONIC, but timer events might use any clock */
d = event_get_clock_data(s->event, s->type);
assert(d);
if (!s->ratelimited) {
struct clock_data *d;
assert_se(d = event_get_clock_data(s->event, s->type));
event_source_time_prioq_remove(s, d);
}
prioq_remove(d->earliest, s, &s->time.earliest_index);
prioq_remove(d->latest, s, &s->time.latest_index);
d->needs_rearm = true;
break;
}
case SOURCE_SIGNAL:
if (s->signal.sig > 0) {
@ -770,9 +857,9 @@ static void source_disconnect(sd_event_source *s) {
case SOURCE_CHILD:
if (s->child.pid > 0) {
if (s->enabled != SD_EVENT_OFF) {
assert(s->event->n_enabled_child_sources > 0);
s->event->n_enabled_child_sources--;
if (event_source_is_online(s)) {
assert(s->event->n_online_child_sources > 0);
s->event->n_online_child_sources--;
}
(void) hashmap_remove(s->event->child_sources, PID_TO_PTR(s->child.pid));
@ -842,6 +929,9 @@ static void source_disconnect(sd_event_source *s) {
if (s->prepare)
prioq_remove(s->event->prepare, s, &s->prepare_index);
if (s->ratelimited)
event_source_time_prioq_remove(s, &s->event->monotonic);
event = TAKE_PTR(s->event);
LIST_REMOVE(sources, event->sources, s);
event->n_sources--;
@ -1088,6 +1178,52 @@ static int time_exit_callback(sd_event_source *s, uint64_t usec, void *userdata)
return sd_event_exit(sd_event_source_get_event(s), PTR_TO_INT(userdata));
}
static int setup_clock_data(sd_event *e, struct clock_data *d, clockid_t clock) {
int r;
assert(d);
if (d->fd < 0) {
r = event_setup_timer_fd(e, d, clock);
if (r < 0)
return r;
}
r = prioq_ensure_allocated(&d->earliest, earliest_time_prioq_compare);
if (r < 0)
return r;
r = prioq_ensure_allocated(&d->latest, latest_time_prioq_compare);
if (r < 0)
return r;
return 0;
}
static int event_source_time_prioq_put(
sd_event_source *s,
struct clock_data *d) {
int r;
assert(s);
assert(d);
r = prioq_put(d->earliest, s, &s->earliest_index);
if (r < 0)
return r;
r = prioq_put(d->latest, s, &s->latest_index);
if (r < 0) {
assert_se(prioq_remove(d->earliest, s, &s->earliest_index) > 0);
s->earliest_index = PRIOQ_IDX_NULL;
return r;
}
d->needs_rearm = true;
return 0;
}
_public_ int sd_event_add_time(
sd_event *e,
sd_event_source **ret,
@ -1118,23 +1254,12 @@ _public_ int sd_event_add_time(
if (!callback)
callback = time_exit_callback;
d = event_get_clock_data(e, type);
assert(d);
assert_se(d = event_get_clock_data(e, type));
r = prioq_ensure_allocated(&d->earliest, earliest_time_prioq_compare);
r = setup_clock_data(e, d, clock);
if (r < 0)
return r;
r = prioq_ensure_allocated(&d->latest, latest_time_prioq_compare);
if (r < 0)
return r;
if (d->fd < 0) {
r = event_setup_timer_fd(e, d, clock);
if (r < 0)
return r;
}
s = source_new(e, !ret, type);
if (!s)
return -ENOMEM;
@ -1142,17 +1267,11 @@ _public_ int sd_event_add_time(
s->time.next = usec;
s->time.accuracy = accuracy == 0 ? DEFAULT_ACCURACY_USEC : accuracy;
s->time.callback = callback;
s->time.earliest_index = s->time.latest_index = PRIOQ_IDX_NULL;
s->earliest_index = s->latest_index = PRIOQ_IDX_NULL;
s->userdata = userdata;
s->enabled = SD_EVENT_ONESHOT;
d->needs_rearm = true;
r = prioq_put(d->earliest, s, &s->time.earliest_index);
if (r < 0)
return r;
r = prioq_put(d->latest, s, &s->time.latest_index);
r = event_source_time_prioq_put(s, d);
if (r < 0)
return r;
@ -1285,7 +1404,7 @@ _public_ int sd_event_add_child(
if (!callback)
callback = child_exit_callback;
if (e->n_enabled_child_sources == 0) {
if (e->n_online_child_sources == 0) {
/* Caller must block SIGCHLD before using us to watch children, even if pidfd is available,
* for compatibility with pre-pidfd and because we don't want the reap the child processes
* ourselves, i.e. call waitid(), and don't want Linux' default internal logic for that to
@ -1350,7 +1469,7 @@ _public_ int sd_event_add_child(
e->need_process_child = true;
}
e->n_enabled_child_sources++;
e->n_online_child_sources++;
if (ret)
*ret = s;
@ -1382,7 +1501,7 @@ _public_ int sd_event_add_child_pidfd(
if (!callback)
callback = child_exit_callback;
if (e->n_enabled_child_sources == 0) {
if (e->n_online_child_sources == 0) {
r = signal_is_blocked(SIGCHLD);
if (r < 0)
return r;
@ -1432,7 +1551,7 @@ _public_ int sd_event_add_child_pidfd(
e->need_process_child = true;
}
e->n_enabled_child_sources++;
e->n_online_child_sources++;
if (ret)
*ret = s;
@ -2018,7 +2137,7 @@ _public_ int sd_event_source_set_io_fd(sd_event_source *s, int fd) {
if (s->io.fd == fd)
return 0;
if (s->enabled == SD_EVENT_OFF) {
if (event_source_is_offline(s)) {
s->io.fd = fd;
s->io.registered = false;
} else {
@ -2085,7 +2204,7 @@ _public_ int sd_event_source_set_io_events(sd_event_source *s, uint32_t events)
if (r < 0)
return r;
if (s->enabled != SD_EVENT_OFF) {
if (event_source_is_online(s)) {
r = source_io_register(s, s->enabled, events);
if (r < 0)
return r;
@ -2188,7 +2307,7 @@ _public_ int sd_event_source_set_priority(sd_event_source *s, int64_t priority)
event_gc_inode_data(s->event, old_inode_data);
} else if (s->type == SOURCE_SIGNAL && s->enabled != SD_EVENT_OFF) {
} else if (s->type == SOURCE_SIGNAL && event_source_is_online(s)) {
struct signal_data *old, *d;
/* Move us from the signalfd belonging to the old
@ -2225,29 +2344,39 @@ fail:
return r;
}
_public_ int sd_event_source_get_enabled(sd_event_source *s, int *m) {
_public_ int sd_event_source_get_enabled(sd_event_source *s, int *ret) {
assert_return(s, -EINVAL);
assert_return(!event_pid_changed(s->event), -ECHILD);
if (m)
*m = s->enabled;
if (ret)
*ret = s->enabled;
return s->enabled != SD_EVENT_OFF;
}
static int event_source_disable(sd_event_source *s) {
static int event_source_offline(
sd_event_source *s,
int enabled,
bool ratelimited) {
bool was_offline;
int r;
assert(s);
assert(s->enabled != SD_EVENT_OFF);
assert(enabled == SD_EVENT_OFF || ratelimited);
/* Unset the pending flag when this event source is disabled */
if (!IN_SET(s->type, SOURCE_DEFER, SOURCE_EXIT)) {
if (s->enabled != SD_EVENT_OFF &&
enabled == SD_EVENT_OFF &&
!IN_SET(s->type, SOURCE_DEFER, SOURCE_EXIT)) {
r = source_set_pending(s, false);
if (r < 0)
return r;
}
s->enabled = SD_EVENT_OFF;
was_offline = event_source_is_offline(s);
s->enabled = enabled;
s->ratelimited = ratelimited;
switch (s->type) {
@ -2268,8 +2397,10 @@ static int event_source_disable(sd_event_source *s) {
break;
case SOURCE_CHILD:
assert(s->event->n_enabled_child_sources > 0);
s->event->n_enabled_child_sources--;
if (!was_offline) {
assert(s->event->n_online_child_sources > 0);
s->event->n_online_child_sources--;
}
if (EVENT_SOURCE_WATCH_PIDFD(s))
source_child_pidfd_unregister(s);
@ -2290,26 +2421,42 @@ static int event_source_disable(sd_event_source *s) {
assert_not_reached("Wut? I shouldn't exist.");
}
return 0;
return 1;
}
static int event_source_enable(sd_event_source *s, int enable) {
static int event_source_online(
sd_event_source *s,
int enabled,
bool ratelimited) {
bool was_online;
int r;
assert(s);
assert(IN_SET(enable, SD_EVENT_ON, SD_EVENT_ONESHOT));
assert(s->enabled == SD_EVENT_OFF);
assert(enabled != SD_EVENT_OFF || !ratelimited);
/* Unset the pending flag when this event source is enabled */
if (!IN_SET(s->type, SOURCE_DEFER, SOURCE_EXIT)) {
if (s->enabled == SD_EVENT_OFF &&
enabled != SD_EVENT_OFF &&
!IN_SET(s->type, SOURCE_DEFER, SOURCE_EXIT)) {
r = source_set_pending(s, false);
if (r < 0)
return r;
}
/* Are we really ready for onlining? */
if (enabled == SD_EVENT_OFF || ratelimited) {
/* Nope, we are not ready for onlining, then just update the precise state and exit */
s->enabled = enabled;
s->ratelimited = ratelimited;
return 0;
}
was_online = event_source_is_online(s);
switch (s->type) {
case SOURCE_IO:
r = source_io_register(s, enable, s->io.events);
r = source_io_register(s, enabled, s->io.events);
if (r < 0)
return r;
break;
@ -2327,7 +2474,7 @@ static int event_source_enable(sd_event_source *s, int enable) {
if (EVENT_SOURCE_WATCH_PIDFD(s)) {
/* yes, we have pidfd */
r = source_child_pidfd_register(s, enable);
r = source_child_pidfd_register(s, enabled);
if (r < 0)
return r;
} else {
@ -2340,8 +2487,8 @@ static int event_source_enable(sd_event_source *s, int enable) {
}
}
s->event->n_enabled_child_sources++;
if (!was_online)
s->event->n_online_child_sources++;
break;
case SOURCE_TIME_REALTIME:
@ -2359,7 +2506,8 @@ static int event_source_enable(sd_event_source *s, int enable) {
assert_not_reached("Wut? I shouldn't exist.");
}
s->enabled = enable;
s->enabled = enabled;
s->ratelimited = ratelimited;
/* Non-failing operations below */
switch (s->type) {
@ -2379,7 +2527,7 @@ static int event_source_enable(sd_event_source *s, int enable) {
break;
}
return 0;
return 1;
}
_public_ int sd_event_source_set_enabled(sd_event_source *s, int m) {
@ -2397,7 +2545,7 @@ _public_ int sd_event_source_set_enabled(sd_event_source *s, int m) {
return 0;
if (m == SD_EVENT_OFF)
r = event_source_disable(s);
r = event_source_offline(s, m, s->ratelimited);
else {
if (s->enabled != SD_EVENT_OFF) {
/* Switching from "on" to "oneshot" or back? If that's the case, we can take a shortcut, the
@ -2406,7 +2554,7 @@ _public_ int sd_event_source_set_enabled(sd_event_source *s, int m) {
return 0;
}
r = event_source_enable(s, m);
r = event_source_online(s, m, s->ratelimited);
}
if (r < 0)
return r;
@ -2663,6 +2811,96 @@ _public_ void *sd_event_source_set_userdata(sd_event_source *s, void *userdata)
return ret;
}
static int event_source_enter_ratelimited(sd_event_source *s) {
int r;
assert(s);
/* When an event source becomes ratelimited, we place it in the CLOCK_MONOTONIC priority queue, with
* the end of the rate limit time window, much as if it was a timer event source. */
if (s->ratelimited)
return 0; /* Already ratelimited, this is a NOP hence */
/* Make sure we can install a CLOCK_MONOTONIC event further down. */
r = setup_clock_data(s->event, &s->event->monotonic, CLOCK_MONOTONIC);
if (r < 0)
return r;
/* Timer event sources are already using the earliest/latest queues for the timer scheduling. Let's
* first remove them from the prioq appropriate for their own clock, so that we can use the prioq
* fields of the event source then for adding it to the CLOCK_MONOTONIC prioq instead. */
if (EVENT_SOURCE_IS_TIME(s->type))
event_source_time_prioq_remove(s, event_get_clock_data(s->event, s->type));
/* Now, let's add the event source to the monotonic clock instead */
r = event_source_time_prioq_put(s, &s->event->monotonic);
if (r < 0)
goto fail;
/* And let's take the event source officially offline */
r = event_source_offline(s, s->enabled, /* ratelimited= */ true);
if (r < 0) {
event_source_time_prioq_remove(s, &s->event->monotonic);
goto fail;
}
event_source_pp_prioq_reshuffle(s);
log_debug("Event source %p (%s) entered rate limit state.", s, strna(s->description));
return 0;
fail:
/* Reinstall time event sources in the priority queue as before. This shouldn't fail, since the queue
* space for it should already be allocated. */
if (EVENT_SOURCE_IS_TIME(s->type))
assert_se(event_source_time_prioq_put(s, event_get_clock_data(s->event, s->type)) >= 0);
return r;
}
static int event_source_leave_ratelimit(sd_event_source *s) {
int r;
assert(s);
if (!s->ratelimited)
return 0;
/* Let's take the event source out of the monotonic prioq first. */
event_source_time_prioq_remove(s, &s->event->monotonic);
/* Let's then add the event source to its native clock prioq again — if this is a timer event source */
if (EVENT_SOURCE_IS_TIME(s->type)) {
r = event_source_time_prioq_put(s, event_get_clock_data(s->event, s->type));
if (r < 0)
goto fail;
}
/* Let's try to take it online again. */
r = event_source_online(s, s->enabled, /* ratelimited= */ false);
if (r < 0) {
/* Do something roughly sensible when this failed: undo the two prioq ops above */
if (EVENT_SOURCE_IS_TIME(s->type))
event_source_time_prioq_remove(s, event_get_clock_data(s->event, s->type));
goto fail;
}
event_source_pp_prioq_reshuffle(s);
ratelimit_reset(&s->rate_limit);
log_debug("Event source %p (%s) left rate limit state.", s, strna(s->description));
return 0;
fail:
/* Do something somewhat reasonable when we cannot move an event sources out of ratelimited mode:
* simply put it back in it, maybe we can then process it more successfully next iteration. */
assert_se(event_source_time_prioq_put(s, &s->event->monotonic) >= 0);
return r;
}
static usec_t sleep_between(sd_event *e, usec_t a, usec_t b) {
usec_t c;
assert(e);
@ -2760,7 +2998,7 @@ static int event_arm_timer(
d->needs_rearm = false;
a = prioq_peek(d->earliest);
if (!a || a->enabled == SD_EVENT_OFF || a->time.next == USEC_INFINITY) {
if (!a || a->enabled == SD_EVENT_OFF || time_event_source_next(a) == USEC_INFINITY) {
if (d->fd < 0)
return 0;
@ -2779,7 +3017,7 @@ static int event_arm_timer(
b = prioq_peek(d->latest);
assert_se(b && b->enabled != SD_EVENT_OFF);
t = sleep_between(e, a->time.next, time_event_source_latest(b));
t = sleep_between(e, time_event_source_next(a), time_event_source_latest(b));
if (d->next == t)
return 0;
@ -2857,10 +3095,22 @@ static int process_timer(
for (;;) {
s = prioq_peek(d->earliest);
if (!s ||
s->time.next > n ||
s->enabled == SD_EVENT_OFF ||
s->pending)
if (!s || time_event_source_next(s) > n)
break;
if (s->ratelimited) {
/* This is an event sources whose ratelimit window has ended. Let's turn it on
* again. */
assert(s->ratelimited);
r = event_source_leave_ratelimit(s);
if (r < 0)
return r;
continue;
}
if (s->enabled == SD_EVENT_OFF || s->pending)
break;
r = source_set_pending(s, true);
@ -2905,7 +3155,7 @@ static int process_child(sd_event *e) {
if (s->pending)
continue;
if (s->enabled == SD_EVENT_OFF)
if (event_source_is_offline(s))
continue;
if (s->child.exited)
@ -2952,7 +3202,7 @@ static int process_pidfd(sd_event *e, sd_event_source *s, uint32_t revents) {
if (s->pending)
return 0;
if (s->enabled == SD_EVENT_OFF)
if (event_source_is_offline(s))
return 0;
if (!EVENT_SOURCE_WATCH_PIDFD(s))
@ -3112,7 +3362,7 @@ static int event_inotify_data_process(sd_event *e, struct inotify_data *d) {
LIST_FOREACH(inotify.by_inode_data, s, inode_data->event_sources) {
if (s->enabled == SD_EVENT_OFF)
if (event_source_is_offline(s))
continue;
r = source_set_pending(s, true);
@ -3148,7 +3398,7 @@ static int event_inotify_data_process(sd_event *e, struct inotify_data *d) {
* sources if IN_IGNORED or IN_UNMOUNT is set. */
LIST_FOREACH(inotify.by_inode_data, s, inode_data->event_sources) {
if (s->enabled == SD_EVENT_OFF)
if (event_source_is_offline(s))
continue;
if ((d->buffer.ev.mask & (IN_IGNORED|IN_UNMOUNT)) == 0 &&
@ -3202,6 +3452,16 @@ static int source_dispatch(sd_event_source *s) {
* callback might have invalidated/disconnected the event source. */
saved_event = sd_event_ref(s->event);
/* Check if we hit the ratelimit for this event source, if so, let's disable it. */
assert(!s->ratelimited);
if (!ratelimit_below(&s->rate_limit)) {
r = event_source_enter_ratelimited(s);
if (r < 0)
return r;
return 1;
}
if (!IN_SET(s->type, SOURCE_DEFER, SOURCE_EXIT)) {
r = source_set_pending(s, false);
if (r < 0)
@ -3215,7 +3475,7 @@ static int source_dispatch(sd_event_source *s) {
* post sources as pending */
SET_FOREACH(z, s->event->post_sources) {
if (z->enabled == SD_EVENT_OFF)
if (event_source_is_offline(z))
continue;
r = source_set_pending(z, true);
@ -3335,7 +3595,7 @@ static int event_prepare(sd_event *e) {
sd_event_source *s;
s = prioq_peek(e->prepare);
if (!s || s->prepare_iteration == e->iteration || s->enabled == SD_EVENT_OFF)
if (!s || s->prepare_iteration == e->iteration || event_source_is_offline(s))
break;
s->prepare_iteration = e->iteration;
@ -3370,18 +3630,17 @@ static int event_prepare(sd_event *e) {
static int dispatch_exit(sd_event *e) {
sd_event_source *p;
_cleanup_(sd_event_unrefp) sd_event *ref = NULL;
int r;
assert(e);
p = prioq_peek(e->exit);
if (!p || p->enabled == SD_EVENT_OFF) {
if (!p || event_source_is_offline(p)) {
e->state = SD_EVENT_FINISHED;
return 0;
}
ref = sd_event_ref(e);
_unused_ _cleanup_(sd_event_unrefp) sd_event *ref = sd_event_ref(e);
e->iteration++;
e->state = SD_EVENT_EXITING;
r = source_dispatch(p);
@ -3398,7 +3657,7 @@ static sd_event_source* event_next_pending(sd_event *e) {
if (!p)
return NULL;
if (p->enabled == SD_EVENT_OFF)
if (event_source_is_offline(p))
return NULL;
return p;
@ -3477,6 +3736,9 @@ _public_ int sd_event_prepare(sd_event *e) {
* syscalls */
assert_return(!e->default_event_ptr || e->tid == gettid(), -EREMOTEIO);
/* Make sure that none of the preparation callbacks ends up freeing the event source under our feet */
_unused_ _cleanup_(sd_event_unrefp) sd_event *ref = sd_event_ref(e);
if (e->exit_requested)
goto pending;
@ -3682,9 +3944,8 @@ _public_ int sd_event_dispatch(sd_event *e) {
p = event_next_pending(e);
if (p) {
_cleanup_(sd_event_unrefp) sd_event *ref = NULL;
_unused_ _cleanup_(sd_event_unrefp) sd_event *ref = sd_event_ref(e);
ref = sd_event_ref(e);
e->state = SD_EVENT_RUNNING;
r = source_dispatch(p);
e->state = SD_EVENT_INITIAL;
@ -3718,29 +3979,32 @@ _public_ int sd_event_run(sd_event *e, uint64_t timeout) {
assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
assert_return(e->state == SD_EVENT_INITIAL, -EBUSY);
if (e->profile_delays && e->last_run) {
if (e->profile_delays && e->last_run_usec != 0) {
usec_t this_run;
unsigned l;
this_run = now(CLOCK_MONOTONIC);
l = u64log2(this_run - e->last_run);
l = u64log2(this_run - e->last_run_usec);
assert(l < ELEMENTSOF(e->delays));
e->delays[l]++;
if (this_run - e->last_log >= 5*USEC_PER_SEC) {
if (this_run - e->last_log_usec >= 5*USEC_PER_SEC) {
event_log_delays(e);
e->last_log = this_run;
e->last_log_usec = this_run;
}
}
/* Make sure that none of the preparation callbacks ends up freeing the event source under our feet */
_unused_ _cleanup_(sd_event_unrefp) sd_event *ref = sd_event_ref(e);
r = sd_event_prepare(e);
if (r == 0)
/* There was nothing? Then wait... */
r = sd_event_wait(e, timeout);
if (e->profile_delays)
e->last_run = now(CLOCK_MONOTONIC);
e->last_run_usec = now(CLOCK_MONOTONIC);
if (r > 0) {
/* There's something now, then let's dispatch it */
@ -3755,7 +4019,6 @@ _public_ int sd_event_run(sd_event *e, uint64_t timeout) {
}
_public_ int sd_event_loop(sd_event *e) {
_cleanup_(sd_event_unrefp) sd_event *ref = NULL;
int r;
assert_return(e, -EINVAL);
@ -3763,7 +4026,7 @@ _public_ int sd_event_loop(sd_event *e) {
assert_return(!event_pid_changed(e), -ECHILD);
assert_return(e->state == SD_EVENT_INITIAL, -EBUSY);
ref = sd_event_ref(e);
_unused_ _cleanup_(sd_event_unrefp) sd_event *ref = NULL;
while (e->state != SD_EVENT_FINISHED) {
r = sd_event_run(e, (uint64_t) -1);
@ -4008,3 +4271,53 @@ _public_ int sd_event_source_set_exit_on_failure(sd_event_source *s, int b) {
s->exit_on_failure = b;
return 1;
}
_public_ int sd_event_source_set_ratelimit(sd_event_source *s, uint64_t interval, unsigned burst) {
int r;
assert_return(s, -EINVAL);
/* Turning on ratelimiting on event source types that don't support it, is a loggable offense. Doing
* so is a programming error. */
assert_return(EVENT_SOURCE_CAN_RATE_LIMIT(s->type), -EDOM);
/* When ratelimiting is configured we'll always reset the rate limit state first and start fresh,
* non-ratelimited. */
r = event_source_leave_ratelimit(s);
if (r < 0)
return r;
s->rate_limit = (RateLimit) { interval, burst };
return 0;
}
_public_ int sd_event_source_get_ratelimit(sd_event_source *s, uint64_t *ret_interval, unsigned *ret_burst) {
assert_return(s, -EINVAL);
/* Querying whether an event source has ratelimiting configured is not a loggable offsense, hence
* don't use assert_return(). Unlike turning on ratelimiting it's not really a programming error */
if (!EVENT_SOURCE_CAN_RATE_LIMIT(s->type))
return -EDOM;
if (!ratelimit_configured(&s->rate_limit))
return -ENOEXEC;
if (ret_interval)
*ret_interval = s->rate_limit.interval;
if (ret_burst)
*ret_burst = s->rate_limit.burst;
return 0;
}
_public_ int sd_event_source_is_ratelimited(sd_event_source *s) {
assert_return(s, -EINVAL);
if (!EVENT_SOURCE_CAN_RATE_LIMIT(s->type))
return false;
if (!ratelimit_configured(&s->rate_limit))
return false;
return s->ratelimited;
}

View File

@ -589,8 +589,100 @@ static void test_pidfd(void) {
sd_event_unref(e);
}
static int ratelimit_io_handler(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
unsigned *c = (unsigned*) userdata;
*c += 1;
return 0;
}
static int ratelimit_time_handler(sd_event_source *s, uint64_t usec, void *userdata) {
int r;
r = sd_event_source_set_enabled(s, SD_EVENT_ON);
if (r < 0)
log_warning_errno(r, "Failed to turn on notify event source: %m");
r = sd_event_source_set_time(s, usec + 1000);
if (r < 0)
log_error_errno(r, "Failed to restart watchdog event source: %m");
unsigned *c = (unsigned*) userdata;
*c += 1;
return 0;
}
static void test_ratelimit(void) {
_cleanup_close_pair_ int p[2] = {-1, -1};
_cleanup_(sd_event_unrefp) sd_event *e = NULL;
_cleanup_(sd_event_source_unrefp) sd_event_source *s = NULL;
uint64_t interval;
unsigned count, burst;
assert_se(sd_event_default(&e) >= 0);
assert_se(pipe2(p, O_CLOEXEC|O_NONBLOCK) >= 0);
assert_se(sd_event_add_io(e, &s, p[0], EPOLLIN, ratelimit_io_handler, &count) >= 0);
assert_se(sd_event_source_set_description(s, "test-ratelimit-io") >= 0);
assert_se(sd_event_source_set_ratelimit(s, 1 * USEC_PER_SEC, 5) >= 0);
assert_se(sd_event_source_get_ratelimit(s, &interval, &burst) >= 0);
assert_se(interval == 1 * USEC_PER_SEC && burst == 5);
assert_se(write(p[1], "1", 1) == 1);
count = 0;
for (unsigned i = 0; i < 10; i++) {
log_debug("slow loop iteration %u", i);
assert_se(sd_event_run(e, UINT64_MAX) >= 0);
assert_se(usleep(250 * USEC_PER_MSEC) >= 0);
}
assert_se(sd_event_source_is_ratelimited(s) == 0);
assert_se(count == 10);
log_info("ratelimit_io_handler: called %d times, event source not ratelimited", count);
assert_se(sd_event_source_set_ratelimit(s, 0, 0) >= 0);
assert_se(sd_event_source_set_ratelimit(s, 1 * USEC_PER_SEC, 5) >= 0);
count = 0;
for (unsigned i = 0; i < 10; i++) {
log_debug("fast event loop iteration %u", i);
assert_se(sd_event_run(e, UINT64_MAX) >= 0);
assert_se(usleep(10) >= 0);
}
log_info("ratelimit_io_handler: called %d times, event source got ratelimited", count);
assert_se(count < 10);
s = sd_event_source_unref(s);
safe_close_pair(p);
count = 0;
assert_se(sd_event_add_time_relative(e, &s, CLOCK_MONOTONIC, 1000, 1, ratelimit_time_handler, &count) >= 0);
assert_se(sd_event_source_set_ratelimit(s, 1 * USEC_PER_SEC, 10) == 0);
do {
assert_se(sd_event_run(e, UINT64_MAX) >= 0);
} while (!sd_event_source_is_ratelimited(s));
log_info("ratelimit_time_handler: called %d times, event source got ratelimited", count);
assert_se(count == 10);
/* In order to get rid of active rate limit client needs to disable it explicitely */
assert_se(sd_event_source_set_ratelimit(s, 0, 0) >= 0);
assert_se(!sd_event_source_is_ratelimited(s));
assert_se(sd_event_source_set_ratelimit(s, 1 * USEC_PER_SEC, 10) >= 0);
do {
assert_se(sd_event_run(e, UINT64_MAX) >= 0);
} while (!sd_event_source_is_ratelimited(s));
log_info("ratelimit_time_handler: called 10 more times, event source got ratelimited");
assert_se(count == 20);
}
int main(int argc, char *argv[]) {
test_setup_logging(LOG_INFO);
test_setup_logging(LOG_DEBUG);
test_basic(true); /* test with pidfd */
test_basic(false); /* test without pidfd */
@ -603,5 +695,7 @@ int main(int argc, char *argv[]) {
test_pidfd();
test_ratelimit();
return 0;
}

View File

@ -162,6 +162,9 @@ int sd_event_source_get_floating(sd_event_source *s);
int sd_event_source_set_floating(sd_event_source *s, int b);
int sd_event_source_get_exit_on_failure(sd_event_source *s);
int sd_event_source_set_exit_on_failure(sd_event_source *s, int b);
int sd_event_source_set_ratelimit(sd_event_source *s, uint64_t interval_usec, unsigned burst);
int sd_event_source_get_ratelimit(sd_event_source *s, uint64_t *ret_interval_usec, unsigned *ret_burst);
int sd_event_source_is_ratelimited(sd_event_source *s);
/* Define helpers so that __attribute__((cleanup(sd_event_unrefp))) and similar may be used. */
_SD_DEFINE_POINTER_CLEANUP_FUNC(sd_event, sd_event_unref);