kinetic-c  v0.12.0
Seagate Kinetic Protocol Client Library for C
listener_task.c
Go to the documentation of this file.
1 /*
2 * kinetic-c
3 * Copyright (C) 2015 Seagate Technology.
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 *
19 */
20 #include "listener_task.h"
21 #include "listener_task_internal.h"
22 #include "util.h"
23 #include "syscall.h"
24 
25 #include <assert.h>
26 #include "listener_cmd.h"
27 #include "listener_io.h"
28 #include "atomic.h"
29 
30 #ifdef TEST
31 struct timeval now;
32 struct timeval cur;
33 size_t backpressure = 0;
34 int poll_res = 0;
35 #define WHILE if
36 #else
37 #define WHILE while
38 #endif
39 
40 static void tick_handler(listener *l);
41 static void clean_up_completed_info(listener *l, rx_info_t *info);
42 static void retry_delivery(listener *l, rx_info_t *info);
43 static void observe_backpressure(listener *l, size_t backpressure);
44 
45 void *ListenerTask_MainLoop(void *arg) {
46  listener *self = (listener *)arg;
47  assert(self);
48  struct bus *b = self->bus;
49 
50  #ifndef TEST
51  struct timeval now;
52  #endif
53  time_t last_sec = (time_t)-1; // always trigger first time
54 
55  /* The listener thread has full control over its execution -- the
56  * only thing other threads can do is reserve messages from l->msgs,
57  * write commands into them, and then commit them by writing their
58  * msg->id into the incoming command ID pipe. All cross-thread
59  * communication is managed at the command interface, so it doesn't
60  * need any internal locking. */
61 
62  WHILE (self->shutdown_notify_fd == LISTENER_NO_FD) {
63  if (!Util_Timestamp(&now, true)) {
64  BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 64,
65  "timestamp failure: %d", errno);
66  }
67  time_t cur_sec = now.tv_sec;
68  if (cur_sec != last_sec) {
69  tick_handler(self);
70  last_sec = cur_sec;
71  }
72 
73  int delay = (self->is_idle ? INFINITE_DELAY : LISTENER_TASK_TIMEOUT_DELAY);
74  #ifndef TEST
75  int poll_res = 0;
76  #endif
77 
78  int to_poll = self->tracked_fds - self->inactive_fds + INCOMING_MSG_PIPE;
79 
80  poll_res = syscall_poll(self->fds, to_poll, delay);
81  BUS_LOG_SNPRINTF(b, (poll_res == 0 ? 6 : 4), LOG_LISTENER, b->udata, 64,
82  "poll res %d", poll_res);
83 
84  if (poll_res < 0) {
85  if (Util_IsResumableIOError(errno)) {
86  errno = 0;
87  } else {
88  /* unrecoverable poll error -- FD count is bad
89  * or FDS is a bad pointer. */
90  BUS_ASSERT(b, b->udata, false);
91  }
92  } else if (poll_res > 0) {
93  ListenerCmd_CheckIncomingMessages(self, &poll_res);
94  if (poll_res > 0) {
95  ListenerIO_AttemptRecv(self, poll_res);
96  }
97  } else {
98  /* nothing to do */
99  }
100  }
101 
102  /* (This will always be true, except when testing.) */
103  if (self->shutdown_notify_fd != LISTENER_NO_FD) {
104  BUS_LOG(b, 3, LOG_LISTENER, "shutting down", b->udata);
105 
106  if (self->tracked_fds > 0) {
107  BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 64,
108  "%d connections still open!", self->tracked_fds);
109  }
110 
111  ListenerCmd_NotifyCaller(self, self->shutdown_notify_fd);
112  self->shutdown_notify_fd = LISTENER_SHUTDOWN_COMPLETE_FD;
113  }
114 
115  BUS_LOG(b, 3, LOG_LISTENER, "shutting down...", b->udata);
116  return NULL;
117 }
118 
/* Periodic housekeeping for the listener, called from ListenerTask_MainLoop
 * whenever the timestamp's seconds value changes.
 *
 * Logs table usage (and, when b->log_level > 5, prints the message types and
 * dumps the RX info table), then walks rx_info[0..rx_info_max_used]:
 *  - RIS_HOLD:   logs a timeout once timeout_sec has reached 1, otherwise
 *                decrements timeout_sec.
 *  - RIS_EXPECT: retries delivery, cleans up a completed event, logs an RX
 *                failure or a timeout, or decrements timeout_sec.
 * If no HOLD/EXPECT entry was seen, the listener is marked idle.
 *
 * NOTE(review): the embedded source numbering skips lines 126, 162, 184 and
 * 204, so the remaining log arguments and whatever follows the timeout and
 * failure log messages are not visible in this listing. */
