// NeLNS - MMORPG Framework
// Copyright (C) 2010 Winch Gate Property Limited
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif // HAVE_CONFIG_H
#ifndef NELNS_CONFIG
#define NELNS_CONFIG ""
#endif // NELNS_CONFIG
#ifndef NELNS_LOGS
#define NELNS_LOGS ""
#endif // NELNS_LOGS
#include "nel/misc/types_nl.h"
#include
#include
#ifdef NL_OS_WINDOWS
#include
#include
typedef unsigned long ulong;
#endif
#include
#include
#include "nel/misc/debug.h"
#include "nel/misc/config_file.h"
#include "nel/misc/path.h"
#include "nel/misc/command.h"
#include "nel/net/service.h"
#include "nel/net/varpath.h"
#include "nel/net/email.h"
#include "connection_web.h"
//
// Namespaces
//
using namespace std;
using namespace NLMISC;
using namespace NLNET;
//
// NeL Variables (for config file, etc)
//
// AllowExplicitAESRegistration is meant to be used together with the AES-side
// UseExplicitAESRegistration flag. The AS / AES registration process works as follows:
// - the AES creates a layer 5 connection to the AS
// - the AS gets a serviceUp callback and looks in the database to try to find a match for the AES
// - if the match fails then the AS sends a reject message to the AES
// - when the AES receives the reject message it checks its UseExplicitAESRegistration flag - if it is set, the AES
//   attempts an explicit registration, sending the info required by the AS that would normally come from the database
// - when the AS receives an explicit registration, it checks the AllowExplicitAESRegistration flag
//   and completes the registration work that failed earlier because the database lookup found nothing
CVariable AllowExplicitAESRegistration("as","AllowExplicitAESRegistration","flag to allow AES services to register explicitly",false,0,true);
// DontUseDataBase allows one to launch an AS on a machine that doesn't have a database set up.
// The functionality of the AS is reduced (particularly with respect to alarms and graphs, which are configured via the database):
// every SQL helper in this file returns immediately when the flag is set.
CVariable DontUseDataBase("as","DontUseDataBase","if this flag is set calls to the database will be ignored",false,0,true);
//
// Structures
//
/// One pending view/command request, together with the answers collected so far.
///
/// Answers are stored either in \c Array (variable answers) or in \c Log (answers to a
/// command, flagged by the "__log" pseudo variable). \c Array holds one entry per variable;
/// each entry holds \c NbLines strings, the first one being the variable name itself.
struct CRequest
{
	/// \param id   unique id of the request
	/// \param from socket of the client that asked for it (may be NULL for console requests)
	CRequest (uint32 id, TSockId from) : Id(id), NbWaiting(0), NbReceived(0), From(from), NbRow(0), NbLines(1)
	{
		Time = CTime::getSecondsSince1970 ();
	}

	uint32		Id;				///< unique id of the request
	uint		NbWaiting;		///< number of AES the request was sent to
	uint32		NbReceived;		///< number of AES that have answered
	TSockId		From;			///< who asked for the request
	uint32		Time;			///< when the request was asked (seconds since 1970)
	uint32		NbRow;			///< number of variables, i.e. Array.size()
	uint32		NbLines;		///< number of strings stored for each variable (name included)

	vector<vector<string> >	Array;	///< 2 dimensional array that will be sent to the php for variables
	vector<string>			Log;	///< answer if a command was asked; otherwise Array contains the results

	/// Return the index of \a variable in Array, creating a new (blank) entry when it is not known yet.
	uint32 getVariable(const string &variable)
	{
		for (uint32 i = 0; i < NbRow; i++)
			if (Array[i][0] == variable)
				return i;

		// need to add the variable; slot 0 holds its name, the other slots stay empty
		vector<string> NewRow (NbLines);
		NewRow[0] = variable;
		Array.push_back (NewRow);
		return NbRow++;
	}

	/// Append one empty slot to every variable so that a new answer line can be filled in.
	void addLine ()
	{
		for (uint32 i = 0; i < NbRow; i++)
			Array[i].push_back("");
		NbLines++;
	}

	/// Dump the collected answers (the array, or the log when one was received) to the info log.
	void display ()
	{
		if (Log.empty())
		{
			// Id, NbRow and NbLines are unsigned, so print them with %u
			nlinfo ("Display answer array for request %u: %u row %u lines", Id, NbRow, NbLines);
			// the invariant does not change while displaying, so check it once instead of once per cell
			nlassert (Array.size () == NbRow);
			for (uint i = 0; i < NbLines; i++)
			{
				for (uint j = 0; j < NbRow; j++)
				{
					InfoLog->displayRaw ("%-20s", Array[j][i].c_str());
				}
				InfoLog->displayRawNL ("");
			}
			InfoLog->displayRawNL ("End of the array");
		}
		else
		{
			// size() is a size_t: convert explicitly so the variadic argument matches the format
			nlinfo ("Display the log for request %u: %u lines", Id, (uint)Log.size());
			for (uint i = 0; i < Log.size(); i++)
			{
				InfoLog->displayRaw ("%s", Log[i].c_str());
			}
			InfoLog->displayRawNL ("End of the log");
		}
	}
};
/// Description of one admin executor service (AES) currently registered on this AS.
struct CAdminExecutorService
{
	CAdminExecutorService (const string &shard, const string &name, TServiceId sid)
		: Shard(shard),
		  SId(sid),
		  Name(name)
	{
	}

