/*
    This file is part of the TDE games library
    Copyright (C) 2001 Burkhard Lehner (Burkhard.Lehner@gmx.de)

    This library is free software; you can redistribute it and/or
    modify it under the terms of the GNU Library General Public
    License version 2 as published by the Free Software Foundation.

    This library is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
    Library General Public License for more details.

    You should have received a copy of the GNU Library General Public License
    along with this library; see the file COPYING.LIB.  If not, write to
    the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
    Boston, MA 02110-1301, USA.
*/

/*
     KMessageIO class and subclasses KMessageSocket and KMessageDirect
*/

#include "kmessageio.h"
#include <tqsocket.h>
#include <kdebug.h>
#include <tdeprocess.h>
#include <tqfile.h>

// ----------------------- KMessageIO -------------------------

KMessageIO::KMessageIO (TQObject *parent, const char *name)
  : TQObject (parent, name), m_id (0)
{}

KMessageIO::~KMessageIO ()
{}

void KMessageIO::setId (TQ_UINT32 id)
{
  m_id = id;
}

TQ_UINT32 KMessageIO::id ()
{
  return m_id;
}

// ----------------------KMessageSocket -----------------------

KMessageSocket::KMessageSocket (TQString host, TQ_UINT16 port, TQObject *parent, 
const char *name)
  : KMessageIO (parent, name)
{
  mSocket = new TQSocket ();
  mSocket->connectToHost (host, port);
  initSocket ();
}

KMessageSocket::KMessageSocket (TQHostAddress host, TQ_UINT16 port, TQObject 
*parent, const char *name)
  : KMessageIO (parent, name)
{
  mSocket = new TQSocket ();
  mSocket->connectToHost (host.toString(), port);
  initSocket ();
}

KMessageSocket::KMessageSocket (TQSocket *socket, TQObject *parent, const char 
*name)
  : KMessageIO (parent, name)
{
  mSocket = socket;
  initSocket ();
}

KMessageSocket::KMessageSocket (int socketFD, TQObject *parent, const char 
*name)
  : KMessageIO (parent, name)
{
  mSocket = new TQSocket ();
  mSocket->setSocket (socketFD);
  initSocket ();
}

KMessageSocket::~KMessageSocket ()
{
  delete mSocket;
}

bool KMessageSocket::isConnected () const
{
  return mSocket->state() == TQSocket::Connection;
}

void KMessageSocket::send (const TQByteArray &msg)
{
  TQDataStream str (mSocket);
  str << TQ_UINT8 ('M');  // magic number for begin of message
  str.writeBytes (msg.data(), msg.size());  // writes the length (as TQ_UINT32) and the data
}

void KMessageSocket::processNewData ()
{
  if (isRecursive)
    return;
  isRecursive = true;

  TQDataStream str (mSocket);
  while (mSocket->bytesAvailable() > 0)
  {
    if (mAwaitingHeader)
    {
      // Header = magic number + packet length = 5 bytes
      if (mSocket->bytesAvailable() < 5)
      {
        isRecursive = false;
        return;
      }

      // Read the magic number first. If something unexpected is found,
      // start over again, ignoring the data that was read up to then.

      TQ_UINT8 v;
      str >> v;
      if (v != 'M')
      {
        kdWarning(11001) << k_funcinfo << ": Received unexpected data, magic number wrong!" << endl;
        continue;
      }

      str >> mNextBlockLength;
      mAwaitingHeader = false;
    }
    else
    {
      // Data not completely read => wait for more
      if (mSocket->bytesAvailable() < (TQ_ULONG) mNextBlockLength)
      {
        isRecursive = false;
        return;
      }

      TQByteArray msg (mNextBlockLength);
      str.readRawBytes (msg.data(), mNextBlockLength);

      // send the received message
      emit received (msg);

      // Waiting for the header of the next message
      mAwaitingHeader = true;
    }
  }

  isRecursive = false;
}

