import sys
import functools
import pytest
-import time
CWD = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.join(CWD, "../"))
router.load_config(TopoRouter.RD_ZEBRA, "zebra.conf")
router.load_config(TopoRouter.RD_BGP, "bgpd.conf")
tgen.start_router()
- time.sleep(1)
yield tgen
tgen.stop_topology()
pytest.skip("skipped because of previous test failure")
+def find_neighbor_status(router, neighbor_ip):
+ return json.loads(router.vtysh_cmd(f"show bgp neighbors {neighbor_ip} json"))[
+ neighbor_ip
+ ]
+
+
+def check_role_mismatch(router, neighbor_ip):
+ return is_role_mismatch(find_neighbor_status(router, neighbor_ip))
+
+
def is_role_mismatch(neighbor_status):
return (
neighbor_status["bgpState"] != "Established"
)
+def check_session_established(router, neighbor_ip):
+ neighbor_status = find_neighbor_status(router, neighbor_ip)
+ return neighbor_status["bgpState"] == "Established"
+
+
def test_correct_pair(tgen):
# provider-customer pair
+ router = tgen.gears["r1"]
neighbor_ip = "192.168.2.2"
- neighbor_status = json.loads(
- tgen.gears["r1"].vtysh_cmd(f"show bgp neighbors {neighbor_ip} json")
- )[neighbor_ip]
+ check_r2_established = functools.partial(
+ check_session_established, router, neighbor_ip
+ )
+ success, result = topotest.run_and_expect(
+ check_r2_established, True, count=20, wait=3
+ )
+ assert success, "Session with r2 is not Established"
+
+ neighbor_status = find_neighbor_status(router, neighbor_ip)
assert neighbor_status["localRole"] == "provider"
assert neighbor_status["remoteRole"] == "customer"
- assert neighbor_status["bgpState"] == "Established"
assert (
neighbor_status["neighborCapabilities"].get("role") == "advertisedAndReceived"
)
def test_role_pair_mismatch(tgen):
# provider-peer mistmatch
+ router = tgen.gears["r1"]
neighbor_ip = "192.168.3.2"
- neighbor_status = json.loads(
- tgen.gears["r1"].vtysh_cmd(f"show bgp neighbors {neighbor_ip} json")
- )[neighbor_ip]
- assert is_role_mismatch(neighbor_status)
+ check_r3_mismatch = functools.partial(check_role_mismatch, router, neighbor_ip)
+ success, result = topotest.run_and_expect(check_r3_mismatch, True, count=20, wait=3)
+ assert success, "Session with r3 was not correctly closed"
def test_single_role_advertising(tgen):
# provider-undefined pair; we set role
+ router = tgen.gears["r1"]
neighbor_ip = "192.168.4.2"
- neighbor_status = json.loads(
- tgen.gears["r1"].vtysh_cmd(f"show bgp neighbors {neighbor_ip} json")
- )[neighbor_ip]
+ check_r4_established = functools.partial(
+ check_session_established, router, neighbor_ip
+ )
+ success, result = topotest.run_and_expect(
+ check_r4_established, True, count=20, wait=3
+ )
+ assert success, "Session with r4 is not Established"
+
+ neighbor_status = find_neighbor_status(router, neighbor_ip)
assert neighbor_status["localRole"] == "provider"
assert neighbor_status["remoteRole"] == "undefined"
- assert neighbor_status["bgpState"] == "Established"
assert neighbor_status["neighborCapabilities"].get("role") == "advertised"
def test_single_role_receiving(tgen):
# provider-undefined pair; we receive role
+ router = tgen.gears["r4"]
neighbor_ip = "192.168.4.1"
- neighbor_status = json.loads(
- tgen.gears["r4"].vtysh_cmd(f"show bgp neighbors {neighbor_ip} json")
- )[neighbor_ip]
+ check_r1_established = functools.partial(
+ check_session_established, router, neighbor_ip
+ )
+ success, result = topotest.run_and_expect(
+ check_r1_established, True, count=20, wait=3
+ )
+ assert success, "Session with r1 is not Established"
+
+ neighbor_status = find_neighbor_status(router, neighbor_ip)
assert neighbor_status["localRole"] == "undefined"
assert neighbor_status["remoteRole"] == "provider"
- assert neighbor_status["bgpState"] == "Established"
assert neighbor_status["neighborCapabilities"].get("role") == "received"
def test_role_strict_mode(tgen):
# provider-undefined pair with strict-mode
+ router = tgen.gears["r1"]
neighbor_ip = "192.168.5.2"
- neighbor_status = json.loads(
- tgen.gears["r1"].vtysh_cmd(f"show bgp neighbors {neighbor_ip} json")
- )
- assert is_role_mismatch(neighbor_status[neighbor_ip])
+ check_r5_mismatch = functools.partial(check_role_mismatch, router, neighbor_ip)
+ success, result = topotest.run_and_expect(check_r5_mismatch, True, count=20, wait=3)
+ assert success, "Session with r5 was not correctly closed"
if __name__ == "__main__":
import sys
import functools
import pytest
-import time
CWD = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.join(CWD, "../"))
router.load_config(TopoRouter.RD_ZEBRA, "zebra.conf")
router.load_config(TopoRouter.RD_BGP, "bgpd.conf")
tgen.start_router()
- BGP_CONVERGENCE = verify_bgp_convergence_from_running_config(tgen)
- assert BGP_CONVERGENCE, f"setup_module :Failed \n Error: {BGP_CONVERGENCE}"
- # Todo: What is the indented way to wait for convergence without json?!
- time.sleep(5)
yield tgen
tgen.stop_topology()
def test_r10_routes(tgen):
# provider-undefine pair bur strict-mode was set
- routes = json.loads(tgen.gears["r10"].vtysh_cmd("show bgp ipv4 json"))["routes"]
- route_list = sorted(routes.keys())
- assert route_list == [
- "192.0.2.1/32",
- "192.0.2.2/32",
- "192.0.2.3/32",
- "192.0.2.4/32",
- "192.0.2.5/32",
- "192.0.2.6/32",
- "192.0.2.7/32",
- ]
+ def _routes_half_converged():
+ routes = json.loads(tgen.gears["r10"].vtysh_cmd("show bgp ipv4 json"))["routes"]
+ output = sorted(routes.keys())
+ expected = [
+ "192.0.2.1/32",
+ "192.0.2.2/32",
+ "192.0.2.3/32",
+ "192.0.2.4/32",
+ "192.0.2.5/32",
+ "192.0.2.6/32",
+ "192.0.2.7/32",
+ ]
+ return output == expected
+
+ success, result = topotest.run_and_expect(
+ _routes_half_converged, True, count=20, wait=3
+ )
+ assert success, "Routes did not converged"
+
routes_with_otc = list()
for number in range(1, 8):
prefix = f"192.0.2.{number}/32"