summaryrefslogtreecommitdiff
path: root/tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py')
-rw-r--r--[-rwxr-xr-x]tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py930
1 files changed, 744 insertions, 186 deletions
diff --git a/tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py b/tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py
index e99111d90b..b3b7256ac4 100755..100644
--- a/tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py
+++ b/tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py
@@ -37,6 +37,8 @@ Test steps
- Verify clear bgp
- Test bgp convergence with loopback interface
- Test advertise network using network command
+- Verify routes not installed in zebra when /32 routes received
+ with loopback BGP session subnet
"""
import os
@@ -48,8 +50,8 @@ from copy import deepcopy
# Save the Current Working Directory to find configuration files.
CWD = os.path.dirname(os.path.realpath(__file__))
-sys.path.append(os.path.join(CWD, '../'))
-sys.path.append(os.path.join(CWD, '../lib/'))
+sys.path.append(os.path.join(CWD, "../"))
+sys.path.append(os.path.join(CWD, "../lib/"))
# Required to instantiate the topology builder class.
@@ -59,26 +61,60 @@ from lib.topogen import Topogen, get_topogen
from mininet.topo import Topo
from lib.common_config import (
- start_topology, write_test_header,
- write_test_footer, reset_config_on_routers, create_static_routes,
- verify_rib, verify_admin_distance_for_static_routes
+ step,
+ start_topology,
+ write_test_header,
+ write_test_footer,
+ reset_config_on_routers,
+ create_static_routes,
+ verify_rib,
+ verify_admin_distance_for_static_routes,
+ check_address_types,
+ apply_raw_config,
+ addKernelRoute,
+ verify_fib_routes,
+ create_prefix_lists,
+ create_route_maps,
+ verify_bgp_community,
+ required_linux_kernel_version,
)
from lib.topolog import logger
from lib.bgp import (
- verify_bgp_convergence, create_router_bgp, verify_router_id,
- modify_as_number, verify_as_numbers, clear_bgp_and_verify,
- verify_bgp_timers_and_functionality
+ verify_bgp_convergence,
+ create_router_bgp,
+ verify_router_id,
+ modify_as_number,
+ verify_as_numbers,
+ clear_bgp_and_verify,
+ verify_bgp_timers_and_functionality,
+ verify_bgp_rib,
)
from lib.topojson import build_topo_from_json, build_config_from_json
# Reading the data from JSON File for topology creation
jsonFile = "{}/bgp_basic_functionality.json".format(CWD)
try:
- with open(jsonFile, 'r') as topoJson:
+ with open(jsonFile, "r") as topoJson:
topo = json.load(topoJson)
except IOError:
assert False, "Could not read file {}".format(jsonFile)
+# Global Variable
+KEEPALIVETIMER = 2
+HOLDDOWNTIMER = 6
+r1_ipv4_loopback = "1.0.1.0/24"
+r2_ipv4_loopback = "1.0.2.0/24"
+r3_ipv4_loopback = "1.0.3.0/24"
+r4_ipv4_loopback = "1.0.4.0/24"
+r1_ipv6_loopback = "2001:db8:f::1:0/120"
+r2_ipv6_loopback = "2001:db8:f::2:0/120"
+r3_ipv6_loopback = "2001:db8:f::3:0/120"
+r4_ipv6_loopback = "2001:db8:f::4:0/120"
+NETWORK = {
+ "ipv4": ["100.1.1.1/32", "100.1.1.2/32"],
+ "ipv6": ["100::1/128", "100::2/128"],
+}
+
class CreateTopo(Topo):
"""
@@ -102,6 +138,11 @@ def setup_module(mod):
* `mod`: module name
"""
+ # Required linux kernel version for this suite to run.
+ result = required_linux_kernel_version("4.15")
+ if result is not True:
+ pytest.skip("Kernel requirements are not met")
+
testsuite_run_time = time.asctime(time.localtime(time.time()))
logger.info("Testsuite start time: {}".format(testsuite_run_time))
logger.info("=" * 40)
@@ -119,10 +160,13 @@ def setup_module(mod):
# Creating configuration from JSON
build_config_from_json(tgen, topo)
+ global ADDR_TYPES
global BGP_CONVERGENCE
+ ADDR_TYPES = check_address_types()
BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo)
- assert BGP_CONVERGENCE is True, "setup_module :Failed \n Error: {}". \
- format(BGP_CONVERGENCE)
+ assert BGP_CONVERGENCE is True, "setup_module :Failed \n Error: {}".format(
+ BGP_CONVERGENCE
+ )
logger.info("Running setup_module() done")
@@ -137,8 +181,9 @@ def teardown_module():
# Stop toplogy and Remove tmp files
tgen.stop_topology()
- logger.info("Testsuite end time: {}".
- format(time.asctime(time.localtime(time.time()))))
+ logger.info(
+ "Testsuite end time: {}".format(time.asctime(time.localtime(time.time())))
+ )
logger.info("=" * 40)
@@ -154,7 +199,7 @@ def test_modify_and_delete_router_id(request):
tgen = get_topogen()
if BGP_CONVERGENCE is not True:
- pytest.skip('skipped because of BGP Convergence failure')
+ pytest.skip("skipped because of BGP Convergence failure")
# test case name
tc_name = request.node.name
@@ -162,59 +207,31 @@ def test_modify_and_delete_router_id(request):
# Modify router id
input_dict = {
- 'r1': {
- "bgp": {
- 'router_id': '12.12.12.12'
- }
- },
- 'r2': {
- "bgp": {
- 'router_id': '22.22.22.22'
- }
- },
- 'r3': {
- "bgp": {
- 'router_id': '33.33.33.33'
- }
- },
+ "r1": {"bgp": {"router_id": "12.12.12.12"}},
+ "r2": {"bgp": {"router_id": "22.22.22.22"}},
+ "r3": {"bgp": {"router_id": "33.33.33.33"}},
}
result = create_router_bgp(tgen, topo, input_dict)
- assert result is True, "Testcase {} :Failed \n Error: {}".\
- format(tc_name, result)
+ assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
# Verifying router id once modified
result = verify_router_id(tgen, topo, input_dict)
- assert result is True, "Testcase {} :Failed \n Error: {}".\
- format(tc_name, result)
+ assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
# Delete router id
input_dict = {
- 'r1': {
- "bgp": {
- 'del_router_id': True
- }
- },
- 'r2': {
- "bgp": {
- 'del_router_id': True
- }
- },
- 'r3': {
- "bgp": {
- 'del_router_id': True
- }
- },
+ "r1": {"bgp": {"del_router_id": True}},
+ "r2": {"bgp": {"del_router_id": True}},
+ "r3": {"bgp": {"del_router_id": True}},
}
result = create_router_bgp(tgen, topo, input_dict)
- assert result is True, "Testcase {} :Failed \n Error: {}". \
- format(tc_name, result)
+ assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
# Verifying router id once deleted
# Once router-id is deleted, highest interface ip should become
# router-id
result = verify_router_id(tgen, topo, input_dict)
- assert result is True, "Testcase {} :Failed \n Error: {}". \
- format(tc_name, result)
+ assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
write_test_footer(tc_name)
@@ -226,41 +243,94 @@ def test_bgp_config_with_4byte_as_number(request):
tgen = get_topogen()
if BGP_CONVERGENCE is not True:
- pytest.skip('skipped because of BGP Convergence failure')
+ pytest.skip("skipped because of BGP Convergence failure")
# test case name
tc_name = request.node.name
write_test_header(tc_name)
input_dict = {
- "r1": {
- "bgp": {
- "local_as": 131079
- }
- },
- "r2": {
- "bgp": {
- "local_as": 131079
- }
- },
- "r3": {
- "bgp": {
- "local_as": 131079
- }
- },
- "r4": {
- "bgp": {
- "local_as": 131080
- }
- }
+ "r1": {"bgp": {"local_as": 131079}},
+ "r2": {"bgp": {"local_as": 131079}},
+ "r3": {"bgp": {"local_as": 131079}},
+ "r4": {"bgp": {"local_as": 131080}},
+ }
+ result = modify_as_number(tgen, topo, input_dict)
+ assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
+
+ result = verify_as_numbers(tgen, topo, input_dict)
+ assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_BGP_config_with_invalid_ASN_p2(request):
+ """
+ Configure BGP with invalid ASN(ex - 0, reserved ASN) and verify test case
+ ended up with error
+ """
+
+ tgen = get_topogen()
+ global BGP_CONVERGENCE
+
+ if BGP_CONVERGENCE != True:
+ pytest.skip("skipped because of BGP Convergence failure")
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Api call to modify AS number
+ input_dict = {
+ "r1": {"bgp": {"local_as": 0,}},
+ "r2": {"bgp": {"local_as": 0,}},
+ "r3": {"bgp": {"local_as": 0,}},
+ "r4": {"bgp": {"local_as": 64000,}},
+ }
+ result = modify_as_number(tgen, topo, input_dict)
+ try:
+ assert result is True
+ except AssertionError:
+ logger.info("Expected behaviour: {}".format(result))
+ logger.info("BGP config is not created because of invalid ASNs")
+
+ write_test_footer(tc_name)
+
+
+def test_BGP_config_with_2byteAS_and_4byteAS_number_p1(request):
+ """
+ Configure BGP with 4 byte and 2 byte ASN and verify BGP is converged
+ """
+
+ tgen = get_topogen()
+ global BGP_CONVERGENCE
+
+ if BGP_CONVERGENCE != True:
+ pytest.skip("skipped because of BGP Convergence failure")
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Api call to modify AS number
+ input_dict = {
+ "r1": {"bgp": {"local_as": 131079}},
+ "r2": {"bgp": {"local_as": 131079}},
+ "r3": {"bgp": {"local_as": 131079}},
+ "r4": {"bgp": {"local_as": 111}},
}
result = modify_as_number(tgen, topo, input_dict)
- assert result is True, "Testcase {} :Failed \n Error: {}". \
- format(tc_name, result)
+ if result != True:
+ assert False, "Testcase " + tc_name + " :Failed \n Error: {}".format(result)
result = verify_as_numbers(tgen, topo, input_dict)
- assert result is True, "Testcase {} :Failed \n Error: {}". \
- format(tc_name, result)
+ if result != True:
+ assert False, "Testcase " + tc_name + " :Failed \n Error: {}".format(result)
+
+ # Api call verify whether BGP is converged
+ result = verify_bgp_convergence(tgen, topo)
+ if result != True:
+ assert False, "Testcase " + tc_name + " :Failed \n Error: {}".format(result)
write_test_footer(tc_name)
@@ -272,7 +342,7 @@ def test_bgp_timers_functionality(request):
tgen = get_topogen()
if BGP_CONVERGENCE is not True:
- pytest.skip('skipped because of BGP Convergence failure')
+ pytest.skip("skipped because of BGP Convergence failure")
# test case name
tc_name = request.node.name
@@ -290,10 +360,10 @@ def test_bgp_timers_functionality(request):
"unicast": {
"neighbor": {
"r2": {
- "dest_link":{
+ "dest_link": {
"r1": {
- "keepalivetimer": 60,
- "holddowntimer": 180,
+ "keepalivetimer": KEEPALIVETIMER,
+ "holddowntimer": HOLDDOWNTIMER,
}
}
}
@@ -305,28 +375,24 @@ def test_bgp_timers_functionality(request):
}
}
result = create_router_bgp(tgen, topo, deepcopy(input_dict))
- assert result is True, "Testcase {} :Failed \n Error: {}". \
- format(tc_name, result)
+ assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
# Api call to clear bgp, so timer modification would take place
- clear_bgp_and_verify(tgen, topo, 'r1')
+ clear_bgp_and_verify(tgen, topo, "r1")
# Verifying bgp timers functionality
result = verify_bgp_timers_and_functionality(tgen, topo, input_dict)
- assert result is True, "Testcase {} :Failed \n Error: {}". \
- format(tc_name, result)
+ assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
write_test_footer(tc_name)
-
-
def test_static_routes(request):
""" Test to create and verify static routes. """
tgen = get_topogen()
if BGP_CONVERGENCE is not True:
- pytest.skip('skipped because of BGP Convergence failure')
+ pytest.skip("skipped because of BGP Convergence failure")
# test case name
tc_name = request.node.name
@@ -338,17 +404,18 @@ def test_static_routes(request):
# Api call to create static routes
input_dict = {
"r1": {
- "static_routes": [{
- "network": "10.0.20.1/32",
- "no_of_ip": 9,
- "admin_distance": 100,
- "next_hop": "10.0.0.2"
- }]
+ "static_routes": [
+ {
+ "network": "10.0.20.1/32",
+ "no_of_ip": 9,
+ "admin_distance": 100,
+ "next_hop": "10.0.0.2",
+ }
+ ]
}
}
result = create_static_routes(tgen, input_dict)
- assert result is True, "Testcase {} :Failed \n Error: {}". \
- format(tc_name, result)
+ assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
# Api call to redistribute static routes
input_dict_1 = {
@@ -359,7 +426,7 @@ def test_static_routes(request):
"unicast": {
"redistribute": [
{"redist_type": "static"},
- {"redist_type": "connected"}
+ {"redist_type": "connected"},
]
}
}
@@ -369,17 +436,16 @@ def test_static_routes(request):
}
result = create_router_bgp(tgen, topo, input_dict_1)
- assert result is True, "Testcase {} :Failed \n Error: {}". \
- format(tc_name, result)
+ assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
# Verifying RIB routes
- dut = 'r3'
- protocol = 'bgp'
- next_hop = ['10.0.0.2', '10.0.0.5']
- result = verify_rib(tgen, 'ipv4', dut, input_dict, next_hop=next_hop,
- protocol=protocol)
- assert result is True, "Testcase {} :Failed \n Error: {}". \
- format(tc_name, result)
+ dut = "r3"
+ protocol = "bgp"
+ next_hop = ["10.0.0.2", "10.0.0.5"]
+ result = verify_rib(
+ tgen, "ipv4", dut, input_dict, next_hop=next_hop, protocol=protocol
+ )
+ assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
write_test_footer(tc_name)
@@ -389,7 +455,7 @@ def test_admin_distance_for_existing_static_routes(request):
tgen = get_topogen()
if BGP_CONVERGENCE is not True:
- pytest.skip('skipped because of BGP Convergence failure')
+ pytest.skip("skipped because of BGP Convergence failure")
# test case name
tc_name = request.node.name
@@ -400,21 +466,21 @@ def test_admin_distance_for_existing_static_routes(request):
input_dict = {
"r1": {
- "static_routes": [{
- "network": "10.0.20.1/32",
- "admin_distance": 10,
- "next_hop": "10.0.0.2"
- }]
+ "static_routes": [
+ {
+ "network": "10.0.20.1/32",
+ "admin_distance": 10,
+ "next_hop": "10.0.0.2",
+ }
+ ]
}
}
result = create_static_routes(tgen, input_dict)
- assert result is True, "Testcase {} :Failed \n Error: {}". \
- format(tc_name, result)
+ assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
# Verifying admin distance once modified
result = verify_admin_distance_for_static_routes(tgen, input_dict)
- assert result is True, "Testcase {} :Failed \n Error: {}". \
- format(tc_name, result)
+ assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
write_test_footer(tc_name)
@@ -424,7 +490,7 @@ def test_advertise_network_using_network_command(request):
tgen = get_topogen()
if BGP_CONVERGENCE is not True:
- pytest.skip('skipped because of BGP Convergence failure')
+ pytest.skip("skipped because of BGP Convergence failure")
# test case name
tc_name = request.node.name
@@ -441,14 +507,8 @@ def test_advertise_network_using_network_command(request):
"ipv4": {
"unicast": {
"advertise_networks": [
- {
- "network": "20.0.0.0/32",
- "no_of_network": 10
- },
- {
- "network": "30.0.0.0/32",
- "no_of_network": 10
- }
+ {"network": "20.0.0.0/32", "no_of_network": 10},
+ {"network": "30.0.0.0/32", "no_of_network": 10},
]
}
}
@@ -458,15 +518,13 @@ def test_advertise_network_using_network_command(request):
}
result = create_router_bgp(tgen, topo, input_dict)
- assert result is True, "Testcase {} :Failed \n Error: {}". \
- format(tc_name, result)
+ assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
# Verifying RIB routes
- dut = 'r2'
+ dut = "r2"
protocol = "bgp"
- result = verify_rib(tgen, 'ipv4', dut, input_dict, protocol=protocol)
- assert result is True, "Testcase {} :Failed \n Error: {}". \
- format(tc_name, result)
+ result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol)
+ assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
write_test_footer(tc_name)
@@ -479,7 +537,7 @@ def test_clear_bgp_and_verify(request):
tgen = get_topogen()
if BGP_CONVERGENCE is not True:
- pytest.skip('skipped because of BGP Convergence failure')
+ pytest.skip("skipped because of BGP Convergence failure")
# test case name
tc_name = request.node.name
@@ -489,9 +547,245 @@ def test_clear_bgp_and_verify(request):
reset_config_on_routers(tgen)
# clear ip bgp
- result = clear_bgp_and_verify(tgen, topo, 'r1')
- assert result is True, "Testcase {} :Failed \n Error: {}". \
- format(tc_name, result)
+ result = clear_bgp_and_verify(tgen, topo, "r1")
+ assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_BGP_attributes_with_vrf_default_keyword_p0(request):
+ """
+ TC_9:
+ Verify BGP functionality for default vrf with
+ "vrf default" keyword.
+ """
+
+ tc_name = request.node.name
+ write_test_header(tc_name)
+ tgen = get_topogen()
+
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ # reset_config_on_routers(tgen)
+
+ step("Configure static routes and redistribute in BGP on R3")
+ for addr_type in ADDR_TYPES:
+ input_dict = {
+ "r3": {
+ "static_routes": [
+ {
+ "network": NETWORK[addr_type][0],
+ "no_of_ip": 4,
+ "next_hop": "Null0",
+ }
+ ]
+ }
+ }
+
+ result = create_static_routes(tgen, input_dict)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ input_dict_2 = {
+ "r3": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {"unicast": {"redistribute": [{"redist_type": "static"}]}},
+ "ipv6": {"unicast": {"redistribute": [{"redist_type": "static"}]}},
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict_2)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ step(
+ "Create a route-map to match a specific prefix and modify"
+ "BGP attributes for matched prefix"
+ )
+ input_dict_2 = {
+ "r3": {
+ "prefix_lists": {
+ "ipv4": {
+ "ABC": [
+ {
+ "seqid": 10,
+ "action": "permit",
+ "network": NETWORK["ipv4"][0],
+ }
+ ]
+ },
+ "ipv6": {
+ "XYZ": [
+ {
+ "seqid": 100,
+ "action": "permit",
+ "network": NETWORK["ipv6"][0],
+ }
+ ]
+ },
+ }
+ }
+ }
+ result = create_prefix_lists(tgen, input_dict_2)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ for addr_type in ADDR_TYPES:
+ if addr_type == "ipv4":
+ pf_list = "ABC"
+ else:
+ pf_list = "XYZ"
+
+ input_dict_6 = {
+ "r3": {
+ "route_maps": {
+ "BGP_ATTR_{}".format(addr_type): [
+ {
+ "action": "permit",
+ "seq_id": 10,
+ "match": {addr_type: {"prefix_lists": pf_list}},
+ "set": {
+ "aspath": {"as_num": 500, "as_action": "prepend"},
+ "localpref": 500,
+ "origin": "egp",
+ "community": {"num": "500:500", "action": "additive"},
+ "large_community": {
+ "num": "500:500:500",
+ "action": "additive",
+ },
+ },
+ },
+ {"action": "permit", "seq_id": 20},
+ ]
+ },
+ "BGP_ATTR_{}".format(addr_type): [
+ {
+ "action": "permit",
+ "seq_id": 100,
+ "match": {addr_type: {"prefix_lists": pf_list}},
+ "set": {
+ "aspath": {"as_num": 500, "as_action": "prepend"},
+ "localpref": 500,
+ "origin": "egp",
+ "community": {"num": "500:500", "action": "additive"},
+ "large_community": {
+ "num": "500:500:500",
+ "action": "additive",
+ },
+ },
+ },
+ {"action": "permit", "seq_id": 200},
+ ],
+ }
+ }
+
+ result = create_route_maps(tgen, input_dict_6)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ step("Apply the route-map on R3 in outbound direction for peer R4")
+
+ input_dict_7 = {
+ "r3": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r4": {
+ "dest_link": {
+ "r3": {
+ "route_maps": [
+ {
+ "name": "BGP_ATTR_ipv4",
+ "direction": "out",
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r4": {
+ "dest_link": {
+ "r3": {
+ "route_maps": [
+ {
+ "name": "BGP_ATTR_ipv6",
+ "direction": "out",
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ },
+ }
+ }
+ }
+ }
+
+ result = create_router_bgp(tgen, topo, input_dict_7)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ step(
+ "verify modified attributes for specific prefix with 'vrf default'"
+ "keyword on R4"
+ )
+ for addr_type in ADDR_TYPES:
+ dut = "r4"
+ input_dict = {
+ "r3": {
+ "static_routes": [
+ {
+ "network": NETWORK[addr_type][0],
+ "vrf": "default",
+ "largeCommunity": "500:500:500",
+ }
+ ]
+ }
+ }
+
+ result = verify_bgp_rib(tgen, addr_type, dut, input_dict)
+ assert result is True, "Testcase : Failed \n Error: {}".format(tc_name, result)
+ result = verify_rib(tgen, addr_type, dut, input_dict)
+ assert result is True, "Testcase : Failed \n Error: {}".format(tc_name, result)
+
+ for addr_type in ADDR_TYPES:
+ dut = "r4"
+ input_dict = {
+ "r3": {
+ "static_routes": [
+ {
+ "network": NETWORK[addr_type][0],
+ "vrf": "default",
+ "community": "500:500",
+ }
+ ]
+ }
+ }
+
+ result = verify_bgp_rib(tgen, addr_type, dut, input_dict)
+ assert result is True, "Testcase : Failed \n Error: {}".format(tc_name, result)
+ result = verify_rib(tgen, addr_type, dut, input_dict)
+ assert result is True, "Testcase : Failed \n Error: {}".format(tc_name, result)
+
+ input_dict_4 = {"largeCommunity": "500:500:500", "community": "500:500"}
+
+ result = verify_bgp_community(
+ tgen, addr_type, dut, [NETWORK[addr_type][0]], input_dict_4
+ )
+ assert result is True, "Test case {} : Should fail \n Error: {}".format(
+ tc_name, result
+ )
write_test_footer(tc_name)
@@ -508,7 +802,7 @@ def test_bgp_with_loopback_interface(request):
tgen = get_topogen()
if BGP_CONVERGENCE is not True:
- pytest.skip('skipped because of BGP Convergence failure')
+ pytest.skip("skipped because of BGP Convergence failure")
# test case name
tc_name = request.node.name
@@ -517,79 +811,343 @@ def test_bgp_with_loopback_interface(request):
# Creating configuration from JSON
reset_config_on_routers(tgen)
- for routerN in sorted(topo['routers'].keys()):
- for bgp_neighbor in \
- topo['routers'][routerN]['bgp']['address_family']['ipv4'][
- 'unicast']['neighbor'].keys():
+ for routerN in sorted(topo["routers"].keys()):
+ for bgp_neighbor in topo["routers"][routerN]["bgp"]["address_family"]["ipv4"][
+ "unicast"
+ ]["neighbor"].keys():
# Adding ['source_link'] = 'lo' key:value pair
- topo['routers'][routerN]['bgp']['address_family']['ipv4'][
- 'unicast']['neighbor'][bgp_neighbor]["dest_link"] = {
- 'lo': {
- "source_link": "lo",
- }
- }
+ topo["routers"][routerN]["bgp"]["address_family"]["ipv4"]["unicast"][
+ "neighbor"
+ ][bgp_neighbor]["dest_link"] = {"lo": {"source_link": "lo",}}
# Creating configuration from JSON
build_config_from_json(tgen, topo)
input_dict = {
"r1": {
- "static_routes": [{
- "network": "1.0.2.17/32",
- "next_hop": "10.0.0.2"
- },
- {
- "network": "1.0.3.17/32",
- "next_hop": "10.0.0.6"
- }
+ "static_routes": [
+ {"network": "1.0.2.17/32", "next_hop": "10.0.0.2"},
+ {"network": "1.0.3.17/32", "next_hop": "10.0.0.6"},
]
},
"r2": {
- "static_routes": [{
- "network": "1.0.1.17/32",
- "next_hop": "10.0.0.1"
- },
- {
- "network": "1.0.3.17/32",
- "next_hop": "10.0.0.10"
- }
+ "static_routes": [
+ {"network": "1.0.1.17/32", "next_hop": "10.0.0.1"},
+ {"network": "1.0.3.17/32", "next_hop": "10.0.0.10"},
]
},
"r3": {
- "static_routes": [{
- "network": "1.0.1.17/32",
- "next_hop": "10.0.0.5"
- },
- {
- "network": "1.0.2.17/32",
- "next_hop": "10.0.0.9"
- },
- {
- "network": "1.0.4.17/32",
- "next_hop": "10.0.0.14"
- }
+ "static_routes": [
+ {"network": "1.0.1.17/32", "next_hop": "10.0.0.5"},
+ {"network": "1.0.2.17/32", "next_hop": "10.0.0.9"},
+ {"network": "1.0.4.17/32", "next_hop": "10.0.0.14"},
]
},
- "r4": {
- "static_routes": [{
- "network": "1.0.3.17/32",
- "next_hop": "10.0.0.13"
- }]
- }
+ "r4": {"static_routes": [{"network": "1.0.3.17/32", "next_hop": "10.0.0.13"}]},
}
result = create_static_routes(tgen, input_dict)
- assert result is True, "Testcase {} :Failed \n Error: {}". \
- format(tc_name, result)
+ assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
# Api call verify whether BGP is converged
result = verify_bgp_convergence(tgen, topo)
- assert result is True, "Testcase {} :Failed \n Error: {}". \
- format(tc_name, result)
+ assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_bgp_with_loopback_with_same_subnet_p1(request):
+ """
+ Verify routes not installed in zebra when /32 routes received
+ with loopback BGP session subnet
+ """
+
+ tgen = get_topogen()
+ if BGP_CONVERGENCE is not True:
+ pytest.skip("skipped because of BGP Convergence failure")
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+ step("Delete BGP seesion created initially")
+ input_dict_r1 = {
+ "r1": {"bgp": {"delete": True}},
+ "r2": {"bgp": {"delete": True}},
+ "r3": {"bgp": {"delete": True}},
+ "r4": {"bgp": {"delete": True}},
+ }
+ result = create_router_bgp(tgen, topo, input_dict_r1)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ step("Create BGP session over loop address")
+ topo_modify = deepcopy(topo)
+
+ for routerN in sorted(topo["routers"].keys()):
+ for addr_type in ADDR_TYPES:
+ for bgp_neighbor in topo_modify["routers"][routerN]["bgp"][
+ "address_family"
+ ][addr_type]["unicast"]["neighbor"].keys():
+
+ # Adding ['source_link'] = 'lo' key:value pair
+ topo_modify["routers"][routerN]["bgp"]["address_family"][addr_type][
+ "unicast"
+ ]["neighbor"][bgp_neighbor]["dest_link"] = {
+ "lo": {"source_link": "lo", "ebgp_multihop": 2}
+ }
+
+ result = create_router_bgp(tgen, topo_modify["routers"])
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+ step("Disable IPv6 BGP nbr from ipv4 address family")
+ raw_config = {
+ "r1": {
+ "raw_config": [
+ "router bgp {}".format(topo["routers"]["r1"]["bgp"]["local_as"]),
+ "address-family ipv4 unicast",
+ "no neighbor {} activate".format(
+ topo["routers"]["r2"]["links"]["lo"]["ipv6"].split("/")[0]
+ ),
+ "no neighbor {} activate".format(
+ topo["routers"]["r3"]["links"]["lo"]["ipv6"].split("/")[0]
+ ),
+ ]
+ },
+ "r2": {
+ "raw_config": [
+ "router bgp {}".format(topo["routers"]["r2"]["bgp"]["local_as"]),
+ "address-family ipv4 unicast",
+ "no neighbor {} activate".format(
+ topo["routers"]["r1"]["links"]["lo"]["ipv6"].split("/")[0]
+ ),
+ "no neighbor {} activate".format(
+ topo["routers"]["r3"]["links"]["lo"]["ipv6"].split("/")[0]
+ ),
+ ]
+ },
+ "r3": {
+ "raw_config": [
+ "router bgp {}".format(topo["routers"]["r3"]["bgp"]["local_as"]),
+ "address-family ipv4 unicast",
+ "no neighbor {} activate".format(
+ topo["routers"]["r1"]["links"]["lo"]["ipv6"].split("/")[0]
+ ),
+ "no neighbor {} activate".format(
+ topo["routers"]["r2"]["links"]["lo"]["ipv6"].split("/")[0]
+ ),
+ "no neighbor {} activate".format(
+ topo["routers"]["r4"]["links"]["lo"]["ipv6"].split("/")[0]
+ ),
+ ]
+ },
+ "r4": {
+ "raw_config": [
+ "router bgp {}".format(topo["routers"]["r4"]["bgp"]["local_as"]),
+ "address-family ipv4 unicast",
+ "no neighbor {} activate".format(
+ topo["routers"]["r3"]["links"]["lo"]["ipv6"].split("/")[0]
+ ),
+ ]
+ },
+ }
+
+ step("Configure kernel routes")
+ result = apply_raw_config(tgen, raw_config)
+ assert result is True, "Testcase {} : Failed Error: {}".format(tc_name, result)
+
+ r1_ipv4_lo = topo["routers"]["r1"]["links"]["lo"]["ipv4"]
+ r1_ipv6_lo = topo["routers"]["r1"]["links"]["lo"]["ipv6"]
+ r2_ipv4_lo = topo["routers"]["r2"]["links"]["lo"]["ipv4"]
+ r2_ipv6_lo = topo["routers"]["r2"]["links"]["lo"]["ipv6"]
+ r3_ipv4_lo = topo["routers"]["r3"]["links"]["lo"]["ipv4"]
+ r3_ipv6_lo = topo["routers"]["r3"]["links"]["lo"]["ipv6"]
+ r4_ipv4_lo = topo["routers"]["r4"]["links"]["lo"]["ipv4"]
+ r4_ipv6_lo = topo["routers"]["r4"]["links"]["lo"]["ipv6"]
+
+ r1_r2 = topo["routers"]["r1"]["links"]["r2"]["ipv6"].split("/")[0]
+ r2_r1 = topo["routers"]["r2"]["links"]["r1"]["ipv6"].split("/")[0]
+ r1_r3 = topo["routers"]["r1"]["links"]["r3"]["ipv6"].split("/")[0]
+ r3_r1 = topo["routers"]["r3"]["links"]["r1"]["ipv6"].split("/")[0]
+ r2_r3 = topo["routers"]["r2"]["links"]["r3"]["ipv6"].split("/")[0]
+ r3_r2 = topo["routers"]["r3"]["links"]["r2"]["ipv6"].split("/")[0]
+ r3_r4 = topo["routers"]["r3"]["links"]["r4"]["ipv6"].split("/")[0]
+ r4_r3 = topo["routers"]["r4"]["links"]["r3"]["ipv6"].split("/")[0]
+
+ r1_r2_ipv4 = topo["routers"]["r1"]["links"]["r2"]["ipv4"].split("/")[0]
+ r2_r1_ipv4 = topo["routers"]["r2"]["links"]["r1"]["ipv4"].split("/")[0]
+ r1_r3_ipv4 = topo["routers"]["r1"]["links"]["r3"]["ipv4"].split("/")[0]
+ r3_r1_ipv4 = topo["routers"]["r3"]["links"]["r1"]["ipv4"].split("/")[0]
+ r2_r3_ipv4 = topo["routers"]["r2"]["links"]["r3"]["ipv4"].split("/")[0]
+ r3_r2_ipv4 = topo["routers"]["r3"]["links"]["r2"]["ipv4"].split("/")[0]
+ r3_r4_ipv4 = topo["routers"]["r3"]["links"]["r4"]["ipv4"].split("/")[0]
+ r4_r3_ipv4 = topo["routers"]["r4"]["links"]["r3"]["ipv4"].split("/")[0]
+
+ r1_r2_intf = topo["routers"]["r1"]["links"]["r2"]["interface"]
+ r2_r1_intf = topo["routers"]["r2"]["links"]["r1"]["interface"]
+ r1_r3_intf = topo["routers"]["r1"]["links"]["r3"]["interface"]
+ r3_r1_intf = topo["routers"]["r3"]["links"]["r1"]["interface"]
+ r2_r3_intf = topo["routers"]["r2"]["links"]["r3"]["interface"]
+ r3_r2_intf = topo["routers"]["r3"]["links"]["r2"]["interface"]
+ r3_r4_intf = topo["routers"]["r3"]["links"]["r4"]["interface"]
+ r4_r3_intf = topo["routers"]["r4"]["links"]["r3"]["interface"]
+
+ ipv4_list = [
+ ("r1", r1_r2_intf, r2_ipv4_loopback),
+ ("r1", r1_r3_intf, r3_ipv4_loopback),
+ ("r2", r2_r1_intf, r1_ipv4_loopback),
+ ("r2", r2_r3_intf, r3_ipv4_loopback),
+ ("r3", r3_r1_intf, r1_ipv4_loopback),
+ ("r3", r3_r2_intf, r2_ipv4_loopback),
+ ("r3", r3_r4_intf, r4_ipv4_loopback),
+ ("r4", r4_r3_intf, r3_ipv4_loopback),
+ ]
+
+ ipv6_list = [
+ ("r1", r1_r2_intf, r2_ipv6_loopback, r2_r1),
+ ("r1", r1_r3_intf, r3_ipv6_loopback, r3_r1),
+ ("r2", r2_r1_intf, r1_ipv6_loopback, r1_r2),
+ ("r2", r2_r3_intf, r3_ipv6_loopback, r3_r2),
+ ("r3", r3_r1_intf, r1_ipv6_loopback, r1_r3),
+ ("r3", r3_r2_intf, r2_ipv6_loopback, r2_r3),
+ ("r3", r3_r4_intf, r4_ipv6_loopback, r4_r3),
+ ("r4", r4_r3_intf, r3_ipv6_loopback, r3_r4),
+ ]
+
+ for dut, intf, loop_addr in ipv4_list:
+ result = addKernelRoute(tgen, dut, intf, loop_addr)
+ assert result is True, "Testcase {}:Failed \n Error: {}".format(tc_name, result)
+
+ for dut, intf, loop_addr, next_hop in ipv6_list:
+ result = addKernelRoute(tgen, dut, intf, loop_addr, next_hop)
+ assert result is True, "Testcase {}:Failed \n Error: {}".format(tc_name, result)
+
+ step("Configure static routes")
+
+ input_dict = {
+ "r1": {
+ "static_routes": [
+ {"network": r2_ipv4_loopback, "next_hop": r2_r1_ipv4},
+ {"network": r3_ipv4_loopback, "next_hop": r3_r1_ipv4},
+ {"network": r2_ipv6_loopback, "next_hop": r2_r1},
+ {"network": r3_ipv6_loopback, "next_hop": r3_r1},
+ ]
+ },
+ "r2": {
+ "static_routes": [
+ {"network": r1_ipv4_loopback, "next_hop": r1_r2_ipv4},
+ {"network": r3_ipv4_loopback, "next_hop": r3_r2_ipv4},
+ {"network": r1_ipv6_loopback, "next_hop": r1_r2},
+ {"network": r3_ipv6_loopback, "next_hop": r3_r2},
+ ]
+ },
+ "r3": {
+ "static_routes": [
+ {"network": r1_ipv4_loopback, "next_hop": r1_r3_ipv4},
+ {"network": r2_ipv4_loopback, "next_hop": r2_r3_ipv4},
+ {"network": r4_ipv4_loopback, "next_hop": r4_r3_ipv4},
+ {"network": r1_ipv6_loopback, "next_hop": r1_r3},
+ {"network": r2_ipv6_loopback, "next_hop": r2_r3},
+ {"network": r4_ipv6_loopback, "next_hop": r4_r3},
+ ]
+ },
+ "r4": {
+ "static_routes": [
+ {"network": r3_ipv4_loopback, "next_hop": r3_r4_ipv4},
+ {"network": r3_ipv6_loopback, "next_hop": r3_r4},
+ ]
+ },
+ }
+ result = create_static_routes(tgen, input_dict)
+ assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
+
+ step("Verify BGP session convergence")
+
+ result = verify_bgp_convergence(tgen, topo_modify)
+ assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
+
+ step("Configure redistribute connected on R2 and R4")
+ input_dict_1 = {
+ "r2": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {"redistribute": [{"redist_type": "connected"}]}
+ },
+ "ipv6": {
+ "unicast": {"redistribute": [{"redist_type": "connected"}]}
+ },
+ }
+ }
+ },
+ "r4": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {"redistribute": [{"redist_type": "connected"}]}
+ },
+ "ipv6": {
+ "unicast": {"redistribute": [{"redist_type": "connected"}]}
+ },
+ }
+ }
+ },
+ }
+
+ result = create_router_bgp(tgen, topo, input_dict_1)
+ assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
+
+ step("Verify Ipv4 and Ipv6 network installed in R1 RIB but not in FIB")
+ input_dict_r1 = {
+ "r1": {
+ "static_routes": [
+ {"network": "1.0.2.17/32"},
+ {"network": "2001:db8:f::2:17/128"},
+ ]
+ }
+ }
+
+ dut = "r1"
+ protocol = "bgp"
+ for addr_type in ADDR_TYPES:
+ result = verify_rib(tgen, addr_type, dut, input_dict_r1, protocol=protocol)
+ assert result is True, "Testcase {} :Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_fib_routes(tgen, addr_type, dut, input_dict_r1, expected=False)
+ assert result is not True, "Testcase {} : Failed \n"
+ "Expected behavior: routes should not present in fib \n"
+ "Error: {}".format(tc_name, result)
+
+ step("Verify Ipv4 and Ipv6 network installed in r3 RIB but not in FIB")
+ input_dict_r3 = {
+ "r3": {
+ "static_routes": [
+ {"network": "1.0.4.17/32"},
+ {"network": "2001:db8:f::4:17/128"},
+ ]
+ }
+ }
+ dut = "r3"
+ protocol = "bgp"
+ for addr_type in ADDR_TYPES:
+ result = verify_rib(
+ tgen, addr_type, dut, input_dict_r3, protocol=protocol, fib=None
+ )
+ assert result is True, "Testcase {} :Failed \n Error: {}".format(
+ tc_name, result
+ )
+
+ result = verify_fib_routes(tgen, addr_type, dut, input_dict_r1, expected=False)
+ assert result is not True, "Testcase {} : Failed \n"
+ "Expected behavior: routes should not present in fib \n"
+ "Error: {}".format(tc_name, result)
write_test_footer(tc_name)
-if __name__ == '__main__':
+if __name__ == "__main__":
args = ["-s"] + sys.argv[1:]
sys.exit(pytest.main(args))