Skip to content

Fix deadlock between remove and pollFirstEntry #123

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 11, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,6 @@
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
*/target/
.vscode/settings.json
service/localserver.bat
service/rocksdb/
Original file line number Diff line number Diff line change
Expand Up @@ -638,20 +638,22 @@ public void getURLs(GetParams request, StreamObserver<URLInfo> responseObserver)
final QueueInterface currentQueue;
final QueueWithinCrawl currentCrawlQueue;

Entry<QueueWithinCrawl, QueueInterface> e = getQueues().firstEntry();
currentQueue = e.getValue();
currentCrawlQueue = e.getKey();

// to make sure we don't loop over the ones we already processed
if (firstCrawlQueue == null) {
firstCrawlQueue = currentCrawlQueue;
} else if (firstCrawlQueue.equals(currentCrawlQueue)) {
break;
}
synchronized (getQueues()) {
Entry<QueueWithinCrawl, QueueInterface> e = getQueues().firstEntry();
currentQueue = e.getValue();
currentCrawlQueue = e.getKey();

// to make sure we don't loop over the ones we already processed
if (firstCrawlQueue == null) {
firstCrawlQueue = currentCrawlQueue;
} else if (firstCrawlQueue.equals(currentCrawlQueue)) {
break;
}

// We remove the entry and put it at the end of the map
Entry<QueueWithinCrawl, QueueInterface> first = getQueues().pollFirstEntry();
getQueues().put(first.getKey(), first.getValue());
// We remove the entry and put it at the end of the map
Entry<QueueWithinCrawl, QueueInterface> first = getQueues().pollFirstEntry();
getQueues().put(first.getKey(), first.getValue());
}

