Nginx OTel module.

This commit is contained in:
Pavel Pautov 2022-11-29 00:17:12 -08:00 committed by dplotnikov-f5
parent a0389d8296
commit 3430e85c34
8 changed files with 1042 additions and 0 deletions

220
src/batch_exporter.hpp Normal file
View file

@ -0,0 +1,220 @@
#pragma once
#include <nginx.h>
#include <thread>
#include <mutex>
#include <vector>
#include "str_view.hpp"
#include "trace_context.hpp"
#include "trace_service_client.hpp"
class BatchExporter {
public:
typedef TraceServiceClient::Request Request;
typedef TraceServiceClient::Response Response;
struct SpanInfo {
StrView name;
TraceContext trace;
opentelemetry::trace::SpanId parent;
uint64_t start;
uint64_t end;
};
class Span {
public:
Span(const Span&) = delete;
Span& operator=(const Span&) = delete;
Span(const SpanInfo& info, opentelemetry::proto::trace::v1::Span* span)
: span(span)
{
span->set_kind(
opentelemetry::proto::trace::v1::Span::SPAN_KIND_SERVER);
// Short setters, like set_name(), use additional std::string as an
// intermediary at least up to v21.5 of protobuf.
set(span->mutable_name(), info.name);
set(span->mutable_trace_id(), info.trace.traceId.Id());
set(span->mutable_span_id(), info.trace.spanId.Id());
set(span->mutable_trace_state(), info.trace.state);
if (info.parent.IsValid()) {
set(span->mutable_parent_span_id(), info.parent.Id());
} else {
span->mutable_parent_span_id()->clear();
}
span->set_start_time_unix_nano(info.start);
span->set_end_time_unix_nano(info.end);
span->mutable_status()->clear_code();
}
~Span()
{
truncate(span->mutable_attributes(), attrSize);
}
void add(StrView key, StrView value)
{
add(key)->mutable_value()->mutable_string_value()->assign(
value.data(), value.size());
}
void add(StrView key, int value)
{
add(key)->mutable_value()->set_int_value(value);
}
void setError()
{
span->mutable_status()->set_code(
opentelemetry::proto::trace::v1::Status::STATUS_CODE_ERROR);
}
private:
template <class ByteRange>
static void set(std::string* str, const ByteRange& range)
{
str->assign((const char*)range.data(), range.size());
}
opentelemetry::proto::common::v1::KeyValue* add(StrView key)
{
auto attrs = span->mutable_attributes();
auto newAttr = attrs->size() > attrSize ?
attrs->Mutable(attrSize) : attrs->Add();
newAttr->mutable_key()->assign(key.data(), key.size());
++attrSize;
return newAttr;
}
opentelemetry::proto::trace::v1::Span* span;
int attrSize{0};
};
BatchExporter(StrView target,
size_t batchSize, size_t batchCount, StrView serviceName) :
batchSize(batchSize), client(std::string(target))
{
free.reserve(batchCount);
while (batchCount-- > 0) {
free.emplace_back();
auto resourceSpans = free.back().add_resource_spans();
auto attr = resourceSpans->mutable_resource()->add_attributes();
attr->set_key("service.name");
attr->mutable_value()->set_string_value(std::string(serviceName));
auto scopeSpans = resourceSpans->add_scope_spans();
scopeSpans->mutable_scope()->set_name("nginx");
scopeSpans->mutable_scope()->set_version(NGINX_VERSION);
scopeSpans->mutable_spans()->Reserve(batchSize);
}
worker = std::thread(&TraceServiceClient::run, &client);
}
~BatchExporter()
{
client.stop();
worker.join();
}
template <class F>
bool add(const SpanInfo& info, F fillSpan)
{
if (currentSize == (int)batchSize) {
sendBatch(current);
currentSize = -1;
}
if (currentSize == -1) {
std::unique_lock<std::mutex> lock(mutex);
if (free.empty()) {
return false;
}
current = std::move(free.back());
free.pop_back();
currentSize = 0;
}
auto spans = getSpans(current);
Span span(info, spans->size() > currentSize ?
spans->Mutable(currentSize) : spans->Add());
fillSpan(span);
++currentSize;
return true;
}
void flush()
{
if (currentSize <= 0) {
return;
}
truncate(getSpans(current), currentSize);
sendBatch(current);
currentSize = -1;
}
private:
const size_t batchSize;
TraceServiceClient client;
std::mutex mutex;
std::vector<Request> free;
Request current;
int currentSize{-1};
std::thread worker;
static auto getSpans(Request& req) -> decltype(
req.mutable_resource_spans(0)->mutable_scope_spans(0)->mutable_spans())
{
return req.mutable_resource_spans(0)->mutable_scope_spans(0)->
mutable_spans();
}
template <class T>
static void truncate(T* items, int newSize)
{
// unlike DeleteSubrange(), this doesn't destruct removed items
int tailSize = items->size() - newSize;
while (tailSize-- > 0) {
items->RemoveLast();
}
}
void sendBatch(Request& request)
{
client.send(request,
[this](Request req, Response, grpc::Status status) {
std::unique_lock<std::mutex> lock(mutex);
free.push_back(std::move(req));
lock.unlock();
if (!status.ok()) {
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0,
"OTel export failure: %s",
status.error_message().c_str());
}
});
}
};

452
src/http_module.cpp Normal file
View file

@ -0,0 +1,452 @@
extern "C" {
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>
}
#include "str_view.hpp"
#include "trace_context.hpp"
#include "batch_exporter.hpp"
extern ngx_module_t gHttpModule;
namespace {
struct MainConf {
ngx_str_t endpoint;
ngx_msec_t interval;
size_t batchSize;
size_t batchCount;
ngx_str_t serviceName;
};
struct LocationConf {
ngx_http_complex_value_t* trace;
};
char* setExporter(ngx_conf_t* cf, ngx_command_t* cmd, void* conf);
ngx_command_t gCommands[] = {
{ ngx_string("otel_exporter"),
NGX_HTTP_MAIN_CONF|NGX_CONF_BLOCK|NGX_CONF_NOARGS,
setExporter,
NGX_HTTP_MAIN_CONF_OFFSET },
{ ngx_string("otel_service_name"),
NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
ngx_conf_set_str_slot,
NGX_HTTP_MAIN_CONF_OFFSET,
offsetof(MainConf, serviceName) },
{ ngx_string("otel_trace"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_http_set_complex_value_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(LocationConf, trace) },
ngx_null_command
};
ngx_command_t gExporterCommands[] = {
{ ngx_string("endpoint"),
NGX_CONF_TAKE1,
ngx_conf_set_str_slot,
0,
offsetof(MainConf, endpoint) },
{ ngx_string("interval"),
NGX_CONF_TAKE1,
ngx_conf_set_msec_slot,
0,
offsetof(MainConf, interval) },
{ ngx_string("batch_size"),
NGX_CONF_TAKE1,
ngx_conf_set_size_slot,
0,
offsetof(MainConf, batchSize) },
{ ngx_string("batch_count"),
NGX_CONF_TAKE1,
ngx_conf_set_size_slot,
0,
offsetof(MainConf, batchCount) },
ngx_null_command
};
std::unique_ptr<BatchExporter> gExporter;
StrView toStrView(ngx_str_t str)
{
return StrView((char*)str.data, str.len);
}
ngx_int_t onRequestStart(ngx_http_request_t* r)
{
// don't let internal redirects to override sampling decision
if (r->internal) {
return NGX_DECLINED;
}
bool sampled = false;
auto lcf = (LocationConf*)ngx_http_get_module_loc_conf(r, gHttpModule);
if (lcf->trace != NULL) {
ngx_str_t trace;
if (ngx_http_complex_value(r, lcf->trace, &trace) != NGX_OK) {
return NGX_ERROR;
}
sampled = toStrView(trace) == "on";
}
if (sampled) {
ngx_http_set_ctx(r, &gHttpModule, gHttpModule);
}
return NGX_DECLINED;
}
StrView getServerName(ngx_http_request_t* r)
{
auto cscf = (ngx_http_core_srv_conf_t*)
ngx_http_get_module_srv_conf(r, ngx_http_core_module);
auto name = cscf->server_name;
if (name.len == 0) {
name = r->headers_in.server;
}
return toStrView(name);
}
void addDefaultAttrs(BatchExporter::Span& span, ngx_http_request_t* r)
{
// based on trace semantic conventions for HTTP from 1.16.0 OTel spec
span.add("http.method", toStrView(r->method_name));
span.add("http.target", toStrView(r->unparsed_uri));
auto clcf = (ngx_http_core_loc_conf_t*)
ngx_http_get_module_loc_conf(r, ngx_http_core_module);
if (clcf->name.len) {
span.add("http.route", toStrView(clcf->name));
}
span.add("http.scheme", r->connection->ssl ? "https" : "http");
auto protocol = toStrView(r->http_protocol);
if (protocol.size() > 5) { // "HTTP/"
span.add("http.flavor", protocol.substr(5));
}
if (r->headers_in.user_agent) {
span.add("http.user_agent", toStrView(r->headers_in.user_agent->value));
}
auto received = r->headers_in.content_length_n;
span.add("http.request_content_length", received > 0 ? received : 0);
auto sent = r->connection->sent - (off_t)r->header_size;
span.add("http.response_content_length", sent > 0 ? sent : 0);
auto status = r->err_status ? r->err_status : r->headers_out.status;
if (status) {
span.add("http.status_code", status);
if (status >= 500) {
span.setError();
}
}
span.add("net.host.name", getServerName(r));
if (ngx_connection_local_sockaddr(r->connection, NULL, 0) == NGX_OK) {
auto port = ngx_inet_get_port(r->connection->local_sockaddr);
auto defaultPort = r->connection->ssl ? 443 : 80;
if (port != defaultPort) {
span.add("net.host.port", port);
}
}
span.add("net.sock.peer.addr", toStrView(r->connection->addr_text));
span.add("net.sock.peer.port", ngx_inet_get_port(r->connection->sockaddr));
}
ngx_int_t onRequestEnd(ngx_http_request_t* r)
{
if (!ngx_http_get_module_ctx(r, gHttpModule)) {
return NGX_DECLINED;
}
auto clcf = (ngx_http_core_loc_conf_t*)ngx_http_get_module_loc_conf(
r, ngx_http_core_module);
auto now = ngx_timeofday();
auto toNanoSec = [](time_t sec, ngx_msec_t msec) -> uint64_t {
return (sec * 1000 + msec) * 1000000;
};
try {
BatchExporter::SpanInfo info{
toStrView(clcf->name), TraceContext::generate(true), {},
toNanoSec(r->start_sec, r->start_msec),
toNanoSec(now->sec, now->msec)};
bool ok = gExporter->add(info, [r](BatchExporter::Span& span) {
addDefaultAttrs(span, r);
});
if (!ok) {
static size_t dropped = 0;
static time_t lastLog = 0;
++dropped;
if (lastLog != ngx_time()) {
lastLog = ngx_time();
ngx_log_error(NGX_LOG_NOTICE, r->connection->log, 0,
"OTel dropped records: %uz", dropped);
}
}
} catch (const std::exception& e) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"OTel failed to add span: %s", e.what());
return NGX_ERROR;
}
return NGX_DECLINED;
}
ngx_int_t initModule(ngx_conf_t* cf)
{
auto cmcf = (ngx_http_core_main_conf_t*)ngx_http_conf_get_module_main_conf(
cf, ngx_http_core_module);
auto h = (ngx_http_handler_pt*)ngx_array_push(
&cmcf->phases[NGX_HTTP_REWRITE_PHASE].handlers);
if (h == NULL) {
return NGX_ERROR;
}
*h = onRequestStart;
h = (ngx_http_handler_pt*)ngx_array_push(
&cmcf->phases[NGX_HTTP_LOG_PHASE].handlers);
if (h == NULL) {
return NGX_ERROR;
}
*h = onRequestEnd;
return NGX_OK;
}
ngx_int_t initWorkerProcess(ngx_cycle_t* cycle)
{
auto mcf = (MainConf*)ngx_http_cycle_get_module_main_conf(
cycle, gHttpModule);
try {
gExporter.reset(new BatchExporter(
toStrView(mcf->endpoint),
mcf->batchSize,
mcf->batchCount,
toStrView(mcf->serviceName)));
} catch (const std::exception& e) {
ngx_log_error(NGX_LOG_CRIT, cycle->log, 0,
"OTel worker init error: %s", e.what());
return NGX_ERROR;
}
static ngx_connection_t dummy;
static ngx_event_t flushEvent;
flushEvent.data = &dummy;
flushEvent.log = cycle->log;
flushEvent.cancelable = 1;
flushEvent.handler = [](ngx_event_t* ev) {
try {
gExporter->flush();
} catch (const std::exception& e) {
ngx_log_error(NGX_LOG_CRIT, ev->log, 0,
"OTel flush error: %s", e.what());
}
auto mcf = (MainConf*)ngx_http_cycle_get_module_main_conf(
ngx_cycle, gHttpModule);
ngx_add_timer(ev, mcf->interval);
};
ngx_add_timer(&flushEvent, mcf->interval);
return NGX_OK;
}
void exitWorkerProcess(ngx_cycle_t* cycle)
{
try {
gExporter->flush();
} catch (const std::exception& e) {
ngx_log_error(NGX_LOG_CRIT, cycle->log, 0,
"OTel flush error: %s", e.what());
}
gExporter.reset();
}
char* setExporter(ngx_conf_t* cf, ngx_command_t* cmd, void* conf)
{
auto mcf = (MainConf*)conf;
if (mcf->endpoint.len) {
return (char*)"is duplicate";
}
auto cfCopy = *cf;
cfCopy.handler = [](ngx_conf_t* cf, ngx_command_t*, void*) {
auto name = (ngx_str_t*)cf->args->elts;
for (auto cmd = gExporterCommands; cmd->name.len; cmd++) {
if (ngx_strcmp(name->data, cmd->name.data) != 0) {
continue;
}
if (cf->args->nelts != 2) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"invalid number of arguments in \"%V\" "
"directive of \"otel_exporter\"", name);
return (char*)NGX_CONF_ERROR;
}
auto rv = cmd->set(cf, cmd, cf->handler_conf);
if (rv == NGX_CONF_OK) {
return rv;
}
if (rv != NGX_CONF_ERROR) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"\"%V\" directive of \"otel_exporter\" %s", name, rv);
}
return (char*)NGX_CONF_ERROR;
}
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"unknown directive \"%V\" in \"otel_exporter\"", name);
return (char*)NGX_CONF_ERROR;
};
cfCopy.handler_conf = mcf;
auto rv = ngx_conf_parse(&cfCopy, NULL);
if (rv != NGX_CONF_OK) {
return rv;
}
if (mcf->endpoint.len == 0) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"\"otel_exporter\" requires \"endpoint\"");
return (char*)NGX_CONF_ERROR;
}
return NGX_CONF_OK;
}
void* createMainConf(ngx_conf_t* cf)
{
auto mcf = (MainConf*)ngx_pcalloc(cf->pool, sizeof(MainConf));
if (mcf == NULL) {
return NULL;
}
mcf->interval = NGX_CONF_UNSET_MSEC;
mcf->batchSize = NGX_CONF_UNSET_SIZE;
mcf->batchCount = NGX_CONF_UNSET_SIZE;
return mcf;
}
char* initMainConf(ngx_conf_t* cf, void* conf)
{
auto mcf = (MainConf*)conf;
if (mcf->endpoint.len == 0) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"\"otel_exporter\" block is missing");
return (char*)NGX_CONF_ERROR;
}
ngx_conf_init_msec_value(mcf->interval, 5000);
ngx_conf_init_size_value(mcf->batchSize, 512);
ngx_conf_init_size_value(mcf->batchCount, 4);
if (mcf->serviceName.data == NULL) {
mcf->serviceName = ngx_string("unknown_service:nginx");
}
return NGX_CONF_OK;
}
void* createLocationConf(ngx_conf_t* cf)
{
auto conf = (LocationConf*)ngx_pcalloc(cf->pool, sizeof(LocationConf));
if (conf == NULL) {
return NULL;
}
conf->trace = (ngx_http_complex_value_t*)NGX_CONF_UNSET_PTR;
return conf;
}
char* mergeLocationConf(ngx_conf_t* cf, void* parent, void* child)
{
auto prev = (LocationConf*)parent;
auto conf = (LocationConf*)child;
ngx_conf_merge_ptr_value(conf->trace, prev->trace, NULL);
return NGX_CONF_OK;
}
ngx_http_module_t gHttpModuleCtx = {
NULL, /* preconfiguration */
initModule, /* postconfiguration */
createMainConf, /* create main configuration */
initMainConf, /* init main configuration */
NULL, /* create server configuration */
NULL, /* merge server configuration */
createLocationConf, /* create location configuration */
mergeLocationConf /* merge location configuration */
};
}
ngx_module_t gHttpModule = {
NGX_MODULE_V1,
&gHttpModuleCtx, /* module context */
gCommands, /* module directives */
NGX_HTTP_MODULE, /* module type */
NULL, /* init master */
NULL, /* init module */
initWorkerProcess, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
exitWorkerProcess, /* exit process */
NULL, /* exit master */
NGX_MODULE_V1_PADDING
};

