Skip to content

Reputation recorder #2897

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions docs/release-notes/eclair-vnext.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,33 @@ Attribution data also provides hold times from payment relayers, both for fulfil
Support is enabled by default.
It can be disabled by setting `eclair.features.option_attribution_data = disabled`.

### Local reputation and HTLC endorsement

To protect against jamming attacks, eclair gives a reputation to its neighbors and uses it to decide if a HTLC should be relayed given how congested the outgoing channel is.
The reputation is basically how much this node paid us in fees divided by how much they should have paid us for the liquidity and slots that they blocked.
The reputation is per incoming node and endorsement level.
The confidence that the HTLC will be fulfilled is transmitted to the next node using the endorsement TLV of the `update_add_htlc` message.
Note that HTLCs that are considered dangerous are still relayed: this is the first phase of a network-wide experimentation aimed at collecting data.

To configure, edit `eclair.conf`:

```eclair.conf
// We assign reputations to our peers to prioritize payments during congestion.
// The reputation is computed as fees paid divided by what should have been paid if all payments were successful.
eclair.relay.peer-reputation {
// Set this parameter to false to disable the reputation algorithm and simply relay the incoming endorsement
// value, as described by https://github.com/lightning/blips/blob/master/blip-0004.md,
enabled = true
// Reputation decays with the following half life to emphasize recent behavior.
half-life = 15 days
// Payments that stay pending for longer than this get penalized
max-relay-duration = 12 seconds
// Pending payments are counted as failed, and because they could potentially stay pending for a very long time,
// the following multiplier is applied.
pending-multiplier = 200 // A pending payment counts as a thousand failed ones.
}
```

### API changes

- `listoffers` now returns more details about each offer.
Expand Down
17 changes: 17 additions & 0 deletions eclair-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,23 @@ eclair {
// Number of blocks before the incoming HTLC expires that an async payment must be triggered by the receiver
cancel-safety-before-timeout-blocks = 144
}

// We assign reputation to our peers to prioritize payments during congestion.
// The reputation is computed as fees paid divided by what should have been paid if all payments were successful.
peer-reputation {
// Set this parameter to false to disable the reputation algorithm and simply relay the incoming endorsement
// value, as described by https://github.com/lightning/blips/blob/master/blip-0004.md,
enabled = true
// Reputation decays with the following half life to emphasize recent behavior.
half-life = 15 days
// Payments that stay pending for longer than this get penalized.
max-relay-duration = 12 seconds
// Pending payments are counted as failed, and because they could potentially stay pending for a very long time,
// the following multiplier is applied. We want it to be as close as possible to the true cost of a worst case
// HTLC (max-cltv-delta / max-relay-duration, around 100000 with default parameters) while still being comparable
// to the number of HTLCs received per peer during twice the half life.
pending-multiplier = 200 // A pending payment counts as two hundred failed ones.
}
}

