首先看看Khipu官方github的介紹(來自https://github.com/khipu-io )
就是說Khipu是一個基於Scala/Akka實現的Ethereum協議,目前正在開發中,已經上線的alpha版本的主要feature包括:
1. 儘可能並行的執行block內部的交易。目前80%的block內部的交易都可以並行執行。
2. 為blockchain專門設計實現了一個儲存引擎(Kesque),基於Kafka的log引擎開發,對於99%以上的隨機讀只需要1次disk io。
再來看看專案進展
目前Khipu實現了3個大的模組:
* 節點發現(訪問)
* 快速同步(同步到近期的狀態資料快照和所有的區塊)
* 常規同步(同步區塊並執行區塊內包含的交易)
後續待開發的feature就不細說了。簡單來說就是目前這個0.1.0-alpha版本實現了一個不完全節點(不能說輕節點因為不是光同步區塊頭),或者說資料備份節點的功能。它可以同步區塊,執行並驗證區塊交易的合法性。但不能作為一個完整的節點存在因為還不能接受,或者產生交易,也不能出塊因為挖礦共識這塊還沒有實現。雖然協議實現還不完全,但專案設計和實現上都有不少值得借鑑的地方,也能看出開發者做了一番深入的思考。
Khipu設計思路
和github上的介紹一致,Khipu的設計思路是比較清晰的,如圖所示:
整個service由兩大塊組成:Akka部分主要負責處理區塊(內部的交易),Kafka部分完成一個database的功能,用於儲存區塊和各種狀態資料。首先我們看看Khipu如何處理區塊。
區塊處理
由於Khipu實現基於Akka,先簡單介紹一下akka和actor模型。
什麼是akka
Akka是一個用scala編寫的庫,用於簡化編寫可容錯的,可擴充套件的,高併發應用。akka使用actor模型來提升抽象能力,提供更好的平臺來構建可擴充套件的,彈性的應用。對於比較難處理的錯誤,akka採用“let it crash”模型來處理,這種模式可以使得一個任務的處理失敗不會導致整個應用的crash,使你的系統擁有強大的自愈能力,也不需要重啟來恢復系統。同時akka的分散式部署更加簡單透明。
actor模型
actor模型並非什麼新鮮事務,它早在20世紀70年代就被提出了,主要目的是為了解決分散式程式設計中的一系列問題。actor模型具有以下優點:1.更簡單的、高度抽象的併發處理。2.非同步的,非阻塞的,高效能的事件驅動程式設計模型。3.非常輕量級的事件驅動處理。
基於上述兩個模組,Khipu的區塊處理邏輯不復雜,其主要的工作都是在並行執行區塊交易上,這段並行處理的邏輯就是一套map reduce,(極端)簡化的流程如下圖:(要感謝Scala的map reduce特性,換Java來估計程式碼量翻好多倍)
這裡Map task容易理解,分開區塊內的交易到多個並行的世界裡面執行各自的交易並改變各自的狀態,主要的問題在於Reduce階段,如何合併這一系列的世界狀態。在合併這一塊,Khipu採用了衝突檢測的辦法,定義了兩種不同的衝突模式(並行的狀態衝突和世界狀態衝突)。
object ProgramState { trait ParallelRace case object OnAccount extends ParallelRace case object OnError extends ParallelRace } object BlockWorldState { sealed trait RaceCondition case object OnAddress extends RaceCondition case object OnAccount extends RaceCondition case object OnStorage extends RaceCondition case object OnCode extends RaceCondition } |
lazy val kesque = new Kesque(kafkaProps) log.info(s"Kesque started using config file: $kafkaConfigFile") private val futureTables = Future.sequence(List( Future(kesque.getTable(Array(KesqueDataSource.account))), Future(kesque.getTable(Array(KesqueDataSource.storage))), Future(kesque.getTable(Array(KesqueDataSource.evmcode))), Future(kesque.getTimedTable(Array( KesqueDataSource.header, KesqueDataSource.body, KesqueDataSource.receipts ), 1024000)) )) private val List(accountTable, storageTable, evmcodeTable, blockTable) = Await.result(futureTables, Duration.Inf) //private val headerTable = kesque.getTimedTable(Array(KesqueDataSource.header), 1024000) //private val bodyTable = kesque.getTable(Array(KesqueDataSource.body), 1024000) //private val receiptTable = kesque.getTable(Array(KesqueDataSource.receipts), 1024000) lazy val accountNodeDataSource = new KesqueDataSource(accountTable, KesqueDataSource.account) lazy val storageNodeDataSource = new KesqueDataSource(storageTable, KesqueDataSource.storage) lazy val evmCodeDataSource = new KesqueDataSource(evmcodeTable, KesqueDataSource.evmcode) lazy val blockHeadersDataSource = new KesqueDataSource(blockTable, KesqueDataSource.header) lazy val blockBodiesDataSource = new KesqueDataSource(blockTable, KesqueDataSource.body) lazy val receiptsDataSource = new KesqueDataSource(blockTable, KesqueDataSource.receipts) |
def read(key: Array[Byte], topic: String): Option[TVal] = { try { readLock.lock val valueIndex = topicIndex(topic) caches(valueIndex).get(Hash(key)) match { case None => val hash = Hash(key) hashOffsets.get(hash.hashCode, valueIndex) match { case IntIntsMap.NO_VALUE => None case offsets => var foundValue: Option[TVal] = None var foundOffset = Int.MinValue var i = offsets.length - 1 // loop backward to find newest one while (i >= 0 && foundValue.isEmpty) { val offset = offsets(i) val (topicPartition, result) = db.read(topic, offset, fetchMaxBytes).head val recs = result.info.records.records.iterator while (recs.hasNext) { // NOTE: the records are offset resversed !! val rec = recs.next if (rec.offset == offset && java.util.Arrays.equals(db.getBytes(rec.key), key)) { foundOffset = offset foundValue = if (rec.hasValue) Some(TVal(db.getBytes(rec.value), rec.timestamp)) else None } } i -= 1 } foundValue foreach { x => caches(valueIndex).put(hash, (x, foundOffset)) } foundValue } case Some((value, offset)) => Some(value) } } finally { readLock.unlock() } } |
更多區塊鏈資訊:www.qukuaiwang.com.cn/news