Yangming's Blog

beware the barrenness of a busy life

Pgbouncer添加HAProxyProtocol特性

06 Mar 2019 » Network, Pgbouncer

HAProxy Protocol

在系统架构中,我们常会用到的一个组件就是haproxy。但是使用这个组件之后,我们希望对于上下游的组件来说这个ha是透明的,即,后端能够得知前端的信息,前端也能得知到后端的信息。这需要后端和前端组件支持了Proxy Protocol协议;比如Nginx中就支持Proxy Protocol协议。

如果我们自己写的后端需要了解前端的信息,那么同样需要支持该协议。该协议分为有两个版本,这里使用的测试的是第二版。测试后端实现haproxy后来解析前端的IP信息。其实在这里已经提供了一个解析函数样例read_evt,本文就是基于该函数,对accept函数进行了一次简单封装。完整的实验了一下Proxy 协议,以供同样需求的人参考。

haproxy配置

global
    nbproc 1
    stats bind-process 1
    stats socket /home/liuyangming/stats-1
    maxconn 48000

defaults
    mode tcp
    timeout connect 1s
    timeout client 4h
    timeout server 4h

frontend tcp-client
    bind 10.189.3.53:8080
    default_backend echo

backend echo
    balance leastconn
    #server localhost-tcpserver-1 127.0.0.1:8081 maxconn 12000 maxqueue 3000 check send-proxy-v2 check-send-proxy
    server localhost-tcpserver-1 127.0.0.1:8081 maxconn 12000 maxqueue 3000 send-proxy-v2

注意这里将haproxy的check关闭了,否则需要区分haproxy的地址。

测试

my_accept

在系统函数上加了点封装,主要就是在accpet的时候,已经有个haproxy的ip信息;在accept之后,再次调用recv将解析中的前端信息覆盖到我们原来的from结构。

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <stdint.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <errno.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#define BUFFER_SIZE 1024
#define on_error(...) { fprintf(stderr, __VA_ARGS__); fflush(stderr); exit(1); }
typedef union {
    struct {
        char line[108];
    } v1;
    struct {
        uint8_t sig[12];
        uint8_t ver_cmd;
        uint8_t fam;
        uint16_t len;
        union {
            struct {  /* for TCP/UDP over IPv4, len = 12 */
                uint32_t src_addr;
                uint32_t dst_addr;
                uint16_t src_port;
                uint16_t dst_port;
            } ip4;
            struct {  /* for TCP/UDP over IPv6, len = 36 */
                uint8_t  src_addr[16];
                uint8_t  dst_addr[16];
                uint16_t src_port;
                uint16_t dst_port;
            } ip6;
            struct {  /* for AF_UNIX sockets, len = 216 */
                uint8_t src_addr[108];
                uint8_t dst_addr[108];
            } unx;
        } addr;
    } v2;
} Header;

const char v2sig[12] = "\x0D\x0A\x0D\x0A\x00\x0D\x0A\x51\x55\x49\x54\x0A";

/* returns
 * =0 if needs to poll
 * <0 upon error
 * >0 if it did the job
 */
