#!/usr/bin/python3 # -*- coding: utf-8 -*- # # create certificate (use for test) # Copyright (C) 2017 AleaJactaEst # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import unittest import tempfile import os import configparser from unittest.mock import patch try: import pymanager.client as Client except ImportError: import sys sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import pymanager.client as Client try: import pymanager.certificate as cert except ImportError: import sys import os sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import pymanager.certificate as cert class TestClient(unittest.TestCase): def setUp(self): pass def test_cmp_to_key_correct(self): list = [1, 1] for key in sorted(list, key=Client.cmp_to_key()): pass def test_cmp_to_key_bad_value(self): list = ['A', '1'] for key in sorted(list, key=Client.cmp_to_key()): pass def test_cmp_to_key_all_int(self): mref = Client.cmp_to_key() m1 = mref(1) m2 = mref(2) if m1 == m2: pass if m1 >= m2: pass if m1 <= m2: pass if m1 < m2: pass if m1 > m2: pass if m1 != m2: pass def test_cmp_to_key_all_string(self): mref = Client.cmp_to_key() m1 = mref('a') m2 = mref('a') if m1 == m2: pass if m1 >= m2: pass if m1 <= m2: pass if m1 < m2: pass if m1 > m2: pass if m1 != m2: pass def test_load_config_file(self): cfgfile = tempfile.NamedTemporaryFile(suffix="config.cfg", mode='w+t') cfgfile.write('#\n[config:server]\nauthentification = No\n') cfgfile.flush() client = Client.Client(None, None, None) client.load_config(cfgfile) def test_load_config_file_none(self): client = Client.Client(None, None, None) with self.assertRaises(ValueError): client.load_config(None) def test_load_config(self): config = configparser.ConfigParser() config.add_section('config:server') config.set('config:server', 'port', '8000') config.set('config:server', 'keyfile', '/home/gameserver/ca/appli/private/serverkey.pem') config.set('config:server', 'certfile', '/home/gameserver/ca/appli/certs/servercert.pem') config.set('config:server', 'ca_cert', '/home/gameserver/ca/appli/certs/cachaincert.pem') config.set('config:server', 'address', '') config.set('config:server', 'authentification', 'yes') config.add_section('config:client') config.set('config:client', 'port', '8000') config.set('config:client', 'keyfile', '/home/gameserver/ca/appli/private/clientkey.pem') config.set('config:client', 'certfile', '/home/gameserver/ca/appli/certs/clientcert.pem') config.set('config:client', 'ca_cert', '/home/gameserver/ca/appli/certs/cachaincert.pem') config.set('config:client', 'address', '127.0.0.1') config.add_section('command:test') config.set('command:test', 'path', '/home/gameserver') config.set('command:test', 'command', '/bin/sleep 10') config.set('command:test', 'logsize', '10') config.set('command:test', 'bufsize', '10') config.add_section('config:user') config.set('config:user', 'usename', 'filter_all, filter_admin') try: client = Client.Client(None, None, None) client._load_config(config) self.assertTrue(True) except: self.fail('Error detected on load config') def test_load_config_empty(self): config = configparser.ConfigParser() config.add_section('config:server') config.set('config:server', 'authentification', 'yes') config.add_section('config:client') #config.set('config:client', 'port', '8000') config.add_section('command:test') config.set('command:test', 'path', '/home/gameserver') try: client = Client.Client(None, None, None) client._load_config(config) self.assertTrue(True) except: self.fail('Error detected on load config') def test_load_config_bad_param_port(self): config = configparser.ConfigParser() config.add_section('config:server') config.add_section('config:client') config.set('config:client', 'port', 'A') client = Client.Client(None, None, None) with self.assertRaises(Exception): client._load_config(config) def test_init_client(self): Client.Client('username', True, 'password') def test_init_client_missing_password(self): with self.assertRaises(Exception): Client.Client('username', True, None) def test_init_client_stdin_password_but_send_param(self): with self.assertRaises(Exception): Client.Client('username', False, 'password') @patch("getpass.getpass") def test_init_client_stdin_password(self, getpass): getpass.return_value = "password" Client.Client('username', False, None) def test_https_init(self): Client.HTTPSConnectionCertificate(None, None, None) @patch("socket.create_connection") @patch("ssl.wrap_socket") def test_https_connection(self, socket, ssl): #socket.return_value = "" #ssl.return_value = "" https = Client.HTTPSConnectionCertificate(None, None, None) https.connect() @patch("socket.create_connection") @patch("ssl.wrap_socket") def test_https_connection_with_ca(self, socket, ssl): #socket.return_value = "" #ssl.return_value = "" https = Client.HTTPSConnectionCertificate(None, None, 'ca') https.connect() def test_client_load_config(self): #workdir = tempfile.mkdtemp(prefix='test_client_send_json') workdir_cert_root = tempfile.mkdtemp(prefix='pymanager-certificate-root-') workdir_cert_appli = tempfile.mkdtemp(prefix='pymanager-certificate-application-') args=['--workdir-cert-root', workdir_cert_root, '--workdir-cert-appli', workdir_cert_appli] cert.main(args) config = configparser.ConfigParser() config.add_section('config:server') config.set('config:server', 'port', '8000') config.set('config:server', 'keyfile', os.path.join(workdir_cert_appli, 'private', 'serverkey.pem')) config.set('config:server', 'certfile', os.path.join(workdir_cert_appli, 'certs', 'servercert.pem')) config.set('config:server', 'ca_cert', os.path.join(workdir_cert_appli, 'certs', 'cachaincert.pem')) config.add_section('config:client') config.set('config:client', 'port', '8000') config.set('config:client', 'keyfile', os.path.join(workdir_cert_appli, 'private', 'clientkey.pem')) config.set('config:client', 'certfile', os.path.join(workdir_cert_appli, 'certs', 'clientcert.pem')) config.set('config:client', 'ca_cert', os.path.join(workdir_cert_appli, 'certs', 'cachaincert.pem')) config.set('config:client', 'address', '127.0.0.1') config.add_section('config:user') config.set('config:user', 'usename', 'filter_all, filter_admin') try: client = Client.Client(None, None, None) client._load_config(config) self.assertTrue(True) except: self.fail('Error detected on load config') @patch.object(Client.Client, 'send_json') def test_client_root(self, send_json): cfgfile = tempfile.NamedTemporaryFile(suffix="config.cfg", mode='w+t') cfgfile.write('#\n[config:client]\port = 8000\n') cfgfile.flush() Client.root(command='STATUS', program='ALL', stdin="", firstline="", fileLog="", logLevel="INFO", username=None, password_comand_line=False, password=None, show_log_console=False, raw_data=False, remove_color=False, filecfg=cfgfile) @patch.object(Client.Client, 'send_json') def test_client_root_2(self, send_json): cfgfile = tempfile.NamedTemporaryFile(suffix="config.cfg", mode='w+t') cfgfile.write('#\n[config:client]\port = 8000\n') cfgfile.flush() Client.root(command='START', program='ALL', stdin="", firstline="", fileLog="", logLevel="INFO", username=None, password_comand_line=False, password=None, show_log_console=False, raw_data=False, remove_color=False, filecfg=cfgfile) @patch.object(Client.Client, 'send_json') def test_client_root_3(self, send_json): cfgfile = tempfile.NamedTemporaryFile(suffix="config.cfg", mode='w+t') cfgfile.write('#\n[config:client]\port = 8000\n') cfgfile.flush() Client.root(command='STDIN', program='ALL', stdin="", firstline="", fileLog="", logLevel="INFO", username=None, password_comand_line=False, password=None, show_log_console=False, raw_data=False, remove_color=False, filecfg=cfgfile) @patch.object(Client.Client, 'send_json') def test_client_root_4(self, send_json): cfgfile = tempfile.NamedTemporaryFile(suffix="config.cfg", mode='w+t') cfgfile.write('#\n[config:client]\port = 8000\n') cfgfile.flush() Client.root(command='STDOUT', program='ALL', stdin="", firstline="", fileLog="", logLevel="INFO", username=None, password_comand_line=False, password=None, show_log_console=False, raw_data=False, remove_color=False, filecfg=cfgfile) @patch.object(Client.Client, 'send_json') def test_client_root_5(self, send_json): cfgfile = tempfile.NamedTemporaryFile(suffix="config.cfg", mode='w+t') cfgfile.write('#\n[config:client]\port = 8000\n') cfgfile.flush() Client.root(command='LIST', program='ALL', stdin="", firstline="", fileLog="", logLevel="INFO", username=None, password_comand_line=False, password=None, show_log_console=False, raw_data=False, remove_color=False, filecfg=cfgfile) @patch.object(Client.Client, 'send_json') def test_client_root_6(self, send_json): cfgfile = tempfile.NamedTemporaryFile(suffix="config.cfg", mode='w+t') cfgfile.write('#\n[config:client]\port = 8000\n') cfgfile.flush() Client.root(command='STATUSALL', program='ALL', stdin="", firstline="", fileLog="", logLevel="INFO", username=None, password_comand_line=False, password=None, show_log_console=False, raw_data=False, remove_color=False, filecfg=cfgfile) @patch.object(Client.Client, 'send_json') def test_client_root_7(self, send_json): cfgfile = tempfile.NamedTemporaryFile(suffix="config.cfg", mode='w+t') cfgfile.write('#\n[config:client]\port = 8000\n') cfgfile.flush() Client.root(command='SHUTDOWN', program='ALL', stdin="", firstline="", fileLog="", logLevel="INFO", username=None, password_comand_line=False, password=None, show_log_console=False, raw_data=False, remove_color=False, filecfg=cfgfile) @patch.object(Client.Client, 'send_json') def test_client_root_8(self, send_json): cfgfile = tempfile.NamedTemporaryFile(suffix="config.cfg", mode='w+t') logfile = tempfile.NamedTemporaryFile(suffix="logfile", mode='w+t') cfgfile.write('#\n[config:client]\port = 8000\n') cfgfile.flush() logfile.flush() Client.root(command='SHUTDOWN', program='ALL', stdin="", firstline="", fileLog=logfile, logLevel="INFO", username=None, password_comand_line=False, password=None, show_log_console=True, raw_data=False, remove_color=False, filecfg=cfgfile) @patch.object(Client.Client, 'send_json') def test_client_root_9(self, send_json): cfgfile = tempfile.NamedTemporaryFile(suffix="config.cfg", mode='w+t') cfgfile.write('#\n[config:client]\port = 8000\n') cfgfile.flush() with self.assertRaises(ValueError): Client.root(command='SHUTDOWN', program='ALL', stdin="", firstline="", fileLog="", logLevel="BADLEVEL", username=None, password_comand_line=False, password=None, show_log_console=False, raw_data=False, remove_color=False, filecfg=cfgfile) def test_main(self): config = tempfile.NamedTemporaryFile(suffix="password.cfg", mode='w+t') config.write('[config:server]\nauthentification=no\n') config.flush() Client.main(['--conf=' + config.name]) if __name__ == '__main__': unittest.main()