Skip to content

Commit a4de7e2

Browse files
committed
De-functionfy table names
1 parent 9b23051 commit a4de7e2

File tree

20 files changed

+217
-213
lines changed

20 files changed

+217
-213
lines changed

Stores/MySQL/Cleipnir.ResilientFunctions.MySQL.Tests/UtilTests/RegisterTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public override Task DeleteFailsWhenNonExpectedValueForExistingRegister()
6161
private async Task<IRegister> CreateAndInitializeRegister([CallerMemberName] string memberName = "")
6262
{
6363
var count = _syncedCounter.Increment();
64-
var underlyingRegister = new MySqlUnderlyingRegister(Sql.ConnectionString, tablePrefix: count.ToString());
64+
var underlyingRegister = new MySqlUnderlyingRegister(Sql.ConnectionString);
6565
await underlyingRegister.DropUnderlyingTable();
6666
await underlyingRegister.Initialize();
6767
return new Register(underlyingRegister);

Stores/MySQL/Cleipnir.ResilientFunctions.MySQL/MySqlEffectsStore.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public async Task Initialize()
2121
{
2222
await using var conn = await CreateConnection();
2323
_initializeSql ??= @$"
24-
CREATE TABLE IF NOT EXISTS {_tablePrefix}rfunction_effects (
24+
CREATE TABLE IF NOT EXISTS {_tablePrefix}_effects (
2525
id VARCHAR(450) PRIMARY KEY,
2626
status INT NOT NULL,
2727
result TEXT NULL,
@@ -35,7 +35,7 @@ exception TEXT NULL
3535
public async Task Truncate()
3636
{
3737
await using var conn = await CreateConnection();
38-
_truncateSql ??= $"TRUNCATE TABLE {_tablePrefix}rfunction_effects";
38+
_truncateSql ??= $"TRUNCATE TABLE {_tablePrefix}_effects";
3939
var command = new MySqlCommand(_truncateSql, conn);
4040
await command.ExecuteNonQueryAsync();
4141
}
@@ -46,7 +46,7 @@ public async Task SetEffectResult(FunctionId functionId, StoredEffect storedEffe
4646
var (functionTypeId, functionInstanceId) = functionId;
4747
await using var conn = await CreateConnection();
4848
_setEffectResultSql ??= $@"
49-
INSERT INTO {_tablePrefix}rfunction_effects
49+
INSERT INTO {_tablePrefix}_effects
5050
(id, status, result, exception)
5151
VALUES
5252
(?, ?, ?, ?)
@@ -73,7 +73,7 @@ public async Task<IEnumerable<StoredEffect>> GetEffectResults(FunctionId functio
7373
await using var conn = await CreateConnection();
7474
_getEffectResultsSql ??= @$"
7575
SELECT id, status, result, exception
76-
FROM {_tablePrefix}rfunction_effects
76+
FROM {_tablePrefix}_effects
7777
WHERE id LIKE ?";
7878
await using var command = new MySqlCommand(_getEffectResultsSql, conn)
7979
{
@@ -103,7 +103,7 @@ public async Task<IEnumerable<StoredEffect>> GetEffectResults(FunctionId functio
103103
public async Task DeleteEffectResult(FunctionId functionId, EffectId effectId)
104104
{
105105
await using var conn = await CreateConnection();
106-
_deleteEffectResultSql ??= $"DELETE FROM {_tablePrefix}rfunction_effects WHERE id = ?";
106+
_deleteEffectResultSql ??= $"DELETE FROM {_tablePrefix}_effects WHERE id = ?";
107107
var id = Escaper.Escape(functionId.TypeId.Value, functionId.InstanceId.Value, effectId.Value);
108108
await using var command = new MySqlCommand(_deleteEffectResultSql, conn)
109109
{
@@ -117,7 +117,7 @@ public async Task DeleteEffectResult(FunctionId functionId, EffectId effectId)
117117
public async Task Remove(FunctionId functionId)
118118
{
119119
await using var conn = await CreateConnection();
120-
_removeSql ??= $"DELETE FROM {_tablePrefix}rfunction_effects WHERE id LIKE ?";
120+
_removeSql ??= $"DELETE FROM {_tablePrefix}_effects WHERE id LIKE ?";
121121
var id = Escaper.Escape(functionId.TypeId.Value, functionId.InstanceId.Value) + $"{Escaper.Separator}%" ;
122122
await using var command = new MySqlCommand(_removeSql, conn)
123123
{

Stores/MySQL/Cleipnir.ResilientFunctions.MySQL/MySqlFunctionStore.cs

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ public class MySqlFunctionStore : IFunctionStore
3030

3131
public MySqlFunctionStore(string connectionString, string tablePrefix = "")
3232
{
33+
tablePrefix = tablePrefix == "" ? "rfunctions" : tablePrefix;
34+
3335
_connectionString = connectionString;
3436
_tablePrefix = tablePrefix;
3537
_messageStore = new MySqlMessageStore(connectionString, tablePrefix);
@@ -50,7 +52,7 @@ public async Task Initialize()
5052
await TimeoutStore.Initialize();
5153
await using var conn = await CreateOpenConnection(_connectionString);
5254
_initializeSql ??= $@"
53-
CREATE TABLE IF NOT EXISTS {_tablePrefix}rfunctions (
55+
CREATE TABLE IF NOT EXISTS {_tablePrefix} (
5456
function_type_id VARCHAR(200) NOT NULL,
5557
function_instance_id VARCHAR(200) NOT NULL,
5658
param_json TEXT NULL,
@@ -79,7 +81,7 @@ public async Task DropIfExists()
7981
await _timeoutStore.DropUnderlyingTable();
8082

8183
await using var conn = await CreateOpenConnection(_connectionString);
82-
_dropIfExistsSql ??= $"DROP TABLE IF EXISTS {_tablePrefix}rfunctions";
84+
_dropIfExistsSql ??= $"DROP TABLE IF EXISTS {_tablePrefix}";
8385
await using var command = new MySqlCommand(_dropIfExistsSql, conn);
8486
await command.ExecuteNonQueryAsync();
8587
}
@@ -94,7 +96,7 @@ public async Task TruncateTables()
9496
await _statesStore.Truncate();
9597

9698
await using var conn = await CreateOpenConnection(_connectionString);
97-
_truncateTablesSql ??= $"TRUNCATE TABLE {_tablePrefix}rfunctions";
99+
_truncateTablesSql ??= $"TRUNCATE TABLE {_tablePrefix}";
98100
await using var command = new MySqlCommand(_truncateTablesSql, conn);
99101
await command.ExecuteNonQueryAsync();
100102
}
@@ -111,7 +113,7 @@ public async Task<bool> CreateFunction(
111113

112114
var status = postponeUntil == null ? Status.Executing : Status.Postponed;
113115
_createFunctionSql ??= @$"
114-
INSERT IGNORE INTO {_tablePrefix}rfunctions
116+
INSERT IGNORE INTO {_tablePrefix}
115117
(function_type_id, function_instance_id, param_json, status, epoch, lease_expiration, postponed_until, timestamp)
116118
VALUES
117119
(?, ?, ?, ?, 0, ?, ?, ?)";
@@ -138,7 +140,7 @@ INSERT IGNORE INTO {_tablePrefix}rfunctions
138140
{
139141
await using var conn = await CreateOpenConnection(_connectionString);
140142
_restartExecutionSql ??= @$"
141-
UPDATE {_tablePrefix}rfunctions
143+
UPDATE {_tablePrefix}
142144
SET epoch = epoch + 1, status = {(int)Status.Executing}, lease_expiration = ?
143145
WHERE function_type_id = ? AND function_instance_id = ? AND epoch = ?;
144146
SELECT
@@ -152,7 +154,7 @@ INSERT IGNORE INTO {_tablePrefix}rfunctions
152154
lease_expiration,
153155
interrupt_count,
154156
timestamp
155-
FROM {_tablePrefix}rfunctions
157+
FROM {_tablePrefix}
156158
WHERE function_type_id = ? AND function_instance_id = ?;";
157159

158160
await using var command = new MySqlCommand(_restartExecutionSql, conn)
@@ -183,7 +185,7 @@ public async Task<bool> RenewLease(FunctionId functionId, int expectedEpoch, lon
183185
{
184186
await using var conn = await CreateOpenConnection(_connectionString);
185187
_renewLeaseSql ??= $@"
186-
UPDATE {_tablePrefix}rfunctions
188+
UPDATE {_tablePrefix}
187189
SET lease_expiration = ?
188190
WHERE function_type_id = ? AND function_instance_id = ? AND epoch = ? AND status = {(int) Status.Executing}";
189191
await using var command = new MySqlCommand(_renewLeaseSql, conn)
@@ -207,7 +209,7 @@ public async Task<IEnumerable<StoredExecutingFunction>> GetCrashedFunctions(Func
207209
await using var conn = await CreateOpenConnection(_connectionString);
208210
_getCrashedFunctionsSql ??= @$"
209211
SELECT function_instance_id, epoch, lease_expiration
210-
FROM {_tablePrefix}rfunctions
212+
FROM {_tablePrefix}
211213
WHERE function_type_id = ? AND lease_expiration < ? AND status = {(int) Status.Executing}";
212214
await using var command = new MySqlCommand(_getCrashedFunctionsSql, conn)
213215
{
@@ -238,7 +240,7 @@ public async Task<IEnumerable<StoredPostponedFunction>> GetPostponedFunctions(Fu
238240
await using var conn = await CreateOpenConnection(_connectionString);
239241
_getPostponedFunctionsSql ??= @$"
240242
SELECT function_instance_id, epoch, postponed_until
241-
FROM {_tablePrefix}rfunctions
243+
FROM {_tablePrefix}
242244
WHERE function_type_id = ? AND status = {(int) Status.Postponed} AND postponed_until <= ?";
243245
await using var command = new MySqlCommand(_getPostponedFunctionsSql, conn)
244246
{
@@ -273,7 +275,7 @@ public async Task<bool> SetFunctionState(
273275
await using var conn = await CreateOpenConnection(_connectionString);
274276

275277
_setFunctionStateSql ??= $@"
276-
UPDATE {_tablePrefix}rfunctions
278+
UPDATE {_tablePrefix}
277279
SET status = ?,
278280
param_json = ?,
279281
result_json = ?,
@@ -313,7 +315,7 @@ public async Task<bool> SucceedFunction(
313315
{
314316
await using var conn = await CreateOpenConnection(_connectionString);
315317
_succeedFunctionSql ??= $@"
316-
UPDATE {_tablePrefix}rfunctions
318+
UPDATE {_tablePrefix}
317319
SET status = {(int) Status.Succeeded}, result_json = ?, default_state = ?, timestamp = ?, epoch = ?
318320
WHERE
319321
function_type_id = ? AND
@@ -349,7 +351,7 @@ public async Task<bool> PostponeFunction(
349351
{
350352
await using var conn = await CreateOpenConnection(_connectionString);
351353
_postponedFunctionSql ??= $@"
352-
UPDATE {_tablePrefix}rfunctions
354+
UPDATE {_tablePrefix}
353355
SET status = {(int) Status.Postponed}, postponed_until = ?, default_state = ?, timestamp = ?, epoch = ?
354356
WHERE
355357
function_type_id = ? AND
@@ -385,7 +387,7 @@ public async Task<bool> FailFunction(
385387
{
386388
await using var conn = await CreateOpenConnection(_connectionString);
387389
_failFunctionSql ??= $@"
388-
UPDATE {_tablePrefix}rfunctions
390+
UPDATE {_tablePrefix}
389391
SET status = {(int) Status.Failed}, exception_json = ?, default_state = ?, timestamp = ?, epoch = ?
390392
WHERE
391393
function_type_id = ? AND
@@ -422,7 +424,7 @@ public async Task<bool> SuspendFunction(
422424
await using var conn = await CreateOpenConnection(_connectionString);
423425

424426
_suspendFunctionSql ??= $@"
425-
UPDATE {_tablePrefix}rfunctions
427+
UPDATE {_tablePrefix}
426428
SET status = {(int) Status.Suspended}, default_state = ?, timestamp = ?
427429
WHERE function_type_id = ? AND
428430
function_instance_id = ? AND
@@ -451,7 +453,7 @@ public async Task SetDefaultState(FunctionId functionId, string? stateJson)
451453
{
452454
await using var conn = await CreateOpenConnection(_connectionString);
453455
_setDefaultStateSql ??= $@"
454-
UPDATE {_tablePrefix}rfunctions
456+
UPDATE {_tablePrefix}
455457
SET default_state = ?
456458
WHERE function_type_id = ? AND function_instance_id = ?";
457459
await using var command = new MySqlCommand(_setDefaultStateSql, conn)
@@ -476,7 +478,7 @@ public async Task<bool> SetParameters(
476478
await using var conn = await CreateOpenConnection(_connectionString);
477479

478480
_setParametersSql ??= $@"
479-
UPDATE {_tablePrefix}rfunctions
481+
UPDATE {_tablePrefix}
480482
SET param_json = ?,
481483
result_json = ?,
482484
epoch = epoch + 1
@@ -507,7 +509,7 @@ public async Task<bool> IncrementInterruptCount(FunctionId functionId)
507509
await using var conn = await CreateOpenConnection(_connectionString);
508510

509511
_incrementInterruptCountSql ??= $@"
510-
UPDATE {_tablePrefix}rfunctions
512+
UPDATE {_tablePrefix}
511513
SET interrupt_count = interrupt_count + 1
512514
WHERE function_type_id = ? AND function_instance_id = ? AND status = {(int) Status.Executing};";
513515

@@ -531,7 +533,7 @@ public async Task<bool> IncrementInterruptCount(FunctionId functionId)
531533

532534
_getInterruptCountSql ??= $@"
533535
SELECT interrupt_count
534-
FROM {_tablePrefix}rfunctions
536+
FROM {_tablePrefix}
535537
WHERE function_type_id = ? AND function_instance_id = ?;";
536538

537539
await using var command = new MySqlCommand(_getInterruptCountSql, conn)
@@ -552,7 +554,7 @@ SELECT interrupt_count
552554
await using var conn = await CreateOpenConnection(_connectionString);
553555
_getFunctionStatusSql ??= $@"
554556
SELECT status, epoch
555-
FROM {_tablePrefix}rfunctions
557+
FROM {_tablePrefix}
556558
WHERE function_type_id = ? AND function_instance_id = ?;";
557559
await using var command = new MySqlCommand(_getFunctionStatusSql, conn)
558560
{
@@ -591,7 +593,7 @@ SELECT interrupt_count
591593
lease_expiration,
592594
interrupt_count,
593595
timestamp
594-
FROM {_tablePrefix}rfunctions
596+
FROM {_tablePrefix}
595597
WHERE function_type_id = ? AND function_instance_id = ?;";
596598
await using var command = new MySqlCommand(_getFunctionSql, conn)
597599
{
@@ -655,7 +657,7 @@ public async Task DeleteFunction(FunctionId functionId)
655657
await using var conn = await CreateOpenConnection(_connectionString);
656658

657659
_deleteFunctionSql ??= $@"
658-
DELETE FROM {_tablePrefix}rfunctions
660+
DELETE FROM {_tablePrefix}
659661
WHERE function_type_id = ? AND function_instance_id = ?";
660662

661663
await using var command = new MySqlCommand(_deleteFunctionSql, conn)

Stores/MySQL/Cleipnir.ResilientFunctions.MySQL/MySqlMessageStore.cs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public async Task Initialize()
2222
{
2323
await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString);
2424
_initializeSql ??= @$"
25-
CREATE TABLE IF NOT EXISTS {_tablePrefix}rfunctions_messages (
25+
CREATE TABLE IF NOT EXISTS {_tablePrefix}_messages (
2626
function_type_id VARCHAR(255),
2727
function_instance_id VARCHAR(255),
2828
position INT NOT NULL,
@@ -39,7 +39,7 @@ PRIMARY KEY (function_type_id, function_instance_id, position)
3939
public async Task DropUnderlyingTable()
4040
{
4141
await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString);
42-
_dropUnderlyingTableSql ??= $"DROP TABLE IF EXISTS {_tablePrefix}rfunctions_messages";
42+
_dropUnderlyingTableSql ??= $"DROP TABLE IF EXISTS {_tablePrefix}_messages";
4343
await using var command = new MySqlCommand(_dropUnderlyingTableSql, conn);
4444
await command.ExecuteNonQueryAsync();
4545
}
@@ -48,7 +48,7 @@ public async Task DropUnderlyingTable()
4848
public async Task TruncateTable()
4949
{
5050
await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString);;
51-
_truncateTableSql ??= $"TRUNCATE TABLE {_tablePrefix}rfunctions_messages;";
51+
_truncateTableSql ??= $"TRUNCATE TABLE {_tablePrefix}_messages;";
5252
var command = new MySqlCommand(_truncateTableSql, conn);
5353
await command.ExecuteNonQueryAsync();
5454
}
@@ -65,15 +65,15 @@ public async Task<FunctionStatus> AppendMessage(FunctionId functionId, StoredMes
6565
var lockName = functionId.ToString().GenerateSHA256Hash();
6666
_appendMessageSql ??= @$"
6767
SELECT GET_LOCK(?, 10);
68-
INSERT INTO {_tablePrefix}rfunctions_messages
68+
INSERT INTO {_tablePrefix}_messages
6969
(function_type_id, function_instance_id, position, message_json, message_type, idempotency_key)
7070
SELECT ?, ?, COALESCE(MAX(position), -1) + 1, ?, ?, ?
71-
FROM {_tablePrefix}rfunctions_messages
71+
FROM {_tablePrefix}_messages
7272
WHERE function_type_id = ? AND function_instance_id = ?;
7373
SELECT RELEASE_LOCK(?);
7474
7575
SELECT epoch, status
76-
FROM {_tablePrefix}rfunctions
76+
FROM {_tablePrefix}
7777
WHERE function_type_id = ? AND function_instance_id = ?;";
7878

7979
await using var command = new MySqlCommand(_appendMessageSql, conn)
@@ -122,7 +122,7 @@ public async Task<bool> ReplaceMessage(FunctionId functionId, int position, Stor
122122
var (messageJson, messageType, idempotencyKey) = storedMessage;
123123

124124
_replaceMessageSql ??= @$"
125-
UPDATE {_tablePrefix}rfunctions_messages
125+
UPDATE {_tablePrefix}_messages
126126
SET message_json = ?, message_type = ?, idempotency_key = ?
127127
WHERE function_type_id = ? AND function_instance_id = ? AND position = ?";
128128
await using var command = new MySqlCommand(_replaceMessageSql, conn)
@@ -146,7 +146,7 @@ public async Task Truncate(FunctionId functionId)
146146
{
147147
await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString);
148148
_truncateSql ??= @$"
149-
DELETE FROM {_tablePrefix}rfunctions_messages
149+
DELETE FROM {_tablePrefix}_messages
150150
WHERE function_type_id = ? AND function_instance_id = ?";
151151

152152
await using var command = new MySqlCommand(_truncateSql, conn);
@@ -162,7 +162,7 @@ public async Task<IReadOnlyList<StoredMessage>> GetMessages(FunctionId functionI
162162
await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString);
163163
_getMessagesSql ??= @$"
164164
SELECT message_json, message_type, idempotency_key
165-
FROM {_tablePrefix}rfunctions_messages
165+
FROM {_tablePrefix}_messages
166166
WHERE function_type_id = ? AND function_instance_id = ? AND position >= ?
167167
ORDER BY position ASC;";
168168
await using var command = new MySqlCommand(_getMessagesSql, conn)
@@ -194,7 +194,7 @@ public async Task<bool> HasMoreMessages(FunctionId functionId, int skip)
194194
await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString);
195195
_hasMoreMessagesSql ??= @$"
196196
SELECT COALESCE(MAX(position), -1)
197-
FROM {_tablePrefix}rfunctions_messages
197+
FROM {_tablePrefix}_messages
198198
WHERE function_type_id = ? AND function_instance_id = ?;";
199199
await using var command = new MySqlCommand(_hasMoreMessagesSql, conn)
200200
{

0 commit comments

Comments
 (0)