作者存档: nick - 第3页

10.3 周围看看

假期随便找个地方玩玩吧

关于Ceph现状与未来的一些思考

转自:https://www.ustack.com/blog/sikao/

Ceph从2004年提交了第一行代码,至今为止已经10年了。这个起源于Sage博士论文,最早致力于开发下一代高性能分布式文件系统的项目,现在也成为了开源社区众人皆知的明星项目。特别是随着云计算的发展,Ceph乘上了OpenStack的春风,受到各大厂商的待见,Intel、DreamHost、SanDisk、CISCO、Yahoo等公司都或多或少的参与其中。RedHat更是一掷千金,直接砸了1.75亿美金将Sage创建的Inktank公司及其Ceph团队收入囊中,将其作为IaaS三大组件计算、网络、存储之一。

在这十年的发展过程中,Ceph似乎越来越向着云计算的方向靠拢,最先的CephFS文件系统已经不再是开发重点,甚至开发已经陷入了停滞状态。而与虚拟化相关的RBD、RGW则成了发展重点,成为发展最快的模块。但是从代码中仍然能够看到各种遗迹,似乎在告诉后来人这段饶了一个大弯的历史。

Ceph发展现在仍然快的眼花缭乱,让我们暂时停下脚步,看看经过十年发展后,现在Ceph的优势与缺点。

一、优势

  1. CRUSH算法

CRUSH算法是Ceph最初的两大创新之一(另一个是基于动态子树分区的元数据集群),也是整个RADOS的基石,是Ceph最引以为豪的地方。

CRUSH在一致性哈希基础上很好的考虑了容灾域的隔离,能够实现各类负载的副本放置规则,例如跨机房、机架感知等。同时, CRUSH算法支持副本和EC两种数据冗余方式,还提供了四种不同类型的Bucket(Uniform, List, Tree, Straw),充分考虑了实际生产过程中硬件的迭代式部署方式,虽然实际生产中大多数情况下的都是只用了一种Straw。

另外根据Sage的论文,CRUSH算法具有相当好的可扩展性,在数千OSD的情况下仍然能保证良好的负载平衡。但这更多是理论层面的,目前还没有人给出在数PB规模的生产环境中的测试结果。

总的来看,CRUSH算法仍然是目前经过实践检验的最好的数据分布算法之一。

2. 统一存储架构

Ceph最初设计的RADOS是为其实现一个高性能的文件系统服务的,并没有考虑对于块设备、对象存储的支持,也就没有什么RBD、RADOS GateWay,跟别提OpenStack和qemu之类的了。但谁想无心插柳柳成荫,由于 RADOS 出色的设计和独立简洁的访问接口,再加上Sage敏锐的眼光,Ceph社区果断推出了用于支持云计算的分布式块存储RBD和分布式对象存储RADOS GateWay,并将开发中心全面转向云计算领域。

不得不说,RADOS的设计还是非常的优秀。从架构上来看,RBD和RADOSGateWay实际上都只是RADOS的客户端而已,但得益于RADOS的优秀设计,RBD和RADOSGateWay的设计和实现都很简单,不需要考虑横向扩展、冗余、容灾、负载平衡的等复杂的分布式系统问题,同时能够提供足够多的特性和足够优秀的性能,因此迅速得到了社区的认可。另一方面,Ceph为OpenStack提供了良好的支持,成为了目前最火的OpenStack底层存储系统。乘着云计算和OpenStack的东风,Ceph作为一个统一存储系统,似乎大有舍我取谁之势。

3.丰富的特性

Ceph的特性不可谓不多,从分布式系统最基本的横向扩展、动态伸缩、冗余容灾、负载平衡等,到生产环境环境中非常实用的滚动升级、多存储池、延迟删除等,再到高大上的CephFS集群、快照、纠删码、跨存储池缓存等,不可谓不强大。

但是就像大多数开源系统一样,Ceph的基本特性,或者说真正在生产环境中用的上的特性还是非常靠谱的,但其他“高级”特性就只能打一个问号了。特别是在CephFS模块,由于Ceph社区目前的开发重点主要还是与云计算相关的部分,即RBD和RADOSGateWay,导致CephFS的开发停滞了很久,相关的特性,例如元数据集群、快照等,目前都不满足生产环境的要求。

二、缺点

  1. 代码质量

代码质量的问题,实际上是个仁者见仁智者见智的问题。

Ceph主要使用C/C++语言编写,同时外围的很多脚本和工具用了Python。之所以要说明Ceph的语言构成,是因为代码质量实际上是和语言具有密切的关系。不否认用C++也能写出优雅的代码,但相比于更加“现代”的语言,要想写出具备同样可读性、结构良好、调理清晰代码,C++要困难很多。但是,由于存储作为底层系统,对效率的追求是无止境的,因此不太可能舍弃对于内存等底层系统资源的控制,而使用Java/Python这类的语言。而作为一个开源项目,期望所有的贡献者都是C++的高手,未免有些强人所难,这似乎成了一个死结。其他类似的开源项目怎么办呢?貌似他们都用的纯c……

另一方面,Ceph广泛使用了STL,在部分核心代码中还是用了BOOST,这两者在底层核心系统代码中的可用性也一直存在争议。这更加加剧了代码质量的挑战性。

最关键的是,Ceph似乎已经被太多已经背负了太多的历史包袱,比如最核心的osd模块,最初的设计包含OSD和PG类,其中PG类负责PG的通用逻辑,OSD负责管理所有的PG。然后PG的子类ReplicatedPG实现了以副本作为冗余模式的PG。这里就存在了两个半类:OSD、PG及其子类ReplicatedPG,这两个半类实现了osd模块99%的逻辑,可以想象这两个半类会有多大。

在目前的master分支上,相关文件的大小分别是:

OSD.h+OSD.cc = 2383行+8604行 = 10987行

PG.h+PG.cc = 2256行+7611行 = 9867行

ReplicatedPG.h+ReplicatedPG.cc = 1487行+12665行 = 14152行

需要特别注意的是,从C++继承的角度上,理解一个类,必须理解他的父类,也就是说,如果你想理解ReplicatedPG,理论上你必须同时理解PG,也就是说,要同时理解20000+行代码!

