本篇主要内容为分布式系统
在单机程序并发或并行修改全局变量时,需要对修改行为加锁以创造临界区。
在分布式场景下,我们也需要这种“抢占”的逻辑,我们可以使用Redis提供的setnx
命令
package main
import (
"fmt"
"sync"
"time"
"github.com/go-redis/redis"
)
func incr() {
client := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // no password set
DB: 0, // use default DB
})
var lockKey = "counter_lock"
var counterKey = "counter"
// lock
resp := client.SetNX(lockKey, 1, time.Second*5)
lockSuccess, err := resp.Result()
if err != nil || !lockSuccess {
fmt.Println(err, "lock result: ", lockSuccess)
return
}
// counter ++
getResp := client.Get(counterKey)
cntValue, err := getResp.Int64()
if err == nil || err == redis.Nil {
cntValue++
resp := client.Set(counterKey, cntValue, 0)
_, err := resp.Result()
if err != nil {
// log err
println("set value error!")
}
}
println("current counter is ", cntValue)
delResp := client.Del(lockKey)
unlockSuccess, err := delResp.Result()
if err == nil && unlockSuccess > 0 {
println("unlock success!")
} else {
println("unlock failed", err)
}
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
incr()
}()
}
wg.Wait()
}
通过代码和执行结果可以看到,我们远程调用setnx
运行流程上和单机的trylock非常相似,如果获取锁失败,那么相关的任务逻辑就不应该继续向前执行。
setnx
很适合在高并发场景下,用来争抢一些“唯一”的资源。比如交易撮合系统中卖家发起订单,而多个买家会对其进行并发争抢。这种场景我们没有办法依赖具体的时间来判断先后,因为不管是用户设备的时间,还是分布式场景下的各台机器的时间,都是没有办法在合并后保证正确的时序的。哪怕是我们同一个机房的集群,不同的机器的系统时间可能也会有细微的差别。
ZooKeeper是Hadoop Ecosystem中非常重要的组件,它的主要功能是为分布式系统提供一致性协调(Coordination)服务,与之对应的Google的类似服务叫Chubby。 分布式环境中大多数服务是允许部分失败,也允许数据不一致,但有些最基础的服务是需要高可靠性,高一致性的,这些服务是其他分布式服务运转的基础,比如naming service、分布式lock等,这些分布式的基础服务有以下要求:
package main
import (
"time"
"github.com/samuel/go-zookeeper/zk"
)
func main() {
c, _, err := zk.Connect([]string{"127.0.0.1"}, time.Second) //*10)
if err != nil {
panic(err)
}
l := zk.NewLock(c, "/lock", zk.WorldACL(zk.PermAll))
err = l.Lock()
if err != nil {
panic(err)
}
println("lock succ, do your business logic")
time.Sleep(time.Second * 10)
// do some thing
l.Unlock()
println("unlock succ, finish business logic")
}
基于ZooKeeper的锁与基于Redis的锁的不同之处在于Lock成功之前会一直阻塞,这与我们单机场景中的mutex.Lock
很相似。
其原理也是基于临时Sequence节点和watch API,例如我们这里使用的是/lock
节点。Lock会在该节点下的节点列表中插入自己的值,只要节点下的子节点发生变化,就会通知所有watch该节点的程序。这时候程序会检查当前节点下最小的子节点的id是否与自己的一致。如果一致,说明加锁成功了。
这种分布式的阻塞锁比较适合分布式任务调度场景,但不适合高频次持锁时间短的抢锁场景。按照Google的Chubby论文里的阐述,基于强一致协议的锁适用于粗粒度
的加锁操作。这里的粗粒度指锁占用时间较长。我们在使用时也应思考在自己的业务场景中使用是否合适。
Zookeeper的保证
l 顺序性,client的updates请求都会根据它发出的顺序被顺序的处理;
l 原子性, 一个update操作要么成功要么失败,没有其他可能的结果;
l 一致的镜像,client不论连接到哪个server,展示给它都是同一个视图;
l 可靠性,一旦一个update被应用就被持久化了,除非另一个update请求更新了当前值
l 实时性,对于每个client它的系统视图都是最新的
Zookeeper中的角色
领导者(Leader) : 领导者不接受client的请求,负责进行投票的发起和决议,最终更新状态。
跟随者(Follower): Follower用于接收客户请求并返回客户结果。参与Leader发起的投票。
观察者(observer): Oberserver可以接收客户端连接,将写请求转发给leader节点。但是Observer不参加投票过程,只是同步leader的状态。Observer为系统扩展提供了一种方法。
学习者 ( Learner ) : 和leader进行状态同步的server统称Learner,上述Follower和Observer都是Learner。
ZooKeeper集群
通常Zookeeper由2n+1台servers组成,每个server都知道彼此的存在。每个server都维护的内存状态镜像以及持久化存储的事务日志和快照。对于2n+1台server,只要有n+1台(大多数)server可用,整个系统保持可用。
系统启动时,集群中的server会选举出一台server为Leader,其它的就作为follower(这里先不考虑 observer角色)。接着由follower来服务client的请求,对于不改变系统一致性状态的读操作,由follower的本地内存数据库直接 给client返回结果;对于会改变系统状态的更新操作,则交由Leader进行提议投票,超过半数通过后返回结果给client。
Zookeeper的核心是原子广播,这个机制保证了各个server之间的同步。实现这个机制的协议叫做Zab协议。Zab协议有两 种模式,它们分别是恢复模式和广播模式。当服务启动或者在领导者崩溃后,Zab就进入了恢复模式,当领导者被选举出来,且大多数server的完成了和 leader的状态同步以后,恢复模式就结束了。状态同步保证了leader和server具有相同的系统状态。
一旦leader已经和多数的follower进行了状态同步后,他就可以开始广播消息了,即进入广播状态。这时候当一个server 加入zookeeper服务中,它会在恢复模式下启动,发现leader,并和leader进行状态同步。待到同步结束,它也参与消息广播。 Zookeeper服务一直维持在Broadcast状态,直到leader崩溃了或者leader失去了大部分的followers支持
Broadcast模式极其类似于分布式事务中的2pc(two-phrase commit 两阶段提交):即leader提起一个决议,由followers进行投票,leader对投票结果进行计算决定是否通过该决议,如果通过执行该决议(事务),否则什么也不做。
广播模式需要保证proposal被按顺序处理,因此zk采用了递增的事务id号(zxid)来保证。所有的提议(proposal) 都在被提出的时候加上了zxid。实现中zxid是一个64为的数字,它高32位是epoch用来标识leader关系是否改变,每次一个leader被 选出来,它都会有一个新的epoch。低32位是个递增计数。
当leader崩溃或者leader失去大多数的follower,这时候zk进入恢复模式,恢复模式需要重新选举出一个新的leader,让所有的server都恢复到一个正确的状态。
首先看一下选举的过程,zk的实现中用了基于paxos算法(主要是fastpaxos)的实现。具体如下:
1.每个Server启动以后都询问其它的Server它要投票给谁。
2.对于其他server的询问,server每次根据自己的状态都回复自己推荐的leader的id和上一次处理事务的zxid(系统启动时每个server都会推荐自己)
3.收到所有Server回复以后,就计算出zxid最大的哪个Server,并将这个Server相关信息设置成下一次要投票的Server。
4.计算这过程中获得票数最多的sever为获胜者,如果获胜者的票数超过半数,则改server被选为leader。否则,继续这个过程,直到leader被选举出来。
此外恢复模式下,如果是重新刚从崩溃状态恢复的或者刚启动的的server还会从磁盘快照中恢复数据和会话信息。(zk会记录事务日志并定期进行快照,方便在恢复时进行状态恢复)
选完leader以后,zk就进入状态同步过程。
1.leader就会开始等待server连接
2.Follower连接leader,将最大的zxid发送给leader
3.Leader根据follower的zxid确定同步点
4.完成同步后通知follower 已经成为uptodate状态
5.Follower收到uptodate消息后,又可以重新接受client的请求进行服务了。
主线程的工作:
QuorumPeer线程
QuorumPeer有几种状态
当一个Server发现选举的结果自己是Leader把自己的状态改成Leading,如果Server推荐了其他人为Server它 将自己的状态改成Following。做Leader的server如果发现拥有的follower少于半数时,它重新进入looking状态,重新进行 leader选举过程。(Observing状态是根据配置设置的)
Leader主线程
1.首先leader开始恢复数据和清除session
启动zk实例,建立请求处理链(Leader的请求处理 链):PrepRequestProcessor->ProposalRequestProcessor->CommitProcessor->Leader.ToBeAppliedRequestProcessor ->FinalRequestProcessor
2.得到一个新的epoch,标识一个新的leader , 并获得最大zxid(方便进行数据同步)
3.建立一个学习者接受线程(来接受新的followers的连接,follower连接后确定followers的zxvid号,来确定是需要对follower进行什么同步措施,比如是差异同步(diff),还是截断(truncate)同步,还是快照同步)
4.向follower建立一个握手过程leader->follower NEWLEADER消息,并等待直到多数server发送了ack
5.Leader不断的查看已经同步了的follower数量,如果同步数量少于半数,则回到looking状态重新进行leaderElection过程,否则继续step5.
Follower的工作流程
1.启动zk实例,建立请求处理链:FollowerRequestProcessor->CommitProcessor->FinalProcessor
2.follower首先会连接leader,并将zxid和id发给leader
3.接收NEWLEADER消息,完成握手过程。
4.同leader进行状态同步
5.完成同步后,follower可以接收client的连接
6.接收到client的请求,根据请求类型
l 对于写操作, FollowerRequestProcessor会将该操作作为LEADER.REQEST发给LEADER由LEADER发起投票。
l 对于读操作,则通过请求处理链的最后一环FinalProcessor将结果返回给客户端
对于observer的流程不再赘述,observer流程和Follower的唯一不同的地方就是observer不会参加leader发起的投票。
learnerCnxAcceptor线程
1.该线程监听Learner的连接
2.接受Learner请求,并为每个Learner创建一个LearnerHandler来服务
LearnerHandler线程的服务流程
1.检查server来的第一个包是否为follower.info或者observer.info,如果不是则无法建立握手。
2.得到Learner的zxvid,对比自身的zxvid,确定同步点
3.和Learner建立第二次握手,向Learner发送NEWLEADER消息
4.与server进行数据同步。
5.同步结束,知会server同步已经ok,可以接收client的请求。
6.不断读取follower消息判断消息类型
i. 如果是LEADER.ACK,记录follower的ack消息,超过半数ack,将proposal提交(Commit)
ii. 如果是LEADER.PING,则维持session(延长session失效时间)
iii. 如果是LEADER.REQEST,则将request放入请求链进行处理–Leader写请求发起proposal,然后根据follower回复的结 果来确定是否commit的。最后由FinallRequestProcessor来实际进行持久化,并回复信息给相应的response给server
为了提高吞吐量通常我们只要增加服务器到Zookeeper集群中。但是当服务器增加到一定程度,会导致投票的压力增大从而使得吞吐量降低。因此我们引出了一个角色:Observer。
Observers 的需求源于 ZooKeeper follower服务器在上述工作流程中实际扮演了两个角色。它们从客户端接受连接与操作请求,之后对操作结果进行投票。这两个职能在 ZooKeeper集群扩展的时候彼此制约。如果我们希望增加 ZooKeeper 集群服务的客户数量(我们经常考虑到有上万个客户端的情况),那么我们必须增加服务器的数量,来支持这么多的客户端。然而,从一致性协议的描述可以看到, 增加服务器的数量增加了对协议的投票部分的压力。领导节点必须等待集群中过半数的服务器响应投票。于是,节点的增加使得部分计算机运行较慢,从而拖慢整个 投票过程的可能性也随之提高,投票操作的会随之下降。这正是我们在实际操作中看到的问题——随着 ZooKeeper 集群变大,投票操作的吞吐量会下降。
所以需要增加客户节点数量的期望和我们希望保持较好吞吐性能的期望间进行权衡。要打破这一耦合关系,引入了不参与投票的服务器,称为 Observers。 Observers 可以接受客户端的连接,将写请求转发给领导节点。但是,领导节点不会要求 Observers 参加投票。相反,Observers 不参与投票过程,仅仅和其他服务节点一起得到投票结果。
这个简单的扩展给 ZooKeeper 的可伸缩性带来了全新的镜像。我们现在可以加入很多 Observers 节点,而无须担心严重影响写吞吐量。规模伸缩并非无懈可击——协议中的一歩(通知阶段)仍然与服务器的数量呈线性关系。但是,这里的穿行开销非常低。因此 可以认为在通知服务器阶段的开销无法成为主要瓶颈。
此外Observer还可以成为特定场景下,广域网部署的一种方案。原因有三点:1.为了获得更好的读性能,需要让客户端足够近,但如 果将投票服务器分布在两个数据中心,投票的延迟太大会大幅降低吞吐,是不可取的。因此希望能够不影响投票过程,将投票服务器放在同一个IDC进行部 署,Observer可以跨IDC部署。2. 投票过程中,Observer和leader之间的消息、要远小于投票服务器和server的消息,这样远程部署对带宽要求就较小。3.由于 Observers即使失效也不会影响到投票集群,这样如果数据中心间链路发生故障,不会影响到服务本身的可用性。这种故障的发生概率要远高于一个数据中 心中机架间的连接的故障概率,所以不依赖于这种链路是个优点。
Session 是指 Client 与 ZooKeeper Server 间的会话。应用进程启动时,会与 ZooKeeper Server 建立一个 TCP 长连接,同时创建并关联到一个 Session,从连接建立开始,客户端会话的生命周期也开始。客户端能够通过心跳检测与 Server 保持有效的会话,通过为 Session 设置 TTL 来判断 Session 心跳是否超时,如果心跳检测超时,需要重新创建 Session。
当且仅当 Client 和 Server 发生网络分区时才会发生 Session 超时行为:
etcd是分布式系统中,功能上与ZooKeeper类似的组件,这两年越来越火了。上面基于ZooKeeper我们实现了分布式阻塞锁,基于etcd,也可以实现类似的功能
package main
import (
"log"
"github.com/zieckey/etcdsync"
)
func main() {
m, err := etcdsync.New("/lock", 10, []string{"http://127.0.0.1:2379"})
if m == nil || err != nil {
log.Printf("etcdsync.New failed")
return
}
err = m.Lock()
if err != nil {
log.Printf("etcdsync.Lock failed")
return
}
log.Printf("etcdsync.Lock OK")
log.Printf("Get the lock. Do something here.")
err = m.Unlock()
if err != nil {
log.Printf("etcdsync.Unlock failed")
} else {
log.Printf("etcdsync.Unlock OK")
}
}
etcd中没有像ZooKeeper那样的Sequence节点。所以其锁实现和基于ZooKeeper实现的有所不同。在上述示例代码中使用的etcdsync的Lock流程是:
/lock
路径下是否有值,如果有值,说明锁已经被别人抢了/lock
下的事件,此时陷入阻塞/lock
路径下发生事件时,当前进程被唤醒。检查发生的事件是否是删除事件(说明锁被持有者主动unlock),或者过期事件(说明锁过期失效)。如果是的话,那么回到 1,走抢锁流程。值得一提的是,在etcdv3的API中官方已经提供了可以直接使用的锁API
业务还在单机就可以搞定的量级时,那么按照需求使用任意的单机锁方案就可以。
如果发展到了分布式服务阶段,但业务规模不大,qps很小的情况下,使用哪种锁方案都差不多。如果公司内已有可以使用的ZooKeeper、etcd或者Redis集群,那么就尽量在不引入新的技术栈的情况下满足业务需求。
业务发展到一定量级的话,就需要从多方面来考虑了。首先是你的锁是否在任何恶劣的条件下都不允许数据丢失,如果不允许,那么就不要使用Redis的setnx
的简单锁。
对锁数据的可靠性要求极高的话,那只能使用etcd或者ZooKeeper这种通过一致性协议保证数据可靠性的锁方案。但可靠的背面往往都是较低的吞吐量和较高的延迟。需要根据业务的量级对其进行压力测试,以确保分布式锁所使用的etcd或ZooKeeper集群可以承受得住实际的业务请求压力。需要注意的是,etcd和Zookeeper集群是没有办法通过增加节点来提高其性能的。要对其进行横向扩展,只能增加搭建多个集群来支持更多的请求。这会进一步提高对运维和监控的要求。多个集群可能需要引入proxy,没有proxy那就需要业务去根据某个业务id来做分片。如果业务已经上线的情况下做扩展,还要考虑数据的动态迁移。这些都不是容易的事情。
在选择具体的方案时,还是需要多加思考,对风险早做预估。
Consul 是 HashiCorp 公司推出的开源工具,用于实现分布式系统的服务发现与配置。 Consul 是分布式的、高可用的、可横向扩展的。它具备以下特性 : service discovery:consul 通过 DNS 或者 HTTP 接口使服务注册和服务发现变的很容易,一些外部服务,例如 saas 提供的也可以一样注册。 health checking:健康检测使 consul 可以快速的告警在集群中的操作。和服务发现的集成,可以防止服务转发到故障的服务上面。 key/value storage:一个用来存储动态配置的系统。提供简单的 HTTP 接口,可以在任何地方操作。 multi-datacenter:无需复杂的配置,即可支持任意数量的区域。
Consul 用 Golang 实现,因此具有天然可移植性 (支持 Linux、windows 和 macOS)。安装包仅包含一个可执行文件。Consul 安装非常简单,只需要下载对应系统的软件包并解压后就可使用
# 这里以 Linux系统为例:
$ wget https://releases.hashicorp.com/consul/1.2.0/consul_1.2.0_linux_amd64.zip
$ unzip consul_1.2.0_linux_amd64.zip
$ mv consul /usr/local/bin/ 12345
# 至于版本可以换成所需的版本,也可以直接去github下载直接解压
核心特性:
服务发现:可以方便的实现服务注册,通过 DNS 或者 HTTP 应用程序可以很容易的找到他所依赖的服务.
Key/Value 存储:使用 Key/Value 进行数据存储。
多数据中心:Consul 支持开箱即用的多数据中心。这意味着用户不需要担心建立额外的抽象层让业务扩展到多个区域
健康检查:可以对指定服务进行健康检查例如,ResponseStatus 是否为 200,避免将流量转发到不健康的服务上。
服务发现
consul 可以用来实现分布式系统的服务发现与配置。client 把服务请求传递给 server,server 负责提供服务以及和其他数据中心交互。问题是,既然 server 端提供了所有服务,那为何还需要多此一举地用 client 端来接收一次服务请求。我想,采用这种架构有以下几种理由:首先 server 端的网络连接资源有限。对于一个分布式系统,一般情况下访问量是很大的。如果用户能不通过 client 直接地访问数据中心,那么数据中心必然要为每个用户提供一个单独的连接资源 (线程,端口号等等),那么 server 端的负担会非常大。所以很有必要用大量的 client 端来分散用户的连接请求,在 client 端先统一整合用户的服务请求,然后一次性地通过一个单一的链接发送大量的请求给 server 端,能够大量减少 server 端的网络负担。其次,在 client 端可以对用户的请求进行一些处理来提高服务的效率,比如将相同的请求合并成同一个查询,再比如将之前的查询通过 cookie 的形式缓存下来。但是这些功能都需要消耗不少的计算和存储资源。如果在 server 端提供这些功能,必然加重 server 端的负担,使得 server 端更加不稳定。而通过 client 端来进行这些服务就没有这些问题了,因为 client 端不提供实际服务,有很充足的计算资源来进行这些处理这些工作。最后还有一点,consul 规定只要接入一个 client 就能将自己注册到一个服务网络当中。这种架构使得系统的可扩展性非常的强,网络的拓扑变化可以特别的灵活。这也是依赖于 client—server 结构的。如果系统中只有几个数据中心存在,那网络的扩张也无从谈起了
consul的node模块
其实次模块只是 Consul-client,如果使用则不需要编写 Consul 的对应 client 配置文件,也可以不使用直接再配置文件中编写。
// consul.js
const Consul = require('consul');
class ConsulConfig {
constructor () {
const serviceName = 'consul-demo';
this.consul = new Consul({
host: '192.168.6.128',
port: 8500,
promisify: true,
});
this.consul.agent.service.register({
name: serviceName,
address: '192.168.20.193',
port: 3000,
check: {
http: 'http://192.168.20.193:3000/health',
interval: '10s',
timeout: '5s',
}
}, function(err, result) {
if (err) {
console.error(err);
throw err;
}
console.log(serviceName + ' 注册成功!');
})
}
async getConfig(key) {
const result = await this.consul.kv.get(key);
if (!result) {
return Promise.reject(key + '不存在');
}
return JSON.parse(result.Value);
}
async getUserConfig(key) {
const result = await this.getConfig('develop/user');
if (!key) {
return result;
}
return result[key];
}
async setUserConfig(key, val) {
const user = await this.getConfig('develop/user');
user[key] = val;
return this.consul.kv.set('develop/user', JSON.stringify(user))
}
}
module.exports = ConsulConfig;
熔断器
https://github.com/Netflix/Hystrix
分布式存储系统
https://github.com/seaweedfs/seaweedfs
https://jimmysong.io/kubernetes-handbook/practice/rook.html
Rook 将分布式存储软件转变为自我管理,自我缩放和自我修复的存储服务。它通过自动化部署,引导、配置、供应、扩展、升级、迁移、灾难恢复、监控和资源管理来实现。 Rook 使用基础的云原生容器管理、调度和编排平台提供的功能来履行其职责
Rook 利用扩展点深入融入云原生环境,为调度、生命周期管理、资源管理、安全性、监控和用户体验提供无缝体验。
使用helm部署
helm init -i jimmysong/kubernetes-helm-tiller:v2.8.1
helm repo add rook-alpha https://charts.rook.io/alpha
helm install rook-alpha/rook --name rook --namespace rook-system