/* Copyright 2000-2001 Red Hat, Inc. All Rights Reserved. * Written by Benjamin LaHaise * This file may be distributed under the terms of the * GNU Public License v2 or later, but is provided with NO WARRANTY. * * asyncbuf2: aio test program that provides an elastic buffer between * stdin and stdout. * 2005-06-23: bcrl: modified to use native libaio interface. */ #include #include #include #include #include #include #include #include #define BUFSIZE (4 * 1024 * 1024) struct stream { char buffer[BUFSIZE]; size_t buf_bytes; loff_t roff; loff_t woff; unsigned rpos; unsigned wpos; int in_fd; int out_fd; int write_active, read_active; int in_done, out_done; int read_active, write_active; struct iocb read_iocb; struct iocb write_iocb; }; /* borrowed from linux/list.h */ #define struct_from_entry(ptr, type, member) \ ((type *)((char *)(ptr)-(unsigned long)(&((type *)0)->member))) void read_callback(io_context_t ctx, struct iocb *iocb, long res, long res2) { struct stream *s = struct_from_entry(iocb, struct stream, read_iocb); if (res < 0) { fprintf(stderr, "write: %ld (%s) %ld\n", res, strerror(-res), res2); exit(1); } else if (res) { s->rpos += res; } else s->in_done = 1; s->read_active = 0; } void write_callback(io_context_t ctx, struct iocb *iocb, long res, long res2) { struct stream *s = struct_from_entry(iocb, struct stream, write_iocb); if (res < 0) { fprintf(stderr, "write: %ld (%s) %ld\n", res, strerror(-res), res2); exit(1); } else if (res) { s->wpos += res; } else s->out_done = 1; s->write_active = 0; } int main (int argc, char *argv[]) { struct stream stream, *s = &stream; int i; ssize_t ret; time_t then = time(NULL); int page_size = getpagesize(); memset(&stream, 0, sizeof stream); //buffer = foobuffer + page_size - ((unsigned long)foobuffer & (page_size - 1)); if (argc == 3) { fprintf(stderr, "using O_SYNC\n"); s->in_fd = open(argv[1], O_RDONLY); if (s->in_fd < 0) { perror("open() input"); return 1; } s->out_fd = open(argv[2], O_WRONLY | O_SYNC | O_CREAT, 0644); if (s->out_fd < 0) { perror("open() output"); return 1; } } else if (argv != 1) { fprintf(stderr, "usage: asyncbuf infile outfile or asyncbuf\n"); return 2; } do { if (!s->write_active && s->buf_bytes) { unsigned nbytes = s->buf_bytes; if ((nbytes >> 2) > WRITESIZE) nbytes >>= 2; if (wpos + write_iocb.aio_nbytes >= BUFSIZE) s->write_iocb.aio_nbytes = BUFSIZE - wpos; io_prep_pwrite(&s->write_iocb, s->out_fd, s->buffer + s->wpos, s->buf_bytes, s->woff); do ret = aio_write(&write_iocb); while (ret && errno == EINTR); if (ret) goto write_err; s->write_active = 1; } if (read_wait && (BUFSIZE - buf_bytes) >= MINREADSPACE) read_wait = 0; if (!read_wait && !read_active && buf_bytes < BUFSIZE && !saw_eof) { read_iocb.aio_sigevent.sigev_notify = SIGEV_NONE; read_iocb.aio_offset = roff; read_iocb.aio_fildes = in_fd; read_iocb.aio_buf = buffer + rpos; read_iocb.aio_nbytes = BUFSIZE - buf_bytes; if (read_iocb.aio_nbytes > READSIZE) read_iocb.aio_nbytes = READSIZE; if (rpos + read_iocb.aio_nbytes >= BUFSIZE) read_iocb.aio_nbytes = BUFSIZE - rpos; do ret = aio_read(&read_iocb); while (ret && errno == EINTR); if (ret) goto read_err; read_active = 1; } i = 0; if (write_active) aio_list[i++] = &write_iocb; if (read_active) aio_list[i++] = &read_iocb; ret = aio_suspend(&aio_list, i, NULL); if (ret) goto err; if (read_active && (ret = aio_error(&read_iocb)) != EINPROGRESS) { read_active = 0; if (ret) goto read_err; ret = aio_return(&read_iocb); if (ret < 0) goto read_err; buf_bytes += ret; roff += ret; rpos += ret; rpos %= BUFSIZE; if (!ret) saw_eof = 1; if (buf_bytes == BUFSIZE) read_wait = 1; } if (write_active && (ret = aio_error(&write_iocb)) != EINPROGRESS) { write_active = 0; if (ret) goto write_err; ret = aio_return(&write_iocb); if (ret < 0) goto write_err; buf_bytes -= ret; woff += ret; wpos += ret; wpos %= BUFSIZE; if (!ret) break; } if (time(NULL) - then >= 1) { then = time(NULL); fprintf(stderr, "\r%3ld%%", (long)(buf_bytes * 100) / BUFSIZE); } #if 0 fprintf(stderr, "wpos: %ld rpos: %ld buf_bytes: %ld saw_eof: %d i: %d\n", wpos, rpos, buf_bytes, saw_eof, i); #endif } while (i); return 0; err: perror("aio_suspend"); fprintf(stderr, "ret: %d\n", ret); return 1; read_err: perror("aio_read"); fprintf(stderr, "ret: %d\n", ret); return 1; write_err: perror("aio_write"); fprintf(stderr, "ret: %d\n", ret); return 1; }