--- /dev/null
+#!/usr/bin/python
+
+#
+# Copyright (c) 2020 by VMware, Inc. ("VMware")
+# Used Copyright (c) 2018 by Network Device Education Foundation,
+# Inc. ("NetDEF") in this file.
+#
+# Permission to use, copy, modify, and/or distribute this software
+# for any purpose with or without fee is hereby granted, provided
+# that the above copyright notice and this permission notice appear
+# in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
+# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
+# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
+# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
+# OF THIS SOFTWARE.
+#
+
+"""
+Following tests are covered to test bgp allowas-in functionality:
+
+- Verify that routes coming from same AS are accepted only when
+ '"allowas-in" is configuerd.
+- Verify that "allowas-in" feature works per address-family/VRF
+ 'basis and doesn't impact the other AFIs.
+- Verify that the if number of occurrences of AS number in path is
+ 'more than the configured allowas-in value then we do not accept
+ 'the route.
+- Verify that when we advertise a network, learned from the same AS
+ 'via allowas-in command, to an iBGP neighbor we see multiple
+ 'occurrences.
+- Verify that when we advertise a network, learned from the same AS
+ 'via allowas-in command, to an eBGP neighbor we see multiple
+ 'occurrences of our own AS based on configured value+1.
+"""
+
+import os
+import sys
+import time
+import json
+import pytest
+
+# Save the Current Working Directory to find configuration files.
+CWD = os.path.dirname(os.path.realpath(__file__))
+sys.path.append(os.path.join(CWD, '../'))
+sys.path.append(os.path.join(CWD, '../lib/'))
+
+# pylint: disable=C0413
+# Import topogen and topotest helpers
+from mininet.topo import Topo
+from lib.topogen import Topogen, get_topogen
+
+# Import topoJson from lib, to create topology and initial configuration
+from lib.common_config import (
+ start_topology, write_test_header,
+ write_test_footer, reset_config_on_routers,
+ verify_rib, create_static_routes,
+ create_route_maps, check_address_types, step,
+)
+from lib.topolog import logger
+from lib.bgp import (
+ verify_bgp_convergence, create_router_bgp,
+ clear_bgp_and_verify, verify_bgp_rib
+)
+from lib.topojson import build_topo_from_json, build_config_from_json
+
+# Reading the data from JSON File for topology creation
+jsonFile = "{}/bgp_as_allow_in.json".format(CWD)
+try:
+ with open(jsonFile, 'r') as topoJson:
+ topo = json.load(topoJson)
+except IOError:
+ assert False, "Could not read file {}".format(jsonFile)
+
+# Global variables
+BGP_CONVERGENCE = False
+ADDR_TYPES = check_address_types()
+NETWORK = {"ipv4": "2.2.2.2/32", "ipv6": "22:22::2/128"}
+NEXT_HOP_IP = {"ipv4": "Null0", "ipv6": "Null0"}
+
+
+class BGPALLOWASIN(Topo):
+ """
+ Test BGPALLOWASIN - topology 1
+
+ * `Topo`: Topology object
+ """
+
+ def build(self, *_args, **_opts):
+ """Build function"""
+ tgen = get_topogen(self)
+
+ # Building topology from json file
+ build_topo_from_json(tgen, topo)
+
+
+def setup_module(mod):
+ """
+ Sets up the pytest environment
+
+ * `mod`: module name
+ """
+
+ testsuite_run_time = time.asctime(time.localtime(time.time()))
+ logger.info("Testsuite start time: {}".format(testsuite_run_time))
+ logger.info("="*40)
+
+ logger.info("Running setup_module to create topology")
+
+ # This function initiates the topology build with Topogen...
+ tgen = Topogen(BGPALLOWASIN, mod.__name__)
+ # ... and here it calls Mininet initialization functions.
+
+ # Starting topology, create tmp files which are loaded to routers
+ # to start deamons and then start routers
+ start_topology(tgen)
+
+ # Creating configuration from JSON
+ build_config_from_json(tgen, topo)
+
+ # Checking BGP convergence
+ global BGP_CONVERGENCE
+ global ADDR_TYPES
+
+ # Api call verify whether BGP is converged
+ BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo)
+ assert BGP_CONVERGENCE is True, ("setup_module :Failed \n Error:"
+ " {}".format(BGP_CONVERGENCE))
+
+ logger.info("Running setup_module() done")
+
+
+def teardown_module(mod):
+ """
+ Teardown the pytest environment
+
+ * `mod`: module name
+ """
+
+ logger.info("Running teardown_module to delete topology")
+
+ tgen = get_topogen()
+
+ # Stop toplogy and Remove tmp files
+ tgen.stop_topology()
+
+ logger.info("Testsuite end time: {}".
+ format(time.asctime(time.localtime(time.time()))))
+ logger.info("="*40)
+
+
+#####################################################
+#
+# Tests starting
+#
+#####################################################
+
+def test_bgp_allowas_in_p0(request):
+ """
+ Verify that routes coming from same AS are accepted only when
+ "allowas-in" is configuerd.
+
+ """
+
+ tc_name = request.node.name
+ write_test_header(tc_name)
+ tgen = get_topogen()
+ reset_config_on_routers(tgen)
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ step("Advertise prefix 2.2.2.2/32 from Router-1(AS-200).")
+ step("Advertise an ipv6 prefix 22:22::2/128 from Router-1(AS-200).")
+ # configure static routes
+ dut = "r3"
+ protocol = "bgp"
+
+ for addr_type in ADDR_TYPES:
+ # Enable static routes
+ input_dict_4 = {
+ "r1": {
+ "static_routes": [
+ {
+ "network": NETWORK[addr_type],
+ "next_hop": NEXT_HOP_IP[addr_type]
+ }
+ ]
+ }
+ }
+
+ logger.info("Configure static routes")
+ result = create_static_routes(tgen, input_dict_4)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ step("configure redistribute static in Router BGP in R1")
+
+ input_dict_2 = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "redistribute": [{
+ "redist_type": "static"
+ }]
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict_2)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+ step('Check BGP table of router R3 using "sh bgp ipv4" and "sh bgp '
+ 'ipv6" command.')
+ step("We should not see prefix advertised from R1 in R3's BGP "
+ "table without allowas-in.")
+ logger.info("Verifying %s routes on r3, route should not be present",
+ addr_type)
+ result = verify_rib(tgen, addr_type, dut, input_dict_4,
+ next_hop=NEXT_HOP_IP[addr_type],
+ protocol=protocol, expected=False)
+ assert result is not True, "Testcase {} : Failed \n"
+ "Expected behavior: routes should not present in rib \n"
+ "Error: {}".format(tc_name, result)
+
+ step('Configure allowas-in on R3 for R2.')
+ step("We should see the prefix advertised from R1 in R3's BGP table.")
+ # Api call to enable allowas-in in bgp process.
+ input_dict_1 = {
+ "r3": {
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r3": {
+ "allowas-in": {
+ "number_occurences": 1
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict_1)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+ result = verify_rib(tgen, addr_type, dut, input_dict_4,
+ protocol=protocol)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_bgp_allowas_in_per_addr_family_p0(request):
+ """
+ Verify that "allowas-in" feature works per address-family/VRF
+ basis and doesn't impact the other AFIs.
+
+ """
+
+ # This test is applicable only for dual stack.
+ if "ipv4" not in ADDR_TYPES or "ipv6" not in ADDR_TYPES:
+ pytest.skip("NOT APPLICABLE")
+
+ tc_name = request.node.name
+ write_test_header(tc_name)
+ tgen = get_topogen()
+ reset_config_on_routers(tgen)
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ step("Advertise prefix 2.2.2.2/32 from Router-1(AS-200).")
+ step("Advertise an ipv6 prefix 22:22::2/128 from Router-1(AS-200).")
+ # configure static routes routes
+ dut = "r3"
+ protocol = "bgp"
+
+ for addr_type in ADDR_TYPES:
+ # Enable static routes
+ input_dict_4 = {
+ "r1": {
+ "static_routes": [
+ {
+ "network": NETWORK[addr_type],
+ "next_hop": NEXT_HOP_IP[addr_type]
+ }
+ ]
+ }
+ }
+
+ logger.info("Configure static routes")
+ result = create_static_routes(tgen, input_dict_4)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ step("configure redistribute static in Router BGP in R1")
+
+ input_dict_2 = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "redistribute": [{
+ "redist_type": "static"
+ }]
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict_2)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ step('Configure allowas-in on R3 for R2 under IPv4 addr-family only')
+ # Api call to enable allowas-in in bgp process.
+ input_dict_1 = {
+ "r3": {
+ "bgp": {
+ "address_family": {
+ 'ipv4': {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r3": {
+ "allowas-in": {
+ "number_occurences": 1
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict_1)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ static_route_ipv4 = {
+ "r1": {
+ "static_routes": [
+ {
+ "network": NETWORK['ipv4'],
+ "next_hop": NEXT_HOP_IP['ipv4']
+ }
+ ]
+ }
+ }
+
+ static_route_ipv6 = {
+ "r1": {
+ "static_routes": [
+ {
+ "network": NETWORK['ipv6'],
+ "next_hop": NEXT_HOP_IP['ipv6']
+ }
+ ]
+ }
+ }
+ step("We should see R1 advertised prefix only in IPv4 AFI "
+ "not in IPv6 AFI.")
+ result = verify_rib(tgen, 'ipv4', dut, static_route_ipv4,
+ protocol=protocol)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+ result = verify_rib(tgen, 'ipv6', dut, static_route_ipv6,
+ protocol=protocol, expected=False)
+ assert result is not True, "Testcase {} : Failed \n"
+ "Expected behavior: routes are should not be present in ipv6 rib\n"
+ " Error: {}".format(tc_name, result)
+
+ step("Repeat the same test for IPv6 AFI.")
+ step('Configure allowas-in on R3 for R2 under IPv6 addr-family only')
+ # Api call to enable allowas-in in bgp process.
+ input_dict_1 = {
+ "r3": {
+ "bgp": {
+ "address_family": {
+ 'ipv4': {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r3": {
+ "allowas-in": {
+ "number_occurences": 2,
+ "delete": True
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ 'ipv6': {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r3": {
+ "allowas-in": {
+ "number_occurences": 2
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict_1)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+ step("We should see R1 advertised prefix only in IPv6 AFI "
+ "not in IPv4 AFI.")
+ result = verify_rib(tgen, 'ipv4', dut, static_route_ipv4,
+ protocol=protocol, expected=False)
+ assert result is not True, "Testcase {} : Failed \n"
+ "Expected behavior: routes should not be present in ipv4 rib\n"
+ " Error: {}".format(tc_name, result)
+ result = verify_rib(tgen, 'ipv6', dut, static_route_ipv6,
+ protocol=protocol)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_bgp_allowas_in_no_of_occurrences_p0(request):
+ """
+ Verify that the if number of occurrences of AS number in path is
+ more than the configured allowas-in value then we do not accept
+ the route.
+
+ """
+
+ tc_name = request.node.name
+ write_test_header(tc_name)
+ tgen = get_topogen()
+ reset_config_on_routers(tgen)
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ dut = "r3"
+ protocol = "bgp"
+
+ for addr_type in ADDR_TYPES:
+ # Enable static routes
+ static_routes = {
+ "r1": {
+ "static_routes": [
+ {
+ "network": NETWORK[addr_type],
+ "next_hop": NEXT_HOP_IP[addr_type]
+ }
+ ]
+ }
+ }
+
+ logger.info("Configure static routes")
+ result = create_static_routes(tgen, static_routes)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ step("configure redistribute static in Router BGP in R1")
+
+ input_dict_2 = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "redistribute": [{
+ "redist_type": "static"
+ }]
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict_2)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ step('Configure a route-map on R1 to prepend AS 4 times.')
+ for addr_type in ADDR_TYPES:
+ input_dict_4 = {
+ "r1": {
+ "route_maps": {
+ "ASP_{}".format(addr_type): [{
+ "action": "permit",
+ "set": {
+ "aspath": {
+ "as_num": "200 200 200 200",
+ "as_action": "prepend"
+ }
+ }
+ }]
+ }
+ }
+ }
+ result = create_route_maps(tgen, input_dict_4)
+ assert result is True, 'Testcase {} : Failed \n Error: {}'.format(
+ tc_name, result)
+
+ step("configure route map in out direction on R1")
+ # Configure neighbor for route map
+ input_dict_7 = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r1": {
+ "route_maps": [
+ {
+ "name":
+ "ASP_{}".format(
+ addr_type),
+ "direction": "out"
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ result = create_router_bgp(tgen, topo, input_dict_7)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ for addr_type in ADDR_TYPES:
+ step('Configure "allowas-in 4" on R3 for R2.')
+ # Api call to enable allowas-in in bgp process.
+ input_dict_1 = {
+ "r3": {
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r3": {
+ "allowas-in": {
+ "number_occurences": 4
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict_1)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+ result = verify_rib(tgen, addr_type, dut, static_routes,
+ protocol=protocol, expected=False)
+ assert result is not True, "Testcase {} : Failed \n "
+ "Expected behavior: routes are should not be present in rib\n"
+ "Error: {}".format(
+ tc_name, result)
+
+ for addr_type in ADDR_TYPES:
+ step('Configure "allowas-in 5" on R3 for R2.')
+ input_dict_1 = {
+ "r3": {
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r3": {
+ "allowas-in": {
+ "number_occurences": 5
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict_1)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+ static_routes = {
+ "r1": {
+ "static_routes": [
+ {
+ "network": NETWORK[addr_type],
+ "next_hop": NEXT_HOP_IP[addr_type]
+ }
+ ]
+ }
+ }
+ result = verify_rib(tgen, addr_type, dut, static_routes,
+ protocol=protocol)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_bgp_allowas_in_sameastoibgp_p1(request):
+ """
+ Verify that when we advertise a network, learned from the same AS
+ via allowas-in command, to an iBGP neighbor we see multiple
+ occurrences.
+
+ """
+
+ tc_name = request.node.name
+ write_test_header(tc_name)
+ tgen = get_topogen()
+ reset_config_on_routers(tgen)
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ dut = "r3"
+ protocol = "bgp"
+
+ for addr_type in ADDR_TYPES:
+ # Enable static routes
+ static_routes = {
+ "r1": {
+ "static_routes": [
+ {
+ "network": NETWORK[addr_type],
+ "next_hop": NEXT_HOP_IP[addr_type]
+ }
+ ]
+ }
+ }
+
+ logger.info("Configure static routes")
+ result = create_static_routes(tgen, static_routes)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ step("configure redistribute static in Router BGP in R1")
+
+ input_dict_2 = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "redistribute": [{
+ "redist_type": "static"
+ }]
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict_2)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ step('Configure a route-map on R2 to prepend AS 2 times.')
+ for addr_type in ADDR_TYPES:
+ input_dict_4 = {
+ "r2": {
+ "route_maps": {
+ "ASP_{}".format(addr_type): [{
+ "action": "permit",
+ "set": {
+ "aspath": {
+ "as_num": "200 200",
+ "as_action": "prepend"
+ }
+ }
+ }]
+ }
+ }
+ }
+ result = create_route_maps(tgen, input_dict_4)
+ assert result is True, 'Testcase {} : Failed \n Error: {}'.format(
+ tc_name, result)
+
+ step("configure route map in out direction on R2")
+ # Configure neighbor for route map
+ input_dict_7 = {
+ "r2": {
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "neighbor": {
+ "r3": {
+ "dest_link": {
+ "r2": {
+ "route_maps": [
+ {
+ "name":
+ "ASP_{}".format(
+ addr_type),
+ "direction": "out"}
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ result = create_router_bgp(tgen, topo, input_dict_7)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ step('Configure "allowas-in 3" on R3 for R1.')
+ input_dict_1 = {
+ "r3": {
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r3": {
+ "allowas-in": {
+ "number_occurences": 3
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict_1)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ input_dict_1 = {
+ "r4": {
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "neighbor": {
+ "r3": {
+ "dest_link": {
+ "r4": {
+ "allowas-in": {
+ "number_occurences": 3
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict_1)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ static_routes = {
+ "r1": {
+ "static_routes": [
+ {
+ "network": NETWORK[addr_type],
+ "next_hop": NEXT_HOP_IP[addr_type]
+ }
+ ]
+ }
+ }
+ dut = "r4"
+ aspath = "100 200 200 200"
+ result = verify_bgp_rib(tgen, addr_type, dut, static_routes,
+ aspath=aspath)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_bgp_allowas_in_sameastoebgp_p1(request):
+ """
+ Verify that when we advertise a network, learned from the same AS
+ via allowas-in command, to an eBGP neighbor we see multiple
+ occurrences of our own AS based on configured value+1.
+
+ """
+
+ tc_name = request.node.name
+ write_test_header(tc_name)
+ tgen = get_topogen()
+ reset_config_on_routers(tgen)
+
+ # Don't run this test if we have any failure.
+ if tgen.routers_have_failure():
+ pytest.skip(tgen.errors)
+
+ dut = "r3"
+ protocol = "bgp"
+
+ for addr_type in ADDR_TYPES:
+ # Enable static routes
+ static_routes = {
+ "r1": {
+ "static_routes": [
+ {
+ "network": NETWORK[addr_type],
+ "next_hop": NEXT_HOP_IP[addr_type]
+ }
+ ]
+ }
+ }
+
+ logger.info("Configure static routes")
+ result = create_static_routes(tgen, static_routes)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ step("configure redistribute static in Router BGP in R1")
+
+ input_dict_2 = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "redistribute": [{
+ "redist_type": "static"
+ }]
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict_2)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ step('Configure a route-map on R2 to prepend AS 2 times.')
+ for addr_type in ADDR_TYPES:
+ input_dict_4 = {
+ "r2": {
+ "route_maps": {
+ "ASP_{}".format(addr_type): [{
+ "action": "permit",
+ "set": {
+ "aspath": {
+ "as_num": "200 200",
+ "as_action": "prepend"
+ }
+ }
+ }]
+ }
+ }
+ }
+ result = create_route_maps(tgen, input_dict_4)
+ assert result is True, 'Testcase {} : Failed \n Error: {}'.format(
+ tc_name, result)
+
+ step("configure route map in out direction on R2")
+ # Configure neighbor for route map
+ input_dict_7 = {
+ "r2": {
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "neighbor": {
+ "r3": {
+ "dest_link": {
+ "r2": {
+ "route_maps": [
+ {
+ "name":
+ "ASP_{}".format(
+ addr_type),
+ "direction": "out"}
+ ]
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ result = create_router_bgp(tgen, topo, input_dict_7)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+
+ for addr_type in ADDR_TYPES:
+ step('Configure "allowas-in 3" on R3 for R1.')
+ input_dict_1 = {
+ "r3": {
+ "bgp": {
+ "address_family": {
+ addr_type: {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link": {
+ "r3": {
+ "allowas-in": {
+ "number_occurences": 3
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, input_dict_1)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+ static_routes = {
+ "r1": {
+ "static_routes": [
+ {
+ "network": NETWORK[addr_type],
+ "next_hop": NEXT_HOP_IP[addr_type]
+ }
+ ]
+ }
+ }
+ dut = "r5"
+ aspath = "200 100 200 200 200"
+ result = verify_bgp_rib(tgen, addr_type, dut, static_routes,
+ aspath=aspath)
+ assert result is True, "Testcase {} : Failed \n Error: {}".format(
+ tc_name, result)
+ write_test_footer(tc_name)
+
+
+if __name__ == "__main__":
+ args = ["-s"] + sys.argv[1:]
+ sys.exit(pytest.main(args))
from lib.topolog import logger
# Import common_config to use commomnly used APIs
-from lib.common_config import (
- create_common_configuration,
- InvalidCLIError,
- load_config_to_router,
- check_address_types,
- generate_ips,
- find_interface_with_greater_ip,
- run_frr_cmd,
- retry,
-)
+from lib.common_config import (create_common_configuration,
+ InvalidCLIError,
+ load_config_to_router,
+ check_address_types,
+ generate_ips,
+ validate_ip_address,
+ find_interface_with_greater_ip,
+ run_frr_cmd, retry)
BGP_CONVERGENCE_TIMEOUT = 10
"holddowntimer": 180,
"dest_link": {
"r4": {
+ "allowas-in": {
+ "number_occurences":2
+ },
"prefix_lists": [
{
"name": "pf_list_1",
prefix_lists = peer.setdefault("prefix_lists", {})
route_maps = peer.setdefault("route_maps", {})
no_send_community = peer.setdefault("no_send_community", None)
+ allowas_in = peer.setdefault("allowas-in", None)
# next-hop-self
if next_hop_self:
"no {} send-community {}".format(neigh_cxt, no_send_community)
)
+ if "allowas_in" in peer:
+ allow_as_in = peer["allowas_in"]
+ config_data.append("{} allowas-in {}".format(neigh_cxt,
+ allow_as_in))
+
+ if "no_allowas_in" in peer:
+ allow_as_in = peer["no_allowas_in"]
+ config_data.append("no {} allowas-in {}".format(neigh_cxt,
+ allow_as_in))
if prefix_lists:
for prefix_list in prefix_lists:
name = prefix_list.setdefault("name", {})
cmd = "no {}".format(cmd)
config_data.append(cmd)
+ if allowas_in:
+ number_occurences = allowas_in.\
+ setdefault("number_occurences", {})
+ del_action = allowas_in.setdefault("delete", False)
+
+ cmd = "{} allowas-in {}".format(neigh_cxt,
+ number_occurences)
+
+ if del_action:
+ cmd = "no {}".format(cmd)
+
+ config_data.append(cmd)
+
return config_data
)
return errormsg
- logger.info(peer_uptime_before_clear_bgp)
# Clearing BGP
logger.info("Clearing BGP neighborship for router %s..", router)
for addr_type in bgp_addr_type.keys():
logger.info("Exiting lib API: verify_best_path_as_per_admin_distance()")
return True
+
+
+@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2)
+def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None):
+ """
+ This API is to verify whether bgp rib has any
+ matching route for a nexthop.
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `dut`: input dut router name
+ * `addr_type` : ip type ipv4/ipv6
+ * `input_dict` : input dict, has details of static routes
+ * `next_hop`[optional]: next_hop which needs to be verified,
+ default = static
+ * 'aspath'[optional]: aspath which needs to be verified
+
+ Usage
+ -----
+ dut = 'r1'
+ next_hop = "192.168.1.10"
+ input_dict = topo['routers']
+ aspath = "100 200 300"
+ result = verify_bgp_rib(tgen, addr_type, dut, tgen, input_dict,
+ next_hop, aspath)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: verify_bgp_rib()")
+
+ router_list = tgen.routers()
+ additional_nexthops_in_required_nhs = []
+ list1 = []
+ list2 = []
+ for routerInput in input_dict.keys():
+ for router, rnode in router_list.iteritems():
+ if router != dut:
+ continue
+
+ # Verifying RIB routes
+ command = "show bgp"
+
+ # Static routes
+ sleep(2)
+ logger.info('Checking router {} BGP RIB:'.format(dut))
+
+ if 'static_routes' in input_dict[routerInput]:
+ static_routes = input_dict[routerInput]["static_routes"]
+
+ for static_route in static_routes:
+ found_routes = []
+ missing_routes = []
+ st_found = False
+ nh_found = False
+ vrf = static_route.setdefault("vrf", None)
+ if vrf:
+ cmd = "{} vrf {} {}".\
+ format(command, vrf, addr_type)
+
+ else:
+ cmd = "{} {}".\
+ format(command, addr_type)
+
+ cmd = "{} json".format(cmd)
+
+ rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ # Verifying output dictionary rib_routes_json is not empty
+ if bool(rib_routes_json) == False:
+ errormsg = "No route found in rib of router {}..". \
+ format(router)
+ return errormsg
+
+ network = static_route["network"]
+
+ if "no_of_ip" in static_route:
+ no_of_ip = static_route["no_of_ip"]
+ else:
+ no_of_ip = 1
+
+ # Generating IPs for verification
+ ip_list = generate_ips(network, no_of_ip)
+
+ for st_rt in ip_list:
+ st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
+
+ _addr_type = validate_ip_address(st_rt)
+ if _addr_type != addr_type:
+ continue
+
+ if st_rt in rib_routes_json["routes"]:
+ st_found = True
+ found_routes.append(st_rt)
+
+ if next_hop:
+ if not isinstance(next_hop, list):
+ next_hop = [next_hop]
+ list1 = next_hop
+ found_hops = [rib_r["ip"] for rib_r in
+ rib_routes_json["routes"][
+ st_rt][0]["nexthops"]]
+ list2 = found_hops
+ missing_list_of_nexthops = \
+ set(list2).difference(list1)
+ additional_nexthops_in_required_nhs = \
+ set(list1).difference(list2)
+
+ if list2:
+ if additional_nexthops_in_required_nhs:
+ logger.info("Missing nexthop %s for route"\
+ " %s in RIB of router %s\n", \
+ additional_nexthops_in_required_nhs, \
+ st_rt, dut)
+ errormsg=("Nexthop {} is Missing for "\
+ "route {} in RIB of router {}\n".format(
+ additional_nexthops_in_required_nhs,
+ st_rt, dut))
+ return errormsg
+ else:
+ nh_found = True
+ if aspath:
+ found_paths = rib_routes_json["routes"][
+ st_rt][0]["path"]
+ if aspath == found_paths:
+ aspath_found = True
+ logger.info("Found AS path {} for route" \
+ " {} in RIB of router "\
+ "{}\n".format(aspath, st_rt, dut))
+ else:
+ errormsg=("AS Path {} is missing for route"\
+ "for route {} in RIB of router {}\n"\
+ .format(aspath, st_rt, dut))
+ return errormsg
+
+ else:
+ missing_routes.append(st_rt)
+
+ if nh_found:
+ logger.info("Found next_hop {} for all bgp"
+ " routes in RIB of"
+ " router {}\n".format(next_hop, \
+ router))
+
+ if len(missing_routes) > 0:
+ errormsg = ("Missing route in RIB of router {}, "
+ "routes: {}\n".format(dut, missing_routes))
+ return errormsg
+
+ if found_routes:
+ logger.info("Verified routes in router {} BGP RIB, "
+ "found routes are: {} \n".\
+ format(dut, found_routes))
+ continue
+
+ if "bgp" not in input_dict[routerInput]:
+ continue
+
+ # Advertise networks
+ bgp_data_list = input_dict[routerInput]["bgp"]
+
+ if type(bgp_data_list) is not list:
+ bgp_data_list = [bgp_data_list]
+
+ for bgp_data in bgp_data_list:
+ vrf_id = bgp_data.setdefault("vrf", None)
+ if vrf_id:
+ cmd = "{} vrf {} {}".\
+ format(command, vrf_id, addr_type)
+ else:
+ cmd = "{} {}".\
+ format(command, addr_type)
+
+ cmd = "{} json".format(cmd)
+
+ rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
+
+ # Verifying output dictionary rib_routes_json is not empty
+ if bool(rib_routes_json) == False:
+ errormsg = "No route found in rib of router {}..". \
+ format(router)
+ return errormsg
+
+ bgp_net_advertise = bgp_data["address_family"][addr_type]["unicast"]
+ advertise_network = \
+ bgp_net_advertise.setdefault("advertise_networks", [])
+
+ for advertise_network_dict in advertise_network:
+ found_routes = []
+ missing_routes = []
+ found = False
+
+ network = advertise_network_dict['network']
+
+ if 'no_of_network' in advertise_network_dict:
+ no_of_network = advertise_network_dict[
+ 'no_of_network']
+ else:
+ no_of_network = 1
+
+ # Generating IPs for verification
+ ip_list = generate_ips(network, no_of_network)
+
+ for st_rt in ip_list:
+ st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))
+
+ _addr_type = validate_ip_address(st_rt)
+ if _addr_type != addr_type:
+ continue
+
+ if st_rt in rib_routes_json["routes"]:
+ found = True
+ found_routes.append(st_rt)
+ else:
+ found = False
+ missing_routes.append(st_rt)
+
+ if len(missing_routes) > 0:
+ errormsg = ("Missing route in BGP RIB of router {},"
+ " are: {}\n".format(dut, missing_routes))
+ return errormsg
+
+ if found_routes:
+ logger.info("Verified routes in router {} BGP RIB, found "
+ "routes are: {}\n".format(dut, found_routes))
+
+ logger.debug("Exiting lib API: verify_bgp_rib()")
+ return True