Skip to content

Commit a95eda5

Browse files
authored
feat(GCS+gRPC): capture streaming RPCs metadata (#6902)
gRPC returns metadata for all requests, including streaming RPCs. In the storage service this is very useful for troubleshooting, this change weaves the metadata (and some additional bits from `grpc::ClientContext`) through the different layers. The metadata is exposed as "headers", because that matches the existing APIs (and gRPC uses HTTP/2 headers under the hood anyway).
1 parent 3c44e59 commit a95eda5

16 files changed

+269
-9
lines changed

generator/integration_tests/golden/mocks/mock_golden_kitchen_sink_stub.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ class MockTailLogEntriesStreamingReadRpc
7373
MOCK_METHOD(void, Cancel, (), (override));
7474
MOCK_METHOD((absl::variant<Status, ::google::test::admin::database::v1::TailLogEntriesResponse>), Read, (),
7575
(override));
76+
MOCK_METHOD(internal::StreamingRpcMetadata, GetRequestMetadata, (),
77+
(const, override));
7678
};
7779

7880
} // namespace GOOGLE_CLOUD_CPP_GENERATED_NS

google/cloud/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,7 @@ if (GOOGLE_CLOUD_CPP_ENABLE_GRPC)
427427
internal/setup_context.h
428428
internal/streaming_read_rpc.cc
429429
internal/streaming_read_rpc.h
430+
internal/streaming_read_rpc_logging.cc
430431
internal/streaming_read_rpc_logging.h
431432
internal/streaming_write_rpc.cc
432433
internal/streaming_write_rpc.h

