corosync 3.1.9
totemsrp.c
Go to the documentation of this file.
1/*
2 * Copyright (c) 2003-2006 MontaVista Software, Inc.
3 * Copyright (c) 2006-2018 Red Hat, Inc.
4 *
5 * All rights reserved.
6 *
7 * Author: Steven Dake (sdake@redhat.com)
8 *
9 * This software licensed under BSD license, the text of which follows:
10 *
11 * Redistribution and use in source and binary forms, with or without
12 * modification, are permitted provided that the following conditions are met:
13 *
14 * - Redistributions of source code must retain the above copyright notice,
15 * this list of conditions and the following disclaimer.
16 * - Redistributions in binary form must reproduce the above copyright notice,
17 * this list of conditions and the following disclaimer in the documentation
18 * and/or other materials provided with the distribution.
19 * - Neither the name of the MontaVista Software, Inc. nor the names of its
20 * contributors may be used to endorse or promote products derived from this
21 * software without specific prior written permission.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
33 * THE POSSIBILITY OF SUCH DAMAGE.
34 */
35
36/*
37 * The first version of this code was based upon Yair Amir's PhD thesis:
38 * https://corosync.github.io/corosync/doc/Yair_phd.ps.gz (ch4,5).
39 *
40 * The current version of totemsrp implements the Totem protocol specified in:
41 * https://corosync.github.io/corosync/doc/tocssrp95.ps.gz
42 *
43 * The deviations from the above published protocols are:
44 * - token hold mode where token doesn't rotate on unused ring - reduces cpu
45 * usage on 1.6ghz xeon from 35% to less than 0.1% as measured by top
46 */
47
48#include <config.h>
49
50#include <assert.h>
51#ifdef HAVE_ALLOCA_H
52#include <alloca.h>
53#endif
54#include <sys/mman.h>
55#include <sys/types.h>
56#include <sys/stat.h>
57#include <sys/socket.h>
58#include <netdb.h>
59#include <sys/un.h>
60#include <sys/ioctl.h>
61#include <sys/param.h>
62#include <netinet/in.h>
63#include <arpa/inet.h>
64#include <unistd.h>
65#include <fcntl.h>
66#include <stdlib.h>
67#include <stdio.h>
68#include <errno.h>
69#include <sched.h>
70#include <time.h>
71#include <sys/time.h>
72#include <sys/poll.h>
73#include <sys/uio.h>
74#include <limits.h>
75
76#include <qb/qblist.h>
77#include <qb/qbdefs.h>
78#include <qb/qbutil.h>
79#include <qb/qbloop.h>
80
81#include <corosync/swab.h>
82#include <corosync/sq.h>
83
84#define LOGSYS_UTILS_ONLY 1
85#include <corosync/logsys.h>
86
87#include "totemsrp.h"
88#include "totemnet.h"
89
90#include "icmap.h"
91#include "totemconfig.h"
92
93#include "cs_queue.h"
94
95#define LOCALHOST_IP inet_addr("127.0.0.1")
96#define QUEUE_RTR_ITEMS_SIZE_MAX 16384 /* allow 16384 retransmit items */
97#define RETRANS_MESSAGE_QUEUE_SIZE_MAX 16384 /* allow 16384 messages to be queued in the retransmit message queue */
98#define RECEIVED_MESSAGE_QUEUE_SIZE_MAX 500 /* allow 500 messages to be queued */
99#define MAXIOVS 5
100#define RETRANSMIT_ENTRIES_MAX 30
101#define TOKEN_SIZE_MAX 64000 /* bytes */
102#define LEAVE_DUMMY_NODEID 0
103
/*
 * SRP address.
 *
 * Identifies a processor by its node id. This is the element type of the
 * proc/failed/consensus lists handled by the set helpers in this file.
 */
struct srp_addr {
	unsigned int nodeid;	/* node id of the processor */
};
110
111/*
112 * Rollover handling:
113 * SEQNO_START_MSG is the starting sequence number after a new configuration
114 * This should remain zero, unless testing overflow in which case
115 * 0x7ffff000 and 0xfffff000 are good starting values.
116 *
117 * SEQNO_START_TOKEN is the starting sequence number after a new configuration
118 * for a token. This should remain zero, unless testing overflow in which
119 * case 0x7fffff00 or 0xffffff00 are good starting values.
120 */
121#define SEQNO_START_MSG 0x0
122#define SEQNO_START_TOKEN 0x0
123
124/*
125 * These can be used to test different rollover points
126 * #define SEQNO_START_MSG 0xfffffe00
127 * #define SEQNO_START_TOKEN 0xfffffe00
128 */
129
130/*
131 * These can be used to test the error recovery algorithms
132 * #define TEST_DROP_ORF_TOKEN_PERCENTAGE 30
133 * #define TEST_DROP_COMMIT_TOKEN_PERCENTAGE 30
134 * #define TEST_DROP_MCAST_PERCENTAGE 50
135 * #define TEST_RECOVERY_MSG_COUNT 300
136 */
137
138/*
139 * we compare incoming messages to determine if their endian is
140 * different - if so convert them
141 *
142 * do not change
143 */
144#define ENDIAN_LOCAL 0xff22
145
147 MESSAGE_TYPE_ORF_TOKEN = 0, /* Ordering, Reliability, Flow (ORF) control Token */
148 MESSAGE_TYPE_MCAST = 1, /* ring ordered multicast message */
149 MESSAGE_TYPE_MEMB_MERGE_DETECT = 2, /* merge rings if there are available rings */
150 MESSAGE_TYPE_MEMB_JOIN = 3, /* membership join message */
151 MESSAGE_TYPE_MEMB_COMMIT_TOKEN = 4, /* membership commit token */
152 MESSAGE_TYPE_TOKEN_HOLD_CANCEL = 5, /* cancel the holding of the token */
153};
154
159
160/*
161 * New membership algorithm local variables
162 */
165 int set;
166};
167
168
176
177
179 int mcast;
180 int token;
181};
182
192
193
194struct rtr_item {
196 unsigned int seq;
198
199
200struct orf_token {
202 unsigned int seq;
203 unsigned int token_seq;
204 unsigned int aru;
205 unsigned int aru_addr;
207 unsigned int backlog;
208 unsigned int fcc;
213
214
215struct memb_join {
218 unsigned int proc_list_entries;
220 unsigned long long ring_seq;
221 unsigned char end_of_memb_join[0];
222/*
223 * These parts of the data structure are dynamic:
224 * struct srp_addr proc_list[];
225 * struct srp_addr failed_list[];
226 */
228
229
235
236
241
242
249
250
253 unsigned int token_seq;
255 unsigned int retrans_flg;
258 unsigned char end_of_commit_token[0];
259/*
260 * These parts of the data structure are dynamic:
261 *
262 * struct srp_addr addr[PROCESSOR_COUNT_MAX];
263 * struct memb_commit_token_memb_entry memb_list[PROCESSOR_COUNT_MAX];
264 */
266
268 struct mcast *mcast;
269 unsigned int msg_len;
271
273 struct mcast *mcast;
274 unsigned int msg_len;
275};
276
283
286
288
289 /*
290 * Flow control mcasts and remcasts on last and current orf_token
291 */
293
295
297
299
301
303
305
307
309
311
313
315
317
319
321
323
325
327
329
331
333
335
337
339
341
343
345
347
348 unsigned int my_last_aru;
349
351
353
355
356 unsigned int my_install_seq;
357
359
361
363
365
367
368 /*
369 * Queues used to order, deliver, and recover messages
370 */
372
374
376
378
380
381 /*
382 * Received up to and including
383 */
384 unsigned int my_aru;
385
386 unsigned int my_high_delivered;
387
389
391
393
395
396 unsigned int my_token_seq;
397
398 /*
399 * Timers
400 */
402
404
406
408
410
412
414
416
418
420
421 /*
422 * Function and data used to log messages
423 */
425
427
429
431
433
435
437
439 int level,
440 int subsys,
441 const char *function,
442 const char *file,
443 int line,
444 const char *format, ...)__attribute__((format(printf, 6, 7)));;
445
447
448//TODO struct srp_addr next_memb;
449
451
453
455 unsigned int nodeid,
456 const void *msg,
457 unsigned int msg_len,
459
462 const unsigned int *member_list, size_t member_list_entries,
463 const unsigned int *left_list, size_t left_list_entries,
464 const unsigned int *joined_list, size_t joined_list_entries,
465 const struct memb_ring_id *ring_id);
466
468
471
474 unsigned int nodeid);
475
477 const struct memb_ring_id *memb_ring_id,
478 unsigned int nodeid);
479
481
483
484 unsigned long long token_ring_id_seq;
485
486 unsigned int last_released;
487
488 unsigned int set_aru;
489
491
493
495
496 unsigned int my_last_seq;
497
499
501
503
504 unsigned int use_heartbeat;
505
506 unsigned int my_trc;
507
508 unsigned int my_pbl;
509
510 unsigned int my_cbl;
511
513
515
517
519
521
523
525
527
531};
532
534 int count;
536 struct totemsrp_instance *instance,
537 const void *msg,
538 size_t msg_len,
540};
541
561
562const char* gather_state_from_desc [] = {
563 [TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT] = "consensus timeout",
565 [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE] = "The token was lost in the OPERATIONAL state.",
566 [TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED] = "The consensus timeout expired.",
567 [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE] = "The token was lost in the COMMIT state.",
568 [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE] = "The token was lost in the RECOVERY state.",
569 [TOTEMSRP_GSFROM_FAILED_TO_RECEIVE] = "failed to receive",
570 [TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE] = "foreign message in operational state",
571 [TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE] = "foreign message in gather state",
572 [TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE] = "merge during operational state",
573 [TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE] = "merge during gather state",
574 [TOTEMSRP_GSFROM_MERGE_DURING_JOIN] = "merge during join",
575 [TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE] = "join during operational state",
576 [TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE] = "join during commit state",
577 [TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY] = "join during recovery",
578 [TOTEMSRP_GSFROM_INTERFACE_CHANGE] = "interface change",
579};
580
581/*
582 * forward decls
583 */
584static int message_handler_orf_token (
585 struct totemsrp_instance *instance,
586 const void *msg,
587 size_t msg_len,
589
590static int message_handler_mcast (
591 struct totemsrp_instance *instance,
592 const void *msg,
593 size_t msg_len,
595
596static int message_handler_memb_merge_detect (
597 struct totemsrp_instance *instance,
598 const void *msg,
599 size_t msg_len,
601
602static int message_handler_memb_join (
603 struct totemsrp_instance *instance,
604 const void *msg,
605 size_t msg_len,
607
608static int message_handler_memb_commit_token (
609 struct totemsrp_instance *instance,
610 const void *msg,
611 size_t msg_len,
613
614static int message_handler_token_hold_cancel (
615 struct totemsrp_instance *instance,
616 const void *msg,
617 size_t msg_len,
619
620static void totemsrp_instance_initialize (struct totemsrp_instance *instance);
621
622static void srp_addr_to_nodeid (
623 struct totemsrp_instance *instance,
624 unsigned int *nodeid_out,
625 struct srp_addr *srp_addr_in,
626 unsigned int entries);
627
628static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b);
629
630static void memb_leave_message_send (struct totemsrp_instance *instance);
631
632static void token_callbacks_execute (struct totemsrp_instance *instance, enum totem_callback_token_type type);
633static void memb_state_gather_enter (struct totemsrp_instance *instance, enum gather_state_from gather_from);
634static void messages_deliver_to_app (struct totemsrp_instance *instance, int skip, unsigned int end_point);
635static int orf_token_mcast (struct totemsrp_instance *instance, struct orf_token *oken,
637static void messages_free (struct totemsrp_instance *instance, unsigned int token_aru);
638
639static void memb_ring_id_set (struct totemsrp_instance *instance,
640 const struct memb_ring_id *ring_id);
641static void target_set_completed (void *context);
642static void memb_state_commit_token_update (struct totemsrp_instance *instance);
643static void memb_state_commit_token_target_set (struct totemsrp_instance *instance);
644static int memb_state_commit_token_send (struct totemsrp_instance *instance);
645static int memb_state_commit_token_send_recovery (struct totemsrp_instance *instance, struct memb_commit_token *memb_commit_token);
646static void memb_state_commit_token_create (struct totemsrp_instance *instance);
647static int token_hold_cancel_send (struct totemsrp_instance *instance);
648static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out);
649static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out);
650static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out);
651static void mcast_endian_convert (const struct mcast *in, struct mcast *out);
652static void memb_merge_detect_endian_convert (
653 const struct memb_merge_detect *in,
654 struct memb_merge_detect *out);
655static struct srp_addr srp_addr_endian_convert (struct srp_addr in);
656static void timer_function_orf_token_timeout (void *data);
657static void timer_function_orf_token_warning (void *data);
658static void timer_function_pause_timeout (void *data);
659static void timer_function_heartbeat_timeout (void *data);
660static void timer_function_token_retransmit_timeout (void *data);
661static void timer_function_token_hold_retransmit_timeout (void *data);
662static void timer_function_merge_detect_timeout (void *data);
663static void *totemsrp_buffer_alloc (struct totemsrp_instance *instance);
664static void totemsrp_buffer_release (struct totemsrp_instance *instance, void *ptr);
665static const char* gsfrom_to_msg(enum gather_state_from gsfrom);
666
667int main_deliver_fn (
668 void *context,
669 const void *msg,
670 unsigned int msg_len,
671 const struct sockaddr_storage *system_from);
672
674 void *context,
675 const struct totem_ip_address *iface_address,
676 unsigned int iface_no);
677
679 6,
680 {
681 message_handler_orf_token, /* MESSAGE_TYPE_ORF_TOKEN */
682 message_handler_mcast, /* MESSAGE_TYPE_MCAST */
683 message_handler_memb_merge_detect, /* MESSAGE_TYPE_MEMB_MERGE_DETECT */
684 message_handler_memb_join, /* MESSAGE_TYPE_MEMB_JOIN */
685 message_handler_memb_commit_token, /* MESSAGE_TYPE_MEMB_COMMIT_TOKEN */
686 message_handler_token_hold_cancel /* MESSAGE_TYPE_TOKEN_HOLD_CANCEL */
687 }
688};
689
/*
 * Log a formatted message through the instance's logging callback.
 *
 * The macro expects a variable named `instance` to be in scope at the call
 * site; its totemsrp_log_printf callback and totemsrp_subsys_id are used.
 * The current function, file and line are passed along automatically.
 *
 * The expansion deliberately has NO trailing semicolon: the caller's own
 * `;` terminates the do/while (0), so the macro behaves as exactly one
 * statement and can be used safely in an unbraced if/else.
 */
#define log_printf(level, format, args...)			\
do {								\
	instance->totemsrp_log_printf (				\
		level, instance->totemsrp_subsys_id,		\
		__FUNCTION__, __FILE__, __LINE__,		\
		format, ##args);				\
} while (0)
/*
 * Log a formatted message followed by the textual description and numeric
 * value of err_num (": <description> (<err_num>)\n" is appended to fmt).
 *
 * Like log_printf, this expects a variable named `instance` to be in scope
 * at the call site. The description is obtained with qb_strerror_r into a
 * buffer local to the expansion.
 */
#define LOGSYS_PERROR(err_num, level, fmt, args...)					\
do {											\
	char _perror_buf[LOGSYS_MAX_PERROR_MSG_LEN];					\
	const char *_perror_msg =							\
		qb_strerror_r(err_num, _perror_buf, sizeof(_perror_buf));		\
											\
	instance->totemsrp_log_printf (							\
		level, instance->totemsrp_subsys_id,					\
		__FUNCTION__, __FILE__, __LINE__,					\
		fmt ": %s (%d)\n", ##args, _perror_msg, err_num);			\
} while(0)
706
707static const char* gsfrom_to_msg(enum gather_state_from gsfrom)
708{
711 }
712 else {
713 return "UNKNOWN";
714 }
715}
716
717static void totemsrp_instance_initialize (struct totemsrp_instance *instance)
718{
719 memset (instance, 0, sizeof (struct totemsrp_instance));
720
722
724
725 instance->my_received_flg = 1;
726
727 instance->my_token_seq = SEQNO_START_TOKEN - 1;
728
730
731 instance->set_aru = -1;
732
733 instance->my_aru = SEQNO_START_MSG;
734
736
738
739 instance->orf_token_discard = 0;
740
741 instance->originated_orf_token = 0;
742
743 instance->commit_token = (struct memb_commit_token *)instance->commit_token_storage;
744
745 instance->waiting_trans_ack = 1;
746}
747
748static int pause_flush (struct totemsrp_instance *instance)
749{
752 int res = 0;
753
756
757 if ((now_msec - timestamp_msec) > (instance->totem_config->token_timeout / 2)) {
759 "Process pause detected for %d ms, flushing membership messages.", (unsigned int)(now_msec - timestamp_msec));
760 /*
761 * -1 indicates an error from recvmsg
762 */
763 do {
765 } while (res == -1);
766 }
767 return (res);
768}
769
770static int token_event_stats_collector (enum totem_callback_token_type type, const void *void_instance)
771{
772 struct totemsrp_instance *instance = (struct totemsrp_instance *)void_instance;
774
776
778 /* incr latest token the index */
779 if (instance->stats.latest_token == (TOTEM_TOKEN_STATS_MAX - 1))
780 instance->stats.latest_token = 0;
781 else
782 instance->stats.latest_token++;
783
784 if (instance->stats.earliest_token == instance->stats.latest_token) {
785 /* we have filled up the array, start overwriting */
786 if (instance->stats.earliest_token == (TOTEM_TOKEN_STATS_MAX - 1))
787 instance->stats.earliest_token = 0;
788 else
789 instance->stats.earliest_token++;
790
791 instance->stats.token[instance->stats.earliest_token].rx = 0;
792 instance->stats.token[instance->stats.earliest_token].tx = 0;
793 instance->stats.token[instance->stats.earliest_token].backlog_calc = 0;
794 }
795
796 instance->stats.token[instance->stats.latest_token].rx = time_now;
797 instance->stats.token[instance->stats.latest_token].tx = 0; /* in case we drop the token */
798 } else {
799 instance->stats.token[instance->stats.latest_token].tx = time_now;
800 }
801 return 0;
802}
803
804static void totempg_mtu_changed(void *context, int net_mtu)
805{
806 struct totemsrp_instance *instance = context;
807
808 instance->totem_config->net_mtu = net_mtu - 2 * sizeof (struct mcast);
809
811 "Net MTU changed to %d, new value is %d",
812 net_mtu, instance->totem_config->net_mtu);
813}
814
815/*
816 * Exported interfaces
817 */
819 qb_loop_t *poll_handle,
820 void **srp_context,
822 totempg_stats_t *stats,
823
824 void (*deliver_fn) (
825 unsigned int nodeid,
826 const void *msg,
827 unsigned int msg_len,
829
830 void (*confchg_fn) (
832 const unsigned int *member_list, size_t member_list_entries,
833 const unsigned int *left_list, size_t left_list_entries,
834 const unsigned int *joined_list, size_t joined_list_entries,
835 const struct memb_ring_id *ring_id),
837 int waiting_trans_ack))
838{
839 struct totemsrp_instance *instance;
840 int res;
841
842 instance = malloc (sizeof (struct totemsrp_instance));
843 if (instance == NULL) {
844 goto error_exit;
845 }
846
847 totemsrp_instance_initialize (instance);
848
851
852 stats->srp = &instance->stats;
853 instance->stats.latest_token = 0;
854 instance->stats.earliest_token = 0;
855
856 instance->totem_config = totem_config;
857
858 /*
859 * Configure logging
860 */
869
870 /*
871 * Configure totem store and load functions
872 */
875
876 /*
877 * Initialize local variables for totemsrp
878 */
880
881 /*
882 * Display totem configuration
883 */
885 "Token Timeout (%d ms) retransmit timeout (%d ms)",
890 "Token warning every %d ms (%d%% of Token Timeout)",
892 if (token_warning_ms < totem_config->token_retransmit_timeout)
894 "The token warning interval (%d ms) is less than the token retransmit timeout (%d ms) "
895 "which can lead to spurious token warnings. Consider increasing the token_warning parameter.",
897 } else {
899 "Token warnings disabled");
900 }
902 "token hold (%d ms) retransmits before loss (%d retrans)",
905 "join (%d ms) send_join (%d ms) consensus (%d ms) merge (%d ms)",
909
912 "downcheck (%d ms) fail to recv const (%d msgs)",
915 "seqno unchanged const (%d rotations) Maximum network MTU %d", totem_config->seqno_unchanged_const, totem_config->net_mtu);
916
918 "window size per rotation (%d messages) maximum messages per rotation (%d messages)",
920
922 "missed count const (%d messages)",
924
926 "send threads (%d threads)", totem_config->threads);
927
929 "heartbeat_failures_allowed (%d)", totem_config->heartbeat_failures_allowed);
931 "max_network_delay (%d ms)", totem_config->max_network_delay);
932
933
934 cs_queue_init (&instance->retrans_message_queue, RETRANS_MESSAGE_QUEUE_SIZE_MAX,
935 sizeof (struct message_item), instance->threaded_mode_enabled);
936
937 sq_init (&instance->regular_sort_queue,
939
940 sq_init (&instance->recovery_sort_queue,
942
943 instance->totemsrp_poll_handle = poll_handle;
944
945 instance->totemsrp_deliver_fn = deliver_fn;
946
947 instance->totemsrp_confchg_fn = confchg_fn;
948 instance->use_heartbeat = 1;
949
950 timer_function_pause_timeout (instance);
951
954 "HeartBeat is Disabled. To enable set heartbeat_failures_allowed > 0");
955 instance->use_heartbeat = 0;
956 }
957
958 if (instance->use_heartbeat) {
959 instance->heartbeat_timeout
962
963 if (instance->heartbeat_timeout >= totem_config->token_timeout) {
965 "total heartbeat_timeout (%d ms) is not less than token timeout (%d ms)",
966 instance->heartbeat_timeout,
969 "heartbeat_timeout = heartbeat_failures_allowed * token_retransmit_timeout + max_network_delay");
971 "heartbeat timeout should be less than the token timeout. Heartbeat is disabled!!");
972 instance->use_heartbeat = 0;
973 }
974 else {
976 "total heartbeat_timeout (%d ms)", instance->heartbeat_timeout);
977 }
978 }
979
981 poll_handle,
982 &instance->totemnet_context,
984 stats->srp,
985 instance,
988 totempg_mtu_changed,
989 target_set_completed);
990 if (res == -1) {
991 goto error_exit;
992 }
993
994 instance->my_id.nodeid = instance->totem_config->interfaces[instance->lowest_active_if].boundto.nodeid;
995
996 /*
997 * Must have net_mtu adjusted by totemnet_initialize first
998 */
999 cs_queue_init (&instance->new_message_queue,
1001 sizeof (struct message_item), instance->threaded_mode_enabled);
1002
1003 cs_queue_init (&instance->new_message_queue_trans,
1005 sizeof (struct message_item), instance->threaded_mode_enabled);
1006
1008 &instance->token_recv_event_handle,
1010 0,
1011 token_event_stats_collector,
1012 instance);
1014 &instance->token_sent_event_handle,
1016 0,
1017 token_event_stats_collector,
1018 instance);
1019 *srp_context = instance;
1020 return (0);
1021
1023 return (-1);
1024}
1025
1027 void *srp_context)
1028{
1029 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1030
1031 memb_leave_message_send (instance);
1033 cs_queue_free (&instance->new_message_queue);
1034 cs_queue_free (&instance->new_message_queue_trans);
1035 cs_queue_free (&instance->retrans_message_queue);
1036 sq_free (&instance->regular_sort_queue);
1037 sq_free (&instance->recovery_sort_queue);
1038 free (instance);
1039}
1040
1042 void *srp_context,
1043 unsigned int nodeid,
1045{
1046 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1047 int i;
1048
1050
1051 /* Fill in 'reachable' here as the lower level UDP[u] layers don't know */
1052 for (i = 0; i < instance->my_proc_list_entries; i++) {
1053 if (instance->my_proc_list[i].nodeid == nodeid) {
1054 node_status->reachable = 1;
1055 }
1056 }
1057
1059}
1060
1061
1062/*
1063 * Return configured interfaces. interfaces is array of totem_ip addresses allocated by caller,
1064 * with interfaces_size number of items. iface_count is final number of interfaces filled by this
1065 * function.
1066 *
1067 * Function returns 0 on success, otherwise if interfaces array is not big enough, -2 is returned,
1068 * and if interface was not found, -1 is returned.
1069 */
1071 void *srp_context,
1072 unsigned int nodeid,
1073 unsigned int *interface_id,
1074 struct totem_ip_address *interfaces,
1075 unsigned int interfaces_size,
1076 char ***status,
1077 unsigned int *iface_count)
1078{
1079 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1080 struct totem_ip_address *iface_ptr = interfaces;
1081 int res = 0;
1082 int i,n;
1083 int num_ifs = 0;
1084
1085 memset(interfaces, 0, sizeof(struct totem_ip_address) * interfaces_size);
1087
1088 for (i=0; i<INTERFACE_MAX; i++) {
1089 for (n=0; n < instance->totem_config->interfaces[i].member_count; n++) {
1090 if (instance->totem_config->interfaces[i].configured &&
1092 memcpy(iface_ptr, &instance->totem_config->interfaces[i].member_list[n], sizeof(struct totem_ip_address));
1094 iface_ptr++;
1095 if (++num_ifs > interfaces_size) {
1096 res = -2;
1097 break;
1098 }
1099 }
1100 }
1101 }
1102
1105 return (res);
1106}
1107
1109 void *srp_context,
1110 const char *cipher_type,
1111 const char *hash_type)
1112{
1113 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1114 int res;
1115
1117
1118 return (res);
1119}
1120
1121
1123 void *srp_context)
1124{
1125 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1126 unsigned int res;
1127
1128 res = instance->my_id.nodeid;
1129
1130 return (res);
1131}
1132
1134 void *srp_context)
1135{
1136 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1137 int res;
1138
1139 res = instance->totem_config->interfaces[instance->lowest_active_if].boundto.family;
1140
1141 return (res);
1142}
1143
1144
1145/*
1146 * Set operations for use by the membership algorithm
1147 */
1148static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b)
1149{
1150 if (a->nodeid == b->nodeid) {
1151 return 1;
1152 }
1153 return 0;
1154}
1155
1156static void srp_addr_to_nodeid (
1157 struct totemsrp_instance *instance,
1158 unsigned int *nodeid_out,
1159 struct srp_addr *srp_addr_in,
1160 unsigned int entries)
1161{
1162 unsigned int i;
1163
1164 for (i = 0; i < entries; i++) {
1165 nodeid_out[i] = srp_addr_in[i].nodeid;
1166 }
1167}
1168
1169static struct srp_addr srp_addr_endian_convert (struct srp_addr in)
1170{
1171 struct srp_addr res;
1172
1173 res.nodeid = swab32 (in.nodeid);
1174
1175 return (res);
1176}
1177
1178static void memb_consensus_reset (struct totemsrp_instance *instance)
1179{
1180 instance->consensus_list_entries = 0;
1181}
1182
1183static void memb_set_subtract (
1184 struct srp_addr *out_list, int *out_list_entries,
1185 struct srp_addr *one_list, int one_list_entries,
1186 struct srp_addr *two_list, int two_list_entries)
1187{
1188 int found = 0;
1189 int i;
1190 int j;
1191
1192 *out_list_entries = 0;
1193
1194 for (i = 0; i < one_list_entries; i++) {
1195 for (j = 0; j < two_list_entries; j++) {
1196 if (srp_addr_equal (&one_list[i], &two_list[j])) {
1197 found = 1;
1198 break;
1199 }
1200 }
1201 if (found == 0) {
1204 }
1205 found = 0;
1206 }
1207}
1208
1209/*
1210 * Set consensus for a specific processor
1211 */
1212static void memb_consensus_set (
1213 struct totemsrp_instance *instance,
1214 const struct srp_addr *addr)
1215{
1216 int found = 0;
1217 int i;
1218
1219 for (i = 0; i < instance->consensus_list_entries; i++) {
1220 if (srp_addr_equal(addr, &instance->consensus_list[i].addr)) {
1221 found = 1;
1222 break; /* found entry */
1223 }
1224 }
1225 instance->consensus_list[i].addr = *addr;
1226 instance->consensus_list[i].set = 1;
1227 if (found == 0) {
1228 instance->consensus_list_entries++;
1229 }
1230 return;
1231}
1232
1233/*
1234 * Is consensus set for a specific processor
1235 */
1236static int memb_consensus_isset (
1237 struct totemsrp_instance *instance,
1238 const struct srp_addr *addr)
1239{
1240 int i;
1241
1242 for (i = 0; i < instance->consensus_list_entries; i++) {
1243 if (srp_addr_equal (addr, &instance->consensus_list[i].addr)) {
1244 return (instance->consensus_list[i].set);
1245 }
1246 }
1247 return (0);
1248}
1249
1250/*
1251 * Is consensus agreed upon based upon consensus database
1252 */
1253static int memb_consensus_agreed (
1254 struct totemsrp_instance *instance)
1255{
1257 int token_memb_entries = 0;
1258 int agreed = 1;
1259 int i;
1260
1261 memb_set_subtract (token_memb, &token_memb_entries,
1262 instance->my_proc_list, instance->my_proc_list_entries,
1263 instance->my_failed_list, instance->my_failed_list_entries);
1264
1265 for (i = 0; i < token_memb_entries; i++) {
1266 if (memb_consensus_isset (instance, &token_memb[i]) == 0) {
1267 agreed = 0;
1268 break;
1269 }
1270 }
1271
1272 if (agreed && instance->failed_to_recv == 1) {
1273 /*
1274 * Both nodes agreed on our failure. We don't care how many proc list items left because we
1275 * will create single ring anyway.
1276 */
1277
1278 return (agreed);
1279 }
1280
1282
1283 return (agreed);
1284}
1285
1286static void memb_consensus_notset (
1287 struct totemsrp_instance *instance,
1290 struct srp_addr *comparison_list,
1292{
1293 int i;
1294
1296
1297 for (i = 0; i < instance->my_proc_list_entries; i++) {
1298 if (memb_consensus_isset (instance, &instance->my_proc_list[i]) == 0) {
1301 }
1302 }
1303}
1304
1305/*
1306 * Is set1 equal to set2 Entries can be in different orders
1307 */
1308static int memb_set_equal (
1309 struct srp_addr *set1, int set1_entries,
1310 struct srp_addr *set2, int set2_entries)
1311{
1312 int i;
1313 int j;
1314
1315 int found = 0;
1316
1317 if (set1_entries != set2_entries) {
1318 return (0);
1319 }
1320 for (i = 0; i < set2_entries; i++) {
1321 for (j = 0; j < set1_entries; j++) {
1322 if (srp_addr_equal (&set1[j], &set2[i])) {
1323 found = 1;
1324 break;
1325 }
1326 }
1327 if (found == 0) {
1328 return (0);
1329 }
1330 found = 0;
1331 }
1332 return (1);
1333}
1334
1335/*
1336 * Is subset fully contained in fullset
1337 */
1338static int memb_set_subset (
1339 const struct srp_addr *subset, int subset_entries,
1340 const struct srp_addr *fullset, int fullset_entries)
1341{
1342 int i;
1343 int j;
1344 int found = 0;
1345
1347 return (0);
1348 }
1349 for (i = 0; i < subset_entries; i++) {
1350 for (j = 0; j < fullset_entries; j++) {
1351 if (srp_addr_equal (&subset[i], &fullset[j])) {
1352 found = 1;
1353 }
1354 }
1355 if (found == 0) {
1356 return (0);
1357 }
1358 found = 0;
1359 }
1360 return (1);
1361}
1362/*
1363 * merge subset into fullset taking care not to add duplicates
1364 */
1365static void memb_set_merge (
1366 const struct srp_addr *subset, int subset_entries,
1367 struct srp_addr *fullset, int *fullset_entries)
1368{
1369 int found = 0;
1370 int i;
1371 int j;
1372
1373 for (i = 0; i < subset_entries; i++) {
1374 for (j = 0; j < *fullset_entries; j++) {
1375 if (srp_addr_equal (&fullset[j], &subset[i])) {
1376 found = 1;
1377 break;
1378 }
1379 }
1380 if (found == 0) {
1383 }
1384 found = 0;
1385 }
1386 return;
1387}
1388
1389static void memb_set_and_with_ring_id (
1390 struct srp_addr *set1,
1392 int set1_entries,
1393 struct srp_addr *set2,
1394 int set2_entries,
1395 struct memb_ring_id *old_ring_id,
1396 struct srp_addr *and,
1397 int *and_entries)
1398{
1399 int i;
1400 int j;
1401 int found = 0;
1402
1403 *and_entries = 0;
1404
1405 for (i = 0; i < set2_entries; i++) {
1406 for (j = 0; j < set1_entries; j++) {
1407 if (srp_addr_equal (&set1[j], &set2[i])) {
1408 if (memcmp (&set1_ring_ids[j], old_ring_id, sizeof (struct memb_ring_id)) == 0) {
1409 found = 1;
1410 }
1411 break;
1412 }
1413 }
1414 if (found) {
1415 and[*and_entries] = set1[j];
1416 *and_entries = *and_entries + 1;
1417 }
1418 found = 0;
1419 }
1420 return;
1421}
1422
1423static void memb_set_log(
1424 struct totemsrp_instance *instance,
1425 int level,
1426 const char *string,
1427 struct srp_addr *list,
1428 int list_entries)
1429{
1430 char int_buf[32];
1431 char list_str[512];
1432 int i;
1433
1434 memset(list_str, 0, sizeof(list_str));
1435
1436 for (i = 0; i < list_entries; i++) {
1437 if (i == 0) {
1438 snprintf(int_buf, sizeof(int_buf), CS_PRI_NODE_ID, list[i].nodeid);
1439 } else {
1440 snprintf(int_buf, sizeof(int_buf), "," CS_PRI_NODE_ID, list[i].nodeid);
1441 }
1442
1443 if (strlen(list_str) + strlen(int_buf) >= sizeof(list_str)) {
1444 break ;
1445 }
1447 }
1448
1449 log_printf(level, "List '%s' contains %d entries: %s", string, list_entries, list_str);
1450}
1451
1452static void my_leave_memb_clear(
1453 struct totemsrp_instance *instance)
1454{
1455 memset(instance->my_leave_memb_list, 0, sizeof(instance->my_leave_memb_list));
1456 instance->my_leave_memb_entries = 0;
1457}
1458
1459static unsigned int my_leave_memb_match(
1460 struct totemsrp_instance *instance,
1461 unsigned int nodeid)
1462{
1463 int i;
1464 unsigned int ret = 0;
1465
1466 for (i = 0; i < instance->my_leave_memb_entries; i++){
1467 if (instance->my_leave_memb_list[i] == nodeid){
1468 ret = nodeid;
1469 break;
1470 }
1471 }
1472 return ret;
1473}
1474
/*
 * Record nodeid in the leave list unless it is already stored there.
 *
 * A new id is appended only while fewer than PROCESSOR_COUNT_MAX - 1 entries
 * are in use; otherwise the else branch reports that the id cannot be set.
 */
