月度存档: 6 月 2019

[原创]K8s中搭建自动恢复的redis集群

参考 https://www.jianshu.com/p/65c4baadf5d9

但是这个方案并不完美,存在问题:

pod重启后pod ip已经改变,而redis配置nodes.conf中的ip并没有变。如果单个节点down掉后重启,集群是可以恢复的,如果存在一半以上节点down的情况下,比如k8s集群重启,redis集群是不能恢复的。

如何在集群整体重启情况下自动恢复?

redis是依赖nodes.conf配置节点信息,从而相互通信。因此我们只要保证nodes.conf能更新成新的pod ip就可以了。

步骤:

pod启动后向一个所有redis实例都能访问的地方写入一个redisid和ip的对应关系。同时对nodes.conf中所有redisid,去查找对应的ip,如果能ping通(或者其他健康检测方式)则认为ip有效,更新nodes.conf。直到所有ip都在线后,启动redis实例。

思路就是这样了,下面是具体实现并贴代码:

修改redis-cluster.yaml,增加公共访问区域:

比如我这里是nfs:挂载到/sh,sh下添加start.sh和checkip.sh,由于redis镜像中没有ping工具,偷懒起见手动copy ping、libcap.so.2、libcrypto.so.10、libidn.so.11到挂载目录下,并在start.sh中指定LD_LIBRARY_PATH。

start.sh

#!/bin/bash

export LD_LIBRARY_PATH=/sh:$LD_LIBRARY_PATH

newip=`cat /etc/hosts|grep redis-app|awk '{print $1}'`
myid=`cat /var/lib/redis/nodes.conf |grep myself|awk '{print $1}'`
if [ "$newip"x == ""x ]; then
    echo "Cannot find new ip";
    exit 1;
elif [ "$myid"x == ""x ]; then
    echo "no myid";
    exit 1;
else
    echo $newip > /sh/$myid 
    echo "refresh ip: $myid -> $newip";
fi

echo "check nodes.conf"
cat /var/lib/redis/nodes.conf|grep -E "master|slave"|awk '{print $1}'|xargs -i /sh/checkip.sh {}

if [ $? -eq 0 ]; then
    echo "done nodes.conf"
    redis-server /etc/redis/redis.conf --protected-mode no
else
    echo "abort on error"
fi

checkip.sh

#!/bin/bash

