Systemd/src/basic/journal-importer.c
Lennart Poettering e6a7ec4b8e io-util: add new IOVEC_INIT/IOVEC_MAKE macros
This adds IOVEC_INIT() and IOVEC_MAKE() for initializing iovec structures
from a pointer and a size. On top of these IOVEC_INIT_STRING() and
IOVEC_MAKE_STRING() are added which take a string and automatically
determine the size of the string using strlen().

This patch removes the old IOVEC_SET_STRING() macro, given that
IOVEC_MAKE_STRING() is now useful for similar purposes. Note that the
old IOVEC_SET_STRING() invocations were two characters shorter than the
new ones using IOVEC_MAKE_STRING(), but I think the new syntax is more
readable and more generic as it simply resolves to a C99 literal
structure initialization. Moreover, we can use very similar syntax now
for initializing strings and pointer+size iovec entries. We can also use
the new macros to initialize function parameters on-the-fly or array
definitions. And given that we shouldn't have so many ways to do the
same stuff, let's just settle on the new macros.

(This also converts some code to use _cleanup_ where dynamically
allocated strings were using IOVEC_SET_STRING() before, to modernize
things a bit)
2017-09-22 15:28:04 +02:00

485 lines
15 KiB
C

/***
This file is part of systemd.
Copyright 2014 Zbigniew Jędrzejewski-Szmek
systemd is free software; you can redistribute it and/or modify it
under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 2.1 of the License, or
(at your option) any later version.
systemd is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with systemd; If not, see <http://www.gnu.org/licenses/>.
***/
#include <unistd.h>
#include "alloc-util.h"
#include "fd-util.h"
#include "io-util.h"
#include "journal-importer.h"
#include "parse-util.h"
#include "string-util.h"
#include "unaligned.h"
/* Parser states of a JournalImporter, kept in its 'state' member. */
enum {
        IMPORTER_STATE_LINE        = 0, /* waiting to read, or reading line */
        IMPORTER_STATE_DATA_START  = 1, /* reading binary data header */
        IMPORTER_STATE_DATA        = 2, /* reading binary data */
        IMPORTER_STATE_DATA_FINISH = 3, /* expecting newline */
        IMPORTER_STATE_EOF         = 4, /* done */
};
/* Appends one (data, len) entry to the iovec array, growing the array first.
 *
 * Returns 0 on success, or whatever log_oom() returns when the array cannot
 * be grown. The data itself is not copied; only the pointer is stored. */
static int iovw_put(struct iovec_wrapper *iovw, void* data, size_t len) {
        size_t slot;

        if (!GREEDY_REALLOC(iovw->iovec, iovw->size_bytes, iovw->count + 1))
                return log_oom();

        slot = iovw->count;
        iovw->iovec[slot] = IOVEC_MAKE(data, len);
        iovw->count = slot + 1;

        return 0;
}
/* Releases the iovec array via mfree() and zeroes the bookkeeping fields.
 * The memory the entries pointed at is not touched. */
static void iovw_free_contents(struct iovec_wrapper *iovw) {
        iovw->count = 0;
        iovw->size_bytes = 0;
        iovw->iovec = mfree(iovw->iovec);
}
/* Re-points every iovec entry from a buffer that started at 'old' to one that
 * starts at 'new', keeping each entry's offset into the buffer unchanged. */
static void iovw_rebase(struct iovec_wrapper *iovw, char *old, char *new) {
        size_t idx = 0;

        while (idx < iovw->count) {
                iovw->iovec[idx].iov_base = (char*) iovw->iovec[idx].iov_base - old + new;
                idx++;
        }
}
/* Returns the sum of iov_len over all entries currently stored in iovw. */
size_t iovw_size(struct iovec_wrapper *iovw) {
        size_t total = 0;
        size_t left = iovw->count;

        while (left > 0)
                total += iovw->iovec[--left].iov_len;

        return total;
}
/* Releases everything the importer owns: the file descriptor (unless it is a
 * passive one that the importer does not close), the name, the data buffer and
 * the iovec array.
 *
 * All released members are reset afterwards, so the structure holds no
 * dangling pointers or stale descriptor and calling this function a second
 * time is harmless. */
