108108import java .math .BigInteger ;
109109import java .util .ArrayList ;
110110import java .util .Arrays ;
111+ import java .util .Collection ;
111112import java .util .Comparator ;
112113import java .util .Date ;
113114import java .util .HashMap ;
@@ -316,12 +317,12 @@ public void onDirectMessage(DecryptedMessageWithPubKey message, NodeAddress send
316317
317318 public void onAllServicesInitialized () {
318319 if (p2PService .isBootstrapped ()) {
319- initPersistedTrades ();
320+ initTrades ();
320321 } else {
321322 p2PService .addP2PServiceListener (new BootstrapListener () {
322323 @ Override
323324 public void onDataReceived () {
324- initPersistedTrades ();
325+ initTrades ();
325326 }
326327 });
327328 }
@@ -332,13 +333,13 @@ public void onDataReceived() {
332333 @ Override
333334 public void onAccountCreated () {
334335 log .info (TradeManager .class + ".accountService.onAccountCreated()" );
335- initPersistedTrades ();
336+ initTrades ();
336337 }
337338
338339 @ Override
339340 public void onAccountOpened () {
340341 log .info (TradeManager .class + ".accountService.onAccountOpened()" );
341- initPersistedTrades ();
342+ initTrades ();
342343 }
343344
344345 @ Override
@@ -423,11 +424,11 @@ public TradeProtocol createTradeProtocol(Trade trade) {
423424 }
424425
425426 ///////////////////////////////////////////////////////////////////////////////////////////
426- // Init pending trade
427+ // Init trades
427428 ///////////////////////////////////////////////////////////////////////////////////////////
428429
429- private void initPersistedTrades () {
430- log .info ("Initializing persisted trades" );
430+ private void initTrades () {
431+ log .info ("Initializing trades" );
431432
432433 // initialize off main thread
433434 new Thread (() -> {
@@ -437,52 +438,20 @@ private void initPersistedTrades() {
437438
438439 // initialize trades in parallel
439440 int threadPoolSize = 10 ;
440- Set <Runnable > tasks = new HashSet <Runnable >();
441+ Set <Runnable > initTasksP1 = new HashSet <Runnable >();
442+ Set <Runnable > initTasksP2 = new HashSet <Runnable >();
441443 Set <String > uids = new HashSet <String >();
442444 Set <Trade > tradesToSkip = new HashSet <Trade >();
443445 Set <Trade > uninitializedTrades = new HashSet <Trade >();
444446 for (Trade trade : trades ) {
445- tasks .add (() -> {
446- try {
447-
448- // check for duplicate uid
449- if (!uids .add (trade .getUid ())) {
450- log .warn ("Found trade with duplicate uid, skipping. That should never happen. {} {}, uid={}" , trade .getClass ().getSimpleName (), trade .getId (), trade .getUid ());
451- tradesToSkip .add (trade );
452- return ;
453- }
454-
455- // skip if failed and error handling not scheduled
456- if (failedTradesManager .getObservableList ().contains (trade ) && !trade .isProtocolErrorHandlingScheduled ()) {
457- log .warn ("Skipping initialization of failed trade {} {}" , trade .getClass ().getSimpleName (), trade .getId ());
458- tradesToSkip .add (trade );
459- return ;
460- }
461-
462- // add random delay up to 10s to avoid syncing at exactly the same time
463- if (trades .size () > 1 && trade .walletExists ()) {
464- int delay = (int ) (Math .random () * 10000 );
465- HavenoUtils .waitFor (delay );
466- }
467-
468- // initialize trade
469- initPersistedTrade (trade );
470-
471- // record if protocol didn't initialize
472- if (!trade .isDepositsPublished ()) {
473- uninitializedTrades .add (trade );
474- }
475- } catch (Exception e ) {
476- if (!isShutDownStarted ) {
477- log .warn ("Error initializing {} {}: {}\n " , trade .getClass ().getSimpleName (), trade .getId (), e .getMessage (), e );
478- trade .setInitError (e );
479- trade .prependErrorMessage (e .getMessage ());
480- }
481- }
482- });
447+ Runnable initTradeTask = getInitTradeTask (trade , trades , tradesToSkip , uninitializedTrades , uids );
448+ if (trade .isDepositsPublished () && !trade .isPayoutUnlocked ()) initTasksP1 .add (initTradeTask );
449+ else initTasksP2 .add (initTradeTask );
483450 };
484- ThreadUtils .awaitTasks (tasks , threadPoolSize );
485- log .info ("Done initializing persisted trades" );
451+ ThreadUtils .awaitTasks (initTasksP1 , threadPoolSize );
452+ log .info ("Done initializing priority trades" );
453+ ThreadUtils .awaitTasks (initTasksP2 , threadPoolSize );
454+ log .info ("Done initializing all trades" );
486455 if (isShutDownStarted ) return ;
487456
488457 // remove skipped trades
@@ -545,7 +514,54 @@ private void initPersistedTrades() {
545514 HavenoUtils .waitFor (100 );
546515 }
547516
548- private void initPersistedTrade (Trade trade ) {
517+ private Runnable getInitTradeTask (Trade trade , Collection <Trade > trades , Set <Trade > tradesToSkip , Set <Trade > uninitializedTrades , Set <String > uids ) {
518+ return () -> {
519+ try {
520+
521+ // check for duplicate uid
522+ synchronized (uids ) {
523+ if (!uids .add (trade .getUid ())) {
524+ log .warn ("Found trade with duplicate uid, skipping. That should never happen. {} {}, uid={}" , trade .getClass ().getSimpleName (), trade .getId (), trade .getUid ());
525+ tradesToSkip .add (trade );
526+ return ;
527+ }
528+ }
529+
530+ // skip if failed and error handling not scheduled
531+ if (failedTradesManager .getObservableList ().contains (trade ) && !trade .isProtocolErrorHandlingScheduled ()) {
532+ log .warn ("Skipping initialization of failed trade {} {}" , trade .getClass ().getSimpleName (), trade .getId ());
533+ synchronized (tradesToSkip ) {
534+ tradesToSkip .add (trade );
535+ return ;
536+ }
537+ }
538+
539+ // add random delay up to 10s to avoid syncing at exactly the same time
540+ if (trades .size () > 1 && trade .walletExists ()) {
541+ int delay = (int ) (Math .random () * 10000 );
542+ HavenoUtils .waitFor (delay );
543+ }
544+
545+ // initialize trade
546+ initTrade (trade );
547+
548+ // record if protocol didn't initialize
549+ if (!trade .isDepositsPublished ()) {
550+ synchronized (uninitializedTrades ) {
551+ uninitializedTrades .add (trade );
552+ }
553+ }
554+ } catch (Exception e ) {
555+ if (!isShutDownStarted ) {
556+ log .warn ("Error initializing {} {}: {}\n " , trade .getClass ().getSimpleName (), trade .getId (), e .getMessage (), e );
557+ trade .setInitError (e );
558+ trade .prependErrorMessage (e .getMessage ());
559+ }
560+ }
561+ };
562+ }
563+
564+ private void initTrade (Trade trade ) {
549565 if (isShutDown ) return ;
550566 if (getTradeProtocol (trade ) != null ) return ;
551567 initTradeAndProtocol (trade , createTradeProtocol (trade ));
@@ -1117,7 +1133,7 @@ private void removeFailedTrade(Trade trade) {
11171133 private void addTradeToPendingTrades (Trade trade ) {
11181134 if (!trade .isInitialized ()) {
11191135 try {
1120- initPersistedTrade (trade );
1136+ initTrade (trade );
11211137 } catch (Exception e ) {
11221138 log .warn ("Error initializing {} {} on move to pending trades" , trade .getClass ().getSimpleName (), trade .getShortId (), e );
11231139 }
@@ -1200,7 +1216,7 @@ private boolean unFailTrade(Trade trade) {
12001216 return false ;
12011217 }
12021218
1203- initPersistedTrade (trade );
1219+ initTrade (trade );
12041220
12051221 UserThread .execute (() -> {
12061222 synchronized (tradableList .getList ()) {
0 commit comments