21 #include "nonblocking_packet.h"
22 #include <sys/types.h>
23 #include <sys/socket.h>
24 #include <netinet/tcp.h>
25 #include <arpa/inet.h>
29 #include "glog/logging.h"
33 using std::make_shared;
// Builds a packet writer for `message` plus the payload `value` on `socket_wrapper`.
// The writer starts in state kMagic, and the first sub-writer sends the one-character
// string "F".
// NOTE(review): writer_'s initializer reads the member socket_wrapper_, so it relies on
// socket_wrapper_ being declared before writer_ in the class — confirm in the header.
36 NonblockingPacketWriter::NonblockingPacketWriter(shared_ptr<SocketWrapperInterface> socket_wrapper, unique_ptr<const Message> message,
37 const shared_ptr<const string> value)
38 : socket_wrapper_(socket_wrapper), message_(move(message)), value_(value), state_(kMagic),
39 writer_(new NonblockingStringWriter(socket_wrapper_, make_shared<string>(
"F"))) {}
// Destructor. Only the null check on writer_ is visible in this excerpt.
// NOTE(review): presumably the guarded body releases writer_ — confirm against the full file.
41 NonblockingPacketWriter::~NonblockingPacketWriter() {
42 if (writer_ != NULL) {
// Drives the current sub-writer (writer_->Write()) and calls the TransitionFrom* helpers.
// NOTE(review): this excerpt omits the declarations of statbuf/optval, the dispatch that
// selects among the transition helpers, and every return statement; the comments below
// describe only what the visible lines show.
47 NonblockingStringStatus NonblockingPacketWriter::Write() {
// Inspect the descriptor first; a non-zero fstat result is logged through PLOG(ERROR).
49 if (fstat(socket_wrapper_->fd(), &statbuf)) {
50 PLOG(ERROR) <<
"Unable to fstat socket";
// TCP options are only touched when the descriptor is a socket.
54 if (S_ISSOCK(statbuf.st_mode)) {
// Sets TCP_CORK from optval; the call's result is discarded.
// NOTE(review): optval is assigned on a line not shown — presumably this enables corking; confirm.
56 setsockopt(socket_wrapper_->fd(), IPPROTO_TCP, TCP_CORK, &optval,
sizeof(optval));
// Let the current sub-writer make progress; anything other than kDone takes the branch below
// (its body is not visible here).
61 NonblockingStringStatus status = writer_->Write();
62 if (status != kDone) {
// Transition helpers; only the transition out of the magic state reports failure via its
// bool result.
69 if (!TransitionFromMagic()) {
74 TransitionFromMessageLength();
77 TransitionFromValueLength();
80 TransitionFromMessage();
83 TransitionFromValue();
// Second descriptor check, same logging as above.
86 if (fstat(socket_wrapper_->fd(), &statbuf)) {
87 PLOG(ERROR) <<
"Unable to fstat socket";
90 if (S_ISSOCK(statbuf.st_mode)) {
// Sets TCP_CORK and then TCP_NODELAY from optval; both results are discarded.
// NOTE(review): optval's values are not visible — presumably this uncorks and then
// disables Nagle so the packet is flushed; confirm.
93 setsockopt(socket_wrapper_->fd(), IPPROTO_TCP, TCP_CORK, &optval,
sizeof(optval));
96 setsockopt(socket_wrapper_->fd(), IPPROTO_TCP, TCP_NODELAY, &optval,
sizeof(optval));
// Leaves the magic state: serializes message_ and queues its length for writing.
// Returns bool; the failure branch body and the final return are not visible in this excerpt.
105 bool NonblockingPacketWriter::TransitionFromMagic() {
// Serialize once into serialized_message_; the same buffer is sent later by
// TransitionFromValueLength.
107 if (!message_->SerializeToString(&serialized_message_)) {
// Length prefix is a uint32_t converted with htonl (network byte order).
// NOTE(review): size() is narrowed to uint32_t here; no size guard is visible in this excerpt.
111 uint32_t size = htonl(serialized_message_.size());
// Copy the raw sizeof(size) bytes of the converted length into a string for the sub-writer.
113 std::string encoded_size(reinterpret_cast<char *>(&size),
sizeof(size));
// NOTE(review): writer_ is reassigned with a raw new; the release of the previous writer_
// is not visible here — confirm it is deleted on an omitted line.
114 writer_ =
new NonblockingStringWriter(socket_wrapper_, make_shared<string>(encoded_size));
115 state_ = kMessageLength;
// Leaves the message-length state: queues the length of value_ for writing.
119 void NonblockingPacketWriter::TransitionFromMessageLength() {
// Same encoding as the message length: uint32_t converted with htonl.
// NOTE(review): value_->size() is narrowed to uint32_t; no size guard is visible in this excerpt.
121 uint32_t size = htonl(value_->size());
123 std::string encoded_size(reinterpret_cast<char *>(&size),
sizeof(size));
// NOTE(review): the release of the previous writer_ is not visible here — confirm.
124 writer_ =
new NonblockingStringWriter(socket_wrapper_, make_shared<string>(encoded_size));
125 state_ = kValueLength;
// Leaves the value-length state: queues the serialized message for writing.
// The state_ update and closing lines are not visible in this excerpt.
128 void NonblockingPacketWriter::TransitionFromValueLength() {
// make_shared<string>(serialized_message_) hands the sub-writer its own copy of the buffer.
131 writer_ =
new NonblockingStringWriter(socket_wrapper_, make_shared<string>(serialized_message_));
// Leaves the message state: queues the payload value_ for writing.
// The state_ update and closing lines are not visible in this excerpt.
135 void NonblockingPacketWriter::TransitionFromMessage() {
// value_ is passed as the existing shared_ptr, so the payload is not copied here.
138 writer_ =
new NonblockingStringWriter(socket_wrapper_, value_);
// Called by Write() after the other transition helpers; only the signature is visible
// in this excerpt.
142 void NonblockingPacketWriter::TransitionFromValue() {
// Builds a packet reader that parses into `response` and delivers the payload through `value`.
// The reader starts in state kMagic, and the first sub-reader is constructed with the
// arguments (socket_wrapper_, 1, magic_).
// NOTE(review): the second argument is presumably a byte count (one magic byte) — confirm
// against NonblockingStringReader.
// NOTE(review): reader_'s initializer reads the members socket_wrapper_ and magic_, so it
// relies on both being declared before reader_ in the class — confirm in the header.
147 NonblockingPacketReader::NonblockingPacketReader(shared_ptr<SocketWrapperInterface> socket_wrapper, Message* response,
148 unique_ptr<const string> &value)
149 : socket_wrapper_(socket_wrapper), response_(response), state_(kMagic), value_(value), magic_(),
150 reader_(new NonblockingStringReader(socket_wrapper_, 1, magic_)) {
// Destructor. Only the null check on reader_ is visible in this excerpt.
// NOTE(review): presumably the guarded body releases reader_ — confirm against the full file.
153 NonblockingPacketReader::~NonblockingPacketReader() {
154 if (reader_ != NULL) {
// Drives the current sub-reader (reader_->Read()) and calls the TransitionFrom* helpers.
// NOTE(review): the dispatch that selects among the helpers and all return statements are
// omitted from this excerpt.
159 NonblockingStringStatus NonblockingPacketReader::Read() {
// Anything other than kDone takes the branch below (its body is not visible here).
161 NonblockingStringStatus status = reader_->Read();
162 if (status != kDone) {
// The transitions out of the magic and value states report failure via their bool result;
// the others return void.
169 if (!TransitionFromMagic()) {
174 TransitionFromMessageLength();
177 TransitionFromValueLength();
180 TransitionFromMessage();
183 if (!TransitionFromValue()) {
// Leaves the magic state: validates the received magic and queues the message-length read.
// Returns bool; the mismatch branch body and the final return are not visible in this excerpt.
196 bool NonblockingPacketReader::TransitionFromMagic() {
// The received magic must equal "F", the same string the packet writer sends first.
198 if (*magic_ !=
"F") {
// Next sub-reader is constructed with 4 and message_length_.
// NOTE(review): the release of the previous reader_ is not visible here — confirm.
202 reader_ =
new NonblockingStringReader(socket_wrapper_, 4, message_length_);
203 state_ = kMessageLength;
// Leaves the message-length state: queues the value-length read.
207 void NonblockingPacketReader::TransitionFromMessageLength() {
// Next sub-reader is constructed with 4 and value_length_.
// NOTE(review): the release of the previous reader_ is not visible here — confirm.
210 reader_ =
new NonblockingStringReader(socket_wrapper_, 4, value_length_);
211 state_ = kValueLength;
// Leaves the value-length state: decodes the message length and queues the message read.
// The state_ update and closing lines are not visible in this excerpt.
214 void NonblockingPacketReader::TransitionFromValueLength() {
// CHECK_EQ guards the 4-byte dereference below.
217 CHECK_EQ(4u, message_length_->size());
// Decode the length with ntohl (network to host byte order).
// NOTE(review): this reads a uint32_t through a cast of string data, which assumes suitable
// alignment; a memcpy into a local uint32_t would avoid that assumption.
// NOTE(review): length comes from the peer and no upper bound is visible in this excerpt.
218 uint32_t length = ntohl(*reinterpret_cast<const uint32_t *>(message_length_->data()));
219 reader_ =
new NonblockingStringReader(socket_wrapper_, length, message_);
// Leaves the message state: decodes the value length and queues the payload read into value_.
// The state_ update and closing lines are not visible in this excerpt.
223 void NonblockingPacketReader::TransitionFromMessage() {
// CHECK_EQ guards the 4-byte dereference below.
226 CHECK_EQ(4u, value_length_->size());
// Decode the length with ntohl (network to host byte order).
// NOTE(review): same alignment assumption as the message-length decode; length comes from the
// peer and no upper bound is visible in this excerpt.
227 uint32_t length = ntohl(*reinterpret_cast<const uint32_t *>(value_length_->data()));
228 reader_ =
new NonblockingStringReader(socket_wrapper_, length, value_);
// Leaves the value state: parses the received message bytes into response_.
// Lines between the signature and the return are not visible in this excerpt.
232 bool NonblockingPacketReader::TransitionFromValue() {
// The result of ParseFromString is returned to the caller unchanged.
235 return response_->ParseFromString(*message_);