1475static void my_leave_memb_set(
1476 struct totemsrp_instance *instance,
1477 unsigned int nodeid)
1478{
1479 int i, found = 0;
	/* Duplicate check: scan the entries currently in use. */
1480 for (i = 0; i < instance->my_leave_memb_entries; i++){
1481 if (instance->my_leave_memb_list[i] == nodeid){
1482 found = 1;
1483 break;
1484 }
1485 }
1486 if (found == 1) {
1487 return;
1488 }
1489 if (instance->my_leave_memb_entries < (PROCESSOR_COUNT_MAX - 1)) {
1490 instance->my_leave_memb_list[instance->my_leave_memb_entries] = nodeid;
1491 instance->my_leave_memb_entries++;
1492 } else {
	/*
	 * NOTE(review): source line 1493 is absent from this listing; presumably
	 * it opens the log call that takes the message below - confirm upstream.
	 */
1494 "Cannot set LEAVE nodeid=" CS_PRI_NODE_ID, nodeid);
1495 }
1496}
1497
1498
1499static void *totemsrp_buffer_alloc (struct totemsrp_instance *instance)
1500{
1501 assert (instance != NULL);
1502 return totemnet_buffer_alloc (instance->totemnet_context);
1503}
1504
/*
 * Give a buffer (ptr) back for this instance; instance must not be NULL
 * (asserted).
 *
 * NOTE(review): source line 1508, which would perform the actual release, is
 * absent from this listing - presumably the totemnet counterpart of
 * totemsrp_buffer_alloc; confirm against the upstream file.
 */
1505static void totemsrp_buffer_release (struct totemsrp_instance *instance, void *ptr)
1506{
1507 assert (instance != NULL);
1509}
1510
/*
 * Arm the token retransmit timer of this instance.
 *
 * The visible part of the call passes instance as callback data,
 * timer_function_token_retransmit_timeout as the callback and
 * &instance->timer_orf_token_retransmit_timeout as the timer handle; the
 * error text names the call as qb_loop_timer_add. A non-zero result is only
 * logged at error level.
 *
 * NOTE(review): source lines 1515-1519 are absent from this listing, so the
 * timeout value, and whether a pending timer is deleted first, cannot be seen
 * here - confirm against the upstream file.
 */
1511static void reset_token_retransmit_timeout (struct totemsrp_instance *instance)
1512{
1513 int32_t res;
1514
1520 (void *)instance,
1521 timer_function_token_retransmit_timeout,
1522 &instance->timer_orf_token_retransmit_timeout);
1523 if (res != 0) {
1524 log_printf(instance->totemsrp_log_level_error, "reset_token_retransmit_timeout - qb_loop_timer_add error : %d", res);
1525 }
1526
1527}
1528
/*
 * Start the merge-detect timer, but only when
 * my_merge_detect_timeout_outstanding is 0.
 *
 * The visible part of the call passes instance as callback data,
 * timer_function_merge_detect_timeout as the callback and
 * &instance->timer_merge_detect_timeout as the handle; a non-zero result is
 * logged at error level.
 *
 * NOTE(review): source lines 1534-1536 and 1544 are absent from this listing.
 * Line 1544 presumably marks the timeout as outstanding - confirm upstream.
 */
1529static void start_merge_detect_timeout (struct totemsrp_instance *instance)
1530{
1531 int32_t res;
1532
1533 if (instance->my_merge_detect_timeout_outstanding == 0) {
1537 (void *)instance,
1538 timer_function_merge_detect_timeout,
1539 &instance->timer_merge_detect_timeout);
1540 if (res != 0) {
1541 log_printf(instance->totemsrp_log_level_error, "start_merge_detect_timeout - qb_loop_timer_add error : %d", res);
1542 }
1543
1545 }
1546}
1547
/*
 * Cancel the merge-detect timer of this instance.
 *
 * NOTE(review): the body (source lines 1550-1551) is absent from this
 * listing; presumably it deletes timer_merge_detect_timeout and clears the
 * outstanding flag - confirm against the upstream file.
 */
1548static void cancel_merge_detect_timeout (struct totemsrp_instance *instance)
1549{
1552}
1553
1554/*
1555 * ring_state_* is used to save and restore the sort queue
1556 * state when a recovery operation fails (and enters gather)
1557 */
/*
 * Snapshot the current ring state once.
 *
 * Nothing happens when old_ring_state_saved is already set. Otherwise the
 * flag is raised, my_ring_id is copied into my_old_ring_id and my_aru is
 * stored in old_ring_state_aru, and the saved values are logged.
 *
 * NOTE(review): source lines 1565-1566 are absent from this listing;
 * presumably they save my_high_seq_received and open the log call whose
 * arguments follow - confirm against the upstream file.
 */
1558static void old_ring_state_save (struct totemsrp_instance *instance)
1559{
1560 if (instance->old_ring_state_saved == 0) {
1561 instance->old_ring_state_saved = 1;
1562 memcpy (&instance->my_old_ring_id, &instance->my_ring_id,
1563 sizeof (struct memb_ring_id));
1564 instance->old_ring_state_aru = instance->my_aru;
1567 "Saving state aru %x high seq received %x",
1568 instance->my_aru, instance->my_high_seq_received);
1569 }
1570}
1571
/*
 * Put back the ring state captured by old_ring_state_save: my_aru is reloaded
 * from old_ring_state_aru and the restored values are logged.
 *
 * NOTE(review): source lines 1575-1576 are absent from this listing;
 * presumably they restore my_high_seq_received and open the log call -
 * confirm against the upstream file.
 */
1572static void old_ring_state_restore (struct totemsrp_instance *instance)
1573{
1574 instance->my_aru = instance->old_ring_state_aru;
1577 "Restoring instance->my_aru %x my high seq received %x",
1578 instance->my_aru, instance->my_high_seq_received);
1579}
1580
/*
 * Clear old_ring_state_saved so that the next old_ring_state_save call takes
 * a fresh snapshot.
 *
 * NOTE(review): source line 1583 is absent from this listing; presumably it
 * opens the log call that takes the message below - confirm upstream.
 */
1581static void old_ring_state_reset (struct totemsrp_instance *instance)
1582{
1584 "Resetting old ring state");
1585 instance->old_ring_state_saved = 0;
1586}
1587
/*
 * Arm the pause timer of this instance.
 *
 * The visible part of the call passes instance as callback data,
 * timer_function_pause_timeout as the callback and
 * &instance->timer_pause_timeout as the handle. A non-zero result is only
 * logged at error level.
 *
 * NOTE(review): source lines 1592-1595 are absent from this listing, so the
 * timeout value and any preceding timer deletion are not visible here.
 */
1588static void reset_pause_timeout (struct totemsrp_instance *instance)
1589{
1590 int32_t res;
1591
1596 (void *)instance,
1597 timer_function_pause_timeout,
1598 &instance->timer_pause_timeout);
1599 if (res != 0) {
1600 log_printf(instance->totemsrp_log_level_error, "reset_pause_timeout - qb_loop_timer_add error : %d", res);
1601 }
1602}
1603
/*
 * Arm the token warning timer of this instance.
 *
 * The visible part of the call passes instance as callback data,
 * timer_function_orf_token_warning as the callback and
 * &instance->timer_orf_token_warning as the handle. A non-zero result is
 * only logged at error level.
 *
 * NOTE(review): source lines 1607-1610 are absent from this listing, so the
 * timeout value and any preceding timer deletion are not visible here.
 */
1604static void reset_token_warning (struct totemsrp_instance *instance) {
1605 int32_t res;
1606
1611 (void *)instance,
1612 timer_function_orf_token_warning,
1613 &instance->timer_orf_token_warning);
1614 if (res != 0) {
1615 log_printf(instance->totemsrp_log_level_error, "reset_token_warning - qb_loop_timer_add error : %d", res);
1616 }
1617}
1618
/*
 * Arm the token loss timer and, when totem_config->token_warning is non-zero,
 * the token warning timer as well.
 *
 * The visible part of the call passes instance as callback data,
 * timer_function_orf_token_timeout as the callback and
 * &instance->timer_orf_token_timeout as the handle. A non-zero result is only
 * logged at error level.
 *
 * NOTE(review): source lines 1622-1625 are absent from this listing, so the
 * timeout value and any preceding timer deletion are not visible here.
 */
1619static void reset_token_timeout (struct totemsrp_instance *instance) {
1620 int32_t res;
1621
1626 (void *)instance,
1627 timer_function_orf_token_timeout,
1628 &instance->timer_orf_token_timeout);
1629 if (res != 0) {
1630 log_printf(instance->totemsrp_log_level_error, "reset_token_timeout - qb_loop_timer_add error : %d", res);
1631 }
1632
1633 if (instance->totem_config->token_warning)
1634 reset_token_warning(instance);
1635}
1636
/*
 * Arm the heartbeat timer of this instance.
 *
 * The visible part of the call passes instance as callback data,
 * timer_function_heartbeat_timeout as the callback and
 * &instance->timer_heartbeat_timeout as the handle. A non-zero result is
 * only logged at error level.
 *
 * NOTE(review): source lines 1640-1643 are absent from this listing, so the
 * timeout value and any preceding timer deletion are not visible here.
 */
1637static void reset_heartbeat_timeout (struct totemsrp_instance *instance) {
1638 int32_t res;
1639
1644 (void *)instance,
1645 timer_function_heartbeat_timeout,
1646 &instance->timer_heartbeat_timeout);
1647 if (res != 0) {
1648 log_printf(instance->totemsrp_log_level_error, "reset_heartbeat_timeout - qb_loop_timer_add error : %d", res);
1649 }
1650}
1651
1652
/*
 * Cancel the token warning timer.
 *
 * NOTE(review): the body (source line 1654) is absent from this listing;
 * presumably it deletes timer_orf_token_warning - confirm upstream.
 */
1653static void cancel_token_warning (struct totemsrp_instance *instance) {
1655}
1656
/*
 * Cancel the token loss timer and, when totem_config->token_warning is
 * non-zero, the token warning timer too.
 *
 * NOTE(review): source line 1658 is absent from this listing; presumably it
 * deletes timer_orf_token_timeout - confirm against the upstream file.
 */
1657static void cancel_token_timeout (struct totemsrp_instance *instance) {
1659
1660 if (instance->totem_config->token_warning)
1661 cancel_token_warning(instance);
1662}
1663
/*
 * Cancel the heartbeat timer.
 *
 * NOTE(review): the body (source line 1665) is absent from this listing;
 * presumably it deletes timer_heartbeat_timeout - confirm upstream.
 */
1664static void cancel_heartbeat_timeout (struct totemsrp_instance *instance) {
1666}
1667
/*
 * Cancel the token retransmit timer.
 *
 * NOTE(review): the body (source line 1670) is absent from this listing;
 * presumably it deletes timer_orf_token_retransmit_timeout - confirm upstream.
 */
1668static void cancel_token_retransmit_timeout (struct totemsrp_instance *instance)
1669{
1671}
1672
/*
 * Start the token-hold retransmit timer of this instance.
 *
 * The visible part of the call passes instance as callback data,
 * timer_function_token_hold_retransmit_timeout as the callback and
 * &instance->timer_orf_token_hold_retransmit_timeout as the handle. A
 * non-zero result is only logged at error level.
 *
 * NOTE(review): source lines 1677-1679 are absent from this listing, so the
 * timeout value is not visible here.
 */
1673static void start_token_hold_retransmit_timeout (struct totemsrp_instance *instance)
1674{
1675 int32_t res;
1676
1680 (void *)instance,
1681 timer_function_token_hold_retransmit_timeout,
1682 &instance->timer_orf_token_hold_retransmit_timeout);
1683 if (res != 0) {
1684 log_printf(instance->totemsrp_log_level_error, "start_token_hold_retransmit_timeout - qb_loop_timer_add error : %d", res);
1685 }
1686}
1687
/*
 * Cancel the token-hold retransmit timer.
 *
 * NOTE(review): the body (source lines 1690-1691) is absent from this
 * listing; presumably it deletes timer_orf_token_hold_retransmit_timeout -
 * confirm against the upstream file.
 */
1688static void cancel_token_hold_retransmit_timeout (struct totemsrp_instance *instance)
1689{
1692}
1693
/*
 * React to an expired consensus timeout.
 *
 * The consensus_timeouts statistic is bumped first. When
 * memb_consensus_agreed reports agreement, the consensus state is reset,
 * this processor (my_id) is marked again and the token timeout is re-armed.
 * Otherwise the else branch runs memb_consensus_notset over my_proc_list,
 * updates my_failed_list and re-enters GATHER with
 * TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT.
 *
 * NOTE(review): source lines 1697-1698, 1710-1711 and 1715 are absent from
 * this listing, so the remaining memb_consensus_notset arguments and the
 * call that writes my_failed_list are not visible - confirm upstream.
 */
1694static void memb_state_consensus_timeout_expired (
1695 struct totemsrp_instance *instance)
1696{
1699
1700 instance->stats.consensus_timeouts++;
1701 if (memb_consensus_agreed (instance)) {
1702 memb_consensus_reset (instance);
1703
1704 memb_consensus_set (instance, &instance->my_id);
1705
1706 reset_token_timeout (instance); // REVIEWED
1707 } else {
1708 memb_consensus_notset (
1709 instance,
1712 instance->my_proc_list,
1713 instance->my_proc_list_entries);
1714
1716 instance->my_failed_list, &instance->my_failed_list_entries);
1717 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT);
1718 }
1719}
1720
1721static void memb_join_message_send (struct totemsrp_instance *instance);
1722
1723static void memb_merge_detect_transmit (struct totemsrp_instance *instance);
1724
1725/*
1726 * Timers used for various states of the membership algorithm
1727 */
/*
 * Timer callback for the pause timer; data is the totemsrp instance.
 * The timer is re-armed through reset_pause_timeout on every expiry.
 *
 * NOTE(review): source line 1732 is absent from this listing; presumably it
 * records a timestamp used for pause detection - confirm upstream.
 */
