Nix/src/libstore/remote-store.cc
Eelco Dolstra 737ed88f35
Modularize config settings
Allow global config settings to be defined in multiple Config
classes. For example, this means that libutil can have settings and
evaluator settings can be moved out of libstore. The Config classes
are registered in a new GlobalConfig class to which config files
etc. are applied.

Relevant to https://github.com/NixOS/nix/issues/2009 in that it
removes the need for ad hoc handling of useCaseHack, which was the
underlying cause of that issue.
2018-05-30 13:28:01 +02:00

749 lines
21 KiB
C++

#include "serialise.hh"
#include "util.hh"
#include "remote-store.hh"
#include "worker-protocol.hh"
#include "archive.hh"
#include "affinity.hh"
#include "globals.hh"
#include "derivations.hh"
#include "pool.hh"
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <cstring>
namespace nix {
Path readStorePath(Store & store, Source & from)
{
Path path = readString(from);
store.assertStorePath(path);
return path;
}
template<class T> T readStorePaths(Store & store, Source & from)
{
T paths = readStrings<T>(from);
for (auto & i : paths) store.assertStorePath(i);
return paths;
}
template PathSet readStorePaths(Store & store, Source & from);
template Paths readStorePaths(Store & store, Source & from);
/* TODO: Separate these store impls into different files, give them better names */
RemoteStore::RemoteStore(const Params & params)
: Store(params)
, connections(make_ref<Pool<Connection>>(
std::max(1, (int) maxConnections),
[this]() { return openConnectionWrapper(); },
[this](const ref<Connection> & r) {
return
r->to.good()
&& r->from.good()
&& std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now() - r->startTime).count() < maxConnectionAge;
}
))
{
}
ref<RemoteStore::Connection> RemoteStore::openConnectionWrapper()
{
if (failed)
throw Error("opening a connection to remote store '%s' previously failed", getUri());
try {
return openConnection();
} catch (...) {
failed = true;
throw;
}
}
UDSRemoteStore::UDSRemoteStore(const Params & params)
: Store(params)
, LocalFSStore(params)
, RemoteStore(params)
{
}
UDSRemoteStore::UDSRemoteStore(std::string socket_path, const Params & params)
: Store(params)
, LocalFSStore(params)
, RemoteStore(params)
, path(socket_path)
{
}
std::string UDSRemoteStore::getUri()
{
if (path) {
return std::string("unix://") + *path;
} else {
return "daemon";
}
}
ref<RemoteStore::Connection> UDSRemoteStore::openConnection()
{
auto conn = make_ref<Connection>();
/* Connect to a daemon that does the privileged work for us. */
conn->fd = socket(PF_UNIX, SOCK_STREAM
#ifdef SOCK_CLOEXEC
| SOCK_CLOEXEC
#endif
, 0);
if (!conn->fd)
throw SysError("cannot create Unix domain socket");
closeOnExec(conn->fd.get());
string socketPath = path ? *path : settings.nixDaemonSocketFile;
struct sockaddr_un addr;
addr.sun_family = AF_UNIX;
if (socketPath.size() + 1 >= sizeof(addr.sun_path))
throw Error(format("socket path '%1%' is too long") % socketPath);
strcpy(addr.sun_path, socketPath.c_str());
if (::connect(conn->fd.get(), (struct sockaddr *) &addr, sizeof(addr)) == -1)
throw SysError(format("cannot connect to daemon at '%1%'") % socketPath);
conn->from.fd = conn->fd.get();
conn->to.fd = conn->fd.get();
conn->startTime = std::chrono::steady_clock::now();
initConnection(*conn);
return conn;
}
void RemoteStore::initConnection(Connection & conn)
{
/* Send the magic greeting, check for the reply. */
try {
conn.to << WORKER_MAGIC_1;
conn.to.flush();
unsigned int magic = readInt(conn.from);
if (magic != WORKER_MAGIC_2) throw Error("protocol mismatch");
conn.from >> conn.daemonVersion;
if (GET_PROTOCOL_MAJOR(conn.daemonVersion) != GET_PROTOCOL_MAJOR(PROTOCOL_VERSION))
throw Error("Nix daemon protocol version not supported");
if (GET_PROTOCOL_MINOR(conn.daemonVersion) < 10)
throw Error("the Nix daemon version is too old");
conn.to << PROTOCOL_VERSION;
if (GET_PROTOCOL_MINOR(conn.daemonVersion) >= 14) {
int cpu = settings.lockCPU ? lockToCurrentCPU() : -1;
if (cpu != -1)
conn.to << 1 << cpu;
else
conn.to << 0;
}
if (GET_PROTOCOL_MINOR(conn.daemonVersion) >= 11)
conn.to << false;
conn.processStderr();
}
catch (Error & e) {
throw Error("cannot open connection to remote store '%s': %s", getUri(), e.what());
}
setOptions(conn);
}
void RemoteStore::setOptions(Connection & conn)
{
conn.to << wopSetOptions
<< settings.keepFailed
<< settings.keepGoing
<< settings.tryFallback
<< verbosity
<< settings.maxBuildJobs
<< settings.maxSilentTime
<< true
<< (settings.verboseBuild ? lvlError : lvlVomit)
<< 0 // obsolete log type
<< 0 /* obsolete print build trace */
<< settings.buildCores
<< settings.useSubstitutes;
if (GET_PROTOCOL_MINOR(conn.daemonVersion) >= 12) {
std::map<std::string, Config::SettingInfo> overrides;
globalConfig.getSettings(overrides, true);
conn.to << overrides.size();
for (auto & i : overrides)
conn.to << i.first << i.second.value;
}
conn.processStderr();
}
bool RemoteStore::isValidPathUncached(const Path & path)
{
auto conn(connections->get());
conn->to << wopIsValidPath << path;
conn->processStderr();
return readInt(conn->from);
}
PathSet RemoteStore::queryValidPaths(const PathSet & paths, SubstituteFlag maybeSubstitute)
{
auto conn(connections->get());
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) {
PathSet res;
for (auto & i : paths)
if (isValidPath(i)) res.insert(i);
return res;
} else {
conn->to << wopQueryValidPaths << paths;
conn->processStderr();
return readStorePaths<PathSet>(*this, conn->from);
}
}
PathSet RemoteStore::queryAllValidPaths()
{
auto conn(connections->get());
conn->to << wopQueryAllValidPaths;
conn->processStderr();
return readStorePaths<PathSet>(*this, conn->from);
}
PathSet RemoteStore::querySubstitutablePaths(const PathSet & paths)
{
auto conn(connections->get());
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) {
PathSet res;
for (auto & i : paths) {
conn->to << wopHasSubstitutes << i;
conn->processStderr();
if (readInt(conn->from)) res.insert(i);
}
return res;
} else {
conn->to << wopQuerySubstitutablePaths << paths;
conn->processStderr();
return readStorePaths<PathSet>(*this, conn->from);
}
}
void RemoteStore::querySubstitutablePathInfos(const PathSet & paths,
SubstitutablePathInfos & infos)
{
if (paths.empty()) return;
auto conn(connections->get());
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) {
for (auto & i : paths) {
SubstitutablePathInfo info;
conn->to << wopQuerySubstitutablePathInfo << i;
conn->processStderr();
unsigned int reply = readInt(conn->from);
if (reply == 0) continue;
info.deriver = readString(conn->from);
if (info.deriver != "") assertStorePath(info.deriver);
info.references = readStorePaths<PathSet>(*this, conn->from);
info.downloadSize = readLongLong(conn->from);
info.narSize = readLongLong(conn->from);
infos[i] = info;
}
} else {
conn->to << wopQuerySubstitutablePathInfos << paths;
conn->processStderr();
size_t count = readNum<size_t>(conn->from);
for (size_t n = 0; n < count; n++) {
Path path = readStorePath(*this, conn->from);
SubstitutablePathInfo & info(infos[path]);
info.deriver = readString(conn->from);
if (info.deriver != "") assertStorePath(info.deriver);
info.references = readStorePaths<PathSet>(*this, conn->from);
info.downloadSize = readLongLong(conn->from);
info.narSize = readLongLong(conn->from);
}
}
}
void RemoteStore::queryPathInfoUncached(const Path & path,
std::function<void(std::shared_ptr<ValidPathInfo>)> success,
std::function<void(std::exception_ptr exc)> failure)
{
sync2async<std::shared_ptr<ValidPathInfo>>(success, failure, [&]() {
auto conn(connections->get());
conn->to << wopQueryPathInfo << path;
try {
conn->processStderr();
} catch (Error & e) {
// Ugly backwards compatibility hack.
if (e.msg().find("is not valid") != std::string::npos)
throw InvalidPath(e.what());
throw;
}
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 17) {
bool valid; conn->from >> valid;
if (!valid) throw InvalidPath(format("path '%s' is not valid") % path);
}
auto info = std::make_shared<ValidPathInfo>();
info->path = path;
info->deriver = readString(conn->from);
if (info->deriver != "") assertStorePath(info->deriver);
info->narHash = Hash(readString(conn->from), htSHA256);
info->references = readStorePaths<PathSet>(*this, conn->from);
conn->from >> info->registrationTime >> info->narSize;
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 16) {
conn->from >> info->ultimate;
info->sigs = readStrings<StringSet>(conn->from);
conn->from >> info->ca;
}
return info;
});
}
void RemoteStore::queryReferrers(const Path & path,
PathSet & referrers)
{
auto conn(connections->get());
conn->to << wopQueryReferrers << path;
conn->processStderr();
PathSet referrers2 = readStorePaths<PathSet>(*this, conn->from);
referrers.insert(referrers2.begin(), referrers2.end());
}
PathSet RemoteStore::queryValidDerivers(const Path & path)
{
auto conn(connections->get());
conn->to << wopQueryValidDerivers << path;
conn->processStderr();
return readStorePaths<PathSet>(*this, conn->from);
}
PathSet RemoteStore::queryDerivationOutputs(const Path & path)
{
auto conn(connections->get());
conn->to << wopQueryDerivationOutputs << path;
conn->processStderr();
return readStorePaths<PathSet>(*this, conn->from);
}
PathSet RemoteStore::queryDerivationOutputNames(const Path & path)
{
auto conn(connections->get());
conn->to << wopQueryDerivationOutputNames << path;
conn->processStderr();
return readStrings<PathSet>(conn->from);
}
Path RemoteStore::queryPathFromHashPart(const string & hashPart)
{
auto conn(connections->get());
conn->to << wopQueryPathFromHashPart << hashPart;
conn->processStderr();
Path path = readString(conn->from);
if (!path.empty()) assertStorePath(path);
return path;
}
void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
RepairFlag repair, CheckSigsFlag checkSigs, std::shared_ptr<FSAccessor> accessor)
{
auto conn(connections->get());
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 18) {
conn->to << wopImportPaths;
auto source2 = sinkToSource([&](Sink & sink) {
sink << 1 // == path follows
;
copyNAR(source, sink);
sink
<< exportMagic
<< info.path
<< info.references
<< info.deriver
<< 0 // == no legacy signature
<< 0 // == no path follows
;
});
conn->processStderr(0, source2.get());
auto importedPaths = readStorePaths<PathSet>(*this, conn->from);
assert(importedPaths.size() <= 1);
}
else {
conn->to << wopAddToStoreNar
<< info.path << info.deriver << info.narHash.to_string(Base16, false)
<< info.references << info.registrationTime << info.narSize
<< info.ultimate << info.sigs << info.ca
<< repair << !checkSigs;
bool tunnel = GET_PROTOCOL_MINOR(conn->daemonVersion) >= 21;
if (!tunnel) copyNAR(source, conn->to);
conn->processStderr(0, tunnel ? &source : nullptr);
}
}
Path RemoteStore::addToStore(const string & name, const Path & _srcPath,
bool recursive, HashType hashAlgo, PathFilter & filter, RepairFlag repair)
{
if (repair) throw Error("repairing is not supported when building through the Nix daemon");
auto conn(connections->get());
Path srcPath(absPath(_srcPath));
conn->to << wopAddToStore << name
<< ((hashAlgo == htSHA256 && recursive) ? 0 : 1) /* backwards compatibility hack */
<< (recursive ? 1 : 0)
<< printHashType(hashAlgo);
try {
conn->to.written = 0;
conn->to.warn = true;
connections->incCapacity();
dumpPath(srcPath, conn->to, filter);
connections->decCapacity();
conn->to.warn = false;
conn->processStderr();
} catch (SysError & e) {
/* Daemon closed while we were sending the path. Probably OOM
or I/O error. */
if (e.errNo == EPIPE)
try {
conn->processStderr();
} catch (EndOfFile & e) { }
throw;
}
return readStorePath(*this, conn->from);
}
Path RemoteStore::addTextToStore(const string & name, const string & s,
const PathSet & references, RepairFlag repair)
{
if (repair) throw Error("repairing is not supported when building through the Nix daemon");
auto conn(connections->get());
conn->to << wopAddTextToStore << name << s << references;
conn->processStderr();
return readStorePath(*this, conn->from);
}
void RemoteStore::buildPaths(const PathSet & drvPaths, BuildMode buildMode)
{
auto conn(connections->get());
conn->to << wopBuildPaths;
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 13) {
conn->to << drvPaths;
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 15)
conn->to << buildMode;
else
/* Old daemons did not take a 'buildMode' parameter, so we
need to validate it here on the client side. */
if (buildMode != bmNormal)
throw Error("repairing or checking is not supported when building through the Nix daemon");
} else {
/* For backwards compatibility with old daemons, strip output
identifiers. */
PathSet drvPaths2;
for (auto & i : drvPaths)
drvPaths2.insert(string(i, 0, i.find('!')));
conn->to << drvPaths2;
}
conn->processStderr();
readInt(conn->from);
}
BuildResult RemoteStore::buildDerivation(const Path & drvPath, const BasicDerivation & drv,
BuildMode buildMode)
{
auto conn(connections->get());
conn->to << wopBuildDerivation << drvPath << drv << buildMode;
conn->processStderr();
BuildResult res;
unsigned int status;
conn->from >> status >> res.errorMsg;
res.status = (BuildResult::Status) status;
return res;
}
void RemoteStore::ensurePath(const Path & path)
{
auto conn(connections->get());
conn->to << wopEnsurePath << path;
conn->processStderr();
readInt(conn->from);
}
void RemoteStore::addTempRoot(const Path & path)
{
auto conn(connections->get());
conn->to << wopAddTempRoot << path;
conn->processStderr();
readInt(conn->from);
}
void RemoteStore::addIndirectRoot(const Path & path)
{
auto conn(connections->get());
conn->to << wopAddIndirectRoot << path;
conn->processStderr();
readInt(conn->from);
}
void RemoteStore::syncWithGC()
{
auto conn(connections->get());
conn->to << wopSyncWithGC;
conn->processStderr();
readInt(conn->from);
}
Roots RemoteStore::findRoots()
{
auto conn(connections->get());
conn->to << wopFindRoots;
conn->processStderr();
size_t count = readNum<size_t>(conn->from);
Roots result;
while (count--) {
Path link = readString(conn->from);
Path target = readStorePath(*this, conn->from);
result[link] = target;
}
return result;
}
void RemoteStore::collectGarbage(const GCOptions & options, GCResults & results)
{
auto conn(connections->get());
conn->to
<< wopCollectGarbage << options.action << options.pathsToDelete << options.ignoreLiveness
<< options.maxFreed
/* removed options */
<< 0 << 0 << 0;
conn->processStderr();
results.paths = readStrings<PathSet>(conn->from);
results.bytesFreed = readLongLong(conn->from);
readLongLong(conn->from); // obsolete
{
auto state_(Store::state.lock());
state_->pathInfoCache.clear();
}
}
void RemoteStore::optimiseStore()
{
auto conn(connections->get());
conn->to << wopOptimiseStore;
conn->processStderr();
readInt(conn->from);
}
bool RemoteStore::verifyStore(bool checkContents, RepairFlag repair)
{
auto conn(connections->get());
conn->to << wopVerifyStore << checkContents << repair;
conn->processStderr();
return readInt(conn->from);
}
void RemoteStore::addSignatures(const Path & storePath, const StringSet & sigs)
{
auto conn(connections->get());
conn->to << wopAddSignatures << storePath << sigs;
conn->processStderr();
readInt(conn->from);
}
void RemoteStore::queryMissing(const PathSet & targets,
PathSet & willBuild, PathSet & willSubstitute, PathSet & unknown,
unsigned long long & downloadSize, unsigned long long & narSize)
{
{
auto conn(connections->get());
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 19)
// Don't hold the connection handle in the fallback case
// to prevent a deadlock.
goto fallback;
conn->to << wopQueryMissing << targets;
conn->processStderr();
willBuild = readStorePaths<PathSet>(*this, conn->from);
willSubstitute = readStorePaths<PathSet>(*this, conn->from);
unknown = readStorePaths<PathSet>(*this, conn->from);
conn->from >> downloadSize >> narSize;
return;
}
fallback:
return Store::queryMissing(targets, willBuild, willSubstitute,
unknown, downloadSize, narSize);
}
void RemoteStore::connect()
{
auto conn(connections->get());
}
void RemoteStore::flushBadConnections()
{
connections->flushBad();
}
RemoteStore::Connection::~Connection()
{
try {
to.flush();
} catch (...) {
ignoreException();
}
}
static Logger::Fields readFields(Source & from)
{
Logger::Fields fields;
size_t size = readInt(from);
for (size_t n = 0; n < size; n++) {
auto type = (decltype(Logger::Field::type)) readInt(from);
if (type == Logger::Field::tInt)
fields.push_back(readNum<uint64_t>(from));
else if (type == Logger::Field::tString)
fields.push_back(readString(from));
else
throw Error("got unsupported field type %x from Nix daemon", (int) type);
}
return fields;
}
void RemoteStore::Connection::processStderr(Sink * sink, Source * source)
{
to.flush();
while (true) {
auto msg = readNum<uint64_t>(from);
if (msg == STDERR_WRITE) {
string s = readString(from);
if (!sink) throw Error("no sink");
(*sink)(s);
}
else if (msg == STDERR_READ) {
if (!source) throw Error("no source");
size_t len = readNum<size_t>(from);
auto buf = std::make_unique<unsigned char[]>(len);
writeString(buf.get(), source->read(buf.get(), len), to);
to.flush();
}
else if (msg == STDERR_ERROR) {
string error = readString(from);
unsigned int status = readInt(from);
throw Error(status, error);
}
else if (msg == STDERR_NEXT)
printError(chomp(readString(from)));
else if (msg == STDERR_START_ACTIVITY) {
auto act = readNum<ActivityId>(from);
auto lvl = (Verbosity) readInt(from);
auto type = (ActivityType) readInt(from);
auto s = readString(from);
auto fields = readFields(from);
auto parent = readNum<ActivityId>(from);
logger->startActivity(act, lvl, type, s, fields, parent);
}
else if (msg == STDERR_STOP_ACTIVITY) {
auto act = readNum<ActivityId>(from);
logger->stopActivity(act);
}
else if (msg == STDERR_RESULT) {
auto act = readNum<ActivityId>(from);
auto type = (ResultType) readInt(from);
auto fields = readFields(from);
logger->result(act, type, fields);
}
else if (msg == STDERR_LAST)
break;
else
throw Error("got unknown message type %x from Nix daemon", msg);
}
}
static std::string uriScheme = "unix://";
static RegisterStoreImplementation regStore([](
const std::string & uri, const Store::Params & params)
-> std::shared_ptr<Store>
{
if (std::string(uri, 0, uriScheme.size()) != uriScheme) return 0;
return std::make_shared<UDSRemoteStore>(std::string(uri, uriScheme.size()), params);
});
}