引言
tars序列化过程:TARS编码协议是一种数据编解码规则,它将整形、枚举值、字符串、序列、字典、自定义结构体等数据类型按照一定的规则编码到二进制数据流中。对端接收到二进制数据流之后,按照相应的规则反序列化可得到原始数值。简单理解,TARS编码协议提供了一种将数据序列化、反序列化的方法。其角色和我们认识的protobuf、json、xml等同。
流程分析
1.客户端原始请求数据---->序列化---->服务端
借助于tars文件生成的头文件,用代理调用相应的接口,该接口利用tars_invoke(invoke 调用)即可完成序列化传输。
class ConfigAdminProxy : public tars::ServantProxy
{
public:
typedef map<string, string> TARS_CONTEXT;
tars::Int32 AddConfig(const tars::AddConfigInfo & config,std::string &result,const map<string, string> &context = TARS_CONTEXT(),map<string, string> * pResponseContext = NULL)
{
tars::TarsOutputStream<tars::BufferWriter> _os;
_os.write(config, 1);
_os.write(result, 2);
tars::ResponsePacket rep;
std::map<string, string> _mStatus;
tars_invoke(tars::TARSNORMAL,"AddConfig", _os.getByteBuffer(), context, _mStatus, rep);
if(pResponseContext)
{
*pResponseContext = rep.context;
}
tars::TarsInputStream<tars::BufferReader> _is;
_is.setBuffer(rep.sBuffer);
tars::Int32 _ret;
_is.read(_ret, 0, true);
_is.read(result, 2, true);
return _ret;
} //只显示了一个类成员函数
_objectProxy->getProxyProtocol().requestFunc(msg->request, msg->sReqData);
其中msg中包含成员 RequestPacket request(请求消息体)和ResponsePacket response(响应消息体)。
这里的resquestFunc来自ProxyProtocol::tarsResponse, 即如下所示:
_proxyProtocol.requestFunc = ProxyProtocol::tarsRequest;
void ProxyProtocol::tarsRequest(const RequestPacket& request, string& buff)
{
TarsOutputStream<BufferWriter> os;
request.writeTo(os);
tars::Int32 iHeaderLen = htonl(sizeof(tars::Int32) + os.getLength());
buff.clear();
buff.reserve(sizeof(tars::Int32) + os.getLength());
buff.append((const char*)&iHeaderLen, sizeof(tars::Int32));
buff.append(os.getBuffer(), os.getLength());
}
_objectProxy->getProxyProtocol().requestFunc(msg->request, msg->sReqData);
std::unique_ptr_trans //客户端利用Transceiver类收发解析请求 。
_trans->sendRequest(msg->sReqData.c_str(),msg->sReqData.size())
2、 服务端---->反序列化---->原始请求数据
- 借助于tars文件生成的头文件,用代理调用相应的dispatch接口即可完成反序列化。消息处理线程主循环遍历所有的BindAdapter的消息队列是否有消息。如果没有就wait在_handleGroup的条件变量上(插入消息后会被触发)。wait返回后上报心跳处理业务自有消息等,而后遍历所有BindAdapter的消息队列,取出收包消息。对于状态正常的消息(非超时、过载等)调用ServantHandle的handle方法处理消息,根据消息类型是否为tars分别调用不同的处理方法进行处理,并根据数据包的Servant名找到Servant实体,调用ondispatch方法(业务实现,传入current结构(内含数据buffer),以及用作返回的vector结构)处理消息。
- onDispatch方法由tars2cpp工具自动生成的servant实现。它根据请求包中的函数名得知需要调用的函数,并将请求数据反序列化成对应的参数,然后调用相应的函数进行处理。也就是说,onDispatch负责对请求数据进行分发并处理。Servant实体处理完消息后回到ServantHandle处理线程,ServantHandle线程调用TC_EpollServer中的Send将待发消息buffer传输(ServantHandle线程不知道消息应该通过哪个网络线程返回),TC_EpollServer通过连接fd获得管理这个fd连接的网络线程,将消息buffer交给该网络线程处理。网络线程将消息buffer构造成tagSendData消息包放在自己的发送队列中。(详细的过程参考这两个函数的详解)。网络线程通过epoll_wait notify触发网络线程线程函数响应epoll,将消息队列中的包发送出去。
int onDispatch(tars::TarsCurrentPtr _current, vector<char> &_sResponseBuffer)
{
static ::std::string __tars__ConfigAdmin_all[]=
{
"AddConfig",
.......
};
pair<string*, string*> r = equal_range(__tars__ConfigAdmin_all, __tars__ConfigAdmin_all+14, _current->getFuncName());
if(r.first == r.second) return tars::TARSSERVERNOFUNCERR;
switch(r.first - __tars__ConfigAdmin_all)
{
case 0:
{
tars::TarsInputStream<tars::BufferReader> _is;
_is.setBuffer(_current->getRequestBuffer());
tars::AddConfigInfo config;
std::string result;
if (_current->getRequestVersion() == TUPVERSION)
{
UniAttribute<tars::BufferWriter, tars::BufferReader> tarsAttr;
tarsAttr.setVersion(_current->getRequestVersion());
tarsAttr.decode(_current->getRequestBuffer());
tarsAttr.get("config", config);
tarsAttr.getByDefault("result", result, result);
}
else
{
_is.read(config, 1, true);
_is.read(result, 2, false);
}
tars::Int32 _ret = AddConfig(config,result, _current);
if(_current->isResponse())
{
if (_current->getRequestVersion() == TUPVERSION )
{
UniAttribute<tars::BufferWriter, tars::BufferReader> tarsAttr;
tarsAttr.setVersion(_current->getRequestVersion());
tarsAttr.put("", _ret);
tarsAttr.put("result", result);
tarsAttr.encode(_sResponseBuffer);
}
else
{
tars::TarsOutputStream<tars::BufferWriter> _os;
_os.write(_ret, 0);
_os.write(result, 2);
_os.swap(_sResponseBuffer);
}
}
return tars::TARSSERVERSUCCESS;
}
服务端利用onDispatch接口处理收到的二进制请求流,从_current中得到请求数据,然后把本地的调用结果放到vector & _sResponseBuffer中返回。
3、 服务端原始返回数据---->序列化----->客户端
RequestF.h 中定义了ResponsePacket结构体。
//位置:cpp/servant/libservant/TarsCurrent.cpp 221
void TarsCurrent::sendResponse(int iRet, const vector<char>& buffer, const map<string, string>& status, const string & sResultDesc)
{
//省略部分代码
………………
TarsOutputStream<BufferWriter> os;
if (_request.iVersion != TUPVERSION)
{
//将数据放到ResponsePacket结构中
ResponsePacket response;
response.iRequestId = _request.iRequestId;
response.iMessageType = _request.iMessageType;
response.cPacketType = TARSNORMAL;
response.iVersion = TARSVERSION;
response.status = status;
response.sBuffer = buffer;
response.sResultDesc = sResultDesc;
response.context = _responseContext;
response.iRet = iRet;
TLOGINFO("[TARS]TarsCurrent::sendResponse :"
<< response.iMessageType << "|"
<< _request.sServantName << "|"
<< _request.sFuncName << "|"
<< response.iRequestId << endl);
//调用序列化方法,response中的数据都保存在了os中,调用了tars.h中的write接口
response.writeTo(os);
}
//省略部分代码
…………………………
//获取内容长度
tars::Int32 iHeaderLen = htonl(sizeof(tars::Int32) + os.getLength());
string s = "";
//返回的s的格式是内容长度+内容
s.append((const char*)&iHeaderLen, sizeof(tars::Int32));
s.append(os.getBuffer(), os.getLength());
_servantHandle->sendResponse(_uid, s, _ip, _port, _fd);
}
4.客户端----->反序列化----->原始返回数据
//位置:cpp/servant/libservant/Transceiver.cpp 331
int TcpTransceiver::doResponse(list<ResponsePacket>& done)
{
…………
if(!_recvBuffer.IsEmpty())
{
try
{
//接收到的服务端的序列化好的数据
const char* data = _recvBuffer.ReadAddr();
size_t len = _recvBuffer.ReadableSize();
size_t pos = 0;
//获取协议封装类
ProxyProtocol& proto = _adapterProxy->getObjProxy()->getProxyProtocol();
if (proto.responseExFunc)
{
long id = _adapterProxy->getId();
//将data反序列化到done中
pos = proto.responseExFunc(data, len, done, (void*)id);
}
…………
}
}
这里的responseExFunc来自ProxyProtocol::tarsRespons(cpp/servant/AppProtocal.h 398)
template<uint32_t iMaxLength>
static size_t tarsResponseLen(const char* recvBuffer, size_t length, list<ResponsePacket>& done)
{
…………
TarsInputStream<BufferReader> is;
//将数据放入is中
is.setBuffer(recvBuffer + pos + sizeof(tars::Int32), iHeaderLen - sizeof(tars::Int32));
pos += iHeaderLen;
//将is中的数据进行反序列化,填充到rsp中
ResponsePacket rsp;
rsp.readFrom(is);
…………//在tars生成的代理接口函数中对rsp解析RPC调用的返回值。
}
从上面代码中可以看出:
序列化数据使用的是:ResponsePacket.writeTo()
反序列化数据使用的是:ResponsePacket.readFrom()
协议分析:
把结构化数据序列化,用大白话解释就是想办法把不同类型的数据按照顺序放在一个字符串里。反序列化就是还能从这个字符串里把类型和数据正确解析出来。一般来说,要达成正确的效果,有三个因素是必须考虑的:
- 标记数据的位置。例如是位于字符串头部还是字符串末尾,或者中间某个部分
- 标记数据的类型,例如int char float vector等
- 标记数据内容
Tars协议也跳不出这个基本规则,它的数据是由两部分组成:
| HEAD | BUF |
- HEAD为头部信息(包含了数据位置和数据类型),BUF为实际数据。注意BUF里可以继续嵌套| HEAD | BUF |这样的类型,以满足复杂数据结构的需要
像char、short、int之类的简单类型时,只需要:| HEAD | BUF |
当数据类型为vector< char >时,就变为了| HEAD1 | HEAD2 | BUF |。这时候HEAD1 存储vector类型,HEAD2 存储char类型
我们再具体看下HEAD中包括的内容:
| TAG1(4 bits) | TYPE(4 bits) | TAG2(1 byte或者8 bits)
- TYPE表示类型,用4个二进制位表示,取值范围是0~15,用来标识数据类型。下面的Tars官方代码标明了具体数据类型的TYPE值
//位置:/cpp/servant/tup/Tars.h 60行
//数据头类型
#define TarsHeadeChar 0
#define TarsHeadeShort 1
#define TarsHeadeInt32 2
#define TarsHeadeInt64 3
#define TarsHeadeFloat 4
#define TarsHeadeDouble 5
#define TarsHeadeString1 6
#define TarsHeadeString4 7
#define TarsHeadeMap 8
#define TarsHeadeList 9
#define TarsHeadeStructBegin 10
#define TarsHeadeStructEnd 11
#define TarsHeadeZeroTag 12
#define TarsHeadeSimpleList 13