Skip to content

Commit

Permalink
add metric for flusher component
Browse files Browse the repository at this point in the history
  • Loading branch information
henryzhx8 committed Sep 14, 2024
1 parent 67a0121 commit 028cc81
Show file tree
Hide file tree
Showing 41 changed files with 779 additions and 134 deletions.
2 changes: 1 addition & 1 deletion core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ include(${CMAKE_CURRENT_SOURCE_DIR}/plugin/flusher/links.cmake)
set(SUB_DIRECTORIES_LIST
application app_config checkpoint container_manager logger go_pipeline monitor profile_sender models
config config/feedbacker config/provider config/watcher
pipeline pipeline/batch pipeline/compression pipeline/limiter pipeline/plugin pipeline/plugin/creator pipeline/plugin/instance pipeline/plugin/interface pipeline/queue pipeline/route pipeline/serializer
pipeline pipeline/batch pipeline/limiter pipeline/plugin pipeline/plugin/creator pipeline/plugin/instance pipeline/plugin/interface pipeline/queue pipeline/route pipeline/serializer
runner runner/sink/http
protobuf/config_server/v1 protobuf/config_server/v2 protobuf/sls
file_server file_server/event file_server/event_handler file_server/event_listener file_server/reader file_server/polling
Expand Down
2 changes: 1 addition & 1 deletion core/common/common.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ list(APPEND THIS_SOURCE_FILES_LIST ${XX_HASH_SOURCE_FILES})
list(APPEND THIS_SOURCE_FILES_LIST ${CMAKE_SOURCE_DIR}/common/memory/SourceBuffer.h)
list(APPEND THIS_SOURCE_FILES_LIST ${CMAKE_SOURCE_DIR}/common/http/AsynCurlRunner.cpp ${CMAKE_SOURCE_DIR}/common/http/Curl.cpp ${CMAKE_SOURCE_DIR}/common/http/HttpResponse.cpp)
list(APPEND THIS_SOURCE_FILES_LIST ${CMAKE_SOURCE_DIR}/common/timer/Timer.cpp ${CMAKE_SOURCE_DIR}/common/timer/HttpRequestTimerEvent.cpp)

list(APPEND THIS_SOURCE_FILES_LIST ${CMAKE_SOURCE_DIR}/common/compression/Compressor.cpp ${CMAKE_SOURCE_DIR}/common/compression/CompressorFactory.cpp ${CMAKE_SOURCE_DIR}/common/compression/LZ4Compressor.cpp ${CMAKE_SOURCE_DIR}/common/compression/ZstdCompressor.cpp)
# remove several files in common
list(REMOVE_ITEM THIS_SOURCE_FILES_LIST ${CMAKE_SOURCE_DIR}/common/BoostRegexValidator.cpp ${CMAKE_SOURCE_DIR}/common/GetUUID.cpp)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@

namespace logtail {

enum class CompressType { NONE, LZ4, ZSTD };
enum class CompressType {
NONE,
LZ4,
ZSTD
#ifdef APSARA_UNIT_TEST_MAIN
,
MOCK
#endif
};

} // namespace logtail
57 changes: 57 additions & 0 deletions core/common/compression/Compressor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2024 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "common/compression/Compressor.h"

#include <chrono>

using namespace std;

namespace logtail {

void Compressor::SetMetricRecordRef(MetricLabels&& labels, DynamicMetricLabels&& dynamicLabels) {
WriteMetrics::GetInstance()->PrepareMetricsRecordRef(
mMetricsRecordRef, std::move(labels), std::move(dynamicLabels));
mInItemsCnt = mMetricsRecordRef.CreateCounter("in_items_cnt");
mInItemSizeBytes = mMetricsRecordRef.CreateCounter("in_item_size_bytes");
mOutItemsCnt = mMetricsRecordRef.CreateCounter("out_items_cnt");
mOutItemSizeBytes = mMetricsRecordRef.CreateCounter("out_item_size_bytes");
mDiscardedItemsCnt = mMetricsRecordRef.CreateCounter("discarded_items_cnt");
mDiscardedItemSizeBytes = mMetricsRecordRef.CreateCounter("discarded_item_size_bytes");
mTotalDelayMs = mMetricsRecordRef.CreateCounter("total_delay_ms");
}

bool Compressor::DoCompress(const string& input, string& output, string& errorMsg) {
if (mMetricsRecordRef != nullptr) {
mInItemsCnt->Add(1);
mInItemSizeBytes->Add(input.size());
}

auto before = chrono::system_clock::now();
auto res = Compress(input, output, errorMsg);

if (mMetricsRecordRef != nullptr) {
mTotalDelayMs->Add(chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now() - before).count());
if (res) {
mOutItemsCnt->Add(1);
mOutItemSizeBytes->Add(output.size());
} else {
mDiscardedItemsCnt->Add(1);
mDiscardedItemSizeBytes->Add(input.size());
}
}
return res;
}

} // namespace logtail
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@

