Kinetic C/C++ Client
 All Classes Functions Variables Pages
blocking_kinetic_connection.cc
1 /*
2  * kinetic-cpp-client
3  * Copyright (C) 2014 Seagate Technology.
4  *
5  * This program is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU General Public License
7  * as published by the Free Software Foundation; either version 2
8  * of the License, or (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18  *
19  */
20 
21 #include <memory>
22 #include <sys/select.h>
23 #include <errno.h>
24 #include <stdexcept>
25 #include "kinetic/blocking_kinetic_connection.h"
26 
27 
28 namespace kinetic {
29 
30 using std::shared_ptr;
31 using std::unique_ptr;
32 using std::string;
33 using std::make_shared;
34 using std::move;
35 
36 BlockingKineticConnection::BlockingKineticConnection( unique_ptr<NonblockingKineticConnection> nonblocking_connection,
37  unsigned int network_timeout_seconds)
38  : network_timeout_seconds_(network_timeout_seconds) {
39  nonblocking_connection_ = std::move(nonblocking_connection);
40 }
41 
42 BlockingKineticConnection::~BlockingKineticConnection() {}
43 
45  friend class BlockingKineticConnection;
46 
47  public:
48  BlockingCallbackState() : done_(false), success_(false),
49  error_(KineticStatus(StatusCode::OK, "default -- never seen")) { }
50 
51  virtual ~BlockingCallbackState() {}
52 
53  protected:
54  bool done_;
55  bool success_;
56  KineticStatus error_;
57 
58  void OnSuccess() {
59  done_ = true;
60  success_ = true;
61  }
62 
63  void OnError(KineticStatus error) {
64  done_ = true;
65  success_ = false;
66  error_ = error;
67  }
68 };
69 
71  public:
72  virtual void Success() {
73  OnSuccess();
74  }
75 
76  virtual void Failure(KineticStatus error) {
77  OnError(error);
78  }
79 
80  private:
81 };
82 
83 KineticStatus BlockingKineticConnection::NoOp() {
84  auto handler = make_shared<SimpleCallback>();
85  return RunOperation(handler, nonblocking_connection_->NoOp(handler));
86 }
87 
89  public:
91  unique_ptr<string>& actual_key,
92  unique_ptr<KineticRecord>& record,
93  bool want_actual_key)
94  : actual_key_(actual_key), record_(record),
95  want_actual_key_(want_actual_key) {}
96  virtual void Success(const string &key, unique_ptr<KineticRecord> record) {
97  OnSuccess();
98 
99  if (want_actual_key_) {
100  if (actual_key_) {
101  *actual_key_ = key;
102  } else {
103  actual_key_.reset(new string(key));
104  }
105  }
106 
107  record_ = std::move(record);
108  }
109 
110  virtual void Failure(KineticStatus error) {
111  OnError(error);
112  }
113 
114  private:
115  unique_ptr<string>& actual_key_;
116  unique_ptr<KineticRecord>& record_;
117  bool want_actual_key_;
118 };
119 
121  public BlockingCallbackState {
122  public:
123  explicit BlockingGetVersionCallback(unique_ptr<string>& version)
124  : version_(version) {}
125 
126  virtual void Success(const std::string& version) {
127  OnSuccess();
128 
129  if (version_) {
130  *version_ = version;
131  } else {
132  version_.reset(new string(version));
133  }
134  }
135 
136  virtual void Failure(KineticStatus error) {
137  OnError(error);
138  }
139 
140  private:
141  unique_ptr<string>& version_;
142 };
143 
145  public BlockingCallbackState {
146  public:
147  explicit BlockingGetKeyRangeCallback(unique_ptr<vector<string>>& keys)
148  : keys_(keys) {}
149 
150  virtual void Success(unique_ptr<vector<string>> keys) {
151  OnSuccess();
152 
153  keys_ = move(keys);
154  }
155 
156  virtual void Failure(KineticStatus error) {
157  OnError(error);
158  }
159 
160  private:
161  unique_ptr<vector<string>>& keys_;
162 };
163 
165  nonblocking_connection_->SetClientClusterVersion(cluster_version);
166 }
167 
168 KineticStatus BlockingKineticConnection::Get(const shared_ptr<const string> key,
169  unique_ptr<KineticRecord>& record) {
170  unique_ptr<string> actual_key(nullptr);
171  auto handler = make_shared<BlockingGetCallback>(actual_key, record, false);
172  return RunOperation(handler, nonblocking_connection_->Get(key, handler));
173 }
174 
175 KineticStatus BlockingKineticConnection::Get(const string& key, unique_ptr<KineticRecord>& record) {
176  return this->Get(make_shared<string>(key), record);
177 }
178 
180  public:
181  virtual void Success() {
182  OnSuccess();
183  }
184 
185  virtual void Failure(KineticStatus error) {
186  OnError(error);
187  }
188 
189  private:
190 };
191 
192 KineticStatus BlockingKineticConnection::Put(const shared_ptr<const string> key,
193  const shared_ptr<const string> current_version, WriteMode mode,
194  const shared_ptr<const KineticRecord> record,
195  PersistMode persistMode) {
196  auto handler = make_shared<BlockingPutCallback>();
197 
198  return RunOperation(handler,
199  nonblocking_connection_->Put(key, current_version, mode, record, handler, persistMode));
200 }
201 
202 KineticStatus BlockingKineticConnection::Put(const string& key,
203  const string& current_version, WriteMode mode,
204  const KineticRecord& record,
205  PersistMode persistMode) {
206  return this->Put(make_shared<string>(key), make_shared<string>(current_version), mode,
207  make_shared<KineticRecord>(record), persistMode);
208 }
209 
210 KineticStatus BlockingKineticConnection::Put(const shared_ptr<const string> key,
211  const shared_ptr<const string> current_version, WriteMode mode,
212  const shared_ptr<const KineticRecord> record) {
213  auto handler = make_shared<BlockingPutCallback>();
214 
215  // Rely on nonblocking_connection to handle the default PersistMode case
216  return RunOperation(handler,
217  nonblocking_connection_->Put(key, current_version, mode, record, handler));
218 }
219 
220 KineticStatus BlockingKineticConnection::Put(const string& key,
221  const string& current_version, WriteMode mode,
222  const KineticRecord& record) {
223  return this->Put(make_shared<string>(key), make_shared<string>(current_version), mode,
224  make_shared<KineticRecord>(record));
225 }
226 
227 KineticStatus BlockingKineticConnection::Delete(const shared_ptr<const string> key,
228  const shared_ptr<const string> version, WriteMode mode, PersistMode persistMode) {
229  auto callback = make_shared<SimpleCallback>();
230  return RunOperation(callback, nonblocking_connection_->Delete(key, version, mode,
231  callback, persistMode));
232 }
233 
234 KineticStatus BlockingKineticConnection::Delete(const string& key, const string& version,
235  WriteMode mode, PersistMode persistMode) {
236  return this->Delete(make_shared<string>(key),
237  make_shared<string>(version), mode, persistMode);
238 }
239 
240 KineticStatus BlockingKineticConnection::Delete(const shared_ptr<const string> key,
241  const shared_ptr<const string> version, WriteMode mode) {
242  auto callback = make_shared<SimpleCallback>();
243  // Let the nonblocking_connection handle the default persistOption
244  return RunOperation(callback, nonblocking_connection_->Delete(key, version, mode, callback));
245 }
246 
247 KineticStatus BlockingKineticConnection::Delete(const string& key, const string& version,
248  WriteMode mode) {
249  return this->Delete(make_shared<string>(key), make_shared<string>(version), mode);
250 }
251 
252 KineticStatus BlockingKineticConnection::InstantErase(const shared_ptr<string> pin) {
253  auto callback = make_shared<SimpleCallback>();
254  return RunOperation(callback, nonblocking_connection_->InstantErase(pin, callback));
255 }
256 KineticStatus BlockingKineticConnection::InstantErase(const string& pin) {
257  return this->InstantErase(make_shared<string>(pin));
258 }
259 
260 KineticStatus BlockingKineticConnection::SecureErase(const shared_ptr<string> pin){
261  auto callback = make_shared<SimpleCallback>();
262  return RunOperation(callback, nonblocking_connection_->SecureErase(pin, callback));
263 }
264 KineticStatus BlockingKineticConnection::SecureErase(const string& pin){
265  return this->SecureErase(make_shared<string>(pin));
266 }
267 
268 KineticStatus BlockingKineticConnection::SetClusterVersion(int64_t new_cluster_version) {
269  auto callback = make_shared<SimpleCallback>();
270  return RunOperation(callback,
271  nonblocking_connection_->SetClusterVersion(new_cluster_version, callback));
272 }
273 
275  public:
276  explicit GetLogCallback(unique_ptr<DriveLog>& drive_log) : drive_log_(drive_log) {}
277 
278  virtual void Success(unique_ptr<DriveLog> drive_log) {
279  OnSuccess();
280  drive_log_ = std::move(drive_log);
281  }
282  virtual void Failure(KineticStatus error) {
283  OnError(error);
284  }
285 
286  private:
287  unique_ptr<DriveLog>& drive_log_;
288 };
289 
290 KineticStatus BlockingKineticConnection::GetLog(unique_ptr<DriveLog>& drive_log) {
291  auto callback = make_shared<GetLogCallback>(drive_log);
292  return RunOperation(callback, nonblocking_connection_->GetLog(callback));
293 }
294 
295 KineticStatus BlockingKineticConnection::GetLog(const vector<Command_GetLog_Type>& types, unique_ptr<DriveLog>& drive_log) {
296  auto callback = make_shared<GetLogCallback>(drive_log);
297  return RunOperation(callback, nonblocking_connection_->GetLog(types, callback));
298 }
299 
300 KineticStatus BlockingKineticConnection::UpdateFirmware(const shared_ptr<const string>
301  new_firmware) {
302  auto callback = make_shared<SimpleCallback>();
303  return RunOperation(callback, nonblocking_connection_->UpdateFirmware(new_firmware, callback));
304 }
305 
306 KineticStatus BlockingKineticConnection::SetACLs(const shared_ptr<const list<ACL>> acls) {
307  auto callback = make_shared<SimpleCallback>();
308  return RunOperation(callback, nonblocking_connection_->SetACLs(acls, callback));
309 }
310 
311 KineticStatus BlockingKineticConnection::SetErasePIN(const shared_ptr<const string> new_pin,
312  const shared_ptr<const string> current_pin){
313  auto callback = make_shared<SimpleCallback>();
314  return RunOperation(callback, nonblocking_connection_->SetErasePIN(new_pin, current_pin, callback));
315 }
316 KineticStatus BlockingKineticConnection::SetErasePIN(const string& new_pin, const string& current_pin){
317  return this->SetErasePIN(make_shared<string>(new_pin),make_shared<string>(current_pin));
318 }
319 
320 KineticStatus BlockingKineticConnection::SetLockPIN(const shared_ptr<const string> new_pin,
321  const shared_ptr<const string> current_pin){
322  auto callback = make_shared<SimpleCallback>();
323  return RunOperation(callback, nonblocking_connection_->SetLockPIN(new_pin, current_pin, callback));
324 }
325 KineticStatus BlockingKineticConnection::SetLockPIN(const string& new_pin, const string& current_pin){
326  return this->SetLockPIN(make_shared<string>(new_pin),make_shared<string>(current_pin));
327 }
328 
329 KineticStatus BlockingKineticConnection::LockDevice(const shared_ptr<string> pin){
330  auto callback = make_shared<SimpleCallback>();
331  return RunOperation(callback, nonblocking_connection_->LockDevice(pin, callback));
332 }
333 KineticStatus BlockingKineticConnection::LockDevice(const string& pin){
334  return this->LockDevice(make_shared<string>(pin));
335 }
336 
337 KineticStatus BlockingKineticConnection::UnlockDevice(const shared_ptr<string> pin){
338  auto callback = make_shared<SimpleCallback>();
339  return RunOperation(callback, nonblocking_connection_->UnlockDevice(pin, callback));
340 }
341 KineticStatus BlockingKineticConnection::UnlockDevice(const string& pin){
342  return this->UnlockDevice(make_shared<string>(pin));
343 }
344 
345 KineticStatus BlockingKineticConnection::GetNext(const shared_ptr<const string> key,
346  unique_ptr<string>& actual_key, unique_ptr<KineticRecord>& record) {
347  auto callback = make_shared<BlockingGetCallback>(actual_key, record, true);
348  return RunOperation(callback, nonblocking_connection_->GetNext(key, callback));
349 }
350 
351 KineticStatus BlockingKineticConnection::GetNext(const string& key,
352  unique_ptr<string>& actual_key, unique_ptr<KineticRecord>& record) {
353  return this->GetNext(make_shared<string>(key), actual_key, record);
354 }
355 
356 KineticStatus BlockingKineticConnection::GetPrevious(const shared_ptr<const string> key,
357  unique_ptr<string>& actual_key, unique_ptr<KineticRecord>& record) {
358  auto callback = make_shared<BlockingGetCallback>(actual_key, record,
359  true);
360  return RunOperation(callback, nonblocking_connection_->GetPrevious(key, callback));
361 }
362 
363 KineticStatus BlockingKineticConnection::GetPrevious(const string& key,
364  unique_ptr<string>& actual_key, unique_ptr<KineticRecord>& record) {
365  return this->GetPrevious(make_shared<string>(key), actual_key, record);
366 }
367 
368 KineticStatus BlockingKineticConnection::GetVersion(const shared_ptr<const string> key,
369  unique_ptr<string>& version) {
370  auto callback = make_shared<BlockingGetVersionCallback>(version);
371  return RunOperation(callback, nonblocking_connection_->GetVersion(key, callback));
372 }
373 
374 KineticStatus BlockingKineticConnection::GetVersion(const string& key,
375  unique_ptr<string>& version) {
376  return this->GetVersion(make_shared<string>(key), version);
377 }
378 
379 KineticStatus BlockingKineticConnection::GetKeyRange(const shared_ptr<const string> start_key,
380  bool start_key_inclusive,
381  const shared_ptr<const string> end_key,
382  bool end_key_inclusive,
383  bool reverse_results,
384  int32_t max_results,
385  unique_ptr<vector<string>>& keys) {
386  auto callback = make_shared<BlockingGetKeyRangeCallback>(keys);
387 
388  return RunOperation(callback,
389  nonblocking_connection_->GetKeyRange(start_key,
390  start_key_inclusive,
391  end_key,
392  end_key_inclusive,
393  reverse_results,
394  max_results,
395  callback));
396 }
397 
398 KineticStatus BlockingKineticConnection::GetKeyRange(const string& start_key,
399  bool start_key_inclusive,
400  const string& end_key,
401  bool end_key_inclusive,
402  bool reverse_results,
403  int32_t max_results,
404  unique_ptr<vector<string>>& keys) {
405  return this->GetKeyRange(make_shared<string>(start_key),
406  start_key_inclusive, make_shared<string>(end_key),
407  end_key_inclusive, reverse_results, max_results,
408  keys);
409 }
410 
411 
412 KeyRangeIterator BlockingKineticConnection::IterateKeyRange(
413  const shared_ptr<const string> start_key,
414  bool start_key_inclusive,
415  const shared_ptr<const string> end_key,
416  bool end_key_inclusive,
417  unsigned int frame_size) {
418  KeyRangeIterator it(this,
419  frame_size,
420  *start_key,
421  start_key_inclusive,
422  *end_key,
423  end_key_inclusive);
424  return it;
425 }
426 
427 KeyRangeIterator BlockingKineticConnection::IterateKeyRange(const string& start_key,
428  bool start_key_inclusive,
429  const string& end_key,
430  bool end_key_inclusive,
431  unsigned int frame_size) {
432  return this->IterateKeyRange(make_shared<string>(start_key),
433  start_key_inclusive, make_shared<string>(end_key),
434  end_key_inclusive, frame_size);
435 }
436 
437 KineticStatus BlockingKineticConnection::P2PPush(const P2PPushRequest& push_request,
438  unique_ptr<vector<KineticStatus>>& operation_statuses) {
439  return this->P2PPush(make_shared<P2PPushRequest>(push_request), operation_statuses);
440 }
441 
443  public:
444  explicit BlockingP2PPushCallback(unique_ptr<vector<KineticStatus>>& statuses)
445  : statuses_(statuses) {}
446  virtual void Success(unique_ptr<vector<KineticStatus>> statuses, const Command& response) {
447  OnSuccess();
448  statuses_ = std::move(statuses);
449  }
450  virtual void Failure(KineticStatus error, Command const * const response) {
451  OnError(error);
452  }
453 
454  private:
455  unique_ptr<vector<KineticStatus>>& statuses_;
456 };
457 
458 KineticStatus BlockingKineticConnection::P2PPush(
459  const shared_ptr<const P2PPushRequest> push_request,
460  unique_ptr<vector<KineticStatus>>& operation_statuses) {
461  auto callback = make_shared<BlockingP2PPushCallback>(operation_statuses);
462  return RunOperation(callback,
463  nonblocking_connection_->P2PPush(push_request, callback));
464 }
465 
466 KineticStatus BlockingKineticConnection::RunOperation(
467  shared_ptr<BlockingCallbackState> callback,
468  HandlerKey handler_key) {
469  fd_set read_fds, write_fds;
470  int nfds;
471 
472  if (!nonblocking_connection_->Run(&read_fds, &write_fds, &nfds)) {
473  nonblocking_connection_->RemoveHandler(handler_key);
474  return KineticStatus(StatusCode::CLIENT_IO_ERROR, "Connection failed");
475  }
476 
477  while (!(callback->done_)) {
478  struct timeval tv;
479  tv.tv_sec = network_timeout_seconds_;
480  tv.tv_usec = 0;
481 
482  int number_ready_fds = select(nfds, &read_fds, &write_fds, NULL, &tv);
483  if (number_ready_fds < 0) {
484  // select() returned an error
485  nonblocking_connection_->RemoveHandler(handler_key);
486  return KineticStatus(StatusCode::CLIENT_IO_ERROR, strerror(errno));
487  } else if (number_ready_fds == 0) {
488  // select() returned before any sockets were ready meaning the connection timed out
489  nonblocking_connection_->RemoveHandler(handler_key);
490  return KineticStatus(StatusCode::CLIENT_IO_ERROR, "Network timeout");
491  }
492 
493  // At least one FD was ready meaning that the connection is ready
494  // to make some progress
495  if (!nonblocking_connection_->Run(&read_fds, &write_fds, &nfds)) {
496  nonblocking_connection_->RemoveHandler(handler_key);
497  return KineticStatus(StatusCode::CLIENT_IO_ERROR, "Connection failed");
498  }
499  }
500 
501  // done was set, meaning handler was invoked and therefore removed internally
502 
503  if (callback->success_) {
504  return KineticStatus(StatusCode::OK, "");
505  } else {
506  return callback->error_;
507  }
508 }
509 
510 } // namespace kinetic
void SetClientClusterVersion(int64_t cluster_version)
If the drive has a non-zero cluster version, requests will fail unless the developer tells the client...
Indicates whether a Kinetic operation (get, put, security, etc) put succeeded or failed. Unlike Status it provides details like whether the failure resulted from a version or an HMAC error.