mirror of
https://github.com/classilla/tenfourfox.git
synced 2024-07-01 14:29:41 +00:00
1028 lines
40 KiB
Python
1028 lines
40 KiB
Python
|
# -*- coding: utf-8 -*-
|
||
|
import argparse
|
||
|
import json
|
||
|
import optparse
|
||
|
import os
|
||
|
import StringIO
|
||
|
import sys
|
||
|
import unittest
|
||
|
import signal
|
||
|
import xml.etree.ElementTree as ET
|
||
|
|
||
|
import mozfile
|
||
|
|
||
|
from mozlog import (
|
||
|
commandline,
|
||
|
reader,
|
||
|
structuredlog,
|
||
|
stdadapter,
|
||
|
handlers,
|
||
|
formatters,
|
||
|
)
|
||
|
|
||
|
|
||
|
class TestHandler(object):
    """Log handler used by the tests: it simply remembers every message."""

    def __init__(self):
        # Messages are kept in arrival order; the newest one is last.
        self.items = list()

    def __call__(self, data):
        """Record ``data`` as the most recent message."""
        self.items.append(data)

    @property
    def last_item(self):
        """The newest recorded message (IndexError when nothing was recorded)."""
        newest = self.items[-1]
        return newest

    @property
    def empty(self):
        """True when no message is currently stored."""
        if self.items:
            return False
        return True
|
||
|
|
||
|
|
||
|
class BaseStructuredTest(unittest.TestCase):
    """Shared fixture: a StructuredLogger named "test" feeding a TestHandler."""

    def setUp(self):
        self.logger = structuredlog.StructuredLogger("test")
        self.handler = TestHandler()
        self.logger.add_handler(self.handler)

    def pop_last_item(self):
        """Remove and return the newest message captured by the handler."""
        return self.handler.items.pop()

    def assert_log_equals(self, expected, actual=None):
        """Assert that a logged message matches ``expected``.

        ``expected`` is merged over the fields every message carries here
        (pid, thread, source).  When ``actual`` is omitted the newest
        captured message is popped and checked.  Apart from the expected
        keys, ``actual`` must contain exactly one more key, "time", whose
        value is only compared when ``expected`` supplies it.
        """
        if actual is None:
            actual = self.pop_last_item()

        all_expected = {"pid": os.getpid(),
                        "thread": "MainThread",
                        "source": "test"}
        specials = set(["time"])

        all_expected.update(expected)
        # items() works on both Python 2 and 3 (iteritems is Python 2 only).
        for key, value in all_expected.items():
            # Report a missing key as a test failure rather than a KeyError.
            self.assertIn(key, actual)
            self.assertEqual(actual[key], value)

        # assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(set(all_expected.keys()) | specials, set(actual.keys()))
|
||
|
|
||
|
|
||
|
class TestStatusHandler(BaseStructuredTest):
    """Check the run summary produced by ``handlers.StatusHandler``."""

    def setUp(self):
        super(TestStatusHandler, self).setUp()
        # self.handler now refers to a StatusHandler; the handler added by the
        # base setUp is not removed from the logger.
        status_handler = handlers.StatusHandler()
        self.handler = status_handler
        self.logger.add_handler(status_handler)

    def test_failure_run(self):
        """One passing and one timed-out subtest are counted separately."""
        log = self.logger
        log.suite_start([])
        log.test_start("test1")
        for subtest, status in (("sub1", 'PASS'), ("sub2", 'TIMEOUT')):
            log.test_status("test1", subtest, status=status)
        log.test_end("test1", status='OK')
        log.suite_end()

        summary = self.handler.summarize()
        unexpected = summary.unexpected_statuses
        self.assertIn('TIMEOUT', unexpected)
        self.assertEqual(1, unexpected['TIMEOUT'])
        expected = summary.expected_statuses
        for status in ('PASS', 'OK'):
            self.assertIn(status, expected)
            self.assertEqual(1, expected[status])
        action_counts = summary.action_counts
        self.assertEqual(2, action_counts['test_status'])
        self.assertEqual(1, action_counts['test_end'])

    def test_error_run(self):
        """An ERROR log message is counted alongside the test results."""
        log = self.logger
        log.suite_start([])
        log.test_start("test1")
        log.error("ERRR!")
        log.test_end("test1", status='OK')
        log.test_start("test2")
        log.test_end("test2", status='OK')
        log.suite_end()

        summary = self.handler.summarize()
        level_counts = summary.log_level_counts
        self.assertIn('ERROR', level_counts)
        self.assertEqual(1, level_counts['ERROR'])
        expected = summary.expected_statuses
        self.assertIn('OK', expected)
        self.assertEqual(2, expected['OK'])
