Skip to content

Commit 483f2f2

Browse files
committed
resend payment received msgs until acked
1 parent f44d97c commit 483f2f2

File tree

8 files changed

+155
-8
lines changed

8 files changed

+155
-8
lines changed

core/src/main/java/haveno/core/trade/Trade.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,8 @@ public enum State {
195195
SELLER_SENT_PAYMENT_RECEIVED_MSG(Phase.PAYMENT_RECEIVED),
196196
SELLER_SEND_FAILED_PAYMENT_RECEIVED_MSG(Phase.PAYMENT_RECEIVED),
197197
SELLER_STORED_IN_MAILBOX_PAYMENT_RECEIVED_MSG(Phase.PAYMENT_RECEIVED),
198-
SELLER_SAW_ARRIVED_PAYMENT_RECEIVED_MSG(Phase.PAYMENT_RECEIVED);
198+
SELLER_SAW_ARRIVED_PAYMENT_RECEIVED_MSG(Phase.PAYMENT_RECEIVED),
199+
BUYER_RECEIVED_PAYMENT_RECEIVED_MSG(Phase.PAYMENT_RECEIVED);
199200

200201
@NotNull
201202
public Phase getPhase() {

core/src/main/java/haveno/core/trade/protocol/ProcessModel.java

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -161,15 +161,19 @@ public class ProcessModel implements Model, PersistablePayload {
161161
@Getter
162162
@Setter
163163
private boolean importMultisigHexScheduled;
164+
private ObjectProperty<Boolean> paymentAccountDecryptedProperty = new SimpleObjectProperty<>(false);
164165

165-
// We want to indicate the user the state of the message delivery of the
166-
// PaymentSentMessage. As well we do an automatic re-send in case it was not ACKed yet.
166+
// We want to indicate the user the state of the message delivery of the payment
167+
// confirmation messages. We do an automatic re-send in case it was not ACKed yet.
167168
// To enable that even after restart we persist the state.
168169
@Setter
169170
private ObjectProperty<MessageState> paymentSentMessageStatePropertySeller = new SimpleObjectProperty<>(MessageState.UNDEFINED);
170171
@Setter
171172
private ObjectProperty<MessageState> paymentSentMessageStatePropertyArbitrator = new SimpleObjectProperty<>(MessageState.UNDEFINED);
172-
private ObjectProperty<Boolean> paymentAccountDecryptedProperty = new SimpleObjectProperty<>(false);
173+
@Setter
174+
private ObjectProperty<MessageState> paymentReceivedMessageStatePropertyBuyer = new SimpleObjectProperty<>(MessageState.UNDEFINED);
175+
@Setter
176+
private ObjectProperty<MessageState> paymentReceivedMessageStatePropertyArbitrator = new SimpleObjectProperty<>(MessageState.UNDEFINED);
173177

174178
public ProcessModel(String offerId, String accountId, PubKeyRing pubKeyRing) {
175179
this(offerId, accountId, pubKeyRing, new TradePeer(), new TradePeer(), new TradePeer());
@@ -208,6 +212,8 @@ public protobuf.ProcessModel toProtoMessage() {
208212
.setFundsNeededForTrade(fundsNeededForTrade)
209213
.setPaymentSentMessageStateSeller(paymentSentMessageStatePropertySeller.get().name())
210214
.setPaymentSentMessageStateArbitrator(paymentSentMessageStatePropertyArbitrator.get().name())
215+
.setPaymentReceivedMessageStateBuyer(paymentReceivedMessageStatePropertyBuyer.get().name())
216+
.setPaymentReceivedMessageStateArbitrator(paymentReceivedMessageStatePropertyArbitrator.get().name())
211217
.setBuyerPayoutAmountFromMediation(buyerPayoutAmountFromMediation)
212218
.setSellerPayoutAmountFromMediation(sellerPayoutAmountFromMediation)
213219
.setTradeProtocolErrorHeight(tradeProtocolErrorHeight)
@@ -253,6 +259,14 @@ public static ProcessModel fromProto(protobuf.ProcessModel proto, CoreProtoResol
253259
MessageState paymentSentMessageStateArbitrator = ProtoUtil.enumFromProto(MessageState.class, paymentSentMessageStateArbitratorString);
254260
processModel.setPaymentSentMessageStateArbitrator(paymentSentMessageStateArbitrator);
255261

262+
String paymentReceivedMessageStateBuyerString = ProtoUtil.stringOrNullFromProto(proto.getPaymentReceivedMessageStateBuyer());
263+
MessageState paymentReceivedMessageStateBuyer = ProtoUtil.enumFromProto(MessageState.class, paymentReceivedMessageStateBuyerString);
264+
processModel.setPaymentReceivedMessageStateBuyer(paymentReceivedMessageStateBuyer);
265+
266+
String paymentReceivedMessageStateArbitratorString = ProtoUtil.stringOrNullFromProto(proto.getPaymentReceivedMessageStateArbitrator());
267+
MessageState paymentReceivedMessageStateArbitrator = ProtoUtil.enumFromProto(MessageState.class, paymentReceivedMessageStateArbitratorString);
268+
processModel.setPaymentReceivedMessageStateArbitrator(paymentReceivedMessageStateArbitrator);
269+
256270
return processModel;
257271
}
258272

@@ -293,6 +307,20 @@ void setPaymentSentAckMessageArbitrator(AckMessage ackMessage) {
293307
setPaymentSentMessageStateArbitrator(messageState);
294308
}
295309

310+
void setPaymentReceivedAckMessageBuyer(AckMessage ackMessage) {
311+
MessageState messageState = ackMessage.isSuccess() ?
312+
MessageState.ACKNOWLEDGED :
313+
MessageState.FAILED;
314+
setPaymentReceivedMessageStateBuyer(messageState);
315+
}
316+
317+
void setPaymentReceivedAckMessageArbitrator(AckMessage ackMessage) {
318+
MessageState messageState = ackMessage.isSuccess() ?
319+
MessageState.ACKNOWLEDGED :
320+
MessageState.FAILED;
321+
setPaymentReceivedMessageStateArbitrator(messageState);
322+
}
323+
296324
public void setPaymentSentMessageStateSeller(MessageState paymentSentMessageStateProperty) {
297325
this.paymentSentMessageStatePropertySeller.set(paymentSentMessageStateProperty);
298326
if (tradeManager != null) {
@@ -307,6 +335,20 @@ public void setPaymentSentMessageStateArbitrator(MessageState paymentSentMessage
307335
}
308336
}
309337

338+
public void setPaymentReceivedMessageStateBuyer(MessageState paymentReceivedMessageStateProperty) {
339+
this.paymentReceivedMessageStatePropertyBuyer.set(paymentReceivedMessageStateProperty);
340+
if (tradeManager != null) {
341+
tradeManager.requestPersistence();
342+
}
343+
}
344+
345+
public void setPaymentReceivedMessageStateArbitrator(MessageState paymentReceivedMessageStateProperty) {
346+
this.paymentReceivedMessageStatePropertyArbitrator.set(paymentReceivedMessageStateProperty);
347+
if (tradeManager != null) {
348+
tradeManager.requestPersistence();
349+
}
350+
}
351+
310352
public boolean isPaymentSentMessageAckedBySeller() {
311353
return paymentSentMessageStatePropertySeller.get() == MessageState.ACKNOWLEDGED;
312354
}
@@ -315,6 +357,14 @@ public boolean isPaymentSentMessageAckedByArbitrator() {
315357
return paymentSentMessageStatePropertyArbitrator.get() == MessageState.ACKNOWLEDGED;
316358
}
317359

360+
public boolean isPaymentReceivedMessageAckedByBuyer() {
361+
return paymentReceivedMessageStatePropertyBuyer.get() == MessageState.ACKNOWLEDGED;
362+
}
363+
364+
public boolean isPaymentReceivedMessageAckedByArbitrator() {
365+
return paymentReceivedMessageStatePropertyArbitrator.get() == MessageState.ACKNOWLEDGED;
366+
}
367+
318368
void setDepositTxSentAckMessage(AckMessage ackMessage) {
319369
MessageState messageState = ackMessage.isSuccess() ?
320370
MessageState.ACKNOWLEDGED :

core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ public void maybeReprocessPaymentSentMessage(boolean reprocessOnError) {
291291
return;
292292
}
293293

294-
log.warn("Reprocessing payment sent message for {} {}", trade.getClass().getSimpleName(), trade.getId());
294+
log.info("Processing PaymentSentMessage for {} {}", trade.getClass().getSimpleName(), trade.getId());
295295
handle(trade.getBuyer().getPaymentSentMessage(), trade.getBuyer().getPaymentSentMessage().getSenderNodeAddress(), reprocessOnError);
296296
}
297297
}, trade.getId());
@@ -307,7 +307,7 @@ public void maybeReprocessPaymentReceivedMessage(boolean reprocessOnError) {
307307
return;
308308
}
309309

310-
log.warn("Reprocessing payment received message for {} {}", trade.getClass().getSimpleName(), trade.getId());
310+
log.info("Processing PaymentReceivedMessage for {} {}", trade.getClass().getSimpleName(), trade.getId());
311311
handle(trade.getSeller().getPaymentReceivedMessage(), trade.getSeller().getPaymentReceivedMessage().getSenderNodeAddress(), reprocessOnError);
312312
}
313313
}, trade.getId());
@@ -726,6 +726,22 @@ private void onAckMessage(AckMessage ackMessage, NodeAddress sender) {
726726
}
727727
}
728728

