#!/usr/bin/python
# -*- coding: utf-8 -*-

# This file is part of Cockpit.
#
# Copyright (C) 2017 Red Hat, Inc.
#
# Cockpit is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# Cockpit is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Cockpit; If not, see <http://www.gnu.org/licenses/>.

"""Unit tests for the ``github`` helper module.

The HTTP tests talk to a small mock server that is forked into a child
process for every test case and killed again afterwards.
"""

import ctypes
import fnmatch
import httplib
import json
import os
import signal
import shutil
import socket
import subprocess
import sys
import tempfile
import time
import unittest

# Address the mock HTTP server listens on; the test URLs are derived from it.
ADDRESS = ("127.0.0.8", 9898)

BASE = os.path.dirname(__file__)
DEVNULL = open("/dev/null", "r+")

import cache
import github

# "host:port" and base URL of the mock server, built from ADDRESS so the
# tests cannot drift away from the address the server actually binds to.
_NETLOC = "{0}:{1}".format(*ADDRESS)
_BASE_URL = "http://" + _NETLOC

# First argument of prctl(2): deliver a signal to us when our parent dies.
PR_SET_PDEATHSIG = 1


def mockServer():
    """Fork a mock HTTP server listening on ADDRESS.

    The listening socket is bound before forking, so connections made by
    the parent right after this function returns are queued by the kernel
    until the child starts serving.

    Returns the pid of the child process running the server; pass it to
    mockKill() to stop the server. The child never returns from this
    function.
    """
    import traceback
    import BaseHTTPServer

    # Data used by the handler below; lives in the forked child.
    data = {
        "count": 0,
    }

    class Handler(BaseHTTPServer.BaseHTTPRequestHandler):

        def replyData(self, value, headers=None, status=200):
            """Send a complete response with the raw body ``value``."""
            self.send_response(status)
            for name, content in (headers or {}).items():
                self.send_header(name, content)
            self.end_headers()
            self.wfile.write(value)
            self.wfile.close()

        def replyJson(self, value, headers=None, status=200):
            """Send ``value`` as JSON and count the reply in the server data."""
            # Copy so that neither a shared default nor the caller's dict
            # is modified when the content type is added.
            headers = dict(headers or {})
            headers["Content-type"] = "application/json"
            self.server.data["count"] += 1
            self.replyData(json.dumps(value), headers=headers, status=status)

        def do_GET(self):
            if self.path == "/count":
                # The argument is evaluated before replyJson() increments
                # the counter, so this reports the JSON replies sent so far.
                self.replyJson(self.server.data["count"])
            elif self.path == "/test/user":
                if self.headers.getheader("If-None-Match") == "blah":
                    self.replyData("", status=304)
                else:
                    self.replyJson({"user": "blah"}, headers={"ETag": "blah"})
            elif self.path == "/test/user/modified":
                modified = "Thu, 05 Jul 2012 15:31:30 GMT"
                if self.headers.getheader("If-Modified-Since") == modified:
                    self.replyData("", status=304)
                else:
                    self.replyJson({"user": "blah"}, headers={"Last-Modified": modified})
            else:
                self.send_error(404, 'Mock Not Found: ' + self.path)

    httpd = BaseHTTPServer.HTTPServer(ADDRESS, Handler)
    httpd.data = data

    child = os.fork()
    if child != 0:
        # Parent: the child owns its own copy of the listening socket,
        # so release ours explicitly instead of waiting for it to be
        # garbage collected.
        httpd.server_close()
        return child

    # Child: whatever happens, never fall back into the test runner that
    # was inherited from the parent.
    try:
        # prctl(PR_SET_PDEATHSIG, SIGTERM): best effort, libc may not be
        # loadable under this name on every platform.
        try:
            libc = ctypes.CDLL('libc.so.6')
            libc.prctl(PR_SET_PDEATHSIG, signal.SIGTERM)
        except OSError:
            pass
        httpd.serve_forever()
    except Exception:
        traceback.print_exc()
    finally:
        os._exit(1)


def mockKill(child):
    """Terminate the mock server process ``child`` and reap it."""
    os.kill(child, signal.SIGTERM)
    os.waitpid(child, 0)


class TestGitHub(unittest.TestCase):
    """Exercise github.GitHub against the mock server with an on-disk cache."""

    def setUp(self):
        self.child = mockServer()
        self.temp = tempfile.mkdtemp()

    def tearDown(self):
        # Remove the temporary directory even if stopping the server fails.
        try:
            mockKill(self.child)
        finally:
            shutil.rmtree(self.temp)

    def testCache(self):
        api = github.GitHub(_BASE_URL, cacher=cache.Cache(self.temp))

        values = api.get("/test/user")
        cached = api.get("/test/user")
        self.assertEqual(json.dumps(values), json.dumps(cached))

        # Only the first request got a JSON reply; the second was a 304,
        # which the mock server does not count.
        count = api.get("/count")
        self.assertEqual(count, 1)

    def testLog(self):
        api = github.GitHub(_BASE_URL, cacher=cache.Cache(self.temp))

        api.get("/test/user")
        api.get("/test/user")

        # fnmatch pattern: "*" stands in for the part of each log line
        # that is not fixed.
        expect = (
            '{0} - - * "GET /test/user HTTP/1.1" 200 -\n'
            '{0} - - * "GET /test/user HTTP/1.1" 304 -\n'
        ).format(_NETLOC)

        hostname = socket.gethostname().split(".")[0]
        month = time.strftime("%Y%m")
        logname = "{0}-{1}.log".format(hostname, month)
        with open(os.path.join(self.temp, logname)) as f:
            log = f.read()

        if not fnmatch.fnmatch(log, expect):
            self.fail("'{0}' did not match '{1}'".format(log, expect))


class TestWhitelist(unittest.TestCase):
    """Exercise github.whitelist() with HOME pointing at a scratch directory."""

    def setUp(self):
        self.old_home = os.environ.get("HOME")
        self.temp = tempfile.mkdtemp()
        os.environ["HOME"] = self.temp

    def tearDown(self):
        # Always put the environment back, even if cleanup fails.
        try:
            shutil.rmtree(self.temp)
        finally:
            if self.old_home is None:
                # HOME was not set before the test; assigning None to
                # os.environ would raise, so remove the variable instead.
                os.environ.pop("HOME", None)
            else:
                os.environ["HOME"] = self.old_home

    def testLoad(self):
        filename = os.path.join(self.temp, "whitelist")

        with open(filename, "w") as f:
            f.write(" one \n two\nthree\n\n")

        whitelist = github.whitelist(filename)
        self.assertEqual(set(whitelist), set(["one", "two", "three"]))
        self.assertNotIn("four", whitelist)
        self.assertNotIn("", whitelist)

    def testDefault(self):
        whitelist = github.whitelist()
        self.assertTrue(len(whitelist) > 0)

    def testNotPresent(self):
        filename = os.path.join(self.temp, "whitelist")
        whitelist = github.whitelist(filename)
        self.assertEqual(len(whitelist), 0)


if __name__ == '__main__':
    unittest.main()