Skip to content

Commit

Permalink
Merge pull request #299 from cruzdb/bench-updates
Browse files Browse the repository at this point in the history
Bench updates
  • Loading branch information
dotnwat committed Dec 4, 2018
2 parents f46e106 + 5fab76d commit eea69c5
Show file tree
Hide file tree
Showing 9 changed files with 369 additions and 33 deletions.
25 changes: 25 additions & 0 deletions run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#!/bin/bash
set -e
set -x

while true; do
for w in 1 2 100; do
for s in 1 2 100; do
for z in 0 1 2; do
for q in 1 8 32; do
rm -rf /tmp/db
mkdir /tmp/db
bin/zlog_bench \
--backend lmdb \
--db-path /tmp/db \
--runtime 10 \
--verify \
--width $w \
--slots $s \
--size $z \
--qdepth $q
done
done
done
done
done
8 changes: 7 additions & 1 deletion src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,13 @@ endif(WITH_JNI)
add_executable(zlog_bench bench.cc)
target_link_libraries(zlog_bench
libzlog
zlog_backend_ram
${Boost_PROGRAM_OPTIONS_LIBRARY}
${Boost_SYSTEM_LIBRARY}
)

add_executable(zlog zlog.cc)
target_link_libraries(zlog
libzlog
${Boost_PROGRAM_OPTIONS_LIBRARY}
${Boost_SYSTEM_LIBRARY}
)
Expand Down
239 changes: 212 additions & 27 deletions src/bench.cc
Original file line number Diff line number Diff line change
@@ -1,46 +1,231 @@
#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <random>
#include <thread>
#include <signal.h>
#include <time.h>
#include <unistd.h>
#include <boost/program_options.hpp>
#include "zlog/backend/ram.h"
#include "zlog/options.h"
#include "zlog/log.h"

namespace po = boost::program_options;

class rand_data_gen {
public:
rand_data_gen(size_t buf_size, size_t samp_size) :
buf_size_(buf_size),
dist_(0, buf_size_ - samp_size - 1)
{}

void generate() {
std::uniform_int_distribution<uint64_t> d(
std::numeric_limits<uint64_t>::min(),
std::numeric_limits<uint64_t>::max());
buf_.reserve(buf_size_);
while (buf_.size() < buf_size_) {
uint64_t val = d(gen_);
buf_.append((const char *)&val, sizeof(val));
}
if (buf_.size() > buf_size_)
buf_.resize(buf_size_);
}

inline const char *sample() {
assert(!buf_.empty());
return buf_.c_str() + dist_(gen_);
}

private:
const size_t buf_size_;
std::string buf_;
std::default_random_engine gen_;
std::uniform_int_distribution<size_t> dist_;
};

static inline uint64_t __getns(clockid_t clock)
{
struct timespec ts;
clock_gettime(clock, &ts);
return (((uint64_t)ts.tv_sec) * 1000000000ULL) + ts.tv_nsec;
}

static inline uint64_t getus()
{
return __getns(CLOCK_MONOTONIC) / 1000;
}

static std::atomic<bool> shutdown;
static std::atomic<uint64_t> op_count;

static std::mutex lock;
static std::condition_variable cond;

static void sig_handler(int sig)
{
shutdown = true;
}

static void stats_entry()
{
while (true) {
auto start_ops_count = op_count.load();
auto start_us = getus();

std::unique_lock<std::mutex> lk(lock);
cond.wait_for(lk, std::chrono::seconds(1),
[&] { return shutdown.load(); });
if (shutdown) {
break;
}

auto end_us = getus();
auto end_ops_count = op_count.load();

auto elapsed_us = end_us - start_us;

auto iops = (double)((end_ops_count - start_ops_count) *
1000000ULL) / (double)elapsed_us;

std::cout << iops << std::endl;
}
}

