git.puffer.fish Git - matthieu/frr.git/commitdiff
pim6d: add fresh MLD implementation
authorDavid Lamparter <equinox@opensourcerouting.org>
Thu, 11 Nov 2021 19:41:23 +0000 (20:41 +0100)
committerDavid Lamparter <equinox@opensourcerouting.org>
Fri, 29 Apr 2022 18:00:15 +0000 (20:00 +0200)
Fresh ground-up MLD implementation with subscriber-tracking for MLDv2.
Intended to be adapted for IPv4 and replace the IGMP implementation at a
later point.

Tested in ANVL, currently at 94/116.  Some issues/TODOs are left in the
code as CPP_NOTICE markers, but the code is very much good enough to
proceed since otherwise we're blocked on overall PIM v6 progress.

Signed-off-by: David Lamparter <equinox@opensourcerouting.org>
pimd/pim6_cmd.c
pimd/pim6_main.c
pimd/pim6_mld.c [new file with mode: 0644]
pimd/pim6_mld.h [new file with mode: 0644]
pimd/pim6_mld_protocol.h [new file with mode: 0644]
pimd/pim_iface.c
pimd/pim_iface.h
pimd/pim_nb_config.c
pimd/subdir.am

index 289772260c5821da2dad11f8dcb4735edd200c25..3443cc3d8213e74eed65e6983cc03daa2f13ffe5 100644 (file)
@@ -691,7 +691,7 @@ DEFPY (interface_ipv6_mld_query_max_response_time,
        IPV6_STR
        IFACE_MLD_STR
        IFACE_MLD_QUERY_MAX_RESPONSE_TIME_STR
-       "Query response value in deci-seconds\n")
+       "Query response value in milliseconds\n")
 {
        return gm_process_query_max_response_time_cmd(vty, qmrt_str);
 }
index ed539246160533ccf21bceaabc201d9fcce4cd0c..e6319a9bdd7db3f8b29b12d09d3783e03e4f0b57 100644 (file)
@@ -133,6 +133,7 @@ FRR_DAEMON_INFO(pim6d, PIM6,
 );
 /* clang-format on */
 