1728static void timer_function_pause_timeout (void *data)
1729{
1730 struct totemsrp_instance *instance = data;
1731
1733 reset_pause_timeout (instance);
1734}
1735
1736static void memb_recovery_state_token_loss (struct totemsrp_instance *instance)
1737{
1738 old_ring_state_restore (instance);
1739 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE);
1740 instance->stats.recovery_token_lost++;
1741}
1742
/*
 * Timer callback for the token warning timer; data is the totemsrp instance.
 *
 * While totem_config->token_warning is non-zero a message reporting how long
 * the token has not been received (tv_diff, in ms) is logged and the warning
 * timer is re-armed; otherwise the warning timer is cancelled.
 *
 * NOTE(review): source lines 1746, 1750 and 1752 are absent from this
 * listing, so the declaration and computation of tv_diff (which involves the
 * rx field of the latest token stats entry) and the start of the log call
 * are not visible - confirm against the upstream file.
 */
1743static void timer_function_orf_token_warning (void *data)
1744{
1745 struct totemsrp_instance *instance = data;
1747
1748 /* need to protect against the case where token_warning is set to 0 dynamically */
1749 if (instance->totem_config->token_warning) {
1751 instance->stats.token[instance->stats.latest_token].rx;
1753 "Token has not been received in %"PRIu64" ms", tv_diff);
1754 reset_token_warning(instance);
1755 } else {
1756 cancel_token_warning(instance);
1757 }
1758}
1759
/*
 * Timer callback for token loss; data is the totemsrp instance.
 *
 * Dispatches on memb_state. Every branch logs the loss, moves the membership
 * state machine on and bumps the matching *_token_lost statistic:
 *  - first branch (message says OPERATIONAL): enter GATHER;
 *  - GATHER: run the consensus-timeout handling, then enter GATHER again;
 *  - COMMIT: enter GATHER;
 *  - last branch (message says RECOVERY): memb_recovery_state_token_loss,
 *    then set orf_token_discard.
 *
 * NOTE(review): source lines 1765-1766, 1768, 1773, 1779, 1788 and 1794-1795
 * are absent from this listing, so the OPERATIONAL and RECOVERY case labels
 * and the starts of the log calls are not visible - confirm upstream.
 */
1760static void timer_function_orf_token_timeout (void *data)
1761{
1762 struct totemsrp_instance *instance = data;
1763
1764 switch (instance->memb_state) {
1767 "The token was lost in the OPERATIONAL state.");
1769 "A processor failed, forming new configuration:"
1770 " token timed out (%ums), waiting %ums for consensus.",
1771 instance->totem_config->token_timeout,
1772 instance->totem_config->consensus_timeout);
1774 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE);
1775 instance->stats.operational_token_lost++;
1776 break;
1777
1778 case MEMB_STATE_GATHER:
1780 "The consensus timeout expired (%ums).",
1781 instance->totem_config->consensus_timeout);
1782 memb_state_consensus_timeout_expired (instance);
1783 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED);
1784 instance->stats.gather_token_lost++;
1785 break;
1786
1787 case MEMB_STATE_COMMIT:
1789 "The token was lost in the COMMIT state.");
1790 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE);
1791 instance->stats.commit_token_lost++;
1792 break;
1793
1796 "The token was lost in the RECOVERY state.");
1797 memb_recovery_state_token_loss (instance);
1798 instance->orf_token_discard = 1;
1799 break;
1800 }
1801}
1802
/*
 * Timer callback for the heartbeat timer; data is the totemsrp instance.
 * Logs the current memb_state and then runs the regular token-loss handling
 * by calling timer_function_orf_token_timeout with the same data.
 *
 * NOTE(review): source line 1806 is absent from this listing; presumably it
 * opens the log call that takes the message below - confirm upstream.
 */
1803static void timer_function_heartbeat_timeout (void *data)
1804{
1805 struct totemsrp_instance *instance = data;
1807 "HeartBeat Timer expired Invoking token loss mechanism in state %d ", instance->memb_state);
1808 timer_function_orf_token_timeout(data);
1809}
1810
/*
 * Join-timeout timer callback; data is the totemsrp instance.
 *
 * In the GATHER and COMMIT states a join message is sent again and the join
 * timer is re-armed with this same function as its callback. The first
 * branch of the switch asserts, i.e. the callback must never fire in those
 * states.
 *
 * NOTE(review): source lines 1817-1818, 1828 and 1830-1832 are absent from
 * this listing, so the case labels of the asserting branch and the start of
 * the timer call are not visible - confirm against the upstream file.
 */
1811static void memb_timer_function_state_gather (void *data)
1812{
1813 struct totemsrp_instance *instance = data;
1814 int32_t res;
1815
1816 switch (instance->memb_state) {
1819 assert (0); /* this should never happen */
1820 break;
1821 case MEMB_STATE_GATHER:
1822 case MEMB_STATE_COMMIT:
1823 memb_join_message_send (instance);
1824
1825 /*
1826 * Restart the join timeout
1827 */
1829
1833 (void *)instance,
1834 memb_timer_function_state_gather,
1835 &instance->memb_timer_state_gather_join_timeout);
1836
1837 if (res != 0) {
1838 log_printf(instance->totemsrp_log_level_error, "memb_timer_function_state_gather - qb_loop_timer_add error : %d", res);
1839 }
1840 break;
1841 }
1842}
1843
/*
 * Timer callback for the gather consensus timeout. data is the totemsrp
 * instance; the work is delegated to memb_state_consensus_timeout_expired.
 */
static void memb_timer_function_gather_consensus_timeout (void *data)
{
	memb_state_consensus_timeout_expired ((struct totemsrp_instance *)data);
}
1849
/*
 * Move messages recovered on the new ring back into the regular sort queue.
 *
 * Sequence numbers SEQNO_START_MSG + 1 .. my_aru of the recovery sort queue
 * are walked. Missing items are skipped. For an encapsulated recovery
 * message the inner message starts sizeof (struct mcast) bytes into the
 * buffer and is that much shorter. Only messages whose ring id equals
 * my_old_ring_id, and whose sequence number is not yet in use in the regular
 * sort queue, are added there.
 *
 * NOTE(review): source lines 1853-1854, 1860, 1874, 1879-1880, 1889, 1897,
 * 1912, 1914 and 1918 are absent from this listing. That hides the
 * declarations of the message item variables, the encapsulation test, the
 * assignment of mcast, the remaining sq_item_add arguments and the statement
 * guarded by the sq_lt_compare check - confirm against the upstream file.
 */
1850static void deliver_messages_from_recovery_to_regular (struct totemsrp_instance *instance)
1851{
1852 unsigned int i;
1855 unsigned int range = 0;
1856 int res;
1857 void *ptr;
1858 struct mcast *mcast;
1859
1861 "recovery to regular %x-%x", SEQNO_START_MSG + 1, instance->my_aru);
1862
1863 range = instance->my_aru - SEQNO_START_MSG;
1864 /*
1865 * Move messages from recovery to regular sort queue
1866 */
1867// todo should i be initialized to 0 or 1 ?
1868 for (i = 1; i <= range; i++) {
1869 res = sq_item_get (&instance->recovery_sort_queue,
1870 i + SEQNO_START_MSG, &ptr);
1871 if (res != 0) {
1872 continue;
1873 }
1875
1876 /*
1877 * Convert recovery message into regular message
1878 */
1881 /*
1882 * Message is a recovery message encapsulated
1883 * in a new ring message
1884 */
1885 regular_message_item.mcast =
1886 (struct mcast *)(((char *)recovery_message_item->mcast) + sizeof (struct mcast));
1887 regular_message_item.msg_len =
1888 recovery_message_item->msg_len - sizeof (struct mcast);
1890 } else {
1891 /*
1892 * TODO this case shouldn't happen
1893 */
1894 continue;
1895 }
1896
1898 "comparing if ring id is for this processors old ring seqno " CS_PRI_RING_ID_SEQ,
1899 (uint64_t)mcast->seq);
1900
1901 /*
1902 * Only add this message to the regular sort
1903 * queue if it was originated with the same ring
1904 * id as the previous ring
1905 */
1906 if (memcmp (&instance->my_old_ring_id, &mcast->ring_id,
1907 sizeof (struct memb_ring_id)) == 0) {
1908
1909 res = sq_item_inuse (&instance->regular_sort_queue, mcast->seq);
1910 if (res == 0) {
1911 sq_item_add (&instance->regular_sort_queue,
1913 if (sq_lt_compare (instance->old_ring_state_high_seq_received, mcast->seq)) {
1915 }
1916 }
1917 } else {
1919 "-not adding msg with seq no " CS_PRI_RING_ID_SEQ, (uint64_t)mcast->seq);
1920 }
1921 }
1922}
1923
1924/*
1925 * Change states in the state machine of the membership algorithm
1926 */
/*
 * Enter the OPERATIONAL state of the membership algorithm.
 *
 * Visible steps, in order:
 *  1. reset consensus and old-ring bookkeeping and move recovered messages
 *     back to the regular sort queue;
 *  2. with my_aru temporarily set to old_ring_state_aru, deliver old-ring
 *     messages to the application, compute the left and joined lists,
 *     install the new membership and deliver the transitional and then the
 *     regular configuration;
 *  3. copy the recovery sort queue over the regular one, rebuild
 *     my_proc_list from the new membership and clear the failed list;
 *  4. work out my_high_delivered, free and release the queue items up to
 *     it, and log who joined, left or failed;
 *  5. update statistics, re-arm the pause timeout and remember my_ring_id
 *     as my_old_ring_id for the next configuration change.
 *
 * NOTE(review): source lines 1929, 1932-1934, 1950, 1987-1989, 2010,
 * 2020-2022, 2024, 2084, 2086, 2097, 2109, 2116, 2130, 2132, 2136-2137,
 * 2140, 2142 and 2145 are absent from this listing. That hides several local
 * declarations, the configuration-change callbacks, the totempg buffer
 * switch, the per-node message formatting and the assignment of the new
 * memb_state - confirm all of these against the upstream file.
 */
1927static void memb_state_operational_enter (struct totemsrp_instance *instance)
1928{
1930 int joined_list_entries = 0;
1931 unsigned int aru_save;
1935 unsigned int left_list[PROCESSOR_COUNT_MAX];
1936 unsigned int i;
1937 unsigned int res;
1938 char left_node_msg[1024];
1939 char joined_node_msg[1024];
1940 char failed_node_msg[1024];
1941
1942 instance->originated_orf_token = 0;
1943
1944 memb_consensus_reset (instance);
1945
1946 old_ring_state_reset (instance);
1947
1948 deliver_messages_from_recovery_to_regular (instance);
1949
1951 "Delivering to app %x to %x",
1952 instance->my_high_delivered + 1, instance->old_ring_state_high_seq_received);
1953
	/*
	 * my_aru is swapped for the old ring's value while old-ring messages
	 * are delivered; the new-ring value is put back from aru_save below.
	 */
1954 aru_save = instance->my_aru;
1955 instance->my_aru = instance->old_ring_state_aru;
1956
1957 messages_deliver_to_app (instance, 0, instance->old_ring_state_high_seq_received);
1958
1959 /*
1960 * Calculate joined and left list
1961 */
1962 memb_set_subtract (instance->my_left_memb_list,
1963 &instance->my_left_memb_entries,
1964 instance->my_memb_list, instance->my_memb_entries,
1965 instance->my_trans_memb_list, instance->my_trans_memb_entries);
1966
1967 memb_set_subtract (joined_list, &joined_list_entries,
1968 instance->my_new_memb_list, instance->my_new_memb_entries,
1969 instance->my_trans_memb_list, instance->my_trans_memb_entries);
1970
1971 /*
1972 * Install new membership
1973 */
1974 instance->my_memb_entries = instance->my_new_memb_entries;
1975 memcpy (&instance->my_memb_list, instance->my_new_memb_list,
1976 sizeof (struct srp_addr) * instance->my_memb_entries);
1977 instance->last_released = 0;
1978 instance->my_set_retrans_flg = 0;
1979
1980 /*
1981 * Deliver transitional configuration to application
1982 */
1983 srp_addr_to_nodeid (instance, left_list, instance->my_left_memb_list,
1984 instance->my_left_memb_entries);
1985 srp_addr_to_nodeid (instance, trans_memb_list_totemip,
1986 instance->my_trans_memb_list, instance->my_trans_memb_entries);
1990 0, 0, &instance->my_ring_id);
1991 /*
1992 * Switch new totemsrp messages queue. Messages sent from now on are stored
1993 * in different queue so synchronization messages are delivered first. Totempg
1994 * buffers will be switched later.
1995 */
1996 instance->waiting_trans_ack = 1;
1997
1998// TODO we need to filter to ensure we only deliver those
1999// messages which are part of instance->my_deliver_memb
2000 messages_deliver_to_app (instance, 1, instance->old_ring_state_high_seq_received);
2001
2002 /*
2003 * Switch totempg buffers. This used to be right after
2004 * instance->waiting_trans_ack = 1;
2005 * line. This was causing problem, because there may be not yet
2006 * processed parts of messages in totempg buffers.
2007 * So when buffers were switched and recovered messages
2008 * got delivered it was not possible to assemble them.
2009 */
2011
2012 instance->my_aru = aru_save;
2013
2014 /*
2015 * Deliver regular configuration to application
2016 */
2017 srp_addr_to_nodeid (instance, new_memb_list_totemip,
2018 instance->my_new_memb_list, instance->my_new_memb_entries);
2019 srp_addr_to_nodeid (instance, joined_list_totemip, joined_list,
2023 0, 0,
2025
2026 /*
2027 * The recovery sort queue now becomes the regular
2028 * sort queue. It is necessary to copy the state
2029 * into the regular sort queue.
2030 */
2031 sq_copy (&instance->regular_sort_queue, &instance->recovery_sort_queue);
2032 instance->my_last_aru = SEQNO_START_MSG;
2033
2034 /* When making my_proc_list smaller, ensure that the
2035 * now non-used entries are zero-ed out. There are some suspect
2036 * assert's that assume that there is always 2 entries in the list.
2037 * These fail when my_proc_list is reduced to 1 entry (and the
2038 * valid [0] entry is the same as the 'unused' [1] entry).
2039 */
2040 memset(instance->my_proc_list, 0,
2041 sizeof (struct srp_addr) * instance->my_proc_list_entries);
2042
	/*
	 * The copy size uses my_memb_entries, which was set equal to
	 * my_new_memb_entries when the new membership was installed above.
	 */
2043 instance->my_proc_list_entries = instance->my_new_memb_entries;
2044 memcpy (instance->my_proc_list, instance->my_new_memb_list,
2045 sizeof (struct srp_addr) * instance->my_memb_entries);
2046
2047 instance->my_failed_list_entries = 0;
2048 /*
2049 * TODO Not exactly to spec
2050 *
2051 * At the entry to this function all messages without a gap are
2052 * delivered.
2053 *
2054 * This code throws away messages from the last gap in the sort queue
2055 * to my_high_seq_received
2056 *
2057 * What should really happen is we should deliver all messages up to
2058 * a gap, then deliver the transitional configuration, then deliver
2059 * the messages between the first gap and my_high_seq_received, then
2060 * deliver a regular configuration, then deliver the regular
2061 * configuration
2062 *
2063 * Unfortunately totempg doesn't appear to like this operating mode
2064 * which needs more inspection
2065 */
	/*
	 * Walk i down from my_high_seq_received until sq_item_get succeeds
	 * (res == 0) or i reaches 0; the resulting i becomes my_high_delivered.
	 */
2066 i = instance->my_high_seq_received + 1;
2067 do {
2068 void *ptr;
2069
2070 i -= 1;
2071 res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
2072 if (i == 0) {
2073 break;
2074 }
2075 } while (res);
2076
2077 instance->my_high_delivered = i;
2078
	/* Free the mcast buffer of every queue item up to my_high_delivered. */
2079 for (i = 0; i <= instance->my_high_delivered; i++) {
2080 void *ptr;
2081
2082 res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
2083 if (res == 0) {
2085
2087 free (regular_message->mcast);
2088 }
2089 }
2090 sq_items_release (&instance->regular_sort_queue, instance->my_high_delivered);
2091 instance->last_released = instance->my_high_delivered;
2092
	/* Build the " joined:" part of the membership log message. */
2093 if (joined_list_entries) {
2094 int sptr = 0;
2095 sptr += snprintf(joined_node_msg, sizeof(joined_node_msg)-sptr, " joined:");
2096 for (i=0; i< joined_list_entries; i++) {
2098 }
2099 }
2100 else {
2101 joined_node_msg[0] = '\0';
2102 }
2103
	/*
	 * Build the " left:" part, plus a " failed:" part for every left node
	 * that my_leave_memb_match does not find in the leave list.
	 */
2104 if (instance->my_left_memb_entries) {
2105 int sptr = 0;
2106 int sptr2 = 0;
2107 sptr += snprintf(left_node_msg, sizeof(left_node_msg)-sptr, " left:");
2108 for (i=0; i< instance->my_left_memb_entries; i++) {
2110 }
2111 for (i=0; i< instance->my_left_memb_entries; i++) {
2112 if (my_leave_memb_match(instance, left_list[i]) == 0) {
2113 if (sptr2 == 0) {
2114 sptr2 += snprintf(failed_node_msg, sizeof(failed_node_msg)-sptr2, " failed:");
2115 }
2117 }
2118 }
2119 if (sptr2 == 0) {
2120 failed_node_msg[0] = '\0';
2121 }
2122 }
2123 else {
2124 left_node_msg[0] = '\0';
2125 failed_node_msg[0] = '\0';
2126 }
2127
2128 my_leave_memb_clear(instance);
2129
2131 "entering OPERATIONAL state.");
2133 "A new membership (" CS_PRI_RING_ID ") was formed. Members%s%s",
2134 instance->my_ring_id.rep,
2135 (uint64_t)instance->my_ring_id.seq,
2138
2139 if (strlen(failed_node_msg)) {
2141 "Failed to receive the leave message.%s",
2143 }
2144
2146
2147 instance->stats.operational_entered++;
2148 instance->stats.continuous_gather = 0;
2149
2150 instance->my_received_flg = 1;
2151
2152 reset_pause_timeout (instance);
2153
2154 /*
2155 * Save ring id information from this configuration to determine
2156 * which processors are transitioning from old regular configuration
2157 * in to new regular configuration on the next configuration change
2158 */
2159 memcpy (&instance->my_old_ring_id, &instance->my_ring_id,
2160 sizeof (struct memb_ring_id));
2161
2162 return;
2163}
2164
/*
 * Enter the GATHER state of the membership algorithm.
 *
 * Visible steps: start discarding ORF tokens, make sure my_id is part of
 * my_proc_list, send a join message, re-arm the join and consensus timers,
 * cancel the token retransmit, token loss and merge-detect timers, reset the
 * consensus state with only this processor marked, log the reason
 * (gather_from), set memb_state to MEMB_STATE_GATHER and update statistics.
 * Failures to add a timer are only logged at error level.
 *
 * NOTE(review): source lines 2167, 2184, 2186-2188, 2199-2200, 2202-2204,
 * 2223 and 2230 are absent from this listing. That hides the declaration of
 * the gather_from parameter, the starts of both timer calls, the start of
 * the log call and the condition guarding the continuous_gather increment -
 * confirm against the upstream file.
 */
2165static void memb_state_gather_enter (
2166 struct totemsrp_instance *instance,
2168{
2169 int32_t res;
2170
2171 instance->orf_token_discard = 1;
2172
2173 instance->originated_orf_token = 0;
2174
2175 memb_set_merge (
2176 &instance->my_id, 1,
2177 instance->my_proc_list, &instance->my_proc_list_entries);
2178
2179 memb_join_message_send (instance);
2180
2181 /*
2182 * Restart the join timeout
2183 */
2185
2189 (void *)instance,
2190 memb_timer_function_state_gather,
2191 &instance->memb_timer_state_gather_join_timeout);
2192 if (res != 0) {
2193 log_printf(instance->totemsrp_log_level_error, "memb_state_gather_enter - qb_loop_timer_add error(1) : %d", res);
2194 }
2195
2196 /*
2197 * Restart the consensus timeout
2198 */
2201
2205 (void *)instance,
2206 memb_timer_function_gather_consensus_timeout,
2207 &instance->memb_timer_state_gather_consensus_timeout);
2208 if (res != 0) {
2209 log_printf(instance->totemsrp_log_level_error, "memb_state_gather_enter - qb_loop_timer_add error(2) : %d", res);
2210 }
2211
2212 /*
2213 * Cancel the token loss and token retransmission timeouts
2214 */
2215 cancel_token_retransmit_timeout (instance); // REVIEWED
2216 cancel_token_timeout (instance); // REVIEWED
2217 cancel_merge_detect_timeout (instance);
2218
2219 memb_consensus_reset (instance);
2220
2221 memb_consensus_set (instance, &instance->my_id);
2222
2224 "entering GATHER state from %d(%s).",
2225 gather_from, gsfrom_to_msg(gather_from));
2226
2227 instance->memb_state = MEMB_STATE_GATHER;
2228 instance->stats.gather_entered++;
2229
2231 /*
2232 * State 3 means gather, so we are continuously gathering.
2233 */
2234 instance->stats.continuous_gather++;
2235 }
2236
2237 return;
2238}
2239
2240static void timer_function_token_retransmit_timeout (void *data);
2241
/*
 * Completion callback for setting the token target. context is the totemsrp
 * instance; once invoked, the commit token is sent.
 */
static void target_set_completed (
	void *context)
{
	memb_state_commit_token_send ((struct totemsrp_instance *)context);
}
2250
/*
 * Enter the COMMIT state of the membership algorithm.
 *
 * Visible steps: snapshot the old ring state, update the commit token and
 * set its target, adopt the commit token's ring id (memb_ring_id_set) and
 * persist it through the memb_ring_id_store callback, remember the ring
 * sequence in token_ring_id_seq, set memb_state to MEMB_STATE_COMMIT, re-arm
 * the token retransmit and token loss timers, update statistics and zero the
 * flow control counters my_trc, my_pbl and my_cbl.
 *
 * NOTE(review): source lines 2260, 2262, 2264, 2266 and 2274 are absent from
 * this listing; presumably four timer deletions and the start of the log
 * call - confirm against the upstream file.
 */
2251static void memb_state_commit_enter (
2252 struct totemsrp_instance *instance)
2253{
2254 old_ring_state_save (instance);
2255
2256 memb_state_commit_token_update (instance);
2257
2258 memb_state_commit_token_target_set (instance);
2259
2261
2263
2265
2267
2268 memb_ring_id_set (instance, &instance->commit_token->ring_id);
2269
2270 instance->memb_ring_id_store (&instance->my_ring_id, instance->my_id.nodeid);
2271
2272 instance->token_ring_id_seq = instance->my_ring_id.seq;
2273
2275 "entering COMMIT state.");
2276
2277 instance->memb_state = MEMB_STATE_COMMIT;
2278 reset_token_retransmit_timeout (instance); // REVIEWED
2279 reset_token_timeout (instance); // REVIEWED
2280
2281 instance->stats.commit_entered++;
2282 instance->stats.continuous_gather = 0;
2283
2284 /*
2285 * reset all flow control variables since we are starting a new ring
2286 */
2287 instance->my_trc = 0;
2288 instance->my_pbl = 0;
2289 instance->my_cbl = 0;
2290 /*
2291 * commit token sent after callback that token target has been set
2292 */
2293}
2294
/*
 * Enter the RECOVERY state of the membership algorithm.
 *
 * The address list sits at commit_token->end_of_commit_token and the
 * per-member entries (memb_list) follow its addr_entries addresses.
 * Visible steps: reinitialise the recovery sort queue and the retransmit
 * message queue, forward the commit token, build the regular and the
 * transitional configuration, and decide whether old-ring messages must be
 * re-originated: if no member of the transitional configuration has
 * received_flg == 0, origination is skipped. Otherwise the lowest aru and
 * highest high_delivered among old-ring members are computed and messages
 * from the regular sort queue are wrapped in a fresh mcast header and queued
 * on retrans_message_queue. Finally the sequence bookkeeping is reset to
 * SEQNO_START_MSG, the token timers are re-armed and memb_state becomes
 * MEMB_STATE_RECOVERY.
 *
 * NOTE(review): many source lines are absent from this listing (2297,
 * 2305-2306, 2311, 2321, 2330, 2338-2339, 2344, 2353, 2357, 2359, 2363,
 * 2382, 2415, 2422, 2424, 2426, 2429-2430, 2439-2440, 2444, 2446-2448, 2450,
 * 2452-2454, 2458-2459, 2462, 2466-2467, 2470 and 2474). That hides the
 * commit_token parameter, the memb_list declaration, the initialisation of
 * low_ring_aru, the clearing of local_received_flg, the computation of
 * range, the increment of messages_originated, the buffer-allocation check
 * and the no_originate / originated labels - confirm against the upstream
 * file before changing anything here.
 */
