e2e.cc 30.2 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// ----------------------------------------------------------------------
// File: e2e.cc
// Author: Georgios Bitzes - CERN
// ----------------------------------------------------------------------

/************************************************************************
 * quarkdb - a redis-like highly available key-value store              *
 * Copyright (C) 2016 CERN/Switzerland                                  *
 *                                                                      *
 * This program is free software: you can redistribute it and/or modify *
 * it under the terms of the GNU General Public License as published by *
 * the Free Software Foundation, either version 3 of the License, or    *
 * (at your option) any later version.                                  *
 *                                                                      *
 * This program is distributed in the hope that it will be useful,      *
 * but WITHOUT ANY WARRANTY; without even the implied warranty of       *
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the        *
 * GNU General Public License for more details.                         *
 *                                                                      *
 * You should have received a copy of the GNU General Public License    *
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.*
 ************************************************************************/

#include "raft/RaftDispatcher.hh"
#include "raft/RaftReplicator.hh"
#include "raft/RaftTalker.hh"
#include "raft/RaftTimeouts.hh"
#include "raft/RaftCommitTracker.hh"
29
#include "raft/RaftConfig.hh"
30
#include "Poller.hh"
31
32
#include "Configuration.hh"
#include "QuarkDBNode.hh"
33
34
35
#include "test-utils.hh"
#include "RedisParser.hh"
#include <gtest/gtest.h>
36
#include "test-reply-macros.hh"
37
#include "qclient/QScanner.hh"
Georgios Bitzes's avatar
Georgios Bitzes committed
38
#include "qclient/ConnectionInitiator.hh"
39
40
41

using namespace quarkdb;
// Asserts that a status-like expression reports success through its ok() method.
// The argument is parenthesized so that compound expressions (for example *ptr
// or a ? b : c) bind to .ok() as a whole instead of only their last operand.
#define ASSERT_OK(msg) ASSERT_TRUE((msg).ok())
42
43
// Fixture for the end-to-end tests in this file. It adds nothing of its own:
// everything comes from TestCluster3NodesFixture. Tests using it spin up at
// most nodes 0 through 2.
class Raft_e2e : public TestCluster3NodesFixture {};
// Same idea, inheriting from TestCluster5NodesFixture; used by the test which
// spins up nodes 0 through 4.
class Raft_e2e5 : public TestCluster5NodesFixture {};
44

Georgios Bitzes's avatar
Georgios Bitzes committed
45
46
47
// Verifies that a non-leader node can take over leadership by issuing
// RAFT_ATTEMPT_COUP. Each attempt must bump the instigator's term and leave
// the cluster in consensus; the test passes as soon as the instigator is the
// leader, and fails if that has not happened after 9 attempts.
TEST_F(Raft_e2e, coup) {
  spinup(0); spinup(1); spinup(2);
  RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));

  int leaderID = getLeaderID();
  ASSERT_GE(leaderID, 0);
  ASSERT_LE(leaderID, 2);

  // pick a node which is guaranteed not to be the current leader
  int instigator = (leaderID+1)%3;

  for(int i = 1; i < 10; i++) {
    RaftTerm term = state(instigator)->getCurrentTerm();
    ASSERT_REPLY(tunnel(instigator)->exec("RAFT_ATTEMPT_COUP"), "vive la revolution");

    // a coup attempt must always result in a higher term on the instigator
    RETRY_ASSERT_TRUE(state(instigator)->getCurrentTerm() > term);
    RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));

    if(instigator == getLeaderID()) {
      qdb_info("Successful coup in " << i << " attempts");
      return; // pass test
    }
  }

  // none of the attempts made the instigator leader
  FAIL() << "Test has failed";
}

