journal: asynchronous journal_file_set_offline()

This adds a wait flag to journal_file_set_offline(), when false the offline is
performed asynchronously in a separate thread.

When wait is true, if an asynchronous offline is already in-progress it is
restarted and waited for.  Otherwise the offline is performed synchronously
without the use of a thread.

journal_file_set_online() cancels or waits for the asynchronous offline to
complete if in-flight, depending on where in the offline process the thread
happens to be.  If the thread is in the fsync() phase, it is cancelled and
waiting is unnecessary.  Otherwise, the thread is joined before proceeding.

A new offline_state member is added to JournalFile which is used via
atomic operations for communicating between the offline thread and the
journal_file_set_{offline,online}() functions.
This commit is contained in:
Vito Caputo 2016-02-12 04:59:57 -08:00
parent 69a3a6fd3d
commit ac2e41f510
3 changed files with 216 additions and 32 deletions

View file

@ -20,6 +20,7 @@
#include <errno.h>
#include <fcntl.h>
#include <linux/fs.h>
#include <pthread.h>
#include <stddef.h>
#include <sys/mman.h>
#include <sys/statvfs.h>
@ -86,7 +87,127 @@
/* The mmap context to use for the header we pick as one above the last defined typed */
#define CONTEXT_HEADER _OBJECT_TYPE_MAX
static int journal_file_set_online(JournalFile *f) {
/* This may be called from a separate thread to prevent blocking the caller for the duration of fsync().
* As a result we use atomic operations on f->offline_state for inter-thread communications with
* journal_file_set_offline() and journal_file_set_online(). */
static void journal_file_set_offline_internal(JournalFile *f) {
assert(f);
assert(f->fd >= 0);
assert(f->header);
for (;;) {
switch (f->offline_state) {
case OFFLINE_CANCEL:
if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_CANCEL, OFFLINE_DONE))
continue;
return;
case OFFLINE_AGAIN_FROM_SYNCING:
if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_AGAIN_FROM_SYNCING, OFFLINE_SYNCING))
continue;
break;
case OFFLINE_AGAIN_FROM_OFFLINING:
if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_AGAIN_FROM_OFFLINING, OFFLINE_SYNCING))
continue;
break;
case OFFLINE_SYNCING:
(void) fsync(f->fd);
if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_SYNCING, OFFLINE_OFFLINING))
continue;
f->header->state = STATE_OFFLINE;
(void) fsync(f->fd);
break;
case OFFLINE_OFFLINING:
if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_OFFLINING, OFFLINE_DONE))
continue;
/* fall through */
case OFFLINE_DONE:
return;
case OFFLINE_JOINED:
log_debug("OFFLINE_JOINED unexpected offline state for journal_file_set_offline_internal()");
return;
}
}
}
static void * journal_file_set_offline_thread(void *arg) {
JournalFile *f = arg;
journal_file_set_offline_internal(f);
return NULL;
}
static int journal_file_set_offline_thread_join(JournalFile *f) {
int r;
assert(f);
if (f->offline_state == OFFLINE_JOINED)
return 0;
r = pthread_join(f->offline_thread, NULL);
if (r)
return -r;
f->offline_state = OFFLINE_JOINED;
if (mmap_cache_got_sigbus(f->mmap, f->fd))
return -EIO;
return 0;
}
/* Trigger a restart if the offline thread is mid-flight in a restartable state. */
static bool journal_file_set_offline_try_restart(JournalFile *f) {
for (;;) {
switch (f->offline_state) {
case OFFLINE_AGAIN_FROM_SYNCING:
case OFFLINE_AGAIN_FROM_OFFLINING:
return true;
case OFFLINE_CANCEL:
if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_CANCEL, OFFLINE_AGAIN_FROM_SYNCING))
continue;
return true;
case OFFLINE_SYNCING:
if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_SYNCING, OFFLINE_AGAIN_FROM_SYNCING))
continue;
return true;
case OFFLINE_OFFLINING:
if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_OFFLINING, OFFLINE_AGAIN_FROM_OFFLINING))
continue;
return true;
default:
return false;
}
}
}
/* Sets a journal offline.
*
* If wait is false then an offline is dispatched in a separate thread for a
* subsequent journal_file_set_offline() or journal_file_set_online() of the
* same journal to synchronize with.
*
* If wait is true, then either an existing offline thread will be restarted
* and joined, or if none exists the offline is simply performed in this
* context without involving another thread.
*/
int journal_file_set_offline(JournalFile *f, bool wait) {
bool restarted;
int r;
assert(f);
if (!f->writable)
@ -95,6 +216,83 @@ static int journal_file_set_online(JournalFile *f) {
if (!(f->fd >= 0 && f->header))
return -EINVAL;
if (f->header->state != STATE_ONLINE)
return 0;
/* Restart an in-flight offline thread and wait if needed, or join a lingering done one. */
restarted = journal_file_set_offline_try_restart(f);
if ((restarted && wait) || !restarted) {
r = journal_file_set_offline_thread_join(f);
if (r < 0)
return r;
}
if (restarted)
return 0;
/* Initiate a new offline. */
f->offline_state = OFFLINE_SYNCING;
if (wait) /* Without using a thread if waiting. */
journal_file_set_offline_internal(f);
else {
r = pthread_create(&f->offline_thread, NULL, journal_file_set_offline_thread, f);
if (r > 0)
return -r;
}
return 0;
}
static int journal_file_set_online(JournalFile *f) {
bool joined = false;
assert(f);
if (!f->writable)
return -EPERM;
if (!(f->fd >= 0 && f->header))
return -EINVAL;
while (!joined) {
switch (f->offline_state) {
case OFFLINE_JOINED:
/* No offline thread, no need to wait. */
joined = true;
break;
case OFFLINE_SYNCING:
if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_SYNCING, OFFLINE_CANCEL))
continue;
/* Canceled syncing prior to offlining, no need to wait. */
break;
case OFFLINE_AGAIN_FROM_SYNCING:
if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_AGAIN_FROM_SYNCING, OFFLINE_CANCEL))
continue;
/* Canceled restart from syncing, no need to wait. */
break;
case OFFLINE_AGAIN_FROM_OFFLINING:
if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_AGAIN_FROM_OFFLINING, OFFLINE_CANCEL))
continue;
/* Canceled restart from offlining, must wait for offlining to complete however. */
/* fall through to wait */
default: {
int r;
r = journal_file_set_offline_thread_join(f);
if (r < 0)
return r;
joined = true;
break;
}
}
}
if (mmap_cache_got_sigbus(f->mmap, f->fd))
return -EIO;
@ -112,33 +310,6 @@ static int journal_file_set_online(JournalFile *f) {
}
}
int journal_file_set_offline(JournalFile *f) {
assert(f);
if (!f->writable)
return -EPERM;
if (!(f->fd >= 0 && f->header))
return -EINVAL;
if (f->header->state != STATE_ONLINE)
return 0;
(void) fsync(f->fd);
if (mmap_cache_got_sigbus(f->mmap, f->fd))
return -EIO;
f->header->state = STATE_OFFLINE;
if (mmap_cache_got_sigbus(f->mmap, f->fd))
return -EIO;
(void) fsync(f->fd);
return 0;
}
JournalFile* journal_file_close(JournalFile *f) {
assert(f);
@ -159,7 +330,7 @@ JournalFile* journal_file_close(JournalFile *f) {
sd_event_source_unref(f->post_change_timer);
}
journal_file_set_offline(f);
journal_file_set_offline(f, true);
if (f->mmap && f->fd >= 0)
mmap_cache_close_fd(f->mmap, f->fd);

View file

@ -63,6 +63,16 @@ typedef enum LocationType {
LOCATION_SEEK
} LocationType;
typedef enum OfflineState {
OFFLINE_JOINED,
OFFLINE_SYNCING,
OFFLINE_OFFLINING,
OFFLINE_CANCEL,
OFFLINE_AGAIN_FROM_SYNCING,
OFFLINE_AGAIN_FROM_OFFLINING,
OFFLINE_DONE
} OfflineState;
typedef struct JournalFile {
int fd;
@ -105,6 +115,9 @@ typedef struct JournalFile {
OrderedHashmap *chain_cache;
pthread_t offline_thread;
volatile OfflineState offline_state;
#if defined(HAVE_XZ) || defined(HAVE_LZ4)
void *compress_buffer;
size_t compress_buffer_size;
@ -139,7 +152,7 @@ int journal_file_open(
JournalFile *template,
JournalFile **ret);
int journal_file_set_offline(JournalFile *f);
int journal_file_set_offline(JournalFile *f, bool wait);
JournalFile* journal_file_close(JournalFile *j);
int journal_file_open_reliably(

View file

@ -372,13 +372,13 @@ void server_sync(Server *s) {
int r;
if (s->system_journal) {
r = journal_file_set_offline(s->system_journal);
r = journal_file_set_offline(s->system_journal, false);
if (r < 0)
log_warning_errno(r, "Failed to sync system journal, ignoring: %m");
}
ORDERED_HASHMAP_FOREACH(f, s->user_journals, i) {
r = journal_file_set_offline(f);
r = journal_file_set_offline(f, false);
if (r < 0)
log_warning_errno(r, "Failed to sync user journal, ignoring: %m");
}