switch.add_link(tgen.gears["PE2"])
switch.add_link(tgen.gears["host2"])
+
def setup_pe_router(tgen, pe_name, tunnel_local_ip, svi_ip, intf):
pe = tgen.gears[pe_name]
# setup single vxlan device
pe.run(
- "ip link add dev vxlan0 type vxlan dstport 4789 local {0} nolearning external".format(tunnel_local_ip)
+ "ip link add dev vxlan0 type vxlan dstport 4789 local {0} nolearning external".format(
+ tunnel_local_ip
+ )
)
pe.run("ip link set dev vxlan0 master bridge")
pe.run("bridge link set dev vxlan0 vlan_tunnel on")
pe.run("bridge vlan add dev vxlan0 vid 300")
pe.run("bridge vlan add dev vxlan0 vid 300 tunnel_info id 300")
+
def setup_p_router(tgen, p_name):
p1 = tgen.gears[p_name]
p1.run("sysctl -w net.ipv4.ip_forward=1")
+
def setup_module(mod):
"Sets up the pytest environment"
"Teardown the pytest environment"
tgen = get_topogen()
- #tgen.mininet_cli()
+ # tgen.mininet_cli()
# This function tears down the whole topology.
tgen.stop_topology()
)
return None
+
def check_flood_entry_present(pe, vni, vtep):
if not topotest.iproute2_is_fdb_get_capable():
return None
- output = pe.run("bridge fdb get 00:00:00:00:00:00 dev vxlan0 vni {} self".format(vni))
+ output = pe.run(
+ "bridge fdb get 00:00:00:00:00:00 dev vxlan0 vni {} self".format(vni)
+ )
if str(vtep) not in output:
return output
return None
+
def test_pe1_converge_evpn():
"Wait for protocol convergence"
assertmsg = '"{}" Flood FDB Entry for VTEP {} not found'.format(pe1.name, vtep)
assert result is None, assertmsg
+
def test_pe2_converge_evpn():
"Wait for protocol convergence"
tgen = get_topogen()
-#Don't run this test if we have any failure.
+ # Don't run this test if we have any failure.
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
assertmsg = '"{}" Flood FDB Entry for VTEP {} not found'.format(pe2.name, vtep)
assert result is None, assertmsg
+
def mac_learn_test(host, local):
"check the host MAC gets learned by the VNI"
if "HWaddr" in line_items[0]:
mac = line_items[1]
break
- #print(host_output)
+ # print(host_output)
# check we have a local association between the MAC and IP
local_output = local.vtysh_cmd("show evpn mac vni 101 mac {} json".format(mac))
- #print(local_output)
+ # print(local_output)
local_output_json = json.loads(local_output)
mac_type = local_output_json[mac]["type"]
assertmsg = "Failed to learn local IP address on host {}".format(host.name)
remote_output = remote.vtysh_cmd(
"show evpn mac vni 101 mac {} json".format(mac)
)
- #print(remote_output)
+ # print(remote_output)
remote_output_json = json.loads(remote_output)
type = remote_output_json[mac]["type"]
if not remote_output_json[mac]["neighbors"] == "none":
count += 1
sleep(1)
- #print("tries: {}".format(count))
+ # print("tries: {}".format(count))
assertmsg = "{} remote learned mac no address: {} ".format(host.name, mac)
# some debug for this failure
if not converged == True:
log_output = remote.run("cat zebra.log")
- #print(log_output)
+ # print(log_output)
assert converged == True, assertmsg
if remote_output_json[mac]["neighbors"]["active"]:
host1 = tgen.gears["host1"]
pe1 = tgen.gears["PE1"]
pe2 = tgen.gears["PE2"]
- #pe2.vtysh_cmd("debug zebra vxlan")
- #pe2.vtysh_cmd("debug zebra kernel")
+ # pe2.vtysh_cmd("debug zebra vxlan")
+ # pe2.vtysh_cmd("debug zebra kernel")
# lets populate that arp cache
host1.run("ping -c1 10.10.1.1")
ip_learn_test(tgen, host1, pe1, pe2, "10.10.1.55")
host2 = tgen.gears["host2"]
pe1 = tgen.gears["PE1"]
pe2 = tgen.gears["PE2"]
- #pe1.vtysh_cmd("debug zebra vxlan")
- #pe1.vtysh_cmd("debug zebra kernel")
+ # pe1.vtysh_cmd("debug zebra vxlan")
+ # pe1.vtysh_cmd("debug zebra kernel")
# lets populate that arp cache
host2.run("ping -c1 10.10.1.3")
ip_learn_test(tgen, host2, pe2, pe1, "10.10.1.56")
# tgen.mininet_cli()
+
def show_dvni_route(pe, vni, prefix, vrf):
output = pe.vtysh_cmd("show ip route vrf {} {}".format(vrf, prefix))
return None
+
def test_dvni():
"test Downstream VNI works as expected importing into PE1"
_, result = topotest.run_and_expect(test_func, None, count=30, wait=1)
assertmsg = '"{}" DVNI route {} not found'.format(pe1.name, prefix)
assert result is None, assertmsg
- #tgen.mininet_cli()
+ # tgen.mininet_cli()
def test_memory_leak():
host1 = tgen.gears["host1"]
pe1 = tgen.gears["PE1"]
pe2 = tgen.gears["PE2"]
- #pe2.vtysh_cmd("debug zebra vxlan")
- #pe2.vtysh_cmd("debug zebra kernel")
+ # pe2.vtysh_cmd("debug zebra vxlan")
+ # pe2.vtysh_cmd("debug zebra kernel")
# lets populate that arp cache
host1.run("ping -c1 10.10.1.1")
ip_learn_test(tgen, host1, pe1, pe2, "10.10.1.55")
host2 = tgen.gears["host2"]
pe1 = tgen.gears["PE1"]
pe2 = tgen.gears["PE2"]
- #pe1.vtysh_cmd("debug zebra vxlan")
- #pe1.vtysh_cmd("debug zebra kernel")
+ # pe1.vtysh_cmd("debug zebra vxlan")
+ # pe1.vtysh_cmd("debug zebra kernel")
# lets populate that arp cache
host2.run("ping -c1 10.10.1.3")
ip_learn_test(tgen, host2, pe2, pe1, "10.10.1.56")