21 #include "nonblocking_packet_receiver.h"
27 using com::seagate::kinetic::client::proto::Message_AuthType_UNSOLICITEDSTATUS;
28 using com::seagate::kinetic::client::proto::Command_Status_StatusCode_SUCCESS;
30 using std::shared_ptr;
31 using std::unique_ptr;
// Builds the KineticStatus that corresponds to a StatusCode by pairing the
// code with a fixed, human-readable message.
//
// code: passed through unchanged as the first argument of every
//       KineticStatus constructed here.
// expected_cluster_version: only forwarded for
//       REMOTE_CLUSTER_VERSION_MISMATCH; every other case ignores it.
//
// NOTE(review): the statement that selects between the case labels and the
// closing braces are not visible in this listing - presumably
// `switch (code) { ... }`; confirm against the full source.
37 KineticStatus GetKineticStatus(StatusCode code, int64_t expected_cluster_version) {
// Failures detected on the client side.
39 case StatusCode::CLIENT_IO_ERROR:
40 return KineticStatus(code,
"IO error");
41 case StatusCode::CLIENT_SHUTDOWN:
42 return KineticStatus(code,
"Client shutdown");
43 case StatusCode::PROTOCOL_ERROR_RESPONSE_NO_ACKSEQUENCE:
44 return KineticStatus(code,
"Response did not contain ack sequence");
45 case StatusCode::CLIENT_RESPONSE_HMAC_VERIFICATION_ERROR:
46 return KineticStatus(code,
"Response HMAC verification failed");
// Failures reported by the remote side (REMOTE_* codes).
47 case StatusCode::REMOTE_HMAC_ERROR:
48 return KineticStatus(code,
"Remote HMAC verification failed");
49 case StatusCode::REMOTE_NOT_AUTHORIZED:
50 return KineticStatus(code,
"Not authorized");
// The only case that also passes expected_cluster_version along.
51 case StatusCode::REMOTE_CLUSTER_VERSION_MISMATCH:
52 return KineticStatus(code,
"Cluster version mismatch", expected_cluster_version);
53 case StatusCode::REMOTE_INTERNAL_ERROR:
54 return KineticStatus(code,
"Remote internal error");
55 case StatusCode::REMOTE_HEADER_REQUIRED:
56 return KineticStatus(code,
"Request requires a header to be set");
57 case StatusCode::REMOTE_NOT_FOUND:
58 return KineticStatus(code,
"Key not found");
59 case StatusCode::REMOTE_VERSION_MISMATCH:
60 return KineticStatus(code,
"Version mismatch");
61 case StatusCode::REMOTE_SERVICE_BUSY:
62 return KineticStatus(code,
"Remote service is busy");
63 case StatusCode::REMOTE_EXPIRED:
64 return KineticStatus(code,
"Remote timeout");
65 case StatusCode::REMOTE_DATA_ERROR:
66 return KineticStatus(code,
"Remote transient data error");
67 case StatusCode::REMOTE_PERM_DATA_ERROR:
68 return KineticStatus(code,
"Remote permanent data error");
69 case StatusCode::REMOTE_REMOTE_CONNECTION_ERROR:
70 return KineticStatus(code,
"Remote connection to peer failed");
71 case StatusCode::REMOTE_NO_SPACE:
72 return KineticStatus(code,
"No space left");
73 case StatusCode::REMOTE_NO_SUCH_HMAC_ALGORITHM:
74 return KineticStatus(code,
"Unknown HMAC algorithm");
75 case StatusCode::REMOTE_NESTED_OPERATION_ERRORS:
76 return KineticStatus(code,
"Operation completed but has nested errors");
// Generic message for codes without a dedicated case above.
// NOTE(review): the label guarding this return is not shown - presumably
// `default:`; confirm.
78 return KineticStatus(code,
"Internal Error");
// Success callback: sets both the `done` and `success` flags to true.
// Neither `response` nor `value` is referenced by the visible body.
// NOTE(review): the enclosing class declaration and this method's closing
// brace are outside this listing - presumably this belongs to the
// HandshakeHandler that the receiver's constructor registers; confirm.
87 void Handle(
const Command &response, unique_ptr<const string> value){
88 done = success =
true;
// Error callback: receives the failure status and a const pointer to the
// response command. Callers in this file pass nullptr for `response`, so an
// implementation must not assume it is non-null.
// NOTE(review): only the signature is visible in this listing; the body is
// not shown, so its effect on any handler state cannot be documented here.
90 void Error(
KineticStatus error, Command
const *
const response){
// Constructs the receiver and performs the initial handshake.
//
// Stores the socket wrapper, HMAC provider and connection options, starts
// with no packet reader, connection id 0 and no current handler, then
// registers a HandshakeHandler and calls Receive() to process the handshake.
//
// Throws std::runtime_error("Could not complete handshake.").
// NOTE(review): the rest of the parameter list, the loop around Receive()
// and the statements controlled by the two `if` conditions are not visible
// in this listing - presumably a Receive() error or exceeding the 30 second
// budget leads to the throw; confirm against the full source.
95 NonblockingReceiver::NonblockingReceiver(shared_ptr<SocketWrapperInterface> socket_wrapper,
97 : socket_wrapper_(socket_wrapper), hmac_provider_(hmac_provider),
98 connection_options_(connection_options), nonblocking_response_(NULL),
99 connection_id_(0), handler_(NULL) {
// Register the handshake handler under sequence -1 with handler key -1 in
// both lookup tables. Receive() rewrites the acksequence of unsolicited
// status messages to -1, which is how such a message reaches this handler.
101 shared_ptr<HandshakeHandler> hh = std::make_shared<HandshakeHandler>();
102 map_.insert(make_pair(-1,make_pair(hh,-1)));
103 handler_to_message_seq_map_.insert(make_pair(-1, -1));
// Monotonic start time used to bound how long the handshake may take.
105 auto start = std::chrono::steady_clock::now();
108 if(Receive() == kError)
// Elapsed time is truncated to whole seconds before comparing against 30.
112 auto now = std::chrono::steady_clock::now();
113 if(std::chrono::duration_cast<std::chrono::seconds>(now-start).count() > 30)
117 throw std::runtime_error(
"Could not complete handshake.");
// Destructor: frees the packet reader if one is still allocated, then
// reports a CLIENT_SHUTDOWN status ("Receiver shutdown") through
// CallAllErrorHandlers so registered handlers learn they will get no reply.
121 NonblockingReceiver::~NonblockingReceiver() {
// nonblocking_response_ is a raw owning pointer created with `new` in
// Receive(); it is non-NULL only while a read is outstanding.
122 if (nonblocking_response_ != NULL) {
123 delete nonblocking_response_;
125 CallAllErrorHandlers(KineticStatus(StatusCode::CLIENT_SHUTDOWN,
"Receiver shutdown"));
// Registers `handler` to be invoked when the response acknowledging
// `sequence` arrives, and records `handler_key` so the registration can be
// found again by key (see Remove).
//
// Two tables are kept in step: map_ (sequence -> (handler, handler_key)) and
// handler_to_message_seq_map_ (handler_key -> sequence).
//
// NOTE(review): the return statements are not visible in this listing -
// presumably false when either insertion finds an existing entry and true
// otherwise; confirm.
128 bool NonblockingReceiver::Enqueue(shared_ptr<HandlerInterface> handler, google::int64 sequence,
129 HandlerKey handler_key) {
// insert() leaves an existing entry untouched and reports that via .second.
130 auto seq_to_handler_res = map_.insert(make_pair(sequence, make_pair(handler, handler_key)));
131 if (!seq_to_handler_res.second) {
132 LOG(WARNING) <<
"Found existing handler for sequence " << sequence;
135 auto handler_key_to_seq_res =
136 handler_to_message_seq_map_.insert(make_pair(handler_key, sequence));
137 if (!handler_key_to_seq_res.second) {
138 LOG(WARNING) <<
"Found existing sequence " << sequence <<
" for handler_key "
// The key is already in use, so undo the map_ insertion made above to keep
// the two tables consistent.
140 auto handler_map_entry = map_.find(sequence);
141 CHECK(handler_map_entry != map_.end())
142 <<
"Couldn't find just-inserted handler map entry for sequence "
144 map_.erase(handler_map_entry);
// Reads one response packet from the socket and dispatches it to the handler
// registered for its acksequence.
//
// Visible steps: create a packet reader if none is in progress, call Read()
// on it, optionally validate the HMAC, parse the command bytes, record the
// connection id, look up and unregister the handler, then call Handle() on
// success or Error() otherwise. Failures at the read, HMAC, parse and
// missing-acksequence stages are reported through CallAllErrorHandlers.
//
// NOTE(review): every return statement and several closing braces are not
// visible in this listing, so the exact NonblockingPacketServiceStatus
// returned on each path cannot be stated here; the constructor compares the
// result against kError.
151 NonblockingPacketServiceStatus NonblockingReceiver::Receive() {
// A reader is kept across calls while a packet is only partially read; a
// new one is allocated only when none is pending.
153 if (nonblocking_response_ == NULL) {
159 nonblocking_response_ =
new NonblockingPacketReader(
160 socket_wrapper_, &message_, value_);
163 NonblockingStringStatus status = nonblocking_response_->Read();
164 if (status != kDone) {
// kInProgress is separated from the failure handling below.
// NOTE(review): the body of this branch is not shown - presumably an early
// return that keeps the reader for the next call; confirm.
165 if (status == kInProgress) {
168 CallAllErrorHandlers(KineticStatus(StatusCode::CLIENT_IO_ERROR,
"I/O read error"));
// Dispose of the reader and reset the pointer so the next call allocates a
// fresh one.
173 delete nonblocking_response_;
174 nonblocking_response_ = NULL;
// The HMAC is validated only for messages that carry hmacauth.
176 if(message_.has_hmacauth())
177 if (!hmac_provider_.ValidateHmac(message_, connection_options_.
hmac_key)) {
178 LOG(INFO) <<
"Response HMAC mismatch";
179 CallAllErrorHandlers(KineticStatus(StatusCode::CLIENT_RESPONSE_HMAC_VERIFICATION_ERROR,
180 "Response HMAC mismatch"));
// The command travels as serialized bytes inside the message and is parsed
// separately into command_.
183 if(!command_.ParseFromString(message_.commandbytes())){
184 CallAllErrorHandlers(KineticStatus(StatusCode::CLIENT_IO_ERROR,
"I/O read error parsing proto::Command"));
// Remember the connection id from the response header (exposed through
// connection_id()).
187 if (command_.header().has_connectionid()) {
188 connection_id_ = command_.header().connectionid();
// Unsolicited status messages get acksequence -1, the sequence under which
// the constructor registers the handshake handler.
191 if(message_.authtype() == Message_AuthType_UNSOLICITEDSTATUS)
192 command_.mutable_header()->set_acksequence(-1);
// Without an acksequence there is no way to pick a handler, so the error is
// reported to all of them.
194 if (!command_.header().has_acksequence()) {
195 LOG(INFO) <<
"Got response without an acksequence";
196 CallAllErrorHandlers(KineticStatus(StatusCode::PROTOCOL_ERROR_RESPONSE_NO_ACKSEQUENCE,
197 "Response had no acksequence"));
201 auto find_result = map_.find(command_.header().acksequence());
202 if (find_result == map_.end()) {
203 LOG(WARNING) <<
"Couldn't find a handler for acksequence " <<
204 command_.header().acksequence();
// Copy the (handler, handler_key) pair out before erasing so it stays valid
// after the map entry is removed; handler_ keeps the handler alive.
207 auto handler_pair = find_result->second;
208 handler_ = handler_pair.first;
209 map_.erase(find_result);
// The reverse table must hold exactly one entry for this handler key.
211 CHECK_EQ((
size_t) 1, handler_to_message_seq_map_.erase(handler_pair.second))
212 <<
"Couldn't delete handler key to sequence entry for handler_key "
213 << handler_pair.second;
// Success hands the command and ownership of the value buffer to the
// handler; any other status is converted to a KineticStatus for Error().
215 if (command_.status().code() == Command_Status_StatusCode_SUCCESS) {
216 handler_->Handle(command_, move(value_));
218 handler_->Error(GetKineticStatus(ConvertFromProtoStatus(
219 command_.status().code()), command_.header().clusterversion()),
// Returns the stored connection id: 0 as set by the constructor until
// Receive() sees a response header carrying a connection id, after which it
// is the most recent such value.
227 int64_t NonblockingReceiver::connection_id() {
228 return connection_id_;
// Reports `error` to the current handler (handler_) and to every handler
// still registered in map_, passing nullptr as the response each time.
//
// NOTE(review): the condition guarding the handler_ call, the iterator
// advance and any removal of entries from map_ are not visible in this
// listing; handler_ is initialised to NULL by the constructor, so a null
// check is presumably present - confirm.
231 void NonblockingReceiver::CallAllErrorHandlers(KineticStatus error) {
233 handler_->Error(error,
nullptr);
237 auto iter = map_.begin();
238 while (iter != map_.end()) {
// Copy the handler and its key out of the entry before using them.
239 auto handler_pair = iter->second;
240 shared_ptr<HandlerInterface> handler = handler_pair.first;
241 HandlerKey handler_key = handler_pair.second;
// Drop the reverse (handler_key -> sequence) entry; exactly one must exist.
243 CHECK_EQ((
size_t) 1, handler_to_message_seq_map_.erase(handler_key))
244 <<
"Couldn't delete handler to sequence entry for handler_key " << handler_key;
246 handler->Error(error,
nullptr);
// Unregisters the handler that was enqueued under `key`, removing its entry
// from both handler_to_message_seq_map_ (handler_key -> sequence) and map_
// (sequence -> (handler, handler_key)).
//
// key: the handler key supplied to Enqueue().
//
// CHECK-fails if the two tables disagree, i.e. the key maps to a sequence
// with no handler entry, or that entry was stored under a different key.
//
// NOTE(review): the return statements are not visible in this listing -
// presumably false when `key` is unknown and true after removal; confirm.
253 bool NonblockingReceiver::Remove(HandlerKey key) {
254 auto handler_key_to_seq = handler_to_message_seq_map_.find(key);
255 if (handler_key_to_seq == handler_to_message_seq_map_.end()) {
// Read the sequence before erasing: the iterator is invalid afterwards.
259 google::protobuf::int64 seq = handler_key_to_seq->second;
261 handler_to_message_seq_map_.erase(handler_key_to_seq);
263 auto seq_to_handler = map_.find(seq);
// Stream `key` rather than handler_key_to_seq->first: the iterator was
// erased above and must not be dereferenced. `key` is the value that entry
// was found under, so the message is unchanged.
264 CHECK(seq_to_handler != map_.end()) <<
"Handler key " << key
265 <<
" mapped to seq " << seq <<
" but no handler entry for that seq";
// Cross-check that the forward table agrees about which key owns this
// sequence before dropping the entry.
267 auto handler_pair = seq_to_handler->second;
268 auto handler_key = handler_pair.second;
269 CHECK_EQ(handler_key, key);
270 map_.erase(seq_to_handler);
Wrapper class that handles computing HMACs. The supplied implementation uses OpenSSL, but users can supply an alternate implementation that uses a different library (e.g. one providing specialized HW acceleration).
std::string hmac_key
The HMAC key of the user specified in user_id.
Indicates whether a Kinetic operation (get, put, security, etc.) succeeded or failed. Unlike Status it provides details like whether the failure resulted from a version or an HMAC error.
Use this struct to pass all connection options to the KineticConnectionFactory.