Commit 7a9b709e authored by Pawan Gupta's avatar Pawan Gupta Committed by Dave Hansen
Browse files

selftest/x86/bugs: Add selftests for ITS

Below are the tests added for Indirect Target Selection (ITS):

- its_sysfs.py - Check if sysfs reflects the correct mitigation status for
  the mitigation selected via the kernel cmdline.

- its_permutations.py - tests mitigation selection with cmdline
  permutations with other bugs like spectre_v2 and retbleed.

- its_indirect_alignment.py - verifies that for addresses in
  .retpoline_sites section that belong to lower half of cacheline are
  patched to ITS-safe thunk. Typical output looks like below:

  Site 49: function symbol: __x64_sys_restart_syscall+0x1f <0xffffffffbb1509af>
  #     vmlinux: 0xffffffff813509af:    jmp     0xffffffff81f5a8e0
  #     kcore:   0xffffffffbb1509af:    jmpq    *%rax
  #     ITS thunk NOT expected for site 49
  #     PASSED: Found *%rax
  #
  Site 50: function symbol: __resched_curr+0xb0 <0xffffffffbb181910>
  #     vmlinux: 0xffffffff81381910:    jmp     0xffffffff81f5a8e0
  #     kcore:   0xffffffffbb181910:    jmp     0xffffffffc02000fc
  #     ITS thunk expected for site 50
  #     PASSED: Found 0xffffffffc02000fc -> jmpq *%rax <scattered-thunk?>

- its_ret_alignment.py - verifies that for addresses in .return_sites
  section that belong to lower half of cacheline are patched to
  its_return_thunk. Typical output looks like below:

  Site 97: function symbol: collect_event+0x48 <0xffffffffbb007f18>
  #     vmlinux: 0xffffffff81207f18:    jmp     0xffffffff81f5b500
  #     kcore:   0xffffffffbb007f18:    jmp     0xffffffffbbd5b560
  #     PASSED: Found jmp 0xffffffffbbd5b560 <its_return_thunk>
  #
  Site 98: function symbol: collect_event+0xa4 <0xffffffffbb007f74>
  #     vmlinux: 0xffffffff81207f74:    jmp     0xffffffff81f5b500
  #     kcore:   0xffffffffbb007f74:    retq
  #     PASSED: Found retq

Some of these tests have dependency on tools like virtme-ng[1] and drgn[2].
When the dependencies are not met, the test will be skipped.

[1] https://github.com/arighi/virtme-ng
[2] https://github.com/osandov/drgn



Co-developed-by: default avatarTao Zhang <tao1.zhang@linux.intel.com>
Signed-off-by: default avatarTao Zhang <tao1.zhang@linux.intel.com>
Signed-off-by: default avatarPawan Gupta <pawan.kumar.gupta@linux.intel.com>
Signed-off-by: default avatarDave Hansen <dave.hansen@linux.intel.com>
parent e52c1dc7
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -121,6 +121,7 @@ TARGETS += user_events
TARGETS += vDSO
TARGETS += mm
TARGETS += x86
TARGETS += x86/bugs
TARGETS += zram
#Please keep the TARGETS list alphabetically sorted
# Run "make quicktest=1 run_tests" or
+3 −0
Original line number Diff line number Diff line
TEST_PROGS := its_sysfs.py its_permutations.py its_indirect_alignment.py its_ret_alignment.py
TEST_FILES := common.py
include ../../lib.mk
+164 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0
#
# Copyright (c) 2025 Intel Corporation
#
# This contains kselftest framework adapted common functions for testing
# mitigation for x86 bugs.

import os, sys, re, shutil

sys.path.insert(0, '../../kselftest')
import ksft

def read_file(path):
    if not os.path.exists(path):
        return None
    with open(path, 'r') as file:
        return file.read().strip()

def cpuinfo_has(arg):
    cpuinfo = read_file('/proc/cpuinfo')
    if arg in cpuinfo:
        return True
    return False

def cmdline_has(arg):
    cmdline = read_file('/proc/cmdline')
    if arg in cmdline:
        return True
    return False

def cmdline_has_either(args):
    cmdline = read_file('/proc/cmdline')
    for arg in args:
        if arg in cmdline:
            return True
    return False

def cmdline_has_none(args):
    return not cmdline_has_either(args)

def cmdline_has_all(args):
    cmdline = read_file('/proc/cmdline')
    for arg in args:
        if arg not in cmdline:
            return False
    return True

def get_sysfs(bug):
    return read_file("/sys/devices/system/cpu/vulnerabilities/" + bug)

def sysfs_has(bug, mitigation):
    status = get_sysfs(bug)
    if mitigation in status:
        return True
    return False

def sysfs_has_either(bugs, mitigations):
    for bug in bugs:
        for mitigation in mitigations:
            if sysfs_has(bug, mitigation):
                return True
    return False

