/*

    Copyright (C) 2000-2001 Stefan Westerfeld
                            stefan@space.twc.de

    This library is free software; you can redistribute it and/or
    modify it under the terms of the GNU Library General Public
    License as published by the Free Software Foundation; either
    version 2 of the License, or (at your option) any later version.
  
    This library is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
    Library General Public License for more details.
   
    You should have received a copy of the GNU Library General Public License
    along with this library; see the file COPYING.LIB.  If not, write to
    the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
    Boston, MA 02110-1301, USA.

    */

#include <config.h>
#include "dispatcher.h"
#include "delayedreturn.h"
#include "startupmanager.h"
#include "unixconnection.h"
#include "tcpconnection.h"
#include "referenceclean.h"
#include "core.h"
#include "md5auth.h"
#include "mcoputils.h"
#include "loopback.h"
#include "debug.h"
#include "ifacerepo_impl.h"
#include "thread.h"

#include <sys/stat.h>
#include <stdio.h>
#include <signal.h>
#include <cstring>
#include <cstdlib>
#include <errno.h>
#include <iostream>

#if TIME_WITH_SYS_TIME
# include <sys/time.h>
# include <time.h>
#elif HAVE_SYS_TIME_H
# include <sys/time.h>
#else
# include <time.h>
#endif                                                                          

/* Dispatcher private data class (to ensure binary compatibility) */

using namespace std;
using namespace Arts;

namespace Arts {

class DispatcherWakeUpHandler;

class DispatcherPrivate {
public:
	GlobalComm globalComm;
	InterfaceRepo interfaceRepo;
	AuthAccept *accept;
	LoopbackConnection *loopbackConnection;
	DelayedReturn *delayedReturn;
	bool allowNoAuthentication;
	Mutex mutex;

	/*
	 * Thread condition that gets signalled whenever something relevant for
	 * waitForResult happens. Note that broken connections are also relevant
	 * for waitForResult.
	 */
	ThreadCondition requestResultCondition;

	/*
	 * Thread condition that gets signalled whenever something relevant for
	 * the server connection process happens. This is either:
	 *  - authentication fails
	 *  - authentication succeeds
	 *  - a connection breaks
	 */
	ThreadCondition serverConnectCondition;

	DispatcherWakeUpHandler *wakeUpHandler;
};

/**
 * Class that performs dispatcher wakeup.
 *
 * The sending thread (requesting wakeup) writes a byte to a pipe. The
 * main thread watches the pipe, and as soon as the byte arrives, gets
 * woken by the IOManager. This should work, no matter what type of IOManager
 * is used (i.e. StdIOManager/GIOManager/QIOManager).
 */
class DispatcherWakeUpHandler : public IONotify {
private:
	enum { wReceive = 0, wSend = 1 };
	int wakeUpPipe[2];

public:
	DispatcherWakeUpHandler()
	{
		if(pipe(wakeUpPipe) != 0)
			arts_fatal("can't initialize wakeUp pipe (%s)",strerror(errno));

		Dispatcher::the()->ioManager()->watchFD(wakeUpPipe[wReceive],
									IOType::read | IOType::reentrant, this);
	}
	virtual ~DispatcherWakeUpHandler()
	{
		Dispatcher::the()->ioManager()->remove(this, IOType::all);

		close(wakeUpPipe[wSend]);
		close(wakeUpPipe[wReceive]);
	}
	void notifyIO(int fd, int type)
	{
		arts_return_if_fail(fd == wakeUpPipe[wReceive]);
		arts_return_if_fail(type == IOType::read);

		mcopbyte one;
		int result;
		do
			result = read(wakeUpPipe[wReceive],&one,1);
		while(result < 0 && errno == EINTR);
	}
	void wakeUp()
	{
		mcopbyte one = 1;

		int result;
		do
			result = write(wakeUpPipe[wSend],&one,1);
		while(result < 0 && errno == EINTR);
	}
};

}

Dispatcher *Dispatcher::_instance = 0;