+extern void gm_cli_init(void);
 
 int main(int argc, char **argv, char **envp)
 {
@@ -184,6 +185,8 @@ int main(int argc, char **argv, char **envp)
         */
        pim_iface_init();
 
+       gm_cli_init();
+
        pim_zebra_init();
 #if 0
        pim_bfd_init();
diff --git a/pimd/pim6_mld.c b/pimd/pim6_mld.c
new file mode 100644 (file)
index 0000000..5ba5fd3
--- /dev/null
@@ -0,0 +1,2359 @@
+/*
+ * PIMv6 MLD querier
+ * Copyright (C) 2021-2022  David Lamparter for NetDEF, Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the Free
+ * Software Foundation; either version 2 of the License, or (at your option)
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; see the file COPYING; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+/*
+ * keep pim6_mld.h open when working on this code.  Most data structures are
+ * commented in the header.
+ *
+ * IPv4 support is pre-planned but hasn't been tackled yet.  It is intended
+ * that this code will replace the old IGMP querier at some point.
+ */
+
+#include <zebra.h>
+#include <netinet/ip6.h>
+
+#include "lib/memory.h"
+#include "lib/jhash.h"
+#include "lib/prefix.h"
+#include "lib/checksum.h"
+#include "lib/thread.h"
+
+#include "pimd/pim6_mld.h"
+#include "pimd/pim6_mld_protocol.h"
+#include "pimd/pim_memory.h"
+#include "pimd/pim_instance.h"
+#include "pimd/pim_iface.h"
+#include "pimd/pim_util.h"
+#include "pimd/pim_tib.h"
+#include "pimd/pimd.h"
+
+#ifndef IPV6_MULTICAST_ALL
+#define IPV6_MULTICAST_ALL 29
+#endif
+
+DEFINE_MTYPE_STATIC(PIMD, GM_IFACE, "MLD interface");
+DEFINE_MTYPE_STATIC(PIMD, GM_PACKET, "MLD packet");
+DEFINE_MTYPE_STATIC(PIMD, GM_SUBSCRIBER, "MLD subscriber");
+DEFINE_MTYPE_STATIC(PIMD, GM_STATE, "MLD subscription state");
+DEFINE_MTYPE_STATIC(PIMD, GM_SG, "MLD (S,G)");
+DEFINE_MTYPE_STATIC(PIMD, GM_GRP_PENDING, "MLD group query state");
+DEFINE_MTYPE_STATIC(PIMD, GM_GSQ_PENDING, "MLD group/source query aggregate");
+
+static void gm_t_query(struct thread *t);
+static void gm_trigger_specific(struct gm_sg *sg);
+static void gm_sg_timer_start(struct gm_if *gm_ifp, struct gm_sg *sg,
+                             struct timeval expire_wait);
+
+/* shorthand for log messages */
+#define log_ifp(msg)                                                           \
+       "[MLD %s:%s] " msg, gm_ifp->ifp->vrf->name, gm_ifp->ifp->name
+#define log_pkt_src(msg)                                                       \
+       "[MLD %s:%s %pI6] " msg, gm_ifp->ifp->vrf->name, gm_ifp->ifp->name,    \
+               &pkt_src->sin6_addr
+#define log_sg(sg, msg)                                                        \
+       "[MLD %s:%s %pSG] " msg, sg->iface->ifp->vrf->name,                    \
+               sg->iface->ifp->name, &sg->sgaddr
+
+/* clang-format off */
+#if PIM_IPV == 6
+static const pim_addr gm_all_hosts = {
+       .s6_addr = {
+               0xff, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+               0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
+       },
+};
+static const pim_addr gm_all_routers = {
+       .s6_addr = {
+               0xff, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+               0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x16,
+       },
+};
+/* MLDv1 does not allow subscriber tracking due to report suppression
+ * hence, the source address is replaced with ffff:...:ffff
+ */
+static const pim_addr gm_dummy_untracked = {
+       .s6_addr = {
+               0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
+               0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
+       },
+};
+#else
+/* 224.0.0.1 */
+static const pim_addr gm_all_hosts = { .s_addr = htonl(0xe0000001), };
+/* 224.0.0.22 */
+static const pim_addr gm_all_routers = { .s_addr = htonl(0xe0000016), };
+static const pim_addr gm_dummy_untracked = { .s_addr = 0xffffffff, };
+#endif
+/* clang-format on */
+
+#define IPV6_MULTICAST_SCOPE_LINK 2
+
+static inline uint8_t in6_multicast_scope(const pim_addr *addr)
+{
+       return addr->s6_addr[1] & 0xf;
+}
+
+static inline bool in6_multicast_nofwd(const pim_addr *addr)
+{
+       return in6_multicast_scope(addr) <= IPV6_MULTICAST_SCOPE_LINK;
+}
+
+/*
+ * (S,G) -> subscriber,(S,G)
+ */
+
+static int gm_packet_sg_cmp(const struct gm_packet_sg *a,
+                           const struct gm_packet_sg *b)
+{
+       const struct gm_packet_state *s_a, *s_b;
+
+       s_a = gm_packet_sg2state(a);
+       s_b = gm_packet_sg2state(b);
+       return IPV6_ADDR_CMP(&s_a->subscriber->addr, &s_b->subscriber->addr);
+}
+
+DECLARE_RBTREE_UNIQ(gm_packet_sg_subs, struct gm_packet_sg, subs_itm,
+                   gm_packet_sg_cmp);
+
/* look up the (S,G) item a given subscriber has on sg's positive or
 * negative list.
 *
 * the subs trees are keyed through the owning packet's subscriber address
 * (see gm_packet_sg_cmp), so build an on-stack dummy: a gm_packet_state
 * header immediately followed by a single item with offset 0.  This
 * presumably lets gm_packet_sg2state() (see pim6_mld.h) resolve the item
 * back to .hdr - NOTE(review): relies on .item sitting exactly where
 * hdr's items[0] would be; confirm against the struct layout in the header.
 */
static struct gm_packet_sg *gm_packet_sg_find(struct gm_sg *sg,
					      enum gm_sub_sense sense,
					      struct gm_subscriber *sub)
{
	struct {
		struct gm_packet_state hdr;
		struct gm_packet_sg item;
	} ref = {
		/* clang-format off */
		.hdr = {
			.subscriber = sub,
		},
		.item = {
			.offset = 0,
		},
		/* clang-format on */
	};

	return gm_packet_sg_subs_find(&sg->subs[sense], &ref.item);
}
+
+/*
+ * interface -> (*,G),pending
+ */
+
+static int gm_grp_pending_cmp(const struct gm_grp_pending *a,
+                             const struct gm_grp_pending *b)
+{
+       return IPV6_ADDR_CMP(&a->grp, &b->grp);
+}
+
+DECLARE_RBTREE_UNIQ(gm_grp_pends, struct gm_grp_pending, itm,
+                   gm_grp_pending_cmp);
+
+/*
+ * interface -> ([S1,S2,...],G),pending
+ */
+
+static int gm_gsq_pending_cmp(const struct gm_gsq_pending *a,
+                             const struct gm_gsq_pending *b)
+{
+       if (a->s_bit != b->s_bit)
+               return numcmp(a->s_bit, b->s_bit);
+
+       return IPV6_ADDR_CMP(&a->grp, &b->grp);
+}
+
+static uint32_t gm_gsq_pending_hash(const struct gm_gsq_pending *a)
+{
+       uint32_t seed = a->s_bit ? 0x68f0eb5e : 0x156b7f19;
+
+       return jhash(&a->grp, sizeof(a->grp), seed);
+}
+
+DECLARE_HASH(gm_gsq_pends, struct gm_gsq_pending, itm, gm_gsq_pending_cmp,
+            gm_gsq_pending_hash);
+
+/*
+ * interface -> (S,G)
+ */
+
+static int gm_sg_cmp(const struct gm_sg *a, const struct gm_sg *b)
+{
+       return pim_sgaddr_cmp(a->sgaddr, b->sgaddr);
+}
+
+DECLARE_RBTREE_UNIQ(gm_sgs, struct gm_sg, itm, gm_sg_cmp);
+
+static struct gm_sg *gm_sg_find(struct gm_if *gm_ifp, pim_addr grp,
+                               pim_addr src)
+{
+       struct gm_sg ref;
+
+       ref.sgaddr.grp = grp;
+       ref.sgaddr.src = src;
+       return gm_sgs_find(gm_ifp->sgs, &ref);
+}
+
+static struct gm_sg *gm_sg_make(struct gm_if *gm_ifp, pim_addr grp,
+                               pim_addr src)
+{
+       struct gm_sg *ret, *prev;
+
+       ret = XCALLOC(MTYPE_GM_SG, sizeof(*ret));
+       ret->sgaddr.grp = grp;
+       ret->sgaddr.src = src;
+       ret->iface = gm_ifp;
+       prev = gm_sgs_add(gm_ifp->sgs, ret);
+
+       if (prev) {
+               XFREE(MTYPE_GM_SG, ret);
+               ret = prev;
+       } else {
+               gm_packet_sg_subs_init(ret->subs_positive);
+               gm_packet_sg_subs_init(ret->subs_negative);
+       }
+       return ret;
+}
+
+/*
+ * interface -> packets, sorted by expiry (because add_tail insert order)
+ */
+
+DECLARE_DLIST(gm_packet_expires, struct gm_packet_state, exp_itm);
+
+/*
+ * subscriber -> packets
+ */
+
+DECLARE_DLIST(gm_packets, struct gm_packet_state, pkt_itm);
+
+/*
+ * interface -> subscriber
+ */
+
+static int gm_subscriber_cmp(const struct gm_subscriber *a,
+                            const struct gm_subscriber *b)
+{
+       return IPV6_ADDR_CMP(&a->addr, &b->addr);
+}
+
+static uint32_t gm_subscriber_hash(const struct gm_subscriber *a)
+{
+       return jhash(&a->addr, sizeof(a->addr), 0xd0e94ad4);
+}
+
+DECLARE_HASH(gm_subscribers, struct gm_subscriber, itm, gm_subscriber_cmp,
+            gm_subscriber_hash);
+
+static struct gm_subscriber *gm_subscriber_findref(struct gm_if *gm_ifp,
+                                                  pim_addr addr)
+{
+       struct gm_subscriber ref, *ret;
+
+       ref.addr = addr;
+       ret = gm_subscribers_find(gm_ifp->subscribers, &ref);
+       if (ret)
+               ret->refcount++;
+       return ret;
+}
+
+static struct gm_subscriber *gm_subscriber_get(struct gm_if *gm_ifp,
+                                              pim_addr addr)
+{
+       struct gm_subscriber ref, *ret;
+
+       ref.addr = addr;
+       ret = gm_subscribers_find(gm_ifp->subscribers, &ref);
+
+       if (!ret) {
+               ret = XCALLOC(MTYPE_GM_SUBSCRIBER, sizeof(*ret));
+               ret->iface = gm_ifp;
+               ret->addr = addr;
+               ret->refcount = 1;
+               gm_packets_init(ret->packets);
+
+               gm_subscribers_add(gm_ifp->subscribers, ret);
+       }
+       return ret;
+}
+
+static void gm_subscriber_drop(struct gm_subscriber **subp)
+{
+       struct gm_subscriber *sub = *subp;
+       struct gm_if *gm_ifp;
+
+       if (!sub)
+               return;
+       gm_ifp = sub->iface;
+
+       *subp = NULL;
+       sub->refcount--;
+
+       if (sub->refcount)
+               return;
+
+       gm_subscribers_del(gm_ifp->subscribers, sub);
+       XFREE(MTYPE_GM_SUBSCRIBER, sub);
+}
+
+/****************************************************************************/
+
+/* bundle query timer values for combined v1/v2 handling */
+struct gm_query_timers {
+       unsigned qrv;
+       unsigned max_resp_ms;
+       unsigned qqic_ms;
+
+       struct timeval fuzz;
+       struct timeval expire_wait;
+};
+
+static void gm_expiry_calc(struct gm_query_timers *timers)
+{
+       unsigned expire =
+               (timers->qrv - 1) * timers->qqic_ms + timers->max_resp_ms;
+       ldiv_t exp_div = ldiv(expire, 1000);
+
+       timers->expire_wait.tv_sec = exp_div.quot;
+       timers->expire_wait.tv_usec = exp_div.rem * 1000;
+       timeradd(&timers->expire_wait, &timers->fuzz, &timers->expire_wait);
+}
+
+static void gm_sg_free(struct gm_sg *sg)
+{
+       /* t_sg_expiry is handled before this is reached */
+       THREAD_OFF(sg->t_sg_query);
+       gm_packet_sg_subs_fini(sg->subs_negative);
+       gm_packet_sg_subs_fini(sg->subs_positive);
+       XFREE(MTYPE_GM_SG, sg);
+}
+
/* clang-format off */
/* printable names for enum gm_sg_state, indexed by state value; used in
 * the debug/log output below
 */
static const char *const gm_states[] = {
	[GM_SG_NOINFO]			= "NOINFO",
	[GM_SG_JOIN]			= "JOIN",
	[GM_SG_JOIN_EXPIRING]		= "JOIN_EXPIRING",
	[GM_SG_PRUNE]			= "PRUNE",
	[GM_SG_NOPRUNE]			= "NOPRUNE",
	[GM_SG_NOPRUNE_EXPIRING]	= "NOPRUNE_EXPIRING",
};
/* clang-format on */
+
+CPP_NOTICE("TODO: S,G entries in EXCLUDE (i.e. prune) unsupported");
+/* tib_sg_gm_prune() below is an "un-join", it doesn't prune S,G when *,G is
+ * joined.  Whether we actually want/need to support this is a separate
+ * question - it is almost never used.  In fact this is exactly what RFC5790
+ * ("lightweight" MLDv2) does:  it removes S,G EXCLUDE support.
+ */
+
/* re-evaluate the state machine of one (S,G) entry after subscriber or
 * timer changes, and propagate the result into the TIB (PIM side) as a
 * join or prune.
 *
 * has_expired: true when invoked from expiry processing; forbids
 * (re-)entering the *_EXPIRING grace states.
 *
 * NOTE(review): this may free sg (when it lands in GM_SG_NOINFO) - callers
 * must not touch sg after calling.
 */
static void gm_sg_update(struct gm_sg *sg, bool has_expired)
{
	struct gm_if *gm_ifp = sg->iface;
	enum gm_sg_state prev, desired;
	bool new_join;
	struct gm_sg *grp = NULL;

	/* for an S,G entry, the corresponding *,G entry (if any) also
	 * feeds into the state decision below
	 */
	if (!pim_addr_is_any(sg->sgaddr.src))
		grp = gm_sg_find(gm_ifp, sg->sgaddr.grp, PIMADDR_ANY);
	else
		assert(sg->state != GM_SG_PRUNE);

	/* derive the desired state from positive/negative subscriber counts */
	if (gm_packet_sg_subs_count(sg->subs_positive)) {
		desired = GM_SG_JOIN;
		assert(!sg->t_sg_expire);
	} else if ((sg->state == GM_SG_JOIN ||
		    sg->state == GM_SG_JOIN_EXPIRING) &&
		   !has_expired)
		desired = GM_SG_JOIN_EXPIRING;
	else if (!grp || !gm_packet_sg_subs_count(grp->subs_positive))
		desired = GM_SG_NOINFO;
	else if (gm_packet_sg_subs_count(grp->subs_positive) ==
		 gm_packet_sg_subs_count(sg->subs_negative)) {
		/* every *,G subscriber is excluding this source */
		if ((sg->state == GM_SG_NOPRUNE ||
		     sg->state == GM_SG_NOPRUNE_EXPIRING) &&
		    !has_expired)
			desired = GM_SG_NOPRUNE_EXPIRING;
		else
			desired = GM_SG_PRUNE;
	} else if (gm_packet_sg_subs_count(sg->subs_negative))
		desired = GM_SG_NOPRUNE;
	else
		desired = GM_SG_NOINFO;

	if (desired != sg->state && !gm_ifp->stopping) {
		if (PIM_DEBUG_IGMP_EVENTS)
			zlog_debug(log_sg(sg, "%s => %s"), gm_states[sg->state],
				   gm_states[desired]);

		/* entering a grace state: schedule expiry and kick off
		 * group(-and-source) specific queries
		 */
		if (desired == GM_SG_JOIN_EXPIRING ||
		    desired == GM_SG_NOPRUNE_EXPIRING) {
			struct gm_query_timers timers;

			timers.qrv = gm_ifp->cur_qrv;
			timers.max_resp_ms = gm_ifp->cur_max_resp;
			timers.qqic_ms = gm_ifp->cur_query_intv_trig;
			timers.fuzz = gm_ifp->cfg_timing_fuzz;

			gm_expiry_calc(&timers);
			gm_sg_timer_start(gm_ifp, sg, timers.expire_wait);

			THREAD_OFF(sg->t_sg_query);
			sg->n_query = gm_ifp->cur_qrv;
			sg->query_sbit = false;
			gm_trigger_specific(sg);
		}
	}
	prev = sg->state;
	sg->state = desired;

	/* groups with link-local (or narrower) scope are never forwarded */
	if (in6_multicast_nofwd(&sg->sgaddr.grp) || gm_ifp->stopping)
		new_join = false;
	else
		new_join = gm_sg_state_want_join(desired);

	if (new_join && !sg->tib_joined) {
		/* this will retry if join previously failed */
		sg->tib_joined = tib_sg_gm_join(gm_ifp->pim, sg->sgaddr,
						gm_ifp->ifp, &sg->oil);
		if (!sg->tib_joined)
			zlog_warn(
				"MLD join for %pSG%%%s not propagated into TIB",
				&sg->sgaddr, gm_ifp->ifp->name);
		else
			zlog_info(log_ifp("%pSG%%%s TIB joined"), &sg->sgaddr,
				  gm_ifp->ifp->name);

	} else if (sg->tib_joined && !new_join) {
		tib_sg_gm_prune(gm_ifp->pim, sg->sgaddr, gm_ifp->ifp, &sg->oil);

		sg->oil = NULL;
		sg->tib_joined = false;
	}

	if (desired == GM_SG_NOINFO) {
		/* no remaining state at all => sanity-check and drop the
		 * entry entirely
		 */
		assertf((!sg->t_sg_expire &&
			 !gm_packet_sg_subs_count(sg->subs_positive) &&
			 !gm_packet_sg_subs_count(sg->subs_negative)),
			"%pSG%%%s hx=%u exp=%pTHD state=%s->%s pos=%zu neg=%zu grp=%p",
			&sg->sgaddr, gm_ifp->ifp->name, has_expired,
			sg->t_sg_expire, gm_states[prev], gm_states[desired],
			gm_packet_sg_subs_count(sg->subs_positive),
			gm_packet_sg_subs_count(sg->subs_negative), grp);

		if (PIM_DEBUG_IGMP_TRACE)
			zlog_debug(log_sg(sg, "dropping"));

		gm_sgs_del(gm_ifp->sgs, sg);
		gm_sg_free(sg);
	}
}
+
+/****************************************************************************/
+
+/* the following bunch of functions deals with transferring state from
+ * received packets into gm_packet_state.  As a reminder, the querier is
+ * structured to keep all items received in one packet together, since they
+ * will share expiry timers and thus allows efficient handling.
+ */
+
+static void gm_packet_free(struct gm_packet_state *pkt)
+{
+       gm_packet_expires_del(pkt->iface->expires, pkt);
+       gm_packets_del(pkt->subscriber->packets, pkt);
+       gm_subscriber_drop(&pkt->subscriber);
+       XFREE(MTYPE_GM_STATE, pkt);
+}
+
+static struct gm_packet_sg *gm_packet_sg_setup(struct gm_packet_state *pkt,
+                                              struct gm_sg *sg, bool is_excl,
+                                              bool is_src)
+{
+       struct gm_packet_sg *item;
+
+       assert(pkt->n_active < pkt->n_sg);
+
+       item = &pkt->items[pkt->n_active];
+       item->sg = sg;
+       item->is_excl = is_excl;
+       item->is_src = is_src;
+       item->offset = pkt->n_active;
+
+       pkt->n_active++;
+       return item;
+}
+
/* remove one item (and, for a *,G EXCLUDE item, its trailing blocked-source
 * items) from the packet that owns it.
 *
 * Returns true when this emptied the packet entirely, in which case the
 * whole packet state has been freed - callers must not touch it afterwards.
 */
static bool gm_packet_sg_drop(struct gm_packet_sg *item)
{
	struct gm_packet_state *pkt;
	size_t i;

	assert(item->sg);

	pkt = gm_packet_sg2state(item);
	if (item->sg->most_recent == item)
		item->sg->most_recent = NULL;

	/* an EXCLUDE *,G item is immediately followed in pkt->items[] by
	 * its n_exclude blocked-source items; take those down first
	 */
	for (i = 0; i < item->n_exclude; i++) {
		struct gm_packet_sg *excl_item;

		excl_item = item + 1 + i;
		if (!excl_item->sg)
			continue;

		gm_packet_sg_subs_del(excl_item->sg->subs_negative, excl_item);
		excl_item->sg = NULL;
		pkt->n_active--;

		/* the *,G item itself is still counted in n_active here */
		assert(pkt->n_active > 0);
	}

	/* blocked-source items live on the negative list, everything else
	 * on the positive list
	 */
	if (item->is_excl && item->is_src)
		gm_packet_sg_subs_del(item->sg->subs_negative, item);
	else
		gm_packet_sg_subs_del(item->sg->subs_positive, item);
	item->sg = NULL;
	pkt->n_active--;

	if (!pkt->n_active) {
		gm_packet_free(pkt);
		return true;
	}
	return false;
}
+
/* tear down all remaining state held by one packet (e.g. on general
 * expiry), updating each affected (S,G) as we go.
 *
 * trace: emit per-(S,G) debug output while dropping.
 */
static void gm_packet_drop(struct gm_packet_state *pkt, bool trace)
{
	for (size_t i = 0; i < pkt->n_sg; i++) {
		struct gm_sg *sg = pkt->items[i].sg;
		bool deleted;

		if (!sg)
			continue;

		if (trace && PIM_DEBUG_IGMP_TRACE)
			zlog_debug(log_sg(sg, "general-dropping from %pPA"),
				   &pkt->subscriber->addr);
		deleted = gm_packet_sg_drop(&pkt->items[i]);

		gm_sg_update(sg, true);
		/* dropping the last item freed pkt itself - stop touching
		 * pkt->items and bail out
		 */
		if (deleted)
			break;
	}
}
+
+static void gm_packet_sg_remove_sources(struct gm_if *gm_ifp,
+                                       struct gm_subscriber *subscriber,
+                                       pim_addr grp, pim_addr *srcs,
+                                       size_t n_src, enum gm_sub_sense sense)
+{
+       struct gm_sg *sg;
+       struct gm_packet_sg *old_src;
+       size_t i;
+
+       for (i = 0; i < n_src; i++) {
+               sg = gm_sg_find(gm_ifp, grp, srcs[i]);
+               if (!sg)
+                       continue;
+
+               old_src = gm_packet_sg_find(sg, sense, subscriber);
+               if (!old_src)
+                       continue;
+
+               gm_packet_sg_drop(old_src);
+               gm_sg_update(sg, false);
+       }
+}
+
+static void gm_sg_expiry_cancel(struct gm_sg *sg)
+{
+       if (sg->t_sg_expire && PIM_DEBUG_IGMP_TRACE)
+               zlog_debug(log_sg(sg, "alive, cancelling expiry timer"));
+       THREAD_OFF(sg->t_sg_expire);
+       sg->query_sbit = true;
+}
+
+/* first pass: process all changes resulting in removal of state:
+ *  - {TO,IS}_INCLUDE removes *,G EXCLUDE state (and S,G)
+ *  - ALLOW_NEW_SOURCES, if *,G in EXCLUDE removes S,G state
+ *  - BLOCK_OLD_SOURCES, if *,G in INCLUDE removes S,G state
+ *  - {TO,IS}_EXCLUDE,   if *,G in INCLUDE removes S,G state
+ * note *replacing* state is NOT considered *removing* state here
+ *
+ * everything else is thrown into pkt for creation of state in pass 2
+ */
/* pass-1 handler for a single MLDv2 record: perform all state *removal*
 * (per the block comment above); anything that creates or replaces state
 * is queued as items on pkt for pass 2.
 */
static void gm_handle_v2_pass1(struct gm_packet_state *pkt,
			       struct mld_v2_rec_hdr *rechdr)
{
	/* NB: pkt->subscriber can be NULL here if the subscriber was not
	 * previously seen!
	 */
	struct gm_subscriber *subscriber = pkt->subscriber;
	struct gm_sg *grp;
	struct gm_packet_sg *old_grp = NULL;
	struct gm_packet_sg *item;
	size_t n_src = ntohs(rechdr->n_src);
	size_t j;
	bool is_excl = false;

	grp = gm_sg_find(pkt->iface, rechdr->grp, PIMADDR_ANY);
	if (grp && subscriber)
		old_grp = gm_packet_sg_find(grp, GM_SUB_POS, subscriber);

	/* a subscriber only holds *,G state in EXCLUDE mode; INCLUDE mode
	 * keeps no *,G record
	 */
	assert(old_grp == NULL || old_grp->is_excl);

	switch (rechdr->type) {
	case MLD_RECTYPE_IS_EXCLUDE:
	case MLD_RECTYPE_CHANGE_TO_EXCLUDE:
		/* this always replaces or creates state */
		is_excl = true;
		if (!grp)
			grp = gm_sg_make(pkt->iface, rechdr->grp, PIMADDR_ANY);

		item = gm_packet_sg_setup(pkt, grp, is_excl, false);
		item->n_exclude = n_src;

		/* [EXCL_INCL_SG_NOTE] referenced below
		 *
		 * in theory, we should drop any S,G that the host may have
		 * previously added in INCLUDE mode.  In practice, this is both
		 * incredibly rare and entirely irrelevant.  It only makes any
		 * difference if an S,G that the host previously had on the
		 * INCLUDE list is now on the blocked list for EXCLUDE, which
		 * we can cover in processing the S,G list in pass2_excl().
		 *
		 * Other S,G from the host are simply left to expire
		 * "naturally" through general expiry.
		 */
		break;

	case MLD_RECTYPE_IS_INCLUDE:
	case MLD_RECTYPE_CHANGE_TO_INCLUDE:
		if (old_grp) {
			/* INCLUDE has no *,G state, so old_grp here refers to
			 * previous EXCLUDE => delete it
			 */
			gm_packet_sg_drop(old_grp);
			gm_sg_update(grp, false);
			CPP_NOTICE("need S,G PRUNE => NO_INFO transition here");
		}
		break;

	case MLD_RECTYPE_ALLOW_NEW_SOURCES:
		if (old_grp) {
			/* remove S,Gs from EXCLUDE, and then we're done */
			gm_packet_sg_remove_sources(pkt->iface, subscriber,
						    rechdr->grp, rechdr->srcs,
						    n_src, GM_SUB_NEG);
			return;
		}
		/* in INCLUDE mode => ALLOW_NEW_SOURCES is functionally
		 * identical to IS_INCLUDE (because the list of sources in
		 * IS_INCLUDE is not exhaustive)
		 */
		break;

	case MLD_RECTYPE_BLOCK_OLD_SOURCES:
		if (old_grp) {
			/* this is intentionally not implemented because it
			 * would be complicated as hell.  we only take the list
			 * of blocked sources from full group state records
			 */
			return;
		}

		if (subscriber)
			gm_packet_sg_remove_sources(pkt->iface, subscriber,
						    rechdr->grp, rechdr->srcs,
						    n_src, GM_SUB_POS);
		return;
	}

	/* queue the record's sources for pass-2 state creation: positive
	 * S,G items for INCLUDE, blocked-source items (following the *,G
	 * item added above) for EXCLUDE
	 */
	for (j = 0; j < n_src; j++) {
		struct gm_sg *sg;

		sg = gm_sg_find(pkt->iface, rechdr->grp, rechdr->srcs[j]);
		if (!sg)
			sg = gm_sg_make(pkt->iface, rechdr->grp,
					rechdr->srcs[j]);

		gm_packet_sg_setup(pkt, sg, is_excl, true);
	}
}
+
+/* second pass: creating/updating/refreshing state.  All the items from the
+ * received packet have already been thrown into gm_packet_state.
+ */
+
/* pass-2 handler for one INCLUDE-mode item: install it on the S,G's
 * positive list, replacing any previous record from the same subscriber,
 * and refresh the entry's liveness.
 */
static void gm_handle_v2_pass2_incl(struct gm_packet_state *pkt, size_t i)
{
	struct gm_packet_sg *item = &pkt->items[i];
	struct gm_packet_sg *old = NULL;
	struct gm_sg *sg = item->sg;

	/* EXCLUDE state was already dropped in pass1 */
	assert(!gm_packet_sg_find(sg, GM_SUB_NEG, pkt->subscriber));

	/* older positive record from this subscriber is superseded */
	old = gm_packet_sg_find(sg, GM_SUB_POS, pkt->subscriber);
	if (old)
		gm_packet_sg_drop(old);

	pkt->n_active++;
	gm_packet_sg_subs_add(sg->subs_positive, item);

	sg->most_recent = item;
	gm_sg_expiry_cancel(sg);
	gm_sg_update(sg, false);
}
+
/* pass-2 handler for one EXCLUDE-mode *,G item plus its trailing blocked-
 * source items (pkt->items[offs .. offs + n_exclude]): replace any previous
 * EXCLUDE record from the same subscriber and install the new state.
 */
static void gm_handle_v2_pass2_excl(struct gm_packet_state *pkt, size_t offs)
{
	struct gm_packet_sg *item = &pkt->items[offs];
	struct gm_packet_sg *old_grp, *item_dup;
	struct gm_sg *sg_grp = item->sg;
	size_t i;

	old_grp = gm_packet_sg_find(sg_grp, GM_SUB_POS, pkt->subscriber);
	if (old_grp) {
		/* for sources that stay blocked, drop the old records so
		 * the new packet's items can take their place
		 */
		for (i = 0; i < item->n_exclude; i++) {
			struct gm_packet_sg *item_src, *old_src;

			item_src = &pkt->items[offs + 1 + i];
			old_src = gm_packet_sg_find(item_src->sg, GM_SUB_NEG,
						    pkt->subscriber);
			if (old_src)
				gm_packet_sg_drop(old_src);

			/* See [EXCL_INCL_SG_NOTE] above - we can have old S,G
			 * items left over if the host previously had INCLUDE
			 * mode going.  Remove them here if we find any.
			 */
			old_src = gm_packet_sg_find(item_src->sg, GM_SUB_POS,
						    pkt->subscriber);
			if (old_src)
				gm_packet_sg_drop(old_src);
		}

		/* the previous loop has removed the S,G entries which are
		 * still excluded after this update.  So anything left on the
		 * old item was previously excluded but is now included
		 * => need to trigger update on S,G
		 */
		for (i = 0; i < old_grp->n_exclude; i++) {
			struct gm_packet_sg *old_src;
			struct gm_sg *old_sg_src;

			old_src = old_grp + 1 + i;
			old_sg_src = old_src->sg;
			if (!old_sg_src)
				continue;

			gm_packet_sg_drop(old_src);
			gm_sg_update(old_sg_src, false);
		}

		gm_packet_sg_drop(old_grp);
	}

	item_dup = gm_packet_sg_subs_add(sg_grp->subs_positive, item);
	assert(!item_dup);
	pkt->n_active++;

	sg_grp->most_recent = item;
	gm_sg_expiry_cancel(sg_grp);

	/* register the blocked sources; if an entry from this subscriber is
	 * already present for an S,G, keep that one and void this item
	 */
	for (i = 0; i < item->n_exclude; i++) {
		struct gm_packet_sg *item_src;

		item_src = &pkt->items[offs + 1 + i];
		item_dup = gm_packet_sg_subs_add(item_src->sg->subs_negative,
						 item_src);

		if (item_dup)
			item_src->sg = NULL;
		else {
			pkt->n_active++;
			gm_sg_update(item_src->sg, false);
		}
	}

	/* TODO: determine best ordering between gm_sg_update(S,G) and (*,G)
	 * to get lower PIM churn/flapping
	 */
	gm_sg_update(sg_grp, false);
}
+
+CPP_NOTICE("TODO: QRV/QQIC are not copied from queries to local state");
+/* on receiving a query, we need to update our robustness/query interval to
+ * match, so we correctly process group/source specific queries after last
+ * member leaves
+ */
+
/* parse and process one received MLDv2 report.  data/len cover the
 * payload following the ICMPv6 header; pkt_src is the sender (used as the
 * subscriber identity).
 */
static void gm_handle_v2_report(struct gm_if *gm_ifp,
				const struct sockaddr_in6 *pkt_src, char *data,
				size_t len)
{
	struct mld_v2_report_hdr *hdr;
	size_t i, n_records, max_entries;
	struct gm_packet_state *pkt;

	if (len < sizeof(*hdr)) {
		if (PIM_DEBUG_IGMP_PACKETS)
			zlog_debug(log_pkt_src(
				"malformed MLDv2 report (truncated header)"));
		return;
	}

	hdr = (struct mld_v2_report_hdr *)data;
	data += sizeof(*hdr);
	len -= sizeof(*hdr);

	/* can't have more *,G and S,G items than there is space for ipv6
	 * addresses, so just use this to allocate temporary buffer
	 */
	max_entries = len / sizeof(pim_addr);
	pkt = XCALLOC(MTYPE_GM_STATE,
		      offsetof(struct gm_packet_state, items[max_entries]));
	pkt->n_sg = max_entries;
	pkt->iface = gm_ifp;
	/* may be NULL if this sender was never seen before */
	pkt->subscriber = gm_subscriber_findref(gm_ifp, pkt_src->sin6_addr);

	n_records = ntohs(hdr->n_records);

	/* validate & remove state in v2_pass1() */
	for (i = 0; i < n_records; i++) {
		struct mld_v2_rec_hdr *rechdr;
		size_t n_src, record_size;

		if (len < sizeof(*rechdr)) {
			zlog_warn(log_pkt_src(
				"malformed MLDv2 report (truncated record header)"));
			break;
		}

		rechdr = (struct mld_v2_rec_hdr *)data;
		data += sizeof(*rechdr);
		len -= sizeof(*rechdr);

		n_src = ntohs(rechdr->n_src);
		/* aux_len is in 32-bit words per the MLDv2 record format */
		record_size = n_src * sizeof(pim_addr) + rechdr->aux_len * 4;

		if (len < record_size) {
			zlog_warn(log_pkt_src(
				"malformed MLDv2 report (truncated source list)"));
			break;
		}
		if (!IN6_IS_ADDR_MULTICAST(&rechdr->grp)) {
			zlog_warn(
				log_pkt_src(
					"malformed MLDv2 report (invalid group %pI6)"),
				&rechdr->grp);
			break;
		}

		data += record_size;
		len -= record_size;

		gm_handle_v2_pass1(pkt, rechdr);
	}

	/* pass1 queued nothing => no state to create, throw pkt away */
	if (!pkt->n_active) {
		gm_subscriber_drop(&pkt->subscriber);
		XFREE(MTYPE_GM_STATE, pkt);
		return;
	}

	/* shrink the allocation down to the items pass1 actually queued */
	pkt = XREALLOC(MTYPE_GM_STATE, pkt,
		       offsetof(struct gm_packet_state, items[pkt->n_active]));
	pkt->n_sg = pkt->n_active;
	pkt->n_active = 0;

	monotime(&pkt->received);
	if (!pkt->subscriber)
		pkt->subscriber = gm_subscriber_get(gm_ifp, pkt_src->sin6_addr);
	gm_packets_add_tail(pkt->subscriber->packets, pkt);
	gm_packet_expires_add_tail(gm_ifp->expires, pkt);

	/* pass 2: create/replace state.  An EXCLUDE *,G item owns the
	 * n_exclude source items that follow it - skip over those.
	 */
	for (i = 0; i < pkt->n_sg; i++)
		if (!pkt->items[i].is_excl)
			gm_handle_v2_pass2_incl(pkt, i);
		else {
			gm_handle_v2_pass2_excl(pkt, i);
			i += pkt->items[i].n_exclude;
		}

	if (pkt->n_active == 0)
		gm_packet_free(pkt);
}
+
/* Parse an MLDv1 report.  Mapped onto the MLDv2 handling code: treated as
 * an EXCLUDE {} ("join") for the one group in the packet, attributed to the
 * shared gm_dummy_untracked subscriber since v1 has no per-host tracking.
 */
static void gm_handle_v1_report(struct gm_if *gm_ifp,
                                const struct sockaddr_in6 *pkt_src, char *data,
                                size_t len)
{
        struct mld_v1_pkt *hdr;
        struct gm_packet_state *pkt;
        struct gm_sg *grp;
        struct gm_packet_sg *item;
        size_t max_entries;

        if (len < sizeof(*hdr)) {
                if (PIM_DEBUG_IGMP_PACKETS)
                        zlog_debug(log_pkt_src("malformed MLDv1 report (truncated)"));
                return;
        }

        hdr = (struct mld_v1_pkt *)data;

        /* a v1 report carries exactly one group */
        max_entries = 1;
        pkt = XCALLOC(MTYPE_GM_STATE,
                      offsetof(struct gm_packet_state, items[max_entries]));
        pkt->n_sg = max_entries;
        pkt->iface = gm_ifp;
        pkt->subscriber = gm_subscriber_findref(gm_ifp, gm_dummy_untracked);

        /* { equivalent of gm_handle_v2_pass1() with IS_EXCLUDE */

        grp = gm_sg_find(pkt->iface, hdr->grp, PIMADDR_ANY);
        if (!grp)
                grp = gm_sg_make(pkt->iface, hdr->grp, PIMADDR_ANY);

        item = gm_packet_sg_setup(pkt, grp, true, false);
        item->n_exclude = 0;
        CPP_NOTICE("set v1-seen timer on grp here");

        /* } */

        /* pass2 will count n_active back up to 1.  Also since a v1 report
         * has exactly 1 group, we can skip the realloc() that v2 needs here.
         */
        assert(pkt->n_active == 1);
        pkt->n_sg = pkt->n_active;
        pkt->n_active = 0;

        monotime(&pkt->received);
        if (!pkt->subscriber)
                pkt->subscriber = gm_subscriber_get(gm_ifp, gm_dummy_untracked);
        gm_packets_add_tail(pkt->subscriber->packets, pkt);
        gm_packet_expires_add_tail(gm_ifp->expires, pkt);

        /* pass2 covers installing state & removing old state;  all the v1
         * compat is handled at this point.
         *
         * Note that "old state" may be v2;  subscribers will switch from v2
         * reports to v1 reports when the querier changes from v2 to v1.  So,
         * limiting this to v1 would be wrong.
         */
        gm_handle_v2_pass2_excl(pkt, 0);

        if (pkt->n_active == 0)
                gm_packet_free(pkt);
}
+
/* Parse an MLDv1 "done" (leave) message.  Equivalent to a v2 TO_INCLUDE {}
 * on the untracked dummy subscriber: drop our *,G item for the group (if
 * any) and let gm_sg_update() react to the membership change.
 */
static void gm_handle_v1_leave(struct gm_if *gm_ifp,
                               const struct sockaddr_in6 *pkt_src, char *data,
                               size_t len)
{
        struct mld_v1_pkt *hdr;
        struct gm_subscriber *subscriber;
        struct gm_sg *grp;
        struct gm_packet_sg *old_grp;

        if (len < sizeof(*hdr)) {
                if (PIM_DEBUG_IGMP_PACKETS)
                        zlog_debug(log_pkt_src("malformed MLDv1 leave (truncated)"));
                return;
        }

        hdr = (struct mld_v1_pkt *)data;

        /* all v1 state lives on the shared dummy subscriber;  if that does
         * not exist there is no v1 state to tear down
         */
        subscriber = gm_subscriber_findref(gm_ifp, gm_dummy_untracked);
        if (!subscriber)
                return;

        /* { equivalent of gm_handle_v2_pass1() with IS_INCLUDE */

        grp = gm_sg_find(gm_ifp, hdr->grp, PIMADDR_ANY);
        if (grp) {
                old_grp = gm_packet_sg_find(grp, GM_SUB_POS, subscriber);
                if (old_grp) {
                        gm_packet_sg_drop(old_grp);
                        gm_sg_update(grp, false);
                        CPP_NOTICE("need S,G PRUNE => NO_INFO transition here");
                }
        }

        /* } */

        /* nothing more to do here, pass2 is no-op for leaves */
        gm_subscriber_drop(&subscriber);
}
+
+/* for each general query received (or sent), a timer is started to expire
+ * _everything_ at the appropriate time (including robustness multiplier).
+ *
+ * So when this timer hits, all packets - with all of their items - that were
+ * received *before* the query are aged out, and state updated accordingly.
+ * Note that when we receive a refresh/update, the previous/old packet is
+ * already dropped and replaced with a new one, so in normal steady-state
+ * operation, this timer won't be doing anything.
+ *
+ * Additionally, if a subscriber actively leaves a group, that goes through
+ * its own path too and won't hit this.  This is really only triggered when a
+ * host straight up disappears.
+ */
/* General expiry timer callback (see block comment above): age out all
 * packets received before the oldest pending general query, then either
 * re-arm for the next pending entry or go idle until the next query.
 */
static void gm_t_expire(struct thread *t)
{
        struct gm_if *gm_ifp = THREAD_ARG(t);
        struct gm_packet_state *pkt;

        zlog_info(log_ifp("general expiry timer"));

        while (gm_ifp->n_pending) {
                struct gm_general_pending *pend = gm_ifp->pending;
                struct timeval remain;
                int64_t remain_ms;

                /* NOTE(review): despite the name, remain_ms appears to be
                 * in microseconds (hence the /1000 for the "ms" log line
                 * below) — confirm against monotime_until().
                 */
                remain_ms = monotime_until(&pend->expiry, &remain);
                if (remain_ms > 0) {
                        /* oldest pending entry is not due yet - re-arm */
                        if (PIM_DEBUG_IGMP_EVENTS)
                                zlog_debug(
                                        log_ifp("next general expiry in %" PRId64 "ms"),
                                        remain_ms / 1000);

                        thread_add_timer_tv(router->master, gm_t_expire, gm_ifp,
                                            &remain, &gm_ifp->t_expire);
                        return;
                }

                /* expires list is ordered by RX time; drop everything that
                 * was received before this pending general query
                 */
                while ((pkt = gm_packet_expires_first(gm_ifp->expires))) {
                        if (timercmp(&pkt->received, &pend->query, >=))
                                break;

                        if (PIM_DEBUG_IGMP_PACKETS)
                                zlog_debug(log_ifp("expire packet %p"), pkt);
                        gm_packet_drop(pkt, true);
                }

                /* shift remaining pending entries down by one */
                gm_ifp->n_pending--;
                memmove(gm_ifp->pending, gm_ifp->pending + 1,
                        gm_ifp->n_pending * sizeof(gm_ifp->pending[0]));
        }

        if (PIM_DEBUG_IGMP_EVENTS)
                zlog_debug(log_ifp("next general expiry waiting for query"));
}
+
+/* NB: the receive handlers will also run when sending packets, since we
+ * receive our own packets back in.
+ */
/* Handle a general query: record a pending full-state expiry (processed by
 * gm_t_expire) at now + expire_wait, keeping at most
 * array_size(gm_ifp->pending) outstanding entries.
 */
static void gm_handle_q_general(struct gm_if *gm_ifp,
                                struct gm_query_timers *timers)
{
        struct timeval now, expiry;
        struct gm_general_pending *pend;

        monotime(&now);
        timeradd(&now, &timers->expire_wait, &expiry);

        while (gm_ifp->n_pending) {
                pend = &gm_ifp->pending[gm_ifp->n_pending - 1];

                if (timercmp(&pend->expiry, &expiry, <))
                        break;

                /* if we end up here, the last item in pending[] has an expiry
                 * later than the expiry for this query.  But our query time
                 * (now) is later than that of the item (because, well, that's
                 * how time works.)  This makes this query meaningless since
                 * it's "supersetted" within the preexisting query
                 */

                if (PIM_DEBUG_IGMP_TRACE_DETAIL)
                        zlog_debug(log_ifp("zapping supersetted general timer %pTVMu"),
                                   &pend->expiry);

                gm_ifp->n_pending--;
                if (!gm_ifp->n_pending)
                        THREAD_OFF(gm_ifp->t_expire);
        }

        /* people might be messing with their configs or something */
        if (gm_ifp->n_pending == array_size(gm_ifp->pending))
                return;

        /* append new pending entry;  pending[] stays sorted by expiry */
        pend = &gm_ifp->pending[gm_ifp->n_pending];
        pend->query = now;
        pend->expiry = expiry;

        if (!gm_ifp->n_pending++) {
                /* first entry - the expiry timer is not running yet */
                if (PIM_DEBUG_IGMP_TRACE)
                        zlog_debug(log_ifp("starting general timer @ 0: %pTVMu"),
                                   &pend->expiry);
                thread_add_timer_tv(router->master, gm_t_expire, gm_ifp,
                                    &timers->expire_wait, &gm_ifp->t_expire);
        } else
                if (PIM_DEBUG_IGMP_TRACE)
                        zlog_debug(log_ifp("appending general timer @ %u: %pTVMu"),
                                   gm_ifp->n_pending, &pend->expiry);
}
+
/* Per-S,G (or *,G) expiry timer, armed from gm_sg_timer_start() when a
 * query was seen but no refreshing report arrived in time.  Drops all
 * positive subscriptions on the entry and updates its state.
 */
static void gm_t_sg_expire(struct thread *t)
{
        struct gm_sg *sg = THREAD_ARG(t);
        struct gm_if *gm_ifp = sg->iface;
        struct gm_packet_sg *item;

        /* the timer is only ever armed in one of the two *_EXPIRING states */
        assertf(sg->state == GM_SG_JOIN_EXPIRING ||
                        sg->state == GM_SG_NOPRUNE_EXPIRING,
                "%pSG%%%s %pTHD", &sg->sgaddr, gm_ifp->ifp->name, t);

        frr_each_safe (gm_packet_sg_subs, sg->subs_positive, item)
                /* this will also drop EXCLUDE mode S,G lists together with
                 * the *,G entry
                 */
                gm_packet_sg_drop(item);

        /* subs_negative items are only timed out together with the *,G entry
         * since we won't get any reports for a group-and-source query
         */
        gm_sg_update(sg, true);
}
+
/* Check whether an S,G entry was refreshed by a report received at or
 * after the reference time "ref" (minus a configured fuzz allowance).
 * Lazily recomputes sg->most_recent from the positive subscriptions if
 * it was previously cleared.  Returns true if the entry is "recent".
 */
static bool gm_sg_check_recent(struct gm_if *gm_ifp, struct gm_sg *sg,
                               struct timeval ref)
{
        struct gm_packet_state *pkt;

        if (!sg->most_recent) {
                struct gm_packet_state *best_pkt = NULL;
                struct gm_packet_sg *item;

                /* rebuild cache: newest RX time among positive subs */
                frr_each (gm_packet_sg_subs, sg->subs_positive, item) {
                        pkt = gm_packet_sg2state(item);

                        if (!best_pkt ||
                            timercmp(&pkt->received, &best_pkt->received, >)) {
                                best_pkt = pkt;
                                sg->most_recent = item;
                        }
                }
        }
        if (sg->most_recent) {
                struct timeval fuzz;

                pkt = gm_packet_sg2state(sg->most_recent);

                /* this shouldn't happen on plain old real ethernet segment,
                 * but on something like a VXLAN or VPLS it is very possible
                 * that we get a report before the query that triggered it.
                 * (imagine a triangle scenario with 3 datacenters, it's very
                 * possible A->B + B->C is faster than A->C due to odd routing)
                 *
                 * This makes a little tolerance allowance to handle that case.
                 */
                timeradd(&pkt->received, &gm_ifp->cfg_timing_fuzz, &fuzz);

                if (timercmp(&fuzz, &ref, >))
                        return true;
        }
        return false;
}
+
/* (Re)arm the expiry timer on an S,G entry in response to a query.
 * No-op if the entry doesn't exist, is already PRUNEd, or a sufficiently
 * recent report covers it.  An already-running timer is only shortened,
 * never extended.
 */
static void gm_sg_timer_start(struct gm_if *gm_ifp, struct gm_sg *sg,
                              struct timeval expire_wait)
{
        struct timeval now;

        if (!sg)
                return;
        if (sg->state == GM_SG_PRUNE)
                return;

        monotime(&now);
        if (gm_sg_check_recent(gm_ifp, sg, now))
                return;

        if (PIM_DEBUG_IGMP_TRACE)
                zlog_debug(log_sg(sg, "expiring in %pTVI"), &expire_wait);

        if (sg->t_sg_expire) {
                struct timeval remain;

                /* keep the earlier of the two deadlines */
                remain = thread_timer_remain(sg->t_sg_expire);
                if (timercmp(&remain, &expire_wait, <=))
                        return;

                THREAD_OFF(sg->t_sg_expire);
        }

        thread_add_timer_tv(router->master, gm_t_sg_expire, sg, &expire_wait,
                            &sg->t_sg_expire);
}
+
+static void gm_handle_q_groupsrc(struct gm_if *gm_ifp,
+                                struct gm_query_timers *timers, pim_addr grp,
+                                const pim_addr *srcs, size_t n_src)
+{
+       struct gm_sg *sg;
+       size_t i;
+
+       for (i = 0; i < n_src; i++) {
+               sg = gm_sg_find(gm_ifp, grp, srcs[i]);
+               gm_sg_timer_start(gm_ifp, sg, timers->expire_wait);
+       }
+}
+
static void gm_t_grp_expire(struct thread *t)
{
        /* if we're here, that means when we received the group-specific query
         * there was one or more active S,G for this group.  For *,G the timer
         * in sg->t_sg_expire is running separately and gets cancelled when we
         * receive a report, so that work is left to gm_t_sg_expire and we
         * shouldn't worry about it here.
         */
        struct gm_grp_pending *pend = THREAD_ARG(t);
        struct gm_if *gm_ifp = pend->iface;
        struct gm_sg *sg, *sg_start, sg_ref;

        if (PIM_DEBUG_IGMP_EVENTS)
                zlog_debug(log_ifp("*,%pPAs S,G timer expired"), &pend->grp);

        /* gteq lookup - try to find *,G or S,G  (S,G is > *,G)
         * could technically be gt to skip a possible *,G
         */
        sg_ref.sgaddr.grp = pend->grp;
        sg_ref.sgaddr.src = PIMADDR_ANY;
        sg_start = gm_sgs_find_gteq(gm_ifp->sgs, &sg_ref);

        /* walk all S,G entries of this group */
        frr_each_from (gm_sgs, gm_ifp->sgs, sg, sg_start) {
                struct gm_packet_sg *item;

                if (pim_addr_cmp(sg->sgaddr.grp, pend->grp))
                        break;
                if (pim_addr_is_any(sg->sgaddr.src))
                        /* handled by gm_t_sg_expire / sg->t_sg_expire */
                        continue;
                if (gm_sg_check_recent(gm_ifp, sg, pend->query))
                        continue;

                /* we may also have a group-source-specific query going on in
                 * parallel.  But if we received nothing for the *,G query,
                 * the S,G query is kinda irrelevant.
                 */
                THREAD_OFF(sg->t_sg_expire);

                frr_each_safe (gm_packet_sg_subs, sg->subs_positive, item)
                        /* this will also drop the EXCLUDE S,G lists */
                        gm_packet_sg_drop(item);

                gm_sg_update(sg, true);
        }

        /* pending entry is one-shot; free it after firing */
        gm_grp_pends_del(gm_ifp->grp_pends, pend);
        XFREE(MTYPE_GM_GRP_PENDING, pend);
}
+
/* Handle a group-specific query: arm the *,G expiry timer if we have a
 * *,G entry, and schedule a gm_grp_pending (gm_t_grp_expire) run if any
 * S,G entries exist for the group.
 */
static void gm_handle_q_group(struct gm_if *gm_ifp,
                              struct gm_query_timers *timers, pim_addr grp)
{
        struct gm_sg *sg, sg_ref;
        struct gm_grp_pending *pend, pend_ref;

        sg_ref.sgaddr.grp = grp;
        sg_ref.sgaddr.src = PIMADDR_ANY;
        /* gteq lookup - try to find *,G or S,G  (S,G is > *,G) */
        sg = gm_sgs_find_gteq(gm_ifp->sgs, &sg_ref);

        if (!sg || pim_addr_cmp(sg->sgaddr.grp, grp))
                /* we have nothing at all for this group - don't waste RAM */
                return;

        if (pim_addr_is_any(sg->sgaddr.src)) {
                /* actually found *,G entry here */
                if (PIM_DEBUG_IGMP_TRACE)
                        zlog_debug(log_ifp("*,%pPAs expiry timer starting"),
                                   &grp);
                gm_sg_timer_start(gm_ifp, sg, timers->expire_wait);

                sg = gm_sgs_next(gm_ifp->sgs, sg);
                if (!sg || pim_addr_cmp(sg->sgaddr.grp, grp))
                        /* no S,G for this group */
                        return;
        }

        /* at least one S,G exists - (re)schedule the group pending entry */
        pend_ref.grp = grp;
        pend = gm_grp_pends_find(gm_ifp->grp_pends, &pend_ref);

        if (pend) {
                struct timeval remain;

                /* only shorten an existing pending timer, never extend */
                remain = thread_timer_remain(pend->t_expire);
                if (timercmp(&remain, &timers->expire_wait, <=))
                        return;

                THREAD_OFF(pend->t_expire);
        } else {
                pend = XCALLOC(MTYPE_GM_GRP_PENDING, sizeof(*pend));
                pend->grp = grp;
                pend->iface = gm_ifp;
                gm_grp_pends_add(gm_ifp->grp_pends, pend);
        }

        monotime(&pend->query);
        thread_add_timer_tv(router->master, gm_t_grp_expire, pend,
                            &timers->expire_wait, &pend->t_expire);

        if (PIM_DEBUG_IGMP_TRACE)
                zlog_debug(log_ifp("*,%pPAs S,G timer started: %pTHD"), &grp,
                           pend->t_expire);
}
+
+static void gm_bump_querier(struct gm_if *gm_ifp)
+{
+       struct pim_interface *pim_ifp = gm_ifp->ifp->info;
+
+       THREAD_OFF(gm_ifp->t_query);
+
+       if (pim_addr_is_any(pim_ifp->ll_lowest))
+               return;
+       if (!IPV6_ADDR_SAME(&gm_ifp->querier, &pim_ifp->ll_lowest))
+               return;
+
+       gm_ifp->n_startup = gm_ifp->cur_qrv;
+
+       thread_execute(router->master, gm_t_query, gm_ifp, 0);
+}
+
/* Other-querier-present timeout: nothing heard from the previously
 * elected (lower-address) querier, so claim the querier role ourselves
 * and start a query burst.
 */
static void gm_t_other_querier(struct thread *t)
{
        struct gm_if *gm_ifp = THREAD_ARG(t);
        struct pim_interface *pim_ifp = gm_ifp->ifp->info;

        zlog_info(log_ifp("other querier timer expired"));

        gm_ifp->querier = pim_ifp->ll_lowest;
        gm_ifp->n_startup = gm_ifp->cur_qrv;

        thread_execute(router->master, gm_t_query, gm_ifp, 0);
}
+
+static void gm_handle_query(struct gm_if *gm_ifp,
+                           const struct sockaddr_in6 *pkt_src,
+                           pim_addr *pkt_dst, char *data, size_t len)
+{
+       struct mld_v2_query_hdr *hdr;
+       struct pim_interface *pim_ifp = gm_ifp->ifp->info;
+       struct gm_query_timers timers;
+       bool general_query;
+
+       if (len < sizeof(struct mld_v2_query_hdr) &&
+           len != sizeof(struct mld_v1_pkt)) {
+               zlog_warn(log_pkt_src("invalid query size"));
+               return;
+       }
+
+       hdr = (struct mld_v2_query_hdr *)data;
+       general_query = pim_addr_is_any(hdr->grp);
+
+       if (!general_query && !IN6_IS_ADDR_MULTICAST(&hdr->grp)) {
+               zlog_warn(log_pkt_src(
+                                 "malformed MLDv2 query (invalid group %pI6)"),
+                         &hdr->grp);
+               return;
+       }
+
+       if (len >= sizeof(struct mld_v2_query_hdr)) {
+               size_t src_space = ntohs(hdr->n_src) * sizeof(pim_addr);
+
+               if (len < sizeof(struct mld_v2_query_hdr) + src_space) {
+                       zlog_warn(log_pkt_src(
+                               "malformed MLDv2 query (truncated source list)"));
+                       return;
+               }
+
+               if (general_query && src_space) {
+                       zlog_warn(log_pkt_src(
+                               "malformed MLDv2 query (general query with non-empty source list)"));
+                       return;
+               }
+       }
+
+       /* accepting queries unicast to us (or addressed to a wrong group)
+        * can mess up querier election as well as cause us to terminate
+        * traffic (since after a unicast query no reports will be coming in)
+        */
+       if (!IPV6_ADDR_SAME(pkt_dst, &gm_all_hosts)) {
+               if (pim_addr_is_any(hdr->grp)) {
+                       zlog_warn(
+                               log_pkt_src(
+                                       "wrong destination %pPA for general query"),
+                               pkt_dst);
+                       return;
+               }
+
+               if (!IPV6_ADDR_SAME(&hdr->grp, pkt_dst)) {
+                       zlog_warn(
+                               log_pkt_src(
+                                       "wrong destination %pPA for group specific query"),
+                               pkt_dst);
+                       return;
+               }
+       }
+
+       if (IPV6_ADDR_CMP(&pkt_src->sin6_addr, &gm_ifp->querier) < 0) {
+               if (PIM_DEBUG_IGMP_EVENTS)
+                       zlog_debug(log_pkt_src("replacing elected querier %pPA"),
+                                  &gm_ifp->querier);
+
+               gm_ifp->querier = pkt_src->sin6_addr;
+       }
+
+       if (len == sizeof(struct mld_v1_pkt)) {
+               timers.qrv = gm_ifp->cur_qrv;
+               timers.max_resp_ms = hdr->max_resp_code;
+               timers.qqic_ms = gm_ifp->cur_query_intv;
+       } else {
+               timers.qrv = (hdr->flags & 0x7) ?: 8;
+               timers.max_resp_ms = mld_max_resp_decode(hdr->max_resp_code);
+               timers.qqic_ms = igmp_msg_decode8to16(hdr->qqic) * 1000;
+       }
+       timers.fuzz = gm_ifp->cfg_timing_fuzz;
+
+       gm_expiry_calc(&timers);
+
+       if (PIM_DEBUG_IGMP_TRACE_DETAIL)
+               zlog_debug(
+                       log_ifp("query timers: QRV=%u max_resp=%ums qqic=%ums expire_wait=%pTVI"),
+                       timers.qrv, timers.max_resp_ms, timers.qqic_ms,
+                       &timers.expire_wait);
+
+       if (IPV6_ADDR_CMP(&pkt_src->sin6_addr, &pim_ifp->ll_lowest) < 0) {
+               unsigned other_ms;
+
+               THREAD_OFF(gm_ifp->t_query);
+               THREAD_OFF(gm_ifp->t_other_querier);
+
+               other_ms = timers.qrv * timers.qqic_ms + timers.max_resp_ms / 2;
+               thread_add_timer_msec(router->master, gm_t_other_querier,
+                                     gm_ifp, other_ms,
+                                     &gm_ifp->t_other_querier);
+       }
+
+       if (len == sizeof(struct mld_v1_pkt)) {
+               if (general_query)
+                       gm_handle_q_general(gm_ifp, &timers);
+               else
+                       gm_handle_q_group(gm_ifp, &timers, hdr->grp);
+               return;
+       }
+
+       /* v2 query - [S]uppress bit */
+       if (hdr->flags & 0x8)
+               return;
+
+       if (general_query)
+               gm_handle_q_general(gm_ifp, &timers);
+       else if (!ntohs(hdr->n_src))
+               gm_handle_q_group(gm_ifp, &timers, hdr->grp);
+       else
+               gm_handle_q_groupsrc(gm_ifp, &timers, hdr->grp, hdr->srcs,
+                                    ntohs(hdr->n_src));
+}
+
/* Common RX processing for all MLD packets: verify the ICMPv6 checksum
 * over a pseudo-header built from src/dst, then dispatch on ICMPv6 type.
 * NB: zeroes the checksum field in the packet buffer in-place for the
 * recomputation.
 */
static void gm_rx_process(struct gm_if *gm_ifp,
                          const struct sockaddr_in6 *pkt_src, pim_addr *pkt_dst,
                          void *data, size_t pktlen)
{
        struct icmp6_plain_hdr *icmp6 = data;
        uint16_t pkt_csum, ref_csum;
        struct ipv6_ph ph6 = {
                .src = pkt_src->sin6_addr,
                .dst = *pkt_dst,
                .ulpl = htons(pktlen),
                .next_hdr = IPPROTO_ICMPV6,
        };

        pkt_csum = icmp6->icmp6_cksum;
        icmp6->icmp6_cksum = 0;
        ref_csum = in_cksum_with_ph6(&ph6, data, pktlen);

        if (pkt_csum != ref_csum) {
                zlog_warn(
                        log_pkt_src(
                                "(dst %pPA) packet RX checksum failure, expected %04hx, got %04hx"),
                        pkt_dst, pkt_csum, ref_csum);
                return;
        }

        /* handlers receive the payload after the ICMPv6 header */
        data = (icmp6 + 1);
        pktlen -= sizeof(*icmp6);

        switch (icmp6->icmp6_type) {
        case ICMP6_MLD_QUERY:
                gm_handle_query(gm_ifp, pkt_src, pkt_dst, data, pktlen);
                break;
        case ICMP6_MLD_V1_REPORT:
                gm_handle_v1_report(gm_ifp, pkt_src, data, pktlen);
                break;
        case ICMP6_MLD_V1_DONE:
                gm_handle_v1_leave(gm_ifp, pkt_src, data, pktlen);
                break;
        case ICMP6_MLD_V2_REPORT:
                gm_handle_v2_report(gm_ifp, pkt_src, data, pktlen);
                break;
        }
}
+
/* Scan an IPv6 Hop-by-Hop options block for a Router Alert option (RFC
 * 2711) carrying the given alert value (0 = MLD).
 *
 * hopopts points at the raw Hop-by-Hop extension header (next-header byte,
 * length byte, then TLV options); hopopt_len is the number of bytes
 * actually received for it.  Returns true only if a well-formed Router
 * Alert option with value alert_type is present.
 */
static bool ip6_check_hopopts_ra(uint8_t *hopopts, size_t hopopt_len,
                                 uint16_t alert_type)
{
        uint8_t *hopopt_end;

        /* header is at least one 8-byte unit */
        if (hopopt_len < 8)
                return false;
        /* hopopts[1] counts additional 8-byte units;  the advertised size
         * must fit within the data we actually have
         */
        if (hopopt_len < (hopopts[1] + 1U) * 8U)
                return false;

        hopopt_end = hopopts + (hopopts[1] + 1) * 8;
        hopopts += 2;

        while (hopopts < hopopt_end) {
                if (hopopts[0] == IP6OPT_PAD1) {
                        /* Pad1 is a lone zero byte without a length field */
                        hopopts++;
                        continue;
                }

                /* fix: compare remaining byte counts instead of computing
                 * pointers like (hopopt_end - 2 - hopopts[1]), which can
                 * point before the buffer and is undefined behavior in
                 * pointer arithmetic.  The comparisons are logically
                 * equivalent to the originals.
                 */
                if ((size_t)(hopopt_end - hopopts) < 2)
                        break;
                if ((size_t)(hopopt_end - hopopts) < 2U + hopopts[1])
                        break;

                if (hopopts[0] == IP6OPT_ROUTER_ALERT && hopopts[1] == 2) {
                        uint16_t have_type = (hopopts[2] << 8) | hopopts[3];

                        if (have_type == alert_type)
                                return true;
                }

                /* skip TLV: type + length bytes + payload */
                hopopts += 2 + hopopts[1];
        }
        return false;
}
+
/* Socket read callback: receive one MLD packet, validate its ancillary
 * data (pktinfo, hop limit, Router Alert hop-by-hop option) and source
 * address, then hand it to gm_rx_process().  Re-arms itself first.
 */
static void gm_t_recv(struct thread *t)
{
        struct gm_if *gm_ifp = THREAD_ARG(t);
        union {
                char buf[CMSG_SPACE(sizeof(struct in6_pktinfo)) +
                         CMSG_SPACE(256) /* hop options */ +
                         CMSG_SPACE(sizeof(int)) /* hopcount */];
                struct cmsghdr align;
        } cmsgbuf;
        struct cmsghdr *cmsg;
        struct in6_pktinfo *pktinfo = NULL;
        uint8_t *hopopts = NULL;
        size_t hopopt_len = 0;
        int *hoplimit = NULL;
        char rxbuf[2048];
        struct msghdr mh[1] = {};
        struct iovec iov[1];
        struct sockaddr_in6 pkt_src[1];
        ssize_t nread;
        size_t pktlen;

        thread_add_read(router->master, gm_t_recv, gm_ifp, gm_ifp->sock,
                        &gm_ifp->t_recv);

        iov->iov_base = rxbuf;
        iov->iov_len = sizeof(rxbuf);

        mh->msg_name = pkt_src;
        mh->msg_namelen = sizeof(pkt_src);
        mh->msg_control = cmsgbuf.buf;
        mh->msg_controllen = sizeof(cmsgbuf.buf);
        mh->msg_iov = iov;
        mh->msg_iovlen = array_size(iov);
        mh->msg_flags = 0;

        /* first recvmsg with MSG_PEEK | MSG_TRUNC only learns the real
         * packet size so an oversized packet can get a big enough buffer
         */
        nread = recvmsg(gm_ifp->sock, mh, MSG_PEEK | MSG_TRUNC);
        if (nread <= 0) {
                zlog_err(log_ifp("RX error: %m"));
                return;
        }

        if ((size_t)nread > sizeof(rxbuf)) {
                iov->iov_base = XMALLOC(MTYPE_GM_PACKET, nread);
                iov->iov_len = nread;
        }
        /* second recvmsg actually consumes the packet */
        nread = recvmsg(gm_ifp->sock, mh, 0);
        if (nread <= 0) {
                zlog_err(log_ifp("RX error: %m"));
                goto out_free;
        }

        /* ignore packets that came in on another interface */
        if ((int)pkt_src->sin6_scope_id != gm_ifp->ifp->ifindex)
                goto out_free;

        /* walk ancillary data for pktinfo / hop options / hop limit */
        for (cmsg = CMSG_FIRSTHDR(mh); cmsg; cmsg = CMSG_NXTHDR(mh, cmsg)) {
                if (cmsg->cmsg_level != SOL_IPV6)
                        continue;

                switch (cmsg->cmsg_type) {
                case IPV6_PKTINFO:
                        pktinfo = (struct in6_pktinfo *)CMSG_DATA(cmsg);
                        break;
                case IPV6_HOPOPTS:
                        hopopts = CMSG_DATA(cmsg);
                        hopopt_len = cmsg->cmsg_len - sizeof(*cmsg);
                        break;
                case IPV6_HOPLIMIT:
                        hoplimit = (int *)CMSG_DATA(cmsg);
                        break;
                }
        }

        if (!pktinfo || !hoplimit) {
                /* the socket is set up to always deliver these */
                zlog_err(log_ifp(
                        "BUG: packet without IPV6_PKTINFO or IPV6_HOPLIMIT"));
                goto out_free;
        }

        /* MLD packets must be link-local scoped: hop limit 1 */
        if (*hoplimit != 1) {
                zlog_err(log_pkt_src("packet with hop limit != 1"));
                goto out_free;
        }

        if (!ip6_check_hopopts_ra(hopopts, hopopt_len, IP6_ALERT_MLD)) {
                zlog_err(log_pkt_src(
                        "packet without IPv6 Router Alert MLD option"));
                goto out_free;
        }

        if (IN6_IS_ADDR_UNSPECIFIED(&pkt_src->sin6_addr))
                /* reports from :: happen in normal operation for DAD, so
                 * don't spam log messages about this
                 */
                goto out_free;

        if (!IN6_IS_ADDR_LINKLOCAL(&pkt_src->sin6_addr)) {
                zlog_warn(log_pkt_src("packet from invalid source address"));
                goto out_free;
        }

        pktlen = nread;
        if (pktlen < sizeof(struct icmp6_plain_hdr)) {
                zlog_warn(log_pkt_src("truncated packet"));
                goto out_free;
        }

        gm_rx_process(gm_ifp, pkt_src, &pktinfo->ipi6_addr, iov->iov_base,
                      pktlen);

out_free:
        /* only free if the oversized-packet path allocated a buffer */
        if (iov->iov_base != rxbuf)
                XFREE(MTYPE_GM_PACKET, iov->iov_base);
}
+
/* Build and send an MLD query.  grp == :: sends a general query to
 * ff02::1; otherwise a group-specific (or, with srcs/n_srcs, a
 * group-and-source specific) query addressed to the group itself.
 * s_bit sets the [S]uppress flag.  The checksum is computed over a
 * pseudo-header iovec that is not part of the actual sendmsg, and the
 * Router Alert hop-by-hop option is attached via IPV6_HOPOPTS cmsg.
 */
static void gm_send_query(struct gm_if *gm_ifp, pim_addr grp,
                          const pim_addr *srcs, size_t n_srcs, bool s_bit)
{
        struct pim_interface *pim_ifp = gm_ifp->ifp->info;
        struct sockaddr_in6 dstaddr = {
                .sin6_family = AF_INET6,
                .sin6_scope_id = gm_ifp->ifp->ifindex,
        };
        struct {
                struct icmp6_plain_hdr hdr;
                struct mld_v2_query_hdr v2_query;
        } query = {
                /* clang-format off */
                .hdr = {
                        .icmp6_type = ICMP6_MLD_QUERY,
                        .icmp6_code = 0,
                },
                .v2_query = {
                        .grp = grp,
                },
                /* clang-format on */
        };
        struct ipv6_ph ph6 = {
                .src = pim_ifp->ll_lowest,
                .ulpl = htons(sizeof(query)),
                .next_hdr = IPPROTO_ICMPV6,
        };
        union {
                char buf[CMSG_SPACE(8)];
                struct cmsghdr align;
        } cmsg = {};
        struct cmsghdr *cmh;
        struct msghdr mh[1] = {};
        struct iovec iov[3];
        size_t iov_len;
        ssize_t ret, expect_ret;
        uint8_t *dp;

        if (if_is_loopback(gm_ifp->ifp)) {
                /* Linux is a bit odd with multicast on loopback */
                ph6.src = in6addr_loopback;
                dstaddr.sin6_addr = in6addr_loopback;
        } else if (pim_addr_is_any(grp))
                dstaddr.sin6_addr = gm_all_hosts;
        else
                dstaddr.sin6_addr = grp;

        query.v2_query.max_resp_code =
                mld_max_resp_encode(gm_ifp->cur_max_resp);
        /* 3-bit QRV field; values >= 8 can't be represented, send 0 */
        query.v2_query.flags = (gm_ifp->cur_qrv < 8) ? gm_ifp->cur_qrv : 0;
        if (s_bit)
                query.v2_query.flags |= 0x08;
        query.v2_query.qqic =
                igmp_msg_encode16to8(gm_ifp->cur_query_intv / 1000);
        query.v2_query.n_src = htons(n_srcs);

        ph6.dst = dstaddr.sin6_addr;

        /* ph6 not included in sendmsg */
        iov[0].iov_base = &ph6;
        iov[0].iov_len = sizeof(ph6);
        iov[1].iov_base = &query;
        if (gm_ifp->cur_version == GM_MLDV1) {
                /* v1 queries are just the shorter v1 packet layout */
                iov_len = 2;
                iov[1].iov_len = sizeof(query.hdr) + sizeof(struct mld_v1_pkt);
        } else if (!n_srcs) {
                iov_len = 2;
                iov[1].iov_len = sizeof(query);
        } else {
                /* source list is a third iovec appended after the header */
                iov[1].iov_len = sizeof(query);
                iov[2].iov_base = (void *)srcs;
                iov[2].iov_len = n_srcs * sizeof(srcs[0]);
                iov_len = 3;
        }

        /* checksum covers pseudo-header + query (+ sources) */
        query.hdr.icmp6_cksum = in_cksumv(iov, iov_len);

        if (PIM_DEBUG_IGMP_PACKETS)
                zlog_debug(log_ifp("MLD query %pPA -> %pI6 (grp=%pPA, %zu srcs)"),
                           &pim_ifp->ll_lowest, &dstaddr.sin6_addr, &grp, n_srcs);

        mh->msg_name = &dstaddr;
        mh->msg_namelen = sizeof(dstaddr);
        mh->msg_iov = iov + 1;          /* skip pseudo-header iovec */
        mh->msg_iovlen = iov_len - 1;
        mh->msg_control = &cmsg;
        mh->msg_controllen = sizeof(cmsg.buf);
        cmh = CMSG_FIRSTHDR(mh);
        cmh->cmsg_level = IPPROTO_IPV6;
        cmh->cmsg_type = IPV6_HOPOPTS;
        cmh->cmsg_len = CMSG_LEN(8);
        dp = CMSG_DATA(cmh);
        *dp++ = 0;                   // next header
        *dp++ = 0;                   // length (8-byte blocks, minus 1)
        *dp++ = IP6OPT_ROUTER_ALERT; // router alert
        *dp++ = 2;                   // length
        *dp++ = 0;                   // value (2 bytes)
        *dp++ = 0;                   // value (2 bytes) (0 = MLD)
        *dp++ = 0;                   // pad0
        *dp++ = 0;                   // pad0

        expect_ret = iov[1].iov_len;
        if (iov_len == 3)
                expect_ret += iov[2].iov_len;

        frr_with_privs (&pimd_privs) {
                ret = sendmsg(gm_ifp->sock, mh, 0);
        }

        /* short write (or error) both land here */
        if (ret != expect_ret)
                zlog_warn(log_ifp("failed to send query: %m"));
}
+
+/* timer: periodic general query.  Re-arms itself and sends a general query;
+ * during startup (n_startup counts down from QRV) queries go out at 1/4 the
+ * configured interval.
+ */
+static void gm_t_query(struct thread *t)
+{
+       struct gm_if *gm_ifp = THREAD_ARG(t);
+       unsigned timer_ms = gm_ifp->cur_query_intv;
+
+       if (gm_ifp->n_startup) {
+               /* startup: first few queries at 1/4 interval */
+               timer_ms /= 4;
+               gm_ifp->n_startup--;
+       }
+
+       /* re-arm before sending so a send failure can't stop the cycle */
+       thread_add_timer_msec(router->master, gm_t_query, gm_ifp, timer_ms,
+                             &gm_ifp->t_query);
+
+       gm_send_query(gm_ifp, PIMADDR_ANY, NULL, 0, false);
+}
+
+/* timer: retransmit of a group/group-and-source specific query (armed by
+ * gm_trigger_specific() while sg->n_query > 0)
+ */
+static void gm_t_sg_query(struct thread *t)
+{
+       struct gm_sg *sg = THREAD_ARG(t);
+
+       gm_trigger_specific(sg);
+}
+
+/* S,G specific queries (triggered by a member leaving) get a little slack
+ * time so we can bundle queries for [S1,S2,S3,...],G into the same query
+ *
+ * NB: consumes pend_gsq - it is removed from the pending list and freed.
+ */
+static void gm_send_specific(struct gm_gsq_pending *pend_gsq)
+{
+       struct gm_if *gm_ifp = pend_gsq->iface;
+
+       gm_send_query(gm_ifp, pend_gsq->grp, pend_gsq->srcs, pend_gsq->n_src,
+                     pend_gsq->s_bit);
+
+       gm_gsq_pends_del(gm_ifp->gsq_pends, pend_gsq);
+       XFREE(MTYPE_GM_GSQ_PENDING, pend_gsq);
+}
+
+/* timer: the aggregation slack time (cfg_timing_fuzz) for a bundled
+ * group-and-source specific query has elapsed - send it out now.
+ */
+static void gm_t_gsq_pend(struct thread *t)
+{
+       struct gm_gsq_pending *pend_gsq = THREAD_ARG(t);
+
+       gm_send_specific(pend_gsq);
+}
+
+/* kick off (or continue) a group/group-and-source specific query for sg.
+ * Decrements sg->n_query and re-arms the retransmit timer while more
+ * queries are outstanding;  S,G queries are aggregated per group via
+ * gm_gsq_pends so multiple sources end up in one query packet.
+ */
+static void gm_trigger_specific(struct gm_sg *sg)
+{
+       struct gm_if *gm_ifp = sg->iface;
+       struct pim_interface *pim_ifp = gm_ifp->ifp->info;
+       struct gm_gsq_pending *pend_gsq, ref;
+
+       sg->n_query--;
+       if (sg->n_query)
+               thread_add_timer_msec(router->master, gm_t_sg_query, sg,
+                                     gm_ifp->cur_query_intv_trig,
+                                     &sg->t_sg_query);
+
+       /* the timer above keeps running even on non-queriers (in case the
+        * election flips), but only the elected querier actually sends
+        */
+       if (!IPV6_ADDR_SAME(&gm_ifp->querier, &pim_ifp->ll_lowest))
+               return;
+       if (gm_ifp->sock == -1)
+               return;
+
+       if (PIM_DEBUG_IGMP_TRACE)
+               zlog_debug(log_sg(sg, "triggered query"));
+
+       if (pim_addr_is_any(sg->sgaddr.src)) {
+               /* group-specific query: no source list, no aggregation */
+               gm_send_query(gm_ifp, sg->sgaddr.grp, NULL, 0, sg->query_sbit);
+               return;
+       }
+
+       ref.grp = sg->sgaddr.grp;
+       ref.s_bit = sg->query_sbit;
+
+       /* bundle with an already-pending query for the same (grp, s_bit) */
+       pend_gsq = gm_gsq_pends_find(gm_ifp->gsq_pends, &ref);
+       if (!pend_gsq) {
+               pend_gsq = XCALLOC(MTYPE_GM_GSQ_PENDING, sizeof(*pend_gsq));
+               pend_gsq->grp = sg->sgaddr.grp;
+               pend_gsq->s_bit = sg->query_sbit;
+               pend_gsq->iface = gm_ifp;
+               gm_gsq_pends_add(gm_ifp->gsq_pends, pend_gsq);
+
+               /* small slack delay so more sources can pile in */
+               thread_add_timer_tv(router->master, gm_t_gsq_pend, pend_gsq,
+                                   &gm_ifp->cfg_timing_fuzz,
+                                   &pend_gsq->t_send);
+       }
+
+       assert(pend_gsq->n_src < array_size(pend_gsq->srcs));
+
+       pend_gsq->srcs[pend_gsq->n_src] = sg->sgaddr.src;
+       pend_gsq->n_src++;
+
+       if (pend_gsq->n_src == array_size(pend_gsq->srcs)) {
+               /* source list full - flush immediately instead of waiting */
+               THREAD_OFF(pend_gsq->t_send);
+               gm_send_specific(pend_gsq);
+               pend_gsq = NULL; /* freed by gm_send_specific() */
+       }
+}
+
+/* bring up MLD operation on an interface:  allocate per-interface state,
+ * open and configure the raw ICMPv6 socket, join ff02::16 and start the
+ * receive thread.  Caller guarantees pim_ifp->mroute_vif_index >= 0 since
+ * MLD RX on Linux only works with multicast routing active (see the long
+ * comment at IPV6_MULTICAST_ALL below.)
+ */
+static void gm_start(struct interface *ifp)
+{
+       struct pim_interface *pim_ifp = ifp->info;
+       struct gm_if *gm_ifp;
+       int ret, intval;
+       struct icmp6_filter filter[1];
+
+       assert(pim_ifp);
+       assert(pim_ifp->pim);
+       assert(pim_ifp->mroute_vif_index >= 0);
+       assert(!pim_ifp->mld);
+
+       gm_ifp = XCALLOC(MTYPE_GM_IFACE, sizeof(*gm_ifp));
+       gm_ifp->ifp = ifp;
+       pim_ifp->mld = gm_ifp;
+       gm_ifp->pim = pim_ifp->pim;
+
+       zlog_info(log_ifp("starting MLD"));
+
+       if (pim_ifp->mld_version == 1)
+               gm_ifp->cur_version = GM_MLDV1;
+       else
+               gm_ifp->cur_version = GM_MLDV2;
+
+       /* hardcoded for dev without CLI */
+       gm_ifp->cur_qrv = 2;
+       gm_ifp->cur_query_intv = pim_ifp->gm_default_query_interval * 1000;
+       gm_ifp->cur_query_intv_trig = gm_ifp->cur_query_intv;
+       gm_ifp->cur_max_resp = 250;
+
+       gm_ifp->cfg_timing_fuzz.tv_sec = 0;
+       gm_ifp->cfg_timing_fuzz.tv_usec = 10 * 1000;
+
+       gm_sgs_init(gm_ifp->sgs);
+       gm_subscribers_init(gm_ifp->subscribers);
+       gm_packet_expires_init(gm_ifp->expires);
+       gm_grp_pends_init(gm_ifp->grp_pends);
+       gm_gsq_pends_init(gm_ifp->gsq_pends);
+
+       /* only care about the 4 MLD message types on this socket */
+       ICMP6_FILTER_SETBLOCKALL(filter);
+       ICMP6_FILTER_SETPASS(ICMP6_MLD_QUERY, filter);
+       ICMP6_FILTER_SETPASS(ICMP6_MLD_V1_REPORT, filter);
+       ICMP6_FILTER_SETPASS(ICMP6_MLD_V1_DONE, filter);
+       ICMP6_FILTER_SETPASS(ICMP6_MLD_V2_REPORT, filter);
+
+       frr_with_privs (&pimd_privs) {
+               gm_ifp->sock = socket(AF_INET6, SOCK_RAW, IPPROTO_ICMPV6);
+               if (gm_ifp->sock < 0) {
+                       zlog_err("(%s) could not create MLD socket: %m",
+                                ifp->name);
+                       return;
+               }
+
+               ret = setsockopt(gm_ifp->sock, SOL_ICMPV6, ICMP6_FILTER, filter,
+                                sizeof(filter));
+               if (ret)
+                       zlog_err("(%s) failed to set ICMP6_FILTER: %m",
+                                ifp->name);
+
+               intval = 1;
+               ret = setsockopt(gm_ifp->sock, SOL_IPV6, IPV6_RECVPKTINFO,
+                                &intval, sizeof(intval));
+               if (ret)
+                       zlog_err("(%s) failed to set IPV6_RECVPKTINFO: %m",
+                                ifp->name);
+
+               intval = 1;
+               ret = setsockopt(gm_ifp->sock, SOL_IPV6, IPV6_RECVHOPOPTS,
+                                &intval, sizeof(intval));
+               if (ret)
+                       zlog_err("(%s) failed to set IPV6_HOPOPTS: %m",
+                                ifp->name);
+
+               intval = 1;
+               ret = setsockopt(gm_ifp->sock, SOL_IPV6, IPV6_RECVHOPLIMIT,
+                                &intval, sizeof(intval));
+               if (ret)
+                       zlog_err("(%s) failed to set IPV6_HOPLIMIT: %m",
+                                ifp->name);
+
+               /* 0 = disable: don't loop our own queries back to ourselves
+                * (was 1, which left looping enabled, contradicting the
+                * error message below)
+                */
+               intval = 0;
+               ret = setsockopt(gm_ifp->sock, SOL_IPV6, IPV6_MULTICAST_LOOP,
+                                &intval, sizeof(intval));
+               if (ret)
+                       zlog_err(
+                               "(%s) failed to disable IPV6_MULTICAST_LOOP: %m",
+                               ifp->name);
+
+               intval = 1;
+               ret = setsockopt(gm_ifp->sock, SOL_IPV6, IPV6_MULTICAST_HOPS,
+                                &intval, sizeof(intval));
+               if (ret)
+                       zlog_err("(%s) failed to set IPV6_MULTICAST_HOPS: %m",
+                                ifp->name);
+
+               /* NB: IPV6_MULTICAST_ALL does not completely bypass multicast
+                * RX filtering in Linux.  It only means "receive all groups
+                * that something on the system has joined".  To actually
+                * receive *all* MLD packets - which is what we need -
+                * multicast routing must be enabled on the interface.  And
+                * this only works for MLD packets specifically.
+                *
+                * For reference, check ip6_mc_input() in net/ipv6/ip6_input.c
+                * and in particular the #ifdef CONFIG_IPV6_MROUTE block there.
+                *
+                * Also note that the code there explicitly checks for the IPv6
+                * router alert MLD option (which is required by the RFC to be
+                * on MLD packets.)  That implies trying to support hosts which
+                * erroneously don't add that option is just not possible.
+                */
+               intval = 1;
+               ret = setsockopt(gm_ifp->sock, SOL_IPV6, IPV6_MULTICAST_ALL,
+                                &intval, sizeof(intval));
+               if (ret)
+                       zlog_info(
+                               "(%s) failed to set IPV6_MULTICAST_ALL: %m (OK on old kernels)",
+                               ifp->name);
+
+               struct ipv6_mreq mreq;
+
+               /* all-MLDv2 group */
+               mreq.ipv6mr_multiaddr = gm_all_routers;
+               mreq.ipv6mr_interface = ifp->ifindex;
+               ret = setsockopt(gm_ifp->sock, SOL_IPV6, IPV6_JOIN_GROUP, &mreq,
+                                sizeof(mreq));
+               if (ret)
+                       zlog_err("(%s) failed to join ff02::16 (all-MLDv2): %m",
+                                ifp->name);
+       }
+
+       thread_add_read(router->master, gm_t_recv, gm_ifp, gm_ifp->sock,
+                       &gm_ifp->t_recv);
+}
+
+/* tear down all MLD state on an interface:  cancel timers, close the
+ * socket, drain & free all tracked packets / pending queries / S,G entries /
+ * subscribers, then free the gm_if itself.  Safe to call when MLD isn't
+ * running (no-op in that case.)
+ */
+void gm_ifp_teardown(struct interface *ifp)
+{
+       struct pim_interface *pim_ifp = ifp->info;
+       struct gm_if *gm_ifp;
+       struct gm_packet_state *pkt;
+       struct gm_grp_pending *pend_grp;
+       struct gm_gsq_pending *pend_gsq;
+       struct gm_subscriber *subscriber;
+       struct gm_sg *sg;
+
+       if (!pim_ifp || !pim_ifp->mld)
+               return;
+
+       gm_ifp = pim_ifp->mld;
+       /* suppresses work that'd be pointless while shutting down */
+       gm_ifp->stopping = true;
+       if (PIM_DEBUG_IGMP_EVENTS)
+               zlog_debug(log_ifp("MLD stop"));
+
+       THREAD_OFF(gm_ifp->t_query);
+       THREAD_OFF(gm_ifp->t_other_querier);
+       THREAD_OFF(gm_ifp->t_recv);
+       THREAD_OFF(gm_ifp->t_expire);
+
+       if (gm_ifp->sock != -1) {
+               close(gm_ifp->sock);
+               gm_ifp->sock = -1;
+       }
+
+       /* dropping all packets also empties the subscribers' packet lists
+        * and the S,G subscription trees, cf. asserts below
+        */
+       while ((pkt = gm_packet_expires_first(gm_ifp->expires)))
+               gm_packet_drop(pkt, false);
+
+       while ((pend_grp = gm_grp_pends_pop(gm_ifp->grp_pends))) {
+               THREAD_OFF(pend_grp->t_expire);
+               XFREE(MTYPE_GM_GRP_PENDING, pend_grp);
+       }
+
+       while ((pend_gsq = gm_gsq_pends_pop(gm_ifp->gsq_pends))) {
+               THREAD_OFF(pend_gsq->t_send);
+               XFREE(MTYPE_GM_GSQ_PENDING, pend_gsq);
+       }
+
+       while ((sg = gm_sgs_pop(gm_ifp->sgs))) {
+               THREAD_OFF(sg->t_sg_expire);
+               assertf(!gm_packet_sg_subs_count(sg->subs_negative), "%pSG",
+                       &sg->sgaddr);
+               assertf(!gm_packet_sg_subs_count(sg->subs_positive), "%pSG",
+                       &sg->sgaddr);
+
+               gm_sg_free(sg);
+       }
+
+       while ((subscriber = gm_subscribers_pop(gm_ifp->subscribers))) {
+               assertf(!gm_packets_count(subscriber->packets), "%pPA",
+                       &subscriber->addr);
+               XFREE(MTYPE_GM_SUBSCRIBER, subscriber);
+       }
+
+       /* _fini for every container _init'd in gm_start (gm_gsq_pends_fini
+        * was previously missing here)
+        */
+       gm_gsq_pends_fini(gm_ifp->gsq_pends);
+       gm_grp_pends_fini(gm_ifp->grp_pends);
+       gm_packet_expires_fini(gm_ifp->expires);
+       gm_subscribers_fini(gm_ifp->subscribers);
+       gm_sgs_fini(gm_ifp->sgs);
+
+       XFREE(MTYPE_GM_IFACE, gm_ifp);
+       pim_ifp->mld = NULL;
+}
+
+/* react to the interface's lowest link-local address changing:  update
+ * cur_ll_lowest, carry querier-ship over to the new address (or give it up
+ * if the address went away / a lower one exists), and if we are (still)
+ * querier, rebind the socket and restart startup queries.
+ */
+static void gm_update_ll(struct interface *ifp)
+{
+       struct pim_interface *pim_ifp = ifp->info;
+       /* NOTE(review): gm_ifp is dereferenced unconditionally below -
+        * presumably callers guarantee pim_ifp->mld != NULL (gm_ifp_update
+        * does); confirm before adding other callers
+        */
+       struct gm_if *gm_ifp = pim_ifp ? pim_ifp->mld : NULL;
+       struct sockaddr_in6 sa = {.sin6_family = AF_INET6};
+       int rc;
+       bool was_querier;
+
+       /* were we querier with the *old* lowest link-local? */
+       was_querier =
+               !IPV6_ADDR_CMP(&gm_ifp->cur_ll_lowest, &gm_ifp->querier) &&
+               !pim_addr_is_any(gm_ifp->querier);
+
+       gm_ifp->cur_ll_lowest = pim_ifp->ll_lowest;
+       if (was_querier)
+               gm_ifp->querier = pim_ifp->ll_lowest;
+       THREAD_OFF(gm_ifp->t_query);
+
+       if (pim_addr_is_any(gm_ifp->cur_ll_lowest)) {
+               if (was_querier)
+                       zlog_info(log_ifp(
+                               "lost link-local address, stopping querier"));
+               return;
+       }
+
+       if (was_querier)
+               zlog_info(log_ifp("new link-local %pPA while querier"),
+                         &gm_ifp->cur_ll_lowest);
+       else if (IPV6_ADDR_CMP(&gm_ifp->cur_ll_lowest, &gm_ifp->querier) < 0 ||
+                pim_addr_is_any(gm_ifp->querier)) {
+               /* our new address is lower than the current querier's (or
+                * there is none) - we win the election
+                */
+               zlog_info(log_ifp("new link-local %pPA, becoming querier"),
+                         &gm_ifp->cur_ll_lowest);
+               gm_ifp->querier = gm_ifp->cur_ll_lowest;
+       } else
+               return;
+
+       /* we're querier */
+       sa.sin6_addr = pim_ifp->ll_lowest;
+       sa.sin6_scope_id = ifp->ifindex;
+
+       frr_with_privs (&pimd_privs) {
+               rc = bind(gm_ifp->sock, (struct sockaddr *)&sa, sizeof(sa));
+       }
+       if (rc)
+               zlog_err(log_ifp("bind to %pPA failed: %m"),
+                        &pim_ifp->ll_lowest);
+
+       /* restart startup query sequence, first one immediately */
+       gm_ifp->n_startup = gm_ifp->cur_qrv;
+       thread_execute(router->master, gm_t_query, gm_ifp, 0);
+}
+
+/* main entry point for interface state/config changes:  starts or tears
+ * down MLD as appropriate, picks up link-local address changes, and applies
+ * changed query-interval / MLD-version config (re-querying if needed.)
+ */
+void gm_ifp_update(struct interface *ifp)
+{
+       struct pim_interface *pim_ifp = ifp->info;
+       struct gm_if *gm_ifp;
+       bool changed = false;
+
+       if (!pim_ifp)
+               return;
+       /* MLD requires an operative interface with multicast routing on */
+       if (!if_is_operative(ifp) || !pim_ifp->pim ||
+           pim_ifp->mroute_vif_index < 0) {
+               gm_ifp_teardown(ifp);
+               return;
+       }
+
+       if (!pim_ifp->mld)
+               gm_start(ifp);
+
+       gm_ifp = pim_ifp->mld;
+       if (IPV6_ADDR_CMP(&pim_ifp->ll_lowest, &gm_ifp->cur_ll_lowest))
+               gm_update_ll(ifp);
+
+       /* config is in seconds, running state in milliseconds */
+       unsigned cfg_query_intv = pim_ifp->gm_default_query_interval * 1000;
+
+       if (gm_ifp->cur_query_intv != cfg_query_intv) {
+               gm_ifp->cur_query_intv = cfg_query_intv;
+               gm_ifp->cur_query_intv_trig = cfg_query_intv;
+               changed = true;
+       }
+
+       enum gm_version cfg_version;
+
+       if (pim_ifp->mld_version == 1)
+               cfg_version = GM_MLDV1;
+       else
+               cfg_version = GM_MLDV2;
+       if (gm_ifp->cur_version != cfg_version) {
+               gm_ifp->cur_version = cfg_version;
+               changed = true;
+       }
+
+       if (changed) {
+               if (PIM_DEBUG_IGMP_TRACE)
+                       zlog_debug(log_ifp("MLD querier config changed, querying"));
+               gm_bump_querier(gm_ifp);
+       }
+}
+
+
+#include "lib/command.h"
+
+#ifndef VTYSH_EXTRACT_PL
+#include "pimd/pim6_mld_clippy.c"
+#endif
+
+/* ad-hoc debug command dumping the full MLD state for one interface:
+ * querier/timers, pending general queries, S,G entries with their positive
+ * and negative subscriptions, and per-subscriber packet tracking state.
+ *
+ * (help strings fixed: each item must end in "\n" per FRR CLI conventions)
+ */
+DEFPY(gm_debug_show,
+      gm_debug_show_cmd,
+      "debug show mld interface IFNAME",
+      DEBUG_STR
+      SHOW_STR
+      "MLD\n"
+      INTERFACE_STR
+      "interface name\n")
+{
+       struct interface *ifp;
+       struct pim_interface *pim_ifp;
+       struct gm_if *gm_ifp;
+
+       ifp = if_lookup_by_name(ifname, VRF_DEFAULT);
+       if (!ifp) {
+               vty_out(vty, "%% no such interface: %pSQq\n", ifname);
+               return CMD_WARNING;
+       }
+
+       pim_ifp = ifp->info;
+       if (!pim_ifp) {
+               vty_out(vty, "%% no PIM state for interface %pSQq\n", ifname);
+               return CMD_WARNING;
+       }
+
+       gm_ifp = pim_ifp->mld;
+       if (!gm_ifp) {
+               vty_out(vty, "%% no MLD state for interface %pSQq\n", ifname);
+               return CMD_WARNING;
+       }
+
+       vty_out(vty, "querier:         %pPA\n", &gm_ifp->querier);
+       vty_out(vty, "ll_lowest:       %pPA\n\n", &pim_ifp->ll_lowest);
+       vty_out(vty, "t_query:         %pTHD\n", gm_ifp->t_query);
+       vty_out(vty, "t_other_querier: %pTHD\n", gm_ifp->t_other_querier);
+       vty_out(vty, "t_recv:          %pTHD\n", gm_ifp->t_recv);
+       vty_out(vty, "t_expire:        %pTHD\n", gm_ifp->t_expire);
+
+       /* history of recent general queries & their implied expiries */
+       vty_out(vty, "\nn_pending: %u\n", gm_ifp->n_pending);
+       for (size_t i = 0; i < gm_ifp->n_pending; i++) {
+               int64_t query, expiry;
+
+               query = monotime_since(&gm_ifp->pending[i].query, NULL);
+               expiry = monotime_until(&gm_ifp->pending[i].expiry, NULL);
+
+               vty_out(vty, "[%zu]: query %"PRId64"ms ago, expiry in %"PRId64"ms\n",
+                       i, query / 1000, expiry / 1000);
+       }
+
+       struct gm_sg *sg;
+       struct gm_packet_state *pkt;
+       struct gm_packet_sg *item;
+       struct gm_subscriber *subscriber;
+
+       vty_out(vty, "\n%zu S,G entries:\n", gm_sgs_count(gm_ifp->sgs));
+       frr_each (gm_sgs, gm_ifp->sgs, sg) {
+               vty_out(vty, "\t%pSG    t_expire=%pTHD\n", &sg->sgaddr,
+                       sg->t_sg_expire);
+
+               vty_out(vty, "\t     @pos:%zu\n",
+                       gm_packet_sg_subs_count(sg->subs_positive));
+               frr_each (gm_packet_sg_subs, sg->subs_positive, item) {
+                       pkt = gm_packet_sg2state(item);
+
+                       vty_out(vty, "\t\t+%s%s [%pPAs %p] %p+%u\n",
+                               item->is_src ? "S" : "",
+                               item->is_excl ? "E" : "",
+                               &pkt->subscriber->addr, pkt->subscriber, pkt,
+                               item->offset);
+
+                       assert(item->sg == sg);
+               }
+               vty_out(vty, "\t     @neg:%zu\n",
+                       gm_packet_sg_subs_count(sg->subs_negative));
+               frr_each (gm_packet_sg_subs, sg->subs_negative, item) {
+                       pkt = gm_packet_sg2state(item);
+
+                       vty_out(vty, "\t\t-%s%s [%pPAs %p] %p+%u\n",
+                               item->is_src ? "S" : "",
+                               item->is_excl ? "E" : "",
+                               &pkt->subscriber->addr, pkt->subscriber, pkt,
+                               item->offset);
+
+                       assert(item->sg == sg);
+               }
+       }
+
+       vty_out(vty, "\n%zu subscribers:\n",
+               gm_subscribers_count(gm_ifp->subscribers));
+       frr_each (gm_subscribers, gm_ifp->subscribers, subscriber) {
+               vty_out(vty, "\t%pPA %p %zu packets\n", &subscriber->addr,
+                       subscriber, gm_packets_count(subscriber->packets));
+
+               frr_each (gm_packets, subscriber->packets, pkt) {
+                       vty_out(vty, "\t\t%p %.3fs ago %u of %u items active\n",
+                               pkt,
+                               monotime_since(&pkt->received, NULL) *
+                                       0.000001f,
+                               pkt->n_active, pkt->n_sg);
+
+                       for (size_t i = 0; i < pkt->n_sg; i++) {
+                               item = pkt->items + i;
+
+                               vty_out(vty, "\t\t[%zu]", i);
+
+                               if (!item->sg) {
+                                       vty_out(vty, " inactive\n");
+                                       continue;
+                               }
+
+                               vty_out(vty, " %s%s %pSG nE=%u\n",
+                                       item->is_src ? "S" : "",
+                                       item->is_excl ? "E" : "",
+                                       &item->sg->sgaddr, item->n_exclude);
+                       }
+               }
+       }
+
+       return CMD_SUCCESS;
+}
+
+/* ad-hoc debug command poking robustness (QRV) and max-response-time on a
+ * running interface without going through the proper config path;  bumps
+ * the querier (re-queries) when something actually changed.
+ */
+DEFPY(gm_debug_iface_cfg,
+      gm_debug_iface_cfg_cmd,
+      "debug ipv6 mld {"
+        "robustness (0-7)|"
+       "query-max-response-time (1-8387584)"
+      "}",
+      DEBUG_STR
+      IPV6_STR
+      "Multicast Listener Discovery\n"
+      "QRV\nQRV\n"
+      "maxresp\nmaxresp\n")
+{
+       VTY_DECLVAR_CONTEXT(interface, ifp);
+       struct pim_interface *pim_ifp;
+       struct gm_if *gm_ifp;
+       bool changed = false;
+
+       pim_ifp = ifp->info;
+       if (!pim_ifp) {
+               vty_out(vty, "%% no PIM state for interface %pSQq\n",
+                       ifp->name);
+               return CMD_WARNING;
+       }
+       gm_ifp = pim_ifp->mld;
+       if (!gm_ifp) {
+               vty_out(vty, "%% no MLD state for interface %pSQq\n",
+                       ifp->name);
+               return CMD_WARNING;
+       }
+
+       if (robustness_str && gm_ifp->cur_qrv != robustness) {
+               gm_ifp->cur_qrv = robustness;
+               changed = true;
+       }
+       if (query_max_response_time_str &&
+           gm_ifp->cur_max_resp != query_max_response_time) {
+               gm_ifp->cur_max_resp = query_max_response_time;
+               changed = true;
+       }
+
+       if (changed) {
+               vty_out(vty, "%% MLD querier config changed, bumping\n");
+               gm_bump_querier(gm_ifp);
+       }
+       return CMD_SUCCESS;
+}
+
+/* prototype only exists to silence -Wmissing-prototypes;  pim6_main.c has
+ * a matching extern declaration and calls this at startup
+ */
+void gm_cli_init(void);
+
+/* register the MLD debug CLI commands */
+void gm_cli_init(void)
+{
+       install_element(VIEW_NODE, &gm_debug_show_cmd);
+       install_element(INTERFACE_NODE, &gm_debug_iface_cfg_cmd);
+}
diff --git a/pimd/pim6_mld.h b/pimd/pim6_mld.h
new file mode 100644 (file)
index 0000000..52f7405
--- /dev/null
@@ -0,0 +1,329 @@
+/*
+ * PIMv6 MLD querier
+ * Copyright (C) 2021-2022  David Lamparter for NetDEF, Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the Free
+ * Software Foundation; either version 2 of the License, or (at your option)
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; see the file COPYING; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef PIM6_MLD_H
+#define PIM6_MLD_H
+
+#include "typesafe.h"
+#include "pim_addr.h"
+
+struct thread;
+struct pim_instance;
+struct gm_packet_sg;
+struct gm_if;
+struct channel_oil;
+
+/* see comment below on subs_negative/subs_positive.  The numeric values
+ * double as index into gm_sg->subs[] (union'd with the two named heads.)
+ */
+enum gm_sub_sense {
+       /* negative/pruning: S,G in EXCLUDE */
+       GM_SUB_NEG = 0,
+       /* positive/joining: *,G in EXCLUDE and S,G in INCLUDE */
+       GM_SUB_POS = 1,
+};
+
+/* per-(S,G) state machine.  The _EXPIRING variants indicate a query/expiry
+ * is in progress for the entry (cf. t_sg_expire below.)
+ */
+enum gm_sg_state {
+       GM_SG_NOINFO = 0,
+       GM_SG_JOIN,
+       GM_SG_JOIN_EXPIRING,
+       /* remaining 3 only valid for S,G when *,G in EXCLUDE */
+       GM_SG_PRUNE,
+       GM_SG_NOPRUNE,
+       GM_SG_NOPRUNE_EXPIRING,
+};
+
+/* does this S,G state translate into wanting traffic (i.e. a join)?
+ * Only NOINFO and PRUNE don't.
+ */
+static inline bool gm_sg_state_want_join(enum gm_sg_state state)
+{
+       switch (state) {
+       case GM_SG_NOINFO:
+       case GM_SG_PRUNE:
+               return false;
+       default:
+               return true;
+       }
+}
+
+/* MLD (S,G) state (on an interface)
+ *
+ * group is always != ::, src is :: for (*,G) joins.  sort order in RB tree is
+ * such that sources for a particular group can be iterated by starting at the
+ * group.  For INCLUDE, no (*,G) entry exists, only (S,G).
+ */
+
+PREDECL_RBTREE_UNIQ(gm_packet_sg_subs);
+PREDECL_RBTREE_UNIQ(gm_sgs);
+struct gm_sg {
+       /* key; grp != ::, src == :: for (*,G) - see comment above */
+       pim_sgaddr sgaddr;
+       struct gm_if *iface;
+       struct gm_sgs_item itm;
+
+       enum gm_sg_state state;
+       /* forwarding entry handed to the TIB/MFIB layer */
+       struct channel_oil *oil;
+       bool tib_joined;
+
+       /* if a group- or group-and-source specific query is running
+        * (implies we haven't received any report yet, since it's cancelled
+        * by that)
+        */
+       struct thread *t_sg_expire;
+
+       /* last-member-left triggered queries (group/group-source specific)
+        *
+        * this timer will be running even if we aren't the elected querier,
+        * in case the election result changes midway through.
+        */
+       struct thread *t_sg_query;
+
+       /* we must keep sending (QRV) queries even if we get a positive
+        * response, to make sure other routers are updated.  query_sbit
+        * will be set in that case, since other routers need the *response*,
+        * not the *query*
+        */
+       uint8_t n_query;
+       bool query_sbit;
+
+       /* subs_positive tracks gm_packet_sg resulting in a JOIN, i.e. for
+        * (*,G) it has *EXCLUDE* items, for (S,G) it has *INCLUDE* items.
+        *
+        * subs_negative is always empty for (*,G) and tracks EXCLUDE items
+        * for (S,G).  This means that an (S,G) entry is active as a PRUNE if
+        *   len(src->subs_negative) == len(grp->subs_positive)
+        *   && len(src->subs_positive) == 0
+        * (i.e. all receivers for the group opted to exclude this S,G and
+        * noone did an SSM join for the S,G)
+        */
+       union {
+               /* anonymous struct/union: subs[GM_SUB_NEG] == subs_negative,
+                * subs[GM_SUB_POS] == subs_positive
+                */
+               struct {
+                       struct gm_packet_sg_subs_head subs_negative[1];
+                       struct gm_packet_sg_subs_head subs_positive[1];
+               };
+               struct gm_packet_sg_subs_head subs[2];
+       };
+
+       /* If the elected querier is not ourselves, queries and reports might
+        * get reordered in rare circumstances, i.e. the report could arrive
+        * just a microsecond before the query kicks off the timer.  This can
+        * then result in us thinking there are no more receivers since no
+        * report might be received during the query period.
+        *
+        * To avoid this, keep track of the most recent report for this (S,G)
+        * so we can do a quick check to add just a little bit of slack.
+        *
+        * EXCLUDE S,Gs are never in most_recent.
+        */
+       struct gm_packet_sg *most_recent;
+};
+
+/* host tracking entry.  addr will be one of:
+ *
+ * ::          - used by hosts during address acquisition
+ * ::1         - may show up on some OS for joins by the router itself
+ * link-local  - regular operation by MLDv2 hosts
+ * ffff:..:ffff        - MLDv1 entry (cannot be tracked due to report suppression)
+ *
+ * global scope IPv6 addresses can never show up here
+ */
+PREDECL_HASH(gm_subscribers);
+PREDECL_DLIST(gm_packets);
+struct gm_subscriber {
+       /* hash key - source address of reports, cf. comment above */
+       pim_addr addr;
+       struct gm_subscribers_item itm;
+
+       struct gm_if *iface;
+       size_t refcount;
+
+       /* all live (not fully superseded) packets from this host */
+       struct gm_packets_head packets[1];
+};
+
+/*
+ * MLD join state is kept batched by packet.  Since the timers for all items
+ * in a packet are the same, this reduces the number of timers we're keeping
+ * track of.  It also eases tracking for EXCLUDE state groups because the
+ * excluded sources are in the same packet.  (MLD does not support splitting
+ * that if it exceeds MTU, it's always a full replace for exclude.)
+ *
+ * Since packets may be partially superseded by newer packets, the "active"
+ * field is used to track this.
+ */
+
+/* gm_packet_sg is allocated as part of gm_packet_state, note the items[0]
+ * array at the end of that.  gm_packet_sg is NEVER directly allocated with
+ * XMALLOC/XFREE.
+ */
+struct gm_packet_sg {
+       /* non-NULL as long as this gm_packet_sg is the most recent entry
+        * for (subscriber,S,G).  Cleared to NULL when a newer packet by the
+        * subscriber replaces this item.
+        *
+        * (Old items are kept around so we don't need to realloc/resize
+        * gm_packet_state, which would mess up a whole lot of pointers)
+        */
+       struct gm_sg *sg;
+
+       /* gm_sg -> (subscriber, gm_packet_sg)
+        * only on RB-tree while sg != NULL, i.e. not superseded by newer.
+        */
+       struct gm_packet_sg_subs_item subs_itm;
+
+       bool is_src : 1; /* := (src != ::) */
+       bool is_excl : 1;
+
+       /* for getting back to struct gm_packet_state, cf.
+        * gm_packet_sg2state() below
+        */
+       uint16_t offset;
+
+       /* if this is a group entry in EXCLUDE state, n_exclude counts how
+        * many sources are on the exclude list here.  They follow immediately
+        * after.
+        */
+       uint16_t n_exclude;
+};
+
+/* get the enclosing gm_packet_state from one of its items[] entries
+ * (macro argument parenthesized for safety with expression arguments)
+ */
+#define gm_packet_sg2state(sg)                                                 \
+       container_of((sg), struct gm_packet_state, items[(sg)->offset])
+
+PREDECL_DLIST(gm_packet_expires);
+/* one tracked report packet from one subscriber; items[] is allocated
+ * inline (items[0] trailing array - NB: kept as [0] rather than a C99
+ * flexible array member since these structs can be union-embedded)
+ */
+struct gm_packet_state {
+       struct gm_if *iface;
+       struct gm_subscriber *subscriber;
+       struct gm_packets_item pkt_itm;
+
+       struct timeval received;
+       struct gm_packet_expires_item exp_itm;
+
+       /* n_active starts equal to n_sg;  whenever active is set to false on
+        * an item it is decremented.  When n_active == 0, the packet can be
+        * freed.
+        */
+       uint16_t n_sg, n_active;
+       struct gm_packet_sg items[0];
+};
+
+/* general queries are rather different from group/S,G specific queries;  it's
+ * not particularly efficient or useful to try to shoehorn them into the S,G
+ * timers.  Instead, we keep a history of recent queries and their implied
+ * expiries.  (cf. gm_if->pending[], sorted by expiry.)
+ */
+struct gm_general_pending {
+       struct timeval query, expiry;
+};
+
+/* similarly, group queries also age out S,G entries for the group, but in
+ * this case we only keep one query for each group
+ *
+ * why is this not in the *,G gm_sg?  There may not be one (for INCLUDE mode
+ * groups, or groups we don't know about.)  Also, malicious clients could spam
+ * random group-specific queries to trigger resource exhaustion, so it makes
+ * sense to limit these.
+ */
+PREDECL_RBTREE_UNIQ(gm_grp_pends);
+struct gm_grp_pending {
+       struct gm_grp_pends_item itm;
+       struct gm_if *iface;
+       /* RB-tree key */
+       pim_addr grp;
+
+       /* when the query was seen / when the entries it ages out expire */
+       struct timeval query;
+       struct thread *t_expire;
+};
+
+/* guaranteed MTU for IPv6 is 1280 bytes.  IPv6 header is 40 bytes, MLDv2
+ * query header is 24 bytes, RA option is 8 bytes - leaves 1208 bytes for the
+ * source list, which is 151 IPv6 addresses.  But we may have some more IPv6
+ * extension headers (e.g. IPsec AH), so just cap to 128
+ * (sizes the srcs[] buffer in gm_gsq_pending below)
+ */
+#define MLD_V2Q_MTU_MAX_SOURCES 128
+
+/* group-and-source-specific queries are bundled together, if some host joins
+ * multiple sources it's likely to drop all at the same time.
+ *
+ * Unlike gm_grp_pending, this is only used for aggregation since the S,G
+ * state is kept directly in the gm_sg structure.
+ */
+PREDECL_HASH(gm_gsq_pends);
+struct gm_gsq_pending {
+       struct gm_gsq_pends_item itm;
+
+       struct gm_if *iface;
+       /* fires after the aggregation slack (gm_if->cfg_timing_fuzz) */
+       struct thread *t_send;
+
+       /* (grp, s_bit) together form the hash key */
+       pim_addr grp;
+       bool s_bit;
+
+       /* sources accumulated so far; query is flushed early when full */
+       size_t n_src;
+       pim_addr srcs[MLD_V2Q_MTU_MAX_SOURCES];
+};
+
+
+/* The size of this history is limited by QRV, i.e. there can't be more than
+ * 8 items here.  (sizes gm_if->pending[] below.)
+ */
+#define GM_MAX_PENDING 8
+
+/* which MLD version is in effect on an interface */
+enum gm_version {
+       GM_NONE,
+       GM_MLDV1,
+       GM_MLDV2,
+};
+
+/* per-interface MLD state, hung off struct pim_interface (->mld) */
+struct gm_if {
+       struct interface *ifp;
+       struct pim_instance *pim;
+       struct thread *t_query, *t_other_querier, *t_recv, *t_expire;
+
+       /* set during gm_ifp_teardown() */
+       bool stopping;
+
+       /* startup queries left to send (at 1/4 query interval) */
+       uint8_t n_startup;
+
+       uint8_t cur_qrv;
+       unsigned cur_query_intv;      /* ms */
+       unsigned cur_query_intv_trig; /* ms */
+       unsigned cur_max_resp;        /* ms */
+       enum gm_version cur_version;
+
+       /* this value (positive, default 10ms) defines our "timing tolerance":
+        * - added to deadlines for expiring joins
+        * - used to look backwards in time for queries, in case a report was
+        *   reordered before the query
+        */
+       struct timeval cfg_timing_fuzz;
+
+       /* items in pending[] are sorted by expiry, pending[0] is earliest */
+       struct gm_general_pending pending[GM_MAX_PENDING];
+       uint8_t n_pending;
+       struct gm_grp_pends_head grp_pends[1];
+       struct gm_gsq_pends_head gsq_pends[1];
+
+       /* raw ICMPv6 socket; -1 when closed */
+       int sock;
+
+       /* current querier on this link / our own lowest link-local addr */
+       pim_addr querier;
+       pim_addr cur_ll_lowest;
+
+       struct gm_sgs_head sgs[1];
+       struct gm_subscribers_head subscribers[1];
+       struct gm_packet_expires_head expires[1];
+};
+
+/* MLD only exists for IPv6;  empty inline stubs keep the callers in shared
+ * pimd/pim6d code free of #ifdefs when building for IPv4
+ */
+#if PIM_IPV == 6
+extern void gm_ifp_update(struct interface *ifp);
+extern void gm_ifp_teardown(struct interface *ifp);
+#else
+static inline void gm_ifp_update(struct interface *ifp)
+{
+}
+
+static inline void gm_ifp_teardown(struct interface *ifp)
+{
+}
+#endif
+
+#endif /* PIM6_MLD_H */
diff --git a/pimd/pim6_mld_protocol.h b/pimd/pim6_mld_protocol.h
new file mode 100644 (file)
index 0000000..699178b
--- /dev/null
@@ -0,0 +1,125 @@
+/*
+ * MLD protocol definitions
+ * Copyright (C) 2022  David Lamparter for NetDEF, Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the Free
+ * Software Foundation; either version 2 of the License, or (at your option)
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; see the file COPYING; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef _PIM6_MLD_PROTOCOL_H
+#define _PIM6_MLD_PROTOCOL_H
+
+#include <stdalign.h>
+#include <stdint.h>
+
+/* There is a struct icmp6_hdr provided by OS, but it includes 4 bytes of data.
+ * Not helpful for us if we want to put the MLD struct after it.
+ */
+
/* bare ICMPv6 header (type/code/checksum only, no message data) —
 * wire format, hence the exact size/alignment checks below
 */
struct icmp6_plain_hdr {
	uint8_t icmp6_type;
	uint8_t icmp6_code;
	uint16_t icmp6_cksum;
};
static_assert(sizeof(struct icmp6_plain_hdr) == 4, "struct mismatch");
static_assert(alignof(struct icmp6_plain_hdr) <= 4, "struct mismatch");
+
/* for MLDv1 query, report and leave all use the same packet format */
struct mld_v1_pkt {
	uint16_t max_resp_code; /* cf. mld_max_resp_decode() below */
	uint16_t rsvd0;
	struct in6_addr grp;
};
static_assert(sizeof(struct mld_v1_pkt) == 20, "struct mismatch");
static_assert(alignof(struct mld_v1_pkt) <= 4, "struct mismatch");
+
+
/* MLDv2 query, following struct icmp6_plain_hdr on the wire.  srcs[] is
 * only present for group-and-source-specific queries (n_src entries).
 */
struct mld_v2_query_hdr {
	uint16_t max_resp_code; /* cf. mld_max_resp_{en,de}code() */
	uint16_t rsvd0;
	struct in6_addr grp;
	uint8_t flags; /* S-flag & QRV bits — cf. RFC 3810 */
	uint8_t qqic;
	uint16_t n_src;
	/* C99 flexible array member instead of the GNU [0] extension;
	 * semantics are identical but [] is standard C.
	 */
	struct in6_addr srcs[];
};
static_assert(sizeof(struct mld_v2_query_hdr) == 24, "struct mismatch");
static_assert(alignof(struct mld_v2_query_hdr) <= 4, "struct mismatch");
+
+
/* MLDv2 report, following struct icmp6_plain_hdr on the wire; trailed
 * by n_records group records (cf. struct mld_v2_rec_hdr)
 */
struct mld_v2_report_hdr {
	uint16_t rsvd;
	uint16_t n_records;
};
static_assert(sizeof(struct mld_v2_report_hdr) == 4, "struct mismatch");
static_assert(alignof(struct mld_v2_report_hdr) <= 4, "struct mismatch");
+
+
/* group record inside an MLDv2 report; n_src source addresses follow,
 * then aux_len words of auxiliary data (cf. RFC 3810)
 */
struct mld_v2_rec_hdr {
	uint8_t type; /* enum mld_v2_rec_type */
	uint8_t aux_len;
	uint16_t n_src;
	struct in6_addr grp;
	/* C99 flexible array member instead of the GNU [0] extension;
	 * semantics are identical but [] is standard C.
	 */
	struct in6_addr srcs[];
};
static_assert(sizeof(struct mld_v2_rec_hdr) == 20, "struct mismatch");
static_assert(alignof(struct mld_v2_rec_hdr) <= 4, "struct mismatch");
+
/* clang-format off */
/* ICMPv6 type values used by MLD (stored in icmp6_type) */
enum icmp6_mld_type {
	ICMP6_MLD_QUERY                 = 130,
	ICMP6_MLD_V1_REPORT             = 131,
	ICMP6_MLD_V1_DONE               = 132,
	ICMP6_MLD_V2_REPORT             = 143,
};

/* group record types for struct mld_v2_rec_hdr.type */
enum mld_v2_rec_type {
	MLD_RECTYPE_IS_INCLUDE          = 1,
	MLD_RECTYPE_IS_EXCLUDE          = 2,
	MLD_RECTYPE_CHANGE_TO_INCLUDE   = 3,
	MLD_RECTYPE_CHANGE_TO_EXCLUDE   = 4,
	MLD_RECTYPE_ALLOW_NEW_SOURCES   = 5,
	MLD_RECTYPE_BLOCK_OLD_SOURCES   = 6,
};
/* clang-format on */
+
+/* helper functions */
+
/* Decode a Maximum Response Code (network byte order) into its value.
 * Codes below 0x8000 are literal; with the top bit set, the code is a
 * floating-point format: 3-bit exponent, 12-bit mantissa with an
 * implicit 13th bit, shifted left by exponent + 3.
 */
static inline unsigned mld_max_resp_decode(uint16_t wire)
{
	uint16_t host_code = ntohs(wire);
	unsigned mant, shift;

	/* direct (linear) encoding */
	if (!(host_code & 0x8000))
		return host_code;

	mant = (host_code & 0xfff) | 0x1000;
	shift = ((host_code >> 12) & 0x7) + 3;
	return mant << shift;
}
+
/* Encode a max-response value into Maximum Response Code wire format
 * (network byte order); inverse of mld_max_resp_decode(), rounding
 * down.  Values below 0x8000 are sent verbatim, larger ones as a 3-bit
 * exponent / 12-bit mantissa float.
 */
static inline uint16_t mld_max_resp_encode(uint32_t value)
{
	uint16_t code;
	uint8_t exp;

	if (value < 0x8000)
		code = value;
	else if (value > 0x7ffc00) {
		/* clamp to the largest representable value,
		 * (0xfff | 0x1000) << (7 + 3) == 0x7ffc00.  Beyond that the
		 * computed exponent exceeds 7, so "exp << 12" would overflow
		 * its 3-bit field into bit 15 and produce a bogus code.
		 */
		code = 0xffff;
	} else {
		/* value >= 0x8000 here, so __builtin_clz(value) <= 16 */
		exp = 16 - __builtin_clz(value);
		code = (value >> (exp + 3)) & 0xfff;
		code |= 0x8000 | (exp << 12);
	}
	return htons(code);
}
+
+#endif /* _PIM6_MLD_PROTOCOL_H */
index b98e64adfe91437e46505fc26c3cb34d708a4bf4..32ba1d926e77fbe8f61b82cd11cc96b587947c92 100644 (file)
@@ -50,6 +50,8 @@
 #include "pim_igmp_join.h"
 #include "pim_vxlan.h"
 
+#include "pim6_mld.h"
+
 #if PIM_IPV == 4
 static void pim_if_igmp_join_del_all(struct interface *ifp);
 static int igmp_join_sock(const char *ifname, ifindex_t ifindex,
@@ -650,6 +652,7 @@ void pim_if_addr_add(struct connected *ifc)
                vxlan_term = pim_vxlan_is_term_dev_cfg(pim_ifp->pim, ifp);
                pim_if_add_vif(ifp, false, vxlan_term);
        }
+       gm_ifp_update(ifp);
        pim_ifchannel_scan_forward_start(ifp);
 }
 
