Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,9 @@ jobs:
env:
DATABASE_URL: postgres://postgres:postgres@localhost:5432/spark_stats
EVALUATE_DB_URL: postgres://postgres:postgres@localhost:5432/spark_evaluate
API_DB_URL: postgres://postgres:postgres@localhost:5432/spark
NPM_CONFIG_WORKSPACE: stats
steps:
- run: psql "${DATABASE_URL}" -c "CREATE DATABASE spark_evaluate"
- run: psql "${DATABASE_URL}" -c "CREATE DATABASE spark"
- uses: actions/checkout@v4
- uses: actions/setup-node@v4
with:
Expand Down Expand Up @@ -57,11 +55,9 @@ jobs:
env:
DATABASE_URL: postgres://postgres:postgres@localhost:5432/spark_stats
EVALUATE_DB_URL: postgres://postgres:postgres@localhost:5432/spark_evaluate
API_DB_URL: postgres://postgres:postgres@localhost:5432/spark
NPM_CONFIG_WORKSPACE: observer
steps:
- run: psql "${DATABASE_URL}" -c "CREATE DATABASE spark_evaluate"
- run: psql "${DATABASE_URL}" -c "CREATE DATABASE spark"
- uses: actions/checkout@v4
- uses: actions/setup-node@v4
with:
Expand Down Expand Up @@ -100,11 +96,9 @@ jobs:
env:
DATABASE_URL: postgres://postgres:postgres@localhost:5432/spark_stats
EVALUATE_DB_URL: postgres://postgres:postgres@localhost:5432/spark_evaluate
API_DB_URL: postgres://postgres:postgres@localhost:5432/spark
NPM_CONFIG_WORKSPACE: observer
steps:
- run: psql "${DATABASE_URL}" -c "CREATE DATABASE spark_evaluate"
- run: psql "${DATABASE_URL}" -c "CREATE DATABASE spark"
- uses: actions/checkout@v4
- uses: actions/setup-node@v4
with:
Expand Down
28 changes: 3 additions & 25 deletions db/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { migrateWithPgClient as migrateEvaluateDB } from 'spark-evaluate/lib/migrate.js'
import { migrate as migrateApiDB } from 'spark-api/migrations/index.js'
import pg from 'pg'
import { dirname, join } from 'node:path'
import { fileURLToPath } from 'node:url'
Expand All @@ -9,18 +8,14 @@ import Postgrator from 'postgrator'
/** @typedef {import('./typings.js').PgPools} PgPools */
/** @typedef {import('./typings.js').PgPoolStats} PgPoolStats */
/** @typedef {import('./typings.js').PgPoolEvaluate} PgPoolEvaluate */
/** @typedef {import('./typings.js').PgPoolApi} PgPoolApi */
/** @typedef {import('./typings.js').Queryable} Queryable */

export { migrateEvaluateDB, migrateApiDB }
export { migrateEvaluateDB }

