Pipe Programming

ไปป์ (Pipe)

หลายๆครั้งที่ผู้ใช้งานระบบปฏิบัติการลีนุกซ์จะต้องมีการใช้คำสั่งมากกว่าหนึ่งคำสั่งเพื่อต้องการที่จะกรองข้อมูลที่ต้องการ โดยเชื่อมชุดคำสั่งเหล่านั้นด้วยเครื่องหมาย “|” หรือเรียกว่าไปป์ (Pipe) ดังตัวอย่างข้างล่าง ซึ่งผลลัพธ์ของคำสั่งด้านซ้ายจะส่งเข้าเป็นอินพุทต่อไปของคำสั่งด้านขวาอย่างนี้ไปเรื่อยๆจนกระทั่งถึงคำสั่งสุดท้ายผลลัพธ์ก็จะแสดงออกหน้าจอในที่สุด

$  echo "Hello World" | awk '{print $1}'
Hello

ท่อ (Pipe) เป็นกระบวนการที่ใช้ในการติดต่อระหว่างโปรเซสที่ง่ายที่สุดคือการนำผลลัพธ์ที่ได้จากโปรแกรมหนึ่งไปเป็นอินพุทของอีกโปรแกรมหนึ่ง โดยไปป์จะเป็นตัวกลางในการส่งข้อมูลระหว่างโปรเซสซึ่งการส่งข้อมูลจะเป็นแบบทิศทางเดียว (unidirectional) ดังรูป

โดยเมื่อโปรเซสหนึ่งต้องการสร้างไปป์ขึ้นมา ขบวนการของเคอร์เนลก็จะทำการสร้าง file descriptor (fd) ขึ้นมาสองไฟล์สำหรับใช้ในการสร้าง pipe ระหว่างโปรเซสและเคอร์เนล โดยไฟล์ fd ตัวแรกจะถูกใช้เป็นเส้นทางการป้อนข้อมูลหรือเขียนข้อมูลเข้าไปใน pipe (write) ส่วนไฟล์ fd ตัวที่สองจะใช้เป็นตัวรับข้อมูลที่ได้มาจาก pipe (read) ทำให้โปรเซสนั้นสามารถใช้ pipe ในการส่งข้อมูลไปมาให้กันเองภายในโปรเซสได้ ดังแสดงในรูปข้างล่าง

จากรูปข้างต้นแสดงให้เห็นว่าเมื่อไฟล์ fd ทั้งสอง (pfd0 และ pfd1) ถูกเชื่อมถึงกันแล้ว เมื่อโปรเซสต้องการที่จะส่งข้อมูลผ่าน pipe (pfd1) ข้อมูลก็จะถูกส่งผ่านเคอร์เนลที่สร้างเป็น pipe ขึ้นมาแล้วผ่านข้อมูลนั้นต่อไปยังอีกด้านของ pipe เพื่อกลายเป็นอินพุทของโปรเซสมายังไฟล์ pfd0 ดังแสดงในรูปข้างล่าง

ในการใช้งานจริงนั้น pipe จะนำมาใช้มากในการสื่อสารข้อมูลระหว่างโปรเซสแม่และโปรเซสลูก เนื่องจากโปรเซสจะได้รับสืบทอด (inherit) ในทรัพยากรต่างๆจากโปรเซสแม่ เช่น file descriptors ทั้งหลายที่โปรเซสแม่ได้สร้างเอาไว้ทันที ดังแสดงในรูปข้างล่าง

จากรูปข้างต้นจะสังเกตเห็นว่าทั้งสองโปรเซสมีการเข้าใช้งานไฟล์ pfd0 และ pfd1 ดังนั้นเพื่อให้การสื่อสารระหว่างทั้งสองอยู่ในกฏเกณฑ์ของ pipe คือแบบทิศทางเดียว (unidirectional) จะต้องกำหนดว่าโปรเซสใดเป็นคนส่งและโปรเซสใดเป็นคนรับ ดังตัวอย่างข้างล่างแสดงให้เห็นถึงตัวโปรเซสลูกจะทำหน้าที่ส่งข้อมูล (write) ผ่าน pipe ไปยังโปร-เซสแม่ (read)