更加丧心病狂的是,这两个半类之间存在密切而复杂的调用关系,相互之间直接使用整个类,而没有什么实际上的接口隔离。严重加剧了理解代码的难度。

在EC功能以一种奇葩的方式加入到osd中之后,整个场面更加混乱。按照最初的设计,实现EC应该增加PG的一个子类,类似ErasureCodePG。但是由于ReplicatedPG包含了太多通用的代码,实际上已经和PG合二为一了,所以EC只能在ReplicatedPG的基础上改造。于是又出现了PGBackend的概念和相关的实现,这只能说是挑战人脑的极限了。

Ceph社区也曾试着梳理代码,比如添加OSDService类,作为PG与OSD通讯的接口。这样所有的PG全部调用OSDService而非OSD,相当于做了OSD与PG之间的隔离。但是似乎并没有起到足够的效果,现在已经名存实亡了。

Ceph在这样的代码质量下,还能向前走多久,委实是一件令人担忧的事情。

2. 性能

Ceph的性能总的来说还是不错的,基本上能发挥出物理硬件的性能,但是存在以下几个隐患:

1)数据双倍写入。Ceph本地存储接口(FileStore)为了支持事务,引入了日志(Journal)机制。所有的写入操作都需要先写入日志(XFS模式下),然后再写入本地文件系统。简单来说就是一份数据需要写两遍,日志+本地文件系统。这就造成了在大规模连续IO的情况下,实际上磁盘输出的吞吐量只有其物理性能的一半。

2)IO路径过长。这个问题在Ceph的客户端和服务器端都存在。以osd为例,一个IO需要经过message、OSD、FileJournal、FileStore多个模块才能完成,每个模块之间都涉及到队列和线程切换,部分模块在对IO进行处理时还要进行内存拷贝,导致整体性能不高。

3)对高性能硬件的支持有待改进。Ceph最开始是为HDD设计的,没有充分考虑全SSD,甚至更先进的PCIe SSD和NVRAM的情况NVRAM。导致这些硬件的物理性能在Ceph中无法充分发挥出来,特别是延迟和IOPS,受比较大的影响。

3. CephFS

CephFS现在在整个Ceph系统中处于一个较为尴尬的情况,因为POSIX这种借口似乎在云计算中没有用武之地,导致了社区对这个模块的关注不足,也就没有什么进展。

CephFS作为最初Ceph的设计目标,Sage投入了巨大的精力,几乎实现了所有需要的特性,并且进行了大量工程层面的优化。

正所谓成也萧何败萧何,Ceph想把CephFS模块做到足够强大,甚至是最强大,但强大的同时也意味着不菲的代价。元数据动态子树分区、目录分片、快照、权限控制、IOPS优化、故障恢复、分布式缓存、强弱一致性控制,这些高大上的名词背后都意味着复杂的工程性任务,更不要说将这些叠加在一起。很多时候,叠加不是想加,而是相乘的关系。最终的结果就是整个MDS的工程难度已经超过了可以掌控的程度,无法做出足够成熟、稳定的系统。

目前CephFS宣称其单MDS的模式是稳定的,MDS的集群的模式是不稳定的。而快照功能默认关闭,今后也够呛会有开启的可能了。

4. 业务连续性

Ceph中的RADOS采用强一致性设计,即Write-All-Read-One,这种模式的好处在于读取效率较高,而且工程难度较低,比较适合与读多写少的系统。

Write-All-Read-One的特点是必须等待所有的副本全部写入完毕才算是写入成功,这实际上对系统硬件的可靠性要求较高,因为如果在写入过程中存在任意硬件故障,则写入过程都要受影响。通常表现为卡顿,一般在数秒级别,时间长短和判断故障的机制以及故障恢复过程中IO的处理策略相关。

但是当集群非常非常大时,Write-All-Read-One对于硬件可靠性的要求几乎是无法满足的。想象一下一个10PB的系统,按照最大4TB每块盘的计算,就有2500块磁盘。按照我们以往的运维经验,每周存在一块磁盘故障是完全正常的。这种场景下,如果数据分布足够分散,实际上一块磁盘可能涉及到很多数据块,也就是说一块磁盘故障会影响很多IO,而这种情况每周发生一次。这对业务连续性的影响是已经是不可忽略的。

生产环境中的场景比这个更加复杂,因为磁盘或者硬件的故障可能不仅表现为不可写,还有可能是慢或者不稳定。这些情况对于业务连续性的影响也更加严重。

5. 社区

Ceph社区现在已经有很多厂商实际上或者号称参入进来,其中不乏Intel、Dreamhost、SanDisk这样的大厂,也不乏UnitedStack这样的Start-Up公司,还有电信、大学、研究所这类非存储领域的公司或单位。但实际上整个Ceph还是掌握在Inktank或者说RedHat的手中,绝大多数核心代码由他们贡献,也是他们Review和Merge。总的来说还是一个集权组织。

更加重要的是,Ceph相比OpenStack这种成熟完善的开源社区,缺乏足够的基础设施,例如成熟的单元测试、集成测试、测试环境、Reivew流程、贡献指引、代码规范等。导致整个社区仍然是人治、而非法制的过程,代码和系统的发展方向本质是由RedHat公司控制的。

对于以上这些问题,Ceph社区也非常清楚,并且正在或者将要改进。例如为了增加了对于SSD的支持,改进数据双倍写入问题以及更完善的社区建设和基础设施等。这些都增加了人们对Ceph的信心。

总的来说,Ceph瑕不掩瑜,仍然是一个优秀,甚至出色的开源存储系统。如果说分布式存储在云计算时代是风口上的猪,那么Ceph也是一直优秀的猪。

未来是什么样子,我们拭目以待。

咖啡暖暖

K8s集群证书问题

kubeadm配置的集群默认的证书是1年,到期后证书升级是个很麻烦的事情。这里推荐一个自动化脚本:https://github.com/yuyicai/update-kube-cert

执行update-kube-cert.sh即可将证书升级为10年有效。

已fork到本地仓库:https://g.tixbay.net/r/kube/update-kube-cert.git

呐喊 2020

