]> git.puffer.fish Git - matthieu/frr.git/commitdiff
tests: add machinery to test SNMP
authorBabis Chalios <babis@voltanet.io>
Thu, 12 Nov 2020 11:43:36 +0000 (11:43 +0000)
committerPat Ruddy <pat@voltanet.io>
Mon, 11 Jan 2021 14:36:41 +0000 (14:36 +0000)
Add initial 'walk' and 'get' test APIs for SNMP

Signed-off-by: Babis Chalios <babis@voltanet.io>
Signed-off-by: Pat Ruddy <pat@voltanet.io>
tests/topotests/lib/snmptest.py [new file with mode: 0644]

diff --git a/tests/topotests/lib/snmptest.py b/tests/topotests/lib/snmptest.py
new file mode 100644 (file)
index 0000000..ba5835d
--- /dev/null
@@ -0,0 +1,127 @@
+#
+# topogen.py
+# Library of helper functions for NetDEF Topology Tests
+#
+# Copyright (c) 2020 by Volta Networks
+#
+#
+# Permission to use, copy, modify, and/or distribute this software
+# for any purpose with or without fee is hereby granted, provided
+# that the above copyright notice and this permission notice appear
+# in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND NETDEF DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NETDEF BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
+# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
+# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
+# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
+# OF THIS SOFTWARE.
+#
+
+"""
+SNMP library to test snmp walks and gets
+
+Basic usage instructions:
+
+* define an SnmpTester class giving a router, address, community and version
+* use test_oid or test_walk to check values in MIBS
+* see tests/topotest/simple-snmp-test/test_simple_snmp.py for example
+"""
+
+from topolog import logger
+
+
+class SnmpTester(object):
+    "A helper class for testing SNMP"
+
+    def __init__(self, router, iface, community, version):
+        self.community = community
+        self.version = version
+        self.router = router
+        self.iface = iface
+        logger.info(
+            "created SNMP tester: SNMPv{0} community:{1}".format(
+                self.version, self.community
+            )
+        )
+
+    def _snmp_config(self):
+        """
+        Helper function to build a string with SNMP
+        configuration for commands.
+        """
+        return "-v {0} -c {1} {2}".format(self.version, self.community, self.iface)
+
+    @staticmethod
+    def _get_snmp_value(snmp_output):
+        tokens = snmp_output.strip().split()
+
+        num_value_tokens = len(tokens) - 3
+
+        if num_value_tokens > 1:
+            output = ""
+            index = 3
+            while index < len(tokens) - 1:
+                output += "{} ".format(tokens[index])
+                index += 1
+            output += "{}".format(tokens[index])
+            return output
+        # third token is the value of the object
+        return tokens[3]
+
+    @staticmethod
+    def _get_snmp_oid(snmp_output):
+        tokens = snmp_output.strip().split()
+
+        # third token onwards is the value of the object
+        return tokens[0].split(".", 1)[1]
+
+    def _parse_multiline(self, snmp_output):
+        results = snmp_output.strip().split("\r\n")
+        out_dict = {}
+
+        for response in results:
+            out_dict[self._get_snmp_oid(response)] = self._get_snmp_value(response)
+        return out_dict
+
+    def get(self, oid):
+        cmd = "snmpget {0} {1}".format(self._snmp_config(), oid)
+
+        result = self.router.cmd(cmd)
+        if "not found" in result:
+            return None
+        return self._get_snmp_value(result)
+
+    def get_next(self, oid):
+        cmd = "snmpgetnext {0} {1}".format(self._snmp_config(), oid)
+
+        result = self.router.cmd(cmd)
+        print("get_next: {}".format(result))
+        if "not found" in result:
+            return None
+        return self._get_snmp_value(result)
+
+    def walk(self, oid):
+        cmd = "snmpwalk {0} {1}".format(self._snmp_config(), oid)
+
+        result = self.router.cmd(cmd)
+        return self._parse_multiline(result)
+
+    def test_oid(self, oid, value):
+        print("oid: {}".format(self.get_next(oid)))
+        return self.get_next(oid) == value
+
+    def test_oid_walk(self, oid, values, oids=None):
+        results_dict = self.walk(oid)
+        print("res {}".format(results_dict))
+        if oids is not None:
+            index = 0
+            for oid in oids:
+                if results_dict[oid] != values[index]:
+                    return False
+                index += 1
+            return True
+
+        return results_dict.values() == values