Commit e756dbeb authored by Daniel Latypov's avatar Daniel Latypov Committed by Shuah Khan
Browse files

kunit: tool: refactoring printing logic into kunit_printer.py



Context:
* kunit_kernel.py is importing kunit_parser.py just to use the
  print_with_timestamp() function
* the parser is directly printing to stdout, which will become an issue
  if we ever try to run multiple kernels in parallel

This patch introduces a kunit_printer.py file and migrates callers of
kunit_parser.print_with_timestamp() to call
kunit_printer.stdout.print_with_timestamp() instead.

Future changes:
If we want to support showing results for parallel runs, we could then
create new Printer's that don't directly write to stdout and refactor
the code to pass around these Printer objects.

Signed-off-by: default avatarDaniel Latypov <dlatypov@google.com>
Reviewed-by: default avatarDavid Gow <davidgow@google.com>
Reviewed-by: default avatarBrendan Higgins <brendanhiggins@google.com>
Signed-off-by: default avatarShuah Khan <skhan@linuxfoundation.org>
parent 8a04930f
Loading
Loading
Loading
Loading
+9 −8
Original line number Diff line number Diff line
@@ -22,6 +22,7 @@ from typing import Iterable, List, Optional, Sequence, Tuple
import kunit_json
import kunit_kernel
import kunit_parser
from kunit_printer import stdout

class KunitStatus(Enum):
	SUCCESS = auto()
@@ -72,7 +73,7 @@ def get_kernel_root_path() -> str:

def config_tests(linux: kunit_kernel.LinuxSourceTree,
		 request: KunitConfigRequest) -> KunitResult:
	kunit_parser.print_with_timestamp('Configuring KUnit Kernel ...')
	stdout.print_with_timestamp('Configuring KUnit Kernel ...')

	config_start = time.time()
	success = linux.build_reconfig(request.build_dir, request.make_options)