|
||
|
|
||
|
|
||
|
class TestStructuredLog(BaseStructuredTest):
    """Exercise the StructuredLogger actions and the errors they log."""

    def _pop_error_log(self):
        """Pop the newest message, check it is an ERROR log, and return it."""
        last_item = self.pop_last_item()
        # assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(last_item["action"], "log")
        self.assertEqual(last_item["level"], "ERROR")
        return last_item

    def test_suite_start(self):
        self.logger.suite_start(["test"])
        self.assert_log_equals({"action": "suite_start",
                                "tests": ["test"]})
        self.logger.suite_end()

    def test_suite_end(self):
        self.logger.suite_start([])
        self.logger.suite_end()
        self.assert_log_equals({"action": "suite_end"})

    def test_start(self):
        self.logger.suite_start([])
        self.logger.test_start("test1")
        self.assert_log_equals({"action": "test_start",
                                "test": "test1"})

        self.logger.test_start(("test1", "==", "test1-ref"), path="path/to/test")
        self.assert_log_equals({"action": "test_start",
                                "test": ("test1", "==", "test1-ref"),
                                "path": "path/to/test"})
        self.logger.suite_end()

    def test_start_inprogress(self):
        self.logger.suite_start([])
        self.logger.test_start("test1")
        self.logger.test_start("test1")
        self.assert_log_equals({"action": "log",
                                "message": "test_start for test1 logged while in progress.",
                                "level": "ERROR"})
        self.logger.suite_end()

    def test_status(self):
        self.logger.suite_start([])
        self.logger.test_start("test1")
        self.logger.test_status("test1", "subtest name", "fail", expected="FAIL",
                                message="Test message")
        self.assert_log_equals({"action": "test_status",
                                "subtest": "subtest name",
                                "status": "FAIL",
                                "message": "Test message",
                                "test": "test1"})
        self.logger.test_end("test1", "OK")
        self.logger.suite_end()

    def test_status_1(self):
        self.logger.suite_start([])
        self.logger.test_start("test1")
        self.logger.test_status("test1", "subtest name", "fail")
        self.assert_log_equals({"action": "test_status",
                                "subtest": "subtest name",
                                "status": "FAIL",
                                "expected": "PASS",
                                "test": "test1"})
        self.logger.test_end("test1", "OK")
        self.logger.suite_end()

    def test_status_2(self):
        self.assertRaises(ValueError, self.logger.test_status,
                          "test1", "subtest name", "XXXUNKNOWNXXX")

    def test_status_extra(self):
        self.logger.suite_start([])
        self.logger.test_start("test1")
        self.logger.test_status("test1", "subtest name", "FAIL", expected="PASS",
                                extra={"data": 42})
        self.assert_log_equals({"action": "test_status",
                                "subtest": "subtest name",
                                "status": "FAIL",
                                "expected": "PASS",
                                "test": "test1",
                                "extra": {"data": 42}
                                })
        self.logger.test_end("test1", "OK")
        self.logger.suite_end()

    def test_status_stack(self):
        self.logger.suite_start([])
        self.logger.test_start("test1")
        self.logger.test_status("test1", "subtest name", "FAIL", expected="PASS",
                                stack="many\nlines\nof\nstack")
        self.assert_log_equals({"action": "test_status",
                                "subtest": "subtest name",
                                "status": "FAIL",
                                "expected": "PASS",
                                "test": "test1",
                                "stack": "many\nlines\nof\nstack"
                                })
        self.logger.test_end("test1", "OK")
        self.logger.suite_end()

    def test_status_not_started(self):
        self.logger.test_status("test_UNKNOWN", "subtest", "PASS")
        self.assertTrue(self.pop_last_item()["message"].startswith(
            "test_status for test_UNKNOWN logged while not in progress. Logged with data: {"))

    def test_end(self):
        self.logger.suite_start([])
        self.logger.test_start("test1")
        self.logger.test_end("test1", "fail", message="Test message")
        self.assert_log_equals({"action": "test_end",
                                "status": "FAIL",
                                "expected": "OK",
                                "message": "Test message",
                                "test": "test1"})
        self.logger.suite_end()

    def test_end_1(self):
        self.logger.suite_start([])
        self.logger.test_start("test1")
        self.logger.test_end("test1", "PASS", expected="PASS", extra={"data": 123})
        self.assert_log_equals({"action": "test_end",
                                "status": "PASS",
                                "extra": {"data": 123},
                                "test": "test1"})
        self.logger.suite_end()

    def test_end_2(self):
        self.assertRaises(ValueError, self.logger.test_end, "test1", "XXXUNKNOWNXXX")

    def test_end_stack(self):
        self.logger.suite_start([])
        self.logger.test_start("test1")
        self.logger.test_end("test1", "PASS", expected="PASS",
                             stack="many\nlines\nof\nstack")
        self.assert_log_equals({"action": "test_end",
                                "status": "PASS",
                                "test": "test1",
                                "stack": "many\nlines\nof\nstack"
                                })
        self.logger.suite_end()

    def test_end_no_start(self):
        self.logger.test_end("test1", "PASS", expected="PASS")
        self.assertTrue(self.pop_last_item()["message"].startswith(
            "test_end for test1 logged while not in progress. Logged with data: {"))
        self.logger.suite_end()

    def test_end_twice(self):
        self.logger.suite_start([])
        self.logger.test_start("test2")
        self.logger.test_end("test2", "PASS", expected="PASS")
        self.assert_log_equals({"action": "test_end",
                                "status": "PASS",
                                "test": "test2"})
        self.logger.test_end("test2", "PASS", expected="PASS")
        last_item = self._pop_error_log()
        self.assertTrue(last_item["message"].startswith(
            "test_end for test2 logged while not in progress. Logged with data: {"))
        self.logger.suite_end()

    def test_suite_start_twice(self):
        self.logger.suite_start([])
        self.assert_log_equals({"action": "suite_start",
                                "tests": []})
        self.logger.suite_start([])
        self._pop_error_log()
        self.logger.suite_end()

    def test_suite_end_no_start(self):
        self.logger.suite_start([])
        self.assert_log_equals({"action": "suite_start",
                                "tests": []})
        self.logger.suite_end()
        self.assert_log_equals({"action": "suite_end"})
        # A second suite_end without a matching suite_start logs an error.
        self.logger.suite_end()
        self._pop_error_log()

    def test_multiple_loggers_suite_start(self):
        logger1 = structuredlog.StructuredLogger("test")
        self.logger.suite_start([])
        logger1.suite_start([])
        self._pop_error_log()

    def test_multiple_loggers_test_start(self):
        logger1 = structuredlog.StructuredLogger("test")
        self.logger.suite_start([])
        self.logger.test_start("test")
        logger1.test_start("test")
        self._pop_error_log()

    def test_process(self):
        self.logger.process_output(1234, "test output")
        self.assert_log_equals({"action": "process_output",
                                "process": "1234",
                                "data": "test output"})

    def test_process_start(self):
        self.logger.process_start(1234)
        self.assert_log_equals({"action": "process_start",
                                "process": "1234"})

    def test_process_exit(self):
        self.logger.process_exit(1234, 0)
        self.assert_log_equals({"action": "process_exit",
                                "process": "1234",
                                "exitcode": 0})

    def test_log(self):
        for level in ["critical", "error", "warning", "info", "debug"]:
            getattr(self.logger, level)("message")
            self.assert_log_equals({"action": "log",
                                    "level": level.upper(),
                                    "message": "message"})

    def test_logging_adapter(self):
        import logging
        # Remember the root level before basicConfig can change it, so the
        # finally clause below restores the real previous level.
        old_level = logging.root.getEffectiveLevel()
        logging.basicConfig(level="DEBUG")
        logging.root.setLevel("DEBUG")

        std_logger = logging.getLogger("test")
        std_logger.setLevel("DEBUG")

        logger = stdadapter.std_logging_adapter(std_logger)

        try:
            for level in ["critical", "error", "warning", "info", "debug"]:
                getattr(logger, level)("message")
                self.assert_log_equals({"action": "log",
                                        "level": level.upper(),
                                        "message": "message"})
        finally:
            logging.root.setLevel(old_level)

    def test_add_remove_handlers(self):
        handler = TestHandler()
        self.logger.add_handler(handler)
        self.logger.info("test1")

        self.assert_log_equals({"action": "log",
                                "level": "INFO",
                                "message": "test1"})

        self.assert_log_equals({"action": "log",
                                "level": "INFO",
                                "message": "test1"}, actual=handler.last_item)

        self.logger.remove_handler(handler)
        self.logger.info("test2")

        self.assert_log_equals({"action": "log",
                                "level": "INFO",
                                "message": "test2"})

        # The removed handler must not have seen "test2".
        self.assert_log_equals({"action": "log",
                                "level": "INFO",
                                "message": "test1"}, actual=handler.last_item)

    def test_wrapper(self):
        file_like = structuredlog.StructuredLogFileLike(self.logger)

        # Each write is logged at INFO with its trailing line ending removed.
        writes = [("line 1", "line 1"),
                  ("line 2\n", "line 2"),
                  ("line 3\r", "line 3"),
                  ("line 4\r\n", "line 4")]
        for written, logged in writes:
            file_like.write(written)
            self.assert_log_equals({"action": "log",
                                    "level": "INFO",
                                    "message": logged})
