summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/json.c13
-rw-r--r--lib/json.h22
-rw-r--r--lib/libfrr.c17
-rw-r--r--lib/libfrr.h11
-rw-r--r--lib/log_vty.c16
-rw-r--r--lib/northbound_grpc.cpp142
-rw-r--r--lib/prefix.c37
-rw-r--r--lib/routemap.c11
-rw-r--r--lib/zlog.c4
-rw-r--r--lib/zlog_live.c54
-rw-r--r--lib/zlog_live.h32
11 files changed, 283 insertions, 76 deletions
diff --git a/lib/json.c b/lib/json.c
index 854a3d59d1..d85a21215c 100644
--- a/lib/json.c
+++ b/lib/json.c
@@ -74,6 +74,19 @@ void json_object_string_addv(struct json_object *obj, const char *key,
json_object_object_add(obj, key, json_object_new_stringv(fmt, args));
}
+void json_object_object_addv(struct json_object *parent,
+ struct json_object *child, const char *keyfmt,
+ va_list args)
+{
+ char *text, buf[256];
+
+ text = vasnprintfrr(MTYPE_TMP, buf, sizeof(buf), keyfmt, args);
+ json_object_object_add(parent, text, child);
+
+ if (text != buf)
+ XFREE(MTYPE_TMP, text);
+}
+
void json_object_int_add(struct json_object *obj, const char *key, int64_t i)
{
json_object_object_add(obj, key, json_object_new_int64(i));
diff --git a/lib/json.h b/lib/json.h
index fcaa84c816..78c3836515 100644
--- a/lib/json.h
+++ b/lib/json.h
@@ -116,6 +116,28 @@ static inline struct json_object *json_object_new_stringf(const char *fmt, ...)
return ret;
}
+/* NOTE: argument order differs! (due to varargs)
+ * json_object_object_add(parent, key, child)
+ * json_object_object_addv(parent, child, key, va)
+ * json_object_object_addf(parent, child, key, ...)
+ * (would be weird to have the child inbetween the format string and args)
+ */
+PRINTFRR(3, 0)
+extern void json_object_object_addv(struct json_object *parent,
+ struct json_object *child,
+ const char *keyfmt, va_list args);
+PRINTFRR(3, 4)
+static inline void json_object_object_addf(struct json_object *parent,
+ struct json_object *child,
+ const char *keyfmt, ...)
+{
+ va_list args;
+
+ va_start(args, keyfmt);
+ json_object_object_addv(parent, child, keyfmt, args);
+ va_end(args);
+}
+
#define JSON_STR "JavaScript Object Notation\n"
/* NOTE: json-c lib has following commit 316da85 which
diff --git a/lib/libfrr.c b/lib/libfrr.c
index 10b3aad89e..042c9d3704 100644
--- a/lib/libfrr.c
+++ b/lib/libfrr.c
@@ -333,6 +333,8 @@ void frr_preinit(struct frr_daemon_info *daemon, int argc, char **argv)
umask(0027);
+ log_args_init(daemon->early_logging);
+
opt_extend(&os_always);
if (!(di->flags & FRR_NO_SPLIT_CONFIG))
opt_extend(&os_cfg);
@@ -431,6 +433,8 @@ static int frr_opt(int opt)
static int vty_port_set = 0;
static int vty_addr_set = 0;
struct option_chain *oc;
+ struct log_arg *log_arg;
+ size_t arg_len;
char *err;
switch (opt) {
@@ -613,7 +617,10 @@ static int frr_opt(int opt)
di->privs->group = optarg;
break;
case OPTION_LOG:
- di->early_logging = optarg;
+ arg_len = strlen(optarg) + 1;
+ log_arg = XCALLOC(MTYPE_TMP, sizeof(*log_arg) + arg_len);
+ memcpy(log_arg->target, optarg, arg_len);
+ log_args_add_tail(di->early_logging, log_arg);
break;
case OPTION_LOGLEVEL:
di->early_loglevel = optarg;
@@ -706,10 +713,12 @@ static struct thread_master *master;
struct thread_master *frr_init(void)
{
struct option_chain *oc;
+ struct log_arg *log_arg;
struct frrmod_runtime *module;
struct zprivs_ids_t ids;
char p_instance[16] = "", p_pathspace[256] = "";
const char *dir;
+
dir = di->module_path ? di->module_path : frr_moduledir;
srandom(time(NULL));
@@ -739,7 +748,11 @@ struct thread_master *frr_init(void)
zlog_init(di->progname, di->logname, di->instance,
ids.uid_normal, ids.gid_normal);
- command_setup_early_logging(di->early_logging, di->early_loglevel);
+ while ((log_arg = log_args_pop(di->early_logging))) {
+ command_setup_early_logging(log_arg->target,
+ di->early_loglevel);
+ XFREE(MTYPE_TMP, log_arg);
+ }
if (!frr_zclient_addr(&zclient_addr, &zclient_addr_len,
frr_zclientpath)) {
diff --git a/lib/libfrr.h b/lib/libfrr.h
index 65c1df9675..69054e4264 100644
--- a/lib/libfrr.h
+++ b/lib/libfrr.h
@@ -21,6 +21,7 @@
#ifndef _ZEBRA_FRR_H
#define _ZEBRA_FRR_H
+#include "typesafe.h"
#include "sigevent.h"
#include "privs.h"
#include "thread.h"
@@ -52,6 +53,14 @@ extern "C" {
*/
#define FRR_DETACH_LATER (1 << 6)
+PREDECL_DLIST(log_args);
+struct log_arg {
+ struct log_args_item itm;
+
+ char target[0];
+};
+DECLARE_DLIST(log_args, struct log_arg, itm);
+
enum frr_cli_mode {
FRR_CLI_CLASSIC = 0,
FRR_CLI_TRANSACTIONAL,
@@ -88,7 +97,7 @@ struct frr_daemon_info {
const char *pathspace;
bool zpathspace;
- const char *early_logging;
+ struct log_args_head early_logging[1];
const char *early_loglevel;
const char *proghelp;
diff --git a/lib/log_vty.c b/lib/log_vty.c
index 682c9ea372..ef33a39d4a 100644
--- a/lib/log_vty.c
+++ b/lib/log_vty.c
@@ -427,6 +427,22 @@ void command_setup_early_logging(const char *dest, const char *level)
set_log_file(&zt_file_cmdline, NULL, sep, nlevel);
return;
}
+ if (strcmp(type, "monitor") == 0 && sep) {
+ struct zlog_live_cfg cfg = {};
+ unsigned long fd;
+ char *endp;
+
+ sep++;
+ fd = strtoul(sep, &endp, 10);
+ if (!*sep || *endp) {
+ fprintf(stderr, "invalid monitor fd \"%s\"\n", sep);
+ exit(1);
+ }
+
+ zlog_live_open_fd(&cfg, nlevel, fd);
+ zlog_live_disown(&cfg);
+ return;
+ }
fprintf(stderr, "invalid log target \"%s\" (\"%s\")\n", type, dest);
exit(1);
diff --git a/lib/northbound_grpc.cpp b/lib/northbound_grpc.cpp
index 34bb1e4986..e2a6290035 100644
--- a/lib/northbound_grpc.cpp
+++ b/lib/northbound_grpc.cpp
@@ -1,7 +1,7 @@
//
+// Copyright (c) 2021-2022, LabN Consulting, L.L.C
// Copyright (C) 2019 NetDEF, Inc.
// Renato Westphal
-// Copyright (c) 2021, LabN Consulting, L.L.C
//
// This program is free software; you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by the Free
@@ -50,6 +50,8 @@ static struct thread_master *main_master;
static struct frr_pthread *fpt;
+static bool grpc_running;
+
#define grpc_debug(...) \
do { \
if (nb_dbg_client_grpc) \
@@ -96,11 +98,11 @@ class Candidates
{
char errmsg[BUFSIZ] = {0};
- _cdb.erase(c->id);
nb_config_free(c->config);
if (c->transaction)
nb_candidate_commit_abort(c->transaction, errmsg,
sizeof(errmsg));
+ _cdb.erase(c->id);
}
struct candidate *get_candidate(uint32_t id)
@@ -116,9 +118,11 @@ class Candidates
class RpcStateBase
{
public:
+ virtual ~RpcStateBase() = default;
virtual CallState doCallback() = 0;
virtual void do_request(::frr::Northbound::AsyncService *service,
- ::grpc::ServerCompletionQueue *cq) = 0;
+ ::grpc::ServerCompletionQueue *cq,
+ bool no_copy) = 0;
};
/*
@@ -188,17 +192,22 @@ template <typename Q, typename S> class NewRpcState : RpcStateBase
}
void do_request(::frr::Northbound::AsyncService *service,
- ::grpc::ServerCompletionQueue *cq) override
+ ::grpc::ServerCompletionQueue *cq,
+ bool no_copy) override
{
grpc_debug("%s, posting a request for: %s", __func__, name);
if (requestf) {
NewRpcState<Q, S> *copy =
- new NewRpcState(cdb, requestf, callback, name);
+ no_copy ? this
+ : new NewRpcState(cdb, requestf,
+ callback, name);
(service->*requestf)(&copy->ctx, &copy->request,
&copy->responder, cq, cq, copy);
} else {
NewRpcState<Q, S> *copy =
- new NewRpcState(cdb, requestsf, callback, name);
+ no_copy ? this
+ : new NewRpcState(cdb, requestsf,
+ callback, name);
(service->*requestsf)(&copy->ctx, &copy->request,
&copy->async_responder, cq, cq,
copy);
@@ -227,7 +236,6 @@ template <typename Q, typename S> class NewRpcState : RpcStateBase
pthread_mutex_unlock(&_tag->cmux);
return;
}
- NewRpcState<Q, S> *orig;
const char *name;
grpc::ServerContext ctx;
@@ -238,12 +246,12 @@ template <typename Q, typename S> class NewRpcState : RpcStateBase
Candidates *cdb;
void (*callback)(NewRpcState<Q, S> *);
- reqfunc_t requestf;
- reqsfunc_t requestsf;
+ reqfunc_t requestf = NULL;
+ reqsfunc_t requestsf = NULL;
pthread_mutex_t cmux = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
- void *context;
+ void *context = 0;
CallState state = CREATE;
};
@@ -268,10 +276,10 @@ static LYD_FORMAT encoding2lyd_format(enum frr::Encoding encoding)
}
static int yang_dnode_edit(struct lyd_node *dnode, const std::string &path,
- const std::string &value)
+ const char *value)
{
- LY_ERR err = lyd_new_path(dnode, ly_native_ctx, path.c_str(),
- value.c_str(), LYD_NEW_PATH_UPDATE, &dnode);
+ LY_ERR err = lyd_new_path(dnode, ly_native_ctx, path.c_str(), value,
+ LYD_NEW_PATH_UPDATE, &dnode);
if (err != LY_SUCCESS) {
flog_warn(EC_LIB_LIBYANG, "%s: lyd_new_path() failed: %s",
__func__, ly_errmsg(ly_native_ctx));
@@ -698,8 +706,8 @@ void HandleUnaryEditCandidate(
auto pvs = tag->request.update();
for (const frr::PathValue &pv : pvs) {
- if (yang_dnode_edit(candidate_tmp->dnode, pv.path(), pv.value())
- != 0) {
+ if (yang_dnode_edit(candidate_tmp->dnode, pv.path(),
+ pv.value().c_str()) != 0) {
nb_config_free(candidate_tmp);
tag->responder.Finish(
@@ -1226,7 +1234,7 @@ void HandleUnaryExecute(
frr::NAME##Response>( \
(cdb), &frr::Northbound::AsyncService::Request##NAME, \
&HandleUnary##NAME, #NAME); \
- _rpcState->do_request(service, s_cq); \
+ _rpcState->do_request(&service, cq.get(), true); \
} while (0)
#define REQUEST_NEWRPC_STREAMING(NAME, cdb) \
@@ -1235,7 +1243,7 @@ void HandleUnaryExecute(
frr::NAME##Response>( \
(cdb), &frr::Northbound::AsyncService::Request##NAME, \
&HandleStreaming##NAME, #NAME); \
- _rpcState->do_request(service, s_cq); \
+ _rpcState->do_request(&service, cq.get(), true); \
} while (0)
struct grpc_pthread_attr {
@@ -1244,8 +1252,8 @@ struct grpc_pthread_attr {
};
// Capture these objects so we can try to shut down cleanly
-static std::unique_ptr<grpc::Server> s_server;
-static grpc::ServerCompletionQueue *s_cq;
+static pthread_mutex_t s_server_lock = PTHREAD_MUTEX_INITIALIZER;
+static grpc::Server *s_server;
static void *grpc_pthread_start(void *arg)
{
@@ -1255,18 +1263,22 @@ static void *grpc_pthread_start(void *arg)
Candidates candidates;
grpc::ServerBuilder builder;
std::stringstream server_address;
- frr::Northbound::AsyncService *service =
- new frr::Northbound::AsyncService();
+ frr::Northbound::AsyncService service;
frr_pthread_set_name(fpt);
server_address << "0.0.0.0:" << port;
builder.AddListeningPort(server_address.str(),
grpc::InsecureServerCredentials());
- builder.RegisterService(service);
- auto cq = builder.AddCompletionQueue();
- s_cq = cq.get();
- s_server = builder.BuildAndStart();
+ builder.RegisterService(&service);
+ builder.AddChannelArgument(
+ GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 5000);
+ std::unique_ptr<grpc::ServerCompletionQueue> cq =
+ builder.AddCompletionQueue();
+ std::unique_ptr<grpc::Server> server = builder.BuildAndStart();
+ s_server = server.get();
+
+ grpc_running = true;
/* Schedule all RPC handlers */
REQUEST_NEWRPC(GetCapabilities, NULL);
@@ -1287,20 +1299,25 @@ static void *grpc_pthread_start(void *arg)
server_address.str().c_str());
/* Process inbound RPCs */
- while (true) {
- void *tag;
- bool ok;
-
- s_cq->Next(&tag, &ok);
- if (!ok)
+ bool ok;
+ void *tag;
+ while (grpc_running) {
+ if (!cq->Next(&tag, &ok)) {
+ grpc_debug("%s: CQ empty exiting", __func__);
break;
+ }
- grpc_debug("%s: Got next from CompletionQueue, %p %d", __func__,
- tag, ok);
+ grpc_debug("%s: got next from CQ tag: %p ok: %d", __func__, tag,
+ ok);
+
+ if (!ok || !grpc_running) {
+ delete static_cast<RpcStateBase *>(tag);
+ break;
+ }
RpcStateBase *rpc = static_cast<RpcStateBase *>(tag);
CallState state = rpc->doCallback();
- grpc_debug("%s: Callback returned RPC State: %s", __func__,
+ grpc_debug("%s: callback returned RPC State: %s", __func__,
call_states[state]);
/*
@@ -1310,10 +1327,30 @@ static void *grpc_pthread_start(void *arg)
* to be called back once more in the FINISH state (from the
* user indicating Finish() for cleanup.
*/
- if (state == FINISH)
- rpc->do_request(service, s_cq);
+ if (state == FINISH && grpc_running)
+ rpc->do_request(&service, cq.get(), false);
+ }
+
+ /* This was probably done for us to get here, but let's be safe */
+ pthread_mutex_lock(&s_server_lock);
+ grpc_running = false;
+ if (s_server) {
+ grpc_debug("%s: shutdown server and CQ", __func__);
+ server->Shutdown();
+ s_server = NULL;
+ }
+ pthread_mutex_unlock(&s_server_lock);
+
+ grpc_debug("%s: shutting down CQ", __func__);
+ cq->Shutdown();
+
+ grpc_debug("%s: draining the CQ", __func__);
+ while (cq->Next(&tag, &ok)) {
+ grpc_debug("%s: drain tag %p", __func__, tag);
+ delete static_cast<RpcStateBase *>(tag);
}
+ zlog_info("%s: exiting from grpc pthread", __func__);
return NULL;
}
@@ -1325,6 +1362,8 @@ static int frr_grpc_init(uint port)
.stop = NULL,
};
+ grpc_debug("%s: entered", __func__);
+
fpt = frr_pthread_new(&attr, "frr-grpc", "frr-grpc");
fpt->data = reinterpret_cast<void *>((intptr_t)port);
@@ -1340,24 +1379,27 @@ static int frr_grpc_init(uint port)
static int frr_grpc_finish(void)
{
- // Shutdown the grpc server
- if (s_server) {
- s_server->Shutdown();
- s_cq->Shutdown();
-
- // And drain the queue
- void *ignore;
- bool ok;
+ grpc_debug("%s: entered", __func__);
- while (s_cq->Next(&ignore, &ok))
- ;
- }
+ if (!fpt)
+ return 0;
- if (fpt) {
- pthread_join(fpt->thread, NULL);
- frr_pthread_destroy(fpt);
+ /*
+ * Shut the server down here in main thread. This will cause the wait on
+ * the completion queue (cq.Next()) to exit and cleanup everything else.
+ */
+ pthread_mutex_lock(&s_server_lock);
+ grpc_running = false;
+ if (s_server) {
+ grpc_debug("%s: shutdown server", __func__);
+ s_server->Shutdown();
+ s_server = NULL;
}
+ pthread_mutex_unlock(&s_server_lock);
+ grpc_debug("%s: joining and destroy grpc thread", __func__);
+ pthread_join(fpt->thread, NULL);
+ frr_pthread_destroy(fpt);
return 0;
}
diff --git a/lib/prefix.c b/lib/prefix.c
index 90ab48a13b..89c5be8f38 100644
--- a/lib/prefix.c
+++ b/lib/prefix.c
@@ -1071,6 +1071,26 @@ const char *prefix2str(union prefixconstptr pu, char *str, int size)
return str;
}
+static ssize_t prefixhost2str(struct fbuf *fbuf, union prefixconstptr pu)
+{
+ const struct prefix *p = pu.p;
+ char buf[PREFIX2STR_BUFFER];
+
+ switch (p->family) {
+ case AF_INET:
+ case AF_INET6:
+ inet_ntop(p->family, &p->u.prefix, buf, sizeof(buf));
+ return bputs(fbuf, buf);
+
+ case AF_ETHERNET:
+ prefix_mac2str(&p->u.prefix_eth, buf, sizeof(buf));
+ return bputs(fbuf, buf);
+
+ default:
+ return bprintfrr(fbuf, "{prefix.af=%dPF}", p->family);
+ }
+}
+
void prefix_mcast_inet4_dump(const char *onfail, struct in_addr addr,
char *buf, int buf_size)
{
@@ -1458,13 +1478,24 @@ printfrr_ext_autoreg_p("FX", printfrr_pfx);
static ssize_t printfrr_pfx(struct fbuf *buf, struct printfrr_eargs *ea,
const void *ptr)
{
- char cbuf[PREFIX_STRLEN];
+ bool host_only = false;
+
+ if (ea->fmt[0] == 'h') {
+ ea->fmt++;
+ host_only = true;
+ }
if (!ptr)
return bputs(buf, "(null)");
- prefix2str(ptr, cbuf, sizeof(cbuf));
- return bputs(buf, cbuf);
+ if (host_only)
+ return prefixhost2str(buf, (struct prefix *)ptr);
+ else {
+ char cbuf[PREFIX_STRLEN];
+
+ prefix2str(ptr, cbuf, sizeof(cbuf));
+ return bputs(buf, cbuf);
+ }
}
printfrr_ext_autoreg_p("PSG4", printfrr_psg);
diff --git a/lib/routemap.c b/lib/routemap.c
index 7f733c8114..9afe18d10b 100644
--- a/lib/routemap.c
+++ b/lib/routemap.c
@@ -1799,12 +1799,11 @@ static struct list *route_map_get_index_list(struct route_node **rn,
/*
* This function returns the route-map index that best matches the prefix.
*/
-static struct route_map_index *route_map_get_index(struct route_map *map,
- const struct prefix *prefix,
- void *object,
- uint8_t *match_ret)
+static struct route_map_index *
+route_map_get_index(struct route_map *map, const struct prefix *prefix,
+ void *object, enum route_map_cmd_result_t *match_ret)
{
- int ret = 0;
+ enum route_map_cmd_result_t ret = RMAP_NOMATCH;
struct list *candidate_rmap_list = NULL;
struct route_node *rn = NULL;
struct listnode *ln = NULL, *nn = NULL;
@@ -2559,7 +2558,7 @@ route_map_result_t route_map_apply_ext(struct route_map *map,
if ((!map->optimization_disabled)
&& (map->ipv4_prefix_table || map->ipv6_prefix_table)) {
index = route_map_get_index(map, prefix, match_object,
- (uint8_t *)&match_ret);
+ &match_ret);
if (index) {
index->applied++;
if (rmap_debug)
diff --git a/lib/zlog.c b/lib/zlog.c
index 85606d2624..e0bb34a258 100644
--- a/lib/zlog.c
+++ b/lib/zlog.c
@@ -401,7 +401,7 @@ void zlog_tls_buffer_flush(void)
return;
rcu_read_lock();
- frr_each (zlog_targets, &zlog_targets, zt) {
+ frr_each_safe (zlog_targets, &zlog_targets, zt) {
if (!zt->logfn)
continue;
@@ -431,7 +431,7 @@ static void vzlog_notls(const struct xref_logmsg *xref, int prio,
msg->stackbufsz = sizeof(stackbuf);
rcu_read_lock();
- frr_each (zlog_targets, &zlog_targets, zt) {
+ frr_each_safe (zlog_targets, &zlog_targets, zt) {
if (prio > zt->prio_min)
continue;
if (!zt->logfn)
diff --git a/lib/zlog_live.c b/lib/zlog_live.c
index fbe0e5ee49..931aa3461d 100644
--- a/lib/zlog_live.c
+++ b/lib/zlog_live.c
@@ -22,6 +22,7 @@
#include "frrcu.h"
#include "zlog.h"
#include "printfrr.h"
+#include "network.h"
DEFINE_MTYPE_STATIC(LOG, LOG_LIVE, "log vtysh live target");
@@ -39,6 +40,7 @@ struct zlt_live {
struct rcu_head head_self;
atomic_uint_fast32_t state;
+ atomic_uint_fast32_t lost_msgs;
};
static void zlog_live(struct zlog_target *zt, struct zlog_msg *msgs[],
@@ -63,14 +65,16 @@ static void zlog_live(struct zlog_target *zt, struct zlog_msg *msgs[],
for (i = 0; i < nmsgs; i++) {
const struct fmt_outpos *argpos;
- size_t n_argpos, arghdrlen;
+ size_t n_argpos, texthdrlen;
struct zlog_msg *msg = msgs[i];
int prio = zlog_msg_prio(msg);
+ const struct xref_logmsg *xref;
+ intmax_t pid, tid;
if (prio > zt->prio_min)
continue;
- zlog_msg_args(msg, &arghdrlen, &n_argpos, &argpos);
+ zlog_msg_args(msg, &texthdrlen, &n_argpos, &argpos);
mmh->msg_hdr.msg_iov = iov;
@@ -89,14 +93,29 @@ static void zlog_live(struct zlog_target *zt, struct zlog_msg *msgs[],
iov++;
zlog_msg_tsraw(msg, &ts);
+ zlog_msg_pid(msg, &pid, &tid);
+ xref = zlog_msg_xref(msg);
hdr->ts_sec = ts.tv_sec;
hdr->ts_nsec = ts.tv_nsec;
- hdr->prio = zlog_msg_prio(msg);
+ hdr->pid = pid;
+ hdr->tid = tid;
+ hdr->lost_msgs = atomic_load_explicit(&zte->lost_msgs,
+ memory_order_relaxed);
+ hdr->prio = prio;
hdr->flags = 0;
hdr->textlen = textlen;
- hdr->arghdrlen = arghdrlen;
+ hdr->texthdrlen = texthdrlen;
hdr->n_argpos = n_argpos;
+ if (xref) {
+ memcpy(hdr->uid, xref->xref.xrefdata->uid,
+ sizeof(hdr->uid));
+ hdr->ec = xref->ec;
+ } else {
+ memset(hdr->uid, 0, sizeof(hdr->uid));
+ hdr->ec = 0;
+ }
+ hdr->hdrlen = sizeof(*hdr) + sizeof(*argpos) * n_argpos;
mmh->msg_hdr.msg_iovlen = iov - mmh->msg_hdr.msg_iov;
mmh++;
@@ -109,6 +128,12 @@ static void zlog_live(struct zlog_target *zt, struct zlog_msg *msgs[],
for (size_t msgpos = 0; msgpos < msgtotal; msgpos += sent) {
sent = sendmmsg(fd, mmhs + msgpos, msgtotal - msgpos, 0);
+ if (sent <= 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+ atomic_fetch_add_explicit(&zte->lost_msgs,
+ msgtotal - msgpos,
+ memory_order_relaxed);
+ break;
+ }
if (sent <= 0)
goto out_err;
}
@@ -134,7 +159,7 @@ static void zlog_live_sigsafe(struct zlog_target *zt, const char *text,
size_t len)
{
struct zlt_live *zte = container_of(zt, struct zlt_live, zt);
- struct zlog_live_hdr hdr[1];
+ struct zlog_live_hdr hdr[1] = {};
struct iovec iovs[2], *iov = iovs;
struct timespec ts;
int fd;
@@ -143,14 +168,12 @@ static void zlog_live_sigsafe(struct zlog_target *zt, const char *text,
if (fd < 0)
return;
- clock_gettime(CLOCK_MONOTONIC, &ts);
+ clock_gettime(CLOCK_REALTIME, &ts);
hdr->ts_sec = ts.tv_sec;
hdr->ts_nsec = ts.tv_nsec;
hdr->prio = LOG_CRIT;
- hdr->flags = 0;
hdr->textlen = len;
- hdr->n_argpos = 0;
iov->iov_base = (char *)hdr;
iov->iov_len = sizeof(hdr);
@@ -166,8 +189,6 @@ static void zlog_live_sigsafe(struct zlog_target *zt, const char *text,
void zlog_live_open(struct zlog_live_cfg *cfg, int prio_min, int *other_fd)
{
int sockets[2];
- struct zlt_live *zte;
- struct zlog_target *zt;
if (cfg->target)
zlog_live_close(cfg);
@@ -192,12 +213,23 @@ void zlog_live_open(struct zlog_live_cfg *cfg, int prio_min, int *other_fd)
shutdown(sockets[0], SHUT_RD);
*other_fd = sockets[1];
+ zlog_live_open_fd(cfg, prio_min, sockets[0]);
+}
+
+void zlog_live_open_fd(struct zlog_live_cfg *cfg, int prio_min, int fd)
+{
+ struct zlt_live *zte;
+ struct zlog_target *zt;
+
+ if (cfg->target)
+ zlog_live_close(cfg);
zt = zlog_target_clone(MTYPE_LOG_LIVE, NULL, sizeof(*zte));
zte = container_of(zt, struct zlt_live, zt);
cfg->target = zte;
- zte->fd = sockets[0];
+ set_nonblocking(fd);
+ zte->fd = fd;
zte->zt.prio_min = prio_min;
zte->zt.logfn = zlog_live;
zte->zt.logfn_sigsafe = zlog_live_sigsafe;
diff --git a/lib/zlog_live.h b/lib/zlog_live.h
index c948baeab1..55e60ae674 100644
--- a/lib/zlog_live.h
+++ b/lib/zlog_live.h
@@ -20,13 +20,42 @@
#include "printfrr.h"
struct zlog_live_hdr {
+ /* timestamp (CLOCK_REALTIME) */
uint64_t ts_sec;
uint32_t ts_nsec;
+
+ /* length of zlog_live_hdr, including variable length bits and
+ * possible future extensions - aka start of text
+ */
+ uint32_t hdrlen;
+
+ /* process & thread ID, meaning depends on OS */
+ int64_t pid;
+ int64_t tid;
+
+ /* number of lost messages due to best-effort non-blocking mode */
+ uint32_t lost_msgs;
+ /* syslog priority value */
uint32_t prio;
+ /* flags: currently unused */
uint32_t flags;
+ /* length of message text - extra data (e.g. future key/value metadata)
+ * may follow after it
+ */
uint32_t textlen;
+ /* length of "[XXXXX-XXXXX][EC 0] " header; consumer may want to skip
+ * over it if using the raw values below. Note that this text may be
+ * absent depending on "log error-category" and "log unique-id"
+ * settings
+ */
+ uint32_t texthdrlen;
+
+ /* xref unique identifier, "XXXXX-XXXXX\0" = 12 bytes */
+ char uid[12];
+ /* EC value */
+ uint32_t ec;
- uint32_t arghdrlen;
+ /* recorded printf formatting argument positions (variable length) */
uint32_t n_argpos;
struct fmt_outpos argpos[0];
};
@@ -41,6 +70,7 @@ struct zlog_live_cfg {
extern void zlog_live_open(struct zlog_live_cfg *cfg, int prio_min,
int *other_fd);
+extern void zlog_live_open_fd(struct zlog_live_cfg *cfg, int prio_min, int fd);
static inline bool zlog_live_is_null(struct zlog_live_cfg *cfg)
{