From 4380507f8913ee12f24857bf81fc9d2dbf1aafe8 Mon Sep 17 00:00:00 2001 From: HuangXiaomeng Date: Thu, 21 Mar 2024 14:07:42 +0800 Subject: [PATCH 1/4] fix bug: grpc host should get from net --- example/broadcast/main.go | 6 +++--- example/mapreduce/main.go | 8 ++++---- example/mapreduce/order_info.go | 6 +++--- internal/actor/common/utils.go | 13 +------------ internal/actor/init.go | 9 ++++++++- 5 files changed, 19 insertions(+), 23 deletions(-) diff --git a/example/broadcast/main.go b/example/broadcast/main.go index 0e8d199..3c0fe86 100644 --- a/example/broadcast/main.go +++ b/example/broadcast/main.go @@ -24,9 +24,9 @@ func main() { // This is just an example, the real configuration needs to be obtained from the platform cfg := &schedulerx.Config{ Endpoint: "acm.aliyun.com", - Namespace: "a0e3ffd7-xxx-xxx-xxx-86ca9dc68932", - GroupId: "dts-demo", - AppKey: "xxxxx", + Namespace: "fa6ed99e-1469-4477-855c-a2bf1659d039", + GroupId: "xueren_test_sub", + AppKey: "myV5K5Xaf1knuzKdPBaj3A==", } client, err := schedulerx.GetClient(cfg) if err != nil { diff --git a/example/mapreduce/main.go b/example/mapreduce/main.go index d08e4f4..50aa392 100644 --- a/example/mapreduce/main.go +++ b/example/mapreduce/main.go @@ -25,9 +25,9 @@ func main() { // This is just an example, the real configuration needs to be obtained from the platform cfg := &schedulerx.Config{ Endpoint: "acm.aliyun.com", - Namespace: "a0e3ffd7-xxx-xxx-xxx-86ca9dc68932", - GroupId: "dts-demo", - AppKey: "xxxxx", + Namespace: "fa6ed99e-1469-4477-855c-a2bf1659d039", + GroupId: "xueren_test_sub", + AppKey: "myV5K5Xaf1knuzKdPBaj3A==", } client, err := schedulerx.GetClient(cfg) if err != nil { @@ -38,6 +38,6 @@ func main() { task := &TestMapReduceJob{ mapjob.NewMapReduceJobProcessor(), // FIXME how define user behavior } - client.RegisterTask("TestMapReduce", task) + client.RegisterTask("TestMapReduceJob", task) select {} } diff --git a/example/mapreduce/order_info.go b/example/mapreduce/order_info.go index 44e96eb..d081757 100644 --- a/example/mapreduce/order_info.go +++ b/example/mapreduce/order_info.go @@ -51,7 +51,7 @@ func (mr *TestMapReduceJob) Kill(jobCtx *jobcontext.JobContext) error { // Process the MapReduce model is used to distributed scan orders for timeout confirmation func (mr *TestMapReduceJob) Process(jobCtx *jobcontext.JobContext) (*processor.ProcessResult, error) { var ( - num = 100 * 10000 + num = 1000 err error ) taskName := jobCtx.TaskName() @@ -76,8 +76,8 @@ func (mr *TestMapReduceJob) Process(jobCtx *jobcontext.JobContext) (*processor.P fmt.Printf("task is not OrderInfo, task=%+v\n", jobCtx.Task()) } fmt.Printf("orderInfo=%+v\n", orderInfo) - time.Sleep(1 * time.Second) - fmt.Println("Finish Process...") + time.Sleep(10 * time.Millisecond) + // fmt.Println("Finish Process...") return processor.NewProcessResult( processor.WithSucceed(), processor.WithResult(strconv.Itoa(orderInfo.Value)), diff --git a/internal/actor/common/utils.go b/internal/actor/common/utils.go index d90f3ce..08c54d0 100644 --- a/internal/actor/common/utils.go +++ b/internal/actor/common/utils.go @@ -25,7 +25,6 @@ import ( "github.com/asynkron/protoactor-go/actor" "github.com/alibaba/schedulerx-worker-go/internal/remoting/pool" - "github.com/alibaba/schedulerx-worker-go/internal/utils" ) const ( @@ -72,11 +71,6 @@ func SchedulerxServerPid(ctx context.Context) *actor.PID { // The workerAddr issued by the server is the address reported by the heartbeat. // It is the connection address obtained from the connection pool, not the ActorSystem address, so it needs to be converted. func GetRealWorkerAddr(workerIdAddr string) string { - localHostAddr, err := utils.GetIpv4AddrHost() - if err != nil { - panic(err) - } - parts := strings.Split(workerIdAddr, "@") workerAddr := parts[1] addrParts := strings.Split(workerAddr, ":") @@ -92,12 +86,7 @@ func GetRealWorkerAddr(workerIdAddr string) string { panic(fmt.Sprintf("invalid worker addr: %s", workerAddr)) } - if addrParts[0] == localHostAddr { - // Debugging on local machine, starting multiple processes - host = "127.0.0.1" - } else { - host = addrParts[0] - } + host = addrParts[0] if len(addrParts) == 2 { port = addrParts[1] diff --git a/internal/actor/init.go b/internal/actor/init.go index 36fb175..5b71c39 100644 --- a/internal/actor/init.go +++ b/internal/actor/init.go @@ -17,6 +17,7 @@ package actor import ( + "github.com/alibaba/schedulerx-worker-go/internal/utils" "github.com/asynkron/protoactor-go/actor" "github.com/asynkron/protoactor-go/remote" "google.golang.org/grpc" @@ -85,13 +86,19 @@ func InitActors(actorSystem *actor.ActorSystem) error { }() var ( - host = "127.0.0.1" + host = "0.0.0.0" port = 0 // random port ) if actorSystemPort := config.GetWorkerConfig().ActorSystemPort(); actorSystemPort != 0 { port = int(actorSystemPort) } + localHost, err := utils.GetIpv4AddrHost() + if err != nil { + panic(err) + } + host = localHost + // The maximum limit for a subtask is 64kb, and a maximum of 1000 batches can be sent together, which is 64MB, // plus about 200MB for serialization and request headers. remoteConfig := remote.Configure(host, port, From 64fd0defcee638d7306a9c1ebc80b7883812d87d Mon Sep 17 00:00:00 2001 From: HuangXiaomeng Date: Fri, 22 Mar 2024 16:34:30 +0800 Subject: [PATCH 2/4] fix bug: context may null --- internal/actor/container_actor.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/actor/container_actor.go b/internal/actor/container_actor.go index 70aa33e..dd19f52 100644 --- a/internal/actor/container_actor.go +++ b/internal/actor/container_actor.go @@ -17,6 +17,7 @@ package actor import ( + "context" "encoding/json" "fmt" "runtime/debug" @@ -346,5 +347,6 @@ func convertMasterStartContainerRequest2JobContext(req *schedulerx.MasterStartCo jobCtx.SetShardingNum(req.GetShardingNum()) jobCtx.SetTimeType(req.GetTimeType()) jobCtx.SetTimeExpression(req.GetTimeExpression()) + jobCtx.Context = context.Background() return jobCtx, nil } From b3637fab52fcf3cf5f91d3011673876ed0c99a82 Mon Sep 17 00:00:00 2001 From: HuangXiaomeng Date: Fri, 22 Mar 2024 16:35:14 +0800 Subject: [PATCH 3/4] feat: actorySystemPort change to grpcPort --- config/worker_config.go | 10 +++++----- internal/actor/init.go | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/config/worker_config.go b/config/worker_config.go index b51127b..fc59879 100644 --- a/config/worker_config.go +++ b/config/worker_config.go @@ -127,9 +127,9 @@ func WithTaskBodySizeMax(taskBodySizeMax int32) Option { } } -func WithActorSystemPort(port int32) Option { +func WithGrpcPort(port int32) Option { return func(config *WorkerConfig) { - config.actorSystemPort = port + config.grpcPort = port } } @@ -158,7 +158,7 @@ type WorkerConfig struct { workerParallelTaskMaxSize int32 workerMapPageSize int32 taskBodySizeMax int32 - actorSystemPort int32 + grpcPort int32 } func (w *WorkerConfig) IsShareContainerPool() bool { @@ -217,8 +217,8 @@ func (w *WorkerConfig) TaskBodySizeMax() int32 { return w.taskBodySizeMax } -func (w *WorkerConfig) ActorSystemPort() int32 { - return w.actorSystemPort +func (w *WorkerConfig) GrpcPort() int32 { + return w.grpcPort } func defaultWorkerConfig() *WorkerConfig { diff --git a/internal/actor/init.go b/internal/actor/init.go index 5b71c39..0469ce4 100644 --- a/internal/actor/init.go +++ b/internal/actor/init.go @@ -89,8 +89,8 @@ func InitActors(actorSystem *actor.ActorSystem) error { host = "0.0.0.0" port = 0 // random port ) - if actorSystemPort := config.GetWorkerConfig().ActorSystemPort(); actorSystemPort != 0 { - port = int(actorSystemPort) + if grpcPort := config.GetWorkerConfig().GrpcPort(); grpcPort != 0 { + port = int(grpcPort) } localHost, err := utils.GetIpv4AddrHost() From 36ed55d2eeee0520483fa0d5cbd77c9f751861b2 Mon Sep 17 00:00:00 2001 From: HuangXiaomeng Date: Fri, 22 Mar 2024 19:44:19 +0800 Subject: [PATCH 4/4] feat: support config.WithIface --- config/worker_config.go | 11 +++++++++++ internal/actor/init.go | 16 ++++++++++++---- internal/utils/ip_util.go | 25 +++++++++++++++++++++++++ 3 files changed, 48 insertions(+), 4 deletions(-) diff --git a/config/worker_config.go b/config/worker_config.go index fc59879..42271db 100644 --- a/config/worker_config.go +++ b/config/worker_config.go @@ -133,6 +133,12 @@ func WithGrpcPort(port int32) Option { } } +func WithIface(iface string) Option { + return func(config *WorkerConfig) { + config.iface = iface + } +} + func NewWorkerConfig(opts ...Option) *WorkerConfig { once.Do(func() { workerConfig = defaultWorkerConfig() @@ -159,6 +165,7 @@ type WorkerConfig struct { workerMapPageSize int32 taskBodySizeMax int32 grpcPort int32 + iface string } func (w *WorkerConfig) IsShareContainerPool() bool { @@ -221,6 +228,10 @@ func (w *WorkerConfig) GrpcPort() int32 { return w.grpcPort } +func (w *WorkerConfig) Iface() string { + return w.iface +} + func defaultWorkerConfig() *WorkerConfig { return &WorkerConfig{ isSecondDelayIntervalMS: false, diff --git a/internal/actor/init.go b/internal/actor/init.go index 0469ce4..f9734ae 100644 --- a/internal/actor/init.go +++ b/internal/actor/init.go @@ -93,11 +93,19 @@ func InitActors(actorSystem *actor.ActorSystem) error { port = int(grpcPort) } - localHost, err := utils.GetIpv4AddrHost() - if err != nil { - panic(err) + if config.GetWorkerConfig().Iface() != "" { + localHost, err := utils.GetIpv4AddrByIface(config.GetWorkerConfig().Iface()) + if err != nil { + panic(err) + } + host = localHost + } else { + localHost, err := utils.GetIpv4AddrHost() + if err != nil { + panic(err) + } + host = localHost } - host = localHost // The maximum limit for a subtask is 64kb, and a maximum of 1000 batches can be sent together, which is 64MB, // plus about 200MB for serialization and request headers. diff --git a/internal/utils/ip_util.go b/internal/utils/ip_util.go index bbeec88..f210250 100644 --- a/internal/utils/ip_util.go +++ b/internal/utils/ip_util.go @@ -56,6 +56,31 @@ func GetIpv4AddrHost() (string, error) { return "", errors.New("cannot find valid ipv4 addr") } +func GetIpv4AddrByIface(_iface string) (string, error) { + ifaces, err := net.Interfaces() + if err != nil { + return "", err + } + + // 遍历所有网卡 + for _, iface := range ifaces { + if iface.Name == _iface { // 指定网卡名称 + addrs, err := iface.Addrs() + if err != nil { + return "", err + } + // 遍历网卡的地址信息 + for _, addr := range addrs { + ip, _, _ := net.ParseCIDR(addr.String()) + if ip.To4() != nil { + return ip.String(), nil + } + } + } + } + return "", errors.New("cannot find valid ipv4 addr") +} + func ParseIPAddr(addr string) (string, int, error) { host, portStr, err := net.SplitHostPort(addr) if err != nil {