summaryrefslogtreecommitdiffstats
path: root/libktorrent/mse/streamsocket.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'libktorrent/mse/streamsocket.cpp')
-rw-r--r--libktorrent/mse/streamsocket.cpp326
1 files changed, 326 insertions, 0 deletions
diff --git a/libktorrent/mse/streamsocket.cpp b/libktorrent/mse/streamsocket.cpp
new file mode 100644
index 0000000..19a0a2e
--- /dev/null
+++ b/libktorrent/mse/streamsocket.cpp
@@ -0,0 +1,326 @@
+/***************************************************************************
+ * Copyright (C) 2005 by Joris Guisson *
+ * *
+ * This program is free software; you can redistribute it and/or modify *
+ * it under the terms of the GNU General Public License as published by *
+ * the Free Software Foundation; either version 2 of the License, or *
+ * (at your option) any later version. *
+ * *
+ * This program is distributed in the hope that it will be useful, *
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of *
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
+ * GNU General Public License for more details. *
+ * *
+ * You should have received a copy of the GNU General Public License *
+ * along with this program; if not, write to the *
+ * Free Software Foundation, Inc., *
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. *
+ ***************************************************************************/
+#include <errno.h>
+#include <qsocket.h>
+#include <qsocketdevice.h>
+#include <util/sha1hash.h>
+#include <util/log.h>
+#include <torrent/peer.h>
+#include <torrent/globals.h>
+#include <torrent/authenticatebase.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/in_systm.h>
+#include <netinet/ip.h>
+#include <netinet/tcp.h>
+#include <net/socketmonitor.h>
+#include "streamsocket.h"
+#include "rc4encryptor.h"
+
+using namespace bt;
+using namespace net;
+
+namespace mse
+{
+
+ Uint8 StreamSocket::tos = IPTOS_THROUGHPUT;
+ Uint32 StreamSocket::num_connecting = 0;
+ Uint32 StreamSocket::max_connecting = 50;
+
+ StreamSocket::StreamSocket() : sock(0),enc(0),monitored(false)
+ {
+ sock = new BufferedSocket(true);
+ sock->setNonBlocking();
+ reinserted_data = 0;
+ reinserted_data_size = 0;
+ reinserted_data_read = 0;
+
+ }
+
+ StreamSocket::StreamSocket(int fd) : sock(0),enc(0),monitored(false)
+ {
+ sock = new BufferedSocket(fd);
+ sock->setNonBlocking();
+ reinserted_data = 0;
+ reinserted_data_size = 0;
+ reinserted_data_read = 0;
+ sock->setTOS(tos);
+ }
+
+ StreamSocket::~StreamSocket()
+ {
+ // make sure the number of connecting sockets is updated
+ if (connecting() && num_connecting > 0)
+ num_connecting--;
+
+ SocketMonitor::instance().remove(sock);
+ delete [] reinserted_data;
+ delete enc;
+ delete sock;
+ }
+
+ void StreamSocket::startMonitoring(net::SocketReader* rdr,net::SocketWriter* wrt)
+ {
+ this->rdr = rdr;
+ this->wrt = wrt;
+ sock->setReader(this);
+ sock->setWriter(this);
+ SocketMonitor::instance().add(sock);
+ monitored = true;
+ if (reinserted_data)
+ {
+ if (enc)
+ enc->decrypt(reinserted_data + reinserted_data_read,
+ reinserted_data_size - reinserted_data_read);
+
+ rdr->onDataReady(reinserted_data + reinserted_data_read,
+ reinserted_data_size - reinserted_data_read);
+ delete [] reinserted_data;
+ reinserted_data = 0;
+ reinserted_data_size = 0;
+ }
+ }
+
+
+ Uint32 StreamSocket::sendData(const Uint8* data,Uint32 len)
+ {
+ if (enc)
+ {
+ // we need to make sure all data is sent because of the encryption
+ Uint32 ds = 0;
+ const Uint8* ed = enc->encrypt(data,len);
+ while (sock->ok() && ds < len)
+ {
+ Uint32 ret = sock->send(ed + ds,len - ds);
+ ds += ret;
+ if (ret == 0)
+ {
+ Out(SYS_CON|LOG_DEBUG) << "ret = 0" << endl;
+ }
+ }
+ if (ds != len)
+ Out() << "ds != len" << endl;
+ return ds;
+ }
+ else
+ {
+ Uint32 ret = sock->send(data,len);
+ if (ret != len)
+ Out() << "ret != len" << endl;
+ return ret;
+ }
+ }
+
+ Uint32 StreamSocket::readData(Uint8* buf,Uint32 len)
+ {
+ Uint32 ret2 = 0;
+ if (reinserted_data)
+ {
+ Uint32 tr = reinserted_data_size - reinserted_data_read;
+ if (tr < len)
+ {
+ memcpy(buf,reinserted_data + reinserted_data_read,tr);
+ delete [] reinserted_data;
+ reinserted_data = 0;
+ reinserted_data_size = reinserted_data_read = 0;
+ ret2 = tr;
+ if (enc)
+ enc->decrypt(buf,tr);
+ }
+ else
+ {
+ tr = len;
+ memcpy(buf,reinserted_data + reinserted_data_read,tr);
+ reinserted_data_read += tr;
+ if (enc)
+ enc->decrypt(buf,tr);
+ return tr;
+ }
+ }
+
+ if (len == ret2)
+ return ret2;
+
+ Uint32 ret = sock->recv(buf + ret2,len - ret2);
+ if (ret + ret2 > 0 && enc)
+ enc->decrypt(buf,ret + ret2);
+
+ return ret;
+ }
+
+ Uint32 StreamSocket::bytesAvailable() const
+ {
+ Uint32 ba = sock->bytesAvailable();
+ if (reinserted_data_size - reinserted_data_read > 0)
+ return ba + (reinserted_data_size - reinserted_data_read);
+ else
+ return ba;
+ }
+
+ void StreamSocket::close()
+ {
+ sock->close();
+ }
+
+ bool StreamSocket::connectTo(const QString & ip,Uint16 port)
+ {
+ // do a safety check
+ if (ip.isNull() || ip.length() == 0)
+ return false;
+
+ // we don't wanna block the current thread so set non blocking
+ sock->setNonBlocking();
+ if (sock->connectTo(Address(ip,port)))
+ {
+ sock->setTOS(tos);
+ return true;
+ }
+ else if (connecting())
+ {
+ num_connecting++;
+ }
+
+ return false;
+ }
+
+ void StreamSocket::initCrypt(const bt::SHA1Hash & dkey,const bt::SHA1Hash & ekey)
+ {
+ if (enc)
+ delete enc;
+
+ enc = new RC4Encryptor(dkey,ekey);
+ }
+
+ void StreamSocket::disableCrypt()
+ {
+ delete enc;
+ enc = 0;
+ }
+
+ bool StreamSocket::ok() const
+ {
+ return sock->ok();
+ }
+
+ QString StreamSocket::getRemoteIPAddress() const
+ {
+ return sock->getPeerName().toString();
+ }
+
+ bt::Uint16 StreamSocket::getRemotePort() const
+ {
+ return sock->getPeerName().port();
+ }
+
+ net::Address StreamSocket::getRemoteAddress() const
+ {
+ return sock->getPeerName();
+ }
+
+ void StreamSocket::setRC4Encryptor(RC4Encryptor* e)
+ {
+ if (enc)
+ delete enc;
+
+ enc = e;
+ }
+
+ void StreamSocket::reinsert(const Uint8* d,Uint32 size)
+ {
+// Out() << "Reinsert : " << size << endl;
+ Uint32 off = 0;
+ if (reinserted_data)
+ {
+ off = reinserted_data_size;
+ reinserted_data = (Uint8*)realloc(reinserted_data,reinserted_data_size + size);
+ reinserted_data_size += size;
+ }
+ else
+ {
+ reinserted_data = new Uint8[size];
+ reinserted_data_size = size;
+ }
+ memcpy(reinserted_data + off,d,size);
+ }
+
+ bool StreamSocket::connecting() const
+ {
+ return sock->state() == net::Socket::CONNECTING;
+ }
+
+ void StreamSocket::onDataReady(Uint8* buf,Uint32 size)
+ {
+ if (enc)
+ enc->decrypt(buf,size);
+
+ if (rdr)
+ rdr->onDataReady(buf,size);
+ }
+
+ Uint32 StreamSocket::onReadyToWrite(Uint8* data,Uint32 max_to_write)
+ {
+ if (!wrt)
+ return 0;
+
+ Uint32 ret = wrt->onReadyToWrite(data,max_to_write);
+ if (enc && ret > 0) // do encryption if necessary
+ enc->encryptReplace(data,ret);
+
+
+ return ret;
+ }
+
+ bool StreamSocket::hasBytesToWrite() const
+ {
+ return wrt ? wrt->hasBytesToWrite() : false;
+ }
+
+ float StreamSocket::getDownloadRate() const
+ {
+ return sock ? sock->getDownloadRate() : 0.0f;
+ }
+
+ float StreamSocket::getUploadRate() const
+ {
+ return sock ? sock->getUploadRate() : 0.0f;
+ }
+
+ bool StreamSocket::connectSuccesFull() const
+ {
+ bool ret = sock->connectSuccesFull();
+ if (ret)
+ sock->setTOS(tos);
+
+ if (num_connecting > 0)
+ num_connecting--;
+
+ return ret;
+ }
+
+ void StreamSocket::setGroupIDs(Uint32 up,Uint32 down)
+ {
+ sock->setGroupID(up,true);
+ sock->setGroupID(down,false);
+ }
+}
+
+#include "streamsocket.moc"