corosync  2.4.2
totemudpu.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2005 MontaVista Software, Inc.
3  * Copyright (c) 2006-2012 Red Hat, Inc.
4  *
5  * All rights reserved.
6  *
7  * Author: Steven Dake (sdake@redhat.com)
8 
9  * This software licensed under BSD license, the text of which follows:
10  *
11  * Redistribution and use in source and binary forms, with or without
12  * modification, are permitted provided that the following conditions are met:
13  *
14  * - Redistributions of source code must retain the above copyright notice,
15  * this list of conditions and the following disclaimer.
16  * - Redistributions in binary form must reproduce the above copyright notice,
17  * this list of conditions and the following disclaimer in the documentation
18  * and/or other materials provided with the distribution.
19  * - Neither the name of the MontaVista Software, Inc. nor the names of its
20  * contributors may be used to endorse or promote products derived from this
21  * software without specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
33  * THE POSSIBILITY OF SUCH DAMAGE.
34  */
35 
36 #include <config.h>
37 
38 #include <assert.h>
39 #include <sys/mman.h>
40 #include <sys/types.h>
41 #include <sys/stat.h>
42 #include <sys/socket.h>
43 #include <netdb.h>
44 #include <sys/un.h>
45 #include <sys/ioctl.h>
46 #include <sys/param.h>
47 #include <netinet/in.h>
48 #include <arpa/inet.h>
49 #include <unistd.h>
50 #include <fcntl.h>
51 #include <stdlib.h>
52 #include <stdio.h>
53 #include <errno.h>
54 #include <sched.h>
55 #include <time.h>
56 #include <sys/time.h>
57 #include <sys/poll.h>
58 #include <sys/uio.h>
59 #include <limits.h>
60 
61 #include <qb/qbdefs.h>
62 #include <qb/qbloop.h>
63 
64 #include <corosync/sq.h>
65 #include <corosync/list.h>
66 #include <corosync/swab.h>
67 #define LOGSYS_UTILS_ONLY 1
68 #include <corosync/logsys.h>
69 #include "totemudpu.h"
70 
71 #include "util.h"
72 #include "totemcrypto.h"
73 
74 #include <nss.h>
75 #include <pk11pub.h>
76 #include <pkcs11.h>
77 #include <prerror.h>
78 
79 #ifndef MSG_NOSIGNAL
80 #define MSG_NOSIGNAL 0
81 #endif
82 
83 #define MCAST_SOCKET_BUFFER_SIZE (TRANSMITS_ALLOWED * FRAME_SIZE_MAX)
84 #define NETIF_STATE_REPORT_UP 1
85 #define NETIF_STATE_REPORT_DOWN 2
86 
87 #define BIND_STATE_UNBOUND 0
88 #define BIND_STATE_REGULAR 1
89 #define BIND_STATE_LOOPBACK 2
90 
92  struct list_head list;
94  int fd;
95  int active;
96 };
97 
100 
102 
104 
106 
108 
109  void *context;
110 
112  void *context,
113  const void *msg,
114  unsigned int msg_len);
115 
117  void *context,
118  const struct totem_ip_address *iface_address);
119 
121 
122  /*
123  * Function and data used to log messages
124  */
126 
128 
130 
132 
134 
136 
138  int level,
139  int subsys,
140  const char *function,
141  const char *file,
142  int line,
143  const char *format,
144  ...)__attribute__((format(printf, 6, 7)));
145 
146  void *udpu_context;
147 
149 
150  struct iovec totemudpu_iov_recv;
151 
153 
155 
157 
159 
161 
163 
164  struct timeval stats_tv_start;
165 
167 
168  int firstrun;
169 
170  qb_loop_timer_handle timer_netif_check_timeout;
171 
172  unsigned int my_memb_entries;
173 
175 
177 
179 
181 
182  qb_loop_timer_handle timer_merge_detect_timeout;
183 
185 
187 };
188 
189 struct work_item {
190  const void *msg;
191  unsigned int msg_len;
193 };
194 
195 static int totemudpu_build_sockets (
196  struct totemudpu_instance *instance,
197  struct totem_ip_address *bindnet_address,
198  struct totem_ip_address *bound_to);
199 
200 static int totemudpu_create_sending_socket(
201  void *udpu_context,
202  const struct totem_ip_address *member);
203 
205  void *udpu_context);
206 
207 static void totemudpu_start_merge_detect_timeout(
208  void *udpu_context);
209 
210 static void totemudpu_stop_merge_detect_timeout(
211  void *udpu_context);
212 
213 static struct totem_ip_address localhost;
214 
215 static void totemudpu_instance_initialize (struct totemudpu_instance *instance)
216 {
217  memset (instance, 0, sizeof (struct totemudpu_instance));
218 
220 
221  instance->totemudpu_iov_recv.iov_base = instance->iov_buffer;
222 
223  instance->totemudpu_iov_recv.iov_len = FRAME_SIZE_MAX; //sizeof (instance->iov_buffer);
224 
225  /*
226  * There is always at least 1 processor
227  */
228  instance->my_memb_entries = 1;
229 
230  list_init (&instance->member_list);
231 }
232 
/*
 * Log a printf-style message through the totemudpu_log_printf callback of
 * the `instance` variable, which must be in scope at the call site. The
 * subsystem id, function name, file and line are filled in automatically.
 *
 * The expansion is a single do/while(0) statement WITHOUT a trailing
 * semicolon: the caller supplies the ';'. This keeps the macro usable as
 * the body of an unbraced if/else.
 */
