AcWing「Linux基础课」第 6 讲 Thrift

Thrift 实战

服务调用结构图

image-20220403130734191.png

创建项目文件目录 match_system、game、thriftreadme.md 文件

  • thrift 目录用于存放 远程调用接口文件
  • match_system:是匹配系统
  • game:是游戏系统

1、在 thrift 下编写 match.thrift 文件

1
2
3
4
5
6
7
8
9
10
11
12
13
namespace cpp match_service

struct User {
1: i32 id,
2: string name,
3: i32 score
}

service Match {
i32 add_user(1: User user, 2: string info),

i32 remove_user(1: User user, 2: string info),
}

2、在 match_system/src 下通过 match.thrift 文件生成 C++ 服务端客户端代码,但我们只用服务端

1
thrift -r --gen cpp ../../thrift/match.thrift

生成的代码目录

1
2
3
4
5
6
7
8
acs@a7e3435d46dd:~/homework/lesson_6/thrift_lesson/match_system/src$ tree .
.
`-- gen-cpp
|-- Match.cpp
|-- Match.h
|-- Match_server.skeleton.cpp
|-- match_types.cpp
`-- match_types.h

gen-cpp 文件夹改名为 match_server 作为匹配系统的 server 端

把生成的服务端代码 Match_server.skeleton.cpp 移动到 src 目录下并改名为 main.cpp

3、修改 main.cpp 写自己的代码逻辑,Match Server 1.0

先给两个函数添加 return 0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.

#include "match_server/Match.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>

#include <iostream>

using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;

using namespace ::match_service;
using namespace std;

class MatchHandler : virtual public MatchIf {
public:
MatchHandler() {
// Your initialization goes here
}

int32_t add_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("add_user\n");

return 0;
}

int32_t remove_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("remove_user\n");

return 0;
}

};

int main(int argc, char **argv) {
int port = 9090;
::std::shared_ptr<MatchHandler> handler(new MatchHandler());
::std::shared_ptr<TProcessor> processor(new MatchProcessor(handler));
::std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
::std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
::std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());

TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);

cout << "Start Match Server" << endl;

server.serve();
return 0;
}

