36 #include <sys/types.h>
37 #include <sys/socket.h>
39 #include <sys/ioctl.h>
40 #include <netinet/in.h>
48 #include <arpa/inet.h>
55 #include <qb/qbipc_common.h>
63 #define MESSAGE_REQ_SYNC_BARRIER 0
64 #define MESSAGE_REQ_SYNC_SERVICE_BUILD 1
80 const unsigned int *trans_list,
81 size_t trans_list_entries,
82 const unsigned int *member_list,
83 size_t member_list_entries,
98 struct qb_ipc_request_header header __attribute__((aligned(8)));
105 struct qb_ipc_request_header header __attribute__((aligned(8)));
106 struct memb_ring_id ring_id __attribute__((aligned(8)));
113 static int my_processing_idx = 0;
123 static size_t my_member_list_entries = 0;
125 static size_t my_trans_list_entries = 0;
127 static int my_processor_list_entries = 0;
131 static int my_service_list_entries = 0;
133 static void (*sync_synchronization_completed) (void);
135 static void sync_deliver_fn (
138 unsigned int msg_len,
139 int endian_conversion_required);
141 static int schedwrk_processor (
const void *context);
143 static void sync_process_enter (
void);
145 static void sync_process_call_init (
void);
152 static void *sync_group_handle;
159 int (*sync_callbacks_retrieve) (
162 void (*synchronization_completed) (
void))
172 "Couldn't initialize groups interface.");
185 sync_synchronization_completed = synchronization_completed;
191 static void sync_barrier_handler (
unsigned int nodeid,
const void *msg)
195 int barrier_reached = 1;
197 if (memcmp (&my_ring_id, &req_exec_barrier_message->ring_id,
203 for (i = 0; i < my_processor_list_entries; i++) {
204 if (my_processor_list[i].nodeid == nodeid) {
208 for (i = 0; i < my_processor_list_entries; i++) {
209 if (my_processor_list[i].received == 0) {
213 if (barrier_reached) {
215 my_service_list[my_processing_idx].name);
222 my_processing_idx += 1;
223 if (my_service_list_entries == my_processing_idx) {
224 sync_synchronization_completed ();
226 sync_process_enter ();
231 static void dummy_sync_abort (
void)
235 static int dummy_sync_process (
void)
240 static void dummy_sync_activate (
void)
244 static int service_entry_compare (
const void *a,
const void *b)
252 static void sync_service_build_handler (
unsigned int nodeid,
const void *msg)
256 int barrier_reached = 1;
258 int qsort_trigger = 0;
260 if (memcmp (&my_ring_id, &req_exec_service_build_message->ring_id,
265 for (i = 0; i < req_exec_service_build_message->service_list_entries; i++) {
268 for (j = 0; j < my_service_list_entries; j++) {
269 if (req_exec_service_build_message->service_list[i] ==
270 my_service_list[j].service_id) {
276 my_service_list[my_service_list_entries].
state =
PROCESS;
277 my_service_list[my_service_list_entries].
service_id =
278 req_exec_service_build_message->service_list[i];
279 sprintf (my_service_list[my_service_list_entries].name,
280 "Unknown External Service (id = %d)\n",
281 req_exec_service_build_message->service_list[i]);
282 my_service_list[my_service_list_entries].
sync_init =
284 my_service_list[my_service_list_entries].
sync_abort =
290 my_service_list_entries += 1;
296 qsort (my_service_list, my_service_list_entries,
299 for (i = 0; i < my_processor_list_entries; i++) {
300 if (my_processor_list[i].nodeid == nodeid) {
304 for (i = 0; i < my_processor_list_entries; i++) {
305 if (my_processor_list[i].received == 0) {
309 if (barrier_reached) {
311 sync_process_enter ();
315 static void sync_deliver_fn (
318 unsigned int msg_len,
319 int endian_conversion_required)
321 struct qb_ipc_request_header *
header = (
struct qb_ipc_request_header *)msg;
323 switch (header->id) {
325 sync_barrier_handler (nodeid, msg);
328 sync_service_build_handler (nodeid, msg);
333 static void barrier_message_transmit (
void)
336 struct req_exec_barrier_message req_exec_barrier_message;
338 req_exec_barrier_message.header.size =
sizeof (
struct req_exec_barrier_message);
341 memcpy (&req_exec_barrier_message.ring_id, &my_ring_id,
344 iovec.iov_base = (
char *)&req_exec_barrier_message;
345 iovec.iov_len =
sizeof (req_exec_barrier_message);
351 static void service_build_message_transmit (
struct req_exec_service_build_message *service_build_message)
355 service_build_message->header.size =
sizeof (
struct req_exec_service_build_message);
358 memcpy (&service_build_message->ring_id, &my_ring_id,
361 iovec.iov_base = (
void *)service_build_message;
362 iovec.iov_len =
sizeof (
struct req_exec_service_build_message);
368 static void sync_barrier_enter (
void)
371 barrier_message_transmit ();
374 static void sync_process_call_init (
void)
377 size_t old_trans_list_entries = 0;
381 memcpy (old_trans_list, my_trans_list, my_trans_list_entries *
382 sizeof (
unsigned int));
383 old_trans_list_entries = my_trans_list_entries;
385 my_trans_list_entries = 0;
386 for (o = 0; o < old_trans_list_entries; o++) {
387 for (m = 0; m < my_member_list_entries; m++) {
388 if (old_trans_list[o] == my_member_list[m]) {
389 my_trans_list[my_trans_list_entries] = my_member_list[m];
390 my_trans_list_entries++;
396 for (i = 0; i < my_service_list_entries; i++) {
398 my_service_list[i].
sync_init (my_trans_list,
399 my_trans_list_entries, my_member_list,
400 my_member_list_entries,
406 static void sync_process_enter (
void)
415 if (my_service_list_entries == 0) {
417 sync_synchronization_completed ();
420 for (i = 0; i < my_processor_list_entries; i++) {
429 static void sync_servicelist_build_enter (
430 const unsigned int *member_list,
431 size_t member_list_entries,
434 struct req_exec_service_build_message service_build;
440 for (i = 0; i < member_list_entries; i++) {
441 my_processor_list[i].
nodeid = member_list[i];
444 my_processor_list_entries = member_list_entries;
446 memcpy (my_member_list, member_list,
447 member_list_entries *
sizeof (
unsigned int));
448 my_member_list_entries = member_list_entries;
450 my_processing_idx = 0;
453 my_service_list_entries = 0;
463 my_service_list[my_service_list_entries].
state =
PROCESS;
464 my_service_list[my_service_list_entries].
service_id = i;
465 strcpy (my_service_list[my_service_list_entries].
name,
471 my_service_list_entries += 1;
474 for (i = 0; i < my_service_list_entries; i++) {
475 service_build.service_list[i] =
478 service_build.service_list_entries = my_service_list_entries;
480 service_build_message_transmit (&service_build);
483 sync_process_call_init ();
486 static int schedwrk_processor (
const void *context)
490 if (my_service_list[my_processing_idx].state ==
PROCESS) {
492 res = my_service_list[my_processing_idx].
sync_process ();
497 sync_barrier_enter();
506 const unsigned int *member_list,
507 size_t member_list_entries,
511 memcpy (&my_ring_id, ring_id,
sizeof (
struct memb_ring_id));
513 sync_servicelist_build_enter (member_list, member_list_entries,
518 const unsigned int *member_list,
519 size_t member_list_entries,
523 memcpy (my_trans_list, member_list, member_list_entries *
524 sizeof (
unsigned int));
525 my_trans_list_entries = member_list_entries;
534 my_service_list[my_processing_idx].
sync_abort ();
void sync_start(const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id)
Totem Single Ring Protocol.
void(* sync_init)(const unsigned int *trans_list, size_t trans_list_entries, const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id)
int sync_init(int(*sync_callbacks_retrieve)(int service_id, struct sync_callbacks *callbacks), void(*synchronization_completed)(void))
void(* sync_activate)(void)
struct message_header header
int totempg_groups_initialize(void **instance, void(*deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void(*confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id))
Initialize a groups instance.
#define log_printf(level, format, args...)
void(* sync_activate)(void)
void schedwrk_destroy(hdb_handle_t handle)
void(* sync_init)(const unsigned int *trans_list, size_t trans_list_entries, const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id)
#define LOGSYS_LEVEL_ERROR
void sync_save_transitional(const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id)
int totempg_groups_mcast_joined(void *instance, const struct iovec *iovec, unsigned int iov_len, int guarantee)
#define LOGSYS_LEVEL_DEBUG
enum sync_process_state state
#define MESSAGE_REQ_SYNC_SERVICE_BUILD
#define PROCESSOR_COUNT_MAX
#define MESSAGE_REQ_SYNC_BARRIER
#define SERVICES_COUNT_MAX
struct qb_ipc_request_header header __attribute__((aligned(8)))
int(* my_sync_callbacks_retrieve)(int service_id, struct sync_callbacks *callbacks)
int totempg_groups_join(void *instance, const struct totempg_group *groups, size_t group_cnt)
LOGSYS_DECLARE_SUBSYS("SYNC")
struct memb_ring_id ring_id
int(* sync_process)(void)
int schedwrk_create(hdb_handle_t *handle, int(schedwrk_fn)(const void *), const void *context)
int(* sync_process)(void)