Hydra:用于强一致性分布式应用的无序列化网络排序
摘要
许多分布式系统(例如状态机复制和分布式数据库)依赖于在系统中一组节点之间建立一致的操作顺序。传统上,这种顺序通常通过应用层协议(例如 Paxos 或两阶段锁定)来实现。然而,最近的研究表明,将排序作为一种网络服务可以显著提高性能,但当前的网络排序实现需要将所有请求通过单个排序器(sequencer)路由,这导致了可扩展性、容错性和负载均衡方面的限制。
我们的工作 Hydra 通过使用分布式网络排序器集合来提供网络排序,从而克服了这些限制。Hydra 利用了网络排序器之间的松散同步时钟来建立跨排序器的消息顺序,利用每个排序器的序列号来检测消息丢失,并通过周期性的时间戳消息来在某些排序器空闲时强制系统进展。
为了展示 Hydra 的优势,我们基于 Hydra 的网络原语共同设计了一个状态机复制协议和一个分布式事务系统。与基于序列化的网络排序系统相比,Hydra 在这两种应用中都展示了与传统方法相同的性能提升,但在可扩展性、更短的排序器故障切换时间以及更好的网络级负载均衡方面表现出显著优势。
1 引言
在数据中心应用中,复制技术几乎无处不在。共识协议(例如 Paxos、Viewstamped Replication 和 Raft)被广泛用于维护数据的多个副本,从而提供一个单一正确服务的假象,即使单个副本发生故障并恢复时,服务仍然可用。然而,这些协议会带来显著的延迟和吞吐量开销。
最近的一些研究表明,利用网络内处理可以缓解这些开销 [42, 43, 56]。这种网络排序(network sequencing)方法通过排序器(sequencer)对请求进行路由——排序器通常在可编程交换机或中间盒上实现,给每个请求分配一个单调递增的序列号。通过预先为所有请求建立全序关系,这些方法使更轻量级的共识协议成为可能,从而最终带来显著的性能提升。例如,Network-Ordered Paxos 的吞吐量达到非复制、非容错系统的 98%,而其延迟仅高出 10% [43]。
然而,在实践中采用这种方法并不容易。其根本的难点在于,网络排序需要串行化:所有复制服务的流量必须通过单个排序器。这在生产网络中引发了三个主要挑战。首先,单一排序器必须处理所有请求流量,构成了扩展性的瓶颈。其次,它对特定应用流量引入了新的路由需求,而网络运营商通常不愿接受这些需求。限制路径多样性会干扰现有的负载均衡和容错策略,这些策略经过精心设计以实现网络性能优化。最后,它导致了网络和应用级恢复之间的不良耦合。替换一个失效或无法访问的排序器需要同时协调网络路由的更新和排序器状态的恢复(通过共识协议)。这增加了部署的复杂性,并延长了恢复过程中的系统停机时间。基于我们在大规模生产数据中心中的经验,这三个问题都构成了实际应用的严重障碍。
本文提出了一个问题:是否可以在不依赖串行化的情况下实现网络排序?我们对此给出了肯定的回答,并提出了 Hydra 的设计,这是一种新的网络排序协议,允许多个活动排序器对数据包进行排序。Hydra 的排序器运行一个轻量级的协调协议,每个排序器独立地为请求分配序列号,这些序列号可以合并以建立全局操作顺序。具体来说,Hydra 结合了每个排序器的序列号和排序器之间松散同步的物理时钟,从而在分配全局顺序的同时有效地检测丢失的消息。
Hydra 是一个实用的协议;我们开发了一个运行于终端主机的软件实现,以及一个运行于 Intel Tofino 可编程交换机的 P4 [11] 实现。后者仅使用了交换机资源的一小部分,证明了其在现代网络设备上的可行性。Hydra 的排序功能使得现有的 NOPaxos [43] 和 Eris [42] 复制与事务处理协议可以以最小的修改运行,同时提高了这些协议对排序器故障的容忍能力,且性能代价极低。
我们的评估结果表明,与基于原子多播(atomic multicast)的基线相比,Hydra 的吞吐量提高了 378%,延迟减少了 42%,同时能够扩展到大量的接收者、多播组和排序器。与使用网络串行化方法的系统相比,Hydra 显著改善了网络级负载均衡,并将系统停机时间减少了 5 倍。此外,Hydra 在实现这些优势的同时没有牺牲性能:基于 Hydra 的状态机复制系统的延迟仅为 5 μs,吞吐量接近 NOPaxos 的 83%;而基于 Hydra 的事务系统的吞吐量比 Eris 高 47%。
2 背景
在许多分布式系统中,建立一致的操作顺序是基础性的需求:
状态机复制 [43, 52, 53, 61] 要求所有正确的副本执行一组全序的客户端操作;
分布式事务系统 [5, 14, 17, 19, 26, 39] 要求数据存储的所有分片以可串行化的顺序处理事务;
分布式缓存 [50, 55] 需要一致的更新以保证缓存一致性。
传统上,为了保证强一致性,系统需要运行复杂的应用层分布式协议,这通常需要服务器之间的协调。例如,许多状态机复制协议 [52, 53, 61] 指定一个单一的领导者来为操作分配顺序,并要求领导者在向客户端返回结果之前与副本通信。现有的分布式数据库还必须为每个客户端事务执行并发控制、原子提交和共识协议。这种昂贵的协调与现代数据中心应用对高吞吐量、低延迟和高扩展性的要求不相符。
2.1 网络中的请求排序
这些协议的需求源于一个基本假设:完全异步的网络可能会随意丢弃、重新排序或延迟消息。一系列经典的分布式计算研究提出了更强的通信原语以简化分布式应用,包括虚拟同步 [8, 9]、原子广播 [10, 34] 和原子多播 [28]。这些原语提供了确保所有正确接收者以相同顺序交付相同消息集的广播或多播操作。这些保证可以省去共识协议的需求,但实现这些原语本身是一个等价于共识的问题 [13],因此应用无法享受性能上的提升。
没有可靠性保证的网络排序.最近的一些研究 [42, 43, 56] 提出了新的网络模型,在保证和实现效率之间取得平衡。这种新模型将一致的消息排序责任转移到网络中,但将消息可靠传递的责任保留给应用层协议。通过在网络中提供排序保证,这种网络/协议协同设计方法使得复制协议的速度比传统设计更快;而通过不强制可靠性,该网络模型的实现足够简单且高效。
这些系统实现网络排序的一个关键机制是 网络内串行化。例如,Speculative Paxos [56] 首先将所有客户端请求路由到网络中的指定交换机,然后再进行多播至副本服务器。该单一交换机作为串行化点,确保高概率地使所有副本以相同顺序接收客户端请求。NOPaxos [43] 扩展了这种串行化方法,利用可编程交换机 ASIC 提供请求排序的保证。指定的交换机为每个客户端请求打上序列号,接收方通过仅按序列号顺序处理请求来确保一致的排序。此外,序列号使副本能够通过检测序列号的间隙来识别丢失的消息。
Eris [42] 进一步推广了排序方法,支持发送到多个复制组的请求(例如,用于实现容错分布式事务)。排序器交换机为每个复制组维护一个计数器,并在每个客户端请求上,原子性地增加所有目标组的计数器值。这些计数器向量确保所有多组操作的一致顺序,同时仍允许接收方独立检测丢失的消息。
2.2 网络内串行化的局限性
现有研究 [42, 43, 56] 展示了网络排序方法在性能上的优势以及其在多种分布式系统中的适用性。然而,现有网络排序解决方案采用的网络内串行化方法存在以下重要局限性:
扩展性瓶颈
串行化方法的关键要求是,所有客户端请求都必须经过单个排序器设备。这种设备可能成为性能瓶颈。虽然基于交换机的排序器能够维持比实际执行操作的基于服务器的副本更高的排序速率,但对于像 Eris [42] 这样单个排序器服务于多个副本组的分片数据库系统来说,排序器容量仍可能成为扩展性的限制。此外,如果排序器是在终端主机上实现的(对于许多部署来说可能更实际),那么基于 CPU 的数据包处理性能较差,这与系统的水平扩展能力相矛盾。
系统停机时间延长
作为所有客户端请求的串行化点,排序器的故障将导致整个分布式系统不可用。排序器的故障转移比传统的恢复(例如,在 Paxos 部署中更换领导者)更加复杂,因为它将网络重路由与应用级恢复耦合在一起。为了恢复操作,网络控制平面必须首先检测故障并在整个网络范围内进行路由更改以将客户端流量重定向到新的排序器,然后启动视图更改过程以确保系统状态一致;只有完成这些步骤,副本才能处理来自新排序器的请求。在大型数据中心网络中进行路由更改代价高昂:以往研究 [38, 42, 43] 显示,仅更新单个交换机中的转发表就可能需要超过 200 毫秒。在完成这一冗长的路由更改过程之前,系统将保持不可用状态。
数据中心网络属性恶化
数据中心网络经过精心设计,以提供高可靠性、高性能和高成本效率 [2, 27, 49]。通过向网络中添加冗余路径并使用诸如 ECMP(等价多路径)之类的协议,这些网络能够有效地负载均衡流量、容忍链路和交换机故障,并维持高的双向带宽。然而,将流量通过单一交换机串行化会减少可用路径数量,并可能轻易破坏这些理想的网络属性。例如,这可能导致排序器交换机处的链路拥塞。
与多流水线交换机不兼容
许多交换机 ASIC(专用集成电路)通过使用多个(例如 2-8 个 [18])独立的流水线扩展处理能力,这些流水线之间几乎没有共享资源。然而,现有的排序方法要求更新并原子性地读取序列号的单一副本。这种需求限制了将网络排序逻辑部署到单个交换机流水线 [36]。这不仅将网络排序器的最大吞吐量限制为交换机容量的一小部分,还复杂化了布线和路由,因为每个流水线都绑定到特定的物理端口。
3 使用多个排序器进行排序
Hydra 允许多个活动排序器同时工作,防止单个排序器成为扩展性瓶颈或单点故障。这使得 Hydra 能够支持新的网络排序器部署模型。
3.1 部署选项
Hydra 支持一系列部署模型:
根交换机.早期研究设想使用树形拓扑中位于根部的可编程交换机作为排序器,利用其在数据中心网络中的中心地位。这样的交换机可以处理高负载请求,使得单一交换机容量之外的扩展性问题变得不再迫切。然而,这些交换机通过使用多个 ASIC 和转发流水线来实现高性能,而之前的排序器设计无法支持这种架构。
Hydra 不仅可以通过解耦排序器故障转移与底层网络的重路由延迟(详见第 7.5 节)来提高可用性,还能通过使用多个 Hydra 排序器 而不是将所有排序流量路由到一个交换机,提供路径多样性。这种方法允许更好的链路级负载均衡,并增强了对链路故障的弹性(详见第 7.4 节)。
机架顶(ToR)交换机 许多现有的数据中心架构无法在网络核心部署可编程交换机:它们在根层使用大型的多 ASIC 机箱式交换机 [15, 27],而在这种配置下无法使用可编程交换机。例如,基于 Tofino 的交换机仅支持较小的 32/64 端口配置。在许多场景下,使用机架顶(ToR)交换机作为排序器因此成为更实际的选择。然而,在这种部署中,扩展性和容错能力是需要重点解决的问题:ToR 交换机故障更为常见 [25],并且其上行链路经常出现拥塞问题。Hydra 可以通过采用多个排序器的方式来避免这两个问题(详见第 7.4 节)。
排序器设备 我们的经验表明,在现有交换机(无论是 ToR 还是其他类型)中逐步部署新功能可能是一项挑战:协调与现有交换机功能的更新以及验证自定义数据平面的正确性都面临困难。一种有吸引力的替代方法是采用一组交换机作为专用的“排序器设备”,将其作为边缘设备附加到网络中,而不是成为网络结构的一部分 [57],类似于其他网络功能加速器的建议 [37, 64]。同样,单个排序器的容错性以及其网络链路的拥塞问题(可能无法利用交换机的全部带宽)是主要关注点,而 Hydra 可以有效缓解这些问题。
终端主机 最后一种方法是放弃专用硬件,而使用终端主机作为排序器 [43]。这种方法提供了显而易见的部署优势,对于许多环境来说,这可能是唯一可行的方法。然而,在这种情况下,扩展性和容错能力仍然是关键问题:Eris 的终端主机排序器仅能勉强应对一个拥有 15 个分片的数据库负载 [42],因此这种方法仅适用于较小的部署。Hydra 的多排序器方法突破了这一限制,为无法使用专用硬件的环境提供了一种实用且可扩展的解决方案(详见第 7.1.3 节)。
3.2 地址与路由
无论采用何种部署选项,Hydra 都能轻松与现有的数据中心路由结构集成。每个 Hydra 部署都有一个唯一的 IP 地址。部署中的每个排序器通过 BGP 任播(anycast)公布其 IP 地址,使得当排序器加入或离开部署时,路由能够动态更新。消息通过传统的最短路径路由和负载均衡技术(如 ECMP)被路由到单个排序器。或者,在基于 SDN 的设计中,集中式控制器可以为排序器组安装合适的任播路由。
除了这些路由更改之外,Hydra 不需要对网络中的其他元素(除了排序器本身)进行任何更改。这是一个关键的设计约束,也是 Hydra 与其他排序方法(如 1Pipe [41])的区别所在。1Pipe 在每个交换机之间交换时间戳,而其他补充技术(如 RDMA)需要复杂的网络内流量控制 [29]。
4 Hydra: 无串行化的网络排序
4.1 高级抽象
Hydra 提供的核心抽象是一个组通信协议。一个 Hydra 部署由接收者组(receiver groups)组成,每个组包含一个或多个接收者。Hydra 提供了一种组播(groupcast)原语,发送方可以指定一个或多个组作为目标,消息会被多播到目标组中的接收者。Hydra 的组播原语为参与者提供以下属性:
部分排序 Hydra 组播消息是部分排序的(部分顺序关系记为 ≺)——所有目标组重叠的组播消息是可比较的。如果组播消息 m1 被排序在 m2 之前(即 m1≺m2),且一个接收者收到了 m1 和 m2,那么所有接收者都会在 m2 之前交付 m1。
不可靠的传递 Hydra 仅提供尽力而为(best effort)的消息传递。组播消息不保证交付到任何目标接收者。
丢失检测 如果组播消息未能传递到其所有接收者,原语会通过交付一个 DROP-NOTIFICATION 通知剩余的接收者。更正式地说,设 R 为消息 m 的接收者组集合,那么以下两个条件之一将成立:
R中的所有接收者组要么交付 m 或 m 的 DROP-NOTIFICATION,
要么 R 中没有任何接收者组交付 m 或 m 的 DROP-NOTIFICATION。
这些保证与 NOPaxos 和 Eris [42, 43] 中的网络抽象提供的保证一致。然而,Hydra 在扩展性、快速故障恢复以及排序器之间的负载均衡方面实现了突破,而这些是之前设计的短板。
4.2 先前方法:集中式排序器
近期的一些研究工作 [6, 7, 42, 43, 63] 提议使用网络中的专用设备(例如可编程交换机、网络处理器或终端主机服务器)作为集中式排序器来建立消息顺序。特别是,Eris [42] 构建了一种多序列化的组播(groupcast)原语,该原语提供了与我们在 §4.1 中规定的相同保证。
为了实现多序列化组播,集中式排序器为系统中的每个组维护一个序列号。组播消息的发送方会将所有接收组编码在一个特殊的数据包头中,数据包首先被路由到排序器。当排序器接收到组播数据包时,会原子性地为每个接收组的序列号递增,并将一个多重标记(multi-stamp)写入数据包。多重标记包含一组 ⟨group-id, sequence-num⟩,每个接收组对应一个条目。然后,组播数据包被转发到每个接收组中的每个接收者。
组播接收者跟踪它们从排序器期望的下一个序列号。当接收者接收到组播数据包时,它会检查多重标记中与其组 ID 对应的序列号。如果序列号低于预期值(表明消息乱序或重复),接收者会拒绝该数据包,并向应用程序发送一个 DROP-NOTIFICATION。如果序列号高于预期值,接收者也会发送一个 DROP-NOTIFICATION。
多序列化提供了 §4.1 中的三项属性:
原子性地递增序列号:如果两个组播消息的接收组有重叠,排序器确保这些组中的所有接收者以一致的顺序交付这两个消息。
按组维护序列号:从排序器到接收者的任何数据包丢失都会导致接收到的序列号中出现间隙,因此触发 DROP-NOTIFICATION。
丢包通知:确保应用层可感知丢失消息。
然而,使用集中式排序器也引入了之前描述的种种限制(如扩展性和容错能力不足)。
Algorithm 1 SequencerHandlePacket(pkt)
id: sequencer ID
N: total number of Hydra groups
clk: switch physical clock
seq[N]: sequence number for each group
1: pkt.id←id
2: pkt.c←clk
3: for grp in pkt.grps do
4: pkt.seq[grp] ← ++seq[grp]
5: endfor
6: Forward pkt
4.2 先前方法:集中式排序器
近期的一些研究工作 [6, 7, 42, 43, 63] 提议使用网络中的专用设备(例如可编程交换机、网络处理器或终端主机服务器)作为集中式排序器来建立消息顺序。特别是,Eris [42] 构建了一种多序列化的组播(groupcast)原语,该原语提供了与我们在 §4.1 中规定的相同保证。
为了实现多序列化组播,集中式排序器为系统中的每个组维护一个序列号。组播消息的发送方会将所有接收组编码在一个特殊的数据包头中,数据包首先被路由到排序器。当排序器接收到组播数据包时,会原子性地为每个接收组的序列号递增,并将一个多重标记(multi-stamp)写入数据包。多重标记包含一组 ⟨group-id, sequence-num⟩,每个接收组对应一个条目。然后,组播数据包被转发到每个接收组中的每个接收者。
组播接收者跟踪它们从排序器期望的下一个序列号。当接收者接收到组播数据包时,它会检查多重标记中与其组 ID 对应的序列号。如果序列号低于预期值(表明消息乱序或重复),接收者会拒绝该数据包,并向应用程序发送一个 DROP-NOTIFICATION。如果序列号高于预期值,接收者也会发送一个 DROP-NOTIFICATION。
多序列化提供了 §4.1 中的三项属性:
原子性地递增序列号:如果两个组播消息的接收组有重叠,排序器确保这些组中的所有接收者以一致的顺序交付这两个消息。
按组维护序列号:从排序器到接收者的任何数据包丢失都会导致接收到的序列号中出现间隙,因此触发 DROP-NOTIFICATION。
丢包通知:确保应用层可感知丢失消息。
然而,使用集中式排序器也引入了之前描述的种种限制(如扩展性和容错能力不足)。
4.3 使用多个排序器实现一致性排序
将多序列化组播直接应用于多排序器部署会违反 §4.1 中列出的保证。假设每个排序器独立地为每个接收组维护序列号,并且组播消息可以被转发到任意排序器。考虑两个组播消息 m1 和 m2,它们都发送到组 G1,但分别通过两个不同的排序器路由。由于排序器独立维护序列号,这两个排序器可能会为 m1 和 m2 写入相同的序列号。当 G1 中的接收者收到 m1 和 m2 时,无法在保持丢包检测的同时一致地排序消息。例如,通过使用排序器 ID 来打破排序的平局,虽然可以在接收者间保持一致性,但只接收到 m1 和 m2 中较大值的接收者将无法推断较小消息的存在。
为了在支持多个排序器的同时实现 §4.1 中的所有保证,我们提出了一种新方法:结合排序器之间松散同步的物理时钟和每个排序器的序列号来建立一致的排序和丢包检测,同时通过周期性刷新消息(flush messages)来确保接收者的进度。
4.3.1 使用物理时钟进行消息排序
Hydra 使用序列号和物理时钟的组合对消息进行排序。具体来说,每个 Hydra 排序器都拥有一个严格单调递增的本地物理时钟;每个排序器还为每个接收组维护一个序列号。排序器之间的物理时钟是松散同步的,但这对安全性来说不是必须的;时钟偏移(skew)只会减缓进度,而不会破坏安全性。Hydra 的安全性仅依赖于物理时钟不会倒退。这一要求已经是常见的实践:现有的时钟同步协议(如 NTP)确保时钟只能向前移动 [51]。
每条 Hydra 组播消息在被转发到目标组中的所有接收者之前,会先路由到一个排序器。当排序器接收到组播消息时,除了为每个接收组递增序列号并插入一个多重标记(multi-stamp)外,还会将当前的时钟值写入数据包(算法 1 的第 2-5 行)。需要注意的是,读取时钟值和递增序列号必须在一个原子块中完成。物理时钟的严格单调性以及上述的原子性要求确保以下条件成立:对于两个具有重叠接收组 gg 的组播消息 m1 和 m2,如果它们由同一排序器排序,则 s1≠s2∧(s1<s2⇔c1<c2),其中 s1 和 s2 是为组 g分配的序列号,c1 和 c2 是为组 g 分配的时钟值。
通过将时钟值插入到每条组播消息中,Hydra 以如下方式定义组播消息的部分排序(≺):对于具有重叠接收组和时钟值 c1 和 c2c2 的组播消息 m1 和 m2,如果它们由排序器 ID 分别为 i 和 j 的排序器排序,则 m1≺m2 当且仅当 c1<c2∨(c1=c2∧i<j)。使用排序器 ID 来打破相同时钟值之间的平局是为了保证 §4.1 中的部分排序属性。
Hydra 组播接收者根据上述部分排序将组播消息交付给用户。如果接收者在收到 m1 之前收到了 m2,且 m1≺m2,则接收者必须在交付 m2 之前交付 m1 的 DROP-NOTIFICATION,或者将 m2 添加到缓冲区中直到收到 m1。然而,仅基于物理时钟的交付不足以检测消息丢失。
4.3.2 结合物理时钟和多重标记实现丢包检测
为消息附加序列号提供了一种有用的属性:通过观察序列号中的间隙,可以检测丢失的消息。然而,当使用物理时钟对消息排序时,这一属性会丧失——接收者在看到一个时钟值为 c 的消息时,无法判断是否错过了任何时钟值为 c′<c 的消息。为了实现丢包检测,Hydra 结合了物理时钟值和来自多个排序器的序列号。Hydra 接收者会将收到的消息缓存在本地,并按照时钟值顺序进行交付,但只有在确定(基于序列号)不会再收到来自其他排序器的更低时钟值的消息后,才会交付这些消息。
具体来说,每个 Hydra 接收者为每个排序器 ii 维护两个值(算法 2):从排序器 i 接收到的最大组序列号 s[i],以及从排序器 i 接收到的消息中的最大时钟值 c[i]。设 cmin 为所有 (c[i],i) 元组中按 ≺ 顺序的最小值。Hydra 接收者根据以下规则交付消息:
按照时钟值和排序器 ID 顺序交付待处理的组播消息(第 10 行)。
对于每个序列号,每个排序器仅交付一次消息或 DROP-NOTIFICATION(第 1 行和第 6 行)。
仅交付满足 m⪯cmin 的组播消息 m(第 11 行)。
当收到来自排序器 ii 的组播消息 m 且其序列号为 s 时,如果 s>s[i]+1,则为每个从 s[i]+1 到 s−1 的消息生成 DROP-NOTIFICATION(第 5 行)。
从 §4.3.1 的讨论中可知,规则 (1) 和 (2) 确保了 Hydra 组播的部分排序属性。为了说明规则 (3) 和 (4) 如何实现丢包检测,我们利用一个关键的不变性:对于组 gg 中的接收者 rr,以及任何以 gg 作为接收组的组播消息 mm,如果 m⪯cmin,则 rr 要么已经接收到 m,要么已经接收到另一条带有来自同一排序器的更高序列号的组播消息 m′。基于这一不变性,Hydra 组播的丢包检测属性得以保证,因为 r 要么交付 m(如果接收到 m,则满足规则 (1) 和 (3)),要么交付 m 的 DROP-NOTIFICATION(如果接收到 m′,则满足规则 (4))。作为优化,Hydra 接收者可以延迟交付 DROP-NOTIFICATION,直到需要生成消息以推进 cmin。由于 Hydra 能够处理消息乱序,这种优化不会影响接收者协议的正确性。
4.3.3 使用刷新消息确保进度
到目前为止,Hydra 的组播设计已经实现了 §4.1 中列出的所有属性,但仍然存在一个问题:为了让接收者在交付消息上取得进展,它需要接收来自所有排序器的组播消息,以推进 cmin。例如,如果接收者已经收到消息 m≻cmin,为了交付 m,接收者需要收到来自其他排序器的消息以推进 cmin。因此,任何一个排序器长时间保持空闲都会阻碍系统中所有组播接收者的进度。
为了解决这一问题,每个排序器会定期向所有组播接收者发送一个刷新消息(flush message),该消息包含排序器的当前时钟值以及它发送给每个接收组的最新序列号(但不递增)。当接收者从排序器 i 收到刷新消息时,会按照相同的过程(§4.3.2)推进 c[i]、cmin 和 s[i]。应用程序对此类刷新消息不可见。然而,接收者仍会使用刷新消息中的序列号,根据规则 (4) 生成 DROP-NOTIFICATION(算法 2 第 5 行)。同样,Hydra 接收者可以将 DROP-NOTIFICATION 的交付延迟到需要时再执行,以实现优化。
上述协议保证了,在没有故障的情况下(关于故障处理将在后文讨论),所有接收到的组播消息最终都会被交付。不同排序器的时钟偏差可能会延迟消息的交付(最多延迟等于时钟偏移量),因为每个接收者的 cmin 取决于时钟值最慢的排序器,但不会违反任何安全属性。
图 1 显示了从单个接收者视角出发,接收来自两个排序器的组播消息和刷新消息的示例执行过程。在执行的每个步骤中,接收者接收一条传入的消息,交付组播消息和 DROP-NOTIFICATION,并根据 §4.3.2 中定义的规则将待处理的组播消息保存在其本地缓冲区中。
4.4 处理排序器故障
如果一个排序器发生故障,或排序器与某些接收组之间的链路出现故障,一些(或全部)组播接收者将停止交付消息,因为它们无法从故障排序器接收消息,也无法推进 cmincmin。我们通过一个重新配置协议来解决这个问题。
具体而言,每个 Hydra 部署使用一个集中式、容错的配置服务来管理一系列配置。每个配置指定排序器和组播接收者的集合(这里仅讨论配置中排序器的变化)。组播接收者也在本地存储当前配置。当接收者怀疑当前配置 nn 中的排序器 jj 发生故障时(例如,在超时时间内未收到来自排序器 jj 的消息),它会通知配置服务。配置服务创建一个新配置 n+1n+1,将排序器 jj 移除,并将新配置发送给所有组播接收者。
当组播接收者接收到新配置时,它们运行一个一致性协议,以就每个接收组应从故障排序器交付的最后一个序列号达成一致。为此,每个 Hydra 接收者还存储每个排序器的多重标记中针对每个接收组的最大序列号(不仅仅是它自身的组)。为了继续排序器的移除过程,每个接收者向配置服务发送一条消息,包含从故障排序器接收到的所有组的最大序列号,并停止处理来自该排序器的更高序列号的消息。一旦配置服务从每个接收组接收到足够数量的这些消息,它会将这些消息聚合起来,确定每个接收组从故障排序器接收到或应接收到的最高序列号。配置服务随后向每个组发送一条移除消息。组播接收者根据这条移除消息交付所有必要的 DROP-NOTIFICATION,并继续按照 §4.3.2 中的规则交付消息;移除消息充当来自故障排序器的最终刷新消息(具有无限大的时间戳)。一旦所有来自故障排序器的待处理消息被交付,接收者可以安全地切换到新配置。为了避免由于不同配置导致的不一致性,接收者在向应用交付消息时始终附加其当前的配置编号。
讨论
与 NOPaxos 和 Eris 等单排序器系统最初使用的恢复协议相比,Hydra 的恢复协议在可用性方面表现如何?与这些系统类似,Hydra 在排序器发生故障时会中断消息的交付。然而,Hydra 接收者在运行上述协议后,可以恢复从其他排序器接收并交付消息。这种恢复只需要接收者之间的协调,而不需要网络层的参与。因此,其不可用期仅取决于故障检测时间和一致性协议的延迟,这可以比网络重路由的持续时间短几个数量级。此外,Hydra 可以支持更激进的排序器移除,使用更短的超时时间。尽管部署更多的排序器会增加排序器故障的概率,但通过在关键路径上避免网络重路由,Hydra 仍然实现了系统可用性的整体提升。
添加新排序器
要将排序器 kk 添加到系统中,配置服务会类似地创建一个新配置 n+1n+1,将排序器 kk 添加进去,并将新配置发送给所有接收者。一旦接收者收到新配置,它将停止向应用交付组播消息,并等待接收到来自新排序器 kk 的刷新消息(其时间戳高于接收者最新交付的消息)。接收者随后将该刷新消息发送给配置服务。当配置服务从每个接收组接收到足够数量的刷新消息时,它选择具有最高时间戳的刷新消息,记为 tktk,并将其广播给所有接收者;tktk 实际上充当新配置的起始时间。接收者随后继续交付先前配置的消息,直到下一个要交付的消息的时间戳大于 tktk。此时,接收者切换到新配置,将 s[k]s[k] 设置为从配置服务接收到的刷新消息中的序列号,并开始从新排序器交付消息。需要注意的是,如果一个以前被移除的排序器重新加入到新配置中,该排序器的 ID 会被重新分配为一个与所有其他 ID 不同的值,其序列号也会全部重置。
4.5 正确性
我们在附录 B 中详细讨论了 Hydra 协议的安全性。此外,Hydra 组播及排序器的添加/移除协议的 TLA+ 规范(见附录 C)已通过模型检测,验证了 Hydra 的安全性保证。
Hydra 协议的活性性质非常直接:只要(1)接收者继续从未发生故障的排序器接收组播或刷新消息,并且(2)配置服务保持可用,且能够与每个接收组的法定数量的接收者通信以移除故障排序器并完成新排序器的添加,Hydra 的组播消息将被交付。
4.6 优化
刷新消息(Flush messages)有助于推进 Hydra 接收者的进度。然而,以过于激进的频率生成刷新消息会对接收者的性能产生负面影响,因为这些刷新消息会消耗网络、CPU 和 I/O 资源。为了在消息交付的延迟与吞吐量之间取得平衡,我们提出了两项优化:接收端刷新消息请求和网络内刷新消息聚合。
4.6.1 接收端刷新消息请求
在 §4.3.3 描述的基础协议中,排序器会定期向所有接收者发送刷新消息。我们可以手动调整排序器生成刷新消息的间隔 TT,以平衡延迟和吞吐量的权衡:较小的 TT 可以改善消息交付的延迟,但会增加接收者的负载,而较大的 TT 则会产生相反的效果。然而,由于刷新消息会广播给所有接收者,这种 “一刀切” 的策略无法考虑不同接收者的处理能力和负载水平。此外,在每 TT 个时间单位盲目地发送刷新消息(尤其是当 TT 较小时),可能会导致大量不必要的流量。例如,当某个接收者当前没有需要交付的消息时,在下一条带有更高时钟值的 Hydra 消息到达之前,任何发送给该接收者的刷新消息都不会对其交付进度产生影响,因此是完全不必要的。
我们关键的观察是,接收者而非排序器更清楚何时需要刷新消息:接收者仅在其存在未交付的消息时才需要刷新消息。因此,我们提出一种以接收者为中心的优化,即排序器不再主动生成刷新消息,而是由接收者在需要时显式请求刷新消息。这种优化还支持接收者端的多种请求策略。例如,为了优化延迟,接收者可以在收到无法交付的组播消息时立即请求刷新消息;为了优化吞吐量,接收者可以延迟请求刷新消息,相当于一种批处理方法。接收者还可以采用更复杂的策略,根据其当前负载自适应地决定请求的延迟,从而在延迟和吞吐量之间进行动态优化。
4.6.2 网络内刷新消息聚合
我们的消息交付规则(§4.3.2)要求:接收者仅在收到来自所有其他排序器的更高时钟值消息时才能交付一条组播消息。这一规则的一个隐含结果是:交付一条组播消息所需的刷新消息数量与排序器的数量呈线性增长。为了进一步减少由过多刷新消息引起的处理开销,我们提出了一种高级优化技术,灵感来自最近的网络内聚合研究。
具体而言,我们利用连接到 Hydra 接收者的 ToR(Top-of-Rack)可编程交换机,来跟踪每个排序器的时钟值和序列号。当交换机接收到刷新消息时,它会更新这些值,但不会立即将刷新消息转发给接收者。只有当存储的最小时钟值变得足够大时,交换机才会向接收者发送一条包含所有时钟值和序列号的聚合刷新消息。为了准确确定这个阈值,接收者在其刷新消息请求中附加所有未交付消息的最大时钟值。ToR 交换机将此值用作时钟阈值,保证聚合刷新消息能够使接收者交付请求时缓冲区中所有未交付的消息。通过应用这种网络内聚合优化,无论排序器的数量如何,接收者处理的刷新消息数量始终保持恒定。
5 Hydra 实现
一个 Hydra 部署包含一组动态的组播发送者、接收者和排序器,由配置服务管理。我们使用基于 SDN 的集中控制方法来管理组播路由:一个基于 POX [58] 的 SDN 控制器安装规则,将组播消息路由到随机选择的可达排序器。当使用端主机排序器时,我们采用源路由方法:配置服务跟踪排序器的地址,这些地址会被缓存到 Hydra 发送者中。发送者在发送组播消息时,随机选择一个排序器,通过单播方式将消息发送给它。这种方法不需要特殊的网络路由。
排序器的实现
Hydra 排序器的状态维护最小化,包括:一个唯一的排序器 ID,每个接收组的序列号,以及单调递增的物理时钟。我们的设计原则之一是简洁性,这使得我们能够在不同硬件平台上高效实现 Hydra 排序器。
在可编程交换机中实现网络内排序 在网络交换机的数据平面中实现 Hydra 排序器可以提供最高的排序性能,因为当前的可编程交换机能够以每秒数十亿个数据包的速度处理数据包,交换延迟始终在几百纳秒内。Hydra 组播被实现为基于 UDP 的应用层协议。我们为 Hydra 组播保留了一个特殊的 UDP 端口,并在 UDP 头后附加一个定制的 Hydra 头。Hydra 头包括一个位图(指定目标组)、一个序列号向量(为每个目标组提供一个序列号)以及一个单一的时钟值。交换机实现为每个接收组使用一个交换机寄存器数组元素来存储其当前的序列号。交换机检查位图的每一位,并为每个启用的位递增对应的序列号寄存器,将序列号写入 Hydra 头中。由于在处理组播消息时,各组之间没有依赖关系,位检查和序列号更新可以并行完成,从而显著减少所需的流水线阶段,支持更高数量的组。随后,交换机将硬件时钟时间写入消息头,并使用复制引擎将数据包组播给接收者。
端主机排序器 在端主机服务器上实现 Hydra 排序器提供了更高的灵活性和可移植性,对于无法部署专用硬件的场景尤其具有吸引力。缺点是数据包处理性能相对较低。然而,Hydra 协议通过增加排序器数量来实现排序性能的线性扩展。正如我们在评估中(§7.1.3)所展示的那样,Hydra 的吞吐量随着排序器数量的增加呈线性增长。
发送者和接收者库
Hydra 提供用户态的库,用于发送和接收组播消息。除了与配置服务协调以跟踪活动的排序器和组外,该库还实现了接收者端缓冲,以按正确顺序交付消息,并支持 §4.6.1 中描述的刷新消息请求策略。我们为这些库实现了两种 I/O 栈:一种是基于轮询的 DPDK [23] 栈,用于高效、绕过内核的包处理;另一种是基于 Linux 的传输方式,使用套接字和 libevent [45],以提高兼容性。我们在 §7 的评估中使用了 DPDK 栈。
6. 使用 Hydra 构建分布式系统
Hydra 的组播原语在其保证与实现效率之间具有独特的权衡。与单播和 IP 组播等尽力而为(best-effort)原语相比,Hydra 提供了强大的消息排序保证;与原子广播(atomic broadcast)和原子组播(atomic multicast)原语相比,Hydra 不保证可靠的消息交付,但可以通过单阶段协议高效实现。为了展示其设计的优势,我们将 Hydra 应用于两个最近的分布式系统——NOPaxos 和 Eris [42, 43],并构建了一个称为 HydraPaxos 的状态机复制系统以及一个分布式事务处理系统 HydraTxn。
Hydra 的组播提供与 NOPaxos 和 Eris 使用的网络协议(有序不可靠组播和多序列组播)相同的保证。因此,Hydra 可以直接与这些现有协议结合。HydraPaxos 和 HydraTxn 使用 NOPaxos 和 Eris 的协议来容忍服务器故障并处理 DROP-NOTIFICATION,同时利用 Hydra 提供消息排序保证,并支持排序器的添加和移除。对 NOPaxos 和 Eris 的唯一必要修改是禁用它们的排序器故障处理协议,因为这些由 Hydra 本身处理。在正常情况下,HydraPaxos 和 HydraTxn 都可以在单次往返中提交操作。
HydraPaxos
HydraPaxos 是基于 NOPaxos 的状态机复制系统,能够容忍少于一半副本的崩溃故障(或者等价地,在 2f + 1 个副本中,HydraPaxos 可容忍 f 个崩溃故障)。只要应用状态机是确定性的,HydraPaxos 即可保证线性一致性(linearizability)[30]。每个 HydraPaxos 部署都会注册一个唯一的 Hydra 组播地址。HydraPaxos 客户端将状态机操作作为组播消息发送到一个单一的目标组。HydraPaxos 部署中的每个副本都充当该组的单个 Hydra 接收者。HydraPaxos 操作在正常情况下通过单次往返处理完成。一旦 Hydra 将操作交付给副本,副本会使用 NOPaxos 协议确保操作被持久提交。
简单来说,每个副本会将操作添加到其日志中,领导副本(leader)会根据当前状态执行操作。客户端在收到来自多数副本(包括领导副本)的一致响应后,认为操作已提交。当副本收到 DROP-NOTIFICATION 时,副本需要就消息的命运(是处理还是永久忽略)达成共识,以确保线性一致性。副本首先尝试通过联系组中的其他副本来恢复丢失的消息。如果副本无法恢复丢失的消息,它们会协调(由领导者驱动)将该消息提交为 NO-OP。
HydraTxn
HydraTxn 是一个容错的分布式事务处理系统。HydraTxn 将整个数据存储划分为多个分片(shards),每个分片在多个服务器上复制。客户端将数据读写包装到事务中。HydraTxn 保证事务的原子性和严格可串行化的执行,同时能够容忍每个分片中少于一半的副本故障。
与 HydraPaxos 类似,每个 HydraTxn 部署使用一个唯一的 Hydra 组播地址。部署的每个分片被分配一个唯一的组,分片中的每个副本注册为 Hydra 组接收者,处理 Hydra 消息和 DROP-NOTIFICATION。对于满足独立事务条件的事务(即没有跨分片依赖且不需要客户端交互的存储过程),客户端将事务作为单个 Hydra 组播消息发送到所有涉及的分片。
HydraTxn 还支持更通用的事务,通过将其分解为多个独立事务,并在服务器上使用两阶段锁(two-phase locking)以确保隔离。与 HydraPaxos 类似,独立事务在正常情况下通过单次往返处理完成。参与事务的每个分片的副本会记录事务并回复客户端,其中每个分片的领导副本会额外执行该事务。客户端在收到每个分片的多数法定响应后,认为事务已提交。
由于事务组播可能涉及多个分片,DROP-NOTIFICATION 要求所有相关分片(而不仅仅是本地组)就接收/丢弃决策达成一致。与 Eris [42] 类似,我们使用一个逻辑上独立的、容错的故障协调服务来管理该一致性协议。 HydraTxn 的冗余排序器设计显著提高了系统的可靠性和可用性。在排序器故障时,HydraTxn 的性能迅速恢复到最大吞吐量,而不依赖于网络控制平面进行重新路由(如 Eris 的做法)。相比之下,Eris 在重新路由期间会完全不可用,这进一步证明了我们冗余排序器方法的优越性。
8. 相关工作
有序组通信原语(如原子组播 [28])有着悠久的历史,可追溯到虚拟同步(virtual synchrony)[8],并已被广泛实现和使用 [4, 9, 32, 34, 62]。经典的原子广播模型等价于共识(consensus)[13]。我们的工作明确采用了由 NOPaxos [43] 和 Eris [42] 引入的有序但不可靠的通信模型,该模型支持网络加速的排序。
其他分布式系统也使用排序器。例如,CORFU [63] 将不可靠的排序器与基于闪存的复制存储结合,构建了一个共享日志,可以用于构建分布式数据结构 [7]。vCorfu [63] 将其扩展为类似于多序列化的多日志抽象。Scalog [22] 通过将日志数据分布到复制数据分片并使用基于 Paxos 的排序层定期排序日志条目,解决了先前共享日志设计中的阻塞重配置和排序器可扩展性问题。Hydra 不保证消息的持久性,因此避免了分片内复制的开销。此外,Hydra 消除了排序器在关键路径上的协调,从而降低了消息交付延迟,并避免了集中式排序服务的潜在瓶颈。Percolator [54] 使用排序器进行事务处理,而确定性数据库如 Calvin [60]、SLOG [60] 和 Aria [47] 则将排序器与事务调度程序结合,以实现并发控制。
Hydra 建立在改进共识协议可扩展性的研究基础之上。其使用多个活动排序器和刷新消息的机制类似于 Mencius 的旋转领导者(rotating leader)[48]。Hydra 使用松散同步的时钟 [46] 来建立全局顺序,这一思想也被 CLOCC [1]、Spanner [17] 和 TAPIR [66] 等并发控制协议所采用。像 PTP [59] 这样的协议使时钟同步在数据中心内广泛可用,而近期的研究如 Sundial [44] 和 DPTP [35] 展示了同步精度的进步。Hydra 使用时间戳对操作进行排序的方法类似于 TEMPO [24]。然而,TEMPO 需要至少一个半 RTT 来提交时间戳,而 Hydra 即使在存在并发请求的情况下,也可以通过网络排序器在半个 RTT 内提交时间戳。与 TEMPO 类似,Hydra 也会等待来自其他排序器的更高时间戳,以确保时间戳的稳定性。
Hydra 的设计支持将可编程设备用作排序器,包括 PISA/RMT 交换机 ASIC [12]。最近的研究表明,这些交换机可以实现复杂协议(如共识 [20, 21] 和链式复制 [33])。与 NOPaxos 和 Eris 类似,Hydra 在交换机上有意实现有限的排序功能,而将大部分协议复杂性保留在终端主机上。RedPlane [37] 和 SwiSh [64, 65] 提供了用于复制交换机数据平面状态的抽象,分别用于提高可靠性和可扩展性;然而,排序操作需要强一致性和频繁更新,这对于两者来说是最差的性能场景,因此需要采用不同的方法。
另一个并行研究项目 1Pipe [41] 使用数据中心中的可编程交换机实现因果和完全有序的通信。在 1Pipe 中,发送者将本地时间戳附加到消息中,接收者按照时间戳顺序严格交付消息。为了确定时间戳何时可以安全交付,1Pipe 的交换机跟踪所有入网链接的屏障信息,并将聚合的屏障时间戳写入每个数据包中。主机和交换机会定期在空闲链接上发送信标消息以确保进度。
Hydra 同样使用时间戳对消息进行排序。但一个关键区别在于,1Pipe 使用由发送者生成的时间戳,而 Hydra 的时间戳是由排序器生成的。因此,1Pipe 需要网络中所有节点的时钟同步,并且在每个交换机上进行网络内计算,这在非同质网络(例如并非所有交换机都可编程 [57])中是一项部署挑战;而 Hydra 更适合实际部署,仅需在排序器和副本上运行逻辑,并且仅对排序器之间的时钟进行同步。此外,在 1Pipe 的部署中,网络中任何节点、链路或交换机的故障都会导致所有接收者的进度停滞;而在 Hydra 中,只有与排序器相关的本地故障可能会影响进度。