if [ $# -ne 1 ]; then
    exit 1;
fi

while :
do
    echo "while";
    chkip=`cat /sh/$1`
    if [ "$chkip"x == ""x ]; then
        sleep 1s;
    else
        /sh/ping -c1 $chkip;
        if [ $? -eq 0 ]; then
            oldip=`cat /var/lib/redis/nodes.conf |grep -E "^$1"|awk '{print $2}'|cut -d ":" -f1`
            if [ "$oldip"x == ""x ]; then
                echo "no old ip";
                exit 1;
            else
                echo "oldip=$oldip and newip=$chkip"
                sed -i "s/$oldip/$chkip/g" /var/lib/redis/nodes.conf
                echo "done $1 $chkip";
                exit 0;
            fi
        else
            sleep 1s;
        fi
    fi
done

最后把pod的启动脚本改为/sh/start.sh,更新pod:

kubectl apply -f redis-stateful.yaml

测试:把redis-stateful整体移除,并重新create,集群正常恢复,以下是pod日志:

refresh ip: 2d05fb2406a254f08664b5ff5d26a151b5b262cc -> 10.244.4.68
check nodes.conf
while
PING 10.244.4.68 (10.244.4.68) 56(84) bytes of data.
64 bytes from 10.244.4.68: icmp_seq=1 ttl=64 time=0.072 ms

--- 10.244.4.68 ping statistics ---
1 packets transmitted, 1 received, 0% packet loss, time 0ms
rtt min/avg/max/mdev = 0.072/0.072/0.072/0.000 ms
oldip=10.244.4.66 and newip=10.244.4.68
done 2d05fb2406a254f08664b5ff5d26a151b5b262cc 10.244.4.68
while
PING 10.244.1.89 (10.244.1.89) 56(84) bytes of data.

--- 10.244.1.89 ping statistics ---
1 packets transmitted, 0 received, 100% packet loss, time 0ms

while
PING 10.244.1.89 (10.244.1.89) 56(84) bytes of data.

--- 10.244.1.89 ping statistics ---
1 packets transmitted, 0 received, 100% packet loss, time 0ms

while
PING 10.244.1.91 (10.244.1.91) 56(84) bytes of data.
64 bytes from 10.244.1.91: icmp_seq=1 ttl=62 time=1.86 ms

--- 10.244.1.91 ping statistics ---
1 packets transmitted, 1 received, 0% packet loss, time 0ms
rtt min/avg/max/mdev = 1.861/1.861/1.861/0.000 ms
oldip=10.244.1.89 and newip=10.244.1.91
done 1665fb20c43e7468c54bbdea7ed6e283283669df 10.244.1.91
while
PING 10.244.2.135 (10.244.2.135) 56(84) bytes of data.
64 bytes from 10.244.2.135: icmp_seq=1 ttl=62 time=1.87 ms

--- 10.244.2.135 ping statistics ---
1 packets transmitted, 1 received, 0% packet loss, time 0ms
rtt min/avg/max/mdev = 1.870/1.870/1.870/0.000 ms
oldip=10.244.2.133 and newip=10.244.2.135
done 7a17b9c2f70a053f98fb480492a8e904d330f9ac 10.244.2.135
while
PING 10.244.1.90 (10.244.1.90) 56(84) bytes of data.
64 bytes from 10.244.1.90: icmp_seq=1 ttl=62 time=1.96 ms

--- 10.244.1.90 ping statistics ---
1 packets transmitted, 1 received, 0% packet loss, time 0ms
rtt min/avg/max/mdev = 1.967/1.967/1.967/0.000 ms
oldip=10.244.1.88 and newip=10.244.1.90
done 12cf9455a800f940f427c745318d1300a6730103 10.244.1.90
while
PING 10.244.4.69 (10.244.4.69) 56(84) bytes of data.
64 bytes from 10.244.4.69: icmp_seq=1 ttl=64 time=0.106 ms

--- 10.244.4.69 ping statistics ---
1 packets transmitted, 1 received, 0% packet loss, time 0ms
rtt min/avg/max/mdev = 0.106/0.106/0.106/0.000 ms
oldip=10.244.4.67 and newip=10.244.4.69
done d1e352d917cb9dfd5be2b053ef34473e11c7ea23 10.244.4.69
while
PING 10.244.2.136 (10.244.2.136) 56(84) bytes of data.
64 bytes from 10.244.2.136: icmp_seq=1 ttl=62 time=1.89 ms

--- 10.244.2.136 ping statistics ---
1 packets transmitted, 1 received, 0% packet loss, time 0ms
rtt min/avg/max/mdev = 1.899/1.899/1.899/0.000 ms
oldip=10.244.2.134 and newip=10.244.2.136
done 283aa9be8d0d4b25bfb79cf0a7eb084284b4f44d 10.244.2.136
done nodes.conf
78:C 04 Jul 2019 08:54:28.879 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
78:C 04 Jul 2019 08:54:28.879 # Redis version=5.0.4, bits=64, commit=00000000, modified=0, pid=78, just started
78:C 04 Jul 2019 08:54:28.879 # Configuration loaded
78:M 04 Jul 2019 08:54:28.887 * Node configuration loaded, I'm 2d05fb2406a254f08664b5ff5d26a151b5b262cc
78:M 04 Jul 2019 08:54:28.888 * Running mode=cluster, port=6379.
78:M 04 Jul 2019 08:54:28.888 # WARNING: The TCP backlog setting of 511 cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of 128.
78:M 04 Jul 2019 08:54:28.888 # Server initialized
78:M 04 Jul 2019 08:54:28.888 # WARNING you have Transparent Huge Pages (THP) support enabled in your kernel. This will create latency and memory usage issues with Redis. To fix this issue run the command 'echo never > /sys/kernel/mm/transparent_hugepage/enabled' as root, and add it to your /etc/rc.local in order to retain the setting after a reboot. Redis must be restarted after THP is disabled.
78:M 04 Jul 2019 08:54:28.890 * Reading RDB preamble from AOF file...
78:M 04 Jul 2019 08:54:28.890 * Reading the remaining AOF tail...
78:M 04 Jul 2019 08:54:28.892 * DB loaded from append only file: 0.004 seconds
78:M 04 Jul 2019 08:54:28.892 * Ready to accept connections
78:M 04 Jul 2019 08:54:28.899 * Clear FAIL state for node 283aa9be8d0d4b25bfb79cf0a7eb084284b4f44d: replica is reachable again.
78:M 04 Jul 2019 08:54:32.367 * Clear FAIL state for node 1665fb20c43e7468c54bbdea7ed6e283283669df: replica is reachable again.
78:M 04 Jul 2019 08:54:32.462 * Clear FAIL state for node d1e352d917cb9dfd5be2b053ef34473e11c7ea23: replica is reachable again.
78:M 04 Jul 2019 08:54:33.316 * Replica 10.244.1.91:6379 asks for synchronization
78:M 04 Jul 2019 08:54:33.316 * Partial resynchronization not accepted: Replication ID mismatch (Replica asked for 'ce61d735f3070e58c8d1eef8d705416e1dec10d8', my replication IDs are 'b726060355dca8aa11c4bf4c47e29d4423347021' and '0000000000000000000000000000000000000000')
78:M 04 Jul 2019 08:54:33.316 * Starting BGSAVE for SYNC with target: disk
78:M 04 Jul 2019 08:54:33.318 * Background saving started by pid 82
82:C 04 Jul 2019 08:54:33.325 * DB saved on disk
82:C 04 Jul 2019 08:54:33.328 * RDB: 4 MB of memory used by copy-on-write
78:M 04 Jul 2019 08:54:33.372 * Background saving terminated with success
78:M 04 Jul 2019 08:54:33.376 * Synchronization with replica 10.244.1.91:6379 succeeded
78:M 04 Jul 2019 08:54:34.061 * Marking node 12cf9455a800f940f427c745318d1300a6730103 as failing (quorum reached).
78:M 04 Jul 2019 08:54:35.294 # Failover auth granted to d1e352d917cb9dfd5be2b053ef34473e11c7ea23 for epoch 47
78:M 04 Jul 2019 08:54:37.016 * Clear FAIL state for node 12cf9455a800f940f427c745318d1300a6730103: replica is reachable again.
78:M 04 Jul 2019 08:54:39.654 * Clear FAIL state for node 7a17b9c2f70a053f98fb480492a8e904d330f9ac: is reachable again and nobody is serving its slots after some time.
78:M 04 Jul 2019 08:54:40.364 # Cluster state changed: ok

1/30 图片发布计划

一天记录一张图片,持续30天

1/30,记忆深处隐约有画面:四月烟雨,幽深小巷,白墙黑瓦,碎石径上青苔点。

[转]关于TCP打洞技术

[转自] https://blog.csdn.net/zhongguoren666/article/details/7489809

建立穿越NAT设备的p2p的 TCP 连接只比UDP复杂一点点,TCP协议的“打洞”从协议层来看是与UDP
的“打洞”过程非常相似的。尽管如此,基于TCP协议的打洞至今为止还没有被很好的理解,这也
造成了对其提供支持的NAT设备不是很多。

在NAT设备支持的前提下,基于TCP的“打洞”技术实际上与基于UDP的“打洞”技术一样快捷、可靠。实际上,只要NAT设备支持的话,基于TCP的p2p技术的健壮性将比基于UDP的技术的更强一些,因为TCP协议的状态机给出了一种标准的方法来精确的获取某个TCP session的生命期,而UDP协议则无法做到这一点。

1 套接字和TCP端口的重用

实现基于TCP协议的p2p“打洞”过程中,最主要的问题不是来自于TCP协议,而是来自于来自于应用程序的API接口。这是由于标准的伯克利(Berkeley)套接字的API是围绕着构建客户端/服务器程序而设计的,API允许TCP流套接字通过调用connect()函数来建立向外的连接,或者通过listen()和accept函数接受来自外部的连接。

但是,TCP协议并没有象UDP那样的“同一个端口既可以向外连接,又能够接受来自外部的连接”的API。而且更糟的是,TCP的套接字通常仅允许建立1对1的响应,即应用程序在将一个套接字绑定到本地的一个端口以后,任何试图将第二个套接字绑定到该端口的操作都会失败。

为了让TCP“打洞”能够顺利工作,我们需要使用一个本地的TCP端口来监听来自外部的TCP连接,同时建立多个向外的TCP连接。幸运的是,所有的主流操作系统都能够支持一个特殊的TCP套接字参数,通常叫做“SO_REUSEADDR”,该参数允许应用程序将多个套接字绑定到本地的一个endpoint(只要所有要绑定的套接字都设置了SO_REUSEADDR参数即可)。BSD系统引入了SO_REUSEPORT参数,该参数用于区分
端口重用还是地址重用,在这样的系统里面,上述所有的参数必须都设置才行。

2 打开p2p的TCP流

假定客户端A希望建立与B的TCP连接。我们像通常一样假定A和B已经与公网上的已知服务器S建立了TCP连接。服务器记录下来每个联入的客户端的公网和内网的endpoints,如同为UDP服务的时候一样从协议层来看,TCP“打洞”与UDP“打洞”是几乎完全相同的过程:

1)、 S启动两个网络侦听,一个叫【主连接】侦听,一个叫【协助打洞】的侦听。
2)、 A和B分别与S的【主连接】保持联系。

