journal-remote: rework fd and writer reference handling

This commit is contained in:
Zbigniew Jędrzejewski-Szmek 2014-07-02 00:15:37 -04:00
parent a83f403760
commit 9ff48d0982
7 changed files with 284 additions and 214 deletions

View file

@ -3455,6 +3455,7 @@ systemd_journal_remote_SOURCES = \
src/journal-remote/journal-remote-parse.c \ src/journal-remote/journal-remote-parse.c \
src/journal-remote/journal-remote-write.h \ src/journal-remote/journal-remote-write.h \
src/journal-remote/journal-remote-write.c \ src/journal-remote/journal-remote-write.c \
src/journal-remote/journal-remote.h \
src/journal-remote/journal-remote.c src/journal-remote/journal-remote.c
systemd_journal_remote_LDADD = \ systemd_journal_remote_LDADD = \

View file

@ -28,19 +28,48 @@ void source_free(RemoteSource *source) {
if (!source) if (!source)
return; return;
if (source->fd >= 0) { if (source->fd >= 0 && !source->passive_fd) {
log_debug("Closing fd:%d (%s)", source->fd, source->name); log_debug("Closing fd:%d (%s)", source->fd, source->name);
safe_close(source->fd); safe_close(source->fd);
} }
free(source->name); free(source->name);
free(source->buf); free(source->buf);
iovw_free_contents(&source->iovw); iovw_free_contents(&source->iovw);
log_debug("Writer ref count %u", source->writer->n_ref);
writer_unref(source->writer);
sd_event_source_unref(source->event); sd_event_source_unref(source->event);
free(source); free(source);
} }
/**
* Initialize zero-filled source with given values. On success, takes
* ownerhship of fd and writer, otherwise does not touch them.
*/
RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) {
RemoteSource *source;
log_debug("Creating source for %sfd:%d (%s)",
passive_fd ? "passive " : "", fd, name);
assert(fd >= 0);
source = new0(RemoteSource, 1);
if (!source)
return NULL;
source->fd = fd;
source->passive_fd = passive_fd;
source->name = name;
source->writer = writer;
return source;
}
static int get_line(RemoteSource *source, char **line, size_t *size) { static int get_line(RemoteSource *source, char **line, size_t *size) {
ssize_t n, remain; ssize_t n, remain;
char *c = NULL; char *c = NULL;
@ -51,6 +80,7 @@ static int get_line(RemoteSource *source, char **line, size_t *size) {
assert(source->state == STATE_LINE); assert(source->state == STATE_LINE);
assert(source->filled <= source->size); assert(source->filled <= source->size);
assert(source->buf == NULL || source->size > 0); assert(source->buf == NULL || source->size > 0);
assert(source->fd >= 0);
while (true) { while (true) {
if (source->buf) if (source->buf)
@ -65,7 +95,7 @@ static int get_line(RemoteSource *source, char **line, size_t *size) {
return -E2BIG; return -E2BIG;
} }
if (source->fd < 0) if (source->passive_fd)
/* we have to wait for some data to come to us */ /* we have to wait for some data to come to us */
return -EWOULDBLOCK; return -EWOULDBLOCK;
@ -146,10 +176,11 @@ static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
assert(source->scanned <= source->filled); assert(source->scanned <= source->filled);
assert(source->buf != NULL || source->size == 0); assert(source->buf != NULL || source->size == 0);
assert(source->buf == NULL || source->size > 0); assert(source->buf == NULL || source->size > 0);
assert(source->fd >= 0);
assert(data); assert(data);
while(source->filled < size) { while(source->filled < size) {
if (source->fd < 0) if (source->passive_fd)
/* we have to wait for some data to come to us */ /* we have to wait for some data to come to us */
return -EWOULDBLOCK; return -EWOULDBLOCK;
@ -418,11 +449,11 @@ int process_data(RemoteSource *source) {
} }
} }
int process_source(RemoteSource *source, Writer *writer, bool compress, bool seal) { int process_source(RemoteSource *source, bool compress, bool seal) {
int r; int r;
assert(source); assert(source);
assert(writer); assert(source->writer);
r = process_data(source); r = process_data(source);
if (r <= 0) if (r <= 0)
@ -440,7 +471,7 @@ int process_source(RemoteSource *source, Writer *writer, bool compress, bool sea
assert(source->iovw.iovec); assert(source->iovw.iovec);
assert(source->iovw.count); assert(source->iovw.count);
r = writer_write(writer, &source->iovw, &source->ts, compress, seal); r = writer_write(source->writer, &source->iovw, &source->ts, compress, seal);
if (r < 0) if (r < 0)
log_error("Failed to write entry of %zu bytes: %s", log_error("Failed to write entry of %zu bytes: %s",
iovw_size(&source->iovw), strerror(-r)); iovw_size(&source->iovw), strerror(-r));

View file

@ -35,6 +35,7 @@ typedef enum {
typedef struct RemoteSource { typedef struct RemoteSource {
char *name; char *name;
int fd; int fd;
bool passive_fd;
char *buf; char *buf;
size_t size; size_t size;
@ -47,9 +48,13 @@ typedef struct RemoteSource {
source_state state; source_state state;
dual_timestamp ts; dual_timestamp ts;
Writer *writer;
sd_event_source *event; sd_event_source *event;
} RemoteSource; } RemoteSource;
RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer);
static inline size_t source_non_empty(RemoteSource *source) { static inline size_t source_non_empty(RemoteSource *source) {
assert(source); assert(source);
@ -59,4 +64,4 @@ static inline size_t source_non_empty(RemoteSource *source) {
void source_free(RemoteSource *source); void source_free(RemoteSource *source);
int process_data(RemoteSource *source); int process_data(RemoteSource *source);
int push_data(RemoteSource *source, const char *data, size_t size); int push_data(RemoteSource *source, const char *data, size_t size);
int process_source(RemoteSource *source, Writer *writer, bool compress, bool seal); int process_source(RemoteSource *source, bool compress, bool seal);

View file

@ -19,6 +19,7 @@
along with systemd; If not, see <http://www.gnu.org/licenses/>. along with systemd; If not, see <http://www.gnu.org/licenses/>.
***/ ***/
#include "journal-remote.h"
#include "journal-remote-write.h" #include "journal-remote-write.h"
int iovw_put(struct iovec_wrapper *iovw, void* data, size_t len) { int iovw_put(struct iovec_wrapper *iovw, void* data, size_t len) {
@ -64,32 +65,67 @@ static int do_rotate(JournalFile **f, bool compress, bool seal) {
return r; return r;
} }
int writer_init(Writer *s) { Writer* writer_new(RemoteServer *server) {
assert(s); Writer *w;
s->journal = NULL; w = new0(Writer, 1);
if (!w)
return NULL;
memset(&s->metrics, 0xFF, sizeof(s->metrics)); memset(&w->metrics, 0xFF, sizeof(w->metrics));
s->mmap = mmap_cache_new(); w->mmap = mmap_cache_new();
if (!s->mmap) if (!w->mmap) {
return log_oom(); free(w);
return NULL;
s->seqnum = 0;
return 0;
}
int writer_close(Writer *s) {
if (s->journal) {
journal_file_close(s->journal);
log_debug("Journal has been closed.");
} }
if (s->mmap)
mmap_cache_unref(s->mmap); w->n_ref = 1;
return 0; w->server = server;
return w;
} }
Writer* writer_free(Writer *w) {
if (!w)
return NULL;
if (w->journal) {
log_debug("Closing journal file %s.", w->journal->path);
journal_file_close(w->journal);
}
if (w->server) {
w->server->event_count += w->seqnum;
if (w->hashmap_key)
hashmap_remove(w->server->writers, w->hashmap_key);
}
free(w->hashmap_key);
if (w->mmap)
mmap_cache_unref(w->mmap);
free(w);
return NULL;
}
Writer* writer_unref(Writer *w) {
if (w && (-- w->n_ref <= 0))
writer_free(w);
return NULL;
}
Writer* writer_ref(Writer *w) {
if (w)
assert_se(++ w->n_ref >= 2);
return w;
}
int writer_write(Writer *s, int writer_write(Writer *s,
struct iovec_wrapper *iovw, struct iovec_wrapper *iovw,
dual_timestamp *ts, dual_timestamp *ts,

View file

@ -25,6 +25,8 @@
#include "journal-file.h" #include "journal-file.h"
typedef struct RemoteServer RemoteServer;
struct iovec_wrapper { struct iovec_wrapper {
struct iovec *iovec; struct iovec *iovec;
size_t size_bytes; size_t size_bytes;
@ -38,12 +40,25 @@ size_t iovw_size(struct iovec_wrapper *iovw);
typedef struct Writer { typedef struct Writer {
JournalFile *journal; JournalFile *journal;
JournalMetrics metrics; JournalMetrics metrics;
MMapCache *mmap; MMapCache *mmap;
RemoteServer *server;
char *hashmap_key;
uint64_t seqnum; uint64_t seqnum;
int n_ref;
} Writer; } Writer;
int writer_init(Writer *s); Writer* writer_new(RemoteServer* server);
int writer_close(Writer *s); Writer* writer_free(Writer *w);
Writer* writer_ref(Writer *w);
Writer* writer_unref(Writer *w);
DEFINE_TRIVIAL_CLEANUP_FUNC(Writer*, writer_unref);
#define _cleanup_writer_unref_ _cleanup_(writer_unrefp)
int writer_write(Writer *s, int writer_write(Writer *s,
struct iovec_wrapper *iovw, struct iovec_wrapper *iovw,
dual_timestamp *ts, dual_timestamp *ts,

View file

@ -21,7 +21,6 @@
#include <errno.h> #include <errno.h>
#include <fcntl.h> #include <fcntl.h>
#include <inttypes.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
@ -33,7 +32,6 @@
#include <getopt.h> #include <getopt.h>
#include "sd-daemon.h" #include "sd-daemon.h"
#include "sd-event.h"
#include "journal-file.h" #include "journal-file.h"
#include "journald-native.h" #include "journald-native.h"
#include "socket-util.h" #include "socket-util.h"
@ -41,17 +39,15 @@
#include "build.h" #include "build.h"
#include "macro.h" #include "macro.h"
#include "strv.h" #include "strv.h"
#include "hashmap.h"
#include "fileio.h" #include "fileio.h"
#include "conf-parser.h" #include "conf-parser.h"
#include "microhttpd-util.h"
#include "siphash24.h" #include "siphash24.h"
#ifdef HAVE_GNUTLS #ifdef HAVE_GNUTLS
#include <gnutls/gnutls.h> #include <gnutls/gnutls.h>
#endif #endif
#include "journal-remote-parse.h" #include "journal-remote.h"
#include "journal-remote-write.h" #include "journal-remote-write.h"
#define REMOTE_JOURNAL_PATH "/var/log/journal/remote" #define REMOTE_JOURNAL_PATH "/var/log/journal/remote"
@ -71,7 +67,7 @@ static int arg_seal = false;
static int http_socket = -1, https_socket = -1; static int http_socket = -1, https_socket = -1;
static char** arg_gnutls_log = NULL; static char** arg_gnutls_log = NULL;
static JournalWriteSplitMode arg_split_mode = JOURNAL_WRITE_SPLIT_NONE; static JournalWriteSplitMode arg_split_mode = JOURNAL_WRITE_SPLIT_HOST;
static char* arg_output = NULL; static char* arg_output = NULL;
static char *arg_key = NULL; static char *arg_key = NULL;
@ -170,7 +166,7 @@ static int spawn_getter(const char *getter, const char *url) {
return r; return r;
} }
#define filename_escape(s) xescape((s), "./ ") #define filename_escape(s) xescape((s), "/ ")
static int open_output(Writer *w, const char* host) { static int open_output(Writer *w, const char* host) {
_cleanup_free_ char *_output = NULL; _cleanup_free_ char *_output = NULL;
@ -223,26 +219,78 @@ static int open_output(Writer *w, const char* host) {
********************************************************************** **********************************************************************
**********************************************************************/ **********************************************************************/
typedef struct MHDDaemonWrapper { static int init_writer_hashmap(RemoteServer *s) {
uint64_t fd; static const struct {
struct MHD_Daemon *daemon; hash_func_t hash_func;
compare_func_t compare_func;
} functions[] = {
[JOURNAL_WRITE_SPLIT_NONE] = {trivial_hash_func,
trivial_compare_func},
[JOURNAL_WRITE_SPLIT_HOST] = {string_hash_func,
string_compare_func},
};
sd_event_source *event; assert(arg_split_mode >= 0 && arg_split_mode < (int) ELEMENTSOF(functions));
} MHDDaemonWrapper;
typedef struct RemoteServer { s->writers = hashmap_new(functions[arg_split_mode].hash_func,
RemoteSource **sources; functions[arg_split_mode].compare_func);
size_t sources_size; if (!s->writers)
size_t active; return log_oom();
sd_event *events; return 0;
sd_event_source *sigterm_event, *sigint_event, *listen_event; }
Hashmap *writers; static int get_writer(RemoteServer *s, const char *host,
Writer **writer) {
const void *key;
_cleanup_writer_unref_ Writer *w = NULL;
int r;
bool check_trust; switch(arg_split_mode) {
Hashmap *daemons; case JOURNAL_WRITE_SPLIT_NONE:
} RemoteServer; key = "one and only";
break;
case JOURNAL_WRITE_SPLIT_HOST:
assert(host);
key = host;
break;
default:
assert_not_reached("what split mode?");
}
w = hashmap_get(s->writers, key);
if (w)
writer_ref(w);
else {
w = writer_new(s);
if (!w)
return log_oom();
if (arg_split_mode == JOURNAL_WRITE_SPLIT_HOST) {
w->hashmap_key = strdup(key);
if (!w->hashmap_key)
return log_oom();
}
r = open_output(w, host);
if (r < 0)
return r;
r = hashmap_put(s->writers, w->hashmap_key ?: key, w);
if (r < 0)
return r;
}
*writer = w;
w = NULL;
return 0;
}
/**********************************************************************
**********************************************************************
**********************************************************************/
/* This should go away as soon as µhttpd allows state to be passed around. */ /* This should go away as soon as µhttpd allows state to be passed around. */
static RemoteServer *server; static RemoteServer *server;
@ -260,18 +308,31 @@ static int dispatch_http_event(sd_event_source *event,
uint32_t revents, uint32_t revents,
void *userdata); void *userdata);
static int get_source_for_fd(RemoteServer *s, int fd, RemoteSource **source) { static int get_source_for_fd(RemoteServer *s,
int fd, char *name, RemoteSource **source) {
Writer *writer;
int r;
assert(fd >= 0); assert(fd >= 0);
assert(source); assert(source);
if (!GREEDY_REALLOC0(s->sources, s->sources_size, fd + 1)) if (!GREEDY_REALLOC0(s->sources, s->sources_size, fd + 1))
return log_oom(); return log_oom();
r = get_writer(s, name, &writer);
if (r < 0) {
log_warning("Failed to get writer for source %s: %s",
name, strerror(-r));
return r;
}
if (s->sources[fd] == NULL) { if (s->sources[fd] == NULL) {
s->sources[fd] = new0(RemoteSource, 1); s->sources[fd] = source_new(fd, false, name, writer);
if (!s->sources[fd]) if (!s->sources[fd]) {
writer_unref(writer);
return log_oom(); return log_oom();
s->sources[fd]->fd = -1; }
s->active++; s->active++;
} }
@ -296,35 +357,27 @@ static int remove_source(RemoteServer *s, int fd) {
return 0; return 0;
} }
static int add_source(RemoteServer *s, int fd, const char* _name) { static int add_source(RemoteServer *s, int fd, char* name, bool own_name) {
RemoteSource *source; RemoteSource *source;
char *name;
int r; int r;
assert(s); assert(s);
assert(fd >= 0); assert(fd >= 0);
assert(_name); assert(name);
log_debug("Creating source for fd:%d (%s)", fd, _name); if (!own_name) {
name = strdup(name);
name = strdup(_name); if (!name)
if (!name) return log_oom();
return log_oom();
r = get_source_for_fd(s, fd, &source);
if (r < 0) {
log_error("Failed to create source for fd:%d (%s)", fd, name);
free(name);
return r;
} }
assert(source); r = get_source_for_fd(s, fd, name, &source);
assert(source->fd < 0); if (r < 0) {
assert(!source->name); log_error("Failed to create source for fd:%d (%s): %s",
fd, name, strerror(-r));
source->fd = fd; return r;
source->name = name; }
r = sd_event_add_io(s->events, &source->event, r = sd_event_add_io(s->events, &source->event,
fd, EPOLLIN|EPOLLRDHUP|EPOLLPRI, fd, EPOLLIN|EPOLLRDHUP|EPOLLPRI,
@ -367,100 +420,32 @@ static int setup_raw_socket(RemoteServer *s, const char *address) {
return add_raw_socket(s, fd); return add_raw_socket(s, fd);
} }
/**********************************************************************
**********************************************************************
**********************************************************************/
static int init_writer_hashmap(RemoteServer *s) {
static const struct {
hash_func_t hash_func;
compare_func_t compare_func;
} functions[] = {
[JOURNAL_WRITE_SPLIT_NONE] = {trivial_hash_func,
trivial_compare_func},
[JOURNAL_WRITE_SPLIT_HOST] = {string_hash_func,
string_compare_func},
};
assert(arg_split_mode >= 0 && arg_split_mode < (int) ELEMENTSOF(functions));
s->writers = hashmap_new(functions[arg_split_mode].hash_func,
functions[arg_split_mode].compare_func);
if (!s->writers)
return log_oom();
return 0;
}
static int get_writer(RemoteServer *s, const char *host, sd_id128_t *machine,
Writer **writer) {
const void *key;
Writer *w;
int r;
switch(arg_split_mode) {
case JOURNAL_WRITE_SPLIT_NONE:
key = "one and only";
break;
case JOURNAL_WRITE_SPLIT_HOST:
assert(host);
key = host;
break;
default:
assert_not_reached("what split mode?");
}
w = hashmap_get(s->writers, key);
if (!w) {
w = new0(Writer, 1);
if (!w)
return -ENOMEM;
r = writer_init(w);
if (r < 0) {
free(w);
return r;
}
r = hashmap_put(s->writers, key, w);
if (r < 0) {
writer_close(w);
free(w);
return r;
}
r = open_output(w, host);
if (r < 0)
return r;
}
*writer = w;
return 0;
}
/********************************************************************** /**********************************************************************
********************************************************************** **********************************************************************
**********************************************************************/ **********************************************************************/
static RemoteSource *request_meta(void **connection_cls, int fd, char *hostname) { static RemoteSource *request_meta(void **connection_cls, int fd, char *hostname) {
RemoteSource *source; RemoteSource *source;
Writer *writer;
int r;
assert(connection_cls); assert(connection_cls);
if (*connection_cls) { if (*connection_cls)
free(hostname);
return *connection_cls; return *connection_cls;
}
source = new0(RemoteSource, 1); r = get_writer(server, hostname, &writer);
if (!source) { if (r < 0) {
free(hostname); log_warning("Failed to get writer for source %s: %s",
hostname, strerror(-r));
return NULL; return NULL;
} }
source->fd = -1; /* fd */ source = source_new(fd, true, hostname, writer);
source->name = hostname; if (!source) {
log_oom();
writer_unref(writer);
return NULL;
}
log_debug("Added RemoteSource as connection metadata %p", source); log_debug("Added RemoteSource as connection metadata %p", source);
@ -488,26 +473,15 @@ static int process_http_upload(
size_t *upload_data_size, size_t *upload_data_size,
RemoteSource *source) { RemoteSource *source) {
Writer *w;
int r;
bool finished = false; bool finished = false;
size_t remaining; size_t remaining;
int r;
assert(source); assert(source);
log_debug("request_handler_upload: connection %p, %zu bytes", log_debug("request_handler_upload: connection %p, %zu bytes",
connection, *upload_data_size); connection, *upload_data_size);
r = get_writer(server, source->name, NULL, &w);
if (r < 0) {
log_warning("Failed to get writer for source %s: %s",
source->name, strerror(-r));
return mhd_respondf(connection,
MHD_HTTP_SERVICE_UNAVAILABLE,
"Failed to get writer for connection: %s.\n",
strerror(-r));
}
if (*upload_data_size) { if (*upload_data_size) {
log_debug("Received %zu bytes", *upload_data_size); log_debug("Received %zu bytes", *upload_data_size);
@ -520,8 +494,7 @@ static int process_http_upload(
finished = true; finished = true;
while (true) { while (true) {
r = process_source(source, arg_compress, arg_seal);
r = process_source(source, w, arg_compress, arg_seal);
if (r == -EAGAIN || r == -EWOULDBLOCK) if (r == -EAGAIN || r == -EWOULDBLOCK)
break; break;
else if (r < 0) { else if (r < 0) {
@ -566,7 +539,7 @@ static int request_handler(
const char *header; const char *header;
int r, code, fd; int r, code, fd;
char *hostname; _cleanup_free_ char *hostname = NULL;
assert(connection); assert(connection);
assert(connection_cls); assert(connection_cls);
@ -627,6 +600,7 @@ static int request_handler(
if (!request_meta(connection_cls, fd, hostname)) if (!request_meta(connection_cls, fd, hostname))
return respond_oom(connection); return respond_oom(connection);
hostname = NULL;
return MHD_YES; return MHD_YES;
} }
@ -871,7 +845,7 @@ static int remoteserver_init(RemoteServer *s,
else else
r = add_raw_socket(s, fd); r = add_raw_socket(s, fd);
} else if (sd_is_socket(fd, AF_UNSPEC, 0, true)) { } else if (sd_is_socket(fd, AF_UNSPEC, 0, true)) {
_cleanup_free_ char *hostname = NULL; char *hostname;
r = getnameinfo_pretty(fd, &hostname); r = getnameinfo_pretty(fd, &hostname);
if (r < 0) { if (r < 0) {
@ -881,7 +855,9 @@ static int remoteserver_init(RemoteServer *s,
log_info("Received a connection socket (fd:%d) from %s", fd, hostname); log_info("Received a connection socket (fd:%d) from %s", fd, hostname);
r = add_source(s, fd, hostname); r = add_source(s, fd, hostname, true);
if (r < 0)
free(hostname);
} else { } else {
log_error("Unknown socket passed on fd:%d", fd); log_error("Unknown socket passed on fd:%d", fd);
@ -917,7 +893,7 @@ static int remoteserver_init(RemoteServer *s,
startswith(arg_url, "http://") ?: startswith(arg_url, "http://") ?:
arg_url; arg_url;
r = add_source(s, fd, hostname); r = add_source(s, fd, (char*) hostname, false);
if (r < 0) if (r < 0)
return r; return r;
@ -966,7 +942,7 @@ static int remoteserver_init(RemoteServer *s,
output_name = *file; output_name = *file;
} }
r = add_source(s, fd, output_name); r = add_source(s, fd, (char*) output_name, false);
if (r < 0) if (r < 0)
return r; return r;
} }
@ -987,9 +963,7 @@ static int remoteserver_init(RemoteServer *s,
/* In this case we know what the writer will be /* In this case we know what the writer will be
called, so we can create it and verify that we can called, so we can create it and verify that we can
create output as expected. */ create output as expected. */
Writer *w; r = get_writer(s, NULL, &s->_single_writer);
r = get_writer(s, NULL, NULL, &w);
if (r < 0) if (r < 0)
return r; return r;
} }
@ -997,26 +971,10 @@ static int remoteserver_init(RemoteServer *s,
return 0; return 0;
} }
static int server_destroy(RemoteServer *s, uint64_t *event_count) { static void server_destroy(RemoteServer *s) {
int r;
size_t i; size_t i;
Writer *w;
MHDDaemonWrapper *d; MHDDaemonWrapper *d;
*event_count = 0;
while ((w = hashmap_steal_first(s->writers))) {
log_info("seqnum %"PRIu64, w->seqnum);
*event_count += w->seqnum;
r = writer_close(w);
if (r < 0)
log_warning("Failed to close writer: %s", strerror(-r));
free(w);
}
hashmap_free(s->writers);
while ((d = hashmap_steal_first(s->daemons))) { while ((d = hashmap_steal_first(s->daemons))) {
MHD_stop_daemon(d->daemon); MHD_stop_daemon(d->daemon);
sd_event_source_unref(d->event); sd_event_source_unref(d->event);
@ -1028,17 +986,17 @@ static int server_destroy(RemoteServer *s, uint64_t *event_count) {
assert(s->sources_size == 0 || s->sources); assert(s->sources_size == 0 || s->sources);
for (i = 0; i < s->sources_size; i++) for (i = 0; i < s->sources_size; i++)
remove_source(s, i); remove_source(s, i);
free(s->sources); free(s->sources);
writer_unref(s->_single_writer);
hashmap_free(s->writers);
sd_event_source_unref(s->sigterm_event); sd_event_source_unref(s->sigterm_event);
sd_event_source_unref(s->sigint_event); sd_event_source_unref(s->sigint_event);
sd_event_source_unref(s->listen_event); sd_event_source_unref(s->listen_event);
sd_event_unref(s->events); sd_event_unref(s->events);
/* fds that we're listening on remain open... */ /* fds that we're listening on remain open... */
return r;
} }
/********************************************************************** /**********************************************************************
@ -1050,7 +1008,6 @@ static int dispatch_raw_source_event(sd_event_source *event,
uint32_t revents, uint32_t revents,
void *userdata) { void *userdata) {
Writer *w;
RemoteServer *s = userdata; RemoteServer *s = userdata;
RemoteSource *source; RemoteSource *source;
int r; int r;
@ -1059,14 +1016,7 @@ static int dispatch_raw_source_event(sd_event_source *event,
source = s->sources[fd]; source = s->sources[fd];
assert(source->fd == fd); assert(source->fd == fd);
r = get_writer(s, source->name, NULL, &w); r = process_source(source, arg_compress, arg_seal);
if (r < 0) {
log_warning("Failed to get writer for source %s: %s",
source->name, strerror(-r));
return r;
}
r = process_source(source, w, arg_compress, arg_seal);
if (source->state == STATE_EOF) { if (source->state == STATE_EOF) {
size_t remaining; size_t remaining;
@ -1145,7 +1095,7 @@ static int dispatch_raw_connection_event(sd_event_source *event,
uint32_t revents, uint32_t revents,
void *userdata) { void *userdata) {
RemoteServer *s = userdata; RemoteServer *s = userdata;
int fd2; int fd2, r;
SocketAddress addr = { SocketAddress addr = {
.size = sizeof(union sockaddr_union), .size = sizeof(union sockaddr_union),
.type = SOCK_STREAM, .type = SOCK_STREAM,
@ -1156,7 +1106,10 @@ static int dispatch_raw_connection_event(sd_event_source *event,
if (fd2 < 0) if (fd2 < 0)
return fd2; return fd2;
return add_source(s, fd2, hostname); r = add_source(s, fd2, hostname, true);
if (r < 0)
free(hostname);
return r;
} }
/********************************************************************** /**********************************************************************
@ -1537,9 +1490,8 @@ static int setup_gnutls_logger(char **categories) {
int main(int argc, char **argv) { int main(int argc, char **argv) {
RemoteServer s = {}; RemoteServer s = {};
int r, r2; int r;
_cleanup_free_ char *key = NULL, *cert = NULL, *trust = NULL; _cleanup_free_ char *key = NULL, *cert = NULL, *trust = NULL;
uint64_t entry_count;
log_show_color(true); log_show_color(true);
log_parse_environment(); log_parse_environment();
@ -1585,8 +1537,8 @@ int main(int argc, char **argv) {
} }
} }
r2 = server_destroy(&s, &entry_count); server_destroy(&s);
log_info("Finishing after writing %" PRIu64 " entries", entry_count); log_info("Finishing after writing %" PRIu64 " entries", s.event_count);
sd_notify(false, "STATUS=Shutting down..."); sd_notify(false, "STATUS=Shutting down...");
@ -1594,5 +1546,5 @@ int main(int argc, char **argv) {
free(arg_cert); free(arg_cert);
free(arg_trust); free(arg_trust);
return r >= 0 && r2 >= 0 ? EXIT_SUCCESS : EXIT_FAILURE; return r >= 0 ? EXIT_SUCCESS : EXIT_FAILURE;
} }

View file

@ -0,0 +1,30 @@
#include <inttypes.h>
#include "sd-event.h"
#include "hashmap.h"
#include "microhttpd-util.h"
#include "journal-remote-parse.h"
typedef struct MHDDaemonWrapper {
uint64_t fd;
struct MHD_Daemon *daemon;
sd_event_source *event;
} MHDDaemonWrapper;
typedef struct RemoteServer {
RemoteSource **sources;
size_t sources_size;
size_t active;
sd_event *events;
sd_event_source *sigterm_event, *sigint_event, *listen_event;
Hashmap *writers;
Writer *_single_writer;
uint64_t event_count;
bool check_trust;
Hashmap *daemons;
} RemoteServer;