Dispatcher::Dispatcher(IOManager *ioManager, StartServer startServer)
{
	assert(!_instance);
	_instance = this;
	
	/* private data pointer */
	d = new DispatcherPrivate();

	lock();

	/* makes arts_debug/arts_message/arts_return_if_fail/... threadsafe */
	Debug::initMutex();

	generateServerID();

	if(ioManager)
	{
		_ioManager = ioManager;
		deleteIOManagerOnExit = false;
	}
	else
	{
		_ioManager = new StdIOManager;
		deleteIOManagerOnExit = true;
	}

	d->wakeUpHandler = new DispatcherWakeUpHandler;

	objectManager = new ObjectManager;

	notificationManager = new NotificationManager;

	if(startServer & startUnixServer)
	{
		unixServer = new UnixServer(this,serverID);
		if(!unixServer->running())
		{
			delete unixServer;
			arts_warning("[mcop dispatcher] Couldn't start UnixServer");
			unixServer = 0;
		}
	}
	else unixServer = 0;

	if(startServer & startTCPServer)
	{
		tcpServer = new TCPServer(this);
		if(!tcpServer->running())
		{
			delete tcpServer;
			arts_warning("[mcop dispatcher] Couldn't start TCPServer");
			tcpServer = 0;
		}
	}
	else tcpServer = 0;

	d->allowNoAuthentication = startServer & noAuthentication;
	d->accept = 0;
	d->loopbackConnection = new LoopbackConnection(serverID);
	d->interfaceRepo = InterfaceRepo::_from_base(new InterfaceRepo_impl());
	d->delayedReturn = 0;

	_flowSystem = 0;
	referenceClean = new ReferenceClean(objectPool);

	/*
	 * setup signal handler for SIGPIPE
	 */
	orig_sigpipe = signal(SIGPIPE,SIG_IGN);
	if((orig_sigpipe != SIG_DFL) && (orig_sigpipe != SIG_IGN))
	{
		cerr << "[mcop dispatcher] warning: user defined signal handler found for"
		        " SIG_PIPE, overriding" << endl;
	}
	
	StartupManager::startup();

	/*
	 * this is required for publishing global references - might be a good
	 * reason for startup priorities as since this is required for cookie&co,
	 * no communication is possible without that
	 */

	
	char *env = getenv("ARTS_SERVER");
	bool envOk = false;
	if(env)
	{
		string url = "tcp:"; url += env;
		Connection *conn = connectUrl(url);
		arts_debug("connection to %s for globalComm", url.c_str());
		if(conn)
		{
			arts_debug("hint %s", conn->findHint("GlobalComm").c_str());
			d->globalComm = Reference(conn->findHint("GlobalComm"));
			envOk = true;
			arts_debug("using globalcomm from env variable");
		}
	}

	if(!envOk)
	{
		string globalCommName
			= MCOPUtils::readConfigEntry("GlobalComm","Arts::TmpGlobalComm");
		d->globalComm = GlobalComm(SubClass(globalCommName));
	}

	// --- initialize MD5auth ---
	/*
	 * Path for random seed: better to store it in home, because some
	 * installations wipe /tmp on reboot.
	 */
	string seedpath = MCOPUtils::createFilePath("random-seed");
	string mcopdir = MCOPUtils::mcopDirectory();
	if(!mcopdir.empty()) seedpath = mcopdir + "/random-seed";
	arts_md5_auth_init_seed(seedpath.c_str());

	/*
	 * first generate a new random cookie and try to set secret-cookie to it
	 * as put will not overwrite, this has no effect if there is already a
	 * secret cookie
	 */
	char *cookie = arts_md5_auth_mkcookie();
	globalComm().put("secret-cookie",cookie);

	/*
	 * Then get the secret cookie from globalComm. As we've just set one,
	 * and as it is never removed, this always works.
	 */
	string secretCookie = globalComm().get("secret-cookie");
	if(!arts_md5_auth_set_cookie(secretCookie.c_str()))
	{
		/*
		 * Handle the case where the cookie obtained from GlobalComm is not
		 * a valid cookie (i.e. too short) - this should practically never
		 * happen. In this case, we will remove the cookie and overwrite it
		 * with our previously generated cookie.
		 */
		arts_warning("[mcop dispatcher] Bad md5 secret-cookie obtained from %s - replacing it",
				globalComm()._interfaceName().c_str());

		globalComm().erase("secret-cookie");
		globalComm().put("secret-cookie",cookie);

		if(!arts_md5_auth_set_cookie(cookie))
			arts_fatal("error initializing md5 secret cookie "
					   "(generated cookie invalid)");
	}
	memset(cookie,0,strlen(cookie));	// try to keep memory clean
	free(cookie);

	string::iterator i;	// try to keep memory clean from secret cookie
	for(i=secretCookie.begin();i != secretCookie.end();i++) *i = 'y';

	unlock();
}

