为什么要通过 etcdraft 来进行共识?

我觉得有以下原因

  1. solo 并不适合大多数场景, 例如: 组织A, 组织B, 都想在自己放置共识节点
  2. kafka 虽然能满足以上需求, 但是 kafka 加上 zookeeper 需要额外部署并且实在是太重了, 不方便部署

所以基于 etcdraft 的共识来了, 解决了以上的痛点

重要的话说三遍!

千万不要错过文章中的源码部分, 里面有很多很多的注释!!! 千万不要错过文章中的源码部分, 里面有很多很多的注释!!! 千万不要错过文章中的源码部分, 里面有很多很多的注释!!!

核心接口

以下是我认为实现 etcdraft 共识核心的接口

// ClusterServer 集群Server接口
type ClusterServer interface {
    Step(Cluster_StepServer) error
}

// ClusterClient 集群Client接口
type ClusterClient interface {
    Step(ctx context.Context, opts ...grpc.CallOption) (orderer.Cluster_StepClient, error)
}

// Handler 用于共识的两个接口
type Handler interface {
    OnConsensus(channel string, sender uint64, req *orderer.ConsensusRequest) error
    OnSubmit(channel string, sender uint64, req *orderer.SubmitRequest) error
}

// Consenter 共识排序接口
type Consenter interface {
    HandleChain(support ConsenterSupport, metadata *cb.Metadata) (Chain, error)
}

// Chain 共识核心的接口
type Chain interface {
    // 接收普通的交易消息
    Order(env *cb.Envelope, configSeq uint64) error
    // 接收配置消息
    Configure(config *cb.Envelope, configSeq uint64) error
    WaitReady() error
    Errored() <-chan struct{}
    Start()
    Halt()
}

Orderer初始化

启动 etcd 节点是需要设置 ETCD_NAME 的, 也就是节点的 ID, 但是在启动 Orderer 的时候我们并没有做相关的设置, 我觉得设计很巧妙

如果你启动过 fabric-samples/first-network 你应该知道里面的 configtx.yaml, 用于生成 genesis.blok

genesis.block 中可以设置 OrdererType, 以及 EtcdRaft 相关的配置

...
Profiles:
    SampleMultiNodeEtcdRaft:
        ...
        Orderer:
            <<: *OrdererDefaults
            OrdererType: etcdraft
            EtcdRaft:
                Consenters:
                - Host: orderer.example.com
                  Port: 7050
                  ClientTLSCert: crypto-config/ordererOrganizations/example.com/orderers/orderer.example.com/tls/server.crt
                  ServerTLSCert: crypto-config/ordererOrganizations/example.com/orderers/orderer.example.com/tls/server.crt
                - Host: orderer2.example.com
                  Port: 7050
                ...
            Addresses:
                - orderer.example.com:7050
                - orderer2.example.com:7050
                ...
...

如果你做过添加组织的操作, 或者反序列化过 genesis.block, 就可以看到这个 block 中的 json 数据

数据做过删减, 只抽出了需要讲解的地方, 如下

