Skip to content

Commit a2927e7

Browse files
authored
A streambuf based on ResumableUploadSession. (#1565)
This will be used to return a ObjectWriteStream based on ResumableUploadSession instead of just a plain simple upload.
1 parent a13bc54 commit a2927e7

File tree

7 files changed

+339
-0
lines changed

7 files changed

+339
-0
lines changed

google/cloud/storage/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,8 @@ add_library(storage_client
134134
internal/curl_request.cc
135135
internal/curl_request_builder.h
136136
internal/curl_request_builder.cc
137+
internal/curl_resumable_streambuf.h
138+
internal/curl_resumable_streambuf.cc
137139
internal/curl_upload_request.cc
138140
internal/curl_upload_request.h
139141
internal/curl_wrappers.h
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
// Copyright 2018 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "google/cloud/storage/internal/curl_resumable_streambuf.h"
16+
#include "google/cloud/log.h"
17+
18+
namespace google {
19+
namespace cloud {
20+
namespace storage {
21+
inline namespace STORAGE_CLIENT_NS {
22+
namespace internal {
23+
24+
CurlResumableStreambuf::CurlResumableStreambuf(
25+
std::unique_ptr<ResumableUploadSession> upload_session,
26+
std::size_t max_buffer_size, std::unique_ptr<HashValidator> hash_validator)
27+
: upload_session_(std::move(upload_session)),
28+
max_buffer_size_(UploadChunkRequest::RoundUpToQuantum(max_buffer_size)),
29+
hash_validator_(std::move(hash_validator)),
30+
last_response_{400} {
31+
current_ios_buffer_.reserve(max_buffer_size_);
32+
}
33+
34+
bool CurlResumableStreambuf::IsOpen() const {
35+
return static_cast<bool>(upload_session_);
36+
}
37+
38+
void CurlResumableStreambuf::ValidateHash(ObjectMetadata const& meta) {
39+
hash_validator_->ProcessMetadata(meta);
40+
hash_validator_result_ =
41+
HashValidator::FinishAndCheck(__func__, std::move(*hash_validator_));
42+
}
43+
44+
CurlResumableStreambuf::int_type CurlResumableStreambuf::overflow(int_type ch) {
45+
Validate(__func__);
46+
Flush(false);
47+
if (not traits_type::eq_int_type(ch, traits_type::eof())) {
48+
current_ios_buffer_.push_back(traits_type::to_char_type(ch));
49+
pbump(1);
50+
}
51+
return 0;
52+
}
53+
54+
int CurlResumableStreambuf::sync() {
55+
Flush(true);
56+
return 0;
57+
}
58+
59+
std::streamsize CurlResumableStreambuf::xsputn(char const* s,
60+
std::streamsize count) {
61+
Validate(__func__);
62+
Flush(false);
63+
current_ios_buffer_.append(s, static_cast<std::size_t>(count));
64+
pbump(static_cast<int>(count));
65+
return count;
66+
}
67+
68+
HttpResponse CurlResumableStreambuf::DoClose() {
69+
GCP_LOG(INFO) << __func__ << "()";
70+
return Flush(true);
71+
}
72+
73+
void CurlResumableStreambuf::Validate(char const* where) const {
74+
if (IsOpen()) {
75+
return;
76+
}
77+
std::string msg = "Attempting to use closed CurlResumableStreambuf in ";
78+
msg += where;
79+
google::cloud::internal::RaiseRuntimeError(msg);
80+
}
81+
82+
HttpResponse CurlResumableStreambuf::Flush(bool final_chunk) {
83+
if (not IsOpen()) {
84+
return last_response_;
85+
}
86+
// Shorten the buffer to the actual used size.
87+
auto actual_size = static_cast<std::size_t>(pptr() - pbase());
88+
if (actual_size == 0) {
89+
return last_response_;
90+
}
91+
if (actual_size <= max_buffer_size_ and not final_chunk) {
92+
return last_response_;
93+
}
94+
95+
std::string trailing;
96+
std::size_t upload_size = 0U;
97+
if (final_chunk) {
98+
current_ios_buffer_.resize(actual_size);
99+
upload_size =
100+
upload_session_->next_expected_byte() + current_ios_buffer_.size();
101+
} else {
102+
trailing = current_ios_buffer_.substr(max_buffer_size_);
103+
current_ios_buffer_.resize(max_buffer_size_);
104+
}
105+
hash_validator_->Update(current_ios_buffer_);
106+
107+
auto result = upload_session_->UploadChunk(current_ios_buffer_, upload_size);
108+
current_ios_buffer_.clear();
109+
current_ios_buffer_.reserve(max_buffer_size_);
110+
setp(&current_ios_buffer_[0], &current_ios_buffer_[0] + max_buffer_size_);
111+
current_ios_buffer_.append(trailing);
112+
pbump(static_cast<int>(trailing.size()));
113+
114+
if (final_chunk) {
115+
upload_session_.reset();
116+
}
117+
118+
last_response_ = HttpResponse{
119+
result.first.status_code(), std::move(result.second.payload), {}};
120+
return last_response_;
121+
}
122+
123+
} // namespace internal
124+
} // namespace STORAGE_CLIENT_NS
125+
} // namespace storage
126+
} // namespace cloud
127+
} // namespace google
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
// Copyright 2018 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_CURL_RESUMABLE_STREAMBUF_H_
16+
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_CURL_RESUMABLE_STREAMBUF_H_
17+
18+
#include "google/cloud/storage/internal/hash_validator.h"
19+
#include "google/cloud/storage/internal/object_streambuf.h"
20+
#include "google/cloud/storage/internal/raw_client.h"
21+
#include <iostream>
22+
23+
namespace google {
24+
namespace cloud {
25+
namespace storage {
26+
inline namespace STORAGE_CLIENT_NS {
27+
class ObjectMetadata;
28+
namespace internal {
29+
/**
30+
* Implement a wrapper for libcurl-based resumable uploads.
31+
*/
32+
class CurlResumableStreambuf : public ObjectWriteStreambuf {
33+
public:
34+
explicit CurlResumableStreambuf(
35+
std::unique_ptr<ResumableUploadSession> upload_session,
36+
std::size_t max_buffer_size,
37+
std::unique_ptr<HashValidator> hash_validator);
38+
39+
~CurlResumableStreambuf() override = default;
40+
41+
bool IsOpen() const override;
42+
void ValidateHash(ObjectMetadata const& meta) override;
43+
std::string const& received_hash() const override {
44+
return hash_validator_result_.received;
45+
}
46+
std::string const& computed_hash() const override {
47+
return hash_validator_result_.computed;
48+
}
49+
50+
protected:
51+
int sync() override;
52+
std::streamsize xsputn(char const* s, std::streamsize count) override;
53+
int_type overflow(int_type ch) override;
54+
// TODO(coryan) this is an ugly return type.
55+
HttpResponse DoClose() override;
56+
57+
private:
58+
/// Raise an exception if the stream is closed.
59+
void Validate(char const* where) const;
60+
61+
/// Flush the libcurl buffer and swap it with the iostream buffer.
62+
HttpResponse Flush(bool final_chunk);
63+
64+
std::unique_ptr<ResumableUploadSession> upload_session_;
65+
66+
std::string current_ios_buffer_;
67+
std::size_t max_buffer_size_;
68+
69+
std::unique_ptr<HashValidator> hash_validator_;
70+
HashValidator::Result hash_validator_result_;
71+
72+
HttpResponse last_response_;
73+
};
74+
75+
} // namespace internal
76+
} // namespace STORAGE_CLIENT_NS
77+
} // namespace storage
78+
} // namespace cloud
79+
} // namespace google
80+
81+
#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_CURL_RESUMABLE_STREAMBUF_H_

google/cloud/storage/storage_client.bzl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ storage_client_HDRS = [
1818
"internal/curl_download_request.h",
1919
"internal/curl_request.h",
2020
"internal/curl_request_builder.h",
21+
"internal/curl_resumable_streambuf.h",
2122
"internal/curl_upload_request.h",
2223
"internal/curl_wrappers.h",
2324
"internal/curl_client.h",
@@ -92,6 +93,7 @@ storage_client_SRCS = [
9293
"internal/curl_download_request.cc",
9394
"internal/curl_request.cc",
9495
"internal/curl_request_builder.cc",
96+
"internal/curl_resumable_streambuf.cc",
9597
"internal/curl_upload_request.cc",
9698
"internal/curl_wrappers.cc",
9799
"internal/curl_client.cc",

google/cloud/storage/tests/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ set(storage_client_integration_tests
1919
curl_upload_request_integration_test.cc
2020
curl_download_request_integration_test.cc
2121
curl_request_integration_test.cc
22+
curl_resumable_streambuf_integration_test.cc
2223
curl_resumable_upload_session_integration_test.cc
2324
curl_streambuf_integration_test.cc
2425
object_media_integration_test.cc
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
// Copyright 2018 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "google/cloud/internal/make_unique.h"
16+
#include "google/cloud/storage/client.h"
17+
#include "google/cloud/storage/internal/curl_resumable_streambuf.h"
18+
#include "google/cloud/storage/object_stream.h"
19+
#include "google/cloud/storage/testing/storage_integration_test.h"
20+
#include "google/cloud/testing_util/init_google_mock.h"
21+
#include <gmock/gmock.h>
22+
23+
namespace google {
24+
namespace cloud {
25+
namespace storage {
26+
inline namespace STORAGE_CLIENT_NS {
27+
namespace internal {
28+
namespace {
29+
using ::testing::HasSubstr;
30+
31+
class ResumableStreambufTestEnvironment : public ::testing::Environment {
32+
public:
33+
ResumableStreambufTestEnvironment(std::string instance) {
34+
bucket_name_ = std::move(instance);
35+
}
36+
37+
static std::string const& bucket_name() { return bucket_name_; }
38+
39+
private:
40+
static std::string bucket_name_;
41+
};
42+
43+
std::string ResumableStreambufTestEnvironment::bucket_name_;
44+
45+
class CurlResumableStreambufIntegrationTest
46+
: public google::cloud::storage::testing::StorageIntegrationTest {
47+
protected:
48+
void CheckUpload(int line_count, int line_size) {
49+
Client client;
50+
auto bucket_name = ResumableStreambufTestEnvironment::bucket_name();
51+
auto object_name = MakeRandomObjectName();
52+
53+
ResumableUploadRequest request(bucket_name, object_name);
54+
request.set_multiple_options(IfGenerationMatch(0));
55+
56+
Status status;
57+
std::unique_ptr<ResumableUploadSession> session;
58+
std::tie(status, session) =
59+
client.raw_client()->CreateResumableSession(request);
60+
ASSERT_TRUE(status.ok());
61+
62+
ObjectWriteStream writer(
63+
google::cloud::internal::make_unique<CurlResumableStreambuf>(
64+
std::move(session),
65+
client.raw_client()->client_options().upload_buffer_size(),
66+
google::cloud::internal::make_unique<NullHashValidator>()));
67+
68+
std::ostringstream expected_stream;
69+
WriteRandomLines(writer, expected_stream, line_count, line_size);
70+
auto metadata = writer.Close();
71+
EXPECT_EQ(object_name, metadata.name());
72+
EXPECT_EQ(bucket_name, metadata.bucket());
73+
74+
ObjectReadStream reader = client.ReadObject(bucket_name, object_name);
75+
76+
std::string actual(std::istreambuf_iterator<char>{reader}, {});
77+
78+
std::string expected = std::move(expected_stream).str();
79+
ASSERT_EQ(expected.size(), actual.size());
80+
EXPECT_EQ(expected, actual);
81+
82+
client.DeleteObject(bucket_name, object_name,
83+
Generation(metadata.generation()));
84+
}
85+
};
86+
87+
88+
TEST_F(CurlResumableStreambufIntegrationTest, Simple) {
89+
CheckUpload(20, 128);
90+
}
91+
92+
TEST_F(CurlResumableStreambufIntegrationTest, MultipleOfUploadQuantum) {
93+
CheckUpload(3 * 2 * 1024, 128);
94+
}
95+
96+
TEST_F(CurlResumableStreambufIntegrationTest, QuantumAndNonQuantum) {
97+
CheckUpload(3 * 1024, 128);
98+
}
99+
100+
} // namespace
101+
} // namespace internal
102+
} // namespace STORAGE_CLIENT_NS
103+
} // namespace storage
104+
} // namespace cloud
105+
} // namespace google
106+
107+
int main(int argc, char* argv[]) {
108+
google::cloud::testing_util::InitGoogleMock(argc, argv);
109+
110+
// Make sure the arguments are valid.
111+
if (argc != 2) {
112+
std::string const cmd = argv[0];
113+
auto last_slash = std::string(argv[0]).find_last_of('/');
114+
std::cerr << "Usage: " << cmd.substr(last_slash + 1) << " <bucket-name>"
115+
<< std::endl;
116+
return 1;
117+
}
118+
119+
std::string const bucket_name = argv[1];
120+
(void)::testing::AddGlobalTestEnvironment(
121+
new google::cloud::storage::internal::ResumableStreambufTestEnvironment(
122+
bucket_name));
123+
124+
return RUN_ALL_TESTS();
125+
}

google/cloud/storage/tests/storage_client_integration_tests.bzl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ storage_client_integration_tests = [
44
"curl_upload_request_integration_test.cc",
55
"curl_download_request_integration_test.cc",
66
"curl_request_integration_test.cc",
7+
"curl_resumable_streambuf_integration_test.cc",
78
"curl_resumable_upload_session_integration_test.cc",
89
"curl_streambuf_integration_test.cc",
910
"object_media_integration_test.cc",

0 commit comments

Comments
 (0)