แต่อย่างไรก็ตามก็สามารถสร้างการสื่อสารข้อมูลในอยู่ในแบบสองทิศทางได้ (bi-directional) โดยการเพิ่ม pipe ขึ้นมาอีกท่อ โดยกำหนดให้โปรเซสแม่เป็นคนส่งข้อมูล (write) ไปยังโปรเซสลูก (read) ได้เช่นกัน ซึ่ง system call ที่ใช้ในการเขียนและอ่านข้อมูลก็ใช้ชื่อฟังก์ชันว่า write() และ read() ตรงๆได้ทันที ยกเว้นไม่สามารถใช้ฟังก์ชัน lseek() ได้กับ file descriptor ของ pipe ได้ ดังนั้นถ้าต้องการใช้โปรเซสสามารถสื่อสารกันได้ทั้งสองทิศทาง (bi-directional) จะต้องใช้ pipe จำนวน n*(n-1) โดยที่ n คือจำนวนโปรเซส

ฟังก์ชันและตัวแปรที่เกี่ยวข้อง

  • ไลบรารีที่เกี่ยวข้องคือ unistd.h

  • ฟังก์ชัน int pipe(int *fd_couple)

    • ทำหน้าที่สร้าง pipe และเก็บรายละเอียดของ file descriptors ทั้งสอง

การพัฒนาโปรแกรมสื่อสารด้วยวิธีการสตรีมข้อมูลระหว่างโพรเซสชนิดทางเดียวนี้จะใช้ฟังก์ชัน pipe ที่อยู่ภายในไฟล์ไลบรารีชื่อว่า unistd.h

#include   <unistd.h>
int pipe ( int pfd[2] );  <---  pfd ตัวแปร array สำหรับเก็บค่า file descriptors                                                                                
                                          pfd[0] - file descriptor สำหรับการอ่าน
                                          pfd[1] - file descriptor สำหรับการเขียน               

สถานะการทำงานของ pipe

0 - ดำเนินการได้สำเร็จ

-1 - การดำเนินการล้มเหลว

จากรูปข้างบนแสดงการทำงานของฟังก์ชัน pipe ซึ่งจะสร้างช่องทางสื่อสารชนิดทางเดียวระหว่างโปรเซสโดยจะคืนค่า file descriptor ทั้งสองฝั่ง ผ่านตัวแปร pfd[] กล่าวคือ ปลายหนึ่งสำหรับอ่าน (pfd[0]) และอีกปลายหนึ่งสำหรับเขียน (pfd[1]) ค่าของ file descriptor เป็นชนิดจำนวนเต็ม (int) ที่ระบบปฏิบัติการลีนุกซ์จะใช้ในการอ้างอิงถึงแฟ้มที่มีการเปิดใช้งานดังนั้นเมื่อเรียกใช้งานฟังก์ชัน pipe จะต้องส่งอาเรย์ของจำนวนเต็มที่มีสมาชิก 2 ตัวให้แก่ ฟังก์ชัน pipe()

 int  pfd[2];       // file descriptor ของ pipe

ตัวอย่างที่ 1

แสดงตัวอย่างการสร้าง pipe โดยใช้คำสั่ง pipe() เพื่อเป็นท่อเชื่อมระหว่างโปรเซสแม่และโปรเซสลูก โดยที่โปรเซสลูกจะทำการส่งข้อมูลไปยังโปรเซสแม่ โดยการเขียนลง pfd[1] ดังรูปข้างล่าง