14
src/modules.c Normal file
View file

@ -0,0 +1,14 @@
#include <ngx_config.h>
#include <ngx_core.h>
extern ngx_module_t gHttpModule;
ngx_module_t* ngx_modules[] = {
&gHttpModule,
NULL
};
char* ngx_module_names[] = {
"ngx_http_otel_module",
NULL
};

5
src/str_view.hpp Normal file
View file

@ -0,0 +1,5 @@
#pragma once
#include <opentelemetry/nostd/string_view.h>
typedef opentelemetry::nostd::string_view StrView;

25
src/trace_context.hpp Normal file
View file

@ -0,0 +1,25 @@
#pragma once
#include <opentelemetry/trace/trace_id.h>
#include <opentelemetry/trace/span_id.h>
#include <opentelemetry/sdk/trace/random_id_generator.h>
#include "str_view.hpp"
struct TraceContext {
opentelemetry::trace::TraceId traceId;
opentelemetry::trace::SpanId spanId;
bool sampled;
StrView state;
static TraceContext generate(bool sampled, TraceContext parent = {})
{
opentelemetry::sdk::trace::RandomIdGenerator idGen;
return {parent.traceId.IsValid() ?
parent.traceId : idGen.GenerateTraceId(),
idGen.GenerateSpanId(),
sampled,
parent.state};
}
};