3)、当A需要和B建立直接的TCP连接时,首先连接S的【协助打洞】端口,并发送协助连接申请。同时在该端口号上启动侦听。注意由于要在相同的网络终端上绑定到不同的套接字上,所以必须为这些套接字设置 SO_REUSEADDR 属性(即允许重用),否则侦听会失败。

4)、 S的【协助打洞】连接收到A的申请后通过【主连接】通知B,并将A经过NAT-A转换后的公网IP地址和端口等信息告诉B。

5)、 B收到S的连接通知后首先与S的【协助打洞】端口连接,随便发送一些数据后立即断开,这样做的目的是让S能知道B经过NAT-B转换后的公网IP和端口号。

6)、 B尝试与A的经过NAT-A转换后的公网IP地址和端口进行connect,根据不同的路由器会有不同的结果,有些路由器在这个操作就能建立连接(例如我用的TPLink R402),大多数路由器对于不请自到的SYN请求包直接丢弃而导致connect失败,但NAT-A会纪录此次连接的源地址和端口号,为接下来真正的连接做好了准备,这就是所谓的打洞,即B向A打了一个洞,下次A就能直接连接到B刚才使用的端口号了。

7)、客户端B打洞的同时在相同的端口上启动侦听。B在一切准备就绪以后通过与S的【主连接】回复消息“我已经准备好”,S在收到以后将B经过NAT-B转换后的公网IP和端口号告诉给A。