#define log_printf(level, format, args...)				\
do {									\
	instance->totemudpu_log_printf (				\
		level, instance->totemudpu_subsys_id,			\
		__FUNCTION__, __FILE__, __LINE__,			\
		(const char *)format, ##args);				\
} while (0)
/*
 * Log a message followed by ": <error text> (<error number>)".
 *
 * The error text comes from qb_strerror_r(), which is handed a scratch
 * buffer local to the expansion. Output goes through the
 * totemudpu_log_printf callback of the `instance` variable, which must be
 * in scope at the call site. `fmt` must be a string literal because the
 * suffix is attached by literal concatenation.
 */
#define LOGSYS_PERROR(err_num, level, fmt, args...)				\
do {										\
	char _perror_buf[LOGSYS_MAX_PERROR_MSG_LEN];				\
	const char *_perror_txt =						\
		qb_strerror_r(err_num, _perror_buf, sizeof(_perror_buf));	\
	instance->totemudpu_log_printf (					\
		level, instance->totemudpu_subsys_id,				\
		__FUNCTION__, __FILE__, __LINE__,				\
		fmt ": %s (%d)", ##args, _perror_txt, err_num);			\
} while (0)
249 
251  void *udpu_context,
252  const char *cipher_type,
253  const char *hash_type)
254 {
255 
256  return (0);
257 }
258 
259 
260 static inline void ucast_sendmsg (
261  struct totemudpu_instance *instance,
262  struct totem_ip_address *system_to,
263  const void *msg,
264  unsigned int msg_len)
265 {
266  struct msghdr msg_ucast;
267  int res = 0;
268  size_t buf_out_len;
269  unsigned char buf_out[FRAME_SIZE_MAX];
270  struct sockaddr_storage sockaddr;
271  struct iovec iovec;
272  int addrlen;
273 
274  /*
275  * Encrypt and digest the message
276  */
278  instance->crypto_inst,
279  (const unsigned char *)msg,
280  msg_len,
281  buf_out,
282  &buf_out_len) != 0) {
283  log_printf(LOGSYS_LEVEL_CRIT, "Error encrypting/signing packet (non-critical)");
284  return;
285  }
286 
287  iovec.iov_base = (void *)buf_out;
288  iovec.iov_len = buf_out_len;
289 
290  /*
291  * Build unicast message
292  */
294  instance->totem_interface->ip_port, &sockaddr, &addrlen);
295  memset(&msg_ucast, 0, sizeof(msg_ucast));
296  msg_ucast.msg_name = &sockaddr;
297  msg_ucast.msg_namelen = addrlen;
298  msg_ucast.msg_iov = (void *)&iovec;
299  msg_ucast.msg_iovlen = 1;
300 #ifdef HAVE_MSGHDR_CONTROL
301  msg_ucast.msg_control = 0;
302 #endif
303 #ifdef HAVE_MSGHDR_CONTROLLEN
304  msg_ucast.msg_controllen = 0;
305 #endif
306 #ifdef HAVE_MSGHDR_FLAGS
307  msg_ucast.msg_flags = 0;
308 #endif
309 #ifdef HAVE_MSGHDR_ACCRIGHTS
310  msg_ucast.msg_accrights = NULL;
311 #endif
312 #ifdef HAVE_MSGHDR_ACCRIGHTSLEN
313  msg_ucast.msg_accrightslen = 0;
314 #endif
315 
316 
317  /*
318  * Transmit unicast message
319  * An error here is recovered by totemsrp
320  */
321  res = sendmsg (instance->token_socket, &msg_ucast, MSG_NOSIGNAL);
322  if (res < 0) {
323  LOGSYS_PERROR (errno, instance->totemudpu_log_level_debug,
324  "sendmsg(ucast) failed (non-critical)");
325  }
326 }
327 
328 static inline void mcast_sendmsg (
329  struct totemudpu_instance *instance,
330  const void *msg,
331  unsigned int msg_len,
332  int only_active)
333 {
334  struct msghdr msg_mcast;
335  int res = 0;
336  size_t buf_out_len;
337  unsigned char buf_out[FRAME_SIZE_MAX];
338  struct iovec iovec;
339  struct sockaddr_storage sockaddr;
340  int addrlen;
341  struct list_head *list;
342  struct totemudpu_member *member;
343 
344  /*
345  * Encrypt and digest the message
346  */
348  instance->crypto_inst,
349  (const unsigned char *)msg,
350  msg_len,
351  buf_out,
352  &buf_out_len) != 0) {
353  log_printf(LOGSYS_LEVEL_CRIT, "Error encrypting/signing packet (non-critical)");
354  return;
355  }
356 
357  iovec.iov_base = (void *)buf_out;
358  iovec.iov_len = buf_out_len;
359 
360  memset(&msg_mcast, 0, sizeof(msg_mcast));
361  /*
362  * Build multicast message
363  */
364  for (list = instance->member_list.next;
365  list != &instance->member_list;
366  list = list->next) {
367 
368  member = list_entry (list,
369  struct totemudpu_member,
370  list);
371 
372  /*
373  * Do not send the multicast message if the message is not "flush", the member
374  * is inactive and the timeout for sending the merge message hasn't expired.
375  */
376  if (only_active && !member->active && !instance->send_merge_detect_message)
377  continue ;
378 
380  instance->totem_interface->ip_port, &sockaddr, &addrlen);
381  msg_mcast.msg_name = &sockaddr;
382  msg_mcast.msg_namelen = addrlen;
383  msg_mcast.msg_iov = (void *)&iovec;
384  msg_mcast.msg_iovlen = 1;
385  #ifdef HAVE_MSGHDR_CONTROL
386  msg_mcast.msg_control = 0;
387  #endif
388  #ifdef HAVE_MSGHDR_CONTROLLEN
389  msg_mcast.msg_controllen = 0;
390  #endif
391  #ifdef HAVE_MSGHDR_FLAGS
392  msg_mcast.msg_flags = 0;
393  #endif
394  #ifdef HAVE_MSGHDR_ACCRIGHTS
395  msg_mcast.msg_accrights = NULL;
396  #endif
397  #ifdef HAVE_MSGHDR_ACCRIGHTSLEN
398  msg_mcast.msg_accrightslen = 0;
399  #endif
400 
401  /*
402  * Transmit multicast message
403  * An error here is recovered by totemsrp
404  */
405  res = sendmsg (member->fd, &msg_mcast, MSG_NOSIGNAL);
406  if (res < 0) {
407  LOGSYS_PERROR (errno, instance->totemudpu_log_level_debug,
408  "sendmsg(mcast) failed (non-critical)");
409  }
410  }
411 
412  if (!only_active || instance->send_merge_detect_message) {
413  /*
414  * Current message was sent to all nodes
415  */
417  instance->send_merge_detect_message = 0;
418  }
419 }
420 
422  void *udpu_context)
423 {
424  struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context;
425  int res = 0;
426 
427  if (instance->token_socket > 0) {
428  qb_loop_poll_del (instance->totemudpu_poll_handle,
429  instance->token_socket);
430  close (instance->token_socket);
431  }
432 
433  totemudpu_stop_merge_detect_timeout(instance);
434 
435  return (res);
436 }
437 
438 static int net_deliver_fn (
439  int fd,
440  int revents,
441  void *data)
442 {
443  struct totemudpu_instance *instance = (struct totemudpu_instance *)data;
444  struct msghdr msg_recv;
445  struct iovec *iovec;
446  struct sockaddr_storage system_from;
447  int bytes_received;
448  int res = 0;
449 
450  iovec = &instance->totemudpu_iov_recv;
451 
452  /*
453  * Receive datagram
454  */
455  msg_recv.msg_name = &system_from;
456  msg_recv.msg_namelen = sizeof (struct sockaddr_storage);
457  msg_recv.msg_iov = iovec;
458  msg_recv.msg_iovlen = 1;
459 #ifdef HAVE_MSGHDR_CONTROL
460  msg_recv.msg_control = 0;
461 #endif
462 #ifdef HAVE_MSGHDR_CONTROLLEN
463  msg_recv.msg_controllen = 0;
464 #endif
465 #ifdef HAVE_MSGHDR_FLAGS
466  msg_recv.msg_flags = 0;
467 #endif
468 #ifdef HAVE_MSGHDR_ACCRIGHTS
469  msg_recv.msg_accrights = NULL;
470 #endif
471 #ifdef HAVE_MSGHDR_ACCRIGHTSLEN
472  msg_recv.msg_accrightslen = 0;
473 #endif
474 
475  bytes_received = recvmsg (fd, &msg_recv, MSG_NOSIGNAL | MSG_DONTWAIT);
476  if (bytes_received == -1) {
477  return (0);
478  } else {
479  instance->stats_recv += bytes_received;
480  }
481 
482  /*
483  * Authenticate and if authenticated, decrypt datagram
484  */
485 
486  res = crypto_authenticate_and_decrypt (instance->crypto_inst, iovec->iov_base, &bytes_received);
487  if (res == -1) {
488  log_printf (instance->totemudpu_log_level_security, "Received message has invalid digest... ignoring.");
490  "Invalid packet data");
491  iovec->iov_len = FRAME_SIZE_MAX;
492  return 0;
493  }
494  iovec->iov_len = bytes_received;
495 
496  /*
497  * Handle incoming message
498  */
499  instance->totemudpu_deliver_fn (
500  instance->context,
501  iovec->iov_base,
502  iovec->iov_len);
503 
504  iovec->iov_len = FRAME_SIZE_MAX;
505  return (0);
506 }
507 
508 static int netif_determine (
509  struct totemudpu_instance *instance,
510  struct totem_ip_address *bindnet,
511  struct totem_ip_address *bound_to,
512  int *interface_up,
513  int *interface_num)
514 {
515  int res;
516 
517  res = totemip_iface_check (bindnet, bound_to,
518  interface_up, interface_num,
519  instance->totem_config->clear_node_high_bit);
520 
521 
522  return (res);
523 }
524 
525 
526 /*
527  * If the interface is up, the sockets for totem are built. If the interface is down
528  * this function is requeued in the timer list to retry building the sockets later.
529  */
530 static void timer_function_netif_check_timeout (
531  void *data)
532 {
533  struct totemudpu_instance *instance = (struct totemudpu_instance *)data;
534  int interface_up;
535  int interface_num;
536  struct totem_ip_address *bind_address;
537 
538  /*
539  * Build sockets for every interface
540  */
541  netif_determine (instance,
542  &instance->totem_interface->bindnet,
543  &instance->totem_interface->boundto,
544  &interface_up, &interface_num);
545  /*
546  * If the network interface isn't back up and we are already
547  * in loopback mode, add timer to check again and return
548  */
549  if ((instance->netif_bind_state == BIND_STATE_LOOPBACK &&
550  interface_up == 0) ||
551 
552  (instance->my_memb_entries == 1 &&
553  instance->netif_bind_state == BIND_STATE_REGULAR &&
554  interface_up == 1)) {
555 
556  qb_loop_timer_add (instance->totemudpu_poll_handle,
557  QB_LOOP_MED,
558  instance->totem_config->downcheck_timeout*QB_TIME_NS_IN_MSEC,
559  (void *)instance,
560  timer_function_netif_check_timeout,
561  &instance->timer_netif_check_timeout);
562 
563  /*
564  * Add a timer to check for a downed regular interface
565  */
566  return;
567  }
568 
569  if (instance->token_socket > 0) {
570  qb_loop_poll_del (instance->totemudpu_poll_handle,
571  instance->token_socket);
572  close (instance->token_socket);
573  }
574 
575  if (interface_up == 0) {
576  /*
577  * Interface is not up
578  */
580  bind_address = &localhost;
581 
582  /*
583  * Add a timer to retry building interfaces and request memb_gather_enter
584  */
585  qb_loop_timer_add (instance->totemudpu_poll_handle,
586  QB_LOOP_MED,
587  instance->totem_config->downcheck_timeout*QB_TIME_NS_IN_MSEC,
588  (void *)instance,
589  timer_function_netif_check_timeout,
590  &instance->timer_netif_check_timeout);
591  } else {
592  /*
593  * Interface is up
594  */
596  bind_address = &instance->totem_interface->bindnet;
597  }
598  /*
599  * Create and bind the multicast and unicast sockets
600  */
601  totemudpu_build_sockets (instance,
602  bind_address,
603  &instance->totem_interface->boundto);
604 
605  qb_loop_poll_add (instance->totemudpu_poll_handle,
606  QB_LOOP_MED,
607  instance->token_socket,
608  POLLIN, instance, net_deliver_fn);
609 
610  totemip_copy (&instance->my_id, &instance->totem_interface->boundto);
611 
612  /*
613  * This reports changes in the interface to the user and totemsrp
614  */
615  if (instance->netif_bind_state == BIND_STATE_REGULAR) {
616  if (instance->netif_state_report & NETIF_STATE_REPORT_UP) {
618  "The network interface [%s] is now up.",
619  totemip_print (&instance->totem_interface->boundto));
621  instance->totemudpu_iface_change_fn (instance->context, &instance->my_id);
622  }
623  /*
624  * Add a timer to check for interface going down in single membership
625  */
626  if (instance->my_memb_entries == 1) {
627  qb_loop_timer_add (instance->totemudpu_poll_handle,
628  QB_LOOP_MED,
629  instance->totem_config->downcheck_timeout*QB_TIME_NS_IN_MSEC,
630  (void *)instance,
631  timer_function_netif_check_timeout,
632  &instance->timer_netif_check_timeout);
633  }
634 
635  } else {
638  "The network interface is down.");
639  instance->totemudpu_iface_change_fn (instance->context, &instance->my_id);
640  }
642 
643  }
644 }
645 
/*
 * Request socket priority 6 (TC_PRIO_INTERACTIVE) on `sock` so that our
 * messages don't get queued behind anything else.
 *
 * The body is compiled out when SO_PRIORITY is not defined. A setsockopt()
 * failure is only logged at warning level; nothing is returned to the caller.
 */
