khanat-code-old/code/nelns/admin_service/admin_service.cpp

1252 lines
32 KiB
C++
Raw Normal View History

2014-09-05 22:35:28 +00:00
// NeLNS - MMORPG Framework <http://dev.ryzom.com/projects/nel/>
// Copyright (C) 2010 Winch Gate Property Limited
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif // HAVE_CONFIG_H
#ifndef NELNS_CONFIG
#define NELNS_CONFIG ""
#endif // NELNS_CONFIG
#ifndef NELNS_LOGS
#define NELNS_LOGS ""
#endif // NELNS_LOGS
#include "nel/misc/types_nl.h"
#include <string>
#include <list>
#ifdef NL_OS_WINDOWS
#include <winsock2.h>
#include <windows.h>
typedef unsigned long ulong;
#endif
#include <mysql.h>
#include <mysql_version.h>
#include "nel/misc/debug.h"
#include "nel/misc/config_file.h"
#include "nel/misc/path.h"
#include "nel/misc/command.h"
#include "nel/net/service.h"
#include "nel/net/varpath.h"
#include "nel/net/email.h"
#include "connection_web.h"
//
// Namespaces
//
using namespace std;
using namespace NLMISC;
using namespace NLNET;
//
// NeL Variables (for config file, etc)
//
// This variable should be used in conjunction with the AES-side UseExplicitAESRegistration flag.
// The AS / AES registration process works as follows:
// - the AES creates a layer 5 connection to the AS
// - the AS gets a serviceUp callback and looks in the database to try to find a match for the AES
// - if the match fails then the AS sends a reject message to the AES
// - when the AES receives the reject message it checks its UseExplicitAESRegistration flag - if it's set it
//   attempts an explicit connection, sending the info required by the AS that would normally come from the database
// - when the AS receives an explicit registration, it verifies the state of the AllowExplicitAESRegistration flag
//   and completes the registration work that failed earlier due to the database access failure
CVariable<bool> AllowExplicitAESRegistration("as","AllowExplicitAESRegistration","flag to allow AES services to register explicitly",false,0,true);
// This variable allows one to launch an AS on a machine that doesn't have a database set up.
// The functionality of the AS is then reduced (particularly with respect to alarms and graphs,
// which are configured via the database): the SQL helpers in this file return without touching MySQL.
CVariable<bool> DontUseDataBase("as","DontUseDataBase","if this flag is set calls to the database will be ignored",false,0,true);
//
// Structures
//
/// A request received from a client, together with the answers collected so
/// far from the admin executor services (AES).
///
/// Answers are stored either in Array (variable views) or in Log (command
/// output). Array is indexed as Array[row][line]: each row is one variable,
/// line 0 of a row holds the variable name and every following line holds the
/// value from one answer.
struct CRequest
{
	/// Creates an empty request. NbLines starts at 1 because line 0 is
	/// reserved for the variable names.
	CRequest (uint32 id, TSockId from) : Id(id), NbWaiting(0), NbReceived(0), From(from), NbRow(0), NbLines(1)
	{
		Time = CTime::getSecondsSince1970 ();
	}

	uint32 Id;			// identifier of the request
	uint NbWaiting;		// number of answers expected
	uint32 NbReceived;	// number of answers received
	TSockId From;		// socket the final answer is sent to
	uint32 Time;		// when the request was asked (seconds since 1970)
	uint32 NbRow;		// number of rows (variables) in Array
	uint32 NbLines;		// number of lines in every row of Array
	vector<vector<string> > Array;	// the 2 dimensional array that will be sent to the php for variables
	vector<string> Log;				// contains the answer if a command was asked, otherwise Array contains the results

	/// Returns the row index of \a variable, appending a new row (sized to
	/// NbLines, with the name stored in line 0) if it is not known yet.
	uint32 getVariable(const string &variable)
	{
		for (uint32 i = 0; i < NbRow; i++)
			if (Array[i][0] == variable)
				return i;

		// need to add the variable
		vector<string> NewRow;
		NewRow.resize (NbLines);
		NewRow[0] = variable;
		Array.push_back (NewRow);
		return NbRow++;
	}

	/// Appends one empty line to every existing row.
	void addLine ()
	{
		for (uint32 i = 0; i < NbRow; i++)
			Array[i].push_back("");
		NbLines++;
	}

	/// Dumps the content of the request (the array, or the log when it is
	/// not empty) to the info log.
	void display ()
	{
		if (Log.empty())
		{
			nlinfo ("Display answer array for request %d: %d row %d lines", Id, NbRow, NbLines);
			for (uint i = 0; i < NbLines; i++)
			{
				for (uint j = 0; j < NbRow; j++)
				{
					nlassert (Array.size () == NbRow);
					InfoLog->displayRaw ("%-20s", Array[j][i].c_str());
				}
				InfoLog->displayRawNL ("");
			}
			InfoLog->displayRawNL ("End of the array");
		}
		else
		{
			// Log.size() is a size_t: convert explicitly so that the argument
			// matches the printf-style conversion on every platform
			nlinfo ("Display the log for request %d: %u lines", Id, (uint)Log.size());
			for (uint i = 0; i < Log.size(); i++)
			{
				InfoLog->displayRaw ("%s", Log[i].c_str());
			}
			InfoLog->displayRawNL ("End of the log");
		}
	}
};
/// Description of one admin executor service (AES) registered with this service.
struct CAdminExecutorService
{
	/// Name of the shard
	string Shard;

