Skip to content

Commit

Permalink
Merge remote-tracking branch 'alibaba/main' into go-config
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc committed Sep 16, 2024
2 parents d8d4c5b + 67a0121 commit 96955d9
Show file tree
Hide file tree
Showing 73 changed files with 844 additions and 277 deletions.
1 change: 1 addition & 0 deletions core/common/common.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ list(APPEND THIS_SOURCE_FILES_LIST ${XX_HASH_SOURCE_FILES})
list(APPEND THIS_SOURCE_FILES_LIST ${CMAKE_SOURCE_DIR}/common/memory/SourceBuffer.h)
list(APPEND THIS_SOURCE_FILES_LIST ${CMAKE_SOURCE_DIR}/common/http/AsynCurlRunner.cpp ${CMAKE_SOURCE_DIR}/common/http/Curl.cpp ${CMAKE_SOURCE_DIR}/common/http/HttpResponse.cpp)
list(APPEND THIS_SOURCE_FILES_LIST ${CMAKE_SOURCE_DIR}/common/timer/Timer.cpp ${CMAKE_SOURCE_DIR}/common/timer/HttpRequestTimerEvent.cpp)

# remove several files in common
list(REMOVE_ITEM THIS_SOURCE_FILES_LIST ${CMAKE_SOURCE_DIR}/common/BoostRegexValidator.cpp ${CMAKE_SOURCE_DIR}/common/GetUUID.cpp)

Expand Down
13 changes: 7 additions & 6 deletions core/common/http/AsynCurlRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@

#include "common/http/AsynCurlRunner.h"

#include <chrono>

#include "app_config/AppConfig.h"
#include "common/StringTools.h"
#include "common/http/Curl.h"
#include "logger/Logger.h"
#include "monitor/LogtailAlarm.h"

using namespace std;

