Skip to content

Commit cbc7a37

Browse files
authored
Fix deadlock between remove and pollFirstEntry (#123)
* Fix deadlock between remove and pollFirstEntry Resynchronize getURLs critical part Added entrySetView in ConcurrentOrderedMap (used in llistURLs and countURLs) Signed-off-by: Laurent Klock <Laurent.Klock@arhs-cube.com> * Revert changes specific to local setup Signed-off-by: Laurent Klock <Laurent.Klock@arhs-cube.com> --------- Signed-off-by: Laurent Klock <Laurent.Klock@arhs-cube.com>
1 parent 46dcbc6 commit cbc7a37

File tree

6 files changed

+133
-125
lines changed

6 files changed

+133
-125
lines changed

service/src/main/java/crawlercommons/urlfrontier/service/AbstractFrontierService.java

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -638,20 +638,22 @@ public void getURLs(GetParams request, StreamObserver<URLInfo> responseObserver)
638638
final QueueInterface currentQueue;
639639
final QueueWithinCrawl currentCrawlQueue;
640640

641-
Entry<QueueWithinCrawl, QueueInterface> e = getQueues().firstEntry();
642-
currentQueue = e.getValue();
643-
currentCrawlQueue = e.getKey();
644-
645-
// to make sure we don't loop over the ones we already processed
646-
if (firstCrawlQueue == null) {
647-
firstCrawlQueue = currentCrawlQueue;
648-
} else if (firstCrawlQueue.equals(currentCrawlQueue)) {
649-
break;
650-
}
641+
synchronized (getQueues()) {
642+
Entry<QueueWithinCrawl, QueueInterface> e = getQueues().firstEntry();
643+
currentQueue = e.getValue();
644+
currentCrawlQueue = e.getKey();
645+
646+
// to make sure we don't loop over the ones we already processed
647+
if (firstCrawlQueue == null) {
648+
firstCrawlQueue = currentCrawlQueue;
649+
} else if (firstCrawlQueue.equals(currentCrawlQueue)) {
650+
break;
651+
}
651652

652-
// We remove the entry and put it at the end of the map
653-
Entry<QueueWithinCrawl, QueueInterface> first = getQueues().pollFirstEntry();
654-
getQueues().put(first.getKey(), first.getValue());
653+
// We remove the entry and put it at the end of the map
654+
Entry<QueueWithinCrawl, QueueInterface> first = getQueues().pollFirstEntry();
655+
getQueues().put(first.getKey(), first.getValue());
656+
}
655657

656658
// if a crawlID has been specified make sure it matches
657659
if (crawlID != null && !currentCrawlQueue.getCrawlid().equals(crawlID)) {
@@ -916,7 +918,7 @@ public void listURLs(
916918
long sentCount = 0;
917919

918920
Iterator<Entry<QueueWithinCrawl, QueueInterface>> qiterator =
919-
getQueues().entrySet().iterator();
921+
getQueues().entrySetView().iterator();
920922

921923
while (qiterator.hasNext() && sentCount < maxURLs) {
922924
Entry<QueueWithinCrawl, QueueInterface> e = qiterator.next();
@@ -1009,7 +1011,7 @@ public void countURLs(
10091011
long totalCount = 0;
10101012

10111013
Iterator<Entry<QueueWithinCrawl, QueueInterface>> qiterator =
1012-
getQueues().entrySet().iterator();
1014+
getQueues().entrySetView().iterator();
10131015

10141016
while (qiterator.hasNext()) {
10151017
Entry<QueueWithinCrawl, QueueInterface> e = qiterator.next();

service/src/main/java/crawlercommons/urlfrontier/service/ConcurrentInsertionOrderMap.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,11 @@ public interface ConcurrentInsertionOrderMap<K, V> extends ConcurrentMap<K, V> {
4343
*/
4444
@Override
4545
Collection<V> values();
46+
47+
/**
48+
* Returns a set view containing the mappings in this map which is not backed by the map. (Used
49+
* in listURLs and countURLs to avoid NoSuchElementException if another thread has rotated the
50+
* queue in getURLs)
51+
*/
52+
Set<Map.Entry<K, V>> entrySetView();
4653
}

service/src/main/java/crawlercommons/urlfrontier/service/ConcurrentOrderedMap.java

Lines changed: 35 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import java.util.AbstractSet;
1010
import java.util.Collection;
1111
import java.util.Iterator;
12+
import java.util.LinkedHashSet;
1213
import java.util.Map;
1314
import java.util.NoSuchElementException;
1415
import java.util.Objects;
@@ -49,10 +50,6 @@ public class ConcurrentOrderedMap<K, V> implements ConcurrentInsertionOrderMap<K
4950

5051
private final Striped<Lock> striped;
5152

52-
// Used for first entry tracking
53-
private volatile Long firstEntryOrder = null;
54-
private K firstEntryKey = null;
55-
5653
class ValueEntry {
5754
public ValueEntry(V v, long o) {
5855
this.value = v;
@@ -114,17 +111,8 @@ public V put(K key, V value) {
114111
ventry.value = value;
115112
} else {
116113
long newOrder = insertionCounter.getAndIncrement();
117-
insertionOrderMap.put(newOrder, key);
118114
valueMap.put(key, new ValueEntry(value, newOrder));
119-
120-
if (firstEntryOrder == null) {
121-
firstEntryOrder = newOrder;
122-
123-
synchronized (this) {
124-
// Update first entry key if this is the first entry
125-
firstEntryKey = key;
126-
}
127-
}
115+
insertionOrderMap.put(newOrder, key);
128116
}
129117

130118
return oldValue;
@@ -152,20 +140,6 @@ public V remove(Object key) {
152140
// Remove from insertion order map if value existed
153141
if (removed != null) {
154142
insertionOrderMap.remove(removed.order);
155-
if (removed.order == firstEntryOrder) {
156-
// Update first entry order if the removed entry was the first one
157-
try {
158-
firstEntryOrder = insertionOrderMap.firstKey();
159-
synchronized (this) {
160-
firstEntryKey = insertionOrderMap.get(firstEntryOrder);
161-
}
162-
} catch (NoSuchElementException e) {
163-
firstEntryOrder = null;
164-
synchronized (this) {
165-
firstEntryKey = null;
166-
}
167-
}
168-
}
169143
}
170144

171145
return (removed != null) ? removed.value : null;
@@ -208,6 +182,24 @@ public int size() {
208182
};
209183
}
210184

185+
@Override
186+
public Set<Map.Entry<K, V>> entrySetView() {
187+
LinkedHashSet<Map.Entry<K, V>> entrySet = new LinkedHashSet<>(valueMap.size());
188+
insertionOrderMap.forEach(
189+
(order, key) -> {
190+
ValueEntry valueEntry = valueMap.get(key);
191+
if (valueEntry != null) {
192+
entrySet.add(new AbstractMap.SimpleImmutableEntry<>(key, valueEntry.value));
193+
} else {
194+
LOG.warn(
195+
"Inconsistent state (entrySetView): key {} exists in order map but not in value map",
196+
key);
197+
}
198+
});
199+
200+
return entrySet;
201+
}
202+
211203
@Override
212204
public Set<Map.Entry<K, V>> entrySet() {
213205
return new AbstractSet<Map.Entry<K, V>>() {
@@ -271,11 +263,6 @@ public void clear() {
271263
insertionOrderMap.clear();
272264
insertionCounter.set(0);
273265

274-
firstEntryOrder = null;
275-
synchronized (this) {
276-
firstEntryKey = null;
277-
}
278-
279266
} finally {
280267
unlockAllStripes();
281268
}
@@ -369,7 +356,8 @@ public Entry<K, V> firstEntry() {
369356
return new AbstractMap.SimpleImmutableEntry<>(key, valueEntry.value);
370357
} else {
371358
LOG.error(
372-
"Inconsistent state: key {} exists in order map but not in value map", key);
359+
"Inconsistent state (firstEntry): key {} exists in order map but not in value map",
360+
key);
373361
return null;
374362
}
375363
} else {
@@ -382,49 +370,27 @@ public Entry<K, V> firstEntry() {
382370
*/
383371
public Entry<K, V> pollFirstEntry() {
384372
K key;
385-
Long order;
386-
Lock stripe = null;
387-
388-
// Synchronize to ensure atomic read of firstEntryKey
389-
synchronized (this) {
390-
order = firstEntryOrder;
391-
key = firstEntryKey;
392-
if (key == null || order == null) {
393-
return null;
394-
}
395373

396-
stripe = getStripe(key);
397-
stripe.lock();
374+
Entry<Long, K> removed = insertionOrderMap.pollFirstEntry();
375+
if (removed == null) {
376+
return null;
398377
}
399378

400-
try {
401-
Entry<Long, K> removed = insertionOrderMap.pollFirstEntry();
402-
if (removed == null) {
403-
return null;
404-
}
379+
// Get and remove from value map
380+
key = removed.getValue();
381+
382+
Lock stripe = getStripe(key);
383+
stripe.lock();
405384

406-
// Get and remove from value map
407-
ValueEntry valueEntry = valueMap.remove(removed.getValue());
385+
try {
386+
ValueEntry valueEntry = valueMap.remove(key);
408387
if (valueEntry == null) {
409388
LOG.error(
410-
"Inconsistent state: key {} exists in order map but not in value map", key);
389+
"Inconsistent state (pollFirstEntry): key {} exists in order map but not in value map",
390+
key);
411391
return null;
412392
}
413393

414-
// Update first entry tracking
415-
Entry<Long, K> newFirst = insertionOrderMap.firstEntry();
416-
if (newFirst != null) {
417-
firstEntryOrder = newFirst.getKey();
418-
synchronized (this) {
419-
firstEntryKey = newFirst.getValue();
420-
}
421-
} else {
422-
firstEntryOrder = null;
423-
synchronized (this) {
424-
firstEntryKey = null;
425-
}
426-
}
427-
428394
return new AbstractMap.SimpleImmutableEntry<>(key, valueEntry.value);
429395
} finally {
430396
stripe.unlock();
@@ -464,17 +430,8 @@ public void putAll(Map<? extends K, ? extends V> m) {
464430
ventry.value = entry.getValue();
465431
} else {
466432
long newOrder = insertionCounter.getAndIncrement();
467-
insertionOrderMap.put(newOrder, key);
468433
valueMap.put(key, new ValueEntry(entry.getValue(), newOrder));
469-
470-
if (firstEntryOrder == null) {
471-
firstEntryOrder = newOrder;
472-
473-
synchronized (this) {
474-
// Update first entry key if this is the first entry
475-
firstEntryKey = key;
476-
}
477-
}
434+
insertionOrderMap.put(newOrder, key);
478435
}
479436
}
480437
} finally {

service/src/main/java/crawlercommons/urlfrontier/service/memory/MemoryFrontierService.java

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public MemoryFrontierService(final Map<String, String> configuration, String hos
3838

3939
// no explicit config
4040
public MemoryFrontierService(String host, int port) {
41-
this(new HashMap<String, String>(), host, port);
41+
this(new HashMap<>(), host, port);
4242
}
4343

4444
/**
@@ -122,31 +122,33 @@ protected AckMessage.Status putURLItem(URLItem value) {
122122
QueueWithinCrawl qk = QueueWithinCrawl.get(key, iu.crawlID);
123123

124124
// get the priority queue or create one
125-
URLQueue queue = (URLQueue) getQueues().get(qk);
126-
if (queue == null) {
127-
getQueues().put(qk, new URLQueue(iu));
128-
return Status.OK;
129-
}
125+
synchronized (getQueues()) {
126+
URLQueue queue = (URLQueue) getQueues().get(qk);
127+
if (queue == null) {
128+
getQueues().put(qk, new URLQueue(iu));
129+
return Status.OK;
130+
}
130131

131-
// check whether the URL already exists
132-
if (queue.contains(iu)) {
133-
if (discovered) {
134-
putURLs_alreadyknown_count.inc();
135-
// we already discovered it - so no need for it
136-
return Status.SKIPPED;
137-
} else {
138-
// overwrite the existing version
139-
queue.remove(iu);
132+
// check whether the URL already exists
133+
if (queue.contains(iu)) {
134+
if (discovered) {
135+
putURLs_alreadyknown_count.inc();
136+
// we already discovered it - so no need for it
137+
return Status.SKIPPED;
138+
} else {
139+
// overwrite the existing version
140+
queue.remove(iu);
141+
}
140142
}
141-
}
142143

143-
// add the new item
144-
// unless it is an update and it's nextFetchDate is 0 == NEVER
145-
if (!discovered && iu.nextFetchDate == 0) {
146-
putURLs_completed_count.inc();
147-
queue.addToCompleted(iu.url);
148-
} else {
149-
queue.add(iu);
144+
// add the new item
145+
// unless it is an update and it's nextFetchDate is 0 == NEVER
146+
if (!discovered && iu.nextFetchDate == 0) {
147+
putURLs_completed_count.inc();
148+
queue.addToCompleted(iu.url);
149+
} else {
150+
queue.add(iu);
151+
}
150152
}
151153

152154
return Status.OK;

service/src/main/java/crawlercommons/urlfrontier/service/rocksdb/RocksDBService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -699,13 +699,13 @@ protected long deleteLocalCrawl(String crawlID) {
699699
if (samePrefix) {
700700
if (startKey == null) {
701701
startKey =
702-
(prefixed_queue.getCrawlid().replaceAll("_", "%5F") + "_")
702+
(prefixed_queue.getCrawlid().replace("_", "%5F") + "_")
703703
.getBytes(StandardCharsets.UTF_8);
704704
}
705705
toDelete.add(prefixed_queue);
706706
} else if (startKey != null) {
707707
endKey =
708-
(prefixed_queue.getCrawlid().replaceAll("_", "%5F") + "_")
708+
(prefixed_queue.getCrawlid().replace("_", "%5F") + "_")
709709
.getBytes(StandardCharsets.UTF_8);
710710
break;
711711
}

0 commit comments

Comments
 (0)