|
||
|
|
||
|
|
||
|
class TestTypeConversions(BaseStructuredTest):
    """Check how logged values and call arguments are converted and validated."""

    def test_raw(self):
        self.logger.log_raw({"action": "suite_start", "tests": [1], "time": "1234"})
        self.assert_log_equals({"action": "suite_start",
                                "tests": ["1"],
                                "time": 1234})
        self.logger.suite_end()

    def test_tuple(self):
        self.logger.suite_start([])
        self.logger.test_start(("\xf0\x90\x8d\x84\xf0\x90\x8c\xb4\xf0\x90\x8d\x83\xf0\x90\x8d\x84",
                                42, u"\u16a4"))
        self.assert_log_equals({"action": "test_start",
                                "test": (u'\U00010344\U00010334\U00010343\U00010344',
                                         u"42", u"\u16a4")})
        self.logger.suite_end()

    def test_non_string_messages(self):
        self.logger.suite_start([])
        self.logger.info(1)
        self.assert_log_equals({"action": "log",
                                "message": "1",
                                "level": "INFO"})
        self.logger.info([1, (2, '3'), "s", "s" + chr(255)])
        self.assert_log_equals({"action": "log",
                                "message": "[1, (2, '3'), 's', 's\\xff']",
                                "level": "INFO"})
        self.logger.suite_end()

    def test_utf8str_write(self):
        with mozfile.NamedTemporaryFile() as logfile:
            _fmt = formatters.TbplFormatter()
            _handler = handlers.StreamHandler(logfile, _fmt)
            self.logger.add_handler(_handler)
            try:
                self.logger.suite_start([])
                self.logger.info("☺")
                logfile.seek(0)
                data = logfile.readlines()[-1].strip()
                # assertEqual replaces the deprecated assertEquals alias.
                self.assertEqual(data, "☺")
                self.logger.suite_end()
            finally:
                # Detach the handler before the temporary file is closed, so
                # it is not left attached to the logger pointing at a closed
                # stream.
                self.logger.remove_handler(_handler)

    def test_arguments(self):
        self.logger.info(message="test")
        self.assert_log_equals({"action": "log",
                                "message": "test",
                                "level": "INFO"})

        self.logger.suite_start([], {})
        self.assert_log_equals({"action": "suite_start",
                                "tests": [],
                                "run_info": {}})
        self.logger.test_start(test="test1")
        # Positional and keyword arguments may be mixed.
        self.logger.test_status("subtest1", "FAIL", test="test1", status="PASS")
        self.assert_log_equals({"action": "test_status",
                                "test": "test1",
                                "subtest": "subtest1",
                                "status": "PASS",
                                "expected": "FAIL"})
        self.logger.process_output(123, "data", "test")
        self.assert_log_equals({"action": "process_output",
                                "process": "123",
                                "command": "test",
                                "data": "data"})
        # Missing, surplus and duplicated arguments are all rejected.
        self.assertRaises(TypeError, self.logger.test_status, subtest="subtest2",
                          status="FAIL", expected="PASS")
        self.assertRaises(TypeError, self.logger.test_status, "test1", "subtest1",
                          "PASS", "FAIL", "message", "stack", {}, "unexpected")
        self.assertRaises(TypeError, self.logger.test_status, "test1", test="test2")
        self.logger.suite_end()