编译测试

  1. 编译: g+t -c main.cpp match server/*.cpp,编译所有的 .cpp 文件
  2. 链接: g++ *.o -o main -lthrift,将所有 .o 文件链接起来,-lthrift 是 thrift 的动态链接库

小技巧:写 thrift 代码先编译跑通环境,再逐步往里添加模块

4、进入 game/src 目录生成 python 服务端客户端代码,只用客户端

1
thrift -r --gen py ../../thrift/match.thrift

生成的代码目录

1
2
3
4
5
6
7
8
.
|-- __init__.py
`-- match
|-- Match-remote # python 服务器端代码
|-- Match.py # python 客户端代码
|-- __init__.py
|-- constants.py
`-- ttypes.py

gen-cpp 文件夹改名为 match_client 作为匹配系统的 client 端

Match-remote 用不到可以删掉

创建客户端逻辑文件 client.py,从官网 Tutorial 中复制修改。修改成从终端读入操作,客户端搞定!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from match_client.match import Match
from match_client.match.ttypes import User

from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

# 从终端读入内容
from sys import stdin


def operate(op, user_id, username, score):
# Make socket
transport = TSocket.TSocket('localhost', 9090)

# Buffering is critical. Raw sockets are very slow
transport = TTransport.TBufferedTransport(transport)

# Wrap in a protocol
protocol = TBinaryProtocol.TBinaryProtocol(transport)

# Create a client to use the protocol encoder
client = Match.Client(protocol)

# Connect!
transport.open()

user = User(user_id, username, score)

if op == "add":
client.add_user(user, "")
elif op == "remove":
client.remove_user(user, "")

# Close!
transport.close()


def main():
# 从终端中读入操作
for line in stdin:
op, user_id, username, score = line.split(' ')
operate(op, int(user_id), username, int(score))


if __name__ == "__main__":
main()

4、完善 main.cpp ,Match Server 2.0,开一个线程去执行匹配任务,同时主线程去和客户端通信。

需要用到锁、条件变量来进行线程同步

匹配逻辑:每次取匹配池里的前两名玩家进行匹配。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.

#include "match_server/Match.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>

using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;

using namespace ::match_service;
using namespace std;

struct Task
{
User user;
string type;
};

struct MessageQueue
{
queue<Task> q;
mutex m;
condition_variable cv;
}message_queue;

class Pool
{
public:
void save_result(int a, int b)
{
printf("Match Result: % d %d\n", a, b);
}

void match()
{
while (users.size() > 1)
{
auto a = users[0], b = users[1];
users.erase(users.begin());
users.erase(users.begin());

save_result(a.id, b.id);
}
}

void add(User user)
{
users.push_back(user);
}

void remove(User user)
{
for (uint32_t i = 0; i < users.size(); i ++ )
if (users[i].id == user.id)
{
users.erase(users.begin() + i);
break;
}
}

private:
vector<User> users;
}pool;



class MatchHandler : virtual public MatchIf {
public:
MatchHandler() {
// Your initialization goes here
}

int32_t add_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("add_user\n");

unique_lock<mutex> lck(message_queue.m); // 为消息队列加锁,函数退出会自动解锁
message_queue.q.push({user, "add"});
message_queue.cv.notify_all(); // 唤醒所有被阻塞的线程

return 0;
}

int32_t remove_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("remove_user\n");

unique_lock<mutex> lck(message_queue.m); // 加锁,两个方法同时只能有一个方法可以获取到消息队列的锁
message_queue.q.push({user, "remove"});
message_queue.cv.notify_all(); // 删除也是一个任务,也需要唤醒所有被阻塞的线程

return 0;
}

};

void consume_task()
{
while (true)
{
unique_lock<mutex> lck(message_queue.m);
if (message_queue.q.empty())
{
message_queue.cv.wait(lck); // 阻塞当前线程,等待唤醒
}
else
{
auto task = message_queue.q.front();
message_queue.q.pop();
lck.unlock(); // 解锁
// do task

if (task.type == "add") pool.add(task.user);
else if (task.type == "remove") pool.remove(task.user);

pool.match();
}
}
}

int main(int argc, char **argv) {
int port = 9090;
::std::shared_ptr<MatchHandler> handler(new MatchHandler());
::std::shared_ptr<TProcessor> processor(new MatchProcessor(handler));
::std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
::std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
::std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());

cout << "Start Match Server" << endl;

// 开一个线程执行匹配任务
thread matching_thread(consume_task);

TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
server.serve();
return 0;
}

我们只修改了 main.cpp,所以只需要编译 main.cpp 即可

  • 编译:g++ -c main.cpp

  • 链接:g++ *.o -o main -lthrift -pthread-pthread 是线程动态链接库

5、在 match_server 端调用远程保存数据服务,所以需要创建保存数据客户端 save_client

thrift 目录下定义 save.thrift 接口

1
2
3
4
5
6
7
8
9
namespace cpp save_service

service Save {
# username: myserver的名称
# password: myserver的密码的md5sum的前8位
# 用户名密码验证成功会返回0,验证失败会返回1
# 验证成功后,结果会被保存到myserver:homework/lesson_6/result.txt中
i32 save_data(1: string username, 2: string password, 3: i32 player1_id, 4: i32 player2_id)
}

获取密码

  • 首先 homework 4 getinfo,获取用户名 ip 地址和密码
  • 然后输入 md5sum 命令,输入密码,按 Ctrl + d 生成加密后的密码,取前 8 位。

match_server 端

image-20220403110434884.png

match_client 端

image-20220403110537310.png

myserver 端

image-20220403110609181.png

main.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.

#include "match_server/Match.h"
#include "save_client/Save.h"

#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TTransportUtils.h>
#include <thrift/transport/TSocket.h>

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>

using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;

using namespace ::match_service;
using namespace ::save_service;

using namespace std;


struct Task
{
User user;
string type;
};

struct MessageQueue
{
queue<Task> q;
mutex m;
condition_variable cv;
}message_queue;

// 玩家池
class Pool
{
public:
void save_result(int a, int b)
{
printf("Match Result: %d %d\n", a, b);
// myserver 的 ip 地址
std::shared_ptr<TTransport> socket(new TSocket("123.57.47.211", 9090));
std::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
SaveClient client(protocol);

try {
transport->open();

// 通过 myserver 用户名和加密后的密码,才能把数据保存到 myserver 的 result.txt 上
int res = client.save_data("acs_4851", "c4cd762a", a, b);
// if (!res) puts("success");
// else puts("failed");


transport->close();
} catch (TException& tx) {
cout << "ERROR: " << tx.what() << endl;
}
}

void match()
{
while (users.size() > 1)
{
auto a = users[0], b = users[1];
users.erase(users.begin());
users.erase(users.begin());

save_result(a.id, b.id);
}
}

void add(User user)
{
users.push_back(user);
}

void remove(User user)
{
for (uint32_t i = 0; i < users.size(); i ++ )
if (users[i].id == user.id)
{
users.erase(users.begin() + i);
break;
}
}

private:
vector<User> users;
}pool;

class MatchHandler : virtual public MatchIf {
public:
MatchHandler() {
// Your initialization goes here
}

int32_t add_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("add_user\n");

unique_lock<mutex> lck(message_queue.m);
message_queue.q.push({user, "add"});
// 唤醒
message_queue.cv.notify_all();

return 0;
}

int32_t remove_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("remove_user\n");

unique_lock<mutex> lck(message_queue.m);
message_queue.q.push({user, "remove"});
// 唤醒
message_queue.cv.notify_all();

return 0;
}

};

void consume_task()
{
while (true)
{
unique_lock<mutex> lck(message_queue.m);
if (message_queue.q.empty())
{
// 等待
message_queue.cv.wait(lck);
}
else
{
auto task = message_queue.q.front();
message_queue.q.pop();
// 解锁
lck.unlock();

// do task
if (task.type == "add") pool.add(task.user);
else if (task.type == "remove") pool.remove(task.user);

pool.match();
}
}
}

int main(int argc, char **argv) {
int port = 9090;
::std::shared_ptr<MatchHandler> handler(new MatchHandler());
::std::shared_ptr<TProcessor> processor(new MatchProcessor(handler));
::std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
::std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
::std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());

TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);


cout << "Start Match Server" << endl;

// 开线程
thread matching_thread(consume_task);

server.serve();
return 0;
}

6、继续完善 main.cpp 匹配逻辑 Match Server 3.0,改为每 1s 钟匹配一次,发现两名玩家分值差小于等于 50 就匹配成功。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 玩家池
class Pool
{
public:
...
void match()
{
while (users.size() > 1)
{
sort(users.begin(), users.end(), [](User& a, User& b){
return a.score < b.score;
});

bool flag = true;
for (uint32_t i = 1; i < users.size(); i ++ )
{
auto a = users[i - 1], b = users[i];
if (b.score - a.score <= 50)
{
users.erase(users.begin() + i - 1, users.begin() + i + 1);
save_result(a.id, b.id);

flag = false;
break;
}
}
if (flag) break;
}
}
...

private:
vector<User> users;
}pool;

完整代码

match_server 端

image-20220403112113226.png

match_client 端

image-20220403112124384.png

7、继续完善 main.cpp 匹配逻辑 Match Server 4.0,将原来的单线程匹配升级为多线程。每次调用 consume_task 匹配函数就新建一个线程。

原来使用的是 TSimpleServer,简单版的 Server,现在改用 TThreadServer(T - Thrift)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int main(int argc, char **argv) {
TThreadedServer server(
std::make_shared<MatchProcessorFactory>(std::make_shared<MatchCloneFactory>()),
std::make_shared<TServerSocket>(9090), //port
std::make_shared<TBufferedTransportFactory>(),
std::make_shared<TBinaryProtocolFactory>());

cout << "Start Match Server" << endl;

// 开线程
thread matching_thread(consume_task);

server.serve();
return 0;
}

还需要添加一个工厂

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class MatchCloneFactory : virtual public MatchIfFactory {
public:
~MatchCloneFactory() override = default;
MatchIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) override
{
std::shared_ptr<TSocket> sock = std::dynamic_pointer_cast<TSocket>(connInfo.transport);
/*cout << "Incoming connection\n";
cout << "\tSocketInfo: " << sock->getSocketInfo() << "\n";
cout << "\tPeerHost: " << sock->getPeerHost() << "\n";
cout << "\tPeerAddress: " << sock->getPeerAddress() << "\n";
cout << "\tPeerPort: " << sock->getPeerPort() << "\n";*/
return new MatchHandler;
}
void releaseHandler(MatchIf* handler) override {
delete handler;
}
};