	/// Name of the shard
	string			Shard;
	/// unique number to identify the AES
	TServiceId		SId;
	/// name of the admin executor service
	string			Name;
	/// ids of all requests the AES hasn't replied to yet
	vector<uint32>	WaitingRequestId;
};
// Container holding every registered AES, and the iterator type used to walk it.
typedef list TAdminExecutorServices;
typedef list::iterator AESIT;
//
// Variables
//
// All AES currently registered on this AS (filled and emptied by the connection callbacks).
TAdminExecutorServices AdminExecutorServices;
// MySQL connection handle; assigned by sqlInit(), which leaves it NULL when DontUseDataBase is set.
MYSQL *DatabaseConnection = NULL;
// Requests that have been issued and not yet answered to their client.
vector Requests;
// Delay after which cleanRequest() gives up waiting for AES answers; overwritten by varRequestTimeout().
uint32 RequestTimeout = 5; // in seconds
// Alerts are accumulated for this many seconds before being sent (5 by default);
// sendAdminAlert() only logs a warning instead of queueing an email when it is -1.
sint32 AdminAlertAccumlationTime = 5;
//
// Functions
//
/// Look up a registered AES by service id.
/// \param sid  service id to search for
/// \param asrt when true, assert that the AES exists
/// \return iterator on the AES, or AdminExecutorServices.end() when it is not registered
AESIT findAES (TServiceId sid, bool asrt = true)
{
	AESIT found = AdminExecutorServices.begin();
	while (found != AdminExecutorServices.end() && !(found->SId == sid))
		++found;

	if (asrt)
		nlassert (found != AdminExecutorServices.end());

	return found;
}
/// Look up a registered AES by server name.
/// \param name server name to search for
/// \param asrt when true, assert that the AES exists
/// \return iterator on the AES, or AdminExecutorServices.end() when it is not registered
AESIT findAES (const string &name, bool asrt = true)
{
	AESIT found = AdminExecutorServices.begin();
	while (found != AdminExecutorServices.end() && !(found->Name == name))
		++found;

	if (asrt)
		nlassert (found != AdminExecutorServices.end());

	return found;
}
//
// SQL helpers
//
/// Result set of the last successful sqlQuery(); NULL when nothing is pending.
/// It is walked with sqlNextRow() and released with sqlFlushResult().
MYSQL_RES *sqlCurrentQueryResult = NULL;

/// Run a printf-style formatted query and return its first row.
///
/// The formatted query is limited to the 1024 byte buffer handed to NLMISC_CONVERT_VARGS.
/// \return the first row of the result, or NULL when the database is disabled, not connected,
///         the query failed, or the result is empty. The caller must call sqlFlushResult()
///         once it is done with the rows.
MYSQL_ROW sqlQuery (const char *format, ...)
{
	if (DontUseDataBase)
		return 0;

	char *query;
	NLMISC_CONVERT_VARGS (query, format, 1024);

	if (DatabaseConnection == 0)
	{
		nlwarning ("MYSQL: mysql_query (%s) failed: DatabaseConnection is 0", query);
		return NULL;
	}

	// a result that was never flushed would be leaked as soon as the global is overwritten below
	if (sqlCurrentQueryResult != NULL)
	{
		mysql_free_result (sqlCurrentQueryResult);
		sqlCurrentQueryResult = NULL;
	}

	int ret = mysql_query (DatabaseConnection, query);
	if (ret != 0)
	{
		nlwarning ("MYSQL: mysql_query () failed for query '%s': %s", query, mysql_error(DatabaseConnection));
		return 0;
	}

	sqlCurrentQueryResult = mysql_store_result(DatabaseConnection);
	if (sqlCurrentQueryResult == 0)
	{
		nlwarning ("MYSQL: mysql_store_result () failed for query '%s': %s", query, mysql_error(DatabaseConnection));
		return 0;
	}

	MYSQL_ROW row = mysql_fetch_row(sqlCurrentQueryResult);
	if (row == 0)
	{
		nlwarning ("MYSQL: mysql_fetch_row () failed for query '%s': %s", query, mysql_error(DatabaseConnection));
	}

	// mysql_num_rows() returns a 64 bit count: convert it so the variadic argument matches the format
	nldebug ("MYSQL: sqlQuery(%s) returns %u rows", query, (uint)mysql_num_rows(sqlCurrentQueryResult));

	return row;
}
/// Fetch the next row of the result produced by the last sqlQuery().
/// \return the row, or NULL when the database is disabled, no result is pending,
///         or mysql_fetch_row() has nothing more to return
MYSQL_ROW sqlNextRow ()
{
	const bool resultAvailable = !DontUseDataBase && sqlCurrentQueryResult != 0;
	return resultAvailable ? mysql_fetch_row(sqlCurrentQueryResult) : 0;
}
/// Release the result set kept by the last sqlQuery(), if there is one.
void sqlFlushResult()
{
	if (!DontUseDataBase && sqlCurrentQueryResult != NULL)
	{
		mysql_free_result(sqlCurrentQueryResult);
		sqlCurrentQueryResult = NULL;
	}
}
//
// Admin functions
//
/// Alert texts accumulated since the last mail was sent, one alert per line.
string Email;
/// Time (seconds since 1970) at which the first alert of the current batch was queued; 0 when no batch is open.
uint32 FirstEmailTime = 0;

