core: GC redundant device jobs from the run queue

In contrast to all other unit types, device units, when queued, just track
external state; they cannot effect state changes on their own. Hence, unless a
client or other job waits for them, there's no reason to keep them in the job
queue. This adds a concept of GC'ing jobs of this type as soon as no client or
other job waits for them anymore.

To ensure this works correctly we need to track which clients actually
reference a job (i.e. which ones enqueued it). Unfortunately that's pretty
nasty to do for direct connections, as sd_bus_track doesn't work for
them. For now, work around this, by simply remembering in a boolean that a job
was requested by a direct connection, and reset it when we notice the direct
connection is gone. This means the GC logic works fine, except that jobs are
not immediately removed when direct connections disconnect.

In the longer term, a rework of the bus logic should fix this properly. For now
this should be good enough, as GC works fine for all cases except this one, and
thus is a clear improvement over the previous behaviour.

Fixes: #1921
This commit is contained in:
Lennart Poettering 2016-11-15 19:32:50 +01:00
parent 1a465207ab
commit c5a97ed132
11 changed files with 244 additions and 32 deletions

View File

@ -191,3 +191,71 @@ void bus_job_send_removed_signal(Job *j) {
if (r < 0)
log_debug_errno(r, "Failed to send job remove signal for %u: %m", j->id);
}
static int bus_job_track_handler(sd_bus_track *t, void *userdata) {
        Job *job = userdata;

        assert(t);
        assert(job);

        /* Release our reference to the tracker first, so that this handler is not invoked again. */
        job->bus_track = sd_bus_track_unref(job->bus_track);

        /* The last tracked client dropped off the bus: let the job GC logic decide whether the job is still
         * needed. */
        job_add_to_gc_queue(job);

        return 0;
}
static int bus_job_allocate_bus_track(Job *j) {
        int r;

        assert(j);

        /* Allocate lazily: a tracker is only created if the job does not carry one yet. The new tracker is
         * bound to the manager's API bus and reports to bus_job_track_handler() with the job as userdata. */
        if (!j->bus_track) {
                r = sd_bus_track_new(j->unit->manager->api_bus, &j->bus_track, bus_job_track_handler, j);
                if (r < 0)
                        return r;
        }

        return 0;
}
int bus_job_coldplug_bus_track(Job *j) {
        int r = 0;

        assert(j);

        /* Re-register the deserialized client names with the job's bus tracker. This is a no-op (returning 0)
         * if there are no deserialized clients or if there is no API bus connection. */
        if (!strv_isempty(j->deserialized_clients) && j->manager->api_bus) {
                r = bus_job_allocate_bus_track(j);
                if (r >= 0)
                        r = bus_track_add_name_many(j->bus_track, j->deserialized_clients);
        }

        /* The deserialized list is released on every path, whether or not the names were registered. */
        j->deserialized_clients = strv_free(j->deserialized_clients);

        return r;
}
int bus_job_track_sender(Job *j, sd_bus_message *m) {
        int r;

        assert(j);
        assert(m);

        if (sd_bus_message_get_bus(m) == j->unit->manager->api_bus) {
                /* The message arrived on the API bus: track its sender, allocating the tracker on demand. */
                r = bus_job_allocate_bus_track(j);
                if (r < 0)
                        return r;

                return sd_bus_track_add_sender(j->bus_track, m);
        }

        /* The message arrived on some other bus connection: no tracker is used for it, we merely remember
         * in a flag that such a connection referenced the job. */
        j->ref_by_private_bus = true;

        return 0;
}

View File

