// @(#)root/net:$Name:  $:$Id: TPSocket.cxx,v 1.6 2001/02/12 14:31:17 rdm Exp $
// Author: Fons Rademakers   22/1/2001

/*************************************************************************
 * Copyright (C) 1995-2001, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/

//////////////////////////////////////////////////////////////////////////
//                                                                      //
// TPSocket                                                             //
//                                                                      //
// This class implements parallel client sockets. A parallel socket is  //
// an endpoint for communication between two machines. It is parallel   //
// because several TSockets are open at the same time to the same       //
// destination. This especially speeds up communication over Big Fat    //
// Pipes (i.e. high bandwidth, high latency WAN connections).           //
//                                                                      //
//////////////////////////////////////////////////////////////////////////

#include "TPSocket.h"
#include "TServerSocket.h"
#include "TMonitor.h"
#include "TSystem.h"
#include "TMessage.h"
#include "Bytes.h"
#include "TROOT.h"
#include "TError.h"


ClassImp(TPSocket)

//______________________________________________________________________________
 TPSocket::TPSocket(TInetAddress addr, const char *service, Int_t size,
                   Int_t tcpwindowsize) : TSocket(addr, service)
{
   // Create a parallel socket. Connect to the named service at address addr.
   // Use tcpwindowsize to specify the size of the receive buffer, it has
   // to be specified here to make sure the window scale option is set (for
   // tcpwindowsize > 65KB and for platforms supporting window scaling).
   // Returns when connection has been accepted by remote side. Use IsValid()
   // to check the validity of the socket. Every socket is added to the TROOT
   // sockets list which will make sure that any open sockets are properly
   // closed on program termination.

   fSize = size;
   Init(tcpwindowsize);
}

//______________________________________________________________________________
 TPSocket::TPSocket(TInetAddress addr, Int_t port, Int_t size,
                   Int_t tcpwindowsize) : TSocket(addr, port)
{
   // Create a parallel socket. Connect to the specified port # at address addr.
   // Use tcpwindowsize to specify the size of the receive buffer, it has
   // to be specified here to make sure the window scale option is set (for
   // tcpwindowsize > 65KB and for platforms supporting window scaling).
   // Returns when connection has been accepted by remote side. Use IsValid()
   // to check the validity of the socket. Every socket is added to the TROOT
   // sockets list which will make sure that any open sockets are properly
   // closed on program termination.

   fSize = size;
   Init(tcpwindowsize);
}

//______________________________________________________________________________
 TPSocket::TPSocket(const char *host, const char *service, Int_t size,
                   Int_t tcpwindowsize) : TSocket(host, service)
{
   // Create a parallel socket. Connect to named service on the remote host.
   // Use tcpwindowsize to specify the size of the receive buffer, it has
   // to be specified here to make sure the window scale option is set (for
   // tcpwindowsize > 65KB and for platforms supporting window scaling).
   // Returns when connection has been accepted by remote side. Use IsValid()
   // to check the validity of the socket. Every socket is added to the TROOT
   // sockets list which will make sure that any open sockets are properly
   // closed on program termination.

   fSize = size;
   Init(tcpwindowsize);
}

//______________________________________________________________________________
 TPSocket::TPSocket(const char *host, Int_t port, Int_t size,
                   Int_t tcpwindowsize) : TSocket(host, port)
{
   // Create a parallel socket. Connect to specified port # on the remote host.
   // Use tcpwindowsize to specify the size of the receive buffer, it has
   // to be specified here to make sure the window scale option is set (for
   // tcpwindowsize > 65KB and for platforms supporting window scaling).
   // Returns when connection has been accepted by remote side. Use IsValid()
   // to check the validity of the socket. Every socket is added to the TROOT
   // sockets list which will make sure that any open sockets are properly
   // closed on program termination.

   fSize = size;
   Init(tcpwindowsize);
}

//______________________________________________________________________________
 TPSocket::TPSocket(TSocket *pSockets[], Int_t size)
{
   // Create a parallel socket. This ctor is called by TPServerSocket.

   fSockets = pSockets;
   fSize    = size;

   // set socket options (no blocking and no delay)
   SetOption(kNoDelay, 1);
   SetOption(kNoBlock, 1);

   fWriteMonitor   = new TMonitor;
   fReadMonitor    = new TMonitor;
   fWriteBytesLeft = new Int_t[fSize];
   fReadBytesLeft  = new Int_t[fSize];
   fWritePtr       = new char*[fSize];
   fReadPtr        = new char*[fSize];

   for (int i = 0; i < fSize; i++) {
      fWriteMonitor->Add(fSockets[i], TMonitor::kWrite);
      fReadMonitor->Add(fSockets[i], TMonitor::kRead);
   }
   fWriteMonitor->DeActivateAll();
   fReadMonitor->DeActivateAll();

   SetName(fSockets[0]->GetName());
   SetTitle(fSockets[0]->GetTitle());
   fAddress = fSockets[0]->GetInetAddress();

   gROOT->GetListOfSockets()->Add(this);
}

//______________________________________________________________________________
 TPSocket::~TPSocket()
{
   // Cleanup the parallel socket.

   Close();

   delete fWriteMonitor;
   delete fReadMonitor;
   delete [] fWriteBytesLeft;
   delete [] fReadBytesLeft;
   delete [] fWritePtr;
   delete [] fReadPtr;
}

//______________________________________________________________________________
 void TPSocket::Close(Option_t *option)
{
   // Close a parallel socket. If option is "force", calls shutdown(id,2) to
   // shut down the connection. This will close the connection also
   // for the parent of this process. Also called via the dtor (without
   // option "force", call explicitely Close("force") if this is desired).

   for (int i = 0; i < fSize; i++) {
      fSockets[i]->Close(option);
      delete fSockets[i];
   }
   delete [] fSockets;
   fSockets = 0;

   gROOT->GetListOfSockets()->Remove(this);
}

//______________________________________________________________________________
 void TPSocket::Init(Int_t tcpwindowsize)
{
   // Create a parallel socket to the specified host.

   fSockets        = 0;
   fWriteMonitor   = 0;
   fReadMonitor    = 0;
   fWriteBytesLeft = 0;
   fReadBytesLeft  = 0;
   fWritePtr       = 0;
   fReadPtr        = 0;

   if (!TSocket::IsValid())
      return;

   // create server that will be used to accept the parallel sockets from
   // the remote host, use port=0 to scan for a free port
   TServerSocket ss(0, kFALSE, fSize, tcpwindowsize);

   // send the local port number of the just created server socket and the
   // number of desired parallel sockets
   TSocket::Send(ss.GetLocalPort(), fSize);

   fSockets = new TSocket*[fSize];

   // establish fSize parallel socket connections between client and server
   int i;
   for (i = 0; i < fSize; i++) {
      fSockets[i] = ss.Accept();
      gROOT->GetListOfSockets()->Remove(fSockets[i]);
   }

   // set socket options (no blocking and no delay)
   SetOption(kNoDelay, 1);
   SetOption(kNoBlock, 1);

   fWriteMonitor   = new TMonitor;
   fReadMonitor    = new TMonitor;
   fWriteBytesLeft = new Int_t[fSize];
   fReadBytesLeft  = new Int_t[fSize];
   fWritePtr       = new char*[fSize];
   fReadPtr        = new char*[fSize];

   for (i = 0; i < fSize; i++) {
      fWriteMonitor->Add(fSockets[i], TMonitor::kWrite);
      fReadMonitor->Add(fSockets[i], TMonitor::kRead);
   }
   fWriteMonitor->DeActivateAll();
   fReadMonitor->DeActivateAll();

   TSocket::Close();

   gROOT->GetListOfSockets()->Add(this);
}

//______________________________________________________________________________
 TInetAddress TPSocket::GetLocalInetAddress()
{
   // Return internet address of local host to which the socket is bound.
   // In case of error TInetAddress::IsValid() returns kFALSE.

   if (IsValid()) {
      if (fLocalAddress.GetPort() == -1)
         fLocalAddress = gSystem->GetSockName(fSockets[0]->GetDescriptor());
      return fLocalAddress;
   }
   return TInetAddress();
}

//______________________________________________________________________________
 Int_t TPSocket::Send(const TMessage &mess)
{
   // Send a TMessage object. Returns the number of bytes in the TMessage
   // that were sent and -1 in case of error. In case the TMessage::What
   // has been or'ed with kMESS_ACK, the call will only return after having
   // received an acknowledgement, making the sending process synchronous.
   // Returns -4 in case of kNoBlock and errno == EWOULDBLOCK.

   if (!fSockets)
      return TSocket::Send(mess);  // only the case when called via Init()

   if (mess.IsReading()) {
      Error("Send", "cannot send a message used for reading");
      return -1;
   }

   mess.SetLength();   //write length in first word of buffer

   Int_t nsent, ulen = (Int_t) sizeof(UInt_t);
   // send length
   if ((nsent = SendRaw(mess.Buffer(), ulen, kDefault)) <= 0)
      return nsent;

   // send buffer (this might go in parallel)
   if ((nsent = SendRaw(mess.Buffer()+ulen, mess.Length()-ulen, kDefault)) <= 0)
      return nsent;

   // If acknowledgement is desired, wait for it
   if (mess.What() & kMESS_ACK) {
      char buf[2];
      if (RecvRaw(buf, sizeof(buf), kDefault) < 0)
         return -1;
      if (strncmp(buf, "ok", 2)) {
         Error("Send", "bad acknowledgement");
         return -1;
      }
   }

   return nsent;  //length - length header
}

//______________________________________________________________________________
 Int_t TPSocket::SendRaw(const void *buffer, Int_t length, ESendRecvOptions opt)
{
   // Send a raw buffer of specified length. Returns the number of bytes
   // send and -1 in case of error.

   if (!fSockets) return -1;

   // if data buffer size < 4K use only one socket
   Int_t i, nsocks = fSize, len = length;
   if (len < 4096)
      nsocks = 1;

   ESendRecvOptions sendopt = kDontBlock;
   if (nsocks == 1)
      sendopt = kDefault;

   if (opt != kDefault) {
      nsocks  = 1;
      sendopt = opt;
   }

   if (nsocks == 1)
      fSockets[0]->SetOption(kNoBlock, 0);
   else
      fSockets[0]->SetOption(kNoBlock, 1);

   // setup pointer appropriately for transferring data equally on the
   // parallel sockets
   for (i = 0; i < nsocks; i++) {
      fWriteBytesLeft[i] = len/nsocks;
      fWritePtr[i] = (char *)buffer + (i*fWriteBytesLeft[i]);
      fWriteMonitor->Activate(fSockets[i]);
   }
   fWriteBytesLeft[nsocks-1] += len%nsocks;

   // send the data on the parallel sockets
   while (len > 0) {
      TSocket *s = fWriteMonitor->Select();
      for (int is = 0; is < nsocks; is++) {
         if (s == fSockets[is]) {
            if (fWriteBytesLeft[is] > 0) {
               Int_t nsent;
again:
               if ((nsent = fSockets[is]->SendRaw(fWritePtr[is],
                                                  fWriteBytesLeft[is],
                                                  sendopt)) <= 0) {
                  if (nsent == -4) {
                     // got EAGAIN/EWOULDBLOCK error, keep trying...
                     goto again;
                  }
                  fWriteMonitor->DeActivateAll();
                  return -1;
               }
               if (opt == kDontBlock) {
                  fWriteMonitor->DeActivateAll();
                  return nsent;
               }
               fWriteBytesLeft[is] -= nsent;
               fWritePtr[is] += nsent;
               len -= nsent;
            }
         }
      }
   }
   fWriteMonitor->DeActivateAll();

   return length;
}

//______________________________________________________________________________
 Int_t TPSocket::Recv(TMessage *&mess)
{
   // Receive a TMessage object. The user must delete the TMessage object.
   // Returns length of message in bytes (can be 0 if other side of connection
   // is closed) or -1 in case of error or -4 in case a non-blocking socket would
   // block (i.e. there is nothing to be read). In those case mess == 0.

   if (!fSockets) {
      mess = 0;
      return -1;
   }

   Int_t  n;
   UInt_t len;
   if ((n = RecvRaw(&len, sizeof(UInt_t), kDefault)) <= 0) {
      mess = 0;
      return n;
   }
   len = net2host(len);  //from network to host byte order

   char *buf = new char[len+sizeof(UInt_t)];
   if ((n = RecvRaw(buf+sizeof(UInt_t), len, kDefault)) <= 0) {
      delete [] buf;
      mess = 0;
      return n;
   }

   mess = new TMessage(buf, len+sizeof(UInt_t));

   if (mess->What() & kMESS_ACK) {
      char ok[2] = { 'o', 'k' };
      if (SendRaw(ok, sizeof(ok), kDefault) < 0) {
         delete mess;
         mess = 0;
         return -1;
      }
      mess->SetWhat(mess->What() & ~kMESS_ACK);
   }

   return n;
}

//______________________________________________________________________________
 Int_t TPSocket::RecvRaw(void *buffer, Int_t length, ESendRecvOptions opt)
{
   // Send a raw buffer of specified length. Returns the number of bytes
   // sent or -1 in case of error.

   if (!fSockets) return -1;

   // if data buffer size < 4K use only one socket
   Int_t i, nsocks = fSize, len = length;
   if (len < 4096)
      nsocks = 1;

   ESendRecvOptions recvopt = kDontBlock;
   if (nsocks == 1)
      recvopt = kDefault;

   if (opt != kDefault) {
      nsocks  = 1;
      recvopt = opt;
   }

   if (nsocks == 1)
      fSockets[0]->SetOption(kNoBlock, 0);
   else
      fSockets[0]->SetOption(kNoBlock, 1);

   // setup pointer appropriately for transferring data equally on the
   // parallel sockets
   for (i = 0; i < nsocks; i++) {
      fReadBytesLeft[i] = len/nsocks;
      fReadPtr[i] = (char *)buffer + (i*fReadBytesLeft[i]);
      fReadMonitor->Activate(fSockets[i]);
   }
   fReadBytesLeft[nsocks-1] += len%nsocks;

   // start receiving data on all sockets. Receive data as and when
   // they are available on a socket by by using select.
   // Exit the loop as soon as all data has been received.
   while (len > 0) {
      TSocket *s = fReadMonitor->Select();
      for (int is = 0; is < nsocks; is++) {
         if (s == fSockets[is]) {
            if (fReadBytesLeft[is] > 0) {
               Int_t nrecv;
               if ((nrecv = fSockets[is]->RecvRaw(fReadPtr[is],
                                                  fReadBytesLeft[is],
                                                  recvopt)) <= 0) {
                  fReadMonitor->DeActivateAll();
                  return -1;
               }
               if (opt == kDontBlock) {
                  fReadMonitor->DeActivateAll();
                  return nrecv;
               }
               fReadBytesLeft[is] -= nrecv;
               fReadPtr[is] += nrecv;
               len -= nrecv;
            }
         }
      }
   }
   fReadMonitor->DeActivateAll();

   return length;
}

//______________________________________________________________________________
 Int_t TPSocket::SetOption(ESockOptions opt, Int_t val)
{
   // Set socket options.

   Int_t ret = 0;
   for (int i = 0; i < fSize; i++)
      ret = fSockets[i]->SetOption(opt, val);
   return ret;
}

//______________________________________________________________________________
 Int_t TPSocket::GetOption(ESockOptions opt, Int_t &val)
{
   // Get socket options. Returns -1 in case of error.

   Int_t ret = 0;
   for (int i = 0; i < fSize; i++)
      ret = fSockets[i]->GetOption(opt, val);
   return ret;
}

//______________________________________________________________________________
 Int_t TPSocket::GetErrorCode() const
{
   // Returns error code. Meaning depends on context where it is called.
   // If no error condition returns 0 else a value < 0.

   return fSockets[0] ? fSockets[0]->GetErrorCode() : 0;
}


ROOT page - Class index - Top of the page

This page has been automatically generated. If you have any comments or suggestions about the page layout send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.