Dispatcher::~Dispatcher()
{
	lock();

	/* no interaction possible now anymore - remove our global references */
	if(objectManager)
		objectManager->removeGlobalReferences();

	/* remove everything that might have been tagged for remote copying */
	referenceClean->forceClean();
	delete referenceClean;

	d->globalComm = GlobalComm::null();

	/* shutdown all extensions we loaded */
	if(objectManager)
		objectManager->shutdownExtensions();

	StartupManager::shutdown();

	/* drop all open connections */
	list<Connection *>::iterator ci;
	for(ci=connections.begin(); ci != connections.end();ci++)
	{
		Connection *conn = *ci;
		conn->drop();
	}
	d->requestResultCondition.wakeAll();
	d->serverConnectCondition.wakeAll();

	/*
	 * remove signal handler for SIGPIPE
	 */
	signal(SIGPIPE,orig_sigpipe);


	d->interfaceRepo = InterfaceRepo::null();

	if(d->accept)
	{
		delete d->accept;
		d->accept = 0;
	}

	if(d->loopbackConnection)
	{
		d->loopbackConnection->_release();
		d->loopbackConnection = 0;
	}
	if(unixServer)
	{
		delete unixServer;
		unixServer = 0;
	}

	if(tcpServer)
	{
		delete tcpServer;
		tcpServer = 0;
	}

	if(notificationManager)
	{
		delete notificationManager;
		notificationManager = 0;
	}

	if(objectManager && Object_base::_objectCount() == 0)
	{
		objectManager->removeExtensions();
		delete objectManager;
		objectManager = 0;
	}

	if(d->wakeUpHandler)
	{
		delete d->wakeUpHandler;
		d->wakeUpHandler = 0;
	}

	if(deleteIOManagerOnExit)
	{
		delete _ioManager;
		_ioManager = 0;
	}

	if(Object_base::_objectCount())
	{
		cerr << "[mcop dispatcher] warning: leaving MCOP Dispatcher and still "
			 << Object_base::_objectCount() << " object references alive." << endl;
		list<Object_skel *> which = objectPool.enumerate();
		list<Object_skel *>::iterator i;
		for(i = which.begin(); i != which.end();i++)
			cerr << "  - " << (*i)->_interfaceName() << endl;
	}

	if(Type::_typeCount())
	{
		cerr << "[mcop dispatcher] warning: leaving MCOP Dispatcher and still "
			 << Type::_typeCount() << " types alive." << endl;
	}

	if(GenericDataPacket::_dataPacketCount())
	{
		cerr << "[mcop dispatcher] warning: leaving MCOP Dispatcher and still "
			 << GenericDataPacket::_dataPacketCount()
			 << " data packets alive." << endl;
	}

	Debug::freeMutex();

	unlock();

	/* private data pointer */
	assert(d);
	delete d;
	d = 0;

	assert(_instance);
	_instance = 0;
}

InterfaceRepo Dispatcher::interfaceRepo()
{
	return d->interfaceRepo;
}

FlowSystem_impl *Dispatcher::flowSystem()
{
	assert(_flowSystem);
	return _flowSystem;
}

GlobalComm Dispatcher::globalComm()
{
	assert(!d->globalComm.isNull());
	return d->globalComm;
}

void Dispatcher::setFlowSystem(FlowSystem_impl *fs)
{
	assert(!_flowSystem);
	_flowSystem = fs;
}

Dispatcher *Dispatcher::the()
{
	return _instance;
}

