corosync  2.4.6
totempg.c
/*
 * Copyright (c) 2003-2005 MontaVista Software, Inc.
 * Copyright (c) 2005 OSDL.
 * Copyright (c) 2006-2012 Red Hat, Inc.
 *
 * All rights reserved.
 *
 * Author: Steven Dake (sdake@redhat.com)
 * Author: Mark Haverkamp (markh@osdl.org)
 *
 * This software licensed under BSD license, the text of which follows:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * - Redistributions of source code must retain the above copyright notice,
 *   this list of conditions and the following disclaimer.
 * - Redistributions in binary form must reproduce the above copyright notice,
 *   this list of conditions and the following disclaimer in the documentation
 *   and/or other materials provided with the distribution.
 * - Neither the name of the MontaVista Software, Inc. nor the names of its
 *   contributors may be used to endorse or promote products derived from this
 *   software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
 * THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * FRAGMENTATION AND PACKING ALGORITHM:
 *
 * Assemble the entire message into one buffer
 * if full fragment
 *	store fragment into lengths list
 *	for each full fragment
 *		multicast fragment
 *		set length and fragment fields of pg message
 * store remaining multicast into head of fragmentation data and set lens field
 *
 * If a message exceeds the maximum packet size allowed by the totem
 * single ring protocol, the protocol could lose forward progress.
 * Statically calculating the allowed data amount doesn't work because
 * the amount of data allowed depends on the number of fragments in
 * each message. In this implementation, the maximum fragment size
 * is dynamically calculated for each fragment added to the message.
 *
 * It is possible for a message to be two bytes short of the maximum
 * packet size. This occurs when a message or collection of
 * messages + the mcast header + the lens are two bytes short of the
 * end of the packet. Since another len field consumes two bytes, the
 * len field would consume the rest of the packet without room for data.
 *
 * One optimization would be to forgo the final len field and determine
 * it from the size of the UDP datagram. Then this condition would no
 * longer occur.
 */
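
/*
 * Worked example of the arithmetic above (illustrative only; assumes
 * net_mtu = 1500 and an unpadded 8-byte struct totempg_mcast):
 *
 *	TOTEMPG_PACKET_SIZE = 1500 - 8 = 1492 bytes per mcast
 *
 * A 4000-byte application message is then sent as three mcasts: the
 * first two carry full fragments with the "fragmented" field set to
 * the current fragment sequence number, and the third carries the
 * remaining bytes with "fragmented" = 0. The receiver matches the
 * "continuation" values against the previous fragment number to
 * stitch the pieces back together. Each packed message also consumes
 * a two-byte length entry, so the usable payload is slightly less
 * than TOTEMPG_PACKET_SIZE; this is why the maximum fragment size is
 * recalculated per fragment.
 */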

/*
 * ASSEMBLY AND UNPACKING ALGORITHM:
 *
 * copy incoming packet into assembly data buffer indexed by current
 * location of end of fragment
 *
 * if not fragmented
 *	deliver all messages in assembly data buffer
 * else
 *	if msg_count > 1 and fragmented
 *		deliver all messages except last message in assembly data buffer
 *		copy last fragmented section to start of assembly data buffer
 *	else
 *		if msg_count = 1 and fragmented
 *			do nothing
 *
 */
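
/*
 * Unpacking example (illustrative numbers): suppose an incoming mcast
 * carries msg_count = 3 with lengths {200, 300, 250} and has the
 * fragmented flag set. The first two messages are complete and are
 * delivered immediately; the trailing 250 bytes are a partial message,
 * so they are copied to the start of the assembly data buffer and the
 * assembly index is set to 250. When the continuation mcast arrives,
 * its first packed message extends those 250 bytes before delivery
 * resumes.
 */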

#include <config.h>

#ifdef HAVE_ALLOCA_H
#include <alloca.h>
#endif
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/uio.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <pthread.h>
#include <errno.h>
#include <limits.h>

#include <corosync/swab.h>
#include <corosync/list.h>
#include <qb/qbloop.h>
#include <qb/qbipcs.h>
#include <corosync/totem/totempg.h>
#define LOGSYS_UTILS_ONLY 1
#include <corosync/logsys.h>

#include "totemmrp.h"
#include "totemsrp.h"

#define min(a,b) (((a) < (b)) ? (a) : (b))

struct totempg_mcast_header {
	short version;
	short type;
};

#if !(defined(__i386__) || defined(__x86_64__))
/*
 * Alignment is needed on architectures other than i386 or x86_64
 */
#define TOTEMPG_NEED_ALIGN 1
#endif

/*
 * totempg_mcast structure
 *
 * header:		Identify the mcast.
 * fragmented:		Set if this message continues into next message
 * continuation:	Set if this message is a continuation from last message
 * msg_count:		Indicates how many packed messages are contained
 *			in the mcast.
 * Also, the size of each packed message and the messages themselves are
 * appended to the end of this structure when sent.
 */
struct totempg_mcast {
	struct totempg_mcast_header header;
	unsigned char fragmented;
	unsigned char continuation;
	unsigned short msg_count;
	/*
	 * short msg_len[msg_count];
	 */
	/*
	 * data for messages
	 */
};
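
/*
 * On the wire, one packed mcast therefore looks like this (sketch,
 * not to scale):
 *
 *	+--------+------------+--------------+-----------+-----------------+
 *	| header | fragmented | continuation | msg_count | msg_len[0..n-1] |
 *	+--------+------------+--------------+-----------+-----------------+
 *	| data of message 0 | data of message 1 | ... | data of message n-1 |
 *	+---------------------------------------------------------------+
 *
 * where n == msg_count and the final message's data may be a leading
 * fragment that continues in the next mcast.
 */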

/*
 * Maximum packet size for totem pg messages
 */
#define TOTEMPG_PACKET_SIZE (totempg_totem_config->net_mtu - \
	sizeof (struct totempg_mcast))
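
/*
 * For example, with the default net_mtu of 1500 every mcast has
 * TOTEMPG_PACKET_SIZE = 1500 - sizeof (struct totempg_mcast) bytes
 * available for length entries plus message data. The figure depends
 * on the configured MTU, which is why it is computed rather than
 * hard-coded.
 */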

/*
 * Local variables used for packing small messages
 */
static unsigned short mcast_packed_msg_lens[FRAME_SIZE_MAX];

static int mcast_packed_msg_count = 0;

static int totempg_reserved = 1;

static unsigned int totempg_size_limit;

static totem_queue_level_changed_fn totem_queue_level_changed = NULL;

static uint32_t totempg_threaded_mode = 0;

/*
 * Function and data used to log messages
 */
static int totempg_log_level_security;
static int totempg_log_level_error;
static int totempg_log_level_warning;
static int totempg_log_level_notice;
static int totempg_log_level_debug;
static int totempg_subsys_id;
static void (*totempg_log_printf) (
	int level,
	int subsys,
	const char *function,
	const char *file,
	int line,
	const char *format, ...) __attribute__((format(printf, 6, 7)));

static struct totem_config *totempg_totem_config;

static totempg_stats_t totempg_stats;

enum throw_away_mode {
	THROW_AWAY_INACTIVE,
	THROW_AWAY_ACTIVE
};

struct assembly {
	unsigned int nodeid;
	unsigned char data[MESSAGE_SIZE_MAX];
	int index;
	unsigned char last_frag_num;
	enum throw_away_mode throw_away_mode;
	struct list_head list;
};

static void assembly_deref (struct assembly *assembly);

static int callback_token_received_fn (enum totem_callback_token_type type,
	const void *data);

DECLARE_LIST_INIT(assembly_list_inuse);

/*
 * Free list is used both for transitional and operational assemblies
 */
DECLARE_LIST_INIT(assembly_list_free);

DECLARE_LIST_INIT(assembly_list_inuse_trans);

DECLARE_LIST_INIT(totempg_groups_list);

/*
 * Staging buffer for packed messages. Messages are staged in this buffer
 * before sending. Multiple messages may fit which cuts down on the
 * number of mcasts sent. If a message doesn't completely fit, then
 * the mcast header has a fragment bit set that says that there is more
 * data to follow. fragment_size is an index into the buffer. It indicates
 * the size of message data and where to place new message data.
 * fragment_continuation indicates whether the first packed message in
 * the buffer is a continuation of a previously packed fragment.
 */
static unsigned char *fragmentation_data;

static int fragment_size = 0;

static int fragment_continuation = 0;

static int totempg_waiting_transack = 0;

struct totempg_group_instance {
	void (*deliver_fn) (
		unsigned int nodeid,
		const void *msg,
		unsigned int msg_len,
		int endian_conversion_required);

	void (*confchg_fn) (
		enum totem_configuration_type configuration_type,
		const unsigned int *member_list, size_t member_list_entries,
		const unsigned int *left_list, size_t left_list_entries,
		const unsigned int *joined_list, size_t joined_list_entries,
		const struct memb_ring_id *ring_id);

	struct totempg_group *groups;

	int groups_cnt;
	int32_t q_level;

	struct list_head list;
};

static unsigned char next_fragment = 1;

static pthread_mutex_t totempg_mutex = PTHREAD_MUTEX_INITIALIZER;

static pthread_mutex_t callback_token_mutex = PTHREAD_MUTEX_INITIALIZER;

static pthread_mutex_t mcast_msg_mutex = PTHREAD_MUTEX_INITIALIZER;

#define log_printf(level, format, args...)	\
do {						\
	totempg_log_printf(level,		\
		totempg_subsys_id,		\
		__FUNCTION__, __FILE__, __LINE__, \
		format, ##args);		\
} while (0);

static int msg_count_send_ok (int msg_count);

static int byte_count_send_ok (int byte_count);

static void totempg_waiting_trans_ack_cb (int waiting_trans_ack)
{
	log_printf(LOG_DEBUG, "waiting_trans_ack changed to %u", waiting_trans_ack);
	totempg_waiting_transack = waiting_trans_ack;
}

static struct assembly *assembly_ref (unsigned int nodeid)
{
	struct assembly *assembly;
	struct list_head *list;
	struct list_head *active_assembly_list_inuse;

	if (totempg_waiting_transack) {
		active_assembly_list_inuse = &assembly_list_inuse_trans;
	} else {
		active_assembly_list_inuse = &assembly_list_inuse;
	}

	/*
	 * Search inuse list for node id and return assembly buffer if found
	 */
	for (list = active_assembly_list_inuse->next;
		list != active_assembly_list_inuse;
		list = list->next) {

		assembly = list_entry (list, struct assembly, list);

		if (nodeid == assembly->nodeid) {
			return (assembly);
		}
	}

	/*
	 * Nothing found in inuse list get one from free list if available
	 */
	if (list_empty (&assembly_list_free) == 0) {
		assembly = list_entry (assembly_list_free.next, struct assembly, list);
		list_del (&assembly->list);
		list_add (&assembly->list, active_assembly_list_inuse);
		assembly->nodeid = nodeid;
		assembly->index = 0;
		assembly->last_frag_num = 0;
		assembly->throw_away_mode = THROW_AWAY_INACTIVE;
		return (assembly);
	}

	/*
	 * Nothing available in inuse or free list, so allocate a new one
	 */
	assembly = malloc (sizeof (struct assembly));
	/*
	 * TODO handle memory allocation failure here
	 */
	assert (assembly);
	assembly->nodeid = nodeid;
	assembly->data[0] = 0;
	assembly->index = 0;
	assembly->last_frag_num = 0;
	assembly->throw_away_mode = THROW_AWAY_INACTIVE;
	list_init (&assembly->list);
	list_add (&assembly->list, active_assembly_list_inuse);

	return (assembly);
}

static void assembly_deref (struct assembly *assembly)
{
	list_del (&assembly->list);
	list_add (&assembly->list, &assembly_list_free);
}

static void assembly_deref_from_normal_and_trans (int nodeid)
{
	int j;
	struct list_head *list, *list_next;
	struct list_head *active_assembly_list_inuse;
	struct assembly *assembly;

	for (j = 0; j < 2; j++) {
		if (j == 0) {
			active_assembly_list_inuse = &assembly_list_inuse;
		} else {
			active_assembly_list_inuse = &assembly_list_inuse_trans;
		}

		for (list = active_assembly_list_inuse->next;
			list != active_assembly_list_inuse;
			list = list_next) {

			list_next = list->next;
			assembly = list_entry (list, struct assembly, list);

			if (nodeid == assembly->nodeid) {
				list_del (&assembly->list);
				list_add (&assembly->list, &assembly_list_free);
			}
		}
	}

}

static inline void app_confchg_fn (
	enum totem_configuration_type configuration_type,
	const unsigned int *member_list, size_t member_list_entries,
	const unsigned int *left_list, size_t left_list_entries,
	const unsigned int *joined_list, size_t joined_list_entries,
	const struct memb_ring_id *ring_id)
{
	int i;
	struct totempg_group_instance *instance;
	struct list_head *list;

	/*
	 * For every leaving processor, add to free list.
	 * This also has the side effect of clearing out the dataset
	 * in the leaving processor's assembly buffer.
	 */
	for (i = 0; i < left_list_entries; i++) {
		assembly_deref_from_normal_and_trans (left_list[i]);
	}

	for (list = totempg_groups_list.next;
		list != &totempg_groups_list;
		list = list->next) {

		instance = list_entry (list, struct totempg_group_instance, list);

		if (instance->confchg_fn) {
			instance->confchg_fn (
				configuration_type,
				member_list,
				member_list_entries,
				left_list,
				left_list_entries,
				joined_list,
				joined_list_entries,
				ring_id);
		}
	}
}

static inline void group_endian_convert (
	void *msg,
	int msg_len)
{
	unsigned short *group_len;
	int i;
	char *aligned_msg;

#ifdef TOTEMPG_NEED_ALIGN
	/*
	 * Align data structure for architectures other than i386 or x86_64
	 */
	if ((size_t)msg % 4 != 0) {
		aligned_msg = alloca(msg_len);
		memcpy(aligned_msg, msg, msg_len);
	} else {
		aligned_msg = msg;
	}
#else
	aligned_msg = msg;
#endif

	group_len = (unsigned short *)aligned_msg;
	group_len[0] = swab16(group_len[0]);
	for (i = 1; i < group_len[0] + 1; i++) {
		group_len[i] = swab16(group_len[i]);
	}

	if (aligned_msg != msg) {
		memcpy(msg, aligned_msg, msg_len);
	}
}

static inline int group_matches (
	struct iovec *iovec,
	unsigned int iov_len,
	struct totempg_group *groups_b,
	unsigned int group_b_cnt,
	unsigned int *adjust_iovec)
{
	unsigned short *group_len;
	char *group_name;
	int i;
	int j;
#ifdef TOTEMPG_NEED_ALIGN
	struct iovec iovec_aligned = { NULL, 0 };
#endif

	assert (iov_len == 1);

#ifdef TOTEMPG_NEED_ALIGN
	/*
	 * Align data structure for architectures other than i386 or x86_64
	 */
	if ((size_t)iovec->iov_base % 4 != 0) {
		iovec_aligned.iov_base = alloca(iovec->iov_len);
		memcpy(iovec_aligned.iov_base, iovec->iov_base, iovec->iov_len);
		iovec_aligned.iov_len = iovec->iov_len;
		iovec = &iovec_aligned;
	}
#endif

	group_len = (unsigned short *)iovec->iov_base;
	group_name = ((char *)iovec->iov_base) +
		sizeof (unsigned short) * (group_len[0] + 1);

	/*
	 * Calculate amount to adjust the iovec by before delivering to app
	 */
	*adjust_iovec = sizeof (unsigned short) * (group_len[0] + 1);
	for (i = 1; i < group_len[0] + 1; i++) {
		*adjust_iovec += group_len[i];
	}

	/*
	 * Determine if this message should be delivered to this instance
	 */
	for (i = 1; i < group_len[0] + 1; i++) {
		for (j = 0; j < group_b_cnt; j++) {
			if ((group_len[i] == groups_b[j].group_len) &&
				(memcmp (groups_b[j].group, group_name, group_len[i]) == 0)) {
				return (1);
			}
		}
		group_name += group_len[i];
	}
	return (0);
}
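
/*
 * Layout assumed by group_matches() and the adjust_iovec math above
 * (illustrative sketch): for a message published to two groups named
 * "pg_a" and "pgb", the iovec begins with
 *
 *	group_len[0] = 2, group_len[1] = 4, group_len[2] = 3,
 *
 * followed by the concatenated names "pg_apgb", and only then the
 * application payload. adjust_iovec is therefore
 * sizeof (unsigned short) * 3 + 4 + 3 = 13 bytes, the amount stripped
 * off before delivery to the application.
 */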


static inline void app_deliver_fn (
	unsigned int nodeid,
	void *msg,
	unsigned int msg_len,
	int endian_conversion_required)
{
	struct totempg_group_instance *instance;
	struct iovec stripped_iovec;
	unsigned int adjust_iovec;
	struct iovec *iovec;
	struct list_head *list;

	struct iovec aligned_iovec = { NULL, 0 };

	if (endian_conversion_required) {
		group_endian_convert (msg, msg_len);
	}

	/*
	 * TODO: segmentation/assembly needs to be redesigned to provide aligned
	 * access in all cases to avoid memory copies on non-i386 archs. Probably
	 * broke backwards compatibility
	 */

#ifdef TOTEMPG_NEED_ALIGN
	/*
	 * Align data structure for architectures other than i386 or x86_64
	 */
	aligned_iovec.iov_base = alloca(msg_len);
	aligned_iovec.iov_len = msg_len;
	memcpy(aligned_iovec.iov_base, msg, msg_len);
#else
	aligned_iovec.iov_base = msg;
	aligned_iovec.iov_len = msg_len;
#endif

	iovec = &aligned_iovec;

	for (list = totempg_groups_list.next;
		list != &totempg_groups_list;
		list = list->next) {

		instance = list_entry (list, struct totempg_group_instance, list);
		if (group_matches (iovec, 1, instance->groups, instance->groups_cnt, &adjust_iovec)) {
			stripped_iovec.iov_len = iovec->iov_len - adjust_iovec;
			stripped_iovec.iov_base = (char *)iovec->iov_base + adjust_iovec;

#ifdef TOTEMPG_NEED_ALIGN
			/*
			 * Align data structure for architectures other than i386
			 * or x86_64. Note the modulo must apply to the whole
			 * adjusted address, not just adjust_iovec, hence the
			 * explicit parentheses.
			 */
			if (((size_t)iovec->iov_base + adjust_iovec) % 4 != 0) {
				/*
				 * Deal with misalignment
				 */
				stripped_iovec.iov_base =
					alloca (stripped_iovec.iov_len);
				memcpy (stripped_iovec.iov_base,
					(char *)iovec->iov_base + adjust_iovec,
					stripped_iovec.iov_len);
			}
#endif
			instance->deliver_fn (
				nodeid,
				stripped_iovec.iov_base,
				stripped_iovec.iov_len,
				endian_conversion_required);
		}
	}
}

static void totempg_confchg_fn (
	enum totem_configuration_type configuration_type,
	const unsigned int *member_list, size_t member_list_entries,
	const unsigned int *left_list, size_t left_list_entries,
	const unsigned int *joined_list, size_t joined_list_entries,
	const struct memb_ring_id *ring_id)
{
// TODO optimize this
	app_confchg_fn (configuration_type,
		member_list, member_list_entries,
		left_list, left_list_entries,
		joined_list, joined_list_entries,
		ring_id);
}

static void totempg_deliver_fn (
	unsigned int nodeid,
	const void *msg,
	unsigned int msg_len,
	int endian_conversion_required)
{
	struct totempg_mcast *mcast;
	unsigned short *msg_lens;
	int i;
	struct assembly *assembly;
	char header[FRAME_SIZE_MAX];
	int msg_count;
	int continuation;
	int start;
	const char *data;
	int datasize;
	struct iovec iov_delv;
	size_t expected_msg_len;

	assembly = assembly_ref (nodeid);
	assert (assembly);

	if (msg_len < sizeof(struct totempg_mcast)) {
		log_printf(LOG_WARNING,
			"Message (totempg_mcast) received from node %u is too short... Ignoring.", nodeid);

		return ;
	}

	/*
	 * Assemble the header into one block of data and
	 * assemble the packet contents into one block of data to simplify delivery
	 */

	mcast = (struct totempg_mcast *)msg;
	if (endian_conversion_required) {
		mcast->msg_count = swab16 (mcast->msg_count);
	}

	msg_count = mcast->msg_count;
	datasize = sizeof (struct totempg_mcast) +
		msg_count * sizeof (unsigned short);

	if (msg_len < datasize) {
		log_printf(LOG_WARNING,
			"Message (totempg_mcast datasize) received from node %u"
			" is too short... Ignoring.", nodeid);

		return ;
	}

	memcpy (header, msg, datasize);
	data = msg;

	msg_lens = (unsigned short *) (header + sizeof (struct totempg_mcast));
	expected_msg_len = datasize;
	for (i = 0; i < mcast->msg_count; i++) {
		if (endian_conversion_required) {
			msg_lens[i] = swab16 (msg_lens[i]);
		}

		expected_msg_len += msg_lens[i];
	}

	if (msg_len != expected_msg_len) {
		log_printf(LOG_WARNING,
			"Message (totempg_mcast) received from node %u"
			" doesn't have expected length of %zu (has %u) bytes... Ignoring.",
			nodeid, expected_msg_len, msg_len);

		return ;
	}

	memcpy (&assembly->data[assembly->index], &data[datasize],
		msg_len - datasize);

	/*
	 * If the last message in the buffer is a fragment, then we
	 * can't deliver it. We'll first deliver the full messages
	 * then adjust the assembly buffer so we can add the rest of the
	 * fragment when it arrives.
	 */
	msg_count = mcast->fragmented ? mcast->msg_count - 1 : mcast->msg_count;
	continuation = mcast->continuation;
	iov_delv.iov_base = (void *)&assembly->data[0];
	iov_delv.iov_len = assembly->index + msg_lens[0];

	/*
	 * Make sure that if this message is a continuation, that it
	 * matches the sequence number of the previous fragment.
	 * Also, if the first packed message is a continuation
	 * of a previous message, but the assembly buffer
	 * is empty, then we need to discard it since we can't
	 * assemble a complete message. Likewise, if this message isn't a
	 * continuation and the assembly buffer is empty, we have to discard
	 * the continued message.
	 */
	start = 0;

	if (assembly->throw_away_mode == THROW_AWAY_ACTIVE) {
		/* Throw away the first msg block */
		if (mcast->fragmented == 0 || mcast->fragmented == 1) {
			assembly->throw_away_mode = THROW_AWAY_INACTIVE;

			assembly->index += msg_lens[0];
			iov_delv.iov_base = (void *)&assembly->data[assembly->index];
			iov_delv.iov_len = msg_lens[1];
			start = 1;
		}
	} else
	if (assembly->throw_away_mode == THROW_AWAY_INACTIVE) {
		if (continuation == assembly->last_frag_num) {
			assembly->last_frag_num = mcast->fragmented;
			for (i = start; i < msg_count; i++) {
				app_deliver_fn(nodeid, iov_delv.iov_base, iov_delv.iov_len,
					endian_conversion_required);
				assembly->index += msg_lens[i];
				iov_delv.iov_base = (void *)&assembly->data[assembly->index];
				if (i < (msg_count - 1)) {
					iov_delv.iov_len = msg_lens[i + 1];
				}
			}
		} else {
			log_printf (LOG_DEBUG, "fragmented continuation %u is not equal to assembly last_frag_num %u",
				continuation, assembly->last_frag_num);
			assembly->throw_away_mode = THROW_AWAY_ACTIVE;
		}
	}

	if (mcast->fragmented == 0) {
		/*
		 * End of messages, dereference assembly struct
		 */
		assembly->last_frag_num = 0;
		assembly->index = 0;
		assembly_deref (assembly);
	} else {
		/*
		 * Message is fragmented, keep around assembly list
		 */
		if (mcast->msg_count > 1) {
			memmove (&assembly->data[0],
				&assembly->data[assembly->index],
				msg_lens[msg_count]);

			assembly->index = 0;
		}
		assembly->index += msg_lens[msg_count];
	}
}
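
/*
 * End-to-end trace of the logic above (illustrative): a sender packs
 * message A and the first half of message B into mcast 1 (msg_count = 2,
 * fragmented = 1) and the rest of B into mcast 2 (msg_count = 1,
 * fragmented = 0, continuation = 1). On receipt of mcast 1, A is
 * delivered and B's first half is moved to the front of the assembly
 * buffer with last_frag_num = 1. Mcast 2's continuation matches
 * last_frag_num, so B is completed and delivered, after which the
 * assembly is dereferenced. A continuation that does not match (for
 * example after a lost fragment) switches the assembly into
 * THROW_AWAY_ACTIVE so the orphaned tail is discarded.
 */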

/*
 * Totem Process Group Abstraction
 * depends on poll abstraction, POSIX, IPV4
 */

void *callback_token_received_handle;

int callback_token_received_fn (enum totem_callback_token_type type,
	const void *data)
{
	struct totempg_mcast mcast;
	struct iovec iovecs[3];

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&mcast_msg_mutex);
	}
	if (mcast_packed_msg_count == 0) {
		if (totempg_threaded_mode == 1) {
			pthread_mutex_unlock (&mcast_msg_mutex);
		}
		return (0);
	}
	if (totemmrp_avail() == 0) {
		if (totempg_threaded_mode == 1) {
			pthread_mutex_unlock (&mcast_msg_mutex);
		}
		return (0);
	}
	mcast.header.version = 0;
	mcast.header.type = 0;
	mcast.fragmented = 0;

	/*
	 * Was the first message in this buffer a continuation of a
	 * fragmented message?
	 */
	mcast.continuation = fragment_continuation;
	fragment_continuation = 0;

	mcast.msg_count = mcast_packed_msg_count;

	iovecs[0].iov_base = (void *)&mcast;
	iovecs[0].iov_len = sizeof (struct totempg_mcast);
	iovecs[1].iov_base = (void *)mcast_packed_msg_lens;
	iovecs[1].iov_len = mcast_packed_msg_count * sizeof (unsigned short);
	iovecs[2].iov_base = (void *)&fragmentation_data[0];
	iovecs[2].iov_len = fragment_size;
	(void)totemmrp_mcast (iovecs, 3, 0);

	mcast_packed_msg_count = 0;
	fragment_size = 0;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&mcast_msg_mutex);
	}
	return (0);
}

/*
 * Initialize the totem process group abstraction
 */
int totempg_initialize (
	qb_loop_t *poll_handle,
	struct totem_config *totem_config)
{
	int res;

	totempg_totem_config = totem_config;
	totempg_log_level_security = totem_config->totem_logging_configuration.log_level_security;
	totempg_log_level_error = totem_config->totem_logging_configuration.log_level_error;
	totempg_log_level_warning = totem_config->totem_logging_configuration.log_level_warning;
	totempg_log_level_notice = totem_config->totem_logging_configuration.log_level_notice;
	totempg_log_level_debug = totem_config->totem_logging_configuration.log_level_debug;
	totempg_log_printf = totem_config->totem_logging_configuration.log_printf;
	totempg_subsys_id = totem_config->totem_logging_configuration.log_subsys_id;

	fragmentation_data = malloc (TOTEMPG_PACKET_SIZE);
	if (fragmentation_data == 0) {
		return (-1);
	}

	totemsrp_net_mtu_adjust (totem_config);

	res = totemmrp_initialize (
		poll_handle,
		totem_config,
		&totempg_stats,
		totempg_deliver_fn,
		totempg_confchg_fn,
		totempg_waiting_trans_ack_cb);

	if (res == -1) {
		goto error_exit;
	}

	totemmrp_callback_token_create (
		&callback_token_received_handle,
		TOTEM_CALLBACK_TOKEN_RECEIVED,
		0,
		callback_token_received_fn,
		0);

	totempg_size_limit = (totemmrp_avail() - 1) *
		(totempg_totem_config->net_mtu -
		sizeof (struct totempg_mcast) - 16);

	list_init (&totempg_groups_list);

error_exit:
	return (res);
}

void totempg_finalize (void)
{
	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}
	totemmrp_finalize ();
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
}

/*
 * Multicast a message
 */
static int mcast_msg (
	struct iovec *iovec_in,
	unsigned int iov_len,
	int guarantee)
{
	int res = 0;
	struct totempg_mcast mcast;
	struct iovec iovecs[3];
	struct iovec iovec[64];
	int i;
	int dest, src;
	int max_packet_size = 0;
	int copy_len = 0;
	int copy_base = 0;
	int total_size = 0;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&mcast_msg_mutex);
	}

	/*
	 * Remove zero length iovectors from the list
	 */
	assert (iov_len < 64);
	for (dest = 0, src = 0; src < iov_len; src++) {
		if (iovec_in[src].iov_len) {
			memcpy (&iovec[dest++], &iovec_in[src],
				sizeof (struct iovec));
		}
	}
	iov_len = dest;

	max_packet_size = TOTEMPG_PACKET_SIZE -
		(sizeof (unsigned short) * (mcast_packed_msg_count + 1));

	mcast_packed_msg_lens[mcast_packed_msg_count] = 0;

	/*
	 * Check if we would overwrite new message queue
	 */
	for (i = 0; i < iov_len; i++) {
		total_size += iovec[i].iov_len;
	}

	if (byte_count_send_ok (total_size + sizeof(unsigned short) *
		(mcast_packed_msg_count)) == 0) {

		if (totempg_threaded_mode == 1) {
			pthread_mutex_unlock (&mcast_msg_mutex);
		}
		return(-1);
	}

	mcast.header.version = 0;
	for (i = 0; i < iov_len; ) {
		mcast.fragmented = 0;
		mcast.continuation = fragment_continuation;
		copy_len = iovec[i].iov_len - copy_base;

		/*
		 * If it all fits with room left over, copy it in.
		 * We need to leave at least sizeof(short) + 1 bytes in the
		 * fragment_buffer on exit so that max_packet_size + fragment_size
		 * doesn't exceed the size of the fragment_buffer on the next call.
		 */
		if ((iovec[i].iov_len + fragment_size) <
			(max_packet_size - sizeof (unsigned short))) {

			memcpy (&fragmentation_data[fragment_size],
				(char *)iovec[i].iov_base + copy_base, copy_len);
			fragment_size += copy_len;
			mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len;
			next_fragment = 1;
			copy_len = 0;
			copy_base = 0;
			i++;
			continue;

		/*
		 * If it just fits or is too big, then send out what fits.
		 */
		} else {
			unsigned char *data_ptr;

			copy_len = min(copy_len, max_packet_size - fragment_size);
			if (copy_len == max_packet_size) {
				data_ptr = (unsigned char *)iovec[i].iov_base + copy_base;
			} else {
				data_ptr = fragmentation_data;
			}

			memcpy (&fragmentation_data[fragment_size],
				(unsigned char *)iovec[i].iov_base + copy_base, copy_len);
			mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len;

			/*
			 * if we're not on the last iovec or the iovec is too large to
			 * fit, then indicate a fragment. This also means that the next
			 * message will have the continuation of this one.
			 */
			if ((i < (iov_len - 1)) ||
				((copy_base + copy_len) < iovec[i].iov_len)) {
				if (!next_fragment) {
					next_fragment++;
				}
				fragment_continuation = next_fragment;
				mcast.fragmented = next_fragment++;
				assert(fragment_continuation != 0);
				assert(mcast.fragmented != 0);
			} else {
				fragment_continuation = 0;
			}

			/*
			 * assemble the message and send it
			 */
			mcast.msg_count = ++mcast_packed_msg_count;
			iovecs[0].iov_base = (void *)&mcast;
			iovecs[0].iov_len = sizeof(struct totempg_mcast);
			iovecs[1].iov_base = (void *)mcast_packed_msg_lens;
			iovecs[1].iov_len = mcast_packed_msg_count *
				sizeof(unsigned short);
			iovecs[2].iov_base = (void *)data_ptr;
			iovecs[2].iov_len = fragment_size + copy_len;
			assert (totemmrp_avail() > 0);
			res = totemmrp_mcast (iovecs, 3, guarantee);
			if (res == -1) {
				goto error_exit;
			}

			/*
			 * Recalculate counts and indexes for the next.
			 */
			mcast_packed_msg_lens[0] = 0;
			mcast_packed_msg_count = 0;
			fragment_size = 0;
			max_packet_size = TOTEMPG_PACKET_SIZE - (sizeof(unsigned short));

			/*
			 * If the iovec all fit, go to the next iovec
			 */
			if ((copy_base + copy_len) == iovec[i].iov_len) {
				copy_len = 0;
				copy_base = 0;
				i++;

			/*
			 * Continue with the rest of the current iovec.
			 */
			} else {
				copy_base += copy_len;
			}
		}
	}

	/*
	 * Bump only if we added message data. This may be zero if
	 * the last buffer just fit into the fragmentation_data buffer
	 * and we were at the last iovec.
	 */
	if (mcast_packed_msg_lens[mcast_packed_msg_count]) {
		mcast_packed_msg_count++;
	}

error_exit:
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&mcast_msg_mutex);
	}
	return (res);
}

/*
 * Determine if a message of msg_size could be queued
 */
static int msg_count_send_ok (
	int msg_count)
{
	int avail = 0;

	avail = totemmrp_avail ();
	totempg_stats.msg_queue_avail = avail;

	return ((avail - totempg_reserved) > msg_count);
}

static int byte_count_send_ok (
	int byte_count)
{
	unsigned int msg_count = 0;
	int avail = 0;

	avail = totemmrp_avail ();

	msg_count = (byte_count / (totempg_totem_config->net_mtu - sizeof (struct totempg_mcast) - 16)) + 1;

	return (avail >= msg_count);
}
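
/*
 * Arithmetic sketch for byte_count_send_ok() (illustrative numbers;
 * assumes net_mtu = 1500 and an unpadded 8-byte struct totempg_mcast):
 * with the 16-byte safety margin, each queued message can carry
 * 1500 - 8 - 16 = 1476 bytes, so a 4000-byte send needs
 * 4000 / 1476 + 1 = 3 messages and is allowed only when
 * totemmrp_avail() returns at least 3.
 */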

static int send_reserve (
	int msg_size)
{
	unsigned int msg_count = 0;

	msg_count = (msg_size / (totempg_totem_config->net_mtu - sizeof (struct totempg_mcast) - 16)) + 1;
	totempg_reserved += msg_count;
	totempg_stats.msg_reserved = totempg_reserved;

	return (msg_count);
}

static void send_release (
	int msg_count)
{
	totempg_reserved -= msg_count;
	totempg_stats.msg_reserved = totempg_reserved;
}

#ifndef HAVE_SMALL_MEMORY_FOOTPRINT
#undef MESSAGE_QUEUE_MAX
#define MESSAGE_QUEUE_MAX ((4 * MESSAGE_SIZE_MAX) / totempg_totem_config->net_mtu)
#endif /* HAVE_SMALL_MEMORY_FOOTPRINT */

static uint32_t q_level_percent_used(void)
{
	return (100 - (((totemmrp_avail() - totempg_reserved) * 100) / MESSAGE_QUEUE_MAX));
}
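
/*
 * Example with the formula above (illustrative numbers): if
 * MESSAGE_QUEUE_MAX works out to 200 slots, totemmrp_avail() returns
 * 80 and totempg_reserved is 10, the queue is
 * 100 - ((80 - 10) * 100) / 200 = 65 percent used, which
 * check_q_level() below classifies as TOTEM_Q_LEVEL_HIGH.
 */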

int totempg_callback_token_create (
	void **handle_out,
	enum totem_callback_token_type type,
	int delete,
	int (*callback_fn) (enum totem_callback_token_type type, const void *),
	const void *data)
{
	unsigned int res;
	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&callback_token_mutex);
	}
	res = totemmrp_callback_token_create (handle_out, type, delete,
		callback_fn, data);
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&callback_token_mutex);
	}
	return (res);
}

void totempg_callback_token_destroy (
	void *handle_out)
{
	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&callback_token_mutex);
	}
	totemmrp_callback_token_destroy (handle_out);
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&callback_token_mutex);
	}
}

/*
 * vi: set autoindent tabstop=4 shiftwidth=4 :
 */

int totempg_groups_initialize (
	void **totempg_groups_instance,

	void (*deliver_fn) (
		unsigned int nodeid,
		const void *msg,
		unsigned int msg_len,
		int endian_conversion_required),

	void (*confchg_fn) (
		enum totem_configuration_type configuration_type,
		const unsigned int *member_list, size_t member_list_entries,
		const unsigned int *left_list, size_t left_list_entries,
		const unsigned int *joined_list, size_t joined_list_entries,
		const struct memb_ring_id *ring_id))
{
	struct totempg_group_instance *instance;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	instance = malloc (sizeof (struct totempg_group_instance));
	if (instance == NULL) {
		goto error_exit;
	}

	instance->deliver_fn = deliver_fn;
	instance->confchg_fn = confchg_fn;
	instance->groups = 0;
	instance->groups_cnt = 0;
	instance->q_level = QB_LOOP_MED;
	list_init (&instance->list);
	list_add (&instance->list, &totempg_groups_list);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	*totempg_groups_instance = instance;
	return (0);

error_exit:
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (-1);
}

int totempg_groups_join (
	void *totempg_groups_instance,
	const struct totempg_group *groups,
	size_t group_cnt)
{
	struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
	struct totempg_group *new_groups;
	unsigned int res = 0;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	new_groups = realloc (instance->groups,
		sizeof (struct totempg_group) *
		(instance->groups_cnt + group_cnt));
	if (new_groups == 0) {
		res = ENOMEM;
		goto error_exit;
	}
	memcpy (&new_groups[instance->groups_cnt],
		groups, group_cnt * sizeof (struct totempg_group));
	instance->groups = new_groups;
	instance->groups_cnt += group_cnt;

error_exit:
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (res);
}

int totempg_groups_leave (
	void *totempg_groups_instance,
	const struct totempg_group *groups,
	size_t group_cnt)
{
	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (0);
}

#define MAX_IOVECS_FROM_APP 32
#define MAX_GROUPS_PER_MSG 32

int totempg_groups_mcast_joined (
	void *totempg_groups_instance,
	const struct iovec *iovec,
	unsigned int iov_len,
	int guarantee)
{
	struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
	unsigned short group_len[MAX_GROUPS_PER_MSG + 1];
	struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP];
	int i;
	unsigned int res;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	/*
	 * Build group_len structure and the iovec_mcast structure
	 */
	group_len[0] = instance->groups_cnt;
	for (i = 0; i < instance->groups_cnt; i++) {
		group_len[i + 1] = instance->groups[i].group_len;
		iovec_mcast[i + 1].iov_len = instance->groups[i].group_len;
		iovec_mcast[i + 1].iov_base = (void *) instance->groups[i].group;
	}
	iovec_mcast[0].iov_len = (instance->groups_cnt + 1) * sizeof (unsigned short);
	iovec_mcast[0].iov_base = group_len;
	for (i = 0; i < iov_len; i++) {
		iovec_mcast[i + instance->groups_cnt + 1].iov_len = iovec[i].iov_len;
		iovec_mcast[i + instance->groups_cnt + 1].iov_base = iovec[i].iov_base;
	}

	res = mcast_msg (iovec_mcast, iov_len + instance->groups_cnt + 1, guarantee);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}

	return (res);
}
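
/*
 * Resulting iovec_mcast layout for a groups_cnt = 2 instance and a
 * two-element application iovec (sketch):
 *
 *	iovec_mcast[0]: group_len[] header (3 shorts)
 *	iovec_mcast[1]: name of group 0
 *	iovec_mcast[2]: name of group 1
 *	iovec_mcast[3]: application iovec[0]
 *	iovec_mcast[4]: application iovec[1]
 *
 * i.e. iov_len + groups_cnt + 1 == 5 entries handed to mcast_msg().
 */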

static void check_q_level(
	void *totempg_groups_instance)
{
	struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
	int32_t old_level = instance->q_level;
	int32_t percent_used = q_level_percent_used();

	if (percent_used >= 75 && instance->q_level != TOTEM_Q_LEVEL_CRITICAL) {
		instance->q_level = TOTEM_Q_LEVEL_CRITICAL;
	} else if (percent_used < 30 && instance->q_level != TOTEM_Q_LEVEL_LOW) {
		instance->q_level = TOTEM_Q_LEVEL_LOW;
	} else if (percent_used > 40 && percent_used < 50 && instance->q_level != TOTEM_Q_LEVEL_GOOD) {
		instance->q_level = TOTEM_Q_LEVEL_GOOD;
	} else if (percent_used > 60 && percent_used < 70 && instance->q_level != TOTEM_Q_LEVEL_HIGH) {
		instance->q_level = TOTEM_Q_LEVEL_HIGH;
	}
	if (totem_queue_level_changed && old_level != instance->q_level) {
		totem_queue_level_changed(instance->q_level);
	}
}
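
/*
 * Queue level bands used above (as coded; note the hysteresis gaps at
 * 30-40, 50-60 and 70-75 percent, inside which the previous level is
 * kept):
 *
 *	>= 75		TOTEM_Q_LEVEL_CRITICAL
 *	60 < x < 70	TOTEM_Q_LEVEL_HIGH
 *	40 < x < 50	TOTEM_Q_LEVEL_GOOD
 *	< 30		TOTEM_Q_LEVEL_LOW
 */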

void totempg_check_q_level(
	void *totempg_groups_instance)
{
	struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;

	check_q_level(instance);
}

int totempg_groups_joined_reserve (
	void *totempg_groups_instance,
	const struct iovec *iovec,
	unsigned int iov_len)
{
	struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
	unsigned int size = 0;
	unsigned int i;
	unsigned int reserved = 0;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
		pthread_mutex_lock (&mcast_msg_mutex);
	}

	for (i = 0; i < instance->groups_cnt; i++) {
		size += instance->groups[i].group_len;
	}
	for (i = 0; i < iov_len; i++) {
		size += iovec[i].iov_len;
	}

	if (size >= totempg_size_limit) {
		reserved = -1;
		goto error_exit;
	}

	if (byte_count_send_ok (size)) {
		reserved = send_reserve (size);
	} else {
		reserved = 0;
	}

error_exit:
	check_q_level(instance);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&mcast_msg_mutex);
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (reserved);
}

int totempg_groups_joined_release (int msg_count)
{
	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
		pthread_mutex_lock (&mcast_msg_mutex);
	}
	send_release (msg_count);
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&mcast_msg_mutex);
		pthread_mutex_unlock (&totempg_mutex);
	}
	return 0;
}

int totempg_groups_mcast_groups (
	void *totempg_groups_instance,
	int guarantee,
	const struct totempg_group *groups,
	size_t groups_cnt,
	const struct iovec *iovec,
	unsigned int iov_len)
{
	unsigned short group_len[MAX_GROUPS_PER_MSG + 1];
	struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP];
	int i;
	unsigned int res;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	/*
	 * Build group_len structure and the iovec_mcast structure
	 */
	group_len[0] = groups_cnt;
	for (i = 0; i < groups_cnt; i++) {
		group_len[i + 1] = groups[i].group_len;
		iovec_mcast[i + 1].iov_len = groups[i].group_len;
		iovec_mcast[i + 1].iov_base = (void *) groups[i].group;
	}
	iovec_mcast[0].iov_len = (groups_cnt + 1) * sizeof (unsigned short);
	iovec_mcast[0].iov_base = group_len;
	for (i = 0; i < iov_len; i++) {
		iovec_mcast[i + groups_cnt + 1].iov_len = iovec[i].iov_len;
		iovec_mcast[i + groups_cnt + 1].iov_base = iovec[i].iov_base;
	}

	res = mcast_msg (iovec_mcast, iov_len + groups_cnt + 1, guarantee);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (res);
}

/*
 * Returns -1 if error, 0 if can't send, 1 if can send the message
 */
int totempg_groups_send_ok_groups (
	void *totempg_groups_instance,
	const struct totempg_group *groups,
	size_t groups_cnt,
	const struct iovec *iovec,
	unsigned int iov_len)
{
	unsigned int size = 0;
	unsigned int i;
	unsigned int res;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	for (i = 0; i < groups_cnt; i++) {
		size += groups[i].group_len;
	}
	for (i = 0; i < iov_len; i++) {
		size += iovec[i].iov_len;
	}

	res = msg_count_send_ok (size);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (res);
}

int totempg_ifaces_get (
	unsigned int nodeid,
	struct totem_ip_address *interfaces,
	unsigned int interfaces_size,
	char ***status,
	unsigned int *iface_count)
{
	int res;

	res = totemmrp_ifaces_get (
		nodeid,
		interfaces,
		interfaces_size,
		status,
		iface_count);

	return (res);
}

void totempg_event_signal (enum totem_event_type type, int value)
{
	totemmrp_event_signal (type, value);
}

void* totempg_get_stats (void)
{
	return &totempg_stats;
}

int totempg_crypto_set (
	const char *cipher_type,
	const char *hash_type)
{
	int res;

	res = totemmrp_crypto_set (cipher_type, hash_type);

	return (res);
}

int totempg_ring_reenable (void)
{
	int res;

	res = totemmrp_ring_reenable ();

	return (res);
}

#define ONE_IFACE_LEN 63
const char *totempg_ifaces_print (unsigned int nodeid)
{
	static char iface_string[256 * INTERFACE_MAX];
	char one_iface[ONE_IFACE_LEN+1];
	struct totem_ip_address interfaces[INTERFACE_MAX];
	char **status;
	unsigned int iface_count;
	unsigned int i;
	int res;

	iface_string[0] = '\0';

	res = totempg_ifaces_get (nodeid, interfaces, INTERFACE_MAX, &status, &iface_count);
	if (res == -1) {
		return ("no interface found for nodeid");
	}

	res = totempg_ifaces_get (nodeid, interfaces, INTERFACE_MAX, &status, &iface_count);

	for (i = 0; i < iface_count; i++) {
		snprintf (one_iface, ONE_IFACE_LEN,
			"r(%d) ip(%s) ",
			i, totemip_print (&interfaces[i]));
		strcat (iface_string, one_iface);
	}
	return (iface_string);
}

unsigned int totempg_my_nodeid_get (void)
{
	return (totemmrp_my_nodeid_get());
}

int totempg_my_family_get (void)
{
	return (totemmrp_my_family_get());
}

void totempg_service_ready_register (
	void (*totem_service_ready) (void))
{
	totemmrp_service_ready_register (totem_service_ready);
}

void totempg_queue_level_register_callback (totem_queue_level_changed_fn fn)
{
	totem_queue_level_changed = fn;
}

int totempg_member_add (
	const struct totem_ip_address *member,
	int ring_no)
{
	return totemmrp_member_add (member, ring_no);
}

int totempg_member_remove (
	const struct totem_ip_address *member,
	int ring_no)
{
	return totemmrp_member_remove (member, ring_no);
}

void totempg_threaded_mode_enable (void)
{
	totempg_threaded_mode = 1;
	totemmrp_threaded_mode_enable ();
}

void totempg_trans_ack (void)
{
	totemmrp_trans_ack ();
}