diff options
39 files changed, 1753 insertions, 1106 deletions
diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs index 61bc0ade65..c3a0fdedf0 100644 --- a/.git-blame-ignore-revs +++ b/.git-blame-ignore-revs @@ -2,6 +2,7 @@  # git blame --ignore-revs-file .git-blame-ignore-revs <...>  # or to make it permanent  # git config blame.ignoreRevsFile .git-blame-ignore-revs +9fa6ec14737b94fdfb41539d96c7e4f84f3514b6  701a01920eee5431d2052aad92aefbdf50ac2139  bf2394f08bdc91a6cbd3784a1bfa3af3247bb06f  0157c327715ca367d13b7f02b2981f3484ccdeeb diff --git a/tests/topotests/all-protocol-startup/test_all_protocol_startup.py b/tests/topotests/all-protocol-startup/test_all_protocol_startup.py index 08378d9b58..ab9358408e 100644 --- a/tests/topotests/all-protocol-startup/test_all_protocol_startup.py +++ b/tests/topotests/all-protocol-startup/test_all_protocol_startup.py @@ -1016,6 +1016,7 @@ def test_bgp_ipv6_summary():      # For debugging after starting FRR daemons, uncomment the next line      # CLI(net) +  def test_nht():      print("\n\n**** Test that nexthop tracking is at least nominally working ****\n") @@ -1026,13 +1027,16 @@ def test_nht():          expected = open(nhtFile).read().rstrip()          expected = ("\n".join(expected.splitlines()) + "\n").splitlines(1) -        actual = (net["r%s" %i].cmd('vtysh -c "show ip nht" 2> /dev/null').rstrip()) +        actual = net["r%s" % i].cmd('vtysh -c "show ip nht" 2> /dev/null').rstrip()          actual = re.sub(r"fd [0-9][0-9]", "fd XX", actual)          actual = ("\n".join(actual.splitlines()) + "\n").splitlines(1) -        diff = topotest.get_textdiff(actual, expected, -                                     title1="Actual `show ip nht`", -                                     title2="Expected `show ip nht`") +        diff = topotest.get_textdiff( +            actual, +            expected, +            title1="Actual `show ip nht`", +            title2="Expected `show ip nht`", +        )          if diff:              assert 0, "r%s failed ip nht check:\n%s\n" % (i, diff) @@ -1043,19 +1047,23 @@ def test_nht():          expected = open(nhtFile).read().rstrip()          expected = ("\n".join(expected.splitlines()) + "\n").splitlines(1) -        actual = (net["r%s" %i].cmd('vtysh -c "show ipv6 nht" 2> /dev/null').rstrip()) +        actual = net["r%s" % i].cmd('vtysh -c "show ipv6 nht" 2> /dev/null').rstrip()          actual = re.sub(r"fd [0-9][0-9]", "fd XX", actual)          actual = ("\n".join(actual.splitlines()) + "\n").splitlines(1) -        diff = topotest.get_textdiff(actual, expected, -                                     title1="Actual `show ip nht`", -                                     title2="Expected `show ip nht`") +        diff = topotest.get_textdiff( +            actual, +            expected, +            title1="Actual `show ip nht`", +            title2="Expected `show ip nht`", +        )          if diff:              assert 0, "r%s failed ipv6 nht check:\n%s\n" % (i, diff)          else:              print("show ipv6 nht is ok\n") +  def test_bgp_ipv4():      global fatal_error      global net diff --git a/tests/topotests/bfd-bgp-cbit-topo3/test_bfd_bgp_cbit_topo3.py b/tests/topotests/bfd-bgp-cbit-topo3/test_bfd_bgp_cbit_topo3.py index 595132214b..98a5033f53 100644 --- a/tests/topotests/bfd-bgp-cbit-topo3/test_bfd_bgp_cbit_topo3.py +++ b/tests/topotests/bfd-bgp-cbit-topo3/test_bfd_bgp_cbit_topo3.py @@ -74,7 +74,8 @@ def setup_module(mod):      for rname, router in router_list.items():          router.load_config( -            TopoRouter.RD_ZEBRA, os.path.join(CWD, "{}/zebra.conf".format(rname)), +            TopoRouter.RD_ZEBRA, +            os.path.join(CWD, "{}/zebra.conf".format(rname)),          )          router.load_config(              TopoRouter.RD_BFD, os.path.join(CWD, "{}/bfdd.conf".format(rname)) diff --git a/tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py b/tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py index b3b7256ac4..b701a0d61e 100644 --- a/tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py +++ b/tests/topotests/bgp-basic-functionality-topo1/test_bgp_basic_functionality.py @@ -282,10 +282,26 @@ def test_BGP_config_with_invalid_ASN_p2(request):      # Api call to modify AS number      input_dict = { -        "r1": {"bgp": {"local_as": 0,}}, -        "r2": {"bgp": {"local_as": 0,}}, -        "r3": {"bgp": {"local_as": 0,}}, -        "r4": {"bgp": {"local_as": 64000,}}, +        "r1": { +            "bgp": { +                "local_as": 0, +            } +        }, +        "r2": { +            "bgp": { +                "local_as": 0, +            } +        }, +        "r3": { +            "bgp": { +                "local_as": 0, +            } +        }, +        "r4": { +            "bgp": { +                "local_as": 64000, +            } +        },      }      result = modify_as_number(tgen, topo, input_dict)      try: @@ -819,7 +835,11 @@ def test_bgp_with_loopback_interface(request):              # Adding ['source_link'] = 'lo' key:value pair              topo["routers"][routerN]["bgp"]["address_family"]["ipv4"]["unicast"][                  "neighbor" -            ][bgp_neighbor]["dest_link"] = {"lo": {"source_link": "lo",}} +            ][bgp_neighbor]["dest_link"] = { +                "lo": { +                    "source_link": "lo", +                } +            }      # Creating configuration from JSON      build_config_from_json(tgen, topo) diff --git a/tests/topotests/bgp-ecmp-topo2/test_ebgp_ecmp_topo2.py b/tests/topotests/bgp-ecmp-topo2/test_ebgp_ecmp_topo2.py index 28a1b98eb3..353df0684b 100644 --- a/tests/topotests/bgp-ecmp-topo2/test_ebgp_ecmp_topo2.py +++ b/tests/topotests/bgp-ecmp-topo2/test_ebgp_ecmp_topo2.py @@ -273,8 +273,20 @@ def test_modify_ecmp_max_paths(request, ecmp_num, test_type):          "r3": {              "bgp": {                  "address_family": { -                    "ipv4": {"unicast": {"maximum_paths": {"ebgp": ecmp_num,}}}, -                    "ipv6": {"unicast": {"maximum_paths": {"ebgp": ecmp_num,}}}, +                    "ipv4": { +                        "unicast": { +                            "maximum_paths": { +                                "ebgp": ecmp_num, +                            } +                        } +                    }, +                    "ipv6": { +                        "unicast": { +                            "maximum_paths": { +                                "ebgp": ecmp_num, +                            } +                        } +                    },                  }              }          } @@ -303,7 +315,7 @@ def test_modify_ecmp_max_paths(request, ecmp_num, test_type):              input_dict_1,              next_hop=NEXT_HOPS[addr_type][: int(ecmp_num)],              protocol=protocol, -            count_only=True +            count_only=True,          )          assert result is True, "Testcase {} : Failed \n Error: {}".format( @@ -371,8 +383,8 @@ def test_ecmp_after_clear_bgp(request, ecmp_num, test_type):  def test_ecmp_remove_redistribute_static(request): -    """ Verify routes are cleared from BGP and RIB table of DUT when -        redistribute static configuration is removed.""" +    """Verify routes are cleared from BGP and RIB table of DUT when +    redistribute static configuration is removed."""      tc_name = request.node.name      write_test_header(tc_name) @@ -481,8 +493,8 @@ def test_ecmp_remove_redistribute_static(request):  @pytest.mark.parametrize("test_type", ["redist_static", "advertise_nw"])  def test_ecmp_shut_bgp_neighbor(request, test_type): -    """ Shut BGP neigbors one by one and verify BGP and routing table updated -        accordingly in DUT """ +    """Shut BGP neigbors one by one and verify BGP and routing table updated +    accordingly in DUT"""      tc_name = request.node.name      write_test_header(tc_name) diff --git a/tests/topotests/bgp-ecmp-topo2/test_ibgp_ecmp_topo2.py b/tests/topotests/bgp-ecmp-topo2/test_ibgp_ecmp_topo2.py index 4d8003c4be..2f73bdb1b8 100644 --- a/tests/topotests/bgp-ecmp-topo2/test_ibgp_ecmp_topo2.py +++ b/tests/topotests/bgp-ecmp-topo2/test_ibgp_ecmp_topo2.py @@ -274,8 +274,20 @@ def test_modify_ecmp_max_paths(request, ecmp_num, test_type):          "r3": {              "bgp": {                  "address_family": { -                    "ipv4": {"unicast": {"maximum_paths": {"ibgp": ecmp_num,}}}, -                    "ipv6": {"unicast": {"maximum_paths": {"ibgp": ecmp_num,}}}, +                    "ipv4": { +                        "unicast": { +                            "maximum_paths": { +                                "ibgp": ecmp_num, +                            } +                        } +                    }, +                    "ipv6": { +                        "unicast": { +                            "maximum_paths": { +                                "ibgp": ecmp_num, +                            } +                        } +                    },                  }              }          } @@ -304,7 +316,7 @@ def test_modify_ecmp_max_paths(request, ecmp_num, test_type):              input_dict_1,              next_hop=NEXT_HOPS[addr_type][: int(ecmp_num)],              protocol=protocol, -            count_only=True +            count_only=True,          )          assert result is True, "Testcase {} : Failed \n Error: {}".format( @@ -372,8 +384,8 @@ def test_ecmp_after_clear_bgp(request, ecmp_num, test_type):  def test_ecmp_remove_redistribute_static(request): -    """ Verify routes are cleared from BGP and RIB table of DUT when -        redistribute static configuration is removed.""" +    """Verify routes are cleared from BGP and RIB table of DUT when +    redistribute static configuration is removed."""      tc_name = request.node.name      write_test_header(tc_name) @@ -482,8 +494,8 @@ def test_ecmp_remove_redistribute_static(request):  @pytest.mark.parametrize("test_type", ["redist_static", "advertise_nw"])  def test_ecmp_shut_bgp_neighbor(request, test_type): -    """ Shut BGP neigbors one by one and verify BGP and routing table updated -        accordingly in DUT """ +    """Shut BGP neigbors one by one and verify BGP and routing table updated +    accordingly in DUT"""      tc_name = request.node.name      write_test_header(tc_name) diff --git a/tests/topotests/bgp-evpn-mh/test_evpn_mh.py b/tests/topotests/bgp-evpn-mh/test_evpn_mh.py index 4c56d1a02d..6a24684649 100644 --- a/tests/topotests/bgp-evpn-mh/test_evpn_mh.py +++ b/tests/topotests/bgp-evpn-mh/test_evpn_mh.py @@ -658,10 +658,11 @@ def test_evpn_mac():          assertmsg = '"{}" remote MAC content incorrect'.format(tor.name)          assert result is None, assertmsg +  def check_df_role(dut, esi, role): -    ''' +    """      Return error string if the df role on the dut is different -    ''' +    """      es_json = dut.vtysh_cmd("show evpn es %s json" % esi)      es = json.loads(es_json) @@ -676,12 +677,13 @@ def check_df_role(dut, esi, role):      return None +  def test_evpn_df(): -    ''' +    """      1. Check the DF role on all the PEs on rack-1.      2. Increase the DF preference on the non-DF and check if it becomes         the DF winner. -    ''' +    """      tgen = get_topogen() @@ -720,10 +722,11 @@ def test_evpn_df():      # tgen.mininet_cli() +  def check_protodown_rc(dut, protodown_rc): -    ''' +    """      check if specified protodown reason code is set -    ''' +    """      out = dut.vtysh_cmd("show evpn json") @@ -739,13 +742,14 @@ def check_protodown_rc(dut, protodown_rc):      return None +  def test_evpn_uplink_tracking(): -    ''' +    """      1. Wait for access ports to come out of startup-delay      2. disable uplinks and check if access ports have been protodowned      3. enable uplinks and check if access ports have been moved out         of protodown -    ''' +    """      tgen = get_topogen() @@ -778,6 +782,7 @@ def test_evpn_uplink_tracking():      assertmsg = '"{}" protodown rc incorrect'.format(dut_name)      assert result is None, assertmsg +  if __name__ == "__main__":      args = ["-s"] + sys.argv[1:]      sys.exit(pytest.main(args)) diff --git a/tests/topotests/bgp-evpn-vxlan_topo1/test_bgp_evpn_vxlan.py b/tests/topotests/bgp-evpn-vxlan_topo1/test_bgp_evpn_vxlan.py index 5098808d55..9a38158b2b 100755 --- a/tests/topotests/bgp-evpn-vxlan_topo1/test_bgp_evpn_vxlan.py +++ b/tests/topotests/bgp-evpn-vxlan_topo1/test_bgp_evpn_vxlan.py @@ -310,8 +310,10 @@ def ip_learn_test(tgen, host, local, remote, ip_addr):      assertmsg = "local learned mac wrong type: {} ".format(mac_type)      assert mac_type == "local", assertmsg -    assertmsg = "learned address mismatch with configured address host: {} learned: {}".format( -        ip_addr, learned_ip +    assertmsg = ( +        "learned address mismatch with configured address host: {} learned: {}".format( +            ip_addr, learned_ip +        )      )      assert ip_addr == learned_ip, assertmsg diff --git a/tests/topotests/bgp-path-attributes-topo1/test_bgp_path_attributes.py b/tests/topotests/bgp-path-attributes-topo1/test_bgp_path_attributes.py index 607b036c6a..a9541a55c5 100644 --- a/tests/topotests/bgp-path-attributes-topo1/test_bgp_path_attributes.py +++ b/tests/topotests/bgp-path-attributes-topo1/test_bgp_path_attributes.py @@ -238,9 +238,10 @@ def test_next_hop_attribute(request):          result = verify_rib(              tgen, addr_type, dut, input_dict, protocol=protocol, expected=False          ) -        assert result is not True, ( -            "Testcase {} : Failed \n Error: " -            "{} routes are not present in RIB".format(addr_type, tc_name) +        assert ( +            result is not True +        ), "Testcase {} : Failed \n Error: " "{} routes are not present in RIB".format( +            addr_type, tc_name          )      # Configure next-hop-self to bgp neighbor diff --git a/tests/topotests/bgp-route-map/test_route_map_topo1.py b/tests/topotests/bgp-route-map/test_route_map_topo1.py index 1aa951edaa..0158e24d31 100644 --- a/tests/topotests/bgp-route-map/test_route_map_topo1.py +++ b/tests/topotests/bgp-route-map/test_route_map_topo1.py @@ -807,7 +807,9 @@ def test_route_map_multiple_seq_different_match_set_clause_p0(request):                                          "prefix_lists": "pf_list_2_{}".format(addr_type)                                      }                                  }, -                                "set": {"locPrf": 150,}, +                                "set": { +                                    "locPrf": 150, +                                },                              },                              {                                  "action": "permit", @@ -906,7 +908,17 @@ def test_route_map_multiple_seq_different_match_set_clause_p0(request):          dut = "r3"          protocol = "bgp"          input_dict = { -            "r3": {"route_maps": {"rmap_match_pf_list1": [{"set": {"metric": 50,}}],}} +            "r3": { +                "route_maps": { +                    "rmap_match_pf_list1": [ +                        { +                            "set": { +                                "metric": 50, +                            } +                        } +                    ], +                } +            }          }          static_routes = [NETWORK[adt][0]] @@ -1093,7 +1105,14 @@ def test_route_map_set_only_no_match_p0(request):          input_dict_4 = {              "r3": {                  "route_maps": { -                    "rmap_match_pf_1": [{"action": "permit", "set": {"metric": 50,}}] +                    "rmap_match_pf_1": [ +                        { +                            "action": "permit", +                            "set": { +                                "metric": 50, +                            }, +                        } +                    ]                  }              }          } @@ -1210,7 +1229,13 @@ def test_route_map_match_only_no_set_p0(request):                  "r1": {                      "route_maps": {                          "rmap_match_pf_1_{}".format(addr_type): [ -                            {"action": "permit", "set": {"metric": 50, "locPrf": 150,}} +                            { +                                "action": "permit", +                                "set": { +                                    "metric": 50, +                                    "locPrf": 150, +                                }, +                            }                          ]                      }                  } diff --git a/tests/topotests/bgp-route-map/test_route_map_topo2.py b/tests/topotests/bgp-route-map/test_route_map_topo2.py index 3056aa29f3..958eceba62 100644 --- a/tests/topotests/bgp-route-map/test_route_map_topo2.py +++ b/tests/topotests/bgp-route-map/test_route_map_topo2.py @@ -268,12 +268,20 @@ def test_rmap_match_prefix_list_permit_in_and_outbound_prefixes_p0():              "prefix_lists": {                  "ipv4": {                      "pf_list_1_ipv4": [ -                        {"seqid": 10, "network": "any", "action": "permit",} +                        { +                            "seqid": 10, +                            "network": "any", +                            "action": "permit", +                        }                      ]                  },                  "ipv6": {                      "pf_list_1_ipv6": [ -                        {"seqid": 10, "network": "any", "action": "permit",} +                        { +                            "seqid": 10, +                            "network": "any", +                            "action": "permit", +                        }                      ]                  },              } @@ -472,7 +480,11 @@ def test_modify_set_match_clauses_in_rmap_p0():              "prefix_lists": {                  "ipv4": {                      "pf_list_1_ipv4": [ -                        {"seqid": 10, "network": "any", "action": "permit",} +                        { +                            "seqid": 10, +                            "network": "any", +                            "action": "permit", +                        }                      ],                      "pf_list_2_ipv4": [                          {"seqid": 10, "network": "any", "action": "permit"} @@ -480,7 +492,11 @@ def test_modify_set_match_clauses_in_rmap_p0():                  },                  "ipv6": {                      "pf_list_1_ipv6": [ -                        {"seqid": 10, "network": "any", "action": "permit",} +                        { +                            "seqid": 10, +                            "network": "any", +                            "action": "permit", +                        }                      ],                      "pf_list_2_ipv6": [                          {"seqid": 10, "network": "any", "action": "permit"} @@ -506,7 +522,9 @@ def test_modify_set_match_clauses_in_rmap_p0():                                      "prefix_lists": "pf_list_1_{}".format(addr_type)                                  }                              }, -                            "set": {"locPrf": 150,}, +                            "set": { +                                "locPrf": 150, +                            },                          }                      ],                      "rmap_match_pf_2_{}".format(addr_type): [ @@ -666,7 +684,9 @@ def test_modify_set_match_clauses_in_rmap_p0():                                      "prefix_lists": "pf_list_1_{}".format(addr_type)                                  }                              }, -                            "set": {"locPrf": 1000,}, +                            "set": { +                                "locPrf": 1000, +                            },                          }                      ],                      "rmap_match_pf_2_{}".format(addr_type): [ @@ -816,12 +836,20 @@ def test_modify_prefix_list_referenced_by_rmap_p0():              "prefix_lists": {                  "ipv4": {                      "pf_list_1_ipv4": [ -                        {"seqid": 10, "network": "any", "action": "permit",} +                        { +                            "seqid": 10, +                            "network": "any", +                            "action": "permit", +                        }                      ]                  },                  "ipv6": {                      "pf_list_1_ipv6": [ -                        {"seqid": 100, "network": "any", "action": "permit",} +                        { +                            "seqid": 100, +                            "network": "any", +                            "action": "permit", +                        }                      ]                  },              } @@ -1090,7 +1118,9 @@ def test_remove_prefix_list_referenced_by_rmap_p0():                                      "prefix_lists": "pf_list_1_{}".format(addr_type)                                  }                              }, -                            "set": {"locPrf": 150,}, +                            "set": { +                                "locPrf": 150, +                            },                          }                      ],                      "rmap_match_pf_2_{}".format(addr_type): [ @@ -1894,7 +1924,9 @@ def test_multiple_match_statement_in_route_map_logical_ANDed_p1():                                      "prefix_lists": "pf_list_1_{}".format(addr_type)                                  }                              }, -                            "set": {"locPrf": 150,}, +                            "set": { +                                "locPrf": 150, +                            },                          }                      ]                  } @@ -1921,7 +1953,9 @@ def test_multiple_match_statement_in_route_map_logical_ANDed_p1():                                      }                                  }                              }, -                            "set": {"locPrf": 150,}, +                            "set": { +                                "locPrf": 150, +                            },                          }                      ]                  } @@ -2048,7 +2082,9 @@ def test_add_remove_rmap_to_specific_neighbor_p0():                                      "prefix_lists": "pf_list_1_{}".format(addr_type)                                  }                              }, -                            "set": {"locPrf": 150,}, +                            "set": { +                                "locPrf": 150, +                            },                          }                      ]                  } @@ -3505,7 +3541,9 @@ def test_create_rmap_match_prefix_list_to_deny_in_and_outbound_prefixes_p0():                                      "prefix_lists": "pf_list_1_{}".format(addr_type)                                  }                              }, -                            "set": {"locPrf": 150,}, +                            "set": { +                                "locPrf": 150, +                            },                          }                      ],                      "rmap_match_pf_2_{}".format(addr_type): [ diff --git a/tests/topotests/bgp-vrf-route-leak-basic/test_bgp-vrf-route-leak-basic.py b/tests/topotests/bgp-vrf-route-leak-basic/test_bgp-vrf-route-leak-basic.py index 36f1d8cd56..71f64e9b70 100644 --- a/tests/topotests/bgp-vrf-route-leak-basic/test_bgp-vrf-route-leak-basic.py +++ b/tests/topotests/bgp-vrf-route-leak-basic/test_bgp-vrf-route-leak-basic.py @@ -90,7 +90,11 @@ def test_vrf_route_leak():      # Test DONNA VRF.      expect = { -        "10.0.0.0/24": [{"protocol": "connected",}], +        "10.0.0.0/24": [ +            { +                "protocol": "connected", +            } +        ],          "10.0.1.0/24": [              {"protocol": "bgp", "selected": True, "nexthops": [{"fib": True}]}          ], @@ -111,11 +115,19 @@ def test_vrf_route_leak():          "10.0.0.0/24": [              {"protocol": "bgp", "selected": True, "nexthops": [{"fib": True}]}          ], -        "10.0.1.0/24": [{"protocol": "connected",}], +        "10.0.1.0/24": [ +            { +                "protocol": "connected", +            } +        ],          "10.0.2.0/24": [              {"protocol": "bgp", "selected": True, "nexthops": [{"fib": True}]}          ], -        "10.0.3.0/24": [{"protocol": "connected",}], +        "10.0.3.0/24": [ +            { +                "protocol": "connected", +            } +        ],      }      test_func = partial( diff --git a/tests/topotests/bgp_features/test_bgp_features.py b/tests/topotests/bgp_features/test_bgp_features.py index bc821bd7a0..5f3809c2b3 100644 --- a/tests/topotests/bgp_features/test_bgp_features.py +++ b/tests/topotests/bgp_features/test_bgp_features.py @@ -753,41 +753,71 @@ def test_bgp_delayopen_without():          pytest.skip(tgen.errors)      # part 1: no delay r1 <=> no delay r4 -    logger.info("Starting optional test of BGP functionality without DelayOpenTimer enabled to establish a reference for following tests") +    logger.info( +        "Starting optional test of BGP functionality without DelayOpenTimer enabled to establish a reference for following tests" +    )      # 1.1 enable peering shutdown      logger.info("Enable shutdown of peering between r1 and r4") -    tgen.net["r1"].cmd('vtysh -c "conf t" -c "router bgp 65000" -c "neighbor 192.168.101.2 shutdown"') -    tgen.net["r4"].cmd('vtysh -c "conf t" -c "router bgp 65100" -c "neighbor 192.168.101.1 shutdown"') +    tgen.net["r1"].cmd( +        'vtysh -c "conf t" -c "router bgp 65000" -c "neighbor 192.168.101.2 shutdown"' +    ) +    tgen.net["r4"].cmd( +        'vtysh -c "conf t" -c "router bgp 65100" -c "neighbor 192.168.101.1 shutdown"' +    )      # 1.2 wait for peers to shut down (poll output)      for router_num in [1, 4]: -        logger.info("Checking BGP summary after enabling shutdown of peering on r{}".format(router_num)) +        logger.info( +            "Checking BGP summary after enabling shutdown of peering on r{}".format( +                router_num +            ) +        )          router = tgen.gears["r{}".format(router_num)] -        reffile = os.path.join(CWD, "r{}/bgp_delayopen_summary_shutdown.json".format(router_num)) +        reffile = os.path.join( +            CWD, "r{}/bgp_delayopen_summary_shutdown.json".format(router_num) +        )          expected = json.loads(open(reffile).read()) -        test_func = functools.partial(topotest.router_json_cmp, router, "show ip bgp summary json", expected) +        test_func = functools.partial( +            topotest.router_json_cmp, router, "show ip bgp summary json", expected +        )          _, res = topotest.run_and_expect(test_func, None, count=3, wait=1)          assertmsg = "BGP session on r{} did not shut down peer".format(router_num)          assert res is None, assertmsg      # 1.3 disable peering shutdown      logger.info("Disable shutdown of peering between r1 and r4") -    tgen.net["r1"].cmd('vtysh -c "conf t" -c "router bgp 65000" -c "no neighbor 192.168.101.2 shutdown"') -    tgen.net["r4"].cmd('vtysh -c "conf t" -c "router bgp 65100" -c "no neighbor 192.168.101.1 shutdown"') +    tgen.net["r1"].cmd( +        'vtysh -c "conf t" -c "router bgp 65000" -c "no neighbor 192.168.101.2 shutdown"' +    ) +    tgen.net["r4"].cmd( +        'vtysh -c "conf t" -c "router bgp 65100" -c "no neighbor 192.168.101.1 shutdown"' +    )      # 1.4 wait for peers to establish connection (poll output)      for router_num in [1, 4]: -        logger.info("Checking BGP summary after disabling shutdown of peering on r{}".format(router_num)) +        logger.info( +            "Checking BGP summary after disabling shutdown of peering on r{}".format( +                router_num +            ) +        )          router = tgen.gears["r{}".format(router_num)] -        reffile = os.path.join(CWD, "r{}/bgp_delayopen_summary_established.json".format(router_num)) +        reffile = os.path.join( +            CWD, "r{}/bgp_delayopen_summary_established.json".format(router_num) +        )          expected = json.loads(open(reffile).read()) -        test_func = functools.partial(topotest.router_json_cmp, router, "show ip bgp summary json", expected) +        test_func = functools.partial( +            topotest.router_json_cmp, router, "show ip bgp summary json", expected +        )          _, res = topotest.run_and_expect(test_func, None, count=5, wait=1) -        assertmsg = "BGP session on r{} did not establish a connection with peer".format(router_num) +        assertmsg = ( +            "BGP session on r{} did not establish a connection with peer".format( +                router_num +            ) +        )          assert res is None, assertmsg -    #tgen.mininet_cli() +    # tgen.mininet_cli()      # end test_bgp_delayopen_without @@ -800,65 +830,107 @@ def test_bgp_delayopen_singular():          pytest.skip(tgen.errors)      # part 2: delay 240s r1 <=> no delay r4 -    logger.info("Starting test of BGP functionality and behaviour with DelayOpenTimer enabled on one side of the peering") +    logger.info( +        "Starting test of BGP functionality and behaviour with DelayOpenTimer enabled on one side of the peering" +    )      # 2.1 enable peering shutdown      logger.info("Enable shutdown of peering between r1 and r4") -    tgen.net["r1"].cmd('vtysh -c "conf t" -c "router bgp 65000" -c "neighbor 192.168.101.2 shutdown"') -    tgen.net["r4"].cmd('vtysh -c "conf t" -c "router bgp 65100" -c "neighbor 192.168.101.1 shutdown"') +    tgen.net["r1"].cmd( +        'vtysh -c "conf t" -c "router bgp 65000" -c "neighbor 192.168.101.2 shutdown"' +    ) +    tgen.net["r4"].cmd( +        'vtysh -c "conf t" -c "router bgp 65100" -c "neighbor 192.168.101.1 shutdown"' +    )      # 2.2 wait for peers to shut down (poll output)      for router_num in [1, 4]: -        logger.info("Checking BGP summary after disabling shutdown of peering on r{}".format(router_num)) +        logger.info( +            "Checking BGP summary after disabling shutdown of peering on r{}".format( +                router_num +            ) +        )          router = tgen.gears["r{}".format(router_num)] -        reffile = os.path.join(CWD, "r{}/bgp_delayopen_summary_shutdown.json".format(router_num)) +        reffile = os.path.join( +            CWD, "r{}/bgp_delayopen_summary_shutdown.json".format(router_num) +        )          expected = json.loads(open(reffile).read()) -        test_func = functools.partial(topotest.router_json_cmp, router, "show ip bgp summary json", expected) +        test_func = functools.partial( +            topotest.router_json_cmp, router, "show ip bgp summary json", expected +        )          _, res = topotest.run_and_expect(test_func, None, count=3, wait=1)          assertmsg = "BGP session on r{} did not shut down peer".format(router_num)          assert res is None, assertmsg      # 2.3 set delayopen on R1 to 240      logger.info("Setting DelayOpenTime for neighbor r4 to 240 seconds on r1") -    tgen.net["r1"].cmd('vtysh -c "conf t" -c "router bgp 65000" -c "neighbor 192.168.101.2 timers delayopen 240"') +    tgen.net["r1"].cmd( +        'vtysh -c "conf t" -c "router bgp 65000" -c "neighbor 192.168.101.2 timers delayopen 240"' +    )      # 2.4 check config (poll output)      logger.info("Checking BGP neighbor configuration after setting DelayOpenTime on r1")      router = tgen.gears["r1"]      reffile = os.path.join(CWD, "r1/bgp_delayopen_neighbor.json")      expected = json.loads(open(reffile).read()) -    test_func = functools.partial(topotest.router_json_cmp, router, "show bgp neighbors json", expected) +    test_func = functools.partial( +        topotest.router_json_cmp, router, "show bgp neighbors json", expected +    )      _, res = topotest.run_and_expect(test_func, None, count=3, wait=1)      assertmsg = "BGP session on r1 failed to set DelayOpenTime for r4"      assert res is None, assertmsg      # 2.5 disable peering shutdown      logger.info("Disable shutdown of peering between r1 and r4") -    tgen.net["r1"].cmd('vtysh -c "conf t" -c "router bgp 65000" -c "no neighbor 192.168.101.2 shutdown"') -    tgen.net["r4"].cmd('vtysh -c "conf t" -c "router bgp 65100" -c "no neighbor 192.168.101.1 shutdown"') +    tgen.net["r1"].cmd( +        'vtysh -c "conf t" -c "router bgp 65000" -c "no neighbor 192.168.101.2 shutdown"' +    ) +    tgen.net["r4"].cmd( +        'vtysh -c "conf t" -c "router bgp 65100" -c "no neighbor 192.168.101.1 shutdown"' +    )      # 2.6 wait for peers to establish connection (poll output)      for router_num in [1, 4]: -        logger.info("Checking BGP summary after disabling shutdown of peering on r{}".format(router_num)) +        logger.info( +            "Checking BGP summary after disabling shutdown of peering on r{}".format( +                router_num +            ) +        )          router = tgen.gears["r{}".format(router_num)] -        reffile = os.path.join(CWD, "r{}/bgp_delayopen_summary_established.json".format(router_num)) +        reffile = os.path.join( +            CWD, "r{}/bgp_delayopen_summary_established.json".format(router_num) +        )          expected = json.loads(open(reffile).read()) -        test_func = functools.partial(topotest.router_json_cmp, router, "show ip bgp summary json", expected) +        test_func = functools.partial( +            topotest.router_json_cmp, router, "show ip bgp summary json", expected +        )          _, res = topotest.run_and_expect(test_func, None, count=5, wait=1) -        assertmsg = "BGP session on r{} did not establish a connection with peer".format(router_num) +        assertmsg = ( +            "BGP session on r{} did not establish a connection with peer".format( +                router_num +            ) +        )          assert res is None, assertmsg      # 2.7 unset delayopen on R1      logger.info("Disabling DelayOpenTimer for neighbor r4 on r1") -    tgen.net["r1"].cmd('vtysh -c "conf t" -c "router bgp 65000" -c "no neighbor 192.168.101.2 timers delayopen"') +    tgen.net["r1"].cmd( +        'vtysh -c "conf t" -c "router bgp 65000" -c "no neighbor 192.168.101.2 timers delayopen"' +    )      # 2.8 check config (poll output) -    logger.info("Checking BGP neighbor configuration after disabling DelayOpenTimer on r1") -    delayopen_cfg = tgen.net["r1"].cmd('vtysh -c "show bgp neighbors json" | grep "DelayOpenTimeMsecs"').rstrip() +    logger.info( +        "Checking BGP neighbor configuration after disabling DelayOpenTimer on r1" +    ) +    delayopen_cfg = ( +        tgen.net["r1"] +        .cmd('vtysh -c "show bgp neighbors json" | grep "DelayOpenTimeMsecs"') +        .rstrip() +    )      assertmsg = "BGP session on r1 failed disable DelayOpenTimer for peer r4"      assert delayopen_cfg == "", assertmsg -    #tgen.mininet_cli() +    # tgen.mininet_cli()      # end test_bgp_delayopen_singular @@ -870,67 +942,119 @@ def test_bgp_delayopen_dual():          pytest.skip(tgen.errors)      # part 3: delay 60s R2 <=> delay 30s R5 -    logger.info("Starting test of BGP functionality and behaviour with DelayOpenTimer enabled on both sides of the peering with different timer intervals") +    logger.info( +        "Starting test of BGP functionality and behaviour with DelayOpenTimer enabled on both sides of the peering with different timer intervals" +    )      # 3.1 enable peering shutdown      logger.info("Enable shutdown of peering between r2 and r5") -    tgen.net["r2"].cmd('vtysh -c "conf t" -c "router bgp 65000" -c "neighbor 192.168.201.2 shutdown"') -    tgen.net["r5"].cmd('vtysh -c "conf t" -c "router bgp 65200" -c "neighbor 192.168.201.1 shutdown"') +    tgen.net["r2"].cmd( +        'vtysh -c "conf t" -c "router bgp 65000" -c "neighbor 192.168.201.2 shutdown"' +    ) +    tgen.net["r5"].cmd( +        'vtysh -c "conf t" -c "router bgp 65200" -c "neighbor 192.168.201.1 shutdown"' +    )      # 3.2 wait for peers to shut down (pool output)      for router_num in [2, 5]: -        logger.info("Checking BGP summary after disabling shutdown of peering on r{}".format(router_num)) +        logger.info( +            "Checking BGP summary after disabling shutdown of peering on r{}".format( +                router_num +            ) +        )          router = tgen.gears["r{}".format(router_num)] -        reffile = os.path.join(CWD, "r{}/bgp_delayopen_summary_shutdown.json".format(router_num)) +        reffile = os.path.join( +            CWD, "r{}/bgp_delayopen_summary_shutdown.json".format(router_num) +        )          expected = json.loads(open(reffile).read()) -        test_func = functools.partial(topotest.router_json_cmp, router, "show ip bgp summary json", expected) +        test_func = functools.partial( +            topotest.router_json_cmp, router, "show ip bgp summary json", expected +        )          _, res = topotest.run_and_expect(test_func, None, count=3, wait=1)          assertmsg = "BGP session on r{} did not shut down peer".format(router_num)          assert res is None, assertmsg      # 3.3 set delayopen on R2 to 60s and on R5 to 30s      logger.info("Setting DelayOpenTime for neighbor r5 to 60 seconds on r2") -    tgen.net["r2"].cmd('vtysh -c "conf t" -c "router bgp 65000" -c "neighbor 192.168.201.2 timers delayopen 60"') +    tgen.net["r2"].cmd( +        'vtysh -c "conf t" -c "router bgp 65000" -c "neighbor 192.168.201.2 timers delayopen 60"' +    )      logger.info("Setting DelayOpenTime for neighbor r2 to 30 seconds on r5") -    tgen.net["r5"].cmd('vtysh -c "conf t" -c "router bgp 65200" -c "neighbor 192.168.201.1 timers delayopen 30"') +    tgen.net["r5"].cmd( +        'vtysh -c "conf t" -c "router bgp 65200" -c "neighbor 192.168.201.1 timers delayopen 30"' +    )      # 3.4 check config (poll output)      for router_num in [2, 5]: -        logger.info("Checking BGP neighbor configuration after setting DelayOpenTime on r{}i".format(router_num)) +        logger.info( +            "Checking BGP neighbor configuration after setting DelayOpenTime on r{}i".format( +                router_num +            ) +        )          router = tgen.gears["r{}".format(router_num)] -        reffile = os.path.join(CWD, "r{}/bgp_delayopen_neighbor.json".format(router_num)) +        reffile = os.path.join( +            CWD, "r{}/bgp_delayopen_neighbor.json".format(router_num) +        )          expected = json.loads(open(reffile).read()) -        test_func = functools.partial(topotest.router_json_cmp, router, "show bgp neighbors json", expected) +        test_func = functools.partial( +            topotest.router_json_cmp, router, "show bgp neighbors json", expected +        )          _, res = topotest.run_and_expect(test_func, None, count=3, wait=1)          assertmsg = "BGP session on r{} failed to set DelayOpenTime".format(router_num)          assert res is None, assertmsg      ## 3.5 disable peering shutdown      logger.info("Disable shutdown of peering between r2 and r5") -    tgen.net["r2"].cmd('vtysh -c "conf t" -c "router bgp 65000" -c "no neighbor 192.168.201.2 shutdown"') -    tgen.net["r5"].cmd('vtysh -c "conf t" -c "router bgp 65200" -c "no neighbor 192.168.201.1 shutdown"') +    tgen.net["r2"].cmd( +        'vtysh -c "conf t" -c "router bgp 65000" -c "no neighbor 192.168.201.2 shutdown"' +    ) +    tgen.net["r5"].cmd( +        'vtysh -c "conf t" -c "router bgp 65200" -c "no neighbor 192.168.201.1 shutdown"' +    )      ## 3.6 wait for peers to reach connect or active state (poll output)      delay_start = int(time.time())      for router_num in [2, 5]: -        logger.info("Checking BGP summary after disabling shutdown of peering on r{}".format(router_num)) +        logger.info( +            "Checking BGP summary after disabling shutdown of peering on r{}".format( +                router_num +            ) +        )          router = tgen.gears["r{}".format(router_num)] -        reffile = os.path.join(CWD, "r{}/bgp_delayopen_summary_connect.json".format(router_num)) +        reffile = os.path.join( +            CWD, "r{}/bgp_delayopen_summary_connect.json".format(router_num) +        )          expected = json.loads(open(reffile).read()) -        test_func = functools.partial(topotest.router_json_cmp, router, "show ip bgp summary json", expected) +        test_func = functools.partial( +            topotest.router_json_cmp, router, "show ip bgp summary json", expected +        )          _, res = topotest.run_and_expect(test_func, None, count=3, wait=1) -        assertmsg = "BGP session on r{} did not enter Connect state with peer".format(router_num) +        assertmsg = "BGP session on r{} did not enter Connect state with peer".format( +            router_num +        )          assert res is None, assertmsg      ## 3.7 wait for peers to establish connection (poll output)      for router_num in [2, 5]: -        logger.info("Checking BGP summary after disabling shutdown of peering on r{}".format(router_num)) +        logger.info( +            "Checking BGP summary after disabling shutdown of peering on r{}".format( +                router_num +            ) +        )          router = tgen.gears["r{}".format(router_num)] -        reffile = os.path.join(CWD, "r{}/bgp_delayopen_summary_established.json".format(router_num)) +        reffile = os.path.join( +            CWD, "r{}/bgp_delayopen_summary_established.json".format(router_num) +        )          expected = json.loads(open(reffile).read()) -        test_func = functools.partial(topotest.router_json_cmp, router, "show ip bgp summary json", expected) +        test_func = functools.partial( +            topotest.router_json_cmp, router, "show ip bgp summary json", expected +        )          _, res = topotest.run_and_expect(test_func, None, count=35, wait=1) -        assertmsg = "BGP session on r{} did not establish a connection with peer".format(router_num) +        assertmsg = ( +            "BGP session on r{} did not establish a connection with peer".format( +                router_num +            ) +        )          assert res is None, assertmsg      delay_stop = int(time.time()) @@ -939,18 +1063,32 @@ def test_bgp_delayopen_dual():      # 3.8 unset delayopen on R2 and R5      logger.info("Disabling DelayOpenTimer for neighbor r5 on r2") -    tgen.net["r2"].cmd('vtysh -c "conf t" -c "router bgp 65000" -c "no neighbor 192.168.201.2 timers delayopen"') +    tgen.net["r2"].cmd( +        'vtysh -c "conf t" -c "router bgp 65000" -c "no neighbor 192.168.201.2 timers delayopen"' +    )      logger.info("Disabling DelayOpenTimer for neighbor r2 on r5") -    tgen.net["r5"].cmd('vtysh -c "conf t" -c "router bgp 65200" -c "no neighbor 192.168.201.1 timers delayopen"') +    tgen.net["r5"].cmd( +        'vtysh -c "conf t" -c "router bgp 65200" -c "no neighbor 192.168.201.1 timers delayopen"' +    )      # 3.9 check config (poll output)      for router_num in [2, 5]: -        logger.info("Checking BGP neighbor configuration after disabling DelayOpenTimer on r{}".format(router_num)) -        delayopen_cfg = tgen.net["r{}".format(router_num)].cmd('vtysh -c "show bgp neighbors json" | grep "DelayOpenTimeMsecs"').rstrip() -        assertmsg = "BGP session on r{} failed disable DelayOpenTimer".format(router_num) +        logger.info( +            "Checking BGP neighbor configuration after disabling DelayOpenTimer on r{}".format( +                router_num +            ) +        ) +        delayopen_cfg = ( +            tgen.net["r{}".format(router_num)] +            .cmd('vtysh -c "show bgp neighbors json" | grep "DelayOpenTimeMsecs"') +            .rstrip() +        ) +        assertmsg = "BGP session on r{} failed disable DelayOpenTimer".format( +            router_num +        )          assert delayopen_cfg == "", assertmsg -    #tgen.mininet_cli() +    # tgen.mininet_cli()      # end test_bgp_delayopen_dual diff --git a/tests/topotests/bgp_gr_functionality_topo1/test_bgp_gr_functionality_topo1.py b/tests/topotests/bgp_gr_functionality_topo1/test_bgp_gr_functionality_topo1.py index 097b654e77..94fd17e012 100644 --- a/tests/topotests/bgp_gr_functionality_topo1/test_bgp_gr_functionality_topo1.py +++ b/tests/topotests/bgp_gr_functionality_topo1/test_bgp_gr_functionality_topo1.py @@ -316,7 +316,9 @@ def test_BGP_GR_TC_46_p1(request):      input_dict = {          "r1": {              "bgp": { -                "graceful-restart": {"graceful-restart": True,}, +                "graceful-restart": { +                    "graceful-restart": True, +                },                  "address_family": {                      "ipv4": {                          "unicast": { @@ -1184,8 +1186,8 @@ def test_BGP_GR_TC_53_p1(request):  def test_BGP_GR_TC_4_p0(request):      """ -     Test Objective : Verify that the restarting node sets "R" bit while sending the -     BGP open messages after the node restart, only if GR is enabled. +    Test Objective : Verify that the restarting node sets "R" bit while sending the +    BGP open messages after the node restart, only if GR is enabled.      """      tgen = get_topogen() @@ -1368,9 +1370,9 @@ def test_BGP_GR_TC_4_p0(request):  def test_BGP_GR_TC_5_1_2_p1(request):      """ -     Test Objective : Verify if restarting node resets R bit in BGP open message -     during normal BGP session flaps as well, even when GR restarting mode is enabled. -     Here link flap happen due to interface UP/DOWN. +    Test Objective : Verify if restarting node resets R bit in BGP open message +    during normal BGP session flaps as well, even when GR restarting mode is enabled. +    Here link flap happen due to interface UP/DOWN.      """      tgen = get_topogen() @@ -1815,8 +1817,8 @@ def test_BGP_GR_TC_6_1_2_p1(request):  def test_BGP_GR_TC_8_p1(request):      """ -     Test Objective : Verify that restarting nodes set "F" bit while sending -      the BGP open messages after it restarts, only when BGP GR is enabled. +    Test Objective : Verify that restarting nodes set "F" bit while sending +     the BGP open messages after it restarts, only when BGP GR is enabled.      """      tgen = get_topogen() @@ -1959,8 +1961,8 @@ def test_BGP_GR_TC_8_p1(request):  def test_BGP_GR_TC_17_p1(request):      """ -      Test Objective : Verify that only GR helper routers keep the stale -       route entries, not any GR disabled router. +    Test Objective : Verify that only GR helper routers keep the stale +     route entries, not any GR disabled router.      """      tgen = get_topogen() @@ -2145,8 +2147,8 @@ def test_BGP_GR_TC_17_p1(request):  def test_BGP_GR_TC_19_p1(request):      """ -      Test Objective : Verify that GR helper routers keeps all the routes received -      from restarting node if both the routers are configured as GR restarting node. +    Test Objective : Verify that GR helper routers keeps all the routes received +    from restarting node if both the routers are configured as GR restarting node.      """      tgen = get_topogen() @@ -2325,8 +2327,8 @@ def test_BGP_GR_TC_19_p1(request):  def test_BGP_GR_TC_20_p1(request):      """ -      Test Objective : Verify that GR helper routers delete all the routes -       received from a node if both the routers are configured as GR helper node. +    Test Objective : Verify that GR helper routers delete all the routes +     received from a node if both the routers are configured as GR helper node.      """      tgen = get_topogen()      tc_name = request.node.name @@ -3090,8 +3092,8 @@ def test_BGP_GR_TC_31_2_p1(request):  def test_BGP_GR_TC_9_p1(request):      """ -      Test Objective : Verify that restarting nodes reset "F" bit while sending -      the BGP open messages after it's restarts, when BGP GR is **NOT** enabled. +    Test Objective : Verify that restarting nodes reset "F" bit while sending +    the BGP open messages after it's restarts, when BGP GR is **NOT** enabled.      """      tgen = get_topogen() @@ -3264,8 +3266,8 @@ def test_BGP_GR_TC_9_p1(request):  def test_BGP_GR_TC_17_p1(request):      """ -      Test Objective : Verify that only GR helper routers keep the stale -       route entries, not any GR disabled router. +    Test Objective : Verify that only GR helper routers keep the stale +     route entries, not any GR disabled router.      """      tgen = get_topogen() @@ -3467,7 +3469,13 @@ def test_BGP_GR_TC_43_p1(request):      step("Configure R1 and R2 as GR restarting node in global level")      input_dict = { -        "r1": {"bgp": {"graceful-restart": {"graceful-restart": True,}}}, +        "r1": { +            "bgp": { +                "graceful-restart": { +                    "graceful-restart": True, +                } +            } +        },          "r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}},      } @@ -3560,7 +3568,13 @@ def test_BGP_GR_TC_43_p1(request):      step("Verify on R2 that R1 doesn't advertise any GR capabilities")      input_dict = { -        "r1": {"bgp": {"graceful-restart": {"graceful-restart-disable": True,}}}, +        "r1": { +            "bgp": { +                "graceful-restart": { +                    "graceful-restart-disable": True, +                } +            } +        },          "r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}},      } @@ -3659,7 +3673,13 @@ def test_BGP_GR_TC_43_p1(request):      step("Verify on R2 that R1 advertises GR capabilities as a restarting node")      input_dict = { -        "r1": {"bgp": {"graceful-restart": {"graceful-restart": True,}}}, +        "r1": { +            "bgp": { +                "graceful-restart": { +                    "graceful-restart": True, +                } +            } +        },          "r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}},      } @@ -3779,7 +3799,13 @@ def test_BGP_GR_TC_44_p1(request):      step("Verify on R2 that R1 advertises GR capabilities as a helper node")      input_dict = { -        "r1": {"bgp": {"graceful-restart": {"graceful-restart-helper": True,}}}, +        "r1": { +            "bgp": { +                "graceful-restart": { +                    "graceful-restart-helper": True, +                } +            } +        },          "r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}},      } @@ -3849,7 +3875,13 @@ def test_BGP_GR_TC_44_p1(request):      start_router_daemons(tgen, "r2", ["bgpd"])      input_dict = { -        "r1": {"bgp": {"graceful-restart": {"graceful-restart-disable": True,}}} +        "r1": { +            "bgp": { +                "graceful-restart": { +                    "graceful-restart-disable": True, +                } +            } +        }      }      configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2") @@ -3857,7 +3889,13 @@ def test_BGP_GR_TC_44_p1(request):      step("Verify on R2 that R1 doesn't advertise any GR capabilities")      input_dict = { -        "r1": {"bgp": {"graceful-restart": {"graceful-restart-disable": True,}}}, +        "r1": { +            "bgp": { +                "graceful-restart": { +                    "graceful-restart-disable": True, +                } +            } +        },          "r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}},      } @@ -3941,7 +3979,13 @@ def test_BGP_GR_TC_44_p1(request):      step("Verify on R2 that R1 advertises GR capabilities as a helper node")      input_dict = { -        "r1": {"bgp": {"graceful-restart": {"graceful-restart-helper": True,}}}, +        "r1": { +            "bgp": { +                "graceful-restart": { +                    "graceful-restart-helper": True, +                } +            } +        },          "r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}},      } @@ -4108,14 +4152,28 @@ def test_BGP_GR_TC_45_p1(request):      start_router_daemons(tgen, "r1", ["bgpd"]) -    input_dict = {"r1": {"bgp": {"graceful-restart": {"graceful-restart": False,}}}} +    input_dict = { +        "r1": { +            "bgp": { +                "graceful-restart": { +                    "graceful-restart": False, +                } +            } +        } +    }      configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2")      step("Verify on R2 that R1 advertises GR capabilities as a helper node")      input_dict = { -        "r1": {"bgp": {"graceful-restart": {"graceful-restart-helper": True,}}}, +        "r1": { +            "bgp": { +                "graceful-restart": { +                    "graceful-restart-helper": True, +                } +            } +        },          "r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}},      } @@ -4199,14 +4257,28 @@ def test_BGP_GR_TC_45_p1(request):      start_router_daemons(tgen, "r2", ["bgpd"]) -    input_dict = {"r1": {"bgp": {"graceful-restart": {"graceful-restart": True,}}}} +    input_dict = { +        "r1": { +            "bgp": { +                "graceful-restart": { +                    "graceful-restart": True, +                } +            } +        } +    }      configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2")      step("Verify on R2 that R1 advertises GR capabilities as a restarting node")      input_dict = { -        "r1": {"bgp": {"graceful-restart": {"graceful-restart": True,}}}, +        "r1": { +            "bgp": { +                "graceful-restart": { +                    "graceful-restart": True, +                } +            } +        },          "r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}},      } @@ -4307,7 +4379,9 @@ def test_BGP_GR_TC_46_p1(request):      input_dict = {          "r1": {              "bgp": { -                "graceful-restart": {"graceful-restart": True,}, +                "graceful-restart": { +                    "graceful-restart": True, +                },                  "address_family": {                      "ipv4": {                          "unicast": { @@ -4559,7 +4633,9 @@ def test_BGP_GR_TC_47_p1(request):      input_dict = {          "r1": {              "bgp": { -                "graceful-restart": {"graceful-restart": True,}, +                "graceful-restart": { +                    "graceful-restart": True, +                },                  "address_family": {                      "ipv4": {                          "unicast": { @@ -4698,7 +4774,13 @@ def test_BGP_GR_TC_47_p1(request):      step("Verify on R2 that R1 still advertises GR capabilities as a restarting node")      input_dict = { -        "r1": {"bgp": {"graceful-restart": {"graceful-restart": True,}}}, +        "r1": { +            "bgp": { +                "graceful-restart": { +                    "graceful-restart": True, +                } +            } +        },          "r2": {"bgp": {"graceful-restart": {"graceful-restart": True}}},      } @@ -4814,7 +4896,9 @@ def test_BGP_GR_TC_48_p1(request):      input_dict = {          "r1": {              "bgp": { -                "graceful-restart": {"graceful-restart": True,}, +                "graceful-restart": { +                    "graceful-restart": True, +                },                  "address_family": {                      "ipv4": {                          "unicast": { @@ -4960,7 +5044,13 @@ def test_BGP_GR_TC_48_p1(request):      step("Verify on R2 that R1 advertises GR capabilities as a restarting node")      input_dict = { -        "r1": {"bgp": {"graceful-restart": {"graceful-restart": True,}}}, +        "r1": { +            "bgp": { +                "graceful-restart": { +                    "graceful-restart": True, +                } +            } +        },          "r2": {"bgp": {"graceful-restart": {"graceful-restart-helper": True}}},      } diff --git a/tests/topotests/bgp_gr_functionality_topo2/test_bgp_gr_functionality_topo2.py b/tests/topotests/bgp_gr_functionality_topo2/test_bgp_gr_functionality_topo2.py index 6926121a6b..2ddeab13f6 100644 --- a/tests/topotests/bgp_gr_functionality_topo2/test_bgp_gr_functionality_topo2.py +++ b/tests/topotests/bgp_gr_functionality_topo2/test_bgp_gr_functionality_topo2.py @@ -456,7 +456,9 @@ def test_BGP_GR_TC_3_p0(request):      input_dict = {          "r1": {              "bgp": { -                "graceful-restart": {"disable-eor": True,}, +                "graceful-restart": { +                    "disable-eor": True, +                },                  "address_family": {                      "ipv4": {                          "unicast": { @@ -2095,7 +2097,10 @@ def test_BGP_GR_chaos_33_p1(request):                      "ipv4": {                          "unicast": {                              "advertise_networks": [ -                                {"network": "200.0.20.1/32", "no_of_network": 2,} +                                { +                                    "network": "200.0.20.1/32", +                                    "no_of_network": 2, +                                }                              ]                          }                      }, @@ -2207,13 +2212,13 @@ def test_BGP_GR_chaos_33_p1(request):              else:                  next_hop_6 = NEXT_HOP_6[1] -            result = verify_rib(tgen, addr_type, dut, input_dict_2, next_hop_6, -                                expected=False) -            assert result is not True,\ -                "Testcase {} :Failed \n Error {}". \ -                     format(tc_name, result) -            logger.info(" Expected behavior: {}".\ -                format(result)) +            result = verify_rib( +                tgen, addr_type, dut, input_dict_2, next_hop_6, expected=False +            ) +            assert result is not True, "Testcase {} :Failed \n Error {}".format( +                tc_name, result +            ) +            logger.info(" Expected behavior: {}".format(result))      logger.info("[Step 4] : Start BGPd daemon on R1 and R4..") @@ -3960,7 +3965,13 @@ def test_BGP_GR_21_p2(request):                  }              }          }, -        "r2": {"bgp": {"graceful-restart": {"graceful-restart": True,}}}, +        "r2": { +            "bgp": { +                "graceful-restart": { +                    "graceful-restart": True, +                } +            } +        },      }      configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r1", peer="r2") diff --git a/tests/topotests/bgp_large_community/test_bgp_large_community_topo_2.py b/tests/topotests/bgp_large_community/test_bgp_large_community_topo_2.py index 9c0355a3e9..c2858a4bd0 100644 --- a/tests/topotests/bgp_large_community/test_bgp_large_community_topo_2.py +++ b/tests/topotests/bgp_large_community/test_bgp_large_community_topo_2.py @@ -2132,7 +2132,11 @@ def test_large_community_lists_with_rmap_match_regex(request):                      {                          "action": "permit",                          "seq_id": "10", -                        "match": {"large_community_list": {"id": "ALL",},}, +                        "match": { +                            "large_community_list": { +                                "id": "ALL", +                            }, +                        },                      }                  ]              } @@ -2208,7 +2212,11 @@ def test_large_community_lists_with_rmap_match_regex(request):                      {                          "action": "permit",                          "seq_id": "20", -                        "match": {"large_community_list": {"id": "EXP_ALL",},}, +                        "match": { +                            "large_community_list": { +                                "id": "EXP_ALL", +                            }, +                        },                      }                  ]              } diff --git a/tests/topotests/bgp_multi_vrf_topo2/test_bgp_multi_vrf_topo2.py b/tests/topotests/bgp_multi_vrf_topo2/test_bgp_multi_vrf_topo2.py index 9703cf8d57..d34446e2ee 100644 --- a/tests/topotests/bgp_multi_vrf_topo2/test_bgp_multi_vrf_topo2.py +++ b/tests/topotests/bgp_multi_vrf_topo2/test_bgp_multi_vrf_topo2.py @@ -95,7 +95,7 @@ from lib.common_config import (      kill_router_daemons,      start_router_daemons,      stop_router, -    start_router +    start_router,  )  from lib.topolog import logger @@ -129,7 +129,7 @@ LOOPBACK_2 = {      "ipv4": "20.20.20.20/32",      "ipv6": "20::20:20/128",      "ipv4_mask": "255.255.255.255", -    "ipv6_mask": None +    "ipv6_mask": None,  }  MAX_PATHS = 2 @@ -724,16 +724,40 @@ def test_vrf_with_multiple_links_p1(request):                      "local_as": "200",                      "vrf": "RED_A",                      "address_family": { -                        "ipv4": {"unicast": {"maximum_paths": {"ebgp": MAX_PATHS,}}}, -                        "ipv6": {"unicast": {"maximum_paths": {"ebgp": MAX_PATHS,}}}, +                        "ipv4": { +                            "unicast": { +                                "maximum_paths": { +                                    "ebgp": MAX_PATHS, +                                } +                            } +                        }, +                        "ipv6": { +                            "unicast": { +                                "maximum_paths": { +                                    "ebgp": MAX_PATHS, +                                } +                            } +                        },                      },                  },                  {                      "local_as": "200",                      "vrf": "BLUE_A",                      "address_family": { -                        "ipv4": {"unicast": {"maximum_paths": {"ebgp": MAX_PATHS,}}}, -                        "ipv6": {"unicast": {"maximum_paths": {"ebgp": MAX_PATHS,}}}, +                        "ipv4": { +                            "unicast": { +                                "maximum_paths": { +                                    "ebgp": MAX_PATHS, +                                } +                            } +                        }, +                        "ipv6": { +                            "unicast": { +                                "maximum_paths": { +                                    "ebgp": MAX_PATHS, +                                } +                            } +                        },                      },                  },              ] @@ -2148,7 +2172,7 @@ def test_restart_bgpd_daemon_p1(request):      assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)      result = verify_bgp_convergence(tgen, topo) -    assert result is True, "Testcase () :Failed\n Error {}". format(tc_name, result) +    assert result is True, "Testcase () :Failed\n Error {}".format(tc_name, result)      step("Kill BGPd daemon on R1.")      kill_router_daemons(tgen, "r1", ["bgpd"]) diff --git a/tests/topotests/bgp_prefix_sid/test_bgp_prefix_sid.py b/tests/topotests/bgp_prefix_sid/test_bgp_prefix_sid.py index 6d7131e1e5..ceac84709b 100644 --- a/tests/topotests/bgp_prefix_sid/test_bgp_prefix_sid.py +++ b/tests/topotests/bgp_prefix_sid/test_bgp_prefix_sid.py @@ -101,7 +101,11 @@ def test_r1_receive_and_advertise_prefix_sid_type1():              "prefix": prefix,              "advertisedTo": {"10.0.0.101": {}, "10.0.0.102": {}},              "paths": [ -                {"valid": True, "remoteLabel": remoteLabel, "labelIndex": labelIndex,} +                { +                    "valid": True, +                    "remoteLabel": remoteLabel, +                    "labelIndex": labelIndex, +                }              ],          }          return topotest.json_cmp(output, expected) diff --git a/tests/topotests/bgp_recursive_route_ebgp_multi_hop/test_bgp_recursive_route_ebgp_multi_hop.py b/tests/topotests/bgp_recursive_route_ebgp_multi_hop/test_bgp_recursive_route_ebgp_multi_hop.py index 3af944473d..3f9009967d 100644 --- a/tests/topotests/bgp_recursive_route_ebgp_multi_hop/test_bgp_recursive_route_ebgp_multi_hop.py +++ b/tests/topotests/bgp_recursive_route_ebgp_multi_hop/test_bgp_recursive_route_ebgp_multi_hop.py @@ -1042,9 +1042,10 @@ def test_next_hop_with_recursive_lookup_p1(request):              next_hop=next_hop,              expected=False,          ) -        assert result is not True, ( -            "Testcase {} : Failed \n " -            "Route is still present \n Error : {}".format(tc_name, result) +        assert ( +            result is not True +        ), "Testcase {} : Failed \n " "Route is still present \n Error : {}".format( +            tc_name, result          )      step("Re-apply redistribution on R4.") @@ -1125,9 +1126,10 @@ def test_next_hop_with_recursive_lookup_p1(request):              next_hop=next_hop,              expected=False,          ) -        assert result is not True, ( -            "Testcase {} : Failed \n " -            "Route is still present \n Error : {}".format(tc_name, result) +        assert ( +            result is not True +        ), "Testcase {} : Failed \n " "Route is still present \n Error : {}".format( +            tc_name, result          )      shutdown_bringup_interface(tgen, "r3", intf_r3_r4, True) @@ -1182,9 +1184,10 @@ def test_next_hop_with_recursive_lookup_p1(request):              next_hop=next_hop,              expected=False,          ) -        assert result is not True, ( -            "Testcase {} : Failed \n " -            "Route is still present \n Error : {}".format(tc_name, result) +        assert ( +            result is not True +        ), "Testcase {} : Failed \n " "Route is still present \n Error : {}".format( +            tc_name, result          )      shutdown_bringup_interface(tgen, "r4", intf_r4_r3, True) @@ -2101,8 +2104,20 @@ def test_BGP_active_standby_preemption_and_ecmp_p1(request):          "r4": {              "bgp": {                  "address_family": { -                    "ipv4": {"unicast": {"maximum_paths": {"ebgp": 1,}}}, -                    "ipv6": {"unicast": {"maximum_paths": {"ebgp": 1,}}}, +                    "ipv4": { +                        "unicast": { +                            "maximum_paths": { +                                "ebgp": 1, +                            } +                        } +                    }, +                    "ipv6": { +                        "unicast": { +                            "maximum_paths": { +                                "ebgp": 1, +                            } +                        } +                    },                  }              }          } @@ -2131,8 +2146,20 @@ def test_BGP_active_standby_preemption_and_ecmp_p1(request):          "r4": {              "bgp": {                  "address_family": { -                    "ipv4": {"unicast": {"maximum_paths": {"ebgp": 2,}}}, -                    "ipv6": {"unicast": {"maximum_paths": {"ebgp": 2,}}}, +                    "ipv4": { +                        "unicast": { +                            "maximum_paths": { +                                "ebgp": 2, +                            } +                        } +                    }, +                    "ipv6": { +                        "unicast": { +                            "maximum_paths": { +                                "ebgp": 2, +                            } +                        } +                    },                  }              }          } diff --git a/tests/topotests/bgp_route_aggregation/test_bgp_aggregation.py b/tests/topotests/bgp_route_aggregation/test_bgp_aggregation.py index 0fabd90341..0467bf1bfb 100644 --- a/tests/topotests/bgp_route_aggregation/test_bgp_aggregation.py +++ b/tests/topotests/bgp_route_aggregation/test_bgp_aggregation.py @@ -415,9 +415,10 @@ def test_route_summarisation_with_summary_only_p1(request):          result = verify_rib(              tgen, addr_type, "r3", input_static, protocol="bgp", expected=False          ) -        assert result is not True, ( -            "Testcase  : Failed \n " -            "Routes are still present \n Error: {}".format(tc_name, result) +        assert ( +            result is not True +        ), "Testcase  : Failed \n " "Routes are still present \n Error: {}".format( +            tc_name, result          )          result = verify_rib(tgen, addr_type, "r1", input_static_agg, protocol="bgp") @@ -614,7 +615,9 @@ def test_route_summarisation_with_summary_only_p1(request):                          addr_type: {                              "unicast": {                                  "advertise_networks": [ -                                    {"network": NETWORK_4_1[addr_type],} +                                    { +                                        "network": NETWORK_4_1[addr_type], +                                    }                                  ]                              }                          } @@ -1014,7 +1017,11 @@ def test_route_summarisation_with_as_set_p1(request):          assert result is True, "Testcase  : Failed \n Error: {}".format(tc_name, result)      for addr_type in ADDR_TYPES: -        for pfx, seq_id, network, in zip([6, 7], [60, 70], [NETWORK_3_1, NETWORK_4_1]): +        for ( +            pfx, +            seq_id, +            network, +        ) in zip([6, 7], [60, 70], [NETWORK_3_1, NETWORK_4_1]):              prefix_list = {                  "r1": {                      "prefix_lists": { diff --git a/tests/topotests/bgp_suppress_fib/test_bgp_suppress_fib.py b/tests/topotests/bgp_suppress_fib/test_bgp_suppress_fib.py index cf8be5f44f..c75055c26f 100644 --- a/tests/topotests/bgp_suppress_fib/test_bgp_suppress_fib.py +++ b/tests/topotests/bgp_suppress_fib/test_bgp_suppress_fib.py @@ -56,6 +56,7 @@ class TemplateTopo(Topo):          switch.add_link(tgen.gears["r2"])          switch.add_link(tgen.gears["r3"]) +  def setup_module(mod):      tgen = Topogen(TemplateTopo, mod.__name__)      tgen.start_topology() @@ -114,6 +115,7 @@ def test_bgp_route():      assertmsg = '"r3" JSON output mismatches'      assert result is None, assertmsg +  if __name__ == "__main__":      args = ["-s"] + sys.argv[1:]      sys.exit(pytest.main(args)) diff --git a/tests/topotests/bgp_vrf_dynamic_route_leak/test_bgp_vrf_dynamic_route_leak_topo2.py b/tests/topotests/bgp_vrf_dynamic_route_leak/test_bgp_vrf_dynamic_route_leak_topo2.py index 9106c163cd..6e7495d929 100644 --- a/tests/topotests/bgp_vrf_dynamic_route_leak/test_bgp_vrf_dynamic_route_leak_topo2.py +++ b/tests/topotests/bgp_vrf_dynamic_route_leak/test_bgp_vrf_dynamic_route_leak_topo2.py @@ -943,9 +943,10 @@ def test_modify_route_map_match_set_clauses_p1(request):          }          result = verify_bgp_rib(tgen, addr_type, "r1", input_routes_r1, expected=False) -        assert result is not True, ( -            "Testcase {} : Failed \n Error : Routes are still " -            "present {}".format(tc_name, result) +        assert ( +            result is not True +        ), "Testcase {} : Failed \n Error : Routes are still " "present {}".format( +            tc_name, result          )      write_test_footer(tc_name) diff --git a/tests/topotests/isis-lfa-topo1/test_isis_lfa_topo1.py b/tests/topotests/isis-lfa-topo1/test_isis_lfa_topo1.py index 67edcae90e..6f80ffd1aa 100755 --- a/tests/topotests/isis-lfa-topo1/test_isis_lfa_topo1.py +++ b/tests/topotests/isis-lfa-topo1/test_isis_lfa_topo1.py @@ -62,7 +62,7 @@ from functools import partial  # Save the Current Working Directory to find configuration files.  CWD = os.path.dirname(os.path.realpath(__file__)) -sys.path.append(os.path.join(CWD, '../')) +sys.path.append(os.path.join(CWD, "../"))  # pylint: disable=C0413  # Import topogen and topotest helpers @@ -76,8 +76,10 @@ from mininet.topo import Topo  # Global multi-dimensional dictionary containing all expected outputs  outputs = {} +  class TemplateTopo(Topo):      "Test topology builder" +      def build(self, *_args, **_opts):          "Build function"          tgen = get_topogen(self) @@ -85,59 +87,58 @@ class TemplateTopo(Topo):          #          # Define FRR Routers          # -        for router in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6', 'rt7']: +        for router in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6", "rt7"]:              tgen.add_router(router)          #          # Define connections          # -        switch = tgen.add_switch('s1') -        switch.add_link(tgen.gears['rt1'], nodeif="eth-rt2") -        switch.add_link(tgen.gears['rt2'], nodeif="eth-rt1") -        switch = tgen.add_switch('s2') -        switch.add_link(tgen.gears['rt2'], nodeif="eth-rt3") -        switch.add_link(tgen.gears['rt3'], nodeif="eth-rt2") -        switch = tgen.add_switch('s3') -        switch.add_link(tgen.gears['rt1'], nodeif="eth-rt3") -        switch.add_link(tgen.gears['rt3'], nodeif="eth-rt1") -        switch = tgen.add_switch('s4') -        switch.add_link(tgen.gears['rt1'], nodeif="eth-rt4") -        switch.add_link(tgen.gears['rt4'], nodeif="eth-rt1") -        switch = tgen.add_switch('s5') -        switch.add_link(tgen.gears['rt1'], nodeif="eth-rt5") -        switch.add_link(tgen.gears['rt5'], nodeif="eth-rt1") -        switch = tgen.add_switch('s6') -        switch.add_link(tgen.gears['rt1'], nodeif="eth-rt6") -        switch.add_link(tgen.gears['rt6'], nodeif="eth-rt1") -        switch = tgen.add_switch('s7') -        switch.add_link(tgen.gears['rt2'], nodeif="eth-rt7") -        switch.add_link(tgen.gears['rt7'], nodeif="eth-rt2") -        switch = tgen.add_switch('s8') -        switch.add_link(tgen.gears['rt3'], nodeif="eth-rt7") -        switch.add_link(tgen.gears['rt7'], nodeif="eth-rt3") -        switch = tgen.add_switch('s9') -        switch.add_link(tgen.gears['rt4'], nodeif="eth-rt7") -        switch.add_link(tgen.gears['rt7'], nodeif="eth-rt4") -        switch = tgen.add_switch('s10') -        switch.add_link(tgen.gears['rt5'], nodeif="eth-rt7") -        switch.add_link(tgen.gears['rt7'], nodeif="eth-rt5") -        switch = tgen.add_switch('s11') -        switch.add_link(tgen.gears['rt6'], nodeif="eth-rt7") -        switch.add_link(tgen.gears['rt7'], nodeif="eth-rt6") +        switch = tgen.add_switch("s1") +        switch.add_link(tgen.gears["rt1"], nodeif="eth-rt2") +        switch.add_link(tgen.gears["rt2"], nodeif="eth-rt1") +        switch = tgen.add_switch("s2") +        switch.add_link(tgen.gears["rt2"], nodeif="eth-rt3") +        switch.add_link(tgen.gears["rt3"], nodeif="eth-rt2") +        switch = tgen.add_switch("s3") +        switch.add_link(tgen.gears["rt1"], nodeif="eth-rt3") +        switch.add_link(tgen.gears["rt3"], nodeif="eth-rt1") +        switch = tgen.add_switch("s4") +        switch.add_link(tgen.gears["rt1"], nodeif="eth-rt4") +        switch.add_link(tgen.gears["rt4"], nodeif="eth-rt1") +        switch = tgen.add_switch("s5") +        switch.add_link(tgen.gears["rt1"], nodeif="eth-rt5") +        switch.add_link(tgen.gears["rt5"], nodeif="eth-rt1") +        switch = tgen.add_switch("s6") +        switch.add_link(tgen.gears["rt1"], nodeif="eth-rt6") +        switch.add_link(tgen.gears["rt6"], nodeif="eth-rt1") +        switch = tgen.add_switch("s7") +        switch.add_link(tgen.gears["rt2"], nodeif="eth-rt7") +        switch.add_link(tgen.gears["rt7"], nodeif="eth-rt2") +        switch = tgen.add_switch("s8") +        switch.add_link(tgen.gears["rt3"], nodeif="eth-rt7") +        switch.add_link(tgen.gears["rt7"], nodeif="eth-rt3") +        switch = tgen.add_switch("s9") +        switch.add_link(tgen.gears["rt4"], nodeif="eth-rt7") +        switch.add_link(tgen.gears["rt7"], nodeif="eth-rt4") +        switch = tgen.add_switch("s10") +        switch.add_link(tgen.gears["rt5"], nodeif="eth-rt7") +        switch.add_link(tgen.gears["rt7"], nodeif="eth-rt5") +        switch = tgen.add_switch("s11") +        switch.add_link(tgen.gears["rt6"], nodeif="eth-rt7") +        switch.add_link(tgen.gears["rt7"], nodeif="eth-rt6")          #          # Populate multi-dimensional dictionary containing all expected outputs          # -        files = ["show_ipv6_route.ref", -                 "show_yang_interface_isis_adjacencies.ref"] -        for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6', 'rt7']: +        files = ["show_ipv6_route.ref", "show_yang_interface_isis_adjacencies.ref"] +        for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6", "rt7"]:              outputs[rname] = {}              for step in range(1, 13 + 1):                  outputs[rname][step] = {}                  for file in files:                      if step == 1:                          # Get snapshots relative to the expected initial network convergence -                        filename = '{}/{}/step{}/{}'.format(CWD, rname, step, file) +                        filename = "{}/{}/step{}/{}".format(CWD, rname, step, file)                          outputs[rname][step][file] = open(filename).read()                      else:                          if rname != "rt1": @@ -146,20 +147,23 @@ class TemplateTopo(Topo):                              continue                          # Get diff relative to the previous step -                        filename = '{}/{}/step{}/{}.diff'.format(CWD, rname, step, file) +                        filename = "{}/{}/step{}/{}.diff".format(CWD, rname, step, file)                          # Create temporary files in order to apply the diff                          f_in = tempfile.NamedTemporaryFile()                          f_in.write(outputs[rname][step - 1][file])                          f_in.flush()                          f_out = tempfile.NamedTemporaryFile() -                        os.system("patch -s -o %s %s %s" %(f_out.name, f_in.name, filename)) +                        os.system( +                            "patch -s -o %s %s %s" % (f_out.name, f_in.name, filename) +                        )                          # Store the updated snapshot and remove the temporary files                          outputs[rname][step][file] = open(f_out.name).read()                          f_in.close()                          f_out.close() +  def setup_module(mod):      "Sets up the pytest environment"      tgen = Topogen(TemplateTopo, mod.__name__) @@ -170,16 +174,15 @@ def setup_module(mod):      # For all registered routers, load the zebra configuration file      for rname, router in router_list.iteritems():          router.load_config( -            TopoRouter.RD_ZEBRA, -            os.path.join(CWD, '{}/zebra.conf'.format(rname)) +            TopoRouter.RD_ZEBRA, os.path.join(CWD, "{}/zebra.conf".format(rname))          )          router.load_config( -            TopoRouter.RD_ISIS, -            os.path.join(CWD, '{}/isisd.conf'.format(rname)) +            TopoRouter.RD_ISIS, os.path.join(CWD, "{}/isisd.conf".format(rname))          )      tgen.start_router() +  def teardown_module(mod):      "Teardown the pytest environment"      tgen = get_topogen() @@ -187,6 +190,7 @@ def teardown_module(mod):      # This function tears down the whole topology.      tgen.stop_topology() +  def router_compare_json_output(rname, command, reference):      "Compare router JSON output" @@ -196,12 +200,12 @@ def router_compare_json_output(rname, command, reference):      expected = json.loads(reference)      # Run test function until we get an result. Wait at most 60 seconds. -    test_func = partial(topotest.router_json_cmp, -        tgen.gears[rname], command, expected) +    test_func = partial(topotest.router_json_cmp, tgen.gears[rname], command, expected)      _, diff = topotest.run_and_expect(test_func, None, count=120, wait=0.5)      assertmsg = '"{}" JSON output mismatches the expected result'.format(rname)      assert diff is None, assertmsg +  #  # Step 1  # @@ -215,9 +219,13 @@ def test_isis_adjacencies_step1():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6', 'rt7']: -        router_compare_json_output(rname, "show yang operational-data /frr-interface:lib isisd", -                                   outputs[rname][1]["show_yang_interface_isis_adjacencies.ref"]) +    for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6", "rt7"]: +        router_compare_json_output( +            rname, +            "show yang operational-data /frr-interface:lib isisd", +            outputs[rname][1]["show_yang_interface_isis_adjacencies.ref"], +        ) +  def test_rib_ipv6_step1():      logger.info("Test (step 1): verify IPv6 RIB") @@ -227,9 +235,11 @@ def test_rib_ipv6_step1():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6', 'rt7']: -        router_compare_json_output(rname, "show ipv6 route isis json", -                                   outputs[rname][1]["show_ipv6_route.ref"]) +    for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6", "rt7"]: +        router_compare_json_output( +            rname, "show ipv6 route isis json", outputs[rname][1]["show_ipv6_route.ref"] +        ) +  #  # Step 2 @@ -248,16 +258,28 @@ def test_rib_ipv6_step2():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    logger.info('Disabling LFA protection on all rt1 interfaces') -    tgen.net['rt1'].cmd('vtysh -c "conf t" -c "interface eth-rt2" -c "no isis fast-reroute lfa"') -    tgen.net['rt1'].cmd('vtysh -c "conf t" -c "interface eth-rt3" -c "no isis fast-reroute lfa"') -    tgen.net['rt1'].cmd('vtysh -c "conf t" -c "interface eth-rt4" -c "no isis fast-reroute lfa"') -    tgen.net['rt1'].cmd('vtysh -c "conf t" -c "interface eth-rt5" -c "no isis fast-reroute lfa"') -    tgen.net['rt1'].cmd('vtysh -c "conf t" -c "interface eth-rt6" -c "no isis fast-reroute lfa"') +    logger.info("Disabling LFA protection on all rt1 interfaces") +    tgen.net["rt1"].cmd( +        'vtysh -c "conf t" -c "interface eth-rt2" -c "no isis fast-reroute lfa"' +    ) +    tgen.net["rt1"].cmd( +        'vtysh -c "conf t" -c "interface eth-rt3" -c "no isis fast-reroute lfa"' +    ) +    tgen.net["rt1"].cmd( +        'vtysh -c "conf t" -c "interface eth-rt4" -c "no isis fast-reroute lfa"' +    ) +    tgen.net["rt1"].cmd( +        'vtysh -c "conf t" -c "interface eth-rt5" -c "no isis fast-reroute lfa"' +    ) +    tgen.net["rt1"].cmd( +        'vtysh -c "conf t" -c "interface eth-rt6" -c "no isis fast-reroute lfa"' +    ) + +    for rname in ["rt1"]: +        router_compare_json_output( +            rname, "show ipv6 route isis json", outputs[rname][2]["show_ipv6_route.ref"] +        ) -    for rname in ['rt1']: -        router_compare_json_output(rname, "show ipv6 route isis json", -                                   outputs[rname][2]["show_ipv6_route.ref"])  #  # Step 3 @@ -276,16 +298,28 @@ def test_rib_ipv6_step3():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    logger.info('Re-enabling LFA protection on all rt1 interfaces') -    tgen.net['rt1'].cmd('vtysh -c "conf t" -c "interface eth-rt2" -c "isis fast-reroute lfa"') -    tgen.net['rt1'].cmd('vtysh -c "conf t" -c "interface eth-rt3" -c "isis fast-reroute lfa"') -    tgen.net['rt1'].cmd('vtysh -c "conf t" -c "interface eth-rt4" -c "isis fast-reroute lfa"') -    tgen.net['rt1'].cmd('vtysh -c "conf t" -c "interface eth-rt5" -c "isis fast-reroute lfa"') -    tgen.net['rt1'].cmd('vtysh -c "conf t" -c "interface eth-rt6" -c "isis fast-reroute lfa"') +    logger.info("Re-enabling LFA protection on all rt1 interfaces") +    tgen.net["rt1"].cmd( +        'vtysh -c "conf t" -c "interface eth-rt2" -c "isis fast-reroute lfa"' +    ) +    tgen.net["rt1"].cmd( +        'vtysh -c "conf t" -c "interface eth-rt3" -c "isis fast-reroute lfa"' +    ) +    tgen.net["rt1"].cmd( +        'vtysh -c "conf t" -c "interface eth-rt4" -c "isis fast-reroute lfa"' +    ) +    tgen.net["rt1"].cmd( +        'vtysh -c "conf t" -c "interface eth-rt5" -c "isis fast-reroute lfa"' +    ) +    tgen.net["rt1"].cmd( +        'vtysh -c "conf t" -c "interface eth-rt6" -c "isis fast-reroute lfa"' +    ) + +    for rname in ["rt1"]: +        router_compare_json_output( +            rname, "show ipv6 route isis json", outputs[rname][3]["show_ipv6_route.ref"] +        ) -    for rname in ['rt1']: -        router_compare_json_output(rname, "show ipv6 route isis json", -                                   outputs[rname][3]["show_ipv6_route.ref"])  #  # Step 4 @@ -304,12 +338,16 @@ def test_rib_ipv6_step4():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    logger.info('Disabling LFA load-sharing on rt1') -    tgen.net['rt1'].cmd('vtysh -c "conf t" -c "router isis 1" -c "fast-reroute load-sharing disable"') +    logger.info("Disabling LFA load-sharing on rt1") +    tgen.net["rt1"].cmd( +        'vtysh -c "conf t" -c "router isis 1" -c "fast-reroute load-sharing disable"' +    ) + +    for rname in ["rt1"]: +        router_compare_json_output( +            rname, "show ipv6 route isis json", outputs[rname][4]["show_ipv6_route.ref"] +        ) -    for rname in ['rt1']: -        router_compare_json_output(rname, "show ipv6 route isis json", -                                   outputs[rname][4]["show_ipv6_route.ref"])  #  # Step 5 @@ -328,12 +366,16 @@ def test_rib_ipv6_step5():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    logger.info('Re-enabling LFA load-sharing on rt1') -    tgen.net['rt1'].cmd('vtysh -c "conf t" -c "router isis 1" -c "no fast-reroute load-sharing disable"') +    logger.info("Re-enabling LFA load-sharing on rt1") +    tgen.net["rt1"].cmd( +        'vtysh -c "conf t" -c "router isis 1" -c "no fast-reroute load-sharing disable"' +    ) + +    for rname in ["rt1"]: +        router_compare_json_output( +            rname, "show ipv6 route isis json", outputs[rname][5]["show_ipv6_route.ref"] +        ) -    for rname in ['rt1']: -        router_compare_json_output(rname, "show ipv6 route isis json", -                                   outputs[rname][5]["show_ipv6_route.ref"])  #  # Step 6 @@ -352,12 +394,16 @@ def test_rib_ipv6_step6():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    logger.info('Limiting backup computation to critical priority prefixes only') -    tgen.net['rt1'].cmd('vtysh -c "conf t" -c "router isis 1" -c "fast-reroute priority-limit critical"') +    logger.info("Limiting backup computation to critical priority prefixes only") +    tgen.net["rt1"].cmd( +        'vtysh -c "conf t" -c "router isis 1" -c "fast-reroute priority-limit critical"' +    ) + +    for rname in ["rt1"]: +        router_compare_json_output( +            rname, "show ipv6 route isis json", outputs[rname][6]["show_ipv6_route.ref"] +        ) -    for rname in ['rt1']: -        router_compare_json_output(rname, "show ipv6 route isis json", -                                   outputs[rname][6]["show_ipv6_route.ref"])  #  # Step 7 @@ -377,13 +423,19 @@ def test_rib_ipv6_step7():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    logger.info('Configuring a prefix priority list') -    tgen.net['rt1'].cmd('vtysh -c "conf t" -c "router isis 1" -c "spf prefix-priority critical CRITICAL_DESTINATIONS"') -    tgen.net['rt1'].cmd('vtysh -c "conf t" -c "ipv6 access-list CRITICAL_DESTINATIONS seq 5 permit 2001:db8:1000::7/128"') +    logger.info("Configuring a prefix priority list") +    tgen.net["rt1"].cmd( +        'vtysh -c "conf t" -c "router isis 1" -c "spf prefix-priority critical CRITICAL_DESTINATIONS"' +    ) +    tgen.net["rt1"].cmd( +        'vtysh -c "conf t" -c "ipv6 access-list CRITICAL_DESTINATIONS seq 5 permit 2001:db8:1000::7/128"' +    ) + +    for rname in ["rt1"]: +        router_compare_json_output( +            rname, "show ipv6 route isis json", outputs[rname][7]["show_ipv6_route.ref"] +        ) -    for rname in ['rt1']: -        router_compare_json_output(rname, "show ipv6 route isis json", -                                   outputs[rname][7]["show_ipv6_route.ref"])  #  # Step 8 @@ -402,14 +454,22 @@ def test_rib_ipv6_step8():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    logger.info('Reverting previous changes related to prefix priorities') -    tgen.net['rt1'].cmd('vtysh -c "conf t" -c "no ipv6 access-list CRITICAL_DESTINATIONS seq 5 permit 2001:db8:1000::7/128"') -    tgen.net['rt1'].cmd('vtysh -c "conf t" -c "router isis 1" -c "no fast-reroute priority-limit critical"') -    tgen.net['rt1'].cmd('vtysh -c "conf t" -c "router isis 1" -c "no spf prefix-priority critical CRITICAL_DESTINATIONS"') +    logger.info("Reverting previous changes related to prefix priorities") +    tgen.net["rt1"].cmd( +        'vtysh -c "conf t" -c "no ipv6 access-list CRITICAL_DESTINATIONS seq 5 permit 2001:db8:1000::7/128"' +    ) +    tgen.net["rt1"].cmd( +        'vtysh -c "conf t" -c "router isis 1" -c "no fast-reroute priority-limit critical"' +    ) +    tgen.net["rt1"].cmd( +        'vtysh -c "conf t" -c "router isis 1" -c "no spf prefix-priority critical CRITICAL_DESTINATIONS"' +    ) + +    for rname in ["rt1"]: +        router_compare_json_output( +            rname, "show ipv6 route isis json", outputs[rname][8]["show_ipv6_route.ref"] +        ) -    for rname in ['rt1']: -        router_compare_json_output(rname, "show ipv6 route isis json", -                                   outputs[rname][8]["show_ipv6_route.ref"])  #  # Step 9 @@ -428,12 +488,16 @@ def test_rib_ipv6_step9():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    logger.info('Excluding eth-rt6 from LFA computation for eth-rt2\'s failure') -    tgen.net['rt1'].cmd('vtysh -c "conf t" -c "interface eth-rt2" -c "isis fast-reroute lfa exclude interface eth-rt6"') +    logger.info("Excluding eth-rt6 from LFA computation for eth-rt2's failure") +    tgen.net["rt1"].cmd( +        'vtysh -c "conf t" -c "interface eth-rt2" -c "isis fast-reroute lfa exclude interface eth-rt6"' +    ) + +    for rname in ["rt1"]: +        router_compare_json_output( +            rname, "show ipv6 route isis json", outputs[rname][9]["show_ipv6_route.ref"] +        ) -    for rname in ['rt1']: -        router_compare_json_output(rname, "show ipv6 route isis json", -                                   outputs[rname][9]["show_ipv6_route.ref"])  #  # Step 10 @@ -452,12 +516,20 @@ def test_rib_ipv6_step10():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    logger.info('Removing exclusion of eth-rt6 from LFA computation for eth-rt2\'s failure') -    tgen.net['rt1'].cmd('vtysh -c "conf t" -c "interface eth-rt2" -c "no isis fast-reroute lfa exclude interface eth-rt6"') +    logger.info( +        "Removing exclusion of eth-rt6 from LFA computation for eth-rt2's failure" +    ) +    tgen.net["rt1"].cmd( +        'vtysh -c "conf t" -c "interface eth-rt2" -c "no isis fast-reroute lfa exclude interface eth-rt6"' +    ) + +    for rname in ["rt1"]: +        router_compare_json_output( +            rname, +            "show ipv6 route isis json", +            outputs[rname][10]["show_ipv6_route.ref"], +        ) -    for rname in ['rt1']: -        router_compare_json_output(rname, "show ipv6 route isis json", -                                   outputs[rname][10]["show_ipv6_route.ref"])  #  # Step 11 @@ -476,12 +548,18 @@ def test_rib_ipv6_step11():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    logger.info('Adding LFA tiebreaker: prefer node protecting backup path') -    tgen.net['rt1'].cmd('vtysh -c "conf t" -c "router isis 1" -c "fast-reroute lfa tiebreaker node-protecting index 10"') +    logger.info("Adding LFA tiebreaker: prefer node protecting backup path") +    tgen.net["rt1"].cmd( +        'vtysh -c "conf t" -c "router isis 1" -c "fast-reroute lfa tiebreaker node-protecting index 10"' +    ) + +    for rname in ["rt1"]: +        router_compare_json_output( +            rname, +            "show ipv6 route isis json", +            outputs[rname][11]["show_ipv6_route.ref"], +        ) -    for rname in ['rt1']: -        router_compare_json_output(rname, "show ipv6 route isis json", -                                   outputs[rname][11]["show_ipv6_route.ref"])  #  # Step 12 @@ -500,12 +578,18 @@ def test_rib_ipv6_step12():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    logger.info('Adding LFA tiebreaker: prefer backup path via downstream node') -    tgen.net['rt1'].cmd('vtysh -c "conf t" -c "router isis 1" -c "fast-reroute lfa tiebreaker downstream index 20"') +    logger.info("Adding LFA tiebreaker: prefer backup path via downstream node") +    tgen.net["rt1"].cmd( +        'vtysh -c "conf t" -c "router isis 1" -c "fast-reroute lfa tiebreaker downstream index 20"' +    ) + +    for rname in ["rt1"]: +        router_compare_json_output( +            rname, +            "show ipv6 route isis json", +            outputs[rname][12]["show_ipv6_route.ref"], +        ) -    for rname in ['rt1']: -        router_compare_json_output(rname, "show ipv6 route isis json", -                                   outputs[rname][12]["show_ipv6_route.ref"])  #  # Step 13 @@ -524,22 +608,29 @@ def test_rib_ipv6_step13():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    logger.info('Adding LFA tiebreaker: prefer backup path with lowest total metric') -    tgen.net['rt1'].cmd('vtysh -c "conf t" -c "router isis 1" -c "fast-reroute lfa tiebreaker lowest-backup-metric index 30"') +    logger.info("Adding LFA tiebreaker: prefer backup path with lowest total metric") +    tgen.net["rt1"].cmd( +        'vtysh -c "conf t" -c "router isis 1" -c "fast-reroute lfa tiebreaker lowest-backup-metric index 30"' +    ) + +    for rname in ["rt1"]: +        router_compare_json_output( +            rname, +            "show ipv6 route isis json", +            outputs[rname][13]["show_ipv6_route.ref"], +        ) -    for rname in ['rt1']: -        router_compare_json_output(rname, "show ipv6 route isis json", -                                   outputs[rname][13]["show_ipv6_route.ref"])  # Memory leak test template  def test_memory_leak():      "Run the memory leak test and report results."      tgen = get_topogen()      if not tgen.is_memleak_enabled(): -        pytest.skip('Memory leak test/report is disabled') +        pytest.skip("Memory leak test/report is disabled")      tgen.report_memory_leaks() -if __name__ == '__main__': + +if __name__ == "__main__":      args = ["-s"] + sys.argv[1:]      sys.exit(pytest.main(args)) diff --git a/tests/topotests/isis-tilfa-topo1/test_isis_tilfa_topo1.py b/tests/topotests/isis-tilfa-topo1/test_isis_tilfa_topo1.py index 514ea53552..a1263de8ad 100755 --- a/tests/topotests/isis-tilfa-topo1/test_isis_tilfa_topo1.py +++ b/tests/topotests/isis-tilfa-topo1/test_isis_tilfa_topo1.py @@ -74,7 +74,7 @@ from functools import partial  # Save the Current Working Directory to find configuration files.  CWD = os.path.dirname(os.path.realpath(__file__)) -sys.path.append(os.path.join(CWD, '../')) +sys.path.append(os.path.join(CWD, "../"))  # pylint: disable=C0413  # Import topogen and topotest helpers @@ -88,8 +88,10 @@ from mininet.topo import Topo  # Global multi-dimensional dictionary containing all expected outputs  outputs = {} +  class TemplateTopo(Topo):      "Test topology builder" +      def build(self, *_args, **_opts):          "Build function"          tgen = get_topogen(self) @@ -97,80 +99,85 @@ class TemplateTopo(Topo):          #          # Define FRR Routers          # -        for router in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: +        for router in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]:              tgen.add_router(router)          #          # Define connections          # -        switch = tgen.add_switch('s1') -        switch.add_link(tgen.gears['rt1'], nodeif="eth-sw1") -        switch.add_link(tgen.gears['rt2'], nodeif="eth-sw1") -        switch.add_link(tgen.gears['rt3'], nodeif="eth-sw1") +        switch = tgen.add_switch("s1") +        switch.add_link(tgen.gears["rt1"], nodeif="eth-sw1") +        switch.add_link(tgen.gears["rt2"], nodeif="eth-sw1") +        switch.add_link(tgen.gears["rt3"], nodeif="eth-sw1") -        switch = tgen.add_switch('s2') -        switch.add_link(tgen.gears['rt2'], nodeif="eth-rt4-1") -        switch.add_link(tgen.gears['rt4'], nodeif="eth-rt2-1") +        switch = tgen.add_switch("s2") +        switch.add_link(tgen.gears["rt2"], nodeif="eth-rt4-1") +        switch.add_link(tgen.gears["rt4"], nodeif="eth-rt2-1") -        switch = tgen.add_switch('s3') -        switch.add_link(tgen.gears['rt2'], nodeif="eth-rt4-2") -        switch.add_link(tgen.gears['rt4'], nodeif="eth-rt2-2") +        switch = tgen.add_switch("s3") +        switch.add_link(tgen.gears["rt2"], nodeif="eth-rt4-2") +        switch.add_link(tgen.gears["rt4"], nodeif="eth-rt2-2") -        switch = tgen.add_switch('s4') -        switch.add_link(tgen.gears['rt3'], nodeif="eth-rt5-1") -        switch.add_link(tgen.gears['rt5'], nodeif="eth-rt3-1") +        switch = tgen.add_switch("s4") +        switch.add_link(tgen.gears["rt3"], nodeif="eth-rt5-1") +        switch.add_link(tgen.gears["rt5"], nodeif="eth-rt3-1") -        switch = tgen.add_switch('s5') -        switch.add_link(tgen.gears['rt3'], nodeif="eth-rt5-2") -        switch.add_link(tgen.gears['rt5'], nodeif="eth-rt3-2") +        switch = tgen.add_switch("s5") +        switch.add_link(tgen.gears["rt3"], nodeif="eth-rt5-2") +        switch.add_link(tgen.gears["rt5"], nodeif="eth-rt3-2") -        switch = tgen.add_switch('s6') -        switch.add_link(tgen.gears['rt4'], nodeif="eth-rt5") -        switch.add_link(tgen.gears['rt5'], nodeif="eth-rt4") +        switch = tgen.add_switch("s6") +        switch.add_link(tgen.gears["rt4"], nodeif="eth-rt5") +        switch.add_link(tgen.gears["rt5"], nodeif="eth-rt4") -        switch = tgen.add_switch('s7') -        switch.add_link(tgen.gears['rt4'], nodeif="eth-rt6") -        switch.add_link(tgen.gears['rt6'], nodeif="eth-rt4") +        switch = tgen.add_switch("s7") +        switch.add_link(tgen.gears["rt4"], nodeif="eth-rt6") +        switch.add_link(tgen.gears["rt6"], nodeif="eth-rt4") -        switch = tgen.add_switch('s8') -        switch.add_link(tgen.gears['rt5'], nodeif="eth-rt6") -        switch.add_link(tgen.gears['rt6'], nodeif="eth-rt5") +        switch = tgen.add_switch("s8") +        switch.add_link(tgen.gears["rt5"], nodeif="eth-rt6") +        switch.add_link(tgen.gears["rt6"], nodeif="eth-rt5")          #          # Populate multi-dimensional dictionary containing all expected outputs          # -        files = ["show_ip_route.ref", -                 "show_ipv6_route.ref", -                 "show_mpls_table.ref", -                 "show_yang_interface_isis_adjacencies.ref"] -        for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: +        files = [ +            "show_ip_route.ref", +            "show_ipv6_route.ref", +            "show_mpls_table.ref", +            "show_yang_interface_isis_adjacencies.ref", +        ] +        for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]:              outputs[rname] = {}              for step in range(1, 9 + 1):                  outputs[rname][step] = {}                  for file in files:                      if step == 1:                          # Get snapshots relative to the expected initial network convergence -                        filename = '{}/{}/step{}/{}'.format(CWD, rname, step, file) +                        filename = "{}/{}/step{}/{}".format(CWD, rname, step, file)                          outputs[rname][step][file] = open(filename).read()                      else:                          if file == "show_yang_interface_isis_adjacencies.ref":                              continue                          # Get diff relative to the previous step -                        filename = '{}/{}/step{}/{}.diff'.format(CWD, rname, step, file) +                        filename = "{}/{}/step{}/{}.diff".format(CWD, rname, step, file)                          # Create temporary files in order to apply the diff                          f_in = tempfile.NamedTemporaryFile()                          f_in.write(outputs[rname][step - 1][file])                          f_in.flush()                          f_out = tempfile.NamedTemporaryFile() -                        os.system("patch -s -o %s %s %s" %(f_out.name, f_in.name, filename)) +                        os.system( +                            "patch -s -o %s %s %s" % (f_out.name, f_in.name, filename) +                        )                          # Store the updated snapshot and remove the temporary files                          outputs[rname][step][file] = open(f_out.name).read()                          f_in.close()                          f_out.close() +  def setup_module(mod):      "Sets up the pytest environment"      tgen = Topogen(TemplateTopo, mod.__name__) @@ -181,16 +188,15 @@ def setup_module(mod):      # For all registered routers, load the zebra configuration file      for rname, router in router_list.items():          router.load_config( -            TopoRouter.RD_ZEBRA, -            os.path.join(CWD, '{}/zebra.conf'.format(rname)) +            TopoRouter.RD_ZEBRA, os.path.join(CWD, "{}/zebra.conf".format(rname))          )          router.load_config( -            TopoRouter.RD_ISIS, -            os.path.join(CWD, '{}/isisd.conf'.format(rname)) +            TopoRouter.RD_ISIS, os.path.join(CWD, "{}/isisd.conf".format(rname))          )      tgen.start_router() +  def teardown_module(mod):      "Teardown the pytest environment"      tgen = get_topogen() @@ -198,6 +204,7 @@ def teardown_module(mod):      # This function tears down the whole topology.      tgen.stop_topology() +  def router_compare_json_output(rname, command, reference):      "Compare router JSON output" @@ -207,12 +214,12 @@ def router_compare_json_output(rname, command, reference):      expected = json.loads(reference)      # Run test function until we get an result. Wait at most 60 seconds. -    test_func = partial(topotest.router_json_cmp, -        tgen.gears[rname], command, expected) +    test_func = partial(topotest.router_json_cmp, tgen.gears[rname], command, expected)      _, diff = topotest.run_and_expect(test_func, None, count=120, wait=0.5)      assertmsg = '"{}" JSON output mismatches the expected result'.format(rname)      assert diff is None, assertmsg +  #  # Step 1  # @@ -226,9 +233,13 @@ def test_isis_adjacencies_step1():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: -        router_compare_json_output(rname, "show yang operational-data /frr-interface:lib isisd", -                                   outputs[rname][1]["show_yang_interface_isis_adjacencies.ref"]) +    for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: +        router_compare_json_output( +            rname, +            "show yang operational-data /frr-interface:lib isisd", +            outputs[rname][1]["show_yang_interface_isis_adjacencies.ref"], +        ) +  def test_rib_ipv4_step1():      logger.info("Test (step 1): verify IPv4 RIB") @@ -238,9 +249,11 @@ def test_rib_ipv4_step1():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: -        router_compare_json_output(rname, "show ip route isis json", -                                   outputs[rname][1]["show_ip_route.ref"]) +    for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: +        router_compare_json_output( +            rname, "show ip route isis json", outputs[rname][1]["show_ip_route.ref"] +        ) +  def test_rib_ipv6_step1():      logger.info("Test (step 1): verify IPv6 RIB") @@ -250,9 +263,11 @@ def test_rib_ipv6_step1():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: -        router_compare_json_output(rname, "show ipv6 route isis json", -                                   outputs[rname][1]["show_ipv6_route.ref"]) +    for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: +        router_compare_json_output( +            rname, "show ipv6 route isis json", outputs[rname][1]["show_ipv6_route.ref"] +        ) +  def test_mpls_lib_step1():      logger.info("Test (step 1): verify MPLS LIB") @@ -262,9 +277,11 @@ def test_mpls_lib_step1():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: -        router_compare_json_output(rname, "show mpls table json", -                                   outputs[rname][1]["show_mpls_table.ref"]) +    for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: +        router_compare_json_output( +            rname, "show mpls table json", outputs[rname][1]["show_mpls_table.ref"] +        ) +  #  # Step 2 @@ -283,12 +300,16 @@ def test_rib_ipv4_step2():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    logger.info('Disabling TI-LFA link protection on rt2\'s eth-sw1 interface') -    tgen.net['rt2'].cmd('vtysh -c "conf t" -c "interface eth-sw1" -c "no isis fast-reroute ti-lfa"') +    logger.info("Disabling TI-LFA link protection on rt2's eth-sw1 interface") +    tgen.net["rt2"].cmd( +        'vtysh -c "conf t" -c "interface eth-sw1" -c "no isis fast-reroute ti-lfa"' +    ) + +    for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: +        router_compare_json_output( +            rname, "show ip route isis json", outputs[rname][2]["show_ip_route.ref"] +        ) -    for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: -        router_compare_json_output(rname, "show ip route isis json", -                                   outputs[rname][2]["show_ip_route.ref"])  def test_rib_ipv6_step2():      logger.info("Test (step 2): verify IPv6 RIB") @@ -298,9 +319,11 @@ def test_rib_ipv6_step2():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: -        router_compare_json_output(rname, "show ipv6 route isis json", -                                   outputs[rname][2]["show_ipv6_route.ref"]) +    for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: +        router_compare_json_output( +            rname, "show ipv6 route isis json", outputs[rname][2]["show_ipv6_route.ref"] +        ) +  def test_mpls_lib_step2():      logger.info("Test (step 2): verify MPLS LIB") @@ -310,9 +333,11 @@ def test_mpls_lib_step2():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: -        router_compare_json_output(rname, "show mpls table json", -                                   outputs[rname][2]["show_mpls_table.ref"]) +    for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: +        router_compare_json_output( +            rname, "show mpls table json", outputs[rname][2]["show_mpls_table.ref"] +        ) +  #  # Step 3 @@ -331,12 +356,16 @@ def test_rib_ipv4_step3():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    logger.info('Enabling TI-LFA link protection on rt2\'s eth-sw1 interface') -    tgen.net['rt2'].cmd('vtysh -c "conf t" -c "interface eth-sw1" -c "isis fast-reroute ti-lfa"') +    logger.info("Enabling TI-LFA link protection on rt2's eth-sw1 interface") +    tgen.net["rt2"].cmd( +        'vtysh -c "conf t" -c "interface eth-sw1" -c "isis fast-reroute ti-lfa"' +    ) + +    for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: +        router_compare_json_output( +            rname, "show ip route isis json", outputs[rname][3]["show_ip_route.ref"] +        ) -    for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: -        router_compare_json_output(rname, "show ip route isis json", -                                   outputs[rname][3]["show_ip_route.ref"])  def test_rib_ipv6_step3():      logger.info("Test (step 3): verify IPv6 RIB") @@ -346,9 +375,11 @@ def test_rib_ipv6_step3():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: -        router_compare_json_output(rname, "show ipv6 route isis json", -                                   outputs[rname][3]["show_ipv6_route.ref"]) +    for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: +        router_compare_json_output( +            rname, "show ipv6 route isis json", outputs[rname][3]["show_ipv6_route.ref"] +        ) +  def test_mpls_lib_step3():      logger.info("Test (step 3): verify MPLS LIB") @@ -358,9 +389,11 @@ def test_mpls_lib_step3():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: -        router_compare_json_output(rname, "show mpls table json", -                                   outputs[rname][3]["show_mpls_table.ref"]) +    for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: +        router_compare_json_output( +            rname, "show mpls table json", outputs[rname][3]["show_mpls_table.ref"] +        ) +  #  # Step 4 @@ -384,12 +417,16 @@ def test_rib_ipv4_step4():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    logger.info('Disabling SR on rt4') -    tgen.net['rt4'].cmd('vtysh -c "conf t" -c "router isis 1" -c "no segment-routing on"') +    logger.info("Disabling SR on rt4") +    tgen.net["rt4"].cmd( +        'vtysh -c "conf t" -c "router isis 1" -c "no segment-routing on"' +    ) + +    for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: +        router_compare_json_output( +            rname, "show ip route isis json", outputs[rname][4]["show_ip_route.ref"] +        ) -    for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: -        router_compare_json_output(rname, "show ip route isis json", -                                   outputs[rname][4]["show_ip_route.ref"])  def test_rib_ipv6_step4():      logger.info("Test (step 4): verify IPv6 RIB") @@ -399,9 +436,11 @@ def test_rib_ipv6_step4():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: -        router_compare_json_output(rname, "show ipv6 route isis json", -                                   outputs[rname][4]["show_ipv6_route.ref"]) +    for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: +        router_compare_json_output( +            rname, "show ipv6 route isis json", outputs[rname][4]["show_ipv6_route.ref"] +        ) +  def test_mpls_lib_step4():      logger.info("Test (step 4): verify MPLS LIB") @@ -411,9 +450,11 @@ def test_mpls_lib_step4():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: -        router_compare_json_output(rname, "show mpls table json", -                                   outputs[rname][4]["show_mpls_table.ref"]) +    for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: +        router_compare_json_output( +            rname, "show mpls table json", outputs[rname][4]["show_mpls_table.ref"] +        ) +  #  # Step 5 @@ -432,12 +473,14 @@ def test_rib_ipv4_step5():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    logger.info('Enabling SR on rt4') -    tgen.net['rt4'].cmd('vtysh -c "conf t" -c "router isis 1" -c "segment-routing on"') +    logger.info("Enabling SR on rt4") +    tgen.net["rt4"].cmd('vtysh -c "conf t" -c "router isis 1" -c "segment-routing on"') + +    for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: +        router_compare_json_output( +            rname, "show ip route isis json", outputs[rname][5]["show_ip_route.ref"] +        ) -    for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: -        router_compare_json_output(rname, "show ip route isis json", -                                   outputs[rname][5]["show_ip_route.ref"])  def test_rib_ipv6_step5():      logger.info("Test (step 5): verify IPv6 RIB") @@ -447,9 +490,11 @@ def test_rib_ipv6_step5():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: -        router_compare_json_output(rname, "show ipv6 route isis json", -                                   outputs[rname][5]["show_ipv6_route.ref"]) +    for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: +        router_compare_json_output( +            rname, "show ipv6 route isis json", outputs[rname][5]["show_ipv6_route.ref"] +        ) +  def test_mpls_lib_step5():      logger.info("Test (step 5): verify MPLS LIB") @@ -459,9 +504,11 @@ def test_mpls_lib_step5():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: -        router_compare_json_output(rname, "show mpls table json", -                                   outputs[rname][5]["show_mpls_table.ref"]) +    for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: +        router_compare_json_output( +            rname, "show mpls table json", outputs[rname][5]["show_mpls_table.ref"] +        ) +  #  # Step 6 @@ -480,12 +527,16 @@ def test_rib_ipv4_step6():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    logger.info('Changing rt5\'s SRGB') -    tgen.net['rt5'].cmd('vtysh -c "conf t" -c "router isis 1" -c "segment-routing global-block 30000 37999"') +    logger.info("Changing rt5's SRGB") +    tgen.net["rt5"].cmd( +        'vtysh -c "conf t" -c "router isis 1" -c "segment-routing global-block 30000 37999"' +    ) + +    for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: +        router_compare_json_output( +            rname, "show ip route isis json", outputs[rname][6]["show_ip_route.ref"] +        ) -    for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: -        router_compare_json_output(rname, "show ip route isis json", -                                   outputs[rname][6]["show_ip_route.ref"])  def test_rib_ipv6_step6():      logger.info("Test (step 6): verify IPv6 RIB") @@ -495,9 +546,11 @@ def test_rib_ipv6_step6():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: -        router_compare_json_output(rname, "show ipv6 route isis json", -                                   outputs[rname][6]["show_ipv6_route.ref"]) +    for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: +        router_compare_json_output( +            rname, "show ipv6 route isis json", outputs[rname][6]["show_ipv6_route.ref"] +        ) +  def test_mpls_lib_step6():      logger.info("Test (step 6): verify MPLS LIB") @@ -507,9 +560,11 @@ def test_mpls_lib_step6():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: -        router_compare_json_output(rname, "show mpls table json", -                                   outputs[rname][6]["show_mpls_table.ref"]) +    for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: +        router_compare_json_output( +            rname, "show mpls table json", outputs[rname][6]["show_mpls_table.ref"] +        ) +  #  # Step 7 @@ -529,13 +584,19 @@ def test_rib_ipv4_step7():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    logger.info('Deleting rt5\'s Prefix-SIDs') -    tgen.net['rt5'].cmd('vtysh -c "conf t" -c "router isis 1" -c "no segment-routing prefix 5.5.5.5/32 index 50"') -    tgen.net['rt5'].cmd('vtysh -c "conf t" -c "router isis 1" -c "no segment-routing prefix 2001:db8:1000::5/128 index 51"') +    logger.info("Deleting rt5's Prefix-SIDs") +    tgen.net["rt5"].cmd( +        'vtysh -c "conf t" -c "router isis 1" -c "no segment-routing prefix 5.5.5.5/32 index 50"' +    ) +    tgen.net["rt5"].cmd( +        'vtysh -c "conf t" -c "router isis 1" -c "no segment-routing prefix 2001:db8:1000::5/128 index 51"' +    ) + +    for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: +        router_compare_json_output( +            rname, "show ip route isis json", outputs[rname][7]["show_ip_route.ref"] +        ) -    for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: -        router_compare_json_output(rname, "show ip route isis json", -                                   outputs[rname][7]["show_ip_route.ref"])  def test_rib_ipv6_step7():      logger.info("Test (step 7): verify IPv6 RIB") @@ -545,9 +606,11 @@ def test_rib_ipv6_step7():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: -        router_compare_json_output(rname, "show ipv6 route isis json", -                                   outputs[rname][7]["show_ipv6_route.ref"]) +    for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: +        router_compare_json_output( +            rname, "show ipv6 route isis json", outputs[rname][7]["show_ipv6_route.ref"] +        ) +  def test_mpls_lib_step7():      logger.info("Test (step 7): verify MPLS LIB") @@ -557,9 +620,11 @@ def test_mpls_lib_step7():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: -        router_compare_json_output(rname, "show mpls table json", -                                   outputs[rname][7]["show_mpls_table.ref"]) +    for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: +        router_compare_json_output( +            rname, "show mpls table json", outputs[rname][7]["show_mpls_table.ref"] +        ) +  #  # Step 8 @@ -578,13 +643,19 @@ def test_rib_ipv4_step8():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    logger.info('Re-adding rt5\'s Prefix-SIDs') -    tgen.net['rt5'].cmd('vtysh -c "conf t" -c "router isis 1" -c "segment-routing prefix 5.5.5.5/32 index 50"') -    tgen.net['rt5'].cmd('vtysh -c "conf t" -c "router isis 1" -c "segment-routing prefix 2001:db8:1000::5/128 index 51"') +    logger.info("Re-adding rt5's Prefix-SIDs") +    tgen.net["rt5"].cmd( +        'vtysh -c "conf t" -c "router isis 1" -c "segment-routing prefix 5.5.5.5/32 index 50"' +    ) +    tgen.net["rt5"].cmd( +        'vtysh -c "conf t" -c "router isis 1" -c "segment-routing prefix 2001:db8:1000::5/128 index 51"' +    ) + +    for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: +        router_compare_json_output( +            rname, "show ip route isis json", outputs[rname][8]["show_ip_route.ref"] +        ) -    for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: -        router_compare_json_output(rname, "show ip route isis json", -                                   outputs[rname][8]["show_ip_route.ref"])  def test_rib_ipv6_step8():      logger.info("Test (step 8): verify IPv6 RIB") @@ -594,9 +665,11 @@ def test_rib_ipv6_step8():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: -        router_compare_json_output(rname, "show ipv6 route isis json", -                                   outputs[rname][8]["show_ipv6_route.ref"]) +    for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: +        router_compare_json_output( +            rname, "show ipv6 route isis json", outputs[rname][8]["show_ipv6_route.ref"] +        ) +  def test_mpls_lib_step8():      logger.info("Test (step 8): verify MPLS LIB") @@ -606,9 +679,11 @@ def test_mpls_lib_step8():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: -        router_compare_json_output(rname, "show mpls table json", -                                   outputs[rname][8]["show_mpls_table.ref"]) +    for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: +        router_compare_json_output( +            rname, "show mpls table json", outputs[rname][8]["show_mpls_table.ref"] +        ) +  #  # Step 9 @@ -628,13 +703,19 @@ def test_rib_ipv4_step9():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    logger.info('Re-adding rt5\'s Prefix-SIDs') -    tgen.net['rt5'].cmd('vtysh -c "conf t" -c "router isis 1" -c "segment-routing prefix 5.5.5.5/32 index 500"') -    tgen.net['rt5'].cmd('vtysh -c "conf t" -c "router isis 1" -c "segment-routing prefix 2001:db8:1000::5/128 index 501"') +    logger.info("Re-adding rt5's Prefix-SIDs") +    tgen.net["rt5"].cmd( +        'vtysh -c "conf t" -c "router isis 1" -c "segment-routing prefix 5.5.5.5/32 index 500"' +    ) +    tgen.net["rt5"].cmd( +        'vtysh -c "conf t" -c "router isis 1" -c "segment-routing prefix 2001:db8:1000::5/128 index 501"' +    ) + +    for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: +        router_compare_json_output( +            rname, "show ip route isis json", outputs[rname][9]["show_ip_route.ref"] +        ) -    for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: -        router_compare_json_output(rname, "show ip route isis json", -                                   outputs[rname][9]["show_ip_route.ref"])  def test_rib_ipv6_step9():      logger.info("Test (step 9): verify IPv6 RIB") @@ -644,9 +725,11 @@ def test_rib_ipv6_step9():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: -        router_compare_json_output(rname, "show ipv6 route isis json", -                                   outputs[rname][9]["show_ipv6_route.ref"]) +    for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: +        router_compare_json_output( +            rname, "show ipv6 route isis json", outputs[rname][9]["show_ipv6_route.ref"] +        ) +  def test_mpls_lib_step9():      logger.info("Test (step 9): verify MPLS LIB") @@ -656,19 +739,22 @@ def test_mpls_lib_step9():      if tgen.routers_have_failure():          pytest.skip(tgen.errors) -    for rname in ['rt1', 'rt2', 'rt3', 'rt4', 'rt5', 'rt6']: -        router_compare_json_output(rname, "show mpls table json", -                                   outputs[rname][9]["show_mpls_table.ref"]) +    for rname in ["rt1", "rt2", "rt3", "rt4", "rt5", "rt6"]: +        router_compare_json_output( +            rname, "show mpls table json", outputs[rname][9]["show_mpls_table.ref"] +        ) +  # Memory leak test template  def test_memory_leak():      "Run the memory leak test and report results."      tgen = get_topogen()      if not tgen.is_memleak_enabled(): -        pytest.skip('Memory leak test/report is disabled') +        pytest.skip("Memory leak test/report is disabled")      tgen.report_memory_leaks() -if __name__ == '__main__': + +if __name__ == "__main__":      args = ["-s"] + sys.argv[1:]      sys.exit(pytest.main(args)) diff --git a/tests/topotests/ldp-topo1/test_ldp_topo1.py b/tests/topotests/ldp-topo1/test_ldp_topo1.py index 31adeafbf6..9822686dfc 100644 --- a/tests/topotests/ldp-topo1/test_ldp_topo1.py +++ b/tests/topotests/ldp-topo1/test_ldp_topo1.py @@ -647,9 +647,11 @@ def test_zebra_ipv4_routingTable():              else:                  print("r%s ok" % i) -            assert failures == 0, ( -                "IPv4 Zebra Routing Table verification failed for router r%s:\n%s" -                % (i, diff) +            assert ( +                failures == 0 +            ), "IPv4 Zebra Routing Table verification failed for router r%s:\n%s" % ( +                i, +                diff,              )      # Make sure that all daemons are running diff --git a/tests/topotests/lib/common_config.py b/tests/topotests/lib/common_config.py index d81f740548..70c4c061f8 100644 --- a/tests/topotests/lib/common_config.py +++ b/tests/topotests/lib/common_config.py @@ -1151,7 +1151,7 @@ def generate_ips(network, no_of_ips):              mask = int(start_ipaddr.split("/")[1])          else:              logger.debug("start_ipaddr {} must have a / in it".format(start_ipaddr)) -            assert(0) +            assert 0          addr_type = validate_ip_address(start_ip)          if addr_type == "ipv4": @@ -2612,7 +2612,7 @@ def verify_rib(      tag=None,      metric=None,      fib=None, -    count_only=False +    count_only=False,  ):      """      Data will be read from input_dict or input JSON file, API will generate @@ -2804,8 +2804,10 @@ def verify_rib(                                              "Nexthops are missing for "                                              "route {} in RIB of router {}: "                                              "expected {}, found {}\n".format( -                                                st_rt, dut, len(next_hop), -                                                len(found_hops) +                                                st_rt, +                                                dut, +                                                len(next_hop), +                                                len(found_hops),                                              )                                          )                                          return errormsg @@ -2852,7 +2854,11 @@ def verify_rib(                                      errormsg = (                                          "[DUT: {}]: tag value {}"                                          " is not matched for" -                                        " route {} in RIB \n".format(dut, _tag, st_rt,) +                                        " route {} in RIB \n".format( +                                            dut, +                                            _tag, +                                            st_rt, +                                        )                                      )                                      return errormsg @@ -2869,7 +2875,11 @@ def verify_rib(                                      errormsg = (                                          "[DUT: {}]: metric value "                                          "{} is not matched for " -                                        "route {} in RIB \n".format(dut, metric, st_rt,) +                                        "route {} in RIB \n".format( +                                            dut, +                                            metric, +                                            st_rt, +                                        )                                      )                                      return errormsg @@ -2918,7 +2928,9 @@ def verify_rib(                  for advertise_network_dict in advertise_network:                      if "vrf" in advertise_network_dict: -                        cmd = "{} vrf {} json".format(command, advertise_network_dict["vrf"]) +                        cmd = "{} vrf {} json".format( +                            command, advertise_network_dict["vrf"] +                        )                      else:                          cmd = "{} json".format(command) @@ -2997,6 +3009,7 @@ def verify_rib(      logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))      return True +  @retry(attempts=6, wait=2, return_is_str=True)  def verify_fib_routes(tgen, addr_type, dut, input_dict, next_hop=None):      """ diff --git a/tests/topotests/lib/ospf.py b/tests/topotests/lib/ospf.py index 9f3d4841b0..5c7514a641 100644 --- a/tests/topotests/lib/ospf.py +++ b/tests/topotests/lib/ospf.py @@ -874,7 +874,11 @@ def verify_ospf_rib(                                      errormsg = (                                          "[DUT: {}]: tag value {}"                                          " is not matched for" -                                        " route {} in RIB \n".format(dut, _tag, st_rt,) +                                        " route {} in RIB \n".format( +                                            dut, +                                            _tag, +                                            st_rt, +                                        )                                      )                                      return errormsg @@ -891,7 +895,11 @@ def verify_ospf_rib(                                      errormsg = (                                          "[DUT: {}]: metric value "                                          "{} is not matched for " -                                        "route {} in RIB \n".format(dut, metric, st_rt,) +                                        "route {} in RIB \n".format( +                                            dut, +                                            metric, +                                            st_rt, +                                        )                                      )                                      return errormsg diff --git a/tests/topotests/lib/test/test_json.py b/tests/topotests/lib/test/test_json.py index b85e193d3b..7b3c8593cc 100755 --- a/tests/topotests/lib/test/test_json.py +++ b/tests/topotests/lib/test/test_json.py @@ -107,16 +107,25 @@ def test_json_intersect_multilevel_true():      dcomplete = {          "i1": "item1",          "i2": "item2", -        "i3": {"i100": "item100",}, +        "i3": { +            "i100": "item100", +        },          "i4": { -            "i41": {"i411": "item411",}, -            "i42": {"i421": "item421", "i422": "item422",}, +            "i41": { +                "i411": "item411", +            }, +            "i42": { +                "i421": "item421", +                "i422": "item422", +            },          },      }      dsub1 = {          "i1": "item1", -        "i3": {"i100": "item100",}, +        "i3": { +            "i100": "item100", +        },          "i10": None,      }      dsub2 = { @@ -126,10 +135,36 @@ def test_json_intersect_multilevel_true():      }      dsub3 = {          "i2": "item2", -        "i4": {"i41": {"i411": "item411",}, "i42": {"i422": "item422", "i450": None,}}, +        "i4": { +            "i41": { +                "i411": "item411", +            }, +            "i42": { +                "i422": "item422", +                "i450": None, +            }, +        }, +    } +    dsub4 = { +        "i2": "item2", +        "i4": { +            "i41": {}, +            "i42": { +                "i450": None, +            }, +        }, +    } +    dsub5 = { +        "i2": "item2", +        "i3": { +            "i100": "item100", +        }, +        "i4": { +            "i42": { +                "i450": None, +            } +        },      } -    dsub4 = {"i2": "item2", "i4": {"i41": {}, "i42": {"i450": None,}}} -    dsub5 = {"i2": "item2", "i3": {"i100": "item100",}, "i4": {"i42": {"i450": None,}}}      assert json_cmp(dcomplete, dsub1) is None      assert json_cmp(dcomplete, dsub2) is None @@ -144,17 +179,26 @@ def test_json_intersect_multilevel_false():      dcomplete = {          "i1": "item1",          "i2": "item2", -        "i3": {"i100": "item100",}, +        "i3": { +            "i100": "item100", +        },          "i4": { -            "i41": {"i411": "item411",}, -            "i42": {"i421": "item421", "i422": "item422",}, +            "i41": { +                "i411": "item411", +            }, +            "i42": { +                "i421": "item421", +                "i422": "item422", +            },          },      }      # Incorrect sub-level value      dsub1 = {          "i1": "item1", -        "i3": {"i100": "item00",}, +        "i3": { +            "i100": "item00", +        },          "i10": None,      }      # Inexistent sub-level @@ -166,14 +210,41 @@ def test_json_intersect_multilevel_false():      # Inexistent sub-level value      dsub3 = {          "i1": "item1", -        "i3": {"i100": None,}, +        "i3": { +            "i100": None, +        },      }      # Inexistent sub-sub-level value -    dsub4 = {"i4": {"i41": {"i412": "item412",}, "i42": {"i421": "item421",}}} +    dsub4 = { +        "i4": { +            "i41": { +                "i412": "item412", +            }, +            "i42": { +                "i421": "item421", +            }, +        } +    }      # Invalid sub-sub-level value -    dsub5 = {"i4": {"i41": {"i411": "item411",}, "i42": {"i421": "item420000",}}} +    dsub5 = { +        "i4": { +            "i41": { +                "i411": "item411", +            }, +            "i42": { +                "i421": "item420000", +            }, +        } +    }      # sub-sub-level should be value -    dsub6 = {"i4": {"i41": {"i411": "item411",}, "i42": "foobar",}} +    dsub6 = { +        "i4": { +            "i41": { +                "i411": "item411", +            }, +            "i42": "foobar", +        } +    }      assert json_cmp(dcomplete, dsub1) is not None      assert json_cmp(dcomplete, dsub2) is not None @@ -187,7 +258,15 @@ def test_json_with_list_sucess():      "Test successful json comparisons that have lists."      dcomplete = { -        "list": [{"i1": "item 1", "i2": "item 2",}, {"i10": "item 10",},], +        "list": [ +            { +                "i1": "item 1", +                "i2": "item 2", +            }, +            { +                "i10": "item 10", +            }, +        ],          "i100": "item 100",      } @@ -197,12 +276,19 @@ def test_json_with_list_sucess():      }      # Test list correct list items      dsub2 = { -        "list": [{"i1": "item 1",},], +        "list": [ +            { +                "i1": "item 1", +            }, +        ],          "i100": "item 100",      }      # Test list correct list size      dsub3 = { -        "list": [{}, {},], +        "list": [ +            {}, +            {}, +        ],      }      assert json_cmp(dcomplete, dsub1) is None @@ -214,7 +300,15 @@ def test_json_with_list_failure():      "Test failed json comparisons that have lists."      dcomplete = { -        "list": [{"i1": "item 1", "i2": "item 2",}, {"i10": "item 10",},], +        "list": [ +            { +                "i1": "item 1", +                "i2": "item 2", +            }, +            { +                "i10": "item 10", +            }, +        ],          "i100": "item 100",      } @@ -224,12 +318,20 @@ def test_json_with_list_failure():      }      # Test list incorrect list items      dsub2 = { -        "list": [{"i1": "item 2",},], +        "list": [ +            { +                "i1": "item 2", +            }, +        ],          "i100": "item 100",      }      # Test list correct list size      dsub3 = { -        "list": [{}, {}, {},], +        "list": [ +            {}, +            {}, +            {}, +        ],      }      assert json_cmp(dcomplete, dsub1) is not None @@ -241,20 +343,52 @@ def test_json_list_start_success():      "Test JSON encoded data that starts with a list that should succeed."      dcomplete = [ -        {"id": 100, "value": "abc",}, -        {"id": 200, "value": "abcd",}, -        {"id": 300, "value": "abcde",}, +        { +            "id": 100, +            "value": "abc", +        }, +        { +            "id": 200, +            "value": "abcd", +        }, +        { +            "id": 300, +            "value": "abcde", +        },      ] -    dsub1 = [{"id": 100, "value": "abc",}] +    dsub1 = [ +        { +            "id": 100, +            "value": "abc", +        } +    ] -    dsub2 = [{"id": 100, "value": "abc",}, {"id": 200, "value": "abcd",}] +    dsub2 = [ +        { +            "id": 100, +            "value": "abc", +        }, +        { +            "id": 200, +            "value": "abcd", +        }, +    ] -    dsub3 = [{"id": 300, "value": "abcde",}] +    dsub3 = [ +        { +            "id": 300, +            "value": "abcde", +        } +    ]      dsub4 = [] -    dsub5 = [{"id": 100,}] +    dsub5 = [ +        { +            "id": 100, +        } +    ]      assert json_cmp(dcomplete, dsub1) is None      assert json_cmp(dcomplete, dsub2) is None @@ -272,13 +406,44 @@ def test_json_list_start_failure():          {"id": 300, "value": "abcde"},      ] -    dsub1 = [{"id": 100, "value": "abcd",}] +    dsub1 = [ +        { +            "id": 100, +            "value": "abcd", +        } +    ] -    dsub2 = [{"id": 100, "value": "abc",}, {"id": 200, "value": "abc",}] +    dsub2 = [ +        { +            "id": 100, +            "value": "abc", +        }, +        { +            "id": 200, +            "value": "abc", +        }, +    ] -    dsub3 = [{"id": 100, "value": "abc",}, {"id": 350, "value": "abcde",}] +    dsub3 = [ +        { +            "id": 100, +            "value": "abc", +        }, +        { +            "id": 350, +            "value": "abcde", +        }, +    ] -    dsub4 = [{"value": "abcx",}, {"id": 300, "value": "abcde",}] +    dsub4 = [ +        { +            "value": "abcx", +        }, +        { +            "id": 300, +            "value": "abcde", +        }, +    ]      assert json_cmp(dcomplete, dsub1) is not None      assert json_cmp(dcomplete, dsub2) is not None diff --git a/tests/topotests/lib/topogen.py b/tests/topotests/lib/topogen.py index 86f06b2af7..7c52e824c1 100644 --- a/tests/topotests/lib/topogen.py +++ b/tests/topotests/lib/topogen.py @@ -336,7 +336,9 @@ class Topogen(object):          for gear in self.gears.values():              errors += gear.stop()          if len(errors) > 0: -            logger.error("Errors found post shutdown - details follow: {}".format(errors)) +            logger.error( +                "Errors found post shutdown - details follow: {}".format(errors) +            )          self.net.stop() diff --git a/tests/topotests/lib/topotest.py b/tests/topotests/lib/topotest.py index d1e76866a7..ef0ac27118 100644 --- a/tests/topotests/lib/topotest.py +++ b/tests/topotests/lib/topotest.py @@ -609,8 +609,10 @@ def interface_set_status(node, ifacename, ifaceaction=False, vrf_name=None):              ifacename, str_ifaceaction          )      else: -        cmd = 'vtysh -c "configure terminal" -c "interface {0} vrf {1}" -c "{2}"'.format( -            ifacename, vrf_name, str_ifaceaction +        cmd = ( +            'vtysh -c "configure terminal" -c "interface {0} vrf {1}" -c "{2}"'.format( +                ifacename, vrf_name, str_ifaceaction +            )          )      node.run(cmd) @@ -924,40 +926,44 @@ def checkAddressSanitizerError(output, router, component, logdir=""):          )          if addressSanitizerLog:              # Find Calling Test. Could be multiple steps back -            testframe=sys._current_frames().values()[0] -            level=0 +            testframe = sys._current_frames().values()[0] +            level = 0              while level < 10: -                test=os.path.splitext(os.path.basename(testframe.f_globals["__file__"]))[0] +                test = os.path.splitext( +                    os.path.basename(testframe.f_globals["__file__"]) +                )[0]                  if (test != "topotest") and (test != "topogen"):                      # Found the calling test -                    callingTest=os.path.basename(testframe.f_globals["__file__"]) +                    callingTest = os.path.basename(testframe.f_globals["__file__"])                      break -                level=level+1 -                testframe=testframe.f_back -            if (level >= 10): +                level = level + 1 +                testframe = testframe.f_back +            if level >= 10:                  # somehow couldn't find the test script. -                callingTest="unknownTest" +                callingTest = "unknownTest"              #              # Now finding Calling Procedure -            level=0 +            level = 0              while level < 20: -                callingProc=sys._getframe(level).f_code.co_name -                if ((callingProc != "processAddressSanitizerError") and -                        (callingProc != "checkAddressSanitizerError") and -                        (callingProc != "checkRouterCores") and -                        (callingProc != "stopRouter") and -                        (callingProc != "__stop_internal") and -                        (callingProc != "stop") and -                        (callingProc != "stop_topology") and -                        (callingProc != "checkRouterRunning") and -                        (callingProc != "check_router_running") and -                        (callingProc != "routers_have_failure")): +                callingProc = sys._getframe(level).f_code.co_name +                if ( +                    (callingProc != "processAddressSanitizerError") +                    and (callingProc != "checkAddressSanitizerError") +                    and (callingProc != "checkRouterCores") +                    and (callingProc != "stopRouter") +                    and (callingProc != "__stop_internal") +                    and (callingProc != "stop") +                    and (callingProc != "stop_topology") +                    and (callingProc != "checkRouterRunning") +                    and (callingProc != "check_router_running") +                    and (callingProc != "routers_have_failure") +                ):                      # Found the calling test                      break -                level=level+1 -            if (level >= 20): +                level = level + 1 +            if level >= 20:                  # something wrong - couldn't found the calling test function -                callingProc="unknownProc" +                callingProc = "unknownProc"              with open("/tmp/AddressSanitzer.txt", "a") as addrSanFile:                  sys.stderr.write(                      "AddressSanitizer error in topotest `%s`, test `%s`, router `%s`\n\n" @@ -979,7 +985,6 @@ def checkAddressSanitizerError(output, router, component, logdir=""):                  addrSanFile.write("\n---------------\n")          return -      addressSanitizerError = re.search(          "(==[0-9]+==)ERROR: AddressSanitizer: ([^\s]*) ", output      ) @@ -989,16 +994,20 @@ def checkAddressSanitizerError(output, router, component, logdir=""):      # No Address Sanitizer Error in Output. Now check for AddressSanitizer daemon file      if logdir: -        filepattern=logdir+"/"+router+"/"+component+".asan.*" -        logger.debug("Log check for %s on %s, pattern %s\n" % (component, router, filepattern)) +        filepattern = logdir + "/" + router + "/" + component + ".asan.*" +        logger.debug( +            "Log check for %s on %s, pattern %s\n" % (component, router, filepattern) +        )          for file in glob.glob(filepattern):              with open(file, "r") as asanErrorFile: -                asanError=asanErrorFile.read() +                asanError = asanErrorFile.read()              addressSanitizerError = re.search(                  "(==[0-9]+==)ERROR: AddressSanitizer: ([^\s]*) ", asanError -                ) +            )              if addressSanitizerError: -                processAddressSanitizerError(addressSanitizerError, asanError, router, component) +                processAddressSanitizerError( +                    addressSanitizerError, asanError, router, component +                )                  return True      return False @@ -1065,7 +1074,7 @@ class Router(Node):          if self.logdir is None:              cur_test = os.environ["PYTEST_CURRENT_TEST"]              self.logdir = "/tmp/topotests/" + cur_test[ -                cur_test.find("/")+1 : cur_test.find(".py") +                cur_test.find("/") + 1 : cur_test.find(".py")              ].replace("/", ".")          # If the logdir is not created, then create it and set the @@ -1073,7 +1082,7 @@ class Router(Node):          if not os.path.isdir(self.logdir):              os.system("mkdir -p " + self.logdir + "/" + name)              os.system("chmod -R go+rw /tmp/topotests") -        # Erase logs of previous run +            # Erase logs of previous run              os.system("rm -rf " + self.logdir + "/" + name)          self.daemondir = None diff --git a/tests/topotests/ospf-topo1-vrf/test_ospf_topo1_vrf.py b/tests/topotests/ospf-topo1-vrf/test_ospf_topo1_vrf.py index 6d44d02e5e..2421b312d2 100644 --- a/tests/topotests/ospf-topo1-vrf/test_ospf_topo1_vrf.py +++ b/tests/topotests/ospf-topo1-vrf/test_ospf_topo1_vrf.py @@ -320,8 +320,10 @@ def test_ospf_link_down_kernel_route():          # Run test function until we get an result. Wait at most 60 seconds.          test_func = partial(compare_show_ip_route_vrf, router.name, expected)          result, diff = topotest.run_and_expect(test_func, "", count=140, wait=0.5) -        assertmsg = 'OSPF IPv4 route mismatch in router "{}" after link down: {}'.format( -            router.name, diff +        assertmsg = ( +            'OSPF IPv4 route mismatch in router "{}" after link down: {}'.format( +                router.name, diff +            )          )          assert result, assertmsg diff --git a/tests/topotests/ospf-topo1/test_ospf_topo1.py b/tests/topotests/ospf-topo1/test_ospf_topo1.py index 95193afb2a..24806dd8fc 100644 --- a/tests/topotests/ospf-topo1/test_ospf_topo1.py +++ b/tests/topotests/ospf-topo1/test_ospf_topo1.py @@ -310,7 +310,10 @@ def test_ospf_json():              # r4 has more interfaces for area 0.0.0.1              if router.name == "r4":                  expected["areas"]["0.0.0.1"].update( -                    {"areaIfActiveCounter": 2, "areaIfTotalCounter": 2,} +                    { +                        "areaIfActiveCounter": 2, +                        "areaIfTotalCounter": 2, +                    }                  )          # router 3 has an additional area @@ -372,16 +375,25 @@ def test_ospf_link_down_kernel_route():          }          if router.name == "r1" or router.name == "r2":              expected.update( -                {"10.0.10.0/24": None, "172.16.0.0/24": None, "172.16.1.0/24": None,} +                { +                    "10.0.10.0/24": None, +                    "172.16.0.0/24": None, +                    "172.16.1.0/24": None, +                }              )          elif router.name == "r3" or router.name == "r4":              expected.update( -                {"10.0.1.0/24": None, "10.0.2.0/24": None,} +                { +                    "10.0.1.0/24": None, +                    "10.0.2.0/24": None, +                }              )          # Route '10.0.3.0' is no longer available for r4 since it is down.          if router.name == "r4":              expected.update( -                {"10.0.3.0/24": None,} +                { +                    "10.0.3.0/24": None, +                }              )          assertmsg = 'OSPF IPv4 route mismatch in router "{}" after link down'.format(              router.name @@ -443,12 +455,17 @@ def test_ospf6_link_down_kernel_route():              )          elif router.name == "r3" or router.name == "r4":              expected.update( -                {"2001:db8:1::/64": None, "2001:db8:2::/64": None,} +                { +                    "2001:db8:1::/64": None, +                    "2001:db8:2::/64": None, +                }              )          # Route '2001:db8:3::/64' is no longer available for r4 since it is down.          if router.name == "r4":              expected.update( -                {"2001:db8:3::/64": None,} +                { +                    "2001:db8:3::/64": None, +                }              )          assertmsg = 'OSPF IPv6 route mismatch in router "{}" after link down'.format(              router.name diff --git a/tests/topotests/ospf_basic_functionality/test_ospf_ecmp.py b/tests/topotests/ospf_basic_functionality/test_ospf_ecmp.py index 3b37b8a92f..5ef6b9b0be 100644 --- a/tests/topotests/ospf_basic_functionality/test_ospf_ecmp.py +++ b/tests/topotests/ospf_basic_functionality/test_ospf_ecmp.py @@ -252,7 +252,11 @@ def test_ospf_ecmp_tc16_p0(request):      input_dict = {          "r0": {              "static_routes": [ -                {"network": NETWORK["ipv4"][0], "no_of_ip": 5, "next_hop": "Null0",} +                { +                    "network": NETWORK["ipv4"][0], +                    "no_of_ip": 5, +                    "next_hop": "Null0", +                }              ]          }      } @@ -415,7 +419,11 @@ def test_ospf_ecmp_tc17_p0(request):      input_dict = {          "r0": {              "static_routes": [ -                {"network": NETWORK["ipv4"][0], "no_of_ip": 5, "next_hop": "Null0",} +                { +                    "network": NETWORK["ipv4"][0], +                    "no_of_ip": 5, +                    "next_hop": "Null0", +                }              ]          }      } diff --git a/tests/topotests/ospf_basic_functionality/test_ospf_routemaps.py b/tests/topotests/ospf_basic_functionality/test_ospf_routemaps.py index 04b1f4b878..88667a6ac8 100644 --- a/tests/topotests/ospf_basic_functionality/test_ospf_routemaps.py +++ b/tests/topotests/ospf_basic_functionality/test_ospf_routemaps.py @@ -197,6 +197,7 @@ def teardown_module(mod):  # Test cases start here.  # ################################## +  def test_ospf_routemaps_functionality_tc19_p0(request):      """      OSPF Route map - Verify OSPF route map support functionality. @@ -215,121 +216,92 @@ def test_ospf_routemaps_functionality_tc19_p0(request):          "r0": {              "static_routes": [                  { -                    "network": NETWORK['ipv4'][0], +                    "network": NETWORK["ipv4"][0],                      "no_of_ip": 5, -                    "next_hop": 'Null0', +                    "next_hop": "Null0",                  }              ]          }      }      result = create_static_routes(tgen, input_dict) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) -    ospf_red_r1 = { -        "r0": { -            "ospf": { -                "redistribute": [{ -                    "redist_type": "static" -                }] -            } -        } -    } +    ospf_red_r1 = {"r0": {"ospf": {"redistribute": [{"redist_type": "static"}]}}}      result = create_router_ospf(tgen, topo, ospf_red_r1) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) -    dut = 'r1' -    lsid = NETWORK['ipv4'][0].split("/")[0] -    rid =  routerids[0] -    protocol = 'ospf' +    dut = "r1" +    lsid = NETWORK["ipv4"][0].split("/")[0] +    rid = routerids[0] +    protocol = "ospf"      result = verify_ospf_rib(tgen, dut, input_dict) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      ospf_red_r1 = {          "r0": { -            "ospf": { -                "redistribute": [{ -                    "redist_type": "static", -                    "del_action": True -                }] -            } +            "ospf": {"redistribute": [{"redist_type": "static", "del_action": True}]}          }      }      result = create_router_ospf(tgen, topo, ospf_red_r1) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      step( -        'Create prefix-list in R0 to permit 10.0.20.1/32 prefix &' -        ' deny 10.0.20.2/32') +        "Create prefix-list in R0 to permit 10.0.20.1/32 prefix &" " deny 10.0.20.2/32" +    )      # Create ip prefix list      pfx_list = {          "r0": {              "prefix_lists": {                  "ipv4": { -                    "pf_list_1_ipv4": [{ -                        "seqid": 10, -                        "network": NETWORK['ipv4'][0], -                        "action": "permit" -                    }, -                    { -                        "seqid": 11, -                        "network": "any", -                        "action": "deny" -                    } +                    "pf_list_1_ipv4": [ +                        { +                            "seqid": 10, +                            "network": NETWORK["ipv4"][0], +                            "action": "permit", +                        }, +                        {"seqid": 11, "network": "any", "action": "deny"},                      ]                  }              }          }      }      result = create_prefix_lists(tgen, pfx_list) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      # Create route map      routemaps = { -            "r0": { -                "route_maps": { -                    "rmap_ipv4": [{ +        "r0": { +            "route_maps": { +                "rmap_ipv4": [ +                    {                          "action": "permit", -                        "match": { -                            "ipv4": { -                                "prefix_lists": -                                    "pf_list_1_ipv4" -                            } -                        } -                    }] -                } +                        "match": {"ipv4": {"prefix_lists": "pf_list_1_ipv4"}}, +                    } +                ]              } +        }      }      result = create_route_maps(tgen, routemaps) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      step(          "Configure route map rmap1 and redistribute static routes to" -        " ospf using route map rmap1") +        " ospf using route map rmap1" +    )      ospf_red_r1 = {          "r0": {              "ospf": { -                "redistribute": [{ -                    "redist_type": "static", -                    "route_map": "rmap_ipv4" -                }] +                "redistribute": [{"redist_type": "static", "route_map": "rmap_ipv4"}]              }          }      }      result = create_router_ospf(tgen, topo, ospf_red_r1) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      step("Change prefix rules to permit 10.0.20.2 and deny 10.0.20.1")      # Create ip prefix list @@ -337,65 +309,54 @@ def test_ospf_routemaps_functionality_tc19_p0(request):          "r0": {              "prefix_lists": {                  "ipv4": { -                    "pf_list_1_ipv4": [{ -                        "seqid": 10, -                        "network": NETWORK['ipv4'][1], -                        "action": "permit" -                    }, -                    { -                        "seqid": 11, -                        "network": "any", -                        "action": "deny" -                    } +                    "pf_list_1_ipv4": [ +                        { +                            "seqid": 10, +                            "network": NETWORK["ipv4"][1], +                            "action": "permit", +                        }, +                        {"seqid": 11, "network": "any", "action": "deny"},                      ]                  }              }          }      }      result = create_prefix_lists(tgen, pfx_list) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      step("Verify that route 10.0.20.2 is allowed and 10.0.20.1 is denied.") -    dut = 'r1' +    dut = "r1"      input_dict = {          "r0": {              "static_routes": [ -                { -                    "network": NETWORK['ipv4'][1], -                    "no_of_ip": 1, -                    "next_hop": 'Null0' -                } +                {"network": NETWORK["ipv4"][1], "no_of_ip": 1, "next_hop": "Null0"}              ]          }      }      result = verify_ospf_rib(tgen, dut, input_dict) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      input_dict = {          "r0": {              "static_routes": [ -                { -                    "network": NETWORK['ipv4'][0], -                    "no_of_ip": 1, -                    "next_hop": 'Null0' -                } +                {"network": NETWORK["ipv4"][0], "no_of_ip": 1, "next_hop": "Null0"}              ]          }      }      result = verify_ospf_rib(tgen, dut, input_dict, expected=False)      assert result is not True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +        tc_name, result +    ) -    result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol, -                        expected=False) +    result = verify_rib( +        tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False +    )      assert result is not True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +        tc_name, result +    )      step("Delete and reconfigure prefix list.")      # Create ip prefix list @@ -403,114 +364,101 @@ def test_ospf_routemaps_functionality_tc19_p0(request):          "r0": {              "prefix_lists": {                  "ipv4": { -                    "pf_list_1_ipv4": [{ -                        "seqid": 10, -                        "network": NETWORK['ipv4'][1], -                        "action": "permit", -                        "delete": True -                    }, -                    { -                        "seqid": 11, -                        "network": "any", -                        "action": "deny", -                        "delete": True -                    } +                    "pf_list_1_ipv4": [ +                        { +                            "seqid": 10, +                            "network": NETWORK["ipv4"][1], +                            "action": "permit", +                            "delete": True, +                        }, +                        { +                            "seqid": 11, +                            "network": "any", +                            "action": "deny", +                            "delete": True, +                        },                      ]                  }              }          }      }      result = create_prefix_lists(tgen, pfx_list) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      result = verify_prefix_lists(tgen, pfx_list) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      input_dict = {          "r0": {              "static_routes": [ -                { -                    "network": NETWORK['ipv4'][0], -                    "no_of_ip": 5, -                    "next_hop": 'Null0' -                } +                {"network": NETWORK["ipv4"][0], "no_of_ip": 5, "next_hop": "Null0"}              ]          }      }      result = verify_ospf_rib(tgen, dut, input_dict, expected=False)      assert result is not True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +        tc_name, result +    ) -    result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol, -                            expected=False) +    result = verify_rib( +        tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False +    )      assert result is not True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +        tc_name, result +    )      pfx_list = {          "r0": {              "prefix_lists": {                  "ipv4": { -                    "pf_list_1_ipv4": [{ -                        "seqid": 10, -                        "network": NETWORK['ipv4'][1], -                        "action": "permit" -                    }, -                    { -                        "seqid": 11, -                        "network": "any", -                        "action": "deny" -                    } +                    "pf_list_1_ipv4": [ +                        { +                            "seqid": 10, +                            "network": NETWORK["ipv4"][1], +                            "action": "permit", +                        }, +                        {"seqid": 11, "network": "any", "action": "deny"},                      ]                  }              }          }      }      result = create_prefix_lists(tgen, pfx_list) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      step("Verify that route 10.0.20.2 is allowed and 10.0.20.1 is denied.") -    dut = 'r1' +    dut = "r1"      input_dict = {          "r0": {              "static_routes": [ -                { -                    "network": NETWORK['ipv4'][1], -                    "no_of_ip": 1, -                    "next_hop": 'Null0' -                } +                {"network": NETWORK["ipv4"][1], "no_of_ip": 1, "next_hop": "Null0"}              ]          }      }      result = verify_ospf_rib(tgen, dut, input_dict) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      input_dict = {          "r0": {              "static_routes": [ -                { -                    "network": NETWORK['ipv4'][0], -                    "no_of_ip": 1, -                    "next_hop": 'Null0' -                } +                {"network": NETWORK["ipv4"][0], "no_of_ip": 1, "next_hop": "Null0"}              ]          }      }      result = verify_ospf_rib(tgen, dut, input_dict, expected=False)      assert result is not True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +        tc_name, result +    ) -    result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol, -                            expected=False) +    result = verify_rib( +        tgen, "ipv4", dut, input_dict, protocol=protocol, expected=False +    )      assert result is not True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +        tc_name, result +    )      write_test_footer(tc_name) @@ -535,7 +483,11 @@ def test_ospf_routemaps_functionality_tc20_p0(request):      input_dict = {          "r0": {              "static_routes": [ -                {"network": NETWORK["ipv4"][0], "no_of_ip": 5, "next_hop": "Null0",} +                { +                    "network": NETWORK["ipv4"][0], +                    "no_of_ip": 5, +                    "next_hop": "Null0", +                }              ]          }      } @@ -663,318 +615,229 @@ def test_ospf_routemaps_functionality_tc21_p0(request):      step(          "Create static routes(10.0.20.1/32) in R1 and redistribute " -        "to OSPF using route map.") +        "to OSPF using route map." +    )      # Create Static routes      input_dict = {          "r0": {              "static_routes": [                  { -                    "network": NETWORK['ipv4'][0], +                    "network": NETWORK["ipv4"][0],                      "no_of_ip": 5, -                    "next_hop": 'Null0', +                    "next_hop": "Null0",                  }              ]          }      }      result = create_static_routes(tgen, input_dict) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      ospf_red_r0 = {          "r0": {              "ospf": { -                "redistribute": [{ -                    "redist_type": "static", -                    "route_map": "rmap_ipv4" -                }] +                "redistribute": [{"redist_type": "static", "route_map": "rmap_ipv4"}]              }          }      }      result = create_router_ospf(tgen, topo, ospf_red_r0) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      # Create route map      routemaps = { -    "r0": { -        "route_maps": { -            "rmap_ipv4": [{ -                "action": "permit", -                "seq_id": 10 -            }] -        } -    } +        "r0": {"route_maps": {"rmap_ipv4": [{"action": "permit", "seq_id": 10}]}}      }      result = create_route_maps(tgen, routemaps) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      step("Verify that route is advertised to R2.") -    dut = 'r1' -    protocol = 'ospf' +    dut = "r1" +    protocol = "ospf"      result = verify_ospf_rib(tgen, dut, input_dict) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      # Create route map      routemaps = { -    "r0": { -        "route_maps": { -            "rmap_ipv4": [{ -                "action": "permit", -                "delete": True, -                "seq_id": 10 -            }] +        "r0": { +            "route_maps": { +                "rmap_ipv4": [{"action": "permit", "delete": True, "seq_id": 10}] +            }          }      } -    }      result = create_route_maps(tgen, routemaps) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) -    step(' Configure route map with set clause (set metric)') +    step(" Configure route map with set clause (set metric)")      # Create route map      routemaps = { -    "r0": { -        "route_maps": { -            "rmap_ipv4": [{ -                "action": "permit", -                "set": { -                    "med": 123 -                }, -                "seq_id": 10 -            }] +        "r0": { +            "route_maps": { +                "rmap_ipv4": [{"action": "permit", "set": {"med": 123}, "seq_id": 10}] +            }          }      } -    }      result = create_route_maps(tgen, routemaps) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      step("Verify that configured metric is applied to ospf routes.") -    dut = 'r1' -    protocol = 'ospf' +    dut = "r1" +    protocol = "ospf"      result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      step(          "Configure route map with match clause (match metric) with " -        "some actions(change metric).") +        "some actions(change metric)." +    )      # Create route map      routemaps = { -    "r0": { -        "route_maps": { -            "rmap_ipv4": [{ -                "action": "permit", -                "match": { -                    "med": 123 -                }, -                "set": { -                    "med": 150 -                }, -                "seq_id": 10 -            }] +        "r0": { +            "route_maps": { +                "rmap_ipv4": [ +                    { +                        "action": "permit", +                        "match": {"med": 123}, +                        "set": {"med": 150}, +                        "seq_id": 10, +                    } +                ] +            }          }      } -    }      result = create_route_maps(tgen, routemaps) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      step("Configure route map with call clause")      # Create ip prefix list      input_dict_2 = { -        'r0': { -            'prefix_lists': { -                'ipv4': { -                'pf_list_1_ipv4': [{ -                    'seqid': 10, -                    'network': 'any', -                    'action': 'permit' -                }] -            } +        "r0": { +            "prefix_lists": { +                "ipv4": { +                    "pf_list_1_ipv4": [ +                        {"seqid": 10, "network": "any", "action": "permit"} +                    ] +                }              }          }      }      result = create_prefix_lists(tgen, input_dict_2) -    assert result is True, 'Testcase {} : Failed \n Error: {}'.format( -        tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      # Create route map      input_dict_3 = { -    "r0": { -        "route_maps": { -            "rmap_ipv4": [{ -                "action": "permit", -                "match": { -                    "ipv4": { -                    "prefix_lists": "pf_list_1_ipv4" -                } -                }, -                "set": { -                    "med": 150 -                }, -                "call": "rmap_match_pf_2_ipv4", -                "seq_id": 10 -            }], -            "rmap_match_pf_2_ipv4": [{ -                "action": "permit", -                "match": { -                    "ipv4": { -                    "prefix_lists": "pf_list_1_ipv4" -                } -                }, -                "set": { -                    "med": 200 -                }, -                "seq_id": 10 -            }] +        "r0": { +            "route_maps": { +                "rmap_ipv4": [ +                    { +                        "action": "permit", +                        "match": {"ipv4": {"prefix_lists": "pf_list_1_ipv4"}}, +                        "set": {"med": 150}, +                        "call": "rmap_match_pf_2_ipv4", +                        "seq_id": 10, +                    } +                ], +                "rmap_match_pf_2_ipv4": [ +                    { +                        "action": "permit", +                        "match": {"ipv4": {"prefix_lists": "pf_list_1_ipv4"}}, +                        "set": {"med": 200}, +                        "seq_id": 10, +                    } +                ], +            }          }      } -    }      result = create_route_maps(tgen, input_dict_3) -    assert result is True, 'Testcase {} : Failed \n Error: {}'.format( -        tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      result = verify_ospf_rib(tgen, dut, input_dict) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -       tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      # Create route map -    routemaps = { -    "r0": { -        "route_maps": { -            "rmap_ipv4": [{ -                "delete": True -            }] -        } -    } -    } +    routemaps = {"r0": {"route_maps": {"rmap_ipv4": [{"delete": True}]}}}      result = create_route_maps(tgen, routemaps) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      step("Configure route map with continue clause")      # Create route map      input_dict_3 = { -    "r0": { -        "route_maps": { -            "rmap_ipv4": [{ -                    "action": "permit", -                    'seq_id': '10', -                    "match": { -                        "ipv4": { -                        "prefix_lists": "pf_list_1_ipv4" -                    } -                    }, -                    "set": { -                        "med": 150 -                    }, -                    "continue": "30", -                    "seq_id": 10 -                }, -                { -                    "action": "permit", -                    "match": { -                        "ipv4": { -                        "prefix_lists": "pf_list_1_ipv4" -                    } -                    }, -                    "set": { -                        "med": 100 +        "r0": { +            "route_maps": { +                "rmap_ipv4": [ +                    { +                        "action": "permit", +                        "seq_id": "10", +                        "match": {"ipv4": {"prefix_lists": "pf_list_1_ipv4"}}, +                        "set": {"med": 150}, +                        "continue": "30", +                        "seq_id": 10,                      }, -                    "seq_id": 20 -                }, -                { -                    "action": "permit", -                    "match": { -                        "ipv4": { -                        "prefix_lists": "pf_list_1_ipv4" -                    } +                    { +                        "action": "permit", +                        "match": {"ipv4": {"prefix_lists": "pf_list_1_ipv4"}}, +                        "set": {"med": 100}, +                        "seq_id": 20,                      }, -                    "set": { -                        "med": 50 +                    { +                        "action": "permit", +                        "match": {"ipv4": {"prefix_lists": "pf_list_1_ipv4"}}, +                        "set": {"med": 50}, +                        "seq_id": 30,                      }, -                    "seq_id": 30 -                } -            ] +                ] +            }          }      } -    }      result = create_route_maps(tgen, input_dict_3) -    assert result is True, 'Testcase {} : Failed \n Error: {}'.format( -        tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      result = verify_ospf_rib(tgen, dut, input_dict) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -       tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      step("Configure route map with goto clause")      # Create route map      input_dict_3 = { -    "r0": { -        "route_maps": { -            "rmap_ipv4": [{ -                    "action": "permit", -                    'seq_id': '10', -                    "match": { -                    "ipv4": { -                        "prefix_lists": "pf_list_1_ipv4" -                    } +        "r0": { +            "route_maps": { +                "rmap_ipv4": [ +                    { +                        "action": "permit", +                        "seq_id": "10", +                        "match": {"ipv4": {"prefix_lists": "pf_list_1_ipv4"}}, +                        "goto": "30",                      }, -                    "goto": "30", -                }, -                { -                    "action": "permit", -                    'seq_id': '20', -                    "match": { -                    "ipv4": { -                        "prefix_lists": "pf_list_1_ipv4" -                    } +                    { +                        "action": "permit", +                        "seq_id": "20", +                        "match": {"ipv4": {"prefix_lists": "pf_list_1_ipv4"}}, +                        "set": {"med": 100},                      }, -                    "set": { -                        "med": 100 -                    } -                }, -                { -                    "action": "permit", -                    'seq_id': '30', -                    "match": { -                    "ipv4": { -                        "prefix_lists": "pf_list_1_ipv4" -                    } +                    { +                        "action": "permit", +                        "seq_id": "30", +                        "match": {"ipv4": {"prefix_lists": "pf_list_1_ipv4"}}, +                        "set": {"med": 200},                      }, -                    "set": { -                        "med": 200 -                    } -                } -            ] +                ] +            }          }      } -    }      result = create_route_maps(tgen, input_dict_3) -    assert result is True, 'Testcase {} : Failed \n Error: {}'.format( -        tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      result = verify_rib(tgen, "ipv4", dut, input_dict, protocol=protocol) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      write_test_footer(tc_name) @@ -1003,7 +866,11 @@ def test_ospf_routemaps_functionality_tc24_p0(request):      input_dict = {          "r0": {              "static_routes": [ -                {"network": NETWORK["ipv4"][0], "no_of_ip": 1, "next_hop": "Null0",} +                { +                    "network": NETWORK["ipv4"][0], +                    "no_of_ip": 1, +                    "next_hop": "Null0", +                }              ]          }      } @@ -1037,9 +904,10 @@ def test_ospf_routemaps_functionality_tc24_p0(request):      step("verify that prefix-list is created in R0.")      result = verify_prefix_lists(tgen, pfx_list) -    assert result is not True, ( -        "Testcase {} : Failed \n Prefix list not " -        "present. Error: {}".format(tc_name, result) +    assert ( +        result is not True +    ), "Testcase {} : Failed \n Prefix list not " "present. Error: {}".format( +        tc_name, result      )      # Create route map @@ -1105,9 +973,10 @@ def test_ospf_routemaps_functionality_tc24_p0(request):      step("verify that prefix-list is created in R0.")      result = verify_prefix_lists(tgen, pfx_list) -    assert result is not True, ( -        "Testcase {} : Failed \n Prefix list not " -        "present. Error: {}".format(tc_name, result) +    assert ( +        result is not True +    ), "Testcase {} : Failed \n Prefix list not " "present. Error: {}".format( +        tc_name, result      )      # Create route map diff --git a/tests/topotests/ospf_basic_functionality/test_ospf_rte_calc.py b/tests/topotests/ospf_basic_functionality/test_ospf_rte_calc.py index 6ac0b515df..434d7f8ef5 100644 --- a/tests/topotests/ospf_basic_functionality/test_ospf_rte_calc.py +++ b/tests/topotests/ospf_basic_functionality/test_ospf_rte_calc.py @@ -407,7 +407,13 @@ def test_ospf_redistribution_tc6_p0(request):      protocol = "ospf"      result = verify_rib( -        tgen, "ipv4", dut, input_dict, protocol=protocol, next_hop=nh, expected=False, +        tgen, +        "ipv4", +        dut, +        input_dict, +        protocol=protocol, +        next_hop=nh, +        expected=False,      )      assert result is not True, "Testcase {} : Failed \n Error: {}".format(          tc_name, result @@ -549,7 +555,11 @@ def test_ospf_redistribution_tc8_p1(request):      input_dict = {          "r0": {              "static_routes": [ -                {"network": NETWORK["ipv4"][0], "no_of_ip": 5, "next_hop": "Null0",} +                { +                    "network": NETWORK["ipv4"][0], +                    "no_of_ip": 5, +                    "next_hop": "Null0", +                }              ]          }      } diff --git a/tests/topotests/ospf_basic_functionality/test_ospf_single_area.py b/tests/topotests/ospf_basic_functionality/test_ospf_single_area.py index b2b0fb8d44..6f6b119abc 100644 --- a/tests/topotests/ospf_basic_functionality/test_ospf_single_area.py +++ b/tests/topotests/ospf_basic_functionality/test_ospf_single_area.py @@ -777,7 +777,6 @@ def test_ospf_show_p1(request):      write_test_footer(tc_name) -  def test_ospf_dead_tc11_p0(request):      """      OSPF timers. @@ -799,224 +798,146 @@ def test_ospf_dead_tc11_p0(request):      step("modify dead interval from default value to some other value on r1")      topo1 = { -        'r1': { -            'links': { -                'r0': { -                    'interface': topo['routers']['r1']['links']['r0'][ -                        'interface'], -                    'ospf': { -                        'hello_interval': 12, -                        'dead_interval': 48 -                    } +        "r1": { +            "links": { +                "r0": { +                    "interface": topo["routers"]["r1"]["links"]["r0"]["interface"], +                    "ospf": {"hello_interval": 12, "dead_interval": 48},                  }              }          }      } -      result = create_interfaces_cfg(tgen, topo1) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -    tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      step(          "verify that new timer value is configured and applied using " -        "the show ip ospf interface command.") -    dut = 'r1' -    input_dict= { -        'r1': { -            'links':{ -                'r0': { -                    'ospf':{ -                        'timerDeadSecs': 48 -                    } -                } -            } -        } -    } +        "the show ip ospf interface command." +    ) +    dut = "r1" +    input_dict = {"r1": {"links": {"r0": {"ospf": {"timerDeadSecs": 48}}}}}      result = verify_ospf_interface(tgen, topo, dut=dut, input_dict=input_dict) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result) -    step( -        "modify dead interval from default value to r1" -        "dead interval timer on r2") +    step("modify dead interval from default value to r1" "dead interval timer on r2")      topo1 = { -        'r0': { -            'links': { -                'r1': { -                    'interface': topo['routers']['r0']['links']['r1'][ -                        'interface'], -                    'ospf': { -                        'dead_interval': 48, -                        'hello_interval': 12 -                    } +        "r0": { +            "links": { +                "r1": { +                    "interface": topo["routers"]["r0"]["links"]["r1"]["interface"], +                    "ospf": {"dead_interval": 48, "hello_interval": 12},                  }              }          }      }      result = create_interfaces_cfg(tgen, topo1) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -    tc_name, result) - +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      step("verify that new timer value is configured.") -    input_dict= { -        'r0': { -            'links':{ -                'r1': { -                    'ospf':{ -                        'timerDeadSecs': 48 -                    } -                } -            } -        } -    } -    dut = 'r0' +    input_dict = {"r0": {"links": {"r1": {"ospf": {"timerDeadSecs": 48}}}}} +    dut = "r0"      result = verify_ospf_interface(tgen, topo, dut=dut, input_dict=input_dict) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      step("verify that ospf neighbours are  full")      ospf_covergence = verify_ospf_neighbor(tgen, topo, dut=dut) -    assert ospf_covergence is True, ("setup_module :Failed \n Error:" -                                      " {}".format(ospf_covergence)) +    assert ospf_covergence is True, "setup_module :Failed \n Error:" " {}".format( +        ospf_covergence +    ) -    step( -        "reconfigure the default dead interval timer value to " -        "default on r1 and r2") +    step("reconfigure the default dead interval timer value to " "default on r1 and r2")      topo1 = { -        'r0': { -            'links': { -                'r1': { -                    'interface': topo['routers']['r0']['links']['r1'][ -                        'interface'], -                    'ospf': { -                    'dead_interval': 40 -                    } +        "r0": { +            "links": { +                "r1": { +                    "interface": topo["routers"]["r0"]["links"]["r1"]["interface"], +                    "ospf": {"dead_interval": 40},                  }              }          }      }      result = create_interfaces_cfg(tgen, topo1) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -    tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      topo1 = { -        'r1': { -            'links': { -                'r0': { -                    'interface': topo['routers']['r1']['links']['r0'][ -                        'interface'], -                    'ospf': { -                    'dead_interval': 40 -                    } +        "r1": { +            "links": { +                "r0": { +                    "interface": topo["routers"]["r1"]["links"]["r0"]["interface"], +                    "ospf": {"dead_interval": 40},                  }              }          }      }      result = create_interfaces_cfg(tgen, topo1) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -    tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      step("verify that new timer value is configured.") -    input_dict= { -        'r0': { -            'links':{ -                'r1': { -                    'ospf':{ -                        'timerDeadSecs': 40 -                    } -                } -            } -        } -    } -    dut = 'r0' +    input_dict = {"r0": {"links": {"r1": {"ospf": {"timerDeadSecs": 40}}}}} +    dut = "r0"      result = verify_ospf_interface(tgen, topo, dut=dut, input_dict=input_dict) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) - +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      step("verify that ospf neighbours are  full")      ospf_covergence = verify_ospf_neighbor(tgen, topo, dut=dut) -    assert ospf_covergence is True, ("setup_module :Failed \n Error:" -                                      " {}".format(ospf_covergence)) - +    assert ospf_covergence is True, "setup_module :Failed \n Error:" " {}".format( +        ospf_covergence +    )      step(" Configure dead timer = 65535 on r1 and r2")      topo1 = { -        'r0': { -            'links': { -                'r1': { -                    'interface': topo['routers']['r0']['links']['r1'][ -                        'interface'], -                    'ospf': { -                    'dead_interval': 65535 -                    } +        "r0": { +            "links": { +                "r1": { +                    "interface": topo["routers"]["r0"]["links"]["r1"]["interface"], +                    "ospf": {"dead_interval": 65535},                  }              }          }      }      result = create_interfaces_cfg(tgen, topo1) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -    tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      topo1 = { -        'r1': { -            'links': { -                'r0': { -                    'interface': topo['routers']['r1']['links']['r0'][ -                        'interface'], -                    'ospf': { -                    'dead_interval': 65535 -                    } +        "r1": { +            "links": { +                "r0": { +                    "interface": topo["routers"]["r1"]["links"]["r0"]["interface"], +                    "ospf": {"dead_interval": 65535},                  }              }          }      }      result = create_interfaces_cfg(tgen, topo1) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -    tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      step("verify that new timer value is configured.") -    input_dict= { -        'r0': { -            'links':{ -                'r1': { -                    'ospf':{ -                        'timerDeadSecs': 65535 -                    } -                } -            } -        } -    } -    dut = 'r0' +    input_dict = {"r0": {"links": {"r1": {"ospf": {"timerDeadSecs": 65535}}}}} +    dut = "r0"      result = verify_ospf_interface(tgen, topo, dut=dut, input_dict=input_dict) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      step("verify that ospf neighbours are  full")      ospf_covergence = verify_ospf_neighbor(tgen, topo, dut=dut) -    assert ospf_covergence is True, ("setup_module :Failed \n Error:" -                                      " {}".format(ospf_covergence)) - +    assert ospf_covergence is True, "setup_module :Failed \n Error:" " {}".format( +        ospf_covergence +    )      step(" Try configuring timer values outside range for example 65536")      topo1 = { -        'r0': { -            'links': { -                'r1': { -                    'interface': topo['routers']['r0']['links']['r1'][ -                        'interface'], -                    'ospf': { -                    'dead_interval': 65536 -                    } +        "r0": { +            "links": { +                "r1": { +                    "interface": topo["routers"]["r0"]["links"]["r1"]["interface"], +                    "ospf": {"dead_interval": 65536},                  }              }          } @@ -1024,47 +945,33 @@ def test_ospf_dead_tc11_p0(request):      result = create_interfaces_cfg(tgen, topo1)      assert result is not True, "Testcase {} : Failed \n Error: {}".format( -    tc_name, result) +        tc_name, result +    )      step("Unconfigure the dead timer from the interface from r1 and r2.")      topo1 = { -        'r1': { -            'links': { -                'r0': { -                    'interface': topo['routers']['r1']['links']['r0'][ -                        'interface'], -                    'ospf': { -                    'dead_interval': 65535 -                    }, -                    'delete': True +        "r1": { +            "links": { +                "r0": { +                    "interface": topo["routers"]["r1"]["links"]["r0"]["interface"], +                    "ospf": {"dead_interval": 65535}, +                    "delete": True,                  }              }          }      }      result = create_interfaces_cfg(tgen, topo1) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -    tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      step( -        "Verify that timer value is deleted from intf & " -        "set to default value 40 sec.") -    input_dict= { -        'r1': { -            'links':{ -                'r0': { -                    'ospf':{ -                        'timerDeadSecs': 40 -                    } -                } -            } -        } -    } -    dut = 'r1' +        "Verify that timer value is deleted from intf & " "set to default value 40 sec." +    ) +    input_dict = {"r1": {"links": {"r0": {"ospf": {"timerDeadSecs": 40}}}}} +    dut = "r1"      result = verify_ospf_interface(tgen, topo, dut=dut, input_dict=input_dict) -    assert result is True, "Testcase {} : Failed \n Error: {}".format( -        tc_name, result) +    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)      write_test_footer(tc_name) diff --git a/tests/topotests/rip-topo1/test_rip_topo1.py b/tests/topotests/rip-topo1/test_rip_topo1.py index de11b78824..fdafa50aba 100644 --- a/tests/topotests/rip-topo1/test_rip_topo1.py +++ b/tests/topotests/rip-topo1/test_rip_topo1.py @@ -352,9 +352,11 @@ def test_zebra_ipv4_routingTable():              else:                  print("r%s ok" % i) -            assert failures == 0, ( -                "Zebra IPv4 Routing Table verification failed for router r%s:\n%s" -                % (i, diff) +            assert ( +                failures == 0 +            ), "Zebra IPv4 Routing Table verification failed for router r%s:\n%s" % ( +                i, +                diff,              )      # Make sure that all daemons are still running diff --git a/tests/topotests/ripng-topo1/test_ripng_topo1.py b/tests/topotests/ripng-topo1/test_ripng_topo1.py index 2976cdefe4..4702d33dae 100644 --- a/tests/topotests/ripng-topo1/test_ripng_topo1.py +++ b/tests/topotests/ripng-topo1/test_ripng_topo1.py @@ -373,9 +373,11 @@ def test_zebra_ipv6_routingTable():              else:                  print("r%s ok" % i) -            assert failures == 0, ( -                "Zebra IPv6 Routing Table verification failed for router r%s:\n%s" -                % (i, diff) +            assert ( +                failures == 0 +            ), "Zebra IPv6 Routing Table verification failed for router r%s:\n%s" % ( +                i, +                diff,              )      # Make sure that all daemons are running diff --git a/tests/topotests/zebra_netlink/test_zebra_netlink.py b/tests/topotests/zebra_netlink/test_zebra_netlink.py index 4da5303641..94baf8438f 100644 --- a/tests/topotests/zebra_netlink/test_zebra_netlink.py +++ b/tests/topotests/zebra_netlink/test_zebra_netlink.py @@ -118,7 +118,12 @@ def test_zebra_netlink_batching():      r1.vtysh_cmd("sharp install routes 2.1.3.7 nexthop 192.168.1.1 100")      json_file = "{}/r1/v4_route.json".format(CWD)      expected = json.loads(open(json_file).read()) -    test_func = partial(topotest.router_json_cmp, r1, "show ip route json", expected,) +    test_func = partial( +        topotest.router_json_cmp, +        r1, +        "show ip route json", +        expected, +    )      _, result = topotest.run_and_expect(test_func, None, count=2, wait=0.5)      assertmsg = '"r1" JSON output mismatches'      assert result is None, assertmsg  | 