// pipe1.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
int main(int argc, char *argv[]) {
	pid_t pid;
	int pfd[2];
	int ret;
	char buf[20];

	ret = pipe(pfd);
	if (ret == -1) {
		perror("pipe");
		exit(1);
	}
	pid = fork();
	if (pid == 0) {
		/* Child Process */
		printf("Child Process\n");
		write(pfd[1], "Hello there!", 12);
	} else {
		/* Parent Process */
		printf("Parent Process\n");
		read(pfd[0], buf, 15);
		printf("buf : %s\n", buf);
	}
	return 0;
}

คอมไพล์โปรแกรม pipe.c และทดสอบการส่งข้อมูลระหว่างโปรเซสทั้งสอง (โปรเซส parent และโปรเซส child)

$ gcc -o pipe1 pipe1.c 
$ ./pipe1 
Parent Process
Child Process
buf : Hello there!

ฟังก์ชันและตัวแปรที่เกี่ยวข้อง

นอกจากนั้นการสร้าง pipe ยังสามารถสร้างไฟล์ pipe สำหรับชุดคำสั่งโปรเซสที่ต้องการโดยใช้ฟังก์ชัน popen() ซึ่งภายในฟังก์ชัน popen() จะมีการเรียกฟังก์ชัน pipe() อยู่ภายใน และจะต้องระบุด้วยว่าจะให้ชุดคำสั่งโปรเซสนี้เป็นตัวรับข้อมูล (write) หรือส่งข้อมูล (read) ให้ชัดเจน โดยชุดคำสั่งโปรเซสจะถูกนำไปรันอยู่บน shell ของระบบอีกทอดหนึ่ง ดังนั้นเมื่อมีการสร้าง pipe ด้วยคำสั่ง popen ก็จะต้องใช้ฟังก์ชัน pclose() เพื่อทำลาย pipe ที่สร้างขึ้นมา

  • ฟังก์ชัน FILE *popen(char *command, char *type)

  • ฟังก์ชัน int pclose(FILE *stream)

ตัวอย่างที่ 2

แสดงตัวอย่างการสร้าง pipe สำหรับชุดคำสั่ง sort ที่จะรับข้อมูล (write) จากค่าใช้ตัวแปรอาเรย์ เพื่อทำการจัดเรียงข้อมูล

/*****************************************************************************
 Excerpt from "Linux Programmer's Guide - Chapter 6"
 (C)opyright 1994-1995, Scott Burkett
 ***************************************************************************** 
 Filename: pipe2.c
 *****************************************************************************/

#include <stdio.h>
#include <stdlib.h>

#define MAXSTRS 5

int main(void) {
	int cntr;
	FILE *pipe_fp;
	char *strings[MAXSTRS] = { "echo", "bravo", "alpha", "charlie", "delta" };

	/* Create one way pipe line with call to popen() */
	if ((pipe_fp = popen("sort", "w")) == NULL) {
		perror("popen");
		exit(1);
	}

	/* Processing loop */
	for (cntr = 0; cntr < MAXSTRS; cntr++) {
		fputs(strings[cntr], pipe_fp);
		fputc('\n', pipe_fp);
	}

	/* Close the pipe */
	pclose(pipe_fp);

	return (0);
}
$ gcc -o pipe2 pipe2.c -Wall
$ ./pipe2 
alpha
bravo
charlie
delta
echo

เนื่องจากฟังก์ชัน popen() จะใช้ shell ในการรันคำสั่งที่ระบุไปให้ ดังนั้นการใช้ชุดคำสั่งก็สามารถทำได้เหมือนกับการพิมพ์คำสั่งบน shell (command line) ได้เช่นกัน ตัวอย่างเช่น

popen("ls /usr/include", "r");
popen("sort > /tmp/foo", "w");
popen("sort | uniq | more", "w");

ตัวอย่างที่ 3

แสดงตัวอย่างการใช้ popen() เพื่อรับข้อมูลที่อ่านได้จากคำสั่ง ls แล้วส่งต่อไปเป็นอินพุทให้กับอีกคำสั่ง (sort) เพื่อทำการจัดเรียงข้อมูลต่อไป

#include <stdio.h>
#include <stdlib.h>

