Skip to content

Commit 863ff76

Browse files
committed
fix: a rare deadlock with blmove and multi-db (#4568)
The bug requires lots of conditions in order to reproduce: 1. blocking operations on multiple databases 2. use of lua scripts that wake blocking transactions The bug was discovered due to a deadlock in BLMOVE but could also manifest with other commands that would "disappear" causing local starvation effects on the connections sending them. With BLMOVE it causes a global deadlock in the transaction queue in dragonfly. The fix is actually deleting a few lines of code introduced by #3260 from 6 months ago, so it is actually a long lived regression. Signed-off-by: Roman Gershman <roman@dragonflydb.io>
1 parent 3f77407 commit 863ff76

File tree

3 files changed

+336
-9
lines changed

3 files changed

+336
-9
lines changed

src/server/engine_shard.cc

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -578,17 +578,11 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
578578
trans = nullptr;
579579

580580
if ((is_self && disarmed) || continuation_trans_->DisarmInShard(sid)) {
581-
auto bc = continuation_trans_->GetNamespace().GetBlockingController(shard_id_);
582581
if (bool keep = run(continuation_trans_, false); !keep) {
583582
// if this holds, we can remove this check altogether.
584583
DCHECK(continuation_trans_ == nullptr);
585584
continuation_trans_ = nullptr;
586585
}
587-
if (bc && bc->HasAwakedTransaction()) {
588-
// Break if there are any awakened transactions, as we must give way to them
589-
// before continuing to handle regular transactions from the queue.
590-
return;
591-
}
592586
}
593587
}
594588

@@ -630,7 +624,6 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
630624

631625
// If we disarmed, but didn't find ourselves in the loop, run now.
632626
if (trans && disarmed) {
633-
DCHECK(trans != head);
634627
DCHECK(trans_mask & (Transaction::OUT_OF_ORDER | Transaction::SUSPENDED_Q));
635628

636629
bool is_ooo = trans_mask & Transaction::OUT_OF_ORDER;

src/server/list_family_test.cc

Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1080,5 +1080,255 @@ TEST_F(ListFamilyTest, ContendExpire) {
10801080
}
10811081
}
10821082

