p2p网络编程-3-案例实践:pubsub(代码片段)

p2p网络编程-3-案例实践:pubsub(代码片段)

libp2p网络通信中还有一种方式就是PubSub模式,也称订阅发布的模式,官方给出了订阅发布模式的一个案例=> 聊天室

在此学习记录一下

官方代码地址:https://github.com/libp2p/go-libp2p/tree/master/examples/pubsub

一、效果演示

二、代码理解

2.1 总体框架

总的来说代码构成由这五个步骤:

1~2两步较为简单不再赘述, 下面几点分点描述

2.2 创建mDNS节点发现服务

注意,使用mDNS作为节点发现需要保证所有的节点在同一个局域网即节点发现的范围在同一个局域网下

// discoveryNotifee gets notified when we find a new peer via mDNS discovery
// 节点发现的通告结构体,继承Notifee
type discoveryNotifee struct 
	h host.Host


// HandlePeerFound connects to peers discovered via mDNS. Once they're connected,
// the PubSub system will automatically start interacting with them if they also
// support PubSub.
// 继承函数,节点发现后的处理函数:自动链接节点
func (n *discoveryNotifee) HandlePeerFound(pi peer.AddrInfo) 
	fmt.Printf("discovered new peer %s\\n", pi.ID.Pretty())
	err := n.h.Connect(context.Background(), pi)
	if err != nil 
		fmt.Printf("error connecting to peer %s: %s\\n", pi.ID.Pretty(), err)
	


// setupDiscovery creates an mDNS discovery service and attaches it to the libp2p Host.
// This lets us automatically discover peers on the same LAN and connect to them.
func setupDiscovery(ctx context.Context, h host.Host) error 
	// setup mDNS discovery to find local peers
	disc := mdns.NewMdnsService(h, DiscoveryServiceTag)

	n := discoveryNotifeeh: h
	disc.RegisterNotifee(&n)
	return nil

2.3 加入聊天房

每个节点通过订阅房间的统一topic实现PubSub

1.聊天房数据结构

与一个topic一一对应,可以通过ChatRoom.Publish在topic中发布消息,并且接收所有的消息到Messages的channel中。

type ChatRoom struct 
	// Messages is a channel of messages received from other peers in the chat room
	Messages chan *ChatMessage

	ctx   context.Context
	ps    *pubsub.PubSub
	topic *pubsub.Topic
	sub   *pubsub.Subscription

	roomName string
	self     peer.ID
	nick     string


// ChatMessage gets converted to/from JSON and sent in the body of pubsub messages.
type ChatMessage struct 
	Message    string
	SenderID   string
	SenderNick string


func (cr *ChatRoom) Publish(message string) error 
	m := ChatMessage
		Message:    message,
		SenderID:   cr.self.Pretty(),
		SenderNick: cr.nick,
	
	msgBytes, err := json.Marshal(m)
	if err != nil 
		return err
	
	return cr.topic.Publish(cr.ctx, msgBytes)

2.加入聊天房逻辑

分为三步,成功后返回一个新的ChatRoom实例

func JoinChatRoom(ctx context.Context, ps *pubsub.PubSub, selfID peer.ID, nickname string, roomName string) (*ChatRoom, error) 
	// join the pubsub topic
	topic, err := ps.Join(topicName(roomName))
	if err != nil 
		return nil, err
	

	// and subscribe to it
	sub, err := topic.Subscribe()
	if err != nil 
		return nil, err
	

	cr := &ChatRoom
		ctx:      ctx,
		ps:       ps,
		topic:    topic,
		sub:      sub,
		self:     selfID,
		nick:     nickname,
		roomName: roomName,
		Messages: make(chan *ChatMessage, ChatRoomBufSize),
	

	// start reading messages from the subscription in a loop
	go cr.readLoop()
	return cr, nil

发布和订阅较为直观,下面是循环读取:

3.循环读取消息内容

循环读取内容,并将被容加入到消息channel中

// readLoop pulls messages from the pubsub topic and pushes them onto the Messages channel.
func (cr *ChatRoom) readLoop() 
	for 
		// Next returns the next message in our subscription
		// 找到下一个消息
		msg, err := cr.sub.Next(cr.ctx)
		if err != nil 
			close(cr.Messages)
			return
		
		// only forward messages delivered by others
		// 只接收别人的消息
		if msg.ReceivedFrom == cr.self 
			continue
		
		// 反序列化
		cm := new(ChatMessage)
		err = json.Unmarshal(msg.Data, cm)
		if err != nil 
			continue
		
		// send valid messages onto the Messages channel
		// 把消息加入 Messages channel
		cr.Messages <- cm
	