int main(void) {
	FILE *pipein_fp, *pipeout_fp;
	char readbuf[80];

	/* Create one way pipe line with call to popen() */
	if ((pipein_fp = popen("ls /", "r")) == NULL) {
		perror("popen");
		exit(1);
	}

	/* Create one way pipe line with call to popen() */
	if ((pipeout_fp = popen("sort", "w")) == NULL) {
		perror("popen");
		exit(1);
	}

	/* Processing loop */
	while (fgets(readbuf, 80, pipein_fp))
		fputs(readbuf, pipeout_fp);

	/* Close the pipes */
	pclose(pipein_fp);
	pclose(pipeout_fp);

	return (0);
}
$ ls /
NAS   courseweb_backup  etc  initrd.img.old  lib64  mnt   root  selinux  tftpboot  var  bin   dev  home   lib    lost+found  opt   run   srv   tmp   vmlinuz  boot  ee_backup   initrd.img  lib32   media   proc  sbin  sys   usr   vmlinuz.old

$ gcc -o pipe3 pipe3.c -Wall
$ ./pipe3 
NAS
bin
boot
courseweb_backup
dev
ee_backup
etc
home
initrd.img
initrd.img.old
lib
lib32
lib64
lost+found
media
mnt
opt
proc
root
run
sbin
selinux
srv
sys
tftpboot
tmp
usr
var
vmlinuz
vmlinuz.old

ตัวอย่างที่ 4

แสดงตัวอย่างการรับอาร์กิวเมนต์จากผู้ใช้ผ่านการเรียกโปรแกรมจาก command line โดยอาร์กิวเมนต์ตัวแรกจะเป็นคำสั่งที่ต้องการให้รับข้อมูล (write) และอาร์กิวเมนต์ที่สองจะเป็นไฟล์ข้อมูลที่ต้องการจะให้เปิดอ่าน (rt)

/*****************************************************************************
 Excerpt from "Linux Programmer's Guide - Chapter 6"
 (C)opyright 1994-1995, Scott Burkett
 ***************************************************************************** 
 Filename: popen.c
 *****************************************************************************/

#include <stdio.h>
#include <stdlib.h>

int main(int argc, char *argv[]) {
	FILE *pipe_fp, *infile;
	char readbuf[80];

	if (argc != 3) {
		fprintf(stderr, "USAGE: popen3 [command] [filename]\n");
		exit(1);
	}

	/* Open up input file */
	if ((infile = fopen(argv[2], "rt")) == NULL) {
		perror("fopen");
		exit(1);
	}

	/* Create one way pipe line with call to popen() */
	if ((pipe_fp = popen(argv[1], "w")) == NULL) {
		perror("popen");
		exit(1);
	}

	/* Processing loop */
	do {
		fgets(readbuf, 80, infile);
		if (feof(infile))
			break;

		fputs(readbuf, pipe_fp);
	} while (!feof(infile));

	fclose(infile);
	pclose(pipe_fp);

	return (0);
}
$ gcc -o popen popen.c -Wall

$ ./popen sort popen.c
$ ./popen cat popen.c
$ ./popen more popen.c
$ ./popen cat popen.c | grep main

ตัวอย่างที่ 5

แสดงตัวอย่างการประยุกต์การส่งข้อความ (message) ระหว่างโปรเซสต่างๆ โดยโปรเซสที่ i ต้องการส่งไปยังโปรเซสที่ j โดยถ้าโปรเซสใดรับข้อความเพียงสองครั้งแล้วก็จะสิ้นสุดการทำงาน

/*
 *  pipe4.c
 *  
 *  A set of processes randomly messaging each the other, with pipes.
 *
 *
 *  Created by Mij <mij@bitchx.it> on 05/01/05.
 *  Original source file available on http://mij.oltrelinux.com/devel/unixprg/
 *
 */

