#!/usr/bin/python3 # -*- coding: utf-8 -*- # # script to start/stop/status/send command/read log for khaganat process # # Copyright (C) 2017 AleaJactaEst # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . """ Configuration File ------------------ This script need configuration file (see below for model):: [config:server] # Define port listen (default 8000) port = 8000 # Example to generate all key : see pycertificate # key keyfile = /home/gameserver/ca/appli/private/serverkey.pem # certificate certfile = /home/gameserver/ca/appli/certs/servercert.pem # certification to check signature ca_cert = /home/gameserver/ca/appli/certs/cachaincert.pem # address listen (default all port) address = # method : http or https method = https # Admin Executor Service [command:aes] # command to launch the program command = ryzom_admin_service -A/home/gameserver/khanat/server -C/home/gameserver/khanat/server -L/home/gameserver/log/khanat --nobreak --fulladminname=admin_executor_service --shortadminname=AES # Path : where this program is launched path = /home/gameserver/khanat/server/ # size buffer log for each program launched (number line stdout) logsize = 1000 # buffer size (define value bufsize on subprocess.Popen, this buffer is use before read by manager) bufsize = 100 # bms_master : backup_service [command:bms_master] # command to launch the program command = ryzom_backup_service -A/home/gameserver/khanat/server -C/home/gameserver/khanat/server -L/home/gameserver/khanat/server/log --nobreak --writepid -P49990 # Path : where this program is launched path = /home/gameserver/khanat/server/ # we keep [logsize] last number line stdout logsize = 1000 # buffer size (define value bufsize on subprocess.Popen) bufsize = 100 Manager ------- Manage all process khaganat Launch this prorgam in background and use clientManager to manipulate process Design .. graphviz:: digraph Manager { "Manager" -> "ManageCommand (command 1)"; "ManageCommand (command 1)" -> "read_output (thread1)"; "Manager" -> "ManageCommand (command 2)"; "ManageCommand (command 2)" -> "read_output (thread2)"; "Manager" -> "ServerHttp"; "ServerHttp" -> "khaganatHTTPServer"; "khaganatHTTPServer" -> "ManageHttpRequest"; "ManageHttpRequest" -> "ManageCommand (command 1)" [style=dashed]; "ManageHttpRequest" -> "ManageCommand (command 2)" [style=dashed]; } http(s) command : ----------------- +------------------+-------------+---------------------------------------------+---------------------------------------------+ | **Html command** | **Path** | **Argument** {json format} | **Comment** | +------------------+-------------+---------------------------------------------+---------------------------------------------+ | **POST** | /SHUTDOWN | | Stop all process and stop pymanager | +------------------+-------------+---------------------------------------------+---------------------------------------------+ | **POST** | /STARTALL | | Start all processes | +------------------+-------------+---------------------------------------------+---------------------------------------------+ | **GET** | /STATUSALL | | Get status all processes | +------------------+-------------+---------------------------------------------+---------------------------------------------+ | **POST** | /STOPALL | | Stop all processes | +------------------+-------------+---------------------------------------------+---------------------------------------------+ | **POST** | /START | {'name': program} | Start for one program | +------------------+-------------+---------------------------------------------+---------------------------------------------+ | **POST** | /STDIN | {'name': program, 'action': action} | Send action for one program (send to input) | +------------------+-------------+---------------------------------------------+---------------------------------------------+ | **GET** | /STATUS | {'name': program} | Get status for one program | +------------------+-------------+---------------------------------------------+---------------------------------------------+ | **POST** | /STOP | {'name': program} | Stop for one program | +------------------+-------------+---------------------------------------------+---------------------------------------------+ | **GET** | /STDOUT | {'name': program, 'first-line': firstline } | Get log for one program | +------------------+-------------+---------------------------------------------+---------------------------------------------+ Example :: nohup pymanager --log info --filelog /home/gameserver/log/manager.log -c /home/gameserver/cfg/khaganat.cfg 2>/dev/null 1>/dev/null 0 %s" % (command, name, result)) outjson = {'state': result} self._set_headers() self.wfile.write(bytes(json.dumps(outjson), "utf-8")) def check_authentication(self): if not self.server.authentification: return True if __DISABLE_BCRYPT__: logging.error("Error module python bcrypt not installed") return False try: auth_header = self.headers['Authorization'].split() if auth_header[0] != 'Basic': logging.error("Authentification with Bad method (%s)" % auth_header[0]) return False decode = base64.b64decode(auth_header[1]).decode('UTF-8') account, password = decode.split(':', maxsplit=1) if account not in self.server.users: logging.error("Authentification with unknown user (%s)" % account) return False hashed_password = self.server.users[account] if bcrypt.checkpw(password.encode('utf-8'), hashed_password): return True else: logging.error("Authentification with wrong password for user (%s)" % account) return False except (ValueError, IndexError, AttributeError) as e: logging.error("Error detected %s" % e) return False def do_GET(self): """ Manage request READ we can execute LOG, STATUS, LIST & STATUSALL """ logging.debug('get recieved : %s' % self.path) if not self.check_authentication(): self.send_error(403, 'Wrong authentication') logging.error("Wrong authentication") elif self.path == '/STDOUT': self._command_log() elif self.path == '/STATUS': self._send_command("STATUS") elif self.path == '/LIST': self._send_list() elif self.path == '/STATUSALL': self._send_command_all("STATUS") else: self.send_error(400, 'Path unknown') logging.error("Path unknwon '%s'" % self.path) def do_POST(self): """ Manage request POST (CREATE) currently, we execute START, STOP, ACTION, SHUTDOWN, STARTALL & STOPALL """ logging.debug('post recieved : %s' % self.path) if not self.check_authentication(): self.send_error(403, 'Wrong authentication') logging.error("Wrong authentication") elif self.path == '/START': self._send_command("START") elif self.path == '/STOP': self._send_command("STOP") elif self.path == '/STDIN': self._send_action() elif self.path == '/SHUTDOWN': self._send_shutdown() elif self.path == '/STARTALL': self._send_command_all("START") elif self.path == '/STOPALL': self._send_command_all("STOP") else: self.send_error(400, 'Path unknown') logging.error("Path unknwon '%s'" % self.path) def do_HEAD(self): """ request HEAD received """ logging.debug('head recieved : %s' % self.path) self.send_error(404, 'File Not Found: %s' % self.path) def do_PUT(self): """ request PUT (UPDATE/REPLACE) received """ logging.debug('put recieved!') self.send_error(404, 'File Not Found: %s' % self.path) def do_PATCH(self): """ request PATCH (UPDATE/MODIFY) received """ logging.debug('patch recieved!') self.send_error(404, 'File Not Found: %s' % self.path) def do_DELETE(self): """ request DELETE received """ logging.debug('delete recieved!') self.send_error(404, 'File Not Found: %s' % self.path) def do_OPTIONS(self): """ request OPTIONS received """ self.send_response(200, "ok") self.send_header('Access-Control-Allow-Origin', '*') self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS') self.send_header("Access-Control-Allow-Headers", "*") self.end_headers() class khaganatHTTPServer(ThreadingMixIn, http.server.HTTPServer): """ Class khaganatHTTPServer Redefine HTTPServer (adding queue input & queue output, use by ManageHttpRequest) """ def __init__(self, listQueueIn, listQueueOut, listEvent, server_address, RequestHandlerClass, authentification, users, bind_and_activate=True): http.server.HTTPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate) self.listQueueIn = listQueueIn self.listQueueOut = listQueueOut self.listEvent = listEvent self.authentification = authentification self.users = users class ServerHttp(multiprocessing.Process): """ Initialize server HTTPS * define Dictionnary queueIn & queueOut (with key as section's name in configuration) """ def __init__(self, keyfile, certfile, ca_cert, address='', port=8000, authentification=True, method='http', users={}): multiprocessing.Process.__init__(self) self.listQueueIn = {} self.listQueueOut = {} self.listEvent = {} self.port = port self.key_file = keyfile self.cert_file = certfile self.ca_cert = ca_cert self.address = address self.authentification = authentification self.users = users self.method = method def run(self): server_address = (self.address, self.port) httpd = khaganatHTTPServer(self.listQueueIn, self.listQueueOut, self.listEvent, server_address, ManageHttpRequest, self.authentification, self.users) if self.method == 'http': logging.info('http listen') elif self.method == 'https': if self.ca_cert: httpd.socket = ssl.wrap_socket(httpd.socket, keyfile=self.key_file, certfile=self.cert_file, ca_certs=self.ca_cert, cert_reqs=ssl.CERT_REQUIRED, ssl_version=ssl.PROTOCOL_TLSv1_2, server_side=True) else: httpd.socket = ssl.wrap_socket(httpd.socket, keyfile=self.key_file, certfile=self.cert_file, server_side=True) logging.info('https listen') else: logging.error("Bad value 'method' (%s)" % str(self.method)) raise ValueError httpd.serve_forever() def append(self, name, queueIn, queueOut, event): self.listQueueIn.setdefault(name, queueIn) self.listQueueOut.setdefault(name, queueOut) self.listEvent.setdefault(name, event) class ManageCommand(): """ Manage Command (only one) * start/stop/status/get log/send an action [stdin] for command (receive order with queueIn) * read output [in other thread] * communicate with ManageHttpRequest (with queueOut) """ def __init__(self, name, command, path, logsize, bufsize, queueIn, queueOut, event, maxWaitEnd=10, waitDelay=1): self.process = None self.queueIn = queueIn self.queueOut = queueOut self.name = name self.command = command self.path = path self.log = [] self.poslastlog = 0 self.maxlog = logsize self.event = event self.bufsize = bufsize self.threadRead = None self.running = False self.state = multiprocessing.Queue() self.pipeIn, self.pipeOut = multiprocessing.Pipe() self.eventRunning = threading.Event() self.maxWaitEnd = maxWaitEnd self.waitDelay = waitDelay def read_output(self): """ Thread to read output (stdout) """ fl = fcntl.fcntl(self.process.stdout, fcntl.F_GETFL) fcntl.fcntl(self.process.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK) logging.debug("Start reader %s" % self.name) while self.eventRunning.is_set(): code = self.process.poll() if code is not None: logging.error("process %s down" % self.name) self.eventRunning.clear() continue try: line = self.process.stdout.readline() except AttributeError: logging.error("process %s down (not detected)" % self.name) self.eventRunning.clear() continue if not line: time.sleep(self.waitDelay) continue now = time.strftime('%Y/%m/%d %H:%M:%S %Z') logging.debug("line %s " % line) self.poslastlog += 1 while len(self.log) >= self.maxlog: self.log.pop(0) msg = line.decode().strip() self.log.append(now + ' ' + msg) logging.debug("recu: '%s'" % (msg)) logging.debug("End reader: '%s'" % self.name) def handler(self, signum, frame): """ Managed signal (not used) """ if self.process: # logging.debug("Send signal %d to '%s'" %(signum, self.name)) self.process.send_signal(signum) else: logging.error("Impossible to send signal %d to '%s'" % (signum, self.name)) raise IOError("signal received") def start(self): """ Start program """ logging.debug("start %s" % (self.name)) if self.process: logging.debug("%s already exist" % self.name) code = self.process.poll() if code is None: logging.debug("%s already exist" % self.name) return "already-started" else: logging.debug("%s crashed" % self.name) code = self.process.wait() logging.error("%s crashed (return code:%d) - restart program" % (self.name, code)) try: self.process = subprocess.Popen(self.command.split(), cwd=self.path, shell=False, bufsize=self.bufsize, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True) except FileNotFoundError as e: logging.error("Impossible to start %s (%s)" % (self.name, e)) return "crashed" except PermissionError as e: logging.error("Impossible to start %s (%s)" % (self.name, e)) return "crashed" self.eventRunning.set() if self.threadRead: self.eventRunning.clear() self.threadRead.join() self.threadRead = None self.running = True self.threadRead = threading.Thread(target=self.read_output) self.threadRead.start() return "started" def status(self): """ Get status of program """ logging.debug("status %s" % (self.name)) if self.process: logging.debug("status %s - check" % (self.name)) code = self.process.poll() if code is None: logging.debug("%s status" % (self.name)) return "started" else: logging.error("%s crashed (return code:%d)" % (self.name, code)) self.process = None return "stopped" else: logging.debug("%s status" % (self.name)) return "stopped" def list_thread(self): """ List number thrad (not used) """ logging.debug('list thread') # main_thread = threading.currentThread() for t in threading.enumerate(): logging.debug('thread %s', t.getName()) logging.debug("id %d" % t.ident) def stop(self): """ Stop program """ logging.debug("stop %s" % (self.name)) if not self.process: return "stopped" else: try: code = self.process.poll() loop = self.maxWaitEnd while (code is None) and (loop > 0): logging.debug("stop process %s", self.name) self.process.send_signal(15) time.sleep(1) code = self.process.poll() loop -= 1 except ProcessLookupError as e: logging.warning("Stop process (%s)" % str(e)) try: loop = self.maxWaitEnd while (code is None) and (loop > 0): logging.debug("terminate process %s", self.name) self.process.terminate() time.sleep(1) code = self.process.poll() loop -= 1 except ProcessLookupError as e: logging.warning("Stop process (%s)" % str(e)) try: loop = self.maxWaitEnd while (code is None) and (loop > 0): logging.debug("kill process %s", self.name) self.process.send_signal(9) time.sleep(1) code = self.process.poll() loop -= 1 except ProcessLookupError as e: logging.warning("Stop process (%s)" % str(e)) try: code = self.process.wait() self.process = None if self.threadRead: self.eventRunning.clear() self.threadRead.join() self.threadRead = None logging.info("%s stopped (return code:%d)" % (self.name, code)) except ProcessLookupError as e: logging.warning("Stop process (%s)" % str(e)) return "stopped" def getlog(self, firstline): """ Get log """ logging.debug("read log %d " % firstline) outjson = {} pos = self.poslastlog - len(self.log) + 1 firstlinefound = None for line in self.log: if pos >= firstline: outjson.setdefault(pos, line) if not firstlinefound: firstlinefound = pos pos += 1 outjson.setdefault('first-line', firstlinefound) outjson.setdefault('last-line', pos - 1) return json.dumps(outjson) def action(self, action): """ Send action to program (send input to stdin) """ logging.debug("STDIN '%s'" % action) if self.process: code = self.process.poll() if code is None: if action: self.process.stdin.write(bytes(action+'\n', 'UTF-8')) self.process.stdin.flush() return "ok" return "ko" def run(self): """ loop, run child (wait command) """ loop = True while loop: logging.debug('wait %s' % self.name) self.event.wait() logging.debug('received event %s' % self.name) try: msg = self.queueIn.get(timeout=4) except queue.Empty: self.event.clear() logging.debug("[%s] Queue empty (no message)" % self.name) return logging.debug("command : '%s'" % msg) command = msg.split()[0] if command == "SHUTDOWN": loop = False continue elif command == "START": self.queueOut.put(self.start()) elif command == "STATUS": self.queueOut.put(self.status()) elif command == "STOP": self.queueOut.put(self.stop()) elif command == "STDIN": data = msg.split(maxsplit=1)[1] self.queueOut.put(self.action(data)) elif command == "STDOUT": try: firstline = int(msg.split(maxsplit=1)[1]) except ValueError: logging.warning("Bad value for param first-line (need integer)") firstline = 0 self.queueOut.put(self.getlog(firstline)) else: logging.warning("Bad command (%s)" % command) self.queueOut.put("error : command unknown") self.event.clear() self.stop() self.event.clear() logging.debug('end') class Manager(): """ Manage all services (read configuration, launch ManageCommand & launch ServerHttp & wait the end) * https service * all child to manage (it start ManageCommand by command define in configuration) """ def __init__(self, launch_program): self.threadCommand = [] self.command = [] self.launch_program = launch_program self.param = {} self.users = {} self.passwordfile = None self.serverHttp = None self.port = 8000 self.address = '' self.keyfile = 'crt/key.pem' self.certfile = 'crt/cert.pem' self.ca_cert = 'crt/ca_cert.crt' self.authentification = False self.method = 'http' def load_config(self, filecfg): if filecfg is None: raise ValueError config = configparser.ConfigParser() config.read_file(filecfg) self._load_config(config) def load_password(self): if self.passwordfile: with open(self.passwordfile, 'rt') as fp: for line in fp: line = line.strip() if not line: continue username, password = line.split(':', maxsplit=1) self.users.setdefault(username, password) def _load_config(self, config): """ Read configuration object param: config: configuration object """ logging.debug("Sections :%s" % config.sections()) for name in config.sections(): if name == 'config:client': continue if name == 'config:user': continue elif name == 'config:server': logging.debug("read config '%s'" % name) try: self.port = int(config[name]['port']) except (TypeError, KeyError, ValueError): pass try: self.address = config[name]['address'] except (TypeError, KeyError): pass try: self.keyfile = config[name]['keyfile'] except (TypeError, KeyError): pass try: self.certfile = config[name]['certfile'] except (TypeError, KeyError): pass try: self.ca_cert = config[name]['ca_cert'] except (TypeError, KeyError): pass try: tmp = config[name]['authentification'] if tmp.upper().strip() == 'YES': self.authentification = True else: self.authentification = False except (TypeError, KeyError): pass try: self.passwordfile = config[name]['passwordfile'] except (TypeError, KeyError): pass try: self.method = config[name]['method'] except (TypeError, KeyError): pass else: try: head, value = name.split(':', maxsplit=1) except ValueError: logging.warning("ignore bad parameter '%s'" % (name)) continue if head == 'command' and 'command' in config[name]: logging.debug("read command '%s'" % name) if 'path' in config[name]: path = config[name]['path'] else: path = None if 'logsize' in config[name]: try: logsize = int(config[name]['logsize']) except (TypeError, KeyError, ValueError): logging.error("Impossible to read param logsize (command:%s)", name) raise ValueError else: logsize = 100 if 'bufsize' in config[name]: try: bufsize = int(config[name]['bufsize']) except (TypeError, KeyError, ValueError): logging.error("Impossible to read param bufsize (command:%s)", name) raise ValueError else: bufsize = 100 self.param.setdefault(name, {'command': config[name]['command'], 'path': path, 'logsize': logsize, 'bufsize': bufsize}) def initialize_http(self): """ Initialize object serverHttp """ self.serverHttp = ServerHttp(self.keyfile, self.certfile, self.ca_cert, self.address, self.port, authentification=self.authentification, method=self.method, users=self.users) def runCommand(self, name, command, path, logsize, bufsize, queueIn, queueOut, event): """ Thread to manage khaganat program """ logging.debug("Initialize '%s'" % name) manageCommand = ManageCommand(name=name, command=command, path=path, logsize=logsize, bufsize=bufsize, queueIn=queueIn, queueOut=queueOut, event=event) manageCommand.run() def launch_server_http(self): """ Launch server https """ self.serverHttp.daemon = True self.serverHttp.start() def launch_command(self): """ Launch child to manage each program """ for name in self.param: logging.debug("Initialize '%s'" % name) queueIn = multiprocessing.Queue() queueOut = multiprocessing.Queue() event = multiprocessing.Event() self.serverHttp.append(name, queueIn, queueOut, event) threadCommand = multiprocessing.Process(target=self.runCommand, args=(name, self.param[name]['command'], self.param[name]['path'], self.param[name]['logsize'], self.param[name]['bufsize'], queueIn, queueOut, event)) threadCommand.start() self.threadCommand.append(threadCommand) if self.launch_program: event.set() queueIn.put("START") try: item = queueOut.get(timeout=4) except queue.Empty: item = "" logging.debug("[%s] Queue empty (no message)" % name) return logging.info("%s => %s" % (name, item)) def receive_signal(self, signum, frame): """ Managed signal """ for child in self.threadCommand: child.terminate() if self.serverHttp: self.serverHttp.terminate() def wait_children_commands(self): for child in self.threadCommand: child.join() def wait_child_server_http(self): self.serverHttp.terminate() self.serverHttp.join() def run(self): """ launch all """ self.launch_command() self.launch_server_http() logging.info('started') self.wait_children_commands() logging.info('execute shutdown') signal.alarm(0) logging.info('wait thread http') time.sleep(1) self.wait_child_server_http() logging.info('shutdown completed') def root(filecfg, fileLog, logLevel, launch_program, show_log_console): """ Main function :param str filecfg: configuration file :param str fileLog: log file :param bool launch_program: do you launch program when you start manager (auto start) :param bool show_log_console: do you need show log on console """ # Manage log logging.getLogger('logging') numeric_level = getattr(logging, logLevel.upper(), None) if not isinstance(numeric_level, int): raise ValueError('Invalid log level: %s' % logLevel) handlers = [] if show_log_console: handlers.append(logging.StreamHandler()) if fileLog: handlers.append(logging.FileHandler(fileLog.name)) logging.basicConfig(handlers=handlers, level=numeric_level, format='%(asctime)s %(levelname)s [pid:%(process)d] [%(funcName)s:%(lineno)d] %(message)s') if filecfg is None: logging.error("Missing configuration file") raise ValueError manager = Manager(launch_program) manager.load_config(filecfg) manager.load_password() manager.initialize_http() manager.run() def main(args=sys.argv[1:]): """ Main function :param list args: all arguments ('--help, '--version', ...) """ parser = argparse.ArgumentParser(description='Manage khaganat process') parser.add_argument('--version', action='version', version='%(prog)s ' + __VERSION__) parser.add_argument('-c', '--conf', type=argparse.FileType('r'), default='khaganat.cfg', help='configuration file') parser.add_argument('--show-log-console', action='store_true', help='show message in console', default=False) parser.add_argument('--filelog', type=argparse.FileType('wt'), default=None, help='log file') parser.add_argument('--log', default='INFO', help='log level [DEBUG, INFO, WARNING, ERROR') parser.add_argument('--launch-program', action='store_true', help='launch program when start manager', default=False) param = parser.parse_args(args) root(filecfg=param.conf, fileLog=param.filelog, logLevel=param.log, launch_program=param.launch_program, show_log_console=param.show_log_console) if __name__ == '__main__': main()