Expand Down Expand Up @@ -95,7 +96,7 @@ bool AsynCurlRunner::AddRequestToClient(unique_ptr<AsynHttpRequest>&& request) {

request->mPrivateData = headers;
curl_easy_setopt(curl, CURLOPT_PRIVATE, request.get());
request->mLastSendTime = time(nullptr);
request->mLastSendTime = std::chrono::system_clock::now();
auto res = curl_multi_add_handle(mClient, curl);
if (res != CURLM_OK) {
LOG_ERROR(sLogger,
Expand Down Expand Up @@ -182,8 +183,8 @@ void AsynCurlRunner::HandleCompletedRequests() {
curl_easy_getinfo(handler, CURLINFO_PRIVATE, &request);
LOG_DEBUG(sLogger,
("send http request completed, request address",
request)("response time", ToString(time(nullptr) - request->mLastSendTime))(
"try cnt", ToString(request->mTryCnt)));
request)("response time",ToString(chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now()- request->mLastSendTime).count()) + "ms")
("try cnt", ToString(request->mTryCnt)));
switch (msg->data.result) {
case CURLE_OK: {
long statusCode = 0;
Expand All @@ -194,9 +195,9 @@ void AsynCurlRunner::HandleCompletedRequests() {
}
default:
// considered as network error
if (++request->mTryCnt <= request->mMaxTryCnt) {
if (request->mTryCnt <= request->mMaxTryCnt) {
LOG_WARNING(sLogger,
("failed to send request", "retry immediately")("retryCnt", request->mTryCnt)(
("failed to send request", "retry immediately")("retryCnt", request->mTryCnt++)(
"errMsg", curl_easy_strerror(msg->data.result)));
// free first,becase mPrivateData will be reset in AddRequestToClient
if (request->mPrivateData) {
Expand Down
47 changes: 47 additions & 0 deletions core/common/http/Curl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,14 @@

#include "common/http/Curl.h"

#include <cstdint>
#include <map>
#include <string>

#include "common/DNSCache.h"
#include "app_config/AppConfig.h"
#include "logger/Logger.h"
#include "common/http/HttpResponse.h"

using namespace std;

Expand Down Expand Up @@ -128,4 +135,44 @@ CURL* CreateCurlHandler(const std::string& method,

return curl;
}

bool SendHttpRequest(std::unique_ptr<HttpRequest>&& request, HttpResponse& response) {
curl_slist* headers = NULL;
CURL* curl = CreateCurlHandler(request->mMethod,
request->mHTTPSFlag,
request->mHost,
request->mPort,
request->mUrl,
request->mQueryString,
request->mHeader,
request->mBody,
response,
headers,
request->mTimeout,
AppConfig::GetInstance()->IsHostIPReplacePolicyEnabled(),
AppConfig::GetInstance()->GetBindInterface());
if (curl == NULL) {
LOG_ERROR(sLogger, ("failed to init curl handler", "failed to init curl client")("request address", request.get()));
return false;
}
bool success = false;
while (request->mTryCnt <= request->mMaxTryCnt) {
CURLcode res = curl_easy_perform(curl);
if (res == CURLE_OK) {
long http_code = 0;
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &http_code);
response.mStatusCode = (int32_t)http_code;
success = true;
break;
} else {
LOG_WARNING(sLogger,("failed to send request", "retry immediately")("retryCnt", request->mTryCnt++)("errMsg", curl_easy_strerror(res))("request address", request.get()));
}
}
if (headers != NULL) {
curl_slist_free_all(headers);
}
curl_easy_cleanup(curl);
return success;
}

} // namespace logtail
4 changes: 4 additions & 0 deletions core/common/http/Curl.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
#include <cstdint>
#include <map>
#include <string>
#include <memory>

#include "common/http/HttpRequest.h"
#include "common/http/HttpResponse.h"

namespace logtail {
Expand All @@ -40,4 +42,6 @@ CURL* CreateCurlHandler(const std::string& method,
bool replaceHostWithIp = true,
const std::string& intf = "");

bool SendHttpRequest(std::unique_ptr<HttpRequest>&& request, HttpResponse& response);

} // namespace logtail
4 changes: 3 additions & 1 deletion core/common/http/HttpRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#pragma once

#include <chrono>
#include <cstdint>
#include <map>
#include <string>
Expand Down Expand Up @@ -43,7 +44,7 @@ struct HttpRequest {
uint32_t mMaxTryCnt = sDefaultMaxTryCnt;

uint32_t mTryCnt = 1;
time_t mLastSendTime = 0;
std::chrono::system_clock::time_point mLastSendTime;

HttpRequest(const std::string& method,
bool httpsFlag,
Expand Down Expand Up @@ -89,4 +90,5 @@ struct AsynHttpRequest : public HttpRequest {
virtual void OnSendDone(const HttpResponse& response) = 0;
};


} // namespace logtail
2 changes: 1 addition & 1 deletion core/file_server/reader/LogFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ void LogFileReader::initExactlyOnce(uint32_t concurrency) {
mEOOption->fbKey = QueueKeyManager::GetInstance()->GetKey(GetProject() + "-" + mEOOption->primaryCheckpointKey
+ mEOOption->rangeCheckpointPtrs[0]->data.hash_key());
ExactlyOnceQueueManager::GetInstance()->CreateOrUpdateQueue(
mEOOption->fbKey, ProcessQueueManager::sMaxPriority, mConfigName, mEOOption->rangeCheckpointPtrs);
mEOOption->fbKey, ProcessQueueManager::sMaxPriority, *mReaderConfig.second, mEOOption->rangeCheckpointPtrs);
for (auto& cpt : mEOOption->rangeCheckpointPtrs) {
cpt->fbKey = mEOOption->fbKey;
}
Expand Down
2 changes: 1 addition & 1 deletion core/go_pipeline/LogtailPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ int LogtailPlugin::IsValidToSend(long long logstoreKey) {
// therefore, we assume true here. This could be a potential problem if network is not available for profile info.
// However, since go profile pipeline will be stopped only during process exit, it should be fine.
if (logstoreKey == -1) {
return true;
return 0;
}
return SenderQueueManager::GetInstance()->IsValidToPush(logstoreKey) ? 0 : -1;
}
Expand Down
31 changes: 30 additions & 1 deletion core/monitor/LogtailMetric.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ CounterPtr MetricsRecord::CreateCounter(const std::string& name) {
}

IntGaugePtr MetricsRecord::CreateIntGauge(const std::string& name) {
IntGaugePtr gaugePtr = std::make_shared<Gauge<uint64_t>>(name);
IntGaugePtr gaugePtr = std::make_shared<IntGauge>(name);
mIntGauges.emplace_back(gaugePtr);
return gaugePtr;
}
Expand Down Expand Up @@ -134,6 +134,21 @@ const MetricsRecord* MetricsRecordRef::operator->() const {
return mMetrics;
}

void MetricsRecordRef::AddLabels(MetricLabels&& labels) {
mMetrics->GetLabels()->insert(mMetrics->GetLabels()->end(), labels.begin(), labels.end());
}

#ifdef APSARA_UNIT_TEST_MAIN
bool MetricsRecordRef::HasLabel(const std::string& key, const std::string& value) const {
for (auto item : *(mMetrics->GetLabels())) {
if (item.first == key && item.second == value) {
return true;
}
}
return false;
}
#endif

// ReentrantMetricsRecord相关操作可以无锁,因为mCounters、mGauges只在初始化时会添加内容,后续只允许Get操作
void ReentrantMetricsRecord::Init(MetricLabels& labels, std::unordered_map<std::string, MetricType>& metricKeys) {
WriteMetrics::GetInstance()->PrepareMetricsRecordRef(mMetricsRecordRef, std::move(labels));
Expand Down Expand Up @@ -220,6 +235,20 @@ void WriteMetrics::PrepareMetricsRecordRef(MetricsRecordRef& ref,
mHead = cur;
}

void WriteMetrics::CreateMetricsRecordRef(MetricsRecordRef& ref,
MetricLabels&& labels,
DynamicMetricLabels&& dynamicLabels) {
MetricsRecord* cur = new MetricsRecord(std::make_shared<MetricLabels>(labels),
std::make_shared<DynamicMetricLabels>(dynamicLabels));
ref.SetMetricsRecord(cur);
}

void WriteMetrics::CommitMetricsRecordRef(MetricsRecordRef& ref) {
std::lock_guard<std::mutex> lock(mMutex);
ref.mMetrics->SetNext(mHead);
mHead = ref.mMetrics;
}

MetricsRecord* WriteMetrics::GetHead() {
std::lock_guard<std::mutex> lock(mMutex);
return mHead;
Expand Down
21 changes: 18 additions & 3 deletions core/monitor/LogtailMetric.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@
*/

#pragma once

#include <atomic>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

#include "LoongCollectorMetricTypes.h"
#include "common/Lock.h"
#include "monitor/LoongCollectorMetricTypes.h"
#include "protobuf/sls/sls_logs.pb.h"

namespace logtail {
Expand Down Expand Up @@ -53,6 +57,8 @@ class MetricsRecord {
};

class MetricsRecordRef {
friend class WriteMetrics;

private:
MetricsRecord* mMetrics = nullptr;

Expand All @@ -70,6 +76,11 @@ class MetricsRecordRef {
IntGaugePtr CreateIntGauge(const std::string& name);
DoubleGaugePtr CreateDoubleGauge(const std::string& name);
const MetricsRecord* operator->() const;
// this is not thread-safe, and should be only used before WriteMetrics::CommitMetricsRecordRef
void AddLabels(MetricLabels&& labels);
#ifdef APSARA_UNIT_TEST_MAIN
bool HasLabel(const std::string& key, const std::string& value) const;
#endif
};

class ReentrantMetricsRecord {
Expand Down Expand Up @@ -110,10 +121,13 @@ class WriteMetrics {
const std::string& configName,
const std::string& pluginType,
const std::string& pluginID,
const std::string& nodeID,
const std::string& nodeID,
const std::string& childNodeID,
MetricLabels& labels);
void PrepareMetricsRecordRef(MetricsRecordRef& ref, MetricLabels&& labels, DynamicMetricLabels&& dynamicLabels = {});
void
PrepareMetricsRecordRef(MetricsRecordRef& ref, MetricLabels&& labels, DynamicMetricLabels&& dynamicLabels = {});
void CreateMetricsRecordRef(MetricsRecordRef& ref, MetricLabels&& labels, DynamicMetricLabels&& dynamicLabels = {});
void CommitMetricsRecordRef(MetricsRecordRef& ref);
MetricsRecord* DoSnapshot();


Expand Down Expand Up @@ -144,4 +158,5 @@ class ReadMetrics {
friend class ILogtailMetricUnittest;
#endif
};

} // namespace logtail
31 changes: 23 additions & 8 deletions core/monitor/LoongCollectorMetricTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
*/

#pragma once

#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <vector>

#include "common/Lock.h"
#include "protobuf/sls/sls_logs.pb.h"


namespace logtail {

enum class MetricType {
Expand All @@ -45,25 +48,37 @@ class Counter {

template <typename T>
class Gauge {
private:
std::string mName;
std::atomic<T> mVal;

public:
Gauge(const std::string& name, T val = 0) : mName(name), mVal(val) {}
~Gauge() = default;

T GetValue() const { return mVal.load(); }
const std::string& GetName() const { return mName; }
void Set(T val) { mVal.store(val); }
Gauge* Collect() { return new Gauge<T>(mName, mVal.load()); }

protected:
std::string mName;
std::atomic<T> mVal;
};

class IntGauge : public Gauge<uint64_t> {
public:
IntGauge(const std::string& name, uint64_t val = 0) : Gauge<uint64_t>(name, val) {}
~IntGauge() = default;

IntGauge* Collect() { return new IntGauge(mName, mVal.load()); }
void Add(uint64_t val) { mVal.fetch_add(val); }
void Sub(uint64_t val) { mVal.fetch_sub(val); }
};

using CounterPtr = std::shared_ptr<Counter>;
using IntGaugePtr = std::shared_ptr<Gauge<uint64_t>>;
using IntGaugePtr = std::shared_ptr<IntGauge>;
using DoubleGaugePtr = std::shared_ptr<Gauge<double>>;

using MetricLabels = std::vector<std::pair<std::string, std::string>>;
using MetricLabelsPtr = std::shared_ptr<MetricLabels>;
using DynamicMetricLabels = std::vector<std::pair<std::string, std::function<std::string()>>>;
using DynamicMetricLabelsPtr = std::shared_ptr<DynamicMetricLabels>;

} // namespace logtail
} // namespace logtail
5 changes: 5 additions & 0 deletions core/monitor/MetricConstants.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ const std::string METRIC_LABEL_PLUGIN_ID = "plugin_id";
const std::string METRIC_LABEL_NODE_ID = "node_id";
const std::string METRIC_LABEL_CHILD_NODE_ID = "child_node_id";

const std::string METRIC_LABEL_KEY_COMPONENT_NAME = "component_name";
const std::string METRIC_LABEL_KEY_QUEUE_TYPE = "queue_type";
const std::string METRIC_LABEL_KEY_EXACTLY_ONCE_FLAG = "is_exactly_once";
const std::string METRIC_LABEL_KEY_FLUSHER_NODE_ID = "flusher_node_id";

// input file plugin labels
const std::string METRIC_LABEL_FILE_DEV = "file_dev";
const std::string METRIC_LABEL_FILE_INODE = "file_inode";
Expand Down
5 changes: 5 additions & 0 deletions core/monitor/MetricConstants.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ extern const std::string METRIC_LABEL_FILE_DEV;
extern const std::string METRIC_LABEL_FILE_INODE;
extern const std::string METRIC_LABEL_FILE_NAME;

extern const std::string METRIC_LABEL_KEY_COMPONENT_NAME;
extern const std::string METRIC_LABEL_KEY_QUEUE_TYPE;
extern const std::string METRIC_LABEL_KEY_EXACTLY_ONCE_FLAG;
extern const std::string METRIC_LABEL_KEY_FLUSHER_NODE_ID;

// input file metrics
extern const std::string METRIC_INPUT_RECORDS_TOTAL;
extern const std::string METRIC_INPUT_RECORDS_SIZE_BYTES;
Expand Down
5 changes: 3 additions & 2 deletions core/pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,10 +289,11 @@ bool Pipeline::Init(PipelineConfig&& config) {
? ProcessQueueManager::sMaxPriority
: mContext.GetGlobalConfig().mProcessPriority - 1;
if (isInputSupportAck) {
ProcessQueueManager::GetInstance()->CreateOrUpdateBoundedQueue(mContext.GetProcessQueueKey(), priority);
ProcessQueueManager::GetInstance()->CreateOrUpdateBoundedQueue(
mContext.GetProcessQueueKey(), priority, mContext);
} else {
ProcessQueueManager::GetInstance()->CreateOrUpdateCircularQueue(
mContext.GetProcessQueueKey(), priority, 1024);
mContext.GetProcessQueueKey(), priority, 1024, mContext);
}


Expand Down
2 changes: 1 addition & 1 deletion core/pipeline/PipelineManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,12 +224,12 @@ void PipelineManager::StopAllPipelines() {
StreamLogManager::GetInstance()->Shutdown();
}
#endif
PrometheusInputRunner::GetInstance()->Stop();
#if defined(__linux__) && !defined(__ANDROID__)
ObserverManager::GetInstance()->HoldOn(true);
ebpf::eBPFServer::GetInstance()->Stop();
#endif
FileServer::GetInstance()->Stop();
PrometheusInputRunner::GetInstance()->Stop();

LogtailPlugin::GetInstance()->StopAll(true, true);

Expand Down
Loading

0 comments on commit 96955d9

Please sign in to comment.