diff options
| author | Christian Hopps <chopps@labn.net> | 2021-12-24 02:04:57 -0500 | 
|---|---|---|
| committer | Christian Hopps <chopps@labn.net> | 2022-06-02 16:37:16 -0400 | 
| commit | 9191ac86fdb70e057ba46827a99c975507365bad (patch) | |
| tree | 0be78dd76f744791e3dec0274ac2183b10991309 /ospfclient | |
| parent | bd1188f904b63664c3b350ca587a04547210bf15 (diff) | |
ospfclient: add ospfclient api python class
Signed-off-by: Christian Hopps <chopps@labn.net>
Diffstat (limited to 'ospfclient')
| -rwxr-xr-x | ospfclient/ospfclient.py | 1133 | ||||
| -rw-r--r-- | ospfclient/subdir.am | 8 | 
2 files changed, 1141 insertions, 0 deletions
diff --git a/ospfclient/ospfclient.py b/ospfclient/ospfclient.py new file mode 100755 index 0000000000..be8b51f007 --- /dev/null +++ b/ospfclient/ospfclient.py @@ -0,0 +1,1133 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 eval: (blacken-mode 1) -*- +# +# December 22 2021, Christian Hopps <chopps@labn.net> +# +# Copyright 2021, LabN Consulting, L.L.C. +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; see the file COPYING; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +# + +import argparse +import asyncio +import errno +import logging +import socket +import struct +import sys +from asyncio import Event, Lock +from ipaddress import ip_address as ip + +FMT_APIMSGHDR = ">BBHL" +FMT_APIMSGHDR_SIZE = struct.calcsize(FMT_APIMSGHDR) + +FMT_LSA_FILTER = ">HBB"  # + plus x"I" areas +LSAF_ORIGIN_NON_SELF = 0 +LSAF_ORIGIN_SELF = 1 +LSAF_ORIGIN_ANY = 2 + +FMT_LSA_HEADER = ">HBBIILHH" +FMT_LSA_HEADER_SIZE = struct.calcsize(FMT_LSA_HEADER) + +# ------------------------ +# Messages to OSPF daemon. +# ------------------------ + +MSG_REGISTER_OPAQUETYPE = 1 +MSG_UNREGISTER_OPAQUETYPE = 2 +MSG_REGISTER_EVENT = 3 +MSG_SYNC_LSDB = 4 +MSG_ORIGINATE_REQUEST = 5 +MSG_DELETE_REQUEST = 6 +MSG_SYNC_REACHABLE = 7 +MSG_SYNC_ISM = 8 +MSG_SYNC_NSM = 9 + +smsg_info = { +    MSG_REGISTER_OPAQUETYPE: ("REGISTER_OPAQUETYPE", "BBxx"), +    MSG_UNREGISTER_OPAQUETYPE: ("UNREGISTER_OPAQUETYPE", "BBxx"), +    MSG_REGISTER_EVENT: ("REGISTER_EVENT", FMT_LSA_FILTER), +    MSG_SYNC_LSDB: ("SYNC_LSDB", FMT_LSA_FILTER), +    MSG_ORIGINATE_REQUEST: ("ORIGINATE_REQUEST", ">II" + FMT_LSA_HEADER[1:]), +    MSG_DELETE_REQUEST: ("DELETE_REQUEST", ">IBBxxL"), +    MSG_SYNC_REACHABLE: ("MSG_SYNC_REACHABLE", ""), +    MSG_SYNC_ISM: ("MSG_SYNC_ISM", ""), +    MSG_SYNC_NSM: ("MSG_SYNC_NSM", ""), +} + +# -------------------------- +# Messages from OSPF daemon. +# -------------------------- + +MSG_REPLY = 10 +MSG_READY_NOTIFY = 11 +MSG_LSA_UPDATE_NOTIFY = 12 +MSG_LSA_DELETE_NOTIFY = 13 +MSG_NEW_IF = 14 +MSG_DEL_IF = 15 +MSG_ISM_CHANGE = 16 +MSG_NSM_CHANGE = 17 +MSG_REACHABLE_CHANGE = 18 + +amsg_info = { +    MSG_REPLY: ("REPLY", "bxxx"), +    MSG_READY_NOTIFY: ("READY_NOTIFY", ">BBxxI"), +    MSG_LSA_UPDATE_NOTIFY: ("LSA_UPDATE_NOTIFY", ">IIBxxx" + FMT_LSA_HEADER[1:]), +    MSG_LSA_DELETE_NOTIFY: ("LSA_DELETE_NOTIFY", ">IIBxxx" + FMT_LSA_HEADER[1:]), +    MSG_NEW_IF: ("NEW_IF", ">II"), +    MSG_DEL_IF: ("DEL_IF", ">I"), +    MSG_ISM_CHANGE: ("ISM_CHANGE", ">IIBxxx"), +    MSG_NSM_CHANGE: ("NSM_CHANGE", ">IIIBxxx"), +    MSG_REACHABLE_CHANGE: ("REACHABLE_CHANGE", ">HH"), +} + +OSPF_API_OK = 0 +OSPF_API_NOSUCHINTERFACE = -1 +OSPF_API_NOSUCHAREA = -2 +OSPF_API_NOSUCHLSA = -3 +OSPF_API_ILLEGALLSATYPE = -4 +OSPF_API_OPAQUETYPEINUSE = -5 +OSPF_API_OPAQUETYPENOTREGISTERED = -6 +OSPF_API_NOTREADY = -7 +OSPF_API_NOMEMORY = -8 +OSPF_API_ERROR = -9 +OSPF_API_UNDEF = -10 + +msg_errname = { +    OSPF_API_OK: "OSPF_API_OK", +    OSPF_API_NOSUCHINTERFACE: "OSPF_API_NOSUCHINTERFACE", +    OSPF_API_NOSUCHAREA: "OSPF_API_NOSUCHAREA", +    OSPF_API_NOSUCHLSA: "OSPF_API_NOSUCHLSA", +    OSPF_API_ILLEGALLSATYPE: "OSPF_API_ILLEGALLSATYPE", +    OSPF_API_OPAQUETYPEINUSE: "OSPF_API_OPAQUETYPEINUSE", +    OSPF_API_OPAQUETYPENOTREGISTERED: "OSPF_API_OPAQUETYPENOTREGISTERED", +    OSPF_API_NOTREADY: "OSPF_API_NOTREADY", +    OSPF_API_NOMEMORY: "OSPF_API_NOMEMORY", +    OSPF_API_ERROR: "OSPF_API_ERROR", +    OSPF_API_UNDEF: "OSPF_API_UNDEF", +} + +# msg_info = {**smsg_info, **amsg_info} +msg_info = {} +msg_info.update(smsg_info) +msg_info.update(amsg_info) +msg_name = {k: v[0] for k, v in msg_info.items()} +msg_fmt = {k: v[1] for k, v in msg_info.items()} +msg_size = {k: struct.calcsize(v) for k, v in msg_fmt.items()} + + +def api_msgname(mt): +    return msg_name.get(mt, str(mt)) + + +def api_errname(ecode): +    return msg_errname.get(ecode, str(ecode)) + + +# ------------------- +# API Semantic Errors +# ------------------- + + +class APIError(Exception): +    pass + + +class MsgTypeError(Exception): +    pass + + +class SeqNumError(Exception): +    pass + + +# --------- +# LSA Types +# --------- + +LSA_TYPE_UNKNOWN = 0 +LSA_TYPE_ROUTER = 1 +LSA_TYPE_NETWORK = 2 +LSA_TYPE_SUMMARY = 3 +LSA_TYPE_ASBR_SUMMARY = 4 +LSA_TYPE_AS_EXTERNAL = 5 +LSA_TYPE_GROUP_MEMBER = 6 +LSA_TYPE_AS_NSSA = 7 +LSA_TYPE_EXTERNAL_ATTRIBUTES = 8 +LSA_TYPE_OPAQUE_LINK = 9 +LSA_TYPE_OPAQUE_AREA = 10 +LSA_TYPE_OPAQUE_AS = 11 + + +def lsa_typename(lsa_type): +    names = { +        LSA_TYPE_ROUTER: "LSA:ROUTER", +        LSA_TYPE_NETWORK: "LSA:NETWORK", +        LSA_TYPE_SUMMARY: "LSA:SUMMARY", +        LSA_TYPE_ASBR_SUMMARY: "LSA:ASBR_SUMMARY", +        LSA_TYPE_AS_EXTERNAL: "LSA:AS_EXTERNAL", +        LSA_TYPE_GROUP_MEMBER: "LSA:GROUP_MEMBER", +        LSA_TYPE_AS_NSSA: "LSA:AS_NSSA", +        LSA_TYPE_EXTERNAL_ATTRIBUTES: "LSA:EXTERNAL_ATTRIBUTES", +        LSA_TYPE_OPAQUE_LINK: "LSA:OPAQUE_LINK", +        LSA_TYPE_OPAQUE_AREA: "LSA:OPAQUE_AREA", +        LSA_TYPE_OPAQUE_AS: "LSA:OPAQUE_AS", +    } +    return names.get(lsa_type, str(lsa_type)) + + +# ------------------------------ +# Interface State Machine States +# ------------------------------ + +ISM_DEPENDUPON = 0 +ISM_DOWN = 1 +ISM_LOOPBACK = 2 +ISM_WAITING = 3 +ISM_POINTTOPOINT = 4 +ISM_DROTHER = 5 +ISM_BACKUP = 6 +ISM_DR = 7 + + +def ism_name(state): +    names = { +        ISM_DEPENDUPON: "ISM_DEPENDUPON", +        ISM_DOWN: "ISM_DOWN", +        ISM_LOOPBACK: "ISM_LOOPBACK", +        ISM_WAITING: "ISM_WAITING", +        ISM_POINTTOPOINT: "ISM_POINTTOPOINT", +        ISM_DROTHER: "ISM_DROTHER", +        ISM_BACKUP: "ISM_BACKUP", +        ISM_DR: "ISM_DR", +    } +    return names.get(state, str(state)) + + +# ----------------------------- +# Neighbor State Machine States +# ----------------------------- + +NSM_DEPENDUPON = 0 +NSM_DELETED = 1 +NSM_DOWN = 2 +NSM_ATTEMPT = 3 +NSM_INIT = 4 +NSM_TWOWAY = 5 +NSM_EXSTART = 6 +NSM_EXCHANGE = 7 +NSM_LOADING = 8 +NSM_FULL = 9 + + +def nsm_name(state): +    names = { +        NSM_DEPENDUPON: "NSM_DEPENDUPON", +        NSM_DELETED: "NSM_DELETED", +        NSM_DOWN: "NSM_DOWN", +        NSM_ATTEMPT: "NSM_ATTEMPT", +        NSM_INIT: "NSM_INIT", +        NSM_TWOWAY: "NSM_TWOWAY", +        NSM_EXSTART: "NSM_EXSTART", +        NSM_EXCHANGE: "NSM_EXCHANGE", +        NSM_LOADING: "NSM_LOADING", +        NSM_FULL: "NSM_FULL", +    } +    return names.get(state, str(state)) + + +# -------------- +# Client Classes +# -------------- + + +class OspfApiClient: +    def __str__(self): +        return "OspfApiClient({})".format(self.server) + +    @staticmethod +    def _get_bound_sockets(port): +        s1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) +        try: +            s1.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) +            # s1.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) +            s1.bind(("", port)) +            s2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) +            try: +                s2.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) +                # s2.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) +                s2.bind(("", port + 1)) +                return s1, s2 +            except Exception: +                s2.close() +                raise +        except Exception: +            s1.close() +            raise + +    def __init__(self, server="localhost", handlers=None): +        """A client connection to OSPF Daemon using the OSPF API + +        The client object is not created in a connected state.  To connect to the server +        the `connect` method should be called.  If an error is encountered when sending +        messages to the server an exception will be raised and the connection will be +        closed.  When this happens `connect` may be called again to restore the +        connection. + +        Args: +            server: hostname or IP address of server default is "localhost" +            handlers: dict of message handlers, the key is the API message +                type, the value is a function. The functions signature is: +                `handler(msg_type, msg, msg_extra, *params)`, where `msg` is the +                message data after the API header, `*params` will be the +                unpacked message values, and msg_extra are any bytes beyond the +                fixed parameters of the message. +        Raises: +            Will raise exceptions for failures with various `socket` modules +            functions such as `socket.socket`, `socket.setsockopt`, `socket.bind`. +        """ +        self._seq = 0 +        self._s = None +        self._as = None +        self._ls = None +        self._ar = self._r = self._w = None +        self.server = server +        self.handlers = handlers if handlers is not None else dict() +        self.write_lock = Lock() + +        # try and get consecutive 2 ports +        PORTSTART = 49152 +        PORTEND = 65534 +        for port in range(PORTSTART, PORTEND + 2, 2): +            try: +                logging.debug("%s: binding to ports %s, %s", self, port, port + 1) +                self._s, self._ls = self._get_bound_sockets(port) +                break +            except OSError as error: +                if error.errno != errno.EADDRINUSE or port == PORTEND: +                    logging.warning("%s: binding port %s error %s", self, port, error) +                    raise +                logging.debug("%s: ports %s, %s in use.", self, port, port + 1) +        else: +            assert False, "Should not reach this code execution point" + +    async def _connect_locked(self): +        logging.debug("%s: connect to OSPF API", self) + +        loop = asyncio.get_event_loop() + +        self._ls.listen() +        try: +            logging.debug("%s: connecting sync socket to server", self) +            await loop.sock_connect(self._s, (self.server, 2607)) + +            logging.debug("%s: accepting connect from server", self) +            self._as, _ = await loop.sock_accept(self._ls) +        except Exception: +            await self._close_locked() +            raise + +        logging.debug("%s: success", self) +        self._r, self._w = await asyncio.open_connection(sock=self._s) +        self._ar, _ = await asyncio.open_connection(sock=self._as) +        self._seq = 1 + +    async def connect(self): +        async with self.write_lock: +            await self._connect_locked() + +    @property +    def closed(self): +        "True if the connection is closed." +        return self._seq == 0 + +    async def _close_locked(self): +        logging.debug("%s: closing", self) +        if self._s: +            if self._w: +                self._w.close() +                await self._w.wait_closed() +                self._w = None +            else: +                self._s.close() +            self._s = None +            self._r = None +        assert self._w is None +        if self._as: +            self._as.close() +            self._as = None +            self._ar = None +        if self._ls: +            self._ls.close() +            self._ls = None +        self._seq = 0 + +    async def close(self): +        async with self.write_lock: +            await self._close_locked() + +    @staticmethod +    async def _msg_read(r, expseq=-1): +        """Read an OSPF API message from the socket `r` + +        Args: +            r: socket to read msg from +            expseq: sequence number to expect or -1 for any. +        Raises: +            Will raise exceptions for failures with various `socket` modules, +            Additionally may raise SeqNumError if unexpected seqnum is received. +        """ +        try: +            mh = await r.readexactly(FMT_APIMSGHDR_SIZE) +            v, mt, l, seq = struct.unpack(FMT_APIMSGHDR, mh) +            if v != 1: +                raise Exception("received unexpected OSPF API version {}".format(v)) +            if expseq == -1: +                logging.debug("_msg_read: got seq: 0x%x on async read", seq) +            elif seq != expseq: +                raise SeqNumError("rx {} != {}".format(seq, expseq)) +            msg = await r.readexactly(l) if l else b"" +            return mt, msg +        except asyncio.IncompleteReadError: +            raise EOFError + +    async def msg_read(self): +        """Read a message from the async notify channel. + +        Raises: +            May raise exceptions for failures with various `socket` modules. +        """ +        return await OspfApiClient._msg_read(self._ar, -1) + +    async def msg_send(self, mt, mp): +        """Send a message to OSPF API and wait for error code reply. + +        Args: +            mt: the messaage type +            mp: the message payload +        Returns: +            error: an OSPF_API_XXX error code, 0 for OK. +        Raises: +            Raises SeqNumError if the synchronous reply is the wrong sequence number; +            MsgTypeError if the synchronous reply is not MSG_REPLY. Also, +            may raise exceptions for failures with various `socket` modules, + +            The connection will be closed. +        """ +        logging.debug("SEND: %s: sending %s seq 0x%x", self, api_msgname(mt), self._seq) +        mh = struct.pack(FMT_APIMSGHDR, 1, mt, len(mp), self._seq) + +        seq = self._seq +        self._seq = seq + 1 + +        try: +            async with self.write_lock: +                self._w.write(mh + mp) +                await self._w.drain() +                mt, mp = await OspfApiClient._msg_read(self._r, seq) + +            if mt != MSG_REPLY: +                raise MsgTypeError( +                    "rx {} != {}".format(api_msgname(mt), api_msgname(MSG_REPLY)) +                ) + +            return struct.unpack(msg_fmt[MSG_REPLY], mp)[0] +        except Exception: +            # We've written data with a sequence number +            await self.close() +            raise + +    async def msg_send_raises(self, mt, mp=b"\x00" * 4): +        """Send a message to OSPF API and wait for error code reply. + +        Args: +            mt: the messaage type +            mp: the message payload +        Raises: +            APIError if the server replies with an error. + +            Also may raise exceptions for failures with various `socket` modules, +            as well as MsgTypeError if the synchronous reply is incorrect. +            The connection will be closed for these non-API error exceptions. +        """ +        ecode = await self.msg_send(mt, mp) +        if ecode: +            raise APIError("{} error {}".format(api_msgname(mt), api_errname(ecode))) + +    async def handle_async_msg(self, mt, msg): +        if mt not in msg_fmt: +            logging.debug("RECV: %s: unknown async msg type %s", self, mt) +            return + +        fmt = msg_fmt[mt] +        sz = msg_size[mt] +        tup = struct.unpack(fmt, msg[:sz]) +        extra = msg[sz:] + +        if mt not in self.handlers: +            logging.debug( +                "RECV: %s: no handlers for msg type %s", self, api_msgname(mt) +            ) +            return + +        logging.debug("RECV: %s: calling handler for %s", self, api_msgname(mt)) +        await self.handlers[mt](mt, msg, extra, *tup) + +    # +    # Client to Server Messaging +    # +    @staticmethod +    def lsa_type_mask(*lsa_types): +        "Return a 16 bit mask for each LSA type passed." +        if not lsa_types: +            return 0xFFFF +        mask = 0 +        for t in lsa_types: +            assert 0 < t < 16, "LSA type {} out of range [1, 15]".format(t) +            mask |= 1 << t +        return mask + +    @staticmethod +    def lsa_filter(origin, areas, lsa_types): +        """Return an LSA filter. + +        Return the filter message bytes based on `origin` the `areas` list and the LSAs +        types in the `lsa_types` list. +        """ +        mask = OspfApiClient.lsa_type_mask(*lsa_types) +        narea = len(areas) +        fmt = FMT_LSA_FILTER + ("{}I".format(narea) if narea else "") +        # lsa type mask, origin, number of areas, each area +        return struct.pack(fmt, mask, origin, narea, *areas) + +    async def req_lsdb_sync(self): +        "Register for all LSA notifications and request an LSDB synchronoization." +        logging.debug("SEND: %s: request LSDB events", self) +        mp = OspfApiClient.lsa_filter(LSAF_ORIGIN_ANY, [], []) +        await self.msg_send_raises(MSG_REGISTER_EVENT, mp) + +        logging.debug("SEND: %s: request LSDB sync", self) +        await self.msg_send_raises(MSG_SYNC_LSDB, mp) + +    async def req_reachable_routers(self): +        "Request a dump of all reachable routers." +        logging.debug("SEND: %s: request reachable changes", self) +        await self.msg_send_raises(MSG_SYNC_REACHABLE) + +    async def req_ism_states(self): +        "Request a dump of the current ISM states of all interfaces." +        logging.debug("SEND: %s: request ISM changes", self) +        await self.msg_send_raises(MSG_SYNC_ISM) + +    async def req_nsm_states(self): +        "Request a dump of the current NSM states of all neighbors." +        logging.debug("SEND: %s: request NSM changes", self) +        await self.msg_send_raises(MSG_SYNC_NSM) + + +class OspfOpaqueClient(OspfApiClient): +    """A client connection to OSPF Daemon for manipulating Opaque LSA data. + +    The client object is not created in a connected state.  To connect to the server +    the `connect` method should be called.  If an error is encountered when sending +    messages to the server an exception will be raised and the connection will be +    closed.  When this happens `connect` may be called again to restore the +    connection. + +    Args: +        server: hostname or IP address of server default is "localhost" + +    Raises: +        Will raise exceptions for failures with various `socket` modules +        functions such as `socket.socket`, `socket.setsockopt`, `socket.bind`. +    """ + +    def __init__(self, server="localhost"): +        handlers = { +            MSG_READY_NOTIFY: self._ready_msg, +            MSG_LSA_UPDATE_NOTIFY: self._lsa_change_msg, +            MSG_LSA_DELETE_NOTIFY: self._lsa_change_msg, +            MSG_NEW_IF: self._if_msg, +            MSG_DEL_IF: self._if_msg, +            MSG_ISM_CHANGE: self._if_change_msg, +            MSG_NSM_CHANGE: self._nbr_change_msg, +            MSG_REACHABLE_CHANGE: self._reachable_msg, +        } +        super().__init__(server, handlers) + +        self.ready_lock = Lock() +        self.ready_cond = { +            LSA_TYPE_OPAQUE_LINK: {}, +            LSA_TYPE_OPAQUE_AREA: {}, +            LSA_TYPE_OPAQUE_AS: {}, +        } +        self.lsid_seq_num = {} + +        self.lsa_change_cb = None +        self.opaque_change_cb = {} + +        self.reachable_routers = set() +        self.reachable_change_cb = None + +        self.if_area = {} +        self.ism_states = {} +        self.ism_change_cb = None + +        self.nsm_states = {} +        self.nsm_change_cb = None + +    async def _register_opaque_data(self, lsa_type, otype): +        async with self.ready_lock: +            cond = self.ready_cond[lsa_type].get(otype) +            assert cond is None, "multiple registers for {} opaque-type {}".format( +                lsa_typename(lsa_type), otype +            ) + +            logging.debug("register %s opaque-type %s", lsa_typename(lsa_type), otype) + +            mt = MSG_REGISTER_OPAQUETYPE +            mp = struct.pack(msg_fmt[mt], lsa_type, otype) +            await self.msg_send_raises(mt, mp) + +    async def _assure_opaque_ready(self, lsa_type, otype): +        async with self.ready_lock: +            if self.ready_cond[lsa_type].get(otype) is True: +                return + +        await self._register_opaque_data(lsa_type, otype) +        await self.wait_opaque_ready(lsa_type, otype) + +    async def _handle_msg_loop(self): +        try: +            logging.debug("entering async msg handling loop") +            while True: +                mt, msg = await self.msg_read() +                if mt in amsg_info: +                    await self.handle_async_msg(mt, msg) +                else: +                    mts = api_msgname(mt) +                    logging.warning( +                        "ignoring unexpected msg: %s len: %s", mts, len(msg) +                    ) +        except EOFError: +            logging.info("Got EOF from OSPF API server on async notify socket") +            return 2 + +    @staticmethod +    def _opaque_args(lsa_type, otype, oid, mp): +        lsid = (otype << 24) | oid +        return 0, 0, lsa_type, lsid, 0, 0, 0, FMT_LSA_HEADER_SIZE + len(mp) + +    @staticmethod +    def _make_opaque_lsa(lsa_type, otype, oid, mp): +        # /* Make a new LSA from parameters */ +        lsa = struct.pack( +            FMT_LSA_HEADER, *OspfOpaqueClient._opaque_args(lsa_type, otype, oid, mp) +        ) +        lsa += mp +        return lsa + +    async def _ready_msg(self, mt, msg, extra, lsa_type, otype, addr): +        if lsa_type == LSA_TYPE_OPAQUE_LINK: +            e = "ifaddr {}".format(ip(addr)) +        elif lsa_type == LSA_TYPE_OPAQUE_AREA: +            e = "area {}".format(ip(addr)) +        else: +            e = "" +        logging.info( +            "RECV: %s ready notify for %s opaque-type %s%s", +            self, +            lsa_typename(lsa_type), +            otype, +            e, +        ) + +        # Signal all waiting senders they can send now. +        async with self.ready_lock: +            cond = self.ready_cond[lsa_type].get(otype) +            self.ready_cond[lsa_type][otype] = True + +        if cond is True: +            logging.warning( +                "RECV: dup ready received for %s opaque-type %s", +                lsa_typename(lsa_type), +                otype, +            ) +        elif cond: +            for evt in cond: +                evt.set() + +    async def _if_msg(self, mt, msg, extra, *args): +        if mt == MSG_NEW_IF: +            ifaddr, aid = args +        else: +            assert mt == MSG_DEL_IF +            ifaddr, aid = args[0], 0 +        logging.info( +            "RECV: %s ifaddr %s areaid %s", api_msgname(mt), ip(ifaddr), ip(aid) +        ) + +    async def _if_change_msg(self, mt, msg, extra, ifaddr, aid, state): +        ifaddr = ip(ifaddr) +        aid = ip(aid) + +        logging.info( +            "RECV: %s ifaddr %s areaid %s state %s", +            api_msgname(mt), +            ifaddr, +            aid, +            ism_name(state), +        ) + +        self.if_area[ifaddr] = aid +        self.ism_states[ifaddr] = state + +        if self.ism_change_cb: +            self.ism_change_cb(ifaddr, aid, state) + +    async def _nbr_change_msg(self, mt, msg, extra, ifaddr, nbraddr, router_id, state): +        ifaddr = ip(ifaddr) +        nbraddr = ip(nbraddr) +        router_id = ip(router_id) + +        logging.info( +            "RECV: %s ifaddr %s nbraddr %s router_id %s state %s", +            api_msgname(mt), +            ifaddr, +            nbraddr, +            router_id, +            nsm_name(state), +        ) + +        if ifaddr not in self.nsm_states: +            self.nsm_states[ifaddr] = {} +        self.nsm_states[ifaddr][(nbraddr, router_id)] = state + +        if self.nsm_change_cb: +            self.nsm_change_cb(ifaddr, nbraddr, router_id, state) + +    async def _lsa_change_msg(self, mt, msg, extra, ifaddr, aid, is_self, *ls_header): +        ( +            lsa_age,  # ls_age, +            _,  # ls_options, +            lsa_type, +            ls_id, +            _,  # ls_adv_router, +            ls_seq, +            _,  # ls_cksum, +            ls_len, +        ) = ls_header + +        otype = (ls_id >> 24) & 0xFF + +        if mt == MSG_LSA_UPDATE_NOTIFY: +            ts = "update" +        else: +            assert mt == MSG_LSA_DELETE_NOTIFY +            ts = "delete" + +        logging.info( +            "RECV: LSA %s msg for LSA %s in area %s seq 0x%x len %s age %s", +            ts, +            ip(ls_id), +            ip(aid), +            ls_seq, +            ls_len, +            lsa_age, +        ) +        idx = (lsa_type, otype) + +        pre_lsa_size = msg_size[mt] - FMT_LSA_HEADER_SIZE +        lsa = msg[pre_lsa_size:] + +        if idx in self.opaque_change_cb: +            self.opaque_change_cb[idx](mt, ifaddr, aid, ls_header, extra, lsa) + +        if self.lsa_change_cb: +            self.lsa_change_cb(mt, ifaddr, aid, ls_header, extra, lsa) + +    async def _reachable_msg(self, mt, msg, extra, nadd, nremove): +        router_ids = struct.unpack(">{}I".format(nadd + nremove), extra) +        router_ids = [ip(x) for x in router_ids] +        logging.info( +            "RECV: %s added %s removed %s", +            api_msgname(mt), +            router_ids[:nadd], +            router_ids[nadd:], +        ) +        self.reachable_routers |= set(router_ids[:nadd]) +        self.reachable_routers -= set(router_ids[nadd:]) +        logging.info("RECV: %s new set %s", api_msgname(mt), self.reachable_routers) + +        if self.reachable_change_cb: +            logging.info("RECV: %s calling callback", api_msgname(mt)) +            await self.reachable_change_cb(router_ids[:nadd], router_ids[nadd:]) + +    async def add_opaque_data(self, addr, lsa_type, otype, oid, data): +        """Add an instance of opaque data. + +        Add an instance of opaque data. This call will register for the given +        LSA and opaque type if not already done. + +        Args: +            addr: depends on lsa_type, LINK => ifaddr, AREA => area ID, AS => ignored +            lsa_type: LSA_TYPE_OPAQUE_{LINK,AREA,AS} +            otype: (octet) opaque type +            oid: (3 octets) ID of this opaque data +            data: the opaque data +        Raises: +            See `msg_send_raises` +        """ + +        if lsa_type == LSA_TYPE_OPAQUE_LINK: +            ifaddr, aid = int(addr), 0 +        elif lsa_type == LSA_TYPE_OPAQUE_AREA: +            ifaddr, aid = 0, int(addr) +        else: +            assert lsa_type == LSA_TYPE_OPAQUE_AS +            ifaddr, aid = 0, 0 + +        mt = MSG_ORIGINATE_REQUEST +        msg = struct.pack( +            msg_fmt[mt], +            ifaddr, +            aid, +            *OspfOpaqueClient._opaque_args(lsa_type, otype, oid, data), +        ) +        msg += data +        await self._assure_opaque_ready(lsa_type, otype) +        await self.msg_send_raises(mt, msg) + +    async def delete_opaque_data(self, addr, lsa_type, otype, oid): +        """Delete an instance of opaque data. + +        Delete an instance of opaque data. This call will register for the given +        LSA and opaque type if not already done. + +        Args: +            addr: depends on lsa_type, LINK => ifaddr, AREA => area ID, AS => ignored +            lsa_type: LSA_TYPE_OPAQUE_{LINK,AREA,AS} +            otype: (octet) opaque type. Note: the type will be registered if the user +                has not explicity done that yet with `register_opaque_data`. +            oid: (3 octets) ID of this opaque data +        Raises: +            See `msg_send_raises` +        """ +        if (lsa_type, otype) in self.opaque_change_cb: +            del self.opaque_change_cb[(lsa_type, otype)] + +        mt = MSG_DELETE_REQUEST +        await self._assure_opaque_ready(lsa_type, otype) +        mp = struct.pack(msg_fmt[mt], int(addr), lsa_type, otype, oid) +        await self.msg_send_raises(mt, mp) + +    async def register_opaque_data(self, lsa_type, otype, callback=None): +        """Register intent to advertise opaque data. + +        The application should wait for the async notificaiton that the server is +        ready to advertise the given opaque data type. The API currently only allows +        a single "owner" of each unique (lsa_type,otype). To wait call `wait_opaque_ready` + +        Args: +            lsa_type: LSA_TYPE_OPAQUE_{LINK,AREA,AS} +            otype: (octet) opaque type. Note: the type will be registered if the user +                has not explicity done that yet with `register_opaque_data`. +            callback: if given, callback will be called when changes are received for +                LSA of the given (lsa_type, otype). The callbacks signature is: + +                `callback(msg_type, ifaddr, area_id, lsa_header, data, lsa)` + +                Args: +                    msg_type: MSG_LSA_UPDATE_NOTIFY or MSG_LSA_DELETE_NOTIFY +                    ifaddr: integer identifying an interface (by IP address) +                    area_id: integer identifying an area +                    lsa_header: the LSA header as an unpacked tuple (fmt: ">HBBIILHH") +                    data: the opaque data that follows the LSA header +                    lsa: the octets of the full lsa +        Raises: +            See `msg_send_raises` +        """ +        if callback: +            self.opaque_change_cb[(lsa_type, otype)] = callback +        elif (lsa_type, otype) in self.opaque_change_cb: +            logging.warning( +                "OSPFCLIENT: register: removing callback for %s opaque-type %s", +                lsa_typename(lsa_type), +                otype, +            ) +            del self.opaque_change_cb[(lsa_type, otype)] + +        await self._register_opaque_data(lsa_type, otype) + +    async def wait_opaque_ready(self, lsa_type, otype): +        async with self.ready_lock: +            cond = self.ready_cond[lsa_type].get(otype) +            if cond is True: +                return + +            logging.debug( +                "waiting for ready %s opaque-type %s", lsa_typename(lsa_type), otype +            ) + +            if not cond: +                cond = self.ready_cond[lsa_type][otype] = [] + +            evt = Event() +            cond.append(evt) + +        await evt.wait() +        logging.debug("READY for %s opaque-type %s", lsa_typename(lsa_type), otype) + +    async def register_opaque_data_wait(self, lsa_type, otype, callback=None): +        """Register intent to advertise opaque data and wait for ready. + +        The API currently only allows a single "owner" of each unique (lsa_type,otype). + +        Args: +            lsa_type: LSA_TYPE_OPAQUE_{LINK,AREA,AS} +            otype: (octet) opaque type. Note: the type will be registered if the user +                has not explicity done that yet with `register_opaque_data`. +            callback: if given, callback will be called when changes are received for +                LSA of the given (lsa_type, otype). The callbacks signature is: + +                `callback(msg_type, ifaddr, area_id, lsa_header, data, lsa)` + +                Args: +                    msg_type: MSG_LSA_UPDATE_NOTIFY or MSG_LSA_DELETE_NOTIFY +                    ifaddr: integer identifying an interface (by IP address) +                    area_id: integer identifying an area +                    lsa_header: the LSA header as an unpacked tuple (fmt: ">HBBIILHH") +                    data: the opaque data that follows the LSA header +                    lsa: the octets of the full lsa +        Raises: + +            See `msg_send_raises` +        """ +        if callback: +            self.opaque_change_cb[(lsa_type, otype)] = callback +        elif (lsa_type, otype) in self.opaque_change_cb: +            logging.warning( +                "OSPFCLIENT: register: removing callback for %s opaque-type %s", +                lsa_typename(lsa_type), +                otype, +            ) +            del self.opaque_change_cb[(lsa_type, otype)] + +        return await self._assure_opaque_ready(lsa_type, otype) + +    async def unregister_opaque_data(self, lsa_type, otype): +        """Unregister intent to advertise opaque data. + +        This will also cause the server to flush/delete all opaque data of +        the given (lsa_type,otype). + +        Args: +            lsa_type: LSA_TYPE_OPAQUE_{LINK,AREA,AS} +            otype: (octet) opaque type. Note: the type will be registered if the user +                has not explicity done that yet with `register_opaque_data`. +        Raises: +            See `msg_send_raises` +        """ + +        if (lsa_type, otype) in self.opaque_change_cb: +            del self.opaque_change_cb[(lsa_type, otype)] + +        mt = MSG_UNREGISTER_OPAQUETYPE +        mp = struct.pack(msg_fmt[mt], lsa_type, otype) +        await self.msg_send_raises(mt, mp) + +    async def monitor_lsa(self, callback=None): +        """Monitor changes to LSAs. + +        Args: +            callback: if given, callback will be called when changes are received for +                any LSA. The callback signature is: + +                `callback(msg_type, ifaddr, area_id, lsa_header, extra, lsa)` + +                Args: +                    msg_type: MSG_LSA_UPDATE_NOTIFY or MSG_LSA_DELETE_NOTIFY +                    ifaddr: integer identifying an interface (by IP address) +                    area_id: integer identifying an area +                    lsa_header: the LSA header as an unpacked tuple (fmt: ">HBBIILHH") +                    extra: the octets that follow the LSA header +                    lsa: the octets of the full lsa +        """ +        self.lsa_change_cb = callback +        await self.req_lsdb_sync() + +    async def monitor_reachable(self, callback=None): +        """Monitor the set of reachable routers. + +        The property `reachable_routers` contains the set() of reachable router IDs +        as integers. This set is updated prior to calling the `callback` + +        Args: +            callback: callback will be called when the set of reachable +                routers changes. The callback signature is: + +                `callback(added, removed)` + +                Args: +                    added: list of integer router IDs being added +                    removed: list of integer router IDs being removed +        """ +        self.reachable_change_cb = callback +        await self.req_reachable_routers() + +    async def monitor_ism(self, callback=None): +        """Monitor the state of OSPF enabled interfaces. + +        Args: +            callback: callback will be called when an interface changes state. +                The callback signature is: + +                `callback(ifaddr, area_id, state)` + +                Args: +                    ifaddr: integer identifying an interface (by IP address) +                    area_id: integer identifying an area +                    state: ISM_* +        """ +        self.ism_change_cb = callback +        await self.req_ism_states() + +    async def monitor_nsm(self, callback=None): +        """Monitor the state of OSPF neighbors. + +        Args: +            callback: callback will be called when a neighbor changes state. +                The callback signature is: + +                `callback(ifaddr, nbr_addr, router_id, state)` + +                Args: +                    ifaddr: integer identifying an interface (by IP address) +                    nbr_addr: integer identifying neighbor by IP address +                    router_id: integer identifying neighbor router ID +                    state: NSM_* +        """ +        self.nsm_change_cb = callback +        await self.req_nsm_states() + + +# ================ +# CLI/Script Usage +# ================ + + +async def async_main(args): +    c = OspfOpaqueClient(args.server) +    await c.connect() + +    try: +        # Start handling async messages from server. +        if sys.version_info[1] > 6: +            asyncio.create_task(c._handle_msg_loop()) +        else: +            asyncio.get_event_loop().create_task(c._handle_msg_loop()) + +        await c.req_lsdb_sync() +        await c.req_reachable_routers() +        await c.req_ism_states() +        await c.req_nsm_states() + +        if args.actions: +            for action in args.actions: +                _s = action.split(",") +                what = _s.pop(False) +                ltype = int(_s.pop(False)) +                if ltype == 11: +                    addr = ip(0) +                else: +                    aval = _s.pop(False) +                    try: +                        addr = ip(int(aval)) +                    except ValueError: +                        addr = ip(aval) +                oargs = [addr, ltype, int(_s.pop(False)), int(_s.pop(False))] +                assert len(_s) <= 1, "Bad format for action argument" +                try: +                    b = bytes.fromhex(_s.pop(False)) +                except IndexError: +                    b = b"" +                logging.info("opaque data is %s octets", len(b)) +                # Needs to be multiple of 4 in length +                mod = len(b) % 4 +                if mod: +                    b += b"\x00" * (4 - mod) +                    logging.info("opaque padding to %s octets", len(b)) + +                if what.casefold() == "add": +                    await c.add_opaque_data(*oargs, b) +                else: +                    assert what.casefold().startswith("del") +                    await c.delete_opaque_data(*oargs) +            if args.exit: +                return 0 +    except Exception as error: +        logging.error("async_main: unexpected error: %s", error, exc_info=True) +        return 2 + +    try: +        logging.info("Sleeping forever") +        while True: +            await asyncio.sleep(120) +    except EOFError: +        logging.info("Got EOF from OSPF API server on async notify socket") +        return 2 + + +def main(*args): +    ap = argparse.ArgumentParser(args) +    ap.add_argument("--exit", action="store_true", help="Exit after commands") +    ap.add_argument("--server", default="localhost", help="OSPF API server") +    ap.add_argument("-v", "--verbose", action="store_true", help="be verbose") +    ap.add_argument( +        "actions", nargs="*", help="(ADD|DEL),LSATYPE,[ADDR,],OTYPE,OID,[HEXDATA]" +    ) +    args = ap.parse_args() + +    level = logging.DEBUG if args.verbose else logging.INFO +    logging.basicConfig( +        level=level, format="%(asctime)s %(levelname)s: CLIENT: %(name)s %(message)s" +    ) + +    logging.info("ospfclient: starting") + +    status = 3 +    try: +        if sys.version_info[1] > 6: +            # python >= 3.7 +            status = asyncio.run(async_main(args)) +        else: +            loop = asyncio.get_event_loop() +            try: +                status = loop.run_until_complete(async_main(args)) +            finally: +                loop.close() +    except KeyboardInterrupt: +        logging.info("Exiting, received KeyboardInterrupt in main") +    except Exception as error: +        logging.info("Exiting, unexpected exception %s", error, exc_info=True) +    else: +        logging.info("ospfclient: clean exit") + +    return status + + +if __name__ == "__main__": +    exit_status = main() +    sys.exit(exit_status) diff --git a/ospfclient/subdir.am b/ospfclient/subdir.am index 1f9547ab87..b8c82c0bcf 100644 --- a/ospfclient/subdir.am +++ b/ospfclient/subdir.am @@ -6,6 +6,10 @@ if OSPFCLIENT  lib_LTLIBRARIES += ospfclient/libfrrospfapiclient.la  noinst_PROGRAMS += ospfclient/ospfclient  #man8 += $(MANBUILD)/frr-ospfclient.8 + +sbin_SCRIPTS += \ +	ospfclient/ospfclient.py \ +	# end  endif  ospfclient_libfrrospfapiclient_la_LDFLAGS = $(LIB_LDFLAGS) -version-info 0:0:0 @@ -41,3 +45,7 @@ endif  ospfclient_ospfclient_SOURCES = \  	ospfclient/ospfclient.c \  	# end + +EXTRA_DIST += \ +	ospfclient/ospfclient.py \ +	# end  | 