View file

@ -0,0 +1,108 @@
#pragma once
#include <functional>
#include <grpcpp/grpcpp.h>
#include <grpcpp/alarm.h>
#include <opentelemetry/proto/collector/trace/v1/trace_service.grpc.pb.h>
namespace otel_proto_trace = opentelemetry::proto::collector::trace::v1;
class TraceServiceClient {
public:
typedef otel_proto_trace::ExportTraceServiceRequest Request;
typedef otel_proto_trace::ExportTraceServiceResponse Response;
typedef otel_proto_trace::TraceService TraceService;
typedef std::function<void (Request, Response, grpc::Status)>
ResponseCb;
TraceServiceClient(const std::string& target)
{
auto channel = grpc::CreateChannel(
target, grpc::InsecureChannelCredentials());
channel->GetState(true); // trigger 'connecting' state
stub = TraceService::NewStub(channel);
}
void send(Request& req, ResponseCb cb)
{
std::unique_ptr<ActiveCall> call{new ActiveCall{}};
call->request = std::move(req);
call->cb = std::move(cb);
++pending;
// post actual RPC to worker thread to minimize load on caller
gpr_timespec past{};
call->sendAlarm.Set(&queue, past, call.release());
}
void run()
{
void* tag = NULL;
bool ok = false;
while (queue.Next(&tag, &ok)) {
assert(ok);
if (tag == &shutdownAlarm) {
shutdown = true;
} else {
std::unique_ptr<ActiveCall> call{(ActiveCall*)tag};
if (!call->sent) {
--pending;
call->responseReader = stub->AsyncExport(
&call->context, call->request, &queue);
call->sent = true;
call->responseReader->Finish(
&call->response, &call->status, call.get());
call.release();
} else {
call->cb(std::move(call->request),
std::move(call->response), std::move(call->status));
}
}
// It's not clear if gRPC guarantees any order for expired alarms,
// so we use 'pending' counter to ensure CQ shutdown happens last.
// https://github.com/grpc/grpc/issues/31398
if (shutdown && pending == 0) {
queue.Shutdown();
}
}
}
void stop()
{
gpr_timespec past{};
shutdownAlarm.Set(&queue, past, &shutdownAlarm);
}
private:
struct ActiveCall {
grpc::Alarm sendAlarm;
bool sent;
grpc::ClientContext context;
Request request;
Response response;
grpc::Status status;
std::unique_ptr<grpc::ClientAsyncResponseReader<Response>>
responseReader;
ResponseCb cb;
};
std::unique_ptr<TraceService::Stub> stub;
grpc::CompletionQueue queue;
grpc::Alarm shutdownAlarm;
std::atomic<int> pending{0};
bool shutdown{false};
};