69
70
// Pipelines requests towards the leader, first over a single connection and
// then interleaved over three connections, and checks every reply. Afterwards
// takes a checkpoint through dispatcher() and verifies that the checkpointed
// state machine and journal match the live ones.
TEST_F(Raft_e2e, simultaneous_clients) {
  spinup(0); spinup(1); spinup(2);
  RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));

  int leaderID = getServerID(state(0)->getSnapshot()->leader);
  ASSERT_GE(leaderID, 0);
  ASSERT_LE(leaderID, 2);

  // index of the last journal entry before this test writes anything
  LogIndex lastEntry = journal(leaderID)->getLogSize() - 1;
  std::vector<std::future<redisReplyPtr>> futures;

  // send off many requests, pipeline them
  futures.emplace_back(tunnel(leaderID)->exec("get", "asdf"));
  futures.emplace_back(tunnel(leaderID)->exec("ping"));
  futures.emplace_back(tunnel(leaderID)->exec("set", "asdf", "1234"));
  futures.emplace_back(tunnel(leaderID)->exec("get", "asdf"));
  futures.emplace_back(tunnel(leaderID)->exec("raft_fetch", SSTR(lastEntry+1) ));

  ASSERT_REPLY(futures[0], "");
  ASSERT_REPLY(futures[1], "PONG");
  ASSERT_REPLY(futures[2], "OK");
  ASSERT_REPLY(futures[3], "1234");

  // the entry right after lastEntry must be the "set" issued above
  RaftEntry entry;
  ASSERT_TRUE(RaftParser::fetchResponse(futures[4].get().get(), entry));
  ASSERT_EQ(entry.term, state(0)->getCurrentTerm());
  ASSERT_EQ(entry.request, make_req("set", "asdf", "1234"));

  futures.clear();
  futures.emplace_back(tunnel(leaderID)->exec("set", "asdf", "3456"));
  futures.emplace_back(tunnel(leaderID)->exec("get", "asdf"));

  ASSERT_REPLY(futures[0], "OK");
  ASSERT_REPLY(futures[1], "3456");

  // make sure the log entry has been propagated to all nodes
  for(size_t i = 0; i < 3; i++) {
    std::string value;
    RETRY_ASSERT_TRUE(stateMachine(i)->get("asdf", value).ok() && value == "3456");
  }

  ASSERT_REPLY(tunnel(leaderID)->exec("set", "qwerty", "789"), "OK");
  futures.clear();

  // interleave pipelined requests from three connections
  qclient::QClient tunnel2(myself(leaderID).hostname, myself(leaderID).port);
  qclient::QClient tunnel3(myself(leaderID).hostname, myself(leaderID).port);

  futures.emplace_back(tunnel2.exec("get", "qwerty"));
  futures.emplace_back(tunnel(leaderID)->exec("set", "client2", "val"));
  futures.emplace_back(tunnel(leaderID)->exec("get", "client2"));
  futures.emplace_back(tunnel(leaderID)->exec("sadd", "myset", "a"));
  futures.emplace_back(tunnel2.exec("sadd", "myset", "b"));
  futures.emplace_back(tunnel2.exec("sadd", "myset")); // malformed request
  futures.emplace_back(tunnel3.exec("set", "client3", "myval"));
  futures.emplace_back(tunnel3.exec("get", "client3"));

  // not guaranteed that response will be "myval" here, since it's on a different connection
  futures.emplace_back(tunnel2.exec("get", "client3"));

  ASSERT_REPLY(futures[0], "789");
  ASSERT_REPLY(futures[1], "OK");
  ASSERT_REPLY(futures[2], "val");
  ASSERT_REPLY(futures[3], 1);
  ASSERT_REPLY(futures[4], 1);
  ASSERT_REPLY(futures[5], "ERR wrong number of arguments for 'sadd' command");
  ASSERT_REPLY(futures[6], "OK");
  ASSERT_REPLY(futures[7], "myval");

  redisReplyPtr reply = futures[8].get();
  // fail cleanly instead of dereferencing a null reply
  ASSERT_TRUE(reply != nullptr);
  std::string str = std::string(reply->str, reply->len);
  qdb_info("Race-y request: GET client3 ==> " << str);
  ASSERT_TRUE(str == "myval" || str == "");

  ASSERT_REPLY(tunnel2.exec("scard", "myset"), 2);

  // but here we've received an ack - response _must_ be myval
  ASSERT_REPLY(tunnel2.exec("get", "client3"), "myval");

  RaftInfo info = dispatcher(leaderID)->info();
  ASSERT_EQ(info.blockedWrites, 0u);
  ASSERT_EQ(info.leader, myself(leaderID));

  std::string err;
  std::string checkpointPath = SSTR(commonState.testdir << "/checkpoint");

  // Before taking a checkpoint, ensure node #0 is caught up
  RETRY_ASSERT_TRUE(stateMachine(0)->getLastApplied() == stateMachine(leaderID)->getLastApplied());

  ASSERT_TRUE(dispatcher()->checkpoint(checkpointPath, err));
  ASSERT_FALSE(dispatcher()->checkpoint(checkpointPath, err)); // exists already

  // pretty expensive to open two extra databases, but necessary
  StateMachine checkpointSM(SSTR(checkpointPath << "/state-machine"));

  std::string tmp;
  ASSERT_OK(checkpointSM.get("client3", tmp));
  ASSERT_EQ(tmp, "myval");

  ASSERT_OK(checkpointSM.get("client2", tmp));
  ASSERT_EQ(tmp, "val");

  // TODO: verify checkpointSM last applied, once atomic commits are implemented

  // ensure the checkpoint journal is identical to the original
  RaftJournal checkpointJournal(SSTR(checkpointPath << "/raft-journal"));
  ASSERT_EQ(checkpointJournal.getLogSize(), journal()->getLogSize());
  for(LogIndex i = 0; i < journal()->getLogSize(); i++) {
    RaftEntry entry1, entry2;

    ASSERT_OK(checkpointJournal.fetch(i, entry1));
    ASSERT_OK(journal()->fetch(i, entry2));

    ASSERT_EQ(entry1, entry2);
  }
}
185

186
187
// Fills a hash with fields f1..f9 through the leader and walks it with HSCAN,
// checking cursor continuation ("next:<field>"), the COUNT option (matched
// case-insensitively), and the error replies for bad options and bad cursors.
TEST_F(Raft_e2e, hscan) {
  spinup(0); spinup(1); spinup(2);
  RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));

  int leaderID = getServerID(state(0)->getSnapshot()->leader);
  // same sanity check the other tests in this file apply to the leader id
  ASSERT_GE(leaderID, 0);
  ASSERT_LE(leaderID, 2);

  for(size_t i = 1; i < 10; i++) {
    ASSERT_REPLY(tunnel(leaderID)->exec("hset", "hash", SSTR("f" << i), SSTR("v" << i)), 1);
  }

  redisReplyPtr reply = tunnel(leaderID)->exec("hscan", "hash", "0", "cOUnT", "3").get();
  ASSERT_REPLY(reply, std::make_pair("next:f4", make_vec("f1", "v1", "f2", "v2", "f3", "v3")));

  // unknown option
  reply = tunnel(leaderID)->exec("hscan", "hash", "0", "asdf", "123").get();
  ASSERT_ERR(reply, "ERR syntax error");

  // continue from the cursor returned by the first scan
  reply = tunnel(leaderID)->exec("hscan", "hash", "next:f4", "COUNT", "3").get();
  ASSERT_REPLY(reply, std::make_pair("next:f7", make_vec("f4", "v4", "f5", "v5", "f6", "v6")));

  // COUNT larger than what is left: cursor "0" signals the end
  reply = tunnel(leaderID)->exec("hscan", "hash", "next:f7", "COUNT", "30").get();
  ASSERT_REPLY(reply, std::make_pair("0", make_vec("f7", "v7", "f8", "v8", "f9", "v9")));

  reply = tunnel(leaderID)->exec("hscan", "hash", "adfaf").get();
  ASSERT_ERR(reply, "ERR invalid cursor");

  // well-formed cursor pointing past the last field
  reply = tunnel(leaderID)->exec("hscan", "hash", "next:zz").get();
  ASSERT_REPLY(reply, std::make_pair("0", make_vec()));
}