夜航船

chrome 允许非https网站访问摄像头/麦等设备:

chrome://flags/#unsafely-treat-insecure-origin-as-secure

这个组合还能继续吗🤣

[原创]K8s中搭建自动恢复的redis集群

参考 https://www.jianshu.com/p/65c4baadf5d9

但是这个方案并不完美,存在问题:

pod重启后pod ip已经改变,而redis配置nodes.conf中的ip并没有变。如果单个节点down掉后重启,集群是可以恢复的,如果存在一半以上节点down的情况下,比如k8s集群重启,redis集群是不能恢复的。

如何在集群整体重启情况下自动恢复?

redis是依赖nodes.conf配置节点信息,从而相互通信。因此我们只要保证nodes.conf能更新成新的pod ip就可以了。

步骤:

pod启动后向一个所有redis实例都能访问的地方写入一个redisid和ip的对应关系。同时对nodes.conf中所有redisid,去查找对应的ip,如果能ping通(或者其他健康检测方式)则认为ip有效,更新nodes.conf。直到所有ip都在线后,启动redis实例。

思路就是这样了,下面是具体实现并贴代码:

修改redis-cluster.yaml,增加公共访问区域:

比如我这里是nfs:挂载到/sh,sh下添加start.sh和checkip.sh,由于redis镜像中没有ping工具,偷懒起见手动copy ping、libcap.so.2、libcrypto.so.10、libidn.so.11到挂载目录下,并在start.sh中指定LD_LIBRARY_PATH。

start.sh

#!/bin/bash

export LD_LIBRARY_PATH=/sh:$LD_LIBRARY_PATH

newip=`cat /etc/hosts|grep redis-app|awk '{print $1}'`
myid=`cat /var/lib/redis/nodes.conf |grep myself|awk '{print $1}'`
if [ "$newip"x == ""x ]; then
    echo "Cannot find new ip";
    exit 1;
elif [ "$myid"x == ""x ]; then
    echo "no myid";
    exit 1;
else
    echo $newip > /sh/$myid 
    echo "refresh ip: $myid -> $newip";
fi

echo "check nodes.conf"
cat /var/lib/redis/nodes.conf|grep -E "master|slave"|awk '{print $1}'|xargs -i /sh/checkip.sh {}

if [ $? -eq 0 ]; then
    echo "done nodes.conf"
    redis-server /etc/redis/redis.conf --protected-mode no
else
    echo "abort on error"
fi

checkip.sh

#!/bin/bash

