Skip to content

Commit bb04329

Browse files
committed
feat: watcher subscribes to data source for changes
1 parent 7405ca3 commit bb04329

24 files changed

+1648
-1
lines changed

.github/workflows/ci.yml

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
name: ci
2+
3+
on:
4+
push:
5+
pull_request:
6+
schedule: [cron: "40 1 * * 0"]
7+
8+
jobs:
9+
10+
test-release-build:
11+
name: Build
12+
runs-on: ubuntu-latest
13+
14+
strategy:
15+
matrix:
16+
include:
17+
- toolchain: "nightly"
18+
19+
steps:
20+
- name: Setup | Checkout
21+
uses: actions/checkout@v2
22+
23+
24+
- name: Setup | Toolchain
25+
uses: actions-rs/toolchain@v1.0.6
26+
with:
27+
toolchain: "${{ matrix.toolchain }}"
28+
override: true
29+
30+
31+
- name: Build | Release Mode
32+
uses: actions-rs/cargo@v1
33+
with:
34+
command: build
35+
args: --release
36+
37+
38+
unittest:
39+
runs-on: ubuntu-latest
40+
41+
strategy:
42+
fail-fast: false
43+
matrix:
44+
include:
45+
- toolchain: "nightly"
46+
features: ""
47+
48+
steps:
49+
- name: Setup | Checkout
50+
uses: actions/checkout@v2
51+
52+
53+
- name: Setup | Toolchain
54+
uses: actions-rs/toolchain@v1.0.6
55+
with:
56+
toolchain: "${{ matrix.toolchain }}"
57+
override: true
58+
59+
60+
- name: Test
61+
uses: actions-rs/cargo@v1
62+
with:
63+
command: test
64+
args: --features "${{ matrix.features }}"
65+
env:
66+
RUST_LOG: debug
67+
RUST_BACKTRACE: full
68+
69+
70+
lint:
71+
name: lint
72+
runs-on: ubuntu-latest
73+
steps:
74+
- uses: actions/checkout@v2
75+
- uses: actions-rs/toolchain@v1.0.6
76+
with:
77+
toolchain: nightly
78+
components: rustfmt, clippy
79+
80+
81+
- name: Check Apache License Header
82+
uses: korandoru/hawkeye@v2
83+
84+
85+
- name: Format
86+
uses: actions-rs/cargo@v1
87+
with:
88+
command: fmt
89+
args: --all -- --check
90+
91+
92+
- name: clippy
93+
shell: bash
94+
run: |
95+
cargo clippy --no-deps --workspace --all-targets -- -D warnings
96+
97+
98+
- name: Build-doc
99+
uses: actions-rs/cargo@v1
100+
with:
101+
command: doc
102+
args: --all --no-deps
103+
env:
104+
RUSTDOCFLAGS: "-D warnings"
105+
106+
107+
- shell: bash
108+
run: cargo install cargo-audit
109+
110+
111+
- name: Audit dependencies
112+
shell: bash
113+
# if: "!contains(github.event.head_commit.message, 'skip audit')"
114+
run: cargo audit --db ./target/advisory-db
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
name: 'commit-message-check'
2+
on:
3+
pull_request:
4+
types:
5+
- opened
6+
- edited
7+
- reopened
8+
- synchronize
9+
push:
10+
branches:
11+
12+
jobs:
13+
check-commit-message:
14+
name: check-subject
15+
runs-on: ubuntu-latest
16+
steps:
17+
- name: check-subject-type
18+
uses: gsactions/commit-message-checker@v1
19+
with:
20+
checkAllCommitMessages: 'true' # optional: this checks all commits associated with a pull request
21+
excludeDescription: 'true' # optional: this excludes the description body of a pull request
22+
excludeTitle: 'true' # optional: this excludes the title of a pull request
23+
accessToken: ${{ secrets.GITHUB_TOKEN }}
24+
pattern: '^(DataChange:|Change:|Feature:|Improve:|Perf:|Dep:|Doc:|Test:|CI:|Refactor:|Fix:|Fixdoc:|Fixup:|Merge|BumpVer:|Chore:|fix:|feat:|perf:|refactor:|test:|docs:|deps:|chore:|ci:|Build\(deps\):) .+$'
25+
flags: 'gm'
26+
error: |
27+
Subject line has to contain a commit type, e.g.: "Change: blabla" or a merge commit e.g.: "Merge xxx".
28+
Valid types are:
29+
DataChange - Persistent data change
30+
Change - API breaking change
31+
Feature - API compatible new feature
32+
feat - API compatible new feature
33+
Improve - Become better without functional changes
34+
Perf - Performance improvement
35+
perf - Performance improvement
36+
Dep - dependency update
37+
deps - dependency update
38+
Doc - doc update
39+
docs - doc update
40+
Test - test udpate
41+
test - test udpate
42+
CI - CI workflow update
43+
ci - CI workflow update
44+
Refactor - refactor without function change.
45+
refactor - refactor without function change.
46+
Fix - fix bug
47+
fix - fix bug
48+
Fixdoc - fix doc
49+
Fixup - minor change: e.g., fix sth mentioned in a review.
50+
BumpVer - Bump to a new version.
51+
Chore - Nothing important.
52+
chore - Nothing important.
53+
Build(deps) - bot: dependabot.
54+
55+
56+

