first commit
This commit is contained in:
28
F4SD-Cockpit-ServerCore/Services/IStagedRelationService.cs
Normal file
28
F4SD-Cockpit-ServerCore/Services/IStagedRelationService.cs
Normal file
@@ -0,0 +1,28 @@
|
||||
using C4IT.FASD.Base;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace C4IT_DataHistoryProvider_Base.Services
|
||||
{
|
||||
public interface IStagedRelationService
|
||||
{
|
||||
/// <summary>
|
||||
/// Triggers search for relations to given information objects.
|
||||
/// </summary>
|
||||
cF4sdStagedSearchResultRelationTaskId StartGatheringRelatedObjects(IEnumerable<cFasdApiSearchResultEntry> relatedTo, int ageInDays);
|
||||
|
||||
/// <summary>
|
||||
/// Used after triggering search for related objects in <see cref="StartGatheringRelatedObjects"/> to retrieve the already found relations.
|
||||
/// </summary>
|
||||
/// <param name="id">Id returned by <see cref="StartGatheringRelatedObjects"/> to reference the results to a search term.</param>
|
||||
Task<cF4sdStagedSearchResultRelations> GetRelatedObjectsAsync(Guid id, CancellationToken token);
|
||||
|
||||
/// <summary>
|
||||
/// Cancels the gathering of related objects.
|
||||
/// </summary>
|
||||
/// <param name="id">Id returned by <see cref="StartGatheringRelatedObjects"/> to reference the gathering process.</param>
|
||||
void StopGatheringRelatedObjects(Guid id);
|
||||
}
|
||||
}
|
||||
168
F4SD-Cockpit-ServerCore/Services/StagedRelationService.cs
Normal file
168
F4SD-Cockpit-ServerCore/Services/StagedRelationService.cs
Normal file
@@ -0,0 +1,168 @@
|
||||
using C4IT.FASD.Base;
|
||||
using C4IT.Logging;
|
||||
using C4IT_DataHistoryProvider_Base.DataSources;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using System.Timers;
|
||||
using static C4IT.Logging.cLogManager;
|
||||
|
||||
namespace C4IT_DataHistoryProvider_Base.Services
|
||||
{
|
||||
public class StagedRelationService : IStagedRelationService
|
||||
{
|
||||
private static readonly Dictionary<Guid, GatherRelatedObjectTask> _relationTasks = new Dictionary<Guid, GatherRelatedObjectTask>();
|
||||
private readonly IEnumerable<ISearchResultRelationProvider> _providers;
|
||||
|
||||
private static readonly System.Timers.Timer _relationTaskCleanupTimer = new System.Timers.Timer(CleanupInterval);
|
||||
private const int CleanupInterval = 5 * 60 * 1000;
|
||||
private readonly TimeSpan RelationTaskExpiration = TimeSpan.FromMinutes(15);
|
||||
|
||||
public StagedRelationService(IEnumerable<ISearchResultRelationProvider> providers)
|
||||
{
|
||||
_providers = providers;
|
||||
SetupCleanupTimer();
|
||||
}
|
||||
|
||||
private void SetupCleanupTimer()
|
||||
{
|
||||
if (_relationTaskCleanupTimer.Enabled)
|
||||
return;
|
||||
|
||||
_relationTaskCleanupTimer.Interval = CleanupInterval;
|
||||
_relationTaskCleanupTimer.Elapsed += CleanUpRelationTasks;
|
||||
_relationTaskCleanupTimer.Start();
|
||||
}
|
||||
|
||||
public cF4sdStagedSearchResultRelationTaskId StartGatheringRelatedObjects(IEnumerable<cFasdApiSearchResultEntry> relatedTo, int age)
|
||||
{
|
||||
cF4sdStagedSearchResultRelationTaskId taskId = new cF4sdStagedSearchResultRelationTaskId();
|
||||
try
|
||||
{
|
||||
if (relatedTo.Count() == 0)
|
||||
return taskId;
|
||||
|
||||
GatherRelatedObjectTask relationTask = new GatherRelatedObjectTask() { RelatedObjectTasks = new List<(HashSet<enumFasdInformationClass> PendingInformationClasses, Task<cF4sdStagedSearchResultRelations> Task)>() };
|
||||
enumFasdInformationClass relatedToInformationClass = cF4sdIdentityEntry.GetFromSearchResult(relatedTo.FirstOrDefault().Type);
|
||||
|
||||
List<cF4sdIdentityEntry> identities = relatedTo.Select(relation => new cF4sdIdentityEntry() { Id = relation.id, Class = relatedToInformationClass }).ToList();
|
||||
|
||||
foreach (var collectorModule in _providers)
|
||||
{
|
||||
taskId.PendingInformationClasses.UnionWith(collectorModule.GetSupportedInformationClasses());
|
||||
|
||||
Task<cF4sdStagedSearchResultRelations> task = Task.Run(() =>
|
||||
collectorModule.GetRelationsAsync(identities, relatedToInformationClass, age, relationTask.TokenSource.Token));
|
||||
|
||||
relationTask.RelatedObjectTasks.Add((collectorModule.GetSupportedInformationClasses(), task));
|
||||
}
|
||||
|
||||
taskId.Id = Guid.NewGuid();
|
||||
_relationTasks.Add(taskId.Id, relationTask);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
LogException(ex);
|
||||
}
|
||||
return taskId;
|
||||
}
|
||||
|
||||
public async Task<cF4sdStagedSearchResultRelations> GetRelatedObjectsAsync(Guid id, CancellationToken token)
|
||||
{
|
||||
HashSet<enumFasdInformationClass> pendingInformationClasses = null;
|
||||
try
|
||||
{
|
||||
if (!_relationTasks.TryGetValue(id, out var gatherRelatedTask))
|
||||
{
|
||||
LogEntry($"Could find any task find related objects with Id '{id}'", LogLevels.Info);
|
||||
return null;
|
||||
}
|
||||
|
||||
IEnumerable<Task<cF4sdStagedSearchResultRelations>> gatheringTasks = gatherRelatedTask.RelatedObjectTasks.Select(relatedObjectTask => relatedObjectTask.Task);
|
||||
await Task.WhenAny(gatheringTasks);
|
||||
|
||||
lock (_relationTasks)
|
||||
{
|
||||
gatherRelatedTask.LastRetrievel = DateTime.UtcNow;
|
||||
var completedTasks = gatherRelatedTask.RelatedObjectTasks.Where(t => t.Task.IsCompleted).ToArray();
|
||||
|
||||
foreach (var completedTask in completedTasks)
|
||||
{
|
||||
gatherRelatedTask.RelatedObjectTasks.Remove(completedTask);
|
||||
}
|
||||
|
||||
pendingInformationClasses = new HashSet<enumFasdInformationClass>(gatherRelatedTask.RelatedObjectTasks.SelectMany(t => t.PendingInformationClasses));
|
||||
var relations = completedTasks.SelectMany(t => t.Task.Result.Relations ?? Enumerable.Empty<cF4sdApiSearchResultRelation>());
|
||||
return new cF4sdStagedSearchResultRelations() { Relations = relations, PendingInformationClasses = pendingInformationClasses };
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
LogException(ex);
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (pendingInformationClasses?.Count == 0)
|
||||
_relationTasks.Remove(id);
|
||||
}
|
||||
|
||||
return new cF4sdStagedSearchResultRelations() { Relations = Enumerable.Empty<cF4sdApiSearchResultRelation>(), PendingInformationClasses = pendingInformationClasses };
|
||||
}
|
||||
|
||||
public void StopGatheringRelatedObjects(Guid id)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (!_relationTasks.TryGetValue(id, out var task))
|
||||
return;
|
||||
|
||||
task.TokenSource.Cancel();
|
||||
_relationTasks.Remove(id);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
LogException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
private void CleanUpRelationTasks(object sender, ElapsedEventArgs e)
|
||||
{
|
||||
try
|
||||
{
|
||||
DateTime currentTime = DateTime.UtcNow;
|
||||
|
||||
List<Guid> expiredTasks = new List<Guid>();
|
||||
lock (_relationTasks)
|
||||
{
|
||||
foreach (var relationTask in _relationTasks)
|
||||
{
|
||||
if (currentTime - relationTask.Value.LastRetrievel < RelationTaskExpiration)
|
||||
continue;
|
||||
expiredTasks.Add(relationTask.Key);
|
||||
}
|
||||
|
||||
expiredTasks.ForEach(task => _relationTasks.Remove(task));
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
LogException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
private class GatherRelatedObjectTask
|
||||
{
|
||||
public CancellationTokenSource TokenSource { get; set; }
|
||||
public DateTime LastRetrievel { get; set; }
|
||||
public ICollection<(HashSet<enumFasdInformationClass> PendingInformationClasses, Task<cF4sdStagedSearchResultRelations> Task)> RelatedObjectTasks { get; set; }
|
||||
|
||||
public GatherRelatedObjectTask()
|
||||
{
|
||||
TokenSource = new CancellationTokenSource();
|
||||
LastRetrievel = DateTime.UtcNow;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user