@@ -85,7 +86,7 @@ def config_tests(linux: kunit_kernel.LinuxSourceTree,

def build_tests(linux: kunit_kernel.LinuxSourceTree,
		request: KunitBuildRequest) -> KunitResult:
	kunit_parser.print_with_timestamp('Building KUnit Kernel ...')
	stdout.print_with_timestamp('Building KUnit Kernel ...')

	build_start = time.time()
	success = linux.build_kernel(request.alltests,
@@ -158,7 +159,7 @@ def exec_tests(linux: kunit_kernel.LinuxSourceTree, request: KunitExecRequest) -
	test_counts = kunit_parser.TestCounts()
	exec_time = 0.0
	for i, filter_glob in enumerate(filter_globs):
		kunit_parser.print_with_timestamp('Starting KUnit Kernel ({}/{})...'.format(i+1, len(filter_globs)))
		stdout.print_with_timestamp('Starting KUnit Kernel ({}/{})...'.format(i+1, len(filter_globs)))

		test_start = time.time()
		run_result = linux.run_kernel(
@@ -221,7 +222,7 @@ def parse_tests(request: KunitParseRequest, metadata: kunit_json.Metadata, input
		else:
			with open(request.json, 'w') as f:
				f.write(json_str)
			kunit_parser.print_with_timestamp("Test results stored in %s" %
			stdout.print_with_timestamp("Test results stored in %s" %
				os.path.abspath(request.json))

	if test_result.status != kunit_parser.TestStatus.SUCCESS:
@@ -245,7 +246,7 @@ def run_tests(linux: kunit_kernel.LinuxSourceTree,

	run_end = time.time()

	kunit_parser.print_with_timestamp((
	stdout.print_with_timestamp((
		'Elapsed time: %.3fs total, %.3fs configuring, %.3fs ' +
		'building, %.3fs running\n') % (
				run_end - run_start,
@@ -446,7 +447,7 @@ def main(argv):
		request = KunitConfigRequest(build_dir=cli_args.build_dir,
					     make_options=cli_args.make_options)
		result = config_tests(linux, request)
		kunit_parser.print_with_timestamp((
		stdout.print_with_timestamp((
			'Elapsed time: %.3fs\n') % (
				result.elapsed_time))
		if result.status != KunitStatus.SUCCESS:
@@ -458,7 +459,7 @@ def main(argv):
					    jobs=cli_args.jobs,
					    alltests=cli_args.alltests)
		result = config_and_build_tests(linux, request)
		kunit_parser.print_with_timestamp((
		stdout.print_with_timestamp((
			'Elapsed time: %.3fs\n') % (
				result.elapsed_time))
		if result.status != KunitStatus.SUCCESS:
@@ -474,7 +475,7 @@ def main(argv):
						kernel_args=cli_args.kernel_args,
						run_isolated=cli_args.run_isolated)
		result = exec_tests(linux, exec_request)
		kunit_parser.print_with_timestamp((
		stdout.print_with_timestamp((
			'Elapsed time: %.3fs\n') % (result.elapsed_time))
		if result.status != KunitStatus.SUCCESS:
			sys.exit(1)
+4 −4
Original line number Diff line number Diff line
@@ -18,7 +18,7 @@ import threading
from typing import Iterator, List, Optional, Tuple

import kunit_config
import kunit_parser
from kunit_printer import stdout
import qemu_config

KCONFIG_PATH = '.config'
@@ -138,7 +138,7 @@ class LinuxSourceTreeOperationsUml(LinuxSourceTreeOperations):
		super().__init__(linux_arch='um', cross_compile=cross_compile)

	def make_allyesconfig(self, build_dir: str, make_options) -> None:
		kunit_parser.print_with_timestamp(
		stdout.print_with_timestamp(
			'Enabling all CONFIGs for UML...')
		command = ['make', 'ARCH=um', 'O=' + build_dir, 'allyesconfig']
		if make_options:
@@ -148,13 +148,13 @@ class LinuxSourceTreeOperationsUml(LinuxSourceTreeOperations):
			stdout=subprocess.DEVNULL,
			stderr=subprocess.STDOUT)
		process.wait()
		kunit_parser.print_with_timestamp(
		stdout.print_with_timestamp(
			'Disabling broken configs to run KUnit tests...')

		with open(get_kconfig_path(build_dir), 'a') as config:
			with open(BROKEN_ALLCONFIG_PATH, 'r') as disable:
				config.write(disable.read())
		kunit_parser.print_with_timestamp(
		stdout.print_with_timestamp(
			'Starting Kernel with all configs takes a few minutes...')

	def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
+19 −44
Original line number Diff line number Diff line
@@ -13,10 +13,11 @@ from __future__ import annotations
import re
import sys

import datetime
from enum import Enum, auto
from typing import Iterable, Iterator, List, Optional, Tuple

from kunit_printer import stdout

class Test:
	"""
	A class to represent a test parsed from KTAP results. All KTAP
@@ -55,7 +56,7 @@ class Test:
	def add_error(self, error_message: str) -> None:
		"""Records an error that occurred while parsing this test."""
		self.counts.errors += 1
		print_with_timestamp(red('[ERROR]') + f' Test: {self.name}: {error_message}')
		stdout.print_with_timestamp(stdout.red('[ERROR]') + f' Test: {self.name}: {error_message}')

class TestStatus(Enum):
	"""An enumeration class to represent the status of a test."""
@@ -461,32 +462,6 @@ def parse_diagnostic(lines: LineStream) -> List[str]:

DIVIDER = '=' * 60

RESET = '\033[0;0m'

def red(text: str) -> str:
	"""Returns inputted string with red color code."""
	if not sys.stdout.isatty():
		return text
	return '\033[1;31m' + text + RESET

def yellow(text: str) -> str:
	"""Returns inputted string with yellow color code."""
	if not sys.stdout.isatty():
		return text
	return '\033[1;33m' + text + RESET

def green(text: str) -> str:
	"""Returns inputted string with green color code."""
	if not sys.stdout.isatty():
		return text
	return '\033[1;32m' + text + RESET

ANSI_LEN = len(red(''))

def print_with_timestamp(message: str) -> None:
	"""Prints message with timestamp at beginning."""
	print('[%s] %s' % (datetime.datetime.now().strftime('%H:%M:%S'), message))

def format_test_divider(message: str, len_message: int) -> str:
	"""
	Returns string with message centered in fixed width divider.
@@ -529,12 +504,12 @@ def print_test_header(test: Test) -> None:
			message += ' (1 subtest)'
		else:
			message += f' ({test.expected_count} subtests)'
	print_with_timestamp(format_test_divider(message, len(message)))
	stdout.print_with_timestamp(format_test_divider(message, len(message)))

def print_log(log: Iterable[str]) -> None:
	"""Prints all strings in saved log for test in yellow."""
	for m in log:
		print_with_timestamp(yellow(m))
		stdout.print_with_timestamp(stdout.yellow(m))

def format_test_result(test: Test) -> str:
	"""
@@ -551,16 +526,16 @@ def format_test_result(test: Test) -> str:
	String containing formatted test result
	"""
	if test.status == TestStatus.SUCCESS:
		return green('[PASSED] ') + test.name
		return stdout.green('[PASSED] ') + test.name
	if test.status == TestStatus.SKIPPED:
		return yellow('[SKIPPED] ') + test.name
		return stdout.yellow('[SKIPPED] ') + test.name
	if test.status == TestStatus.NO_TESTS:
		return yellow('[NO TESTS RUN] ') + test.name
		return stdout.yellow('[NO TESTS RUN] ') + test.name
	if test.status == TestStatus.TEST_CRASHED:
		print_log(test.log)
		return red('[CRASHED] ') + test.name
		return stdout.red('[CRASHED] ') + test.name
	print_log(test.log)
	return red('[FAILED] ') + test.name
	return stdout.red('[FAILED] ') + test.name

def print_test_result(test: Test) -> None:
	"""
@@ -572,7 +547,7 @@ def print_test_result(test: Test) -> None:
	Parameters:
	test - Test object representing current test being printed
	"""
	print_with_timestamp(format_test_result(test))
	stdout.print_with_timestamp(format_test_result(test))

def print_test_footer(test: Test) -> None:
	"""
@@ -585,8 +560,8 @@ def print_test_footer(test: Test) -> None:
	test - Test object representing current test being printed
	"""
	message = format_test_result(test)
	print_with_timestamp(format_test_divider(message,
		len(message) - ANSI_LEN))
	stdout.print_with_timestamp(format_test_divider(message,
		len(message) - stdout.color_len()))

def print_summary_line(test: Test) -> None:
	"""
@@ -603,12 +578,12 @@ def print_summary_line(test: Test) -> None:
	test - Test object representing current test being printed
	"""
	if test.status == TestStatus.SUCCESS:
		color = green
		color = stdout.green
	elif test.status in (TestStatus.SKIPPED, TestStatus.NO_TESTS):
		color = yellow
		color = stdout.yellow
	else:
		color = red
	print_with_timestamp(color(f'Testing complete. {test.counts}'))
		color = stdout.red
	stdout.print_with_timestamp(color(f'Testing complete. {test.counts}'))

# Other methods:

@@ -762,7 +737,7 @@ def parse_run_tests(kernel_output: Iterable[str]) -> Test:
	Return:
	Test - the main test object with all subtests.
	"""
	print_with_timestamp(DIVIDER)
	stdout.print_with_timestamp(DIVIDER)
	lines = extract_tap_lines(kernel_output)
	test = Test()
	if not lines:
@@ -773,6 +748,6 @@ def parse_run_tests(kernel_output: Iterable[str]) -> Test:
		test = parse_test(lines, 0, [])
		if test.status != TestStatus.NO_TESTS:
			test.status = test.counts.get_status()
	print_with_timestamp(DIVIDER)
	stdout.print_with_timestamp(DIVIDER)
	print_summary_line(test)
	return test
+48 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0
#
# Utilities for printing and coloring output.
#
# Copyright (C) 2022, Google LLC.
# Author: Daniel Latypov <dlatypov@google.com>

import datetime
import sys
import typing

_RESET = '\033[0;0m'

class Printer:
	"""Wraps a file object, providing utilities for coloring output, etc."""

	def __init__(self, output: typing.IO):
		self._output = output
		self._use_color = output.isatty()

	def print(self, message: str) -> None:
		print(message, file=self._output)

	def print_with_timestamp(self, message: str) -> None:
		ts = datetime.datetime.now().strftime('%H:%M:%S')
		self.print(f'[{ts}] {message}')

	def _color(self, code: str, text: str) -> str:
		if not self._use_color:
			return text
		return code + text + _RESET

	def red(self, text: str) -> str:
		return self._color('\033[1;31m', text)

	def yellow(self, text: str) -> str:
		return self._color('\033[1;33m', text)

	def green(self, text: str) -> str:
		return self._color('\033[1;32m', text)

	def color_len(self) -> int:
		"""Returns the length of the color escape codes."""
		return len(self.red(''))

# Provides a default instance that prints to stdout
stdout = Printer(sys.stdout)
+2 −2
Original line number Diff line number Diff line
@@ -222,7 +222,7 @@ class KUnitParserTest(unittest.TestCase):

	def test_no_kunit_output(self):
		crash_log = test_data_path('test_insufficient_memory.log')
		print_mock = mock.patch('builtins.print').start()
		print_mock = mock.patch('kunit_printer.Printer.print').start()
		with open(crash_log) as file:
			result = kunit_parser.parse_run_tests(
				kunit_parser.extract_tap_lines(file.readlines()))
@@ -500,7 +500,7 @@ class KUnitMainTest(unittest.TestCase):
		with open(path) as file:
			all_passed_log = file.readlines()

		self.print_mock = mock.patch('builtins.print').start()
		self.print_mock = mock.patch('kunit_printer.Printer.print').start()
		self.addCleanup(mock.patch.stopall)

		self.mock_linux_init = mock.patch.object(kunit_kernel, 'LinuxSourceTree').start()