void KMessageSocket::initSocket ()
{
  connect (mSocket, TQ_SIGNAL (error(int)), TQ_SIGNAL (connectionBroken()));
  connect (mSocket, TQ_SIGNAL (connectionClosed()), TQ_SIGNAL (connectionBroken()));
  connect (mSocket, TQ_SIGNAL (readyRead()), TQ_SLOT (processNewData()));
  mAwaitingHeader = true;
  mNextBlockLength = 0;
  isRecursive = false;
}

TQ_UINT16 KMessageSocket::peerPort () const
{
  return mSocket->peerPort();
}

TQString KMessageSocket::peerName () const
{
  return mSocket->peerName();
}

// ----------------------KMessageDirect -----------------------

KMessageDirect::KMessageDirect (KMessageDirect *partner, TQObject *parent, 
const char *name)
  : KMessageIO (parent, name), mPartner (0)
{
  // 0 as first parameter leaves the object unconnected
  if (!partner)
    return;

  // Check if the other object is already connected
  if (partner && partner->mPartner)
  {
    kdWarning(11001) << k_funcinfo << ": Object is already connected!" << endl;
    return;
  }

  // Connect from us to that object
  mPartner = partner;

  // Connect the other object to us
  partner->mPartner = this;
}

KMessageDirect::~KMessageDirect ()
{
  if (mPartner)
  {
    mPartner->mPartner = 0;
    emit mPartner->connectionBroken();
  }
}

bool KMessageDirect::isConnected () const
{
  return mPartner != 0;
}

void KMessageDirect::send (const TQByteArray &msg)
{
  if (mPartner)
    emit mPartner->received (msg);
  else
    kdError(11001) << k_funcinfo << ": Not yet connected!" << endl;
}


// ----------------------- KMessageProcess ---------------------------

KMessageProcess::~KMessageProcess()
{
  kdDebug(11001) << "@@@KMessageProcess::Delete process" << endl;
  if (mProcess)
  {
    mProcess->kill();
    delete mProcess;
    mProcess=0;
    // Remove not send buffers
    mQueue.setAutoDelete(true);
    mQueue.clear();
    // Maybe todo: delete mSendBuffer
  }
}
KMessageProcess::KMessageProcess(TQObject *parent, TQString file) : KMessageIO(parent,0)
{
  // Start process
  kdDebug(11001) << "@@@KMessageProcess::Start process" << endl;
  mProcessName=file;
  mProcess=new TDEProcess;
  int id=0;
  *mProcess << mProcessName << TQString("%1").arg(id);
  kdDebug(11001) << "@@@KMessageProcess::Init:Id= " << id << endl;
  kdDebug(11001) << "@@@KMessgeProcess::Init:Processname: " << mProcessName << endl;
  connect(mProcess, TQ_SIGNAL(receivedStdout(TDEProcess *, char *, int )),
                        this, TQ_SLOT(slotReceivedStdout(TDEProcess *, char * , int )));
  connect(mProcess, TQ_SIGNAL(receivedStderr(TDEProcess *, char *, int )),
                        this, TQ_SLOT(slotReceivedStderr(TDEProcess *, char * , int )));
  connect(mProcess, TQ_SIGNAL(processExited(TDEProcess *)),
                        this, TQ_SLOT(slotProcessExited(TDEProcess *)));
  connect(mProcess, TQ_SIGNAL(wroteStdin(TDEProcess *)),
                        this, TQ_SLOT(slotWroteStdin(TDEProcess *)));
  mProcess->start(TDEProcess::NotifyOnExit,TDEProcess::All);
  mSendBuffer=0;
  mReceiveCount=0;
  mReceiveBuffer.resize(1024);
}
bool KMessageProcess::isConnected() const
{
  kdDebug(11001) << "@@@KMessageProcess::Is conencted" << endl;
  if (!mProcess) return false;
  return mProcess->isRunning();
}
void KMessageProcess::send(const TQByteArray &msg)
{
  kdDebug(11001) << "@@@KMessageProcess:: SEND("<<msg.size()<<") to process" << endl;
  unsigned int size=msg.size()+2*sizeof(long);

  char *tmpbuffer=new char[size];
  long *p1=(long *)tmpbuffer;
  long *p2=p1+1;
  kdDebug(11001)  << "p1="<<p1 << "p2="<< p2 << endl;
  memcpy(tmpbuffer+2*sizeof(long),msg.data(),msg.size());
  *p1=0x4242aeae;
  *p2=size;
  
  TQByteArray *buffer=new TQByteArray();
  buffer->assign(tmpbuffer,size);
  // buffer->duplicate(msg);
  mQueue.enqueue(buffer);
  writeToProcess(); 
}
void KMessageProcess::writeToProcess()
{
  // Previous send ok and item in queue
  if (mSendBuffer || mQueue.isEmpty()) return ;
  mSendBuffer=mQueue.dequeue();
  if (!mSendBuffer) return ;

  // write it out to the process
  //  kdDebug(11001) << " @@@@@@ writeToProcess::SEND to process " << mSendBuffer->size() << " BYTE " << endl;
  //  char *p=mSendBuffer->data();
  //  for (int i=0;i<16;i++) printf("%02x ",(unsigned char)(*(p+i)));printf("\n");
  mProcess->writeStdin(mSendBuffer->data(),mSendBuffer->size());

}
void KMessageProcess::slotWroteStdin(TDEProcess * )
{
  kdDebug(11001) << k_funcinfo << endl;
  if (mSendBuffer)
  {
    delete mSendBuffer;
    mSendBuffer=0;
  }
  writeToProcess();
}

