From 9d3e21194cb1248ce403d81cb9ff4c7354100994 Mon Sep 17 00:00:00 2001 From: Pavel Pautov Date: Tue, 29 Nov 2022 00:17:12 -0800 Subject: [PATCH] Nginx OTel module. --- CMakeLists.txt | 124 ++++++++++ README | 15 ++ src/batch_exporter.hpp | 220 +++++++++++++++++ src/http_module.cpp | 452 +++++++++++++++++++++++++++++++++++ src/modules.c | 14 ++ src/str_view.hpp | 5 + src/trace_context.hpp | 25 ++ src/trace_service_client.hpp | 108 +++++++++ 8 files changed, 963 insertions(+) create mode 100644 CMakeLists.txt create mode 100644 README create mode 100644 src/batch_exporter.hpp create mode 100644 src/http_module.cpp create mode 100644 src/modules.c create mode 100644 src/str_view.hpp create mode 100644 src/trace_context.hpp create mode 100644 src/trace_service_client.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..7815644 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,124 @@ +cmake_minimum_required(VERSION 3.16.3) +project(nginx-otel) + +set(NGX_OTEL_NGINX_BUILD_DIR "" + CACHE PATH "Nginx build (objs) dir") +set(NGX_OTEL_NGINX_DIR "${NGX_OTEL_NGINX_BUILD_DIR}/.." + CACHE PATH "Nginx source dir") + +set(NGX_OTEL_FETCH_DEPS ON CACHE BOOL "Download dependencies") +set(NGX_OTEL_PROTO_DIR "" CACHE PATH "OTel proto files root") + +if(NOT CMAKE_BUILD_TYPE) + set(CMAKE_BUILD_TYPE RelWithDebInfo) +endif() + +set(CMAKE_CXX_VISIBILITY_PRESET hidden) + +if(NGX_OTEL_FETCH_DEPS) + include(FetchContent) + + FetchContent_Declare( + grpc + GIT_REPOSITORY https://github.com/grpc/grpc + GIT_TAG 18dda3c586b2607d8daead6b97922e59d867bb7d # v1.46.6 + GIT_SUBMODULES third_party/protobuf third_party/abseil-cpp + GIT_SHALLOW ON) + + set(gRPC_USE_PROTO_LITE ON CACHE INTERNAL "") + set(gRPC_INSTALL OFF CACHE INTERNAL "") + set(gRPC_CARES_PROVIDER package CACHE INTERNAL "") + set(gRPC_RE2_PROVIDER package CACHE INTERNAL "") + set(gRPC_SSL_PROVIDER package CACHE INTERNAL "") + set(gRPC_ZLIB_PROVIDER package CACHE INTERNAL "") + + FetchContent_Declare( + otelcpp + GIT_REPOSITORY https://github.com/open-telemetry/opentelemetry-cpp + GIT_TAG 57bf8c2b0e85215a61602f559522d08caa4d2fb8 # v1.8.1 + GIT_SUBMODULES third_party/opentelemetry-proto + GIT_SHALLOW ON) + + set(BUILD_TESTING OFF CACHE INTERNAL "") + set(WITH_EXAMPLES OFF CACHE INTERNAL "") + + set(CMAKE_POSITION_INDEPENDENT_CODE ON) + set(CMAKE_POLICY_DEFAULT_CMP0063 NEW) + + FetchContent_MakeAvailable(grpc otelcpp) + + set_property(DIRECTORY ${grpc_SOURCE_DIR} + PROPERTY EXCLUDE_FROM_ALL YES) + set_property(DIRECTORY ${otelcpp_SOURCE_DIR} + PROPERTY EXCLUDE_FROM_ALL YES) + + if(NOT NGX_OTEL_PROTO_DIR) + set(NGX_OTEL_PROTO_DIR + "${otelcpp_SOURCE_DIR}/third_party/opentelemetry-proto") + endif() + + add_library(opentelemetry-cpp::trace ALIAS opentelemetry_trace) + add_library(gRPC::grpc++ ALIAS grpc++) + add_executable(gRPC::grpc_cpp_plugin ALIAS grpc_cpp_plugin) +else() + find_package(opentelemetry-cpp REQUIRED) + find_package(protobuf REQUIRED) + find_package(gRPC REQUIRED) +endif() + +set(PROTO_DIR ${NGX_OTEL_PROTO_DIR}) +set(PROTOS + "${PROTO_DIR}/opentelemetry/proto/common/v1/common.proto" + "${PROTO_DIR}/opentelemetry/proto/resource/v1/resource.proto" + "${PROTO_DIR}/opentelemetry/proto/trace/v1/trace.proto" + "${PROTO_DIR}/opentelemetry/proto/collector/trace/v1/trace_service.proto") + +set(PROTO_OUT_DIR "${CMAKE_CURRENT_BINARY_DIR}") +set(PROTO_SOURCES + "${PROTO_OUT_DIR}/opentelemetry/proto/common/v1/common.pb.cc" + "${PROTO_OUT_DIR}/opentelemetry/proto/resource/v1/resource.pb.cc" + "${PROTO_OUT_DIR}/opentelemetry/proto/trace/v1/trace.pb.cc" + "${PROTO_OUT_DIR}/opentelemetry/proto/collector/trace/v1/trace_service.pb.cc" + "${PROTO_OUT_DIR}/opentelemetry/proto/collector/trace/v1/trace_service.grpc.pb.cc") + +# generate protobuf code for lite runtime +add_custom_command( + OUTPUT ${PROTO_SOURCES} #${PROTO_HEADERS} + COMMAND protobuf::protoc + ARGS --proto_path ${PROTO_DIR} + --cpp_out lite:${PROTO_OUT_DIR} + --grpc_out ${PROTO_OUT_DIR} + --plugin protoc-gen-grpc=$ + ${PROTOS} + DEPENDS ${PROTOS} protobuf::protoc gRPC::grpc_cpp_plugin + VERBATIM) + +set(CMAKE_CXX_STANDARD 11) +set(CMAKE_CXX_EXTENSIONS OFF) + +add_compile_options(-Wall -Wtype-limits -Werror) + +add_library(ngx_otel_module MODULE + src/http_module.cpp + src/modules.c + ${PROTO_SOURCES}) + +# avoid 'lib' prefix in binary name +set_target_properties(ngx_otel_module PROPERTIES PREFIX "") + +# can't use OTel's WITH_ABSEIL until cmake 3.24, as it triggers find_package() +target_compile_definitions(ngx_otel_module PRIVATE HAVE_ABSEIL) + +target_include_directories(ngx_otel_module PRIVATE + ${NGX_OTEL_NGINX_BUILD_DIR} + ${NGX_OTEL_NGINX_DIR}/src/core + ${NGX_OTEL_NGINX_DIR}/src/event + ${NGX_OTEL_NGINX_DIR}/src/os/unix + ${NGX_OTEL_NGINX_DIR}/src/http + ${NGX_OTEL_NGINX_DIR}/src/http/modules + ${NGX_OTEL_NGINX_DIR}/src/http/v2 + ${PROTO_OUT_DIR}) + +target_link_libraries(ngx_otel_module + opentelemetry-cpp::trace + gRPC::grpc++) diff --git a/README b/README new file mode 100644 index 0000000..6537dc6 --- /dev/null +++ b/README @@ -0,0 +1,15 @@ +Building +-------- + +Install build tools and dependencies: + $ sudo apt install cmake build-essential libssl-dev zlib1g-dev libpcre3-dev + $ sudo apt install pkg-config libc-ares-dev libre2-dev # for gRPC + +Configure Nginx: + $ ./configure --with-compat + +Configure and build Nginx OTel module: + $ mkdir build + $ cd build + $ cmake -DNGX_OTEL_NGINX_BUILD_DIR=/path/to/configured/nginx/objs .. + $ make diff --git a/src/batch_exporter.hpp b/src/batch_exporter.hpp new file mode 100644 index 0000000..eaeb9d4 --- /dev/null +++ b/src/batch_exporter.hpp @@ -0,0 +1,220 @@ +#pragma once + +#include + +#include +#include +#include + +#include "str_view.hpp" +#include "trace_context.hpp" +#include "trace_service_client.hpp" + +class BatchExporter { +public: + typedef TraceServiceClient::Request Request; + typedef TraceServiceClient::Response Response; + + struct SpanInfo { + StrView name; + TraceContext trace; + opentelemetry::trace::SpanId parent; + uint64_t start; + uint64_t end; + }; + + class Span { + public: + Span(const Span&) = delete; + Span& operator=(const Span&) = delete; + + Span(const SpanInfo& info, opentelemetry::proto::trace::v1::Span* span) + : span(span) + { + span->set_kind( + opentelemetry::proto::trace::v1::Span::SPAN_KIND_SERVER); + + // Short setters, like set_name(), use additional std::string as an + // intermediary at least up to v21.5 of protobuf. + set(span->mutable_name(), info.name); + + set(span->mutable_trace_id(), info.trace.traceId.Id()); + set(span->mutable_span_id(), info.trace.spanId.Id()); + set(span->mutable_trace_state(), info.trace.state); + + if (info.parent.IsValid()) { + set(span->mutable_parent_span_id(), info.parent.Id()); + } else { + span->mutable_parent_span_id()->clear(); + } + + span->set_start_time_unix_nano(info.start); + span->set_end_time_unix_nano(info.end); + + span->mutable_status()->clear_code(); + } + + ~Span() + { + truncate(span->mutable_attributes(), attrSize); + } + + void add(StrView key, StrView value) + { + add(key)->mutable_value()->mutable_string_value()->assign( + value.data(), value.size()); + } + + void add(StrView key, int value) + { + add(key)->mutable_value()->set_int_value(value); + } + + void setError() + { + span->mutable_status()->set_code( + opentelemetry::proto::trace::v1::Status::STATUS_CODE_ERROR); + } + + private: + template + static void set(std::string* str, const ByteRange& range) + { + str->assign((const char*)range.data(), range.size()); + } + + opentelemetry::proto::common::v1::KeyValue* add(StrView key) + { + auto attrs = span->mutable_attributes(); + + auto newAttr = attrs->size() > attrSize ? + attrs->Mutable(attrSize) : attrs->Add(); + + newAttr->mutable_key()->assign(key.data(), key.size()); + + ++attrSize; + + return newAttr; + } + + opentelemetry::proto::trace::v1::Span* span; + int attrSize{0}; + }; + + BatchExporter(StrView target, + size_t batchSize, size_t batchCount, StrView serviceName) : + batchSize(batchSize), client(std::string(target)) + { + free.reserve(batchCount); + while (batchCount-- > 0) { + free.emplace_back(); + auto resourceSpans = free.back().add_resource_spans(); + + auto attr = resourceSpans->mutable_resource()->add_attributes(); + attr->set_key("service.name"); + attr->mutable_value()->set_string_value(std::string(serviceName)); + + auto scopeSpans = resourceSpans->add_scope_spans(); + scopeSpans->mutable_scope()->set_name("nginx"); + scopeSpans->mutable_scope()->set_version(NGINX_VERSION); + + scopeSpans->mutable_spans()->Reserve(batchSize); + } + + worker = std::thread(&TraceServiceClient::run, &client); + } + + ~BatchExporter() + { + client.stop(); + worker.join(); + } + + template + bool add(const SpanInfo& info, F fillSpan) + { + if (currentSize == (int)batchSize) { + sendBatch(current); + currentSize = -1; + } + + if (currentSize == -1) { + std::unique_lock lock(mutex); + if (free.empty()) { + return false; + } + current = std::move(free.back()); + free.pop_back(); + currentSize = 0; + } + + auto spans = getSpans(current); + + Span span(info, spans->size() > currentSize ? + spans->Mutable(currentSize) : spans->Add()); + + fillSpan(span); + + ++currentSize; + + return true; + } + + void flush() + { + if (currentSize <= 0) { + return; + } + + truncate(getSpans(current), currentSize); + + sendBatch(current); + currentSize = -1; + } + +private: + const size_t batchSize; + + TraceServiceClient client; + + std::mutex mutex; + std::vector free; + + Request current; + int currentSize{-1}; + + std::thread worker; + + static auto getSpans(Request& req) -> decltype( + req.mutable_resource_spans(0)->mutable_scope_spans(0)->mutable_spans()) + { + return req.mutable_resource_spans(0)->mutable_scope_spans(0)-> + mutable_spans(); + } + + template + static void truncate(T* items, int newSize) + { + // unlike DeleteSubrange(), this doesn't destruct removed items + int tailSize = items->size() - newSize; + while (tailSize-- > 0) { + items->RemoveLast(); + } + } + + void sendBatch(Request& request) + { + client.send(request, + [this](Request req, Response, grpc::Status status) { + std::unique_lock lock(mutex); + free.push_back(std::move(req)); + lock.unlock(); + + if (!status.ok()) { + ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, + "OTel export failure: %s", + status.error_message().c_str()); + } + }); + } +}; diff --git a/src/http_module.cpp b/src/http_module.cpp new file mode 100644 index 0000000..f4f5980 --- /dev/null +++ b/src/http_module.cpp @@ -0,0 +1,452 @@ +extern "C" { +#include +#include +#include +} + +#include "str_view.hpp" +#include "trace_context.hpp" +#include "batch_exporter.hpp" + +extern ngx_module_t gHttpModule; + +namespace { + +struct MainConf { + ngx_str_t endpoint; + ngx_msec_t interval; + size_t batchSize; + size_t batchCount; + + ngx_str_t serviceName; +}; + +struct LocationConf { + ngx_http_complex_value_t* trace; +}; + +char* setExporter(ngx_conf_t* cf, ngx_command_t* cmd, void* conf); + +ngx_command_t gCommands[] = { + + { ngx_string("otel_exporter"), + NGX_HTTP_MAIN_CONF|NGX_CONF_BLOCK|NGX_CONF_NOARGS, + setExporter, + NGX_HTTP_MAIN_CONF_OFFSET }, + + { ngx_string("otel_service_name"), + NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1, + ngx_conf_set_str_slot, + NGX_HTTP_MAIN_CONF_OFFSET, + offsetof(MainConf, serviceName) }, + + { ngx_string("otel_trace"), + NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1, + ngx_http_set_complex_value_slot, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(LocationConf, trace) }, + + ngx_null_command +}; + +ngx_command_t gExporterCommands[] = { + + { ngx_string("endpoint"), + NGX_CONF_TAKE1, + ngx_conf_set_str_slot, + 0, + offsetof(MainConf, endpoint) }, + + { ngx_string("interval"), + NGX_CONF_TAKE1, + ngx_conf_set_msec_slot, + 0, + offsetof(MainConf, interval) }, + + { ngx_string("batch_size"), + NGX_CONF_TAKE1, + ngx_conf_set_size_slot, + 0, + offsetof(MainConf, batchSize) }, + + { ngx_string("batch_count"), + NGX_CONF_TAKE1, + ngx_conf_set_size_slot, + 0, + offsetof(MainConf, batchCount) }, + + ngx_null_command +}; + +std::unique_ptr gExporter; + +StrView toStrView(ngx_str_t str) +{ + return StrView((char*)str.data, str.len); +} + +ngx_int_t onRequestStart(ngx_http_request_t* r) +{ + // don't let internal redirects to override sampling decision + if (r->internal) { + return NGX_DECLINED; + } + + bool sampled = false; + + auto lcf = (LocationConf*)ngx_http_get_module_loc_conf(r, gHttpModule); + if (lcf->trace != NULL) { + ngx_str_t trace; + if (ngx_http_complex_value(r, lcf->trace, &trace) != NGX_OK) { + return NGX_ERROR; + } + + sampled = toStrView(trace) == "on"; + } + + if (sampled) { + ngx_http_set_ctx(r, &gHttpModule, gHttpModule); + } + + return NGX_DECLINED; +} + +StrView getServerName(ngx_http_request_t* r) +{ + auto cscf = (ngx_http_core_srv_conf_t*) + ngx_http_get_module_srv_conf(r, ngx_http_core_module); + + auto name = cscf->server_name; + if (name.len == 0) { + name = r->headers_in.server; + } + + return toStrView(name); +} + +void addDefaultAttrs(BatchExporter::Span& span, ngx_http_request_t* r) +{ + // based on trace semantic conventions for HTTP from 1.16.0 OTel spec + + span.add("http.method", toStrView(r->method_name)); + + span.add("http.target", toStrView(r->unparsed_uri)); + + auto clcf = (ngx_http_core_loc_conf_t*) + ngx_http_get_module_loc_conf(r, ngx_http_core_module); + if (clcf->name.len) { + span.add("http.route", toStrView(clcf->name)); + } + + span.add("http.scheme", r->connection->ssl ? "https" : "http"); + + auto protocol = toStrView(r->http_protocol); + if (protocol.size() > 5) { // "HTTP/" + span.add("http.flavor", protocol.substr(5)); + } + + if (r->headers_in.user_agent) { + span.add("http.user_agent", toStrView(r->headers_in.user_agent->value)); + } + + auto received = r->headers_in.content_length_n; + span.add("http.request_content_length", received > 0 ? received : 0); + + auto sent = r->connection->sent - (off_t)r->header_size; + span.add("http.response_content_length", sent > 0 ? sent : 0); + + auto status = r->err_status ? r->err_status : r->headers_out.status; + if (status) { + span.add("http.status_code", status); + + if (status >= 500) { + span.setError(); + } + } + + span.add("net.host.name", getServerName(r)); + + if (ngx_connection_local_sockaddr(r->connection, NULL, 0) == NGX_OK) { + auto port = ngx_inet_get_port(r->connection->local_sockaddr); + auto defaultPort = r->connection->ssl ? 443 : 80; + + if (port != defaultPort) { + span.add("net.host.port", port); + } + } + + span.add("net.sock.peer.addr", toStrView(r->connection->addr_text)); + span.add("net.sock.peer.port", ngx_inet_get_port(r->connection->sockaddr)); +} + +ngx_int_t onRequestEnd(ngx_http_request_t* r) +{ + if (!ngx_http_get_module_ctx(r, gHttpModule)) { + return NGX_DECLINED; + } + + auto clcf = (ngx_http_core_loc_conf_t*)ngx_http_get_module_loc_conf( + r, ngx_http_core_module); + + auto now = ngx_timeofday(); + + auto toNanoSec = [](time_t sec, ngx_msec_t msec) -> uint64_t { + return (sec * 1000 + msec) * 1000000; + }; + + try { + BatchExporter::SpanInfo info{ + toStrView(clcf->name), TraceContext::generate(true), {}, + toNanoSec(r->start_sec, r->start_msec), + toNanoSec(now->sec, now->msec)}; + + bool ok = gExporter->add(info, [r](BatchExporter::Span& span) { + addDefaultAttrs(span, r); + }); + + if (!ok) { + static size_t dropped = 0; + static time_t lastLog = 0; + ++dropped; + if (lastLog != ngx_time()) { + lastLog = ngx_time(); + ngx_log_error(NGX_LOG_NOTICE, r->connection->log, 0, + "OTel dropped records: %uz", dropped); + } + } + + } catch (const std::exception& e) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "OTel failed to add span: %s", e.what()); + return NGX_ERROR; + } + + return NGX_DECLINED; +} + +ngx_int_t initModule(ngx_conf_t* cf) +{ + auto cmcf = (ngx_http_core_main_conf_t*)ngx_http_conf_get_module_main_conf( + cf, ngx_http_core_module); + + auto h = (ngx_http_handler_pt*)ngx_array_push( + &cmcf->phases[NGX_HTTP_REWRITE_PHASE].handlers); + if (h == NULL) { + return NGX_ERROR; + } + + *h = onRequestStart; + + h = (ngx_http_handler_pt*)ngx_array_push( + &cmcf->phases[NGX_HTTP_LOG_PHASE].handlers); + if (h == NULL) { + return NGX_ERROR; + } + + *h = onRequestEnd; + + return NGX_OK; +} + +ngx_int_t initWorkerProcess(ngx_cycle_t* cycle) +{ + auto mcf = (MainConf*)ngx_http_cycle_get_module_main_conf( + cycle, gHttpModule); + + try { + gExporter.reset(new BatchExporter( + toStrView(mcf->endpoint), + mcf->batchSize, + mcf->batchCount, + toStrView(mcf->serviceName))); + } catch (const std::exception& e) { + ngx_log_error(NGX_LOG_CRIT, cycle->log, 0, + "OTel worker init error: %s", e.what()); + return NGX_ERROR; + } + + static ngx_connection_t dummy; + static ngx_event_t flushEvent; + + flushEvent.data = &dummy; + flushEvent.log = cycle->log; + flushEvent.cancelable = 1; + flushEvent.handler = [](ngx_event_t* ev) { + try { + gExporter->flush(); + } catch (const std::exception& e) { + ngx_log_error(NGX_LOG_CRIT, ev->log, 0, + "OTel flush error: %s", e.what()); + } + + auto mcf = (MainConf*)ngx_http_cycle_get_module_main_conf( + ngx_cycle, gHttpModule); + + ngx_add_timer(ev, mcf->interval); + }; + + ngx_add_timer(&flushEvent, mcf->interval); + + return NGX_OK; +} + +void exitWorkerProcess(ngx_cycle_t* cycle) +{ + try { + gExporter->flush(); + } catch (const std::exception& e) { + ngx_log_error(NGX_LOG_CRIT, cycle->log, 0, + "OTel flush error: %s", e.what()); + } + + gExporter.reset(); +} + +char* setExporter(ngx_conf_t* cf, ngx_command_t* cmd, void* conf) +{ + auto mcf = (MainConf*)conf; + + if (mcf->endpoint.len) { + return (char*)"is duplicate"; + } + + auto cfCopy = *cf; + + cfCopy.handler = [](ngx_conf_t* cf, ngx_command_t*, void*) { + auto name = (ngx_str_t*)cf->args->elts; + + for (auto cmd = gExporterCommands; cmd->name.len; cmd++) { + + if (ngx_strcmp(name->data, cmd->name.data) != 0) { + continue; + } + + if (cf->args->nelts != 2) { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "invalid number of arguments in \"%V\" " + "directive of \"otel_exporter\"", name); + return (char*)NGX_CONF_ERROR; + } + + auto rv = cmd->set(cf, cmd, cf->handler_conf); + + if (rv == NGX_CONF_OK) { + return rv; + } + + if (rv != NGX_CONF_ERROR) { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "\"%V\" directive of \"otel_exporter\" %s", name, rv); + } + + return (char*)NGX_CONF_ERROR; + } + + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "unknown directive \"%V\" in \"otel_exporter\"", name); + return (char*)NGX_CONF_ERROR; + }; + + cfCopy.handler_conf = mcf; + + auto rv = ngx_conf_parse(&cfCopy, NULL); + if (rv != NGX_CONF_OK) { + return rv; + } + + if (mcf->endpoint.len == 0) { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "\"otel_exporter\" requires \"endpoint\""); + return (char*)NGX_CONF_ERROR; + } + + return NGX_CONF_OK; +} + +void* createMainConf(ngx_conf_t* cf) +{ + auto mcf = (MainConf*)ngx_pcalloc(cf->pool, sizeof(MainConf)); + if (mcf == NULL) { + return NULL; + } + + mcf->interval = NGX_CONF_UNSET_MSEC; + mcf->batchSize = NGX_CONF_UNSET_SIZE; + mcf->batchCount = NGX_CONF_UNSET_SIZE; + + return mcf; +} + +char* initMainConf(ngx_conf_t* cf, void* conf) +{ + auto mcf = (MainConf*)conf; + + if (mcf->endpoint.len == 0) { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "\"otel_exporter\" block is missing"); + return (char*)NGX_CONF_ERROR; + } + + ngx_conf_init_msec_value(mcf->interval, 5000); + ngx_conf_init_size_value(mcf->batchSize, 512); + ngx_conf_init_size_value(mcf->batchCount, 4); + + if (mcf->serviceName.data == NULL) { + mcf->serviceName = ngx_string("unknown_service:nginx"); + } + + return NGX_CONF_OK; +} + +void* createLocationConf(ngx_conf_t* cf) +{ + auto conf = (LocationConf*)ngx_pcalloc(cf->pool, sizeof(LocationConf)); + if (conf == NULL) { + return NULL; + } + + conf->trace = (ngx_http_complex_value_t*)NGX_CONF_UNSET_PTR; + + return conf; +} + +char* mergeLocationConf(ngx_conf_t* cf, void* parent, void* child) +{ + auto prev = (LocationConf*)parent; + auto conf = (LocationConf*)child; + + ngx_conf_merge_ptr_value(conf->trace, prev->trace, NULL); + + return NGX_CONF_OK; +} + +ngx_http_module_t gHttpModuleCtx = { + NULL, /* preconfiguration */ + initModule, /* postconfiguration */ + + createMainConf, /* create main configuration */ + initMainConf, /* init main configuration */ + + NULL, /* create server configuration */ + NULL, /* merge server configuration */ + + createLocationConf, /* create location configuration */ + mergeLocationConf /* merge location configuration */ +}; + +} + +ngx_module_t gHttpModule = { + NGX_MODULE_V1, + &gHttpModuleCtx, /* module context */ + gCommands, /* module directives */ + NGX_HTTP_MODULE, /* module type */ + NULL, /* init master */ + NULL, /* init module */ + initWorkerProcess, /* init process */ + NULL, /* init thread */ + NULL, /* exit thread */ + exitWorkerProcess, /* exit process */ + NULL, /* exit master */ + NGX_MODULE_V1_PADDING +}; diff --git a/src/modules.c b/src/modules.c new file mode 100644 index 0000000..419d594 --- /dev/null +++ b/src/modules.c @@ -0,0 +1,14 @@ +#include +#include + +extern ngx_module_t gHttpModule; + +ngx_module_t* ngx_modules[] = { + &gHttpModule, + NULL +}; + +char* ngx_module_names[] = { + "ngx_http_otel_module", + NULL +}; diff --git a/src/str_view.hpp b/src/str_view.hpp new file mode 100644 index 0000000..debdad4 --- /dev/null +++ b/src/str_view.hpp @@ -0,0 +1,5 @@ +#pragma once + +#include + +typedef opentelemetry::nostd::string_view StrView; diff --git a/src/trace_context.hpp b/src/trace_context.hpp new file mode 100644 index 0000000..dc94bdd --- /dev/null +++ b/src/trace_context.hpp @@ -0,0 +1,25 @@ +#pragma once + +#include +#include +#include + +#include "str_view.hpp" + +struct TraceContext { + opentelemetry::trace::TraceId traceId; + opentelemetry::trace::SpanId spanId; + bool sampled; + StrView state; + + static TraceContext generate(bool sampled, TraceContext parent = {}) + { + opentelemetry::sdk::trace::RandomIdGenerator idGen; + + return {parent.traceId.IsValid() ? + parent.traceId : idGen.GenerateTraceId(), + idGen.GenerateSpanId(), + sampled, + parent.state}; + } +}; diff --git a/src/trace_service_client.hpp b/src/trace_service_client.hpp new file mode 100644 index 0000000..6871019 --- /dev/null +++ b/src/trace_service_client.hpp @@ -0,0 +1,108 @@ +#pragma once + +#include + +#include +#include +#include + +namespace otel_proto_trace = opentelemetry::proto::collector::trace::v1; + +class TraceServiceClient { +public: + typedef otel_proto_trace::ExportTraceServiceRequest Request; + typedef otel_proto_trace::ExportTraceServiceResponse Response; + typedef otel_proto_trace::TraceService TraceService; + + typedef std::function + ResponseCb; + + TraceServiceClient(const std::string& target) + { + auto channel = grpc::CreateChannel( + target, grpc::InsecureChannelCredentials()); + channel->GetState(true); // trigger 'connecting' state + + stub = TraceService::NewStub(channel); + } + + void send(Request& req, ResponseCb cb) + { + std::unique_ptr call{new ActiveCall{}}; + + call->request = std::move(req); + call->cb = std::move(cb); + + ++pending; + + // post actual RPC to worker thread to minimize load on caller + gpr_timespec past{}; + call->sendAlarm.Set(&queue, past, call.release()); + } + + void run() + { + void* tag = NULL; + bool ok = false; + + while (queue.Next(&tag, &ok)) { + assert(ok); + + if (tag == &shutdownAlarm) { + shutdown = true; + } else { + std::unique_ptr call{(ActiveCall*)tag}; + + if (!call->sent) { + --pending; + + call->responseReader = stub->AsyncExport( + &call->context, call->request, &queue); + call->sent = true; + + call->responseReader->Finish( + &call->response, &call->status, call.get()); + call.release(); + } else { + call->cb(std::move(call->request), + std::move(call->response), std::move(call->status)); + } + } + + // It's not clear if gRPC guarantees any order for expired alarms, + // so we use 'pending' counter to ensure CQ shutdown happens last. + // https://github.com/grpc/grpc/issues/31398 + if (shutdown && pending == 0) { + queue.Shutdown(); + } + } + } + + void stop() + { + gpr_timespec past{}; + shutdownAlarm.Set(&queue, past, &shutdownAlarm); + } + +private: + struct ActiveCall { + grpc::Alarm sendAlarm; + bool sent; + + grpc::ClientContext context; + Request request; + Response response; + grpc::Status status; + std::unique_ptr> + responseReader; + + ResponseCb cb; + }; + + std::unique_ptr stub; + grpc::CompletionQueue queue; + + grpc::Alarm shutdownAlarm; + std::atomic pending{0}; + bool shutdown{false}; +};