From c0edf7b6536405d41f05f31e9903a7676beecadc Mon Sep 17 00:00:00 2001 From: woodser <13068859+woodser@users.noreply.github.com> Date: Tue, 29 Jul 2025 19:58:35 -0400 Subject: [PATCH] fixes to resend payment received messages up to 2 months --- .../haveno/core/offer/OpenOfferManager.java | 1 + .../arbitration/ArbitrationManager.java | 3 +- .../java/haveno/core/trade/SellerTrade.java | 20 ++++++- .../main/java/haveno/core/trade/Trade.java | 29 +++++++--- .../java/haveno/core/trade/TradeManager.java | 11 ++-- .../core/trade/protocol/SellerProtocol.java | 13 +---- .../core/trade/protocol/TradeProtocol.java | 6 ++ .../SellerSendPaymentReceivedMessage.java | 6 +- .../java/haveno/network/p2p/P2PService.java | 58 +++++++++++++------ 9 files changed, 102 insertions(+), 45 deletions(-) diff --git a/core/src/main/java/haveno/core/offer/OpenOfferManager.java b/core/src/main/java/haveno/core/offer/OpenOfferManager.java index e4895fbe075..27bba08d5d6 100644 --- a/core/src/main/java/haveno/core/offer/OpenOfferManager.java +++ b/core/src/main/java/haveno/core/offer/OpenOfferManager.java @@ -2206,6 +2206,7 @@ private void startPeriodicRefreshOffersTimer() { if (periodicRefreshOffersTimer == null) periodicRefreshOffersTimer = UserThread.runPeriodically(() -> { if (!stopped) { + log.info("Refreshing my open offers"); synchronized (openOffers.getList()) { int size = openOffers.size(); //we clone our list as openOffers might change during our delayed call diff --git a/core/src/main/java/haveno/core/support/dispute/arbitration/ArbitrationManager.java b/core/src/main/java/haveno/core/support/dispute/arbitration/ArbitrationManager.java index 848dbdb6f50..6b54e3fb48a 100644 --- a/core/src/main/java/haveno/core/support/dispute/arbitration/ArbitrationManager.java +++ b/core/src/main/java/haveno/core/support/dispute/arbitration/ArbitrationManager.java @@ -482,13 +482,14 @@ private MoneroTxSet signAndPublishDisputePayoutTx(Trade trade) { // TODO (monero-project): creating tx will require exchanging updated multisig hex if message needs reprocessed. provide weight with describe_transfer so fee can be estimated? MoneroTxWallet feeEstimateTx = null; try { + log.info("Creating dispute fee estimate tx for {} {}", getClass().getSimpleName(), trade.getShortId()); feeEstimateTx = createDisputePayoutTx(trade, dispute.getContract(), disputeResult, false); } catch (Exception e) { log.warn("Could not recreate dispute payout tx to verify fee: {}\n", e.getMessage(), e); } if (feeEstimateTx != null) { HavenoUtils.verifyMinerFee(feeEstimateTx.getFee(), arbitratorSignedPayoutTx.getFee()); - log.info("Dispute payout tx fee {} is within tolerance"); + log.info("Dispute payout tx fee is within tolerance for {} {}", getClass().getSimpleName(), trade.getShortId()); } } else { disputeTxSet.setMultisigTxHex(trade.getPayoutTxHex()); diff --git a/core/src/main/java/haveno/core/trade/SellerTrade.java b/core/src/main/java/haveno/core/trade/SellerTrade.java index ddccfe59c3d..6343ce1073a 100644 --- a/core/src/main/java/haveno/core/trade/SellerTrade.java +++ b/core/src/main/java/haveno/core/trade/SellerTrade.java @@ -19,16 +19,19 @@ import haveno.core.offer.Offer; import haveno.core.trade.protocol.ProcessModel; -import haveno.core.trade.protocol.SellerProtocol; import haveno.core.xmr.wallet.XmrWalletService; import haveno.network.p2p.NodeAddress; import lombok.extern.slf4j.Slf4j; import javax.annotation.Nullable; import java.math.BigInteger; +import java.util.Date; @Slf4j public abstract class SellerTrade extends Trade { + + private static final long resendPaymentReceivedMessagesDurationMs = 2L * 30 * 24 * 60 * 60 * 1000; // ~2 months + SellerTrade(Offer offer, BigInteger tradeAmount, long tradePrice, @@ -62,7 +65,20 @@ public boolean confirmPermitted() { } public boolean isFinished() { - return super.isFinished() && ((SellerProtocol) getProtocol()).needsToResendPaymentReceivedMessages(); + return super.isFinished() && !needsToResendPaymentReceivedMessages(); + } + + public boolean needsToResendPaymentReceivedMessages() { + return !isShutDownStarted() && getState().ordinal() >= Trade.State.SELLER_SENT_PAYMENT_RECEIVED_MSG.ordinal() && !getProcessModel().isPaymentReceivedMessagesReceived() && resendPaymentReceivedMessagesEnabled() && resendPaymentReceivedMessagesWithinDuration(); + } + + private boolean resendPaymentReceivedMessagesEnabled() { + return getOffer().getOfferPayload().getProtocolVersion() >= 2; + } + + public boolean resendPaymentReceivedMessagesWithinDuration() { + Date startDate = getMaxTradePeriodDate(); // TODO: preferably use the date when the payment receipt was confirmed + return new Date().getTime() <= (startDate.getTime() + resendPaymentReceivedMessagesDurationMs); } } diff --git a/core/src/main/java/haveno/core/trade/Trade.java b/core/src/main/java/haveno/core/trade/Trade.java index 9772a71f7c9..faca3cf57e1 100644 --- a/core/src/main/java/haveno/core/trade/Trade.java +++ b/core/src/main/java/haveno/core/trade/Trade.java @@ -720,7 +720,7 @@ public void initialize(ProcessModelServiceProvider serviceProvider) { if (newValue == Trade.PayoutState.PAYOUT_UNLOCKED) { if (!isInitialized) return; log.info("Payout unlocked for {} {}, deleting multisig wallet", getClass().getSimpleName(), getId()); - if (isCompleted()) clearAndShutDown(); + if (isInitialized && isFinished()) clearAndShutDown(); else deleteWallet(); } }); @@ -838,7 +838,7 @@ public NodeAddress getArbitratorNodeAddress() { public void setCompleted(boolean completed) { this.isCompleted = completed; - if (isPayoutUnlocked()) clearAndShutDown(); + if (isInitialized && isFinished()) clearAndShutDown(); } /////////////////////////////////////////////////////////////////////////////////////////// @@ -1456,11 +1456,11 @@ private void doProcessPayoutTx(String payoutTxHex, boolean sign, boolean publish // verify fee is within tolerance by recreating payout tx // TODO (monero-project): creating tx will require exchanging updated multisig hex if message needs reprocessed. provide weight with describe_transfer so fee can be estimated? - log.info("Creating fee estimate tx for {} {}", getClass().getSimpleName(), getId()); + log.info("Creating fee estimate tx for {} {}", getClass().getSimpleName(), getShortId()); saveWallet(); // save wallet before creating fee estimate tx MoneroTxWallet feeEstimateTx = createPayoutTx(); HavenoUtils.verifyMinerFee(feeEstimateTx.getFee(), payoutTx.getFee()); - log.info("Payout tx fee {} is within tolerance"); + log.info("Payout tx fee is within tolerance for {} {}", getClass().getSimpleName(), getShortId()); } // set signed payout tx hex @@ -1558,6 +1558,11 @@ public boolean mediationResultAppliedPenaltyToSeller() { } public void clearAndShutDown() { + + // unregister p2p message listener immediately + removeDecryptedDirectMessageListener(); + + // clear process data and shut down trade ThreadUtils.execute(() -> { clearProcessData(); onShutDownStarted(); @@ -1574,18 +1579,25 @@ private void clearProcessData() { } // TODO: clear other process data - setPayoutTxHex(null); + if (processModel.isPaymentReceivedMessagesReceived()) setPayoutTxHex(null); for (TradePeer peer : getAllPeers()) { - peer.setUnsignedPayoutTxHex(null); peer.setUpdatedMultisigHex(null); peer.setDisputeClosedMessage(null); peer.setPaymentSentMessage(null); peer.setDepositTxHex(null); peer.setDepositTxKey(null); - if (peer.isPaymentReceivedMessageReceived()) peer.setPaymentReceivedMessage(null); + if (peer.isPaymentReceivedMessageReceived()) { + peer.setUnsignedPayoutTxHex(null); + peer.setPaymentReceivedMessage(null); + } } } + private void removeDecryptedDirectMessageListener() { + if (getProcessModel() == null || getProcessModel().getProvider() == null || getProcessModel().getP2PService() == null) return; + getProcessModel().getP2PService().removeDecryptedDirectMessageListener(getProtocol()); + } + public void maybeClearSensitiveData() { String change = ""; if (contract != null && contract.maybeClearSensitiveData()) { @@ -1620,6 +1632,9 @@ public void shutDown() { isShutDownStarted = true; if (!isPayoutUnlocked()) log.info("Shutting down {} {}", getClass().getSimpleName(), getId()); + // unregister p2p message listener + removeDecryptedDirectMessageListener(); + // create task to shut down trade Runnable shutDownTask = () -> { diff --git a/core/src/main/java/haveno/core/trade/TradeManager.java b/core/src/main/java/haveno/core/trade/TradeManager.java index 1a8e2236e59..62c3478e32e 100644 --- a/core/src/main/java/haveno/core/trade/TradeManager.java +++ b/core/src/main/java/haveno/core/trade/TradeManager.java @@ -988,29 +988,26 @@ public void onTradeCompleted(Trade trade) { if (trade.isCompleted()) throw new RuntimeException("Trade " + trade.getId() + " was already completed"); closedTradableManager.add(trade); trade.setCompleted(true); - removeTrade(trade, true); + removeTrade(trade); xmrWalletService.swapPayoutAddressEntryToAvailable(trade.getId()); // TODO The address entry should have been removed already. Check and if its the case remove that. requestPersistence(); } public void unregisterTrade(Trade trade) { log.warn("Unregistering {} {}", trade.getClass().getSimpleName(), trade.getId()); - removeTrade(trade, true); + removeTrade(trade); removeFailedTrade(trade); if (!trade.isMaker()) xmrWalletService.swapPayoutAddressEntryToAvailable(trade.getId()); // TODO The address entry should have been removed already. Check and if its the case remove that. requestPersistence(); } - public void removeTrade(Trade trade, boolean removeDirectMessageListener) { + public void removeTrade(Trade trade) { log.info("TradeManager.removeTrade() " + trade.getId()); // remove trade synchronized (tradableList.getList()) { if (!tradableList.remove(trade)) return; } - - // unregister message listener and persist - if (removeDirectMessageListener) p2PService.removeDecryptedDirectMessageListener(getTradeProtocol(trade)); requestPersistence(); } @@ -1077,7 +1074,7 @@ private void updateTradePeriodState() { // we move the trade to FailedTradesManager public void onMoveInvalidTradeToFailedTrades(Trade trade) { failedTradesManager.add(trade); - removeTrade(trade, false); + removeTrade(trade); } public void onMoveFailedTradeToPendingTrades(Trade trade) { diff --git a/core/src/main/java/haveno/core/trade/protocol/SellerProtocol.java b/core/src/main/java/haveno/core/trade/protocol/SellerProtocol.java index 2a01c5a2cac..a0aaa10b1f0 100644 --- a/core/src/main/java/haveno/core/trade/protocol/SellerProtocol.java +++ b/core/src/main/java/haveno/core/trade/protocol/SellerProtocol.java @@ -68,11 +68,11 @@ public SellerProtocol(SellerTrade trade) { protected void onInitialized() { super.onInitialized(); - // re-send payment received message if payout not published + // re-send payment received message if not acked ThreadUtils.execute(() -> { - if (!needsToResendPaymentReceivedMessages()) return; + if (!((SellerTrade) trade).needsToResendPaymentReceivedMessages()) return; synchronized (trade.getLock()) { - if (!needsToResendPaymentReceivedMessages()) return; + if (!!((SellerTrade) trade).needsToResendPaymentReceivedMessages()) return; latchTrade(); given(anyPhase(Trade.Phase.PAYMENT_RECEIVED) .with(SellerEvent.STARTUP)) @@ -93,13 +93,6 @@ protected void onInitialized() { }, trade.getId()); } - public boolean needsToResendPaymentReceivedMessages() { - return !trade.isShutDownStarted() && trade.getState().ordinal() >= Trade.State.SELLER_SENT_PAYMENT_RECEIVED_MSG.ordinal() && !trade.getProcessModel().isPaymentReceivedMessagesReceived() && resendPaymentReceivedMessagesEnabled(); - } - - private boolean resendPaymentReceivedMessagesEnabled() { - return trade.getOffer().getOfferPayload().getProtocolVersion() >= 2; - } @Override protected void onTradeMessage(TradeMessage message, NodeAddress peer) { diff --git a/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java b/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java index 4249e967a97..4a6947a5328 100644 --- a/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java +++ b/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java @@ -860,6 +860,12 @@ else if (peer == trade.getArbitrator()) { log.warn("Received AckMessage from unexpected peer for {}, sender={}, trade={} {}, messageUid={}, success={}, errorMsg={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid(), ackMessage.isSuccess(), ackMessage.getErrorMessage()); return; } + + // clear and shut down trade if completely finished after ack + if (trade.isFinished()) { + log.info("Trade {} {} is finished after PaymentReceivedMessage ACK, shutting it down", trade.getClass().getSimpleName(), trade.getId()); + trade.clearAndShutDown(); + } } // generic handling diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/SellerSendPaymentReceivedMessage.java b/core/src/main/java/haveno/core/trade/protocol/tasks/SellerSendPaymentReceivedMessage.java index 9fe31dba606..88650894f2e 100644 --- a/core/src/main/java/haveno/core/trade/protocol/tasks/SellerSendPaymentReceivedMessage.java +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/SellerSendPaymentReceivedMessage.java @@ -45,6 +45,7 @@ import haveno.core.account.witness.AccountAgeWitnessService; import haveno.core.network.MessageState; import haveno.core.trade.HavenoUtils; +import haveno.core.trade.SellerTrade; import haveno.core.trade.Trade; import haveno.core.trade.messages.PaymentReceivedMessage; import haveno.core.trade.messages.TradeMailboxMessage; @@ -236,6 +237,9 @@ protected boolean isMessageReceived() { } protected boolean stopSending() { - return isMessageReceived() || !trade.isPaymentReceived(); // stop if received or trade state reset // TODO: also stop after some number of blocks? + if (isMessageReceived()) return true; // stop if message received + if (!trade.isPaymentReceived()) return true; // stop if trade state reset + if (trade.isPayoutPublished() && !((SellerTrade) trade).resendPaymentReceivedMessagesWithinDuration()) return true; // stop if payout is published and we are not in the resend period + return false; } } diff --git a/p2p/src/main/java/haveno/network/p2p/P2PService.java b/p2p/src/main/java/haveno/network/p2p/P2PService.java index e7a810332e0..ec296d641eb 100644 --- a/p2p/src/main/java/haveno/network/p2p/P2PService.java +++ b/p2p/src/main/java/haveno/network/p2p/P2PService.java @@ -249,7 +249,9 @@ public void onTorNodeReady() { requestDataManager.requestPreliminaryData(); keepAliveManager.start(); - p2pServiceListeners.forEach(SetupListener::onTorNodeReady); + synchronized (p2pServiceListeners) { + p2pServiceListeners.forEach(SetupListener::onTorNodeReady); + } } @Override @@ -258,17 +260,23 @@ public void onHiddenServicePublished() { hiddenServicePublished.set(true); - p2pServiceListeners.forEach(SetupListener::onHiddenServicePublished); + synchronized (p2pServiceListeners) { + p2pServiceListeners.forEach(SetupListener::onHiddenServicePublished); + } } @Override public void onSetupFailed(Throwable throwable) { - p2pServiceListeners.forEach(e -> e.onSetupFailed(throwable)); + synchronized (p2pServiceListeners) { + p2pServiceListeners.forEach(e -> e.onSetupFailed(throwable)); + } } @Override public void onRequestCustomBridges() { - p2pServiceListeners.forEach(SetupListener::onRequestCustomBridges); + synchronized (p2pServiceListeners) { + p2pServiceListeners.forEach(SetupListener::onRequestCustomBridges); + } } // Called from networkReadyBinding @@ -304,7 +312,9 @@ public void onPreliminaryDataReceived() { @Override public void onUpdatedDataReceived() { - p2pServiceListeners.forEach(P2PServiceListener::onUpdatedDataReceived); + synchronized (p2pServiceListeners) { + p2pServiceListeners.forEach(P2PServiceListener::onUpdatedDataReceived); + } } @Override @@ -314,7 +324,9 @@ public void onNoSeedNodeAvailable() { @Override public void onNoPeersAvailable() { - p2pServiceListeners.forEach(P2PServiceListener::onNoPeersAvailable); + synchronized (p2pServiceListeners) { + p2pServiceListeners.forEach(P2PServiceListener::onNoPeersAvailable); + } } @Override @@ -334,7 +346,9 @@ private void applyIsBootstrapped(Consumer listenerHandler) { mailboxMessageService.onBootstrapped(); // Once we have applied the state in the P2P domain we notify our listeners - p2pServiceListeners.forEach(listenerHandler); + synchronized (p2pServiceListeners) { + p2pServiceListeners.forEach(listenerHandler); + } mailboxMessageService.initAfterBootstrapped(); } @@ -369,12 +383,14 @@ public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { try { DecryptedMessageWithPubKey decryptedMsg = encryptionService.decryptAndVerify(sealedMsg.getSealedAndSigned()); connection.maybeHandleSupportedCapabilitiesMessage(decryptedMsg.getNetworkEnvelope()); - connection.getPeersNodeAddressOptional().ifPresentOrElse(nodeAddress -> - decryptedDirectMessageListeners.forEach(e -> e.onDirectMessage(decryptedMsg, nodeAddress)), - () -> { - log.error("peersNodeAddress is expected to be available at onMessage for " + - "processing PrefixedSealedAndSignedMessage."); - }); + connection.getPeersNodeAddressOptional().ifPresentOrElse(nodeAddress -> { + synchronized (decryptedDirectMessageListeners) { + decryptedDirectMessageListeners.forEach(e -> e.onDirectMessage(decryptedMsg, nodeAddress)); + } + }, () -> { + log.error("peersNodeAddress is expected to be available at onMessage for " + + "processing PrefixedSealedAndSignedMessage."); + }); } catch (CryptoException e) { log.warn("Decryption of a direct message failed. This is not expected as the " + "direct message was sent to our node."); @@ -503,19 +519,27 @@ public boolean removeData(ProtectedStoragePayload protectedStoragePayload) { /////////////////////////////////////////////////////////////////////////////////////////// public void addDecryptedDirectMessageListener(DecryptedDirectMessageListener listener) { - decryptedDirectMessageListeners.add(listener); + synchronized (decryptedDirectMessageListeners) { + decryptedDirectMessageListeners.add(listener); + } } public void removeDecryptedDirectMessageListener(DecryptedDirectMessageListener listener) { - decryptedDirectMessageListeners.remove(listener); + synchronized (decryptedDirectMessageListeners) { + decryptedDirectMessageListeners.remove(listener); + } } public void addP2PServiceListener(P2PServiceListener listener) { - p2pServiceListeners.add(listener); + synchronized (p2pServiceListeners) { + p2pServiceListeners.add(listener); + } } public void removeP2PServiceListener(P2PServiceListener listener) { - p2pServiceListeners.remove(listener); + synchronized (p2pServiceListeners) { + p2pServiceListeners.remove(listener); + } } public void addHashSetChangedListener(HashMapChangedListener hashMapChangedListener) {