Skip to content

Commit

Permalink
add metric for queue (#1756)
Browse files Browse the repository at this point in the history
  • Loading branch information
henryzhx8 committed Sep 14, 2024
1 parent 1c42ebd commit 67a0121
Show file tree
Hide file tree
Showing 46 changed files with 616 additions and 225 deletions.
2 changes: 1 addition & 1 deletion core/file_server/reader/LogFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ void LogFileReader::initExactlyOnce(uint32_t concurrency) {
mEOOption->fbKey = QueueKeyManager::GetInstance()->GetKey(GetProject() + "-" + mEOOption->primaryCheckpointKey
+ mEOOption->rangeCheckpointPtrs[0]->data.hash_key());
ExactlyOnceQueueManager::GetInstance()->CreateOrUpdateQueue(
mEOOption->fbKey, ProcessQueueManager::sMaxPriority, mConfigName, mEOOption->rangeCheckpointPtrs);
mEOOption->fbKey, ProcessQueueManager::sMaxPriority, *mReaderConfig.second, mEOOption->rangeCheckpointPtrs);
for (auto& cpt : mEOOption->rangeCheckpointPtrs) {
cpt->fbKey = mEOOption->fbKey;
}
Expand Down
31 changes: 30 additions & 1 deletion core/monitor/LogtailMetric.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ CounterPtr MetricsRecord::CreateCounter(const std::string& name) {
}

IntGaugePtr MetricsRecord::CreateIntGauge(const std::string& name) {
IntGaugePtr gaugePtr = std::make_shared<Gauge<uint64_t>>(name);
IntGaugePtr gaugePtr = std::make_shared<IntGauge>(name);
mIntGauges.emplace_back(gaugePtr);
return gaugePtr;
}
Expand Down Expand Up @@ -134,6 +134,21 @@ const MetricsRecord* MetricsRecordRef::operator->() const {
return mMetrics;
}

// Appends every entry of `labels` to the label list of the referenced MetricsRecord,
// preserving their order.
//
// `labels` is taken by rvalue reference, so its entries are moved (not copied) into the
// record; the caller's vector is left in a valid but unspecified state.
//
// Not thread-safe: per the declaration, this should only be used before
// WriteMetrics::CommitMetricsRecordRef publishes the record.
void MetricsRecordRef::AddLabels(MetricLabels&& labels) {
    auto& target = *(mMetrics->GetLabels());
    for (auto& label : labels) {
        // A named rvalue reference is an lvalue; an explicit move is needed to avoid string copies.
        target.emplace_back(std::move(label));
    }
}

#ifdef APSARA_UNIT_TEST_MAIN
// Test-only helper: returns true if the referenced MetricsRecord carries a label whose
// key and value both equal the given strings, false otherwise.
bool MetricsRecordRef::HasLabel(const std::string& key, const std::string& value) const {
    // Iterate by const reference: each label is a pair of strings, so iterating by value
    // would copy two strings per element just to compare them.
    for (const auto& item : *(mMetrics->GetLabels())) {
        if (item.first == key && item.second == value) {
            return true;
        }
    }
    return false;
}
#endif

// ReentrantMetricsRecord相关操作可以无锁,因为mCounters、mGauges只在初始化时会添加内容,后续只允许Get操作
void ReentrantMetricsRecord::Init(MetricLabels& labels, std::unordered_map<std::string, MetricType>& metricKeys) {
WriteMetrics::GetInstance()->PrepareMetricsRecordRef(mMetricsRecordRef, std::move(labels));
Expand Down Expand Up @@ -220,6 +235,20 @@ void WriteMetrics::PrepareMetricsRecordRef(MetricsRecordRef& ref,
mHead = cur;
}

// Allocates a new MetricsRecord carrying the given static and dynamic labels and stores it
// in `ref`.
//
// Unlike CommitMetricsRecordRef, this function does not touch mHead and takes no lock: the
// record is not linked into the WriteMetrics list until CommitMetricsRecordRef(ref) is called.
//
// `labels` and `dynamicLabels` are consumed: their contents are moved into the new record.
void WriteMetrics::CreateMetricsRecordRef(MetricsRecordRef& ref,
                                          MetricLabels&& labels,
                                          DynamicMetricLabels&& dynamicLabels) {
    // Named rvalue-reference parameters are lvalues; without std::move both vectors
    // (including every string / std::function they hold) would be deep-copied.
    MetricsRecord* cur = new MetricsRecord(std::make_shared<MetricLabels>(std::move(labels)),
                                           std::make_shared<DynamicMetricLabels>(std::move(dynamicLabels)));
    ref.SetMetricsRecord(cur);
}

// Links the record held by `ref` in as the new head of the WriteMetrics list.
// The list update is performed while holding mMutex.
void WriteMetrics::CommitMetricsRecordRef(MetricsRecordRef& ref) {
    std::lock_guard<std::mutex> guard(mMutex);
    MetricsRecord* const record = ref.mMetrics;
    record->SetNext(mHead); // the previous head becomes the successor of the new record
    mHead = record;
}

MetricsRecord* WriteMetrics::GetHead() {
std::lock_guard<std::mutex> lock(mMutex);
return mHead;
Expand Down
21 changes: 18 additions & 3 deletions core/monitor/LogtailMetric.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@
*/

#pragma once

#include <atomic>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

#include "LoongCollectorMetricTypes.h"
#include "common/Lock.h"
#include "monitor/LoongCollectorMetricTypes.h"
#include "protobuf/sls/sls_logs.pb.h"

namespace logtail {
Expand Down Expand Up @@ -53,6 +57,8 @@ class MetricsRecord {
};

class MetricsRecordRef {
friend class WriteMetrics;

private:
MetricsRecord* mMetrics = nullptr;

Expand All @@ -70,6 +76,11 @@ class MetricsRecordRef {
IntGaugePtr CreateIntGauge(const std::string& name);
DoubleGaugePtr CreateDoubleGauge(const std::string& name);
const MetricsRecord* operator->() const;
// this is not thread-safe, and should be only used before WriteMetrics::CommitMetricsRecordRef
void AddLabels(MetricLabels&& labels);
#ifdef APSARA_UNIT_TEST_MAIN
bool HasLabel(const std::string& key, const std::string& value) const;
#endif
};

class ReentrantMetricsRecord {
Expand Down Expand Up @@ -110,10 +121,13 @@ class WriteMetrics {
const std::string& configName,
const std::string& pluginType,
const std::string& pluginID,
const std::string& nodeID,
const std::string& nodeID,
const std::string& childNodeID,
MetricLabels& labels);
void PrepareMetricsRecordRef(MetricsRecordRef& ref, MetricLabels&& labels, DynamicMetricLabels&& dynamicLabels = {});
void
PrepareMetricsRecordRef(MetricsRecordRef& ref, MetricLabels&& labels, DynamicMetricLabels&& dynamicLabels = {});
void CreateMetricsRecordRef(MetricsRecordRef& ref, MetricLabels&& labels, DynamicMetricLabels&& dynamicLabels = {});
void CommitMetricsRecordRef(MetricsRecordRef& ref);
MetricsRecord* DoSnapshot();


Expand Down Expand Up @@ -144,4 +158,5 @@ class ReadMetrics {
friend class ILogtailMetricUnittest;
#endif
};

} // namespace logtail
31 changes: 23 additions & 8 deletions core/monitor/LoongCollectorMetricTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
*/

#pragma once

#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <vector>

#include "common/Lock.h"
#include "protobuf/sls/sls_logs.pb.h"


namespace logtail {

enum class MetricType {
Expand All @@ -45,25 +48,37 @@ class Counter {

template <typename T>
class Gauge {
private:
std::string mName;
std::atomic<T> mVal;

public:
Gauge(const std::string& name, T val = 0) : mName(name), mVal(val) {}
~Gauge() = default;

T GetValue() const { return mVal.load(); }
const std::string& GetName() const { return mName; }
void Set(T val) { mVal.store(val); }
Gauge* Collect() { return new Gauge<T>(mName, mVal.load()); }

protected:
std::string mName;
std::atomic<T> mVal;
};

// Gauge over uint64_t that additionally supports atomic increment and decrement.
class IntGauge : public Gauge<uint64_t> {
public:
    IntGauge(const std::string& name, uint64_t val = 0) : Gauge<uint64_t>(name, val) {}
    ~IntGauge() = default;

    // Returns a newly allocated IntGauge holding this gauge's name and its current value.
    // NOTE(review): the result is a raw pointer; who deletes it is not visible here.
    IntGauge* Collect() {
        const uint64_t snapshot = mVal.load();
        return new IntGauge(mName, snapshot);
    }

    // Atomically adds `val` to the stored value.
    void Add(uint64_t val) { mVal += val; }

    // Atomically subtracts `val` from the stored value (unsigned arithmetic, so it wraps
    // if `val` exceeds the current value).
    void Sub(uint64_t val) { mVal -= val; }
};

using CounterPtr = std::shared_ptr<Counter>;
using IntGaugePtr = std::shared_ptr<Gauge<uint64_t>>;
using IntGaugePtr = std::shared_ptr<IntGauge>;
using DoubleGaugePtr = std::shared_ptr<Gauge<double>>;

using MetricLabels = std::vector<std::pair<std::string, std::string>>;
using MetricLabelsPtr = std::shared_ptr<MetricLabels>;
using DynamicMetricLabels = std::vector<std::pair<std::string, std::function<std::string()>>>;
using DynamicMetricLabelsPtr = std::shared_ptr<DynamicMetricLabels>;

} // namespace logtail
} // namespace logtail
5 changes: 5 additions & 0 deletions core/monitor/MetricConstants.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ const std::string METRIC_LABEL_PLUGIN_ID = "plugin_id";
const std::string METRIC_LABEL_NODE_ID = "node_id";
const std::string METRIC_LABEL_CHILD_NODE_ID = "child_node_id";

// queue metric label keys
const std::string METRIC_LABEL_KEY_COMPONENT_NAME = "component_name";
const std::string METRIC_LABEL_KEY_QUEUE_TYPE = "queue_type";
const std::string METRIC_LABEL_KEY_EXACTLY_ONCE_FLAG = "is_exactly_once";
const std::string METRIC_LABEL_KEY_FLUSHER_NODE_ID = "flusher_node_id";

// input file plugin labels
const std::string METRIC_LABEL_FILE_DEV = "file_dev";
const std::string METRIC_LABEL_FILE_INODE = "file_inode";
Expand Down
5 changes: 5 additions & 0 deletions core/monitor/MetricConstants.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ extern const std::string METRIC_LABEL_FILE_DEV;
extern const std::string METRIC_LABEL_FILE_INODE;
extern const std::string METRIC_LABEL_FILE_NAME;

// queue metric label keys
extern const std::string METRIC_LABEL_KEY_COMPONENT_NAME;
extern const std::string METRIC_LABEL_KEY_QUEUE_TYPE;
extern const std::string METRIC_LABEL_KEY_EXACTLY_ONCE_FLAG;
extern const std::string METRIC_LABEL_KEY_FLUSHER_NODE_ID;

// input file metrics
extern const std::string METRIC_INPUT_RECORDS_TOTAL;
extern const std::string METRIC_INPUT_RECORDS_SIZE_BYTES;
Expand Down
13 changes: 7 additions & 6 deletions core/pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@
#include <cstdint>
#include <utility>

#include "pipeline/batch/TimeoutFlushManager.h"
#include "common/Flags.h"
#include "common/ParamExtractor.h"
#include "plugin/flusher/sls/FlusherSLS.h"
#include "go_pipeline/LogtailPlugin.h"
#include "plugin/input/InputFeedbackInterfaceRegistry.h"
#include "pipeline/batch/TimeoutFlushManager.h"
#include "pipeline/plugin/PluginRegistry.h"
#include "plugin/processor/ProcessorParseApsaraNative.h"
#include "pipeline/queue/ProcessQueueManager.h"
#include "pipeline/queue/QueueKeyManager.h"
#include "pipeline/queue/SenderQueueManager.h"
#include "plugin/flusher/sls/FlusherSLS.h"
#include "plugin/input/InputFeedbackInterfaceRegistry.h"
#include "plugin/processor/ProcessorParseApsaraNative.h"

DECLARE_FLAG_INT32(default_plugin_log_queue_size);

Expand Down Expand Up @@ -288,10 +288,11 @@ bool Pipeline::Init(PipelineConfig&& config) {
? ProcessQueueManager::sMaxPriority
: mContext.GetGlobalConfig().mProcessPriority - 1;
if (isInputSupportAck) {
ProcessQueueManager::GetInstance()->CreateOrUpdateBoundedQueue(mContext.GetProcessQueueKey(), priority);
ProcessQueueManager::GetInstance()->CreateOrUpdateBoundedQueue(
mContext.GetProcessQueueKey(), priority, mContext);
} else {
ProcessQueueManager::GetInstance()->CreateOrUpdateCircularQueue(
mContext.GetProcessQueueKey(), priority, 1024);
mContext.GetProcessQueueKey(), priority, 1024, mContext);
}


Expand Down
1 change: 1 addition & 0 deletions core/pipeline/plugin/instance/FlusherInstance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
namespace logtail {
bool FlusherInstance::Init(const Json::Value& config, PipelineContext& context, Json::Value& optionalGoPipeline) {
mPlugin->SetContext(context);
mPlugin->SetNodeID(NodeID());
mPlugin->SetMetricsRecordRef(Name(), PluginID(), NodeID(), ChildNodeID());
if (!mPlugin->Init(config, optionalGoPipeline)) {
return false;
Expand Down
3 changes: 3 additions & 0 deletions core/pipeline/plugin/interface/Flusher.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,16 @@ class Flusher : public Plugin {
virtual SinkType GetSinkType() { return SinkType::NONE; }

QueueKey GetQueueKey() const { return mQueueKey; }
void SetNodeID(const std::string& nodeID) { mNodeID = nodeID; }
const std::string& GetNodeID() const { return mNodeID; }

protected:
void GenerateQueueKey(const std::string& target);
bool PushToQueue(std::unique_ptr<SenderQueueItem>&& item, uint32_t retryTimes = 500);
void DealSenderQueueItemAfterSend(SenderQueueItem* item, bool keep);

QueueKey mQueueKey;
std::string mNodeID;

#ifdef APSARA_UNIT_TEST_MAIN
friend class FlusherInstanceUnittest;
Expand Down
28 changes: 27 additions & 1 deletion core/pipeline/queue/BoundedProcessQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,31 @@ using namespace std;

namespace logtail {

// Builds a bounded process queue and publishes its metrics record.
//
// cap / low / high are forwarded to BoundedQueueInterface (capacity and the two watermarks),
// key and priority to ProcessQueueInterface, and ctx to every base.
// QueueInterface is initialized explicitly here; BoundedQueueInterface inherits it virtually.
BoundedProcessQueue::BoundedProcessQueue(
size_t cap, size_t low, size_t high, int64_t key, uint32_t priority, const PipelineContext& ctx)
: QueueInterface(key, cap, ctx),
BoundedQueueInterface(key, cap, low, high, ctx),
ProcessQueueInterface(key, cap, priority, ctx) {
// Tag the metrics record only for pipelines that have exactly-once enabled.
if (ctx.IsExactlyOnceEnabled()) {
mMetricsRecordRef.AddLabels({{METRIC_LABEL_KEY_EXACTLY_ONCE_FLAG, "true"}});
}
// Commit last: AddLabels is not thread-safe and must only be used before the record is committed.
WriteMetrics::GetInstance()->CommitMetricsRecordRef(mMetricsRecordRef);
}

// Enqueues `item` when the queue currently accepts pushes and refreshes the queue metrics.
// Returns true if the item was enqueued; returns false (doing nothing else) when
// IsValidToPush() reports that the queue is not accepting items.
bool BoundedProcessQueue::Push(unique_ptr<ProcessQueueItem>&& item) {
    const bool accepted = IsValidToPush();
    if (accepted) {
        // Stamp the enqueue time on the item before it enters the queue.
        item->mEnqueTime = chrono::system_clock::now();
        // Capture the payload size before `item` is moved into the queue.
        const auto itemBytes = item->mEventGroup.DataSize();
        mQueue.push(std::move(item));
        ChangeStateIfNeededAfterPush();

        // Metrics: inbound counters first, then the current queue state.
        mInItemsCnt->Add(1);
        mInItemDataSizeBytes->Add(itemBytes);
        mQueueSize->Set(Size());
        mQueueDataSizeByte->Add(itemBytes);
        mValidToPushFlag->Set(IsValidToPush());
    }
    return accepted;
}

Expand All @@ -36,10 +55,17 @@ bool BoundedProcessQueue::Pop(unique_ptr<ProcessQueueItem>& item) {
if (ChangeStateIfNeededAfterPop()) {
GiveFeedback();
}

mOutItemsCnt->Add(1);
mTotalDelayMs->Add(
chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now() - item->mEnqueTime).count());
mQueueSize->Set(Size());
mQueueDataSizeByte->Sub(item->mEventGroup.DataSize());
mValidToPushFlag->Set(IsValidToPush());
return true;
}

void BoundedProcessQueue::SetUpStreamFeedbacks(std::vector<FeedbackInterface*>&& feedbacks) {
void BoundedProcessQueue::SetUpStreamFeedbacks(vector<FeedbackInterface*>&& feedbacks) {
mUpStreamFeedbacks.clear();
for (auto& item : feedbacks) {
if (item == nullptr) {
Expand Down
5 changes: 1 addition & 4 deletions core/pipeline/queue/BoundedProcessQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@ namespace logtail {
class BoundedProcessQueue : public BoundedQueueInterface<std::unique_ptr<ProcessQueueItem>>,
public ProcessQueueInterface {
public:
BoundedProcessQueue(size_t cap, size_t low, size_t high, int64_t key, uint32_t priority, const std::string& config)
: QueueInterface(key, cap),
BoundedQueueInterface(key, cap, low, high),
ProcessQueueInterface(key, cap, priority, config) {}
BoundedProcessQueue(size_t cap, size_t low, size_t high, int64_t key, uint32_t priority, const PipelineContext& ctx);

bool Push(std::unique_ptr<ProcessQueueItem>&& item) override;
bool Pop(std::unique_ptr<ProcessQueueItem>& item) override;
Expand Down
9 changes: 7 additions & 2 deletions core/pipeline/queue/BoundedQueueInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@ namespace logtail {
template <typename T>
class BoundedQueueInterface : virtual public QueueInterface<T> {
public:
BoundedQueueInterface(QueueKey key, size_t cap, size_t low, size_t high)
: QueueInterface<T>(key, cap), mLowWatermark(low), mHighWatermark(high) {}
BoundedQueueInterface(QueueKey key, size_t cap, size_t low, size_t high, const PipelineContext& ctx)
: QueueInterface<T>(key, cap, ctx), mLowWatermark(low), mHighWatermark(high) {
this->mMetricsRecordRef.AddLabels({{METRIC_LABEL_KEY_QUEUE_TYPE, "bounded"}});
mValidToPushFlag = this->mMetricsRecordRef.CreateIntGauge("valid_to_push");
}
virtual ~BoundedQueueInterface() = default;

BoundedQueueInterface(const BoundedQueueInterface& que) = delete;
Expand Down Expand Up @@ -57,6 +60,8 @@ class BoundedQueueInterface : virtual public QueueInterface<T> {
mValidToPush = true;
}

IntGaugePtr mValidToPushFlag;

private:
virtual void GiveFeedback() const = 0;
virtual size_t Size() const = 0;
Expand Down
9 changes: 9 additions & 0 deletions core/pipeline/queue/BoundedSenderQueueInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ namespace logtail {

FeedbackInterface* BoundedSenderQueueInterface::sFeedback = nullptr;

// Builds the sender-queue base: forwards key/cap/ctx to QueueInterface and the watermarks to
// BoundedQueueInterface, then labels the metrics record as a "sender_queue" component owned by
// the flusher identified by `flusherId`, and creates the two extra-buffer gauges.
BoundedSenderQueueInterface::BoundedSenderQueueInterface(
    size_t cap, size_t low, size_t high, QueueKey key, const string& flusherId, const PipelineContext& ctx)
    : QueueInterface(key, cap, ctx),
      BoundedQueueInterface<std::unique_ptr<SenderQueueItem>>(key, cap, low, high, ctx) {
    // Both labels are appended in one call, component name first and flusher node id second.
    mMetricsRecordRef.AddLabels({
        {METRIC_LABEL_KEY_COMPONENT_NAME, "sender_queue"},
        {METRIC_LABEL_KEY_FLUSHER_NODE_ID, flusherId},
    });

    mExtraBufferSize = mMetricsRecordRef.CreateIntGauge("extra_buffer_size");
    mExtraBufferDataSizeBytes = mMetricsRecordRef.CreateIntGauge("extra_buffer_data_size_bytes");
}

void BoundedSenderQueueInterface::SetFeedback(FeedbackInterface* feedback) {
if (feedback == nullptr) {
// should not happen
Expand Down
Loading

0 comments on commit 67a0121

Please sign in to comment.