Commit ac9b8293 authored by Pedro Tammela's avatar Pedro Tammela Committed by Paolo Abeni
Browse files

selftests/tc-testing: implement tdc parallel test run



Use a Python process pool to run the tests in parallel.
Not all tests can run in parallel, for instance tests that are not
namespaced and tests that use netdevsim, as they can conflict with one
another.

The code logic will split the tests into serial and parallel.
For the parallel tests, we build batches of 32 tests and queue each
batch on the process pool. For the serial tests, they are queued as a
whole into the process pool, which in turn executes them concurrently
with the parallel tests.

Even though the tests serialize on rtnl_lock in the kernel, this feature
showed results with a ~3x speedup on the wall time for the entire test suite
running in a VM:
   Before - 4m32.502s
   After - 1m19.202s

Examples:
   In order to run tdc using 4 processes:
      ./tdc.py -J4 <...>
   In order to run tdc using 1 process:
      ./tdc.py -J1 <...> || ./tdc.py <...>

Note that the kernel configuration will affect the speed of the tests,
especially if such configuration slows down process creation and/or
fork().

Tested-by: default avatarDavide Caratti <dcaratti@redhat.com>
Signed-off-by: default avatarPedro Tammela <pctammela@mojatatu.com>
Acked-by: default avatarJamal Hadi Salim <jhs@mojatatu.com>
Signed-off-by: default avatarPaolo Abeni <pabeni@redhat.com>
parent d227cc0b
Loading
Loading
Loading
Loading
+2 −1
Original line number Diff line number Diff line
@@ -59,7 +59,8 @@ class TestResult:
        return self.steps

class TestSuiteReport():
    _testsuite = []
    def __init__(self):
        self._testsuite = []

    def add_resultdata(self, result_data):
        if isinstance(result_data, TestResult):
+44 −35
Original line number Diff line number Diff line
@@ -3,65 +3,74 @@ import signal
from string import Template
import subprocess
import time
from multiprocessing import Pool
from functools import cached_property
from TdcPlugin import TdcPlugin

from tdc_config import *

class SubPlugin(TdcPlugin):
    def __init__(self):
        self.sub_class = 'ns/SubPlugin'
        super().__init__()

    def pre_suite(self, testcount, testlist):
        super().pre_suite(testcount, testlist)

        print("Setting up namespaces and devices...")
def prepare_suite(obj, test):
    original = obj.args.NAMES

        original = self.args.NAMES

        for t in testlist:
            if 'skip' in t and t['skip'] == 'yes':
                continue
    if 'skip' in test and test['skip'] == 'yes':
        return

            if 'nsPlugin' not in t['plugins']:
                continue
    if 'nsPlugin' not in test['plugins']:
        return

    shadow = {}
    shadow['IP'] = original['IP']
    shadow['TC'] = original['TC']
            shadow['NS'] = '{}-{}'.format(original['NS'], t['random'])
            shadow['DEV0'] = '{}id{}'.format(original['DEV0'], t['id'])
            shadow['DEV1'] = '{}id{}'.format(original['DEV1'], t['id'])
            shadow['DUMMY'] = '{}id{}'.format(original['DUMMY'], t['id'])
    shadow['NS'] = '{}-{}'.format(original['NS'], test['random'])
    shadow['DEV0'] = '{}id{}'.format(original['DEV0'], test['id'])
    shadow['DEV1'] = '{}id{}'.format(original['DEV1'], test['id'])
    shadow['DUMMY'] = '{}id{}'.format(original['DUMMY'], test['id'])
    shadow['DEV2'] = original['DEV2']
            self.args.NAMES = shadow
    obj.args.NAMES = shadow

            if self.args.namespace:
                self._ns_create()
    if obj.args.namespace:
        obj._ns_create()
    else:
                self._ports_create()

        self.args.NAMES = original

    def pre_case(self, caseinfo, test_skip):
        if self.args.verbose:
            print('{}.pre_case'.format(self.sub_class))

        if test_skip:
            return
        obj._ports_create()

    # Make sure the netns is visible in the fs
    while True:
            self._proc_check()
        obj._proc_check()
        try:
                ns = self.args.NAMES['NS']
            ns = obj.args.NAMES['NS']
            f = open('/run/netns/{}'.format(ns))
            f.close()
            break
        except:
            time.sleep(0.1)
            continue

    obj.args.NAMES = original

class SubPlugin(TdcPlugin):
    def __init__(self):
        self.sub_class = 'ns/SubPlugin'
        super().__init__()

    def pre_suite(self, testcount, testlist):
        from itertools import cycle

        super().pre_suite(testcount, testlist)

        print("Setting up namespaces and devices...")

        with Pool(self.args.mp) as p:
            it = zip(cycle([self]), testlist)
            p.starmap(prepare_suite, it)

    def pre_case(self, caseinfo, test_skip):
        if self.args.verbose:
            print('{}.pre_case'.format(self.sub_class))

        if test_skip:
            return


    def post_case(self):
        if self.args.verbose:
            print('{}.post_case'.format(self.sub_class))
