Skip to content

Commit

Permalink
feat: Use HashSet for job definitions (#89)
Browse files Browse the repository at this point in the history
  • Loading branch information
linkdotnet committed Jun 19, 2024
1 parent c7427b3 commit aaa35a6
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 27 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ All notable changes to **NCronJob** will be documented in this file. The project

## [Unreleased]

### Changed

- Identical Job Definitions will not lead to multiple instances of the job running concurrently. By [@linkdotnet](https://github.com/linkdotnet).

## [2.8.2] - 2024-06-18

### Changed
Expand Down
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="utf-8"?>
<Project xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ItemGroup>
<PackageReference Include="SonarAnalyzer.CSharp" Version="9.26.0.92422">
<PackageReference Include="SonarAnalyzer.CSharp" Version="9.27.0.93347">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
Expand Down
18 changes: 18 additions & 0 deletions docs/features/define-and-schedule-jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,24 @@ Services.AddNCronJob(options =>
});
```

!!! info

Defining multiple identifical schedules for the same job will not lead to multiple instances of the job running concurrently. NCronJob will ensure that only one instance of the job is running at any given time. One can define different custom names for each schedule to differentiate between them.

The following example illustrates how to define multiple schedules that are identical and will only lead to one instance of the job running at any given time:

```csharp
Services.AddNCronJob(options =>
{
options.AddJob<MyCronJob>(j =>
{
j.WithCronExpression("0 20 * * *")
.And
.WithCronExpression("0 20 * * *");
});
});
```

## Scheduling Jobs With Time Zones
The library offers you the ability to schedule jobs using time zones.

Expand Down
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ theme:
features:
- navigation.instant
- content.action.edit
- content.code.copy
markdown_extensions:
- smarty
- sane_lists
Expand Down
2 changes: 1 addition & 1 deletion src/NCronJob/Configuration/ConcurrencySettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ internal class ConcurrencySettings
/// currently uses the ThreadPool.UnsafeQueueUserWorkItem method to execute tasks, which may have security implications,
/// and is not currently recommended for production use.
/// </remarks>
public static bool UseDeterministicTaskScheduler => true;
public static bool UseDeterministicTaskScheduler => false;
}

46 changes: 34 additions & 12 deletions src/NCronJob/Registry/JobRegistry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,38 +4,39 @@ namespace NCronJob;

internal sealed class JobRegistry
{
private readonly List<JobDefinition> allJobs;
private readonly ImmutableArray<JobDefinition> oneTimeJobs;
private readonly List<JobDefinition> cronJobs;
private readonly HashSet<JobDefinition> allJobs;
private readonly ImmutableHashSet<JobDefinition> oneTimeJobs;
private readonly HashSet<JobDefinition> cronJobs;

public JobRegistry(IEnumerable<JobDefinition> jobs)
{
var jobDefinitions = jobs as JobDefinition[] ?? jobs.ToArray();
allJobs = [..jobDefinitions];
cronJobs = [..jobDefinitions.Where(c => c.CronExpression is not null)];
allJobs = new HashSet<JobDefinition>(jobDefinitions, JobDefinitionEqualityComparer.Instance);
cronJobs = new HashSet<JobDefinition>(jobDefinitions.Where(c => c.CronExpression is not null), JobDefinitionEqualityComparer.Instance);
oneTimeJobs = [..jobDefinitions.Where(c => c.IsStartupJob)];

AssertNoDuplicateJobNames();
}

public IReadOnlyCollection<JobDefinition> GetAllCronJobs() => cronJobs;

public IReadOnlyCollection<JobDefinition> GetAllOneTimeJobs() => oneTimeJobs;

public bool IsJobRegistered<T>() => allJobs.Exists(j => j.Type == typeof(T));
public bool IsJobRegistered<T>() => allJobs.Any(j => j.Type == typeof(T));

public JobDefinition GetJobDefinition<T>() => allJobs.First(j => j.Type == typeof(T));

public JobDefinition? FindJobDefinition(Type type)
=> allJobs.Find(j => j.Type == type);
=> allJobs.FirstOrDefault(j => j.Type == type);

public JobDefinition? FindJobDefinition(string jobName)
=> allJobs.Find(j => j.CustomName == jobName);
=> allJobs.FirstOrDefault(j => j.CustomName == jobName);

public void Add(JobDefinition jobDefinition)
{
AssertNoDuplicateJobNames(jobDefinition.CustomName);

var isTypeUpdate = allJobs.Exists(j => j.JobFullName == jobDefinition.JobFullName);
var isTypeUpdate = allJobs.Any(j => j.JobFullName == jobDefinition.JobFullName);
if (isTypeUpdate)
{
Remove(jobDefinition);
Expand All @@ -48,13 +49,13 @@ public void Add(JobDefinition jobDefinition)
}
}
public int GetJobTypeConcurrencyLimit(string jobTypeName)
=> allJobs.Find(j => j.JobFullName == jobTypeName)
=> allJobs.FirstOrDefault(j => j.JobFullName == jobTypeName)
?.ConcurrencyPolicy
?.MaxDegreeOfParallelism ?? 1;

public void RemoveByName(string jobName) => Remove(allJobs.Find(j => j.CustomName == jobName));
public void RemoveByName(string jobName) => Remove(allJobs.FirstOrDefault(j => j.CustomName == jobName));

public void RemoveByType(Type type) => Remove(allJobs.Find(j => j.Type == type));
public void RemoveByType(Type type) => Remove(allJobs.FirstOrDefault(j => j.Type == type));

private void Remove(JobDefinition? jobDefinition)
{
Expand Down Expand Up @@ -84,4 +85,25 @@ private void AssertNoDuplicateJobNames(string? additionalJobName = null)
throw new InvalidOperationException($"Duplicate job names found: {string.Join(", ", duplicateJobName)}");
}
}

private sealed class JobDefinitionEqualityComparer : IEqualityComparer<JobDefinition>
{
public static readonly JobDefinitionEqualityComparer Instance = new();

public bool Equals(JobDefinition? x, JobDefinition? y) =>
(x is null && y is null) || (x is not null && y is not null && x.Type == y.Type
&& x.Parameter == y.Parameter
&& x.CronExpression == y.CronExpression
&& x.TimeZone == y.TimeZone
&& x.CustomName == y.CustomName
&& x.IsStartupJob == y.IsStartupJob);

public int GetHashCode(JobDefinition obj) => HashCode.Combine(
obj.Type,
obj.Parameter,
obj.CronExpression,
obj.TimeZone,
obj.CustomName,
obj.IsStartupJob);
}
}
5 changes: 4 additions & 1 deletion src/NCronJob/Scheduler/TaskScheduler/TaskFactoryProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ namespace NCronJob;

internal static class TaskFactoryProvider
{
private static TaskScheduler TaskScheduler => ConcurrencySettings.UseDeterministicTaskScheduler ? new DeterministicTaskScheduler() : TaskScheduler.Default;
private static TaskScheduler TaskScheduler => ConcurrencySettings.UseDeterministicTaskScheduler
? new DeterministicTaskScheduler()
: TaskScheduler.Default;

private static readonly TaskFactory TaskFactory = new(
CancellationToken.None,
TaskCreationOptions.LongRunning | TaskCreationOptions.PreferFairness,
Expand Down
53 changes: 41 additions & 12 deletions tests/NCronJob.Tests/NCronJobIntegrationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ public async Task EachJobRunHasItsOwnScope()
ServiceCollection.AddSingleton(storage);
ServiceCollection.AddScoped<GuidGenerator>();
ServiceCollection.AddNCronJob(n => n.AddJob<ScopedServiceJob>(
p => p.WithCronExpression("* * * * *").And.WithCronExpression("* * * * *")));
p => p.WithCronExpression("* * * * *").WithParameter("null")
.And
.WithCronExpression("* * * * *")));
var provider = CreateServiceProvider();

await provider.GetRequiredService<IHostedService>().StartAsync(CancellationToken);
Expand Down Expand Up @@ -214,10 +216,6 @@ public async Task WhileAwaitingJobTriggeringInstantJobShouldAnywayTriggerCronJob
[Fact]
public async Task MinimalJobApiCanBeUsedForTriggeringCronJobs()
{
ServiceCollection.AddNCronJob(async (ChannelWriter<object?> writer) =>
{
await writer.WriteAsync(null);
}, "* * * * *");
ServiceCollection.AddNCronJob(async (ChannelWriter<object?> writer) =>
{
await writer.WriteAsync(null);
Expand All @@ -227,18 +225,18 @@ public async Task MinimalJobApiCanBeUsedForTriggeringCronJobs()
await provider.GetRequiredService<IHostedService>().StartAsync(CancellationToken);

FakeTimer.Advance(TimeSpan.FromMinutes(1));
var jobFinished = await WaitForJobsOrTimeout(2);
var jobFinished = await WaitForJobsOrTimeout(1);
jobFinished.ShouldBeTrue();
}

[Fact]
public async Task ConcurrentJobConfigurationShouldBeRespected()
{
ServiceCollection.AddNCronJob(n => n.AddJob<ShortRunningJob>(p => p
.WithCronExpression("* * * * *")
.And.WithCronExpression("* * * * *")
.And.WithCronExpression("* * * * *")
.And.WithCronExpression("* * * * *")));
.WithCronExpression("* * * * *").WithName("Job 1")
.And.WithCronExpression("* * * * *").WithName("Job 2")
.And.WithCronExpression("* * * * *").WithName("Job 3")
.And.WithCronExpression("* * * * *").WithName("Job 4")));
var provider = CreateServiceProvider();

await provider.GetRequiredService<IHostedService>().StartAsync(CancellationToken);
Expand Down Expand Up @@ -297,7 +295,6 @@ public async Task ExecuteAnInstantJobDelegate()
[Fact]
public async Task AnonymousJobsCanBeExecutedMultipleTimes()
{

ServiceCollection.AddNCronJob(n => n.AddJob(async (ChannelWriter<object> writer, CancellationToken ct) =>
{
await Task.Delay(10, ct);
Expand Down Expand Up @@ -334,7 +331,7 @@ public void AddingJobsWithTheSameCustomNameLeadsToException()
{
ServiceCollection.AddNCronJob(
n => n.AddJob(() => { }, "* * * * *", jobName: "Job1")
.AddJob(() => { }, "* * * * *", jobName: "Job1"));
.AddJob(() => { }, "0 * * * *", jobName: "Job1"));
var provider = CreateServiceProvider();

Action act = () => provider.GetRequiredService<JobRegistry>();
Expand All @@ -354,6 +351,38 @@ public void AddJobsDynamicallyWhenNameIsDuplicatedLeadsToException()
act.ShouldThrow<InvalidOperationException>();
}

[Fact]
public async Task TwoJobsWithSameDefinitionLeadToOneExecution()
{
ServiceCollection.AddNCronJob(n => n.AddJob<SimpleJob>(p => p.WithCronExpression("* * * * *").And.WithCronExpression("* * * * *")));
var provider = CreateServiceProvider();
await provider.GetRequiredService<IHostedService>().StartAsync(CancellationToken);

var countJobs = provider.GetRequiredService<JobRegistry>().GetAllCronJobs().Count;
countJobs.ShouldBe(1);

FakeTimer.Advance(TimeSpan.FromMinutes(1));
var jobFinished = await WaitForJobsOrTimeout(2, TimeSpan.FromMilliseconds(250));
jobFinished.ShouldBeFalse();
}

[Fact]
public async Task TwoJobsWithDifferentDefinitionLeadToTwoExecutions()
{
ServiceCollection.AddNCronJob(n => n
.AddJob<SimpleJob>(p => p.WithCronExpression("* * * * *").WithParameter("1"))
.AddJob<SimpleJob>(p => p.WithCronExpression("* * * * *").WithParameter("2")));
var provider = CreateServiceProvider();
await provider.GetRequiredService<IHostedService>().StartAsync(CancellationToken);

var countJobs = provider.GetRequiredService<JobRegistry>().GetAllCronJobs().Count;
countJobs.ShouldBe(2);

FakeTimer.Advance(TimeSpan.FromMinutes(1));
var jobFinished = await WaitForJobsOrTimeout(2, TimeSpan.FromMilliseconds(250));
jobFinished.ShouldBeTrue();
}

private static class JobMethods
{
public static async Task WriteTrueStaticAsync(ChannelWriter<object> writer, CancellationToken ct)
Expand Down

0 comments on commit aaa35a6

Please sign in to comment.