void journal_importer_cleanup(JournalImporter *imp) {
        if (imp->fd >= 0 && !imp->passive_fd) {
                log_debug("Closing %s (fd=%d)", imp->name ?: "importer", imp->fd);
                safe_close(imp->fd);
                imp->fd = -1; /* never close the same descriptor number twice */
        }

        imp->name = mfree(imp->name);
        imp->buf = mfree(imp->buf);

        /* Keep the "no buffer => size 0" invariant that the readers assert on. */
        imp->size = imp->filled = imp->offset = imp->scanned = 0;

        iovw_free_contents(&imp->iovw);
}
/* Asks GREEDY_REALLOC for a data buffer of 'size' bytes and, on success,
 * rebases the iovec entries from the previous buffer address to the current
 * one. Returns the buffer, or NULL when the allocation failed (in which case
 * the iovec entries are left alone). */
static char* realloc_buffer(JournalImporter *imp, size_t size) {
        char *previous = imp->buf;
        char *grown = GREEDY_REALLOC(imp->buf, imp->size, size);

        if (grown)
                iovw_rebase(&imp->iovw, previous, imp->buf);

        return grown;
}
/* Locates the next '\n'-terminated line in the buffer, reading more data from
 * imp->fd whenever no complete line is buffered yet.
 *
 * On success returns 1, stores the start of the line in *line and its length
 * (including the trailing '\n') in *size, and advances imp->offset past it.
 * The line is NOT NUL-terminated.
 *
 * Returns 0 when read() reports end of file, -E2BIG when imp->filled has
 * reached DATA_SIZE_MAX without a newline being found, -EAGAIN when the fd is
 * passive (data has to be pushed in by the caller), a negative errno when
 * read() fails, and the result of log_oom() when the buffer cannot be grown. */