Buffer *Dispatcher::waitForResult(long requestID, Connection *connection)
{
	bool isMainThread = SystemThreads::the()->isMainThread();
	Buffer *b = requestResultPool[requestID];
	
	connection->_copy(); // Keep extra ref

	while(!b && !connection->broken()) {
		if(isMainThread)
			_ioManager->processOneEvent(true);
		else
			d->requestResultCondition.wait(d->mutex);

		b = requestResultPool[requestID];
	}

	requestResultPool.releaseSlot(requestID);

	if(connection->broken()) // connection went away before we got some result
		b = 0;
	
	connection->_release(); // Give up extra ref
	
	return b;
}

Buffer *Dispatcher::createRequest(long& requestID, long objectID, long methodID)
{
	Buffer *buffer = new Buffer;

	// write mcop header record
	buffer->writeLong(MCOP_MAGIC);
	buffer->writeLong(0);			// message length - to be patched later
	buffer->writeLong(mcopInvocation);

	// generate a request ID
	requestID = requestResultPool.allocSlot();

	// write invocation record
	buffer->writeLong(objectID);
	buffer->writeLong(methodID);
	buffer->writeLong(requestID);

	return buffer;
}

Buffer *Dispatcher::createOnewayRequest(long objectID, long methodID)
{
	Buffer *buffer = new Buffer;

	// write mcop header record
	buffer->writeLong(MCOP_MAGIC);
	buffer->writeLong(0);			// message length - to be patched later
	buffer->writeLong(mcopOnewayInvocation);

	// write oneway invocation record
	buffer->writeLong(objectID);
	buffer->writeLong(methodID);

	return buffer;
}

