/* Copyright (C) 2000-2001 Stefan Westerfeld stefan@space.twc.de This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include <config.h> #include "dispatcher.h" #include "delayedreturn.h" #include "startupmanager.h" #include "unixconnection.h" #include "tcpconnection.h" #include "referenceclean.h" #include "core.h" #include "md5auth.h" #include "mcoputils.h" #include "loopback.h" #include "debug.h" #include "ifacerepo_impl.h" #include "thread.h" #include <sys/stat.h> #include <stdio.h> #include <signal.h> #include <cstring> #include <cstdlib> #include <errno.h> #include <iostream> #if TIME_WITH_SYS_TIME # include <sys/time.h> # include <time.h> #elif HAVE_SYS_TIME_H # include <sys/time.h> #else # include <time.h> #endif /* Dispatcher private data class (to ensure binary compatibility) */ using namespace std; using namespace Arts; namespace Arts { class DispatcherWakeUpHandler; class DispatcherPrivate { public: GlobalComm globalComm; InterfaceRepo interfaceRepo; AuthAccept *accept; LoopbackConnection *loopbackConnection; DelayedReturn *delayedReturn; bool allowNoAuthentication; Mutex mutex; /* * Thread condition that gets signalled whenever something relevant for * waitForResult happens. Note that broken connections are also relevant * for waitForResult. */ ThreadCondition requestResultCondition; /* * Thread condition that gets signalled whenever something relevant for * the server connection process happens. This is either: * - authentication fails * - authentication succeeds * - a connection breaks */ ThreadCondition serverConnectCondition; DispatcherWakeUpHandler *wakeUpHandler; }; /** * Class that performs dispatcher wakeup. * * The sending thread (requesting wakeup) writes a byte to a pipe. The * main thread watches the pipe, and as soon as the byte arrives, gets * woken by the IOManager. This should work, no matter what type of IOManager * is used (i.e. StdIOManager/GIOManager/QIOManager). */ class DispatcherWakeUpHandler : public IONotify { private: enum { wReceive = 0, wSend = 1 }; int wakeUpPipe[2]; public: DispatcherWakeUpHandler() { if(pipe(wakeUpPipe) != 0) arts_fatal("can't initialize wakeUp pipe (%s)",strerror(errno)); Dispatcher::the()->ioManager()->watchFD(wakeUpPipe[wReceive], IOType::read | IOType::reentrant, this); } virtual ~DispatcherWakeUpHandler() { Dispatcher::the()->ioManager()->remove(this, IOType::all); close(wakeUpPipe[wSend]); close(wakeUpPipe[wReceive]); } void notifyIO(int fd, int type) { arts_return_if_fail(fd == wakeUpPipe[wReceive]); arts_return_if_fail(type == IOType::read); mcopbyte one; int result; do result = read(wakeUpPipe[wReceive],&one,1); while(result < 0 && errno == EINTR); } void wakeUp() { mcopbyte one = 1; int result; do result = write(wakeUpPipe[wSend],&one,1); while(result < 0 && errno == EINTR); } }; } Dispatcher *Dispatcher::_instance = 0; Dispatcher::Dispatcher(IOManager *ioManager, StartServer startServer) { assert(!_instance); _instance = this; /* private data pointer */ d = new DispatcherPrivate(); lock(); /* makes arts_debug/arts_message/arts_return_if_fail/... threadsafe */ Debug::initMutex(); generateServerID(); if(ioManager) { _ioManager = ioManager; deleteIOManagerOnExit = false; } else { _ioManager = new StdIOManager; deleteIOManagerOnExit = true; } d->wakeUpHandler = new DispatcherWakeUpHandler; objectManager = new ObjectManager; notificationManager = new NotificationManager; if(startServer & startUnixServer) { unixServer = new UnixServer(this,serverID); if(!unixServer->running()) { delete unixServer; arts_warning("[mcop dispatcher] Couldn't start UnixServer"); unixServer = 0; } } else unixServer = 0; if(startServer & startTCPServer) { tcpServer = new TCPServer(this); if(!tcpServer->running()) { delete tcpServer; arts_warning("[mcop dispatcher] Couldn't start TCPServer"); tcpServer = 0; } } else tcpServer = 0; d->allowNoAuthentication = startServer & noAuthentication; d->accept = 0; d->loopbackConnection = new LoopbackConnection(serverID); d->interfaceRepo = InterfaceRepo::_from_base(new InterfaceRepo_impl()); d->delayedReturn = 0; _flowSystem = 0; referenceClean = new ReferenceClean(objectPool); /* * setup signal handler for SIGPIPE */ orig_sigpipe = signal(SIGPIPE,SIG_IGN); if((orig_sigpipe != SIG_DFL) && (orig_sigpipe != SIG_IGN)) { cerr << "[mcop dispatcher] warning: user defined signal handler found for" " SIG_PIPE, overriding" << endl; } StartupManager::startup(); /* * this is required for publishing global references - might be a good * reason for startup priorities as since this is required for cookie&co, * no communication is possible without that */ char *env = getenv("ARTS_SERVER"); bool envOk = false; if(env) { string url = "tcp:"; url += env; Connection *conn = connectUrl(url); arts_debug("connection to %s for globalComm", url.c_str()); if(conn) { arts_debug("hint %s", conn->findHint("GlobalComm").c_str()); d->globalComm = Reference(conn->findHint("GlobalComm")); envOk = true; arts_debug("using globalcomm from env variable"); } } if(!envOk) { string globalCommName = MCOPUtils::readConfigEntry("GlobalComm","Arts::TmpGlobalComm"); d->globalComm = GlobalComm(SubClass(globalCommName)); } // --- initialize MD5auth --- /* * Path for random seed: better to store it in home, because some * installations wipe /tmp on reboot. */ string seedpath = MCOPUtils::createFilePath("random-seed"); string mcopdir = MCOPUtils::mcopDirectory(); if(!mcopdir.empty()) seedpath = mcopdir + "/random-seed"; arts_md5_auth_init_seed(seedpath.c_str()); /* * first generate a new random cookie and try to set secret-cookie to it * as put will not overwrite, this has no effect if there is already a * secret cookie */ char *cookie = arts_md5_auth_mkcookie(); globalComm().put("secret-cookie",cookie); /* * Then get the secret cookie from globalComm. As we've just set one, * and as it is never removed, this always works. */ string secretCookie = globalComm().get("secret-cookie"); if(!arts_md5_auth_set_cookie(secretCookie.c_str())) { /* * Handle the case where the cookie obtained from GlobalComm is not * a valid cookie (i.e. too short) - this should practically never * happen. In this case, we will remove the cookie and overwrite it * with our previously generated cookie. */ arts_warning("[mcop dispatcher] Bad md5 secret-cookie obtained from %s - replacing it", globalComm()._interfaceName().c_str()); globalComm().erase("secret-cookie"); globalComm().put("secret-cookie",cookie); if(!arts_md5_auth_set_cookie(cookie)) arts_fatal("error initializing md5 secret cookie " "(generated cookie invalid)"); } memset(cookie,0,strlen(cookie)); // try to keep memory clean free(cookie); string::iterator i; // try to keep memory clean from secret cookie for(i=secretCookie.begin();i != secretCookie.end();i++) *i = 'y'; unlock(); } Dispatcher::~Dispatcher() { lock(); /* no interaction possible now anymore - remove our global references */ if(objectManager) objectManager->removeGlobalReferences(); /* remove everything that might have been tagged for remote copying */ referenceClean->forceClean(); delete referenceClean; d->globalComm = GlobalComm::null(); /* shutdown all extensions we loaded */ if(objectManager) objectManager->shutdownExtensions(); StartupManager::shutdown(); /* drop all open connections */ list<Connection *>::iterator ci; for(ci=connections.begin(); ci != connections.end();ci++) { Connection *conn = *ci; conn->drop(); } d->requestResultCondition.wakeAll(); d->serverConnectCondition.wakeAll(); /* * remove signal handler for SIGPIPE */ signal(SIGPIPE,orig_sigpipe); d->interfaceRepo = InterfaceRepo::null(); if(d->accept) { delete d->accept; d->accept = 0; } if(d->loopbackConnection) { d->loopbackConnection->_release(); d->loopbackConnection = 0; } if(unixServer) { delete unixServer; unixServer = 0; } if(tcpServer) { delete tcpServer; tcpServer = 0; } if(notificationManager) { delete notificationManager; notificationManager = 0; } if(objectManager && Object_base::_objectCount() == 0) { objectManager->removeExtensions(); delete objectManager; objectManager = 0; } if(d->wakeUpHandler) { delete d->wakeUpHandler; d->wakeUpHandler = 0; } if(deleteIOManagerOnExit) { delete _ioManager; _ioManager = 0; } if(Object_base::_objectCount()) { cerr << "[mcop dispatcher] warning: leaving MCOP Dispatcher and still " << Object_base::_objectCount() << " object references alive." << endl; list<Object_skel *> which = objectPool.enumerate(); list<Object_skel *>::iterator i; for(i = which.begin(); i != which.end();i++) cerr << " - " << (*i)->_interfaceName() << endl; } if(Type::_typeCount()) { cerr << "[mcop dispatcher] warning: leaving MCOP Dispatcher and still " << Type::_typeCount() << " types alive." << endl; } if(GenericDataPacket::_dataPacketCount()) { cerr << "[mcop dispatcher] warning: leaving MCOP Dispatcher and still " << GenericDataPacket::_dataPacketCount() << " data packets alive." << endl; } Debug::freeMutex(); unlock(); /* private data pointer */ assert(d); delete d; d = 0; assert(_instance); _instance = 0; } InterfaceRepo Dispatcher::interfaceRepo() { return d->interfaceRepo; } FlowSystem_impl *Dispatcher::flowSystem() { assert(_flowSystem); return _flowSystem; } GlobalComm Dispatcher::globalComm() { assert(!d->globalComm.isNull()); return d->globalComm; } void Dispatcher::setFlowSystem(FlowSystem_impl *fs) { assert(!_flowSystem); _flowSystem = fs; } Dispatcher *Dispatcher::the() { return _instance; } Buffer *Dispatcher::waitForResult(long requestID, Connection *connection) { bool isMainThread = SystemThreads::the()->isMainThread(); Buffer *b = requestResultPool[requestID]; connection->_copy(); // Keep extra ref while(!b && !connection->broken()) { if(isMainThread) _ioManager->processOneEvent(true); else d->requestResultCondition.wait(d->mutex); b = requestResultPool[requestID]; } requestResultPool.releaseSlot(requestID); if(connection->broken()) // connection went away before we got some result b = 0; connection->_release(); // Give up extra ref return b; } Buffer *Dispatcher::createRequest(long& requestID, long objectID, long methodID) { Buffer *buffer = new Buffer; // write mcop header record buffer->writeLong(MCOP_MAGIC); buffer->writeLong(0); // message length - to be patched later buffer->writeLong(mcopInvocation); // generate a request ID requestID = requestResultPool.allocSlot(); // write invocation record buffer->writeLong(objectID); buffer->writeLong(methodID); buffer->writeLong(requestID); return buffer; } Buffer *Dispatcher::createOnewayRequest(long objectID, long methodID) { Buffer *buffer = new Buffer; // write mcop header record buffer->writeLong(MCOP_MAGIC); buffer->writeLong(0); // message length - to be patched later buffer->writeLong(mcopOnewayInvocation); // write oneway invocation record buffer->writeLong(objectID); buffer->writeLong(methodID); return buffer; } void Dispatcher::handle(Connection *conn, Buffer *buffer, long messageType) { _activeConnection = conn; #ifdef DEBUG_IO printf("got a message %ld, %ld bytes in body\n", messageType,buffer->remaining()); if(conn->connState() == Connection::unknown) cout << "connectionState = unknown" << endl; if(conn->connState() == Connection::expectClientHello) cout << "connectionState = expectClientHello" << endl; if(conn->connState() == Connection::expectServerHello) cout << "connectionState = expectServerHello" << endl; if(conn->connState() == Connection::expectAuthAccept) cout << "connectionState = expectAuthAccept" << endl; if(conn->connState() == Connection::established) cout << "connectionState = established" << endl; #endif switch(conn->connState()) { case Connection::established: /* * we're connected to a trusted server, so we can accept * invocations */ if(messageType == mcopInvocation) { #ifdef DEBUG_MESSAGES printf("[got Invocation]\n"); #endif long objectID = buffer->readLong(); long methodID = buffer->readLong(); long requestID = buffer->readLong(); Buffer *result = new Buffer; // write mcop header record result->writeLong(MCOP_MAGIC); result->writeLong(0); // message length - to be patched later result->writeLong(mcopReturn); // write result record (returnCode is written by dispatch) result->writeLong(requestID); // perform the request Object_skel *object = objectPool[objectID]; object->_copy(); object->_dispatch(buffer,result,methodID); object->_release(); assert(!buffer->readError() && !buffer->remaining()); delete buffer; if(d->delayedReturn) { delete result; result = new Buffer; result->writeLong(MCOP_MAGIC); result->writeLong(0); // to be patched later result->writeLong(mcopReturn); result->writeLong(requestID); d->delayedReturn->initialize(conn,result); d->delayedReturn = 0; } else /* return normally */ { result->patchLength(); conn->qSendBuffer(result); } return; /* everything ok - leave here */ } if(messageType == mcopReturn) { #ifdef DEBUG_MESSAGES printf("[got Return]\n"); #endif long requestID = buffer->readLong(); requestResultPool[requestID] = buffer; d->requestResultCondition.wakeAll(); return; /* everything ok - leave here */ } if(messageType == mcopOnewayInvocation) { #ifdef DEBUG_MESSAGES printf("[got OnewayInvocation]\n"); #endif long objectID = buffer->readLong(); long methodID = buffer->readLong(); // perform the request Object_skel *object = objectPool[objectID]; object->_copy(); object->_dispatch(buffer,methodID); object->_release(); assert(!buffer->readError() && !buffer->remaining()); delete buffer; return; /* everything ok - leave here */ } break; case Connection::expectServerHello: if(messageType == mcopServerHello) { #ifdef DEBUG_MESSAGES printf("[got ServerHello]\n"); #endif /* * if we get a server hello, answer with a client hello */ ServerHello h; h.readType(*buffer); bool valid = (!buffer->readError() && !buffer->remaining()); delete buffer; if(!valid) break; // invalid hello received -> forget it conn->setServerID(h.serverID); /* * check if md5auth or noauth is offered by the server */ bool md5authSupported = false; bool noauthSupported = false; vector<string>::iterator ai; for(ai = h.authProtocols.begin(); ai != h.authProtocols.end(); ai++) { if(*ai == "md5auth") md5authSupported = true; if(*ai == "noauth") noauthSupported = true; } if(noauthSupported) // noauth is usually easier to pass ;) { Buffer *helloBuffer = new Buffer; Header header(MCOP_MAGIC,0,mcopClientHello); header.writeType(*helloBuffer); ClientHello clientHello(serverID,"noauth",""); clientHello.writeType(*helloBuffer); helloBuffer->patchLength(); conn->qSendBuffer(helloBuffer); conn->setConnState(Connection::expectAuthAccept); return; /* everything ok - leave here */ } else if(md5authSupported) { Buffer *helloBuffer = new Buffer; Header header(MCOP_MAGIC,0,mcopClientHello); header.writeType(*helloBuffer); ClientHello clientHello(serverID,"md5auth",""); const char *random_cookie = h.authSeed.c_str(); if(strlen(random_cookie) == 32) { char *response = arts_md5_auth_mangle(random_cookie); clientHello.authData = response; #ifdef DEBUG_AUTH printf(" got random_cookie = %s\n",random_cookie); printf("reply with authData = %s\n",response); #endif free(response); } clientHello.writeType(*helloBuffer); helloBuffer->patchLength(); conn->qSendBuffer(helloBuffer); conn->setConnState(Connection::expectAuthAccept); return; /* everything ok - leave here */ } else { cerr << "[mcop dispatcher] error: don't know authentication protocol" << endl; cerr << " server offered: "; for(ai = h.authProtocols.begin(); ai != h.authProtocols.end(); ai++) cerr << *ai << " "; cerr << endl; } } break; case Connection::expectClientHello: if(messageType == mcopClientHello) { #ifdef DEBUG_MESSAGES printf("[got ClientHello]\n"); #endif ClientHello c; c.readType(*buffer); bool valid = (!buffer->readError() && !buffer->remaining()); delete buffer; if(valid && ( (c.authProtocol == "md5auth" && c.authData == conn->cookie()) || (c.authProtocol == "noauth" && d->allowNoAuthentication) )) { conn->setServerID(c.serverID); /* build hints only for the first connection */ if(!d->accept) { d->accept = new AuthAccept(); d->accept->hints.push_back( "GlobalComm="+d->globalComm.toString()); d->accept->hints.push_back( "InterfaceRepo="+d->interfaceRepo.toString()); } Buffer *helloBuffer = new Buffer; Header header(MCOP_MAGIC,0,mcopAuthAccept); header.writeType(*helloBuffer); d->accept->writeType(*helloBuffer); helloBuffer->patchLength(); conn->qSendBuffer(helloBuffer); conn->setConnState(Connection::established); return; /* everything ok - leave here */ } } break; case Connection::expectAuthAccept: if(messageType == mcopAuthAccept) { #ifdef DEBUG_MESSAGES printf("[got AuthAccept]\n"); #endif AuthAccept a; a.readType(*buffer); delete buffer; #ifdef DEBUG_MESSAGES vector<string>::iterator hi; for(hi = a.hints.begin(); hi != a.hints.end(); hi++) cout << "[got ConnectionHint] " << *hi << endl; #endif conn->setConnState(Connection::established); conn->setHints(a.hints); d->serverConnectCondition.wakeAll(); return; /* everything ok - leave here */ } break; case Connection::unknown: assert(false); break; } /* * We shouldn't reach this point if everything went all right */ cerr << "[mcop dispatcher] Fatal communication error with a client" << endl; if(conn->connState() != Connection::established) { cerr << "[mcop dispatcher] Authentication of this client was not successful" << endl; cerr << "[mcop dispatcher] Connection dropped" << endl; conn->drop(); } } long Dispatcher::addObject(Object_skel *object) { long objectID = objectPool.allocSlot(); objectPool[objectID] = object; return objectID; } void Dispatcher::removeObject(long objectID) { assert(objectPool[objectID]); objectPool.releaseSlot(objectID); } void Dispatcher::generateServerID() { char *buffer; buffer = arts_strdup_printf("%s-%04x-%08lx", MCOPUtils::getFullHostname().c_str(), getpid(),time(0)); serverID = buffer; free(buffer); } string Dispatcher::objectToString(long objectID) { Buffer b; ObjectReference oref; oref.serverID = serverID; oref.objectID = objectID; // prefer a unix domainsocket connection over a plain tcp connection if(unixServer) oref.urls.push_back(unixServer->url()); if(tcpServer) oref.urls.push_back(tcpServer->url()); oref.writeType(b); return b.toString("MCOP-Object"); } bool Dispatcher::stringToObjectReference(ObjectReference& r, const string& s) { if(strncmp(s.c_str(),"global:",7) == 0) { // if the object reference starts with "global:", it refers to // a global object which can be found with the objectManager string lookup = objectManager->getGlobalReference(&s.c_str()[7]); return stringToObjectReference(r,lookup); } Buffer b; if(!b.fromString(s,"MCOP-Object")) return false; r.readType(b); if(b.readError() || b.remaining()) return false; return true; } void *Dispatcher::connectObjectLocal(ObjectReference& reference, const string& interface) { if(reference.serverID == serverID) { void *result = objectPool[reference.objectID]->_cast(interface); if(result) { objectPool[reference.objectID]->_copy(); return result; } } return 0; } Connection *Dispatcher::connectObjectRemote(ObjectReference& reference) { if(reference.serverID == "null") // null reference? return 0; if(reference.serverID == serverID) return loopbackConnection(); list<Connection *>::iterator i; for(i=connections.begin(); i != connections.end();i++) { Connection *conn = *i; if(conn->isConnected(reference.serverID)) { // fixme: we should check for the existence of the object // and increment a reference count or something like that return conn; } } /* try to connect the server */ vector<string>::iterator ui; for(ui = reference.urls.begin(); ui != reference.urls.end(); ui++) { Connection *conn = connectUrl(*ui); if(conn) { if(conn->isConnected(reference.serverID)) { return conn; } else { /* we connected somewhere, but not the right server ;) */ connections.remove(conn); conn->_release(); } } } return 0; } Connection *Dispatcher::connectUrl(const string& url) { Connection *conn = 0; bool isMainThread = SystemThreads::the()->isMainThread(); if(strncmp(url.c_str(),"tcp:",4) == 0) { conn = new TCPConnection(url); } else if(strncmp(url.c_str(),"unix:",5) == 0) { conn = new UnixConnection(url); } if(conn) { conn->_copy(); // Keep extra ref for when the connection breaks conn->setConnState(Connection::expectServerHello); while((conn->connState() != Connection::established) && !conn->broken()) { if(isMainThread) _ioManager->processOneEvent(true); else d->serverConnectCondition.wait(d->mutex); } if(conn->connState() == Connection::established) { connections.push_back(conn); conn->_release(); // Give up extra ref return conn; } // well - bad luck (building a connection failed) // Give up extra ref conn->_release(); } return 0; } void Dispatcher::run() { assert(SystemThreads::the()->isMainThread()); _ioManager->run(); } void Dispatcher::terminate() { _ioManager->terminate(); } void Dispatcher::initiateConnection(Connection *connection) { vector<string> authProtocols; authProtocols.push_back("md5auth"); if(d->allowNoAuthentication) authProtocols.push_back("noauth"); char *authSeed = arts_md5_auth_mkcookie(); char *authResult = arts_md5_auth_mangle(authSeed); Buffer *helloBuffer = new Buffer; Header header(MCOP_MAGIC,0,mcopServerHello); header.writeType(*helloBuffer); ServerHello serverHello("aRts/MCOP-1.0.0",serverID,authProtocols,authSeed); serverHello.writeType(*helloBuffer); helloBuffer->patchLength(); connection->qSendBuffer(helloBuffer); connection->setConnState(Connection::expectClientHello); connection->setCookie(authResult); free(authSeed); free(authResult); connections.push_back(connection); } void Dispatcher::handleCorrupt(Connection *connection) { if(connection->connState() != Connection::established) { cerr << "[mcop dispatcher] Received corrupt message on unauthenticated connection" <<endl; cerr << "closing connection." << endl; connection->drop(); d->serverConnectCondition.wakeAll(); } else { cerr << "[mcop dispatcher] warning: got corrupt MCOP message !??" << endl; } } void Dispatcher::handleConnectionClose(Connection *connection) { /* * we can't use enumerate here, because the "existing objects list" might * be changing due to the other _disconnectRemote calls we make, so we * enumerate() the objects manually */ unsigned long l; for(l=0; l<objectPool.max(); l++) { Object_skel *skel = objectPool[l]; if(skel) skel->_disconnectRemote(connection); } d->requestResultCondition.wakeAll(); d->serverConnectCondition.wakeAll(); /* * FIXME: * * there may be error handling to do (e.g., check that the _stub's that * still refer to that connection don't crash now). */ connection->_release(); list<Connection *>::iterator i; for(i=connections.begin(); i != connections.end();i++) { if(*i == connection) { connections.erase(i); return; } } } Connection *Dispatcher::activeConnection() { return _activeConnection; } Connection *Dispatcher::loopbackConnection() { return d->loopbackConnection; } DelayedReturn *Dispatcher::delayReturn() { assert(!d->delayedReturn); return d->delayedReturn = new DelayedReturn(); } Object_skel *Dispatcher::getLocalObject(long objectID) { Object_skel *result = objectPool[objectID]; if(result) result->_copy(); return result; } void Dispatcher::lock() { _instance->d->mutex.lock(); } void Dispatcher::unlock() { _instance->d->mutex.unlock(); } void Dispatcher::wakeUp() { if(SystemThreads::the()->isMainThread()) return; _instance->d->wakeUpHandler->wakeUp(); } /* void Dispatcher::reloadTraderData() is declared in trader_impl.cpp */