mirror of
https://gitee.com/BDWare/common
synced 2026-02-14 16:49:28 +00:00
fix: binarySearch bugs in TimeSerialIndex
feat: support multi tag time index util
This commit is contained in:
@@ -10,6 +10,7 @@ public enum Permission {
|
||||
RocksDB,
|
||||
MongoDB,
|
||||
BDWareTimeSeriesDB,
|
||||
MultiTagIndexDB,
|
||||
SM2,
|
||||
AES,
|
||||
Ledger,
|
||||
|
||||
@@ -18,6 +18,7 @@ import java.util.*;
|
||||
public class MultiIndexTimeRocksDBUtil implements MultiIndexTimeDBUtilIntf {
|
||||
private static final Logger LOGGER = LogManager.getLogger(MultiIndexTimeRocksDBUtil.class);
|
||||
private final Map<String, TimeSerialIndex> secondaryIndex;
|
||||
static String primaryKey = "_DB_primary_";
|
||||
public String dbPath;
|
||||
public String tableName;
|
||||
Random random = new Random();
|
||||
@@ -42,7 +43,7 @@ public class MultiIndexTimeRocksDBUtil implements MultiIndexTimeDBUtilIntf {
|
||||
Options options = new Options();
|
||||
options.setCreateIfMissing(true);
|
||||
File lockFile = new File(file, "LOCK");
|
||||
File timeIndex = new File(dbPath + "/" + tableName + "/DB.primary.timeindex");
|
||||
File timeIndex = new File(dbPath + "/" + tableName + "/" + primaryIndex + ".timeindex");
|
||||
LOGGER.trace("create directory " + file.getAbsolutePath() + ": " + file.mkdirs());
|
||||
LOGGER.trace("delete file" + lockFile.getAbsolutePath() + ": " + lockFile.delete());
|
||||
try {
|
||||
@@ -51,8 +52,16 @@ public class MultiIndexTimeRocksDBUtil implements MultiIndexTimeDBUtilIntf {
|
||||
} catch (RocksDBException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
primaryIndex = new TimeSerialIndex(timeIndex.getAbsolutePath());
|
||||
secondaryIndex.put(primaryKey, primaryIndex);
|
||||
for (File f : timeIndex.getParentFile().listFiles()) {
|
||||
if (!f.getName().endsWith(".timeindex")) continue;
|
||||
if (f.getName().equals(timeIndex.getName())) continue;
|
||||
TimeSerialIndex index =
|
||||
new TimeSerialIndex(f.getAbsolutePath());
|
||||
secondaryIndex.putIfAbsent(f.getName().substring(0,
|
||||
f.getName().length() - ".timeindex".length()), index);
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void put(String label, String val) {
|
||||
@@ -141,8 +150,13 @@ public class MultiIndexTimeRocksDBUtil implements MultiIndexTimeDBUtilIntf {
|
||||
|
||||
public long size(String label) {
|
||||
try {
|
||||
TimeSerialIndex index = getIndex(label);
|
||||
if (index != null) return index.size();
|
||||
if (label == null || label.length() == 0) {
|
||||
return primaryIndex.size();
|
||||
}
|
||||
if (secondaryIndex.containsKey(label)) {
|
||||
TimeSerialIndex index = getIndex(label);
|
||||
return index.size();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
@@ -152,17 +166,23 @@ public class MultiIndexTimeRocksDBUtil implements MultiIndexTimeDBUtilIntf {
|
||||
public List<JsonObject> queryByOffset(String label, long offset, int count) {
|
||||
List<JsonObject> ret = new ArrayList<>();
|
||||
TimeSerialIndex index = getIndex(label);
|
||||
List<Long> data = index.request(offset, count);
|
||||
for (Long l : data) {
|
||||
List<TimeSerialIndex.IndexEntry> data = index.requestIndexEntry(offset, count);
|
||||
for (TimeSerialIndex.IndexEntry entry : data) {
|
||||
try {
|
||||
String t = new String(db.get(longToByte(l)));
|
||||
String t = new String(db.get(longToByte(entry.value)));
|
||||
JsonObject jo;
|
||||
if (!t.isEmpty()) {
|
||||
jo = JsonUtil.parseStringAsJsonObject(t);
|
||||
try {
|
||||
jo = JsonUtil.parseStringAsJsonObject(t);
|
||||
} catch (Exception e) {
|
||||
jo = new JsonObject();
|
||||
jo.addProperty("data", t);
|
||||
}
|
||||
} else {
|
||||
jo = new JsonObject();
|
||||
}
|
||||
jo.addProperty("key", l.toString());
|
||||
jo.addProperty("key", entry.value);
|
||||
jo.addProperty("timestamp", entry.key);
|
||||
ret.add(jo);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
@@ -181,7 +201,7 @@ public class MultiIndexTimeRocksDBUtil implements MultiIndexTimeDBUtilIntf {
|
||||
}
|
||||
|
||||
private TimeSerialIndex getIndex(String label) {
|
||||
if (null == label) {
|
||||
if (null == label || label.length() == 0) {
|
||||
return primaryIndex;
|
||||
}
|
||||
if (secondaryIndex.containsKey(label)) {
|
||||
@@ -275,6 +295,31 @@ public class MultiIndexTimeRocksDBUtil implements MultiIndexTimeDBUtilIntf {
|
||||
// manuellyIndex可参考LenVarTimeIndex写法
|
||||
}
|
||||
|
||||
public void close() {
|
||||
db.close();
|
||||
}
|
||||
|
||||
public List<String> getIndexStartWith(String prefix) {
|
||||
List<String> ret = new ArrayList<>();
|
||||
if (prefix == null) {
|
||||
prefix = "";
|
||||
}
|
||||
for (String key : secondaryIndex.keySet()) {
|
||||
if (key.startsWith(prefix))
|
||||
ret.add(key);
|
||||
}
|
||||
ret.remove(primaryKey);
|
||||
return ret;
|
||||
}
|
||||
|
||||
public List<String> getAllIndexKey() {
|
||||
Set<String> data = secondaryIndex.keySet();
|
||||
List<String> ret = new ArrayList<>();
|
||||
ret.addAll(data);
|
||||
ret.remove(primaryKey);
|
||||
return ret;
|
||||
}
|
||||
|
||||
// TODO
|
||||
static class BytesPair {
|
||||
byte[] key, value;
|
||||
|
||||
@@ -85,6 +85,39 @@ public class TimeSerialIndex {
|
||||
} else return new ArrayList<>();
|
||||
}
|
||||
|
||||
public static class IndexEntry {
|
||||
public long key, value;
|
||||
}
|
||||
|
||||
public synchronized List<IndexEntry> requestIndexEntry(long offset, int len) {
|
||||
List<IndexEntry> ret = new ArrayList<>();
|
||||
if (offset < 0) offset = 0;
|
||||
if (offset < fileSize) {
|
||||
long pos = 0;
|
||||
try {
|
||||
pos = file.getFilePointer();
|
||||
file.seek(2L * offset * 8L);
|
||||
for (; offset < fileSize && len > 0; len--) {
|
||||
IndexEntry entry = new IndexEntry();
|
||||
entry.key = file.readLong();
|
||||
entry.value = file.readLong();
|
||||
ret.add(entry);
|
||||
offset++;
|
||||
}
|
||||
file.seek(pos);
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
} finally {
|
||||
try {
|
||||
file.seek(pos);
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
} else return new ArrayList<>();
|
||||
}
|
||||
|
||||
private long getIndex(long offset) {
|
||||
if (offset < fileSize) {
|
||||
try {
|
||||
@@ -109,7 +142,7 @@ public class TimeSerialIndex {
|
||||
public synchronized long findNearest(long timeStamp) {
|
||||
try {
|
||||
long pos = file.getFilePointer();
|
||||
long ret = binarySearch(0L, fileSize + 1, timeStamp);
|
||||
long ret = binarySearch(0L, fileSize, timeStamp);
|
||||
file.seek(pos);
|
||||
return ret;
|
||||
} catch (IOException e) {
|
||||
@@ -120,9 +153,14 @@ public class TimeSerialIndex {
|
||||
}
|
||||
|
||||
private long binarySearch(long start, long end, long timeStamp) {
|
||||
if (start >= end - 1) return start;
|
||||
if (end <= 0) return end;
|
||||
if (start >= end - 1) {
|
||||
long key = getIndex(end - 1);
|
||||
if (key >= timeStamp) {
|
||||
return end - 1;
|
||||
} else return end;
|
||||
}
|
||||
long mid = (start + end) / 2;
|
||||
if (mid >= end - 1) return mid;
|
||||
long key = getIndex(mid);
|
||||
if (key >= timeStamp) {
|
||||
return binarySearch(start, mid, timeStamp);
|
||||
|
||||
Reference in New Issue
Block a user