Skip to content

Commit

Permalink
feat: check job is registered when submit jobInstance (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
HuangXiaomeng committed Jan 23, 2024
1 parent 41f9aca commit d24babd
Showing 1 changed file with 48 additions and 22 deletions.
70 changes: 48 additions & 22 deletions internal/actor/job_instance_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package actor
import (
"context"
"fmt"
"github.com/alibaba/schedulerx-worker-go/processor"
"github.com/tidwall/gjson"
"time"

"github.com/asynkron/protoactor-go/actor"
Expand Down Expand Up @@ -129,32 +131,56 @@ func (a *jobInstanceActor) handleSubmitJobInstance(actorCtx actor.Context, msg *
Success: proto.Bool(true),
}
actorCtx.Send(actorcomm.SchedulerxServerPid(msg.Ctx), actorcomm.WrapSchedulerxMsg(msg.Ctx, resp, msg.SenderPath))

jobInstanceInfo := convert2JobInstanceInfo(req)
var taskMaster taskmaster.TaskMaster
switch common.ExecuteMode(jobInstanceInfo.GetExecuteMode()) {
case common.StandaloneExecuteMode:
taskMaster = master.NewStandaloneTaskMaster(jobInstanceInfo, actorCtx)
case common.BroadcastExecuteMode:
taskMaster = master.NewBroadcastTaskMaster(jobInstanceInfo, actorCtx)
case common.BatchExecuteMode:
taskMaster = master.NewBatchTaskMaster(jobInstanceInfo, actorCtx)
case common.ParallelExecuteMode:
taskMaster = master.NewParallelTaskMaster(jobInstanceInfo, actorCtx)
case common.GridExecuteMode:
taskMaster = master.NewGridTaskMaster(jobInstanceInfo, actorCtx)
case common.ShardingExecuteMode:
taskMaster = master.NewShardingTaskMaster(jobInstanceInfo, actorCtx)
default:
logger.Errorf("Submit jobInstanceId=%d failed, unknown executeMode=%s", jobInstanceInfo.GetExecuteMode())

// check job is registered
jobName := gjson.Get(jobInstanceInfo.GetContent(), "jobName").String()
// Compatible with the existing Java language configuration mechanism
if jobInstanceInfo.GetJobType() == "java" {
jobName = gjson.Get(jobInstanceInfo.GetContent(), "className").String()
}
task, ok := masterpool.GetTaskMasterPool().Tasks().Find(jobName)
if !ok || task == nil {
fmt.Errorf("handleSubmitJobInstance error, jobName=%s is unregistered. ", jobName)

// report job instance status with at-least-once-delivery
req := &schedulerx.WorkerReportJobInstanceStatusRequest{
JobId: proto.Int64(jobInstanceInfo.GetJobId()),
JobInstanceId: proto.Int64(jobInstanceInfo.GetJobInstanceId()),
Status: proto.Int32(int32(processor.InstanceStatusFailed)),
DeliveryId: proto.Int64(utils.GetDeliveryId()),
GroupId: proto.String(jobInstanceInfo.GetGroupId()),
Result: proto.String(fmt.Sprintf("jobName=%s is unregistered", jobName)),
}
actorcomm.AtLeastOnceDeliveryMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
Msg: req,
}
} else {
var taskMaster taskmaster.TaskMaster
switch common.ExecuteMode(jobInstanceInfo.GetExecuteMode()) {
case common.StandaloneExecuteMode:
taskMaster = master.NewStandaloneTaskMaster(jobInstanceInfo, actorCtx)
case common.BroadcastExecuteMode:
taskMaster = master.NewBroadcastTaskMaster(jobInstanceInfo, actorCtx)
case common.BatchExecuteMode:
taskMaster = master.NewBatchTaskMaster(jobInstanceInfo, actorCtx)
case common.ParallelExecuteMode:
taskMaster = master.NewParallelTaskMaster(jobInstanceInfo, actorCtx)
case common.GridExecuteMode:
taskMaster = master.NewGridTaskMaster(jobInstanceInfo, actorCtx)
case common.ShardingExecuteMode:
taskMaster = master.NewShardingTaskMaster(jobInstanceInfo, actorCtx)
default:
logger.Errorf("Submit jobInstanceId=%d failed, unknown executeMode=%s", jobInstanceInfo.GetExecuteMode())
}

if taskMaster != nil {
masterpool.GetTaskMasterPool().Put(jobInstanceInfo.GetJobInstanceId(), taskMaster)
if err := taskMaster.SubmitInstance(msg.Ctx, jobInstanceInfo); err != nil {
return err
if taskMaster != nil {
masterpool.GetTaskMasterPool().Put(jobInstanceInfo.GetJobInstanceId(), taskMaster)
if err := taskMaster.SubmitInstance(msg.Ctx, jobInstanceInfo); err != nil {
return err
}
logger.Infof("Submit jobInstanceId=%d succeed", req.GetJobInstanceId())
}
logger.Infof("Submit jobInstanceId=%d succeed", req.GetJobInstanceId())
}
}
return nil
Expand Down

0 comments on commit d24babd

Please sign in to comment.