|
||
|
|
||
|
|
||
|
class TestComponentFilter(BaseStructuredTest):
    """Check ``component_filter`` on the default logger and on a component."""

    def _make_component_logger(self):
        """Create a "test_component" logger that shares this test's logger name."""
        return structuredlog.StructuredLogger(self.logger.name,
                                              "test_component")

    def _assert_logged(self, level, message, **extra):
        """Require that a message was captured and that it has these fields."""
        self.assertFalse(self.handler.empty)
        record = {"action": "log",
                  "level": level,
                  "message": message}
        record.update(extra)
        self.assert_log_equals(record)

    def test_filter_component(self):
        component_logger = self._make_component_logger()
        component_logger.component_filter = handlers.LogLevelFilter(lambda x: x, "info")

        # The default logger is unaffected by the component's filter.
        self.logger.debug("Test")
        self._assert_logged("DEBUG", "Test")
        self.assertTrue(self.handler.empty)

        component_logger.info("Test 1")
        self._assert_logged("INFO", "Test 1", component="test_component")

        # Below the filter level: nothing reaches the handler.
        component_logger.debug("Test 2")
        self.assertTrue(self.handler.empty)

        component_logger.component_filter = None

        component_logger.debug("Test 3")
        self._assert_logged("DEBUG", "Test 3", component="test_component")

    def test_filter_default_component(self):
        component_logger = self._make_component_logger()

        self.logger.debug("Test")
        self._assert_logged("DEBUG", "Test")

        self.logger.component_filter = handlers.LogLevelFilter(lambda x: x, "info")

        self.logger.debug("Test 1")
        self.assertTrue(self.handler.empty)

        # The default logger's filter does not apply to the component logger.
        component_logger.debug("Test 2")
        self._assert_logged("DEBUG", "Test 2", component="test_component")

        self.logger.component_filter = None

        self.logger.debug("Test 3")
        self._assert_logged("DEBUG", "Test 3")

    def test_filter_message_mutuate(self):
        def filter_mutate(msg):
            # Prefix log messages; every message is passed on.
            if msg["action"] == "log":
                msg["message"] = "FILTERED! %s" % msg["message"]
            return msg

        self.logger.component_filter = filter_mutate
        self.logger.debug("Test")
        expected = {"action": "log",
                    "level": "DEBUG",
                    "message": "FILTERED! Test"}
        self.assert_log_equals(expected)
        self.logger.component_filter = None
|
||
|
|
||
|
|
||
|
class FormatterTest(unittest.TestCase):
    """Base class for formatter tests; output is captured in an in-memory file."""

    def setUp(self):
        # Offset in output_file from which ``loglines`` starts reading.
        self.position = 0
        logger_name = "test_%s" % type(self).__name__
        self.logger = structuredlog.StructuredLogger(logger_name)
        self.output_file = StringIO.StringIO()
        self.handler = handlers.StreamHandler(self.output_file,
                                              self.get_formatter())
        self.logger.add_handler(self.handler)

    def set_position(self, pos=None):
        """Set the read offset; with no argument, use the file's current position."""
        self.position = self.output_file.tell() if pos is None else pos

    def get_formatter(self):
        """Return the formatter under test; subclasses must override this."""
        raise NotImplementedError("FormatterTest subclasses must implement get_formatter")

    @property
    def loglines(self):
        """Lines written since ``position``, with trailing whitespace stripped."""
        self.output_file.seek(self.position)
        stripped = []
        for raw_line in self.output_file.readlines():
            stripped.append(raw_line.rstrip())
        return stripped
