corosync  2.4.4
exec/cpg.c
1 /*
2  * Copyright (c) 2006-2015 Red Hat, Inc.
3  *
4  * All rights reserved.
5  *
6  * Author: Christine Caulfield (ccaulfie@redhat.com)
7  * Author: Jan Friesse (jfriesse@redhat.com)
8  *
9  * This software licensed under BSD license, the text of which follows:
10  *
11  * Redistribution and use in source and binary forms, with or without
12  * modification, are permitted provided that the following conditions are met:
13  *
14  * - Redistributions of source code must retain the above copyright notice,
15  * this list of conditions and the following disclaimer.
16  * - Redistributions in binary form must reproduce the above copyright notice,
17  * this list of conditions and the following disclaimer in the documentation
18  * and/or other materials provided with the distribution.
19  * - Neither the name of the MontaVista Software, Inc. nor the names of its
20  * contributors may be used to endorse or promote products derived from this
21  * software without specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
33  * THE POSSIBILITY OF SUCH DAMAGE.
34  */
35 
36 #include <config.h>
37 
38 #ifdef HAVE_ALLOCA_H
39 #include <alloca.h>
40 #endif
41 #include <sys/types.h>
42 #include <sys/socket.h>
43 #include <sys/un.h>
44 #include <sys/ioctl.h>
45 #include <netinet/in.h>
46 #include <sys/uio.h>
47 #include <unistd.h>
48 #include <fcntl.h>
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <errno.h>
52 #include <time.h>
53 #include <assert.h>
54 #include <arpa/inet.h>
55 #include <sys/mman.h>
56 #include <qb/qbmap.h>
57 
58 #include <corosync/corotypes.h>
59 #include <qb/qbipc_common.h>
60 #include <corosync/corodefs.h>
61 #include <corosync/list.h>
62 #include <corosync/logsys.h>
63 #include <corosync/coroapi.h>
64 
65 #include <corosync/cpg.h>
66 #include <corosync/ipc_cpg.h>
67 
68 #ifndef MAP_ANONYMOUS
69 #define MAP_ANONYMOUS MAP_ANON
70 #endif
71 
72 #include "service.h"
73 
74 LOGSYS_DECLARE_SUBSYS ("CPG");
75 
76 #define GROUP_HASH_SIZE 32
77 
78 enum cpg_message_req_types {
79  MESSAGE_REQ_EXEC_CPG_PROCJOIN = 0,
80  MESSAGE_REQ_EXEC_CPG_PROCLEAVE = 1,
81  MESSAGE_REQ_EXEC_CPG_JOINLIST = 2,
82  MESSAGE_REQ_EXEC_CPG_MCAST = 3,
83  MESSAGE_REQ_EXEC_CPG_DOWNLIST_OLD = 4,
84  MESSAGE_REQ_EXEC_CPG_DOWNLIST = 5,
85  MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST = 6,
86 };
87 
88 struct zcb_mapped {
89  struct list_head list;
90  void *addr;
91  size_t size;
92 };
93 /*
94  * state                    exec deliver
95  * match group name, pid -> if matched deliver for YES:
96  * XXX indicates impossible state
97  *
98  *                          join                      leave                    mcast
99  * UNJOINED                 XXX                       XXX                      NO
100  * LEAVE_STARTED            XXX                       YES(unjoined_enter)      YES
101  * JOIN_STARTED             YES(join_started_enter)   XXX                      NO
102  * JOIN_COMPLETED           XXX                       NO                       YES
103  *
104  * join_started_enter
105  *   set JOIN_COMPLETED
106  *   add entry to process_info list
107  * unjoined_enter
108  *   set UNJOINED
109  *   delete entry from process_info list
110  *
111  *
112  * library accept join error codes
113  * UNJOINED                 YES(CS_OK) set JOIN_STARTED
114  * LEAVE_STARTED            NO(CS_ERR_BUSY)
115  * JOIN_STARTED             NO(CS_ERR_EXIST)
116  * JOIN_COMPLETED           NO(CS_ERR_EXIST)
117  *
118  * library accept leave error codes
119  * UNJOINED                 NO(CS_ERR_NOT_EXIST)
120  * LEAVE_STARTED            NO(CS_ERR_NOT_EXIST)
121  * JOIN_STARTED             NO(CS_ERR_BUSY)
122  * JOIN_COMPLETED           YES(CS_OK) set LEAVE_STARTED
123  *
124  * library accept mcast
125  * UNJOINED                 NO(CS_ERR_NOT_EXIST)
126  * LEAVE_STARTED            NO(CS_ERR_NOT_EXIST)
127  * JOIN_STARTED             YES(CS_OK)
128  * JOIN_COMPLETED           YES(CS_OK)
129  */
130 enum cpd_state {
131  CPD_STATE_UNJOINED,
132  CPD_STATE_LEAVE_STARTED,
133  CPD_STATE_JOIN_STARTED,
134  CPD_STATE_JOIN_COMPLETED
135 };
136 
137 enum cpg_sync_state {
138  CPGSYNC_DOWNLIST,
139  CPGSYNC_JOINLIST
140 };
141 
142 enum cpg_downlist_state_e {
143  CPG_DOWNLIST_NONE,
144  CPG_DOWNLIST_WAITING_FOR_MESSAGES,
145  CPG_DOWNLIST_APPLYING,
146 };
147 static enum cpg_downlist_state_e downlist_state;
148 static struct list_head downlist_messages_head;
149 static struct list_head joinlist_messages_head;
150 
151 struct cpg_pd {
152  void *conn;
153  mar_cpg_name_t group_name;
154  uint32_t pid;
155  enum cpd_state cpd_state;
156  unsigned int flags;
157  int initial_totem_conf_sent;
158  uint64_t transition_counter; /* These two are used when sending fragmented messages */
159  uint64_t initial_transition_counter;
160  struct list_head list;
161  struct list_head iteration_instance_list_head;
162  struct list_head zcb_mapped_list_head;
163 };
164 
165 struct cpg_iteration_instance {
166  hdb_handle_t handle;
167  struct list_head list;
168  struct list_head items_list_head; /* List of process_info */
170 };
171 
172 DECLARE_HDB_DATABASE(cpg_iteration_handle_t_db,NULL);
173 
174 DECLARE_LIST_INIT(cpg_pd_list_head);
175 
176 static unsigned int my_member_list[PROCESSOR_COUNT_MAX];
177 
178 static unsigned int my_member_list_entries;
179 
180 static unsigned int my_old_member_list[PROCESSOR_COUNT_MAX];
181 
182 static unsigned int my_old_member_list_entries = 0;
183 
184 static struct corosync_api_v1 *api = NULL;
185 
186 static enum cpg_sync_state my_sync_state = CPGSYNC_DOWNLIST;
187 
188 static mar_cpg_ring_id_t last_sync_ring_id;
189 
190 struct process_info {
191  unsigned int nodeid;
192  uint32_t pid;
193  mar_cpg_name_t group;
194  struct list_head list; /* on the group_info members list */
195 };
196 DECLARE_LIST_INIT(process_info_list_head);
197 
198 struct join_list_entry {
199  uint32_t pid;
200  mar_cpg_name_t group_name;
201 };
202 
203 /*
204  * Service Interfaces required by service_message_handler struct
205  */
206 static char *cpg_exec_init_fn (struct corosync_api_v1 *);
207 
208 static int cpg_lib_init_fn (void *conn);
209 
210 static int cpg_lib_exit_fn (void *conn);
211 
212 static void message_handler_req_exec_cpg_procjoin (
213  const void *message,
214  unsigned int nodeid);
215 
216 static void message_handler_req_exec_cpg_procleave (
217  const void *message,
218  unsigned int nodeid);
219 
220 static void message_handler_req_exec_cpg_joinlist (
221  const void *message,
222  unsigned int nodeid);
223 
224 static void message_handler_req_exec_cpg_mcast (
225  const void *message,
226  unsigned int nodeid);
227 
228 static void message_handler_req_exec_cpg_partial_mcast (
229  const void *message,
230  unsigned int nodeid);
231 
232 static void message_handler_req_exec_cpg_downlist_old (
233  const void *message,
234  unsigned int nodeid);
235 
236 static void message_handler_req_exec_cpg_downlist (
237  const void *message,
238  unsigned int nodeid);
239 
240 static void exec_cpg_procjoin_endian_convert (void *msg);
241 
242 static void exec_cpg_joinlist_endian_convert (void *msg);
243 
244 static void exec_cpg_mcast_endian_convert (void *msg);
245 
246 static void exec_cpg_partial_mcast_endian_convert (void *msg);
247 
248 static void exec_cpg_downlist_endian_convert_old (void *msg);
249 
250 static void exec_cpg_downlist_endian_convert (void *msg);
251 
252 static void message_handler_req_lib_cpg_join (void *conn, const void *message);
253 
254 static void message_handler_req_lib_cpg_leave (void *conn, const void *message);
255 
256 static void message_handler_req_lib_cpg_finalize (void *conn, const void *message);
257 
258 static void message_handler_req_lib_cpg_mcast (void *conn, const void *message);
259 
260 static void message_handler_req_lib_cpg_partial_mcast (void *conn, const void *message);
261 
262 static void message_handler_req_lib_cpg_membership (void *conn,
263  const void *message);
264 
265 static void message_handler_req_lib_cpg_local_get (void *conn,
266  const void *message);
267 
268 static void message_handler_req_lib_cpg_iteration_initialize (
269  void *conn,
270  const void *message);
271 
272 static void message_handler_req_lib_cpg_iteration_next (
273  void *conn,
274  const void *message);
275 
276 static void message_handler_req_lib_cpg_iteration_finalize (
277  void *conn,
278  const void *message);
279 
280 static void message_handler_req_lib_cpg_zc_alloc (
281  void *conn,
282  const void *message);
283 
284 static void message_handler_req_lib_cpg_zc_free (
285  void *conn,
286  const void *message);
287 
288 static void message_handler_req_lib_cpg_zc_execute (
289  void *conn,
290  const void *message);
291 
292 static int cpg_node_joinleave_send (unsigned int pid, const mar_cpg_name_t *group_name, int fn, int reason);
293 
294 static int cpg_exec_send_downlist(void);
295 
296 static int cpg_exec_send_joinlist(void);
297 
298 static void downlist_messages_delete (void);
299 
300 static void downlist_master_choose_and_send (void);
301 
302 static void joinlist_inform_clients (void);
303 
304 static void joinlist_messages_delete (void);
305 
306 static void cpg_sync_init (
307  const unsigned int *trans_list,
308  size_t trans_list_entries,
309  const unsigned int *member_list,
310  size_t member_list_entries,
311  const struct memb_ring_id *ring_id);
312 
313 static int cpg_sync_process (void);
314 
315 static void cpg_sync_activate (void);
316 
317 static void cpg_sync_abort (void);
318 
319 static void do_proc_join(
320  const mar_cpg_name_t *name,
321  uint32_t pid,
322  unsigned int nodeid,
323  int reason);
324 
325 static void do_proc_leave(
326  const mar_cpg_name_t *name,
327  uint32_t pid,
328  unsigned int nodeid,
329  int reason);
330 
331 static int notify_lib_totem_membership (
332  void *conn,
333  int member_list_entries,
334  const unsigned int *member_list);
335 
336 static inline int zcb_all_free (
337  struct cpg_pd *cpd);
338 
339 static char *cpg_print_group_name (
340  const mar_cpg_name_t *group);
341 
342 /*
343  * Library Handler Definition
344  */
345 static struct corosync_lib_handler cpg_lib_engine[] =
346 {
347  { /* 0 - MESSAGE_REQ_CPG_JOIN */
348  .lib_handler_fn = message_handler_req_lib_cpg_join,
349  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
350  },
351  { /* 1 - MESSAGE_REQ_CPG_LEAVE */
352  .lib_handler_fn = message_handler_req_lib_cpg_leave,
353  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
354  },
355  { /* 2 - MESSAGE_REQ_CPG_MCAST */
356  .lib_handler_fn = message_handler_req_lib_cpg_mcast,
357  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
358  },
359  { /* 3 - MESSAGE_REQ_CPG_MEMBERSHIP */
360  .lib_handler_fn = message_handler_req_lib_cpg_membership,
361  .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
362  },
363  { /* 4 - MESSAGE_REQ_CPG_LOCAL_GET */
364  .lib_handler_fn = message_handler_req_lib_cpg_local_get,
365  .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
366  },
367  { /* 5 - MESSAGE_REQ_CPG_ITERATIONINITIALIZE */
368  .lib_handler_fn = message_handler_req_lib_cpg_iteration_initialize,
369  .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
370  },
371  { /* 6 - MESSAGE_REQ_CPG_ITERATIONNEXT */
372  .lib_handler_fn = message_handler_req_lib_cpg_iteration_next,
373  .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
374  },
375  { /* 7 - MESSAGE_REQ_CPG_ITERATIONFINALIZE */
376  .lib_handler_fn = message_handler_req_lib_cpg_iteration_finalize,
377  .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
378  },
379  { /* 8 - MESSAGE_REQ_CPG_FINALIZE */
380  .lib_handler_fn = message_handler_req_lib_cpg_finalize,
381  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
382  },
383  { /* 9 */
384  .lib_handler_fn = message_handler_req_lib_cpg_zc_alloc,
385  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
386  },
387  { /* 10 */
388  .lib_handler_fn = message_handler_req_lib_cpg_zc_free,
389  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
390  },
391  { /* 11 */
392  .lib_handler_fn = message_handler_req_lib_cpg_zc_execute,
393  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
394  },
395  { /* 12 */
396  .lib_handler_fn = message_handler_req_lib_cpg_partial_mcast,
397  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
398  },
399 
400 };
401 
402 static struct corosync_exec_handler cpg_exec_engine[] =
403 {
404  { /* 0 - MESSAGE_REQ_EXEC_CPG_PROCJOIN */
405  .exec_handler_fn = message_handler_req_exec_cpg_procjoin,
406  .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert
407  },
408  { /* 1 - MESSAGE_REQ_EXEC_CPG_PROCLEAVE */
409  .exec_handler_fn = message_handler_req_exec_cpg_procleave,
410  .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert
411  },
412  { /* 2 - MESSAGE_REQ_EXEC_CPG_JOINLIST */
413  .exec_handler_fn = message_handler_req_exec_cpg_joinlist,
414  .exec_endian_convert_fn = exec_cpg_joinlist_endian_convert
415  },
416  { /* 3 - MESSAGE_REQ_EXEC_CPG_MCAST */
417  .exec_handler_fn = message_handler_req_exec_cpg_mcast,
418  .exec_endian_convert_fn = exec_cpg_mcast_endian_convert
419  },
420  { /* 4 - MESSAGE_REQ_EXEC_CPG_DOWNLIST_OLD */
421  .exec_handler_fn = message_handler_req_exec_cpg_downlist_old,
422  .exec_endian_convert_fn = exec_cpg_downlist_endian_convert_old
423  },
424  { /* 5 - MESSAGE_REQ_EXEC_CPG_DOWNLIST */
425  .exec_handler_fn = message_handler_req_exec_cpg_downlist,
426  .exec_endian_convert_fn = exec_cpg_downlist_endian_convert
427  },
428  { /* 6 - MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST */
429  .exec_handler_fn = message_handler_req_exec_cpg_partial_mcast,
430  .exec_endian_convert_fn = exec_cpg_partial_mcast_endian_convert
431  },
432 };
433 
434 struct corosync_service_engine cpg_service_engine = {
435  .name = "corosync cluster closed process group service v1.01",
436  .id = CPG_SERVICE,
437  .priority = 1,
438  .private_data_size = sizeof (struct cpg_pd),
439  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED,
440  .allow_inquorate = CS_LIB_ALLOW_INQUORATE,
441  .lib_init_fn = cpg_lib_init_fn,
442  .lib_exit_fn = cpg_lib_exit_fn,
443  .lib_engine = cpg_lib_engine,
444  .lib_engine_count = sizeof (cpg_lib_engine) / sizeof (struct corosync_lib_handler),
445  .exec_init_fn = cpg_exec_init_fn,
446  .exec_dump_fn = NULL,
447  .exec_engine = cpg_exec_engine,
448  .exec_engine_count = sizeof (cpg_exec_engine) / sizeof (struct corosync_exec_handler),
449  .sync_init = cpg_sync_init,
450  .sync_process = cpg_sync_process,
451  .sync_activate = cpg_sync_activate,
452  .sync_abort = cpg_sync_abort
453 };
454 
456 {
457  return (&cpg_service_engine);
458 }
459 
460 struct req_exec_cpg_procjoin {
461  struct qb_ipc_request_header header __attribute__((aligned(8)));
462  mar_cpg_name_t group_name __attribute__((aligned(8)));
463  mar_uint32_t pid __attribute__((aligned(8)));
464  mar_uint32_t reason __attribute__((aligned(8)));
465 };
466 
467 struct req_exec_cpg_mcast {
468  struct qb_ipc_request_header header __attribute__((aligned(8)));
469  mar_cpg_name_t group_name __attribute__((aligned(8)));
470  mar_uint32_t msglen __attribute__((aligned(8)));
471  mar_uint32_t pid __attribute__((aligned(8)));
472  mar_message_source_t source __attribute__((aligned(8)));
473  mar_uint8_t message[] __attribute__((aligned(8)));
474 };
475 
476 struct req_exec_cpg_partial_mcast {
477  struct qb_ipc_request_header header __attribute__((aligned(8)));
478  mar_cpg_name_t group_name __attribute__((aligned(8)));
479  mar_uint32_t msglen __attribute__((aligned(8)));
480  mar_uint32_t fraglen __attribute__((aligned(8)));
481  mar_uint32_t pid __attribute__((aligned(8)));
482  mar_uint32_t type __attribute__((aligned(8)));
483  mar_message_source_t source __attribute__((aligned(8)));
484  mar_uint8_t message[] __attribute__((aligned(8)));
485 };
486 
487 struct req_exec_cpg_downlist_old {
488  struct qb_ipc_request_header header __attribute__((aligned(8)));
489  mar_uint32_t left_nodes __attribute__((aligned(8)));
490  mar_uint32_t nodeids[PROCESSOR_COUNT_MAX] __attribute__((aligned(8)));
491 };
492 
493 struct req_exec_cpg_downlist {
494  struct qb_ipc_request_header header __attribute__((aligned(8)));
495  /* merge decisions */
496  mar_uint32_t old_members __attribute__((aligned(8)));
497  /* downlist below */
498  mar_uint32_t left_nodes __attribute__((aligned(8)));
499  mar_uint32_t nodeids[PROCESSOR_COUNT_MAX] __attribute__((aligned(8)));
500 };
501 
502 struct downlist_msg {
503  mar_uint32_t sender_nodeid __attribute__((aligned(8)));
504  mar_uint32_t old_members __attribute__((aligned(8)));
505  mar_uint32_t left_nodes __attribute__((aligned(8)));
506  mar_uint32_t nodeids[PROCESSOR_COUNT_MAX] __attribute__((aligned(8)));
507  struct list_head list;
508 };
509 
510 struct joinlist_msg {
511  unsigned int sender_nodeid;
512  uint32_t pid;
513  mar_cpg_name_t group_name;
514  struct list_head list;
515 };
516 
517 static struct req_exec_cpg_downlist g_req_exec_cpg_downlist;
518 
519 /*
520  * Function to print a group name. It is not reentrant.
521  */
522 static char *cpg_print_group_name(const mar_cpg_name_t *group)
523 {
524  static char res[CPG_MAX_NAME_LENGTH * 4 + 1];
525  int dest_pos = 0;
526  char c;
527  int i;
528 
529  for (i = 0; i < group->length; i++) {
530  c = group->value[i];
531 
532  if (c >= ' ' && c < 0x7f && c != '\\') {
533  res[dest_pos++] = c;
534  } else {
535  if (c == '\\') {
536  res[dest_pos++] = '\\';
537  res[dest_pos++] = '\\';
538  } else {
539  snprintf(res + dest_pos, sizeof(res) - dest_pos, "\\x%02X", c);
540  dest_pos += 4;
541  }
542  }
543  }
544  res[dest_pos] = 0;
545 
546  return (res);
547 }
548 
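/*
 * Start of a sync phase: remember the new member list and the ring id, and
 * build the list of nodes that were in the old membership but are missing
 * from the transitional one (i.e. nodes that left) for the downlist message.
 */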
549 static void cpg_sync_init (
550  const unsigned int *trans_list,
551  size_t trans_list_entries,
552  const unsigned int *member_list,
553  size_t member_list_entries,
554  const struct memb_ring_id *ring_id)
555 {
556  int entries;
557  int i, j;
558  int found;
559 
560  my_sync_state = CPGSYNC_DOWNLIST;
561 
562  memcpy (my_member_list, member_list, member_list_entries *
563  sizeof (unsigned int));
564  my_member_list_entries = member_list_entries;
565 
566  last_sync_ring_id.nodeid = ring_id->rep.nodeid;
567  last_sync_ring_id.seq = ring_id->seq;
568 
569  downlist_state = CPG_DOWNLIST_WAITING_FOR_MESSAGES;
570 
571  entries = 0;
572  /*
573  * Determine list of nodeids for downlist message
574  */
575  for (i = 0; i < my_old_member_list_entries; i++) {
576  found = 0;
577  for (j = 0; j < trans_list_entries; j++) {
578  if (my_old_member_list[i] == trans_list[j]) {
579  found = 1;
580  break;
581  }
582  }
583  if (found == 0) {
584  g_req_exec_cpg_downlist.nodeids[entries++] =
585  my_old_member_list[i];
586  }
587  }
588  g_req_exec_cpg_downlist.left_nodes = entries;
589 }
590 
591 static int cpg_sync_process (void)
592 {
593  int res = -1;
594 
595  if (my_sync_state == CPGSYNC_DOWNLIST) {
596  res = cpg_exec_send_downlist();
597  if (res == -1) {
598  return (-1);
599  }
600  my_sync_state = CPGSYNC_JOINLIST;
601  }
602  if (my_sync_state == CPGSYNC_JOINLIST) {
603  res = cpg_exec_send_joinlist();
604  }
605  return (res);
606 }
607 
608 static void cpg_sync_activate (void)
609 {
610  memcpy (my_old_member_list, my_member_list,
611  my_member_list_entries * sizeof (unsigned int));
612  my_old_member_list_entries = my_member_list_entries;
613 
614  if (downlist_state == CPG_DOWNLIST_WAITING_FOR_MESSAGES) {
615  downlist_master_choose_and_send ();
616  }
617 
618  joinlist_inform_clients ();
619 
620  downlist_messages_delete ();
621  downlist_state = CPG_DOWNLIST_NONE;
622  joinlist_messages_delete ();
623 
624  notify_lib_totem_membership (NULL, my_member_list_entries, my_member_list);
625 }
626 
627 static void cpg_sync_abort (void)
628 {
629  downlist_state = CPG_DOWNLIST_NONE;
630  downlist_messages_delete ();
631  joinlist_messages_delete ();
632 }
633 
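/*
 * Send a totem configuration change callback (ring id plus full member list)
 * either to one library connection or, when conn is NULL, to all of them.
 */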
634 static int notify_lib_totem_membership (
635  void *conn,
636  int member_list_entries,
637  const unsigned int *member_list)
638 {
639  struct list_head *iter;
640  char *buf;
641  int size;
642  struct res_lib_cpg_totem_confchg_callback *res;
643 
644  size = sizeof(struct res_lib_cpg_totem_confchg_callback) +
645  sizeof(mar_uint32_t) * (member_list_entries);
646  buf = alloca(size);
647  if (!buf)
648  return CS_ERR_LIBRARY;
649 
650  res = (struct res_lib_cpg_totem_confchg_callback *)buf;
651  res->member_list_entries = member_list_entries;
652  res->header.size = size;
654  res->header.error = CS_OK;
655 
656  memcpy (&res->ring_id, &last_sync_ring_id, sizeof (mar_cpg_ring_id_t));
657  memcpy (res->member_list, member_list, res->member_list_entries * sizeof (mar_uint32_t));
658 
659  if (conn == NULL) {
660  for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; iter = iter->next) {
661  struct cpg_pd *cpg_pd = list_entry (iter, struct cpg_pd, list);
662  api->ipc_dispatch_send (cpg_pd->conn, buf, size);
663  }
664  } else {
665  api->ipc_dispatch_send (conn, buf, size);
666  }
667 
668  return CS_OK;
669 }
670 
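/*
 * Build a confchg callback for one group: the current member list (minus the
 * processes in left_list), plus the left and joined lists, and deliver it to
 * the given connection or to every local connection joined to that group.
 */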
671 static int notify_lib_joinlist(
672  const mar_cpg_name_t *group_name,
673  void *conn,
674  int joined_list_entries,
675  mar_cpg_address_t *joined_list,
676  int left_list_entries,
677  mar_cpg_address_t *left_list,
678  int id)
679 {
680  int size;
681  char *buf;
682  struct list_head *iter;
683  int count;
684  struct res_lib_cpg_confchg_callback *res;
685  mar_cpg_address_t *retgi;
686 
687  count = 0;
688 
689  for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
690  struct process_info *pi = list_entry (iter, struct process_info, list);
691  if (mar_name_compare (&pi->group, group_name) == 0) {
692  int i;
693  int founded = 0;
694 
695  for (i = 0; i < left_list_entries; i++) {
696  if (left_list[i].nodeid == pi->nodeid && left_list[i].pid == pi->pid) {
697  founded++;
698  }
699  }
700 
701  if (!founded)
702  count++;
703  }
704  }
705 
706  size = sizeof(struct res_lib_cpg_confchg_callback) +
707  sizeof(mar_cpg_address_t) * (count + left_list_entries + joined_list_entries);
708  buf = alloca(size);
709  if (!buf)
710  return CS_ERR_LIBRARY;
711 
712  res = (struct res_lib_cpg_confchg_callback *)buf;
713  res->joined_list_entries = joined_list_entries;
714  res->left_list_entries = left_list_entries;
715  res->member_list_entries = count;
716  retgi = res->member_list;
717  res->header.size = size;
718  res->header.id = id;
719  res->header.error = CS_OK;
720  memcpy(&res->group_name, group_name, sizeof(mar_cpg_name_t));
721 
722  for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
723  struct process_info *pi=list_entry (iter, struct process_info, list);
724 
725  if (mar_name_compare (&pi->group, group_name) == 0) {
726  int i;
727  int founded = 0;
728 
729  for (i = 0;i < left_list_entries; i++) {
730  if (left_list[i].nodeid == pi->nodeid && left_list[i].pid == pi->pid) {
731  founded++;
732  }
733  }
734 
735  if (!founded) {
736  retgi->nodeid = pi->nodeid;
737  retgi->pid = pi->pid;
738  retgi++;
739  }
740  }
741  }
742 
743  if (left_list_entries) {
744  memcpy (retgi, left_list, left_list_entries * sizeof(mar_cpg_address_t));
745  retgi += left_list_entries;
746  }
747 
748  if (joined_list_entries) {
749  memcpy (retgi, joined_list, joined_list_entries * sizeof(mar_cpg_address_t));
750  retgi += joined_list_entries;
751  }
752 
753  if (conn) {
754  api->ipc_dispatch_send (conn, buf, size);
755  } else {
756  for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; iter = iter->next) {
757  struct cpg_pd *cpd = list_entry (iter, struct cpg_pd, list);
758  if (mar_name_compare (&cpd->group_name, group_name) == 0) {
759  assert (joined_list_entries <= 1);
760  if (joined_list_entries) {
761  if (joined_list[0].pid == cpd->pid &&
762  joined_list[0].nodeid == api->totem_nodeid_get()) {
763  cpd->cpd_state = CPD_STATE_JOIN_COMPLETED;
764  }
765  }
766  if (cpd->cpd_state == CPD_STATE_JOIN_COMPLETED ||
767  cpd->cpd_state == CPD_STATE_LEAVE_STARTED) {
768 
769  api->ipc_dispatch_send (cpd->conn, buf, size);
770  cpd->transition_counter++;
771  }
772  if (left_list_entries) {
773  if (left_list[0].pid == cpd->pid &&
774  left_list[0].nodeid == api->totem_nodeid_get() &&
775  left_list[0].reason == CONFCHG_CPG_REASON_LEAVE) {
776 
777  cpd->pid = 0;
778  memset (&cpd->group_name, 0, sizeof(cpd->group_name));
779  cpd->cpd_state = CPD_STATE_UNJOINED;
780  }
781  }
782  }
783  }
784  }
785 
786 
787  /*
788  * Traverse the cpds and send the totem membership to any cpd that has not received it yet
789  */
790  for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; iter = iter->next) {
791  struct cpg_pd *cpd = list_entry (iter, struct cpg_pd, list);
792 
793  if (cpd->initial_totem_conf_sent == 0) {
794  cpd->initial_totem_conf_sent = 1;
795 
796  notify_lib_totem_membership (cpd->conn, my_old_member_list_entries, my_old_member_list);
797  }
798  }
799 
800  return CS_OK;
801 }
802 
803 static void downlist_log(const char *msg, struct downlist_msg* dl)
804 {
805  log_printf (LOG_DEBUG,
806  "%s: sender %s; members(old:%d left:%d)",
807  msg,
808  api->totem_ifaces_print(dl->sender_nodeid),
809  dl->old_members,
810  dl->left_nodes);
811 }
812 
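/*
 * Pick the "best" downlist from all of the ones received: skip any whose left
 * list contains the local node, then prefer the sender that kept the largest
 * partition (old_members - left_nodes), breaking ties by larger old_members
 * and finally by lower sender nodeid, so every node settles on the same one.
 */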
813 static struct downlist_msg* downlist_master_choose (void)
814 {
815  struct downlist_msg *cmp;
816  struct downlist_msg *best = NULL;
817  struct list_head *iter;
818  uint32_t cmp_members;
819  uint32_t best_members;
820  uint32_t i;
821  int ignore_msg;
822 
823  for (iter = downlist_messages_head.next;
824  iter != &downlist_messages_head;
825  iter = iter->next) {
826 
827  cmp = list_entry(iter, struct downlist_msg, list);
828  downlist_log("comparing", cmp);
829 
830  ignore_msg = 0;
831  for (i = 0; i < cmp->left_nodes; i++) {
832  if (cmp->nodeids[i] == api->totem_nodeid_get()) {
833  log_printf (LOG_DEBUG, "Ignoring this entry because I'm in the left list\n");
834 
835  ignore_msg = 1;
836  break;
837  }
838  }
839 
840  if (ignore_msg) {
841  continue ;
842  }
843 
844  if (best == NULL) {
845  best = cmp;
846  continue;
847  }
848 
849  best_members = best->old_members - best->left_nodes;
850  cmp_members = cmp->old_members - cmp->left_nodes;
851 
852  if (cmp_members > best_members) {
853  best = cmp;
854  } else if (cmp_members == best_members) {
855  if (cmp->old_members > best->old_members) {
856  best = cmp;
857  } else if (cmp->old_members == best->old_members) {
858  if (cmp->sender_nodeid < best->sender_nodeid) {
859  best = cmp;
860  }
861  }
862  }
863  }
864 
865  assert (best != NULL);
866 
867  return best;
868 }
869 
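/*
 * Apply the chosen downlist: for every process_info entry whose node is in
 * the left list, remove it and queue it on a per-group left_list, then emit
 * one confchg event per affected group.
 */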
870 static void downlist_master_choose_and_send (void)
871 {
872  struct downlist_msg *stored_msg;
873  struct list_head *iter;
874  struct process_info *left_pi;
875  qb_map_t *group_map;
876  struct cpg_name cpg_group;
877  mar_cpg_name_t group;
878  struct confchg_data{
879  struct cpg_name cpg_group;
881  int left_list_entries;
882  struct list_head list;
883  } *pcd;
884  qb_map_iter_t *miter;
885  int i, size;
886 
887  downlist_state = CPG_DOWNLIST_APPLYING;
888 
889  stored_msg = downlist_master_choose ();
890  if (!stored_msg) {
891  log_printf (LOGSYS_LEVEL_DEBUG, "NO chosen downlist");
892  return;
893  }
894  downlist_log("chosen downlist", stored_msg);
895 
896  group_map = qb_skiplist_create();
897 
898  /*
899  * Only the cpg groups that have members on the left nodes should receive
900  * a confchg event, so we collect these cpg groups and their
901  * respective left_lists here.
902  */
903  for (iter = process_info_list_head.next; iter != &process_info_list_head; ) {
904  struct process_info *pi = list_entry(iter, struct process_info, list);
905  iter = iter->next;
906 
907  left_pi = NULL;
908  for (i = 0; i < stored_msg->left_nodes; i++) {
909 
910  if (pi->nodeid == stored_msg->nodeids[i]) {
911  left_pi = pi;
912  break;
913  }
914  }
915 
916  if (left_pi) {
917  marshall_from_mar_cpg_name_t(&cpg_group, &left_pi->group);
918  cpg_group.value[cpg_group.length] = 0;
919 
920  pcd = (struct confchg_data *)qb_map_get(group_map, cpg_group.value);
921  if (pcd == NULL) {
922  pcd = (struct confchg_data *)calloc(1, sizeof(struct confchg_data));
923  memcpy(&pcd->cpg_group, &cpg_group, sizeof(struct cpg_name));
924  qb_map_put(group_map, pcd->cpg_group.value, pcd);
925  }
926  size = pcd->left_list_entries;
927  pcd->left_list[size].nodeid = left_pi->nodeid;
928  pcd->left_list[size].pid = left_pi->pid;
929  pcd->left_list[size].reason = CONFCHG_CPG_REASON_NODEDOWN;
930  pcd->left_list_entries++;
931  list_del (&left_pi->list);
932  free (left_pi);
933  }
934  }
935 
936  /* send only one confchg event per cpg group */
937  miter = qb_map_iter_create(group_map);
938  while (qb_map_iter_next(miter, (void **)&pcd)) {
939  marshall_to_mar_cpg_name_t(&group, &pcd->cpg_group);
940 
941  log_printf (LOG_DEBUG, "left_list_entries:%d", pcd->left_list_entries);
942  for (i=0; i<pcd->left_list_entries; i++) {
943  log_printf (LOG_DEBUG, "left_list[%d] group:%s, ip:%s, pid:%d",
944  i, cpg_print_group_name(&group),
945  (char*)api->totem_ifaces_print(pcd->left_list[i].nodeid),
946  pcd->left_list[i].pid);
947  }
948 
949  /* send confchg event */
950  notify_lib_joinlist(&group, NULL,
951  0, NULL,
952  pcd->left_list_entries,
953  pcd->left_list,
955 
956  free(pcd);
957  }
958  qb_map_iter_free(miter);
959  qb_map_destroy(group_map);
960 }
961 
962 /*
963  * Remove processes that might have left the group while we were suspended.
964  */
965 static void joinlist_remove_zombie_pi_entries (void)
966 {
967  struct list_head *pi_iter;
968  struct list_head *jl_iter;
969  struct process_info *pi;
970  struct joinlist_msg *stored_msg;
971  int found;
972 
973  for (pi_iter = process_info_list_head.next; pi_iter != &process_info_list_head; ) {
974  pi = list_entry (pi_iter, struct process_info, list);
975  pi_iter = pi_iter->next;
976 
977  /*
978  * Ignore local node
979  */
980  if (pi->nodeid == api->totem_nodeid_get()) {
981  continue ;
982  }
983 
984  /*
985  * Try to find message in joinlist messages
986  */
987  found = 0;
988  for (jl_iter = joinlist_messages_head.next;
989  jl_iter != &joinlist_messages_head;
990  jl_iter = jl_iter->next) {
991 
992  stored_msg = list_entry(jl_iter, struct joinlist_msg, list);
993 
994  if (stored_msg->sender_nodeid == api->totem_nodeid_get()) {
995  continue ;
996  }
997 
998  if (pi->nodeid == stored_msg->sender_nodeid &&
999  pi->pid == stored_msg->pid &&
1000  mar_name_compare (&pi->group, &stored_msg->group_name) == 0) {
1001  found = 1;
1002  break ;
1003  }
1004  }
1005 
1006  if (!found) {
1007  do_proc_leave(&pi->group, pi->pid, pi->nodeid, CONFCHG_CPG_REASON_PROCDOWN);
1008  }
1009  }
1010 }
1011 
1012 static void joinlist_inform_clients (void)
1013 {
1014  struct joinlist_msg *stored_msg;
1015  struct list_head *iter;
1016  unsigned int i;
1017 
1018  i = 0;
1019  for (iter = joinlist_messages_head.next;
1020  iter != &joinlist_messages_head;
1021  iter = iter->next) {
1022 
1023  stored_msg = list_entry(iter, struct joinlist_msg, list);
1024 
1025  log_printf (LOG_DEBUG, "joinlist_messages[%u] group:%s, ip:%s, pid:%d",
1026  i++, cpg_print_group_name(&stored_msg->group_name),
1027  (char*)api->totem_ifaces_print(stored_msg->sender_nodeid),
1028  stored_msg->pid);
1029 
1030  /* Ignore our own messages */
1031  if (stored_msg->sender_nodeid == api->totem_nodeid_get()) {
1032  continue ;
1033  }
1034 
1035  do_proc_join (&stored_msg->group_name, stored_msg->pid, stored_msg->sender_nodeid,
1037  }
1038 
1039  joinlist_remove_zombie_pi_entries ();
1040 }
1041 
1042 static void downlist_messages_delete (void)
1043 {
1044  struct downlist_msg *stored_msg;
1045  struct list_head *iter, *iter_next;
1046 
1047  for (iter = downlist_messages_head.next;
1048  iter != &downlist_messages_head;
1049  iter = iter_next) {
1050 
1051  iter_next = iter->next;
1052 
1053  stored_msg = list_entry(iter, struct downlist_msg, list);
1054  list_del (&stored_msg->list);
1055  free (stored_msg);
1056  }
1057 }
1058 
1059 static void joinlist_messages_delete (void)
1060 {
1061  struct joinlist_msg *stored_msg;
1062  struct list_head *iter, *iter_next;
1063 
1064  for (iter = joinlist_messages_head.next;
1065  iter != &joinlist_messages_head;
1066  iter = iter_next) {
1067 
1068  iter_next = iter->next;
1069 
1070  stored_msg = list_entry(iter, struct joinlist_msg, list);
1071  list_del (&stored_msg->list);
1072  free (stored_msg);
1073  }
1074  list_init (&joinlist_messages_head);
1075 }
1076 
1077 static char *cpg_exec_init_fn (struct corosync_api_v1 *corosync_api)
1078 {
1079  list_init (&downlist_messages_head);
1080  list_init (&joinlist_messages_head);
1081  api = corosync_api;
1082  return (NULL);
1083 }
1084 
1085 static void cpg_iteration_instance_finalize (struct cpg_iteration_instance *cpg_iteration_instance)
1086 {
1087  struct list_head *iter, *iter_next;
1088  struct process_info *pi;
1089 
1090  for (iter = cpg_iteration_instance->items_list_head.next;
1091  iter != &cpg_iteration_instance->items_list_head;
1092  iter = iter_next) {
1093 
1094  iter_next = iter->next;
1095 
1096  pi = list_entry (iter, struct process_info, list);
1097  list_del (&pi->list);
1098  free (pi);
1099  }
1100 
1101  list_del (&cpg_iteration_instance->list);
1102  hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_instance->handle);
1103 }
1104 
1105 static void cpg_pd_finalize (struct cpg_pd *cpd)
1106 {
1107  struct list_head *iter, *iter_next;
1108  struct cpg_iteration_instance *cpii;
1109 
1110  zcb_all_free(cpd);
1111  for (iter = cpd->iteration_instance_list_head.next;
1112  iter != &cpd->iteration_instance_list_head;
1113  iter = iter_next) {
1114 
1115  iter_next = iter->next;
1116 
1117  cpii = list_entry (iter, struct cpg_iteration_instance, list);
1118 
1119  cpg_iteration_instance_finalize (cpii);
1120  }
1121 
1122  list_del (&cpd->list);
1123 }
1124 
1125 static int cpg_lib_exit_fn (void *conn)
1126 {
1127  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1128 
1129  log_printf(LOGSYS_LEVEL_DEBUG, "exit_fn for conn=%p", conn);
1130 
1131  if (cpd->group_name.length > 0 && cpd->cpd_state != CPD_STATE_LEAVE_STARTED) {
1132  cpg_node_joinleave_send (cpd->pid, &cpd->group_name,
1134  }
1135 
1136  cpg_pd_finalize (cpd);
1137 
1138  api->ipc_refcnt_dec (conn);
1139  return (0);
1140 }
1141 
1142 static int cpg_node_joinleave_send (unsigned int pid, const mar_cpg_name_t *group_name, int fn, int reason)
1143 {
1144  struct req_exec_cpg_procjoin req_exec_cpg_procjoin;
1145  struct iovec req_exec_cpg_iovec;
1146  int result;
1147 
1148  memcpy(&req_exec_cpg_procjoin.group_name, group_name, sizeof(mar_cpg_name_t));
1149  req_exec_cpg_procjoin.pid = pid;
1150  req_exec_cpg_procjoin.reason = reason;
1151 
1152  req_exec_cpg_procjoin.header.size = sizeof(req_exec_cpg_procjoin);
1154 
1155  req_exec_cpg_iovec.iov_base = (char *)&req_exec_cpg_procjoin;
1156  req_exec_cpg_iovec.iov_len = sizeof(req_exec_cpg_procjoin);
1157 
1158  result = api->totem_mcast (&req_exec_cpg_iovec, 1, TOTEM_AGREED);
1159 
1160  return (result);
1161 }
1162 
1163 /* Can byteswap join & leave messages */
1164 static void exec_cpg_procjoin_endian_convert (void *msg)
1165 {
1166  struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = msg;
1167 
1168  req_exec_cpg_procjoin->pid = swab32(req_exec_cpg_procjoin->pid);
1169  swab_mar_cpg_name_t (&req_exec_cpg_procjoin->group_name);
1170  req_exec_cpg_procjoin->reason = swab32(req_exec_cpg_procjoin->reason);
1171 }
1172 
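/*
 * A joinlist message is a qb_ipc_response_header followed by an array of
 * join_list_entry records; byteswap the header size first, then each entry.
 */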
1173 static void exec_cpg_joinlist_endian_convert (void *msg_v)
1174 {
1175  char *msg = msg_v;
1176  struct qb_ipc_response_header *res = (struct qb_ipc_response_header *)msg;
1177  struct join_list_entry *jle = (struct join_list_entry *)(msg + sizeof(struct qb_ipc_response_header));
1178 
1179  swab_mar_int32_t (&res->size);
1180 
1181  while ((const char*)jle < msg + res->size) {
1182  jle->pid = swab32(jle->pid);
1183  swab_mar_cpg_name_t (&jle->group_name);
1184  jle++;
1185  }
1186 }
1187 
1188 static void exec_cpg_downlist_endian_convert_old (void *msg)
1189 {
1190 }
1191 
1192 static void exec_cpg_downlist_endian_convert (void *msg)
1193 {
1194  struct req_exec_cpg_downlist *req_exec_cpg_downlist = msg;
1195  unsigned int i;
1196 
1197  req_exec_cpg_downlist->left_nodes = swab32(req_exec_cpg_downlist->left_nodes);
1198  req_exec_cpg_downlist->old_members = swab32(req_exec_cpg_downlist->old_members);
1199 
1200  for (i = 0; i < req_exec_cpg_downlist->left_nodes; i++) {
1201  req_exec_cpg_downlist->nodeids[i] = swab32(req_exec_cpg_downlist->nodeids[i]);
1202  }
1203 }
1204 
1205 
1206 static void exec_cpg_mcast_endian_convert (void *msg)
1207 {
1208  struct req_exec_cpg_mcast *req_exec_cpg_mcast = msg;
1209 
1210  swab_coroipc_request_header_t (&req_exec_cpg_mcast->header);
1211  swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
1212  req_exec_cpg_mcast->pid = swab32(req_exec_cpg_mcast->pid);
1213  req_exec_cpg_mcast->msglen = swab32(req_exec_cpg_mcast->msglen);
1214  swab_mar_message_source_t (&req_exec_cpg_mcast->source);
1215 }
1216 
1217 static void exec_cpg_partial_mcast_endian_convert (void *msg)
1218 {
1219  struct req_exec_cpg_partial_mcast *req_exec_cpg_mcast = msg;
1220 
1221  swab_coroipc_request_header_t (&req_exec_cpg_mcast->header);
1222  swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
1223  req_exec_cpg_mcast->pid = swab32(req_exec_cpg_mcast->pid);
1224  req_exec_cpg_mcast->msglen = swab32(req_exec_cpg_mcast->msglen);
1225  req_exec_cpg_mcast->fraglen = swab32(req_exec_cpg_mcast->fraglen);
1226  req_exec_cpg_mcast->type = swab32(req_exec_cpg_mcast->type);
1227  swab_mar_message_source_t (&req_exec_cpg_mcast->source);
1228 }
1229 
1230 static struct process_info *process_info_find(const mar_cpg_name_t *group_name, uint32_t pid, unsigned int nodeid) {
1231  struct list_head *iter;
1232 
1233  for (iter = process_info_list_head.next; iter != &process_info_list_head; ) {
1234  struct process_info *pi = list_entry (iter, struct process_info, list);
1235  iter = iter->next;
1236 
1237  if (pi->pid == pid && pi->nodeid == nodeid &&
1238  mar_name_compare (&pi->group, group_name) == 0) {
1239  return pi;
1240  }
1241  }
1242 
1243  return NULL;
1244 }
1245 
1246 static void do_proc_join(
1247  const mar_cpg_name_t *name,
1248  uint32_t pid,
1249  unsigned int nodeid,
1250  int reason)
1251 {
1252  struct process_info *pi;
1253  struct process_info *pi_entry;
1254  mar_cpg_address_t notify_info;
1255  struct list_head *list;
1256  struct list_head *list_to_add = NULL;
1257 
1258  if (process_info_find (name, pid, nodeid) != NULL) {
1259  return ;
1260  }
1261  pi = malloc (sizeof (struct process_info));
1262  if (!pi) {
1263  log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate process_info struct");
1264  return;
1265  }
1266  pi->nodeid = nodeid;
1267  pi->pid = pid;
1268  memcpy(&pi->group, name, sizeof(*name));
1269  list_init(&pi->list);
1270 
1271  /*
1272  * Insert new process in sorted order so synchronization works properly
1273  */
1274  list_to_add = &process_info_list_head;
1275  for (list = process_info_list_head.next; list != &process_info_list_head; list = list->next) {
1276 
1277  pi_entry = list_entry(list, struct process_info, list);
1278  if (pi_entry->nodeid > pi->nodeid ||
1279  (pi_entry->nodeid == pi->nodeid && pi_entry->pid > pi->pid)) {
1280 
1281  break;
1282  }
1283  list_to_add = list;
1284  }
1285  list_add (&pi->list, list_to_add);
1286 
1287  notify_info.pid = pi->pid;
1288  notify_info.nodeid = nodeid;
1289  notify_info.reason = reason;
1290 
1291  notify_lib_joinlist(&pi->group, NULL,
1292  1, &notify_info,
1293  0, NULL,
1295 }
1296 
1297 static void do_proc_leave(
1298  const mar_cpg_name_t *name,
1299  uint32_t pid,
1300  unsigned int nodeid,
1301  int reason)
1302 {
1303  struct process_info *pi;
1304  struct list_head *iter;
1305  mar_cpg_address_t notify_info;
1306 
1307  notify_info.pid = pid;
1308  notify_info.nodeid = nodeid;
1309  notify_info.reason = reason;
1310 
1311  notify_lib_joinlist(name, NULL,
1312  0, NULL,
1313  1, &notify_info,
1315 
1316  for (iter = process_info_list_head.next; iter != &process_info_list_head; ) {
1317  pi = list_entry(iter, struct process_info, list);
1318  iter = iter->next;
1319 
1320  if (pi->pid == pid && pi->nodeid == nodeid &&
1321  mar_name_compare (&pi->group, name)==0) {
1322  list_del (&pi->list);
1323  free (pi);
1324  }
1325  }
1326 }
1327 
1328 static void message_handler_req_exec_cpg_downlist_old (
1329  const void *message,
1330  unsigned int nodeid)
1331 {
1332  log_printf (LOGSYS_LEVEL_WARNING, "downlist OLD from node 0x%x",
1333  nodeid);
1334 }
1335 
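/*
 * Store the downlist received from a node; once a downlist has arrived from
 * every current member, choose the master copy and send the confchg events.
 */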
1336 static void message_handler_req_exec_cpg_downlist(
1337  const void *message,
1338  unsigned int nodeid)
1339 {
1340  const struct req_exec_cpg_downlist *req_exec_cpg_downlist = message;
1341  int i;
1342  struct list_head *iter;
1343  struct downlist_msg *stored_msg;
1344  int found;
1345 
1346  if (downlist_state != CPG_DOWNLIST_WAITING_FOR_MESSAGES) {
1347  log_printf (LOGSYS_LEVEL_WARNING, "downlist left_list: %d received in state %d",
1348  req_exec_cpg_downlist->left_nodes, downlist_state);
1349  return;
1350  }
1351 
1352  stored_msg = malloc (sizeof (struct downlist_msg));
1353  stored_msg->sender_nodeid = nodeid;
1354  stored_msg->old_members = req_exec_cpg_downlist->old_members;
1355  stored_msg->left_nodes = req_exec_cpg_downlist->left_nodes;
1356  memcpy (stored_msg->nodeids, req_exec_cpg_downlist->nodeids,
1357  req_exec_cpg_downlist->left_nodes * sizeof (mar_uint32_t));
1358  list_init (&stored_msg->list);
1359  list_add (&stored_msg->list, &downlist_messages_head);
1360 
1361  for (i = 0; i < my_member_list_entries; i++) {
1362  found = 0;
1363  for (iter = downlist_messages_head.next;
1364  iter != &downlist_messages_head;
1365  iter = iter->next) {
1366 
1367  stored_msg = list_entry(iter, struct downlist_msg, list);
1368  if (my_member_list[i] == stored_msg->sender_nodeid) {
1369  found = 1;
1370  }
1371  }
1372  if (!found) {
1373  return;
1374  }
1375  }
1376 
1377  downlist_master_choose_and_send ();
1378 }
1379 
1380 
1381 static void message_handler_req_exec_cpg_procjoin (
1382  const void *message,
1383  unsigned int nodeid)
1384 {
1385  const struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = message;
1386 
1387  log_printf(LOGSYS_LEVEL_DEBUG, "got procjoin message from cluster node 0x%x (%s) for pid %u",
1388  nodeid,
1389  api->totem_ifaces_print(nodeid),
1390  (unsigned int)req_exec_cpg_procjoin->pid);
1391 
1392  do_proc_join (&req_exec_cpg_procjoin->group_name,
1393  req_exec_cpg_procjoin->pid, nodeid,
1395 }
1396 
1397 static void message_handler_req_exec_cpg_procleave (
1398  const void *message,
1399  unsigned int nodeid)
1400 {
1401  const struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = message;
1402 
1403  log_printf(LOGSYS_LEVEL_DEBUG, "got procleave message from cluster node 0x%x (%s) for pid %u",
1404  nodeid,
1405  api->totem_ifaces_print(nodeid),
1406  (unsigned int)req_exec_cpg_procjoin->pid);
1407 
1408  do_proc_leave (&req_exec_cpg_procjoin->group_name,
1409  req_exec_cpg_procjoin->pid, nodeid,
1410  req_exec_cpg_procjoin->reason);
1411 }
1412 
1413 
1414 /* Got a proclist from another node */
1415 static void message_handler_req_exec_cpg_joinlist (
1416  const void *message_v,
1417  unsigned int nodeid)
1418 {
1419  const char *message = message_v;
1420  const struct qb_ipc_response_header *res = (const struct qb_ipc_response_header *)message;
1421  const struct join_list_entry *jle = (const struct join_list_entry *)(message + sizeof(struct qb_ipc_response_header));
1422  struct joinlist_msg *stored_msg;
1423 
1424  log_printf(LOGSYS_LEVEL_DEBUG, "got joinlist message from node 0x%x",
1425  nodeid);
1426 
1427  while ((const char*)jle < message + res->size) {
1428  stored_msg = malloc (sizeof (struct joinlist_msg));
1429  memset(stored_msg, 0, sizeof (struct joinlist_msg));
1430  stored_msg->sender_nodeid = nodeid;
1431  stored_msg->pid = jle->pid;
1432  memcpy(&stored_msg->group_name, &jle->group_name, sizeof(mar_cpg_name_t));
1433  list_init (&stored_msg->list);
1434  list_add (&stored_msg->list, &joinlist_messages_head);
1435  jle++;
1436  }
1437 }
1438 
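/*
 * Deliver a multicast to every local connection joined to the group. Messages
 * from a node we have no process_info for (not yet synced) are not delivered.
 */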
1439 static void message_handler_req_exec_cpg_mcast (
1440  const void *message,
1441  unsigned int nodeid)
1442 {
1443  const struct req_exec_cpg_mcast *req_exec_cpg_mcast = message;
1445  int msglen = req_exec_cpg_mcast->msglen;
1446  struct list_head *iter, *pi_iter;
1447  struct cpg_pd *cpd;
1448  struct iovec iovec[2];
1449  int known_node = 0;
1450 
1452  res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast) + msglen;
1453  res_lib_cpg_mcast.msglen = msglen;
1454  res_lib_cpg_mcast.pid = req_exec_cpg_mcast->pid;
1455  res_lib_cpg_mcast.nodeid = nodeid;
1456 
1457  memcpy(&res_lib_cpg_mcast.group_name, &req_exec_cpg_mcast->group_name,
1458  sizeof(mar_cpg_name_t));
1459  iovec[0].iov_base = (void *)&res_lib_cpg_mcast;
1460  iovec[0].iov_len = sizeof (res_lib_cpg_mcast);
1461 
1462  iovec[1].iov_base = (char*)message+sizeof(*req_exec_cpg_mcast);
1463  iovec[1].iov_len = msglen;
1464 
1465  for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; ) {
1466  cpd = list_entry(iter, struct cpg_pd, list);
1467  iter = iter->next;
1468 
1469  if ((cpd->cpd_state == CPD_STATE_LEAVE_STARTED || cpd->cpd_state == CPD_STATE_JOIN_COMPLETED)
1470  && (mar_name_compare (&cpd->group_name, &req_exec_cpg_mcast->group_name) == 0)) {
1471 
1472  if (!known_node) {
1473  /* Check whether we know about the sending node */
1474  for (pi_iter = process_info_list_head.next;
1475  pi_iter != &process_info_list_head; pi_iter = pi_iter->next) {
1476 
1477  struct process_info *pi = list_entry (pi_iter, struct process_info, list);
1478 
1479  if (pi->nodeid == nodeid &&
1480  mar_name_compare (&pi->group, &req_exec_cpg_mcast->group_name) == 0) {
1481  known_node = 1;
1482  break;
1483  }
1484  }
1485  }
1486 
1487  if (!known_node) {
1488  log_printf(LOGSYS_LEVEL_WARNING, "Unknown node -> we will not deliver message");
1489  return ;
1490  }
1491 
1492  api->ipc_dispatch_iov_send (cpd->conn, iovec, 2);
1493  }
1494  }
1495 }
1496 
1497 static void message_handler_req_exec_cpg_partial_mcast (
1498  const void *message,
1499  unsigned int nodeid)
1500 {
1501  const struct req_exec_cpg_partial_mcast *req_exec_cpg_mcast = message;
1503  int msglen = req_exec_cpg_mcast->fraglen;
1504  struct list_head *iter, *pi_iter;
1505  struct cpg_pd *cpd;
1506  struct iovec iovec[2];
1507  int known_node = 0;
1508 
1509  log_printf(LOGSYS_LEVEL_DEBUG, "Got fragmented message from node %d, size = %d bytes\n", nodeid, msglen);
1510 
1512  res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast) + msglen;
1513  res_lib_cpg_mcast.fraglen = msglen;
1514  res_lib_cpg_mcast.msglen = req_exec_cpg_mcast->msglen;
1515  res_lib_cpg_mcast.pid = req_exec_cpg_mcast->pid;
1516  res_lib_cpg_mcast.type = req_exec_cpg_mcast->type;
1517  res_lib_cpg_mcast.nodeid = nodeid;
1518 
1519  memcpy(&res_lib_cpg_mcast.group_name, &req_exec_cpg_mcast->group_name,
1520  sizeof(mar_cpg_name_t));
1521  iovec[0].iov_base = (void *)&res_lib_cpg_mcast;
1522  iovec[0].iov_len = sizeof (res_lib_cpg_mcast);
1523 
1524  iovec[1].iov_base = (char*)message+sizeof(*req_exec_cpg_mcast);
1525  iovec[1].iov_len = msglen;
1526 
1527  for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; ) {
1528  cpd = list_entry(iter, struct cpg_pd, list);
1529  iter = iter->next;
1530 
1531  if ((cpd->cpd_state == CPD_STATE_LEAVE_STARTED || cpd->cpd_state == CPD_STATE_JOIN_COMPLETED)
1532  && (mar_name_compare (&cpd->group_name, &req_exec_cpg_mcast->group_name) == 0)) {
1533 
1534  if (!known_node) {
1535  /* Check whether we know about the sending node */
1536  for (pi_iter = process_info_list_head.next;
1537  pi_iter != &process_info_list_head; pi_iter = pi_iter->next) {
1538 
1539  struct process_info *pi = list_entry (pi_iter, struct process_info, list);
1540 
1541  if (pi->nodeid == nodeid &&
1542  mar_name_compare (&pi->group, &req_exec_cpg_mcast->group_name) == 0) {
1543  known_node = 1;
1544  break;
1545  }
1546  }
1547  }
1548 
1549  if (!known_node) {
1550  log_printf(LOGSYS_LEVEL_WARNING, "Unknown node -> we will not deliver message");
1551  return ;
1552  }
1553 
1554  api->ipc_dispatch_iov_send (cpd->conn, iovec, 2);
1555  }
1556  }
1557 }
1558 
1559 
1560 static int cpg_exec_send_downlist(void)
1561 {
1562  struct iovec iov;
1563 
1564  g_req_exec_cpg_downlist.header.id = SERVICE_ID_MAKE(CPG_SERVICE, MESSAGE_REQ_EXEC_CPG_DOWNLIST);
1565  g_req_exec_cpg_downlist.header.size = sizeof(struct req_exec_cpg_downlist);
1566 
1567  g_req_exec_cpg_downlist.old_members = my_old_member_list_entries;
1568 
1569  iov.iov_base = (void *)&g_req_exec_cpg_downlist;
1570  iov.iov_len = g_req_exec_cpg_downlist.header.size;
1571 
1572  return (api->totem_mcast (&iov, 1, TOTEM_AGREED));
1573 }
1574 
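/*
 * Multicast the list of groups joined by processes on this node so that other
 * members can rebuild their process_info lists during sync.
 */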
1575 static int cpg_exec_send_joinlist(void)
1576 {
1577  int count = 0;
1578  struct list_head *iter;
1579  struct qb_ipc_response_header *res;
1580  char *buf;
1581  struct join_list_entry *jle;
1582  struct iovec req_exec_cpg_iovec;
1583 
1584  for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
1585  struct process_info *pi = list_entry (iter, struct process_info, list);
1586 
1587  if (pi->nodeid == api->totem_nodeid_get ()) {
1588  count++;
1589  }
1590  }
1591 
1592  /* Nothing to send */
1593  if (!count)
1594  return 0;
1595 
1596  buf = alloca(sizeof(struct qb_ipc_response_header) + sizeof(struct join_list_entry) * count);
1597  if (!buf) {
1598  log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate joinlist buffer");
1599  return -1;
1600  }
1601 
1602  jle = (struct join_list_entry *)(buf + sizeof(struct qb_ipc_response_header));
1603  res = (struct qb_ipc_response_header *)buf;
1604 
1605  for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
1606  struct process_info *pi = list_entry (iter, struct process_info, list);
1607 
1608  if (pi->nodeid == api->totem_nodeid_get ()) {
1609  memcpy (&jle->group_name, &pi->group, sizeof (mar_cpg_name_t));
1610  jle->pid = pi->pid;
1611  jle++;
1612  }
1613  }
1614 
1616  res->size = sizeof(struct qb_ipc_response_header)+sizeof(struct join_list_entry) * count;
1617 
1618  req_exec_cpg_iovec.iov_base = buf;
1619  req_exec_cpg_iovec.iov_len = res->size;
1620 
1621  return (api->totem_mcast (&req_exec_cpg_iovec, 1, TOTEM_AGREED));
1622 }
1623 
1624 static int cpg_lib_init_fn (void *conn)
1625 {
1626  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1627  memset (cpd, 0, sizeof(struct cpg_pd));
1628  cpd->conn = conn;
1629  list_add (&cpd->list, &cpg_pd_list_head);
1630 
1631  list_init (&cpd->iteration_instance_list_head);
1632  list_init (&cpd->zcb_mapped_list_head);
1633 
1634  api->ipc_refcnt_inc (conn);
1635  log_printf(LOGSYS_LEVEL_DEBUG, "lib_init_fn: conn=%p, cpd=%p", conn, cpd);
1636  return (0);
1637 }
1638 
1639 /* Join message from the library */
1640 static void message_handler_req_lib_cpg_join (void *conn, const void *message)
1641 {
1642  const struct req_lib_cpg_join *req_lib_cpg_join = message;
1643  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1645  cs_error_t error = CS_OK;
1646  struct list_head *iter;
1647 
1648  /* Check that the same pid and group name are not already joined */
1649  for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; iter = iter->next) {
1650  struct cpg_pd *cpd_item = list_entry (iter, struct cpg_pd, list);
1651 
1652  if (cpd_item->pid == req_lib_cpg_join->pid &&
1653  mar_name_compare(&req_lib_cpg_join->group_name, &cpd_item->group_name) == 0) {
1654 
1655  /* We have same pid and group name joined -> return error */
1656  error = CS_ERR_EXIST;
1657  goto response_send;
1658  }
1659  }
1660 
1661  /*
1662  * The same check must be done against the process info list, because a
1663  * client's leave may not have been delivered yet.
1664  */
1665  for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
1666  struct process_info *pi = list_entry (iter, struct process_info, list);
1667 
1668  if (pi->nodeid == api->totem_nodeid_get () && pi->pid == req_lib_cpg_join->pid &&
1669  mar_name_compare(&req_lib_cpg_join->group_name, &pi->group) == 0) {
1670  /* We have same pid and group name joined -> return error */
1671  error = CS_ERR_TRY_AGAIN;
1672  goto response_send;
1673  }
1674  }
1675 
1676  if (req_lib_cpg_join->group_name.length > CPG_MAX_NAME_LENGTH) {
1677  error = CS_ERR_NAME_TOO_LONG;
1678  goto response_send;
1679  }
1680 
1681  switch (cpd->cpd_state) {
1682  case CPD_STATE_UNJOINED:
1683  error = CS_OK;
1684  cpd->cpd_state = CPD_STATE_JOIN_STARTED;
1685  cpd->pid = req_lib_cpg_join->pid;
1686  cpd->flags = req_lib_cpg_join->flags;
1687  memcpy (&cpd->group_name, &req_lib_cpg_join->group_name,
1688  sizeof (cpd->group_name));
1689 
1690  cpg_node_joinleave_send (req_lib_cpg_join->pid,
1691  &req_lib_cpg_join->group_name,
1693  break;
1694  case CPD_STATE_LEAVE_STARTED:
1695  error = CS_ERR_BUSY;
1696  break;
1697  case CPD_STATE_JOIN_STARTED:
1698  error = CS_ERR_EXIST;
1699  break;
1700  case CPD_STATE_JOIN_COMPLETED:
1701  error = CS_ERR_EXIST;
1702  break;
1703  }
1704 
1705 response_send:
1706  res_lib_cpg_join.header.size = sizeof(res_lib_cpg_join);
1708  res_lib_cpg_join.header.error = error;
1709  api->ipc_response_send (conn, &res_lib_cpg_join, sizeof(res_lib_cpg_join));
1710 }
1711 
1712 /* Leave message from the library */
1713 static void message_handler_req_lib_cpg_leave (void *conn, const void *message)
1714 {
1716  cs_error_t error = CS_OK;
1717  struct req_lib_cpg_leave *req_lib_cpg_leave = (struct req_lib_cpg_leave *)message;
1718  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1719 
1720  log_printf(LOGSYS_LEVEL_DEBUG, "got leave request on %p", conn);
1721 
1722  switch (cpd->cpd_state) {
1723  case CPD_STATE_UNJOINED:
1724  error = CS_ERR_NOT_EXIST;
1725  break;
1726  case CPD_STATE_LEAVE_STARTED:
1727  error = CS_ERR_NOT_EXIST;
1728  break;
1729  case CPD_STATE_JOIN_STARTED:
1730  error = CS_ERR_BUSY;
1731  break;
1732  case CPD_STATE_JOIN_COMPLETED:
1733  error = CS_OK;
1734  cpd->cpd_state = CPD_STATE_LEAVE_STARTED;
1735  cpg_node_joinleave_send (req_lib_cpg_leave->pid,
1736  &req_lib_cpg_leave->group_name,
1739  break;
1740  }
1741 
1742  /* send return */
1743  res_lib_cpg_leave.header.size = sizeof(res_lib_cpg_leave);
1745  res_lib_cpg_leave.header.error = error;
1746  api->ipc_response_send (conn, &res_lib_cpg_leave, sizeof(res_lib_cpg_leave));
1747 }
1748 
1749 /* Finalize message from library */
1750 static void message_handler_req_lib_cpg_finalize (
1751  void *conn,
1752  const void *message)
1753 {
1754  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1756  cs_error_t error = CS_OK;
1757 
1758  log_printf (LOGSYS_LEVEL_DEBUG, "cpg finalize for conn=%p", conn);
1759 
1760  /*
1761  * We will just remove cpd from list. After this call, connection will be
1762  * closed on lib side, and cpg_lib_exit_fn will be called
1763  */
1764  list_del (&cpd->list);
1765  list_init (&cpd->list);
1766 
1767  res_lib_cpg_finalize.header.size = sizeof (res_lib_cpg_finalize);
1769  res_lib_cpg_finalize.header.error = error;
1770 
1771  api->ipc_response_send (conn, &res_lib_cpg_finalize,
1772  sizeof (res_lib_cpg_finalize));
1773 }
1774 
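/*
 * Map a shared file used for zero-copy buffers: open it, unlink the name so
 * it is cleaned up automatically, size it with ftruncate and mmap it shared.
 */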
1775 static int
1776 memory_map (
1777  const char *path,
1778  size_t bytes,
1779  void **buf)
1780 {
1781  int32_t fd;
1782  void *addr;
1783  int32_t res;
1784 
1785  fd = open (path, O_RDWR, 0600);
1786 
1787  unlink (path);
1788 
1789  if (fd == -1) {
1790  return (-1);
1791  }
1792 
1793  res = ftruncate (fd, bytes);
1794  if (res == -1) {
1795  goto error_close_unlink;
1796  }
1797 
1798  addr = mmap (NULL, bytes, PROT_READ | PROT_WRITE,
1799  MAP_SHARED, fd, 0);
1800 
1801  if (addr == MAP_FAILED) {
1802  goto error_close_unlink;
1803  }
1804 #ifdef MADV_NOSYNC
1805  madvise(addr, bytes, MADV_NOSYNC);
1806 #endif
1807 
1808  res = close (fd);
1809  if (res) {
1810  munmap (addr, bytes);
1811  return (-1);
1812  }
1813  *buf = addr;
1814  return (0);
1815 
1816 error_close_unlink:
1817  close (fd);
1818  unlink(path);
1819  return -1;
1820 }
1821 
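/*
 * Zero-copy buffer bookkeeping: each mapping requested by a client is
 * recorded on the connection's zcb_mapped_list_head so it can be unmapped on
 * an explicit free or when the connection exits.
 */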
1822 static inline int zcb_alloc (
1823  struct cpg_pd *cpd,
1824  const char *path_to_file,
1825  size_t size,
1826  void **addr)
1827 {
1828  struct zcb_mapped *zcb_mapped;
1829  unsigned int res;
1830 
1831  zcb_mapped = malloc (sizeof (struct zcb_mapped));
1832  if (zcb_mapped == NULL) {
1833  return (-1);
1834  }
1835 
1836  res = memory_map (
1837  path_to_file,
1838  size,
1839  addr);
1840  if (res == -1) {
1841  free (zcb_mapped);
1842  return (-1);
1843  }
1844 
1845  list_init (&zcb_mapped->list);
1846  zcb_mapped->addr = *addr;
1847  zcb_mapped->size = size;
1848  list_add_tail (&zcb_mapped->list, &cpd->zcb_mapped_list_head);
1849  return (0);
1850 }
1851 
1852 
1853 static inline int zcb_free (struct zcb_mapped *zcb_mapped)
1854 {
1855  unsigned int res;
1856 
1857  res = munmap (zcb_mapped->addr, zcb_mapped->size);
1858  list_del (&zcb_mapped->list);
1859  free (zcb_mapped);
1860  return (res);
1861 }
1862 
1863 static inline int zcb_by_addr_free (struct cpg_pd *cpd, void *addr)
1864 {
1865  struct list_head *list;
1866  struct zcb_mapped *zcb_mapped;
1867  unsigned int res = 0;
1868 
1869  for (list = cpd->zcb_mapped_list_head.next;
1870  list != &cpd->zcb_mapped_list_head; list = list->next) {
1871 
1872  zcb_mapped = list_entry (list, struct zcb_mapped, list);
1873 
1874  if (zcb_mapped->addr == addr) {
1875  res = zcb_free (zcb_mapped);
1876  break;
1877  }
1878 
1879  }
1880  return (res);
1881 }
1882 
1883 static inline int zcb_all_free (
1884  struct cpg_pd *cpd)
1885 {
1886  struct list_head *list;
1887  struct zcb_mapped *zcb_mapped;
1888 
1889  for (list = cpd->zcb_mapped_list_head.next;
1890  list != &cpd->zcb_mapped_list_head;) {
1891 
1892  zcb_mapped = list_entry (list, struct zcb_mapped, list);
1893 
1894  list = list->next;
1895 
1896  zcb_free (zcb_mapped);
1897  }
1898  return (0);
1899 }
1900 
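/*
 * The zero-copy protocol passes the server-side mapping address to the client
 * as a 64-bit integer; this union converts between that integer and the local
 * pointer.
 */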
1901 union u {
1902  uint64_t server_addr;
1903  void *server_ptr;
1904 };
1905 
1906 static uint64_t void2serveraddr (void *server_ptr)
1907 {
1908  union u u;
1909 
1910  u.server_ptr = server_ptr;
1911  return (u.server_addr);
1912 }
1913 
1914 static void *serveraddr2void (uint64_t server_addr)
1915 {
1916  union u u;
1917 
1919  return (u.server_ptr);
1920 };
1921 
1922 static void message_handler_req_lib_cpg_zc_alloc (
1923  void *conn,
1924  const void *message)
1925 {
1927  struct qb_ipc_response_header res_header;
1928  void *addr = NULL;
1929  struct coroipcs_zc_header *zc_header;
1930  unsigned int res;
1931  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1932 
1933  log_printf(LOGSYS_LEVEL_DEBUG, "path: %s", hdr->path_to_file);
1934 
1935  res = zcb_alloc (cpd, hdr->path_to_file, hdr->map_size,
1936  &addr);
1937  assert(res == 0);
1938 
1939  zc_header = (struct coroipcs_zc_header *)addr;
1940  zc_header->server_address = void2serveraddr(addr);
1941 
1942  res_header.size = sizeof (struct qb_ipc_response_header);
1943  res_header.id = 0;
1944  api->ipc_response_send (conn,
1945  &res_header,
1946  res_header.size);
1947 }
1948 
1949 static void message_handler_req_lib_cpg_zc_free (
1950  void *conn,
1951  const void *message)
1952 {
1953  mar_req_coroipcc_zc_free_t *hdr = (mar_req_coroipcc_zc_free_t *)message;
1954  struct qb_ipc_response_header res_header;
1955  void *addr = NULL;
1956  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1957 
1958  log_printf(LOGSYS_LEVEL_DEBUG, " free'ing");
1959 
1960  addr = serveraddr2void (hdr->server_address);
1961 
1962  zcb_by_addr_free (cpd, addr);
1963 
1964  res_header.size = sizeof (struct qb_ipc_response_header);
1965  res_header.id = 0;
1966  api->ipc_response_send (
1967  conn, &res_header,
1968  res_header.size);
1969 }
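
The two handlers above serve the zero-copy buffer lifecycle driven from libcpg. A hedged client-side sketch, assuming the cpg_zcb_* calls declared in corosync/cpg.h (error handling elided):

/* Client-side sketch (assumes the corosync/cpg.h zero-copy API).
 * cpg_zcb_alloc() ends up in ..._zc_alloc above and cpg_zcb_free()
 * in ..._zc_free. */
#include <string.h>
#include <corosync/cpg.h>

static void zc_buffer_example (cpg_handle_t handle)
{
	void *buf = NULL;

	if (cpg_zcb_alloc (handle, 4096, &buf) != CS_OK) {
		return;
	}
	memcpy (buf, "payload", 8);
	cpg_zcb_mcast_joined (handle, CPG_TYPE_AGREED, buf, 8);
	cpg_zcb_free (handle, buf);
}
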
1970 
1971 /* Fragmented mcast message from the library */
1972 static void message_handler_req_lib_cpg_partial_mcast (void *conn, const void *message)
1973 {
1974  const struct req_lib_cpg_partial_mcast *req_lib_cpg_mcast = message;
1975  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1976  mar_cpg_name_t group_name = cpd->group_name;
1977 
1978  struct iovec req_exec_cpg_iovec[2];
1979  struct req_exec_cpg_partial_mcast req_exec_cpg_mcast;
1980  struct res_lib_cpg_partial_send res_lib_cpg_partial_send;
1981  int msglen = req_lib_cpg_mcast->fraglen;
1982  int result;
1983  cs_error_t error = CS_ERR_NOT_EXIST;
1984 
1985  log_printf(LOGSYS_LEVEL_TRACE, "got fragmented mcast request on %p", conn);
1986  log_printf(LOGSYS_LEVEL_DEBUG, "Sending fragmented message size = %d bytes\n", msglen);
1987 
1988  switch (cpd->cpd_state) {
1989  case CPD_STATE_UNJOINED:
1990  error = CS_ERR_NOT_EXIST;
1991  break;
1992  case CPD_STATE_LEAVE_STARTED:
1993  error = CS_ERR_NOT_EXIST;
1994  break;
1995  case CPD_STATE_JOIN_STARTED:
1996  error = CS_OK;
1997  break;
1998  case CPD_STATE_JOIN_COMPLETED:
1999  error = CS_OK;
2000  break;
2001  }
2002 
2003  res_lib_cpg_partial_send.header.size = sizeof(res_lib_cpg_partial_send);
2004  res_lib_cpg_partial_send.header.id = MESSAGE_RES_CPG_PARTIAL_SEND;
2005 
2006  if (req_lib_cpg_mcast->type == LIBCPG_PARTIAL_FIRST) {
2007  cpd->initial_transition_counter = cpd->transition_counter;
2008  }
2009  if (cpd->transition_counter != cpd->initial_transition_counter) {
2010  error = CS_ERR_INTERRUPT;
2011  }
2012 
2013  if (error == CS_OK) {
2014  req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + msglen;
2015  req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE,
2016  MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST);
2017  req_exec_cpg_mcast.pid = cpd->pid;
2018  req_exec_cpg_mcast.msglen = req_lib_cpg_mcast->msglen;
2019  req_exec_cpg_mcast.type = req_lib_cpg_mcast->type;
2020  req_exec_cpg_mcast.fraglen = req_lib_cpg_mcast->fraglen;
2021  api->ipc_source_set (&req_exec_cpg_mcast.source, conn);
2022  memcpy(&req_exec_cpg_mcast.group_name, &group_name,
2023  sizeof(mar_cpg_name_t));
2024 
2025  req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast;
2026  req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast);
2027  req_exec_cpg_iovec[1].iov_base = (char *)&req_lib_cpg_mcast->message;
2028  req_exec_cpg_iovec[1].iov_len = msglen;
2029 
2030  result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED);
2031  assert(result == 0);
2032  } else {
2033  log_printf(LOGSYS_LEVEL_ERROR, "*** %p can't mcast to group %s state:%d, error:%d",
2034  conn, group_name.value, cpd->cpd_state, error);
2035  }
2036 
2037  res_lib_cpg_partial_send.header.error = error;
2038  api->ipc_response_send (conn, &res_lib_cpg_partial_send,
2039  sizeof (res_lib_cpg_partial_send));
2040 }
2041 
2042 /* Mcast message from the library */
2043 static void message_handler_req_lib_cpg_mcast (void *conn, const void *message)
2044 {
2045  const struct req_lib_cpg_mcast *req_lib_cpg_mcast = message;
2046  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
2047  mar_cpg_name_t group_name = cpd->group_name;
2048 
2049  struct iovec req_exec_cpg_iovec[2];
2050  struct req_exec_cpg_mcast req_exec_cpg_mcast;
2051  int msglen = req_lib_cpg_mcast->msglen;
2052  int result;
2053  cs_error_t error = CS_ERR_NOT_EXIST;
2054 
2055  log_printf(LOGSYS_LEVEL_TRACE, "got mcast request on %p", conn);
2056 
2057  switch (cpd->cpd_state) {
2058  case CPD_STATE_UNJOINED:
2059  error = CS_ERR_NOT_EXIST;
2060  break;
2061  case CPD_STATE_LEAVE_STARTED:
2062  error = CS_ERR_NOT_EXIST;
2063  break;
2064  case CPD_STATE_JOIN_STARTED:
2065  error = CS_OK;
2066  break;
2067  case CPD_STATE_JOIN_COMPLETED:
2068  error = CS_OK;
2069  break;
2070  }
2071 
2072  if (error == CS_OK) {
2073  req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + msglen;
2074  req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE,
2075  MESSAGE_REQ_EXEC_CPG_MCAST);
2076  req_exec_cpg_mcast.pid = cpd->pid;
2077  req_exec_cpg_mcast.msglen = msglen;
2078  api->ipc_source_set (&req_exec_cpg_mcast.source, conn);
2079  memcpy(&req_exec_cpg_mcast.group_name, &group_name,
2080  sizeof(mar_cpg_name_t));
2081 
2082  req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast;
2083  req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast);
2084  req_exec_cpg_iovec[1].iov_base = (char *)&req_lib_cpg_mcast->message;
2085  req_exec_cpg_iovec[1].iov_len = msglen;
2086 
2087  result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED);
2088  assert(result == 0);
2089  } else {
2090  log_printf(LOGSYS_LEVEL_ERROR, "*** %p can't mcast to group %s state:%d, error:%d",
2091  conn, group_name.value, cpd->cpd_state, error);
2092  }
2093 }
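
For comparison with the zero-copy path, a short sketch of the ordinary library-side send that reaches this handler, assuming the standard cpg_mcast_joined() from corosync/cpg.h:

/* Sketch of the regular (copying) send path; the request lands in
 * message_handler_req_lib_cpg_mcast above and is relayed via
 * api->totem_mcast(). */
#include <sys/uio.h>
#include <corosync/cpg.h>

static cs_error_t plain_send_example (cpg_handle_t handle,
	const void *msg, size_t len)
{
	struct iovec iov;

	iov.iov_base = (void *)msg;
	iov.iov_len = len;
	return (cpg_mcast_joined (handle, CPG_TYPE_AGREED, &iov, 1));
}
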
2094 
2095 static void message_handler_req_lib_cpg_zc_execute (
2096  void *conn,
2097  const void *message)
2098 {
2099  mar_req_coroipcc_zc_execute_t *hdr = (mar_req_coroipcc_zc_execute_t *)message;
2100  struct qb_ipc_request_header *header;
2101  struct res_lib_cpg_mcast res_lib_cpg_mcast;
2102  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
2103  struct iovec req_exec_cpg_iovec[2];
2104  struct req_exec_cpg_mcast req_exec_cpg_mcast;
2105  struct req_lib_cpg_mcast *req_lib_cpg_mcast;
2106  int result;
2107  cs_error_t error = CS_ERR_NOT_EXIST;
2108 
2109  log_printf(LOGSYS_LEVEL_TRACE, "got ZC mcast request on %p", conn);
2110 
2111  header = (struct qb_ipc_request_header *)(((char *)serveraddr2void(hdr->server_address) + sizeof (struct coroipcs_zc_header)));
2112  req_lib_cpg_mcast = (struct req_lib_cpg_mcast *)header;
2113 
2114  switch (cpd->cpd_state) {
2115  case CPD_STATE_UNJOINED:
2116  error = CS_ERR_NOT_EXIST;
2117  break;
2118  case CPD_STATE_LEAVE_STARTED:
2119  error = CS_ERR_NOT_EXIST;
2120  break;
2121  case CPD_STATE_JOIN_STARTED:
2122  error = CS_OK;
2123  break;
2124  case CPD_STATE_JOIN_COMPLETED:
2125  error = CS_OK;
2126  break;
2127  }
2128 
2129  res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast);
2130  res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_MCAST;
2131  if (error == CS_OK) {
2132  req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + req_lib_cpg_mcast->msglen;
2133  req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE,
2134  MESSAGE_REQ_EXEC_CPG_MCAST);
2135  req_exec_cpg_mcast.pid = cpd->pid;
2136  req_exec_cpg_mcast.msglen = req_lib_cpg_mcast->msglen;
2137  api->ipc_source_set (&req_exec_cpg_mcast.source, conn);
2138  memcpy(&req_exec_cpg_mcast.group_name, &cpd->group_name,
2139  sizeof(mar_cpg_name_t));
2140 
2141  req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast;
2142  req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast);
2143  req_exec_cpg_iovec[1].iov_base = (char *)header + sizeof(struct req_lib_cpg_mcast);
2144  req_exec_cpg_iovec[1].iov_len = req_exec_cpg_mcast.msglen;
2145 
2146  result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED);
2147  if (result == 0) {
2148  res_lib_cpg_mcast.header.error = CS_OK;
2149  } else {
2150  res_lib_cpg_mcast.header.error = CS_ERR_TRY_AGAIN;
2151  }
2152  } else {
2153  res_lib_cpg_mcast.header.error = error;
2154  }
2155 
2156  api->ipc_response_send (conn, &res_lib_cpg_mcast,
2157  sizeof (res_lib_cpg_mcast));
2158 
2159 }
2160 
2161 static void message_handler_req_lib_cpg_membership (void *conn,
2162  const void *message)
2163 {
2164  struct req_lib_cpg_membership_get *req_lib_cpg_membership_get =
2165  (struct req_lib_cpg_membership_get *)message;
2166  struct res_lib_cpg_membership_get res_lib_cpg_membership_get;
2167  struct list_head *iter;
2168  int member_count = 0;
2169 
2170  res_lib_cpg_membership_get.header.id = MESSAGE_RES_CPG_MEMBERSHIP;
2171  res_lib_cpg_membership_get.header.error = CS_OK;
2172  res_lib_cpg_membership_get.header.size =
2173  sizeof (struct res_lib_cpg_membership_get);
2174 
2175  for (iter = process_info_list_head.next;
2176  iter != &process_info_list_head; iter = iter->next) {
2177 
2178  struct process_info *pi = list_entry (iter, struct process_info, list);
2179  if (mar_name_compare (&pi->group, &req_lib_cpg_membership_get->group_name) == 0) {
2180  res_lib_cpg_membership_get.member_list[member_count].nodeid = pi->nodeid;
2181  res_lib_cpg_membership_get.member_list[member_count].pid = pi->pid;
2182  member_count += 1;
2183  }
2184  }
2185  res_lib_cpg_membership_get.member_count = member_count;
2186 
2187  api->ipc_response_send (conn, &res_lib_cpg_membership_get,
2188  sizeof (res_lib_cpg_membership_get));
2189 }
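
The handler above walks process_info_list_head and copies one member entry per process in the requested group. Its library-side counterpart, as a hedged sketch using cpg_membership_get() from corosync/cpg.h with a CPG_MEMBERS_MAX-sized buffer:

/* Sketch of the matching library call; member_list is sized for the
 * largest reply the handler can produce. */
#include <corosync/cpg.h>

static int member_count_example (cpg_handle_t handle, struct cpg_name *group)
{
	struct cpg_address members[CPG_MEMBERS_MAX];
	int entries = CPG_MEMBERS_MAX;

	if (cpg_membership_get (handle, group, members, &entries) != CS_OK) {
		return (-1);
	}
	return (entries);
}
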
2190 
2191 static void message_handler_req_lib_cpg_local_get (void *conn,
2192  const void *message)
2193 {
2194  struct res_lib_cpg_local_get res_lib_cpg_local_get;
2195 
2196  res_lib_cpg_local_get.header.size = sizeof (res_lib_cpg_local_get);
2197  res_lib_cpg_local_get.header.id = MESSAGE_RES_CPG_LOCAL_GET;
2198  res_lib_cpg_local_get.header.error = CS_OK;
2199  res_lib_cpg_local_get.local_nodeid = api->totem_nodeid_get ();
2200 
2201  api->ipc_response_send (conn, &res_lib_cpg_local_get,
2202  sizeof (res_lib_cpg_local_get));
2203 }
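
On the library side this corresponds to cpg_local_get(), which simply returns the nodeid obtained from api->totem_nodeid_get() above; a minimal sketch:

/* Minimal sketch of the library-side counterpart (corosync/cpg.h). */
#include <corosync/cpg.h>

static unsigned int local_nodeid_example (cpg_handle_t handle)
{
	unsigned int nodeid = 0;

	(void) cpg_local_get (handle, &nodeid);
	return (nodeid);
}
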
2204 
2205 static void message_handler_req_lib_cpg_iteration_initialize (
2206  void *conn,
2207  const void *message)
2208 {
2209  const struct req_lib_cpg_iterationinitialize *req_lib_cpg_iterationinitialize = message;
2210  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
2211  hdb_handle_t cpg_iteration_handle = 0;
2212  struct res_lib_cpg_iterationinitialize res_lib_cpg_iterationinitialize;
2213  struct list_head *iter, *iter2;
2214  struct cpg_iteration_instance *cpg_iteration_instance;
2215  cs_error_t error = CS_OK;
2216  int res;
2217 
2218  log_printf (LOGSYS_LEVEL_DEBUG, "cpg iteration initialize");
2219 
2220  /* Because operations that change the list can happen between this call
2221  * and the subsequent *_next calls, we must make a full copy.
2222  */
2223 
2224  /*
2225  * Create new iteration instance
2226  */
2227  res = hdb_handle_create (&cpg_iteration_handle_t_db, sizeof (struct cpg_iteration_instance),
2228  &cpg_iteration_handle);
2229 
2230  if (res != 0) {
2231  error = CS_ERR_NO_MEMORY;
2232  goto response_send;
2233  }
2234 
2235  res = hdb_handle_get (&cpg_iteration_handle_t_db, cpg_iteration_handle, (void *)&cpg_iteration_instance);
2236 
2237  if (res != 0) {
2238  error = CS_ERR_BAD_HANDLE;
2239  goto error_destroy;
2240  }
2241 
2242  list_init (&cpg_iteration_instance->items_list_head);
2243  cpg_iteration_instance->handle = cpg_iteration_handle;
2244 
2245  /*
2246  * Create copy of process_info list "grouped by" group name
2247  */
2248  for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
2249  struct process_info *pi = list_entry (iter, struct process_info, list);
2250  struct process_info *new_pi;
2251 
2252  if (req_lib_cpg_iterationinitialize->iteration_type == CPG_ITERATION_NAME_ONLY) {
2253  /*
2254  * Try to find the currently processed group name in our new list
2255  */
2256  int found = 0;
2257 
2258  for (iter2 = cpg_iteration_instance->items_list_head.next;
2259  iter2 != &cpg_iteration_instance->items_list_head;
2260  iter2 = iter2->next) {
2261  struct process_info *pi2 = list_entry (iter2, struct process_info, list);
2262 
2263  if (mar_name_compare (&pi2->group, &pi->group) == 0) {
2264  found = 1;
2265  break;
2266  }
2267  }
2268 
2269  if (found) {
2270  /*
2271  * We already have this name in the list -> don't add
2272  */
2273  continue ;
2274  }
2275  } else if (req_lib_cpg_iterationinitialize->iteration_type == CPG_ITERATION_ONE_GROUP) {
2276  /*
2277  * Test pi group name with request
2278  */
2279  if (mar_name_compare (&pi->group, &req_lib_cpg_iterationinitialize->group_name) != 0)
2280  /*
2281  * Not same -> don't add
2282  */
2283  continue ;
2284  }
2285 
2286  new_pi = malloc (sizeof (struct process_info));
2287  if (!new_pi) {
2288  log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate process_info struct");
2289 
2290  error = CS_ERR_NO_MEMORY;
2291 
2292  goto error_put_destroy;
2293  }
2294 
2295  memcpy (new_pi, pi, sizeof (struct process_info));
2296  list_init (&new_pi->list);
2297 
2298  if (req_lib_cpg_iterationinitialize->iteration_type == CPG_ITERATION_NAME_ONLY) {
2299  /*
2300  * pid and nodeid -> undefined
2301  */
2302  new_pi->pid = new_pi->nodeid = 0;
2303  }
2304 
2305  /*
2306  * We will return the list "grouped by" group name, so find the right place to add
2307  */
2308  for (iter2 = cpg_iteration_instance->items_list_head.next;
2309  iter2 != &cpg_iteration_instance->items_list_head;
2310  iter2 = iter2->next) {
2311  struct process_info *pi2 = list_entry (iter2, struct process_info, list);
2312 
2313  if (mar_name_compare (&pi2->group, &pi->group) == 0) {
2314  break;
2315  }
2316  }
2317 
2318  list_add (&new_pi->list, iter2);
2319  }
2320 
2321  /*
2322  * Now we have a full "grouped by" copy of process_info list
2323  */
2324 
2325  /*
2326  * Add instance to current cpd list
2327  */
2328  list_init (&cpg_iteration_instance->list);
2329  list_add (&cpg_iteration_instance->list, &cpd->iteration_instance_list_head);
2330 
2331  cpg_iteration_instance->current_pointer = &cpg_iteration_instance->items_list_head;
2332 
2333 error_put_destroy:
2334  hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_handle);
2335 error_destroy:
2336  if (error != CS_OK) {
2337  hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_handle);
2338  }
2339 
2340 response_send:
2341  res_lib_cpg_iterationinitialize.header.size = sizeof (res_lib_cpg_iterationinitialize);
2342  res_lib_cpg_iterationinitialize.header.id = MESSAGE_RES_CPG_ITERATIONINITIALIZE;
2343  res_lib_cpg_iterationinitialize.header.error = error;
2344  res_lib_cpg_iterationinitialize.iteration_handle = cpg_iteration_handle;
2345 
2346  api->ipc_response_send (conn, &res_lib_cpg_iterationinitialize,
2347  sizeof (res_lib_cpg_iterationinitialize));
2348 }
2349 
2350 static void message_handler_req_lib_cpg_iteration_next (
2351  void *conn,
2352  const void *message)
2353 {
2354  const struct req_lib_cpg_iterationnext *req_lib_cpg_iterationnext = message;
2355  struct res_lib_cpg_iterationnext res_lib_cpg_iterationnext;
2356  struct cpg_iteration_instance *cpg_iteration_instance;
2357  cs_error_t error = CS_OK;
2358  int res;
2359  struct process_info *pi;
2360 
2361  log_printf (LOGSYS_LEVEL_DEBUG, "cpg iteration next");
2362 
2363  res = hdb_handle_get (&cpg_iteration_handle_t_db,
2364  req_lib_cpg_iterationnext->iteration_handle,
2365  (void *)&cpg_iteration_instance);
2366 
2367  if (res != 0) {
2368  error = CS_ERR_LIBRARY;
2369  goto error_exit;
2370  }
2371 
2372  assert (cpg_iteration_instance);
2373 
2374  cpg_iteration_instance->current_pointer = cpg_iteration_instance->current_pointer->next;
2375 
2376  if (cpg_iteration_instance->current_pointer == &cpg_iteration_instance->items_list_head) {
2377  error = CS_ERR_NO_SECTIONS;
2378  goto error_put;
2379  }
2380 
2381  pi = list_entry (cpg_iteration_instance->current_pointer, struct process_info, list);
2382 
2383  /*
2384  * Copy iteration data
2385  */
2386  res_lib_cpg_iterationnext.description.nodeid = pi->nodeid;
2387  res_lib_cpg_iterationnext.description.pid = pi->pid;
2388  memcpy (&res_lib_cpg_iterationnext.description.group,
2389  &pi->group,
2390  sizeof (mar_cpg_name_t));
2391 
2392 error_put:
2393  hdb_handle_put (&cpg_iteration_handle_t_db, req_lib_cpg_iterationnext->iteration_handle);
2394 error_exit:
2395  res_lib_cpg_iterationnext.header.size = sizeof (res_lib_cpg_iterationnext);
2396  res_lib_cpg_iterationnext.header.id = MESSAGE_RES_CPG_ITERATIONNEXT;
2397  res_lib_cpg_iterationnext.header.error = error;
2398 
2399  api->ipc_response_send (conn, &res_lib_cpg_iterationnext,
2400  sizeof (res_lib_cpg_iterationnext));
2401 }
2402 
2403 static void message_handler_req_lib_cpg_iteration_finalize (
2404  void *conn,
2405  const void *message)
2406 {
2407  const struct req_lib_cpg_iterationfinalize *req_lib_cpg_iterationfinalize = message;
2408  struct res_lib_cpg_iterationfinalize res_lib_cpg_iterationfinalize;
2409  struct cpg_iteration_instance *cpg_iteration_instance;
2410  cs_error_t error = CS_OK;
2411  int res;
2412 
2413  log_printf (LOGSYS_LEVEL_DEBUG, "cpg iteration finalize");
2414 
2415  res = hdb_handle_get (&cpg_iteration_handle_t_db,
2416  req_lib_cpg_iterationfinalize->iteration_handle,
2417  (void *)&cpg_iteration_instance);
2418 
2419  if (res != 0) {
2420  error = CS_ERR_LIBRARY;
2421  goto error_exit;
2422  }
2423 
2424  assert (cpg_iteration_instance);
2425 
2426  cpg_iteration_instance_finalize (cpg_iteration_instance);
2427  hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_instance->handle);
2428 
2429 error_exit:
2430  res_lib_cpg_iterationfinalize.header.size = sizeof (res_lib_cpg_iterationfinalize);
2431  res_lib_cpg_iterationfinalize.header.id = MESSAGE_RES_CPG_ITERATIONFINALIZE;
2432  res_lib_cpg_iterationfinalize.header.error = error;
2433 
2434  api->ipc_response_send (conn, &res_lib_cpg_iterationfinalize,
2435  sizeof (res_lib_cpg_iterationfinalize));
2436 }
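
Taken together, the three iteration handlers implement the snapshot-style group walk exposed by libcpg: initialize copies the process_info list, next returns one entry at a time until CS_ERR_NO_SECTIONS, and finalize releases the copy. A hedged client-side sketch using the cpg_iteration_* calls from corosync/cpg.h:

/* Client-side sketch of the iteration API served by the handlers above
 * (assumes corosync/cpg.h; error handling kept minimal). */
#include <stdio.h>
#include <corosync/cpg.h>

static void dump_groups_example (cpg_handle_t handle)
{
	cpg_iteration_handle_t iter_handle;
	struct cpg_iteration_description_t desc;

	if (cpg_iteration_initialize (handle, CPG_ITERATION_ALL,
	    NULL, &iter_handle) != CS_OK) {
		return;
	}
	/* cpg_iteration_next() returns CS_ERR_NO_SECTIONS once the copied
	 * list built by ..._iteration_initialize is exhausted. */
	while (cpg_iteration_next (iter_handle, &desc) == CS_OK) {
		printf ("group %.*s nodeid %u pid %u\n",
			(int)desc.group.length, desc.group.value,
			desc.nodeid, desc.pid);
	}
	cpg_iteration_finalize (iter_handle);
}
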