/* cluster.cc * Core code for managing Babylon clustering. * NOTE(review): this copy appears to have lost its line breaks and several angle-bracketed spans (the include targets, a number of loop headers, the usage placeholders). As laid out here every line comment and preprocessor directive runs on into the code that follows it. Restore the original text before building. */ #include #include #include //#include "cluster.h" #include "ippool.h" /* The one cluster instance: set by the create handler, deleted and reset to NULL by the destroy handler. */ bcluster_t *g_cluster; /* Build an idle peer record for node_id. The address is ip passed through htonl() on BCLUSTER_PORT; no socket is open (m_peer_fd = -1) and no vote is recorded (m_vote = -1). */ bcluster_peer_t::bcluster_peer_t(bcluster_t *parent, u32 ip, int node_id) { memset(&m_sin, 0, sizeof(m_sin)); m_sin.sin_family = AF_INET; m_sin.sin_addr.s_addr = htonl(ip); m_sin.sin_port = htons(BCLUSTER_PORT); m_node_id = node_id; m_state = BCLUSTER_PEER_IDLE; m_peer_fd = -1; m_cluster = parent; m_vote = -1; } /* Clear the select events of the peer socket and close it, if one is open. */ bcluster_peer_t::~bcluster_peer_t() { if (m_peer_fd != -1) { SelectSetEvents(m_peer_fd, SEL_NONE); close(m_peer_fd); } } /* Split str in place at whitespace and return a calloc()ed array of pointers into it; slots that are never written stay NULL from calloc. do_bcluster() releases the array with free(). NOTE(review): the slot count is taken from space characters only while the split uses isspace(), so tabs or newlines can yield more entries than were allocated and leave no NULL terminator. */ char **strtoargv(char *str) { char **argv; int nr = 1; for (int i=0; str[i]; i++) { if (str[i] == ' ') nr++; } argv = (char **)calloc(nr+1, sizeof(char *)); for (nr=0; *str; ) { argv[nr++] = str; while (*str && !isspace(*str)) str++; if (*str && isspace(*str)) *str++ = 0; } return argv; } /* Create an inactive cluster: keeps a strdup() of name, our node id and the quorum, and prepares m_sin from ip (through htonl()) and BCLUSTER_PORT. No socket is opened here. The election id is filled with node_id bytes; the FIXME marks random generation as the intended replacement. NOTE(review): m_cluster_reqs and m_cluster_reqs_last are not assigned in this constructor -- confirm they are initialised elsewhere. */ bcluster_t::bcluster_t(char *name, u32 ip, int node_id, int quorum) { memset(m_peers, 0, sizeof(m_peers)); memset(&m_sin, 0, sizeof(m_sin)); m_sin.sin_family = AF_INET; m_sin.sin_addr.s_addr = htonl(ip); m_sin.sin_port = htons(BCLUSTER_PORT); m_node_self = node_id; m_quorum = quorum; m_name = strdup(name); m_cluster_state = BCLUSTER_STATE_INACTIVE; m_listen_fd = -1; m_our_vote = -1; m_master = -1; m_my_request_id = 0; //FIXME: get_random(m_election_id, BCLUSTER_ELECTION_ID_BYTES); memset(m_election_id, node_id, BCLUSTER_ELECTION_ID_BYTES); return; } /* Destructor. NOTE(review): the text following the loop initialiser is missing in this copy; the rest of the destructor and the header of the usage-printing function (called below as do_bcluster_usage(cfd)) cannot be recovered from here. What remains prints the command summary and finishes the request with status -2; the argument placeholders in the usage strings also look stripped. */ bcluster_t::~bcluster_t() { for (unsigned i=0; iprintf("bcluster usage:\n"); cfd->printf(" bcluster create \n"); cfd->printf(" bcluster destroy\n"); cfd->printf(" bcluster add-peer \n"); cfd->printf(" bcluster del-peer \n"); cfd->printf(" bcluster status\n"); cfd->printf(" bcluster ippool-print\n"); cfd->printf(" bcluster ippool-alloc [--cluster] [--user=] \n"); cfd->done(-2); } /* Create a peer record for node_id at ip and report on cfd. Fails with -2 when the slot is taken or node_id is our own id. node_id indexes m_peers without a range check here; do_bcluster_add_del_peer() checks it before calling. */ void bcluster_t::add_peer(ctrlfd_t *cfd, u32 ip, int node_id) { if (m_peers[node_id]) { cfd->printf("node id %d already exists\n", node_id); cfd->done(-2); return; } if (node_id == 
m_node_self) { cfd->printf("can't add node id %d -- that's us!\n", node_id); cfd->done(-2); return; } m_peers[node_id] = new bcluster_peer_t(this, ip, node_id); cfd->printf("peer %d added\n", node_id); cfd->done(0); } /* Delete the peer in slot node_id and report on cfd. Fails with -2 when the slot is empty or the stored address differs from ip. NOTE(review): iptostr() is called twice within one printf; if it returns a shared static buffer both fields would show the same text -- TODO confirm. */ void bcluster_t::del_peer(ctrlfd_t *cfd, u32 ip, int node_id) { if (!m_peers[node_id]) { cfd->printf("node id %d does not exist\n", node_id); cfd->done(-2); return; } if (m_peers[node_id]->m_sin.sin_addr.s_addr != htonl(ip)) { cfd->printf("node %d has ip %s which does not match ip %s\n", node_id, iptostr(ntohl(m_peers[node_id]->m_sin.sin_addr.s_addr)), iptostr(ip)); cfd->done(-2); return; } delete m_peers[node_id]; m_peers[node_id] = NULL; cfd->printf("cluster peer %d deleted\n", node_id); cfd->done(0); } /* Common handler for add-peer (add != 0) and del-peer (add == 0). Requires an existing cluster and exactly two arguments, the ip then the node id; the id must lie in [0, MAX_BCLUSTER_PEERS). The id is parsed with atoi(), so non-numeric text is read as 0. */ static void do_bcluster_add_del_peer(ctrlfd_t *cfd, char **argv, int add) { if (!g_cluster) { cfd->printf("error: bcluster create not yet performed\n"); cfd->done(-2); return; } if (!argv[0] || !argv[1] || argv[2]) { cfd->printf("%s-peer: too %s arguments\n", add ? "add" : "del", (!argv[0] || !argv[1]) ? 
"few" : "many"); cfd->done(-2); return; } unsigned long ip = strtoip(argv[0]); if (ip == (unsigned long)-1L) { cfd->printf("invalid ip address '%s'\n", argv[0]); cfd->done(-2); return; } int node_id = atoi(argv[1]); if (node_id < 0 || node_id >= MAX_BCLUSTER_PEERS) { cfd->printf("invalid node id %d/'%s'\n", node_id, argv[1]); cfd->done(-2); return; } if (add) g_cluster->add_peer(cfd, ip, node_id); else g_cluster->del_peer(cfd, ip, node_id); } /* Handler for create: needs exactly four arguments (name, ip, node id, quorum), otherwise prints usage. Rejects a second create, an unparsable ip, a node id outside [0, MAX_BCLUSTER_PEERS) and a quorum outside [1, MAX_BCLUSTER_PEERS/2 + 1]; otherwise allocates g_cluster. */ static void do_bcluster_create(ctrlfd_t *cfd, char **argv) { if (!argv[0] || !argv[1] || !argv[2] || !argv[3] || argv[4]) { do_bcluster_usage(cfd); return; } if (g_cluster) { cfd->printf("cluster already created\n"); cfd->done(-2); return; } unsigned long ip = strtoip(argv[1]); if (ip == (unsigned long)-1L) { cfd->printf("invalid ip address '%s'\n", argv[1]); cfd->done(-2); return; } int node_id = atoi(argv[2]); if (node_id < 0 || node_id >= MAX_BCLUSTER_PEERS) { cfd->printf("bcluster create: invalid node id %d (%s)\n", node_id, argv[2]); cfd->done(-2); return; } int quorum = atoi(argv[3]); if (quorum < 1 || quorum > (MAX_BCLUSTER_PEERS/2 + 1)) { cfd->printf("bcluster create: invalid quorum count %d\n", quorum); cfd->done(-2); return; } g_cluster = new bcluster_t(argv[0], ip, node_id, quorum); cfd->printf("cluster created\n"); cfd->done(0); } /* Handler for destroy: takes no arguments; deletes g_cluster and clears the pointer. */ static void do_bcluster_destroy(ctrlfd_t *cfd, char **argv) { if (argv[0]) { do_bcluster_usage(cfd); return; } if (!g_cluster) { cfd->printf("no cluster to destroy\n"); cfd->done(-2); return; } delete g_cluster; g_cluster = NULL; cfd->printf("cluster destroyed\n"); cfd->done(0); } /* Printable name of m_state. The value indexes the table directly with no bounds check, so the table order must mirror the BCLUSTER_PEER_* values -- TODO confirm against their definition. */ const char *bcluster_peer_t::get_state(void) { static const char *peer_states[] = { "idle", "connecting", "hello", "active", }; return peer_states[m_state]; } /* Printable name of m_cluster_state. Same direct indexing as the peer variant, so the order must mirror the BCLUSTER_STATE_* values -- TODO confirm. */ const char *bcluster_t::get_state(void) { static const char *states[] = { "inactive", "awaiting-quorum", "active", "master", "slave", }; return states[m_cluster_state]; } /* Print the cluster state, then a node/state line per peer, and finish the request with 0. NOTE(review): the loop header is garbled in this copy. */ void bcluster_t::status(ctrlfd_t *cfd) { cfd->printf("state: %s\n", get_state()); for 
(unsigned i=0; iprintf("node %d state %s\n", i, m_peers[i]->get_state()); } } cfd->done(0); } /* Handler for status: takes no arguments; reports not active (status 0) when no cluster exists. */ static void do_bcluster_status(ctrlfd_t *cfd, char **argv) { if (argv[0]) { do_bcluster_usage(cfd); return; } if (!g_cluster) { cfd->printf("not active\n"); cfd->done(0); return; } g_cluster->status(cfd); } /* Handler for start: takes no arguments; requires an existing cluster. */ static void do_bcluster_start(ctrlfd_t *cfd, char **argv) { if (argv[0]) { do_bcluster_usage(cfd); return; } if (!g_cluster) { cfd->printf("cluster not active\n"); cfd->done(-2); return; } g_cluster->start(cfd); } /* Handler for ippool-print: forwards to print_pool(); argv is ignored. */ static void do_bcluster_ippool_print(ctrlfd_t *cfd, char **argv) { print_pool(cfd); } /* Requestor created by the ippool-alloc handler when --cluster is given. Each address delivered to ippool_response() is printed; once m_nr of them have arrived the control request is completed with 0 and the object deletes itself. m_received is declared but neither set nor read in the visible code. */ class bcluster_ippool_allocator_t : public ippool_requestor_t { public: ctrlfd_t *m_cfd; long m_nr, m_received; bcluster_ippool_allocator_t(ctrlfd_t *cfd, long nr) { m_cfd = cfd; m_nr = nr; } virtual void ippool_response(u32 ip) { m_cfd->printf("got ip %s\n", iptostr(ip)); if (!--m_nr) { m_cfd->done(0); delete this; } } }; /* Handler for ippool-alloc: accepts the optional flags --cluster and --user=NAME (at most one non-empty user), followed by exactly two arguments, a positive count and a pool name. Without --cluster the request is finished with 0 at the end of this function; with it, completion is left to the allocator object. NOTE(review): most of the request loop is missing in this copy, so how each address is requested cannot be read from here. */ static void do_bcluster_ippool_alloc(ctrlfd_t *cfd, char **argv) { bcluster_ippool_allocator_t *requestor = NULL; int use_cluster = 0; const char *username = NULL; while (argv[0]) { if (!strcmp(argv[0], "--cluster")) { use_cluster = 1; argv++; continue; } if (!strncmp(argv[0], "--user=", 7)) { if (username) goto usage; username = argv[0] + 7; if (!username[0]) goto usage; argv++; continue; } break; } if (!argv[0] || !argv[1] || argv[2]) { usage: do_bcluster_usage(cfd); return; } long nr = atoi(argv[0]); if (nr <= 0) { cfd->printf("invalid ip count\n"); cfd->done(-2); return; } if (use_cluster) requestor = new bcluster_ippool_allocator_t(cfd, nr); cfd->printf("requesting %ld ips from pool '%s'\n", nr, argv[1]); for (long i=0; iprintf("got ip %s\n", iptostr(ip)); } } if (!use_cluster) cfd->done(0); } /* Entry point for the bcluster control command. Skips one leading space, splits str in place with strtoargv() and dispatches on the first word; a missing or unknown word prints usage. The argv array is freed before returning, and str itself has been modified by the split. */ void do_bcluster(ctrlfd_t *cfd, char *str) { if (*str == ' ') str++; char **argv = strtoargv(str); if (!argv[0]) goto usage; if (!strcmp(argv[0], "create")) do_bcluster_create(cfd, argv+1); else if (!strcmp(argv[0], "destroy")) do_bcluster_destroy(cfd, argv+1); 
else if (!strcmp(argv[0], "add-peer")) do_bcluster_add_del_peer(cfd, argv+1, 1); else if (!strcmp(argv[0], "del-peer")) do_bcluster_add_del_peer(cfd, argv+1, 0); else if (!strcmp(argv[0], "status")) do_bcluster_status(cfd, argv+1); else if (!strcmp(argv[0], "start")) do_bcluster_start(cfd, argv+1); else if (!strcmp(argv[0], "ippool-print")) do_bcluster_ippool_print(cfd, argv+1); else if (!strcmp(argv[0], "ippool-alloc")) do_bcluster_ippool_alloc(cfd, argv+1); else usage: do_bcluster_usage(cfd); free(argv); } /* Emit the add-peer command line that describes this peer; verbose is not used. */ void bcluster_peer_t::show_running_config(ctrlfd_t *cfd, int verbose) { cfd->printf("bcluster add-peer %s %d\n", iptostr(ntohl(m_sin.sin_addr.s_addr)), m_node_id); } /* Emit the create command line for this cluster followed by the entry of each peer (loop header garbled in this copy); verbose is only passed through. */ void bcluster_t::show_running_config(ctrlfd_t *cfd, int verbose) { cfd->printf("bcluster create %s %s %d %d\n", m_name, iptostr(ntohl(m_sin.sin_addr.s_addr)), m_node_self, m_quorum); for (unsigned i=0; ishow_running_config(cfd, verbose); } } /* Write the running cluster configuration to cfd followed by a newline character; writes nothing when no cluster exists. */ void bcluster_show_running_config(ctrlfd_t *cfd, int verbose) { if (!g_cluster) return; g_cluster->show_running_config(cfd, verbose); cfd->my_putchar('\n'); } /* Open the listening socket: create it, set SO_REUSEADDR, bind to m_sin, listen with a backlog of MAX_BCLUSTER_PEERS, make it non-blocking and register SEL_READ, then call try_connect() on the peers (loop header garbled in this copy). The name of any failing call is reported through cfd->perror() and the socket is closed again. NOTE(review): m_cluster_state is not changed on success, so a repeated start while still INACTIVE passes the guard and overwrites m_listen_fd. */ void bcluster_t::start(ctrlfd_t *cfd) { const char *what; int one = 1; if (m_cluster_state != BCLUSTER_STATE_INACTIVE) { cfd->printf("failed: cluster already in state %s\n", get_state()); cfd->done(-2); return; } m_listen_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); what = "socket"; if (m_listen_fd < 0) goto common_err; what = "setsockopt"; if (setsockopt(m_listen_fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) goto common_err; what = "bind"; if (bind(m_listen_fd, (struct sockaddr *)&m_sin, sizeof(m_sin)) < 0) goto common_err; what = "listen"; if (listen(m_listen_fd, MAX_BCLUSTER_PEERS) < 0) goto common_err; make_nonblock(m_listen_fd); SelectSetEvents(m_listen_fd, SEL_READ); for (unsigned i=0; itry_connect(); } cfd->printf("ready to accept connections\n"); cfd->done(0); return; common_err: cfd->perror(what); cfd->done(-2); if (m_listen_fd != -1) { close(m_listen_fd); m_listen_fd = -1; } } /* Select callback for the listening socket: accept one connection and offer it to the peers through accept_fd() (loop header garbled in this copy). A connection that no peer claims is closed and logged. The event argument is not examined. */ void 
bcluster_t::SelectEvent(int fd, SelectEventType event) { struct sockaddr_in sin; socklen_t len = sizeof(sin); int new_fd = accept(fd, (struct sockaddr *)&sin, &len); if (new_fd == -1) { perror("bcluster_t::SelectEvent - accept"); return; } for (unsigned i=0; iaccept_fd(new_fd, &sin)) return; } close(new_fd); fprintf(stderr, "bcluster_t: dropping incoming connection from unknown peer ip %s:%d\n", iptostr(ntohl(sin.sin_addr.s_addr)), ntohs(sin.sin_port)); } /* Offer an accepted connection to this peer. Returns 0, leaving fd open, when the source address differs from the address of this peer; returns 1 once fd has been dealt with. If a connection already exists the new fd is closed; otherwise it becomes m_peer_fd, is made non-blocking and registered for SEL_READ, the receive buffer is reset, the state moves to HELLO and a hello is sent. */ int bcluster_peer_t::accept_fd(int fd, struct sockaddr_in *sin) { if (sin->sin_addr.s_addr != m_sin.sin_addr.s_addr) { fprintf(stderr, "address mismatch\n"); return 0; } if (m_peer_fd != -1) { fprintf(stderr, "bcluster_peer_t::accept_fd: dropping\n"); close(fd); return 1; } m_peer_fd = fd; make_nonblock(fd); SelectSetEvents(m_peer_fd, SEL_READ); m_rx_buf_offset = 0; m_rx_buf_left = 0; fprintf(stderr, "bcluster_peer_t: node %d connection accepted\n", m_node_id); m_state = BCLUSTER_PEER_HELLO; send_hello(); return 1; } /* try_connect * Try to establish a TCP connection to a peer. 
* Only acts when the peer is IDLE with no socket. The new socket is made non-blocking and bound to the address of the cluster with port 0, then connect() is started; EINPROGRESS is accepted, the state becomes CONNECTING and SEL_RW events are requested. Any other failure closes the socket again. */ void bcluster_peer_t::try_connect(void) { fprintf(stderr, "try_connect\n"); if (m_peer_fd != -1) return; if (m_state != BCLUSTER_PEER_IDLE) return; m_peer_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); if (m_peer_fd == -1) return; fprintf(stderr, "try_connect b\n"); make_nonblock(m_peer_fd); struct sockaddr_in sin = m_cluster->m_sin; sin.sin_port = 0; if (bind(m_peer_fd, (struct sockaddr *)&sin, sizeof(sin))) { perror("bind"); goto err; } fprintf(stderr, "sin.sin_addr.s_addr = %s port = %d\n", iptostr(ntohl(m_sin.sin_addr.s_addr)), ntohs(m_sin.sin_port)); if (connect(m_peer_fd, (struct sockaddr *)&m_sin, sizeof(m_sin)) && errno != EINPROGRESS) { perror("connect"); goto err; } fprintf(stderr, "try_connect c\n"); m_state = BCLUSTER_PEER_CONNECTING; SelectSetEvents(m_peer_fd, SEL_RW); return; err: close(m_peer_fd); m_peer_fd = -1; } /* Select callback for the peer socket; events for any other fd are ignored. A write event while CONNECTING is taken as completion of the connect (state HELLO, hello sent), after which only SEL_READ is requested. A read event appends to m_rx_buf, first rewinding an empty buffer or compacting once the offset passes PEER_BUFFER_SIZE/2, and then parses; read() returning 0 drops the peer, a read error is only logged. NOTE(review): the outcome of the pending connect is not checked (no SO_ERROR query) before the hello is sent. */ void bcluster_peer_t::SelectEvent(int fd, SelectEventType event) { if (fd != m_peer_fd) return; if (event & SEL_WRITE) { if (m_state == BCLUSTER_PEER_CONNECTING) { m_state = BCLUSTER_PEER_HELLO; send_hello(); } SelectSetEvents(m_peer_fd, SEL_READ); } if (event & SEL_READ) { if (m_rx_buf_offset && !m_rx_buf_left) m_rx_buf_offset = 0; if (m_rx_buf_left && m_rx_buf_offset > PEER_BUFFER_SIZE/2) { memmove(m_rx_buf, &m_rx_buf[m_rx_buf_offset], m_rx_buf_left); m_rx_buf_offset = 0; } unsigned tail = m_rx_buf_offset + m_rx_buf_left; unsigned avail = PEER_BUFFER_SIZE - tail; int got = read(fd, &m_rx_buf[m_rx_buf_offset+m_rx_buf_left], avail); if (!got) { peer_down(); return; } if (got < 0) { perror("bcluster_peer_t::SelectEvent -- read"); } else { m_rx_buf_left += got; try_parse_rx_packet(); } } } /* Process every complete packet currently buffered. The peer is dropped on a bad magic value or a length above PEER_BUFFER_SIZE/2; an incomplete packet waits for more data. A packet is consumed from the buffer before it is handled, and parsing stops when the handler returns 0 or the peer has gone IDLE. NOTE(review): hdr->len has no lower bound, so a length of 0 would not advance the buffer and the same header would be parsed again. */ void bcluster_peer_t::try_parse_rx_packet(void) { struct bcluster_packet_header *hdr; again: if (m_rx_buf_left < sizeof(struct bcluster_packet_header)) return; hdr = (struct bcluster_packet_header *)&m_rx_buf[m_rx_buf_offset]; if (hdr->magic != BCLUSTER_PACKET_MAGIC) { fprintf(stderr, "bcluster_peer_t: peer %d down: bad magic\n", m_node_id); 
peer_down(); return; } if (hdr->len > PEER_BUFFER_SIZE/2) { fprintf(stderr, "bcluster_peer_t: peer %d down: packet too long\n", m_node_id); peer_down(); return; } if (hdr->len > m_rx_buf_left) return; m_rx_buf_left -= hdr->len; m_rx_buf_offset += hdr->len; if (!parse_rx_packet((struct bcluster_packet *)hdr)) return; if (m_rx_buf_left && m_state != BCLUSTER_PEER_IDLE) goto again; } /* Dispatch on pkt->hdr.type and return the result of the handler; an unknown type drops the peer and returns 0. */ int bcluster_peer_t::parse_rx_packet(struct bcluster_packet *pkt) { #if 0 fprintf(stderr, "node %d peer:parse_rx_packet(%d)\n", m_cluster->m_node_self, pkt->hdr.type); #endif switch (pkt->hdr.type) { case BCLUSTER_PACKET_TYPE_HELLO: return rx_hello(pkt); case BCLUSTER_PACKET_TYPE_VOTE: return rx_vote(pkt); case BCLUSTER_PACKET_TYPE_MASTER_ASSERT: return rx_master_assert(pkt); case BCLUSTER_PACKET_TYPE_REQUEST: return rx_request(pkt); case BCLUSTER_PACKET_TYPE_RESPONSE: return rx_response(pkt); default: peer_down(); return 0; } } /* Track a request on the master. Takes a fresh id from the cluster, remembers the id used by the requester and the originating peer (NULL when the request comes from parent), appends this object to the pending-request list of the cluster, calls lcp_probe_user() when a user name is present, stores the result of request_ip_address() in m_response, and forwards the packet, re-stamped with the new id, to the slaves. */ bcluster_req_master_t::bcluster_req_master_t(bcluster_t *cluster, struct bcluster_packet *pkt, bcluster_req_t *parent, bcluster_peer_t *peer) { m_parent_req = parent; m_cluster = cluster; m_id = m_cluster->m_my_request_id++; m_orig_id = pkt->u.request.id; m_peer = peer; m_next = NULL; if (m_cluster->m_cluster_reqs_last) m_cluster->m_cluster_reqs_last->m_next = this; m_cluster->m_cluster_reqs_last = this; if (!m_cluster->m_cluster_reqs) m_cluster->m_cluster_reqs = this; if (pkt->u.request.user[0]) lcp_probe_user(pkt->u.request.user); m_response = request_ip_address(pkt->u.request.pool, NULL); pkt->u.request.id = m_id; m_cluster->send_cluster_req_to_slaves(this, pkt); } /* Handle a REQUEST packet. On the master a bcluster_req_master_t is created (its constructor links it into the pending list and its rx_response() deletes it); otherwise the packet goes to bcluster_ippool_request(). Always returns 1. */ int bcluster_peer_t::rx_request(struct bcluster_packet *pkt) { #if 0 fprintf(stderr, "node %d peer:rx_request id(%d)\n", m_cluster->m_node_self, pkt->u.request.id); #endif if (m_cluster->m_cluster_state == BCLUSTER_STATE_MASTER) { new bcluster_req_master_t(m_cluster, pkt, NULL, this); return 1; } bcluster_ippool_request(this, pkt); return 1; } /* Handle a RESPONSE packet: find the pending request whose m_id equals the response id, pass the packet to its rx_response() and return 1. NOTE(review): an id that matches nothing reaches a deliberate null-pointer write, so an unexpected response from a peer crashes the process. */ int bcluster_peer_t::rx_response(struct bcluster_packet 
*pkt) { #if 0 fprintf(stderr, "node %d peer:rx_response id %d\n", m_cluster->m_node_self, pkt->u.response.id); #endif for (bcluster_req_t **reqp = &m_cluster->m_cluster_reqs; *reqp; reqp = &(*reqp)->m_next) { if ((*reqp)->m_id == pkt->u.response.id) { bcluster_req_t *req = *reqp; //*reqp = req->m_next; req->rx_response(this, pkt); return 1; } } *(char *)0 = 0; return 0; } /* Handle a HELLO packet. The sender and receiver node ids must match this peer and ourselves, otherwise the peer is dropped. The election id of the sender is stored; from CONNECTING a hello is sent back first, then (as from HELLO) the peer becomes ACTIVE and the cluster re-evaluates quorum. A hello in any other state drops the peer. Returns 1 when accepted, 0 when the peer was dropped. */ int bcluster_peer_t::rx_hello(struct bcluster_packet *pkt) { if ((pkt->u.hello.sender_node_id != m_node_id) || (pkt->u.hello.receiver_node_id != m_cluster->m_node_self)) { fprintf(stderr, "bcluster_peer_t::rx_hello - node mismatch\n"); peer_down(); return 0; } memcpy(m_election_id, pkt->u.hello.sender_election_id, BCLUSTER_ELECTION_ID_BYTES); switch (m_state) { case BCLUSTER_PEER_CONNECTING: send_hello(); /* fall through */ case BCLUSTER_PEER_HELLO: m_state = BCLUSTER_PEER_ACTIVE; m_cluster->peer_transition(); return 1; default: fprintf(stderr, "rx_hello: out of state(%s)\n", get_state()); peer_down(); return 0; } } /* Tear down the connection: clear select events, close the socket and return to IDLE. Only a peer that was ACTIVE triggers peer_transition(). NOTE(review): m_peer_fd is not checked for -1 before it is used here. */ void bcluster_peer_t::peer_down(void) { SelectSetEvents(m_peer_fd, SEL_NONE); close(m_peer_fd); m_peer_fd = -1; switch (m_state) { case BCLUSTER_PEER_IDLE: break; case BCLUSTER_PEER_CONNECTING: case BCLUSTER_PEER_HELLO: m_state = BCLUSTER_PEER_IDLE; break; case BCLUSTER_PEER_ACTIVE: m_state = BCLUSTER_PEER_IDLE; m_cluster->peer_transition(); break; } } /* Pick our vote and send it to the peers. Starting from our own election id, a peer whose id compares lower under memcmp() replaces the candidate; the winning node id is stored in m_our_vote. NOTE(review): both loop headers are garbled in this copy. */ void bcluster_t::send_vote(void) { u8 *election_id = m_election_id; int node_id = m_node_self; for (unsigned i=0; im_election_id, election_id, BCLUSTER_ELECTION_ID_BYTES) < 0) { election_id = peer->m_election_id; node_id = i; } } m_our_vote = node_id; fprintf(stderr, "election vote selected peer %d\n", node_id); for (unsigned i=0; isend_vote(m_our_vote); } } /* Re-evaluate quorum after a peer changed state. The count covers peers in ACTIVE state (loop header garbled in this copy). Reaching m_quorum from INACTIVE or AWAITING_QUORUM moves to ACTIVE and starts a vote; falling below it from ACTIVE moves to AWAITING_QUORUM. Losing quorum as MASTER or SLAVE is not implemented and hits a deliberate null-pointer write. */ void bcluster_t::peer_transition(void) { unsigned quorum = 0; for (unsigned i=0; im_state == BCLUSTER_PEER_ACTIVE) quorum++; } if (quorum >= m_quorum) { switch (m_cluster_state) { case BCLUSTER_STATE_INACTIVE: case BCLUSTER_STATE_AWAITING_QUORUM: m_cluster_state = 
BCLUSTER_STATE_ACTIVE; fprintf(stderr, "quorum acheived\n"); send_vote(); return; case BCLUSTER_STATE_ACTIVE: return; case BCLUSTER_STATE_MASTER: case BCLUSTER_STATE_SLAVE: return; } } else { switch (m_cluster_state) { case BCLUSTER_STATE_SLAVE: case BCLUSTER_STATE_MASTER: /* FIXME: lost mastership */ *(char *)0 = 0; /* fall through */ case BCLUSTER_STATE_ACTIVE: m_cluster_state = BCLUSTER_STATE_AWAITING_QUORUM; fprintf(stderr, "quorum lost\n"); return; case BCLUSTER_STATE_AWAITING_QUORUM: return; case BCLUSTER_STATE_INACTIVE: return; } } } /* Stamp the packet with sizeof(*pkt) and the magic value and send it with a single write(). EPIPE is ignored; other errors and short writes are only logged and nothing is retried. */ void bcluster_peer_t::send_pkt(struct bcluster_packet *pkt) { pkt->hdr.len = sizeof(*pkt); pkt->hdr.magic = BCLUSTER_PACKET_MAGIC; int len = write(m_peer_fd, pkt, pkt->hdr.len); if (len < 0) { if (errno == EPIPE) ; else perror("bcluster_peer_t::send_pkt: write"); } else if ((unsigned)len != pkt->hdr.len) { fprintf(stderr, "short write %d %d\n", len, pkt->hdr.len); } } /* Send a HELLO carrying our node id, the node id of the peer and our election id. */ void bcluster_peer_t::send_hello(void) { struct bcluster_packet pkt; memset(&pkt, 0, sizeof(pkt)); pkt.hdr.len = sizeof(pkt); pkt.hdr.type = BCLUSTER_PACKET_TYPE_HELLO; pkt.hdr.magic = BCLUSTER_PACKET_MAGIC; pkt.u.hello.sender_node_id = m_cluster->m_node_self; pkt.u.hello.receiver_node_id = m_node_id; memcpy(pkt.u.hello.sender_election_id, m_cluster->m_election_id, BCLUSTER_ELECTION_ID_BYTES); send_pkt(&pkt); } /* Send a VOTE naming node_id. */ void bcluster_peer_t::send_vote(int node_id) { struct bcluster_packet pkt; memset(&pkt, 0, sizeof(pkt)); pkt.hdr.len = sizeof(pkt); pkt.hdr.type = BCLUSTER_PACKET_TYPE_VOTE; pkt.hdr.magic = BCLUSTER_PACKET_MAGIC; pkt.u.vote.vote_id = node_id; send_pkt(&pkt); } /* Send a MASTER_ASSERT naming node_id as master. */ void bcluster_peer_t::send_master_assert(int node_id) { struct bcluster_packet pkt; memset(&pkt, 0, sizeof(pkt)); pkt.hdr.len = sizeof(pkt); pkt.hdr.type = BCLUSTER_PACKET_TYPE_MASTER_ASSERT; pkt.hdr.magic = BCLUSTER_PACKET_MAGIC; pkt.u.master_assert.master_id = node_id; send_pkt(&pkt); } /* Handle a VOTE packet: record the vote of the peer and let the cluster check whether we have won. A vote from a peer that is not ACTIVE drops it. NOTE(review): the log text in the default branch names rx_hello, not rx_vote. */ int bcluster_peer_t::rx_vote(struct bcluster_packet *pkt) { switch (m_state) { case BCLUSTER_PEER_ACTIVE: m_vote = 
pkt->u.vote.vote_id; m_cluster->maybe_assert_mastership(); return 1; default: fprintf(stderr, "rx_hello: out of state(%s)\n", get_state()); peer_down(); return 0; } } /* Count the votes for ourselves (our own vote plus those of the peers; loop header garbled in this copy) and assert mastership once the count reaches m_quorum. Only logs and returns unless the cluster is ACTIVE and our own vote has been cast. */ void bcluster_t::maybe_assert_mastership(void) { if (m_cluster_state != BCLUSTER_STATE_ACTIVE) { /* FIXME? */ fprintf(stderr, "maybe_assert_mastership in state '%s'\n", get_state()); return; } if (m_our_vote == -1) { fprintf(stderr, "maybe_assert_mastership: no vote active\n"); /* FIXME? */ return; } unsigned count = 0; if (m_our_vote == m_node_self) count++; for (unsigned i=0; im_vote == m_node_self) count++; } if (count >= m_quorum) { fprintf(stderr, "I (%d) AM MASTER\n", m_node_self); assert_mastership(); } } /* Become master and announce it to the peers (loop header garbled in this copy). Calling this in any state other than ACTIVE hits a deliberate null-pointer write. */ void bcluster_t::assert_mastership(void) { if (m_cluster_state != BCLUSTER_STATE_ACTIVE) *(char *)0 = 0; m_cluster_state = BCLUSTER_STATE_MASTER; m_master = m_node_self; for (unsigned i=0; isend_master_assert(m_node_self); } } /* Handle a MASTER_ASSERT packet from an ACTIVE peer; in any other state the peer is dropped. NOTE(review): master_id is taken from the packet without a range check and is later used to index m_peers in cluster_req(). */ int bcluster_peer_t::rx_master_assert(struct bcluster_packet *pkt) { switch (m_state) { case BCLUSTER_PEER_ACTIVE: m_cluster->master_asserted(pkt->u.master_assert.master_id); return 1; default: fprintf(stderr, "rx_master_assert: out of state(%s)\n", get_state()); peer_down(); return 0; } } /* Record id as master and become a slave, whatever the current state is. */ void bcluster_t::master_asserted(int id) { m_master = id; m_cluster_state = BCLUSTER_STATE_SLAVE; } /* Send pkt to every ACTIVE peer and record the recipients as bits in req->m_peer_mask, which is cleared first (loop header garbled in this copy). */ void bcluster_t::send_cluster_req_to_slaves(bcluster_req_t *req, struct bcluster_packet *pkt) { req->m_peer_mask = 0; for (unsigned i=0; im_state != BCLUSTER_PEER_ACTIVE) continue; req->m_peer_mask |= 1U << i; peer->send_pkt(pkt); } } /* Submit a locally originated request. As master it is wrapped in a bcluster_req_master_t; as slave it is appended to the pending list and sent to the master. A request that already has m_cluster set, a slave without a known master, or any other cluster state hits a deliberate null-pointer write. */ void bcluster_t::cluster_req(bcluster_req_t *req) { struct bcluster_packet pkt; memset(&pkt, 0, sizeof(pkt)); pkt.hdr.magic = BCLUSTER_PACKET_MAGIC; pkt.hdr.type = BCLUSTER_PACKET_TYPE_REQUEST; req->m_id = m_my_request_id++; if (req->m_cluster) *(char *)0 = 0; switch(m_cluster_state) { case BCLUSTER_STATE_MASTER: req->build_pkt(&pkt); //req->master_get_response(); /* FIXME: threading issues */ //send_cluster_req_to_slaves(req, &pkt); new 
bcluster_req_master_t(this, &pkt, req, NULL); break; case BCLUSTER_STATE_SLAVE: if (m_master == -1) *(char *)0 = 0; req->m_cluster = this; if (m_cluster_reqs_last) m_cluster_reqs_last->m_next = req; m_cluster_reqs_last = req; req->m_next = NULL; if (!m_cluster_reqs) m_cluster_reqs = req; req->build_pkt(&pkt); m_peers[m_master]->send_pkt(&pkt); break; default: /* FIXME -- queue packet, state state machine */ *(char *)0 = 0; } } /* A new request is not on any cluster list yet. */ bcluster_req_t::bcluster_req_t() { m_cluster = NULL; return; } /* Unlink the request from the pending list of its cluster, fixing up the head and tail pointers. A request that names a cluster but is not found on its list hits a deliberate null-pointer write. */ bcluster_req_t::~bcluster_req_t() { if (m_cluster) { bcluster_req_t **reqp, *prev = NULL; for (reqp = &m_cluster->m_cluster_reqs; *reqp; reqp = &(*reqp)->m_next) { if (*reqp == this) { *reqp = m_next; goto out; } prev = *reqp; } *(char *)0 = 0; out: if (this == m_cluster->m_cluster_reqs) m_cluster->m_cluster_reqs = m_next; if (this == m_cluster->m_cluster_reqs_last) m_cluster->m_cluster_reqs_last = prev; ; } } /* Store the request id in the packet. */ void bcluster_req_t::build_pkt(struct bcluster_packet *pkt) { pkt->u.request.id = m_id; return; } /* Collect the RESPONSE of one slave. The ip must equal m_response and the sender must still be outstanding in m_peer_mask; either mismatch is a deliberate null-pointer write. When the last response has arrived the packet goes to the parent request, or back to the originating peer under its original id, and this object deletes itself. */ void bcluster_req_master_t::rx_response(bcluster_peer_t *peer, struct bcluster_packet *pkt) { if (pkt->u.response.ip != m_response) *(char *)0 = 0; if (!(m_peer_mask & (1U << peer->m_node_id))) *(char *)0 = 0; /* FIXME */ m_peer_mask &= ~(1U << peer->m_node_id); if (m_peer_mask) return; /* still awaiting responses */ if (m_parent_req) m_parent_req->rx_response(NULL, pkt); else { pkt->u.response.id = m_orig_id; m_peer->send_pkt(pkt); } delete this; }