google/cloud/google_cloud_cpp_grpc_utils.bzl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ google_cloud_cpp_grpc_utils_srcs = [
7777
"internal/minimal_iam_credentials_stub.cc",
7878
"internal/retry_loop_helpers.cc",
7979
"internal/streaming_read_rpc.cc",
80+
"internal/streaming_read_rpc_logging.cc",
8081
"internal/streaming_write_rpc.cc",
8182
"internal/time_utils.cc",
8283
"internal/unified_grpc_credentials.cc",

google/cloud/internal/resumable_streaming_read_rpc.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,10 @@ class ResumableStreamingReadRpc : public StreamingReadRpc<ResponseType> {
127127
return last_status;
128128
}
129129

130+
StreamingRpcMetadata GetRequestMetadata() const override {
131+
return impl_ ? impl_->GetRequestMetadata() : StreamingRpcMetadata{};
132+
}
133+
130134
private:
131135
std::unique_ptr<RetryPolicy const> const retry_policy_prototype_;
132136
std::unique_ptr<BackoffPolicy const> const backoff_policy_prototype_;

google/cloud/internal/resumable_streaming_read_rpc_test.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ class MockStreamingReadRpc : public StreamingReadRpc<FakeResponse> {
5151
public:
5252
MOCK_METHOD(void, Cancel, (), (override));
5353
MOCK_METHOD(ReadReturn, Read, (), (override));
54+
MOCK_METHOD(StreamingRpcMetadata, GetRequestMetadata, (), (const, override));
5455
};
5556

5657
struct TestRetryablePolicy {

google/cloud/internal/streaming_read_rpc.cc

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,55 @@
1414

1515
#include "google/cloud/internal/streaming_read_rpc.h"
1616
#include "google/cloud/log.h"
17+
#include <grpc/compression.h>
1718

1819
namespace google {
1920
namespace cloud {
2021
inline namespace GOOGLE_CLOUD_CPP_NS {
2122
namespace internal {
23+
namespace {
24+
// AFAICT there is no C++ API to get the name, but the C core API is public and
25+
// documented:
26+
// https://grpc.github.io/grpc/core/compression_8h.html
27+
std::string ToString(grpc_compression_algorithm algo) {
28+
char const* name;
29+
if (grpc_compression_algorithm_name(algo, &name) == 0) {
30+
return "unknown";
31+
}
32+
return name;
33+
}
34+
} // namespace
35+
36+
StreamingRpcMetadata GetRequestMetadataFromContext(
37+
grpc::ClientContext const& context) {
38+
StreamingRpcMetadata metadata{
39+
// Use invalid header names (starting with ':') to store the
40+
// grpc::ClientContext metadata.
41+
{":grpc-context-peer", context.peer()},
42+
{":grpc-context-compression-algorithm",
43+
ToString(context.compression_algorithm())},
44+
};
45+
auto hint = metadata.end();
46+
for (auto const& kv : context.GetServerInitialMetadata()) {
47+
// gRPC metadata is stored in `grpc::string_ref`, a type inspired by
48+
// `std::string_view`. We need to explicitly convert these to `std::string`.
49+
// In addition, we use a prefix to distinguish initial vs. trailing headers.
50+
auto key = ":grpc-initial-" + std::string{kv.first.data(), kv.first.size()};
51+
auto value = std::string{kv.second.data(), kv.second.size()};
52+
hint = std::next(
53+
metadata.emplace_hint(hint, std::move(key), std::move(value)));
54+
}
55+
hint = metadata.end();
56+
for (auto const& kv : context.GetServerTrailingMetadata()) {
57+
// Same as above, convert `grpc::string_ref` to `std::string`:
58+
auto key =
59+
":grpc-trailing-" + std::string{kv.first.data(), kv.first.size()};
60+
auto value = std::string{kv.second.data(), kv.second.size()};
61+
hint = std::next(
62+
metadata.emplace_hint(hint, std::move(key), std::move(value)));
63+
}
64+
return metadata;
65+
}
2266

2367
void StreamingReadRpcReportUnhandledError(Status const& status,
2468
char const* tname) {

google/cloud/internal/streaming_read_rpc.h

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,22 @@
2121
#include "absl/types/variant.h"
2222
#include <grpcpp/grpcpp.h>
2323
#include <grpcpp/support/sync_stream.h>
24+
#include <map>
2425
#include <memory>
26+
#include <string>
2527

2628
namespace google {
2729
namespace cloud {
2830
inline namespace GOOGLE_CLOUD_CPP_NS {
2931
namespace internal {
3032

33+
/// A simple representation of request metadata.
34+
using StreamingRpcMetadata = std::multimap<std::string, std::string>;
35+
36+
/// Return interesting bits of metadata stored in the client context.
37+
StreamingRpcMetadata GetRequestMetadataFromContext(
38+
grpc::ClientContext const& context);
39+
3140
/**
3241
* Defines the interface for wrappers around gRPC streaming read RPCs.
3342
*
@@ -50,6 +59,15 @@ class StreamingReadRpc {
5059

5160
/// Return the next element, or the final RPC status.
5261
virtual absl::variant<Status, ResponseType> Read() = 0;
62+
63+
/**
64+
* Return the request metadata.
65+
*
66+
* Request metadata is useful for troubleshooting, but may be relatively
67+
* expensive to extract. Library developers should avoid this function in
68+
* the critical path.
69+
*/
70+
virtual StreamingRpcMetadata GetRequestMetadata() const = 0;
5371
};
5472

5573
/// Report the errors in a standalone function to minimize includes
@@ -87,6 +105,11 @@ class StreamingReadRpcImpl : public StreamingReadRpc<ResponseType> {
87105
return Finish();
88106
}
89107

108+
StreamingRpcMetadata GetRequestMetadata() const override {
109+
if (!context_) return {};
110+
return GetRequestMetadataFromContext(*context_);
111+
}
112+
90113
private:
91114
Status Finish() {
92115
auto status = MakeStatusFromRpcError(stream_->Finish());
@@ -119,6 +142,8 @@ class StreamingReadRpcError : public StreamingReadRpc<ResponseType> {
119142

120143
absl::variant<Status, ResponseType> Read() override { return status_; }
121144

145+
StreamingRpcMetadata GetRequestMetadata() const override { return {}; }
146+
122147
private:
123148
Status status_;
124149
};
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Copyright 2021 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "google/cloud/internal/streaming_read_rpc_logging.h"
16+
#include "google/cloud/internal/absl_str_cat_quiet.h"
17+
#include "google/cloud/internal/absl_str_join_quiet.h"
18+
19+
namespace google {
20+
namespace cloud {
21+
inline namespace GOOGLE_CLOUD_CPP_NS {
22+
namespace internal {
23+
24+
std::string FormatMetadata(StreamingRpcMetadata const& metadata) {
25+
auto formatter = [](std::string* output,
26+
StreamingRpcMetadata::value_type const& p) {
27+
*output += absl::StrCat("{", p.first, ": ", p.second, "}");
28+
};
29+
return absl::StrJoin(metadata.begin(), metadata.end(), ", ", formatter);
30+
}
31+
32+
} // namespace internal
33+
} // namespace GOOGLE_CLOUD_CPP_NS
34+
} // namespace cloud
35+
} // namespace google

google/cloud/internal/streaming_read_rpc_logging.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ namespace google {
3232
namespace cloud {
3333
inline namespace GOOGLE_CLOUD_CPP_NS {
3434
namespace internal {
35+
std::string FormatMetadata(StreamingRpcMetadata const& metadata);
3536

3637
/**
3738
* Logging decorator for StreamingReadRpc.
@@ -61,6 +62,12 @@ class StreamingReadRpcLogging : public StreamingReadRpc<ResponseType> {
6162
<< absl::visit(ResultVisitor(tracing_options_), result);
6263
return result;
6364
}
65+
StreamingRpcMetadata GetRequestMetadata() const override {
66+
auto metadata = reader_->GetRequestMetadata();
67+
GCP_LOG(DEBUG) << __func__ << "() >> metadata={" << FormatMetadata(metadata)
68+
<< "}";
69+
return metadata;
70+
}
6471

6572
private:
6673
class ResultVisitor {

google/cloud/internal/streaming_read_rpc_logging_test.cc

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ class MockStreamingReadRpc : public StreamingReadRpc<ResponseType> {
3939
MOCK_METHOD(void, Cancel, (), (override));
4040
MOCK_METHOD((absl::variant<Status, google::protobuf::Duration>), Read, (),
4141
(override));
42+
MOCK_METHOD(StreamingRpcMetadata, GetRequestMetadata, (), (const, override));
4243
};
4344

4445
class StreamingReadRpcLoggingTest : public ::testing::Test {
@@ -100,6 +101,21 @@ TEST_F(StreamingReadRpcLoggingTest, Read) {
100101
EXPECT_THAT(log_lines, Contains(HasSubstr("Invalid argument.")));
101102
}
102103

104+
TEST_F(StreamingReadRpcLoggingTest, FormatMetadata) {
105+
struct Test {
106+
StreamingRpcMetadata metadata;
107+
std::string expected;
108+
} cases[] = {
109+
{{}, ""},
110+
{{{"a", "b"}}, "{a: b}"},
111+
{{{"a", "b"}, {"k", "v"}}, "{a: b}, {k: v}"},
112+
};
113+
for (auto const& test : cases) {
114+
auto const actual = FormatMetadata(test.metadata);
115+
EXPECT_EQ(test.expected, actual);
116+
}
117+
}
118+
103119
} // namespace
104120
} // namespace internal
105121
} // namespace GOOGLE_CLOUD_CPP_NS

0 commit comments

Comments
 (0)