summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xtests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py106
-rw-r--r--tests/topotests/lib/bgp.py562
-rw-r--r--tests/topotests/lib/common_config.py83
3 files changed, 749 insertions, 2 deletions
diff --git a/tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py b/tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py
index 5f97225914..040b2f4a43 100755
--- a/tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py
+++ b/tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py
@@ -53,11 +53,13 @@ from mininet.topo import Topo
from lib.common_config import (
start_topology, stop_topology, write_test_header,
- write_test_footer
+ write_test_footer, reset_config_on_routers
)
from lib.topolog import logger
from lib.bgp import (
- verify_bgp_convergence, create_router_bgp, verify_router_id
+ verify_bgp_convergence, create_router_bgp, verify_router_id,
+ modify_as_number, verify_as_numbers, clear_bgp_and_verify,
+ verify_bgp_timers_and_functionality
)
from lib.topojson import build_topo_from_json, build_config_from_json
@@ -209,6 +211,106 @@ def test_modify_and_delete_router_id(request):
write_test_footer(tc_name)
+def test_bgp_config_with_4byte_as_number(request):
+ """
+ Configure BGP with 4 byte ASN and verify it works fine
+ """
+
+ tgen = get_topogen()
+ if BGP_CONVERGENCE is not True:
+ pytest.skip('skipped because of BGP Convergence failure')
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "local_as": 131079
+ }
+ },
+ "r2": {
+ "bgp": {
+ "local_as": 131079
+ }
+ },
+ "r3": {
+ "bgp": {
+ "local_as": 131079
+ }
+ },
+ "r4": {
+ "bgp": {
+ "local_as": 131080
+ }
+ }
+ }
+ result = modify_as_number(tgen, topo, input_dict)
+ assert result is True, "Testcase {} :Failed \n Error: {}". \
+ format(tc_name, result)
+
+ result = verify_as_numbers(tgen, topo, input_dict)
+ assert result is True, "Testcase {} :Failed \n Error: {}". \
+ format(tc_name, result)
+
+ write_test_footer(tc_name)
+
+
+def test_bgp_timers_functionality(request):
+ """
+ Test to modify bgp timers and verify timers functionality.
+ """
+
+ tgen = get_topogen()
+ if BGP_CONVERGENCE is not True:
+ pytest.skip('skipped because of BGP Convergence failure')
+
+ # test case name
+ tc_name = request.node.name
+ write_test_header(tc_name)
+
+ # Creating configuration from JSON
+ reset_config_on_routers(tgen)
+
+ # Api call to modfiy BGP timerse
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "address_family": {
+ "ipv4": {
+ "unicast": {
+ "neighbor": {
+ "r2": {
+ "dest_link":{
+ "r1": {
+ "keepalivetimer": 60,
+ "holddowntimer": 180,
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ result = create_router_bgp(tgen, topo, deepcopy(input_dict))
+ assert result is True, "Testcase {} :Failed \n Error: {}". \
+ format(tc_name, result)
+
+ # Api call to clear bgp, so timer modification would take place
+ clear_bgp_and_verify(tgen, topo, 'r1')
+
+ # Verifying bgp timers functionality
+ result = verify_bgp_timers_and_functionality(tgen, topo, input_dict)
+ assert result is True, "Testcase {} :Failed \n Error: {}". \
+ format(tc_name, result)
+
+ write_test_footer(tc_name)
+
+
if __name__ == '__main__':
args = ["-s"] + sys.argv[1:]
sys.exit(pytest.main(args))
diff --git a/tests/topotests/lib/bgp.py b/tests/topotests/lib/bgp.py
index 0c554f2eec..04e4b6ebac 100644
--- a/tests/topotests/lib/bgp.py
+++ b/tests/topotests/lib/bgp.py
@@ -694,3 +694,565 @@ def verify_bgp_convergence(tgen, topo):
logger.info("Exiting API: verify_bgp_confergence()")
return True
+
+def modify_as_number(tgen, topo, input_dict):
+ """
+ API reads local_as and remote_as from user defined input_dict and
+ modify router"s ASNs accordingly. Router"s config is modified and
+ recent/changed config is loadeded to router.
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : json file data
+ * `input_dict` : defines for which router ASNs needs to be modified
+
+ Usage
+ -----
+ To modify ASNs for router r1
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "local_as": 131079
+ }
+ }
+ result = modify_as_number(tgen, topo, input_dict)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.info("Entering lib API: modify_as_number()")
+ try:
+
+ new_topo = deepcopy(topo["routers"])
+ router_dict = {}
+ for router in input_dict.keys():
+ # Remove bgp configuration
+
+ router_dict.update({
+ router: {
+ "bgp": {
+ "delete": True
+ }
+ }
+ })
+
+ new_topo[router]["bgp"]["local_as"] = \
+ input_dict[router]["bgp"]["local_as"]
+
+ logger.info("Removing bgp configuration")
+ create_router_bgp(tgen, topo, router_dict)
+
+ logger.info("Applying modified bgp configuration")
+ create_router_bgp(tgen, new_topo)
+
+ except Exception as e:
+ # handle any exception
+ logger.error("Error %s occured. Arguments %s.", e.message, e.args)
+
+ # Traceback
+ errormsg = traceback.format_exc()
+ logger.error(errormsg)
+ return errormsg
+
+ logger.info("Exiting lib API: modify_as_number()")
+
+ return True
+
+
+def verify_as_numbers(tgen, topo, input_dict):
+ """
+ This API is to verify AS numbers for given DUT by running
+ "show ip bgp neighbor json" command. Local AS and Remote AS
+ will ve verified with input_dict data and command output.
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `topo`: input json file data
+ * `addr_type` : ip type, ipv4/ipv6
+ * `input_dict`: defines - for which router, AS numbers needs to be verified
+
+ Usage
+ -----
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "local_as": 131079
+ }
+ }
+ }
+ result = verify_as_numbers(tgen, topo, addr_type, input_dict)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.info("Entering lib API: verify_as_numbers()")
+ for router in input_dict.keys():
+ if router not in tgen.routers():
+ continue
+
+ rnode = tgen.routers()[router]
+
+ logger.info("Verifying AS numbers for dut %s:", router)
+
+ show_ip_bgp_neighbor_json = rnode.vtysh_cmd(
+ "show ip bgp neighbor json", isjson=True)
+ local_as = input_dict[router]["bgp"]["local_as"]
+ bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
+
+ for addr_type in bgp_addr_type:
+ if not check_address_types(addr_type):
+ continue
+
+ bgp_neighbors = bgp_addr_type[addr_type]["unicast"][
+ "neighbor"]
+
+ for bgp_neighbor, peer_data in bgp_neighbors.iteritems():
+ remote_as = input_dict[bgp_neighbor]["bgp"]["local_as"]
+ for dest_link, peer_dict in peer_data["dest_link"].iteritems():
+ neighbor_ip = None
+ data = topo["routers"][bgp_neighbor]["links"]
+
+ if dest_link in data:
+ neighbor_ip = data[dest_link][addr_type]. \
+ split("/")[0]
+ neigh_data = show_ip_bgp_neighbor_json[neighbor_ip]
+ # Verify Local AS for router
+ if neigh_data["localAs"] != local_as:
+ errormsg = "Failed: Verify local_as for dut {}," \
+ " found: {} but expected: {}".format(
+ router, neigh_data["localAs"],
+ local_as)
+ return errormsg
+ else:
+ logger.info("Verified local_as for dut %s, found"
+ " expected: %s", router, local_as)
+
+ # Verify Remote AS for neighbor
+ if neigh_data["remoteAs"] != remote_as:
+ errormsg = "Failed: Verify remote_as for dut " \
+ "{}'s neighbor {}, found: {} but " \
+ "expected: {}".format(
+ router, bgp_neighbor,
+ neigh_data["remoteAs"], remote_as)
+ return errormsg
+ else:
+ logger.info("Verified remote_as for dut %s's "
+ "neighbor %s, found expected: %s",
+ router, bgp_neighbor, remote_as)
+
+ logger.info("Exiting lib API: verify_AS_numbers()")
+ return True
+
+
+def clear_bgp_and_verify(tgen, topo, router):
+ """
+ This API is to clear bgp neighborship and verify bgp neighborship
+ is coming up(BGP is converged) usinf "show bgp summary json" command
+ and also verifying for all bgp neighbors uptime before and after
+ clear bgp sessions is different as the uptime must be changed once
+ bgp sessions are cleared using "clear ip bgp */clear bgp ipv6 *" cmd.
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `topo`: input json file data
+ * `router`: device under test
+
+ Usage
+ -----
+ result = clear_bgp_and_verify(tgen, topo, addr_type, dut)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.info("Entering lib API: clear_bgp_and_verify()")
+
+ if router not in tgen.routers():
+ return False
+
+ rnode = tgen.routers()[router]
+
+ peer_uptime_before_clear_bgp = {}
+ # Verifying BGP convergence before bgp clear command
+ for retry in range(1, 11):
+ show_bgp_json = rnode.vtysh_cmd("show bgp summary json",
+ isjson=True)
+ logger.info(show_bgp_json)
+ # Verifying output dictionary show_bgp_json is empty or not
+ if not bool(show_bgp_json):
+ errormsg = "BGP is not running"
+ return errormsg
+
+ sleeptime = 2 * retry
+ if sleeptime <= BGP_CONVERGENCE_TIMEOUT:
+ # Waiting for BGP to converge
+ logger.info("Waiting for %s sec for BGP to converge on router"
+ " %s...", sleeptime, router)
+ sleep(sleeptime)
+ else:
+ errormsg = "TIMEOUT!! BGP is not converged in {} seconds for" \
+ " router {}".format(BGP_CONVERGENCE_TIMEOUT, router)
+ return errormsg
+
+ # To find neighbor ip type
+ bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
+ for addr_type in bgp_addr_type.keys():
+ total_peer = 0
+ if not check_address_types(addr_type):
+ continue
+
+ bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
+
+ for bgp_neighbor in bgp_neighbors:
+ total_peer += len(bgp_neighbors[bgp_neighbor]["dest_link"])
+
+ for addr_type in bgp_addr_type:
+ bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
+ no_of_peer = 0
+
+ for bgp_neighbor, peer_data in bgp_neighbors.iteritems():
+ for dest_link, peer_dict in peer_data["dest_link"].iteritems():
+ data = topo["routers"][bgp_neighbor]["links"]
+
+ if dest_link in data:
+ neighbor_ip = data[dest_link][addr_type].split(
+ "/")[0]
+ if addr_type == "ipv4":
+ ipv4_data = show_bgp_json["ipv4Unicast"][
+ "peers"]
+ nh_state = ipv4_data[neighbor_ip]["state"]
+
+ # Peer up time dictionary
+ peer_uptime_before_clear_bgp[bgp_neighbor] = \
+ ipv4_data[neighbor_ip]["peerUptime"]
+ else:
+ ipv6_data = show_bgp_json["ipv6Unicast"][
+ "peers"]
+ nh_state = ipv6_data[neighbor_ip]["state"]
+
+ # Peer up time dictionary
+ peer_uptime_before_clear_bgp[bgp_neighbor] = \
+ ipv6_data[neighbor_ip]["peerUptime"]
+
+ if nh_state == "Established":
+ no_of_peer += 1
+
+ if no_of_peer == total_peer:
+ logger.info("BGP is Converged for router %s before bgp"
+ " clear", router)
+ break
+ else:
+ logger.warning("BGP is not yet Converged for router %s "
+ "before bgp clear", router)
+
+ # Clearing BGP
+ logger.info("Clearing BGP neighborship for router %s..", router)
+ for addr_type in bgp_addr_type.keys():
+ if addr_type == "ipv4":
+ rnode.vtysh_cmd("clear ip bgp *")
+ elif addr_type == "ipv6":
+ rnode.vtysh_cmd("clear bgp ipv6 *")
+
+ peer_uptime_after_clear_bgp = {}
+ # Verifying BGP convergence after bgp clear command
+ for retry in range(1, 11):
+ show_bgp_json = rnode.vtysh_cmd("show bgp summary json",
+ isjson=True)
+ # Verifying output dictionary show_bgp_json is empty or not
+ if not bool(show_bgp_json):
+ errormsg = "BGP is not running"
+ return errormsg
+
+ sleeptime = 2 * retry
+ if sleeptime <= BGP_CONVERGENCE_TIMEOUT:
+ # Waiting for BGP to converge
+ logger.info("Waiting for %s sec for BGP to converge on router"
+ " %s...", sleeptime, router)
+ sleep(sleeptime)
+ else:
+ errormsg = "TIMEOUT!! BGP is not converged in {} seconds for" \
+ " router {}".format(BGP_CONVERGENCE_TIMEOUT, router)
+ return errormsg
+
+ # To find neighbor ip type
+ bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
+
+ for addr_type in bgp_addr_type.keys():
+ total_peer = 0
+ if not check_address_types(addr_type):
+ continue
+
+ bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
+
+ for bgp_neighbor in bgp_neighbors:
+ total_peer += len(bgp_neighbors[bgp_neighbor]["dest_link"])
+
+ for addr_type in bgp_addr_type:
+ bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
+ no_of_peer = 0
+
+ for bgp_neighbor, peer_data in bgp_neighbors.iteritems():
+ for dest_link, peer_dict in peer_data["dest_link"].iteritems():
+ data = topo["routers"][bgp_neighbor]["links"]
+
+ if dest_link in data:
+ neighbor_ip = data[dest_link][addr_type]. \
+ split("/")[0]
+ if addr_type == "ipv4":
+ ipv4_data = show_bgp_json["ipv4Unicast"][
+ "peers"]
+ nh_state = ipv4_data[neighbor_ip]["state"]
+ peer_uptime_after_clear_bgp[bgp_neighbor] = \
+ ipv4_data[neighbor_ip]["peerUptime"]
+ else:
+ ipv6_data = show_bgp_json["ipv6Unicast"][
+ "peers"]
+ nh_state = ipv6_data[neighbor_ip]["state"]
+ # Peer up time dictionary
+ peer_uptime_after_clear_bgp[bgp_neighbor] = \
+ ipv6_data[neighbor_ip]["peerUptime"]
+
+ if nh_state == "Established":
+ no_of_peer += 1
+
+ if no_of_peer == total_peer:
+ logger.info("BGP is Converged for router %s after bgp clear",
+ router)
+ break
+ else:
+ logger.warning("BGP is not yet Converged for router %s after"
+ " bgp clear", router)
+
+ # Compariung peerUptime dictionaries
+ if peer_uptime_before_clear_bgp != peer_uptime_after_clear_bgp:
+ logger.info("BGP neighborship is reset after clear BGP on router %s",
+ router)
+ else:
+ errormsg = "BGP neighborship is not reset after clear bgp on router" \
+ " {}".format(router)
+ return errormsg
+
+ logger.info("Exiting lib API: clear_bgp_and_verify()")
+ return True
+
+
+def verify_bgp_timers_and_functionality(tgen, topo, input_dict):
+ """
+ To verify BGP timer config, execute "show ip bgp neighbor json" command
+ and verify bgp timers with input_dict data.
+ To veirfy bgp timers functonality, shutting down peer interface
+ and verify BGP neighborship status.
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `topo`: input json file data
+ * `addr_type`: ip type, ipv4/ipv6
+ * `input_dict`: defines for which router, bgp timers needs to be verified
+
+ Usage:
+ # To verify BGP timers for neighbor r2 of router r1
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "bgp_neighbors":{
+ "r2":{
+ "keepalivetimer": 5,
+ "holddowntimer": 15,
+ }}}}}
+ result = verify_bgp_timers_and_functionality(tgen, topo, "ipv4",
+ input_dict)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.info("Entering lib API: verify_bgp_timers_and_functionality()")
+ sleep(5)
+ router_list = tgen.routers()
+ for router in input_dict.keys():
+ if router not in router_list:
+ continue
+
+ rnode = router_list[router]
+
+ logger.info("Verifying bgp timers functionality, DUT is %s:",
+ router)
+
+ show_ip_bgp_neighbor_json = \
+ rnode.vtysh_cmd("show ip bgp neighbor json", isjson=True)
+
+ bgp_addr_type = input_dict[router]["bgp"]["address_family"]
+
+ for addr_type in bgp_addr_type:
+ if not check_address_types(addr_type):
+ continue
+
+ bgp_neighbors = bgp_addr_type[addr_type]["unicast"][
+ "neighbor"]
+ for bgp_neighbor, peer_data in bgp_neighbors.iteritems():
+ for dest_link, peer_dict in peer_data["dest_link"].iteritems():
+ data = topo["routers"][bgp_neighbor]["links"]
+
+ keepalivetimer = peer_dict["keepalivetimer"]
+ holddowntimer = peer_dict["holddowntimer"]
+
+ if dest_link in data:
+ neighbor_ip = data[dest_link][addr_type]. \
+ split("/")[0]
+ neighbor_intf = data[dest_link]["interface"]
+
+ # Verify HoldDownTimer for neighbor
+ bgpHoldTimeMsecs = show_ip_bgp_neighbor_json[
+ neighbor_ip]["bgpTimerHoldTimeMsecs"]
+ if bgpHoldTimeMsecs != holddowntimer * 1000:
+ errormsg = "Verifying holddowntimer for bgp " \
+ "neighbor {} under dut {}, found: {} " \
+ "but expected: {}".format(
+ neighbor_ip, router,
+ bgpHoldTimeMsecs,
+ holddowntimer * 1000)
+ return errormsg
+
+ # Verify KeepAliveTimer for neighbor
+ bgpKeepAliveTimeMsecs = show_ip_bgp_neighbor_json[
+ neighbor_ip]["bgpTimerKeepAliveIntervalMsecs"]
+ if bgpKeepAliveTimeMsecs != keepalivetimer * 1000:
+ errormsg = "Verifying keepalivetimer for bgp " \
+ "neighbor {} under dut {}, found: {} " \
+ "but expected: {}".format(
+ neighbor_ip, router,
+ bgpKeepAliveTimeMsecs,
+ keepalivetimer * 1000)
+ return errormsg
+
+ ####################
+ # Shutting down peer interface after keepalive time and
+ # after some time bringing up peer interface.
+ # verifying BGP neighborship in (hold down-keep alive)
+ # time, it should not go down
+ ####################
+
+ # Wait till keep alive time
+ logger.info("=" * 20)
+ logger.info("Scenario 1:")
+ logger.info("Shutdown and bring up peer interface: %s "
+ "in keep alive time : %s sec and verify "
+ " BGP neighborship is intact in %s sec ",
+ neighbor_intf, keepalivetimer,
+ (holddowntimer - keepalivetimer))
+ logger.info("=" * 20)
+ logger.info("Waiting for %s sec..", keepalivetimer)
+ sleep(keepalivetimer)
+
+ # Shutting down peer ineterface
+ logger.info("Shutting down interface %s on router %s",
+ neighbor_intf, bgp_neighbor)
+ topotest.interface_set_status(
+ router_list[bgp_neighbor], neighbor_intf,
+ ifaceaction=False)
+
+ # Bringing up peer interface
+ sleep(5)
+ logger.info("Bringing up interface %s on router %s..",
+ neighbor_intf, bgp_neighbor)
+ topotest.interface_set_status(
+ router_list[bgp_neighbor], neighbor_intf,
+ ifaceaction=True)
+
+ # Verifying BGP neighborship is intact in
+ # (holddown - keepalive) time
+ for timer in range(keepalivetimer, holddowntimer,
+ int(holddowntimer / 3)):
+ logger.info("Waiting for %s sec..", keepalivetimer)
+ sleep(keepalivetimer)
+ sleep(2)
+ show_bgp_json = \
+ rnode.vtysh_cmd("show bgp summary json",
+ isjson=True)
+
+ if addr_type == "ipv4":
+ ipv4_data = show_bgp_json["ipv4Unicast"]["peers"]
+ nh_state = ipv4_data[neighbor_ip]["state"]
+ else:
+ ipv6_data = show_bgp_json["ipv6Unicast"]["peers"]
+ nh_state = ipv6_data[neighbor_ip]["state"]
+
+ if timer == \
+ (holddowntimer - keepalivetimer):
+ if nh_state != "Established":
+ errormsg = "BGP neighborship has not gone " \
+ "down in {} sec for neighbor {}\n" \
+ "show_bgp_json: \n {} ".format(
+ timer, bgp_neighbor,
+ show_bgp_json)
+ return errormsg
+ else:
+ logger.info("BGP neighborship is intact in %s"
+ " sec for neighbor %s \n "
+ "show_bgp_json : \n %s",
+ timer, bgp_neighbor,
+ show_bgp_json)
+
+ ####################
+ # Shutting down peer interface and verifying that BGP
+ # neighborship is going down in holddown time
+ ####################
+ logger.info("=" * 20)
+ logger.info("Scenario 2:")
+ logger.info("Shutdown peer interface: %s and verify BGP"
+ " neighborship has gone down in hold down "
+ "time %s sec", neighbor_intf, holddowntimer)
+ logger.info("=" * 20)
+
+ logger.info("Shutting down interface %s on router %s..",
+ neighbor_intf, bgp_neighbor)
+ topotest.interface_set_status(router_list[bgp_neighbor],
+ neighbor_intf,
+ ifaceaction=False)
+
+ # Verifying BGP neighborship is going down in holddown time
+ for timer in range(keepalivetimer,
+ (holddowntimer + keepalivetimer),
+ int(holddowntimer / 3)):
+ logger.info("Waiting for %s sec..", keepalivetimer)
+ sleep(keepalivetimer)
+ sleep(2)
+ show_bgp_json = \
+ rnode.vtysh_cmd("show bgp summary json",
+ isjson=True)
+
+ if addr_type == "ipv4":
+ ipv4_data = show_bgp_json["ipv4Unicast"]["peers"]
+ nh_state = ipv4_data[neighbor_ip]["state"]
+ else:
+ ipv6_data = show_bgp_json["ipv6Unicast"]["peers"]
+ nh_state = ipv6_data[neighbor_ip]["state"]
+
+ if timer == holddowntimer:
+ if nh_state == "Established":
+ errormsg = "BGP neighborship has not gone " \
+ "down in {} sec for neighbor {}\n" \
+ "show_bgp_json: \n {} ".format(
+ timer, bgp_neighbor,
+ show_bgp_json)
+ return errormsg
+ else:
+ logger.info("BGP neighborship has gone down in"
+ " %s sec for neighbor %s \n"
+ "show_bgp_json : \n %s",
+ timer, bgp_neighbor,
+ show_bgp_json)
+
+ logger.info("Exiting lib API: verify_bgp_timers_and_functionality()")
+ return True
+
diff --git a/tests/topotests/lib/common_config.py b/tests/topotests/lib/common_config.py
index b6bb89f78b..78d8d022ad 100644
--- a/tests/topotests/lib/common_config.py
+++ b/tests/topotests/lib/common_config.py
@@ -20,6 +20,7 @@
from collections import OrderedDict
from datetime import datetime
+import StringIO
import os
import ConfigParser
import traceback
@@ -194,6 +195,88 @@ def create_common_configuration(tgen, router, data, config_type=None,
return True
+def reset_config_on_routers(tgen, routerName=None):
+ """
+ Resets configuration on routers to the snapshot created using input JSON
+ file. It replaces existing router configuration with FRRCFG_BKUP_FILE
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `routerName` : router config is to be reset
+ """
+
+ logger.debug("Entering API: reset_config_on_routers")
+
+ router_list = tgen.routers()
+ for rname, router in router_list.iteritems():
+ if routerName and routerName != rname:
+ continue
+
+ cfg = router.run("vtysh -c 'show running'")
+ fname = "{}/{}/frr.sav".format(TMPDIR, rname)
+ dname = "{}/{}/delta.conf".format(TMPDIR, rname)
+ f = open(fname, "w")
+ for line in cfg.split("\n"):
+ line = line.strip()
+
+ if (line == "Building configuration..." or
+ line == "Current configuration:" or
+ not line):
+ continue
+ f.write(line)
+ f.write("\n")
+
+ f.close()
+
+ command = "/usr/lib/frr/frr-reload.py --input {}/{}/frr.sav" \
+ " --test {}/{}/frr_json_initial.conf > {}". \
+ format(TMPDIR, rname, TMPDIR, rname, dname)
+ result = os.system(command)
+
+ # Assert if command fail
+ if result > 0:
+ errormsg = ("Command:{} is failed due to non-zero exit"
+ " code".format(command))
+ return errormsg
+
+ f = open(dname, "r")
+ delta = StringIO.StringIO()
+ delta.write("configure terminal\n")
+ t_delta = f.read()
+ for line in t_delta.split("\n"):
+ line = line.strip()
+ if (line == "Lines To Delete" or
+ line == "===============" or
+ line == "Lines To Add" or
+ line == "============" or
+ not line):
+ continue
+ delta.write(line)
+ delta.write("\n")
+
+ delta.write("end\n")
+ output = router.vtysh_multicmd(delta.getvalue(),
+ pretty_output=False)
+ logger.info("New configuration for router {}:".format(rname))
+ delta.close()
+ delta = StringIO.StringIO()
+ cfg = router.run("vtysh -c 'show running'")
+ for line in cfg.split("\n"):
+ line = line.strip()
+ delta.write(line)
+ delta.write("\n")
+
+ # Router current configuration to log file or console if
+ # "show_router_config" is defined in "pytest.ini"
+ if show_router_config:
+ logger.info(delta.getvalue())
+ delta.close()
+
+ logger.debug("Exting API: reset_config_on_routers")
+ return True
+
+
def load_config_to_router(tgen, routerName, save_bkup=False):
"""
Loads configuration on router from the file FRRCFG_FILE.