.gitignore

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,8 @@ target/
1414
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
1515
# and can be added to the global gitignore or merged into this file. For a more nuclear
1616
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
17-
#.idea/
17+
/.idea/
18+
19+
Cargo.lock
20+
21+
.DS_Store

Cargo.toml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
[package]
2+
name = "watcher"
3+
description = "subscribe data changes"
4+
version = "0.1.0"
5+
authors = ["Databend Authors <opensource@datafuselabs.com>"]
6+
license = "Apache-2.0"
7+
edition = "2021"
8+
9+
[features]
10+
11+
[dependencies]
12+
futures = "0.3.24"
13+
log = { version = "0.4.21", features = ["serde", "kv_unstable_std"] }
14+
span-map = { version = "0.2.0" }
15+
tokio = { version = "1.35.0", features = ["sync"] }
16+
tokio-util = { version = "0.7.13" }
17+
18+
[dev-dependencies]
19+
anyhow = { version = "1.0.65" }
20+
# Enable other features for testing
21+
tokio = { version = "1.35.0", features = ["sync", "time", "macros", "rt"] }
22+
23+
#[[example]]
24+
#name = "basic_usage"
25+
#path = "examples/basic_usage.rs"

Makefile

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
all: test fmt clippy doc
2+
3+
test:
4+
cargo test
5+
6+
fmt:
7+
cargo fmt
8+
9+
clippy:
10+
cargo clippy
11+
12+
doc:
13+
cargo doc
14+
15+

licenserc.toml

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
baseDir = "."
2+
3+
headerPath = "Apache-2.0.txt"
4+
5+
excludes = [
6+
# hidden files
7+
".cargo",
8+
".github",
9+
".dockerignore",
10+
".gitignore",
11+
".gitattributes",
12+
".editorconfig",
13+
14+
"LICENSE",
15+
"Makefile",
16+
17+
"examples/**",
18+
19+
# docs and generated files
20+
"**/*.md",
21+
"**/*.hbs",
22+
"**/*.template",
23+
"**/*.cue",
24+
"**/*.json",
25+
"**/*.sql",
26+
"**/*.proto",
27+
"**/*.yml",
28+
"**/*.yaml",
29+
"**/*.toml",
30+
"**/*.lock",
31+
"**/*.yapf",
32+
"**/*.test",
33+
"**/*.txt",
34+
]
35+
36+
[properties]
37+
inceptionYear = 2021
38+
copyrightOwner = "Datafuse Labs"

rust-toolchain.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
[toolchain]
2+
channel = "nightly-2025-01-01"
3+
components = ["rustfmt", "clippy", "rust-src", "miri", "rust-analyzer"]

