Kinetic C/C++ Client
 All Classes Functions Variables Pages
nonblocking_packet_receiver.cc
1 /*
2  * kinetic-cpp-client
3  * Copyright (C) 2014 Seagate Technology.
4  *
5  * This program is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU General Public License
7  * as published by the Free Software Foundation; either version 2
8  * of the License, or (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18  *
19  */
20 
21 #include "nonblocking_packet_receiver.h"
22 #include <chrono>
23 #include <exception>
24 #include <stdexcept>
25 namespace kinetic {
26 
27 using com::seagate::kinetic::client::proto::Message_AuthType_UNSOLICITEDSTATUS;
28 using com::seagate::kinetic::client::proto::Command_Status_StatusCode_SUCCESS;
29 using std::string;
30 using std::shared_ptr;
31 using std::unique_ptr;
32 using std::move;
33 using std::make_pair;
34 
35 
36 
37 KineticStatus GetKineticStatus(StatusCode code, int64_t expected_cluster_version) {
38  switch (code) {
39  case StatusCode::CLIENT_IO_ERROR:
40  return KineticStatus(code, "IO error");
41  case StatusCode::CLIENT_SHUTDOWN:
42  return KineticStatus(code, "Client shutdown");
43  case StatusCode::PROTOCOL_ERROR_RESPONSE_NO_ACKSEQUENCE:
44  return KineticStatus(code, "Response did not contain ack sequence");
45  case StatusCode::CLIENT_RESPONSE_HMAC_VERIFICATION_ERROR:
46  return KineticStatus(code, "Response HMAC verification failed");
47  case StatusCode::REMOTE_HMAC_ERROR:
48  return KineticStatus(code, "Remote HMAC verification failed");
49  case StatusCode::REMOTE_NOT_AUTHORIZED:
50  return KineticStatus(code, "Not authorized");
51  case StatusCode::REMOTE_CLUSTER_VERSION_MISMATCH:
52  return KineticStatus(code, "Cluster version mismatch", expected_cluster_version);
53  case StatusCode::REMOTE_INTERNAL_ERROR:
54  return KineticStatus(code, "Remote internal error");
55  case StatusCode::REMOTE_HEADER_REQUIRED:
56  return KineticStatus(code, "Request requires a header to be set");
57  case StatusCode::REMOTE_NOT_FOUND:
58  return KineticStatus(code, "Key not found");
59  case StatusCode::REMOTE_VERSION_MISMATCH:
60  return KineticStatus(code, "Version mismatch");
61  case StatusCode::REMOTE_SERVICE_BUSY:
62  return KineticStatus(code, "Remote service is busy");
63  case StatusCode::REMOTE_EXPIRED:
64  return KineticStatus(code, "Remote timeout");
65  case StatusCode::REMOTE_DATA_ERROR:
66  return KineticStatus(code, "Remote transient data error");
67  case StatusCode::REMOTE_PERM_DATA_ERROR:
68  return KineticStatus(code, "Remote permanent data error");
69  case StatusCode::REMOTE_REMOTE_CONNECTION_ERROR:
70  return KineticStatus(code, "Remote connection to peer failed");
71  case StatusCode::REMOTE_NO_SPACE:
72  return KineticStatus(code, "No space left");
73  case StatusCode::REMOTE_NO_SUCH_HMAC_ALGORITHM:
74  return KineticStatus(code, "Unknown HMAC algorithm");
75  case StatusCode::REMOTE_NESTED_OPERATION_ERRORS:
76  return KineticStatus(code, "Operation completed but has nested errors");
77  default:
78  return KineticStatus(code, "Internal Error");
79  }
80 }
81 
83 public:
84  bool done;
85  bool success;
86  HandshakeHandler():done(false),success(false){}
87  void Handle(const Command &response, unique_ptr<const string> value){
88  done = success = true;
89  }
90  void Error(KineticStatus error, Command const * const response){
91  done = true;
92  }
93 };
94 
95 NonblockingReceiver::NonblockingReceiver(shared_ptr<SocketWrapperInterface> socket_wrapper,
96  HmacProvider hmac_provider, const ConnectionOptions &connection_options)
97 : socket_wrapper_(socket_wrapper), hmac_provider_(hmac_provider),
98 connection_options_(connection_options), nonblocking_response_(NULL),
99 connection_id_(0), handler_(NULL) {
100 
101  shared_ptr<HandshakeHandler> hh = std::make_shared<HandshakeHandler>();
102  map_.insert(make_pair(-1,make_pair(hh,-1)));
103  handler_to_message_seq_map_.insert(make_pair(-1, -1));
104 
105  auto start = std::chrono::steady_clock::now();
106 
107  while(true){
108  if(Receive() == kError)
109  break;
110  if(hh->done)
111  break;
112  auto now = std::chrono::steady_clock::now();
113  if(std::chrono::duration_cast<std::chrono::seconds>(now-start).count() > 30)
114  break;
115  }
116  if(!hh->success)
117  throw std::runtime_error("Could not complete handshake.");
118 
119 }
120 
121 NonblockingReceiver::~NonblockingReceiver() {
122  if (nonblocking_response_ != NULL) {
123  delete nonblocking_response_;
124  }
125  CallAllErrorHandlers(KineticStatus(StatusCode::CLIENT_SHUTDOWN, "Receiver shutdown"));
126 }
127 
128 bool NonblockingReceiver::Enqueue(shared_ptr<HandlerInterface> handler, google::int64 sequence,
129  HandlerKey handler_key) {
130  auto seq_to_handler_res = map_.insert(make_pair(sequence, make_pair(handler, handler_key)));
131  if (!seq_to_handler_res.second) {
132  LOG(WARNING) << "Found existing handler for sequence " << sequence;
133  return false;
134  }
135  auto handler_key_to_seq_res =
136  handler_to_message_seq_map_.insert(make_pair(handler_key, sequence));
137  if (!handler_key_to_seq_res.second) {
138  LOG(WARNING) << "Found existing sequence " << sequence << " for handler_key "
139  << handler_key;
140  auto handler_map_entry = map_.find(sequence);
141  CHECK(handler_map_entry != map_.end())
142  << "Couldn't find just-inserted handler map entry for sequence "
143  << sequence;
144  map_.erase(handler_map_entry);
145  return false;
146  }
147  return true;
148 }
149 
150 
151 NonblockingPacketServiceStatus NonblockingReceiver::Receive() {
152  while (true) {
153  if (nonblocking_response_ == NULL) {
154  if (map_.empty()) {
155  return kIdle;
156  }
157 
158  // Start working on the next thing in the request queue
159  nonblocking_response_ = new NonblockingPacketReader(
160  socket_wrapper_, &message_, value_);
161  }
162 
163  NonblockingStringStatus status = nonblocking_response_->Read();
164  if (status != kDone) {
165  if (status == kInProgress) {
166  return kIoWait;
167  }
168  CallAllErrorHandlers(KineticStatus(StatusCode::CLIENT_IO_ERROR, "I/O read error"));
169  return kError;
170  }
171 
172  // We're done receiving this response
173  delete nonblocking_response_;
174  nonblocking_response_ = NULL;
175 
176  if(message_.has_hmacauth())
177  if (!hmac_provider_.ValidateHmac(message_, connection_options_.hmac_key)) {
178  LOG(INFO) << "Response HMAC mismatch";
179  CallAllErrorHandlers(KineticStatus(StatusCode::CLIENT_RESPONSE_HMAC_VERIFICATION_ERROR,
180  "Response HMAC mismatch"));
181  return kIdle;
182  }
183  if(!command_.ParseFromString(message_.commandbytes())){
184  CallAllErrorHandlers(KineticStatus(StatusCode::CLIENT_IO_ERROR, "I/O read error parsing proto::Command"));
185  return kError;
186  }
187  if (command_.header().has_connectionid()) {
188  connection_id_ = command_.header().connectionid();
189  }
190 
191  if(message_.authtype() == Message_AuthType_UNSOLICITEDSTATUS)
192  command_.mutable_header()->set_acksequence(-1);
193 
194  if (!command_.header().has_acksequence()) {
195  LOG(INFO) << "Got response without an acksequence";
196  CallAllErrorHandlers(KineticStatus(StatusCode::PROTOCOL_ERROR_RESPONSE_NO_ACKSEQUENCE,
197  "Response had no acksequence"));
198  return kIdle;
199  }
200 
201  auto find_result = map_.find(command_.header().acksequence());
202  if (find_result == map_.end()) {
203  LOG(WARNING) << "Couldn't find a handler for acksequence " <<
204  command_.header().acksequence();
205  continue;
206  }
207  auto handler_pair = find_result->second;
208  handler_ = handler_pair.first;
209  map_.erase(find_result);
210 
211  CHECK_EQ((size_t) 1, handler_to_message_seq_map_.erase(handler_pair.second))
212  << "Couldn't delete handler key to sequence entry for handler_key "
213  << handler_pair.second;
214 
215  if (command_.status().code() == Command_Status_StatusCode_SUCCESS) {
216  handler_->Handle(command_, move(value_));
217  } else {
218  handler_->Error(GetKineticStatus(ConvertFromProtoStatus(
219  command_.status().code()), command_.header().clusterversion()),
220  &command_);
221  }
222 
223  handler_.reset();
224  }
225 }
226 
227 int64_t NonblockingReceiver::connection_id() {
228  return connection_id_;
229 }
230 
231 void NonblockingReceiver::CallAllErrorHandlers(KineticStatus error) {
232  if (handler_) {
233  handler_->Error(error, nullptr);
234  handler_.reset();
235  }
236 
237  auto iter = map_.begin();
238  while (iter != map_.end()) {
239  auto handler_pair = iter->second;
240  shared_ptr<HandlerInterface> handler = handler_pair.first;
241  HandlerKey handler_key = handler_pair.second;
242 
243  CHECK_EQ((size_t) 1, handler_to_message_seq_map_.erase(handler_key))
244  << "Couldn't delete handler to sequence entry for handler_key " << handler_key;
245 
246  handler->Error(error, nullptr);
247  handler.reset();
248  iter++;
249  }
250  map_.clear();
251 }
252 
253 bool NonblockingReceiver::Remove(HandlerKey key) {
254  auto handler_key_to_seq = handler_to_message_seq_map_.find(key);
255  if (handler_key_to_seq == handler_to_message_seq_map_.end()) {
256  return false;
257  }
258 
259  google::protobuf::int64 seq = handler_key_to_seq->second;
260 
261  handler_to_message_seq_map_.erase(handler_key_to_seq);
262 
263  auto seq_to_handler = map_.find(seq);
264  CHECK(seq_to_handler != map_.end()) << "Handler key " << handler_key_to_seq->first
265  << " mapped to seq " << seq << " but no handler entry for that seq";
266 
267  auto handler_pair = seq_to_handler->second;
268  auto handler_key = handler_pair.second;
269  CHECK_EQ(handler_key, key);
270  map_.erase(seq_to_handler);
271 
272  return true;
273 }
274 
275 } // namespace kinetic
Wrapper class that handles computing HMACs. The supplied implementation uses openssl, but users can supply an alternate implementation that uses a different library (e.g. one providing specialized HW acceleration)
Definition: hmac_provider.h:33
std::string hmac_key
The HMAC key of the user specified in user_id.
Indicates whether a Kinetic operation (get, put, security, etc) succeeded or failed. Unlike Status it provides details like whether the failure resulted from a version or an HMAC error.
Use this struct to pass all connection options to the KineticConnectionFactory.