skynet范例研究-服务端

服务端代码踩了不少坑,其中在学习lua时还碰到坑爹的教程错误,而且skynet还没有api文档,初次看服务端源码时,会有些摸不着头脑,傻傻分不清哪部分属于skynet功能,哪部分属于自定义封装,有些时候不得不打开skynet源码查看,这时skynet的另一个优点就体现出来了,skynet核心源码并不多,要查起来也不会特别费时间,不过还是希望能有个详细的api文档,至少对初学者来说会方便不少,降低门槛。

下面开始分析范例中的服务端代码。 范例github地址:

https://github.com/cloudwu/skynet_sample

源码分为3个文件夹,分别为service、lualib、src。
其中service主要是服务端业务逻辑,lualib为基础工具封装,src为c语言服务封装

一般阅读代码时先从main入手,跟着逻辑一步一步不断深入。

main.lua

文件路径:service/main.lua

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
local skynet = require "skynet"
skynet.start(function()
skynet.error("Server start") -- 打印Server start
if not skynet.getenv "daemon" then
local console = skynet.newservice("console") -- 创建控制台
end
skynet.newservice("debug_console",8000) -- 启动控制台服务
local proto = skynet.uniqueservice "protoloader" -- 启动封装的protoloader服务
skynet.call(proto, "lua", "load", { -- 调用protoloader服务的load接口
"proto.c2s", -- 客户端 to 服务器
"proto.s2c", -- 服务器 to 客户端
})
local hub = skynet.uniqueservice "hub" -- 启动hub服务
skynet.call(hub, "lua", "open", "0.0.0.0", 5678) -- 调用hub服务中的open接口,监听5678网络端口
skynet.exit()
end)

入口函数很简洁,首先启动了控制台服务,该服务使得skynet在运行时,可以打印log以及信息到终端界面。
protoloader服务为proto协议的封装,注意这里的protoloader为自定义封装,而不是skynet核心代码里自带的那个。
首先调用skynet.uniqueservice挂起了protoloader服务,所以接下来看看protoloader是怎么运行的。

protoloader.lua

文件路径:service/protoloader.lua

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
local skynet = require "skynet"
local sprotoparser = require "sprotoparser" --加载skynet/lualib下的sproto解析器
local sprotoloader = require "sprotoloader" --sproto加器器
local service = require "service" --加载范例根目录lualib下的service
local log = require "log" --加载范例根目录lualib下的log
local loader = {} --保存函数
local data = {} --保存加载后的sproto协议在skynet sprotoparser里的序号,key值为文件名
local function load(name)
local filename = string.format("proto/%s.sproto", name)
local f = assert(io.open(filename), "Can't open " .. name)
local t = f:read "a"
f:close() --以上为读取文件内容
return sprotoparser.parse(t) --调用skynet的sprotoparser解析sproto协议
end
function loader.load(list)
for i, name in ipairs(list) do
local p = load(name) --加载sproto协议
log("load proto \[%s\] in slot %d", name, i)
data\[name\] = i
sprotoloader.save(p, i) --保存解析后的sproto协议
end
end
function loader.index(name)
return data\[name\] --返回sproto协议在skynet sprotoloader里序号
end
-- 初始化服务的info信息和函数
service.init {
command = loader,
info = data
}

主要是定义了如何加载proto协议并保存,以及获取协议的编号(slot),然后最后面调用了另一个lua文件的代码service.init方法。

service.lua

