router,
)
else:
-
ipv4_data = bgp_addr_data.setdefault("ipv4", {})
ipv6_data = bgp_addr_data.setdefault("ipv6", {})
l2vpn_data = bgp_addr_data.setdefault("l2vpn", {})
if advertise_data:
for address_type, unicast_type in advertise_data.items():
-
if type(unicast_type) is dict:
for key, value in unicast_type.items():
cmd = "advertise {} {}".format(address_type, key)
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
try:
-
new_topo = deepcopy(topo["routers"])
router_dict = {}
for router in input_dict.keys():
total_peer = 0
for addr_type in bgp_addr_type.keys():
-
if not check_address_types(addr_type):
continue
peer_uptime_after_clear_bgp = {}
# Verifying BGP convergence after bgp clear command
for retry in range(50):
-
# Waiting for BGP to converge
logger.info(
"Waiting for %s sec for BGP to converge on router" " %s...",
if lmode is None:
if "graceful-restart" in input_dict[dut]["bgp"]:
-
if (
"graceful-restart" in input_dict[dut]["bgp"]["graceful-restart"]
and input_dict[dut]["bgp"]["graceful-restart"][
if rmode is None:
if "graceful-restart" in input_dict[peer]["bgp"]:
-
if (
"graceful-restart"
in input_dict[peer]["bgp"]["graceful-restart"]
eor_json = show_bgp_graceful_json_out[afi]["endOfRibStatus"]
if "endOfRibSend" in eor_json:
-
if eor_json["endOfRibSend"]:
logger.info(
"[DUT: %s]: EOR Send true for %s " "%s", dut, neighbor_ip, afi
"timer"
].items():
if rs_timer == "restart-time":
-
receivedTimer = value
if (
show_bgp_graceful_json_out["timers"][
tgen = get_topogen()
for router, rnode in tgen.routers().items():
if router == dut:
-
if vrf:
ipv4_cmd = "sh ip bgp vrf {} summary json".format(vrf)
show_bgp_json_ipv4 = run_frr_cmd(rnode, ipv4_cmd, isjson=True)
connected_routes = {}
for router, rnode in tgen.routers().items():
if router == dut:
-
ipv4_routes = run_frr_cmd(rnode, "sh ip bgp json", isjson=True)
ipv6_routes = run_frr_cmd(rnode, "sh ip bgp ipv6 unicast json", isjson=True)
is_ipv4_default_attrib_found = False
import json
import re
+
# gpz: get rib in json form and compare against desired routes
class BgpRib:
def log(self, str):
def apply_raw_config(tgen, input_dict):
-
"""
API to configure raw configuration on device. This can be used for any cli
which has not been implemented in JSON.
try:
router_list = tgen.routers()
for router, rnode in router_list.items():
-
result = rnode.check_router_running()
if result != "":
daemons = []
rlist = []
for router in input_dict.keys():
-
interface_list = input_dict[router]["interface_list"]
status = input_dict[router].setdefault("status", "up")
for intf in interface_list:
continue
rmap_data = []
for rmap_name, rmap_value in input_dict[router]["route_maps"].items():
-
for rmap_dict in rmap_value:
del_action = rmap_dict.setdefault("delete", False)
group_addr_range = [group_addr_range]
for grp_addr in group_addr_range:
-
addr_type = validate_ip_address(grp_addr)
if addr_type == "ipv4":
if next_hop is not None:
if "brctl" in input_dict[dut]:
for brctl_dict in input_dict[dut]["brctl"]:
-
brctl_names = brctl_dict.setdefault("brctl_name", [])
addvxlans = brctl_dict.setdefault("addvxlan", [])
stp_values = brctl_dict.setdefault("stp", [])
for brctl_name, vxlan, vrf, stp in zip(
brctl_names, addvxlans, vrfs, stp_values
):
-
ip_cmd_list = []
cmd = "ip link add name {} type bridge stp_state {}".format(
brctl_name, stp
for static_route in static_routes:
if "vrf" in static_route and static_route["vrf"] is not None:
-
logger.info(
"[DUT: {}]: Verifying routes for VRF:"
" {}".format(router, static_route["vrf"])
for static_route in static_routes:
if "vrf" in static_route and static_route["vrf"] is not None:
-
logger.info(
"[DUT: {}]: Verifying routes for VRF:"
" {}".format(router, static_route["vrf"])
self.log("unable to read: " + tstFile)
sys.exit(1)
- def command(self, target, command, regexp, op, result, returnJson, startt=None, force_result=False):
+ def command(
+ self,
+ target,
+ command,
+ regexp,
+ op,
+ result,
+ returnJson,
+ startt=None,
+ force_result=False,
+ ):
global net
if op == "jsoncmp_pass" or op == "jsoncmp_fail":
returnJson = True
if strict and (wait_count == 1):
force_result = True
- found = self.command(target, command, regexp, op, result, returnJson, startt, force_result)
+ found = self.command(
+ target, command, regexp, op, result, returnJson, startt, force_result
+ )
if found is not False:
break
# initialized by luStart
LUtil = None
+
# entry calls
def luStart(
baseScriptDir=".",
if printed > 0:
logger.error("See %s for details of errors" % LUtil.fout_name)
+
#
# Sets default wait type for luCommand(op="wait) (may be overridden by
# specifying luCommand(op="wait-strict") or luCommand(op="wait-nostrict")).
topo_data = topo["routers"]
for router in router_list.keys():
-
if "pim" not in topo_data[router]:
continue
and data["outboundInterface"] in oil
):
if return_uptime:
-
uptime_dict[grp_addr][src_address] = data["upTime"]
logger.info(
for intf, data in input_dict[dut].items():
interface_json = show_pim_intf_traffic_json[intf]
for state in data:
-
# Verify Tx/Rx
if state in interface_json:
output_dict[dut][state] = interface_json[state]
for intf, data in input_dict[dut].items():
interface_json = show_pim_intf_traffic_json[intf]
for state in data:
-
# Verify Tx/Rx
if state in interface_json:
output_dict[dut][state] = interface_json[state]
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
if "pim" in topo["routers"][dut]:
-
logger.info("[DUT: %s]: Verifying ip pim upstream rpf:", dut)
rnode = tgen.routers()[dut]
grp_addr = grp_addr.split("/")[0]
for source, data in interface_json[grp_addr].items():
-
# Verify pim join
if pim_join:
if data["group"] == grp_addr and data["channelJoinName"] == "JOIN":
rnode = tgen.routers()[dut]
for interface, data in input_dict[dut]["igmp"]["interfaces"].items():
-
statistics = False
report = False
if "statistics" in input_dict[dut]["igmp"]["interfaces"][interface]["igmp"]:
rnode = tgen.routers()[dut]
for interface, data in input_dict[dut]["pim"]["interfaces"].items():
-
logger.info("[DUT: %s]: Verifying PIM interface %s detail:", dut, interface)
show_ip_igmp_intf_json = run_frr_cmd(
elif (
interface_json["pktsIn"] != 0 and interface_json["bytesIn"] != 0
):
-
traffic_dict[traffic_type][interface][
"pktsIn"
] = interface_json["pktsIn"]
interface_json["pktsOut"] != 0
and interface_json["bytesOut"] != 0
):
-
traffic_dict[traffic_type][interface][
"pktsOut"
] = interface_json["pktsOut"]
group_addresses = [group_addresses]
if interface not in show_ip_local_igmp_json:
-
errormsg = (
"[DUT %s]: Verifying local IGMP group received"
" from interface %s [FAILED]!! " % (dut, interface)
for intf, data in input_dict[dut].items():
interface_json = show_pim_intf_traffic_json[intf]
for state in data:
-
# Verify Tx/Rx
if state in interface_json:
output_dict[dut][state] = interface_json[state]
for dut in input_dict.keys():
rnode = tgen.routers()[dut]
for interface, data in input_dict[dut]["mld"]["interfaces"].items():
-
statistics = False
report = False
if "statistics" in input_dict[dut]["mld"]["interfaces"][interface]["mld"]:
rnode = tgen.routers()[dut]
for interface, data in input_dict[dut]["pim6"]["interfaces"].items():
-
logger.info(
"[DUT: %s]: Verifying PIM6 interface %s detail:", dut, interface
)
group_addresses = [group_addresses]
if interface not in show_ipv6_local_mld_json["default"]:
-
errormsg = (
"[DUT %s]: Verifying local MLD group received"
" from interface %s [FAILED]!! " % (dut, interface)
def test_json_list_nested_with_objects():
-
dcomplete = [{"key": 1, "list": [123]}, {"key": 2, "list": [123]}]
dsub1 = [{"key": 2, "list": [123]}, {"key": 1, "list": [123]}]
for destRouterLink, data in sorted(
topo["switches"][curSwitch]["links"].items()
):
-
# Loopback interfaces
if "dst_node" in data:
destRouter = data["dst_node"]
destRouter = destRouterLink
if destRouter in listAllRouters:
-
topo["routers"][destRouter]["links"][curSwitch] = deepcopy(
topo["switches"][curSwitch]["links"][destRouterLink]
)
tgen = create_tgen_from_json(testfile, json_file)
# Start routers (and their daemons)
- start_topology(tgen, topo_daemons(tgen))
+ start_topology(tgen)
# Configure routers
build_config_from_json(tgen)