#include <stdio.h>
/* for read() and write() */
#include <sys/types.h>
#include <sys/uio.h>
/* for strlen and others */
#include <string.h>
/* for pipe() */
#include <unistd.h>
/* for [s]random() */
#include <stdlib.h>
/* for time() [seeding srandom()] */
#include <time.h>
/* for signals */
#include <signal.h>

#define PROCS_NUM               15          /* 1 < number of processes involved <= 255 */
#define MAX_PAYLOAD_LENGTH      50          /* message length */
#define DEAD_PROC               -1          /* a value to mark a dead process' file descriptors with */

/*      ***                     DATA TYPES                  ***     */
/* a process address */
typedef char proc_addr;

/* a message */
struct message_s {
	proc_addr src_id;
	short int length;
	char *payload;
};
/*      ***                     FUNCTION PROTOTYPES         ***     */
/* send message to process with id dest */
int send_proc_message(proc_addr dest, char *message);
/* receive a message in the process' queue of received ones */
int receive_proc_message(struct message_s *msg);
/* mark process file descriptors closed */
void mark_proc_closed(proc_addr process);

/*              ***             GLOBAL VARS                 ***     */
/* they are OK to be global here. */
proc_addr my_address; /* stores the id of the process */
int proc_pipes[PROCS_NUM][2]; /* stores the pipes of every process involved */

int main(int argc, char *argv[]) {
	pid_t child_pid;
	pid_t my_children[PROCS_NUM]; /* PIDs of the children */
	int i, ret;
	char msg_text[MAX_PAYLOAD_LENGTH]; /* payload of the message to send */
	proc_addr msg_recipient;
	struct message_s msg;

	/* create a pipe for me (the parent) */
	pipe(proc_pipes[0]);

	/* initializing proc_pipes struct */
	for (i = 1; i < PROCS_NUM; i++) {
		/* creating one pipe for every (future) process */
		ret = pipe(proc_pipes[i]);
		if (ret) {
			perror("Error creating pipe");
			abort();
		}
	}

	/* fork [1..NUM_PROCS] children. 0 is me. */
	for (i = 1; i < PROCS_NUM; i++) {
		/* setting the child address */
		my_address = my_address + 1;

		child_pid = fork();
		if (!child_pid) {
			/* child */
			sleep(1);

			/* closing other process' pipes read ends */
			for (i = 0; i < PROCS_NUM; i++) {
				if (i != my_address)
					close(proc_pipes[i][0]);
			}

			/* init random num generator */
			srandom(time(NULL));

			/* my_address is now my address, and will hereby become a "constant" */
			/* producing some message for the other processes */
			while (random() % (2 * PROCS_NUM)) {
				/* interleaving... */
				sleep((unsigned int) (random() % 2));

				/* choosing a random recipient (including me) */
				msg_recipient = (proc_addr) (random() % PROCS_NUM);

				/* preparing and sending the message */
				sprintf(msg_text, "hello from process %u.", (int) my_address);
				ret = send_proc_message(msg_recipient, msg_text);
				if (ret > 0) {
					/* message has been correctly sent */
					printf("    --> %d: sent message to %u\n", my_address,
							msg_recipient);
				} else {
					/* the child we tried to message does no longer exist */
					mark_proc_closed(msg_recipient);
					printf("    --> %d: recipient %u is no longer available\n",
							my_address, msg_recipient);
				}
			}

			/* now, reading the first 2 messages we've been sent */
			for (i = 0; i < 2; i++) {
				ret = receive_proc_message(&msg);
				if (ret < 0)
					break;
				printf(
						"<--     Process %d, received message from %u: \"%s\".\n",
						my_address, msg.src_id, msg.payload);
			};

			/* i'm exiting. making my pipe widowed */
			close(proc_pipes[my_address][0]);

			printf("# %d: i am exiting.\n", my_address);
			exit(0);
		}

		/* saving the child pid (for future killing) */
		my_children[my_address] = child_pid;

		/* parent. I don't need the read descriptor of the pipe */
		close(proc_pipes[my_address][0]);

		/* this is for making srandom() consistent */
		sleep(1);
	}

	/* expecting the user request to terminate... */
	printf("Please press ENTER when you like me to flush the children...\n");
	getchar();

	printf("Ok, terminating dandling processes...\n");
	/* stopping freezed children */
	for (i = 1; i < PROCS_NUM; i++) {
		kill(my_children[i], SIGTERM);
	}
	printf("Done. Exiting.\n");

	return 0;
}