@@ -762,6 +765,8 @@ void pim_if_addr_del(struct connected *ifc, int force_prim_as_any)
                                "%s: removed link-local %pI6, lowest now %pI6, highest %pI6",
                                ifc->ifp->name, &ifc->address->u.prefix6,
                                &pim_ifp->ll_lowest, &pim_ifp->ll_highest);
+
+               gm_ifp_update(ifp);
        }
 #endif
 
@@ -821,6 +826,7 @@ void pim_if_addr_add_all(struct interface *ifp)
                vxlan_term = pim_vxlan_is_term_dev_cfg(pim_ifp->pim, ifp);
                pim_if_add_vif(ifp, false, vxlan_term);
        }
+       gm_ifp_update(ifp);
        pim_ifchannel_scan_forward_start(ifp);
 
        pim_rp_setup(pim_ifp->pim);
@@ -999,12 +1005,15 @@ int pim_if_add_vif(struct interface *ifp, bool ispimreg, bool is_vxlan_term)
        }
 
        ifaddr = pim_ifp->primary_address;
+#if PIM_IPV != 6
+       /* IPv6 API is always by interface index */
        if (!ispimreg && !is_vxlan_term && pim_addr_is_any(ifaddr)) {
                zlog_warn(
                        "%s: could not get address for interface %s ifindex=%d",
                        __func__, ifp->name, ifp->ifindex);
                return -4;
        }
