return True
+@retry(attempts=3, wait=4, return_is_str=True)
+def verify_bgp_attributes(tgen, addr_type, dut, static_routes, rmap_name,
+ input_dict, seq_id=None):
+ """
+ API will verify BGP attributes set by Route-map for given prefix and
+ DUT. it will run "show bgp ipv4/ipv6 {prefix_address} json" command
+ in DUT to verify BGP attributes set by route-map, Set attributes
+ values will be read from input_dict and verified with command output.
+
+ * `tgen`: topogen object
+ * `addr_type` : ip type, ipv4/ipv6
+ * `dut`: Device Under Test
+ * `static_routes`: Static Routes for which BGP set attributes needs to be
+ verified
+ * `rmap_name`: route map name for which set criteria needs to be verified
+ * `input_dict`: defines for which router, AS numbers needs
+ * `seq_id`: sequence number of rmap, default is None
+
+ Usage
+ -----
+ input_dict = {
+ "r3": {
+ "route_maps": {
+ "rmap_match_pf_1_ipv4": [{
+ "action": "permit",
+ 'seq_id': '5',
+ "match": {
+ addr_type: {
+ "prefix_lists": "pf_list_1_" + addr_type
+ }
+ },
+ "set": {
+ "localpref": 150,
+ "weight": 100
+ }
+ }],
+ "rmap_match_pf_2_ipv6": [{
+ "action": "permit",
+ 'seq_id': '5',
+ "match": {
+ addr_type: {
+ "prefix_lists": "pf_list_1_" + addr_type
+ }
+ },
+ "set": {
+ "med": 50
+ }
+ }]
+ }
+ }
+ }
+ result = verify_bgp_attributes(tgen, 'ipv4', "r1", "10.0.20.1/32",
+ rmap_match_pf_1_ipv4, input_dict)
+
+ Returns
+ -------
+ errormsg(str) or True
+ """
+
+ logger.debug("Entering lib API: verify_bgp_attributes()")
+ for router, rnode in tgen.routers().iteritems():
+ if router != dut:
+ continue
+
+ logger.info('Verifying BGP set attributes for dut {}:'.format(router))
+
+ for static_route in static_routes:
+ cmd = "show bgp {} {} json".format(addr_type, static_route)
+ show_bgp_json = run_frr_cmd(rnode, cmd, isjson=True)
+ print("show_bgp_json $$$$$", show_bgp_json)
+
+ dict_to_test = []
+ tmp_list = []
+ for rmap_router in input_dict.keys():
+ for rmap, values in input_dict[rmap_router][
+ "route_maps"].items():
+ print("rmap == rmap_name $$$$1", rmap, rmap_name)
+ if rmap == rmap_name:
+ print("rmap == rmap_name $$$$", rmap, rmap_name)
+ dict_to_test = values
+ for rmap_dict in values:
+ if seq_id is not None:
+ if type(seq_id) is not list:
+ seq_id = [seq_id]
+
+ if "seq_id" in rmap_dict:
+ rmap_seq_id = \
+ rmap_dict["seq_id"]
+ for _seq_id in seq_id:
+ if _seq_id == rmap_seq_id:
+ tmp_list.append(rmap_dict)
+ if tmp_list:
+ dict_to_test = tmp_list
+
+ print("dict_to_test $$$$", dict_to_test)
+ for rmap_dict in dict_to_test:
+ if "set" in rmap_dict:
+ for criteria in rmap_dict["set"].keys():
+ if criteria not in show_bgp_json[
+ "paths"][0]:
+ errormsg = ("BGP attribute: {}"
+ " is not found in"
+ " cli: {} output "
+ "in router {}".
+ format(criteria,
+ cmd,
+ router))
+ return errormsg
+
+ if rmap_dict["set"][criteria] == \
+ show_bgp_json["paths"][0][
+ criteria]:
+ logger.info("Verifying BGP "
+ "attribute {} for"
+ " route: {} in "
+ "router: {}, found"
+ " expected value:"
+ " {}".
+ format(criteria,
+ static_route,
+ dut,
+ rmap_dict[
+ "set"][
+ criteria]))
+ else:
+ errormsg = \
+ ("Failed: Verifying BGP "
+ "attribute {} for route:"
+ " {} in router: {}, "
+ " expected value: {} but"
+ " found: {}".
+ format(criteria,
+ static_route,
+ dut,
+ rmap_dict["set"]
+ [criteria],
+ show_bgp_json[
+ 'paths'][
+ 0][criteria]))
+ return errormsg
+
+ logger.debug("Exiting lib API: verify_bgp_attributes()")
+ return True
+
@retry(attempts=3, wait=2, return_is_str=True)
def verify_best_path_as_per_bgp_attribute(tgen, addr_type, router, input_dict,
attribute):