5 changes: 4 additions & 1 deletion README.md
@@ -60,7 +60,6 @@ Base URL: http://stats.filspark.com/

http://stats.filspark.com/participant//0x000000000000000000000000000000000000dEaD/reward-transfers


- `GET /stations/daily?from=<day>&to=<day>`

http://stats.filspark.com/stations/daily
@@ -85,6 +84,10 @@ Base URL: http://stats.filspark.com/

http://stats.filspark.com/deals/summary

- `GET /retrieval-result-codes/daily?from=2024-01-01&to=2024-01-31`

http://stats.filspark.com/retrieval-result-codes/daily
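
  Example response (illustrative values; the shape matches the handler tests added in this PR):

  ```json
  [
    { "day": "2024-01-11", "rates": { "OK": "0.1", "CAR_TOO_LARGE": "0.9" } },
    { "day": "2024-01-12", "rates": { "OK": "1" } }
  ]
  ```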


## Development

7 changes: 7 additions & 0 deletions db/migrations/005.do.retrieval-result-status.sql
@@ -0,0 +1,7 @@
CREATE TABLE daily_retrieval_result_codes (
day DATE NOT NULL,
code TEXT NOT NULL,
rate NUMERIC NOT NULL,
PRIMARY KEY (day, code)
);
CREATE INDEX daily_retrieval_result_codes_to_day ON daily_retrieval_result_codes (day);
57 changes: 35 additions & 22 deletions observer/bin/spark-observer.js
@@ -3,14 +3,21 @@ import * as SparkImpactEvaluator from '@filecoin-station/spark-impact-evaluator'
import { ethers } from 'ethers'
import * as Sentry from '@sentry/node'
import timers from 'node:timers/promises'
import { InfluxDB } from '@influxdata/influxdb-client'
import assert from 'node:assert/strict'

import { RPC_URL, rpcHeaders } from '../lib/config.js'
import { getPgPools } from '@filecoin-station/spark-stats-db'
import {
observeTransferEvents,
observeScheduledRewards
observeScheduledRewards,
observeRetrievalResultCodes
} from '../lib/observer.js'

const { INFLUXDB_TOKEN } = process.env

assert(INFLUXDB_TOKEN, 'INFLUXDB_TOKEN required')

const pgPools = await getPgPools()

const fetchRequest = new ethers.FetchRequest(RPC_URL)
@@ -19,39 +26,45 @@ const provider = new ethers.JsonRpcProvider(fetchRequest, null, { polling: true

const ieContract = new ethers.Contract(SparkImpactEvaluator.ADDRESS, SparkImpactEvaluator.ABI, provider)

const ONE_HOUR = 60 * 60 * 1000
const influx = new InfluxDB({
url: 'https://eu-central-1-1.aws.cloud2.influxdata.com',
// spark-stats-observer-read
token: INFLUXDB_TOKEN
})

const loopObserveTransferEvents = async () => {
while (true) {
const start = Date.now()
try {
await observeTransferEvents(pgPools.stats, ieContract, provider)
} catch (e) {
console.error(e)
Sentry.captureException(e)
}
const dt = Date.now() - start
console.log(`Observing Transfer events took ${dt}ms`)
await timers.setTimeout(ONE_HOUR - dt)
}
}
const influxQueryApi = influx.getQueryApi('Filecoin Station')

const ONE_HOUR = 60 * 60 * 1000

const loopObserveScheduledRewards = async () => {
const loop = async (name, fn, interval) => {
while (true) {
const start = Date.now()
try {
await observeScheduledRewards(pgPools, ieContract)
await fn()
} catch (e) {
console.error(e)
Sentry.captureException(e)
}
const dt = Date.now() - start
console.log(`Observing scheduled rewards took ${dt}ms`)
await timers.setTimeout((24 * ONE_HOUR) - dt)
console.log(`Loop "${name}" took ${dt}ms`)
await timers.setTimeout(interval - dt)
}
}

await Promise.all([
loopObserveTransferEvents(),
loopObserveScheduledRewards()
loop(
'Transfer events',
() => observeTransferEvents(pgPools.stats, ieContract, provider),
ONE_HOUR
),
loop(
'Scheduled rewards',
() => observeScheduledRewards(pgPools, ieContract),
24 * ONE_HOUR
),
loop(
'Retrieval result codes',
() => observeRetrievalResultCodes(pgPools.stats, influxQueryApi),
ONE_HOUR
)
])
31 changes: 31 additions & 0 deletions observer/lib/observer.js
@@ -96,3 +96,34 @@ export const observeScheduledRewards = async (pgPools, ieContract, fetch = globa
function isEventLog (logOrEventLog) {
return 'args' in logOrEventLog
}

export const observeRetrievalResultCodes = async (pgPoolStats, influxQueryApi) => {
// TODO: The `mean` aggregation will produce slightly wrong numbers, since
// the query is aggregating over relative numbers - with varying measurement
// counts, the relative numbers should be weighted differently. Since the
// measurement count per round should be relatively stable, this should be
// good enough for now. Please pick up and improve this.
// Ref: https://github.com/filecoin-station/spark-stats/pull/244#discussion_r1824808007
const rows = await influxQueryApi.collectRows(`
import "strings"
from(bucket: "spark-evaluate")
|> range(start: 0)
|> filter(fn: (r) => r["_measurement"] == "retrieval_stats_honest")
|> filter(fn: (r) => strings.hasPrefix(v: r._field, prefix: "result_rate_"))
|> aggregateWindow(every: 1d, fn: mean, createEmpty: false)

Member commented:

I think this will likely produce slightly incorrect data.

My understanding:

  1. spark-evaluate creates a new set of result_rate_* datapoints every round. Each result_rate_* value is a fraction of how many measurements reported that result code.
  2. In your query, you are aggregating these per-round rates into per-day rates by taking the mean of the fractions.

Consider the following case (simplified):

  1. On round 1, we collect 400k measurements, of which 40k report OK. result_rate_OK is 10%.
  2. On round 2, we collect 600k measurements, of which 50k report OK. result_rate_OK is 8.33%.

Mean average: (10+8.33)/2 = 9.165%
Precise value: (40k+50k)/(400k+600k) = 9%
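
For illustration, the same arithmetic as a minimal JavaScript sketch (numbers taken from the example above, illustrative only):

```js
// Per-round data from the example above: [total measurements, OK measurements]
const rounds = [
  [400_000, 40_000], // round 1 → result_rate_OK = 10%
  [600_000, 50_000] // round 2 → result_rate_OK ≈ 8.33%
]

// What `aggregateWindow(fn: mean)` effectively does: average the per-round rates
const naiveMean =
  rounds.map(([total, ok]) => ok / total).reduce((a, b) => a + b, 0) / rounds.length
// ≈ 0.0917

// The precise daily rate: weight each round by its measurement count
const totalMeasurements = rounds.reduce((sum, [total]) => sum + total, 0)
const totalOk = rounds.reduce((sum, [, ok]) => sum + ok, 0)
const weightedRate = totalOk / totalMeasurements
// = 0.09

console.log({ naiveMean, weightedRate })
```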

Member commented:

What we are doing in other places:

  • In spark-evaluate, we keep track of both per-result count and total count
  • In spark-stats, we calculate the % rate from those numbers

For example: per-miner OK results:

Member (PR author) commented:

Good point, the mean over relative values will be off with varying measurement counts. Since the measurement count is constant-ish now, I think this doesn't matter a lot. But it should still be improved.

Do you have a suggestion how to solve this with Influx queries? I assume we'd need to join over the measurement count.

I have added a comment in 818285d, are you ok shipping like this?

Member commented:

> Do you have a suggestion how to solve this with Influx queries? I assume we'd need to join over the measurement count.

I don't think you can solve this at the query level. Maybe you could write a query that multiplies each result_rate_XXX value with measurements (total count) to reconstruct the number of measurements that reported the result code XXX.
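
For illustration only, a rough sketch of that reconstruction done in the observer process rather than at the query level — it assumes we can fetch, per round, the `result_rate_*` values together with the total `measurements` count (the field name and row shape are assumptions, not existing code):

```js
// Minimal sketch, not part of this PR. Input rows are assumed to look like
// { day, code, rate, measurements } – one entry per round and result code,
// where `rate` is the per-round fraction and `measurements` is the per-round
// total measurement count.
const aggregateDailyRates = (rows) => {
  const perDayCode = new Map()
  for (const { day, code, rate, measurements } of rows) {
    const key = `${day}/${code}`
    const acc = perDayCode.get(key) ?? { day, code, matching: 0, total: 0 }
    acc.matching += rate * measurements // reconstruct the absolute count for this code
    acc.total += measurements
    perDayCode.set(key, acc)
  }
  return [...perDayCode.values()].map(({ day, code, matching, total }) => ({
    day,
    code,
    rate: matching / total // measurement-weighted daily rate
  }))
}
```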

IMO, result_rate_XXX points are not the right data source for this feature. We can keep pushing in this direction to see if we can find a fix. It may be better to step back and add a new stats writer to spark-evaluate, as we already do for other stats.

> I have added a comment in 818285d, are you ok shipping like this?

What are the ramifications of shipping like this:

  • The chart will show incorrect data. (Hopefully, the inaccuracy is too small for anybody to notice. But we don't know how bad it will be, do we?)
  • We are introducing a technical debt. In the worst-case scenario, we will need to remove the InfluxDB observer and implement something else. Since we are talking about ~200 LOC, I think this is not a big deal.

While I am not excited about shipping it like this, I think it's an acceptable trade-off to make. 👍🏻

Member (PR author) commented:

> I don't think you can solve this at the query level. Maybe you could write a query that multiplies each result_rate_XXX value with measurements (total count) to reconstruct the number of measurements that reported the result code XXX.

Yes this is what I was thinking about. But this join is going to be expensive.

> IMO, result_rate_XXX points are not the right data source for this feature. We can keep pushing in this direction to see if we can find a fix. It may be better to step back and add a new stats writer to spark-evaluate, as we already do for other stats.

Yeah post LabWeek we can work on the proper solution for these.

> What are the ramifications of shipping like this:

> • The chart will show incorrect data. (Hopefully, the inaccuracy is too small for anybody to notice. But we don't know how bad it will be, do we?)

I propose let's first see how this looks in the dashboard and then decide if it's too wrong to ship.

> • We are introducing a technical debt. In the worst-case scenario, we will need to remove the InfluxDB observer and implement something else. Since we are talking about ~200 LOC, I think this is not a big deal.

The worst case is one revert away so I think this is ok. Also, from a business perspective, if we're asked about why we didn't add this chart that was requested last LabWeek, technical debt is not a good answer. Do you have a realistic alternative proposal?

> While I am not excited about shipping it like this, I think it's an acceptable trade-off to make. 👍🏻

I'm at the moment not operating at the level of excitement :D I just want to ship (obviously without shipping something too bad) so we have more useful things to show.

|> keep(columns: ["_value", "_time", "_field"])
|> map(fn: (r) => ({ r with _field: strings.replace(v: r._field, t: "result_rate_", u: "", i: 1) }))
`)
for (const row of rows) {
await pgPoolStats.query(`
INSERT INTO daily_retrieval_result_codes
(day, code, rate)
VALUES ($1, $2, $3)
ON CONFLICT (day, code) DO UPDATE SET rate = EXCLUDED.rate
`, [
row._time,
row._field,
row._value
])
}
}
1 change: 1 addition & 0 deletions observer/package.json
@@ -17,6 +17,7 @@
"dependencies": {
"@filecoin-station/spark-impact-evaluator": "^1.1.1",
"@filecoin-station/spark-stats-db": "^1.0.0",
"@influxdata/influxdb-client": "^1.35.0",
"@sentry/node": "^8.36.0",
"debug": "^4.3.7",
"ethers": "^6.13.4",
25 changes: 24 additions & 1 deletion observer/test/observer.test.js
@@ -3,7 +3,7 @@ import { beforeEach, describe, it } from 'mocha'
import { getPgPools } from '@filecoin-station/spark-stats-db'
import { givenDailyParticipants } from '@filecoin-station/spark-stats-db/test-helpers.js'

import { observeTransferEvents, observeScheduledRewards } from '../lib/observer.js'
import { observeTransferEvents, observeScheduledRewards, observeRetrievalResultCodes } from '../lib/observer.js'

describe('observer', () => {
let pgPools
@@ -184,4 +184,27 @@ describe('observer', () => {
}])
})
})

describe('observeRetrievalResultCodes', () => {
beforeEach(async () => {
await pgPools.stats.query('DELETE FROM daily_retrieval_result_codes')
})

it('observes retrieval result codes', async () => {
await observeRetrievalResultCodes(pgPools.stats, {
collectRows: async () => [
{ _time: today(), _field: 'OK', _value: 0.5 },
{ _time: today(), _field: 'CAR_TOO_LARGE', _value: 0.5 }
]
})
const { rows } = await pgPools.stats.query(`
SELECT day::TEXT, code, rate
FROM daily_retrieval_result_codes
`)
assert.deepStrictEqual(rows, [
{ day: today(), code: 'OK', rate: '0.5' },
{ day: today(), code: 'CAR_TOO_LARGE', rate: '0.5' }
])
})
})
})
5 changes: 4 additions & 1 deletion package-lock.json

Some generated files are not rendered by default.

5 changes: 4 additions & 1 deletion stats/lib/handler.js
@@ -12,7 +12,8 @@ import {
fetchParticipantScheduledRewards,
fetchParticipantRewardTransfers,
fetchRetrievalSuccessRate,
fetchDealSummary
fetchDealSummary,
fetchDailyRetrievalResultCodes
} from './stats-fetchers.js'

import { handlePlatformRoutes } from './platform-routes.js'
@@ -104,6 +105,8 @@ const handler = async (req, res, pgPools, SPARK_API_BASE_URL) => {
await respond(fetchParticipantRewardTransfers, segs[1])
} else if (req.method === 'GET' && url === '/miners/retrieval-success-rate/summary') {
await respond(fetchMinersRSRSummary)
} else if (req.method === 'GET' && url === '/retrieval-result-codes/daily') {
await respond(fetchDailyRetrievalResultCodes)
} else if (req.method === 'GET' && segs[0] === 'miner' && segs[1] && segs[2] === 'deals' && segs[3] === 'eligible' && segs[4] === 'summary') {
redirectToSparkApi(req, res, SPARK_API_BASE_URL)
} else if (req.method === 'GET' && segs[0] === 'client' && segs[1] && segs[2] === 'deals' && segs[3] === 'eligible' && segs[4] === 'summary') {
20 changes: 20 additions & 0 deletions stats/lib/stats-fetchers.js
@@ -222,3 +222,23 @@ export const fetchMinersRSRSummary = async (pgPools, filter) => {
}))
return stats
}

export const fetchDailyRetrievalResultCodes = async (pgPools, filter) => {
const { rows } = await pgPools.stats.query(`
SELECT day::TEXT, code, rate
FROM daily_retrieval_result_codes
WHERE day >= $1 AND day <= $2
`, [
filter.from,
filter.to
])
const days = {}
for (const row of rows) {
if (!days[row.day]) {
days[row.day] = {}
}
days[row.day][row.code] = row.rate
}
const stats = Object.entries(days).map(([day, rates]) => ({ day, rates }))
return stats
}
32 changes: 32 additions & 0 deletions stats/test/handler.test.js
@@ -50,6 +50,7 @@ describe('HTTP request handler', () => {
await pgPools.evaluate.query('DELETE FROM daily_deals')
await pgPools.stats.query('DELETE FROM daily_scheduled_rewards')
await pgPools.stats.query('DELETE FROM daily_reward_transfers')
await pgPools.stats.query('DELETE FROM daily_retrieval_result_codes')
})

it('returns 200 for GET /', async () => {
@@ -434,6 +435,37 @@ describe('HTTP request handler', () => {
})
})

describe('GET /retrieval-result-codes/daily', () => {
it('returns daily retrieval result codes for the given date range', async () => {
await pgPools.stats.query(`
INSERT INTO daily_retrieval_result_codes
(day, code, rate)
VALUES
('2024-01-11', 'OK', 0.1),
('2024-01-11', 'CAR_TOO_LARGE', 0.9),
('2024-01-12', 'OK', 1),
('2024-01-13', 'OK', 0.5),
('2024-01-13', 'IPNI_500', 0.5)
`)

const res = await fetch(
new URL(
'/retrieval-result-codes/daily?from=2024-01-11&to=2024-01-13',
baseUrl
), {
redirect: 'manual'
}
)
await assertResponseStatus(res, 200)
const stats = await res.json()
assert.deepStrictEqual(stats, [
{ day: '2024-01-11', rates: { OK: '0.1', CAR_TOO_LARGE: '0.9' } },
{ day: '2024-01-12', rates: { OK: '1' } },
{ day: '2024-01-13', rates: { OK: '0.5', IPNI_500: '0.5' } }
])
})
})

describe('summary of eligible deals', () => {
describe('GET /miner/{id}/deals/eligible/summary', () => {
it('redirects to spark-api', async () => {