本文解決的問題是:本peer節點如何接收其他節點的資料,接到資料如何處理?
之前兩節分析的是:當命令來源為cli client時,如何連線到devops伺服器、如何傳送給consensus模組、如何傳送給chaincodesupportclient等。
接下來分析本文要討論的問題
1)在進行網路初始化的過程中執行以下內容,在建立節點engine的過程中,該節點以客戶端的身份連線到其他peer。
[go] 程式碼
// Build the peer server; helper.getengine is handed over as the second
// argument so the peer can obtain its engine through it.
peerserver, err = peer.newpeerwithengine(sechelperfunc, helper.getengine)
[go] 程式碼
// chatwithpeer connects to the peer at address as a client, opens a chat
// stream on that connection, and runs handlechat on the stream until it
// returns. Any error from connecting, opening the stream, or handlechat is
// logged and returned; nil is returned otherwise.
func (p *impl) chatwithpeer(address string) error {
peerlogger.debugf("initiating chat with peer address: %s", address)
conn, err := newpeerclientconnectionwithaddress(address)
if err != nil {
peerlogger.errorf("error creating connection to peer address %s: %s", address, err)
return err
}
// NOTE(review): conn is not closed anywhere in this function; confirm
// whether it is released elsewhere.
serverclient := pb.newpeerclient(conn)
ctx := context.background()
stream, err := serverclient.chat(ctx)
if err != nil {
peerlogger.errorf("error establishing chat with peer address %s: %s", address, err)
return err
}
peerlogger.debugf("established chat with peer address: %s", address)
// The final argument is handlechat's initiatedstream flag: true because
// this side opened the stream.
err = p.handlechat(ctx, stream, true)
// Close the send side whether or not handlechat failed; the result of
// closesend itself is not checked.
stream.closesend()
if err != nil {
peerlogger.errorf("ending chat with peer address %s due to error: %s", address, err)
return err
}
return nil
}
2.在handlechat執行過程中會建立訊息迴圈,每從stream收到一則訊息,就交給這裡的handler.handlemessage處理。這個handler之前介紹過,是engine的訊息響應控制代碼,該訊息響應處理來自於consensus模組。
[go] 程式碼
// handlechat creates a message handler for stream through p.handlerfactory
// and then loops, receiving messages from the stream and passing each one to
// the handler. It returns nil when the stream reports io.eof, and a wrapped
// error if the handler cannot be created or a receive fails for any other
// reason.
func (p *impl) handlechat(ctx context.context, stream chatstream, initiatedstream bool) error {
// The context deadline is only logged here; it is not otherwise used.
deadline, ok := ctx.deadline()
peerlogger.debugf("current context deadline = %s, ok = %v", deadline, ok)
handler, err := p.handlerfactory(p, stream, initiatedstream, nil)
if err != nil {
return fmt.errorf("error creating handler during handlechat initiation: %s", err)
}
// Stop the handler on every exit path out of the receive loop.
defer handler.stop()
for {
in, err := stream.recv()
if err == io.eof {
peerlogger.debug("received eof, ending chat")
return nil
}
if err != nil {
e := fmt.errorf("error during chat, stopping handler: %s", err)
peerlogger.error(e.error())
return e
}
err = handler.handlemessage(in)
if err != nil {
// A handling failure is only logged; the loop keeps receiving because
// the return below is commented out.
peerlogger.errorf("error handling message: %s", err)
//return err
}
}
}
3.handlemessage函式中的consenterchan這個channel比較重要,對它的寫入操作會觸發engine.consensusfan的訊息迴圈。
[go] 程式碼
// handlemessage handles one incoming message. Messages of type
// pb.message_consensus are wrapped in a util.message together with the
// sender's id and offered to handler.consenterchan; every other message type
// is passed on to handler.messagehandler.
func (handler *consensushandler) handlemessage(msg *pb.message) error {
if msg.type == pb.message_consensus {
// NOTE(review): the error from handler.to() is discarded, yet senderpe.id
// is used below; confirm that senderpe is always usable here.
senderpe, _ := handler.to()
// The default case makes this send non-blocking: if the channel cannot
// accept the message immediately, it is rejected with an error.
select {
case handler.consenterchan <- &util.message{
msg: msg,
sender: senderpe.id,
}:
return nil
default:
err := fmt.errorf("message channel for %v full, rejecting", senderpe.id)
logger.errorf("failed to queue consensus message because: %v", err)
return err
}
}
// Check the log level first so the debug line is only built when enabled.
if logger.isenabledfor(logging.debug) {
logger.debugf("did not handle message of type %s, passing on to next messagehandler", msg.type)
}
return handler.messagehandler.handlemessage(msg)
}
4.看到recvmsg這個函式是不是有點眼熟?這個操作和《hyperledger fabric 結構分析》的最後一個流程是一樣的。
[go] 程式碼
// getengine returns the shared engine, building it inside engineonce.do:
// it creates the helper and consenter, wires them together, fetches the peer
// endpoint from coord, creates the consensus message fan, and starts a
// goroutine that feeds the fan's output to the consenter.
//
// NOTE(review): engineonce is presumably a sync.Once. If so, err can only be
// non-nil on the call that performs the initialization; later calls return
// the engine with a nil error. Confirm this is intended.
func getengine(coord peer.messagehandlercoordinator) (peer.engine, error) {
var err error
engineonce.do(func() {
engine = new(engineimpl)
engine.helper = newhelper(coord)
engine.consenter = controller.newconsenter(engine.helper)
engine.helper.setconsenter(engine.consenter)
// A failure here is reported through err, but the rest of the setup
// still runs.
engine.peerendpoint, err = coord.getpeerendpoint()
engine.consensusfan = util.newmessagefan()
// Consumer loop: hand every message leaving the fan to the consenter.
go func() {
logger.debug("starting up message thread for consenter")
// the channel never closes, so this should never break
for msg := range engine.consensusfan.getoutchannel() {
engine.consenter.recvmsg(msg.msg, msg.sender)
}
}()
})
return engine, err
}
5.再往下的流程與 hyperledger fabric 結構分析(二)中的一致。