119 static void tick_handler(listener *l) {
120  struct bus *b = l->bus;
121  bool any_work = false;
122 
123  BUS_LOG_SNPRINTF(b, 2, LOG_LISTENER, b->udata, 128,
124  "tick... %p: %d of %d msgs in use, %d of %d rx_info in use, %d tracked_fds",
125  (void*)l, l->msgs_in_use, MAX_QUEUE_MESSAGES,
127 
128  if (b->log_level > 5) { /* dump msg table types */
129  for (int i = 0; i < l->msgs_in_use; i++) {
130  printf(" -- msg %d: type %d\n", i, l->msgs[i].type);
131  }
132  }
133 
134  if (b->log_level > 5 || 0) { ListenerTask_DumpRXInfoTable(l); }
135 
136  for (int i = 0; i <= l->rx_info_max_used; i++) {
137  rx_info_t *info = &l->rx_info[i];
138 
139  switch (info->state) {
140  case RIS_INACTIVE:
141  break;
142  case RIS_HOLD:
143  any_work = true;
144  /* Check timeout */
145  if (info->timeout_sec == 1) {
146  #ifndef TEST
147  struct timeval cur;
148  #endif
149  if (!Util_Timestamp(&cur, false)) {
150  BUS_LOG(b, 0, LOG_LISTENER,
151  "gettimeofday failure in tick_handler!", b->udata);
/* Skip to the next slot; this entry's countdown is left untouched. */
152  continue;
153  }
154 
155  /* never got a response, but we don't have the callback
156  * either -- the client will notify about the timeout. */
157  BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 128,
158  "timing out hold info %p -- <fd:%d, seq_id:%lld> at (%ld.%ld)",
159  (void*)info, info->u.hold.fd, (long long)info->u.hold.seq_id,
160  (long)cur.tv_sec, (long)cur.tv_usec);
161 
163  } else {
164  BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64,
165  "decrementing countdown on info %p [%u]: %ld",
166  (void*)info, info->id, (long)info->timeout_sec - 1);
167  info->timeout_sec--;
168  }
169  break;
170  case RIS_EXPECT:
171  any_work = true;
/* The error field is checked before the timeout countdown. */
172  if (info->u.expect.error == RX_ERROR_READY_FOR_DELIVERY) {
173  BUS_LOG(b, 4, LOG_LISTENER,
174  "retrying RX event delivery", b->udata);
175  retry_delivery(l, info);
176  } else if (info->u.expect.error == RX_ERROR_DONE) {
177  BUS_LOG_SNPRINTF(b, 4, LOG_LISTENER, b->udata, 64,
178  "cleaning up completed RX event at info %p", (void*)info);
179  clean_up_completed_info(l, info);
180  } else if (info->u.expect.error != RX_ERROR_NONE) {
181  BUS_LOG_SNPRINTF(b, 1, LOG_LISTENER, b->udata, 64,
182  "notifying of rx failure -- error %d (info %p)",
183  info->u.expect.error, (void*)info);
185  } else if (info->timeout_sec == 1) {
186  #ifndef TEST
187  struct timeval cur;
188  #endif
189  if (!Util_Timestamp(&cur, false)) {
190  BUS_LOG(b, 0, LOG_LISTENER,
191  "gettimeofday failure in tick_handler!", b->udata);
192  continue;
193  }
194  struct boxed_msg *box = info->u.expect.box;
195  BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 256 + 64,
196  "notifying of rx failure -- timeout (info %p) -- "
197  "<fd:%d, seq_id:%lld>, from time (queued:%ld.%ld) to (sent:%ld.%ld) to (now:%ld.%ld)",
198  (void*)info, box->fd, (long long)box->out_seq_id,
199  (long)box->tv_send_start.tv_sec, (long)box->tv_send_start.tv_usec,
200  (long)box->tv_send_done.tv_sec, (long)box->tv_send_done.tv_usec,
201  (long)cur.tv_sec, (long)cur.tv_usec);
/* NOTE(review): presumably silences an unused-variable warning when the
 * log macro expands to nothing -- confirm against bus_types.h. */
202  (void)box;
203 
205  } else {
206  BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64,
207  "decrementing countdown on info %p [%u]: %ld",
208  (void*)info, info->id, (long)info->timeout_sec - 1);
209  info->timeout_sec--;
210  }
211  break;
212  default:
213  BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 64,
214  "match fail %d on line %d", info->state, __LINE__);
215  BUS_ASSERT(b, b->udata, false);
216  }
217  }
/* Nothing was in HOLD or EXPECT state: flag the listener as idle, which makes
 * the main loop poll with INFINITE_DELAY. */
