Skip to content

Commit

Permalink
Merge pull request #20 from alibaba/develop
Browse files Browse the repository at this point in the history
merge from develop: fix memory leak
  • Loading branch information
yaohuitc committed Apr 24, 2024
2 parents 345d5a1 + f4f6d23 commit 4e62b1d
Show file tree
Hide file tree
Showing 22 changed files with 179 additions and 154 deletions.
6 changes: 3 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ func newClient(cfg *Config, opts ...Option) (*Client, error) {
logger.Infof("SchedulerX server connected, remoteAddr=%s, localAddr=%s", conn.RemoteAddr(), conn.LocalAddr().String())
}

tasks := tasks.GetTaskMap()
masterpool.InitTaskMasterPool(masterpool.NewTaskMasterPool(tasks))
taskMap := tasks.GetTaskMap()
masterpool.InitTaskMasterPool(masterpool.NewTaskMasterPool(taskMap))

// Init actors
actorSystem := actor.NewActorSystem()
Expand All @@ -162,7 +162,7 @@ func newClient(cfg *Config, opts ...Option) (*Client, error) {
return &Client{
cfg: cfg,
opts: options,
tasks: tasks,
tasks: taskMap,
actorSystem: actorSystem,
}, nil
}
Expand Down
12 changes: 12 additions & 0 deletions config/worker_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@ func WithIface(iface string) Option {
}
}

func WithQueueSize(queueSize int32) Option {
return func(config *WorkerConfig) {
config.queueSize = queueSize
}
}