void KMessageProcess::slotReceivedStderr(TDEProcess * proc, char *buffer, int buflen)
{
  int pid=0;
  int len;
  char *p;
  char *pos;
//  kdDebug(11001)<<"############# Got stderr " << buflen << " bytes" << endl;

  if (!buffer || buflen==0) return ; 
  if (proc) pid=proc->pid();


  pos=buffer;
  do
  {
    p=(char *)memchr(pos,'\n',buflen);
    if (!p) len=buflen;
    else len=p-pos;

    TQByteArray a;
    a.setRawData(pos,len);
    TQString s(a);
    kdDebug(11001) << "PID" <<pid<< ":" << s << endl;
    a.resetRawData(pos,len);
    if (p) pos=p+1;
    buflen-=len+1;
  }while(buflen>0);
}


void KMessageProcess::slotReceivedStdout(TDEProcess * , char *buffer, int buflen)
{
  kdDebug(11001) << "$$$$$$ " << k_funcinfo << ": Received " << buflen << " bytes over inter process communication" << endl;

  // TODO Make a plausibility check on buflen to avoid memory overflow
  while (mReceiveCount+buflen>=mReceiveBuffer.size()) mReceiveBuffer.resize(mReceiveBuffer.size()+1024);
  memcpy(mReceiveBuffer.data()+mReceiveCount,buffer,buflen);
  mReceiveCount+=buflen;

  // Possbile message
  while (mReceiveCount>2*sizeof(long))
  {
    long *p1=(long *)mReceiveBuffer.data();
    long *p2=p1+1;
    unsigned int len;
    if (*p1!=0x4242aeae)
    {
      kdDebug(11001) << k_funcinfo << ": Cookie error...transmission failure...serious problem..." << endl;
//      for (int i=0;i<mReceiveCount;i++) fprintf(stderr,"%02x ",mReceiveBuffer[i]);fprintf(stderr,"\n");
    }
    len=(int)(*p2);
    if (len<2*sizeof(long))
    {
      kdDebug(11001) << k_funcinfo << ": Message size error" << endl;
      break;
    }
    if (len<=mReceiveCount)
    {
      kdDebug(11001) << k_funcinfo << ": Got message with len " << len << endl;

      TQByteArray msg;
    //  msg.setRawData(mReceiveBuffer.data()+2*sizeof(long),len-2*sizeof(long));
      msg.duplicate(mReceiveBuffer.data()+2*sizeof(long),len-2*sizeof(long));
      emit received(msg);
     // msg.resetRawData(mReceiveBuffer.data()+2*sizeof(long),len-2*sizeof(long));
      // Shift buffer
      if (len<mReceiveCount)
      {
        memmove(mReceiveBuffer.data(),mReceiveBuffer.data()+len,mReceiveCount-len);
      }
      mReceiveCount-=len;
    }
    else break;
  }
}

