Skip to content

Commit 5ef3ff7

Browse files
authored
refactor(pubsub): move AckHandlerWrapper to header (#10290)
I will need this class in the implementation of blocking pulls.
1 parent fa4b763 commit 5ef3ff7

File tree

7 files changed

+209
-31
lines changed

7 files changed

+209
-31
lines changed

google/cloud/pubsub/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ add_library(
4848
connection_options.h
4949
exactly_once_ack_handler.cc
5050
exactly_once_ack_handler.h
51+
internal/ack_handler_wrapper.cc
52+
internal/ack_handler_wrapper.h
5153
internal/batch_sink.h
5254
internal/batching_publisher_connection.cc
5355
internal/batching_publisher_connection.h
@@ -265,6 +267,7 @@ function (google_cloud_cpp_pubsub_client_define_tests)
265267
blocking_publisher_connection_test.cc
266268
blocking_publisher_test.cc
267269
exactly_once_ack_handler_test.cc
270+
internal/ack_handler_wrapper_test.cc
268271
internal/batching_publisher_connection_test.cc
269272
internal/default_batch_sink_test.cc
270273
internal/defaults_test.cc

google/cloud/pubsub/google_cloud_cpp_pubsub.bzl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ google_cloud_cpp_pubsub_hdrs = [
2424
"blocking_publisher_connection.h",
2525
"connection_options.h",
2626
"exactly_once_ack_handler.h",
27+
"internal/ack_handler_wrapper.h",
2728
"internal/batch_sink.h",
2829
"internal/batching_publisher_connection.h",
2930
"internal/blocking_publisher_connection_impl.h",
@@ -96,6 +97,7 @@ google_cloud_cpp_pubsub_srcs = [
9697
"blocking_publisher_connection.cc",
9798
"connection_options.cc",
9899
"exactly_once_ack_handler.cc",
100+
"internal/ack_handler_wrapper.cc",
99101
"internal/batching_publisher_connection.cc",
100102
"internal/blocking_publisher_connection_impl.cc",
101103
"internal/create_channel.cc",
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
// Copyright 2022 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "google/cloud/pubsub/internal/ack_handler_wrapper.h"
16+
#include "google/cloud/log.h"
17+
18+
namespace google {
19+
namespace cloud {
20+
namespace pubsub_internal {
21+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
22+
23+
void AckHandlerWrapper::ack() {
24+
auto f = impl_->ack();
25+
if (message_id_.empty()) return;
26+
f.then([id = std::move(message_id_)](auto f) {
27+
auto status = f.get();
28+
GCP_LOG(WARNING) << "error while trying to ack(), status=" << status
29+
<< ", message_id=" << id;
30+
});
31+
}
32+
33+
void AckHandlerWrapper::nack() {
34+
auto f = impl_->nack();
35+
if (message_id_.empty()) return;
36+
f.then([id = std::move(message_id_)](auto f) {
37+
auto status = f.get();
38+
GCP_LOG(WARNING) << "error while trying to nack(), status=" << status
39+
<< ", message_id=" << id;
40+
});
41+
}
42+
43+
std::int32_t AckHandlerWrapper::delivery_attempt() const {
44+
return impl_->delivery_attempt();
45+
}
46+
47+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
48+
} // namespace pubsub_internal
49+
} // namespace cloud
50+
} // namespace google
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Copyright 2022 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_INTERNAL_ACK_HANDLER_WRAPPER_H
16+
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_INTERNAL_ACK_HANDLER_WRAPPER_H
17+
18+
#include "google/cloud/pubsub/ack_handler.h"
19+
#include "google/cloud/pubsub/exactly_once_ack_handler.h"
20+
#include "google/cloud/version.h"
21+
#include <string>
22+
23+
namespace google {
24+
namespace cloud {
25+
namespace pubsub_internal {
26+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
27+
28+
class AckHandlerWrapper : public pubsub::AckHandler::Impl {
29+
public:
30+
explicit AckHandlerWrapper(
31+
std::unique_ptr<pubsub::ExactlyOnceAckHandler::Impl> impl)
32+
: impl_(std::move(impl)) {}
33+
AckHandlerWrapper(std::unique_ptr<pubsub::ExactlyOnceAckHandler::Impl> impl,
34+
std::string message_id)
35+
: impl_(std::move(impl)), message_id_(std::move(message_id)) {}
36+
~AckHandlerWrapper() override = default;
37+
38+
void ack() override;
39+
void nack() override;
40+
std::int32_t delivery_attempt() const override;
41+
42+
private:
43+
std::unique_ptr<pubsub::ExactlyOnceAckHandler::Impl> impl_;
44+
std::string message_id_;
45+
};
46+
47+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
48+
} // namespace pubsub_internal
49+
} // namespace cloud
50+
} // namespace google
51+
52+
#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_INTERNAL_ACK_HANDLER_WRAPPER_H
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
// Copyright 2022 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "google/cloud/pubsub/internal/ack_handler_wrapper.h"
16+
#include "google/cloud/internal/make_status.h"
17+
#include "google/cloud/testing_util/scoped_log.h"
18+
#include "absl/memory/memory.h"
19+
#include <gmock/gmock.h>
20+
21+
namespace google {
22+
namespace cloud {
23+
namespace pubsub_internal {
24+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
25+
26+
using ::google::cloud::internal::PermissionDeniedError;
27+
using ::google::cloud::testing_util::ScopedLog;
28+
using ::testing::AllOf;
29+
using ::testing::ByMove;
30+
using ::testing::Contains;
31+
using ::testing::HasSubstr;
32+
using ::testing::Return;
33+
34+
class MockExactlyOnceAckHandlerImpl
35+
: public pubsub::ExactlyOnceAckHandler::Impl {
36+
public:
37+
MOCK_METHOD(future<Status>, ack, (), (override));
38+
MOCK_METHOD(future<Status>, nack, (), (override));
39+
MOCK_METHOD(std::int32_t, delivery_attempt, (), (const, override));
40+
};
41+
42+
TEST(AckHandlerWrapper, Ack) {
43+
auto mock = absl::make_unique<MockExactlyOnceAckHandlerImpl>();
44+
EXPECT_CALL(*mock, ack)
45+
.WillOnce(
46+
Return(ByMove(make_ready_future(PermissionDeniedError("uh-oh")))));
47+
ScopedLog log;
48+
AckHandlerWrapper tested(std::move(mock), "test-id");
49+
tested.ack();
50+
EXPECT_THAT(log.ExtractLines(),
51+
Contains(AllOf(HasSubstr(" ack()"), HasSubstr("uh-oh"),
52+
HasSubstr("test-id"))));
53+
}
54+
55+
TEST(AckHandlerWrapper, AckEmpty) {
56+
auto mock = absl::make_unique<MockExactlyOnceAckHandlerImpl>();
57+
EXPECT_CALL(*mock, ack)
58+
.WillOnce(
59+
Return(ByMove(make_ready_future(PermissionDeniedError("uh-oh")))));
60+
ScopedLog log;
61+
AckHandlerWrapper tested(std::move(mock));
62+
tested.ack();
63+
EXPECT_THAT(log.ExtractLines(), Not(Contains(HasSubstr(" ack()"))));
64+
}
65+
66+
TEST(AckHandlerWrapper, Nack) {
67+
auto mock = absl::make_unique<MockExactlyOnceAckHandlerImpl>();
68+
EXPECT_CALL(*mock, nack)
69+
.WillOnce(
70+
Return(ByMove(make_ready_future(PermissionDeniedError("uh-oh")))));
71+
ScopedLog log;
72+
AckHandlerWrapper tested(std::move(mock), "test-id");
73+
tested.nack();
74+
EXPECT_THAT(log.ExtractLines(),
75+
Contains(AllOf(HasSubstr(" nack()"), HasSubstr("uh-oh"),
76+
HasSubstr("test-id"))));
77+
}
78+
79+
TEST(AckHandlerWrapper, NackEmpty) {
80+
auto mock = absl::make_unique<MockExactlyOnceAckHandlerImpl>();
81+
EXPECT_CALL(*mock, nack)
82+
.WillOnce(
83+
Return(ByMove(make_ready_future(PermissionDeniedError("uh-oh")))));
84+
ScopedLog log;
85+
AckHandlerWrapper tested(std::move(mock));
86+
tested.nack();
87+
EXPECT_THAT(log.ExtractLines(), Not(Contains(HasSubstr(" nack()"))));
88+
}
89+
90+
TEST(AckHandlerWrapper, DeliveryAttempt) {
91+
auto mock = absl::make_unique<MockExactlyOnceAckHandlerImpl>();
92+
EXPECT_CALL(*mock, delivery_attempt).WillOnce(Return(42));
93+
AckHandlerWrapper tested(std::move(mock), "test-id");
94+
EXPECT_EQ(tested.delivery_attempt(), 42);
95+
}
96+
97+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
98+
} // namespace pubsub_internal
99+
} // namespace cloud
100+
} // namespace google

google/cloud/pubsub/internal/subscription_session.cc

Lines changed: 1 addition & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include "google/cloud/pubsub/internal/subscription_session.h"
1616
#include "google/cloud/pubsub/ack_handler.h"
1717
#include "google/cloud/pubsub/exactly_once_ack_handler.h"
18+
#include "google/cloud/pubsub/internal/ack_handler_wrapper.h"
1819
#include "google/cloud/pubsub/internal/streaming_subscription_batch_source.h"
1920
#include "google/cloud/pubsub/internal/subscription_lease_management.h"
2021
#include "google/cloud/pubsub/internal/subscription_message_queue.h"
@@ -27,39 +28,8 @@ namespace pubsub_internal {
2728
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
2829
namespace {
2930

30-
using ::google::cloud::pubsub::AckHandler;
3131
using ::google::cloud::pubsub::ExactlyOnceAckHandler;
3232

33-
class AckHandlerWrapper : public AckHandler::Impl {
34-
public:
35-
explicit AckHandlerWrapper(std::unique_ptr<ExactlyOnceAckHandler::Impl> impl,
36-
std::string message_id)
37-
: impl_(std::move(impl)), message_id_(std::move(message_id)) {}
38-
~AckHandlerWrapper() override = default;
39-
40-
void ack() override {
41-
(void)impl_->ack().then([id = std::move(message_id_)](auto f) {
42-
auto status = f.get();
43-
GCP_LOG(WARNING) << "error while trying to ack(), status=" << status
44-
<< ", message_id=" << id;
45-
});
46-
}
47-
void nack() override {
48-
(void)impl_->nack().then([id = std::move(message_id_)](auto f) {
49-
auto status = f.get();
50-
GCP_LOG(WARNING) << "error while trying to nack(), status=" << status
51-
<< ", message_id=" << id;
52-
});
53-
}
54-
std::int32_t delivery_attempt() const override {
55-
return impl_->delivery_attempt();
56-
}
57-
58-
private:
59-
std::unique_ptr<ExactlyOnceAckHandler::Impl> impl_;
60-
std::string message_id_;
61-
};
62-
6333
class SubscriptionSessionImpl
6434
: public std::enable_shared_from_this<SubscriptionSessionImpl> {
6535
public:

google/cloud/pubsub/pubsub_client_unit_tests.bzl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ pubsub_client_unit_tests = [
2121
"blocking_publisher_connection_test.cc",
2222
"blocking_publisher_test.cc",
2323
"exactly_once_ack_handler_test.cc",
24+
"internal/ack_handler_wrapper_test.cc",
2425
"internal/batching_publisher_connection_test.cc",
2526
"internal/default_batch_sink_test.cc",
2627
"internal/defaults_test.cc",

0 commit comments

Comments
 (0)