Skip to content

Commit

Permalink
feat(tool): restore plugin test server
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <huangjy@emqx.io>
  • Loading branch information
ngjaying committed Sep 20, 2024
1 parent ca32f2c commit c3681cd
Show file tree
Hide file tree
Showing 6 changed files with 303 additions and 11 deletions.
2 changes: 1 addition & 1 deletion etc/kuiper.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ portable:
# or other circumstance where the python executable cannot be successfully invoked through the default command.
pythonBin: python
# control init timeout. If the init time is longer than this value, the plugin will be terminated.
initTimeout: 5s
initTimeout: 60s

openTelemetry:
serviceName: kuiperd-service
Expand Down
18 changes: 18 additions & 0 deletions internal/plugin/portable/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,24 @@ func InitManager() (*Manager, error) {
return m, nil
}

func MockManager(plugins map[string]*PluginInfo) (*Manager, error) {
reg := &registry{
RWMutex: sync.RWMutex{},
plugins: make(map[string]*PluginInfo),
sources: make(map[string]string),
sinks: make(map[string]string),
functions: make(map[string]string),
}
for name, pi := range plugins {
err := pi.Validate(name)
if err != nil {
return nil, err
}
reg.Set(name, pi)

Check warning on line 124 in internal/plugin/portable/manager.go

View check run for this annotation

Codecov / codecov/patch

internal/plugin/portable/manager.go#L111-L124

Added lines #L111 - L124 were not covered by tests
}
return &Manager{reg: reg}, nil

Check warning on line 126 in internal/plugin/portable/manager.go

View check run for this annotation

Codecov / codecov/patch

internal/plugin/portable/manager.go#L126

Added line #L126 was not covered by tests
}