void KMessageProcess::slotProcessExited(TDEProcess * /*p*/)
{
  kdDebug(11001) << "Process exited (slot)" << endl;
  emit connectionBroken();
  delete mProcess;
  mProcess=0;
}


// ----------------------- KMessageFilePipe ---------------------------
KMessageFilePipe::KMessageFilePipe(TQObject *parent,TQFile *readfile,TQFile *writefile) : KMessageIO(parent,0)
{
  mReadFile=readfile;
  mWriteFile=writefile;
  mReceiveCount=0;
  mReceiveBuffer.resize(1024);
}

KMessageFilePipe::~KMessageFilePipe()
{
}

bool KMessageFilePipe::isConnected () const
{
  return (mReadFile!=0)&&(mWriteFile!=0);
}

void KMessageFilePipe::send(const TQByteArray &msg)
{
  unsigned int size=msg.size()+2*sizeof(long);

  char *tmpbuffer=new char[size];
  long *p1=(long *)tmpbuffer;
  long *p2=p1+1;
  memcpy(tmpbuffer+2*sizeof(long),msg.data(),msg.size());
  *p1=0x4242aeae;
  *p2=size;
  
  TQByteArray buffer;
  buffer.assign(tmpbuffer,size);
  mWriteFile->writeBlock(buffer);
  mWriteFile->flush();
  /*
  fprintf(stderr,"+++ KMessageFilePipe:: SEND(%d to parent) realsize=%d\n",msg.size(),buffer.size());
  for (int i=0;i<buffer.size();i++) fprintf(stderr,"%02x ",buffer[i]);fprintf(stderr,"\n");
  fflush(stderr);
  */
}

void KMessageFilePipe::exec()
{

  // According to BL: Blocking read is ok
  // while(mReadFile->atEnd()) { usleep(100); }

   int ch=mReadFile->getch();

   while (mReceiveCount>=mReceiveBuffer.size()) mReceiveBuffer.resize(mReceiveBuffer.size()+1024);
   mReceiveBuffer[mReceiveCount]=(char)ch;
   mReceiveCount++;

   // Change for message 
   if (mReceiveCount>=2*sizeof(long))
   {
     long *p1=(long *)mReceiveBuffer.data();
     long *p2=p1+1;
     unsigned int len;
     if (*p1!=0x4242aeae)
     {
       fprintf(stderr,"KMessageFilePipe::exec:: Cookie error...transmission failure...serious problem...\n");
//       for (int i=0;i<16;i++) fprintf(stderr,"%02x ",mReceiveBuffer[i]);fprintf(stderr,"\n");
     }
     len=(int)(*p2);
     if (len==mReceiveCount)
     {
       //fprintf(stderr,"KMessageFilePipe::exec:: Got Message with len %d\n",len);

       TQByteArray msg;
       //msg.setRawData(mReceiveBuffer.data()+2*sizeof(long),len-2*sizeof(long));
       msg.duplicate(mReceiveBuffer.data()+2*sizeof(long),len-2*sizeof(long));
       emit received(msg);
       //msg.resetRawData(mReceiveBuffer.data()+2*sizeof(long),len-2*sizeof(long));
       mReceiveCount=0;
     }
   }
   

   return ;

  
}

#include "kmessageio.moc"