Kinetic C/C++ Client
 All Classes Functions Variables Pages
nonblocking_string.cc
1 /*
2  * kinetic-cpp-client
3  * Copyright (C) 2014 Seagate Technology.
4  *
5  * This program is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU General Public License
7  * as published by the Free Software Foundation; either version 2
8  * of the License, or (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18  *
19  */
20 
21 #include "nonblocking_string.h"
22 
23 #include <errno.h>
24 #include <unistd.h>
25 #include <sys/types.h>
26 #include <sys/socket.h>
27 #include <sys/stat.h>
28 
29 #include "glog/logging.h"
30 
31 namespace kinetic {
32 
33 using std::string;
34 using std::unique_ptr;
35 using std::move;
36 
37 NonblockingStringReader::NonblockingStringReader(shared_ptr<SocketWrapperInterface> socket_wrapper, size_t size, unique_ptr<const string> &s)
38  : socket_wrapper_(socket_wrapper), size_(size), s_(s), buf_(new char[size]), bytes_read_(0) {}
39 
40 NonblockingStringReader::~NonblockingStringReader() {
41  delete[] buf_;
42 }
43 
44 NonblockingStringStatus NonblockingStringReader::Read() {
45  while (bytes_read_ < size_) {
46  int status = 0;
47  if(socket_wrapper_->getSSL()){
48  status = SSL_read(socket_wrapper_->getSSL(), buf_ + bytes_read_, size_ - bytes_read_);
49  }
50  else status = read(socket_wrapper_->fd(), buf_ + bytes_read_, size_ - bytes_read_);
51  if (status == 0) {
52  // Unexpected EOF
53  return kFailed;
54  }
55  if (status < 0) {
56  if (errno == EINTR) {
57  continue;
58  }
59  if (errno == EAGAIN || errno == EWOULDBLOCK) {
60  return kInProgress;
61  }
62  // Encountered an irrecoverable error
63  return kFailed;
64  }
65  bytes_read_ += status;
66  }
67 
68  CHECK_EQ(bytes_read_, size_);
69  // it'd be nice if we could use make_unique
70  unique_ptr<const string> p(new string(buf_, size_));
71  s_ = move(p);
72  return kDone;
73 }
74 
75 NonblockingStringWriter::NonblockingStringWriter(shared_ptr<SocketWrapperInterface> socket_wrapper, const shared_ptr<const string> s)
76  : socket_wrapper_(socket_wrapper), s_(s), bytes_written_(0) {}
77 
78 NonblockingStringStatus NonblockingStringWriter::Write() {
79  while (bytes_written_ < s_->size()) {
80  int flags = 0;
81  // Prevent sending SIGPIPE signal on Linux. SIGPIPE is undesirable because the library
82  // client will crash if the remote server closes the connection.
83  #ifdef MSG_NOSIGNAL
84  flags |= MSG_NOSIGNAL;
85  #endif
86 
87  // We need to use send() for sockets but tests use pipes so we can't assume sockets.
88  // To deal with this annoyance detect whether the FD is a socket and use the write
89  // send/write
90  struct stat statbuf;
91  if (fstat(socket_wrapper_->fd(), &statbuf)) {
92  PLOG(ERROR) << "Unable to fstat socket";
93  return kFailed;
94  }
95  int status;
96  if (S_ISSOCK(statbuf.st_mode)) {
97  if(socket_wrapper_->getSSL())
98  status = SSL_write(socket_wrapper_->getSSL(), s_->data() + bytes_written_, s_->size() - bytes_written_);
99  else
100  status = send(
101  socket_wrapper_->fd(),
102  s_->data() + bytes_written_,
103  s_->size() - bytes_written_,
104  flags);
105  } else {
106  if(socket_wrapper_->getSSL())
107  status = SSL_write(socket_wrapper_->getSSL(), s_->data() + bytes_written_, s_->size() - bytes_written_);
108  else
109  status = write(socket_wrapper_->fd(), s_->data() + bytes_written_, s_->size() - bytes_written_);
110  }
111  if (status == 0) {
112  return kFailed;
113  }
114  if (status < 0) {
115  if (errno == EINTR) {
116  continue;
117  }
118  if (errno == EAGAIN || errno == EWOULDBLOCK) {
119  return kInProgress;
120  }
121  return kFailed;
122  }
123  bytes_written_ += status;
124  }
125 
126  CHECK_EQ(bytes_written_, s_->size());
127  return kDone;
128 }
129 
130 } // namespace kinetic