如果您正在開發自定義區塊鏈,您可能希望允許用戶訪問自定義對象。在某些情況下,您可能還希望在某些感興趣的對象發生變化時通知用戶,這樣他們就不必經常輪詢它。
這就是為什麼我們將向您展示如何通過利用Substrate 提供的存儲和存儲更改通知流為您定義的對象創建自定義RPC 訂閱👆🏻
首先,確保您已經設置了RPC 模塊,因為訂閱方法將被定義為其中的一部分。查找有關實現自定義RPC 的更多信息 這裡.
出於本演示的目的,我們將使用以下內容 自定義對象 結構體,與相關聯 對象狀態 枚舉:
pub struct CustomObj {
pub name: Vec<u8>,
pub status: ObjStatus,
pub count: u32,
}pub enum ObjStatus {
Green,
Yellow,
Red,
Blue,
}
為簡單起見,我們假設我們的存儲中只有這些對象之一,並且該值存儲在 自定義對象值 存儲值。公開的允許用戶訂閱和取消訂閱此對象狀態的RPC 方法是“rpc_customSubscribe“, 和”rpc_custom取消訂閱“ 分別。
訂閱由一對訂閱-取消訂閱方法組成,在收到訂閱或取消訂閱請求時調用。這些方法使用 發布訂閱 宏來指示它們屬於哪個訂閱,以及它們關聯的RPC 方法。
讓我們繼續定義當“rpc_customSubscribe” 由用戶調用:
#[pubsub(
subscription = "custom",
subscribe,
name = "rpc_customSubscribe",
)]
fn custom_subscribe(&self, _: Self::Metadata, _: Subscriber<S>);
如果我們希望我們的subscribe 方法從用戶那裡接收參數,我們可以相應地更改簽名:
fn custom_subscribe(&self, _: Self::Metadata, subscriber: Subscriber<S>, param: T);
使用 發布訂閱,我們指定 custom_subscribe 是“自定義”訂閱的一部分,它是訂閱方法,用戶可以通過“rpc_customSubscribe” RPC 方法。
你會記得我們之前提到的訂閱是由訂閱🙋🏻和取消訂閱🙅🏻兩種方法組成的。因此,在我們添加 退訂 我們的“自定義”訂閱方法:
#[pubsub(
subscription = "custom",
unsubscribe,
name = "rpc_customUnsubscribe",
)]
fn custom_unsubscribe(&self, _: Option<Self::Metadata>, _: SubscriptionId) -> jsonrpc_core::Result<bool>;
✅ 退訂方法必須相同 訂閱,即我們的例子中的“custom”,並且必須用註釋 退訂. 我們可以為它選擇任何我們想要的名稱,但為了一致性(以及使用我們的RPC 接口的人的理智😌),我們使用從subscribe 方法派生的名稱。
與我們相關的subscribe-unsubscribe 方法的參數是 訂閱者:訂閱者 和 參數:T. S 表示作為通知發送給訂閱者的項目的數據類型,T 表示參數的數據類型,如果我們的subscribe 方法接受的話。在我們的例子中, 自定義對象值 是CustomObj 類型的單個獨立值,但是如果我們使用StorageMap 來保存多個CustomObj 項目,那麼我們可以使用 噸 索引StorageMap。
讓我們假設 custom_subscribe() 和 custom_unsubcribe() 是一個 應用程序接口 trait,我們希望這個trait 由我們的 處理程序 類型。 RpcHandler 必須定義什麼 元數據 類型是,並且還將有一個 訂閱管理器 為我們做大部分繁重的工作。
對於元數據類型,我們將簡單地使用 sc_rpc::元數據:
struct RpcHandler<C> {
type Metadata: sc_rpc::Metadata; -- snip --
client: Arc<C>,
manager: jsonrpc_pubsub::manager::SubscriptionManager,}
訂閱管理器 負責為每個訂閱分配一個唯一標識符,向訂閱者發送通知,並處理取消訂閱過程。我們所要做的就是將我們希望我們的訂閱者以一種形式接收的數據傳遞給它 未來, 經理會負責其他一切💫
要使用存儲更改通知流,我們的 處理程序 需要一個暴露由定義的方法的客戶端 區塊鏈事件 trait — Substrate 客戶端已經這樣做了,所以我們要做的就是在我們的RPC 模塊中指定它:
impl<C> RpcApi<<Block as BlockT>::Hash> for RpcHandler<C>
where
--snip--
C: BlockchainEvents<Block>
下面是 custom_subscribe RpcHandler 實現的方法:
fn custom_subscribe(
&self,
_metadata: Self::Metadata,
subscriber: Subscriber<CustomObj>,
){
// 1. compute the key for our object
let my_module = twox_128(b”MyModule”);
let obj = twox_128(b”CustomObjValue”);
let mut key = vec![];
key.extend(my_module);
key.extend(obj);
let key: StorageKey = StorageKey(key);
let keys = Into::<Option<Vec<_>>>::into(vec![key]); // 2. obtain a stream that will receive all storage changes
related to the key
let stream = match self
.client
.storage_changes_notification_stream(
keys.as_ref().map(|x| &**x), None)
{
Ok(stream) => stream,
Err(err) => {
let _ = subscriber.reject(client_err(err).into());
return;
}
}; // 3. manipulate the stream before passing it to the
SubscriptionManager
let stream = stream
.filter_map(move |(block, changes)| match
get_custom_obj(changes) {
Ok(state) => future::ready(
Some(
Ok::<_, ()>(Ok(state))
)
),
Err(_) => future::ready(None),
})
.compat();
// 4. add this Subscriber to our SubscriptionManager
self.manager.add(subscriber, |sink| {
sink.sink_map_err(|e| warn!(“Error sending notifications:
{:?}”, e))
.send_all(stream)
.map(|_| ())
});
}
我們也重現 get_custom_obj(),這需要 存儲變更集 通過流發送並將其轉換為 自定義對象.
👉🏻我們只收到與密鑰相關的更改 自定義對象值,所以我們要做的就是將數據解碼為CustomObj。如果數據無法解碼,我們返回一個 錯誤 並且流將丟棄它,確保用戶只收到有效的CustomObj 項目。
fn get_custom_obj(changes: StorageChangeSet) -> Result<CustomObj> {
for (_, _, data) in changes.iter() {
match data {
Some(data) => {
let mut value: &[u8] = &data.0.clone();
match CustomObj::decode(&mut value) {
Ok(obj) => return Ok(obj),
Err(_) => warn!("unable to decode object"),
}
}
None => warn!("empty change set"),
};
}
Err(Error::internal_error())
}
讓我們仔細看看那裡發生了什麼🔎
- 我們計算存儲中對象的鍵; 在這種情況下,對像在 我的模塊,它的名字是 自定義對象值. 根據您的數據的存儲方式,密鑰的計算會有所不同——這裡有一個 複習 關於如何計算密鑰。
- 使用密鑰,我們請求一個流,即一個通道的接收端(更準確地說,一個 追踪無界接收者); 每當我們的狀態 自定義對象 更改,這些更改將作為 (塊::哈希,存儲變更集) 元組。
- 一旦我們有了流,我們必須以一種可以傳遞給 訂閱管理器. 此外,我們還想將 (塊::哈希,存儲變更集) 更新為我們的訂閱者希望收到的CustomObj 通知。
- 最後,我們稱 添加 的功能 訂閱管理器 並向它傳遞一個閉包,該閉包會將流上收到的所有項目發送給訂閱者。如果您想在收到訂閱請求時發送初始值,您可以將其添加到流中:
let initial: CustomObj = --snip--;
self.manager.add(subscriber, |sink| {
sink.sink_map_err(|e| warn!("Error sending notifications:
{:?}", e))
.send_all(stream::iter_result(vec!
[Ok(Ok((initial))]).chain(stream))
.map(|_| ())
});
可以通過調用拒絕訂閱 拒絕 訂閱者的方法,就像我們上面所做的那樣,以防無法獲得流。您也可以根據特定於應用程序的原因拒絕訂閱。
我們訂閱的取消訂閱方法非常簡單,我們只需要告訴 訂閱管理器 取消與關聯的訂閱 訂閱ID:
fn custom_unsubscribe(&self, _metadata: Option<Self::Metadata>, id: SubscriptionId) -> Result<bool> {
Ok(self.manager.cancel(id))}
是時候檢查我們的實現是否有效了。 RPC 訂閱是通過WebSockets 進行的,因此我們使用 貓 連接到我們本地節點的WebSocket 端口並發出訂閱請求:
$ wscat -c ws://127.0.0.1:9944
> {"id":1, "jsonrpc":"2.0", "method":"rpc_customSubscribe"}
節點會向我們發回一個類似這樣的響應👇🏻
< {"jsonrpc":"2.0","result":"j5S1whvZETzBtXFn","id":1}
這裡, ”j5S1whvZETzBtXFn”是我們訂閱的ID,我們稍後會用它來取消訂閱。 這也是價值 custom_unsubscribe() 叫做。
我們將通過Polkadot UI 使用Sudo 托盤來更改存儲並觸發對訂閱者的通知。
JSON 鍵/值文件的格式為
{"key": "value"}
我們在本演示中使用的值下方提供(我們不包括鍵,因為它取決於CustomObjValue 的存儲位置)。我們從設置開始 自定義對象值 到”0x10aabbccdd0000000000”,對應於名稱: 0xaabbccdd, 地位: 綠 和計數器: 0.
然後,我們通過將CustomObjValue 設置為以下內容來模擬計數器的遞增兩次,狀態從綠色🟢變為紅色🔴:
“0x10aabbccdd0001000000”“0x10aabbccdd0002000000”“0x10aabbccdd0202000000”
我們還將值更改為“0x11aabbccdd0211000000”; 但是,由於這是一個無效的CustomObj 值,因此在進行此更改時用戶不會收到通知。
最後,我們將值設置為“0x10aabbccdd0312000000”,然後我們通過發出以下命令取消訂閱:
> {"id":1, "jsonrpc":"2.0", "method":"rpc_customUnsubscribe", "params":["j5S1whvZETzBtXFn"]}
此圖顯示了我們與WebSocket 端點的交互:我們發出的命令為白色字體,響應和更新為藍色:
用sudo更新值的全過程和收到的通知📹👇🏻
在這篇文章中,我們✨ 實現了一個訂閱,用於在特定對象的狀態發生變化時接收通知✨
值得注意的是, 區塊鏈事件 trait 定義了兩個額外的函數, 導入通知流 和 finality_notification_stream. 如果您要提供的更新與對象狀態無關,而是與塊創建或終結有關,那麼這兩個函數可能會派上用場!