Skip to content

Commit eb733eb

Browse files
committed
fixes to resend payment received messages up to 2 months
1 parent 017d8d5 commit eb733eb

File tree

8 files changed

+78
-30
lines changed

8 files changed

+78
-30
lines changed

core/src/main/java/haveno/core/offer/OpenOfferManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2206,6 +2206,7 @@ private void startPeriodicRefreshOffersTimer() {
22062206
if (periodicRefreshOffersTimer == null)
22072207
periodicRefreshOffersTimer = UserThread.runPeriodically(() -> {
22082208
if (!stopped) {
2209+
log.info("Refreshing my open offers");
22092210
synchronized (openOffers.getList()) {
22102211
int size = openOffers.size();
22112212
//we clone our list as openOffers might change during our delayed call

core/src/main/java/haveno/core/trade/SellerTrade.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public boolean confirmPermitted() {
6262
}
6363

6464
public boolean isFinished() {
65-
return super.isFinished() && ((SellerProtocol) getProtocol()).needsToResendPaymentReceivedMessages();
65+
return super.isFinished() && !((SellerProtocol) getProtocol()).needsToResendPaymentReceivedMessages();
6666
}
6767
}
6868

core/src/main/java/haveno/core/trade/Trade.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -720,7 +720,7 @@ public void initialize(ProcessModelServiceProvider serviceProvider) {
720720
if (newValue == Trade.PayoutState.PAYOUT_UNLOCKED) {
721721
if (!isInitialized) return;
722722
log.info("Payout unlocked for {} {}, deleting multisig wallet", getClass().getSimpleName(), getId());
723-
if (isCompleted()) clearAndShutDown();
723+
if (isInitialized && isFinished()) clearAndShutDown();
724724
else deleteWallet();
725725
}
726726
});
@@ -838,7 +838,7 @@ public NodeAddress getArbitratorNodeAddress() {
838838

839839
public void setCompleted(boolean completed) {
840840
this.isCompleted = completed;
841-
if (isPayoutUnlocked()) clearAndShutDown();
841+
if (isInitialized && isFinished()) clearAndShutDown();
842842
}
843843

844844
///////////////////////////////////////////////////////////////////////////////////////////
@@ -1558,6 +1558,11 @@ public boolean mediationResultAppliedPenaltyToSeller() {
15581558
}
15591559

15601560
public void clearAndShutDown() {
1561+
1562+
// unregister p2p message listener immediately
1563+
getProcessModel().getP2PService().removeDecryptedDirectMessageListener(getProtocol());
1564+
1565+
// clear process data and shut down trade
15611566
ThreadUtils.execute(() -> {
15621567
clearProcessData();
15631568
onShutDownStarted();
@@ -1620,6 +1625,9 @@ public void shutDown() {
16201625
isShutDownStarted = true;
16211626
if (!isPayoutUnlocked()) log.info("Shutting down {} {}", getClass().getSimpleName(), getId());
16221627

1628+
// unregister p2p message listener
1629+
getProcessModel().getP2PService().removeDecryptedDirectMessageListener(getProtocol());
1630+
16231631
// create task to shut down trade
16241632
Runnable shutDownTask = () -> {
16251633

core/src/main/java/haveno/core/trade/TradeManager.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -988,29 +988,26 @@ public void onTradeCompleted(Trade trade) {
988988
if (trade.isCompleted()) throw new RuntimeException("Trade " + trade.getId() + " was already completed");
989989
closedTradableManager.add(trade);
990990
trade.setCompleted(true);
991-
removeTrade(trade, true);
991+
removeTrade(trade);
992992
xmrWalletService.swapPayoutAddressEntryToAvailable(trade.getId()); // TODO The address entry should have been removed already. Check and if its the case remove that.
993993
requestPersistence();
994994
}
995995

996996
public void unregisterTrade(Trade trade) {
997997
log.warn("Unregistering {} {}", trade.getClass().getSimpleName(), trade.getId());
998-
removeTrade(trade, true);
998+
removeTrade(trade);
999999
removeFailedTrade(trade);
10001000
if (!trade.isMaker()) xmrWalletService.swapPayoutAddressEntryToAvailable(trade.getId()); // TODO The address entry should have been removed already. Check and if its the case remove that.
10011001
requestPersistence();
10021002
}
10031003

1004-
public void removeTrade(Trade trade, boolean removeDirectMessageListener) {
1004+
public void removeTrade(Trade trade) {
10051005
log.info("TradeManager.removeTrade() " + trade.getId());
10061006

10071007
// remove trade
10081008
synchronized (tradableList.getList()) {
10091009
if (!tradableList.remove(trade)) return;
10101010
}
1011-
1012-
// unregister message listener and persist
1013-
if (removeDirectMessageListener) p2PService.removeDecryptedDirectMessageListener(getTradeProtocol(trade));
10141011
requestPersistence();
10151012
}
10161013

@@ -1077,7 +1074,7 @@ private void updateTradePeriodState() {
10771074
// we move the trade to FailedTradesManager
10781075
public void onMoveInvalidTradeToFailedTrades(Trade trade) {
10791076
failedTradesManager.add(trade);
1080-
removeTrade(trade, false);
1077+
removeTrade(trade);
10811078
}
10821079

10831080
public void onMoveFailedTradeToPendingTrades(Trade trade) {

core/src/main/java/haveno/core/trade/protocol/SellerProtocol.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434

3535
package haveno.core.trade.protocol;
3636

37+
import java.util.Date;
38+
3739
import haveno.common.ThreadUtils;
3840
import haveno.common.handlers.ErrorMessageHandler;
3941
import haveno.common.handlers.ResultHandler;
@@ -53,6 +55,8 @@
5355

5456
@Slf4j
5557
public class SellerProtocol extends DisputeProtocol {
58+
59+
private static final long resendPaymentReceivedMessageDuration = 2L * 30 * 24 * 60 * 60 * 1000; // 2 months
5660

5761
enum SellerEvent implements FluentProtocol.Event {
5862
STARTUP,
@@ -68,7 +72,7 @@ public SellerProtocol(SellerTrade trade) {
6872
protected void onInitialized() {
6973
super.onInitialized();
7074

71-
// re-send payment received message if payout not published
75+
// re-send payment received message if payment received not acked
7276
ThreadUtils.execute(() -> {
7377
if (!needsToResendPaymentReceivedMessages()) return;
7478
synchronized (trade.getLock()) {
@@ -94,13 +98,18 @@ protected void onInitialized() {
9498
}
9599

96100
public boolean needsToResendPaymentReceivedMessages() {
97-
return !trade.isShutDownStarted() && trade.getState().ordinal() >= Trade.State.SELLER_SENT_PAYMENT_RECEIVED_MSG.ordinal() && !trade.getProcessModel().isPaymentReceivedMessagesReceived() && resendPaymentReceivedMessagesEnabled();
101+
return !trade.isShutDownStarted() && trade.getState().ordinal() >= Trade.State.SELLER_SENT_PAYMENT_RECEIVED_MSG.ordinal() && !trade.getProcessModel().isPaymentReceivedMessagesReceived() && resendPaymentReceivedMessagesEnabled() && resendPaymentReceivedMessagesWithinDuration();
98102
}
99103

100104
private boolean resendPaymentReceivedMessagesEnabled() {
101105
return trade.getOffer().getOfferPayload().getProtocolVersion() >= 2;
102106
}
103107

108+
private boolean resendPaymentReceivedMessagesWithinDuration() {
109+
Date startDate = trade.getMaxTradePeriodDate(); // TODO: preferably use the date when the payment receipt was confirmed
110+
return new Date().getTime() <= (startDate.getTime() + resendPaymentReceivedMessageDuration);
111+
}
112+
104113
@Override
105114
protected void onTradeMessage(TradeMessage message, NodeAddress peer) {
106115
super.onTradeMessage(message, peer);

core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -860,6 +860,12 @@ else if (peer == trade.getArbitrator()) {
860860
log.warn("Received AckMessage from unexpected peer for {}, sender={}, trade={} {}, messageUid={}, success={}, errorMsg={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid(), ackMessage.isSuccess(), ackMessage.getErrorMessage());
861861
return;
862862
}
863+
864+
// clear and shut down trade if completely finished after ack
865+
if (trade.isFinished()) {
866+
log.info("Trade {} {} is finished after PaymentReceivedMessage ACK, shutting it down", trade.getClass().getSimpleName(), trade.getId());
867+
trade.clearAndShutDown();
868+
}
863869
}
864870

865871
// generic handling

core/src/main/java/haveno/core/trade/protocol/tasks/SellerSendPaymentReceivedMessage.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import haveno.core.trade.Trade;
4949
import haveno.core.trade.messages.PaymentReceivedMessage;
5050
import haveno.core.trade.messages.TradeMailboxMessage;
51+
import haveno.core.trade.protocol.SellerProtocol;
5152
import haveno.core.trade.protocol.TradePeer;
5253
import haveno.core.util.JsonUtil;
5354
import haveno.network.p2p.NodeAddress;
@@ -236,6 +237,8 @@ protected boolean isMessageReceived() {
236237
}
237238

238239
protected boolean stopSending() {
239-
return isMessageReceived() || !trade.isPaymentReceived(); // stop if received or trade state reset // TODO: also stop after some number of blocks?
240+
if (isMessageReceived()) return true;
241+
if (!((SellerProtocol) trade.getProtocol()).needsToResendPaymentReceivedMessages()) return true;
242+
return false;
240243
}
241244
}

p2p/src/main/java/haveno/network/p2p/P2PService.java

Lines changed: 41 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,9 @@ public void onTorNodeReady() {
249249

250250
requestDataManager.requestPreliminaryData();
251251
keepAliveManager.start();
252-
p2pServiceListeners.forEach(SetupListener::onTorNodeReady);
252+
synchronized (p2pServiceListeners) {
253+
p2pServiceListeners.forEach(SetupListener::onTorNodeReady);
254+
}
253255
}
254256

255257
@Override
@@ -258,17 +260,23 @@ public void onHiddenServicePublished() {
258260

259261
hiddenServicePublished.set(true);
260262

261-
p2pServiceListeners.forEach(SetupListener::onHiddenServicePublished);
263+
synchronized (p2pServiceListeners) {
264+
p2pServiceListeners.forEach(SetupListener::onHiddenServicePublished);
265+
}
262266
}
263267

264268
@Override
265269
public void onSetupFailed(Throwable throwable) {
266-
p2pServiceListeners.forEach(e -> e.onSetupFailed(throwable));
270+
synchronized (p2pServiceListeners) {
271+
p2pServiceListeners.forEach(e -> e.onSetupFailed(throwable));
272+
}
267273
}
268274

269275
@Override
270276
public void onRequestCustomBridges() {
271-
p2pServiceListeners.forEach(SetupListener::onRequestCustomBridges);
277+
synchronized (p2pServiceListeners) {
278+
p2pServiceListeners.forEach(SetupListener::onRequestCustomBridges);
279+
}
272280
}
273281

274282
// Called from networkReadyBinding
@@ -304,7 +312,9 @@ public void onPreliminaryDataReceived() {
304312

305313
@Override
306314
public void onUpdatedDataReceived() {
307-
p2pServiceListeners.forEach(P2PServiceListener::onUpdatedDataReceived);
315+
synchronized (p2pServiceListeners) {
316+
p2pServiceListeners.forEach(P2PServiceListener::onUpdatedDataReceived);
317+
}
308318
}
309319

310320
@Override
@@ -314,7 +324,9 @@ public void onNoSeedNodeAvailable() {
314324

315325
@Override
316326
public void onNoPeersAvailable() {
317-
p2pServiceListeners.forEach(P2PServiceListener::onNoPeersAvailable);
327+
synchronized (p2pServiceListeners) {
328+
p2pServiceListeners.forEach(P2PServiceListener::onNoPeersAvailable);
329+
}
318330
}
319331

320332
@Override
@@ -334,7 +346,9 @@ private void applyIsBootstrapped(Consumer<P2PServiceListener> listenerHandler) {
334346
mailboxMessageService.onBootstrapped();
335347

336348
// Once we have applied the state in the P2P domain we notify our listeners
337-
p2pServiceListeners.forEach(listenerHandler);
349+
synchronized (p2pServiceListeners) {
350+
p2pServiceListeners.forEach(listenerHandler);
351+
}
338352

339353
mailboxMessageService.initAfterBootstrapped();
340354
}
@@ -369,12 +383,14 @@ public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
369383
try {
370384
DecryptedMessageWithPubKey decryptedMsg = encryptionService.decryptAndVerify(sealedMsg.getSealedAndSigned());
371385
connection.maybeHandleSupportedCapabilitiesMessage(decryptedMsg.getNetworkEnvelope());
372-
connection.getPeersNodeAddressOptional().ifPresentOrElse(nodeAddress ->
373-
decryptedDirectMessageListeners.forEach(e -> e.onDirectMessage(decryptedMsg, nodeAddress)),
374-
() -> {
375-
log.error("peersNodeAddress is expected to be available at onMessage for " +
376-
"processing PrefixedSealedAndSignedMessage.");
377-
});
386+
connection.getPeersNodeAddressOptional().ifPresentOrElse(nodeAddress -> {
387+
synchronized (decryptedDirectMessageListeners) {
388+
decryptedDirectMessageListeners.forEach(e -> e.onDirectMessage(decryptedMsg, nodeAddress));
389+
}
390+
}, () -> {
391+
log.error("peersNodeAddress is expected to be available at onMessage for " +
392+
"processing PrefixedSealedAndSignedMessage.");
393+
});
378394
} catch (CryptoException e) {
379395
log.warn("Decryption of a direct message failed. This is not expected as the " +
380396
"direct message was sent to our node.");
@@ -503,19 +519,27 @@ public boolean removeData(ProtectedStoragePayload protectedStoragePayload) {
503519
///////////////////////////////////////////////////////////////////////////////////////////
504520

505521
public void addDecryptedDirectMessageListener(DecryptedDirectMessageListener listener) {
506-
decryptedDirectMessageListeners.add(listener);
522+
synchronized (decryptedDirectMessageListeners) {
523+
decryptedDirectMessageListeners.add(listener);
524+
}
507525
}
508526

509527
public void removeDecryptedDirectMessageListener(DecryptedDirectMessageListener listener) {
510-
decryptedDirectMessageListeners.remove(listener);
528+
synchronized (decryptedDirectMessageListeners) {
529+
decryptedDirectMessageListeners.remove(listener);
530+
}
511531
}
512532

513533
public void addP2PServiceListener(P2PServiceListener listener) {
514-
p2pServiceListeners.add(listener);
534+
synchronized (p2pServiceListeners) {
535+
p2pServiceListeners.add(listener);
536+
}
515537
}
516538

517539
public void removeP2PServiceListener(P2PServiceListener listener) {
518-
p2pServiceListeners.remove(listener);
540+
synchronized (p2pServiceListeners) {
541+
p2pServiceListeners.remove(listener);
542+
}
519543
}
520544

521545
public void addHashSetChangedListener(HashMapChangedListener hashMapChangedListener) {

0 commit comments

Comments
 (0)