8)、 A收到S回复的B的公网IP和端口号等信息以后,开始连接到B公网IP和端口号,由于在步骤6中B曾经尝试连接过A的公网IP地址和端口,NAT-A纪录了此次连接的信息,所以当A主动连接B时,NAT-B会认为是合法的SYN数据,并允许通过,从而直接的TCP连接建立起来了。

图7
与UDP不同的是,使用UDP协议的每个客户端只需要一个套接字即可完成与服务器S通信,
并同时与多个p2p客户端通信的任务;而TCP客户端必须处理多个套接字绑定到同一个本地
TCP端口的问题,如图7所示。

现在来看更加实际的一种情景:A与B分别位于不同的NAT设备后面。如同使用UDP协议进行“打洞”
操作遇到的问题一样,TCP的“打洞”操作也会遇到内网的IP与“伪”公网IP重复造成连接失败或者错误连接之类的问题。

客户端向彼此公网endpoint发起连接的操作,会使得各自的NAT设备打开新的“洞”以允许A与B的
TCP数据通过。如果NAT设备支持TCP“打洞”操作的话,一个在客户端之间的基于TCP协议的流
通道就会自动建立起来。如果A向B发送的第一个SYN包发到了B的NAT设备,而B在此前没有向
A发送SYN包,B的NAT设备会丢弃这个包,这会引起A的“连接失败”或“无法连接”问题。而此时,由于A已经向B发送过SYN包,B发往A的SYN包将被看作是由A发往B的包的回应的一部分,
所以B发往A的SYN包会顺利地通过A的NAT设备,到达A,从而建立起A与B的p2p连接。

3 从应用程序的角度来看TCP“打洞”

从应用程序的角度来看,在进行TCP“打洞”的时候都发生了什么呢?

假定A首先向B发出SYN包,该包发往B的公网endpoint,并且被B的NAT设备丢弃,但是B发往A的公网endpoint的SYN包则通过A的NAT到达了A,然后,会发生以下的两种结果中的一种,具体是哪一种取决于操作系统对TCP协议的实现:

(1)A的TCP实现会发现收到的SYN包就是其发起连接并希望联入的B的SYN包,通俗一点来说
就是“说曹操,曹操到”的意思,本来A要去找B,结果B自己找上门来了。A的TCP协议栈因此
会把B做为A向B发起连接connect的一部分,并认为连接已经成功。程序A调用的异步connect()
函数将成功返回,A的listen()等待从外部联入的函数将没有任何反映。此时,B联入A的操作
在A程序的内部被理解为A联入B连接成功,并且A开始使用这个连接与B开始p2p通信。

由于A收到的SYN包中不包含A需要的ACK数据,因此,A的TCP将用SYN-ACK包回应B的公网endpoint,
并且将使用先前A发向B的SYN包一样的序列号。一旦B的TCP收到由A发来的SYN-ACK包,则把自己
的ACK包发给A,然后两端建立起TCP连接。简单地说,第一种,就是即使A发往B的SYN包被B的NAT
丢弃了,但是由于B发往A的包到达了A。结果是,A认为自己连接成功了,B也认为自己连接成功
了,不管是谁成功了,总之连接是已经建立起来了。

