From c77cadbff06a0b1f05dfacb94449094106d70faf Mon Sep 17 00:00:00 2001 From: Damien Miller Date: Fri, 26 Oct 2007 15:39:15 +1000 Subject: [PATCH] - djm@cvs.openbsd.org 2007/10/24 03:44:02 [scp.c] factor out network read/write into an atomicio()-like function, and use it to handle short reads, apply bandwidth limits and update counters. make network IO non-blocking, so a small trickle of reads/writes has a chance of updating the progress meter; bz #799 ok dtucker@ --- scp.c | 91 +++++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 64 insertions(+), 27 deletions(-) diff --git a/scp.c b/scp.c index 1765a44e6..fb17f2879 100644 --- a/scp.c +++ b/scp.c @@ -78,6 +78,7 @@ #ifdef HAVE_SYS_STAT_H # include #endif +# include #ifdef HAVE_SYS_TIME_H # include #endif @@ -109,6 +110,8 @@ extern char *__progname; +#define COPY_BUFLEN 16384 + int do_cmd(char *host, char *remuser, char *cmd, int *fdin, int *fdout); void bwlimit(int); @@ -129,7 +132,7 @@ int verbose_mode = 0; int showprogress = 1; /* This is the program to execute for the secured connection. ("ssh" or -S) */ -char *ssh_program = _PATH_SSH_PROGRAM; +char *ssh_program = "/home/djm/bin/ssh"; /* This is used to store the pid of ssh_program */ pid_t do_cmd_pid = -1; @@ -282,6 +285,7 @@ void sink(int, char *[]); void source(int, char *[]); void tolocal(int, char *[]); void toremote(char *, int, char *[]); +size_t scpio(ssize_t (*)(int, void *, size_t), int, void *, size_t, off_t *); void usage(void); int @@ -441,6 +445,43 @@ main(int argc, char **argv) exit(errs != 0); } +/* + * atomicio-like wrapper that also applies bandwidth limits and updates + * the progressmeter counter. + */ +size_t +scpio(ssize_t (*f)(int, void *, size_t), int fd, void *_p, size_t l, off_t *c) +{ + u_char *p = (u_char *)_p; + size_t offset; + ssize_t r; + struct pollfd pfd; + + pfd.fd = fd; + pfd.events = f == read ? POLLIN : POLLOUT; + for (offset = 0; offset < l;) { + r = f(fd, p + offset, l - offset); + if (r == 0) { + errno = EPIPE; + return offset; + } + if (r < 0) { + if (errno == EINTR) + continue; + if (errno == EAGAIN) { + (void)poll(&pfd, 1, -1); /* Ignore errors */ + continue; + } + return offset; + } + offset += (size_t)r; + *c += (off_t)r; + if (limit_rate) + bwlimit(r); + } + return offset; +} + void toremote(char *targ, int argc, char **argv) { @@ -583,7 +624,6 @@ source(int argc, char **argv) static BUF buffer; BUF *bp; off_t i, amt, statbytes; - size_t result; int fd = -1, haderr, indx; char *last, *name, buf[2048], encname[MAXPATHLEN]; int len; @@ -645,7 +685,7 @@ syserr: run_err("%s: %s", name, strerror(errno)); (void) atomicio(vwrite, remout, buf, strlen(buf)); if (response() < 0) goto next; - if ((bp = allocbuf(&buffer, fd, 2048)) == NULL) { + if ((bp = allocbuf(&buffer, fd, COPY_BUFLEN)) == NULL) { next: if (fd != -1) { (void) close(fd); fd = -1; @@ -654,27 +694,25 @@ next: if (fd != -1) { } if (showprogress) start_progress_meter(curfile, stb.st_size, &statbytes); - /* Keep writing after an error so that we stay sync'd up. */ + set_nonblock(remout); for (haderr = i = 0; i < stb.st_size; i += bp->cnt) { amt = bp->cnt; if (i + amt > stb.st_size) amt = stb.st_size - i; if (!haderr) { - result = atomicio(read, fd, bp->buf, amt); - if (result != amt) + if (atomicio(read, fd, bp->buf, amt) != amt) haderr = errno; } - if (haderr) - (void) atomicio(vwrite, remout, bp->buf, amt); - else { - result = atomicio(vwrite, remout, bp->buf, amt); - if (result != amt) - haderr = errno; - statbytes += result; + /* Keep writing after error to retain sync */ + if (haderr) { + (void)atomicio(vwrite, remout, bp->buf, amt); + continue; } - if (limit_rate) - bwlimit(amt); + if (scpio(vwrite, remout, bp->buf, amt, + &statbytes) != amt) + haderr = errno; } + unset_nonblock(remout); if (showprogress) stop_progress_meter(); @@ -780,10 +818,10 @@ bwlimit(int amount) thresh /= 2; if (thresh < 2048) thresh = 2048; - } else if (bwend.tv_usec < 100) { + } else if (bwend.tv_usec < 10000) { thresh *= 2; - if (thresh > 32768) - thresh = 32768; + if (thresh > COPY_BUFLEN * 4) + thresh = COPY_BUFLEN * 4; } TIMEVAL_TO_TIMESPEC(&bwend, &ts); @@ -974,7 +1012,7 @@ bad: run_err("%s: %s", np, strerror(errno)); continue; } (void) atomicio(vwrite, remout, "", 1); - if ((bp = allocbuf(&buffer, ofd, 4096)) == NULL) { + if ((bp = allocbuf(&buffer, ofd, COPY_BUFLEN)) == NULL) { (void) close(ofd); continue; } @@ -984,26 +1022,24 @@ bad: run_err("%s: %s", np, strerror(errno)); statbytes = 0; if (showprogress) start_progress_meter(curfile, size, &statbytes); - for (count = i = 0; i < size; i += 4096) { - amt = 4096; + set_nonblock(remin); + for (count = i = 0; i < size; i += bp->cnt) { + amt = bp->cnt; if (i + amt > size) amt = size - i; count += amt; do { - j = atomicio(read, remin, cp, amt); + j = scpio(read, remin, cp, amt, &statbytes); if (j == 0) { - run_err("%s", j ? strerror(errno) : + run_err("%s", j != EPIPE ? + strerror(errno) : "dropped connection"); exit(1); } amt -= j; cp += j; - statbytes += j; } while (amt > 0); - if (limit_rate) - bwlimit(4096); - if (count == bp->cnt) { /* Keep reading so we stay sync'd up. */ if (wrerr == NO) { @@ -1017,6 +1053,7 @@ bad: run_err("%s: %s", np, strerror(errno)); cp = bp->buf; } } + unset_nonblock(remin); if (showprogress) stop_progress_meter(); if (count != 0 && wrerr == NO &&