// if a crawlID has been specified make sure it matches
if (crawlID != null && !currentCrawlQueue.getCrawlid().equals(crawlID)) {
Expand Down Expand Up @@ -916,7 +918,7 @@ public void listURLs(
long sentCount = 0;

Iterator<Entry<QueueWithinCrawl, QueueInterface>> qiterator =
getQueues().entrySet().iterator();
getQueues().entrySetView().iterator();

while (qiterator.hasNext() && sentCount < maxURLs) {
Entry<QueueWithinCrawl, QueueInterface> e = qiterator.next();
Expand Down Expand Up @@ -1009,7 +1011,7 @@ public void countURLs(
long totalCount = 0;

Iterator<Entry<QueueWithinCrawl, QueueInterface>> qiterator =
getQueues().entrySet().iterator();
getQueues().entrySetView().iterator();

while (qiterator.hasNext()) {
Entry<QueueWithinCrawl, QueueInterface> e = qiterator.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,11 @@ public interface ConcurrentInsertionOrderMap<K, V> extends ConcurrentMap<K, V> {
*/
@Override
Collection<V> values();

/**
* Returns a set view containing the mappings in this map which is not backed by the map. (Used
* in listURLs and countURLs to avoid NoSuchElementException if another thread has rotated the
* queue in getURLs)
*/
Set<Map.Entry<K, V>> entrySetView();
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.AbstractSet;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
Expand Down Expand Up @@ -49,10 +50,6 @@ public class ConcurrentOrderedMap<K, V> implements ConcurrentInsertionOrderMap<K

private final Striped<Lock> striped;

// Used for first entry tracking
private volatile Long firstEntryOrder = null;
private K firstEntryKey = null;

class ValueEntry {
public ValueEntry(V v, long o) {
this.value = v;
Expand Down Expand Up @@ -114,17 +111,8 @@ public V put(K key, V value) {
ventry.value = value;
} else {
long newOrder = insertionCounter.getAndIncrement();
insertionOrderMap.put(newOrder, key);
valueMap.put(key, new ValueEntry(value, newOrder));

if (firstEntryOrder == null) {
firstEntryOrder = newOrder;

synchronized (this) {
// Update first entry key if this is the first entry
firstEntryKey = key;
}
}
insertionOrderMap.put(newOrder, key);
}

return oldValue;
Expand Down Expand Up @@ -152,20 +140,6 @@ public V remove(Object key) {
// Remove from insertion order map if value existed
if (removed != null) {
insertionOrderMap.remove(removed.order);
if (removed.order == firstEntryOrder) {
// Update first entry order if the removed entry was the first one
try {
firstEntryOrder = insertionOrderMap.firstKey();
synchronized (this) {
firstEntryKey = insertionOrderMap.get(firstEntryOrder);
}
} catch (NoSuchElementException e) {
firstEntryOrder = null;
synchronized (this) {
firstEntryKey = null;
}
}
}
}

return (removed != null) ? removed.value : null;
Expand Down Expand Up @@ -208,6 +182,24 @@ public int size() {
};
}

@Override
public Set<Map.Entry<K, V>> entrySetView() {
LinkedHashSet<Map.Entry<K, V>> entrySet = new LinkedHashSet<>(valueMap.size());
insertionOrderMap.forEach(
(order, key) -> {
ValueEntry valueEntry = valueMap.get(key);
if (valueEntry != null) {
entrySet.add(new AbstractMap.SimpleImmutableEntry<>(key, valueEntry.value));
} else {
LOG.warn(
"Inconsistent state (entrySetView): key {} exists in order map but not in value map",
key);
}
});

return entrySet;
}

@Override
public Set<Map.Entry<K, V>> entrySet() {
return new AbstractSet<Map.Entry<K, V>>() {
Expand Down Expand Up @@ -271,11 +263,6 @@ public void clear() {
insertionOrderMap.clear();
insertionCounter.set(0);

firstEntryOrder = null;
synchronized (this) {
firstEntryKey = null;
}

} finally {
unlockAllStripes();
}
Expand Down Expand Up @@ -369,7 +356,8 @@ public Entry<K, V> firstEntry() {
return new AbstractMap.SimpleImmutableEntry<>(key, valueEntry.value);
} else {
LOG.error(
"Inconsistent state: key {} exists in order map but not in value map", key);
"Inconsistent state (firstEntry): key {} exists in order map but not in value map",
key);
return null;
}
} else {
Expand All @@ -382,49 +370,27 @@ public Entry<K, V> firstEntry() {
*/
public Entry<K, V> pollFirstEntry() {
K key;
Long order;
Lock stripe = null;

// Synchronize to ensure atomic read of firstEntryKey
synchronized (this) {
order = firstEntryOrder;
key = firstEntryKey;
if (key == null || order == null) {
return null;
}

stripe = getStripe(key);
stripe.lock();
Entry<Long, K> removed = insertionOrderMap.pollFirstEntry();
if (removed == null) {
return null;
}

try {
Entry<Long, K> removed = insertionOrderMap.pollFirstEntry();
if (removed == null) {
return null;
}
// Get and remove from value map
key = removed.getValue();

Lock stripe = getStripe(key);
stripe.lock();

// Get and remove from value map
ValueEntry valueEntry = valueMap.remove(removed.getValue());
try {
ValueEntry valueEntry = valueMap.remove(key);
if (valueEntry == null) {
LOG.error(
"Inconsistent state: key {} exists in order map but not in value map", key);
"Inconsistent state (pollFirstEntry): key {} exists in order map but not in value map",
key);
return null;
}

// Update first entry tracking
Entry<Long, K> newFirst = insertionOrderMap.firstEntry();
if (newFirst != null) {
firstEntryOrder = newFirst.getKey();
synchronized (this) {
firstEntryKey = newFirst.getValue();
}
} else {
firstEntryOrder = null;
synchronized (this) {
firstEntryKey = null;
}
}

return new AbstractMap.SimpleImmutableEntry<>(key, valueEntry.value);
} finally {
stripe.unlock();
Expand Down Expand Up @@ -464,17 +430,8 @@ public void putAll(Map<? extends K, ? extends V> m) {
ventry.value = entry.getValue();
} else {
long newOrder = insertionCounter.getAndIncrement();
insertionOrderMap.put(newOrder, key);
valueMap.put(key, new ValueEntry(entry.getValue(), newOrder));

if (firstEntryOrder == null) {
firstEntryOrder = newOrder;

synchronized (this) {
// Update first entry key if this is the first entry
firstEntryKey = key;
}
}
insertionOrderMap.put(newOrder, key);
}
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public MemoryFrontierService(final Map<String, String> configuration, String hos

// no explicit config
public MemoryFrontierService(String host, int port) {
this(new HashMap<String, String>(), host, port);
this(new HashMap<>(), host, port);
}

/**
Expand Down Expand Up @@ -122,31 +122,33 @@ protected AckMessage.Status putURLItem(URLItem value) {
QueueWithinCrawl qk = QueueWithinCrawl.get(key, iu.crawlID);

// get the priority queue or create one
URLQueue queue = (URLQueue) getQueues().get(qk);
if (queue == null) {
getQueues().put(qk, new URLQueue(iu));
return Status.OK;
}
synchronized (getQueues()) {
URLQueue queue = (URLQueue) getQueues().get(qk);
if (queue == null) {
getQueues().put(qk, new URLQueue(iu));
return Status.OK;
}

// check whether the URL already exists
if (queue.contains(iu)) {
if (discovered) {
putURLs_alreadyknown_count.inc();
// we already discovered it - so no need for it
return Status.SKIPPED;
} else {
// overwrite the existing version
queue.remove(iu);
// check whether the URL already exists
if (queue.contains(iu)) {
if (discovered) {
putURLs_alreadyknown_count.inc();
// we already discovered it - so no need for it
return Status.SKIPPED;
} else {
// overwrite the existing version
queue.remove(iu);
}
}
}

// add the new item
// unless it is an update and it's nextFetchDate is 0 == NEVER
if (!discovered && iu.nextFetchDate == 0) {
putURLs_completed_count.inc();
queue.addToCompleted(iu.url);
} else {
queue.add(iu);
// add the new item
// unless it is an update and it's nextFetchDate is 0 == NEVER
if (!discovered && iu.nextFetchDate == 0) {
putURLs_completed_count.inc();
queue.addToCompleted(iu.url);
} else {
queue.add(iu);
}
}

return Status.OK;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -699,13 +699,13 @@ protected long deleteLocalCrawl(String crawlID) {
if (samePrefix) {
if (startKey == null) {
startKey =
(prefixed_queue.getCrawlid().replaceAll("_", "%5F") + "_")
(prefixed_queue.getCrawlid().replace("_", "%5F") + "_")
.getBytes(StandardCharsets.UTF_8);
}
toDelete.add(prefixed_queue);
} else if (startKey != null) {
endKey =
(prefixed_queue.getCrawlid().replaceAll("_", "%5F") + "_")
(prefixed_queue.getCrawlid().replace("_", "%5F") + "_")
.getBytes(StandardCharsets.UTF_8);
break;
}
Expand Down
Loading