return pytest.skip("Skipping BGP EVPN RT5 NETNS Test. Kernel not supported")
r1 = tgen.net["r1"]
- ns = "vrf-101"
- r1.add_netns(ns)
- r1.cmd_raises(
- """
-ip link add loop101 type dummy
-ip link add vxlan-101 type vxlan id 101 dstport 4789 dev r1-eth0 local 192.168.0.1
-"""
- )
- r1.set_intf_netns("loop101", ns, up=True)
- r1.set_intf_netns("vxlan-101", ns, up=True)
- r1.cmd_raises(
- """
-ip -n vrf-101 link set lo up
-ip -n vrf-101 link add bridge-101 up type bridge stp_state 0
-ip -n vrf-101 link set dev vxlan-101 master bridge-101
-ip -n vrf-101 link set bridge-101 up
-ip -n vrf-101 link set vxlan-101 up
-"""
- )
+ for vrf in (101, 102):
+ ns = "vrf-{}".format(vrf)
+ r1.add_netns(ns)
+ r1.cmd_raises(
+ """
+ip link add loop{0} type dummy
+ip link add vxlan-{0} type vxlan id {0} dstport 4789 dev r1-eth0 local 192.168.0.1
+""".format(
+ vrf
+ )
+ )
+ r1.set_intf_netns("loop{}".format(vrf), ns, up=True)
+ r1.set_intf_netns("vxlan-{}".format(vrf), ns, up=True)
+ r1.cmd_raises(
+ """
+ip -n vrf-{0} link set lo up
+ip -n vrf-{0} link add bridge-{0} up address {1} type bridge stp_state 0
+ip -n vrf-{0} link set dev vxlan-{0} master bridge-{0}
+ip -n vrf-{0} link set bridge-{0} up
+ip -n vrf-{0} link set vxlan-{0} up
+""".format(
+ vrf, _create_rmac(1, vrf)
+ )
+ )
- tgen.gears["r2"].cmd(
- """
-ip link add vrf-101 type vrf table 101
-ip link set dev vrf-101 up
-ip link add loop101 type dummy
-ip link set dev loop101 master vrf-101
-ip link set dev loop101 up
-ip link add bridge-101 up type bridge stp_state 0
-ip link set bridge-101 master vrf-101
-ip link set dev bridge-101 up
-ip link add vxlan-101 type vxlan id 101 dstport 4789 dev r2-eth0 local 192.168.0.2
-ip link set dev vxlan-101 master bridge-101
-ip link set vxlan-101 up type bridge_slave learning off flood off mcast_flood off
-"""
- )
+ tgen.gears["r2"].cmd(
+ """
+ip link add vrf-{0} type vrf table {0}
+ip link set dev vrf-{0} up
+ip link add loop{0} type dummy
+ip link set dev loop{0} master vrf-{0}
+ip link set dev loop{0} up
+ip link add bridge-{0} up address {1} type bridge stp_state 0
+ip link set bridge-{0} master vrf-{0}
+ip link set dev bridge-{0} up
+ip link add vxlan-{0} type vxlan id {0} dstport 4789 dev r2-eth0 local 192.168.0.2
+ip link set dev vxlan-{0} master bridge-{0}
+ip link set vxlan-{0} up type bridge_slave learning off flood off mcast_flood off
+""".format(
+ vrf, _create_rmac(2, vrf)
+ )
+ )
for rname, router in tgen.routers().items():
logger.info("Loading router %s" % rname)
tgen = get_topogen()
tgen.net["r1"].delete_netns("vrf-101")
+ tgen.net["r1"].delete_netns("vrf-102")
tgen.stop_topology()
-def _test_evpn_ping_router(pingrouter, ipv4_only=False, ipv6_only=False):
+def _create_rmac(router, vrf):
+ """
+ Creates RMAC for a given router and vrf
+ """
+ return "52:54:00:00:{:02x}:{:02x}".format(router, vrf)
+
+
+def _test_evpn_ping_router(
+ pingrouter, dst_router, source_vrf, dst_vrf, ipv4_only=False, ipv6_only=False
+):
"""
internal function to check ping between r1 and r2
"""
- # Check IPv4 and IPv6 connectivity between r1 and r2 ( routing vxlan evpn)
+ if pingrouter.name == "r1":
+ command = "ip netns exec vrf-{0} ping".format(source_vrf)
+ else:
+ command = "ping -I vrf-{0}".format(source_vrf)
+
+ dst_router_id = dst_router.name[1:]
+ dst_ips = []
+
if not ipv6_only:
- logger.info("Check Ping IPv4 from R1(vrf-101) to R2(vrf-101, 10.0.101.2)")
- output = pingrouter.run("ip netns exec vrf-101 ping 10.0.101.2 -f -c 1000")
+ dst_ips.append("10.0.{0}.{1}".format(dst_vrf, dst_router_id))
+ if not ipv4_only:
+ dst_ips.append("fd0{0}::{1}".format(dst_vrf - 100, dst_router_id))
+
+ for ip in dst_ips:
+ logger.info(
+ "Check Ping from {0}(vrf-{2}) to {1}(vrf-{3}, {4})".format(
+ pingrouter.name, dst_router.name, source_vrf, dst_vrf, ip
+ )
+ )
+ output = pingrouter.run("{0} {1} -f -c 1000".format(command, ip))
logger.info(output)
if "1000 packets transmitted, 1000 received" not in output:
- assertmsg = "expected ping IPv4 from R1(vrf-101) to R2(vrf-101, 10.0.101.2) should be ok"
+ assertmsg = "expected ping from {0}(vrf-{2}) to {1}(vrf-{3}, {4}) should be ok".format(
+ pingrouter.name, dst_router.name, source_vrf, dst_vrf, ip
+ )
assert 0, assertmsg
else:
logger.info(
- "Check Ping IPv4 from R1(vrf-101) to R2(vrf-101, 10.0.101.2) OK"
+ "Check Ping from {0}(vrf-{2}) to {1}(vrf-{3}, {4}) OK".format(
+ pingrouter.name, dst_router.name, source_vrf, dst_vrf, ip
+ )
)
- if not ipv4_only:
- logger.info("Check Ping IPv6 from R1(vrf-101) to R2(vrf-101, fd01::2)")
- output = pingrouter.run("ip netns exec vrf-101 ping fd01::2 -f -c 1000")
- logger.info(output)
- if "1000 packets transmitted, 1000 received" not in output:
- assert (
- 0
- ), "expected ping IPv6 from R1(vrf-101) to R2(vrf-101, fd01::2) should be ok"
- else:
- logger.info("Check Ping IPv6 from R1(vrf-101) to R2(vrf-101, fd01::2) OK")
-
def test_protocols_convergence():
"""
tgen = get_topogen()
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- # Check BGP IPv4 routing tables on r1
- logger.info("Checking BGP L2VPN EVPN routes for convergence on r1")
for rname in ("r1", "r2"):
router = tgen.gears[rname]
+ logger.info(
+ "Checking BGP L2VPN EVPN routes for convergence on {}".format(router.name)
+ )
json_file = "{}/{}/bgp_l2vpn_evpn_routes.json".format(CWD, router.name)
expected = json.loads(open(json_file).read())
test_func = partial(
logger.info("==== result from show bgp vrf vrf-101 ipv6")
logger.info(output)
output = tgen.gears["r1"].vtysh_cmd("show bgp vrf vrf-101", isjson=False)
- logger.info("==== result from show bgp vrf vrf-101 ")
+ logger.info("==== result from show bgp vrf vrf-101")
logger.info(output)
output = tgen.gears["r1"].vtysh_cmd("show ip route vrf vrf-101", isjson=False)
logger.info("==== result from show ip route vrf vrf-101")
_print_evpn_nexthop_rmac("r1")
+def _test_bgp_vrf_routes(router, vrf, suffix=None):
+ for af in ("ipv4", "ipv6"):
+ json_file = "{}/{}/bgp_vrf_{}_{}_routes_detail{}.json".format(
+ CWD, router.name, vrf, af, "_" + suffix if suffix else ""
+ )
+ expected = json.loads(open(json_file).read())
+ test_func = partial(
+ topotest.router_json_cmp,
+ router,
+ "show bgp vrf vrf-{} {} unicast detail json".format(vrf, af),
+ expected,
+ )
+ _, result = topotest.run_and_expect(test_func, None, count=20, wait=1)
+ assertmsg = '"{}" JSON output mismatches VRF: {} Suffix: {}'.format(
+ router.name, vrf, suffix
+ )
+ assert result is None, assertmsg
+
+
def test_bgp_vrf_routes():
"""
Check routes are correctly imported to VRF
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- for rname in ("r1", "r2"):
- router = tgen.gears[rname]
- for af in ("ipv4", "ipv6"):
- json_file = "{}/{}/bgp_vrf_{}_routes_detail.json".format(
- CWD, router.name, af
- )
- expected = json.loads(open(json_file).read())
- test_func = partial(
- topotest.router_json_cmp,
- router,
- "show bgp vrf vrf-101 {} unicast detail json".format(af),
- expected,
- )
- _, result = topotest.run_and_expect(test_func, None, count=20, wait=1)
- assertmsg = '"{}" JSON output mismatches'.format(router.name)
- assert result is None, assertmsg
+ for vrf in (101, 102):
+ for rname in ("r1", "r2"):
+ router = tgen.gears[rname]
+ _test_bgp_vrf_routes(router, vrf)
def test_router_check_ip():
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- _test_evpn_ping_router(tgen.gears["r1"])
+ _test_evpn_ping_router(tgen.gears["r1"], tgen.gears["r2"], 101, 101)
def test_evpn_disable_routemap():
result = verify_bgp_rib(tgen, family, router, rib_routes, expected=expected)
if expected:
- assert result is True, "expect routes {} present".format(routes)
+ assert result, "expect routes {} present".format(routes)
else:
assert result is not True, "expect routes {} not present".format(routes)
-def test_evpn_remove_ip():
+def test_evpn_remove_ipv6():
"""
Check the removal of an EVPN route is correctly handled
"""
logger.info("==== Remove IPv6 network on R2")
result = apply_raw_config(tgen, config_no_ipv6)
- assert result is True, "Failed to remove IPv6 network on R2, Error: {} ".format(
- result
- )
+ assert result, "Failed to remove IPv6 network on R2, Error: {} ".format(result)
_check_evpn_routes("r1", "ipv6", "vrf-101", ["fd01::2/128"], expected=False)
_print_evpn_nexthop_rmac("r1")
-
-
-def test_router_check_evpn_contexts_again():
- """
- Check EVPN nexthops and RMAC number are correctly configured
- """
- tgen = get_topogen()
- if tgen.routers_have_failure():
- pytest.skip(tgen.errors)
-
- _test_router_check_evpn_contexts(tgen.gears["r1"], ipv4_only=True)
_test_router_check_evpn_next_hop()
+ _test_evpn_ping_router(tgen.gears["r1"], tgen.gears["r2"], 101, 101, ipv4_only=True)
+ _test_router_check_evpn_contexts(tgen.gears["r1"], ipv4_only=True)
-def test_evpn_ping_again():
- """
- Check ping between R1 and R2 is ok
- """
- tgen = get_topogen()
- if tgen.routers_have_failure():
- pytest.skip(tgen.errors)
-
- _test_evpn_ping_router(tgen.gears["r1"], ipv4_only=True)
-
-
-def test_evpn_other_address_family():
+def test_evpn_remove_ipv4():
"""
Check the removal of an EVPN route is correctly handled
"""
logger.info("==== Add IPv6 again network on R2")
result = apply_raw_config(tgen, config_add_ipv6)
- assert result is True, "Failed to add IPv6 network on R2, Error: {} ".format(result)
+ assert result, "Failed to add IPv6 network on R2, Error: {} ".format(result)
_check_evpn_routes("r1", "ipv6", "vrf-101", ["fd01::2/128"], expected=True)
config_no_ipv4 = {
logger.info("==== Remove IPv4 network on R2")
result = apply_raw_config(tgen, config_no_ipv4)
- assert result is True, "Failed to remove IPv4 network on R2, Error: {} ".format(
- result
- )
+ assert result, "Failed to remove IPv4 network on R2, Error: {} ".format(result)
_check_evpn_routes("r1", "ipv4", "vrf-101", ["10.0.101.2/32"], expected=False)
_print_evpn_nexthop_rmac("r1")
+ _test_router_check_evpn_next_hop()
+ _test_evpn_ping_router(tgen.gears["r1"], tgen.gears["r2"], 101, 101, ipv6_only=True)
+ _test_router_check_evpn_contexts(tgen.gears["r1"], ipv6_only=True)
-def test_router_check_evpn_contexts_again_other_address_family():
+def test_evpn_restore_ipv4():
"""
- Check EVPN nexthops and RMAC number are correctly configured
+ Restore IPv4 network on R2
"""
tgen = get_topogen()
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
- _test_router_check_evpn_contexts(tgen.gears["r1"], ipv6_only=True)
-
+ config_add_ipv4 = {
+ "r2": {
+ "raw_config": [
+ "router bgp 65000 vrf vrf-101",
+ "address-family ipv4 unicast",
+ "network 10.0.101.2/32",
+ "network 10.0.101.12/32",
+ ]
+ }
+ }
-def test_evpn_ping_again_other_address_family():
- """
- Check ping between R1 and R2 is ok
- """
- tgen = get_topogen()
- if tgen.routers_have_failure():
- pytest.skip(tgen.errors)
+ logger.info("==== Add IPv4 network again on R2")
+ result = apply_raw_config(tgen, config_add_ipv4)
+ assert result, "Failed to add IPv4 network again on R2, Error: {} ".format(result)
- _test_evpn_ping_router(tgen.gears["r1"], ipv6_only=True)
+ _check_evpn_routes("r1", "ipv4", "vrf-101", ["10.0.101.2/32"], expected=True)
+ _test_router_check_evpn_next_hop()
+ _test_evpn_ping_router(tgen.gears["r1"], tgen.gears["r2"], 101, 101)
+ _test_router_check_evpn_contexts(tgen.gears["r1"])
def _get_established_epoch(router, peer):
logger.info("==== Configure second path between R1 and R2")
result = apply_raw_config(tgen, evpn_multipath)
assert (
- result is True
+ result
), "Failed to configure second path between R1 and R2, Error: {} ".format(result)
r1 = tgen.gears["r1"]
logger.info("==== Deconfigure second path between R1 and R2")
result = apply_raw_config(tgen, shutdown_evpn_multipath)
assert (
- result is True
+ result
), "Failed to deconfigure second path between R1 and R2, Error: {} ".format(result)
_test_wait_for_multipath_convergence(tgen.gears["r2"])
_test_router_check_evpn_next_hop()
-def test_rmap_match_evpn_vni_102():
+def test_rmap_match_evpn_vni_105():
"""
change input route-map from r2.
- match evpn vni value from 101 to 102
+ match evpn vni value from 101 to 105
expecting all prefixes are denied
"""
pytest.skip(tgen.errors)
r1 = tgen.gears["r1"]
- nb_prefix = 2
+ nb_prefix = 4
expected = {"numPrefix": nb_prefix, "totalPrefix": nb_prefix}
test_func = partial(
topotest.router_json_cmp,
"r1": {
"raw_config": [
"route-map rmap_r1 permit 1",
- "match evpn vni 102",
+ "match evpn vni 105",
]
},
}
assert apply_raw_config(tgen, cfg), "Configuration failed"
r1 = tgen.gears["r1"]
- nb_prefix = 2
+ nb_prefix = 4
expected = {"numPrefix": nb_prefix, "totalPrefix": nb_prefix}
test_func = partial(
topotest.router_json_cmp,
assert apply_raw_config(tgen, cfg), "Configuration failed"
r1 = tgen.gears["r1"]
- nb_prefix = 2
+ nb_prefix = 4
expected = {"numPrefix": nb_prefix, "totalPrefix": nb_prefix}
test_func = partial(
topotest.router_json_cmp,
assert result is None, f"r1 was expecting {nb_prefix} from r2"
+def _validate_evpn_rmacs(router, expected):
+ """
+ Internal function to check RMACs are matching the expected values
+ and that VTEP IPs are unique for each VRF/VNI
+ """
+ data = router.vtysh_cmd("show evpn rmac vni all json", isjson=True)
+ cmp = topotest.json_cmp(data, expected, exact=False)
+ if cmp is not None:
+ return cmp
+
+ for vni, details in data.items():
+ vtep_ips = []
+ for key, detail in details.items():
+ if key == "numRmacs":
+ continue
+ vtep_ip = detail["vtepIp"]
+ if vtep_ip in vtep_ips:
+ # VTEP IP is occuring for more than one RMAC in the same VNI
+ return "Duplicate VTEP IP {} found in VNI {}".format(vtep_ip, vni)
+ vtep_ips.append(detail["vtepIp"])
+
+ return None
+
+
+def _test_evpn_rmac(tgen):
+ """
+ Internal function to check RMACs for both VRFs from peers
+ """
+ for router, peer in {1: 2, 2: 1}.items():
+ r = tgen.gears["r{}".format(router)]
+ # Expecting the RMACs of the peer
+ expected = {
+ str(vrf): {
+ _create_rmac(peer, vrf): {
+ "routerMac": _create_rmac(peer, vrf),
+ "vtepIp": "192.168.0.{}".format(peer),
+ }
+ }
+ for vrf in (101, 102)
+ }
+ test_func = partial(
+ _validate_evpn_rmacs,
+ r,
+ expected,
+ )
+ _, result = topotest.run_and_expect(test_func, None, count=30, wait=1)
+ assert result is None, "r{}".format(router) + " missing rmacs for vni"
+
+
+def test_evpn_l3vpn_import():
+ """
+ Import vrf-102 to vrf-101 on r2 and vice versa on r3
+ """
+ tgen = get_topogen()
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ _test_evpn_rmac(tgen)
+
+ # import r1 vrf 101 routes into vrf 102 and vice versa on r2
+ # establishing connectivity for r1 between vrf 101 and vrf 102
+ # over r2. Overwriting origin to allow re-export to iBGP peer.
+ cfg = {
+ "r2": {
+ "raw_config": [
+ "ip prefix-list vrf-101 seq 5 permit 10.0.102.1/32",
+ "ipv6 prefix-list vrf-101 seq 5 permit fd02::1/128",
+ "route-map vrf-import-to-101 permit 1",
+ " match ip address prefix-list vrf-101",
+ " set origin incomplete",
+ " route-map vrf-import-to-101 permit 2",
+ " match ipv6 address prefix-list vrf-101",
+ " set origin incomplete",
+ "ip prefix-list vrf-102 seq 5 permit 10.0.101.1/32",
+ "ipv6 prefix-list vrf-102 seq 5 permit fd01::1/128",
+ "route-map vrf-import-to-102 permit 1",
+ " match ip address prefix-list vrf-102",
+ " set origin incomplete",
+ " route-map vrf-import-to-102 permit 2",
+ " match ipv6 address prefix-list vrf-102",
+ " set origin incomplete",
+ "router bgp 65000 vrf vrf-101",
+ " address-family ipv4 unicast",
+ " import vrf route-map vrf-import-to-101",
+ " import vrf vrf-102",
+ " address-family ipv6 unicast",
+ " import vrf route-map vrf-import-to-101",
+ " import vrf vrf-102",
+ "router bgp 65000 vrf vrf-102",
+ " address-family ipv4 unicast",
+ " import vrf route-map vrf-import-to-102",
+ " import vrf vrf-101",
+ " address-family ipv6 unicast",
+ " import vrf route-map vrf-import-to-102",
+ " import vrf vrf-101",
+ ]
+ },
+ }
+ assert apply_raw_config(tgen, cfg), "Configuration failed"
+
+ for vrf in (101, 102):
+ _test_bgp_vrf_routes(tgen.gears["r1"], vrf, suffix="import")
+
+ _test_evpn_rmac(tgen)
+ _test_evpn_ping_router(tgen.gears["r1"], tgen.gears["r1"], 101, 102)
+
+
def test_memory_leak():
"Run the memory leak test and report results."
tgen = get_topogen()