if [ $# -ne 1 ]; then
    exit 1;
fi

while :
do
    echo "while";
    chkip=`cat /sh/$1`
    if [ "$chkip"x == ""x ]; then
        sleep 1s;
    else
        /sh/ping -c1 $chkip;
        if [ $? -eq 0 ]; then
            oldip=`cat /var/lib/redis/nodes.conf |grep -E "^$1"|awk '{print $2}'|cut -d ":" -f1`
            if [ "$oldip"x == ""x ]; then
                echo "no old ip";
                exit 1;
            else
                echo "oldip=$oldip and newip=$chkip"
                sed -i "s/$oldip/$chkip/g" /var/lib/redis/nodes.conf
                echo "done $1 $chkip";
                exit 0;
            fi
        else
            sleep 1s;
        fi
    fi
done

最后把pod的启动脚本改为/sh/start.sh,更新pod:

kubectl apply -f redis-stateful.yaml

测试:把redis-stateful整体移除,并重新create,集群正常恢复,以下是pod日志:

refresh ip: 2d05fb2406a254f08664b5ff5d26a151b5b262cc -> 10.244.4.68
check nodes.conf
while
PING 10.244.4.68 (10.244.4.68) 56(84) bytes of data.
64 bytes from 10.244.4.68: icmp_seq=1 ttl=64 time=0.072 ms

--- 10.244.4.68 ping statistics ---
1 packets transmitted, 1 received, 0% packet loss, time 0ms
rtt min/avg/max/mdev = 0.072/0.072/0.072/0.000 ms
oldip=10.244.4.66 and newip=10.244.4.68
done 2d05fb2406a254f08664b5ff5d26a151b5b262cc 10.244.4.68
while
PING 10.244.1.89 (10.244.1.89) 56(84) bytes of data.

--- 10.244.1.89 ping statistics ---
1 packets transmitted, 0 received, 100% packet loss, time 0ms

while
PING 10.244.1.89 (10.244.1.89) 56(84) bytes of data.

--- 10.244.1.89 ping statistics ---
1 packets transmitted, 0 received, 100% packet loss, time 0ms

while
PING 10.244.1.91 (10.244.1.91) 56(84) bytes of data.
64 bytes from 10.244.1.91: icmp_seq=1 ttl=62 time=1.86 ms

--- 10.244.1.91 ping statistics ---
1 packets transmitted, 1 received, 0% packet loss, time 0ms
rtt min/avg/max/mdev = 1.861/1.861/1.861/0.000 ms
oldip=10.244.1.89 and newip=10.244.1.91
done 1665fb20c43e7468c54bbdea7ed6e283283669df 10.244.1.91
while
PING 10.244.2.135 (10.244.2.135) 56(84) bytes of data.
64 bytes from 10.244.2.135: icmp_seq=1 ttl=62 time=1.87 ms

--- 10.244.2.135 ping statistics ---
1 packets transmitted, 1 received, 0% packet loss, time 0ms
rtt min/avg/max/mdev = 1.870/1.870/1.870/0.000 ms
oldip=10.244.2.133 and newip=10.244.2.135
done 7a17b9c2f70a053f98fb480492a8e904d330f9ac 10.244.2.135
while
PING 10.244.1.90 (10.244.1.90) 56(84) bytes of data.
64 bytes from 10.244.1.90: icmp_seq=1 ttl=62 time=1.96 ms

--- 10.244.1.90 ping statistics ---
1 packets transmitted, 1 received, 0% packet loss, time 0ms
rtt min/avg/max/mdev = 1.967/1.967/1.967/0.000 ms
oldip=10.244.1.88 and newip=10.244.1.90
done 12cf9455a800f940f427c745318d1300a6730103 10.244.1.90
while
PING 10.244.4.69 (10.244.4.69) 56(84) bytes of data.
64 bytes from 10.244.4.69: icmp_seq=1 ttl=64 time=0.106 ms

--- 10.244.4.69 ping statistics ---
1 packets transmitted, 1 received, 0% packet loss, time 0ms
rtt min/avg/max/mdev = 0.106/0.106/0.106/0.000 ms
oldip=10.244.4.67 and newip=10.244.4.69
done d1e352d917cb9dfd5be2b053ef34473e11c7ea23 10.244.4.69
while
PING 10.244.2.136 (10.244.2.136) 56(84) bytes of data.
64 bytes from 10.244.2.136: icmp_seq=1 ttl=62 time=1.89 ms

--- 10.244.2.136 ping statistics ---
1 packets transmitted, 1 received, 0% packet loss, time 0ms
rtt min/avg/max/mdev = 1.899/1.899/1.899/0.000 ms
oldip=10.244.2.134 and newip=10.244.2.136
done 283aa9be8d0d4b25bfb79cf0a7eb084284b4f44d 10.244.2.136
done nodes.conf
78:C 04 Jul 2019 08:54:28.879 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
78:C 04 Jul 2019 08:54:28.879 # Redis version=5.0.4, bits=64, commit=00000000, modified=0, pid=78, just started
78:C 04 Jul 2019 08:54:28.879 # Configuration loaded
78:M 04 Jul 2019 08:54:28.887 * Node configuration loaded, I'm 2d05fb2406a254f08664b5ff5d26a151b5b262cc
78:M 04 Jul 2019 08:54:28.888 * Running mode=cluster, port=6379.
78:M 04 Jul 2019 08:54:28.888 # WARNING: The TCP backlog setting of 511 cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of 128.
78:M 04 Jul 2019 08:54:28.888 # Server initialized
78:M 04 Jul 2019 08:54:28.888 # WARNING you have Transparent Huge Pages (THP) support enabled in your kernel. This will create latency and memory usage issues with Redis. To fix this issue run the command 'echo never > /sys/kernel/mm/transparent_hugepage/enabled' as root, and add it to your /etc/rc.local in order to retain the setting after a reboot. Redis must be restarted after THP is disabled.
78:M 04 Jul 2019 08:54:28.890 * Reading RDB preamble from AOF file...
78:M 04 Jul 2019 08:54:28.890 * Reading the remaining AOF tail...
78:M 04 Jul 2019 08:54:28.892 * DB loaded from append only file: 0.004 seconds
78:M 04 Jul 2019 08:54:28.892 * Ready to accept connections
78:M 04 Jul 2019 08:54:28.899 * Clear FAIL state for node 283aa9be8d0d4b25bfb79cf0a7eb084284b4f44d: replica is reachable again.
78:M 04 Jul 2019 08:54:32.367 * Clear FAIL state for node 1665fb20c43e7468c54bbdea7ed6e283283669df: replica is reachable again.
78:M 04 Jul 2019 08:54:32.462 * Clear FAIL state for node d1e352d917cb9dfd5be2b053ef34473e11c7ea23: replica is reachable again.
78:M 04 Jul 2019 08:54:33.316 * Replica 10.244.1.91:6379 asks for synchronization
78:M 04 Jul 2019 08:54:33.316 * Partial resynchronization not accepted: Replication ID mismatch (Replica asked for 'ce61d735f3070e58c8d1eef8d705416e1dec10d8', my replication IDs are 'b726060355dca8aa11c4bf4c47e29d4423347021' and '0000000000000000000000000000000000000000')
78:M 04 Jul 2019 08:54:33.316 * Starting BGSAVE for SYNC with target: disk
78:M 04 Jul 2019 08:54:33.318 * Background saving started by pid 82
82:C 04 Jul 2019 08:54:33.325 * DB saved on disk
82:C 04 Jul 2019 08:54:33.328 * RDB: 4 MB of memory used by copy-on-write
78:M 04 Jul 2019 08:54:33.372 * Background saving terminated with success
78:M 04 Jul 2019 08:54:33.376 * Synchronization with replica 10.244.1.91:6379 succeeded
78:M 04 Jul 2019 08:54:34.061 * Marking node 12cf9455a800f940f427c745318d1300a6730103 as failing (quorum reached).
78:M 04 Jul 2019 08:54:35.294 # Failover auth granted to d1e352d917cb9dfd5be2b053ef34473e11c7ea23 for epoch 47
78:M 04 Jul 2019 08:54:37.016 * Clear FAIL state for node 12cf9455a800f940f427c745318d1300a6730103: replica is reachable again.
78:M 04 Jul 2019 08:54:39.654 * Clear FAIL state for node 7a17b9c2f70a053f98fb480492a8e904d330f9ac: is reachable again and nobody is serving its slots after some time.
78:M 04 Jul 2019 08:54:40.364 # Cluster state changed: ok

[转]关于TCP打洞技术

[转自] https://blog.csdn.net/zhongguoren666/article/details/7489809

建立穿越NAT设备的p2p的 TCP 连接只比UDP复杂一点点,TCP协议的“打洞”从协议层来看是与UDP
的“打洞”过程非常相似的。尽管如此,基于TCP协议的打洞至今为止还没有被很好的理解,这也
造成了对其提供支持的NAT设备不是很多。

在NAT设备支持的前提下,基于TCP的“打洞”技术实际上与基于UDP的“打洞”技术一样快捷、可靠。实际上,只要NAT设备支持的话,基于TCP的p2p技术的健壮性将比基于UDP的技术的更强一些,因为TCP协议的状态机给出了一种标准的方法来精确的获取某个TCP session的生命期,而UDP协议则无法做到这一点。

1 套接字和TCP端口的重用

实现基于TCP协议的p2p“打洞”过程中,最主要的问题不是来自于TCP协议,而是来自于来自于应用程序的API接口。这是由于标准的伯克利(Berkeley)套接字的API是围绕着构建客户端/服务器程序而设计的,API允许TCP流套接字通过调用connect()函数来建立向外的连接,或者通过listen()和accept函数接受来自外部的连接。

但是,TCP协议并没有象UDP那样的“同一个端口既可以向外连接,又能够接受来自外部的连接”的API。而且更糟的是,TCP的套接字通常仅允许建立1对1的响应,即应用程序在将一个套接字绑定到本地的一个端口以后,任何试图将第二个套接字绑定到该端口的操作都会失败。

为了让TCP“打洞”能够顺利工作,我们需要使用一个本地的TCP端口来监听来自外部的TCP连接,同时建立多个向外的TCP连接。幸运的是,所有的主流操作系统都能够支持一个特殊的TCP套接字参数,通常叫做“SO_REUSEADDR”,该参数允许应用程序将多个套接字绑定到本地的一个endpoint(只要所有要绑定的套接字都设置了SO_REUSEADDR参数即可)。BSD系统引入了SO_REUSEPORT参数,该参数用于区分
端口重用还是地址重用,在这样的系统里面,上述所有的参数必须都设置才行。

2 打开p2p的TCP流

假定客户端A希望建立与B的TCP连接。我们像通常一样假定A和B已经与公网上的已知服务器S建立了TCP连接。服务器记录下来每个联入的客户端的公网和内网的endpoints,如同为UDP服务的时候一样从协议层来看,TCP“打洞”与UDP“打洞”是几乎完全相同的过程:

1)、 S启动两个网络侦听,一个叫【主连接】侦听,一个叫【协助打洞】的侦听。
2)、 A和B分别与S的【主连接】保持联系。