int my_accept(int fd, struct sockaddr *from, socklen_t *from_len)
{
    // get one connection
    int client_fd = -1;
    while (1) {
        client_fd = accept(fd, from, from_len);
        if (client_fd <= 0) {
            if (errno == EAGAIN) {
                continue;
            }
            printf("%d \n", errno);
            return client_fd;
        } else {
            break;
        }
    }

    Header hdr;
    int size, ret;
    do {
        ret = recv(client_fd, &hdr, sizeof(hdr), MSG_PEEK);
    } while (ret == -1 && errno == EINTR);

    if (ret == -1)
        return (errno == EAGAIN) ? 0 : -1;

    if (ret >= 16 && memcmp(&hdr.v2, v2sig, 12) == 0 &&
            (hdr.v2.ver_cmd & 0xF0) == 0x20) {
        size = 16 + ntohs(hdr.v2.len);
        if (ret < size)
            return -1; /* truncated or too large header */

        switch (hdr.v2.ver_cmd & 0xF) {
            case 0x01: /* PROXY command */
                switch (hdr.v2.fam) {
                    case 0x11:  /* TCPv4 */
                        ((struct sockaddr_in *)from)->sin_family = AF_INET;
                        ((struct sockaddr_in *)from)->sin_addr.s_addr =
                            hdr.v2.addr.ip4.src_addr;
                        printf("v2 ipv4 ip: %s\n", inet_ntoa(((struct sockaddr_in *)from)->sin_addr));
                        ((struct sockaddr_in *)from)->sin_port =
                            hdr.v2.addr.ip4.src_port;
                        goto done;
                    case 0x21:  /* TCPv6 */
                        ((struct sockaddr_in6 *)from)->sin6_family = AF_INET6;
                        memcpy(&((struct sockaddr_in6 *)from)->sin6_addr,
                                hdr.v2.addr.ip6.src_addr, 16);
                        ((struct sockaddr_in6 *)from)->sin6_port =
                            hdr.v2.addr.ip6.src_port;
                        goto done;
                }
                /* unsupported protocol, keep local connection address */
                break;
            case 0x00: /* LOCAL command */
                /* keep local connection address for LOCAL */
                printf("v2 local\n");
                break;
            default:
                printf("default\n");
                return -1; /* not a supported command */
        }
    }
    else if (ret >= 8 && memcmp(hdr.v1.line, "PROXY", 5) == 0) {
        char *end = (char*)memchr(hdr.v1.line, '\r', ret - 1);
        if (!end || end[1] != '\n')
            return -1; /* partial or invalid header */
        *end = '\0'; /* terminate the string to ease parsing */
        size = end + 2 - hdr.v1.line; /* skip header + CRLF */
        /* parse the V1 header using favorite address parsers like inet_pton.
         * return -1 upon error, or simply fall through to accept.
         */
    }
    else {/* Wrong protocol */
        return -1;
    }

done:
    do { /* we need to consume the appropriate amount of data from the socket */
        ret = recv(client_fd, &hdr, size, 0);
    } while (ret == -1 && errno == EINTR);
    return (ret >= 0) ? client_fd : -1;
}
int main (int argc, char *argv[]) {
    if (argc < 2) on_error("Usage: %s [port]\n", argv[0]);
    int port = atoi(argv[1]);

    int server_fd, err;
    server_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (server_fd < 0) on_error("Could not create socket\n");
    int opt_val = 1;
    setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt_val, sizeof opt_val);

    struct sockaddr_in server;
    server.sin_family = AF_INET;
    server.sin_port = htons(port);
    server.sin_addr.s_addr = htonl(INADDR_ANY);
    err = bind(server_fd, (struct sockaddr *) &server, sizeof(server));
    if (err < 0) on_error("Could not bind socket\n");

    err = listen(server_fd, 128);
    if (err < 0) on_error("Could not listen on socket\n");
    printf("Server is listening on %d\n", port);

    struct sockaddr_storage from;
    int from_len = sizeof(from);
    int client_fd = my_accept(server_fd, (struct sockaddr *)&from, &from_len);
    if (client_fd < 0) on_error("Could not establish new connection\n");

    printf("accept msg from: %s\n",inet_ntoa(((struct sockaddr_in *)&from)->sin_addr));

    char buf[BUFFER_SIZE];
    while (1) {
        int read = recv(client_fd, buf, BUFFER_SIZE, 0);

        if (!read) break; // done reading
        if (read < 0) printf("Client read failed\n");

        err = send(client_fd, buf, read, 0);
        if (err < 0) on_error("Client write failed\n");
    }

    return 0;
}

结果

import socket
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('10.189.3.53', 8080))
while True:
    msg = raw_input("next msg: ")
    client.send(msg)
    response = client.recv(4096)
    print response

image-20190312171300960

Pgbouncers

原来建立这个项目的原因是,在高负载的应用中,单进程的Pgbouncer的CPU会成为系统瓶颈。后期打算将Pgbouncer改成多进程的。现在先在Pgbouncer中加了HAproxyProtocol支持,透传clientIP。

项目地址:https://github.com/layamon/pgbouncers

主要就是添加了一个参数cf_ha_proxy以及一个函数safe_accept_proxy:

loop:
	if (cf_ha_proxy) {
		fd = safe_accept_proxy(sock, &raddr.sa, &len);
	} else {
		fd = safe_accept(sock, &raddr.sa, &len);
	}

safe_accept_proxy这个函数中在accept之后接收数据,有时候会一开始就Resource temporarily unavailable。这里暂时添加了一个循环,等待数据全部接收并解析后,才进行下一步。

getproxyinfo:
    ret = safe_recv(client_fd, &hdr, sizeof(hdr), MSG_PEEK);
    if (ret < 0 && errno == EAGAIN)
		goto getproxyinfo;