khanat-code-old/code/khaganat/tools/manage.py

825 lines
30 KiB
Python
Raw Normal View History

#!/usr/bin/python3
#
# script to start/stop/status/send command/read log for khaganat process
#
# Copyright (C) 2017 AleaJactaEst
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Manage all khaganat processes.
Launch this program in the background and use clientManager to control the processes.
You can send the following commands:
[POST] SHUTDOWN : Stop all process and stop manager
[POST] STARTALL : Start all process
[GET] STATUSALL : Get status all process
[POST] STOPALL : Stop all process
[POST] START {'name': program} : Start one program
[POST] ACTION {'name': program, 'action' : action} : Send action one program (send to input program)
[GET] STATUS {'name': program} : Get status one program
[POST] STOP {'name': program} : Stop one program
[GET] LOG {'name': program, 'first-line': firstline } : Get log for one program
Configuration file: this script needs a configuration file (see the template below)
------------------------------------------------------------------------------
[config]
# Define port listen (default 8000)
port = 8000
# Generate key
# openssl req -nodes -x509 -newkey rsa:2048 -keyout key.pem -out cert.pem -days 365 -subj "/C=FR/ST=France/L=Paris/O=khaganat/CN=khaganat.org"
# key
keyfile = crt/key.pem
# certificate
certfile = crt/cert.pem
# address listen (default all port)
address =
# Admin Executor Service
[aes]
# command to launch the program
command = ryzom_admin_service -A/home/gameserver/khanat/server -C/home/gameserver/khanat/server -L/home/gameserver/log/khanat --nobreak --fulladminname=admin_executor_service --shortadminname=AES
# Path : where this program is launched
path = /home/gameserver/khanat/server/
# size buffer log for each program launched (number line stdout)
logsize = 1000
# buffer size (define value bufsize on subprocess.Popen, this buffer is use before read by manager)
bufsize = 100
# bms_master : backup_service
[bms_master]
# command to launch the program
command = ryzom_backup_service -A/home/gameserver/khanat/server -C/home/gameserver/khanat/server -L/home/gameserver/khanat/server/log --nobreak --writepid -P49990
# Path : where this program is launched
path = /home/gameserver/khanat/server/
# we keep [logsize] last number line stdout
logsize = 1000
# buffer size (define value bufsize on subprocess.Popen)
bufsize = 100
------------------------------------------------------------------------------
Example :
nohup ./manage.py --log info --filelog /home/gameserver/log/manager.log -c khaganat.cfg 2>/dev/null 1>/dev/null 0</dev/zero &
"""
# docker run -it -v $PWD:/opt/jsa servercontainer_khanat_debian_jessie_x86_64 /bin/bash
# ./manage.py --log debug --show-log-console -c test.cfg
# https://pymotw.com/2/multiprocessing/communication.html
# https://eli.thegreenplace.net/2012/01/24/distributed-computing-in-python-with-multiprocessing/
# https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
import subprocess
import queue
import threading
import signal
import argparse
import configparser
import logging
import logging.config
import multiprocessing
import time
import ssl
import http.server
import json
import fcntl
import os
class ManageHttpRequest(http.server.SimpleHTTPRequestHandler):
    """
    Receive every HTTP request and forward it to the program managers.

    The server object (self.server) must expose three dictionaries keyed by
    program name:
      - listQueueIn  : queue used to send a command to the program manager
      - listQueueOut : queue used to read the answer of the program manager
      - listEvent    : event set to wake the program manager up
    """

    # Maximum time (in seconds) to wait for the answer of a program manager.
    REPLY_TIMEOUT = 4

    def __init__(self, request, client_address, server):
        """ Initialize object """
        http.server.SimpleHTTPRequestHandler.__init__(self, request, client_address, server)

    def log_message(self, format, *args):
        """ Send the HTTP server log lines to the logging module """
        logging.info("%s (%s) %s" %
                     (self.address_string(),
                      self.log_date_time_string(),
                      format % args))

    def _set_headers(self):
        """ Send the status line and headers of a successful JSON answer """
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()

    def _send_json(self, payload):
        """ Answer 200 with 'payload' serialized as JSON """
        self._set_headers()
        self.wfile.write(bytes(json.dumps(payload), "utf-8"))

    def _reject(self):
        """ Answer 400 without body (malformed request headers) """
        self.send_response(400)
        self.end_headers()

    def _read_json_body(self):
        """
        Read and decode the JSON object sent in the request body.

        Return the decoded dictionary, or None when the request is invalid
        (in that case a 400 answer has already been sent).
        """
        ctype = self.headers.get('content-type', 'text')
        # Ignore optional parameters such as "; charset=utf-8"
        if ctype.split(';')[0].strip().lower() != 'application/json':
            self._reject()
            return None
        try:
            sizemsg = int(self.headers.get('content-length'))
        except (TypeError, ValueError):
            # Header missing (None) or not a number
            self._reject()
            return None
        if sizemsg < 0:
            # A negative size would make rfile.read() wait for end of stream
            self._reject()
            return None
        msg = self.rfile.read(sizemsg)
        try:
            msgjson = json.loads(msg.decode())
        except ValueError:
            # Covers both invalid UTF-8 and invalid JSON
            self.send_error(400, 'Invalid JSON')
            logging.error("Invalid JSON received")
            return None
        if not isinstance(msgjson, dict):
            self.send_error(400, 'JSON object expected')
            logging.error("JSON object expected")
            return None
        return msgjson

    def _get_program_name(self, msgjson):
        """
        Extract and validate the parameter 'name' of a decoded request.

        Return the program name, or None when it is missing or unknown
        (in that case a 400 answer has already been sent).
        """
        if 'name' not in msgjson:
            self.send_error(400, 'Missing param name')
            logging.error("Missing param name")
            return None
        name = msgjson['name']
        # The isinstance check also protects the lookup against unhashable
        # JSON values (list, dict).
        if not isinstance(name, str) or name not in self.server.listQueueIn:
            self.send_error(400, 'Name unknown')
            logging.error("Name unknwon '%s'" % (name,))
            return None
        return name

    def _ask(self, name, message):
        """
        Send 'message' to the manager of program 'name' and wait for its answer.

        Return the answer, or None when no answer came back within
        REPLY_TIMEOUT seconds (in that case a 500 answer has already been
        sent, so the client never waits for a reply that will not come).
        """
        logging.debug("[%s %s] Send command" % (message, name))
        # Queue the message before waking the manager, so that it always
        # finds something to read when the event fires.
        self.server.listQueueIn[name].put(message)
        self.server.listEvent[name].set()
        try:
            result = self.server.listQueueOut[name].get(timeout=self.REPLY_TIMEOUT)
        except queue.Empty:
            self.send_error(500, 'Missing return')
            logging.debug("[%s %s] Missing return" % (message, name))
            return None
        logging.debug("[%s %s] => %s" % (message, name, result))
        return result

    def command_log(self):
        """ sub request log (send log on specific process) """
        msgjson = self._read_json_body()
        if msgjson is None:
            return
        logging.debug(msgjson)
        name = self._get_program_name(msgjson)
        if name is None:
            return
        if 'first-line' not in msgjson:
            self.send_error(400, 'Missing param first-line')
            logging.error("Missing param first-line '%s'" % name)
            return
        try:
            firstLine = int(msgjson['first-line'])
        except (TypeError, ValueError):
            self.send_error(400, 'Impossible to read first-line')
            logging.error("Impossible to read first-line '%s'" % (msgjson['first-line'],))
            return
        item = self._ask(name, "LOG %d" % firstLine)
        if item is None:
            return
        # The manager already answers with a JSON document: forward it as is
        self._set_headers()
        self.wfile.write(bytes(item, "utf-8"))

    def send_list(self):
        """ sub-request to list all program available """
        self._send_json(dict(enumerate(self.server.listQueueIn)))

    def send_shutdown(self):
        """ Stop all program and stop manager """
        for name in self.server.listQueueIn:
            self.server.listQueueIn[name].put("SHUTDOWN")
            self.server.listEvent[name].set()
        self._send_json({'shutdown': 'ok'})

    def send_command_all(self, action):
        """ Send specific command on all program (start, stop, status)"""
        outjson = {}
        for name in self.server.listQueueIn:
            item = self._ask(name, action)
            if item is None:
                # A 500 answer has already been sent by _ask()
                return
            outjson.setdefault(name, item)
        self._send_json(outjson)

    def send_action(self):
        """ send specific action on one program """
        msgjson = self._read_json_body()
        if msgjson is None:
            return
        logging.debug(msgjson)
        name = self._get_program_name(msgjson)
        if name is None:
            return
        if 'action' not in msgjson:
            self.send_error(400, 'Missing param action')
            logging.error("Missing param action '%s'" % name)
            return
        result = self._ask(name, "ACTION %s" % (msgjson['action'],))
        if result is None:
            return
        self._send_json({'state': result})

    def send_command(self, command):
        """ Send specific command on one program (start, stop, status)"""
        msgjson = self._read_json_body()
        if msgjson is None:
            return
        name = self._get_program_name(msgjson)
        if name is None:
            return
        result = self._ask(name, command)
        if result is None:
            return
        self._send_json({'state': result})

    def _dispatch(self, routes):
        """ Call the function registered for the requested path, or answer 400 """
        route = routes.get(self.path)
        if route is None:
            self.send_error(400, 'Path unknown')
            logging.error("Path unknwon '%s'" % self.path)
            return
        route()

    def do_GET(self):  # READ
        """ Manage request READ
        currently, we execute LOG, STATUS, LIST & STATUSALL
        """
        logging.debug('get recieved : %s' % self.path)
        self._dispatch({
            '/LOG': self.command_log,
            '/STATUS': lambda: self.send_command("STATUS"),
            '/LIST': self.send_list,
            '/STATUSALL': lambda: self.send_command_all("STATUS"),
        })

    def do_POST(self):  # CREATE
        """ Manage request POST
        currently, we execute START, STOP, ACTION, SHUTDOWN, STARTALL & STOPALL
        """
        logging.debug('post recieved : %s' % self.path)
        self._dispatch({
            '/START': lambda: self.send_command("START"),
            '/STOP': lambda: self.send_command("STOP"),
            '/ACTION': self.send_action,
            '/SHUTDOWN': self.send_shutdown,
            '/STARTALL': lambda: self.send_command_all("START"),
            '/STOPALL': lambda: self.send_command_all("STOP"),
        })

    def do_HEAD(self):
        """ request HEAD received """
        logging.debug('head recieved : %s' % self.path)
        self.send_error(404, 'File Not Found: %s' % self.path)

    def do_PUT(self):  # UPDATE/REPLACE
        """ request PUT received """
        logging.debug('put recieved!')
        self.send_error(404, 'File Not Found: %s' % self.path)

    def do_PATCH(self):  # UPDATE/MODIFY
        """ request PATCH received """
        logging.debug('patch recieved!')
        self.send_error(404, 'File Not Found: %s' % self.path)

    def do_DELETE(self):  # DELETE
        """ request DELETE received """
        logging.debug('delete recieved!')
        self.send_error(404, 'File Not Found: %s' % self.path)
class khaganatHTTPServer(http.server.HTTPServer):
    """
    HTTP server carrying the communication channels of every managed program.

    The three dictionaries received at construction are stored as attributes,
    which makes them reachable from the request handler through self.server.
    """

    def __init__(self,
                 listQueueIn,
                 listQueueOut,
                 listEvent,
                 server_address,
                 RequestHandlerClass,
                 bind_and_activate=True):
        """ Create the HTTP server, then attach the queues and events to it """
        super().__init__(server_address, RequestHandlerClass, bind_and_activate)
        # Channels shared with the request handler
        self.listEvent = listEvent
        self.listQueueOut = listQueueOut
        self.listQueueIn = listQueueIn
class ServerHttp(multiprocessing.Process):
    """
    Process running the HTTPS server.

    Programs must be registered with append() before the process is started:
    run() hands the registered queues and events over to the HTTP server.
    """

    def __init__(self, keyfile, certfile, address = '', port=8000):
        """
        Prepare the server process (nothing is opened before run()).

        keyfile  : path of the private key (PEM)
        certfile : path of the certificate (PEM)
        address  : address to listen on (passed as is to the HTTP server)
        port     : port to listen on
        """
        multiprocessing.Process.__init__(self)
        self.listQueueIn = {}
        self.listQueueOut = {}
        self.listEvent = {}
        self.port = port
        self.keyfile = keyfile
        self.certfile = certfile
        self.address = address

    def run(self):
        """ Open the listening socket, wrap it with TLS and serve requests forever """
        server_address = (self.address, self.port)
        httpd = khaganatHTTPServer(self.listQueueIn,
                                   self.listQueueOut,
                                   self.listEvent,
                                   server_address,
                                   ManageHttpRequest)
        # ssl.wrap_socket() is deprecated and was removed in Python 3.12:
        # build a server-side context instead (no client certificate is
        # requested, as before).
        context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        context.load_cert_chain(certfile=self.certfile, keyfile=self.keyfile)
        httpd.socket = context.wrap_socket(httpd.socket, server_side=True)
        try:
            httpd.serve_forever()
        finally:
            httpd.server_close()

    def append(self, name, queueIn, queueOut, event):
        """
        Register the channels of program 'name'.

        A name already registered keeps its first channels (setdefault).
        """
        self.listQueueIn.setdefault(name, queueIn)
        self.listQueueOut.setdefault(name, queueOut)
        self.listEvent.setdefault(name, event)
class ManageCommand():
    """
    Manage one program: start, stop, status, input and log capture.

    Commands are received as text messages on queueIn (after 'event' has
    been set) and every command except SHUTDOWN is answered on queueOut.
    The output of the program (stdout + stderr) is collected by a reader
    thread which keeps the last 'logsize' lines.
    """

    def __init__(self, name, command, path, logsize, bufsize, queueIn, queueOut, event):
        """
        name     : name of the program (used in log messages)
        command  : command line, split on whitespace to build the arguments
        path     : working directory of the program
        logsize  : number of output lines kept in memory
        bufsize  : value given to subprocess.Popen(bufsize=...)
        queueIn  : queue where commands are read
        queueOut : queue where answers are written
        event    : event signalling that a command is waiting in queueIn
        """
        self.process = None
        self.queueIn = queueIn
        self.queueOut = queueOut
        self.name = name
        self.command = command
        self.path = path
        self.log = []
        self.poslastlog = 0
        self.maxlog = logsize
        self.event = event
        self.bufsize = bufsize
        self.threadRead = None
        self.running = False
        self.state = multiprocessing.Queue()
        self.pipeIn, self.pipeOut = multiprocessing.Pipe()
        self.eventRunning = threading.Event()

    def _store_line(self, line):
        """ Timestamp one raw output line and append it to the bounded log """
        now = time.strftime('%Y/%m/%d %H:%M:%S %Z')
        # errors='replace': an invalid byte must not make the line disappear
        msg = line.decode(errors='replace').strip()
        self.poslastlog += 1
        self.log.append(now + ' ' + msg)
        excess = len(self.log) - self.maxlog
        if excess > 0:
            del self.log[:excess]
        logging.debug("recu: '%s'" % (msg))

    def read_output(self):
        """
        Body of the reader thread: collect the output of the program until
        eventRunning is cleared.
        """
        process = self.process
        if process is None:
            return
        # Keep a private reference: self.process may be replaced or reset
        # while this thread is still running.
        stdout = process.stdout
        fl = fcntl.fcntl(stdout, fcntl.F_GETFL)
        fcntl.fcntl(stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        logging.debug("Start reader %s " % self.name)
        while self.eventRunning.is_set():
            try:
                line = stdout.readline()
            except (OSError, ValueError) as e:
                logging.debug("reader %s : %s" % (self.name, e))
                line = None
            if not line:
                # Nothing to read (or read error): wait instead of spinning
                time.sleep(1)
                continue
            self._store_line(line)
        # Collect what the program wrote since the last poll. At most
        # 'maxlog' lines are read: older ones would be dropped anyway.
        try:
            for _ in range(max(self.maxlog, 0)):
                line = stdout.readline()
                if not line:
                    break
                self._store_line(line)
        except (OSError, ValueError) as e:
            logging.debug("reader %s : %s" % (self.name, e))
        logging.debug("End reader: '%s'" % self.name)

    def _stop_reader(self):
        """ Ask the reader thread to finish and wait for it """
        if self.threadRead:
            self.eventRunning.clear()
            self.threadRead.join()
            self.threadRead = None

    def _release_process(self):
        """ Stop the reader thread, close the pipes and forget the process """
        self._stop_reader()
        process, self.process = self.process, None
        if process is None:
            return
        for stream in (process.stdin, process.stdout):
            if stream is None:
                continue
            try:
                stream.close()
            except OSError:
                # e.g. broken pipe while flushing stdin of a dead program
                pass

    def handler(self, signum, frame):
        """ Signal handler: forward the signal to the program """
        if self.process:
            #logging.debug("Send signal %d to '%s'" %(signum, self.name))
            self.process.send_signal(signum)
        else:
            logging.error("Impossible to send signal %d to '%s'" %(signum, self.name))
        # NOTE(review): the indentation of the original was lost; the raise is
        # assumed to be unconditional (not part of the else branch) - confirm.
        raise IOError("signal received")

    def start(self):
        """
        Start the program.

        Return "already-started" when it is running, "crashed" when it cannot
        be launched, "started" otherwise.
        """
        logging.debug("start %s" % (self.name))
        if self.process:
            code = self.process.poll()
            if code is None:
                logging.debug("%s already exist" % self.name)
                return "already-started"
            logging.debug("%s crashed" % self.name)
            code = self.process.wait()
            logging.error("%s crashed (return code:%d) - restart program" % (self.name, code))
        # Release the previous instance BEFORE launching the new one: the old
        # reader must neither read the new output nor keep eventRunning
        # cleared for the new reader.
        self._release_process()
        args = self.command.split()
        if not args:
            logging.error("Impossible to start %s (%s)" % (self.name, "empty command"))
            return "crashed"
        try:
            self.process = subprocess.Popen(args,
                                            cwd = self.path,
                                            shell=False,
                                            bufsize=self.bufsize,
                                            stdin=subprocess.PIPE,
                                            stdout=subprocess.PIPE,
                                            stderr=subprocess.STDOUT,
                                            close_fds=True)
        except OSError as e:
            # FileNotFoundError, PermissionError, ...
            logging.error("Impossible to start %s (%s)" % (self.name, e))
            return "crashed"
        self.running = True
        self.eventRunning.set()
        self.threadRead = threading.Thread(target=self.read_output)
        self.threadRead.start()
        return "started"

    def status(self):
        """ Return "started" when the program is running, "stopped" otherwise """
        logging.debug("status %s" % (self.name))
        if not self.process:
            logging.debug("%s status" % (self.name))
            return "stopped"
        logging.debug("status %s - check" % (self.name))
        code = self.process.poll()
        if code is None:
            logging.debug("%s status" % (self.name))
            return "started"
        logging.error("%s crashed (return code:%d)" % (self.name, code))
        # Also stops the reader thread, which would otherwise keep running
        # without any process to read.
        self._release_process()
        return "stopped"

    def list_thread(self):
        """ Write the name and identifier of every thread in the debug log """
        logging.debug('list thread')
        for t in threading.enumerate():
            logging.debug('thread %s', t.name)
            logging.debug("id %d" % t.ident)

    def stop(self):
        """
        Stop the program and return "stopped".

        SIGTERM is sent every second (two rounds of 10 tries), then SIGKILL
        (10 tries), and finally the end of the program is awaited.
        """
        logging.debug("stop %s" % (self.name))
        if not self.process:
            return "stopped"
        process = self.process
        stages = (("stop", lambda: process.send_signal(signal.SIGTERM)),
                  ("terminate", process.terminate),
                  ("kill", lambda: process.send_signal(signal.SIGKILL)))
        code = process.poll()
        for label, send in stages:
            loop = 10
            while (code is None) and (loop > 0):
                logging.debug("%s process %s", label, self.name)
                try:
                    send()
                except ProcessLookupError:
                    # The program ended between poll() and the signal
                    pass
                time.sleep(1)
                code = process.poll()
                loop -= 1
        code = process.wait()
        self._release_process()
        logging.info("%s stopped (return code:%d)" % (self.name, code))
        return "stopped"

    def getlog(self, firstline):
        """
        Return (JSON text) the log lines whose number is >= firstline.

        Keys are the line numbers, plus 'first-line' (first line returned,
        null when there is none) and 'last-line' (last line known).
        """
        logging.debug("read log %d " % firstline)
        outjson = {}
        pos = self.poslastlog - len(self.log) + 1
        firstlinefound = None
        for line in self.log:
            if pos >= firstline:
                outjson.setdefault(pos, line)
                if not firstlinefound:
                    firstlinefound = pos
            pos += 1
        outjson.setdefault('first-line', firstlinefound)
        outjson.setdefault('last-line', pos - 1)
        return json.dumps(outjson)

    def action(self, action):
        """
        Write 'action' (followed by a newline) on the input of the program.

        Return "ok" when the program is running, "ko" otherwise.
        """
        logging.debug("ACTION '%s'" % action)
        if not self.process or self.process.poll() is not None:
            return "ko"
        # NOTE(review): the indentation of the original was lost; an empty
        # action on a running program is assumed to answer "ok" - confirm.
        if action:
            try:
                self.process.stdin.write(bytes(action + '\n', 'UTF-8'))
                self.process.stdin.flush()
            except (OSError, ValueError) as e:
                # Broken or closed pipe: the program is going away
                logging.error("Impossible to send action to %s (%s)" % (self.name, e))
                return "ko"
        return "ok"

    def run(self):
        """
        Main loop: wait for commands and answer them until SHUTDOWN is
        received, then stop the program.
        """
        while True:
            logging.debug('wait %s' % self.name)
            self.event.wait()
            logging.debug('received event %s' % self.name)
            try:
                msg = self.queueIn.get(timeout = 4)
            except queue.Empty:
                # Woken up without message: keep serving. Leaving the loop
                # here would abandon the program without stopping it.
                self.event.clear()
                logging.debug("pas de message recu pour %s" % self.name)
                continue
            logging.debug("command : '%s'" % msg)
            parts = msg.split(maxsplit=1)
            command = parts[0] if parts else ''
            # The payload may be missing (e.g. ACTION with an empty action)
            data = parts[1] if len(parts) > 1 else ''
            if command == "SHUTDOWN":
                break
            elif command == "START":
                self.queueOut.put(self.start())
            elif command == "STATUS":
                self.queueOut.put(self.status())
            elif command == "STOP":
                self.queueOut.put(self.stop())
            elif command == "ACTION":
                self.queueOut.put(self.action(data))
            elif command == "LOG":
                try:
                    firstline = int(data)
                except ValueError:
                    firstline = 0
                self.queueOut.put(self.getlog(firstline))
            else:
                self.queueOut.put("error : command unknown")
            self.event.clear()
        self.stop()
        self.event.clear()
        logging.debug('end')
def runCommand(name, command, path, logsize, bufsize, queueIn, queueOut, event):
    """
    Entry point of the process in charge of one khaganat program.

    Build the ManageCommand object with the settings received, then run its
    command loop (the call returns when that loop ends).
    """
    logging.debug("Initialize '%s'" % name)
    settings = dict(name=name,
                    command=command,
                    path=path,
                    logsize=logsize,
                    bufsize=bufsize,
                    queueIn=queueIn,
                    queueOut=queueOut,
                    event=event)
    ManageCommand(**settings).run()
class Manager():
    """
    Read the configuration, then supervise the HTTPS server and one worker
    process per configured program.
    """

    def __init__(self, filecfg, launch_program):
        """
        filecfg        : opened configuration file (INI format)
        launch_program : when true, every program is started at launch

        Raise ValueError when filecfg is None.
        """
        # Checked first: the file is read a few lines below
        if filecfg is None:
            raise ValueError("Missing configuration file")
        self.threadCommand = None   # last worker process launched
        self.processes = []         # every worker process launched
        self.command = []
        self.launch_program = launch_program
        self.param = {}
        config = configparser.ConfigParser()
        config.read_file(filecfg)
        logging.debug("Sections :%s" % config.sections())
        # Defaults used when section [config] (or one of its keys) is missing
        port = 8000
        address = ''
        keyfile = 'crt/key.pem'
        certfile = 'crt/cert.pem'
        for name in config.sections():
            section = config[name]
            if name == 'config':
                logging.debug("read config '%s'" % name)
                port = self._read_option(section, 'port', 8000, int)
                address = self._read_option(section, 'address', '')
                keyfile = self._read_option(section, 'keyfile', 'crt/key.pem')
                certfile = self._read_option(section, 'certfile', 'crt/cert.pem')
            elif 'command' in section:
                logging.debug("read command '%s'" % name)
                path = section['path'] if 'path' in section else None
                self.param.setdefault(name, {'command': section['command'],
                                             'path': path,
                                             'logsize': self._read_size(section, 'logsize', name),
                                             'bufsize': self._read_size(section, 'bufsize', name)})
        self.serverHttp = ServerHttp(keyfile, certfile, address, port)

    @staticmethod
    def _read_option(section, key, default, convert=str):
        """ Return the converted value of 'key', or 'default' when missing or invalid """
        try:
            return convert(section[key])
        except (KeyError, ValueError, configparser.Error):
            return default

    @staticmethod
    def _read_size(section, key, name):
        """ Return the integer value of 'key' (100 when missing or invalid) """
        if key not in section:
            return 100
        try:
            return int(section[key])
        except (ValueError, configparser.Error):
            logging.warning("Impossible to read param %s (command:%s)", key, name)
            return 100

    def launch_server_http(self):
        """ Start the HTTPS server process (daemon: it ends with the manager) """
        self.serverHttp.daemon = True
        self.serverHttp.start()

    def launch_command(self):
        """
        Create the channels and the worker process of every program, and
        register them on the HTTPS server. Must be called before
        launch_server_http(): the server process receives the channels
        registered at the time it is started.
        """
        for name in self.param:
            logging.debug("Initialize '%s'" % name)
            queueIn = multiprocessing.Queue()
            queueOut = multiprocessing.Queue()
            event = multiprocessing.Event()
            self.serverHttp.append(name, queueIn, queueOut, event)
            process = multiprocessing.Process(target=runCommand,
                                              args=(name,
                                                    self.param[name]['command'],
                                                    self.param[name]['path'],
                                                    self.param[name]['logsize'],
                                                    self.param[name]['bufsize'],
                                                    queueIn,
                                                    queueOut,
                                                    event))
            process.start()
            self.processes.append(process)
            self.threadCommand = process
            if not self.launch_program:
                continue
            # Queue the message before waking the worker up
            queueIn.put("START")
            event.set()
            try:
                item = queueOut.get(timeout = 4)
            except queue.Empty:
                # A slow program must not prevent the next ones from being
                # launched.
                logging.debug("pas de message recu pour %s" % name)
                continue
            logging.info("%s => %s" % (name, item))

    def receive_signal(self, signum, frame):
        """ Signal handler: terminate every worker process and the HTTPS server """
        for process in self.processes:
            process.terminate()
        if self.serverHttp:
            self.serverHttp.terminate()

    def run(self):
        """ Launch everything, wait for the end of the workers, then stop the HTTPS server """
        self.launch_command()
        self.launch_server_http()
        logging.info('started')
        # Wait for every worker (there may be none)
        for process in self.processes:
            process.join()
        logging.info('end')
        signal.alarm(0)
        logging.info('wait thread http')
        time.sleep(1)
        self.serverHttp.terminate()
        self.serverHttp.join()
        logging.info('end')
def main(filecfg, fileLog, logLevel, launch_program, show_log_console):
    """
    Configure logging, then build and run the manager.

    filecfg          : opened configuration file (mandatory)
    fileLog          : opened log file (only its name is used), or None
    logLevel         : name of the log level (case-insensitive)
    launch_program   : forwarded to Manager
    show_log_console : when true, log messages are also sent to the console

    Raise ValueError when the log level is unknown or when filecfg is None.
    """
    logging.getLogger('logging')
    # Translate the level name into its numeric value
    level = getattr(logging, logLevel.upper(), None)
    if not isinstance(level, int):
        raise ValueError('Invalid log level: %s' % logLevel)

    # Collect the requested outputs: console first, then file
    outputs = []
    if show_log_console:
        outputs.append(logging.StreamHandler())
    if fileLog:
        outputs.append(logging.FileHandler(fileLog.name))
    log_format = '%(asctime)s %(levelname)s [pid:%(process)d] [%(funcName)s:%(lineno)d] %(message)s'
    logging.basicConfig(handlers=outputs, level=level, format=log_format)

    if filecfg is None:
        logging.error("Missing configuration file")
        raise ValueError
    Manager(filecfg, launch_program).run()
if __name__ == '__main__':
    # Describe the command line, parse it, then hand every option to main().
    cli = argparse.ArgumentParser(description='Manage khaganat process')
    cli.add_argument('--version', action='version', version='%(prog)s 1.0')
    # Configuration file, opened for reading by argparse
    cli.add_argument('-c', '--conf',
                     help='configuration file',
                     default='khaganat.cfg',
                     type=argparse.FileType('r'))
    cli.add_argument('--show-log-console',
                     help='show message in console',
                     default=False,
                     action='store_true')
    # Log file, opened for writing by argparse (no file by default)
    cli.add_argument('--filelog',
                     help='log file',
                     default=None,
                     type=argparse.FileType('wt'))
    cli.add_argument('--log',
                     help='log level [DEBUG, INFO, WARNING, ERROR',
                     default='INFO')
    cli.add_argument('--launch-program',
                     help='launch program when start manager',
                     default=False,
                     action='store_true')
    options = cli.parse_args()
    main(options.conf,
         options.filelog,
         options.log,
         options.launch_program,
         options.show_log_console)