文件路径:lualib/service.lua

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
local skynet = require "skynet"
local log = require "log"
local service = {}
-- 初始化服务,主要功能为:1:注册服务info 2:注册服务的命令函数 3:启动服务
function service.init(mod)
local funcs = mod.command
if mod.info then
skynet.info_func(function() --
return mod.info
end)
-- 这里仅作调试用,当在调试模式下,输入 “info 服务ID” 就会打印上面返回的信息
-- 调试模式的启动方法为 nc 127.0.0.1 8000
end
skynet.start(function()
if mod.require then
local s = mod.require
for _, name in ipairs(s) do
service\[name\] = skynet.uniqueservice(name) --启动服务,并将该服务器保存在service下
end
end
if mod.init then
mod.init()
end
skynet.dispatch("lua", function (_,_, cmd, ...) -- 修改lua协议的dispatch函数,对当前调用init的服务注册函数
-- skynet.dispatch函数也是服务启动的结束标示
local f = funcs\[cmd\] --获取命令函数
if f then
skynet.ret(skynet.pack(f(...))) --返回命令调用结果,所有通过ret的返回值都要用pack打包
else
log("Unknown command : \[%s\]", cmd)
skynet.response()(false)
end
end)
end)
end
--[[skynet.ret 在当前协程(为处理请求方消息而产生的协程)中给请求方(消息来源)的消息做回应
skynet.retpack 跟skynet.ret的区别是向请求方作回应时要用skynet.pack打包]]--
return service

该代码做的主要有3个工作,

  • 设定当前skynet服务debug info信息
  • 设定当前skynet服务在接收到其他服务调用信息时的处理,比如上一段protoloader代码中的load函数和index函数。其他服务只要发送”函数名”或者“参数”,即可调用对应的函数。其中一个值得注意的点是,skynet中服务与服务的通信必须通过skynet.retskynet.retpack函数进行打包和封包处理。
  • 加载其他服务,并保存在service这个table中。
  • 往后所有服务的代码都会通过init函数处理info信息、注册函数、加载其他服务。

回到main.lua中,紧接着调用了service.init中设定的load函数。然后挂起了hub服务。 为了不让文章变的太过冗长,下面会省略一些具体实现逻辑,主要以梳理流程为主,只有在关键点才会有详细的解释。

hub.lua

文件路径:service/hub.lua

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
local skynet = require "skynet"
local socket = require "socket"
local proxy = require "socket\_proxy" --加载范例lualib目录下的socket\_proxy
local log = require "log"
local service = require "service"
local hub = {} --保存函数
local data = { socket = {} } --保存监听到的链接
-- 调用auth服务
local function auth_socket(fd)
...
end
local function assign_agent(fd, userid)
...
end
--所有监听到的新链接都会首先交给这个函数处理
function new_socket(fd, addr)
...
end
--打开监听
function hub.open(ip, port)
...
end
--关闭监听
function hub.close()
...
end
--服务初始化,并挂起auth和manager服务
service.init {
...
require = {
"auth",
"manager",
}
}

可以看到没有执行具体逻辑,主要以定义为主,并启动了两个服务,manager和auth。 在最开始local proxy = require “socket_proxy”中还启动了socket_proxyd服务。 不过在这边并有具体的逻辑执行,所以先不管,顺着流程继续看。

监听网络端口

回到main.lua,调用了hub服务中的open函数,开始监听网络端口。来看看该函数的具体实现。

1
2
3
4
5
6
7
8
function hub.open(ip, port)
log("Listen %s:%d", ip, port)
assert(data.fd == nil, "Already open") --判断监听是否打开
data.fd = socket.listen(ip, port) --新建监听端口
data.ip = ip
data.port = port
socket.start(data.fd, new\_socket) --开始监听,将监听到的链接返回到new\_socket函数
end

这时如果客户端发起请求,就会将监听到的链接交给new_socket函数处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
function new_socket(fd, addr)
data.socket\[fd\] = "\[AUTH\]"
proxy.subscribe(fd) --将新链接提交给socketproxy
local ok , userid = pcall(auth_socket, fd)
if ok then
data.socket\[fd\] = userid
if pcall(assign_agent, fd, userid) then
return -- succ
else
...
end
else
...
end
end
```lua
首先调用了socket_proxy.lua中的subscribe函数

_文件路径:lualib/socket_proxy.lua_

```lua
function proxy.subscribe(fd)
local addr = map\[fd\]
if not addr then
addr = skynet.call(proxyd, "lua", fd) --向proxyd 发送命令,创建基于fd链接的服务,详见socket_proxyd.lua
map\[fd\] = addr --保存服务地址
end
end

向proxyd服务发送链接对象
_文件路径:service/socket_proxyd.lua_

