Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support Go input to Cpp pipeline #1715

Open
wants to merge 17 commits into
base: main
Choose a base branch
from

Conversation

Assassin718
Copy link

@Assassin718 Assassin718 commented Aug 26, 2024

已实现

  • v1接口LogEvent的转发
  • v2接口LogEvent、MetricEvent转发
  • Go InputWrapper对多个LogEvent缓存后打包成PipelineEventGroup发送给Cpp processor
  • Go input不同input mode (push/pull)识别

待实现

  • SpanEvent转发
  • 支持对Go侧缓存参数的设置

}
}
break;
case sls_logs::PipelineEventGroup::EventType::PipelineEventGroup_EventType_METRIC:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个PR会把Metric也处理吗,还是只是给出定义,后续补齐?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Metric和Trace只要能转成PipelineEventGroup,后续都可以用现有流水线处理了

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

转换也在这个PR里补齐吧

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cpp这边的metric event暂时只支持untyped single value,go的multivalue以及typed value都无法转换

core/config/PipelineConfig.cpp Outdated Show resolved Hide resolved
}
}
break;
case sls_logs::PipelineEventGroup::EventType::PipelineEventGroup_EventType_METRIC:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Metric和Trace只要能转成PipelineEventGroup,后续都可以用现有流水线处理了

core/go_pipeline/LogtailPlugin.cpp Outdated Show resolved Hide resolved
core/go_pipeline/LogtailPlugin.cpp Outdated Show resolved Hide resolved
}
}
break;
case sls_logs::PipelineEventGroup::EventType::PipelineEventGroup_EventType_METRIC:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

转换也在这个PR里补齐吧

pluginmanager/plugin_wrapper_metric_v1.go Outdated Show resolved Hide resolved
pluginmanager/plugin_wrapper_metric_v1.go Outdated Show resolved Hide resolved
pluginmanager/plugin_wrapper_metric_v1.go Outdated Show resolved Hide resolved
pluginmanager/plugin_wrapper_metric_v1.go Show resolved Hide resolved
pluginmanager/plugin_wrapper_service_v1.go Outdated Show resolved Hide resolved
@Abingcbc Abingcbc changed the title [WIP] feat: add Go input to Cpp processor feat: add Go input to Cpp processor Sep 2, 2024
@Abingcbc Abingcbc changed the title feat: add Go input to Cpp processor feat: support Go input to Cpp pipeline Sep 2, 2024
@@ -0,0 +1,85 @@
syntax = "proto2";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这块有测试pb 序列化的开销和反序列化开销吗?另外有没有对比一些其他协议的性能测试,比如flatbuffer,arrow。

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个之前有测试过了,pb序列化和大小都是比较有优势的,但是反序列稍微慢些

pkg/pipeline/input.go Outdated Show resolved Hide resolved
pkg/pipeline/input.go Outdated Show resolved Hide resolved
state: input,
interval: wrapper.Interval * time.Millisecond,
state: &wrapper,
interval: wrapper.Interval,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里为什么少了time.Millisecond

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里wrapper.Interval本来就是乘过time.Millisecond的,这里再乘一个Millisecond运行过程中不对,应该是之前写错了

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个确定是写错了吗,我们应该有业务用MetricInputV2 的,好像没反馈 interval 不对呀?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里wrapper.Interval是由metricWrapperV2.Init初始化的,这个Init函数输入参数有一个int类型的inputInterval,在函数中对inputInterval乘time.Millisecond赋给了wrapper.Interval

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

好的,这个好像是近期引入的 bug,我们内部会延后一点时间和社区 main 同步,所以上一个版本还没问题

pluginmanager/plugin_runner_v2.go Outdated Show resolved Hide resolved
pluginmanager/plugin_wrapper_metric_v1.go Outdated Show resolved Hide resolved
pluginmanager/plugin_wrapper_metric_v1.go Outdated Show resolved Hide resolved
pluginmanager/plugin_wrapper_service_v1.go Outdated Show resolved Hide resolved
pluginmanager/plugin_wrapper_service_v1.go Outdated Show resolved Hide resolved
@@ -59,6 +59,11 @@ struct containerMeta{
char** envsVal;
};

struct loadGoPipelineResp {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

func GetPipelineMetrics() *C.PluginMetrics也要调整吗?改成 pb协议到core。

@@ -51,11 +52,11 @@ class Pipeline {
PipelineContext& GetContext() const { return mContext; }
const Json::Value& GetConfig() const { return *mConfig; }
const std::vector<std::unique_ptr<FlusherInstance>>& GetFlushers() const { return mFlushers; }
bool IsFlushingThroughGoPipeline() const { return !mGoPipelineWithoutInput.isNull(); }
bool IsFlushingThroughGoPipeline() const;
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里是因为当流水线配置为go input -> cpp processor -> go processor/aggregator/flusher时,在pipeline::Init()中将go的配置全部添加到了mGoPipelineWithInput,此时mGoPipelineWithoutInput为空,因此这里会返回false,导致cpp无法向go发送数据

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants