/*
 * Cluster ("bcluster") declarations: peer connections, the packet layout
 * exchanged between nodes, cluster-wide state, and cluster requests.
 */
#pragma once

#include "config.h"
#include "ctrlfd.h"
#include "selectops.h"
/*
 * NOTE(review): the name of this system header was lost from the original
 * text.  <netinet/in.h> is used because struct sockaddr_in is embedded by
 * value in the classes below and therefore must be a complete type here.
 * Confirm against the original file.
 */
#include <netinet/in.h>

#define BCLUSTER_PORT 12643
#define MAX_BCLUSTER_PEERS 5

class bcluster_t;
class bcluster_req_t;
struct sockaddr_in;

/* Connection state of a single peer (see bcluster_peer_t::m_state). */
enum bcluster_peer_state {
	BCLUSTER_PEER_IDLE,
	BCLUSTER_PEER_CONNECTING,
	BCLUSTER_PEER_HELLO,
	BCLUSTER_PEER_ACTIVE,
};

/* Packet type codes; presumably the values carried in bcluster_packet_header::type. */
#define BCLUSTER_PACKET_TYPE_HELLO 1
#define BCLUSTER_PACKET_TYPE_VOTE 2
#define BCLUSTER_PACKET_TYPE_MASTER_ASSERT 3
#define BCLUSTER_PACKET_TYPE_REQUEST 4
#define BCLUSTER_PACKET_TYPE_RESPONSE 5

/* Common header placed at the start of every bcluster_packet. */
struct bcluster_packet_header {
	u32 len;
	u32 type;
#define BCLUSTER_PACKET_MAGIC 0x123456981243821aULL
	u64 magic;
};

#define BCLUSTER_ELECTION_ID_BYTES 16

/* Payload of a HELLO packet. */
struct bcluster_hello_pkt {
	u8 sender_node_id;
	u8 receiver_node_id;
	u8 sender_election_id[BCLUSTER_ELECTION_ID_BYTES];
};

/* Payload of a VOTE packet. */
struct bcluster_vote_pkt {
	u8 vote_id;
};

/* Payload of a MASTER_ASSERT packet. */
struct bcluster_master_assert_pkt {
	u8 master_id;
};

/* Kinds of cluster request carried in request/response packets. */
enum bcluster_req_type_t {
	BCLUSTER_REQ_IPPOOL_REQ,
};

/*
 * Payload of a REQUEST packet.
 *
 * NOTE(review): if this struct is transmitted as raw bytes, its layout
 * depends on the compiler's choice of size for the enum and for unsigned,
 * and on padding -- confirm that all nodes are built alike.
 */
struct bcluster_request_pkt {
	enum bcluster_req_type_t request_type;
	unsigned id;
	char pool[8];
	char user[32];
};

/* Payload of a RESPONSE packet (same layout caveat as the request payload). */
struct bcluster_response_pkt {
	enum bcluster_req_type_t request_type;
	unsigned id;
	unsigned ip;
};

/* A complete packet: the common header followed by one type-specific payload. */
struct bcluster_packet {
	struct bcluster_packet_header hdr;
	union {
		struct bcluster_hello_pkt hello;
		struct bcluster_vote_pkt vote;
		struct bcluster_master_assert_pkt master_assert;
		struct bcluster_request_pkt request;
		struct bcluster_response_pkt response;
	} u;
};

#define PEER_BUFFER_SIZE 65536

/*
 * One remote cluster node.  Receives select events through
 * SelectEventHandler and keeps a per-peer receive buffer.
 */
class bcluster_peer_t : public SelectEventHandler {
public:
	bcluster_t *m_cluster;
	struct sockaddr_in m_sin;
	int m_node_id;
	int m_peer_fd;
	int m_vote;
	bcluster_peer_state m_state;
	u8 m_election_id[BCLUSTER_ELECTION_ID_BYTES];

	/* Receive buffer bookkeeping; exact semantics are defined by the implementation. */
	unsigned m_rx_buf_offset;
	unsigned m_rx_buf_left;
	u8 m_rx_buf[PEER_BUFFER_SIZE];

	bcluster_peer_t(bcluster_t *parent, u32 ip, int node_id);
	~bcluster_peer_t();

	void show_running_config(ctrlfd_t *cfd, int verbose);
	const char *get_state(void);
	int accept_fd(int fd, struct sockaddr_in *sin);
	void peer_down(void);
	void try_connect(void);
	virtual void SelectEvent(int fd, SelectEventType event);