void Dispatcher::handle(Connection *conn, Buffer *buffer, long messageType)
{
	_activeConnection = conn;

#ifdef DEBUG_IO
	printf("got a message %ld, %ld bytes in body\n",
			messageType,buffer->remaining());
	if(conn->connState() == Connection::unknown)
		cout << "connectionState = unknown" << endl;
	if(conn->connState() == Connection::expectClientHello)
		cout << "connectionState = expectClientHello" << endl;
	if(conn->connState() == Connection::expectServerHello)
		cout << "connectionState = expectServerHello" << endl;
	if(conn->connState() == Connection::expectAuthAccept)
		cout << "connectionState = expectAuthAccept" << endl;
	if(conn->connState() == Connection::established)
		cout << "connectionState = established" << endl;
#endif
	switch(conn->connState())
	{
		case Connection::established:
			/*
			 * we're connected to a trusted server, so we can accept
			 * invocations
			 */
			if(messageType == mcopInvocation) {
#ifdef DEBUG_MESSAGES
		printf("[got Invocation]\n");
#endif
				long objectID = buffer->readLong();
				long methodID = buffer->readLong();
				long requestID = buffer->readLong();

				Buffer *result = new Buffer;
				// write mcop header record
				result->writeLong(MCOP_MAGIC);
				result->writeLong(0);	// message length - to be patched later
				result->writeLong(mcopReturn);

				// write result record (returnCode is written by dispatch)
				result->writeLong(requestID);

				// perform the request
				Object_skel *object = objectPool[objectID];
				object->_copy();
				object->_dispatch(buffer,result,methodID);
				object->_release();

				assert(!buffer->readError() && !buffer->remaining());
				delete buffer;

				if(d->delayedReturn)
				{
					delete result;

					result = new Buffer;
					result->writeLong(MCOP_MAGIC);
					result->writeLong(0);	// to be patched later
					result->writeLong(mcopReturn);
					result->writeLong(requestID);

					d->delayedReturn->initialize(conn,result);
					d->delayedReturn = 0;
				}
				else	/* return normally */
				{
					result->patchLength();
					conn->qSendBuffer(result);
				}
				return;		/* everything ok - leave here */
			}

			if(messageType == mcopReturn)
			{
#ifdef DEBUG_MESSAGES
				printf("[got Return]\n");
#endif
				long requestID = buffer->readLong();
				requestResultPool[requestID] = buffer;
				d->requestResultCondition.wakeAll();

				return;		/* everything ok - leave here */
			}

			if(messageType == mcopOnewayInvocation) {
#ifdef DEBUG_MESSAGES
		printf("[got OnewayInvocation]\n");
#endif
				long objectID = buffer->readLong();
				long methodID = buffer->readLong();

				// perform the request
				Object_skel *object = objectPool[objectID];
				object->_copy();
				object->_dispatch(buffer,methodID);
				object->_release();

				assert(!buffer->readError() && !buffer->remaining());
				delete buffer;

				return;		/* everything ok - leave here */
			}
			break;

		case Connection::expectServerHello:
			if(messageType == mcopServerHello)
			{
#ifdef DEBUG_MESSAGES
				printf("[got ServerHello]\n");
#endif
				/*
		 		 * if we get a server hello, answer with a client hello
		 		 */
				ServerHello h;
				h.readType(*buffer);
				bool valid = (!buffer->readError() && !buffer->remaining());
				delete buffer;

				if(!valid) break;		// invalid hello received -> forget it

				conn->setServerID(h.serverID);

				/*
				 * check if md5auth or noauth is offered by the server
				 */
				bool md5authSupported = false;
				bool noauthSupported = false;
				vector<string>::iterator ai;
				for(ai = h.authProtocols.begin(); ai != h.authProtocols.end(); ai++)
				{
					if(*ai == "md5auth") md5authSupported = true;
					if(*ai == "noauth")  noauthSupported = true;
				}

				if(noauthSupported)		// noauth is usually easier to pass ;)
				{
					Buffer *helloBuffer = new Buffer;

					Header header(MCOP_MAGIC,0,mcopClientHello);
					header.writeType(*helloBuffer);
					ClientHello clientHello(serverID,"noauth","");
					clientHello.writeType(*helloBuffer);

					helloBuffer->patchLength();

					conn->qSendBuffer(helloBuffer);
					conn->setConnState(Connection::expectAuthAccept);
					return;		/* everything ok - leave here */
				}
				else if(md5authSupported)
				{
					Buffer *helloBuffer = new Buffer;

					Header header(MCOP_MAGIC,0,mcopClientHello);
					header.writeType(*helloBuffer);
					ClientHello clientHello(serverID,"md5auth","");

					const char *random_cookie = h.authSeed.c_str();
					if(strlen(random_cookie) == 32)
					{
						char *response = arts_md5_auth_mangle(random_cookie);
						clientHello.authData = response;
#ifdef DEBUG_AUTH
						printf("  got random_cookie = %s\n",random_cookie);
						printf("reply with authData = %s\n",response);
#endif
						free(response);
					}
					clientHello.writeType(*helloBuffer);

					helloBuffer->patchLength();

					conn->qSendBuffer(helloBuffer);
					conn->setConnState(Connection::expectAuthAccept);
					return;		/* everything ok - leave here */
				}
				else
				{
					cerr << "[mcop dispatcher] error: don't know authentication protocol" << endl;
					cerr << "   server offered: ";
					for(ai = h.authProtocols.begin(); ai != h.authProtocols.end(); ai++)
						cerr << *ai << " ";
					cerr << endl;
				}
			}
			break;

		case Connection::expectClientHello:
			if(messageType == mcopClientHello)
			{
#ifdef DEBUG_MESSAGES
				printf("[got ClientHello]\n");
#endif
				ClientHello c;
				c.readType(*buffer);
				bool valid = (!buffer->readError() && !buffer->remaining());
				delete buffer;

				if(valid && (
				       (c.authProtocol == "md5auth" && c.authData == conn->cookie())
					|| (c.authProtocol == "noauth"  && d->allowNoAuthentication) ))
				{
					conn->setServerID(c.serverID);
	
					/* build hints only for the first connection */
					if(!d->accept)
					{
						d->accept = new AuthAccept();

						d->accept->hints.push_back(
							"GlobalComm="+d->globalComm.toString());
						d->accept->hints.push_back(
							"InterfaceRepo="+d->interfaceRepo.toString());
					}
	
					Buffer *helloBuffer = new Buffer;
					Header header(MCOP_MAGIC,0,mcopAuthAccept);
					header.writeType(*helloBuffer);
					d->accept->writeType(*helloBuffer);
	
					helloBuffer->patchLength();
					conn->qSendBuffer(helloBuffer);
					conn->setConnState(Connection::established);

					return;		/* everything ok - leave here */
				}
			}
			break;

		case Connection::expectAuthAccept:
			if(messageType == mcopAuthAccept)
			{
#ifdef DEBUG_MESSAGES
				printf("[got AuthAccept]\n");
#endif
				AuthAccept a;
				a.readType(*buffer);
				delete buffer;
#ifdef DEBUG_MESSAGES

				vector<string>::iterator hi;
				for(hi = a.hints.begin(); hi != a.hints.end(); hi++)
					cout << "[got ConnectionHint] " << *hi << endl;

#endif

				conn->setConnState(Connection::established);
				conn->setHints(a.hints);
				d->serverConnectCondition.wakeAll();
				return;		/* everything ok - leave here */
			}
			break;

		case Connection::unknown:
			assert(false);
			break;
	}

	/*
	 * We shouldn't reach this point if everything went all right
	 */
	cerr << "[mcop dispatcher] Fatal communication error with a client" << endl;
	if(conn->connState() != Connection::established)
	{
		cerr << "[mcop dispatcher]   Authentication of this client was not successful" << endl;
		cerr << "[mcop dispatcher]   Connection dropped" << endl;
		conn->drop();
	}
}

