如果你想知道我和igorcoding是怎样在Tarantool内部建立一个系统的,请继续往下看。
如果你用过Mail.Ru电子邮件服务,你应该知道它可以从其他账号收集邮件。如果支持OAuth协议,那么在收集其他账号的邮件时,我们就不需要让用户提供第三方服务凭证了,而是用OAuth令牌来代替。此外,Mail.Ru Group有很多项目要求通过第三方服务授权,并且需要用户的OAuth令牌才能处理某些应用。因此,我们决定建立一个存储和更新令牌的服务。
OAuth令牌结构由以下3-4个字段组成:
Oauth结构代码
{ “token_type” : “bearer”, “access_token” : “XXXXXX”, “refresh_token” : “YYYYYY”, “expires_in” : 3600 }
- 访问令牌(access_token)——允许你执行动作、获取用户数据、下载用户的好友列表等等;
- 更新令牌(refresh_token)——让你重新获取新的access_token,不限次数;
- 过期时间(expires_in)——令牌到期时间戳或任何其他预定义时间,如果你的access_token到期了,你就不能继续访问所需的资源。
现在我们看一下服务的简单框架。设想有一些前端可以在我们的服务上写入和读出令牌,还有一个独立的更新器,一旦令牌到期,就可以通过更新器从OAuth服务提供商获取新的访问令牌。
如上图所示,数据库的结构也十分简单,由两个数据库节点(主和从)组成,为了说明两个数据库节点分别位于两个数据中心,二者之间由一条垂直的虚线隔开,其中一个数据中心包含主数据库节点及其前端和更新器,另一个数据中心包含从数据库节点及其前端,以及访问主数据库节点的更新器。
我们面临的主要问题在于令牌的使用期(一个小时)。详细了解这个项目之后,也许有人会问“在一小时内更新1000万条记录,这真的是高负载服务吗?如果我们用一个数除一下,结果大约是3000rps”。然而,如果因为数据库维护或故障,甚至服务器故障(一切皆有可能)导致一部分记录没有得到更新,那事情将会变得比较麻烦。比如,如果我们的服务(主数据库)因为某些原因持续中断15分钟,就会导致25%的服务中断(四分之一的令牌变成无效,不能再继续使用);如果服务中断30分钟,将会有一半的数据不能得到更新;如果中断1小时,那么所有的令牌都将失效。假设数据库瘫痪一个小时,我们重启系统,然后整个1000万条令牌都需要进行快速更新。这算不算高负载服务呢?
一开始一切都还进展地比较顺利,但是两年后,我们进行了逻辑扩展,增加了几个指标,并且开始执行一些辅助逻辑…….总之,Tarantool耗尽了CPU资源。尽管所有资源都是递耗资源,但这样的结果确实让我们大吃一惊。
幸运的是,系统管理员帮我们安装了当时库存中内存最大的CPU,解决了我们随后6个月的CPU需求。但这只是权宜之计,我们必须想出一个解决办法。当时,我们学习了一个新版的Tarantool(我们的系统是用Tarantool 1.5写的,这个版本除了在Mail.Ru Group,其他地方基本没用过)。Tarantool 1.6大力提倡主主备份,于是我们想:为什么不在连接主主备份的三个数据中心分别建立一个数据库备份呢?这听起来是个不错的计划。
三个主机、三个数据中心和三个更新器,都分别连接自己的主数据库。即使一个或者两个主机瘫痪了,系统仍然照常运行,对吧?那么这个方案的缺点是什么呢?缺点就是,我们将一个OAuth服务提供商的请求数量有效地增加到了三倍,也就是说,有多少个副本,我们就要更新几乎相同数量的令牌,这样不行。最直接的解决办法就是,想办法让各个节点自己决定谁是leader,那样就只需要更新存储在leader上的节点了。
选择leader节点的算法有很多,其中有一个算法叫Paxos,相当复杂,不知道怎样简化,于是我们决定用Raft代替。Raft是一个非常通俗易懂的算法,谁能通信就选谁做leader,一旦通信连接失败或者其他因素,就重新选leader。具体实施办法如下:
Tarantool外部既没有Raft也没有Paxos,但是我们可以使用net.box内置模式,让所有节点连接成一个网状网(即每一个节点连接剩下所有节点),然后直接在这些连接上用Raft算法选出leader节点。最后,所有节点要么成为leader节点,要么成为follower节点,或者二者都不是。
如果你觉得Raft算法实施起来有困难,下面的Lua代码可以帮到你:
Lua代码
local r = self.pool.call(self.FUNC.request_vote, self.term, self.uuid) self._vote_count = self:count_votes(r) if self._vote_count > self._nodes_count / 2 then log.info(“[raft-srv] node %d won elections”, self.id) self:_set_state(self.S.LEADER) self:_set_leader({ id=self.id, uuid=self.uuid }) self._vote_count = 0 self:stop_election_timer() self:start_heartbeater() else log.info(“[raft-srv] node %d lost elections”, self.id) self:_set_state(self.S.IDLE) self:_set_leader(msgpack.NULL) self._vote_count = 0 self:start_election_timer() end
现在我们给远程服务器发送请求(其他Tarantool副本)并计算来自每一个节点的票数,如果我们有一个quorum,我们就选定了一个leader,然后发送heartbeats,告诉其他节点我们还活着。如果我们在选举中失败了,我们可以发起另一场选举,一段时间之后,我们又可以投票或被选为leader。
只要我们有一个quorum,选中一个leader,我们就可以将更新器指派给所有节点,但是只准它们为leader服务。
这样我们就规范了流量,由于任务是由单一的节点派出,因此每一个更新器获得大约三分之一的任务,有了这样的设置,我们可以失去任何一台主机,因为如果某台主机出故障了,我们可以发起另一个选举,更新器也可以切换到另一个节点。然而,和其他分布式系统一样,有好几个问题与quorum有关。
如果各个数据中心之间失去联系了,那么我们需要有一些适当的机制去维持整个系统正常运转,还需要有一套机制能恢复系统的完整性。Raft成功地做到了这两点:
假设Dataline数据中心掉线了,那么该位置的节点就变成了“废弃”节点,也就是说该节点就看不到其他节点了,集群中的其他节点可以看到这个节点丢失了,于是引发了另一个选举,然后新的集群节点(即上级节点)被选为leader,整个系统仍然保持运转,因为各个节点之间仍然保持一致性(大半部分节点仍然互相可见)。
那么问题来了,与丢失的数据中心有关的更新器怎么样了呢?Raft说明书没有给这样的节点一个单独的名字,通常,没有quorum的节点和不能与leader联系的节点会被闲置下来。然而,它可以自己建立网络连接然后更新令牌,一般来说,令牌都是在连接模式时更新,但是,也许用一个连接“废弃”节点的更新器也可以更新令牌。一开始我们并不确定这样做有意义,这样不会导致冗余更新吗?
这个问题我们需要在实施系统的过程中搞清楚。我们的第一个想法是不更新:我们有一致性、有quorum,丢失任何一个成员,我们都不应该更新。但是后来我们有了另一个想法,我们看一下Tarantool中的主主备份,假设有两个主节点和一个变量(key)X=1,我们同时在每一个节点上给这个变量赋一个新值,一个赋值为2,另一个赋值为3,然后,两个节点互相交换备份日志(就是X变量的值)。在一致性上,这样实施主主备份是很糟糕的(无意冒犯Tarantool开发者)。
如果我们需要严格的一致性,这样是行不通的。然而,回忆一下我们的OAuth令牌是由以下两个重要因素组成:
- 更新令牌,本质上永久有效;
- 访问令牌,有效期为一个小时;
我们的更新器有一个refresh函数,可以从一个更新令牌获取任意数量的访问令牌,一旦发布,它们都将保持一个小时内有效。
我们考虑一下以下场景:两个follower节点正在和一个leader节点交互,它们更新自己的令牌,接收第一个访问令牌,这个访问令牌被复制,于是现在每一个节点都有这个访问令牌,然后,连接中断了,所以,其中一个follower节点变成了“废弃”节点,它没有quorum,既看不到leader也看不到其他follower,然而,我们允许我们的更新器去更新位于“废弃”节点上的令牌,如果“废弃”节点没有连接网络,那么整个方案都将停止运行。尽管如此,如果发生简单的网络拆分,更新器还是可以维持正常运行。
一旦网络拆分结束,“废弃”节点重新加入集群,就会引发另一场选举或者数据交换。注意,第二和第三个令牌一样,也是“好的”。
原始的集群成员恢复之后,下一次更新将只在一个节点上发生,然后备份。换句话来说,当集群拆分之后,被拆分的各个部分各自独立更新,但是一旦重新整合,数据一致性也因此恢复。通常,需要N/2+1个活动节点(对于一个3节点集群,就是需要2个活动节点)去保持集群正常运转。尽管如此,对我们而言,即使只有1个活动节点也足够了,它会发送尽可能多的外部请求。
重申一下,我们已经讨论了请求数量逐渐增加的情况,在网络拆分或节点中断时期,我们能够提供一个单一的活动节点,我们会像平时一样更新这个节点,如果出现绝对拆分(即当一个集群被分成最大数量的节点,每一个节点有一个网络连接),如上所述,OAuth服务提供商的请求数量将提升至三倍。但是,由于这个事件发生的时间相对短暂,所以情况不是太糟,我们可不希望一直工作在拆分模式。通常情况下,系统处于有quorum和网络连接,并且所有节点都启动运行的状态。
还有一个问题没有解决:我们已经达到了CPU上限,最直接的解决办法就是分片。
假设我们有两个数据库分片,每一个都有备份,有一个这样的函数,给定一些key值,就可以计算出哪一个分片上有所需要的数据。如果我们通过电子邮件分片,一部分地址存储在一个分片上,另一部分地址存储在另一个分片上,我们很清楚我们的数据在哪里。
有两种方法可以分片。一种是客户端分片,我们选择一个返回分片数量的连续的分片函数,比如CRC32、Guava或Sumbur,这个函数在所有客户端的实现方式都一样。这种方法的一个明显优势在于数据库对分片一无所知,你的数据库正常运转,然后分片就发生了。
然而,这种方法也存在一个很严重的缺陷。一开始,客户端非常繁忙。如果你想要一个新的分片,你需要把分片逻辑加进客户端,这里的最大的问题是,可能一些客户端在使用这种模式,而另一些客户端却在使用另一种完全不同的模式,而数据库本身却不知道有两种不同的分片模式。
我们选择另一种方法—数据库内部分片,这种情况下,数据库代码变得更加复杂,但是为了折中我们可以使用简单的客户端,每一个连接数据库的客户端被路由到任意节点,由一个特殊函数计算出哪一个节点应该被连接、哪一个节点应该被控制。前面提到,由于数据库变得更加复杂,因此为了折中,客户端就变得更加简单了,但是这样的话,数据库就要对其数据全权负责。此外,最困难的事就是重新分片,如果你有一大堆客户端无法更新,相比之下,如果数据库负责管理自己的数据,那重新分片就会变得非常简单。
具体怎样实施呢?
六边形代表Tarantool实体,有3个节点组成分片1,另一个3节点集群作为分片2,如果我们将所有节点互相连接,结果会怎样呢?根据Raft,我们可以知道每一个集群的状态,谁是leader服务器谁是follower服务器也一目了然,由于是集群内连接,我们还可以知道其他分片(例如它的leader分片或者follower分片)的状态。总的来说,如果访问第一个分片的用户发现这并不是他需要的分片,我们很清楚地知道应该指导他往哪里走。
我们来看一些简单的例子。
假设用户向驻留在第一个分片上的key发出请求,该请求被第一个分片上的某一个节点接收,这个节点知道谁是leader,于是将请求重新路由到分片leader,反过来,分片leader对这个key进行读或写,并且将结果反馈给用户。
第二个场景:用户的请求到达第一个分片中的相同节点,但是被请求的key却在第二个分片上,这种情况也可以用类似的方法处理,第一个分片知道第二个分片上谁是leader,然后把请求送到第二个分片的leader进行转发和处理,再将结果返回给用户。
这个方案十分简单,但也存在一定的缺陷,其中最大的问题就是连接数,在二分片的例子中,每一个节点连接到其他剩下的节点,连接数是6*5=30,如果再加一个3节点分片,那么连接数就增加到72,这会不会有点多呢?
我们该如何解决这个问题呢?我们只需要增加一些Tarantool实例,我们叫它代理,而不叫分片或数据库,用代理去解决所有的分片问题:包括计算key值和定位分片领导。另一方面,Raft集群保持自包含,只在分片内部工作。当用户访问代理时,代理计算出所需要的分片,如果需要的是leader,就对用户作相应的重定向,如果不是leader,就将用户重定向至分片内的任意节点。
由此产生的复杂性是线性的,取决于节点数量。现在一共3个节点,每个节点3个分片,连接数少了几倍。
代理方案的设计考虑到了进一步规模扩展(当分片数量大于2时),当只有2个分片时,连接数不变,但是当分片数量增加时,连接数会剧减。分片列表存储在Lua配置文件中,所以,如果想要获取新列表,我们只需要重载代码就好了。
综上所述,首先,我们进行主主备份,应用Raft算法,然后加入分片和代理,最后我们得到的是一个单块,一个集群,所以说,目前这个方案看上去是比较简单的。
剩下的就是只读或只写令牌的的前端了,我们有更新器可以更新令牌,获得更新令牌后把它传到OAuth服务提供商,然后写一个新的访问令牌。
前面说过我们的一些辅助逻辑耗尽了CPU资源,现在我们将这些辅助资源移到另一个集群上。
辅助逻辑主要和地址簿有关,给定一个用户令牌,就会有一个对应的地址簿,地址簿上的数据量和令牌一样,为了不耗尽一台机器上的CPU资源,我们显然需要一个与副本相同的集群,只需要加一堆更新地址簿的更新器就可以了(这个任务比较少见,因此地址簿不会和令牌一起更新)。
最后,通过整合这两个集群,我们得到一个相对简单的完整结构:
为什么我们本可以使用标准队列却还要用自己的队列呢?这和我们的令牌更新模型有关。令牌一旦发布,有效期就是一个小时,当令牌快要到期时,需要进行更新,而令牌更新必须在某个特定的时间点之前完成。
假设系统中断了,但是我们有一堆已到期的令牌,而在我们更新这些令牌的同时,又有其他令牌陆续到期,虽然我们最后肯定能全部更新完,但是如果我们先更新那些即将到期的(60秒内),再用剩下的资源去更新已经到期的,是不是会更合理一些?(优先级别最低的是还有4-5分钟才到期的令牌)
用第三方软件来实现这个逻辑并不是件容易的事,然而,对于Tarantool来说却不费吹灰之力。看一个简单的方案:在Tarantool中有一个存储数据的元组,这个元组的一些ID设置了基础key值,为了得到我们需要的队列,我们只需要添加两个字段:status(队列令牌状态)和time(到期时间或其他预定义时间)。
现在我们考虑一下队列的两个主要功能—put和take。put就是写入新数据。给定一些负载,put时自己设置好status和time,然后写数据,这就是建立一个新的元组。
至于take,是指建立一个基于索引的迭代器,挑出那些等待解决的任务(处于就绪状态的任务),然后核查一下是不是该接收这些任务了,或者这些任务是否已经到期了。如果没有任务,take就切换到wait模式。除了内置Lua,Tarantool还有一些所谓的通道,这些通道本质上是互联光纤同步原语。任何光纤都可以建立一个通道然后说“我在这等着”,剩下的其他光纤可以唤醒这个通道然后给它发送信息。
等待中的函数(等待发布任务、等待指定时间或其他)建立一个通道,给通道贴上适当的标签,将通道放置在某个地方,然后进行监听。如果我们收到一个紧急的更新令牌,put会给通道发出通知,然后take接收更新任务。
Tarantool有一个特殊的功能:如果一个令牌被意外发布,或者一个更新令牌被take接收,或者只是出现接收任务的现象,以上三种情况Tarantool都可以跟踪到客户端中断。我们将每一个连接与指定给该连接的任务联系起来,并将这些映射关系保持在会话保存中。假设由于网络中断导致更新过程失败,而且我们不知道这个令牌是否会被更新并被写回到数据库。于是,客户端发生中断了,搜索与失败过程相关的所有任务的会话保存,然后自动将它们释放。随后,任意已发布的任务都可以用同一个通道给另一个put发送信息,该put会快速接收和执行任务。
实际上,具体实施方案并不需要太多代码:
Put和take代码
function put(data) local t = box.space.queue:auto_increment({ ‘r’, -- [[ status ]] util.time(), -- [[ time ]] data -- [[ any payload ]] }) return t end function take(timeout) local start_time = util.time() local q_ind = box.space.tokens.index.queue local _,t while true do local it = util.iter(q_ind, {‘r’}, {iterator = box.index.GE}) _,t = it() if t and t[F.tokens.status] ~= ‘t’ then break end local left = (start_time + timeout) — util.time() if left <= 0 then return end t = q:wait(left) if t then break end end t = q:taken(t) return t end function queue:taken(task) local sid = box.session.id() if self._consumers[sid] == nil then self._consumers[sid] = {} end local k = task[self.f_id] local t = self:set_status(k, ‘t’) self._consumers[sid][k] = {util.time(), box.session.peer(sid), t} self._taken[k] = sid return t end function on_disconnect() local sid = box.session.id local now = util.time() if self._consumers[sid] then local consumers = self._consumers[sid] for k, rec in pairs(consumers) do time, peer, task = unpack(rec) local v = box.space[self.space].index[self.index_primary]:get({k}) if v and v[self.f_status] == ‘t’ then v = self:release(v[self.f_id]) end end self._consumers[sid] = nil end end
Put只是接收用户想要插入队列的所有数据,并将其写入某个空间,如果是一个简单的索引式FIFO队列,设置好状态和当前时间,然后返回该任务。
接下来要和take有点关系了,但仍然比较简单。我们建立一个迭代器,等待接收新任务。Taken函数只需要将任务标记成“已接收”,但有一点很重要,taken函数还能记住哪个任务是由哪个进程接收的。On_disconnect函数可以发布某个特定连接,或者发布由某个特定用户接收的所有任务。
首先,我们解决了连接中断的问题,这个问题十分常见,使用上述的系统让我们摆脱了这个困扰。
分片帮助我们扩展内存,然后,我们将连接数从二次方减少到了线性,优化了业务任务的队列逻辑:如果发生延期,更新我们所能更新的一切令牌,这些延期并非都是我们的故障引起的,有可能是Google、Microsoft或者其他服务端对OAuth服务提供商进行改造,然后导致我们这边出现大量的未更新的令牌。
去数据库内部运算吧,走近数据,你将拥有便利、高效、可扩展和灵活的运算体验!
评论