Skip to content

Commit

Permalink
Switched from Parallel.ForEach to TPL Dataflow
Browse files Browse the repository at this point in the history
  • Loading branch information
andersosthus committed Jan 27, 2017
1 parent f45f2a9 commit 3977c52
Showing 1 changed file with 33 additions and 8 deletions.
41 changes: 33 additions & 8 deletions src/ServiceFabricUploader/Commands/ImageStore/UploadCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Microsoft.Extensions.CommandLineUtils;
using ServiceFabricUploader.Models;

Expand Down Expand Up @@ -31,27 +32,51 @@ public static void Configure(CommandLineApplication app)
});
});
}

public async Task<int> RunAsync(AppOptions appOptions, UploadCommandOptions commandConfig)
{
var connectionInfo = ConnectionInfoHelper.CreateConnectionInfo(appOptions);
var connection = connectionInfo.CreateClusterConnection();

var filesToUpload = DiscoverFilesToUploadRecursivly(commandConfig.PackageSourcePath,
commandConfig.PackageSourcePath.FullName);
commandConfig.PackageSourcePath.FullName)
.Select(x => new Tuple<FileInfo, string>(x.Key, x.Value));

var logger = new Logger(appOptions.Verbose);
var imageStore = new SfRestApi.Endpoints.ImageStore(connection, logger);
var failedUploads = new ConcurrentBag<FileInfo>();

Parallel.ForEach(filesToUpload, async (currentFile) =>
var uploadFileBlock = new TransformBlock<Tuple<FileInfo, string>, FileInfo>(async tuple =>
{
var success = await imageStore
.UploadAsync(tuple.Item1, tuple.Item2, commandConfig.PackageName)
.ConfigureAwait(false);
return success
? null
: tuple.Item1;
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4
});

var handleFailuresBlock = new ActionBlock<FileInfo>(file =>
{
var success =
await imageStore.UploadAsync(currentFile.Key, currentFile.Value, commandConfig.PackageName).ConfigureAwait(false);
if (!success)
failedUploads.Add(currentFile.Key);
if (file != null)
failedUploads.Add(file);
});


uploadFileBlock.LinkTo(handleFailuresBlock, new DataflowLinkOptions
{
PropagateCompletion = true
});

foreach (var file in filesToUpload)
uploadFileBlock.Post(file);

uploadFileBlock.Complete();
handleFailuresBlock.Completion.Wait();

if (!failedUploads.Any())
{
logger.Log($"Upload completed.");
Expand Down

0 comments on commit 3977c52

Please sign in to comment.