Commit 2748087c authored by Jakub Kicinski's avatar Jakub Kicinski Committed by Paolo Abeni
Browse files

selftests: drv-net: psp: add connection breaking tests



Add test checking conditions which lead to connections breaking.
Using bad key or connection gets stuck if device key is rotated
twice.

Signed-off-by: default avatarJakub Kicinski <kuba@kernel.org>
Signed-off-by: default avatarDaniel Zahka <daniel.zahka@gmail.com>
Link: https://patch.msgid.link/20250927225420.1443468-7-kuba@kernel.org


Reviewed-by: default avatarWillem de Bruijn <willemb@google.com>
Signed-off-by: default avatarPaolo Abeni <pabeni@redhat.com>
parent 81b89085
Loading
Loading
Loading
Loading
+91 −1
Original line number Diff line number Diff line
@@ -98,6 +98,16 @@ def _check_data_rx(cfg, exp_len):
        time.sleep(0.01)
    ksft_eq(read_len, exp_len)


def _check_data_outq(s, exp_len, force_wait=False):
    outq = 0
    for _ in range(10):
        outq = _get_outq(s)
        if not force_wait and outq == exp_len:
            break
        time.sleep(0.01)
    ksft_eq(outq, exp_len)

#
# Test case boiler plate
#
@@ -372,6 +382,85 @@ def _data_basic_send(cfg, version, ipver):
    _close_psp_conn(cfg, s)


def __bad_xfer_do(cfg, s, tx, version='hdr0-aes-gcm-128'):
    # Make sure we accept the ACK for the SPI before we seal with the bad assoc
    _check_data_outq(s, 0)

    cfg.pspnl.tx_assoc({"dev-id": cfg.psp_dev_id,
                        "version": version,
                        "tx-key": tx,
                        "sock-fd": s.fileno()})

    data_len = _send_careful(cfg, s, 20)
    _check_data_outq(s, data_len, force_wait=True)
    _check_data_rx(cfg, 0)
    _close_psp_conn(cfg, s)


def data_send_bad_key(cfg):
    """ Test send data with bad key """
    _init_psp_dev(cfg)

    s = _make_psp_conn(cfg)

    rx_assoc = cfg.pspnl.rx_assoc({"version": 0,
                                   "dev-id": cfg.psp_dev_id,
                                   "sock-fd": s.fileno()})
    rx = rx_assoc['rx-key']
    tx = _spi_xchg(s, rx)
    tx['key'] = (tx['key'][0] ^ 0xff).to_bytes(1, 'little') + tx['key'][1:]
    __bad_xfer_do(cfg, s, tx)


def data_send_disconnect(cfg):
    """ Test socket close after sending data """
    _init_psp_dev(cfg)

    with _make_psp_conn(cfg) as s:
        assoc = cfg.pspnl.rx_assoc({"version": 0,
                                  "sock-fd": s.fileno()})
        tx = _spi_xchg(s, assoc['rx-key'])
        cfg.pspnl.tx_assoc({"version": 0,
                          "tx-key": tx,
                          "sock-fd": s.fileno()})

        data_len = _send_careful(cfg, s, 100)
        _check_data_rx(cfg, data_len)

        s.shutdown(socket.SHUT_RDWR)
        s.close()


def data_stale_key(cfg):
    """ Test send on a double-rotated key """
    _init_psp_dev(cfg)

    s = _make_psp_conn(cfg)
    try:
        rx_assoc = cfg.pspnl.rx_assoc({"version": 0,
                                     "dev-id": cfg.psp_dev_id,
                                     "sock-fd": s.fileno()})
        rx = rx_assoc['rx-key']
        tx = _spi_xchg(s, rx)

        cfg.pspnl.tx_assoc({"dev-id": cfg.psp_dev_id,
                          "version": 0,
                          "tx-key": tx,
                          "sock-fd": s.fileno()})

        data_len = _send_careful(cfg, s, 100)
        _check_data_rx(cfg, data_len)
        _check_data_outq(s, 0)

        cfg.pspnl.key_rotate({"id": cfg.psp_dev_id})
        cfg.pspnl.key_rotate({"id": cfg.psp_dev_id})

        s.send(b'0123456789' * 200)
        _check_data_outq(s, 2000, force_wait=True)
    finally:
        _close_psp_conn(cfg, s)


def psp_ip_ver_test_builder(name, test_func, psp_ver, ipver):
    """Build test cases for each combo of PSP version and IP version"""
    def test_case(cfg):
@@ -410,7 +499,8 @@ def main() -> None:
                ]

                ksft_run(cases=cases, globs=globals(),
                         case_pfx={"dev_", "assoc_"}, args=(cfg, ))
                         case_pfx={"dev_", "data_", "assoc_"},
                         args=(cfg, ))

                cfg.comm_sock.send(b"exit\0")
                cfg.comm_sock.close()