22 #include <sys/select.h>
25 #include "kinetic/blocking_kinetic_connection.h"
30 using std::shared_ptr;
31 using std::unique_ptr;
33 using std::make_shared;
// Builds a blocking wrapper around an existing nonblocking connection.
//
// nonblocking_connection  - connection to wrap; taken by value and moved into
//                           the nonblocking_connection_ member.
// network_timeout_seconds - stored in network_timeout_seconds_, which
//                           RunOperation uses as the select() timeout.
36 BlockingKineticConnection::BlockingKineticConnection( unique_ptr<NonblockingKineticConnection> nonblocking_connection,
37 unsigned int network_timeout_seconds)
38 : network_timeout_seconds_(network_timeout_seconds) {
// unique_ptr is move-only, so ownership is transferred with std::move.
39 nonblocking_connection_ = std::move(nonblocking_connection);
// Empty destructor body: no explicit cleanup is performed here.
42 BlockingKineticConnection::~BlockingKineticConnection() {}
49 error_(
KineticStatus(StatusCode::OK,
"default -- never seen")) { }
72 virtual void Success() {
84 auto handler = make_shared<SimpleCallback>();
85 return RunOperation(handler, nonblocking_connection_->NoOp(handler));
91 unique_ptr<string>& actual_key,
92 unique_ptr<KineticRecord>& record,
94 : actual_key_(actual_key), record_(record),
95 want_actual_key_(want_actual_key) {}
// Success handler: writes the results through the references this callback
// was constructed with (actual_key_ and record_).
96 virtual void Success(
const string &key, unique_ptr<KineticRecord> record) {
// The key is copied out only when the callback was created with
// want_actual_key set.
99 if (want_actual_key_) {
103 actual_key_.reset(
new string(key));
// Transfer ownership of the received record to the bound unique_ptr.
107 record_ = std::move(record);
115 unique_ptr<string>& actual_key_;
116 unique_ptr<KineticRecord>& record_;
117 bool want_actual_key_;
124 : version_(version) {}
// Success handler: stores a heap-allocated copy of `version` in the
// unique_ptr this callback is bound to (version_).
126 virtual void Success(
const std::string& version) {
132 version_.reset(
new string(version));
141 unique_ptr<string>& version_;
150 virtual void Success(unique_ptr<vector<string>> keys) {
161 unique_ptr<vector<string>>& keys_;
165 nonblocking_connection_->SetClientClusterVersion(cluster_version);
// Blocking Get for `key`.
//
// key    - key to look up, passed on to the nonblocking Get.
// record - reference handed to the BlockingGetCallback, which assigns the
//          received record to it in its Success handler.
//
// Returns the KineticStatus produced by RunOperation.
168 KineticStatus BlockingKineticConnection::Get(
const shared_ptr<const string> key,
169 unique_ptr<KineticRecord>& record) {
// BlockingGetCallback requires an actual-key reference; this local one is a
// placeholder, and the callback is created with `false` as third argument.
170 unique_ptr<string> actual_key(
nullptr);
171 auto handler = make_shared<BlockingGetCallback>(actual_key, record,
false);
172 return RunOperation(handler, nonblocking_connection_->Get(key, handler));
// Convenience overload taking the key by const reference: the key is copied
// into a shared string and the call is forwarded to the shared_ptr overload.
175 KineticStatus BlockingKineticConnection::Get(
const string& key, unique_ptr<KineticRecord>& record) {
176 return this->Get(make_shared<string>(key), record);
181 virtual void Success() {
// Blocking Put with an explicit persist mode.
//
// All arguments (key, current_version, mode, record, persistMode) are passed
// unchanged to the nonblocking Put, together with a fresh BlockingPutCallback.
// Returns the KineticStatus produced by RunOperation.
192 KineticStatus BlockingKineticConnection::Put(
const shared_ptr<const string> key,
193 const shared_ptr<const string> current_version, WriteMode mode,
194 const shared_ptr<const KineticRecord> record,
195 PersistMode persistMode) {
196 auto handler = make_shared<BlockingPutCallback>();
198 return RunOperation(handler,
199 nonblocking_connection_->Put(key, current_version, mode, record, handler, persistMode));
// Convenience overload taking key, version and record by const reference.
// Each of them is copied into a freshly allocated shared object before the
// call is forwarded to the shared_ptr overload with persistMode.
202 KineticStatus BlockingKineticConnection::Put(
const string& key,
203 const string& current_version, WriteMode mode,
204 const KineticRecord& record,
205 PersistMode persistMode) {
206 return this->Put(make_shared<string>(key), make_shared<string>(current_version), mode,
207 make_shared<KineticRecord>(record), persistMode);
// Blocking Put without a persist-mode argument: calls the nonblocking Put
// overload that takes (key, current_version, mode, record, handler).
// Returns the KineticStatus produced by RunOperation.
210 KineticStatus BlockingKineticConnection::Put(
const shared_ptr<const string> key,
211 const shared_ptr<const string> current_version, WriteMode mode,
212 const shared_ptr<const KineticRecord> record) {
213 auto handler = make_shared<BlockingPutCallback>();
216 return RunOperation(handler,
217 nonblocking_connection_->Put(key, current_version, mode, record, handler));
// Convenience overload without a persist-mode argument: copies key, version
// and record into shared objects and forwards to the shared_ptr overload.
220 KineticStatus BlockingKineticConnection::Put(
const string& key,
221 const string& current_version, WriteMode mode,
222 const KineticRecord& record) {
223 return this->Put(make_shared<string>(key), make_shared<string>(current_version), mode,
224 make_shared<KineticRecord>(record));
// Blocking Delete with an explicit persist mode.
//
// key, version, mode and persistMode are passed unchanged to the nonblocking
// Delete; a SimpleCallback serves as the completion state.
// Returns the KineticStatus produced by RunOperation.
227 KineticStatus BlockingKineticConnection::Delete(
const shared_ptr<const string> key,
228 const shared_ptr<const string> version, WriteMode mode, PersistMode persistMode) {
229 auto callback = make_shared<SimpleCallback>();
230 return RunOperation(callback, nonblocking_connection_->Delete(key, version, mode,
231 callback, persistMode));
// Convenience overload taking key and version by const reference: both are
// copied into shared strings and forwarded to the shared_ptr overload.
234 KineticStatus BlockingKineticConnection::Delete(
const string& key,
const string& version,
235 WriteMode mode, PersistMode persistMode) {
236 return this->Delete(make_shared<string>(key),
237 make_shared<string>(version), mode, persistMode);
// Blocking Delete without a persist-mode argument: calls the nonblocking
// Delete overload that takes (key, version, mode, callback).
// Returns the KineticStatus produced by RunOperation.
240 KineticStatus BlockingKineticConnection::Delete(
const shared_ptr<const string> key,
241 const shared_ptr<const string> version, WriteMode mode) {
242 auto callback = make_shared<SimpleCallback>();
244 return RunOperation(callback, nonblocking_connection_->Delete(key, version, mode, callback));
// Convenience overload without a persist-mode argument: key and version are
// copied into shared strings and forwarded, with `mode`, to the shared_ptr
// overload.
247 KineticStatus BlockingKineticConnection::Delete(
const string& key,
const string& version,
249 return this->Delete(make_shared<string>(key), make_shared<string>(version), mode);
// Blocking InstantErase: passes `pin` to the nonblocking InstantErase with a
// SimpleCallback and returns the KineticStatus produced by RunOperation.
252 KineticStatus BlockingKineticConnection::InstantErase(
const shared_ptr<string> pin) {
253 auto callback = make_shared<SimpleCallback>();
254 return RunOperation(callback, nonblocking_connection_->InstantErase(pin, callback));
// Convenience overload: copies `pin` into a shared string and forwards to the
// shared_ptr overload.
256 KineticStatus BlockingKineticConnection::InstantErase(
const string& pin) {
257 return this->InstantErase(make_shared<string>(pin));
// Blocking SecureErase: passes `pin` to the nonblocking SecureErase with a
// SimpleCallback and returns the KineticStatus produced by RunOperation.
260 KineticStatus BlockingKineticConnection::SecureErase(
const shared_ptr<string> pin){
261 auto callback = make_shared<SimpleCallback>();
262 return RunOperation(callback, nonblocking_connection_->SecureErase(pin, callback));
// Convenience overload: copies `pin` into a shared string and forwards to the
// shared_ptr overload.
264 KineticStatus BlockingKineticConnection::SecureErase(
const string& pin){
265 return this->SecureErase(make_shared<string>(pin));
// Blocking SetClusterVersion: sends `new_cluster_version` through the
// nonblocking connection with a SimpleCallback and returns the KineticStatus
// produced by RunOperation.
268 KineticStatus BlockingKineticConnection::SetClusterVersion(int64_t new_cluster_version) {
269 auto callback = make_shared<SimpleCallback>();
270 return RunOperation(callback,
271 nonblocking_connection_->SetClusterVersion(new_cluster_version, callback));
// Binds the callback to the caller's unique_ptr; drive_log_ is a reference,
// so `drive_log` must remain valid for as long as the callback may run.
276 explicit GetLogCallback(unique_ptr<DriveLog>& drive_log) : drive_log_(drive_log) {}
// Success handler: moves the received DriveLog into the bound unique_ptr.
278 virtual void Success(unique_ptr<DriveLog> drive_log) {
280 drive_log_ = std::move(drive_log);
287 unique_ptr<DriveLog>& drive_log_;
// Blocking GetLog using the nonblocking overload that takes only a callback.
//
// drive_log - reference bound to a GetLogCallback, which assigns the received
//             log to it in its Success handler.
// Returns the KineticStatus produced by RunOperation.
290 KineticStatus BlockingKineticConnection::GetLog(unique_ptr<DriveLog>& drive_log) {
291 auto callback = make_shared<GetLogCallback>(drive_log);
292 return RunOperation(callback, nonblocking_connection_->GetLog(callback));
// Blocking GetLog for a caller-chosen list of log types.
//
// types     - passed unchanged to the nonblocking GetLog.
// drive_log - reference bound to a GetLogCallback, which assigns the received
//             log to it in its Success handler.
// Returns the KineticStatus produced by RunOperation.
295 KineticStatus BlockingKineticConnection::GetLog(
const vector<Command_GetLog_Type>& types, unique_ptr<DriveLog>& drive_log) {
296 auto callback = make_shared<GetLogCallback>(drive_log);
297 return RunOperation(callback, nonblocking_connection_->GetLog(types, callback));
// Blocking UpdateFirmware: passes `new_firmware` to the nonblocking
// UpdateFirmware with a SimpleCallback and returns the KineticStatus produced
// by RunOperation.
300 KineticStatus BlockingKineticConnection::UpdateFirmware(
const shared_ptr<const string>
302 auto callback = make_shared<SimpleCallback>();
303 return RunOperation(callback, nonblocking_connection_->UpdateFirmware(new_firmware, callback));
// Blocking SetACLs: passes the ACL list to the nonblocking SetACLs with a
// SimpleCallback and returns the KineticStatus produced by RunOperation.
306 KineticStatus BlockingKineticConnection::SetACLs(
const shared_ptr<
const list<ACL>> acls) {
307 auto callback = make_shared<SimpleCallback>();
308 return RunOperation(callback, nonblocking_connection_->SetACLs(acls, callback));
// Blocking SetErasePIN: passes `new_pin` and `current_pin` to the nonblocking
// SetErasePIN with a SimpleCallback and returns the KineticStatus produced by
// RunOperation.
311 KineticStatus BlockingKineticConnection::SetErasePIN(
const shared_ptr<const string> new_pin,
312 const shared_ptr<const string> current_pin){
313 auto callback = make_shared<SimpleCallback>();
314 return RunOperation(callback, nonblocking_connection_->SetErasePIN(new_pin, current_pin, callback));
// Convenience overload: copies both PINs into shared strings and forwards to
// the shared_ptr overload.
316 KineticStatus BlockingKineticConnection::SetErasePIN(
const string& new_pin,
const string& current_pin){
317 return this->SetErasePIN(make_shared<string>(new_pin),make_shared<string>(current_pin));
// Blocking SetLockPIN: passes `new_pin` and `current_pin` to the nonblocking
// SetLockPIN with a SimpleCallback and returns the KineticStatus produced by
// RunOperation.
320 KineticStatus BlockingKineticConnection::SetLockPIN(
const shared_ptr<const string> new_pin,
321 const shared_ptr<const string> current_pin){
322 auto callback = make_shared<SimpleCallback>();
323 return RunOperation(callback, nonblocking_connection_->SetLockPIN(new_pin, current_pin, callback));
// Convenience overload: copies both PINs into shared strings and forwards to
// the shared_ptr overload.
325 KineticStatus BlockingKineticConnection::SetLockPIN(
const string& new_pin,
const string& current_pin){
326 return this->SetLockPIN(make_shared<string>(new_pin),make_shared<string>(current_pin));
// Blocking LockDevice: passes `pin` to the nonblocking LockDevice with a
// SimpleCallback and returns the KineticStatus produced by RunOperation.
329 KineticStatus BlockingKineticConnection::LockDevice(
const shared_ptr<string> pin){
330 auto callback = make_shared<SimpleCallback>();
331 return RunOperation(callback, nonblocking_connection_->LockDevice(pin, callback));
// Convenience overload: copies `pin` into a shared string and forwards to the
// shared_ptr overload.
333 KineticStatus BlockingKineticConnection::LockDevice(
const string& pin){
334 return this->LockDevice(make_shared<string>(pin));
// Blocking UnlockDevice: passes `pin` to the nonblocking UnlockDevice with a
// SimpleCallback and returns the KineticStatus produced by RunOperation.
337 KineticStatus BlockingKineticConnection::UnlockDevice(
const shared_ptr<string> pin){
338 auto callback = make_shared<SimpleCallback>();
339 return RunOperation(callback, nonblocking_connection_->UnlockDevice(pin, callback));
// Convenience overload: copies `pin` into a shared string and forwards to the
// shared_ptr overload.
341 KineticStatus BlockingKineticConnection::UnlockDevice(
const string& pin){
342 return this->UnlockDevice(make_shared<string>(pin));
// Blocking GetNext relative to `key`.
//
// key        - reference key, passed on to the nonblocking GetNext.
// actual_key - receives a copy of the key reported to the callback; the
//              callback is created with `true` so the key is stored.
// record     - receives the record reported to the callback.
//
// Returns the KineticStatus produced by RunOperation.
345 KineticStatus BlockingKineticConnection::GetNext(
const shared_ptr<const string> key,
346 unique_ptr<string>& actual_key, unique_ptr<KineticRecord>& record) {
347 auto callback = make_shared<BlockingGetCallback>(actual_key, record,
true);
348 return RunOperation(callback, nonblocking_connection_->GetNext(key, callback));
// Convenience overload: copies `key` into a shared string and forwards to the
// shared_ptr overload; actual_key and record are passed through by reference.
351 KineticStatus BlockingKineticConnection::GetNext(
const string& key,
352 unique_ptr<string>& actual_key, unique_ptr<KineticRecord>& record) {
353 return this->GetNext(make_shared<string>(key), actual_key, record);
// Blocking GetPrevious relative to `key`.
//
// A BlockingGetCallback bound to `actual_key` and `record` is handed to the
// nonblocking GetPrevious; the KineticStatus produced by RunOperation is
// returned.
356 KineticStatus BlockingKineticConnection::GetPrevious(
const shared_ptr<const string> key,
357 unique_ptr<string>& actual_key, unique_ptr<KineticRecord>& record) {
358 auto callback = make_shared<BlockingGetCallback>(actual_key, record,
360 return RunOperation(callback, nonblocking_connection_->GetPrevious(key, callback));
// Convenience overload: copies `key` into a shared string and forwards to the
// shared_ptr overload; actual_key and record are passed through by reference.
363 KineticStatus BlockingKineticConnection::GetPrevious(
const string& key,
364 unique_ptr<string>& actual_key, unique_ptr<KineticRecord>& record) {
365 return this->GetPrevious(make_shared<string>(key), actual_key, record);
// Blocking GetVersion for `key`.
//
// version - reference bound to a BlockingGetVersionCallback, whose Success
//           handler stores a copy of the reported version string in it.
// Returns the KineticStatus produced by RunOperation.
368 KineticStatus BlockingKineticConnection::GetVersion(
const shared_ptr<const string> key,
369 unique_ptr<string>& version) {
370 auto callback = make_shared<BlockingGetVersionCallback>(version);
371 return RunOperation(callback, nonblocking_connection_->GetVersion(key, callback));
// Convenience overload: copies `key` into a shared string and forwards to the
// shared_ptr overload.
374 KineticStatus BlockingKineticConnection::GetVersion(
const string& key,
375 unique_ptr<string>& version) {
376 return this->GetVersion(make_shared<string>(key), version);
// Blocking GetKeyRange.
//
// The range arguments are handed to the nonblocking GetKeyRange together with
// a BlockingGetKeyRangeCallback bound to `keys`; the KineticStatus produced by
// RunOperation is returned.
379 KineticStatus BlockingKineticConnection::GetKeyRange(
const shared_ptr<const string> start_key,
380 bool start_key_inclusive,
381 const shared_ptr<const string> end_key,
382 bool end_key_inclusive,
383 bool reverse_results,
385 unique_ptr<vector<string>>& keys) {
386 auto callback = make_shared<BlockingGetKeyRangeCallback>(keys);
388 return RunOperation(callback,
389 nonblocking_connection_->GetKeyRange(start_key,
// Convenience overload taking the range bounds by const reference: start_key
// and end_key are copied into shared strings and the remaining arguments are
// forwarded to the shared_ptr overload.
398 KineticStatus BlockingKineticConnection::GetKeyRange(
const string& start_key,
399 bool start_key_inclusive,
400 const string& end_key,
401 bool end_key_inclusive,
402 bool reverse_results,
404 unique_ptr<vector<string>>& keys) {
405 return this->GetKeyRange(make_shared<string>(start_key),
406 start_key_inclusive, make_shared<string>(end_key),
407 end_key_inclusive, reverse_results, max_results,
// Creates a KeyRangeIterator for the given key range and frame size.
// The iterator is constructed with a raw pointer to this connection.
// NOTE(review): presumably the iterator must not outlive this connection;
// confirm against KeyRangeIterator.
412 KeyRangeIterator BlockingKineticConnection::IterateKeyRange(
413 const shared_ptr<const string> start_key,
414 bool start_key_inclusive,
415 const shared_ptr<const string> end_key,
416 bool end_key_inclusive,
417 unsigned int frame_size) {
418 KeyRangeIterator it(
this,
// Convenience overload taking the range bounds by const reference: start_key
// and end_key are copied into shared strings and the call is forwarded to the
// shared_ptr overload.
427 KeyRangeIterator BlockingKineticConnection::IterateKeyRange(
const string& start_key,
428 bool start_key_inclusive,
429 const string& end_key,
430 bool end_key_inclusive,
431 unsigned int frame_size) {
432 return this->IterateKeyRange(make_shared<string>(start_key),
433 start_key_inclusive, make_shared<string>(end_key),
434 end_key_inclusive, frame_size);
// Convenience overload taking the request by const reference: the request is
// copied into a shared P2PPushRequest and forwarded to the shared_ptr
// overload; operation_statuses is passed through by reference.
437 KineticStatus BlockingKineticConnection::P2PPush(
const P2PPushRequest& push_request,
438 unique_ptr<vector<KineticStatus>>& operation_statuses) {
439 return this->P2PPush(make_shared<P2PPushRequest>(push_request), operation_statuses);
445 : statuses_(statuses) {}
// Success handler: moves the received status list into the unique_ptr this
// callback is bound to (statuses_).
446 virtual void Success(unique_ptr<vector<KineticStatus>> statuses,
const Command& response) {
448 statuses_ = std::move(statuses);
450 virtual void Failure(
KineticStatus error, Command
const *
const response) {
455 unique_ptr<vector<KineticStatus>>& statuses_;
459 const shared_ptr<const P2PPushRequest> push_request,
460 unique_ptr<vector<KineticStatus>>& operation_statuses) {
461 auto callback = make_shared<BlockingP2PPushCallback>(operation_statuses);
462 return RunOperation(callback,
463 nonblocking_connection_->P2PPush(push_request, callback));
// Drives the nonblocking connection until the operation tied to `callback`
// has completed, then converts the callback state into a KineticStatus.
//
// callback    - shared completion state; its done_, success_ and error_
//               fields are read here.
// handler_key - key of the pending handler; it is removed from the
//               nonblocking connection on every early-exit error path.
//
// Returns StatusCode::CLIENT_IO_ERROR ("Connection failed", the strerror text
// for a select() failure, or "Network timeout") when the wait is abandoned,
// StatusCode::OK with an empty message when callback->success_ is set, and
// callback->error_ otherwise.
466 KineticStatus BlockingKineticConnection::RunOperation(
467 shared_ptr<BlockingCallbackState> callback,
468 HandlerKey handler_key) {
469 fd_set read_fds, write_fds;
// Initial pump of the connection. Run receives the fd sets and nfds that are
// passed to select() below (presumably it fills them in -- confirm).
472 if (!nonblocking_connection_->Run(&read_fds, &write_fds, &nfds)) {
473 nonblocking_connection_->RemoveHandler(handler_key);
474 return KineticStatus(StatusCode::CLIENT_IO_ERROR,
"Connection failed");
// Keep waiting for socket readiness and pumping the connection until the
// callback reports completion.
477 while (!(callback->done_)) {
// The timeout is set again on each pass, since select() may modify it.
479 tv.tv_sec = network_timeout_seconds_;
482 int number_ready_fds = select(nfds, &read_fds, &write_fds, NULL, &tv);
483 if (number_ready_fds < 0) {
// NOTE(review): RemoveHandler runs before strerror(errno) is evaluated; if it
// can modify errno, the message may not describe the select() failure.
// Consider capturing errno first.
// NOTE(review): an interrupted select() (EINTR) also ends up here and is
// reported as an I/O error -- confirm that this is intended.
485 nonblocking_connection_->RemoveHandler(handler_key);
486 return KineticStatus(StatusCode::CLIENT_IO_ERROR, strerror(errno));
487 }
else if (number_ready_fds == 0) {
// select() returned 0: nothing became ready within the timeout.
489 nonblocking_connection_->RemoveHandler(handler_key);
490 return KineticStatus(StatusCode::CLIENT_IO_ERROR,
"Network timeout");
// Some descriptor is ready: let the connection make progress.
495 if (!nonblocking_connection_->Run(&read_fds, &write_fds, &nfds)) {
496 nonblocking_connection_->RemoveHandler(handler_key);
497 return KineticStatus(StatusCode::CLIENT_IO_ERROR,
"Connection failed");
// The operation finished: report OK or the error recorded by the callback.
503 if (callback->success_) {
504 return KineticStatus(StatusCode::OK,
"");
506 return callback->error_;
void SetClientClusterVersion(int64_t cluster_version)
If the drive has a non-zero cluster version, requests will fail unless the developer tells the client...
Indicates whether a Kinetic operation (get, put, security, etc.) succeeded or failed. Unlike Status, it provides details like whether the failure resulted from a version or an HMAC error.