summaryrefslogtreecommitdiffstats
path: root/libktorrent/kademlia/dht.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'libktorrent/kademlia/dht.cpp')
-rw-r--r--libktorrent/kademlia/dht.cpp378
1 files changed, 378 insertions, 0 deletions
diff --git a/libktorrent/kademlia/dht.cpp b/libktorrent/kademlia/dht.cpp
new file mode 100644
index 0000000..1d00ab8
--- /dev/null
+++ b/libktorrent/kademlia/dht.cpp
@@ -0,0 +1,378 @@
+/***************************************************************************
+ * Copyright (C) 2005 by Joris Guisson *
+ * *
+ * This program is free software; you can redistribute it and/or modify *
+ * it under the terms of the GNU General Public License as published by *
+ * the Free Software Foundation; either version 2 of the License, or *
+ * (at your option) any later version. *
+ * *
+ * This program is distributed in the hope that it will be useful, *
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of *
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
+ * GNU General Public License for more details. *
+ * *
+ * You should have received a copy of the GNU General Public License *
+ * along with this program; if not, write to the *
+ * Free Software Foundation, Inc., *
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. *
+ ***************************************************************************/
+#include <qmap.h>
+#include <kresolver.h>
+#include <util/log.h>
+#include <util/array.h>
+#include <util/functions.h>
+#include <torrent/bnode.h>
+#include <torrent/globals.h>
+#include <ksocketaddress.h>
+#include "announcetask.h"
+#include "dht.h"
+#include "node.h"
+#include "rpcserver.h"
+#include "rpcmsg.h"
+#include "kclosestnodessearch.h"
+#include "database.h"
+#include "taskmanager.h"
+#include "nodelookup.h"
+
+
+using namespace bt;
+using namespace KNetwork;
+
+namespace dht
+{
+
+
+
+ DHT::DHT() : node(0),srv(0),db(0),tman(0)
+ {
+ connect(&update_timer,SIGNAL(timeout()),this,SLOT(update()));
+ }
+
+
+ DHT::~DHT()
+ {
+ if (running)
+ stop();
+ }
+
+ void DHT::start(const QString & table,const QString & key_file,bt::Uint16 port)
+ {
+ if (running)
+ return;
+
+ if (port == 0)
+ port = 6881;
+
+ table_file = table;
+ this->port = port;
+ Out(SYS_DHT|LOG_NOTICE) << "DHT: Starting on port " << port << endl;
+ srv = new RPCServer(this,port);
+ node = new Node(srv,key_file);
+ db = new Database();
+ tman = new TaskManager();
+ expire_timer.update();
+ running = true;
+ srv->start();
+ node->loadTable(table);
+ update_timer.start(1000);
+ started();
+ }
+
+
+ void DHT::stop()
+ {
+ if (!running)
+ return;
+
+ update_timer.stop();
+ Out(SYS_DHT|LOG_NOTICE) << "DHT: Stopping " << endl;
+ srv->stop();
+ node->saveTable(table_file);
+ running = false;
+ stopped();
+ delete tman; tman = 0;
+ delete db; db = 0;
+ delete node; node = 0;
+ delete srv; srv = 0;
+ }
+
+ void DHT::ping(PingReq* r)
+ {
+ if (!running)
+ return;
+
+ // ignore requests we get from ourself
+ if (r->getID() == node->getOurID())
+ return;
+
+ Out(SYS_DHT|LOG_NOTICE) << "DHT: Sending ping response" << endl;
+ PingRsp rsp(r->getMTID(),node->getOurID());
+ rsp.setOrigin(r->getOrigin());
+ srv->sendMsg(&rsp);
+ node->recieved(this,r);
+ }
+
+
+
+ void DHT::findNode(FindNodeReq* r)
+ {
+ if (!running)
+ return;
+
+ // ignore requests we get from ourself
+ if (r->getID() == node->getOurID())
+ return;
+
+ Out(SYS_DHT|LOG_DEBUG) << "DHT: got findNode request" << endl;
+ node->recieved(this,r);
+ // find the K closest nodes and pack them
+ KClosestNodesSearch kns(r->getTarget(),K);
+
+ node->findKClosestNodes(kns);
+
+ Uint32 rs = kns.requiredSpace();
+ // create the data
+ QByteArray nodes(rs);
+ // pack the found nodes in a byte array
+ if (rs > 0)
+ kns.pack(nodes);
+
+ FindNodeRsp fnr(r->getMTID(),node->getOurID(),nodes);
+ fnr.setOrigin(r->getOrigin());
+ srv->sendMsg(&fnr);
+ }
+
+
+ void DHT::announce(AnnounceReq* r)
+ {
+ if (!running)
+ return;
+
+ // ignore requests we get from ourself
+ if (r->getID() == node->getOurID())
+ return;
+
+ Out(SYS_DHT|LOG_DEBUG) << "DHT: got announce request" << endl;
+ node->recieved(this,r);
+ // first check if the token is OK
+ dht::Key token = r->getToken();
+ if (!db->checkToken(token,r->getOrigin().ipAddress().IPv4Addr(),r->getOrigin().port()))
+ return;
+
+ // everything OK, so store the value
+ Uint8 tdata[6];
+ bt::WriteUint32(tdata,0,r->getOrigin().ipAddress().IPv4Addr());
+ bt::WriteUint16(tdata,4,r->getPort());
+ db->store(r->getInfoHash(),DBItem(tdata));
+ // send a proper response to indicate everything is OK
+ AnnounceRsp rsp(r->getMTID(),node->getOurID());
+ rsp.setOrigin(r->getOrigin());
+ srv->sendMsg(&rsp);
+ }
+
+
+
+ void DHT::getPeers(GetPeersReq* r)
+ {
+ if (!running)
+ return;
+
+ // ignore requests we get from ourself
+ if (r->getID() == node->getOurID())
+ return;
+
+ Out(SYS_DHT|LOG_DEBUG) << "DHT: got getPeers request" << endl;
+ node->recieved(this,r);
+ DBItemList dbl;
+ db->sample(r->getInfoHash(),dbl,50);
+
+ // generate a token
+ dht::Key token = db->genToken(r->getOrigin().ipAddress().IPv4Addr(),r->getOrigin().port());
+
+ if (dbl.count() == 0)
+ {
+ // if data is null do the same as when we have a findNode request
+ // find the K closest nodes and pack them
+ KClosestNodesSearch kns(r->getInfoHash(),K);
+ node->findKClosestNodes(kns);
+ Uint32 rs = kns.requiredSpace();
+ // create the data
+ QByteArray nodes(rs);
+ // pack the found nodes in a byte array
+ if (rs > 0)
+ kns.pack(nodes);
+
+ GetPeersRsp fnr(r->getMTID(),node->getOurID(),nodes,token);
+ fnr.setOrigin(r->getOrigin());
+ srv->sendMsg(&fnr);
+ }
+ else
+ {
+ // send a get peers response
+ GetPeersRsp fvr(r->getMTID(),node->getOurID(),dbl,token);
+ fvr.setOrigin(r->getOrigin());
+ srv->sendMsg(&fvr);
+ }
+ }
+
+ void DHT::response(MsgBase* r)
+ {
+ if (!running)
+ return;
+
+ node->recieved(this,r);
+ }
+
+ void DHT::error(ErrMsg* )
+ {}
+
+
+ void DHT::portRecieved(const QString & ip,bt::Uint16 port)
+ {
+ if (!running)
+ return;
+
+ Out(SYS_DHT|LOG_DEBUG) << "Sending ping request to " << ip << ":" << port << endl;
+ PingReq* r = new PingReq(node->getOurID());
+ r->setOrigin(KInetSocketAddress(ip,port));
+ srv->doCall(r);
+ }
+
+ bool DHT::canStartTask() const
+ {
+ // we can start a task if we have less then 7 runnning and
+ // there are at least 16 RPC slots available
+ if (tman->getNumTasks() >= 7)
+ return false;
+ else if (256 - srv->getNumActiveRPCCalls() <= 16)
+ return false;
+
+ return true;
+ }
+
+ AnnounceTask* DHT::announce(const bt::SHA1Hash & info_hash,bt::Uint16 port)
+ {
+ if (!running)
+ return 0;
+
+ KClosestNodesSearch kns(info_hash,K);
+ node->findKClosestNodes(kns);
+ if (kns.getNumEntries() > 0)
+ {
+ Out(SYS_DHT|LOG_NOTICE) << "DHT: Doing announce " << endl;
+ AnnounceTask* at = new AnnounceTask(db,srv,node,info_hash,port);
+ at->start(kns,!canStartTask());
+ tman->addTask(at);
+ if (!db->contains(info_hash))
+ db->insert(info_hash);
+ return at;
+ }
+
+ return 0;
+ }
+
+ NodeLookup* DHT::refreshBucket(const dht::Key & id,KBucket & bucket)
+ {
+ if (!running)
+ return 0;
+
+ KClosestNodesSearch kns(id,K);
+ bucket.findKClosestNodes(kns);
+ bucket.updateRefreshTimer();
+ if (kns.getNumEntries() > 0)
+ {
+ Out(SYS_DHT|LOG_DEBUG) << "DHT: refreshing bucket " << endl;
+ NodeLookup* nl = new NodeLookup(id,srv,node);
+ nl->start(kns,!canStartTask());
+ tman->addTask(nl);
+ return nl;
+ }
+
+ return 0;
+ }
+
+ NodeLookup* DHT::findNode(const dht::Key & id)
+ {
+ if (!running)
+ return 0;
+
+ KClosestNodesSearch kns(id,K);
+ node->findKClosestNodes(kns);
+ if (kns.getNumEntries() > 0)
+ {
+ Out(SYS_DHT|LOG_DEBUG) << "DHT: finding node " << endl;
+ NodeLookup* at = new NodeLookup(id,srv,node);
+ at->start(kns,!canStartTask());
+ tman->addTask(at);
+ return at;
+ }
+
+ return 0;
+ }
+
+ void DHT::update()
+ {
+ if (!running)
+ return;
+
+ if (expire_timer.getElapsedSinceUpdate() > 5*60*1000)
+ {
+ db->expire(bt::GetCurrentTime());
+ expire_timer.update();
+ }
+
+ node->refreshBuckets(this);
+ tman->removeFinishedTasks(this);
+ stats.num_tasks = tman->getNumTasks() + tman->getNumQueuedTasks();
+ stats.num_peers = node->getNumEntriesInRoutingTable();
+ }
+
+ void DHT::timeout(const MsgBase* r)
+ {
+ node->onTimeout(r);
+ }
+
+ void DHT::addDHTNode(const QString & host,Uint16 hport)
+ {
+ if (!running)
+ return;
+
+ KResolverResults res = KResolver::resolve(host,QString::number(hport));
+ if (res.count() > 0)
+ {
+ srv->ping(node->getOurID(),res.front().address());
+ }
+ }
+
+ QMap<QString, int> DHT::getClosestGoodNodes(int maxNodes)
+ {
+ QMap<QString, int> map;
+
+ if(!node)
+ return map;
+
+ int max = 0;
+ KClosestNodesSearch kns(node->getOurID(), maxNodes*2);
+ node->findKClosestNodes(kns);
+
+ KClosestNodesSearch::Itr it;
+ for(it = kns.begin(); it != kns.end(); ++it)
+ {
+ KBucketEntry e = it->second;
+
+ if(!e.isGood())
+ continue;
+
+ KInetSocketAddress a = e.getAddress();
+
+ map.insert(a.ipAddress().toString(), a.port());
+ if(++max >= maxNodes)
+ break;
+ }
+
+ return map;
+ }
+}
+
+#include "dht.moc"