long Dispatcher::addObject(Object_skel *object)
{
	long objectID = objectPool.allocSlot();

	objectPool[objectID] = object;
	return objectID;
}

void Dispatcher::removeObject(long objectID)
{
	assert(objectPool[objectID]);
	objectPool.releaseSlot(objectID);
}

void Dispatcher::generateServerID()
{
	char *buffer;
	buffer = arts_strdup_printf("%s-%04x-%08lx",
								MCOPUtils::getFullHostname().c_str(),
								getpid(),time(0));
	serverID = buffer;
	free(buffer);
}

string Dispatcher::objectToString(long objectID)
{
	Buffer b;
	ObjectReference oref;

	oref.serverID = serverID;
	oref.objectID = objectID;

	// prefer a unix domainsocket connection over a plain tcp connection
	if(unixServer) oref.urls.push_back(unixServer->url());
	if(tcpServer) oref.urls.push_back(tcpServer->url());

	oref.writeType(b);

	return b.toString("MCOP-Object");
}

bool Dispatcher::stringToObjectReference(ObjectReference& r, const string& s)
{
	if(strncmp(s.c_str(),"global:",7) == 0)
	{
		// if the object reference starts with "global:", it refers to
		// a global object which can be found with the objectManager

		string lookup = objectManager->getGlobalReference(&s.c_str()[7]);
		return stringToObjectReference(r,lookup);
	}


	Buffer b;
	if(!b.fromString(s,"MCOP-Object")) return false;

	r.readType(b);
	if(b.readError() || b.remaining()) return false;

	return true;
}

void *Dispatcher::connectObjectLocal(ObjectReference& reference,
													const string& interface)
{
	if(reference.serverID == serverID)
	{
		void *result = objectPool[reference.objectID]->_cast(interface);

		if(result)
		{
			objectPool[reference.objectID]->_copy();
			return result;
		}
	}

	return 0;
}

Connection *Dispatcher::connectObjectRemote(ObjectReference& reference)
{
	if(reference.serverID == "null")		// null reference?
		return 0;

	if(reference.serverID == serverID)
		return loopbackConnection();

	list<Connection *>::iterator i;

	for(i=connections.begin(); i != connections.end();i++)
	{
		Connection *conn = *i;

		if(conn->isConnected(reference.serverID))
		{
			// fixme: we should check for the existence of the object
			// and increment a reference count or something like that
			return conn;
		}
	}

	/* try to connect the server */
	vector<string>::iterator ui;
	for(ui = reference.urls.begin(); ui != reference.urls.end(); ui++)
	{
		Connection *conn = connectUrl(*ui);
		if(conn)
		{
			if(conn->isConnected(reference.serverID))
			{
				return conn;
			}
			else
			{
				/* we connected somewhere, but not the right server ;) */
				connections.remove(conn);
				conn->_release();
			}
		}
	}
	return 0;
}