另外需要替换一些变量名,将 Calculator 替换成 Match完整代码

8、终极版,在第 6 步的基础上继续完善 main.cpp 匹配逻辑,原来的匹配条件是如果两个人的分支相差不超过 50 就匹配成功,但这种实现存在问题,比如当前匹配池中有两名玩家,一个 1000 分一个 500 分,在没有其他玩家参与的情况下,这两名玩家将永远不会匹配成功,这是不合理的。

所以我们可以添加等待时间机制,在等待一段时间后,还没能匹配成功的话,就将他两匹配到一起。

具体逻辑:

  • 每个玩家添加绑定一个等待时间,每等一秒就将分值扩大 50
  • 如果此时有两个玩家的分值都满足可以接收的差值,就匹配成功

main.cpp 匹配逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
bool check_match(uint32_t i, uint32_t j)
{
auto a = users[i], b = users[j];

int dt = abs(a.score - b.score);
int a_max_diff = wt[i] * 50; // 每等待 1s 扩大 50 分差
int b_max_diff = wt[i] * 50;

return dt <= a_max_diff && dt <= b_max_diff;

}

void match()
{
for (uint32_t i = 0; i < wt.size(); i ++ )
wt[i] ++ ; // 表示等待秒数 + 1

while (users.size() > 1)
{
bool flag = true;
for (uint32_t i = 0; i < users.size(); i ++ )
{
for (uint32_t j = i + 1; j < users.size(); j ++ )
{
if (check_match(i, j))
{
auto a = users[i], b = users[j];
users.erase(users.begin() + j); // 将两名玩家删除,先删后面的,再删前面的,防止下标变化
users.erase(users.begin() + i);
wt.erase(wt.begin() + j); // 将两名玩家从等待数组中删除
wt.erase(wt.begin() + i);
save_result(a.id, b.id);
flag = false;
break;
}
}
if (!flag) break;
}
if (flag) break;
}
}

再修改 add_user()remove_user() 方法,添加等待数组的添加和删除。

完整代码

作业代码

仓库地址:https://git.acwing.com/tonngw/thrift_lesson

成功截图

image-20220403131055315.png

Author: tonngw
Link: https://tonngw.com/2022/04/04/AcWing/第 6 讲 Thrift/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.