- djm@cvs.openbsd.org 2007/10/24 03:44:02

[scp.c]
     factor out network read/write into an atomicio()-like function, and
     use it to handle short reads, apply bandwidth limits and update
     counters. make network IO non-blocking, so a small trickle of
     reads/writes has a chance of updating the progress meter; bz #799
     ok dtucker@
This commit is contained in:
Damien Miller 2007-10-26 15:39:15 +10:00
parent 89437edafd
commit c77cadbff0
1 changed files with 64 additions and 27 deletions

91
scp.c
View File

@ -78,6 +78,7 @@
#ifdef HAVE_SYS_STAT_H #ifdef HAVE_SYS_STAT_H
# include <sys/stat.h> # include <sys/stat.h>
#endif #endif
# include <sys/poll.h>
#ifdef HAVE_SYS_TIME_H #ifdef HAVE_SYS_TIME_H
# include <sys/time.h> # include <sys/time.h>
#endif #endif
@ -109,6 +110,8 @@
extern char *__progname; extern char *__progname;
#define COPY_BUFLEN 16384
int do_cmd(char *host, char *remuser, char *cmd, int *fdin, int *fdout); int do_cmd(char *host, char *remuser, char *cmd, int *fdin, int *fdout);
void bwlimit(int); void bwlimit(int);
@ -129,7 +132,7 @@ int verbose_mode = 0;
int showprogress = 1; int showprogress = 1;
/* This is the program to execute for the secured connection. ("ssh" or -S) */ /* This is the program to execute for the secured connection. ("ssh" or -S) */
char *ssh_program = _PATH_SSH_PROGRAM; char *ssh_program = "/home/djm/bin/ssh";
/* This is used to store the pid of ssh_program */ /* This is used to store the pid of ssh_program */
pid_t do_cmd_pid = -1; pid_t do_cmd_pid = -1;
@ -282,6 +285,7 @@ void sink(int, char *[]);
void source(int, char *[]); void source(int, char *[]);
void tolocal(int, char *[]); void tolocal(int, char *[]);
void toremote(char *, int, char *[]); void toremote(char *, int, char *[]);
size_t scpio(ssize_t (*)(int, void *, size_t), int, void *, size_t, off_t *);
void usage(void); void usage(void);
int int
@ -441,6 +445,43 @@ main(int argc, char **argv)
exit(errs != 0); exit(errs != 0);
} }
/*
* atomicio-like wrapper that also applies bandwidth limits and updates
* the progressmeter counter.
*/
size_t
scpio(ssize_t (*f)(int, void *, size_t), int fd, void *_p, size_t l, off_t *c)
{
u_char *p = (u_char *)_p;
size_t offset;
ssize_t r;
struct pollfd pfd;
pfd.fd = fd;
pfd.events = f == read ? POLLIN : POLLOUT;
for (offset = 0; offset < l;) {
r = f(fd, p + offset, l - offset);
if (r == 0) {
errno = EPIPE;
return offset;
}
if (r < 0) {
if (errno == EINTR)
continue;
if (errno == EAGAIN) {
(void)poll(&pfd, 1, -1); /* Ignore errors */
continue;
}
return offset;
}
offset += (size_t)r;
*c += (off_t)r;
if (limit_rate)
bwlimit(r);
}
return offset;
}
void void
toremote(char *targ, int argc, char **argv) toremote(char *targ, int argc, char **argv)
{ {
@ -583,7 +624,6 @@ source(int argc, char **argv)
static BUF buffer; static BUF buffer;
BUF *bp; BUF *bp;
off_t i, amt, statbytes; off_t i, amt, statbytes;
size_t result;
int fd = -1, haderr, indx; int fd = -1, haderr, indx;
char *last, *name, buf[2048], encname[MAXPATHLEN]; char *last, *name, buf[2048], encname[MAXPATHLEN];
int len; int len;
@ -645,7 +685,7 @@ syserr: run_err("%s: %s", name, strerror(errno));
(void) atomicio(vwrite, remout, buf, strlen(buf)); (void) atomicio(vwrite, remout, buf, strlen(buf));
if (response() < 0) if (response() < 0)
goto next; goto next;
if ((bp = allocbuf(&buffer, fd, 2048)) == NULL) { if ((bp = allocbuf(&buffer, fd, COPY_BUFLEN)) == NULL) {
next: if (fd != -1) { next: if (fd != -1) {
(void) close(fd); (void) close(fd);
fd = -1; fd = -1;
@ -654,27 +694,25 @@ next: if (fd != -1) {
} }
if (showprogress) if (showprogress)
start_progress_meter(curfile, stb.st_size, &statbytes); start_progress_meter(curfile, stb.st_size, &statbytes);
/* Keep writing after an error so that we stay sync'd up. */ set_nonblock(remout);
for (haderr = i = 0; i < stb.st_size; i += bp->cnt) { for (haderr = i = 0; i < stb.st_size; i += bp->cnt) {
amt = bp->cnt; amt = bp->cnt;
if (i + amt > stb.st_size) if (i + amt > stb.st_size)
amt = stb.st_size - i; amt = stb.st_size - i;
if (!haderr) { if (!haderr) {
result = atomicio(read, fd, bp->buf, amt); if (atomicio(read, fd, bp->buf, amt) != amt)
if (result != amt)
haderr = errno; haderr = errno;
} }
if (haderr) /* Keep writing after error to retain sync */
(void) atomicio(vwrite, remout, bp->buf, amt); if (haderr) {
else { (void)atomicio(vwrite, remout, bp->buf, amt);
result = atomicio(vwrite, remout, bp->buf, amt); continue;
if (result != amt)
haderr = errno;
statbytes += result;
} }
if (limit_rate) if (scpio(vwrite, remout, bp->buf, amt,
bwlimit(amt); &statbytes) != amt)
haderr = errno;
} }
unset_nonblock(remout);
if (showprogress) if (showprogress)
stop_progress_meter(); stop_progress_meter();
@ -780,10 +818,10 @@ bwlimit(int amount)
thresh /= 2; thresh /= 2;
if (thresh < 2048) if (thresh < 2048)
thresh = 2048; thresh = 2048;
} else if (bwend.tv_usec < 100) { } else if (bwend.tv_usec < 10000) {
thresh *= 2; thresh *= 2;
if (thresh > 32768) if (thresh > COPY_BUFLEN * 4)
thresh = 32768; thresh = COPY_BUFLEN * 4;
} }
TIMEVAL_TO_TIMESPEC(&bwend, &ts); TIMEVAL_TO_TIMESPEC(&bwend, &ts);
@ -974,7 +1012,7 @@ bad: run_err("%s: %s", np, strerror(errno));
continue; continue;
} }
(void) atomicio(vwrite, remout, "", 1); (void) atomicio(vwrite, remout, "", 1);
if ((bp = allocbuf(&buffer, ofd, 4096)) == NULL) { if ((bp = allocbuf(&buffer, ofd, COPY_BUFLEN)) == NULL) {
(void) close(ofd); (void) close(ofd);
continue; continue;
} }
@ -984,26 +1022,24 @@ bad: run_err("%s: %s", np, strerror(errno));
statbytes = 0; statbytes = 0;
if (showprogress) if (showprogress)
start_progress_meter(curfile, size, &statbytes); start_progress_meter(curfile, size, &statbytes);
for (count = i = 0; i < size; i += 4096) { set_nonblock(remin);
amt = 4096; for (count = i = 0; i < size; i += bp->cnt) {
amt = bp->cnt;
if (i + amt > size) if (i + amt > size)
amt = size - i; amt = size - i;
count += amt; count += amt;
do { do {
j = atomicio(read, remin, cp, amt); j = scpio(read, remin, cp, amt, &statbytes);
if (j == 0) { if (j == 0) {
run_err("%s", j ? strerror(errno) : run_err("%s", j != EPIPE ?
strerror(errno) :
"dropped connection"); "dropped connection");
exit(1); exit(1);
} }
amt -= j; amt -= j;
cp += j; cp += j;
statbytes += j;
} while (amt > 0); } while (amt > 0);
if (limit_rate)
bwlimit(4096);
if (count == bp->cnt) { if (count == bp->cnt) {
/* Keep reading so we stay sync'd up. */ /* Keep reading so we stay sync'd up. */
if (wrerr == NO) { if (wrerr == NO) {
@ -1017,6 +1053,7 @@ bad: run_err("%s: %s", np, strerror(errno));
cp = bp->buf; cp = bp->buf;
} }
} }
unset_nonblock(remin);
if (showprogress) if (showprogress)
stop_progress_meter(); stop_progress_meter();
if (count != 0 && wrerr == NO && if (count != 0 && wrerr == NO &&