4.获取当前topic所有连接者

func (cr *ChatRoom) ListPeers() []peer.ID 
  // ListPeers returns a list of peers we are connected to in the given topic.
	return cr.ps.ListPeers(topicName(cr.roomName))

对于UI部分不是重点,会使用即可

总体来说案例使用较为简单,可以快速上手!

觉得不错的话,请点赞关注呦~~你的关注就是博主的动力
关注公众号,查看更多go开发、密码学和区块链科研内容:

相关内容

react使用pubsub事件订阅,组件间通信

1、PubSub使用方式

1.1 react导入库

npm install pubsub-js --save

1.2 react 页面引入pubsubjs

import PubSub from ‘pubsub-js‘

1.3 pubsubjs使用

发送消息:PubSub.publish(名称,参数)
订阅消息:PubSub.subscrib(名称,函数)
取消订阅:PubSub.unsubscrib(名称)
 
PS:pubsubjs源码及使用详情https://github.com/mroderick/PubSubJS
 

2、React实例使用监听实现传参

 

2.1 子页面home.js使用PubSub.publish发送state

[javascript] view plain copy
  1. import React,  Component  from ‘react‘;  
  2. import PubSub from ‘pubsub-js‘;  
  3. class Home extends Component   
  4.   constructor(props)  
  5.     super(props);  
  6.     this.state=  
  7.       increase:‘increase‘,  
  8.       decrease:‘decrease‘  
  9.       
  10.     
  11.   buttonIncrease()  
  12.     PubSub.publish(‘PubSubmessag‘,this.state.increase);  
  13.     
  14.   buttonDecrease()  
  15.      PubSub.publish(‘PubSubmessage‘, this.state.decrease);  
  16.     
  17.   render()   
  18.     return (  
  19.       <div>  
  20.         Some state changes:  
  21.         <button onClick=this.buttonIncrease.bind(this)>Increase</button>  
  22.         <button onClick=this.buttonDecrease.bind(this)>Decrease</button>  
  23.       </div>  
  24.     )  
  25.     
  26.   
  27. export default Home;  

2.2 父页面App.js接收使用PubSub.subscribe订阅指定消息,PubSub.unsubscribe取消订阅消息

[javascript] view plain copy
  1. import React,  Component  from ‘react‘;  
  2. import  Link from ‘react-router-dom‘;  
  3. import PubSub from ‘pubsub-js‘;  
  4.   
  5. export default class App extends Component  
  6. constructor(props)  
  7.   super(props);  
  8.   this.state=  
  9.     increase:‘none‘,  
  10.     
  11.   
  12. componentDidMount()  
  13.   this.pubsub_token = PubSub.subscribe(‘PubSubmessage‘, function (topic,message)   
  14.     this.setState(  
  15.       increase: message  
  16.     );  
  17.   .bind(this));  
  18.   
  19. componentWillUnmount()  
  20.   PubSub.unsubscribe(this.pubsub_token);  
  21.   
  22.   render()   
  23.   return (  
  24.     <div>  
  25.       <header>  
  26.         Links:       
  27.         <Link to="/App/home">Home</Link>     
  28.       </header>   
  29.       <div style= marginTop: ‘1.5em‘ > this.props.children</div>  
  30.       <div style= marginTop: ‘1.5em‘ > this.state.increase</div>  
  31.     </div>  
  32.   )  
  33.   
  34.   

当在子页面单击increase、decrease按钮,会发送不同的消息给父页面,父页面收到消息,利用this.state.increase进行呈现,此时你会发现它是实时变化的,因为它会实时监听子组件发送的消息。

 

PS:React-Router4.0建立子父组件关系

子父组件关系建立是依靠React-Router4.0来建立的,附上子父组件关系的源码,若对RR4.0不太了解,可参考http://blog.csdn.net/zfan520/article/details/78563034
[javascript] view plain copy
  1. import React,  Component  from ‘react‘;  
  2. import BrowserRouter  from ‘react-router-dom‘;  
  3. import  Router, Route,  from ‘react-router‘  
  4.   
  5. import  App from ‘../components/App.js‘  
  6. import  Home from ‘../components/Home.js‘  
  7.   
  8. export default class RouterIndex extends Component   
  9.   render()   
  10.     return (   
  11.           <BrowserRouter>  
  12.             <App path="/App" component=App>  
  13.               <Route path="/App/home" component=Home />  
  14.             </App>  
  15.           </BrowserRouter>  
  16.     )  
  17.     
  18.  
赞(1)

文章来源于网络,原文链接请点击 这里
文章版权归作者所有,如作者不同意请直接联系小编删除。
作者:文杰@