1
2
3
4
5
6
7
8
9
10
local function subscribe(fd)
local addr = socket\_fd\_addr\[fd\]
if addr then
return addr --如果连接已经保存则直接返回服务ID
end
addr = assert(skynet.launch("package", skynet.self(), fd)) --创建c语言服务package(连接代理),跑当前服务(self()返回当前服务ID),返回的是c服务的地址
socket\_fd\_addr\[fd\] = addr --保存c服务的地址,key值为链接
socket\_addr\_fd\[addr\] = fd --保存链接,key值为c服务的地址
socket_init\[addr\] = skynet.response() --保存该链接的response函数,回应函数,这里同时会回应之前call过来的服务,告诉他addr
end

这里创建了一个c语言编写的package服务,不必关心具体实现,只要知道它的功能主要是让当前链接成为一个单独的skynet服务即可,返回值是该服务的ID。 最后通过skynet.response函数返回服务ID给调用前的函数,也就是socket_proxy的subscribe。

处理链接数据

一路回到hub.lua的new_socket函数。又将链接提交给auth_socket函数

1
2
3
local function auth_socket(fd)
return (skynet.call(service.auth, "lua", "shakehand" , fd))
end

通过service.call启动service中保存的auth服务,调用其中的shakehand函数,并发送fd链接对象。
文件路径:service/auth.lua

1
2
3
4
function auth.shakehand(fd)
local c = client.dispatch { fd = fd } --将链接交给client对信息进行处理
return c.userid
end

这里可以理解为和客户端一第次“握手”,调用client的dispatch函数,并发送链接。

文件路径:lualib/client.lua

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
--消息处理
function client.dispatch( c )
local fd = c.fd
proxy.subscribe(fd)
local ERROR = {}
while true do
local msg, sz = proxy.read(fd) --读取连接发来的数据
local type, name, args, response = host:dispatch(msg, sz) --sproto解析数据
assert(type == "REQUEST") --此处保证连接数据为请求
if c.exit then --exit参数为退出循环标志
return c
end
local f = handler\[name\] --通过sproto解析出来的数据,获取回调函数
if f then
-- f may block , so fork and run
-- 此处创建一个协程运行回调函数
skynet.fork(function()
local ok, result = pcall(f, c, args) -- 回调函数具体详见agent,auth,回调函数在那边实现
-- 此处回调函数都为显式传参,将c显式传到回调函数中
if ok then
proxy.write(fd, response(result)) -- 使用sproto解析出来的response函数包装返回值,并发送数据
else
log("raise error = %s", result)
proxy.write(fd, response(ERROR, result))
end
end)
else
-- unsupported command, disconnected
error ("Invalid command " .. name)
end
end
end

这个函数为消息处理最关键的函数,所有请求都通过这里进行分发,其中回调函数都保存在handler这个table中,当前范例的回调函数都在agent.lua和,auth.lua中定义。 通过代码可以看到,该函数是一个死循环,也就是说,会不断的读取客户端发送过来的信息进行处理。只有当exit变量为ture时才退出,这个exit怎么来的先不管。 接下来通过客户端发送的请求流程继续梳理。

网络请求流程

回顾下客户端的消息流程: 首先会发送signin请求,发现sign失败,用户不存在,继续发送signup请求注册用户,注册成功后再次发送signin请求,登入成功,接着会发送login请求,后面的就是正常的业务请求了。请求顺序如下: signin > signup > signin > login > other 一步一步来,首先是signin,回到上面的dispatch,通过请求信息调用名为signin回调函数。signin的回调实现在auth.lua中
文件路径:service/auth.lua

1
2
3
4
5
6
7
8
9
10
11
-- signin登入请求回调
function cli:signin(args)
log("signin userid = %s", args.userid)
if users\[args.userid\] then
self.userid = args.userid --self为修改隐式参数
self.exit = true --退出client中dispatch循环,表示登入成功,退出auth服务,进入下一个服务
return SUCC
else
return FAIL
end
end

可以看到,当客户端第一次请求signin时,是返回失败FAIL的。这时又会回到client的dispatch函数,返回客户端注册失败消息后,继续读取链接信息。 当客户端接到signin失败后,会紧接着发送signup请求。 同样来到auth.lua

