🔍
📢

Linux进程间通信实例 - 管道(pipe)、有名管道(FIFO)、信号(signal)、消息队列、共享内存、信号量、套接字(socket)

linux常见进程通信方式包括: 管道(pipe)、有名管道(FIFO)、信号(signal)、消息队列、共享内存、信号量、套接字(socket)。
管道 管道是单向、先进先出的无结构的字节流。用于父子进程之间的通信。关键系统调用如下: int pipe( int fd[2] );fd数组用于返回两个fd,分别表示通道的两端。 int main(){ int pid; int fd[2]; if(pipe(fd)<0){//父进程创建管道 perror("Fail to pipe"); exit(EXIT_FAILURE); } if((pid=fork())<0){ perror("Fail to fork"); exit(EXIT_FAILURE); }else if(pid == 0){ close(fd[1]);//表示管道的方向,fd[1]用于写 child_read_pipe(fd[0]);//子进程读取管道 }else{ close(fd[0]);//fd[0]用于读 father_write_pipe(fd[1]);//父进程写入管道 } }
有名管道 有名管道以设备文件的形式存在,可被任意知道名字的进程使用,而不止在只有亲缘关系的进程之间。 要使用有名管道,必须先建立它,并与他的一段相连,才能打开进行读写。当文件不再需要时,要显示删除。系统调用: int mknod( char *pathname, mode_t mode, dev_t dev); server端,创建管道: #define FIFO_FILE "MYFIFO" //有名管道server端 int main(void) { FILE *fp; char readbuf[80]; /* Create the FIFO if it does not exist */ umask(0); mknod(FIFO_FILE, S_IFIFO|0666, 0); while(1) { fp = fopen(FIFO_FILE, "r"); fgets(readbuf, 80, fp); printf("Received string: %s\n", readbuf); fclose(fp); } return(0); } client端: //有名管道client端 int main(int argc, char *argv[]) { FILE *fp; if ( argc != 2 ) { printf("USAGE: fifoclient [string]\n"); exit(1); } if((fp = fopen(FIFO_FILE, "w")) == NULL) { perror("fopen"); exit(1); } fputs(argv[1], fp); fclose(fp); return(0); } 有名管道创建后,以设备文件形式存在,标志位有p。 信号 一个进程发出信号,另一个进程捕获此信号并作出动作。比如linux下ctrl+z快捷键终止进程,其实是向进程发送了SIGINT信号,进程捕获该信号并终止。 常用信号如下: SIGALRM 由alarm函数的定时器产生。 SIGHUP SIGHUP和控制台操作有关,当控制台被关闭时系统会向拥有控制台sessionID的所有进程发送HUP信号,默认HUP信号的action是 exit,如果远程登陆启动某个服务进程并在程序运行时关闭连接的话会导致服务进程退出,所以一般服务进程都会用nohup工具启动或写成一个 daemon。 SIGINT 键盘中断信号,比ctrl+z SIGKILL 强制kill进程,不可被捕获或忽略。 SIGPIPE Broken Pipe,向通道写时没有对应的读进程。 SIGTERM 要求进程结束运行,linux系统关机时会发送此信号,kill命令默认也是发送此信号。 SIGUSER1 SIGUSER2 进程之间用这两个信号通信。 注册信号处理函数: int sigaction(int signum, const struct sigaction *act,struct sigaction *oldact); signum为信号,act为包含新的信号处理函数,oldact保存旧的信号处理函数。 sigaction结构体定义如下: struct sigaction { void (*sa_handler)(int); void (*sa_sigaction)(int, siginfo_t *, void *); sigset_t sa_mask; int sa_flags; void (*sa_restorer)(void); }; sa_handler和sa_sigaction为信号处理函数,一般以联合体的形式存在,由sa_handler指定的处理函数只有一个参数,即信号值,所以信号不能传递除信号值之外的任何信息;由sa_sigaction指定的信号处理函数带有三个参数,是为实时信号而设的(当然同样支持非实时信号),它指定一个3参数信号处理函数。第一个参数为信号值,第三个参数没有使用(posix没有规范使用该参数的标准),第二个参数是指向siginfo_t结构的指针,结构中包含信号携带的数据值。 siginfo_t结构如下,siginfo_t结构中的si_value要么持有一个4字节的整数值,要么持有一个指针,这就构成了与信号相关的数据 typedef struct { int si_signo; int si_errno; int si_code; union sigval si_value; } siginfo_t; union sigval { int sival_int; void *sival_ptr; } sigset_t信号集用来描述信号的集合,linux所支持的所有信号可以全部或部分的出现在信号集中,主要与信号阻塞相关函数配合使用。sa_mask指定在信号处理程序执行过程中,哪些信号应当被阻塞。缺省情况下当前信号本身被阻塞,防止信号的嵌套发送,除非指定SA_NODEFER或者SA_NOMASK标志位。 sa_flags中包含了许多标志位,比较重要的标志位是SA_SIGINFO,表示信号处理函数是用sa_handler还是sa_sigaction。 指定信号处理函数实例如下: void ouch(int sig) { printf("\nOUCH! - I got signal %d\n", sig); } int main() { struct sigaction act; act.sa_handler = ouch; //创建空的信号屏蔽字,即不屏蔽任何信息 sigemptyset(&act.sa_mask); //使sigaction函数重置为默认行为 act.sa_flags = SA_RESETHAND; sigaction(SIGINT, &act, 0); while(1) { printf("Hello World!\n"); sleep(1); } return 0; } 发送信号系统调用: int kill(pid_t pid, int sig); alarm函数在经过预定时间后向发送一个SIGALRM信号,seconds为0表示所有已设置的闹钟请求。 unsigned int alarm(unsigned int seconds); static int alarm_fired = 0; void ouch(int sig) { alarm_fired = 1; } int main() { //关联信号处理函数 signal(SIGALRM, ouch); //调用alarm函数,5秒后发送信号SIGALRM alarm(5); //挂起进程 pause(); //接收到信号后,恢复正常执行 if(alarm_fired == 1) printf("Receive a signal %d\n", SIGALRM); exit(0); } 消息队列 消息队列是存在于内核的链表结构。消息含有一个类型,接收进程可以独立地接收含有不同类型的数据结构。消息队列的消息不能超过4056 bytes。 消息队列相关的系统调用如下: int msgget(key_t key, int msgflg); 创建或获取队列,key用来标识队列,msgflag表示消息队列的访问权限,和文件的访问权限一样。msgflg可以与IPC_CREAT做或操作,表示当key所命名的消息队列不存在时创建一个消息队列,如果key所命名的消息队列存在时,IPC_CREAT标志会被忽略,而只返回一个标识符。 int msgsend(int msgid, const void *msg_ptr, size_t msg_sz, int msgflg); 发送消息,msgid表示消息队列标识,msg_ptr表示消息指针,msg_ptr所指向结构体第一个成员必须为长整型消息类型,如下所示,mtext除可为char数组类型外,还可为任意其它类型 struct msgbuf { long mtype; /* type of message */ char mtext[1]; /* message text */ }; msg_sz表示消息长度,不包括类型字段,msgflagmsgflg用于控制当前消息队列满或队列消息到达系统范围的限制时将要发生的事情。 int msgrcv(int msgid, void *msg_ptr, size_t msg_st, long int msgtype, int msgflg); 接收消息,msgtype>0表示接收消息的类型,msgtype为0表示不分类型接收第一个消息。如果它小于零,就获取类型等于或小于msgtype的绝对值的第一个消息。其它参数和msgsend类似。 int msgctl(int msgid, int command, struct msgid_ds *buf); msgctl用于控制消息队列,每个消息队列在内核中存在相应的数据结构msgid_ds,其结构如下: /* one msqid structure for each queue on the system */ struct msqid_ds { struct ipc_perm msg_perm; struct msg *msg_first; /* first message on queue */ struct msg *msg_last; /* last message in queue */ time_t msg_stime; /* last msgsnd time */ time_t msg_rtime; /* last msgrcv time */ time_t msg_ctime; /* last change time */ struct wait_queue *wwait; struct wait_queue *rwait; ushort msg_cbytes; ushort msg_qnum; ushort msg_qbytes; /* max number of bytes on queue */ ushort msg_lspid; /* pid of last msgsnd */ ushort msg_lrpid; /* last receive pid */ }; command是将要采取的动作,它可以取3个值: IPC_STAT 获取消息队列在内核中的msgid_ds结构,并存储到buf中。 IPC_SET 设置消息队列的ipc_perm成员 IPC_RMID 从内核中移除消息队列。 接受者: struct msg_st { long int msg_type; char text[BUFSIZ]; }; int main() { int running = 1; int msgid = -1; struct msg_st data; long int msgtype = 0; //注意1 //建立消息队列 msgid = msgget((key_t)1234, 0666 | IPC_CREAT); if(msgid == -1) { fprintf(stderr, "msgget failed with error: %d\n", errno); exit(EXIT_FAILURE); } //从队列中获取消息,直到遇到end消息为止 while(running) { if(msgrcv(msgid, (void*)&data, BUFSIZ, msgtype, 0) == -1) { fprintf(stderr, "msgrcv failed with errno: %d\n", errno); exit(EXIT_FAILURE); } printf("You wrote: %s\n",data.text); //遇到end结束 if(strncmp(data.text, "end", 3) == 0) running = 0; } //删除消息队列 if(msgctl(msgid, IPC_RMID, 0) == -1) { fprintf(stderr, "msgctl(IPC_RMID) failed\n"); exit(EXIT_FAILURE); } exit(EXIT_SUCCESS); } 发送者: #define MAX_TEXT 512 struct msg_st { long int msg_type; char text[MAX_TEXT]; }; int main() { int running = 1; struct msg_st data; char buffer[BUFSIZ]; int msgid = -1; //建立消息队列 msgid = msgget((key_t)1234, 0666 | IPC_CREAT); if(msgid == -1) { fprintf(stderr, "msgget failed with error: %d\n", errno); exit(EXIT_FAILURE); } //向消息队列中写消息,直到写入end while(running) { //输入数据 printf("Enter some text: "); fgets(buffer, BUFSIZ, stdin); data.msg_type = 1; //注意2 strcpy(data.text, buffer); //向队列发送数据 if(msgsnd(msgid, (void*)&data, MAX_TEXT, 0) == -1) { fprintf(stderr, "msgsnd failed\n"); exit(EXIT_FAILURE); } //输入end结束输入 if(strncmp(buffer, "end", 3) == 0) running = 0; sleep(1); } exit(EXIT_SUCCESS); } 共享内存 不同进程之间共享的内存通常安排为同一段物理内存。进程可以将同一段共享内存连接到它们自己的地址空间中,所有进程都可以访问共享内存中的地址。共享内存并未提供同步机制,也就是说,在第一个进程结束对共享内存的写操作之前,并无自动机制可以阻止第二个进程开始对它进行读取。所以我们通常需要用其他的机制来同步对共享内存的访问,例如前面说到的信号量。 系统调用如下: int shmget(key_t key, size_t size, int shmflg); key为共享内存段命名,size以字节为单位指定需要共享的内存容量,shmflg是权限标志,它的作用与open函数的mode参数一样,将其与IPC_CREAT做或操作表示共享内存不存在则创建。共享内存的权限标志与文件的读写权限一样。 void *shmat(int shm_id, const void *shm_addr, int shmflg); 启动对该共享内存的访问,并把共享内存连接到当前进程的地址空间。shm_addr指定共享内存连接到当前进程中的地址位置,通常为空,表示让系统来选择共享内存的地址。shm_flg是一组标志位,通常为0。调用成功时返回一个指向共享内存第一个字节的指针,如果调用失败返回-1. int shmdt(const void *shmaddr); 将共享内存从当前进程中分离。 int shmctl(int shm_id, int command, struct shmid_ds *buf);和消息队列含义相似。 读进程: int main() { int running = 1;//程序是否继续运行的标志 void *shm = NULL;//分配的共享内存的原始首地址 struct shared_use_st *shared;//指向shm int shmid;//共享内存标识符 //创建共享内存 shmid = shmget((key_t)1234, sizeof(struct shared_use_st), 0666|IPC_CREAT); if(shmid == -1) { fprintf(stderr, "shmget failed\n"); exit(EXIT_FAILURE); } //将共享内存连接到当前进程的地址空间 shm = shmat(shmid, 0, 0); if(shm == (void*)-1) { fprintf(stderr, "shmat failed\n"); exit(EXIT_FAILURE); } printf("\nMemory attached at %X\n", (int)shm); //设置共享内存 shared = (struct shared_use_st*)shm; shared->written = 0; while(running)//读取共享内存中的数据 { //没有进程向共享内存定数据有数据可读取 if(shared->written != 0) { printf("You wrote: %s", shared->text); sleep(rand() % 3); //读取完数据,设置written使共享内存段可写 shared->written = 0; //输入了end,退出循环(程序) if(strncmp(shared->text, "end", 3) == 0) running = 0; } else//有其他进程在写数据,不能读取数据 sleep(1); } //把共享内存从当前进程中分离 if(shmdt(shm) == -1) { fprintf(stderr, "shmdt failed\n"); exit(EXIT_FAILURE); } //删除共享内存 if(shmctl(shmid, IPC_RMID, 0) == -1) { fprintf(stderr, "shmctl(IPC_RMID) failed\n"); exit(EXIT_FAILURE); } exit(EXIT_SUCCESS); } 写进程: int main() { int running = 1; void *shm = NULL; struct shared_use_st *shared = NULL; char buffer[BUFSIZ + 1];//用于保存输入的文本 int shmid; //创建共享内存 shmid = shmget((key_t)1234, sizeof(struct shared_use_st), 0666|IPC_CREAT); if(shmid == -1) { fprintf(stderr, "shmget failed\n"); exit(EXIT_FAILURE); } //将共享内存连接到当前进程的地址空间 shm = shmat(shmid, (void*)0, 0); if(shm == (void*)-1) { fprintf(stderr, "shmat failed\n"); exit(EXIT_FAILURE); } printf("Memory attached at %X\n", (int)shm); //设置共享内存 shared = (struct shared_use_st*)shm; while(running)//向共享内存中写数据 { //数据还没有被读取,则等待数据被读取,不能向共享内存中写入文本 while(shared->written == 1) { sleep(1); printf("Waiting...\n"); } //向共享内存中写入数据 printf("Enter some text: "); fgets(buffer, BUFSIZ, stdin); strncpy(shared->text, buffer, TEXT_SZ); //写完数据,设置written使共享内存段可读 shared->written = 1; //输入了end,退出循环(程序) if(strncmp(buffer, "end", 3) == 0) running = 0; } //把共享内存从当前进程中分离 if(shmdt(shm) == -1) { fprintf(stderr, "shmdt failed\n"); exit(EXIT_FAILURE); } sleep(2); exit(EXIT_SUCCESS); } 信号量 信号量用来协调对共享资源的操作,只允许对它进行等待(即P(信号变量))和发送(即V(信号变量))信息操作,且对信号量的操作都是原子操作。 系统调用: int semget(key_t key, int num_sems, int sem_flags); num_sems表示信号量数目。其它两个参数和消息队列的msgget函数参数含义相似。 int semop(int sem_id, struct sembuf *sem_opa, size_t num_sem_ops); 改变信号量的值,sem_id是由semget返回的信号量标识符,sem_opa指向信号量集操作的数组。sembuf结构的定义如下: struct sembuf{ short sem_num;//除非使用一组信号量,否则它为0 short sem_op;//信号量在一次操作中需要改变的数据,-1,即P(等待)操作+1,即V(发送信号)操作,0表示sleep直到信号量的值为0,即资源得到百分百利用。 short sem_flg;//通常为SEM_UNDO,使操作系统跟踪信号,并在进程没有释放该信号量而终止时,操作系统释放信号量;IPC_NOWAIT得不到资源立刻返回。 }; int semctl ( int semid, int semnum, int cmd, union semun arg ); 控制信号量信息。semnum表示semid信号量集中的哪一个信号,cmd代表对信号的操作, IPC_STAT/IPC_SET/IPC_RMID等。 /* arg for semctl system calls. */ union semun { int val; /* value for SETVAL */ struct semid_ds *buf; /* buffer for IPC_STAT & IPC_SET */ ushort *array; /* array for GETALL & SETALL */ struct seminfo *__buf; /* buffer for IPC_INFO */ void *__pad; }; 两个进程打印字符的实例,竞争打到控制台: int main(int argc, char *argv[]) { char message = 'X'; int i = 0; //创建信号量 sem_id = semget((key_t)1234, 1, 0666 | IPC_CREAT); if(argc > 1) { //程序第一次被调用,初始化信号量 if(!set_semvalue()) { fprintf(stderr, "Failed to initialize semaphore\n"); exit(EXIT_FAILURE); } //设置要输出到屏幕中的信息,即其参数的第一个字符 message = argv[1][0]; sleep(2); } for(i = 0; i < 10; ++i) { //进入临界区 if(!semaphore_p()) exit(EXIT_FAILURE); //向屏幕中输出数据 printf("%c", message); //清理缓冲区,然后休眠随机时间 fflush(stdout); sleep(rand() % 3); //离开临界区前再一次向屏幕输出数据 printf("%c", message); fflush(stdout); //离开临界区,休眠随机时间后继续循环 if(!semaphore_v()) exit(EXIT_FAILURE); sleep(rand() % 2); } sleep(10); printf("\n%d - finished\n", getpid()); if(argc > 1) { //如果程序是第一次被调用,则在退出前删除信号量 sleep(3); del_semvalue(); } exit(EXIT_SUCCESS); } static int set_semvalue() { //用于初始化信号量,在使用信号量前必须这样做 union semun sem_union; sem_union.val = 1; if(semctl(sem_id, 0, SETVAL, sem_union) == -1) return 0; return 1; } static void del_semvalue() { //删除信号量 union semun sem_union; if(semctl(sem_id, 0, IPC_RMID, sem_union) == -1) fprintf(stderr, "Failed to delete semaphore\n"); } static int semaphore_p() { //对信号量做减1操作,即等待P(sv) struct sembuf sem_b; sem_b.sem_num = 0; sem_b.sem_op = -1;//P() sem_b.sem_flg = SEM_UNDO; if(semop(sem_id, &sem_b, 1) == -1) { fprintf(stderr, "semaphore_p failed\n"); return 0; } return 1; } static int semaphore_v() { //这是一个释放操作,它使信号量变为可用,即发送信号V(sv) struct sembuf sem_b; sem_b.sem_num = 0; sem_b.sem_op = 1;//V() sem_b.sem_flg = SEM_UNDO; if(semop(sem_id, &sem_b, 1) == -1) { fprintf(stderr, "semaphore_v failed\n"); return 0; } return 1; } 各ipc方式比较 管道用于具有亲缘关系的进程间通信,有名管道的每个管道具有名字,使没有亲缘关系的进程间也可以通信。 信号是比较复杂的通信方式,用于通知接受进程有某种事件发生,除了用于进程间通信外,进程还可以发送信号给进程本身。 消息队列是消息的链接表,消息队列克服了信号承载信息量少,管道只能承载无格式字节流以及缓冲区大小受限等缺点。 共享内存使得多个进程可以访问同一块内存空间,是最快的可用IPC形式,针对其他通信机制运行效率较低而设计的。往往与其它通信机制,如信号量结合使用,来达到进程间的同步及互斥。 信号量(semaphore)主要作为进程间以及同一进程不同线程之间的同步手段。 套接口(Socket)更为一般的进程间通信机制,可用于不同机器之间的进程间通信。

本文源码地址 - https://github.com/killianxu/ipc_example