2295static void memb_state_recovery_enter (
2296 struct totemsrp_instance *instance,
2298{
2299 int i;
2300 int local_received_flg = 1;
2301 unsigned int low_ring_aru;
2302 unsigned int range = 0;
2303 unsigned int messages_originated = 0;
2304 const struct srp_addr *addr;
2307
2308 addr = (const struct srp_addr *)commit_token->end_of_commit_token;
2309 memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries);
2310
2312 "entering RECOVERY state.");
2313
2314 instance->orf_token_discard = 0;
2315
2316 instance->my_high_ring_delivered = 0;
2317
2318 sq_reinit (&instance->recovery_sort_queue, SEQNO_START_MSG);
2319 cs_queue_reinit (&instance->retrans_message_queue);
2320
2322
2323 memb_state_commit_token_send_recovery (instance, commit_token);
2324
2325 instance->my_token_seq = SEQNO_START_TOKEN - 1;
2326
2327 /*
2328 * Build regular configuration
2329 */
2331 instance->totemnet_context,
2332 commit_token->addr_entries);
2333
2334 /*
2335 * Build transitional configuration
2336 */
2337 for (i = 0; i < instance->my_new_memb_entries; i++) {
2340 sizeof (struct memb_ring_id));
2341 }
2342 memb_set_and_with_ring_id (
2343 instance->my_new_memb_list,
2345 instance->my_new_memb_entries,
2346 instance->my_memb_list,
2347 instance->my_memb_entries,
2348 &instance->my_old_ring_id,
2349 instance->my_trans_memb_list,
2350 &instance->my_trans_memb_entries);
2351
2352 for (i = 0; i < instance->my_trans_memb_entries; i++) {
2354 "TRANS [%d] member " CS_PRI_NODE_ID ":", i, instance->my_trans_memb_list[i].nodeid);
2355 }
2356 for (i = 0; i < instance->my_new_memb_entries; i++) {
2358 "position [%d] member " CS_PRI_NODE_ID ":", i, addr[i].nodeid);
2360 "previous ringid (" CS_PRI_RING_ID ")",
2361 memb_list[i].ring_id.rep, (uint64_t)memb_list[i].ring_id.seq);
2362
2364 "aru %x high delivered %x received flag %d",
2365 memb_list[i].aru,
2366 memb_list[i].high_delivered,
2367 memb_list[i].received_flg);
2368
2369 // assert (totemip_print (&memb_list[i].ring_id.rep) != 0);
2370 }
2371 /*
2372 * Determine if any received flag is false
2373 */
2374 for (i = 0; i < commit_token->addr_entries; i++) {
2375 if (memb_set_subset (&instance->my_new_memb_list[i], 1,
2376 instance->my_trans_memb_list, instance->my_trans_memb_entries) &&
2377
2378 memb_list[i].received_flg == 0) {
2379 instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
2380 memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
2381 sizeof (struct srp_addr) * instance->my_trans_memb_entries);
2383 break;
2384 }
2385 }
2386 if (local_received_flg == 1) {
2387 goto no_originate;
2388 } /* Else originate messages if we should */
2389
2390 /*
2391 * Calculate my_low_ring_aru, instance->my_high_ring_delivered for the transitional membership
2392 */
2393 for (i = 0; i < commit_token->addr_entries; i++) {
2394 if (memb_set_subset (&instance->my_new_memb_list[i], 1,
2395 instance->my_deliver_memb_list,
2396 instance->my_deliver_memb_entries) &&
2397
2398 memcmp (&instance->my_old_ring_id,
2399 &memb_list[i].ring_id,
2400 sizeof (struct memb_ring_id)) == 0) {
2401
2402 if (sq_lt_compare (memb_list[i].aru, low_ring_aru)) {
2403
2404 low_ring_aru = memb_list[i].aru;
2405 }
2406 if (sq_lt_compare (instance->my_high_ring_delivered, memb_list[i].high_delivered)) {
2407 instance->my_high_ring_delivered = memb_list[i].high_delivered;
2408 }
2409 }
2410 }
2411
2412 /*
2413 * Copy all old ring messages to instance->retrans_message_queue
2414 */
2416 if (range == 0) {
2417 /*
2418 * No messages to copy
2419 */
2420 goto no_originate;
2421 }
2423
2425 "copying all old ring messages from %x-%x.",
2427
2428 for (i = 1; i <= range; i++) {
2431 void *ptr;
2432 int res;
2433
2434 res = sq_item_get (&instance->regular_sort_queue,
2435 low_ring_aru + i, &ptr);
2436 if (res != 0) {
2437 continue;
2438 }
2441 memset (&message_item, 0, sizeof (struct message_item));
2442 // TODO LEAK
2443 message_item.mcast = totemsrp_buffer_alloc (instance);
2445 memset(message_item.mcast, 0, sizeof (struct mcast));
2449 message_item.mcast->system_from = instance->my_id;
2451
2455 sizeof (struct memb_ring_id));
	/* The old message is stored after a fresh mcast header. */
2456 message_item.msg_len = sort_queue_item->msg_len + sizeof (struct mcast);
2457 memcpy (((char *)message_item.mcast) + sizeof (struct mcast),
2460 cs_queue_item_add (&instance->retrans_message_queue, &message_item);
2461 }
2463 "Originated %d messages in RECOVERY.", messages_originated);
2464 goto originated;
2465
2468 "Did not need to originate any messages in recovery.");
2469
2471 instance->my_aru = SEQNO_START_MSG;
2472 instance->my_aru_count = 0;
2473 instance->my_seq_unchanged = 0;
2475 instance->my_install_seq = SEQNO_START_MSG;
2476 instance->last_released = SEQNO_START_MSG;
2477
2478 reset_token_timeout (instance); // REVIEWED
2479 reset_token_retransmit_timeout (instance); // REVIEWED
2480
2481 instance->memb_state = MEMB_STATE_RECOVERY;
2482 instance->stats.recovery_entered++;
2483 instance->stats.continuous_gather = 0;
2484
2485 return;
2486}
2487
2489{
	/*
	 * NOTE(review): the line carrying this function's name and parameter
	 * list (source line 2488) is absent from this listing. The visible body
	 * treats srp_context as the totemsrp instance and calls
	 * token_hold_cancel_send on it.
	 */
2490 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2491
2492 token_hold_cancel_send (instance);
2493
2494 return;
2495}
2496
2498 void *srp_context,
2499 struct iovec *iovec,
2500 unsigned int iov_len,
2501 int guarantee)
2502{
	/*
	 * Queue a message, given as iov_len iovec segments, for multicast.
	 *
	 * Returns 0 once the message is on the pending queue and -1 when the
	 * queue is full; the return (-1) at the end of the body is the exit
	 * presumably reached through goto error_mcast when no buffer could be
	 * allocated. The queue used depends on waiting_trans_ack. The
	 * guarantee parameter is not referenced by the visible statements.
	 *
	 * NOTE(review): source lines 2497, 2505, 2511, 2535-2538, 2540-2541,
	 * 2543, 2549, 2553 and 2561 are absent from this listing. That hides
	 * the function name, the message_item declaration, the queue chosen
	 * while waiting for the trans ack, most mcast header fields, the copy
	 * of each iovec segment, the msg_len assignment and the error_mcast
	 * label - confirm against the upstream file.
	 */
2503 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2504 int i;
2506 char *addr;
2507 unsigned int addr_idx;
2508 struct cs_queue *queue_use;
2509
2510 if (instance->waiting_trans_ack) {
2512 } else {
2513 queue_use = &instance->new_message_queue;
2514 }
2515
2516 if (cs_queue_is_full (queue_use)) {
2517 log_printf (instance->totemsrp_log_level_debug, "queue full");
2518 return (-1);
2519 }
2520
2521 memset (&message_item, 0, sizeof (struct message_item));
2522
2523 /*
2524 * Allocate pending item
2525 */
2526 message_item.mcast = totemsrp_buffer_alloc (instance);
2527 if (message_item.mcast == 0) {
2528 goto error_mcast;
2529 }
2530
2531 /*
2532 * Set mcast header
2533 */
2534 memset(message_item.mcast, 0, sizeof (struct mcast));
2539
2542
2544 message_item.mcast->system_from = instance->my_id;
2545
	/* Payload starts right after the mcast header; addr_idx tracks the offset. */
2546 addr = (char *)message_item.mcast;
2547 addr_idx = sizeof (struct mcast);
2548 for (i = 0; i < iov_len; i++) {
2550 addr_idx += iovec[i].iov_len;
2551 }
2552
2554
2555 log_printf (instance->totemsrp_log_level_trace, "mcasted message added to pending queue");
2556 instance->stats.mcast_tx++;
2557 cs_queue_item_add (queue_use, &message_item);
2558
2559 return (0);
2560
2562 return (-1);
2563}
2564
2565/*
2566 * Determine if there is room to queue a new message
2567 */
2569{
	/*
	 * Returns the value cs_queue_avail reports for the queue currently used
	 * for new messages, which depends on waiting_trans_ack.
	 *
	 * NOTE(review): source lines 2568 (function name and parameters) and
	 * 2575 (queue chosen while waiting_trans_ack is set) are absent from
	 * this listing - confirm against the upstream file.
	 */
2570 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2571 int avail;
2572 struct cs_queue *queue_use;
2573
2574 if (instance->waiting_trans_ack) {
2576 } else {
2577 queue_use = &instance->new_message_queue;
2578 }
2579 cs_queue_avail (queue_use, &avail);
2580
2581 return (avail);
2582}
2583
2584/*
2585 * ORF Token Management
2586 */
2587/*
2588 * Recast message to mcast group if it is available
2589 */
/*
 * Multicast the message stored under sequence number seq once more.
 *
 * The recovery sort queue is consulted in the RECOVERY state and the regular
 * one otherwise. Returns -1 when seq is outside the queue's range (logged at
 * debug level) or when no item is stored at seq, and 0 after the item has
 * been handed to the network layer.
 *
 * NOTE(review): source lines 2594, 2620, 2622 and 2624-2625 are absent from
 * this listing, so the sort queue item declaration and most of the transmit
 * call are not visible - confirm against the upstream file.
 */
2590static int orf_token_remcast (
2591 struct totemsrp_instance *instance,
2592 int seq)
2593{
2595 int res;
2596 void *ptr;
2597
2598 struct sq *sort_queue;
2599
2600 if (instance->memb_state == MEMB_STATE_RECOVERY) {
2601 sort_queue = &instance->recovery_sort_queue;
2602 } else {
2603 sort_queue = &instance->regular_sort_queue;
2604 }
2605
2606 res = sq_in_range (sort_queue, seq);
2607 if (res == 0) {
2608 log_printf (instance->totemsrp_log_level_debug, "sq not in range");
2609 return (-1);
2610 }
2611
2612 /*
2613 * Get RTR item at seq, if not available, return
2614 */
2615 res = sq_item_get (sort_queue, seq, &ptr);
2616 if (res != 0) {
2617 return -1;
2618 }
2619
2621
2623 instance->totemnet_context,
2626
2627 return (0);
2628}
2629
2630
2631/*
2632 * Free all freeable messages from ring
2633 */
/*
 * Release messages from the regular sort queue that no longer need to be
 * kept for retransmission.
 *
 * release_to is lowered to my_last_aru and to my_high_delivered whenever one
 * of them is behind it (sq_lt_compare). Nothing is done when release_to lies
 * before last_released. Otherwise every sequence number in
 * (last_released, release_to] has its buffer released, if an item is stored
 * there, and its queue slot released; last_released is then advanced by the
 * same range.
 *
 * NOTE(review): source lines 2638, 2645, 2661, 2672 and 2683 are absent from
 * this listing. That hides the regular_message declaration and assignment,
 * the initial value of release_to (presumably token_aru), one statement
 * after range is computed and the start of the log call - confirm upstream.
 */
2634static void messages_free (
2635 struct totemsrp_instance *instance,
2636 unsigned int token_aru)
2637{
2639 unsigned int i;
2640 int res;
2641 int log_release = 0;
2642 unsigned int release_to;
2643 unsigned int range = 0;
2644
2646 if (sq_lt_compare (instance->my_last_aru, release_to)) {
2647 release_to = instance->my_last_aru;
2648 }
2649 if (sq_lt_compare (instance->my_high_delivered, release_to)) {
2650 release_to = instance->my_high_delivered;
2651 }
2652
2653 /*
2654 * Ensure we dont try release before an already released point
2655 */
2656 if (sq_lt_compare (release_to, instance->last_released)) {
2657 return;
2658 }
2659
2660 range = release_to - instance->last_released;
2662
2663 /*
2664 * Release retransmit list items if group aru indicates they are transmitted
2665 */
2666 for (i = 1; i <= range; i++) {
2667 void *ptr;
2668
2669 res = sq_item_get (&instance->regular_sort_queue,
2670 instance->last_released + i, &ptr);
2671 if (res == 0) {
2673 totemsrp_buffer_release (instance, regular_message->mcast);
2674 }
2675 sq_items_release (&instance->regular_sort_queue,
2676 instance->last_released + i);
2677
2678 log_release = 1;
2679 }
2680 instance->last_released += range;
2681
2682 if (log_release) {
2684 "releasing messages up to and including %x", release_to);
2685 }
2686}
2687
2688static void update_aru (
2689 struct totemsrp_instance *instance)
2690{
2691 unsigned int i;
2692 int res;
2693 struct sq *sort_queue;
2694 unsigned int range;
2695 unsigned int my_aru_saved = 0;
2696
2697 if (instance->memb_state == MEMB_STATE_RECOVERY) {
2698 sort_queue = &instance->recovery_sort_queue;
2699 } else {
2700 sort_queue = &instance->regular_sort_queue;
2701 }
2702
2703 range = instance->my_high_seq_received - instance->my_aru;
2704
2705 my_aru_saved = instance->my_aru;
2706 for (i = 1; i <= range; i++) {
2707
2708 void *ptr;
2709
2710 res = sq_item_get (sort_queue, my_aru_saved + i, &ptr);
2711 /*
2712 * If hole, stop updating aru
2713 */
2714 if (res != 0) {
2715 break;
2716 }
2717 }
2718 instance->my_aru += i - 1;
2719}
2720
2721/*
2722 * Multicasts pending messages onto the ring (requires orf_token possession)
2723 */
/*
 * Multicast pending messages while this processor holds the token.
 *
 * In the RECOVERY state messages go into the recovery sort queue and the
 * token retransmit timeout is re-armed; otherwise they are taken from the
 * queue selected by waiting_trans_ack and go into the regular sort queue.
 * Each message taken from the pending queue gets the next token sequence
 * number (++token->seq) and the next global_seqno, is stamped with
 * my_ring_id, added to the sort queue, transmitted and removed from the
 * pending queue; my_high_seq_received follows token->seq. update_aru runs
 * afterwards and fcc_mcast_current is returned.
 *
 * NOTE(review): source lines 2727, 2732, 2737, 2742, 2750, 2763-2764, 2766,
 * 2775 and 2777-2778 are absent from this listing. That hides the third
 * parameter, the sort_queue_item declaration, the mcast_queue assignments
 * for the RECOVERY and waiting_trans_ack cases, the loop header that
 * initialises and bounds fcc_mcast_current, the filling of sort_queue_item,
 * the assignment of mcast and most of the transmit call - confirm upstream.
 */
2724static int orf_token_mcast (
2725 struct totemsrp_instance *instance,
2726 struct orf_token *token,
2728{
2729 struct message_item *message_item = 0;
2730 struct cs_queue *mcast_queue;
2731 struct sq *sort_queue;
2733 struct mcast *mcast;
2734 unsigned int fcc_mcast_current;
2735
2736 if (instance->memb_state == MEMB_STATE_RECOVERY) {
2738 sort_queue = &instance->recovery_sort_queue;
2739 reset_token_retransmit_timeout (instance); // REVIEWED
2740 } else {
2741 if (instance->waiting_trans_ack) {
2743 } else {
2744 mcast_queue = &instance->new_message_queue;
2745 }
2746
2747 sort_queue = &instance->regular_sort_queue;
2748 }
2749
2751 if (cs_queue_is_empty (mcast_queue)) {
2752 break;
2753 }
2754 message_item = (struct message_item *)cs_queue_item_get (mcast_queue);
2755
2756 message_item->mcast->seq = ++token->seq;
2757 message_item->mcast->this_seqno = instance->global_seqno++;
2758
2759 /*
2760 * Build IO vector
2761 */
2762 memset (&sort_queue_item, 0, sizeof (struct sort_queue_item));
2765
2767
2768 memcpy (&mcast->ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
2769
2770 /*
2771 * Add message to retransmit queue
2772 */
2773 sq_item_add (sort_queue, &sort_queue_item, message_item->mcast->seq);
2774
2776 instance->totemnet_context,
2779
2780 /*
2781 * Delete item from pending queue
2782 */
2783 cs_queue_item_remove (mcast_queue);
2784
2785 /*
2786 * If messages mcasted, deliver any new messages to totempg
2787 */
2788 instance->my_high_seq_received = token->seq;
2789 }
2790
2791 update_aru (instance);
2792
2793 /*
2794 * Return 1 if more messages are available for single node clusters
2795 */
2796 return (fcc_mcast_current);
2797}
2798
2799/*
2800 * Remulticasts messages in orf_token's retransmit list (requires orf_token)
2801 * Modifies orf_token's rtr list to include retransmits required by this process
2802 */
/*
 * Services and extends the token's retransmit (rtr) list.
 *
 * Pass 1: every rtr entry whose ring_id equals my_ring_id is handed to
 * orf_token_remcast(). When that returns 0 the entry is removed from the
 * list by shifting the following entries down, and stats.mcast_retx and
 * fcc_remcast_current are incremented. Entries from another ring, or ones
 * that could not be remulticast, stay in the list.
 *
 * Pass 2: for sequence numbers my_aru + 1 .. orf_token->seq that are inside
 * the sort queue range but not in use locally, the miss counter is bumped
 * and the sequence number is added to the rtr list (tagged with my_ring_id)
 * unless it is already listed.
 *
 * Returns instance->fcc_remcast_current.
 * NOTE(review): both loop conditions, including how *fcc_allowed limits the
 * work, are on lines this listing omits - confirm against the full source.
 */
2803static int orf_token_rtr (
2804 struct totemsrp_instance *instance,
2805 struct orf_token *orf_token,
2806 unsigned int *fcc_allowed)
2807{
2808 unsigned int res;
2809 unsigned int i, j;
2810 unsigned int found;
2811 struct sq *sort_queue;
2812 struct rtr_item *rtr_list;
2813 unsigned int range = 0;
2814 char retransmit_msg[1024];
2815 char value[64];
2816
2817 if (instance->memb_state == MEMB_STATE_RECOVERY) {
2818 sort_queue = &instance->recovery_sort_queue;
2819 } else {
2820 sort_queue = &instance->regular_sort_queue;
2821 }
2822
2824
 /* Build a log line listing the requested sequence numbers in hex */
2825 strcpy (retransmit_msg, "Retransmit List: ");
2828 "Retransmit List %d", orf_token->rtr_list_entries);
2829 for (i = 0; i < orf_token->rtr_list_entries; i++) {
2830 sprintf (value, "%x ", rtr_list[i].seq);
2832 }
2833 strcat (retransmit_msg, "");
2835 "%s", retransmit_msg);
2836 }
2837
2838 /*
2839 * Retransmit messages on orf_token's RTR list from RTR queue
2840 */
2841 for (instance->fcc_remcast_current = 0, i = 0;
2843
2844 /*
2845 * If this retransmit request isn't from this configuration,
2846 * try next rtr entry
2847 */
2848 if (memcmp (&rtr_list[i].ring_id, &instance->my_ring_id,
2849 sizeof (struct memb_ring_id)) != 0) {
2850
2851 i += 1;
2852 continue;
2853 }
2854
2855 res = orf_token_remcast (instance, rtr_list[i].seq);
2856 if (res == 0) {
2857 /*
2858 * Multicasted message, so no need to copy to new retransmit list
2859 */
 /* i is not advanced here: the next entry is shifted into slot i */
2862 memmove (&rtr_list[i], &rtr_list[i + 1],
2863 sizeof (struct rtr_item) * (orf_token->rtr_list_entries - i));
2864
2865 instance->stats.mcast_retx++;
2866 instance->fcc_remcast_current++;
2867 } else {
2868 i += 1;
2869 }
2870 }
2872
2873 /*
2874 * Add messages to retransmit to RTR list
2875 * but only retry if there is room in the retransmit list
2876 */
2877
2878 range = orf_token->seq - instance->my_aru;
2880
2882 (i <= range); i++) {
2883
2884 /*
2885 * Ensure message is within the sort queue range
2886 */
2887 res = sq_in_range (sort_queue, instance->my_aru + i);
2888 if (res == 0) {
2889 break;
2890 }
2891
2892 /*
2893 * Find if a message is missing from this processor
2894 */
2895 res = sq_item_inuse (sort_queue, instance->my_aru + i);
2896 if (res == 0) {
2897 /*
2898 * Determine how many times we have missed receiving
2899 * this sequence number. sq_item_miss_count increments
2900 * a counter for the sequence number. The miss count
2901 * will be returned and compared. This allows time for
2902 * delayed multicast messages to be received before
2903 * declaring the message is missing and requesting a
2904 * retransmit.
2905 */
2906 res = sq_item_miss_count (sort_queue, instance->my_aru + i);
2908 continue;
2909 }
2910
2911 /*
2912 * Determine if missing message is already in retransmit list
2913 */
2914 found = 0;
2915 for (j = 0; j < orf_token->rtr_list_entries; j++) {
2916 if (instance->my_aru + i == rtr_list[j].seq) {
2917 found = 1;
2918 }
2919 }
2920 if (found == 0) {
2921 /*
2922 * Missing message not found in current retransmit list so add it
2923 */
2925 &instance->my_ring_id, sizeof (struct memb_ring_id));
2928 }
2929 }
2930 }
2931 return (instance->fcc_remcast_current);
2932}
2933
/*
 * Resends the stored token copy: counts the transmission in
 * stats.orf_token_tx and passes orf_token_retransmit together with
 * orf_token_retransmit_size to the send call.
 * NOTE(review): the name of the send routine is on a line this listing
 * omits.
 */
2934static void token_retransmit (struct totemsrp_instance *instance)
2935{
2936 instance->stats.orf_token_tx++;
2938 instance->orf_token_retransmit,
2939 instance->orf_token_retransmit_size);
2940}
2941
2942/*
2943 * If no mcast or token has been received within the token
2944 * retransmit period, retransmit the regular token to the
2945 * next processor
2946 */
/*
 * Timer callback; data is the struct totemsrp_instance pointer.
 *
 * MEMB_STATE_GATHER does nothing. The group of case labels that starts with
 * MEMB_STATE_COMMIT resends the stored token via token_retransmit() and
 * re-arms the retransmit timer.
 * NOTE(review): further case labels sit on lines this listing omits.
 */
2947static void timer_function_token_retransmit_timeout (void *data)
2948{
2949 struct totemsrp_instance *instance = data;
2950
2951 switch (instance->memb_state) {
2952 case MEMB_STATE_GATHER:
2953 break;
2954 case MEMB_STATE_COMMIT:
2957 token_retransmit (instance);
2958 reset_token_retransmit_timeout (instance); // REVIEWED
2959 break;
2960 }
2961}
2962
/*
 * Timer callback; data is the struct totemsrp_instance pointer.
 *
 * MEMB_STATE_GATHER and MEMB_STATE_COMMIT do nothing. The remaining case
 * group resends the stored token via token_retransmit(); unlike the plain
 * token retransmit timer callback, no timer is re-armed here.
 * NOTE(review): the case labels guarding the retransmit are on lines this
 * listing omits.
 */
2963static void timer_function_token_hold_retransmit_timeout (void *data)
2964{
2965 struct totemsrp_instance *instance = data;
2966
2967 switch (instance->memb_state) {
2968 case MEMB_STATE_GATHER:
2969 break;
2970 case MEMB_STATE_COMMIT:
2971 break;
2974 token_retransmit (instance);
2975 break;
2976 }
2977}
2978
/*
 * Timer callback; data is the struct totemsrp_instance pointer.
 *
 * In the first case group a merge-detect message is transmitted, but only
 * when this node is the ring representative (my_ring_id.rep equals
 * my_id.nodeid). The group containing MEMB_STATE_GATHER and
 * MEMB_STATE_COMMIT does nothing.
 * NOTE(review): the statement before the switch and some case labels are
 * on lines this listing omits.
 */