729+
// handle ack for PaymentReceivedMessage, which automatically re-sends if not ACKed in a certain time
730+
if (ackMessage.getSourceMsgClassName().equals(PaymentReceivedMessage.class.getSimpleName())) {
731+
if (trade.getTradePeer(sender) == trade.getBuyer()) {
732+
processModel.setPaymentReceivedAckMessageBuyer(ackMessage);
733+
trade.setStateIfValidTransitionTo(Trade.State.BUYER_RECEIVED_PAYMENT_RECEIVED_MSG);
734+
processModel.getTradeManager().requestPersistence();
735+
} else if (trade.getTradePeer(sender) == trade.getArbitrator()) {
736+
processModel.setPaymentReceivedAckMessageArbitrator(ackMessage);
737+
processModel.getTradeManager().requestPersistence();
738+
} else if (!ackMessage.isSuccess()) {
739+
String err = "Received AckMessage with error state for " + ackMessage.getSourceMsgClassName() + " from "+ sender + " with tradeId " + trade.getId() + " and errorMessage=" + ackMessage.getErrorMessage();
740+
log.warn(err);
741+
return; // log error and ignore nack if not seller
742+
}
743+
}
744+
729745
if (ackMessage.isSuccess()) {
730746
log.info("Received AckMessage for {}, sender={}, trade={} {}, messageUid={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid());
731747

core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessage.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,6 @@ private void tryToSendAgainLater() {
185185
return;
186186
}
187187

188-
log.info("We will send the message again to the peer after a delay of {} min.", delayInMin);
189188
if (timer != null) {
190189
timer.stop();
191190
}

core/src/main/java/haveno/core/trade/protocol/tasks/SellerSendPaymentReceivedMessage.java

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,27 +35,39 @@
3535
package haveno.core.trade.protocol.tasks;
3636

3737
import com.google.common.base.Charsets;
38+
39+
import haveno.common.Timer;
40+
import haveno.common.UserThread;
3841
import haveno.common.crypto.PubKeyRing;
3942
import haveno.common.crypto.Sig;
4043
import haveno.common.taskrunner.TaskRunner;
4144
import haveno.core.account.sign.SignedWitness;
4245
import haveno.core.account.witness.AccountAgeWitnessService;
46+
import haveno.core.network.MessageState;
4347
import haveno.core.trade.HavenoUtils;
4448
import haveno.core.trade.Trade;
4549
import haveno.core.trade.messages.PaymentReceivedMessage;
4650
import haveno.core.trade.messages.TradeMailboxMessage;
4751
import haveno.core.trade.protocol.TradePeer;
4852
import haveno.core.util.JsonUtil;
4953
import haveno.network.p2p.NodeAddress;
54+
import javafx.beans.value.ChangeListener;
5055
import lombok.EqualsAndHashCode;
5156
import lombok.extern.slf4j.Slf4j;
5257

5358
import static com.google.common.base.Preconditions.checkArgument;
5459

60+
import java.util.concurrent.TimeUnit;
61+
5562
@Slf4j
5663
@EqualsAndHashCode(callSuper = true)
5764
public abstract class SellerSendPaymentReceivedMessage extends SendMailboxMessageTask {
58-
SignedWitness signedWitness = null;
65+
private SignedWitness signedWitness = null;
66+
private ChangeListener<MessageState> listener;
67+
private Timer timer;
68+
private static final int MAX_RESEND_ATTEMPTS = 20;
69+
private int delayInMin = 10;
70+
private int resendCounter = 0;
5971

6072
public SellerSendPaymentReceivedMessage(TaskRunner<Trade> taskHandler, Trade trade) {
6173
super(taskHandler, trade);
@@ -136,6 +148,7 @@ protected TradeMailboxMessage getTradeMailboxMessage(String tradeId) {
136148
protected void setStateSent() {
137149
trade.advanceState(Trade.State.SELLER_SENT_PAYMENT_RECEIVED_MSG);
138150
log.info("{} sent: tradeId={} at peer {} SignedWitness {}", getClass().getSimpleName(), trade.getId(), getReceiverNodeAddress(), signedWitness);
151+
tryToSendAgainLater();
139152
processModel.getTradeManager().requestPersistence();
140153
}
141154

@@ -159,4 +172,59 @@ protected void setStateArrived() {
159172
log.info("{} arrived: tradeId={} at peer {} SignedWitness {}", getClass().getSimpleName(), trade.getId(), getReceiverNodeAddress(), signedWitness);
160173
processModel.getTradeManager().requestPersistence();
161174
}
175+
176+
private void cleanup() {
177+
if (timer != null) {
178+
timer.stop();
179+
}
180+
if (listener != null) {
181+
processModel.getPaymentReceivedMessageStatePropertyBuyer().removeListener(listener);
182+
}
183+
}
184+
185+
private void tryToSendAgainLater() {
186+
187+
// skip if already acked
188+
if (isAckedByReceiver()) return;
189+
190+
if (resendCounter >= MAX_RESEND_ATTEMPTS) {
191+
cleanup();
192+
log.warn("We never received an ACK message when sending the PaymentReceivedMessage to the peer. We stop trying to send the message.");
193+
return;
194+
}
195+
196+
if (timer != null) {
197+
timer.stop();
198+
}
199+
200+
timer = UserThread.runAfter(this::run, delayInMin, TimeUnit.MINUTES);
201+
202+
if (resendCounter == 0) {
203+
listener = (observable, oldValue, newValue) -> onMessageStateChange(newValue);
204+
processModel.getPaymentReceivedMessageStatePropertyBuyer().addListener(listener);
205+
onMessageStateChange(processModel.getPaymentReceivedMessageStatePropertyBuyer().get());
206+
}
207+
208+
// first re-send is after 2 minutes, then increase the delay exponentially
209+
if (resendCounter == 0) {
210+
int shortDelay = 2;
211+
log.info("We will send the message again to the peer after a delay of {} min.", shortDelay);
212+
timer = UserThread.runAfter(this::run, shortDelay, TimeUnit.MINUTES);
213+
} else {
214+
log.info("We will send the message again to the peer after a delay of {} min.", delayInMin);
215+
timer = UserThread.runAfter(this::run, delayInMin, TimeUnit.MINUTES);
216+
delayInMin = (int) ((double) delayInMin * 1.5);
217+
}
218+
resendCounter++;
219+
}
220+
221+
private void onMessageStateChange(MessageState newValue) {
222+
if (newValue == MessageState.ACKNOWLEDGED) {
223+
trade.setStateIfValidTransitionTo(Trade.State.BUYER_RECEIVED_PAYMENT_RECEIVED_MSG);
224+
processModel.getTradeManager().requestPersistence();
225+
cleanup();
226+
}
227+
}
228+
229+
protected abstract boolean isAckedByReceiver();
162230
}

core/src/main/java/haveno/core/trade/protocol/tasks/SellerSendPaymentReceivedMessageToArbitrator.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,9 @@ public SellerSendPaymentReceivedMessageToArbitrator(TaskRunner<Trade> taskHandle
3535
protected TradePeer getReceiver() {
3636
return trade.getArbitrator();
3737
}
38+
39+
@Override
40+
protected boolean isAckedByReceiver() {
41+
return trade.getProcessModel().isPaymentReceivedMessageAckedByArbitrator();
42+
}
3843
}

core/src/main/java/haveno/core/trade/protocol/tasks/SellerSendPaymentReceivedMessageToBuyer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,9 @@ protected void onFault(String errorMessage, TradeMessage message) {
4444
appendToErrorMessage("Sending message failed: message=" + message + "\nerrorMessage=" + errorMessage);
4545
complete();
4646
}
47+
48+
@Override
49+
protected boolean isAckedByReceiver() {
50+
return trade.getProcessModel().isPaymentReceivedMessageAckedByBuyer();
51+
}
4752
}

proto/src/main/proto/pb.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1465,6 +1465,7 @@ message Trade {
14651465
SELLER_SEND_FAILED_PAYMENT_RECEIVED_MSG = 24;
14661466
SELLER_STORED_IN_MAILBOX_PAYMENT_RECEIVED_MSG = 25;
14671467
SELLER_SAW_ARRIVED_PAYMENT_RECEIVED_MSG = 26;
1468+
BUYER_RECEIVED_PAYMENT_RECEIVED_MSG = 27;
14681469
}
14691470

14701471
enum Phase {
@@ -1582,6 +1583,8 @@ message ProcessModel {
15821583
int64 trade_protocol_error_height = 18;
15831584
string trade_fee_address = 19;
15841585
bool import_multisig_hex_scheduled = 20;
1586+
string payment_received_message_state_buyer = 21;
1587+
string payment_received_message_state_arbitrator = 22;
15851588
}
15861589

15871590
message TradePeer {

0 commit comments

Comments
 (0)