khanat-opennel-code/code/nel/samples/net/udp/bench_service.cpp

781 lines
18 KiB
C++

// NeL - MMORPG Framework <http://dev.ryzom.com/projects/nel/>
// Copyright (C) 2010 Winch Gate Property Limited
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//
// Includes
//
#include "nel/misc/types_nl.h"
#include <string>
#include <map>
#include <time.h>
#ifdef NL_OS_WINDOWS
# include <direct.h>
# define mkdir _mkdir
#else
# include <sys/stat.h>
# define mkdir(a) mkdir(a,0755)
#endif
#include "nel/misc/debug.h"
#include "nel/misc/mem_stream.h"
#include "nel/misc/path.h"
#include "nel/net/service.h"
#include "nel/net/udp_sock.h"
#include "receive_task.h"
#ifdef NL_OS_WINDOWS
# define NOMINMAX
# include <windows.h>
#endif // NL_OS_WINDOWS
#ifndef UDP_DIR
#define UDP_DIR ""
#endif // UDP_DIR
//
// Namespaces
//
using namespace std;
using namespace NLMISC;
using namespace NLNET;
//
// Structures
//
struct CClient
{
CClient (TSockId from, uint32 session, const string &cn) : From(from), Session(session), NextPingNumber(0), LastPongReceived(0), ConnectionName(cn),
BlockNumber(0), FullMeanPongTime(0), FullNbPong(0), NbPing(0), NbPong(0), MeanPongTime(0), NbDuplicated(0), FirstWrite(true)
{ PongReceived.resize (1001); }
CInetAddress Address; // udp address
TSockId From; // used to find the TCP connection
uint32 Session; // used to find the link between UDP and TCP connection at startup
vector<pair<uint8, uint16> > PongReceived; // contains the number of pong receive for each message number and the time
uint32 NextPingNumber, LastPongReceived;
string ConnectionName;
// this number is increase each time we filled the PongReceived array, the goal is to avoid a very old packet to use as a new one
uint32 BlockNumber;
uint32 FullMeanPongTime, FullNbPong;
// used for stat, reset every stat update
uint32 NbPing, NbPong, MeanPongTime, NbDuplicated;
// true if the client just connect and we don't log stat
bool FirstWrite;
void updatePong (sint64 pingTime, sint64 pongTime, uint32 pongNumber, uint32 blockNumber);
void updateStat ();
void updateFullStat ();
};
struct TInetAddressHash
{
static const size_t bucket_size = 4;
static const size_t min_buckets = 8;
inline bool operator() (const NLNET::CInetAddress &x1, const NLNET::CInetAddress &x2) const
{
return x1 == x2;
}
/// Hash function
inline size_t operator() ( const NLNET::CInetAddress& x ) const
{
return x.port();
//return x.internalIPAddress();
}
};
//
// Types
//
typedef CHashMap<NLNET::CInetAddress,CClient*,TInetAddressHash> TClientMap;
#define GETCLIENTA(it) (*it).second
//
// Variables
//
// must be increase at each version and must be the same value as the client
uint32 Version = 2;
string StatPathName = "stats/";
uint16 UDPPort = 45455;
uint16 TCPPort = 45456;
uint32 MaxUDPPacketSize = 1000;
CBufFIFO Queue1, Queue2;
CBufFIFO *CurrentReadQueue = NULL;
TReceivedMessage *CurrentInMsg = NULL;
IThread *ReceiveThread = NULL;
CReceiveTask *ReceiveTask = NULL;
list<CClient> Clients; // contains all clients
TClientMap ClientMap; // contains a quick access to the client using the udp address
// TCP server for clients
CCallbackServer *CallbackServer = NULL;
//
// Functions
//
string getDate()
{
struct tm *newtime;
time_t long_time;
time( &long_time );
newtime = localtime( &long_time );
if (newtime)
{
string res = toString("%02d", newtime->tm_year-100) + "_";
res += toString("%02d", newtime->tm_mon+1) + "_";
res += toString("%02d", newtime->tm_mday);
return res;
}
return "bad date "+toString( (uint32)long_time);
}
//
// Callbacks
//
void cbInit (CMessage &msgin, TSockId from, CCallbackNetBase &netbase)
{
uint64 session = (uint64)(uintptr_t) from;
string connectionName;
msgin.serial (connectionName);
try
{
uint32 version;
msgin.serial (version);
if (version != Version)
{
// bad client version, disconnect it
CallbackServer->disconnect (from);
return;
}
}
catch (const Exception &)
{
// bad client version, disconnect it
CallbackServer->disconnect (from);
return;
}
CMessage msgout ("INIT");
msgout.serial (session);
CallbackServer->send (msgout, from);
Clients.push_back(CClient(from, (uint32)session, connectionName));
nlinfo ("Added client TCP %s, session %x", from->asString().c_str(), session);
}
void cbDisconnect (TSockId from, void *arg)
{
for (list<CClient>::iterator it = Clients.begin(); it != Clients.end(); it++)
{
if ((*it).From == from)
{
// clear struct
(*it).updateFullStat();
nlinfo( "Removing client %s", (*it).Address.asString().c_str() );
ClientMap.erase ((*it).Address);
Clients.erase (it);
return;
}
}
}
//
// Callback Array
//
TCallbackItem CallbackArray[] =
{
{ "INIT", cbInit },
};
void CClient::updatePong (sint64 pingTime, sint64 pongTime, uint32 pongNumber, uint32 blockNumber)
{
// it means that it s a too old packet, discard it
if (blockNumber != BlockNumber)
return;
// add the pong in the array to detect lost, duplication
if (pongNumber >= PongReceived.size())
{
// if the array is too big, we flush actual data and restart all
updateFullStat ();
return;
}
PongReceived[pongNumber].first++;
if (PongReceived[pongNumber].first > 1)
{
NbDuplicated++;
}
else
{
// increase only for new pong
NbPong++;
MeanPongTime += (uint32)(pongTime-pingTime);
FullNbPong++;
FullMeanPongTime += (uint32)(pongTime-pingTime);
PongReceived[pongNumber].second = (uint16)(pongTime - pingTime);
}
if (pongNumber > LastPongReceived)
LastPongReceived = pongNumber;
// write each pong in a file
string ha = Address.hostName();
if (ha.empty())
{
ha = Address.ipAddress();
}
string fn = StatPathName + ConnectionName + "_" + ha + "_" + getDate() + ".pong";
FILE *fp = fopen (fn.c_str(), "rt");
if (fp == NULL)
{
// new file, add the header
FILE *fp = fopen (fn.c_str(), "wt");
if (fp != NULL)
{
fprintf (fp, "#%s\t%s\t%s\t%s\n", "PingTime", "PongTime", "Delta", "PingNumber");
fclose (fp);
}
}
else
{
fclose (fp);
}
fp = fopen (fn.c_str(), "at");
if (fp == NULL)
{
nlwarning ("Can't open pong file name '%s'", fn.c_str());
}
else
{
fprintf (fp, "%"NL_I64"d\t%"NL_I64"d\t%"NL_I64"d\t%d\n", pongTime, pingTime, (pongTime-pingTime), pongNumber);
fclose (fp);
}
}
void CClient::updateFullStat ()
{
uint32 NbLost = 0, NbDup = 0, NbPong = 0;
/* if (Address.hostName().empty())
{
// don't log because we receive no pong at all
return;
}*/
for (uint i = 0; i < LastPongReceived; i++)
{
if (PongReceived[i].first == 0) NbLost++;
else
{
NbPong++;
NbDup += PongReceived[i].first - 1;
}
}
{
// write each pong in a file
string ha = Address.hostName();
if (ha.empty())
{
ha = Address.ipAddress();
}
string fn = StatPathName + ConnectionName + "_" + ha + "_" + getDate() + ".stat";
string line = "Full Summary: ";
line += "NbPing " + toString(LastPongReceived) + " ";
line += "NbPong " + toString(NbPong) + " ";
line += "NbLost " + toString(NbLost) + " ";
if (LastPongReceived>0) line += "(" + toString((float)NbLost/LastPongReceived*100.0f) + "pc) ";
line += "NbDuplicated " + toString(NbDup) + " ";
if (LastPongReceived>0) line += "(" + toString((float)NbDup/LastPongReceived*100.0f) + "pc) ";
if (FullNbPong == 0)
line += "MeanPongTime <Undef> ";
else
line += "MeanPongTime " + toString(FullMeanPongTime/FullNbPong) + " ";
FILE *fp = fopen (fn.c_str(), "at");
if (fp == NULL)
{
nlwarning ("Can't open stat file name '%s'", fn.c_str());
}
else
{
fprintf (fp, "%s\n", line.c_str());
fclose (fp);
// send the full sumary to the client
CMessage msgout("INFO");
msgout.serial(line);
CallbackServer->send (msgout, From);
}
nlinfo (line.c_str());
}
{
// write each ping in a file
string ha = Address.hostName();
if (ha.empty())
{
ha = Address.ipAddress();
}
string fn = StatPathName + ConnectionName + "_" + ha + "_" + getDate() + ".ping";
FILE *fp = fopen (fn.c_str(), "rt");
if (fp == NULL)
{
// new file, add the header
FILE *fp = fopen (fn.c_str(), "wt");
if (fp != NULL)
{
fprintf (fp, "#%s\t%s\n", "NbPongRcv", "Delta");
fclose (fp);
}
}
else
{
fclose (fp);
}
fp = fopen (fn.c_str(), "at");
if (fp == NULL)
{
nlwarning ("Can't open ping file name '%s'", fn.c_str());
}
else
{
// add a fake value to know that it s a different session
fprintf (fp, "-1\t0\n");
for (uint i = 0; i < LastPongReceived; i++)
{
fprintf (fp, "%d\t%d\n", PongReceived[i].first, PongReceived[i].second);
}
fclose (fp);
}
}
// clear all structures
PongReceived.clear ();
PongReceived.resize (1001);
BlockNumber++;
NextPingNumber = LastPongReceived = 0;
FullMeanPongTime = FullNbPong = 0;
// NbPing = NbPong = MeanPongTime = NbDuplicated = 0;
}
void CClient::updateStat ()
{
// write each pong in a file
string ha = Address.hostName();
if (ha.empty())
{
ha = Address.ipAddress();
}
string fn = StatPathName + ConnectionName + "_" + ha + "_" + getDate() + ".stat";
string line;
line += "NbPing " + toString(NbPing) + " ";
line += "NbPong " + toString(NbPong) + " ";
if (NbPong == 0)
line += "MeanPongTime <Undef> ";
else
line += "MeanPongTime " + toString(MeanPongTime/NbPong) + " ";
line += "NbDuplicated " + toString(NbDuplicated) + " ";
FILE *fp = fopen (fn.c_str(), "at");
if (fp == NULL)
{
nlwarning ("Can't open stat file name '%s'", fn.c_str());
}
else
{
if (FirstWrite)
{
//nlassert (!Address.hostName().empty())
fprintf (fp, "HostAddress: %s\n", Address.asString().c_str());
FirstWrite = false;
}
fprintf (fp, "%s\n", line.c_str());
fclose (fp);
}
nlinfo (line.c_str());
CMessage msgout("INFO");
msgout.serial(line);
CallbackServer->send (msgout, From);
NbPing = NbPong = MeanPongTime = NbDuplicated = 0;
}
void updateStat ()
{
static sint64 lastUpdate = CTime::getLocalTime ();
if (CTime::getLocalTime() - lastUpdate < 2*1000)
return;
lastUpdate = CTime::getLocalTime();
// update stat only at the linked UDP-TCP connection
for (TClientMap::iterator it = ClientMap.begin (); it != ClientMap.end(); it++)
{
GETCLIENTA(it)->updateStat ();
}
}
//
// Functions
//
void removeClientByAddr( TClientMap::iterator iclient )
{
if ( iclient == ClientMap.end() )
{
// It may have already been removed on purpose
return;
}
for (list<CClient>::iterator it = Clients.begin(); it != Clients.end(); it++)
{
if ((*it).Address == (*iclient).first)
{
(*it).updateFullStat();
nlinfo( "Removing client %s", GETCLIENTA(iclient)->Address.asString().c_str() );
Clients.erase(it);
break;
}
}
ClientMap.erase( iclient );
}
void handleReceivedPong (CClient *client, sint64 pongTime)
{
// Preconditions
nlassert( CurrentInMsg && (! CurrentInMsg->data().empty()) );
// Prepare message to read
CMemStream msgin( true );
uint32 currentsize = CurrentInMsg->userSize();
memcpy (msgin.bufferToFill (currentsize), CurrentInMsg->userDataR(), currentsize);
// Read the header
uint8 mode = 0;
msgin.serial (mode);
if (mode == 0)
{
// init the UDP connection
if (client == NULL)
{
uint32 session = 0;
msgin.serial (session);
// Find a new udp connection, find the linked
list<CClient>::iterator it;
for (it = Clients.begin(); it != Clients.end(); it++)
{
if ((*it).Session == session)
{
client = &(*it);
// Found it, add in the map
client->Address = CurrentInMsg->AddrFrom;
ClientMap.insert (make_pair (client->Address, client));
nlinfo ("TCP-UDP linked TCP is %s, UDP is %s", client->From->asString().c_str(), client->Address.asString().c_str());
// Send a TCP message to the client to say that we can start
CMessage msgout ("START");
CallbackServer->send (msgout, client->From);
break;
}
}
if (it == Clients.end())
{
nlwarning ("Unknown TCP client, discard the UDP message (hacker?)");
return;
}
}
return;
}
else if (mode == 1)
{
if (client == NULL)
{
nlwarning ("Received a UDP packet from an old client (hacker?)");
return;
}
// Read the message
sint64 pingTime = 0;
msgin.serial(pingTime);
uint32 pongNumber = 0;
msgin.serial(pongNumber);
uint32 blockNumber = 0;
msgin.serial(blockNumber);
// nlinfo ("receive a pong from %s pongnb %d %"NL_I64"d", CurrentInMsg->AddrFrom.asString().c_str(), pongNumber, pongTime - pingTime);
client->updatePong (pingTime, pongTime, pongNumber, blockNumber);
}
}
void sendPing ()
{
CMemStream msgout;
for (TClientMap::iterator it = ClientMap.begin (); it != ClientMap.end(); it++)
{
msgout.clear();
sint64 t = CTime::getLocalTime ();
msgout.serial (t);
uint32 p = GETCLIENTA(it)->NextPingNumber;
msgout.serial (p);
uint32 b = GETCLIENTA(it)->BlockNumber;
msgout.serial (b);
uint8 dummy=0;
while (msgout.length() < 200)
msgout.serial (dummy);
uint32 size = msgout.length();
nlassert (size == 200);
try
{
// send the new ping to the client
ReceiveTask->DataSock->sendTo (msgout.buffer(), size, GETCLIENTA(it)->Address);
}
catch (const Exception &e)
{
nlwarning ("Can't send UDP packet to '%s' (%s)", GETCLIENTA(it)->Address.asString().c_str(), e.what());
}
GETCLIENTA(it)->NextPingNumber++;
GETCLIENTA(it)->NbPing++;
}
}
//
// Main Class
//
class CBenchService : public IService
{
public:
void init()
{
nlassert( ReceiveTask==NULL && ReceiveThread==NULL );
// Create stat folder if necessary
if (!CFile::isExists (StatPathName))
{
mkdir (StatPathName.c_str());
}
// Create and start UDP server
nlinfo( "Starting external UDP socket on port %d", UDPPort);
ReceiveTask = new CReceiveTask (UDPPort, MaxUDPPacketSize);
CurrentReadQueue = &Queue2;
ReceiveTask->setWriteQueue( &Queue1 );
nlassert( ReceiveTask != NULL );
ReceiveThread = IThread::create( ReceiveTask );
nlassert( ReceiveThread != NULL );
ReceiveThread->start();
// Setup current message placeholder
CurrentInMsg = new TReceivedMessage();
// Create the TCP server
nlinfo( "Starting external TCP socket on port %d", TCPPort);
CallbackServer = new CCallbackServer;
CallbackServer->addCallbackArray (CallbackArray, sizeof(CallbackArray)/sizeof(CallbackArray[0]));
CallbackServer->init (TCPPort);
CallbackServer->setDisconnectionCallback (cbDisconnect, NULL);
}
bool update ()
{
try
{
// Send ping to every client
sendPing();
// Update and manage TCP connections
CallbackServer->update ();
// Swap queues
if ( CurrentReadQueue == &Queue1 )
{
CurrentReadQueue = &Queue2;
ReceiveTask->setWriteQueue( &Queue1 );
}
else
{
CurrentReadQueue = &Queue1;
ReceiveTask->setWriteQueue( &Queue2 );
}
// Update and manage UDP connections
while ( ! CurrentReadQueue->empty() )
{
sint64 pongTime;
// Get a UDP message
CurrentReadQueue->front( CurrentInMsg->data() );
CurrentReadQueue->pop();
nlassert( ! CurrentReadQueue->empty() );
CurrentReadQueue->front( CurrentInMsg->VAddrFrom );
CurrentReadQueue->pop();
CurrentInMsg->vectorToAddress();
pongTime = CurrentInMsg->getDate ();
// Handle the UDP message
// Retrieve client info or add one
TClientMap::iterator ihm = ClientMap.find( CurrentInMsg->AddrFrom );
if ( ihm == ClientMap.end() )
{
if ( CurrentInMsg->eventType() == TReceivedMessage::User )
{
// Handle message for a new client
handleReceivedPong( NULL, pongTime );
}
else
{
nlinfo( "Not removing already removed client" );
}
}
else
{
// Already existing
if ( CurrentInMsg->eventType() == TReceivedMessage::RemoveClient )
{
// Remove client
removeClientByAddr( ihm );
}
else
{
// Handle message
handleReceivedPong( GETCLIENTA(ihm), pongTime );
}
}
updateStat ();
}
}
catch (const Exception &e)
{
nlerrornoex ("Exception not catched: '%s'", e.what());
}
return true;
}
void release ()
{
nlassert( ReceiveTask != NULL );
nlassert( ReceiveThread != NULL );
ReceiveTask->requireExit();
ReceiveTask->DataSock->close();
ReceiveThread->wait();
if (ReceiveThread != NULL)
{
delete ReceiveThread;
ReceiveThread = NULL;
}
if (ReceiveTask != NULL)
{
delete ReceiveTask;
ReceiveTask = NULL;
}
if (CurrentInMsg != NULL)
{
delete CurrentInMsg;
CurrentInMsg = NULL;
}
if (CallbackServer != NULL)
{
delete CallbackServer;
CallbackServer = NULL;
}
}
};
NLNET_SERVICE_MAIN (CBenchService, "BS", "bench_service", 45459, EmptyCallbackArray, UDP_DIR, "")