+102 −21
Original line number Diff line number Diff line
@@ -17,6 +17,7 @@ import subprocess
import time
import traceback
import random
from multiprocessing import Pool
from collections import OrderedDict
from string import Template

@@ -477,26 +478,11 @@ def run_one_test(pm, args, index, tidx):

    return res

def test_runner(pm, args, filtered_tests):
    """
    Driver function for the unit tests.

    Prints information about the tests being run, executes the setup and
    teardown commands and the command under test itself. Also determines
    success/failure based on the information in the test case and generates
    TAP output accordingly.
    """
    testlist = filtered_tests
def prepare_run(pm, args, testlist):
    tcount = len(testlist)
    index = 1
    tap = ''
    badtest = None
    stage = None
    emergency_exit = False
    emergency_exit_message = ''

    tsr = TestSuiteReport()

    try:
        pm.call_pre_suite(tcount, testlist)
    except Exception as ee:
@@ -506,14 +492,37 @@ def test_runner(pm, args, filtered_tests):
        traceback.print_tb(ex_tb)
        emergency_exit_message = 'EMERGENCY EXIT, call_pre_suite failed with exception {} {}\n'.format(ex_type, ex)
        emergency_exit = True
        stage = 'pre-SUITE'

    if emergency_exit:
        pm.call_post_suite(index)
        pm.call_post_suite(1)
        return emergency_exit_message
    if args.verbose > 1:

    if args.verbose:
        print('give test rig 2 seconds to stabilize')

    time.sleep(2)

def purge_run(pm, index):
    pm.call_post_suite(index)

def test_runner(pm, args, filtered_tests):
    """
    Driver function for the unit tests.

    Prints information about the tests being run, executes the setup and
    teardown commands and the command under test itself. Also determines
    success/failure based on the information in the test case and generates
    TAP output accordingly.
    """
    testlist = filtered_tests
    tcount = len(testlist)
    index = 1
    tap = ''
    badtest = None
    stage = None

    tsr = TestSuiteReport()

    for tidx in testlist:
        if "flower" in tidx["category"] and args.device == None:
            errmsg = "Tests using the DEV2 variable must define the name of a "
@@ -576,7 +585,68 @@ def test_runner(pm, args, filtered_tests):
        if input(sys.stdin):
            print('got something on stdin')

    pm.call_post_suite(index)
    return (index, tsr)

def mp_bins(alltests):
    serial = []
    parallel = []

    for test in alltests:
        if 'nsPlugin' not in test['plugins']:
            serial.append(test)
        else:
            # We can only create one netdevsim device at a time
            if 'netdevsim/new_device' in str(test['setup']):
                serial.append(test)
            else:
                parallel.append(test)

    return (serial, parallel)

def __mp_runner(tests):
    (_, tsr) = test_runner(mp_pm, mp_args, tests)
    return tsr._testsuite

def test_runner_mp(pm, args, alltests):
    prepare_run(pm, args, alltests)

    (serial, parallel) = mp_bins(alltests)

    batches = [parallel[n : n + 32] for n in range(0, len(parallel), 32)]
    batches.insert(0, serial)

    print("Executing {} tests in parallel and {} in serial".format(len(parallel), len(serial)))
    print("Using {} batches".format(len(batches)))

    # We can't pickle these objects so workaround them
    global mp_pm
    mp_pm = pm

    global mp_args
    mp_args = args

    with Pool(args.mp) as p:
        pres = p.map(__mp_runner, batches)

    tsr = TestSuiteReport()
    for trs in pres:
        for res in trs:
            tsr.add_resultdata(res)

    # Passing an index is not useful in MP
    purge_run(pm, None)

    return tsr

def test_runner_serial(pm, args, alltests):
    prepare_run(pm, args, alltests)

    if args.verbose:
        print("Executing {} tests in serial".format(len(alltests)))

    (index, tsr) = test_runner(pm, args, alltests)

    purge_run(pm, index)

    return tsr

@@ -605,12 +675,15 @@ def load_from_file(filename):
                k['filename'] = filename
    return testlist

def identity(string):
    return string

def args_parse():
    """
    Create the argument parser.
    """
    parser = argparse.ArgumentParser(description='Linux TC unit tests')
    parser.register('type', None, identity)
    return parser


@@ -668,6 +741,9 @@ def set_args(parser):
    parser.add_argument(
        '-P', '--pause', action='store_true',
        help='Pause execution just before post-suite stage')
    parser.add_argument(
        '-J', '--multiprocess', type=int, default=1, dest='mp',
        help='Run tests in parallel whenever possible')
    return parser


@@ -888,7 +964,12 @@ def set_operation_mode(pm, parser, args, remaining):
        except PluginDependencyException as pde:
            print('The following plugins were not found:')
            print('{}'.format(pde.missing_pg))
        catresults = test_runner(pm, args, alltests)

        if args.mp > 1:
            catresults = test_runner_mp(pm, args, alltests)
        else:
            catresults = test_runner_serial(pm, args, alltests)

        if catresults.count_failures() != 0:
            exit_code = 1 # KSFT_FAIL
        if args.format == 'none':