3)、当A需要和B建立直接的TCP连接时,首先连接S的【协助打洞】端口,并发送协助连接申请。同时在该端口号上启动侦听。注意由于要在相同的网络终端上绑定到不同的套接字上,所以必须为这些套接字设置 SO_REUSEADDR 属性(即允许重用),否则侦听会失败。

4)、 S的【协助打洞】连接收到A的申请后通过【主连接】通知B,并将A经过NAT-A转换后的公网IP地址和端口等信息告诉B。

5)、 B收到S的连接通知后首先与S的【协助打洞】端口连接,随便发送一些数据后立即断开,这样做的目的是让S能知道B经过NAT-B转换后的公网IP和端口号。

6)、 B尝试与A的经过NAT-A转换后的公网IP地址和端口进行connect,根据不同的路由器会有不同的结果,有些路由器在这个操作就能建立连接(例如我用的TPLink R402),大多数路由器对于不请自到的SYN请求包直接丢弃而导致connect失败,但NAT-A会纪录此次连接的源地址和端口号,为接下来真正的连接做好了准备,这就是所谓的打洞,即B向A打了一个洞,下次A就能直接连接到B刚才使用的端口号了。

7)、客户端B打洞的同时在相同的端口上启动侦听。B在一切准备就绪以后通过与S的【主连接】回复消息“我已经准备好”,S在收到以后将B经过NAT-B转换后的公网IP和端口号告诉给A。

8)、 A收到S回复的B的公网IP和端口号等信息以后,开始连接到B公网IP和端口号,由于在步骤6中B曾经尝试连接过A的公网IP地址和端口,NAT-A纪录了此次连接的信息,所以当A主动连接B时,NAT-B会认为是合法的SYN数据,并允许通过,从而直接的TCP连接建立起来了。

图7
与UDP不同的是,使用UDP协议的每个客户端只需要一个套接字即可完成与服务器S通信,
并同时与多个p2p客户端通信的任务;而TCP客户端必须处理多个套接字绑定到同一个本地
TCP端口的问题,如图7所示。

现在来看更加实际的一种情景:A与B分别位于不同的NAT设备后面。如同使用UDP协议进行“打洞”
操作遇到的问题一样,TCP的“打洞”操作也会遇到内网的IP与“伪”公网IP重复造成连接失败或者错误连接之类的问题。

客户端向彼此公网endpoint发起连接的操作,会使得各自的NAT设备打开新的“洞”以允许A与B的
TCP数据通过。如果NAT设备支持TCP“打洞”操作的话,一个在客户端之间的基于TCP协议的流
通道就会自动建立起来。如果A向B发送的第一个SYN包发到了B的NAT设备,而B在此前没有向
A发送SYN包,B的NAT设备会丢弃这个包,这会引起A的“连接失败”或“无法连接”问题。而此时,由于A已经向B发送过SYN包,B发往A的SYN包将被看作是由A发往B的包的回应的一部分,
所以B发往A的SYN包会顺利地通过A的NAT设备,到达A,从而建立起A与B的p2p连接。

3 从应用程序的角度来看TCP“打洞”

从应用程序的角度来看,在进行TCP“打洞”的时候都发生了什么呢?

假定A首先向B发出SYN包,该包发往B的公网endpoint,并且被B的NAT设备丢弃,但是B发往A的公网endpoint的SYN包则通过A的NAT到达了A,然后,会发生以下的两种结果中的一种,具体是哪一种取决于操作系统对TCP协议的实现:

(1)A的TCP实现会发现收到的SYN包就是其发起连接并希望联入的B的SYN包,通俗一点来说
就是“说曹操,曹操到”的意思,本来A要去找B,结果B自己找上门来了。A的TCP协议栈因此
会把B做为A向B发起连接connect的一部分,并认为连接已经成功。程序A调用的异步connect()
函数将成功返回,A的listen()等待从外部联入的函数将没有任何反映。此时,B联入A的操作
在A程序的内部被理解为A联入B连接成功,并且A开始使用这个连接与B开始p2p通信。

由于A收到的SYN包中不包含A需要的ACK数据,因此,A的TCP将用SYN-ACK包回应B的公网endpoint,
并且将使用先前A发向B的SYN包一样的序列号。一旦B的TCP收到由A发来的SYN-ACK包,则把自己
的ACK包发给A,然后两端建立起TCP连接。简单地说,第一种,就是即使A发往B的SYN包被B的NAT
丢弃了,但是由于B发往A的包到达了A。结果是,A认为自己连接成功了,B也认为自己连接成功
了,不管是谁成功了,总之连接是已经建立起来了。

