kinetic-c  v0.12.0
Seagate Kinetic Protocol Client Library for C
echosrv.c
Go to the documentation of this file.
1 /*
2 * kinetic-c
3 * Copyright (C) 2015 Seagate Technology.
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 *
19 */
20 #include <stdlib.h>
21 #include <stdio.h>
22 #include <unistd.h>
23 #include <string.h>
24 #include <assert.h>
25 #include <err.h>
26 #include <errno.h>
27 #include <sys/time.h>
28 #include <sys/socket.h>
29 
30 // For TCP_NODELAY
31 #include <netinet/tcp.h>
32 
33 #include "socket99.h"
34 #include "util.h"
35 
36 /* TODO: Linux => epoll */
37 #include <poll.h>
38 
39 #define BUF_SZ (2 * 1024L * 1024)
40 #define MAX_CLIENTS 10
41 
42 #define NO_CLIENT ((int)-1)
43 
44 static uint8_t read_buf[BUF_SZ];
45 
46 #define LOG(VERBOSITY, ...) \
47  do { \
48  if (VERBOSITY <= cfg->verbosity) { \
49  printf(__VA_ARGS__); \
50  } \
51  } \
52  while(0)
53 
54 
55 typedef struct {
56  int fd;
57  size_t out_bytes;
58  size_t written_bytes;
59  uint8_t buf[2*BUF_SZ];
60 } out_buf;
61 
62 typedef struct {
63  int port_low;
64  int port_high;
65  int port_count;
66  int verbosity;
67 
68  int ticks;
69  time_t last_second;
70  int successful_writes;
71  int last_successful_writes;
72 
73  struct pollfd *accept_fds;
74  struct pollfd *client_fds;
75 
76  int client_count;
77  out_buf *out_bufs;
78 } config;
79 
80 static void init_polling(config *cfg);
81 static void open_ports(config *cfg);
82 static void handle_incoming_connections(config *cfg, int available);
83 static void handle_client_io(config *cfg, int available);
84 static void listen_loop_poll(config *cfg);
85 static void register_client(config *cfg, int cfd,
86  struct sockaddr *addr, socklen_t addr_len);
87 static void disconnect_client(config *cfg, int fd);
88 static void enqueue_write(config *cfg, int fd,
89  uint8_t *buf, size_t write_size);
90 
91 static void usage(void) {
92  fprintf(stderr,
93  "Usage: echosrv [-l LOW_PORT] [-h HIGH_PORT] [-v] \n"
94  " If only one of -l or -h are specified, it will use just that one port.\n"
95  " -v can be used multiple times to increase verbosity.\n");
96  exit(1);
97 }
98 
99 static void parse_args(int argc, char **argv, config *cfg) {
100  int a = 0;
101 
102  while ((a = getopt(argc, argv, "l:h:v")) != -1) {
103  switch (a) {
104  case 'l': /* low port */
105  cfg->port_low = atoi(optarg);
106  break;
107  case 'h': /* high port */
108  cfg->port_high = atoi(optarg);
109  break;
110  case 'v': /* verbosity */
111  cfg->verbosity++;
112  break;
113  default:
114  fprintf(stderr, "illegal option: -- %c\n", a);
115  usage();
116  }
117  }
118 
119  if (cfg->port_low == 0) { cfg->port_low = cfg->port_high; }
120  if (cfg->port_high == 0) { cfg->port_high = cfg->port_low; }
121  if (cfg->port_high < cfg->port_low || cfg->port_low == 0) { usage(); }
122  if (cfg->verbosity > 0) { printf("verbosity: %d\n", cfg->verbosity); }
123 }
124 
125 int main(int argc, char **argv) {
126  config cfg;
127  memset(&cfg, 0, sizeof(cfg));
128  parse_args(argc, argv, &cfg);
129 
130  init_polling(&cfg);
131  open_ports(&cfg);
132  listen_loop_poll(&cfg);
133 
134  return 0;
135 }
136 
137 static void init_polling(config *cfg) {
138  cfg->port_count = cfg->port_high - cfg->port_low + 1;
139 
140  size_t accept_fds_sz = cfg->port_count * sizeof(struct pollfd);
141  struct pollfd *accept_fds = malloc(accept_fds_sz);
142  assert(accept_fds);
143  memset(accept_fds, 0, accept_fds_sz);
144  cfg->accept_fds = accept_fds;
145 
146  size_t client_fds_sz = MAX_CLIENTS * sizeof(struct pollfd);
147  struct pollfd *client_fds = malloc(client_fds_sz);
148  assert(client_fds);
149  memset(client_fds, 0, client_fds_sz);
150  cfg->client_fds = client_fds;
151 
152  for (int i = 0; i < MAX_CLIENTS; i++) {
153  cfg->client_fds[i].fd = NO_CLIENT;
154  }
155 
156  size_t out_bufs_sz = MAX_CLIENTS * sizeof(out_buf);
157  cfg->out_bufs = malloc(out_bufs_sz);
158  assert(cfg->out_bufs);
159  memset(cfg->out_bufs, 0, out_bufs_sz);
160 }
161 
162 static void open_ports(config *cfg) {
163  socket99_config scfg = {
164  .host = "127.0.0.1",
165  .port = 0,
166  .server = true,
167  .nonblocking = true,
168  };
169 
170  socket99_result res;
171 
172  for (int i = 0; i < cfg->port_count; i++) {
173  scfg.port = i + cfg->port_low;
174  bool ok = socket99_open(&scfg, &res);
175  if (!ok) {
176  fprintf(stderr, "Error opening port %d: ", i + cfg->port_low);
177  socket99_fprintf(stderr, &res);
178  exit(1);
179  } else {
180  cfg->accept_fds[i].fd = res.fd;
181  cfg->accept_fds[i].events = (POLLIN);
182  LOG(2, " -- Accepting on %s:%d\n", scfg.host, scfg.port);
183  }
184  }
185 }
186 
187 #define MAX_TIMEOUT 1000
188 
189 static void tick_handler(config *cfg) {
190  cfg->ticks++;
191  LOG(1, "%ld -- client_count: %d, successful writes: %d (avg %d/sec, delta %d)\n",
192  cfg->last_second, cfg->client_count, cfg->successful_writes,
193  cfg->successful_writes / cfg->ticks,
194  cfg->successful_writes - cfg->last_successful_writes);
195  cfg->last_successful_writes = cfg->successful_writes;
196 }
197 
198 static void listen_loop_poll(config *cfg) {
199  struct timeval tv;
200 
201  if (!Util_Timestamp(&tv, true)) { assert(false); }
202  cfg->last_second = tv.tv_sec;
203 
204  assert(cfg->client_fds[0].fd == NO_CLIENT);
205 
206  int delay = 1;
207 
208  for (;;) {
209  if (!Util_Timestamp(&tv, true)) { assert(false); }
210  if (tv.tv_sec != cfg->last_second) {
211  tick_handler(cfg);
212  cfg->last_second = tv.tv_sec;
213  }
214 
215  int accept_delay = 0;
216  int client_delay = 0;
217 
218  if (cfg->client_count > 0) {
219  client_delay = delay;
220  } else if (cfg->client_count < MAX_CLIENTS) {
221  accept_delay = delay;
222  } else {
223  assert(false);
224  }
225 
226  if (cfg->client_count < MAX_CLIENTS) {
227  /* Listen for incoming connections */
228  int res = poll(cfg->accept_fds, cfg->port_count, accept_delay);
229  LOG((res == 0 ? 6 : 3), "accept poll res %d\n", res);
230 
231  if (res == -1) {
232  if (Util_IsResumableIOError(errno)) {
233  errno = 0;
234  } else {
235  err(1, "poll");
236  }
237  } else if (res == 0) {
238  /* nothing */
239  } else {
240  handle_incoming_connections(cfg, res);
241  delay = 0;
242  }
243  }
244 
245  if (cfg->client_count > 0) {
246  int res = poll(cfg->client_fds, cfg->client_count, client_delay);
247  delay <<= 1;
248  LOG((res == 0 ? 6 : 3), "client poll res %d\n", res);
249  /* Read / write to clients */
250  if (res == -1) {
251  if (Util_IsResumableIOError(errno)) {
252  errno = 0;
253  } else {
254  err(1, "poll");
255  }
256  } else if (res == 0) {
257  /* nothing */
258  } else {
259  LOG(3, "poll(client_fds, %d) => res of %d\n",
260  cfg->client_count, res);
261  handle_client_io(cfg, res);
262  delay = 0;
263  }
264  }
265 
266  if (delay == 0) {
267  delay = 1;
268  } else {
269  delay *= 2;
270  if (delay > MAX_TIMEOUT) { delay = MAX_TIMEOUT; }
271  }
272  }
273 }
274 
275 static void handle_incoming_connections(config *cfg, int available) {
276  int checked = 0;
277  for (int i = 0; i < cfg->port_count; i++) {
278  if (checked == available) { break; }
279  struct pollfd *fd = &cfg->accept_fds[i];
280  if ((fd->revents & POLLERR) || (fd->revents & POLLHUP)) {
281  assert(false);
282  } else if (fd->revents & POLLIN) {
283  checked++;
284  struct sockaddr address;
285  socklen_t addr_len;
286  int client_fd = accept(fd->fd, &address, &addr_len);
287  if (client_fd == -1) {
288  if (errno == EWOULDBLOCK) {
289  errno = 0;
290  continue;
291  } else {
292  close(fd->fd);
293  err(1, "listen");
294  //continue;
295  }
296  }
297 
298  LOG(2, "accepting client %d\n", client_fd);
299  register_client(cfg, client_fd, &address, addr_len);
300  }
301  }
302 }
303 
304 static void register_client(config *cfg, int cfd,
305  struct sockaddr *addr, socklen_t addr_len) {
306 
307  /* assign to first empty slot */
308  int client_index = 0;
309  for (client_index = 0; client_index < MAX_CLIENTS; client_index++) {
310  LOG(4, " -- cfg->client_fds[%d].fd == %d\n", client_index, cfg->client_fds[client_index].fd);
311  if (cfg->client_fds[client_index].fd == NO_CLIENT) { break; }
312  }
313  assert(client_index != MAX_CLIENTS);
314  LOG(3, " -- assigning client in slot %d\n", client_index);
315 
316  out_buf *out = &cfg->out_bufs[client_index];
317  out->fd = cfd;
318  out->out_bytes = 0;
319 
320  int flag = 1;
321  if (0 != setsockopt(cfd, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(int))) {
322  err(1, "setsockopt");
323  }
324 
325  struct pollfd *fd = &cfg->client_fds[client_index];
326  fd->fd = cfd;
327  fd->events = (POLLIN);
328 
329  cfg->client_count++;
330 }
331 
332 static void handle_client_io(config *cfg, int available) {
333  int checked = 0;
334  for (int i = 0; i < cfg->client_count; i++) {
335  if (checked == available) { break; }
336  struct pollfd *fd = &cfg->client_fds[i];
337 
338  LOG(4, "fd[%d]->events 0x%08x ==> revents: 0x%08x\n", i, fd->events, fd->revents);
339 
340  if ((fd->revents & POLLERR) || (fd->revents & POLLHUP)) {
341  LOG(3, "Disconnecting client %d\n", fd->fd);
342  disconnect_client(cfg, fd->fd);
343  } else if (fd->revents & POLLOUT) {
344  checked++;
345  out_buf *buf = &cfg->out_bufs[i];
346  LOG(2, "writing %zd bytes to %d\n", buf->out_bytes, buf->fd);
347  size_t wr_size = buf->out_bytes - buf->written_bytes;
348  ssize_t wres = write(buf->fd, &buf->buf[buf->written_bytes], wr_size);
349  if (wres == -1) {
350  if (Util_IsResumableIOError(errno)) {
351  errno = 0;
352  } else if (errno == EPIPE) {
353  disconnect_client(cfg, fd->fd);
354  } else {
355  err(1, "write");
356  }
357  } else {
358  buf->written_bytes += wres;
359  if (buf->written_bytes == buf->out_bytes) {
360  buf->out_bytes = 0;
361  buf->written_bytes = 0;
362  cfg->successful_writes++;
363  fd->events = POLLIN;
364  }
365  }
366  } else if (fd->revents & POLLIN) {
367  checked++;
368  /* if we can read, then queue up same as a write */
369  out_buf *buf = &cfg->out_bufs[i];
370  ssize_t rres = read(buf->fd, read_buf, BUF_SZ - 1);
371 
372  if (rres == -1) {
373  if (Util_IsResumableIOError(errno)) {
374  errno = 0;
375  } else if (errno == EPIPE) {
376  disconnect_client(cfg, fd->fd);
377  } else {
378  err(1, "read");
379  }
380  } else if (rres > 0) {
381  /* enqueue outgoing write */
382  LOG(2, "%ld -- got %zd bytes\n",
383  cfg->last_second, rres);
384  enqueue_write(cfg, buf->fd, read_buf, rres);
385  } else {
386  LOG(2, "else, rres %zd\n", rres);
387  }
388  }
389  }
390 }
391 
392 static void enqueue_write(config *cfg, int fd,
393  uint8_t *buf, size_t write_size) {
394  assert(write_size <= BUF_SZ);
395 
396  for (int i = 0; i < cfg->client_count; i++) {
397  out_buf *out = &cfg->out_bufs[i];
398  if (fd == out->fd) {
399  buf[write_size] = '\0';
400  LOG(2, "%ld -- enqueing write of %zd bytes\n",
401  cfg->last_second, write_size);
402 
403  size_t free_space = BUF_SZ - out->out_bytes;
404  assert(free_space >= write_size);
405  memcpy(&out->buf[out->out_bytes], buf, write_size);
406  out->out_bytes += write_size;
407 
408  cfg->client_fds[i].events = POLLOUT; /* write only */
409  return;
410  }
411  }
412 
413  assert(false);
414 }
415 
416 static void disconnect_client(config *cfg, int fd) {
417  for (int i = 0; i < MAX_CLIENTS; i++) {
418  if (cfg->client_fds[i].fd == fd) {
419  LOG(3, "disconnecting client %d\n", fd);
420  cfg->client_fds[i].fd = NO_CLIENT;
421  close(fd);
422  cfg->out_bufs[i].fd = NO_CLIENT;
423  cfg->out_bufs[i].out_bytes = 0;
424  cfg->client_count--;
425  return;
426  }
427  }
428 
429  assert(false); /* not found */
430 }
static void disconnect_client(config *cfg, int fd)
Definition: echosrv.c:416
bool Util_IsResumableIOError(int errno_)
Definition: util.c:26
static void register_client(config *cfg, int cfd, struct sockaddr *addr, socklen_t addr_len)
Definition: echosrv.c:304
static void listen_loop_poll(config *cfg)
Definition: echosrv.c:198
static uint8_t read_buf[(2 *1024L *1024)]
Definition: echosrv.c:44
static void tick_handler(config *cfg)
Definition: echosrv.c:189
static void handle_client_io(config *cfg, int available)
Definition: echosrv.c:332
static void usage(void)
Definition: echosrv.c:91
static void handle_incoming_connections(config *cfg, int available)
Definition: echosrv.c:275
static void enqueue_write(config *cfg, int fd, uint8_t *buf, size_t write_size)
Definition: echosrv.c:392
int main(int argc, char **argv)
Definition: echosrv.c:125
static void parse_args(int argc, char **argv, config *cfg)
Definition: echosrv.c:99
#define LOG(VERBOSITY,...)
Definition: echosrv.c:46
static void open_ports(config *cfg)
Definition: echosrv.c:162
static void init_polling(config *cfg)
Definition: echosrv.c:137
#define NO_CLIENT
Definition: echosrv.c:42
#define MAX_TIMEOUT
Definition: echosrv.c:187
bool Util_Timestamp(struct timeval *tv, bool relative)
Definition: util.c:30
#define BUF_SZ
Definition: echosrv.c:39
#define MAX_CLIENTS
Definition: echosrv.c:40