p2p网络编程-3-案例实践:pubsub(代码片段)
libp2p网络通信中还有一种方式就是PubSub模式,也称订阅发布的模式,官方给出了订阅发布模式的一个案例=> 聊天室
在此学习记录一下
官方代码地址:https://github.com/libp2p/go-libp2p/tree/master/examples/pubsub
一、效果演示
二、代码理解
2.1 总体框架
总的来说代码构成由这五个步骤:

1~2两步较为简单不再赘述, 下面几点分点描述
2.2 创建mDNS节点发现服务
注意,使用mDNS作为节点发现需要保证所有的节点在同一个局域网即节点发现的范围在同一个局域网下
// discoveryNotifee gets notified when we find a new peer via mDNS discovery
// 节点发现的通告结构体,继承Notifee
type discoveryNotifee struct
h host.Host
// HandlePeerFound connects to peers discovered via mDNS. Once they're connected,
// the PubSub system will automatically start interacting with them if they also
// support PubSub.
// 继承函数,节点发现后的处理函数:自动链接节点
func (n *discoveryNotifee) HandlePeerFound(pi peer.AddrInfo)
fmt.Printf("discovered new peer %s\\n", pi.ID.Pretty())
err := n.h.Connect(context.Background(), pi)
if err != nil
fmt.Printf("error connecting to peer %s: %s\\n", pi.ID.Pretty(), err)
// setupDiscovery creates an mDNS discovery service and attaches it to the libp2p Host.
// This lets us automatically discover peers on the same LAN and connect to them.
func setupDiscovery(ctx context.Context, h host.Host) error
// setup mDNS discovery to find local peers
disc := mdns.NewMdnsService(h, DiscoveryServiceTag)
n := discoveryNotifeeh: h
disc.RegisterNotifee(&n)
return nil
2.3 加入聊天房
每个节点通过订阅房间的统一topic实现PubSub
1.聊天房数据结构
与一个topic一一对应,可以通过ChatRoom.Publish在topic中发布消息,并且接收所有的消息到Messages的channel中。
type ChatRoom struct
// Messages is a channel of messages received from other peers in the chat room
Messages chan *ChatMessage
ctx context.Context
ps *pubsub.PubSub
topic *pubsub.Topic
sub *pubsub.Subscription
roomName string
self peer.ID
nick string
// ChatMessage gets converted to/from JSON and sent in the body of pubsub messages.
type ChatMessage struct
Message string
SenderID string
SenderNick string
func (cr *ChatRoom) Publish(message string) error
m := ChatMessage
Message: message,
SenderID: cr.self.Pretty(),
SenderNick: cr.nick,
msgBytes, err := json.Marshal(m)
if err != nil
return err
return cr.topic.Publish(cr.ctx, msgBytes)
2.加入聊天房逻辑

分为三步,成功后返回一个新的ChatRoom实例
func JoinChatRoom(ctx context.Context, ps *pubsub.PubSub, selfID peer.ID, nickname string, roomName string) (*ChatRoom, error)
// join the pubsub topic
topic, err := ps.Join(topicName(roomName))
if err != nil
return nil, err
// and subscribe to it
sub, err := topic.Subscribe()
if err != nil
return nil, err
cr := &ChatRoom
ctx: ctx,
ps: ps,
topic: topic,
sub: sub,
self: selfID,
nick: nickname,
roomName: roomName,
Messages: make(chan *ChatMessage, ChatRoomBufSize),
// start reading messages from the subscription in a loop
go cr.readLoop()
return cr, nil
发布和订阅较为直观,下面是循环读取:
3.循环读取消息内容
循环读取内容,并将被容加入到消息channel中
// readLoop pulls messages from the pubsub topic and pushes them onto the Messages channel.
func (cr *ChatRoom) readLoop()
for
// Next returns the next message in our subscription
// 找到下一个消息
msg, err := cr.sub.Next(cr.ctx)
if err != nil
close(cr.Messages)
return
// only forward messages delivered by others
// 只接收别人的消息
if msg.ReceivedFrom == cr.self
continue
// 反序列化
cm := new(ChatMessage)
err = json.Unmarshal(msg.Data, cm)
if err != nil
continue
// send valid messages onto the Messages channel
// 把消息加入 Messages channel
cr.Messages <- cm
4.获取当前topic所有连接者
func (cr *ChatRoom) ListPeers() []peer.ID
// ListPeers returns a list of peers we are connected to in the given topic.
return cr.ps.ListPeers(topicName(cr.roomName))
对于UI部分不是重点,会使用即可
总体来说案例使用较为简单,可以快速上手!
觉得不错的话,请点赞关注呦~~你的关注就是博主的动力
关注公众号,查看更多go开发、密码学和区块链科研内容:
相关内容
react使用pubsub事件订阅,组件间通信
1、PubSub使用方式
1.1 react导入库
npm install pubsub-js --save
1.2 react 页面引入pubsubjs
1.3 pubsubjs使用
2、React实例使用监听实现传参
2.1 子页面home.js使用PubSub.publish发送state
- import React, Component from ‘react‘;
- import PubSub from ‘pubsub-js‘;
- class Home extends Component
- constructor(props)
- super(props);
- this.state=
- increase:‘increase‘,
- decrease:‘decrease‘
- buttonIncrease()
- PubSub.publish(‘PubSubmessag‘,this.state.increase);
- buttonDecrease()
- PubSub.publish(‘PubSubmessage‘, this.state.decrease);
- render()
- return (
- <div>
- Some state changes:
- <button onClick=this.buttonIncrease.bind(this)>Increase</button>
- <button onClick=this.buttonDecrease.bind(this)>Decrease</button>
- </div>
- )
- export default Home;
2.2 父页面App.js接收使用PubSub.subscribe订阅指定消息,PubSub.unsubscribe取消订阅消息
- import React, Component from ‘react‘;
- import Link from ‘react-router-dom‘;
- import PubSub from ‘pubsub-js‘;
- export default class App extends Component
- constructor(props)
- super(props);
- this.state=
- increase:‘none‘,
- componentDidMount()
- this.pubsub_token = PubSub.subscribe(‘PubSubmessage‘, function (topic,message)
- this.setState(
- increase: message
- );
- .bind(this));
- componentWillUnmount()
- PubSub.unsubscribe(this.pubsub_token);
- render()
- return (
- <div>
- <header>
- Links:
- <Link to="/App/home">Home</Link>
- </header>
- <div style= marginTop: ‘1.5em‘ > this.props.children</div>
- <div style= marginTop: ‘1.5em‘ > this.state.increase</div>
- </div>
- )
当在子页面单击increase、decrease按钮,会发送不同的消息给父页面,父页面收到消息,利用this.state.increase进行呈现,此时你会发现它是实时变化的,因为它会实时监听子组件发送的消息。
PS:React-Router4.0建立子父组件关系
- import React, Component from ‘react‘;
- import BrowserRouter from ‘react-router-dom‘;
- import Router, Route, from ‘react-router‘
- import App from ‘../components/App.js‘
- import Home from ‘../components/Home.js‘
- export default class RouterIndex extends Component
- render()
- return (
- <BrowserRouter>
- <App path="/App" component=App>
- <Route path="/App/home" component=Home />
- </App>
- </BrowserRouter>
- )