Browse Source

Remove write_buf, simplify FUSE msg dispatching

pull/1092/head
Antonio SJ Musumeci 2 years ago
parent
commit
dba9c4451a
  1. 9
      libfuse/include/fuse_lowlevel.h
  2. 3
      libfuse/lib/buffer.c
  3. 3
      libfuse/lib/fuse_i.h
  4. 270
      libfuse/lib/fuse_lowlevel.c

9
libfuse/include/fuse_lowlevel.h

@ -234,8 +234,7 @@ struct fuse_lowlevel_ops
* @param ino the inode number * @param ino the inode number
* @param fi for future use, currently always NULL * @param fi for future use, currently always NULL
*/ */
void (*getattr)(fuse_req_t req, uint64_t ino,
fuse_file_info_t *fi);
void (*getattr)(fuse_req_t req, uint64_t ino, fuse_file_info_t *fi);
/** /**
* Set file attributes * Set file attributes
@ -957,8 +956,10 @@ struct fuse_lowlevel_ops
* @param offset the offset supplied to fuse_lowlevel_notify_retrieve() * @param offset the offset supplied to fuse_lowlevel_notify_retrieve()
* @param bufv the buffer containing the returned data * @param bufv the buffer containing the returned data
*/ */
void (*retrieve_reply)(fuse_req_t req, void *cookie, uint64_t ino,
off_t offset, struct fuse_bufvec *bufv);
void (*retrieve_reply)(fuse_req_t req,
void *cookie,
uint64_t ino,
off_t offset);
/** /**
* Forget about multiple inodes * Forget about multiple inodes

3
libfuse/lib/buffer.c

@ -182,8 +182,7 @@ static ssize_t fuse_buf_splice(const struct fuse_buf *dst, size_t dst_off,
} }
while (len) { while (len) {
res = splice(src->fd, srcpos, dst->fd, dstpos, len,
splice_flags);
res = splice(src->fd, srcpos, dst->fd, dstpos, len, splice_flags);
if (res == -1) { if (res == -1) {
if (copied) if (copied)
break; break;

3
libfuse/lib/fuse_i.h

@ -46,8 +46,7 @@ struct fuse_notify_req
void (*reply)(struct fuse_notify_req *, void (*reply)(struct fuse_notify_req *,
fuse_req_t, fuse_req_t,
uint64_t, uint64_t,
const void *,
const struct fuse_buf *);
const void *);
struct fuse_notify_req *next; struct fuse_notify_req *next;
struct fuse_notify_req *prev; struct fuse_notify_req *prev;
}; };

270
libfuse/lib/fuse_lowlevel.c

@ -1296,58 +1296,6 @@ do_write(fuse_req_t req,
req->f->op.write(req,nodeid,param,arg->size,arg->offset,&fi); req->f->op.write(req,nodeid,param,arg->size,arg->offset,&fi);
} }
static
void
do_write_buf(fuse_req_t req,
uint64_t nodeid,
const void *inarg,
const struct fuse_buf *ibuf)
{
struct fuse_ll *f = req->f;
struct fuse_bufvec bufv = {
.buf[0] = *ibuf,
.count = 1,
};
fuse_file_info_t fi = {0};
struct fuse_write_in *arg = (struct fuse_write_in *) inarg;
fi.fh = arg->fh;
fi.writepage = arg->write_flags & 1;
if (req->f->conn.proto_minor < 9)
{
bufv.buf[0].mem = ((char *) arg) + FUSE_COMPAT_WRITE_IN_SIZE;
bufv.buf[0].size -= sizeof(struct fuse_in_header) +
FUSE_COMPAT_WRITE_IN_SIZE;
assert(!(bufv.buf[0].flags & FUSE_BUF_IS_FD));
}
else
{
fi.lock_owner = arg->lock_owner;
fi.flags = arg->flags;
if (!(bufv.buf[0].flags & FUSE_BUF_IS_FD))
bufv.buf[0].mem = PARAM(arg);
bufv.buf[0].size -= sizeof(struct fuse_in_header) +
sizeof(struct fuse_write_in);
}
if (bufv.buf[0].size < arg->size)
{
fprintf(stderr, "fuse: do_write_buf: buffer size too small\n");
fuse_reply_err(req, EIO);
goto out;
}
bufv.buf[0].size = arg->size;
req->f->op.write_buf(req, nodeid, &bufv, arg->offset, &fi);
out:
/* Need to reset the pipe if ->write_buf() didn't consume all data */
if ((ibuf->flags & FUSE_BUF_IS_FD) && bufv.idx < bufv.count)
fuse_ll_clear_pipe(f);
}
static static
void void
do_flush(fuse_req_t req, do_flush(fuse_req_t req,
@ -1973,8 +1921,7 @@ static
void void
do_notify_reply(fuse_req_t req, do_notify_reply(fuse_req_t req,
uint64_t nodeid, uint64_t nodeid,
const void *inarg,
const struct fuse_buf *buf)
const void *inarg)
{ {
struct fuse_ll *f = req->f; struct fuse_ll *f = req->f;
struct fuse_notify_req *nreq; struct fuse_notify_req *nreq;
@ -1982,9 +1929,9 @@ do_notify_reply(fuse_req_t req,
pthread_mutex_lock(&f->lock); pthread_mutex_lock(&f->lock);
head = &f->notify_list; head = &f->notify_list;
for (nreq = head->next; nreq != head; nreq = nreq->next)
for(nreq = head->next; nreq != head; nreq = nreq->next)
{ {
if (nreq->unique == req->unique)
if(nreq->unique == req->unique)
{ {
list_del_nreq(nreq); list_del_nreq(nreq);
break; break;
@ -1992,8 +1939,8 @@ do_notify_reply(fuse_req_t req,
} }
pthread_mutex_unlock(&f->lock); pthread_mutex_unlock(&f->lock);
if (nreq != head)
nreq->reply(nreq, req, nodeid, inarg, buf);
if(nreq != head)
nreq->reply(nreq, req, nodeid, inarg);
} }
static static
@ -2031,7 +1978,7 @@ send_notify_iov(struct fuse_ll *f,
{ {
struct fuse_out_header out; struct fuse_out_header out;
if (!f->got_init)
if(!f->got_init)
return -ENOTCONN; return -ENOTCONN;
out.unique = 0; out.unique = 0;
@ -2045,7 +1992,7 @@ send_notify_iov(struct fuse_ll *f,
int int
fuse_lowlevel_notify_poll(fuse_pollhandle_t *ph) fuse_lowlevel_notify_poll(fuse_pollhandle_t *ph)
{ {
if (ph != NULL)
if(ph != NULL)
{ {
struct fuse_notify_poll_wakeup_out outarg; struct fuse_notify_poll_wakeup_out outarg;
struct iovec iov[2]; struct iovec iov[2];
@ -2208,46 +2155,14 @@ void
fuse_ll_retrieve_reply(struct fuse_notify_req *nreq, fuse_ll_retrieve_reply(struct fuse_notify_req *nreq,
fuse_req_t req, fuse_req_t req,
uint64_t ino, uint64_t ino,
const void *inarg,
const struct fuse_buf *ibuf)
const void *inarg)
{ {
struct fuse_ll *f = req->f;
struct fuse_retrieve_req *rreq = struct fuse_retrieve_req *rreq =
container_of(nreq, struct fuse_retrieve_req, nreq); container_of(nreq, struct fuse_retrieve_req, nreq);
const struct fuse_notify_retrieve_in *arg = inarg;
struct fuse_bufvec bufv = {
.buf[0] = *ibuf,
.count = 1,
};
if (!(bufv.buf[0].flags & FUSE_BUF_IS_FD))
bufv.buf[0].mem = PARAM(arg);
bufv.buf[0].size -= sizeof(struct fuse_in_header) +
sizeof(struct fuse_notify_retrieve_in);
if (bufv.buf[0].size < arg->size)
{
fprintf(stderr, "fuse: retrieve reply: buffer size too small\n");
fuse_reply_none(req);
goto out;
}
bufv.buf[0].size = arg->size;
if (req->f->op.retrieve_reply)
{
req->f->op.retrieve_reply(req, rreq->cookie, ino,
arg->offset, &bufv);
}
else
{
fuse_reply_none(req); fuse_reply_none(req);
}
out:
free(rreq); free(rreq);
if ((ibuf->flags & FUSE_BUF_IS_FD) && bufv.idx < bufv.count)
fuse_ll_clear_pipe(f);
} }
int int
@ -2266,15 +2181,15 @@ fuse_lowlevel_notify_retrieve(struct fuse_chan *ch,
if (!ch) if (!ch)
return -EINVAL; return -EINVAL;
f = (struct fuse_ll *)fuse_session_data(fuse_chan_session(ch));
if (!f)
f = (struct fuse_ll*)fuse_session_data(fuse_chan_session(ch));
if(!f)
return -ENODEV; return -ENODEV;
if (f->conn.proto_minor < 15)
if(f->conn.proto_minor < 15)
return -ENOSYS; return -ENOSYS;
rreq = malloc(sizeof(*rreq)); rreq = malloc(sizeof(*rreq));
if (rreq == NULL)
if(rreq == NULL)
return -ENOMEM; return -ENOMEM;
pthread_mutex_lock(&f->lock); pthread_mutex_lock(&f->lock);
@ -2293,7 +2208,7 @@ fuse_lowlevel_notify_retrieve(struct fuse_chan *ch,
iov[1].iov_len = sizeof(outarg); iov[1].iov_len = sizeof(outarg);
err = send_notify_iov(f, ch, FUSE_NOTIFY_RETRIEVE, iov, 2); err = send_notify_iov(f, ch, FUSE_NOTIFY_RETRIEVE, iov, 2);
if (err)
if(err)
{ {
pthread_mutex_lock(&f->lock); pthread_mutex_lock(&f->lock);
list_del_nreq(&rreq->nreq); list_del_nreq(&rreq->nreq);
@ -2362,78 +2277,29 @@ static struct {
[FUSE_POLL] = { do_poll, "POLL" }, [FUSE_POLL] = { do_poll, "POLL" },
[FUSE_FALLOCATE] = { do_fallocate, "FALLOCATE" }, [FUSE_FALLOCATE] = { do_fallocate, "FALLOCATE" },
[FUSE_DESTROY] = { do_destroy, "DESTROY" }, [FUSE_DESTROY] = { do_destroy, "DESTROY" },
[FUSE_NOTIFY_REPLY] = { (void *) 1, "NOTIFY_REPLY" },
[FUSE_NOTIFY_REPLY] = { do_notify_reply, "NOTIFY_REPLY" },
[FUSE_BATCH_FORGET] = { do_batch_forget, "BATCH_FORGET" }, [FUSE_BATCH_FORGET] = { do_batch_forget, "BATCH_FORGET" },
[FUSE_COPY_FILE_RANGE] = { do_copy_file_range, "COPY_FILE_RANGE" }, [FUSE_COPY_FILE_RANGE] = { do_copy_file_range, "COPY_FILE_RANGE" },
}; };
#define FUSE_MAXOP (sizeof(fuse_ll_ops) / sizeof(fuse_ll_ops[0])) #define FUSE_MAXOP (sizeof(fuse_ll_ops) / sizeof(fuse_ll_ops[0]))
static
int
fuse_ll_copy_from_pipe(struct fuse_bufvec *dst,
struct fuse_bufvec *src)
{
int res = fuse_buf_copy(dst, src, 0);
if (res < 0)
{
fprintf(stderr, "fuse: copy from pipe: %s\n", strerror(-res));
return res;
}
if (res < fuse_buf_size(dst))
{
fprintf(stderr, "fuse: copy from pipe: short read\n");
return -1;
}
return 0;
}
static static
void void
fuse_ll_process_buf(void *data, fuse_ll_process_buf(void *data,
const struct fuse_buf *buf, const struct fuse_buf *buf,
struct fuse_chan *ch) struct fuse_chan *ch)
{ {
struct fuse_ll *f = (struct fuse_ll *) data;
const size_t write_header_size = sizeof(struct fuse_in_header) +
sizeof(struct fuse_write_in);
struct fuse_bufvec bufv = { .buf[0] = *buf, .count = 1 };
struct fuse_bufvec tmpbuf = FUSE_BUFVEC_INIT(write_header_size);
struct fuse_ll *f = (struct fuse_ll*)data;
struct fuse_in_header *in; struct fuse_in_header *in;
const void *inarg; const void *inarg;
struct fuse_req *req; struct fuse_req *req;
void *mbuf = NULL;
int err; int err;
int res;
if (buf->flags & FUSE_BUF_IS_FD)
{
if (buf->size < tmpbuf.buf[0].size)
tmpbuf.buf[0].size = buf->size;
mbuf = malloc(tmpbuf.buf[0].size);
if (mbuf == NULL)
{
fprintf(stderr, "fuse: failed to allocate header\n");
goto clear_pipe;
}
tmpbuf.buf[0].mem = mbuf;
res = fuse_ll_copy_from_pipe(&tmpbuf, &bufv);
if (res < 0)
goto clear_pipe;
in = mbuf;
}
else
{
in = buf->mem; in = buf->mem;
}
req = fuse_ll_alloc_req(f); req = fuse_ll_alloc_req(f);
if (req == NULL)
if(req == NULL)
{ {
struct fuse_out_header out = { struct fuse_out_header out = {
.unique = in->unique, .unique = in->unique,
@ -2445,7 +2311,7 @@ fuse_ll_process_buf(void *data,
}; };
fuse_send_msg(f, ch, &iov, 1); fuse_send_msg(f, ch, &iov, 1);
goto clear_pipe;
return;
} }
req->unique = in->unique; req->unique = in->unique;
@ -2469,50 +2335,17 @@ fuse_ll_process_buf(void *data,
} }
err = ENOSYS; err = ENOSYS;
if (in->opcode >= FUSE_MAXOP || !fuse_ll_ops[in->opcode].func)
goto reply_err;
if ((buf->flags & FUSE_BUF_IS_FD) && write_header_size < buf->size &&
(in->opcode != FUSE_WRITE || !f->op.write_buf) &&
in->opcode != FUSE_NOTIFY_REPLY)
{
void *newmbuf;
err = ENOMEM;
newmbuf = realloc(mbuf, buf->size);
if (newmbuf == NULL)
goto reply_err;
mbuf = newmbuf;
tmpbuf = FUSE_BUFVEC_INIT(buf->size - write_header_size);
tmpbuf.buf[0].mem = mbuf + write_header_size;
res = fuse_ll_copy_from_pipe(&tmpbuf, &bufv);
err = -res;
if (res < 0)
if(in->opcode >= FUSE_MAXOP)
goto reply_err; goto reply_err;
in = mbuf;
}
inarg = (void *) &in[1];
if (in->opcode == FUSE_WRITE)
do_write_buf(req, in->nodeid, inarg, buf);
else if (in->opcode == FUSE_NOTIFY_REPLY)
do_notify_reply(req, in->nodeid, inarg, buf);
else
inarg = (void*)&in[1];
fuse_ll_ops[in->opcode].func(req, in->nodeid, inarg); fuse_ll_ops[in->opcode].func(req, in->nodeid, inarg);
out_free:
free(mbuf);
return; return;
reply_err: reply_err:
fuse_reply_err(req, err); fuse_reply_err(req, err);
clear_pipe:
if (buf->flags & FUSE_BUF_IS_FD)
fuse_ll_clear_pipe(f);
goto out_free;
return;
} }
enum { enum {
@ -2643,34 +2476,33 @@ fuse_ll_receive_buf(struct fuse_session *se,
struct fuse_ll *f = fuse_session_data(se); struct fuse_ll *f = fuse_session_data(se);
size_t bufsize = buf->size; size_t bufsize = buf->size;
struct fuse_ll_pipe *llp; struct fuse_ll_pipe *llp;
struct fuse_buf tmpbuf;
int err; int err;
int res; int res;
if (f->conn.proto_minor < 14 || !(f->conn.want & FUSE_CAP_SPLICE_READ))
if(f->conn.proto_minor < 14 || !(f->conn.want & FUSE_CAP_SPLICE_READ))
goto fallback; goto fallback;
llp = fuse_ll_get_pipe(f); llp = fuse_ll_get_pipe(f);
if (llp == NULL) if (llp == NULL)
goto fallback; goto fallback;
if (llp->size < bufsize)
if(llp->size < bufsize)
{ {
if (llp->can_grow)
if(llp->can_grow)
{ {
res = fcntl(llp->pipe[0], F_SETPIPE_SZ, bufsize); res = fcntl(llp->pipe[0], F_SETPIPE_SZ, bufsize);
if (res == -1)
if(res == -1)
{ {
llp->can_grow = 0; llp->can_grow = 0;
goto fallback; goto fallback;
} }
llp->size = res; llp->size = res;
} }
if (llp->size < bufsize)
if(llp->size < bufsize)
goto fallback; goto fallback;
} }
res = splice(fuse_chan_fd(se->ch), NULL, llp->pipe[1], NULL, bufsize, 0);
res = splice(fuse_chan_fd(se->ch), NULL, llp->pipe[1], NULL, bufsize, SPLICE_F_MOVE);
err = errno; err = errno;
if(fuse_session_exited(se)) if(fuse_session_exited(se))
@ -2696,50 +2528,14 @@ fuse_ll_receive_buf(struct fuse_session *se,
return -EIO; return -EIO;
} }
tmpbuf = (struct fuse_buf) {
.size = res,
.flags = FUSE_BUF_IS_FD,
.fd = llp->pipe[0],
};
/*
* Don't bother with zero copy for small requests.
* fuse_loop_mt() needs to check for FORGET so this more than
* just an optimization.
*/
if (res < sizeof(struct fuse_in_header) +
sizeof(struct fuse_write_in) + pagesize)
{
struct fuse_bufvec src = { .buf[0] = tmpbuf, .count = 1 };
struct fuse_bufvec dst = { .buf[0] = *buf, .count = 1 };
res = fuse_buf_copy(&dst, &src, 0);
if (res < 0) {
fprintf(stderr, "fuse: copy from pipe: %s\n",
strerror(-res));
fuse_ll_clear_pipe(f);
return res;
}
if (res < tmpbuf.size)
{
fprintf(stderr, "fuse: copy from pipe: short read\n");
fuse_ll_clear_pipe(f);
return -EIO;
}
buf->size = tmpbuf.size;
return buf->size;
}
*buf = tmpbuf;
struct iovec iov = { buf->mem, res };
res = vmsplice(llp->pipe[0], &iov, 1, 0);
return res; return res;
fallback: fallback:
res = fuse_chan_recv(se->ch, buf->mem, bufsize);
if (res <= 0)
res = fuse_chan_recv(se->ch, buf->mem, buf->size);
if(res <= 0)
return res; return res;
buf->size = res; buf->size = res;
@ -2752,10 +2548,10 @@ int
fuse_ll_receive_buf(struct fuse_session *se, fuse_ll_receive_buf(struct fuse_session *se,
struct fuse_buf *buf) struct fuse_buf *buf)
{ {
(void) se;
int res;
int res = fuse_chan_recv(se->ch, buf->mem, buf->size);
if (res <= 0)
res = fuse_chan_recv(se->ch, buf->mem, buf->size);
if(res <= 0)
return res; return res;
buf->size = res; buf->size = res;

Loading…
Cancel
Save