/// Queue a printf-style formatted alert for the administrators.
/// When AdminAlertAccumlationTime is -1 the alert is only logged as a warning.
void sendAdminAlert (const char *format, ...)
{
	char *text;
	NLMISC_CONVERT_VARGS (text, format, 4096);

	if (AdminAlertAccumlationTime == -1)
	{
		// we don't send email so just display a warning
		nlwarning ("ALERT: %s", text);
		return;
	}

	const bool opensBatch = Email.empty() && FirstEmailTime == 0;
	if (opensBatch)
	{
		// first alert of a batch: remember when the accumulation started
		FirstEmailTime = CTime::getSecondsSince1970();
	}
	else
	{
		// following alerts go on their own line
		Email += "\n";
	}
	Email += text;

	nldebug ("ALERT: pushing email into queue: %s", text);
}
void updateSendAdminAlert ()
{
if(!Email.empty() && FirstEmailTime != 0 && AdminAlertAccumlationTime >=0 && CTime::getSecondsSince1970() > FirstEmailTime + AdminAlertAccumlationTime)
{
vector lines;
explode (Email, string("\n"), lines, true);
if (!lines.empty())
{
if (IService::getInstance()->ConfigFile.exists("SysLogPath") && IService::getInstance()->ConfigFile.exists("SysLogParams"))
{
// syslog
string param;
if (lines.size() > 1)
{
param = "Multiple problems, first is: ";
}
param += lines[0];
string res = toString(IService::getInstance()->ConfigFile.getVar("SysLogParams").asString().c_str(), param.c_str());
launchProgram(IService::getInstance()->ConfigFile.getVar("SysLogPath").asString(), res);
}
if (IService::getInstance()->ConfigFile.exists("AdminEmail"))
{
// email
string subject;
if (lines.size() == 1)
{
subject = lines[0];
}
else
{
subject = "Multiple problems";
}
std::string from;
if(IService::getInstance()->ConfigFile.exists("AdminEmailFrom"))
from = IService::getInstance()->ConfigFile.getVar("AdminEmailFrom").asString();
CConfigFile::CVar &var = IService::getInstance()->ConfigFile.getVar("AdminEmail");
for (uint i = 0; i < var.size(); i++)
{
if (!sendEmail ("", from, var.asString(i), subject, Email))
{
nlwarning ("Can't send email to '%s'", var.asString(i).c_str());
}
else
{
nlinfo ("ALERT: Sent email to admin %s the subject: %s", var.asString(i).c_str(), subject.c_str());
}
}
}
}
Email = "";
FirstEmailTime = 0;
}
}
/// Callback for the "ADMIN_EMAIL" message: queue the alert text sent by a service.
static void cbAdminEmail (CMessage &msgin, const std::string &serviceName, TServiceId sid)
{
	string str;
	msgin.serial(str);
	// the text comes from the network: never use it as the printf format itself
	sendAdminAlert ("%s", str.c_str());
}
/// Callback for the "GRAPH_UPDATE" message: feed the values sent by an AES into rrd files.
///
/// The message holds a timestamp followed by (service, variable, value) triplets. One rrd
/// file per shard/server/service/variable is updated through the RRDToolPath program; a
/// missing file is created first, using the update frequency stored in the database.
static void cbGraphUpdate (CMessage &msgin, const std::string &serviceName, TServiceId sid)
{
	uint32 CurrentTime;
	msgin.serial (CurrentTime);

	// the sender is the same for the whole message: look it up once, and don't assert on
	// a message coming from an AES that is not (or no longer) registered
	AESIT aesit = findAES (sid, false);
	if (aesit == AdminExecutorServices.end())
	{
		nlwarning ("Ignoring graph update from %s-%hu that is not in my list", serviceName.c_str(), sid.get());
		return;
	}
	const string shard = aesit->Shard;
	const string server = aesit->Name;

	while (msgin.getPos() < (sint32)msgin.length())
	{
		string var, service;
		sint32 val;
		msgin.serial (service, var, val);

		if (shard.empty() || server.empty() || service.empty() || var.empty())
		{
			nlwarning ("Shard server service var val is empty");
			continue;
		}

		string path = CPath::standardizePath (IService::getInstance()->ConfigFile.getVar("RRDVarPath").asString());
		string rrdfilename = path + shard+"."+server+"."+service+"."+var+".rrd";
		string arg;

		if (!NLMISC::CFile::fileExists(rrdfilename))
		{
			// NOTE(review): 'var' is received from the AES and is spliced unescaped into the query
			// and into the rrdtool arguments - confirm AES senders are trusted or sanitize it
			MYSQL_ROW row = sqlQuery ("select graph_update from variable where path like '%%%s' and graph_update!=0", var.c_str());
			if (row != NULL)
			{
				uint32 freq = atoi(row[0]);
				arg = "create "+rrdfilename+" --step "+toString(freq)+" DS:var:GAUGE:"+toString(freq*2)+":U:U RRA:AVERAGE:0.5:1:1000 RRA:AVERAGE:0.5:10:1000 RRA:AVERAGE:0.5:100:1000";
				launchProgram(IService::getInstance()->ConfigFile.getVar("RRDToolPath").asString(), arg);
			}
			else
			{
				nlwarning ("Can't create the rrd because no graph_update in database");
			}
			sqlFlushResult();
		}

		arg = "update " + rrdfilename + " " + toString (CurrentTime) + ":" + toString(val);
		launchProgram(IService::getInstance()->ConfigFile.getVar("RRDToolPath").asString(), arg);
	}
}
//
// Request functions
//
/// Register a new pending request coming from \a from and return its unique id.
uint32 newRequest (TSockId from)
{
	static uint32 NextId = 5461231;

	const uint32 id = NextId++;
	Requests.push_back (CRequest(id, from));
	return id;
}
/// Note that one more AES has to answer the request \a rid, and restart its timeout.
void addRequestWaitingNb (uint32 rid)
{
	for (vector<CRequest>::iterator it = Requests.begin (); it != Requests.end (); ++it)
	{
		if (it->Id != rid)
			continue;

		it->NbWaiting++;
		it->Time = CTime::getSecondsSince1970 ();
		return;
	}
	nlwarning ("REQUEST: Received an answer from an unknown resquest %d (perhaps due to a AS timeout)", rid);
}
/// Note that one AES less has to answer the request \a rid.
void subRequestWaitingNb (uint32 rid)
{
	for (vector<CRequest>::iterator it = Requests.begin (); it != Requests.end (); ++it)
	{
		if (it->Id != rid)
			continue;

		it->NbWaiting--;
		return;
	}
	nlwarning ("REQUEST: Received an answer from an unknown resquest %d (perhaps due to a AS timeout)", rid);
}
/// Note that one more AES has answered the request \a rid.
void addRequestReceived (uint32 rid)
{
	for (vector<CRequest>::iterator it = Requests.begin (); it != Requests.end (); ++it)
	{
		if (it->Id != rid)
			continue;

		it->NbReceived++;
		return;
	}
	nlwarning ("REQUEST: Received an answer from an unknown resquest %d (perhaps due to a AS timeout)", rid);
}
/// Store one answer line into the request \a rid.
///
/// \param rid       id of the request the answer belongs to
/// \param variables names of the answered variables, or the single pseudo variable "__log"
///                  when the answer is the textual output of a command
/// \param values    one value per variable, or the lines of the log
void addRequestAnswer (uint32 rid, const vector<string> &variables, const vector<string> &values)
{
	for (uint i = 0 ; i < Requests.size (); i++)
	{
		if (Requests[i].Id != rid)
			continue;

		CRequest &request = Requests[i];

		// only the request that receives the answer gets a new line; adding it before the id
		// check would insert blank lines into every request stored ahead of this one
		request.addLine ();

		if (!variables.empty() && variables[0]=="__log")
		{
			nlassert (variables.size() == 1);

			for (uint j = 0; j < values.size(); j++)
			{
				request.Log.push_back (values[j]);
			}
		}
		else
		{
			nlassert (variables.size() == values.size ());
			for (uint j = 0; j < variables.size(); j++)
			{
				// fill the slot of the line that was just added
				uint32 pos = request.getVariable (variables[j]);
				request.Array[pos][request.NbLines-1] = values[j];
			}
		}
		return;
	}
	nlwarning ("REQUEST: Received an answer from an unknown resquest %d (perhaps due to a AS timeout)", rid);
}
/// Tell whether no AES is expected to answer the request \a rid any more.
/// \return false when the request exists and still has a non zero NbWaiting, true otherwise
bool emptyRequest (uint32 rid)
{
	for (vector<CRequest>::const_iterator it = Requests.begin (); it != Requests.end (); ++it)
	{
		const bool stillWaiting = (it->Id == rid) && (it->NbWaiting != 0);
		if (stillWaiting)
			return false;
	}
	return true;
}
void cleanRequest ()
{
uint32 currentTime = CTime::getSecondsSince1970 ();
bool timeout;
for (uint i = 0 ; i < Requests.size ();)
{
// the AES doesn't answer quickly
timeout = (currentTime >= Requests[i].Time+RequestTimeout);
if (Requests[i].NbWaiting <= Requests[i].NbReceived || timeout)
{
// the request is over, send to the php
string str;
if (timeout)
{
nlwarning ("REQUEST: Request %d timeouted, only %d on %d services have replied", Requests[i].Id, Requests[i].NbReceived, Requests[i].NbWaiting);
}
if (Requests[i].Log.empty())
{
if (Requests[i].NbRow == 0 && timeout)
{
str = "1 ((TIMEOUT))";
}
else
{
str = toString(Requests[i].NbRow) + " ";
for (uint k = 0; k < Requests[i].NbLines; k++)
{
for (uint j = 0; j < Requests[i].NbRow; j++)
{
nlassert (Requests[i].Array.size () == Requests[i].NbRow);
if (Requests[i].Array[j][k].empty ())
str += "??? ";
else
{
str += Requests[i].Array[j][k];
if (timeout)
str += "((TIMEOUT))";
str += " ";
}
}
}
}
}
else
{
for (uint k = 0; k < Requests[i].Log.size(); k++)
{
str += Requests[i].Log[k];
if (timeout)
str += "((TIMEOUT))";
}
}
sendString (Requests[i].From, str);
// set to 0 to erase it
Requests[i].NbWaiting = 0;
}
if (Requests[i].NbWaiting == 0)
{
Requests.erase (Requests.begin ()+i);
}
else
{
i++;
}
}
}
//
// SQL functions
//
void sqlInit ()
{
if (DontUseDataBase)
return;
MYSQL *db = mysql_init(NULL);
if(db == NULL)
{
nlerror ("mysql_init() failed");
}
my_bool opt = true;
if (mysql_options (db, MYSQL_OPT_RECONNECT, &opt))
{
mysql_close(db);
DatabaseConnection = 0;
nlerror("mysql_options() failed for database connection to '%s'", IService::getInstance()->ConfigFile.getVar("DatabaseHost").asString().c_str());
return;
}
DatabaseConnection = mysql_real_connect(db,
IService::getInstance()->ConfigFile.getVar("DatabaseHost").asString().c_str(),
IService::getInstance()->ConfigFile.getVar("DatabaseLogin").asString().c_str(),
IService::getInstance()->ConfigFile.getVar("DatabasePassword").asString().c_str(),
IService::getInstance()->ConfigFile.getVar("DatabaseName").asString().c_str(),
0,NULL,0);
if (DatabaseConnection == NULL || DatabaseConnection != db)
{
nlerror ("mysql_real_connect() failed to '%s' with login '%s' and database name '%s' with %s",
IService::getInstance()->ConfigFile.getVar("DatabaseHost").asString().c_str(),
IService::getInstance()->ConfigFile.getVar("DatabaseLogin").asString().c_str(),
IService::getInstance()->ConfigFile.getVar("DatabaseName").asString().c_str(),
(IService::getInstance()->ConfigFile.getVar("DatabasePassword").asString().empty()?"empty password":"password")
);
}
#if MYSQL_VERSION_ID < 50019
opt = true;
if (mysql_options (DatabaseConnection, MYSQL_OPT_RECONNECT, &opt))
{
mysql_close(db);
DatabaseConnection = 0;
nlerror("mysql_options() failed for database connection to '%s'", IService::getInstance()->ConfigFile.getVar("DatabaseHost").asString().c_str());
return;
}
#endif
}
////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////// CONNECTION TO THE AES ///////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////
/// Send to the AES \a sid everything the database knows about it.
///
/// The "AES_INFO" message holds three string lists, in this order:
///  - the names of the services registered on the server of the AES
///  - the variable alarms, as (path, error_bound, alarm_order) triplets
///  - the graph updates that apply to the shard/server of the AES, as (path, graph_update) pairs
/// The AES must be registered (findAES() asserts otherwise).
void sendAESInformation (TServiceId sid)
{
	AESIT aesit = findAES (sid);
	const string &aesName = aesit->Name;
	const string &aesShard = aesit->Shard;

	vector<string> information;

	CMessage msgout("AES_INFO");

	//
	// send services that should be running on this AES
	//
	MYSQL_ROW row = sqlQuery ("select name from service where server='%s'", aesName.c_str());
	while (row != NULL)
	{
		string service = row[0];
		nlinfo ("Adding '%s' in registered services to AES-%hu", row[0], sid.get());
		information.push_back (service);
		row = sqlNextRow ();
	}
	sqlFlushResult();
	msgout.serialCont (information);

	//
	// send variable alarms for services that should running on this AES
	//
	information.clear ();
	row = sqlQuery ("select path, error_bound, alarm_order from variable where error_bound!=-1");
	while (row != NULL)
	{
		nlinfo ("Adding '%s' '%s' '%s' in alarm to AES-%hu", row[0], row[1], row[2], sid.get());
		information.push_back (row[0]);
		information.push_back (row[1]);
		information.push_back (row[2]);
		row = sqlNextRow ();
	}
	sqlFlushResult();
	msgout.serialCont (information);

	//
	// send graph update for services that should running on this AES
	//
	information.clear ();
	row = sqlQuery ("select path, graph_update from variable where graph_update!=0");
	while (row != NULL)
	{
		// the first path level selects the shard, the second one the server
		CVarPath varpath (row[0]);
		for(uint i = 0; i < varpath.Destination.size(); i++)
		{
			const string &shardFilter = varpath.Destination[i].first;
			if (shardFilter != "*" && shardFilter != aesShard)
				continue;

			CVarPath varpath2 (varpath.Destination[i].second);
			for(uint j = 0; j < varpath2.Destination.size(); j++)
			{
				const string &serverFilter = varpath2.Destination[j].first;
				if (serverFilter != "*" && serverFilter != aesName)
					continue;

				nlinfo ("Adding '%s' '%s' in graph to AES-%hu", row[0], row[1], sid.get());
				information.push_back (row[0]);
				information.push_back (row[1]);
			}
		}
		row = sqlNextRow ();
	}
	sqlFlushResult();
	msgout.serialCont (information);

	nlinfo ("Sending all information about %s AES-%hu (hostedservices, alarms,grapupdate)", aesName.c_str(), (*aesit).SId.get());
	CUnifiedNetwork::getInstance ()->send (sid, msgout);
}
/// Tell the AES \a sid that its registration is refused, \a res being the reason.
void rejectAES(TServiceId sid, const string &res)
{
	// serial() wants a modifiable string, so serialize a copy of the reason
	string reason = res;

	CMessage msgout("REJECTED");
	msgout.serial (reason);
	CUnifiedNetwork::getInstance ()->send (sid, msgout);
}
// i'm connected to a new admin executor service
static void cbNewAESConnection (const std::string &serviceName, TServiceId sid, void *arg)
{
TSockId from;
CCallbackNetBase *cnb = CUnifiedNetwork::getInstance ()->getNetBase (sid, from);
const CInetAddress &ia = cnb->hostAddress (from);
AESIT aesit = findAES (sid, false);
if (aesit != AdminExecutorServices.end ())
{
nlwarning ("Connection of an AES that is already in the list (%s)", ia.asString ().c_str ());
rejectAES (sid, "This AES is already in the AS list");
return;
}
MYSQL_ROW row = sqlQuery ("select name from server where address='%s'", ia.ipAddress().c_str());
if (row == NULL)
{
if (!AllowExplicitAESRegistration)
{
nlwarning ("Connection of an AES that is not in database server list (%s)", ia.asString ().c_str ());
}
else
{
nlinfo ("Rejecting auto-connection of an AES (%s) - this should provke explicitly reconnect", ia.asString ().c_str ());
}
rejectAES (sid, "This AES is not registered in the database");
sqlFlushResult();
return;
}
string server = row[0];
sqlFlushResult();
row = sqlQuery ("select shard from service where server='%s'", server.c_str());
if (row == NULL)
{
nlwarning ("Connection of an AES that is not in database server list (%s)", ia.asString ().c_str ());
rejectAES (sid, "This AES is not registered in the database");
sqlFlushResult();
return;
}
string shard = row[0];
sqlFlushResult();
AdminExecutorServices.push_back (CAdminExecutorService(shard, server, sid));
nlinfo ("%s-%hu, server name %s, for shard %s connected and added in the list", serviceName.c_str(), sid.get(), server.c_str(), shard.c_str());
// send him services that should run on this server
sendAESInformation (sid);
}
// i'm disconnected from an admin executor service
static void cbNewAESDisconnection (const std::string &serviceName, TServiceId sid, void *arg)
{
TSockId from;
CCallbackNetBase *cnb = CUnifiedNetwork::getInstance ()->getNetBase (sid, from);
const CInetAddress &ia = cnb->hostAddress (from);
AESIT aesit = findAES (sid, false);
if (aesit == AdminExecutorServices.end ())
{
nlwarning ("Disconnection of %s-%hu that is not in my list (%s)", serviceName.c_str (), sid.get(), ia.asString ().c_str ());
return;
}
nlinfo ("%s-%hu, shard name %s, disconnected and removed from the list", serviceName.c_str(), sid.get(), (*aesit).Name.c_str ());
// we need to remove pending request
for(uint i = 0; i < (*aesit).WaitingRequestId.size (); i++)
{
subRequestWaitingNb ((*aesit).WaitingRequestId[i]);
}
AdminExecutorServices.erase (aesit);
}
// we receive an explicit registration message from an AES
/// Callback for the "REGISTER_AES" message: an AES registers explicitly, giving its server
/// and shard names itself instead of relying on the database.
/// The message is ignored unless AllowExplicitAESRegistration is set.
void cbRegisterAES(CMessage &msgin, const std::string &serviceName, TServiceId sid)
{
	if (!AllowExplicitAESRegistration)
	{
		nlwarning("Ignoring attempted AES registration because AllowExplicitAESRegistration==false");
		return;
	}

	// like the service up callback, never list the same AES twice; the registration is
	// only ignored (no REJECTED message) so that the AES isn't pushed into registering again
	if (findAES (sid, false) != AdminExecutorServices.end ())
	{
		nlwarning ("Ignoring explicit registration of %s-%hu that is already in the list", serviceName.c_str(), sid.get());
		return;
	}

	string server;
	string shard;
	try
	{
		msgin.serial(server);
		msgin.serial(shard);
	}
	catch(...)
	{
		nlwarning("Ignoring attempted AES registration due to execption during message decoding");
		return;
	}

	AdminExecutorServices.push_back (CAdminExecutorService(shard, server, sid));
	nlinfo ("%s-%hu, server name %s, for shard %s connected and added in the list", serviceName.c_str(), sid.get(), server.c_str(), shard.c_str());

	// send him services that should run on this server
	sendAESInformation (sid);
}
// Callback for the "VIEW" message: an AES answers a view request.
// The message starts with the request id, followed by any number of answers; each answer is a
// counted list of variable names followed by a counted list of values. Every answer is stored
// in the request through addRequestAnswer(), then the request's NbReceived counter is incremented.
static void cbView (CMessage &msgin, const std::string &serviceName, TServiceId sid)
{
uint32 rid;
msgin.serial (rid);
// the sender must be a registered AES (findAES asserts otherwise)
AESIT aesit = findAES (sid);
// this AES has answered: the request no longer waits for it
for (uint i = 0; i < (*aesit).WaitingRequestId.size();)
{
if ((*aesit).WaitingRequestId[i] == rid)
{
(*aesit).WaitingRequestId.erase ((*aesit).WaitingRequestId.begin ()+i);
}
else
{
i++;
}
}
MYSQL_ROW row = sqlQuery ("select distinct shard from service where server='%s'", (*aesit).Name.c_str ());
// the shard name is found using the "service" table; if there is no shard name in it, "???" is used
// (or the shard name given at registration when the database is disabled)
string shardName;
if (row != NULL) shardName = row[0];
else shardName = DontUseDataBase? aesit->Shard: "???";
vector vara, vala;
// one iteration per answer contained in the message
while ((uint32)msgin.getPos() < msgin.length())
{
vara.clear ();
vala.clear ();
// adding default row
vara.push_back ("shard");
vara.push_back ("server");
vala.push_back (shardName);
vala.push_back ((*aesit).Name);
uint32 i, nb;
string var, val;
msgin.serial (nb);
for (i = 0; i < nb; i++)
{
msgin.serial (var);
if (var == "__log")
{
// a log answer doesn't carry the default shard/server columns: "__log" must be the first variable
vara.clear ();
vala.clear ();
}
vara.push_back (var);
}
// a log answer starts with a header line naming the shard and server it comes from
if (vara.size() > 0 && vara[0] == "__log")
vala.push_back ("----- Result from Shard "+shardName+" Server "+(*aesit).Name+"\n");
msgin.serial (nb);
for (i = 0; i < nb; i++)
{
msgin.serial (val);
vala.push_back (val);
}
addRequestAnswer (rid, vara, vala);
}
// release the result of the shard name query made above
sqlFlushResult();
// inc the NbReceived counter
addRequestReceived (rid);
}
// Message name -> callback table of the service; it is handed to NLNET_SERVICE_MAIN at the end of this file.
TUnifiedCallbackItem CallbackArray[] =
{
{ "REGISTER_AES", cbRegisterAES },
{ "VIEW", cbView },
{ "ADMIN_EMAIL", cbAdminEmail },
{ "GRAPH_UPDATE", cbGraphUpdate },
};
////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////// CONNECTION TO THE CLIENT ////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////
void addRequest (const string &rawvarpath, TSockId from)
{
nlinfo ("addRequest from %s: '%s'", from->asString ().c_str (), rawvarpath.c_str ());
if(rawvarpath.empty ())
{
// send an empty string to say to php that there's nothing
string str;
sendString (from, str);
}
//
// special cases
//
if(rawvarpath == "reload")
{
// it means the we have to resend the list of services managed by AES from the mysql tables
for (AESIT aesit = AdminExecutorServices.begin(); aesit != AdminExecutorServices.end(); aesit++)
{
sendAESInformation ((*aesit).SId);
}
// send an empty string to say to php that there's nothing
string str;
sendString (from, str);
return;
}
//
// normal cases
//
CVarPath varpath (rawvarpath);
uint32 rid = newRequest (from);
for (uint i = 0; i < varpath.Destination.size (); i++)
{
string shard = varpath.Destination[i].first;
CVarPath subvarpath (varpath.Destination[i].second);
for (uint j = 0; j < subvarpath.Destination.size (); j++)
{
string server = subvarpath.Destination[j].first;
if (shard == "*" && server == "*")
{
// Send the request to all online servers of all online shards
AESIT aesit;
for (aesit = AdminExecutorServices.begin(); aesit != AdminExecutorServices.end(); aesit++)
{
addRequestWaitingNb (rid);
(*aesit).WaitingRequestId.push_back (rid);
CMessage msgout("AES_GET_VIEW");
msgout.serial (rid);
msgout.serial (subvarpath.Destination[j].second);
CUnifiedNetwork::getInstance ()->send ((*aesit).SId, msgout);
nlinfo ("REQUEST: Sent view '%s' to shard name %s 'AES-%hu'", subvarpath.Destination[j].second.c_str(), (*aesit).Name.c_str(), (*aesit).SId.get());
}
}
else if (shard == "*" && server == "#")
{
// Select all shard all server including offline one
MYSQL_ROW row = sqlQuery ("select distinct server, shard from service");
while (row != NULL)
{
AESIT aesit = findAES (row[0], false);
if (aesit != AdminExecutorServices.end())
{
addRequestWaitingNb (rid);
(*aesit).WaitingRequestId.push_back (rid);
CMessage msgout("AES_GET_VIEW");
msgout.serial (rid);
msgout.serial (subvarpath.Destination[j].second);
CUnifiedNetwork::getInstance ()->send ((*aesit).SId, msgout);
nlinfo ("REQUEST: Sent view '%s' to shard name %s 'AES-%hu'", subvarpath.Destination[j].second.c_str(), (*aesit).Name.c_str(), (*aesit).SId.get());
}
else if (server == "#")
{
vector vara, vala;
// adding default row
vara.push_back ("shard");
vala.push_back (row[1]);
vara.push_back ("server");
vala.push_back (row[0]);
vara.push_back ("service");
vala.push_back ("AES");
vara.push_back ("State");
vala.push_back ("Offline");
addRequestAnswer (rid, vara, vala);
}
row = sqlNextRow ();
}
sqlFlushResult();
}
else if (server == "*" || server == "#")
{
// Send the request to all online server of a specific shard
MYSQL_ROW row = sqlQuery ("select distinct server from service where shard='%s'", shard.c_str ());
while (row != NULL)
{
AESIT aesit = findAES (row[0], false);
if (aesit != AdminExecutorServices.end())
{
addRequestWaitingNb (rid);
(*aesit).WaitingRequestId.push_back (rid);
CMessage msgout("AES_GET_VIEW");
msgout.serial (rid);
msgout.serial (subvarpath.Destination[j].second);
CUnifiedNetwork::getInstance ()->send ((*aesit).SId, msgout);
nlinfo ("REQUEST: Sent view '%s' to shard name %s 'AES-%hu'", subvarpath.Destination[j].second.c_str(), (*aesit).Name.c_str(), (*aesit).SId.get());
}
else if (server == "#")
{
vector vara, vala;
// adding default row
vara.push_back ("shard");
vala.push_back (shard);
vara.push_back ("server");
vala.push_back (row[0]);
vara.push_back ("service");
vala.push_back ("AES");
vara.push_back ("State");
vala.push_back ("Offline");
addRequestAnswer (rid, vara, vala);
}
row = sqlNextRow ();
}
sqlFlushResult();
}
else
{
AESIT aesit = findAES (server, false);
if (aesit != AdminExecutorServices.end())
{
addRequestWaitingNb (rid);
(*aesit).WaitingRequestId.push_back (rid);
CMessage msgout("AES_GET_VIEW");
msgout.serial (rid);
msgout.serial (subvarpath.Destination[j].second);
CUnifiedNetwork::getInstance ()->send ((*aesit).SId, msgout);
nlinfo ("REQUEST: Sent view '%s' to shard name %s 'AES-%hu'", subvarpath.Destination[j].second.c_str(), (*aesit).Name.c_str(), (*aesit).SId.get());
}
else
{
nlwarning ("Server %s is not found in the list", server.c_str ());
}
}
}
}
}
/// Config file callback: apply a new value of the RequestTimeout variable (in seconds).
static void varRequestTimeout(CConfigFile::CVar &var)
{
	const int seconds = var.asInt();
	RequestTimeout = seconds;
	nlinfo ("Request timeout is now after %d seconds", RequestTimeout);
}
/// Config file callback: apply a new value of the alert accumulation delay.
static void varAdminAlertAccumlationTime (CConfigFile::CVar &var)
{
	const int seconds = var.asInt();
	AdminAlertAccumlationTime = seconds;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////// SERVICE IMPLEMENTATION //////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////
/// The admin service itself: connects to the database, serves the web clients and
/// dispatches their requests to the admin executor services.
class CAdminService : public IService
{
public:

	/// Init the service: email defaults, database and web connections, AES callbacks and
	/// the config variables that can be changed while the service runs.
	void init ()
	{
		setDefaultEmailParams (ConfigFile.getVar ("SMTPServer").asString (), ConfigFile.getVar("DefaultEmailFrom").asString(), "");

		sqlInit ();

		connectionWebInit ();

		//CVarPath toto ("[toto");

		//CVarPath toto ("*.*.*.*");
		//CVarPath toto ("[srv1,srv2].*.*.*");
		//CVarPath toto ("[svr1.svc1,srv2.svc2].*.*");
		//CVarPath toto ("[svr1.[svc1,svc2].*.var1,srv2.svc2.fe*.var2].toto");
		//CVarPath toto ("[svr1.svc1.*.toto,srv2.svc2.*.tata]");

		CUnifiedNetwork::getInstance ()->setServiceUpCallback ("AES", cbNewAESConnection);
		CUnifiedNetwork::getInstance ()->setServiceDownCallback ("AES", cbNewAESDisconnection);

		varRequestTimeout (ConfigFile.getVar ("RequestTimeout"));
		ConfigFile.setCallback("RequestTimeout", &varRequestTimeout);

		varAdminAlertAccumlationTime (ConfigFile.getVar ("AdminAlertAccumlationTime"));
		// the callback must be registered under the very name that is read above, otherwise
		// a change of the variable in the config file is never taken into account
		ConfigFile.setCallback("AdminAlertAccumlationTime", &varAdminAlertAccumlationTime);
	}

	/// Called at each service loop: answer finished requests, serve the web clients and
	/// send the accumulated alerts.
	bool update ()
	{
		cleanRequest ();
		connectionWebUpdate ();

		updateSendAdminAlert ();
		return true;
	}

	/// Release the web connection.
	void release ()
	{
		connectionWebRelease ();
	}
};
/// Admin Service: declares the service entry point for CAdminService with the short name "AS",
/// the long name "admin_service", the value 49996 (presumably the listening port - confirm against
/// the NLNET_SERVICE_MAIN documentation), the message callbacks, and the config/log directories
/// selected at build time through NELNS_CONFIG / NELNS_LOGS.
NLNET_SERVICE_MAIN (CAdminService, "AS", "admin_service", 49996, CallbackArray, NELNS_CONFIG, NELNS_LOGS);
/// Console command: issue a view request; the arguments are joined with single spaces to
/// form the variable path, and the request has no client socket.
NLMISC_COMMAND (getViewAS, "send a view and receive an array as result", "")
{
	string cmd;
	if (!args.empty())
	{
		cmd = args[0];
		for (uint i = 1; i < args.size(); ++i)
		{
			cmd += " ";
			cmd += args[i];
		}
	}

	addRequest (cmd, NULL);

	return true;
}
/// Console command: stop waiting for the missing answers of every pending request.
NLMISC_COMMAND (clearRequests, "clear all pending requests", "")
{
	if(args.size() != 0) return false;

	// for all request still waiting for answers, set the NbWaiting to NbReceived:
	// next cleanRequest() will then consider them complete, send the answer and clear them
	for (uint i = 0 ; i < Requests.size (); i++)
	{
		if (Requests[i].NbWaiting > Requests[i].NbReceived)
		{
			Requests[i].NbWaiting = Requests[i].NbReceived;
		}
	}

	return true;
}
/// Console command: list every pending request with its counters and its origin.
NLMISC_COMMAND (displayRequests, "display all pending requests", "")
{
	if(args.size() != 0) return false;

	// size() is a size_t: convert explicitly so the variadic argument matches the format
	log.displayNL ("Display %u pending requests", (uint)Requests.size ());
	for (uint i = 0 ; i < Requests.size (); i++)
	{
		const CRequest &request = Requests[i];
		// requests issued from the console (getViewAS) have no socket
		const string origin = (request.From != NULL) ? request.From->asString () : string ("<console>");
		log.displayNL ("id: %u wait: %u recv: %u from: %s nbrow: %u", request.Id, request.NbWaiting, request.NbReceived, origin.c_str (), request.NbRow);
	}
	log.displayNL ("End of display pending requests");

	return true;
}
/// Console command: queue an alert made of the single argument given.
NLMISC_COMMAND (generateAlert, "generate an alert", "")
{
	if(args.size() != 1) return false;

	// the argument is free text: never use it as the printf format itself
	sendAdminAlert ("%s", args[0].c_str());

	return true;
}