2979static void timer_function_merge_detect_timeout(void *data)
2980{
2981 struct totemsrp_instance *instance = data;
2982
2984
2985 switch (instance->memb_state) {
2987 if (instance->my_ring_id.rep == instance->my_id.nodeid) {
2988 memb_merge_detect_transmit (instance);
2989 }
2990 break;
2991 case MEMB_STATE_GATHER:
2992 case MEMB_STATE_COMMIT:
2994 break;
2995 }
2996}
2997
2998/*
2999 * Send orf_token to next member (requires orf_token)
3000 */
/*
 * Stamps the token with this node's id and, when forward_token is
 * non-zero, counts the transmission in stats.orf_token_tx and sends it.
 *
 * orf_token_size covers the fixed struct plus rtr_list_entries rtr_item
 * records. With forward_token == 0 the function returns 0 before the
 * send; otherwise it returns res.
 * NOTE(review): the statements between the nodeid assignment and the
 * forward_token test, and the send call itself, are on lines this listing
 * omits.
 */
3001static int token_send (
3002 struct totemsrp_instance *instance,
3003 struct orf_token *orf_token,
3004 int forward_token)
3005{
3006 int res = 0;
3007 unsigned int orf_token_size;
3008
3009 orf_token_size = sizeof (struct orf_token) +
3010 (orf_token->rtr_list_entries * sizeof (struct rtr_item));
3011
3012 orf_token->header.nodeid = instance->my_id.nodeid;
3016
3017 if (forward_token == 0) {
3018 return (0);
3019 }
3020
3021 instance->stats.orf_token_tx++;
3023 orf_token,
3025
3026 return (res);
3027}
3028
/*
 * Cancels a token hold. Does nothing (returns 0) when my_token_held is
 * already 0; otherwise clears my_token_held, builds a token_hold_cancel
 * message, counts it in stats.token_hold_cancel_tx and sends it.
 * Always returns 0.
 * NOTE(review): the message construction and the send call are mostly on
 * lines this listing omits.
 */
3029static int token_hold_cancel_send (struct totemsrp_instance *instance)
3030{
3032
3033 /*
3034 * Only cancel if the token is currently held
3035 */
3036 if (instance->my_token_held == 0) {
3037 return (0);
3038 }
3039 instance->my_token_held = 0;
3040
3041 /*
3042 * Build message
3043 */
3050 sizeof (struct memb_ring_id));
3052
3053 instance->stats.token_hold_cancel_tx++;
3054
3056 sizeof (struct token_hold_cancel));
3057
3058 return (0);
3059}
3060
/*
 * Builds a fresh token on the stack and forwards it with
 * token_send (instance, &orf_token, 1).
 *
 * Visible initialisation: header.nodeid and aru_addr are this node's id,
 * ring_id is copied from my_ring_id, fcc and backlog start at 0, and
 * my_set_retrans_flg ends up 0 when retrans_message_queue is empty and 1
 * otherwise.
 *
 * Returns the result of token_send().
 * NOTE(review): the remaining header, sequence and retrans_flg assignments
 * are on lines this listing omits.
 */
3061static int orf_token_send_initial (struct totemsrp_instance *instance)
3062{
3063 struct orf_token orf_token;
3064 int res;
3065
3070 orf_token.header.nodeid = instance->my_id.nodeid;
3075 instance->my_set_retrans_flg = 1;
3076
3077 if (cs_queue_is_empty (&instance->retrans_message_queue) == 1) {
3079 instance->my_set_retrans_flg = 0;
3080 } else {
3082 instance->my_set_retrans_flg = 1;
3083 }
3084
3085 orf_token.aru = 0;
3087 orf_token.aru_addr = instance->my_id.nodeid;
3088
3089 memcpy (&orf_token.ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
3090 orf_token.fcc = 0;
3091 orf_token.backlog = 0;
3092
3094
3095 res = token_send (instance, &orf_token, 1);
3096
3097 return (res);
3098}
3099
/*
 * Fills in this node's slot (index memb_index) of the commit token.
 *
 * - Copies the token's address list into my_new_memb_list and records its
 *   length in my_new_memb_entries.
 * - Stores my_old_ring_id, old_ring_state_aru, my_received_flg and
 *   my_high_delivered in memb_list[memb_index].
 * - Among entries 0..memb_index that share this node's old ring id, finds
 *   the highest aru and clears received_flg on every entry below it
 *   (clearing my_received_flg as well when that entry is this node's).
 * - Stamps the token header with this node's id and advances memb_index.
 *
 * NOTE(review): the declaration and assignment of memb_list are on lines
 * this listing omits.
 */
3100static void memb_state_commit_token_update (
3101 struct totemsrp_instance *instance)
3102{
3103 struct srp_addr *addr;
3105 unsigned int high_aru;
3106 unsigned int i;
3107
3108 addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3110
3111 memcpy (instance->my_new_memb_list, addr,
3112 sizeof (struct srp_addr) * instance->commit_token->addr_entries);
3113
3114 instance->my_new_memb_entries = instance->commit_token->addr_entries;
3115
3116 memcpy (&memb_list[instance->commit_token->memb_index].ring_id,
3117 &instance->my_old_ring_id, sizeof (struct memb_ring_id));
3118
3119 memb_list[instance->commit_token->memb_index].aru = instance->old_ring_state_aru;
3120 /*
3121 * TODO high delivered is really instance->my_aru, but with safe this
3122 * could change?
3123 */
3124 instance->my_received_flg =
3125 (instance->my_aru == instance->my_high_seq_received);
3126
3127 memb_list[instance->commit_token->memb_index].received_flg = instance->my_received_flg;
3128
3129 memb_list[instance->commit_token->memb_index].high_delivered = instance->my_high_delivered;
3130 /*
3131 * find high aru up to current memb_index for all matching ring ids
3132 * if any ring id matching memb_index has aru less than high aru set
3133 * received flag for that entry to false
3134 */
3135 high_aru = memb_list[instance->commit_token->memb_index].aru;
3136 for (i = 0; i <= instance->commit_token->memb_index; i++) {
3137 if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
3138 &memb_list[i].ring_id,
3139 sizeof (struct memb_ring_id)) == 0) {
3140
3141 if (sq_lt_compare (high_aru, memb_list[i].aru)) {
3142 high_aru = memb_list[i].aru;
3143 }
3144 }
3145 }
3146
 /* Second pass: flag every same-ring entry that is behind high_aru */
3147 for (i = 0; i <= instance->commit_token->memb_index; i++) {
3148 if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
3149 &memb_list[i].ring_id,
3150 sizeof (struct memb_ring_id)) == 0) {
3151
3152 if (sq_lt_compare (memb_list[i].aru, high_aru)) {
3153 memb_list[i].received_flg = 0;
3154 if (i == instance->commit_token->memb_index) {
3155 instance->my_received_flg = 0;
3156 }
3157 }
3158 }
3159 }
3160
3161 instance->commit_token->header.nodeid = instance->my_id.nodeid;
3162 instance->commit_token->memb_index += 1;
3163 assert (instance->commit_token->memb_index <= instance->commit_token->addr_entries);
3164 assert (instance->commit_token->header.nodeid);
3165}
3166
/*
 * Selects the node the commit token goes to next: the address entry at
 * memb_index modulo addr_entries, so the index wraps back to the first
 * entry after the last one. Only that entry's nodeid is passed on.
 * NOTE(review): the name of the called routine is on a line this listing
 * omits.
 */
3167static void memb_state_commit_token_target_set (
3168 struct totemsrp_instance *instance)
3169{
3170 struct srp_addr *addr;
3171
3172 addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3173
3174 /* Totemnet just looks at the node id */
3176 instance->totemnet_context,
3177 addr[instance->commit_token->memb_index %
3178 instance->commit_token->addr_entries].nodeid);
3179}
3180
/*
 * Sends the caller-supplied commit token.
 *
 * Increments token_seq, stamps the header with this node's id, copies the
 * token into orf_token_retransmit so it can be resent, counts the send in
 * stats.memb_commit_token_tx and re-arms the token retransmit timer.
 * Always returns 0.
 * NOTE(review): the rest of the size computation and the send call are on
 * lines this listing omits.
 */
3181static int memb_state_commit_token_send_recovery (
3182 struct totemsrp_instance *instance,
3183 struct memb_commit_token *commit_token)
3184{
3185 unsigned int commit_token_size;
3186
3187 commit_token->token_seq++;
3188 commit_token->header.nodeid = instance->my_id.nodeid;
3189 commit_token_size = sizeof (struct memb_commit_token) +
3190 ((sizeof (struct srp_addr) +
3192 /*
3193 * Make a copy for retransmission if necessary
3194 */
3195 memcpy (instance->orf_token_retransmit, commit_token, commit_token_size);
3197
3198 instance->stats.memb_commit_token_tx++;
3199
3201 commit_token,
3203
3204 /*
3205 * Request retransmission of the commit token in case it is lost
3206 */
3207 reset_token_retransmit_timeout (instance);
3208 return (0);
3209}
3210
/*
 * Sends the instance's own commit token (instance->commit_token).
 *
 * Increments token_seq, stamps the header with this node's id, counts the
 * send in stats.memb_commit_token_tx and re-arms the token retransmit
 * timer. Always returns 0.
 * NOTE(review): the rest of the size computation, the retransmit copy and
 * the send call are on lines this listing omits.
 */
3211static int memb_state_commit_token_send (
3212 struct totemsrp_instance *instance)
3213{
3214 unsigned int commit_token_size;
3215
3216 instance->commit_token->token_seq++;
3217 instance->commit_token->header.nodeid = instance->my_id.nodeid;
3218 commit_token_size = sizeof (struct memb_commit_token) +
3219 ((sizeof (struct srp_addr) +
3221 /*
3222 * Make a copy for retransmission if necessary
3223 */
3226
3227 instance->stats.memb_commit_token_tx++;
3228
3230 instance->commit_token,
3232
3233 /*
3234 * Request retransmission of the commit token in case it is lost
3235 */
3236 reset_token_retransmit_timeout (instance);
3237 return (0);
3238}
3239
3240
/*
 * Reports whether this node has the lowest node id among the processors
 * that are in my_proc_list but not in my_failed_list.
 *
 * Returns 1 when the lowest nodeid equals my_id.nodeid, 0 otherwise.
 * NOTE(review): the token_memb declaration, the statement before the scan
 * and the comparison inside the loop are on lines this listing omits.
 */
3241static int memb_lowest_in_config (struct totemsrp_instance *instance)
3242{
3244 int token_memb_entries = 0;
3245 int i;
3246 unsigned int lowest_nodeid;
3247
 /* token_memb = my_proc_list minus my_failed_list */
3248 memb_set_subtract (token_memb, &token_memb_entries,
3249 instance->my_proc_list, instance->my_proc_list_entries,
3250 instance->my_failed_list, instance->my_failed_list_entries);
3251
3252 /*
3253 * find representative by searching for smallest identifier
3254 */
3256
3257 lowest_nodeid = token_memb[0].nodeid;
3258 for (i = 1; i < token_memb_entries; i++) {
3260 lowest_nodeid = token_memb[i].nodeid;
3261 }
3262 }
3263 return (lowest_nodeid == instance->my_id.nodeid);
3264}
3265
3266static int srp_addr_compare (const void *a, const void *b)
3267{
3268 const struct srp_addr *srp_a = (const struct srp_addr *)a;
3269 const struct srp_addr *srp_b = (const struct srp_addr *)b;
3270
3271 if (srp_a->nodeid < srp_b->nodeid) {
3272 return -1;
3273 } else if (srp_a->nodeid > srp_b->nodeid) {
3274 return 1;
3275 } else {
3276 return 0;
3277 }
3278}
3279
/*
 * Builds a new commit token in instance->commit_token.
 *
 * The member set is my_proc_list minus my_failed_list, sorted by nodeid
 * with srp_addr_compare. The token is zeroed first, then its header is
 * stamped with this node's id. The new ring id uses this node as rep and
 * token_ring_id_seq + 4 as seq, and memb_index starts at 0.
 * NOTE(review): several header assignments, the addr_entries assignment
 * and the copy into the token's address area are on lines this listing
 * omits.
 */
3280static void memb_state_commit_token_create (
3281 struct totemsrp_instance *instance)
3282{
3284 struct srp_addr *addr;
3286 int token_memb_entries = 0;
3287
3289 "Creating commit token because I am the rep.");
3290
3291 memb_set_subtract (token_memb, &token_memb_entries,
3292 instance->my_proc_list, instance->my_proc_list_entries,
3293 instance->my_failed_list, instance->my_failed_list_entries);
3294
3295 memset (instance->commit_token, 0, sizeof (struct memb_commit_token));
3299 instance->commit_token->header.encapsulated = 0;
3300 instance->commit_token->header.nodeid = instance->my_id.nodeid;
3301 assert (instance->commit_token->header.nodeid);
3302
3303 instance->commit_token->ring_id.rep = instance->my_id.nodeid;
3304 instance->commit_token->ring_id.seq = instance->token_ring_id_seq + 4;
3305
3306 /*
3307 * This qsort is necessary to ensure the commit token traverses
3308 * the ring in the proper order
3309 */
3310 qsort (token_memb, token_memb_entries, sizeof (struct srp_addr),
3311 srp_addr_compare);
3312
3313 instance->commit_token->memb_index = 0;
3315
3316 addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3318
3320 token_memb_entries * sizeof (struct srp_addr));
3321 memset (memb_list, 0,
3323}
3324
/*
 * Builds and sends a join message in a 40000-byte stack buffer.
 *
 * Layout: struct memb_join, then my_proc_list, then my_failed_list (both as
 * struct srp_addr arrays). If that would not fit in the buffer the function
 * returns without sending. When send_join_timeout is non-zero the send is
 * preceded by a random sleep of less than send_join_timeout * 1000
 * microseconds. stats.memb_join_tx is incremented before the send.
 * NOTE(review): most header fields, the list-count fields and the name of
 * the send routine are on lines this listing omits.
 */
3325static void memb_join_message_send (struct totemsrp_instance *instance)
3326{
3327 char memb_join_data[40000];
3328 struct memb_join *memb_join = (struct memb_join *)memb_join_data;
3329 char *addr;
3330 unsigned int addr_idx;
3331 size_t msg_len;
3332
3337 memb_join->header.nodeid = instance->my_id.nodeid;
3339
3340 msg_len = sizeof(struct memb_join) +
3341 ((instance->my_proc_list_entries + instance->my_failed_list_entries) * sizeof(struct srp_addr));
3342
 /* Refuse to build a message that would overflow the stack buffer */
3343 if (msg_len > sizeof(memb_join_data)) {
3345 "memb_join_message too long. Ignoring message.");
3346
3347 return ;
3348 }
3349
3350 memb_join->ring_seq = instance->my_ring_id.seq;
3353 memb_join->system_from = instance->my_id;
3354
3355 /*
3356 * This mess adds the joined and failed processor lists into the join
3357 * message
3358 */
3359 addr = (char *)memb_join;
3360 addr_idx = sizeof (struct memb_join);
3361 memcpy (&addr[addr_idx],
3362 instance->my_proc_list,
3363 instance->my_proc_list_entries *
3364 sizeof (struct srp_addr));
3365 addr_idx +=
3366 instance->my_proc_list_entries *
3367 sizeof (struct srp_addr);
3368 memcpy (&addr[addr_idx],
3369 instance->my_failed_list,
3370 instance->my_failed_list_entries *
3371 sizeof (struct srp_addr));
3372 addr_idx +=
3373 instance->my_failed_list_entries *
3374 sizeof (struct srp_addr);
3375
3376 if (instance->totem_config->send_join_timeout) {
3377 usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3378 }
3379
3380 instance->stats.memb_join_tx++;
3381
 /* addr_idx now equals the total message length in bytes */
3383 instance->totemnet_context,
3384 memb_join,
3385 addr_idx);
3386}
3387
/*
 * Builds and sends a join message announcing that this node is leaving.
 *
 * This node is merged into my_failed_list, and active_memb is computed as
 * my_proc_list without this node. The message is struct memb_join followed
 * by two struct srp_addr arrays, the second being my_failed_list. If the
 * message would not fit in the 40000-byte stack buffer the function
 * returns without sending. When send_join_timeout is non-zero the send is
 * preceded by a random sleep of less than send_join_timeout * 1000
 * microseconds. stats.memb_join_tx is incremented before the send.
 * NOTE(review): the header fields, the source of the first copied list and
 * the name of the send routine are on lines this listing omits.
 */
3388static void memb_leave_message_send (struct totemsrp_instance *instance)
3389{
3390 char memb_join_data[40000];
3391 struct memb_join *memb_join = (struct memb_join *)memb_join_data;
3392 char *addr;
3393 unsigned int addr_idx;
3396 size_t msg_len;
3397
3399 "sending join/leave message");
3400
3401 /*
3402 * add us to the failed list, and remove us from
3403 * the members list
3404 */
3405 memb_set_merge(
3406 &instance->my_id, 1,
3407 instance->my_failed_list, &instance->my_failed_list_entries);
3408
3409 memb_set_subtract (active_memb, &active_memb_entries,
3410 instance->my_proc_list, instance->my_proc_list_entries,
3411 &instance->my_id, 1);
3412
3413 msg_len = sizeof(struct memb_join) +
3414 ((active_memb_entries + instance->my_failed_list_entries) * sizeof(struct srp_addr));
3415
 /* Refuse to build a message that would overflow the stack buffer */
3416 if (msg_len > sizeof(memb_join_data)) {
3418 "memb_leave message too long. Ignoring message.");
3419
3420 return ;
3421 }
3422
3428
3429 memb_join->ring_seq = instance->my_ring_id.seq;
3432 memb_join->system_from = instance->my_id;
3433
3434 // TODO: CC Maybe use the actual join send routine.
3435 /*
3436 * This mess adds the joined and failed processor lists into the join
3437 * message
3438 */
3439 addr = (char *)memb_join;
3440 addr_idx = sizeof (struct memb_join);
3441 memcpy (&addr[addr_idx],
3444 sizeof (struct srp_addr));
3445 addr_idx +=
3447 sizeof (struct srp_addr);
3448 memcpy (&addr[addr_idx],
3449 instance->my_failed_list,
3450 instance->my_failed_list_entries *
3451 sizeof (struct srp_addr));
3452 addr_idx +=
3453 instance->my_failed_list_entries *
3454 sizeof (struct srp_addr);
3455
3456
3457 if (instance->totem_config->send_join_timeout) {
3458 usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3459 }
3460 instance->stats.memb_join_tx++;
3461
3463 instance->totemnet_context,
3464 memb_join,
3465 addr_idx);
3466}
3467
/*
 * Sends a merge-detect message: a struct memb_ring_id sized copy is made
 * into the message, stats.memb_merge_detect_tx is incremented and a
 * struct memb_merge_detect sized buffer is handed to the send call.
 * NOTE(review): the message declaration, its header fields and the name of
 * the send routine are on lines this listing omits.
 */
3468static void memb_merge_detect_transmit (struct totemsrp_instance *instance)
3469{
3471
3479 sizeof (struct memb_ring_id));
3481
3482 instance->stats.memb_merge_detect_tx++;
3485 sizeof (struct memb_merge_detect));
3486}
3487
3488static void memb_ring_id_set (
3489 struct totemsrp_instance *instance,
3490 const struct memb_ring_id *ring_id)
3491{
3492
3493 memcpy (&instance->my_ring_id, ring_id, sizeof (struct memb_ring_id));
3494}
3495
3497 void *srp_context,
3498 void **handle_out,
3500 int delete,
3501 int (*callback_fn) (enum totem_callback_token_type type, const void *),
3502 const void *data)
3503{
 /*
  * Registers a token callback.
  *
  * Any current token hold is cancelled first. A token_callback_instance is
  * then allocated, filled from the arguments and linked onto either
  * token_callback_received_listhead or token_callback_sent_listhead,
  * depending on type. The new handle is stored in *handle_out.
  *
  * Returns 0 on success and -1 when the allocation fails; in the failure
  * case *handle_out is left untouched.
  * NOTE(review): the function name line, the declaration of the type
  * parameter and the case labels are on lines this listing omits.
  */
3504 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
3505 struct token_callback_instance *callback_handle;
3506
3507 token_hold_cancel_send (instance);
3508
3509 callback_handle = malloc (sizeof (struct token_callback_instance));
3510 if (callback_handle == 0) {
3511 return (-1);
3512 }
3513 *handle_out = (void *)callback_handle;
3514 qb_list_init (&callback_handle->list);
3515 callback_handle->callback_fn = callback_fn;
3516 callback_handle->data = (void *) data;
3517 callback_handle->callback_type = type;
3518 callback_handle->delete = delete;
3519 switch (type) {
3521 qb_list_add (&callback_handle->list, &instance->token_callback_received_listhead);
3522 break;
3524 qb_list_add (&callback_handle->list, &instance->token_callback_sent_listhead);
3525 break;
3526 }
3527
3528 return (0);
3529}
3530
3532{
 /*
  * Unregisters a token callback. When *handle_out is non-NULL the handle is
  * unlinked from its list and freed, and *handle_out is reset to 0 so a
  * second call does nothing.
  * NOTE(review): the function signature and the assignment of h are on
  * lines this listing omits.
  */
3533 struct token_callback_instance *h;
3534
3535 if (*handle_out) {
3537 qb_list_del (&h->list);
3538 free (h);
3539 h = NULL;
3540 *handle_out = 0;
3541 }
3542}
3543
/*
 * Runs the registered token callbacks for the given callback type.
 *
 * The switch selects the list to walk; an unknown type trips assert (0).
 * Inside the walk, an entry whose del value is 1 is unlinked with
 * qb_list_del(). A callback result of -1 on such an entry takes the first
 * branch of the final if, which the existing comment describes as retrying
 * on the next token; the else branch covers the other entries with del set.
 * NOTE(review): the type parameter, the case labels, the loop header, the
 * callback invocation and both branch bodies are on lines this listing
 * omits.
 */
3544static void token_callbacks_execute (
3545 struct totemsrp_instance *instance,
3547{
3548 struct qb_list_head *list, *tmp_iter;
3549 struct qb_list_head *callback_listhead = 0;
3551 int res;
3552 int del;
3553
3554 switch (type) {
3557 break;
3560 break;
3561 default:
3562 assert (0);
3563 }
3564
3568 if (del == 1) {
3569 qb_list_del (list);
3570 }
3571
3575 /*
3576 * This callback failed to execute, try it again on the next token
3577 */
3578 if (res == -1 && del == 1) {
3580 } else if (del) {
3582 }
3583 }
3584}
3585
3586/*
3587 * Flow control functions
3588 */
/*
 * Returns the number of used entries in the queue that currently feeds
 * multicasts, and records it in stats.token[latest_token].backlog_calc.
 *
 * MEMB_STATE_OPERATIONAL uses new_message_queue when waiting_trans_ack is
 * 0; MEMB_STATE_RECOVERY uses retrans_message_queue. In any other state no
 * queue is selected and the backlog is 0.
 * NOTE(review): the queue chosen while waiting_trans_ack is set is on a
 * line this listing omits.
 */
3589static unsigned int backlog_get (struct totemsrp_instance *instance)
3590{
3591 unsigned int backlog = 0;
3592 struct cs_queue *queue_use = NULL;
3593
3594 if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
3595 if (instance->waiting_trans_ack) {
3597 } else {
3598 queue_use = &instance->new_message_queue;
3599 }
3600 } else
3601 if (instance->memb_state == MEMB_STATE_RECOVERY) {
3602 queue_use = &instance->retrans_message_queue;
3603 }
3604
3605 if (queue_use != NULL) {
3606 backlog = cs_queue_used (queue_use);
3607 }
3608
3609 instance->stats.token[instance->stats.latest_token].backlog_calc = backlog;
3610 return (backlog);
3611}
3612
/*
 * Computes how many messages this node may send on this token visit.
 *
 * The allowance is capped at window_size - token->fcc. my_cbl is refreshed
 * from backlog_get(). When token->backlog + my_cbl - my_pbl is non-zero, a
 * proportional share window_size * my_pbl / (token->backlog + my_cbl -
 * my_pbl) is computed and applied as a further cap, but only when that
 * share is greater than 0.
 *
 * Returns the resulting allowance.
 * NOTE(review): the initial value of transmits_allowed is assigned on a
 * line this listing omits.
 */
3613static int fcc_calculate (
3614 struct totemsrp_instance *instance,
3615 struct orf_token *token)
3616{
3617 unsigned int transmits_allowed;
3618 unsigned int backlog_calc;
3619
3621
3622 if (transmits_allowed > instance->totem_config->window_size - token->fcc) {
3623 transmits_allowed = instance->totem_config->window_size - token->fcc;
3624 }
3625
3626 instance->my_cbl = backlog_get (instance);
3627
3628 /*
3629 * Only do backlog calculation if there is a backlog otherwise
3630 * we would result in div by zero
3631 */
3632 if (token->backlog + instance->my_cbl - instance->my_pbl) {
3633 backlog_calc = (instance->totem_config->window_size * instance->my_pbl) /
3634 (token->backlog + instance->my_cbl - instance->my_pbl);
3635 if (backlog_calc > 0 && transmits_allowed > backlog_calc) {
3636 transmits_allowed = backlog_calc;
3637 }
3638 }
3639
3640 return (transmits_allowed);
3641}
3642
3643/*
3644 * don't overflow the RTR sort queue
3645 */
/*
 * Sets *transmits_allowed to 0 when a limit derived from last_released and
 * window_size compares (via sq_lt_compare) as below token->seq.
 * Asserts that the local check value is not negative.
 * NOTE(review): the computation of check and part of the limit expression
 * are on lines this listing omits.
 */