func NewWorkerConfig(opts ...Option) *WorkerConfig {
once.Do(func() {
workerConfig = defaultWorkerConfig()
Expand Down Expand Up @@ -166,6 +172,7 @@ type WorkerConfig struct {
taskBodySizeMax int32
grpcPort int32
iface string
queueSize int32
}

func (w *WorkerConfig) IsShareContainerPool() bool {
Expand Down Expand Up @@ -232,6 +239,10 @@ func (w *WorkerConfig) Iface() string {
return w.iface
}

func (w *WorkerConfig) QueueSize() int32 {
return w.queueSize
}

func defaultWorkerConfig() *WorkerConfig {
return &WorkerConfig{
isSecondDelayIntervalMS: false,
Expand All @@ -248,5 +259,6 @@ func defaultWorkerConfig() *WorkerConfig {
workerParallelTaskMaxSize: constants.ParallelTaskListSizeMaxDefault,
workerMapPageSize: constants.WorkerMapPageSizeDefault,
taskBodySizeMax: constants.TaskBodySizeMaxDefault,
queueSize: constants.MapMasterQueueSizeDefault,
}
}
31 changes: 17 additions & 14 deletions internal/actor/common/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,25 @@

package actorcomm

import "github.com/alibaba/schedulerx-worker-go/internal/proto/schedulerx"
import (
"github.com/alibaba/schedulerx-worker-go/config"
"github.com/alibaba/schedulerx-worker-go/internal/proto/schedulerx"
)

var (
sxMsgCh = make(chan interface{}, 10*10000)
heartbeatMsgCh = make(chan interface{}, 10*10000)
taskMasterMsgCh = make(chan interface{}, 10*10000)
containerRouterMsgCh = make(chan interface{}, 10*10000)
atLeastOnceDeliveryMsgCh = make(chan interface{}, 10*10000)

workerMapTaskRespMsgCh = make(chan *schedulerx.WorkerMapTaskResponse, 10*10000)
workerBatchUpdateTaskStatusRespMsgCh = make(chan *schedulerx.WorkerBatchUpdateTaskStatusResponse, 10*10000)
workerQueryJobInstanceStatusRespMsgCh = make(chan *schedulerx.WorkerQueryJobInstanceStatusResponse, 10*10000)
workerClearTasksRespMsgCh = make(chan *schedulerx.WorkerClearTasksResponse, 10*10000)
workerBatchCreateTasksRespMsgCh = make(chan *schedulerx.WorkerBatchCreateTasksResponse, 10*10000)
workerPullTasksRespMsgCh = make(chan *schedulerx.WorkerPullTasksResponse, 10*10000)
workerReportTaskListStatusRespMsgCh = make(chan *schedulerx.WorkerReportTaskListStatusResponse, 10*10000)
sxMsgCh = make(chan interface{}, config.GetWorkerConfig().QueueSize())
heartbeatMsgCh = make(chan interface{}, config.GetWorkerConfig().QueueSize())
taskMasterMsgCh = make(chan interface{}, config.GetWorkerConfig().QueueSize())
containerRouterMsgCh = make(chan interface{}, config.GetWorkerConfig().QueueSize())
atLeastOnceDeliveryMsgCh = make(chan interface{}, config.GetWorkerConfig().QueueSize())

workerMapTaskRespMsgCh = make(chan *schedulerx.WorkerMapTaskResponse, config.GetWorkerConfig().QueueSize())
workerBatchUpdateTaskStatusRespMsgCh = make(chan *schedulerx.WorkerBatchUpdateTaskStatusResponse, config.GetWorkerConfig().QueueSize())
workerQueryJobInstanceStatusRespMsgCh = make(chan *schedulerx.WorkerQueryJobInstanceStatusResponse, config.GetWorkerConfig().QueueSize())
workerClearTasksRespMsgCh = make(chan *schedulerx.WorkerClearTasksResponse, config.GetWorkerConfig().QueueSize())
workerBatchCreateTasksRespMsgCh = make(chan *schedulerx.WorkerBatchCreateTasksResponse, config.GetWorkerConfig().QueueSize())
workerPullTasksRespMsgCh = make(chan *schedulerx.WorkerPullTasksResponse, config.GetWorkerConfig().QueueSize())
workerReportTaskListStatusRespMsgCh = make(chan *schedulerx.WorkerReportTaskListStatusResponse, config.GetWorkerConfig().QueueSize())
)

func SxMsgReceiver() chan interface{} {
Expand Down
62 changes: 35 additions & 27 deletions internal/actor/container_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ import (
)

var _ actor.Actor = &containerActor{}
var defaultActorPool, _ = ants.NewPool(
ants.DefaultAntsPoolSize,
ants.WithPanicHandler(func(i interface{}) {
if r := recover(); r != nil {
logger.Errorf("Panic happened in containerStarter, %v\n%s", r, debug.Stack())
}
}))

type containerActor struct {
enableShareContainerPool bool
Expand All @@ -53,19 +60,12 @@ type containerActor struct {
}

func newContainerActor() *containerActor {
gopool, _ := ants.NewPool(
ants.DefaultAntsPoolSize,
ants.WithPanicHandler(func(i interface{}) {
if r := recover(); r != nil {
logger.Errorf("Panic happened in containerStarter, %v\n%s", r, debug.Stack())
}
}))
return &containerActor{
enableShareContainerPool: config.GetWorkerConfig().IsShareContainerPool(),
batchSize: config.GetWorkerConfig().WorkerMapPageSize(),
statusReqBatchHandlerPool: batch.GetContainerStatusReqHandlerPool(),
containerPool: container.GetThreadContainerPool(),
containerStarter: gopool,
containerStarter: defaultActorPool,
lock: sync.Mutex{},
}
}
Expand Down Expand Up @@ -205,26 +205,31 @@ func (a *containerActor) handleKillContainer(actorCtx actor.Context, req *schedu

func (a *containerActor) handleDestroyContainerPool(actorCtx actor.Context, req *schedulerx.MasterDestroyContainerPoolRequest) {
if !a.enableShareContainerPool {
handler, ok := a.statusReqBatchHandlerPool.GetHandlers().Load(req.GetJobInstanceId())
if ok {
a.lock.Lock()
defer a.lock.Unlock()
// handler, ok := a.statusReqBatchHandlerPool.GetHandlers().Load(req.GetJobInstanceId())
// if ok {
a.lock.Lock()
defer a.lock.Unlock()
logger.Infof("handleDestroyContainerPool from jobInstanceId=%v.", req.GetJobInstanceId())
a.statusReqBatchHandlerPool.Stop(req.GetJobInstanceId())
a.containerPool.DestroyByInstance(req.GetJobInstanceId())
/*
if h, ok := handler.(*batch.ContainerStatusReqHandler); ok {
if latestRequest := h.GetLatestRequest(); latestRequest != nil {
reportTaskStatusRequest, ok := latestRequest.(*schedulerx.ContainerReportTaskStatusRequest)
if ok {
if reportTaskStatusRequest.GetSerialNum() != req.GetSerialNum() {
logger.Infof("skip handleDestroyContainerPool cycleId=%v_%v, handler serialNum=%v.", req.GetJobInstanceId(), req.GetSerialNum(), reportTaskStatusRequest.GetSerialNum())
return
if latestRequest := h.GetLatestRequest(); latestRequest != nil {
reportTaskStatusRequest, ok := latestRequest.(*schedulerx.ContainerReportTaskStatusRequest)
if ok {
if reportTaskStatusRequest.GetSerialNum() != req.GetSerialNum() {
logger.Infof("skip handleDestroyContainerPool cycleId=%v_%v, handler serialNum=%v.", req.GetJobInstanceId(), req.GetSerialNum(), reportTaskStatusRequest.GetSerialNum())
return
}
logger.Infof("handleDestroyContainerPool from cycleId=%v_%v, handler serialNum=%v.", req.GetJobInstanceId(), req.GetSerialNum(), reportTaskStatusRequest.GetSerialNum())
a.statusReqBatchHandlerPool.Stop(req.GetJobInstanceId())
a.containerPool.DestroyByInstance(req.GetJobInstanceId())
}
} else {
logger.Infof("handleDestroyContainerPool from cycleId=%v_%v, handler serialNum=%v.", req.GetJobInstanceId(), req.GetSerialNum(), reportTaskStatusRequest.GetSerialNum())
a.containerPool.DestroyByInstance(req.GetJobInstanceId())
a.statusReqBatchHandlerPool.Stop(req.GetJobInstanceId())
}
}
}
}
*/
// }
}
response := &schedulerx.MasterDestroyContainerPoolResponse{
Success: proto.Bool(true),
Expand All @@ -236,7 +241,7 @@ func (a *containerActor) handleDestroyContainerPool(actorCtx actor.Context, req
logger.Warnf("Cannot send MasterKillContainerResponse due to sender is unknown in handleDestroyContainerPool of containerActor, request=%+v", req)
}

a.containerPool.ReleaseInstanceLock(req.GetJobInstanceId())
// a.containerPool.ReleaseInstanceLock(req.GetJobInstanceId())
}

func (a *containerActor) killInstance(jobId, jobInstanceId int64) {
Expand All @@ -250,6 +255,7 @@ func (a *containerActor) killInstance(jobId, jobInstanceId int64) {
if strings.HasPrefix(uniqueId, prefixKey) {
container.Kill()
containerMap.Delete(uniqueId)
a.statusReqBatchHandlerPool.Stop(jobInstanceId)
}
return true
})
Expand Down Expand Up @@ -282,10 +288,12 @@ func (a *containerActor) startContainer(actorCtx actor.Context, req *schedulerx.
}
if !a.statusReqBatchHandlerPool.Contains(statusReqBatchHandlerKey) {
// support 1.5 million requests
reqQueue := batch.NewReqQueue(150 * 10000)
reqQueue := batch.NewReqQueue(config.GetWorkerConfig().QueueSize())
a.statusReqBatchHandlerPool.Start(
statusReqBatchHandlerKey,
batch.NewContainerStatusReqHandler(statusReqBatchHandlerKey, 1, 1, a.batchSize, reqQueue, req.GetInstanceMasterAkkaPath()))
batch.NewContainerStatusReqHandler(statusReqBatchHandlerKey, 1, 1,
a.batchSize, reqQueue, req.GetInstanceMasterAkkaPath()),
)
}
consumerNum := int32(constants.ConsumerNumDefault)
if req.GetConsumerNum() > 0 {
Expand Down
18 changes: 10 additions & 8 deletions internal/actor/job_instance_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ package actor
import (
"context"
"fmt"
"github.com/alibaba/schedulerx-worker-go/processor"
"github.com/tidwall/gjson"
"time"

"github.com/tidwall/gjson"

"github.com/alibaba/schedulerx-worker-go/processor"

"github.com/asynkron/protoactor-go/actor"
"google.golang.org/protobuf/proto"

Expand Down Expand Up @@ -67,16 +69,16 @@ func (a *jobInstanceActor) Receive(ctx actor.Context) {
switch msg := ctx.Message().(type) {
case *schedulerx.WorkerBatchReportTaskStatuesResponse:
// send to atLeastOnceDeliveryRoutingActor
//actorcomm.AtLeastOnceDeliveryMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
// actorcomm.AtLeastOnceDeliveryMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
// Msg: msg,
//}
// }
// FIXME atLeastOnceDelivery not yet implement, retry 3 times, interval 30s
a.handleReportWorkerStatus(ctx, ctx.Message())
case *schedulerx.WorkerReportJobInstanceStatusResponse:
// send to atLeastOnceDeliveryRoutingActor
//actorcomm.AtLeastOnceDeliveryMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
// actorcomm.AtLeastOnceDeliveryMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
// Msg: msg,
//}
// }
// FIXME atLeastOnceDelivery not yet implement, retry 3 times, interval 30s
a.handleReportWorkerStatus(ctx, ctx.Message())
case *actorcomm.SchedulerWrappedMsg:
Expand Down Expand Up @@ -141,7 +143,7 @@ func (a *jobInstanceActor) handleSubmitJobInstance(actorCtx actor.Context, msg *
}
task, ok := masterpool.GetTaskMasterPool().Tasks().Find(jobName)
if !ok || task == nil {
fmt.Errorf("handleSubmitJobInstance error, jobName=%s is unregistered. ", jobName)
logger.Errorf("handleSubmitJobInstance error, jobName=%s is unregistered. ", jobName)

// report job instance status with at-least-once-delivery
req := &schedulerx.WorkerReportJobInstanceStatusRequest{
Expand Down Expand Up @@ -244,7 +246,7 @@ func (a *jobInstanceActor) handleKillJobInstance(actorCtx actor.Context, msg *ac
} else {
if taskMaster := masterpool.GetTaskMasterPool().Get(req.GetJobInstanceId()); taskMaster != nil {
if err := taskMaster.KillInstance("killed from server"); err != nil {
logger.Infof(fmt.Sprintf("%d killed from server failed, err=%s", req.GetJobInstanceId()), err.Error())
logger.Infof("%d killed from server failed, err=%s", req.GetJobInstanceId(), err.Error())
}
}
errMsg := fmt.Sprintf("%d killed from server", req.GetJobInstanceId())
Expand Down
12 changes: 9 additions & 3 deletions internal/batch/base_req_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ func (rcvr *BaseReqHandler) SetWorkThreadNum(workThreadNum int) {
}

func (rcvr *BaseReqHandler) Start(h ReqHandler) error {
gopool, err := ants.NewPool(rcvr.maxBatchThreadNum,
gopool, err := ants.NewPool(
rcvr.maxBatchThreadNum,
ants.WithExpiryDuration(30*time.Second),
ants.WithPanicHandler(func(i interface{}) {
if r := recover(); r != nil {
Expand All @@ -145,10 +146,10 @@ func (rcvr *BaseReqHandler) Start(h ReqHandler) error {
for {
select {
case <-rcvr.stopBatchRetrieveCh:
break
return
default:
reqs := rcvr.AsyncHandleReqs(h)
logger.Debugf("jobInstanceId=%s, batch retrieve reqs, size:%d, remain size:%d, batchSize:%d",
logger.Debugf("jobInstanceId=%d, batch retrieve reqs, size:%d, remain size:%d, batchSize:%d",
rcvr.jobInstanceId, len(reqs), len(rcvr.reqsQueue.requests), rcvr.batchSize)
if int32(len(reqs)) < rcvr.batchSize*4/5 {
// no element in reqs, sleep a while for aggregation
Expand All @@ -171,9 +172,14 @@ func (rcvr *BaseReqHandler) Stop() {
}
if rcvr.batchProcessSvc != nil {
rcvr.batchProcessSvc.Release()
rcvr.batchProcessSvc = nil
}
if rcvr.reqsQueue != nil {
rcvr.reqsQueue.Clear()
rcvr.reqsQueue = nil
}
if rcvr.activeRunnableNum != nil {
rcvr.activeRunnableNum = nil
}
}

Expand Down
17 changes: 10 additions & 7 deletions internal/batch/container_status_req_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ func (h *ContainerStatusReqHandler) Process(jobInstanceId int64, requests []inte
err := h.batchProcessSvc.Submit(func() {
if h.enableShareContainerPool {
// FIXME fix import cycle
//// 如果开启共享线程池,statues 可能会有多个 jobInstanceId,需要先 split 成不同的 list
//taskStatusRequestMap := make(map[*Pair][]*schedulerx.ContainerReportTaskStatusRequest)
//for _, req := range reqs {
// // 如果开启共享线程池,statues 可能会有多个 jobInstanceId,需要先 split 成不同的 list
// taskStatusRequestMap := make(map[*Pair][]*schedulerx.ContainerReportTaskStatusRequest)
// for _, req := range reqs {
// pair := &Pair{
// jobInstanceId: req.GetJobInstanceId(),
// serialNum: req.GetSerialNum(),
Expand All @@ -75,10 +75,10 @@ func (h *ContainerStatusReqHandler) Process(jobInstanceId int64, requests []inte
// } else {
// taskStatusRequestMap[pair] = []*schedulerx.ContainerReportTaskStatusRequest{req}
// }
//}
// }
//
//// 针对不同的 jobInstanceId,构造 batchStatusRequests
//for pair, reqs := range taskStatusRequestMap {
// // 针对不同的 jobInstanceId,构造 batchStatusRequests
// for pair, reqs := range taskStatusRequestMap {
// var (
// instanceMasterActorPath string
// finishCount = 0
Expand Down Expand Up @@ -137,7 +137,7 @@ func (h *ContainerStatusReqHandler) Process(jobInstanceId int64, requests []inte
// } else {
// logger.Errorf("instanceMasterActorPath is null, jobInstanceId=%d", jobInstanceId)
// }
//}
// }
} else {
taskStatuses := make([]*schedulerx.TaskStatusInfo, 0, len(reqs))
// some attrs are duplicated in all reqs, for example: workAddr, workerId, jobId, jobInstanceId, taskMasterPath
Expand All @@ -157,6 +157,9 @@ func (h *ContainerStatusReqHandler) Process(jobInstanceId int64, requests []inte
if req.GetProgress() != "" {
taskStatusInfo.Progress = proto.String(req.GetProgress())
}
if req.GetTraceId() != "" {
taskStatusInfo.TraceId = proto.String(req.GetTraceId())
}
taskStatuses = append(taskStatuses, taskStatusInfo)
}
req := &schedulerx.ContainerBatchReportTaskStatuesRequest{
Expand Down
1 change: 1 addition & 0 deletions internal/batch/container_status_req_handler_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func (p *ContainerStatusReqHandlerPool) Stop(jobInstanceId int64) {
handler, ok := p.handlers.LoadAndDelete(jobInstanceId)
if ok {
handler.(*ContainerStatusReqHandler).Stop()
handler = nil
}
}

Expand Down
Loading

0 comments on commit 4e62b1d

Please sign in to comment.