diff --git a/src/Spd.Manager.ScheduleJob/Contract.cs b/src/Spd.Manager.ScheduleJob/Contract.cs index e8d6be304f..7280757c07 100644 --- a/src/Spd.Manager.ScheduleJob/Contract.cs +++ b/src/Spd.Manager.ScheduleJob/Contract.cs @@ -1,13 +1,15 @@ -using MediatR; +using MediatR; namespace Spd.Manager.ScheduleJob { public interface IScheduleJobManager { public Task Handle(RunScheduleJobSessionCommand command, CancellationToken ct); + public Task Handle(RunMonthlyInvoiceJobCommand command, CancellationToken ct); } #region run schedule job session - public record RunScheduleJobSessionCommand(Guid JobSessionId, int ConcurrentRequests) : IRequest; + public record RunScheduleJobSessionCommand(Guid JobSessionId, int ConcurrentRequests = 3) : IRequest; + public record RunMonthlyInvoiceJobCommand(Guid JobSessionId, int ConcurrentRequests = 3) : IRequest; #endregion } diff --git a/src/Spd.Manager.ScheduleJob/ScheduleJobManager.cs b/src/Spd.Manager.ScheduleJob/ScheduleJobManager.cs index e1e72ac38e..7c76dcb1a6 100644 --- a/src/Spd.Manager.ScheduleJob/ScheduleJobManager.cs +++ b/src/Spd.Manager.ScheduleJob/ScheduleJobManager.cs @@ -13,6 +13,7 @@ namespace Spd.Manager.ScheduleJob; public class ScheduleJobManager : IRequestHandler, + IRequestHandler, IScheduleJobManager { private readonly IScheduleJobSessionRepository _scheduleJobSessionRepository; @@ -31,7 +32,7 @@ public ScheduleJobManager(IScheduleJobSessionRepository scheduleJobSessionReposi _logger = logger; } - public async Task Handle(RunScheduleJobSessionCommand cmd, CancellationToken ct) + public async Task Handle(RunMonthlyInvoiceJobCommand cmd, CancellationToken ct) { ScheduleJobSessionResp? resp = await _scheduleJobSessionRepository.GetAsync(cmd.JobSessionId, ct); if (resp == null) @@ -67,33 +68,35 @@ public async Task Handle(RunScheduleJobSessionCommand cmd, CancellationTok return default; } - //generalized,waiting for mano testing - //public async Task Handle(RunScheduleJobSessionCommand cmd, CancellationToken ct) - //{ - // ScheduleJobSessionResp? resp = await _scheduleJobSessionRepository.GetAsync(cmd.JobSessionId, ct); - // if (resp == null) - // { - // throw new ApiException(HttpStatusCode.BadRequest, "The schedule job session does not exist."); - // } + public async Task Handle(RunScheduleJobSessionCommand cmd, CancellationToken ct) + { + ScheduleJobSessionResp? resp = await _scheduleJobSessionRepository.GetAsync(cmd.JobSessionId, ct); + if (resp == null) + { + throw new ApiException(HttpStatusCode.BadRequest, "The schedule job session does not exist."); + } - // Stopwatch stopwatch = Stopwatch.StartNew(); - // //request will from bcgov_schedulejob - // RunJobRequest request = new RunJobRequest - // { - // PrimaryEntityActionStr = "spd_MonthlyInvoice", - // PrimaryEntityName = "accounts", - // PrimaryEntityFilterStr = "statecode eq 0 && spd_eligibleforcreditpayment eq 100000001", - // PrimaryEntityIdName = "accountid" - // }; - // var result = await _generalizeScheduleJobRepository.RunJobsAsync(request, ct); - // stopwatch.Stop(); + Stopwatch stopwatch = Stopwatch.StartNew(); + using var cts = new CancellationTokenSource(); // no timeout, this is the resolve for mysteriously cancelled requests + //request will from bcgov_schedulejob + //filterStr should be like "statecode eq 0 and spd_eligibleforcreditpayment eq 100000001" + RunJobRequest request = new RunJobRequest + { + PrimaryTypeName = resp.PrimaryEntity, + PrimaryEntityActionStr = resp.EndPoint, + PrimaryEntityName = resp.PrimaryEntity + "s", + PrimaryEntityFilterStr = resp.FilterStr.Trim(), + PrimaryEntityIdName = resp.PrimaryEntity + "id", + }; + var result = await _generalizeScheduleJobRepository.RunJobsAsync(request, cmd.ConcurrentRequests, cts.Token); + stopwatch.Stop(); - // //update result in JobSession - // UpdateScheduleJobSessionCmd updateResultCmd = CreateUpdateScheduleJobSessionCmd(cmd.JobSessionId, result, Decimal.Round((decimal)(stopwatch.ElapsedMilliseconds / 1000), 2)); - // await _scheduleJobSessionRepository.ManageAsync(updateResultCmd, ct); + //update result in JobSession + UpdateScheduleJobSessionCmd updateResultCmd = CreateUpdateScheduleJobSessionCmd(cmd.JobSessionId, result, Decimal.Round((decimal)(stopwatch.ElapsedMilliseconds / 1000), 2)); + await _scheduleJobSessionRepository.ManageAsync(updateResultCmd, cts.Token); - // return default; - //} + return default; + } private UpdateScheduleJobSessionCmd CreateUpdateScheduleJobSessionCmd(Guid jobSessionId, IEnumerable results, decimal durationInSec) { diff --git a/src/Spd.Presentation.Dynamics/appsettings.json b/src/Spd.Presentation.Dynamics/appsettings.json index ce8a7a78a2..8b245f5cd2 100644 --- a/src/Spd.Presentation.Dynamics/appsettings.json +++ b/src/Spd.Presentation.Dynamics/appsettings.json @@ -12,7 +12,7 @@ "ScreeningAppPaymentPath": "api/crrpa/payment-secure-link", "LicensingAppPaymentPath": "api/licensing/payment-secure-link", "ScreeningOrgInvitationPath": "crrp/invitation-link-bceid", - "ScheduleJobConcurrentRequests": 3, //when the openshift call dyanmics actions, how many concurent requests can be sent out to dynamics. + "ScheduleJobConcurrentRequests": 10, //when the openshift call dyanmics actions, how many concurent requests can be sent out to dynamics. "PayBC": { "DirectRefund": { "AuthenticationSettings": { diff --git a/src/Spd.Resource.Repository.JobSchedule/GeneralizeScheduleJob/Contract.cs b/src/Spd.Resource.Repository.JobSchedule/GeneralizeScheduleJob/Contract.cs index de0809e407..ebf236e6d0 100644 --- a/src/Spd.Resource.Repository.JobSchedule/GeneralizeScheduleJob/Contract.cs +++ b/src/Spd.Resource.Repository.JobSchedule/GeneralizeScheduleJob/Contract.cs @@ -2,13 +2,14 @@ namespace Spd.Resource.Repository.JobSchedule.GeneralizeScheduleJob { public interface IGeneralizeScheduleJobRepository { - public Task> RunJobsAsync(RunJobRequest request, CancellationToken cancellationToken); + public Task> RunJobsAsync(RunJobRequest request, int ConcurrentRequests, CancellationToken cancellationToken); } public record RunJobRequest { - public String PrimaryEntityName { get; set; } - public string PrimaryEntityIdName { get; set; } + public string PrimaryTypeName { get; set; } //exp: account + public String PrimaryEntityName { get; set; } //exp: accounts + public string PrimaryEntityIdName { get; set; } //exp: accountid public string? PrimaryEntityFilterStr { get; set; } public string PrimaryEntityActionStr { get; set; } } diff --git a/src/Spd.Resource.Repository.JobSchedule/GeneralizeScheduleJob/GeneralizeScheduleJobRepository.cs b/src/Spd.Resource.Repository.JobSchedule/GeneralizeScheduleJob/GeneralizeScheduleJobRepository.cs index 10d11f20d9..b8379a3dc0 100644 --- a/src/Spd.Resource.Repository.JobSchedule/GeneralizeScheduleJob/GeneralizeScheduleJobRepository.cs +++ b/src/Spd.Resource.Repository.JobSchedule/GeneralizeScheduleJob/GeneralizeScheduleJobRepository.cs @@ -1,35 +1,99 @@ using AutoMapper; +using Microsoft.Extensions.Logging; using Microsoft.OData.Client; using Spd.Utilities.Dynamics; +using System.Reflection; namespace Spd.Resource.Repository.JobSchedule.GeneralizeScheduleJob; internal class GeneralizeScheduleJobRepository : IGeneralizeScheduleJobRepository { private readonly DynamicsContext _context; private readonly IMapper _mapper; + private readonly ILogger _logger; + public GeneralizeScheduleJobRepository(IDynamicsContextFactory ctx, - IMapper mapper) + IMapper mapper, + ILogger logger) { _context = ctx.Create(); _mapper = mapper; + this._logger = logger; } - public async Task> RunJobsAsync(RunJobRequest request, CancellationToken ct) + //public async Task> RunJobsAsync(RunJobRequest request, CancellationToken ct) + //{ + // string primaryEntityName = request.PrimaryEntityName; + // string filterStr = request.PrimaryEntityFilterStr; + // string actionStr = request.PrimaryEntityActionStr; + // string primaryEntityIdName = request.PrimaryEntityIdName; + + // var property = _context.GetType().GetProperty(primaryEntityName); + // if (property == null) throw new Exception("Property not found."); + + // dynamic query = property.GetValue(_context) as DataServiceQuery; + // query.AddQueryOption("$filter", $"{filterStr}"); + // var result = await query.ExecuteAsync(ct); + // var data = ((IEnumerable)result).ToList(); + + // using var semaphore = new SemaphoreSlim(10); // Limit to 10 concurrent requests + + // var tasks = data.Select(async a => + // { + // await semaphore.WaitAsync(); + // try + // { + // //get primary id + // var idProperty = a.GetType().GetProperty(primaryEntityIdName); + // object obj = idProperty.GetValue(a); + // string idStr = obj.ToString(); + + // //invoke method + // var method = a.GetType().GetMethod(actionStr); + // var result = method?.Invoke(a, null); + // var getValueAsyncMethod = result.GetType().GetMethod( + // "GetValueAsync", + // new[] { typeof(CancellationToken) }); + // if (getValueAsyncMethod == null) throw new Exception("GetValueAsync method not found."); + // var task = (Task)getValueAsyncMethod.Invoke(result, new object[] { ct }); + // await task.ConfigureAwait(false); + + // // If it's Task, get the result via reflection + // var resultProperty = task.GetType().GetProperty("Result"); + // var value = resultProperty?.GetValue(task); + // ResultResp rr = _mapper.Map(value); + // rr.PrimaryEntityId = Guid.Parse(idStr); + // return rr; + // } + // catch (Exception ex) + // { + // var idProperty = a.GetType().GetProperty(primaryEntityIdName); + // object obj = idProperty.GetValue(a); + // return new ResultResp { IsSuccess = false, ResultStr = ex.Message, PrimaryEntityId = Guid.Parse(obj.ToString()) }; + // } + // finally + // { + // semaphore.Release(); + // } + // }); + + // var results = await Task.WhenAll(tasks); + // return results; + //} + + public async Task> RunJobsAsync(RunJobRequest request, int concurrentRequests, CancellationToken ct) { + string primaryTypeName = request.PrimaryTypeName; string primaryEntityName = request.PrimaryEntityName; - string filterStr = request.PrimaryEntityFilterStr; + string filterStr = request.PrimaryEntityFilterStr ?? string.Empty; string actionStr = request.PrimaryEntityActionStr; string primaryEntityIdName = request.PrimaryEntityIdName; - var property = _context.GetType().GetProperty(primaryEntityName); - if (property == null) throw new Exception("Property not found."); - - dynamic query = property.GetValue(_context) as DataServiceQuery; - query.AddQueryOption("$filter", $"{filterStr}"); - var result = await query.ExecuteAsync(ct); - var data = ((IEnumerable)result).ToList(); + // Resolve type dynamically + Type entityType = GetEntityTypeByName(primaryTypeName); + var data = await CallGetAllPrimaryEntityDynamicAsync(entityType, primaryEntityName, filterStr, ct); - using var semaphore = new SemaphoreSlim(10); // Limit to 10 concurrent requests + using var semaphore = new SemaphoreSlim(concurrentRequests); // Limit to 10 concurrent requests + _logger.LogDebug("{ConcurrentRequests} concurrent requests", concurrentRequests); var tasks = data.Select(async a => { @@ -44,10 +108,13 @@ public async Task> RunJobsAsync(RunJobRequest request, C //invoke method var method = a.GetType().GetMethod(actionStr); var result = method?.Invoke(a, null); + if (result == null) throw new Exception($"the {actionStr} invoked returns null"); + var getValueAsyncMethod = result.GetType().GetMethod( "GetValueAsync", new[] { typeof(CancellationToken) }); if (getValueAsyncMethod == null) throw new Exception("GetValueAsync method not found."); + var task = (Task)getValueAsyncMethod.Invoke(result, new object[] { ct }); await task.ConfigureAwait(false); @@ -56,10 +123,17 @@ public async Task> RunJobsAsync(RunJobRequest request, C var value = resultProperty?.GetValue(task); ResultResp rr = _mapper.Map(value); rr.PrimaryEntityId = Guid.Parse(idStr); + _logger.LogInformation("{actionStr} executed result : success = {Success} {Result} primaryEntityId={entityId}", actionStr, rr.IsSuccess, rr.ResultStr, idStr); return rr; } catch (Exception ex) { + Exception current = ex; + while (current != null) + { + _logger.LogError("Exception Type: {ExceptionName} \r\n Message: {Message} \r\n Stack Trace: {StackTrace}", current.GetType().Name, current.Message, current.StackTrace); + current = current.InnerException; + } var idProperty = a.GetType().GetProperty(primaryEntityIdName); object obj = idProperty.GetValue(a); return new ResultResp { IsSuccess = false, ResultStr = ex.Message, PrimaryEntityId = Guid.Parse(obj.ToString()) }; @@ -74,5 +148,88 @@ public async Task> RunJobsAsync(RunJobRequest request, C return results; } + private static Type GetEntityTypeByName(string typeName) + { + var type = Type.GetType(typeName); + if (type != null) + return type; + + type = AppDomain.CurrentDomain + .GetAssemblies() + .SelectMany(a => a.GetTypes()) + .FirstOrDefault(t => t.Name.Equals(typeName, StringComparison.OrdinalIgnoreCase)); + + if (type != null) + return type; + + throw new InvalidOperationException($"Type '{typeName}' not found."); + } + + private async Task> CallGetAllPrimaryEntityDynamicAsync( + Type entityType, + string primaryEntityName, + string filterStr, + CancellationToken ct) + { + var method = this.GetType().GetMethod( + "GetAllPrimaryEntityAsync", + BindingFlags.NonPublic | BindingFlags.Instance); + + if (method == null) + throw new InvalidOperationException("GetAllPrimaryEntityAsync method not found."); + + var genericMethod = method.MakeGenericMethod(entityType); + + var task = (Task)genericMethod.Invoke(this, new object[] { primaryEntityName, filterStr, ct }); + + await task.ConfigureAwait(false); + + var resultProperty = task.GetType().GetProperty("Result"); + var result = resultProperty.GetValue(task); + + // Convert IEnumerable → IEnumerable + var enumerable = result as System.Collections.IEnumerable; + if (enumerable == null) + throw new InvalidOperationException("The result is not enumerable."); + + var list = new List(); + foreach (var item in enumerable) + { + list.Add(item); + } + _logger.LogInformation("{Count} {Name} found", list.Count, primaryEntityName); + return list; + } + + private async Task> GetAllPrimaryEntityAsync(string primaryEntityName, string filterStr, CancellationToken ct) + { + //filterStr = "statecode eq 0 and spd_eligibleforcreditpayment eq 100000001"; + var property = _context.GetType().GetProperty(primaryEntityName); + if (property == null) throw new Exception("Property not found."); + + var query = property.GetValue(_context) as DataServiceQuery; + query = query.AddQueryOption("$filter", filterStr) + .IncludeCount(); + + var allEntities = new List(); + QueryOperationResponse response; + DataServiceQueryContinuation continuation = null; + + do + { + if (continuation == null) + { + response = (QueryOperationResponse)await query.ExecuteAsync(ct); + } + else + { + response = (QueryOperationResponse)await _context.ExecuteAsync(continuation, ct); + } + allEntities.AddRange(response); + continuation = response.GetContinuation(); + } while (continuation != null); + + return allEntities; + } } diff --git a/src/Spd.Resource.Repository.JobSchedule/ScheduleJobSession/Contract.cs b/src/Spd.Resource.Repository.JobSchedule/ScheduleJobSession/Contract.cs index bc3897b656..9a8f65b08a 100644 --- a/src/Spd.Resource.Repository.JobSchedule/ScheduleJobSession/Contract.cs +++ b/src/Spd.Resource.Repository.JobSchedule/ScheduleJobSession/Contract.cs @@ -29,7 +29,7 @@ public record ScheduleJobSessionResp() public Guid ScheduleJobId { get; set; } public string EndPoint { get; set; } public string PrimaryEntity { get; set; } - + public string FilterStr { get; set; } } public enum JobSessionStatusCode diff --git a/src/Spd.Resource.Repository.JobSchedule/ScheduleJobSession/Mappings.cs b/src/Spd.Resource.Repository.JobSchedule/ScheduleJobSession/Mappings.cs index ccec0e26a1..94d60cc134 100644 --- a/src/Spd.Resource.Repository.JobSchedule/ScheduleJobSession/Mappings.cs +++ b/src/Spd.Resource.Repository.JobSchedule/ScheduleJobSession/Mappings.cs @@ -13,6 +13,7 @@ public Mappings() .ForMember(d => d.ScheduleJobId, opt => opt.MapFrom(s => s._bcgov_schedulejobid_value)) .ForMember(d => d.PrimaryEntity, opt => opt.MapFrom(s => s.bcgov_ScheduleJobId.bcgov_primaryentity)) .ForMember(d => d.EndPoint, opt => opt.MapFrom(s => s.bcgov_ScheduleJobId.bcgov_endpoint)) + .ForMember(d => d.FilterStr, opt => opt.MapFrom(s => s.bcgov_ScheduleJobId.bcgov_fetchxml)) ; _ = CreateMap()