看一下架構圖:
上圖可以看出來,其實SDK就是為了APP服務的,圖中的應用程式其實就是給的例子,讓大家能快速上手。然後另外兩部分一個是和抽象層(共識和P2P)通訊的,另外一個是用來呼叫各種外掛的。
SDK從開始到現在,也進行了好幾次比較大的改動了,至於今後會不會再有大的改動,也不敢肯定。所以說,做成外掛化,是一個最好的選擇,到時候看誰不順眼,直接搞掉就可以了,喜歡誰,把外掛接進來就OK。
1、plugins層
在外掛層其實圖中畫的並不是很完全只是一個示意。主要的幾個外掛包括staking、IBC、 bank、 auth、 governance 、tx、 keys等幾個。staking主要是控制Atom持有者相關貢獻。類似一個匯率機制,動態變化。IBC其實就是鏈間通訊機制,因為各個通訊鏈是透過外掛插入到HUB中進行通訊,所以需要一個相應的通訊機制來保證通訊的安全性。governance這個模組目前在原始碼中看好像註釋了不少,只保留了較少的東西,它主要是治理相關的實現,如提議、投票等。bank其實就是提供了一系列的通訊介面(資產轉移的),所以叫“銀行”。
2、APP層
這一層基本沒啥可說的,應該就是客戶開發的APP,但是為了能讓客戶迅速進入,提供了三個相關的Demo。其中Basecoin是第一個完成的,是一個相對完整的應用,實現了SDK的核心模組的擴充套件,提供了諸如帳戶管理、管理交易型別、處理儲存等。
其它兩個都是相關的擴充套件。
3、BaseApp
這一層主要是ABCI的通訊,和Tendermint進行互動,Cosmos的核心就在這裡。
二、原始碼流程
1、啟動流程
從主程式的介面來分析原始碼:
這裡只分析前兩步,最後一步等分析Tendermint時再展開分析。
func NewGaiaApp(logger log.Logger, db dbm.DB) *GaiaApp { cdc := MakeCodec() // create your application object //建立一個相關的APP,其它所有的APP都可以按照這個方法 var app = &GaiaApp{ BaseApp: bam.NewBaseApp(appName, cdc, logger, db), cdc: cdc, keyMain: sdk.NewKVStoreKey("main"), keyAccount: sdk.NewKVStoreKey("acc"), keyIBC: sdk.NewKVStoreKey("ibc"), keyStake: sdk.NewKVStoreKey("stake"), keySlashing: sdk.NewKVStoreKey("slashing"), } // define the accountMapper //帳戶管理--從KVSTROE抽象 app.accountMapper = auth.NewAccountMapper( app.cdc, app.keyAccount, // target store &auth.BaseAccount{}, // prototype ) // add handlers //新增各種操作——它們都從KVSTORE抽象出來,但是它們的抽象度更高,或者可以認為是accountMapper的更高一層。 //處理帳戶的操作,再抽象一層 app.coinKeeper = bank.NewKeeper(app.accountMapper) app.ibcMapper = ibc.NewMapper(app.cdc, app.keyIBC, app.RegisterCodespace(ibc.DefaultCodespace)) //處理Atom app.stakeKeeper = stake.NewKeeper(app.cdc, app.keyStake, app.coinKeeper, app.RegisterCodespace(stake.DefaultCodespace)) //設定懲罰機制操作者 app.slashingKeeper = slashing.NewKeeper(app.cdc, app.keySlashing, app.stakeKeeper, app.RegisterCodespace(slashing.DefaultCodespace)) // register message routes //這個是重點,在這裡註冊路由的控制代碼 app.Router(). AddRoute("bank", bank.NewHandler(app.coinKeeper)). AddRoute("ibc", ibc.NewHandler(app.ibcMapper, app.coinKeeper)). AddRoute("stake", stake.NewHandler(app.stakeKeeper)) // initialize BaseApp //初始化相關引數 app.SetInitChainer(app.initChainer) app.SetBeginBlocker(app.BeginBlocker) app.SetEndBlocker(app.EndBlocker) //設定許可權控制控制代碼 app.SetAnteHandler(auth.NewAnteHandler(app.accountMapper, app.feeCollectionKeeper)) //從KV資料庫載入相關資料--在當前版本中,IVAL儲存是KVStore基礎的實現 app.MountStoresIAVL(app.keyMain, app.keyAccount, app.keyIBC, app.keyStake, app.keySlashing) err := app.LoadLatestVersion(app.keyMain) if err != nil { cmn.Exit(err.Error()) } return app } // custom tx codec //將相關的編碼器註冊到相關的各方 func MakeCodec() *wire.Codec { var cdc = wire.NewCodec() ibc.RegisterWire(cdc) bank.RegisterWire(cdc) stake.RegisterWire(cdc) slashing.RegisterWire(cdc) auth.RegisterWire(cdc) sdk.RegisterWire(cdc) wire.RegisterCrypto(cdc) return cdc } //其下為具體的上面的HANDLER的設定 // application updates every end block func (app *GaiaApp) BeginBlocker(ctx sdk.Context, req abci.RequestBeginBlock) abci.ResponseBeginBlock { tags := slashing.BeginBlocker(ctx, req, app.slashingKeeper) return abci.ResponseBeginBlock{ Tags: tags.ToKVPairs(), } } // application updates every end block func (app *GaiaApp) EndBlocker(ctx sdk.Context, req abci.RequestEndBlock) abci.ResponseEndBlock { validatorUpdates := stake.EndBlocker(ctx, app.stakeKeeper) return abci.ResponseEndBlock{ ValidatorUpdates: validatorUpdates, } } // custom logic for gaia initialization func (app *GaiaApp) initChainer(ctx sdk.Context, req abci.RequestInitChain) abci.ResponseInitChain { stateJSON := req.AppStateBytes // TODO is this now the whole genesis file? var genesisState GenesisState err := app.cdc.UnmarshalJSON(stateJSON, &genesisState) if err != nil { panic(err) // TODO https://github.com/cosmos/cosmos-sdk/issues/468 // return sdk.ErrGenesisParse("").TraceCause(err, "") } // load the accounts for _, gacc := range genesisState.Accounts { acc := gacc.ToAccount() app.accountMapper.SetAccount(ctx, acc) } // load the initial stake information stake.InitGenesis(ctx, app.stakeKeeper, genesisState.StakeData) return abci.ResponseInitChain{} } |
// Implements ABCI // InitChain runs the initialization logic directly on the CommitMultiStore and commits it. func (app *BaseApp) InitChain(req abci.RequestInitChain) (res abci.ResponseInitChain) { if app.initChainer == nil { return } // Initialize the deliver state and run initChain app.setDeliverState(abci.Header{}) app.initChainer(app.deliverState.ctx, req) // no error // NOTE: we don't commit, but BeginBlock for block 1 // starts from this deliverState return } func (app *BaseApp) setDeliverState(header abci.Header) { ms := app.cms.CacheMultiStore() app.deliverState = &state{ ms: ms, ctx: sdk.NewContext(ms, header, false, nil, app.Logger), } } |
// application updates every end block func (app *GaiaApp) BeginBlocker(ctx sdk.Context, req abci.RequestBeginBlock) abci.ResponseBeginBlock { tags := slashing.BeginBlocker(ctx, req, app.slashingKeeper) return abci.ResponseBeginBlock{ Tags: tags.ToKVPairs(), } } // slashing begin block functionality func BeginBlocker(ctx sdk.Context, req abci.RequestBeginBlock, sk Keeper) (tags sdk.Tags) { // Tag the height heightBytes := make([]byte, 8) binary.LittleEndian.PutUint64(heightBytes, uint64(req.Header.Height)) tags = sdk.NewTags("height", heightBytes) // Deal with any equivocation evidence for _, evidence := range req.ByzantineValidators { pk, err := tmtypes.PB2TM.PubKey(evidence.Validator.PubKey) if err != nil { panic(err) } switch string(evidence.Type) { case tmtypes.ABCIEvidenceTypeDuplicateVote: //處理驗證器在同一高度簽名兩個塊 sk.handleDoubleSign(ctx, evidence.Height, evidence.Time, pk) default: ctx.Logger().With("module", "x/slashing").Error(fmt.Sprintf("Ignored unknown evidence type: %s", string(evidence.Type))) } } // Iterate over all the validators which *should* have signed this block for _, validator := range req.Validators { present := validator.SignedLastBlock pubkey, err := tmtypes.PB2TM.PubKey(validator.Validator.PubKey) if err != nil { panic(err) } sk.handleValidatorSignature(ctx, pubkey, present) } return } |
// Implements ABCI func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBlock) { if app.endBlocker != nil { res = app.endBlocker(app.deliverState.ctx, req) } else { res.ValidatorUpdates = app.valUpdates } return } |
// Implements ABCI func (app *BaseApp) Commit() (res abci.ResponseCommit) { header := app.deliverState.ctx.BlockHeader() /* // Write the latest Header to the store headerBytes, err := proto.Marshal(&header) if err != nil { panic(err) } app.db.SetSync(dbHeaderKey, headerBytes) */ // Write the Deliver state and commit the MultiStore app.deliverState.ms.Write() commitID := app.cms.Commit() app.Logger.Debug("Commit synced", "commit", commitID, ) // Reset the Check state to the latest committed // NOTE: safe because Tendermint holds a lock on the mempool for Commit. // Use the header from this latest block. app.setCheckState(header) // Empty the Deliver state app.deliverState = nil return abci.ResponseCommit{ Data: commitID.Hash, } } |
// Implements ABCI. // Delegates to CommitMultiStore if it implements Queryable func (app *BaseApp) Query(req abci.RequestQuery) (res abci.ResponseQuery) { path := strings.Split(req.Path, "/") // first element is empty string if len(path) > 0 && path[0] == "" { path = path[1:] } // "/app" prefix for special application queries if len(path) >= 2 && path[0] == "app" { var result sdk.Result switch path[1] { case "simulate": txBytes := req.Data tx, err := app.txDecoder(txBytes) if err != nil { result = err.Result() } else { result = app.Simulate(tx) } default: result = sdk.ErrUnknownRequest(fmt.Sprintf("Unknown query: %s", path)).Result() } value := app.cdc.MustMarshalBinary(result) return abci.ResponseQuery{ Code: uint32(sdk.ABCICodeOK), Value: value, } } // "/store" prefix for store queries if len(path) >= 1 && path[0] == "store" { queryable, ok := app.cms.(sdk.Queryable) if !ok { msg := "multistore doesn't support queries" return sdk.ErrUnknownRequest(msg).QueryResult() } req.Path = "/" + strings.Join(path[1:], "/") return queryable.Query(req) } // "/p2p" prefix for p2p queries if len(path) >= 4 && path[0] == "p2p" { if path[1] == "filter" { if path[2] == "addr" { return app.FilterPeerByAddrPort(path[3]) } if path[2] == "pubkey" { return app.FilterPeerByPubKey(path[3]) } } } msg := "unknown query path" return sdk.ErrUnknownRequest(msg).QueryResult() } |
// Implements ABCI func (app *BaseApp) CheckTx(txBytes []byte) (res abci.ResponseCheckTx) { // Decode the Tx. var result sdk.Result var tx, err = app.txDecoder(txBytes) if err != nil { result = err.Result() } else { result = app.runTx(runTxModeCheck, txBytes, tx) } return abci.ResponseCheckTx{ Code: uint32(result.Code), Data: result.Data, Log: result.Log, GasWanted: result.GasWanted, GasUsed: result.GasUsed, Fee: cmn.KI64Pair{ []byte(result.FeeDenom), result.FeeAmount, }, Tags: result.Tags, } } |
// IBC transfer command func IBCTransferCmd(cdc *wire.Codec) *cobra.Command { cmd := &cobra.Command{ Use: "transfer", RunE: func(cmd *cobra.Command, args []string) error { ctx := context.NewCoreContextFromViper().WithDecoder(authcmd.GetAccountDecoder(cdc)) // get the from address from, err := ctx.GetFromAddress() if err != nil { return err } // build the message msg, err := buildMsg(from) if err != nil { return err } // get password res, err := ctx.EnsureSignBuildBroadcast(ctx.FromAddressName, msg, cdc) if err != nil { return err } fmt.Printf("Committed at block %d. Hash: %s\n", res.Height, res.Hash.String()) return nil }, } cmd.Flags().String(flagTo, "", "Address to send coins") cmd.Flags().String(flagAmount, "", "Amount of coins to send") cmd.Flags().String(flagChain, "", "Destination chain to send coins") return cmd } |
// flags--代表從一個空間轉向另外一個窠 const ( FlagFromChainID = "from-chain-id" FlagFromChainNode = "from-chain-node" FlagToChainID = "to-chain-id" FlagToChainNode = "to-chain-node" ) type relayCommander struct { cdc *wire.Codec address sdk.Address decoder auth.AccountDecoder mainStore string ibcStore string accStore string logger log.Logger } // IBC relay command func IBCRelayCmd(cdc *wire.Codec) *cobra.Command { cmdr := relayCommander{ cdc: cdc, decoder: authcmd.GetAccountDecoder(cdc), ibcStore: "ibc", mainStore: "main", accStore: "acc", logger: log.NewTMLogger(log.NewSyncWriter(os.Stdout)), } cmd := &cobra.Command{ Use: "relay", Run: cmdr.runIBCRelay, } cmd.Flags().String(FlagFromChainID, "", "Chain ID for ibc node to check outgoing packets") cmd.Flags().String(FlagFromChainNode, "tcp://localhost:46657", "<host>:<port> to tendermint rpc interface for this chain") cmd.Flags().String(FlagToChainID, "", "Chain ID for ibc node to broadcast incoming packets") cmd.Flags().String(FlagToChainNode, "tcp://localhost:36657", "<host>:<port> to tendermint rpc interface for this chain") cmd.MarkFlagRequired(FlagFromChainID) cmd.MarkFlagRequired(FlagFromChainNode) cmd.MarkFlagRequired(FlagToChainID) cmd.MarkFlagRequired(FlagToChainNode) viper.BindPFlag(FlagFromChainID, cmd.Flags().Lookup(FlagFromChainID)) viper.BindPFlag(FlagFromChainNode, cmd.Flags().Lookup(FlagFromChainNode)) viper.BindPFlag(FlagToChainID, cmd.Flags().Lookup(FlagToChainID)) viper.BindPFlag(FlagToChainNode, cmd.Flags().Lookup(FlagToChainNode)) return cmd } //啟動遍歷監聽 func (c relayCommander) runIBCRelay(cmd *cobra.Command, args []string) { fromChainID := viper.GetString(FlagFromChainID) fromChainNode := viper.GetString(FlagFromChainNode) toChainID := viper.GetString(FlagToChainID) toChainNode := viper.GetString(FlagToChainNode) address, err := context.NewCoreContextFromViper().GetFromAddress() if err != nil { panic(err) } c.address = address c.loop(fromChainID, fromChainNode, toChainID, toChainNode) } func (c relayCommander) loop(fromChainID, fromChainNode, toChainID, toChainNode string) { ctx := context.NewCoreContextFromViper() // get password passphrase, err := ctx.GetPassphraseFromStdin(ctx.FromAddressName) if err != nil { panic(err) } ingressKey := ibc.IngressSequenceKey(fromChainID) OUTER: for { time.Sleep(5 * time.Second) processedbz, err := query(toChainNode, ingressKey, c.ibcStore) if err != nil { panic(err) } var processed int64 if processedbz == nil { processed = 0 } else if err = c.cdc.UnmarshalBinary(processedbz, &processed); err != nil { panic(err) } lengthKey := ibc.EgressLengthKey(toChainID) egressLengthbz, err := query(fromChainNode, lengthKey, c.ibcStore) if err != nil { c.logger.Error("Error querying outgoing packet list length", "err", err) continue OUTER //TODO replace with continue (I think it should just to the correct place where OUTER is now) } var egressLength int64 if egressLengthbz == nil { egressLength = 0 } else if err = c.cdc.UnmarshalBinary(egressLengthbz, &egressLength); err != nil { panic(err) } if egressLength > processed { c.logger.Info("Detected IBC packet", "number", egressLength-1) } seq := c.getSequence(toChainNode) for i := processed; i < egressLength; i++ { egressbz, err := query(fromChainNode, ibc.EgressKey(toChainID, i), c.ibcStore) if err != nil { c.logger.Error("Error querying egress packet", "err", err) continue OUTER // TODO replace to break, will break first loop then send back to the beginning (aka OUTER) } err = c.broadcastTx(seq, toChainNode, c.refine(egressbz, i, passphrase)) seq++ if err != nil { c.logger.Error("Error broadcasting ingress packet", "err", err) continue OUTER // TODO replace to break, will break first loop then send back to the beginning (aka OUTER) } c.logger.Info("Relayed IBC packet", "number", i) } } } func (c relayCommander) broadcastTx(seq int64, node string, tx []byte) error { _, err := context.NewCoreContextFromViper().WithNodeURI(node).WithSequence(seq + 1).BroadcastTx(tx) return err } //處理接收的訊息 func (c relayCommander) refine(bz []byte, sequence int64, passphrase string) []byte { var packet ibc.IBCPacket if err := c.cdc.UnmarshalBinary(bz, &packet); err != nil { panic(err) } msg := ibc.IBCReceiveMsg{ IBCPacket: packet, Relayer: c.address, Sequence: sequence, } ctx := context.NewCoreContextFromViper().WithSequence(sequence) res, err := ctx.SignAndBuild(ctx.FromAddressName, passphrase, msg, c.cdc) if err != nil { panic(err) } return res } |
// RegisterRoutes - Central function to define routes that get registered by the main application func RegisterRoutes(ctx context.CoreContext, r *mux.Router, cdc *wire.Codec, kb keys.Keybase) { r.HandleFunc("/ibc/{destchain}/{address}/send", TransferRequestHandlerFn(cdc, kb, ctx)).Methods("POST") } |