OpenTSDB Code Reading: 2. Server Framework

入口: net.opentsdb.tools.TSDMain

Startup Plugin

代码文件 net.opentsdb.tools StartupPlugin.java

用途:

The StartupPlugin allows users to interact with the OpenTSDB configuration as soon as it is completely parsed, just before OpenTSDB begins to use it.
服务初始化时,准备提供服务时,退出时,会调用plugin接口。代码中没有提供已实现的Startup Plugin。

配置项:

1
2
3
tsd.startup.enable
tsd.core.plugin_path
tsd.startup.plugin

Reference:

Server Framework

服务使用Netty实现,其Channel Pipeline的框架实现在net.opentsdb.tsd PipelineFactory.java

根据文档,服务支持两类接口HTTP & RPC,二者使用同一端口,区分方式是根据请求数据的第一个字节,如果第一个字节是['A', 'Z']之间的字符,则认为是一个HTTP方式的请求,否则是RPC请求。参考代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// File: PipelineFactory.java
// Line: 132
/**
* Dynamically changes the {@link ChannelPipeline} based on the request.
* If a request uses HTTP, then this changes the pipeline to process HTTP.
* Otherwise, the pipeline is changed to processes an RPC.
*/
final class DetectHttpOrRpc extends FrameDecoder {

@Override
protected Object decode(final ChannelHandlerContext ctx,
final Channel chan,
final ChannelBuffer buffer) throws Exception {
if (buffer.readableBytes() < 1) { // Yes sometimes we can be called
return null; // with an empty buffer...
}

final int firstbyte = buffer.getUnsignedByte(buffer.readerIndex());
final ChannelPipeline pipeline = ctx.getPipeline();
// None of the commands in the RPC protocol start with a capital ASCII
// letter for the time being, and all HTTP commands do (GET, POST, etc.)
// so use this as a cheap way to differentiate the two.
if ('A' <= firstbyte && firstbyte <= 'Z') {
pipeline.addLast("decoder", new HttpRequestDecoder());
if (tsdb.getConfig().enable_chunked_requests()) {
pipeline.addLast("aggregator", new HttpChunkAggregator(
tsdb.getConfig().max_chunked_requests()));
}
// allow client to encode the payload (ie : with gziped json)
pipeline.addLast("inflater", new HttpContentDecompressor());
pipeline.addLast("encoder", new HttpResponseEncoder());
pipeline.addLast("deflater", new HttpContentCompressor());
} else {
pipeline.addLast("framer", new LineBasedFrameDecoder(1024));
pipeline.addLast("encoder", ENCODER);
pipeline.addLast("decoder", DECODER);
}

pipeline.addLast("timeout", timeoutHandler);
pipeline.remove(this);
pipeline.addLast("handler", rpchandler);

// Forward the buffer to the next handler.
return buffer.readBytes(buffer.readableBytes());
}

}

根据上述代码,我们可以总结两种接口类型的处理Pipeline如下:

  • HTTP Protocol
    1. connmgr
    2. detect
    3. decoder
    4. aggregator(optional)
    5. inflater
    6. encoder
    7. deflater
    8. timeout
    9. handler
  • RPC Protocol
    1. connmgr
    2. detect
    3. framer
    4. encoder
    5. decoder
    6. timeout
    7. handler

Handler Task

HTTP Protocol

connmgr

Class: net.opentsdb.tsd.ConnectionManager

Keeps track of all existing connections.
Limits the max connections established.

detect

File: PipelineFactory.java class DetectHttpOrRpc

根据请求数据判断协议类型,并添加后续的pipeline handler。

decoder

Class: org.jboss.netty.handler.codec.http.HttpRequestDecoder

官方的请求解析类。

aggregator

Class: org.jboss.netty.handler.codec.http.HttpChunkAggregator

inflater

Class: org.jboss.netty.handler.codec.http.HttpContentDecompressor

encoder

Class: org.jboss.netty.handler.codec.http.HttpResponseEncoder

deflater

Class: org.jboss.netty.handler.codec.http.HttpContentCompressor

timeout

Class: org.jboss.netty.handler.timeout.IdleStateHandler

handler

Class: net.opentsdb.tsd.RpcHandler
因为 HTTP/RPC handler 不同,解析出来的message类型也不同,该类根据消息类型在net.opentsdb.tsd.RpcManager中查询对应的 RPC(Telnet)/HTTP 指令执行函数并执行。

RpcManager中对HTTP官方指令和插件指令各单独维护了一个Map
,二者请求的区别在于uri前缀,如果uri前缀为plugin,则认为是请求插件中提供的功能。然后在对应的Map中查询然后执行。每个Plugin对应一个uri(route)。

1
2
3
4
5
# 请求官方指令
/api/put

# 请求插件指令
/plugin/plugin-path

OpenTSDB还支持RPC Plugin,该插件的实现逻辑与HTTP Protocol不同,Plugin实现自己独立的服务框架,与主框架间仅仅是共享一个net.opentsdb.core.TSDB对象。参考RpcPlugin抽象基类定义:RpcPlugin.java

RPC Protocol

与HTTP Protocol相同的handler此处就不再列出。

framer

Class: net.opentsdb.tsd.LineBasedFrameDecoder

Decodes telnet-style frames delimited by new-lines.

encoder

Class: org.jboss.netty.handler.codec.string.StringEncoder

decoder

Class: net.opentsdb.tsd.WordSplitter

Splits a ChannelBuffer in multiple space separated words.

References: