From de7115b373ba0be3861c65de9b606a3e0e9d29a3 Mon Sep 17 00:00:00 2001 From: "djm@openbsd.org" Date: Sat, 7 Aug 2021 00:02:41 +0000 Subject: [PATCH] upstream: support for "cross"-loading files/directories, i.e. downloading from one SFTP server while simultaneously uploading to another. feedback & ok markus@ OpenBSD-Commit-ID: 3982878e29d8df0fa4ddc502f5ff6126ac714235 --- sftp-client.c | 425 +++++++++++++++++++++++++++++++++++++++++++++++++- sftp-client.h | 19 ++- 2 files changed, 442 insertions(+), 2 deletions(-) diff --git a/sftp-client.c b/sftp-client.c index d16b0c444..9f777e43f 100644 --- a/sftp-client.c +++ b/sftp-client.c @@ -1,4 +1,4 @@ -/* $OpenBSD: sftp-client.c,v 1.144 2021/08/07 00:01:29 djm Exp $ */ +/* $OpenBSD: sftp-client.c,v 1.145 2021/08/07 00:02:41 djm Exp $ */ /* * Copyright (c) 2001-2004 Damien Miller * @@ -2048,6 +2048,429 @@ upload_dir(struct sftp_conn *conn, const char *src, const char *dst, return ret; } +static void +handle_dest_replies(struct sftp_conn *to, const char *to_path, int synchronous, + u_int *nreqsp, u_int *write_errorp) +{ + struct sshbuf *msg; + u_char type; + u_int id, status; + int r; + struct pollfd pfd; + + if ((msg = sshbuf_new()) == NULL) + fatal_f("sshbuf_new failed"); + + /* Try to eat replies from the upload side */ + while (*nreqsp > 0) { + debug3_f("%u outstanding replies", *nreqsp); + if (!synchronous) { + /* Bail out if no data is ready to be read */ + pfd.fd = to->fd_in; + pfd.events = POLLIN; + if ((r = poll(&pfd, 1, 0)) == -1) { + if (errno == EINTR) + break; + fatal_f("poll: %s", strerror(errno)); + } else if (r == 0) + break; /* fd not ready */ + } + sshbuf_reset(msg); + get_msg(to, msg); + + if ((r = sshbuf_get_u8(msg, &type)) != 0 || + (r = sshbuf_get_u32(msg, &id)) != 0) + fatal_fr(r, "dest parse"); + debug3("Received dest reply T:%u I:%u R:%u", type, id, *nreqsp); + if (type != SSH2_FXP_STATUS) { + fatal_f("Expected SSH2_FXP_STATUS(%d) packet, got %d", + SSH2_FXP_STATUS, type); + } + if ((r = sshbuf_get_u32(msg, &status)) != 0) + fatal_fr(r, "parse dest status"); + debug3("dest SSH2_FXP_STATUS %u", status); + if (status != SSH2_FX_OK) { + /* record first error */ + if (*write_errorp == 0) + *write_errorp = status; + } + /* + * XXX this doesn't do full reply matching like do_upload and + * so cannot gracefully truncate terminated uploads at a + * high-water mark. ATM the only caller of this function (scp) + * doesn't support transfer resumption, so this doesn't matter + * a whole lot. + * + * To be safe, do_crossload truncates the destination file to + * zero length on upload failure, since we can't trust the + * server not to have reordered replies that could have + * inserted holes where none existed in the source file. + * + * XXX we could get a more accutate progress bar if we updated + * the counter based on the reply from the destination... + */ + (*nreqsp)--; + } + debug3_f("done: %u outstanding replies", *nreqsp); +} + +int +do_crossload(struct sftp_conn *from, struct sftp_conn *to, + const char *from_path, const char *to_path, + Attrib *a, int preserve_flag) +{ + struct sshbuf *msg; + int write_error, read_error, lmodified = 0, r; + u_int64_t offset = 0, size; + u_int id, buflen, num_req, max_req, status = SSH2_FX_OK; + u_int num_upload_req; + off_t progress_counter; + u_char *from_handle, *to_handle; + size_t from_handle_len, to_handle_len; + struct request { + u_int id; + size_t len; + u_int64_t offset; + TAILQ_ENTRY(request) tq; + }; + TAILQ_HEAD(reqhead, request) requests; + struct request *req; + u_char type; + + TAILQ_INIT(&requests); + + if (a == NULL && (a = do_stat(from, from_path, 0)) == NULL) + return -1; + + if ((a->flags & SSH2_FILEXFER_ATTR_PERMISSIONS) && + (!S_ISREG(a->perm))) { + error("Cannot download non-regular file: %s", from_path); + return(-1); + } + if (a->flags & SSH2_FILEXFER_ATTR_SIZE) + size = a->size; + else + size = 0; + + buflen = from->download_buflen; + if (buflen > to->upload_buflen) + buflen = to->upload_buflen; + + /* Send open request to read side */ + if (send_open(from, from_path, "origin", SSH2_FXF_READ, NULL, + &from_handle, &from_handle_len) != 0) + return -1; + + /* Send open request to write side */ + a->flags &= ~SSH2_FILEXFER_ATTR_SIZE; + a->flags &= ~SSH2_FILEXFER_ATTR_UIDGID; + a->perm &= 0777; + if (!preserve_flag) + a->flags &= ~SSH2_FILEXFER_ATTR_ACMODTIME; + if (send_open(to, to_path, "dest", + SSH2_FXF_WRITE|SSH2_FXF_CREAT|SSH2_FXF_TRUNC, a, + &to_handle, &to_handle_len) != 0) { + do_close(from, from_handle, from_handle_len); + return -1; + } + + /* Read from remote "from" and write to remote "to" */ + offset = 0; + write_error = read_error = num_req = num_upload_req = 0; + max_req = 1; + progress_counter = 0; + + if (showprogress && size != 0) + start_progress_meter(from_path, size, &progress_counter); + if ((msg = sshbuf_new()) == NULL) + fatal_f("sshbuf_new failed"); + while (num_req > 0 || max_req > 0) { + u_char *data; + size_t len; + + /* + * Simulate EOF on interrupt: stop sending new requests and + * allow outstanding requests to drain gracefully + */ + if (interrupted) { + if (num_req == 0) /* If we haven't started yet... */ + break; + max_req = 0; + } + + /* Send some more requests */ + while (num_req < max_req) { + debug3("Request range %llu -> %llu (%d/%d)", + (unsigned long long)offset, + (unsigned long long)offset + buflen - 1, + num_req, max_req); + req = xcalloc(1, sizeof(*req)); + req->id = from->msg_id++; + req->len = buflen; + req->offset = offset; + offset += buflen; + num_req++; + TAILQ_INSERT_TAIL(&requests, req, tq); + send_read_request(from, req->id, req->offset, + req->len, from_handle, from_handle_len); + } + + /* Try to eat replies from the upload side (nonblocking) */ + handle_dest_replies(to, to_path, 0, + &num_upload_req, &write_error); + + sshbuf_reset(msg); + get_msg(from, msg); + if ((r = sshbuf_get_u8(msg, &type)) != 0 || + (r = sshbuf_get_u32(msg, &id)) != 0) + fatal_fr(r, "parse"); + debug3("Received origin reply T:%u I:%u R:%d", + type, id, max_req); + + /* Find the request in our queue */ + for (req = TAILQ_FIRST(&requests); + req != NULL && req->id != id; + req = TAILQ_NEXT(req, tq)) + ; + if (req == NULL) + fatal("Unexpected reply %u", id); + + switch (type) { + case SSH2_FXP_STATUS: + if ((r = sshbuf_get_u32(msg, &status)) != 0) + fatal_fr(r, "parse status"); + if (status != SSH2_FX_EOF) + read_error = 1; + max_req = 0; + TAILQ_REMOVE(&requests, req, tq); + free(req); + num_req--; + break; + case SSH2_FXP_DATA: + if ((r = sshbuf_get_string(msg, &data, &len)) != 0) + fatal_fr(r, "parse data"); + debug3("Received data %llu -> %llu", + (unsigned long long)req->offset, + (unsigned long long)req->offset + len - 1); + if (len > req->len) + fatal("Received more data than asked for " + "%zu > %zu", len, req->len); + lmodified = 1; + + /* Write this chunk out to the destination */ + sshbuf_reset(msg); + if ((r = sshbuf_put_u8(msg, SSH2_FXP_WRITE)) != 0 || + (r = sshbuf_put_u32(msg, to->msg_id++)) != 0 || + (r = sshbuf_put_string(msg, to_handle, + to_handle_len)) != 0 || + (r = sshbuf_put_u64(msg, req->offset)) != 0 || + (r = sshbuf_put_string(msg, data, len)) != 0) + fatal_fr(r, "compose write"); + send_msg(to, msg); + debug3("Sent message SSH2_FXP_WRITE I:%u O:%llu S:%zu", + id, (unsigned long long)offset, len); + num_upload_req++; + progress_counter += len; + free(data); + + if (len == req->len) { + TAILQ_REMOVE(&requests, req, tq); + free(req); + num_req--; + } else { + /* Resend the request for the missing data */ + debug3("Short data block, re-requesting " + "%llu -> %llu (%2d)", + (unsigned long long)req->offset + len, + (unsigned long long)req->offset + + req->len - 1, num_req); + req->id = from->msg_id++; + req->len -= len; + req->offset += len; + send_read_request(from, req->id, + req->offset, req->len, + from_handle, from_handle_len); + /* Reduce the request size */ + if (len < buflen) + buflen = MAXIMUM(MIN_READ_SIZE, len); + } + if (max_req > 0) { /* max_req = 0 iff EOF received */ + if (size > 0 && offset > size) { + /* Only one request at a time + * after the expected EOF */ + debug3("Finish at %llu (%2d)", + (unsigned long long)offset, + num_req); + max_req = 1; + } else if (max_req < from->num_requests) { + ++max_req; + } + } + break; + default: + fatal("Expected SSH2_FXP_DATA(%u) packet, got %u", + SSH2_FXP_DATA, type); + } + } + + if (showprogress && size) + stop_progress_meter(); + + /* Drain replies from the server (blocking) */ + debug3_f("waiting for %u replies from destination", num_upload_req); + handle_dest_replies(to, to_path, 1, &num_upload_req, &write_error); + + /* Sanity check */ + if (TAILQ_FIRST(&requests) != NULL) + fatal("Transfer complete, but requests still in queue"); + /* Truncate at 0 length on interrupt or error to avoid holes at dest */ + if (read_error || write_error || interrupted) { + debug("truncating \"%s\" at 0", to_path); + do_close(to, to_handle, to_handle_len); + free(to_handle); + if (send_open(to, to_path, "dest", + SSH2_FXF_WRITE|SSH2_FXF_CREAT|SSH2_FXF_TRUNC, a, + &to_handle, &to_handle_len) != 0) { + error("truncation failed for \"%s\"", to_path); + to_handle = NULL; + } + } + if (read_error) { + error("Couldn't read from origin file \"%s\" : %s", + from_path, fx2txt(status)); + status = -1; + do_close(from, from_handle, from_handle_len); + if (to_handle != NULL) + do_close(to, to_handle, to_handle_len); + } else if (write_error) { + error("Couldn't write to \"%s\": %s", + to_path, fx2txt(write_error)); + status = SSH2_FX_FAILURE; + do_close(from, from_handle, from_handle_len); + if (to_handle != NULL) + do_close(to, to_handle, to_handle_len); + } else { + if (do_close(from, from_handle, from_handle_len) != 0 || + interrupted) + status = -1; + else + status = SSH2_FX_OK; + if (to_handle != NULL) { + /* Need to resend utimes after write */ + if (preserve_flag) + do_fsetstat(to, to_handle, to_handle_len, a); + do_close(to, to_handle, to_handle_len); + } + } + sshbuf_free(msg); + free(from_handle); + free(to_handle); + + return status == SSH2_FX_OK ? 0 : -1; +} + +static int +crossload_dir_internal(struct sftp_conn *from, struct sftp_conn *to, + const char *from_path, const char *to_path, + int depth, Attrib *dirattrib, int preserve_flag, int print_flag) +{ + int i, ret = 0; + SFTP_DIRENT **dir_entries; + char *filename, *new_from_path = NULL, *new_to_path = NULL; + mode_t mode = 0777; + + if (depth >= MAX_DIR_DEPTH) { + error("Maximum directory depth exceeded: %d levels", depth); + return -1; + } + + if (dirattrib == NULL && + (dirattrib = do_stat(from, from_path, 1)) == NULL) { + error("Unable to stat remote directory \"%s\"", from_path); + return -1; + } + if (!S_ISDIR(dirattrib->perm)) { + error("\"%s\" is not a directory", from_path); + return -1; + } + if (print_flag) + mprintf("Retrieving %s\n", from_path); + + dirattrib->flags &= ~SSH2_FILEXFER_ATTR_SIZE; + dirattrib->flags &= ~SSH2_FILEXFER_ATTR_UIDGID; + if (dirattrib->flags & SSH2_FILEXFER_ATTR_PERMISSIONS) { + mode = dirattrib->perm & 01777; + dirattrib->perm = mode | (S_IWUSR|S_IXUSR); /* temp */ + } else { + debug("Server did not send permissions for " + "directory \"%s\"", to_path); + } + if (do_mkdir(to, to_path, dirattrib, print_flag) != 0) + return -1; + + if (do_readdir(from, from_path, &dir_entries) == -1) { + error("%s: Failed to get directory contents", from_path); + return -1; + } + + for (i = 0; dir_entries[i] != NULL && !interrupted; i++) { + free(new_from_path); + free(new_to_path); + + filename = dir_entries[i]->filename; + new_from_path = path_append(from_path, filename); + new_to_path = path_append(to_path, filename); + + if (S_ISDIR(dir_entries[i]->a.perm)) { + if (strcmp(filename, ".") == 0 || + strcmp(filename, "..") == 0) + continue; + if (crossload_dir_internal(from, to, + new_from_path, new_to_path, + depth + 1, &(dir_entries[i]->a), preserve_flag, + print_flag) == -1) + ret = -1; + } else if (S_ISREG(dir_entries[i]->a.perm) ) { + if (do_crossload(from, to, new_from_path, new_to_path, + &(dir_entries[i]->a), preserve_flag) == -1) { + error("Transfer of file %s to %s failed", + new_from_path, new_to_path); + ret = -1; + } + } else + logit("%s: not a regular file\n", new_from_path); + + } + free(new_to_path); + free(new_from_path); + + dirattrib->perm = mode; /* original mode */ + do_setstat(to, to_path, dirattrib); + + free_sftp_dirents(dir_entries); + + return ret; +} + +int +crossload_dir(struct sftp_conn *from, struct sftp_conn *to, + const char *from_path, const char *to_path, + Attrib *dirattrib, int preserve_flag, int print_flag) +{ + char *from_path_canon; + int ret; + + if ((from_path_canon = do_realpath(from, from_path)) == NULL) { + error("Unable to canonicalize path \"%s\"", from_path); + return -1; + } + + ret = crossload_dir_internal(from, to, from_path_canon, to_path, 0, + dirattrib, preserve_flag, print_flag); + free(from_path_canon); + return ret; +} + char * path_append(const char *p1, const char *p2) { diff --git a/sftp-client.h b/sftp-client.h index 6f6c49fb2..c7fa04f5b 100644 --- a/sftp-client.h +++ b/sftp-client.h @@ -1,4 +1,4 @@ -/* $OpenBSD: sftp-client.h,v 1.30 2021/03/31 22:16:34 djm Exp $ */ +/* $OpenBSD: sftp-client.h,v 1.31 2021/08/07 00:02:41 djm Exp $ */ /* * Copyright (c) 2001-2004 Damien Miller @@ -150,6 +150,23 @@ int do_upload(struct sftp_conn *, const char *, const char *, int, int, int); int upload_dir(struct sftp_conn *, const char *, const char *, int, int, int, int); +/* + * Download a 'from_path' from the 'from' connection and upload it to + * to 'to' connection at 'to_path'. + */ +int +do_crossload(struct sftp_conn *from, struct sftp_conn *to, + const char *from_path, const char *to_path, + Attrib *a, int preserve_flag); + +/* + * Recursively download a directory from 'from_path' from the 'from' + * connection and upload it to 'to' connection at 'to_path'. + */ +int crossload_dir(struct sftp_conn *from, struct sftp_conn *to, + const char *from_path, const char *to_path, + Attrib *dirattrib, int preserve_flag, int print_flag); + /* Concatenate paths, taking care of slashes. Caller must free result. */ char *path_append(const char *, const char *);