214
215
216
// Stores keys f1..f9 through the leader and checks SCAN with and without
// MATCH / COUNT, then iterates over the same keys with QScanner in batches
// of three.
TEST_F(Raft_e2e, scan) {
  spinup(0); spinup(1); spinup(2);
  RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));

  int leaderID = getServerID(state(0)->getSnapshot()->leader);
  // same sanity check the other tests in this file apply to the leader id
  ASSERT_GE(leaderID, 0);
  ASSERT_LE(leaderID, 2);

  for(size_t i = 1; i < 10; i++) {
    ASSERT_REPLY(tunnel(leaderID)->exec("set", SSTR("f" << i), SSTR("v" << i)), "OK");
  }

  redisReplyPtr reply = tunnel(leaderID)->exec("scan", "0", "MATCH", "f[1-2]").get();
  ASSERT_REPLY(reply, std::make_pair("0", make_vec("f1", "f2")));

  reply = tunnel(leaderID)->exec("scan", "0", "MATCH", "f*", "COUNT", "3").get();
  ASSERT_REPLY(reply, std::make_pair("next:f4", make_vec("f1", "f2", "f3")));

  // without MATCH
  reply = tunnel(leaderID)->exec("scan", "0", "COUNT", "3").get();
  ASSERT_REPLY(reply, std::make_pair("next:f4", make_vec("f1", "f2", "f3")));

  // with "*" MATCH pattern
  reply = tunnel(leaderID)->exec("scan", "0", "COUNT", "3", "MATCH", "*").get();
  ASSERT_REPLY(reply, std::make_pair("next:f4", make_vec("f1", "f2", "f3")));

  QScanner scanner(*tunnel(leaderID), "f*", 3);

  std::vector<std::string> ret;
  ASSERT_TRUE(scanner.next(ret));
  ASSERT_EQ(ret, make_vec("f1", "f2", "f3"));

  ASSERT_TRUE(scanner.next(ret));
  ASSERT_EQ(ret, make_vec("f4", "f5", "f6"));

  ASSERT_TRUE(scanner.next(ret));
  ASSERT_EQ(ret, make_vec("f7", "f8", "f9"));

  // nothing left to iterate over
  ASSERT_FALSE(scanner.next(ret));
}