(2)另外一种结果是,A的TCP实现没有像(1)中所讲的那么“智能”,它没有发现现在联入的B
就是自己希望联入的。就好比在机场接人,明明遇到了自己想要接的人却不认识,误认为是其它
的人,安排别人给接走了,后来才知道是自己错过了机会,但是无论如何,人已经接到了任务
已经完成了。然后,A通过常规的listen()函数和accept()函数得到与B的连接,而由A发起的向
B的公网endpoint的连接会以失败告终。尽管A向B的连接失败,A仍然得到了B发起的向A的连接,
等效于A与B之间已经联通,不管中间过程如何,A与B已经连接起来了,结果是A和B的基于TCP协议
的p2p连接已经建立起来了。

第一种结果适用于基于BSD的操作系统对于TCP的实现,而第二种结果更加普遍一些,多数linux和
windows系统都会按照第二种结果来处理。

代码:

// 服务器地址和端口号定义

define SRV_TCP_MAIN_PORT    4000  // 服务器主连接的端口号 define SRV_TCP_HOLE_PORT    8000  // 服务器响应客户端打洞申请的端口号

这两个端口是固定的,服务器S启动时就开始侦听这两个端口了。

//
// 将新客户端登录信息发送给所有已登录的客户端,但不发送给自己
//
BOOL SendNewUserLoginNotifyToAll (LPCTSTR lpszClientIP, UINT nClientPort, DWORD dwID)
{
ASSERT ( lpszClientIP && nClientPort > 0 );
g_CSFor_PtrAry_SockClient.Lock();
for ( int i=0; ipSockClient = (CSockClient)g_PtrAry_SockClient.GetAt(i);     if ( pSockClient && pSockClient->m_bMainConn && pSockClient->m_dwID > 0 && pSockClient->m_dwID != dwID )
{
if (!pSockClient->SendNewUserLoginNotify (lpszClientIP, nClientPort, dwID))
{
g_CSFor_PtrAry_SockClient.Unlock();
return FALSE;
}
}
}
g_CSFor_PtrAry_SockClient.Unlock ();
return TRUE;
}

当有新的客户端连接到服务器时,服务器负责将该客户端的信息(IP地址、端口号)发送给其他客户端。

//
// 执行者:客户端A
// 有新客户端B登录了,我(客户端A)连接服务器端口 SRV_TCP_HOLE_PORT ,申请与B建立直接的TCP连接
//
BOOL Handle_NewUserLogin ( CSocket &MainSock, t_NewUserLoginPkt *pNewUserLoginPkt )
{
printf ( “New user ( %s:%u:%u ) login server”, pNewUserLoginPkt->szClientIP,
pNewUserLoginPkt->nClientPort, pNewUserLoginPkt->dwID );
BOOL bRet = FALSE;
DWORD dwThreadID = 0;
t_ReqConnClientPkt ReqConnClientPkt;
CSocket Sock;
CString csSocketAddress;
  char    szRecvBuffer[NET_BUFFER_SIZE] = {0};
int     nRecvBytes = 0;
// 创建打洞Socket,连接服务器协助打洞的端口号 SRV_TCP_HOLE_PORT:
try
{
if ( !Sock.Socket () )
{
printf ( “Create socket failed : %s”, hwFormatMessage(GetLastError()) );
goto finished;
}
     UINT nOptValue = 1;
if ( !Sock.SetSockOpt ( SO_REUSEADDR, &nOptValue , sizeof(UINT) ) )
{
printf ( “SetSockOpt socket failed : %s”, hwFormatMessage(GetLastError()) );
goto finished;
}
if ( !Sock.Bind ( 0 ) )
{
  printf ( “Bind socket failed : %s”, hwFormatMessage(GetLastError()) );
goto finished;
}
if ( !Sock.Connect ( g_pServerAddess, SRV_TCP_HOLE_PORT ) )
{
printf ( “Connect to [%s:%d] failed : %s”, g_pServerAddess,
SRV_TCP_HOLE_PORT, hwFormatMessage(GetLastError()) );
goto finished;
}
}
catch ( CException e )
{
char szError[255] = {0};
e.GetErrorMessage( szError, sizeof(szError) );
printf ( “Exception occur, %s”, szError );
goto finished;
}
g_pSock_MakeHole = &Sock;
ASSERT ( g_nHolePort == 0 );
VERIFY ( Sock.GetSockName ( csSocketAddress, g_nHolePort ) );
// 创建一个线程来侦听端口 g_nHolePort 的连接请求
dwThreadID = 0;
g_hThread_Listen = ::CreateThread ( NULL, 0, ::ThreadProc_Listen, LPVOID(NULL), 0, &dwThreadID );
if (!HANDLE_IS_VALID(g_hThread_Listen) ) return FALSE;
Sleep ( 3000 );
// 我(客户端A)向服务器协助打洞的端口号 SRV_TCP_HOLE_PORT 发送申请,

    // 希望与新登录的客户端B建立连接
// 服务器会将我的打洞用的外部IP和端口号告诉客户端B:
   ASSERT ( g_WelcomePkt.dwID > 0 );
ReqConnClientPkt.dwInviterID = g_WelcomePkt.dwID;
ReqConnClientPkt.dwInvitedID = pNewUserLoginPkt->dwID;
if ( Sock.Send ( &ReqConnClientPkt, sizeof(t_ReqConnClientPkt) ) != sizeof(t_ReqConnClientPkt) )
goto finished;
// 等待服务器回应,将客户端B的外部IP地址和端口号告诉我(客户端A):
nRecvBytes = Sock.Receive ( szRecvBuffer, sizeof(szRecvBuffer) );
if ( nRecvBytes > 0 )
{
ASSERT ( nRecvBytes == sizeof(t_SrvReqDirectConnectPkt) );
    PACKET_TYPE pePacketType = (PACKET_TYPE)szRecvBuffer;
ASSERT ( pePacketType && pePacketType == PACKET_TYPE_TCP_DIRECT_CONNECT );     Sleep ( 1000 );     Handle_SrvReqDirectConnect ( (t_SrvReqDirectConnectPkt)szRecvBuffer );
printf ( “Handle_SrvReqDirectConnect end” );
}
// 对方断开连接了
else
{
goto finished;
}

bRet = TRUE;
finished:
g_pSock_MakeHole = NULL;
return bRet;
}

