Kinetic C/C++ Client
 All Classes Functions Variables Pages
nonblocking_packet_sender.cc
1 /*
2  * kinetic-cpp-client
3  * Copyright (C) 2014 Seagate Technology.
4  *
5  * This program is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU General Public License
7  * as published by the Free Software Foundation; either version 2
8  * of the License, or (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18  *
19  */
20 
21 #include "nonblocking_packet_sender.h"
22 
23 namespace kinetic {
24 
25 using std::string;
26 using std::shared_ptr;
27 using std::unique_ptr;
28 using std::move;
29 using std::make_pair;
30 
31 NonblockingSender::NonblockingSender(shared_ptr<SocketWrapperInterface> socket_wrapper,
32  shared_ptr<NonblockingReceiverInterface> receiver,
33  shared_ptr<NonblockingPacketWriterFactoryInterface> packet_writer_factory,
34  HmacProvider hmac_provider,
35  const ConnectionOptions &connection_options)
36  : socket_wrapper_(socket_wrapper), receiver_(receiver),
37  packet_writer_factory_(packet_writer_factory), hmac_provider_(hmac_provider),
38  connection_options_(connection_options), sequence_number_(0), current_writer_(nullptr),
39  handler_(nullptr) {}
40 
41 void NonblockingSender::Enqueue(unique_ptr<Message> message, unique_ptr<Command> command,
42  const shared_ptr<const string> value, unique_ptr<HandlerInterface> handler,
43  HandlerKey handler_key) {
44 
45  command->mutable_header()->set_connectionid(receiver_->connection_id());
46  command->mutable_header()->set_sequence(sequence_number_++);
47  /* COMMAND PART OF MESSAGE IS FINALIZED */
48  message->set_commandbytes(command->SerializeAsString());
49 
50  if(message->authtype() == com::seagate::kinetic::client::proto::Message_AuthType_HMACAUTH){
51  message->mutable_hmacauth()->set_identity(connection_options_.user_id);
52  message->mutable_hmacauth()->set_hmac(hmac_provider_.ComputeHmac(*message, connection_options_.hmac_key));
53  }
54 
55  unique_ptr<Request> request(new Request());
56  request->message = move(message);
57  request->command = move(command);
58  request->value = value;
59  request->handler = move(handler);
60  request->handler_key = handler_key;
61 
62  request_queue_.push_back(move(request));
63 }
64 
65 NonblockingSender::~NonblockingSender() {
66  while (!request_queue_.empty()) {
67  unique_ptr<Request> request = move(request_queue_.front());
68  request_queue_.pop_front();
69  request->handler->Error(
70  KineticStatus(StatusCode::CLIENT_SHUTDOWN, "Sender shutdown"),
71  nullptr);
72  }
73 }
74 
75 NonblockingPacketServiceStatus NonblockingSender::Send() {
76  while (true) {
77  if (!current_writer_) {
78  if (request_queue_.empty()) {
79  return kIdle;
80  }
81 
82  // Start working on the next thing on the request queue
83  unique_ptr<Request> request = move(request_queue_.front());
84  request_queue_.pop_front();
85  message_sequence_ = request->command->header().sequence();
86  handler_key_ = request->handler_key;
87  current_writer_ = move(packet_writer_factory_->CreateWriter(socket_wrapper_,
88  move(request->message), request->value));
89  handler_ = move(request->handler);
90  }
91 
92  NonblockingStringStatus status = current_writer_->Write();
93  if (status != kDone) {
94  if (status == kInProgress) {
95  return kIoWait;
96  }
97 
98  CHECK_EQ(kFailed, status);
99 
100  handler_->Error(
101  KineticStatus(StatusCode::CLIENT_IO_ERROR, "I/O write error"), nullptr);
102  handler_.reset();
103 
104  while (!request_queue_.empty()) {
105  unique_ptr<Request> request = move(request_queue_.front());
106  request_queue_.pop_front();
107  request->handler->Error(KineticStatus(StatusCode::CLIENT_IO_ERROR,
108  "I/O write error"), nullptr);
109  }
110  return kError;
111  }
112 
113  // We're done with this request
114  current_writer_.reset();
115 
116  if (!receiver_->Enqueue(handler_, message_sequence_, handler_key_)) {
117  LOG(WARNING) << "Could not enqueue handler; already had a handler for sequence " <<
118  message_sequence_ << " and handler key " << handler_key_;
119  handler_->Error(KineticStatus(StatusCode::CLIENT_INTERNAL_ERROR,
120  "Could not enqueue handler"), nullptr);
121  }
122  handler_.reset();
123  }
124 }
125 
126 bool NonblockingSender::Remove(HandlerKey key) {
127  for (auto it = request_queue_.begin(); it != request_queue_.end(); it++) {
128  if ((*it)->handler_key == key) {
129  request_queue_.erase(it);
130  return true;
131  }
132  }
133  return false;
134 }
135 
136 } // namespace kinetic
int user_id
The ID of the user to connect as.
std::string hmac_key
The HMAC key of the user specified in user_id.