Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ diesel = { git = "https://github.com/juspay/diesel.git", branch = "dynamic-schem
"chrono",
"uuid",
"postgres_backend",
"i-implement-a-third-party-backend-and-opt-into-breaking-changes"
] }
fred = { version = "9.2.1" }
futures-util = "0.3.28"
Expand Down
19 changes: 6 additions & 13 deletions crates/context_aware_config/src/api/default_config/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use diesel::{Connection, ExpressionMethods, QueryDsl, RunQueryDsl, SelectableHel
use jsonschema::{Draft, JSONSchema, ValidationError};
use serde_json::Value;
use service_utils::{
db::types::ConnectionImpl,
helpers::{parse_config_tags, validation_err_to_str},
service::types::{AppHeader, AppState, CustomHeaders, DbConnection, SchemaName},
};
Expand Down Expand Up @@ -317,38 +318,30 @@ fn fetch_default_key(

#[get("")]
async fn get(
db_conn: DbConnection,
mut db_conn: ConnectionImpl,
filters: Query<PaginationParams>,
schema_name: SchemaName,
) -> superposition::Result<Json<PaginatedResponse<DefaultConfig>>> {
let DbConnection(mut conn) = db_conn;

if let Some(true) = filters.all {
let result: Vec<DefaultConfig> = dsl::default_configs
.schema_name(&schema_name)
.get_results(&mut conn)?;
let result: Vec<DefaultConfig> =
dsl::default_configs.get_results(&mut db_conn)?;
return Ok(Json(PaginatedResponse {
total_pages: 1,
total_items: result.len() as i64,
data: result,
}));
}

let n_default_configs: i64 = dsl::default_configs
.count()
.schema_name(&schema_name)
.get_result(&mut conn)?;
let n_default_configs: i64 = dsl::default_configs.count().get_result(&mut db_conn)?;
let limit = filters.count.unwrap_or(10);
let mut builder = dsl::default_configs
.order(dsl::created_at.desc())
.limit(limit)
.schema_name(&schema_name)
.into_boxed();
if let Some(page) = filters.page {
let offset = (page - 1) * limit;
builder = builder.offset(offset);
}
let result: Vec<DefaultConfig> = builder.load(&mut conn)?;
let result: Vec<DefaultConfig> = builder.load(&mut db_conn)?;
let total_pages = (n_default_configs as f64 / limit as f64).ceil() as i64;
Ok(Json(PaginatedResponse {
total_pages,
Expand Down
1 change: 1 addition & 0 deletions crates/service_utils/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ use diesel::{
};

pub mod utils;
pub mod types;

pub type PgSchemaConnectionPool = Pool<ConnectionManager<PgConnection>>;
219 changes: 219 additions & 0 deletions crates/service_utils/src/db/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
use actix_web::web::Data;
use actix_web::{FromRequest, HttpMessage};
use derive_more::{Deref, DerefMut};
use diesel::connection::{
AnsiTransactionManager, Connection, ConnectionSealed, DefaultLoadingMode,
LoadConnection, SimpleConnection, TransactionManager,
};
use diesel::pg::{Pg, PgQueryBuilder};
use diesel::r2d2::{ConnectionManager, PooledConnection};
use diesel::PgConnection;
use diesel::RunQueryDsl;

use crate::service::types::{AppState, SchemaName};

pub struct TransactionManagerImpl;

impl TransactionManager<ConnectionImpl> for TransactionManagerImpl {
type TransactionStateData = <AnsiTransactionManager as TransactionManager<
PgConnection,
>>::TransactionStateData;

fn begin_transaction(conn: &mut ConnectionImpl) -> diesel::prelude::QueryResult<()> {
AnsiTransactionManager::begin_transaction(&mut *conn.conn)?;
let result = diesel::sql_query("SELECT set_config('search_path', $1, true)")
.bind::<diesel::sql_types::Text, _>(&conn.namespace)
.execute(&mut *conn.conn)?;
log::info!("{:?}", result);
Ok(())
}

fn rollback_transaction(
conn: &mut ConnectionImpl,
) -> diesel::prelude::QueryResult<()> {
AnsiTransactionManager::rollback_transaction(&mut *conn.conn)
}

fn commit_transaction(conn: &mut ConnectionImpl) -> diesel::prelude::QueryResult<()> {
AnsiTransactionManager::commit_transaction(&mut *conn.conn)
}

fn transaction_manager_status_mut(
conn: &mut ConnectionImpl,
) -> &mut diesel::connection::TransactionManagerStatus {
AnsiTransactionManager::transaction_manager_status_mut(&mut *conn.conn)
}
}

pub struct ConnectionImpl {
namespace: String,
conn: PooledConnection<ConnectionManager<PgConnection>>,
}

impl ConnectionImpl {
pub fn new(
namespace: String,
mut conn: PooledConnection<ConnectionManager<PgConnection>>,
) -> Self {
conn.set_prepared_statement_cache_size(diesel::connection::CacheSize::Disabled);
ConnectionImpl { namespace, conn }
}

pub fn set_namespace(&mut self, namespace: String) {
self.namespace = namespace;
}

pub fn from_request_override(
req: &actix_web::HttpRequest,
schema_name: String,
) -> Result<Self, actix_web::Error> {
let app_state = match req.app_data::<Data<AppState>>() {
Some(state) => state,
None => {
log::info!(
"DbConnection-FromRequest: Unable to get app_data from request"
);
return Err(actix_web::error::ErrorInternalServerError(""));
}
};

match app_state.db_pool.get() {
Ok(conn) => Ok(ConnectionImpl::new(schema_name, conn)),
Err(e) => {
log::info!("Unable to get db connection from pool, error: {e}");
Err(actix_web::error::ErrorInternalServerError(""))
}
}
}
}

impl ConnectionSealed for ConnectionImpl {}

impl SimpleConnection for ConnectionImpl {
fn batch_execute(&mut self, query: &str) -> diesel::prelude::QueryResult<()> {
self.conn.batch_execute(query)
}
}

impl Connection for ConnectionImpl {
type Backend = Pg;
type TransactionManager = TransactionManagerImpl;

// NOTE: this function will never be used, so namespace here doesn't matter
fn establish(database_url: &str) -> diesel::prelude::ConnectionResult<Self> {
let conn = PooledConnection::establish(database_url)?;
Ok(ConnectionImpl {
namespace: String::new(),
conn,
})
}

fn execute_returning_count<T>(
&mut self,
source: &T,
) -> diesel::prelude::QueryResult<usize>
where
T: diesel::query_builder::QueryFragment<Self::Backend>
+ diesel::query_builder::QueryId,
{
log::info!("{:?}", source.to_sql(&mut PgQueryBuilder::default(), &Pg));
self.transaction::<usize, diesel::result::Error, _>(|conn| {
(*conn.conn).execute_returning_count(source)
})
}

fn transaction_state(&mut self,) -> &mut<Self::TransactionManager as diesel::connection::TransactionManager<Self>>::TransactionStateData{
self.conn.transaction_state()
}

fn set_prepared_statement_cache_size(&mut self, size: diesel::connection::CacheSize) {
self.conn.set_prepared_statement_cache_size(size)
}

fn set_instrumentation(
&mut self,
instrumentation: impl diesel::connection::Instrumentation,
) {
self.conn.set_instrumentation(instrumentation)
}

fn instrumentation(&mut self) -> &mut dyn diesel::connection::Instrumentation {
self.conn.instrumentation()
}
}

impl LoadConnection<DefaultLoadingMode> for ConnectionImpl {
type Cursor<'conn, 'query> =
<PgConnection as LoadConnection<DefaultLoadingMode>>::Cursor<'conn, 'query>;
type Row<'conn, 'query> =
<PgConnection as LoadConnection<DefaultLoadingMode>>::Row<'conn, 'query>;

fn load<'conn, 'query, T>(
&'conn mut self,
source: T,
) -> diesel::prelude::QueryResult<Self::Cursor<'conn, 'query>>
where
T: diesel::query_builder::Query
+ diesel::query_builder::QueryFragment<Self::Backend>
+ diesel::query_builder::QueryId
+ 'query,
Self::Backend: diesel::expression::QueryMetadata<T::SqlType>,
{
self.transaction::<Self::Cursor<'conn, 'query>, diesel::result::Error, _>(
|conn| {
log::info!("{:?}", source.to_sql(&mut PgQueryBuilder::default(), &Pg));
<PgConnection as LoadConnection<DefaultLoadingMode>>::load::<T>(
&mut *conn.conn,
source,
)
},
)
}
}

impl FromRequest for ConnectionImpl {
type Error = actix_web::Error;
type Future = std::future::Ready<Result<ConnectionImpl, Self::Error>>;

fn from_request(
req: &actix_web::HttpRequest,
_: &mut actix_web::dev::Payload,
) -> Self::Future {
let schema_name = req.extensions().get::<SchemaName>().cloned().unwrap().0;
std::future::ready(ConnectionImpl::from_request_override(req, schema_name))
}
}

#[derive(Deref, DerefMut)]
pub struct PublicConnection(pub ConnectionImpl);
impl FromRequest for PublicConnection {
type Error = actix_web::Error;
type Future = std::future::Ready<Result<PublicConnection, Self::Error>>;

fn from_request(
req: &actix_web::HttpRequest,
_: &mut actix_web::dev::Payload,
) -> Self::Future {
std::future::ready(
ConnectionImpl::from_request_override(req, String::from("public"))
.map(|conn| PublicConnection(conn)),
)
}
}

#[derive(Deref, DerefMut)]
pub struct SuperpositionConnection(pub ConnectionImpl);
impl FromRequest for SuperpositionConnection {
type Error = actix_web::Error;
type Future = std::future::Ready<Result<SuperpositionConnection, Self::Error>>;

fn from_request(
req: &actix_web::HttpRequest,
_: &mut actix_web::dev::Payload,
) -> Self::Future {
std::future::ready(
ConnectionImpl::from_request_override(req, String::from("superposition"))
.map(|conn| SuperpositionConnection(conn)),
)
}
}
3 changes: 0 additions & 3 deletions crates/service_utils/src/service/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ pub struct DbConnection(pub PooledConnection<ConnectionManager<PgConnection>>);
impl FromRequest for DbConnection {
type Error = Error;
type Future = Ready<Result<DbConnection, Self::Error>>;

fn from_request(
req: &actix_web::HttpRequest,
_: &mut actix_web::dev::Payload,
Expand All @@ -185,15 +184,13 @@ impl FromRequest for DbConnection {
return ready(Err(error::ErrorInternalServerError("")));
}
};

let result = match app_state.db_pool.get() {
Ok(conn) => Ok(DbConnection(conn)),
Err(e) => {
log::info!("Unable to get db connection from pool, error: {e}");
Err(error::ErrorInternalServerError(""))
}
};

ready(result)
}
}
Expand Down
Loading