252
253
254
// Pipelines a broad selection of commands (sets, hashes, strings, key
// management, configuration, hmset, lists) towards the leader and verifies
// each reply, including WRONGTYPE errors and replies to unknown commands.
// Each section fills `futures`, asserts on every reply in order, and clears
// the vector again.
TEST_F(Raft_e2e, test_many_redis_commands) {
  spinup(0); spinup(1); spinup(2);
  RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));

  int leaderID = getServerID(state(0)->getSnapshot()->leader);
  // same sanity check the other tests in this file apply to the leader id
  ASSERT_GE(leaderID, 0);
  ASSERT_LE(leaderID, 2);

  // --- sets ---
  std::vector<std::future<redisReplyPtr>> futures;
  futures.emplace_back(tunnel(leaderID)->exec("SADD", "myset", "a", "b", "c"));
  futures.emplace_back(tunnel(leaderID)->exec("SCARD", "myset"));
  futures.emplace_back(tunnel(leaderID)->exec("Smembers", "myset"));
  futures.emplace_back(tunnel(leaderID)->exec("srem", "myset", "a", "b"));
  futures.emplace_back(tunnel(leaderID)->exec("srem", "myset", "b"));
  futures.emplace_back(tunnel(leaderID)->exec("scard", "myset"));
  futures.emplace_back(tunnel(leaderID)->exec("smembers", "myset"));
  futures.emplace_back(tunnel(leaderID)->exec("get", "empty_key"));

  ASSERT_REPLY(futures[0], 3);
  ASSERT_REPLY(futures[1], 3);
  ASSERT_REPLY(futures[2], make_vec("a", "b", "c"));
  ASSERT_REPLY(futures[3], 2);
  ASSERT_REPLY(futures[4], 0);
  ASSERT_REPLY(futures[5], 1);
  ASSERT_REPLY(futures[6], make_vec("c"));
  ASSERT_NIL(futures[7]);

  // --- hashes, type clashes on the same key, unknown commands ---
  futures.clear();

  futures.emplace_back(tunnel(leaderID)->exec("hset", "myhash", "a", "b"));
  futures.emplace_back(tunnel(leaderID)->exec("hset", "myhash", "b", "c"));
  futures.emplace_back(tunnel(leaderID)->exec("hset", "myhash", "c", "d"));
  futures.emplace_back(tunnel(leaderID)->exec("hset", "myhash", "a", "d"));
  futures.emplace_back(tunnel(leaderID)->exec("hdel", "myhash", "a", "b", "b"));
  futures.emplace_back(tunnel(leaderID)->exec("hdel", "myhash", "a"));
  futures.emplace_back(tunnel(leaderID)->exec("sadd", "myhash", "wrongtype"));
  futures.emplace_back(tunnel(leaderID)->exec("exists", "myhash"));
  futures.emplace_back(tunnel(leaderID)->exec("hdel", "myhash", "c"));
  futures.emplace_back(tunnel(leaderID)->exec("exists", "myhash"));
  futures.emplace_back(tunnel(leaderID)->exec("sadd", "myhash", "wrongtype"));
  futures.emplace_back(tunnel(leaderID)->exec("exists", "myhash"));
  futures.emplace_back(tunnel(leaderID)->exec("hset", "myhash", "a", "b"));
  futures.emplace_back(tunnel(leaderID)->exec("srem", "myhash", "wrongtype"));
  futures.emplace_back(tunnel(leaderID)->exec("exists", "myhash"));
  futures.emplace_back(tunnel(leaderID)->exec("hset", "myhash", "a", "b"));
  futures.emplace_back(tunnel(leaderID)->exec("exists", "myhash", "myhash", "asdf"));
  futures.emplace_back(tunnel(leaderID)->exec("hexists", "myhash", "a"));
  futures.emplace_back(tunnel(leaderID)->exec("hexists", "myhash", "b"));
  futures.emplace_back(tunnel(leaderID)->exec("sismember", "myhash", "b"));
  futures.emplace_back(tunnel(leaderID)->exec("scard", "myhash"));
  futures.emplace_back(tunnel(leaderID)->exec("scard", "does-not-exist"));
  futures.emplace_back(tunnel(leaderID)->exec("raft_invalid_command"));
  futures.emplace_back(tunnel(leaderID)->exec("quarkdb_invalid_command"));
  futures.emplace_back(tunnel(leaderID)->exec("raft_fetch_last", "7"));

  size_t count = 0;
  ASSERT_REPLY(futures[count++], 1);
  ASSERT_REPLY(futures[count++], 1);
  ASSERT_REPLY(futures[count++], 1);
  ASSERT_REPLY(futures[count++], 0);
  ASSERT_REPLY(futures[count++], 2);
  ASSERT_REPLY(futures[count++], 0);
  ASSERT_REPLY(futures[count++], "ERR Invalid argument: WRONGTYPE Operation against a key holding the wrong kind of value");
  ASSERT_REPLY(futures[count++], 1);
  ASSERT_REPLY(futures[count++], 1);
  ASSERT_REPLY(futures[count++], 0);
  ASSERT_REPLY(futures[count++], 1);
  ASSERT_REPLY(futures[count++], 1);
  ASSERT_REPLY(futures[count++], "ERR Invalid argument: WRONGTYPE Operation against a key holding the wrong kind of value");
  ASSERT_REPLY(futures[count++], 1);
  ASSERT_REPLY(futures[count++], 0);
  ASSERT_REPLY(futures[count++], 1);
  ASSERT_REPLY(futures[count++], 2);
  ASSERT_REPLY(futures[count++], 1);
  ASSERT_REPLY(futures[count++], 0);
  ASSERT_REPLY(futures[count++], "ERR Invalid argument: WRONGTYPE Operation against a key holding the wrong kind of value");
  ASSERT_REPLY(futures[count++], "ERR Invalid argument: WRONGTYPE Operation against a key holding the wrong kind of value");
  ASSERT_REPLY(futures[count++], 0);
  ASSERT_REPLY(futures[count++], "ERR internal dispatching error");
  ASSERT_REPLY(futures[count++], "ERR internal dispatching error");

  redisReplyPtr entries = futures[count++].get();
  // every pipelined reply of this section must have been looked at
  ASSERT_EQ(count, futures.size());

  std::vector<RaftEntry> lastEntries;
  ASSERT_TRUE(RaftParser::fetchLastResponse(entries, lastEntries));
  // guard the indexing below: exactly the 7 requested entries are expected
  ASSERT_EQ(lastEntries.size(), 7u);

  // lastEntries[6] is compared against the newest journal entry, lastEntries[0] against the oldest of the 7
  for(size_t i = 1; i <= 7; i++) {
    RaftEntry comparison;
    LogIndex index = journal(leaderID)->getLogSize() - i;
    ASSERT_OK(journal(leaderID)->fetch(index, comparison));
    ASSERT_EQ(lastEntries[7 - i], comparison);
  }

  // --- keys / exists / del across types ---
  futures.clear();
  futures.emplace_back(tunnel(leaderID)->exec("set", "mystring", "asdf"));
  futures.emplace_back(tunnel(leaderID)->exec("keys", "*"));
  futures.emplace_back(tunnel(leaderID)->exec("exists", "mystring", "myset", "myhash", "adfa", "myhash"));
  futures.emplace_back(tunnel(leaderID)->exec("del", "myhash", "myset", "mystring"));
  futures.emplace_back(tunnel(leaderID)->exec("exists", "mystring", "myset", "myhash", "adfa", "myhash"));
  futures.emplace_back(tunnel(leaderID)->exec("del", "myhash", "myset"));

  ASSERT_REPLY(futures[0], "OK");
  ASSERT_REPLY(futures[1], make_vec("myhash", "myset", "mystring"));
  ASSERT_REPLY(futures[2], 4);
  ASSERT_REPLY(futures[3], 3);
  ASSERT_REPLY(futures[4], 0);
  ASSERT_REPLY(futures[5], 0);

  // --- keys where one name is a prefix of the other ---
  futures.clear();
  futures.emplace_back(tunnel(leaderID)->exec("set", "a", "aa"));
  futures.emplace_back(tunnel(leaderID)->exec("set", "aa", "a"));
  futures.emplace_back(tunnel(leaderID)->exec("get", "a"));
  futures.emplace_back(tunnel(leaderID)->exec("del", "a"));
  futures.emplace_back(tunnel(leaderID)->exec("get", "aa"));
  futures.emplace_back(tunnel(leaderID)->exec("keys", "*"));

  ASSERT_REPLY(futures[0], "OK");
  ASSERT_REPLY(futures[1], "OK");
  ASSERT_REPLY(futures[2], "aa");
  ASSERT_REPLY(futures[3], 1);
  ASSERT_REPLY(futures[4], "a");
  ASSERT_REPLY(futures[5], make_vec("aa"));

  // --- configuration values, which are expected to still be there after flushall ---
  futures.clear();
  futures.emplace_back(tunnel(leaderID)->exec("config_getall"));
  futures.emplace_back(tunnel(leaderID)->exec("config_set", "some.config.value", "1234"));
  futures.emplace_back(tunnel(leaderID)->exec("flushall"));
  futures.emplace_back(tunnel(leaderID)->exec("del", "aa"));
  futures.emplace_back(tunnel(leaderID)->exec("config_get", "some.config.value", "1234"));
  futures.emplace_back(tunnel(leaderID)->exec("config_get", "some.config.value"));
  futures.emplace_back(tunnel(leaderID)->exec("config_getall"));

  ASSERT_REPLY(futures[0], "");
  ASSERT_REPLY(futures[1], "OK");
  ASSERT_REPLY(futures[2], "OK");
  ASSERT_REPLY(futures[3], 0);
  ASSERT_REPLY(futures[4], "ERR wrong number of arguments for 'config_get' command");
  ASSERT_REPLY(futures[5], "1234");
  ASSERT_REPLY(futures[6], make_vec("some.config.value", "1234"));

  // --- deleting one hash leaves the other untouched ---
  futures.clear();
  futures.emplace_back(tunnel(leaderID)->exec("hset", "hash", "key1", "v1"));
  futures.emplace_back(tunnel(leaderID)->exec("hset", "hash2", "key1", "v1"));
  futures.emplace_back(tunnel(leaderID)->exec("exists", "hash", "hash2"));
  futures.emplace_back(tunnel(leaderID)->exec("del", "hash"));
  futures.emplace_back(tunnel(leaderID)->exec("raft_info"));
  futures.emplace_back(tunnel(leaderID)->exec("bad_command"));
  futures.emplace_back(tunnel(leaderID)->exec("exists", "hash"));
  futures.emplace_back(tunnel(leaderID)->exec("exists", "hash2"));

  ASSERT_REPLY(futures[0], 1);
  ASSERT_REPLY(futures[1], 1);
  ASSERT_REPLY(futures[2], 2);
  ASSERT_REPLY(futures[3], 1);
  // ignore futures[4]
  ASSERT_REPLY(futures[5], "ERR unknown command 'bad_command'");
  ASSERT_REPLY(futures[6], 0);
  ASSERT_REPLY(futures[7], 1);

  // --- hmset ---
  futures.clear();
  futures.emplace_back(tunnel(leaderID)->exec("hmset", "hmset_test", "f1", "v1", "f2", "v2"));
  futures.emplace_back(tunnel(leaderID)->exec("exists", "hmset_test"));
  futures.emplace_back(tunnel(leaderID)->exec("hmset", "test"));
  futures.emplace_back(tunnel(leaderID)->exec("hmset", "hmset_test", "f2", "v3", "f4"));
  futures.emplace_back(tunnel(leaderID)->exec("hget", "hmset_test", "f1"));
  futures.emplace_back(tunnel(leaderID)->exec("hlen", "hmset_test"));
  futures.emplace_back(tunnel(leaderID)->exec("hmset", "hmset_test", "f2", "value2", "f3", "value3"));
  futures.emplace_back(tunnel(leaderID)->exec("hlen", "hmset_test"));
  futures.emplace_back(tunnel(leaderID)->exec("hget", "hmset_test", "f2"));
  futures.emplace_back(tunnel(leaderID)->exec("hmset", "hmset_test", "f3", "v3"));
  futures.emplace_back(tunnel(leaderID)->exec("hget", "hmset_test", "f3"));
  futures.emplace_back(tunnel(leaderID)->exec("hlen", "hmset_test"));

  ASSERT_REPLY(futures[0], "OK");
  ASSERT_REPLY(futures[1], 1);
  ASSERT_REPLY(futures[2], "ERR wrong number of arguments for 'hmset' command");
  ASSERT_REPLY(futures[3], "ERR wrong number of arguments for 'hmset' command");
  ASSERT_REPLY(futures[4], "v1");
  ASSERT_REPLY(futures[5], 2);
  ASSERT_REPLY(futures[6], "OK");
  ASSERT_REPLY(futures[7], 3);
  ASSERT_REPLY(futures[8], "value2");
  ASSERT_REPLY(futures[9], "OK");
  ASSERT_REPLY(futures[10], "v3");
  ASSERT_REPLY(futures[11], 3);

  // --- lists ---
  futures.clear();
  futures.emplace_back(tunnel(leaderID)->exec("lpush", "list_test", "i1", "i2", "i3", "i4"));
  futures.emplace_back(tunnel(leaderID)->exec("exists", "list_test"));
  futures.emplace_back(tunnel(leaderID)->exec("llen", "list_test"));
  futures.emplace_back(tunnel(leaderID)->exec("lpop", "list_test"));
  futures.emplace_back(tunnel(leaderID)->exec("llen", "list_test"));
  futures.emplace_back(tunnel(leaderID)->exec("rpop", "list_test"));
  futures.emplace_back(tunnel(leaderID)->exec("llen", "list_test"));
  futures.emplace_back(tunnel(leaderID)->exec("del", "list_test"));
  futures.emplace_back(tunnel(leaderID)->exec("llen", "list_test"));
  futures.emplace_back(tunnel(leaderID)->exec("lpop", "list_test"));
  futures.emplace_back(tunnel(leaderID)->exec("rpush", "list_test", "i5", "i6", "i7", "i8"));
  futures.emplace_back(tunnel(leaderID)->exec("set", "list_test", "asdf"));
  futures.emplace_back(tunnel(leaderID)->exec("lpop", "list_test"));
  futures.emplace_back(tunnel(leaderID)->exec("rpop", "list_test"));
  futures.emplace_back(tunnel(leaderID)->exec("rpop", "list_test"));
  futures.emplace_back(tunnel(leaderID)->exec("lpop", "list_test"));
  futures.emplace_back(tunnel(leaderID)->exec("set", "list_test", "asdf"));
  futures.emplace_back(tunnel(leaderID)->exec("lpop", "list_test"));

  size_t i = 0;
  ASSERT_REPLY(futures[i++], 4);
  ASSERT_REPLY(futures[i++], 1);
  ASSERT_REPLY(futures[i++], 4);
  ASSERT_REPLY(futures[i++], "i4");
  ASSERT_REPLY(futures[i++], 3);
  ASSERT_REPLY(futures[i++], "i1");
  ASSERT_REPLY(futures[i++], 2);
  ASSERT_REPLY(futures[i++], 1);
  ASSERT_REPLY(futures[i++], 0);
  ASSERT_NIL(futures[i++]);
  ASSERT_REPLY(futures[i++], 4);
  ASSERT_REPLY(futures[i++], "ERR Invalid argument: WRONGTYPE Operation against a key holding the wrong kind of value");
  ASSERT_REPLY(futures[i++], "i5");
  ASSERT_REPLY(futures[i++], "i8");
  ASSERT_REPLY(futures[i++], "i7");
  ASSERT_REPLY(futures[i++], "i6");
  ASSERT_REPLY(futures[i++], "OK");
  ASSERT_REPLY(futures[i++], "ERR Invalid argument: WRONGTYPE Operation against a key holding the wrong kind of value");

  // every pipelined reply of this section must have been looked at
  ASSERT_EQ(i, futures.size());
}

