Skip to content

Commit a5257d1

Browse files
committed
[feat]curvefs/client: add check in cache
1. check disk cache is work 2. check kv cache is work 3. add tools-v2 Signed-off-by: Cyber-SiKu <Cyber-SiKu@outlook.com>
1 parent bea74e3 commit a5257d1

File tree

20 files changed

+640
-106
lines changed

20 files changed

+640
-106
lines changed

curvefs/src/client/common/common.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ std::ostream &operator<<(std::ostream &os, MetaServerOpType optype) {
8080

8181
const char kCurveFsWarmupOpAdd[] = "add";
8282
const char kCurveFsWarmupOpCancel[] = "cancel";
83+
const char kCurveFsWarmupOpCheck[] = "check";
8384
const char kCurveFsWarmupTypeList[] = "list";
8485
const char kCurveFsWarmupTypeSingle[] = "single";
8586

@@ -89,6 +90,8 @@ WarmupOpType GetWarmupOpType(const std::string& op) {
8990
ret = WarmupOpType::kWarmupOpAdd;
9091
} else if (op == kCurveFsWarmupOpCancel) {
9192
ret = WarmupOpType::kWarmupOpCancel;
93+
} else if (op == kCurveFsWarmupOpCheck) {
94+
ret = WarmupOpType::kWarmupOpCheck;
9295
}
9396
return ret;
9497
}

curvefs/src/client/common/common.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,13 @@ std::ostream &operator<<(std::ostream &os, MetaServerOpType optype);
6868
constexpr size_t kMinWarmupOpArgsNum = 1;
6969
constexpr size_t kWarmupAddArgsNum = 6;
7070
constexpr size_t kWarmupCancelArgsNum = 2;
71+
constexpr size_t kWarmupCheckArgsNum = 6;
7172

7273
enum class WarmupOpType {
7374
kWarmupOpUnknown = 0,
7475
kWarmupOpAdd = 1,
7576
kWarmupOpCancel = 2,
77+
kWarmupOpCheck = 3,
7678
};
7779

7880
WarmupOpType GetWarmupOpType(const std::string& op);

curvefs/src/client/curve_fuse_op.cpp

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@
2323

2424
#include "curvefs/src/client/curve_fuse_op.h"
2525

