bgp_vpnv4_table_check(router, group=group, label_list=label_list)
-def mpls_table_check(router, blacklist=None, label_list=None, whitelist=None):
- """
- Dump and check 'show mpls table json' output. An assert is triggered in case test fails
- * 'router': the router to check
- * 'blacklist': the list of nexthops (IP or interface) that should not be on output
- * 'label_list': the list of labels that should be in inLabel value
- * 'whitelist': the list of nexthops (IP or interface) that should be on output
- """
+def check_show_mpls_table(router, blacklist=None, label_list=None, whitelist=None):
nexthop_list = []
if blacklist:
nexthop_list.append(blacklist)
- logger.info("Checking MPLS labels on {}".format(router.name))
+
dump = router.vtysh_cmd("show mpls table json", isjson=True)
for in_label, label_info in dump.items():
if label_list is not None:
label_list.add(in_label)
for nh in label_info["nexthops"]:
- assert (
- nh["installed"] == True and nh["type"] == "BGP"
- ), "{}, show mpls table, nexthop is not installed".format(router.name)
- if "nexthop" in nh.keys():
- assert (
- nh["nexthop"] not in nexthop_list
- ), "{}, show mpls table, duplicated or blacklisted nexthop address".format(
+ if nh["installed"] != True or nh["type"] != "BGP":
+ return "{}, show mpls table, nexthop is not installed".format(
router.name
)
+ if "nexthop" in nh.keys():
+ if nh["nexthop"] in nexthop_list:
+ return "{}, show mpls table, duplicated or blacklisted nexthop address".format(
+ router.name
+ )
nexthop_list.append(nh["nexthop"])
elif "interface" in nh.keys():
- assert (
- nh["interface"] not in nexthop_list
- ), "{}, show mpls table, duplicated or blacklisted nexthop interface".format(
- router.name
- )
+ if nh["interface"] in nexthop_list:
+ return "{}, show mpls table, duplicated or blacklisted nexthop interface".format(
+ router.name
+ )
nexthop_list.append(nh["interface"])
else:
- assert (
- 0
- ), "{}, show mpls table, entry with neither nexthop nor interface".format(
+ return "{}, show mpls table, entry with neither nexthop nor interface".format(
router.name
)
if whitelist:
for entry in whitelist:
- assert (
- entry in nexthop_list
- ), "{}, show mpls table, entry with nexthop {} not present in nexthop list".format(
- router.name, entry
- )
+ if entry not in nexthop_list:
+ return "{}, show mpls table, entry with nexthop {} not present in nexthop list".format(
+ router.name, entry
+ )
+ return None
+
+
+def mpls_table_check(router, blacklist=None, label_list=None, whitelist=None):
+ """
+ Dump and check 'show mpls table json' output. An assert is triggered in case test fails
+ * 'router': the router to check
+ * 'blacklist': the list of nexthops (IP or interface) that should not be on output
+ * 'label_list': the list of labels that should be in inLabel value
+ * 'whitelist': the list of nexthops (IP or interface) that should be on output
+ """
+ logger.info("Checking MPLS labels on {}".format(router.name))
+ # Check r2 removed 172.31.0.30 vpnv4 update
+ test_func = functools.partial(
+ check_show_mpls_table, router, blacklist, label_list, whitelist
+ )
+ success, result = topotest.run_and_expect(test_func, None, count=10, wait=0.5)
+ assert success, "{}, MPLS labels check fail: {}".format(router.name, result)
def check_show_bgp_vpn_prefix_not_found(router, ipversion, prefix, rd, label=None):