218  if (!any_work) { l->is_idle = true; }
219 }
220 
/* Body of the RX info table dump: prints one line per rx_info slot in
 * [0, rx_info_max_used] with its state, id and remaining timeout, followed by
 * state-specific details (HOLD: fd/seq_id/has_result; EXPECT: box, error and
 * has_result; INACTIVE: id of the next freelist entry, or -1).
 *
 * NOTE(review): the signature line (embedded line 221) is missing from this
 * listing; presumably `void ListenerTask_DumpRXInfoTable(listener *l)`, as
 * called from tick_handler -- confirm against the real source. */
222  for (int i = 0; i <= l->rx_info_max_used; i++) {
223  rx_info_t *info = &l->rx_info[i];
224 
225  printf(" -- state: %d, info[%d]: timeout %ld",
226  info->state, info->id, (long)info->timeout_sec);
227  switch (l->rx_info[i].state) {
228  case RIS_HOLD:
229  printf(", fd %d, seq_id %lld, has_result? %d\n",
230  info->u.hold.fd, (long long)info->u.hold.seq_id, info->u.hold.has_result);
231  break;
232  case RIS_EXPECT:
233  {
234  struct boxed_msg *box = info->u.expect.box;
/* The box may be NULL here, so fd and seq_id fall back to -1. */
235  printf(", box %p (fd:%d, seq_id:%lld), error %d, has_result? %d\n",
236  (void *)box, box ? box->fd : -1, box ? (long long)box->out_seq_id : -1,
237  info->u.expect.error, info->u.expect.has_result);
238  break;
239  }
240  case RIS_INACTIVE:
241  printf(", INACTIVE (next: %d)\n", info->next ? info->next->id : -1);
242  break;
243  }
244  }
245 }
246 
/* Retry delivering a boxed result whose info is in RIS_EXPECT state with
 * error == RX_ERROR_READY_FOR_DELIVERY (both are asserted, as is a non-NULL
 * box).
 *
 * The box is detached from INFO before Bus_ProcessBoxedMessage() is called;
 * on success INFO is marked RX_ERROR_DONE, on failure the box is reattached
 * so a later tick can try again. The backpressure value filled in by the call
 * is passed to observe_backpressure() either way.
 *
 * NOTE(review): embedded lines 257 and 267 are missing from this listing. */
247 static void retry_delivery(listener *l, rx_info_t *info) {
248  struct bus *b = l->bus;
249  BUS_ASSERT(b, b->udata, info->state == RIS_EXPECT);
250  BUS_ASSERT(b, b->udata, info->u.expect.error == RX_ERROR_READY_FOR_DELIVERY);
251  BUS_ASSERT(b, b->udata, info->u.expect.box);
252 
253  struct boxed_msg *box = info->u.expect.box;
254  info->u.expect.box = NULL; /* release */
255  BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128,
256  "releasing box %p at line %d", (void*)box, __LINE__);
258 
/* Under TEST, `backpressure` is the file-scope variable declared at the top
 * of the file instead of this local. */
259  #ifndef TEST
260  size_t backpressure = 0;
261  #endif
262  if (Bus_ProcessBoxedMessage(l->bus, box, &backpressure)) {
263  BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128,
264  "successfully delivered box %p (seq_id %lld) from info %d at line %d (retry)",
265  (void*)box, (long long)box->out_seq_id, info->id, __LINE__);
266  info->u.expect.error = RX_ERROR_DONE;
268  } else {
269  BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128,
270  "returning box %p at line %d", (void*)box, __LINE__);
271  info->u.expect.box = box; /* retry in tick_handler */
272  }
273 
274  observe_backpressure(l, backpressure);
275 }
276 
/* Body of the clean-up step for an RX event that has been marked
 * RX_ERROR_DONE (asserted, together with state == RIS_EXPECT).
 *
 * If INFO still holds a box, its details are printed when the send status is
 * not BUS_SEND_SUCCESS; the box is then detached and passed to
 * Bus_ProcessBoxedMessage(), and reattached for a later retry if that call
 * fails. The resulting backpressure value goes to observe_backpressure().
 *
 * NOTE(review): the signature line (embedded line 277) and embedded lines
 * 301, 306 and 313 are missing from this listing, so the success path and the
 * "already processed" path are not visible here. Presumably this is
 * `static void clean_up_completed_info(listener *l, rx_info_t *info)`, as
 * forward-declared at the top of the file -- confirm. */
