Skip to content

Commit d52789e

Browse files
authored
Merge pull request #711 from hlibman-connamara/db_name_config
Make DB Store table names configurable
2 parents 9449794 + 02d175e commit d52789e

File tree

3 files changed

+185
-52
lines changed

3 files changed

+185
-52
lines changed

config/configuration.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1048,6 +1048,28 @@ const (
10481048
// - A valid go time.Duration
10491049
SQLStoreConnMaxLifetime string = "SQLStoreConnMaxLifetime"
10501050

1051+
// SQLStoreMessagesTableName defines the table name for the messages table. Default is "messages".
1052+
// If you use a different table name, you must set up your database accordingly.
1053+
//
1054+
// Required: No
1055+
//
1056+
// Default: messages
1057+
//
1058+
// Valid Values:
1059+
// - A valid string
1060+
SQLStoreMessagesTableName = "SQLStoreMessagesTableName"
1061+
1062+
// SQLStoreSessionsTableName defines the table name for the messages table. Default is "sessions".
1063+
// If you use a different table name, you must set up your database accordingly.
1064+
//
1065+
// Required: No
1066+
//
1067+
// Default: sessions
1068+
//
1069+
// Valid Values:
1070+
// - A valid string
1071+
SQLStoreSessionsTableName = "SQLStoreSessionsTableName"
1072+
10511073
// MongoStoreConnection sets the MongoDB connection URL to use for message storage.
10521074
//
10531075
// See https://pkg.go.dev/go.mongodb.org/mongo-driver/mongo#Connect for more information.

store/sql/sql_store.go

Lines changed: 85 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@ import (
2727
"github.com/quickfixgo/quickfix/config"
2828
)
2929

30+
const (
31+
defaultMessagesTable = "messages"
32+
defaultSessionsTable = "sessions"
33+
)
34+
3035
type sqlStoreFactory struct {
3136
settings *quickfix.Settings
3237
}
@@ -39,6 +44,19 @@ type sqlStore struct {
3944
sqlConnMaxLifetime time.Duration
4045
db *sql.DB
4146
placeholder placeholderFunc
47+
messagesTable string
48+
sessionsTable string
49+
50+
sqlUpdateSeqNums string
51+
sqlInsertSession string
52+
sqlGetSeqNums string
53+
sqlUpdateMessage string
54+
sqlInsertMessage string
55+
sqlGetMessages string
56+
sqlUpdateSession string
57+
sqlUpdateSenderSeqNum string
58+
sqlUpdateTargetSeqNum string
59+
sqlDeleteMessages string
4260
}
4361

4462
type placeholderFunc func(int) string
@@ -88,17 +106,29 @@ func (f sqlStoreFactory) Create(sessionID quickfix.SessionID) (msgStore quickfix
88106
if err != nil {
89107
return nil, err
90108
}
109+
110+
messagesTableName := defaultMessagesTable
111+
if name, err := sessionSettings.Setting(config.SQLStoreMessagesTableName); err == nil {
112+
messagesTableName = name
113+
}
114+
115+
sessionsTableName := defaultSessionsTable
116+
if name, err := sessionSettings.Setting(config.SQLStoreSessionsTableName); err == nil {
117+
sessionsTableName = name
118+
}
119+
91120
sqlConnMaxLifetime := 0 * time.Second
92121
if sessionSettings.HasSetting(config.SQLStoreConnMaxLifetime) {
93122
sqlConnMaxLifetime, err = sessionSettings.DurationSetting(config.SQLStoreConnMaxLifetime)
94123
if err != nil {
95124
return nil, err
96125
}
97126
}
98-
return newSQLStore(sessionID, sqlDriver, sqlDataSourceName, sqlConnMaxLifetime)
127+
128+
return newSQLStore(sessionID, sqlDriver, sqlDataSourceName, messagesTableName, sessionsTableName, sqlConnMaxLifetime)
99129
}
100130

101-
func newSQLStore(sessionID quickfix.SessionID, driver string, dataSourceName string, connMaxLifetime time.Duration) (store *sqlStore, err error) {
131+
func newSQLStore(sessionID quickfix.SessionID, driver, dataSourceName, messagesTableName, sessionsTableName string, connMaxLifetime time.Duration) (store *sqlStore, err error) {
102132

103133
memStore, memErr := quickfix.NewMemoryStoreFactory().Create(sessionID)
104134
if memErr != nil {
@@ -112,6 +142,8 @@ func newSQLStore(sessionID quickfix.SessionID, driver string, dataSourceName str
112142
sqlDriver: driver,
113143
sqlDataSourceName: dataSourceName,
114144
sqlConnMaxLifetime: connMaxLifetime,
145+
messagesTable: messagesTableName,
146+
sessionsTable: sessionsTableName,
115147
}
116148
if err = store.cache.Reset(); err != nil {
117149
err = errors.Wrap(err, "cache reset")
@@ -130,20 +162,58 @@ func newSQLStore(sessionID quickfix.SessionID, driver string, dataSourceName str
130162
if err = store.db.Ping(); err != nil { // ensure immediate connection
131163
return nil, err
132164
}
165+
166+
store.setSQLStatements()
167+
133168
if err = store.populateCache(); err != nil {
134169
return nil, err
135170
}
136171

137172
return store, nil
138173
}
139174

175+
func (store *sqlStore) setSQLStatements() {
176+
idColumns := `beginstring, session_qualifier, sendercompid, sendersubid, senderlocid, targetcompid, targetsubid, targetlocid`
177+
idPlaceholders := `?,?,?,?,?,?,?,?`
178+
idWhereClause := `beginstring=? AND session_qualifier=? AND sendercompid=? AND sendersubid=? AND senderlocid=? AND targetcompid=? AND targetsubid=? AND targetlocid=?`
179+
180+
store.sqlInsertMessage = fmt.Sprintf(`INSERT INTO %s (
181+
msgseqnum, message, %s) VALUES (?, ?, %s)`,
182+
store.messagesTable, idColumns, idPlaceholders)
183+
184+
store.sqlUpdateMessage = fmt.Sprintf(`UPDATE %s SET message=? WHERE %s AND msgseqnum=?`,
185+
store.messagesTable, idWhereClause)
186+
187+
store.sqlGetMessages = fmt.Sprintf(`SELECT message FROM %s WHERE %s AND msgseqnum>=? AND msgseqnum<=? ORDER BY msgseqnum`,
188+
store.messagesTable, idWhereClause)
189+
190+
store.sqlDeleteMessages = fmt.Sprintf(`DELETE FROM %s WHERE %s`,
191+
store.messagesTable, idWhereClause)
192+
193+
store.sqlInsertSession = fmt.Sprintf(`INSERT INTO %s (
194+
creation_time, incoming_seqnum, outgoing_seqnum, %s) VALUES (?, ?, ?, %s)`,
195+
store.sessionsTable, idColumns, idPlaceholders)
196+
197+
store.sqlGetSeqNums = fmt.Sprintf(`SELECT creation_time, incoming_seqnum, outgoing_seqnum FROM %s WHERE %s`,
198+
store.sessionsTable, idWhereClause)
199+
200+
store.sqlUpdateSession = fmt.Sprintf(`UPDATE %s SET creation_time=?, incoming_seqnum=?, outgoing_seqnum=? WHERE %s`,
201+
store.sessionsTable, idWhereClause)
202+
203+
store.sqlUpdateSenderSeqNum = fmt.Sprintf(`UPDATE %s SET outgoing_seqnum=? WHERE %s`,
204+
store.sessionsTable, idWhereClause)
205+
206+
store.sqlUpdateTargetSeqNum = fmt.Sprintf(`UPDATE %s SET incoming_seqnum=? WHERE %s`,
207+
store.sessionsTable, idWhereClause)
208+
209+
store.sqlUpdateSeqNums = fmt.Sprintf(`UPDATE %s SET incoming_seqnum=?, outgoing_seqnum=? WHERE %s`,
210+
store.sessionsTable, idWhereClause)
211+
}
212+
140213
// Reset deletes the store records and sets the seqnums back to 1.
141214
func (store *sqlStore) Reset() error {
142215
s := store.sessionID
143-
_, err := store.db.Exec(sqlString(`DELETE FROM messages
144-
WHERE beginstring=? AND session_qualifier=?
145-
AND sendercompid=? AND sendersubid=? AND senderlocid=?
146-
AND targetcompid=? AND targetsubid=? AND targetlocid=?`, store.placeholder),
216+
_, err := store.db.Exec(sqlString(store.sqlDeleteMessages, store.placeholder),
147217
s.BeginString, s.Qualifier,
148218
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
149219
s.TargetCompID, s.TargetSubID, s.TargetLocationID)
@@ -155,11 +225,7 @@ func (store *sqlStore) Reset() error {
155225
return err
156226
}
157227

158-
_, err = store.db.Exec(sqlString(`UPDATE sessions
159-
SET creation_time=?, incoming_seqnum=?, outgoing_seqnum=?
160-
WHERE beginstring=? AND session_qualifier=?
161-
AND sendercompid=? AND sendersubid=? AND senderlocid=?
162-
AND targetcompid=? AND targetsubid=? AND targetlocid=?`, store.placeholder),
228+
_, err = store.db.Exec(sqlString(store.sqlUpdateSession, store.placeholder),
163229
store.cache.CreationTime(), store.cache.NextTargetMsgSeqNum(), store.cache.NextSenderMsgSeqNum(),
164230
s.BeginString, s.Qualifier,
165231
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
@@ -180,11 +246,7 @@ func (store *sqlStore) populateCache() error {
180246
s := store.sessionID
181247
var creationTime time.Time
182248
var incomingSeqNum, outgoingSeqNum int
183-
row := store.db.QueryRow(sqlString(`SELECT creation_time, incoming_seqnum, outgoing_seqnum
184-
FROM sessions
185-
WHERE beginstring=? AND session_qualifier=?
186-
AND sendercompid=? AND sendersubid=? AND senderlocid=?
187-
AND targetcompid=? AND targetsubid=? AND targetlocid=?`, store.placeholder),
249+
row := store.db.QueryRow(sqlString(store.sqlGetSeqNums, store.placeholder),
188250
s.BeginString, s.Qualifier,
189251
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
190252
s.TargetCompID, s.TargetSubID, s.TargetLocationID)
@@ -209,12 +271,7 @@ func (store *sqlStore) populateCache() error {
209271
}
210272

211273
// session record not found, create it
212-
_, err = store.db.Exec(sqlString(`INSERT INTO sessions (
213-
creation_time, incoming_seqnum, outgoing_seqnum,
214-
beginstring, session_qualifier,
215-
sendercompid, sendersubid, senderlocid,
216-
targetcompid, targetsubid, targetlocid)
217-
VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, store.placeholder),
274+
_, err = store.db.Exec(sqlString(store.sqlInsertSession, store.placeholder),
218275
store.cache.CreationTime(),
219276
store.cache.NextTargetMsgSeqNum(),
220277
store.cache.NextSenderMsgSeqNum(),
@@ -238,10 +295,7 @@ func (store *sqlStore) NextTargetMsgSeqNum() int {
238295
// SetNextSenderMsgSeqNum sets the next MsgSeqNum that will be sent.
239296
func (store *sqlStore) SetNextSenderMsgSeqNum(next int) error {
240297
s := store.sessionID
241-
_, err := store.db.Exec(sqlString(`UPDATE sessions SET outgoing_seqnum = ?
242-
WHERE beginstring=? AND session_qualifier=?
243-
AND sendercompid=? AND sendersubid=? AND senderlocid=?
244-
AND targetcompid=? AND targetsubid=? AND targetlocid=?`, store.placeholder),
298+
_, err := store.db.Exec(sqlString(store.sqlUpdateSenderSeqNum, store.placeholder),
245299
next, s.BeginString, s.Qualifier,
246300
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
247301
s.TargetCompID, s.TargetSubID, s.TargetLocationID)
@@ -254,10 +308,7 @@ func (store *sqlStore) SetNextSenderMsgSeqNum(next int) error {
254308
// SetNextTargetMsgSeqNum sets the next MsgSeqNum that should be received.
255309
func (store *sqlStore) SetNextTargetMsgSeqNum(next int) error {
256310
s := store.sessionID
257-
_, err := store.db.Exec(sqlString(`UPDATE sessions SET incoming_seqnum = ?
258-
WHERE beginstring=? AND session_qualifier=?
259-
AND sendercompid=? AND sendersubid=? AND senderlocid=?
260-
AND targetcompid=? AND targetsubid=? AND targetlocid=?`, store.placeholder),
311+
_, err := store.db.Exec(sqlString(store.sqlUpdateTargetSeqNum, store.placeholder),
261312
next, s.BeginString, s.Qualifier,
262313
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
263314
s.TargetCompID, s.TargetSubID, s.TargetLocationID)
@@ -295,12 +346,7 @@ func (store *sqlStore) SetCreationTime(_ time.Time) {
295346
func (store *sqlStore) SaveMessage(seqNum int, msg []byte) error {
296347
s := store.sessionID
297348

298-
_, err := store.db.Exec(sqlString(`INSERT INTO messages (
299-
msgseqnum, message,
300-
beginstring, session_qualifier,
301-
sendercompid, sendersubid, senderlocid,
302-
targetcompid, targetsubid, targetlocid)
303-
VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, store.placeholder),
349+
_, err := store.db.Exec(sqlString(store.sqlInsertMessage, store.placeholder),
304350
seqNum, string(msg),
305351
s.BeginString, s.Qualifier,
306352
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
@@ -318,12 +364,7 @@ func (store *sqlStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []b
318364
}
319365
defer tx.Rollback()
320366

321-
_, err = tx.Exec(sqlString(`INSERT INTO messages (
322-
msgseqnum, message,
323-
beginstring, session_qualifier,
324-
sendercompid, sendersubid, senderlocid,
325-
targetcompid, targetsubid, targetlocid)
326-
VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, store.placeholder),
367+
_, err = tx.Exec(sqlString(store.sqlInsertMessage, store.placeholder),
327368
seqNum, string(msg),
328369
s.BeginString, s.Qualifier,
329370
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
@@ -333,10 +374,7 @@ func (store *sqlStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []b
333374
}
334375

335376
next := store.cache.NextSenderMsgSeqNum() + 1
336-
_, err = tx.Exec(sqlString(`UPDATE sessions SET outgoing_seqnum = ?
337-
WHERE beginstring=? AND session_qualifier=?
338-
AND sendercompid=? AND sendersubid=? AND senderlocid=?
339-
AND targetcompid=? AND targetsubid=? AND targetlocid=?`, store.placeholder),
377+
_, err = tx.Exec(sqlString(store.sqlUpdateSenderSeqNum, store.placeholder),
340378
next, s.BeginString, s.Qualifier,
341379
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
342380
s.TargetCompID, s.TargetSubID, s.TargetLocationID)
@@ -354,12 +392,7 @@ func (store *sqlStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []b
354392

355393
func (store *sqlStore) IterateMessages(beginSeqNum, endSeqNum int, cb func([]byte) error) error {
356394
s := store.sessionID
357-
rows, err := store.db.Query(sqlString(`SELECT message FROM messages
358-
WHERE beginstring=? AND session_qualifier=?
359-
AND sendercompid=? AND sendersubid=? AND senderlocid=?
360-
AND targetcompid=? AND targetsubid=? AND targetlocid=?
361-
AND msgseqnum>=? AND msgseqnum<=?
362-
ORDER BY msgseqnum`, store.placeholder),
395+
rows, err := store.db.Query(sqlString(store.sqlGetMessages, store.placeholder),
363396
s.BeginString, s.Qualifier,
364397
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
365398
s.TargetCompID, s.TargetSubID, s.TargetLocationID,

store/sql/sql_store_test.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,84 @@ func (suite *SQLStoreTestSuite) TestSqlPlaceholderReplacement() {
8181
suite.Equal("A $1 B $2 C $3", got)
8282
}
8383

84+
func (suite *SQLStoreTestSuite) TestStoreTableRenameOverride() {
85+
sqlDriver := "sqlite3"
86+
sqlDsn := path.Join(suite.sqlStoreRootPath, fmt.Sprintf("rename-override-%d.db", time.Now().UnixNano()))
87+
88+
// Create DB with original schema
89+
db, err := sql.Open(sqlDriver, sqlDsn)
90+
require.NoError(suite.T(), err)
91+
92+
ddlFnames, err := filepath.Glob(fmt.Sprintf("../../_sql/%s/*.sql", sqlDriver))
93+
require.NoError(suite.T(), err)
94+
for _, fname := range ddlFnames {
95+
sqlBytes, err := os.ReadFile(fname)
96+
require.NoError(suite.T(), err)
97+
_, err = db.Exec(string(sqlBytes))
98+
require.NoError(suite.T(), err)
99+
}
100+
101+
// Rename tables
102+
_, err = db.Exec(`ALTER TABLE sessions RENAME TO renamed_sessions`)
103+
require.NoError(suite.T(), err)
104+
_, err = db.Exec(`ALTER TABLE messages RENAME TO renamed_messages`)
105+
require.NoError(suite.T(), err)
106+
107+
// Set config to use renamed tables
108+
sessionID := quickfix.SessionID{BeginString: "FIX.4.4", SenderCompID: "SENDER", TargetCompID: "TARGET"}
109+
settings, err := quickfix.ParseSettings(strings.NewReader(fmt.Sprintf(`
110+
[DEFAULT]
111+
SQLStoreDriver=%s
112+
SQLStoreDataSourceName=%s
113+
SQLStoreSessionsTableName=renamed_sessions
114+
SQLStoreMessagesTableName=renamed_messages
115+
116+
[SESSION]
117+
BeginString=%s
118+
SenderCompID=%s
119+
TargetCompID=%s
120+
`, sqlDriver, sqlDsn, sessionID.BeginString, sessionID.SenderCompID, sessionID.TargetCompID)))
121+
require.NoError(suite.T(), err)
122+
123+
// Create store with renamed table config
124+
store, err := NewStoreFactory(settings).Create(sessionID)
125+
require.NoError(suite.T(), err)
126+
defer store.Close()
127+
128+
// SaveMessage + SetNextSenderMsgSeqNum
129+
msg := []byte("8=FIX.4.4\x019=12\x0135=0\x01")
130+
require.NoError(suite.T(), store.SaveMessage(1, msg))
131+
require.NoError(suite.T(), store.SetNextSenderMsgSeqNum(2))
132+
require.NoError(suite.T(), store.SetNextTargetMsgSeqNum(2))
133+
134+
// SaveMessageAndIncrNextSenderMsgSeqNum
135+
require.NoError(suite.T(), store.SaveMessageAndIncrNextSenderMsgSeqNum(2, msg))
136+
137+
// Get and check sequence numbers
138+
nextSender := store.NextSenderMsgSeqNum()
139+
suite.Equal(3, nextSender)
140+
nextTarget := store.NextTargetMsgSeqNum()
141+
suite.Equal(2, nextTarget)
142+
143+
// IterateMessages
144+
count := 0
145+
err = store.IterateMessages(1, 2, func(_ []byte) error {
146+
count++
147+
return nil
148+
})
149+
require.NoError(suite.T(), err)
150+
suite.Equal(2, count)
151+
152+
// Reset
153+
require.NoError(suite.T(), store.Reset())
154+
155+
// After reset, sequence numbers should be 1
156+
nextSender = store.NextSenderMsgSeqNum()
157+
suite.Equal(1, nextSender)
158+
nextTarget = store.NextTargetMsgSeqNum()
159+
suite.Equal(1, nextTarget)
160+
}
161+
84162
func (suite *SQLStoreTestSuite) TearDownTest() {
85163
suite.MsgStore.Close()
86164
os.RemoveAll(suite.sqlStoreRootPath)

0 commit comments

Comments
 (0)