Commit 8aefcfa0 authored by Donald Hunter's avatar Donald Hunter Committed by Jakub Kicinski
Browse files

tools/net/ynl: add async notification handling



The notification handling in ynl is currently very simple, using sleep()
to wait a period of time and then handling all the buffered messages in
a single batch.

This patch adds async notification handling so that messages can be
processed as they are received. This makes it possible to use ynl as a
library that supplies notifications in a timely manner.

- Add poll_ntf() to be a generator that yields 1 notification at a
  time and blocks until a notification is available.
- Add a --duration parameter to the CLI, with --sleep as an alias.

./tools/net/ynl/cli.py \
    --spec <SPEC> --subscribe <TOPIC> [ --duration <SECS> ]

The cli will report any notifications for duration seconds and then
exit. If duration is not specified, then it will poll forever, until
interrupted.

Here is an example python snippet that shows how to use ynl as a library
for receiving notifications:

    ynl = YnlFamily(f"{dir}/rt_route.yaml")
    ynl.ntf_subscribe('rtnlgrp-ipv4-route')

    for event in ynl.poll_ntf():
        handle(event)

Signed-off-by: default avatarDonald Hunter <donald.hunter@gmail.com>
Link: https://patch.msgid.link/20241113090843.72917-3-donald.hunter@gmail.com


Signed-off-by: default avatarJakub Kicinski <kuba@kernel.org>
parent cef009cc
Loading
Loading
Loading
Loading
+9 −7
Original line number Diff line number Diff line
@@ -6,7 +6,6 @@ import json
import pathlib
import pprint
import sys
import time

sys.path.append(pathlib.Path(__file__).resolve().parent.as_posix())
from lib import YnlFamily, Netlink, NlError
@@ -46,7 +45,10 @@ def main():
    group.add_argument('--list-ops', action='store_true')
    group.add_argument('--list-msgs', action='store_true')

    parser.add_argument('--sleep', dest='sleep', type=int)
    parser.add_argument('--duration', dest='duration', type=int,
                        help='when subscribed, watch for DURATION seconds')
    parser.add_argument('--sleep', dest='duration', type=int,
                        help='alias for duration')
    parser.add_argument('--subscribe', dest='ntf', type=str)
    parser.add_argument('--replace', dest='flags', action='append_const',
                        const=Netlink.NLM_F_REPLACE)
@@ -83,9 +85,6 @@ def main():
    if args.ntf:
        ynl.ntf_subscribe(args.ntf)

    if args.sleep:
        time.sleep(args.sleep)

    if args.list_ops:
        for op_name, op in ynl.ops.items():
            print(op_name, " [", ", ".join(op.modes), "]")
@@ -109,8 +108,11 @@ def main():
        exit(1)

    if args.ntf:
        ynl.check_ntf()
        output(ynl.async_msg_queue)
        try:
            for msg in ynl.poll_ntf(duration=args.duration):
                output(msg)
        except KeyboardInterrupt:
            pass


if __name__ == "__main__":
+25 −3
Original line number Diff line number Diff line
@@ -12,6 +12,9 @@ import sys
import yaml
import ipaddress
import uuid
import queue
import selectors
import time

from .nlspec import SpecFamily

@@ -489,7 +492,7 @@ class YnlFamily(SpecFamily):
        self.sock.setsockopt(Netlink.SOL_NETLINK, Netlink.NETLINK_GET_STRICT_CHK, 1)

        self.async_msg_ids = set()
        self.async_msg_queue = []
        self.async_msg_queue = queue.Queue()

        for msg in self.msgs.values():
            if msg.is_async:
@@ -903,7 +906,7 @@ class YnlFamily(SpecFamily):

        msg['name'] = op['name']
        msg['msg'] = attrs
        self.async_msg_queue.append(msg)
        self.async_msg_queue.put(msg)

    def check_ntf(self):
        while True:
@@ -925,11 +928,30 @@ class YnlFamily(SpecFamily):

                decoded = self.nlproto.decode(self, nl_msg, None)
                if decoded.cmd() not in self.async_msg_ids:
                    print("Unexpected msg id done while checking for ntf", decoded)
                    print("Unexpected msg id while checking for ntf", decoded)
                    continue

                self.handle_ntf(decoded)

    def poll_ntf(self, duration=None):
        start_time = time.time()
        selector = selectors.DefaultSelector()
        selector.register(self.sock, selectors.EVENT_READ)

        while True:
            try:
                yield self.async_msg_queue.get_nowait()
            except queue.Empty:
                if duration is not None:
                    timeout = start_time + duration - time.time()
                    if timeout <= 0:
                        return
                else:
                    timeout = None
                events = selector.select(timeout)
                if events:
                    self.check_ntf()

    def operation_do_attributes(self, name):
      """
      For a given operation name, find and return a supported