278  struct bus *b = l->bus;
279  BUS_ASSERT(b, b->udata, info->state == RIS_EXPECT);
280  BUS_ASSERT(b, b->udata, info->u.expect.error == RX_ERROR_DONE);
281  BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128,
282  "info %p, box is %p at line %d", (void*)info,
283  (void*)info->u.expect.box, __LINE__);
284 
285  #ifndef TEST
286  size_t backpressure = 0;
287  #endif
288  if (info->u.expect.box) {
289  struct boxed_msg *box = info->u.expect.box;
/* Diagnostic dump to stdout for a box that finished with a non-success
 * status. */
290  if (box->result.status != BUS_SEND_SUCCESS) {
291  printf("*** info %d: info->timeout %ld\n",
292  info->id, (long)info->timeout_sec);
293  printf(" info->error %d\n", info->u.expect.error);
294  printf(" info->box == %p\n", (void*)box);
295  printf(" info->box->result.status == %d\n", box->result.status);
296  printf(" info->box->fd %d\n", box->fd);
297  printf(" info->box->out_seq_id %lld\n", (long long)box->out_seq_id);
298  printf(" info->box->out_msg %p\n", (void*)box->out_msg);
299 
300  }
302  BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128,
303  "releasing box %p at line %d", (void*)box, __LINE__);
304  info->u.expect.box = NULL; /* release */
305  if (Bus_ProcessBoxedMessage(l->bus, box, &backpressure)) {
307  } else {
308  BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128,
309  "returning box %p at line %d", (void*)box, __LINE__);
310  info->u.expect.box = box; /* retry in tick_handler */
311  }
312  } else { /* already processed, just release it */
314  }
315 
316  observe_backpressure(l, backpressure);
317 }
318 
/* Tail of the signature and body of the routine that reports a failed RX
 * event: it stores STATUS in the box's result, detaches the box from INFO and
 * hands it to Bus_ProcessBoxedMessage(). On success INFO is marked
 * RX_ERROR_DONE; on failure the box is reattached to INFO for a retry.
 * STATUS must not be BUS_SEND_UNDEFINED, and INFO must be in RIS_EXPECT state
 * with a box attached (all asserted).
 *
 * NOTE(review): the first signature line (embedded line 319) and embedded
 * line 340 are missing from this listing. Presumably this is
 * `void ListenerTask_NotifyMessageFailure(listener *l, rx_info_t *info,
 * bus_send_status_t status)` -- confirm against the real source. */
320  rx_info_t *info, bus_send_status_t status) {
321  #ifndef TEST
322  size_t backpressure = 0;
323  #endif
324  struct bus *b = l->bus;
325  BUS_ASSERT(b, b->udata, info->state == RIS_EXPECT);
326  BUS_ASSERT(b, b->udata, info->u.expect.box);
327 
328  BUS_ASSERT(b, b->udata, status != BUS_SEND_UNDEFINED);
329  info->u.expect.box->result.status = status;
330 
331  boxed_msg *box = info->u.expect.box;
332  info->u.expect.box = NULL;
333  BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128,
334  "releasing box %p at line %d", (void*)box, __LINE__);
335  if (Bus_ProcessBoxedMessage(l->bus, box, &backpressure)) {
336  BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128,
337  "delivered box %p with failure message %d at line %d (info %p)",
338  (void*)box, status, __LINE__, (void*)info);
339  info->u.expect.error = RX_ERROR_DONE;
341  } else {
342  /* Return the box to info; it will be released on retry. */
343  info->u.expect.box = box;
344  }
345 
346  observe_backpressure(l, backpressure);
347 }
348 
349 static connection_info *get_connection_info(struct listener *l, int fd) {
350  struct bus *b = l->bus;
351  for (int i = 0; i < l->tracked_fds; i++) {
352  connection_info *ci = l->fd_info[i];
353  BUS_ASSERT(b, b->udata, ci);
354  if (ci->fd == fd) { return ci; }
355  }
356  return NULL;
357 }
358 
/* Body of the routine that returns an rx_info_t to the listener's pool.
 *
 * INFO must be non-NULL and must be the slot l->rx_info[info->id] (asserted).
 *  - RIS_HOLD with an unclaimed successful result: the message is passed to
 *    the bus's unexpected_msg_cb when the connection is still tracked and a
 *    callback is set; otherwise it is logged as leaking.
 *  - RIS_EXPECT: must already be RX_ERROR_DONE with no box attached.
 *  - any other state trips the assertion.
 * The slot is then set to RIS_INACTIVE, its union is zeroed, and it is pushed
 * onto rx_info_freelist. If it was the highest used slot (and not slot 0),
 * rx_info_max_used is lowered past trailing inactive slots, stopping at 0.
 * Finally rx_info_in_use is decremented.
 *
 * NOTE(review): the signature line (embedded line 359) and embedded lines 416
 * and 421 are missing from this listing. Presumably this is
 * `void ListenerTask_ReleaseRXInfo(struct listener *l, rx_info_t *info)` --
 * confirm against the real source. */