26+
#include <fmt/format.h>
27+
2628
#include <cstring>
29+
#include <functional>
2730
#include <memory>
2831
#include <string>
2932
#include <unordered_map>
@@ -65,6 +68,7 @@ using ::curvefs::client::common::WarmupStorageType;
6568
using ::curvefs::client::filesystem::AttrOut;
6669
using ::curvefs::client::filesystem::EntryOut;
6770
using ::curvefs::client::filesystem::FileOut;
71+
using ::curvefs::client::filesystem::IsCheckXAttr;
6872
using ::curvefs::client::filesystem::IsListWarmupXAttr;
6973
using ::curvefs::client::filesystem::IsWarmupXAttr;
7074
using ::curvefs::client::filesystem::StrAttr;
@@ -235,16 +239,18 @@ void UnInitFuseClient() {
235239
int AddWarmupTask(curvefs::client::common::WarmupType type, fuse_ino_t key,
236240
const std::string& path,
237241
curvefs::client::common::WarmupStorageType storageType,
238-
const std::string& mount_point, const std::string& root) {
242+
const std::string& mount_point, const std::string& root,
243+
bool check = false) {
239244
int ret = 0;
240245
bool result = true;
241246
switch (type) {
242247
case curvefs::client::common::WarmupType::kWarmupTypeList:
243-
result = g_ClientInstance->PutWarmFilelistTask(key, storageType, path,
244-
mount_point, root);
245-
break;
248+
result = g_ClientInstance->PutWarmFilelistTask(
249+
key, storageType, path, mount_point, root, check);
250+
break;
246251
case curvefs::client::common::WarmupType::kWarmupTypeSingle:
247-
result = g_ClientInstance->PutWarmFileTask(key, path, storageType);
252+
result =
253+
g_ClientInstance->PutWarmFileTask(key, path, storageType, check);
248254
break;
249255
default:
250256
// not support add warmup type (warmup single file/dir or filelist)
@@ -289,6 +295,18 @@ void QueryWarmupTask(fuse_ino_t key, std::string *data) {
289295
VLOG(9) << "Warmup [" << key << "]" << *data;
290296
}
291297

298+
void QueryCheckCachedTask(fuse_ino_t key, std::string* data) {
299+
WarmupProgress progress;
300+
bool ret = g_ClientInstance->GetCheckCachedProgress(key, &progress);
301+
if (!ret) {
302+
*data = "no check task or not warmup yet";
303+
} else {
304+
*data =
305+
fmt::format("{}/{}", progress.GetFinished(), progress.GetTotal());
306+
}
307+
VLOG(9) << "check cached [" << key << "]" << *data;
308+
}
309+
292310
void ListWarmupTasks(std::string* data) {
293311
WarmupProgress progress;
294312
std::unordered_map<std::string, WarmupProgress> filepath2progress;
@@ -342,9 +360,15 @@ int Warmup(fuse_ino_t key, const char* name, const std::string& values) {
342360
}
343361

344362
int ret = 0;
363+
bool check = false;
364+
if (curvefs::client::common::GetWarmupOpType(warmupOpType) ==
365+
curvefs::client::common::WarmupOpType::kWarmupOpCheck) {
366+
check = true;
367+
}
345368

346369
switch (curvefs::client::common::GetWarmupOpType(warmupOpType)) {
347-
case curvefs::client::common::WarmupOpType::kWarmupOpAdd: {
370+
case curvefs::client::common::WarmupOpType::kWarmupOpCheck:
371+
case curvefs::client::common::WarmupOpType::kWarmupOpAdd : {
348372
if (opTypePath.size() !=
349373
curvefs::client::common::kWarmupAddArgsNum) {
350374
LOG(ERROR)
@@ -371,7 +395,7 @@ int Warmup(fuse_ino_t key, const char* name, const std::string& values) {
371395
ret = AddWarmupTask(
372396
curvefs::client::common::GetWarmupType(warmupDataType), key,
373397
entryFilePathInClient, storageType, mountPointInCurvefs,
374-
rootPathInCurvefs);
398+
rootPathInCurvefs, check);
375399
break;
376400
}
377401
case curvefs::client::common::WarmupOpType::kWarmupOpCancel: {
@@ -444,6 +468,17 @@ void QueryWarmup(fuse_req_t req, fuse_ino_t ino, size_t size) {
444468
return fs->ReplyBuffer(req, data.data(), data.length());
445469
}
446470

471+
void QueryCheckCached(fuse_req_t req, fuse_ino_t ino, size_t size) {
472+
auto fs = Client()->GetFileSystem();
473+
474+
std::string data;
475+
QueryCheckCachedTask(ino, &data);
476+
if (size == 0) {
477+
return fs->ReplyXattr(req, data.length());
478+
}
479+
return fs->ReplyBuffer(req, data.data(), data.length());
480+
}
481+
447482
void ListWarmup(fuse_req_t req, size_t size) {
448483
auto fs = Client()->GetFileSystem();
449484

@@ -942,6 +977,8 @@ void FuseOpGetXattr(fuse_req_t req,
942977
return ListWarmup(req, size);
943978
} else if (IsWarmupXAttr(name)) {
944979
return QueryWarmup(req, ino, size);
980+
} else if (IsCheckXAttr(name)) {
981+
return QueryCheckCached(req, ino, size);
945982
}
946983

947984
rc = Client()->FuseOpGetXattr(req, ino, name, &value, size);

curvefs/src/client/filesystem/xattr.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ const char XATTR_DIR_RFBYTES[] = "curve.dir.rfbytes";
4545
const char XATTR_DIR_PREFIX[] = "curve.dir";
4646
const char XATTR_WARMUP_OP[] = "curvefs.warmup.op";
4747
const char XATTR_WARMUP_OP_LIST[] = "curvefs.warmup.op.list";
48+
const char XATTR_WARMUP_OP_CHECK[] = "curvefs.warmup.check";
4849

4950
inline bool IsSpecialXAttr(const std::string& key) {
5051
static std::map<std::string, bool> xattrs {
@@ -65,6 +66,10 @@ inline bool IsWarmupXAttr(const std::string& key) {
6566
return key == XATTR_WARMUP_OP;
6667
}
6768

69+
inline bool IsCheckXAttr(const std::string& key) {
70+
return key == XATTR_WARMUP_OP_CHECK;
71+
}
72+
6873
inline bool IsListWarmupXAttr(const std::string& key) {
6974
return key == XATTR_WARMUP_OP_LIST;
7075
}

curvefs/src/client/fuse_client.h

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -312,18 +312,18 @@ class FuseClient {
312312
bool PutWarmFilelistTask(fuse_ino_t key, common::WarmupStorageType type,
313313
const std::string& path,
314314
const std::string& mount_point,
315-
const std::string& root) {
315+
const std::string& root, bool check = false) {
316316
if (fsInfo_->fstype() == FSType::TYPE_S3) {
317317
return warmupManager_->AddWarmupFilelist(key, type, path,
318-
mount_point, root);
318+
mount_point, root, check);
319319
} // only support s3
320320
return true;
321321
}
322322

323-
bool PutWarmFileTask(fuse_ino_t key, const std::string &path,
324-
common::WarmupStorageType type) {
323+
bool PutWarmFileTask(fuse_ino_t key, const std::string& path,
324+
common::WarmupStorageType type, bool check = false) {
325325
if (fsInfo_->fstype() == FSType::TYPE_S3) {
326-
return warmupManager_->AddWarmupFile(key, path, type);
326+
return warmupManager_->AddWarmupFile(key, path, type, check);
327327
} // only support s3
328328
return true;
329329
}
@@ -342,6 +342,14 @@ class FuseClient {
342342
return false;
343343
}
344344

345+
bool GetCheckCachedProgress(fuse_ino_t key,
346+
warmup::WarmupProgress* progress) {
347+
if (fsInfo_->fstype() == FSType::TYPE_S3) {
348+
return warmupManager_->QueryCheckCachedProgress(key, progress);
349+
}
350+
return false;
351+
}
352+
345353
bool GetAllWarmupProgress(Filepath2WarmupProgressMap* filepath2progress) {
346354
if (fsInfo_->fstype() == FSType::TYPE_S3) {
347355
return warmupManager_->ListWarmupProgress(filepath2progress);

curvefs/src/client/kvclient/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,5 +35,6 @@ cc_library(
3535
"//src/common:curve_common",
3636
"//src/common:curve_s3_adapter",
3737
"@libmemcached",
38+
"@fmt//:fmt",
3839
],
3940
)

curvefs/src/client/kvclient/kvclient.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
#include <libmemcached-1.0/types/return.h>
2626

27+
#include <cstdint>
2728
#include <string>
2829

2930
namespace curvefs {
@@ -54,6 +55,8 @@ class KVClient {
5455
virtual bool Get(const std::string& key, char* value, uint64_t offset,
5556
uint64_t length, std::string* errorlog,
5657
uint64_t* actLength, memcached_return_t* retCod) = 0;
58+
59+
virtual bool Exist(const std::string& key) = 0;
5760
};
5861

5962
} // namespace client

curvefs/src/client/kvclient/kvclient_manager.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,5 +128,12 @@ int KVClientManager::GetKvCache(
128128
return 0;
129129
}
130130

131+
void KVClientManager::Exist(std::shared_ptr<ExistKVCacheTask> task) {
132+
threadPool_.Enqueue([task, this]() {
133+
task->res = client_->Exist(task->key);
134+
OnReturn(&kvClientManagerMetric_->exist, task);
135+
});
136+
}
137+
131138
} // namespace client
132139
} // namespace curvefs

curvefs/src/client/kvclient/kvclient_manager.h

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,7 @@ namespace client {
4646
class KVClientManager;
4747
struct SetKVCacheTask;
4848
struct GetKVCacheTask;
49-
50-
class GetKvCacheContext;
51-
class SetKvCacheContext;
49+
struct ExistKVCacheTask;
5250

5351
using curve::common::GetObjectAsyncContext;
5452
using curve::common::TaskThreadPool;
@@ -58,6 +56,8 @@ using SetKVCacheDone =
5856
std::function<void(const std::shared_ptr<SetKVCacheTask>&)>;
5957
using GetKVCacheDone =
6058
std::function<void(const std::shared_ptr<GetKVCacheTask>&)>;
59+
using ExistKVCacheDone =
60+
std::function<void(const std::shared_ptr<ExistKVCacheTask>&)>;
6161

6262
struct SetKVCacheTask {
6363
std::string key;
@@ -79,7 +79,7 @@ struct SetKVCacheTask {
7979
};
8080

8181
struct GetKVCacheTask {
82-
const std::string& key;
82+
std::string key;
8383
char* value;
8484
uint64_t offset;
8585
uint64_t valueLength;
@@ -101,11 +101,21 @@ struct GetKVCacheTask {
101101
timer(butil::Timer::STARTED) {}
102102
};
103103

104-
using GetKvCacheCallBack =
105-
std::function<void(const std::shared_ptr<GetKvCacheContext>&)>;
104+
struct ExistKVCacheTask {
105+
std::string key;
106+
bool res;
107+
uint64_t length = 0; // useless,just for OnReturn
108+
ExistKVCacheDone done;
109+
butil::Timer timer;
106110

107-
using SetKvCacheCallBack =
108-
std::function<void(const std::shared_ptr<SetKvCacheContext>&)>;
111+
explicit ExistKVCacheTask(
112+
const std::string& k,
113+
ExistKVCacheDone done = [](const std::shared_ptr<ExistKVCacheTask>&) {})
114+
: key(k),
115+
res(false),
116+
done(std::move(done)),
117+
timer(butil::Timer::STARTED) {}
118+
};
109119

110120
struct KvCacheContext {
111121
std::string key;
@@ -117,17 +127,6 @@ struct KvCacheContext {
117127
uint64_t startTime;
118128
};
119129

120-
struct GetKvCacheContext : KvCacheContext {
121-
char* value;
122-
bool res;
123-
GetKvCacheCallBack cb;
124-
};
125-
126-
struct SetKvCacheContext : KvCacheContext {
127-
const char* value;
128-
SetKvCacheCallBack cb;
129-
};
130-
131130
class KVClientManager {
132131
public:
133132
KVClientManager() = default;
@@ -146,6 +145,8 @@ class KVClientManager {
146145

147146
void Get(std::shared_ptr<GetKVCacheTask> task);
148147

148+
void Exist(std::shared_ptr<ExistKVCacheTask> task);
149+
149150
KVClientManagerMetric* GetMetricForTesting() {
150151
return kvClientManagerMetric_.get();
151152
}

curvefs/src/client/kvclient/memcache_client.cpp

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,44 @@
2222

2323
#include "curvefs/src/client/kvclient/memcache_client.h"
2424

25+
#include <libmemcached-1.0/exist.h>
26+
#include <libmemcached-1.0/types/return.h>
27+
28+
#include "src/client/client_metric.h"
29+
2530
namespace curvefs {
2631
namespace client {
2732

2833
thread_local memcached_st* tcli = nullptr;
2934

35+
bool MemCachedClient::Exist(const std::string& key) {
36+
// https://awesomized.github.io/libmemcached/libmemcached/memcached_exist.html?highlight=exist#_CPPv415memcached_existP12memcached_stPcP6size_t
37+
memcached_return_t ue;
38+
size_t value_length = 0;
39+
uint64_t start = butil::cpuwide_time_us();
40+
if (nullptr == tcli) {
41+
LOG(ERROR) << "create tcli";
42+
tcli = memcached_clone(nullptr, client_);
43+
}
44+
ue = memcached_exist(tcli, key.c_str(), key.length());
45+
if (ue == MEMCACHED_SUCCESS) {
46+
curve::client::CollectMetrics(&metric_->exist, 0,
47+
butil::cpuwide_time_us() - start);
48+
return true;
49+
}
50+
51+
if (ue == MEMCACHED_NOTFOUND) {
52+
curve::client::CollectMetrics(&metric_->exist, 0,
53+
butil::cpuwide_time_us() - start);
54+
} else {
55+
std::string errorlog = ResError(ue);
56+
LOG(ERROR) << "Exist key = " << key << " error = " << errorlog;
57+
metric_->exist.eps.count << 1;
58+
}
59+
memcached_free(tcli);
60+
tcli = nullptr;
61+
return false;
62+
}
63+
3064
} // namespace client
3165
} // namespace curvefs

0 commit comments

Comments
 (0)