|
||
|
|
||
|
class TestTBPLFormatter(FormatterTest):
    """Check the text emitted by ``formatters.TbplFormatter``."""

    def get_formatter(self):
        return formatters.TbplFormatter()

    def test_unexpected_message(self):
        self.logger.suite_start([])
        self.logger.test_start("timeout_test")
        self.logger.test_end("timeout_test",
                             "TIMEOUT",
                             message="timed out")
        self.assertIn("TEST-UNEXPECTED-TIMEOUT | timeout_test | timed out",
                      self.loglines)
        self.logger.suite_end()

    def test_default_unexpected_end_message(self):
        self.logger.suite_start([])
        self.logger.test_start("timeout_test")
        self.logger.test_end("timeout_test",
                             "TIMEOUT")
        self.assertIn("TEST-UNEXPECTED-TIMEOUT | timeout_test | expected OK",
                      self.loglines)
        self.logger.suite_end()

    def test_default_unexpected_status_message(self):
        self.logger.suite_start([])
        self.logger.test_start("timeout_test")
        self.logger.test_status("timeout_test",
                                "subtest",
                                status="TIMEOUT")
        self.assertIn("TEST-UNEXPECTED-TIMEOUT | timeout_test | subtest - expected PASS",
                      self.loglines)
        self.logger.test_end("timeout_test", "OK")
        self.logger.suite_end()

    def test_single_newline(self):
        self.logger.suite_start([])
        self.logger.test_start("test1")
        # Only inspect output produced from this point on.
        self.set_position()
        self.logger.test_status("test1", "subtest",
                                status="PASS",
                                expected="FAIL")
        self.logger.test_end("test1", "OK")
        self.logger.suite_end()

        # This sequence should not produce blanklines
        for line in self.loglines:
            self.assertNotEqual("", line, "No blank line should be present in: %s" %
                                self.loglines)

    def test_process_exit(self):
        self.logger.process_exit(1234, 0)
        self.assertIn('TEST-INFO | 1234: exit 0', self.loglines)

    @unittest.skipUnless(os.name == 'posix', 'posix only')
    def test_process_exit_with_sig(self):
        # subprocess return code is negative when process
        # has been killed by signal on posix.
        self.logger.process_exit(1234, -signal.SIGTERM)
        # The original line read ``self.assertIn,(...)``: the stray comma
        # built a tuple and the assertion was never executed.
        self.assertIn('TEST-INFO | 1234: killed by SIGTERM', self.loglines)
|
||
|
|
||
|
class TestMachFormatter(FormatterTest):
    """Check the summary and process lines of ``formatters.MachFormatter``."""

    def get_formatter(self):
        return formatters.MachFormatter(disable_colors=True)

    def _log_subtests(self, test, subtests):
        """Log a test_status for each (subtest, keyword-arguments) pair."""
        for subtest, kwargs in subtests:
            self.logger.test_status(test, subtest, **kwargs)

    def test_summary(self):
        self.logger.suite_start([])

        # One expected result (test1) followed by two unexpected ones.
        results = (("test1", "PASS", "PASS"),
                   ("test2", "PASS", "TIMEOUT"),
                   ("test3", "FAIL", "PASS"))
        for name, status, expected in results:
            self.logger.test_start(name)
            self.logger.test_end(name, status=status, expected=expected)

        self.set_position()
        self.logger.suite_end()

        lines = self.loglines
        self.assertIn("Ran 3 tests", lines)
        self.assertIn("Expected results: 1", lines)
        self.assertIn("Unexpected results: 2 (FAIL: 1, PASS: 1)", lines)
        self.assertNotIn("test1", lines)
        self.assertIn("PASS expected TIMEOUT test2", lines)
        self.assertIn("FAIL test3", lines)

    def test_summary_subtests(self):
        self.logger.suite_start([])

        self.logger.test_start("test1")
        self._log_subtests("test1", [("subtest1", {"status": "PASS"}),
                                     ("subtest2", {"status": "FAIL"})])
        self.logger.test_end("test1", status="OK", expected="OK")

        self.logger.test_start("test2")
        self._log_subtests("test2", [("subtest1", {"status": "TIMEOUT",
                                                   "expected": "PASS"})])
        self.logger.test_end("test2", status="TIMEOUT", expected="OK")

        self.set_position()
        self.logger.suite_end()

        lines = self.loglines
        self.assertIn("Ran 5 tests (2 parents, 3 subtests)", lines)
        self.assertIn("Expected results: 2", lines)
        self.assertIn("Unexpected results: 3 (FAIL: 1, TIMEOUT: 2)", lines)

    def test_summary_ok(self):
        self.logger.suite_start([])

        self.logger.test_start("test1")
        self._log_subtests("test1", [("subtest1", {"status": "PASS"}),
                                     ("subtest2", {"status": "PASS"})])
        self.logger.test_end("test1", status="OK", expected="OK")

        self.logger.test_start("test2")
        self._log_subtests("test2", [("subtest1", {"status": "PASS",
                                                   "expected": "PASS"})])
        self.logger.test_end("test2", status="OK", expected="OK")

        self.set_position()
        self.logger.suite_end()

        lines = self.loglines
        self.assertIn("OK", lines)
        self.assertIn("Expected results: 5", lines)
        self.assertIn("Unexpected results: 0", lines)

    def test_process_start(self):
        self.logger.process_start(1234)
        first_line = self.loglines[0]
        self.assertIn("Started process `1234`", first_line)

    def test_process_start_with_command(self):
        self.logger.process_start(1234, command='test cmd')
        first_line = self.loglines[0]
        self.assertIn("Started process `1234` (test cmd)", first_line)

    def test_process_exit(self):
        self.logger.process_exit(1234, 0)
        first_line = self.loglines[0]
        self.assertIn('1234: exit 0', first_line)

    @unittest.skipUnless(os.name == 'posix', 'posix only')
    def test_process_exit_with_sig(self):
        # subprocess return code is negative when process
        # has been killed by signal on posix.
        self.logger.process_exit(1234, -signal.SIGTERM)
        first_line = self.loglines[0]
        self.assertIn('1234: killed by SIGTERM', first_line)