	/// unique number to identify the AES
	TServiceId SId;

	/// name of the admin executor service
	string Name;

	/// ids of all the requests that this AES hasn't replied to yet
	vector<uint32> WaitingRequestId;

	CAdminExecutorService (const string &shard, const string &name, TServiceId sid)
		: Shard(shard)
		, SId(sid)
		, Name(name)
	{
	}
};
typedef list<CAdminExecutorService> TAdminExecutorServices;
typedef list<CAdminExecutorService>::iterator AESIT;
//
// Variables
//
// all the admin executor services currently registered
TAdminExecutorServices AdminExecutorServices;
// connection to the MySQL database (NULL while not connected)
MYSQL *DatabaseConnection = NULL;
// requests that are still waiting for answers
vector<CRequest> Requests;
// delay after which a request is answered even if some answers are missing
uint32 RequestTimeout = 5; // in seconds
// alerts are accumulated for this many seconds before being sent;
// -1 means that alerts are only logged as warnings
sint32 AdminAlertAccumlationTime = 5;
//
// Functions
//
/// Looks up a registered AES by service id.
/// Returns AdminExecutorServices.end() when there is no match; when \a asrt
/// is true, a missing entry additionally triggers an nlassert.
AESIT findAES (TServiceId sid, bool asrt = true)
{
	AESIT found = AdminExecutorServices.begin();
	while (found != AdminExecutorServices.end())
	{
		if ((*found).SId == sid)
			break;
		found++;
	}

	if (asrt)
		nlassert (found != AdminExecutorServices.end());

	return found;
}
/// Looks up a registered AES by server name.
/// Returns AdminExecutorServices.end() when there is no match; when \a asrt
/// is true, a missing entry additionally triggers an nlassert.
AESIT findAES (const string &name, bool asrt = true)
{
	AESIT found = AdminExecutorServices.begin();
	while (found != AdminExecutorServices.end())
	{
		if ((*found).Name == name)
			break;
		found++;
	}

	if (asrt)
		nlassert (found != AdminExecutorServices.end());

	return found;
}
//
// SQL helpers
//
// Result set of the last query run by sqlQuery(); read by sqlNextRow() and
// released by sqlFlushResult().
MYSQL_RES *sqlCurrentQueryResult = NULL;