3646static void fcc_rtr_limit (
3647 struct totemsrp_instance *instance,
3648 struct orf_token *token,
3649 unsigned int *transmits_allowed)
3650{
3653 assert (check >= 0);
3654 if (sq_lt_compare (instance->last_released +
3656 instance->totem_config->window_size,
3657
3658 token->seq)) {
3659
3660 *transmits_allowed = 0;
3661 }
3662}
3663
3664static void fcc_token_update (
3665 struct totemsrp_instance *instance,
3666 struct orf_token *token,
3667 unsigned int msgs_transmitted)
3668{
3669 token->fcc += msgs_transmitted - instance->my_trc;
3670 token->backlog += instance->my_cbl - instance->my_pbl;
3671 instance->my_trc = msgs_transmitted;
3672 instance->my_pbl = instance->my_cbl;
3673}
3674
3675/*
3676 * Sanity checkers
3677 */
/*
 * Length check for a received orf_token message.
 *
 * Returns -1 when msg_len is smaller than struct orf_token, or smaller than
 * struct orf_token plus rtr_entries struct rtr_item records; returns 0
 * otherwise.
 * NOTE(review): the fourth parameter, the log calls and the assignment of
 * rtr_entries are on lines this listing omits.
 */
3678static int check_orf_token_sanity(
3679 const struct totemsrp_instance *instance,
3680 const void *msg,
3681 size_t msg_len,
3683{
3684 int rtr_entries;
3685 const struct orf_token *token = (const struct orf_token *)msg;
3686 size_t required_len;
3687
3688 if (msg_len < sizeof(struct orf_token)) {
3690 "Received orf_token message is too short... ignoring.");
3691
3692 return (-1);
3693 }
3694
3697 } else {
3699 }
3700
3701 required_len = sizeof(struct orf_token) + rtr_entries * sizeof(struct rtr_item);
3702 if (msg_len < required_len) {
3704 "Received orf_token message is too short... ignoring.");
3705
3706 return (-1);
3707 }
3708
3709 return (0);
3710}
3711
/*
 * Length check for a received mcast message: returns -1 when msg_len is
 * smaller than struct mcast, 0 otherwise.
 * NOTE(review): the fourth parameter and the log call are on lines this
 * listing omits.
 */
3712static int check_mcast_sanity(
3713 struct totemsrp_instance *instance,
3714 const void *msg,
3715 size_t msg_len,
3717{
3718
3719 if (msg_len < sizeof(struct mcast)) {
3721 "Received mcast message is too short... ignoring.");
3722
3723 return (-1);
3724 }
3725
3726 return (0);
3727}
3728
/*
 * Length check for a received memb_merge_detect message: returns -1 when
 * msg_len is smaller than struct memb_merge_detect, 0 otherwise.
 * NOTE(review): the fourth parameter and the log call are on lines this
 * listing omits.
 */
3729static int check_memb_merge_detect_sanity(
3730 struct totemsrp_instance *instance,
3731 const void *msg,
3732 size_t msg_len,
3734{
3735
3736 if (msg_len < sizeof(struct memb_merge_detect)) {
3738 "Received memb_merge_detect message is too short... ignoring.");
3739
3740 return (-1);
3741 }
3742
3743 return (0);
3744}
3745
/*
 * Length check for a received memb_join message.
 *
 * Returns -1 when msg_len is smaller than struct memb_join, or smaller than
 * struct memb_join plus (proc_list_entries + failed_list_entries)
 * struct srp_addr records; returns 0 otherwise.
 * NOTE(review): the fourth parameter, the log calls and the block that
 * follows the two entry-count reads are on lines this listing omits.
 */
3746static int check_memb_join_sanity(
3747 struct totemsrp_instance *instance,
3748 const void *msg,
3749 size_t msg_len,
3751{
3752 const struct memb_join *mj_msg = (const struct memb_join *)msg;
3753 unsigned int proc_list_entries;
3754 unsigned int failed_list_entries;
3755 size_t required_len;
3756
3757 if (msg_len < sizeof(struct memb_join)) {
3759 "Received memb_join message is too short... ignoring.");
3760
3761 return (-1);
3762 }
3763
3764 proc_list_entries = mj_msg->proc_list_entries;
3765 failed_list_entries = mj_msg->failed_list_entries;
3766
3770 }
3771
3772 required_len = sizeof(struct memb_join) + ((proc_list_entries + failed_list_entries) * sizeof(struct srp_addr));
3773 if (msg_len < required_len) {
3775 "Received memb_join message is too short... ignoring.");
3776
3777 return (-1);
3778 }
3779
3780 return (0);
3781}
3782
3783static int check_memb_commit_token_sanity(
3784 struct totemsrp_instance *instance,
3785 const void *msg,
3786 size_t msg_len,
3788{
3789 const struct memb_commit_token *mct_msg = (const struct memb_commit_token *)msg;
3790 unsigned int addr_entries;
3791 size_t required_len;
3792
3793 if (msg_len < sizeof(struct memb_commit_token)) {
3795 "Received memb_commit_token message is too short... ignoring.");
3796
3797 return (0);
3798 }
3799
3800 addr_entries= mct_msg->addr_entries;
3803 }
3804
3805 required_len = sizeof(struct memb_commit_token) +
3807 if (msg_len < required_len) {
3809 "Received memb_commit_token message is too short... ignoring.");
3810
3811 return (-1);
3812 }
3813
3814 return (0);
3815}
3816
/*
 * Length check for a received token_hold_cancel message: returns -1 when
 * msg_len is smaller than struct token_hold_cancel, 0 otherwise.
 * NOTE(review): the fourth parameter and the log call are on lines this
 * listing omits.
 */
3817static int check_token_hold_cancel_sanity(
3818 struct totemsrp_instance *instance,
3819 const void *msg,
3820 size_t msg_len,
3822{
3823
3824 if (msg_len < sizeof(struct token_hold_cancel)) {
3826 "Received token_hold_cancel message is too short... ignoring.");
3827
3828 return (-1);
3829 }
3830
3831 return (0);
3832}
3833
3834/*
3835 * Message Handlers
3836 */
3837
3838#ifdef GIVEINFO
/*
 * Timestamp of the previous GIVEINFO measurement. The token handler
 * subtracts it from the current time to log elapsed time and then
 * overwrites it with the current time.
 */