on-chain-fees {
Expand Down
9 changes: 8 additions & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import fr.acinq.eclair.message.OnionMessages.OnionMessageConfig
import fr.acinq.eclair.payment.offer.OffersConfig
import fr.acinq.eclair.payment.relay.OnTheFlyFunding
import fr.acinq.eclair.payment.relay.Relayer.{AsyncPaymentsParams, RelayFees, RelayParams}
import fr.acinq.eclair.reputation.Reputation
import fr.acinq.eclair.router.Announcements.AddressException
import fr.acinq.eclair.router.Graph.{HeuristicsConstants, PaymentWeightRatios}
import fr.acinq.eclair.router.Router._
Expand Down Expand Up @@ -641,7 +642,13 @@ object NodeParams extends Logging {
privateChannelFees = getRelayFees(config.getConfig("relay.fees.private-channels")),
minTrampolineFees = getRelayFees(config.getConfig("relay.fees.min-trampoline")),
enforcementDelay = FiniteDuration(config.getDuration("relay.fees.enforcement-delay").getSeconds, TimeUnit.SECONDS),
asyncPaymentsParams = AsyncPaymentsParams(asyncPaymentHoldTimeoutBlocks, asyncPaymentCancelSafetyBeforeTimeoutBlocks)
asyncPaymentsParams = AsyncPaymentsParams(asyncPaymentHoldTimeoutBlocks, asyncPaymentCancelSafetyBeforeTimeoutBlocks),
peerReputationConfig = Reputation.Config(
enabled = config.getBoolean("relay.peer-reputation.enabled"),
halfLife = FiniteDuration(config.getDuration("relay.peer-reputation.half-life").getSeconds, TimeUnit.SECONDS),
maxRelayDuration = FiniteDuration(config.getDuration("relay.peer-reputation.max-relay-duration").getSeconds, TimeUnit.SECONDS),
pendingMultiplier = config.getDouble("relay.peer-reputation.pending-multiplier"),
),
),
db = database,
autoReconnect = config.getBoolean("auto-reconnect"),
Expand Down
8 changes: 7 additions & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import fr.acinq.eclair.payment.offer.{DefaultOfferHandler, OfferManager}
import fr.acinq.eclair.payment.receive.PaymentHandler
import fr.acinq.eclair.payment.relay.{AsyncPaymentTriggerer, PostRestartHtlcCleaner, Relayer}
import fr.acinq.eclair.payment.send.{Autoprobe, PaymentInitiator}
import fr.acinq.eclair.reputation.ReputationRecorder
import fr.acinq.eclair.router._
import fr.acinq.eclair.tor.{Controller, TorProtocolHandler}
import fr.acinq.eclair.wire.protocol.NodeAddress
Expand Down Expand Up @@ -379,7 +380,12 @@ class Setup(val datadir: File,
paymentHandler = system.actorOf(SimpleSupervisor.props(PaymentHandler.props(nodeParams, register, offerManager), "payment-handler", SupervisorStrategy.Resume))
triggerer = system.spawn(Behaviors.supervise(AsyncPaymentTriggerer()).onFailure(typed.SupervisorStrategy.resume), name = "async-payment-triggerer")
peerReadyManager = system.spawn(Behaviors.supervise(PeerReadyManager()).onFailure(typed.SupervisorStrategy.restart), name = "peer-ready-manager")
relayer = system.actorOf(SimpleSupervisor.props(Relayer.props(nodeParams, router, register, paymentHandler, Some(postRestartCleanUpInitialized)), "relayer", SupervisorStrategy.Resume))
reputationRecorder_opt = if (nodeParams.relayParams.peerReputationConfig.enabled) {
Some(system.spawn(Behaviors.supervise(ReputationRecorder(nodeParams.relayParams.peerReputationConfig)).onFailure(typed.SupervisorStrategy.resume), name = "reputation-recorder"))
} else {
None
}
relayer = system.actorOf(SimpleSupervisor.props(Relayer.props(nodeParams, router, register, paymentHandler, reputationRecorder_opt, Some(postRestartCleanUpInitialized)), "relayer", SupervisorStrategy.Resume))
_ = relayer ! PostRestartHtlcCleaner.Init(channels)
// Before initializing the switchboard (which re-connects us to the network) and the user-facing parts of the system,
// we want to make sure the handler for post-restart broken HTLCs has finished initializing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import fr.acinq.eclair.channel.fund.{InteractiveTxBuilder, InteractiveTxSigningS
import fr.acinq.eclair.io.Peer
import fr.acinq.eclair.transactions.CommitmentSpec
import fr.acinq.eclair.transactions.Transactions._
import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelReady, ChannelReestablish, ChannelUpdate, ClosingSigned, CommitSig, FailureReason, FundingCreated, FundingSigned, Init, LiquidityAds, OnionRoutingPacket, OpenChannel, OpenDualFundedChannel, Shutdown, SpliceInit, Stfu, TxInitRbf, TxSignatures, UpdateAddHtlc, UpdateFailHtlc, UpdateFailMalformedHtlc, UpdateFulfillHtlc}
import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelReady, ChannelReestablish, ChannelUpdate, ClosingSigned, CommitSig, FailureReason, FundingCreated, FundingSigned, HtlcFailureMessage, Init, LiquidityAds, OnionRoutingPacket, OpenChannel, OpenDualFundedChannel, Shutdown, SpliceInit, Stfu, TxInitRbf, TxSignatures, UpdateAddHtlc, UpdateFailHtlc, UpdateFailMalformedHtlc, UpdateFulfillHtlc}
import fr.acinq.eclair.{Alias, BlockHeight, CltvExpiry, CltvExpiryDelta, Features, InitFeature, MilliSatoshi, MilliSatoshiLong, RealShortChannelId, TimestampMilli, UInt64}
import scodec.bits.ByteVector