/// Formats a query printf-style, runs it and returns the first row of the result.
///
/// Returns NULL when the database is disabled (DontUseDataBase), when there is
/// no connection, when the query fails or when the result contains no row.
/// The following rows are read with sqlNextRow(); the caller is expected to
/// call sqlFlushResult() once it is done with the rows.
MYSQL_ROW sqlQuery (const char *format, ...)
{
	if (DontUseDataBase)
		return 0;

	char *query;
	NLMISC_CONVERT_VARGS (query, format, 1024);

	if (DatabaseConnection == 0)
	{
		nlwarning ("MYSQL: mysql_query (%s) failed: DatabaseConnection is 0", query);
		return NULL;
	}

	// release a result set that was never flushed, otherwise it would be leaked
	// when the pointer is overwritten below
	if (sqlCurrentQueryResult != NULL)
	{
		mysql_free_result(sqlCurrentQueryResult);
		sqlCurrentQueryResult = NULL;
	}

	int ret = mysql_query (DatabaseConnection, query);
	if (ret != 0)
	{
		nlwarning ("MYSQL: mysql_query () failed for query '%s': %s", query, mysql_error(DatabaseConnection));
		return 0;
	}

	sqlCurrentQueryResult = mysql_store_result(DatabaseConnection);
	if (sqlCurrentQueryResult == 0)
	{
		nlwarning ("MYSQL: mysql_store_result () failed for query '%s': %s", query, mysql_error(DatabaseConnection));
		return 0;
	}

	MYSQL_ROW row = mysql_fetch_row(sqlCurrentQueryResult);
	if (row == 0)
	{
		nlwarning ("MYSQL: mysql_fetch_row () failed for query '%s': %s", query, mysql_error(DatabaseConnection));
	}

	// mysql_num_rows() returns a 64 bit integer: convert it explicitly so that
	// the argument matches the printf-style conversion
	nldebug ("MYSQL: sqlQuery(%s) returns %u rows", query, (uint)mysql_num_rows(sqlCurrentQueryResult));

	return row;
}
/// Returns the next row of the current query result, or 0 when the database
/// is disabled or when there is no current result.
MYSQL_ROW sqlNextRow ()
{
	if (DontUseDataBase || sqlCurrentQueryResult == 0)
	{
		return 0;
	}

	return mysql_fetch_row(sqlCurrentQueryResult);
}
/// Frees the current query result, if any, and forgets it.
/// Does nothing when the database is disabled.
void sqlFlushResult()
{
	if (!DontUseDataBase && sqlCurrentQueryResult != NULL)
	{
		mysql_free_result(sqlCurrentQueryResult);
		sqlCurrentQueryResult = NULL;
	}
}
//
// Admin functions
//
// text of the alerts accumulated so far, one alert per line
string Email;
// time (seconds since 1970) at which the first accumulated alert was queued, 0 when nothing is queued
uint32 FirstEmailTime = 0;
/// Formats an alert printf-style and queues it for updateSendAdminAlert().
/// When AdminAlertAccumlationTime is -1 nothing is queued and the alert is
/// only logged as a warning.
void sendAdminAlert (const char *format, ...)
{
	char *text;
	NLMISC_CONVERT_VARGS (text, format, 4096);

	if (AdminAlertAccumlationTime == -1)
	{
		// we don't send email so just display a warning
		nlwarning ("ALERT: %s", text);
		return;
	}

	const bool firstAlert = Email.empty() && FirstEmailTime == 0;
	if (firstAlert)
	{
		// the accumulation period starts with the first queued alert
		FirstEmailTime = CTime::getSecondsSince1970();
	}
	else
	{
		Email += "\n";
	}
	Email += text;

	nldebug ("ALERT: pushing email into queue: %s", text);
}
void updateSendAdminAlert ()
{
if(!Email.empty() && FirstEmailTime != 0 && AdminAlertAccumlationTime >=0 && CTime::getSecondsSince1970() > FirstEmailTime + AdminAlertAccumlationTime)
{
vector<string> lines;
explode (Email, string("\n"), lines, true);
if (!lines.empty())
{
if (IService::getInstance()->ConfigFile.exists("SysLogPath") && IService::getInstance()->ConfigFile.exists("SysLogParams"))
{
// syslog
string param;
if (lines.size() > 1)
{
param = "Multiple problems, first is: ";
}
param += lines[0];
string res = toString(IService::getInstance()->ConfigFile.getVar("SysLogParams").asString().c_str(), param.c_str());
launchProgram(IService::getInstance()->ConfigFile.getVar("SysLogPath").asString(), res);
}
if (IService::getInstance()->ConfigFile.exists("AdminEmail"))
{
// email
string subject;
if (lines.size() == 1)
{
subject = lines[0];
}
else
{
subject = "Multiple problems";
}
std::string from;
if(IService::getInstance()->ConfigFile.exists("AdminEmailFrom"))
from = IService::getInstance()->ConfigFile.getVar("AdminEmailFrom").asString();
CConfigFile::CVar &var = IService::getInstance()->ConfigFile.getVar("AdminEmail");
for (uint i = 0; i < var.size(); i++)
{
if (!sendEmail ("", from, var.asString(i), subject, Email))
{
nlwarning ("Can't send email to '%s'", var.asString(i).c_str());
}
else
{
nlinfo ("ALERT: Sent email to admin %s the subject: %s", var.asString(i).c_str(), subject.c_str());
}
}
}
}
Email = "";
FirstEmailTime = 0;
}
}
/// Callback for the "ADMIN_EMAIL" message: queues the text carried by the
/// message as an admin alert.
static void cbAdminEmail (CMessage &msgin, const std::string &serviceName, TServiceId sid)
{
	string str;
	msgin.serial(str);
	// the text comes from the network: never use it as the format string,
	// a '%' in it would be interpreted as a conversion
	sendAdminAlert ("%s", str.c_str());
}
/// Callback for the "GRAPH_UPDATE" message.
/// The message holds a timestamp followed by (service, variable, value)
/// triplets; each value is pushed into the matching rrd file through the
/// program configured with RRDToolPath. A missing rrd file is created first,
/// using the graph_update frequency found in the database.
static void cbGraphUpdate (CMessage &msgin, const std::string &serviceName, TServiceId sid)
{
	uint32 CurrentTime;
	msgin.serial (CurrentTime);

	while (msgin.getPos() < (sint32)msgin.length())
	{
		string var, service;
		sint32 val;
		msgin.serial (service, var, val);

		AESIT aesit = findAES (sid);
		const string shard = (*aesit).Shard;
		const string server = (*aesit).Name;

		if (shard.empty() || server.empty() || service.empty() || var.empty())
		{
			nlwarning ("Shard server service var val is empty");
			continue;
		}

		const string path = CPath::standardizePath (IService::getInstance()->ConfigFile.getVar("RRDVarPath").asString());
		const string rrdfilename = path + shard+"."+server+"."+service+"."+var+".rrd";

		if (!NLMISC::CFile::fileExists(rrdfilename))
		{
			// NOTE(review): var comes from the message and is inserted in the query
			// (and in the command line below) without any escaping - confirm that senders are trusted
			MYSQL_ROW row = sqlQuery ("select graph_update from variable where path like '%%%s' and graph_update!=0", var.c_str());
			if (row == NULL)
			{
				nlwarning ("Can't create the rrd because no graph_update in database");
			}
			else
			{
				uint32 freq = atoi(row[0]);
				const string createArg = "create "+rrdfilename+" --step "+toString(freq)+" DS:var:GAUGE:"+toString(freq*2)+":U:U RRA:AVERAGE:0.5:1:1000 RRA:AVERAGE:0.5:10:1000 RRA:AVERAGE:0.5:100:1000";
				launchProgram(IService::getInstance()->ConfigFile.getVar("RRDToolPath").asString(), createArg);
			}
			sqlFlushResult();
		}

		const string updateArg = "update " + rrdfilename + " " + toString (CurrentTime) + ":" + toString(val);
		launchProgram(IService::getInstance()->ConfigFile.getVar("RRDToolPath").asString(), updateArg);
	}
}
//
// Request functions
//
/// Registers a new pending request coming from \a from and returns its id.
/// Ids are handed out sequentially.
uint32 newRequest (TSockId from)
{
	static uint32 NextId = 5461231;

	const uint32 rid = NextId;
	Requests.push_back (CRequest(rid, from));
	NextId++;

	return rid;
}
/// Records that one more answer is expected for the request \a rid and
/// restarts its timeout. Logs a warning if the request is unknown.
void addRequestWaitingNb (uint32 rid)
{
	for (vector<CRequest>::iterator it = Requests.begin (); it != Requests.end (); ++it)
	{
		if (it->Id != rid)
			continue;

		it->NbWaiting++;
		it->Time = CTime::getSecondsSince1970 ();
		return;
	}

	nlwarning ("REQUEST: Received an answer from an unknown resquest %d (perhaps due to a AS timeout)", rid);
}
/// Records that one answer less is expected for the request \a rid.
/// Logs a warning if the request is unknown.
void subRequestWaitingNb (uint32 rid)
{
	for (vector<CRequest>::iterator it = Requests.begin (); it != Requests.end (); ++it)
	{
		if (it->Id != rid)
			continue;

		it->NbWaiting--;
		return;
	}

	nlwarning ("REQUEST: Received an answer from an unknown resquest %d (perhaps due to a AS timeout)", rid);
}
/// Records that one more answer was received for the request \a rid.
/// Logs a warning if the request is unknown.
void addRequestReceived (uint32 rid)
{
	for (vector<CRequest>::iterator it = Requests.begin (); it != Requests.end (); ++it)
	{
		if (it->Id != rid)
			continue;

		it->NbReceived++;
		return;
	}

	nlwarning ("REQUEST: Received an answer from an unknown resquest %d (perhaps due to a AS timeout)", rid);
}
/// Stores one answer in the request \a rid.
///
/// When the first variable is "__log" the values are appended to the log of
/// the request. Otherwise \a variables and \a values must have the same size
/// and each value is written, on a new line of the answer array, in the row
/// of its variable. Logs a warning if the request is unknown.
void addRequestAnswer (uint32 rid, const vector<string> &variables, const vector<string> &values)
{
	for (uint i = 0 ; i < Requests.size (); i++)
	{
		if (Requests[i].Id != rid)
			continue;

		// the new line must only be added to the request that receives the
		// answer, never to the other pending requests visited by the search
		Requests[i].addLine ();

		if (!variables.empty() && variables[0]=="__log")
		{
			nlassert (variables.size() == 1);

			for (uint j = 0; j < values.size(); j++)
			{
				Requests[i].Log.push_back (values[j]);
			}
		}
		else
		{
			nlassert (variables.size() == values.size ());

			for (uint j = 0; j < variables.size(); j++)
			{
				// getVariable() creates the row when the variable is new
				uint32 pos = Requests[i].getVariable (variables[j]);
				Requests[i].Array[pos][Requests[i].NbLines-1] = values[j];
			}
		}
		return;
	}

	nlwarning ("REQUEST: Received an answer from an unknown resquest %d (perhaps due to a AS timeout)", rid);
}
/// Returns false if the request \a rid is pending and still expects answers,
/// true otherwise (including when the request is unknown).
bool emptyRequest (uint32 rid)
{
	for (vector<CRequest>::iterator it = Requests.begin (); it != Requests.end (); ++it)
	{
		const bool stillWaiting = (it->Id == rid && it->NbWaiting != 0);
		if (stillWaiting)
			return false;
	}

	return true;
}
void cleanRequest ()
{
uint32 currentTime = CTime::getSecondsSince1970 ();
bool timeout;
for (uint i = 0 ; i < Requests.size ();)
{
// the AES doesn't answer quickly
timeout = (currentTime >= Requests[i].Time+RequestTimeout);
if (Requests[i].NbWaiting <= Requests[i].NbReceived || timeout)
{
// the request is over, send to the php
string str;
if (timeout)
{
nlwarning ("REQUEST: Request %d timeouted, only %d on %d services have replied", Requests[i].Id, Requests[i].NbReceived, Requests[i].NbWaiting);
}
if (Requests[i].Log.empty())
{
if (Requests[i].NbRow == 0 && timeout)
{
str = "1 ((TIMEOUT))";
}
else
{
str = toString(Requests[i].NbRow) + " ";
for (uint k = 0; k < Requests[i].NbLines; k++)
{
for (uint j = 0; j < Requests[i].NbRow; j++)
{
nlassert (Requests[i].Array.size () == Requests[i].NbRow);
if (Requests[i].Array[j][k].empty ())
str += "??? ";
else
{
str += Requests[i].Array[j][k];
if (timeout)
str += "((TIMEOUT))";
str += " ";
}
}
}
}
}
else
{
for (uint k = 0; k < Requests[i].Log.size(); k++)
{
str += Requests[i].Log[k];
if (timeout)
str += "((TIMEOUT))";
}
}
sendString (Requests[i].From, str);
// set to 0 to erase it
Requests[i].NbWaiting = 0;
}
if (Requests[i].NbWaiting == 0)
{
Requests.erase (Requests.begin ()+i);
}
else
{
i++;
}
}
}
//
// SQL functions
//
void sqlInit ()
{
if (DontUseDataBase)
return;
MYSQL *db = mysql_init(NULL);
if(db == NULL)
{
nlerror ("mysql_init() failed");
}
my_bool opt = true;
if (mysql_options (db, MYSQL_OPT_RECONNECT, &opt))
{
mysql_close(db);
DatabaseConnection = 0;
nlerror("mysql_options() failed for database connection to '%s'", IService::getInstance()->ConfigFile.getVar("DatabaseHost").asString().c_str());
return;
}
DatabaseConnection = mysql_real_connect(db,
IService::getInstance()->ConfigFile.getVar("DatabaseHost").asString().c_str(),
IService::getInstance()->ConfigFile.getVar("DatabaseLogin").asString().c_str(),
IService::getInstance()->ConfigFile.getVar("DatabasePassword").asString().c_str(),
IService::getInstance()->ConfigFile.getVar("DatabaseName").asString().c_str(),
0,NULL,0);
if (DatabaseConnection == NULL || DatabaseConnection != db)
{
nlerror ("mysql_real_connect() failed to '%s' with login '%s' and database name '%s' with %s",
IService::getInstance()->ConfigFile.getVar("DatabaseHost").asString().c_str(),
IService::getInstance()->ConfigFile.getVar("DatabaseLogin").asString().c_str(),
IService::getInstance()->ConfigFile.getVar("DatabaseName").asString().c_str(),
(IService::getInstance()->ConfigFile.getVar("DatabasePassword").asString().empty()?"empty password":"password")
);
}
#if MYSQL_VERSION_ID < 50019
opt = true;
if (mysql_options (DatabaseConnection, MYSQL_OPT_RECONNECT, &opt))
{
mysql_close(db);
DatabaseConnection = 0;
nlerror("mysql_options() failed for database connection to '%s'", IService::getInstance()->ConfigFile.getVar("DatabaseHost").asString().c_str());
return;
}
#endif
}
////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////// CONNECTION TO THE AES ///////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////
/// Sends to the AES \a sid an "AES_INFO" message made of three string lists read from the database:
/// - the names of the services registered for this server
/// - the variable alarms, as (path, error_bound, alarm_order) triplets
/// - the graph updates that apply to this shard / server, as (path, graph_update) pairs
/// The AES must be in the list (findAES asserts otherwise).
void sendAESInformation (TServiceId sid)
{
	AESIT aesit = findAES (sid);

	vector<string> information;

	CMessage msgout("AES_INFO");

	//
	// send services that should be running on this AES
	//
	information.clear ();
	MYSQL_ROW row = sqlQuery ("select name from service where server='%s'", (*aesit).Name.c_str());
	while (row != NULL)
	{
		string service = row[0];
		nlinfo ("Adding '%s' in registered services to AES-%hu", row[0], sid.get());
		information.push_back (service);
		row = sqlNextRow ();
	}
	sqlFlushResult();
	msgout.serialCont (information);

	//
	// send variable alarms for services that should be running on this AES
	//
	information.clear ();
	row = sqlQuery ("select path, error_bound, alarm_order from variable where error_bound!=-1");
	while (row != NULL)
	{
		nlinfo ("Adding '%s' '%s' '%s' in alarm to AES-%hu", row[0], row[1], row[2], sid.get());
		information.push_back (row[0]);
		information.push_back (row[1]);
		information.push_back (row[2]);
		row = sqlNextRow ();
	}
	sqlFlushResult();
	msgout.serialCont (information);

	//
	// send graph update for services that should be running on this AES
	//
	information.clear ();
	row = sqlQuery ("select path, graph_update from variable where graph_update!=0");
	while (row != NULL)
	{
		// the first level of the path selects the shard, the second one the server;
		// "*" matches any of them
		CVarPath varpath (row[0]);
		for(uint i = 0; i < varpath.Destination.size(); i++)
		{
			if(varpath.Destination[i].first == "*" || varpath.Destination[i].first == (*aesit).Shard)
			{
				CVarPath varpath2 (varpath.Destination[i].second);
				for(uint j = 0; j < varpath2.Destination.size(); j++)
				{
					if(varpath2.Destination[j].first == "*" || varpath2.Destination[j].first == (*aesit).Name)
					{
						nlinfo ("Adding '%s' '%s' in graph to AES-%hu", row[0], row[1], sid.get());
						information.push_back (row[0]);
						information.push_back (row[1]);
					}
				}
			}
		}
		row = sqlNextRow ();
	}
	sqlFlushResult();
	msgout.serialCont (information);

	nlinfo ("Sending all information about %s AES-%hu (hostedservices, alarms,grapupdate)", (*aesit).Name.c_str(), (*aesit).SId.get());
	CUnifiedNetwork::getInstance ()->send (sid, msgout);
}
/// Sends a "REJECTED" message carrying the reason \a res to the AES \a sid.
void rejectAES(TServiceId sid, const string &res)
{
	// serial() needs a non const string, so work on a copy of the reason
	string reason = res;

	CMessage msgout("REJECTED");
	msgout.serial (reason);
	CUnifiedNetwork::getInstance ()->send (sid, msgout);
}
// i'm connected to a new admin executor service
static void cbNewAESConnection (const std::string &serviceName, TServiceId sid, void *arg)
{
TSockId from;
CCallbackNetBase *cnb = CUnifiedNetwork::getInstance ()->getNetBase (sid, from);
const CInetAddress &ia = cnb->hostAddress (from);
AESIT aesit = findAES (sid, false);
if (aesit != AdminExecutorServices.end ())
{
nlwarning ("Connection of an AES that is already in the list (%s)", ia.asString ().c_str ());
rejectAES (sid, "This AES is already in the AS list");
return;
}
MYSQL_ROW row = sqlQuery ("select name from server where address='%s'", ia.ipAddress().c_str());
if (row == NULL)
{
if (!AllowExplicitAESRegistration)
{
nlwarning ("Connection of an AES that is not in database server list (%s)", ia.asString ().c_str ());
}
else
{
nlinfo ("Rejecting auto-connection of an AES (%s) - this should provke explicitly reconnect", ia.asString ().c_str ());
}
rejectAES (sid, "This AES is not registered in the database");
sqlFlushResult();
return;
}
string server = row[0];
sqlFlushResult();
row = sqlQuery ("select shard from service where server='%s'", server.c_str());
if (row == NULL)
{
nlwarning ("Connection of an AES that is not in database server list (%s)", ia.asString ().c_str ());
rejectAES (sid, "This AES is not registered in the database");
sqlFlushResult();
return;
}
string shard = row[0];
sqlFlushResult();
AdminExecutorServices.push_back (CAdminExecutorService(shard, server, sid));
nlinfo ("%s-%hu, server name %s, for shard %s connected and added in the list", serviceName.c_str(), sid.get(), server.c_str(), shard.c_str());
// send him services that should run on this server
sendAESInformation (sid);
}
// i'm disconnected from an admin executor service
static void cbNewAESDisconnection (const std::string &serviceName, TServiceId sid, void *arg)
{
TSockId from;
CCallbackNetBase *cnb = CUnifiedNetwork::getInstance ()->getNetBase (sid, from);
const CInetAddress &ia = cnb->hostAddress (from);
AESIT aesit = findAES (sid, false);
if (aesit == AdminExecutorServices.end ())
{
nlwarning ("Disconnection of %s-%hu that is not in my list (%s)", serviceName.c_str (), sid.get(), ia.asString ().c_str ());
return;
}
nlinfo ("%s-%hu, shard name %s, disconnected and removed from the list", serviceName.c_str(), sid.get(), (*aesit).Name.c_str ());
// we need to remove pending request
for(uint i = 0; i < (*aesit).WaitingRequestId.size (); i++)
{
subRequestWaitingNb ((*aesit).WaitingRequestId[i]);
}
AdminExecutorServices.erase (aesit);
}
// Callback for the "REGISTER_AES" message: an AES registers explicitly,
// providing itself the server and shard names that normally come from the
// database. The message is ignored if explicit registration is not allowed,
// if the AES is already in the list or if the message cannot be decoded.
void cbRegisterAES(CMessage &msgin, const std::string &serviceName, TServiceId sid)
{
	if (!AllowExplicitAESRegistration)
	{
		nlwarning("Ignoring attempted AES registration because AllowExplicitAESRegistration==false");
		return;
	}

	// same check as for the database driven registration: a second entry with the
	// same service id would never be found by findAES() and would stay in the list
	// after the disconnection of the AES
	if (findAES (sid, false) != AdminExecutorServices.end ())
	{
		nlwarning("Ignoring attempted AES registration because AES-%hu is already in the AS list", sid.get());
		return;
	}

	string server;
	string shard;
	try
	{
		msgin.serial(server);
		msgin.serial(shard);
	}
	catch(...)
	{
		nlwarning("Ignoring attempted AES registration due to execption during message decoding");
		return;
	}

	AdminExecutorServices.push_back (CAdminExecutorService(shard, server, sid));
	nlinfo ("%s-%hu, server name %s, for shard %s connected and added in the list", serviceName.c_str(), sid.get(), server.c_str(), shard.c_str());

	// send him services that should run on this server
	sendAESInformation (sid);
}
/// Callback for the "VIEW" message: the answer of an AES to a view request.
/// The message holds the request id followed by any number of answers, each
/// one made of a list of variable names and a list of values. Every answer
/// is stored in the request, then the request is marked as answered by this AES.
static void cbView (CMessage &msgin, const std::string &serviceName, TServiceId sid)
{
	uint32 rid;
	msgin.serial (rid);

	AESIT aesit = findAES (sid);

	// this AES doesn't have to answer this request anymore
	vector<uint32> &pending = (*aesit).WaitingRequestId;
	for (vector<uint32>::iterator it = pending.begin (); it != pending.end ();)
	{
		if (*it == rid)
			it = pending.erase (it);
		else
			++it;
	}

	// shard name is found using the "service" table, so, if there's no shard name in it, it returns ???
	MYSQL_ROW row = sqlQuery ("select distinct shard from service where server='%s'", (*aesit).Name.c_str ());
	string shardName;
	if (row != NULL)
		shardName = row[0];
	else if (DontUseDataBase)
		shardName = aesit->Shard;
	else
		shardName = "???";

	while ((uint32)msgin.getPos() < msgin.length())
	{
		vector<string> vara, vala;

		// adding default row
		vara.push_back ("shard");
		vara.push_back ("server");
		vala.push_back (shardName);
		vala.push_back ((*aesit).Name);

		uint32 i, nb;
		string var, val;

		// the variable names; "__log" drops everything collected before it
		msgin.serial (nb);
		for (i = 0; i < nb; i++)
		{
			msgin.serial (var);
			if (var == "__log")
			{
				vara.clear ();
				vala.clear ();
			}
			vara.push_back (var);
		}

		const bool isLog = (vara.size() > 0 && vara[0] == "__log");
		if (isLog)
			vala.push_back ("----- Result from Shard "+shardName+" Server "+(*aesit).Name+"\n");

		// the values
		msgin.serial (nb);
		for (i = 0; i < nb; i++)
		{
			msgin.serial (val);
			vala.push_back (val);
		}

		addRequestAnswer (rid, vara, vala);
	}
	sqlFlushResult();

	// inc the NbReceived counter
	addRequestReceived (rid);
}
// Associates each message name handled by this service with its callback;
// the array is given to NLNET_SERVICE_MAIN.
TUnifiedCallbackItem CallbackArray[] =
{
{ "REGISTER_AES", cbRegisterAES },	// explicit registration of an AES
{ "VIEW", cbView },					// answer to a view request
{ "ADMIN_EMAIL", cbAdminEmail },		// alert to queue for the admins
{ "GRAPH_UPDATE", cbGraphUpdate },	// new values for the rrd graphs
};
////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////// CONNECTION TO THE CLIENT ////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////
void addRequest (const string &rawvarpath, TSockId from)
{
nlinfo ("addRequest from %s: '%s'", from->asString ().c_str (), rawvarpath.c_str ());
if(rawvarpath.empty ())
{
// send an empty string to say to php that there's nothing
string str;
sendString (from, str);
}
//
// special cases
//
if(rawvarpath == "reload")
{
// it means the we have to resend the list of services managed by AES from the mysql tables
for (AESIT aesit = AdminExecutorServices.begin(); aesit != AdminExecutorServices.end(); aesit++)
{
sendAESInformation ((*aesit).SId);
}
// send an empty string to say to php that there's nothing
string str;
sendString (from, str);
return;
}
//
// normal cases
//
CVarPath varpath (rawvarpath);
uint32 rid = newRequest (from);
for (uint i = 0; i < varpath.Destination.size (); i++)
{
string shard = varpath.Destination[i].first;
CVarPath subvarpath (varpath.Destination[i].second);
for (uint j = 0; j < subvarpath.Destination.size (); j++)
{
string server = subvarpath.Destination[j].first;
if (shard == "*" && server == "*")
{
// Send the request to all online servers of all online shards
AESIT aesit;
for (aesit = AdminExecutorServices.begin(); aesit != AdminExecutorServices.end(); aesit++)
{
addRequestWaitingNb (rid);
(*aesit).WaitingRequestId.push_back (rid);
CMessage msgout("AES_GET_VIEW");
msgout.serial (rid);
msgout.serial (subvarpath.Destination[j].second);
CUnifiedNetwork::getInstance ()->send ((*aesit).SId, msgout);
nlinfo ("REQUEST: Sent view '%s' to shard name %s 'AES-%hu'", subvarpath.Destination[j].second.c_str(), (*aesit).Name.c_str(), (*aesit).SId.get());
}
}
else if (shard == "*" && server == "#")
{
// Select all shard all server including offline one
MYSQL_ROW row = sqlQuery ("select distinct server, shard from service");
while (row != NULL)
{
AESIT aesit = findAES (row[0], false);
if (aesit != AdminExecutorServices.end())
{
addRequestWaitingNb (rid);
(*aesit).WaitingRequestId.push_back (rid);
CMessage msgout("AES_GET_VIEW");
msgout.serial (rid);
msgout.serial (subvarpath.Destination[j].second);
CUnifiedNetwork::getInstance ()->send ((*aesit).SId, msgout);
nlinfo ("REQUEST: Sent view '%s' to shard name %s 'AES-%hu'", subvarpath.Destination[j].second.c_str(), (*aesit).Name.c_str(), (*aesit).SId.get());
}
else if (server == "#")
{
vector<string> vara, vala;
// adding default row
vara.push_back ("shard");
vala.push_back (row[1]);
vara.push_back ("server");
vala.push_back (row[0]);
vara.push_back ("service");
vala.push_back ("AES");
vara.push_back ("State");
vala.push_back ("Offline");
addRequestAnswer (rid, vara, vala);
}
row = sqlNextRow ();
}
sqlFlushResult();
}
else if (server == "*" || server == "#")
{
// Send the request to all online server of a specific shard
MYSQL_ROW row = sqlQuery ("select distinct server from service where shard='%s'", shard.c_str ());
while (row != NULL)
{
AESIT aesit = findAES (row[0], false);
if (aesit != AdminExecutorServices.end())
{
addRequestWaitingNb (rid);
(*aesit).WaitingRequestId.push_back (rid);
CMessage msgout("AES_GET_VIEW");
msgout.serial (rid);
msgout.serial (subvarpath.Destination[j].second);
CUnifiedNetwork::getInstance ()->send ((*aesit).SId, msgout);
nlinfo ("REQUEST: Sent view '%s' to shard name %s 'AES-%hu'", subvarpath.Destination[j].second.c_str(), (*aesit).Name.c_str(), (*aesit).SId.get());
}
else if (server == "#")
{
vector<string> vara, vala;
// adding default row
vara.push_back ("shard");
vala.push_back (shard);
vara.push_back ("server");
vala.push_back (row[0]);
vara.push_back ("service");
vala.push_back ("AES");
vara.push_back ("State");
vala.push_back ("Offline");
addRequestAnswer (rid, vara, vala);
}
row = sqlNextRow ();
}
sqlFlushResult();
}
else
{
AESIT aesit = findAES (server, false);
if (aesit != AdminExecutorServices.end())
{
addRequestWaitingNb (rid);
(*aesit).WaitingRequestId.push_back (rid);
CMessage msgout("AES_GET_VIEW");
msgout.serial (rid);
msgout.serial (subvarpath.Destination[j].second);
CUnifiedNetwork::getInstance ()->send ((*aesit).SId, msgout);
nlinfo ("REQUEST: Sent view '%s' to shard name %s 'AES-%hu'", subvarpath.Destination[j].second.c_str(), (*aesit).Name.c_str(), (*aesit).SId.get());
}
else
{
nlwarning ("Server %s is not found in the list", server.c_str ());
}
}
}
}
}
/// Config file callback: reads the new request timeout (in seconds) from
/// \a var and logs it.
static void varRequestTimeout(CConfigFile::CVar &var)
{
	const int seconds = var.asInt();
	RequestTimeout = seconds;

	nlinfo ("Request timeout is now after %d seconds", RequestTimeout);
}
/// Config file callback: reads the new alert accumulation time from \a var.
static void varAdminAlertAccumlationTime (CConfigFile::CVar &var)
{
	const int seconds = var.asInt();
	AdminAlertAccumlationTime = seconds;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////// SERVICE IMPLEMENTATION //////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////
/// The admin service: it receives the requests of the clients, dispatches
/// them to the admin executor services and sends the alerts to the admins.
class CAdminService : public IService
{
public:

	/// Init the service: email parameters, database and web connections, AES
	/// callbacks and the config variables that can be changed at run time.
	void init ()
	{
		setDefaultEmailParams (ConfigFile.getVar ("SMTPServer").asString (), ConfigFile.getVar("DefaultEmailFrom").asString(), "");

		sqlInit ();

		connectionWebInit ();

		//CVarPath toto ("[toto");
		//CVarPath toto ("*.*.*.*");
		//CVarPath toto ("[srv1,srv2].*.*.*");
		//CVarPath toto ("[svr1.svc1,srv2.svc2].*.*");
		//CVarPath toto ("[svr1.[svc1,svc2].*.var1,srv2.svc2.fe*.var2].toto");
		//CVarPath toto ("[svr1.svc1.*.toto,srv2.svc2.*.tata]");

		CUnifiedNetwork::getInstance ()->setServiceUpCallback ("AES", cbNewAESConnection);
		CUnifiedNetwork::getInstance ()->setServiceDownCallback ("AES", cbNewAESDisconnection);

		varRequestTimeout (ConfigFile.getVar ("RequestTimeout"));
		ConfigFile.setCallback("RequestTimeout", &varRequestTimeout);

		varAdminAlertAccumlationTime (ConfigFile.getVar ("AdminAlertAccumlationTime"));
		// the callback must be registered under the name of the variable that is read just above
		ConfigFile.setCallback("AdminAlertAccumlationTime", &varAdminAlertAccumlationTime);
	}

	/// Called at each service loop: answers the finished requests, updates
	/// the web connection and sends the accumulated alerts.
	bool update ()
	{
		cleanRequest ();
		connectionWebUpdate ();

		updateSendAdminAlert ();
		return true;
	}

	/// Release the service: closes the web connection.
	void release ()
	{
		connectionWebRelease ();
	}
};
/// Admin Service: declares the service entry point for CAdminService, with the short name "AS",
/// the long name "admin_service", the value 49996 (NOTE(review): presumably the listening port -
/// confirm against the NLNET_SERVICE_MAIN documentation), the message callbacks of CallbackArray
/// and the config and log directories.
NLNET_SERVICE_MAIN (CAdminService, "AS", "admin_service", 49996, CallbackArray, NELNS_CONFIG, NELNS_LOGS);
// Command: builds a varpath from the arguments (joined with single spaces)
// and adds it as a request that doesn't come from a socket.
NLMISC_COMMAND (getViewAS, "send a view and receive an array as result", "<varpath>")
{
	string cmd;
	if (!args.empty())
	{
		cmd = args[0];
		for (uint i = 1; i < args.size(); i++)
		{
			cmd += " ";
			cmd += args[i];
		}
	}

	addRequest (cmd, NULL);

	return true;
}
// Command: makes every pending request look complete, so that the next
// cleanRequest() answers and removes all of them.
NLMISC_COMMAND (clearRequests, "clear all pending requests", "")
{
	if(args.size() != 0) return false;

	// for all requests, set the NbWaiting to NbReceived, next cleanRequest() will send answer and clear all requests.
	// This must be unconditional: the requests that still wait for answers (NbWaiting > NbReceived)
	// are precisely the ones that would not be cleaned otherwise.
	for (uint i = 0 ; i < Requests.size (); i++)
	{
		Requests[i].NbWaiting = Requests[i].NbReceived;
	}

	return true;
}
// Command: displays the state of every pending request.
NLMISC_COMMAND (displayRequests, "display all pending requests", "")
{
	if(args.size() != 0) return false;

	// size() is a size_t: convert explicitly so that the argument matches the conversion
	log.displayNL ("Display %u pending requests", (uint)Requests.size ());

	for (uint i = 0 ; i < Requests.size (); i++)
	{
		// From is NULL for the requests created by the getViewAS command
		const string from = (Requests[i].From != NULL) ? Requests[i].From->asString () : string("<none>");
		log.displayNL ("id: %d wait: %d recv: %d from: %s nbrow: %d", Requests[i].Id, Requests[i].NbWaiting, Requests[i].NbReceived, from.c_str (), Requests[i].NbRow);
	}

	log.displayNL ("End of display pending requests");

	return true;
}
// Command: queues its single argument as an admin alert.
NLMISC_COMMAND (generateAlert, "generate an alert", "<text>")
{
	if(args.size() != 1) return false;

	// the text is typed by the user: never use it as the format string,
	// a '%' in it would be interpreted as a conversion
	sendAdminAlert ("%s", args[0].c_str());

	return true;
}