#include <string>

#include "pipeline/compression/CompressType.h"
#include "monitor/LogtailMetric.h"
#include "common/compression/CompressType.h"

namespace logtail {

Expand All @@ -27,17 +28,35 @@ class Compressor {
Compressor(CompressType type) : mType(type) {}
virtual ~Compressor() = default;

virtual bool Compress(const std::string& input, std::string& output, std::string& errorMsg) = 0;
bool DoCompress(const std::string& input, std::string& output, std::string& errorMsg);

#ifdef APSARA_UNIT_TEST_MAIN
// buffer shoudl be reserved for output before calling this function
virtual bool UnCompress(const std::string& input, std::string& output, std::string& errorMsg) = 0;
#endif

CompressType GetCompressType() const { return mType; }
void SetMetricRecordRef(MetricLabels&& labels, DynamicMetricLabels&& dynamicLabels = {});

protected:
mutable MetricsRecordRef mMetricsRecordRef;
CounterPtr mInItemsCnt;
CounterPtr mInItemSizeBytes;
CounterPtr mOutItemsCnt;
CounterPtr mOutItemSizeBytes;
CounterPtr mDiscardedItemsCnt;
CounterPtr mDiscardedItemSizeBytes;
CounterPtr mTotalDelayMs;

private:
virtual bool Compress(const std::string& input, std::string& output, std::string& errorMsg) = 0;

CompressType mType = CompressType::NONE;

#ifdef APSARA_UNIT_TEST_MAIN
friend class CompressorUnittest;
friend class CompressorFactoryUnittest;
#endif
};

} // namespace logtail
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,24 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "pipeline/compression/CompressorFactory.h"
#include "common/compression/CompressorFactory.h"

#include "common/ParamExtractor.h"
#include "pipeline/compression/LZ4Compressor.h"
#include "pipeline/compression/ZstdCompressor.h"
#include "monitor/MetricConstants.h"
#include "common/compression/LZ4Compressor.h"
#include "common/compression/ZstdCompressor.h"

using namespace std;

