switch.add_link(tgen.gears["pe1"])
switch.add_link(tgen.gears["rr1"])
+ switch = tgen.add_switch("s4")
+ switch.add_link(tgen.gears["pe1"])
+
def setup_module(mod):
tgen = Topogen(build_topo, mod.__name__)
pe1.run("ip link add Service type vrf table 1002")
pe1.run("ip link set up dev Service")
pe1.run("ip link set pe1-eth1 master Service")
+ pe1.run("ip link set pe1-eth3 master Customer")
pe1.run("sysctl -w net.mpls.conf.pe1-eth2.input=1")
rr1.run("sysctl -w net.mpls.conf.rr1-eth0.input=1")
def _bgp_check_received_routes_due_originator_id():
output = json.loads(pe1.vtysh_cmd("show bgp ipv4 vpn summary json"))
- expected = {"peers": {"10.10.10.101": {"pfxRcd": 0, "pfxSnt": 4}}}
+ expected = {"peers": {"10.10.10.101": {"pfxRcd": 0, "pfxSnt": 5}}}
return topotest.json_cmp(output, expected)
test_func = functools.partial(_bgp_check_received_routes_due_originator_id)
def _bgp_check_received_routes_with_modified_rts():
output = json.loads(pe1.vtysh_cmd("show bgp ipv4 vpn summary json"))
- expected = {"peers": {"10.10.10.101": {"pfxRcd": 4, "pfxSnt": 4}}}
+ expected = {"peers": {"10.10.10.101": {"pfxRcd": 5, "pfxSnt": 5}}}
return topotest.json_cmp(output, expected)
test_func = functools.partial(_bgp_check_received_routes_with_modified_rts)
expected = {
"paths": [
{
- "community": {
- "string": "65001:111"
- },
+ "community": {"string": "65001:111"},
"extendedCommunity": {
"string": "RT:192.168.1.2:2 RT:192.168.2.2:2"
},
result is None
), "Failed, routes are not imported from RR1 with modified RT list"
+ step("Check if 192.0.2.0/24 is imported to vrf Service from vrf Customer")
+
+ def _bgp_check_imported_local_routes_from_vrf_service():
+ output = json.loads(
+ pe1.vtysh_cmd("show ip route vrf Service 192.0.2.0/24 json")
+ )
+ expected = {
+ "192.0.2.0/24": [
+ {
+ "vrfName": "Service",
+ "table": 1002,
+ "installed": True,
+ "selected": True,
+ "nexthops": [
+ {
+ "fib": True,
+ "vrf": "Customer",
+ "active": True,
+ }
+ ],
+ }
+ ]
+ }
+ return topotest.json_cmp(output, expected)
+
+ test_func = functools.partial(_bgp_check_imported_local_routes_from_vrf_service)
+ _, result = topotest.run_and_expect(test_func, None, count=60, wait=1)
+ assert (
+ result is None
+ ), "Failed, didn't import local route 192.0.2.0/24 from vrf Customer to vrf Service"
+
step("Check if 172.16.255.1/32 is announced to CE2")
def _bgp_check_received_routes_from_pe():
test_func = functools.partial(_bgp_check_received_routes_from_pe)
_, result = topotest.run_and_expect(test_func, None, count=60, wait=1)
- assert (
- result is None
- ), "Failed, didn't receive 172.16.255.1/32 from PE1"
+ assert result is None, "Failed, didn't receive 172.16.255.1/32 from PE1"
if __name__ == "__main__":