360  struct bus *b = l->bus;
361  BUS_ASSERT(b, b->udata, info);
362  BUS_LOG_SNPRINTF(b, 5, LOG_LISTENER, b->udata, 128,
363  "releasing RX info %d (%p), state %d", info->id, (void *)info, info->state);
364  BUS_ASSERT(b, b->udata, info->id < MAX_PENDING_MESSAGES);
365  BUS_ASSERT(b, b->udata, info == &l->rx_info[info->id]);
366 
367  switch (info->state) {
368  case RIS_HOLD:
369  BUS_LOG_SNPRINTF(b, 5, LOG_LISTENER, b->udata, 128,
370  " -- releasing HOLD: has result? %d", info->u.hold.has_result);
371  if (info->u.hold.has_result) {
372  /* If we have a message that timed out, we need to free it,
373  * but don't know how. We should never get here, because it
374  * means the client finished sending the message, but the
375  * listener never got the handler callback. */
376 
377  if (info->u.hold.result.ok) {
378  void *msg = info->u.hold.result.u.success.msg;
379  int64_t seq_id = info->u.hold.result.u.success.seq_id;
380 
381  connection_info *ci = get_connection_info(l, info->u.hold.fd);
382  if (ci && b->unexpected_msg_cb) {
383  BUS_LOG_SNPRINTF(b, 1, LOG_LISTENER, b->udata, 128,
384  "CALLING UNEXPECTED_MSG_CB ON RESULT %p", (void *)&info->u.hold.result);
385  b->unexpected_msg_cb(msg, seq_id, b->udata, ci->udata);
386  } else {
387  BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 128,
388  "LEAKING RESULT %p", (void *)&info->u.hold.result);
389  }
390  }
391  }
392  break;
393  case RIS_EXPECT:
394  BUS_ASSERT(b, b->udata, info->u.expect.error == RX_ERROR_DONE);
395  BUS_ASSERT(b, b->udata, info->u.expect.box == NULL);
396  break;
397  default:
398  case RIS_INACTIVE:
399  BUS_ASSERT(b, b->udata, false);
400  }
401 
402  /* Set to no longer active and push on the freelist. */
403  BUS_LOG_SNPRINTF(b, 5, LOG_LISTENER, b->udata, 128,
404  "releasing rx_info_t %d (%p), was %d",
405  info->id, (void *)info, info->state);
406 
407  BUS_ASSERT(b, b->udata, info->state != RIS_INACTIVE);
408  info->state = RIS_INACTIVE;
409  memset(&info->u, 0, sizeof(info->u));
410  info->next = l->rx_info_freelist;
411  l->rx_info_freelist = info;
412 
413  if (l->rx_info_max_used == info->id && info->id > 0) {
414  BUS_LOG_SNPRINTF(b, 5, LOG_LISTENER, b->udata, 128,
415  "rx_info_max_used--, from %d to %d",
/* Walk the high-water mark down over inactive slots; the break keeps it from
 * going below index 0. */
417  while (l->rx_info[l->rx_info_max_used].state == RIS_INACTIVE) {
418  l->rx_info_max_used--;
419  if (l->rx_info_max_used == 0) { break; }
420  }
422  }
423 
424  l->rx_info_in_use--;
425 }
426 
/* Body of the routine that returns a listener_msg to the listener's message
 * pool: the message type is reset to MSG_NONE, the message is pushed onto
 * l->msg_freelist with a compare-and-swap retry loop, and l->msgs_in_use is
 * then decremented with a second compare-and-swap retry loop.
 *
 * NOTE(review): the signature line (embedded line 427) is missing from this
 * listing. Presumably this is
 * `void ListenerTask_ReleaseMsg(struct listener *l, listener_msg *msg)` --
 * confirm against the real source. */
