Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2206,6 +2206,7 @@ private void startPeriodicRefreshOffersTimer() {
if (periodicRefreshOffersTimer == null)
periodicRefreshOffersTimer = UserThread.runPeriodically(() -> {
if (!stopped) {
log.info("Refreshing my open offers");
synchronized (openOffers.getList()) {
int size = openOffers.size();
//we clone our list as openOffers might change during our delayed call
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,13 +482,14 @@ private MoneroTxSet signAndPublishDisputePayoutTx(Trade trade) {
// TODO (monero-project): creating tx will require exchanging updated multisig hex if message needs reprocessed. provide weight with describe_transfer so fee can be estimated?
MoneroTxWallet feeEstimateTx = null;
try {
log.info("Creating dispute fee estimate tx for {} {}", getClass().getSimpleName(), trade.getShortId());
feeEstimateTx = createDisputePayoutTx(trade, dispute.getContract(), disputeResult, false);
} catch (Exception e) {
log.warn("Could not recreate dispute payout tx to verify fee: {}\n", e.getMessage(), e);
}
if (feeEstimateTx != null) {
HavenoUtils.verifyMinerFee(feeEstimateTx.getFee(), arbitratorSignedPayoutTx.getFee());
log.info("Dispute payout tx fee {} is within tolerance");
log.info("Dispute payout tx fee is within tolerance for {} {}", getClass().getSimpleName(), trade.getShortId());
}
} else {
disputeTxSet.setMultisigTxHex(trade.getPayoutTxHex());
Expand Down
20 changes: 18 additions & 2 deletions core/src/main/java/haveno/core/trade/SellerTrade.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,19 @@

import haveno.core.offer.Offer;
import haveno.core.trade.protocol.ProcessModel;
import haveno.core.trade.protocol.SellerProtocol;
import haveno.core.xmr.wallet.XmrWalletService;
import haveno.network.p2p.NodeAddress;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nullable;
import java.math.BigInteger;
import java.util.Date;

@Slf4j
public abstract class SellerTrade extends Trade {

private static final long resendPaymentReceivedMessagesDurationMs = 2L * 30 * 24 * 60 * 60 * 1000; // ~2 months

SellerTrade(Offer offer,
BigInteger tradeAmount,
long tradePrice,
Expand Down Expand Up @@ -62,7 +65,20 @@ public boolean confirmPermitted() {
}

public boolean isFinished() {
return super.isFinished() && ((SellerProtocol) getProtocol()).needsToResendPaymentReceivedMessages();
return super.isFinished() && !needsToResendPaymentReceivedMessages();
}

public boolean needsToResendPaymentReceivedMessages() {
return !isShutDownStarted() && getState().ordinal() >= Trade.State.SELLER_SENT_PAYMENT_RECEIVED_MSG.ordinal() && !getProcessModel().isPaymentReceivedMessagesReceived() && resendPaymentReceivedMessagesEnabled() && resendPaymentReceivedMessagesWithinDuration();
}

private boolean resendPaymentReceivedMessagesEnabled() {
return getOffer().getOfferPayload().getProtocolVersion() >= 2;
}

public boolean resendPaymentReceivedMessagesWithinDuration() {
Date startDate = getMaxTradePeriodDate(); // TODO: preferably use the date when the payment receipt was confirmed
return new Date().getTime() <= (startDate.getTime() + resendPaymentReceivedMessagesDurationMs);
}
}

29 changes: 22 additions & 7 deletions core/src/main/java/haveno/core/trade/Trade.java
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,7 @@ public void initialize(ProcessModelServiceProvider serviceProvider) {
if (newValue == Trade.PayoutState.PAYOUT_UNLOCKED) {
if (!isInitialized) return;
log.info("Payout unlocked for {} {}, deleting multisig wallet", getClass().getSimpleName(), getId());
if (isCompleted()) clearAndShutDown();
if (isInitialized && isFinished()) clearAndShutDown();
else deleteWallet();
}
});
Expand Down Expand Up @@ -838,7 +838,7 @@ public NodeAddress getArbitratorNodeAddress() {

public void setCompleted(boolean completed) {
this.isCompleted = completed;
if (isPayoutUnlocked()) clearAndShutDown();
if (isInitialized && isFinished()) clearAndShutDown();
}

///////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -1456,11 +1456,11 @@ private void doProcessPayoutTx(String payoutTxHex, boolean sign, boolean publish

// verify fee is within tolerance by recreating payout tx
// TODO (monero-project): creating tx will require exchanging updated multisig hex if message needs reprocessed. provide weight with describe_transfer so fee can be estimated?
log.info("Creating fee estimate tx for {} {}", getClass().getSimpleName(), getId());
log.info("Creating fee estimate tx for {} {}", getClass().getSimpleName(), getShortId());
saveWallet(); // save wallet before creating fee estimate tx
MoneroTxWallet feeEstimateTx = createPayoutTx();
HavenoUtils.verifyMinerFee(feeEstimateTx.getFee(), payoutTx.getFee());
log.info("Payout tx fee {} is within tolerance");
log.info("Payout tx fee is within tolerance for {} {}", getClass().getSimpleName(), getShortId());
}

// set signed payout tx hex
Expand Down Expand Up @@ -1558,6 +1558,11 @@ public boolean mediationResultAppliedPenaltyToSeller() {
}

public void clearAndShutDown() {

// unregister p2p message listener immediately
removeDecryptedDirectMessageListener();

// clear process data and shut down trade
ThreadUtils.execute(() -> {
clearProcessData();
onShutDownStarted();
Expand All @@ -1574,18 +1579,25 @@ private void clearProcessData() {
}

// TODO: clear other process data
setPayoutTxHex(null);
if (processModel.isPaymentReceivedMessagesReceived()) setPayoutTxHex(null);
for (TradePeer peer : getAllPeers()) {
peer.setUnsignedPayoutTxHex(null);
peer.setUpdatedMultisigHex(null);
peer.setDisputeClosedMessage(null);
peer.setPaymentSentMessage(null);
peer.setDepositTxHex(null);
peer.setDepositTxKey(null);
if (peer.isPaymentReceivedMessageReceived()) peer.setPaymentReceivedMessage(null);
if (peer.isPaymentReceivedMessageReceived()) {
peer.setUnsignedPayoutTxHex(null);
peer.setPaymentReceivedMessage(null);
}
}
}

private void removeDecryptedDirectMessageListener() {
if (getProcessModel() == null || getProcessModel().getProvider() == null || getProcessModel().getP2PService() == null) return;
getProcessModel().getP2PService().removeDecryptedDirectMessageListener(getProtocol());
}

public void maybeClearSensitiveData() {
String change = "";
if (contract != null && contract.maybeClearSensitiveData()) {
Expand Down Expand Up @@ -1620,6 +1632,9 @@ public void shutDown() {
isShutDownStarted = true;
if (!isPayoutUnlocked()) log.info("Shutting down {} {}", getClass().getSimpleName(), getId());

// unregister p2p message listener
removeDecryptedDirectMessageListener();

// create task to shut down trade
Runnable shutDownTask = () -> {

Expand Down
11 changes: 4 additions & 7 deletions core/src/main/java/haveno/core/trade/TradeManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -988,29 +988,26 @@ public void onTradeCompleted(Trade trade) {
if (trade.isCompleted()) throw new RuntimeException("Trade " + trade.getId() + " was already completed");
closedTradableManager.add(trade);
trade.setCompleted(true);
removeTrade(trade, true);
removeTrade(trade);
xmrWalletService.swapPayoutAddressEntryToAvailable(trade.getId()); // TODO The address entry should have been removed already. Check and if its the case remove that.
requestPersistence();
}

public void unregisterTrade(Trade trade) {
log.warn("Unregistering {} {}", trade.getClass().getSimpleName(), trade.getId());
removeTrade(trade, true);
removeTrade(trade);
removeFailedTrade(trade);
if (!trade.isMaker()) xmrWalletService.swapPayoutAddressEntryToAvailable(trade.getId()); // TODO The address entry should have been removed already. Check and if its the case remove that.
requestPersistence();
}

public void removeTrade(Trade trade, boolean removeDirectMessageListener) {
public void removeTrade(Trade trade) {
log.info("TradeManager.removeTrade() " + trade.getId());

// remove trade
synchronized (tradableList.getList()) {
if (!tradableList.remove(trade)) return;
}

// unregister message listener and persist
if (removeDirectMessageListener) p2PService.removeDecryptedDirectMessageListener(getTradeProtocol(trade));
requestPersistence();
}

Expand Down Expand Up @@ -1077,7 +1074,7 @@ private void updateTradePeriodState() {
// we move the trade to FailedTradesManager
public void onMoveInvalidTradeToFailedTrades(Trade trade) {
failedTradesManager.add(trade);
removeTrade(trade, false);
removeTrade(trade);
}

public void onMoveFailedTradeToPendingTrades(Trade trade) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,11 @@ public SellerProtocol(SellerTrade trade) {
protected void onInitialized() {
super.onInitialized();

// re-send payment received message if payout not published
// re-send payment received message if not acked
ThreadUtils.execute(() -> {
if (!needsToResendPaymentReceivedMessages()) return;
if (!((SellerTrade) trade).needsToResendPaymentReceivedMessages()) return;
synchronized (trade.getLock()) {
if (!needsToResendPaymentReceivedMessages()) return;
if (!!((SellerTrade) trade).needsToResendPaymentReceivedMessages()) return;
latchTrade();
given(anyPhase(Trade.Phase.PAYMENT_RECEIVED)
.with(SellerEvent.STARTUP))
Expand All @@ -93,13 +93,6 @@ protected void onInitialized() {
}, trade.getId());
}

public boolean needsToResendPaymentReceivedMessages() {
return !trade.isShutDownStarted() && trade.getState().ordinal() >= Trade.State.SELLER_SENT_PAYMENT_RECEIVED_MSG.ordinal() && !trade.getProcessModel().isPaymentReceivedMessagesReceived() && resendPaymentReceivedMessagesEnabled();
}

private boolean resendPaymentReceivedMessagesEnabled() {
return trade.getOffer().getOfferPayload().getProtocolVersion() >= 2;
}

@Override
protected void onTradeMessage(TradeMessage message, NodeAddress peer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,12 @@ else if (peer == trade.getArbitrator()) {
log.warn("Received AckMessage from unexpected peer for {}, sender={}, trade={} {}, messageUid={}, success={}, errorMsg={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid(), ackMessage.isSuccess(), ackMessage.getErrorMessage());
return;
}

// clear and shut down trade if completely finished after ack
if (trade.isFinished()) {
log.info("Trade {} {} is finished after PaymentReceivedMessage ACK, shutting it down", trade.getClass().getSimpleName(), trade.getId());
trade.clearAndShutDown();
}
}

// generic handling
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import haveno.core.account.witness.AccountAgeWitnessService;
import haveno.core.network.MessageState;
import haveno.core.trade.HavenoUtils;
import haveno.core.trade.SellerTrade;
import haveno.core.trade.Trade;
import haveno.core.trade.messages.PaymentReceivedMessage;
import haveno.core.trade.messages.TradeMailboxMessage;
Expand Down Expand Up @@ -236,6 +237,9 @@ protected boolean isMessageReceived() {
}

protected boolean stopSending() {
return isMessageReceived() || !trade.isPaymentReceived(); // stop if received or trade state reset // TODO: also stop after some number of blocks?
if (isMessageReceived()) return true; // stop if message received
if (!trade.isPaymentReceived()) return true; // stop if trade state reset
if (trade.isPayoutPublished() && !((SellerTrade) trade).resendPaymentReceivedMessagesWithinDuration()) return true; // stop if payout is published and we are not in the resend period
return false;
}
}
58 changes: 41 additions & 17 deletions p2p/src/main/java/haveno/network/p2p/P2PService.java
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,9 @@ public void onTorNodeReady() {

requestDataManager.requestPreliminaryData();
keepAliveManager.start();
p2pServiceListeners.forEach(SetupListener::onTorNodeReady);
synchronized (p2pServiceListeners) {
p2pServiceListeners.forEach(SetupListener::onTorNodeReady);
}
}

@Override
Expand All @@ -258,17 +260,23 @@ public void onHiddenServicePublished() {

hiddenServicePublished.set(true);

p2pServiceListeners.forEach(SetupListener::onHiddenServicePublished);
synchronized (p2pServiceListeners) {
p2pServiceListeners.forEach(SetupListener::onHiddenServicePublished);
}
}

@Override
public void onSetupFailed(Throwable throwable) {
p2pServiceListeners.forEach(e -> e.onSetupFailed(throwable));
synchronized (p2pServiceListeners) {
p2pServiceListeners.forEach(e -> e.onSetupFailed(throwable));
}
}

@Override
public void onRequestCustomBridges() {
p2pServiceListeners.forEach(SetupListener::onRequestCustomBridges);
synchronized (p2pServiceListeners) {
p2pServiceListeners.forEach(SetupListener::onRequestCustomBridges);
}
}

// Called from networkReadyBinding
Expand Down Expand Up @@ -304,7 +312,9 @@ public void onPreliminaryDataReceived() {

@Override
public void onUpdatedDataReceived() {
p2pServiceListeners.forEach(P2PServiceListener::onUpdatedDataReceived);
synchronized (p2pServiceListeners) {
p2pServiceListeners.forEach(P2PServiceListener::onUpdatedDataReceived);
}
}

@Override
Expand All @@ -314,7 +324,9 @@ public void onNoSeedNodeAvailable() {

@Override
public void onNoPeersAvailable() {
p2pServiceListeners.forEach(P2PServiceListener::onNoPeersAvailable);
synchronized (p2pServiceListeners) {
p2pServiceListeners.forEach(P2PServiceListener::onNoPeersAvailable);
}
}

@Override
Expand All @@ -334,7 +346,9 @@ private void applyIsBootstrapped(Consumer<P2PServiceListener> listenerHandler) {
mailboxMessageService.onBootstrapped();

// Once we have applied the state in the P2P domain we notify our listeners
p2pServiceListeners.forEach(listenerHandler);
synchronized (p2pServiceListeners) {
p2pServiceListeners.forEach(listenerHandler);
}

mailboxMessageService.initAfterBootstrapped();
}
Expand Down Expand Up @@ -369,12 +383,14 @@ public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
try {
DecryptedMessageWithPubKey decryptedMsg = encryptionService.decryptAndVerify(sealedMsg.getSealedAndSigned());
connection.maybeHandleSupportedCapabilitiesMessage(decryptedMsg.getNetworkEnvelope());
connection.getPeersNodeAddressOptional().ifPresentOrElse(nodeAddress ->
decryptedDirectMessageListeners.forEach(e -> e.onDirectMessage(decryptedMsg, nodeAddress)),
() -> {
log.error("peersNodeAddress is expected to be available at onMessage for " +
"processing PrefixedSealedAndSignedMessage.");
});
connection.getPeersNodeAddressOptional().ifPresentOrElse(nodeAddress -> {
synchronized (decryptedDirectMessageListeners) {
decryptedDirectMessageListeners.forEach(e -> e.onDirectMessage(decryptedMsg, nodeAddress));
}
}, () -> {
log.error("peersNodeAddress is expected to be available at onMessage for " +
"processing PrefixedSealedAndSignedMessage.");
});
} catch (CryptoException e) {
log.warn("Decryption of a direct message failed. This is not expected as the " +
"direct message was sent to our node.");
Expand Down Expand Up @@ -503,19 +519,27 @@ public boolean removeData(ProtectedStoragePayload protectedStoragePayload) {
///////////////////////////////////////////////////////////////////////////////////////////

public void addDecryptedDirectMessageListener(DecryptedDirectMessageListener listener) {
decryptedDirectMessageListeners.add(listener);
synchronized (decryptedDirectMessageListeners) {
decryptedDirectMessageListeners.add(listener);
}
}

public void removeDecryptedDirectMessageListener(DecryptedDirectMessageListener listener) {
decryptedDirectMessageListeners.remove(listener);
synchronized (decryptedDirectMessageListeners) {
decryptedDirectMessageListeners.remove(listener);
}
}

public void addP2PServiceListener(P2PServiceListener listener) {
p2pServiceListeners.add(listener);
synchronized (p2pServiceListeners) {
p2pServiceListeners.add(listener);
}
}

public void removeP2PServiceListener(P2PServiceListener listener) {
p2pServiceListeners.remove(listener);
synchronized (p2pServiceListeners) {
p2pServiceListeners.remove(listener);
}
}

public void addHashSetChangedListener(HashMapChangedListener hashMapChangedListener) {
Expand Down
Loading