Handle log messages from build-remote

This makes the progress indicator show statuses like "connecting to
'root@machine'".
This commit is contained in:
Eelco Dolstra 2017-10-24 13:41:52 +02:00
parent 0d59f1ca49
commit fe9d2f974d
No known key found for this signature in database
GPG key ID: 8170B4726D7198DE
4 changed files with 175 additions and 124 deletions

View file

@ -42,6 +42,8 @@ int main (int argc, char * * argv)
return handleExceptions(argv[0], [&]() {
initNix();
logger = makeJSONLogger(*logger);
/* Ensure we don't get any SSH passphrase or host key popups. */
unsetenv("DISPLAY");
unsetenv("SSH_ASKPASS");
@ -172,6 +174,8 @@ int main (int argc, char * * argv)
try {
Activity act(*logger, lvlTalkative, actUnknown, fmt("connecting to '%s'", bestMachine->storeUri));
Store::Params storeParams{{"max-connections", "1"}, {"log-fd", "4"}};
if (bestMachine->sshKey != "")
storeParams["ssh-key"] = bestMachine->sshKey;
@ -197,21 +201,25 @@ connected:
auto inputs = readStrings<PathSet>(source);
auto outputs = readStrings<PathSet>(source);
AutoCloseFD uploadLock = openLockFile(currentLoad + "/" + escapeUri(storeUri) + ".upload-lock", true);
{
Activity act(*logger, lvlTalkative, actUnknown, fmt("waiting for the upload lock to '%s'", storeUri));
auto old = signal(SIGALRM, handleAlarm);
alarm(15 * 60);
if (!lockFile(uploadLock.get(), ltWrite, true))
printError("somebody is hogging the upload lock for '%s', continuing...");
alarm(0);
signal(SIGALRM, old);
copyPaths(store, ref<Store>(sshStore), inputs, NoRepair, NoCheckSigs);
uploadLock = -1;
AutoCloseFD uploadLock = openLockFile(currentLoad + "/" + escapeUri(storeUri) + ".upload-lock", true);
auto old = signal(SIGALRM, handleAlarm);
alarm(15 * 60);
if (!lockFile(uploadLock.get(), ltWrite, true))
printError("somebody is hogging the upload lock for '%s', continuing...");
alarm(0);
signal(SIGALRM, old);
copyPaths(store, ref<Store>(sshStore), inputs, NoRepair, NoCheckSigs);
uploadLock = -1;
}
BasicDerivation drv(readDerivation(drvPath));
drv.inputSrcs = inputs;
printError("building '%s' on '%s'", drvPath, storeUri);
printInfo("building '%s' on '%s'", drvPath, storeUri);
auto result = sshStore->buildDerivation(drvPath, drv);
if (!result.success())

View file

@ -55,8 +55,6 @@
#include <sys/statvfs.h>
#endif
#include <nlohmann/json.hpp>
namespace nix {
@ -612,6 +610,8 @@ struct HookInstance
FdSink sink;
std::map<ActivityId, Activity> activities;
HookInstance();
~HookInstance();
@ -769,6 +769,8 @@ private:
std::string currentLogLine;
size_t currentLogLinePos = 0; // to handle carriage return
std::string currentHookLine;
/* Pipe for the builder's standard output/error. */
Pipe builderOut;
@ -1653,12 +1655,16 @@ HookReply DerivationGoal::tryBuildHook()
string reply;
while (true) {
string s = readLine(worker.hook->fromHook.readSide.get());
if (string(s, 0, 2) == "# ") {
if (handleJSONLogMessage(s, worker.act, worker.hook->activities))
;
else if (string(s, 0, 2) == "# ") {
reply = string(s, 2);
break;
}
s += "\n";
writeToStderr(s);
else {
s += "\n";
writeToStderr(s);
}
}
debug(format("hook reply is '%1%'") % reply);
@ -2414,64 +2420,6 @@ void setupSeccomp()
}
struct BuilderLogger : Logger
{
Logger & prevLogger;
BuilderLogger(Logger & prevLogger) : prevLogger(prevLogger) { }
void addFields(nlohmann::json & json, const Fields & fields)
{
if (fields.empty()) return;
auto & arr = json["fields"] = nlohmann::json::array();
for (auto & f : fields)
if (f.type == Logger::Field::tInt)
arr.push_back(f.i);
else if (f.type == Logger::Field::tString)
arr.push_back(f.s);
else
abort();
}
void log(Verbosity lvl, const FormatOrString & fs) override
{
prevLogger.log(lvl, fs);
}
void startActivity(ActivityId act, Verbosity lvl, ActivityType type,
const std::string & s, const Fields & fields, ActivityId parent) override
{
nlohmann::json json;
json["action"] = "start";
json["id"] = act;
json["level"] = lvl;
json["type"] = type;
json["text"] = s;
addFields(json, fields);
// FIXME: handle parent
log(lvlError, "@nix " + json.dump());
}
void stopActivity(ActivityId act) override
{
nlohmann::json json;
json["action"] = "stop";
json["id"] = act;
log(lvlError, "@nix " + json.dump());
}
void result(ActivityId act, ResultType type, const Fields & fields) override
{
nlohmann::json json;
json["action"] = "result";
json["id"] = act;
json["type"] = type;
addFields(json, fields);
log(lvlError, "@nix " + json.dump());
}
};
void DerivationGoal::runChild()
{
/* Warning: in the child we should absolutely not make any SQLite
@ -2874,7 +2822,7 @@ void DerivationGoal::runChild()
/* Execute the program. This should not return. */
if (drv->isBuiltin()) {
try {
logger = new BuilderLogger(*logger);
logger = makeJSONLogger(*logger);
if (drv->builder == "builtin:fetchurl")
builtinFetchurl(*drv, netrcData);
else
@ -3315,8 +3263,14 @@ void DerivationGoal::handleChildOutput(int fd, const string & data)
if (logSink) (*logSink)(data);
}
if (hook && fd == hook->fromHook.readSide.get())
printError(chomp(data));
if (hook && fd == hook->fromHook.readSide.get()) {
for (auto c : data)
if (c == '\n') {
handleJSONLogMessage(currentHookLine, worker.act, hook->activities);
currentHookLine.clear();
} else
currentHookLine += c;
}
}
@ -3327,56 +3281,10 @@ void DerivationGoal::handleEOF(int fd)
}
static Logger::Fields getFields(nlohmann::json & json)
{
Logger::Fields fields;
for (auto & f : json) {
if (f.type() == nlohmann::json::value_t::number_unsigned)
fields.emplace_back(Logger::Field(f.get<uint64_t>()));
else if (f.type() == nlohmann::json::value_t::string)
fields.emplace_back(Logger::Field(f.get<std::string>()));
else throw Error("unsupported JSON type %d", (int) f.type());
}
return fields;
}
void DerivationGoal::flushLine()
{
if (hasPrefix(currentLogLine, "@nix ")) {
try {
auto json = nlohmann::json::parse(std::string(currentLogLine, 5));
std::string action = json["action"];
if (action == "start") {
auto type = (ActivityType) json["type"];
if (type == actDownload)
builderActivities.emplace(std::piecewise_construct,
std::forward_as_tuple(json["id"]),
std::forward_as_tuple(*logger, (Verbosity) json["level"], type,
json["text"], getFields(json["fields"]), act->id));
}
else if (action == "stop")
builderActivities.erase((ActivityId) json["id"]);
else if (action == "result") {
auto i = builderActivities.find((ActivityId) json["id"]);
if (i != builderActivities.end())
i->second.result((ResultType) json["type"], getFields(json["fields"]));
}
else if (action == "setPhase") {
std::string phase = json["phase"];
act->result(resSetPhase, phase);
}
} catch (std::exception & e) {
printError("bad log message from builder: %s", e.what());
}
}
if (handleJSONLogMessage(currentLogLine, *act, builderActivities))
;
else {
if (settings.verboseBuild &&

View file

@ -2,6 +2,7 @@
#include "util.hh"
#include <atomic>
#include <nlohmann/json.hpp>
namespace nix {
@ -90,4 +91,133 @@ Activity::Activity(Logger & logger, Verbosity lvl, ActivityType type,
logger.startActivity(id, lvl, type, s, fields, parent);
}
struct JSONLogger : Logger
{
Logger & prevLogger;
JSONLogger(Logger & prevLogger) : prevLogger(prevLogger) { }
void addFields(nlohmann::json & json, const Fields & fields)
{
if (fields.empty()) return;
auto & arr = json["fields"] = nlohmann::json::array();
for (auto & f : fields)
if (f.type == Logger::Field::tInt)
arr.push_back(f.i);
else if (f.type == Logger::Field::tString)
arr.push_back(f.s);
else
abort();
}
void write(const nlohmann::json & json)
{
prevLogger.log(lvlError, "@nix " + json.dump());
}
void log(Verbosity lvl, const FormatOrString & fs) override
{
nlohmann::json json;
json["action"] = "msg";
json["level"] = lvl;
json["msg"] = fs.s;
write(json);
}
void startActivity(ActivityId act, Verbosity lvl, ActivityType type,
const std::string & s, const Fields & fields, ActivityId parent) override
{
nlohmann::json json;
json["action"] = "start";
json["id"] = act;
json["level"] = lvl;
json["type"] = type;
json["text"] = s;
addFields(json, fields);
// FIXME: handle parent
write(json);
}
void stopActivity(ActivityId act) override
{
nlohmann::json json;
json["action"] = "stop";
json["id"] = act;
write(json);
}
void result(ActivityId act, ResultType type, const Fields & fields) override
{
nlohmann::json json;
json["action"] = "result";
json["id"] = act;
json["type"] = type;
addFields(json, fields);
write(json);
}
};
Logger * makeJSONLogger(Logger & prevLogger)
{
return new JSONLogger(prevLogger);
}
static Logger::Fields getFields(nlohmann::json & json)
{
Logger::Fields fields;
for (auto & f : json) {
if (f.type() == nlohmann::json::value_t::number_unsigned)
fields.emplace_back(Logger::Field(f.get<uint64_t>()));
else if (f.type() == nlohmann::json::value_t::string)
fields.emplace_back(Logger::Field(f.get<std::string>()));
else throw Error("unsupported JSON type %d", (int) f.type());
}
return fields;
}
bool handleJSONLogMessage(const std::string & msg,
const Activity & act, std::map<ActivityId, Activity> & activities)
{
if (!hasPrefix(msg, "@nix ")) return false;
try {
auto json = nlohmann::json::parse(std::string(msg, 5));
std::string action = json["action"];
if (action == "start") {
auto type = (ActivityType) json["type"];
if (type == actDownload || type == actUnknown)
activities.emplace(std::piecewise_construct,
std::forward_as_tuple(json["id"]),
std::forward_as_tuple(*logger, (Verbosity) json["level"], type,
json["text"], getFields(json["fields"]), act.id));
}
else if (action == "stop")
activities.erase((ActivityId) json["id"]);
else if (action == "result") {
auto i = activities.find((ActivityId) json["id"]);
if (i != activities.end())
i->second.result((ResultType) json["type"], getFields(json["fields"]));
}
else if (action == "setPhase") {
std::string phase = json["phase"];
act.result(resSetPhase, phase);
}
else if (action == "msg") {
std::string msg = json["msg"];
logger->log((Verbosity) json["level"], msg);
}
} catch (std::exception & e) {
printError("bad log message from builder: %s", e.what());
}
return true;
}
}

View file

@ -130,6 +130,11 @@ extern Logger * logger;
Logger * makeDefaultLogger();
Logger * makeJSONLogger(Logger & prevLogger);
bool handleJSONLogMessage(const std::string & msg,
const Activity & act, std::map<ActivityId, Activity> & activities);
extern Verbosity verbosity; /* suppress msgs > this */
/* Print a message if the current log level is at least the specified