diff options
Diffstat (limited to 'tests/topotests/lib/snmptest.py')
| -rw-r--r-- | tests/topotests/lib/snmptest.py | 186 |
1 files changed, 141 insertions, 45 deletions
diff --git a/tests/topotests/lib/snmptest.py b/tests/topotests/lib/snmptest.py index 598ad05f58..5c4e97a5d2 100644 --- a/tests/topotests/lib/snmptest.py +++ b/tests/topotests/lib/snmptest.py @@ -73,39 +73,6 @@ class SnmpTester(object): # third token onwards is the value of the object return tokens[0].split(".", 1)[1] - def _parse_notification_trap(self, snmp_out): - # we use the "=" as separator thus we will have - # element of list formated "value oid" - # value for index i is corresponding to index i-1 - results = snmp_out.strip().split("=") - - # remove the notification part date, notification OID - del results[0:2] - - index = 0 - oid_list = [] - next_oid = "" - oid = "" - while index < len(results): - result = results[index].strip().split() - if index < len(results) - 1: - raw_oid = result[-1] - # remove initial "." of oid - next_oid = raw_oid.split(".", 1)[1] - # remove oid from result to have only value - del result[-1] - if index > 0: - value = " ".join(result) - # ignore remote port oid 1.3.6.1.3.5.1.1.2.1.9 since - # it's value is variable - local_port = re.search("1.3.6.1.3.5.1.1.2.1.9", oid) - if not local_port: - oid_list.append((oid, value)) - - oid = next_oid - index += 1 - return oid_list - def _parse_multiline(self, snmp_output): results = snmp_output.strip().split("\n") @@ -117,15 +84,6 @@ class SnmpTester(object): return out_dict, out_list - def _parse_multiline_trap(self, results): - out_list = [] - results = [elem for index, elem in enumerate(results) if index % 2 != 0] - - for response in results: - oid_list = self._parse_notification_trap(response) - out_list += oid_list - return out_list - def get(self, oid): cmd = "snmpget {0} {1}".format(self._snmp_config(), oid) @@ -149,10 +107,148 @@ class SnmpTester(object): result = self.router.cmd(cmd) return self._parse_multiline(result) - def trap(self, outputfile): - whitecleanfile = re.sub("\t", " ", outputfile) + def parse_notif_ipv4(self, notif): + # normalise values + notif = re.sub(":", "", notif) + notif = re.sub('"([0-9]{2}) ([0-9]{2}) "', r"\1\2", notif) + notif = re.sub('"([0-9]{2}) "', r"\1", notif) + elems = re.findall("([0-9,\.]+) = ([0-9,\.]+)", notif) + + # remove common part + elems = elems[1:] + return elems + + def is_notif_bgp4_valid(self, output_list, address): + oid_notif_type = ".1.3.6.1.6.3.1.1.4.1.0" + peer_notif_established = ".1.3.6.1.2.1.15.0.1" + peer_notif_backward = ".1.3.6.1.2.1.15.0.2" + oid_peer_last_error = ".1.3.6.1.2.1.15.3.1.14" + oid_peer_remote_addr = ".1.3.6.1.2.1.15.3.1.7" + oid_peer_state = ".1.3.6.1.2.1.15.3.1.2" + + nb_notif = len(output_list) + for nb in range(0, nb_notif - 1): + # identify type of notification + # established or BackwardTransition + + if output_list[nb][0][0] != "{}".format(oid_notif_type): + return False + + if output_list[nb][0][1] == "{}".format(peer_notif_established): + logger.info("Established notification") + elif output_list[nb][0][1] == "{}".format(peer_notif_backward): + logger.info("Backward transition notification") + else: + return False + + # same behavior for 2 notification type in bgp4 + if output_list[nb][1][0] != "{}.{}".format(oid_peer_remote_addr, address): + return False + + if output_list[nb][2][0] != "{}.{}".format(oid_peer_last_error, address): + return False + if output_list[nb][3][0] != "{}.{}".format(oid_peer_state, address): + return False + + return True + + def is_notif_bgp4v2_valid(self, output_list, address, type_requested): + oid_notif_type = ".1.3.6.1.6.3.1.1.4.1.0" + peer_notif_established = ".1.3.6.1.3.5.1.0.1" + peer_notif_backward = ".1.3.6.1.3.5.1.0.2" + oid_peer_state = ".1.3.6.1.3.5.1.1.2.1.13" + oid_peer_local_port = ".1.3.6.1.3.5.1.1.2.1.6" + oid_peer_remote_port = ".1.3.6.1.3.5.1.1.2.1.9" + oid_peer_err_code_recv = ".1.3.6.1.3.5.1.1.3.1.1" + oid_peer_err_sub_code_recv = ".1.3.6.1.3.5.1.1.3.1.2" + oid_peer_err_recv_text = ".1.3.6.1.3.5.1.1.3.1.4" + + nb_notif = len(output_list) + for nb in range(nb_notif): + if output_list[nb][0][0] != "{}".format(oid_notif_type): + return False + + if output_list[nb][0][1] == "{}".format(peer_notif_established): + logger.info("Established notification") + notif_type = "Estab" + + elif output_list[nb][0][1] == "{}".format(peer_notif_backward): + logger.info("Backward transition notification") + notif_type = "Backward" + else: + return False + + if notif_type != type_requested: + continue + + if output_list[nb][1][0] != "{}.1.{}".format(oid_peer_state, address): + continue + + if output_list[nb][2][0] != "{}.1.{}".format(oid_peer_local_port, address): + return False + + if output_list[nb][3][0] != "{}.1.{}".format(oid_peer_remote_port, address): + return False + + if notif_type == "Estab": + return True + + if output_list[nb][4][0] != "{}.1.{}".format( + oid_peer_err_code_recv, address + ): + return False + + if output_list[nb][5][0] != "{}.1.{}".format( + oid_peer_err_sub_code_recv, address + ): + return False + + if output_list[nb][6][0] != "{}.1.{}".format( + oid_peer_err_recv_text, address + ): + return False + + return True + + return False + + def get_notif_bgp4(self, output_file): + notif_list = [] + whitecleanfile = re.sub("\t", " ", output_file) results = whitecleanfile.strip().split("\n") - return self._parse_multiline_trap(results) + + # don't consider SNMP additional messages + notifs_first = [elem for elem in results if not ("SNMP" in elem)] + # don't consider additional application messages + notifs = [elem for index, elem in enumerate(notifs_first) if index % 2 != 0] + + oid_v4 = "1\.3\.6\.1\.2\.1\.15" + for one_notif in notifs: + is_ipv4_notif = re.search(oid_v4, one_notif) + if is_ipv4_notif != None: + formated_notif = self.parse_notif_ipv4(one_notif) + notif_list.append(formated_notif) + + return notif_list + + def get_notif_bgp4v2(self, output_file): + notif_list = [] + whitecleanfile = re.sub("\t", " ", output_file) + results = whitecleanfile.strip().split("\n") + + # don't consider SNMP additional messages + notifs_first = [elem for elem in results if not ("SNMP" in elem)] + # don't consider additional application messages + notifs = [elem for index, elem in enumerate(results) if index % 2 != 0] + + oid_v6 = "1\.3\.6\.1\.3\.5\.1" + for one_notif in notifs: + is_ipv6_notif = re.search(oid_v6, one_notif) + if is_ipv6_notif != None: + formated_notif = self.parse_notif_ipv4(one_notif) + notif_list.append(formated_notif) + + return notif_list def test_oid(self, oid, value): print("oid: {}".format(self.get_next(oid))) |
