integ/base/systemd/centos/patches/919-sd-event-add-ability-to-ratelimit-event-sources.patch
Li Zhou ccfeeef59d systemd: Prevent excessive /proc/1/mountinfo reparsing
Backport the patches for this issue:
https://bugzilla.redhat.com/show_bug.cgi?id=1819868

We encountered the following issue:
When testing with a large number of pods (> 230), we occasionally observed
several problems related to the systemd process:
    systemd ran continually at 90-100% CPU usage
    systemd memory usage started increasing rapidly (20GB/hour)
    systemctl commands would always time out (Failed to get properties:
        Connection timed out)
    sm services failed and could not recover: open-ldap,
        registry-token-server, docker-distribution, etcd
    new pods could not start and got stuck in state ContainerCreating

These patches prevent excessive /proc/1/mountinfo reparsing.
It has been verified that they greatly improve performance in the
scenario described above.

The 16 commits for the issue are listed in sequence (from [1] to [16])
at the link below:
https://github.com/systemd-rhel/rhel-8/pull/154/commits

[16](10)core: prevent excessive /proc/self/mountinfo parsing
[15][Dropped-6]test: add ratelimiting test
[14](9)sd-event: add ability to ratelimit event sources
[13](8)sd-event: increase n_enabled_child_sources just once
[12](7)sd-event: update state at the end in event_source_enable
[11](6)sd-event: remove earliest_index/latest_index into common part of
event source objects
[10][Dropped-5]sd-event: follow coding style with naming return
parameter
[9] [Dropped-4]sd-event: ref event loop while in sd_event_prepare() ot
sd_event_run()
[8] (5)sd-event: refuse running default event loops in any other thread
than the one they are default for
[7] [Dropped-3]sd-event: let's suffix last_run/last_log with "_usec"
[6] [Dropped-2]sd-event: fix delays assert brain-o (#17790)
[5] (4)sd-event: split out code to add/remove timer event sources to
earliest/latest prioq
[4] (3)sd-event: split clock data allocation out of sd_event_add_time()
[3] [Dropped-1]sd-event: mention that two debug logged events are
ignored
[2] (2)sd-event: split out enable and disable codepaths from
sd_event_source_set_enabled()
[1] (1)sd-event: split out helper functions for reshuffling prioqs

I ported 10 of them back (from (1) to (10)) to fix this issue
and dropped the other 6 (from [Dropped-1] to [Dropped-6]) for the
following reasons:
[Dropped-1]Only changes error log.
[Dropped-2]Fixes a bug introduced in a commit which doesn't exist in
this version.
[Dropped-3]Only renames variables; there is no functional change.
[Dropped-4]Merging it requires more commits, and it does not
contribute to adding the rate-limiting ability.
[Dropped-5]Changes coding style for a function which isn't really used
by anyone.
[Dropped-6]Adds test cases.

Closes-Bug: #1924686
Signed-off-by: Li Zhou <li.zhou@windriver.com>
Change-Id: Ia4c8f162cb1a47b40d1b26cf4d604976b97e92d6
2021-04-22 22:09:33 -04:00

From 69266c451910d2b57313b2fe7561e07cd5400d27 Mon Sep 17 00:00:00 2001
From: Lennart Poettering <lennart@poettering.net>
Date: Mon, 23 Nov 2020 18:02:40 +0100
Subject: [PATCH 19/20] sd-event: add ability to ratelimit event sources
Let's add a concept of "rate limiting" to event sources: if specific event
sources fire too often in some time interval, temporarily take them
offline, and take them back online once the interval has passed.
This is a simple scheme of avoiding starvation of event sources if some
event source fires too often.
This introduces the new conceptual states of "offline" and "online" for
event sources: an event source is "online" only when enabled *and* not
ratelimited, and offline in all other cases. An event source that is
online hence has its fds registered in the epoll, its signals in the
signalfd and so on.
(cherry picked from commit b6d5481b3d9f7c9b1198ab54b54326ec73e855bf)
Related: #1819868
[commit 395eb7753a9772f505102fbbe3ba3261b57abbe9 from
https://github.com/systemd-rhel/rhel-8/
LZ: Moved the changes in libsystemd.sym to libsystemd.sym.m4 from the
file changing history; patch ratelimit.h in its old path; dropped
SOURCE_INOTIFY related parts in sd-event.c because it hasn't been
added in this systemd version.]
Signed-off-by: Li Zhou <li.zhou@windriver.com>
---
src/libsystemd/libsystemd.sym.m4 | 7 +
src/libsystemd/sd-event/sd-event.c | 427 +++++++++++++++++++++++------
src/shared/ratelimit.h | 8 +
src/systemd/sd-event.h | 3 +
4 files changed, 365 insertions(+), 80 deletions(-)
diff --git a/src/libsystemd/libsystemd.sym.m4 b/src/libsystemd/libsystemd.sym.m4
index b1c2b43..ceb5d7f 100644
--- a/src/libsystemd/libsystemd.sym.m4
+++ b/src/libsystemd/libsystemd.sym.m4
@@ -169,6 +169,13 @@ global:
sd_journal_has_persistent_files;
} LIBSYSTEMD_219;
+LIBSYSTEMD_248 {
+global:
+ sd_event_source_set_ratelimit;
+ sd_event_source_get_ratelimit;
+ sd_event_source_is_ratelimited;
+} LIBSYSTEMD_229;
+
m4_ifdef(`ENABLE_KDBUS',
LIBSYSTEMD_FUTURE {
global:
diff --git a/src/libsystemd/sd-event/sd-event.c b/src/libsystemd/sd-event/sd-event.c
index 69dd02b..a3ade40 100644
--- a/src/libsystemd/sd-event/sd-event.c
+++ b/src/libsystemd/sd-event/sd-event.c
@@ -32,6 +32,7 @@
#include "util.h"
#include "time-util.h"
#include "missing.h"
+#include "ratelimit.h"
#include "set.h"
#include "list.h"
@@ -67,7 +68,24 @@ typedef enum WakeupType {
_WAKEUP_TYPE_INVALID = -1,
} WakeupType;
-#define EVENT_SOURCE_IS_TIME(t) IN_SET((t), SOURCE_TIME_REALTIME, SOURCE_TIME_BOOTTIME, SOURCE_TIME_MONOTONIC, SOURCE_TIME_REALTIME_ALARM, SOURCE_TIME_BOOTTIME_ALARM)
+#define EVENT_SOURCE_IS_TIME(t) \
+ IN_SET((t), \
+ SOURCE_TIME_REALTIME, \
+ SOURCE_TIME_BOOTTIME, \
+ SOURCE_TIME_MONOTONIC, \
+ SOURCE_TIME_REALTIME_ALARM, \
+ SOURCE_TIME_BOOTTIME_ALARM)
+
+#define EVENT_SOURCE_CAN_RATE_LIMIT(t) \
+ IN_SET((t), \
+ SOURCE_IO, \
+ SOURCE_TIME_REALTIME, \
+ SOURCE_TIME_BOOTTIME, \
+ SOURCE_TIME_MONOTONIC, \
+ SOURCE_TIME_REALTIME_ALARM, \
+ SOURCE_TIME_BOOTTIME_ALARM, \
+ SOURCE_SIGNAL, \
+ SOURCE_DEFER)
struct sd_event_source {
WakeupType wakeup;
@@ -85,6 +103,7 @@ struct sd_event_source {
bool pending:1;
bool dispatching:1;
bool floating:1;
+ bool ratelimited:1;
int64_t priority;
unsigned pending_index;
@@ -94,6 +113,10 @@ struct sd_event_source {
LIST_FIELDS(sd_event_source, sources);
+ RateLimit rate_limit;
+
+ /* These are primarily fields relevant for time event sources, but since any event source can
+ * effectively become one when rate-limited, this is part of the common fields. */
unsigned earliest_index;
unsigned latest_index;
@@ -188,7 +211,7 @@ struct sd_event {
Hashmap *signal_data; /* indexed by priority */
Hashmap *child_sources;
- unsigned n_enabled_child_sources;
+ unsigned n_online_child_sources;
Set *post_sources;
@@ -219,8 +242,19 @@ struct sd_event {
static void source_disconnect(sd_event_source *s);
+static bool event_source_is_online(sd_event_source *s) {
+ assert(s);
+ return s->enabled != SD_EVENT_OFF && !s->ratelimited;
+}
+
+static bool event_source_is_offline(sd_event_source *s) {
+ assert(s);
+ return s->enabled == SD_EVENT_OFF || s->ratelimited;
+}
+
static int pending_prioq_compare(const void *a, const void *b) {
const sd_event_source *x = a, *y = b;
+ int r;
assert(x->pending);
assert(y->pending);
@@ -231,23 +265,23 @@ static int pending_prioq_compare(const void *a, const void *b) {
if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
return 1;
+ /* Non rate-limited ones first. */
+ r = CMP(!!x->ratelimited, !!y->ratelimited);
+ if (r != 0)
+ return r;
+
/* Lower priority values first */
- if (x->priority < y->priority)
- return -1;
- if (x->priority > y->priority)
- return 1;
+ r = CMP(x->priority, y->priority);
+ if (r != 0)
+ return r;
/* Older entries first */
- if (x->pending_iteration < y->pending_iteration)
- return -1;
- if (x->pending_iteration > y->pending_iteration)
- return 1;
-
- return 0;
+ return CMP(x->pending_iteration, y->pending_iteration);
}
static int prepare_prioq_compare(const void *a, const void *b) {
const sd_event_source *x = a, *y = b;
+ int r;
assert(x->prepare);
assert(y->prepare);
@@ -258,29 +292,46 @@ static int prepare_prioq_compare(const void *a, const void *b) {
if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
return 1;
+ /* Non rate-limited ones first. */
+ r = CMP(!!x->ratelimited, !!y->ratelimited);
+ if (r != 0)
+ return r;
+
/* Move most recently prepared ones last, so that we can stop
* preparing as soon as we hit one that has already been
* prepared in the current iteration */
- if (x->prepare_iteration < y->prepare_iteration)
- return -1;
- if (x->prepare_iteration > y->prepare_iteration)
- return 1;
+ r = CMP(x->prepare_iteration, y->prepare_iteration);
+ if (r != 0)
+ return r;
/* Lower priority values first */
- if (x->priority < y->priority)
- return -1;
- if (x->priority > y->priority)
- return 1;
+ return CMP(x->priority, y->priority);
+}
- return 0;
+static usec_t time_event_source_next(const sd_event_source *s) {
+ assert(s);
+
+ /* We have two kinds of event sources that have elapsation times associated with them: the actual
+ * time based ones and the ones for which a ratelimit can be in effect (where we want to be notified
+ * once the ratelimit time window ends). Let's return the next elapsing time depending on what we are
+ * looking at here. */
+
+ if (s->ratelimited) { /* If rate-limited the next elapsation is when the ratelimit time window ends */
+ assert(s->rate_limit.begin != 0);
+ assert(s->rate_limit.interval != 0);
+ return usec_add(s->rate_limit.begin, s->rate_limit.interval);
+ }
+
+ /* Otherwise this must be a time event source, if not ratelimited */
+ if (EVENT_SOURCE_IS_TIME(s->type))
+ return s->time.next;
+
+ return USEC_INFINITY;
}
static int earliest_time_prioq_compare(const void *a, const void *b) {
const sd_event_source *x = a, *y = b;
- assert(EVENT_SOURCE_IS_TIME(x->type));
- assert(x->type == y->type);
-
/* Enabled ones first */
if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
return -1;
@@ -294,24 +345,30 @@ static int earliest_time_prioq_compare(const void *a, const void *b) {
return 1;
/* Order by time */
- if (x->time.next < y->time.next)
- return -1;
- if (x->time.next > y->time.next)
- return 1;
-
- return 0;
+ return CMP(time_event_source_next(x), time_event_source_next(y));
}
static usec_t time_event_source_latest(const sd_event_source *s) {
- return usec_add(s->time.next, s->time.accuracy);
+ assert(s);
+
+ if (s->ratelimited) { /* For ratelimited stuff the earliest and the latest time shall actually be the
+ * same, as we should avoid adding additional inaccuracy on an inaccuracy time
+ * window */
+ assert(s->rate_limit.begin != 0);
+ assert(s->rate_limit.interval != 0);
+ return usec_add(s->rate_limit.begin, s->rate_limit.interval);
+ }
+
+ /* Must be a time event source, if not ratelimited */
+ if (EVENT_SOURCE_IS_TIME(s->type))
+ return usec_add(s->time.next, s->time.accuracy);
+
+ return USEC_INFINITY;
}
static int latest_time_prioq_compare(const void *a, const void *b) {
const sd_event_source *x = a, *y = b;
- assert(EVENT_SOURCE_IS_TIME(x->type));
- assert(x->type == y->type);
-
/* Enabled ones first */
if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
return -1;
@@ -722,12 +779,12 @@ static void event_gc_signal_data(sd_event *e, const int64_t *priority, int sig)
* the signalfd for it. */
if (sig == SIGCHLD &&
- e->n_enabled_child_sources > 0)
+ e->n_online_child_sources > 0)
return;
if (e->signal_sources &&
e->signal_sources[sig] &&
- e->signal_sources[sig]->enabled != SD_EVENT_OFF)
+ event_source_is_online(e->signal_sources[sig]))
return;
/*
@@ -774,11 +831,17 @@ static void event_source_time_prioq_reshuffle(sd_event_source *s) {
struct clock_data *d;
assert(s);
- assert(EVENT_SOURCE_IS_TIME(s->type));
/* Called whenever the event source's timer ordering properties changed, i.e. time, accuracy,
* pending, enable state. Makes sure the two prioq's are ordered properly again. */
- assert_se(d = event_get_clock_data(s->event, s->type));
+
+ if (s->ratelimited)
+ d = &s->event->monotonic;
+ else {
+ assert(EVENT_SOURCE_IS_TIME(s->type));
+ assert_se(d = event_get_clock_data(s->event, s->type));
+ }
+
prioq_reshuffle(d->earliest, s, &s->earliest_index);
prioq_reshuffle(d->latest, s, &s->latest_index);
d->needs_rearm = true;
@@ -819,12 +882,18 @@ static void source_disconnect(sd_event_source *s) {
case SOURCE_TIME_BOOTTIME:
case SOURCE_TIME_MONOTONIC:
case SOURCE_TIME_REALTIME_ALARM:
- case SOURCE_TIME_BOOTTIME_ALARM: {
- struct clock_data *d;
- assert_se(d = event_get_clock_data(s->event, s->type));
- event_source_time_prioq_remove(s, d);
+ case SOURCE_TIME_BOOTTIME_ALARM:
+ /* Only remove this event source from the time event source here if it is not ratelimited. If
+ * it is ratelimited, we'll remove it below, separately. Why? Because the clock used might
+ * differ: ratelimiting always uses CLOCK_MONOTONIC, but timer events might use any clock */
+
+ if (!s->ratelimited) {
+ struct clock_data *d;
+ assert_se(d = event_get_clock_data(s->event, s->type));
+ event_source_time_prioq_remove(s, d);
+ }
+
break;
- }
case SOURCE_SIGNAL:
if (s->signal.sig > 0) {
@@ -839,9 +908,9 @@ static void source_disconnect(sd_event_source *s) {
case SOURCE_CHILD:
if (s->child.pid > 0) {
- if (s->enabled != SD_EVENT_OFF) {
- assert(s->event->n_enabled_child_sources > 0);
- s->event->n_enabled_child_sources--;
+ if (event_source_is_online(s)) {
+ assert(s->event->n_online_child_sources > 0);
+ s->event->n_online_child_sources--;
}
(void) hashmap_remove(s->event->child_sources, INT_TO_PTR(s->child.pid));
@@ -872,6 +941,9 @@ static void source_disconnect(sd_event_source *s) {
if (s->prepare)
prioq_remove(s->event->prepare, s, &s->prepare_index);
+ if (s->ratelimited)
+ event_source_time_prioq_remove(s, &s->event->monotonic);
+
event = s->event;
s->type = _SOURCE_EVENT_SOURCE_TYPE_INVALID;
@@ -1259,11 +1331,11 @@ _public_ int sd_event_add_child(
return r;
}
- e->n_enabled_child_sources ++;
+ e->n_online_child_sources++;
r = event_make_signal_data(e, SIGCHLD, NULL);
if (r < 0) {
- e->n_enabled_child_sources--;
+ e->n_online_child_sources--;
source_free(s);
return r;
}
@@ -1476,7 +1548,7 @@ _public_ int sd_event_source_set_io_fd(sd_event_source *s, int fd) {
if (s->io.fd == fd)
return 0;
- if (s->enabled == SD_EVENT_OFF) {
+ if (event_source_is_offline(s)) {
s->io.fd = fd;
s->io.registered = false;
} else {
@@ -1524,7 +1596,7 @@ _public_ int sd_event_source_set_io_events(sd_event_source *s, uint32_t events)
if (s->io.events == events && !(events & EPOLLET))
return 0;
- if (s->enabled != SD_EVENT_OFF) {
+ if (event_source_is_online(s)) {
r = source_io_register(s, s->enabled, events);
if (r < 0)
return r;
@@ -1572,7 +1644,7 @@ _public_ int sd_event_source_set_priority(sd_event_source *s, int64_t priority)
if (s->priority == priority)
return 0;
- if (s->type == SOURCE_SIGNAL && s->enabled != SD_EVENT_OFF) {
+ if (s->type == SOURCE_SIGNAL && event_source_is_online(s)) {
struct signal_data *old, *d;
/* Move us from the signalfd belonging to the old
@@ -1609,20 +1681,29 @@ _public_ int sd_event_source_get_enabled(sd_event_source *s, int *m) {
return 0;
}
-static int event_source_disable(sd_event_source *s) {
+static int event_source_offline(
+ sd_event_source *s,
+ int enabled,
+ bool ratelimited) {
+
+ bool was_offline;
int r;
assert(s);
- assert(s->enabled != SD_EVENT_OFF);
+ assert(enabled == SD_EVENT_OFF || ratelimited);
/* Unset the pending flag when this event source is disabled */
- if (!IN_SET(s->type, SOURCE_DEFER, SOURCE_EXIT)) {
+ if (s->enabled != SD_EVENT_OFF &&
+ enabled == SD_EVENT_OFF &&
+ !IN_SET(s->type, SOURCE_DEFER, SOURCE_EXIT)) {
r = source_set_pending(s, false);
if (r < 0)
return r;
}
- s->enabled = SD_EVENT_OFF;
+ was_offline = event_source_is_offline(s);
+ s->enabled = enabled;
+ s->ratelimited = ratelimited;
switch (s->type) {
@@ -1643,8 +1724,10 @@ static int event_source_disable(sd_event_source *s) {
break;
case SOURCE_CHILD:
- assert(s->event->n_enabled_child_sources > 0);
- s->event->n_enabled_child_sources--;
+ if (!was_offline) {
+ assert(s->event->n_online_child_sources > 0);
+ s->event->n_online_child_sources--;
+ }
event_gc_signal_data(s->event, &s->priority, SIGCHLD);
break;
@@ -1661,26 +1744,42 @@ static int event_source_disable(sd_event_source *s) {
assert_not_reached("Wut? I shouldn't exist.");
}
- return 0;
+ return 1;
}
-static int event_source_enable(sd_event_source *s, int enable) {
+static int event_source_online(
+ sd_event_source *s,
+ int enabled,
+ bool ratelimited) {
+
+ bool was_online;
int r;
assert(s);
- assert(IN_SET(enable, SD_EVENT_ON, SD_EVENT_ONESHOT));
- assert(s->enabled == SD_EVENT_OFF);
+ assert(enabled != SD_EVENT_OFF || !ratelimited);
/* Unset the pending flag when this event source is enabled */
- if (!IN_SET(s->type, SOURCE_DEFER, SOURCE_EXIT)) {
+ if (s->enabled == SD_EVENT_OFF &&
+ enabled != SD_EVENT_OFF &&
+ !IN_SET(s->type, SOURCE_DEFER, SOURCE_EXIT)) {
r = source_set_pending(s, false);
if (r < 0)
return r;
}
+ /* Are we really ready for onlining? */
+ if (enabled == SD_EVENT_OFF || ratelimited) {
+ /* Nope, we are not ready for onlining, then just update the precise state and exit */
+ s->enabled = enabled;
+ s->ratelimited = ratelimited;
+ return 0;
+ }
+
+ was_online = event_source_is_online(s);
+
switch (s->type) {
case SOURCE_IO:
- r = source_io_register(s, enable, s->io.events);
+ r = source_io_register(s, enabled, s->io.events);
if (r < 0)
return r;
break;
@@ -1698,13 +1797,13 @@ static int event_source_enable(sd_event_source *s, int enable) {
r = event_make_signal_data(s->event, SIGCHLD, NULL);
if (r < 0) {
s->enabled = SD_EVENT_OFF;
- s->event->n_enabled_child_sources--;
+ s->event->n_online_child_sources--;
event_gc_signal_data(s->event, &s->priority, SIGCHLD);
return r;
}
- s->event->n_enabled_child_sources++;
-
+ if (!was_online)
+ s->event->n_online_child_sources++;
break;
case SOURCE_TIME_REALTIME:
@@ -1721,7 +1820,8 @@ static int event_source_enable(sd_event_source *s, int enable) {
assert_not_reached("Wut? I shouldn't exist.");
}
- s->enabled = enable;
+ s->enabled = enabled;
+ s->ratelimited = ratelimited;
/* Non-failing operations below */
switch (s->type) {
@@ -1741,7 +1841,7 @@ static int event_source_enable(sd_event_source *s, int enable) {
break;
}
- return 0;
+ return 1;
}
_public_ int sd_event_source_set_enabled(sd_event_source *s, int m) {
@@ -1759,7 +1859,7 @@ _public_ int sd_event_source_set_enabled(sd_event_source *s, int m) {
return 0;
if (m == SD_EVENT_OFF)
- r = event_source_disable(s);
+ r = event_source_offline(s, m, s->ratelimited);
else {
if (s->enabled != SD_EVENT_OFF) {
/* Switching from "on" to "oneshot" or back? If that's the case, we can take a shortcut, the
@@ -1768,7 +1868,7 @@ _public_ int sd_event_source_set_enabled(sd_event_source *s, int m) {
return 0;
}
- r = event_source_enable(s, m);
+ r = event_source_online(s, m, s->ratelimited);
}
if (r < 0)
return r;
@@ -1900,6 +2000,96 @@ _public_ void *sd_event_source_set_userdata(sd_event_source *s, void *userdata)
return ret;
}
+static int event_source_enter_ratelimited(sd_event_source *s) {
+ int r;
+
+ assert(s);
+
+ /* When an event source becomes ratelimited, we place it in the CLOCK_MONOTONIC priority queue, with
+ * the end of the rate limit time window, much as if it was a timer event source. */
+
+ if (s->ratelimited)
+ return 0; /* Already ratelimited, this is a NOP hence */
+
+ /* Make sure we can install a CLOCK_MONOTONIC event further down. */
+ r = setup_clock_data(s->event, &s->event->monotonic, CLOCK_MONOTONIC);
+ if (r < 0)
+ return r;
+
+ /* Timer event sources are already using the earliest/latest queues for the timer scheduling. Let's
+ * first remove them from the prioq appropriate for their own clock, so that we can use the prioq
+ * fields of the event source then for adding it to the CLOCK_MONOTONIC prioq instead. */
+ if (EVENT_SOURCE_IS_TIME(s->type))
+ event_source_time_prioq_remove(s, event_get_clock_data(s->event, s->type));
+
+ /* Now, let's add the event source to the monotonic clock instead */
+ r = event_source_time_prioq_put(s, &s->event->monotonic);
+ if (r < 0)
+ goto fail;
+
+ /* And let's take the event source officially offline */
+ r = event_source_offline(s, s->enabled, /* ratelimited= */ true);
+ if (r < 0) {
+ event_source_time_prioq_remove(s, &s->event->monotonic);
+ goto fail;
+ }
+
+ event_source_pp_prioq_reshuffle(s);
+
+ log_debug("Event source %p (%s) entered rate limit state.", s, strna(s->description));
+ return 0;
+
+fail:
+ /* Reinstall time event sources in the priority queue as before. This shouldn't fail, since the queue
+ * space for it should already be allocated. */
+ if (EVENT_SOURCE_IS_TIME(s->type))
+ assert_se(event_source_time_prioq_put(s, event_get_clock_data(s->event, s->type)) >= 0);
+
+ return r;
+}
+
+static int event_source_leave_ratelimit(sd_event_source *s) {
+ int r;
+
+ assert(s);
+
+ if (!s->ratelimited)
+ return 0;
+
+ /* Let's take the event source out of the monotonic prioq first. */
+ event_source_time_prioq_remove(s, &s->event->monotonic);
+
+ /* Let's then add the event source to its native clock prioq again — if this is a timer event source */
+ if (EVENT_SOURCE_IS_TIME(s->type)) {
+ r = event_source_time_prioq_put(s, event_get_clock_data(s->event, s->type));
+ if (r < 0)
+ goto fail;
+ }
+
+ /* Let's try to take it online again. */
+ r = event_source_online(s, s->enabled, /* ratelimited= */ false);
+ if (r < 0) {
+ /* Do something roughly sensible when this failed: undo the two prioq ops above */
+ if (EVENT_SOURCE_IS_TIME(s->type))
+ event_source_time_prioq_remove(s, event_get_clock_data(s->event, s->type));
+
+ goto fail;
+ }
+
+ event_source_pp_prioq_reshuffle(s);
+ ratelimit_reset(&s->rate_limit);
+
+ log_debug("Event source %p (%s) left rate limit state.", s, strna(s->description));
+ return 0;
+
+fail:
+ /* Do something somewhat reasonable when we cannot move an event sources out of ratelimited mode:
+ * simply put it back in it, maybe we can then process it more successfully next iteration. */
+ assert_se(event_source_time_prioq_put(s, &s->event->monotonic) >= 0);
+
+ return r;
+}
+
static usec_t sleep_between(sd_event *e, usec_t a, usec_t b) {
usec_t c;
assert(e);
@@ -1998,7 +2188,7 @@ static int event_arm_timer(
d->needs_rearm = false;
a = prioq_peek(d->earliest);
- if (!a || a->enabled == SD_EVENT_OFF || a->time.next == USEC_INFINITY) {
+ if (!a || a->enabled == SD_EVENT_OFF || time_event_source_next(a) == USEC_INFINITY) {
if (d->fd < 0)
return 0;
@@ -2018,7 +2208,7 @@ static int event_arm_timer(
b = prioq_peek(d->latest);
assert_se(b && b->enabled != SD_EVENT_OFF);
- t = sleep_between(e, a->time.next, time_event_source_latest(b));
+ t = sleep_between(e, time_event_source_next(a), time_event_source_latest(b));
if (d->next == t)
return 0;
@@ -2097,10 +2287,22 @@ static int process_timer(
for (;;) {
s = prioq_peek(d->earliest);
- if (!s ||
- s->time.next > n ||
- s->enabled == SD_EVENT_OFF ||
- s->pending)
+ if (!s || time_event_source_next(s) > n)
+ break;
+
+ if (s->ratelimited) {
+ /* This is an event sources whose ratelimit window has ended. Let's turn it on
+ * again. */
+ assert(s->ratelimited);
+
+ r = event_source_leave_ratelimit(s);
+ if (r < 0)
+ return r;
+
+ continue;
+ }
+
+ if (s->enabled == SD_EVENT_OFF || s->pending)
break;
r = source_set_pending(s, true);
@@ -2146,7 +2348,7 @@ static int process_child(sd_event *e) {
if (s->pending)
continue;
- if (s->enabled == SD_EVENT_OFF)
+ if (event_source_is_offline(s))
continue;
zero(s->child.siginfo);
@@ -2242,11 +2444,26 @@ static int process_signal(sd_event *e, struct signal_data *d, uint32_t events) {
}
static int source_dispatch(sd_event_source *s) {
+ _cleanup_(sd_event_unrefp) sd_event *saved_event = NULL;
int r = 0;
assert(s);
assert(s->pending || s->type == SOURCE_EXIT);
+ /* Similar, store a reference to the event loop object, so that we can still access it after the
+ * callback might have invalidated/disconnected the event source. */
+ saved_event = sd_event_ref(s->event);
+
+ /* Check if we hit the ratelimit for this event source, if so, let's disable it. */
+ assert(!s->ratelimited);
+ if (!ratelimit_below(&s->rate_limit)) {
+ r = event_source_enter_ratelimited(s);
+ if (r < 0)
+ return r;
+
+ return 1;
+ }
+
if (s->type != SOURCE_DEFER && s->type != SOURCE_EXIT) {
r = source_set_pending(s, false);
if (r < 0)
@@ -2356,7 +2573,7 @@ static int event_prepare(sd_event *e) {
sd_event_source *s;
s = prioq_peek(e->prepare);
- if (!s || s->prepare_iteration == e->iteration || s->enabled == SD_EVENT_OFF)
+ if (!s || s->prepare_iteration == e->iteration || event_source_is_offline(s))
break;
s->prepare_iteration = e->iteration;
@@ -2393,7 +2610,7 @@ static int dispatch_exit(sd_event *e) {
assert(e);
p = prioq_peek(e->exit);
- if (!p || p->enabled == SD_EVENT_OFF) {
+ if (!p || event_source_is_offline(p)) {
e->state = SD_EVENT_FINISHED;
return 0;
}
@@ -2419,7 +2636,7 @@ static sd_event_source* event_next_pending(sd_event *e) {
if (!p)
return NULL;
- if (p->enabled == SD_EVENT_OFF)
+ if (event_source_is_offline(p))
return NULL;
return p;
@@ -2879,3 +3096,53 @@ _public_ int sd_event_get_iteration(sd_event *e, uint64_t *ret) {
*ret = e->iteration;
return 0;
}
+
+_public_ int sd_event_source_set_ratelimit(sd_event_source *s, uint64_t interval, unsigned burst) {
+ int r;
+
+ assert_return(s, -EINVAL);
+
+ /* Turning on ratelimiting on event source types that don't support it, is a loggable offense. Doing
+ * so is a programming error. */
+ assert_return(EVENT_SOURCE_CAN_RATE_LIMIT(s->type), -EDOM);
+
+ /* When ratelimiting is configured we'll always reset the rate limit state first and start fresh,
+ * non-ratelimited. */
+ r = event_source_leave_ratelimit(s);
+ if (r < 0)
+ return r;
+
+ RATELIMIT_INIT(s->rate_limit, interval, burst);
+ return 0;
+}
+
+_public_ int sd_event_source_get_ratelimit(sd_event_source *s, uint64_t *ret_interval, unsigned *ret_burst) {
+ assert_return(s, -EINVAL);
+
+ /* Querying whether an event source has ratelimiting configured is not a loggable offsense, hence
+ * don't use assert_return(). Unlike turning on ratelimiting it's not really a programming error */
+ if (!EVENT_SOURCE_CAN_RATE_LIMIT(s->type))
+ return -EDOM;
+
+ if (!ratelimit_configured(&s->rate_limit))
+ return -ENOEXEC;
+
+ if (ret_interval)
+ *ret_interval = s->rate_limit.interval;
+ if (ret_burst)
+ *ret_burst = s->rate_limit.burst;
+
+ return 0;
+}
+
+_public_ int sd_event_source_is_ratelimited(sd_event_source *s) {
+ assert_return(s, -EINVAL);
+
+ if (!EVENT_SOURCE_CAN_RATE_LIMIT(s->type))
+ return false;
+
+ if (!ratelimit_configured(&s->rate_limit))
+ return false;
+
+ return s->ratelimited;
+}
diff --git a/src/shared/ratelimit.h b/src/shared/ratelimit.h
index 58efca7..434089e 100644
--- a/src/shared/ratelimit.h
+++ b/src/shared/ratelimit.h
@@ -55,3 +55,11 @@ typedef struct RateLimit {
} while (false)
bool ratelimit_test(RateLimit *r);
+
+static inline void ratelimit_reset(RateLimit *rl) {
+ rl->num = rl->begin = 0;
+}
+
+static inline bool ratelimit_configured(RateLimit *rl) {
+ return rl->interval > 0 && rl->burst > 0;
+}
diff --git a/src/systemd/sd-event.h b/src/systemd/sd-event.h
index ffde7c8..f297c6a 100644
--- a/src/systemd/sd-event.h
+++ b/src/systemd/sd-event.h
@@ -130,6 +130,9 @@ int sd_event_source_set_time_accuracy(sd_event_source *s, uint64_t usec);
int sd_event_source_get_time_clock(sd_event_source *s, clockid_t *clock);
int sd_event_source_get_signal(sd_event_source *s);
int sd_event_source_get_child_pid(sd_event_source *s, pid_t *pid);
+int sd_event_source_set_ratelimit(sd_event_source *s, uint64_t interval_usec, unsigned burst);
+int sd_event_source_get_ratelimit(sd_event_source *s, uint64_t *ret_interval_usec, unsigned *ret_burst);
+int sd_event_source_is_ratelimited(sd_event_source *s);
_SD_END_DECLARATIONS;
--
2.17.1