Expand Down Expand Up @@ -139,19 +139,25 @@ case class INPUT_RESTORED(data: PersistentChannelData)
sealed trait Upstream { def amountIn: MilliSatoshi }
object Upstream {
/** We haven't restarted and have full information about the upstream parent(s). */
sealed trait Hot extends Upstream
sealed trait Hot extends Upstream {
def show: String
}
object Hot {
/** Our node is forwarding a single incoming HTLC. */
case class Channel(add: UpdateAddHtlc, receivedAt: TimestampMilli, receivedFrom: PublicKey) extends Hot {
override val amountIn: MilliSatoshi = add.amountMsat
val expiryIn: CltvExpiry = add.cltvExpiry

override def show: String = s"Channel(receivedAt=${receivedAt.toLong}, receivedFrom=${receivedFrom.toHex}, endorsement=${add.endorsement})"
}
/** Our node is forwarding a payment based on a set of HTLCs from potentially multiple upstream channels. */
case class Trampoline(received: List[Channel]) extends Hot {
override val amountIn: MilliSatoshi = received.map(_.add.amountMsat).sum
// We must use the lowest expiry of the incoming HTLC set.
val expiryIn: CltvExpiry = received.map(_.add.cltvExpiry).min
val receivedAt: TimestampMilli = received.map(_.receivedAt).max

override def show: String = s"Trampoline(${received.map(_.show).mkString(",")})"
}
}

Expand All @@ -175,7 +181,11 @@ object Upstream {
}

/** Our node is the origin of the payment: there are no matching upstream HTLCs. */
case class Local(id: UUID) extends Hot with Cold { override val amountIn: MilliSatoshi = 0 msat }
case class Local(id: UUID) extends Hot with Cold {
override val amountIn: MilliSatoshi = 0 msat

override def show: String = toString
}
}