|
||
|
|
||
|
|
||
|
class TestXUnitFormatter(FormatterTest):
    """Check the XML document produced by ``formatters.XUnitFormatter``."""

    def get_formatter(self):
        return formatters.XUnitFormatter()

    def log_as_xml(self):
        """Parse everything logged so far and return the root XML element."""
        return ET.fromstring('\n'.join(self.loglines))

    def _run_single_test(self, status, **kwargs):
        """Log a one-test suite ending "test1" with ``status``; return the XML root."""
        self.logger.suite_start([])
        self.logger.test_start("test1")
        self.logger.test_end("test1", status, **kwargs)
        self.logger.suite_end()
        return self.log_as_xml()

    def test_stacktrace_is_present(self):
        root = self._run_single_test("fail", message="Test message",
                                     stack='this\nis\na\nstack')
        self.assertIn('this\nis\na\nstack', root.find('testcase/failure').text)

    def test_failure_message(self):
        root = self._run_single_test("fail", message="Test message")
        # assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual('Expected OK, got FAIL',
                         root.find('testcase/failure').get('message'))

    def test_suite_attrs(self):
        root = self._run_single_test("ok", message="Test message")
        self.assertEqual(root.get('skips'), '0')
        self.assertEqual(root.get('failures'), '0')
        self.assertEqual(root.get('errors'), '0')
        self.assertEqual(root.get('tests'), '1')
        self.assertEqual(root.get('time'), '0.00')

    def test_time_is_not_rounded(self):
        # call formatter directly, it is easier here
        formatter = self.get_formatter()
        formatter.suite_start(dict(time=55000))
        formatter.test_start(dict(time=55100))
        formatter.test_end(dict(time=55558, test='id', message='message', status='PASS'))
        xml_string = formatter.suite_end(dict(time=55559))

        root = ET.fromstring(xml_string)
        self.assertEqual(root.get('time'), '0.56')
        self.assertEqual(root.find('testcase').get('time'), '0.46')