int send_proc_message(proc_addr dest, char *message) {
	int ret;
	char *msg = (char*) malloc(sizeof(message) + 2);

	/* the write should be atomic. Doing our best */
	msg[0] = (char) dest;
	memcpy((void*) &(msg[1]), (void*) message, strlen(message) + 1);

	/* send message, including the "header" the trailing '\0' */
	ret = write(proc_pipes[dest][1], msg, strlen(msg) + 2);
	free(msg);

	return ret;
}

int receive_proc_message(struct message_s *msg) {
	char c = 'x';
	char temp_string[MAX_PAYLOAD_LENGTH];
	int ret, i = 0;

	/* first, getting the message sender */
	ret = read(proc_pipes[my_address][0], &c, 1);
	if (ret == 0) {
		return 0;
	}
	msg->src_id = (proc_addr) c;

	do {
		ret = read(proc_pipes[my_address][0], &c, 1);
		temp_string[i++] = c;
	} while ((ret > 0) && (c != '\0') && (i < MAX_PAYLOAD_LENGTH));

	if (c == '\0') {
		/* msg correctly received. Preparing message packet */

		msg->payload = (char*) malloc(strlen(temp_string) + 1);
		strncpy(msg->payload, temp_string, strlen(temp_string) + 1);

		return 0;
	}

	return -1;
}

void mark_proc_closed(proc_addr process) {
	proc_pipes[process][0] = DEAD_PROC;
	proc_pipes[process][1] = DEAD_PROC;
}
$ gcc --ansi --pedantic -o pipe4 pipe4.c

$ ./pipe4 
    --> 1: sent message to 3
    --> 1: sent message to 8
    --> 2: sent message to 0
    --> 1: sent message to 5
    --> 2: sent message to 4
    --> 1: sent message to 10
    --> 2: sent message to 9
    --> 2: sent message to 0
    --> 3: sent message to 11
    --> 2: sent message to 6
    --> 1: sent message to 2
    --> 2: sent message to 12
    --> 3: sent message to 14
<--     Process 4, received message from 4: "hello from process 2.".
    --> 3: sent message to 13
    --> 1: sent message to 3
    --> 1: sent message to 9
    --> 1: sent message to 12
    --> 1: sent message to 3
    --> 2: sent message to 6
    --> 3: sent message to 7
<--     Process 5, received message from 5: "hello from process 1.".
    --> 2: sent message to 12
    --> 2: sent message to 5
    --> 1: sent message to 7
    --> 1: sent message to 4
<--     Process 4, received message from 0: "hello from process 1.".
# 4: i am exiting.
    --> 1: sent message to 12
    --> 1: sent message to 2
<--     Process 5, received message from 0: "hello from process 2.".
# 5: i am exiting.
    --> 2: sent message to 14
    --> 2: sent message to 1
    --> 2: sent message to 10
    --> 1: sent message to 8
    --> 1: sent message to 0
    --> 1: sent message to 0
    --> 6: sent message to 13
    --> 6: sent message to 8
    --> 6: sent message to 14
    --> 8: sent message to 14
    --> 6: sent message to 14
    --> 8: sent message to 8
    --> 8: sent message to 11
    --> 9: sent message to 14
    --> 10: sent message to 0
^C

Last updated

Assoc. Prof. Wiroon Sriborrirux, Founder of Advance Innovation Center (AIC) and Bangsaen Design House (BDH), Electrical Engineering Department, Faculty of Engineering, Burapha University