mirror of
https://port.numenaute.org/aleajactaest/khanat-code-old.git
synced 2025-01-12 09:55:20 +00:00
868 lines
33 KiB
Python
Executable file
868 lines
33 KiB
Python
Executable file
#!/usr/bin/python3
|
|
#
|
|
# script to start/stop/status/send command/read log for khaganat process
|
|
#
|
|
# Copyright (C) 2017 AleaJactaEst
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
"""
|
|
Manage all process khaganat
|
|
Launch this prorgam in background and use clientManager to manipulate process
|
|
|
|
Design
|
|
+ Manager
|
|
+ -> ManageCommand [NumCommand]
|
|
| + -> read_output (thread)
|
|
+ -> ServerHttp -> khaganatHTTPServer
|
|
| + -> ManageHttpRequest (each request are send on this class) [NumRequestHttp]
|
|
|
|
http(s) command :
|
|
[Method] /Path {json data}
|
|
--------------------------
|
|
[POST] /SHUTDOWN : Stop all process and stop manager
|
|
[POST] /STARTALL : Start all process
|
|
[GET] /STATUSALL : Get status all process
|
|
[POST] /STOPALL : Stop all process
|
|
[POST] /START {'name': program} : Start one program
|
|
[POST] /ACTION {'name': program, 'action' : action} : Send action one program (send to input program)
|
|
[GET] /STATUS {'name': program} : Get status one program
|
|
[POST] /STOP {'name': program} : Stop one program
|
|
[GET] /LOG {'name': program, 'first-line': firstline } : Get log for one program
|
|
|
|
Configuration File : This script need configuration file (see below for model)
|
|
------------------------------------------------------------------------------
|
|
[manager]
|
|
# Define port listen (default 8000)
|
|
port = 8000
|
|
|
|
# Generate key
|
|
# openssl req -nodes -x509 -newkey rsa:2048 -keyout key.pem -out cert.pem -days 365 -subj "/C=FR/ST=France/L=Paris/O=khaganat/CN=khaganat.org"
|
|
|
|
# your key
|
|
keyfile = crt/key.pem
|
|
|
|
# your certificate
|
|
certfile = crt/cert.pem
|
|
|
|
# certification to check signature
|
|
ca_cert = /home/gameserver/ca/appli/certs/cachaincert.pem
|
|
|
|
# address listen (default all port)
|
|
address =
|
|
|
|
# Admin Executor Service
|
|
[aes]
|
|
# command to launch the program
|
|
command = ryzom_admin_service -A/home/gameserver/khanat/server -C/home/gameserver/khanat/server -L/home/gameserver/log/khanat --nobreak --fulladminname=admin_executor_service --shortadminname=AES
|
|
# Path : where this program is launched
|
|
path = /home/gameserver/khanat/server/
|
|
# size buffer log for each program launched (number line stdout)
|
|
logsize = 1000
|
|
# buffer size (define value bufsize on subprocess.Popen, this buffer is use before read by manager)
|
|
bufsize = 100
|
|
|
|
# bms_master : backup_service
|
|
[bms_master]
|
|
# command to launch the program
|
|
command = ryzom_backup_service -A/home/gameserver/khanat/server -C/home/gameserver/khanat/server -L/home/gameserver/khanat/server/log --nobreak --writepid -P49990
|
|
# Path : where this program is launched
|
|
path = /home/gameserver/khanat/server/
|
|
# we keep [logsize] last number line stdout
|
|
logsize = 1000
|
|
# buffer size (define value bufsize on subprocess.Popen)
|
|
bufsize = 100
|
|
------------------------------------------------------------------------------
|
|
Example :
|
|
nohup ./manage.py --log info --filelog /home/gameserver/log/manager.log -c khaganat.cfg 2>/dev/null 1>/dev/null 0</dev/zero &
|
|
|
|
"""
|
|
|
|
import subprocess
|
|
import queue
|
|
import threading
|
|
import signal
|
|
import argparse
|
|
import configparser
|
|
import logging
|
|
import logging.config
|
|
import multiprocessing
|
|
import time
|
|
import ssl
|
|
import http.server
|
|
import json
|
|
import fcntl
|
|
import os
|
|
|
|
class ManageHttpRequest(http.server.SimpleHTTPRequestHandler):
|
|
"""
|
|
Class ManageHttpRequest receive all request https
|
|
* analyze and send to ManageCommand (with queueIn & queueOut)
|
|
"""
|
|
def __init__(self, request, client_address, server):
|
|
""" Initialize object """
|
|
http.server.SimpleHTTPRequestHandler.__init__(self, request, client_address, server)
|
|
|
|
def log_message(self, format, *args):
|
|
""" function use to send log"""
|
|
logging.info("%s (%s) %s" %
|
|
(self.address_string(),
|
|
self.log_date_time_string(),
|
|
format%args))
|
|
|
|
def _set_headers(self):
|
|
""" Prepare header """
|
|
self.send_response(200)
|
|
self.send_header('Content-type', 'application/json')
|
|
self.end_headers()
|
|
|
|
def command_log(self):
|
|
""" sub request log (send log on specific process) """
|
|
if 'content-type' in self.headers:
|
|
ctype = self.headers['content-type']
|
|
else:
|
|
ctype = 'text'
|
|
if ctype != 'application/json':
|
|
logging.error("Received request with bad content-type")
|
|
self.send_response(400, "bad content-type")
|
|
self.end_headers()
|
|
return
|
|
try:
|
|
sizemsg = int(self.headers['content-length'])
|
|
except (TypeError, KeyError, ValueError):
|
|
logging.error("Received request with bad content-length")
|
|
self.send_response(400, "bad content-length")
|
|
self.end_headers()
|
|
return
|
|
|
|
msg = self.rfile.read(sizemsg)
|
|
msgjson = json.loads(msg.decode())
|
|
|
|
logging.debug(msgjson)
|
|
if 'name' not in msgjson:
|
|
self.send_error(400, 'Missing param name')
|
|
logging.error("Missing param name")
|
|
return
|
|
name = msgjson['name']
|
|
if name not in self.server.listQueueIn:
|
|
self.send_error(400, 'Name unknown')
|
|
logging.error("Name unknwon '%s'" % name)
|
|
return
|
|
|
|
if 'first-line' not in msgjson:
|
|
self.send_error(400, 'Missing param first-line')
|
|
logging.error("Missing param first-line '%s'" % name)
|
|
return
|
|
|
|
firstLine = 0
|
|
try:
|
|
firstLine = int(msgjson['first-line'])
|
|
except ValueError:
|
|
self.send_error(400, 'Impossible to read first-line')
|
|
logging.error("Impossible to read first-line '%s'" % msgjson['first-line'])
|
|
return
|
|
logging.debug("%s:%s" % (name, firstLine))
|
|
self.server.listEvent[name].set()
|
|
self.server.listQueueIn[name].put("LOG %d" % firstLine)
|
|
logging.debug("message envoye: %s" % (name))
|
|
try:
|
|
item = self.server.listQueueOut[name].get(timeout = 4)
|
|
except queue.Empty:
|
|
logging.debug("pas de message recu pour %s" % name)
|
|
return
|
|
self._set_headers()
|
|
self.wfile.write(bytes(item, "utf-8"))
|
|
logging.debug("item : %s" % item)
|
|
|
|
def send_list(self):
|
|
""" sub-request to list all program available """
|
|
outjson = {}
|
|
pos = 0
|
|
for program in self.server.listQueueIn:
|
|
outjson.setdefault(pos, program)
|
|
pos += 1
|
|
self._set_headers()
|
|
self.wfile.write(bytes(json.dumps(outjson), "utf-8"))
|
|
|
|
def send_shutdown(self):
|
|
""" Stop all program and stop manager """
|
|
for name in self.server.listQueueIn:
|
|
self.server.listEvent[name].set()
|
|
self.server.listQueueIn[name].put("SHUTDOWN")
|
|
self._set_headers()
|
|
outjson = {'shutdown':'ok'}
|
|
self.wfile.write(bytes(json.dumps(outjson), "utf-8"))
|
|
|
|
def send_command_all(self, action):
|
|
""" Send specific command on all program (start, stop, status)"""
|
|
outjson = {}
|
|
for name in self.server.listQueueIn:
|
|
self.server.listEvent[name].set()
|
|
self.server.listQueueIn[name].put(action)
|
|
try:
|
|
item = self.server.listQueueOut[name].get(timeout = 4)
|
|
except queue.Empty:
|
|
logging.debug("pas de message recu pour %s" % name)
|
|
return
|
|
outjson.setdefault(name, item)
|
|
self._set_headers()
|
|
self.wfile.write(bytes(json.dumps(outjson), "utf-8"))
|
|
|
|
def send_action(self):
|
|
""" send specific action on one program """
|
|
if 'content-type' in self.headers:
|
|
ctype = self.headers['content-type']
|
|
else:
|
|
ctype = 'text'
|
|
if ctype != 'application/json':
|
|
logging.error("Bad content-type")
|
|
self.send_response(400, "bad content-type")
|
|
self.end_headers()
|
|
return
|
|
try:
|
|
sizemsg = int(self.headers['content-length'])
|
|
except (TypeError, KeyError, ValueError):
|
|
logging.error("Bad content-length")
|
|
self.send_response(400, "bad content-length")
|
|
self.end_headers()
|
|
return
|
|
|
|
msg = self.rfile.read(sizemsg)
|
|
msgjson = json.loads(msg.decode())
|
|
|
|
logging.debug(msgjson)
|
|
if 'name' not in msgjson:
|
|
self.send_error(400, 'Missing param name')
|
|
logging.error("Missing param name")
|
|
return
|
|
name = msgjson['name']
|
|
if name not in self.server.listQueueIn:
|
|
self.send_error(400, 'Name unknown')
|
|
logging.error("Name unknwon '%s'" % name)
|
|
return
|
|
|
|
if 'action' not in msgjson:
|
|
self.send_error(400, 'Missing param action')
|
|
logging.error("Missing param action '%s'" % name)
|
|
return
|
|
|
|
action = ''
|
|
try:
|
|
action = msgjson['action']
|
|
except KeyError:
|
|
logging.error("Impossible to read first-line '%s'" % msgjson['action'])
|
|
self.send_error(400, 'Impossible to read action')
|
|
return
|
|
logging.debug("%s:%s" % (name, action))
|
|
self.server.listEvent[name].set()
|
|
self.server.listQueueIn[name].put("ACTION %s" % action)
|
|
logging.debug("message envoye: %s" % (name))
|
|
|
|
try:
|
|
result = self.server.listQueueOut[name].get(timeout = 4)
|
|
except queue.Empty:
|
|
logging.debug("pas de message recu pour %s" % name)
|
|
return
|
|
outjson={'state': result}
|
|
self._set_headers()
|
|
self.wfile.write(bytes(json.dumps(outjson), "utf-8"))
|
|
|
|
def send_command(self, command):
|
|
""" Send specific command on one program (start, stop, status)"""
|
|
if 'content-type' in self.headers:
|
|
ctype = self.headers['content-type']
|
|
else:
|
|
ctype = 'text'
|
|
if ctype != 'application/json':
|
|
logging.error("Bad content-type")
|
|
self.send_response(400, "Bad content-type")
|
|
self.end_headers()
|
|
return
|
|
try:
|
|
sizemsg = int(self.headers['content-length'])
|
|
except (TypeError, KeyError, ValueError):
|
|
logging.error("Bad content-length")
|
|
self.send_response(400, "Bad content-length")
|
|
self.end_headers()
|
|
return
|
|
msg = self.rfile.read(sizemsg)
|
|
msgjson = json.loads(msg.decode())
|
|
if 'name' not in msgjson:
|
|
self.send_error(400, 'Missing param name')
|
|
logging.error("Missing param name")
|
|
return
|
|
name = msgjson['name']
|
|
if name not in self.server.listQueueIn:
|
|
self.send_error(400, 'Name unknown')
|
|
logging.error("Name unknwon '%s'" % name)
|
|
return
|
|
logging.debug("[%s %s] Send command" % (command, name))
|
|
self.server.listEvent[name].set()
|
|
self.server.listQueueIn[name].put(command)
|
|
try:
|
|
result = self.server.listQueueOut[name].get(timeout = 4)
|
|
except queue.Empty:
|
|
self.send_error(500, 'Missing return')
|
|
logging.debug("[%s %s] Missing return" % (command, name))
|
|
return
|
|
logging.debug("[%s %s] => %s" % (command, name, result))
|
|
|
|
outjson={'state': result}
|
|
self._set_headers()
|
|
self.wfile.write(bytes(json.dumps(outjson), "utf-8"))
|
|
|
|
def do_GET(self): # READ
|
|
"""
|
|
Manage request READ
|
|
we can execute LOG, STATUS, LIST & STATUSALL
|
|
"""
|
|
logging.debug('get recieved : %s' % self.path)
|
|
if self.path == '/LOG':
|
|
self.command_log()
|
|
elif self.path == '/STATUS':
|
|
self.send_command("STATUS")
|
|
elif self.path == '/LIST':
|
|
self.send_list()
|
|
elif self.path == '/STATUSALL':
|
|
self.send_command_all("STATUS")
|
|
else:
|
|
self.send_error(400, 'Path unknown')
|
|
logging.error("Path unknwon '%s'" % self.path)
|
|
return
|
|
|
|
def do_POST(self): # CREATE
|
|
""" Manage request POST
|
|
currently, we execute START, STOP, ACTION, SHUTDOWN, STARTALL & STOPALL
|
|
"""
|
|
logging.debug('post recieved : %s' % self.path)
|
|
if self.path == '/START':
|
|
self.send_command("START")
|
|
elif self.path == '/STOP':
|
|
self.send_command("STOP")
|
|
elif self.path == '/ACTION':
|
|
self.send_action()
|
|
elif self.path == '/SHUTDOWN':
|
|
self.send_shutdown()
|
|
elif self.path == '/STARTALL':
|
|
self.send_command_all("START")
|
|
elif self.path == '/STOPALL':
|
|
self.send_command_all("STOP")
|
|
else:
|
|
self.send_error(400,'Path unknown')
|
|
logging.error("Path unknwon '%s'" % self.path)
|
|
return
|
|
|
|
def do_HEAD(self):
|
|
""" request HEAD received """
|
|
logging.debug('head recieved : %s' % self.path)
|
|
self.send_error(404,'File Not Found: %s' % self.path)
|
|
|
|
def do_PUT(self): # UPDATE/REPLACE
|
|
""" request PUT received """
|
|
logging.debug('put recieved!')
|
|
self.send_error(404,'File Not Found: %s' % self.path)
|
|
def do_PATCH(self): # UPDATE/MODIFY
|
|
""" request PATCH received """
|
|
logging.debug('patch recieved!')
|
|
self.send_error(404,'File Not Found: %s' % self.path)
|
|
def do_DELETE(self): # DELETE
|
|
""" request DELETE received """
|
|
logging.debug('delete recieved!')
|
|
self.send_error(404,'File Not Found: %s' % self.path)
|
|
|
|
|
|
class khaganatHTTPServer(http.server.HTTPServer):
|
|
"""
|
|
Class khaganatHTTPServer
|
|
Redefine HTTPServer (adding queue input & queue output, use by ManageHttpRequest)
|
|
"""
|
|
def __init__(self,
|
|
listQueueIn,
|
|
listQueueOut,
|
|
listEvent,
|
|
server_address,
|
|
RequestHandlerClass,
|
|
bind_and_activate=True):
|
|
http.server.HTTPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate)
|
|
self.listQueueIn = listQueueIn
|
|
self.listQueueOut = listQueueOut
|
|
self.listEvent = listEvent
|
|
|
|
class ServerHttp(multiprocessing.Process):
|
|
"""
|
|
Initialize server HTTPS
|
|
* define Dictionnary queueIn & queueOut (with key as section's name in configuration)
|
|
"""
|
|
def __init__(self, keyfile, certfile, ca_cert, address = '', port=8000):
|
|
multiprocessing.Process.__init__(self)
|
|
self.listQueueIn = {}
|
|
self.listQueueOut = {}
|
|
self.listEvent = {}
|
|
self.port = port
|
|
self.key_file = keyfile
|
|
self.cert_file = certfile
|
|
self.ca_cert = ca_cert
|
|
self.address = address
|
|
|
|
def run(self):
|
|
server_address = (self.address, self.port)
|
|
httpd = khaganatHTTPServer(self.listQueueIn,
|
|
self.listQueueOut,
|
|
self.listEvent,
|
|
server_address,
|
|
ManageHttpRequest)
|
|
if self.ca_cert:
|
|
httpd.socket = ssl.wrap_socket(httpd.socket,
|
|
keyfile = self.key_file,
|
|
certfile = self.cert_file,
|
|
ca_certs = self.ca_cert,
|
|
cert_reqs = ssl.CERT_REQUIRED,
|
|
ssl_version=ssl.PROTOCOL_TLSv1_2,
|
|
server_side = True)
|
|
else:
|
|
httpd.socket = ssl.wrap_socket (httpd.socket,
|
|
keyfile = self.key_file,
|
|
certfile = self.cert_file,
|
|
server_side = True)
|
|
logging.info('https listen')
|
|
httpd.serve_forever()
|
|
|
|
def append(self, name, queueIn, queueOut, event):
|
|
self.listQueueIn.setdefault(name, queueIn)
|
|
self.listQueueOut.setdefault(name, queueOut)
|
|
self.listEvent.setdefault(name, event)
|
|
|
|
|
|
class ManageCommand():
|
|
"""
|
|
Manage Command (only one)
|
|
* start/stop/status/get log/send an action [stdin] for command (receive order with queueIn)
|
|
* read output [in other thread]
|
|
* communicate with ManageHttpRequest (with queueOut)
|
|
"""
|
|
def __init__(self, name, command, path, logsize, bufsize, queueIn, queueOut, event):
|
|
self.process = None
|
|
self.queueIn = queueIn
|
|
self.queueOut = queueOut
|
|
self.name = name
|
|
self.command = command
|
|
self.path = path
|
|
self.log = []
|
|
self.poslastlog = 0
|
|
self.maxlog = logsize
|
|
self.event = event
|
|
self.bufsize = bufsize
|
|
self.threadRead = None
|
|
self.running = False
|
|
self.state = multiprocessing.Queue()
|
|
self.pipeIn, self.pipeOut = multiprocessing.Pipe()
|
|
self.eventRunning = threading.Event()
|
|
|
|
def read_output(self):
|
|
""" Thread to read output (stdout) """
|
|
fl = fcntl.fcntl(self.process.stdout, fcntl.F_GETFL)
|
|
fcntl.fcntl(self.process.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
|
|
logging.debug("Start reader %s" % self.name)
|
|
while self.eventRunning.is_set():
|
|
code = self.process.poll()
|
|
if code is not None:
|
|
logging.error("process %s down" % self.name)
|
|
self.eventRunning.clear()
|
|
continue
|
|
try:
|
|
line = self.process.stdout.readline()
|
|
except AttributeError:
|
|
logging.error("process %s down (not detected)" % self.name)
|
|
self.eventRunning.clear()
|
|
continue
|
|
if not line:
|
|
time.sleep(1)
|
|
continue
|
|
now = time.strftime('%Y/%m/%d %H:%M:%S %Z')
|
|
logging.debug("line %s " % line)
|
|
self.poslastlog += 1
|
|
while len(self.log) >= self.maxlog:
|
|
self.log.pop(0)
|
|
msg = line.decode().strip()
|
|
self.log.append(now + ' ' + msg)
|
|
logging.debug("recu: '%s'" %(msg))
|
|
logging.debug("End reader: '%s'" % self.name)
|
|
|
|
def handler(self, signum, frame):
|
|
""" Managed signal (not used) """
|
|
if self.process:
|
|
#logging.debug("Send signal %d to '%s'" %(signum, self.name))
|
|
self.process.send_signal(signum)
|
|
else:
|
|
logging.error("Impossible to send signal %d to '%s'" %(signum, self.name))
|
|
raise IOError("signal received")
|
|
|
|
def start(self):
|
|
""" Start program """
|
|
logging.debug("start %s" % (self.name))
|
|
if self.process:
|
|
logging.debug("%s already exist" % self.name)
|
|
code = self.process.poll()
|
|
if code is None:
|
|
logging.debug("%s already exist" % self.name)
|
|
return "already-started"
|
|
else:
|
|
logging.debug("%s crashed" % self.name)
|
|
code = self.process.wait()
|
|
logging.error("%s crashed (return code:%d) - restart program" % (self.name, code))
|
|
try:
|
|
self.process = subprocess.Popen(self.command.split(),
|
|
cwd = self.path,
|
|
shell=False,
|
|
bufsize=self.bufsize,
|
|
stdin=subprocess.PIPE,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT,
|
|
close_fds=True)
|
|
except FileNotFoundError as e:
|
|
logging.error("Impossible to start %s (%s)" % (self.name, e))
|
|
return "crashed"
|
|
self.eventRunning.set()
|
|
if self.threadRead:
|
|
self.eventRunning.clear()
|
|
self.threadRead.join()
|
|
self.threadRead = None
|
|
self.running = True
|
|
self.threadRead = threading.Thread(target=self.read_output)
|
|
self.threadRead.start()
|
|
return "started"
|
|
|
|
def status(self):
|
|
""" Get status of program """
|
|
logging.debug("status %s" % (self.name))
|
|
if self.process:
|
|
logging.debug("status %s - check" % (self.name))
|
|
code = self.process.poll()
|
|
if code is None:
|
|
logging.debug("%s status" % (self.name))
|
|
return "started"
|
|
else:
|
|
logging.error("%s crashed (return code:%d)" % (self.name, code))
|
|
self.process = None
|
|
return "stopped"
|
|
else:
|
|
logging.debug("%s status" % (self.name))
|
|
return "stopped"
|
|
|
|
def list_thread(self):
|
|
""" List number thrad (not used) """
|
|
logging.debug('list thread')
|
|
#main_thread = threading.currentThread()
|
|
for t in threading.enumerate():
|
|
logging.debug('thread %s', t.getName())
|
|
logging.debug("id %d" % t.ident)
|
|
|
|
|
|
def stop(self):
|
|
""" Stop program """
|
|
logging.debug("stop %s" % (self.name))
|
|
if not self.process:
|
|
return "stopped"
|
|
else:
|
|
code = self.process.poll()
|
|
loop = 10
|
|
while (code is None) and (loop > 0):
|
|
logging.debug("stop process %s" , self.name)
|
|
self.process.send_signal(15)
|
|
time.sleep(1)
|
|
code = self.process.poll()
|
|
loop -= 1
|
|
|
|
loop = 10
|
|
while (code is None) and (loop > 0):
|
|
logging.debug("terminate process %s" , self.name)
|
|
self.process.terminate()
|
|
time.sleep(1)
|
|
code = self.process.poll()
|
|
loop -= 1
|
|
|
|
loop = 10
|
|
while (code is None) and (loop > 0):
|
|
logging.debug("kill process %s" , self.name)
|
|
self.process.send_signal(9)
|
|
time.sleep(1)
|
|
code = self.process.poll()
|
|
loop -= 1
|
|
|
|
code = self.process.wait()
|
|
self.process = None
|
|
if self.threadRead:
|
|
self.eventRunning.clear()
|
|
self.threadRead.join()
|
|
self.threadRead = None
|
|
logging.info("%s stopped (return code:%d)" % (self.name, code))
|
|
return "stopped"
|
|
|
|
def getlog(self, firstline):
|
|
""" Get log """
|
|
logging.debug("read log %d " % firstline)
|
|
outjson = {}
|
|
pos = self.poslastlog - len(self.log) + 1
|
|
firstlinefound = None
|
|
for line in self.log:
|
|
if pos >= firstline:
|
|
outjson.setdefault(pos, line)
|
|
if not firstlinefound:
|
|
firstlinefound = pos
|
|
pos += 1
|
|
outjson.setdefault('first-line', firstlinefound)
|
|
outjson.setdefault('last-line', pos - 1)
|
|
return json.dumps(outjson)
|
|
|
|
def action(self, action):
|
|
""" Send action to program (send input to stdin) """
|
|
logging.debug("ACTION '%s'" % action)
|
|
if self.process:
|
|
code = self.process.poll()
|
|
if code is None:
|
|
if action:
|
|
self.process.stdin.write(bytes(action +'\n', 'UTF-8'))
|
|
self.process.stdin.flush()
|
|
return "ok"
|
|
return "ko"
|
|
|
|
def run(self):
|
|
""" loop, run child (wait command) """
|
|
loop = True
|
|
while loop:
|
|
logging.debug('wait %s' % self.name)
|
|
self.event.wait()
|
|
logging.debug('received event %s' % self.name)
|
|
try:
|
|
msg = self.queueIn.get(timeout = 4)
|
|
except queue.Empty:
|
|
self.event.clear()
|
|
logging.debug("pas de message recu pour %s" % self.name)
|
|
return
|
|
logging.debug("command : '%s'" % msg)
|
|
command = msg.split()[0]
|
|
if command == "SHUTDOWN":
|
|
loop = False
|
|
continue
|
|
elif command == "START":
|
|
self.queueOut.put(self.start())
|
|
elif command == "STATUS":
|
|
self.queueOut.put(self.status())
|
|
elif command == "STOP":
|
|
self.queueOut.put(self.stop())
|
|
elif command == "ACTION":
|
|
data = msg.split(maxsplit=1)[1]
|
|
self.queueOut.put(self.action(data))
|
|
elif command == "LOG":
|
|
try:
|
|
firstline = int(msg.split(maxsplit=1)[1])
|
|
except ValueError:
|
|
firstline = 0
|
|
self.queueOut.put(self.getlog(firstline))
|
|
else:
|
|
self.queueOut.put("error : command unknown")
|
|
self.event.clear()
|
|
self.stop()
|
|
self.event.clear()
|
|
logging.debug('end')
|
|
|
|
|
|
class Manager():
|
|
"""
|
|
Manage all services
|
|
(read configuration, launch ManageCommand & launch ServerHttp & wait the end)
|
|
* https service
|
|
* all child to manage (it start ManageCommand by command define in configuration)
|
|
|
|
"""
|
|
def __init__(self, filecfg, launch_program):
|
|
self.threadCommand = []
|
|
self.command = []
|
|
self.launch_program = launch_program
|
|
self.param = {}
|
|
|
|
config = configparser.ConfigParser()
|
|
config.read_file(filecfg)
|
|
logging.debug("Sections :%s" % config.sections())
|
|
for name in config.sections():
|
|
if name == 'config':
|
|
logging.debug("read config '%s'" % name)
|
|
try:
|
|
self.port = int(config[name]['port'])
|
|
except (TypeError, KeyError, ValueError):
|
|
self.port = 8000
|
|
try:
|
|
self.address = config[name]['address']
|
|
except (TypeError, KeyError):
|
|
self.address = ''
|
|
try:
|
|
self.keyfile = config[name]['keyfile']
|
|
except (TypeError, KeyError):
|
|
self.keyfile = 'crt/key.pem'
|
|
try:
|
|
self.certfile = config[name]['certfile']
|
|
except (TypeError, KeyError):
|
|
self.certfile = 'crt/cert.pem'
|
|
try:
|
|
self.ca_cert = config[name]['ca_cert']
|
|
except (TypeError, KeyError):
|
|
self.ca_cert = 'crt/ca_cert.crt'
|
|
elif 'command' in config[name]:
|
|
logging.debug("read command '%s'" % name)
|
|
if 'path' in config[name]:
|
|
path = config[name]['path']
|
|
else:
|
|
path = None
|
|
if 'logsize' in config[name]:
|
|
try:
|
|
logsize = int(config[name]['logsize'])
|
|
except (TypeError, KeyError, ValueError):
|
|
logsize = 100
|
|
logging.warning("Impossible to read param logsize (command:%s)", name)
|
|
else:
|
|
logsize = 100
|
|
if 'bufsize' in config[name]:
|
|
try:
|
|
bufsize = int(config[name]['bufsize'])
|
|
except (TypeError, KeyError, ValueError):
|
|
bufsize = 100
|
|
logging.warning("Impossible to read param bufsize (command:%s)", name)
|
|
else:
|
|
bufsize = 100
|
|
self.param.setdefault(name, {'command': config[name]['command'], 'path': path, 'logsize': logsize, 'bufsize': bufsize})
|
|
|
|
self.serverHttp = ServerHttp(self.keyfile, self.certfile, self.ca_cert, self.address, self.port)
|
|
if filecfg is None:
|
|
raise ValueError
|
|
|
|
def runCommand(self, name, command, path, logsize, bufsize, queueIn, queueOut, event):
|
|
"""
|
|
Thread to manage khaganat program
|
|
"""
|
|
logging.debug("Initialize '%s'" % name)
|
|
manageCommand = ManageCommand(name=name,
|
|
command=command,
|
|
path=path,
|
|
logsize=logsize,
|
|
bufsize=bufsize,
|
|
queueIn=queueIn,
|
|
queueOut=queueOut,
|
|
event=event)
|
|
manageCommand.run()
|
|
|
|
def launch_server_http(self):
|
|
""" Launch server https """
|
|
self.serverHttp.daemon = True
|
|
self.serverHttp .start()
|
|
|
|
def launch_command(self):
|
|
""" Launch child to manage each program """
|
|
for name in self.param:
|
|
logging.debug("Initialize '%s'" % name)
|
|
queueIn = multiprocessing.Queue()
|
|
queueOut = multiprocessing.Queue()
|
|
event = multiprocessing.Event()
|
|
self.serverHttp.append(name, queueIn, queueOut, event)
|
|
threadCommand = multiprocessing.Process(target=self.runCommand,
|
|
args=(name,
|
|
self.param[name]['command'],
|
|
self.param[name]['path'],
|
|
self.param[name]['logsize'],
|
|
self.param[name]['bufsize'],
|
|
queueIn,
|
|
queueOut,
|
|
event))
|
|
threadCommand.start()
|
|
if self.launch_program:
|
|
event.set()
|
|
queueIn.put("START")
|
|
try:
|
|
item = queueOut.get(timeout = 4)
|
|
except queue.Empty:
|
|
item = ""
|
|
logging.debug("pas de message recu pour %s" % name)
|
|
return
|
|
logging.info("%s => %s" % (name, item))
|
|
self.threadCommand.append(threadCommand)
|
|
|
|
|
|
def receive_signal(self, signum, frame):
|
|
""" Managed signal """
|
|
for child in self.threadCommand:
|
|
child.terminate()
|
|
if self.serverHttp:
|
|
self.serverHttp.terminate()
|
|
|
|
def wait_children_commands(self):
|
|
for child in self.threadCommand:
|
|
child.join()
|
|
|
|
def wait_child_server_http(self):
|
|
self.serverHttp.terminate()
|
|
self.serverHttp.join()
|
|
|
|
def run(self):
|
|
""" launch all """
|
|
self.launch_command()
|
|
self.launch_server_http()
|
|
logging.info('started')
|
|
self.wait_children_commands()
|
|
logging.info('end')
|
|
signal.alarm(0)
|
|
logging.info('wait thread http')
|
|
time.sleep(1)
|
|
self.wait_child_server_http()
|
|
logging.info('end')
|
|
|
|
|
|
def main(filecfg, fileLog, logLevel, launch_program, show_log_console):
|
|
""" Main function """
|
|
# Manage log
|
|
logging.getLogger('logging')
|
|
numeric_level = getattr(logging, logLevel.upper(), None)
|
|
if not isinstance(numeric_level, int):
|
|
raise ValueError('Invalid log level: %s' % logLevel)
|
|
handlers=[]
|
|
if show_log_console:
|
|
handlers.append(logging.StreamHandler())
|
|
if fileLog:
|
|
handlers.append(logging.FileHandler(fileLog.name))
|
|
logging.basicConfig(handlers=handlers, level=numeric_level,
|
|
format='%(asctime)s %(levelname)s [pid:%(process)d] [%(funcName)s:%(lineno)d] %(message)s')
|
|
if filecfg is None:
|
|
logging.error("Missing configuration file")
|
|
raise ValueError
|
|
manager = Manager(filecfg, launch_program)
|
|
manager.run()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
parser = argparse.ArgumentParser(description='Manage khaganat process')
|
|
parser.add_argument('--version', action='version', version='%(prog)s 1.0')
|
|
parser.add_argument('-c', '--conf', type=argparse.FileType('r'),
|
|
default='khaganat.cfg', help='configuration file')
|
|
parser.add_argument( '--show-log-console', action='store_true',
|
|
help='show message in console', default=False)
|
|
parser.add_argument('--filelog', type=argparse.FileType('wt'),
|
|
default=None, help='log file')
|
|
parser.add_argument('--log',
|
|
default='INFO', help='log level [DEBUG, INFO, WARNING, ERROR')
|
|
parser.add_argument( '--launch-program', action='store_true',
|
|
help='launch program when start manager', default=False)
|
|
args = parser.parse_args()
|
|
main(filecfg=args.conf,
|
|
fileLog=args.filelog,
|
|
logLevel=args.log,
|
|
launch_program=args.launch_program,
|
|
show_log_console=args.show_log_console)
|