(2)另外一种结果是,A的TCP实现没有像(1)中所讲的那么“智能”,它没有发现现在联入的B
就是自己希望联入的。就好比在机场接人,明明遇到了自己想要接的人却不认识,误认为是其它
的人,安排别人给接走了,后来才知道是自己错过了机会,但是无论如何,人已经接到了任务
已经完成了。然后,A通过常规的listen()函数和accept()函数得到与B的连接,而由A发起的向
B的公网endpoint的连接会以失败告终。尽管A向B的连接失败,A仍然得到了B发起的向A的连接,
等效于A与B之间已经联通,不管中间过程如何,A与B已经连接起来了,结果是A和B的基于TCP协议
的p2p连接已经建立起来了。

第一种结果适用于基于BSD的操作系统对于TCP的实现,而第二种结果更加普遍一些,多数linux和
windows系统都会按照第二种结果来处理。

代码:

// 服务器地址和端口号定义

define SRV_TCP_MAIN_PORT    4000  // 服务器主连接的端口号 define SRV_TCP_HOLE_PORT    8000  // 服务器响应客户端打洞申请的端口号

这两个端口是固定的,服务器S启动时就开始侦听这两个端口了。

//
// 将新客户端登录信息发送给所有已登录的客户端,但不发送给自己
//
BOOL SendNewUserLoginNotifyToAll (LPCTSTR lpszClientIP, UINT nClientPort, DWORD dwID)
{
ASSERT ( lpszClientIP && nClientPort > 0 );
g_CSFor_PtrAry_SockClient.Lock();
for ( int i=0; ipSockClient = (CSockClient)g_PtrAry_SockClient.GetAt(i);     if ( pSockClient && pSockClient->m_bMainConn && pSockClient->m_dwID > 0 && pSockClient->m_dwID != dwID )
{
if (!pSockClient->SendNewUserLoginNotify (lpszClientIP, nClientPort, dwID))
{
g_CSFor_PtrAry_SockClient.Unlock();
return FALSE;
}
}
}
g_CSFor_PtrAry_SockClient.Unlock ();
return TRUE;
}

当有新的客户端连接到服务器时,服务器负责将该客户端的信息(IP地址、端口号)发送给其他客户端。

//
// 执行者:客户端A
// 有新客户端B登录了,我(客户端A)连接服务器端口 SRV_TCP_HOLE_PORT ,申请与B建立直接的TCP连接
//
BOOL Handle_NewUserLogin ( CSocket &MainSock, t_NewUserLoginPkt *pNewUserLoginPkt )
{
printf ( “New user ( %s:%u:%u ) login server”, pNewUserLoginPkt->szClientIP,
pNewUserLoginPkt->nClientPort, pNewUserLoginPkt->dwID );
BOOL bRet = FALSE;
DWORD dwThreadID = 0;
t_ReqConnClientPkt ReqConnClientPkt;
CSocket Sock;
CString csSocketAddress;
  char    szRecvBuffer[NET_BUFFER_SIZE] = {0};
int     nRecvBytes = 0;
// 创建打洞Socket,连接服务器协助打洞的端口号 SRV_TCP_HOLE_PORT:
try
{
if ( !Sock.Socket () )
{
printf ( “Create socket failed : %s”, hwFormatMessage(GetLastError()) );
goto finished;
}
     UINT nOptValue = 1;
if ( !Sock.SetSockOpt ( SO_REUSEADDR, &nOptValue , sizeof(UINT) ) )
{
printf ( “SetSockOpt socket failed : %s”, hwFormatMessage(GetLastError()) );
goto finished;
}
if ( !Sock.Bind ( 0 ) )
{
  printf ( “Bind socket failed : %s”, hwFormatMessage(GetLastError()) );
goto finished;
}
if ( !Sock.Connect ( g_pServerAddess, SRV_TCP_HOLE_PORT ) )
{
printf ( “Connect to [%s:%d] failed : %s”, g_pServerAddess,
SRV_TCP_HOLE_PORT, hwFormatMessage(GetLastError()) );
goto finished;
}
}
catch ( CException e )
{
char szError[255] = {0};
e.GetErrorMessage( szError, sizeof(szError) );
printf ( “Exception occur, %s”, szError );
goto finished;
}
g_pSock_MakeHole = &Sock;
ASSERT ( g_nHolePort == 0 );
VERIFY ( Sock.GetSockName ( csSocketAddress, g_nHolePort ) );
// 创建一个线程来侦听端口 g_nHolePort 的连接请求
dwThreadID = 0;
g_hThread_Listen = ::CreateThread ( NULL, 0, ::ThreadProc_Listen, LPVOID(NULL), 0, &dwThreadID );
if (!HANDLE_IS_VALID(g_hThread_Listen) ) return FALSE;
Sleep ( 3000 );
// 我(客户端A)向服务器协助打洞的端口号 SRV_TCP_HOLE_PORT 发送申请,

    // 希望与新登录的客户端B建立连接
// 服务器会将我的打洞用的外部IP和端口号告诉客户端B:
   ASSERT ( g_WelcomePkt.dwID > 0 );
ReqConnClientPkt.dwInviterID = g_WelcomePkt.dwID;
ReqConnClientPkt.dwInvitedID = pNewUserLoginPkt->dwID;
if ( Sock.Send ( &ReqConnClientPkt, sizeof(t_ReqConnClientPkt) ) != sizeof(t_ReqConnClientPkt) )
goto finished;
// 等待服务器回应,将客户端B的外部IP地址和端口号告诉我(客户端A):
nRecvBytes = Sock.Receive ( szRecvBuffer, sizeof(szRecvBuffer) );
if ( nRecvBytes > 0 )
{
ASSERT ( nRecvBytes == sizeof(t_SrvReqDirectConnectPkt) );
    PACKET_TYPE pePacketType = (PACKET_TYPE)szRecvBuffer;
ASSERT ( pePacketType && pePacketType == PACKET_TYPE_TCP_DIRECT_CONNECT );     Sleep ( 1000 );     Handle_SrvReqDirectConnect ( (t_SrvReqDirectConnectPkt)szRecvBuffer );
printf ( “Handle_SrvReqDirectConnect end” );
}
// 对方断开连接了
else
{
goto finished;
}

bRet = TRUE;
finished:
g_pSock_MakeHole = NULL;
return bRet;
}

