41 #include <sys/types.h>
42 #include <sys/socket.h>
44 #include <sys/ioctl.h>
45 #include <netinet/in.h>
54 #include <arpa/inet.h>
59 #include <qb/qbipc_common.h>
69 #define MAP_ANONYMOUS MAP_ANON
76 #define GROUP_HASH_SIZE 32
148 static struct list_head downlist_messages_head;
149 static struct list_head joinlist_messages_head;
178 static unsigned int my_member_list_entries;
182 static unsigned int my_old_member_list_entries = 0;
208 static int cpg_lib_init_fn (
void *conn);
210 static int cpg_lib_exit_fn (
void *conn);
212 static void message_handler_req_exec_cpg_procjoin (
216 static void message_handler_req_exec_cpg_procleave (
220 static void message_handler_req_exec_cpg_joinlist (
224 static void message_handler_req_exec_cpg_mcast (
228 static void message_handler_req_exec_cpg_partial_mcast (
232 static void message_handler_req_exec_cpg_downlist_old (
236 static void message_handler_req_exec_cpg_downlist (
240 static void exec_cpg_procjoin_endian_convert (
void *msg);
242 static void exec_cpg_joinlist_endian_convert (
void *msg);
244 static void exec_cpg_mcast_endian_convert (
void *msg);
246 static void exec_cpg_partial_mcast_endian_convert (
void *msg);
248 static void exec_cpg_downlist_endian_convert_old (
void *msg);
250 static void exec_cpg_downlist_endian_convert (
void *msg);
252 static void message_handler_req_lib_cpg_join (
void *conn,
const void *message);
254 static void message_handler_req_lib_cpg_leave (
void *conn,
const void *message);
256 static void message_handler_req_lib_cpg_finalize (
void *conn,
const void *message);
258 static void message_handler_req_lib_cpg_mcast (
void *conn,
const void *message);
260 static void message_handler_req_lib_cpg_partial_mcast (
void *conn,
const void *message);
262 static void message_handler_req_lib_cpg_membership (
void *conn,
263 const void *message);
265 static void message_handler_req_lib_cpg_local_get (
void *conn,
266 const void *message);
268 static void message_handler_req_lib_cpg_iteration_initialize (
270 const void *message);
272 static void message_handler_req_lib_cpg_iteration_next (
274 const void *message);
276 static void message_handler_req_lib_cpg_iteration_finalize (
278 const void *message);
280 static void message_handler_req_lib_cpg_zc_alloc (
282 const void *message);
284 static void message_handler_req_lib_cpg_zc_free (
286 const void *message);
288 static void message_handler_req_lib_cpg_zc_execute (
290 const void *message);
292 static int cpg_node_joinleave_send (
unsigned int pid,
const mar_cpg_name_t *group_name,
int fn,
int reason);
294 static int cpg_exec_send_downlist(
void);
296 static int cpg_exec_send_joinlist(
void);
298 static void downlist_messages_delete (
void);
300 static void downlist_master_choose_and_send (
void);
302 static void joinlist_inform_clients (
void);
304 static void joinlist_messages_delete (
void);
306 static void cpg_sync_init (
307 const unsigned int *trans_list,
308 size_t trans_list_entries,
309 const unsigned int *member_list,
310 size_t member_list_entries,
313 static int cpg_sync_process (
void);
315 static void cpg_sync_activate (
void);
317 static void cpg_sync_abort (
void);
319 static void do_proc_join(
325 static void do_proc_leave(
331 static int notify_lib_totem_membership (
333 int member_list_entries,
334 const unsigned int *member_list);
336 static inline int zcb_all_free (
339 static char *cpg_print_group_name (
352 .lib_handler_fn = message_handler_req_lib_cpg_leave,
356 .lib_handler_fn = message_handler_req_lib_cpg_mcast,
360 .lib_handler_fn = message_handler_req_lib_cpg_membership,
364 .lib_handler_fn = message_handler_req_lib_cpg_local_get,
368 .lib_handler_fn = message_handler_req_lib_cpg_iteration_initialize,
372 .lib_handler_fn = message_handler_req_lib_cpg_iteration_next,
376 .lib_handler_fn = message_handler_req_lib_cpg_iteration_finalize,
380 .lib_handler_fn = message_handler_req_lib_cpg_finalize,
384 .lib_handler_fn = message_handler_req_lib_cpg_zc_alloc,
388 .lib_handler_fn = message_handler_req_lib_cpg_zc_free,
392 .lib_handler_fn = message_handler_req_lib_cpg_zc_execute,
396 .lib_handler_fn = message_handler_req_lib_cpg_partial_mcast,
406 .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert
409 .exec_handler_fn = message_handler_req_exec_cpg_procleave,
410 .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert
413 .exec_handler_fn = message_handler_req_exec_cpg_joinlist,
414 .exec_endian_convert_fn = exec_cpg_joinlist_endian_convert
417 .exec_handler_fn = message_handler_req_exec_cpg_mcast,
418 .exec_endian_convert_fn = exec_cpg_mcast_endian_convert
421 .exec_handler_fn = message_handler_req_exec_cpg_downlist_old,
422 .exec_endian_convert_fn = exec_cpg_downlist_endian_convert_old
425 .exec_handler_fn = message_handler_req_exec_cpg_downlist,
426 .exec_endian_convert_fn = exec_cpg_downlist_endian_convert
429 .exec_handler_fn = message_handler_req_exec_cpg_partial_mcast,
430 .exec_endian_convert_fn = exec_cpg_partial_mcast_endian_convert
435 .
name =
"corosync cluster closed process group service v1.01",
438 .private_data_size =
sizeof (
struct cpg_pd),
441 .lib_init_fn = cpg_lib_init_fn,
442 .lib_exit_fn = cpg_lib_exit_fn,
443 .lib_engine = cpg_lib_engine,
445 .exec_init_fn = cpg_exec_init_fn,
446 .exec_dump_fn = NULL,
447 .exec_engine = cpg_exec_engine,
449 .sync_init = cpg_sync_init,
450 .sync_process = cpg_sync_process,
451 .sync_activate = cpg_sync_activate,
452 .sync_abort = cpg_sync_abort
457 return (&cpg_service_engine);
461 struct qb_ipc_request_header header __attribute__((aligned(8)));
468 struct qb_ipc_request_header header __attribute__((aligned(8)));
477 struct qb_ipc_request_header header __attribute__((aligned(8)));
488 struct qb_ipc_request_header header __attribute__((aligned(8)));
494 struct qb_ipc_request_header header __attribute__((aligned(8)));
529 for (i = 0; i < group->length; i++) {
532 if (c >=
' ' && c < 0x7f && c !=
'\\') {
536 res[dest_pos++] =
'\\';
537 res[dest_pos++] =
'\\';
539 snprintf(res + dest_pos,
sizeof(res) - dest_pos,
"\\x%02X", c);
549 static void cpg_sync_init (
550 const unsigned int *trans_list,
551 size_t trans_list_entries,
552 const unsigned int *member_list,
553 size_t member_list_entries,
562 memcpy (my_member_list, member_list, member_list_entries *
563 sizeof (
unsigned int));
564 my_member_list_entries = member_list_entries;
566 last_sync_ring_id.nodeid = ring_id->
rep.
nodeid;
567 last_sync_ring_id.seq = ring_id->
seq;
575 for (i = 0; i < my_old_member_list_entries; i++) {
577 for (j = 0; j < trans_list_entries; j++) {
578 if (my_old_member_list[i] == trans_list[j]) {
584 g_req_exec_cpg_downlist.nodeids[entries++] =
585 my_old_member_list[i];
588 g_req_exec_cpg_downlist.left_nodes = entries;
591 static int cpg_sync_process (
void)
596 res = cpg_exec_send_downlist();
603 res = cpg_exec_send_joinlist();
608 static void cpg_sync_activate (
void)
610 memcpy (my_old_member_list, my_member_list,
611 my_member_list_entries *
sizeof (
unsigned int));
612 my_old_member_list_entries = my_member_list_entries;
615 downlist_master_choose_and_send ();
618 joinlist_inform_clients ();
620 downlist_messages_delete ();
622 joinlist_messages_delete ();
624 notify_lib_totem_membership (NULL, my_member_list_entries, my_member_list);
627 static void cpg_sync_abort (
void)
630 downlist_messages_delete ();
631 joinlist_messages_delete ();
634 static int notify_lib_totem_membership (
636 int member_list_entries,
637 const unsigned int *member_list)
651 res->member_list_entries = member_list_entries;
652 res->header.size = size;
654 res->header.error =
CS_OK;
660 for (iter = cpg_pd_list_head.
next; iter != &cpg_pd_list_head; iter = iter->
next) {
671 static int notify_lib_joinlist(
674 int joined_list_entries,
676 int left_list_entries,
689 for (iter = process_info_list_head.
next; iter != &process_info_list_head; iter = iter->
next) {
691 if (mar_name_compare (&pi->
group, group_name) == 0) {
695 for (i = 0; i < left_list_entries; i++) {
696 if (left_list[i].
nodeid == pi->
nodeid && left_list[i].pid == pi->
pid) {
713 res->joined_list_entries = joined_list_entries;
714 res->left_list_entries = left_list_entries;
715 res->member_list_entries = count;
717 res->header.size = size;
719 res->header.error =
CS_OK;
722 for (iter = process_info_list_head.
next; iter != &process_info_list_head; iter = iter->
next) {
725 if (mar_name_compare (&pi->
group, group_name) == 0) {
729 for (i = 0;i < left_list_entries; i++) {
730 if (left_list[i].
nodeid == pi->
nodeid && left_list[i].pid == pi->
pid) {
736 retgi->nodeid = pi->
nodeid;
737 retgi->pid = pi->
pid;
743 if (left_list_entries) {
745 retgi += left_list_entries;
748 if (joined_list_entries) {
750 retgi += joined_list_entries;
756 for (iter = cpg_pd_list_head.
next; iter != &cpg_pd_list_head; iter = iter->
next) {
758 if (mar_name_compare (&cpd->
group_name, group_name) == 0) {
759 assert (joined_list_entries <= 1);
760 if (joined_list_entries) {
761 if (joined_list[0].
pid == cpd->
pid &&
772 if (left_list_entries) {
773 if (left_list[0].
pid == cpd->
pid &&
790 for (iter = cpg_pd_list_head.
next; iter != &cpg_pd_list_head; iter = iter->
next) {
796 notify_lib_totem_membership (cpd->
conn, my_old_member_list_entries, my_old_member_list);
803 static void downlist_log(
const char *msg,
struct downlist_msg* dl)
806 "%s: sender %s; members(old:%d left:%d)",
813 static struct downlist_msg* downlist_master_choose (
void)
818 uint32_t cmp_members;
819 uint32_t best_members;
823 for (iter = downlist_messages_head.
next;
824 iter != &downlist_messages_head;
828 downlist_log(
"comparing", cmp);
831 for (i = 0; i < cmp->left_nodes; i++) {
833 log_printf (LOG_DEBUG,
"Ignoring this entry because I'm in the left list\n");
849 best_members = best->old_members - best->left_nodes;
850 cmp_members = cmp->old_members - cmp->left_nodes;
852 if (cmp_members > best_members) {
854 }
else if (cmp_members == best_members) {
855 if (cmp->old_members > best->old_members) {
857 }
else if (cmp->old_members == best->old_members) {
865 assert (best != NULL);
870 static void downlist_master_choose_and_send (
void)
881 int left_list_entries;
884 qb_map_iter_t *miter;
889 stored_msg = downlist_master_choose ();
894 downlist_log(
"chosen downlist", stored_msg);
896 group_map = qb_skiplist_create();
903 for (iter = process_info_list_head.
next; iter != &process_info_list_head; ) {
908 for (i = 0; i < stored_msg->left_nodes; i++) {
910 if (pi->
nodeid == stored_msg->nodeids[i]) {
917 marshall_from_mar_cpg_name_t(&cpg_group, &left_pi->
group);
918 cpg_group.value[cpg_group.length] = 0;
920 pcd = (
struct confchg_data *)qb_map_get(group_map, cpg_group.value);
922 pcd = (
struct confchg_data *)calloc(1,
sizeof(
struct confchg_data));
923 memcpy(&pcd->cpg_group, &cpg_group,
sizeof(
struct cpg_name));
924 qb_map_put(group_map, pcd->cpg_group.value, pcd);
926 size = pcd->left_list_entries;
927 pcd->left_list[size].nodeid = left_pi->
nodeid;
928 pcd->left_list[size].pid = left_pi->
pid;
930 pcd->left_list_entries++;
931 list_del (&left_pi->
list);
937 miter = qb_map_iter_create(group_map);
938 while (qb_map_iter_next(miter, (
void **)&pcd)) {
939 marshall_to_mar_cpg_name_t(&group, &pcd->cpg_group);
941 log_printf (LOG_DEBUG,
"left_list_entries:%d", pcd->left_list_entries);
942 for (i=0; i<pcd->left_list_entries; i++) {
943 log_printf (LOG_DEBUG,
"left_list[%d] group:%s, ip:%s, pid:%d",
944 i, cpg_print_group_name(&group),
946 pcd->left_list[i].pid);
950 notify_lib_joinlist(&group, NULL,
952 pcd->left_list_entries,
958 qb_map_iter_free(miter);
959 qb_map_destroy(group_map);
965 static void joinlist_remove_zombie_pi_entries (
void)
973 for (pi_iter = process_info_list_head.
next; pi_iter != &process_info_list_head; ) {
975 pi_iter = pi_iter->
next;
988 for (jl_iter = joinlist_messages_head.
next;
989 jl_iter != &joinlist_messages_head;
990 jl_iter = jl_iter->
next) {
999 pi->
pid == stored_msg->
pid &&
1012 static void joinlist_inform_clients (
void)
1019 for (iter = joinlist_messages_head.
next;
1020 iter != &joinlist_messages_head;
1021 iter = iter->
next) {
1025 log_printf (LOG_DEBUG,
"joinlist_messages[%u] group:%s, ip:%s, pid:%d",
1026 i++, cpg_print_group_name(&stored_msg->
group_name),
1039 joinlist_remove_zombie_pi_entries ();
1042 static void downlist_messages_delete (
void)
1047 for (iter = downlist_messages_head.
next;
1048 iter != &downlist_messages_head;
1051 iter_next = iter->
next;
1054 list_del (&stored_msg->
list);
1059 static void joinlist_messages_delete (
void)
1064 for (iter = joinlist_messages_head.
next;
1065 iter != &joinlist_messages_head;
1068 iter_next = iter->
next;
1071 list_del (&stored_msg->
list);
1074 list_init (&joinlist_messages_head);
1079 list_init (&downlist_messages_head);
1080 list_init (&joinlist_messages_head);
1094 iter_next = iter->
next;
1097 list_del (&pi->
list);
1101 list_del (&cpg_iteration_instance->
list);
1102 hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_instance->
handle);
1105 static void cpg_pd_finalize (
struct cpg_pd *cpd)
1108 struct cpg_iteration_instance *cpii;
1115 iter_next = iter->
next;
1119 cpg_iteration_instance_finalize (cpii);
1122 list_del (&cpd->
list);
1125 static int cpg_lib_exit_fn (
void *conn)
1136 cpg_pd_finalize (cpd);
1142 static int cpg_node_joinleave_send (
unsigned int pid,
const mar_cpg_name_t *group_name,
int fn,
int reason)
1145 struct iovec req_exec_cpg_iovec;
1164 static void exec_cpg_procjoin_endian_convert (
void *msg)
1168 req_exec_cpg_procjoin->pid =
swab32(req_exec_cpg_procjoin->pid);
1169 swab_mar_cpg_name_t (&req_exec_cpg_procjoin->group_name);
1170 req_exec_cpg_procjoin->reason =
swab32(req_exec_cpg_procjoin->reason);
1173 static void exec_cpg_joinlist_endian_convert (
void *msg_v)
1176 struct qb_ipc_response_header *res = (
struct qb_ipc_response_header *)msg;
1179 swab_mar_int32_t (&res->size);
1181 while ((
const char*)jle < msg + res->size) {
1188 static void exec_cpg_downlist_endian_convert_old (
void *msg)
1192 static void exec_cpg_downlist_endian_convert (
void *msg)
1197 req_exec_cpg_downlist->left_nodes =
swab32(req_exec_cpg_downlist->left_nodes);
1198 req_exec_cpg_downlist->old_members =
swab32(req_exec_cpg_downlist->old_members);
1200 for (i = 0; i < req_exec_cpg_downlist->left_nodes; i++) {
1201 req_exec_cpg_downlist->nodeids[i] =
swab32(req_exec_cpg_downlist->nodeids[i]);
1206 static void exec_cpg_mcast_endian_convert (
void *msg)
1210 swab_coroipc_request_header_t (&req_exec_cpg_mcast->header);
1211 swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
1212 req_exec_cpg_mcast->pid =
swab32(req_exec_cpg_mcast->pid);
1213 req_exec_cpg_mcast->msglen =
swab32(req_exec_cpg_mcast->msglen);
1214 swab_mar_message_source_t (&req_exec_cpg_mcast->source);
1217 static void exec_cpg_partial_mcast_endian_convert (
void *msg)
1221 swab_coroipc_request_header_t (&req_exec_cpg_mcast->header);
1222 swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
1223 req_exec_cpg_mcast->pid =
swab32(req_exec_cpg_mcast->pid);
1224 req_exec_cpg_mcast->msglen =
swab32(req_exec_cpg_mcast->msglen);
1225 req_exec_cpg_mcast->fraglen =
swab32(req_exec_cpg_mcast->fraglen);
1226 req_exec_cpg_mcast->type =
swab32(req_exec_cpg_mcast->type);
1227 swab_mar_message_source_t (&req_exec_cpg_mcast->source);
1233 for (iter = process_info_list_head.
next; iter != &process_info_list_head; ) {
1237 if (pi->
pid == pid && pi->
nodeid == nodeid &&
1238 mar_name_compare (&pi->
group, group_name) == 0) {
1246 static void do_proc_join(
1249 unsigned int nodeid,
1258 if (process_info_find (name, pid, nodeid) != NULL) {
1268 memcpy(&pi->
group, name,
sizeof(*name));
1269 list_init(&pi->
list);
1274 list_to_add = &process_info_list_head;
1275 for (list = process_info_list_head.
next; list != &process_info_list_head; list = list->
next) {
1285 list_add (&pi->
list, list_to_add);
1287 notify_info.pid = pi->
pid;
1288 notify_info.nodeid =
nodeid;
1289 notify_info.reason = reason;
1291 notify_lib_joinlist(&pi->
group, NULL,
1297 static void do_proc_leave(
1300 unsigned int nodeid,
1307 notify_info.pid = pid;
1308 notify_info.nodeid =
nodeid;
1309 notify_info.reason = reason;
1311 notify_lib_joinlist(name, NULL,
1316 for (iter = process_info_list_head.
next; iter != &process_info_list_head; ) {
1320 if (pi->
pid == pid && pi->
nodeid == nodeid &&
1321 mar_name_compare (&pi->
group, name)==0) {
1322 list_del (&pi->
list);
1328 static void message_handler_req_exec_cpg_downlist_old (
1329 const void *message,
1330 unsigned int nodeid)
1336 static void message_handler_req_exec_cpg_downlist(
1337 const void *message,
1338 unsigned int nodeid)
1340 const struct req_exec_cpg_downlist *req_exec_cpg_downlist = message;
1348 req_exec_cpg_downlist->left_nodes, downlist_state);
1354 stored_msg->old_members = req_exec_cpg_downlist->old_members;
1355 stored_msg->left_nodes = req_exec_cpg_downlist->left_nodes;
1356 memcpy (stored_msg->nodeids, req_exec_cpg_downlist->nodeids,
1357 req_exec_cpg_downlist->left_nodes * sizeof (
mar_uint32_t));
1358 list_init (&stored_msg->
list);
1359 list_add (&stored_msg->
list, &downlist_messages_head);
1361 for (i = 0; i < my_member_list_entries; i++) {
1363 for (iter = downlist_messages_head.
next;
1364 iter != &downlist_messages_head;
1365 iter = iter->
next) {
1377 downlist_master_choose_and_send ();
1381 static void message_handler_req_exec_cpg_procjoin (
1382 const void *message,
1383 unsigned int nodeid)
1390 (
unsigned int)req_exec_cpg_procjoin->pid);
1392 do_proc_join (&req_exec_cpg_procjoin->group_name,
1393 req_exec_cpg_procjoin->pid, nodeid,
1397 static void message_handler_req_exec_cpg_procleave (
1398 const void *message,
1399 unsigned int nodeid)
1406 (
unsigned int)req_exec_cpg_procjoin->pid);
1408 do_proc_leave (&req_exec_cpg_procjoin->group_name,
1409 req_exec_cpg_procjoin->pid, nodeid,
1410 req_exec_cpg_procjoin->reason);
1415 static void message_handler_req_exec_cpg_joinlist (
1416 const void *message_v,
1417 unsigned int nodeid)
1419 const char *message = message_v;
1420 const struct qb_ipc_response_header *res = (
const struct qb_ipc_response_header *)message;
1427 while ((
const char*)jle < message + res->size) {
1431 stored_msg->
pid = jle->
pid;
1433 list_init (&stored_msg->
list);
1434 list_add (&stored_msg->
list, &joinlist_messages_head);
1439 static void message_handler_req_exec_cpg_mcast (
1440 const void *message,
1441 unsigned int nodeid)
1445 int msglen = req_exec_cpg_mcast->msglen;
1448 struct iovec iovec[2];
1462 iovec[1].iov_base = (
char*)message+
sizeof(*req_exec_cpg_mcast);
1463 iovec[1].iov_len = msglen;
1465 for (iter = cpg_pd_list_head.
next; iter != &cpg_pd_list_head; ) {
1470 && (mar_name_compare (&cpd->
group_name, &req_exec_cpg_mcast->group_name) == 0)) {
1474 for (pi_iter = process_info_list_head.
next;
1475 pi_iter != &process_info_list_head; pi_iter = pi_iter->
next) {
1479 if (pi->
nodeid == nodeid &&
1480 mar_name_compare (&pi->
group, &req_exec_cpg_mcast->group_name) == 0) {
1497 static void message_handler_req_exec_cpg_partial_mcast (
1498 const void *message,
1499 unsigned int nodeid)
1503 int msglen = req_exec_cpg_mcast->fraglen;
1506 struct iovec iovec[2];
1524 iovec[1].iov_base = (
char*)message+
sizeof(*req_exec_cpg_mcast);
1525 iovec[1].iov_len = msglen;
1527 for (iter = cpg_pd_list_head.
next; iter != &cpg_pd_list_head; ) {
1532 && (mar_name_compare (&cpd->
group_name, &req_exec_cpg_mcast->group_name) == 0)) {
1536 for (pi_iter = process_info_list_head.
next;
1537 pi_iter != &process_info_list_head; pi_iter = pi_iter->
next) {
1541 if (pi->
nodeid == nodeid &&
1542 mar_name_compare (&pi->
group, &req_exec_cpg_mcast->group_name) == 0) {
1560 static int cpg_exec_send_downlist(
void)
1565 g_req_exec_cpg_downlist.header.size =
sizeof(
struct req_exec_cpg_downlist);
1567 g_req_exec_cpg_downlist.old_members = my_old_member_list_entries;
1569 iov.iov_base = (
void *)&g_req_exec_cpg_downlist;
1570 iov.iov_len = g_req_exec_cpg_downlist.header.size;
1575 static int cpg_exec_send_joinlist(
void)
1579 struct qb_ipc_response_header *res;
1582 struct iovec req_exec_cpg_iovec;
1584 for (iter = process_info_list_head.
next; iter != &process_info_list_head; iter = iter->
next) {
1596 buf = alloca(
sizeof(
struct qb_ipc_response_header) +
sizeof(
struct join_list_entry) * count);
1602 jle = (
struct join_list_entry *)(buf +
sizeof(
struct qb_ipc_response_header));
1603 res = (
struct qb_ipc_response_header *)buf;
1605 for (iter = process_info_list_head.
next; iter != &process_info_list_head; iter = iter->
next) {
1616 res->size =
sizeof(
struct qb_ipc_response_header)+sizeof(struct
join_list_entry) * count;
1618 req_exec_cpg_iovec.iov_base = buf;
1619 req_exec_cpg_iovec.iov_len = res->size;
1624 static int cpg_lib_init_fn (
void *conn)
1627 memset (cpd, 0,
sizeof(
struct cpg_pd));
1629 list_add (&cpd->
list, &cpg_pd_list_head);
1640 static void message_handler_req_lib_cpg_join (
void *conn,
const void *message)
1649 for (iter = cpg_pd_list_head.
next; iter != &cpg_pd_list_head; iter = iter->
next) {
1652 if (cpd_item->
pid == req_lib_cpg_join->pid &&
1653 mar_name_compare(&req_lib_cpg_join->group_name, &cpd_item->
group_name) == 0) {
1665 for (iter = process_info_list_head.
next; iter != &process_info_list_head; iter = iter->
next) {
1669 mar_name_compare(&req_lib_cpg_join->group_name, &pi->
group) == 0) {
1685 cpd->
pid = req_lib_cpg_join->pid;
1686 cpd->
flags = req_lib_cpg_join->flags;
1687 memcpy (&cpd->
group_name, &req_lib_cpg_join->group_name,
1690 cpg_node_joinleave_send (req_lib_cpg_join->pid,
1691 &req_lib_cpg_join->group_name,
1713 static void message_handler_req_lib_cpg_leave (
void *conn,
const void *message)
1735 cpg_node_joinleave_send (req_lib_cpg_leave->pid,
1736 &req_lib_cpg_leave->group_name,
1750 static void message_handler_req_lib_cpg_finalize (
1752 const void *message)
1764 list_del (&cpd->
list);
1765 list_init (&cpd->
list);
1785 fd = open (path, O_RDWR, 0600);
1793 res = ftruncate (fd, bytes);
1795 goto error_close_unlink;
1798 addr = mmap (NULL, bytes, PROT_READ | PROT_WRITE,
1801 if (addr == MAP_FAILED) {
1802 goto error_close_unlink;
1805 madvise(addr, bytes, MADV_NOSYNC);
1810 munmap (addr, bytes);
1822 static inline int zcb_alloc (
1824 const char *path_to_file,
1831 zcb_mapped = malloc (
sizeof (
struct zcb_mapped));
1832 if (zcb_mapped == NULL) {
1845 list_init (&zcb_mapped->
list);
1853 static inline int zcb_free (
struct zcb_mapped *zcb_mapped)
1857 res = munmap (zcb_mapped->
addr, zcb_mapped->
size);
1858 list_del (&zcb_mapped->
list);
1863 static inline int zcb_by_addr_free (
struct cpg_pd *cpd,
void *addr)
1866 struct zcb_mapped *zcb_mapped;
1867 unsigned int res = 0;
1872 zcb_mapped =
list_entry (list,
struct zcb_mapped, list);
1874 if (zcb_mapped->
addr == addr) {
1875 res = zcb_free (zcb_mapped);
1883 static inline int zcb_all_free (
1887 struct zcb_mapped *zcb_mapped;
1892 zcb_mapped =
list_entry (list,
struct zcb_mapped, list);
1896 zcb_free (zcb_mapped);
1906 static uint64_t void2serveraddr (
void *server_ptr)
1914 static void *serveraddr2void (uint64_t
server_addr)
1922 static void message_handler_req_lib_cpg_zc_alloc (
1924 const void *message)
1927 struct qb_ipc_response_header res_header;
1935 res = zcb_alloc (cpd, hdr->path_to_file, hdr->map_size,
1942 res_header.size =
sizeof (
struct qb_ipc_response_header);
1949 static void message_handler_req_lib_cpg_zc_free (
1951 const void *message)
1954 struct qb_ipc_response_header res_header;
1960 addr = serveraddr2void (hdr->server_address);
1962 zcb_by_addr_free (cpd, addr);
1964 res_header.size =
sizeof (
struct qb_ipc_response_header);
1972 static void message_handler_req_lib_cpg_partial_mcast (
void *conn,
const void *message)
1978 struct iovec req_exec_cpg_iovec[2];
1981 int msglen = req_lib_cpg_mcast->fraglen;
2013 if (error ==
CS_OK) {
2014 req_exec_cpg_mcast.header.size =
sizeof(req_exec_cpg_mcast) + msglen;
2017 req_exec_cpg_mcast.pid = cpd->
pid;
2018 req_exec_cpg_mcast.msglen = req_lib_cpg_mcast->msglen;
2019 req_exec_cpg_mcast.type = req_lib_cpg_mcast->type;
2020 req_exec_cpg_mcast.fraglen = req_lib_cpg_mcast->fraglen;
2022 memcpy(&req_exec_cpg_mcast.group_name, &group_name,
2025 req_exec_cpg_iovec[0].iov_base = (
char *)&req_exec_cpg_mcast;
2026 req_exec_cpg_iovec[0].iov_len =
sizeof(req_exec_cpg_mcast);
2027 req_exec_cpg_iovec[1].iov_base = (
char *)&req_lib_cpg_mcast->message;
2028 req_exec_cpg_iovec[1].iov_len = msglen;
2031 assert(result == 0);
2034 conn, group_name.value, cpd->
cpd_state, error);
2043 static void message_handler_req_lib_cpg_mcast (
void *conn,
const void *message)
2045 const struct req_lib_cpg_mcast *req_lib_cpg_mcast = message;
2049 struct iovec req_exec_cpg_iovec[2];
2050 struct req_exec_cpg_mcast req_exec_cpg_mcast;
2051 int msglen = req_lib_cpg_mcast->msglen;
2072 if (error ==
CS_OK) {
2073 req_exec_cpg_mcast.header.size =
sizeof(req_exec_cpg_mcast) + msglen;
2076 req_exec_cpg_mcast.pid = cpd->
pid;
2077 req_exec_cpg_mcast.msglen = msglen;
2079 memcpy(&req_exec_cpg_mcast.group_name, &group_name,
2082 req_exec_cpg_iovec[0].iov_base = (
char *)&req_exec_cpg_mcast;
2083 req_exec_cpg_iovec[0].iov_len =
sizeof(req_exec_cpg_mcast);
2084 req_exec_cpg_iovec[1].iov_base = (
char *)&req_lib_cpg_mcast->message;
2085 req_exec_cpg_iovec[1].iov_len = msglen;
2088 assert(result == 0);
2091 conn, group_name.value, cpd->
cpd_state, error);
2095 static void message_handler_req_lib_cpg_zc_execute (
2097 const void *message)
2100 struct qb_ipc_request_header *
header;
2103 struct iovec req_exec_cpg_iovec[2];
2104 struct req_exec_cpg_mcast req_exec_cpg_mcast;
2105 struct req_lib_cpg_mcast *req_lib_cpg_mcast;
2111 header = (
struct qb_ipc_request_header *)(((
char *)serveraddr2void(hdr->server_address) + sizeof (
struct coroipcs_zc_header)));
2112 req_lib_cpg_mcast = (
struct req_lib_cpg_mcast *)header;
2131 if (error ==
CS_OK) {
2132 req_exec_cpg_mcast.header.size =
sizeof(req_exec_cpg_mcast) + req_lib_cpg_mcast->msglen;
2135 req_exec_cpg_mcast.pid = cpd->
pid;
2136 req_exec_cpg_mcast.msglen = req_lib_cpg_mcast->msglen;
2138 memcpy(&req_exec_cpg_mcast.group_name, &cpd->
group_name,
2141 req_exec_cpg_iovec[0].iov_base = (
char *)&req_exec_cpg_mcast;
2142 req_exec_cpg_iovec[0].iov_len =
sizeof(req_exec_cpg_mcast);
2143 req_exec_cpg_iovec[1].iov_base = (
char *)header +
sizeof(
struct req_lib_cpg_mcast);
2144 req_exec_cpg_iovec[1].iov_len = req_exec_cpg_mcast.msglen;
2161 static void message_handler_req_lib_cpg_membership (
void *conn,
2162 const void *message)
2165 (
struct req_lib_cpg_membership_get *)message;
2168 int member_count = 0;
2175 for (iter = process_info_list_head.
next;
2176 iter != &process_info_list_head; iter = iter->
next) {
2179 if (mar_name_compare (&pi->
group, &req_lib_cpg_membership_get->group_name) == 0) {
2191 static void message_handler_req_lib_cpg_local_get (
void *conn,
2192 const void *message)
2205 static void message_handler_req_lib_cpg_iteration_initialize (
2207 const void *message)
2214 struct cpg_iteration_instance *cpg_iteration_instance;
2227 res = hdb_handle_create (&cpg_iteration_handle_t_db,
sizeof (
struct cpg_iteration_instance),
2228 &cpg_iteration_handle);
2235 res = hdb_handle_get (&cpg_iteration_handle_t_db, cpg_iteration_handle, (
void *)&cpg_iteration_instance);
2243 cpg_iteration_instance->
handle = cpg_iteration_handle;
2248 for (iter = process_info_list_head.
next; iter != &process_info_list_head; iter = iter->
next) {
2260 iter2 = iter2->
next) {
2263 if (mar_name_compare (&pi2->
group, &pi->
group) == 0) {
2279 if (mar_name_compare (&pi->
group, &req_lib_cpg_iterationinitialize->group_name) != 0)
2292 goto error_put_destroy;
2296 list_init (&new_pi->
list);
2310 iter2 = iter2->
next) {
2313 if (mar_name_compare (&pi2->
group, &pi->
group) == 0) {
2318 list_add (&new_pi->
list, iter2);
2328 list_init (&cpg_iteration_instance->
list);
2334 hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_handle);
2336 if (error !=
CS_OK) {
2337 hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_handle);
2350 static void message_handler_req_lib_cpg_iteration_next (
2352 const void *message)
2356 struct cpg_iteration_instance *cpg_iteration_instance;
2363 res = hdb_handle_get (&cpg_iteration_handle_t_db,
2364 req_lib_cpg_iterationnext->iteration_handle,
2365 (
void *)&cpg_iteration_instance);
2372 assert (cpg_iteration_instance);
2393 hdb_handle_put (&cpg_iteration_handle_t_db, req_lib_cpg_iterationnext->iteration_handle);
2403 static void message_handler_req_lib_cpg_iteration_finalize (
2405 const void *message)
2409 struct cpg_iteration_instance *cpg_iteration_instance;
2415 res = hdb_handle_get (&cpg_iteration_handle_t_db,
2416 req_lib_cpg_iterationfinalize->iteration_handle,
2417 (
void *)&cpg_iteration_instance);
2424 assert (cpg_iteration_instance);
2426 cpg_iteration_instance_finalize (cpg_iteration_instance);
2427 hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_instance->
handle);
int initial_totem_conf_sent
mar_cpg_address_t member_list[]
mar_req_coroipcc_zc_free_t struct
mar_uint32_t sender_nodeid
#define CPG_MAX_NAME_LENGTH
void(* lib_handler_fn)(void *conn, const void *msg)
uint64_t initial_transition_counter
#define LOGSYS_LEVEL_TRACE
mar_uint32_t sender_nodeid
#define CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF
The req_lib_cpg_join struct.
mar_req_coroipcc_zc_alloc_t struct
The corosync_service_engine struct.
mar_uint32_t old_members __attribute__((aligned(8)))
struct corosync_service_engine * cpg_get_service_engine_ver0(void)
The res_lib_cpg_partial_deliver_callback struct.
The req_lib_cpg_mcast struct.
The corosync_lib_handler struct.
The res_lib_cpg_membership_get struct.
struct message_header header
struct list_head * current_pointer
The res_lib_cpg_iterationnext struct.
unsigned char addr[TOTEMIP_ADDRLEN]
struct qb_ipc_request_header header __attribute__((aligned(8)))
struct qb_ipc_request_header header __attribute__((aligned(8)))
The res_lib_cpg_iterationinitialize struct.
The corosync_exec_handler struct.
uint64_t transition_counter
#define log_printf(level, format, args...)
void(* exec_handler_fn)(const void *msg, unsigned int nodeid)
The res_lib_cpg_partial_send struct.
struct list_head iteration_instance_list_head
#define SERVICE_ID_MAKE(a, b)
The req_lib_cpg_iterationinitialize struct.
#define LOGSYS_LEVEL_WARNING
void *(* ipc_private_data_get)(void *conn)
The res_lib_cpg_join struct.
void(* ipc_source_set)(mar_message_source_t *source, void *conn)
void(* ipc_refcnt_inc)(void *conn)
mar_req_coroipcc_zc_execute_t struct
The res_lib_cpg_mcast struct.
#define LOGSYS_LEVEL_ERROR
void(* ipc_refcnt_dec)(void *conn)
struct totem_ip_address rep
mar_uint32_t member_list[]
cs_error_t
The cs_error_t enum.
The req_lib_cpg_leave struct.
#define LOGSYS_LEVEL_DEBUG
mar_cpg_address_t member_list[PROCESSOR_COUNT_MAX]
int(* ipc_dispatch_iov_send)(void *conn, const struct iovec *iov, unsigned int iov_len)
mar_cpg_name_t group_name
The req_lib_cpg_iterationfinalize struct.
mar_cpg_name_t group_name
The corosync_api_v1 struct.
int(* ipc_dispatch_send)(void *conn, const void *msg, size_t mlen)
LOGSYS_DECLARE_SUBSYS("CPG")
DECLARE_HDB_DATABASE(cpg_iteration_handle_t_db, NULL)
#define swab32(x)
The swab32 macro.
int(* totem_mcast)(const struct iovec *iovec, unsigned int iov_len, unsigned int guarantee)
The res_lib_cpg_finalize struct.
int(* ipc_response_send)(void *conn, const void *msg, size_t mlen)
The res_lib_cpg_local_get struct.
#define PROCESSOR_COUNT_MAX
struct qb_ipc_request_header header __attribute__((aligned(8)))
The res_lib_cpg_iterationfinalize struct.
struct corosync_service_engine cpg_service_engine
The req_lib_cpg_partial_mcast struct.
struct list_head zcb_mapped_list_head
The req_lib_cpg_iterationnext struct.
struct qb_ipc_request_header header __attribute__((aligned(8)))
The res_lib_cpg_confchg_callback struct.
#define list_entry(ptr, type, member)
mar_cpg_name_t group_name
The req_lib_cpg_membership_get struct.
struct list_head items_list_head
The res_lib_cpg_leave struct.
struct memb_ring_id ring_id
const char *(* totem_ifaces_print)(unsigned int nodeid)
struct qb_ipc_request_header header __attribute__((aligned(8)))
unsigned int(* totem_nodeid_get)(void)
The res_lib_cpg_totem_confchg_callback struct.
DECLARE_LIST_INIT(cpg_pd_list_head)
Message from another node.
The mar_message_source_t struct.