[fs] Remove unused VFS dispatcher

Change-Id: Iba3355ee37c3de9f60d437fccd7ae15902f85be6
Esse commit está contido em:
Sean Klein
2017-09-07 12:04:14 -07:00
commit 5514c11d3c
7 arquivos alterados com 0 adições e 700 exclusões
-2
Ver Arquivo
@@ -15,12 +15,10 @@ static_library("fs") {
"include/fs/mapped-vmo.h",
"include/fs/trace.h",
"include/fs/vfs-client.h",
"include/fs/vfs-dispatcher.h",
"include/fs/vfs.h",
"async-dispatcher.cpp",
"mapped-vmo.cpp",
"vfs.cpp",
"vfs-dispatcher.cpp",
"vfs-mount.cpp",
"vfs-rpc.cpp",
"vfs-unmount.cpp",
-84
Ver Arquivo
@@ -1,84 +0,0 @@
// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#pragma once
#include <stdlib.h>
#include <stdint.h>
#include <threads.h>
#include <magenta/compiler.h>
#include <magenta/types.h>
#include <magenta/syscalls/port.h>
#include <mx/event.h>
#include <mx/port.h>
#include <fbl/array.h>
#include <fbl/intrusive_double_list.h>
#include <fbl/ref_counted.h>
#include <fbl/unique_ptr.h>
#include <mxio/dispatcher.h>
#include <fs/vfs.h>
#include "dispatcher.h"
namespace fs {
class Handler : public fbl::DoublyLinkedListable<fbl::unique_ptr<Handler>> {
public:
Handler(mx::channel channel, vfs_dispatcher_cb_t cb, void* cookie) :
channel_(fbl::move(channel)), cb_(cb), cookie_(cookie) {
}
~Handler();
mx_status_t SetAsyncCallback(const mx::port& dispatch_port);
mx_status_t CancelAsyncCallback(const mx::port& dispatch_port);
mx_status_t ExecuteCallback(mxio_dispatcher_cb_t dispatch_cb) {
return dispatch_cb(channel_.get(), (void*) cb_, cookie_);
}
void ExecuteCloseCallback(mxio_dispatcher_cb_t dispatch_cb) {
dispatch_cb(MX_HANDLE_INVALID, (void*) cb_, cookie_);
}
void Close();
private:
DISALLOW_COPY_ASSIGN_AND_MOVE(Handler);
mx::channel channel_;
vfs_dispatcher_cb_t cb_;
void* cookie_;
};
// VfsDispatcher is a dispatcher which uses a pool of threads to distribute
// requests to an underlying handlers concurrently.
class VfsDispatcher final : public fs::Dispatcher {
public:
~VfsDispatcher();
static mx_status_t Create(mxio_dispatcher_cb_t cb, uint32_t pool_size,
fbl::unique_ptr<fs::VfsDispatcher>* out);
void DisconnectHandler(Handler*, bool);
void RunOnCurrentThread();
mx_status_t AddVFSHandler(mx::channel channel, vfs_dispatcher_cb_t cb, void* iostate) final;
private:
VfsDispatcher(mxio_dispatcher_cb_t cb, uint32_t pool_size);
mx_status_t Start(const char* name);
mxio_dispatcher_cb_t cb_;
uint32_t pool_size_;
fbl::Array<thrd_t> t_;
mx::event shutdown_event_;
mtx_t lock_;
fbl::DoublyLinkedList<fbl::unique_ptr<Handler>> handlers_ __TA_GUARDED(lock_);
uint32_t n_threads_;
// NOTE: port_ intentionally declared after handlers_, so it
// is shut down before the handlers are destroyed.
mx::port port_;
};
} // namespace fs
-1
Ver Arquivo
@@ -15,7 +15,6 @@ MODULE_SRCS += \
$(LOCAL_DIR)/vfs-mount.cpp \
$(LOCAL_DIR)/vfs-unmount.cpp \
$(LOCAL_DIR)/vfs-rpc.cpp \
$(LOCAL_DIR)/vfs-dispatcher.cpp \
$(LOCAL_DIR)/vfs-watcher.cpp \
MODULE_STATIC_LIBS := \
-255
Ver Arquivo
@@ -1,255 +0,0 @@
// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <limits.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>
#include <magenta/compiler.h>
#include <magenta/syscalls.h>
#include <magenta/syscalls/port.h>
#include <magenta/threads.h>
#include <mx/event.h>
#include <mx/object.h>
#include <mx/port.h>
#include <mxio/debug.h>
#include <mxio/dispatcher.h>
#include <fbl/algorithm.h>
#include <fbl/alloc_checker.h>
#include <fbl/auto_lock.h>
#include <fs/vfs-dispatcher.h>
#define MXDEBUG 0
// ** NOTE -- this prototype multithreaded dispatcher is only used by
// ** minfs as part of ongoing multithread development. Not yet safe
// ** for general consumption
namespace fs {
Handler::~Handler() {
Close();
}
mx_status_t Handler::SetAsyncCallback(const mx::port& dispatch_port) {
// queue a message on port whenever the channel is readable or closed
return channel_.wait_async(dispatch_port, (uint64_t)(uintptr_t)this,
MX_CHANNEL_READABLE|MX_CHANNEL_PEER_CLOSED,
MX_WAIT_ASYNC_ONCE);
}
mx_status_t Handler::CancelAsyncCallback(const mx::port& dispatch_port) {
return dispatch_port.cancel(channel_.get(), (uint64_t)(uintptr_t)this);
}
void Handler::Close() {
channel_.reset();
}
VfsDispatcher::VfsDispatcher(mxio_dispatcher_cb_t cb, uint32_t pool_size) :
cb_(cb), pool_size_(pool_size), n_threads_(0) {
mtx_init(&lock_, mtx_plain);
}
VfsDispatcher::~VfsDispatcher() {
mx_status_t status;
// *up to clients to lock to prevent add/run activity during destructor*
// kill off worker threads, so no new cb activity
// - send suicide events -- existing queue clears then workers exit
// - join all threads
// close port so no queue new activity; no new handlers can be added,
// - remaining messages discarded
// clean up and delete remaining handlers
// suicide: cause worker threads to wake and die
// (ideally, we could close the port and the threads would die on their own)
// shut down existing handlers (to prevent races)
if ((status = shutdown_event_.signal(0u, MX_EVENT_SIGNALED)) != MX_OK) {
FS_TRACE_ERROR("couldn't send kill signal to thread\n");
}
// reap worker threads
for (unsigned i = 0; i < n_threads_; i++) {
int rc;
int r = thrd_join(t_[i], &rc);
if (r != thrd_success) {
printf("mxio_dispatcher_destroy: join failure %d\n", r);
}
}
}
static void GetThreadName(char* name, size_t namelen) {
mx_status_t r = mx_object_get_property(thrd_get_mx_handle(thrd_current()),
MX_PROP_NAME, name, namelen);
if (r != MX_OK)
strncpy(name, "???", namelen);
}
void VfsDispatcher::DisconnectHandler(Handler* handler, bool need_close_cb) {
// close handle, so we get no further messages
handler->Close();
if (need_close_cb) {
handler->ExecuteCloseCallback(cb_);
}
}
void VfsDispatcher::RunOnCurrentThread() {
mx_status_t r;
// when draining queue, limit the number of messages you take
// at once, so you don't dominate the cpu
constexpr unsigned kMaxMessageBatchSize = 4;
char tname[128];
GetThreadName(tname, sizeof(tname));
for (;;) {
mx_port_packet_t packet;
if ((r = port_.wait(MX_TIME_INFINITE, &packet, 0u)) < 0) {
xprintf("mxio_dispatcher: port wait failed %d, worker exiting\n", r);
return;
}
xprintf("port_wait: thread %s \n", tname);
if ((packet.signal.observed & MX_EVENT_SIGNALED) != 0) {
// reset for the next thread
r = shutdown_event_.wait_async(port_, 0u, MX_EVENT_SIGNALED,
MX_WAIT_ASYNC_ONCE);
if (r != MX_OK) {
FS_TRACE_ERROR("vfs-dispatcher: error, couldn't reset thread event\n");
}
// exit thread
xprintf("%s: suicide\n", tname);
return;
}
xprintf("thrd_: port_wait: returns key %p effective:%#x \n",
(void*)packet.key, packet.signal.observed);
Handler* handler = (Handler*)(uintptr_t)packet.key;
if (packet.signal.observed & MX_CHANNEL_READABLE) {
// hit cb multiple times if we know multi packets available
for (unsigned ix = 0; ix < fbl::min(kMaxMessageBatchSize, (unsigned)packet.signal.count); ++ix) {
if ((r = handler->ExecuteCallback(cb_)) != MX_OK) {
// error or close: invoke callback in case of error
DisconnectHandler(handler, r != ERR_DISPATCHER_DONE);
goto free_handler;
}
}
// maybe more work to do: re-arm handler to fire again
if ((r = handler->SetAsyncCallback(port_))!= MX_OK){
DisconnectHandler(handler, true);
goto free_handler;
}
} else if (packet.signal.observed & MX_CHANNEL_PEER_CLOSED) {
DisconnectHandler(handler, true);
free_handler:
{
fbl::AutoLock md_lock(&lock_);
handlers_.erase(*handler);
}
}
}
}
mx_status_t VfsDispatcher::Create(mxio_dispatcher_cb_t cb, uint32_t pool_size,
fbl::unique_ptr<fs::VfsDispatcher>* out) {
fbl::AllocChecker ac;
fbl::unique_ptr<fs::VfsDispatcher> dispatcher(new (&ac) fs::VfsDispatcher(cb, pool_size));
if (!ac.check()) {
return MX_ERR_NO_MEMORY;
}
thrd_t* t = new (&ac) thrd_t[pool_size];
if (!ac.check()) {
return MX_ERR_NO_MEMORY;
}
dispatcher->t_.reset(t, pool_size);
mx_status_t status;
if ((status = mx::port::create(0, &dispatcher->port_)) != MX_OK) {
return status;
}
if ((status = mx::event::create(0u, &dispatcher->shutdown_event_)) != MX_OK) {
return status;
}
status = dispatcher->shutdown_event_.wait_async(dispatcher->port_, 0u,
MX_EVENT_SIGNALED,
MX_WAIT_ASYNC_ONCE);
if (status != MX_OK) {
return status;
}
if ((status = dispatcher->Start("VFS Dispatcher")) != MX_OK) {
return status;
}
*out = fbl::move(dispatcher);
return MX_OK;
}
static int mxio_dispatcher_thread(void* arg) {
VfsDispatcher* md = reinterpret_cast<VfsDispatcher*>(arg);
md->RunOnCurrentThread();
return 0;
}
mx_status_t VfsDispatcher::Start(const char* name) {
char namebuf[NAME_MAX];
fbl::AutoLock md_lock(&lock_);
mx_status_t r;
if (n_threads_ != 0) {
// already initialized
return MX_ERR_BAD_STATE;
}
xprintf("starting dispatcher with %d threads\n", pool_size_);
for (uint32_t i = 0; i < pool_size_; i++) {
if (pool_size_ > 1) {
snprintf(namebuf, sizeof(namebuf), "%s-%u", name, n_threads_);
} else {
snprintf(namebuf, sizeof(namebuf), "%s", name);
}
xprintf("start thread %s\n", namebuf);
if ((r = thrd_create_with_name(&t_[n_threads_], mxio_dispatcher_thread,
this, namebuf)) != thrd_success) {
return MX_ERR_NO_RESOURCES;
} else {
n_threads_++;
}
}
return MX_OK;
}
mx_status_t VfsDispatcher::AddVFSHandler(mx::channel channel, vfs_dispatcher_cb_t cb, void* cookie) {
fbl::AllocChecker ac;
fbl::unique_ptr<Handler> handler(new (&ac) Handler(fbl::move(channel), cb, cookie));
if (!ac.check()) {
return MX_ERR_NO_MEMORY;
}
fbl::AutoLock md_lock(&lock_);
// set us up to receive read/close callbacks from handler on port_
mx_status_t status;
if ((status = handler->SetAsyncCallback(port_)) != MX_OK) {
return status;
} else {
handlers_.push_back(fbl::move(handler));
}
return MX_OK;
}
} // namespace fs
-9
Ver Arquivo
@@ -1,9 +0,0 @@
// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <unittest/unittest.h>
int main(int argc, char** argv) {
return unittest_run_all_tests(argc, argv) ? 0 : -1;
}
-29
Ver Arquivo
@@ -1,29 +0,0 @@
# Copyright 2017 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
LOCAL_DIR := $(GET_LOCAL_DIR)
MODULE := $(LOCAL_DIR)
MODULE_TYPE := usertest
MODULE_NAME := dispatch-test
MODULE_SRCS := \
$(LOCAL_DIR)/main.cpp \
$(LOCAL_DIR)/test-multi-dispatch.cpp \
MODULE_STATIC_LIBS := \
system/ulib/fs \
system/ulib/mx \
system/ulib/fbl \
system/ulib/mxcpp \
MODULE_LIBS := \
system/ulib/c \
system/ulib/magenta \
system/ulib/mxio \
system/ulib/unittest \
include make/module.mk
@@ -1,320 +0,0 @@
// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <fcntl.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <threads.h>
#include <unistd.h>
#include <fs/dispatcher.h>
#include <fs/vfs-dispatcher.h>
#include <magenta/types.h>
#include <magenta/syscalls.h>
#include <magenta/syscalls/port.h>
#include <mxio/debug.h>
#include <fbl/algorithm.h>
#include <fbl/alloc_checker.h>
#include <fbl/unique_ptr.h>
#include <unittest/unittest.h>
#undef MXDEBUG
#define MXDEBUG 0
// multithreaded dispatcher (vfs-dispatch) test suite
static constexpr auto MAX_MSG = 120;
static constexpr auto STR_DATA = "testdata";
static constexpr auto STR_KILL = "exit";
static constexpr auto kMaxFlushTime = 15; // wait at most 15 sec for messages to flush
class Msg : public mx_port_packet_t {
public:
char str[64];
unsigned idx;
unsigned worker;
Msg(unsigned _idx, const char* _str, unsigned _worker):
idx(_idx), worker(_worker) {
type = MX_PKT_TYPE_USER;
strcpy(str, _str);
}
};
class Handler {
cnd_t writer_finished_cond_;
mtx_t writer_finished_lock_;
unsigned writer_count_;
public:
unsigned counts[MAX_MSG];
void signal_finished() {
mtx_lock(&writer_finished_lock_);
writer_count_--;
cnd_signal(&writer_finished_cond_);
mtx_unlock(&writer_finished_lock_);
}
bool wait_for_finish() {
mx_status_t status;
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += kMaxFlushTime;
mtx_lock(&writer_finished_lock_);
while(writer_count_ > 0) {
status = cnd_timedwait(&writer_finished_cond_, &writer_finished_lock_, &ts);
ASSERT_EQ(status, 0);
}
mtx_unlock(&writer_finished_lock_);
return true;
}
Handler(unsigned n_writers) {
mtx_init(&writer_finished_lock_, mtx_plain);
cnd_init(&writer_finished_cond_);
writer_count_ = n_writers;
memset(counts, 0, sizeof(counts));
}
};
// we write operations down a channel, which result in callbacks
// to make sure we've given the channel a chance to clear, we send
// a final message with a sentinel value which signals a "done" condition
// the tests wait for all writers to report finished before tearing down
// the dispatcher
static bool signal_finished(mx_handle_t ch) {
Msg pmsg(0, STR_KILL, 0);
mx_status_t status;
status = mx_channel_write(ch, 0u, &pmsg, sizeof(pmsg), nullptr, 0u);
ASSERT_EQ(status, MX_OK);
return true;
}
using cb_t = mx_status_t (*)(Msg* msg, Handler* cookie);
static mx_status_t handler_cb(mxrio_msg_t* msg_, void* cookie) {
auto msg = reinterpret_cast<Msg*>(msg_);
auto handler = reinterpret_cast<Handler*>(cookie);
if (strcmp(msg->str, STR_KILL) == 0) {
// this is the dispatch from the last message sent. signal
// that this part of the test is over.
handler->signal_finished();
} else {
// after several levels of indirection, receive a message that
// contains a unique index [0,MAX_MSG-]; bump the handler counts
// for that index. we should get one bump per bucket.
ASSERT_LT(msg->idx, MAX_MSG);
ASSERT_EQ(strcmp(msg->str, STR_DATA), 0, "channel read bad string payoad");
xprintf("worker %u: inc %u\n", msg->worker, msg->idx);
handler->counts[msg->idx]++;
// one thread can race through most of our callbacks; yield to make
// sure we mix things up a little
thrd_yield();
}
return MX_OK;
}
static mx_status_t disp_cb(mx_handle_t h, void* handler_cb, void* handler_data) {
auto cb = reinterpret_cast<fs::vfs_dispatcher_cb_t>(handler_cb);
// read the message and call the handler
ASSERT_NE(h, 0, "unexpected handle close in dispatcher");
Msg imsg(0, "", 0);
uint32_t dsz = sizeof(imsg);
mx_status_t r;
r = mx_channel_read(h, 0u, &imsg, nullptr, dsz, 0, &dsz, nullptr);
ASSERT_EQ(r, 0, "channel read failed");
ASSERT_EQ(dsz, sizeof(imsg), "channel read unexpected length");
ASSERT_LT(imsg.idx, MAX_MSG, "channel read bad index payload");
r = cb((mxrio_msg_t*) &imsg, handler_data);
ASSERT_EQ(r, MX_OK, "dispatch callback");
r = mx_channel_write(h, 0, &imsg, sizeof(imsg), nullptr, 0);
ASSERT_EQ(r, MX_OK, "channel reply");
return r;
}
bool test_multi_basic(void) {
// send MAX_MSG indexed writes down a channel attached to a dispatcher
// attached handler will bump a counter for each dispatched index
// make sure we get one bump for each and every message
static constexpr unsigned DISPATCH_POOL_SIZE = 4;
BEGIN_TEST;
// create dispatcher
mx_status_t status;
fbl::unique_ptr<fs::VfsDispatcher> disp;
ASSERT_EQ(MX_OK, fs::VfsDispatcher::Create(disp_cb, DISPATCH_POOL_SIZE, &disp));
// create a channel; write to one end, bind the other to the server port
mx::channel ch[2];
status = mx::channel::create(0u, &ch[0], &ch[1]);
ASSERT_EQ(status, MX_OK);
// associate a handler object that will track state
Handler handler(1);
status = disp->AddVFSHandler(fbl::move(ch[1]), handler_cb, &handler);
ASSERT_EQ(status, MX_OK);
// write MAX_MSG messages -- should result in all handler counts == 1
for (auto msgno=0; msgno<MAX_MSG; msgno++) {
Msg omsg(msgno, STR_DATA, 0);
status = ch[0].write(0u, &omsg, sizeof(omsg), nullptr, 0u);
ASSERT_EQ(status, MX_OK);
thrd_yield();
}
signal_finished(ch[0].get());
handler.wait_for_finish();
// tear down the dispatcher object (closes and waits for thread pool)
disp = nullptr;
ch[0].reset();
// when all callbacks have finished, the handler counts
// should all have been bumped
for (auto i=0; i<MAX_MSG; i++) {
ASSERT_EQ(handler.counts[i], 1);
}
END_TEST;
}
struct Work {
uint32_t worker;
uint32_t iter;
mx_handle_t ch;
uint32_t* idx;
uint32_t idx_len;
};
static int parallel_writer_thread(void* arg) {
Work* work = (Work*)arg;
// write a random subset of MAX_MSG messages
xprintf("WORKER %d: ch: %u idx: %p\n", work->worker, work->ch, work->idx);
for (uint32_t iter=0; iter<work->iter; iter++) {
for (uint32_t i=0; i<work->idx_len; i++) {
Msg omsg(work->idx[i], STR_DATA, work->worker);
xprintf("write msg %d\n", work->idx[i]);
mx_status_t status;
status = mx_channel_write(work->ch, 0u, &omsg, sizeof(omsg), nullptr, 0u);
ASSERT_EQ(status, MX_OK);
thrd_yield();
}
}
signal_finished(work->ch);
return true;
}
static bool parallel_write(mx_handle_t ch, Handler* handler,
uint32_t idx[],
uint32_t idx_len, uint32_t n_writers, uint32_t iter) {
ASSERT_EQ(idx_len % n_writers, 0, "msg count must be multiple of pool size");
uint32_t n_work = idx_len / n_writers;
Work work[n_writers];
for (uint32_t th=0; th < n_writers; th++) {
work[th].worker = th;
work[th].iter = iter;
work[th].ch = ch;
work[th].idx = idx + (th*n_work);
work[th].idx_len = n_work;
}
thrd_t t[n_writers];
// spin off your workers
for (uint32_t th=0; th < n_writers; th++) {
char name[128];
snprintf(name, sizeof(name), "th-%d", th);
int status = thrd_create_with_name(&t[th], (thrd_start_t)parallel_writer_thread, (void*)(work+th), name);
ASSERT_EQ(status, thrd_success);
}
// wait for all of the workers t signal they're done
handler->wait_for_finish();
// wait for the writer threads to exit
for (uint32_t th=0; th<n_writers; th++) {
int rc;
int status = thrd_join(t[th], &rc);
ASSERT_EQ(status, thrd_success);
ASSERT_EQ(rc, true);
}
return true;
}
bool test_multi_multi(void) {
// similar to multi_basic, only partition the work of sending
// messages among several threads and randomize the order of
// the messages being sent
static constexpr unsigned DISPATCH_POOL_SIZE = 4;
static constexpr unsigned WRITER_POOL_SIZE = 6;
static constexpr unsigned WRITE_ITER = 5;
BEGIN_TEST;
// create dispatcher
mx_status_t status;
fbl::unique_ptr<fs::VfsDispatcher> disp;
ASSERT_EQ(MX_OK, fs::VfsDispatcher::Create(disp_cb, DISPATCH_POOL_SIZE, &disp));
// create a channel; write to one end, bind the other to the server port
mx::channel ch[2];
status = mx::channel::create(0u, &ch[0], &ch[1]);
ASSERT_EQ(status, MX_OK);
// associate a handler object that will track state
Handler handler(WRITER_POOL_SIZE);
status = disp->AddVFSHandler(fbl::move(ch[1]), handler_cb, &handler);
ASSERT_EQ(status, MX_OK);
// make sure the counters get bumped in random order
uint32_t idx[MAX_MSG];
for (uint32_t i=0; i<fbl::count_of(idx); i++) {
idx[i] = i;
}
for (uint32_t i=0; i<fbl::count_of(idx); i++) {
auto i1 = rand() % MAX_MSG;
auto i2 = rand() % MAX_MSG;
auto tmp = idx[i1];
idx[i1] = idx[i2];
idx[i2] = tmp;
}
parallel_write(ch[0].get(), &handler, idx, fbl::count_of(idx), WRITER_POOL_SIZE, WRITE_ITER);
// tear down the dispatcher object (closes and waits for thread pool)
disp = nullptr;
ch[0].reset();
// all counts should be bumped == WRITE_ITER
for (auto i=0; i<MAX_MSG; i++) {
ASSERT_EQ(handler.counts[i], WRITE_ITER);
}
END_TEST;
}
BEGIN_TEST_CASE(multi_dispatch_tests)
RUN_TEST_MEDIUM(test_multi_basic)
RUN_TEST_MEDIUM(test_multi_multi)
END_TEST_CASE(multi_dispatch_tests)