]> git.puffer.fish Git - matthieu/frr.git/commitdiff
tests : bgp default-originate api are added
authorARShreenidhi <rshreenidhi@vmware.com>
Tue, 7 Jun 2022 06:34:20 +0000 (06:34 +0000)
committerARShreenidhi <rshreenidhi@vmware.com>
Tue, 14 Jun 2022 15:40:19 +0000 (15:40 +0000)
Signed-off-by: ARShreenidhi <rshreenidhi@vmware.com>
tests/topotests/lib/bgp.py

index 4dd44e3e9e7003f77c3728b978b049a46fe6e7cc..216756f5121e8ff2974f2c8432cc47924c1999e9 100644 (file)
@@ -29,6 +29,7 @@ from lib.common_config import (
     create_common_configurations,
     FRRCFG_FILE,
     InvalidCLIError,
+    apply_raw_config,
     check_address_types,
     find_interface_with_greater_ip,
     generate_ips,
@@ -74,6 +75,12 @@ def create_router_bgp(tgen, topo=None, input_dict=None, build=False, load_config
                 "address_family": {
                     "ipv4": {
                         "unicast": {
+                            "default_originate":{
+                                "neighbor":"R2",
+                                "add_type":"lo"
+                                "route_map":"rm"
+
+                            },
                             "redistribute": [{
                                 "redist_type": "static",
                                     "attribute": {
@@ -498,6 +505,12 @@ def __create_bgp_unicast_neighbor(
                 topo, input_dict, router, addr_type, add_neigh
             )
             config_data.extend(neigh_data)
+        # configure default originate
+        if "default_originate" in addr_data:
+            default_originate_config = __create_bgp_default_originate_neighbor(
+                topo, input_dict, router, addr_type, add_neigh
+            )
+            config_data.extend(default_originate_config)
 
     for addr_type, addr_dict in bgp_data.items():
         if not addr_dict or not check_address_types(addr_type):
@@ -515,6 +528,78 @@ def __create_bgp_unicast_neighbor(
     return config_data
 
 
+def __create_bgp_default_originate_neighbor(
+    topo, input_dict, router, addr_type, add_neigh=True
+):
+    """
+    Helper API to create neighbor default - originate  configuration
+
+    Parameters
+    ----------
+    * `tgen` : Topogen object
+    * `topo` : json file data
+    * `input_dict` : Input dict data, required when configuring from testcase
+    * `router` : router id to be configured
+    """
+    tgen = get_topogen()
+    config_data = []
+    logger.debug("Entering lib API: __create_bgp_default_originate_neighbor()")
+
+    bgp_data = input_dict["address_family"]
+    neigh_data = bgp_data[addr_type]["unicast"]["default_originate"]
+    for name, peer_dict in neigh_data.items():
+        nh_details = topo[name]
+
+        neighbor_ip = None
+        if "dest-link" in neigh_data[name]:
+            dest_link = neigh_data[name]["dest-link"]
+            neighbor_ip = nh_details["links"][dest_link][addr_type].split("/")[0]
+        elif "add_type" in neigh_data[name]:
+            add_type = neigh_data[name]["add_type"]
+            neighbor_ip = nh_details["links"][add_type][addr_type].split("/")[0]
+        else:
+            neighbor_ip = nh_details["links"][router][addr_type].split("/")[0]
+
+        config_data.append("address-family {} unicast".format(addr_type))
+        if "route_map" in peer_dict:
+            route_map = peer_dict["route_map"]
+            if "delete" in peer_dict:
+                if peer_dict["delete"]:
+                    config_data.append(
+                        "no neighbor {} default-originate  route-map {}".format(
+                            neighbor_ip, route_map
+                        )
+                    )
+                else:
+                    config_data.append(
+                        " neighbor {} default-originate  route-map {}".format(
+                            neighbor_ip, route_map
+                        )
+                    )
+            else:
+                config_data.append(
+                    " neighbor {} default-originate  route-map {}".format(
+                        neighbor_ip, route_map
+                    )
+                )
+
+        else:
+            if "delete" in peer_dict:
+                if peer_dict["delete"]:
+                    config_data.append(
+                        "no neighbor {} default-originate".format(neighbor_ip)
+                    )
+                else:
+                    config_data.append(
+                        "neighbor {} default-originate".format(neighbor_ip)
+                    )
+            else:
+                config_data.append("neighbor {} default-originate".format(neighbor_ip))
+
+    logger.debug("Exiting lib API: __create_bgp_default_originate_neighbor()")
+    return config_data
+
+
 def __create_l2vpn_evpn_address_family(
     tgen, topo, input_dict, router, config_data=None
 ):
@@ -4574,3 +4659,876 @@ def verify_tcp_mss(tgen, dut, neighbour, configured_tcp_mss, vrf=None):
             return "TCP-MSS Mismatch"
     logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
     return False
+
+
+def get_dut_as_number(tgen, dut):
+    """
+    API to get the Autonomous Number of the given DUT
+
+    params:
+    =======
+    dut  : Device Under test
+
+    returns :
+    =======
+    Success : DUT Autonomous number
+    Fail : Error message with Boolean False
+    """
+    tgen = get_topogen()
+    for router, rnode in tgen.routers().items():
+        if router == dut:
+            show_bgp_json = run_frr_cmd(rnode, "sh ip bgp summary json ", isjson=True)
+            as_number = show_bgp_json["ipv4Unicast"]["as"]
+            if as_number:
+                logger.info(
+                    "[dut {}] DUT contains Automnomous number :: {} ".format(
+                        dut, as_number
+                    )
+                )
+                return as_number
+            else:
+                logger.error(
+                    "[dut {}] ERROR....!  DUT doesnot contain any  Automnomous number  ".format(
+                        dut
+                    )
+                )
+                return False
+
+
+def get_prefix_count_route(
+    tgen, topo, dut, peer, vrf=None, link=None, sent=None, received=None
+):
+    """
+    API to return  the prefix count  of default originate the given DUT
+    dut  : Device under test
+    peer : neigbor on which you are expecting the route to be received
+
+    returns :
+        prefix_count as dict with ipv4 and ipv6 value
+    """
+    # the neighbor IP address can be accessable by finding the neigborship (vice-versa)
+
+    if link:
+        neighbor_ipv4_address = topo["routers"][peer]["links"][link]["ipv4"]
+        neighbor_ipv6_address = topo["routers"][peer]["links"][link]["ipv6"]
+    else:
+        neighbor_ipv4_address = topo["routers"][peer]["links"][dut]["ipv4"]
+        neighbor_ipv6_address = topo["routers"][peer]["links"][dut]["ipv6"]
+
+    neighbor_ipv4_address = neighbor_ipv4_address.split("/")[0]
+    neighbor_ipv6_address = neighbor_ipv6_address.split("/")[0]
+    prefix_count = {}
+    tgen = get_topogen()
+    for router, rnode in tgen.routers().items():
+        if router == dut:
+
+            if vrf:
+                ipv4_cmd = "sh ip bgp vrf {} summary json".format(vrf)
+                show_bgp_json_ipv4 = run_frr_cmd(rnode, ipv4_cmd, isjson=True)
+                ipv6_cmd = "sh ip bgp vrf {} ipv6 unicast summary json".format(vrf)
+                show_bgp_json_ipv6 = run_frr_cmd(rnode, ipv6_cmd, isjson=True)
+
+                prefix_count["ipv4_count"] = show_bgp_json_ipv4["ipv4Unicast"]["peers"][
+                    neighbor_ipv4_address
+                ]["pfxRcd"]
+                prefix_count["ipv6_count"] = show_bgp_json_ipv6["peers"][
+                    neighbor_ipv6_address
+                ]["pfxRcd"]
+
+                logger.info(
+                    "The Prefix Count of the [DUT:{} : vrf [{}] ] towards neighbor ipv4 : {} and ipv6 : {}  is : {}".format(
+                        dut,
+                        vrf,
+                        neighbor_ipv4_address,
+                        neighbor_ipv6_address,
+                        prefix_count,
+                    )
+                )
+                return prefix_count
+
+            else:
+                show_bgp_json_ipv4 = run_frr_cmd(
+                    rnode, "sh ip bgp summary json ", isjson=True
+                )
+                show_bgp_json_ipv6 = run_frr_cmd(
+                    rnode, "sh ip bgp ipv6 unicast summary json ", isjson=True
+                )
+                if received:
+                    prefix_count["ipv4_count"] = show_bgp_json_ipv4["ipv4Unicast"][
+                        "peers"
+                    ][neighbor_ipv4_address]["pfxRcd"]
+                    prefix_count["ipv6_count"] = show_bgp_json_ipv6["peers"][
+                        neighbor_ipv6_address
+                    ]["pfxRcd"]
+
+                elif sent:
+                    prefix_count["ipv4_count"] = show_bgp_json_ipv4["ipv4Unicast"][
+                        "peers"
+                    ][neighbor_ipv4_address]["pfxSnt"]
+                    prefix_count["ipv6_count"] = show_bgp_json_ipv6["peers"][
+                        neighbor_ipv6_address
+                    ]["pfxSnt"]
+
+                else:
+                    prefix_count["ipv4_count"] = show_bgp_json_ipv4["ipv4Unicast"][
+                        "peers"
+                    ][neighbor_ipv4_address]["pfxRcd"]
+                    prefix_count["ipv6_count"] = show_bgp_json_ipv6["peers"][
+                        neighbor_ipv6_address
+                    ]["pfxRcd"]
+
+                logger.info(
+                    "The Prefix Count of the DUT:{} towards neighbor ipv4 : {} and ipv6 : {}  is : {}".format(
+                        dut, neighbor_ipv4_address, neighbor_ipv6_address, prefix_count
+                    )
+                )
+                return prefix_count
+        else:
+            logger.error("ERROR...! Unknown dut {} in topolgy".format(dut))
+
+
+@retry(retry_timeout=5)
+def verify_rib_default_route(
+    tgen,
+    topo,
+    dut,
+    routes,
+    expected_nexthop,
+    metric=None,
+    origin=None,
+    locPrf=None,
+    expected_aspath=None,
+):
+    """
+    API to verify the the 'Default route" in BGP RIB with the attributes the rout carries (metric , local preference, )
+
+    param
+    =====
+    dut : device under test
+    routes : default route with expected nexthop
+    expected_nexthop : the nexthop that is expected the deafult route
+
+    """
+    result = False
+    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+    tgen = get_topogen()
+    connected_routes = {}
+    for router, rnode in tgen.routers().items():
+        if router == dut:
+
+            ipv4_routes = run_frr_cmd(rnode, "sh ip bgp json", isjson=True)
+            ipv6_routes = run_frr_cmd(rnode, "sh ip bgp ipv6 unicast json", isjson=True)
+    is_ipv4_default_attrib_found = False
+    is_ipv6_default_attrib_found = False
+
+    default_ipv4_route = routes["ipv4"]
+    default_ipv6_route = "::/0"
+    ipv4_route_Origin = False
+    ipv4_route_local_pref = False
+    ipv4_route_metric = False
+
+    if default_ipv4_route in ipv4_routes["routes"].keys():
+        nxt_hop_count = len(ipv4_routes["routes"][default_ipv4_route])
+        rib_next_hops = []
+        for index in range(nxt_hop_count):
+            rib_next_hops.append(
+                ipv4_routes["routes"][default_ipv4_route][index]["nexthops"][0]["ip"]
+            )
+
+        for nxt_hop in expected_nexthop.items():
+            if nxt_hop[0] == "ipv4":
+                if nxt_hop[1] in rib_next_hops:
+                    logger.info(
+                        "Default routes [{}] obtained from {} .....PASSED".format(
+                            default_ipv4_route, nxt_hop[1]
+                        )
+                    )
+                else:
+                    logger.error(
+                        "ERROR ...! Default routes [{}] expected  is missing  {}".format(
+                            default_ipv4_route, nxt_hop[1]
+                        )
+                    )
+                    return False
+
+            else:
+                pass
+
+        if "origin" in ipv4_routes["routes"][default_ipv4_route][0].keys():
+            ipv4_route_Origin = ipv4_routes["routes"][default_ipv4_route][0]["origin"]
+        if "locPrf" in ipv4_routes["routes"][default_ipv4_route][0].keys():
+            ipv4_route_local_pref = ipv4_routes["routes"][default_ipv4_route][0][
+                "locPrf"
+            ]
+        if "metric" in ipv4_routes["routes"][default_ipv4_route][0].keys():
+            ipv4_route_metric = ipv4_routes["routes"][default_ipv4_route][0]["metric"]
+    else:
+        logger.error("ERROR [ DUT {}] : The Default Route Not found in RIB".format(dut))
+        return False
+
+    origin_found = False
+    locPrf_found = False
+    metric_found = False
+    as_path_found = False
+
+    if origin:
+        if origin == ipv4_route_Origin:
+            logger.info(
+                "Dafault Route {} expected origin {} Found in RIB....PASSED".format(
+                    default_ipv4_route, origin
+                )
+            )
+            origin_found = True
+        else:
+            logger.error(
+                "ERROR... IPV4::! Expected Origin is {} obtained {}".format(
+                    origin, ipv4_route_Origin
+                )
+            )
+            return False
+    else:
+        origin_found = True
+
+    if locPrf:
+        if locPrf == ipv4_route_local_pref:
+            logger.info(
+                "Dafault Route {} expected local preference {} Found in RIB....PASSED".format(
+                    default_ipv4_route, locPrf
+                )
+            )
+            locPrf_found = True
+        else:
+            logger.error(
+                "ERROR... IPV4::! Expected Local preference is {} obtained {}".format(
+                    locPrf, ipv4_route_local_pref
+                )
+            )
+            return False
+    else:
+        locPrf_found = True
+
+    if metric:
+        if metric == ipv4_route_metric:
+            logger.info(
+                "Dafault Route {} expected metric {} Found in RIB....PASSED".format(
+                    default_ipv4_route, metric
+                )
+            )
+
+            metric_found = True
+        else:
+            logger.error(
+                "ERROR... IPV4::! Expected metric is {} obtained {}".format(
+                    metric, ipv4_route_metric
+                )
+            )
+            return False
+    else:
+        metric_found = True
+
+    if expected_aspath:
+        obtained_aspath = ipv4_routes["routes"]["0.0.0.0/0"][0]["path"]
+        if expected_aspath in obtained_aspath:
+            as_path_found = True
+            logger.info(
+                "Dafault Route {} expected  AS path {} Found in RIB....PASSED".format(
+                    default_ipv4_route, expected_aspath
+                )
+            )
+        else:
+            logger.error(
+                "ERROR.....! Expected AS path {} obtained {}..... FAILED ".format(
+                    expected_aspath, obtained_aspath
+                )
+            )
+            return False
+    else:
+        as_path_found = True
+
+    if origin_found and locPrf_found and metric_found and as_path_found:
+        is_ipv4_default_attrib_found = True
+        logger.info(
+            "IPV4:: Expected origin ['{}'] , Local Preference ['{}'] , Metric ['{}'] and AS path [{}] is found in  RIB".format(
+                origin, locPrf, metric, expected_aspath
+            )
+        )
+    else:
+        is_ipv4_default_attrib_found = False
+        logger.error(
+            "IPV4:: Expected origin ['{}'] Obtained [{}]".format(
+                origin, ipv4_route_Origin
+            )
+        )
+        logger.error(
+            "IPV4:: Expected locPrf ['{}'] Obtained [{}]".format(
+                locPrf, ipv4_route_local_pref
+            )
+        )
+        logger.error(
+            "IPV4:: Expected metric ['{}'] Obtained [{}]".format(
+                metric, ipv4_route_metric
+            )
+        )
+        logger.error(
+            "IPV4:: Expected metric ['{}'] Obtained [{}]".format(
+                expected_aspath, obtained_aspath
+            )
+        )
+
+    route_Origin = False
+    route_local_pref = False
+    route_local_metric = False
+    default_ipv6_route = ""
+    try:
+        ipv6_routes["routes"]["0::0/0"]
+        default_ipv6_route = "0::0/0"
+    except:
+        ipv6_routes["routes"]["::/0"]
+        default_ipv6_route = "::/0"
+    if default_ipv6_route in ipv6_routes["routes"].keys():
+        nxt_hop_count = len(ipv6_routes["routes"][default_ipv6_route])
+        rib_next_hops = []
+        for index in range(nxt_hop_count):
+            rib_next_hops.append(
+                ipv6_routes["routes"][default_ipv6_route][index]["nexthops"][0]["ip"]
+            )
+            try:
+                rib_next_hops.append(
+                    ipv6_routes["routes"][default_ipv6_route][index]["nexthops"][1][
+                        "ip"
+                    ]
+                )
+            except (KeyError, IndexError) as e:
+                logger.error("NO impact ..! Global IPV6 Address not found ")
+
+        for nxt_hop in expected_nexthop.items():
+            if nxt_hop[0] == "ipv6":
+                if nxt_hop[1] in rib_next_hops:
+                    logger.info(
+                        "Default routes [{}] obtained from {} .....PASSED".format(
+                            default_ipv6_route, nxt_hop[1]
+                        )
+                    )
+                else:
+                    logger.error(
+                        "ERROR ...! Default routes [{}] expected from {} obtained {}".format(
+                            default_ipv6_route, nxt_hop[1], rib_next_hops
+                        )
+                    )
+                    return False
+
+            else:
+                pass
+        if "origin" in ipv6_routes["routes"][default_ipv6_route][0].keys():
+            route_Origin = ipv6_routes["routes"][default_ipv6_route][0]["origin"]
+        if "locPrf" in ipv6_routes["routes"][default_ipv6_route][0].keys():
+            route_local_pref = ipv6_routes["routes"][default_ipv6_route][0]["locPrf"]
+        if "metric" in ipv6_routes["routes"][default_ipv6_route][0].keys():
+            route_local_metric = ipv6_routes["routes"][default_ipv6_route][0]["metric"]
+
+    origin_found = False
+    locPrf_found = False
+    metric_found = False
+    as_path_found = False
+
+    if origin:
+        if origin == route_Origin:
+            logger.info(
+                "Dafault Route {} expected origin {} Found in RIB....PASSED".format(
+                    default_ipv6_route, route_Origin
+                )
+            )
+            origin_found = True
+        else:
+            logger.error(
+                "ERROR... IPV6::! Expected Origin is {} obtained {}".format(
+                    origin, route_Origin
+                )
+            )
+            return False
+    else:
+        origin_found = True
+
+    if locPrf:
+        if locPrf == route_local_pref:
+            logger.info(
+                "Dafault Route {} expected Local Preference {} Found in RIB....PASSED".format(
+                    default_ipv6_route, route_local_pref
+                )
+            )
+            locPrf_found = True
+        else:
+            logger.error(
+                "ERROR... IPV6::! Expected Local Preference is {} obtained {}".format(
+                    locPrf, route_local_pref
+                )
+            )
+            return False
+    else:
+        locPrf_found = True
+
+    if metric:
+        if metric == route_local_metric:
+            logger.info(
+                "Dafault Route {} expected metric {} Found in RIB....PASSED".format(
+                    default_ipv4_route, metric
+                )
+            )
+
+            metric_found = True
+        else:
+            logger.error(
+                "ERROR... IPV6::! Expected metric is {} obtained {}".format(
+                    metric, route_local_metric
+                )
+            )
+            return False
+    else:
+        metric_found = True
+
+    if expected_aspath:
+        obtained_aspath = ipv6_routes["routes"]["::/0"][0]["path"]
+        if expected_aspath in obtained_aspath:
+            as_path_found = True
+            logger.info(
+                "Dafault Route {} expected  AS path {} Found in RIB....PASSED".format(
+                    default_ipv4_route, expected_aspath
+                )
+            )
+        else:
+            logger.error(
+                "ERROR.....! Expected AS path {} obtained {}..... FAILED ".format(
+                    expected_aspath, obtained_aspath
+                )
+            )
+            return False
+    else:
+        as_path_found = True
+
+    if origin_found and locPrf_found and metric_found and as_path_found:
+        is_ipv6_default_attrib_found = True
+        logger.info(
+            "IPV6:: Expected origin ['{}'] , Local Preference ['{}'] , Metric ['{}'] and AS path [{}] is found in  RIB".format(
+                origin, locPrf, metric, expected_aspath
+            )
+        )
+    else:
+        is_ipv6_default_attrib_found = False
+        logger.error(
+            "IPV6:: Expected origin ['{}'] Obtained [{}]".format(origin, route_Origin)
+        )
+        logger.error(
+            "IPV6:: Expected locPrf ['{}'] Obtained [{}]".format(
+                locPrf, route_local_pref
+            )
+        )
+        logger.error(
+            "IPV6:: Expected metric ['{}'] Obtained [{}]".format(
+                metric, route_local_metric
+            )
+        )
+        logger.error(
+            "IPV6:: Expected metric ['{}'] Obtained [{}]".format(
+                expected_aspath, obtained_aspath
+            )
+        )
+
+    if is_ipv4_default_attrib_found and is_ipv6_default_attrib_found:
+        logger.info("The attributes are found for default route in RIB ")
+        return True
+    else:
+        return False
+
+
+@retry(retry_timeout=5)
+def verify_fib_default_route(tgen, topo, dut, routes, expected_nexthop):
+    """
+    API to verify the the 'Default route" in FIB
+
+    param
+    =====
+    dut : device under test
+    routes : default route with expected nexthop
+    expected_nexthop : the nexthop that is expected the deafult route
+
+    """
+    result = False
+    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+    tgen = get_topogen()
+    connected_routes = {}
+    for router, rnode in tgen.routers().items():
+        if router == dut:
+            ipv4_routes = run_frr_cmd(rnode, "sh ip route json", isjson=True)
+            ipv6_routes = run_frr_cmd(rnode, "sh ipv6 route json", isjson=True)
+
+    is_ipv4_default_route_found = False
+    is_ipv6_default_route_found = False
+    if routes["ipv4"] in ipv4_routes.keys():
+        rib_ipv4_nxt_hops = []
+        ipv4_default_route = routes["ipv4"]
+        nxt_hop_count = len(ipv4_routes[ipv4_default_route][0]["nexthops"])
+        for index in range(nxt_hop_count):
+            rib_ipv4_nxt_hops.append(
+                ipv4_routes[ipv4_default_route][0]["nexthops"][index]["ip"]
+            )
+
+        if expected_nexthop["ipv4"] in rib_ipv4_nxt_hops:
+            is_ipv4_default_route_found = True
+            logger.info(
+                "{} default route with next hop {} is found in FIB ".format(
+                    ipv4_default_route, expected_nexthop
+                )
+            )
+        else:
+            logger.error(
+                "ERROR .. ! {} default route with next hop {} is not found in FIB ".format(
+                    ipv4_default_route, expected_nexthop
+                )
+            )
+            return False
+
+    if routes["ipv6"] in ipv6_routes.keys() or "::/0" in ipv6_routes.keys():
+        rib_ipv6_nxt_hops = []
+        if "::/0" in ipv6_routes.keys():
+            ipv6_default_route = "::/0"
+        elif routes["ipv6"] in ipv6_routes.keys():
+            ipv6_default_route = routes["ipv6"]
+
+        nxt_hop_count = len(ipv6_routes[ipv6_default_route][0]["nexthops"])
+        for index in range(nxt_hop_count):
+            rib_ipv6_nxt_hops.append(
+                ipv6_routes[ipv6_default_route][0]["nexthops"][index]["ip"]
+            )
+
+        if expected_nexthop["ipv6"] in rib_ipv6_nxt_hops:
+            is_ipv6_default_route_found = True
+            logger.info(
+                "{} default route with next hop {} is found in FIB ".format(
+                    ipv6_default_route, expected_nexthop
+                )
+            )
+        else:
+            logger.error(
+                "ERROR .. ! {} default route with next hop {} is not found in FIB ".format(
+                    ipv6_default_route, expected_nexthop
+                )
+            )
+            return False
+
+    if is_ipv4_default_route_found and is_ipv6_default_route_found:
+        return True
+    else:
+        logger.error(
+            "Default Route for ipv4 and ipv6 address family is not found in FIB "
+        )
+        return False
+
+
+@retry(retry_timeout=5)
+def verify_bgp_advertised_routes_from_neighbor(tgen, topo, dut, peer, expected_routes):
+    """
+    APi is verifies the the routes that are advertised from dut to peer
+
+    command used :
+    "sh ip bgp neighbor <x.x.x.x> advertised-routes" and
+    "sh ip bgp ipv6 unicast neighbor<x::x> advertised-routes"
+
+    dut : Device Under Tests
+    Peer : Peer on which  the routs is expected
+    expected_routes : dual stack IPV4-and IPv6 routes  to be verified
+                    expected_routes
+
+    returns: True / False
+
+    """
+    result = False
+    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+    tgen = get_topogen()
+
+    peer_ipv4_neighbor_ip = topo["routers"][peer]["links"][dut]["ipv4"].split("/")[0]
+    peer_ipv6_neighbor_ip = topo["routers"][peer]["links"][dut]["ipv6"].split("/")[0]
+
+    for router, rnode in tgen.routers().items():
+        if router == dut:
+            ipv4_receieved_routes = run_frr_cmd(
+                rnode,
+                "sh ip bgp neighbor {} advertised-routes  json".format(
+                    peer_ipv4_neighbor_ip
+                ),
+                isjson=True,
+            )
+            ipv6_receieved_routes = run_frr_cmd(
+                rnode,
+                "sh ip bgp ipv6 unicast neighbor {} advertised-routes  json".format(
+                    peer_ipv6_neighbor_ip
+                ),
+                isjson=True,
+            )
+    ipv4_route_count = 0
+    ipv6_route_count = 0
+    if ipv4_receieved_routes:
+        for index in range(len(expected_routes["ipv4"])):
+            if (
+                expected_routes["ipv4"][index]["network"]
+                in ipv4_receieved_routes["advertisedRoutes"].keys()
+            ):
+                ipv4_route_count += 1
+                logger.info(
+                    "Success  [DUT : {}] The Expected Route {} is  advertised to {} ".format(
+                        dut, expected_routes["ipv4"][index]["network"], peer
+                    )
+                )
+
+            elif (
+                expected_routes["ipv4"][index]["network"]
+                in ipv4_receieved_routes["bgpOriginatingDefaultNetwork"]
+            ):
+                ipv4_route_count += 1
+                logger.info(
+                    "Success  [DUT : {}] The Expected Route {} is  advertised to {} ".format(
+                        dut, expected_routes["ipv4"][index]["network"], peer
+                    )
+                )
+
+            else:
+                logger.error(
+                    "ERROR....![DUT : {}] The Expected Route {} is not advertised to {} ".format(
+                        dut, expected_routes["ipv4"][index]["network"], peer
+                    )
+                )
+    else:
+        logger.error(ipv4_receieved_routes)
+        logger.error(
+            "ERROR...! [DUT : {}] No IPV4 Routes are advertised to  the peer {}".format(
+                dut, peer
+            )
+        )
+        return False
+
+    if ipv6_receieved_routes:
+        for index in range(len(expected_routes["ipv6"])):
+            if (
+                expected_routes["ipv6"][index]["network"]
+                in ipv6_receieved_routes["advertisedRoutes"].keys()
+            ):
+                ipv6_route_count += 1
+                logger.info(
+                    "Success  [DUT : {}] The Expected Route {} is  advertised to {} ".format(
+                        dut, expected_routes["ipv6"][index]["network"], peer
+                    )
+                )
+            elif (
+                expected_routes["ipv6"][index]["network"]
+                in ipv6_receieved_routes["bgpOriginatingDefaultNetwork"]
+            ):
+                ipv6_route_count += 1
+                logger.info(
+                    "Success  [DUT : {}] The Expected Route {} is  advertised to {} ".format(
+                        dut, expected_routes["ipv6"][index]["network"], peer
+                    )
+                )
+            else:
+                logger.error(
+                    "ERROR....![DUT : {}] The Expected Route {} is not advertised to {} ".format(
+                        dut, expected_routes["ipv6"][index]["network"], peer
+                    )
+                )
+    else:
+        logger.error(ipv6_receieved_routes)
+        logger.error(
+            "ERROR...! [DUT : {}] No IPV6 Routes are  advertised to the peer {}".format(
+                dut, peer
+            )
+        )
+        return False
+
+    if ipv4_route_count == len(expected_routes["ipv4"]) and ipv6_route_count == len(
+        expected_routes["ipv6"]
+    ):
+        return True
+    else:
+        logger.error(
+            "ERROR ....! IPV4 : Expected Routes -> {}  obtained ->{} ".format(
+                expected_routes["ipv4"], ipv4_receieved_routes["advertisedRoutes"]
+            )
+        )
+        logger.error(
+            "ERROR ....! IPV6 : Expected Routes -> {}  obtained ->{} ".format(
+                expected_routes["ipv6"], ipv6_receieved_routes["advertisedRoutes"]
+            )
+        )
+        return False
+
+
+@retry(retry_timeout=5)
+def verify_bgp_received_routes_from_neighbor(tgen, topo, dut, peer, expected_routes):
+    """
+    API to verify the bgp received routes
+
+    commad used :
+    =============
+    show ip bgp neighbor <x.x.x.x> received-routes
+    show ip bgp ipv6 unicast neighbor <x::x> received-routes
+
+    params
+    =======
+    dut : Device Under Tests
+    Peer : Peer on which  the routs is expected
+    expected_routes : dual stack IPV4-and IPv6 routes  to be verified
+                    expected_routes
+
+    returns:
+    ========
+    True / False
+    """
+    result = False
+    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
+    tgen = get_topogen()
+
+    peer_ipv4_neighbor_ip = topo["routers"][peer]["links"][dut]["ipv4"].split("/")[0]
+    peer_ipv6_neighbor_ip = topo["routers"][peer]["links"][dut]["ipv6"].split("/")[0]
+
+    logger.info("Enabling Soft configuration to neighbor INBOUND ")
+    neigbor_dict = {"ipv4": peer_ipv4_neighbor_ip, "ipv6": peer_ipv6_neighbor_ip}
+    result = configure_bgp_soft_configuration(
+        tgen, dut, neigbor_dict, direction="inbound"
+    )
+    assert (
+        result is True
+    ), "  Failed to configure the soft configuration \n Error: {}".format(result)
+
+    """sleep of 10 sec is required to get the routes on peer after soft configuration"""
+    sleep(10)
+    for router, rnode in tgen.routers().items():
+        if router == dut:
+            ipv4_receieved_routes = run_frr_cmd(
+                rnode,
+                "sh ip bgp neighbor {} received-routes json".format(
+                    peer_ipv4_neighbor_ip
+                ),
+                isjson=True,
+            )
+            ipv6_receieved_routes = run_frr_cmd(
+                rnode,
+                "sh ip bgp ipv6 unicast neighbor {} received-routes json".format(
+                    peer_ipv6_neighbor_ip
+                ),
+                isjson=True,
+            )
+    ipv4_route_count = 0
+    ipv6_route_count = 0
+    if ipv4_receieved_routes:
+        for index in range(len(expected_routes["ipv4"])):
+            if (
+                expected_routes["ipv4"][index]["network"]
+                in ipv4_receieved_routes["receivedRoutes"].keys()
+            ):
+                ipv4_route_count += 1
+                logger.info(
+                    "Success  [DUT : {}] The Expected Route {} is  received from {} ".format(
+                        dut, expected_routes["ipv4"][index]["network"], peer
+                    )
+                )
+            else:
+                logger.error(
+                    "ERROR....![DUT : {}] The Expected Route {} is not received from {} ".format(
+                        dut, expected_routes["ipv4"][index]["network"], peer
+                    )
+                )
+    else:
+        logger.error(ipv4_receieved_routes)
+        logger.error(
+            "ERROR...! [DUT : {}] No IPV4 Routes are received from the peer {}".format(
+                dut, peer
+            )
+        )
+        return False
+
+    if ipv6_receieved_routes:
+        for index in range(len(expected_routes["ipv6"])):
+            if (
+                expected_routes["ipv6"][index]["network"]
+                in ipv6_receieved_routes["receivedRoutes"].keys()
+            ):
+                ipv6_route_count += 1
+                logger.info(
+                    "Success  [DUT : {}] The Expected Route {} is  received from {} ".format(
+                        dut, expected_routes["ipv6"][index]["network"], peer
+                    )
+                )
+            else:
+                logger.error(
+                    "ERROR....![DUT : {}] The Expected Route {} is not received from {} ".format(
+                        dut, expected_routes["ipv6"][index]["network"], peer
+                    )
+                )
+    else:
+        logger.error(ipv6_receieved_routes)
+        logger.error(
+            "ERROR...! [DUT : {}] No IPV6 Routes are received from the peer {}".format(
+                dut, peer
+            )
+        )
+        return False
+
+    if ipv4_route_count == len(expected_routes["ipv4"]) and ipv6_route_count == len(
+        expected_routes["ipv6"]
+    ):
+        return True
+    else:
+        logger.error(
+            "ERROR ....! IPV4 : Expected Routes -> {}  obtained ->{} ".format(
+                expected_routes["ipv4"], ipv4_receieved_routes["advertisedRoutes"]
+            )
+        )
+        logger.error(
+            "ERROR ....! IPV6 : Expected Routes -> {}  obtained ->{} ".format(
+                expected_routes["ipv6"], ipv6_receieved_routes["advertisedRoutes"]
+            )
+        )
+        return False
+
+
+def configure_bgp_soft_configuration(tgen, dut, neighbor_dict, direction):
+    """
+    Api to configure the bgp soft configuration to show  the received routes from peer
+    params
+    ======
+    dut : device under test route on which the sonfiguration to be applied
+    neighbor_dict : dict element contains ipv4 and ipv6 neigbor ip
+    direction : Directionon which it should be applied in/out
+
+    returns:
+    ========
+    boolean
+    """
+    logger.info("Enabling Soft configuration to neighbor INBOUND ")
+    local_as = get_dut_as_number(tgen, dut)
+    ipv4_neighbor = neighbor_dict["ipv4"]
+    ipv6_neighbor = neighbor_dict["ipv6"]
+    direction = direction.lower()
+    if ipv4_neighbor and ipv4_neighbor:
+        raw_config = {
+            dut: {
+                "raw_config": [
+                    "router bgp {}".format(local_as),
+                    "address-family ipv4 unicast",
+                    "neighbor {} soft-reconfiguration {} ".format(
+                        ipv4_neighbor, direction
+                    ),
+                    "exit-address-family",
+                    "address-family ipv6 unicast",
+                    "neighbor {}  soft-reconfiguration {} ".format(
+                        ipv6_neighbor, direction
+                    ),
+                    "exit-address-family",
+                ]
+            }
+        }
+        result = apply_raw_config(tgen, raw_config)
+        logger.info(
+            "Success... [DUT : {}] The soft configuration onis applied on neighbors {} ".format(
+                dut, neighbor_dict
+            )
+        )
+        return True