]> git.puffer.fish Git - mirror/frr.git/commitdiff
tests: fix pylint error, and update style in lib/*.py 13546/head
authorChristian Hopps <chopps@labn.net>
Wed, 17 May 2023 17:40:39 +0000 (13:40 -0400)
committerChristian Hopps <chopps@labn.net>
Wed, 17 May 2023 17:42:32 +0000 (13:42 -0400)
Signed-off-by: Christian Hopps <chopps@labn.net>
tests/topotests/lib/bgp.py
tests/topotests/lib/bgprib.py
tests/topotests/lib/common_config.py
tests/topotests/lib/lutil.py
tests/topotests/lib/pim.py
tests/topotests/lib/test/test_json.py
tests/topotests/lib/topojson.py

index a0b12b2d6425b138ccaee1e404e23c9a0548f777..c58da996d7bf88b21932c7e988c11fec19437216 100644 (file)
@@ -164,7 +164,6 @@ def create_router_bgp(tgen, topo=None, input_dict=None, build=False, load_config
                         router,
                     )
                 else:
-
                     ipv4_data = bgp_addr_data.setdefault("ipv4", {})
                     ipv6_data = bgp_addr_data.setdefault("ipv6", {})
                     l2vpn_data = bgp_addr_data.setdefault("l2vpn", {})
@@ -645,7 +644,6 @@ def __create_l2vpn_evpn_address_family(
 
         if advertise_data:
             for address_type, unicast_type in advertise_data.items():
-
                 if type(unicast_type) is dict:
                     for key, value in unicast_type.items():
                         cmd = "advertise {} {}".format(address_type, key)
@@ -1651,7 +1649,6 @@ def modify_as_number(tgen, topo, input_dict):
 
     logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
     try:
-
         new_topo = deepcopy(topo["routers"])
         router_dict = {}
         for router in input_dict.keys():
@@ -1953,7 +1950,6 @@ def clear_bgp_and_verify(tgen, topo, router, rid=None):
 
         total_peer = 0
         for addr_type in bgp_addr_type.keys():
-
             if not check_address_types(addr_type):
                 continue
 
@@ -2022,7 +2018,6 @@ def clear_bgp_and_verify(tgen, topo, router, rid=None):
     peer_uptime_after_clear_bgp = {}
     # Verifying BGP convergence after bgp clear command
     for retry in range(50):
-
         # Waiting for BGP to converge
         logger.info(
             "Waiting for %s sec for BGP to converge on router" " %s...",
@@ -3278,7 +3273,6 @@ def verify_graceful_restart(
 
             if lmode is None:
                 if "graceful-restart" in input_dict[dut]["bgp"]:
-
                     if (
                         "graceful-restart" in input_dict[dut]["bgp"]["graceful-restart"]
                         and input_dict[dut]["bgp"]["graceful-restart"][
@@ -3326,7 +3320,6 @@ def verify_graceful_restart(
 
             if rmode is None:
                 if "graceful-restart" in input_dict[peer]["bgp"]:
-
                     if (
                         "graceful-restart"
                         in input_dict[peer]["bgp"]["graceful-restart"]
@@ -3628,7 +3621,6 @@ def verify_eor(tgen, topo, addr_type, input_dict, dut, peer, expected=True):
 
             eor_json = show_bgp_graceful_json_out[afi]["endOfRibStatus"]
             if "endOfRibSend" in eor_json:
-
                 if eor_json["endOfRibSend"]:
                     logger.info(
                         "[DUT: %s]: EOR Send true for %s " "%s", dut, neighbor_ip, afi
@@ -3918,7 +3910,6 @@ def verify_graceful_restart_timers(tgen, topo, addr_type, input_dict, dut, peer)
                         "timer"
                     ].items():
                         if rs_timer == "restart-time":
-
                             receivedTimer = value
                             if (
                                 show_bgp_graceful_json_out["timers"][
@@ -4777,7 +4768,6 @@ def get_prefix_count_route(
     tgen = get_topogen()
     for router, rnode in tgen.routers().items():
         if router == dut:
-
             if vrf:
                 ipv4_cmd = "sh ip bgp vrf {} summary json".format(vrf)
                 show_bgp_json_ipv4 = run_frr_cmd(rnode, ipv4_cmd, isjson=True)
@@ -4871,7 +4861,6 @@ def verify_rib_default_route(
     connected_routes = {}
     for router, rnode in tgen.routers().items():
         if router == dut:
-
             ipv4_routes = run_frr_cmd(rnode, "sh ip bgp json", isjson=True)
             ipv6_routes = run_frr_cmd(rnode, "sh ip bgp ipv6 unicast json", isjson=True)
     is_ipv4_default_attrib_found = False
index cd449cb50c727beb2c8deaa1de9d7879df8e32b2..699c7a4da6125156d91c25a6040b1a3ce7a084bb 100644 (file)
@@ -25,6 +25,7 @@ from lib.lutil import luCommand, luResult, LUtil
 import json
 import re
 
+
 # gpz: get rib in json form and compare against desired routes
 class BgpRib:
     def log(self, str):
index d19d8db75cbddf81b2356352389c9d9d4d65e15c..67afe8739f12d4d1b0a9187ab919a82c6c768851 100644 (file)
@@ -243,7 +243,6 @@ def run_frr_cmd(rnode, cmd, isjson=False):
 
 
 def apply_raw_config(tgen, input_dict):
-
     """
     API to configure raw configuration on device. This can be used for any cli
     which has not been implemented in JSON.
@@ -447,7 +446,6 @@ def check_router_status(tgen):
     try:
         router_list = tgen.routers()
         for router, rnode in router_list.items():
-
             result = rnode.check_router_running()
             if result != "":
                 daemons = []
@@ -1914,7 +1912,6 @@ def interface_status(tgen, topo, input_dict):
         rlist = []
 
         for router in input_dict.keys():
-
             interface_list = input_dict[router]["interface_list"]
             status = input_dict[router].setdefault("status", "up")
             for intf in interface_list:
@@ -2529,7 +2526,6 @@ def create_route_maps(tgen, input_dict, build=False):
                 continue
             rmap_data = []
             for rmap_name, rmap_value in input_dict[router]["route_maps"].items():
-
                 for rmap_dict in rmap_value:
                     del_action = rmap_dict.setdefault("delete", False)
 
@@ -3002,7 +2998,6 @@ def addKernelRoute(
         group_addr_range = [group_addr_range]
 
     for grp_addr in group_addr_range:
-
         addr_type = validate_ip_address(grp_addr)
         if addr_type == "ipv4":
             if next_hop is not None:
@@ -3193,7 +3188,6 @@ def configure_brctl(tgen, topo, input_dict):
 
         if "brctl" in input_dict[dut]:
             for brctl_dict in input_dict[dut]["brctl"]:
-
                 brctl_names = brctl_dict.setdefault("brctl_name", [])
                 addvxlans = brctl_dict.setdefault("addvxlan", [])
                 stp_values = brctl_dict.setdefault("stp", [])
@@ -3203,7 +3197,6 @@ def configure_brctl(tgen, topo, input_dict):
                 for brctl_name, vxlan, vrf, stp in zip(
                     brctl_names, addvxlans, vrfs, stp_values
                 ):
-
                     ip_cmd_list = []
                     cmd = "ip link add name {} type bridge stp_state {}".format(
                         brctl_name, stp
@@ -3614,7 +3607,6 @@ def verify_rib(
 
                 for static_route in static_routes:
                     if "vrf" in static_route and static_route["vrf"] is not None:
-
                         logger.info(
                             "[DUT: {}]: Verifying routes for VRF:"
                             " {}".format(router, static_route["vrf"])
@@ -4053,7 +4045,6 @@ def verify_fib_routes(tgen, addr_type, dut, input_dict, next_hop=None, protocol=
 
                 for static_route in static_routes:
                     if "vrf" in static_route and static_route["vrf"] is not None:
-
                         logger.info(
                             "[DUT: {}]: Verifying routes for VRF:"
                             " {}".format(router, static_route["vrf"])
index 9aef41cd1cd7b5061a027f24c13407ea11dbd5cc..1eb88f26d4c5108cbfb8749713a3c613e65e109b 100644 (file)
@@ -177,7 +177,17 @@ Total %-4d                                                           %-4d %d\n\
             self.log("unable to read: " + tstFile)
             sys.exit(1)
 
-    def command(self, target, command, regexp, op, result, returnJson, startt=None, force_result=False):
+    def command(
+        self,
+        target,
+        command,
+        regexp,
+        op,
+        result,
+        returnJson,
+        startt=None,
+        force_result=False,
+    ):
         global net
         if op == "jsoncmp_pass" or op == "jsoncmp_fail":
             returnJson = True
@@ -326,7 +336,9 @@ Total %-4d                                                           %-4d %d\n\
             if strict and (wait_count == 1):
                 force_result = True
 
-            found = self.command(target, command, regexp, op, result, returnJson, startt, force_result)
+            found = self.command(
+                target, command, regexp, op, result, returnJson, startt, force_result
+            )
             if found is not False:
                 break
 
@@ -342,6 +354,7 @@ Total %-4d                                                           %-4d %d\n\
 # initialized by luStart
 LUtil = None
 
+
 # entry calls
 def luStart(
     baseScriptDir=".",
@@ -455,6 +468,7 @@ def luShowFail():
     if printed > 0:
         logger.error("See %s for details of errors" % LUtil.fout_name)
 
+
 #
 # Sets default wait type for luCommand(op="wait) (may be overridden by
 # specifying luCommand(op="wait-strict") or luCommand(op="wait-nostrict")).
index 925890b324912aa7bbcd5001aef93a5d9f20ef22..e26bdb3af3b860f5761c0f67d5b65d86dc8d3224 100644 (file)
@@ -593,7 +593,6 @@ def find_rp_details(tgen, topo):
     topo_data = topo["routers"]
 
     for router in router_list.keys():
-
         if "pim" not in topo_data[router]:
             continue
 
@@ -1495,7 +1494,6 @@ def verify_mroutes(
                             and data["outboundInterface"] in oil
                         ):
                             if return_uptime:
-
                                 uptime_dict[grp_addr][src_address] = data["upTime"]
 
                             logger.info(
@@ -1917,7 +1915,6 @@ def get_pim_interface_traffic(tgen, input_dict):
             for intf, data in input_dict[dut].items():
                 interface_json = show_pim_intf_traffic_json[intf]
                 for state in data:
-
                     # Verify Tx/Rx
                     if state in interface_json:
                         output_dict[dut][state] = interface_json[state]
@@ -1990,7 +1987,6 @@ def get_pim6_interface_traffic(tgen, input_dict):
             for intf, data in input_dict[dut].items():
                 interface_json = show_pim_intf_traffic_json[intf]
                 for state in data:
-
                     # Verify Tx/Rx
                     if state in interface_json:
                         output_dict[dut][state] = interface_json[state]
@@ -3007,7 +3003,6 @@ def verify_pim_upstream_rpf(
     logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
 
     if "pim" in topo["routers"][dut]:
-
         logger.info("[DUT: %s]: Verifying ip pim upstream rpf:", dut)
 
         rnode = tgen.routers()[dut]
@@ -3245,7 +3240,6 @@ def verify_pim_join(
 
         grp_addr = grp_addr.split("/")[0]
         for source, data in interface_json[grp_addr].items():
-
             # Verify pim join
             if pim_join:
                 if data["group"] == grp_addr and data["channelJoinName"] == "JOIN":
@@ -3338,7 +3332,6 @@ def verify_igmp_config(tgen, input_dict, stats_return=False, expected=True):
         rnode = tgen.routers()[dut]
 
         for interface, data in input_dict[dut]["igmp"]["interfaces"].items():
-
             statistics = False
             report = False
             if "statistics" in input_dict[dut]["igmp"]["interfaces"][interface]["igmp"]:
@@ -3623,7 +3616,6 @@ def verify_pim_config(tgen, input_dict, expected=True):
         rnode = tgen.routers()[dut]
 
         for interface, data in input_dict[dut]["pim"]["interfaces"].items():
-
             logger.info("[DUT: %s]: Verifying PIM interface %s detail:", dut, interface)
 
             show_ip_igmp_intf_json = run_frr_cmd(
@@ -3772,7 +3764,6 @@ def verify_multicast_traffic(tgen, input_dict, return_traffic=False, expected=Tr
                     elif (
                         interface_json["pktsIn"] != 0 and interface_json["bytesIn"] != 0
                     ):
-
                         traffic_dict[traffic_type][interface][
                             "pktsIn"
                         ] = interface_json["pktsIn"]
@@ -3836,7 +3827,6 @@ def verify_multicast_traffic(tgen, input_dict, return_traffic=False, expected=Tr
                         interface_json["pktsOut"] != 0
                         and interface_json["bytesOut"] != 0
                     ):
-
                         traffic_dict[traffic_type][interface][
                             "pktsOut"
                         ] = interface_json["pktsOut"]
@@ -4232,7 +4222,6 @@ def verify_local_igmp_groups(tgen, dut, interface, group_addresses):
         group_addresses = [group_addresses]
 
     if interface not in show_ip_local_igmp_json:
-
         errormsg = (
             "[DUT %s]: Verifying local IGMP group received"
             " from interface %s [FAILED]!! " % (dut, interface)
@@ -4319,7 +4308,6 @@ def verify_pim_interface_traffic(tgen, input_dict, return_stats=True, addr_type=
         for intf, data in input_dict[dut].items():
             interface_json = show_pim_intf_traffic_json[intf]
             for state in data:
-
                 # Verify Tx/Rx
                 if state in interface_json:
                     output_dict[dut][state] = interface_json[state]
@@ -4525,7 +4513,6 @@ def verify_mld_config(tgen, input_dict, stats_return=False, expected=True):
     for dut in input_dict.keys():
         rnode = tgen.routers()[dut]
         for interface, data in input_dict[dut]["mld"]["interfaces"].items():
-
             statistics = False
             report = False
             if "statistics" in input_dict[dut]["mld"]["interfaces"][interface]["mld"]:
@@ -5040,7 +5027,6 @@ def verify_pim6_config(tgen, input_dict, expected=True):
         rnode = tgen.routers()[dut]
 
         for interface, data in input_dict[dut]["pim6"]["interfaces"].items():
-
             logger.info(
                 "[DUT: %s]: Verifying PIM6 interface %s detail:", dut, interface
             )
@@ -5158,7 +5144,6 @@ def verify_local_mld_groups(tgen, dut, interface, group_addresses):
         group_addresses = [group_addresses]
 
     if interface not in show_ipv6_local_mld_json["default"]:
-
         errormsg = (
             "[DUT %s]: Verifying local MLD group received"
             " from interface %s [FAILED]!! " % (dut, interface)
index ae58d9b2d1fd44cb4ceaf2ba888b492b16a4f1a6..230c2bd7f7d893a608a2d6490f45dd97d4024bb2 100755 (executable)
@@ -616,7 +616,6 @@ def test_json_object_asterisk_matching():
 
 
 def test_json_list_nested_with_objects():
-
     dcomplete = [{"key": 1, "list": [123]}, {"key": 2, "list": [123]}]
 
     dsub1 = [{"key": 2, "list": [123]}, {"key": 1, "list": [123]}]
index ad6dddc76b4213267293e5ea3eb788bf7f5824f8..23a6c869934a811330bacb1eec5b7ab024f75089 100644 (file)
@@ -206,7 +206,6 @@ def build_topo_from_json(tgen, topo=None):
             for destRouterLink, data in sorted(
                 topo["switches"][curSwitch]["links"].items()
             ):
-
                 # Loopback interfaces
                 if "dst_node" in data:
                     destRouter = data["dst_node"]
@@ -220,7 +219,6 @@ def build_topo_from_json(tgen, topo=None):
                     destRouter = destRouterLink
 
                 if destRouter in listAllRouters:
-
                     topo["routers"][destRouter]["links"][curSwitch] = deepcopy(
                         topo["switches"][curSwitch]["links"][destRouterLink]
                     )
@@ -398,7 +396,7 @@ def setup_module_from_json(testfile, json_file=None):
     tgen = create_tgen_from_json(testfile, json_file)
 
     # Start routers (and their daemons)
-    start_topology(tgen, topo_daemons(tgen))
+    start_topology(tgen)
 
     # Configure routers
     build_config_from_json(tgen)