+#endif
 
        pim_ifp->mroute_vif_index = pim_iface_next_vif_index(ifp);
 
@@ -1029,9 +1038,10 @@ int pim_if_add_vif(struct interface *ifp, bool ispimreg, bool is_vxlan_term)
 
        pim_ifp->pim->iface_vif_index[pim_ifp->mroute_vif_index] = 1;
 
+       gm_ifp_update(ifp);
+
        /* if the device qualifies as pim_vxlan iif/oif update vxlan entries */
        pim_vxlan_add_vif(ifp);
-
        return 0;
 }
 
@@ -1049,6 +1059,8 @@ int pim_if_del_vif(struct interface *ifp)
        /* if the device was a pim_vxlan iif/oif update vxlan mroute entries */
        pim_vxlan_del_vif(ifp);
 
+       gm_ifp_teardown(ifp);
+
        pim_mroute_del_vif(ifp);
 
        /*
@@ -1057,7 +1069,6 @@ int pim_if_del_vif(struct interface *ifp)
        pim_ifp->pim->iface_vif_index[pim_ifp->mroute_vif_index] = 0;
 
        pim_ifp->mroute_vif_index = -1;
-
        return 0;
 }
 
index 3535db70a8462dbb1a9beb713c1e7740beb28ede..6781cbf3a045964b56eeab72b759546862c86714 100644 (file)
@@ -69,6 +69,8 @@ struct pim_secondary_addr {
        enum pim_secondary_addr_flags flags;
 };
 
+struct gm_if;
+
 struct pim_interface {
        bool pim_enable : 1;
        bool pim_can_disable_join_suppression : 1;
@@ -90,6 +92,7 @@ struct pim_interface {
                                         * address of the interface */
 
        int igmp_version;                    /* IGMP version */
