// Ryzom - MMORPG Framework
// Copyright (C) 2010 Winch Gate Property Limited
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
#include "stdpch.h"
#include
#include
#include
#include "impulse_encoder.h"
#include "client_host.h"
#include "game_share/action_factory.h"
#include "game_share/tick_event_handler.h"
using namespace CLFECOMMON;
using namespace NLMISC;
TImpulseReceivedCallback CImpulseEncoder::_Callbacks[256];
uint MaxImpulseBitSizes [3];
void cbImpulsionByteSize0Changed( IVariable& var );
void cbImpulsionByteSize1Changed( IVariable& var );
void cbImpulsionByteSize2Changed( IVariable& var );
CVariable ImpulsionByteSize0( "fe", "ImpulsionByteSize0", "Size of impulsion channel 0", 20, 0, true, cbImpulsionByteSize0Changed, true );
CVariable ImpulsionByteSize1( "fe", "ImpulsionByteSize1", "Size of impulsion channel 1", 200, 0, true, cbImpulsionByteSize1Changed, true );
CVariable ImpulsionByteSize2( "fe", "ImpulsionByteSize2", "Size of impulsion channel 0", 200, 0, true, cbImpulsionByteSize2Changed, true );
void cbImpulsionByteSize0Changed( IVariable& var )
{
// -IMPULSE_ACTION_HEADER_BITSIZE not needed, as actions know their own size
// and handle it right
MaxImpulseBitSizes[0] = (ImpulsionByteSize0.get())*8; // - IMPULSE_ACTION_HEADER_BITSIZE;
}
void cbImpulsionByteSize1Changed( IVariable& var )
{
MaxImpulseBitSizes[1] = (ImpulsionByteSize1.get())*8; // - IMPULSE_ACTION_HEADER_BITSIZE;
}
void cbImpulsionByteSize2Changed( IVariable& var )
{
MaxImpulseBitSizes[2] = (ImpulsionByteSize2.get())*8; // - IMPULSE_ACTION_HEADER_BITSIZE;
}
bool verboseImpulsions = false;
//
uint32 CImpulseQueue::send( TActionQueue& sourceQueue, uint32 packet, NLMISC::CBitMemStream &outbox, uint32 &sentActions)
{
bool wasFlushed = false;
{
H_AUTO(ImpulseQueueSend1);
// If the current sending impulse queue is empty (flushed), fill it with actions from the source queue
if ( Queue.empty() )
{
wasFlushed = true;
FirstSent = packet;
uint actSize;
while ( (!sourceQueue.empty()) && // stop if there are no more actions in the source queue
(
// stop if the max size is reached
(TotalBitsInImpulseQueue + (actSize=CActionFactory::getInstance()->size(sourceQueue.front())) < MaxBitSize) ||
// continue if the action can exceed the total size (only one!)
(sourceQueue.front()->AllowExceedingMaxSize && Queue.empty())
)
)
{
// Move action from the source queue to the current sending impulse queue
Queue.push_back( sourceQueue.front() );
//nlinfo("add action '%d' size '%d' in queue %d:%d", sourceQueue.front()->Code, actSize, Level, Channel);
TotalBitsInImpulseQueue += actSize;
sourceQueue.pop_front();
}
}
}
uint i;
bool impulsionFollowBit = true;
uint32 serialised = 0;
{
H_AUTO(ImpulseQueueSend2);
// Send all the current sending impulse queue
for ( i=0; ipack( Queue[i], outbox, false );
#ifdef NL_DEBUG
if ( verboseImpulsions )
nldebug( "Sending effectively impulsion %p (len=%u)", Queue[i], CActionFactory::getInstance()->size(Queue[i]) );
#endif
serialised += (1+CActionFactory::getInstance()->size(Queue[i]));
++sentActions;
}
if ( ! Queue.empty() )
LOG_IMPULSION_DEBUG("FEIMP: %s %d actions (%u bits) to send on Channel %d at Level %d", (wasFlushed) ? "sent new" : "resent", Queue.size(), serialised+1, Channel, Level);
}
{
H_AUTO(ImpulseQueueSend3);
bool impulsionEndBit = false;
outbox.serialBitAndLog(impulsionEndBit);
}
//nlinfo("serialised %d bits", TotalBitsInImpulseQueue);
return serialised+1;
}
//
void CImpulseQueue::flush(uint packet, CClientHost *client, std::vector &impcounts)
{
if (packet < FirstSent)
return;
// LOG_IMPULSION_DEBUG("FEIMP: flush %d actions from Channel %d at Level %d", QueueMark, Channel, Level);
// Call acknowledge callback for each action in the impulse queue, then clear the queue
for ( uint i=0; iGameCycle );
#endif
// call associated callback
if (CImpulseEncoder::_Callbacks[action->Code] != NULL)
CImpulseEncoder::_Callbacks[action->Code] (client, action);
--impcounts[action->Slot];
CActionFactory::getInstance()->remove(action);
}
Queue.clear();
// stat part
countEffectiveSent(TotalBitsInImpulseQueue);
TotalBitsInImpulseQueue = 0;
++FlushTimes;
}
// Get Current Effective Send Rate (based on flush rate) (in bits per second)
uint CImpulseQueue::effectiveSendRate()
{
flushSendRateQueue(CTime::getLocalTime());
uint totalBitSent = 0;
uint i;
for (i=0; iGameCycle = CTickEventHandler::getGameCycle();
#endif
++_QueuedImpulses[action->Slot];
countAddedAction(action, level);
/*
CImpulseQueue *table = NULL;
switch (level)
{
case 0: table = _Level0; break;
case 1: table = _Level1; break;
case 2: table = _Level2; break;
}
if (channel == -1)
{
uint i, min = -1;
channel = 0;
uint nbQueues = (uint)(1<= 0);
nlassertex(channel < (1<Code, level);
}
//
uint32 CImpulseEncoder::send(uint32 packet, NLMISC::CBitMemStream &outbox, uint32 &sentActions)
{
//LOG_IMPULSION_DEBUG("FEIMP: send() packet %d", packet);
uint32 serialised = 0;
sentActions = 0;
// Send one queue for each level
serialised += _Level0[0].send( _MainQueues[0], packet, outbox, sentActions );
serialised += _Level1[packet&1].send( _MainQueues[1], packet, outbox, sentActions );
serialised += _Level2[packet&3].send( _MainQueues[2], packet, outbox, sentActions );
++_TotalPackets;
return serialised;
}
//
void CImpulseEncoder::ack(uint32 packet)
{
//LOG_IMPULSION_DEBUG("FEIMP: ack() packet %d", packet);
_Level0[0].flush(packet, _ClientHost, _QueuedImpulses);
_Level1[packet&1].flush(packet, _ClientHost, _QueuedImpulses);
_Level2[packet&3].flush(packet, _ClientHost, _QueuedImpulses);
}
//
uint CImpulseEncoder::queueSize(uint level) const
{
nlassert(level < 3);
uint channel,
maxChannel = (1<size(queue[i]);
return bitSize;
}
//
void CImpulseEncoder::reset()
{
_MainQueues[0].clear();
_MainQueues[1].clear();
_MainQueues[2].clear();
uint i;
for (i=0; i<1; ++i) _Level0[i].reset();
for (i=0; i<2; ++i) _Level1[i].reset();
for (i=0; i<4; ++i) _Level2[i].reset();
}
//
void CImpulseEncoder::unmarkAll()
{
uint i;
for (i=0; i<1; ++i) _Level0[i].reinit();
for (i=0; i<2; ++i) _Level1[i].reinit();
for (i=0; i<4; ++i) _Level2[i].reinit();
}
//
void CImpulseEncoder::setReceivedCallback(TImpulseReceivedCallback cb, sint actionCode)
{
if (actionCode == -1)
{
uint i;
for (i=0; i<256; ++i)
_Callbacks[i] = cb;
}
else
{
nlassert(actionCode >= 0);
nlassert(actionCode < 256);
_Callbacks[actionCode] = cb;
}
}
//
void CImpulseEncoder::removeEntityReferences(CLFECOMMON::TCLEntityId id)
{
if (hasEntityReferences(id))
{
TActionQueue::iterator it;
for (it=_MainQueues[0].begin(); it<_MainQueues[0].end(); )
if ((*it)->Slot == id)
CLFECOMMON::CActionFactory::getInstance()->remove(*it), it = _MainQueues[0].erase(it);
else
++it;
for (it=_MainQueues[1].begin(); it<_MainQueues[1].end(); )
if ((*it)->Slot == id)
CLFECOMMON::CActionFactory::getInstance()->remove(*it), it = _MainQueues[1].erase(it);
else
++it;
for (it=_MainQueues[2].begin(); it<_MainQueues[2].end(); )
if ((*it)->Slot == id)
CLFECOMMON::CActionFactory::getInstance()->remove(*it), it = _MainQueues[2].erase(it);
else
++it;
uint i;
for (i=0; i<1; ++i) _Level0[i].removeReferences(id);
for (i=0; i<2; ++i) _Level1[i].removeReferences(id);
for (i=0; i<4; ++i) _Level2[i].removeReferences(id);
_QueuedImpulses[id] = 0;
}
}
// Dump stats to XML stream
void CImpulseEncoder::dump(NLMISC::IStream& s)
{
s.xmlPushBegin("ImpulseStat");
s.xmlSetAttrib("id");
uint32 id = _ClientHost->clientId();
s.serial(id);
s.xmlSetAttrib("eid");
CEntityId eid = _ClientHost->eId();
s.serial(eid);
s.xmlSetAttrib("totalPacketSent");
s.serial(_TotalPackets);
s.xmlPushEnd();
s.xmlPushBegin("Level0");
s.xmlSetAttrib("efficiency");
float efficiency = efficiencyRatio(0);
s.serial(efficiency);
s.xmlSetAttrib("stillEnqueued");
uint32 enqueuedSize = getEnqueuedSize(0);
s.serial(enqueuedSize);
s.xmlSetAttrib("addRate");
uint32 addRate = effectiveAddRate(0);
s.serial(addRate);
s.xmlPushEnd();
_Level0[0].dump(s);
s.xmlPop();
s.xmlPushBegin("Level1");
s.xmlSetAttrib("efficiency");
efficiency = efficiencyRatio(1);
s.serial(efficiency);
s.xmlSetAttrib("stillEnqueued");
enqueuedSize = getEnqueuedSize(1);
s.serial(enqueuedSize);
s.xmlSetAttrib("addRate");
addRate = effectiveAddRate(1);
s.serial(addRate);
s.xmlPushEnd();
_Level1[0].dump(s);
_Level1[1].dump(s);
s.xmlPop();
s.xmlPushBegin("Level2");
s.xmlSetAttrib("efficiency");
efficiency = efficiencyRatio(2);
s.serial(efficiency);
s.xmlSetAttrib("stillEnqueued");
enqueuedSize = getEnqueuedSize(2);
s.serial(enqueuedSize);
s.xmlSetAttrib("addRate");
addRate = effectiveAddRate(2);
s.serial(addRate);
s.xmlPushEnd();
_Level2[0].dump(s);
_Level2[1].dump(s);
_Level2[2].dump(s);
_Level2[3].dump(s);
s.xmlPop();
s.xmlPop();
}
// Count Added action
void CImpulseEncoder::countAddedAction(CLFECOMMON::CActionImpulsion *action, uint level)
{
TSendRateQueue& queue = _AddRateQueues[level];
NLMISC::TTime ctime = NLMISC::CTime::getLocalTime();
while (!queue.empty() && ctime > queue.front().Time+(IMPULSE_STAT_MEAN_TIME*1000))
queue.pop_front();
queue.push_back(CTimedSize(ctime, CActionFactory::getInstance()->size(action)));
}
// Effective Add Rate
uint CImpulseEncoder::effectiveAddRate(uint level)
{
TSendRateQueue& queue = _AddRateQueues[level];
NLMISC::TTime ctime = NLMISC::CTime::getLocalTime();
while (!queue.empty() && ctime > queue.front().Time+(IMPULSE_STAT_MEAN_TIME*1000))
queue.pop_front();
uint totalBitAdded = 0;
uint i;
for (i=0; i 0 && l0 < less) less = l0;
if (l1 > 0 && l1 < less) less = l1;
if (l2 > 0 && l2 < less) less = l2;
return less;
}
// Get biggest queue size
uint CImpulseEncoder::biggestQueueSize()
{
uint l0 = getEnqueuedSize(0);
uint l1 = getEnqueuedSize(1);
uint l2 = getEnqueuedSize(2);
uint more = 0;
if (l0 > more) more = l0;
if (l1 > more) more = l1;
if (l2 > more) more = l2;
return more;
}
//
void CImpulseQueue::dump(NLMISC::IStream& s)
{
s.xmlPushBegin("QueueStat");
s.xmlSetAttrib("channel");
s.serial(Channel);
s.xmlSetAttrib("maxBitSize");
s.serial(MaxBitSize);
s.xmlSetAttrib("sendRate");
uint32 sendRate = effectiveSendRate();
s.serial(sendRate);
s.xmlSetAttrib("sentData");
uint64 sentData = totalSentData();
s.serial(sentData);
s.xmlSetAttrib("effectiveSend");
uint32 effectiveSend = FlushTimes;
s.serial(effectiveSend);
s.xmlPushEnd();
s.xmlPush("SendQueue");
for (uint i=0; iremove( *it );
}
}
/*
* Destructor
*/
CImpulseEncoder::~CImpulseEncoder()
{
for ( uint i=0; i!=3; ++i )
{
TActionQueue& queue = _MainQueues[i];
for ( TActionQueue::iterator it=queue.begin(); it!=queue.end(); ++it )
{
CActionFactory::getInstance()->remove( *it );
}
}
}