diff --git a/man/rules/meson.build b/man/rules/meson.build index cacbbd75bc..97136fe758 100644 --- a/man/rules/meson.build +++ b/man/rules/meson.build @@ -582,6 +582,10 @@ manpages = [ 'SD_EVENT_PRIORITY_NORMAL', 'sd_event_source_get_priority'], ''], + ['sd_event_source_set_ratelimit', + '3', + ['sd_event_source_get_ratelimit', 'sd_event_source_is_ratelimited'], + ''], ['sd_event_source_set_userdata', '3', ['sd_event_source_get_userdata'], ''], ['sd_event_source_unref', '3', diff --git a/man/sd-event.xml b/man/sd-event.xml index a28c9b87ec..1bcf4e32a5 100644 --- a/man/sd-event.xml +++ b/man/sd-event.xml @@ -56,6 +56,7 @@ sd_event_source_get_pending3, sd_event_source_set_description3, sd_event_source_set_prepare3, + sd_event_source_set_ratelimit3, sd_event_wait3, sd_event_get_fd3, sd_event_set_watchdog3, @@ -147,6 +148,7 @@ sd_event_source_get_pending3, sd_event_source_set_description3, sd_event_source_set_prepare3, + sd_event_source_set_ratelimit3, sd_event_wait3, sd_event_get_fd3, sd_event_set_watchdog3, diff --git a/man/sd_event_source_set_enabled.xml b/man/sd_event_source_set_enabled.xml index cf00695fda..c8ae169c17 100644 --- a/man/sd_event_source_set_enabled.xml +++ b/man/sd_event_source_set_enabled.xml @@ -147,7 +147,8 @@ sd_event_add_child3, sd_event_add_inotify3, sd_event_add_defer3, - sd_event_source_unref3 + sd_event_source_unref3, + sd_event_source_set_ratelimit3 diff --git a/man/sd_event_source_set_ratelimit.xml b/man/sd_event_source_set_ratelimit.xml new file mode 100644 index 0000000000..754bfbe3b2 --- /dev/null +++ b/man/sd_event_source_set_ratelimit.xml @@ -0,0 +1,148 @@ + + + + + + + + sd_event_source_set_ratelimit + systemd + + + + sd_event_source_set_ratelimit + 3 + + + + sd_event_source_set_ratelimit + sd_event_source_get_ratelimit + sd_event_source_is_ratelimited + + Configure rate limiting on event sources + + + + + #include <systemd/sd-event.h> + + + int sd_event_source_set_ratelimit + sd_event_source *source + uint64_t interval_usec + unsigned burst + + + + int sd_event_source_get_ratelimit + sd_event_source *source + uint64_t* ret_interval_usec + unsigned* ret_burst + + + + int sd_event_source_is_ratelimited + sd_event_source *source + + + + + + + Description + + sd_event_source_set_ratelimit() may be used to enforce rate limiting on an + event source. When used an event source will be temporarily turned off when it fires more often then a + specified burst number within a specified time interval. This is useful as simple mechanism to avoid + event source starvation if high priority event sources fire very frequently. + + Pass the event source to operate on as first argument, a time interval in microseconds as second + argument and a maximum dispatch limit ("burst") as third parameter. Whenever the event source is + dispatched more often than the specified burst within the specified interval it is placed in a mode + similar to being disabled with + sd_event_source_set_enabled3 + and the SD_EVENT_OFF parameter. However it is disabled only temporarily – once the + specified interval is over regular operation resumes. It is again disabled temporarily once the specified rate + limiting is hit the next time. If either the interval or the burst value are specified as zero, rate + limiting is turned off. By default event sources do not have rate limiting enabled. Note that rate + limiting and disabling via sd_event_source_set_enabled() are independent of each + other, and an event source will only effect event loop wake-ups and is dispatched while it both is + enabled and rate limiting is not in effect. + + sd_event_source_get_ratelimit() may be used to query the current rate limiting + parameters set on the event source object source. The previously set interval and + burst vales are returned in the second and third argument. + + sd_event_source_is_ratelimited() may be used to query whether the event source + is currently affected by rate limiting, i.e. it has recently hit the rate limit and is currently + temporarily disabled due to that. + + Rate limiting is currently implemented for I/O, timer, signal, defer and inotify event + sources. + + + + Return Value + + On success, sd_event_source_set_ratelimit() and + sd_event_source_get_ratelimit() return a non-negative integer. On failure, they + return a negative errno-style error code. sd_event_source_is_ratelimited returns + zero if rate limiting is currently not in effect and greater than zero if it is in effect; it returns a + negative errno-style error code on failure. + + + Errors + + Returned errors may indicate the following problems: + + + + -EINVAL + + source is not a valid pointer to an + sd_event_source object. + + + + + -ECHILD + + The event loop has been created in a different process. + + + + -EDOM + + It was attempted to use the rate limiting feature on an event source type that does + not support rate limiting. + + + + -ENOEXEC + + sd_event_source_get_ratelimit() was called on a event source + that doesn't have rate limiting configured. + + + + + + + + + + See Also + + + sd-event3, + sd_event_add_io3, + sd_event_add_time3, + sd_event_add_signal3, + sd_event_add_inotify3, + sd_event_add_defer3, + sd_event_source_set_enabled3 + + + + diff --git a/src/core/mount.c b/src/core/mount.c index e672796694..5479cf7bf2 100644 --- a/src/core/mount.c +++ b/src/core/mount.c @@ -1855,6 +1855,12 @@ static void mount_enumerate(Manager *m) { goto fail; } + r = sd_event_source_set_ratelimit(m->mount_event_source, 1 * USEC_PER_SEC, 5); + if (r < 0) { + log_error_errno(r, "Failed to enable rate limit for mount events: %m"); + goto fail; + } + (void) sd_event_source_set_description(m->mount_event_source, "mount-monitor-dispatch"); } diff --git a/src/libsystemd/libsystemd.sym b/src/libsystemd/libsystemd.sym index f83b364c96..b03bcd952f 100644 --- a/src/libsystemd/libsystemd.sym +++ b/src/libsystemd/libsystemd.sym @@ -736,3 +736,10 @@ global: sd_device_has_current_tag; sd_device_set_sysattr_valuef; } LIBSYSTEMD_246; + +LIBSYSTEMD_248 { +global: + sd_event_source_set_ratelimit; + sd_event_source_get_ratelimit; + sd_event_source_is_ratelimited; +} LIBSYSTEMD_246; diff --git a/src/libsystemd/sd-event/event-source.h b/src/libsystemd/sd-event/event-source.h index 62d07187a2..f0d2a1b9e6 100644 --- a/src/libsystemd/sd-event/event-source.h +++ b/src/libsystemd/sd-event/event-source.h @@ -11,6 +11,7 @@ #include "hashmap.h" #include "list.h" #include "prioq.h" +#include "ratelimit.h" typedef enum EventSourceType { SOURCE_IO, @@ -61,6 +62,7 @@ struct sd_event_source { bool dispatching:1; bool floating:1; bool exit_on_failure:1; + bool ratelimited:1; int64_t priority; unsigned pending_index; @@ -72,6 +74,13 @@ struct sd_event_source { LIST_FIELDS(sd_event_source, sources); + RateLimit rate_limit; + + /* These are primarily fields relevant for time event sources, but since any event source can + * effectively become one when rate-limited, this is part of the common fields. */ + unsigned earliest_index; + unsigned latest_index; + union { struct { sd_event_io_handler_t callback; @@ -84,8 +93,6 @@ struct sd_event_source { struct { sd_event_time_handler_t callback; usec_t next, accuracy; - unsigned earliest_index; - unsigned latest_index; } time; struct { sd_event_signal_handler_t callback; diff --git a/src/libsystemd/sd-event/sd-event.c b/src/libsystemd/sd-event/sd-event.c index 789a8c7df4..3f1a6776fe 100644 --- a/src/libsystemd/sd-event/sd-event.c +++ b/src/libsystemd/sd-event/sd-event.c @@ -37,6 +37,16 @@ static bool EVENT_SOURCE_WATCH_PIDFD(sd_event_source *s) { s->child.options == WEXITED; } +static bool event_source_is_online(sd_event_source *s) { + assert(s); + return s->enabled != SD_EVENT_OFF && !s->ratelimited; +} + +static bool event_source_is_offline(sd_event_source *s) { + assert(s); + return s->enabled == SD_EVENT_OFF || s->ratelimited; +} + static const char* const event_source_type_table[_SOURCE_EVENT_SOURCE_TYPE_MAX] = { [SOURCE_IO] = "io", [SOURCE_TIME_REALTIME] = "realtime", @@ -55,7 +65,25 @@ static const char* const event_source_type_table[_SOURCE_EVENT_SOURCE_TYPE_MAX] DEFINE_PRIVATE_STRING_TABLE_LOOKUP_TO_STRING(event_source_type, int); -#define EVENT_SOURCE_IS_TIME(t) IN_SET((t), SOURCE_TIME_REALTIME, SOURCE_TIME_BOOTTIME, SOURCE_TIME_MONOTONIC, SOURCE_TIME_REALTIME_ALARM, SOURCE_TIME_BOOTTIME_ALARM) +#define EVENT_SOURCE_IS_TIME(t) \ + IN_SET((t), \ + SOURCE_TIME_REALTIME, \ + SOURCE_TIME_BOOTTIME, \ + SOURCE_TIME_MONOTONIC, \ + SOURCE_TIME_REALTIME_ALARM, \ + SOURCE_TIME_BOOTTIME_ALARM) + +#define EVENT_SOURCE_CAN_RATE_LIMIT(t) \ + IN_SET((t), \ + SOURCE_IO, \ + SOURCE_TIME_REALTIME, \ + SOURCE_TIME_BOOTTIME, \ + SOURCE_TIME_MONOTONIC, \ + SOURCE_TIME_REALTIME_ALARM, \ + SOURCE_TIME_BOOTTIME_ALARM, \ + SOURCE_SIGNAL, \ + SOURCE_DEFER, \ + SOURCE_INOTIFY) struct sd_event { unsigned n_ref; @@ -81,7 +109,7 @@ struct sd_event { Hashmap *signal_data; /* indexed by priority */ Hashmap *child_sources; - unsigned n_enabled_child_sources; + unsigned n_online_child_sources; Set *post_sources; @@ -120,7 +148,7 @@ struct sd_event { LIST_HEAD(sd_event_source, sources); - usec_t last_run, last_log; + usec_t last_run_usec, last_log_usec; unsigned delays[sizeof(usec_t) * 8]; }; @@ -146,6 +174,11 @@ static int pending_prioq_compare(const void *a, const void *b) { if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF) return 1; + /* Non rate-limited ones first. */ + r = CMP(!!x->ratelimited, !!y->ratelimited); + if (r != 0) + return r; + /* Lower priority values first */ r = CMP(x->priority, y->priority); if (r != 0) @@ -168,6 +201,11 @@ static int prepare_prioq_compare(const void *a, const void *b) { if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF) return 1; + /* Non rate-limited ones first. */ + r = CMP(!!x->ratelimited, !!y->ratelimited); + if (r != 0) + return r; + /* Move most recently prepared ones last, so that we can stop * preparing as soon as we hit one that has already been * prepared in the current iteration */ @@ -179,12 +217,30 @@ static int prepare_prioq_compare(const void *a, const void *b) { return CMP(x->priority, y->priority); } +static usec_t time_event_source_next(const sd_event_source *s) { + assert(s); + + /* We have two kinds of event sources that have elapsation times associated with them: the actual + * time based ones and the ones for which a ratelimit can be in effect (where we want to be notified + * once the ratelimit time window ends). Let's return the next elapsing time depending on what we are + * looking at here. */ + + if (s->ratelimited) { /* If rate-limited the next elapsation is when the ratelimit time window ends */ + assert(s->rate_limit.begin != 0); + assert(s->rate_limit.interval != 0); + return usec_add(s->rate_limit.begin, s->rate_limit.interval); + } + + /* Otherwise this must be a time event source, if not ratelimited */ + if (EVENT_SOURCE_IS_TIME(s->type)) + return s->time.next; + + return USEC_INFINITY; +} + static int earliest_time_prioq_compare(const void *a, const void *b) { const sd_event_source *x = a, *y = b; - assert(EVENT_SOURCE_IS_TIME(x->type)); - assert(x->type == y->type); - /* Enabled ones first */ if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF) return -1; @@ -198,19 +254,30 @@ static int earliest_time_prioq_compare(const void *a, const void *b) { return 1; /* Order by time */ - return CMP(x->time.next, y->time.next); + return CMP(time_event_source_next(x), time_event_source_next(y)); } static usec_t time_event_source_latest(const sd_event_source *s) { - return usec_add(s->time.next, s->time.accuracy); + assert(s); + + if (s->ratelimited) { /* For ratelimited stuff the earliest and the latest time shall actually be the + * same, as we should avoid adding additional inaccuracy on an inaccuracy time + * window */ + assert(s->rate_limit.begin != 0); + assert(s->rate_limit.interval != 0); + return usec_add(s->rate_limit.begin, s->rate_limit.interval); + } + + /* Must be a time event source, if not ratelimited */ + if (EVENT_SOURCE_IS_TIME(s->type)) + return usec_add(s->time.next, s->time.accuracy); + + return USEC_INFINITY; } static int latest_time_prioq_compare(const void *a, const void *b) { const sd_event_source *x = a, *y = b; - assert(EVENT_SOURCE_IS_TIME(x->type)); - assert(x->type == y->type); - /* Enabled ones first */ if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF) return -1; @@ -380,7 +447,7 @@ static void source_io_unregister(sd_event_source *s) { return; if (epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, s->io.fd, NULL) < 0) - log_debug_errno(errno, "Failed to remove source %s (type %s) from epoll: %m", + log_debug_errno(errno, "Failed to remove source %s (type %s) from epoll, ignoring: %m", strna(s->description), event_source_type_to_string(s->type)); s->io.registered = false; @@ -422,7 +489,7 @@ static void source_child_pidfd_unregister(sd_event_source *s) { if (EVENT_SOURCE_WATCH_PIDFD(s)) if (epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, s->child.pidfd, NULL) < 0) - log_debug_errno(errno, "Failed to remove source %s (type %s) from epoll: %m", + log_debug_errno(errno, "Failed to remove source %s (type %s) from epoll, ignoring: %m", strna(s->description), event_source_type_to_string(s->type)); s->child.registered = false; @@ -661,12 +728,12 @@ static void event_gc_signal_data(sd_event *e, const int64_t *priority, int sig) * and possibly drop the signalfd for it. */ if (sig == SIGCHLD && - e->n_enabled_child_sources > 0) + e->n_online_child_sources > 0) return; if (e->signal_sources && e->signal_sources[sig] && - e->signal_sources[sig]->enabled != SD_EVENT_OFF) + event_source_is_online(e->signal_sources[sig])) return; /* @@ -713,13 +780,32 @@ static void event_source_time_prioq_reshuffle(sd_event_source *s) { struct clock_data *d; assert(s); - assert(EVENT_SOURCE_IS_TIME(s->type)); /* Called whenever the event source's timer ordering properties changed, i.e. time, accuracy, * pending, enable state. Makes sure the two prioq's are ordered properly again. */ - assert_se(d = event_get_clock_data(s->event, s->type)); - prioq_reshuffle(d->earliest, s, &s->time.earliest_index); - prioq_reshuffle(d->latest, s, &s->time.latest_index); + + if (s->ratelimited) + d = &s->event->monotonic; + else { + assert(EVENT_SOURCE_IS_TIME(s->type)); + assert_se(d = event_get_clock_data(s->event, s->type)); + } + + prioq_reshuffle(d->earliest, s, &s->earliest_index); + prioq_reshuffle(d->latest, s, &s->latest_index); + d->needs_rearm = true; +} + +static void event_source_time_prioq_remove( + sd_event_source *s, + struct clock_data *d) { + + assert(s); + assert(d); + + prioq_remove(d->earliest, s, &s->earliest_index); + prioq_remove(d->latest, s, &s->latest_index); + s->earliest_index = s->latest_index = PRIOQ_IDX_NULL; d->needs_rearm = true; } @@ -745,17 +831,18 @@ static void source_disconnect(sd_event_source *s) { case SOURCE_TIME_BOOTTIME: case SOURCE_TIME_MONOTONIC: case SOURCE_TIME_REALTIME_ALARM: - case SOURCE_TIME_BOOTTIME_ALARM: { - struct clock_data *d; + case SOURCE_TIME_BOOTTIME_ALARM: + /* Only remove this event source from the time event source here if it is not ratelimited. If + * it is ratelimited, we'll remove it below, separately. Why? Because the clock used might + * differ: ratelimiting always uses CLOCK_MONOTONIC, but timer events might use any clock */ - d = event_get_clock_data(s->event, s->type); - assert(d); + if (!s->ratelimited) { + struct clock_data *d; + assert_se(d = event_get_clock_data(s->event, s->type)); + event_source_time_prioq_remove(s, d); + } - prioq_remove(d->earliest, s, &s->time.earliest_index); - prioq_remove(d->latest, s, &s->time.latest_index); - d->needs_rearm = true; break; - } case SOURCE_SIGNAL: if (s->signal.sig > 0) { @@ -770,9 +857,9 @@ static void source_disconnect(sd_event_source *s) { case SOURCE_CHILD: if (s->child.pid > 0) { - if (s->enabled != SD_EVENT_OFF) { - assert(s->event->n_enabled_child_sources > 0); - s->event->n_enabled_child_sources--; + if (event_source_is_online(s)) { + assert(s->event->n_online_child_sources > 0); + s->event->n_online_child_sources--; } (void) hashmap_remove(s->event->child_sources, PID_TO_PTR(s->child.pid)); @@ -842,6 +929,9 @@ static void source_disconnect(sd_event_source *s) { if (s->prepare) prioq_remove(s->event->prepare, s, &s->prepare_index); + if (s->ratelimited) + event_source_time_prioq_remove(s, &s->event->monotonic); + event = TAKE_PTR(s->event); LIST_REMOVE(sources, event->sources, s); event->n_sources--; @@ -1088,6 +1178,52 @@ static int time_exit_callback(sd_event_source *s, uint64_t usec, void *userdata) return sd_event_exit(sd_event_source_get_event(s), PTR_TO_INT(userdata)); } +static int setup_clock_data(sd_event *e, struct clock_data *d, clockid_t clock) { + int r; + + assert(d); + + if (d->fd < 0) { + r = event_setup_timer_fd(e, d, clock); + if (r < 0) + return r; + } + + r = prioq_ensure_allocated(&d->earliest, earliest_time_prioq_compare); + if (r < 0) + return r; + + r = prioq_ensure_allocated(&d->latest, latest_time_prioq_compare); + if (r < 0) + return r; + + return 0; +} + +static int event_source_time_prioq_put( + sd_event_source *s, + struct clock_data *d) { + + int r; + + assert(s); + assert(d); + + r = prioq_put(d->earliest, s, &s->earliest_index); + if (r < 0) + return r; + + r = prioq_put(d->latest, s, &s->latest_index); + if (r < 0) { + assert_se(prioq_remove(d->earliest, s, &s->earliest_index) > 0); + s->earliest_index = PRIOQ_IDX_NULL; + return r; + } + + d->needs_rearm = true; + return 0; +} + _public_ int sd_event_add_time( sd_event *e, sd_event_source **ret, @@ -1118,23 +1254,12 @@ _public_ int sd_event_add_time( if (!callback) callback = time_exit_callback; - d = event_get_clock_data(e, type); - assert(d); + assert_se(d = event_get_clock_data(e, type)); - r = prioq_ensure_allocated(&d->earliest, earliest_time_prioq_compare); + r = setup_clock_data(e, d, clock); if (r < 0) return r; - r = prioq_ensure_allocated(&d->latest, latest_time_prioq_compare); - if (r < 0) - return r; - - if (d->fd < 0) { - r = event_setup_timer_fd(e, d, clock); - if (r < 0) - return r; - } - s = source_new(e, !ret, type); if (!s) return -ENOMEM; @@ -1142,17 +1267,11 @@ _public_ int sd_event_add_time( s->time.next = usec; s->time.accuracy = accuracy == 0 ? DEFAULT_ACCURACY_USEC : accuracy; s->time.callback = callback; - s->time.earliest_index = s->time.latest_index = PRIOQ_IDX_NULL; + s->earliest_index = s->latest_index = PRIOQ_IDX_NULL; s->userdata = userdata; s->enabled = SD_EVENT_ONESHOT; - d->needs_rearm = true; - - r = prioq_put(d->earliest, s, &s->time.earliest_index); - if (r < 0) - return r; - - r = prioq_put(d->latest, s, &s->time.latest_index); + r = event_source_time_prioq_put(s, d); if (r < 0) return r; @@ -1285,7 +1404,7 @@ _public_ int sd_event_add_child( if (!callback) callback = child_exit_callback; - if (e->n_enabled_child_sources == 0) { + if (e->n_online_child_sources == 0) { /* Caller must block SIGCHLD before using us to watch children, even if pidfd is available, * for compatibility with pre-pidfd and because we don't want the reap the child processes * ourselves, i.e. call waitid(), and don't want Linux' default internal logic for that to @@ -1350,7 +1469,7 @@ _public_ int sd_event_add_child( e->need_process_child = true; } - e->n_enabled_child_sources++; + e->n_online_child_sources++; if (ret) *ret = s; @@ -1382,7 +1501,7 @@ _public_ int sd_event_add_child_pidfd( if (!callback) callback = child_exit_callback; - if (e->n_enabled_child_sources == 0) { + if (e->n_online_child_sources == 0) { r = signal_is_blocked(SIGCHLD); if (r < 0) return r; @@ -1432,7 +1551,7 @@ _public_ int sd_event_add_child_pidfd( e->need_process_child = true; } - e->n_enabled_child_sources++; + e->n_online_child_sources++; if (ret) *ret = s; @@ -2018,7 +2137,7 @@ _public_ int sd_event_source_set_io_fd(sd_event_source *s, int fd) { if (s->io.fd == fd) return 0; - if (s->enabled == SD_EVENT_OFF) { + if (event_source_is_offline(s)) { s->io.fd = fd; s->io.registered = false; } else { @@ -2085,7 +2204,7 @@ _public_ int sd_event_source_set_io_events(sd_event_source *s, uint32_t events) if (r < 0) return r; - if (s->enabled != SD_EVENT_OFF) { + if (event_source_is_online(s)) { r = source_io_register(s, s->enabled, events); if (r < 0) return r; @@ -2188,7 +2307,7 @@ _public_ int sd_event_source_set_priority(sd_event_source *s, int64_t priority) event_gc_inode_data(s->event, old_inode_data); - } else if (s->type == SOURCE_SIGNAL && s->enabled != SD_EVENT_OFF) { + } else if (s->type == SOURCE_SIGNAL && event_source_is_online(s)) { struct signal_data *old, *d; /* Move us from the signalfd belonging to the old @@ -2225,29 +2344,39 @@ fail: return r; } -_public_ int sd_event_source_get_enabled(sd_event_source *s, int *m) { +_public_ int sd_event_source_get_enabled(sd_event_source *s, int *ret) { assert_return(s, -EINVAL); assert_return(!event_pid_changed(s->event), -ECHILD); - if (m) - *m = s->enabled; + if (ret) + *ret = s->enabled; + return s->enabled != SD_EVENT_OFF; } -static int event_source_disable(sd_event_source *s) { +static int event_source_offline( + sd_event_source *s, + int enabled, + bool ratelimited) { + + bool was_offline; int r; assert(s); - assert(s->enabled != SD_EVENT_OFF); + assert(enabled == SD_EVENT_OFF || ratelimited); /* Unset the pending flag when this event source is disabled */ - if (!IN_SET(s->type, SOURCE_DEFER, SOURCE_EXIT)) { + if (s->enabled != SD_EVENT_OFF && + enabled == SD_EVENT_OFF && + !IN_SET(s->type, SOURCE_DEFER, SOURCE_EXIT)) { r = source_set_pending(s, false); if (r < 0) return r; } - s->enabled = SD_EVENT_OFF; + was_offline = event_source_is_offline(s); + s->enabled = enabled; + s->ratelimited = ratelimited; switch (s->type) { @@ -2268,8 +2397,10 @@ static int event_source_disable(sd_event_source *s) { break; case SOURCE_CHILD: - assert(s->event->n_enabled_child_sources > 0); - s->event->n_enabled_child_sources--; + if (!was_offline) { + assert(s->event->n_online_child_sources > 0); + s->event->n_online_child_sources--; + } if (EVENT_SOURCE_WATCH_PIDFD(s)) source_child_pidfd_unregister(s); @@ -2290,26 +2421,42 @@ static int event_source_disable(sd_event_source *s) { assert_not_reached("Wut? I shouldn't exist."); } - return 0; + return 1; } -static int event_source_enable(sd_event_source *s, int enable) { +static int event_source_online( + sd_event_source *s, + int enabled, + bool ratelimited) { + + bool was_online; int r; assert(s); - assert(IN_SET(enable, SD_EVENT_ON, SD_EVENT_ONESHOT)); - assert(s->enabled == SD_EVENT_OFF); + assert(enabled != SD_EVENT_OFF || !ratelimited); /* Unset the pending flag when this event source is enabled */ - if (!IN_SET(s->type, SOURCE_DEFER, SOURCE_EXIT)) { + if (s->enabled == SD_EVENT_OFF && + enabled != SD_EVENT_OFF && + !IN_SET(s->type, SOURCE_DEFER, SOURCE_EXIT)) { r = source_set_pending(s, false); if (r < 0) return r; } + /* Are we really ready for onlining? */ + if (enabled == SD_EVENT_OFF || ratelimited) { + /* Nope, we are not ready for onlining, then just update the precise state and exit */ + s->enabled = enabled; + s->ratelimited = ratelimited; + return 0; + } + + was_online = event_source_is_online(s); + switch (s->type) { case SOURCE_IO: - r = source_io_register(s, enable, s->io.events); + r = source_io_register(s, enabled, s->io.events); if (r < 0) return r; break; @@ -2327,7 +2474,7 @@ static int event_source_enable(sd_event_source *s, int enable) { if (EVENT_SOURCE_WATCH_PIDFD(s)) { /* yes, we have pidfd */ - r = source_child_pidfd_register(s, enable); + r = source_child_pidfd_register(s, enabled); if (r < 0) return r; } else { @@ -2340,8 +2487,8 @@ static int event_source_enable(sd_event_source *s, int enable) { } } - s->event->n_enabled_child_sources++; - + if (!was_online) + s->event->n_online_child_sources++; break; case SOURCE_TIME_REALTIME: @@ -2359,7 +2506,8 @@ static int event_source_enable(sd_event_source *s, int enable) { assert_not_reached("Wut? I shouldn't exist."); } - s->enabled = enable; + s->enabled = enabled; + s->ratelimited = ratelimited; /* Non-failing operations below */ switch (s->type) { @@ -2379,7 +2527,7 @@ static int event_source_enable(sd_event_source *s, int enable) { break; } - return 0; + return 1; } _public_ int sd_event_source_set_enabled(sd_event_source *s, int m) { @@ -2397,7 +2545,7 @@ _public_ int sd_event_source_set_enabled(sd_event_source *s, int m) { return 0; if (m == SD_EVENT_OFF) - r = event_source_disable(s); + r = event_source_offline(s, m, s->ratelimited); else { if (s->enabled != SD_EVENT_OFF) { /* Switching from "on" to "oneshot" or back? If that's the case, we can take a shortcut, the @@ -2406,7 +2554,7 @@ _public_ int sd_event_source_set_enabled(sd_event_source *s, int m) { return 0; } - r = event_source_enable(s, m); + r = event_source_online(s, m, s->ratelimited); } if (r < 0) return r; @@ -2663,6 +2811,96 @@ _public_ void *sd_event_source_set_userdata(sd_event_source *s, void *userdata) return ret; } +static int event_source_enter_ratelimited(sd_event_source *s) { + int r; + + assert(s); + + /* When an event source becomes ratelimited, we place it in the CLOCK_MONOTONIC priority queue, with + * the end of the rate limit time window, much as if it was a timer event source. */ + + if (s->ratelimited) + return 0; /* Already ratelimited, this is a NOP hence */ + + /* Make sure we can install a CLOCK_MONOTONIC event further down. */ + r = setup_clock_data(s->event, &s->event->monotonic, CLOCK_MONOTONIC); + if (r < 0) + return r; + + /* Timer event sources are already using the earliest/latest queues for the timer scheduling. Let's + * first remove them from the prioq appropriate for their own clock, so that we can use the prioq + * fields of the event source then for adding it to the CLOCK_MONOTONIC prioq instead. */ + if (EVENT_SOURCE_IS_TIME(s->type)) + event_source_time_prioq_remove(s, event_get_clock_data(s->event, s->type)); + + /* Now, let's add the event source to the monotonic clock instead */ + r = event_source_time_prioq_put(s, &s->event->monotonic); + if (r < 0) + goto fail; + + /* And let's take the event source officially offline */ + r = event_source_offline(s, s->enabled, /* ratelimited= */ true); + if (r < 0) { + event_source_time_prioq_remove(s, &s->event->monotonic); + goto fail; + } + + event_source_pp_prioq_reshuffle(s); + + log_debug("Event source %p (%s) entered rate limit state.", s, strna(s->description)); + return 0; + +fail: + /* Reinstall time event sources in the priority queue as before. This shouldn't fail, since the queue + * space for it should already be allocated. */ + if (EVENT_SOURCE_IS_TIME(s->type)) + assert_se(event_source_time_prioq_put(s, event_get_clock_data(s->event, s->type)) >= 0); + + return r; +} + +static int event_source_leave_ratelimit(sd_event_source *s) { + int r; + + assert(s); + + if (!s->ratelimited) + return 0; + + /* Let's take the event source out of the monotonic prioq first. */ + event_source_time_prioq_remove(s, &s->event->monotonic); + + /* Let's then add the event source to its native clock prioq again — if this is a timer event source */ + if (EVENT_SOURCE_IS_TIME(s->type)) { + r = event_source_time_prioq_put(s, event_get_clock_data(s->event, s->type)); + if (r < 0) + goto fail; + } + + /* Let's try to take it online again. */ + r = event_source_online(s, s->enabled, /* ratelimited= */ false); + if (r < 0) { + /* Do something roughly sensible when this failed: undo the two prioq ops above */ + if (EVENT_SOURCE_IS_TIME(s->type)) + event_source_time_prioq_remove(s, event_get_clock_data(s->event, s->type)); + + goto fail; + } + + event_source_pp_prioq_reshuffle(s); + ratelimit_reset(&s->rate_limit); + + log_debug("Event source %p (%s) left rate limit state.", s, strna(s->description)); + return 0; + +fail: + /* Do something somewhat reasonable when we cannot move an event sources out of ratelimited mode: + * simply put it back in it, maybe we can then process it more successfully next iteration. */ + assert_se(event_source_time_prioq_put(s, &s->event->monotonic) >= 0); + + return r; +} + static usec_t sleep_between(sd_event *e, usec_t a, usec_t b) { usec_t c; assert(e); @@ -2760,7 +2998,7 @@ static int event_arm_timer( d->needs_rearm = false; a = prioq_peek(d->earliest); - if (!a || a->enabled == SD_EVENT_OFF || a->time.next == USEC_INFINITY) { + if (!a || a->enabled == SD_EVENT_OFF || time_event_source_next(a) == USEC_INFINITY) { if (d->fd < 0) return 0; @@ -2779,7 +3017,7 @@ static int event_arm_timer( b = prioq_peek(d->latest); assert_se(b && b->enabled != SD_EVENT_OFF); - t = sleep_between(e, a->time.next, time_event_source_latest(b)); + t = sleep_between(e, time_event_source_next(a), time_event_source_latest(b)); if (d->next == t) return 0; @@ -2857,10 +3095,22 @@ static int process_timer( for (;;) { s = prioq_peek(d->earliest); - if (!s || - s->time.next > n || - s->enabled == SD_EVENT_OFF || - s->pending) + if (!s || time_event_source_next(s) > n) + break; + + if (s->ratelimited) { + /* This is an event sources whose ratelimit window has ended. Let's turn it on + * again. */ + assert(s->ratelimited); + + r = event_source_leave_ratelimit(s); + if (r < 0) + return r; + + continue; + } + + if (s->enabled == SD_EVENT_OFF || s->pending) break; r = source_set_pending(s, true); @@ -2905,7 +3155,7 @@ static int process_child(sd_event *e) { if (s->pending) continue; - if (s->enabled == SD_EVENT_OFF) + if (event_source_is_offline(s)) continue; if (s->child.exited) @@ -2952,7 +3202,7 @@ static int process_pidfd(sd_event *e, sd_event_source *s, uint32_t revents) { if (s->pending) return 0; - if (s->enabled == SD_EVENT_OFF) + if (event_source_is_offline(s)) return 0; if (!EVENT_SOURCE_WATCH_PIDFD(s)) @@ -3112,7 +3362,7 @@ static int event_inotify_data_process(sd_event *e, struct inotify_data *d) { LIST_FOREACH(inotify.by_inode_data, s, inode_data->event_sources) { - if (s->enabled == SD_EVENT_OFF) + if (event_source_is_offline(s)) continue; r = source_set_pending(s, true); @@ -3148,7 +3398,7 @@ static int event_inotify_data_process(sd_event *e, struct inotify_data *d) { * sources if IN_IGNORED or IN_UNMOUNT is set. */ LIST_FOREACH(inotify.by_inode_data, s, inode_data->event_sources) { - if (s->enabled == SD_EVENT_OFF) + if (event_source_is_offline(s)) continue; if ((d->buffer.ev.mask & (IN_IGNORED|IN_UNMOUNT)) == 0 && @@ -3202,6 +3452,16 @@ static int source_dispatch(sd_event_source *s) { * callback might have invalidated/disconnected the event source. */ saved_event = sd_event_ref(s->event); + /* Check if we hit the ratelimit for this event source, if so, let's disable it. */ + assert(!s->ratelimited); + if (!ratelimit_below(&s->rate_limit)) { + r = event_source_enter_ratelimited(s); + if (r < 0) + return r; + + return 1; + } + if (!IN_SET(s->type, SOURCE_DEFER, SOURCE_EXIT)) { r = source_set_pending(s, false); if (r < 0) @@ -3215,7 +3475,7 @@ static int source_dispatch(sd_event_source *s) { * post sources as pending */ SET_FOREACH(z, s->event->post_sources) { - if (z->enabled == SD_EVENT_OFF) + if (event_source_is_offline(z)) continue; r = source_set_pending(z, true); @@ -3335,7 +3595,7 @@ static int event_prepare(sd_event *e) { sd_event_source *s; s = prioq_peek(e->prepare); - if (!s || s->prepare_iteration == e->iteration || s->enabled == SD_EVENT_OFF) + if (!s || s->prepare_iteration == e->iteration || event_source_is_offline(s)) break; s->prepare_iteration = e->iteration; @@ -3370,18 +3630,17 @@ static int event_prepare(sd_event *e) { static int dispatch_exit(sd_event *e) { sd_event_source *p; - _cleanup_(sd_event_unrefp) sd_event *ref = NULL; int r; assert(e); p = prioq_peek(e->exit); - if (!p || p->enabled == SD_EVENT_OFF) { + if (!p || event_source_is_offline(p)) { e->state = SD_EVENT_FINISHED; return 0; } - ref = sd_event_ref(e); + _unused_ _cleanup_(sd_event_unrefp) sd_event *ref = sd_event_ref(e); e->iteration++; e->state = SD_EVENT_EXITING; r = source_dispatch(p); @@ -3398,7 +3657,7 @@ static sd_event_source* event_next_pending(sd_event *e) { if (!p) return NULL; - if (p->enabled == SD_EVENT_OFF) + if (event_source_is_offline(p)) return NULL; return p; @@ -3477,6 +3736,9 @@ _public_ int sd_event_prepare(sd_event *e) { * syscalls */ assert_return(!e->default_event_ptr || e->tid == gettid(), -EREMOTEIO); + /* Make sure that none of the preparation callbacks ends up freeing the event source under our feet */ + _unused_ _cleanup_(sd_event_unrefp) sd_event *ref = sd_event_ref(e); + if (e->exit_requested) goto pending; @@ -3682,9 +3944,8 @@ _public_ int sd_event_dispatch(sd_event *e) { p = event_next_pending(e); if (p) { - _cleanup_(sd_event_unrefp) sd_event *ref = NULL; + _unused_ _cleanup_(sd_event_unrefp) sd_event *ref = sd_event_ref(e); - ref = sd_event_ref(e); e->state = SD_EVENT_RUNNING; r = source_dispatch(p); e->state = SD_EVENT_INITIAL; @@ -3718,29 +3979,32 @@ _public_ int sd_event_run(sd_event *e, uint64_t timeout) { assert_return(e->state != SD_EVENT_FINISHED, -ESTALE); assert_return(e->state == SD_EVENT_INITIAL, -EBUSY); - if (e->profile_delays && e->last_run) { + if (e->profile_delays && e->last_run_usec != 0) { usec_t this_run; unsigned l; this_run = now(CLOCK_MONOTONIC); - l = u64log2(this_run - e->last_run); + l = u64log2(this_run - e->last_run_usec); assert(l < ELEMENTSOF(e->delays)); e->delays[l]++; - if (this_run - e->last_log >= 5*USEC_PER_SEC) { + if (this_run - e->last_log_usec >= 5*USEC_PER_SEC) { event_log_delays(e); - e->last_log = this_run; + e->last_log_usec = this_run; } } + /* Make sure that none of the preparation callbacks ends up freeing the event source under our feet */ + _unused_ _cleanup_(sd_event_unrefp) sd_event *ref = sd_event_ref(e); + r = sd_event_prepare(e); if (r == 0) /* There was nothing? Then wait... */ r = sd_event_wait(e, timeout); if (e->profile_delays) - e->last_run = now(CLOCK_MONOTONIC); + e->last_run_usec = now(CLOCK_MONOTONIC); if (r > 0) { /* There's something now, then let's dispatch it */ @@ -3755,7 +4019,6 @@ _public_ int sd_event_run(sd_event *e, uint64_t timeout) { } _public_ int sd_event_loop(sd_event *e) { - _cleanup_(sd_event_unrefp) sd_event *ref = NULL; int r; assert_return(e, -EINVAL); @@ -3763,7 +4026,7 @@ _public_ int sd_event_loop(sd_event *e) { assert_return(!event_pid_changed(e), -ECHILD); assert_return(e->state == SD_EVENT_INITIAL, -EBUSY); - ref = sd_event_ref(e); + _unused_ _cleanup_(sd_event_unrefp) sd_event *ref = NULL; while (e->state != SD_EVENT_FINISHED) { r = sd_event_run(e, (uint64_t) -1); @@ -4008,3 +4271,53 @@ _public_ int sd_event_source_set_exit_on_failure(sd_event_source *s, int b) { s->exit_on_failure = b; return 1; } + +_public_ int sd_event_source_set_ratelimit(sd_event_source *s, uint64_t interval, unsigned burst) { + int r; + + assert_return(s, -EINVAL); + + /* Turning on ratelimiting on event source types that don't support it, is a loggable offense. Doing + * so is a programming error. */ + assert_return(EVENT_SOURCE_CAN_RATE_LIMIT(s->type), -EDOM); + + /* When ratelimiting is configured we'll always reset the rate limit state first and start fresh, + * non-ratelimited. */ + r = event_source_leave_ratelimit(s); + if (r < 0) + return r; + + s->rate_limit = (RateLimit) { interval, burst }; + return 0; +} + +_public_ int sd_event_source_get_ratelimit(sd_event_source *s, uint64_t *ret_interval, unsigned *ret_burst) { + assert_return(s, -EINVAL); + + /* Querying whether an event source has ratelimiting configured is not a loggable offsense, hence + * don't use assert_return(). Unlike turning on ratelimiting it's not really a programming error */ + if (!EVENT_SOURCE_CAN_RATE_LIMIT(s->type)) + return -EDOM; + + if (!ratelimit_configured(&s->rate_limit)) + return -ENOEXEC; + + if (ret_interval) + *ret_interval = s->rate_limit.interval; + if (ret_burst) + *ret_burst = s->rate_limit.burst; + + return 0; +} + +_public_ int sd_event_source_is_ratelimited(sd_event_source *s) { + assert_return(s, -EINVAL); + + if (!EVENT_SOURCE_CAN_RATE_LIMIT(s->type)) + return false; + + if (!ratelimit_configured(&s->rate_limit)) + return false; + + return s->ratelimited; +} diff --git a/src/libsystemd/sd-event/test-event.c b/src/libsystemd/sd-event/test-event.c index 1c4d0e25ab..41745338bf 100644 --- a/src/libsystemd/sd-event/test-event.c +++ b/src/libsystemd/sd-event/test-event.c @@ -589,8 +589,100 @@ static void test_pidfd(void) { sd_event_unref(e); } +static int ratelimit_io_handler(sd_event_source *s, int fd, uint32_t revents, void *userdata) { + unsigned *c = (unsigned*) userdata; + *c += 1; + return 0; +} + +static int ratelimit_time_handler(sd_event_source *s, uint64_t usec, void *userdata) { + int r; + + r = sd_event_source_set_enabled(s, SD_EVENT_ON); + if (r < 0) + log_warning_errno(r, "Failed to turn on notify event source: %m"); + + r = sd_event_source_set_time(s, usec + 1000); + if (r < 0) + log_error_errno(r, "Failed to restart watchdog event source: %m"); + + unsigned *c = (unsigned*) userdata; + *c += 1; + + return 0; +} + +static void test_ratelimit(void) { + _cleanup_close_pair_ int p[2] = {-1, -1}; + _cleanup_(sd_event_unrefp) sd_event *e = NULL; + _cleanup_(sd_event_source_unrefp) sd_event_source *s = NULL; + uint64_t interval; + unsigned count, burst; + + assert_se(sd_event_default(&e) >= 0); + assert_se(pipe2(p, O_CLOEXEC|O_NONBLOCK) >= 0); + + assert_se(sd_event_add_io(e, &s, p[0], EPOLLIN, ratelimit_io_handler, &count) >= 0); + assert_se(sd_event_source_set_description(s, "test-ratelimit-io") >= 0); + assert_se(sd_event_source_set_ratelimit(s, 1 * USEC_PER_SEC, 5) >= 0); + assert_se(sd_event_source_get_ratelimit(s, &interval, &burst) >= 0); + assert_se(interval == 1 * USEC_PER_SEC && burst == 5); + + assert_se(write(p[1], "1", 1) == 1); + + count = 0; + for (unsigned i = 0; i < 10; i++) { + log_debug("slow loop iteration %u", i); + assert_se(sd_event_run(e, UINT64_MAX) >= 0); + assert_se(usleep(250 * USEC_PER_MSEC) >= 0); + } + + assert_se(sd_event_source_is_ratelimited(s) == 0); + assert_se(count == 10); + log_info("ratelimit_io_handler: called %d times, event source not ratelimited", count); + + assert_se(sd_event_source_set_ratelimit(s, 0, 0) >= 0); + assert_se(sd_event_source_set_ratelimit(s, 1 * USEC_PER_SEC, 5) >= 0); + + count = 0; + for (unsigned i = 0; i < 10; i++) { + log_debug("fast event loop iteration %u", i); + assert_se(sd_event_run(e, UINT64_MAX) >= 0); + assert_se(usleep(10) >= 0); + } + log_info("ratelimit_io_handler: called %d times, event source got ratelimited", count); + assert_se(count < 10); + + s = sd_event_source_unref(s); + safe_close_pair(p); + + count = 0; + assert_se(sd_event_add_time_relative(e, &s, CLOCK_MONOTONIC, 1000, 1, ratelimit_time_handler, &count) >= 0); + assert_se(sd_event_source_set_ratelimit(s, 1 * USEC_PER_SEC, 10) == 0); + + do { + assert_se(sd_event_run(e, UINT64_MAX) >= 0); + } while (!sd_event_source_is_ratelimited(s)); + + log_info("ratelimit_time_handler: called %d times, event source got ratelimited", count); + assert_se(count == 10); + + /* In order to get rid of active rate limit client needs to disable it explicitely */ + assert_se(sd_event_source_set_ratelimit(s, 0, 0) >= 0); + assert_se(!sd_event_source_is_ratelimited(s)); + + assert_se(sd_event_source_set_ratelimit(s, 1 * USEC_PER_SEC, 10) >= 0); + + do { + assert_se(sd_event_run(e, UINT64_MAX) >= 0); + } while (!sd_event_source_is_ratelimited(s)); + + log_info("ratelimit_time_handler: called 10 more times, event source got ratelimited"); + assert_se(count == 20); +} + int main(int argc, char *argv[]) { - test_setup_logging(LOG_INFO); + test_setup_logging(LOG_DEBUG); test_basic(true); /* test with pidfd */ test_basic(false); /* test without pidfd */ @@ -603,5 +695,7 @@ int main(int argc, char *argv[]) { test_pidfd(); + test_ratelimit(); + return 0; } diff --git a/src/systemd/sd-event.h b/src/systemd/sd-event.h index 937c9bd460..2ae2a0da48 100644 --- a/src/systemd/sd-event.h +++ b/src/systemd/sd-event.h @@ -162,6 +162,9 @@ int sd_event_source_get_floating(sd_event_source *s); int sd_event_source_set_floating(sd_event_source *s, int b); int sd_event_source_get_exit_on_failure(sd_event_source *s); int sd_event_source_set_exit_on_failure(sd_event_source *s, int b); +int sd_event_source_set_ratelimit(sd_event_source *s, uint64_t interval_usec, unsigned burst); +int sd_event_source_get_ratelimit(sd_event_source *s, uint64_t *ret_interval_usec, unsigned *ret_burst); +int sd_event_source_is_ratelimited(sd_event_source *s); /* Define helpers so that __attribute__((cleanup(sd_event_unrefp))) and similar may be used. */ _SD_DEFINE_POINTER_CLEANUP_FUNC(sd_event, sd_event_unref);