Connection *Dispatcher::connectUrl(const string& url)
{
	Connection *conn = 0;
	bool isMainThread = SystemThreads::the()->isMainThread();

	if(strncmp(url.c_str(),"tcp:",4) == 0)
	{
		conn = new TCPConnection(url);
	}
	else if(strncmp(url.c_str(),"unix:",5) == 0)
	{
		conn = new UnixConnection(url);
	}

	if(conn)
	{
		conn->_copy(); // Keep extra ref for when the connection breaks
		conn->setConnState(Connection::expectServerHello);

		while((conn->connState() != Connection::established)
		       && !conn->broken())
		{
			if(isMainThread)
				_ioManager->processOneEvent(true);
			else
				d->serverConnectCondition.wait(d->mutex);
		}

		if(conn->connState() == Connection::established)
		{
			connections.push_back(conn);
			conn->_release(); // Give up extra ref
			return conn;
		}
			
		// well - bad luck (building a connection failed)
		
		// Give up extra ref
		conn->_release();
	}
	return 0;
}

void Dispatcher::run()
{
	assert(SystemThreads::the()->isMainThread());

	_ioManager->run();
}

void Dispatcher::terminate()
{
	_ioManager->terminate();
}

void Dispatcher::initiateConnection(Connection *connection)
{
	vector<string> authProtocols;
	authProtocols.push_back("md5auth");

	if(d->allowNoAuthentication)
		authProtocols.push_back("noauth");

	char *authSeed = arts_md5_auth_mkcookie();
	char *authResult = arts_md5_auth_mangle(authSeed);

	Buffer *helloBuffer = new Buffer;

	Header header(MCOP_MAGIC,0,mcopServerHello);
	header.writeType(*helloBuffer);
	ServerHello serverHello("aRts/MCOP-1.0.0",serverID,authProtocols,authSeed);
	serverHello.writeType(*helloBuffer);

	helloBuffer->patchLength();

	connection->qSendBuffer(helloBuffer);
	connection->setConnState(Connection::expectClientHello);

	connection->setCookie(authResult);
	free(authSeed);
	free(authResult);

	connections.push_back(connection);
}

void Dispatcher::handleCorrupt(Connection *connection)
{
	if(connection->connState() != Connection::established)
	{
		cerr << "[mcop dispatcher] Received corrupt message on unauthenticated connection" <<endl;
		cerr << "closing connection." << endl;
		connection->drop();
		d->serverConnectCondition.wakeAll();
	}
	else
	{
		cerr << "[mcop dispatcher] warning: got corrupt MCOP message !??" << endl;
	}
}

void Dispatcher::handleConnectionClose(Connection *connection)
{
	/*
	 * we can't use enumerate here, because the "existing objects list" might
	 * be changing due to the other _disconnectRemote calls we make, so we
	 * enumerate() the objects manually
	 */
	unsigned long l;
	for(l=0; l<objectPool.max(); l++)
	{
		Object_skel *skel = objectPool[l]; 
		if(skel) skel->_disconnectRemote(connection);
	}

	d->requestResultCondition.wakeAll();
	d->serverConnectCondition.wakeAll();

	/*
	 * FIXME:
	 *
	 * there may be error handling to do (e.g., check that the _stub's that
	 * still refer to that connection don't crash now).
	 */
	connection->_release();

	list<Connection *>::iterator i;
	for(i=connections.begin(); i != connections.end();i++)
	{
		if(*i == connection)
		{
			connections.erase(i);
			return;
		}
	}
}

Connection *Dispatcher::activeConnection()
{
	return _activeConnection;
}

Connection *Dispatcher::loopbackConnection()
{
	return d->loopbackConnection;
}

DelayedReturn *Dispatcher::delayReturn()
{
	assert(!d->delayedReturn);

	return d->delayedReturn = new DelayedReturn();
}

Object_skel *Dispatcher::getLocalObject(long objectID)
{
	Object_skel *result = objectPool[objectID];

	if(result) result->_copy();
	return result;
}

void Dispatcher::lock()
{
	_instance->d->mutex.lock();
}

void Dispatcher::unlock()
{
	_instance->d->mutex.unlock();
}

void Dispatcher::wakeUp()
{
	if(SystemThreads::the()->isMainThread()) return;

	_instance->d->wakeUpHandler->wakeUp();
}

/*
void Dispatcher::reloadTraderData() is declared in trader_impl.cpp
*/