	/* Receive path: one handler per packet type. */
	void try_parse_rx_packet(void);
	int parse_rx_packet(struct bcluster_packet *pkt);
	int rx_hello(struct bcluster_packet *pkt);
	int rx_vote(struct bcluster_packet *pkt);
	int rx_master_assert(struct bcluster_packet *pkt);
	int rx_request(struct bcluster_packet *pkt);
	int rx_response(struct bcluster_packet *pkt);

	/* Transmit path. */
	void send_hello(void);
	void send_vote(int node_id);
	void send_master_assert(int node_id);
	void send_pkt(struct bcluster_packet *pkt);
};

/* State of the local node within the cluster (see bcluster_t::m_cluster_state). */
enum bcluster_state {
	BCLUSTER_STATE_INACTIVE,
	BCLUSTER_STATE_AWAITING_QUORUM,
	BCLUSTER_STATE_ACTIVE,
	BCLUSTER_STATE_MASTER,
	BCLUSTER_STATE_SLAVE,
};

/*
 * The cluster as seen from this node: up to MAX_BCLUSTER_PEERS peers,
 * election bookkeeping, and a linked list of cluster requests.
 */
class bcluster_t : public SelectEventHandler {
public:
	bcluster_t(char *name, u32 ip, int node_id, int quorum);
	~bcluster_t();

	void add_peer(ctrlfd_t *cfd, u32 ip, int node_id);
	void del_peer(ctrlfd_t *cfd, u32 ip, int node_id);
	const char *get_state(void);
	void status(ctrlfd_t *cfd);
	void start(ctrlfd_t *cfd);
	void show_running_config(ctrlfd_t *cfd, int verbose);
	virtual void SelectEvent(int fd, SelectEventType event);

	bcluster_peer_t *m_peers[MAX_BCLUSTER_PEERS];
	char *m_name;
	int m_node_self;
	int m_our_vote;
	int m_master;
	unsigned m_quorum;
	enum bcluster_state m_cluster_state;
	struct sockaddr_in m_sin;
	unsigned m_my_request_id;
	int m_listen_fd;
	u8 m_election_id[BCLUSTER_ELECTION_ID_BYTES];

	void peer_transition(void);
	void send_vote(void);
	void maybe_assert_mastership(void);
	void assert_mastership(void);
	void master_asserted(int id);
	void cluster_req(bcluster_req_t *req);
	void send_cluster_req_to_slaves(bcluster_req_t *req, struct bcluster_packet *pkt);

	/* Head and tail pointers for cluster requests, linked through bcluster_req_t::m_next. */
	bcluster_req_t *m_cluster_reqs;
	bcluster_req_t *m_cluster_reqs_last;
};

/*
 * Base class for a cluster request.  The two inline virtual hooks below
 * do nothing; subclasses may override them.
 */
class bcluster_req_t {
public:
	bcluster_t *m_cluster;
	bcluster_req_t *m_next;
	unsigned m_id;
	unsigned m_response;
	unsigned m_peer_mask;

	bcluster_req_t();
	virtual ~bcluster_req_t();

	virtual void build_pkt(struct bcluster_packet *pkt);
	virtual void rx_response(bcluster_peer_t *peer, struct bcluster_packet *pkt) { return; }
	virtual void master_get_response(void) { return; }
};

/*
 * Request variant that additionally records an original id, a parent
 * request and a peer; it overrides rx_response().
 */
class bcluster_req_master_t : public bcluster_req_t {
public:
	unsigned m_orig_id;
	bcluster_req_t *m_parent_req;
	bcluster_peer_t *m_peer;

	bcluster_req_master_t(bcluster_t *cluster, struct bcluster_packet *pkt,
			      bcluster_req_t *parent_req, bcluster_peer_t *peer);
	virtual ~bcluster_req_master_t() { return; }

	virtual void rx_response(bcluster_peer_t *peer, struct bcluster_packet *pkt);
};

/* Cluster instance pointer shared across the program; defined elsewhere. */
extern bcluster_t *g_cluster;

/* IP-pool request/response handlers; defined elsewhere. */
extern void bcluster_ippool_request(bcluster_peer_t *peer, struct bcluster_packet *pkt);
extern void bcluster_ippool_response(bcluster_peer_t *peer, struct bcluster_packet *pkt);