#!/usr/bin/python3 # -*- coding: utf-8 -*- # # script to start/stop/status/send command/read log for khaganat process # # Copyright (C) 2017 AleaJactaEst # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . """ Configuration File ------------------ This script need configuration file (see below for model):: [config:server] # Define port listen (default 8000) port = 8000 # Example to generate all key : see pycertificate # key keyfile = /home/gameserver/ca/appli/private/serverkey.pem # certificate certfile = /home/gameserver/ca/appli/certs/servercert.pem # certification to check signature ca_cert = /home/gameserver/ca/appli/certs/cachaincert.pem # address listen (default all port) address = # method : http or https method = https # Admin Executor Service [command:aes] # command to launch the program command = ryzom_admin_service -A/home/gameserver/khanat/server -C/home/gameserver/khanat/server -L/home/gameserver/log/khanat --nobreak --fulladminname=admin_executor_service --shortadminname=AES # Path : where this program is launched path = /home/gameserver/khanat/server/ # size buffer log for each program launched (number line stdout) logsize = 1000 # buffer size (define value bufsize on subprocess.Popen, this buffer is use before read by manager) bufsize = 100 # bms_master : backup_service [command:bms_master] # command to launch the program command = ryzom_backup_service -A/home/gameserver/khanat/server 
-C/home/gameserver/khanat/server -L/home/gameserver/khanat/server/log --nobreak --writepid -P49990 # Path : where this program is launched path = /home/gameserver/khanat/server/ # we keep [logsize] last number line stdout logsize = 1000 # buffer size (define value bufsize on subprocess.Popen) bufsize = 100 # It's possible to collect some message on output (example player conected) with regex command # keep some data on array/dict state activate_filter = yes # size array/dict state size_max_filter = 1000 # search regex to add state (python regex) add_filter = "^((.*)(setActiveCharForPlayer).*(: set active char )[\d]+( for )(?P.*)|(.*)(disconnectPlayer)(.+:.+<.+>){0,1}[\s]+(?P.*)[\s]+(is disconnected))" del_filter = "^((.*)(setActiveCharForPlayer).*(: set active char )[\d]+( for )(?P.*)|(.*)(disconnectPlayer)(.+:.+<.+>){0,1}[\s]+(?P.*)[\s]+(is disconnected))" # autostart (when start OpenNelManager, launch this program) autostart = no # restart after crash restart_after_crash = yes # Delay after each restart (second) restart_delay = 10 # Enable special filter EGS (account connection / command admin) egs_filter = yes Manager ------- Manage all process khaganat Launch this prorgam in background and use clientManager to manipulate process Design .. 
graphviz:: digraph Manager { "Manager" -> "ManageCommand (command 1)"; "ManageCommand (command 1)" -> "read_output (thread1)"; "Manager" -> "ManageCommand (command 2)"; "ManageCommand (command 2)" -> "read_output (thread2)"; "Manager" -> "ServerHttp"; "ServerHttp" -> "khaganatHTTPServer"; "khaganatHTTPServer" -> "ManageHttpRequest"; "ManageHttpRequest" -> "ManageCommand (command 1)" [style=dashed]; "ManageHttpRequest" -> "ManageCommand (command 2)" [style=dashed]; } http(s) command : ----------------- +------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+ | **Html command** | **Path** | **Argument** {json format} | **Comment** | **Return*** | +------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+ | **POST** | /SHUTDOWN | | Stop all process and stop pymanager | | +------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+ | **POST** | /STARTALL | | Start all processes | | +------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+ | **GET** | /STATUSALL | | Get status all processes | | +------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+ | **POST** | /STOPALL | | Stop all processes | | +------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+ | **POST** | /START | {'name': program} | Start for one program | {'state': ''} | 
+------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+ | **POST** | /STDIN | {'name': program, 'action': action} | Send action for one program (send to input) | | +------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+ | **GET** | /STATUS | {'name': program} | Get status for one program | {'state': ''} | +------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+ | **POST** | /STOP | {'name': program} | Stop for one program | {'state': ''} | +------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+ | **GET** | /STDOUT | {'name': program, 'first-line': firstline } | Get log for one program | | +------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+ | **GET** | /FILTER | {'name': program } | Get all state (key find in stdout add/remove) | | +------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+ | **GET** | /CONFIG | {'name': program } | Get configuration | | +------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+ | **GET** | /INFO | {'name': program } | Get Information (number player, ...) 
| | +------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+ | **GET** | /PLAYER | {'name': program } | Get active player | | +------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+ | **GET** | /ADMINCOMMAND | {'name': program } | Get admin commmand | | +------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+ Example :: nohup pymanager --log info --filelog /home/gameserver/log/manager.log -c /home/gameserver/cfg/khaganat.cfg 2>/dev/null 1>/dev/null 0 %s" % (command, name, outjson)) self._set_headers() self.wfile.write(bytes(json.dumps(outjson), "utf-8")) def check_authentication(self): if not self.server.authentification: return True if __DISABLE_BCRYPT__: logging.error("Error module python bcrypt not installed") return False try: auth_header = self.headers['Authorization'].split() if auth_header[0] != 'Basic': logging.error("Authentification with Bad method (%s)" % auth_header[0]) return False decode = base64.b64decode(auth_header[1]).decode('UTF-8') account, password = decode.split(':', maxsplit=1) if account not in self.server.users: logging.error("Authentification with unknown user (%s)" % account) return False hashed_password = self.server.users[account] if bcrypt.checkpw(password.encode('utf-8'), hashed_password): return True else: logging.error("Authentification with wrong password for user (%s)" % account) return False except (ValueError, IndexError, AttributeError) as e: logging.error("Error detected %s" % e) return False def do_GET(self): """ Manage request READ we can execute LOG, STATUS, LIST & STATUSALL """ logging.debug('get recieved : %s' % self.path) if not self.check_authentication(): self.send_error(403, 'Wrong authentication') 
logging.error("Wrong authentication") elif self.path == '/STDOUT': self._command_log() elif self.path == "/FILTER": self._send_command("FILTER") elif self.path == "/INFO": self._send_command("INFO") elif self.path == "/PLAYER": self._send_command("PLAYER") elif self.path == "/ADMINCOMMAND": self._send_command("ADMINCOMMAND") elif self.path == "/CONFIG": self._send_command("CONFIG") elif self.path == '/STATUS': self._send_command("STATUS") elif self.path == '/LIST': self._send_list() elif self.path == '/STATUSALL': self._send_command_all("STATUS") else: self.send_error(400, 'Path unknown') logging.error("Path unknwon '%s'" % self.path) def do_POST(self): """ Manage request POST (CREATE) currently, we execute START, STOP, ACTION, SHUTDOWN, STARTALL & STOPALL """ logging.debug('post recieved : %s' % self.path) if not self.check_authentication(): self.send_error(403, 'Wrong authentication') logging.error("Wrong authentication") elif self.path == '/START': self._send_command("START") elif self.path == '/STOP': self._send_command("STOP") elif self.path == '/STDIN': self._send_action() elif self.path == '/SHUTDOWN': self._send_shutdown() elif self.path == '/STARTALL': self._send_command_all("START") elif self.path == '/STOPALL': self._send_command_all("STOP") else: self.send_error(400, 'Path unknown') logging.error("Path unknwon '%s'" % self.path) def do_HEAD(self): """ request HEAD received """ logging.debug('head recieved : %s' % self.path) self.send_error(404, 'File Not Found: %s' % self.path) def do_PUT(self): """ request PUT (UPDATE/REPLACE) received """ logging.debug('put recieved!') self.send_error(404, 'File Not Found: %s' % self.path) def do_PATCH(self): """ request PATCH (UPDATE/MODIFY) received """ logging.debug('patch recieved!') self.send_error(404, 'File Not Found: %s' % self.path) def do_DELETE(self): """ request DELETE received """ logging.debug('delete recieved!') self.send_error(404, 'File Not Found: %s' % self.path) def do_OPTIONS(self): """ request 
OPTIONS received """ self.send_response(200, "ok") self.send_header('Access-Control-Allow-Origin', '*') self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS') self.send_header("Access-Control-Allow-Headers", "Content-Type, *") self.end_headers() class khaganatHTTPServer(ThreadingMixIn, http.server.HTTPServer): """ Class khaganatHTTPServer Redefine HTTPServer (adding queue input & queue output, use by ManageHttpRequest) """ def __init__(self, listQueueIn, listQueueOut, listSemaphore, server_address, RequestHandlerClass, authentification, users, bind_and_activate=True): http.server.HTTPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate) self.listQueueIn = listQueueIn self.listQueueOut = listQueueOut self.listSemaphore = listSemaphore self.authentification = authentification self.users = users class ServerHttp(multiprocessing.Process): """ Initialize server HTTPS * define Dictionnary queueIn & queueOut (with key as section's name in configuration) """ def __init__(self, keyfile, certfile, ca_cert, address='', port=8000, authentification=True, method='http', users={}): multiprocessing.Process.__init__(self) self.listQueueIn = {} self.listQueueOut = {} self.listSemaphore = {} self.port = port self.key_file = keyfile self.cert_file = certfile self.ca_cert = ca_cert self.address = address self.authentification = authentification self.users = users self.method = method def run(self): server_address = (self.address, self.port) httpd = khaganatHTTPServer(self.listQueueIn, self.listQueueOut, self.listSemaphore, server_address, ManageHttpRequest, self.authentification, self.users) if self.method == 'http': logging.info('http listen') elif self.method == 'https': if self.ca_cert: httpd.socket = ssl.wrap_socket(httpd.socket, keyfile=self.key_file, certfile=self.cert_file, ca_certs=self.ca_cert, cert_reqs=ssl.CERT_REQUIRED, ssl_version=ssl.PROTOCOL_TLSv1_2, server_side=True) else: httpd.socket = ssl.wrap_socket(httpd.socket, 
keyfile=self.key_file, certfile=self.cert_file, server_side=True) logging.info('https listen') else: logging.error("Bad value 'method' (%s)" % str(self.method)) raise ValueError httpd.serve_forever() logging.info("End") def appendchild(self, name, queueIn, queueOut, semaphore): self.listQueueIn.setdefault(name, queueIn) self.listQueueOut.setdefault(name, queueOut) self.listSemaphore.setdefault(name, semaphore) class ManageCommand(): """ Manage Command (only one) * start/stop/status/get log/send an action [stdin] for command (receive order with queueIn) * read output [in other thread] * communicate with ManageHttpRequest (with queueOut) """ def __init__(self, name, command, path, logsize, bufsize, queueIn, queueOut, semaphore, activate_filter, size_max_filter, add_filter, del_filter, autostart, restart_after_crash, restart_delay, egs_filter, maxWaitEnd=10, waitDelay=1): self.process = None self.queueIn = queueIn self.queueOut = queueOut self.name = name self.command = command self.path = path self.log = [] self.poslastlog = 0 self.maxlog = logsize self.semaphore = semaphore self.bufsize = bufsize self.threadRead = None self.running = False # self.state = multiprocessing.Queue() self.pipeIn, self.pipeOut = multiprocessing.Pipe() self.eventRunningReader = threading.Event() self.eventRunningRestart = threading.Event() self.maxWaitEnd = maxWaitEnd self.waitDelay = waitDelay self.activate_filter = activate_filter self.size_max_filter = size_max_filter self.add_filter_cmd = add_filter[1:-1] self.del_filter_cmd = del_filter[1:-1] self.filter_add_filter = re.compile(self.add_filter_cmd) self.filter_del_filter = re.compile(self.del_filter_cmd) self.filter = {} self.autostart = autostart self.restart_after_crash = restart_after_crash self.restart_delay = restart_delay self.threadRestart = None self.egs_filter = egs_filter self.egs_filter_load_character = re.compile(".*(egs_plinfo).*(: LOADED User )'(?P[\d]+)' Character '(?P[^']+)' from BS stream file 
'characters/([\d]+)/account_(?P[\d]+)_(?P[\d]+)_pdr.bin") self.egs_filter_active_character = re.compile(".*(setActiveCharForPlayer).*(: set active char )(?P[\d]+)( for player )(?P[\d]+)") self.egs_filter_sid = re.compile(".*(Mapping UID )(?P[\d]+)( => Sid )\((?P.*)\)") self.egs_filter_client_ready = re.compile(".*(Updating IS_NEWBIE flag for character: )\((?P.*)\)") self.egs_filter_disconnected = re.compile(".*(disconnectPlayer).+[\s]+(player )(?P[\d]+)[\s]+(is disconnected)") self.egs_filter_admin = re.compile("(.*)(cbClientAdmin).*(: ADMIN)(: Player )\((?P.*)\)(?P.+)") # cbClientAdmin EGS-133 : ADMIN: Player (0x0000000021:00:00:86) tried to execute a no valid client admin command 'info' self.filter_load_character = {} self.filter_active_character = {} self.filter_admin = {} self.number_start = 0 self.first_line = 0 self.last_line = 0 self.pos_admin = 0 def _analyze_line(self, msg): now = time.strftime('%Y/%m/%d %H:%M:%S %Z') self.poslastlog += 1 while len(self.log) >= self.maxlog: self.log.pop(0) self.first_line = self.first_line + 1 self.log.append(now + ' ' + msg) self.last_line = self.last_line + 1 # If option sate is defined, analyze message and keep state (example , all player connected) logging.debug("recu: '%s'" % (msg)) if self.activate_filter: res = self.filter_add_filter.match(msg) if res: logging.debug("add_filter found") if len(self.filter) < self.size_max_filter: logging.debug("include add_filter found") dico = res.groupdict() for key in dico: logging.debug("set add_filter found [%s]" % (str(key))) if dico[key]: logging.debug("set1 add_filter found [%s][%s]" % (str(key), str(dico[key]))) self.filter.setdefault(key, {}) self.filter[key].setdefault(dico[key], now) res = self.filter_del_filter.match(msg) if res: logging.debug("del_filter found") dico = res.groupdict() for key in dico: logging.debug("prepare del_filter found %s" % str(key)) if dico[key]: self.filter.setdefault(key, {}) if dico[key] in self.filter[key]: logging.debug("del1 del_filter 
found [%s][%s][%s]" % (str(key), str(dico[key]), str(self.filter[key]))) del self.filter[key][dico[key]] if self.egs_filter: res = self.egs_filter_load_character.match(msg) if res: logging.debug("egs_filter_load_character found") if len(self.filter_load_character) < self.size_max_filter: logging.debug("include add_filter found") dico = res.groupdict() try: self.filter_load_character.setdefault(dico['UID'], {}) self.filter_load_character[dico['UID']].setdefault(dico['IDCHAR'], {'NameDomain': dico['NameDomain'], 'UID': dico['UIDBIS'], 'when': now}) except KeyError as e: logging.error('Missing key when read "load_character" (%s)' % e) else: logging.warning("impossible to add param 'load_character' (size too high)") return res = self.egs_filter_active_character.match(msg) if res: logging.debug("egs_filter_active_character found") if len(self.filter_active_character) < self.size_max_filter: dico = res.groupdict() try: self.filter_active_character.setdefault(dico['UID'], {}) self.filter_active_character[dico['UID']] = self.filter_load_character[dico['UID']][dico['IDCHAR']] del self.filter_load_character[dico['UID']] except KeyError as e: logging.error('Missing key when read "active_character" (%s)' % e) else: logging.warning("impossible to add param 'active_character' (size too high)") return res = self.egs_filter_sid.match(msg) if res: logging.debug("egs_filter_sid found") dico = res.groupdict() try: if dico['UID'] in self.filter_active_character: self.filter_active_character[dico['UID']].setdefault("SID", dico['SID']) else: logging.error('Impossible to add SID on player %s (Player not found)' % dico['UID']) except KeyError as e: logging.error('Missing key when read "sid" (%s)' % e) return res = self.egs_filter_disconnected.match(msg) if res: logging.debug("egs_filter_sid found") dico = res.groupdict() try: if dico['UID'] in self.filter_active_character: del self.filter_active_character[dico['UID']] else: logging.error('Impossible to remove player %s (Player not found)' 
% dico['UID']) except KeyError as e: logging.error('Missing key when remove player (%s)' % e) return res = self.egs_filter_admin.match(msg) if res: logging.debug("egs_filter_admin found") while len(self.filter_admin) >= self.maxlog: print(self.pos_admin, self.pos_admin - self.maxlog ) del self.filter_admin[self.pos_admin - self.maxlog] try: dico = res.groupdict() username = '' try: for key in self.filter_active_character: if self.filter_active_character[key]['SID'] == dico['SID']: username = self.filter_active_character[key]['NameDomain'] break except KeyError: pass self.filter_admin.setdefault( self.pos_admin, {'when': now, 'SID': dico['SID'], 'ACTION': dico['ACTION'], 'USER': username}) except KeyError as e: logging.error('Missing key when admin player (%s)' % e) self.pos_admin = self.pos_admin + 1 return def _readline_stdout(self): try: line = self.process.stdout.readline() except AttributeError: logging.error("process %s down (not detected)" % self.name) return True, False except ValueError: logging.error("process %s down (not detected)" % self.name) return True, False if not line: time.sleep(self.waitDelay) return False, True logging.debug("line %s " % line) self._analyze_line(line.decode().strip()) return False, False def read_output(self): """ Thread to read output (stdout) """ fl = fcntl.fcntl(self.process.stdout, fcntl.F_GETFL) fcntl.fcntl(self.process.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK) logging.debug("Start reader %s" % self.name) crashed = False while self.eventRunningReader.is_set(): try: logging.debug("ping") code = self.process.poll() if code is not None: logging.error("process %s down" % self.name) #self.eventRunning.clear() crashed = True except AttributeError as e: logging.warning("process %s down (%s)" % (self.name, e)) break crashedbis, end = self._readline_stdout() if end and (crashed or crashedbis): break # Send to thread manage process if crashed: logging.debug("Process stopped : '%s'" % self.name) wait_semaphore = 
self.semaphore.acquire(False) while self.eventRunningReader.is_set() and not wait_semaphore: time.sleep(1) wait_semaphore = self.semaphore.acquire(False) if wait_semaphore == True: self.queueIn.put("STOPPED") self.semaphore.release() if self.activate_filter: self.filter_load_character = {} self.filter_active_character = {} logging.debug("End reader: '%s'" % self.name) def restart(self): """ Thread to restart after crash """ logging.debug('initialize process restart %s (wait %ds)' % (self.name, self.restart_delay)) time.sleep(self.restart_delay) logging.debug('Prepare restart service %s' % (self.name)) wait_semaphore = self.semaphore.acquire(False) while self.eventRunningRestart.is_set() and not wait_semaphore: logging.debug('Ping - restart service %s' % (self.name)) time.sleep(1) wait_semaphore = self.semaphore.acquire(False) logging.debug('Prepare restart service %s (step 2)' % (self.name)) if wait_semaphore == True: logging.debug('Restart service %s' % (self.name)) self.queueIn.put("START") self.queueOut.get() self.semaphore.release() logging.debug('Prepare restart service %s (step 3)' % (self.name)) def receive_signal(self, signum, frame): """ Managed signal (not used) """ logging.info("Received signal %s (%d)" % (self.name, signum)) self.queueIn.put("SHUTDOWN") self.queueIn.put("SHUTDOWN") def start(self): """ Start program """ logging.debug("start %s" % (self.name)) if self.process: logging.debug("%s already exist" % self.name) code = self.process.poll() if code is None: logging.debug("%s already exist" % self.name) return 0 else: logging.debug("%s crashed" % self.name) code = self.process.wait() self.process.stdin.close() self.process.stdout.close() logging.error("%s crashed (return code:%d) - restart program" % (self.name, code)) try: self.process = subprocess.Popen(self.command.split(), cwd=self.path, shell=False, bufsize=self.bufsize, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True) except FileNotFoundError as e: 
logging.error("Impossible to start %s (%s)" % (self.name, e)) return 2 except PermissionError as e: logging.error("Impossible to start %s (%s)" % (self.name, e)) return 2 if self.threadRead: self.eventRunningReader.clear() self.threadRead.join() self.threadRead = None self.running = True self.eventRunningReader.set() self.threadRead = threading.Thread(target=self.read_output) self.threadRead.start() tmp = self.number_start tmp = tmp + 1 if tmp > self.number_start: self.number_start = tmp return 0 def status(self): """ Get status of program """ logging.debug("status %s" % (self.name)) if self.process: logging.debug("status %s - check" % (self.name)) code = self.process.poll() if code is None: logging.debug("%s status [started]" % (self.name)) return 0 else: logging.error("%s crashed (return code:%d)" % (self.name, code)) #self.semaphore #self.queueIn.put("STOPPED") return 2 else: logging.debug("%s status [stopped]" % (self.name)) return 1 def list_thread(self): """ List number thrad (not used) """ logging.debug('list thread') # main_thread = threading.currentThread() for t in threading.enumerate(): logging.debug('thread %s', t.getName()) logging.debug("id %d" % t.ident) def stop(self): """ Stop program """ logging.debug("stop %s" % (self.name)) if not self.process: return 1 else: try: code = self.process.poll() loop = self.maxWaitEnd while (code is None) and (loop > 0): logging.debug("stop process %s", self.name) self.process.send_signal(15) time.sleep(1) code = self.process.poll() loop -= 1 except ProcessLookupError as e: logging.warning("Stop process (%s)" % str(e)) try: loop = self.maxWaitEnd while (code is None) and (loop > 0): logging.debug("terminate process %s", self.name) self.process.terminate() time.sleep(1) code = self.process.poll() loop -= 1 except ProcessLookupError as e: logging.warning("Stop process (%s)" % str(e)) try: loop = self.maxWaitEnd while (code is None) and (loop > 0): logging.debug("kill process %s", self.name) self.process.send_signal(9) 
time.sleep(1) code = self.process.poll() loop -= 1 except ProcessLookupError as e: logging.warning("Stop process (%s)" % str(e)) try: code = self.process.wait() self.process.stdin.close() self.process.stdout.close() self.process = None if self.threadRead: self.eventRunningReader.clear() self.threadRead.join() self.threadRead = None logging.info("%s stopped (return code:%d)" % (self.name, code)) except ProcessLookupError as e: logging.warning("Stop process (%s)" % str(e)) return 1 def getlog(self, firstline): """ Get log """ logging.debug("read log %d " % firstline) outjson = {} pos = self.poslastlog - len(self.log) + 1 firstlinefound = 0 for line in self.log: if pos >= firstline: outjson.setdefault(pos, line) if not firstlinefound: firstlinefound = pos pos += 1 outjson.setdefault('first-line', firstlinefound) outjson.setdefault('last-line', pos - 1) return outjson def getfilter(self): """ Get filter """ return self.filter def getconfig(self): outjson = { 'activate_filter': str(self.activate_filter), 'bufsize': str(self.bufsize), 'size_max_filter': str(self.size_max_filter), 'path': str(self.path), 'add_filter': str(self.add_filter_cmd), 'del_filter': str(self.del_filter_cmd), 'command': str(self.command), 'maxWaitEnd': str(self.maxWaitEnd), 'waitDelay': str(self.waitDelay), 'maxlog': str(self.maxlog), 'filter': str(self.activate_filter), 'egs': str(self.egs_filter) } return outjson def getinfo(self): outjson = { 'number_launch': str(self.number_start), 'first_line': str(self.first_line), 'last_line': str(self.last_line), 'number_filter': len(self.filter), 'player_connected': len(self.filter_active_character) } return outjson def getplayer(self): return self.filter_active_character def getadmincommand(self): return self.filter_admin def action(self, action): """ Send action to program (send input to stdin) """ logging.debug("STDIN '%s'" % action) if self.process: code = self.process.poll() if code is None: if action: self.process.stdin.write(bytes(action+'\n', 
'UTF-8')) self.process.stdin.flush() return "ok" return "ko" def run(self): """ loop, run child (wait command) """ signal.signal(signal.SIGABRT, self.receive_signal) signal.signal(signal.SIGTERM, self.receive_signal) statuscmd = {0:'started', 1:'stopped', 2:'crashed'} loop = True if self.autostart: savedstate = self.start() else: savedstate = 1 while loop: logging.debug('wait event %s' % self.name) msg = self.queueIn.get() logging.debug("command : '%s'" % msg) command = msg.split()[0] if command == "SHUTDOWN": loop = False continue elif command == "START": #if savedstate != 0: savedstate = self.start() self.queueOut.put({'state': statuscmd[savedstate]}) elif command == "STATUS": currentstate = self.status() if currentstate != 1 or savedstate != 2: savedstate = currentstate self.queueOut.put({'state': statuscmd[savedstate], 'last_line': str(self.last_line), 'number_launch': str(self.number_start), 'filter': str(self.activate_filter), 'egs': str(self.egs_filter)}) elif command == "STOP": savedstate = self.stop() self.queueOut.put({'state': statuscmd[savedstate]}) elif command == "STDIN": data = msg.split(maxsplit=1)[1] self.queueOut.put({'state': self.action(data)}) elif command == "STDOUT": try: firstline = int(msg.split(maxsplit=1)[1]) except ValueError: logging.warning("Bad value for param first-line (need integer)") firstline = 0 except IndexError: firstline = 0 self.queueOut.put(self.getlog(firstline)) elif command == "FILTER": self.queueOut.put(self.getfilter()) elif command == "CONFIG": self.queueOut.put(self.getconfig()) elif command == "INFO": self.queueOut.put(self.getinfo()) elif command == "PLAYER": self.queueOut.put(self.getplayer()) elif command == "ADMINCOMMAND": self.queueOut.put(self.getadmincommand()) elif command == "STOPPED": currentstate = self.status() logging.debug('Received event process stopped (current state:%d, saved state:%d)' % (currentstate, savedstate)) if currentstate == 2 and savedstate != 1 and self.restart_after_crash: 
class Manager():
    """
    Manage all services: read the configuration, start one child process per
    configured command, start the http(s) server and wait for the end.

        * http(s) service (object ServerHttp)
        * one child per ``[command:...]`` section (each child runs a ManageCommand)
    """

    # Keys of [config:server] copied as plain strings onto the attribute of the same name
    _SERVER_TEXT_KEYS = ('address', 'keyfile', 'certfile', 'ca_cert', 'passwordfile', 'method')

    def __init__(self, launch_program):
        """
        :param bool launch_program: when true, every command is started
                                    automatically (overrides 'autostart')
        """
        self.threadCommand = []      # child processes, in launch order
        self.info = {}               # per command: queues, semaphore, process and parameters
        self.command = []
        self.launch_program = launch_program
        self.param = {}              # per command section: parameters read from the configuration
        self.users = {}              # username -> password (as stored in the password file)
        self.passwordfile = None
        self.serverHttp = None
        # Default values, used when the key is missing in [config:server]
        self.port = 8000
        self.address = ''
        self.keyfile = 'crt/key.pem'
        self.certfile = 'crt/cert.pem'
        self.ca_cert = 'crt/ca_cert.crt'
        self.authentification = False
        self.method = 'http'

    def load_config(self, filecfg):
        """
        Parse an opened configuration file and load it

        :param filecfg: opened text file (or any iterable of lines)
        :raises ValueError: filecfg is None, or a numeric parameter is invalid
        """
        if filecfg is None:
            raise ValueError("Missing configuration file")
        config = configparser.ConfigParser()
        config.read_file(filecfg)
        self._load_config(config)

    def load_password(self):
        """
        Read the password file (one 'username:password' per line) into self.users

        Nothing is done when no password file is configured.
        Empty lines are ignored, the first definition of a user wins.
        """
        if not self.passwordfile:
            return
        with open(self.passwordfile, 'rt') as fp:
            for line in fp:
                line = line.strip()
                if not line:
                    continue
                # Only the first ':' is a separator (password can contain ':')
                username, separator, password = line.partition(':')
                if not separator:
                    # Malformed line: skip it instead of crashing on unpacking
                    # (the line is not logged, it can contain a secret)
                    logging.warning("ignore bad line in password file '%s'", self.passwordfile)
                    continue
                self.users.setdefault(username, password)

    @staticmethod
    def _read_flag(section, key):
        """ Return True only when 'key' exists in section and is 'yes' (case-insensitive) """
        return key in section and section[key].upper().strip() == 'YES'

    @staticmethod
    def _read_int(section, name, key, default):
        """
        Return 'key' of section as integer ('default' when the key is missing)

        :raises ValueError: the value is not an integer
        """
        if key not in section:
            return default
        try:
            return int(section[key])
        except (TypeError, ValueError) as err:
            logging.error("Impossible to read param %s (command:%s)", key, name)
            raise ValueError("Bad value for param '%s' in section '%s'" % (key, name)) from err

    def _load_server_section(self, section):
        """ Update server settings with keys found in [config:server] (other keep previous value) """
        if 'port' in section:
            try:
                self.port = int(section['port'])
            except (TypeError, ValueError):
                # Best effort (as before): keep previous port, but say it
                logging.warning("ignore bad port '%s' (keep %s)", section['port'], self.port)
        for key in self._SERVER_TEXT_KEYS:
            if key in section:
                setattr(self, key, section[key])
        if 'authentification' in section:
            self.authentification = self._read_flag(section, 'authentification')

    def _read_command_section(self, name, section):
        """
        Build parameters of one [command:...] section (missing keys get default value)

        :raises ValueError: a numeric parameter is not an integer
        """
        return {'command': section['command'],
                'path': section.get('path'),
                'logsize': self._read_int(section, name, 'logsize', 100),
                'bufsize': self._read_int(section, name, 'bufsize', 100),
                'activate_filter': self._read_flag(section, 'activate_filter'),
                'size_max_filter': self._read_int(section, name, 'size_max_filter', 100),
                'add_filter': section.get('add_filter', ''),
                'del_filter': section.get('del_filter', ''),
                'autostart': self._read_flag(section, 'autostart'),
                'restart_after_crash': self._read_flag(section, 'restart_after_crash'),
                'restart_delay': self._read_int(section, name, 'restart_delay', 10),
                'egs_filter': self._read_flag(section, 'egs_filter')}

    def _load_config(self, config):
        """
        Read configuration object

        * 'config:client' and 'config:user' are ignored
        * 'config:server' updates server settings
        * 'command:<x>' with a key 'command' is stored in self.param
          (key is the full section name, first definition wins)
        * section name without ':' is ignored with a warning

        :param config: configuration object (configparser.ConfigParser)
        :raises ValueError: a numeric parameter of a command is not an integer
        """
        logging.debug("Sections :%s", config.sections())
        for name in config.sections():
            if name in ('config:client', 'config:user'):
                continue
            section = config[name]
            if name == 'config:server':
                logging.debug("read config '%s'", name)
                self._load_server_section(section)
                continue
            head, separator, _ = name.partition(':')
            if not separator:
                logging.warning("ignore bad parameter '%s'", name)
                continue
            if head == 'command' and 'command' in section:
                logging.debug("read command '%s'", name)
                self.param.setdefault(name, self._read_command_section(name, section))

    def initialize_http(self):
        """
        Initialize object serverHttp
        """
        logging.debug("Initialize server http(s)")
        self.serverHttp = ServerHttp(self.keyfile,
                                     self.certfile,
                                     self.ca_cert,
                                     self.address,
                                     self.port,
                                     authentification=self.authentification,
                                     method=self.method,
                                     users=self.users)

    def runCommand(self, name, command, path, logsize, bufsize, queueIn, queueOut, semaphore,
                   activate_filter, size_max_filter, add_filter, del_filter,
                   autostart, restart_after_crash, restart_delay, egs_filter):
        """
        Entry point of child process: manage one khaganat program

        All parameters are given unchanged to ManageCommand, then its run() is called.
        """
        logging.debug("Initialize '%s'" % name)
        manageCommand = ManageCommand(name=name,
                                      command=command,
                                      path=path,
                                      logsize=logsize,
                                      bufsize=bufsize,
                                      queueIn=queueIn,
                                      queueOut=queueOut,
                                      semaphore=semaphore,
                                      activate_filter=activate_filter,
                                      size_max_filter=size_max_filter,
                                      add_filter=add_filter,
                                      del_filter=del_filter,
                                      autostart=autostart,
                                      restart_after_crash=restart_after_crash,
                                      restart_delay=restart_delay,
                                      egs_filter=egs_filter)
        manageCommand.run()

    def launch_server_http(self):
        """
        Launch server http(s)
        """
        # NOTE(review): daemon/start/terminate/join are used, looks like a Process - confirm on ServerHttp
        self.serverHttp.daemon = True
        self.serverHttp.start()

    def launch_command(self):
        """
        Launch child to manage each program

        For each command: create the two queues and the semaphore, register them
        on serverHttp (initialize_http must be called before), start the child
        process and keep everything in self.info.
        """
        for name, param in self.param.items():
            logging.debug("Initialize '%s'" % name)
            queueIn = multiprocessing.Queue()
            queueOut = multiprocessing.Queue()
            semaphore = multiprocessing.BoundedSemaphore()
            self.serverHttp.appendchild(name, queueIn, queueOut, semaphore)
            # Option --launch-program forces the start of every command
            autostart = True if self.launch_program else param['autostart']
            threadCommand = multiprocessing.Process(target=self.runCommand,
                                                    args=(name,
                                                          param['command'],
                                                          param['path'],
                                                          param['logsize'],
                                                          param['bufsize'],
                                                          queueIn,
                                                          queueOut,
                                                          semaphore,
                                                          param['activate_filter'],
                                                          param['size_max_filter'],
                                                          param['add_filter'],
                                                          param['del_filter'],
                                                          autostart,
                                                          param['restart_after_crash'],
                                                          param['restart_delay'],
                                                          param['egs_filter']))
            threadCommand.start()
            self.threadCommand.append(threadCommand)
            info = {'queueIn': queueIn,
                    'queueOut': queueOut,
                    'semaphore': semaphore,
                    'threadCommand': threadCommand}
            info.update(param)
            # Keep the value really used (not the one read in configuration)
            info['autostart'] = autostart
            self.info.setdefault(name, info)

    def receive_signal(self, signum, frame):
        """
        Managed signal: terminate all children, then the server http

        :param int signum: signal number
        :param frame: current stack frame (not used)
        """
        logging.info("Received signal (%d)" % (signum))
        for child in self.threadCommand:
            logging.info("send signal to child %s" % (child.name))
            try:
                child.terminate()
                child.join()
            except AttributeError:
                logging.info("child not started")
        if self.serverHttp:
            logging.info("send signal to server http")
            self.serverHttp.terminate()
        logging.info("Finalize signal (%d)" % (signum))

    def wait_children_commands(self):
        """ Wait the end of all children """
        for child in self.threadCommand:
            child.join()

    def wait_child_server_http(self):
        """ Terminate the server http and wait its end """
        self.serverHttp.terminate()
        self.serverHttp.join()

    def run(self):
        """
        launch all

        Install handler on SIGABRT & SIGTERM, launch children and server http,
        wait the end of all children, then stop the server http.
        """
        signal.signal(signal.SIGABRT, self.receive_signal)
        signal.signal(signal.SIGTERM, self.receive_signal)
        self.launch_command()
        self.launch_server_http()
        logging.info('started')
        self.wait_children_commands()
        logging.info('execute shutdown')
        # Cancel a pending alarm (if any) before shutdown
        signal.alarm(0)
        logging.info('wait thread http')
        time.sleep(1)
        self.wait_child_server_http()
        logging.info('shutdown completed')