static int get_line(JournalImporter *imp, char **line, size_t *size) {
        ssize_t n;
        char *c = NULL;

        assert(imp);
        assert(imp->state == IMPORTER_STATE_LINE);
        assert(imp->offset <= imp->filled);
        assert(imp->filled <= imp->size);
        assert(imp->buf == NULL || imp->size > 0);
        assert(imp->fd >= 0);

        for (;;) {
                if (imp->buf) {
                        /* Skip the part that earlier iterations already searched. */
                        size_t start = MAX(imp->scanned, imp->offset);

                        c = memchr(imp->buf + start, '\n',
                                   imp->filled - start);
                        if (c != NULL)
                                break;
                }

                imp->scanned = imp->filled;
                if (imp->scanned >= DATA_SIZE_MAX) {
                        log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
                        return -E2BIG;
                }

                if (imp->passive_fd)
                        /* we have to wait for some data to come to us */
                        return -EAGAIN;

                /* We know that imp->filled is at most DATA_SIZE_MAX, so if
                   we reallocate it, we'll increase the size at least a bit. */
                assert_cc(DATA_SIZE_MAX < ENTRY_SIZE_MAX);
                if (imp->size - imp->filled < LINE_CHUNK &&
                    !realloc_buffer(imp, MIN(imp->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
                        return log_oom();

                assert(imp->buf);
                assert(imp->size - imp->filled >= LINE_CHUNK ||
                       imp->size == ENTRY_SIZE_MAX);

                n = read(imp->fd,
                         imp->buf + imp->filled,
                         imp->size - imp->filled);
                if (n < 0) {
                        /* Capture errno once, so the value we return is the one
                         * read() reported, whatever the logging call does to it. */
                        int saved_errno = errno;

                        if (saved_errno != EAGAIN)
                                log_error_errno(saved_errno, "read(%d, ..., %zu): %m",
                                                imp->fd,
                                                imp->size - imp->filled);
                        return -saved_errno;
                } else if (n == 0)
                        return 0;

                imp->filled += n;
        }

        *line = imp->buf + imp->offset;
        *size = c + 1 - imp->buf - imp->offset;
        imp->offset += *size;

        return 1;
}
/* Makes sure at least 'size' not-yet-consumed bytes are buffered, reading from
 * imp->fd as often as needed.
 *
 * On success returns 1, stores a pointer to those bytes in *data and advances
 * imp->offset by 'size'. Returns 0 when read() reports end of file, -EAGAIN
 * when the fd is passive and not enough data is buffered, a negative errno
 * when read() fails, and the result of log_oom() when the buffer cannot be
 * grown. */
static int fill_fixed_size(JournalImporter *imp, void **data, size_t size) {

        assert(imp);
        assert(imp->state == IMPORTER_STATE_DATA_START ||
               imp->state == IMPORTER_STATE_DATA ||
               imp->state == IMPORTER_STATE_DATA_FINISH);
        assert(size <= DATA_SIZE_MAX);
        assert(imp->offset <= imp->filled);
        assert(imp->filled <= imp->size);
        assert(imp->buf != NULL || imp->size == 0);
        assert(imp->buf == NULL || imp->size > 0);
        assert(imp->fd >= 0);
        assert(data);

        while (imp->filled - imp->offset < size) {
                ssize_t n; /* read() returns ssize_t, same as in get_line() */

                if (imp->passive_fd)
                        /* we have to wait for some data to come to us */
                        return -EAGAIN;

                if (!realloc_buffer(imp, imp->offset + size))
                        return log_oom();

                n = read(imp->fd, imp->buf + imp->filled,
                         imp->size - imp->filled);
                if (n < 0) {
                        /* Capture errno once, so the value we return is the one
                         * read() reported, whatever the logging call does to it. */
                        int saved_errno = errno;

                        if (saved_errno != EAGAIN)
                                log_error_errno(saved_errno, "read(%d, ..., %zu): %m", imp->fd,
                                                imp->size - imp->filled);
                        return -saved_errno;
                } else if (n == 0)
                        return 0;

                imp->filled += n;
        }

        *data = imp->buf + imp->offset;
        imp->offset += size;

        return 1;
}
/* Reads the 64-bit little-endian length header of a binary field and stores it
 * in imp->data_size.
 *
 * Returns 1 on success (a zero length is accepted with a warning), -EINVAL
 * when the declared length exceeds DATA_SIZE_MAX, and otherwise passes on the
 * <= 0 result of fill_fixed_size(). imp->data_size is only written once the
 * value has been validated. */
static int get_data_size(JournalImporter *imp) {
        int r;
        void *data;
        uint64_t declared;

        assert(imp);
        assert(imp->state == IMPORTER_STATE_DATA_START);
        assert(imp->data_size == 0);

        r = fill_fixed_size(imp, &data, sizeof(uint64_t));
        if (r <= 0)
                return r;

        /* Check the full 64-bit value before storing it: imp->data_size may be
         * narrower than 64 bits on some targets, and a truncated value could
         * slip past the limit. */
        declared = unaligned_read_le64(data);
        if (declared > DATA_SIZE_MAX) {
                log_error("Stream declares field with size %llu > DATA_SIZE_MAX = %u",
                          (unsigned long long) declared, DATA_SIZE_MAX);
                return -EINVAL;
        }

        imp->data_size = declared;
        if (imp->data_size == 0)
                log_warning("Binary field with zero length");

        return 1;
}
/* Fetches the imp->data_size payload bytes of a binary field into *data.
 * Returns 1 once they are all available; otherwise passes on the <= 0 result
 * of fill_fixed_size(). */
static int get_data_data(JournalImporter *imp, void **data) {
        int status;

        assert(imp);
        assert(data);
        assert(imp->state == IMPORTER_STATE_DATA);

        status = fill_fixed_size(imp, data, imp->data_size);

        return status <= 0 ? status : 1;
}
/* Consumes the single byte that follows a binary field and checks that it is
 * a newline. Returns 1 if so, -EINVAL if it is any other byte, and otherwise
 * passes on the <= 0 result of fill_fixed_size(). */
static int get_data_newline(JournalImporter *imp) {
        char *byte;
        int status;

        assert(imp);
        assert(imp->state == IMPORTER_STATE_DATA_FINISH);

        status = fill_fixed_size(imp, (void**) &byte, 1);
        if (status <= 0)
                return status;

        assert(byte);
        if (*byte == '\n')
                return 1;

        log_error("expected newline, got '%c'", *byte);
        return -EINVAL;
}
/* Handles lines whose field name starts with a double underscore.
 *
 * 'line' points at 'n' bytes ending in '\n'; it is NOT NUL-terminated. For the
 * two timestamp fields the trailing newline is overwritten with NUL so that
 * the value can be parsed as a string.
 *
 * Returns 1 when the line was a dunder line and has been dealt with (parsed or
 * ignored), 0 when it is not a dunder line, and the negative result of
 * safe_atollu() when a timestamp value cannot be parsed. */
static int process_dunder(JournalImporter *imp, char *line, size_t n) {
        const char *timestamp;
        int r;

        assert(line);
        assert(n > 0);
        assert(line[n-1] == '\n');

        /* XXX: is it worth to support timestamps in extended format?
         * We don't produce them, but who knows... */

        timestamp = startswith(line, "__CURSOR=");
        if (timestamp)
                /* ignore __CURSOR */
                return 1;

        timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
        if (timestamp) {
                long long unsigned x;

                line[n-1] = '\0';
                r = safe_atollu(timestamp, &x);
                if (r < 0)
                        log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
                else
                        imp->ts.realtime = x;
                return r < 0 ? r : 1;
        }

        timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
        if (timestamp) {
                long long unsigned x;

                line[n-1] = '\0';
                r = safe_atollu(timestamp, &x);
                if (r < 0)
                        log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
                else
                        imp->ts.monotonic = x;
                return r < 0 ? r : 1;
        }

        timestamp = startswith(line, "__");
        if (timestamp) {
                /* The line ends in '\n' rather than NUL here, so bound the
                 * output explicitly instead of letting %s run past it. */
                log_notice("Unknown dunder line %.*s", (int) (n - 1), line);
                return 1;
        }

        /* no dunder */
        return 0;
}
/* Advances the importer's parsing state machine by one step.
 *
 * Returns 1 when an empty line was read, i.e. the current entry is complete;
 * 0 when the caller should simply call again (a field was consumed, the state
 * changed, or end of input moved the importer into the EOF state); and a
 * negative errno-style value when one of the helpers failed. */
int journal_importer_process_data(JournalImporter *imp) {
        int r;

        switch(imp->state) {

        case IMPORTER_STATE_LINE: {
                char *text, *eq;
                size_t len = 0;

                assert(imp->data_size == 0);

                r = get_line(imp, &text, &len);
                if (r < 0)
                        return r;
                if (r == 0) {
                        imp->state = IMPORTER_STATE_EOF;
                        return 0;
                }
                assert(len > 0);
                assert(text[len-1] == '\n');

                if (len == 1) {
                        log_trace("Received empty line, event is ready");
                        return 1;
                }

                r = process_dunder(imp, text, len);
                if (r < 0)
                        return r;
                if (r > 0)
                        /* dunder line, already handled */
                        return 0;

                /* MESSAGE=xxx\n
                   or
                   COREDUMP\n
                   LLLLLLLL0011223344...\n
                */
                eq = memchr(text, '=', len);
                if (!eq) {
                        /* Binary field: turn "NAME\n" into "NAME=" and wait for
                         * the payload. The field cannot go into the iovec until
                         * all of its data has arrived. */
                        text[len-1] = '=';
                        imp->field_len = len;
                        imp->state = IMPORTER_STATE_DATA_START;
                } else {
                        /* Text field: store it without the trailing newline. */
                        len--;
                        r = iovw_put(&imp->iovw, text, len);
                        if (r < 0)
                                return r;
                }

                log_trace("Received: %.*s (%s)", (int) len, text, eq ? "text" : "binary");
                return 0; /* continue */
        }

        case IMPORTER_STATE_DATA_START:
                assert(imp->data_size == 0);

                r = get_data_size(imp);
                if (r < 0)
                        return r;
                if (r == 0) {
                        imp->state = IMPORTER_STATE_EOF;
                        return 0;
                }

                /* A zero-length payload goes straight to the trailing newline. */
                if (imp->data_size == 0)
                        imp->state = IMPORTER_STATE_DATA_FINISH;
                else
                        imp->state = IMPORTER_STATE_DATA;

                return 0; /* continue */

        case IMPORTER_STATE_DATA: {
                void *payload;
                char *name, *joined;

                assert(imp->data_size > 0);

                r = get_data_data(imp, &payload);
                if (r < 0)
                        return r;
                if (r == 0) {
                        imp->state = IMPORTER_STATE_EOF;
                        return 0;
                }

                assert(payload);

                /* Buffer layout is "NAME=" + 8-byte length + payload. Slide
                 * "NAME=" forward over the length header so that it sits
                 * directly in front of the payload. */
                name = (char*) payload - sizeof(uint64_t) - imp->field_len;
                joined = name + sizeof(uint64_t);
                memmove(joined, name, imp->field_len);

                r = iovw_put(&imp->iovw, joined, imp->field_len + imp->data_size);
                if (r < 0)
                        return r;

                imp->state = IMPORTER_STATE_DATA_FINISH;

                return 0; /* continue */
        }

        case IMPORTER_STATE_DATA_FINISH:
                r = get_data_newline(imp);
                if (r < 0)
                        return r;
                if (r == 0) {
                        imp->state = IMPORTER_STATE_EOF;
                        return 0;
                }

                imp->data_size = 0;
                imp->state = IMPORTER_STATE_LINE;

                return 0; /* continue */

        default:
                assert_not_reached("wtf?");
        }
}
/* Appends 'size' bytes from 'data' to the importer's buffer, growing the
 * buffer first. Returns 0 on success and -ENOMEM when the buffer cannot be
 * grown. Must not be called once the importer is in the EOF state. */
int journal_importer_push_data(JournalImporter *imp, const char *data, size_t size) {
        assert(imp);
        assert(imp->state != IMPORTER_STATE_EOF);

        if (realloc_buffer(imp, imp->filled + size)) {
                memcpy(imp->buf + imp->filled, data, size);
                imp->filled += size;
                return 0;
        }

        log_error("Failed to store received data of size %zu "
                  "(in addition to existing %zu bytes with %zu filled): %s",
                  size, imp->size, imp->filled, strerror(ENOMEM));
        return -ENOMEM;
}
/* Drops the iovw together with the already-processed data it points at, then
 * compacts and possibly shrinks the data buffer. */
void journal_importer_drop_iovw(JournalImporter *imp) {
        size_t pending, wanted;

        iovw_free_contents(&imp->iovw);

        /* Possibly reset the buffer position. */
        pending = imp->filled - imp->offset;

        if (pending == 0) {
                /* Nothing unconsumed: just rewind. */
                imp->filled = 0;
                imp->scanned = 0;
                imp->offset = 0;
        } else if (imp->offset > imp->size - imp->filled &&
                   imp->offset > pending) {
                /* More space is wasted in front than is free at the end, and
                 * offset > pending means source and destination cannot
                 * overlap, so memcpy() is fine here. */
                memcpy(imp->buf, imp->buf + imp->offset, pending);
                imp->scanned = 0;
                imp->offset = 0;
                imp->filled = pending;
        }

        /* Halve the target size while the buffer stays less than half full,
         * but never go to 16 * LINE_CHUNK or below by halving. */
        for (wanted = imp->size; wanted > 16 * LINE_CHUNK && imp->filled < wanted / 2; )
                wanted /= 2;

        if (wanted < imp->size) {
                char *shrunk = realloc(imp->buf, wanted);

                if (shrunk) {
                        log_debug("Reallocated buffer from %zu to %zu bytes",
                                  imp->size, wanted);
                        imp->buf = shrunk;
                        imp->size = wanted;
                } else
                        /* Not fatal: the old, larger buffer is still valid. */
                        log_warning("Failed to reallocate buffer to (smaller) size %zu",
                                    wanted);
        }
}
/* Tells whether the importer has reached the EOF state. */
bool journal_importer_eof(const JournalImporter *imp) {
        const bool at_eof = (IMPORTER_STATE_EOF == imp->state);

        return at_eof;
}