这里假设客户端A先启动,当客户端B启动后客户端A将收到服务器S的新客户端登录的通知,并得到客户端B的公网IP和端口,客户端A启动线程连接S的【协助打洞】端口(本地端口号可以用GetSocketName()函数取得,假设为M),请求S协助TCP打洞,然后启动线程侦听该本地端口(前面假设的M)上的连接请求,然后等待服务器的回应。

//
// 客户端A请求我(服务器)协助连接客户端B,这个包应该在打洞Socket中收到
//
BOOL CSockClient::Handle_ReqConnClientPkt(t_ReqConnClientPkt *pReqConnClientPkt)
{
ASSERT ( !m_bMainConn );
CSockClient *pSockClient_B = FindSocketClient ( pReqConnClientPkt->dwInvitedID );
   if ( !pSockClient_B ) return FALSE;
printf ( “%s:%u:%u invite %s:%u:%u connection”,

        m_csPeerAddress, m_nPeerPort, m_dwID,

pSockClient_B->m_csPeerAddress,

        pSockClient_B->m_nPeerPort,

        pSockClient_B->m_dwID );
// 客户端A想要和客户端B建立直接的TCP连接,服务器负责将A的外部IP和端口号告诉给B:
t_SrvReqMakeHolePkt SrvReqMakeHolePkt;
SrvReqMakeHolePkt.dwInviterID = pReqConnClientPkt->dwInviterID;
   SrvReqMakeHolePkt.dwInviterHoleID = m_dwID;
  SrvReqMakeHolePkt.dwInvitedID = pReqConnClientPkt->dwInvitedID;
STRNCPY_CS ( SrvReqMakeHolePkt.szClientHoleIP, m_csPeerAddress );
SrvReqMakeHolePkt.nClientHolePort = m_nPeerPort;
if ( pSockClient_B->SendChunk ( &SrvReqMakeHolePkt, sizeof(t_SrvReqMakeHolePkt), 0 ) != sizeof(t_SrvReqMakeHolePkt) )
return FALSE;
// 等待客户端B打洞完成,完成以后通知客户端A直接连接客户端外部IP和端口号
if ( !HANDLE_IS_VALID(m_hEvtWaitClientBHole) )
return FALSE;
if ( WaitForSingleObject ( m_hEvtWaitClientBHole, 6000*1000 ) == WAIT_OBJECT_0 )
{
if ( SendChunk (&m_SrvReqDirectConnectPkt, sizeof(t_SrvReqDirectConnectPkt), 0)
== sizeof(t_SrvReqDirectConnectPkt) )
return TRUE;
}
return FALSE;
}

服务器S收到客户端A的协助打洞请求后通知客户端B,要求客户端B向客户端A打洞,即让客户端B尝试与客户端A的公网IP和端口进行connect。

//
// 执行者:客户端B
// 处理服务器要我(客户端B)向另外一个客户端(A)打洞,打洞操作在线程中进行。
// 先连接服务器协助打洞的端口号 SRV_TCP_HOLE_PORT ,通过服务器告诉客户端A我(客户端B)的外部IP地址和端口号,然后启动线程进行打洞,
// 客户端A在收到这些信息以后会发起对我(客户端B)的外部IP地址和端口号的连接(这个连接在客户端B打洞完成以后进行,所以
// 客户端B的NAT不会丢弃这个SYN包,从而连接能建立)
//
BOOL Handle_SrvReqMakeHole(CSocket &MainSock, t_SrvReqMakeHolePkt *pSrvReqMakeHolePkt)
{
ASSERT ( pSrvReqMakeHolePkt );
// 创建Socket,连接服务器协助打洞的端口号 SRV_TCP_HOLE_PORT,连接建立以后发送一个断开连接的请求给服务器,然后连接断开
// 这里连接的目的是让服务器知道我(客户端B)的外部IP地址和端口号,以通知客户端A
CSocket Sock;
try
{
if ( !Sock.Create () )
{
printf ( “Create socket failed : %s”, hwFormatMessage(GetLastError()) );
return FALSE;
}
if ( !Sock.Connect ( g_pServerAddess, SRV_TCP_HOLE_PORT ) )
{
printf ( “Connect to [%s:%d] failed : %s”, g_pServerAddess,
SRV_TCP_HOLE_PORT, hwFormatMessage(GetLastError()) );
return FALSE;
}
}
catch ( CException e )
{
char szError[255] = {0};
     e.GetErrorMessage( szError, sizeof(szError) );
printf ( “Exception occur, %s”, szError );
return FALSE;
}
CString csSocketAddress;
ASSERT ( g_nHolePort == 0 );
VERIFY ( Sock.GetSockName ( csSocketAddress, g_nHolePort ) );
// 连接服务器协助打洞的端口号 SRV_TCP_HOLE_PORT,发送一个断开连接的请求,然后将连接断开,服务器在收到这个包的时候也会将
// 连接断开
t_ReqSrvDisconnectPkt ReqSrvDisconnectPkt;
ReqSrvDisconnectPkt.dwInviterID = pSrvReqMakeHolePkt->dwInvitedID;
ReqSrvDisconnectPkt.dwInviterHoleID = pSrvReqMakeHolePkt->dwInviterHoleID;
ReqSrvDisconnectPkt.dwInvitedID = pSrvReqMakeHolePkt->dwInvitedID;
ASSERT ( ReqSrvDisconnectPkt.dwInvitedID == g_WelcomePkt.dwID );
if ( Sock.Send ( &ReqSrvDisconnectPkt, sizeof(t_ReqSrvDisconnectPkt) ) != sizeof(t_ReqSrvDisconnectPkt) )
return FALSE;
Sleep ( 100 );
Sock.Close ();
// 创建一个线程来向客户端A的外部IP地址、端口号打洞
t_SrvReqMakeHolePkt *pSrvReqMakeHolePkt_New = new t_SrvReqMakeHolePkt;
if ( !pSrvReqMakeHolePkt_New ) return FALSE;
memcpy (pSrvReqMakeHolePkt_New, pSrvReqMakeHolePkt, sizeof(t_SrvReqMakeHolePkt));
DWORD dwThreadID = 0;
g_hThread_MakeHole = ::CreateThread ( NULL, 0, ::ThreadProc_MakeHole,
         LPVOID(pSrvReqMakeHolePkt_New), 0, &dwThreadID );
if (!HANDLE_IS_VALID(g_hThread_MakeHole) )

         return FALSE;
// 创建一个线程来侦听端口 g_nHolePort 的连接请求
dwThreadID = 0;
g_hThread_Listen = ::CreateThread ( NULL, 0, ::ThreadProc_Listen, LPVOID(NULL), 0, &dwThreadID );
if (!HANDLE_IS_VALID(g_hThread_Listen) )

         return FALSE;

     // 等待打洞和侦听完成
HANDLE hEvtAry[] = { g_hEvt_ListenFinished, g_hEvt_MakeHoleFinished };
if ( ::WaitForMultipleObjects ( LENGTH(hEvtAry), hEvtAry, TRUE, 30*1000 ) == WAIT_TIMEOUT )
return FALSE;
t_HoleListenReadyPkt HoleListenReadyPkt;
HoleListenReadyPkt.dwInvitedID = pSrvReqMakeHolePkt->dwInvitedID;
HoleListenReadyPkt.dwInviterHoleID = pSrvReqMakeHolePkt->dwInviterHoleID;
HoleListenReadyPkt.dwInvitedID = pSrvReqMakeHolePkt->dwInvitedID;
if ( MainSock.Send ( &HoleListenReadyPkt, sizeof(t_HoleListenReadyPkt) ) != sizeof(t_HoleListenReadyPkt) )
{
printf ( “Send HoleListenReadyPkt to %s:%u failed : %s”,
         g_WelcomePkt.szClientIP, g_WelcomePkt.nClientPort,
hwFormatMessage(GetLastError()) );
return FALSE;
}

return TRUE;
}