428  struct bus *b = l->bus;
429  BUS_ASSERT(b, b->udata, msg->id < MAX_QUEUE_MESSAGES);
430  msg->type = MSG_NONE;
431 
432  for (;;) {
/* Re-read the freelist head on every attempt; the swap only succeeds if the
 * head is still the value msg->next was pointed at. */
433  listener_msg *fl = l->msg_freelist;
434  msg->next = fl;
435  if (ATOMIC_BOOL_COMPARE_AND_SWAP(&l->msg_freelist, fl, msg)) {
436  for (;;) {
437  int16_t miu = l->msgs_in_use;
438  if (ATOMIC_BOOL_COMPARE_AND_SWAP(&l->msgs_in_use, miu, miu - 1)) {
/* NOTE(review): miu is the count *before* the decrement, so this check still
 * passes when the count was 0 and has just become -1 -- confirm intent. */
439  BUS_ASSERT(b, b->udata, miu >= 0);
440  BUS_LOG(b, 3, LOG_LISTENER, "Releasing msg", b->udata);
441  return;
442  }
443  }
444  }
445  }
446 }
447 
448 bool ListenerTask_GrowReadBuf(listener *l, size_t nsize) {
449  if (nsize < l->read_buf_size) { return true; }
450 
451  uint8_t *nbuf = realloc(l->read_buf, nsize);
452  if (nbuf) {
453  struct bus *b = l->bus;
454  BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128,
455  "Read buffer realloc success, %p (%zd) to %p (%zd)",
456  l->read_buf, l->read_buf_size,
457  nbuf, nsize);
458  l->read_buf = nbuf;
459  l->read_buf_size = nsize;
460  return true;
461  } else {
462  return false;
463  }
464 }
465 
/* Body of the routine that attempts to deliver the message boxed in INFO.
 *
 * The box is detached from INFO, a BUS_SEND_REQUEST_COMPLETE status is
 * promoted to BUS_SEND_SUCCESS (any status other than those two is logged),
 * and the unpacked result stored in INFO (seq_id and opaque message) is
 * copied into the box's response. The box is then handed to
 * Bus_ProcessBoxedMessage(): on success INFO is marked RX_ERROR_DONE and
 * cleaned up immediately; on failure the box is reattached so tick_handler
 * can retry.
 *
 * NOTE(review): the signature line (embedded line 466) is missing from this
 * listing. Presumably this is
 * `void ListenerTask_AttemptDelivery(listener *l, struct rx_info_t *info)` --
 * confirm against the real source. */
467  struct bus *b = l->bus;
468 
469  struct boxed_msg *box = info->u.expect.box;
470  info->u.expect.box = NULL; /* release */
471  BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64,
472  "attempting delivery of %p", (void*)box);
473  BUS_LOG_SNPRINTF(b, 5, LOG_MEMORY, b->udata, 128,
474  "releasing box %p at line %d", (void*)box, __LINE__);
475 
476  bus_msg_result_t *result = &box->result;
477  if (result->status == BUS_SEND_SUCCESS) {
478  } else if (result->status == BUS_SEND_REQUEST_COMPLETE) {
479  result->status = BUS_SEND_SUCCESS;
480  } else {
481  BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 128,
482  "unexpected status for completed RX event at info +%d, box %p, status %d",
483  info->id, (void *)box, result->status);
484  }
485 
/* NOTE(review): only RIS_EXPECT assigns unpacked_result. If BUS_ASSERT does
 * not abort, the other states fall through and read it uninitialized --
 * confirm BUS_ASSERT's behavior. */
