summaryrefslogtreecommitdiff
path: root/tests/topotests/lib/snmptest.py
blob: ba5835dcf757fc8674e727bca93887df790124f0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#
# snmptest.py
# Library of helper functions for NetDEF Topology Tests
#
# Copyright (c) 2020 by Volta Networks
#
#
# Permission to use, copy, modify, and/or distribute this software
# for any purpose with or without fee is hereby granted, provided
# that the above copyright notice and this permission notice appear
# in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND NETDEF DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NETDEF BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
# OF THIS SOFTWARE.
#

"""
SNMP library to test snmp walks and gets

Basic usage instructions:

* create an SnmpTester instance giving a router, address, community and version
* use test_oid or test_oid_walk to check values in MIBs
* see tests/topotests/simple-snmp-test/test_simple_snmp.py for an example
"""

from topolog import logger


class SnmpTester(object):
    """
    A helper class for testing SNMP.

    Builds ``snmpget`` / ``snmpgetnext`` / ``snmpwalk`` command lines, runs
    them through ``router.cmd()`` and parses the textual output, which is
    expected to consist of lines shaped like ``<name>.<index> = <TYPE>: <value>``.
    """

    def __init__(self, router, iface, community, version):
        """
        Store the parameters used to build every SNMP command.

        * `router`: object whose ``cmd(str)`` method runs a command and
          returns its output as a string
        * `iface`: string appended after the version/community options on
          each command line (the agent to query)
        * `community`: value passed to the ``-c`` option
        * `version`: value passed to the ``-v`` option
        """
        self.community = community
        self.version = version
        self.router = router
        self.iface = iface
        logger.info(
            "created SNMP tester: SNMPv{0} community:{1}".format(
                self.version, self.community
            )
        )

    def _snmp_config(self):
        """
        Helper function to build a string with SNMP
        configuration for commands.
        """
        return "-v {0} -c {1} {2}".format(self.version, self.community, self.iface)

    @staticmethod
    def _get_snmp_value(snmp_output):
        """
        Extract the value part of a single ``OID = TYPE: value`` line.

        Returns every token after the third one joined by single spaces.
        When the line only has three tokens (nothing follows the type
        field, e.g. an empty value) the third token is returned as-is.
        Returns None when the line has fewer than three tokens and therefore
        cannot be an ``OID = ...`` response.
        """
        tokens = snmp_output.strip().split()

        if len(tokens) < 3:
            return None
        if len(tokens) == 3:
            # Nothing after the third token: hand back what the tool printed
            # after "=" rather than failing with an IndexError.
            return tokens[2]
        # tokens[0] is the OID, tokens[1] is "=", tokens[2] is the type tag;
        # everything from the fourth token onwards is the value.
        return " ".join(tokens[3:])

    @staticmethod
    def _get_snmp_oid(snmp_output):
        """
        Extract the OID index of a single response line.

        The first token is the object name; everything after its first "."
        is returned (``IF-MIB::ifDescr.1`` gives ``1``).  Returns None when
        the line is empty or its first token contains no ".".
        """
        tokens = snmp_output.strip().split()
        if not tokens:
            return None

        _, separator, index = tokens[0].partition(".")
        if not separator:
            return None
        return index

    def _parse_multiline(self, snmp_output):
        """
        Parse multi-line command output into a dict.

        Returns a dict mapping each line's OID index (see `_get_snmp_oid`)
        to its value (see `_get_snmp_value`).  Lines from which no OID index
        can be extracted, including blank lines, are skipped.
        """
        out_dict = {}

        # splitlines() copes with both "\r\n" and "\n" terminated output.
        for response in snmp_output.splitlines():
            oid = self._get_snmp_oid(response)
            if oid is None:
                continue
            out_dict[oid] = self._get_snmp_value(response)
        return out_dict

    def get(self, oid):
        """
        Run ``snmpget`` for `oid` and return the parsed value.

        Returns None when the output contains "not found" or cannot be
        parsed as a response line.
        """
        cmd = "snmpget {0} {1}".format(self._snmp_config(), oid)

        result = self.router.cmd(cmd)
        if "not found" in result:
            return None
        return self._get_snmp_value(result)

    def get_next(self, oid):
        """
        Run ``snmpgetnext`` for `oid` and return the parsed value.

        The raw output is printed for debugging.  Returns None when the
        output contains "not found" or cannot be parsed as a response line.
        """
        cmd = "snmpgetnext {0} {1}".format(self._snmp_config(), oid)

        result = self.router.cmd(cmd)
        print("get_next: {}".format(result))
        if "not found" in result:
            return None
        return self._get_snmp_value(result)

    def walk(self, oid):
        """
        Run ``snmpwalk`` for `oid` and return a dict of OID index to value.
        """
        cmd = "snmpwalk {0} {1}".format(self._snmp_config(), oid)

        result = self.router.cmd(cmd)
        return self._parse_multiline(result)

    def test_oid(self, oid, value):
        """
        Return True when the ``snmpgetnext`` result for `oid` equals `value`.
        """
        # Query once so the printed value is the one actually compared and
        # the agent is not polled twice.
        result = self.get_next(oid)
        print("oid: {}".format(result))
        return result == value

    def test_oid_walk(self, oid, values, oids=None):
        """
        Walk `oid` and compare the results with `values`.

        * `oids` is None: every walked value, in the order returned, must
          equal `values`.
        * `oids` given: for each position, the walked value stored under
          ``oids[i]`` must equal ``values[i]``.  A missing OID, or a `values`
          sequence shorter than `oids`, counts as a mismatch.

        Returns True on a match, False otherwise.
        """
        results_dict = self.walk(oid)
        print("res {}".format(results_dict))
        if oids is not None:
            if len(values) < len(oids):
                return False
            for expected_oid, expected_value in zip(oids, values):
                if expected_oid not in results_dict:
                    return False
                if results_dict[expected_oid] != expected_value:
                    return False
            return True

        # A dict view never compares equal to a list on Python 3, so
        # materialize both sides before comparing.
        return list(results_dict.values()) == list(values)