diff --git a/internal/actor/job_instance_actor.go b/internal/actor/job_instance_actor.go index ffd431b..20ee8dd 100644 --- a/internal/actor/job_instance_actor.go +++ b/internal/actor/job_instance_actor.go @@ -19,6 +19,8 @@ package actor import ( "context" "fmt" + "github.com/alibaba/schedulerx-worker-go/processor" + "github.com/tidwall/gjson" "time" "github.com/asynkron/protoactor-go/actor" @@ -129,32 +131,56 @@ func (a *jobInstanceActor) handleSubmitJobInstance(actorCtx actor.Context, msg * Success: proto.Bool(true), } actorCtx.Send(actorcomm.SchedulerxServerPid(msg.Ctx), actorcomm.WrapSchedulerxMsg(msg.Ctx, resp, msg.SenderPath)) - jobInstanceInfo := convert2JobInstanceInfo(req) - var taskMaster taskmaster.TaskMaster - switch common.ExecuteMode(jobInstanceInfo.GetExecuteMode()) { - case common.StandaloneExecuteMode: - taskMaster = master.NewStandaloneTaskMaster(jobInstanceInfo, actorCtx) - case common.BroadcastExecuteMode: - taskMaster = master.NewBroadcastTaskMaster(jobInstanceInfo, actorCtx) - case common.BatchExecuteMode: - taskMaster = master.NewBatchTaskMaster(jobInstanceInfo, actorCtx) - case common.ParallelExecuteMode: - taskMaster = master.NewParallelTaskMaster(jobInstanceInfo, actorCtx) - case common.GridExecuteMode: - taskMaster = master.NewGridTaskMaster(jobInstanceInfo, actorCtx) - case common.ShardingExecuteMode: - taskMaster = master.NewShardingTaskMaster(jobInstanceInfo, actorCtx) - default: - logger.Errorf("Submit jobInstanceId=%d failed, unknown executeMode=%s", jobInstanceInfo.GetExecuteMode()) + + // check job is registered + jobName := gjson.Get(jobInstanceInfo.GetContent(), "jobName").String() + // Compatible with the existing Java language configuration mechanism + if jobInstanceInfo.GetJobType() == "java" { + jobName = gjson.Get(jobInstanceInfo.GetContent(), "className").String() } + task, ok := masterpool.GetTaskMasterPool().Tasks().Find(jobName) + if !ok || task == nil { + fmt.Errorf("handleSubmitJobInstance error, jobName=%s is unregistered. ", jobName) + + // report job instance status with at-least-once-delivery + req := &schedulerx.WorkerReportJobInstanceStatusRequest{ + JobId: proto.Int64(jobInstanceInfo.GetJobId()), + JobInstanceId: proto.Int64(jobInstanceInfo.GetJobInstanceId()), + Status: proto.Int32(int32(processor.InstanceStatusFailed)), + DeliveryId: proto.Int64(utils.GetDeliveryId()), + GroupId: proto.String(jobInstanceInfo.GetGroupId()), + Result: proto.String(fmt.Sprintf("jobName=%s is unregistered", jobName)), + } + actorcomm.AtLeastOnceDeliveryMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{ + Msg: req, + } + } else { + var taskMaster taskmaster.TaskMaster + switch common.ExecuteMode(jobInstanceInfo.GetExecuteMode()) { + case common.StandaloneExecuteMode: + taskMaster = master.NewStandaloneTaskMaster(jobInstanceInfo, actorCtx) + case common.BroadcastExecuteMode: + taskMaster = master.NewBroadcastTaskMaster(jobInstanceInfo, actorCtx) + case common.BatchExecuteMode: + taskMaster = master.NewBatchTaskMaster(jobInstanceInfo, actorCtx) + case common.ParallelExecuteMode: + taskMaster = master.NewParallelTaskMaster(jobInstanceInfo, actorCtx) + case common.GridExecuteMode: + taskMaster = master.NewGridTaskMaster(jobInstanceInfo, actorCtx) + case common.ShardingExecuteMode: + taskMaster = master.NewShardingTaskMaster(jobInstanceInfo, actorCtx) + default: + logger.Errorf("Submit jobInstanceId=%d failed, unknown executeMode=%s", jobInstanceInfo.GetExecuteMode()) + } - if taskMaster != nil { - masterpool.GetTaskMasterPool().Put(jobInstanceInfo.GetJobInstanceId(), taskMaster) - if err := taskMaster.SubmitInstance(msg.Ctx, jobInstanceInfo); err != nil { - return err + if taskMaster != nil { + masterpool.GetTaskMasterPool().Put(jobInstanceInfo.GetJobInstanceId(), taskMaster) + if err := taskMaster.SubmitInstance(msg.Ctx, jobInstanceInfo); err != nil { + return err + } + logger.Infof("Submit jobInstanceId=%d succeed", req.GetJobInstanceId()) } - logger.Infof("Submit jobInstanceId=%d succeed", req.GetJobInstanceId()) } } return nil