@ -29,3 +29,6 @@ int bus_job_method_cancel(sd_bus_message *message, void *job, sd_bus_error *erro
void bus_job_send_change_signal(Job *j);
void bus_job_send_removed_signal(Job *j);
int bus_job_coldplug_bus_track(Job *j);
int bus_job_track_sender(Job *j, sd_bus_message *m);

View File

@ -22,6 +22,7 @@
#include "alloc-util.h"
#include "bus-common-errors.h"
#include "cgroup-util.h"
#include "dbus-job.h"
#include "dbus-unit.h"
#include "dbus.h"
#include "fd-util.h"
@ -1223,17 +1224,9 @@ int bus_unit_queue_job(
if (r < 0)
return r;
if (sd_bus_message_get_bus(message) == u->manager->api_bus) {
if (!j->bus_track) {
r = sd_bus_track_new(sd_bus_message_get_bus(message), &j->bus_track, NULL, NULL);
if (r < 0)
return r;
}
r = sd_bus_track_add_sender(j->bus_track, message);
if (r < 0)
return r;
}
r = bus_job_track_sender(j, message);
if (r < 0)
return r;
path = job_dbus_path(j);
if (!path)
@ -1507,7 +1500,7 @@ int bus_unit_check_load_state(Unit *u, sd_bus_error *error) {
return sd_bus_error_set_errnof(error, u->load_error, "Unit %s is not loaded properly: %m.", u->id);
}
static int bus_track_handler(sd_bus_track *t, void *userdata) {
static int bus_unit_track_handler(sd_bus_track *t, void *userdata) {
Unit *u = userdata;
assert(t);
@ -1519,7 +1512,7 @@ static int bus_track_handler(sd_bus_track *t, void *userdata) {
return 0;
}
static int allocate_bus_track(Unit *u) {
static int bus_unit_allocate_bus_track(Unit *u) {
int r;
assert(u);
@ -1527,7 +1520,7 @@ static int allocate_bus_track(Unit *u) {
if (u->bus_track)
return 0;
r = sd_bus_track_new(u->manager->api_bus, &u->bus_track, bus_track_handler, u);
r = sd_bus_track_new(u->manager->api_bus, &u->bus_track, bus_unit_track_handler, u);
if (r < 0)
return r;
@ -1545,7 +1538,7 @@ int bus_unit_track_add_name(Unit *u, const char *name) {
assert(u);
r = allocate_bus_track(u);
r = bus_unit_allocate_bus_track(u);
if (r < 0)
return r;
@ -1557,7 +1550,7 @@ int bus_unit_track_add_sender(Unit *u, sd_bus_message *m) {
assert(u);
r = allocate_bus_track(u);
r = bus_unit_allocate_bus_track(u);
if (r < 0)
return r;

View File

@ -831,6 +831,8 @@ const UnitVTable device_vtable = {
"Device\0"
"Install\0",
.gc_jobs = true,
.init = device_init,
.done = device_done,
.load = unit_load_fragment_and_dropin_optional,

View File

@ -90,6 +90,9 @@ void job_free(Job *j) {
if (j->in_dbus_queue)
LIST_REMOVE(dbus_queue, j->manager->dbus_job_queue, j);
if (j->in_gc_queue)
LIST_REMOVE(gc_queue, j->manager->gc_job_queue, j);
sd_event_source_unref(j->timer_event_source);
sd_bus_track_unref(j->bus_track);
@ -226,6 +229,9 @@ Job* job_install(Job *j) {
log_unit_debug(j->unit,
"Installed new job %s/%s as %u",
j->unit->id, job_type_to_string(j->type), (unsigned) j->id);
job_add_to_gc_queue(j);
return j;
}
@ -639,6 +645,7 @@ _pure_ static const char *job_get_status_message_format(Unit *u, JobType t, JobR
[JOB_DEPENDENCY] = "Dependency failed for %s.",
[JOB_ASSERT] = "Assertion failed for %s.",
[JOB_UNSUPPORTED] = "Starting of %s not supported.",
[JOB_COLLECTED] = "Unecessary job for %s was removed.",
};
static const char *const generic_finished_stop_job[_JOB_RESULT_MAX] = {
[JOB_DONE] = "Stopped %s.",
@ -698,6 +705,7 @@ static void job_print_status_message(Unit *u, JobType t, JobResult result) {
[JOB_SKIPPED] = { ANSI_HIGHLIGHT, " INFO " },
[JOB_ASSERT] = { ANSI_HIGHLIGHT_YELLOW, "ASSERT" },
[JOB_UNSUPPORTED] = { ANSI_HIGHLIGHT_YELLOW, "UNSUPP" },
[JOB_COLLECTED] = { ANSI_HIGHLIGHT, " INFO " },
};
const char *format;
@ -749,6 +757,7 @@ static void job_log_status_message(Unit *u, JobType t, JobResult result) {
[JOB_INVALID] = LOG_INFO,
[JOB_ASSERT] = LOG_WARNING,
[JOB_UNSUPPORTED] = LOG_WARNING,
[JOB_COLLECTED] = LOG_INFO,
};
assert(u);
@ -860,6 +869,7 @@ int job_finish_and_invalidate(Job *j, JobResult result, bool recursive, bool alr
job_set_state(j, JOB_WAITING);
job_add_to_run_queue(j);
job_add_to_gc_queue(j);
goto finish;
}
@ -903,11 +913,15 @@ int job_finish_and_invalidate(Job *j, JobResult result, bool recursive, bool alr
finish:
/* Try to start the next jobs that can be started */
SET_FOREACH(other, u->dependencies[UNIT_AFTER], i)
if (other->job)
if (other->job) {
job_add_to_run_queue(other->job);
job_add_to_gc_queue(other->job);
}
SET_FOREACH(other, u->dependencies[UNIT_BEFORE], i)
if (other->job)
if (other->job) {
job_add_to_run_queue(other->job);
job_add_to_gc_queue(other->job);
}
manager_check_finished(u->manager);
@ -1121,12 +1135,14 @@ int job_coldplug(Job *j) {
/* After deserialization is complete and the bus connection
* set up again, let's start watching our subscribers again */
(void) bus_track_coldplug(j->manager, &j->bus_track, false, j->deserialized_clients);
j->deserialized_clients = strv_free(j->deserialized_clients);
(void) bus_job_coldplug_bus_track(j);
if (j->state == JOB_WAITING)
job_add_to_run_queue(j);
/* Maybe due to new dependencies we don't actually need this job anymore? */
job_add_to_gc_queue(j);
if (j->begin_usec == 0 || j->unit->job_timeout == USEC_INFINITY)
return 0;
@ -1201,6 +1217,95 @@ int job_get_timeout(Job *j, usec_t *timeout) {
return 1;
}
bool job_check_gc(Job *j) {
Unit *other;
Iterator i;
assert(j);
/* Checks whether this job should be GC'ed away. We only do this for jobs of units that have no effect on their
* own and just track external state. For now the only unit type that qualifies for this are .device units.
*
* Returns true if the job must be kept, and false if nothing needs it anymore and it may be collected. Note
* that this is not a pure check: it may reset j->ref_by_private_bus as a side effect (see below). */
/* Unit types that did not opt in via the gc_jobs vtable flag never have their jobs collected. */
if (!UNIT_VTABLE(j->unit)->gc_jobs)
return true;
/* The bus tracker still counts at least one entry, hence keep the job. */
if (sd_bus_track_count(j->bus_track) > 0)
return true;
/* FIXME: So this is a bit ugly: for now we don't properly track references made via private bus connections
* (because it's nasty, as sd_bus_track doesn't apply to it). We simply remember that the job was once
* referenced by one, and reset this whenever we notice that no private bus connections are around. This means
* the GC is a bit too conservative when it comes to jobs created by private bus connections. */
if (j->ref_by_private_bus) {
if (set_isempty(j->unit->manager->private_buses))
j->ref_by_private_bus = false;
else
return true;
}
/* NOP jobs are always kept. */
if (j->type == JOB_NOP)
return true;
/* If a job is ordered after ours, and is to be started, then it needs to wait for us, regardless if we stop or
* start, hence let's not GC in that case. */
SET_FOREACH(other, j->unit->dependencies[UNIT_BEFORE], i) {
if (!other->job)
continue;
if (other->job->ignore_order)
continue;
if (IN_SET(other->job->type, JOB_START, JOB_VERIFY_ACTIVE, JOB_RELOAD))
return true;
}
/* If we are going down, but something else is ordered After= us, then it needs to wait for us.
* NOTE(review): the loop below walks our own unit's UNIT_AFTER dependencies, which matches the '"we" are
* ordered after "other"' half of the table further down; the wording of the sentence above looks reversed
* relative to that -- confirm. */
if (IN_SET(j->type, JOB_STOP, JOB_RESTART)) {
SET_FOREACH(other, j->unit->dependencies[UNIT_AFTER], i) {
if (!other->job)
continue;
if (other->job->ignore_order)
continue;
/* Any queued job on the other unit that honours ordering is enough to keep us. */
return true;
}
}
/* The logic above is kinda the inverse of the job_is_runnable() logic. Specifically, if the job "we" is
* ordered before the job "other":
*
*     we start + other start  ->  stay
*     we start + other stop   ->  gc
*     we stop  + other start  ->  stay
*     we stop  + other stop   ->  gc
*
* "we" are ordered after "other":
*
*     we start + other start  ->  gc
*     we start + other stop   ->  gc
*     we stop  + other start  ->  stay
*     we stop  + other stop   ->  stay
*
*/
return false;
}
void job_add_to_gc_queue(Job *j) {
        assert(j);

        /* Enqueue the job for garbage collection, unless it is already queued or job_check_gc() reports that
         * it has to be kept. The check is only evaluated for jobs that are not queued yet. */
        if (!j->in_gc_queue && !job_check_gc(j)) {
                LIST_PREPEND(gc_queue, j->unit->manager->gc_job_queue, j);
                j->in_gc_queue = true;
        }
}
static const char* const job_state_table[_JOB_STATE_MAX] = {
[JOB_WAITING] = "waiting",
[JOB_RUNNING] = "running",
@ -1244,6 +1349,7 @@ static const char* const job_result_table[_JOB_RESULT_MAX] = {
[JOB_INVALID] = "invalid",
[JOB_ASSERT] = "assert",
[JOB_UNSUPPORTED] = "unsupported",
[JOB_COLLECTED] = "collected",
};
DEFINE_STRING_TABLE_LOOKUP(job_result, JobResult);

View File

@ -107,6 +107,7 @@ enum JobResult {
JOB_INVALID, /* JOB_RELOAD of inactive unit */
JOB_ASSERT, /* Couldn't start a unit, because an assert didn't hold */
JOB_UNSUPPORTED, /* Couldn't start a unit, because the unit type is not supported on the system */
JOB_COLLECTED, /* Job was garbage collected, since nothing needed it anymore */
_JOB_RESULT_MAX,
_JOB_RESULT_INVALID = -1
};
@ -133,6 +134,7 @@ struct Job {
LIST_FIELDS(Job, transaction);
LIST_FIELDS(Job, run_queue);
LIST_FIELDS(Job, dbus_queue);
LIST_FIELDS(Job, gc_queue);
LIST_HEAD(JobDependency, subject_list);
LIST_HEAD(JobDependency, object_list);
@ -168,6 +170,8 @@ struct Job {
bool sent_dbus_new_signal:1;
bool ignore_order:1;
bool irreversible:1;
bool in_gc_queue:1;
bool ref_by_private_bus:1;
};
Job* job_new(Unit *unit, JobType type);
@ -227,6 +231,9 @@ void job_shutdown_magic(Job *j);
int job_get_timeout(Job *j, usec_t *timeout) _pure_;
bool job_check_gc(Job *j);
void job_add_to_gc_queue(Job *j);
const char* job_type_to_string(JobType t) _const_;
JobType job_type_from_string(const char *s) _pure_;

View File

@ -981,10 +981,9 @@ good:
unit_gc_mark_good(u, gc_marker);
}
static unsigned manager_dispatch_gc_queue(Manager *m) {
static unsigned manager_dispatch_gc_unit_queue(Manager *m) {
unsigned n = 0, gc_marker;
Unit *u;
unsigned n = 0;
unsigned gc_marker;
assert(m);
@ -996,12 +995,12 @@ static unsigned manager_dispatch_gc_queue(Manager *m) {
gc_marker = m->gc_marker;
while ((u = m->gc_queue)) {
while ((u = m->gc_unit_queue)) {
assert(u->in_gc_queue);
unit_gc_sweep(u, gc_marker);
LIST_REMOVE(gc_queue, m->gc_queue, u);
LIST_REMOVE(gc_queue, m->gc_unit_queue, u);
u->in_gc_queue = false;
n++;
@ -1018,6 +1017,30 @@ static unsigned manager_dispatch_gc_queue(Manager *m) {
return n;
}
static unsigned manager_dispatch_gc_job_queue(Manager *m) {
        unsigned n_dispatched = 0;
        Job *job;

        assert(m);

        /* Drain the job GC queue from its head. Returns the number of queue entries processed, whether or not
         * the respective job was actually collected. */
        for (job = m->gc_job_queue; job; job = m->gc_job_queue) {
                assert(job->in_gc_queue);

                /* Dequeue before re-checking, so that the job may be queued again later on. */
                LIST_REMOVE(gc_queue, m->gc_job_queue, job);
                job->in_gc_queue = false;

                n_dispatched++;

                /* Re-validate: only jobs that job_check_gc() still considers unneeded are finished with
                 * the JOB_COLLECTED result. */
                if (!job_check_gc(job)) {
                        log_unit_debug(job->unit, "Collecting job.");
                        (void) job_finish_and_invalidate(job, JOB_COLLECTED, false, false);
                }
        }

        return n_dispatched;
}
static void manager_clear_jobs_and_units(Manager *m) {
Unit *u;
@ -1033,7 +1056,8 @@ static void manager_clear_jobs_and_units(Manager *m) {
assert(!m->dbus_unit_queue);
assert(!m->dbus_job_queue);
assert(!m->cleanup_queue);
assert(!m->gc_queue);
assert(!m->gc_unit_queue);
assert(!m->gc_job_queue);
assert(hashmap_isempty(m->jobs));
assert(hashmap_isempty(m->units));
@ -2226,7 +2250,10 @@ int manager_loop(Manager *m) {
if (manager_dispatch_load_queue(m) > 0)
continue;
if (manager_dispatch_gc_queue(m) > 0)
if (manager_dispatch_gc_job_queue(m) > 0)
continue;
if (manager_dispatch_gc_unit_queue(m) > 0)
continue;
if (manager_dispatch_cleanup_queue(m) > 0)

View File

@ -104,8 +104,9 @@ struct Manager {
/* Units to remove */
LIST_HEAD(Unit, cleanup_queue);
/* Units to check when doing GC */
LIST_HEAD(Unit, gc_queue);
/* Units and jobs to check when doing GC */
LIST_HEAD(Unit, gc_unit_queue);
LIST_HEAD(Job, gc_job_queue);
/* Units that should be realized */
LIST_HEAD(Unit, cgroup_queue);

View File

@ -389,7 +389,7 @@ void unit_add_to_gc_queue(Unit *u) {
if (unit_check_gc(u))
return;
LIST_PREPEND(gc_queue, u->manager->gc_queue, u);
LIST_PREPEND(gc_queue, u->manager->gc_unit_queue, u);
u->in_gc_queue = true;
}
@ -569,7 +569,7 @@ void unit_free(Unit *u) {
LIST_REMOVE(cleanup_queue, u->manager->cleanup_queue, u);
if (u->in_gc_queue)
LIST_REMOVE(gc_queue, u->manager->gc_queue, u);
LIST_REMOVE(gc_queue, u->manager->gc_unit_queue, u);
if (u->in_cgroup_queue)
LIST_REMOVE(cgroup_queue, u->manager->cgroup_queue, u);

View File

@ -441,6 +441,9 @@ struct UnitVTable {
/* True if transient units of this type are OK */
bool can_transient:1;
/* True if queued jobs of this type should be GC'ed if no other job needs them anymore */
bool gc_jobs:1;
};
extern const UnitVTable * const unit_vtable[_UNIT_TYPE_MAX];

View File

@ -838,6 +838,8 @@ static int check_wait_response(BusWaitForJobs *d, bool quiet, const char* const*
log_error("Assertion failed on job for %s.", strna(d->name));
else if (streq(d->result, "unsupported"))
log_error("Operation on or unit type of %s not supported on this system.", strna(d->name));
else if (streq(d->result, "collected"))
log_error("Queued job for %s was garbage collected.", strna(d->name));
else if (!streq(d->result, "done") && !streq(d->result, "skipped")) {
if (d->name) {
int q;
@ -853,7 +855,7 @@ static int check_wait_response(BusWaitForJobs *d, bool quiet, const char* const*
}
}
if (streq(d->result, "canceled"))
if (STR_IN_SET(d->result, "canceled", "collected"))
r = -ECANCELED;
else if (streq(d->result, "timeout"))
r = -ETIME;