这里假设客户端A先启动,当客户端B启动后客户端A将收到服务器S的新客户端登录的通知,并得到客户端B的公网IP和端口,客户端A启动线程连接S的【协助打洞】端口(本地端口号可以用GetSocketName()函数取得,假设为M),请求S协助TCP打洞,然后启动线程侦听该本地端口(前面假设的M)上的连接请求,然后等待服务器的回应。

//
// 客户端A请求我(服务器)协助连接客户端B,这个包应该在打洞Socket中收到
//
BOOL CSockClient::Handle_ReqConnClientPkt(t_ReqConnClientPkt *pReqConnClientPkt)
{
ASSERT ( !m_bMainConn );
CSockClient *pSockClient_B = FindSocketClient ( pReqConnClientPkt->dwInvitedID );
   if ( !pSockClient_B ) return FALSE;
printf ( “%s:%u:%u invite %s:%u:%u connection”,

        m_csPeerAddress, m_nPeerPort, m_dwID,

pSockClient_B->m_csPeerAddress,

        pSockClient_B->m_nPeerPort,

        pSockClient_B->m_dwID );
// 客户端A想要和客户端B建立直接的TCP连接,服务器负责将A的外部IP和端口号告诉给B:
t_SrvReqMakeHolePkt SrvReqMakeHolePkt;
SrvReqMakeHolePkt.dwInviterID = pReqConnClientPkt->dwInviterID;
   SrvReqMakeHolePkt.dwInviterHoleID = m_dwID;
  SrvReqMakeHolePkt.dwInvitedID = pReqConnClientPkt->dwInvitedID;
STRNCPY_CS ( SrvReqMakeHolePkt.szClientHoleIP, m_csPeerAddress );
SrvReqMakeHolePkt.nClientHolePort = m_nPeerPort;
if ( pSockClient_B->SendChunk ( &SrvReqMakeHolePkt, sizeof(t_SrvReqMakeHolePkt), 0 ) != sizeof(t_SrvReqMakeHolePkt) )
return FALSE;
// 等待客户端B打洞完成,完成以后通知客户端A直接连接客户端外部IP和端口号
if ( !HANDLE_IS_VALID(m_hEvtWaitClientBHole) )
return FALSE;
if ( WaitForSingleObject ( m_hEvtWaitClientBHole, 6000*1000 ) == WAIT_OBJECT_0 )
{
if ( SendChunk (&m_SrvReqDirectConnectPkt, sizeof(t_SrvReqDirectConnectPkt), 0)
== sizeof(t_SrvReqDirectConnectPkt) )
return TRUE;
}
return FALSE;
}

服务器S收到客户端A的协助打洞请求后通知客户端B,要求客户端B向客户端A打洞,即让客户端B尝试与客户端A的公网IP和端口进行connect。