475
476
// With resilvering disabled and the leader's journal trimmed, a freshly
// started node #2 must stay stuck at its initial journal. Once the missing
// entries are copied into its journal by hand, it must catch up through
// normal replication, without its log start ever moving.
TEST_F(Raft_e2e, replication_with_trimmed_journal) {
  spinup(0); spinup(1);
  RETRY_ASSERT_TRUE(checkStateConsensus(0, 1));

  int leaderID = getServerID(state(0)->getSnapshot()->leader);
  // validate the leader before deriving anything from it
  ASSERT_GE(leaderID, 0);
  ASSERT_LE(leaderID, 1);
  int firstSlaveID = (leaderID+1)%2;

  // First, disable automatic resilvering..
  EncodedConfigChange configChange = raftconfig(leaderID)->setResilveringEnabled(false);
  ASSERT_TRUE(configChange.error.empty());
  ASSERT_REPLY(tunnel(leaderID)->execute(configChange.request), "OK");

  // send off many requests, pipeline them
  std::vector<std::future<redisReplyPtr>> futures;
  for(size_t i = 0; i < testreqs.size(); i++) {
    futures.emplace_back(tunnel(leaderID)->execute(testreqs[i]));
  }

  // the first two replies are indexed unconditionally below
  ASSERT_GE(futures.size(), 2u);

  for(size_t i = 0; i < 2; i++) {
    ASSERT_REPLY(futures[i], "OK");
  }

  for(size_t i = 2; i < futures.size(); i++) {
    ASSERT_REPLY(futures[i], 1);
  }

  // now let's trim leader's journal..
  journal(leaderID)->trimUntil(4);

  // and verify it's NOT possible to bring node #2 up to date
  spinup(2);
  RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));

  ASSERT_EQ(journal(2)->getLogSize(), 1);
  ASSERT_EQ(journal(2)->getLogStart(), 0);

  // a divine intervention fills up the missing entries in node #2 journal
  // (taken from the follower, whose journal has not been trimmed)
  for(LogIndex i = 1; i < 5; i++) {
    RaftEntry entry;
    ASSERT_TRUE(journal(firstSlaveID)->fetch(i, entry).ok());

    journal(2)->append(i, entry);
  }

  // now verify node #2 can be brought up to date successfully
  RETRY_ASSERT_TRUE(
    journal(0)->getLogSize() == journal(1)->getLogSize() &&
    journal(1)->getLogSize() == journal(2)->getLogSize()
  );

  ASSERT_EQ(journal(2)->getLogSize(), journal(leaderID)->getLogSize());
  ASSERT_EQ(journal(2)->getLogSize(), journal(firstSlaveID)->getLogSize());

  // Verify resilvering didn't happen.
  ASSERT_EQ(journal(2)->getLogStart(), 0);
}
Georgios Bitzes's avatar
Georgios Bitzes committed
533
534
535

// Walks one node through the membership life cycle: removed from the cluster,
// re-added as an observer, and finally promoted back to a full member. Each
// step is expected to advance the commit index by exactly one.
TEST_F(Raft_e2e, membership_updates) {
  spinup(0); spinup(1); spinup(2);
  RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));

  int leaderID = getServerID(state(0)->getSnapshot()->leader);
  // same sanity check the other tests in this file apply to the leader id
  ASSERT_GE(leaderID, 0);
  ASSERT_LE(leaderID, 2);

  ASSERT_REPLY(tunnel(leaderID)->exec("set", "pi", "3.141516"), "OK");

  // throw a node out of the cluster
  int victim = (leaderID+1) % 3;
  RETRY_ASSERT_TRUE(checkFullConsensus(0, 1, 2));

  // index of the last journal entry before any membership change;
  // kept as LogIndex to avoid narrowing the journal size into an int
  LogIndex index = journal(leaderID)->getLogSize() - 1;
  ASSERT_REPLY(tunnel(leaderID)->exec("RAFT_REMOVE_MEMBER", myself(victim).toString()), "OK");
  RETRY_ASSERT_TRUE(dispatcher(leaderID)->info().commitIndex == index + 1);

  // verify the cluster has not been disrupted
  ASSERT_EQ(state(leaderID)->getSnapshot()->leader, myself(leaderID));

  // add it back as an observer, verify consensus
  ASSERT_REPLY(tunnel(leaderID)->exec("RAFT_ADD_OBSERVER", myself(victim).toString()), "OK");

  RETRY_ASSERT_TRUE(dispatcher(0)->info().commitIndex == index + 2);
  RETRY_ASSERT_TRUE(dispatcher(1)->info().commitIndex == index + 2);
  RETRY_ASSERT_TRUE(dispatcher(2)->info().commitIndex == index + 2);

  ASSERT_EQ(state(victim)->getSnapshot()->status, RaftStatus::FOLLOWER);

  ASSERT_EQ(state(0)->getSnapshot()->leader, state(1)->getSnapshot()->leader);
  ASSERT_EQ(state(1)->getSnapshot()->leader, state(2)->getSnapshot()->leader);

  ASSERT_EQ(journal(0)->getLogSize(), journal(1)->getLogSize());
  ASSERT_EQ(journal(1)->getLogSize(), journal(2)->getLogSize());

  // cannot be a leader, it's an observer
  ASSERT_NE(state(0)->getSnapshot()->leader, myself(victim));

  // add back as a full voting member
  leaderID = getServerID(state(0)->getSnapshot()->leader);
  ASSERT_GE(leaderID, 0);
  ASSERT_REPLY(tunnel(leaderID)->exec("RAFT_PROMOTE_OBSERVER", myself(victim).toString()), "OK");
  RETRY_ASSERT_TRUE(dispatcher(leaderID)->info().commitIndex == index + 3);
  RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
}

575
576
// With only nodes 0 and 1 running, removing the running follower must be
// refused, since the resulting cluster would lack an up-to-date quorum.
// Removing a node which is not a member must be refused as well, while the
// node which was never started (#2) can be removed, re-added as observer,
// and removed again.
TEST_F(Raft_e2e, reject_dangerous_membership_update) {
  spinup(0); spinup(1);
  RETRY_ASSERT_TRUE(checkFullConsensus(0, 1));

  int leaderID = getLeaderID();
  // same sanity check the other two-node tests in this file apply
  ASSERT_GE(leaderID, 0);
  ASSERT_LE(leaderID, 1);

  // make sure dangerous node removal is prevented
  int victim = (leaderID+1) % 2;
  redisReplyPtr reply = tunnel(leaderID)->exec("RAFT_REMOVE_MEMBER", myself(victim).toString()).get();
  ASSERT_ERR(reply, "ERR membership update blocked, new cluster would not have an up-to-date quorum");

  // Try to remove a non-existent node
  ASSERT_REPLY(tunnel(leaderID)->exec("RAFT_REMOVE_MEMBER", RaftServer("random_host", 123).toString()),
    "ERR random_host:123 is neither an observer nor a full node.");

  // Make sure we can remove the third node
  ASSERT_REPLY(tunnel(leaderID)->exec("RAFT_REMOVE_MEMBER", myself(2).toString()), "OK");
  RaftMembership membership = journal(leaderID)->getMembership();
  // wait until the membership change has been committed
  RETRY_ASSERT_TRUE(journal(leaderID)->getCommitIndex() == membership.epoch);

  // Add it back as observer
  ASSERT_REPLY(tunnel(leaderID)->exec("RAFT_ADD_OBSERVER", myself(2).toString()), "OK");
  membership = journal(leaderID)->getMembership();
  RETRY_ASSERT_TRUE(journal(leaderID)->getCommitIndex() == membership.epoch);

  // Remove it again
  ASSERT_REPLY(tunnel(leaderID)->exec("RAFT_REMOVE_MEMBER", myself(2).toString()), "OK");
  membership = journal(leaderID)->getMembership();
  RETRY_ASSERT_TRUE(journal(leaderID)->getCommitIndex() == membership.epoch);
}

Georgios Bitzes's avatar
Georgios Bitzes committed
605
606
607
// Removes node #4 from the membership before it is ever started, then starts
// it and checks that the current leader is not disturbed. Afterwards removes
// a second, running node and verifies that the leader stays in place and
// still serves reads and writes.
TEST_F(Raft_e2e5, membership_updates_with_disruptions) {
  // let's get this party started
  spinup(0); spinup(1); spinup(2); spinup(3);
  RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2, 3));

  // throw node #4 out of the cluster
  int leaderID = getServerID(state(0)->getSnapshot()->leader);
  // only nodes 0..3 are running, so the leader must be one of them
  ASSERT_GE(leaderID, 0);
  ASSERT_LE(leaderID, 3);
  ASSERT_REPLY(tunnel(leaderID)->exec("RAFT_REMOVE_MEMBER", myself(4).toString()), "OK");

  // wait until membership update has been committed
  RaftMembership membership = journal(leaderID)->getMembership();
  ASSERT_GT(membership.epoch, 0u);
  ASSERT_EQ(membership.nodes.size(), 4u);
  RETRY_ASSERT_TRUE(journal(leaderID)->getCommitIndex() == membership.epoch);

  // .. and now spinup node #4 :> Ensure it doesn't disrupt the current leader
  spinup(4);
  std::this_thread::sleep_for(raftclock()->getTimeouts().getHigh()*2);
  ASSERT_EQ(leaderID, getServerID(state(0)->getSnapshot()->leader));

  // verify the cluster has not been disrupted
  ASSERT_EQ(state(leaderID)->getSnapshot()->leader, myself(leaderID));

  // remove one more node
  int victim = (leaderID+1) % 5;
  // node #4 is already gone, pick another one
  if(victim == 4) victim = 2;

  ASSERT_REPLY(tunnel(leaderID)->exec("RAFT_REMOVE_MEMBER", myself(victim).toString()), "OK");
  std::this_thread::sleep_for(raftclock()->getTimeouts().getHigh()*2);

  // verify the cluster has not been disrupted
  ASSERT_EQ(state(leaderID)->getSnapshot()->leader, myself(leaderID));

  // issue a bunch of writes and reads
  ASSERT_REPLY(tunnel(leaderID)->exec("set", "123", "abc"), "OK");
  ASSERT_REPLY(tunnel(leaderID)->exec("get", "123"), "abc");
}
642

