Upgrading To A New Nginx Binary In A Different Place On The Fly

Recently we needed to use some features provided by a newer Nginx release. The current package came from the company's system team, and they did not offer a newer version, so we had to build one ourselves.

Because the original package was installed by yum and we want to deploy the new package by a different method, putting the new package in a different place seemed better. So we face the following problem:

We want to upgrade Nginx to a new version, with the new binary and all its dependencies placed in a different location. At the same time, the server must keep serving traffic continuously, without downtime.

The article How To Upgrade Nginx In-Place Without Dropping Client Connections describes upgrading nginx in-place.

Nginx release 1.9.1 introduced a new feature that enables the use of SO_REUSEPORT (the reuseport parameter of the listen directive). With it we can complete the task.

Operation Steps

First, compile the new Nginx release, setting absolute paths for --prefix etc., and deploy the output on the machine. Enable the reuseport parameter of the listen directive in the new package's configuration.
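A minimal sketch (the prefix path and the module list are placeholders of our own choosing):

# Build the new release under its own prefix
./configure --prefix=<NEW_PACKAGE> --with-http_ssl_module
make && make install

# In <NEW_PACKAGE>/conf/nginx.conf, mark the listening socket with reuseport
listen 80 reuseport;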

Copy <NEW_PACKAGE>/sbin/nginx to <OLD_PACKAGE>/sbin/nginx, and upgrade Nginx to the new <OLD_PACKAGE>/sbin/nginx binary using the method from How To Upgrade Nginx In-Place Without Dropping Client Connections.
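That method boils down to a USR2/QUIT signal pair; a sketch, assuming the pid file lives under <OLD_PACKAGE>/run/ as in this setup:

# Back up the old binary, then overwrite it with the new one
cp <OLD_PACKAGE>/sbin/nginx <OLD_PACKAGE>/sbin/nginx.bak
cp <NEW_PACKAGE>/sbin/nginx <OLD_PACKAGE>/sbin/nginx

# USR2 makes the running master exec the new binary as a second master;
# the old master's pid file is moved aside to nginx.pid.oldbin
kill -USR2 $(cat <OLD_PACKAGE>/run/nginx.pid)

# QUIT gracefully stops the old master once the new one is serving
kill -QUIT $(cat <OLD_PACKAGE>/run/nginx.pid.oldbin)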

Start another Nginx master using <NEW_PACKAGE>/sbin/nginx. Because reuseport is enabled, both masters can listen on the same port at once. Then gracefully stop the master launched from <OLD_PACKAGE>/sbin/nginx. After the 'old' master stops, <NEW_PACKAGE>/run/nginx.pid may have been deleted, because both masters wrote their process ids into the same file; recreate it for the surviving master by hand.
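In sketch form (<OLD_MASTER_PID> and <NEW_MASTER_PID> are placeholders to be read from the ps output):

# Start a second master from the new location; with reuseport both
# masters can bind the same port simultaneously
<NEW_PACKAGE>/sbin/nginx

# Gracefully stop the master that came from the old package
kill -QUIT <OLD_MASTER_PID>

# Both masters used the same pid file, so it may be gone now;
# recreate it by hand for the surviving master
ps -ef | grep 'nginx: master'
echo <NEW_MASTER_PID> > <NEW_PACKAGE>/run/nginx.pid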

Finally, remove the reuseport parameter from the configuration file and reload Nginx.

OpenTSDB Code Reading: 4. Query Command

http-api: api/query
code: class net.opentsdb.tsd.QueryRpc
document: HTTP API, Query

Query Examples

/api/query?start=2018/03/27-16:30:02&m=sum:proc.net.bytes

/api/query?start=2018/03/27-16:20:02&m=sum:proc.net.bytes{direction=in}

/api/query?start=2018/03/27-16:20:02&m=none:1m-avg-none:proc.net.bytes{direction=in}

/api/query?start=2018/03/27-16:30:02&tsuid=sum:00015D00000100000100000A00007F00000B000081,00015D00000100000100000A00007F00000B000080

Parsing The Request

The request parameters are parsed and used to build and fill a net.opentsdb.core.TSQuery object.
The core parsing code for the metric and TSUID parameters lives in:

net.opentsdb.tsd.QueryRpc::parseMTypeSubQuery
net.opentsdb.tsd.QueryRpc::parseTsuidTypeSubQuery

Classes involved:

// Corresponds to one HTTP/telnet request
net.opentsdb.core.TSQuery

// Corresponds to one sub-query within a request
net.opentsdb.core.TSSubQuery

// A query to retrieve data from the TSDB.
// interface
net.opentsdb.core.Query

// Non-synchronized implementation of interface Query.
// One-to-one with TSSubQuery
net.opentsdb.core.TsdbQuery

// Filter on tag values
net.opentsdb.query.filter.TagVFilter

The actual execution entry point for each query:

net.opentsdb.core.TsdbQuery::runAsync()

OpenTSDB uses a scanner to filter data by time range, metric, and tag key/value, then performs aggregation and downsampling locally.
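In sketch form, the flow driven from QueryRpc looks roughly like this (2.3 API; error handling omitted; runQueries is an illustrative wrapper):

import com.stumbleupon.async.Callback;
import net.opentsdb.core.DataPoints;
import net.opentsdb.core.Query;
import net.opentsdb.core.TSDB;
import net.opentsdb.core.TSQuery;

// One TsdbQuery is built per TSSubQuery and run asynchronously.
static void runQueries(final TSDB tsdb, final TSQuery ts_query) {
  ts_query.validateAndSetQuery();                       // sanity-check the parameters
  final Query[] queries = ts_query.buildQueries(tsdb);  // one Query per sub-query
  for (final Query query : queries) {
    query.runAsync().addCallback(new Callback<Object, DataPoints[]>() {
      public Object call(final DataPoints[] results) {
        return results;  // already filtered by the scanner, aggregated locally
      }
    });
  }
}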

OpenTSDB Code Reading: 3. Commands

Class: net.opentsdb.tsd.RpcManager

This class maintains the commands available under the HTTP and telnet-RPC protocols, along with the objects that execute them.

Commands

Write

put

telnet: put
http-api: api/put
code: class PutDataPointRpc

Qualifier Format

Bit layout of the qualifier (bit 0 = least significant):
bit[0:2] store the value length
bit[3] is the float flag
Millisecond timestamps:
bit[0:5] store the flags
bit[6:27] store the actual timestamp
bit[28:31] are all 1s; Java is big-endian, so this marker sits in bytes[0]
Second timestamps:
bit[0:3] store the flags
bit[4:] store the actual timestamp
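As a sketch, decoding a 2-byte second-precision qualifier according to the layout above (the real logic lives in net.opentsdb.core.Internal; the helper name here is illustrative):

// Illustrative decoding of a 2-byte second-precision qualifier.
static void decodeSecondQualifier(final byte[] qualifier) {
  final int q = ((qualifier[0] & 0xFF) << 8) | (qualifier[1] & 0xFF);
  final int flags = q & 0x0F;                   // bit[0:3]: flags
  final boolean is_float = (flags & 0x8) != 0;  // bit[3]: float flag
  final int value_len = (flags & 0x7) + 1;      // bit[0:2]: value length (stored as length - 1)
  final int delta = q >>> 4;                    // bit[4:]: seconds offset from the row's base time
  System.out.println("float=" + is_float + " len=" + value_len + " +" + delta + "s");
}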

Distinguishing second vs. millisecond timestamps
/** Mask to verify a timestamp on 4 bytes in seconds */
public static final long SECOND_MASK = 0xFFFFFFFF00000000L;

If the timestamp does not exceed ~SECOND_MASK (i.e. it fits in 4 bytes), it is treated as seconds; otherwise as milliseconds.
Reference code: link
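In code form the check amounts to (a sketch mirroring the constants above; the method name is assumed):

/** Mask to verify a timestamp on 4 bytes in seconds */
public static final long SECOND_MASK = 0xFFFFFFFF00000000L;

// Any bit set in the upper four bytes means the value cannot be a
// 4-byte second-precision timestamp, so treat it as milliseconds.
static boolean inMilliseconds(final long timestamp) {
  return (timestamp & SECOND_MASK) != 0;
}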

Append mode vs. normal mode

In append mode, all data points within the same hour are appended to the same column, whose qualifier is 0x050000 (three bytes).

// File: AppendDataPoints.java
// Line: 41
public class AppendDataPoints {
  private static final Logger LOG = LoggerFactory.getLogger(AppendDataPoints.class);

  /** The prefix ID of append columns */
  public static final byte APPEND_COLUMN_PREFIX = 0x05;

  /** The full column qualifier for append columns */
  public static final byte[] APPEND_COLUMN_QUALIFIER = new byte[] {
      APPEND_COLUMN_PREFIX, 0x00, 0x00};


compaction

class:

  • net.opentsdb.core.CompactionQueue
  • net.opentsdb.core.CompactionQueue.Compaction
  • net.opentsdb.core.CompactionQueue.ColumnDatapointIterator

In non-append mode, every new write inserts the rowkey into a CompactionQueue that is processed periodically; the queue is a sorted set, so inserts are deduplicated.

CompactionQueue starts a worker thread at initialization. A row is processed once current time - baseTime(rowkey) > 3600 seconds.
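The trigger condition in sketch form (baseTimeFromRowKey and compact are hypothetical helpers standing in for the queue's internals):

// A row becomes eligible for compaction once its hour has passed.
final long base_time = baseTimeFromRowKey(row_key);   // hypothetical helper
final long now = System.currentTimeMillis() / 1000L;
if (now - base_time > 3600) {
  compact(row_key);  // hypothetical: merge the row's columns into one
}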

Read

Stats

telnet: stats
http-api: api/stats
http-ui: stats
code: class StatsRpc

class: net.opentsdb.stats.StatsCollector
StatsCollector is an abstract base class. Each endpoint implementation creates a StatsCollector instance and passes it to the components' collectStats() methods to gather status information.
By default, StatsCollector records statistics in the format:

stats-name timestamp value tagk=tagv tagk=tagv...

For every record collected, StatsCollector::emit(record-str) is called; subclasses implement emit() to format the record and return it to the user.
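For instance, StatsRpc builds an anonymous subclass along these lines (a sketch assuming the 2.3 API; dumpStats is an illustrative wrapper):

import net.opentsdb.core.TSDB;
import net.opentsdb.stats.StatsCollector;

static String dumpStats(final TSDB tsdb) {
  final StringBuilder buf = new StringBuilder();
  // The subclass decides the output format by overriding emit();
  // here we simply keep the default one-record-per-line format.
  final StatsCollector collector = new StatsCollector("tsd") {
    @Override
    public void emit(final String line) {
      buf.append(line);  // each collected record arrives as one line
    }
  };
  tsdb.collectStats(collector);  // every component reports through the collector
  return buf.toString();
}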

Drop Caches

telnet: dropcaches
http-api: api/dropcaches
http-ui: dropcaches
code: class DropCachesRpc

metrics/tagk/tagv each map to one UniqueId instance, which caches the name <-> id mappings internally. This endpoint clears the caches of all three objects. code link

Version

telnet: version
http-api: api/version
http-ui: version
code: class RpcManager.Version

Returns the version information of the running instance.

{
  "short_revision": "",
  "repo": "/root/opentsdb-2.3.0/build",
  "host": "***********",
  "version": "2.3.0",
  "full_revision": "",
  "repo_status": "MODIFIED",
  "user": "root",
  "branch": "",
  "timestamp": "1521782831"
}

The script build-aux/gen_build_data.sh generates the BuildData.java file at build time; the server reads this class's fields at runtime.

Exit

telnet: exit
code: class RpcManager.Exit

Closes the connection.

Help

telnet: help
code: class RpcManager.Help

Returns the telnet_commands list.

HomePage

http-ui: ""
code: class RpcManager.HomePage

The UI home page.

List Aggregators

http-api: api/aggregators
http-ui: aggregators
code: class ListAggregators

Lists the supported aggregation functions.

Static File

http-ui: favicon.ico|s
code: class StaticFileRpc

Serves the static file corresponding to the URI portion after the /s/ prefix, i.e. /s/[path/to/static/file].

Logs

http-ui: logs
code: class LogsRpc

This endpoint has two kinds of functionality:

  1. Fetch log entries, in plain text by default; a json parameter is also accepted.
  2. Set a logger's level, with parameters level and logger.
    (valid values of the level parameter)

Graph

http-ui: q
code: class GraphHandler

Suggest

http-api: api/suggest
http-ui: suggest
code: SuggestRpc

It's used for auto-complete entries and does not support wildcards.

Input parameters:

  • type: metrics, tagk, tagv. required.
  • q: optional, default "".
  • max: optional, default 25.

Annotation

http-api: api/annotation|api/annotations
code: class AnnotationRpc

Show Config

http-api: api/config
code: class RpcManager.ShowConfig

Two request forms:

  1. api/config
    Returns the running instance's configuration options.
  2. api/config/filters
    Returns information about all tag value filters.

Query

http-api: api/query
code: class QueryRpc

Search

http-api: api/search
code: class SearchRpc


Serializers

http-api: api/serializers
code: class Serializers

Lists the supported serializers.

Tree

http-api: api/tree
code: class TreeRpc

Endpoints:

  1. /api/tree
  2. /api/tree/branch
  3. /api/tree/rule
  4. /api/tree/rules
  5. /api/tree/test
  6. /api/tree/collisions
  7. /api/tree/notmatched

UniqueId

http-api: api/uid
code: class UniqueIdRpc

This endpoint provides the following sub-functions:

  1. assign
    Assigns uids to the given names.

    /api/uid/assign?metric=nameA,nameB&tagk=nameC,nameD&tagv=nameE,nameF
  2. uidmeta
    2.1. GET
    Fetches uid meta information.

    /api/uid/uidmeta?type=tagk&uid=000013

    2.2. POST
    Sets meta information.
    2.3. DELETE
    Deletes meta information.

  3. tsmeta
    Handles CRUD calls to individual TSMeta data entries.
  4. rename
    Renames the name bound to a uid; only one can be updated per call.
    /api/uid/rename?tagk=OldName&name=NewName

DieDieDie

telnet: diediedie
http: diediedie
code: class DieDieDie

Shuts down the server.

OpenTSDB Code Reading: 2. Server Framework

Entry point: net.opentsdb.tools.TSDMain

Startup Plugin

Source file: StartupPlugin.java (package net.opentsdb.tools)

Purpose:

The StartupPlugin allows users to interact with the OpenTSDB configuration as soon as it is completely parsed, just before OpenTSDB begins to use it.
The plugin's hooks are invoked when the service initializes, when it is about to start serving, and when it exits. The codebase does not ship any StartupPlugin implementation.

Configuration options:

tsd.startup.enable
tsd.core.plugin_path
tsd.startup.plugin


Server Framework

The server is implemented with Netty; its channel pipeline is set up in PipelineFactory.java (package net.opentsdb.tsd).

According to the docs, the server supports two protocol families, HTTP and telnet-style RPC, on the same port. They are distinguished by the first byte of the request data: if it is a character in ['A', 'Z'], the request is treated as HTTP, otherwise as RPC. Reference code:

// File: PipelineFactory.java
// Line: 132
/**
 * Dynamically changes the {@link ChannelPipeline} based on the request.
 * If a request uses HTTP, then this changes the pipeline to process HTTP.
 * Otherwise, the pipeline is changed to processes an RPC.
 */
final class DetectHttpOrRpc extends FrameDecoder {

  @Override
  protected Object decode(final ChannelHandlerContext ctx,
                          final Channel chan,
                          final ChannelBuffer buffer) throws Exception {
    if (buffer.readableBytes() < 1) {  // Yes sometimes we can be called
      return null;                     // with an empty buffer...
    }

    final int firstbyte = buffer.getUnsignedByte(buffer.readerIndex());
    final ChannelPipeline pipeline = ctx.getPipeline();
    // None of the commands in the RPC protocol start with a capital ASCII
    // letter for the time being, and all HTTP commands do (GET, POST, etc.)
    // so use this as a cheap way to differentiate the two.
    if ('A' <= firstbyte && firstbyte <= 'Z') {
      pipeline.addLast("decoder", new HttpRequestDecoder());
      if (tsdb.getConfig().enable_chunked_requests()) {
        pipeline.addLast("aggregator", new HttpChunkAggregator(
            tsdb.getConfig().max_chunked_requests()));
      }
      // allow client to encode the payload (ie : with gziped json)
      pipeline.addLast("inflater", new HttpContentDecompressor());
      pipeline.addLast("encoder", new HttpResponseEncoder());
      pipeline.addLast("deflater", new HttpContentCompressor());
    } else {
      pipeline.addLast("framer", new LineBasedFrameDecoder(1024));
      pipeline.addLast("encoder", ENCODER);
      pipeline.addLast("decoder", DECODER);
    }

    pipeline.addLast("timeout", timeoutHandler);
    pipeline.remove(this);
    pipeline.addLast("handler", rpchandler);

    // Forward the buffer to the next handler.
    return buffer.readBytes(buffer.readableBytes());
  }

}

From the code above, we can summarize the handler pipelines for the two protocol types as follows:

  • HTTP Protocol
    1. connmgr
    2. detect
    3. decoder
    4. aggregator(optional)
    5. inflater
    6. encoder
    7. deflater
    8. timeout
    9. handler
  • RPC Protocol
    1. connmgr
    2. detect
    3. framer
    4. encoder
    5. decoder
    6. timeout
    7. handler

Handler Task

HTTP Protocol

connmgr

Class: net.opentsdb.tsd.ConnectionManager

Keeps track of all existing connections.
Limits the max connections established.

detect

File: PipelineFactory.java class DetectHttpOrRpc

Determines the protocol type from the request data and installs the remaining pipeline handlers accordingly.

decoder

Class: org.jboss.netty.handler.codec.http.HttpRequestDecoder

Netty's stock HTTP request decoder.

aggregator

Class: org.jboss.netty.handler.codec.http.HttpChunkAggregator

inflater

Class: org.jboss.netty.handler.codec.http.HttpContentDecompressor

encoder

Class: org.jboss.netty.handler.codec.http.HttpResponseEncoder

deflater

Class: org.jboss.netty.handler.codec.http.HttpContentCompressor

timeout

Class: org.jboss.netty.handler.timeout.IdleStateHandler

handler

Class: net.opentsdb.tsd.RpcHandler
Because the HTTP and RPC handlers differ, the decoded message types differ as well; this class uses the message type to look up the matching RPC(telnet)/HTTP command handler in net.opentsdb.tsd.RpcManager and executes it.

RpcManager maintains separate maps for the built-in HTTP commands and for plugin commands. The two kinds of requests are distinguished by URI prefix: if the prefix is plugin, the request is treated as a call to plugin-provided functionality and is looked up in the corresponding map. Each plugin is bound to one URI (route).

# Built-in command
/api/put

# Plugin command
/plugin/plugin-path

OpenTSDB also supports RPC plugins. Their implementation differs from the HTTP protocol: each plugin runs its own independent server framework, sharing only the net.opentsdb.core.TSDB object with the main framework. See the RpcPlugin abstract base class: RpcPlugin.java
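A skeleton of such a plugin (the method set follows the RpcPlugin base class in 2.3; the embedded server is entirely the plugin's own):

import com.stumbleupon.async.Deferred;
import net.opentsdb.core.TSDB;
import net.opentsdb.stats.StatsCollector;
import net.opentsdb.tsd.RpcPlugin;

public class MyRpcPlugin extends RpcPlugin {  // hypothetical plugin
  @Override
  public void initialize(final TSDB tsdb) {
    // the TSDB object is the only state shared with the main framework;
    // bind our own port and run our own server loop here
  }

  @Override
  public Deferred<Object> shutdown() {
    return Deferred.fromResult(null);  // stop the private server
  }

  @Override
  public String version() { return "1.0.0"; }

  @Override
  public void collectStats(final StatsCollector collector) { }
}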

RPC Protocol

Handlers shared with the HTTP protocol are not listed again here.

framer

Class: net.opentsdb.tsd.LineBasedFrameDecoder

Decodes telnet-style frames delimited by new-lines.

encoder

Class: org.jboss.netty.handler.codec.string.StringEncoder

decoder

Class: net.opentsdb.tsd.WordSplitter

Splits a ChannelBuffer in multiple space separated words.


OpenTSDB Code Reading: 1. Code About UID

code: https://github.com/kmiku7/opentsdb/tree/comment-v2.3.0
package: net.opentsdb.uid

Exception Class

FailedToAssignUniqueIdException/RuntimeException

Thrown when persisting a name -> uid mapping to the store fails; the message includes the number of retries (attempts).

NoSuchUniqueId/NoSuchElementException

Thrown when no name exists for the given uid.

NoSuchUniqueName/NoSuchElementException

Thrown when no uid exists for the given name.

All of these classes declare the following field:

static final long serialVersionUID = 1266815261;

A Java class implementing the Serializable interface needs to define this field; these exception classes inherit from the Java exception classes and therefore implement Serializable indirectly.


Utility Class

RandomUniqueId

Generates a random number width bytes long.

The class is currently used only by net.opentsdb.uid.UniqueId, yet its implementation depends on the net.opentsdb.core.TSDB class, which looks unnecessary.

import net.opentsdb.core.TSDB;

...

public static long getRandomUID() {
  return getRandomUID(TSDB.metrics_width());
}

The default uid width is 3 bytes. Before version 2.2 the width could only be changed by editing the code; since 2.2 it is configurable. However, the RandomUniqueId class limits the width to at most 7 bytes, while the UniqueId class allows widths in [1, 8] bytes. This is probably a bug left over from development.

UniqueId

metric/tagk/tagv each map to their own UniqueId instance.
The UniqueId class implements UniqueIdInterface, but according to the comments the interface serves no purpose and is a transitional design.

ID Generation

Auto-increment

Row {\0x0} of the tsdb-uid table holds the current maximum ID for metrics/tagk/tagv. The data looks like:

\x00 column=id:metrics, timestamp=1521783825851, value=\x00\x00\x00\x00\x00\x00\x01\xF3
\x00 column=id:tagk, timestamp=1521783825817, value=\x00\x00\x00\x00\x00\x00\x00\x11
\x00 column=id:tagv, timestamp=1521783825853, value=\x00\x00\x00\x00\x00\x00\x00\xE2
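Allocating the next ID is then a single atomic increment on that row (asynchbase API; the constants mirror UniqueId.java):

import com.stumbleupon.async.Deferred;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.HBaseClient;

// MAXID_ROW is the one-byte row key {0x00} shown above; kind is the
// column qualifier, i.e. "metrics", "tagk" or "tagv", under family "id".
static Deferred<Long> allocateUid(final HBaseClient client,
                                  final byte[] table, final byte[] kind) {
  final byte[] MAXID_ROW = { 0 };
  final byte[] ID_FAMILY = { 'i', 'd' };
  return client.atomicIncrement(
      new AtomicIncrementRequest(table, MAXID_ROW, ID_FAMILY, kind));
}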

Random

Generated by the RandomUniqueId class.
Note that this version of the code has a bug:

// File: UniqueId.java
// Line: 509
if (randomize_id) {
  return Deferred.fromResult(RandomUniqueId.getRandomUID());
} else {

// File: RandomUniqueId.java
// Line: 45
public static long getRandomUID() {
  return getRandomUID(TSDB.metrics_width());
}

The no-arg version always uses the metrics width rather than the width of the specific kind. If the tagk/tagv uid width is configured longer than the metric width, part of the byte range is never used; if it is configured shorter, the allocation fails because the bytes outside the tagk/tagv width are non-zero.
Per the initialization code and the documentation, only metrics support random uids; the other kinds do not. link

Reference code:

// File: UniqueId.java
// Line: 517
/**
 * Create the reverse mapping.
 * We do this before the forward one so that if we die before creating
 * the forward mapping we don't run the risk of "publishing" a
 * partially assigned ID. The reverse mapping on its own is harmless
 * but the forward mapping without reverse mapping is bad as it would
 * point to an ID that cannot be resolved.
 */
private Deferred<Boolean> createReverseMapping(final Object arg) {
  if (!(arg instanceof Long)) {
    throw new IllegalStateException("Expected a Long but got " + arg);
  }
  id = (Long) arg;
  if (id <= 0) {
    throw new IllegalStateException("Got a negative ID from HBase: " + id);
  }
  LOG.info("Got ID=" + id
           + " for kind='" + kind() + "' name='" + name + "'");
  row = Bytes.fromLong(id);
  // row.length should actually be 8.
  if (row.length < id_width) {
    throw new IllegalStateException("OMG, row.length = " + row.length
                                    + " which is less than " + id_width
                                    + " for id=" + id
                                    + " row=" + Arrays.toString(row));
  }
  // Verify that we're going to drop bytes that are 0.
  for (int i = 0; i < row.length - id_width; i++) {
    if (row[i] != 0) {
      final String message = "All Unique IDs for " + kind()
          + " on " + id_width + " bytes are already assigned!";
      LOG.error("OMG " + message);
      throw new IllegalStateException(message);
    }
  }
  // Shrink the ID on the requested number of bytes.
  row = Arrays.copyOfRange(row, row.length - id_width, row.length);

  state = CREATE_FORWARD_MAPPING;
  // We are CAS'ing the KV into existence -- the second argument is how
  // we tell HBase we want to atomically create the KV, so that if there
  // is already a KV in this cell, we'll fail. Technically we could do
  // just a `put' here, as we have a freshly allocated UID, so there is
  // not reason why a KV should already exist for this UID, but just to
  // err on the safe side and catch really weird corruption cases, we do
  // a CAS instead to create the KV.
  return client.compareAndSet(reverseMapping(), HBaseClient.EMPTY_ARRAY);
}

Also, per the implementation, the configured width cannot exceed sizeof(long). The id spaces of metrics/tagk/tagv are mutually independent, because names of different kinds are stored in different columns:

\x00\x00\x01 column=name:metrics, timestamp=1521783821055, value=tcollector.reader.lines_collected
\x00\x00\x01 column=name:tagk, timestamp=1521783821228, value=host
\x00\x00\x01 column=name:tagv, timestamp=1521783821237, value=999928e09e92
namespace column=id:tagv, timestamp=1521783825636, value=\x00\x00\xAA
net.sockstat.ipfragqueues column=id:metrics, timestamp=1521783825806, value=\x00\x01\xE4

All writes use CompareAndSet, asserting that the stored value is empty.

Name lookup / find suggestions of names given a search term

Lookups are performed per kind (metrics/tagk/tagv), though by inspection the code appears to support mixed lookups. They are implemented with a scanner; reference code:

// File: UniqueId.java
// Line: 1184
private static Scanner getSuggestScanner(final HBaseClient client,
    final byte[] tsd_uid_table, final String search,
    final byte[] kind_or_null, final int max_results) {
  final byte[] start_row;
  final byte[] end_row;
  if (search.isEmpty()) {
    start_row = START_ROW;
    end_row = END_ROW;
  } else {
    start_row = toBytes(search);
    end_row = Arrays.copyOf(start_row, start_row.length);
    end_row[start_row.length - 1]++;
  }
  final Scanner scanner = client.newScanner(tsd_uid_table);
  scanner.setStartKey(start_row);
  scanner.setStopKey(end_row);
  scanner.setFamily(ID_FAMILY);
  if (kind_or_null != null) {
    scanner.setQualifier(kind_or_null);
  }
  scanner.setMaxNumRows(max_results <= 4096 ? max_results : 4096);
  return scanner;
}


UID filter

The code provides a plugin mechanism to validate and filter metric/tagk/tagv names; only names that pass may be created. This feature is currently not mentioned in the documentation.

Related classes:

  • UniqueIdFilterPlugin
  • UniqueIdWhitelistFilter

Related configuration options:

tsd.uidfilter.enable
tsd.uidfilter.plugin

From the implementation, only one plugin can be active at a time; the feature is off by default.
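Enabling it would then look roughly like this in opentsdb.conf (assuming, by analogy with the other plugin options, that the plugin key takes the implementing class name):

tsd.uidfilter.enable = true
tsd.uidfilter.plugin = net.opentsdb.uid.UniqueIdWhitelistFilter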