for path in paths["paths"]:
new_version = path["version"]
if new_version <= table_version:
- return "{}, prefix ipv4 vpn {} has not been updated yet".format(router.name, prefix)
+ return "{}, prefix ipv4 vpn {} has not been updated yet".format(
+ router.name, prefix
+ )
return None
-def bgp_vpnv4_table_check(router, group, label_list=None, label_value_expected=None, table_version=0):
+def bgp_vpnv4_table_check(
+ router, group, label_list=None, label_value_expected=None, table_version=0
+):
"""
Dump and check that vpnv4 entries have the same MPLS label value
* 'router': the router to check
stored_label_inited = False
for prefix in group:
- test_func = functools.partial(check_bgp_vpnv4_prefix_presence, router, prefix, table_version)
+ test_func = functools.partial(
+ check_bgp_vpnv4_prefix_presence, router, prefix, table_version
+ )
success, _ = topotest.run_and_expect(test_func, None, count=10, wait=0.5)
assert success, "{}, prefix ipv4 vpn {} is not installed yet".format(
router.name, prefix
+ PREFIXES_REDIST
+ PREFIXES_CONNECTED,
label_list=label_list,
- table_version=table_version
+ table_version=table_version,
)
else:
for group in (
PREFIXES_REDIST,
PREFIXES_CONNECTED,
):
- bgp_vpnv4_table_check(router, group=group, label_list=label_list, table_version=table_version)
+ bgp_vpnv4_table_check(
+ router, group=group, label_list=label_list, table_version=table_version
+ )
def check_show_mpls_table(router, blacklist=None, label_list=None, whitelist=None):
return "not good"
return None
+
def get_table_version(router):
table = router.vtysh_cmd("show bgp ipv4 vpn json", isjson=True)
return table["tableVersion"]
+
def mpls_entry_get_interface(router, label):
"""
Assert that the label is in MPLS table
# Check vpnv4 routes from r1
logger.info("Checking vpnv4 routes on r1")
label_list = set()
- bgp_vpnv4_table_check_all(router, label_list=label_list, same=True, table_version=table_version)
+ bgp_vpnv4_table_check_all(
+ router, label_list=label_list, same=True, table_version=table_version
+ )
assert len(label_list) == 1, "r1, multiple Label values found for vpnv4 updates"
new_label = label_list.pop()
# Check vpnv4 routes from r1
logger.info("Checking vpnv4 routes on r1")
label_list = set()
- bgp_vpnv4_table_check_all(router, label_list=label_list, table_version=table_version)
+ bgp_vpnv4_table_check_all(
+ router, label_list=label_list, table_version=table_version
+ )
assert len(label_list) != 1, "r1, only 1 label values found for vpnv4 updates"
# Check mpls table with all values
for path in paths["paths"]:
new_version = path["version"]
if new_version <= table_version:
- return "{}, prefix ipv6 vpn {} has not been updated yet".format(router.name, prefix)
+ return "{}, prefix ipv6 vpn {} has not been updated yet".format(
+ router.name, prefix
+ )
return None
-def bgp_vpnv6_table_check(router, group, label_list=None, label_value_expected=None, table_version=0):
+def bgp_vpnv6_table_check(
+ router, group, label_list=None, label_value_expected=None, table_version=0
+):
"""
Dump and check that vpnv6 entries have the same MPLS label value
* 'router': the router to check
stored_label_inited = False
for prefix in group:
- test_func = functools.partial(check_bgp_vpnv6_prefix_presence, router, prefix, table_version)
+ test_func = functools.partial(
+ check_bgp_vpnv6_prefix_presence, router, prefix, table_version
+ )
success, _ = topotest.run_and_expect(test_func, None, count=10, wait=0.5)
assert success, "{}, prefix ipv6 vpn {} is not installed yet".format(
router.name, prefix
+ PREFIXES_REDIST_R14
+ PREFIXES_CONNECTED,
label_list=label_list,
- table_version=table_version
+ table_version=table_version,
)
else:
for group in (
PREFIXES_REDIST_R14,
PREFIXES_CONNECTED,
):
- bgp_vpnv6_table_check(router, group=group, label_list=label_list, table_version=table_version)
+ bgp_vpnv6_table_check(
+ router, group=group, label_list=label_list, table_version=table_version
+ )
def check_show_mpls_table(router, blacklist=None, label_list=None, whitelist=None):
# Check vpnv6 routes from r1
logger.info("Checking VPNv6 routes on r1")
label_list = set()
- bgp_vpnv6_table_check_all(router, label_list=label_list, table_version=table_version)
+ bgp_vpnv6_table_check_all(
+ router, label_list=label_list, table_version=table_version
+ )
assert len(label_list) != 1, "r1, only 1 label values found for VPNv6 updates"
# Check mpls table with all values