1083+
TEST_F(ListFamilyTest, LMPopInvalidSyntax) {
1084+
// Not enough arguments
1085+
auto resp = Run({"lmpop", "1", "a"});
1086+
EXPECT_THAT(resp, ErrArg("wrong number of arguments"));
1087+
1088+
// Zero keys
1089+
resp = Run({"lmpop", "0", "LEFT", "COUNT", "1"});
1090+
EXPECT_THAT(resp, ErrArg("syntax error"));
1091+
1092+
// Number of keys is not uint
1093+
resp = Run({"lmpop", "aa", "a", "LEFT"});
1094+
EXPECT_THAT(resp, ErrArg("value is not an integer or out of range"));
1095+
1096+
// Missing LEFT/RIGHT
1097+
resp = Run({"lmpop", "1", "a", "COUNT", "1"});
1098+
EXPECT_THAT(resp, ErrArg("syntax error"));
1099+
1100+
// Wrong number of keys
1101+
resp = Run({"lmpop", "1", "a", "b", "LEFT"});
1102+
EXPECT_THAT(resp, ErrArg("syntax error"));
1103+
1104+
// COUNT without number
1105+
resp = Run({"lmpop", "1", "a", "LEFT", "COUNT"});
1106+
EXPECT_THAT(resp, ErrArg("syntax error"));
1107+
1108+
// COUNT is not uint
1109+
resp = Run({"lmpop", "1", "a", "LEFT", "COUNT", "boo"});
1110+
EXPECT_THAT(resp, ErrArg("value is not an integer or out of range"));
1111+
1112+
// Too many arguments
1113+
resp = Run({"lmpop", "1", "c", "LEFT", "COUNT", "2", "foo"});
1114+
EXPECT_THAT(resp, ErrArg("syntax error"));
1115+
}
1116+
1117+
TEST_F(ListFamilyTest, LMPop) {
1118+
// All lists are empty
1119+
auto resp = Run({"lmpop", "1", "e", "LEFT"});
1120+
EXPECT_THAT(resp, ArgType(RespExpr::NIL));
1121+
1122+
// LEFT operation
1123+
resp = Run({"lpush", "a", "a1", "a2"});
1124+
EXPECT_THAT(resp, IntArg(2));
1125+
1126+
resp = Run({"lmpop", "1", "a", "LEFT"});
1127+
EXPECT_THAT(resp, RespArray(ElementsAre("a", RespArray(ElementsAre("a2")))));
1128+
1129+
// RIGHT operation
1130+
resp = Run({"lpush", "b", "b1", "b2"});
1131+
EXPECT_THAT(resp, IntArg(2));
1132+
1133+
resp = Run({"lmpop", "1", "b", "RIGHT"});
1134+
EXPECT_THAT(resp, RespArray(ElementsAre("b", RespArray(ElementsAre("b1")))));
1135+
1136+
// COUNT > 1
1137+
resp = Run({"lpush", "c", "c1", "c2"});
1138+
EXPECT_THAT(resp, IntArg(2));
1139+
1140+
resp = Run({"lmpop", "1", "c", "RIGHT", "COUNT", "2"});
1141+
EXPECT_THAT(resp, RespArray(ElementsAre("c", RespArray(ElementsAre("c1", "c2")))));
1142+
1143+
resp = Run({"llen", "c"});
1144+
EXPECT_THAT(resp, IntArg(0));
1145+
1146+
// COUNT > number of elements in list
1147+
resp = Run({"lpush", "d", "d1", "d2"});
1148+
EXPECT_THAT(resp, IntArg(2));
1149+
1150+
resp = Run({"lmpop", "1", "d", "RIGHT", "COUNT", "3"});
1151+
EXPECT_THAT(resp, RespArray(ElementsAre("d", RespArray(ElementsAre("d1", "d2")))));
1152+
1153+
resp = Run({"llen", "d"});
1154+
EXPECT_THAT(resp, IntArg(0));
1155+
1156+
// First non-empty list is not the first list
1157+
resp = Run({"lpush", "x", "x1"});
1158+
EXPECT_THAT(resp, IntArg(1));
1159+
1160+
resp = Run({"lpush", "y", "y1"});
1161+
EXPECT_THAT(resp, IntArg(1));
1162+
1163+
resp = Run({"lmpop", "3", "empty", "x", "y", "RIGHT"});
1164+
EXPECT_THAT(resp, RespArray(ElementsAre("x", RespArray(ElementsAre("x1")))));
1165+
1166+
resp = Run({"llen", "x"});
1167+
EXPECT_THAT(resp, IntArg(0));
1168+
}
1169+
1170+
TEST_F(ListFamilyTest, LMPopMultipleElements) {
1171+
// Test removing multiple elements from left end
1172+
Run({"rpush", "list1", "a", "b", "c", "d", "e"});
1173+
auto resp = Run({"lmpop", "1", "list1", "LEFT", "COUNT", "3"});
1174+
EXPECT_THAT(resp, RespArray(ElementsAre("list1", RespArray(ElementsAre("a", "b", "c")))));
1175+
1176+
resp = Run({"lrange", "list1", "0", "-1"});
1177+
EXPECT_THAT(resp.GetVec(), ElementsAre("d", "e"));
1178+
1179+
// Test removing multiple elements from right end
1180+
Run({"rpush", "list2", "v", "w", "x", "y", "z"});
1181+
resp = Run({"lmpop", "1", "list2", "RIGHT", "COUNT", "2"});
1182+
EXPECT_THAT(resp, RespArray(ElementsAre("list2", RespArray(ElementsAre("z", "y")))));
1183+
1184+
resp = Run({"lrange", "list2", "0", "-1"});
1185+
EXPECT_THAT(resp.GetVec(), ElementsAre("v", "w", "x"));
1186+
}
1187+
1188+
TEST_F(ListFamilyTest, LMPopMultipleLists) {
1189+
// Test finding first non-empty list
1190+
Run({"rpush", "list1", "a", "b"});
1191+
Run({"rpush", "list2", "c", "d"});
1192+
Run({"rpush", "list3", "e", "f"});
1193+
1194+
// Pop from first non-empty list
1195+
auto resp = Run({"lmpop", "3", "list1", "list2", "list3", "LEFT"});
1196+
EXPECT_THAT(resp, RespArray(ElementsAre("list1", RespArray(ElementsAre("a")))));
1197+
1198+
// Pop from second list after first becomes empty
1199+
Run({"lmpop", "1", "list1", "LEFT"}); // Empty list1
1200+
resp = Run({"lmpop", "3", "list1", "list2", "list3", "RIGHT", "COUNT", "2"});
1201+
EXPECT_THAT(resp, RespArray(ElementsAre("list2", RespArray(ElementsAre("d", "c")))));
1202+
1203+
// Verify third list remains untouched
1204+
resp = Run({"lrange", "list3", "0", "-1"});
1205+
EXPECT_THAT(resp.GetVec(), ElementsAre("e", "f"));
1206+
}
1207+
1208+
TEST_F(ListFamilyTest, LMPopEdgeCases) {
1209+
// Test with empty list
1210+
Run({"rpush", "empty_list", "a"});
1211+
Run({"lpop", "empty_list"});
1212+
auto resp = Run({"lmpop", "1", "empty_list", "LEFT"});
1213+
EXPECT_THAT(resp, ArgType(RespExpr::NIL));
1214+
1215+
// Test with non-existent list
1216+
resp = Run({"lmpop", "1", "nonexistent", "LEFT"});
1217+
EXPECT_THAT(resp, ArgType(RespExpr::NIL));
1218+
1219+
// Test with wrong type key
1220+
Run({"set", "string_key", "value"});
1221+
resp = Run({"lmpop", "1", "string_key", "LEFT"});
1222+
EXPECT_THAT(resp, ErrArg("WRONGTYPE Operation against a key holding the wrong kind of value"));
1223+
1224+
// Test without COUNT parameter - should return 1 element by default
1225+
Run({"rpush", "list", "a", "b"});
1226+
resp = Run({"lmpop", "1", "list", "LEFT"});
1227+
EXPECT_THAT(resp,
1228+
RespArray(ElementsAre(
1229+
"list", RespArray(ElementsAre("a"))))); // Should return 1 element by default
1230+
1231+
// Test with COUNT = 0 - should return error
1232+
resp = Run({"lmpop", "1", "list", "LEFT", "COUNT", "0"});
1233+
EXPECT_THAT(resp, RespArray(ElementsAre("list", RespArray(ElementsAre()))));
1234+
1235+
// Test with negative COUNT - should return error
1236+
resp = Run({"lmpop", "1", "list", "LEFT", "COUNT", "-1"});
1237+
EXPECT_THAT(resp, RespArray(ElementsAre("list", RespArray(ElementsAre("b")))));
1238+
}
1239+
1240+
TEST_F(ListFamilyTest, LMPopDocExample) {
1241+
// Try to pop from non-existing lists
1242+
auto resp = Run({"LMPOP", "2", "non1", "non2", "LEFT", "COUNT", "10"});
1243+
EXPECT_THAT(resp, ArgType(RespExpr::NIL));
1244+
1245+
// Create first list and test basic pop
1246+
resp = Run({"LPUSH", "mylist", "one", "two", "three", "four", "five"});
1247+
EXPECT_THAT(resp, IntArg(5));
1248+
1249+
resp = Run({"LMPOP", "1", "mylist", "LEFT"});
1250+
EXPECT_THAT(resp, RespArray(ElementsAre("mylist", RespArray(ElementsAre("five")))));
1251+
1252+
resp = Run({"LRANGE", "mylist", "0", "-1"});
1253+
EXPECT_THAT(resp.GetVec(), ElementsAre("four", "three", "two", "one"));
1254+
1255+
// Test RIGHT pop with COUNT
1256+
resp = Run({"LMPOP", "1", "mylist", "RIGHT", "COUNT", "10"});
1257+
EXPECT_THAT(resp, RespArray(ElementsAre("mylist",
1258+
RespArray(ElementsAre("one", "two", "three", "four")))));
1259+
1260+
// Create two lists and test multi-key pop
1261+
resp = Run({"LPUSH", "mylist", "one", "two", "three", "four", "five"});
1262+
EXPECT_THAT(resp, IntArg(5));
1263+
1264+
resp = Run({"LPUSH", "mylist2", "a", "b", "c", "d", "e"});
1265+
EXPECT_THAT(resp, IntArg(5));
1266+
1267+
resp = Run({"LMPOP", "2", "mylist", "mylist2", "RIGHT", "COUNT", "3"});
1268+
EXPECT_THAT(resp,
1269+
RespArray(ElementsAre("mylist", RespArray(ElementsAre("one", "two", "three")))));
1270+
1271+
resp = Run({"LRANGE", "mylist", "0", "-1"});
1272+
EXPECT_THAT(resp.GetVec(), ElementsAre("five", "four"));
1273+
1274+
resp = Run({"LMPOP", "2", "mylist", "mylist2", "RIGHT", "COUNT", "5"});
1275+
EXPECT_THAT(resp, RespArray(ElementsAre("mylist", RespArray(ElementsAre("four", "five")))));
1276+
1277+
resp = Run({"LMPOP", "2", "mylist", "mylist2", "RIGHT", "COUNT", "10"});
1278+
EXPECT_THAT(resp,
1279+
RespArray(ElementsAre("mylist2", RespArray(ElementsAre("a", "b", "c", "d", "e")))));
1280+
1281+
// Verify both lists are now empty
1282+
resp = Run({"EXISTS", "mylist", "mylist2"});
1283+
EXPECT_THAT(resp, IntArg(0));
1284+
}
1285+
1286+
TEST_F(ListFamilyTest, LMPopWrongType) {
1287+
// Setup: create a list and a hash
1288+
Run({"lpush", "l1", "e1"});
1289+
Run({"hset", "foo", "k1", "v1"});
1290+
1291+
// Test: first key is wrong type
1292+
auto resp = Run({"lmpop", "2", "foo", "l1", "left"});
1293+
EXPECT_THAT(resp, ErrArg("WRONGTYPE Operation against a key holding the wrong kind of value"));
1294+
1295+
// Test: second key is wrong type but first doesn't exist
1296+
resp = Run({"lmpop", "2", "nonexistent", "foo", "left"});
1297+
EXPECT_THAT(resp, ErrArg("WRONGTYPE Operation against a key holding the wrong kind of value"));
1298+
1299+
// Test: second key is wrong type but first is a valid list
1300+
resp = Run({"lmpop", "2", "l1", "foo", "left"});
1301+
EXPECT_THAT(resp, RespArray(ElementsAre("l1", RespArray(ElementsAre("e1")))));
1302+
}
1303+
1304+
// Reproduce a flow that trigerred a wrong DCHECK in the transaction flow.
1305+
TEST_F(ListFamilyTest, AwakeMulti) {
1306+
auto f1 = pp_->at(1)->LaunchFiber(Launch::dispatch, [&] {
1307+
for (unsigned i = 0; i < 100; ++i) {
1308+
Run("CONSUMER", {"blmove", "src", "dest", "LEFT", "LEFT", "0"});
1309+
};
1310+
});
1311+
auto f2 = pp_->at(1)->LaunchFiber([&] {
1312+
for (unsigned i = 0; i < 100; ++i) {
1313+
Run("PROD", {"lpush", "src", "a"});
1314+
ThisFiber::SleepFor(50us);
1315+
};
1316+
});
1317+
1318+
auto f3 = pp_->at(2)->LaunchFiber([&] {
1319+
for (unsigned i = 0; i < 100; ++i) {
1320+
Run({"multi"});
1321+
for (unsigned j = 0; j < 8; ++j) {
1322+
Run({"get", StrCat("key", j)});
1323+
};
1324+
Run({"exec"});
1325+
};
1326+
});
1327+
1328+
f1.Join();
1329+
f2.Join();
1330+
f3.Join();
1331+
}
1332+
10831333
#pragma GCC diagnostic pop
10841334
} // namespace dfly

