HAProxy Protocol
在系统架构中,我们常会用到的一个组件就是haproxy
。但是使用这个组件之后,我们希望对于上下游的组件来说这个ha是透明的,即,后端能够得知前端的信息,前端也能得知到后端的信息。这需要后端和前端组件支持了Proxy Protocol协议;比如Nginx中就支持Proxy Protocol协议。
如果我们自己写的后端需要了解前端的信息,那么同样需要支持该协议。该协议分为有两个版本,这里使用的测试的是第二版。测试后端实现haproxy后来解析前端的IP信息。其实在这里已经提供了一个解析函数样例read_evt
,本文就是基于该函数,对accept函数进行了一次简单封装。完整的实验了一下Proxy 协议,以供同样需求的人参考。
haproxy配置
global
nbproc 1
stats bind-process 1
stats socket /home/liuyangming/stats-1
maxconn 48000
defaults
mode tcp
timeout connect 1s
timeout client 4h
timeout server 4h
frontend tcp-client
bind 10.189.3.53:8080
default_backend echo
backend echo
balance leastconn
#server localhost-tcpserver-1 127.0.0.1:8081 maxconn 12000 maxqueue 3000 check send-proxy-v2 check-send-proxy
server localhost-tcpserver-1 127.0.0.1:8081 maxconn 12000 maxqueue 3000 send-proxy-v2
注意这里将haproxy的check关闭了,否则需要区分haproxy的地址。
测试
my_accept
在系统函数上加了点封装,主要就是在accpet的时候,已经有个haproxy的ip信息;在accept之后,再次调用recv
将解析中的前端信息覆盖到我们原来的from
结构。
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <stdint.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <errno.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#define BUFFER_SIZE 1024
#define on_error(...) { fprintf(stderr, __VA_ARGS__); fflush(stderr); exit(1); }
typedef union {
struct {
char line[108];
} v1;
struct {
uint8_t sig[12];
uint8_t ver_cmd;
uint8_t fam;
uint16_t len;
union {
struct { /* for TCP/UDP over IPv4, len = 12 */
uint32_t src_addr;
uint32_t dst_addr;
uint16_t src_port;
uint16_t dst_port;
} ip4;
struct { /* for TCP/UDP over IPv6, len = 36 */
uint8_t src_addr[16];
uint8_t dst_addr[16];
uint16_t src_port;
uint16_t dst_port;
} ip6;
struct { /* for AF_UNIX sockets, len = 216 */
uint8_t src_addr[108];
uint8_t dst_addr[108];
} unx;
} addr;
} v2;
} Header;
const char v2sig[12] = "\x0D\x0A\x0D\x0A\x00\x0D\x0A\x51\x55\x49\x54\x0A";
/* returns
* =0 if needs to poll
* <0 upon error
* >0 if it did the job
*/
int my_accept(int fd, struct sockaddr *from, socklen_t *from_len)
{
// get one connection
int client_fd = -1;
while (1) {
client_fd = accept(fd, from, from_len);
if (client_fd <= 0) {
if (errno == EAGAIN) {
continue;
}
printf("%d \n", errno);
return client_fd;
} else {
break;
}
}
Header hdr;
int size, ret;
do {
ret = recv(client_fd, &hdr, sizeof(hdr), MSG_PEEK);
} while (ret == -1 && errno == EINTR);
if (ret == -1)
return (errno == EAGAIN) ? 0 : -1;
if (ret >= 16 && memcmp(&hdr.v2, v2sig, 12) == 0 &&
(hdr.v2.ver_cmd & 0xF0) == 0x20) {
size = 16 + ntohs(hdr.v2.len);
if (ret < size)
return -1; /* truncated or too large header */
switch (hdr.v2.ver_cmd & 0xF) {
case 0x01: /* PROXY command */
switch (hdr.v2.fam) {
case 0x11: /* TCPv4 */
((struct sockaddr_in *)from)->sin_family = AF_INET;
((struct sockaddr_in *)from)->sin_addr.s_addr =
hdr.v2.addr.ip4.src_addr;
printf("v2 ipv4 ip: %s\n", inet_ntoa(((struct sockaddr_in *)from)->sin_addr));
((struct sockaddr_in *)from)->sin_port =
hdr.v2.addr.ip4.src_port;
goto done;
case 0x21: /* TCPv6 */
((struct sockaddr_in6 *)from)->sin6_family = AF_INET6;
memcpy(&((struct sockaddr_in6 *)from)->sin6_addr,
hdr.v2.addr.ip6.src_addr, 16);
((struct sockaddr_in6 *)from)->sin6_port =
hdr.v2.addr.ip6.src_port;
goto done;
}
/* unsupported protocol, keep local connection address */
break;
case 0x00: /* LOCAL command */
/* keep local connection address for LOCAL */
printf("v2 local\n");
break;
default:
printf("default\n");
return -1; /* not a supported command */
}
}
else if (ret >= 8 && memcmp(hdr.v1.line, "PROXY", 5) == 0) {
char *end = (char*)memchr(hdr.v1.line, '\r', ret - 1);
if (!end || end[1] != '\n')
return -1; /* partial or invalid header */
*end = '\0'; /* terminate the string to ease parsing */
size = end + 2 - hdr.v1.line; /* skip header + CRLF */
/* parse the V1 header using favorite address parsers like inet_pton.
* return -1 upon error, or simply fall through to accept.
*/
}
else {/* Wrong protocol */
return -1;
}
done:
do { /* we need to consume the appropriate amount of data from the socket */
ret = recv(client_fd, &hdr, size, 0);
} while (ret == -1 && errno == EINTR);
return (ret >= 0) ? client_fd : -1;
}
int main (int argc, char *argv[]) {
if (argc < 2) on_error("Usage: %s [port]\n", argv[0]);
int port = atoi(argv[1]);
int server_fd, err;
server_fd = socket(AF_INET, SOCK_STREAM, 0);
if (server_fd < 0) on_error("Could not create socket\n");
int opt_val = 1;
setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt_val, sizeof opt_val);
struct sockaddr_in server;
server.sin_family = AF_INET;
server.sin_port = htons(port);
server.sin_addr.s_addr = htonl(INADDR_ANY);
err = bind(server_fd, (struct sockaddr *) &server, sizeof(server));
if (err < 0) on_error("Could not bind socket\n");
err = listen(server_fd, 128);
if (err < 0) on_error("Could not listen on socket\n");
printf("Server is listening on %d\n", port);
struct sockaddr_storage from;
int from_len = sizeof(from);
int client_fd = my_accept(server_fd, (struct sockaddr *)&from, &from_len);
if (client_fd < 0) on_error("Could not establish new connection\n");
printf("accept msg from: %s\n",inet_ntoa(((struct sockaddr_in *)&from)->sin_addr));
char buf[BUFFER_SIZE];
while (1) {
int read = recv(client_fd, buf, BUFFER_SIZE, 0);
if (!read) break; // done reading
if (read < 0) printf("Client read failed\n");
err = send(client_fd, buf, read, 0);
if (err < 0) on_error("Client write failed\n");
}
return 0;
}
结果
import socket
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('10.189.3.53', 8080))
while True:
msg = raw_input("next msg: ")
client.send(msg)
response = client.recv(4096)
print response
Pgbouncers
原来建立这个项目的原因是,在高负载的应用中,单进程的Pgbouncer的CPU会成为系统瓶颈。后期打算将Pgbouncer改成多进程的。现在先在Pgbouncer中加了HAproxyProtocol支持,透传clientIP。
项目地址:https://github.com/layamon/pgbouncers
主要就是添加了一个参数cf_ha_proxy
以及一个函数safe_accept_proxy
:
loop:
if (cf_ha_proxy) {
fd = safe_accept_proxy(sock, &raddr.sa, &len);
} else {
fd = safe_accept(sock, &raddr.sa, &len);
}
safe_accept_proxy
这个函数中在accept之后接收数据,有时候会一开始就Resource temporarily unavailable。这里暂时添加了一个循环,等待数据全部接收并解析后,才进行下一步。
getproxyinfo:
ret = safe_recv(client_fd, &hdr, sizeof(hdr), MSG_PEEK);
if (ret < 0 && errno == EAGAIN)
goto getproxyinfo;