static void totemudpu_traffic_control_set(struct totemudpu_instance *instance, int sock)
{
#ifdef SO_PRIORITY
	const int interactive_prio = 6; /* TC_PRIO_INTERACTIVE */
	int rc;

	rc = setsockopt(sock, SOL_SOCKET, SO_PRIORITY,
		&interactive_prio, sizeof(interactive_prio));
	if (rc != 0) {
		LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning,
			"Could not set traffic priority");
	}
#endif
}
659 
660 static int totemudpu_build_sockets_ip (
661  struct totemudpu_instance *instance,
662  struct totem_ip_address *bindnet_address,
663  struct totem_ip_address *bound_to,
664  int interface_num)
665 {
666  struct sockaddr_storage sockaddr;
667  int addrlen;
668  int res;
669  unsigned int recvbuf_size;
670  unsigned int optlen = sizeof (recvbuf_size);
671 
672  /*
673  * Setup unicast socket
674  */
675  instance->token_socket = socket (bindnet_address->family, SOCK_DGRAM, 0);
676  if (instance->token_socket == -1) {
677  LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning,
678  "socket() failed");
679  return (-1);
680  }
681 
682  totemip_nosigpipe (instance->token_socket);
683  res = fcntl (instance->token_socket, F_SETFL, O_NONBLOCK);
684  if (res == -1) {
685  LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning,
686  "Could not set non-blocking operation on token socket");
687  return (-1);
688  }
689 
690  /*
691  * Bind to unicast socket used for token send/receives
692  * This has the side effect of binding to the correct interface
693  */
694  totemip_totemip_to_sockaddr_convert(bound_to, instance->totem_interface->ip_port, &sockaddr, &addrlen);
695  res = bind (instance->token_socket, (struct sockaddr *)&sockaddr, addrlen);
696  if (res == -1) {
697  LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning,
698  "bind token socket failed");
699  return (-1);
700  }
701 
702  /*
703  * the token_socket can receive many messages. Allow a large number
704  * of receive messages on this socket
705  */
706  recvbuf_size = MCAST_SOCKET_BUFFER_SIZE;
707  res = setsockopt (instance->token_socket, SOL_SOCKET, SO_RCVBUF,
708  &recvbuf_size, optlen);
709  if (res == -1) {
710  LOGSYS_PERROR (errno, instance->totemudpu_log_level_notice,
711  "Could not set recvbuf size");
712  }
713 
714  return 0;
715 }
716 
717 static int totemudpu_build_sockets (
718  struct totemudpu_instance *instance,
719  struct totem_ip_address *bindnet_address,
720  struct totem_ip_address *bound_to)
721 {
722  int interface_num;
723  int interface_up;
724  int res;
725 
726  /*
727  * Determine the ip address bound to and the interface name
728  */
729  res = netif_determine (instance,
730  bindnet_address,
731  bound_to,
732  &interface_up,
733  &interface_num);
734 
735  if (res == -1) {
736  return (-1);
737  }
738 
739  totemip_copy(&instance->my_id, bound_to);
740 
741  res = totemudpu_build_sockets_ip (instance,
742  bindnet_address, bound_to, interface_num);
743 
744  /* We only send out of the token socket */
745  totemudpu_traffic_control_set(instance, instance->token_socket);
746 
747  /*
748  * Rebind all members to new ips
749  */
751 
752  return res;
753 }
754 
755 /*
756  * Totem Network interface - also does encryption/decryption
757  * depends on poll abstraction, POSIX, IPV4
758  */
759 
760 /*
761  * Create an instance
762  */
764  qb_loop_t *poll_handle,
765  void **udpu_context,
766  struct totem_config *totem_config,
767  totemsrp_stats_t *stats,
768  int interface_no,
769  void *context,
770 
771  void (*deliver_fn) (
772  void *context,
773  const void *msg,
774  unsigned int msg_len),
775 
776  void (*iface_change_fn) (
777  void *context,
778  const struct totem_ip_address *iface_address),
779 
780  void (*target_set_completed) (
781  void *context))
782 {
783  struct totemudpu_instance *instance;
784 
785  instance = malloc (sizeof (struct totemudpu_instance));
786  if (instance == NULL) {
787  return (-1);
788  }
789 
790  totemudpu_instance_initialize (instance);
791 
792  instance->totem_config = totem_config;
793  instance->stats = stats;
794 
795  /*
796  * Configure logging
797  */
798  instance->totemudpu_log_level_security = 1; //totem_config->totem_logging_configuration.log_level_security;
805 
806  /*
807  * Initialize random number generator for later use to generate salt
808  */
809  instance->crypto_inst = crypto_init (totem_config->private_key,
810  totem_config->private_key_len,
811  totem_config->crypto_cipher_type,
812  totem_config->crypto_hash_type,
813  instance->totemudpu_log_printf,
815  instance->totemudpu_log_level_notice,
816  instance->totemudpu_log_level_error,
817  instance->totemudpu_subsys_id);
818  if (instance->crypto_inst == NULL) {
819  free(instance);
820  return (-1);
821  }
822  /*
823  * Initialize local variables for totemudpu
824  */
825  instance->totem_interface = &totem_config->interfaces[interface_no];
826  memset (instance->iov_buffer, 0, FRAME_SIZE_MAX);
827 
828  instance->totemudpu_poll_handle = poll_handle;
829 
830  instance->totem_interface->bindnet.nodeid = instance->totem_config->node_id;
831 
832  instance->context = context;
833  instance->totemudpu_deliver_fn = deliver_fn;
834 
835  instance->totemudpu_iface_change_fn = iface_change_fn;
836 
837  instance->totemudpu_target_set_completed = target_set_completed;
838 
839  totemip_localhost (AF_INET, &localhost);
840  localhost.nodeid = instance->totem_config->node_id;
841 
842  /*
843  * RRP layer isn't ready to receive message because it hasn't
844  * initialized yet. Add short timer to check the interfaces.
845  */
846  qb_loop_timer_add (instance->totemudpu_poll_handle,
847  QB_LOOP_MED,
848  100*QB_TIME_NS_IN_MSEC,
849  (void *)instance,
850  timer_function_netif_check_timeout,
851  &instance->timer_netif_check_timeout);
852 
853  totemudpu_start_merge_detect_timeout(instance);
854 
855  *udpu_context = instance;
856  return (0);
857 }
858 
860 {
861  return malloc (FRAME_SIZE_MAX);
862 }
863 
/*
 * Release a heap buffer by handing it to free().
 *
 * Passing NULL is harmless because free(NULL) is a no-op. The call is a
 * plain statement rather than `return free (ptr);`: ISO C does not allow
 * a return statement with an expression in a function returning void.
 */
