57 #include <sys/types.h>
59 #include <sys/socket.h>
62 #include <sys/ioctl.h>
63 #include <sys/param.h>
64 #include <netinet/in.h>
65 #include <arpa/inet.h>
78 #include <qb/qbdefs.h>
79 #include <qb/qbutil.h>
80 #include <qb/qbloop.h>
86 #define LOGSYS_UTILS_ONLY 1
/*
 * File-local constants.  Only the values are visible in this view; the
 * units and the places where they are consumed are not shown here.
 */
/*
 * Expands to an inet_addr() call on the IPv4 loopback literal, so it is
 * evaluated at every use rather than being a compile-time constant.
 */
95 #define LOCALHOST_IP inet_addr("127.0.0.1")
/* "_MAX" limits -- presumably element counts / byte sizes; confirm at the use sites. */
96 #define QUEUE_RTR_ITEMS_SIZE_MAX 16384
97 #define RETRANS_MESSAGE_QUEUE_SIZE_MAX 16384
98 #define RECEIVED_MESSAGE_QUEUE_SIZE_MAX 500
100 #define RETRANSMIT_ENTRIES_MAX 30
101 #define TOKEN_SIZE_MAX 64000
/* NOTE(review): looks like a placeholder node id used when leaving -- confirm. */
102 #define LEAVE_DUMMY_NODEID 0
/* Both sequence-number start values are zero. */
114 #define SEQNO_START_MSG 0x0
115 #define SEQNO_START_TOKEN 0x0
/* NOTE(review): presumably a byte-order marker compared against incoming messages -- confirm. */
137 #define ENDIAN_LOCAL 0xff22
438 const char *
function,
441 const char *format, ...)
__attribute__((format(printf, 6, 7)));;
454 unsigned int msg_len,
455 int endian_conversion_required);
459 const unsigned int *member_list,
size_t member_list_entries,
460 const unsigned int *left_list,
size_t left_list_entries,
461 const unsigned int *joined_list,
size_t joined_list_entries,
524 int endian_conversion_needed);
530 static int message_handler_orf_token (
534 int endian_conversion_needed);
536 static int message_handler_mcast (
540 int endian_conversion_needed);
542 static int message_handler_memb_merge_detect (
546 int endian_conversion_needed);
548 static int message_handler_memb_join (
552 int endian_conversion_needed);
554 static int message_handler_memb_commit_token (
558 int endian_conversion_needed);
560 static int message_handler_token_hold_cancel (
564 int endian_conversion_needed);
568 static unsigned int main_msgs_missing (
void);
570 static void main_token_seqid_get (
573 unsigned int *token_is);
575 static void srp_addr_copy (
struct srp_addr *dest,
const struct srp_addr *src);
577 static void srp_addr_to_nodeid (
578 unsigned int *nodeid_out,
580 unsigned int entries);
582 static int srp_addr_equal (
const struct srp_addr *a,
const struct srp_addr *b);
589 static void memb_state_gather_enter (
struct totemsrp_instance *instance,
int gather_from);
590 static void messages_deliver_to_app (
struct totemsrp_instance *instance,
int skip,
unsigned int end_point);
592 int fcc_mcasts_allowed);
593 static void messages_free (
struct totemsrp_instance *instance,
unsigned int token_aru);
597 static void target_set_completed (
void *context);
599 static void memb_state_commit_token_target_set (
struct totemsrp_instance *instance);
604 static void orf_token_endian_convert (
const struct orf_token *in,
struct orf_token *out);
606 static void memb_join_endian_convert (
const struct memb_join *in,
struct memb_join *out);
607 static void mcast_endian_convert (
const struct mcast *in,
struct mcast *out);
608 static void memb_merge_detect_endian_convert (
611 static void srp_addr_copy_endian_convert (
struct srp_addr *out,
const struct srp_addr *in);
/*
 * Forward declarations of the timer callbacks.  All six share one shape:
 * no return value and a single opaque void * argument.  At least one
 * direct call in this file passes the totemsrp instance pointer as that
 * argument (timer_function_pause_timeout (instance)).
 */
612 static void timer_function_orf_token_timeout (
void *data);
613 static void timer_function_pause_timeout (
void *data);
614 static void timer_function_heartbeat_timeout (
void *data);
615 static void timer_function_token_retransmit_timeout (
void *data);
616 static void timer_function_token_hold_retransmit_timeout (
void *data);
617 static void timer_function_merge_detect_timeout (
void *data);
619 static void totemsrp_buffer_release (
struct totemsrp_instance *instance,
void *ptr);
624 unsigned int msg_len);
629 unsigned int iface_no);
634 message_handler_orf_token,
635 message_handler_mcast,
636 message_handler_memb_merge_detect,
637 message_handler_memb_join,
638 message_handler_memb_commit_token,
639 message_handler_token_hold_cancel
643 static const char *rundir = NULL;
645 #define log_printf(level, format, args...) \
647 instance->totemsrp_log_printf ( \
648 level, instance->totemsrp_subsys_id, \
649 __FUNCTION__, __FILE__, __LINE__, \
652 #define LOGSYS_PERROR(err_num, level, fmt, args...) \
654 char _error_str[LOGSYS_MAX_PERROR_MSG_LEN]; \
655 const char *_error_ptr = qb_strerror_r(err_num, _error_str, sizeof(_error_str)); \
656 instance->totemsrp_log_printf ( \
657 level, instance->totemsrp_subsys_id, \
658 __FUNCTION__, __FILE__, __LINE__, \
659 fmt ": %s (%d)\n", ##args, _error_ptr, err_num); \
693 static void main_token_seqid_get (
696 unsigned int *token_is)
708 static unsigned int main_msgs_missing (
void)
717 uint64_t timestamp_msec;
720 now_msec = (qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC);
725 "Process pause detected for %d ms, flushing membership messages.", (
unsigned int)(now_msec - timestamp_msec));
740 unsigned long long nano_secs = qb_util_nano_current_get ();
742 time_now = (nano_secs / QB_TIME_NS_IN_MSEC);
775 qb_loop_t *poll_handle,
783 unsigned int msg_len,
784 int endian_conversion_required),
788 const unsigned int *member_list,
size_t member_list_entries,
789 const unsigned int *left_list,
size_t left_list_entries,
790 const unsigned int *joined_list,
size_t joined_list_entries,
792 void (*waiting_trans_ack_cb_fn) (
799 if (instance == NULL) {
803 rundir = getenv (
"COROSYNC_RUN_DIR");
804 if (rundir == NULL) {
808 res = mkdir (rundir, 0700);
809 if (res == -1 && errno != EEXIST) {
813 res = chdir (rundir);
818 totemsrp_instance_initialize (instance);
850 "Token Timeout (%d ms) retransmit timeout (%d ms)",
853 "token hold (%d ms) retransmits before loss (%d retrans)",
856 "join (%d ms) send_join (%d ms) consensus (%d ms) merge (%d ms)",
863 "downcheck (%d ms) fail to recv const (%d msgs)",
869 "window size per rotation (%d messages) maximum messages per rotation (%d messages)",
873 "missed count const (%d messages)",
877 "send threads (%d threads)", totem_config->
threads);
879 "RRP token expired timeout (%d ms)",
882 "RRP token problem counter (%d ms)",
885 "RRP threshold (%d problem count)",
888 "RRP multicast threshold (%d problem count)",
891 "RRP automatic recovery check timeout (%d ms)",
918 timer_function_pause_timeout (instance);
922 "HeartBeat is Disabled. To enable set heartbeat_failures_allowed > 0");
933 "total heartbeat_timeout (%d ms) is not less than token timeout (%d ms)",
937 "heartbeat_timeout = heartbeat_failures_allowed * token_retransmit_timeout + max_network_delay");
939 "heartbeat timeout should be less than the token timeout. HeartBeat is Diabled !!");
956 main_token_seqid_get,
958 target_set_completed);
975 token_event_stats_collector,
981 token_event_stats_collector,
983 *srp_context = instance;
999 memb_leave_message_send (instance);
1019 unsigned int nodeid,
1021 unsigned int interfaces_size,
1023 unsigned int *iface_count)
1027 unsigned int found = 0;
1040 if (interfaces_size >= *iface_count) {
1060 if (interfaces_size >= *iface_count) {
1077 const char *cipher_type,
1078 const char *hash_type)
/*
 * srp_addr_equal: compare two srp_addr values.
 *
 * Callers in this file use the result directly as a truth value
 * (non-zero is treated as "the two addresses match").
 *
 * Only part of the body is visible in this view.
 */
1127 static int srp_addr_equal (
const struct srp_addr *a,
const struct srp_addr *b)
/*
 * The loop bound is the literal 1, so this loop body runs exactly once
 * (i == 0).  NOTE(review): presumably only the first interface address
 * is compared -- confirm against the loop body.
 */
1132 for (i = 0; i < 1; i++) {
1141 static void srp_addr_copy (
struct srp_addr *dest,
const struct srp_addr *src)
/*
 * srp_addr_to_nodeid: flatten an array of srp_addr into an array of node ids.
 *
 * nodeid_out  receives one unsigned int per input element; it must have
 *             room for `entries` values (no bound check is visible).
 * entries     number of elements to convert.
 *
 * The declaration of the input array parameter (srp_addr_in) is not
 * visible in this view.
 */
1152 static void srp_addr_to_nodeid (
1153 unsigned int *nodeid_out,
1155 unsigned int entries)
1159 for (i = 0; i < entries; i++) {
/* Only the first address slot (addr[0]) of each element supplies the id. */
1160 nodeid_out[i] = srp_addr_in[i].
addr[0].
nodeid;
1164 static void srp_addr_copy_endian_convert (
struct srp_addr *out,
const struct srp_addr *in)
/*
 * memb_set_subtract: build out_list from one_list with respect to two_list.
 *
 * out_list / out_list_entries  destination array and its element count;
 *                              the count is reset to 0 before any copying.
 * one_list / one_list_entries  elements that are candidates for copying.
 * two_list / two_list_entries  elements each candidate is compared against.
 *
 * Comparison uses srp_addr_equal(); copying uses srp_addr_copy().
 * The lines that decide whether a candidate is copied are not visible in
 * this view -- presumably a candidate is appended only when no element of
 * two_list matched it (i.e. out = one \ two); confirm.
 * No capacity check on out_list is visible; the caller must size it.
 */
1178 static void memb_set_subtract (
1179 struct srp_addr *out_list,
int *out_list_entries,
1180 struct srp_addr *one_list,
int one_list_entries,
1181 struct srp_addr *two_list,
int two_list_entries)
1187 *out_list_entries = 0;
1189 for (i = 0; i < one_list_entries; i++) {
1190 for (j = 0; j < two_list_entries; j++) {
1191 if (srp_addr_equal (&one_list[i], &two_list[j])) {
/* Append one_list[i] at the current end of out_list, then grow the count. */
1197 srp_addr_copy (&out_list[*out_list_entries], &one_list[i]);
1198 *out_list_entries = *out_list_entries + 1;
1207 static void memb_consensus_set (
1234 static int memb_consensus_isset (
1251 static int memb_consensus_agreed (
1255 int token_memb_entries = 0;
1259 memb_set_subtract (token_memb, &token_memb_entries,
1263 for (i = 0; i < token_memb_entries; i++) {
1264 if (memb_consensus_isset (instance, &token_memb[i]) == 0) {
1279 assert (token_memb_entries >= 1);
1284 static void memb_consensus_notset (
1286 struct srp_addr *no_consensus_list,
1287 int *no_consensus_list_entries,
1289 int comparison_list_entries)
1293 *no_consensus_list_entries = 0;
1296 if (memb_consensus_isset (instance, &instance->
my_proc_list[i]) == 0) {
1297 srp_addr_copy (&no_consensus_list[*no_consensus_list_entries], &instance->
my_proc_list[i]);
1298 *no_consensus_list_entries = *no_consensus_list_entries + 1;
/*
 * memb_set_equal: test whether two srp_addr sets hold the same members.
 *
 * set1 / set1_entries  first set and its element count.
 * set2 / set2_entries  second set and its element count.
 *
 * Callers in this file use the result as a truth value.  The return
 * statements themselves are not visible in this view.
 */
1306 static int memb_set_equal (
1307 struct srp_addr *set1,
int set1_entries,
1308 struct srp_addr *set2,
int set2_entries)
/* Differing sizes are handled up front (branch body not visible; presumably "not equal"). */
1315 if (set1_entries != set2_entries) {
/* Each element of set2 is looked up in set1 via srp_addr_equal(). */
1318 for (i = 0; i < set2_entries; i++) {
1319 for (j = 0; j < set1_entries; j++) {
1320 if (srp_addr_equal (&set1[j], &set2[i])) {
/*
 * memb_set_subset: test whether every element of `subset` appears in `fullset`.
 *
 * subset / subset_entries    candidate subset and its element count.
 * fullset / fullset_entries  set searched for each candidate element.
 *
 * Callers in this file use the result as a truth value.  The return
 * statements themselves are not visible in this view.
 */
1336 static int memb_set_subset (
1337 const struct srp_addr *subset,
int subset_entries,
1338 const struct srp_addr *fullset,
int fullset_entries)
/* A larger "subset" is handled up front (branch body not visible; presumably "not a subset"). */
1344 if (subset_entries > fullset_entries) {
/* Each subset element is looked up in fullset via srp_addr_equal(). */
1347 for (i = 0; i < subset_entries; i++) {
1348 for (j = 0; j < fullset_entries; j++) {
1349 if (srp_addr_equal (&subset[i], &fullset[j])) {
/*
 * memb_set_merge: add elements of `subset` to `fullset` in place.
 *
 * subset / subset_entries  elements to merge in (read only).
 * fullset                  destination array; grows at its end.
 * fullset_entries          in/out element count of fullset.
 *
 * Each subset element is compared with the current contents of fullset
 * using srp_addr_equal().  The lines that decide whether it is appended
 * are not visible in this view -- presumably only elements not already
 * present are added; confirm.
 * No capacity check on fullset is visible; the caller must size it.
 */
1363 static void memb_set_merge (
1364 const struct srp_addr *subset,
int subset_entries,
1365 struct srp_addr *fullset,
int *fullset_entries)
1371 for (i = 0; i < subset_entries; i++) {
/* The inner bound is re-read each pass, so elements appended earlier are also compared. */
1372 for (j = 0; j < *fullset_entries; j++) {
1373 if (srp_addr_equal (&fullset[j], &subset[i])) {
1379 srp_addr_copy (&fullset[*fullset_entries], &subset[i]);
1380 *fullset_entries = *fullset_entries + 1;
/*
 * memb_set_and_with_ring_id: collect into `and` the members common to
 * set1 and set2, subject to a ring-id comparison.
 *
 * The parameter list is not visible in this view; the body references
 * set1, set1_entries, set1_ring_ids, set2, set2_entries, old_ring_id,
 * and, and and_entries.
 */
1387 static void memb_set_and_with_ring_id (
1403 for (i = 0; i < set2_entries; i++) {
1404 for (j = 0; j < set1_entries; j++) {
1405 if (srp_addr_equal (&set1[j], &set2[i])) {
/*
 * For a matching pair, set1's ring id at the same index is compared
 * byte-for-byte with old_ring_id.  What happens on equality vs.
 * inequality is not visible here -- presumably only members whose ring
 * id equals old_ring_id are copied; confirm.
 */
1406 if (memcmp (&set1_ring_ids[j], old_ring_id,
sizeof (
struct memb_ring_id)) == 0) {
/* Append the set1 element and grow the output count. */
1413 srp_addr_copy (&and[*and_entries], &set1[j]);
1414 *and_entries = *and_entries + 1;
1421 #ifdef CODE_COVERAGE
1422 static void memb_set_print (
1429 printf (
"List '%s' contains %d entries:\n",
string, list_entries);
1431 for (i = 0; i < list_entries; i++) {
1432 printf (
"Address %d with %d rings\n", i, list[i].
no_addrs);
1433 for (j = 0; j < list[i].
no_addrs; j++) {
1434 printf (
"\tiface %d %s\n", j,
totemip_print (&list[i].addr[j]));
1435 printf (
"\tfamily %d\n", list[i].addr[j].
family);
1443 assert (instance != NULL);
1447 static void totemsrp_buffer_release (
struct totemsrp_instance *instance,
void *ptr)
1449 assert (instance != NULL);
1461 timer_function_token_retransmit_timeout,
1473 timer_function_merge_detect_timeout,
1499 "Saving state aru %x high seq received %x",
1509 "Restoring instance->my_aru %x my high seq received %x",
1516 "Resetting old ring state");
1527 timer_function_pause_timeout,
1537 timer_function_orf_token_timeout,
1547 timer_function_heartbeat_timeout,
1560 static void cancel_token_retransmit_timeout (
struct totemsrp_instance *instance)
1565 static void start_token_hold_retransmit_timeout (
struct totemsrp_instance *instance)
1571 timer_function_token_hold_retransmit_timeout,
1575 static void cancel_token_hold_retransmit_timeout (
struct totemsrp_instance *instance)
1581 static void memb_state_consensus_timeout_expired (
1585 int no_consensus_list_entries;
1588 if (memb_consensus_agreed (instance)) {
1589 memb_consensus_reset (instance);
1591 memb_consensus_set (instance, &instance->
my_id);
1593 reset_token_timeout (instance);
1595 memb_consensus_notset (
1598 &no_consensus_list_entries,
1602 memb_set_merge (no_consensus_list, no_consensus_list_entries,
1604 memb_state_gather_enter (instance, 0);
1615 static void timer_function_pause_timeout (
void *data)
1620 reset_pause_timeout (instance);
1625 old_ring_state_restore (instance);
1626 memb_state_gather_enter (instance, 5);
1630 static void timer_function_orf_token_timeout (
void *data)
1637 "The token was lost in the OPERATIONAL state.");
1639 "A processor failed, forming new configuration.");
1641 memb_state_gather_enter (instance, 2);
1647 "The consensus timeout expired.");
1648 memb_state_consensus_timeout_expired (instance);
1649 memb_state_gather_enter (instance, 3);
1655 "The token was lost in the COMMIT state.");
1656 memb_state_gather_enter (instance, 4);
1662 "The token was lost in the RECOVERY state.");
1663 memb_recovery_state_token_loss (instance);
/*
 * timer_function_heartbeat_timeout: heartbeat timer callback.
 *
 * Logs the current instance->memb_state and then hands the same opaque
 * argument to timer_function_orf_token_timeout(), so heartbeat expiry
 * is funnelled through the token-timeout callback.
 *
 * data  opaque callback argument, forwarded unchanged.
 */
1669 static void timer_function_heartbeat_timeout (
void *data)
1673 "HeartBeat Timer expired Invoking token loss mechanism in state %d ", instance->
memb_state);
1674 timer_function_orf_token_timeout(data);
1677 static void memb_timer_function_state_gather (
void *data)
1688 memb_join_message_send (instance);
1699 memb_timer_function_state_gather,
1705 static void memb_timer_function_gather_consensus_timeout (
void *data)
1708 memb_state_consensus_timeout_expired (instance);
/*
 * deliver_messages_from_recovery_to_regular: walk a range of recovery
 * messages and derive a "regular" message item from each.
 *
 * instance  the totemsrp instance being operated on.
 *
 * Only part of the body is visible in this view; how `range` is
 * computed and where items are fetched from is not shown.
 */
1711 static void deliver_messages_from_recovery_to_regular (
struct totemsrp_instance *instance)
1716 unsigned int range = 0;
/* 1-based walk over `range` items. */
1729 for (i = 1; i <= range; i++) {
1735 recovery_message_item = ptr;
1740 mcast = recovery_message_item->
mcast;
/*
 * The regular item points sizeof (struct mcast) bytes past the start of
 * the recovery item's buffer, and its length is reduced by the same
 * amount -- i.e. one leading struct mcast is skipped.  Presumably that
 * leading header is a recovery-time wrapper around the original
 * message; confirm.
 */
1746 regular_message_item.mcast =
1747 (
struct mcast *)(((
char *)recovery_message_item->
mcast) +
sizeof (
struct mcast));
1748 regular_message_item.msg_len =
1749 recovery_message_item->
msg_len -
sizeof (
struct mcast);
/* From here on `mcast` refers to the header at the advanced position. */
1750 mcast = regular_message_item.mcast;
1759 "comparing if ring id is for this processors old ring seqno %d",
/*
 * NOTE(review): the leading character on the next line looks like a
 * mis-decoded "&reg" sequence; the intended token is almost certainly
 * the address-of expression for regular_message_item.  Verify against
 * the pristine source.
 */
1773 ®ular_message_item, mcast->
seq);
1780 "-not adding msg with seq no %x", mcast->
seq);
1791 int joined_list_entries = 0;
1792 unsigned int aru_save;
1799 char left_node_msg[1024];
1800 char joined_node_msg[1024];
1802 memb_consensus_reset (instance);
1804 old_ring_state_reset (instance);
1806 deliver_messages_from_recovery_to_regular (instance);
1809 "Delivering to app %x to %x",
1812 aru_save = instance->
my_aru;
1825 memb_set_subtract (joined_list, &joined_list_entries,
1843 srp_addr_to_nodeid (trans_memb_list_totemip,
1856 instance->
my_aru = aru_save;
1861 srp_addr_to_nodeid (new_memb_list_totemip,
1863 srp_addr_to_nodeid (joined_list_totemip, joined_list,
1864 joined_list_entries);
1868 joined_list_totemip, joined_list_entries, &instance->
my_ring_id);
1930 regular_message = ptr;
1931 free (regular_message->
mcast);
1937 if (joined_list_entries) {
1939 sptr += snprintf(joined_node_msg,
sizeof(joined_node_msg)-sptr,
" joined:");
1940 for (i=0; i< joined_list_entries; i++) {
1941 sptr += snprintf(joined_node_msg+sptr,
sizeof(joined_node_msg)-sptr,
" %d", joined_list_totemip[i]);
1945 joined_node_msg[0] =
'\0';
1950 sptr += snprintf(left_node_msg,
sizeof(left_node_msg)-sptr,
" left:");
1952 sptr += snprintf(left_node_msg+sptr,
sizeof(left_node_msg)-sptr,
" %d", left_list[i]);
1956 left_node_msg[0] =
'\0';
1960 "entering OPERATIONAL state.");
1962 "A new membership (%s:%lld) was formed. Members%s%s",
1974 reset_pause_timeout (instance);
1987 static void memb_state_gather_enter (
1994 &instance->
my_id, 1,
1997 memb_join_message_send (instance);
2008 memb_timer_function_state_gather,
2021 memb_timer_function_gather_consensus_timeout,
2027 cancel_token_retransmit_timeout (instance);
2028 cancel_token_timeout (instance);
2029 cancel_merge_detect_timeout (instance);
2031 memb_consensus_reset (instance);
2033 memb_consensus_set (instance, &instance->
my_id);
2036 "entering GATHER state from %d.", gather_from);
2041 if (gather_from == 3) {
2051 static void timer_function_token_retransmit_timeout (
void *data);
2053 static void target_set_completed (
2058 memb_state_commit_token_send (instance);
2062 static void memb_state_commit_enter (
2065 old_ring_state_save (instance);
2067 memb_state_commit_token_update (instance);
2069 memb_state_commit_token_target_set (instance);
2084 "entering COMMIT state.");
2087 reset_token_retransmit_timeout (instance);
2088 reset_token_timeout (instance);
2104 static void memb_state_recovery_enter (
2109 int local_received_flg = 1;
2110 unsigned int low_ring_aru;
2111 unsigned int range = 0;
2112 unsigned int messages_originated = 0;
2121 "entering RECOVERY state.");
2132 memb_state_commit_token_send_recovery (instance, commit_token);
2147 memcpy (&my_new_memb_ring_id_list[i],
2148 &memb_list[i].ring_id,
2151 memb_set_and_with_ring_id (
2153 my_new_memb_ring_id_list,
2167 "position [%d] member %s:", i,
totemip_print (&addr[i].addr[0]));
2169 "previous ring seq %llx rep %s",
2174 "aru %x high delivered %x received flag %d",
2192 local_received_flg = 0;
2196 if (local_received_flg == 1) {
2212 if (sq_lt_compare (memb_list[i].
aru, low_ring_aru)) {
2214 low_ring_aru = memb_list[i].
aru;
2235 "copying all old ring messages from %x-%x.",
2238 for (i = 1; i <= range; i++) {
2245 low_ring_aru + i, &ptr);
2249 sort_queue_item = ptr;
2250 messages_originated++;
2265 sort_queue_item->
mcast,
2270 "Originated %d messages in RECOVERY.", messages_originated);
2275 "Did not need to originate any messages in recovery.");
2285 reset_token_timeout (instance);
2286 reset_token_retransmit_timeout (instance);
2299 token_hold_cancel_send (instance);
2306 struct iovec *iovec,
2307 unsigned int iov_len,
2314 unsigned int addr_idx;
2323 if (cs_queue_is_full (queue_use)) {
2328 memset (&message_item, 0,
sizeof (
struct message_item));
2333 message_item.
mcast = totemsrp_buffer_alloc (instance);
2334 if (message_item.
mcast == 0) {
2341 memset(message_item.
mcast, 0, sizeof (
struct mcast));
2351 addr = (
char *)message_item.
mcast;
2352 addr_idx = sizeof (
struct mcast);
2353 for (i = 0; i < iov_len; i++) {
2354 memcpy (&addr[addr_idx], iovec[i].iov_base, iovec[i].iov_len);
2355 addr_idx += iovec[i].iov_len;
2358 message_item.
msg_len = addr_idx;
2362 cs_queue_item_add (queue_use, &message_item);
2384 cs_queue_avail (queue_use, &avail);
2395 static int orf_token_remcast (
2399 struct sort_queue_item *sort_queue_item;
2403 struct sq *sort_queue;
2411 res = sq_in_range (sort_queue, seq);
2420 res = sq_item_get (sort_queue, seq, &ptr);
2425 sort_queue_item = ptr;
2429 sort_queue_item->
mcast,
2439 static void messages_free (
2441 unsigned int token_aru)
2443 struct sort_queue_item *regular_message;
2446 int log_release = 0;
2447 unsigned int release_to;
2448 unsigned int range = 0;
2450 release_to = token_aru;
2451 if (sq_lt_compare (instance->
my_last_aru, release_to)) {
2471 for (i = 1; i <= range; i++) {
2477 regular_message = ptr;
2478 totemsrp_buffer_release (instance, regular_message->
mcast);
2489 "releasing messages up to and including %x", release_to);
/*
 * update_aru: advance instance->my_aru based on what is present in a
 * sort queue.
 *
 * Only part of the body is visible in this view; the parameter list,
 * the selection of sort_queue and the computation of `range` are not
 * shown.
 */
2493 static void update_aru (
2498 struct sq *sort_queue;
2500 unsigned int my_aru_saved = 0;
/* Snapshot my_aru so the probes below use a fixed base. */
2510 my_aru_saved = instance->
my_aru;
/* Probe sequence numbers base+1 .. base+range in order. */
2511 for (i = 1; i <= range; i++) {
2515 res = sq_item_get (sort_queue, my_aru_saved + i, &ptr);
/*
 * my_aru grows by (i - 1).  The loop-exit condition is not visible
 * here -- presumably the loop stops at the first sequence number that
 * is missing, making (i - 1) the count of consecutive items found;
 * confirm.
 */
2523 instance->
my_aru += i - 1;
2529 static int orf_token_mcast (
2532 int fcc_mcasts_allowed)
2536 struct sq *sort_queue;
2537 struct sort_queue_item sort_queue_item;
2538 struct mcast *mcast;
2539 unsigned int fcc_mcast_current;
2544 reset_token_retransmit_timeout (instance);
2555 for (fcc_mcast_current = 0; fcc_mcast_current < fcc_mcasts_allowed; fcc_mcast_current++) {
2556 if (cs_queue_is_empty (mcast_queue)) {
2559 message_item = (
struct message_item *)cs_queue_item_get (mcast_queue);
2567 memset (&sort_queue_item, 0,
sizeof (
struct sort_queue_item));
2571 mcast = sort_queue_item.
mcast;
2578 sq_item_add (sort_queue, &sort_queue_item, message_item->
mcast->
seq);
2582 message_item->
mcast,
2588 cs_queue_item_remove (mcast_queue);
2596 update_aru (instance);
2601 return (fcc_mcast_current);
2608 static int orf_token_rtr (
2611 unsigned int *fcc_allowed)
2616 struct sq *sort_queue;
2618 unsigned int range = 0;
2619 char retransmit_msg[1024];
2628 rtr_list = &orf_token->
rtr_list[0];
2630 strcpy (retransmit_msg,
"Retransmit List: ");
2635 sprintf (value,
"%x ", rtr_list[i].seq);
2636 strcat (retransmit_msg, value);
2638 strcat (retransmit_msg,
"");
2640 "%s", retransmit_msg);
2653 if (memcmp (&rtr_list[i].ring_id, &instance->
my_ring_id,
2660 res = orf_token_remcast (instance, rtr_list[i].seq);
2667 memmove (&rtr_list[i], &rtr_list[i + 1],
2683 range = orf_token->
seq - instance->
my_aru;
2687 (i <= range); i++) {
2692 res = sq_in_range (sort_queue, instance->
my_aru + i);
2700 res = sq_item_inuse (sort_queue, instance->
my_aru + i);
2711 res = sq_item_miss_count (sort_queue, instance->
my_aru + i);
2721 if (instance->
my_aru + i == rtr_list[j].
seq) {
2751 static void timer_function_token_retransmit_timeout (
void *data)
2761 token_retransmit (instance);
2762 reset_token_retransmit_timeout (instance);
2767 static void timer_function_token_hold_retransmit_timeout (
void *data)
2778 token_retransmit (instance);
2783 static void timer_function_merge_detect_timeout(
void *data)
2792 memb_merge_detect_transmit (instance);
/*
 * token_send: send an ORF token.
 *
 * orf_token  token to send; its rtr_list_entries field determines the
 *            on-wire size computed below.
 *
 * Other parameters (the instance and forward_token) are referenced in
 * the body or at call sites, but their declarations are not visible in
 * this view.  The transmit path and return values are not shown either.
 */
2805 static int token_send (
2807 struct orf_token *orf_token,
2811 unsigned int orf_token_size;
/* Size = fixed token struct + one rtr_item per retransmit-list entry. */
2813 orf_token_size =
sizeof (
struct orf_token) +
2814 (orf_token->rtr_list_entries *
sizeof (
struct rtr_item));
/* forward_token == 0 selects a separate path (body not visible). */
2821 if (forward_token == 0) {
2865 struct orf_token orf_token;
2897 res = token_send (instance, &orf_token, 1);
2902 static void memb_state_commit_token_update (
2907 unsigned int high_aru;
2943 if (sq_lt_compare (high_aru, memb_list[i].aru)) {
2944 high_aru = memb_list[i].
aru;
2954 if (sq_lt_compare (memb_list[i].aru, high_aru)) {
2969 static void memb_state_commit_token_target_set (
2986 static int memb_state_commit_token_send_recovery (
2990 unsigned int commit_token_size;
3012 reset_token_retransmit_timeout (instance);
3016 static int memb_state_commit_token_send (
3019 unsigned int commit_token_size;
3041 reset_token_retransmit_timeout (instance);
3049 int token_memb_entries = 0;
3053 memb_set_subtract (token_memb, &token_memb_entries,
3061 lowest_addr = &token_memb[0].
addr[0];
3062 for (i = 1; i < token_memb_entries; i++) {
3070 static int srp_addr_compare (
const void *a,
const void *b)
3078 static void memb_state_commit_token_create (
3084 int token_memb_entries = 0;
3087 "Creating commit token because I am the rep.");
3089 memb_set_subtract (token_memb, &token_memb_entries,
3108 qsort (token_memb, token_memb_entries,
sizeof (
struct srp_addr),
3117 memcpy (addr, token_memb,
3118 token_memb_entries *
sizeof (
struct srp_addr));
3119 memset (memb_list, 0,
3125 char memb_join_data[40000];
3128 unsigned int addr_idx;
3145 addr = (
char *)memb_join;
3146 addr_idx =
sizeof (
struct memb_join);
3147 memcpy (&addr[addr_idx],
3154 memcpy (&addr[addr_idx],
3177 char memb_join_data[40000];
3178 struct memb_join *memb_join = (
struct memb_join *)memb_join_data;
3180 unsigned int addr_idx;
3181 int active_memb_entries;
3185 "sending join/leave message");
3192 &instance->
my_id, 1,
3195 memb_set_subtract (active_memb, &active_memb_entries,
3197 &instance->
my_id, 1);
3216 addr = (
char *)memb_join;
3217 addr_idx =
sizeof (
struct memb_join);
3218 memcpy (&addr[addr_idx],
3220 active_memb_entries *
3223 active_memb_entries *
3225 memcpy (&addr[addr_idx],
/*
 * memb_ring_id_create_or_load: obtain memb_ring_id->seq from a file,
 * creating the file with seq = 0 when it cannot be read.
 *
 * The parameter list is not visible in this view; the body works on
 * memb_ring_id.
 */
3264 static void memb_ring_id_create_or_load (
3270 char filename[PATH_MAX];
/* File name has the form "<dir>/ringid_<id>"; the two arguments are not visible here. */
3272 snprintf (filename,
sizeof(filename),
"%s/ringid_%s",
/* Read-only open: POSIX uses the mode argument only when a file is created, so 0700 has no effect here. */
3274 fd = open (filename, O_RDONLY, 0700);
3279 res = read (fd, &memb_ring_id->
seq, sizeof (uint64_t));
/* Missing file or short read: fall back to seq = 0 and (re)create the file. */
3285 if ((fd == -1) || (res !=
sizeof (uint64_t))) {
3286 memb_ring_id->
seq = 0;
3288 fd = open (filename, O_CREAT|O_RDWR, 0700);
3290 res = write (fd, &memb_ring_id->
seq, sizeof (uint64_t));
3294 "Couldn't write ringid file '%s'", filename);
3298 "Couldn't create ringid file '%s'", filename);
/* The rep address must not be the zero address, per totemip_zero_check(). */
3303 assert (!totemip_zero_check(&memb_ring_id->
rep));
/*
 * memb_ring_id_set_and_store: install a new ring id on the instance and
 * write its sequence number to the ring-id file.
 *
 * ring_id  ring id copied wholesale into instance->my_ring_id.
 *
 * The instance parameter's declaration is not visible in this view.
 */
3307 static void memb_ring_id_set_and_store (
3309 const struct memb_ring_id *ring_id)
3315 memcpy (&instance->
my_ring_id, ring_id, sizeof (
struct memb_ring_id));
/* Same "<dir>/ringid_<id>" naming as memb_ring_id_create_or_load(). */
3317 snprintf (filename,
sizeof(filename),
"%s/ringid_%s",
/*
 * Two open attempts are visible: write-only, then create.  Presumably
 * the second runs only when the first fails -- the guarding condition
 * is not visible here.
 * NOTE(review): the mode is 0777 here but 0700 in
 * memb_ring_id_create_or_load() -- confirm which is intended.
 */
3320 fd = open (filename, O_WRONLY, 0777);
3322 fd = open (filename, O_CREAT|O_RDWR, 0777);
3326 "Couldn't store new ring id %llx to stable storage",
3332 "Storing new sequence id for ring %llx", instance->
my_ring_id.
seq);
/*
 * NOTE(review): the size is spelled sizeof (unsigned long long) here,
 * while the load path uses sizeof (uint64_t); they agree only where
 * the two types have the same width.
 */
3334 res = write (fd, &instance->
my_ring_id.
seq, sizeof (
unsigned long long));
/* A failed or short write trips this assert, which aborts unless NDEBUG is defined. */
3335 assert (res ==
sizeof (
unsigned long long));
3350 token_hold_cancel_send (instance);
3353 if (callback_handle == 0) {
3356 *handle_out = (
void *)callback_handle;
3357 list_init (&callback_handle->
list);
3359 callback_handle->
data = (
void *) data;
3361 callback_handle->
delete =
delete;
3380 list_del (&h->
list);
3387 static void token_callbacks_execute (
3393 struct list_head *callback_listhead = 0;
3409 for (list = callback_listhead->
next; list != callback_listhead;
3412 token_callback_instance =
list_entry (list,
struct token_callback_instance, list);
3414 list_next = list->
next;
3415 del = token_callback_instance->
delete;
3422 token_callback_instance->
data);
3426 if (res == -1 && del == 1) {
3427 list_add (list, callback_listhead);
3429 free (token_callback_instance);
3453 if (queue_use != NULL) {
3454 backlog = cs_queue_used (queue_use);
/*
 * fcc_calculate: compute how many transmissions are allowed for the
 * current token.
 *
 * token  the ORF token being processed (its use is not visible in this
 *        view).
 *
 * Returns transmits_allowed; note the local is unsigned while the
 * function's return type is int.
 *
 * How transmits_allowed and backlog_calc are initially derived is not
 * visible here.
 */
3461 static int fcc_calculate (
3463 struct orf_token *token)
3465 unsigned int transmits_allowed;
3466 unsigned int backlog_calc;
/* Refresh the instance's current backlog figure from backlog_get(). */
3474 instance->
my_cbl = backlog_get (instance);
/* A non-zero backlog_calc caps the allowance; zero leaves it unchanged. */
3483 if (backlog_calc > 0 && transmits_allowed > backlog_calc) {
3484 transmits_allowed = backlog_calc;
3488 return (transmits_allowed);
3494 static void fcc_rtr_limit (
3496 struct orf_token *token,
3497 unsigned int *transmits_allowed)
3501 assert (check >= 0);
3508 *transmits_allowed = 0;
/*
 * fcc_token_update: fold this node's transmit count into the token.
 *
 * token             token whose fcc field is adjusted in place.
 * msgs_transmitted  number of messages this node sent for this token.
 *
 * The instance parameter's declaration is not visible in this view, and
 * other statements may exist between the two shown.
 */
3512 static void fcc_token_update (
3514 struct orf_token *token,
3515 unsigned int msgs_transmitted)
/*
 * Replace this node's previous contribution (instance->my_trc) with the
 * new one: add the new count, subtract the old.
 */
3517 token->
fcc += msgs_transmitted - instance->
my_trc;
/* Remember the new contribution for the next call. */
3519 instance->
my_trc = msgs_transmitted;
3531 static int message_handler_orf_token (
3535 int endian_conversion_needed)
3537 char token_storage[1500];
3538 char token_convert[1500];
3539 struct orf_token *token = NULL;
3541 unsigned int transmits_allowed;
3542 unsigned int mcasted_retransmit;
3543 unsigned int mcasted_regular;
3544 unsigned int last_aru;
3547 unsigned long long tv_current;
3548 unsigned long long tv_diff;
3550 tv_current = qb_util_nano_current_get ();
3551 tv_diff = tv_current -
tv_old;
3552 tv_old = tv_current;
3555 "Time since last token %0.4f ms", ((
float)tv_diff) / 1000000.0);
3561 #ifdef TEST_DROP_ORF_TOKEN_PERCENTAGE
3562 if (random()%100 < TEST_DROP_ORF_TOKEN_PERCENTAGE) {
3567 if (endian_conversion_needed) {
3568 orf_token_endian_convert ((
struct orf_token *)msg,
3569 (
struct orf_token *)token_convert);
3570 msg = (
struct orf_token *)token_convert;
3577 token = (
struct orf_token *)token_storage;
3578 memcpy (token, msg,
sizeof (
struct orf_token));
3579 memcpy (&token->
rtr_list[0], (
char *)msg + sizeof (
struct orf_token),
3587 start_merge_detect_timeout (instance);
3590 cancel_merge_detect_timeout (instance);
3591 cancel_token_hold_retransmit_timeout (instance);
3597 #ifdef TEST_RECOVERY_MSG_COUNT
3637 messages_free (instance, token->
aru);
3652 sizeof (
struct memb_ring_id)) != 0) {
3656 reset_heartbeat_timeout(instance);
3659 cancel_heartbeat_timeout(instance);
3674 transmits_allowed = fcc_calculate (instance, token);
3675 mcasted_retransmit = orf_token_rtr (instance, token, &transmits_allowed);
3677 fcc_rtr_limit (instance, token, &transmits_allowed);
3678 mcasted_regular = orf_token_mcast (instance, token, transmits_allowed);
3685 fcc_token_update (instance, token, mcasted_retransmit +
3688 if (sq_lt_compare (instance->
my_aru, token->
aru) ||
3693 if (token->
aru == token->
seq) {
3699 if (token->
aru == last_aru && token->
aru_addr != 0) {
3714 "FAILED TO RECEIVE");
3718 memb_set_merge (&instance->
my_id, 1,
3722 memb_state_gather_enter (instance, 6);
3745 "token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, aru %x",
3758 "install seq %x aru %x high seq received %x",
3776 "retrans flag count %x token aru %x install seq %x aru %x %x",
3780 memb_state_operational_enter (instance);
3787 token_send (instance, token, forward_token);
3790 tv_current = qb_util_nano_current_get ();
3791 tv_diff = tv_current -
tv_old;
3792 tv_old = tv_current;
3795 ((
float)tv_diff) / 1000000.0);
3798 messages_deliver_to_app (instance, 0,
3806 reset_token_timeout (instance);
3807 reset_token_retransmit_timeout (instance);
3811 start_token_hold_retransmit_timeout (instance);
3821 reset_heartbeat_timeout(instance);
3824 cancel_heartbeat_timeout(instance);
3830 static void messages_deliver_to_app (
3833 unsigned int end_point)
3835 struct sort_queue_item *sort_queue_item_p;
3838 struct mcast *mcast_in;
3839 struct mcast mcast_header;
3840 unsigned int range = 0;
3841 int endian_conversion_required;
3842 unsigned int my_high_delivered_stored = 0;
3858 for (i = 1; i <= range; i++) {
3866 my_high_delivered_stored + i);
3872 my_high_delivered_stored + i, &ptr);
3876 if (res != 0 && skip == 0) {
3887 sort_queue_item_p = ptr;
3889 mcast_in = sort_queue_item_p->
mcast;
3890 assert (mcast_in != (
struct mcast *)0xdeadbeef);
3892 endian_conversion_required = 0;
3894 endian_conversion_required = 1;
3895 mcast_endian_convert (mcast_in, &mcast_header);
3897 memcpy (&mcast_header, mcast_in,
sizeof (
struct mcast));
3904 memb_set_subset (&mcast_header.system_from,
3918 "Delivering MCAST message with seq %x to pending delivery queue",
3925 mcast_header.header.nodeid,
3926 ((
char *)sort_queue_item_p->
mcast) + sizeof (
struct mcast),
3927 sort_queue_item_p->
msg_len - sizeof (
struct mcast),
3928 endian_conversion_required);
3935 static int message_handler_mcast (
3939 int endian_conversion_needed)
3941 struct sort_queue_item sort_queue_item;
3942 struct sq *sort_queue;
3943 struct mcast mcast_header;
3946 if (endian_conversion_needed) {
3947 mcast_endian_convert (msg, &mcast_header);
3949 memcpy (&mcast_header, msg,
sizeof (
struct mcast));
3960 #ifdef TEST_DROP_MCAST_PERCENTAGE
3961 if (random()%100 < TEST_DROP_MCAST_PERCENTAGE) {
3969 if (memcmp (&instance->
my_ring_id, &mcast_header.ring_id,
3970 sizeof (
struct memb_ring_id)) != 0) {
3975 &mcast_header.system_from, 1,
3977 memb_state_gather_enter (instance, 7);
3981 if (!memb_set_subset (
3982 &mcast_header.system_from,
3987 memb_set_merge (&mcast_header.system_from, 1,
3989 memb_state_gather_enter (instance, 8);
4008 "Received ringid(%s:%lld) seq %x",
4010 mcast_header.ring_id.seq,
4018 sq_in_range (sort_queue, mcast_header.seq) &&
4019 sq_item_inuse (sort_queue, mcast_header.seq) == 0) {
4025 sort_queue_item.
mcast = totemsrp_buffer_alloc (instance);
4026 if (sort_queue_item.
mcast == NULL) {
4029 memcpy (sort_queue_item.
mcast, msg, msg_len);
4030 sort_queue_item.
msg_len = msg_len;
4033 mcast_header.seq)) {
4037 sq_item_add (sort_queue, &sort_queue_item, mcast_header.seq);
4040 update_aru (instance);
4049 static int message_handler_memb_merge_detect (
4053 int endian_conversion_needed)
4058 if (endian_conversion_needed) {
4069 sizeof (
struct memb_ring_id)) == 0) {
4081 memb_state_gather_enter (instance, 9);
4085 if (!memb_set_subset (
4093 memb_state_gather_enter (instance, 10);
4109 static void memb_join_process (
4111 const struct memb_join *memb_join)
4115 int gather_entered = 0;
4116 int fail_minus_memb_entries = 0;
4129 if (memb_set_equal (proc_list,
4134 memb_set_equal (failed_list,
4139 memb_consensus_set (instance, &memb_join->
system_from);
4141 if (memb_consensus_agreed (instance) && instance->
failed_to_recv == 1) {
4148 memb_state_commit_token_create (instance);
4150 memb_state_commit_enter (instance);
4153 if (memb_consensus_agreed (instance) &&
4154 memb_lowest_in_config (instance)) {
4156 memb_state_commit_token_create (instance);
4158 memb_state_commit_enter (instance);
4163 if (memb_set_subset (proc_list,
4168 memb_set_subset (failed_list,
4180 memb_set_merge (proc_list,
4184 if (memb_set_subset (
4185 &instance->
my_id, 1,
4192 if (memb_set_subset (
4197 if (memb_set_subset (
4202 memb_set_merge (failed_list,
4206 memb_set_subtract (fail_minus_memb,
4207 &fail_minus_memb_entries,
4213 memb_set_merge (fail_minus_memb,
4214 fail_minus_memb_entries,
4220 memb_state_gather_enter (instance, 11);
4225 if (gather_entered == 0 &&
4228 memb_state_gather_enter (instance, 12);
4232 static void memb_join_endian_convert (
const struct memb_join *in,
struct memb_join *out)
4254 srp_addr_copy_endian_convert (&out_proc_list[i], &in_proc_list[i]);
4257 srp_addr_copy_endian_convert (&out_failed_list[i], &in_failed_list[i]);
4282 srp_addr_copy_endian_convert (&out_addr[i], &in_addr[i]);
4287 if (in_memb_list[i].ring_id.
rep.
family != 0) {
4293 out_memb_list[i].
aru =
swab32 (in_memb_list[i].aru);
4300 static void orf_token_endian_convert (
const struct orf_token *in,
struct orf_token *out)
4324 static void mcast_endian_convert (
const struct mcast *in,
struct mcast *out)
4340 static void memb_merge_detect_endian_convert (
4352 static int message_handler_memb_join (
4356 int endian_conversion_needed)
4358 const struct memb_join *memb_join;
4359 struct memb_join *memb_join_convert = alloca (msg_len);
4361 if (endian_conversion_needed) {
4362 memb_join = memb_join_convert;
4363 memb_join_endian_convert (msg, memb_join_convert);
4373 if (pause_flush (instance)) {
4382 memb_join_process (instance, memb_join);
4386 memb_join_process (instance, memb_join);
4397 memb_join_process (instance, memb_join);
4398 memb_state_gather_enter (instance, 13);
4410 memb_join_process (instance, memb_join);
4411 memb_recovery_state_token_loss (instance);
4412 memb_state_gather_enter (instance, 14);
4419 static int message_handler_memb_commit_token (
4423 int endian_conversion_needed)
4433 "got commit token");
4435 if (endian_conversion_needed) {
4436 memb_commit_token_endian_convert (msg, memb_commit_token_convert);
4438 memcpy (memb_commit_token_convert, msg, msg_len);
4440 memb_commit_token = memb_commit_token_convert;
4443 #ifdef TEST_DROP_COMMIT_TOKEN_PERCENTAGE
4444 if (random()%100 < TEST_DROP_COMMIT_TOKEN_PERCENTAGE) {
4454 memb_set_subtract (sub, &sub_entries,
4458 if (memb_set_equal (addr,
4464 memcpy (instance->
commit_token, memb_commit_token, msg_len);
4465 memb_state_commit_enter (instance);
4479 memb_state_recovery_enter (instance, memb_commit_token);
4486 "Sending initial ORF token");
4489 orf_token_send_initial (instance);
4490 reset_token_timeout (instance);
4491 reset_token_retransmit_timeout (instance);
4498 static int message_handler_token_hold_cancel (
4502 int endian_conversion_needed)
4507 sizeof (
struct memb_ring_id)) == 0) {
4511 timer_function_token_retransmit_timeout (instance);
4520 unsigned int msg_len)
4525 if (msg_len <
sizeof (
struct message_header)) {
4527 "Received message is too short... ignoring %u.",
4528 (
unsigned int)msg_len);
4532 switch (message_header->
type) {
4553 printf (
"wrong message type\n");
4570 unsigned int iface_no)
4581 memb_ring_id_create_or_load (instance, &instance->
my_ring_id);
4584 "Created or loaded sequence id %llx.%s for this ring.",
4601 memb_state_gather_enter (instance, 15);
4606 totem_config->
net_mtu -=
sizeof (
struct mcast);
4611 void (*totem_service_ready) (
void))
int totemrrp_iface_check(void *rrp_context)
void(*) in log_level_security)
void main_iface_change_fn(void *context, const struct totem_ip_address *iface_address, unsigned int iface_no)
void totemip_copy_endian_convert(struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
struct srp_addr system_from
struct memb_ring_id ring_id
uint32_t waiting_trans_ack
struct srp_addr system_from
struct memb_ring_id ring_id
int totemsrp_log_level_debug
struct memb_ring_id my_ring_id
Totem Single Ring Protocol.
uint64_t memb_commit_token_rx
void(* totemsrp_service_ready_fn)(void)
struct message_header header
unsigned int old_ring_state_high_seq_received
unsigned int proc_list_entries
struct totem_interface * interfaces
unsigned int interface_count
int totemsrp_my_family_get(void *srp_context)
totemsrp_token_stats_t token[TOTEM_TOKEN_STATS_MAX]
const char * totemip_print(const struct totem_ip_address *addr)
unsigned char addr[TOTEMIP_ADDRLEN]
int totemsrp_log_level_error
#define LEAVE_DUMMY_NODEID
struct memb_ring_id ring_id
qb_loop_timer_handle timer_heartbeat_timeout
unsigned int failed_list_entries
unsigned long long int tv_old
#define SEQNO_START_TOKEN
unsigned int token_hold_timeout
struct memb_ring_id ring_id
struct totem_ip_address member_list[PROCESSOR_COUNT_MAX]
int totemip_compare(const void *a, const void *b)
int totemsrp_member_add(void *context, const struct totem_ip_address *member, int ring_no)
void * token_sent_event_handle
struct srp_addr system_from
int totemsrp_log_level_notice
unsigned int totemsrp_my_nodeid_get(void *srp_context)
char rrp_mode[TOTEM_RRP_MODE_BYTES]
void totemsrp_net_mtu_adjust(struct totem_config *totem_config)
int totemsrp_log_level_warning
int totemsrp_crypto_set(void *srp_context, const char *cipher_type, const char *hash_type)
uint64_t memb_merge_detect_rx
int totemsrp_ifaces_get(void *srp_context, unsigned int nodeid, struct totem_ip_address *interfaces, unsigned int interfaces_size, char ***status, unsigned int *iface_count)
struct cs_queue new_message_queue_trans
struct message_header header
unsigned char end_of_commit_token[0]
unsigned char addr[TOTEMIP_ADDRLEN]
char commit_token_storage[40000]
unsigned int rrp_problem_count_timeout
struct list_head token_callback_sent_listhead
struct cs_queue new_message_queue
struct srp_addr my_deliver_memb_list[PROCESSOR_COUNT_MAX]
void totemsrp_callback_token_destroy(void *srp_context, void **handle_out)
uint64_t gather_token_lost
int totemsrp_log_level_trace
void totemip_copy(struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
int totemrrp_ifaces_get(void *rrp_context, char ***status, unsigned int *iface_count)
struct memb_ring_id my_old_ring_id
void * totemrrp_buffer_alloc(void *rrp_context)
unsigned int downcheck_timeout
struct srp_addr my_new_memb_list[PROCESSOR_COUNT_MAX]
uint64_t memb_commit_token_tx
int my_deliver_memb_entries
unsigned int max_network_delay
unsigned int heartbeat_failures_allowed
#define TOTEM_TOKEN_STATS_MAX
unsigned long long token_ring_id_seq
struct totem_ip_address mcast_address
int totemsrp_callback_token_create(void *srp_context, void **handle_out, enum totem_callback_token_type type, int delete, int(*callback_fn)(enum totem_callback_token_type type, const void *), const void *data)
unsigned int send_join_timeout
void totemsrp_service_ready_register(void *context, void(*totem_service_ready)(void))
unsigned int rrp_problem_count_threshold
struct srp_addr my_memb_list[PROCESSOR_COUNT_MAX]
uint64_t operational_entered
unsigned long long ring_seq
int totemsrp_mcast(void *srp_context, struct iovec *iovec, unsigned int iov_len, int guarantee)
Multicast a message.
uint64_t operational_token_lost
unsigned int received_flg
uint64_t consensus_timeouts
Totem Network interface - also does encryption/decryption.
unsigned int my_high_delivered
struct message_handlers totemsrp_message_handlers
qb_loop_timer_handle memb_timer_state_gather_consensus_timeout
uint64_t recovery_token_lost
unsigned int token_retransmits_before_loss_const
unsigned char end_of_memb_join[0]
struct message_header header
int totemrrp_finalize(void *rrp_context)
struct list_head token_callback_received_listhead
int totemrrp_member_remove(void *rrp_context, const struct totem_ip_address *member, int iface_no)
struct rtr_item rtr_list[0]
int totemsrp_ring_reenable(void *srp_context)
struct memb_ring_id ring_id
unsigned int seqno_unchanged_const
uint64_t commit_token_lost
unsigned int miss_count_const
int totemrrp_crypto_set(void *rrp_context, const char *cipher_type, const char *hash_type)
uint64_t token_hold_cancel_rx
void(* totemsrp_waiting_trans_ack_cb_fn)(int waiting_trans_ack)
unsigned int join_timeout
int totemrrp_send_flush(void *rrp_context)
struct message_header header
struct totem_ip_address mcast_addr
#define MESSAGE_QUEUE_MAX
int totemrrp_member_add(void *rrp_context, const struct totem_ip_address *member, int iface_no)
unsigned int received_flg
struct totem_ip_address rep
unsigned int last_released
int orf_token_retransmit_size
int totemsrp_avail(void *srp_context)
Return number of available messages that can be queued.
unsigned int rrp_autorecovery_check_timeout
#define RETRANS_MESSAGE_QUEUE_SIZE_MAX
void(* log_printf)(int level, int subsys, const char *function_name, const char *file_name, int file_line, const char *format,...) __attribute__((format(printf
unsigned int fail_to_recv_const
void * token_recv_event_handle
struct totem_ip_address boundto
unsigned int my_high_seq_received
int totemrrp_initialize(qb_loop_t *poll_handle, void **rrp_context, struct totem_config *totem_config, totemsrp_stats_t *stats, void *context, void(*deliver_fn)(void *context, const void *msg, unsigned int msg_len), void(*iface_change_fn)(void *context, const struct totem_ip_address *iface_addr, unsigned int iface_no), void(*token_seqid_get)(const void *msg, unsigned int *seqid, unsigned int *token_is), unsigned int(*msgs_missing)(void), void(*target_set_completed)(void *context))
Create an instance.
qb_loop_t * totemsrp_poll_handle
qb_loop_timer_handle timer_pause_timeout
qb_loop_timer_handle timer_merge_detect_timeout
int my_merge_detect_timeout_outstanding
int totemsrp_log_level_security
qb_loop_timer_handle timer_orf_token_retransmit_timeout
struct totem_config * totem_config
qb_loop_timer_handle timer_orf_token_timeout
uint32_t continuous_gather
void totemsrp_threaded_mode_enable(void *context)
int totemsrp_initialize(qb_loop_t *poll_handle, void **srp_context, struct totem_config *totem_config, totemmrp_stats_t *stats, void(*deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void(*confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id), void(*waiting_trans_ack_cb_fn)(int waiting_trans_ack))
Create a protocol instance.
void totemsrp_event_signal(void *srp_context, enum totem_event_type type, int value)
int totemrrp_recv_flush(void *rrp_context)
uint32_t orf_token_discard
int my_failed_list_entries
struct srp_addr my_left_memb_list[PROCESSOR_COUNT_MAX]
uint64_t token_hold_cancel_tx
unsigned int token_timeout
unsigned int high_delivered
unsigned int consensus_timeout
void main_deliver_fn(void *context, const void *msg, unsigned int msg_len)
#define PROCESSOR_COUNT_MAX
void totemrrp_buffer_release(void *rrp_context, void *ptr)
Totem Network interface - also does encryption/decryption.
char orf_token_retransmit[TOKEN_SIZE_MAX]
struct message_header header
struct sq regular_sort_queue
void totemsrp_finalize(void *srp_context)
void(* totemsrp_log_printf)(int level, int sybsys, const char *function, const char *file, int line, const char *format,...) __attribute__((format(printf
#define QUEUE_RTR_ITEMS_SIZE_MAX
struct srp_addr my_failed_list[PROCESSOR_COUNT_MAX]
struct cs_queue retrans_message_queue
qb_loop_timer_handle memb_timer_state_gather_join_timeout
int my_trans_memb_entries
uint64_t memb_merge_detect_tx
unsigned int high_delivered
struct rtr_item rtr_list[0]
int consensus_list_entries
unsigned int rrp_problem_count_mcast_threshold
int totemrrp_processor_count_set(void *rrp_context, unsigned int processor_count)
void(*) enum memb_stat memb_state)
int totemrrp_mcast_noflush_send(void *rrp_context, const void *msg, unsigned int msg_len)
uint32_t threaded_mode_enabled
enum totem_callback_token_type callback_type
int totemrrp_mcast_recv_empty(void *rrp_context)
#define list_entry(ptr, type, member)
#define LOGSYS_PERROR(err_num, level, fmt, args...)
struct totem_logging_configuration totem_logging_configuration
int totemrrp_mcast_flush_send(void *rrp_context, const void *msg, unsigned int msg_len)
struct memb_ring_id ring_id
#define log_printf(level, format, args...)
void totemsrp_trans_ack(void *context)
unsigned int max_messages
uint64_t recovery_entered
qb_loop_timer_handle memb_timer_state_commit_timeout
struct memb_commit_token * commit_token
struct consensus_list_item consensus_list[PROCESSOR_COUNT_MAX]
struct srp_addr my_proc_list[PROCESSOR_COUNT_MAX]
int(* handler_functions[6])(struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed)
unsigned int merge_timeout
unsigned int use_heartbeat
struct message_header header
int totemsrp_member_remove(void *context, const struct totem_ip_address *member, int ring_no)
unsigned int token_retransmit_timeout
struct srp_addr my_trans_memb_list[PROCESSOR_COUNT_MAX]
#define RETRANSMIT_ENTRIES_MAX
int totemip_equal(const struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
unsigned int my_token_seq
struct memb_ring_id ring_id
int totemrrp_ring_reenable(void *rrp_context, unsigned int iface_no)
qb_loop_timer_handle timer_orf_token_hold_retransmit_timeout
void(* totemsrp_deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required)
void(* totemsrp_confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id)
struct totem_ip_address addr[INTERFACE_MAX]
unsigned int rrp_token_expired_timeout
struct memb_ring_id ring_id
unsigned int my_install_seq
int totemrrp_token_send(void *rrp_context, const void *msg, unsigned int msg_len)
struct sq recovery_sort_queue
int totemrrp_token_target_set(void *rrp_context, struct totem_ip_address *addr, unsigned int iface_no)
totem_callback_token_type
int(* callback_fn)(enum totem_callback_token_type type, const void *)
unsigned int my_high_ring_delivered