Skip to content

Commit 8035b35

Browse files
authored
Create a write thread (istio#67)
* Create a write thread * use unique_ptr * fix asan test failure. * remove one std::move
1 parent 11f8ac6 commit 8035b35

File tree

1 file changed

+69
-13
lines changed

1 file changed

+69
-13
lines changed

mixerclient/src/grpc_transport.cc

Lines changed: 69 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@
1313
* limitations under the License.
1414
*/
1515
#include "src/grpc_transport.h"
16+
17+
#include <condition_variable>
1618
#include <mutex>
19+
#include <queue>
1720
#include <thread>
1821

1922
namespace istio {
@@ -37,25 +40,26 @@ class GrpcStream final : public WriteInterface<RequestType> {
3740

3841
static void Start(
3942
std::shared_ptr<GrpcStream<RequestType, ResponseType>> grpc_stream) {
40-
std::thread t([grpc_stream]() { grpc_stream->ReadMainLoop(); });
41-
t.detach();
43+
std::thread read_t([grpc_stream]() { grpc_stream->ReadMainLoop(); });
44+
read_t.detach();
45+
std::thread write_t([grpc_stream]() { grpc_stream->WriteMainLoop(); });
46+
write_t.detach();
4247
}
4348

4449
void Write(const RequestType& request) override {
45-
std::lock_guard<std::mutex> lock(write_mutex_);
46-
if (!stream_->Write(request)) {
47-
GOOGLE_LOG(INFO) << "Stream Write failed: half close";
48-
write_closed_ = true;
49-
}
50+
// make a copy and push to the queue
51+
WriteQueuePush(new RequestType(request));
5052
}
5153

5254
void WritesDone() override {
53-
std::lock_guard<std::mutex> lock(write_mutex_);
54-
stream_->WritesDone();
55-
write_closed_ = true;
55+
// push a nullptr to indicate half close
56+
WriteQueuePush(nullptr);
5657
}
5758

58-
bool is_write_closed() const override { return write_closed_; }
59+
bool is_write_closed() const override {
60+
std::lock_guard<std::mutex> lock(write_closed_mutex_);
61+
return write_closed_;
62+
}
5963

6064
private:
6165
// The worker loop to read response messages.
@@ -67,23 +71,75 @@ class GrpcStream final : public WriteInterface<RequestType> {
6771
::grpc::Status status = stream_->Finish();
6872
GOOGLE_LOG(INFO) << "Stream Finished with status: "
6973
<< status.error_message();
74+
75+
// Notify Write thread to quit.
76+
set_write_closed();
77+
WriteQueuePush(nullptr);
78+
7079
// Convert grpc status to protobuf status.
7180
::google::protobuf::util::Status pb_status(
7281
::google::protobuf::util::error::Code(status.error_code()),
7382
::google::protobuf::StringPiece(status.error_message()));
7483
reader_->OnClose(pb_status);
7584
}
7685

86+
void WriteQueuePush(RequestType* request) {
87+
std::unique_lock<std::mutex> lk(write_queue_mutex_);
88+
write_queue_.emplace(request);
89+
cv_.notify_one();
90+
}
91+
92+
std::unique_ptr<RequestType> WriteQueuePop() {
93+
std::unique_lock<std::mutex> lk(write_queue_mutex_);
94+
while (write_queue_.empty()) {
95+
cv_.wait(lk);
96+
}
97+
auto ret = std::move(write_queue_.front());
98+
write_queue_.pop();
99+
return ret;
100+
}
101+
102+
void set_write_closed() {
103+
std::lock_guard<std::mutex> lock(write_closed_mutex_);
104+
write_closed_ = true;
105+
}
106+
107+
// The worker loop to write request message.
108+
void WriteMainLoop() {
109+
while (true) {
110+
auto request = WriteQueuePop();
111+
if (!request) {
112+
if (!is_write_closed()) {
113+
stream_->WritesDone();
114+
set_write_closed();
115+
}
116+
break;
117+
}
118+
if (!stream_->Write(*request)) {
119+
set_write_closed();
120+
break;
121+
}
122+
}
123+
}
124+
77125
// The client context.
78126
::grpc::ClientContext context_;
79-
// Mutex to make sure not calling stream_->Write() parallelly.
80-
std::mutex write_mutex_;
127+
81128
// The reader writer stream.
82129
StreamPtr stream_;
83130
// The reader interface from caller.
84131
ReadInterface<ResponseType>* reader_;
132+
85133
// Indicates if write is closed.
134+
mutable std::mutex write_closed_mutex_;
86135
bool write_closed_;
136+
137+
// Mutex to protect write queue.
138+
std::mutex write_queue_mutex_;
139+
// condition to wait for write_queue.
140+
std::condition_variable cv_;
141+
// a queue to store pending queue for write
142+
std::queue<std::unique_ptr<RequestType>> write_queue_;
87143
};
88144

89145
typedef GrpcStream<::istio::mixer::v1::CheckRequest,

0 commit comments

Comments
 (0)