客户端B收到服务器S的打洞通知后,先连接S的【协助打洞】端口号(本地端口号可以用 GetSocketName()函数取得,假设为X),启动线程尝试连接客户端A的公网IP和端口号,根据路由器不同,连接情况各异,如果运气好直接连接就成功了,即使连接失败,但打洞便完成了。同时还要启动线程在相同的端口(即与S的【协助打洞】端口号建立连接的本地端口号X)上侦听到来的连接,等待客户端A直接连接该端口号。

//
// 执行者:客户端A
// 服务器要求主动端(客户端A)直接连接被动端(客户端B)的外部IP和端口号
//
BOOL Handle_SrvReqDirectConnect ( t_SrvReqDirectConnectPkt *pSrvReqDirectConnectPkt )
{
ASSERT ( pSrvReqDirectConnectPkt );
printf ( “You can connect direct to ( IP:%s PORT:%d ID:%u )”,

        pSrvReqDirectConnectPkt->szInvitedIP,
pSrvReqDirectConnectPkt->nInvitedPort, pSrvReqDirectConnectPkt->dwInvitedID );
// 直接与客户端B建立TCP连接,如果连接成功说明TCP打洞已经成功了。
CSocket Sock;
try
{
if ( !Sock.Socket () )
{
printf ( “Create socket failed : %s”, hwFormatMessage(GetLastError()) );
return FALSE;
}
UINT nOptValue = 1;
if ( !Sock.SetSockOpt ( SO_REUSEADDR, &nOptValue , sizeof(UINT) ) )
{
printf( “SetSockOpt socket failed : %s”, hwFormatMessage(GetLastError()));
return FALSE;
}
if ( !Sock.Bind ( g_nHolePort ) )
{
printf ( “Bind socket failed : %s”, hwFormatMessage(GetLastError()) );
return FALSE;
}
for ( int ii=0; ii<100; ii++ )     {        if ( WaitForSingleObject ( g_hEvt_ConnectOK, 0 ) == WAIT_OBJECT_0 )          break;       DWORD dwArg = 1;       if ( !Sock.IOCtl ( FIONBIO, &dwArg ) )       {         printf ( “IOCtl failed : %s”, hwFormatMessage(GetLastError()) );       }       if ( !Sock.Connect ( pSrvReqDirectConnectPkt->szInvitedIP, pSrvReqDirectConnectPkt->nInvitedPort ) )
{
printf ( “Connect to [%s:%d] failed : %s”,
pSrvReqDirectConnectPkt->szInvitedIP,
pSrvReqDirectConnectPkt->nInvitedPort,
hwFormatMessage(GetLastError()) );
Sleep (100);
}
else

                break;
}
if ( WaitForSingleObject ( g_hEvt_ConnectOK, 0 ) != WAIT_OBJECT_0 )
{
if ( HANDLE_IS_VALID ( g_hEvt_ConnectOK ) )

              SetEvent ( g_hEvt_ConnectOK );
printf ( “Connect to [%s:%d] successfully !!!”,
           pSrvReqDirectConnectPkt->szInvitedIP,

                pSrvReqDirectConnectPkt->nInvitedPort );

// 接收测试数据
printf ( “Receiving data …” );
char szRecvBuffer[NET_BUFFER_SIZE] = {0};
int nRecvBytes = 0;
    for ( int i=0; i<1000; i++ )       {         nRecvBytes = Sock.Receive ( szRecvBuffer, sizeof(szRecvBuffer) );         if ( nRecvBytes > 0 )
{
printf ( “–>>> Received Data : %s”, szRecvBuffer );
memset ( szRecvBuffer, 0, sizeof(szRecvBuffer) );
SLEEP_BREAK ( 1 );
}
else
{
SLEEP_BREAK ( 300 );
}
}
}
}
catch ( CException e )
{
    char szError[255] = {0};
e.GetErrorMessage( szError, sizeof(szError) );
printf ( “Exception occur, %s”, szError );
     return FALSE;
}
return TRUE;
}

在客户端B打洞和侦听准备好以后,服务器S回复客户端A,客户端A便直接与客户端B的公网IP和端口进行连接,收发数据可以正常进行,为了测试是否真正地直接TCP连接,在数据收发过程中可以将服务器S强行终止,看是否数据收发还正常进行着。