启明办公

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 66|回复: 0

ZMQ发布订阅模式

[复制链接]

2

主题

3

帖子

7

积分

新手上路

Rank: 1

积分
7
发表于 2022-12-12 04:21:23 | 显示全部楼层 |阅读模式
ZeroMQ的发布订阅模式是一种单向的数据发布,服务端发布,客户端接收


发布者使用PUB套接字将消息发送到队列中,订阅者使用SUB套接字从队列中源源不断的接收消息。新的订阅者可以随时加入,但之前的消息是无法接收到的;已有的订阅者可以随时退出;订阅者还可以添加“过滤器”用来有选择性的接收消息。
使用方法简介

首先要创建一个上下文环境,然后使用它创建套接字:
void *context = zmq_ctx_new ();对于服务端来说,使用”ZMQ_PUB”创建socket,并且绑定到一个周知的端口,然后便可以不断的广播消息了:
void *publisher = zmq_socket (context, ZMQ_PUB);
int rc = zmq_bind (publisher, "tcp://*:5556");如果使用TCP连接并且订阅者是慢速的,那么消息将在发布方排队;可以使用高水位标记(High-Water Marks,HWM)来定义缓冲区的大小,在ZeroMQ v2.x版本中HWM默认是无限制的,而在v3.x中默认情况下它是1000。对于PUB套接字,当到达HWM时,将丢弃数据。设置HWM参数:
zmq_setsockopt(publisher, ZMQ_SNDHWM, &nMaxNum, sizeof(nMaxNum));对于客户端来说,要使用”ZMQ_SUB”创建socket,并且链接(zmq_connect)到待订阅的服务端;此外,要想接收到服务推送的消息,还必须使用zmq_setsockopt和ZMQ_SUBSCRIBE来配置该订阅。zmq_setsockopt的ZMQ_SUBSCRIBE选项可以带有一个”过滤器“,用以选择性的接收来自服务端的消息。该”过滤器”为空,则接收全部的消息;”过滤器”还可以有多个,它们之间是”or”的关系,即接收满足任一条件的消息。当然也可以使用zmq_setsockopt配置选项ZMQ_UNSUBSCRIBE来取消订阅,示例如下:
void *context = zmq_ctx_new ();
void *subscriber = zmq_socket (context, ZMQ_SUB);
int rc = zmq_connect (subscriber, "tcp://localhost:5556");

char filter1[] = "10001 ";
char filter2[] = "20002 ";
rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,filter1, strlen (filter1)); //接收消息的前缀为filter1的消息
rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,filter2, strlen (filter2)); //接收消息的前缀为filter2的消息接收和发送消息:此处使用的方法是zmq_recv()和zmq_send(),相对于zmq_msg_send()和zmq_msg_recv(),它们会自己调用消息发送和接收的初始化方法等。
int zmq_recv (void *s, void *buf, size_t len, int flags);
int zmq_send (void *s, const void *buf, size_t len, int flags);示例代码

服务端:
#include <iostream>
#include <string>
#include <sstream>
#include <cstring>
#include <iomanip>
#include <cstdlib>
#include <ctime>
#include <assert.h>
#include <zmq.h>

using namespace std;

int main ()
{
    void *context = zmq_ctx_new ();
    void *publisher = zmq_socket (context, ZMQ_PUB);
    int rc = zmq_bind (publisher, "tcp://*:5556");
    assert (rc == 0);

    //  Initialize random number generator
    srand(time(0));
    while (1) {
        int zipcode, temperature, relhumidity;
        zipcode     = rand() % 100000;
        temperature = rand() % 215 - 80;
        relhumidity = rand() % 50 + 10;

        ostringstream os;
        os << setw(5) << setfill('0')<< zipcode <<" "
           << temperature <<" "<< relhumidity << "\n";

        zmq_send(publisher, os.str().c_str(), strlen(os.str().c_str()), 0);
    }
    zmq_close (publisher);
    zmq_ctx_destroy (context);
    return 0;
}客户端:
#include <iostream>
#include <string>
#include <cstring>
#include <assert.h>
#include <zmq.h>

using namespace std;

int main (int argc, char *argv [])
{
    //  Socket to talk to server
    printf ("Collecting updates from weather server...\n");
    void *context = zmq_ctx_new ();
    void *subscriber = zmq_socket (context, ZMQ_SUB);
    int rc = zmq_connect (subscriber, "tcp://localhost:5556");
    assert (rc == 0);

    char filter1[] = "10001 ";
    char filter2[] = "20002 ";
    rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,filter1, strlen (filter1));
    assert (rc == 0);
    rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,filter2, strlen (filter2));
    assert (rc == 0);

    //  Process 100 updates
    int size;
    char buffer [256];
    for (int update_nbr = 0; update_nbr < 100; update_nbr++) {

        memset(buffer, 0, 256*sizeof(char));
        size = zmq_recv (subscriber, buffer, 255, 0);
        if (size == -1){
            cout<< "receiver error , skip this message"<<endl;
            continue;
        }
        buffer[size] = '\0';
        cout << buffer <<endl;
    }

    zmq_close (subscriber);
    zmq_ctx_destroy (context);
    return 0;
}
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

Archiver|手机版|小黑屋|启明办公

Copyright © 2001-2013 Comsenz Inc.Template by Comsenz Inc.All Rights Reserved.

Powered by Discuz!X3.4

快速回复 返回顶部 返回列表