OpenTSDB Code Reading: 1. Code About UID

code: https://github.com/kmiku7/opentsdb/tree/comment-v2.3.0
package: net.opentsdb.uid

Exception Class

FailedToAssignUniqueIdException/RuntimeException

当在库中持久化 name -> uid 映射关系失败时抛出,信息中包含重试次数(attempts)。

NoSuchUniqueId/NoSuchElementException

没有uid对应的name。

NoSuchUniqueName/NoSuchElementException

没有name对应的uid。

这几个列中均声明了如下变量:

1
static final long serialVersionUID = 1266815261;

java class实现了Serializable Interface时需要定义该字段,这几个异常类均继承自java异常类,因此间接实现了Serializable接口。

References:

Utility Class

RandomUniqueId

生成一个width字节长度的随机数。

该类目前只由net.opentsdb.uid.UniqueId使用。实现中却依赖到net.opentsdb.core.TSDB类,观察没有必要。

1
2
3
4
5
6
7
import net.opentsdb.core.TSDB;

...

public static long getRandomUID() {
return getRandomUID(TSDB.metrics_width());
}

uid默认长度为3字节,2.2版本以前通过修改代码改变长度,2.2以后支持配置项。但是根据代码实现,RandomUniqueId class限制了width长度不大于7字节,UniqueId class限制了width长度在[1, 8]字节之间。这也许是开发过程遗留的bug。

UniqueId

metric/tagk/tagv 每种name对应独立的UniqueId Instance。
UniqueId class 实现了 UniqueIdInterface,但是根据注释描述,UniqueIdInterface没有用处,属于一种过渡设计。

ID生成

自增

在tsdb-uid表中ROW:{\0x0} 维护了 metric/tagk/tagv 的当前最大ID,数据如下:

1
2
3
\x00 column=id:metrics, timestamp=1521783825851, value=\x00\x00\x00\x00\x00\x00\x01\xF3
\x00 column=id:tagk, timestamp=1521783825817, value=\x00\x00\x00\x00\x00\x00\x00\x11
\x00 column=id:tagv, timestamp=1521783825853, value=\x00\x00\x00\x00\x00\x00\x00\xE2

Random

使用RandomUniqueId class生成。
注意该版本代码有Bug:

1
2
3
4
5
6
7
8
9
10
11
// File: UniqueId.java
// Line: 509
if (randomize_id) {
return Deferred.fromResult(RandomUniqueId.getRandomUID());
} else {

// File: RandomUniqueId.java
// Line: 45
public static long getRandomUID() {
return getRandomUID(TSDB.metrics_width());
}

通过无参版本全都使用了metrics的长度,而没有根据具体类型区分。如果tagk/tagv配置的uid长度比metric的长,会导致部分字节没有使用。如果tagk/tagv配置的uid长度比metric的短,则会因为id-length(tagk/tagv)范围外的字节非零导致失败。
参考初始化代码,文档,只用metrics支持random uid,其他不支持。link

参考代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// File: UniqueId.java
// Line: 517
/**
* Create the reverse mapping.
* We do this before the forward one so that if we die before creating
* the forward mapping we don't run the risk of "publishing" a
* partially assigned ID. The reverse mapping on its own is harmless
* but the forward mapping without reverse mapping is bad as it would
* point to an ID that cannot be resolved.
*/
private Deferred<Boolean> createReverseMapping(final Object arg) {
if (!(arg instanceof Long)) {
throw new IllegalStateException("Expected a Long but got " + arg);
}
id = (Long) arg;
if (id <= 0) {
throw new IllegalStateException("Got a negative ID from HBase: " + id);
}
LOG.info("Got ID=" + id
+ " for kind='" + kind() + "' name='" + name + "'");
row = Bytes.fromLong(id);
// row.length should actually be 8.
if (row.length < id_width) {
throw new IllegalStateException("OMG, row.length = " + row.length
+ " which is less than " + id_width
+ " for id=" + id
+ " row=" + Arrays.toString(row));
}
// Verify that we're going to drop bytes that are 0.
for (int i = 0; i < row.length - id_width; i++) {
if (row[i] != 0) {
final String message = "All Unique IDs for " + kind()
+ " on " + id_width + " bytes are already assigned!";
LOG.error("OMG " + message);
throw new IllegalStateException(message);
}
}
// Shrink the ID on the requested number of bytes.
row = Arrays.copyOfRange(row, row.length - id_width, row.length);

state = CREATE_FORWARD_MAPPING;
// We are CAS'ing the KV into existence -- the second argument is how
// we tell HBase we want to atomically create the KV, so that if there
// is already a KV in this cell, we'll fail. Technically we could do
// just a `put' here, as we have a freshly allocated UID, so there is
// not reason why a KV should already exist for this UID, but just to
// err on the safe side and catch really weird corruption cases, we do
// a CAS instead to create the KV.
return client.compareAndSet(reverseMapping(), HBaseClient.EMPTY_ARRAY);
}

另外根据代码实现,配置长度不能大约sizeof(long)的长度。metrics/tagk/tagv的取值空间是互相独立的,因为不同类型的name放在不同的列中,数据如下:

1
2
3
4
5
\x00\x00\x01 column=name:metrics, timestamp=1521783821055, value=tcollector.reader.lines_collected
\x00\x00\x01 column=name:tagk, timestamp=1521783821228, value=host
\x00\x00\x01 column=name:tagv, timestamp=1521783821237, value=999928e09e92
namespace column=id:tagv, timestamp=1521783825636, value=\x00\x00\xAA
net.sockstat.ipfragqueues column=id:metrics, timestamp=1521783825806, value=\x00\x01\xE4

写库过程中均使用了CompareAndSet方式,库中value应为空。

name查询/find suggestions of names given a search term

查询是按照metrics/tagk/tagv维度进行。目测代码支持混合查询。使用scanner实现,参考代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// File: UniqueId.java
// Line: 1184
private static Scanner getSuggestScanner(final HBaseClient client,
final byte[] tsd_uid_table, final String search,
final byte[] kind_or_null, final int max_results) {
final byte[] start_row;
final byte[] end_row;
if (search.isEmpty()) {
start_row = START_ROW;
end_row = END_ROW;
} else {
start_row = toBytes(search);
end_row = Arrays.copyOf(start_row, start_row.length);
end_row[start_row.length - 1]++;
}
final Scanner scanner = client.newScanner(tsd_uid_table);
scanner.setStartKey(start_row);
scanner.setStopKey(end_row);
scanner.setFamily(ID_FAMILY);
if (kind_or_null != null) {
scanner.setQualifier(kind_or_null);
}
scanner.setMaxNumRows(max_results <= 4096 ? max_results : 4096);
return scanner;
}

References:

UID filter

代码中提供了Plugin机制对metric/tagk/tagv进行验证过滤,通过的name才可以在创建。但是该功能目前未在文档中提及。

相关类:

  • UniqueIdFilterPlugin
  • UniqueIdWhitelistFilter

相关配置项:

1
2
tsd.uidfilter.enable
tsd.uidfilter.plugin

从实现上看,只支持一个Plugin生效。默认不使用。