{
    "data":{
        "data":[
            {
                "payload":{
                    "config":{
                        "channel_group":{
                            "groups":{
                                "Orderer":{
                                    "values":{
                                        "ConsensusType":{
                                            "value":{
                                                "metadata":{
                                                    "consenters":[
                                                        {
                                                            "client_tls_cert":"client tls cert",
                                                            "host":"orderer.example.com",
                                                            "port":7050,
                                                            "server_tls_cert":"server tls cert"
                                                        },
                                                        {
                                                            "client_tls_cert":"client tls cert",
                                                            "host":"orderer2.example.com",
                                                            "port":7050,
                                                            "server_tls_cert":"server tls cert"
                                                        }
                                                    ],
                                                    "options":{
                                                        "election_tick":10,
                                                        "heartbeat_tick":1,
                                                        "max_inflight_blocks":5,
                                                        "snapshot_interval_size":20971520,
                                                        "tick_interval":"500ms"
                                                    }
                                                },
                                                "state":"STATE_NORMAL",
                                                "type":"etcdraft"
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                },
                "signature":null
            }
        ]
    },
    "header":{

    },
    "metadata":{

    }
}

Orderer 启动之处, 会去读取 system channel 最新的配置块, 如果没有则会读取 genesis.blok

判断其中的 ConsensusType 如果是 etcdraft, 就会使用 clusterGRPCServer 并传入 initializeMultichannelRegistrar(...)

initializeMultichannelRegistrar(...) 会再次判断 ConsensusType 如果是 etcdraft 就会通过 initializeEtcdraftConsenter(...) 初始化 etcdraftConsenter 实例对象

同时也会初始化 cluster.Service, 它实现了 ClusterServer 核心接口

最后将 所有的 consenters 传入 multichannel.Initialize 并在其中进行初始化

multichannel.Initialize 会调用 etcdraft.ConsenterHandleChain() 也就是上面说的核心接口

// orderer/common/server/main.go
func Start(cmd string, conf *localconfig.TopLevel) {
    // 读取 genesis.blok
    bootstrapBlock := extractBootstrapBlock(conf)
    ...
    // 创建 LedgerFactory, 如果不存在 system channel 的配置块返回 nil
    lf, _ := createLedgerFactory(conf, metricsProvider)
    sysChanLastConfigBlock := extractSysChanLastConfig(lf, bootstrapBlock)
    // 选择引导块, 在 sysChanLastConfigBlock 和 bootstrapBlock 块中选择, 如果 sysChanLastConfigBlock 不为 nil, 则优先返回
    clusterBootBlock := selectClusterBootBlock(bootstrapBlock, sysChanLastConfigBlock)
    ...
    // 判断 clusterBootBlock 中的共识类型如果是 `etcdraft` 就进行集群的初始化工作
    clusterType := isClusterType(clusterBootBlock)
    ...
    // 初始化多通道
    manager := initializeMultichannelRegistrar(clusterBootBlock, r, clusterDialer, clusterServerConfig, clusterGRPCServer, conf, signer, metricsProvider, opsSystem, lf, tlsCallback)
}

// orderer/common/server/main.go
func initializeMultichannelRegistrar(...) *multichannel.Registrar {
    genesisBlock := extractBootstrapBlock(conf)
    ...
    registrar := multichannel.NewRegistrar(*conf, lf, signer, metricsProvider, callbacks...)
    ...
    // 加载 solo, kafka 共识
    consenters["solo"] = solo.New()
    var kafkaMetrics *kafka.Metrics
    consenters["kafka"], kafkaMetrics = kafka.New(conf.Kafka, metricsProvider, healthChecker)
    ...
    // 判断是否是 `etcdraft` 共识, 如果是就将 etcdraft 的实例也加入到 consenters 中
    if isClusterType(bootstrapBlock) {
        initializeEtcdraftConsenter(consenters, conf, lf, clusterDialer, bootstrapBlock, ri, srvConf, srv, registrar, metricsProvider)
    }
    registrar.Initialize(consenters)
    return registrar
}

// orderer/common/multichannel/registrar.go
func (r *Registrar) Initialize(consenters map[string]consensus.Consenter) {
    r.consenters = consenters
    existingChains := r.ledgerFactory.ChainIDs()

    for _, chainID := range existingChains {
        ...
        // 这里判断是否是 system channel
        if _, ok := ledgerResources.ConsortiumsConfig(); ok {
            ...
            chain := newChainSupport(
                r,
                ledgerResources,
                r.consenters,
                r.signer,
                r.blockcutterMetrics,
            )
            ...
            defer chain.start()
        } else {
            ...
        }
    }
...
}

// orderer/common/multichannel/chainsupport.go
func newChainSupport(...) *ChainSupport {
    ...
    // 这里会根据 ledger 获取 ConsensusType 选择最终要用到的共识算法, 根据本文会选出 `etcdraft`
    consenterType := ledgerResources.SharedConfig().ConsensusType()
    consenter, ok := consenters[consenterType]
    ...
    // 调用 HandleChain
    cs.Chain, err = consenter.HandleChain(cs, metadata)
    ...
    return cs
}

EtcdRaft初始化

代码从现在起就进入了 etcdraft 中的核心部分, 里面可以看到系统是如何自动设置 raftID 的, 始于 HandleChain, 终于…

HandleChain 中会取出当前 channel 也就是 system channel 最新的配置信息并获取 ConsensusMetadata 也就是上面 json 中的 channel_group.groups.Orderer.ConsensusType.value.metadata

这是一个数组, 系统会根据当前 orderer 所设置的证书在这个数组中的 index 来设置 raftID, 妙啊,真是妙啊!

在这个方法中会初始化 cluster.RPC, 没错这个东西实现了上面所说 ClusterClient 核心接口, 也就是说 ClusterServerClusterClient 是一对多关系

最终调用 etcdraft.NewChain(...) 进行 etcdraft.Chain 的创建, etcdraft.Chain 实现了 Chain 这个核心接口

到此, 上面所说的核心接口已全部出现了

// orderer/consensus/etcdraft/consenter.go
func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *common.Metadata) (consensus.Chain, error) {
    // 获取 ConsensusMetadata
    m := &etcdraft.ConfigMetadata{}
    if err := proto.Unmarshal(support.SharedConfig().ConsensusMetadata(), m); err != nil {
        return nil, errors.Wrap(err, "failed to unmarshal consensus metadata")
    }
    ...
    consenters := map[uint64]*etcdraft.Consenter{}
    for i, consenter := range m.Consenters {
        consenters[blockMetadata.ConsenterIds[i]] = consenter
    }
    // 设置ID
    id, err := c.detectSelfID(consenters)
    ...
    // 初始化 cluster.RPC 每一个 chan 都会有个 rpc, 也印证了我上面说的, cluster.server 和 cluster.client 是一对一关系
    rpc := &cluster.RPC{
        Timeout:       c.OrdererConfig.General.Cluster.RPCTimeout,
        Logger:        c.Logger,
        Channel:       support.ChainID(),
        Comm:          c.Communication,
        StreamsByType: cluster.NewStreamsByType(),
    }

    return NewChain(
        support,
        opts,
        c.Communication,
        rpc,
        func() (BlockPuller, error) { return newBlockPuller(support, c.Dialer, c.OrdererConfig.General.Cluster) },
        func() {
            c.InactiveChainRegistry.TrackChain(support.ChainID(), nil, func() { c.CreateChain(support.ChainID()) })
        },
        nil,
    )
}

// orderer/consensus/etcdraft/chan.go
func NewChain(...) (*Chain, error) {
    ...
    // 此方法基本都是初始化工作, 需要注意的是 rpc 给了 chan
    c := &Chain{
        configurator:     conf,
        rpc:              rpc,
        channelID:        support.ChainID(),
        raftID:           opts.RaftID,
        ...
    }
    ...
    // raft 的配置文件, 也是从 block 中获取的.
    config := &raft.Config{
        ...
    }
    // node, 里面有 raft 的 node
    c.Node = &node{
        ...
    }

    return c, nil
}

EtcdRaft启动

这里又要回到 newChainSupport() 中了, 这里就复制那边的代码了, 在此函数退出的时候 defer chain.start() 会调用 start 的方法

里面会启动 etcdraft 中的 node.start() 在里面又会启动 etcd 中的 node.start().

// orderer/consensus/etcdraft/chan.go
func (c *Chain) Start() {
    ...
    // 启动了 etcdraft.Node
    c.Node.start(c.fresh, isJoin)
    ...
    // serveRequest 是个核心的函数, 在里面会做 raft 的角色切换等一堆的操作
    go c.serveRequest()

    es := c.newEvictionSuspector()

    interval := DefaultLeaderlessCheckInterval
    if c.opts.LeaderCheckInterval != 0 {
        interval = c.opts.LeaderCheckInterval
    }

    c.periodicChecker = &PeriodicCheck{
        Logger:        c.logger,
        Report:        es.confirmSuspicion,
        CheckInterval: interval,
        Condition:     c.suspectEviction,
    }
    c.periodicChecker.Run()
}

// orderer/consensus/etcdraft/node.go
func (n *node) start(fresh, join bool) {
    ...
    var campaign bool
    // 是否是新节点标记位
    if fresh {
        // 是否是新加入标记位
        if join {
            raftPeers = nil
            n.logger.Info("Starting raft node to join an existing channel")
        } else {
            n.logger.Info("Starting raft node as part of a new channel")

            // determine the node to start campaign by selecting the node with ID equals to:
            //                hash(channelID) % cluster_size + 1
            sha := sha256.Sum256([]byte(n.chainID))
            number, _ := proto.DecodeVarint(sha[24:])
            if n.config.ID == number%uint64(len(raftPeers))+1 {
                campaign = true
            }
        }
        // 最终会启动 raft 的 node.
        // 有点像设置配置文件.
        // raftPeers, 对应的就是 ETCD_INITIAL_CLUSTER
        n.Node = raft.StartNode(n.config, raftPeers)
    } else {
        n.logger.Info("Restarting raft node")
        n.Node = raft.RestartNode(n.config)
    }

    go n.run(campaign)
}

一笔交易

Chain 接口中的 Order(...) 方法, 用于接收普通交易消息

当一个消息进入 Order 方法中后, 会将消息封装成 orderer.SubmitRequest 并发送

Chain.Submit(...) 方法中, 会先消息在封装 orderer.submit 并存入 chan Chain.submitC 通道中

在判断 leader 是否是当前节点, 如果不是, 就会把数据通过 rpc 发送给 leader

Chain.rpc 是一个实现了 ClusterClient 可以用于将消息发送到其他的 orderer

st=>start
e=>end
opOrder=>operation: Order(...)
condLeader=>condition: IsLeader
opRPC=>operation: rpc.SendSubmit(...)

st->opOrder->condLeader(no)->opRPC->e
condLeader(yes)->e
// orderer/consensus/etcdraft/chain.go
func (c *Chain) Order(env *common.Envelope, configSeq uint64) error {
    c.Metrics.NormalProposalsReceived.Add(1)
    // 这里将 env, configSeq 以及 channelID 封装成了 SubmitRequest 并发给了 Submit 方法
    return c.Submit(&orderer.SubmitRequest{LastValidationSeq: configSeq, Payload: env, Channel: c.channelID}, 0)
}

// orderer/consensus/etcdraft/chain.go
func (c *Chain) Submit(req *orderer.SubmitRequest, sender uint64) error {
    ...
    // 获取一个 chan 用于接收网络当前的 leader
    leadC := make(chan uint64, 1)
    select {
    // 这个 case 就是将 req, 以及用于接收 leader 的 chan, 再次封装发给了 Chain 中的 chan submitC
    case c.submitC <- &submit{req, leadC}:
        // 等待 submitC 被处理, 并返回 leader 的 id
        lead := <-leadC
        // 如果没有 leader 就报错
        if lead == raft.None {
            c.Metrics.ProposalFailures.Add(1)
            return errors.Errorf("no Raft leader")
        }

        // 如果自己不是 leader, 就会将消息发给 leader.
        if lead != c.raftID {
            if err := c.rpc.SendSubmit(lead, req); err != nil {
                c.Metrics.ProposalFailures.Add(1)
                return err
            }
        }
    ...
    return nil
}

一个块的同步

这个方法, 我认为算是 etcdraft 核心方法了, 有兴趣的小伙伴可以深究一下这个方法

里面涉及到了 Leader, Follower 的角色切换, 共识等实现

这里还是跟着上一步继续向下深究, 在交易消息到达 Leader 后就要开始共识了

如果节点是 Leader 则会进行排序

// orderer/consensus/etcdraft/chain.go
func (c *Chain) serveRequest() {
    ...
    // 下方会使用到里面的协程
    becomeLeader := func() (chan<- *common.Block, context.CancelFunc) {
        ...
        ctx, cancel := context.WithCancel(context.Background())
        go func(ctx context.Context, ch <-chan *common.Block) {
            for {
                select {
                case b := <-ch:
                    data := utils.MarshalOrPanic(b)
                    // 这里会丢给 raft node 进行同步 block, 里面的处理非常复杂.., 属于 raft 中的东西了, 这里就不细追了.
                    if err := c.Node.Propose(ctx, data); err != nil {
                        c.logger.Errorf("Failed to propose block [%d] to raft and discard %d blocks in queue: %s", b.Header.Number, len(ch), err)
                        return
                    }
                    c.logger.Debugf("Proposed block [%d] to raft consensus", b.Header.Number)

                case <-ctx.Done():
                    c.logger.Debugf("Quit proposing blocks, discarded %d blocks in the queue", len(ch))
                    return
                }
            }
        }(ctx, ch)
        return ch, cancel
    }
    ...
    for {
        select {
        case s := <-submitC:
            // leader 进行出块操作, 如果未符合要求,比如大小不够, 就会 pending
            batches, pending, err := c.ordered(s.req)
            if err != nil {
                c.logger.Errorf("Failed to order message: %s", err)
                continue
            }
            if pending {
                startTimer() // no-op if timer is already started
            } else {
                stopTimer()
            }

            // 生成生成 block 并将 block 传给 propC, propC 是一个 chan 结构, 所以会给上面的协程做处理
            c.propose(propC, bc, batches...)

            if c.configInflight {
                c.logger.Info("Received config transaction, pause accepting transaction till it is committed")
                submitC = nil
            } else if c.blockInflight >= c.opts.MaxInflightBlocks {
                c.logger.Debugf("Number of in-flight blocks (%d) reaches limit (%d), pause accepting transaction",
                    c.blockInflight, c.opts.MaxInflightBlocks)
                submitC = nil
            }
            ...
        case <-timer.C():
            ticking = false

            batch := c.support.BlockCutter().Cut()
            if len(batch) == 0 {
                c.logger.Warningf("Batch timer expired with no pending requests, this might indicate a bug")
                continue
            }

            c.logger.Debugf("Batch timer expired, creating block")
            c.propose(propC, bc, batch) // we are certain this is normal block, no need to block
        }
    }
}

一个块的存储

根据 raft 共识同步消息的流程是 uncommitted -> committed 两个阶段, 都会在上面完成

如果是 committed 就会去写块

// orderer/consensus/etcdraft/chain.go
func (c *Chain) serveRequest() {
    ...
    // 下方会使用到里面的协程
    becomeLeader := func() (chan<- *common.Block, context.CancelFunc) {
        ...
        ctx, cancel := context.WithCancel(context.Background())
        go func(ctx context.Context, ch <-chan *common.Block) {
            for {
                select {
                case b := <-ch:
                    data := utils.MarshalOrPanic(b)
                    // 这里会丢给 raft node 进行同步 block, 里面的处理非常复杂.., 属于 raft 中的东西了, 这里就不细追了.
                    if err := c.Node.Propose(ctx, data); err != nil {
                        c.logger.Errorf("Failed to propose block [%d] to raft and discard %d blocks in queue: %s", b.Header.Number, len(ch), err)
                        return
                    }
                    c.logger.Debugf("Proposed block [%d] to raft consensus", b.Header.Number)

                case <-ctx.Done():
                    c.logger.Debugf("Quit proposing blocks, discarded %d blocks in the queue", len(ch))
                    return
                }
            }
        }(ctx, ch)
        return ch, cancel
    }
    ...
    for {
        select {
        ...
        case app := <-c.applyC:
            ...
            // 这里面会讲 raftlog 中的 block 落地.
            c.apply(app.entries)
            ...
        }
    }
}

// orderer/consensus/etcdraft/chain.go
func (c *Chain) apply(ents []raftpb.Entry) {
    ...
    for i := range ents {
        // 只有两种类型 EntryNormal, EntryConfChange, block 消息是 EntryNormal
        switch ents[i].Type {
        case raftpb.EntryNormal:
            ...
            block := utils.UnmarshalBlockOrPanic(ents[i].Data)
            // 这里就会写入 block 了
            c.writeBlock(block, ents[i].Index)
            c.Metrics.CommittedBlockNumber.Set(float64(block.Header.Number))
        ...
        case raftpb.EntryConfChange:
            ...
        }

        if ents[i].Index > c.appliedIndex {
            c.appliedIndex = ents[i].Index
        }
    }

    if c.accDataSize >= c.sizeLimit {
        b := utils.UnmarshalBlockOrPanic(ents[position].Data)

        select {
        case c.gcC <- &gc{index: c.appliedIndex, state: c.confState, data: ents[position].Data}:
            c.logger.Infof("Accumulated %d bytes since last snapshot, exceeding size limit (%d bytes), "+
                "taking snapshot at block [%d] (index: %d), last snapshotted block number is %d, current nodes: %+v",
                c.accDataSize, c.sizeLimit, b.Header.Number, c.appliedIndex, c.lastSnapBlockNum, c.confState.Nodes)
            c.accDataSize = 0
            c.lastSnapBlockNum = b.Header.Number
            c.Metrics.SnapshotBlockNumber.Set(float64(b.Header.Number))
        default:
            c.logger.Warnf("Snapshotting is in progress, it is very likely that SnapshotIntervalSize is too small")
        }
    }

    return
}