Skip to content

Commit

Permalink
fix add raw log
Browse files Browse the repository at this point in the history
  • Loading branch information
Assassin718 committed Aug 27, 2024
1 parent a93d0fb commit b44d250
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 0 deletions.
15 changes: 15 additions & 0 deletions pkg/helper/pipeline_event_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,21 @@ func CreateLogEventByArray(t time.Time, enableTimestampNano bool, columns []stri
return &logEvent, nil
}

func CreateLogEventByRawLog(log *protocol.Log) (*protocol.LogEvent, error) {
var logEvent protocol.LogEvent
logEvent.Contents = make([]*protocol.LogEvent_Content, 0, len(log.Contents))
for _, logC := range log.Contents {
cont := &protocol.LogEvent_Content{
Key: logC.Key,
Value: logC.Value,
}
logEvent.Contents = append(logEvent.Contents, cont)
}
logEvent.Time = uint32(log.Time)
logEvent.TimeNs = *log.TimeNs
return &logEvent, nil
}

func CreatePipelineEventGroup(metadata map[string]string, configTag map[string]string, logTags map[string]string, logEvents []*protocol.LogEvent) (*protocol.PipelineEventGroup, error) {
var pipelineEventGroup protocol.PipelineEventGroup
pipelineEventGroup.Metadata = metadata
Expand Down
7 changes: 7 additions & 0 deletions pluginmanager/plugin_wrapper_metric_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ func (p *MetricWrapperV1) AddDataArrayWithContext(tags map[string]string,
}
if p.Config.GlobalConfig.GoInputToNativeProcessor {
logEvent, _ := helper.CreateLogEventByArray(logTime, p.Config.GlobalConfig.EnableTimestampNanosecond, columns, values)
p.inputRecordsTotal.Add(1)
p.inputRecordsSizeBytes.Add(int64(logEvent.Size()))
p.LogsCachedChan <- &pipeline.LogEventWithContext{LogEvent: logEvent, Context: tags}
return
}
Expand All @@ -131,6 +133,11 @@ func (p *MetricWrapperV1) AddDataArrayWithContext(tags map[string]string,
func (p *MetricWrapperV1) AddRawLogWithContext(log *protocol.Log, ctx map[string]interface{}) {
p.inputRecordsTotal.Add(1)
p.inputRecordsSizeBytes.Add(int64(log.Size()))
if p.Config.GlobalConfig.GoInputToNativeProcessor {
logEvent, _ := helper.CreateLogEventByRawLog(log)
p.LogsCachedChan <- &pipeline.LogEventWithContext{LogEvent: logEvent, Context: nil}
return
}
p.LogsChan <- &pipeline.LogWithContext{Log: log, Context: ctx}
}

Expand Down
7 changes: 7 additions & 0 deletions pluginmanager/plugin_wrapper_service_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ func (p *ServiceWrapperV1) AddDataArrayWithContext(tags map[string]string,
// need push to native processor
if p.Config.GlobalConfig.GoInputToNativeProcessor {
logEvent, _ := helper.CreateLogEventByArray(logTime, p.Config.GlobalConfig.EnableTimestampNanosecond, columns, values)
p.inputRecordsTotal.Add(1)
p.inputRecordsSizeBytes.Add(int64(logEvent.Size()))
p.LogsCachedChan <- &pipeline.LogEventWithContext{LogEvent: logEvent, Context: tags}
return
}
Expand All @@ -130,6 +132,11 @@ func (p *ServiceWrapperV1) AddDataArrayWithContext(tags map[string]string,
func (p *ServiceWrapperV1) AddRawLogWithContext(log *protocol.Log, ctx map[string]interface{}) {
p.inputRecordsTotal.Add(1)
p.inputRecordsSizeBytes.Add(int64(log.Size()))
if p.Config.GlobalConfig.GoInputToNativeProcessor {
logEvent, _ := helper.CreateLogEventByRawLog(log)
p.LogsCachedChan <- &pipeline.LogEventWithContext{LogEvent: logEvent, Context: nil}
return
}
p.LogsChan <- &pipeline.LogWithContext{Log: log, Context: ctx}
}

Expand Down

0 comments on commit b44d250

Please sign in to comment.