1
2
3
4
5
6
7
8
9
10
-- signup注册账号请求回调
function cli:signup(args)
log("signup userid = %s", args.userid)
if users\[args.userid\] then
return FAIL
else
users\[args.userid\] = true
return SUCC
end
end

这时就回返回成功,回到dispath发送注册成功消息。 接下来客户端再次发送sigin请求就会signin成功。 要注意的是,在signin回调函数中,如果signin成功则会修改exit变量,这会导致退出client的dispatch循环。为什么这里能修改exit变量呢,原因是所有回调函数都是隐式传参的,在函数定义中使用了“冒号”,表示隐式传送self参数。在dispatch中可以看到调用回调时,都会传进c变量。pcall(f, c, args) 登入成功后就会退出dispatch的循环,那么跟踪代码退到hub中的new_socket方法,发现又调用了assign_agent函数。

1
2
3
local function assign_agent(fd, userid)
skynet.call(service.manager, "lua", "assign", fd, userid)
end

assign_gent则调用了service中保存的manager服务,同时调用manage服务中的assign方法,并传入链接对象和登入成功返回的userid。
文件路径:service/manager.lua

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
function manager.assign(fd, userid)
local agent
repeat
agent = users\[userid\] --判断是否有当前用户的服务
if not agent then --若没有则创建一个
agent = new_agent()
if not users\[userid\] then
-- double check
users\[userid\] = agent
else
free_agent(agent)
agent = users\[userid\]
end
end
until skynet.call(agent, "lua", "assign", fd, userid) --此处返回ture,跳出循环
log("Assign %d to %s \[%s\]", fd, userid, agent)
end

顺着流程走,由于客户端第一次登入,会调用new_agent方法。

1
2
3
4
local function new_agent()
-- todo: use a pool
return skynet.newservice "agent"
end

这里会为每个用户启动一个agent服务。 接着until中会调用该agent服务的assign方法。
文件路径:service/agent.lua

1
2
3
4
5
function agent.assign(fd, userid)
...
skynet.fork(new_user, fd)
return true
end

做了很简单一件事,调用了new_user这个方法,这里要注意的是,使用了skynet.fork方法调用,该方法的作用是,不会阻塞线程,为什么要这么做接着往后看。

1
2
3
4
local function new_user(fd)
local ok, error = pcall(client.dispatch , { fd = fd }) --进入客户端消息循环,若此处客户端长时间没有任何操作,而报超时错误返回
...
end

可以看到,这里调用了client中的dispatch,前面知道该函数是个死循环,这就是为什么之前使用了skynet.fork的原因了。 好了,这样就又进入消息处理循环了,之前客户端已经发送了signin,signup,signin,这三个请求,接下来会发送login请求,去到login请求回调。该回调定义在agent.lua中。

1
2
3
4
5
6
7
8
9
10
11
12
function cli:login()
assert(not self.login)
if data.fd then --重复登入
log("login fail %s fd=%d", data.userid, self.fd)
return { ok = false }
end
data.fd = self.fd
self.login = true
log("login succ %s fd=%d", data.userid, self.fd)
client.push(self, "push", { text = "welcome" }) -- push message to client
return { ok = true }
end

主要就是调用client.push函数发送welcome信息,再来看看client中push函数的实现

1
2
3
function client.push(c, t, data)
proxy.write(c.fd, sender(t, data))
end

调用socket_proxy中的write

1
2
3
function proxy.write(fd, msg, sz)
skynet.send(get_addr(fd), "client", msg, sz)
end

这里用到了skynet.send函数,第一参数是服务地址,就是之前创建的package服务。 以上就是范例中服务端代码的主要逻辑。

另外我对服务端代码进行了大部分注释。
链接: http://pan.baidu.com/s/1dFKD2oh 密码: rfc9

扩展阅读

[skynet 的一个简单范例-云风的BLOG]
(http://blog.codingnow.com/2016/06/skynet_sample.html)
skynet LuaAPI
skynet API参考
skynet github 项目

  • 本文作者: Tshine Zheng
  • 本文链接: 399.html
  • 版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明出处!