namespace logtail {

unique_ptr<Compressor> CompressorFactory::Create(const Json::Value& config,
const PipelineContext& ctx,
const string& pluginType,
CompressType defaultType) {
const PipelineContext& ctx,
const string& pluginType,
const string& flusherId,
CompressType defaultType) {
string compressType, errorMsg;
unique_ptr<Compressor> compressor;
if (!GetOptionalStringParam(config, "CompressType", compressType, errorMsg)) {
PARAM_WARNING_DEFAULT(ctx.GetLogger(),
ctx.GetAlarm(),
Expand All @@ -37,11 +40,11 @@ unique_ptr<Compressor> CompressorFactory::Create(const Json::Value& config,
ctx.GetProjectName(),
ctx.GetLogstoreName(),
ctx.GetRegion());
return Create(defaultType);
compressor = Create(defaultType);
} else if (compressType == "lz4") {
return Create(CompressType::LZ4);
compressor = Create(CompressType::LZ4);
} else if (compressType == "zstd") {
return Create(CompressType::ZSTD);
compressor = Create(CompressType::ZSTD);
} else if (compressType == "none") {
return nullptr;
} else if (!compressType.empty()) {
Expand All @@ -54,10 +57,15 @@ unique_ptr<Compressor> CompressorFactory::Create(const Json::Value& config,
ctx.GetProjectName(),
ctx.GetLogstoreName(),
ctx.GetRegion());
return Create(defaultType);
compressor = Create(defaultType);
} else {
return Create(defaultType);
compressor = Create(defaultType);
}
compressor->SetMetricRecordRef({{METRIC_LABEL_PROJECT, ctx.GetProjectName()},
{METRIC_LABEL_CONFIG_NAME, ctx.GetConfigName()},
{METRIC_LABEL_KEY_COMPONENT_NAME, "compressor"},
{METRIC_LABEL_KEY_FLUSHER_NODE_ID, flusherId}});
return compressor;
}

unique_ptr<Compressor> CompressorFactory::Create(CompressType type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
#include <memory>
#include <string>

#include "pipeline/compression/CompressType.h"
#include "pipeline/compression/Compressor.h"
#include "pipeline/PipelineContext.h"
#include "common/compression/CompressType.h"
#include "common/compression/Compressor.h"

namespace logtail {

Expand All @@ -42,13 +42,13 @@ class CompressorFactory {
std::unique_ptr<Compressor> Create(const Json::Value& config,
const PipelineContext& ctx,
const std::string& pluginType,
const std::string& flusherId,
CompressType defaultType);
std::unique_ptr<Compressor> Create(CompressType type);

private:
CompressorFactory() = default;
~CompressorFactory() = default;

std::unique_ptr<Compressor> Create(CompressType defaultType);
};

} // namespace logtail
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "pipeline/compression/LZ4Compressor.h"
#include "common/compression/LZ4Compressor.h"

#include <lz4/lz4.h>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,20 @@

#pragma once

#include "pipeline/compression/Compressor.h"
#include "common/compression/Compressor.h"

namespace logtail {

class LZ4Compressor : public Compressor {
public:
LZ4Compressor(CompressType type) : Compressor(type){};
LZ4Compressor(CompressType type) : Compressor(type) {};

bool Compress(const std::string& input, std::string& output, std::string& errorMsg) override;

#ifdef APSARA_UNIT_TEST_MAIN
bool UnCompress(const std::string& input, std::string& output, std::string& errorMsg) override;
#endif

private:
bool Compress(const std::string& input, std::string& output, std::string& errorMsg) override;
};

} // namespace logtail
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "pipeline/compression/ZstdCompressor.h"
#include "common/compression/ZstdCompressor.h"

#include <zstd/zstd.h>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,21 @@

#pragma once

#include "pipeline/compression/Compressor.h"
#include "common/compression/Compressor.h"

namespace logtail {

class ZstdCompressor : public Compressor {
public:
ZstdCompressor(CompressType type, int32_t level = 1) : Compressor(type), mCompressionLevel(level){};

bool Compress(const std::string& input, std::string& output, std::string& errorMsg) override;
ZstdCompressor(CompressType type, int32_t level = 1) : Compressor(type), mCompressionLevel(level) {};

#ifdef APSARA_UNIT_TEST_MAIN
bool UnCompress(const std::string& input, std::string& output, std::string& errorMsg) override;
#endif

private:
bool Compress(const std::string& input, std::string& output, std::string& errorMsg) override;

int32_t mCompressionLevel = 1;
};

Expand Down
16 changes: 7 additions & 9 deletions core/go_pipeline/LogtailPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@
#include "common/JsonUtil.h"
#include "common/LogtailCommonFlags.h"
#include "common/TimeUtil.h"
#include "pipeline/compression/CompressorFactory.h"
#include "common/compression/CompressorFactory.h"
#include "container_manager/ConfigContainerInfoUpdateCmd.h"
#include "file_server/ConfigManager.h"
#include "logger/Logger.h"
#include "monitor/LogFileProfiler.h"
#include "monitor/LogtailAlarm.h"
#include "pipeline/PipelineManager.h"
#include "profile_sender/ProfileSender.h"
#include "pipeline/queue/SenderQueueManager.h"
#include "profile_sender/ProfileSender.h"

DEFINE_FLAG_BOOL(enable_sls_metrics_format, "if enable format metrics in SLS metricstore log pattern", false);
DEFINE_FLAG_BOOL(enable_containerd_upper_dir_detect,
Expand All @@ -53,16 +53,13 @@ LogtailPlugin::LogtailPlugin() {
mPluginValid = false;
mPluginAlarmConfig.mLogstore = "logtail_alarm";
mPluginAlarmConfig.mAliuid = STRING_FLAG(logtail_profile_aliuid);
mPluginAlarmConfig.mCompressor
= CompressorFactory::GetInstance()->Create(Json::Value(), PipelineContext(), "flusher_sls", CompressType::ZSTD);
mPluginAlarmConfig.mCompressor = CompressorFactory::GetInstance()->Create(CompressType::ZSTD);
mPluginProfileConfig.mLogstore = "shennong_log_profile";
mPluginProfileConfig.mAliuid = STRING_FLAG(logtail_profile_aliuid);
mPluginProfileConfig.mCompressor
= CompressorFactory::GetInstance()->Create(Json::Value(), PipelineContext(), "flusher_sls", CompressType::ZSTD);
mPluginProfileConfig.mCompressor = CompressorFactory::GetInstance()->Create(CompressType::ZSTD);
mPluginContainerConfig.mLogstore = "logtail_containers";
mPluginContainerConfig.mAliuid = STRING_FLAG(logtail_profile_aliuid);
mPluginContainerConfig.mCompressor
= CompressorFactory::GetInstance()->Create(Json::Value(), PipelineContext(), "flusher_sls", CompressType::ZSTD);
mPluginContainerConfig.mCompressor = CompressorFactory::GetInstance()->Create(CompressType::ZSTD);

mPluginCfg["LogtailSysConfDir"] = AppConfig::GetInstance()->GetLogtailSysConfDir();
mPluginCfg["HostIP"] = LogFileProfiler::mIpAddr;
Expand Down Expand Up @@ -470,7 +467,8 @@ void LogtailPlugin::ProcessLogGroup(const std::string& configName,
}
}

void LogtailPlugin::GetGoMetrics(std::vector<std::map<std::string, std::string>>& metircsList, const string& metricType) {
void LogtailPlugin::GetGoMetrics(std::vector<std::map<std::string, std::string>>& metircsList,
const string& metricType) {
if (mGetGoMetricsFun != nullptr) {
GoString type;
type.n = metricType.size();
Expand Down
2 changes: 0 additions & 2 deletions core/monitor/MetricConstants.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,6 @@ const std::string METRIC_PROC_PARSE_STDOUT_TOTAL = "proc_parse_stdout_total";
const std::string METRIC_PROC_PARSE_STDERR_TOTAL = "proc_parse_stderr_total";

// flusher common metrics
const std::string METRIC_FLUSHER_IN_RECORDS_TOTAL = "flusher_in_records_total";
const std::string METRIC_FLUSHER_IN_RECORDS_SIZE_BYTES = "flusher_in_records_size_bytes";
const std::string METRIC_FLUSHER_ERROR_TOTAL = "flusher_error_total";
const std::string METRIC_FLUSHER_DISCARD_RECORDS_TOTAL = "flusher_discard_records_total";
const std::string METRIC_FLUSHER_SUCCESS_RECORDS_TOTAL = "flusher_success_records_total";
Expand Down
2 changes: 0 additions & 2 deletions core/monitor/MetricConstants.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,6 @@ extern const std::string METRIC_PROC_PARSE_STDOUT_TOTAL;
extern const std::string METRIC_PROC_PARSE_STDERR_TOTAL;

// flusher common metrics
extern const std::string METRIC_FLUSHER_IN_RECORDS_TOTAL;
extern const std::string METRIC_FLUSHER_IN_RECORDS_SIZE_BYTES;
extern const std::string METRIC_FLUSHER_ERROR_TOTAL;
extern const std::string METRIC_FLUSHER_DISCARD_RECORDS_TOTAL;
extern const std::string METRIC_FLUSHER_SUCCESS_RECORDS_TOTAL;
Expand Down
Loading

0 comments on commit 028cc81

Please sign in to comment.