kinetic-c  v0.12.0
Seagate Kinetic Protocol Client Library for C
send_helper.c
Go to the documentation of this file.
1 /*
2 * kinetic-c
3 * Copyright (C) 2015 Seagate Technology.
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 *
19 */
20 #include "send_helper.h"
21 #include "send_internal.h"
22 
23 #include "listener.h"
24 #include "send.h"
25 #include "syscall.h"
26 #include "util.h"
27 
28 #include <assert.h>
29 
30 static ssize_t write_plain(struct bus *b, boxed_msg *box);
31 static ssize_t write_ssl(struct bus *b, boxed_msg *box, SSL *ssl);
33 
/* When TEST is defined, `done` and `backpressure` are declared here at file
 * scope. In non-TEST builds the same names are instead declared locally at
 * their point of use, under matching `#ifndef TEST` guards further down.
 * Presumably this lets unit tests inspect or preset the values -- TODO
 * confirm against the test suite. */
34 #ifdef TEST
35 struct timeval done;
36 uint16_t backpressure = 0;
37 #endif
38 
40  SSL *ssl = box->ssl;
41  ssize_t wrsz = 0;
42 
43  /* Attempt a single write to the socket. */
44  if (ssl == BUS_NO_SSL) {
45  wrsz = write_plain(b, box);
46  } else {
47  assert(ssl);
48  wrsz = write_ssl(b, box, ssl);
49  }
50  BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 64,
51  "wrote %zd", wrsz);
52 
53  if (wrsz == -1) {
55  return SHHW_ERROR;
56  } else if (wrsz == 0) {
57  /* If the OS set POLLOUT but we can't actually write, then
58  * just go back to the poll() loop with no progress.
59  * If we busywait on this, something is deeply wrong. */
60  BUS_LOG_SNPRINTF(b, 1, LOG_SENDER, b->udata, 128,
61  "suspicious: wrote %zd bytes to <fd:%d, seq_id%lld>",
62  wrsz, box->fd, (long long)box->out_seq_id);
63  } else {
64  /* Update amount written so far */
65  box->out_sent_size += wrsz;
66  }
67 
68  size_t msg_size = box->out_msg_size;
69  size_t sent_size = box->out_sent_size;
70  size_t rem = msg_size - sent_size;
71 
72  BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 64,
73  "wrote %zd, rem is %zd", wrsz, rem);
74 
75  if (rem == 0) { /* check if whole message is written */
76  #ifndef TEST
77  struct timeval done;
78  #endif
79  if (Util_Timestamp(&done, true)) {
80  box->tv_send_done = done;
81  } else {
83  return SHHW_ERROR;
84  }
85 
87  return SHHW_DONE;
88  } else {
90  return SHHW_ERROR;
91  }
92  } else {
93  return SHHW_OK;
94  }
95 }
96 
97 static ssize_t write_plain(struct bus *b, boxed_msg *box) {
98  int fd = box->fd;
99  uint8_t *msg = box->out_msg;
100  size_t msg_size = box->out_msg_size;
101  size_t sent_size = box->out_sent_size;
102  size_t rem = msg_size - sent_size;
103 
104  BUS_LOG_SNPRINTF(b, 10, LOG_SENDER, b->udata, 64,
105  "write %p to %d, %zd bytes",
106  (void*)&msg[sent_size], fd, rem);
107 
108  /* Attempt a single write. ('for' is due to continue-based retry.) */
109  for (;;) {
110  ssize_t wrsz = syscall_write(fd, &msg[sent_size], rem);
111  if (wrsz == -1) {
112  if (Util_IsResumableIOError(errno)) {
113  errno = 0;
114  continue;
115  } else {
116  /* will notify about closed socket upstream */
117  BUS_LOG_SNPRINTF(b, 1, LOG_SENDER, b->udata, 64,
118  "write: socket error writing, %s", strerror(errno));
119  errno = 0;
120  return -1;
121  }
122  } else if (wrsz > 0) {
123  BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 64,
124  "sent: %zd", wrsz);
125  return wrsz;
126  } else {
127  return 0;
128  }
129  }
130 }
131 
132 static ssize_t write_ssl(struct bus *b, boxed_msg *box, SSL *ssl) {
133  uint8_t *msg = box->out_msg;
134  size_t msg_size = box->out_msg_size;
135  ssize_t rem = msg_size - box->out_sent_size;
136  int fd = box->fd;
137  (void)fd;
138  ssize_t written = 0;
139  assert(rem >= 0);
140 
141  while (rem > 0) {
142  ssize_t wrsz = syscall_SSL_write(ssl, &msg[box->out_sent_size], rem);
143  BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 64,
144  "SSL_write: socket %d, write %zd => wrsz %zd",
145  fd, rem, wrsz);
146  if (wrsz > 0) {
147  written += wrsz;
148  return written;
149  } else if (wrsz < 0) {
150  int reason = syscall_SSL_get_error(ssl, wrsz);
151  switch (reason) {
152  case SSL_ERROR_WANT_WRITE:
153  /* We need to write more, but currently cannot write.
154  * Log and go back to the poll() loop. */
155  BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64,
156  "SSL_write: socket %d: WANT_WRITE", fd);
157  return 0;
158 
159  case SSL_ERROR_WANT_READ:
160  BUS_LOG_SNPRINTF(b, 0, LOG_SENDER, b->udata, 64,
161  "SSL_write: socket %d: WANT_READ", fd);
162  assert(false); // shouldn't get this; we're writing.
163  break;
164 
165  case SSL_ERROR_SYSCALL:
166  {
167  if (Util_IsResumableIOError(errno)) {
168  errno = 0;
169  /* don't break; we want to retry on EINTR etc. until
170  * we get WANT_WRITE, otherwise poll(2) may not retry
171  * the socket for too long */
172  continue;
173  } else {
174  /* will notify about socket error upstream */
175  BUS_LOG_SNPRINTF(b, 1, LOG_SENDER, b->udata, 64,
176  "SSL_write on fd %d: SSL_ERROR_SYSCALL -- %s",
177  fd, strerror(errno));
178  errno = 0;
179  return -1;
180  }
181  }
182  case SSL_ERROR_SSL:
183  BUS_LOG_SNPRINTF(b, 1, LOG_SENDER, b->udata, 64,
184  "SSL_write: socket %d: error SSL (wrsz %zd)",
185  box->fd, wrsz);
186 
187  /* Log error messages from OpenSSL */
188  for (;;) {
189  unsigned long e = ERR_get_error();
190  if (e == 0) { break; } // no more errors
191  BUS_LOG_SNPRINTF(b, 1, LOG_SENDER, b->udata, 128,
192  "SSL_write error: %s",
193  ERR_error_string(e, NULL));
194  }
195  return -1;
196  default:
197  {
198  BUS_LOG_SNPRINTF(b, 1, LOG_SENDER, b->udata, 64,
199  "SSL_write on socket %d: match fail: error %d", box->fd, reason);
200  return -1;
201  }
202  }
203  } else {
204  /* SSL_write should give SSL_ERROR_WANT_WRITE when unable
205  * to write further. */
206  }
207  }
208  BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64,
209  "SSL_write: leaving loop, %zd bytes written", written);
210  return written;
211 }
212 
213 /* Notify the listener that the client has finished sending a message, and
214  * transfer all details for handling the response to it. Blocking. */
216  /* Notify listener that it should expect a response to a
217  * successfully sent message. */
218  BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 128,
219  "telling listener to EXPECT sent response, with box %p, seq_id %lld",
220  (void *)box, (long long)box->out_seq_id);
221 
222  if (box->result.status == BUS_SEND_UNDEFINED) {
224  }
225 
226  struct listener *l = Bus_GetListenerForSocket(b, box->fd);
227 
228  for (int retries = 0; retries < SEND_NOTIFY_LISTENER_RETRIES; retries++) {
229  #ifndef TEST
230  uint16_t backpressure = 0;
231  #endif
232  /* If this succeeds, then this thread cannot touch the box anymore. */
233  if (Listener_ExpectResponse(l, box, &backpressure)) {
234  Bus_BackpressureDelay(b, backpressure,
236  return true;
237  } else {
238  BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 64,
239  "enqueue_request_sent: failed delivery %d", retries);
241  }
242  }
243 
244  /* Timeout, will be treated as a TX error */
245  return false;
246 }
int syscall_SSL_get_error(const SSL *ssl, int ret)
Definition: syscall.c:52
bool Util_IsResumableIOError(int errno_)
Definition: util.c:26
bus_msg_result_t result
Result message, constructed in place after the request/response cycle has completed or failed due to ...
Receiver of responses.
SendHelper_HandleWrite_res
Definition: send_helper.h:26
#define LISTENER_EXPECT_BACKPRESSURE_SHIFT
How many bits to >> the backpressure value from the listener when a send has completed.
Definition: listener.h:32
size_t out_sent_size
Message bus.
void * udata
User data for callbacks.
uint8_t * out_msg
int64_t out_seq_id
bus_send_status_t status
Definition: bus_types.h:216
#define SEND_NOTIFY_LISTENER_RETRY_DELAY
Definition: send_internal.h:26
SSL * ssl
valid pointer or BUS_NO_SSL
void Bus_BackpressureDelay(struct bus *b, size_t backpressure, uint8_t shift)
Provide backpressure by sleeping for (backpressure >> shift) msec, if the value is greater than 0...
Definition: bus.c:551
#define BUS_LOG_SNPRINTF(B, LEVEL, EVENT_KEY, UDATA, MAX_SZ, FMT,...)
Definition: bus_types.h:59
#define SEND_NOTIFY_LISTENER_RETRIES
Definition: send_internal.h:25
static bool enqueue_EXPECT_message_to_listener(bus *b, boxed_msg *box)
Definition: send_helper.c:215
static ssize_t write_plain(struct bus *b, boxed_msg *box)
Definition: send_helper.c:97
struct timeval tv_send_done
#define BUS_NO_SSL
Special "NO SSL" value, to distinguish from a NULL SSL handle.
size_t out_msg_size
static ssize_t write_ssl(struct bus *b, boxed_msg *box, SSL *ssl)
Definition: send_helper.c:132
SendHelper_HandleWrite_res SendHelper_HandleWrite(bus *b, boxed_msg *box)
Definition: send_helper.c:39
void Send_HandleFailure(struct bus *b, boxed_msg *box, bus_send_status_t status)
Definition: send.c:208
int syscall_SSL_write(SSL *ssl, const void *buf, int num)
Wrappers for OpenSSL calls.
Definition: syscall.c:44
bool Listener_ExpectResponse(struct listener *l, boxed_msg *box, uint16_t *backpressure)
The client has finished a write, the listener should expect a response.
Definition: listener.c:143
ssize_t syscall_write(int fildes, const void *buf, size_t nbyte)
Definition: syscall.c:35
bool Util_Timestamp(struct timeval *tv, bool relative)
Definition: util.c:30
struct listener * Bus_GetListenerForSocket(struct bus *b, int fd)
For a given file descriptor, get the listener ID to use.
Definition: bus.c:330
int fd
Destination filename and message body.
int syscall_poll(struct pollfd fds[], nfds_t nfds, int timeout)
Wrappers for syscalls, to allow mocking for testing.
Definition: syscall.c:27