const {
// DATABASE_URL points to `spark_stats` database managed by this monorepo
DATABASE_URL = 'postgres://localhost:5432/spark_stats',

// API_DB_URL points to `spark` database managed by spark-api repo.
API_DB_URL = 'postgres://localhost:5432/spark',

// EVALUATE_DB_URL points to `spark_evaluate` database managed by spark-evaluate repo.
// Eventually, we should move the code updating stats from spark-evaluate to this repo
// and then we won't need two connection strings.
Expand Down Expand Up @@ -84,32 +79,15 @@ export const getEvaluatePgPool = async () => {
return evaluate
}

/**
* @returns {Promise<PgPoolApi>}
*/
export const getApiPgPool = async () => {
const stats = Object.assign(
new pg.Pool({
...poolConfig,
connectionString: API_DB_URL
}),
/** @type {const} */({ db: 'api' })
)
stats.on('error', onError)
await stats.query('SELECT 1')
return stats
}

/**
* @returns {Promise<PgPools>}
*/
export const getPgPools = async () => {
const stats = await getStatsPgPool()
const evaluate = await getEvaluatePgPool()
const api = await getApiPgPool()
const end = async () => { await Promise.all([stats.end(), evaluate.end(), api.end()]) }
const end = async () => { await Promise.all([stats.end(), evaluate.end()]) }

return { stats, evaluate, api, end }
return { stats, evaluate, end }
}

/**
Expand Down
1 change: 0 additions & 1 deletion db/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
"dependencies": {
"pg": "^8.12.0",
"postgrator": "^7.2.0",
"spark-api": "https://github.com/filecoin-station/spark-api/archive/7075fb55b253d48d5d5eb4846f13a3f688d80437.tar.gz",
"spark-evaluate": "filecoin-station/spark-evaluate#main"
},
"standard": {
Expand Down
6 changes: 0 additions & 6 deletions db/typings.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,13 @@ export interface PgPoolStats extends pg.Pool {
db: 'stats'
}

export interface PgPoolApi extends pg.Pool {
db: 'api'
}

export type PgPool =
| PgPoolEvaluate
| PgPoolStats
| PgPoolApi

export interface PgPools {
stats: PgPoolStats;
evaluate: PgPoolEvaluate;
api: PgPoolApi;
end(): Promise<void>
}

Expand Down
10 changes: 0 additions & 10 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions stats/bin/migrate.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import {
getPgPools,
migrateApiDB,
migrateEvaluateDB,
migrateStatsDB
} from '@filecoin-station/spark-stats-db'

const pgPools = await getPgPools()
await migrateStatsDB(pgPools.stats)
await migrateEvaluateDB(pgPools.evaluate)
await migrateApiDB(pgPools.api)
3 changes: 2 additions & 1 deletion stats/bin/spark-stats.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { getPgPools } from '@filecoin-station/spark-stats-db'
const {
PORT = '8080',
HOST = '127.0.0.1',
SPARK_API_BASE_URL = 'https://api.filspark.com/',
REQUEST_LOGGING = 'true'
} = process.env

Expand All @@ -17,7 +18,7 @@ const logger = {
request: ['1', 'true'].includes(REQUEST_LOGGING) ? console.info : () => {}
}

const handler = createHandler({ pgPools, logger })
const handler = createHandler({ SPARK_API_BASE_URL, pgPools, logger })
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in this PR, SPARK_API_BASE_URL is never overridden, so it should be removed from args and added to config instead

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to make this configurable via env vars so we can run spark-stats locally with redirects to the local spark-api instance. With your proposal, I would have to edit the local config file and then remember to discard the changes before committing to git.

In that light, do you still prefer to move this to the config file? I don't have a strong opinion.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, I didn't see that possibility. I never run these services in dev 😅

const server = http.createServer(handler)
console.log('Starting the http server on host %j port %s', HOST, PORT)
server.listen(Number(PORT), HOST)
Expand Down
100 changes: 17 additions & 83 deletions stats/lib/handler.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import * as Sentry from '@sentry/node'
import { json } from 'http-responders'

import { getStatsWithFilterAndCaching } from './request-helpers.js'

Expand All @@ -22,18 +21,20 @@ import { handlePlatformRoutes } from './platform-routes.js'

/**
* @param {object} args
* @param {string} args.SPARK_API_BASE_URL
* @param {import('@filecoin-station/spark-stats-db').PgPools} args.pgPools
* @param {import('./typings.d.ts').Logger} args.logger
* @returns
*/
export const createHandler = ({
SPARK_API_BASE_URL,
pgPools,
logger
}) => {
return (req, res) => {
const start = Date.now()
logger.request(`${req.method} ${req.url} ...`)
handler(req, res, pgPools)
handler(req, res, pgPools, SPARK_API_BASE_URL)
.catch(err => errorHandler(res, err, logger))
.then(() => {
logger.request(`${req.method} ${req.url} ${res.statusCode} (${Date.now() - start}ms)`)
Expand Down Expand Up @@ -73,8 +74,9 @@ const createRespondWithFetchFn =
* @param {import('node:http').IncomingMessage} req
* @param {import('node:http').ServerResponse} res
* @param {import('@filecoin-station/spark-stats-db').PgPools} pgPools
* @param {string} SPARK_API_BASE_URL
*/
const handler = async (req, res, pgPools) => {
const handler = async (req, res, pgPools, SPARK_API_BASE_URL) => {
// Caveat! `new URL('//foo', 'http://127.0.0.1')` would produce "http://foo/" - not what we want!
const { pathname, searchParams } = new URL(`http://127.0.0.1${req.url}`)
const segs = pathname.split('/').filter(Boolean)
Expand Down Expand Up @@ -102,11 +104,11 @@ const handler = async (req, res, pgPools) => {
} else if (req.method === 'GET' && url === '/miners/retrieval-success-rate/summary') {
await respond(fetchMinersRSRSummary)
} else if (req.method === 'GET' && segs[0] === 'miner' && segs[1] && segs[2] === 'deals' && segs[3] === 'eligible' && segs[4] === 'summary') {
await getRetrievableDealsForMiner(req, res, pgPools.api, segs[1])
await redirectToSparkApi(req, res, SPARK_API_BASE_URL)
} else if (req.method === 'GET' && segs[0] === 'client' && segs[1] && segs[2] === 'deals' && segs[3] === 'eligible' && segs[4] === 'summary') {
await getRetrievableDealsForClient(req, res, pgPools.api, segs[1])
await redirectToSparkApi(req, res, SPARK_API_BASE_URL)
} else if (req.method === 'GET' && segs[0] === 'allocator' && segs[1] && segs[2] === 'deals' && segs[3] === 'eligible' && segs[4] === 'summary') {
await getRetrievableDealsForAllocator(req, res, pgPools.api, segs[1])
await redirectToSparkApi(req, res, SPARK_API_BASE_URL)
} else if (await handlePlatformRoutes(req, res, pgPools)) {
// no-op, request was handled by handlePlatformRoute
} else if (req.method === 'GET' && url === '/') {
Expand Down Expand Up @@ -141,86 +143,18 @@ const notFound = (res) => {
}

/**
* @param {import('node:http').IncomingMessage} _req
* @param {import('node:http').IncomingMessage} req
* @param {import('node:http').ServerResponse} res
* @param {PgPools['api']} client
* @param {string} minerId
* @param {string} SPARK_API_BASE_URL
*/
const getRetrievableDealsForMiner = async (_req, res, client, minerId) => {
/** @type {{rows: {client_id: string; deal_count: number}[]}} */
const { rows } = await client.query(`
SELECT client_id, COUNT(cid)::INTEGER as deal_count FROM retrievable_deals
WHERE miner_id = $1 AND expires_at > now()
GROUP BY client_id
ORDER BY deal_count DESC, client_id ASC
`, [
minerId
])

// Cache the response for 6 hours
res.setHeader('cache-control', `max-age=${6 * 3600}`)

const body = {
minerId,
dealCount: rows.reduce((sum, row) => sum + row.deal_count, 0),
clients:
rows.map(
// eslint-disable-next-line camelcase
({ client_id, deal_count }) => ({ clientId: client_id, dealCount: deal_count })
)
}

json(res, body)
}

const getRetrievableDealsForClient = async (_req, res, client, clientId) => {
/** @type {{rows: {miner_id: string; deal_count: number}[]}} */
const { rows } = await client.query(`
SELECT miner_id, COUNT(cid)::INTEGER as deal_count FROM retrievable_deals
WHERE client_id = $1 AND expires_at > now()
GROUP BY miner_id
ORDER BY deal_count DESC, miner_id ASC
`, [
clientId
])

const redirectToSparkApi = async (req, res, SPARK_API_BASE_URL) => {
console.log('SPARK_API_BASE_URL', SPARK_API_BASE_URL)
console.log('req.url', req.url)
// Cache the response for 6 hours
res.setHeader('cache-control', `max-age=${6 * 3600}`)

const body = {
clientId,
dealCount: rows.reduce((sum, row) => sum + row.deal_count, 0),
providers: rows.map(
// eslint-disable-next-line camelcase
({ miner_id, deal_count }) => ({ minerId: miner_id, dealCount: deal_count })
)
}
json(res, body)
}

const getRetrievableDealsForAllocator = async (_req, res, client, allocatorId) => {
/** @type {{rows: {client_id: string; deal_count: number}[]}} */
const { rows } = await client.query(`
SELECT ac.client_id, COUNT(cid)::INTEGER as deal_count
FROM allocator_clients ac
LEFT JOIN retrievable_deals rd ON ac.client_id = rd.client_id
WHERE ac.allocator_id = $1 AND expires_at > now()
GROUP BY ac.client_id
ORDER BY deal_count DESC, ac.client_id ASC
`, [
allocatorId
])

// Cache the response for 6 hours
res.setHeader('cache-control', `max-age=${6 * 3600}`)

const body = {
allocatorId,
dealCount: rows.reduce((sum, row) => sum + row.deal_count, 0),
clients: rows.map(
// eslint-disable-next-line camelcase
({ client_id, deal_count }) => ({ clientId: client_id, dealCount: deal_count })
)
}
json(res, body)
const location = new URL(req.url, SPARK_API_BASE_URL).toString()
res.setHeader('location', location)
res.statusCode = 302
res.end(location)
}
Loading
Loading