|
||
|
|
||
|
|
||
|
class TestCommandline(unittest.TestCase):
    """Check the logging options wired up by ``mozlog.commandline``."""

    def setUp(self):
        self.logfile = mozfile.NamedTemporaryFile()

    @property
    def loglines(self):
        """All lines currently in the log file, trailing whitespace stripped."""
        self.logfile.seek(0)
        stripped = []
        for raw_line in self.logfile.readlines():
            stripped.append(raw_line.rstrip())
        return stripped

    def _log_three_levels(self, *extra_args):
        """Set up tbpl logging to the log file, then log at info, debug and error.

        ``extra_args`` are additional command line options placed after the
        ``--log-tbpl`` option.
        """
        parser = argparse.ArgumentParser()
        commandline.add_logging_group(parser)
        argv = ["--log-tbpl=%s" % self.logfile.name]
        argv.extend(extra_args)
        args = parser.parse_args(argv)
        logger = commandline.setup_logging("test_fmtopts", args, {})
        logger.info("INFO message")
        logger.debug("DEBUG message")
        logger.error("ERROR message")

    def test_setup_logging(self):
        parser = argparse.ArgumentParser()
        commandline.add_logging_group(parser)
        args = parser.parse_args(["--log-raw=-"])
        logger = commandline.setup_logging("test_setup_logging", args, {})
        self.assertEqual(len(logger.handlers), 1)

    def test_setup_logging_optparse(self):
        parser = optparse.OptionParser()
        commandline.add_logging_group(parser)
        args, _ = parser.parse_args(["--log-raw=-"])
        logger = commandline.setup_logging("test_optparse", args, {})
        installed = logger.handlers
        self.assertEqual(len(installed), 1)
        self.assertIsInstance(installed[0], handlers.StreamHandler)

    def test_limit_formatters(self):
        parser = argparse.ArgumentParser()
        commandline.add_logging_group(parser, include_formatters=['raw'])
        other_formatters = []
        for fmt in commandline.log_formatters:
            if fmt != 'raw':
                other_formatters.append(fmt)
        # check that every formatter except raw is not present
        for fmt in other_formatters:
            for option in ("--log-%s=-" % fmt, "--log-%s-level=error" % fmt):
                with self.assertRaises(SystemExit):
                    parser.parse_args([option])
        # raw is still ok
        args = parser.parse_args(["--log-raw=-"])
        logger = commandline.setup_logging("test_setup_logging2", args, {})
        self.assertEqual(len(logger.handlers), 1)

    def test_setup_logging_optparse_unicode(self):
        parser = optparse.OptionParser()
        commandline.add_logging_group(parser)
        args, _ = parser.parse_args([u"--log-raw=-"])
        logger = commandline.setup_logging("test_optparse_unicode", args, {})
        self.assertEqual(len(logger.handlers), 1)
        only_handler = logger.handlers[0]
        self.assertEqual(only_handler.stream, sys.stdout)
        self.assertIsInstance(logger.handlers[0], handlers.StreamHandler)

    def test_logging_defaultlevel(self):
        self._log_three_levels()
        # The debug level is not logged by default.
        expected = ["INFO message",
                    "ERROR message"]
        self.assertEqual(expected, self.loglines)

    def test_logging_errorlevel(self):
        self._log_three_levels("--log-tbpl-level=error")

        # Only the error level and above were requested.
        expected = ["ERROR message"]
        self.assertEqual(expected, self.loglines)

    def test_logging_debuglevel(self):
        self._log_three_levels("--log-tbpl-level=debug")
        # Requesting a lower log level than default works as expected.
        expected = ["INFO message",
                    "DEBUG message",
                    "ERROR message"]
        self.assertEqual(expected, self.loglines)

    def test_unused_options(self):
        parser = argparse.ArgumentParser()
        commandline.add_logging_group(parser)
        # A level option without the matching --log-tbpl option is rejected.
        args = parser.parse_args(["--log-tbpl-level=error"])
        self.assertRaises(ValueError, commandline.setup_logging, "test_fmtopts", args, {})
|
||
|
|
||
|
class TestBuffer(BaseStructuredTest):
|
||
|
|
||
|
def assert_log_equals(self, expected, actual=None):
    """Assert that a logged message matches ``expected``.

    ``expected`` is merged over the fields every message carries here (pid,
    thread, and the "testBuffer" source).  When ``actual`` is omitted the
    newest message is popped and checked.  Apart from the expected keys,
    ``actual`` must contain exactly one more key, "time".
    """
    if actual is None:
        actual = self.pop_last_item()

    all_expected = {"pid": os.getpid(),
                    "thread": "MainThread",
                    "source": "testBuffer"}
    specials = set(["time"])

    all_expected.update(expected)
    # items() works on both Python 2 and 3 (iteritems is Python 2 only).
    for key, value in all_expected.items():
        # Report a missing key as a test failure rather than a KeyError.
        self.assertIn(key, actual)
        self.assertEqual(actual[key], value)

    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(set(all_expected.keys()) | specials, set(actual.keys()))