3839uint64_t tv_old;
3840#endif
3841/*
3842 * message handler called when TOKEN message type received
3843 */
3844static int message_handler_orf_token (
3845 struct totemsrp_instance *instance,
3846 const void *msg,
3847 size_t msg_len,
3849{
3850 char token_storage[1500];
3851 char token_convert[1500];
3852 struct orf_token *token = NULL;
3853 int forward_token;
3854 unsigned int transmits_allowed;
3855 unsigned int mcasted_retransmit;
3856 unsigned int mcasted_regular;
3857 unsigned int last_aru;
3858
3859#ifdef GIVEINFO
3862
3864 tv_diff = tv_current - tv_old;
3865 tv_old = tv_current;
3866
3868 "Time since last token %0.4f ms", tv_diff / (float)QB_TIME_NS_IN_MSEC);
3869#endif
3870
3871 if (check_orf_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
3872 return (0);
3873 }
3874
3875 if (instance->orf_token_discard) {
3876 return (0);
3877 }
3878#ifdef TEST_DROP_ORF_TOKEN_PERCENTAGE
3880 return (0);
3881 }
3882#endif
3883
3885 orf_token_endian_convert ((struct orf_token *)msg,
3886 (struct orf_token *)token_convert);
3887 msg = (struct orf_token *)token_convert;
3888 }
3889
3890 /*
3891 * Make copy of token and retransmit list in case we have
3892 * to flush incoming messages from the kernel queue
3893 */
3894 token = (struct orf_token *)token_storage;
3895 memcpy (token, msg, sizeof (struct orf_token));
3896 memcpy (&token->rtr_list[0], (char *)msg + sizeof (struct orf_token),
3897 sizeof (struct rtr_item) * RETRANSMIT_ENTRIES_MAX);
3898
3899
3900 /*
3901 * Handle merge detection timeout
3902 */
3903 if (token->seq == instance->my_last_seq) {
3904 start_merge_detect_timeout (instance);
3905 instance->my_seq_unchanged += 1;
3906 } else {
3907 cancel_merge_detect_timeout (instance);
3908 cancel_token_hold_retransmit_timeout (instance);
3909 instance->my_seq_unchanged = 0;
3910 }
3911
3912 instance->my_last_seq = token->seq;
3913
3914#ifdef TEST_RECOVERY_MSG_COUNT
3915 if (instance->memb_state == MEMB_STATE_OPERATIONAL && token->seq > TEST_RECOVERY_MSG_COUNT) {
3916 return (0);
3917 }
3918#endif
3919 instance->flushing = 1;
3921 instance->flushing = 0;
3922
3923 /*
3924 * Determine if we should hold (in reality drop) the token
3925 */
3926 instance->my_token_held = 0;
3927 if (instance->my_ring_id.rep == instance->my_id.nodeid &&
3928 instance->my_seq_unchanged > instance->totem_config->seqno_unchanged_const) {
3929 instance->my_token_held = 1;
3930 } else {
3931 if (instance->my_ring_id.rep != instance->my_id.nodeid &&
3932 instance->my_seq_unchanged >= instance->totem_config->seqno_unchanged_const) {
3933 instance->my_token_held = 1;
3934 }
3935 }
3936
3937 /*
3938 * Hold onto token when there is no activity on ring and
3939 * this processor is the ring rep
3940 */
3941 forward_token = 1;
3942 if (instance->my_ring_id.rep == instance->my_id.nodeid) {
3943 if (instance->my_token_held) {
3944 forward_token = 0;
3945 }
3946 }
3947
3948 switch (instance->memb_state) {
3949 case MEMB_STATE_COMMIT:
3950 /* Discard token */
3951 break;
3952
3954 messages_free (instance, token->aru);
3955 /*
3956 * Do NOT add break, this case should also execute code in gather case.
3957 */
3958
3959 case MEMB_STATE_GATHER:
3960 /*
3961 * DO NOT add break, we use different free mechanism in recovery state
3962 */
3963
3965 /*
3966 * Discard tokens from another configuration
3967 */
3968 if (memcmp (&token->ring_id, &instance->my_ring_id,
3969 sizeof (struct memb_ring_id)) != 0) {
3970
3971 if ((forward_token)
3972 && instance->use_heartbeat) {
3973 reset_heartbeat_timeout(instance);
3974 }
3975 else {
3976 cancel_heartbeat_timeout(instance);
3977 }
3978
3979 return (0); /* discard token */
3980 }
3981
3982 /*
3983 * Discard retransmitted tokens
3984 */
3985 if (sq_lte_compare (token->token_seq, instance->my_token_seq)) {
3986 return (0); /* discard token */
3987 }
3988
3989 /*
3990 * Token is valid so trigger callbacks
3991 */
3992 token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_RECEIVED);
3993
3994 last_aru = instance->my_last_aru;
3995 instance->my_last_aru = token->aru;
3996
3997 transmits_allowed = fcc_calculate (instance, token);
3998 mcasted_retransmit = orf_token_rtr (instance, token, &transmits_allowed);
3999
4001 instance->my_token_held == 1 &&
4002 (token->rtr_list_entries > 0 || mcasted_retransmit > 0)) {
4003 instance->my_token_held = 0;
4004 forward_token = 1;
4005 }
4006
4007 fcc_rtr_limit (instance, token, &transmits_allowed);
4008 mcasted_regular = orf_token_mcast (instance, token, transmits_allowed);
4009/*
4010if (mcasted_regular) {
4011printf ("mcasted regular %d\n", mcasted_regular);
4012printf ("token seq %d\n", token->seq);
4013}
4014*/
4015 fcc_token_update (instance, token, mcasted_retransmit +
4017
4018 if (sq_lt_compare (instance->my_aru, token->aru) ||
4019 instance->my_id.nodeid == token->aru_addr ||
4020 token->aru_addr == 0) {
4021
4022 token->aru = instance->my_aru;
4023 if (token->aru == token->seq) {
4024 token->aru_addr = 0;
4025 } else {
4026 token->aru_addr = instance->my_id.nodeid;
4027 }
4028 }
4029 if (token->aru == last_aru && token->aru_addr != 0) {
4030 instance->my_aru_count += 1;
4031 } else {
4032 instance->my_aru_count = 0;
4033 }
4034
4035 /*
4036 * We really don't follow specification there. In specification, OTHER nodes
4037 * detect failure of one node (based on aru_count) and my_id IS NEVER added
4038 * to failed list (so node never mark itself as failed)
4039 */
4040 if (instance->my_aru_count > instance->totem_config->fail_to_recv_const &&
4041 token->aru_addr == instance->my_id.nodeid) {
4042
4044 "FAILED TO RECEIVE");
4045
4046 instance->failed_to_recv = 1;
4047
4048 memb_set_merge (&instance->my_id, 1,
4049 instance->my_failed_list,
4050 &instance->my_failed_list_entries);
4051
4052 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FAILED_TO_RECEIVE);
4053 } else {
4054 instance->my_token_seq = token->token_seq;
4055 token->token_seq += 1;
4056
4057 if (instance->memb_state == MEMB_STATE_RECOVERY) {
4058 /*
4059 * instance->my_aru == instance->my_high_seq_received means this processor
4060 * has recovered all messages it can recover
4061 * (ie: its retrans queue is empty)
4062 */
4063 if (cs_queue_is_empty (&instance->retrans_message_queue) == 0) {
4064
4065 if (token->retrans_flg == 0) {
4066 token->retrans_flg = 1;
4067 instance->my_set_retrans_flg = 1;
4068 }
4069 } else
4070 if (token->retrans_flg == 1 && instance->my_set_retrans_flg) {
4071 token->retrans_flg = 0;
4072 instance->my_set_retrans_flg = 0;
4073 }
4075 "token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, aru %x",
4076 token->retrans_flg, instance->my_set_retrans_flg,
4077 cs_queue_is_empty (&instance->retrans_message_queue),
4078 instance->my_retrans_flg_count, token->aru);
4079 if (token->retrans_flg == 0) {
4080 instance->my_retrans_flg_count += 1;
4081 } else {
4082 instance->my_retrans_flg_count = 0;
4083 }
4084 if (instance->my_retrans_flg_count == 2) {
4085 instance->my_install_seq = token->seq;
4086 }
4088 "install seq %x aru %x high seq received %x",
4089 instance->my_install_seq, instance->my_aru, instance->my_high_seq_received);
4090 if (instance->my_retrans_flg_count >= 2 &&
4091 instance->my_received_flg == 0 &&
4092 sq_lte_compare (instance->my_install_seq, instance->my_aru)) {
4093 instance->my_received_flg = 1;
4094 instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
4095 memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
4096 sizeof (struct totem_ip_address) * instance->my_trans_memb_entries);
4097 }
4098 if (instance->my_retrans_flg_count >= 3 &&
4099 sq_lte_compare (instance->my_install_seq, token->aru)) {
4100 instance->my_rotation_counter += 1;
4101 } else {
4102 instance->my_rotation_counter = 0;
4103 }
4104 if (instance->my_rotation_counter == 2) {
4106 "retrans flag count %x token aru %x install seq %x aru %x %x",
4107 instance->my_retrans_flg_count, token->aru, instance->my_install_seq,
4108 instance->my_aru, token->seq);
4109
4110 memb_state_operational_enter (instance);
4111 instance->my_rotation_counter = 0;
4112 instance->my_retrans_flg_count = 0;
4113 }
4114 }
4115
4117 token_send (instance, token, forward_token);
4118
4119#ifdef GIVEINFO
4121 tv_diff = tv_current - tv_old;
4122 tv_old = tv_current;
4124 "I held %0.4f ms",
4125 tv_diff / (float)QB_TIME_NS_IN_MSEC);
4126#endif
4127 if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
4128 messages_deliver_to_app (instance, 0,
4129 instance->my_high_seq_received);
4130 }
4131
4132 /*
4133 * Deliver messages after token has been transmitted
4134 * to improve performance
4135 */
4136 reset_token_timeout (instance); // REVIEWED
4137 reset_token_retransmit_timeout (instance); // REVIEWED
4138 if (instance->my_id.nodeid == instance->my_ring_id.rep &&
4139 instance->my_token_held == 1) {
4140
4141 start_token_hold_retransmit_timeout (instance);
4142 }
4143
4144 token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_SENT);
4145 }
4146 break;
4147 }
4148
4149 if ((forward_token)
4150 && instance->use_heartbeat) {
4151 reset_heartbeat_timeout(instance);
4152 }
4153 else {
4154 cancel_heartbeat_timeout(instance);
4155 }
4156
4157 return (0);
4158}
4159
4160static void messages_deliver_to_app (
4161 struct totemsrp_instance *instance,
4162 int skip,
4163 unsigned int end_point)
4164{
4166 unsigned int i;
4167 int res;
4168 struct mcast *mcast_in;
4169 struct mcast mcast_header;
4170 unsigned int range = 0;
4172 unsigned int my_high_delivered_stored = 0;
4174
4175 range = end_point - instance->my_high_delivered;
4176
4177 if (range) {
4179 "Delivering %x to %x", instance->my_high_delivered,
4180 end_point);
4181 }
4184
4185 /*
4186 * Deliver messages in order from rtr queue to pending delivery queue
4187 */
4188 for (i = 1; i <= range; i++) {
4189
4190 void *ptr = 0;
4191
4192 /*
4193 * If out of range of sort queue, stop assembly
4194 */
4195 res = sq_in_range (&instance->regular_sort_queue,
4197 if (res == 0) {
4198 break;
4199 }
4200
4201 res = sq_item_get (&instance->regular_sort_queue,
4203 /*
4204 * If hole, stop assembly
4205 */
4206 if (res != 0 && skip == 0) {
4207 break;
4208 }
4209
4211
4212 if (res != 0) {
4213 continue;
4214
4215 }
4216
4218
4219 mcast_in = sort_queue_item_p->mcast;
4220 assert (mcast_in != (struct mcast *)0xdeadbeef);
4221
4223 if (mcast_in->header.magic != TOTEM_MH_MAGIC) {
4225 mcast_endian_convert (mcast_in, &mcast_header);
4226 } else {
4227 memcpy (&mcast_header, mcast_in, sizeof (struct mcast));
4228 }
4229
4230 aligned_system_from = mcast_header.system_from;
4231
4232 /*
4233 * Skip messages not originated in instance->my_deliver_memb
4234 */
4235 if (skip &&
4236 memb_set_subset (&aligned_system_from,
4237 1,
4238 instance->my_deliver_memb_list,
4239 instance->my_deliver_memb_entries) == 0) {
4240
4242
4243 continue;
4244 }
4245
4246 /*
4247 * Message found
4248 */
4250 "Delivering MCAST message with seq %x to pending delivery queue",
4251 mcast_header.seq);
4252
4253 /*
4254 * Message is locally originated multicast
4255 */
4256 instance->totemsrp_deliver_fn (
4257 mcast_header.header.nodeid,
4258 ((char *)sort_queue_item_p->mcast) + sizeof (struct mcast),
4259 sort_queue_item_p->msg_len - sizeof (struct mcast),
4261 }
4262}
4263
4264/*
4265 * recv message handler called when MCAST message type received
4266 */
4267static int message_handler_mcast (
4268 struct totemsrp_instance *instance,
4269 const void *msg,
4270 size_t msg_len,
4272{
4274 struct sq *sort_queue;
4275 struct mcast mcast_header;
4277
4278 if (check_mcast_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4279 return (0);
4280 }
4281
4283 mcast_endian_convert (msg, &mcast_header);
4284 } else {
4285 memcpy (&mcast_header, msg, sizeof (struct mcast));
4286 }
4287
4288 if (mcast_header.header.encapsulated == MESSAGE_ENCAPSULATED) {
4289 sort_queue = &instance->recovery_sort_queue;
4290 } else {
4291 sort_queue = &instance->regular_sort_queue;
4292 }
4293
4294 assert (msg_len <= FRAME_SIZE_MAX);
4295
4296#ifdef TEST_DROP_MCAST_PERCENTAGE
4297 if (random()%100 < TEST_DROP_MCAST_PERCENTAGE) {
4298 return (0);
4299 }
4300#endif
4301
4302 /*
4303 * If the message is foreign execute the switch below
4304 */
4305 if (memcmp (&instance->my_ring_id, &mcast_header.ring_id,
4306 sizeof (struct memb_ring_id)) != 0) {
4307
4308 aligned_system_from = mcast_header.system_from;
4309
4310 switch (instance->memb_state) {
4312 memb_set_merge (
4314 instance->my_proc_list, &instance->my_proc_list_entries);
4315 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE);
4316 break;
4317
4318 case MEMB_STATE_GATHER:
4319 if (!memb_set_subset (
4321 1,
4322 instance->my_proc_list,
4323 instance->my_proc_list_entries)) {
4324
4325 memb_set_merge (&aligned_system_from, 1,
4326 instance->my_proc_list, &instance->my_proc_list_entries);
4327 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE);
4328 return (0);
4329 }
4330 break;
4331
4332 case MEMB_STATE_COMMIT:
4333 /* discard message */
4334 instance->stats.rx_msg_dropped++;
4335 break;
4336
4338 /* discard message */
4339 instance->stats.rx_msg_dropped++;
4340 break;
4341 }
4342 return (0);
4343 }
4344
4346 "Received ringid (" CS_PRI_RING_ID ") seq %x",
4347 mcast_header.ring_id.rep,
4348 (uint64_t)mcast_header.ring_id.seq,
4349 mcast_header.seq);
4350
4351 /*
4352 * Add mcast message to rtr queue if not already in rtr queue
4353 * otherwise free io vectors
4354 */
4355 if (msg_len > 0 && msg_len <= FRAME_SIZE_MAX &&
4356 sq_in_range (sort_queue, mcast_header.seq) &&
4357 sq_item_inuse (sort_queue, mcast_header.seq) == 0) {
4358
4359 /*
4360 * Allocate new multicast memory block
4361 */
4362// TODO LEAK
4363 sort_queue_item.mcast = totemsrp_buffer_alloc (instance);
4364 if (sort_queue_item.mcast == NULL) {
4365 return (-1); /* error here is corrected by the algorithm */
4366 }
4367 memcpy (sort_queue_item.mcast, msg, msg_len);
4368 sort_queue_item.msg_len = msg_len;
4369
4370 if (sq_lt_compare (instance->my_high_seq_received,
4371 mcast_header.seq)) {
4372 instance->my_high_seq_received = mcast_header.seq;
4373 }
4374
4375 sq_item_add (sort_queue, &sort_queue_item, mcast_header.seq);
4376 }
4377
4378 update_aru (instance);
4379 if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
4380 messages_deliver_to_app (instance, 0, instance->my_high_seq_received);
4381 }
4382
4383/* TODO remove from retrans message queue for old ring in recovery state */
4384 return (0);
4385}
4386
4387static int message_handler_memb_merge_detect (
4388 struct totemsrp_instance *instance,
4389 const void *msg,
4390 size_t msg_len,
4392{
4395
4396 if (check_memb_merge_detect_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4397 return (0);
4398 }
4399
4401 memb_merge_detect_endian_convert (msg, &memb_merge_detect);
4402 } else {
4404 sizeof (struct memb_merge_detect));
4405 }
4406
4407 /*
4408 * do nothing if this is a merge detect from this configuration
4409 */
4410 if (memcmp (&instance->my_ring_id, &memb_merge_detect.ring_id,
4411 sizeof (struct memb_ring_id)) == 0) {
4412
4413 return (0);
4414 }
4415
4417
4418 /*
4419 * Execute merge operation
4420 */
4421 switch (instance->memb_state) {
4423 memb_set_merge (&aligned_system_from, 1,
4424 instance->my_proc_list, &instance->my_proc_list_entries);
4425 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE);
4426 break;
4427
4428 case MEMB_STATE_GATHER:
4429 if (!memb_set_subset (
4431 1,
4432 instance->my_proc_list,
4433 instance->my_proc_list_entries)) {
4434
4435 memb_set_merge (&aligned_system_from, 1,
4436 instance->my_proc_list, &instance->my_proc_list_entries);
4437 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE);
4438 return (0);
4439 }
4440 break;
4441
4442 case MEMB_STATE_COMMIT:
4443 /* do nothing in commit */
4444 break;
4445
4447 /* do nothing in recovery */
4448 break;
4449 }
4450 return (0);
4451}
4452
4453static void memb_join_process (
4454 struct totemsrp_instance *instance,
4455 const struct memb_join *memb_join)
4456{
4457 struct srp_addr *proc_list;
4458 struct srp_addr *failed_list;
4459 int gather_entered = 0;
4463
4467
4468 log_printf(instance->totemsrp_log_level_trace, "memb_join_process");
4469 memb_set_log(instance, instance->totemsrp_log_level_trace,
4470 "proclist", proc_list, memb_join->proc_list_entries);
4471 memb_set_log(instance, instance->totemsrp_log_level_trace,
4473 memb_set_log(instance, instance->totemsrp_log_level_trace,
4474 "my_proclist", instance->my_proc_list, instance->my_proc_list_entries);
4475 memb_set_log(instance, instance->totemsrp_log_level_trace,
4476 "my_faillist", instance->my_failed_list, instance->my_failed_list_entries);
4477
4479 if (instance->flushing) {
4482 "Discarding LEAVE message during flush, nodeid=" CS_PRI_NODE_ID,
4484 if (memb_join->failed_list_entries > 0) {
4485 my_leave_memb_set(instance, failed_list[memb_join->failed_list_entries - 1 ].nodeid);
4486 }
4487 } else {
4489 "Discarding JOIN message during flush, nodeid=" CS_PRI_NODE_ID, memb_join->header.nodeid);
4490 }
4491 return;
4492 } else {
4495 "Received LEAVE message from " CS_PRI_NODE_ID, memb_join->failed_list_entries > 0 ? failed_list[memb_join->failed_list_entries - 1 ].nodeid : LEAVE_DUMMY_NODEID);
4496 if (memb_join->failed_list_entries > 0) {
4497 my_leave_memb_set(instance, failed_list[memb_join->failed_list_entries - 1 ].nodeid);
4498 }
4499 }
4500 }
4501
4502 }
4503
4504 if (memb_set_equal (proc_list,
4506 instance->my_proc_list,
4507 instance->my_proc_list_entries) &&
4508
4509 memb_set_equal (failed_list,
4511 instance->my_failed_list,
4512 instance->my_failed_list_entries)) {
4513
4515 memb_consensus_set (instance, &aligned_system_from);
4516 }
4517
4518 if (memb_consensus_agreed (instance) && instance->failed_to_recv == 1) {
4519 instance->failed_to_recv = 0;
4520 instance->my_proc_list[0] = instance->my_id;
4521 instance->my_proc_list_entries = 1;
4522 instance->my_failed_list_entries = 0;
4523
4524 memb_state_commit_token_create (instance);
4525
4526 memb_state_commit_enter (instance);
4527 return;
4528 }
4529 if (memb_consensus_agreed (instance) &&
4530 memb_lowest_in_config (instance)) {
4531
4532 memb_state_commit_token_create (instance);
4533
4534 memb_state_commit_enter (instance);
4535 } else {
4536 goto out;
4537 }
4538 } else
4539 if (memb_set_subset (proc_list,
4541 instance->my_proc_list,
4542 instance->my_proc_list_entries) &&
4543
4544 memb_set_subset (failed_list,
4546 instance->my_failed_list,
4547 instance->my_failed_list_entries)) {
4548
4549 goto out;
4550 } else
4551 if (memb_set_subset (&aligned_system_from, 1,
4552 instance->my_failed_list, instance->my_failed_list_entries)) {
4553
4554 goto out;
4555 } else {
4556 memb_set_merge (proc_list,
4558 instance->my_proc_list, &instance->my_proc_list_entries);
4559
4560 if (memb_set_subset (
4561 &instance->my_id, 1,
4563
4564 memb_set_merge (
4566 instance->my_failed_list, &instance->my_failed_list_entries);
4567 } else {
4568 if (memb_set_subset (
4570 instance->my_memb_list,
4571 instance->my_memb_entries)) {
4572
4573 if (memb_set_subset (
4575 instance->my_failed_list,
4576 instance->my_failed_list_entries) == 0) {
4577
4578 memb_set_merge (failed_list,
4580 instance->my_failed_list, &instance->my_failed_list_entries);
4581 } else {
4582 memb_set_subtract (fail_minus_memb,
4586 instance->my_memb_list,
4587 instance->my_memb_entries);
4588
4589 memb_set_merge (fail_minus_memb,
4591 instance->my_failed_list,
4592 &instance->my_failed_list_entries);
4593 }
4594 }
4595 }
4596 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_JOIN);
4597 gather_entered = 1;
4598 }
4599
4600out:
4601 if (gather_entered == 0 &&
4602 instance->memb_state == MEMB_STATE_OPERATIONAL) {
4603
4604 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE);
4605 }
4606}
4607
4608static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out)
4609{
4610 int i;
4611 struct srp_addr *in_proc_list;
4612 struct srp_addr *in_failed_list;
4613 struct srp_addr *out_proc_list;
4614 struct srp_addr *out_failed_list;
4615
4616 out->header.magic = TOTEM_MH_MAGIC;
4617 out->header.version = TOTEM_MH_VERSION;
4618 out->header.type = in->header.type;
4619 out->header.nodeid = swab32 (in->header.nodeid);
4620 out->system_from = srp_addr_endian_convert(in->system_from);
4621 out->proc_list_entries = swab32 (in->proc_list_entries);
4622 out->failed_list_entries = swab32 (in->failed_list_entries);
4623 out->ring_seq = swab64 (in->ring_seq);
4624
4625 in_proc_list = (struct srp_addr *)in->end_of_memb_join;
4626 in_failed_list = in_proc_list + out->proc_list_entries;
4627 out_proc_list = (struct srp_addr *)out->end_of_memb_join;
4628 out_failed_list = out_proc_list + out->proc_list_entries;
4629
4630 for (i = 0; i < out->proc_list_entries; i++) {
4631 out_proc_list[i] = srp_addr_endian_convert (in_proc_list[i]);
4632 }
4633 for (i = 0; i < out->failed_list_entries; i++) {
4634 out_failed_list[i] = srp_addr_endian_convert (in_failed_list[i]);
4635 }
4636}
4637
4638static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out)
4639{
4640 int i;
4641 struct srp_addr *in_addr = (struct srp_addr *)in->end_of_commit_token;
4642 struct srp_addr *out_addr = (struct srp_addr *)out->end_of_commit_token;
4645
4646 out->header.magic = TOTEM_MH_MAGIC;
4647 out->header.version = TOTEM_MH_VERSION;
4648 out->header.type = in->header.type;
4649 out->header.nodeid = swab32 (in->header.nodeid);
4650 out->token_seq = swab32 (in->token_seq);
4651 out->ring_id.rep = swab32(in->ring_id.rep);
4652 out->ring_id.seq = swab64 (in->ring_id.seq);
4653 out->retrans_flg = swab32 (in->retrans_flg);
4654 out->memb_index = swab32 (in->memb_index);
4655 out->addr_entries = swab32 (in->addr_entries);
4656
4657 in_memb_list = (struct memb_commit_token_memb_entry *)(in_addr + out->addr_entries);
4658 out_memb_list = (struct memb_commit_token_memb_entry *)(out_addr + out->addr_entries);
4659 for (i = 0; i < out->addr_entries; i++) {
4660 out_addr[i] = srp_addr_endian_convert (in_addr[i]);
4661
4662 /*
4663 * Only convert the memb entry if it has been set
4664 */
4665 if (in_memb_list[i].ring_id.rep != 0) {
4666 out_memb_list[i].ring_id.rep = swab32(in_memb_list[i].ring_id.rep);
4667
4668 out_memb_list[i].ring_id.seq =
4671 out_memb_list[i].high_delivered = swab32 (in_memb_list[i].high_delivered);
4672 out_memb_list[i].received_flg = swab32 (in_memb_list[i].received_flg);
4673 }
4674 }
4675}
4676
4677static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out)
4678{
4679 int i;
4680
4681 out->header.magic = TOTEM_MH_MAGIC;
4682 out->header.version = TOTEM_MH_VERSION;
4683 out->header.type = in->header.type;
4684 out->header.nodeid = swab32 (in->header.nodeid);
4685 out->seq = swab32 (in->seq);
4686 out->token_seq = swab32 (in->token_seq);
4687 out->aru = swab32 (in->aru);
4688 out->ring_id.rep = swab32(in->ring_id.rep);
4689 out->aru_addr = swab32(in->aru_addr);
4690 out->ring_id.seq = swab64 (in->ring_id.seq);
4691 out->fcc = swab32 (in->fcc);
4692 out->backlog = swab32 (in->backlog);
4693 out->retrans_flg = swab32 (in->retrans_flg);
4694 out->rtr_list_entries = swab32 (in->rtr_list_entries);
4695 for (i = 0; i < out->rtr_list_entries; i++) {
4696 out->rtr_list[i].ring_id.rep = swab32(in->rtr_list[i].ring_id.rep);
4697 out->rtr_list[i].ring_id.seq = swab64 (in->rtr_list[i].ring_id.seq);
4698 out->rtr_list[i].seq = swab32 (in->rtr_list[i].seq);
4699 }
4700}
4701
4702static void mcast_endian_convert (const struct mcast *in, struct mcast *out)
4703{
4704 out->header.magic = TOTEM_MH_MAGIC;
4705 out->header.version = TOTEM_MH_VERSION;
4706 out->header.type = in->header.type;
4707 out->header.nodeid = swab32 (in->header.nodeid);
4708 out->header.encapsulated = in->header.encapsulated;
4709
4710 out->seq = swab32 (in->seq);
4711 out->this_seqno = swab32 (in->this_seqno);
4712 out->ring_id.rep = swab32(in->ring_id.rep);
4713 out->ring_id.seq = swab64 (in->ring_id.seq);
4714 out->node_id = swab32 (in->node_id);
4715 out->guarantee = swab32 (in->guarantee);
4716 out->system_from = srp_addr_endian_convert(in->system_from);
4717}
4718
4719static void memb_merge_detect_endian_convert (
4720 const struct memb_merge_detect *in,
4721 struct memb_merge_detect *out)
4722{
4723 out->header.magic = TOTEM_MH_MAGIC;
4724 out->header.version = TOTEM_MH_VERSION;
4725 out->header.type = in->header.type;
4726 out->header.nodeid = swab32 (in->header.nodeid);
4727 out->ring_id.rep = swab32(in->ring_id.rep);
4728 out->ring_id.seq = swab64 (in->ring_id.seq);
4729 out->system_from = srp_addr_endian_convert (in->system_from);
4730}
4731
4732static int ignore_join_under_operational (
4733 struct totemsrp_instance *instance,
4734 const struct memb_join *memb_join)
4735{
4736 struct srp_addr *proc_list;
4737 struct srp_addr *failed_list;
4738 unsigned long long ring_seq;
4740
4745
4746 if (memb_set_subset (&instance->my_id, 1,
4748 return (1);
4749 }
4750
4751 /*
4752 * In operational state, my_proc_list is exactly the same as
4753 * my_memb_list.
4754 */
4755 if ((memb_set_subset (&aligned_system_from, 1,
4756 instance->my_memb_list, instance->my_memb_entries)) &&
4757 (ring_seq < instance->my_ring_id.seq)) {
4758 return (1);
4759 }
4760
4761 return (0);
4762}
4763
4764static int message_handler_memb_join (
4765 struct totemsrp_instance *instance,
4766 const void *msg,
4767 size_t msg_len,
4769{
4770 const struct memb_join *memb_join;
4771 struct memb_join *memb_join_convert = alloca (msg_len);
4773
4774 if (check_memb_join_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4775 return (0);
4776 }
4777
4780 memb_join_endian_convert (msg, memb_join_convert);
4781
4782 } else {
4783 memb_join = msg;
4784 }
4785
4787
4788 /*
4789 * If the process paused because it wasn't scheduled in a timely
4790 * fashion, flush the join messages because they may be queued
4791 * entries
4792 */
4793 if (pause_flush (instance)) {
4794 return (0);
4795 }
4796
4797 if (instance->token_ring_id_seq < memb_join->ring_seq) {
4799 }
4800 switch (instance->memb_state) {
4802 if (!ignore_join_under_operational (instance, memb_join)) {
4803 memb_join_process (instance, memb_join);
4804 }
4805 break;
4806
4807 case MEMB_STATE_GATHER:
4808 memb_join_process (instance, memb_join);
4809 break;
4810
4811 case MEMB_STATE_COMMIT:
4812 if (memb_set_subset (&aligned_system_from,
4813 1,
4814 instance->my_new_memb_list,
4815 instance->my_new_memb_entries) &&
4816
4817 memb_join->ring_seq >= instance->my_ring_id.seq) {
4818
4819 memb_join_process (instance, memb_join);
4820 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE);
4821 }
4822 break;
4823
4825 if (memb_set_subset (&aligned_system_from,
4826 1,
4827 instance->my_new_memb_list,
4828 instance->my_new_memb_entries) &&
4829
4830 memb_join->ring_seq >= instance->my_ring_id.seq) {
4831
4832 memb_join_process (instance, memb_join);
4833 memb_recovery_state_token_loss (instance);
4834 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY);
4835 }
4836 break;
4837 }
4838 return (0);
4839}
4840
4841static int message_handler_memb_commit_token (
4842 struct totemsrp_instance *instance,
4843 const void *msg,
4844 size_t msg_len,
4846{
4850 int sub_entries;
4851
4852 struct srp_addr *addr;
4853
4855 "got commit token");
4856
4857 if (check_memb_commit_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4858 return (0);
4859 }
4860
4862 memb_commit_token_endian_convert (msg, memb_commit_token_convert);
4863 } else {
4864 memcpy (memb_commit_token_convert, msg, msg_len);
4865 }
4868
4869#ifdef TEST_DROP_COMMIT_TOKEN_PERCENTAGE
4871 return (0);
4872 }
4873#endif
4874 switch (instance->memb_state) {
4876 /* discard token */
4877 break;
4878
4879 case MEMB_STATE_GATHER:
4880 memb_set_subtract (sub, &sub_entries,
4881 instance->my_proc_list, instance->my_proc_list_entries,
4882 instance->my_failed_list, instance->my_failed_list_entries);
4883
4884 if (memb_set_equal (addr,
4886 sub,
4887 sub_entries) &&
4888
4890 memcpy (instance->commit_token, memb_commit_token, msg_len);
4891 memb_state_commit_enter (instance);
4892 }
4893 break;
4894
4895 case MEMB_STATE_COMMIT:
4896 /*
4897 * If retransmitted commit tokens are sent on this ring
4898 * filter them out and only enter recovery once the
4899 * commit token has traversed the array. This is
4900 * determined by :
4901 * memb_commit_token->memb_index == memb_commit_token->addr_entries) {
4902 */
4903 if (memb_commit_token->ring_id.seq == instance->my_ring_id.seq &&
4905 memb_state_recovery_enter (instance, memb_commit_token);
4906 }
4907 break;
4908
4910 if (instance->my_id.nodeid == instance->my_ring_id.rep) {
4911
4912 /* Filter out duplicated tokens */
4913 if (instance->originated_orf_token) {
4914 break;
4915 }
4916
4917 instance->originated_orf_token = 1;
4918
4920 "Sending initial ORF token");
4921
4922 // TODO convert instead of initiate
4923 orf_token_send_initial (instance);
4924 reset_token_timeout (instance); // REVIEWED
4925 reset_token_retransmit_timeout (instance); // REVIEWED
4926 }
4927 break;
4928 }
4929 return (0);
4930}
4931
4932static int message_handler_token_hold_cancel (
4933 struct totemsrp_instance *instance,
4934 const void *msg,
4935 size_t msg_len,
4937{
4938 const struct token_hold_cancel *token_hold_cancel = msg;
4939
4940 if (check_token_hold_cancel_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4941 return (0);
4942 }
4943
4944 if (memcmp (&token_hold_cancel->ring_id, &instance->my_ring_id,
4945 sizeof (struct memb_ring_id)) == 0) {
4946
4947 instance->my_seq_unchanged = 0;
4948 if (instance->my_ring_id.rep == instance->my_id.nodeid) {
4949 timer_function_token_retransmit_timeout (instance);
4950 }
4951 }
4952 return (0);
4953}
4954
4955static int check_message_header_validity(
4956 void *context,
4957 const void *msg,
4958 unsigned int msg_len,
4959 const struct sockaddr_storage *system_from)
4960{
4961 struct totemsrp_instance *instance = context;
4962 const struct totem_message_header *message_header = msg;
4963 const char *guessed_str;
4964 const char *msg_byte = msg;
4965
4966 if (msg_len < sizeof (struct totem_message_header)) {
4968 "Message received from %s is too short... Ignoring %u.",
4969 totemip_sa_print((struct sockaddr *)system_from), (unsigned int)msg_len);
4970 return (-1);
4971 }
4972
4973 if (message_header->magic != TOTEM_MH_MAGIC &&
4975 /*
4976 * We've received either Knet, old version of Corosync,
4977 * or something else. Do some guessing to display (hopefully)
4978 * helpful message
4979 */
4980 guessed_str = NULL;
4981
4982 if (message_header->magic == 0xFFFF) {
4983 /*
4984 * Corosync 2.2 used header with two UINT8_MAX
4985 */
4986 guessed_str = "Corosync 2.2";
4987 } else if (message_header->magic == 0xFEFE) {
4988 /*
4989 * Corosync 2.3+ used header with two UINT8_MAX - 1
4990 */
4991 guessed_str = "Corosync 2.3+";
4992 } else if (msg_byte[0] == 0x01) {
4993 /*
4994 * Knet has stable1 with first byte of message == 1
4995 */
4996 guessed_str = "unencrypted Kronosnet";
4997 } else if (msg_byte[0] >= 0 && msg_byte[0] <= 5) {
4998 /*
4999 * Unencrypted Corosync 1.x/OpenAIS has first byte
5000 * 0-5. Collision with Knet (but still worth the try)
5001 */
5002 guessed_str = "unencrypted Corosync 2.0/2.1/1.x/OpenAIS";
5003 } else {
5004 /*
5005 * Encrypted Kronosnet packet has a hash at the end of
5006 * the packet and nothing specific at the beginning of the
5007 * packet (just encrypted data).
5008 * Encrypted Corosync 1.x/OpenAIS is quite similar but hash_digest
5009 * is in the beginning of the packet.
5010 *
5011 * So it's not possible to reliably detect either of them.
5012 */
5013 guessed_str = "encrypted Kronosnet/Corosync 2.0/2.1/1.x/OpenAIS or unknown";
5014 }
5015
5017 "Message received from %s has bad magic number (probably sent by %s).. Ignoring",
5019 guessed_str);
5020
5021 return (-1);
5022 }
5023
5024 if (message_header->version != TOTEM_MH_VERSION) {
5026 "Message received from %s has unsupported version %u... Ignoring",
5028 message_header->version);
5029
5030 return (-1);
5031 }
5032
5033 return (0);
5034}
5035
5036
5038 void *context,
5039 const void *msg,
5040 unsigned int msg_len,
5041 const struct sockaddr_storage *system_from)
5042{
5043 struct totemsrp_instance *instance = context;
5044 const struct totem_message_header *message_header = msg;
5045
5046 if (check_message_header_validity(context, msg, msg_len, system_from) == -1) {
5047 return -1;
5048 }
5049
5050 switch (message_header->type) {
5052 instance->stats.orf_token_rx++;
5053 break;
5054 case MESSAGE_TYPE_MCAST:
5055 instance->stats.mcast_rx++;
5056 break;
5058 instance->stats.memb_merge_detect_rx++;
5059 break;
5061 instance->stats.memb_join_rx++;
5062 break;
5064 instance->stats.memb_commit_token_rx++;
5065 break;
5067 instance->stats.token_hold_cancel_rx++;
5068 break;
5069 default:
5071 "Message received from %s has wrong type... ignoring %d.\n",
5073 (int)message_header->type);
5074
5075 instance->stats.rx_msg_dropped++;
5076 return 0;
5077 }
5078 /*
5079 * Handle incoming message
5080 */
5082 instance,
5083 msg,
5084 msg_len,
5085 message_header->magic != TOTEM_MH_MAGIC);
5086}
5087
5089 void *context,
5090 const struct totem_ip_address *interface_addr,
5091 unsigned short ip_port,
5092 unsigned int iface_no)
5093{
5094 struct totemsrp_instance *instance = context;
5095 int res;
5096
5098
5100 instance->totemnet_context,
5102 ip_port,
5103 iface_no);
5104
5105 return (res);
5106}
5107
5108/* Contrary to its name, this only gets called when the interface is enabled */
5110 void *context,
5111 const struct totem_ip_address *iface_addr,
5112 unsigned int iface_no)
5113{
5114 struct totemsrp_instance *instance = context;
5115 int num_interfaces;
5116 int i;
5117 int res = 0;
5118
5119 if (!instance->my_id.nodeid) {
5120 instance->my_id.nodeid = iface_addr->nodeid;
5121 }
5123
5124 if (instance->iface_changes++ == 0) {
5125 instance->memb_ring_id_create_or_load (&instance->my_ring_id, instance->my_id.nodeid);
5126 /*
5127 * Increase the ring_id sequence number. This doesn't follow specification.
5128 * Solves problem with restarted leader node (node with lowest nodeid) before
5129 * rest of the cluster forms new membership and guarantees unique ring_id for
5130 * new singleton configuration.
5131 */
5132 instance->my_ring_id.seq++;
5133
5134 instance->token_ring_id_seq = instance->my_ring_id.seq;
5135 log_printf (
5136 instance->totemsrp_log_level_debug,
5137 "Created or loaded sequence id " CS_PRI_RING_ID " for this ring.",
5138 instance->my_ring_id.rep,
5139 (uint64_t)instance->my_ring_id.seq);
5140
5141 if (instance->totemsrp_service_ready_fn) {
5142 instance->totemsrp_service_ready_fn ();
5143 }
5144
5145 }
5146
5147 num_interfaces = 0;
5148 for (i = 0; i < INTERFACE_MAX; i++) {
5149 if (instance->totem_config->interfaces[i].configured) {
5151 }
5152 }
5153
5154 if (instance->iface_changes >= num_interfaces) {
5155 /* We need to clear orig_interfaces so that 'commit' diffs against nothing */
5156 instance->totem_config->orig_interfaces = malloc (sizeof (struct totem_interface) * INTERFACE_MAX);
5157 assert(instance->totem_config->orig_interfaces != NULL);
5159
5161
5162 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_INTERFACE_CHANGE);
5163 free(instance->totem_config->orig_interfaces);
5164 }
5165 return res;
5166}
5167
5169 totem_config->net_mtu -= 2 * sizeof (struct mcast);
5170}
5171
5173 void *context,
5174 void (*totem_service_ready) (void))
5175{
5176 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5177
5179}
5180
5182 void *context,
5183 const struct totem_ip_address *member,
5184 int iface_no)
5185{
5186 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5187 int res;
5188
5189 res = totemnet_member_add (instance->totemnet_context, &instance->my_addrs[iface_no], member, iface_no);
5190
5191 return (res);
5192}
5193
5195 void *context,
5196 const struct totem_ip_address *member,
5197 int iface_no)
5198{
5199 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5200 int res;
5201
5202 res = totemnet_member_remove (instance->totemnet_context, member, iface_no);
5203
5204 return (res);
5205}
5206
5208{
5209 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5210
5211 instance->threaded_mode_enabled = 1;
5212}
5213
5214void totemsrp_trans_ack (void *context)
5215{
5216 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5217
5218 instance->waiting_trans_ack = 0;
5220}
5221
5222
5224{
5225 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5226 int res;
5227
5229 return (res);
5230}
5231
5233{
5234 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5235 int res;
5236
5238 return (res);
5239}
5240
5241void totemsrp_stats_clear (void *context, int flags)
5242{
5243 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5244
5245 memset(&instance->stats, 0, sizeof(totemsrp_stats_t));
5248 }
5249}
5250
/*
 * Run the ORF token timeout handler immediately for the given totemsrp
 * context instead of waiting for its timer to expire.
 *
 * NOTE(review): judging by the function name this is meant to push the
 * instance into the gather state; confirm against
 * timer_function_orf_token_timeout.
 */