def sysfs_has_none(bugs, mitigations):
    return not sysfs_has_either(bugs, mitigations)

def sysfs_has_all(bugs, mitigations):
    for bug in bugs:
        for mitigation in mitigations:
            if not sysfs_has(bug, mitigation):
                return False
    return True

def bug_check_pass(bug, found):
    ksft.print_msg(f"\nFound: {found}")
    # ksft.print_msg(f"\ncmdline: {read_file('/proc/cmdline')}")
    ksft.test_result_pass(f'{bug}: {found}')

def bug_check_fail(bug, found, expected):
    ksft.print_msg(f'\nFound:\t {found}')
    ksft.print_msg(f'Expected:\t {expected}')
    ksft.print_msg(f"\ncmdline: {read_file('/proc/cmdline')}")
    ksft.test_result_fail(f'{bug}: {found}')

def bug_status_unknown(bug, found):
    ksft.print_msg(f'\nUnknown status: {found}')
    ksft.print_msg(f"\ncmdline: {read_file('/proc/cmdline')}")
    ksft.test_result_fail(f'{bug}: {found}')

def basic_checks_sufficient(bug, mitigation):
    if not mitigation:
        bug_status_unknown(bug, "None")
        return True
    elif mitigation == "Not affected":
        ksft.test_result_pass(bug)
        return True
    elif mitigation == "Vulnerable":
        if cmdline_has_either([f'{bug}=off', 'mitigations=off']):
            bug_check_pass(bug, mitigation)
            return True
    return False

def get_section_info(vmlinux, section_name):
    from elftools.elf.elffile import ELFFile
    with open(vmlinux, 'rb') as f:
        elffile = ELFFile(f)
        section = elffile.get_section_by_name(section_name)
        if section is None:
            ksft.print_msg("Available sections in vmlinux:")
            for sec in elffile.iter_sections():
                ksft.print_msg(sec.name)
            raise ValueError(f"Section {section_name} not found in {vmlinux}")
        return section['sh_addr'], section['sh_offset'], section['sh_size']

def get_patch_sites(vmlinux, offset, size):
    import struct
    output = []
    with open(vmlinux, 'rb') as f:
        f.seek(offset)
        i = 0
        while i < size:
            data = f.read(4)  # s32
            if not data:
                break
            sym_offset = struct.unpack('<i', data)[0] + i
            i += 4
            output.append(sym_offset)
    return output

def get_instruction_from_vmlinux(elffile, section, virtual_address, target_address):
    from capstone import Cs, CS_ARCH_X86, CS_MODE_64
    section_start = section['sh_addr']
    section_end = section_start + section['sh_size']

    if not (section_start <= target_address < section_end):
        return None

    offset = target_address - section_start
    code = section.data()[offset:offset + 16]

    cap = init_capstone()
    for instruction in cap.disasm(code, target_address):
        if instruction.address == target_address:
            return instruction
    return None

def init_capstone():
    from capstone import Cs, CS_ARCH_X86, CS_MODE_64, CS_OPT_SYNTAX_ATT
    cap = Cs(CS_ARCH_X86, CS_MODE_64)
    cap.syntax = CS_OPT_SYNTAX_ATT
    return cap

def get_runtime_kernel():
    import drgn
    return drgn.program_from_kernel()

def check_dependencies_or_skip(modules, script_name="unknown test"):
    for mod in modules:
        try:
            __import__(mod)
        except ImportError:
            ksft.test_result_skip(f"Skipping {script_name}: missing module '{mod}'")
            ksft.finished()
+150 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0
#
# Copyright (c) 2025 Intel Corporation
#
# Test for indirect target selection (ITS) mitigation.
#
# Test if indirect CALL/JMP are correctly patched by evaluating
# the vmlinux .retpoline_sites in /proc/kcore.

# Install dependencies
# add-apt-repository ppa:michel-slm/kernel-utils
# apt update
# apt install -y python3-drgn python3-pyelftools python3-capstone
#
# Best to copy the vmlinux at a standard location:
# mkdir -p /usr/lib/debug/lib/modules/$(uname -r)
# cp $VMLINUX /usr/lib/debug/lib/modules/$(uname -r)/vmlinux
#
# Usage: ./its_indirect_alignment.py [vmlinux]

import os, sys, argparse
from pathlib import Path

this_dir = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, this_dir + '/../../kselftest')
import ksft
import common as c

bug = "indirect_target_selection"

mitigation = c.get_sysfs(bug)
if not mitigation or "Aligned branch/return thunks" not in mitigation:
    ksft.test_result_skip("Skipping its_indirect_alignment.py: Aligned branch/return thunks not enabled")
    ksft.finished()