func (m *Manager) syncRegistry() (err error) {
defer func() {
failpoint.Inject("syncRegistryErr", func() {
Expand Down
2 changes: 1 addition & 1 deletion internal/plugin/portable/runtime/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (r *NanomsgReqChannel) Handshake() error {
if err != nil {
return err
}
err = r.sock.SetOption(mangos.OptionRecvDeadline, time.Duration(conf.Config.Portable.InitTimeout)*time.Millisecond)
err = r.sock.SetOption(mangos.OptionRecvDeadline, time.Duration(conf.Config.Portable.InitTimeout))
if err != nil {
return err
}
Expand Down
9 changes: 1 addition & 8 deletions internal/plugin/portable/runtime/plugin_ins_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func NewPluginInsForTest(name string, ctrlChan ControlChannel) *PluginIns {
InstanceId: 0,
}] = []byte{}
return &PluginIns{
process: nil,
process: &os.Process{},

Check warning on line 75 in internal/plugin/portable/runtime/plugin_ins_manager.go

View check run for this annotation

Codecov / codecov/patch

internal/plugin/portable/runtime/plugin_ins_manager.go#L75

Added line #L75 was not covered by tests
ctrlChan: ctrlChan,
name: name,
commands: commands,
Expand Down Expand Up @@ -197,13 +197,6 @@ func GetPluginInsManager() *pluginInsManager {
return pm
}

func GetPluginInsManager4Test() *pluginInsManager {
testPM := &pluginInsManager{
instances: make(map[string]*PluginIns),
}
return testPM
}

func (p *pluginInsManager) GetPluginInsStatus(name string) (*PluginStatus, bool) {
ins, ok := p.getPluginIns(name)
if !ok {
Expand Down
2 changes: 1 addition & 1 deletion test/start_kuiper.sh
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export BUILD_ID=dontKillMe
export KUIPER__BASIC__PROMETHEUS="true"
export KUIPER__BASIC__PROMETHEUSPORT=9081
export KUIPER__BASIC__RESTPORT=9081
export KUIPER__PORTABLE__INITTIMEOUT=50000
export KUIPER__PORTABLE__INITTIMEOUT="5m"

cd $base_dir/
touch log/kuiper.out
Expand Down
281 changes: 281 additions & 0 deletions tools/plugin_server/plugin_test_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
// Copyright 2021-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
context2 "context"
"encoding/json"
"fmt"
"net/http"
"sync"
"time"

"github.com/gorilla/handlers"
"github.com/gorilla/mux"
"github.com/lf-edge/ekuiper/contract/v2/api"

"github.com/lf-edge/ekuiper/v2/internal/conf"
"github.com/lf-edge/ekuiper/v2/internal/plugin/portable"
"github.com/lf-edge/ekuiper/v2/internal/plugin/portable/runtime"
"github.com/lf-edge/ekuiper/v2/internal/topo/context"
"github.com/lf-edge/ekuiper/v2/internal/topo/state"
"github.com/lf-edge/ekuiper/v2/internal/xsql"
"github.com/lf-edge/ekuiper/v2/pkg/cast"
)

// Only support to test a single plugin Testing process.
// 0. Edit the testingPlugin variable to match your plugin meta.
// 1. Start this server, and wait for handshake.
// 2. Start or debug your plugin. Make sure the handshake completed.
// 3. Issue startSymbol/stopSymbol REST API to debug your plugin symbol.

// EDIT HERE: Define the plugins that you want to test.
var testingPlugin = &portable.PluginInfo{
PluginMeta: runtime.PluginMeta{
Name: "pysam",
Version: "v1",
Language: "python",
Executable: "pysam.py",
},
Sources: []string{"pyjson"},
Sinks: []string{"print"},
Functions: []string{"revert"},
}

var mockSinkData = []map[string]interface{}{
{
"name": "hello",
"count": 5,
}, {
"name": "world",
"count": 10,
},
}

var mockFuncData = [][]interface{}{
{"twelve"},
{"eleven"},
}

var (
m *portable.Manager
ctx api.StreamContext
cancels sync.Map
)

func main() {
var err error
m, err = portable.MockManager(map[string]*portable.PluginInfo{testingPlugin.Name: testingPlugin})
if err != nil {
panic(err)

Check warning on line 82 in tools/plugin_server/plugin_test_server.go

View check run for this annotation

Codecov / codecov/patch

tools/plugin_server/plugin_test_server.go#L78-L82

Added lines #L78 - L82 were not covered by tests
}
ins, err := startPluginIns(testingPlugin)
if err != nil {
panic(err)

Check warning on line 86 in tools/plugin_server/plugin_test_server.go

View check run for this annotation

Codecov / codecov/patch

tools/plugin_server/plugin_test_server.go#L84-L86

Added lines #L84 - L86 were not covered by tests
}
defer ins.Stop()
runtime.GetPluginInsManager().AddPluginIns(testingPlugin.Name, ins)
c := context.WithValue(context.Background(), context.LoggerKey, conf.Log)
ctx = c.WithMeta("rule1", "op1", &state.MemoryStore{}).WithInstance(1)
server := createRestServer("127.0.0.1", 33333)
server.ListenAndServe()

Check warning on line 93 in tools/plugin_server/plugin_test_server.go

View check run for this annotation

Codecov / codecov/patch

tools/plugin_server/plugin_test_server.go#L88-L93

Added lines #L88 - L93 were not covered by tests
}

func startPluginIns(info *portable.PluginInfo) (*runtime.PluginIns, error) {
conf.Log.Infof("create control channel")
ctrlChan, err := runtime.CreateControlChannel(info.Name)
if err != nil {
return nil, fmt.Errorf("can't create new control channel: %s", err.Error())
}
conf.Log.Println("waiting handshake")
if conf.Config == nil {
conf.Config = &conf.KuiperConf{}
}
conf.Config.Portable.InitTimeout = cast.DurationConf(5 * time.Minute)
err = ctrlChan.Handshake()
if err != nil {
return nil, fmt.Errorf("plugin %s control handshake error: %v", info.Name, err)
}
conf.Log.Println("plugin start running")
return runtime.NewPluginInsForTest(info.Name, ctrlChan), nil

Check warning on line 112 in tools/plugin_server/plugin_test_server.go

View check run for this annotation

Codecov / codecov/patch

tools/plugin_server/plugin_test_server.go#L96-L112

Added lines #L96 - L112 were not covered by tests
}

func createRestServer(ip string, port int) *http.Server {
r := mux.NewRouter()
r.HandleFunc("/symbol/start", startSymbolHandler).Methods(http.MethodPost)
r.HandleFunc("/symbol/stop", stopSymbolHandler).Methods(http.MethodPost)
server := &http.Server{
Addr: cast.JoinHostPortInt(ip, port),
// Good practice to set timeouts to avoid Slowloris attacks.
WriteTimeout: time.Second * 60 * 5,
ReadTimeout: time.Second * 60 * 5,
IdleTimeout: time.Second * 60,
Handler: handlers.CORS(handlers.AllowedHeaders([]string{"Accept", "Accept-Language", "Content-Type", "Content-Language", "Origin"}))(r),
}
server.SetKeepAlivesEnabled(false)
return server

Check warning on line 128 in tools/plugin_server/plugin_test_server.go

View check run for this annotation

Codecov / codecov/patch

tools/plugin_server/plugin_test_server.go#L115-L128

Added lines #L115 - L128 were not covered by tests
}

func startSymbolHandler(w http.ResponseWriter, r *http.Request) {
ctrl, err := decode(r)
if err != nil {
http.Error(w, fmt.Sprintf("Invalid body: decode error %v", err), http.StatusBadRequest)
return
}
switch ctrl.PluginType {
case runtime.TYPE_SOURCE:
ss, err := m.Source(ctrl.SymbolName)
if err != nil {
http.Error(w, fmt.Sprintf("running source %s %v", ctrl.SymbolName, err), http.StatusBadRequest)
return
}
source := ss.(*runtime.PortableSource)
newctx, cancel := ctx.WithCancel()
if _, ok := cancels.LoadOrStore(ctrl.PluginType+ctrl.SymbolName, cancel); ok {
http.Error(w, fmt.Sprintf("source symbol %s already exists", ctrl.SymbolName), http.StatusBadRequest)
return
}
go func() {
<-newctx.Done()
source.Close(newctx)
cancels.Delete(ctrl.PluginType + ctrl.SymbolName)
}()
err = source.Provision(newctx, ctrl.Config)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
err = source.Connect(newctx, nil)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
go source.Subscribe(newctx, func(_ api.StreamContext, data any, _ map[string]any, _ time.Time) {
fmt.Printf("%v\n", data)
}, func(ctx api.StreamContext, err error) {
fmt.Println(err.Error())
})
case runtime.TYPE_SINK:
ss, err := m.Sink(ctrl.SymbolName)
if err != nil {
http.Error(w, fmt.Sprintf("running sink %s %v", ctrl.SymbolName, err), http.StatusBadRequest)
return
}
sink := ss.(*runtime.PortableSink)
newctx, cancel := ctx.WithCancel()
if _, ok := cancels.LoadOrStore(ctrl.PluginType+ctrl.SymbolName, cancel); ok {
http.Error(w, fmt.Sprintf("source symbol %s already exists", ctrl.SymbolName), http.StatusBadRequest)
return
}
err = sink.Provision(newctx, ctrl.Config)
if err != nil {
http.Error(w, fmt.Sprintf("open sink %s %v", ctrl.SymbolName, err), http.StatusBadRequest)
return
}
err = sink.Connect(newctx, nil)
if err != nil {
http.Error(w, fmt.Sprintf("open sink %s %v", ctrl.SymbolName, err), http.StatusBadRequest)
return
}
go func() {
defer func() {
sink.Close(newctx)
cancels.Delete(ctrl.PluginType + ctrl.SymbolName)
}()
for {
for _, m := range mockSinkData {
jsonStr, err := json.Marshal(m)
if err != nil {
fmt.Printf("cannot collect data: %v\n", err)
continue

Check warning on line 202 in tools/plugin_server/plugin_test_server.go

View check run for this annotation

Codecov / codecov/patch

tools/plugin_server/plugin_test_server.go#L131-L202

Added lines #L131 - L202 were not covered by tests
}
err = sink.Collect(newctx, &xsql.RawTuple{Rawdata: jsonStr})
if err != nil {
fmt.Printf("cannot collect data: %v\n", err)
continue

Check warning on line 207 in tools/plugin_server/plugin_test_server.go

View check run for this annotation

Codecov / codecov/patch

tools/plugin_server/plugin_test_server.go#L204-L207

Added lines #L204 - L207 were not covered by tests
}
select {
case <-newctx.Done():
fmt.Println("stop sink")
return
default:

Check warning on line 213 in tools/plugin_server/plugin_test_server.go

View check run for this annotation

Codecov / codecov/patch

tools/plugin_server/plugin_test_server.go#L209-L213

Added lines #L209 - L213 were not covered by tests
}
time.Sleep(1 * time.Second)

Check warning on line 215 in tools/plugin_server/plugin_test_server.go

View check run for this annotation

Codecov / codecov/patch

tools/plugin_server/plugin_test_server.go#L215

Added line #L215 was not covered by tests
}
}
}()
case runtime.TYPE_FUNC:
f, err := m.Function(ctrl.SymbolName)
if err != nil {
http.Error(w, fmt.Sprintf("running function %s %v", ctrl.SymbolName, err), http.StatusBadRequest)
return
}
newctx, cancel := ctx.WithCancel()
fc := context.NewDefaultFuncContext(newctx, 1)
if _, ok := cancels.LoadOrStore(ctrl.PluginType+ctrl.SymbolName, cancel); ok {
http.Error(w, fmt.Sprintf("source symbol %s already exists", ctrl.SymbolName), http.StatusBadRequest)
return
}
go func() {
defer func() {
cancels.Delete(ctrl.PluginType + ctrl.SymbolName)
}()
for {
for _, m := range mockFuncData {
r, ok := f.Exec(fc, m)
if !ok {
fmt.Print("cannot exec func\n")
continue

Check warning on line 240 in tools/plugin_server/plugin_test_server.go

View check run for this annotation

Codecov / codecov/patch

tools/plugin_server/plugin_test_server.go#L219-L240

Added lines #L219 - L240 were not covered by tests
}
fmt.Println(r)
select {
case <-newctx.Done():
fmt.Println("stop func")
return
default:

Check warning on line 247 in tools/plugin_server/plugin_test_server.go

View check run for this annotation

Codecov / codecov/patch

tools/plugin_server/plugin_test_server.go#L242-L247

Added lines #L242 - L247 were not covered by tests
}
time.Sleep(1 * time.Second)

Check warning on line 249 in tools/plugin_server/plugin_test_server.go

View check run for this annotation

Codecov / codecov/patch

tools/plugin_server/plugin_test_server.go#L249

Added line #L249 was not covered by tests
}
}
}()
}

w.WriteHeader(http.StatusOK)
w.Write([]byte("ok"))

Check warning on line 256 in tools/plugin_server/plugin_test_server.go

View check run for this annotation

Codecov / codecov/patch

tools/plugin_server/plugin_test_server.go#L255-L256

Added lines #L255 - L256 were not covered by tests
}

func stopSymbolHandler(w http.ResponseWriter, r *http.Request) {
ctrl, err := decode(r)
if err != nil {
http.Error(w, fmt.Sprintf("Invalid body: decode error %v", err), http.StatusBadRequest)
return
}
if cancel, ok := cancels.Load(ctrl.PluginType + ctrl.SymbolName); ok {
cancel.(context2.CancelFunc)()
cancels.Delete(ctrl.PluginType + ctrl.SymbolName)
} else {
http.Error(w, fmt.Sprintf("Symbol %s already close", ctrl.SymbolName), http.StatusBadRequest)
return
}
w.WriteHeader(http.StatusOK)
w.Write([]byte("ok"))

Check warning on line 273 in tools/plugin_server/plugin_test_server.go

View check run for this annotation

Codecov / codecov/patch

tools/plugin_server/plugin_test_server.go#L259-L273

Added lines #L259 - L273 were not covered by tests
}

func decode(r *http.Request) (*runtime.Control, error) {
defer r.Body.Close()
ctrl := &runtime.Control{}
err := json.NewDecoder(r.Body).Decode(ctrl)
return ctrl, err

Check warning on line 280 in tools/plugin_server/plugin_test_server.go

View check run for this annotation

Codecov / codecov/patch

tools/plugin_server/plugin_test_server.go#L276-L280

Added lines #L276 - L280 were not covered by tests
}

0 comments on commit c3681cd

Please sign in to comment.