Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions .github/workflows/build_test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
name: Cargo Build & Test

on:
push:
pull_request:

env:
CARGO_TERM_COLOR: always

jobs:
build_and_test:
name: etcd_fdw Build and Test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- name: Cache build/deps
uses: actions/cache@v4
with:
path: |
~/.cargo/bin/
~/.cargo/registry/index/
~/.cargo/registry/cache/
~/.cargo/git/db/
target/
key: cargo-${{ hashFiles('**/Cargo.lock') }}

- name: Cache pgrx
uses: actions/cache@v4
with:
path: |
~/.pgrx/
key: pgrx-0.16.0

- name: Install latest stable toolchain
uses: actions-rs/toolchain@v1
with:
toolchain: stable
override: true
components: rustfmt, clippy
- run: sudo apt install build-essential bison flex clang protobuf-compiler libreadline8 libreadline-dev -y
- run: cargo install cargo-pgrx --version 0.16.0
- run: cargo pgrx init
- run: cargo build --verbose
- run: cargo test --verbose
- run: cargo pgrx test --verbose


11 changes: 8 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,23 @@ pg14 = ["pgrx/pg14", "pgrx-tests/pg14", "supabase-wrappers/pg14"]
pg15 = ["pgrx/pg15", "pgrx-tests/pg15", "supabase-wrappers/pg15"]
pg16 = ["pgrx/pg16", "pgrx-tests/pg16", "supabase-wrappers/pg16"]
pg17 = ["pgrx/pg17", "pgrx-tests/pg17", "supabase-wrappers/pg17"]
pg18 = ["pgrx/pg18", "pgrx-tests/pg18", "supabase-wrappers/pg18"]
pg_test = []

[dependencies]
etcd-client = "0.16"
futures = "0.3.31"
pgrx = {version="=0.14.3"}
supabase-wrappers = {version="0.1.23", default-features = false}
pgrx = {version="=0.16.0"}
supabase-wrappers = {git="https://github.com/supabase/wrappers.git", branch="main",default-features = false}
thiserror = "2.0.16"
tokio = { version = "1.47.1", features = ["full"] }
testcontainers = { version = "0.25.0", features = ["blocking"] }
serde = { version = "1.0.226", features = ["derive"] }

[dev-dependencies]
pgrx-tests = "=0.14.3"
pgrx-tests = "=0.16.0"
testcontainers = { version = "0.25.0", features = ["blocking"] }
serde = { version = "1.0.226", features = ["derive"] }

[profile.dev]
panic = "unwind"
Expand Down
135 changes: 135 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use etcd_client::{Client, DeleteOptions, GetOptions, KeyValue, PutOptions};
use pgrx::pg_sys::panic::ErrorReport;
use pgrx::PgSqlErrorCode;
use pgrx::*;
use supabase_wrappers::prelude::*;
use thiserror::Error;

Expand Down Expand Up @@ -279,3 +280,137 @@ impl ForeignDataWrapper<EtcdFdwError> for EtcdFdw {
Ok(())
}
}

#[cfg(test)]
pub mod pg_test {

pub fn setup(_options: Vec<&str>) {
// perform one-off initialization when the pg_test framework starts
}

pub fn postgresql_conf_options() -> Vec<&'static str> {
// return any postgresql.conf settings that are required for your tests
vec![]
}
}

#[pg_schema]
#[cfg(any(test, feature = "pg_test"))]
mod tests {
use std::time::Duration;

use super::*;
use testcontainers::{
core::{IntoContainerPort, WaitFor},
runners::SyncRunner,
Container, GenericImage, ImageExt,
};

const CMD: [&'static str; 5] = [
"/usr/local/bin/etcd",
"--listen-client-urls",
"http://0.0.0.0:2379",
"--advertise-client-urls",
"http://0.0.0.0:2379",
];

fn create_container() -> (Container<GenericImage>, String) {
let container = GenericImage::new("quay.io/coreos/etcd", "v3.6.4")
.with_exposed_port(2379.tcp())
.with_wait_for(WaitFor::message_on_either_std(
"ready to serve client requests",
))
.with_privileged(true)
.with_cmd(CMD)
.with_startup_timeout(Duration::from_secs(90))
.start()
.expect("An etcd image was supposed to be started");

let host = container
.get_host()
.expect("Host-address should be available");

let port = container
.get_host_port_ipv4(2379.tcp())
.expect("Exposed host port should be available");

let url = format!("{}:{}", host, port);
(container, url)
}

fn create_fdt(url: String) -> () {
Spi::run("CREATE FOREIGN DATA WRAPPER etcd_fdw handler etcd_fdw_handler validator etcd_fdw_validator;").expect("FDW should have been created");

// Create a server
Spi::run(
format!(
"CREATE SERVER etcd_test_server FOREIGN DATA WRAPPER etcd_fdw options(connstr '{}')",
url
)
.as_str(),
)
.expect("Server should have been created");

// Create a foreign table
Spi::run("CREATE FOREIGN TABLE test (key text, value text) server etcd_test_server options (rowid_column 'key')").expect("Test table should have been created");
}

#[pg_test]
fn test_create_table() {
let (_container, url) = create_container();

create_fdt(url);
}
#[pg_test]
fn test_insert_select() {
let (_container, url) = create_container();

create_fdt(url);

// Insert into the foreign table
Spi::run("INSERT INTO test (key, value) VALUES ('foo','bar'),('bar','baz')")
.expect("INSERT should work");

let query_result = Spi::get_two::<String, String>("SELECT * FROM test WHERE key='foo'")
.expect("Select should work");

assert_eq!((Some(format!("foo")), Some(format!("bar"))), query_result);
let query_result = Spi::get_two::<String, String>("SELECT * FROM test WHERE key='bar'")
.expect("SELECT should work");

assert_eq!((Some(format!("bar")), Some(format!("baz"))), query_result);
}

#[pg_test]
fn test_update() {
let (_container, url) = create_container();

create_fdt(url);

Spi::run("INSERT INTO test (key, value) VALUES ('foo','bar'),('bar','baz')")
.expect("INSERT should work");

Spi::run("UPDATE test SET value='test_successful'").expect("UPDATE should work");

let query_result =
Spi::get_one::<String>("SELECT value FROM test;").expect("SELECT should work");

assert_eq!(Some(format!("test_successful")), query_result);
}

#[pg_test]
fn test_delete() {
let (_container, url) = create_container();

create_fdt(url);

Spi::run("INSERT INTO test (key, value) VALUES ('foo','bar'),('bar','baz')")
.expect("INSERT should work");

Spi::run("DELETE FROM test").expect("DELETE should work");

let query_result = Spi::get_one::<String>("SELECT value FROM test;");

assert_eq!(Err(spi::SpiError::InvalidPosition), query_result);
}
}
Loading