Kinetic C/C++ Client
 All Classes Functions Variables Pages
nonblocking_packet.cc
1 /*
2  * kinetic-cpp-client
3  * Copyright (C) 2014 Seagate Technology.
4  *
5  * This program is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU General Public License
7  * as published by the Free Software Foundation; either version 2
8  * of the License, or (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18  *
19  */
20 
21 #include "nonblocking_packet.h"
22 #include <sys/types.h>
23 #include <sys/socket.h>
24 #include <netinet/tcp.h>
25 #include <arpa/inet.h>
26 #include <unistd.h>
27 #include <sys/stat.h>
28 
29 #include "glog/logging.h"
30 
31 namespace kinetic {
32 
33 using std::make_shared;
34 using std::string;
35 
36 NonblockingPacketWriter::NonblockingPacketWriter(shared_ptr<SocketWrapperInterface> socket_wrapper, unique_ptr<const Message> message,
37  const shared_ptr<const string> value)
38  : socket_wrapper_(socket_wrapper), message_(move(message)), value_(value), state_(kMagic),
39  writer_(new NonblockingStringWriter(socket_wrapper_, make_shared<string>("F"))) {}
40 
41 NonblockingPacketWriter::~NonblockingPacketWriter() {
42  if (writer_ != NULL) {
43  delete writer_;
44  }
45 }
46 
47 NonblockingStringStatus NonblockingPacketWriter::Write() {
48  struct stat statbuf;
49  if (fstat(socket_wrapper_->fd(), &statbuf)) {
50  PLOG(ERROR) << "Unable to fstat socket";
51  return kFailed;
52  }
53 #ifndef __APPLE__
54  if (S_ISSOCK(statbuf.st_mode)) {
55  int optval = 1;
56  setsockopt(socket_wrapper_->fd(), IPPROTO_TCP, TCP_CORK, &optval, sizeof(optval));
57  }
58 #endif
59 
60  while (true) {
61  NonblockingStringStatus status = writer_->Write();
62  if (status != kDone) {
63  return status;
64  }
65 
66  // Transition to the next state
67  switch (state_) {
68  case kMagic:
69  if (!TransitionFromMagic()) {
70  return kFailed;
71  }
72  break;
73  case kMessageLength:
74  TransitionFromMessageLength();
75  break;
76  case kValueLength:
77  TransitionFromValueLength();
78  break;
79  case kMessage:
80  TransitionFromMessage();
81  break;
82  case kValue:
83  TransitionFromValue();
84  break;
85  case kFinished:
86  if (fstat(socket_wrapper_->fd(), &statbuf)) {
87  PLOG(ERROR) << "Unable to fstat socket";
88  return kFailed;
89  }
90  if (S_ISSOCK(statbuf.st_mode)) {
91  int optval = 0;
92 #ifndef __APPLE__
93  setsockopt(socket_wrapper_->fd(), IPPROTO_TCP, TCP_CORK, &optval, sizeof(optval));
94 #endif
95  optval = 1;
96  setsockopt(socket_wrapper_->fd(), IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval));
97  }
98  return kDone;
99  default:
100  CHECK(false);
101  }
102  }
103 }
104 
105 bool NonblockingPacketWriter::TransitionFromMagic() {
106  // Move on to the writing the message length
107  if (!message_->SerializeToString(&serialized_message_)) {
108  // Serialization can fail if the message is missing required fields
109  return false;
110  }
111  uint32_t size = htonl(serialized_message_.size());
112  delete writer_;
113  std::string encoded_size(reinterpret_cast<char *>(&size), sizeof(size));
114  writer_ = new NonblockingStringWriter(socket_wrapper_, make_shared<string>(encoded_size));
115  state_ = kMessageLength;
116  return true;
117 }
118 
119 void NonblockingPacketWriter::TransitionFromMessageLength() {
120  // Move on to writing the value length
121  uint32_t size = htonl(value_->size());
122  delete writer_;
123  std::string encoded_size(reinterpret_cast<char *>(&size), sizeof(size));
124  writer_ = new NonblockingStringWriter(socket_wrapper_, make_shared<string>(encoded_size));
125  state_ = kValueLength;
126 }
127 
128 void NonblockingPacketWriter::TransitionFromValueLength() {
129  // Move on to writing the serialized message
130  delete writer_;
131  writer_ = new NonblockingStringWriter(socket_wrapper_, make_shared<string>(serialized_message_));
132  state_ = kMessage;
133 }
134 
135 void NonblockingPacketWriter::TransitionFromMessage() {
136  // Move on to writing the value
137  delete writer_;
138  writer_ = new NonblockingStringWriter(socket_wrapper_, value_);
139  state_ = kValue;
140 }
141 
142 void NonblockingPacketWriter::TransitionFromValue() {
143  // We're done!
144  state_ = kFinished;
145 }
146 
147 NonblockingPacketReader::NonblockingPacketReader(shared_ptr<SocketWrapperInterface> socket_wrapper, Message* response,
148  unique_ptr<const string> &value)
149  : socket_wrapper_(socket_wrapper), response_(response), state_(kMagic), value_(value), magic_(),
150  reader_(new NonblockingStringReader(socket_wrapper_, 1, magic_)) {
151 }
152 
153 NonblockingPacketReader::~NonblockingPacketReader() {
154  if (reader_ != NULL) {
155  delete reader_;
156  }
157 }
158 
159 NonblockingStringStatus NonblockingPacketReader::Read() {
160  while (true) {
161  NonblockingStringStatus status = reader_->Read();
162  if (status != kDone) {
163  return status;
164  }
165 
166  // Transition to the next state
167  switch (state_) {
168  case kMagic:
169  if (!TransitionFromMagic()) {
170  return kFailed;
171  }
172  break;
173  case kMessageLength:
174  TransitionFromMessageLength();
175  break;
176  case kValueLength:
177  TransitionFromValueLength();
178  break;
179  case kMessage:
180  TransitionFromMessage();
181  break;
182  case kValue:
183  if (!TransitionFromValue()) {
184  return kFailed;
185  }
186  break;
187  case kFinished:
188  return kDone;
189  break;
190  default:
191  CHECK(false);
192  }
193  }
194 }
195 
196 bool NonblockingPacketReader::TransitionFromMagic() {
197  // Check the magic byte and move on to reading the message length
198  if (*magic_ != "F") {
199  return false;
200  }
201  delete reader_;
202  reader_ = new NonblockingStringReader(socket_wrapper_, 4, message_length_);
203  state_ = kMessageLength;
204  return true;
205 }
206 
207 void NonblockingPacketReader::TransitionFromMessageLength() {
208  // Move on to reading the value length
209  delete reader_;
210  reader_ = new NonblockingStringReader(socket_wrapper_, 4, value_length_);
211  state_ = kValueLength;
212 }
213 
214 void NonblockingPacketReader::TransitionFromValueLength() {
215  // Move on to reading the message
216  delete reader_;
217  CHECK_EQ(4u, message_length_->size());
218  uint32_t length = ntohl(*reinterpret_cast<const uint32_t *>(message_length_->data()));
219  reader_ = new NonblockingStringReader(socket_wrapper_, length, message_);
220  state_ = kMessage;
221 }
222 
223 void NonblockingPacketReader::TransitionFromMessage() {
224  // Move on to reading the value
225  delete reader_;
226  CHECK_EQ(4u, value_length_->size());
227  uint32_t length = ntohl(*reinterpret_cast<const uint32_t *>(value_length_->data()));
228  reader_ = new NonblockingStringReader(socket_wrapper_, length, value_);
229  state_ = kValue;
230 }
231 
232 bool NonblockingPacketReader::TransitionFromValue() {
233  // We're done!
234  state_ = kFinished;
235  return response_->ParseFromString(*message_);
236 }
237 
238 } // namespace kinetic