void totemudpu_buffer_release (void *ptr)
{
	free (ptr);
}
868 
870  void *udpu_context,
871  int processor_count)
872 {
873  struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context;
874  int res = 0;
875 
876  instance->my_memb_entries = processor_count;
877  qb_loop_timer_del (instance->totemudpu_poll_handle,
878  instance->timer_netif_check_timeout);
879  if (processor_count == 1) {
880  qb_loop_timer_add (instance->totemudpu_poll_handle,
881  QB_LOOP_MED,
882  instance->totem_config->downcheck_timeout*QB_TIME_NS_IN_MSEC,
883  (void *)instance,
884  timer_function_netif_check_timeout,
885  &instance->timer_netif_check_timeout);
886  }
887 
888  return (res);
889 }
890 
/*
 * Receive-side flush entry point. Nothing is done here: the context
 * argument is ignored and 0 is always returned.
 */
int totemudpu_recv_flush (void *udpu_context)
{
	(void)udpu_context;

	return (0);
}
897 
/*
 * Send-side flush entry point. Nothing is done here: the context
 * argument is ignored and 0 is always returned.
 */
int totemudpu_send_flush (void *udpu_context)
{
	(void)udpu_context;

	return (0);
}
904 
906  void *udpu_context,
907  const void *msg,
908  unsigned int msg_len)
909 {
910  struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context;
911  int res = 0;
912 
913  ucast_sendmsg (instance, &instance->token_target, msg, msg_len);
914 
915  return (res);
916 }
918  void *udpu_context,
919  const void *msg,
920  unsigned int msg_len)
921 {
922  struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context;
923  int res = 0;
924 
925  mcast_sendmsg (instance, msg, msg_len, 0);
926 
927  return (res);
928 }
929 
931  void *udpu_context,
932  const void *msg,
933  unsigned int msg_len)
934 {
935  struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context;
936  int res = 0;
937 
938  mcast_sendmsg (instance, msg, msg_len, 1);
939 
940  return (res);
941 }
942 
/*
 * Run the network-interface check right away by calling the timer
 * callback timer_function_netif_check_timeout() directly with this
 * instance. Always returns 0.
 */
