kinetic-c  v0.12.0
Seagate Kinetic Protocol Client Library for C
bus_example.c
Go to the documentation of this file.
1 /*
2 * kinetic-c
3 * Copyright (C) 2015 Seagate Technology.
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 *
19 */
20 #include <stdlib.h>
21 #include <stdio.h>
22 #include <unistd.h>
23 #include <assert.h>
24 #include <string.h>
25 #include <err.h>
26 #include <signal.h>
27 #include <sys/time.h>
28 #include <poll.h>
29 
30 #ifdef __Linux__
31 // some Linux distros put this in a nonstandard place.
32 #include <getopt.h>
33 #endif
34 
35 #include "bus.h"
36 #include "atomic.h"
37 #include "socket99.h"
38 #include "util.h"
39 
40 typedef struct {
41  uint32_t magic_number;
42  uint32_t size;
43  int64_t seq_id;
44 } prot_header_t;
45 
46 #define MAGIC_NUMBER 3
47 
48 #define MAX_SOCKETS 1000
49 #define DEFAULT_BUF_SIZE (1024 * 1024 + sizeof(prot_header_t))
50 #define PRINT_RESPONSES 0
51 
56 };
57 
58 typedef struct {
59  enum socket_state state;
60  size_t cur_payload_size;
61  size_t used;
62  uint8_t buf[];
63 } socket_info;
64 
65 typedef struct {
66  int port_low;
67  int port_high;
68  int verbosity;
69  size_t buf_size;
70 
71  int sockets_used;
72  int sockets[MAX_SOCKETS];
73 
74  time_t last_second;
75  size_t ticks;
76  size_t sent_msgs;
77  size_t completed_deliveries;
78  size_t max_seq_id;
79 
80  socket_info *info[MAX_SOCKETS];
81 } example_state;
82 
83 example_state state;
84 
85 static void run_bus(example_state *s, struct bus *b);
86 static void parse_args(int argc, char **argv, example_state *s);
87 static time_t get_cur_second(void);
88 
89 static void log_cb(log_event_t event, int log_level, const char *msg, void *udata) {
90  example_state *s = (example_state *)udata;
91  const char *event_str = Bus_LogEventStr(event);
92  fprintf(/*stderr*/stdout, "%ld -- %s[%d] -- %s\n",
93  s->last_second, event_str, log_level, msg);
94 }
95 
96 #define LOG(VERBOSITY, ...) \
97  do { if (state.verbosity >= VERBOSITY) { printf(__VA_ARGS__); } } while(0)
98 
99 static const char *executable_name = NULL;
100 
101 static bus_sink_cb_res_t reset_transfer(socket_info *si) {
102  //printf("EXPECT: header of %zd bytes\n", sizeof(prot_header_t));
103  bus_sink_cb_res_t res = { /* prime pump with header size */
104  .next_read = sizeof(prot_header_t),
105  };
106 
107  si->state = STATE_AWAITING_HEADER;
108  si->used = 0;
109  return res;
110 }
111 
113  size_t read_size, void *socket_udata) {
114 
115  socket_info *si = (socket_info *)socket_udata;
116  assert(si);
117 
118  switch (si->state) {
119  case STATE_UNINIT:
120  {
121  assert(read_size == 0);
122  return reset_transfer(si);
123  }
125  {
126  bool valid_header = true;
127  bool split_header = false;
128 
129  size_t header_rem = sizeof(prot_header_t) - si->used;
130  if (read_size > header_rem) {
131  printf("surplus read_size %zd\n", read_size);
132  printf("header_rem %zd (sizeof(prot_header_t) %zd)\n", header_rem, sizeof(prot_header_t));
133  assert(false);
134  } else if (read_size < sizeof(prot_header_t)) {
135  //printf("split header, %zd\n", read_size);
136  split_header = true;
137  }
138 
139  size_t copied = read_size;
140  if (copied > header_rem) { copied = header_rem; }
141 
142  memcpy(&si->buf[si->used], read_buf, copied);
143  si->used += copied;
144 
145  if (si->used < sizeof(prot_header_t)) {
146  bus_sink_cb_res_t res = {
147  .next_read = sizeof(prot_header_t) - si->used,
148  };
149  si->state = STATE_AWAITING_HEADER;
150  return res;
151  }
152 
153  assert(si->used == sizeof(prot_header_t));
154 
155  prot_header_t *header = (prot_header_t *)&si->buf[0];
156 
157  if (si->used < sizeof(prot_header_t)) {
158  printf("INVALID HEADER A: read_size %zd\n", si->used);
159  valid_header = false;
160  } else if (header->magic_number != MAGIC_NUMBER) {
161  printf("INVALID HEADER B: magic number 0x%08x\n", header->magic_number);
162  valid_header = false;
163  }
164 
165  if (valid_header) {
166  prot_header_t *header = (prot_header_t *)&si->buf[0];
167  si->cur_payload_size = header->size;
168  memcpy(&si->buf[si->used], read_buf + copied, read_size - copied);
169  si->used += read_size - copied;
170  bus_sink_cb_res_t res = {
171  .next_read = header->size,
172  };
173  si->state = STATE_AWAITING_BODY;
174  return res;
175  } else {
176  assert(false);
177  return reset_transfer(si);
178  }
179 
180  break;
181  }
182  case STATE_AWAITING_BODY:
183  {
184  assert(DEFAULT_BUF_SIZE - si->used >= read_size);
185  memcpy(&si->buf[si->used], read_buf, read_size);
186  si->used += read_size;
187  assert(si->used <= si->cur_payload_size + sizeof(prot_header_t));
188  size_t rem = si->cur_payload_size + sizeof(prot_header_t) - si->used;
189 
190  if (rem == 0) {
191  bus_sink_cb_res_t res = {
192  .next_read = sizeof(prot_header_t),
193  .full_msg_buffer = read_buf,
194  };
195  si->state = STATE_AWAITING_HEADER;
196  si->used = 0;
197  return res;
198  } else {
199  bus_sink_cb_res_t res = {
200  .next_read = rem,
201  };
202  return res;
203  }
204  }
205  default:
206  assert(false);
207  }
208 }
209 
210 static bus_unpack_cb_res_t unpack_cb(void *msg, void *socket_udata) {
211  /* just got .full_msg_buffer from sink_cb -- pass it along as-is */
212  socket_info *si = (socket_info *)socket_udata;
213  prot_header_t *header = (prot_header_t *)&si->buf[0];
214  uint8_t *payload = (uint8_t *)&si->buf[sizeof(prot_header_t)];
215 
216 #if PRINT_RESPONSES
217  for (int i = 0; i < si->used; i++) {
218  if ((i & 15) == 0 && i > 0) { printf("\n"); }
219  printf("%02x ", si->buf[i]);
220  }
221 #endif
222 
223  bus_unpack_cb_res_t res = {
224  .ok = true,
225  .u.success = {
226  .seq_id = header->seq_id,
227  .msg = payload,
228  },
229  };
230  return res;
231 }
232 
233 static void unexpected_msg_cb(void *msg,
234  int64_t seq_id, void *bus_udata, void *socket_udata) {
235  printf("\n\n\nUNEXPECTED MESSAGE: %p, seq_id %lld, bus_udata %p, socket_udata %p\n\n\n\n",
236  msg, (long long)seq_id, bus_udata, socket_udata);
237 
238  assert(false);
239 }
240 
241 int main(int argc, char **argv) {
242  executable_name = argv[0];
243  state.last_second = get_cur_second();
244 
245  parse_args(argc, argv, &state);
246 
247  bus_config cfg = {
248  .log_cb = log_cb,
249  .log_level = state.verbosity,
250  .sink_cb = sink_cb,
251  .unpack_cb = unpack_cb,
252  .unexpected_msg_cb = unexpected_msg_cb,
253  .bus_udata = &state,
254  };
255  bus_result res = {0};
256  if (!Bus_Init(&cfg, &res)) {
257  LOG(0, "failed to init bus: %d\n", res.status);
258  return 1;
259  }
260 
261  struct bus *b = res.bus;
262 
263  run_bus(&state, b);
264 
265  if (b) {
266  LOG(1, "shutting down\n");
267  Bus_Shutdown(b);
268  Bus_Free(b);
269  return 0;
270  } else {
271  return 1;
272  }
273 }
274 
275 static bool loop_flag = true;
276 static sig_t old_sigint_handler = NULL;
277 
278 static void signal_handler(int arg) {
279  loop_flag = false;
280  LOG(3, "-- caught signal %d\n", arg);
281  if (arg == SIGINT) {
282  signal(arg, old_sigint_handler);
283  }
284  (void)arg;
285 }
286 
287 static sig_t register_signal_handler(int sig) {
288  sig_t old_handler = signal(sig, signal_handler);
289  if (old_handler == SIG_ERR) { err(1, "signal"); }
290  return old_handler;
291 }
292 
293 static void register_signal_handlers(void) {
294  (void)register_signal_handler(SIGPIPE);
295  old_sigint_handler = register_signal_handler(SIGINT);
296 }
297 
298 static void usage(void) {
299  fprintf(stderr,
300  "Usage: %s [-b BUF_SIZE] [-l LOW_PORT] [-h HIGH_PORT] [-s STOP_AT_SEQUENCE_ID] [-v] \n"
301  " If only one of -l or -h are specified, it will use just that one port.\n"
302  " -v can be used multiple times to increase verbosity.\n"
303  , executable_name);
304  exit(1);
305 }
306 
307 static void parse_args(int argc, char **argv, example_state *s) {
308  int a = 0;
309 
310  s->buf_size = DEFAULT_BUF_SIZE;
311 
312  while ((a = getopt(argc, argv, "b:l:h:s:v")) != -1) {
313  switch (a) {
314  case 'b': /* buffer size */
315  s->buf_size = atol(optarg);
316  break;
317  case 'l': /* low port */
318  s->port_low = atoi(optarg);
319  break;
320  case 'h': /* high port */
321  s->port_high = atoi(optarg);
322  break;
323  case 's': /* stop loop at sequence ID */
324  s->max_seq_id = atoi(optarg);
325  break;
326  case 'v': /* verbosity */
327  s->verbosity++;
328  break;
329  default:
330  fprintf(stderr, "illegal option: -- %c\n", a);
331  usage();
332  }
333  }
334 
335  if (s->port_low == 0) { s->port_low = s->port_high; }
336  if (s->port_high == 0) { s->port_high = s->port_low; }
337  if (s->port_high < s->port_low || s->port_low == 0) { usage(); }
338 
339  if (s->verbosity > 0) {
340  printf("bus_size: %zd\n", s->buf_size);
341  printf("port_low: %d\n", s->port_low);
342  printf("port_high: %d\n", s->port_high);
343  printf("verbosity: %d\n", s->verbosity);
344  }
345 }
346 
347 static void open_sockets(example_state *s) {
348  int socket_count = s->port_high - s->port_low + 1;
349 
350  size_t info_size = sizeof(socket_info) + s->buf_size;
351 
352  /* open socket(s) */
353  for (int i = 0; i < socket_count; i++) {
354  int port = i + s->port_low;
355 
356  socket99_config cfg = {
357  .host = "127.0.0.1",
358  .port = port,
359  .nonblocking = true,
360  };
361  socket99_result res;
362 
363  if (!socket99_open(&cfg, &res)) {
364  socket99_fprintf(stderr, &res);
365  exit(1);
366  }
367 
368  s->sockets[i] = res.fd;
369  s->sockets_used++;
370  socket_info *si = malloc(info_size);
371  assert(si);
372  memset(si, 0, info_size);
373  s->info[i] = si;
374  }
375 }
376 
377 static size_t construct_msg(uint8_t *buf, size_t buf_size, size_t payload_size, int64_t seq_id) {
378  size_t header_size = sizeof(prot_header_t);
379  assert(buf_size > header_size);
380  prot_header_t *header = (prot_header_t *)buf;
381 
382  uint8_t *payload = &buf[header_size];
383  for (int i = 0; i < payload_size; i++) {
384  payload[i] = (uint8_t)(0xFF & i);
385  }
386 
387  header->magic_number = MAGIC_NUMBER;
388  header->seq_id = seq_id;
389  header->size = payload_size;
390 
391  return header_size + payload_size;
392 }
393 
394 /* Should it CAS on the completion counter?
395  * This should account for nearly all CPU usage. */
396 #define INCREMENT_COMPLETION_COUNTER 1
397 
398 static void completion_cb(bus_msg_result_t *res, void *udata) {
399  example_state *s = &state;
400  socket_info *si = (socket_info *)udata;
401 
402  switch (res->status) {
403  case BUS_SEND_SUCCESS:
404  {
405 #if INCREMENT_COMPLETION_COUNTER
406  for (;;) {
407  size_t cur = s->completed_deliveries;
408  if (ATOMIC_BOOL_COMPARE_AND_SWAP(&s->completed_deliveries, cur, cur + 1)) {
409  LOG(3, " -- ! got %zd bytes, seq_id 0x%08llx, %p\n",
410  si->cur_payload_size, res->u.response.seq_id,
411  res->u.response.opaque_msg);
412  break;
413  }
414  }
415 #else
416  (void)s;
417  (void)si;
418 #endif
419  }
420  break;
421  case BUS_SEND_TX_TIMEOUT:
422  LOG(1, "BUS_SEND_TIMEOUT\n");
423  break;
424  case BUS_SEND_TX_FAILURE:
425  LOG(1, "BUS_SEND_TX_FAILURE\n");
426  break;
427  case BUS_SEND_RX_FAILURE:
428  LOG(1, "BUS_SEND_RX_FAILURE\n");
429  break;
430  case BUS_SEND_RX_TIMEOUT:
431  LOG(1, "BUS_SEND_RX_TIMEOUT\n");
432  break;
433  default:
434  fprintf(stderr, "match fail: %d\n", res->status);
435  assert(false);
436  }
437 }
438 
439 static void tick_handler(example_state *s) {
440  s->ticks++;
441  LOG(1, "%ld -- %zd ticks, %zd requests, %zd responses (delta %zd)\n",
442  s->last_second, s->ticks, s->sent_msgs, s->completed_deliveries,
443  s->completed_deliveries - s->sent_msgs);
444 }
445 
446 static time_t get_cur_second(void) {
447  struct timeval tv;
448  if (!Util_Timestamp(&tv, true)) {
449  assert(false);
450  }
451  return tv.tv_sec;
452 }
453 
454 static void run_bus(example_state *s, struct bus *b) {
456  open_sockets(s);
457 
458  for (int i = 0; i < s->sockets_used; i++) {
459  Bus_RegisterSocket(b, BUS_SOCKET_PLAIN, s->sockets[i], s->info[i]);
460  }
461 
462  int cur_socket_i = 0;
463  int64_t seq_id = 1;
464 
465  uint8_t msg_buf[DEFAULT_BUF_SIZE];
466  size_t buf_size = sizeof(msg_buf);
467  size_t payload_size = seq_id;
468 
469  s->last_second = get_cur_second();
470 
471  int sleep_counter = 0;
472  int dropped = 0;
473 
474  while (loop_flag) {
475  time_t cur_second = get_cur_second();
476  if (cur_second != s->last_second) {
477  tick_handler(s);
478  s->last_second = cur_second;
479  payload_size = 8;
480  if (sleep_counter > 0) {
481  sleep_counter--;
482  if (sleep_counter == 0) {
483  printf(" -- resuming\n");
484  }
485  } else if ((cur_second & 0x3f) == 0x00) {
486  printf(" -- sleeping for 10 seconds\n");
487  sleep_counter = 10;
488  }
489  }
490 
491  if (sleep_counter == 0) {
492  size_t msg_size = construct_msg(msg_buf, buf_size,
493  100 * /*payload_size * */ 1024L, seq_id);
494  LOG(3, " @@ sending message with %zd bytes\n", msg_size);
495  bus_user_msg msg = {
496  .fd = s->sockets[cur_socket_i],
497  .seq_id = seq_id,
498  .msg = msg_buf,
499  .msg_size = msg_size,
500  .cb = completion_cb,
501  .udata = s->info[cur_socket_i],
502  };
503 
504  s->sent_msgs++;
505  payload_size++;
506  if (!Bus_SendRequest(b, &msg)) {
507  LOG(1, " @@@ Bus_SendRequest failed!\n");
508  dropped++;
509  if (dropped >= 100) {
510  LOG(1, " @@@ more than 100 send failures, halting\n");
511  loop_flag = false;
512  }
513  }
514 
515  cur_socket_i++;
516  if (cur_socket_i == s->sockets_used) {
517  cur_socket_i = 0;
518  }
519  seq_id++;
520  if (seq_id == s->max_seq_id) {
521  loop_flag = false;
522  }
523  } else {
524  poll(NULL, 0, 1000);
525  }
526  }
527 }
Bus_Init_res_t status
Definition: bus_types.h:210
static void completion_cb(bus_msg_result_t *res, void *udata)
Definition: bus_example.c:398
static void run_bus(example_state *s, struct bus *b)
Definition: bus_example.c:454
bool Bus_Shutdown(bus *b)
Begin shutting the system down.
Definition: bus.c:494
static void log_cb(log_event_t event, int log_level, const char *msg, void *udata)
Definition: bus_example.c:89
log_event_t
Definition: bus_types.h:95
static const char * executable_name
Definition: bus_example.c:99
static sig_t register_signal_handler(int sig)
Definition: bus_example.c:287
static bool loop_flag
Definition: bus_example.c:275
static void parse_args(int argc, char **argv, example_state *s)
Definition: bus_example.c:307
static uint8_t read_buf[(2 *1024L *1024)]
Definition: echosrv.c:44
bool Bus_SendRequest(struct bus *b, bus_user_msg *msg)
Send a request.
Definition: bus.c:297
static bus_sink_cb_res_t sink_cb(uint8_t *read_buf, size_t read_size, void *socket_udata)
Definition: bus_example.c:112
void Bus_Free(bus *b)
Free internal data structures for the bus.
Definition: bus.c:599
static void open_sockets(example_state *s)
Definition: bus_example.c:347
Message bus.
const char * Bus_LogEventStr(log_event_t event)
Get the string key for a log event ID.
Definition: bus.c:335
static void register_signal_handlers(void)
Definition: bus_example.c:293
void * udata
User data for callbacks.
socket_state
Definition: bus_example.c:52
static bus_unpack_cb_res_t unpack_cb(void *msg, void *socket_udata)
Definition: bus_example.c:210
int main(int argc, char **argv)
Definition: bus_example.c:241
bus_send_status_t status
Definition: bus_types.h:216
static time_t get_cur_second(void)
Definition: bus_example.c:446
static void usage(void)
Definition: bus_example.c:298
struct bus_msg_result_t::@3::@5 response
#define DEFAULT_BUF_SIZE
Definition: bus_example.c:49
static void signal_handler(int arg)
Definition: bus_example.c:278
#define MAX_SOCKETS
Definition: bus_example.c:48
bool Bus_Init(bus_config *config, struct bus_result *res)
Initialize a bus, based on configuration in *config.
Definition: bus.c:64
#define MAGIC_NUMBER
Definition: bus_example.c:46
static bus_sink_cb_res_t reset_transfer(socket_info *si)
Definition: bus_example.c:101
#define ATOMIC_BOOL_COMPARE_AND_SWAP(PTR, OLD, NEW)
Definition: atomic.h:27
bool Bus_RegisterSocket(struct bus *b, bus_socket_t type, int fd, void *udata)
Register a socket connected to an endpoint, and data that will be passed to all interactions on that ...
Definition: bus.c:350
static sig_t old_sigint_handler
Definition: bus_example.c:276
union bus_msg_result_t::@3 u
static void unexpected_msg_cb(void *msg, int64_t seq_id, void *bus_udata, void *socket_udata)
Definition: bus_example.c:233
struct bus * bus
Definition: bus_types.h:211
bus_log_cb * log_cb
Definition: bus_types.h:175
#define LOG(VERBOSITY,...)
Definition: bus_example.c:96
bool Util_Timestamp(struct timeval *tv, bool relative)
Definition: util.c:30
example_state state
Definition: bus_example.c:83
static size_t construct_msg(uint8_t *buf, size_t buf_size, size_t payload_size, int64_t seq_id)
Definition: bus_example.c:377
static void tick_handler(example_state *s)
Definition: bus_example.c:439