int main(int argc, char **argv)
{
auto backend = std::unique_ptr<zlog::storage::ram::RAMBackend>(
new zlog::storage::ram::RAMBackend());
std::string log_name;
uint32_t width;
uint32_t slots;
size_t entry_size;
int qdepth;
bool excl_open;
bool verify;
int runtime;
std::string backend;
std::string pool;
std::string db_path;
bool blackhole;

po::options_description opts("Benchmark options");
opts.add_options()
("help", "show help message")
("name", po::value<std::string>(&log_name)->default_value("bench"), "log name")
("width", po::value<uint32_t>(&width)->default_value(10), "stripe width")
("slots", po::value<uint32_t>(&slots)->default_value(10), "object slots")
("size", po::value<size_t>(&entry_size)->default_value(1024), "entry size")
("qdepth", po::value<int>(&qdepth)->default_value(1), "queue depth")
("excl", po::bool_switch(&excl_open), "exclusive open")
("verify", po::bool_switch(&verify), "verify writes")
("runtime", po::value<int>(&runtime)->default_value(0), "runtime")

("backend", po::value<std::string>(&backend)->required(), "backend")
("pool", po::value<std::string>(&pool)->default_value("zlog"), "pool (ceph)")
("db-path", po::value<std::string>(&db_path)->default_value("/tmp/zlog.bench.db"), "db path (lmdb)")
("blackhole", po::bool_switch(&blackhole), "black hole (ram)")
;

po::variables_map vm;
po::store(po::parse_command_line(argc, argv, opts), vm);

if (vm.count("help")) {
std::cout << opts << std::endl;
return 1;
}

po::notify(vm);

runtime = std::max(runtime, 0);

zlog::Options options;
options.backend = std::move(backend);
options.backend_name = backend;

if (backend == "ceph") {
options.backend_options["pool"] = pool;
// zero-length string here causes default path search
options.backend_options["conf_file"] = "";
}

if (backend == "lmdb") {
options.backend_options["path"] = db_path;
}

if (backend == "ram") {
if (blackhole) {
options.backend_options["blackhole"] = "true";
}
}

options.create_if_missing = true;
options.error_if_exists = true;
options.init_stripe_on_create = true;
options.error_if_exists = excl_open;

options.stripe_width = width;
options.stripe_slots = slots;
options.max_inflight_ops = qdepth;

zlog::Log *log;
int ret = zlog::Log::Open(options, "mylog", &log);
assert(ret == 0);

#if 0
for (int i = 0; i < 100000; i++) {
uint64_t pos;
int ret = log->Append("data", &pos);
assert(ret == 0);
std::cout << pos << std::endl;
}
#else
std::mutex lock;
for (int i = 0; i < 50000; i++) {
int ret = log->appendAsync("data", [&](int ret, uint64_t pos) {
std::lock_guard<std::mutex> lk(lock);
std::cout << pos << " " << ret << std::endl;
int ret = zlog::Log::Open(options, log_name, &log);
if (ret) {
std::cerr << "log::open failed: " << strerror(-ret) << std::endl;
return -1;
}

signal(SIGINT, sig_handler);
signal(SIGALRM, sig_handler);
alarm(runtime);

rand_data_gen dgen(1ULL << 22, entry_size);
dgen.generate();

// TODO: by always logging the same entry data we may trigger low-level
// compression to take affect, if such a thing exists. something to be aware
// of and watch out for.
const auto entry_data = std::string(dgen.sample(), entry_size);

std::thread stats_thread(stats_entry);

op_count = 0;
while (!shutdown) {
int ret = log->appendAsync(entry_data, [&](int ret, uint64_t pos) {
if (ret && ret != -ESHUTDOWN) {
std::cerr << "appendAsync cb failed: " << strerror(-ret) << std::endl;
assert(0);
return;
}
op_count++;
});
assert(ret == 0);
if (ret) {
std::cerr << "appendAsync failed: " << strerror(-ret) << std::endl;
assert(0);
break;
}
}

shutdown = true;
cond.notify_one();
stats_thread.join();

if (verify) {
uint64_t tail;
auto ret = log->CheckTail(&tail);
if (ret) {
std::cerr << "checktail failed: " << strerror(-ret) << std::endl;
} else {
for (uint64_t pos = 0; pos < tail; pos++) {
std::string data;
ret = log->Read(pos, &data);
if (ret) {
std::cerr << "read failed at pos " << pos << ": " << strerror(-ret) << std::endl;
} else if (data != entry_data) {
std::cerr << "verify failed at pos " << pos << std::endl;
assert(0);
}
}
}
}
#endif

// will wait for async ops to run callbacks
std::cout << "done looping" << std::endl;
sleep(3);
std::cout << "done sleeping" << std::endl;
delete log;

return 0;
Expand Down
4 changes: 3 additions & 1 deletion src/include/zlog/backend/ram.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ namespace ram {
class RAMBackend : public Backend {
public:
RAMBackend() :
blackhole_(false),
options_{{"scheme", "ram"}}
{}

Expand Down Expand Up @@ -97,8 +98,9 @@ class RAMBackend : public Backend {

private:
mutable std::mutex lock_;
bool blackhole_;
std::map<std::string, std::string> options_;
std::map<std::string,
std::unordered_map<std::string,
boost::variant<LinkObject, ProjectionObject, LogObject>> objects_;
};

Expand Down
3 changes: 0 additions & 3 deletions src/include/zlog/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ struct Options {
// advanced
int max_refresh_views_read = 20;

int width = 10;
int entries_per_object = 200;

Statistics* statistics = nullptr;
std::vector<std::string> http;

Expand Down
8 changes: 8 additions & 0 deletions src/libzlog/striper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,10 @@ int Striper::propose_sequencer()
auto& stripe = it->second;
int ret = seal_stripe(stripe, next_epoch, &max_pos, &empty);
if (ret < 0) {
if (ret == -ESPIPE) {
update_current_view(v.epoch());
return 0;
}
return ret;
}

Expand All @@ -363,6 +367,10 @@ int Striper::propose_sequencer()
auto& stripe = it->second;
int ret = seal_stripe(stripe, next_epoch, nullptr, nullptr);
if (ret < 0) {
if (ret == -ESPIPE) {
update_current_view(v.epoch());
return 0;
}
return ret;
}
}
Expand Down
10 changes: 9 additions & 1 deletion src/storage/ram/ram.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <vector>
#include <atomic>
#include <boost/algorithm/string.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
Expand All @@ -17,6 +18,11 @@ RAMBackend::~RAMBackend()
int RAMBackend::Initialize(
const std::map<std::string, std::string>& opts)
{
auto it = opts.find("blackhole");
if (it != opts.end()) {
blackhole_ = boost::iequals(it->second, "yes") ||
boost::iequals(it->second, "true");
}
return 0;
}

Expand Down Expand Up @@ -306,7 +312,9 @@ int RAMBackend::Write(const std::string& oid, const std::string& data,
auto it = lobj->entries.find(position);
if (it == lobj->entries.end()) {
LogEntry entry;
entry.data = data;
if (!blackhole_) {
entry.data = data;
}
lobj->entries.emplace(position, entry);
lobj->maxpos = std::max(lobj->maxpos, position);
return 0;
Expand Down
Loading

0 comments on commit eea69c5

Please sign in to comment.