486  bus_unpack_cb_res_t unpacked_result;
487  switch (info->state) {
488  case RIS_EXPECT:
489  BUS_ASSERT(b, b->udata, info->u.expect.has_result);
490  unpacked_result = info->u.expect.result;
491  break;
492  default:
493  case RIS_HOLD:
494  case RIS_INACTIVE:
495  BUS_ASSERT(b, b->udata, false);
496  }
497 
498  BUS_ASSERT(b, b->udata, unpacked_result.ok);
499  int64_t seq_id = unpacked_result.u.success.seq_id;
500  void *opaque_msg = unpacked_result.u.success.msg;
501  result->u.response.seq_id = seq_id;
502  result->u.response.opaque_msg = opaque_msg;
503 
504  #ifndef TEST
505  size_t backpressure = 0;
506  #endif
507  if (Bus_ProcessBoxedMessage(b, box, &backpressure)) {
508  /* success */
509  BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 256,
510  "successfully delivered box %p (seq_id:%lld), marking info %d as DONE",
511  (void*)box, (long long)seq_id, info->id);
512  info->u.expect.error = RX_ERROR_DONE;
513  BUS_LOG_SNPRINTF(b, 4, LOG_LISTENER, b->udata, 128,
514  "initial clean-up attempt for completed RX event at info +%d", info->id);
515  clean_up_completed_info(l, info);
516  info = NULL; /* drop out of scope, likely to be stale */
517  } else {
518  BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128,
519  "returning box %p at line %d", (void*)box, __LINE__);
520  info->u.expect.box = box; /* retry in tick_handler */
521  }
522  observe_backpressure(l, backpressure);
523 }
524 
525 static void observe_backpressure(listener *l, size_t backpressure) {
526  size_t cur = l->upstream_backpressure;
527  l->upstream_backpressure = (cur + backpressure) / 2;
528 }
529 
530 
/* Body of the routine that computes the listener's current backpressure as
 * the sum of three terms:
 *  - message-queue fill: 0 below a quarter of MAX_QUEUE_MESSAGES, otherwise
 *    msgs_in_use scaled by a coefficient that steps up at the quarter, half
 *    and three-quarter marks;
 *  - rx_info fill: the same stepping against MAX_PENDING_MESSAGES;
 *  - thread pool: THREADPOOL_BP times the averaged upstream backpressure.
 *
 * NOTE(review): every term and the returned sum are uint16_t, so large
 * products would be truncated -- confirm the coefficient values keep them in
 * range.
 * NOTE(review): the signature line (embedded line 531) is missing from this
 * listing. Presumably this is
 * `uint16_t ListenerTask_GetBackpressure(struct listener *l)` -- confirm
 * against the real source. */
