summaryrefslogtreecommitdiff
path: root/tests/topotests/lib/snmptest.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/lib/snmptest.py')
-rw-r--r--tests/topotests/lib/snmptest.py186
1 files changed, 141 insertions, 45 deletions
diff --git a/tests/topotests/lib/snmptest.py b/tests/topotests/lib/snmptest.py
index 598ad05f58..5c4e97a5d2 100644
--- a/tests/topotests/lib/snmptest.py
+++ b/tests/topotests/lib/snmptest.py
@@ -73,39 +73,6 @@ class SnmpTester(object):
# third token onwards is the value of the object
return tokens[0].split(".", 1)[1]
- def _parse_notification_trap(self, snmp_out):
- # we use the "=" as separator thus we will have
- # element of list formated "value oid"
- # value for index i is corresponding to index i-1
- results = snmp_out.strip().split("=")
-
- # remove the notification part date, notification OID
- del results[0:2]
-
- index = 0
- oid_list = []
- next_oid = ""
- oid = ""
- while index < len(results):
- result = results[index].strip().split()
- if index < len(results) - 1:
- raw_oid = result[-1]
- # remove initial "." of oid
- next_oid = raw_oid.split(".", 1)[1]
- # remove oid from result to have only value
- del result[-1]
- if index > 0:
- value = " ".join(result)
- # ignore remote port oid 1.3.6.1.3.5.1.1.2.1.9 since
- # it's value is variable
- local_port = re.search("1.3.6.1.3.5.1.1.2.1.9", oid)
- if not local_port:
- oid_list.append((oid, value))
-
- oid = next_oid
- index += 1
- return oid_list
-
def _parse_multiline(self, snmp_output):
results = snmp_output.strip().split("\n")
@@ -117,15 +84,6 @@ class SnmpTester(object):
return out_dict, out_list
- def _parse_multiline_trap(self, results):
- out_list = []
- results = [elem for index, elem in enumerate(results) if index % 2 != 0]
-
- for response in results:
- oid_list = self._parse_notification_trap(response)
- out_list += oid_list
- return out_list
-
def get(self, oid):
cmd = "snmpget {0} {1}".format(self._snmp_config(), oid)
@@ -149,10 +107,148 @@ class SnmpTester(object):
result = self.router.cmd(cmd)
return self._parse_multiline(result)
- def trap(self, outputfile):
- whitecleanfile = re.sub("\t", " ", outputfile)
+ def parse_notif_ipv4(self, notif):
+ # normalise values
+ notif = re.sub(":", "", notif)
+ notif = re.sub('"([0-9]{2}) ([0-9]{2}) "', r"\1\2", notif)
+ notif = re.sub('"([0-9]{2}) "', r"\1", notif)
+ elems = re.findall("([0-9,\.]+) = ([0-9,\.]+)", notif)
+
+ # remove common part
+ elems = elems[1:]
+ return elems
+
+ def is_notif_bgp4_valid(self, output_list, address):
+ oid_notif_type = ".1.3.6.1.6.3.1.1.4.1.0"
+ peer_notif_established = ".1.3.6.1.2.1.15.0.1"
+ peer_notif_backward = ".1.3.6.1.2.1.15.0.2"
+ oid_peer_last_error = ".1.3.6.1.2.1.15.3.1.14"
+ oid_peer_remote_addr = ".1.3.6.1.2.1.15.3.1.7"
+ oid_peer_state = ".1.3.6.1.2.1.15.3.1.2"
+
+ nb_notif = len(output_list)
+ for nb in range(0, nb_notif - 1):
+ # identify type of notification
+ # established or BackwardTransition
+
+ if output_list[nb][0][0] != "{}".format(oid_notif_type):
+ return False
+
+ if output_list[nb][0][1] == "{}".format(peer_notif_established):
+ logger.info("Established notification")
+ elif output_list[nb][0][1] == "{}".format(peer_notif_backward):
+ logger.info("Backward transition notification")
+ else:
+ return False
+
+ # same behavior for 2 notification type in bgp4
+ if output_list[nb][1][0] != "{}.{}".format(oid_peer_remote_addr, address):
+ return False
+
+ if output_list[nb][2][0] != "{}.{}".format(oid_peer_last_error, address):
+ return False
+ if output_list[nb][3][0] != "{}.{}".format(oid_peer_state, address):
+ return False
+
+ return True
+
+ def is_notif_bgp4v2_valid(self, output_list, address, type_requested):
+ oid_notif_type = ".1.3.6.1.6.3.1.1.4.1.0"
+ peer_notif_established = ".1.3.6.1.3.5.1.0.1"
+ peer_notif_backward = ".1.3.6.1.3.5.1.0.2"
+ oid_peer_state = ".1.3.6.1.3.5.1.1.2.1.13"
+ oid_peer_local_port = ".1.3.6.1.3.5.1.1.2.1.6"
+ oid_peer_remote_port = ".1.3.6.1.3.5.1.1.2.1.9"
+ oid_peer_err_code_recv = ".1.3.6.1.3.5.1.1.3.1.1"
+ oid_peer_err_sub_code_recv = ".1.3.6.1.3.5.1.1.3.1.2"
+ oid_peer_err_recv_text = ".1.3.6.1.3.5.1.1.3.1.4"
+
+ nb_notif = len(output_list)
+ for nb in range(nb_notif):
+ if output_list[nb][0][0] != "{}".format(oid_notif_type):
+ return False
+
+ if output_list[nb][0][1] == "{}".format(peer_notif_established):
+ logger.info("Established notification")
+ notif_type = "Estab"
+
+ elif output_list[nb][0][1] == "{}".format(peer_notif_backward):
+ logger.info("Backward transition notification")
+ notif_type = "Backward"
+ else:
+ return False
+
+ if notif_type != type_requested:
+ continue
+
+ if output_list[nb][1][0] != "{}.1.{}".format(oid_peer_state, address):
+ continue
+
+ if output_list[nb][2][0] != "{}.1.{}".format(oid_peer_local_port, address):
+ return False
+
+ if output_list[nb][3][0] != "{}.1.{}".format(oid_peer_remote_port, address):
+ return False
+
+ if notif_type == "Estab":
+ return True
+
+ if output_list[nb][4][0] != "{}.1.{}".format(
+ oid_peer_err_code_recv, address
+ ):
+ return False
+
+ if output_list[nb][5][0] != "{}.1.{}".format(
+ oid_peer_err_sub_code_recv, address
+ ):
+ return False
+
+ if output_list[nb][6][0] != "{}.1.{}".format(
+ oid_peer_err_recv_text, address
+ ):
+ return False
+
+ return True
+
+ return False
+
+ def get_notif_bgp4(self, output_file):
+ notif_list = []
+ whitecleanfile = re.sub("\t", " ", output_file)
results = whitecleanfile.strip().split("\n")
- return self._parse_multiline_trap(results)
+
+ # don't consider SNMP additional messages
+ notifs_first = [elem for elem in results if not ("SNMP" in elem)]
+ # don't consider additional application messages
+ notifs = [elem for index, elem in enumerate(notifs_first) if index % 2 != 0]
+
+ oid_v4 = "1\.3\.6\.1\.2\.1\.15"
+ for one_notif in notifs:
+ is_ipv4_notif = re.search(oid_v4, one_notif)
+ if is_ipv4_notif != None:
+ formated_notif = self.parse_notif_ipv4(one_notif)
+ notif_list.append(formated_notif)
+
+ return notif_list
+
+ def get_notif_bgp4v2(self, output_file):
+ notif_list = []
+ whitecleanfile = re.sub("\t", " ", output_file)
+ results = whitecleanfile.strip().split("\n")
+
+ # don't consider SNMP additional messages
+ notifs_first = [elem for elem in results if not ("SNMP" in elem)]
+ # don't consider additional application messages
+ notifs = [elem for index, elem in enumerate(results) if index % 2 != 0]
+
+ oid_v6 = "1\.3\.6\.1\.3\.5\.1"
+ for one_notif in notifs:
+ is_ipv6_notif = re.search(oid_v6, one_notif)
+ if is_ipv6_notif != None:
+ formated_notif = self.parse_notif_ipv4(one_notif)
+ notif_list.append(formated_notif)
+
+ return notif_list
def test_oid(self, oid, value):
print("oid: {}".format(self.get_next(oid)))