diff options
author | tpearson <tpearson@283d02a7-25f6-0310-bc7c-ecb5cbfe19da> | 2010-01-20 02:37:40 +0000 |
---|---|---|
committer | tpearson <tpearson@283d02a7-25f6-0310-bc7c-ecb5cbfe19da> | 2010-01-20 02:37:40 +0000 |
commit | 9ad5c7b5e23b4940e7a3ea3ca3a6fb77e6a8fab0 (patch) | |
tree | d088b5210e77d9fa91d954d8550e00e372b47378 /libktorrent/kademlia | |
download | ktorrent-9ad5c7b5e23b4940e7a3ea3ca3a6fb77e6a8fab0.tar.gz ktorrent-9ad5c7b5e23b4940e7a3ea3ca3a6fb77e6a8fab0.zip |
Updated to final KDE3 ktorrent release (2.2.6)
git-svn-id: svn://anonsvn.kde.org/home/kde/branches/trinity/applications/ktorrent@1077377 283d02a7-25f6-0310-bc7c-ecb5cbfe19da
Diffstat (limited to 'libktorrent/kademlia')
33 files changed, 4969 insertions, 0 deletions
diff --git a/libktorrent/kademlia/Makefile.am b/libktorrent/kademlia/Makefile.am new file mode 100644 index 0000000..1b567b2 --- /dev/null +++ b/libktorrent/kademlia/Makefile.am @@ -0,0 +1,12 @@ +INCLUDES = -I$(srcdir)/.. $(all_includes) +METASOURCES = AUTO +libkademlia_la_LDFLAGS = $(all_libraries) +noinst_LTLIBRARIES = libkademlia.la +noinst_HEADERS = key.h node.h kbucket.h rpccall.h rpcserver.h database.h dht.h \ + rpcmsg.h kclosestnodessearch.h nodelookup.h task.h pack.h \ + taskmanager.h announcetask.h dhttrackerbackend.h dhtbase.h +libkademlia_la_SOURCES = key.cpp node.cpp kbucket.cpp rpccall.cpp rpcserver.cpp \ + database.cpp dht.cpp rpcmsg.cpp kclosestnodessearch.cpp nodelookup.cpp task.cpp \ + pack.cpp taskmanager.cpp announcetask.cpp \ + dhttrackerbackend.cpp dhtbase.cpp +KDE_CXXFLAGS = $(USE_EXCEPTIONS) $(USE_RTTI) diff --git a/libktorrent/kademlia/announcetask.cpp b/libktorrent/kademlia/announcetask.cpp new file mode 100644 index 0000000..b7350a2 --- /dev/null +++ b/libktorrent/kademlia/announcetask.cpp @@ -0,0 +1,154 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * [email protected] * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#include <util/log.h> +#include <torrent/globals.h> +#include "announcetask.h" +#include "node.h" +#include "pack.h" + +using namespace bt; + +namespace dht +{ + + AnnounceTask::AnnounceTask(Database* db,RPCServer* rpc, Node* node,const dht::Key & info_hash,bt::Uint16 port) + : Task(rpc, node),info_hash(info_hash),port(port),db(db) + {} + + + AnnounceTask::~AnnounceTask() + {} + + + void AnnounceTask::callFinished(RPCCall* c, MsgBase* rsp) + { + // Out() << "AnnounceTask::callFinished" << endl; + // if we do not have a get peers response, return + // announce_peer's response are just empty anyway + if (c->getMsgMethod() != dht::GET_PEERS) + return; + + // it is either a GetPeersNodesRsp or a GetPeersValuesRsp + GetPeersRsp* gpr = dynamic_cast<GetPeersRsp*>(rsp); + if (!gpr) + return; + + if (gpr->containsNodes()) + { + const QByteArray & n = gpr->getData(); + Uint32 nval = n.size() / 26; + for (Uint32 i = 0;i < nval;i++) + { + // add node to todo list + KBucketEntry e = UnpackBucketEntry(n,i*26); + if (!todo.contains(e) && !visited.contains(e) && + todo.count() < 100) + { + todo.append(e); + } + } + } + else + { + // store the items in the database + const DBItemList & items = gpr->getItemList(); + for (DBItemList::const_iterator i = items.begin();i != items.end();i++) + { + db->store(info_hash,*i); + // also add the items to the returned_items list + returned_items.append(*i); + } + + // add the peer who responded to the answered list, so we can do an announce + KBucketEntry e(rsp->getOrigin(),rsp->getID()); + if (!answered.contains(KBucketEntryAndToken(e,gpr->getToken())) && !answered_visited.contains(e)) + { + answered.append(KBucketEntryAndToken(e,gpr->getToken())); + } + + emitDataReady(); + } + } + + void AnnounceTask::callTimeout(RPCCall* ) + { + //Out() << "AnnounceTask::callTimeout " << endl; + } + + void AnnounceTask::update() + { +/* Out() << "AnnounceTask::update " << endl; + Out() << "todo " << todo.count() << " ; answered " << answered.count() << endl; + Out() << "visited " << visited.count() << " ; answered_visited " << answered_visited.count() << endl; + */ + while (!answered.empty() && canDoRequest()) + { + KBucketEntryAndToken & e = answered.first(); + if (!answered_visited.contains(e)) + { + AnnounceReq* anr = new AnnounceReq(node->getOurID(),info_hash,port,e.getToken()); + anr->setOrigin(e.getAddress()); + rpcCall(anr); + answered_visited.append(e); + } + answered.pop_front(); + } + + // go over the todo list and send get_peers requests + // until we have nothing left + while (!todo.empty() && canDoRequest()) + { + KBucketEntry e = todo.first(); + // onLy send a findNode if we haven't allrready visited the node + if (!visited.contains(e)) + { + // send a findNode to the node + GetPeersReq* gpr = new GetPeersReq(node->getOurID(),info_hash); + gpr->setOrigin(e.getAddress()); + rpcCall(gpr); + visited.append(e); + } + // remove the entry from the todo list + todo.pop_front(); + } + + if (todo.empty() && answered.empty() && getNumOutstandingRequests() == 0 && !isFinished()) + { + Out(SYS_DHT|LOG_NOTICE) << "DHT: AnnounceTask done" << endl; + done(); + } + else if (answered_visited.count() >= dht::K) + { + // if K announces have occurred stop + Out(SYS_DHT|LOG_NOTICE) << "DHT: AnnounceTask done" << endl; + done(); + } + } + + bool AnnounceTask::takeItem(DBItem & item) + { + if (returned_items.empty()) + return false; + + item = returned_items.first(); + returned_items.pop_front(); + return true; + } +} diff --git a/libktorrent/kademlia/announcetask.h b/libktorrent/kademlia/announcetask.h new file mode 100644 index 0000000..d6bfa7c --- /dev/null +++ b/libktorrent/kademlia/announcetask.h @@ -0,0 +1,74 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * [email protected] * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#ifndef DHTANNOUNCETASK_H +#define DHTANNOUNCETASK_H + +#include <task.h> +#include "kbucket.h" + +namespace dht +{ + class Database; + + class KBucketEntryAndToken : public KBucketEntry + { + Key token; + public: + KBucketEntryAndToken() {} + KBucketEntryAndToken(const KBucketEntry & e,const Key & token) + : KBucketEntry(e),token(token) {} + virtual ~KBucketEntryAndToken() {} + + const Key & getToken() const {return token;} + }; + + /** + @author Joris Guisson <[email protected]> + */ + class AnnounceTask : public Task + { + public: + AnnounceTask(Database* db,RPCServer* rpc, Node* node,const dht::Key & info_hash,bt::Uint16 port); + virtual ~AnnounceTask(); + + virtual void callFinished(RPCCall* c, MsgBase* rsp); + virtual void callTimeout(RPCCall* c); + virtual void update(); + + /** + * Take one item from the returned values. + * Returns false if there is no item to take. + * @param item The item + * @return false if no item to take, true else + */ + bool takeItem(DBItem & item); + private: + dht::Key info_hash; + bt::Uint16 port; + QValueList<KBucketEntryAndToken> answered; // nodes which have answered with values + QValueList<KBucketEntry> answered_visited; // nodes which have answered with values which have been visited + Database* db; + DBItemList returned_items; + + }; + +} + +#endif diff --git a/libktorrent/kademlia/database.cpp b/libktorrent/kademlia/database.cpp new file mode 100644 index 0000000..447975f --- /dev/null +++ b/libktorrent/kademlia/database.cpp @@ -0,0 +1,186 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * [email protected] * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#include <util/functions.h> +#include <util/log.h> +#include <torrent/globals.h> +#include "database.h" + +using namespace bt; + +namespace dht +{ + DBItem::DBItem() + { + memset(item,0,9); + time_stamp = bt::GetCurrentTime(); + } + + DBItem::DBItem(const bt::Uint8* ip_port) + { + memcpy(item,ip_port,6); + time_stamp = bt::GetCurrentTime(); + } + + DBItem::DBItem(const DBItem & it) + { + memcpy(item,it.item,6); + time_stamp = it.time_stamp; + } + + DBItem::~DBItem() + {} + + bool DBItem::expired(bt::TimeStamp now) const + { + return (now - time_stamp >= MAX_ITEM_AGE); + } + + DBItem & DBItem::operator = (const DBItem & it) + { + memcpy(item,it.item,6); + time_stamp = it.time_stamp; + return *this; + } + + /////////////////////////////////////////////// + + Database::Database() + { + items.setAutoDelete(true); + } + + + Database::~Database() + {} + + void Database::store(const dht::Key & key,const DBItem & dbi) + { + DBItemList* dbl = items.find(key); + if (!dbl) + { + dbl = new DBItemList(); + items.insert(key,dbl); + } + dbl->append(dbi); + } + + void Database::sample(const dht::Key & key,DBItemList & tdbl,bt::Uint32 max_entries) + { + DBItemList* dbl = items.find(key); + if (!dbl) + return; + + if (dbl->count() < max_entries) + { + DBItemList::iterator i = dbl->begin(); + while (i != dbl->end()) + { + tdbl.append(*i); + i++; + } + } + else + { + Uint32 num_added = 0; + DBItemList::iterator i = dbl->begin(); + while (i != dbl->end() && num_added < max_entries) + { + tdbl.append(*i); + num_added++; + i++; + } + } + } + + void Database::expire(bt::TimeStamp now) + { + bt::PtrMap<dht::Key,DBItemList>::iterator itr = items.begin(); + while (itr != items.end()) + { + DBItemList* dbl = itr->second; + // newer keys are inserted at the back + // so we can stop when we hit the first key which is not expired + while (dbl->count() > 0 && dbl->first().expired(now)) + { + dbl->pop_front(); + } + itr++; + } + } + + dht::Key Database::genToken(Uint32 ip,Uint16 port) + { + Uint8 tdata[14]; + TimeStamp now = bt::GetCurrentTime(); + // generate a hash of the ip port and the current time + // should prevent anybody from crapping things up + bt::WriteUint32(tdata,0,ip); + bt::WriteUint16(tdata,4,port); + bt::WriteUint64(tdata,6,now); + + dht::Key token = SHA1Hash::generate(tdata,14); + // keep track of the token, tokens will expire after a while + tokens.insert(token,now); + return token; + } + + bool Database::checkToken(const dht::Key & token,Uint32 ip,Uint16 port) + { + // the token must be in the map + if (!tokens.contains(token)) + { + Out(SYS_DHT|LOG_DEBUG) << "Unknown token" << endl; + return false; + } + + // in the map so now get the timestamp and regenerate the token + // using the IP and port of the sender + TimeStamp ts = tokens[token]; + Uint8 tdata[14]; + bt::WriteUint32(tdata,0,ip); + bt::WriteUint16(tdata,4,port); + bt::WriteUint64(tdata,6,ts); + dht::Key ct = SHA1Hash::generate(tdata,14); + // compare the generated token to the one received + if (token != ct) // not good, this peer didn't went through the proper channels + { + Out(SYS_DHT|LOG_DEBUG) << "Invalid token" << endl; + return false; + } + // expire the token + tokens.erase(token); + return true; + } + + bool Database::contains(const dht::Key & key) const + { + return items.find(key) != 0; + } + + void Database::insert(const dht::Key & key) + { + DBItemList* dbl = items.find(key); + if (!dbl) + { + dbl = new DBItemList(); + items.insert(key,dbl); + } + } +} diff --git a/libktorrent/kademlia/database.h b/libktorrent/kademlia/database.h new file mode 100644 index 0000000..94e6b3f --- /dev/null +++ b/libktorrent/kademlia/database.h @@ -0,0 +1,129 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * [email protected] * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#ifndef DHTDATABASE_H +#define DHTDATABASE_H + +#include <qmap.h> +#include <qvaluelist.h> +#include <util/ptrmap.h> +#include <util/constants.h> +#include <util/array.h> +#include "key.h" + + +namespace dht +{ + /// Each item may only exist for 30 minutes + const bt::Uint32 MAX_ITEM_AGE = 30 * 60 * 1000; + + /** + * @author Joris Guisson + * + * Item in the database, will keep track of an IP and port combination. + * As well as the time it was inserted. + */ + class DBItem + { + bt::Uint8 item[6]; + bt::TimeStamp time_stamp; + public: + DBItem(); + DBItem(const bt::Uint8* ip_port); + DBItem(const DBItem & item); + virtual ~DBItem(); + + /// See if the item is expired + bool expired(bt::TimeStamp now) const; + + /// Get the data of an item + const bt::Uint8* getData() const {return item;} + + DBItem & operator = (const DBItem & item); + }; + + typedef QValueList<DBItem> DBItemList; + + /** + * @author Joris Guisson + * + * Class where all the key value paires get stored. + */ + class Database + { + bt::PtrMap<dht::Key,DBItemList> items; + QMap<dht::Key,bt::TimeStamp> tokens; + public: + Database(); + virtual ~Database(); + + /** + * Store an entry in the database + * @param key The key + * @param dbi The DBItem to store + */ + void store(const dht::Key & key,const DBItem & dbi); + + /** + * Get max_entries items from the database, which have + * the same key, items are taken randomly from the list. + * If the key is not present no items will be returned, if + * there are fewer then max_entries items for the key, all + * entries will be returned + * @param key The key to search for + * @param dbl The list to store the items in + * @param max_entries The maximum number entries + */ + void sample(const dht::Key & key,DBItemList & dbl,bt::Uint32 max_entries); + + /** + * Expire all items older then 30 minutes + * @param now The time it is now + * (we pass this along so we only have to calculate it once) + */ + void expire(bt::TimeStamp now); + + /** + * Generate a write token, which will give peers write access to + * the DB. + * @param ip The IP of the peer + * @param port The port of the peer + * @return A Key + */ + dht::Key genToken(bt::Uint32 ip,bt::Uint16 port); + + /** + * Check if a received token is OK. + * @param token The token received + * @param ip The ip of the sender + * @param port The port of the sender + * @return true if the token was given to this peer, false other wise + */ + bool checkToken(const dht::Key & token,bt::Uint32 ip,bt::Uint16 port); + + /// Test wether or not the DB contains a key + bool contains(const dht::Key & key) const; + + /// Insert an empty item (only if it isn't already in the DB) + void insert(const dht::Key & key); + }; + +} + +#endif diff --git a/libktorrent/kademlia/dht.cpp b/libktorrent/kademlia/dht.cpp new file mode 100644 index 0000000..1d00ab8 --- /dev/null +++ b/libktorrent/kademlia/dht.cpp @@ -0,0 +1,378 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * [email protected] * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#include <qmap.h> +#include <kresolver.h> +#include <util/log.h> +#include <util/array.h> +#include <util/functions.h> +#include <torrent/bnode.h> +#include <torrent/globals.h> +#include <ksocketaddress.h> +#include "announcetask.h" +#include "dht.h" +#include "node.h" +#include "rpcserver.h" +#include "rpcmsg.h" +#include "kclosestnodessearch.h" +#include "database.h" +#include "taskmanager.h" +#include "nodelookup.h" + + +using namespace bt; +using namespace KNetwork; + +namespace dht +{ + + + + DHT::DHT() : node(0),srv(0),db(0),tman(0) + { + connect(&update_timer,SIGNAL(timeout()),this,SLOT(update())); + } + + + DHT::~DHT() + { + if (running) + stop(); + } + + void DHT::start(const QString & table,const QString & key_file,bt::Uint16 port) + { + if (running) + return; + + if (port == 0) + port = 6881; + + table_file = table; + this->port = port; + Out(SYS_DHT|LOG_NOTICE) << "DHT: Starting on port " << port << endl; + srv = new RPCServer(this,port); + node = new Node(srv,key_file); + db = new Database(); + tman = new TaskManager(); + expire_timer.update(); + running = true; + srv->start(); + node->loadTable(table); + update_timer.start(1000); + started(); + } + + + void DHT::stop() + { + if (!running) + return; + + update_timer.stop(); + Out(SYS_DHT|LOG_NOTICE) << "DHT: Stopping " << endl; + srv->stop(); + node->saveTable(table_file); + running = false; + stopped(); + delete tman; tman = 0; + delete db; db = 0; + delete node; node = 0; + delete srv; srv = 0; + } + + void DHT::ping(PingReq* r) + { + if (!running) + return; + + // ignore requests we get from ourself + if (r->getID() == node->getOurID()) + return; + + Out(SYS_DHT|LOG_NOTICE) << "DHT: Sending ping response" << endl; + PingRsp rsp(r->getMTID(),node->getOurID()); + rsp.setOrigin(r->getOrigin()); + srv->sendMsg(&rsp); + node->recieved(this,r); + } + + + + void DHT::findNode(FindNodeReq* r) + { + if (!running) + return; + + // ignore requests we get from ourself + if (r->getID() == node->getOurID()) + return; + + Out(SYS_DHT|LOG_DEBUG) << "DHT: got findNode request" << endl; + node->recieved(this,r); + // find the K closest nodes and pack them + KClosestNodesSearch kns(r->getTarget(),K); + + node->findKClosestNodes(kns); + + Uint32 rs = kns.requiredSpace(); + // create the data + QByteArray nodes(rs); + // pack the found nodes in a byte array + if (rs > 0) + kns.pack(nodes); + + FindNodeRsp fnr(r->getMTID(),node->getOurID(),nodes); + fnr.setOrigin(r->getOrigin()); + srv->sendMsg(&fnr); + } + + + void DHT::announce(AnnounceReq* r) + { + if (!running) + return; + + // ignore requests we get from ourself + if (r->getID() == node->getOurID()) + return; + + Out(SYS_DHT|LOG_DEBUG) << "DHT: got announce request" << endl; + node->recieved(this,r); + // first check if the token is OK + dht::Key token = r->getToken(); + if (!db->checkToken(token,r->getOrigin().ipAddress().IPv4Addr(),r->getOrigin().port())) + return; + + // everything OK, so store the value + Uint8 tdata[6]; + bt::WriteUint32(tdata,0,r->getOrigin().ipAddress().IPv4Addr()); + bt::WriteUint16(tdata,4,r->getPort()); + db->store(r->getInfoHash(),DBItem(tdata)); + // send a proper response to indicate everything is OK + AnnounceRsp rsp(r->getMTID(),node->getOurID()); + rsp.setOrigin(r->getOrigin()); + srv->sendMsg(&rsp); + } + + + + void DHT::getPeers(GetPeersReq* r) + { + if (!running) + return; + + // ignore requests we get from ourself + if (r->getID() == node->getOurID()) + return; + + Out(SYS_DHT|LOG_DEBUG) << "DHT: got getPeers request" << endl; + node->recieved(this,r); + DBItemList dbl; + db->sample(r->getInfoHash(),dbl,50); + + // generate a token + dht::Key token = db->genToken(r->getOrigin().ipAddress().IPv4Addr(),r->getOrigin().port()); + + if (dbl.count() == 0) + { + // if data is null do the same as when we have a findNode request + // find the K closest nodes and pack them + KClosestNodesSearch kns(r->getInfoHash(),K); + node->findKClosestNodes(kns); + Uint32 rs = kns.requiredSpace(); + // create the data + QByteArray nodes(rs); + // pack the found nodes in a byte array + if (rs > 0) + kns.pack(nodes); + + GetPeersRsp fnr(r->getMTID(),node->getOurID(),nodes,token); + fnr.setOrigin(r->getOrigin()); + srv->sendMsg(&fnr); + } + else + { + // send a get peers response + GetPeersRsp fvr(r->getMTID(),node->getOurID(),dbl,token); + fvr.setOrigin(r->getOrigin()); + srv->sendMsg(&fvr); + } + } + + void DHT::response(MsgBase* r) + { + if (!running) + return; + + node->recieved(this,r); + } + + void DHT::error(ErrMsg* ) + {} + + + void DHT::portRecieved(const QString & ip,bt::Uint16 port) + { + if (!running) + return; + + Out(SYS_DHT|LOG_DEBUG) << "Sending ping request to " << ip << ":" << port << endl; + PingReq* r = new PingReq(node->getOurID()); + r->setOrigin(KInetSocketAddress(ip,port)); + srv->doCall(r); + } + + bool DHT::canStartTask() const + { + // we can start a task if we have less then 7 runnning and + // there are at least 16 RPC slots available + if (tman->getNumTasks() >= 7) + return false; + else if (256 - srv->getNumActiveRPCCalls() <= 16) + return false; + + return true; + } + + AnnounceTask* DHT::announce(const bt::SHA1Hash & info_hash,bt::Uint16 port) + { + if (!running) + return 0; + + KClosestNodesSearch kns(info_hash,K); + node->findKClosestNodes(kns); + if (kns.getNumEntries() > 0) + { + Out(SYS_DHT|LOG_NOTICE) << "DHT: Doing announce " << endl; + AnnounceTask* at = new AnnounceTask(db,srv,node,info_hash,port); + at->start(kns,!canStartTask()); + tman->addTask(at); + if (!db->contains(info_hash)) + db->insert(info_hash); + return at; + } + + return 0; + } + + NodeLookup* DHT::refreshBucket(const dht::Key & id,KBucket & bucket) + { + if (!running) + return 0; + + KClosestNodesSearch kns(id,K); + bucket.findKClosestNodes(kns); + bucket.updateRefreshTimer(); + if (kns.getNumEntries() > 0) + { + Out(SYS_DHT|LOG_DEBUG) << "DHT: refreshing bucket " << endl; + NodeLookup* nl = new NodeLookup(id,srv,node); + nl->start(kns,!canStartTask()); + tman->addTask(nl); + return nl; + } + + return 0; + } + + NodeLookup* DHT::findNode(const dht::Key & id) + { + if (!running) + return 0; + + KClosestNodesSearch kns(id,K); + node->findKClosestNodes(kns); + if (kns.getNumEntries() > 0) + { + Out(SYS_DHT|LOG_DEBUG) << "DHT: finding node " << endl; + NodeLookup* at = new NodeLookup(id,srv,node); + at->start(kns,!canStartTask()); + tman->addTask(at); + return at; + } + + return 0; + } + + void DHT::update() + { + if (!running) + return; + + if (expire_timer.getElapsedSinceUpdate() > 5*60*1000) + { + db->expire(bt::GetCurrentTime()); + expire_timer.update(); + } + + node->refreshBuckets(this); + tman->removeFinishedTasks(this); + stats.num_tasks = tman->getNumTasks() + tman->getNumQueuedTasks(); + stats.num_peers = node->getNumEntriesInRoutingTable(); + } + + void DHT::timeout(const MsgBase* r) + { + node->onTimeout(r); + } + + void DHT::addDHTNode(const QString & host,Uint16 hport) + { + if (!running) + return; + + KResolverResults res = KResolver::resolve(host,QString::number(hport)); + if (res.count() > 0) + { + srv->ping(node->getOurID(),res.front().address()); + } + } + + QMap<QString, int> DHT::getClosestGoodNodes(int maxNodes) + { + QMap<QString, int> map; + + if(!node) + return map; + + int max = 0; + KClosestNodesSearch kns(node->getOurID(), maxNodes*2); + node->findKClosestNodes(kns); + + KClosestNodesSearch::Itr it; + for(it = kns.begin(); it != kns.end(); ++it) + { + KBucketEntry e = it->second; + + if(!e.isGood()) + continue; + + KInetSocketAddress a = e.getAddress(); + + map.insert(a.ipAddress().toString(), a.port()); + if(++max >= maxNodes) + break; + } + + return map; + } +} + +#include "dht.moc" diff --git a/libktorrent/kademlia/dht.h b/libktorrent/kademlia/dht.h new file mode 100644 index 0000000..8642836 --- /dev/null +++ b/libktorrent/kademlia/dht.h @@ -0,0 +1,136 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * [email protected] * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#ifndef DHTDHT_H +#define DHTDHT_H + +#include <qtimer.h> +#include <qstring.h> +#include <qmap.h> +#include <util/constants.h> +#include <util/timer.h> +#include "key.h" +#include "dhtbase.h" + +namespace bt +{ + class SHA1Hash; +} + +namespace KNetwork +{ + class KInetSocketAddress; +} + +namespace dht +{ + class Node; + class RPCServer; + class PingReq; + class FindNodeReq; + class FindValueReq; + class StoreValueReq; + class GetPeersReq; + class MsgBase; + class ErrMsg; + class MsgBase; + class AnnounceReq; + class Database; + class TaskManager; + class Task; + class AnnounceTask; + class NodeLookup; + class KBucket; + + /** + @author Joris Guisson <[email protected]> + */ + class DHT : public DHTBase + { + Q_OBJECT + public: + DHT(); + virtual ~DHT(); + + void ping(PingReq* r); + void findNode(FindNodeReq* r); + void response(MsgBase* r); + void getPeers(GetPeersReq* r); + void announce(AnnounceReq* r); + void error(ErrMsg* r); + void timeout(const MsgBase* r); + + /** + * A Peer has received a PORT message, and uses this function to alert the DHT of it. + * @param ip The IP of the peer + * @param port The port in the PORT message + */ + void portRecieved(const QString & ip,bt::Uint16 port); + + /** + * Do an announce on the DHT network + * @param info_hash The info_hash + * @param port The port + * @return The task which handles this + */ + AnnounceTask* announce(const bt::SHA1Hash & info_hash,bt::Uint16 port); + + /** + * Refresh a bucket using a find node task. + * @param id The id + * @param bucket The bucket to refresh + */ + NodeLookup* refreshBucket(const dht::Key & id,KBucket & bucket); + + /** + * Do a NodeLookup. + * @param id The id of the key to search + */ + NodeLookup* findNode(const dht::Key & id); + + /// See if it is possible to start a task + bool canStartTask() const; + + void start(const QString & table,const QString & key_file,bt::Uint16 port); + void stop(); + void addDHTNode(const QString & host,bt::Uint16 hport); + + /** + * Returns maxNodes number of <IP address, port> nodes + * that are closest to ourselves and are good. + * @param maxNodes maximum nr of nodes in QMap to return. + */ + QMap<QString, int> getClosestGoodNodes(int maxNodes); + + private slots: + void update(); + + private: + Node* node; + RPCServer* srv; + Database* db; + TaskManager* tman; + bt::Timer expire_timer; + QString table_file; + QTimer update_timer; + }; + +} + +#endif diff --git a/libktorrent/kademlia/dhtbase.cpp b/libktorrent/kademlia/dhtbase.cpp new file mode 100644 index 0000000..b0ff582 --- /dev/null +++ b/libktorrent/kademlia/dhtbase.cpp @@ -0,0 +1,37 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * [email protected] * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#include "dhtbase.h" + +namespace dht +{ + + DHTBase::DHTBase() : running(false),port(0) + { + stats.num_peers = 0; + stats.num_tasks = 0; + } + + + DHTBase::~DHTBase() + {} +} + +#include "dhtbase.moc" + diff --git a/libktorrent/kademlia/dhtbase.h b/libktorrent/kademlia/dhtbase.h new file mode 100644 index 0000000..dfa880a --- /dev/null +++ b/libktorrent/kademlia/dhtbase.h @@ -0,0 +1,129 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * [email protected] * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#ifndef DHTDHTBASE_H +#define DHTDHTBASE_H + +#include <qobject.h> +#include <util/constants.h> + +class QString; + +namespace bt +{ + class SHA1Hash; +} + +namespace dht +{ + class AnnounceTask; + + struct Stats + { + /// number of peers in the routing table + bt::Uint32 num_peers; + /// Number of running tasks + bt::Uint32 num_tasks; + }; + + /** + * @author Joris Guisson <[email protected]> + * + * Interface for DHT class, this is to keep other things separate from the inner workings + * of the DHT. + */ + class DHTBase : public QObject + { + Q_OBJECT + public: + DHTBase(); + virtual ~DHTBase(); + + + /** + * Start the DHT + * @param table File where the save table is located + * @param key_file The file where the key is stored + * @param port The port to use + */ + virtual void start(const QString & table,const QString & key_file,bt::Uint16 port) = 0; + + /** + * Stop the DHT + */ + virtual void stop() = 0; + + /** + * Update the DHT + */ + virtual void update() = 0; + + /** + * A Peer has received a PORT message, and uses this function to alert the DHT of it. + * @param ip The IP of the peer + * @param port The port in the PORT message + */ + virtual void portRecieved(const QString & ip,bt::Uint16 port) = 0; + + /** + * Do an announce on the DHT network + * @param info_hash The info_hash + * @param port The port + * @return The task which handles this + */ + virtual AnnounceTask* announce(const bt::SHA1Hash & info_hash,bt::Uint16 port) = 0; + + /** + * See if the DHT is running. + */ + bool isRunning() const {return running;} + + /// Get the DHT port + bt::Uint16 getPort() const {return port;} + + /// Get statistics about the DHT + const dht::Stats & getStats() const {return stats;} + + /** + * Add a DHT node. This node shall be pinged immediately. + * @param host The hostname or ip + * @param hport The port of the host + */ + virtual void addDHTNode(const QString & host,bt::Uint16 hport) = 0; + + /** + * Returns maxNodes number of <IP address, port> nodes + * that are closest to ourselves and are good. + * @param maxNodes maximum nr of nodes in QMap to return. + */ + virtual QMap<QString, int> getClosestGoodNodes(int maxNodes) = 0; + + signals: + void started(); + void stopped(); + + protected: + bool running; + bt::Uint16 port; + dht::Stats stats; + }; + +} + +#endif diff --git a/libktorrent/kademlia/dhttrackerbackend.cpp b/libktorrent/kademlia/dhttrackerbackend.cpp new file mode 100644 index 0000000..c90e6f7 --- /dev/null +++ b/libktorrent/kademlia/dhttrackerbackend.cpp @@ -0,0 +1,154 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * [email protected] * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#include <kurl.h> +#include <qhostaddress.h> +#include <util/log.h> +#include <util/functions.h> +#include <torrent/globals.h> +#include <torrent/server.h> +#include <torrent/peermanager.h> +#include <interfaces/torrentinterface.h> +#include "dhttrackerbackend.h" +#include "dht.h" +#include "announcetask.h" + +using namespace bt; + +namespace dht +{ + + DHTTrackerBackend::DHTTrackerBackend(DHTBase & dh_table,kt::TorrentInterface* tor) + : dh_table(dh_table),curr_task(0),tor(tor) + { + connect(&timer,SIGNAL(timeout()),this,SLOT(onTimeout())); + connect(&dh_table,SIGNAL(started()),this,SLOT(manualUpdate())); + connect(&dh_table,SIGNAL(stopped()),this,SLOT(dhtStopped())); + started = false; + } + + + DHTTrackerBackend::~DHTTrackerBackend() + { + if (curr_task) + curr_task->kill(); + } + + void DHTTrackerBackend::start() + { + started = true; + if (dh_table.isRunning()) + doRequest(); + } + + void DHTTrackerBackend::dhtStopped() + { + stop(0); + curr_task = 0; + } + + void DHTTrackerBackend::stop(bt::WaitJob*) + { + started = false; + if (curr_task) + { + curr_task->kill(); + timer.stop(); + } + } + + void DHTTrackerBackend::manualUpdate() + { + if (dh_table.isRunning() && started) + doRequest(); + } + + + bool DHTTrackerBackend::doRequest() + { + if (!dh_table.isRunning()) + return false; + + if (curr_task) + return true; + + const SHA1Hash & info_hash = tor->getInfoHash(); + Uint16 port = bt::Globals::instance().getServer().getPortInUse(); + curr_task = dh_table.announce(info_hash,port); + if (curr_task) + { + for (Uint32 i = 0;i < tor->getNumDHTNodes();i++) + { + const kt::DHTNode & n = tor->getDHTNode(i); + curr_task->addDHTNode(n.ip,n.port); + } + connect(curr_task,SIGNAL(dataReady( Task* )),this,SLOT(onDataReady( Task* ))); + connect(curr_task,SIGNAL(finished( Task* )),this,SLOT(onFinished( Task* ))); + + return true; + } + + return false; + } + + void DHTTrackerBackend::onFinished(Task* t) + { + if (curr_task == t) + { + onDataReady(curr_task); + curr_task = 0; + // do another announce in 5 minutes or so + timer.start(5 * 60 * 1000,true); + } + } + + void DHTTrackerBackend::onDataReady(Task* t) + { + if (curr_task == t) + { + Uint32 cnt = 0; + DBItem item; + while (curr_task->takeItem(item)) + { + Uint16 port = bt::ReadUint16(item.getData(),4); + QString ip = QHostAddress(ReadUint32(item.getData(),0)).toString(); + + addPeer(ip,port); + cnt++; + } + + if (cnt) + { + Out(SYS_DHT|LOG_NOTICE) << + QString("DHT: Got %1 potential peers for torrent %2") + .arg(cnt).arg(tor->getStats().torrent_name) << endl; + peersReady(this); + } + } + } + + void DHTTrackerBackend::onTimeout() + { + if (dh_table.isRunning() && started) + doRequest(); + } + +} + +#include "dhttrackerbackend.moc" diff --git a/libktorrent/kademlia/dhttrackerbackend.h b/libktorrent/kademlia/dhttrackerbackend.h new file mode 100644 index 0000000..355aab9 --- /dev/null +++ b/libktorrent/kademlia/dhttrackerbackend.h @@ -0,0 +1,75 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * [email protected] * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#ifndef DHTDHTTRACKERBACKEND_H +#define DHTDHTTRACKERBACKEND_H + +#include <qtimer.h> +#include <interfaces/peersource.h> +#include "task.h" + +namespace kt +{ + class TorrentInterface; +} + +namespace bt +{ + class WaitJob; +} + + +namespace dht +{ + class DHTBase; + class AnnounceTask; + + + /** + @author Joris Guisson <[email protected]> + */ + class DHTTrackerBackend : public kt::PeerSource + { + Q_OBJECT + public: + DHTTrackerBackend(DHTBase & dh_table,kt::TorrentInterface* tor); + virtual ~DHTTrackerBackend(); + + virtual void start(); + virtual void stop(bt::WaitJob* wjob = 0); + virtual void manualUpdate(); + + private slots: + void onTimeout(); + bool doRequest(); + void onDataReady(Task* t); + void onFinished(Task* t); + void dhtStopped(); + + private: + DHTBase & dh_table; + AnnounceTask* curr_task; + kt::TorrentInterface* tor; + QTimer timer; + bool started; + }; + +} + +#endif diff --git a/libktorrent/kademlia/kbucket.cpp b/libktorrent/kademlia/kbucket.cpp new file mode 100644 index 0000000..fb60d1b --- /dev/null +++ b/libktorrent/kademlia/kbucket.cpp @@ -0,0 +1,355 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * [email protected] * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#include <ksocketaddress.h> +#include <util/file.h> +#include <util/log.h> +#include <util/functions.h> +#include <netinet/in.h> +#include "kbucket.h" +#include "kclosestnodessearch.h" +#include "rpcserver.h" +#include "node.h" + +using namespace KNetwork; +using namespace bt; + +namespace dht +{ + KBucketEntry::KBucketEntry() + { + last_responded = bt::GetCurrentTime(); + failed_queries = 0; + questionable_pings = 0; + } + + KBucketEntry::KBucketEntry(const KInetSocketAddress & addr,const Key & id) + : addr(addr),node_id(id) + { + last_responded = bt::GetCurrentTime(); + failed_queries = 0; + questionable_pings = 0; + } + + KBucketEntry::KBucketEntry(const KBucketEntry & other) + : addr(other.addr),node_id(other.node_id), + last_responded(other.last_responded),failed_queries(other.failed_queries),questionable_pings(other.questionable_pings) + {} + + + KBucketEntry::~KBucketEntry() + {} + + KBucketEntry & KBucketEntry::operator = (const KBucketEntry & other) + { + addr = other.addr; + node_id = other.node_id; + last_responded = other.last_responded; + failed_queries = other.failed_queries; + questionable_pings = other.questionable_pings; + return *this; + } + + bool KBucketEntry::operator == (const KBucketEntry & entry) const + { + return addr == entry.addr && node_id == entry.node_id; + } + + bool KBucketEntry::isGood() const + { + if (bt::GetCurrentTime() - last_responded > 15 * 60 * 1000) + return false; + else + return true; + } + + bool KBucketEntry::isQuestionable() const + { + if (bt::GetCurrentTime() - last_responded > 15 * 60 * 1000) + return true; + else + return false; + } + + + bool KBucketEntry::isBad() const + { + if (isGood()) + return false; + + return failed_queries > 2 || questionable_pings > 2; + } + + void KBucketEntry::hasResponded() + { + last_responded = bt::GetCurrentTime(); + failed_queries = 0; // reset failed queries + questionable_pings = 0; + } + + + ////////////////////////////////////////////////////////// + + KBucket::KBucket(Uint32 idx,RPCServer* srv,Node* node) + : idx(idx),srv(srv),node(node) + { + last_modified = bt::GetCurrentTime(); + refresh_task = 0; + } + + + KBucket::~KBucket() + {} + + void KBucket::insert(const KBucketEntry & entry) + { + QValueList<KBucketEntry>::iterator i = entries.find(entry); + + // If in the list, move it to the end + if (i != entries.end()) + { + KBucketEntry & e = *i; + e.hasResponded(); + last_modified = bt::GetCurrentTime(); + entries.remove(i); + entries.append(entry); + return; + } + + // insert if not already in the list and we still have room + if (i == entries.end() && entries.count() < dht::K) + { + entries.append(entry); + last_modified = bt::GetCurrentTime(); + } + else if (!replaceBadEntry(entry)) + { + // ping questionable nodes when replacing a bad one fails + pingQuestionable(entry); + } + } + + void KBucket::onResponse(RPCCall* c,MsgBase* rsp) + { + last_modified = bt::GetCurrentTime(); + + if (!pending_entries_busy_pinging.contains(c)) + return; + + KBucketEntry entry = pending_entries_busy_pinging[c]; + pending_entries_busy_pinging.erase(c); // call is done so erase it + + // we have a response so try to find the next bad or questionable node + // if we do not have room see if we can get rid of some bad peers + if (!replaceBadEntry(entry)) // if no bad peers ping a questionable one + pingQuestionable(entry); + + } + + + + void KBucket::onTimeout(RPCCall* c) + { + if (!pending_entries_busy_pinging.contains(c)) + return; + + KBucketEntry entry = pending_entries_busy_pinging[c]; + + // replace the entry which timed out + QValueList<KBucketEntry>::iterator i; + for (i = entries.begin();i != entries.end();i++) + { + KBucketEntry & e = *i; + if (e.getAddress() == c->getRequest()->getOrigin()) + { + last_modified = bt::GetCurrentTime(); + entries.remove(i); + entries.append(entry); + break; + } + } + pending_entries_busy_pinging.erase(c); // call is done so erase it + // see if we can do another pending entry + if (pending_entries_busy_pinging.count() < 2 && pending_entries.count() > 0) + { + KBucketEntry pe = pending_entries.front(); + pending_entries.pop_front(); + if (!replaceBadEntry(pe)) // if no bad peers ping a questionable one + pingQuestionable(pe); + } + } + + void KBucket::pingQuestionable(const KBucketEntry & replacement_entry) + { + if (pending_entries_busy_pinging.count() >= 2) + { + pending_entries.append(replacement_entry); // lets not have to many pending_entries calls going on + return; + } + + QValueList<KBucketEntry>::iterator i; + // we haven't found any bad ones so try the questionable ones + for (i = entries.begin();i != entries.end();i++) + { + KBucketEntry & e = *i; + if (e.isQuestionable()) + { + Out(SYS_DHT|LOG_DEBUG) << "Pinging questionable node : " << e.getAddress().toString() << endl; + PingReq* p = new PingReq(node->getOurID()); + p->setDestination(e.getAddress()); + RPCCall* c = srv->doCall(p); + if (c) + { + e.onPingQuestionable(); + c->addListener(this); + // add the pending entry + pending_entries_busy_pinging.insert(c,replacement_entry); + return; + } + } + } + } + + bool KBucket::replaceBadEntry(const KBucketEntry & entry) + { + QValueList<KBucketEntry>::iterator i; + for (i = entries.begin();i != entries.end();i++) + { + KBucketEntry & e = *i; + if (e.isBad()) + { + // bad one get rid of it + last_modified = bt::GetCurrentTime(); + entries.remove(i); + entries.append(entry); + return true; + } + } + return false; + } + + bool KBucket::contains(const KBucketEntry & entry) const + { + return entries.contains(entry); + } + + void KBucket::findKClosestNodes(KClosestNodesSearch & kns) + { + QValueList<KBucketEntry>::iterator i = entries.begin(); + while (i != entries.end()) + { + kns.tryInsert(*i); + i++; + } + } + + bool KBucket::onTimeout(const KInetSocketAddress & addr) + { + QValueList<KBucketEntry>::iterator i; + + for (i = entries.begin();i != entries.end();i++) + { + KBucketEntry & e = *i; + if (e.getAddress() == addr) + { + e.requestTimeout(); + return true; + } + } + return false; + } + + bool KBucket::needsToBeRefreshed() const + { + bt::TimeStamp now = bt::GetCurrentTime(); + if (last_modified > now) + { + last_modified = now; + return false; + } + + return !refresh_task && entries.count() > 0 && (now - last_modified > BUCKET_REFRESH_INTERVAL); + } + + void KBucket::updateRefreshTimer() + { + last_modified = bt::GetCurrentTime(); + } + + + + void KBucket::save(bt::File & fptr) + { + BucketHeader hdr; + hdr.magic = BUCKET_MAGIC_NUMBER; + hdr.index = idx; + hdr.num_entries = entries.count(); + + fptr.write(&hdr,sizeof(BucketHeader)); + QValueList<KBucketEntry>::iterator i; + for (i = entries.begin();i != entries.end();i++) + { + KBucketEntry & e = *i; + const KIpAddress & ip = e.getAddress().ipAddress(); + Uint8 tmp[26]; + bt::WriteUint32(tmp,0,ip.IPv4Addr()); + bt::WriteUint16(tmp,4,e.getAddress().port()); + memcpy(tmp+6,e.getID().getData(),20); + fptr.write(tmp,26); + } + } + + void KBucket::load(bt::File & fptr,const BucketHeader & hdr) + { + if (hdr.num_entries > K) + return; + + for (Uint32 i = 0;i < hdr.num_entries;i++) + { + Uint8 tmp[26]; + if (fptr.read(tmp,26) != 26) + return; + + entries.append(KBucketEntry( + KInetSocketAddress( + KIpAddress(bt::ReadUint32(tmp,0)), + bt::ReadUint16(tmp,4)), + dht::Key(tmp+6))); + } + } + + void KBucket::onFinished(Task* t) + { + if (t == refresh_task) + refresh_task = 0; + } + + void KBucket::setRefreshTask(Task* t) + { + refresh_task = t; + if (refresh_task) + { + connect(refresh_task,SIGNAL(finished( Task* )), + this,SLOT(onFinished( Task* ))); + } + } + +} + +#include "kbucket.moc" diff --git a/libktorrent/kademlia/kbucket.h b/libktorrent/kademlia/kbucket.h new file mode 100644 index 0000000..139ce10 --- /dev/null +++ b/libktorrent/kademlia/kbucket.h @@ -0,0 +1,212 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * [email protected] * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#ifndef DHTKBUCKET_H +#define DHTKBUCKET_H + +#include <qvaluelist.h> +#include <util/constants.h> +#include <ksocketaddress.h> +#include "key.h" +#include "rpccall.h" +#include "task.h" + +using bt::Uint32; +using bt::Uint16; +using bt::Uint8; +using KNetwork::KInetSocketAddress; + +namespace bt +{ + class File; +} + +namespace dht +{ + class RPCServer; + class KClosestNodesSearch; + class Node; + class Task; + + const Uint32 K = 8; + const Uint32 BUCKET_MAGIC_NUMBER = 0xB0C4B0C4; + const Uint32 BUCKET_REFRESH_INTERVAL = 15 * 60 * 1000; +// const Uint32 BUCKET_REFRESH_INTERVAL = 120 * 1000; + + struct BucketHeader + { + Uint32 magic; + Uint32 index; + Uint32 num_entries; + }; + + /** + * @author Joris Guisson + * + * Entry in a KBucket, it basically contains an ip_address of a node, + * the udp port of the node and a node_id. + */ + class KBucketEntry + { + KInetSocketAddress addr; + Key node_id; + bt::TimeStamp last_responded; + Uint32 failed_queries; + Uint32 questionable_pings; + public: + /** + * Constructor, sets everything to 0. + * @return + */ + KBucketEntry(); + + /** + * Constructor, set the ip, port and key + * @param addr socket address + * @param id ID of node + */ + KBucketEntry(const KInetSocketAddress & addr,const Key & id); + + /** + * Copy constructor. + * @param other KBucketEntry to copy + * @return + */ + KBucketEntry(const KBucketEntry & other); + + /// Destructor + virtual ~KBucketEntry(); + + /** + * Assignment operator. + * @param other Node to copy + * @return this KBucketEntry + */ + KBucketEntry & operator = (const KBucketEntry & other); + + /// Equality operator + bool operator == (const KBucketEntry & entry) const; + + /// Get the socket address of the node + const KInetSocketAddress & getAddress() const {return addr;} + + /// Get it's ID + const Key & getID() const {return node_id;} + + /// Is this node a good node + bool isGood() const; + + /// Is this node questionable (haven't heard from it in the last 15 minutes) + bool isQuestionable() const; + + /// Is it a bad node. (Hasn't responded to a query + bool isBad() const; + + /// Signal the entry that the peer has responded + void hasResponded(); + + /// A request timed out + void requestTimeout() {failed_queries++;} + + /// The entry has been pinged because it is questionable + void onPingQuestionable() {questionable_pings++;} + + /// The null entry + static KBucketEntry null; + }; + + + /** + * @author Joris Guisson + * + * A KBucket is just a list of KBucketEntry objects. + * The list is sorted by time last seen : + * The first element is the least recently seen, the last + * the most recently seen. + */ + class KBucket : public RPCCallListener + { + Q_OBJECT + + Uint32 idx; + QValueList<KBucketEntry> entries,pending_entries; + RPCServer* srv; + Node* node; + QMap<RPCCall*,KBucketEntry> pending_entries_busy_pinging; + mutable bt::TimeStamp last_modified; + Task* refresh_task; + public: + KBucket(Uint32 idx,RPCServer* srv,Node* node); + virtual ~KBucket(); + + /** + * Inserts an entry into the bucket. + * @param entry The entry to insert + */ + void insert(const KBucketEntry & entry); + + /// Get the least recently seen node + const KBucketEntry & leastRecentlySeen() const {return entries[0];} + + /// Get the number of entries + Uint32 getNumEntries() const {return entries.count();} + + /// See if this bucket contains an entry + bool contains(const KBucketEntry & entry) const; + + /** + * Find the K closest entries to a key and store them in the KClosestNodesSearch + * object. + * @param kns The object to storre the search results + */ + void findKClosestNodes(KClosestNodesSearch & kns); + + /** + * A peer failed to respond + * @param addr Address of the peer + */ + bool onTimeout(const KInetSocketAddress & addr); + + /// Check if the bucket needs to be refreshed + bool needsToBeRefreshed() const; + + /// save the bucket to a file + void save(bt::File & fptr); + + /// Load the bucket from a file + void load(bt::File & fptr,const BucketHeader & hdr); + + /// Update the refresh timer of the bucket + void updateRefreshTimer(); + + /// Set the refresh task + void setRefreshTask(Task* t); + + private: + virtual void onResponse(RPCCall* c,MsgBase* rsp); + virtual void onTimeout(RPCCall* c); + void pingQuestionable(const KBucketEntry & replacement_entry); + bool replaceBadEntry(const KBucketEntry & entry); + + private slots: + void onFinished(Task* t); + }; +} + +#endif diff --git a/libktorrent/kademlia/kclosestnodessearch.cpp b/libktorrent/kademlia/kclosestnodessearch.cpp new file mode 100644 index 0000000..4a97c7f --- /dev/null +++ b/libktorrent/kademlia/kclosestnodessearch.cpp @@ -0,0 +1,84 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * [email protected] * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#include <util/functions.h> +#include "kclosestnodessearch.h" +#include "pack.h" + +using namespace bt; +using namespace KNetwork; + +namespace dht +{ + typedef std::map<dht::Key,KBucketEntry>::iterator KNSitr; + + KClosestNodesSearch::KClosestNodesSearch(const dht::Key & key,Uint32 max_entries) + : key(key),max_entries(max_entries) + {} + + + KClosestNodesSearch::~KClosestNodesSearch() + {} + + + void KClosestNodesSearch::tryInsert(const KBucketEntry & e) + { + // calculate distance between key and e + dht::Key d = dht::Key::distance(key,e.getID()); + + if (emap.size() < max_entries) + { + // room in the map so just insert + emap.insert(std::make_pair(d,e)); + } + else + { + // now find the max distance + // seeing that the last element of the map has also + // the biggest distance to key (std::map is sorted on the distance) + // we just take the last + const dht::Key & max = emap.rbegin()->first; + if (d < max) + { + // insert if d is smaller then max + emap.insert(std::make_pair(d,e)); + // erase the old max value + emap.erase(max); + } + } + + } + + void KClosestNodesSearch::pack(QByteArray & ba) + { + // make sure we do not writ to much + Uint32 max_items = ba.size() / 26; + Uint32 j = 0; + + KNSitr i = emap.begin(); + while (i != emap.end() && j < max_items) + { + PackBucketEntry(i->second,ba,j*26); + i++; + j++; + i++; + } + } + +} diff --git a/libktorrent/kademlia/kclosestnodessearch.h b/libktorrent/kademlia/kclosestnodessearch.h new file mode 100644 index 0000000..e006a25 --- /dev/null +++ b/libktorrent/kademlia/kclosestnodessearch.h @@ -0,0 +1,90 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * [email protected] * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#ifndef DHTKCLOSESTNODESSEARCH_H +#define DHTKCLOSESTNODESSEARCH_H + +#include <map> +#include "key.h" +#include "kbucket.h" + +namespace dht +{ + + /** + * @author Joris Guisson <[email protected]> + * + * Class used to store the search results during a K closests nodes search + * Note: we use a std::map because of lack of functionality in QMap + */ + class KClosestNodesSearch + { + dht::Key key; + std::map<dht::Key,KBucketEntry> emap; + Uint32 max_entries; + public: + /** + * Constructor sets the key to compare with + * @param key The key to compare with + * @param max_entries The maximum number of entries can be in the map + * @return + */ + KClosestNodesSearch(const dht::Key & key,Uint32 max_entries); + virtual ~KClosestNodesSearch(); + + typedef std::map<dht::Key,KBucketEntry>::iterator Itr; + typedef std::map<dht::Key,KBucketEntry>::const_iterator CItr; + + Itr begin() {return emap.begin();} + Itr end() {return emap.end();} + + CItr begin() const {return emap.begin();} + CItr end() const {return emap.end();} + + /// Get the target key of the search3 + const dht::Key & getSearchTarget() const {return key;} + + /// Get the number of entries. + bt::Uint32 getNumEntries() const {return emap.size();} + + /** + * Try to insert an entry. + * @param e The entry + */ + void tryInsert(const KBucketEntry & e); + + /** + * Gets the required space in bytes to pack the nodes. + * This should be used to determin the size of the buffer + * passed to pack. + * @return 26 * number of entries + */ + Uint32 requiredSpace() const {return emap.size()* 26;} + + /** + * Pack the search results in a buffer, the buffer should have + * enough space to store requiredSpace() bytes. + * @param ba The buffer + */ + void pack(QByteArray & ba); + }; + +} + +#endif diff --git a/libktorrent/kademlia/key.cpp b/libktorrent/kademlia/key.cpp new file mode 100644 index 0000000..6e62ff6 --- /dev/null +++ b/libktorrent/kademlia/key.cpp @@ -0,0 +1,110 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * [email protected] * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#include <time.h> +#include <stdlib.h> +#include <qcstring.h> +#include <util/constants.h> +#include "key.h" + +using namespace bt; + +namespace dht +{ + + Key::Key() + {} + + Key::Key(const bt::SHA1Hash & k) : bt::SHA1Hash(k) + { + } + + Key::Key(const Uint8* d) : bt::SHA1Hash(d) + { + } + + Key::Key(const QByteArray & ba) + { + for (Uint32 i = 0;i < 20 && i < ba.size();i++) + hash[i] = ba[i]; + } + + Key::~Key() + {} + + bool Key::operator == (const Key & other) const + { + return bt::SHA1Hash::operator ==(other); + } + + bool Key::operator != (const Key & other) const + { + return !operator == (other); + } + + bool Key::operator < (const Key & other) const + { + for (int i = 0;i < 20;i++) + { + if (hash[i] < other.hash[i]) + return true; + else if (hash[i] > other.hash[i]) + return false; + } + return false; + } + + bool Key::operator <= (const Key & other) const + { + return operator < (other) || operator == (other); + } + + bool Key::operator > (const Key & other) const + { + for (int i = 0;i < 20;i++) + { + if (hash[i] < other.hash[i]) + return false; + else if (hash[i] > other.hash[i]) + return true; + } + return false; + } + + bool Key::operator >= (const Key & other) const + { + return operator > (other) || operator == (other); + } + + Key Key::distance(const Key & a,const Key & b) + { + return a ^ b; + } + + Key Key::random() + { + srand(time(0)); + Key k; + for (int i = 0;i < 20;i++) + { + k.hash[i] = (Uint8)rand() % 0xFF; + } + return k; + } +} diff --git a/libktorrent/kademlia/key.h b/libktorrent/kademlia/key.h new file mode 100644 index 0000000..e818dc1 --- /dev/null +++ b/libktorrent/kademlia/key.h @@ -0,0 +1,129 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * [email protected] * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#ifndef DHTKEY_H +#define DHTKEY_H + +#include <qcstring.h> +#include <util/sha1hash.h> + + + +namespace dht +{ + + /** + * @author Joris Guisson + * @brief Key in the distributed hash table + * + * Key's in the distributed hash table are just SHA-1 hashes. + * Key provides all necesarry operators to be used as a value. + */ + class Key : public bt::SHA1Hash + { + public: + /** + * Constructor, sets key to 0. + */ + Key(); + + /** + * Copy constructor. Seeing that Key doesn't add any data + * we just pass a SHA1Hash, Key's are automatically covered by this + * @param k Hash to copy + */ + Key(const bt::SHA1Hash & k); + + /** + * Make a key out of a bytearray + * @param ba The QByteArray + */ + Key(const QByteArray & ba); + + /** + * Make a key out of a 20 byte array. + * @param d The array + */ + Key(const bt::Uint8* d); + + /// Destructor. + virtual ~Key(); + + /** + * Create a random key. + * @return A random Key + */ + static Key random(); + + /** + * Equality operator. + * @param other The key to compare + * @return true if this key is equal to other + */ + bool operator == (const Key & other) const; + + /** + * Inequality operator. + * @param other The key to compare + * @return true if this key is not equal to other + */ + bool operator != (const Key & other) const; + + /** + * Smaller then operator. + * @param other The key to compare + * @return rue if this key is smaller then other + */ + bool operator < (const Key & other) const; + + + /** + * Smaller then or equal operator. + * @param other The key to compare + * @return rue if this key is smaller then or equal to other + */ + bool operator <= (const Key & other) const; + + + /** + * Greater then operator. + * @param other The key to compare + * @return rue if this key is greater then other + */ + bool operator > (const Key & other) const; + + /** + * Greater then or equal operator. + * @param other The key to compare + * @return rue if this key is greater then or equal to other + */ + bool operator >= (const Key & other) const; + + /** + * The distance of two keys is the keys xor together. + * @param a The first key + * @param b The second key + * @return a xor b + */ + static Key distance(const Key & a,const Key & b); + }; + +} + +#endif diff --git a/libktorrent/kademlia/node.cpp b/libktorrent/kademlia/node.cpp new file mode 100644 index 0000000..96c39a4 --- /dev/null +++ b/libktorrent/kademlia/node.cpp @@ -0,0 +1,287 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * [email protected] * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ + +#include <util/log.h> +#include <util/file.h> +#include <util/fileops.h> +#include <util/functions.h> +#include <torrent/globals.h> +#include "node.h" +#include "rpcmsg.h" +#include "key.h" +#include "rpccall.h" +#include "rpcserver.h" +#include "kclosestnodessearch.h" +#include "dht.h" +#include "nodelookup.h" + +using namespace bt; +using namespace KNetwork; + +namespace dht +{ + static void SaveKey(const dht::Key & key,const QString & key_file) + { + bt::File fptr; + if (!fptr.open(key_file,"wb")) + { + Out(SYS_DHT|LOG_IMPORTANT) << "DHT: Cannot open file " << key_file << " : " << fptr.errorString() << endl; + return; + } + + fptr.write(key.getData(),20); + fptr.close(); + } + + static dht::Key LoadKey(const QString & key_file,bool & new_key) + { + bt::File fptr; + if (!fptr.open(key_file,"rb")) + { + Out(SYS_DHT|LOG_IMPORTANT) << "DHT: Cannot open file " << key_file << " : " << fptr.errorString() << endl; + dht::Key r = dht::Key::random(); + SaveKey(r,key_file); + new_key = true; + return r; + } + + Uint8 data[20]; + if (fptr.read(data,20) != 20) + { + dht::Key r = dht::Key::random(); + SaveKey(r,key_file); + new_key = true; + return r; + } + + new_key = false; + return dht::Key(data); + } + + Node::Node(RPCServer* srv,const QString & key_file) : srv(srv) + { + num_receives = 0; + num_entries = 0; + delete_table = false; + our_id = LoadKey(key_file,delete_table); + for (int i = 0;i < 160;i++) + bucket[i] = 0; + } + + + Node::~Node() + { + for (int i = 0;i < 160;i++) + { + KBucket* b = bucket[i]; + if (b) + delete b; + } + } + + Uint8 Node::findBucket(const dht::Key & id) + { + // XOR our id and the sender's ID + dht::Key d = dht::Key::distance(id,our_id); + // now use the first on bit to determin which bucket it should go in + + Uint8 bit_on = 0xFF; + for (Uint32 i = 0;i < 20;i++) + { + // get the byte + Uint8 b = *(d.getData() + i); + // no bit on in this byte so continue + if (b == 0x00) + continue; + + for (Uint8 j = 0;j < 8;j++) + { + if (b & (0x80 >> j)) + { + // we have found the bit + bit_on = (19 - i)*8 + (7 - j); + return bit_on; + } + } + } + return bit_on; + } + + void Node::recieved(DHT* dh_table,const MsgBase* msg) + { + Uint8 bit_on = findBucket(msg->getID()); + + // return if bit_on is not good + if (bit_on >= 160) + return; + + // make the bucket if it doesn't exist + if (!bucket[bit_on]) + bucket[bit_on] = new KBucket(bit_on,srv,this); + + // insert it into the bucket + KBucket* kb = bucket[bit_on]; + kb->insert(KBucketEntry(msg->getOrigin(),msg->getID())); + num_receives++; + if (num_receives == 3) + { + // do a node lookup upon our own id + // when we insert the first entry in the table + dh_table->findNode(our_id); + } + + num_entries = 0; + for (Uint32 i = 0;i < 160;i++) + if (bucket[i]) + num_entries += bucket[i]->getNumEntries(); + } + + void Node::findKClosestNodes(KClosestNodesSearch & kns) + { + // go over all buckets until + for (Uint32 i = 0;i < 160;i++) + { + if (bucket[i]) + { + bucket[i]->findKClosestNodes(kns); + } + } + } + + void Node::onTimeout(const MsgBase* msg) + { + for (Uint32 i = 0;i < 160;i++) + { + if (bucket[i] && bucket[i]->onTimeout(msg->getDestination())) + { + return; + } + } + } + + /// Generate a random key which lies in a certain bucket + Key RandomKeyInBucket(Uint32 b,const Key & our_id) + { + // first generate a random one + Key r = dht::Key::random(); + Uint8* data = (Uint8*)r.getData(); + + // before we hit bit b, everything needs to be equal to our_id + Uint8 nb = b / 8; + for (Uint8 i = 0;i < nb;i++) + data[i] = *(our_id.getData() + i); + + + // copy all bits of ob, until we hit the bit which needs to be different + Uint8 ob = *(our_id.getData() + nb); + for (Uint8 j = 0;j < b % 8;j++) + { + if ((0x80 >> j) & ob) + data[nb] |= (0x80 >> j); + else + data[nb] &= ~(0x80 >> j); + } + + // if the bit b is on turn it off else turn it on + if ((0x80 >> (b % 8)) & ob) + data[nb] &= ~(0x80 >> (b % 8)); + else + data[nb] |= (0x80 >> (b % 8)); + + return Key(data); + } + + void Node::refreshBuckets(DHT* dh_table) + { + for (Uint32 i = 0;i < 160;i++) + { + KBucket* b = bucket[i]; + if (b && b->needsToBeRefreshed()) + { + // the key needs to be the refreshed + NodeLookup* nl = dh_table->refreshBucket(RandomKeyInBucket(i,our_id),*b); + if (nl) + b->setRefreshTask(nl); + } + } + } + + + void Node::saveTable(const QString & file) + { + bt::File fptr; + if (!fptr.open(file,"wb")) + { + Out(SYS_DHT|LOG_IMPORTANT) << "DHT: Cannot open file " << file << " : " << fptr.errorString() << endl; + return; + } + + for (Uint32 i = 0;i < 160;i++) + { + KBucket* b = bucket[i]; + if (b) + { + b->save(fptr); + } + } + } + + void Node::loadTable(const QString & file) + { + if (delete_table) + { + delete_table = false; + bt::Delete(file,true); + Out(SYS_DHT|LOG_IMPORTANT) << "DHT: new key, so removing table" << endl; + return; + } + + bt::File fptr; + if (!fptr.open(file,"rb")) + { + Out(SYS_DHT|LOG_IMPORTANT) << "DHT: Cannot open file " << file << " : " << fptr.errorString() << endl; + return; + } + + num_entries = 0; + while (!fptr.eof()) + { + BucketHeader hdr; + if (fptr.read(&hdr,sizeof(BucketHeader)) != sizeof(BucketHeader)) + return; + + if (hdr.magic != dht::BUCKET_MAGIC_NUMBER || hdr.num_entries > dht::K || hdr.index > 160) + return; + + if (hdr.num_entries == 0) + continue; + + Out(SYS_DHT|LOG_NOTICE) << "DHT: Loading bucket " << hdr.index << endl; + if (bucket[hdr.index]) + delete bucket[hdr.index]; + + bucket[hdr.index] = new KBucket(hdr.index,srv,this); + bucket[hdr.index]->load(fptr,hdr); + num_entries += bucket[hdr.index]->getNumEntries(); + } + } +} + +#include "node.moc" diff --git a/libktorrent/kademlia/node.h b/libktorrent/kademlia/node.h new file mode 100644 index 0000000..56f41f1 --- /dev/null +++ b/libktorrent/kademlia/node.h @@ -0,0 +1,103 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * [email protected] * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#ifndef DHTNODE_H +#define DHTNODE_H + +#include <qobject.h> +#include "key.h" +#include "kbucket.h" + + +using bt::Uint8; + +namespace dht +{ + class DHT; + class MsgBase; + class RPCServer; + class KClosestNodesSearch; + + /** + * @author Joris Guisson + * + * A Node represents us in the kademlia network. It contains + * our id and 160 KBucket's. + * A KBucketEntry is in node i, when the difference between our id and + * the KBucketEntry's id is between 2 to the power i and 2 to the power i+1. + */ + class Node : public QObject + { + Q_OBJECT + public: + Node(RPCServer* srv,const QString & key_file); + virtual ~Node(); + + /** + * An RPC message was received, the node must now update + * the right bucket. + * @param dh_table The DHT + * @param msg The message + * @param srv The RPCServer to send a ping if necessary + */ + void recieved(DHT* dh_table,const MsgBase* msg); + + /// Get our own ID + const dht::Key & getOurID() const {return our_id;} + + /** + * Find the K closest entries to a key and store them in the KClosestNodesSearch + * object. + * @param kns The object to storre the search results + */ + void findKClosestNodes(KClosestNodesSearch & kns); + + /** + * Increase the failed queries count of the bucket entry we sent the message to + */ + void onTimeout(const MsgBase* msg); + + /// Check if a buckets needs to be refreshed, and refresh if necesarry + void refreshBuckets(DHT* dh_table); + + /// Save the routing table to a file + void saveTable(const QString & file); + + /// Load the routing table from a file + void loadTable(const QString & file); + + /// Get the number of entries in the routing table + Uint32 getNumEntriesInRoutingTable() const {return num_entries;} + private: + Uint8 findBucket(const dht::Key & id); + + + + private: + dht::Key our_id; + KBucket* bucket[160]; + RPCServer* srv; + Uint32 num_receives; + Uint32 num_entries; + bool delete_table; + }; + +} + +#endif diff --git a/libktorrent/kademlia/nodelookup.cpp b/libktorrent/kademlia/nodelookup.cpp new file mode 100644 index 0000000..9fa616c --- /dev/null +++ b/libktorrent/kademlia/nodelookup.cpp @@ -0,0 +1,98 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * [email protected] * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#include <util/log.h> +#include <torrent/globals.h> +#include "nodelookup.h" +#include "rpcmsg.h" +#include "node.h" +#include "pack.h" + +using namespace bt; + +namespace dht +{ + + NodeLookup::NodeLookup(const dht::Key & key,RPCServer* rpc,Node* node) + : Task(rpc,node),node_id(key),num_nodes_rsp(0) + { + } + + + NodeLookup::~NodeLookup() + {} + + + void NodeLookup::callFinished(RPCCall* ,MsgBase* rsp) + { + // Out() << "NodeLookup::callFinished" << endl; + if (isFinished()) + return; + + // check the response and see if it is a good one + if (rsp->getMethod() == dht::FIND_NODE && rsp->getType() == dht::RSP_MSG) + { + FindNodeRsp* fnr = (FindNodeRsp*)rsp; + const QByteArray & nodes = fnr->getNodes(); + Uint32 nnodes = nodes.size() / 26; + for (Uint32 j = 0;j < nnodes;j++) + { + // unpack an entry and add it to the todo list + KBucketEntry e = UnpackBucketEntry(nodes,j*26); + // lets not talk to ourself + if (e.getID() != node->getOurID() && !todo.contains(e) && !visited.contains(e)) + todo.append(e); + } + num_nodes_rsp++; + } + } + + void NodeLookup::callTimeout(RPCCall*) + { + // Out() << "NodeLookup::callTimeout" << endl; + } + + void NodeLookup::update() + { + // Out() << "NodeLookup::update" << endl; + // Out() << "todo = " << todo.count() << " ; visited = " << visited.count() << endl; + // go over the todo list and send find node calls + // until we have nothing left + while (!todo.empty() && canDoRequest()) + { + KBucketEntry e = todo.first(); + // only send a findNode if we haven't allrready visited the node + if (!visited.contains(e)) + { + // send a findNode to the node + FindNodeReq* fnr = new FindNodeReq(node->getOurID(),node_id); + fnr->setOrigin(e.getAddress()); + rpcCall(fnr); + visited.append(e); + } + // remove the entry from the todo list + todo.pop_front(); + } + + if (todo.empty() && getNumOutstandingRequests() == 0 && !isFinished()) + done(); + else if (num_nodes_rsp > 50) + done(); // quit after 50 nodes responses + } +} diff --git a/libktorrent/kademlia/nodelookup.h b/libktorrent/kademlia/nodelookup.h new file mode 100644 index 0000000..ff19e92 --- /dev/null +++ b/libktorrent/kademlia/nodelookup.h @@ -0,0 +1,52 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * [email protected] * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#ifndef DHTNODELOOKUP_H +#define DHTNODELOOKUP_H + +#include "key.h" +#include "task.h" + +namespace dht +{ + class Node; + class RPCServer; + + /** + * @author Joris Guisson <[email protected]> + * + * Task to do a node lookup. + */ + class NodeLookup : public Task + { + public: + NodeLookup(const dht::Key & node_id,RPCServer* rpc,Node* node); + virtual ~NodeLookup(); + + virtual void update(); + virtual void callFinished(RPCCall* c, MsgBase* rsp); + virtual void callTimeout(RPCCall* c); + private: + dht::Key node_id; + bt::Uint32 num_nodes_rsp; + }; + +} + +#endif diff --git a/libktorrent/kademlia/pack.cpp b/libktorrent/kademlia/pack.cpp new file mode 100644 index 0000000..a5acafb --- /dev/null +++ b/libktorrent/kademlia/pack.cpp @@ -0,0 +1,62 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * [email protected] * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#include <util/error.h> +#include <util/functions.h> +#include "pack.h" + +using namespace bt; +using namespace KNetwork; + +namespace dht +{ + + void PackBucketEntry(const KBucketEntry & e,QByteArray & ba,Uint32 off) + { + // first check size + if (off + 26 > ba.size()) + throw bt::Error("Not enough room in buffer"); + + Uint8* data = (Uint8*)ba.data(); + Uint8* ptr = data + off; + + const KInetSocketAddress & addr = e.getAddress(); + // copy ID, IP address and port into the buffer + memcpy(ptr,e.getID().getData(),20); + bt::WriteUint32(ptr,20,addr.ipAddress().IPv4Addr()); + bt::WriteUint16(ptr,24,addr.port()); + } + + KBucketEntry UnpackBucketEntry(const QByteArray & ba,Uint32 off) + { + if (off + 26 > ba.size()) + throw bt::Error("Not enough room in buffer"); + + const Uint8* data = (Uint8*)ba.data(); + const Uint8* ptr = data + off; + + // get the port, ip and key); + Uint16 port = bt::ReadUint16(ptr,24); + Uint8 key[20]; + memcpy(key,ptr,20); + + return KBucketEntry(KInetSocketAddress(KIpAddress(ptr+20,4),port),dht::Key(key)); + } + +} diff --git a/libktorrent/kademlia/pack.h b/libktorrent/kademlia/pack.h new file mode 100644 index 0000000..dab1523 --- /dev/null +++ b/libktorrent/kademlia/pack.h @@ -0,0 +1,48 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * [email protected] * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#ifndef DHTPACK_H +#define DHTPACK_H + +#include "kbucket.h" + +namespace dht +{ + + /** + * Pack a KBucketEntry into a byte array. + * If the array is not large enough, an error will be thrown + * @param e The entry + * @param ba The byte array + * @param off The offset into the array + */ + void PackBucketEntry(const KBucketEntry & e,QByteArray & ba,Uint32 off); + + /** + * Unpack a KBucketEntry from a byte array. + * If a full entry cannot be read an error will be thrown. + * @param ba The byte array + * @param off The offset + * @return The entry + */ + KBucketEntry UnpackBucketEntry(const QByteArray & ba,Uint32 off); + +} + +#endif diff --git a/libktorrent/kademlia/rpccall.cpp b/libktorrent/kademlia/rpccall.cpp new file mode 100644 index 0000000..b86e8f7 --- /dev/null +++ b/libktorrent/kademlia/rpccall.cpp @@ -0,0 +1,79 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * [email protected] * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#include "dht.h" +#include "rpcmsg.h" +#include "rpccall.h" +#include "rpcserver.h" + +namespace dht +{ + RPCCallListener::RPCCallListener() + {} + + RPCCallListener::~RPCCallListener() + { + } + + RPCCall::RPCCall(RPCServer* rpc,MsgBase* msg,bool queued) : msg(msg),rpc(rpc),queued(queued) + { + connect(&timer,SIGNAL(timeout()),this,SLOT(onTimeout())); + if (!queued) + timer.start(30*1000,true); + } + + + RPCCall::~RPCCall() + { + delete msg; + } + + void RPCCall::start() + { + queued = false; + timer.start(30*1000,true); + } + + void RPCCall::onTimeout() + { + onCallTimeout(this); + rpc->timedOut(msg->getMTID()); + } + + void RPCCall::response(MsgBase* rsp) + { + onCallResponse(this,rsp); + } + + Method RPCCall::getMsgMethod() const + { + if (msg) + return msg->getMethod(); + else + return dht::NONE; + } + + void RPCCall::addListener(RPCCallListener* cl) + { + connect(this,SIGNAL(onCallResponse( RPCCall*, MsgBase* )),cl,SLOT(onResponse( RPCCall*, MsgBase* ))); + connect(this,SIGNAL(onCallTimeout( RPCCall* )),cl,SLOT(onTimeout( RPCCall* ))); + } + +} +#include "rpccall.moc" diff --git a/libktorrent/kademlia/rpccall.h b/libktorrent/kademlia/rpccall.h new file mode 100644 index 0000000..6e54933 --- /dev/null +++ b/libktorrent/kademlia/rpccall.h @@ -0,0 +1,110 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * [email protected] * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#ifndef DHTRPCCALL_H +#define DHTRPCCALL_H + +#include <qtimer.h> +#include "key.h" +#include "rpcmsg.h" + +namespace dht +{ + class RPCServer; + class RPCCall; + + /** + * Class which objects should derive from, if they want to know the result of a call. + */ + class RPCCallListener : public QObject + { + Q_OBJECT + public: + RPCCallListener(); + virtual ~RPCCallListener(); + + public slots: + /** + * A response was received. + * @param c The call + * @param rsp The response + */ + virtual void onResponse(RPCCall* c,MsgBase* rsp) = 0; + + /** + * The call has timed out. + * @param c The call + */ + virtual void onTimeout(RPCCall* c) = 0; + + }; + + /** + * @author Joris Guisson + */ + class RPCCall : public QObject + { + Q_OBJECT + public: + RPCCall(RPCServer* rpc,MsgBase* msg,bool queued); + virtual ~RPCCall(); + + /** + * Called when a queued call gets started. Starts the timeout timer. + */ + void start(); + + /** + * Called by the server if a response is received. + * @param rsp + */ + void response(MsgBase* rsp); + + /** + * Add a listener for this call + * @param cl The listener + */ + void addListener(RPCCallListener* cl); + + /// Get the message type + Method getMsgMethod() const; + + /// Get the request sent + const MsgBase* getRequest() const {return msg;} + + /// Get the request sent + MsgBase* getRequest() {return msg;} + + private slots: + void onTimeout(); + + signals: + void onCallResponse(RPCCall* c,MsgBase* rsp); + void onCallTimeout(RPCCall* c); + + private: + MsgBase* msg; + QTimer timer; + RPCServer* rpc; + bool queued; + }; + +} + +#endif diff --git a/libktorrent/kademlia/rpcmsg.cpp b/libktorrent/kademlia/rpcmsg.cpp new file mode 100644 index 0000000..97364e1 --- /dev/null +++ b/libktorrent/kademlia/rpcmsg.cpp @@ -0,0 +1,596 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * [email protected] * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#include <util/log.h> +#include <torrent/bnode.h> +#include <torrent/globals.h> +#include <torrent/bencoder.h> +#include "rpcmsg.h" +#include "rpccall.h" +#include "rpcserver.h" +#include "dht.h" + +using namespace bt; + +namespace dht +{ + const QString TID = "t"; + const QString REQ = "q"; + const QString RSP = "r"; + const QString TYP = "y"; + const QString ARG = "a"; + // ERR apparently is defined as a macro on solaris in some header file, + // which causes things not to compile on it, so we have changed it to ERR_DHT + const QString ERR_DHT = "e"; + + + MsgBase* MakeMsg(bt::BDictNode* dict); + + + MsgBase* ParseReq(bt::BDictNode* dict) + { + BValueNode* vn = dict->getValue(REQ); + BDictNode* args = dict->getDict(ARG); + if (!vn || !args) + return 0; + + if (!args->getValue("id")) + return 0; + + if (!dict->getValue(TID)) + return 0; + + Key id = Key(args->getValue("id")->data().toByteArray()); + QByteArray mtid_d = dict->getValue(TID)->data().toByteArray(); + if (mtid_d.size() == 0) + return 0; + Uint8 mtid = (Uint8)mtid_d.at(0); + MsgBase* msg = 0; + + QString str = vn->data().toString(); + if (str == "ping") + { + msg = new PingReq(id); + } + else if (str == "find_node") + { + if (args->getValue("target")) + msg = new FindNodeReq(id,Key(args->getValue("target")->data().toByteArray())); + } + else if (str == "get_peers") + { + if (args->getValue("info_hash")) + msg = new GetPeersReq(id,Key(args->getValue("info_hash")->data().toByteArray())); + } + else if (str == "announce_peer") + { + if (args->getValue("info_hash") && args->getValue("port") && args->getValue("token")) + { + msg = new AnnounceReq(id, + Key(args->getValue("info_hash")->data().toByteArray()), + args->getValue("port")->data().toInt(), + Key(args->getValue("token")->data().toByteArray())); + } + } + + if (msg) + msg->setMTID(mtid); + + return msg; + } + + MsgBase* ParseRsp(bt::BDictNode* dict,dht::Method req_method,Uint8 mtid) + { + BDictNode* args = dict->getDict(RSP); + if (!args || !args->getValue("id")) + return 0; + + Key id = Key(args->getValue("id")->data().toByteArray()); + + switch (req_method) + { + case PING : + return new PingRsp(mtid,id); + case FIND_NODE : + if (!args->getValue("nodes")) + return 0; + else + return new FindNodeRsp(mtid,id,args->getValue("nodes")->data().toByteArray()); + case GET_PEERS : + if (args->getValue("token")) + { + Key token = args->getValue("token")->data().toByteArray(); + QByteArray data; + BListNode* vals = args->getList("values"); + DBItemList dbl; + if (vals) + { + for (Uint32 i = 0;i < vals->getNumChildren();i++) + { + BValueNode* vn = dynamic_cast<BValueNode*>(vals->getChild(i)); + if (!vn) + continue; + dbl.append(DBItem((Uint8*)vn->data().toByteArray().data())); + } + return new GetPeersRsp(mtid,id,dbl,token); + } + else if (args->getValue("nodes")) + { + data = args->getValue("nodes")->data().toByteArray(); + return new GetPeersRsp(mtid,id,data,token); + } + else + { + Out(SYS_DHT|LOG_DEBUG) << "No nodes or values in get_peers response" << endl; + return 0; + } + } + else + { + Out(SYS_DHT|LOG_DEBUG) << "No token in get_peers response" << endl; + } + case ANNOUNCE_PEER : + return new AnnounceRsp(mtid,id); + default: + return 0; + } + return 0; + } + + MsgBase* ParseRsp(bt::BDictNode* dict,RPCServer* srv) + { + BDictNode* args = dict->getDict(RSP); + if (!args || !dict->getValue(TID)) + { + Out(SYS_DHT|LOG_DEBUG) << "ParseRsp : args || !args->getValue(id) || !dict->getValue(TID)" << endl; + return 0; + } + + + QByteArray ba = dict->getValue(TID)->data().toByteArray(); + // check for empty byte arrays should prevent 144416 + if (ba.size() == 0) + return 0; + + Uint8 mtid = (Uint8)ba.at(0); + // find the call + const RPCCall* c = srv->findCall(mtid); + if (!c) + { + Out(SYS_DHT|LOG_DEBUG) << "Cannot find RPC call" << endl; + return 0; + } + + return ParseRsp(dict,c->getMsgMethod(),mtid); + } + + MsgBase* ParseErr(bt::BDictNode* dict) + { + BValueNode* vn = dict->getValue(RSP); + BDictNode* args = dict->getDict(ARG); + if (!vn || !args || !args->getValue("id") || !dict->getValue(TID)) + return 0; + + Key id = Key(args->getValue("id")->data().toByteArray()); + QString mt_id = dict->getValue(TID)->data().toString(); + if (mt_id.length() == 0) + return 0; + + Uint8 mtid = (char)mt_id.at(0).latin1(); + QString str = vn->data().toString(); + + return new ErrMsg(mtid,id,str); + } + + + MsgBase* MakeRPCMsg(bt::BDictNode* dict,RPCServer* srv) + { + BValueNode* vn = dict->getValue(TYP); + if (!vn) + return 0; + + if (vn->data().toString() == REQ) + { + return ParseReq(dict); + } + else if (vn->data().toString() == RSP) + { + return ParseRsp(dict,srv); + } + else if (vn->data().toString() == ERR_DHT) + { + return ParseErr(dict); + } + + return 0; + } + + MsgBase* MakeRPCMsgTest(bt::BDictNode* dict,dht::Method req_method) + { + BValueNode* vn = dict->getValue(TYP); + if (!vn) + return 0; + + if (vn->data().toString() == REQ) + { + return ParseReq(dict); + } + else if (vn->data().toString() == RSP) + { + return ParseRsp(dict,req_method,0); + } + else if (vn->data().toString() == ERR_DHT) + { + return ParseErr(dict); + } + + return 0; + } + + MsgBase::MsgBase(Uint8 mtid,Method m,Type type,const Key & id) + : mtid(mtid),method(m),type(type),id(id) + {} + + MsgBase::~MsgBase() + {} + + //////////////////////////////// + + PingReq::PingReq(const Key & id) : MsgBase(0xFF,PING,REQ_MSG,id) + { + } + + PingReq::~PingReq() + {} + + void PingReq::apply(DHT* dh_table) + { + dh_table->ping(this); + } + + void PingReq::print() + { + Out(SYS_DHT|LOG_DEBUG) << QString("REQ: %1 %2 : ping").arg(mtid).arg(id.toString()) << endl; + } + + void PingReq::encode(QByteArray & arr) + { + BEncoder enc(new BEncoderBufferOutput(arr)); + enc.beginDict(); + { + enc.write(ARG); enc.beginDict(); + { + enc.write("id"); enc.write(id.getData(),20); + } + enc.end(); + enc.write(REQ); enc.write("ping"); + enc.write(TID); enc.write(&mtid,1); + enc.write(TYP); enc.write(REQ); + } + enc.end(); + } + + //////////////////////////////// + + FindNodeReq::FindNodeReq(const Key & id,const Key & target) + : MsgBase(0xFF,FIND_NODE,REQ_MSG,id),target(target) + {} + + FindNodeReq::~FindNodeReq() + {} + + void FindNodeReq::apply(DHT* dh_table) + { + dh_table->findNode(this); + } + + void FindNodeReq::print() + { + Out(SYS_DHT|LOG_NOTICE) << QString("REQ: %1 %2 : find_node %3") + .arg(mtid).arg(id.toString()).arg(target.toString()) << endl; + } + + void FindNodeReq::encode(QByteArray & arr) + { + BEncoder enc(new BEncoderBufferOutput(arr)); + enc.beginDict(); + { + enc.write(ARG); enc.beginDict(); + { + enc.write("id"); enc.write(id.getData(),20); + enc.write("target"); enc.write(target.getData(),20); + } + enc.end(); + enc.write(REQ); enc.write("find_node"); + enc.write(TID); enc.write(&mtid,1); + enc.write(TYP); enc.write(REQ); + } + enc.end(); + } + + //////////////////////////////// + + //////////////////////////////// + GetPeersReq::GetPeersReq(const Key & id,const Key & info_hash) + : MsgBase(0xFF,GET_PEERS,REQ_MSG,id),info_hash(info_hash) + {} + + GetPeersReq::~GetPeersReq() + {} + + void GetPeersReq::apply(DHT* dh_table) + { + dh_table->getPeers(this); + } + + void GetPeersReq::print() + { + Out(SYS_DHT|LOG_DEBUG) << QString("REQ: %1 %2 : get_peers %3") + .arg(mtid).arg(id.toString()).arg(info_hash.toString()) << endl; + } + + void GetPeersReq::encode(QByteArray & arr) + { + BEncoder enc(new BEncoderBufferOutput(arr)); + enc.beginDict(); + { + enc.write(ARG); enc.beginDict(); + { + enc.write("id"); enc.write(id.getData(),20); + enc.write("info_hash"); enc.write(info_hash.getData(),20); + } + enc.end(); + enc.write(REQ); enc.write("get_peers"); + enc.write(TID); enc.write(&mtid,1); + enc.write(TYP); enc.write(REQ); + } + enc.end(); + } + + //////////////////////////////// + + AnnounceReq::AnnounceReq(const Key & id,const Key & info_hash,Uint16 port,const Key & token) + : GetPeersReq(id,info_hash),port(port),token(token) + { + method = dht::ANNOUNCE_PEER; + } + + AnnounceReq::~AnnounceReq() {} + + void AnnounceReq::apply(DHT* dh_table) + { + dh_table->announce(this); + } + + void AnnounceReq::print() + { + Out(SYS_DHT|LOG_DEBUG) << QString("REQ: %1 %2 : announce_peer %3 %4 %5") + .arg(mtid).arg(id.toString()).arg(info_hash.toString()) + .arg(port).arg(token.toString()) << endl; + } + + void AnnounceReq::encode(QByteArray & arr) + { + BEncoder enc(new BEncoderBufferOutput(arr)); + enc.beginDict(); + { + enc.write(ARG); enc.beginDict(); + { + enc.write("id"); enc.write(id.getData(),20); + enc.write("info_hash"); enc.write(info_hash.getData(),20); + enc.write("port"); enc.write((Uint32)port); + enc.write("token"); enc.write(token.getData(),20); + } + enc.end(); + enc.write(REQ); enc.write("announce_peer"); + enc.write(TID); enc.write(&mtid,1); + enc.write(TYP); enc.write(REQ); + } + enc.end(); + } + + //////////////////////////////// + + PingRsp::PingRsp(Uint8 mtid,const Key & id) + : MsgBase(mtid,PING,RSP_MSG,id) + {} + + PingRsp::~PingRsp() {} + + void PingRsp::apply(DHT* dh_table) + { + dh_table->response(this); + } + + void PingRsp::print() + { + Out(SYS_DHT|LOG_DEBUG) << QString("RSP: %1 %2 : ping") + .arg(mtid).arg(id.toString()) << endl; + } + + void PingRsp::encode(QByteArray & arr) + { + BEncoder enc(new BEncoderBufferOutput(arr)); + enc.beginDict(); + { + enc.write(RSP); enc.beginDict(); + { + enc.write("id"); enc.write(id.getData(),20); + } + enc.end(); + enc.write(TID); enc.write(&mtid,1); + enc.write(TYP); enc.write(RSP); + } + enc.end(); + } + + //////////////////////////////// + + FindNodeRsp::FindNodeRsp(Uint8 mtid,const Key & id,const QByteArray & nodes) + : MsgBase(mtid,FIND_NODE,RSP_MSG,id),nodes(nodes) + {} + + FindNodeRsp::~FindNodeRsp() {} + + void FindNodeRsp::apply(DHT* dh_table) + { + dh_table->response(this); + } + + void FindNodeRsp::print() + { + Out(SYS_DHT|LOG_DEBUG) << QString("RSP: %1 %2 : find_node") + .arg(mtid).arg(id.toString()) << endl; + } + + void FindNodeRsp::encode(QByteArray & arr) + { + BEncoder enc(new BEncoderBufferOutput(arr)); + enc.beginDict(); + { + enc.write(RSP); enc.beginDict(); + { + enc.write("id"); enc.write(id.getData(),20); + enc.write("nodes"); enc.write(nodes); + } + enc.end(); + enc.write(TID); enc.write(&mtid,1); + enc.write(TYP); enc.write(RSP); + } + enc.end(); + } + + //////////////////////////////// + + GetPeersRsp::GetPeersRsp(Uint8 mtid,const Key & id,const QByteArray & data,const Key & token) + : MsgBase(mtid,dht::GET_PEERS,dht::RSP_MSG,id),token(token),data(data) + { + this->data.detach(); + } + + GetPeersRsp::GetPeersRsp(Uint8 mtid,const Key & id,const DBItemList & values,const Key & token) + : MsgBase(mtid,dht::GET_PEERS,dht::RSP_MSG,id),token(token),items(values) + {} + + GetPeersRsp::~GetPeersRsp() + {} + + void GetPeersRsp::apply(DHT* dh_table) + { + dh_table->response(this); + } + void GetPeersRsp::print() + { + Out() << QString("RSP: %1 %2 : get_peers(%3)") + .arg(mtid).arg(id.toString()).arg(data.size() > 0 ? "nodes" : "values") << endl; + } + + void GetPeersRsp::encode(QByteArray & arr) + { + BEncoder enc(new BEncoderBufferOutput(arr)); + enc.beginDict(); + { + enc.write(RSP); enc.beginDict(); + { + enc.write("id"); enc.write(id.getData(),20); + if (data.size() > 0) + { + enc.write("nodes"); enc.write(data); + enc.write("token"); enc.write(token.getData(),20); + } + else + { + enc.write("token"); enc.write(token.getData(),20); + enc.write("values"); enc.beginList(); + DBItemList::iterator i = items.begin(); + while (i != items.end()) + { + const DBItem & item = *i; + enc.write(item.getData(),6); + i++; + } + enc.end(); + } + } + enc.end(); + enc.write(TID); enc.write(&mtid,1); + enc.write(TYP); enc.write(RSP); + } + enc.end(); + } + + + //////////////////////////////// + //////////////////////////////// + + AnnounceRsp::AnnounceRsp(Uint8 mtid,const Key & id) : MsgBase(mtid,ANNOUNCE_PEER,RSP_MSG,id) + {} + + AnnounceRsp::~AnnounceRsp(){} + + void AnnounceRsp::apply(DHT* dh_table) + { + dh_table->response(this); + } + + void AnnounceRsp::print() + { + Out() << QString("RSP: %1 %2 : announce_peer") + .arg(mtid).arg(id.toString()) << endl; + } + + void AnnounceRsp::encode(QByteArray & arr) + { + BEncoder enc(new BEncoderBufferOutput(arr)); + enc.beginDict(); + { + enc.write(RSP); enc.beginDict(); + { + enc.write("id"); enc.write(id.getData(),20); + } + enc.end(); + enc.write(TID); enc.write(&mtid,1); + enc.write(TYP); enc.write(RSP); + } + enc.end(); + } + + + //////////////////////////////// + + ErrMsg::ErrMsg(Uint8 mtid,const Key & id,const QString & msg) + : MsgBase(mtid,NONE,ERR_MSG,id),msg(msg) + {} + + ErrMsg::~ErrMsg() + {} + + void ErrMsg::apply(DHT* dh_table) + { + dh_table->error(this); + } + + void ErrMsg::print() + { + Out(SYS_DHT|LOG_NOTICE) << "ERR: " << mtid << " " << msg << endl; + } + + void ErrMsg::encode(QByteArray & ) + {} +} diff --git a/libktorrent/kademlia/rpcmsg.h b/libktorrent/kademlia/rpcmsg.h new file mode 100644 index 0000000..4863ae2 --- /dev/null +++ b/libktorrent/kademlia/rpcmsg.h @@ -0,0 +1,269 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * [email protected] * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#ifndef DHTRPCMSG_H +#define DHTRPCMSG_H + +#include <ksocketaddress.h> +#include <util/constants.h> +#include "key.h" +#include "database.h" + +namespace bt +{ + class BDictNode; +} + +using bt::Uint8; +using bt::Uint32; + +namespace dht +{ + class DHT; + class RPCServer; + + enum Type + { + REQ_MSG, + RSP_MSG, + ERR_MSG, + INVALID + }; + + enum Method + { + PING, + FIND_NODE, + GET_PEERS, + ANNOUNCE_PEER, + NONE + }; + + + + /** + * Base class for all RPC messages. + */ + class MsgBase + { + public: + MsgBase(Uint8 mtid,Method m,Type type,const Key & id); + virtual ~MsgBase(); + + + /** + * When this message arrives this function will be called upon the DHT. + * The message should then call the appropriate DHT function (double dispatch) + * @param dh_table Pointer to DHT + */ + virtual void apply(DHT* dh_table) = 0; + + /** + * Print the message for debugging purposes. + */ + virtual void print() = 0; + + /** + * BEncode the message. + * @param arr Data array + */ + virtual void encode(QByteArray & arr) = 0; + + /// Set the origin (i.e. where the message came from) + void setOrigin(const KNetwork::KSocketAddress & o) {origin = o;} + + /// Get the origin + const KNetwork::KInetSocketAddress & getOrigin() const {return origin;} + + /// Set the origin (i.e. where the message came from) + void setDestination(const KNetwork::KSocketAddress & o) {origin = o;} + + /// Get the origin + const KNetwork::KInetSocketAddress & getDestination() const {return origin;} + + /// Get the MTID + Uint8 getMTID() const {return mtid;} + + /// Set the MTID + void setMTID(Uint8 m) {mtid = m;} + + /// Get the id of the sender + const Key & getID() const {return id;} + + /// Get the type of the message + Type getType() const {return type;} + + /// Get the message it's method + Method getMethod() const {return method;} + + protected: + Uint8 mtid; + Method method; + Type type; + Key id; + KNetwork::KInetSocketAddress origin; + }; + + /** + * Creates a message out of a BDictNode. + * @param dict The BDictNode + * @param srv The RPCServer + * @return A newly created message or 0 upon error + */ + MsgBase* MakeRPCMsg(bt::BDictNode* dict,RPCServer* srv); + + MsgBase* MakeRPCMsgTest(bt::BDictNode* dict,dht::Method req_method); + + class ErrMsg : public MsgBase + { + public: + ErrMsg(Uint8 mtid,const Key & id,const QString & msg); + virtual ~ErrMsg(); + + virtual void apply(DHT* dh_table); + virtual void print(); + virtual void encode(QByteArray & arr); + private: + QString msg; + }; + + class PingReq : public MsgBase + { + public: + PingReq(const Key & id); + virtual ~PingReq(); + + virtual void apply(DHT* dh_table); + virtual void print(); + virtual void encode(QByteArray & arr); + }; + + class FindNodeReq : public MsgBase + { + public: + FindNodeReq(const Key & id,const Key & target); + virtual ~FindNodeReq(); + + virtual void apply(DHT* dh_table); + virtual void print(); + virtual void encode(QByteArray & arr); + + const Key & getTarget() const {return target;} + + private: + Key target; + }; + + class GetPeersReq : public MsgBase + { + public: + GetPeersReq(const Key & id,const Key & info_hash); + virtual ~GetPeersReq(); + + const Key & getInfoHash() const {return info_hash;} + virtual void apply(DHT* dh_table); + virtual void print(); + virtual void encode(QByteArray & arr); + protected: + Key info_hash; + }; + + class AnnounceReq : public GetPeersReq + { + public: + AnnounceReq(const Key & id,const Key & info_hash,bt::Uint16 port,const Key & token); + virtual ~AnnounceReq(); + + virtual void apply(DHT* dh_table); + virtual void print(); + virtual void encode(QByteArray & arr); + + const Key & getToken() const {return token;} + bt::Uint16 getPort() const {return port;} + private: + bt::Uint16 port; + Key token; + }; + + class PingRsp : public MsgBase + { + public: + PingRsp(Uint8 mtid,const Key & id); + virtual ~PingRsp(); + + virtual void apply(DHT* dh_table); + virtual void print(); + virtual void encode(QByteArray & arr); + }; + + + + class FindNodeRsp : public MsgBase + { + public: + FindNodeRsp(Uint8 mtid,const Key & id,const QByteArray & nodes); + virtual ~FindNodeRsp(); + + virtual void apply(DHT* dh_table); + virtual void print(); + virtual void encode(QByteArray & arr); + + const QByteArray & getNodes() const {return nodes;} + protected: + QByteArray nodes; + }; + + class GetPeersRsp : public MsgBase + { + public: + GetPeersRsp(Uint8 mtid,const Key & id,const QByteArray & data,const Key & token); + GetPeersRsp(Uint8 mtid,const Key & id,const DBItemList & values,const Key & token); + virtual ~GetPeersRsp(); + + virtual void apply(DHT* dh_table); + virtual void print(); + virtual void encode(QByteArray & arr); + + const QByteArray & getData() const {return data;} + const DBItemList & getItemList() const {return items;} + const Key & getToken() const {return token;} + bool containsNodes() const {return data.size() > 0;} + bool containsValues() const {return data.size() == 0;} + private: + Key token; + QByteArray data; + DBItemList items; + }; + + + class AnnounceRsp : public MsgBase + { + public: + AnnounceRsp(Uint8 mtid,const Key & id); + virtual ~AnnounceRsp(); + + virtual void apply(DHT* dh_table); + virtual void print(); + virtual void encode(QByteArray & arr); + }; + + +} + +#endif diff --git a/libktorrent/kademlia/rpcserver.cpp b/libktorrent/kademlia/rpcserver.cpp new file mode 100644 index 0000000..1242dae --- /dev/null +++ b/libktorrent/kademlia/rpcserver.cpp @@ -0,0 +1,243 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * [email protected] * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#include <unistd.h> +#include <string.h> +#include <net/portlist.h> +#include <util/log.h> +#include <util/error.h> +#include <torrent/globals.h> +#include <torrent/bnode.h> +#include <torrent/bdecoder.h> +#include <torrent/bencoder.h> +#include <ksocketdevice.h> +#include "rpcserver.h" +#include "rpccall.h" +#include "rpcmsg.h" +#include "kbucket.h" +#include "node.h" +#include "dht.h" + +using namespace KNetwork; +using namespace bt; + +namespace dht +{ + + + + RPCServer::RPCServer(DHT* dh_table,Uint16 port,QObject *parent) : QObject(parent),dh_table(dh_table),next_mtid(0),port(port) + { + sock = new KDatagramSocket(this); + sock->setBlocking(false); + sock->setAddressReuseable(true); + } + + + RPCServer::~RPCServer() + { + bt::Globals::instance().getPortList().removePort(port,net::UDP); + sock->close(); + calls.setAutoDelete(true); + calls.clear(); + call_queue.setAutoDelete(true); + call_queue.clear(); + } + + void RPCServer::start() + { + sock->setBlocking(true); + if (!sock->bind(QString::null,QString::number(port))) + { + Out(SYS_DHT|LOG_IMPORTANT) << "DHT: Failed to bind to UDP port " << port << " for DHT" << endl; + } + else + { + bt::Globals::instance().getPortList().addNewPort(port,net::UDP,true); + } + sock->setBlocking(false); + connect(sock,SIGNAL(readyRead()),this,SLOT(readPacket())); + } + + void RPCServer::stop() + { + bt::Globals::instance().getPortList().removePort(port,net::UDP); + sock->close(); + } + + static void PrintRawData(const QByteArray & data) + { + QString tmp; + for (Uint32 i = 0;i < data.size();i++) + { + char c = QChar(data[i]).latin1(); + if (!QChar(data[i]).isPrint() || c == 0) + tmp += '#'; + else + tmp += c; + } + + Out(SYS_DHT|LOG_DEBUG) << tmp << endl; + } + + void RPCServer::readPacket() + { + if (sock->bytesAvailable() == 0) + { + Out(SYS_DHT|LOG_NOTICE) << "0 byte UDP packet " << endl; + // KDatagramSocket wrongly handles UDP packets with no payload + // so we need to deal with it oursleves + int fd = sock->socketDevice()->socket(); + char tmp; + read(fd,&tmp,1); + return; + } + + KDatagramPacket pck = sock->receive(); + /* + Out() << "RPCServer::readPacket" << endl; + PrintRawData(pck.data()); + */ + BNode* n = 0; + try + { + // read and decode the packet + BDecoder bdec(pck.data(),false); + n = bdec.decode(); + + if (!n || n->getType() != BNode::DICT) + { + delete n; + return; + } + + // try to make a RPCMsg of it + MsgBase* msg = MakeRPCMsg((BDictNode*)n,this); + if (msg) + { + msg->setOrigin(pck.address()); + msg->apply(dh_table); + // erase an existing call + if (msg->getType() == RSP_MSG && calls.contains(msg->getMTID())) + { + // delete the call, but first notify it off the response + RPCCall* c = calls.find(msg->getMTID()); + c->response(msg); + calls.erase(msg->getMTID()); + c->deleteLater(); + doQueuedCalls(); + } + delete msg; + } + } + catch (bt::Error & err) + { + Out(SYS_DHT|LOG_IMPORTANT) << "Error happened during parsing : " << err.toString() << endl; + } + delete n; + + if (sock->bytesAvailable() > 0) + readPacket(); + } + + + void RPCServer::send(const KNetwork::KSocketAddress & addr,const QByteArray & msg) + { + sock->send(KNetwork::KDatagramPacket(msg,addr)); + } + + RPCCall* RPCServer::doCall(MsgBase* msg) + { + Uint8 start = next_mtid; + while (calls.contains(next_mtid)) + { + next_mtid++; + if (next_mtid == start) // if this happens we cannot do any calls + { + // so queue the call + RPCCall* c = new RPCCall(this,msg,true); + call_queue.append(c); + Out(SYS_DHT|LOG_NOTICE) << "Queueing RPC call, no slots available at the moment" << endl; + return c; + } + } + + msg->setMTID(next_mtid++); + sendMsg(msg); + RPCCall* c = new RPCCall(this,msg,false); + calls.insert(msg->getMTID(),c); + return c; + } + + void RPCServer::sendMsg(MsgBase* msg) + { + QByteArray data; + msg->encode(data); + send(msg->getDestination(),data); + + // PrintRawData(data); + } + + void RPCServer::timedOut(Uint8 mtid) + { + // delete the call + RPCCall* c = calls.find(mtid); + if (c) + { + dh_table->timeout(c->getRequest()); + calls.erase(mtid); + c->deleteLater(); + } + doQueuedCalls(); + } + + void RPCServer::doQueuedCalls() + { + while (call_queue.count() > 0 && calls.count() < 256) + { + RPCCall* c = call_queue.first(); + call_queue.removeFirst(); + + while (calls.contains(next_mtid)) + next_mtid++; + + MsgBase* msg = c->getRequest(); + msg->setMTID(next_mtid++); + sendMsg(msg); + calls.insert(msg->getMTID(),c); + c->start(); + } + } + + const RPCCall* RPCServer::findCall(Uint8 mtid) const + { + return calls.find(mtid); + } + + void RPCServer::ping(const dht::Key & our_id,const KNetwork::KSocketAddress & addr) + { + Out(SYS_DHT|LOG_NOTICE) << "DHT: pinging " << addr.nodeName() << endl; + PingReq* pr = new PingReq(our_id); + pr->setOrigin(addr); + doCall(pr); + } + + +} +#include "rpcserver.moc" diff --git a/libktorrent/kademlia/rpcserver.h b/libktorrent/kademlia/rpcserver.h new file mode 100644 index 0000000..4e54076 --- /dev/null +++ b/libktorrent/kademlia/rpcserver.h @@ -0,0 +1,122 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * [email protected] * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#ifndef DHTRPCSERVER_H +#define DHTRPCSERVER_H + +#include <qptrlist.h> +#include <kdatagramsocket.h> +#include <util/constants.h> +#include <util/array.h> +#include <util/ptrmap.h> + + +using KNetwork::KDatagramSocket; +using bt::Uint32; +using bt::Uint16; +using bt::Uint8; + +namespace bt +{ + class BDictNode; +} + +namespace dht +{ + class Key; + class KBucketEntry; + class RPCCall; + class RPCMsg; + class Node; + class DHT; + class MsgBase; + + /** + * @author Joris Guisson + * + * Class to handle incoming and outgoing RPC messages. + */ + class RPCServer : public QObject + { + Q_OBJECT + public: + RPCServer(DHT* dh_table,Uint16 port,QObject *parent = 0); + virtual ~RPCServer(); + + /// Start the server + void start(); + + /// Stop the server + void stop(); + + /** + * Do a RPC call. + * @param msg The message to send + * @return The call object + */ + RPCCall* doCall(MsgBase* msg); + + /** + * Send a message, this only sends the message, it does not keep any call + * information. This should be used for replies. + * @param msg The message to send + */ + void sendMsg(MsgBase* msg); + + + /** + * A call was timed out. + * @param mtid mtid of call + */ + void timedOut(Uint8 mtid); + + /** + * Ping a node, we don't care about the MTID. + * @param addr The address + */ + void ping(const dht::Key & our_id,const KNetwork::KSocketAddress & addr); + + /** + * Find a RPC call, based on the mtid + * @param mtid The mtid + * @return The call + */ + const RPCCall* findCall(Uint8 mtid) const; + + /// Get the number of active calls + Uint32 getNumActiveRPCCalls() const {return calls.count();} + private slots: + void readPacket(); + + private: + void send(const KNetwork::KSocketAddress & addr,const QByteArray & msg); + void doQueuedCalls(); + + private: + KDatagramSocket* sock; + DHT* dh_table; + bt::PtrMap<bt::Uint8,RPCCall> calls; + QPtrList<RPCCall> call_queue; + bt::Uint8 next_mtid; + bt::Uint16 port; + }; + +} + +#endif diff --git a/libktorrent/kademlia/task.cpp b/libktorrent/kademlia/task.cpp new file mode 100644 index 0000000..877a698 --- /dev/null +++ b/libktorrent/kademlia/task.cpp @@ -0,0 +1,134 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * [email protected] * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#include <kresolver.h> +#include "task.h" +#include "kclosestnodessearch.h" +#include "rpcserver.h" +#include "kbucket.h" + +using namespace KNetwork; + +namespace dht +{ + + Task::Task(RPCServer* rpc,Node* node) + : node(node),rpc(rpc),outstanding_reqs(0),task_finished(false),queued(queued) + { + + } + + + Task::~Task() + { + } + + void Task::start(const KClosestNodesSearch & kns,bool queued) + { + // fill the todo list + for (KClosestNodesSearch::CItr i = kns.begin(); i != kns.end();i++) + todo.append(i->second); + this->queued = queued; + if (!queued) + update(); + } + + void Task::start() + { + if (queued) + { + queued = false; + update(); + } + } + + + void Task::onResponse(RPCCall* c, MsgBase* rsp) + { + if (outstanding_reqs > 0) + outstanding_reqs--; + + if (!isFinished()) + { + callFinished(c,rsp); + + if (canDoRequest() && !isFinished()) + update(); + } + } + + void Task::onTimeout(RPCCall* c) + { + if (outstanding_reqs > 0) + outstanding_reqs--; + + if (!isFinished()) + { + callTimeout(c); + + if (canDoRequest() && !isFinished()) + update(); + } + } + + bool Task::rpcCall(MsgBase* req) + { + if (!canDoRequest()) + return false; + + RPCCall* c = rpc->doCall(req); + c->addListener(this); + outstanding_reqs++; + return true; + } + + void Task::done() + { + task_finished = true; + finished(this); + } + + void Task::emitDataReady() + { + dataReady(this); + } + + void Task::kill() + { + task_finished = true; + finished(this); + } + + void Task::addDHTNode(const QString & ip,bt::Uint16 port) + { + KResolver::resolveAsync(this,SLOT(onResolverResults(KResolverResults )), + ip,QString::number(port)); + } + + void Task::onResolverResults(KResolverResults res) + { + if (res.count() == 0) + return; + + todo.append(KBucketEntry(res.front().address(),dht::Key())); + } + +} + +#include "task.moc" diff --git a/libktorrent/kademlia/task.h b/libktorrent/kademlia/task.h new file mode 100644 index 0000000..5a33ac0 --- /dev/null +++ b/libktorrent/kademlia/task.h @@ -0,0 +1,174 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * [email protected] * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#ifndef DHTTASK_H +#define DHTTASK_H + +#include <qvaluelist.h> +#include "rpccall.h" +//#include "kbucket.h" + +namespace KNetwork +{ + class KResolverResults; +} + +namespace dht +{ + class Node; + class Task; + class KClosestNodesSearch; + class KBucketEntry; + + const Uint32 MAX_CONCURRENT_REQS = 16; + + using KNetwork::KResolverResults; + + /** + * @author Joris Guisson <[email protected]> + * + * Performs a task on K nodes provided by a KClosestNodesSearch. + * This is a base class for all tasks. + */ + class Task : public RPCCallListener + { + Q_OBJECT + public: + /** + * Create a task. + * @param rpc The RPC server to do RPC calls + * @param node The node + */ + Task(RPCServer* rpc,Node* node); + virtual ~Task(); + + /** + * This will copy the results from the KClosestNodesSearch + * object into the todo list. And call update if the task is not queued. + * @param kns The KClosestNodesSearch object + * @param queued Is the task queued + */ + void start(const KClosestNodesSearch & kns,bool queued); + + + /** + * Start the task, to be used when a task is queued. + */ + void start(); + + /// Decrements the outstanding_reqs + virtual void onResponse(RPCCall* c, MsgBase* rsp); + + /// Decrements the outstanding_reqs + virtual void onTimeout(RPCCall* c); + + /** + * Will continue the task, this will be called every time we have + * rpc slots available for this task. Should be implemented by derived classes. + */ + virtual void update() = 0; + + /** + * A call is finished and a response was received. + * @param c The call + * @param rsp The response + */ + virtual void callFinished(RPCCall* c, MsgBase* rsp) = 0; + + /** + * A call timedout + * @param c The call + */ + virtual void callTimeout(RPCCall* c) = 0; + + /** + * Do a call to the rpc server, increments the outstanding_reqs variable. + * @param req THe request to send + * @return true if call was made, false if not + */ + bool rpcCall(MsgBase* req); + + /// See if we can do a request + bool canDoRequest() const {return outstanding_reqs < MAX_CONCURRENT_REQS;} + + /// Is the task finished + bool isFinished() const {return task_finished;} + + /// Set the task ID + void setTaskID(bt::Uint32 tid) {task_id = tid;} + + /// Get the task ID + bt::Uint32 getTaskID() const {return task_id;} + + /// Get the number of outstanding requests + bt::Uint32 getNumOutstandingRequests() const {return outstanding_reqs;} + + bool isQueued() const {return queued;} + + /** + * Tell listeners data is ready. + */ + void emitDataReady(); + + /// Kills the task + void kill(); + + /** + * Add a node to the todo list + * @param ip The ip or hostname of the node + * @param port The port + */ + void addDHTNode(const QString & ip,bt::Uint16 port); + + signals: + /** + * The task is finsihed. + * @param t The Task + */ + void finished(Task* t); + + /** + * Called by the task when data is ready. + * Can be overrided if wanted. + * @param t The Task + */ + void dataReady(Task* t); + + protected: + void done(); + + protected slots: + void onResolverResults(KResolverResults res); + + protected: + QValueList<KBucketEntry> visited; // nodes visited + QValueList<KBucketEntry> todo; // nodes todo + Node* node; + + private: + RPCServer* rpc; + bt::Uint32 outstanding_reqs; + bt::Uint32 task_id; + bool task_finished; + bool queued; + }; + +} + +#endif diff --git a/libktorrent/kademlia/taskmanager.cpp b/libktorrent/kademlia/taskmanager.cpp new file mode 100644 index 0000000..f71fc0d --- /dev/null +++ b/libktorrent/kademlia/taskmanager.cpp @@ -0,0 +1,79 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * [email protected] * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#include <util/log.h> +#include <torrent/globals.h> +#include "taskmanager.h" +#include "nodelookup.h" +#include "dht.h" + +using namespace bt; + +namespace dht +{ + typedef bt::PtrMap<Uint32,Task>::iterator TaskItr; + + TaskManager::TaskManager() : next_id(0) + { + tasks.setAutoDelete(true); + } + + + TaskManager::~TaskManager() + { + queued.setAutoDelete(true); + tasks.clear(); + } + + + void TaskManager::addTask(Task* task) + { + Uint32 id = next_id++; + task->setTaskID(id); + if (task->isQueued()) + queued.append(task); + else + tasks.insert(id,task); + } + + void TaskManager::removeFinishedTasks(const DHT* dh_table) + { + QValueList<Uint32> rm; + for (TaskItr i = tasks.begin();i != tasks.end();i++) + { + if (i->second->isFinished()) + rm.append(i->first); + } + + for (QValueList<Uint32>::iterator i = rm.begin();i != rm.end();i++) + { + tasks.erase(*i); + } + + while (dh_table->canStartTask() && queued.count() > 0) + { + Task* t = queued.first(); + queued.removeFirst(); + Out(SYS_DHT|LOG_NOTICE) << "DHT: starting queued task" << endl; + t->start(); + tasks.insert(t->getTaskID(),t); + } + } + +} diff --git a/libktorrent/kademlia/taskmanager.h b/libktorrent/kademlia/taskmanager.h new file mode 100644 index 0000000..3df52b6 --- /dev/null +++ b/libktorrent/kademlia/taskmanager.h @@ -0,0 +1,69 @@ +/*************************************************************************** + * Copyright (C) 2005 by Joris Guisson * + * [email protected] * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * + ***************************************************************************/ +#ifndef DHTTASKMANAGER_H +#define DHTTASKMANAGER_H + +#include <qptrlist.h> +#include <util/ptrmap.h> +#include <util/constants.h> +#include "task.h" + +namespace dht +{ + class DHT; + + /** + * @author Joris Guisson <[email protected]> + * + * Manages all dht tasks. + */ + class TaskManager + { + public: + TaskManager(); + virtual ~TaskManager(); + + /** + * Add a task to manage. + * @param task + */ + void addTask(Task* task); + + /** + * Remove all finished tasks. + * @param dh_table Needed to ask permission to start a task + */ + void removeFinishedTasks(const DHT* dh_table); + + /// Get the number of running tasks + bt::Uint32 getNumTasks() const {return tasks.count();} + + /// Get the number of queued tasks + bt::Uint32 getNumQueuedTasks() const {return queued.count();} + + private: + bt::PtrMap<Uint32,Task> tasks; + QPtrList<Task> queued; + bt::Uint32 next_id; + }; + +} + +#endif |