void totemsrp_force_gather (void *context)
{
	timer_function_orf_token_timeout (context);
}
totem_configuration_type
The totem_configuration_type enum.
Definition coroapi.h:132
@ TOTEM_CONFIGURATION_REGULAR
Definition coroapi.h:133
@ TOTEM_CONFIGURATION_TRANSITIONAL
Definition coroapi.h:134
#define INTERFACE_MAX
Definition coroapi.h:88
#define MESSAGE_QUEUE_MAX
Definition coroapi.h:98
unsigned int nodeid
Definition coroapi.h:0
unsigned char addr[TOTEMIP_ADDRLEN]
Definition coroapi.h:2
totem_callback_token_type
The totem_callback_token_type enum.
Definition coroapi.h:142
@ TOTEM_CALLBACK_TOKEN_SENT
Definition coroapi.h:144
@ TOTEM_CALLBACK_TOKEN_RECEIVED
Definition coroapi.h:143
#define PROCESSOR_COUNT_MAX
Definition coroapi.h:96
#define CS_PRI_RING_ID_SEQ
Definition corotypes.h:61
#define CS_PRI_NODE_ID
Definition corotypes.h:59
#define CS_PRI_RING_ID
Definition corotypes.h:62
uint32_t flags
uint32_t value
icmap_map_t icmap_get_global_map(void)
Return global icmap.
Definition icmap.c:268
#define LOGSYS_LEVEL_DEBUG
Definition logsys.h:76
struct srp_addr addr
Definition totemsrp.c:164
int guarantee
Definition totemsrp.c:190
unsigned int node_id
Definition totemsrp.c:189
struct memb_ring_id ring_id
Definition totemsrp.c:188
struct totem_message_header header
Definition totemsrp.c:184
unsigned int seq
Definition totemsrp.c:186
int this_seqno
Definition totemsrp.c:187
struct srp_addr system_from
Definition totemsrp.c:185
Definition totemsrp.c:243
unsigned int aru
Definition totemsrp.c:245
unsigned int received_flg
Definition totemsrp.c:247
struct memb_ring_id ring_id
Definition totemsrp.c:244
unsigned int high_delivered
Definition totemsrp.c:246
unsigned int retrans_flg
Definition totemsrp.c:255
struct totem_message_header header
Definition totemsrp.c:252
unsigned char end_of_commit_token[0]
Definition totemsrp.c:258
unsigned int token_seq
Definition totemsrp.c:253
struct memb_ring_id ring_id
Definition totemsrp.c:254
struct srp_addr system_from
Definition totemsrp.c:217
struct totem_message_header header
Definition totemsrp.c:216
unsigned char end_of_memb_join[0]
Definition totemsrp.c:221
unsigned long long ring_seq
Definition totemsrp.c:220
unsigned int failed_list_entries
Definition totemsrp.c:219
unsigned int proc_list_entries
Definition totemsrp.c:218
struct totem_message_header header
Definition totemsrp.c:231
struct memb_ring_id ring_id
Definition totemsrp.c:233
struct srp_addr system_from
Definition totemsrp.c:232
The memb_ring_id struct.
Definition coroapi.h:122
unsigned long long seq
Definition coroapi.h:124
unsigned int rep
Definition totem.h:150
int(* handler_functions[6])(struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed)
Definition totemsrp.c:535
unsigned int msg_len
Definition totemsrp.c:269
struct mcast * mcast
Definition totemsrp.c:268
unsigned int backlog
Definition totemsrp.c:207
unsigned int token_seq
Definition totemsrp.c:203
unsigned int aru_addr
Definition totemsrp.c:205
unsigned int fcc
Definition totemsrp.c:208
unsigned int aru
Definition totemsrp.c:204
int rtr_list_entries
Definition totemsrp.c:210
struct rtr_item rtr_list[0]
Definition totemsrp.c:211
int retrans_flg
Definition totemsrp.c:209
unsigned int seq
Definition totemsrp.c:202
struct totem_message_header header
Definition totemsrp.c:201
struct memb_ring_id ring_id
Definition totemsrp.c:206
struct memb_ring_id ring_id
Definition totemsrp.c:195
unsigned int seq
Definition totemsrp.c:196
unsigned int msg_len
Definition totemsrp.c:274
struct mcast * mcast
Definition totemsrp.c:273
The sq struct.
Definition sq.h:43
unsigned int nodeid
Definition totemsrp.c:108
struct qb_list_head list
Definition totemsrp.c:170
int(* callback_fn)(enum totem_callback_token_type type, const void *)
Definition totemsrp.c:171
enum totem_callback_token_type callback_type
Definition totemsrp.c:172
struct totem_message_header header
Definition totemsrp.c:238
struct memb_ring_id ring_id
Definition totemsrp.c:239
unsigned int max_messages
Definition totem.h:220
unsigned int heartbeat_failures_allowed
Definition totem.h:214
unsigned int token_timeout
Definition totem.h:182
unsigned int window_size
Definition totem.h:218
struct totem_logging_configuration totem_logging_configuration
Definition totem.h:208
unsigned int downcheck_timeout
Definition totem.h:200
unsigned int miss_count_const
Definition totem.h:242
struct totem_interface * interfaces
Definition totem.h:165
unsigned int cancel_token_hold_on_retransmit
Definition totem.h:248
unsigned int fail_to_recv_const
Definition totem.h:202
unsigned int merge_timeout
Definition totem.h:198
struct totem_interface * orig_interfaces
Definition totem.h:166
unsigned int net_mtu
Definition totem.h:210
void(* totem_memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition totem.h:250
unsigned int token_retransmits_before_loss_const
Definition totem.h:190
unsigned int max_network_delay
Definition totem.h:216
unsigned int seqno_unchanged_const
Definition totem.h:204
unsigned int consensus_timeout
Definition totem.h:196
unsigned int threads
Definition totem.h:212
unsigned int send_join_timeout
Definition totem.h:194
void(* totem_memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition totem.h:254
unsigned int token_retransmit_timeout
Definition totem.h:186
unsigned int token_warning
Definition totem.h:184
unsigned int join_timeout
Definition totem.h:192
unsigned int token_hold_timeout
Definition totem.h:188
struct totem_ip_address boundto
Definition totem.h:84
uint8_t configured
Definition totem.h:89
int member_count
Definition totem.h:90
struct totem_ip_address member_list[PROCESSOR_COUNT_MAX]
Definition totem.h:97
struct totem_ip_address mcast_addr
Definition totem.h:85
The totem_ip_address struct.
Definition coroapi.h:111
unsigned int nodeid
Definition coroapi.h:112
unsigned short family
Definition coroapi.h:113
void(* log_printf)(int level, int subsys, const char *function_name, const char *file_name, int file_line, const char *format,...) __attribute__((format(printf
Definition totem.h:101
void(*) in log_level_security)
Definition totem.h:110
unsigned int nodeid
Definition totem.h:131
unsigned short magic
Definition totem.h:127
struct totem_ip_address mcast_address
Definition totemsrp.c:452
totemsrp_stats_t stats
Definition totemsrp.c:516
struct srp_addr my_left_memb_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:320
int consensus_list_entries
Definition totemsrp.c:300
int my_merge_detect_timeout_outstanding
Definition totemsrp.c:346
unsigned int my_last_seq
Definition totemsrp.c:496
qb_loop_timer_handle timer_heartbeat_timeout
Definition totemsrp.c:419
unsigned int my_token_seq
Definition totemsrp.c:396
qb_loop_timer_handle memb_timer_state_gather_join_timeout
Definition totemsrp.c:413
struct consensus_list_item consensus_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:298
qb_loop_timer_handle memb_timer_state_gather_consensus_timeout
Definition totemsrp.c:415
uint64_t pause_timestamp
Definition totemsrp.c:512
uint32_t threaded_mode_enabled
Definition totemsrp.c:522
struct srp_addr my_memb_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:316
void * totemnet_context
Definition totemsrp.c:500
int my_leave_memb_entries
Definition totemsrp.c:338
struct srp_addr my_proc_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:308
struct srp_addr my_new_memb_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:312
int my_failed_list_entries
Definition totemsrp.c:326
struct srp_addr my_failed_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:310
unsigned int use_heartbeat
Definition totemsrp.c:504
void(* memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition totemsrp.c:472
void(*) enum memb_stat memb_state)
Definition totemsrp.c:446
qb_loop_timer_handle memb_timer_state_commit_timeout
Definition totemsrp.c:417
struct cs_queue new_message_queue
Definition totemsrp.c:371
int orf_token_retransmit_size
Definition totemsrp.c:394
unsigned int my_high_seq_received
Definition totemsrp.c:354
void(* totemsrp_deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required)
Definition totemsrp.c:454
uint32_t orf_token_discard
Definition totemsrp.c:518
struct qb_list_head token_callback_sent_listhead
Definition totemsrp.c:390
unsigned int last_released
Definition totemsrp.c:486
unsigned int set_aru
Definition totemsrp.c:488
int totemsrp_log_level_notice
Definition totemsrp.c:430
struct cs_queue new_message_queue_trans
Definition totemsrp.c:373
int totemsrp_log_level_trace
Definition totemsrp.c:434
char orf_token_retransmit[TOKEN_SIZE_MAX]
Definition totemsrp.c:392
unsigned int my_trc
Definition totemsrp.c:506
struct cs_queue retrans_message_queue
Definition totemsrp.c:375
struct memb_ring_id my_ring_id
Definition totemsrp.c:340
int totemsrp_log_level_error
Definition totemsrp.c:426
void(* totemsrp_waiting_trans_ack_cb_fn)(int waiting_trans_ack)
Definition totemsrp.c:469
unsigned int old_ring_state_high_seq_received
Definition totemsrp.c:494
qb_loop_timer_handle timer_orf_token_hold_retransmit_timeout
Definition totemsrp.c:409
qb_loop_timer_handle timer_pause_timeout
Definition totemsrp.c:401
unsigned int my_high_ring_delivered
Definition totemsrp.c:364
qb_loop_timer_handle timer_orf_token_retransmit_timeout
Definition totemsrp.c:407
struct totem_config * totem_config
Definition totemsrp.c:502
int my_deliver_memb_entries
Definition totemsrp.c:334
void(* totemsrp_service_ready_fn)(void)
Definition totemsrp.c:467
int my_trans_memb_entries
Definition totemsrp.c:330
uint32_t originated_orf_token
Definition totemsrp.c:520
void * token_recv_event_handle
Definition totemsrp.c:528
struct sq recovery_sort_queue
Definition totemsrp.c:379
qb_loop_timer_handle timer_orf_token_timeout
Definition totemsrp.c:403
qb_loop_timer_handle timer_merge_detect_timeout
Definition totemsrp.c:411
unsigned int my_leave_memb_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:322
void * token_sent_event_handle
Definition totemsrp.c:529
unsigned int my_high_delivered
Definition totemsrp.c:386
int totemsrp_log_level_security
Definition totemsrp.c:424
int totemsrp_log_level_warning
Definition totemsrp.c:428
struct memb_commit_token * commit_token
Definition totemsrp.c:514
char commit_token_storage[40000]
Definition totemsrp.c:530
struct memb_ring_id my_old_ring_id
Definition totemsrp.c:342
struct timeval tv_old
Definition totemsrp.c:498
void(* memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition totemsrp.c:476
qb_loop_t * totemsrp_poll_handle
Definition totemsrp.c:450
unsigned int my_install_seq
Definition totemsrp.c:356
qb_loop_timer_handle timer_orf_token_warning
Definition totemsrp.c:405
struct srp_addr my_trans_memb_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:314
struct srp_addr my_id
Definition totemsrp.c:304
unsigned int my_cbl
Definition totemsrp.c:510
struct qb_list_head token_callback_received_listhead
Definition totemsrp.c:388
unsigned int my_last_aru
Definition totemsrp.c:348
unsigned int my_aru
Definition totemsrp.c:384
uint32_t waiting_trans_ack
Definition totemsrp.c:524
void(* totemsrp_log_printf)(int level, int subsys, const char *function, const char *file, int line, const char *format,...) __attribute__((format(printf
Definition totemsrp.c:438
void(* totemsrp_confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id)
Definition totemsrp.c:460
struct sq regular_sort_queue
Definition totemsrp.c:377
unsigned long long token_ring_id_seq
Definition totemsrp.c:484
struct srp_addr my_deliver_memb_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:318
int totemsrp_log_level_debug
Definition totemsrp.c:432
unsigned int my_pbl
Definition totemsrp.c:508
struct totem_ip_address my_addrs[INTERFACE_MAX]
Definition totemsrp.c:306
uint64_t memb_join_tx
Definition totemstats.h:59
uint32_t continuous_gather
Definition totemstats.h:78
uint64_t recovery_entered
Definition totemstats.h:74
uint64_t rx_msg_dropped
Definition totemstats.h:77
uint64_t gather_entered
Definition totemstats.h:70
uint64_t memb_commit_token_rx
Definition totemstats.h:65
uint64_t mcast_retx
Definition totemstats.h:62
uint64_t mcast_tx
Definition totemstats.h:61
uint64_t memb_commit_token_tx
Definition totemstats.h:64
uint64_t operational_token_lost
Definition totemstats.h:69
uint64_t operational_entered
Definition totemstats.h:68
uint64_t gather_token_lost
Definition totemstats.h:71
uint64_t commit_token_lost
Definition totemstats.h:73
uint64_t token_hold_cancel_tx
Definition totemstats.h:66
uint64_t orf_token_rx
Definition totemstats.h:56
totemsrp_token_stats_t token[TOTEM_TOKEN_STATS_MAX]
Definition totemstats.h:90
uint64_t recovery_token_lost
Definition totemstats.h:75
uint64_t commit_entered
Definition totemstats.h:72
uint64_t memb_merge_detect_rx
Definition totemstats.h:58
uint64_t memb_join_rx
Definition totemstats.h:60
uint64_t orf_token_tx
Definition totemstats.h:55
uint64_t memb_merge_detect_tx
Definition totemstats.h:57
uint64_t mcast_rx
Definition totemstats.h:63
uint64_t token_hold_cancel_rx
Definition totemstats.h:67
uint64_t consensus_timeouts
Definition totemstats.h:76
#define swab64(x)
The swab64 macro.
Definition swab.h:65
#define swab16(x)
The swab16 macro.
Definition swab.h:39
#define swab32(x)
The swab32 macro.
Definition swab.h:51
totem_event_type
Definition totem.h:290
#define TOTEM_MH_VERSION
Definition totem.h:124
#define FRAME_SIZE_MAX
Definition totem.h:52
cfg_message_crypto_reconfig_phase_t
Definition totem.h:154
#define TOTEM_NODE_STATUS_STRUCTURE_VERSION
Definition totem.h:264
#define TOTEM_MH_MAGIC
Definition totem.h:123
char type
Definition totem.h:2
int totemconfig_commit_new_params(struct totem_config *totem_config, icmap_map_t map)
const char * totemip_sa_print(const struct sockaddr *sa)
Definition totemip.c:234
void totemip_copy(struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
Definition totemip.c:123
int totemnet_initialize(qb_loop_t *loop_pt, void **net_context, struct totem_config *totem_config, totemsrp_stats_t *stats, void *context, int(*deliver_fn)(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from), int(*iface_change_fn)(void *context, const struct totem_ip_address *iface_address, unsigned int ring_no), void(*mtu_changed)(void *context, int net_mtu), void(*target_set_completed)(void *context))
Definition totemnet.c:317
int totemnet_iface_set(void *net_context, const struct totem_ip_address *interface_addr, unsigned short ip_port, unsigned int iface_no)
Definition totemnet.c:471
int totemnet_member_remove(void *net_context, const struct totem_ip_address *member, int ring_no)
Definition totemnet.c:553
void * totemnet_buffer_alloc(void *net_context)
Definition totemnet.c:367
int totemnet_token_send(void *net_context, const void *msg, unsigned int msg_len)
Definition totemnet.c:414
int totemnet_send_flush(void *net_context)
Definition totemnet.c:404
void totemnet_buffer_release(void *net_context, void *ptr)
Definition totemnet.c:375
int totemnet_mcast_flush_send(void *net_context, const void *msg, unsigned int msg_len)
Definition totemnet.c:426
int totemnet_member_add(void *net_context, const struct totem_ip_address *local, const struct totem_ip_address *member, int ring_no)
Definition totemnet.c:533
int totemnet_finalize(void *net_context)
Definition totemnet.c:306
int totemnet_crypto_set(void *net_context, const char *cipher_type, const char *hash_type)
Definition totemnet.c:292
int totemnet_ifaces_get(void *net_context, char ***status, unsigned int *iface_count)
Definition totemnet.c:497
int totemnet_processor_count_set(void *net_context, int processor_count)
Definition totemnet.c:383
int totemnet_token_target_set(void *net_context, unsigned int nodeid)
Definition totemnet.c:510
int totemnet_recv_flush(void *net_context)
Definition totemnet.c:394
int totemnet_iface_check(void *net_context)
Definition totemnet.c:452
int totemnet_mcast_noflush_send(void *net_context, const void *msg, unsigned int msg_len)
Definition totemnet.c:439
int totemnet_recv_mcast_empty(void *net_context)
Definition totemnet.c:522
int totemnet_nodestatus_get(void *net_context, unsigned int nodeid, struct totem_node_status *node_status)
Definition totemnet.c:484
void totemnet_stats_clear(void *net_context)
Definition totemnet.c:619
int totemnet_reconfigure(void *net_context, struct totem_config *totem_config)
Definition totemnet.c:589
int totemnet_crypto_reconfigure_phase(void *net_context, struct totem_config *totem_config, cfg_message_crypto_reconfig_phase_t phase)
Definition totemnet.c:603
Totem Network interface - also does encryption/decryption.
int totemsrp_my_family_get(void *srp_context)
Definition totemsrp.c:1133
#define SEQNO_START_TOKEN
Definition totemsrp.c:122
int main_iface_change_fn(void *context, const struct totem_ip_address *iface_address, unsigned int iface_no)
Definition totemsrp.c:5109
unsigned long long ring_seq
Definition totemsrp.c:4
#define RETRANSMIT_ENTRIES_MAX
Definition totemsrp.c:100
unsigned int seq
Definition totemsrp.c:2
#define log_printf(level, format, args...)
Definition totemsrp.c:690
void totemsrp_force_gather(void *context)
Definition totemsrp.c:5251
int rtr_list_entries
Definition totemsrp.c:9
void totemsrp_service_ready_register(void *context, void(*totem_service_ready)(void))
Definition totemsrp.c:5172
int totemsrp_initialize(qb_loop_t *poll_handle, void **srp_context, struct totem_config *totem_config, totempg_stats_t *stats, void(*deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void(*confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id), void(*waiting_trans_ack_cb_fn)(int waiting_trans_ack))
Create a protocol instance.
Definition totemsrp.c:818
int totemsrp_callback_token_create(void *srp_context, void **handle_out, enum totem_callback_token_type type, int delete, int(*callback_fn)(enum totem_callback_token_type type, const void *), const void *data)
Definition totemsrp.c:3496
#define RETRANS_MESSAGE_QUEUE_SIZE_MAX
Definition totemsrp.c:97
void totemsrp_threaded_mode_enable(void *context)
Definition totemsrp.c:5207
struct rtr_item rtr_list[0]
Definition totemsrp.c:10
message_type
Definition totemsrp.c:146
@ MESSAGE_TYPE_MEMB_COMMIT_TOKEN
Definition totemsrp.c:151
@ MESSAGE_TYPE_TOKEN_HOLD_CANCEL
Definition totemsrp.c:152
@ MESSAGE_TYPE_ORF_TOKEN
Definition totemsrp.c:147
@ MESSAGE_TYPE_MEMB_JOIN
Definition totemsrp.c:150
@ MESSAGE_TYPE_MEMB_MERGE_DETECT
Definition totemsrp.c:149
@ MESSAGE_TYPE_MCAST
Definition totemsrp.c:148
void totemsrp_net_mtu_adjust(struct totem_config *totem_config)
Definition totemsrp.c:5168
#define TOKEN_SIZE_MAX
Definition totemsrp.c:101
encapsulation_type
Definition totemsrp.c:155
@ MESSAGE_NOT_ENCAPSULATED
Definition totemsrp.c:157
@ MESSAGE_ENCAPSULATED
Definition totemsrp.c:156
unsigned int failed_list_entries
Definition totemsrp.c:3
struct message_handlers totemsrp_message_handlers
Definition totemsrp.c:678
int totemsrp_nodestatus_get(void *srp_context, unsigned int nodeid, struct totem_node_status *node_status)
Definition totemsrp.c:1041
#define LEAVE_DUMMY_NODEID
Definition totemsrp.c:102
#define QUEUE_RTR_ITEMS_SIZE_MAX
Definition totemsrp.c:96
int guarantee
Definition totemsrp.c:6
unsigned int aru
Definition totemsrp.c:3
gather_state_from
Definition totemsrp.c:542
@ TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED
Definition totemsrp.c:546
@ TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE
Definition totemsrp.c:551
@ TOTEMSRP_GSFROM_FAILED_TO_RECEIVE
Definition totemsrp.c:549
@ TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT
Definition totemsrp.c:543
@ TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE
Definition totemsrp.c:552
@ TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE
Definition totemsrp.c:545
@ TOTEMSRP_GSFROM_MERGE_DURING_JOIN
Definition totemsrp.c:554
@ TOTEMSRP_GSFROM_INTERFACE_CHANGE
Definition totemsrp.c:558
@ TOTEMSRP_GSFROM_GATHER_MISSING1
Definition totemsrp.c:544
@ TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE
Definition totemsrp.c:547
@ TOTEMSRP_GSFROM_MAX
Definition totemsrp.c:559
@ TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE
Definition totemsrp.c:556
@ TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE
Definition totemsrp.c:548
@ TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE
Definition totemsrp.c:550
@ TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE
Definition totemsrp.c:555
@ TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE
Definition totemsrp.c:553
@ TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY
Definition totemsrp.c:557
int totemsrp_crypto_set(void *srp_context, const char *cipher_type, const char *hash_type)
Definition totemsrp.c:1108
int totemsrp_avail(void *srp_context)
Return number of available messages that can be queued.
Definition totemsrp.c:2568
void totemsrp_stats_clear(void *context, int flags)
Definition totemsrp.c:5241
int totemsrp_iface_set(void *context, const struct totem_ip_address *interface_addr, unsigned short ip_port, unsigned int iface_no)
Definition totemsrp.c:5088
void totemsrp_finalize(void *srp_context)
Definition totemsrp.c:1026
struct memb_ring_id ring_id
Definition totemsrp.c:4
void totemsrp_trans_ack(void *context)
Definition totemsrp.c:5214
int totemsrp_crypto_reconfigure_phase(void *context, struct totem_config *totem_config, cfg_message_crypto_reconfig_phase_t phase)
Definition totemsrp.c:5232
unsigned int totemsrp_my_nodeid_get(void *srp_context)
Definition totemsrp.c:1122
int addr_entries
Definition totemsrp.c:5
unsigned int backlog
Definition totemsrp.c:6
#define SEQNO_START_MSG
Definition totemsrp.c:121
void totemsrp_event_signal(void *srp_context, enum totem_event_type type, int value)
Definition totemsrp.c:2488
void totemsrp_callback_token_destroy(void *srp_context, void **handle_out)
Definition totemsrp.c:3531
unsigned int received_flg
Definition totemsrp.c:3
struct message_item __attribute__
int main_deliver_fn(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from)
Definition totemsrp.c:5037
int totemsrp_member_add(void *context, const struct totem_ip_address *member, int iface_no)
Definition totemsrp.c:5181
int totemsrp_ifaces_get(void *srp_context, unsigned int nodeid, unsigned int *interface_id, struct totem_ip_address *interfaces, unsigned int interfaces_size, char ***status, unsigned int *iface_count)
Definition totemsrp.c:1070
int totemsrp_reconfigure(void *context, struct totem_config *totem_config)
Definition totemsrp.c:5223
unsigned int high_delivered
Definition totemsrp.c:2
struct srp_addr system_from
Definition totemsrp.c:1
unsigned int proc_list_entries
Definition totemsrp.c:2
const char * gather_state_from_desc[]
Definition totemsrp.c:562
int totemsrp_member_remove(void *context, const struct totem_ip_address *member, int iface_no)
Definition totemsrp.c:5194
int totemsrp_mcast(void *srp_context, struct iovec *iovec, unsigned int iov_len, int guarantee)
Multicast a message.
Definition totemsrp.c:2497
memb_state
Definition totemsrp.c:277
@ MEMB_STATE_GATHER
Definition totemsrp.c:279
@ MEMB_STATE_RECOVERY
Definition totemsrp.c:281
@ MEMB_STATE_COMMIT
Definition totemsrp.c:280
@ MEMB_STATE_OPERATIONAL
Definition totemsrp.c:278
Totem Single Ring Protocol.
#define TOTEMPG_STATS_CLEAR_TRANSPORT
Definition totemstats.h:116
#define TOTEM_TOKEN_STATS_MAX
Definition totemstats.h:89