diff --git a/src/journal/journalctl.c b/src/journal/journalctl.c index 72c1622596..f08d0e8322 100644 --- a/src/journal/journalctl.c +++ b/src/journal/journalctl.c @@ -67,6 +67,7 @@ #include "tmpfile-util.h" #include "unit-name.h" #include "user-util.h" +#include "varlink.h" #define DEFAULT_FSS_INTERVAL_USEC (15*USEC_PER_MINUTE) @@ -1901,156 +1902,35 @@ static int verify(sd_journal *j) { return r; } -static int watch_run_systemd_journal(uint32_t mask) { - _cleanup_close_ int watch_fd = -1; - - (void) mkdir_p("/run/systemd/journal", 0755); - - watch_fd = inotify_init1(IN_NONBLOCK|IN_CLOEXEC); - if (watch_fd < 0) - return log_error_errno(errno, "Failed to create inotify object: %m"); - - if (inotify_add_watch(watch_fd, "/run/systemd/journal", mask) < 0) - return log_error_errno(errno, "Failed to watch \"/run/systemd/journal\": %m"); - - return TAKE_FD(watch_fd); -} - -static int flush_to_var(void) { - _cleanup_(sd_bus_error_free) sd_bus_error error = SD_BUS_ERROR_NULL; - _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL; - _cleanup_close_ int watch_fd = -1; +static int simple_varlink_call(const char *option, const char *method) { + _cleanup_(varlink_flush_close_unrefp) Varlink *link = NULL; + const char *error; int r; - if (arg_machine) { - log_error("--flush is not supported in conjunction with --machine=."); - return -EOPNOTSUPP; - } + if (arg_machine) + return log_error_errno(SYNTHETIC_ERRNO(EOPNOTSUPP), "%s is not supported in conjunction with --machine=.", option); - /* Quick exit */ - if (access("/run/systemd/journal/flushed", F_OK) >= 0) - return 0; - - /* OK, let's actually do the full logic, send SIGUSR1 to the - * daemon and set up inotify to wait for the flushed file to appear */ - r = bus_connect_system_systemd(&bus); + r = varlink_connect_address(&link, "/run/systemd/journal/io.systemd.journal"); if (r < 0) - return log_error_errno(r, "Failed to get D-Bus connection: %m"); + return log_error_errno(r, "Failed to connect to journal: %m"); - r = sd_bus_call_method( - bus, - "org.freedesktop.systemd1", - "/org/freedesktop/systemd1", - "org.freedesktop.systemd1.Manager", - "KillUnit", - &error, - NULL, - "ssi", "systemd-journald.service", "main", SIGUSR1); + r = varlink_call(link, method, NULL, NULL, &error, NULL); if (r < 0) - return log_error_errno(r, "Failed to kill journal service: %s", bus_error_message(&error, r)); - - watch_fd = watch_run_systemd_journal(IN_CREATE|IN_DONT_FOLLOW|IN_ONLYDIR); - if (watch_fd < 0) - return watch_fd; - - for (;;) { - if (access("/run/systemd/journal/flushed", F_OK) >= 0) - return 0; - - if (errno != ENOENT) - return log_error_errno(errno, "Failed to check for existence of /run/systemd/journal/flushed: %m"); - - r = fd_wait_for_event(watch_fd, POLLIN, USEC_INFINITY); - if (r < 0) - return log_error_errno(r, "Failed to wait for event: %m"); - - r = flush_fd(watch_fd); - if (r < 0) - return log_error_errno(r, "Failed to flush inotify events: %m"); - } -} - -static int send_signal_and_wait(int sig, const char *watch_path) { - _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL; - _cleanup_close_ int watch_fd = -1; - usec_t start; - int r; - - if (arg_machine) { - log_error("--sync and --rotate are not supported in conjunction with --machine=."); - return -EOPNOTSUPP; - } - - start = now(CLOCK_MONOTONIC); - - /* This call sends the specified signal to journald, and waits - * for acknowledgment by watching the mtime of the specified - * flag file. This is used to trigger syncing or rotation and - * then wait for the operation to complete. */ - - for (;;) { - usec_t tstamp; - - /* See if a sync happened by now. */ - r = read_timestamp_file(watch_path, &tstamp); - if (r < 0 && r != -ENOENT) - return log_error_errno(r, "Failed to read %s: %m", watch_path); - if (r >= 0 && tstamp >= start) - return 0; - - /* Let's ask for a sync, but only once. */ - if (!bus) { - _cleanup_(sd_bus_error_free) sd_bus_error error = SD_BUS_ERROR_NULL; - - r = bus_connect_system_systemd(&bus); - if (r < 0) - return log_error_errno(r, "Failed to get D-Bus connection: %m"); - - r = sd_bus_call_method( - bus, - "org.freedesktop.systemd1", - "/org/freedesktop/systemd1", - "org.freedesktop.systemd1.Manager", - "KillUnit", - &error, - NULL, - "ssi", "systemd-journald.service", "main", sig); - if (r < 0) - return log_error_errno(r, "Failed to kill journal service: %s", bus_error_message(&error, r)); - - continue; - } - - /* Let's install the inotify watch, if we didn't do that yet. */ - if (watch_fd < 0) { - watch_fd = watch_run_systemd_journal(IN_MOVED_TO|IN_DONT_FOLLOW|IN_ONLYDIR); - if (watch_fd < 0) - return watch_fd; - - /* Recheck the flag file immediately, so that we don't miss any event since the last check. */ - continue; - } - - /* OK, all preparatory steps done, let's wait until inotify reports an event. */ - - r = fd_wait_for_event(watch_fd, POLLIN, USEC_INFINITY); - if (r < 0) - return log_error_errno(r, "Failed to wait for event: %m"); - - r = flush_fd(watch_fd); - if (r < 0) - return log_error_errno(r, "Failed to flush inotify events: %m"); - } + return log_error_errno(r, "Failed to execute operation: %s", error); return 0; } +static int flush_to_var(void) { + return simple_varlink_call("--flush", "io.systemd.Journal.FlushToVar"); +} + static int rotate(void) { - return send_signal_and_wait(SIGUSR2, "/run/systemd/journal/rotated"); + return simple_varlink_call("--rotate", "io.systemd.Journal.Rotate"); } static int sync_journal(void) { - return send_signal_and_wait(SIGRTMIN+1, "/run/systemd/journal/synced"); + return simple_varlink_call("--sync", "io.systemd.Journal.Synchronize"); } static int wait_for_change(sd_journal *j, int poll_fd) { diff --git a/src/journal/journald-server.c b/src/journal/journald-server.c index b5b11474f9..8ee7d78bae 100644 --- a/src/journal/journald-server.c +++ b/src/journal/journald-server.c @@ -1825,8 +1825,37 @@ static int server_connect_notify(Server *s) { return 0; } +static int synchronize_second_half(sd_event_source *event_source, void *userdata) { + Varlink *link = userdata; + Server *s; + int r; + + assert(link); + assert_se(s = varlink_get_userdata(link)); + + /* This is the "second half" of the Synchronize() varlink method. This function is called as deferred + * event source at a low priority to ensure the synchronization completes after all queued log + * messages are processed. */ + server_full_sync(s); + + /* Let's get rid of the event source now, by marking it as non-floating again. It then has no ref + * anymore and is immediately destroyed after we return from this function, i.e. from this event + * source handler at the end. */ + r = sd_event_source_set_floating(event_source, false); + if (r < 0) + return log_error_errno(r, "Failed to mark event source as non-floating: %m"); + + return varlink_reply(link, NULL); +} + +static void synchronize_destroy(void *userdata) { + varlink_unref(userdata); +} + static int vl_method_synchronize(Varlink *link, JsonVariant *parameters, VarlinkMethodFlags flags, void *userdata) { + _cleanup_(sd_event_source_unrefp) sd_event_source *event_source = NULL; Server *s = userdata; + int r; assert(link); assert(s); @@ -1835,9 +1864,34 @@ static int vl_method_synchronize(Varlink *link, JsonVariant *parameters, Varlink return varlink_error_invalid_parameter(link, parameters); log_info("Received client request to rotate journal."); - server_full_sync(s); - return varlink_reply(link, NULL); + /* We don't do the main work now, but instead enqueue a deferred event loop job which will do + * it. That job is scheduled at low priority, so that we return from this method call only after all + * queued but not processed log messages are written to disk, so that this method call returning can + * be used as nice synchronization point. */ + r = sd_event_add_defer(s->event, &event_source, synchronize_second_half, link); + if (r < 0) + return log_error_errno(r, "Failed to allocate defer event source: %m"); + + r = sd_event_source_set_destroy_callback(event_source, synchronize_destroy); + if (r < 0) + return log_error_errno(r, "Failed to set event source destroy callback: %m"); + + varlink_ref(link); /* The varlink object is now left to the destroy callack to unref */ + + r = sd_event_source_set_priority(event_source, SD_EVENT_PRIORITY_NORMAL+15); + if (r < 0) + return log_error_errno(r, "Failed to set defer event source priority: %m"); + + /* Give up ownership of this event source. It will now be destroyed along with event loop itself, + * unless it destroys itself earlier. */ + r = sd_event_source_set_floating(event_source, true); + if (r < 0) + return log_error_errno(r, "Failed to mark event source as floating: %m"); + + (void) sd_event_source_set_description(event_source, "deferred-sync"); + + return 0; } static int vl_method_rotate(Varlink *link, JsonVariant *parameters, VarlinkMethodFlags flags, void *userdata) {