//
// 执行者:客户端B
// 处理服务器要我(客户端B)向另外一个客户端(A)打洞,打洞操作在线程中进行。
// 先连接服务器协助打洞的端口号 SRV_TCP_HOLE_PORT ,通过服务器告诉客户端A我(客户端B)的外部IP地址和端口号,然后启动线程进行打洞,
// 客户端A在收到这些信息以后会发起对我(客户端B)的外部IP地址和端口号的连接(这个连接在客户端B打洞完成以后进行,所以
// 客户端B的NAT不会丢弃这个SYN包,从而连接能建立)
//
BOOL Handle_SrvReqMakeHole(CSocket &MainSock, t_SrvReqMakeHolePkt *pSrvReqMakeHolePkt)
{
ASSERT ( pSrvReqMakeHolePkt );
// 创建Socket,连接服务器协助打洞的端口号 SRV_TCP_HOLE_PORT,连接建立以后发送一个断开连接的请求给服务器,然后连接断开
// 这里连接的目的是让服务器知道我(客户端B)的外部IP地址和端口号,以通知客户端A
CSocket Sock;
try
{
if ( !Sock.Create () )
{
printf ( “Create socket failed : %s”, hwFormatMessage(GetLastError()) );
return FALSE;
}
if ( !Sock.Connect ( g_pServerAddess, SRV_TCP_HOLE_PORT ) )
{
printf ( “Connect to [%s:%d] failed : %s”, g_pServerAddess,
SRV_TCP_HOLE_PORT, hwFormatMessage(GetLastError()) );
return FALSE;
}
}
catch ( CException e )
{
char szError[255] = {0};
     e.GetErrorMessage( szError, sizeof(szError) );
printf ( “Exception occur, %s”, szError );
return FALSE;
}
CString csSocketAddress;
ASSERT ( g_nHolePort == 0 );
VERIFY ( Sock.GetSockName ( csSocketAddress, g_nHolePort ) );
// 连接服务器协助打洞的端口号 SRV_TCP_HOLE_PORT,发送一个断开连接的请求,然后将连接断开,服务器在收到这个包的时候也会将
// 连接断开
t_ReqSrvDisconnectPkt ReqSrvDisconnectPkt;
ReqSrvDisconnectPkt.dwInviterID = pSrvReqMakeHolePkt->dwInvitedID;
ReqSrvDisconnectPkt.dwInviterHoleID = pSrvReqMakeHolePkt->dwInviterHoleID;
ReqSrvDisconnectPkt.dwInvitedID = pSrvReqMakeHolePkt->dwInvitedID;
ASSERT ( ReqSrvDisconnectPkt.dwInvitedID == g_WelcomePkt.dwID );
if ( Sock.Send ( &ReqSrvDisconnectPkt, sizeof(t_ReqSrvDisconnectPkt) ) != sizeof(t_ReqSrvDisconnectPkt) )
return FALSE;
Sleep ( 100 );
Sock.Close ();
// 创建一个线程来向客户端A的外部IP地址、端口号打洞
t_SrvReqMakeHolePkt *pSrvReqMakeHolePkt_New = new t_SrvReqMakeHolePkt;
if ( !pSrvReqMakeHolePkt_New ) return FALSE;
memcpy (pSrvReqMakeHolePkt_New, pSrvReqMakeHolePkt, sizeof(t_SrvReqMakeHolePkt));
DWORD dwThreadID = 0;
g_hThread_MakeHole = ::CreateThread ( NULL, 0, ::ThreadProc_MakeHole,
         LPVOID(pSrvReqMakeHolePkt_New), 0, &dwThreadID );
if (!HANDLE_IS_VALID(g_hThread_MakeHole) )

         return FALSE;
// 创建一个线程来侦听端口 g_nHolePort 的连接请求
dwThreadID = 0;
g_hThread_Listen = ::CreateThread ( NULL, 0, ::ThreadProc_Listen, LPVOID(NULL), 0, &dwThreadID );
if (!HANDLE_IS_VALID(g_hThread_Listen) )

         return FALSE;

     // 等待打洞和侦听完成
HANDLE hEvtAry[] = { g_hEvt_ListenFinished, g_hEvt_MakeHoleFinished };
if ( ::WaitForMultipleObjects ( LENGTH(hEvtAry), hEvtAry, TRUE, 30*1000 ) == WAIT_TIMEOUT )
return FALSE;
t_HoleListenReadyPkt HoleListenReadyPkt;
HoleListenReadyPkt.dwInvitedID = pSrvReqMakeHolePkt->dwInvitedID;
HoleListenReadyPkt.dwInviterHoleID = pSrvReqMakeHolePkt->dwInviterHoleID;
HoleListenReadyPkt.dwInvitedID = pSrvReqMakeHolePkt->dwInvitedID;
if ( MainSock.Send ( &HoleListenReadyPkt, sizeof(t_HoleListenReadyPkt) ) != sizeof(t_HoleListenReadyPkt) )
{
printf ( “Send HoleListenReadyPkt to %s:%u failed : %s”,
         g_WelcomePkt.szClientIP, g_WelcomePkt.nClientPort,
hwFormatMessage(GetLastError()) );
return FALSE;
}

return TRUE;
}