if c.sysfs_has("spectre_v2", "Retpolines"):
    ksft.test_result_skip("Skipping its_indirect_alignment.py: Retpolines deployed")
    ksft.finished()

c.check_dependencies_or_skip(['drgn', 'elftools', 'capstone'], script_name="its_indirect_alignment.py")

from elftools.elf.elffile import ELFFile
from drgn.helpers.common.memory import identify_address

cap = c.init_capstone()

if len(os.sys.argv) > 1:
    arg_vmlinux = os.sys.argv[1]
    if not os.path.exists(arg_vmlinux):
        ksft.test_result_fail(f"its_indirect_alignment.py: vmlinux not found at argument path: {arg_vmlinux}")
        ksft.exit_fail()
    os.makedirs(f"/usr/lib/debug/lib/modules/{os.uname().release}", exist_ok=True)
    os.system(f'cp {arg_vmlinux} /usr/lib/debug/lib/modules/$(uname -r)/vmlinux')

vmlinux = f"/usr/lib/debug/lib/modules/{os.uname().release}/vmlinux"
if not os.path.exists(vmlinux):
    ksft.test_result_fail(f"its_indirect_alignment.py: vmlinux not found at {vmlinux}")
    ksft.exit_fail()

ksft.print_msg(f"Using vmlinux: {vmlinux}")

retpolines_start_vmlinux, retpolines_sec_offset, size = c.get_section_info(vmlinux, '.retpoline_sites')
ksft.print_msg(f"vmlinux: Section .retpoline_sites (0x{retpolines_start_vmlinux:x}) found at 0x{retpolines_sec_offset:x} with size 0x{size:x}")

sites_offset = c.get_patch_sites(vmlinux, retpolines_sec_offset, size)
total_retpoline_tests = len(sites_offset)
ksft.print_msg(f"Found {total_retpoline_tests} retpoline sites")

prog = c.get_runtime_kernel()
retpolines_start_kcore = prog.symbol('__retpoline_sites').address
ksft.print_msg(f'kcore: __retpoline_sites: 0x{retpolines_start_kcore:x}')

x86_indirect_its_thunk_r15 = prog.symbol('__x86_indirect_its_thunk_r15').address
ksft.print_msg(f'kcore: __x86_indirect_its_thunk_r15: 0x{x86_indirect_its_thunk_r15:x}')

tests_passed = 0
tests_failed = 0
tests_unknown = 0

with open(vmlinux, 'rb') as f:
    elffile = ELFFile(f)
    text_section = elffile.get_section_by_name('.text')

    for i in range(0, len(sites_offset)):
        site = retpolines_start_kcore + sites_offset[i]
        vmlinux_site = retpolines_start_vmlinux + sites_offset[i]
        passed = unknown = failed = False
        try:
            vmlinux_insn = c.get_instruction_from_vmlinux(elffile, text_section, text_section['sh_addr'], vmlinux_site)
            kcore_insn = list(cap.disasm(prog.read(site, 16), site))[0]
            operand = kcore_insn.op_str
            insn_end = site + kcore_insn.size - 1 # TODO handle Jcc.32 __x86_indirect_thunk_\reg
            safe_site = insn_end & 0x20
            site_status = "" if safe_site else "(unsafe)"

            ksft.print_msg(f"\nSite {i}: {identify_address(prog, site)} <0x{site:x}> {site_status}")
            ksft.print_msg(f"\tvmlinux: 0x{vmlinux_insn.address:x}:\t{vmlinux_insn.mnemonic}\t{vmlinux_insn.op_str}")
            ksft.print_msg(f"\tkcore:   0x{kcore_insn.address:x}:\t{kcore_insn.mnemonic}\t{kcore_insn.op_str}")

            if (site & 0x20) ^ (insn_end & 0x20):
                ksft.print_msg(f"\tSite at safe/unsafe boundary: {str(kcore_insn.bytes)} {kcore_insn.mnemonic} {operand}")
            if safe_site:
                tests_passed += 1
                passed = True
                ksft.print_msg(f"\tPASSED: At safe address")
                continue

            if operand.startswith('0xffffffff'):
                thunk = int(operand, 16)
                if thunk > x86_indirect_its_thunk_r15:
                    insn_at_thunk = list(cap.disasm(prog.read(thunk, 16), thunk))[0]
                    operand += ' -> ' + insn_at_thunk.mnemonic + ' ' + insn_at_thunk.op_str + ' <dynamic-thunk?>'
                    if 'jmp' in insn_at_thunk.mnemonic and thunk & 0x20:
                        ksft.print_msg(f"\tPASSED: Found {operand} at safe address")
                        passed = True
                if not passed:
                    if kcore_insn.operands[0].type == capstone.CS_OP_IMM:
                        operand += ' <' + prog.symbol(int(operand, 16)) + '>'
                        if '__x86_indirect_its_thunk_' in operand:
                            ksft.print_msg(f"\tPASSED: Found {operand}")
                        else:
                            ksft.print_msg(f"\tPASSED: Found direct branch: {kcore_insn}, ITS thunk not required.")
                        passed = True
                    else:
                        unknown = True
            if passed:
                tests_passed += 1
            elif unknown:
                ksft.print_msg(f"UNKNOWN: unexpected operand: {kcore_insn}")
                tests_unknown += 1
            else:
                ksft.print_msg(f'\t************* FAILED *************')
                ksft.print_msg(f"\tFound {kcore_insn.bytes} {kcore_insn.mnemonic} {operand}")
                ksft.print_msg(f'\t**********************************')
                tests_failed += 1
        except Exception as e:
            ksft.print_msg(f"UNKNOWN: An unexpected error occurred: {e}")
            tests_unknown += 1

