多进程编程之进程间通信-管道和消息队列

JasonLiu1919 JasonLiu1919     2022-08-06     436

关键词:

1.进程间通信

Linux作为一种新兴的操作系统,几乎支持所有的Unix下常用的进程间通信方法:管道、消息队列、共享内存、信号量、套接口等等。

2.2.1 管道

管道是进程间通信中最古老的方式,它包括无名管道和有名管道两种,前者用于父进程和子进程间的通信,后者用于运行于同一台机器上的任意两个进程间的通信。

无名管道pipe

无名管道由pipe()函数创建:

 #include <unistd.h> 
 int pipe(int filedis[2]); 

参数filedis返回两个文件描述符:filedes[0]为读而打开,filedes[1]为写而打开。filedes[1]的输出是filedes[0]的输入。下面的例子示范了如何在父进程和子进程间实现通信。
跨越fork调用的管道:
例子代码:

#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>

int main()
{
    int data_processed;
    int file_pipes[2];
    const char some_data[] = "123";
    char buffer[BUFSIZ + 1];
    pid_t fork_result;

    memset(buffer, ‘‘, sizeof(buffer));

    if (pipe(file_pipes) == 0)
    {
        fork_result = fork();
        if (fork_result == -1) {
            fprintf(stderr, "Fork failure");
            exit(EXIT_FAILURE);
        }

// We‘ve made sure the fork worked, so if fork_result equals zero, we‘re in the child process.

        if (fork_result == 0)
        {
            // sleep(1);//如果父进程在子进程之前退出,则可以在两个输出内容之间看到shell的提示符
            data_processed = read(file_pipes[0], buffer, BUFSIZ);
            printf("Read %d bytes: %s
", data_processed, buffer);
            exit(EXIT_SUCCESS);
        }

// Otherwise, we must be the parent process.

        else
        {

            data_processed = write(file_pipes[1], some_data,
                                   strlen(some_data));
            printf("Wrote %d bytes
", data_processed);//不管sleep(1)还是sleep(10)多少,都是先执行父进程???

        }
    }
    exit(EXIT_SUCCESS);
}

运行结果:
技术分享

基于管道的进程间通信:

int main()
{
    int mycount=0;
    ifstream m_InStream("subdata.txt");//读取的任务列表
    int data_processed;
    int file_pipes[3][2];
    string some_data;

    pid_t fork_result[3];
    for (int i = 0; i < 3; ++i)
    {
        if (pipe(file_pipes[i]) == 0)
        {
            fork_result[i] = fork();    
            if (fork_result[i] == 0) 
            {
                close(file_pipes[i][1]);
                char buf[1024] = {0};
                while(true)
                {
                    int readsize = read(file_pipes[i][0], buf, sizeof(buf)+1);
                    buf[readsize]=‘‘;//注意这里,可以存在字符之后有其他字符,可以尝试下,注释掉该语句的效果!!
                    // printf("%X,%X,%X
",buf[readsize],buf[readsize+1],buf[readsize+2]);
                    printf("Read %d byte,data=%s
",readsize, buf);
                    std::cout << flush;// fflush(stdout);
                    if (strncmp(buf, "exit", 4) == 0)
                    {
                        break;
                    }
                }
                exit(0);        
            }
            else if(fork_result[i] == -1)
            {
                fprintf(stderr, "Fork failure");
                exit(EXIT_FAILURE);
            }
        }
    }

    string oneline;
    int count=0;
    while(getline(m_InStream, oneline,‘
‘))
    {
        some_data = oneline;
        char bufs[128] = {0};
        strncpy(bufs,some_data.c_str(),some_data.size());
        data_processed = write(file_pipes[count][1], bufs, strlen(bufs));
        printf("Wrote %d bytes,data=%s
", data_processed,bufs);
        count++;
        if(count>=3)
        {
            count = count -3;           
        }
        sleep(1);
    }
    for(int i=0; i<3; ++i)
    {
        char bufs[128] = {0};
        snprintf(bufs, sizeof(bufs), "exit");
        write(file_pipes[i][1], bufs, strlen(bufs));
        sleep(1);
    }
    int status =0;
    int mpid =0;
    for(int i=0;i<3;i++)
    {
        mpid = wait(&status);
        printf("pid[%d] is exit with status[%d]
",mpid,status);
    }
    return 0;
}

运行结果如下:
技术分享

popen和pclose

有名管道(也叫做命名管道)

基于有名管道,我们可以实现不相关进程之间的数据交换,而不必有一个共同的祖先进程。有名管道是基于FIFO文件来完成的,有名管道是一种特殊类型的文件,它在系统文件中以文件名的形式存在,行为和上述介绍的无名管道pipe类似。
有名管道可由两种方式创建:命令行方式mknod系统调用和函数mkfifo。下面的两种途径都在当前目录下生成了一个名为myfifo的有名管道:
方式一:mkfifo(“my_fifo”,”rw”);
方式二:mknod my_fifo p ,my_fifo 是指文件名
在程序中可以通过以下两个不同的函数进行调用:

#include <sys/types.h>  
#include <sys/stat.h>  
int mkfifo(const char *pathname, mode_t mode);  
int mknod(const char *pathname, mode_t mode | S_FIFO, (dev_t)0);  

生成了有名管道后,就可以使用一般的文件I/O函数如open、close、read、write等来对它进行操作。
有名管道创建示例代码:

/*创建有名管道*/
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>

int main()
{
    int res = mkfifo("tmp/my_fifo", 0777);
    if (res == 0)
        printf("FIFO created
");
    exit(EXIT_SUCCESS);
}

与通过pipe调用创建管道不同,FIFO是以命名文件的形式存在,而不是打开的文件描述符,所以在对它进行读写操作之前必须先打开它。FIFO也用open和close函数进行打开和关闭。传递给open函数的是FIFO的路径名,而不是一个一般下的文件。
下面即是一个简单的例子,假设我们已经创建了一个名为my_fifo的有名管道。
【待补充 】

2.2.2 消息队列

消息队列就是一个消息的链表,可以把消息看作一个记录,具有特定的格式以及特定的优先级。一个有写权限的进程按照一定的规则对消息队列进行信息添加,对消息队列有读权限的进程则可以从消息队列中读走消息,从而实现进程间的通信。。
代码版本1:
涉及函数:
1)创建新消息队列或者取得已经存在的消息队列
int msgget(key_t key, int msgflg);
参数:
key:可以认为是一个端口号,是用来命名某个特定的消息队列,进行消息队列标识。
msgflg:
IPC_CREAT值,若没有该队列,则创建一个并返回新标识符;若已存在,则返回原标识符。
IPC_EXCL值,若没有该队列,则返回-1;若已存在,则返回0。
2)向队列读/写消息
原型:
int msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
参数:
msqid:消息队列的标识码
msgp:指向消息缓冲区的指针,此位置用来暂时存储发送和接收的消息,是一个用户可定义的通用结构,形态如下:

         struct msgstru 
         { long mtype; /* 消息类型,必须 > 0 */ 
           char mtext[1024]; /* 消息文本 */ 
         };

msgsz:消息的大小。
msgtyp:从消息队列内读取的消息形态。如果值为零,则表示消息队列中的所有消息都会被读取。
msgflg:用以控制当队列中没有对应类型的消息可以接收时的处理逻辑。对于msgsnd函数,msgflg控制着当前消息队列满或消息队列到达系统范围的限制时将要发生的事情。如果msgflg设置为IPC_NOWAIT,则在msgsnd()执行时若是消息队列已满,则msgsnd()将不会阻塞,不会发送信息,立即返回-1。如果执行的是msgrcv(),则在消息队列呈空时,不做等待马上返回-1,并设定错误码为ENOMSG。当msgflg为0时,msgsnd()及msgrcv()在队列呈满或呈空的情形时,采取阻塞等待的处理模式。此时对于发送进程而已发送进程将挂起以等待队列中腾出可用的空间;对于接收进程而言,该进程将会挂起以等待一条相应类型的信息到达。
3)设置消息队列属性
原型:int msgctl ( int msgqid, int cmd, struct msqid_ds *buf );
参数:msgctl 系统调用对 msgqid 标识的消息队列执行 cmd 操作,系统定义了 3 种 cmd 操作: IPC_STAT , IPC_SET , IPC_RMID
IPC_STAT : 该命令用来获取消息队列对应的 msqid_ds 数据结构,并将其保存到 buf 指定的地址空间。
IPC_SET : 该命令用来设置消息队列的属性,要设置的属性存储在buf中。
IPC_RMID : 从内核中删除 msqid 标识的消息队列。
receive代码:

/*receive.cpp */
#include<iostream>
#include <fstream>
#include <string>
#include <memory.h>
#include<stdio.h>
#include<map>
#include<vector>
#include<algorithm>
#include <sys/types.h>
#include <sys/wait.h>
#include <fcntl.h> 
#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <errno.h>

using namespace std;
#define MSGKEY 1024
#define ChildNum 5
struct msgstru
{
    long msgtype;
    char msgtext[2048];
};

/*子进程,监听消息队列*/
void childproc()
{
    struct msgstru msgs;
    int msgid,ret_value;
    char str[512];
    /* First, we set up the message queue. */
    // msgid = msgget((key_t)MSGKEY, 0666 | IPC_CREAT);//该键值则唯一对应一个消息队列
    while(1)
    {
        msgid = msgget(MSGKEY,IPC_EXCL );/*检查消息队列是否存在 */
        if(msgid < 0){
            printf("msq not existed! errno=%d [%s]
",errno,strerror(errno));
            exit(EXIT_FAILURE);
        }
        /*接收消息队列*/
        ret_value = msgrcv(msgid,&msgs,sizeof(struct msgstru),0,0);
        int len = strlen(msgs.msgtext);
        // std::cout<<len<<std::endl;
        msgs.msgtext[len] = ‘‘;
        // std::cout<<"pid="<<getpid()<<","<<msgs.msgtext<<std::endl;
        if(ret_value == -1)
        {
            fprintf(stderr, "msgrcv failed with error: %d
", errno);//消息队列中的信息被取完??
            exit(EXIT_FAILURE);//消息队列为空的时候,就跳出。也可以设计成,消息队列为空时,不跳出,而是等待。
        }
        else
        {
            printf("pid=%d,data=%s
",getpid(),msgs.msgtext);           
        }
        if (strncmp(msgs.msgtext, "end", 3) == 0)
        {
            exit(EXIT_SUCCESS);//换成break的效果呢???是不一样的啊
        }
        //因为在send的时候,只send了一个end,当该标志信息被读取之后,其他的进程自然是读取不到信息的,

    }
    return;
}

int main()
{
    int i,cpid;

    /* create 5 child process */
    for (i=0;i<ChildNum;i++){
        cpid = fork();
        if (cpid < 0)
            printf("fork failed
");
        else if (cpid ==0) /*child process*/
            childproc();
    }
    int status =0;
    int mpid =0;
    // std::cout<<"father pid="<<getpid()<<std::endl;
    for(int i=0;i<ChildNum;i++)
    {
        mpid = wait(&status);
        printf("pid[%d] is exit with status[%d]
",mpid,status);
    }
    return 1;
}

send代码:

/*send.c*/
#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <errno.h>

using namespace std;
#define MSGKEY 1024

struct msgstru
{
   long msgtype;
   char msgtext[2048]; 
};

main()
{
    struct msgstru msgs;
    int msg_type;
    char str[256];
    int ret_value;
    int msqid;

    msqid=msgget(MSGKEY,IPC_EXCL );  /*检查消息队列是否存在*/
    if(msqid < 0){
        msqid = msgget(MSGKEY,IPC_CREAT|0666);/*创建消息队列*/
        if(msqid <0){
            printf("failed to create msq | errno=%d [%s]
",errno,strerror(errno));
            exit(-1);
        }
    } 
    ifstream m_InStream("subdata.txt");
    string oneline;
    int running=1;
    // while(running)//getline(m_InStream, oneline,‘
‘) 
    int linenun=0;
    while (running)
    {
        if(!getline(m_InStream, oneline,‘
‘))
        {
            oneline = "end";
            running = 0;
        }
        linenun++;
        // printf("input message type[0=end process]:");
        // scanf("%d",&msg_type);
        // if (msg_type == 0)
            // break;
        // printf("input message to be sent:");
        // scanf ("%s",str);
        strncpy(str,oneline.c_str(),256);
        msgs.msgtype = linenun;//这里只是顺便记录下,该字段的值,可以写
        strcpy(msgs.msgtext, str);
        /* 发送消息队列 */
        std::cout<<"pid="<<getpid()<<","<<msgs.msgtext<<std::endl;
        ret_value = msgsnd(msqid,&msgs,sizeof(struct msgstru),0);//消息队列标识符,准备发现信息的指针,信息的长度,控制标志位
        sleep(1);
        if ( ret_value < 0 ) {
            printf("msgsnd() write msg failed,errno=%d[%s]
",errno,strerror(errno));
            exit(-1);
        }
    }
    msgctl(msqid,IPC_RMID,0); //删除消息队列
    //如果这行代码注释掉的话,则receive进程不会在接收一次send发出的任务之后,就全部子进程全部退出,而是send发送一次(即启动一次send程序)退出一个进程,这里启动了5个进程,所以,需要启动5次send程序,receive程序才会退出(每次退出一个进程)。
}

运行结果如下:
send的结果:
技术分享
receive的结果:
技术分享
对于receive之后的结果,由于在getline的时候,在最后一行之后,send程序中往消息队列中写入的是一个end字符,而在receive方,只有其中一个进程获取到end字符,然后,正常退出该进程,而其他的进程,则是由于消息队列中信息为空,获取消息队列失败而退出的。所以,在结果图中可以看到有4个进程的消息队列获取结果失败,43提示符,从而退出进程,256的提示符。
查看消息队列的命令行:
查看全部ipc对象信息:
#ipcs -a
技术分享
查看消息队列信息
#ipcs -q
查看共享内存信息
#ipcs -m
查看信号量信息
#ipcs -s
删除IPC对象的ipcrm
ipcrm -[smq] ID 或者ipcrm -[SMQ] Key
-q -Q删除消息队列信息 例如ipcrm -q 98307
-m -M删除共享内存信息
-s -S删除信号量信息
如果把两个程序合并到一起,写成一个程序呢?
至于共享内存,信号量和套接字在下文再继续介绍。



























































进程间通信之消息队列通信

概念消息队列消息队列提供了一个从一个进程向另外一个进程发送一块数据的方法每个数据块都被认为是有一个类型,接收者进程接收的数据块可以有不同的类型值消息队列也有管道一样的不足,就是每条消息的最大长度是有上... 查看详情

linux进程间通信--进程,信号,管道,消息队列,信号量,共享内存

Linux进程间通信--进程,信号,管道,消息队列,信号量,共享内存参考:《linux编程从入门到精通》,《LinuxC程序设计大全》,《unix环境高级编程》参考:C和指针学习 说明:本文非常的长,也是为了便于查找和比较,所以放... 查看详情

linux进程间通信之消息队列

我们已经知道进程通信的方式是有多种的,在上一篇博客中讲述了通过管道实现简单的进程间通信,那么接下来我们看看与之类似的另一种方式,通过消息队列来实现进程间通信。什么是消息队列消息队列提供了一种由一个进程... 查看详情

linux系统编程之进程间通信

...;     今天我们接着谈Linux系统编程中的进程间的通信,上一节我们讨论了进程的基本操作。这一节我们来讨论一下进程间的通信。      常见的进程间的通信方式有:无名管道、命名管道、... 查看详情

操作系统之进程间通信的方式都有哪些

参考技术A进程能够单独运行并且完成一些任务,但是也经常免不了和其他进程传输数据或互相通知消息,即需要进行通信,本文将简单介绍一些进程之间相互通信的技术--进程间通信(InterProcessCommunication,IPC)。由于篇幅有限... 查看详情

进程间通信——消息队列

概念消息队列提供了一种从一个进程向另一个进程发送一个数据块的方法,每个数据块都被认为是有一个类型,接收者进程接受的数据块可以有不同的类型值。我们可以通过发送消息来避免命名管道的同步和阻塞问题。消息队列... 查看详情

进程间通信—消息队列

我会用几篇博客总结一下在Linux中进程之间通信的几种方法,我会把这个开头的摘要部分在这个系列的每篇博客中都打出来进程之间通信的方式管道消息队列信号信号量共享存储区套接字(socket)这次主要写的是消息队列,之前... 查看详情

消息队列

Linux进程间通信——使用消息队列下面来说说如何用不用消息队列来进行进程间的通信,消息队列与命名管道有很多相似之处。有关命名管道的更多内容可以参阅我的另一篇文章:Linux进程间通信——使用命名管道 一、什么... 查看详情

进程间通信——消息队列

1、消息队列的简介 消息队列就是在进程之间架设起通信的通道,信息在通道中传递(具有时间先后的),从宏观逻辑上来讲与管道是一致的。即就是消息队列也同样是:(1)、具有入口和出口;(2)、消息从入口到出口,是FIFO的... 查看详情

python之进程同步控制(锁信号量事件)进程间通信——队列和管道(代码片段)

进程同步(multiprocess.Lock、multiprocess.Semaphore、multiprocess.Event)锁——multiprocess.Lock通过刚刚的学习,我们千方百计实现了程序的异步,让多个任务可以同时在几个进程中并发处理,他们之间的运行没有顺序,一旦开启也不受我们控... 查看详情

linux进程间通信的方式都有哪些

第一种:管道通信两个进程利用管道进行通信时,发送信息的进程称为写进程;接收信息的进程称为读进程。管道通信方式的中间介质就是文件,通常称这种文件为管道文件,它就像管道一样将一个写进程和一个读进程连接在一... 查看详情

进程间的几种通信方式的比较和线程间的几种

参考技术A您好,进程间通信方式有管道、信号量、信号、消息队列、共享内存、套接字六种。(1)管道分为有名管道和无名管道,其中无名管道是一种半双工的通信方式,数据只能单向流动,而且只能在具有亲缘关系的进程间... 查看详情

linux应用开发第四章linux进程间通信应用开发(代码片段)

文章目录4Linux进程间通信应用开发4.1初识进程4.1.1进程的概念4.1.1.1程序4.1.1.2进程4.1.1.3进程和程序的联系4.1.1.4进程和程序的区别4.1.2进程的操作(创建、结束、回收)4.1.2.1创建进程4.1.2.2结束进程4.1.2.3回收进程4.2进程为什... 查看详情

进程间的通信-队列/管道以及进程间的数据共享和进程池(代码片段)

1.进程之间的通信  1)队列*****  2)管道***2 进程之间的数据共享*3 进程池*****   进程间通信IPC(Inter-ProcessCommunication)进程的概念:创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间... 查看详情

并发编程(队列)(代码片段)

队列介绍进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的创建队列的类(底层就是以管道和锁定的方式实现):Queue([maxsize]):创建共享的进程队列... 查看详情

进程间通信

...信方式,数据只能单向流动,而且只能在具有亲缘关系的进程间使用。进程的亲缘关系通常是指父子进程关系。2.命名管道FIFO:有名管道也是半双工的通信方式,但是它允许无亲缘关系进程间的通信。4.消息队列MessageQueue:消息... 查看详情

linux进程通信之匿名管道(代码片段)

进程间的通信方式  进程间的通信方式包括,管道、共享内存、信号、信号量、消息队列、套接字。进程间通信的目的  进程间通信的主要目的是:数据传输、数据共享、事件通知、资源共享、进程控制等。进程间通信之管... 查看详情

进程间通信(四十七)(代码片段)

...‘True111848True314252True45512True28252True09892‘‘‘ViewCode 进程间的通信:队列,管道进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的队列:(推荐... 查看详情