extern int totemudpu_iface_check (void *udpu_context)
{
	timer_function_netif_check_timeout (
		(struct totemudpu_instance *)udpu_context);

	return (0);
}
952 
953 extern void totemudpu_net_mtu_adjust (void *udpu_context, struct totem_config *totem_config)
954 {
955 
956  assert(totem_config->interface_count > 0);
957 
958  totem_config->net_mtu -= crypto_sec_header_size(totem_config->crypto_cipher_type,
959  totem_config->crypto_hash_type) +
961 }
962 
963 const char *totemudpu_iface_print (void *udpu_context) {
964  struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context;
965  const char *ret_char;
966 
967  ret_char = totemip_print (&instance->my_id);
968 
969  return (ret_char);
970 }
971 
973  void *udpu_context,
974  struct totem_ip_address *addr)
975 {
976  struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context;
977  int res = 0;
978 
979  memcpy (addr, &instance->my_id, sizeof (struct totem_ip_address));
980 
981  return (res);
982 }
983 
985  void *udpu_context,
986  const struct totem_ip_address *token_target)
987 {
988  struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context;
989  int res = 0;
990 
991  memcpy (&instance->token_target, token_target,
992  sizeof (struct totem_ip_address));
993 
994  instance->totemudpu_target_set_completed (instance->context);
995 
996  return (res);
997 }
998 
1000  void *udpu_context)
1001 {
1002  struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context;
1003  unsigned int res;
1004  struct sockaddr_storage system_from;
1005  struct msghdr msg_recv;
1006  struct pollfd ufd;
1007  int nfds;
1008  int msg_processed = 0;
1009 
1010  /*
1011  * Receive datagram
1012  */
1013  msg_recv.msg_name = &system_from;
1014  msg_recv.msg_namelen = sizeof (struct sockaddr_storage);
1015  msg_recv.msg_iov = &instance->totemudpu_iov_recv;
1016  msg_recv.msg_iovlen = 1;
1017 #ifdef HAVE_MSGHDR_CONTROL
1018  msg_recv.msg_control = 0;
1019 #endif
1020 #ifdef HAVE_MSGHDR_CONTROLLEN
1021  msg_recv.msg_controllen = 0;
1022 #endif
1023 #ifdef HAVE_MSGHDR_FLAGS
1024  msg_recv.msg_flags = 0;
1025 #endif
1026 #ifdef HAVE_MSGHDR_ACCRIGHTS
1027  msg_recv.msg_accrights = NULL;
1028 #endif
1029 #ifdef HAVE_MSGHDR_ACCRIGHTSLEN
1030  msg_recv.msg_accrightslen = 0;
1031 #endif
1032 
1033  do {
1034  ufd.fd = instance->token_socket;
1035  ufd.events = POLLIN;
1036  nfds = poll (&ufd, 1, 0);
1037  if (nfds == 1 && ufd.revents & POLLIN) {
1038  res = recvmsg (instance->token_socket, &msg_recv, MSG_NOSIGNAL | MSG_DONTWAIT);
1039  if (res != -1) {
1040  msg_processed = 1;
1041  } else {
1042  msg_processed = -1;
1043  }
1044  }
1045  } while (nfds == 1);
1046 
1047  return (msg_processed);
1048 }
1049 
1050 static int totemudpu_create_sending_socket(
1051  void *udpu_context,
1052  const struct totem_ip_address *member)
1053 {
1054  struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context;
1055  int fd;
1056  int res;
1057  unsigned int sendbuf_size;
1058  unsigned int optlen = sizeof (sendbuf_size);
1059  struct sockaddr_storage sockaddr;
1060  int addrlen;
1061 
1062  fd = socket (member->family, SOCK_DGRAM, 0);
1063  if (fd == -1) {
1064  LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning,
1065  "Could not create socket for new member");
1066  return (-1);
1067  }
1068  totemip_nosigpipe (fd);
1069  res = fcntl (fd, F_SETFL, O_NONBLOCK);
1070  if (res == -1) {
1071  LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning,
1072  "Could not set non-blocking operation on token socket");
1073  goto error_close_fd;
1074  }
1075 
1076  /*
1077  * These sockets are used to send multicast messages, so their buffers
1078  * should be large
1079  */
1080  sendbuf_size = MCAST_SOCKET_BUFFER_SIZE;
1081  res = setsockopt (fd, SOL_SOCKET, SO_SNDBUF,
1082  &sendbuf_size, optlen);
1083  if (res == -1) {
1084  LOGSYS_PERROR (errno, instance->totemudpu_log_level_notice,
1085  "Could not set sendbuf size");
1086  /*
1087  * Fail in setting sendbuf size is not fatal -> don't exit
1088  */
1089  }
1090 
1091  /*
1092  * Bind to sending interface
1093  */
1094  totemip_totemip_to_sockaddr_convert(&instance->my_id, 0, &sockaddr, &addrlen);
1095  res = bind (fd, (struct sockaddr *)&sockaddr, addrlen);
1096  if (res == -1) {
1097  LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning,
1098  "bind token socket failed");
1099  goto error_close_fd;
1100  }
1101 
1102  return (fd);
1103 
1104 error_close_fd:
1105  close(fd);
1106  return (-1);
1107 }
1108 
1110  void *udpu_context,
1111  const struct totem_ip_address *member)
1112 {
1113  struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context;
1114 
1115  struct totemudpu_member *new_member;
1116 
1117  new_member = malloc (sizeof (struct totemudpu_member));
1118  if (new_member == NULL) {
1119  return (-1);
1120  }
1121 
1122  memset(new_member, 0, sizeof(*new_member));
1123 
1124  log_printf (LOGSYS_LEVEL_NOTICE, "adding new UDPU member {%s}",
1125  totemip_print(member));
1126  list_init (&new_member->list);
1127  list_add_tail (&new_member->list, &instance->member_list);
1128  memcpy (&new_member->member, member, sizeof (struct totem_ip_address));
1129  new_member->fd = totemudpu_create_sending_socket(udpu_context, member);
1130  new_member->active = 0;
1131 
1132  return (0);
1133 }
1134 
1136  void *udpu_context,
1137  const struct totem_ip_address *token_target)
1138 {
1139  int found = 0;
1140  struct list_head *list;
1141  struct totemudpu_member *member;
1142 
1143  struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context;
1144 
1145  /*
1146  * Find the member to remove and close its socket
1147  */
1148  for (list = instance->member_list.next;
1149  list != &instance->member_list;
1150  list = list->next) {
1151 
1152  member = list_entry (list,
1153  struct totemudpu_member,
1154  list);
1155 
1156  if (totemip_compare (token_target, &member->member)==0) {
1158  "removing UDPU member {%s}",
1159  totemip_print(&member->member));
1160 
1161  if (member->fd > 0) {
1163  "Closing socket to: {%s}",
1164  totemip_print(&member->member));
1165  qb_loop_poll_del (instance->totemudpu_poll_handle,
1166  member->fd);
1167  close (member->fd);
1168  }
1169  found = 1;
1170  break;
1171  }
1172  }
1173 
1174  /*
1175  * Delete the member from the list
1176  */
1177  if (found) {
1178  list_del (list);
1179  }
1180 
1181  instance = NULL;
1182  return (0);
1183 }
1184 
1186  void *udpu_context)
1187 {
1188  struct list_head *list;
1189  struct totemudpu_member *member;
1190 
1191  struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context;
1192 
1193  for (list = instance->member_list.next;
1194  list != &instance->member_list;
1195  list = list->next) {
1196 
1197  member = list_entry (list,
1198  struct totemudpu_member,
1199  list);
1200 
1201  if (member->fd > 0) {
1202  close (member->fd);
1203  }
1204 
1205  member->fd = totemudpu_create_sending_socket(udpu_context, &member->member);
1206  }
1207 
1208  return (0);
1209 }
1210 
1212  void *udpu_context,
1213  const struct totem_ip_address *member_ip,
1214  int active)
1215 {
1216  struct list_head *list;
1217  struct totemudpu_member *member;
1218  int addr_found = 0;
1219 
1220  struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context;
1221 
1222  /*
1223  * Find the member to set active flag
1224  */
1225  for (list = instance->member_list.next; list != &instance->member_list; list = list->next) {
1226  member = list_entry (list, struct totemudpu_member, list);
1227 
1228  if (totemip_compare (member_ip, &member->member) == 0) {
1230  "Marking UDPU member %s %s",
1231  totemip_print(&member->member),
1232  (active ? "active" : "inactive"));
1233 
1234  member->active = active;
1235  addr_found = 1;
1236 
1237  break;
1238  }
1239  }
1240 
1241  if (!addr_found) {
1243  "Can't find UDPU member %s (should be marked as %s)",
1244  totemip_print(member_ip),
1245  (active ? "active" : "inactive"));
1246  }
1247 
1248  return (0);
1249 }
1250 
1251 static void timer_function_merge_detect_timeout (
1252  void *data)
1253 {
1254  struct totemudpu_instance *instance = (struct totemudpu_instance *)data;
1255 
1256  if (instance->merge_detect_messages_sent_before_timeout == 0) {
1257  instance->send_merge_detect_message = 1;
1258  }
1259 
1261 
1262  totemudpu_start_merge_detect_timeout(instance);
1263 }
1264 
1265 static void totemudpu_start_merge_detect_timeout(
1266  void *udpu_context)
1267 {
1268  struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context;
1269 
1270  qb_loop_timer_add(instance->totemudpu_poll_handle,
1271  QB_LOOP_MED,
1272  instance->totem_config->merge_timeout * 2 * QB_TIME_NS_IN_MSEC,
1273  (void *)instance,
1274  timer_function_merge_detect_timeout,
1275  &instance->timer_merge_detect_timeout);
1276 
1277 }
1278 
1279 static void totemudpu_stop_merge_detect_timeout(
1280  void *udpu_context)
1281 {
1282  struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context;
1283 
1284  qb_loop_timer_del(instance->totemudpu_poll_handle,
1285  instance->timer_merge_detect_timeout);
1286 }
unsigned int clear_node_high_bit
Definition: totem.h:117
unsigned short family
Definition: coroapi.h:113
#define NETIF_STATE_REPORT_UP
Definition: totemudpu.c:84
struct totem_config * totem_config
Definition: totemudpu.c:174
struct totem_ip_address member
Definition: totemudpu.c:93
int totemip_localhost(int family, struct totem_ip_address *localhost)
Definition: totemip.c:182
void(* totemudpu_iface_change_fn)(void *context, const struct totem_ip_address *iface_address)
Definition: totemudpu.c:116
unsigned int my_memb_entries
Definition: totemudpu.c:172
struct totem_interface * interfaces
Definition: totem.h:114
unsigned int interface_count
Definition: totem.h:115
struct list_head * next
Definition: list.h:47
size_t crypto_sec_header_size(const char *crypto_cipher_type, const char *crypto_hash_type)
Definition: totemcrypto.c:662
The totem_ip_address struct.
Definition: coroapi.h:111
int totemudpu_token_target_set(void *udpu_context, const struct totem_ip_address *token_target)
Definition: totemudpu.c:984
const char * totemip_print(const struct totem_ip_address *addr)
Definition: totemip.c:214
struct totem_ip_address my_id
Definition: totemudpu.c:166
struct totemudpu_instance * instance
Definition: totemudpu.c:192
#define NETIF_STATE_REPORT_DOWN
Definition: totemudpu.c:85
int totemip_compare(const void *a, const void *b)
Definition: totemip.c:130
#define log_printf(level, format, args...)
Definition: totemudpu.c:233
unsigned char private_key[TOTEM_PRIVATE_KEY_LEN]
Definition: totem.h:122
int totemudpu_processor_count_set(void *udpu_context, int processor_count)
Definition: totemudpu.c:869
unsigned char addr[TOTEMIP_ADDRLEN]
Definition: coroapi.h:77
int totemudpu_log_level_security
Definition: totemudpu.c:125
struct crypto_instance * crypto_init(const unsigned char *private_key, unsigned int private_key_len, const char *crypto_cipher_type, const char *crypto_hash_type, void(*log_printf_func)(int level, int subsys, const char *function, const char *file, int line, const char *format,...) __attribute__((format(printf, 6, 7))), int log_level_security, int log_level_notice, int log_level_error, int log_subsys_id)
Definition: totemcrypto.c:786
void totemip_copy(struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
Definition: totemip.c:95
unsigned int downcheck_timeout
Definition: totem.h:145
unsigned int private_key_len
Definition: totem.h:124
char iov_buffer[FRAME_SIZE_MAX]
Definition: totemudpu.c:148
qb_loop_timer_handle timer_merge_detect_timeout
Definition: totemudpu.c:182
Definition: list.h:46
int send_merge_detect_message
Definition: totemudpu.c:184
#define totemip_nosigpipe(s)
Definition: totemip.h:56
int totemudpu_log_level_warning
Definition: totemudpu.c:129
int totemudpu_log_level_debug
Definition: totemudpu.c:133
const char * totemudpu_iface_print(void *udpu_context)
Definition: totemudpu.c:963
struct iovec totemudpu_iov_recv
Definition: totemudpu.c:150
unsigned int node_id
Definition: totem.h:116
#define BIND_STATE_REGULAR
Definition: totemudpu.c:88
int totemip_iface_check(struct totem_ip_address *bindnet, struct totem_ip_address *boundto, int *interface_up, int *interface_num, int mask_high_bit)
Definition: totemip.c:405
int crypto_encrypt_and_sign(struct crypto_instance *instance, const unsigned char *buf_in, const size_t buf_in_len, unsigned char *buf_out, size_t *buf_out_len)
Definition: totemcrypto.c:711
void(*) void udpu_context)
Definition: totemudpu.c:144
void totemudpu_buffer_release(void *ptr)
Definition: totemudpu.c:864
void * totemudpu_buffer_alloc(void)
Definition: totemudpu.c:859
unsigned int merge_detect_messages_sent_before_timeout
Definition: totemudpu.c:186
qb_loop_t * totemudpu_poll_handle
Definition: totemudpu.c:101
int totemudpu_mcast_noflush_send(void *udpu_context, const void *msg, unsigned int msg_len)
Definition: totemudpu.c:930
unsigned int nodeid
Definition: coroapi.h:112
void(* totemudpu_log_printf)(int level, int subsys, const char *function, const char *file, int line, const char *format,...) __attribute__((format(printf
Definition: totemudpu.c:137
char * crypto_hash_type
Definition: totem.h:183
totemsrp_stats_t * stats
Definition: totemudpu.c:176
Linked list API.
struct totem_ip_address token_target
Definition: totemudpu.c:178
int totemudpu_crypto_set(void *udpu_context, const char *cipher_type, const char *hash_type)
Definition: totemudpu.c:250
void(* log_printf)(int level, int subsys, const char *function_name, const char *file_name, int file_line, const char *format,...) __attribute__((format(printf
Definition: totem.h:75
#define LOGSYS_LEVEL_DEBUG
Definition: logsys.h:74
void(* totemudpu_target_set_completed)(void *context)
Definition: totemudpu.c:120
struct totem_interface * totem_interface
Definition: totemudpu.c:103
int totemudpu_token_send(void *udpu_context, const void *msg, unsigned int msg_len)
Definition: totemudpu.c:905
struct totem_ip_address boundto
Definition: totem.h:66
typedef __attribute__
size_t totemip_udpip_header_size(int family)
Definition: totemip.c:496
struct list_head member_list
Definition: totemudpu.c:152
uint16_t ip_port
Definition: totem.h:68
struct timeval stats_tv_start
Definition: totemudpu.c:164
qb_loop_timer_handle timer_netif_check_timeout
Definition: totemudpu.c:170
#define BIND_STATE_LOOPBACK
Definition: totemudpu.c:89
unsigned int net_mtu
Definition: totem.h:165
#define MCAST_SOCKET_BUFFER_SIZE
Definition: totemudpu.c:83
int crypto_authenticate_and_decrypt(struct crypto_instance *instance, unsigned char *buf, int *buf_len)
Definition: totemcrypto.c:733
int totemudpu_member_remove(void *udpu_context, const struct totem_ip_address *token_target)
Definition: totemudpu.c:1135
int totemudpu_member_set_active(void *udpu_context, const struct totem_ip_address *member_ip, int active)
Definition: totemudpu.c:1211
void(* totemudpu_deliver_fn)(void *context, const void *msg, unsigned int msg_len)
Definition: totemudpu.c:111
#define FRAME_SIZE_MAX
Definition: totem.h:50
#define LOGSYS_LEVEL_CRIT
Definition: logsys.h:69
const void * msg
Definition: totemudp.c:198
#define list_entry(ptr, type, member)
Definition: list.h:84
int totemip_totemip_to_sockaddr_convert(struct totem_ip_address *ip_addr, uint16_t port, struct sockaddr_storage *saddr, int *addrlen)
Definition: totemip.c:222
struct totem_logging_configuration totem_logging_configuration
Definition: totem.h:163
#define LOGSYS_LEVEL_NOTICE
Definition: logsys.h:72
int totemudpu_recv_mcast_empty(void *udpu_context)
Definition: totemudpu.c:999
int totemudpu_initialize(qb_loop_t *poll_handle, void **udpu_context, struct totem_config *totem_config, totemsrp_stats_t *stats, int interface_no, void *context, void(*deliver_fn)(void *context, const void *msg, unsigned int msg_len), void(*iface_change_fn)(void *context, const struct totem_ip_address *iface_address), void(*target_set_completed)(void *context))
Create an instance.
Definition: totemudpu.c:763
struct srp_addr system_from
Definition: totemsrp.c:61
char * crypto_cipher_type
Definition: totem.h:181
int totemudpu_log_level_error
Definition: totemudpu.c:127
unsigned int merge_timeout
Definition: totem.h:143
void totemudpu_net_mtu_adjust(void *udpu_context, struct totem_config *totem_config)
Definition: totemudpu.c:953
struct totem_ip_address bindnet
Definition: totem.h:65
#define MSG_NOSIGNAL
Definition: totemudpu.c:80
int totemudpu_mcast_flush_send(void *udpu_context, const void *msg, unsigned int msg_len)
Definition: totemudpu.c:917
int totemudpu_send_flush(void *udpu_context)
Definition: totemudpu.c:898
int totemudpu_finalize(void *udpu_context)
Definition: totemudpu.c:421
struct crypto_instance * crypto_inst
Definition: totemudpu.c:99
int totemudpu_recv_flush(void *udpu_context)
Definition: totemudpu.c:891
unsigned int msg_len
Definition: totemudp.c:199
int totemudpu_member_add(void *udpu_context, const struct totem_ip_address *member)
Definition: totemudpu.c:1109
struct list_head list
Definition: totemudpu.c:92
#define LOGSYS_PERROR(err_num, level, fmt, args...)
Definition: totemudpu.c:240
int totemudpu_iface_get(void *udpu_context, struct totem_ip_address *addr)
Definition: totemudpu.c:972
int totemudpu_member_list_rebind_ip(void *udpu_context)
Definition: totemudpu.c:1185
int totemudpu_iface_check(void *udpu_context)
Definition: totemudpu.c:943
int totemudpu_log_level_notice
Definition: totemudpu.c:131