Skip to content
6 changes: 4 additions & 2 deletions src/Spd.Manager.ScheduleJob/Contract.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
using MediatR;
using MediatR;

namespace Spd.Manager.ScheduleJob
{
public interface IScheduleJobManager
{
public Task<Unit> Handle(RunScheduleJobSessionCommand command, CancellationToken ct);
public Task<Unit> Handle(RunMonthlyInvoiceJobCommand command, CancellationToken ct);
}

#region run schedule job session
public record RunScheduleJobSessionCommand(Guid JobSessionId, int ConcurrentRequests) : IRequest<Unit>;
public record RunScheduleJobSessionCommand(Guid JobSessionId, int ConcurrentRequests = 3) : IRequest<Unit>;
public record RunMonthlyInvoiceJobCommand(Guid JobSessionId, int ConcurrentRequests = 3) : IRequest<Unit>;
#endregion
}
53 changes: 28 additions & 25 deletions src/Spd.Manager.ScheduleJob/ScheduleJobManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ namespace Spd.Manager.ScheduleJob;

public class ScheduleJobManager :
IRequestHandler<RunScheduleJobSessionCommand, Unit>,
IRequestHandler<RunMonthlyInvoiceJobCommand, Unit>,
IScheduleJobManager
{
private readonly IScheduleJobSessionRepository _scheduleJobSessionRepository;
Expand All @@ -31,7 +32,7 @@ public ScheduleJobManager(IScheduleJobSessionRepository scheduleJobSessionReposi
_logger = logger;
}

public async Task<Unit> Handle(RunScheduleJobSessionCommand cmd, CancellationToken ct)
public async Task<Unit> Handle(RunMonthlyInvoiceJobCommand cmd, CancellationToken ct)
{
ScheduleJobSessionResp? resp = await _scheduleJobSessionRepository.GetAsync(cmd.JobSessionId, ct);
if (resp == null)
Expand Down Expand Up @@ -67,33 +68,35 @@ public async Task<Unit> Handle(RunScheduleJobSessionCommand cmd, CancellationTok
return default;
}

//generalized,waiting for mano testing
//public async Task<Unit> Handle(RunScheduleJobSessionCommand cmd, CancellationToken ct)
//{
// ScheduleJobSessionResp? resp = await _scheduleJobSessionRepository.GetAsync(cmd.JobSessionId, ct);
// if (resp == null)
// {
// throw new ApiException(HttpStatusCode.BadRequest, "The schedule job session does not exist.");
// }
public async Task<Unit> Handle(RunScheduleJobSessionCommand cmd, CancellationToken ct)
{
ScheduleJobSessionResp? resp = await _scheduleJobSessionRepository.GetAsync(cmd.JobSessionId, ct);
if (resp == null)
{
throw new ApiException(HttpStatusCode.BadRequest, "The schedule job session does not exist.");
}

// Stopwatch stopwatch = Stopwatch.StartNew();
// //request will from bcgov_schedulejob
// RunJobRequest request = new RunJobRequest
// {
// PrimaryEntityActionStr = "spd_MonthlyInvoice",
// PrimaryEntityName = "accounts",
// PrimaryEntityFilterStr = "statecode eq 0 && spd_eligibleforcreditpayment eq 100000001",
// PrimaryEntityIdName = "accountid"
// };
// var result = await _generalizeScheduleJobRepository.RunJobsAsync(request, ct);
// stopwatch.Stop();
Stopwatch stopwatch = Stopwatch.StartNew();
using var cts = new CancellationTokenSource(); // no timeout, this is the resolve for mysteriously cancelled requests
//request will from bcgov_schedulejob
//filterStr should be like "statecode eq 0 and spd_eligibleforcreditpayment eq 100000001"
RunJobRequest request = new RunJobRequest
{
PrimaryTypeName = resp.PrimaryEntity,
PrimaryEntityActionStr = resp.EndPoint,
PrimaryEntityName = resp.PrimaryEntity + "s",
PrimaryEntityFilterStr = resp.FilterStr.Trim(),
PrimaryEntityIdName = resp.PrimaryEntity + "id",
};
var result = await _generalizeScheduleJobRepository.RunJobsAsync(request, cmd.ConcurrentRequests, cts.Token);
stopwatch.Stop();

// //update result in JobSession
// UpdateScheduleJobSessionCmd updateResultCmd = CreateUpdateScheduleJobSessionCmd(cmd.JobSessionId, result, Decimal.Round((decimal)(stopwatch.ElapsedMilliseconds / 1000), 2));
// await _scheduleJobSessionRepository.ManageAsync(updateResultCmd, ct);
//update result in JobSession
UpdateScheduleJobSessionCmd updateResultCmd = CreateUpdateScheduleJobSessionCmd(cmd.JobSessionId, result, Decimal.Round((decimal)(stopwatch.ElapsedMilliseconds / 1000), 2));
await _scheduleJobSessionRepository.ManageAsync(updateResultCmd, cts.Token);

// return default;
//}
return default;
}

private UpdateScheduleJobSessionCmd CreateUpdateScheduleJobSessionCmd(Guid jobSessionId, IEnumerable<ResultResp> results, decimal durationInSec)
{
Expand Down
2 changes: 1 addition & 1 deletion src/Spd.Presentation.Dynamics/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"ScreeningAppPaymentPath": "api/crrpa/payment-secure-link",
"LicensingAppPaymentPath": "api/licensing/payment-secure-link",
"ScreeningOrgInvitationPath": "crrp/invitation-link-bceid",
"ScheduleJobConcurrentRequests": 3, //when the openshift call dyanmics actions, how many concurent requests can be sent out to dynamics.
"ScheduleJobConcurrentRequests": 10, //when the openshift call dyanmics actions, how many concurent requests can be sent out to dynamics.
"PayBC": {
"DirectRefund": {
"AuthenticationSettings": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ namespace Spd.Resource.Repository.JobSchedule.GeneralizeScheduleJob
{
public interface IGeneralizeScheduleJobRepository
{
public Task<IEnumerable<ResultResp>> RunJobsAsync(RunJobRequest request, CancellationToken cancellationToken);
public Task<IEnumerable<ResultResp>> RunJobsAsync(RunJobRequest request, int ConcurrentRequests, CancellationToken cancellationToken);
}

public record RunJobRequest
{
public String PrimaryEntityName { get; set; }
public string PrimaryEntityIdName { get; set; }
public string PrimaryTypeName { get; set; } //exp: account
public String PrimaryEntityName { get; set; } //exp: accounts
public string PrimaryEntityIdName { get; set; } //exp: accountid
public string? PrimaryEntityFilterStr { get; set; }
public string PrimaryEntityActionStr { get; set; }
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,35 +1,99 @@
using AutoMapper;
using Microsoft.Extensions.Logging;
using Microsoft.OData.Client;
using Spd.Utilities.Dynamics;
using System.Reflection;

namespace Spd.Resource.Repository.JobSchedule.GeneralizeScheduleJob;
internal class GeneralizeScheduleJobRepository : IGeneralizeScheduleJobRepository
{
private readonly DynamicsContext _context;
private readonly IMapper _mapper;
private readonly ILogger<IGeneralizeScheduleJobRepository> _logger;

public GeneralizeScheduleJobRepository(IDynamicsContextFactory ctx,
IMapper mapper)
IMapper mapper,
ILogger<IGeneralizeScheduleJobRepository> logger)
{
_context = ctx.Create();
_mapper = mapper;
this._logger = logger;
}

public async Task<IEnumerable<ResultResp>> RunJobsAsync(RunJobRequest request, CancellationToken ct)
//public async Task<IEnumerable<ResultResp>> RunJobsAsync(RunJobRequest request, CancellationToken ct)
//{
// string primaryEntityName = request.PrimaryEntityName;
// string filterStr = request.PrimaryEntityFilterStr;
// string actionStr = request.PrimaryEntityActionStr;
// string primaryEntityIdName = request.PrimaryEntityIdName;

// var property = _context.GetType().GetProperty(primaryEntityName);
// if (property == null) throw new Exception("Property not found.");

// dynamic query = property.GetValue(_context) as DataServiceQuery;
// query.AddQueryOption("$filter", $"{filterStr}");
// var result = await query.ExecuteAsync(ct);
// var data = ((IEnumerable<dynamic>)result).ToList();

// using var semaphore = new SemaphoreSlim(10); // Limit to 10 concurrent requests

// var tasks = data.Select(async a =>
// {
// await semaphore.WaitAsync();
// try
// {
// //get primary id
// var idProperty = a.GetType().GetProperty(primaryEntityIdName);
// object obj = idProperty.GetValue(a);
// string idStr = obj.ToString();

// //invoke method
// var method = a.GetType().GetMethod(actionStr);
// var result = method?.Invoke(a, null);
// var getValueAsyncMethod = result.GetType().GetMethod(
// "GetValueAsync",
// new[] { typeof(CancellationToken) });
// if (getValueAsyncMethod == null) throw new Exception("GetValueAsync method not found.");
// var task = (Task)getValueAsyncMethod.Invoke(result, new object[] { ct });
// await task.ConfigureAwait(false);

// // If it's Task<T>, get the result via reflection
// var resultProperty = task.GetType().GetProperty("Result");
// var value = resultProperty?.GetValue(task);
// ResultResp rr = _mapper.Map<ResultResp>(value);
// rr.PrimaryEntityId = Guid.Parse(idStr);
// return rr;
// }
// catch (Exception ex)
// {
// var idProperty = a.GetType().GetProperty(primaryEntityIdName);
// object obj = idProperty.GetValue(a);
// return new ResultResp { IsSuccess = false, ResultStr = ex.Message, PrimaryEntityId = Guid.Parse(obj.ToString()) };
// }
// finally
// {
// semaphore.Release();
// }
// });

// var results = await Task.WhenAll(tasks);
// return results;
//}

public async Task<IEnumerable<ResultResp>> RunJobsAsync(RunJobRequest request, int concurrentRequests, CancellationToken ct)
{
string primaryTypeName = request.PrimaryTypeName;
string primaryEntityName = request.PrimaryEntityName;
string filterStr = request.PrimaryEntityFilterStr;
string filterStr = request.PrimaryEntityFilterStr ?? string.Empty;
string actionStr = request.PrimaryEntityActionStr;
string primaryEntityIdName = request.PrimaryEntityIdName;

var property = _context.GetType().GetProperty(primaryEntityName);
if (property == null) throw new Exception("Property not found.");

dynamic query = property.GetValue(_context) as DataServiceQuery;
query.AddQueryOption("$filter", $"{filterStr}");
var result = await query.ExecuteAsync(ct);
var data = ((IEnumerable<dynamic>)result).ToList();
// Resolve type dynamically
Type entityType = GetEntityTypeByName(primaryTypeName);
var data = await CallGetAllPrimaryEntityDynamicAsync(entityType, primaryEntityName, filterStr, ct);

using var semaphore = new SemaphoreSlim(10); // Limit to 10 concurrent requests
using var semaphore = new SemaphoreSlim(concurrentRequests); // Limit to 10 concurrent requests
_logger.LogDebug("{ConcurrentRequests} concurrent requests", concurrentRequests);

var tasks = data.Select(async a =>
{
Expand All @@ -44,10 +108,13 @@ public async Task<IEnumerable<ResultResp>> RunJobsAsync(RunJobRequest request, C
//invoke method
var method = a.GetType().GetMethod(actionStr);
var result = method?.Invoke(a, null);
if (result == null) throw new Exception($"the {actionStr} invoked returns null");

var getValueAsyncMethod = result.GetType().GetMethod(
"GetValueAsync",
new[] { typeof(CancellationToken) });
if (getValueAsyncMethod == null) throw new Exception("GetValueAsync method not found.");

var task = (Task)getValueAsyncMethod.Invoke(result, new object[] { ct });
await task.ConfigureAwait(false);

Expand All @@ -56,10 +123,17 @@ public async Task<IEnumerable<ResultResp>> RunJobsAsync(RunJobRequest request, C
var value = resultProperty?.GetValue(task);
ResultResp rr = _mapper.Map<ResultResp>(value);
rr.PrimaryEntityId = Guid.Parse(idStr);
_logger.LogInformation("{actionStr} executed result : success = {Success} {Result} primaryEntityId={entityId}", actionStr, rr.IsSuccess, rr.ResultStr, idStr);
return rr;
}
catch (Exception ex)
{
Exception current = ex;
while (current != null)
{
_logger.LogError("Exception Type: {ExceptionName} \r\n Message: {Message} \r\n Stack Trace: {StackTrace}", current.GetType().Name, current.Message, current.StackTrace);
current = current.InnerException;
}
var idProperty = a.GetType().GetProperty(primaryEntityIdName);
object obj = idProperty.GetValue(a);
return new ResultResp { IsSuccess = false, ResultStr = ex.Message, PrimaryEntityId = Guid.Parse(obj.ToString()) };
Expand All @@ -74,5 +148,88 @@ public async Task<IEnumerable<ResultResp>> RunJobsAsync(RunJobRequest request, C
return results;
}

private static Type GetEntityTypeByName(string typeName)
{
var type = Type.GetType(typeName);
if (type != null)
return type;

type = AppDomain.CurrentDomain
.GetAssemblies()
.SelectMany(a => a.GetTypes())
.FirstOrDefault(t => t.Name.Equals(typeName, StringComparison.OrdinalIgnoreCase));

if (type != null)
return type;

throw new InvalidOperationException($"Type '{typeName}' not found.");
}

private async Task<List<object>> CallGetAllPrimaryEntityDynamicAsync(
Type entityType,
string primaryEntityName,
string filterStr,
CancellationToken ct)
{
var method = this.GetType().GetMethod(
"GetAllPrimaryEntityAsync",
BindingFlags.NonPublic | BindingFlags.Instance);

if (method == null)
throw new InvalidOperationException("GetAllPrimaryEntityAsync method not found.");

var genericMethod = method.MakeGenericMethod(entityType);

var task = (Task)genericMethod.Invoke(this, new object[] { primaryEntityName, filterStr, ct });

await task.ConfigureAwait(false);

var resultProperty = task.GetType().GetProperty("Result");
var result = resultProperty.GetValue(task);

// Convert IEnumerable<T> → IEnumerable<object>
var enumerable = result as System.Collections.IEnumerable;
if (enumerable == null)
throw new InvalidOperationException("The result is not enumerable.");

var list = new List<object>();
foreach (var item in enumerable)
{
list.Add(item);
}
_logger.LogInformation("{Count} {Name} found", list.Count, primaryEntityName);
return list;
}

private async Task<IEnumerable<T>> GetAllPrimaryEntityAsync<T>(string primaryEntityName, string filterStr, CancellationToken ct)
{
//filterStr = "statecode eq 0 and spd_eligibleforcreditpayment eq 100000001";
var property = _context.GetType().GetProperty(primaryEntityName);
if (property == null) throw new Exception("Property not found.");

var query = property.GetValue(_context) as DataServiceQuery<T>;
query = query.AddQueryOption("$filter", filterStr)
.IncludeCount();

var allEntities = new List<T>();
QueryOperationResponse<T> response;
DataServiceQueryContinuation<T> continuation = null;

do
{
if (continuation == null)
{
response = (QueryOperationResponse<T>)await query.ExecuteAsync(ct);
}
else
{
response = (QueryOperationResponse<T>)await _context.ExecuteAsync(continuation, ct);
}
allEntities.AddRange(response);
continuation = response.GetContinuation();
} while (continuation != null);

return allEntities;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public record ScheduleJobSessionResp()
public Guid ScheduleJobId { get; set; }
public string EndPoint { get; set; }
public string PrimaryEntity { get; set; }

public string FilterStr { get; set; }
}

public enum JobSessionStatusCode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public Mappings()
.ForMember(d => d.ScheduleJobId, opt => opt.MapFrom(s => s._bcgov_schedulejobid_value))
.ForMember(d => d.PrimaryEntity, opt => opt.MapFrom(s => s.bcgov_ScheduleJobId.bcgov_primaryentity))
.ForMember(d => d.EndPoint, opt => opt.MapFrom(s => s.bcgov_ScheduleJobId.bcgov_endpoint))
.ForMember(d => d.FilterStr, opt => opt.MapFrom(s => s.bcgov_ScheduleJobId.bcgov_fetchxml))
;

_ = CreateMap<UpdateScheduleJobSessionCmd, bcgov_schedulejobsession>()
Expand Down