/**
Expand Down Expand Up @@ -209,6 +219,7 @@ final case class CMD_ADD_HTLC(replyTo: ActorRef,
onion: OnionRoutingPacket,
nextPathKey_opt: Option[PublicKey],
confidence: Double,
endorsement: Int,
fundingFee_opt: Option[LiquidityAds.FundingFee],
origin: Origin.Hot,
commit: Boolean = false) extends HasReplyToCommand with ForbiddenCommandDuringQuiescenceNegotiation with ForbiddenCommandWhenQuiescent
Expand Down Expand Up @@ -247,6 +258,10 @@ final case class CMD_GET_CHANNEL_STATE(replyTo: ActorRef) extends HasReplyToComm
final case class CMD_GET_CHANNEL_DATA(replyTo: ActorRef) extends HasReplyToCommand
final case class CMD_GET_CHANNEL_INFO(replyTo: akka.actor.typed.ActorRef[RES_GET_CHANNEL_INFO]) extends Command

case class OutgoingHtlcAdded(add: UpdateAddHtlc, remoteNodeId: PublicKey, upstream: Upstream.Hot, fee: MilliSatoshi)
case class OutgoingHtlcFailed(fail: HtlcFailureMessage)
case class OutgoingHtlcFulfilled(fulfill: UpdateFulfillHtlc)

/*
88888888b. 8888888888 .d8888b. 88888888b. ,ad8888ba, 888b 88 .d8888b. 8888888888 .d8888b.
88 "8b 88 d88P Y88b 88 "8b d8"' `"8b 8888b 88 d88P Y88b 88 d88P Y88b
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ case class RemoteError(e: protocol.Error) extends ChannelError
// @formatter:on

class ChannelException(val channelId: ByteVector32, message: String) extends RuntimeException(message)
class ChannelJammingException(override val channelId: ByteVector32, message: String) extends ChannelException(channelId, message)

// @formatter:off
case class InvalidChainHash (override val channelId: ByteVector32, local: BlockHash, remote: BlockHash) extends ChannelException(channelId, s"invalid chainHash (local=$local remote=$remote)")
Expand Down Expand Up @@ -150,4 +151,6 @@ case class CommandUnavailableInThisState (override val channelId: Byte
case class ForbiddenDuringSplice (override val channelId: ByteVector32, command: String) extends ChannelException(channelId, s"cannot process $command while splicing")
case class ForbiddenDuringQuiescence (override val channelId: ByteVector32, command: String) extends ChannelException(channelId, s"cannot process $command while quiescent")
case class ConcurrentRemoteSplice (override val channelId: ByteVector32) extends ChannelException(channelId, "splice attempt canceled, remote initiated splice before us")
case class TooManySmallHtlcs (override val channelId: ByteVector32, number: Long, below: MilliSatoshi) extends ChannelJammingException(channelId, s"too many small htlcs: $number HTLCs below $below")
case class ConfidenceTooLow (override val channelId: ByteVector32, confidence: Double, occupancy: Double) extends ChannelJammingException(channelId, s"confidence too low: confidence=$confidence occupancy=$occupancy")
// @formatter:on
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ case class Commitment(fundingTxIndex: Long,
localCommit.spec.htlcs.collect(DirectedHtlc.incoming).filter(nearlyExpired)
}

def canSendAdd(amount: MilliSatoshi, params: ChannelParams, changes: CommitmentChanges, feerates: FeeratesPerKw, feeConf: OnChainFeeConf): Either[ChannelException, Unit] = {
def canSendAdd(amount: MilliSatoshi, params: ChannelParams, changes: CommitmentChanges, feerates: FeeratesPerKw, feeConf: OnChainFeeConf, confidence: Double): Either[ChannelException, Unit] = {
// we allowed mismatches between our feerates and our remote's as long as commitments didn't contain any HTLC at risk
// we need to verify that we're not disagreeing on feerates anymore before offering new HTLCs
// NB: there may be a pending update_fee that hasn't been applied yet that needs to be taken into account
Expand Down Expand Up @@ -509,7 +509,8 @@ case class Commitment(fundingTxIndex: Long,
if (allowedHtlcValueInFlight < htlcValueInFlight) {
return Left(HtlcValueTooHighInFlight(params.channelId, maximum = allowedHtlcValueInFlight, actual = htlcValueInFlight))
}
if (Seq(params.localParams.maxAcceptedHtlcs, params.remoteParams.maxAcceptedHtlcs).min < outgoingHtlcs.size) {
val maxAcceptedHtlcs = params.localParams.maxAcceptedHtlcs.min(params.remoteParams.maxAcceptedHtlcs)
if (maxAcceptedHtlcs < outgoingHtlcs.size) {
return Left(TooManyAcceptedHtlcs(params.channelId, maximum = Seq(params.localParams.maxAcceptedHtlcs, params.remoteParams.maxAcceptedHtlcs).min))
}

Expand All @@ -526,6 +527,18 @@ case class Commitment(fundingTxIndex: Long,
return Left(RemoteDustHtlcExposureTooHigh(params.channelId, maxDustExposure, remoteDustExposureAfterAdd))
}

// Jamming protection
// Must be the last checks so that they can be ignored for shadow deployment.
for ((amountMsat, i) <- outgoingHtlcs.toSeq.map(_.amountMsat).sorted.zipWithIndex) {
if ((amountMsat.toLong < 1) || (math.log(amountMsat.toLong.toDouble) * maxAcceptedHtlcs / math.log(params.localParams.maxHtlcValueInFlightMsat.toLong.toDouble / maxAcceptedHtlcs) < i)) {
return Left(TooManySmallHtlcs(params.channelId, number = i + 1, below = amountMsat))
}
}
val occupancy = (outgoingHtlcs.size.toDouble / maxAcceptedHtlcs).max(htlcValueInFlight.toLong.toDouble / allowedHtlcValueInFlight.toLong.toDouble)
if (confidence + 0.1 < occupancy) { // We add a 10% tolerance to enable payments from nodes without history and to account for the fact that even at the highest endorsement level we still expect a confidence of less than 93.75%.
return Left(ConfidenceTooLow(params.channelId, confidence, occupancy))
}

Right(())
}

Expand Down Expand Up @@ -870,7 +883,7 @@ case class Commitments(params: ChannelParams,
* @param cmd add HTLC command
* @return either Left(failure, error message) where failure is a failure message (see BOLT #4 and the Failure Message class) or Right(new commitments, updateAddHtlc)
*/
def sendAdd(cmd: CMD_ADD_HTLC, currentHeight: BlockHeight, channelConf: ChannelConf, feerates: FeeratesPerKw, feeConf: OnChainFeeConf): Either[ChannelException, (Commitments, UpdateAddHtlc)] = {
def sendAdd(cmd: CMD_ADD_HTLC, currentHeight: BlockHeight, channelConf: ChannelConf, feerates: FeeratesPerKw, feeConf: OnChainFeeConf)(implicit log: LoggingAdapter): Either[ChannelException, (Commitments, UpdateAddHtlc)] = {
// we must ensure we're not relaying htlcs that are already expired, otherwise the downstream channel will instantly close
// NB: we add a 3 blocks safety to reduce the probability of running into this when our bitcoin node is slightly outdated
val minExpiry = CltvExpiry(currentHeight + 3)
Expand All @@ -889,14 +902,28 @@ case class Commitments(params: ChannelParams,
return Left(HtlcValueTooSmall(params.channelId, minimum = htlcMinimum, actual = cmd.amount))
}

val add = UpdateAddHtlc(channelId, changes.localNextHtlcId, cmd.amount, cmd.paymentHash, cmd.cltvExpiry, cmd.onion, cmd.nextPathKey_opt, cmd.confidence, cmd.fundingFee_opt)
val add = UpdateAddHtlc(channelId, changes.localNextHtlcId, cmd.amount, cmd.paymentHash, cmd.cltvExpiry, cmd.onion, cmd.nextPathKey_opt, cmd.endorsement, cmd.fundingFee_opt)
// we increment the local htlc index and add an entry to the origins map
val changes1 = changes.addLocalProposal(add).copy(localNextHtlcId = changes.localNextHtlcId + 1)
val originChannels1 = originChannels + (add.id -> cmd.origin)
// we verify that this htlc is allowed in every active commitment
active.map(_.canSendAdd(add.amountMsat, params, changes1, feerates, feeConf))
.collectFirst { case Left(f) => Left(f) }
.getOrElse(Right(copy(changes = changes1, originChannels = originChannels1), add))
val canSendAdds = active.map(_.canSendAdd(add.amountMsat, params, changes1, feerates, feeConf, cmd.confidence))
val result = canSendAdds.collectFirst { case Left(f) if !f.isInstanceOf[ChannelJammingException] => // We ignore jamming protection. TODO: enable jamming protection
Metrics.dropHtlc(f, Tags.Directions.Outgoing)
Left(f)
}.getOrElse(Right(copy(changes = changes1, originChannels = originChannels1), add))
// Jamming protection is disabled but we still log which HTLCs would be dropped if it was enabled.
if (result.isRight) {
canSendAdds.collectFirst {
case Left(f: TooManySmallHtlcs) =>
log.info("TooManySmallHtlcs: {} outgoing HTLCs are below {}}", f.number, f.below)
Metrics.dropHtlc(f, Tags.Directions.Outgoing)
case Left(f: ConfidenceTooLow) =>
log.info("ConfidenceTooLow: confidence is {}% while channel is {}% full", (100 * f.confidence).toInt, (100 * f.occupancy).toInt)
Metrics.dropHtlc(f, Tags.Directions.Outgoing)
}
}
result
}

def receiveAdd(add: UpdateAddHtlc, feerates: FeeratesPerKw, feeConf: OnChainFeeConf): Either[ChannelException, Commitments] = {
Expand Down
Loading