Commit 908adc2

rewire integration jobs to use sftp

1 parent 3a2ad90 · commit 908adc2

31 files changed · +685 −436 lines changed

TeachingRecordSystem/Directory.Packages.props

Lines changed: 2 additions & 1 deletion

@@ -115,5 +115,6 @@
     <PackageVersion Include="xunit.v3" Version="3.1.0" />
     <PackageVersion Include="xunit.v3.assert" Version="3.1.0" />
     <PackageVersion Include="xunit.v3.extensibility.core" Version="3.1.0" />
+    <PackageVersion Include="Azure.Storage.Files.DataLake" Version="12.16.0" />
   </ItemGroup>
-</Project>
+</Project>
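
Because this repository pins versions through NuGet central package management, consuming projects reference the package with no local Version attribute. A sketch of what such a project reference looks like (illustrative fragment only; the actual .csproj edit is in one of the changed files not shown on this page):

    <!-- Illustrative fragment: the Version is omitted here and resolved
         centrally from Directory.Packages.props -->
    <ItemGroup>
      <PackageReference Include="Azure.Storage.Files.DataLake" />
    </ItemGroup>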

TeachingRecordSystem/src/TeachingRecordSystem.Api/packages.lock.json

Lines changed: 12 additions & 0 deletions

@@ -2225,6 +2225,7 @@
       "dependencies": {
         "AngleSharp": "[1.1.2, )",
         "Azure.Storage.Blobs": "[12.25.0, )",
+        "Azure.Storage.Files.DataLake": "[12.16.0, )",
         "CloudNative.CloudEvents": "[2.8.0, )",
         "CloudNative.CloudEvents.SystemTextJson": "[2.8.0, )",
         "CsvHelper": "[30.1.0, )",
@@ -2340,6 +2341,17 @@
         "Azure.Storage.Common": "12.24.0"
       }
     },
+    "Azure.Storage.Files.DataLake": {
+      "type": "CentralTransitive",
+      "requested": "[12.16.0, )",
+      "resolved": "12.16.0",
+      "contentHash": "D+mkKzL5diWWqGX4pDNeHOIYfv91H15BJSfSLeBCjV3mHFAhykxYDWXOTVJ0ThobJ3yI2R4SahjbVmdgd0vlyg==",
+      "dependencies": {
+        "Azure.Storage.Blobs": "12.18.0",
+        "Azure.Storage.Common": "12.17.0",
+        "System.Text.Json": "4.7.2"
+      }
+    },
     "Castle.Core": {
       "type": "CentralTransitive",
       "requested": "[5.1.1, )",

TeachingRecordSystem/src/TeachingRecordSystem.AuthorizeAccess/packages.lock.json

Lines changed: 12 additions & 0 deletions

@@ -2510,6 +2510,7 @@
       "dependencies": {
         "AngleSharp": "[1.1.2, )",
         "Azure.Storage.Blobs": "[12.25.0, )",
+        "Azure.Storage.Files.DataLake": "[12.16.0, )",
         "CloudNative.CloudEvents": "[2.8.0, )",
         "CloudNative.CloudEvents.SystemTextJson": "[2.8.0, )",
         "CsvHelper": "[30.1.0, )",
@@ -2625,6 +2626,17 @@
         "Azure.Storage.Common": "12.24.0"
       }
     },
+    "Azure.Storage.Files.DataLake": {
+      "type": "CentralTransitive",
+      "requested": "[12.16.0, )",
+      "resolved": "12.16.0",
+      "contentHash": "D+mkKzL5diWWqGX4pDNeHOIYfv91H15BJSfSLeBCjV3mHFAhykxYDWXOTVJ0ThobJ3yI2R4SahjbVmdgd0vlyg==",
+      "dependencies": {
+        "Azure.Storage.Blobs": "12.18.0",
+        "Azure.Storage.Common": "12.17.0",
+        "System.Text.Json": "4.7.2"
+      }
+    },
     "CloudNative.CloudEvents": {
       "type": "CentralTransitive",
       "requested": "[2.8.0, )",

TeachingRecordSystem/src/TeachingRecordSystem.Cli/packages.lock.json

Lines changed: 12 additions & 0 deletions

@@ -1995,6 +1995,7 @@
       "dependencies": {
         "AngleSharp": "[1.1.2, )",
         "Azure.Storage.Blobs": "[12.25.0, )",
+        "Azure.Storage.Files.DataLake": "[12.16.0, )",
         "CloudNative.CloudEvents": "[2.8.0, )",
         "CloudNative.CloudEvents.SystemTextJson": "[2.8.0, )",
         "CsvHelper": "[30.1.0, )",
@@ -2063,6 +2064,17 @@
         "Azure.Storage.Common": "12.24.0"
       }
     },
+    "Azure.Storage.Files.DataLake": {
+      "type": "CentralTransitive",
+      "requested": "[12.16.0, )",
+      "resolved": "12.16.0",
+      "contentHash": "D+mkKzL5diWWqGX4pDNeHOIYfv91H15BJSfSLeBCjV3mHFAhykxYDWXOTVJ0ThobJ3yI2R4SahjbVmdgd0vlyg==",
+      "dependencies": {
+        "Azure.Storage.Blobs": "12.18.0",
+        "Azure.Storage.Common": "12.17.0",
+        "System.Text.Json": "4.7.2"
+      }
+    },
     "CloudNative.CloudEvents": {
       "type": "CentralTransitive",
       "requested": "[2.8.0, )",

TeachingRecordSystem/src/TeachingRecordSystem.Core/Extensions.cs

Lines changed: 17 additions & 0 deletions

@@ -1,3 +1,5 @@
+using Azure.Storage;
+using Azure.Storage.Files.DataLake;
 using Hangfire;
 using Hangfire.PostgreSql;
 using Microsoft.Extensions.Azure;
@@ -50,6 +52,21 @@ public static IServiceCollection AddBlobStorage(this IServiceCollection services
         {
             clientBuilder.AddBlobServiceClient(configuration.GetRequiredValue("StorageConnectionString"));
         });
+
+        services.AddKeyedSingleton<DataLakeServiceClient>("sftpstorage", (sp, key) =>
+        {
+            var sftpAccountName = configuration.GetValue<string>("SftpStorageName");
+            var sftpAccessKey = configuration.GetValue<string>("SftpStorageAccessKey");
+
+            if (string.IsNullOrEmpty(sftpAccountName) || string.IsNullOrEmpty(sftpAccessKey))
+                throw new InvalidOperationException("Invalid SFTP storage configuration: SftpStorageName and SftpStorageAccessKey are required.");
+
+            var dfsUri = new Uri($"https://{sftpAccountName}.dfs.core.windows.net");
+            var credential = new StorageSharedKeyCredential(sftpAccountName, sftpAccessKey);
+
+            return new DataLakeServiceClient(dfsUri, credential);
+        });
     }

     return services;
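
Because the client is registered under the "sftpstorage" key, consumers must request it by key rather than by type alone. A minimal sketch of a consumer resolving it through a primary constructor (SftpFileStore is a hypothetical class; the jobs changed below use exactly this pattern):

    using Azure.Storage.Files.DataLake;
    using Microsoft.Extensions.DependencyInjection;

    // Hypothetical consumer: [FromKeyedServices] asks the container for the
    // DataLakeServiceClient registered above under the "sftpstorage" key.
    public class SftpFileStore([FromKeyedServices("sftpstorage")] DataLakeServiceClient client)
    {
        public DataLakeFileSystemClient GetFileSystem(string containerName) =>
            client.GetFileSystemClient(containerName);
    }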

TeachingRecordSystem/src/TeachingRecordSystem.Core/Jobs/CapitaExportAmendJob.cs

Lines changed: 17 additions & 17 deletions

@@ -1,21 +1,22 @@
 using System.ComponentModel.DataAnnotations;
 using System.Globalization;
 using System.Text;
-using Azure.Storage.Blobs;
+using Azure.Storage.Files.DataLake;
 using CsvHelper;
 using CsvHelper.Configuration;
+using Microsoft.Extensions.DependencyInjection;
 using Microsoft.Extensions.Logging;
 using Microsoft.Extensions.Options;
 using TeachingRecordSystem.Core.DataStore.Postgres;
 using TeachingRecordSystem.Core.DataStore.Postgres.Models;
 using TeachingRecordSystem.Core.Events.Legacy;

-public class CapitaExportAmendJob(BlobServiceClient blobServiceClient, ILogger<CapitaExportAmendJob> logger, TrsDbContext dbContext, IClock clock, IOptions<CapitaTpsUserOption> capitaUser)
+public class CapitaExportAmendJob([FromKeyedServices("sftpstorage")] DataLakeServiceClient dataLakeServiceClient, ILogger<CapitaExportAmendJob> logger, TrsDbContext dbContext, IClock clock, IOptions<CapitaTpsUserOption> capitaUser)
 {
     public const string JobSchedule = "0 3 * * *";
     public const string LastRunDateKey = "LastRunDate";
-    public const string StorageContainer = "dqt-integrations";
-    public const string EXPORTS_FOLDER = "capita/exports";
+    public const string StorageContainer = "capita-integrations";
+    public const string ExportsFolder = "exports";

     public async Task<long> ExecuteAsync(CancellationToken cancellationToken)
     {
@@ -141,19 +142,19 @@ public async Task<long> ExecuteAsync(CancellationToken cancellationToken)
         return integrationJob.IntegrationTransactionId;
     }

-    public async Task UploadFileAsync(Stream fileContentStream, string fileName)
+    public async Task UploadFileAsync(Stream fileContentStream, string fileName, CancellationToken cancellationToken = default)
     {
-        // Get the container client
-        var containerClient = blobServiceClient!.GetBlobContainerClient(StorageContainer);
-        await containerClient.CreateIfNotExistsAsync();
-
-        var targetFileName = $"{EXPORTS_FOLDER}/{fileName}";
-
-        // Get the blob client for the target file
-        var blobClient = containerClient.GetBlobClient(targetFileName);
-
-        // Upload the stream
-        await blobClient.UploadAsync(fileContentStream, overwrite: true);
+        var fileSystemClient = dataLakeServiceClient.GetFileSystemClient(StorageContainer);
+        await fileSystemClient.CreateIfNotExistsAsync(cancellationToken: cancellationToken);
+        var targetPath = $"{ExportsFolder}/{fileName}";
+        var fileClient = fileSystemClient.GetFileClient(targetPath);
+        await fileClient.CreateIfNotExistsAsync(cancellationToken: cancellationToken);
+
+        await using var memory = new MemoryStream();
+        await fileContentStream.CopyToAsync(memory, cancellationToken);
+        memory.Position = 0;
+        await fileClient.AppendAsync(memory, offset: 0, cancellationToken: cancellationToken);
+        await fileClient.FlushAsync(memory.Length, cancellationToken: cancellationToken);
     }

     public string GetFileName(IClock now)
@@ -330,4 +331,3 @@ public class CapitaTpsUserOption
     [Required]
     public required Guid CapitaTpsUserId { get; set; }
 }
-
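
Data Lake Gen2 writes are two-phase: AppendAsync stages bytes at an offset, and FlushAsync commits everything up to the given position. That is why the new method buffers the content into a seekable MemoryStream first — the flush has to be told the total length. A usage sketch (the job, clock, and cancellationToken variables plus the CSV bytes are illustrative, not real Capita data):

    // Illustrative call: writes capita-integrations/exports/<name> via append + flush.
    var csv = "TRN,DOB\n1234567,01011990\n"u8.ToArray();
    await using var content = new MemoryStream(csv);
    await job.UploadFileAsync(content, job.GetFileName(clock), cancellationToken);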

TeachingRecordSystem/src/TeachingRecordSystem.Core/Jobs/CapitaExportNewJob.cs

Lines changed: 30 additions & 12 deletions

@@ -1,19 +1,20 @@
 using System.Globalization;
 using System.Text;
-using Azure.Storage.Blobs;
+using Azure.Storage.Files.DataLake;
 using CsvHelper;
 using CsvHelper.Configuration;
+using Microsoft.Extensions.DependencyInjection;
 using Microsoft.Extensions.Logging;
 using TeachingRecordSystem.Core.DataStore.Postgres;
 using TeachingRecordSystem.Core.DataStore.Postgres.Models;
 using TeachingRecordSystem.Core.Events.Legacy;

-public class CapitaExportNewJob(BlobServiceClient blobServiceClient, ILogger<CapitaExportNewJob> logger, TrsDbContext dbContext, IClock clock)
+public class CapitaExportNewJob([FromKeyedServices("sftpstorage")] DataLakeServiceClient dataLakeServiceClient, ILogger<CapitaExportNewJob> logger, TrsDbContext dbContext, IClock clock)
 {
     public const string JobSchedule = "0 3 * * *";
     public const string LastRunDateKey = "LastRunDate";
-    public const string StorageContainer = "dqt-integrations";
-    public const string EXPORTS_FOLDER = "capita/exports";
+    public const string StorageContainer = "capita-integrations";
+    public const string ExportsFolder = "exports";

     public async Task<long> ExecuteAsync(CancellationToken cancellationToken)
     {
@@ -164,19 +165,36 @@ public async Task<long> ExecuteAsync(CancellationToken cancellationToken)
         return integrationJob.IntegrationTransactionId;
     }

-    public async Task UploadFileAsync(Stream fileContentStream, string fileName)
+    public async Task UploadFileAsync(Stream fileContentStream, string fileName, CancellationToken cancellationToken = default)
     {
         // Get the container client
-        var containerClient = blobServiceClient!.GetBlobContainerClient(StorageContainer);
-        await containerClient.CreateIfNotExistsAsync();
+        var fileSystemClient = dataLakeServiceClient.GetFileSystemClient(StorageContainer);
+        await fileSystemClient.CreateIfNotExistsAsync(cancellationToken: cancellationToken);

-        var targetFileName = $"{EXPORTS_FOLDER}/{fileName}";
+        var targetPath = $"{ExportsFolder}/{fileName}";
+        var fileClient = fileSystemClient.GetFileClient(targetPath);

-        // Get the blob client for the target file
-        var blobClient = containerClient.GetBlobClient(targetFileName);
+        await fileClient.DeleteIfExistsAsync(cancellationToken: cancellationToken);
+        await fileClient.CreateIfNotExistsAsync(cancellationToken: cancellationToken);

-        // Upload the stream
-        await blobClient.UploadAsync(fileContentStream, overwrite: true);
+        Stream uploadStream = fileContentStream;
+        if (!fileContentStream.CanSeek)
+        {
+            var memory = new MemoryStream();
+            await fileContentStream.CopyToAsync(memory, cancellationToken);
+            memory.Position = 0;
+            uploadStream = memory;
+        }
+
+        await fileClient.AppendAsync(uploadStream, offset: 0, cancellationToken: cancellationToken);
+        await fileClient.FlushAsync(uploadStream.Length, cancellationToken: cancellationToken);
+
+        // Dispose the temporary memory stream if we created one
+        if (uploadStream != fileContentStream)
+        {
+            await uploadStream.DisposeAsync();
+        }
     }

     public string GetFileName(IClock now)
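
Unlike the amend job, this version buffers only when it must: FlushAsync has to be told how many bytes were appended, and a non-seekable stream cannot report Length, so only those streams are copied to memory. A sketch of that branch pulled out into a hypothetical helper:

    // Hypothetical helper mirroring the CanSeek branch above. Seekable streams
    // are used as-is; non-seekable ones are buffered so Length is available for
    // FlushAsync. The flag tells the caller whether it owns a temporary copy.
    static async Task<(Stream Stream, bool IsTemporary)> EnsureSeekableAsync(
        Stream source, CancellationToken cancellationToken)
    {
        if (source.CanSeek)
        {
            return (source, false);
        }

        var buffer = new MemoryStream();
        await source.CopyToAsync(buffer, cancellationToken);
        buffer.Position = 0;
        return (buffer, true);
    }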

TeachingRecordSystem/src/TeachingRecordSystem.Core/Jobs/CapitaImportJob.cs

Lines changed: 41 additions & 40 deletions

@@ -1,26 +1,27 @@
 using System.Globalization;
 using System.Text;
 using System.Text.RegularExpressions;
-using Azure.Storage.Blobs;
-using Azure.Storage.Blobs.Models;
-using Azure.Storage.Blobs.Specialized;
+using Azure.Storage.Files.DataLake;
 using CsvHelper;
 using CsvHelper.Configuration;
+using Microsoft.Extensions.DependencyInjection;
 using Microsoft.Extensions.Logging;
 using Microsoft.Extensions.Options;
 using TeachingRecordSystem.Core.DataStore.Postgres;
 using TeachingRecordSystem.Core.DataStore.Postgres.Models;
 using TeachingRecordSystem.Core.Dqt;
 using TeachingRecordSystem.Core.Services.PersonMatching;
+
 namespace TeachingRecordSystem.Core.Jobs;

-public class CapitaImportJob(BlobServiceClient blobServiceClient, ILogger<CapitaImportJob> logger, TrsDbContext dbContext, IClock clock, IPersonMatchingService personMatchingService, IOptions<CapitaTpsUserOption> capitaUser)
+public class CapitaImportJob([FromKeyedServices("sftpstorage")] DataLakeServiceClient dataLakeServiceClient, ILogger<CapitaImportJob> logger, TrsDbContext dbContext, IClock clock, IPersonMatchingService personMatchingService, IOptions<CapitaTpsUserOption> capitaUser)
 {
     public const string JobSchedule = "0 4 * * *";
-    public const string StorageContainer = "dqt-integrations";
-    public const string PICKUP_FOLDER = "capita/pickup";
+    public const string StorageContainer = "capita-integrations";
+    public const string PickupFolder = "pickup";
     private const string ProcessedFolder = "capita/processed";
+    public const string ArchivedContainer = "archived-integration-transactions";

     public async Task ExecuteAsync(CancellationToken cancellationToken)
     {
@@ -37,29 +38,30 @@ public async Task ExecuteAsync(CancellationToken cancellationToken)

     public async Task ArchiveFileAsync(string fileName, CancellationToken cancellationToken)
     {
-        var blobContainerClient = blobServiceClient.GetBlobContainerClient(StorageContainer);
-        var sourceBlobClient = blobContainerClient.GetBlobClient(fileName);
-        var fileNameParts = fileName.Split("/");
-        var fileNameWithoutFolder = $"{DateTime.Now.ToString("ddMMyyyyHHmm")}-{fileNameParts.Last()}";
-        var targetFileName = $"{ProcessedFolder}/{fileNameWithoutFolder}";
-
-        // Acquire a lease to prevent another client modifying the source blob
-        var lease = sourceBlobClient.GetBlobLeaseClient();
-        await lease.AcquireAsync(TimeSpan.FromSeconds(60), cancellationToken: cancellationToken);
-
-        var targetBlobClient = blobContainerClient.GetBlobClient(targetFileName);
-        var copyOperation = await targetBlobClient.StartCopyFromUriAsync(sourceBlobClient.Uri, cancellationToken: cancellationToken);
-        await copyOperation.WaitForCompletionAsync();
-
-        // Release the lease
-        var sourceProperties = await sourceBlobClient.GetPropertiesAsync(cancellationToken: cancellationToken);
-        if (sourceProperties.Value.LeaseState == LeaseState.Leased)
-        {
-            await lease.ReleaseAsync(cancellationToken: cancellationToken);
-        }
-
-        // Now remove the original blob
-        await sourceBlobClient.DeleteAsync(DeleteSnapshotsOption.IncludeSnapshots, cancellationToken: cancellationToken);
+        var fileSystemClient = dataLakeServiceClient.GetFileSystemClient(StorageContainer);
+        var archivedFileSystemClient = dataLakeServiceClient.GetFileSystemClient(ArchivedContainer);
+        var sourceFile = fileSystemClient.GetFileClient(fileName);
+
+        var fileNameParts = fileName.Split('/');
+        var fileNameWithoutFolder = $"{DateTime.UtcNow:ddMMyyyyHHmm}-{fileNameParts.Last()}";
+        var targetPath = $"{ProcessedFolder}/{fileNameWithoutFolder}";
+        var targetFile = archivedFileSystemClient.GetFileClient(targetPath);
+
+        await targetFile.CreateIfNotExistsAsync(cancellationToken: cancellationToken);
+
+        // Read the source file
+        var readResponse = await sourceFile.ReadAsync(cancellationToken: cancellationToken);
+        await using var sourceStream = readResponse.Value.Content;
+
+        await using var memory = new MemoryStream();
+        await sourceStream.CopyToAsync(memory, cancellationToken);
+        memory.Position = 0;
+
+        await targetFile.AppendAsync(memory, offset: 0, cancellationToken: cancellationToken);
+        await targetFile.FlushAsync(memory.Length, cancellationToken: cancellationToken);
+
+        // Delete the original file
+        await sourceFile.DeleteIfExistsAsync(cancellationToken: cancellationToken);
     }

     public async Task<long> ImportAsync(StreamReader reader, string fileName)
@@ -392,27 +394,26 @@ public async Task<TrnRequestMatchResult> GetPotentialMatchingPersonsAsync(Capita

     public async Task<Stream> GetDownloadStreamAsync(string fileName)
     {
-        BlobContainerClient containerClient = blobServiceClient.GetBlobContainerClient(StorageContainer);
-        BlobClient blobClient = containerClient.GetBlobClient($"{fileName}");
-        var streamingResult = await blobClient.DownloadStreamingAsync();
-        return streamingResult.Value.Content;
+        var fileSystemClient = dataLakeServiceClient.GetFileSystemClient(StorageContainer);
+        var fileClient = fileSystemClient.GetFileClient(fileName);
+        var readResponse = await fileClient.ReadAsync();
+        return readResponse.Value.Content; // Stream, must be disposed by caller
     }

     private async Task<string[]> GetImportFilesAsync(CancellationToken cancellationToken)
     {
-        var blobContainerClient = blobServiceClient.GetBlobContainerClient(StorageContainer);
+        var fileSystemClient = dataLakeServiceClient.GetFileSystemClient(StorageContainer);
         var fileNames = new List<string>();
-        var resultSegment = blobContainerClient.GetBlobsByHierarchyAsync(prefix: PICKUP_FOLDER, delimiter: "", cancellationToken: cancellationToken).AsPages();
-        await foreach (Azure.Page<BlobHierarchyItem> blobPage in resultSegment)
+
+        await foreach (var pathItem in fileSystemClient.GetPathsAsync($"{PickupFolder}/", recursive: false, cancellationToken: cancellationToken))
         {
-            foreach (BlobHierarchyItem blobhierarchyItem in blobPage.Values)
+            // Only add files, skip directories
+            if (pathItem.IsDirectory == false)
             {
-                if (blobhierarchyItem.IsBlob)
-                {
-                    fileNames.Add(blobhierarchyItem.Blob.Name);
-                }
+                fileNames.Add(pathItem.Name);
             }
         }
+
         return fileNames.ToArray();
     }
 }
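
The listing logic also changed shape: GetPathsAsync flattens what used to be a paged hierarchy walk, and PathItem.IsDirectory is a nullable bool, hence the explicit == false check. A standalone sketch of the same enumeration (assumes a configured dataLakeServiceClient in scope; the container and folder names match the constants above):

    // Sketch: list files (skipping directories) under pickup/ in the
    // capita-integrations file system. recursive: false, so nested folders
    // appear as directory entries and are filtered out.
    var fileSystem = dataLakeServiceClient.GetFileSystemClient("capita-integrations");
    var files = new List<string>();

    await foreach (var path in fileSystem.GetPathsAsync("pickup/", recursive: false))
    {
        if (path.IsDirectory == false)
        {
            files.Add(path.Name); // names include the folder prefix, e.g. "pickup/<file>"
        }
    }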