rustfmt.toml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
edition = "2021"
2+
style_edition = "2021"
3+
reorder_imports = true
4+
imports_granularity = "Item"
5+
group_imports = "StdExternalCrate"
6+
where_single_line = true
7+
trailing_comma = "Vertical"
8+
overflow_delimited_expr = true
9+
format_code_in_doc_comments = true
10+
normalize_comments = true

src/desc.rs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use crate::event_filter::EventFilter;
16+
use crate::id::WatcherId;
17+
use crate::type_config::TypeConfig;
18+
use crate::KeyRange;
19+
20+
/// Descriptor for a watcher that monitors key-value change events.
21+
///
22+
/// A `WatchDesc` defines the scope and filtering criteria for a watcher,
23+
/// specifying which key range to observe and what types of events
24+
/// (updates, deletes) to receive notifications for.
25+
#[derive(Clone, Debug)]
26+
pub struct WatchDesc<C>
27+
where C: TypeConfig
28+
{
29+
/// Unique identifier for this watcher instance.
30+
pub watcher_id: WatcherId,
31+
32+
/// Event filter that determines which event types (update/delete)
33+
/// this watcher should receive.
34+
pub interested: EventFilter,
35+
36+
/// The range of keys this watcher is monitoring.
37+
/// Only changes to keys within this range will trigger notifications.
38+
pub key_range: KeyRange<C>,
39+
}
40+
41+
impl<C> WatchDesc<C>
42+
where C: TypeConfig
43+
{
44+
pub(crate) fn new(id: WatcherId, interested: EventFilter, key_range: KeyRange<C>) -> Self {
45+
Self {
46+
watcher_id: id,
47+
interested,
48+
key_range,
49+
}
50+
}
51+
}

src/dispatch/command.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use futures::future::BoxFuture;
16+
17+
use crate::dispatch::Dispatcher;
18+
use crate::type_config::KVChange;
19+
use crate::type_config::TypeConfig;
20+
21+
/// A command sent to [`Dispatcher`].
22+
///
23+
/// [`Dispatcher`] is a single task that processes all commands serially in the order they are received.
24+
///
25+
/// Commands can be categorized into two types:
26+
/// - Synchronous commands: These commands (like `Change` and `Func`) execute immediately and block
27+
/// the dispatcher's thread until completion.
28+
/// - Asynchronous commands: These commands (like `AsyncFunc` and `Future`) return a future that
29+
/// will be awaited on the dispatcher's thread, allowing other tasks to make progress while waiting.
30+
///
31+
/// This command-based architecture ensures thread safety by serializing all operations on the dispatcher.
32+
#[allow(clippy::type_complexity)]
33+
pub enum Command<C>
34+
where C: TypeConfig
35+
{
36+
/// Submit a key-value change event to dispatcher.
37+
Change(KVChange<C>),
38+
39+
/// Execute a function to the [`Dispatcher`].
40+
///
41+
/// The function will be called with a mutable reference to the dispatcher,
42+
/// allowing it to modify the dispatcher's state or perform operations.
43+
/// This is a synchronous operation that blocks until completion.
44+
Func {
45+
req: Box<dyn FnOnce(&mut Dispatcher<C>) + Send + 'static>,
46+
},
47+
48+
/// Send a function to [`Dispatcher`] to run it asynchronously.
49+
///
50+
/// The function will be called with a mutable reference to the dispatcher,
51+
/// allowing it to modify the dispatcher's state or perform operations.
52+
/// The function returns a future that will be awaited on the dispatcher's thread.
53+
AsyncFunc {
54+
req: Box<dyn FnOnce(&mut Dispatcher<C>) -> BoxFuture<'static, ()> + Send + 'static>,
55+
},
56+
57+
/// Send a future to [`Dispatcher`] to run it.
58+
///
59+
/// The future will be awaited on the dispatcher's thread.
60+
Future(BoxFuture<'static, ()>),
61+
}

0 commit comments

Comments
 (0)