532  uint16_t msg_fill_pressure = 0;
533 
534  if (l->msgs_in_use < 0.25 * MAX_QUEUE_MESSAGES) {
535  msg_fill_pressure = 0;
536  } else if (l->msgs_in_use < 0.5 * MAX_QUEUE_MESSAGES) {
537  msg_fill_pressure = MSG_BP_1QTR * 2 * l->msgs_in_use;
538  } else if (l->msgs_in_use < 0.75 * MAX_QUEUE_MESSAGES) {
539  msg_fill_pressure = MSG_BP_HALF * 10 * l->msgs_in_use;
540  } else {
541  msg_fill_pressure = MSG_BP_3QTR * 100 * l->msgs_in_use;
542  }
543 
544  uint16_t rx_info_fill_pressure = 0;
545  if (l->rx_info_in_use < 0.25 * MAX_PENDING_MESSAGES) {
546  rx_info_fill_pressure = 0;
547  } else if (l->rx_info_in_use < 0.5 * MAX_PENDING_MESSAGES) {
548  rx_info_fill_pressure = RX_INFO_BP_1QTR * l->rx_info_in_use;
549  } else if (l->rx_info_in_use < 0.75 * MAX_PENDING_MESSAGES) {
550  rx_info_fill_pressure = RX_INFO_BP_HALF * l->rx_info_in_use;
551  } else {
552  rx_info_fill_pressure = RX_INFO_BP_3QTR * l->rx_info_in_use;
553  }
554 
555  uint16_t threadpool_fill_pressure = THREADPOOL_BP * l->upstream_backpressure;
556 
557  struct bus *b = l->bus;
558  BUS_LOG_SNPRINTF(b, 6, LOG_SENDER, b->udata, 64,
559  "lbp: %u, %u (iu %u), %u",
560  msg_fill_pressure, rx_info_fill_pressure, l->rx_info_in_use, threadpool_fill_pressure);
561 
562  return msg_fill_pressure + rx_info_fill_pressure
563  + threadpool_fill_pressure;
564 }
const uint16_t id
bool Util_IsResumableIOError(int errno_)
Definition: util.c:26
#define MAX_QUEUE_MESSAGES
Max number of unprocessed queue messages.
Record in table for partially processed messages.
struct rx_info_t::@12::@13 hold
listener_msg * msg_freelist
#define LISTENER_TASK_TIMEOUT_DELAY
How long the listener should wait for responses before becoming idle and blocking.
bool Bus_ProcessBoxedMessage(struct bus *b, struct boxed_msg *box, size_t *backpressure)
Deliver a boxed message to the thread pool to execute.
Definition: bus.c:580
static connection_info * get_connection_info(struct listener *l, int fd)
A queue message, with a command in the tagged union.
#define MSG_BP_1QTR
Coefficients for backpressure based on certain conditions.
union bus_unpack_cb_res_t::@0 u
bus_msg_result_t result
Result message, constructed in place after the request/response cycle has completed or failed due to ...
Receiver of responses.
static void observe_backpressure(listener *l, size_t backpressure)
bus_unexpected_msg_cb * unexpected_msg_cb
listener_msg msgs[(32)]
#define RX_INFO_BP_HALF
connection_info * fd_info[1000]
The connection info, corresponding to the file descriptors tracked in l->fds. ...
uint16_t ListenerTask_GetBackpressure(struct listener *l)
Get the current backpressure from the listener.
void * ListenerTask_MainLoop(void *arg)
Listener's main loop – function pointer for pthread start function.
Definition: listener_task.c:45
#define MSG_BP_HALF
struct timeval tv_send_start
Event timestamps to track timeouts.
Message bus.
#define RX_INFO_BP_3QTR
void * udata
User data for callbacks.
uint8_t * out_msg
struct listener_msg * next
struct bus * bus
bool ListenerTask_GrowReadBuf(listener *l, size_t nsize)
Grow the listener's read buffer to NSIZE.
union rx_info_t::@12 u
int64_t out_seq_id
void ListenerTask_ReleaseRXInfo(struct listener *l, rx_info_t *info)
Release an INFO to the listener's info pool.
void ListenerCmd_CheckIncomingMessages(listener *l, int *res)
Process incoming commands, if any.
Definition: listener_cmd.c:73
struct rx_info_t * next
rx_info_t * rx_info_freelist
#define LISTENER_SHUTDOWN_COMPLETE_FD
int log_level
Log level.
uint16_t tracked_fds
FDs currently tracked by listener.
bus_send_status_t status
Definition: bus_types.h:216
void * udata
user connection data
void ListenerTask_ReleaseMsg(struct listener *l, listener_msg *msg)
Release a message to the listener's message pool.
void ListenerCmd_NotifyCaller(listener *l, int fd)
Notify the listener's caller that a command has completed.
Definition: listener_cmd.c:42
static void tick_handler(listener *l)
rx_info_t rx_info[(1024)]
void ListenerTask_AttemptDelivery(listener *l, struct rx_info_t *info)
Attempt delivery of the message boxed in INFO.
struct bus_msg_result_t::@3::@5 response
Per-socket connection context.
#define WHILE
Definition: listener_task.c:37
bus_send_status_t
Definition: bus_types.h:193
#define THREADPOOL_BP
uint16_t rx_info_max_used
#define BUS_LOG_SNPRINTF(B, LEVEL, EVENT_KEY, UDATA, MAX_SZ, FMT,...)
Definition: bus_types.h:59
void ListenerTask_DumpRXInfoTable(listener *l)
Dump the RX info table.
struct rx_info_t::@12::@14 expect
struct timeval tv_send_done
#define MSG_BP_3QTR
static void clean_up_completed_info(listener *l, rx_info_t *info)
#define INCOMING_MSG_PIPE
Offset to account for the first file descriptor being the incoming message pipe.
#define ATOMIC_BOOL_COMPARE_AND_SWAP(PTR, OLD, NEW)
Definition: atomic.h:27
void ListenerTask_NotifyMessageFailure(listener *l, rx_info_t *info, bus_send_status_t status)
Notify the client that the event in INFO has failed with STATUS.
#define INFINITE_DELAY
Special value meaning poll should block indefinitely.
union bus_msg_result_t::@3 u
struct bus_unpack_cb_res_t::@0::@1 success
#define BUS_ASSERT(B, UDATA, COND)
Definition: bus_types.h:83
rx_info_state state
static void retry_delivery(listener *l, rx_info_t *info)
#define LISTENER_NO_FD
Sentinel values used for listener.shutdown_notify_fd.
#define RX_INFO_BP_1QTR
bool Util_Timestamp(struct timeval *tv, bool relative)
Definition: util.c:30
int fd
Destination filename and message body.
size_t upstream_backpressure
void ListenerIO_AttemptRecv(listener *l, int available)
Definition: listener_io.c:45
int syscall_poll(struct pollfd fds[], nfds_t nfds, int timeout)
Wrappers for syscalls, to allow mocking for testing.
Definition: syscall.c:27
#define MAX_PENDING_MESSAGES
#define BUS_LOG(B, LEVEL, EVENT_KEY, MSG, UDATA)
Definition: bus_types.h:45