logger.info("Exiting API: verify_bgp_confergence()")
return True
+
+def modify_as_number(tgen, topo, input_dict):
+ """
+ API reads local_as and remote_as from user defined input_dict and
+ modify router"s ASNs accordingly. Router"s config is modified and
+ recent/changed config is loadeded to router.
+
+ Parameters
+ ----------
+ * `tgen` : Topogen object
+ * `topo` : json file data
+ * `input_dict` : defines for which router ASNs needs to be modified
+
+ Usage
+ -----
+ To modify ASNs for router r1
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "local_as": 131079
+ }
+ }
+ result = modify_as_number(tgen, topo, input_dict)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.info("Entering lib API: modify_as_number()")
+ try:
+
+ new_topo = deepcopy(topo["routers"])
+ router_dict = {}
+ for router in input_dict.keys():
+ # Remove bgp configuration
+
+ router_dict.update({
+ router: {
+ "bgp": {
+ "delete": True
+ }
+ }
+ })
+
+ new_topo[router]["bgp"]["local_as"] = \
+ input_dict[router]["bgp"]["local_as"]
+
+ logger.info("Removing bgp configuration")
+ create_router_bgp(tgen, topo, router_dict)
+
+ logger.info("Applying modified bgp configuration")
+ create_router_bgp(tgen, new_topo)
+
+ except Exception as e:
+ # handle any exception
+ logger.error("Error %s occured. Arguments %s.", e.message, e.args)
+
+ # Traceback
+ errormsg = traceback.format_exc()
+ logger.error(errormsg)
+ return errormsg
+
+ logger.info("Exiting lib API: modify_as_number()")
+
+ return True
+
+
+def verify_as_numbers(tgen, topo, input_dict):
+ """
+ This API is to verify AS numbers for given DUT by running
+ "show ip bgp neighbor json" command. Local AS and Remote AS
+ will ve verified with input_dict data and command output.
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `topo`: input json file data
+ * `addr_type` : ip type, ipv4/ipv6
+ * `input_dict`: defines - for which router, AS numbers needs to be verified
+
+ Usage
+ -----
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "local_as": 131079
+ }
+ }
+ }
+ result = verify_as_numbers(tgen, topo, addr_type, input_dict)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.info("Entering lib API: verify_as_numbers()")
+ for router in input_dict.keys():
+ if router not in tgen.routers():
+ continue
+
+ rnode = tgen.routers()[router]
+
+ logger.info("Verifying AS numbers for dut %s:", router)
+
+ show_ip_bgp_neighbor_json = rnode.vtysh_cmd(
+ "show ip bgp neighbor json", isjson=True)
+ local_as = input_dict[router]["bgp"]["local_as"]
+ bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
+
+ for addr_type in bgp_addr_type:
+ if not check_address_types(addr_type):
+ continue
+
+ bgp_neighbors = bgp_addr_type[addr_type]["unicast"][
+ "neighbor"]
+
+ for bgp_neighbor, peer_data in bgp_neighbors.iteritems():
+ remote_as = input_dict[bgp_neighbor]["bgp"]["local_as"]
+ for dest_link, peer_dict in peer_data["dest_link"].iteritems():
+ neighbor_ip = None
+ data = topo["routers"][bgp_neighbor]["links"]
+
+ if dest_link in data:
+ neighbor_ip = data[dest_link][addr_type]. \
+ split("/")[0]
+ neigh_data = show_ip_bgp_neighbor_json[neighbor_ip]
+ # Verify Local AS for router
+ if neigh_data["localAs"] != local_as:
+ errormsg = "Failed: Verify local_as for dut {}," \
+ " found: {} but expected: {}".format(
+ router, neigh_data["localAs"],
+ local_as)
+ return errormsg
+ else:
+ logger.info("Verified local_as for dut %s, found"
+ " expected: %s", router, local_as)
+
+ # Verify Remote AS for neighbor
+ if neigh_data["remoteAs"] != remote_as:
+ errormsg = "Failed: Verify remote_as for dut " \
+ "{}'s neighbor {}, found: {} but " \
+ "expected: {}".format(
+ router, bgp_neighbor,
+ neigh_data["remoteAs"], remote_as)
+ return errormsg
+ else:
+ logger.info("Verified remote_as for dut %s's "
+ "neighbor %s, found expected: %s",
+ router, bgp_neighbor, remote_as)
+
+ logger.info("Exiting lib API: verify_AS_numbers()")
+ return True
+
+
+def clear_bgp_and_verify(tgen, topo, router):
+ """
+ This API is to clear bgp neighborship and verify bgp neighborship
+ is coming up(BGP is converged) usinf "show bgp summary json" command
+ and also verifying for all bgp neighbors uptime before and after
+ clear bgp sessions is different as the uptime must be changed once
+ bgp sessions are cleared using "clear ip bgp */clear bgp ipv6 *" cmd.
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `topo`: input json file data
+ * `router`: device under test
+
+ Usage
+ -----
+ result = clear_bgp_and_verify(tgen, topo, addr_type, dut)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.info("Entering lib API: clear_bgp_and_verify()")
+
+ if router not in tgen.routers():
+ return False
+
+ rnode = tgen.routers()[router]
+
+ peer_uptime_before_clear_bgp = {}
+ # Verifying BGP convergence before bgp clear command
+ for retry in range(1, 11):
+ show_bgp_json = rnode.vtysh_cmd("show bgp summary json",
+ isjson=True)
+ logger.info(show_bgp_json)
+ # Verifying output dictionary show_bgp_json is empty or not
+ if not bool(show_bgp_json):
+ errormsg = "BGP is not running"
+ return errormsg
+
+ sleeptime = 2 * retry
+ if sleeptime <= BGP_CONVERGENCE_TIMEOUT:
+ # Waiting for BGP to converge
+ logger.info("Waiting for %s sec for BGP to converge on router"
+ " %s...", sleeptime, router)
+ sleep(sleeptime)
+ else:
+ errormsg = "TIMEOUT!! BGP is not converged in {} seconds for" \
+ " router {}".format(BGP_CONVERGENCE_TIMEOUT, router)
+ return errormsg
+
+ # To find neighbor ip type
+ bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
+ for addr_type in bgp_addr_type.keys():
+ total_peer = 0
+ if not check_address_types(addr_type):
+ continue
+
+ bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
+
+ for bgp_neighbor in bgp_neighbors:
+ total_peer += len(bgp_neighbors[bgp_neighbor]["dest_link"])
+
+ for addr_type in bgp_addr_type:
+ bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
+ no_of_peer = 0
+
+ for bgp_neighbor, peer_data in bgp_neighbors.iteritems():
+ for dest_link, peer_dict in peer_data["dest_link"].iteritems():
+ data = topo["routers"][bgp_neighbor]["links"]
+
+ if dest_link in data:
+ neighbor_ip = data[dest_link][addr_type].split(
+ "/")[0]
+ if addr_type == "ipv4":
+ ipv4_data = show_bgp_json["ipv4Unicast"][
+ "peers"]
+ nh_state = ipv4_data[neighbor_ip]["state"]
+
+ # Peer up time dictionary
+ peer_uptime_before_clear_bgp[bgp_neighbor] = \
+ ipv4_data[neighbor_ip]["peerUptime"]
+ else:
+ ipv6_data = show_bgp_json["ipv6Unicast"][
+ "peers"]
+ nh_state = ipv6_data[neighbor_ip]["state"]
+
+ # Peer up time dictionary
+ peer_uptime_before_clear_bgp[bgp_neighbor] = \
+ ipv6_data[neighbor_ip]["peerUptime"]
+
+ if nh_state == "Established":
+ no_of_peer += 1
+
+ if no_of_peer == total_peer:
+ logger.info("BGP is Converged for router %s before bgp"
+ " clear", router)
+ break
+ else:
+ logger.warning("BGP is not yet Converged for router %s "
+ "before bgp clear", router)
+
+ # Clearing BGP
+ logger.info("Clearing BGP neighborship for router %s..", router)
+ for addr_type in bgp_addr_type.keys():
+ if addr_type == "ipv4":
+ rnode.vtysh_cmd("clear ip bgp *")
+ elif addr_type == "ipv6":
+ rnode.vtysh_cmd("clear bgp ipv6 *")
+
+ peer_uptime_after_clear_bgp = {}
+ # Verifying BGP convergence after bgp clear command
+ for retry in range(1, 11):
+ show_bgp_json = rnode.vtysh_cmd("show bgp summary json",
+ isjson=True)
+ # Verifying output dictionary show_bgp_json is empty or not
+ if not bool(show_bgp_json):
+ errormsg = "BGP is not running"
+ return errormsg
+
+ sleeptime = 2 * retry
+ if sleeptime <= BGP_CONVERGENCE_TIMEOUT:
+ # Waiting for BGP to converge
+ logger.info("Waiting for %s sec for BGP to converge on router"
+ " %s...", sleeptime, router)
+ sleep(sleeptime)
+ else:
+ errormsg = "TIMEOUT!! BGP is not converged in {} seconds for" \
+ " router {}".format(BGP_CONVERGENCE_TIMEOUT, router)
+ return errormsg
+
+ # To find neighbor ip type
+ bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
+
+ for addr_type in bgp_addr_type.keys():
+ total_peer = 0
+ if not check_address_types(addr_type):
+ continue
+
+ bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
+
+ for bgp_neighbor in bgp_neighbors:
+ total_peer += len(bgp_neighbors[bgp_neighbor]["dest_link"])
+
+ for addr_type in bgp_addr_type:
+ bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
+ no_of_peer = 0
+
+ for bgp_neighbor, peer_data in bgp_neighbors.iteritems():
+ for dest_link, peer_dict in peer_data["dest_link"].iteritems():
+ data = topo["routers"][bgp_neighbor]["links"]
+
+ if dest_link in data:
+ neighbor_ip = data[dest_link][addr_type]. \
+ split("/")[0]
+ if addr_type == "ipv4":
+ ipv4_data = show_bgp_json["ipv4Unicast"][
+ "peers"]
+ nh_state = ipv4_data[neighbor_ip]["state"]
+ peer_uptime_after_clear_bgp[bgp_neighbor] = \
+ ipv4_data[neighbor_ip]["peerUptime"]
+ else:
+ ipv6_data = show_bgp_json["ipv6Unicast"][
+ "peers"]
+ nh_state = ipv6_data[neighbor_ip]["state"]
+ # Peer up time dictionary
+ peer_uptime_after_clear_bgp[bgp_neighbor] = \
+ ipv6_data[neighbor_ip]["peerUptime"]
+
+ if nh_state == "Established":
+ no_of_peer += 1
+
+ if no_of_peer == total_peer:
+ logger.info("BGP is Converged for router %s after bgp clear",
+ router)
+ break
+ else:
+ logger.warning("BGP is not yet Converged for router %s after"
+ " bgp clear", router)
+
+ # Compariung peerUptime dictionaries
+ if peer_uptime_before_clear_bgp != peer_uptime_after_clear_bgp:
+ logger.info("BGP neighborship is reset after clear BGP on router %s",
+ router)
+ else:
+ errormsg = "BGP neighborship is not reset after clear bgp on router" \
+ " {}".format(router)
+ return errormsg
+
+ logger.info("Exiting lib API: clear_bgp_and_verify()")
+ return True
+
+
+def verify_bgp_timers_and_functionality(tgen, topo, input_dict):
+ """
+ To verify BGP timer config, execute "show ip bgp neighbor json" command
+ and verify bgp timers with input_dict data.
+ To veirfy bgp timers functonality, shutting down peer interface
+ and verify BGP neighborship status.
+
+ Parameters
+ ----------
+ * `tgen`: topogen object
+ * `topo`: input json file data
+ * `addr_type`: ip type, ipv4/ipv6
+ * `input_dict`: defines for which router, bgp timers needs to be verified
+
+ Usage:
+ # To verify BGP timers for neighbor r2 of router r1
+ input_dict = {
+ "r1": {
+ "bgp": {
+ "bgp_neighbors":{
+ "r2":{
+ "keepalivetimer": 5,
+ "holddowntimer": 15,
+ }}}}}
+ result = verify_bgp_timers_and_functionality(tgen, topo, "ipv4",
+ input_dict)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.info("Entering lib API: verify_bgp_timers_and_functionality()")
+ sleep(5)
+ router_list = tgen.routers()
+ for router in input_dict.keys():
+ if router not in router_list:
+ continue
+
+ rnode = router_list[router]
+
+ logger.info("Verifying bgp timers functionality, DUT is %s:",
+ router)
+
+ show_ip_bgp_neighbor_json = \
+ rnode.vtysh_cmd("show ip bgp neighbor json", isjson=True)
+
+ bgp_addr_type = input_dict[router]["bgp"]["address_family"]
+
+ for addr_type in bgp_addr_type:
+ if not check_address_types(addr_type):
+ continue
+
+ bgp_neighbors = bgp_addr_type[addr_type]["unicast"][
+ "neighbor"]
+ for bgp_neighbor, peer_data in bgp_neighbors.iteritems():
+ for dest_link, peer_dict in peer_data["dest_link"].iteritems():
+ data = topo["routers"][bgp_neighbor]["links"]
+
+ keepalivetimer = peer_dict["keepalivetimer"]
+ holddowntimer = peer_dict["holddowntimer"]
+
+ if dest_link in data:
+ neighbor_ip = data[dest_link][addr_type]. \
+ split("/")[0]
+ neighbor_intf = data[dest_link]["interface"]
+
+ # Verify HoldDownTimer for neighbor
+ bgpHoldTimeMsecs = show_ip_bgp_neighbor_json[
+ neighbor_ip]["bgpTimerHoldTimeMsecs"]
+ if bgpHoldTimeMsecs != holddowntimer * 1000:
+ errormsg = "Verifying holddowntimer for bgp " \
+ "neighbor {} under dut {}, found: {} " \
+ "but expected: {}".format(
+ neighbor_ip, router,
+ bgpHoldTimeMsecs,
+ holddowntimer * 1000)
+ return errormsg
+
+ # Verify KeepAliveTimer for neighbor
+ bgpKeepAliveTimeMsecs = show_ip_bgp_neighbor_json[
+ neighbor_ip]["bgpTimerKeepAliveIntervalMsecs"]
+ if bgpKeepAliveTimeMsecs != keepalivetimer * 1000:
+ errormsg = "Verifying keepalivetimer for bgp " \
+ "neighbor {} under dut {}, found: {} " \
+ "but expected: {}".format(
+ neighbor_ip, router,
+ bgpKeepAliveTimeMsecs,
+ keepalivetimer * 1000)
+ return errormsg
+
+ ####################
+ # Shutting down peer interface after keepalive time and
+ # after some time bringing up peer interface.
+ # verifying BGP neighborship in (hold down-keep alive)
+ # time, it should not go down
+ ####################
+
+ # Wait till keep alive time
+ logger.info("=" * 20)
+ logger.info("Scenario 1:")
+ logger.info("Shutdown and bring up peer interface: %s "
+ "in keep alive time : %s sec and verify "
+ " BGP neighborship is intact in %s sec ",
+ neighbor_intf, keepalivetimer,
+ (holddowntimer - keepalivetimer))
+ logger.info("=" * 20)
+ logger.info("Waiting for %s sec..", keepalivetimer)
+ sleep(keepalivetimer)
+
+ # Shutting down peer ineterface
+ logger.info("Shutting down interface %s on router %s",
+ neighbor_intf, bgp_neighbor)
+ topotest.interface_set_status(
+ router_list[bgp_neighbor], neighbor_intf,
+ ifaceaction=False)
+
+ # Bringing up peer interface
+ sleep(5)
+ logger.info("Bringing up interface %s on router %s..",
+ neighbor_intf, bgp_neighbor)
+ topotest.interface_set_status(
+ router_list[bgp_neighbor], neighbor_intf,
+ ifaceaction=True)
+
+ # Verifying BGP neighborship is intact in
+ # (holddown - keepalive) time
+ for timer in range(keepalivetimer, holddowntimer,
+ int(holddowntimer / 3)):
+ logger.info("Waiting for %s sec..", keepalivetimer)
+ sleep(keepalivetimer)
+ sleep(2)
+ show_bgp_json = \
+ rnode.vtysh_cmd("show bgp summary json",
+ isjson=True)
+
+ if addr_type == "ipv4":
+ ipv4_data = show_bgp_json["ipv4Unicast"]["peers"]
+ nh_state = ipv4_data[neighbor_ip]["state"]
+ else:
+ ipv6_data = show_bgp_json["ipv6Unicast"]["peers"]
+ nh_state = ipv6_data[neighbor_ip]["state"]
+
+ if timer == \
+ (holddowntimer - keepalivetimer):
+ if nh_state != "Established":
+ errormsg = "BGP neighborship has not gone " \
+ "down in {} sec for neighbor {}\n" \
+ "show_bgp_json: \n {} ".format(
+ timer, bgp_neighbor,
+ show_bgp_json)
+ return errormsg
+ else:
+ logger.info("BGP neighborship is intact in %s"
+ " sec for neighbor %s \n "
+ "show_bgp_json : \n %s",
+ timer, bgp_neighbor,
+ show_bgp_json)
+
+ ####################
+ # Shutting down peer interface and verifying that BGP
+ # neighborship is going down in holddown time
+ ####################
+ logger.info("=" * 20)
+ logger.info("Scenario 2:")
+ logger.info("Shutdown peer interface: %s and verify BGP"
+ " neighborship has gone down in hold down "
+ "time %s sec", neighbor_intf, holddowntimer)
+ logger.info("=" * 20)
+
+ logger.info("Shutting down interface %s on router %s..",
+ neighbor_intf, bgp_neighbor)
+ topotest.interface_set_status(router_list[bgp_neighbor],
+ neighbor_intf,
+ ifaceaction=False)
+
+ # Verifying BGP neighborship is going down in holddown time
+ for timer in range(keepalivetimer,
+ (holddowntimer + keepalivetimer),
+ int(holddowntimer / 3)):
+ logger.info("Waiting for %s sec..", keepalivetimer)
+ sleep(keepalivetimer)
+ sleep(2)
+ show_bgp_json = \
+ rnode.vtysh_cmd("show bgp summary json",
+ isjson=True)
+
+ if addr_type == "ipv4":
+ ipv4_data = show_bgp_json["ipv4Unicast"]["peers"]
+ nh_state = ipv4_data[neighbor_ip]["state"]
+ else:
+ ipv6_data = show_bgp_json["ipv6Unicast"]["peers"]
+ nh_state = ipv6_data[neighbor_ip]["state"]
+
+ if timer == holddowntimer:
+ if nh_state == "Established":
+ errormsg = "BGP neighborship has not gone " \
+ "down in {} sec for neighbor {}\n" \
+ "show_bgp_json: \n {} ".format(
+ timer, bgp_neighbor,
+ show_bgp_json)
+ return errormsg
+ else:
+ logger.info("BGP neighborship has gone down in"
+ " %s sec for neighbor %s \n"
+ "show_bgp_json : \n %s",
+ timer, bgp_neighbor,
+ show_bgp_json)
+
+ logger.info("Exiting lib API: verify_bgp_timers_and_functionality()")
+ return True
+