+       int mld_version;
        int gm_default_robustness_variable;  /* IGMP or MLD QRV */
        int gm_default_query_interval;       /* IGMP or MLD secs between general
                                                  queries */
@@ -111,6 +114,8 @@ struct pim_interface {
        struct list *gm_group_list;  /* list of struct IGMP or MLD group */
        struct hash *gm_group_hash;
 
+       struct gm_if *mld;
+
        int pim_sock_fd;                /* PIM socket file descriptor */
        struct thread *t_pim_sock_read; /* thread for reading PIM socket */
        int64_t pim_sock_creation;      /* timestamp of PIM socket creation */
index d174b8a0af7eb649cc758367c6de4b519556222b..40a3d6eecc4a716fdb2463440f495e8d6c208dc8 100644 (file)
@@ -35,6 +35,7 @@
 #include "log.h"
 #include "lib_errors.h"
 #include "pim_util.h"
+#include "pim6_mld.h"
 
 #if PIM_IPV == 6
 #define pim6_msdp_err(funcname, argtype)                                       \
@@ -2676,12 +2677,22 @@ int lib_interface_gmp_address_family_igmp_version_destroy(
 int lib_interface_gmp_address_family_mld_version_modify(
        struct nb_cb_modify_args *args)
 {
+       struct interface *ifp;
+       struct pim_interface *pim_ifp;
+
        switch (args->event) {
        case NB_EV_VALIDATE:
        case NB_EV_PREPARE:
        case NB_EV_ABORT:
+               break;
        case NB_EV_APPLY:
-               /* TBD depends on MLD data structure changes */
+               ifp = nb_running_get_entry(args->dnode, NULL, true);
+               pim_ifp = ifp->info;
+               if (!pim_ifp)
+                       return NB_ERR_INCONSISTENCY;
+
+               pim_ifp->mld_version = yang_dnode_get_uint8(args->dnode, NULL);
+               gm_ifp_update(ifp);
                break;
        }
 
@@ -2691,11 +2702,22 @@ int lib_interface_gmp_address_family_mld_version_modify(
 int lib_interface_gmp_address_family_mld_version_destroy(
        struct nb_cb_destroy_args *args)
 {
+       struct interface *ifp;
+       struct pim_interface *pim_ifp;
+
        switch (args->event) {
        case NB_EV_VALIDATE:
        case NB_EV_PREPARE:
        case NB_EV_ABORT:
+               break;
        case NB_EV_APPLY:
+               ifp = nb_running_get_entry(args->dnode, NULL, true);
+               pim_ifp = ifp->info;
+               if (!pim_ifp)
+                       return NB_ERR_INCONSISTENCY;
+
+               pim_ifp->mld_version = 2;
+               gm_ifp_update(ifp);
                break;
        }
 
@@ -2708,10 +2730,10 @@ int lib_interface_gmp_address_family_mld_version_destroy(
 int lib_interface_gmp_address_family_query_interval_modify(
        struct nb_cb_modify_args *args)
 {
-#if PIM_IPV == 4
        struct interface *ifp;
        int query_interval;
 
+#if PIM_IPV == 4
        switch (args->event) {
        case NB_EV_VALIDATE:
        case NB_EV_PREPARE:
@@ -2723,7 +2745,23 @@ int lib_interface_gmp_address_family_query_interval_modify(
                change_query_interval(ifp->info, query_interval);
        }
 #else
-       /* TBD Depends on MLD data structure changes */
+       struct pim_interface *pim_ifp;
+
+       switch (args->event) {
+       case NB_EV_VALIDATE:
+       case NB_EV_PREPARE:
+       case NB_EV_ABORT:
+               break;
+       case NB_EV_APPLY:
+               ifp = nb_running_get_entry(args->dnode, NULL, true);
+               pim_ifp = ifp->info;
+               if (!pim_ifp)
+                       return NB_ERR_INCONSISTENCY;
+
+               query_interval = yang_dnode_get_uint16(args->dnode, NULL);
+               pim_ifp->gm_default_query_interval = query_interval;
+               gm_ifp_update(ifp);
+       }
 #endif
        return NB_OK;
 }
index 41fc6dc6325b7a17210a9f42a6b9da0adca27c3f..342b93e21f6a1566e5d4042f34d173809a07253d 100644 (file)
@@ -9,6 +9,7 @@ noinst_PROGRAMS += pimd/test_igmpv3_join
 vtysh_scan += \
        pimd/pim_cmd.c \
        pimd/pim6_cmd.c \
+       pimd/pim6_mld.c \
        #end
 vtysh_daemons += pimd
 vtysh_daemons += pim6d
@@ -89,6 +90,7 @@ nodist_pimd_pimd_SOURCES = \
 pimd_pim6d_SOURCES = \
        $(pim_common) \
        pimd/pim6_main.c \
+       pimd/pim6_mld.c \
        pimd/pim6_stubs.c \
        pimd/pim6_cmd.c \
        pimd/pim6_mroute_msg.c \
@@ -155,6 +157,8 @@ noinst_HEADERS += \
        pimd/pim_vxlan.h \
        pimd/pim_vxlan_instance.h \
        pimd/pimd.h \
+       pimd/pim6_mld.h \
+       pimd/pim6_mld_protocol.h \
        pimd/mtracebis_netlink.h \
        pimd/mtracebis_routeget.h \
        pimd/pim6_cmd.h \
@@ -163,6 +167,7 @@ noinst_HEADERS += \
 clippy_scan += \
        pimd/pim_cmd.c \
        pimd/pim6_cmd.c \
+       pimd/pim6_mld.c \
        # end
 
 pimd_pimd_CFLAGS = $(AM_CFLAGS) -DPIM_IPV=4