tests/dragonfly/generic_test.py

Lines changed: 86 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
1-
import os
21
import logging
32
import pytest
43
import redis
54
import asyncio
65
from redis import asyncio as aioredis
76

87
from . import dfly_multi_test_args, dfly_args
9-
from .instance import DflyStartException
8+
from .instance import DflyInstance, DflyStartException
109
from .utility import batch_fill_data, gen_test_data, EnvironCntx
1110
from .seeder import StaticSeeder
1211

@@ -81,6 +80,91 @@ async def task2(k, n):
8180
)
8281

8382

83+
@dfly_args({"proactor_threads": 2, "num_shards": 2})
84+
async def test_blocking_multiple_dbs(async_client: aioredis.Redis, df_server: DflyInstance):
85+
active = True
86+
87+
# A task to trigger the flow that eventually looses a transaction
88+
# blmove is used to trigger a global deadlock, but we could use any
89+
# command - the effect would be - a deadlocking locally that connection
90+
async def blmove_task_loose(num):
91+
async def run(id):
92+
c = df_server.client()
93+
await c.lpush(f"key{id}", "val")
94+
while active:
95+
await c.blmove(f"key{id}", f"key{id}", 0, "LEFT", "LEFT")
96+
await asyncio.sleep(0.01)
97+
98+
tasks = []
99+
for i in range(num):
100+
tasks.append(run(i))
101+
102+
await asyncio.gather(*tasks)
103+
104+
# A task that creates continuation_trans_ by constantly timing out on
105+
# an empty set. We could probably use any 2-hop operation like rename.
106+
async def task_blocking(num):
107+
async def block(id):
108+
c = df_server.client()
109+
while active:
110+
await c.blmove(f"{{{id}}}from", f"{{{id}}}to", 0.1, "LEFT", "LEFT")
111+
112+
tasks = []
113+
for i in range(num):
114+
tasks.append(block(i))
115+
await asyncio.gather(*tasks)
116+
117+
# produce is constantly waking up consumers. It is used to trigger the
118+
# flow that creates wake ups on a differrent database in the
119+
# middle of continuation transaction.
120+
async def tasks_produce(num, iters):
121+
LPUSH_SCRIPT = """
122+
redis.call('LPUSH', KEYS[1], "val")
123+
"""
124+
125+
async def produce(id):
126+
c = df_server.client(db=1) # important to be on a different db
127+
for i in range(iters):
128+
# Must be a lua script and not multi-exec for some reason.
129+
await c.eval(LPUSH_SCRIPT, 1, f"list{{{id}}}")
130+
131+
tasks = []
132+
for i in range(num):
133+
task = asyncio.create_task(produce(i))
134+
tasks.append(task)
135+
136+
await asyncio.gather(*tasks)
137+
logging.info("Finished producing")
138+
139+
# works with producer to constantly block and wake up
140+
async def tasks_consume(num, iters):
141+
async def drain(id, iters):
142+
client = df_server.client(db=1)
143+
for _ in range(iters):
144+
await client.blmove(f"list{{{id}}}", f"sink{{{id}}}", 0, "LEFT", "LEFT")
145+
146+
tasks = []
147+
for i in range(num):
148+
task = asyncio.create_task(drain(i, iters))
149+
tasks.append(task)
150+
151+
await asyncio.gather(*tasks)
152+
logging.info("Finished consuming")
153+
154+
num_keys = 32
155+
num_iters = 200
156+
async_task1 = asyncio.create_task(blmove_task_loose(num_keys))
157+
async_task2 = asyncio.create_task(task_blocking(num_keys))
158+
logging.info("Starting tasks")
159+
await asyncio.gather(
160+
tasks_consume(num_keys, num_iters),
161+
tasks_produce(num_keys, num_iters),
162+
)
163+
logging.info("Finishing tasks")
164+
active = False
165+
await asyncio.gather(async_task1, async_task2)
166+
167+
84168
async def test_arg_from_environ_overwritten_by_cli(df_factory):
85169
with EnvironCntx(DFLY_port="6378"):
86170
with df_factory.create(port=6377):

0 commit comments

Comments
 (0)