# Save the Current Working Directory to find configuration files.
CWD = os.path.dirname(os.path.realpath(__file__))
-sys.path.append(os.path.join(CWD, '../'))
-sys.path.append(os.path.join(CWD, '../lib/'))
+sys.path.append(os.path.join(CWD, "../"))
+sys.path.append(os.path.join(CWD, "../lib/"))
# pylint: disable=C0413
# Import topogen and topotest helpers
from lib.bgp import (
verify_bgp_convergence, create_router_bgp,
clear_bgp_and_verify, verify_bgp_rib
+ start_topology,
+ write_test_header,
+ write_test_footer,
+ reset_config_on_routers,
+ verify_rib,
+ create_static_routes,
+ create_route_maps,
+ check_address_types,
+ step,
+)
+from lib.topolog import logger
+from lib.bgp import (
+ verify_bgp_convergence,
+ create_router_bgp,
+ clear_bgp_and_verify,
+ verify_bgp_rib,
)
from lib.topojson import build_topo_from_json, build_config_from_json
# Reading the data from JSON File for topology creation
jsonFile = "{}/bgp_as_allow_in.json".format(CWD)
try:
- with open(jsonFile, 'r') as topoJson:
+ with open(jsonFile, "r") as topoJson:
topo = json.load(topoJson)
except IOError:
assert False, "Could not read file {}".format(jsonFile)
testsuite_run_time = time.asctime(time.localtime(time.time()))
logger.info("Testsuite start time: {}".format(testsuite_run_time))
- logger.info("="*40)
+ logger.info("=" * 40)
logger.info("Running setup_module to create topology")
# Api call verify whether BGP is converged
BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo)
- assert BGP_CONVERGENCE is True, ("setup_module :Failed \n Error:"
- " {}".format(BGP_CONVERGENCE))
+ assert BGP_CONVERGENCE is True, "setup_module :Failed \n Error:" " {}".format(
+ BGP_CONVERGENCE
+ )
logger.info("Running setup_module() done")
# Stop toplogy and Remove tmp files
tgen.stop_topology()
- logger.info("Testsuite end time: {}".
- format(time.asctime(time.localtime(time.time()))))
- logger.info("="*40)
+ logger.info(
+ "Testsuite end time: {}".format(time.asctime(time.localtime(time.time())))
+ )
+ logger.info("=" * 40)
#####################################################
input_dict_4 = {
"r1": {
"static_routes": [
- {
- "network": NETWORK[addr_type],
- "next_hop": NEXT_HOP_IP[addr_type]
- }
+ {"network": NETWORK[addr_type], "next_hop": NEXT_HOP_IP[addr_type]}
]
}
}
logger.info("Configure static routes")
result = create_static_routes(tgen, input_dict_4)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ tc_name, result
+ )
step("configure redistribute static in Router BGP in R1")
"bgp": {
"address_family": {
addr_type: {
- "unicast": {
- "redistribute": [{
- "redist_type": "static"
- }]
- }
+ "unicast": {"redistribute": [{"redist_type": "static"}]}
}
}
}
}
result = create_router_bgp(tgen, topo, input_dict_2)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
- step('Check BGP table of router R3 using "sh bgp ipv4" and "sh bgp '
- 'ipv6" command.')
- step("We should not see prefix advertised from R1 in R3's BGP "
- "table without allowas-in.")
- logger.info("Verifying %s routes on r3, route should not be present",
- addr_type)
- result = verify_rib(tgen, addr_type, dut, input_dict_4,
- next_hop=NEXT_HOP_IP[addr_type],
- protocol=protocol, expected=False)
+ tc_name, result
+ )
+ step(
+ 'Check BGP table of router R3 using "sh bgp ipv4" and "sh bgp '
+ 'ipv6" command.'
+ )
+ step(
+ "We should not see prefix advertised from R1 in R3's BGP "
+ "table without allowas-in."
+ )
+ logger.info("Verifying %s routes on r3, route should not be present", addr_type)
+ result = verify_rib(
+ tgen,
+ addr_type,
+ dut,
+ input_dict_4,
+ next_hop=NEXT_HOP_IP[addr_type],
+ protocol=protocol,
+ expected=False,
+ )
assert result is not True, "Testcase {} : Failed \n"
"Expected behavior: routes should not present in rib \n"
"Error: {}".format(tc_name, result)
- step('Configure allowas-in on R3 for R2.')
+ step("Configure allowas-in on R3 for R2.")
step("We should see the prefix advertised from R1 in R3's BGP table.")
# Api call to enable allowas-in in bgp process.
input_dict_1 = {
"r3": {
- "bgp": {
- "address_family": {
- addr_type: {
- "unicast": {
- "neighbor": {
- "r2": {
- "dest_link": {
- "r3": {
- "allowas-in": {
- "number_occurences": 1
- }
- }
- }
- }
- }
- }
- }
- }
- }
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r3": {
+ "allowas-in": {"number_occurences": 1}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
}
}
result = create_router_bgp(tgen, topo, input_dict_1)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
- result = verify_rib(tgen, addr_type, dut, input_dict_4,
- protocol=protocol)
+ tc_name, result
+ )
+ result = verify_rib(tgen, addr_type, dut, input_dict_4, protocol=protocol)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ tc_name, result
+ )
write_test_footer(tc_name)
input_dict_4 = {
"r1": {
"static_routes": [
- {
- "network": NETWORK[addr_type],
- "next_hop": NEXT_HOP_IP[addr_type]
- }
+ {"network": NETWORK[addr_type], "next_hop": NEXT_HOP_IP[addr_type]}
]
}
}
logger.info("Configure static routes")
result = create_static_routes(tgen, input_dict_4)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ tc_name, result
+ )
step("configure redistribute static in Router BGP in R1")
"bgp": {
"address_family": {
addr_type: {
- "unicast": {
- "redistribute": [{
- "redist_type": "static"
- }]
- }
+ "unicast": {"redistribute": [{"redist_type": "static"}]}
}
}
}
}
result = create_router_bgp(tgen, topo, input_dict_2)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ tc_name, result
+ )
- step('Configure allowas-in on R3 for R2 under IPv4 addr-family only')
+ step("Configure allowas-in on R3 for R2 under IPv4 addr-family only")
# Api call to enable allowas-in in bgp process.
input_dict_1 = {
"r3": {
- "bgp": {
- "address_family": {
- 'ipv4': {
- "unicast": {
- "neighbor": {
- "r2": {
- "dest_link": {
- "r3": {
- "allowas-in": {
- "number_occurences": 1
- }
- }
- }
- }
- }
- }
- }
- }
- }
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r3": {"allowas-in": {"number_occurences": 1}}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
}
}
result = create_router_bgp(tgen, topo, input_dict_1)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
static_route_ipv4 = {
- "r1": {
- "static_routes": [
- {
- "network": NETWORK['ipv4'],
- "next_hop": NEXT_HOP_IP['ipv4']
- }
- ]
- }
+ "r1": {
+ "static_routes": [
+ {"network": NETWORK["ipv4"], "next_hop": NEXT_HOP_IP["ipv4"]}
+ ]
+ }
}
static_route_ipv6 = {
- "r1": {
- "static_routes": [
- {
- "network": NETWORK['ipv6'],
- "next_hop": NEXT_HOP_IP['ipv6']
- }
- ]
- }
+ "r1": {
+ "static_routes": [
+ {"network": NETWORK["ipv6"], "next_hop": NEXT_HOP_IP["ipv6"]}
+ ]
+ }
}
- step("We should see R1 advertised prefix only in IPv4 AFI "
- "not in IPv6 AFI.")
- result = verify_rib(tgen, 'ipv4', dut, static_route_ipv4,
- protocol=protocol)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
- result = verify_rib(tgen, 'ipv6', dut, static_route_ipv6,
- protocol=protocol, expected=False)
+ step("We should see R1 advertised prefix only in IPv4 AFI " "not in IPv6 AFI.")
+ result = verify_rib(tgen, "ipv4", dut, static_route_ipv4, protocol=protocol)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+ result = verify_rib(
+ tgen, "ipv6", dut, static_route_ipv6, protocol=protocol, expected=False
+ )
assert result is not True, "Testcase {} : Failed \n"
"Expected behavior: routes are should not be present in ipv6 rib\n"
" Error: {}".format(tc_name, result)
step("Repeat the same test for IPv6 AFI.")
- step('Configure allowas-in on R3 for R2 under IPv6 addr-family only')
+ step("Configure allowas-in on R3 for R2 under IPv6 addr-family only")
# Api call to enable allowas-in in bgp process.
input_dict_1 = {
"r3": {
- "bgp": {
- "address_family": {
- 'ipv4': {
- "unicast": {
- "neighbor": {
- "r2": {
- "dest_link": {
- "r3": {
- "allowas-in": {
- "number_occurences": 2,
- "delete": True
- }
- }
- }
- }
- }
- }
- },
- 'ipv6': {
- "unicast": {
- "neighbor": {
- "r2": {
- "dest_link": {
- "r3": {
- "allowas-in": {
- "number_occurences": 2
- }
- }
- }
- }
- }
- }
- }
- }
- }
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r3": {
+ "allowas-in": {
+ "number_occurences": 2,
+ "delete": True,
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "ipv6": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r3": {"allowas-in": {"number_occurences": 2}}
+ }
+ }
+ }
+ }
+ },
+ }
+ }
}
}
result = create_router_bgp(tgen, topo, input_dict_1)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
- step("We should see R1 advertised prefix only in IPv6 AFI "
- "not in IPv4 AFI.")
- result = verify_rib(tgen, 'ipv4', dut, static_route_ipv4,
- protocol=protocol, expected=False)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+ step("We should see R1 advertised prefix only in IPv6 AFI " "not in IPv4 AFI.")
+ result = verify_rib(
+ tgen, "ipv4", dut, static_route_ipv4, protocol=protocol, expected=False
+ )
assert result is not True, "Testcase {} : Failed \n"
"Expected behavior: routes should not be present in ipv4 rib\n"
" Error: {}".format(tc_name, result)
- result = verify_rib(tgen, 'ipv6', dut, static_route_ipv6,
- protocol=protocol)
- assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ result = verify_rib(tgen, "ipv6", dut, static_route_ipv6, protocol=protocol)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
write_test_footer(tc_name)
static_routes = {
"r1": {
"static_routes": [
- {
- "network": NETWORK[addr_type],
- "next_hop": NEXT_HOP_IP[addr_type]
- }
+ {"network": NETWORK[addr_type], "next_hop": NEXT_HOP_IP[addr_type]}
]
}
}
logger.info("Configure static routes")
result = create_static_routes(tgen, static_routes)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ tc_name, result
+ )
step("configure redistribute static in Router BGP in R1")
"bgp": {
"address_family": {
addr_type: {
- "unicast": {
- "redistribute": [{
- "redist_type": "static"
- }]
- }
+ "unicast": {"redistribute": [{"redist_type": "static"}]}
}
}
}
}
result = create_router_bgp(tgen, topo, input_dict_2)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ tc_name, result
+ )
- step('Configure a route-map on R1 to prepend AS 4 times.')
+ step("Configure a route-map on R1 to prepend AS 4 times.")
for addr_type in ADDR_TYPES:
input_dict_4 = {
"r1": {
"route_maps": {
- "ASP_{}".format(addr_type): [{
- "action": "permit",
- "set": {
- "aspath": {
- "as_num": "200 200 200 200",
- "as_action": "prepend"
- }
+ "ASP_{}".format(addr_type): [
+ {
+ "action": "permit",
+ "set": {
+ "path": {
+ "as_num": "200 200 200 200",
+ "as_action": "prepend",
+ }
+ },
}
- }]
+ ]
}
}
}
result = create_route_maps(tgen, input_dict_4)
- assert result is True, 'Testcase {} : Failed \n Error: {}'.format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
step("configure route map in out direction on R1")
# Configure neighbor for route map
"r1": {
"route_maps": [
{
- "name":
- "ASP_{}".format(
- addr_type),
- "direction": "out"
+ "name": "ASP_{}".format(
+ addr_type
+ ),
+ "direction": "out",
}
]
}
result = create_router_bgp(tgen, topo, input_dict_7)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ tc_name, result
+ )
for addr_type in ADDR_TYPES:
step('Configure "allowas-in 4" on R3 for R2.')
# Api call to enable allowas-in in bgp process.
input_dict_1 = {
"r3": {
- "bgp": {
- "address_family": {
- addr_type: {
- "unicast": {
- "neighbor": {
- "r2": {
- "dest_link": {
- "r3": {
- "allowas-in": {
- "number_occurences": 4
- }
- }
- }
- }
- }
- }
- }
- }
- }
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r3": {
+ "allowas-in": {"number_occurences": 4}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
}
}
result = create_router_bgp(tgen, topo, input_dict_1)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
- result = verify_rib(tgen, addr_type, dut, static_routes,
- protocol=protocol, expected=False)
+ tc_name, result
+ )
+ result = verify_rib(
+ tgen, addr_type, dut, static_routes, protocol=protocol, expected=False
+ )
assert result is not True, "Testcase {} : Failed \n "
"Expected behavior: routes are should not be present in rib\n"
- "Error: {}".format(
- tc_name, result)
+ "Error: {}".format(tc_name, result)
for addr_type in ADDR_TYPES:
step('Configure "allowas-in 5" on R3 for R2.')
input_dict_1 = {
"r3": {
- "bgp": {
- "address_family": {
- addr_type: {
- "unicast": {
- "neighbor": {
- "r2": {
- "dest_link": {
- "r3": {
- "allowas-in": {
- "number_occurences": 5
- }
- }
- }
- }
- }
- }
- }
- }
- }
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r3": {
+ "allowas-in": {"number_occurences": 5}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
}
}
result = create_router_bgp(tgen, topo, input_dict_1)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ tc_name, result
+ )
static_routes = {
"r1": {
"static_routes": [
- {
- "network": NETWORK[addr_type],
- "next_hop": NEXT_HOP_IP[addr_type]
- }
+ {"network": NETWORK[addr_type], "next_hop": NEXT_HOP_IP[addr_type]}
]
}
}
- result = verify_rib(tgen, addr_type, dut, static_routes,
- protocol=protocol)
+ result = verify_rib(tgen, addr_type, dut, static_routes, protocol=protocol)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ tc_name, result
+ )
write_test_footer(tc_name)
static_routes = {
"r1": {
"static_routes": [
- {
- "network": NETWORK[addr_type],
- "next_hop": NEXT_HOP_IP[addr_type]
- }
+ {"network": NETWORK[addr_type], "next_hop": NEXT_HOP_IP[addr_type]}
]
}
}
logger.info("Configure static routes")
result = create_static_routes(tgen, static_routes)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ tc_name, result
+ )
step("configure redistribute static in Router BGP in R1")
"bgp": {
"address_family": {
addr_type: {
- "unicast": {
- "redistribute": [{
- "redist_type": "static"
- }]
- }
+ "unicast": {"redistribute": [{"redist_type": "static"}]}
}
}
}
}
result = create_router_bgp(tgen, topo, input_dict_2)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ tc_name, result
+ )
- step('Configure a route-map on R2 to prepend AS 2 times.')
+ step("Configure a route-map on R2 to prepend AS 2 times.")
for addr_type in ADDR_TYPES:
input_dict_4 = {
"r2": {
"route_maps": {
- "ASP_{}".format(addr_type): [{
- "action": "permit",
- "set": {
- "aspath": {
- "as_num": "200 200",
- "as_action": "prepend"
- }
+ "ASP_{}".format(addr_type): [
+ {
+ "action": "permit",
+ "set": {
+ "path": {"as_num": "200 200", "as_action": "prepend"}
+ },
}
- }]
+ ]
}
}
}
result = create_route_maps(tgen, input_dict_4)
- assert result is True, 'Testcase {} : Failed \n Error: {}'.format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
step("configure route map in out direction on R2")
# Configure neighbor for route map
"r2": {
"route_maps": [
{
- "name":
- "ASP_{}".format(
- addr_type),
- "direction": "out"}
+ "name": "ASP_{}".format(
+ addr_type
+ ),
+ "direction": "out",
+ }
]
}
}
result = create_router_bgp(tgen, topo, input_dict_7)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ tc_name, result
+ )
step('Configure "allowas-in 3" on R3 for R1.')
input_dict_1 = {
"r3": {
- "bgp": {
- "address_family": {
- addr_type: {
- "unicast": {
- "neighbor": {
- "r2": {
- "dest_link": {
- "r3": {
- "allowas-in": {
- "number_occurences": 3
- }
- }
- }
- }
- }
- }
- }
- }
- }
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r3": {
+ "allowas-in": {"number_occurences": 3}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
}
}
result = create_router_bgp(tgen, topo, input_dict_1)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ tc_name, result
+ )
input_dict_1 = {
"r4": {
- "bgp": {
- "address_family": {
- addr_type: {
- "unicast": {
- "neighbor": {
- "r3": {
- "dest_link": {
- "r4": {
- "allowas-in": {
- "number_occurences": 3
- }
- }
- }
- }
- }
- }
- }
- }
- }
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "neighbor": {
+ "r3": {
+ "dest_link": {
+ "r4": {
+ "allowas-in": {"number_occurences": 3}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
}
}
result = create_router_bgp(tgen, topo, input_dict_1)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ tc_name, result
+ )
static_routes = {
"r1": {
"static_routes": [
- {
- "network": NETWORK[addr_type],
- "next_hop": NEXT_HOP_IP[addr_type]
- }
+ {"network": NETWORK[addr_type], "next_hop": NEXT_HOP_IP[addr_type]}
]
}
}
dut = "r4"
- aspath = "100 200 200 200"
- result = verify_bgp_rib(tgen, addr_type, dut, static_routes,
- aspath=aspath)
+ path = "100 200 200 200"
+ result = verify_bgp_rib(tgen, addr_type, dut, static_routes, aspath=path)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ tc_name, result
+ )
write_test_footer(tc_name)
static_routes = {
"r1": {
"static_routes": [
- {
- "network": NETWORK[addr_type],
- "next_hop": NEXT_HOP_IP[addr_type]
- }
+ {"network": NETWORK[addr_type], "next_hop": NEXT_HOP_IP[addr_type]}
]
}
}
logger.info("Configure static routes")
result = create_static_routes(tgen, static_routes)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ tc_name, result
+ )
step("configure redistribute static in Router BGP in R1")
"bgp": {
"address_family": {
addr_type: {
- "unicast": {
- "redistribute": [{
- "redist_type": "static"
- }]
- }
+ "unicast": {"redistribute": [{"redist_type": "static"}]}
}
}
}
}
result = create_router_bgp(tgen, topo, input_dict_2)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ tc_name, result
+ )
- step('Configure a route-map on R2 to prepend AS 2 times.')
+ step("Configure a route-map on R2 to prepend AS 2 times.")
for addr_type in ADDR_TYPES:
input_dict_4 = {
"r2": {
"route_maps": {
- "ASP_{}".format(addr_type): [{
- "action": "permit",
- "set": {
- "aspath": {
- "as_num": "200 200",
- "as_action": "prepend"
- }
+ "ASP_{}".format(addr_type): [
+ {
+ "action": "permit",
+ "set": {
+ "path": {"as_num": "200 200", "as_action": "prepend"}
+ },
}
- }]
+ ]
}
}
}
result = create_route_maps(tgen, input_dict_4)
- assert result is True, 'Testcase {} : Failed \n Error: {}'.format(
- tc_name, result)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result
+ )
step("configure route map in out direction on R2")
# Configure neighbor for route map
"r2": {
"route_maps": [
{
- "name":
- "ASP_{}".format(
- addr_type),
- "direction": "out"}
+ "name": "ASP_{}".format(
+ addr_type
+ ),
+ "direction": "out",
+ }
]
}
}
result = create_router_bgp(tgen, topo, input_dict_7)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ tc_name, result
+ )
for addr_type in ADDR_TYPES:
step('Configure "allowas-in 3" on R3 for R1.')
input_dict_1 = {
"r3": {
- "bgp": {
- "address_family": {
- addr_type: {
- "unicast": {
- "neighbor": {
- "r2": {
- "dest_link": {
- "r3": {
- "allowas-in": {
- "number_occurences": 3
- }
- }
- }
- }
- }
- }
- }
- }
- }
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r3": {
+ "allowas-in": {"number_occurences": 3}
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
}
}
result = create_router_bgp(tgen, topo, input_dict_1)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ tc_name, result
+ )
static_routes = {
"r1": {
"static_routes": [
- {
- "network": NETWORK[addr_type],
- "next_hop": NEXT_HOP_IP[addr_type]
- }
+ {"network": NETWORK[addr_type], "next_hop": NEXT_HOP_IP[addr_type]}
]
}
}
dut = "r5"
- aspath = "200 100 200 200 200"
- result = verify_bgp_rib(tgen, addr_type, dut, static_routes,
- aspath=aspath)
+ path = "200 100 200 200 200"
+ result = verify_bgp_rib(tgen, addr_type, dut, static_routes, aspath=path)
assert result is True, "Testcase {} : Failed \n Error: {}".format(
- tc_name, result)
+ tc_name, result
+ )
write_test_footer(tc_name)
logger.debug("Exiting lib API: verify_bgp_rib()")
return True
+
+
+@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2)
+def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None):
+ """
+ This API is to verify whether bgp rib has any
+ matching route for a nexthop.
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `dut`: input dut router name
+ * `addr_type` : ip type ipv4/ipv6
+ * `input_dict` : input dict, has details of static routes
+ * `next_hop`[optional]: next_hop which needs to be verified,
+ default = static
+ * 'aspath'[optional]: aspath which needs to be verified
+
+ Usage
+ -----
+ dut = 'r1'
+ next_hop = "192.168.1.10"
+ input_dict = topo['routers']
+ aspath = "100 200 300"
+ result = verify_bgp_rib(tgen, addr_type, dut, tgen, input_dict,
+ next_hop, aspath)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: verify_bgp_rib()")
+
+ router_list = tgen.routers()
+ additional_nexthops_in_required_nhs = []
+ list1 = []
+ list2 = []
+ for routerInput in input_dict.keys():
+ for router, rnode in router_list.iteritems():
+ if router != dut:
+ continue
+
+ # Verifying RIB routes
+ command = "show bgp"
+
+ # Static routes
+ sleep(2)
+ logger.info("Checking router {} BGP RIB:".format(dut))
+
+ if "static_routes" in input_dict[routerInput]:
+ static_routes = input_dict[routerInput]["static_routes"]
+
+ for static_route in static_routes:
+ found_routes = []
+ missing_routes = []
+ st_found = False
+ nh_found = False
+ vrf = static_route.setdefault("vrf", None)
+ if vrf:
+ cmd = "{} vrf {} {}".format(command, vrf, addr_type)
+
+ else:
+ cmd = "{} {}".format(command, addr_type)
+
+ cmd = "{} json".format(cmd)
+
+ rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ # Verifying output dictionary rib_routes_json is not empty
+ if bool(rib_routes_json) == False:
+ errormsg = "No route found in rib of router {}..".format(router)
+ return errormsg
+
+ network = static_route["network"]
+
+ if "no_of_ip" in static_route:
+ no_of_ip = static_route["no_of_ip"]
+ else:
+ no_of_ip = 1
+
+ # Generating IPs for verification
+ ip_list = generate_ips(network, no_of_ip)
+
+ for st_rt in ip_list:
+ st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
+
+ _addr_type = validate_ip_address(st_rt)
+ if _addr_type != addr_type:
+ continue
+
+ if st_rt in rib_routes_json["routes"]:
+ st_found = True
+ found_routes.append(st_rt)
+
+ if next_hop:
+ if not isinstance(next_hop, list):
+ next_hop = [next_hop]
+ list1 = next_hop
+ found_hops = [
+ rib_r["ip"]
+ for rib_r in rib_routes_json["routes"][st_rt][0][
+ "nexthops"
+ ]
+ ]
+ list2 = found_hops
+ missing_list_of_nexthops = set(list2).difference(list1)
+ additional_nexthops_in_required_nhs = set(
+ list1
+ ).difference(list2)
+
+ if list2:
+ if additional_nexthops_in_required_nhs:
+ logger.info(
+ "Missing nexthop %s for route"
+ " %s in RIB of router %s\n",
+ additional_nexthops_in_required_nhs,
+ st_rt,
+ dut,
+ )
+ errormsg = (
+ "Nexthop {} is Missing for "
+ "route {} in RIB of router {}\n".format(
+ additional_nexthops_in_required_nhs,
+ st_rt,
+ dut,
+ )
+ )
+ return errormsg
+ else:
+ nh_found = True
+ if aspath:
+ found_paths = rib_routes_json["routes"][st_rt][0][
+ "path"
+ ]
+ if aspath == found_paths:
+ aspath_found = True
+ logger.info(
+ "Found AS path {} for route"
+ " {} in RIB of router "
+ "{}\n".format(aspath, st_rt, dut)
+ )
+ else:
+ errormsg = (
+ "AS Path {} is missing for route"
+ "for route {} in RIB of router {}\n".format(
+ aspath, st_rt, dut
+ )
+ )
+ return errormsg
+
+ else:
+ missing_routes.append(st_rt)
+
+ if nh_found:
+ logger.info(
+ "Found next_hop {} for all bgp"
+ " routes in RIB of"
+ " router {}\n".format(next_hop, router)
+ )
+
+ if len(missing_routes) > 0:
+ errormsg = (
+ "Missing route in RIB of router {}, "
+ "routes: {}\n".format(dut, missing_routes)
+ )
+ return errormsg
+
+ if found_routes:
+ logger.info(
+ "Verified routes in router {} BGP RIB, "
+ "found routes are: {} \n".format(dut, found_routes)
+ )
+ continue
+
+ if "bgp" not in input_dict[routerInput]:
+ continue
+
+ # Advertise networks
+ bgp_data_list = input_dict[routerInput]["bgp"]
+
+ if type(bgp_data_list) is not list:
+ bgp_data_list = [bgp_data_list]
+
+ for bgp_data in bgp_data_list:
+ vrf_id = bgp_data.setdefault("vrf", None)
+ if vrf_id:
+ cmd = "{} vrf {} {}".format(command, vrf_id, addr_type)
+ else:
+ cmd = "{} {}".format(command, addr_type)
+
+ cmd = "{} json".format(cmd)
+
+ rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ # Verifying output dictionary rib_routes_json is not empty
+ if bool(rib_routes_json) == False:
+ errormsg = "No route found in rib of router {}..".format(router)
+ return errormsg
+
+ bgp_net_advertise = bgp_data["address_family"][addr_type]["unicast"]
+ advertise_network = bgp_net_advertise.setdefault(
+ "advertise_networks", []
+ )
+
+ for advertise_network_dict in advertise_network:
+ found_routes = []
+ missing_routes = []
+ found = False
+
+ network = advertise_network_dict["network"]
+
+ if "no_of_network" in advertise_network_dict:
+ no_of_network = advertise_network_dict["no_of_network"]
+ else:
+ no_of_network = 1
+
+ # Generating IPs for verification
+ ip_list = generate_ips(network, no_of_network)
+
+ for st_rt in ip_list:
+ st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
+
+ _addr_type = validate_ip_address(st_rt)
+ if _addr_type != addr_type:
+ continue
+
+ if st_rt in rib_routes_json["routes"]:
+ found = True
+ found_routes.append(st_rt)
+ else:
+ found = False
+ missing_routes.append(st_rt)
+
+ if len(missing_routes) > 0:
+ errormsg = (
+ "Missing route in BGP RIB of router {},"
+ " are: {}\n".format(dut, missing_routes)
+ )
+ return errormsg
+
+ if found_routes:
+ logger.info(
+ "Verified routes in router {} BGP RIB, found "
+ "routes are: {}\n".format(dut, found_routes)
+ )
+
+ logger.debug("Exiting lib API: verify_bgp_rib()")
+ return True