客户端B收到服务器S的打洞通知后,先连接S的【协助打洞】端口号(本地端口号可以用 GetSocketName()函数取得,假设为X),启动线程尝试连接客户端A的公网IP和端口号,根据路由器不同,连接情况各异,如果运气好直接连接就成功了,即使连接失败,但打洞便完成了。同时还要启动线程在相同的端口(即与S的【协助打洞】端口号建立连接的本地端口号X)上侦听到来的连接,等待客户端A直接连接该端口号。

//
// 执行者:客户端A
// 服务器要求主动端(客户端A)直接连接被动端(客户端B)的外部IP和端口号
//
BOOL Handle_SrvReqDirectConnect ( t_SrvReqDirectConnectPkt *pSrvReqDirectConnectPkt )
{
ASSERT ( pSrvReqDirectConnectPkt );
printf ( “You can connect direct to ( IP:%s PORT:%d ID:%u )”,

        pSrvReqDirectConnectPkt->szInvitedIP,
pSrvReqDirectConnectPkt->nInvitedPort, pSrvReqDirectConnectPkt->dwInvitedID );
// 直接与客户端B建立TCP连接,如果连接成功说明TCP打洞已经成功了。
CSocket Sock;
try
{
if ( !Sock.Socket () )
{
printf ( “Create socket failed : %s”, hwFormatMessage(GetLastError()) );
return FALSE;
}
UINT nOptValue = 1;
if ( !Sock.SetSockOpt ( SO_REUSEADDR, &nOptValue , sizeof(UINT) ) )
{
printf( “SetSockOpt socket failed : %s”, hwFormatMessage(GetLastError()));
return FALSE;
}
if ( !Sock.Bind ( g_nHolePort ) )
{
printf ( “Bind socket failed : %s”, hwFormatMessage(GetLastError()) );
return FALSE;
}
for ( int ii=0; ii<100; ii++ )     {        if ( WaitForSingleObject ( g_hEvt_ConnectOK, 0 ) == WAIT_OBJECT_0 )          break;       DWORD dwArg = 1;       if ( !Sock.IOCtl ( FIONBIO, &dwArg ) )       {         printf ( “IOCtl failed : %s”, hwFormatMessage(GetLastError()) );       }       if ( !Sock.Connect ( pSrvReqDirectConnectPkt->szInvitedIP, pSrvReqDirectConnectPkt->nInvitedPort ) )
{
printf ( “Connect to [%s:%d] failed : %s”,
pSrvReqDirectConnectPkt->szInvitedIP,
pSrvReqDirectConnectPkt->nInvitedPort,
hwFormatMessage(GetLastError()) );
Sleep (100);
}
else

                break;
}
if ( WaitForSingleObject ( g_hEvt_ConnectOK, 0 ) != WAIT_OBJECT_0 )
{
if ( HANDLE_IS_VALID ( g_hEvt_ConnectOK ) )

              SetEvent ( g_hEvt_ConnectOK );
printf ( “Connect to [%s:%d] successfully !!!”,
           pSrvReqDirectConnectPkt->szInvitedIP,

                pSrvReqDirectConnectPkt->nInvitedPort );

// 接收测试数据
printf ( “Receiving data …” );
char szRecvBuffer[NET_BUFFER_SIZE] = {0};
int nRecvBytes = 0;
    for ( int i=0; i<1000; i++ )       {         nRecvBytes = Sock.Receive ( szRecvBuffer, sizeof(szRecvBuffer) );         if ( nRecvBytes > 0 )
{
printf ( “–>>> Received Data : %s”, szRecvBuffer );
memset ( szRecvBuffer, 0, sizeof(szRecvBuffer) );
SLEEP_BREAK ( 1 );
}
else
{
SLEEP_BREAK ( 300 );
}
}
}
}
catch ( CException e )
{
    char szError[255] = {0};
e.GetErrorMessage( szError, sizeof(szError) );
printf ( “Exception occur, %s”, szError );
     return FALSE;
}
return TRUE;
}

在客户端B打洞和侦听准备好以后,服务器S回复客户端A,客户端A便直接与客户端B的公网IP和端口进行连接,收发数据可以正常进行,为了测试是否真正地直接TCP连接,在数据收发过程中可以将服务器S强行终止,看是否数据收发还正常进行着。