Georgios Bitzes's avatar
Georgios Bitzes committed
643
// In a two-node cluster, shutting down the follower must make the leader
// move to a higher term and end up without any known leader.
TEST_F(Raft_e2e, leader_steps_down_after_follower_loss) {
  // cluster with 2 nodes
  spinup(0); spinup(1);
  RETRY_ASSERT_TRUE(checkStateConsensus(0, 1));

  int leaderID = getLeaderID();
  ASSERT_GE(leaderID, 0);
  ASSERT_LE(leaderID, 1);

  // term while the leader is still in charge, used as the baseline below
  RaftTerm term = state(leaderID)->getCurrentTerm();

  int followerID = (leaderID + 1)%2;
  spindown(followerID);

  RETRY_ASSERT_TRUE(term < state(leaderID)->getCurrentTerm());
  ASSERT_TRUE(state(leaderID)->getSnapshot()->leader.empty());
}
660
661
662
663
664
665
666
667
668
669
670

// A follower answers reads with a MOVED redirection to the leader by default.
// After "activate-stale-reads" on that connection it serves the read itself;
// the value is only asserted once full consensus has been reached.
TEST_F(Raft_e2e, stale_reads) {
  spinup(0); spinup(1); spinup(2);
  RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));

  int leaderID = getLeaderID();
  ASSERT_GE(leaderID, 0);
  ASSERT_LE(leaderID, 2);

  // derive the follower from the leader id validated above, rather than from
  // a second getLeaderID() call which could observe a different leader
  int follower = (leaderID + 1) % 3;

  ASSERT_REPLY(tunnel(leaderID)->exec("set", "abc", "1234"), "OK");
  ASSERT_REPLY(tunnel(follower)->exec("get", "abc"), SSTR("MOVED 0 " << myself(leaderID).toString()));

  ASSERT_REPLY(tunnel(follower)->exec("activate-stale-reads"), "OK");

  // the follower may or may not have applied the write yet, so only log the result
  redisReplyPtr reply = tunnel(follower)->exec("get", "abc").get();
  // fail cleanly instead of dereferencing a null reply
  ASSERT_TRUE(reply != nullptr);
  qdb_info("Race-y read: " << std::string(reply->str, reply->len));

  RETRY_ASSERT_TRUE(checkFullConsensus(0, 1, 2));
  ASSERT_REPLY(tunnel(follower)->exec("get", "abc"), "1234");
}
Georgios Bitzes's avatar
Georgios Bitzes committed
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709

// Issues MONITOR over a hand-made connection to the leader and checks that
// commands sent afterwards through the regular tunnel are echoed back on that
// connection, one "+..." line per command.
TEST_F(Raft_e2e, monitor) {
  spinup(0); spinup(1); spinup(2);
  RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));

  int leader = getLeaderID();

  // QClient is not able to digest the output of MONITOR, hence the raw link
  qclient::ConnectionInitiator connection("localhost", myself(leader).port);
  ASSERT_TRUE(connection.ok());

  Link rawLink(connection.getFd());
  BufferedReader input(&rawLink);

  // Send() is expected to report the number of bytes handed over
  ASSERT_EQ(rawLink.Send("*1\r\n$7\r\nMONITOR\r\n"), 17);
  // NOTE(review): presumably checks that input after MONITOR is ignored - confirm
  ASSERT_EQ(rawLink.Send("random string"), 13);

  std::string received;
  RETRY_ASSERT_TRUE(input.consume(5, received));
  ASSERT_EQ(received, "+OK\r\n");

  // a write..
  tunnel(leader)->exec("set", "abc", "aaaa");
  received.clear();
  RETRY_ASSERT_TRUE(input.consume(21, received));
  ASSERT_EQ(received, "+\"set\" \"abc\" \"aaaa\"\r\n");

  // .. and a read
  tunnel(leader)->exec("get", "abc");
  received.clear();
  RETRY_ASSERT_TRUE(input.consume(14, received));
  ASSERT_EQ(received, "+\"get\" \"abc\"\r\n");
}
Georgios Bitzes's avatar
Georgios Bitzes committed
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727

// Exercises HINCRBYMULTI, which takes (key, field, increment) triplets.
// The integer replies asserted here (7, then 35) equal the sum of the field
// values after the increments; an incomplete triplet is rejected.
TEST_F(Raft_e2e, hincrbymulti) {
  spinup(0); spinup(1); spinup(2);
  RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));

  int leader = getLeaderID();

  // two triplets on fresh fields: 3 + 4
  ASSERT_REPLY(
    tunnel(leader)->exec("hincrbymulti", "h1", "h2", "3", "h2", "h3", "4"),
    7
  );
  ASSERT_REPLY(tunnel(leader)->exec("hget", "h1", "h2"), "3");
  ASSERT_REPLY(tunnel(leader)->exec("hget", "h2", "h3"), "4");

  // last triplet lacks its increment
  ASSERT_REPLY(
    tunnel(leader)->exec("hincrbymulti", "h1", "h2", "-5", "h2", "h3", "20", "h4", "h8"),
    "ERR wrong number of arguments for 'hincrbymulti' command"
  );

  // same request with the triplet completed: -2 + 24 + 13
  ASSERT_REPLY(
    tunnel(leader)->exec("hincrbymulti", "h1", "h2", "-5", "h2", "h3", "20", "h4", "h8", "13"),
    35
  );

  ASSERT_REPLY(tunnel(leader)->exec("hget", "h1", "h2"), "-2");
  ASSERT_REPLY(tunnel(leader)->exec("hget", "h2", "h3"), "24");
  ASSERT_REPLY(tunnel(leader)->exec("hget", "h4", "h8"), "13");
}