def root(filecfg, fileLog, logLevel, launch_program, show_log_console):
    """
    Main function: configure log, load configuration & password, launch manager

    :param filecfg: configuration file (opened)
    :param fileLog: log file (opened, only its name is used) or None
    :param str logLevel: log level (name of level defined in module logging)
    :param bool launch_program: do you launch program when you start manager (auto start)
    :param bool show_log_console: do you need show log on console
    :raises ValueError: bad log level or missing configuration file
    """
    # Manage log
    numeric_level = getattr(logging, logLevel.upper(), None)
    if not isinstance(numeric_level, int):
        raise ValueError('Invalid log level: %s' % logLevel)
    handlers = []
    if show_log_console:
        handlers.append(logging.StreamHandler())
    if fileLog:
        # The handler opens the file itself (by name)
        handlers.append(logging.FileHandler(fileLog.name))
    logging.basicConfig(handlers=handlers, level=numeric_level,
                        format='%(asctime)s %(levelname)s [pid:%(process)d] [%(funcName)s:%(lineno)d] %(message)s')
    if filecfg is None:
        logging.error("Missing configuration file")
        raise ValueError("Missing configuration file")
    manager = Manager(launch_program)
    manager.load_config(filecfg)
    manager.load_password()
    manager.initialize_http()
    manager.run()


def main(args=sys.argv[1:]):
    """
    Main function: read arguments and call root()

    :param list args: all arguments ('--help', '--version', ...)
                      (default: arguments of command line, read when module is loaded)
    """
    parser = argparse.ArgumentParser(description='Manage khaganat process')
    parser.add_argument('--version', action='version', version='%(prog)s ' + __VERSION__)
    parser.add_argument('-c', '--conf', type=argparse.FileType('r'),
                        default='khaganat.cfg', help='configuration file')
    parser.add_argument('--show-log-console', action='store_true',
                        help='show message in console', default=False)
    parser.add_argument('--filelog', type=argparse.FileType('wt'),
                        default=None, help='log file')
    parser.add_argument('--log',
                        default='INFO', help='log level [DEBUG, INFO, WARNING, ERROR')
    parser.add_argument('--launch-program', action='store_true',
                        help='launch program when start manager', default=False)
    param = parser.parse_args(args)
    root(filecfg=param.conf,
         fileLog=param.filelog,
         logLevel=param.log,
         launch_program=param.launch_program,
         show_log_console=param.show_log_console)
    logging.debug("End")


if __name__ == '__main__':
    main()