[AIO] add aio support to pipes using aio_down and retry This patch implements aio pipes, and has the standard pipe code use the sync kiocb retry mechanism to complete synchronous read/write calls. The async pipe implementation depends on the earlier aio_down patch. Pipes are fully featured in their support for cancellation and signal handling, but do not current send a SIGPIPE for aio requests, but they do get -EPIPE. Signed-off-by: Benjamin LaHaise diff -purN --exclude=description 59_fix_sync_iocbs/fs/pipe.c 60_pipe_aio/fs/pipe.c --- 59_fix_sync_iocbs/fs/pipe.c 2005-06-20 13:33:32.000000000 -0400 +++ 60_pipe_aio/fs/pipe.c 2005-08-08 17:15:49.000000000 -0400 @@ -46,6 +46,49 @@ void pipe_wait(struct inode * inode) down(PIPE_SEM(*inode)); } +static int pipe_aio_waiter(wait_queue_t *wait, unsigned mode, int sync, + void *key) +{ + struct kiocb *iocb = io_wait_to_kiocb(wait); + + list_del_init(&wait->task_list); + iocb->ki_cancel = NULL; /* We're removed from the wait queue, so our + * cancellation code no longer applies. + */ + kick_iocb(iocb); + return 1; +} + +static int pipe_aio_cancel(struct kiocb *kiocb, struct io_event *event) +{ + struct inode *inode = kiocb->ki_filp->f_dentry->d_inode; + wait_queue_head_t *wq = PIPE_WAIT(*inode); + int ret = 0; + + spin_lock_irq(&wq->lock); + if (kiocb->ki_cancel == pipe_aio_cancel) { + kiocb->ki_cancel = NULL; + list_del_init(&kiocb->ki_wait.task_list); + if (event) { + event->res = -EINTR; + event->res2 = 0; + } + } else + ret = -EAGAIN; + spin_unlock_irq(&wq->lock); + return ret; +} + +static long pipe_aio_wait(struct kiocb *kiocb, struct inode *inode) +{ + kiocb->ki_wait.func = pipe_aio_waiter; + kiocb->ki_cancel = pipe_aio_cancel; + add_wait_queue(PIPE_WAIT(*inode), &kiocb->ki_wait); + aio_up(kiocb, PIPE_SEM(*inode)); + kiocbSetIntr(kiocb); + return -EIOCBRETRY; +} + static inline int pipe_iov_copy_from_user(void *to, struct iovec *iov, unsigned long len) { @@ -115,9 +158,12 @@ static struct pipe_buf_operations anon_p }; static ssize_t -pipe_readv(struct file *filp, const struct iovec *_iov, - unsigned long nr_segs, loff_t *ppos) +pipe_aio_read(struct kiocb *kiocb, char __user *buf, size_t len, loff_t pos) { + struct iovec _iov[2] = {{ .iov_base = (void __user *)buf, .iov_len = len }}; + unsigned long nr_segs = 1; + struct file *filp = kiocb->ki_filp; + struct inode *inode = filp->f_dentry->d_inode; struct pipe_inode_info *info; int do_wakeup; @@ -125,14 +171,25 @@ pipe_readv(struct file *filp, const stru struct iovec *iov = (struct iovec *)_iov; size_t total_len; + /* In retries we need to remove ourself from the wait queue at this + * point. Checking ki_cancel is a convenient way of checking for + * this case, as we clear the cancel operation when the iocb is + * removed from the wait queue. + */ + if (kiocb->ki_cancel == pipe_aio_cancel) + pipe_aio_cancel(kiocb, NULL); + total_len = iov_length(iov, nr_segs); /* Null read succeeds. */ if (unlikely(total_len == 0)) return 0; do_wakeup = 0; - ret = 0; - down(PIPE_SEM(*inode)); + ret = aio_down(kiocb, PIPE_SEM(*inode)); + if (ret) + return ret; + + ret = kiocb->ki_nbytes - kiocb->ki_left; info = inode->i_pipe; for (;;) { int bufs = info->nrbufs; @@ -155,6 +212,8 @@ pipe_readv(struct file *filp, const stru break; } ret += chars; + kiocb->ki_left -= chars; + kiocb->ki_buf += chars; buf->offset += chars; buf->len -= chars; if (!buf->len) { @@ -186,7 +245,7 @@ pipe_readv(struct file *filp, const stru break; } } - if (signal_pending(current)) { + if (is_sync_kiocb(kiocb) && signal_pending(current)) { if (!ret) ret = -ERESTARTSYS; break; } @@ -194,9 +253,9 @@ pipe_readv(struct file *filp, const stru wake_up_interruptible_sync(PIPE_WAIT(*inode)); kill_fasync(PIPE_FASYNC_WRITERS(*inode), SIGIO, POLL_OUT); } - pipe_wait(inode); + return pipe_aio_wait(kiocb, inode); } - up(PIPE_SEM(*inode)); + aio_up(kiocb, PIPE_SEM(*inode)); /* Signal writers asynchronously that there is more room. */ if (do_wakeup) { wake_up_interruptible(PIPE_WAIT(*inode)); @@ -208,16 +267,12 @@ pipe_readv(struct file *filp, const stru } static ssize_t -pipe_read(struct file *filp, char __user *buf, size_t count, loff_t *ppos) +pipe_aio_write(struct kiocb *kiocb, const char __user *buf, size_t len, loff_t pos) { - struct iovec iov = { .iov_base = buf, .iov_len = count }; - return pipe_readv(filp, &iov, 1, ppos); -} + struct iovec _iov[2] = {{ .iov_base = (void __user *)buf, .iov_len = len }}; + unsigned long nr_segs = 1; -static ssize_t -pipe_writev(struct file *filp, const struct iovec *_iov, - unsigned long nr_segs, loff_t *ppos) -{ + struct file *filp = kiocb->ki_filp; struct inode *inode = filp->f_dentry->d_inode; struct pipe_inode_info *info; ssize_t ret; @@ -231,13 +286,33 @@ pipe_writev(struct file *filp, const str if (unlikely(total_len == 0)) return 0; + /* In retries we need to remove ourself from the wait queue at this + * point. Checking ki_cancel is a convenient way of checking for + * this case, as we clear the cancel operation when the iocb is + * removed from the wait queue. + */ + if (kiocb->ki_cancel == pipe_aio_cancel) + pipe_aio_cancel(kiocb, NULL); + do_wakeup = 0; - ret = 0; - down(PIPE_SEM(*inode)); + ret = aio_down(kiocb, PIPE_SEM(*inode)); + if (ret) + return ret; + + /* Undo the WRITERS++ done below where we are queued. We use + * kiocb->private to flag if we were waiting, as the higher layers + * initialize it to NULL at the beginning of a request's life. + */ + if (kiocb->ki_user_data) { + PIPE_WAITING_WRITERS(*inode)--; + kiocb->ki_user_data = 0; + } + info = inode->i_pipe; if (!PIPE_READERS(*inode)) { - send_sig(SIGPIPE, current, 0); + if (is_sync_kiocb(kiocb)) + send_sig(SIGPIPE, current, 0); ret = -EPIPE; goto out; } @@ -257,6 +332,8 @@ pipe_writev(struct file *filp, const str do_wakeup = 1; if (error) goto out; + iov->iov_base += chars; + iov->iov_len -= chars; buf->len += chars; total_len -= chars; ret = chars; @@ -267,8 +344,10 @@ pipe_writev(struct file *filp, const str for (;;) { int bufs; + if (!PIPE_READERS(*inode)) { - send_sig(SIGPIPE, current, 0); + if (is_sync_kiocb(kiocb)) + send_sig(SIGPIPE, current, 0); if (!ret) ret = -EPIPE; break; } @@ -304,6 +383,8 @@ pipe_writev(struct file *filp, const str break; } ret += chars; + kiocb->ki_left -= chars; + kiocb->ki_buf += chars; /* Insert it into the buffer array */ buf->page = page; @@ -323,7 +404,7 @@ pipe_writev(struct file *filp, const str if (!ret) ret = -EAGAIN; break; } - if (signal_pending(current)) { + if (is_sync_kiocb(kiocb) && signal_pending(current)) { if (!ret) ret = -ERESTARTSYS; break; } @@ -333,11 +414,11 @@ pipe_writev(struct file *filp, const str do_wakeup = 0; } PIPE_WAITING_WRITERS(*inode)++; - pipe_wait(inode); - PIPE_WAITING_WRITERS(*inode)--; + kiocb->ki_user_data = 1; /* Flag for retry. */ + return pipe_aio_wait(kiocb, inode); } out: - up(PIPE_SEM(*inode)); + aio_up(kiocb, PIPE_SEM(*inode)); if (do_wakeup) { wake_up_interruptible(PIPE_WAIT(*inode)); kill_fasync(PIPE_FASYNC_READERS(*inode), SIGIO, POLL_IN); @@ -347,6 +428,7 @@ out: return ret; } +#if 0 static ssize_t pipe_write(struct file *filp, const char __user *buf, size_t count, loff_t *ppos) @@ -354,6 +436,7 @@ pipe_write(struct file *filp, const char struct iovec iov = { .iov_base = (void __user *)buf, .iov_len = count }; return pipe_writev(filp, &iov, 1, ppos); } +#endif static ssize_t bad_pipe_r(struct file *filp, char __user *buf, size_t count, loff_t *ppos) @@ -362,11 +445,23 @@ bad_pipe_r(struct file *filp, char __use } static ssize_t +bad_pipe_aio_r(struct kiocb *iocb, char __user *buf, size_t count, loff_t pos) +{ + return -EBADF; +} + +static ssize_t bad_pipe_w(struct file *filp, const char __user *buf, size_t count, loff_t *ppos) { return -EBADF; } +static ssize_t +bad_pipe_aio_w(struct kiocb *iocb, const char __user *buf, size_t count, loff_t pos) +{ + return -EBADF; +} + static int pipe_ioctl(struct inode *pino, struct file *filp, unsigned int cmd, unsigned long arg) @@ -565,8 +660,8 @@ pipe_rdwr_open(struct inode *inode, stru */ struct file_operations read_fifo_fops = { .llseek = no_llseek, - .read = pipe_read, - .readv = pipe_readv, + .read = do_sync_read, + .aio_read = pipe_aio_read, .write = bad_pipe_w, .poll = fifo_poll, .ioctl = pipe_ioctl, @@ -578,8 +673,9 @@ struct file_operations read_fifo_fops = struct file_operations write_fifo_fops = { .llseek = no_llseek, .read = bad_pipe_r, - .write = pipe_write, - .writev = pipe_writev, + .write = do_sync_write, + .aio_read = bad_pipe_aio_r, + .aio_write = pipe_aio_write, .poll = fifo_poll, .ioctl = pipe_ioctl, .open = pipe_write_open, @@ -589,10 +685,10 @@ struct file_operations write_fifo_fops = struct file_operations rdwr_fifo_fops = { .llseek = no_llseek, - .read = pipe_read, - .readv = pipe_readv, - .write = pipe_write, - .writev = pipe_writev, + .read = do_sync_read, + .write = do_sync_write, + .aio_read = pipe_aio_read, + .aio_write = pipe_aio_write, .poll = fifo_poll, .ioctl = pipe_ioctl, .open = pipe_rdwr_open, @@ -602,9 +698,10 @@ struct file_operations rdwr_fifo_fops = struct file_operations read_pipe_fops = { .llseek = no_llseek, - .read = pipe_read, - .readv = pipe_readv, + .read = do_sync_read, .write = bad_pipe_w, + .aio_read = pipe_aio_read, + .aio_write = bad_pipe_aio_w, .poll = pipe_poll, .ioctl = pipe_ioctl, .open = pipe_read_open, @@ -615,8 +712,9 @@ struct file_operations read_pipe_fops = struct file_operations write_pipe_fops = { .llseek = no_llseek, .read = bad_pipe_r, - .write = pipe_write, - .writev = pipe_writev, + .write = do_sync_write, + .aio_read = bad_pipe_aio_r, + .aio_write = pipe_aio_write, .poll = pipe_poll, .ioctl = pipe_ioctl, .open = pipe_write_open, @@ -626,10 +724,10 @@ struct file_operations write_pipe_fops = struct file_operations rdwr_pipe_fops = { .llseek = no_llseek, - .read = pipe_read, - .readv = pipe_readv, - .write = pipe_write, - .writev = pipe_writev, + .read = do_sync_read, + .write = do_sync_write, + .aio_read = pipe_aio_read, + .aio_write = pipe_aio_write, .poll = pipe_poll, .ioctl = pipe_ioctl, .open = pipe_rdwr_open,