|
||
|
|
||
|
def setUp(self):
|
||
|
self.logger = structuredlog.StructuredLogger("testBuffer")
|
||
|
self.handler = handlers.BufferHandler(TestHandler(), message_limit=4)
|
||
|
self.logger.add_handler(self.handler)
|
||
|
|
||
|
def tearDown(self):
|
||
|
self.logger.remove_handler(self.handler)
|
||
|
|
||
|
def pop_last_item(self):
|
||
|
return self.handler.inner.items.pop()
|
||
|
|
||
|
def test_buffer_messages(self):
|
||
|
self.logger.suite_start([])
|
||
|
self.logger.test_start("test1")
|
||
|
self.logger.send_message("buffer", "off")
|
||
|
self.logger.test_status("test1", "sub1", status="PASS")
|
||
|
# Even for buffered actions, the buffer does not interfere if
|
||
|
# buffering is turned off.
|
||
|
self.assert_log_equals({"action": "test_status",
|
||
|
"test": "test1",
|
||
|
"status": "PASS",
|
||
|
"subtest": "sub1"})
|
||
|
self.logger.send_message("buffer", "on")
|
||
|
self.logger.test_status("test1", "sub2", status="PASS")
|
||
|
self.logger.test_status("test1", "sub3", status="PASS")
|
||
|
self.logger.test_status("test1", "sub4", status="PASS")
|
||
|
self.logger.test_status("test1", "sub5", status="PASS")
|
||
|
self.logger.test_status("test1", "sub6", status="PASS")
|
||
|
self.logger.test_status("test1", "sub7", status="PASS")
|
||
|
self.logger.test_end("test1", status="OK")
|
||
|
self.logger.send_message("buffer", "clear")
|
||
|
self.assert_log_equals({"action": "test_end",
|
||
|
"test": "test1",
|
||
|
"status": "OK"})
|
||
|
self.logger.suite_end()
|
||
|
|
||
|
|
||
|
def test_buffer_size(self):
|
||
|
self.logger.suite_start([])
|
||
|
self.logger.test_start("test1")
|
||
|
self.logger.test_status("test1", "sub1", status="PASS")
|
||
|
self.logger.test_status("test1", "sub2", status="PASS")
|
||
|
self.logger.test_status("test1", "sub3", status="PASS")
|
||
|
self.logger.test_status("test1", "sub4", status="PASS")
|
||
|
self.logger.test_status("test1", "sub5", status="PASS")
|
||
|
self.logger.test_status("test1", "sub6", status="PASS")
|
||
|
self.logger.test_status("test1", "sub7", status="PASS")
|
||
|
|
||
|
# No test status messages made it to the underlying handler.
|
||
|
self.assert_log_equals({"action": "test_start",
|
||
|
"test": "test1"})
|
||
|
|
||
|
# The buffer's actual size never grows beyond the specified limit.
|
||
|
self.assertEquals(len(self.handler._buffer), 4)
|
||
|
|
||
|
self.logger.test_status("test1", "sub8", status="FAIL")
|
||
|
# The number of messages deleted comes back in a list.
|
||
|
self.assertEquals([4], self.logger.send_message("buffer", "flush"))
|
||
|
|
||
|
# When the buffer is dumped, the failure is the last thing logged
|
||
|
self.assert_log_equals({"action": "test_status",
|
||
|
"test": "test1",
|
||
|
"subtest": "sub8",
|
||
|
"status": "FAIL",
|
||
|
"expected": "PASS"})
|
||
|
# Three additional messages should have been retained for context
|
||
|
self.assert_log_equals({"action": "test_status",
|
||
|
"test": "test1",
|
||
|
"status": "PASS",
|
||
|
"subtest": "sub7"})
|
||
|
self.assert_log_equals({"action": "test_status",
|
||
|
"test": "test1",
|
||
|
"status": "PASS",
|
||
|
"subtest": "sub6"})
|
||
|
self.assert_log_equals({"action": "test_status",
|
||
|
"test": "test1",
|
||
|
"status": "PASS",
|
||
|
"subtest": "sub5"})
|
||
|
self.assert_log_equals({"action": "suite_start",
|
||
|
"tests": []})
|
||
|
|
||
|
|
||
|
class TestReader(unittest.TestCase):
    # Tests for the `reader` module's helpers, each fed from an in-memory
    # file-like object containing one JSON document per line.

    def to_file_like(self, obj):
        # Serialise every item of `obj` as JSON, join them with newlines and
        # wrap the text in a StringIO so it can stand in for a log file.
        data_str = "\n".join(json.dumps(item) for item in obj)
        return StringIO.StringIO(data_str)

    def test_read(self):
        data = [{"action": "action_0", "data": "data_0"},
                {"action": "action_1", "data": "data_1"}]

        f = self.to_file_like(data)
        # Reading the serialised log back should give the original items.
        self.assertEqual(data, list(reader.read(f)))

    def test_imap_log(self):
        data = [{"action": "action_0", "data": "data_0"},
                {"action": "action_1", "data": "data_1"}]

        f = self.to_file_like(data)

        def f_action_0(item):
            return ("action_0", item["data"])

        def f_action_1(item):
            return ("action_1", item["data"])

        # Each item is expected to be passed to the callback registered
        # under its "action", with the callbacks' results yielded in order.
        res_iter = reader.imap_log(reader.read(f),
                                   {"action_0": f_action_0,
                                    "action_1": f_action_1})
        self.assertEqual([("action_0", "data_0"), ("action_1", "data_1")],
                         list(res_iter))

    def test_each_log(self):
        data = [{"action": "action_0", "data": "data_0"},
                {"action": "action_1", "data": "data_1"}]

        f = self.to_file_like(data)

        count = {"action_0": 0,
                 "action_1": 0}

        # The callbacks add different increments so the final counts show
        # which callback handled which action.
        def f_action_0(item):
            count[item["action"]] += 1

        def f_action_1(item):
            count[item["action"]] += 2

        reader.each_log(reader.read(f),
                        {"action_0": f_action_0,
                         "action_1": f_action_1})

        self.assertEqual({"action_0": 1, "action_1": 2}, count)

    def test_handler(self):
        data = [{"action": "action_0", "data": "data_0"},
                {"action": "action_1", "data": "data_1"}]

        f = self.to_file_like(data)

        # The nested handler class uses `test` to reach this TestCase's
        # assertion methods, since its own `self` is the handler instance.
        test = self

        class ReaderTestHandler(reader.LogHandler):
            def __init__(self):
                self.action_0_count = 0
                self.action_1_count = 0

            def action_0(self, item):
                test.assertEqual(item["action"], "action_0")
                self.action_0_count += 1

            def action_1(self, item):
                test.assertEqual(item["action"], "action_1")
                self.action_1_count += 1

        handler = ReaderTestHandler()
        reader.handle_log(reader.read(f), handler)

        # Each action appears once in the data, so each method should have
        # been invoked exactly once.
        self.assertEqual(handler.action_0_count, 1)
        self.assertEqual(handler.action_1_count, 1)
|
||
|
|
||
|
# Run this module's test cases when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
|