Systemd/src/basic/journal-importer.c
Zbigniew Jędrzejewski-Szmek 11a1589223 tree-wide: drop license boilerplate
Files which are installed as-is (any .service and other unit files, .conf
files, .policy files, etc), are left as is. My assumption is that SPDX
identifiers are not yet that well known, so it's better to retain the
extended header to avoid any doubt.

I also kept any copyright lines. We can probably remove them, but it'd nice to
obtain explicit acks from all involved authors before doing that.
2018-04-06 18:58:55 +02:00

472 lines
14 KiB
C

/* SPDX-License-Identifier: LGPL-2.1+ */
/***
This file is part of systemd.
Copyright 2014 Zbigniew Jędrzejewski-Szmek
***/
#include <errno.h>
#include <unistd.h>
#include "alloc-util.h"
#include "fd-util.h"
#include "io-util.h"
#include "journal-importer.h"
#include "parse-util.h"
#include "string-util.h"
#include "unaligned.h"
enum {
IMPORTER_STATE_LINE = 0, /* waiting to read, or reading line */
IMPORTER_STATE_DATA_START, /* reading binary data header */
IMPORTER_STATE_DATA, /* reading binary data */
IMPORTER_STATE_DATA_FINISH, /* expecting newline */
IMPORTER_STATE_EOF, /* done */
};
static int iovw_put(struct iovec_wrapper *iovw, void* data, size_t len) {
if (!GREEDY_REALLOC(iovw->iovec, iovw->size_bytes, iovw->count + 1))
return log_oom();
iovw->iovec[iovw->count++] = IOVEC_MAKE(data, len);
return 0;
}
static void iovw_free_contents(struct iovec_wrapper *iovw) {
iovw->iovec = mfree(iovw->iovec);
iovw->size_bytes = iovw->count = 0;
}
static void iovw_rebase(struct iovec_wrapper *iovw, char *old, char *new) {
size_t i;
for (i = 0; i < iovw->count; i++)
iovw->iovec[i].iov_base = (char*) iovw->iovec[i].iov_base - old + new;
}
size_t iovw_size(struct iovec_wrapper *iovw) {
size_t n = 0, i;
for (i = 0; i < iovw->count; i++)
n += iovw->iovec[i].iov_len;
return n;
}
void journal_importer_cleanup(JournalImporter *imp) {
if (imp->fd >= 0 && !imp->passive_fd) {
log_debug("Closing %s (fd=%d)", imp->name ?: "importer", imp->fd);
safe_close(imp->fd);
}
free(imp->name);
free(imp->buf);
iovw_free_contents(&imp->iovw);
}
static char* realloc_buffer(JournalImporter *imp, size_t size) {
char *b, *old = imp->buf;
b = GREEDY_REALLOC(imp->buf, imp->size, size);
if (!b)
return NULL;
iovw_rebase(&imp->iovw, old, imp->buf);
return b;
}
static int get_line(JournalImporter *imp, char **line, size_t *size) {
ssize_t n;
char *c = NULL;
assert(imp);
assert(imp->state == IMPORTER_STATE_LINE);
assert(imp->offset <= imp->filled);
assert(imp->filled <= imp->size);
assert(!imp->buf || imp->size > 0);
assert(imp->fd >= 0);
for (;;) {
if (imp->buf) {
size_t start = MAX(imp->scanned, imp->offset);
c = memchr(imp->buf + start, '\n',
imp->filled - start);
if (c != NULL)
break;
}
imp->scanned = imp->filled;
if (imp->scanned >= DATA_SIZE_MAX) {
log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
return -E2BIG;
}
if (imp->passive_fd)
/* we have to wait for some data to come to us */
return -EAGAIN;
/* We know that imp->filled is at most DATA_SIZE_MAX, so if
we reallocate it, we'll increase the size at least a bit. */
assert_cc(DATA_SIZE_MAX < ENTRY_SIZE_MAX);
if (imp->size - imp->filled < LINE_CHUNK &&
!realloc_buffer(imp, MIN(imp->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
return log_oom();
assert(imp->buf);
assert(imp->size - imp->filled >= LINE_CHUNK ||
imp->size == ENTRY_SIZE_MAX);
n = read(imp->fd,
imp->buf + imp->filled,
imp->size - imp->filled);
if (n < 0) {
if (errno != EAGAIN)
log_error_errno(errno, "read(%d, ..., %zu): %m",
imp->fd,
imp->size - imp->filled);
return -errno;
} else if (n == 0)
return 0;
imp->filled += n;
}
*line = imp->buf + imp->offset;
*size = c + 1 - imp->buf - imp->offset;
imp->offset += *size;
return 1;
}
static int fill_fixed_size(JournalImporter *imp, void **data, size_t size) {
assert(imp);
assert(IN_SET(imp->state, IMPORTER_STATE_DATA_START, IMPORTER_STATE_DATA, IMPORTER_STATE_DATA_FINISH));
assert(size <= DATA_SIZE_MAX);
assert(imp->offset <= imp->filled);
assert(imp->filled <= imp->size);
assert(imp->buf || imp->size == 0);
assert(!imp->buf || imp->size > 0);
assert(imp->fd >= 0);
assert(data);
while (imp->filled - imp->offset < size) {
int n;
if (imp->passive_fd)
/* we have to wait for some data to come to us */
return -EAGAIN;
if (!realloc_buffer(imp, imp->offset + size))
return log_oom();
n = read(imp->fd, imp->buf + imp->filled,
imp->size - imp->filled);
if (n < 0) {
if (errno != EAGAIN)
log_error_errno(errno, "read(%d, ..., %zu): %m", imp->fd,
imp->size - imp->filled);
return -errno;
} else if (n == 0)
return 0;
imp->filled += n;
}
*data = imp->buf + imp->offset;
imp->offset += size;
return 1;
}
static int get_data_size(JournalImporter *imp) {
int r;
void *data;
assert(imp);
assert(imp->state == IMPORTER_STATE_DATA_START);
assert(imp->data_size == 0);
r = fill_fixed_size(imp, &data, sizeof(uint64_t));
if (r <= 0)
return r;
imp->data_size = unaligned_read_le64(data);
if (imp->data_size > DATA_SIZE_MAX) {
log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
imp->data_size, DATA_SIZE_MAX);
return -EINVAL;
}
if (imp->data_size == 0)
log_warning("Binary field with zero length");
return 1;
}
static int get_data_data(JournalImporter *imp, void **data) {
int r;
assert(imp);
assert(data);
assert(imp->state == IMPORTER_STATE_DATA);
r = fill_fixed_size(imp, data, imp->data_size);
if (r <= 0)
return r;
return 1;
}
static int get_data_newline(JournalImporter *imp) {
int r;
char *data;
assert(imp);
assert(imp->state == IMPORTER_STATE_DATA_FINISH);
r = fill_fixed_size(imp, (void**) &data, 1);
if (r <= 0)
return r;
assert(data);
if (*data != '\n') {
log_error("expected newline, got '%c'", *data);
return -EINVAL;
}
return 1;
}
static int process_dunder(JournalImporter *imp, char *line, size_t n) {
const char *timestamp;
int r;
assert(line);
assert(n > 0);
assert(line[n-1] == '\n');
/* XXX: is it worth to support timestamps in extended format?
* We don't produce them, but who knows... */
timestamp = startswith(line, "__CURSOR=");
if (timestamp)
/* ignore __CURSOR */
return 1;
timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
if (timestamp) {
long long unsigned x;
line[n-1] = '\0';
r = safe_atollu(timestamp, &x);
if (r < 0)
log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
else
imp->ts.realtime = x;
return r < 0 ? r : 1;
}
timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
if (timestamp) {
long long unsigned x;
line[n-1] = '\0';
r = safe_atollu(timestamp, &x);
if (r < 0)
log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
else
imp->ts.monotonic = x;
return r < 0 ? r : 1;
}
timestamp = startswith(line, "__");
if (timestamp) {
log_notice("Unknown dunder line %s", line);
return 1;
}
/* no dunder */
return 0;
}
int journal_importer_process_data(JournalImporter *imp) {
int r;
switch(imp->state) {
case IMPORTER_STATE_LINE: {
char *line, *sep;
size_t n = 0;
assert(imp->data_size == 0);
r = get_line(imp, &line, &n);
if (r < 0)
return r;
if (r == 0) {
imp->state = IMPORTER_STATE_EOF;
return 0;
}
assert(n > 0);
assert(line[n-1] == '\n');
if (n == 1) {
log_trace("Received empty line, event is ready");
return 1;
}
r = process_dunder(imp, line, n);
if (r != 0)
return r < 0 ? r : 0;
/* MESSAGE=xxx\n
or
COREDUMP\n
LLLLLLLL0011223344...\n
*/
sep = memchr(line, '=', n);
if (sep) {
/* chomp newline */
n--;
r = iovw_put(&imp->iovw, line, n);
if (r < 0)
return r;
} else {
/* replace \n with = */
line[n-1] = '=';
imp->field_len = n;
imp->state = IMPORTER_STATE_DATA_START;
/* we cannot put the field in iovec until we have all data */
}
log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary");
return 0; /* continue */
}
case IMPORTER_STATE_DATA_START:
assert(imp->data_size == 0);
r = get_data_size(imp);
// log_debug("get_data_size() -> %d", r);
if (r < 0)
return r;
if (r == 0) {
imp->state = IMPORTER_STATE_EOF;
return 0;
}
imp->state = imp->data_size > 0 ?
IMPORTER_STATE_DATA : IMPORTER_STATE_DATA_FINISH;
return 0; /* continue */
case IMPORTER_STATE_DATA: {
void *data;
char *field;
assert(imp->data_size > 0);
r = get_data_data(imp, &data);
// log_debug("get_data_data() -> %d", r);
if (r < 0)
return r;
if (r == 0) {
imp->state = IMPORTER_STATE_EOF;
return 0;
}
assert(data);
field = (char*) data - sizeof(uint64_t) - imp->field_len;
memmove(field + sizeof(uint64_t), field, imp->field_len);
r = iovw_put(&imp->iovw, field + sizeof(uint64_t), imp->field_len + imp->data_size);
if (r < 0)
return r;
imp->state = IMPORTER_STATE_DATA_FINISH;
return 0; /* continue */
}
case IMPORTER_STATE_DATA_FINISH:
r = get_data_newline(imp);
// log_debug("get_data_newline() -> %d", r);
if (r < 0)
return r;
if (r == 0) {
imp->state = IMPORTER_STATE_EOF;
return 0;
}
imp->data_size = 0;
imp->state = IMPORTER_STATE_LINE;
return 0; /* continue */
default:
assert_not_reached("wtf?");
}
}
int journal_importer_push_data(JournalImporter *imp, const char *data, size_t size) {
assert(imp);
assert(imp->state != IMPORTER_STATE_EOF);
if (!realloc_buffer(imp, imp->filled + size)) {
log_error("Failed to store received data of size %zu "
"(in addition to existing %zu bytes with %zu filled): %s",
size, imp->size, imp->filled, strerror(ENOMEM));
return -ENOMEM;
}
memcpy(imp->buf + imp->filled, data, size);
imp->filled += size;
return 0;
}
void journal_importer_drop_iovw(JournalImporter *imp) {
size_t remain, target;
/* This function drops processed data that along with the iovw that points at it */
iovw_free_contents(&imp->iovw);
/* possibly reset buffer position */
remain = imp->filled - imp->offset;
if (remain == 0) /* no brainer */
imp->offset = imp->scanned = imp->filled = 0;
else if (imp->offset > imp->size - imp->filled &&
imp->offset > remain) {
memcpy(imp->buf, imp->buf + imp->offset, remain);
imp->offset = imp->scanned = 0;
imp->filled = remain;
}
target = imp->size;
while (target > 16 * LINE_CHUNK && imp->filled < target / 2)
target /= 2;
if (target < imp->size) {
char *tmp;
tmp = realloc(imp->buf, target);
if (!tmp)
log_warning("Failed to reallocate buffer to (smaller) size %zu",
target);
else {
log_debug("Reallocated buffer from %zu to %zu bytes",
imp->size, target);
imp->buf = tmp;
imp->size = target;
}
}
}
bool journal_importer_eof(const JournalImporter *imp) {
return imp->state == IMPORTER_STATE_EOF;
}