Commit 8be1329

Distributed rate limit, fix search panic, add migration task

1 parent 5fbf5b2

9 files changed: +266 −290 lines

Cargo.lock

Lines changed: 0 additions & 66 deletions
Some generated files are not rendered by default.

apps/frontend/.env.example

Lines changed: 1 addition & 0 deletions
@@ -1,2 +1,3 @@
 BASE_URL=https://api.modrinth.com/v2/
 BROWSER_BASE_URL=https://api.modrinth.com/v2/
+PYRO_BASE_URL=https://archon.modrinth.com/

apps/labrinth/Cargo.toml

Lines changed: 1 addition & 2 deletions
@@ -19,14 +19,13 @@ actix-ws = "0.3.0"
 actix-files = "0.6.5"
 prometheus = "0.13.4"
 actix-web-prom = { version = "0.9.0", features = ["process"] }
-governor = "0.6.3"
 
 tracing = "0.1.41"
 tracing-subscriber = "0.3.19"
 tracing-actix-web = "0.7.16"
 console-subscriber = "0.4.1"
 
-tokio = { version = "1.35.1", features = ["sync"] }
+tokio = { version = "1.35.1", features = ["sync", "rt-multi-thread"] }
 tokio-stream = "0.1.14"
 
 futures = "0.3.30"
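Both changes line up with the rest of the commit: governor backed the old in-process keyed limiter that lib.rs deletes below, and the added rt-multi-thread feature on tokio is presumably there because the Redis-backed replacement (or the one-shot background-task path) relies on a multi-threaded Tokio runtime rather than purely on Actix's.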

apps/labrinth/src/background_task.rs

Lines changed: 9 additions & 1 deletion
@@ -1,7 +1,7 @@
 use crate::database::redis::RedisPool;
 use crate::queue::payouts::process_payout;
-use crate::search;
 use crate::search::indexing::index_projects;
+use crate::{database, search};
 use clap::ValueEnum;
 use sqlx::Postgres;
 use tracing::{info, warn};
@@ -15,6 +15,7 @@ pub enum BackgroundTask {
     Payouts,
     IndexBilling,
     IndexSubscriptions,
+    Migrations,
 }
 
 impl BackgroundTask {
@@ -28,6 +29,7 @@ impl BackgroundTask {
     ) {
         use BackgroundTask::*;
         match self {
+            Migrations => run_migrations().await,
             IndexSearch => index_search(pool, redis_pool, search_config).await,
             ReleaseScheduled => release_scheduled(pool).await,
             UpdateVersions => update_versions(pool, redis_pool).await,
@@ -50,6 +52,12 @@
     }
 }
 
+pub async fn run_migrations() {
+    database::check_for_migrations()
+        .await
+        .expect("An error occurred while running migrations.");
+}
+
 pub async fn index_search(
     pool: sqlx::Pool<Postgres>,
     redis_pool: RedisPool,
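The new Migrations task is a thin wrapper around the same database::check_for_migrations call that main.rs previously ran unconditionally at startup (see the main.rs hunk below), so the migration logic itself is unchanged; only where it can be triggered from moves.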

apps/labrinth/src/lib.rs

Lines changed: 7 additions & 23 deletions
@@ -1,4 +1,3 @@
-use std::num::NonZeroU32;
 use std::sync::Arc;
 use std::time::Duration;
 
@@ -13,14 +12,12 @@ use tracing::{info, warn};
 
 extern crate clickhouse as clickhouse_crate;
 use clickhouse_crate::Client;
-use governor::middleware::StateInformationMiddleware;
-use governor::{Quota, RateLimiter};
 use util::cors::default_cors;
 
 use crate::background_task::update_versions;
 use crate::queue::moderation::AutomatedModerationQueue;
 use crate::util::env::{parse_strings_from_var, parse_var};
-use crate::util::ratelimit::KeyedRateLimiter;
+use crate::util::ratelimit::{AsyncRateLimiter, GCRAParameters};
 use sync::friends::handle_pubsub;
 
 pub mod auth;
@@ -57,7 +54,7 @@ pub struct LabrinthConfig {
     pub analytics_queue: Arc<AnalyticsQueue>,
     pub active_sockets: web::Data<ActiveSockets>,
     pub automated_moderation_queue: web::Data<AutomatedModerationQueue>,
-    pub rate_limiter: KeyedRateLimiter,
+    pub rate_limiter: web::Data<AsyncRateLimiter>,
     pub stripe_client: stripe::Client,
 }
 
@@ -93,24 +90,10 @@ pub fn app_setup(
 
     let mut scheduler = scheduler::Scheduler::new();
 
-    let limiter: KeyedRateLimiter = Arc::new(
-        RateLimiter::keyed(Quota::per_minute(NonZeroU32::new(300).unwrap()))
-            .with_middleware::<StateInformationMiddleware>(),
-    );
-    let limiter_clone = Arc::clone(&limiter);
-    scheduler.run(Duration::from_secs(60), move || {
-        info!(
-            "Clearing ratelimiter, storage size: {}",
-            limiter_clone.len()
-        );
-        limiter_clone.retain_recent();
-        info!(
-            "Done clearing ratelimiter, storage size: {}",
-            limiter_clone.len()
-        );
-
-        async move {}
-    });
+    let limiter = web::Data::new(AsyncRateLimiter::new(
+        redis_pool.clone(),
+        GCRAParameters::new(300, 300),
+    ));
 
     if enable_background_tasks {
         // The interval in seconds at which the local database is indexed
@@ -329,6 +312,7 @@ pub fn app_config(
        .app_data(labrinth_config.active_sockets.clone())
        .app_data(labrinth_config.automated_moderation_queue.clone())
        .app_data(web::Data::new(labrinth_config.stripe_client.clone()))
+       .app_data(labrinth_config.rate_limiter.clone())
        .configure(
            #[allow(unused_variables)]
            |cfg| {
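This hunk is the heart of the "distributed rate limit" change: governor's per-process keyed limiter (and the scheduler job that pruned its in-memory key map every minute) is replaced by an AsyncRateLimiter that keeps its state in Redis, so the 300-per-minute quota holds across every API instance instead of per process. The implementation of AsyncRateLimiter and GCRAParameters lives in util::ratelimit and is not part of this diff, so the following is only a minimal sketch of how a Redis-backed GCRA check typically works, assuming the redis crate (features "script" and "tokio-comp") and with the field meanings of GCRAParameters guessed from GCRAParameters::new(300, 300) above:

use std::time::{SystemTime, UNIX_EPOCH};

use redis::Script;

// Hypothetical shape; the real type lives in util::ratelimit.
pub struct GCRAParameters {
    /// Sustained requests allowed per minute.
    pub rate_per_minute: u32,
    /// Burst capacity on top of the sustained rate.
    pub burst: u32,
}

impl GCRAParameters {
    pub fn new(rate_per_minute: u32, burst: u32) -> Self {
        Self { rate_per_minute, burst }
    }
}

// GCRA stores one value per client key: the "theoretical arrival time" (TAT).
// A request is allowed when now >= TAT - burst * emission_interval, and on
// success the TAT advances by one emission interval. Doing the
// read-compare-write in a Lua script makes it atomic inside Redis, which is
// what lets every API instance share one consistent limit.
const GCRA_LUA: &str = r#"
local emission_interval = tonumber(ARGV[1])
local burst = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local tat = tonumber(redis.call('GET', KEYS[1]) or now)
if tat < now then tat = now end
if now >= tat - burst * emission_interval then
    redis.call('SET', KEYS[1], tat + emission_interval,
               'EX', math.ceil(burst * emission_interval) + 60)
    return 1
end
return 0
"#;

pub async fn check_rate_limit(
    conn: &mut redis::aio::MultiplexedConnection,
    params: &GCRAParameters,
    key: &str,
) -> redis::RedisResult<bool> {
    // One request "costs" this many seconds of budget.
    let emission_interval = 60.0 / f64::from(params.rate_per_minute);
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock before UNIX epoch")
        .as_secs_f64();
    let allowed: i64 = Script::new(GCRA_LUA)
        .key(format!("ratelimit:{key}"))
        .arg(emission_interval)
        .arg(f64::from(params.burst))
        .arg(now)
        .invoke_async(conn)
        .await?;
    Ok(allowed == 1)
}

A nice side effect visible in the deleted lines: Redis key expiry replaces the periodic "Clearing ratelimiter" scheduler job, since stale entries simply time out on their own.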

apps/labrinth/src/main.rs

Lines changed: 12 additions & 5 deletions
@@ -1,11 +1,12 @@
+use actix_web::middleware::from_fn;
 use actix_web::{App, HttpServer};
 use actix_web_prom::PrometheusMetricsBuilder;
 use clap::Parser;
 use labrinth::background_task::BackgroundTask;
 use labrinth::database::redis::RedisPool;
 use labrinth::file_hosting::S3Host;
 use labrinth::search;
-use labrinth::util::ratelimit::RateLimit;
+use labrinth::util::ratelimit::rate_limit_middleware;
 use labrinth::{check_env_vars, clickhouse, database, file_hosting, queue};
 use std::sync::Arc;
 use tracing::{error, info};
@@ -33,6 +34,10 @@ struct Args {
     #[arg(long)]
     no_background_tasks: bool,
 
+    /// Don't automatically run migrations. This means the migrations should be run via --run-background-task.
+    #[arg(long)]
+    no_migrations: bool,
+
     /// Run a single background task and then exit. Perfect for cron jobs.
     #[arg(long, value_enum, id = "task")]
     run_background_task: Option<BackgroundTask>,
@@ -67,9 +72,11 @@ async fn main() -> std::io::Result<()> {
         dotenvy::var("BIND_ADDR").unwrap()
     );
 
-    database::check_for_migrations()
-        .await
-        .expect("An error occurred while running migrations.");
+    if !args.no_migrations {
+        database::check_for_migrations()
+            .await
+            .expect("An error occurred while running migrations.");
+    }
 }
 
 // Database Connector
@@ -164,7 +171,7 @@ async fn main() -> std::io::Result<()> {
        App::new()
            .wrap(TracingLogger::default())
            .wrap(prometheus.clone())
-           .wrap(RateLimit(Arc::clone(&labrinth_config.rate_limiter)))
+           .wrap(from_fn(rate_limit_middleware))
            .wrap(actix_web::middleware::Compress::default())
            .wrap(sentry_actix::Sentry::new())
            .configure(|cfg| labrinth::app_config(cfg, labrinth_config.clone()))
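Together, the two flags let a deployment take migrations out of the server's startup path entirely: start the API with --no-migrations and apply migrations beforehand as a one-shot job (for example an init container or cron entry). Assuming clap's usual kebab-case rendering of ValueEnum variants, that one-shot invocation would presumably be --run-background-task migrations.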

apps/labrinth/src/routes/internal/statuses.rs

Lines changed: 16 additions & 16 deletions
@@ -91,30 +91,30 @@ pub async fn ws_init(
     let friend_statuses = if !friends.is_empty() {
         let db = db.clone();
         let redis = redis.clone();
-        tokio_stream::iter(friends.iter())
+
+        let statuses = tokio_stream::iter(friends.iter())
             .map(|x| {
                 let db = db.clone();
                 let redis = redis.clone();
                 async move {
-                    async move {
-                        get_user_status(
-                            if x.user_id == user_id.into() {
-                                x.friend_id
-                            } else {
-                                x.user_id
-                            }
-                            .into(),
-                            &db,
-                            &redis,
-                        )
-                        .await
-                    }
+                    get_user_status(
+                        if x.user_id == user_id.into() {
+                            x.friend_id
+                        } else {
+                            x.user_id
+                        }
+                        .into(),
+                        &db,
+                        &redis,
+                    )
+                    .await
                 }
             })
             .buffer_unordered(16)
-            .filter_map(|x| x)
             .collect::<Vec<_>>()
-            .await
+            .await;
+
+        statuses.into_iter().flatten().collect()
     } else {
         Vec::new()
     };
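Two things change here: a redundant doubly-nested async move block is collapsed, and the stream-level .filter_map(|x| x) is replaced by collecting the Option results first and flattening afterwards. A self-contained sketch of the same idiom, with a hypothetical lookup standing in for get_user_status:

use futures::StreamExt;

// Hypothetical stand-in for get_user_status: "finds" even ids only.
async fn lookup(id: u32) -> Option<u32> {
    (id % 2 == 0).then_some(id)
}

#[tokio::main]
async fn main() {
    // Run up to 16 lookups concurrently, keeping each result as an Option.
    let results: Vec<Option<u32>> = tokio_stream::iter(0..8u32)
        .map(lookup)
        .buffer_unordered(16)
        .collect()
        .await;
    // Same effect as the removed `.filter_map(|x| x)` stream stage:
    let found: Vec<u32> = results.into_iter().flatten().collect();
    println!("{found:?}"); // e.g. [0, 2, 4, 6]; order may vary
}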

apps/labrinth/src/search/mod.rs

Lines changed: 3 additions & 2 deletions
@@ -209,8 +209,9 @@ pub async fn search_for_project(
     let mut filter_string = String::new();
 
     // Convert offset and limit to page and hits_per_page
-    let hits_per_page = limit;
-    let page = offset / limit + 1;
+    let hits_per_page = if limit == 0 { 1 } else { limit };
+
+    let page = offset / hits_per_page + 1;
 
     let results = {
         let mut query = meilisearch_index.search();
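This is the "fix search panic" part of the commit: offset / limit is integer division, and Rust panics with "attempt to divide by zero" (in release builds too) when limit == 0, so a search request asking for zero results could crash the handler. Clamping the divisor to at least 1 makes the page arithmetic total. Reduced to its essentials (parameter types assumed):

fn page_for(offset: usize, limit: usize) -> usize {
    // Before the fix this was `offset / limit + 1`, which panics
    // whenever a caller passes limit == 0.
    let hits_per_page = if limit == 0 { 1 } else { limit };
    offset / hits_per_page + 1
}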
