kinetic-c  v0.12.0
Seagate Kinetic Protocol Client Library for C
listener_helper.c
Go to the documentation of this file.
1 /*
2 * kinetic-c
3 * Copyright (C) 2015 Seagate Technology.
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 *
19 */
20 #include "listener_helper.h"
21 #include "listener_task.h"
22 #include "syscall.h"
23 #include "atomic.h"
24 
25 #include <assert.h>
26 
/* One-element commit buffer written by ListenerHelper_PushMessage.
 * In TEST builds it is a file-scope object instead of a local of that
 * function -- presumably so tests can inspect the byte that was handed to
 * syscall_write (TODO confirm against the test suite). */
27 #ifdef TEST
28 uint8_t msg_buf[sizeof(uint8_t)];
29 #endif
30 
32  struct bus *b = l->bus;
33 
34  BUS_LOG_SNPRINTF(b, 4, LOG_LISTENER, b->udata, 128,
35  "get_free_msg -- in use: %d", l->msgs_in_use);
36 
37  for (;;) {
38  listener_msg *head = l->msg_freelist;
39  if (head == NULL) {
40  BUS_LOG(b, 3, LOG_LISTENER, "No free messages!", b->udata);
41  return NULL;
42  } else if (ATOMIC_BOOL_COMPARE_AND_SWAP(&l->msg_freelist, head, head->next)) {
43  for (;;) {
44  int16_t miu = l->msgs_in_use;
45  assert(miu < MAX_QUEUE_MESSAGES);
46 
47  if (ATOMIC_BOOL_COMPARE_AND_SWAP(&l->msgs_in_use, miu, miu + 1)) {
48  BUS_LOG_SNPRINTF(b, 5, LOG_LISTENER, b->udata, 64,
49  "got free msg: %u", head->id);
50 
51  /* Add counterpressure between the client and the listener.
52  * 10 * ((n >> 1) ** 2) microseconds */
53  int16_t delay = 10 * (miu >> 1) * (miu >> 1);
54  if (delay > 0) {
55  struct timespec ts = {
56  .tv_sec = 0,
57  .tv_nsec = 1000L * delay,
58  };
59  nanosleep(&ts, NULL);
60  }
61  BUS_ASSERT(b, b->udata, head->type == MSG_NONE);
62  memset(&head->u, 0, sizeof(head->u));
63  return head;
64  }
65  }
66  }
67  }
68 }
69 
70 bool ListenerHelper_PushMessage(struct listener *l, listener_msg *msg, int *reply_fd) {
71  struct bus *b = l->bus;
72  BUS_ASSERT(b, b->udata, msg);
73 
74  #ifndef TEST
75  uint8_t msg_buf[sizeof(uint8_t)];
76  #endif
77  msg_buf[0] = msg->id;
78 
79  if (reply_fd) { *reply_fd = msg->pipes[0]; }
80 
81  for (;;) {
82  ssize_t wr = syscall_write(l->commit_pipe, msg_buf, sizeof(msg_buf));
83  if (wr == sizeof(msg_buf)) {
84  return true; // committed
85  } else {
86  if (errno == EINTR) { /* signal interrupted; retry */
87  errno = 0;
88  continue;
89  } else {
90  BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 64,
91  "write_commit error, errno %d", errno);
92  errno = 0;
94  return false;
95  }
96  }
97  }
98 }
99 
101  struct bus *b = l->bus;
102 
103  struct rx_info_t *head = l->rx_info_freelist;
104  if (head == NULL) {
105  BUS_LOG(b, 6, LOG_SENDER, "No rx_info cells left!", b->udata);
106  return NULL;
107  } else {
108  l->rx_info_freelist = head->next;
109  head->next = NULL;
110  l->rx_info_in_use++;
111  BUS_LOG(l->bus, 4, LOG_LISTENER, "reserving RX info", l->bus->udata);
112  BUS_ASSERT(b, b->udata, head->state == RIS_INACTIVE);
113  if (l->rx_info_max_used < head->id) {
114  BUS_LOG_SNPRINTF(b, 5, LOG_LISTENER, b->udata, 128,
115  "rx_info_max_used <- %d", head->id);
116  l->rx_info_max_used = head->id;
118  }
119 
120  BUS_LOG_SNPRINTF(b, 5, LOG_LISTENER, b->udata, 128,
121  "got free rx_info_t %d (%p)", head->id, (void *)head);
122  BUS_ASSERT(b, b->udata, head == &l->rx_info[head->id]);
123  return head;
124  }
125 }
126 
128  int fd, int64_t seq_id) {
129  struct bus *b = l->bus;
130  for (int i = 0; i <= l->rx_info_max_used; i++) {
131  rx_info_t *info = &l->rx_info[i];
132 
133  switch (info->state) {
134  case RIS_INACTIVE:
135  break; /* skip */
136  case RIS_HOLD:
137  BUS_LOG_SNPRINTF(b, 4, LOG_MEMORY, b->udata, 128,
138  "find_info_by_sequence_id: info (%p) at +%d: <fd:%d, seq_id:%lld>",
139  (void*)info, info->id, fd, (long long)seq_id);
140  if (info->u.hold.fd == fd && info->u.hold.seq_id == seq_id) {
141  return info;
142  }
143  break;
144  case RIS_EXPECT:
145  {
146  struct boxed_msg *box = info->u.expect.box;
147  BUS_LOG_SNPRINTF(b, 4, LOG_MEMORY, b->udata, 128,
148  "find_info_by_sequence_id: info (%p) at +%d [s %d]: box is %p",
149  (void*)info, info->id, info->u.expect.error, (void*)box);
150  if (box != NULL && box->out_seq_id == seq_id && box->fd == fd) {
151  return info;
152  }
153  break;
154  }
155  default:
156  BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 64,
157  "match fail %d on line %d", info->state, __LINE__);
158  BUS_ASSERT(b, b->udata, false);
159  }
160  }
161 
162  if (b->log_level > 5 || 0) {
163  BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 64,
164  "==== Could not find <fd:%d, seq_id:%lld>, dumping table ====\n",
165  fd, (long long)seq_id);
167  }
168  /* Not found. Probably an unsolicited status message. */
169  return NULL;
170 }
const uint16_t id
#define MAX_QUEUE_MESSAGES
Max number of unprocessed queue messages.
Record in table for partially processed messages.
struct rx_info_t::@12::@13 hold
listener_msg * msg_freelist
A queue message, with a command in the tagged union.
Receiver of responses.
bool ListenerHelper_PushMessage(struct listener *l, listener_msg *msg, int *reply_fd)
Push a message into the listener's message queue.
Message bus.
void * udata
User data for callbacks.
struct listener_msg * next
struct bus * bus
union rx_info_t::@12 u
int64_t out_seq_id
struct rx_info_t * next
rx_info_t * rx_info_freelist
int log_level
Log level.
void ListenerTask_ReleaseMsg(struct listener *l, listener_msg *msg)
Release a message to the listener's message pool.
rx_info_t rx_info[(1024)]
uint16_t rx_info_max_used
#define BUS_LOG_SNPRINTF(B, LEVEL, EVENT_KEY, UDATA, MAX_SZ, FMT,...)
Definition: bus_types.h:59
void ListenerTask_DumpRXInfoTable(listener *l)
Dump the RX info table.
struct rx_info_t::@12::@14 expect
union listener_msg::@6 u
rx_info_t * ListenerHelper_GetFreeRXInfo(struct listener *l)
Get a free RX_INFO record, if any are available.
#define ATOMIC_BOOL_COMPARE_AND_SWAP(PTR, OLD, NEW)
Definition: atomic.h:27
rx_info_t * ListenerHelper_FindInfoBySequenceID(listener *l, int fd, int64_t seq_id)
Try to find an RX_INFO record by a pair.
#define BUS_ASSERT(B, UDATA, COND)
Definition: bus_types.h:83
ssize_t syscall_write(int fildes, const void *buf, size_t nbyte)
Definition: syscall.c:35
rx_info_state state
int fd
Destination filename and message body.
listener_msg * ListenerHelper_GetFreeMsg(listener *l)
Get a free message from the listener's message pool.
#define MAX_PENDING_MESSAGES
#define BUS_LOG(B, LEVEL, EVENT_KEY, MSG, UDATA)
Definition: bus_types.h:45