ksft.print_msg(f"\n\nSummary:")
ksft.print_msg(f"PASS:    \t{tests_passed} \t/ {total_retpoline_tests}")
ksft.print_msg(f"FAIL:    \t{tests_failed} \t/ {total_retpoline_tests}")
ksft.print_msg(f"UNKNOWN: \t{tests_unknown} \t/ {total_retpoline_tests}")

if tests_failed == 0:
    ksft.test_result_pass("All ITS return thunk sites passed")
else:
    ksft.test_result_fail(f"{tests_failed} ITS return thunk sites failed")
ksft.finished()
+109 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0
#
# Copyright (c) 2025 Intel Corporation
#
# Test for indirect target selection (ITS) cmdline permutations with other bugs
# like spectre_v2 and retbleed.

import os, sys, subprocess, itertools, re, shutil

test_dir = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, test_dir + '/../../kselftest')
import ksft
import common as c

bug = "indirect_target_selection"
mitigation = c.get_sysfs(bug)

if not mitigation or "Not affected" in mitigation:
    ksft.test_result_skip("Skipping its_permutations.py: not applicable")
    ksft.finished()

if shutil.which('vng') is None:
    ksft.test_result_skip("Skipping its_permutations.py: virtme-ng ('vng') not found in PATH.")
    ksft.finished()

TEST = f"{test_dir}/its_sysfs.py"
default_kparam = ['clearcpuid=hypervisor', 'panic=5', 'panic_on_warn=1', 'oops=panic', 'nmi_watchdog=1', 'hung_task_panic=1']

DEBUG = " -v "

# Install dependencies
# https://github.com/arighi/virtme-ng
# apt install virtme-ng
BOOT_CMD = f"vng --run {test_dir}/../../../../../arch/x86/boot/bzImage "
#BOOT_CMD += DEBUG

bug = "indirect_target_selection"

input_options = {
    'indirect_target_selection'     : ['off', 'on', 'stuff', 'vmexit'],
    'retbleed'                      : ['off', 'stuff', 'auto'],
    'spectre_v2'                    : ['off', 'on', 'eibrs', 'retpoline', 'ibrs', 'eibrs,retpoline'],
}

def pretty_print(output):
    OKBLUE = '\033[94m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'

    # Define patterns and their corresponding colors
    patterns = {
        r"^ok \d+": OKGREEN,
        r"^not ok \d+": FAIL,
        r"^# Testing .*": OKBLUE,
        r"^# Found: .*": WARNING,
        r"^# Totals: .*": BOLD,
        r"pass:([1-9]\d*)": OKGREEN,
        r"fail:([1-9]\d*)": FAIL,
        r"skip:([1-9]\d*)": WARNING,
    }

    # Apply colors based on patterns
    for pattern, color in patterns.items():
        output = re.sub(pattern, lambda match: f"{color}{match.group(0)}{ENDC}", output, flags=re.MULTILINE)

    print(output)

combinations = list(itertools.product(*input_options.values()))
ksft.print_header()
ksft.set_plan(len(combinations))

logs = ""

for combination in combinations:
    append = ""
    log = ""
    for p in default_kparam:
        append += f' --append={p}'
    command = BOOT_CMD + append
    test_params = ""
    for i, key in enumerate(input_options.keys()):
        param = f'{key}={combination[i]}'
        test_params += f' {param}'
        command += f" --append={param}"
    command += f" -- {TEST}"
    test_name = f"{bug} {test_params}"
    pretty_print(f'# Testing {test_name}')
    t =  subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    t.wait()
    output, _ = t.communicate()
    if t.returncode == 0:
        ksft.test_result_pass(test_name)
    else:
        ksft.test_result_fail(test_name)
    output = output.decode()
    log += f" {output}"
    pretty_print(log)
    logs += output + "\n"

# Optionally use tappy to parse the output
# apt install python3-tappy
with open("logs.txt", "w") as f:
    f.write(logs)

ksft.finished()
Loading