src: add 4 files
This commit is contained in:
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,31 @@
|
||||
package com.hypixel.hytale.server.core.universe.world.storage;
|
||||
|
||||
import com.hypixel.hytale.component.Holder;
|
||||
import it.unimi.dsi.fastutil.longs.LongSet;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
public interface IChunkSaver extends Closeable {
|
||||
@Nonnull
|
||||
CompletableFuture<Void> saveHolder(int var1, int var2, @Nonnull Holder<ChunkStore> var3);
|
||||
|
||||
@Nonnull
|
||||
CompletableFuture<Void> removeHolder(int var1, int var2);
|
||||
|
||||
/**
|
||||
* Deletes the entire region file containing the specified chunk coordinates.
|
||||
* Used for corruption recovery when the region file is unrecoverable.
|
||||
*/
|
||||
@Nonnull
|
||||
default CompletableFuture<Void> deleteRegionFile(int x, int z) {
|
||||
return CompletableFuture.completedFuture(null);
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
LongSet getIndexes() throws IOException;
|
||||
|
||||
void flush() throws IOException;
|
||||
}
|
||||
@@ -0,0 +1,474 @@
|
||||
package com.hypixel.hytale.server.core.universe.world.storage.provider;
|
||||
|
||||
import com.hypixel.fastutil.longs.Long2ObjectConcurrentHashMap;
|
||||
import com.hypixel.hytale.codec.Codec;
|
||||
import com.hypixel.hytale.codec.KeyedCodec;
|
||||
import com.hypixel.hytale.codec.builder.BuilderCodec;
|
||||
import com.hypixel.hytale.codec.codecs.array.ArrayCodec;
|
||||
import com.hypixel.hytale.component.Resource;
|
||||
import com.hypixel.hytale.component.ResourceType;
|
||||
import com.hypixel.hytale.component.Store;
|
||||
import com.hypixel.hytale.component.SystemGroup;
|
||||
import com.hypixel.hytale.component.system.StoreSystem;
|
||||
import com.hypixel.hytale.math.util.ChunkUtil;
|
||||
import com.hypixel.hytale.metrics.MetricProvider;
|
||||
import com.hypixel.hytale.metrics.MetricResults;
|
||||
import com.hypixel.hytale.metrics.MetricsRegistry;
|
||||
import com.hypixel.hytale.server.core.universe.Universe;
|
||||
import com.hypixel.hytale.server.core.universe.world.World;
|
||||
import com.hypixel.hytale.server.core.universe.world.storage.BufferChunkLoader;
|
||||
import com.hypixel.hytale.server.core.universe.world.storage.BufferChunkSaver;
|
||||
import com.hypixel.hytale.server.core.universe.world.storage.ChunkStore;
|
||||
import com.hypixel.hytale.server.core.universe.world.storage.IChunkLoader;
|
||||
import com.hypixel.hytale.server.core.universe.world.storage.IChunkSaver;
|
||||
import com.hypixel.hytale.sneakythrow.SneakyThrow;
|
||||
import com.hypixel.hytale.storage.IndexedStorageFile;
|
||||
import it.unimi.dsi.fastutil.ints.IntList;
|
||||
import it.unimi.dsi.fastutil.ints.IntListIterator;
|
||||
import it.unimi.dsi.fastutil.longs.Long2ObjectMap.Entry;
|
||||
import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
|
||||
import it.unimi.dsi.fastutil.longs.LongSet;
|
||||
import it.unimi.dsi.fastutil.longs.LongSets;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.Closeable;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.file.FileAlreadyExistsException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.StandardCopyOption;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
import java.util.Iterator;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public class IndexedStorageChunkStorageProvider implements IChunkStorageProvider {
|
||||
public static final String ID = "IndexedStorage";
|
||||
@Nonnull
|
||||
public static final BuilderCodec<IndexedStorageChunkStorageProvider> CODEC = BuilderCodec.builder(
|
||||
IndexedStorageChunkStorageProvider.class, IndexedStorageChunkStorageProvider::new
|
||||
)
|
||||
.documentation("Uses the indexed storage file format to store chunks.")
|
||||
.<Boolean>appendInherited(
|
||||
new KeyedCodec<>("FlushOnWrite", Codec.BOOLEAN), (o, i) -> o.flushOnWrite = i, o -> o.flushOnWrite, (o, p) -> o.flushOnWrite = p.flushOnWrite
|
||||
)
|
||||
.documentation(
|
||||
"Controls whether the indexed storage flushes during writes.\nRecommended to be enabled to prevent corruption of chunks during unclean shutdowns."
|
||||
)
|
||||
.add()
|
||||
.build();
|
||||
private boolean flushOnWrite = true;
|
||||
|
||||
public IndexedStorageChunkStorageProvider() {
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public IChunkLoader getLoader(@Nonnull Store<ChunkStore> store) {
|
||||
return new IndexedStorageChunkStorageProvider.IndexedStorageChunkLoader(store, this.flushOnWrite);
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public IChunkSaver getSaver(@Nonnull Store<ChunkStore> store) {
|
||||
return new IndexedStorageChunkStorageProvider.IndexedStorageChunkSaver(store, this.flushOnWrite);
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public String toString() {
|
||||
return "IndexedStorageChunkStorageProvider{}";
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
private static String toFileName(int regionX, int regionZ) {
|
||||
return regionX + "." + regionZ + ".region.bin";
|
||||
}
|
||||
|
||||
private static long fromFileName(@Nonnull String fileName) {
|
||||
String[] split = fileName.split("\\.");
|
||||
if (split.length != 4) {
|
||||
throw new IllegalArgumentException("Unexpected file name format!");
|
||||
} else if (!"region".equals(split[2])) {
|
||||
throw new IllegalArgumentException("Unexpected file name format!");
|
||||
} else if (!"bin".equals(split[3])) {
|
||||
throw new IllegalArgumentException("Unexpected file extension!");
|
||||
} else {
|
||||
int regionX = Integer.parseInt(split[0]);
|
||||
int regionZ = Integer.parseInt(split[1]);
|
||||
return ChunkUtil.indexChunk(regionX, regionZ);
|
||||
}
|
||||
}
|
||||
|
||||
public static class IndexedStorageCache implements Closeable, MetricProvider, Resource<ChunkStore> {
|
||||
@Nonnull
|
||||
public static final MetricsRegistry<IndexedStorageChunkStorageProvider.IndexedStorageCache> METRICS_REGISTRY = new MetricsRegistry<IndexedStorageChunkStorageProvider.IndexedStorageCache>()
|
||||
.register(
|
||||
"Files",
|
||||
cache -> cache.cache
|
||||
.long2ObjectEntrySet()
|
||||
.stream()
|
||||
.map(IndexedStorageChunkStorageProvider.IndexedStorageCache.CacheEntryMetricData::new)
|
||||
.toArray(IndexedStorageChunkStorageProvider.IndexedStorageCache.CacheEntryMetricData[]::new),
|
||||
new ArrayCodec<>(
|
||||
IndexedStorageChunkStorageProvider.IndexedStorageCache.CacheEntryMetricData.CODEC,
|
||||
IndexedStorageChunkStorageProvider.IndexedStorageCache.CacheEntryMetricData[]::new
|
||||
)
|
||||
);
|
||||
private final Long2ObjectConcurrentHashMap<IndexedStorageFile> cache = new Long2ObjectConcurrentHashMap<>(true, ChunkUtil.NOT_FOUND);
|
||||
private Path path;
|
||||
|
||||
public IndexedStorageCache() {
|
||||
}
|
||||
|
||||
public static ResourceType<ChunkStore, IndexedStorageChunkStorageProvider.IndexedStorageCache> getResourceType() {
|
||||
return Universe.get().getIndexedStorageCacheResourceType();
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
public Long2ObjectConcurrentHashMap<IndexedStorageFile> getCache() {
|
||||
return this.cache;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
IOException exception = null;
|
||||
Iterator<IndexedStorageFile> iterator = this.cache.values().iterator();
|
||||
|
||||
while (iterator.hasNext()) {
|
||||
try {
|
||||
iterator.next().close();
|
||||
iterator.remove();
|
||||
} catch (Exception var4) {
|
||||
if (exception == null) {
|
||||
exception = new IOException("Failed to close one or more loaders!");
|
||||
}
|
||||
|
||||
exception.addSuppressed(var4);
|
||||
}
|
||||
}
|
||||
|
||||
if (exception != null) {
|
||||
throw exception;
|
||||
}
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public IndexedStorageFile getOrTryOpen(int regionX, int regionZ, boolean flushOnWrite) {
|
||||
return this.cache.computeIfAbsent(ChunkUtil.indexChunk(regionX, regionZ), k -> {
|
||||
Path regionFile = this.path.resolve(IndexedStorageChunkStorageProvider.toFileName(regionX, regionZ));
|
||||
if (!Files.exists(regionFile)) {
|
||||
return null;
|
||||
} else {
|
||||
try {
|
||||
IndexedStorageFile open = IndexedStorageFile.open(regionFile, StandardOpenOption.READ, StandardOpenOption.WRITE);
|
||||
open.setFlushOnWrite(flushOnWrite);
|
||||
return open;
|
||||
} catch (FileNotFoundException var8) {
|
||||
return null;
|
||||
} catch (Exception var9) {
|
||||
// Corruption detected - rename file and return null to trigger regeneration
|
||||
System.err.println("[IndexedStorageCache] Corrupted region file detected: " + regionFile + " - " + var9.getMessage());
|
||||
try {
|
||||
Path corruptedPath = regionFile.resolveSibling(regionFile.getFileName() + ".corrupted");
|
||||
Files.move(regionFile, corruptedPath, StandardCopyOption.REPLACE_EXISTING);
|
||||
System.err.println("[IndexedStorageCache] Renamed to: " + corruptedPath);
|
||||
} catch (IOException moveErr) {
|
||||
System.err.println("[IndexedStorageCache] Failed to rename corrupted file: " + moveErr.getMessage());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Marks a region file as corrupted by renaming it with .corrupted suffix.
|
||||
* The file will be regenerated on next access.
|
||||
*/
|
||||
public void markRegionCorrupted(int regionX, int regionZ) {
|
||||
long key = ChunkUtil.indexChunk(regionX, regionZ);
|
||||
IndexedStorageFile file = this.cache.remove(key);
|
||||
if (file != null) {
|
||||
try {
|
||||
file.close();
|
||||
} catch (IOException e) {
|
||||
// Ignore close errors
|
||||
}
|
||||
}
|
||||
Path regionFile = this.path.resolve(IndexedStorageChunkStorageProvider.toFileName(regionX, regionZ));
|
||||
if (Files.exists(regionFile)) {
|
||||
try {
|
||||
Path corruptedPath = regionFile.resolveSibling(regionFile.getFileName() + ".corrupted");
|
||||
Files.move(regionFile, corruptedPath, StandardCopyOption.REPLACE_EXISTING);
|
||||
System.err.println("[IndexedStorageCache] Marked region as corrupted: " + corruptedPath);
|
||||
} catch (IOException e) {
|
||||
System.err.println("[IndexedStorageCache] Failed to mark region as corrupted: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
public IndexedStorageFile getOrCreate(int regionX, int regionZ, boolean flushOnWrite) {
|
||||
return this.cache.computeIfAbsent(ChunkUtil.indexChunk(regionX, regionZ), k -> {
|
||||
try {
|
||||
if (!Files.exists(this.path)) {
|
||||
try {
|
||||
Files.createDirectory(this.path);
|
||||
} catch (FileAlreadyExistsException var8) {
|
||||
}
|
||||
}
|
||||
|
||||
Path regionFile = this.path.resolve(IndexedStorageChunkStorageProvider.toFileName(regionX, regionZ));
|
||||
IndexedStorageFile open = IndexedStorageFile.open(regionFile, StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
|
||||
open.setFlushOnWrite(flushOnWrite);
|
||||
return open;
|
||||
} catch (IOException var9) {
|
||||
throw SneakyThrow.sneakyThrow(var9);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
public LongSet getIndexes() throws IOException {
|
||||
if (!Files.exists(this.path)) {
|
||||
return LongSets.EMPTY_SET;
|
||||
} else {
|
||||
LongOpenHashSet chunkIndexes = new LongOpenHashSet();
|
||||
|
||||
try (Stream<Path> stream = Files.list(this.path)) {
|
||||
stream.forEach(path -> {
|
||||
if (!Files.isDirectory(path)) {
|
||||
long regionIndex;
|
||||
try {
|
||||
regionIndex = IndexedStorageChunkStorageProvider.fromFileName(path.getFileName().toString());
|
||||
} catch (IllegalArgumentException var15) {
|
||||
return;
|
||||
}
|
||||
|
||||
int regionX = ChunkUtil.xOfChunkIndex(regionIndex);
|
||||
int regionZ = ChunkUtil.zOfChunkIndex(regionIndex);
|
||||
IndexedStorageFile regionFile = this.getOrTryOpen(regionX, regionZ, true);
|
||||
if (regionFile != null) {
|
||||
IntList blobIndexes = regionFile.keys();
|
||||
IntListIterator iterator = blobIndexes.iterator();
|
||||
|
||||
while (iterator.hasNext()) {
|
||||
int blobIndex = iterator.nextInt();
|
||||
int localX = ChunkUtil.xFromColumn(blobIndex);
|
||||
int localZ = ChunkUtil.zFromColumn(blobIndex);
|
||||
int chunkX = regionX << 5 | localX;
|
||||
int chunkZ = regionZ << 5 | localZ;
|
||||
chunkIndexes.add(ChunkUtil.indexChunk(chunkX, chunkZ));
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return chunkIndexes;
|
||||
}
|
||||
}
|
||||
|
||||
public void flush() throws IOException {
|
||||
IOException exception = null;
|
||||
|
||||
for (IndexedStorageFile indexedStorageFile : this.cache.values()) {
|
||||
try {
|
||||
indexedStorageFile.force(false);
|
||||
} catch (Exception var5) {
|
||||
if (exception == null) {
|
||||
exception = new IOException("Failed to close one or more loaders!");
|
||||
}
|
||||
|
||||
exception.addSuppressed(var5);
|
||||
}
|
||||
}
|
||||
|
||||
if (exception != null) {
|
||||
throw exception;
|
||||
}
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public MetricResults toMetricResults() {
|
||||
return METRICS_REGISTRY.toMetricResults(this);
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public Resource<ChunkStore> clone() {
|
||||
return new IndexedStorageChunkStorageProvider.IndexedStorageCache();
|
||||
}
|
||||
|
||||
private static class CacheEntryMetricData {
|
||||
@Nonnull
|
||||
private static final Codec<IndexedStorageChunkStorageProvider.IndexedStorageCache.CacheEntryMetricData> CODEC = BuilderCodec.builder(
|
||||
IndexedStorageChunkStorageProvider.IndexedStorageCache.CacheEntryMetricData.class,
|
||||
IndexedStorageChunkStorageProvider.IndexedStorageCache.CacheEntryMetricData::new
|
||||
)
|
||||
.append(new KeyedCodec<>("Key", Codec.LONG), (entry, o) -> entry.key = o, entry -> entry.key)
|
||||
.add()
|
||||
.append(new KeyedCodec<>("File", IndexedStorageFile.METRICS_REGISTRY), (entry, o) -> entry.value = o, entry -> entry.value)
|
||||
.add()
|
||||
.build();
|
||||
private long key;
|
||||
private IndexedStorageFile value;
|
||||
|
||||
public CacheEntryMetricData() {
|
||||
}
|
||||
|
||||
public CacheEntryMetricData(@Nonnull Entry<IndexedStorageFile> entry) {
|
||||
this.key = entry.getLongKey();
|
||||
this.value = entry.getValue();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class IndexedStorageCacheSetupSystem extends StoreSystem<ChunkStore> {
|
||||
public IndexedStorageCacheSetupSystem() {
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public SystemGroup<ChunkStore> getGroup() {
|
||||
return ChunkStore.INIT_GROUP;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSystemAddedToStore(@Nonnull Store<ChunkStore> store) {
|
||||
World world = store.getExternalData().getWorld();
|
||||
store.getResource(IndexedStorageChunkStorageProvider.IndexedStorageCache.getResourceType()).path = world.getSavePath().resolve("chunks");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSystemRemovedFromStore(@Nonnull Store<ChunkStore> store) {
|
||||
}
|
||||
}
|
||||
|
||||
public static class IndexedStorageChunkLoader extends BufferChunkLoader implements MetricProvider {
|
||||
private final boolean flushOnWrite;
|
||||
|
||||
public IndexedStorageChunkLoader(@Nonnull Store<ChunkStore> store, boolean flushOnWrite) {
|
||||
super(store);
|
||||
this.flushOnWrite = flushOnWrite;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
this.getStore().getResource(IndexedStorageChunkStorageProvider.IndexedStorageCache.getResourceType()).close();
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public CompletableFuture<ByteBuffer> loadBuffer(int x, int z) {
|
||||
int regionX = x >> 5;
|
||||
int regionZ = z >> 5;
|
||||
int localX = x & 31;
|
||||
int localZ = z & 31;
|
||||
int index = ChunkUtil.indexColumn(localX, localZ);
|
||||
IndexedStorageChunkStorageProvider.IndexedStorageCache indexedStorageCache = this.getStore()
|
||||
.getResource(IndexedStorageChunkStorageProvider.IndexedStorageCache.getResourceType());
|
||||
return CompletableFuture.supplyAsync(SneakyThrow.sneakySupplier(() -> {
|
||||
IndexedStorageFile chunks = indexedStorageCache.getOrTryOpen(regionX, regionZ, this.flushOnWrite);
|
||||
return chunks == null ? null : chunks.readBlob(index);
|
||||
}));
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public LongSet getIndexes() throws IOException {
|
||||
return this.getStore().getResource(IndexedStorageChunkStorageProvider.IndexedStorageCache.getResourceType()).getIndexes();
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public MetricResults toMetricResults() {
|
||||
return this.getStore().getExternalData().getSaver() instanceof IndexedStorageChunkStorageProvider.IndexedStorageChunkSaver
|
||||
? null
|
||||
: this.getStore().getResource(IndexedStorageChunkStorageProvider.IndexedStorageCache.getResourceType()).toMetricResults();
|
||||
}
|
||||
}
|
||||
|
||||
public static class IndexedStorageChunkSaver extends BufferChunkSaver implements MetricProvider {
|
||||
private final boolean flushOnWrite;
|
||||
|
||||
protected IndexedStorageChunkSaver(@Nonnull Store<ChunkStore> store, boolean flushOnWrite) {
|
||||
super(store);
|
||||
this.flushOnWrite = flushOnWrite;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
IndexedStorageChunkStorageProvider.IndexedStorageCache indexedStorageCache = this.getStore()
|
||||
.getResource(IndexedStorageChunkStorageProvider.IndexedStorageCache.getResourceType());
|
||||
indexedStorageCache.close();
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public CompletableFuture<Void> saveBuffer(int x, int z, @Nonnull ByteBuffer buffer) {
|
||||
int regionX = x >> 5;
|
||||
int regionZ = z >> 5;
|
||||
int localX = x & 31;
|
||||
int localZ = z & 31;
|
||||
int index = ChunkUtil.indexColumn(localX, localZ);
|
||||
IndexedStorageChunkStorageProvider.IndexedStorageCache indexedStorageCache = this.getStore()
|
||||
.getResource(IndexedStorageChunkStorageProvider.IndexedStorageCache.getResourceType());
|
||||
return CompletableFuture.runAsync(SneakyThrow.sneakyRunnable(() -> {
|
||||
IndexedStorageFile chunks = indexedStorageCache.getOrCreate(regionX, regionZ, this.flushOnWrite);
|
||||
chunks.writeBlob(index, buffer);
|
||||
}));
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public CompletableFuture<Void> removeBuffer(int x, int z) {
|
||||
int regionX = x >> 5;
|
||||
int regionZ = z >> 5;
|
||||
int localX = x & 31;
|
||||
int localZ = z & 31;
|
||||
int index = ChunkUtil.indexColumn(localX, localZ);
|
||||
IndexedStorageChunkStorageProvider.IndexedStorageCache indexedStorageCache = this.getStore()
|
||||
.getResource(IndexedStorageChunkStorageProvider.IndexedStorageCache.getResourceType());
|
||||
return CompletableFuture.runAsync(SneakyThrow.sneakyRunnable(() -> {
|
||||
IndexedStorageFile chunks = indexedStorageCache.getOrTryOpen(regionX, regionZ, this.flushOnWrite);
|
||||
if (chunks != null) {
|
||||
chunks.removeBlob(index);
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public CompletableFuture<Void> deleteRegionFile(int x, int z) {
|
||||
int regionX = x >> 5;
|
||||
int regionZ = z >> 5;
|
||||
IndexedStorageChunkStorageProvider.IndexedStorageCache indexedStorageCache = this.getStore()
|
||||
.getResource(IndexedStorageChunkStorageProvider.IndexedStorageCache.getResourceType());
|
||||
return CompletableFuture.runAsync(() -> {
|
||||
indexedStorageCache.markRegionCorrupted(regionX, regionZ);
|
||||
});
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public LongSet getIndexes() throws IOException {
|
||||
return this.getStore().getResource(IndexedStorageChunkStorageProvider.IndexedStorageCache.getResourceType()).getIndexes();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush() throws IOException {
|
||||
this.getStore().getResource(IndexedStorageChunkStorageProvider.IndexedStorageCache.getResourceType()).flush();
|
||||
}
|
||||
|
||||
@Override
|
||||
public MetricResults toMetricResults() {
|
||||
return this.getStore().getResource(IndexedStorageChunkStorageProvider.IndexedStorageCache.getResourceType()).toMetricResults();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -126,12 +126,21 @@ public class IndexedStorageFile implements Closeable {
|
||||
throw new IOException("file channel is empty");
|
||||
}
|
||||
|
||||
storageFile.readHeader();
|
||||
storageFile.memoryMapBlobIndexes();
|
||||
if (storageFile.version == 0) {
|
||||
storageFile = migrateV0(path, blobCount, segmentSize, options, attrs, storageFile);
|
||||
} else {
|
||||
storageFile.readUsedSegments();
|
||||
try {
|
||||
storageFile.readHeader();
|
||||
storageFile.memoryMapBlobIndexes();
|
||||
if (storageFile.version == 0) {
|
||||
storageFile = migrateV0(path, blobCount, segmentSize, options, attrs, storageFile);
|
||||
} else {
|
||||
storageFile.readUsedSegments();
|
||||
}
|
||||
} catch (CorruptedStorageException e) {
|
||||
// Corruption detected - delete and recreate the file
|
||||
System.err.println("[IndexedStorageFile] Corruption detected in " + path + ": " + e.getMessage() + " - deleting and recreating");
|
||||
storageFile.close();
|
||||
Files.delete(path);
|
||||
storageFile = new IndexedStorageFile(path, FileChannel.open(path, options, attrs));
|
||||
storageFile.create(blobCount, segmentSize);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -139,6 +148,15 @@ public class IndexedStorageFile implements Closeable {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Exception thrown when storage file corruption is detected.
|
||||
*/
|
||||
public static class CorruptedStorageException extends IOException {
|
||||
public CorruptedStorageException(String message) {
|
||||
super(message);
|
||||
}
|
||||
}
|
||||
|
||||
private static IndexedStorageFile migrateV0(
|
||||
Path path, int blobCount, int segmentSize, Set<? extends OpenOption> options, FileAttribute<?>[] attrs, IndexedStorageFile storageFile
|
||||
) throws IOException {
|
||||
@@ -262,6 +280,9 @@ public class IndexedStorageFile implements Closeable {
|
||||
this.mappedBlobIndexes = this.fileChannel.map(MapMode.READ_WRITE, HEADER_LENGTH, this.blobCount * 4L);
|
||||
}
|
||||
|
||||
private static final int MAX_COMPRESSED_LENGTH_LIMIT = 256 * 1024 * 1024; // 256MB
|
||||
private static final int MAX_SEGMENT_INDEX = 10_000_000; // ~40GB at 4KB segments
|
||||
|
||||
protected void readUsedSegments() throws IOException {
|
||||
long stamp = this.usedSegmentsLock.writeLock();
|
||||
|
||||
@@ -276,25 +297,25 @@ public class IndexedStorageFile implements Closeable {
|
||||
firstSegmentIndex = this.mappedBlobIndexes.getInt(indexPos);
|
||||
if (firstSegmentIndex == 0) {
|
||||
compressedLength = 0;
|
||||
} else if (firstSegmentIndex < 0 || firstSegmentIndex > MAX_SEGMENT_INDEX) {
|
||||
// Corrupted segment index - file is corrupt
|
||||
throw new CorruptedStorageException(
|
||||
"Invalid segment index " + firstSegmentIndex + " for blob " + blobIndex);
|
||||
} else {
|
||||
try {
|
||||
ByteBuffer blobHeaderBuffer = this.readBlobHeader(firstSegmentIndex);
|
||||
compressedLength = blobHeaderBuffer.getInt(COMPRESSED_LENGTH_OFFSET);
|
||||
} catch (Exception e) {
|
||||
// Corrupted blob header - skip this blob
|
||||
compressedLength = 0;
|
||||
ByteBuffer blobHeaderBuffer = this.readBlobHeader(firstSegmentIndex);
|
||||
compressedLength = blobHeaderBuffer.getInt(COMPRESSED_LENGTH_OFFSET);
|
||||
if (compressedLength < 0 || compressedLength > MAX_COMPRESSED_LENGTH_LIMIT) {
|
||||
throw new CorruptedStorageException(
|
||||
"Invalid compressed length " + compressedLength + " for blob " + blobIndex);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
this.indexLocks[blobIndex].unlockRead(segmentStamp);
|
||||
}
|
||||
|
||||
// Validate to prevent OOM from corrupted compressedLength values
|
||||
// Max reasonable compressed length ~256MB, prevents BitSet from allocating huge arrays
|
||||
if (compressedLength > 0 && compressedLength < 256 * 1024 * 1024 && firstSegmentIndex > 0) {
|
||||
if (compressedLength > 0 && firstSegmentIndex > 0) {
|
||||
int segmentsCount = this.requiredSegments(BLOB_HEADER_LENGTH + compressedLength);
|
||||
int toIndex = firstSegmentIndex + segmentsCount;
|
||||
// Ensure no overflow and reasonable range
|
||||
if (segmentsCount > 0 && toIndex > firstSegmentIndex) {
|
||||
this.usedSegments.set(firstSegmentIndex, toIndex);
|
||||
}
|
||||
@@ -509,11 +530,9 @@ public class IndexedStorageFile implements Closeable {
|
||||
}
|
||||
}
|
||||
|
||||
private static final int MAX_COMPRESSED_LENGTH = 256 * 1024 * 1024; // 256MB max to prevent OOM
|
||||
|
||||
@Nonnull
|
||||
protected ByteBuffer readSegments(int firstSegmentIndex, int compressedLength) throws IOException {
|
||||
if (compressedLength <= 0 || compressedLength > MAX_COMPRESSED_LENGTH) {
|
||||
if (compressedLength <= 0 || compressedLength > MAX_COMPRESSED_LENGTH_LIMIT) {
|
||||
throw new IOException("Invalid compressed length: " + compressedLength);
|
||||
}
|
||||
if (firstSegmentIndex <= 0) {
|
||||
@@ -558,52 +577,52 @@ public class IndexedStorageFile implements Closeable {
|
||||
dest.limit(dest.position());
|
||||
dest.position(0);
|
||||
int indexPos = blobIndex * 4;
|
||||
long stamp = this.indexLocks[blobIndex].writeLock();
|
||||
|
||||
try {
|
||||
int oldSegmentLength = 0;
|
||||
int oldFirstSegmentIndex = this.mappedBlobIndexes.getInt(indexPos);
|
||||
if (oldFirstSegmentIndex != 0) {
|
||||
try {
|
||||
ByteBuffer blobHeaderBuffer = this.readBlobHeader(oldFirstSegmentIndex);
|
||||
int oldCompressedLength = blobHeaderBuffer.getInt(COMPRESSED_LENGTH_OFFSET);
|
||||
// Validate to prevent corruption from causing invalid segment ranges
|
||||
if (oldCompressedLength >= 0) {
|
||||
oldSegmentLength = this.requiredSegments(BLOB_HEADER_LENGTH + oldCompressedLength);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// Old blob header is corrupted/unreadable - skip clearing old segments
|
||||
// This leaks segments but prevents corruption from propagating
|
||||
oldSegmentLength = 0;
|
||||
}
|
||||
}
|
||||
|
||||
int firstSegmentIndex = this.writeSegments(dest);
|
||||
if (this.flushOnWrite) {
|
||||
this.fileChannel.force(false);
|
||||
}
|
||||
|
||||
this.mappedBlobIndexes.putInt(indexPos, firstSegmentIndex);
|
||||
if (this.flushOnWrite) {
|
||||
this.mappedBlobIndexes.force(indexPos, 4);
|
||||
}
|
||||
|
||||
if (oldSegmentLength > 0 && oldFirstSegmentIndex > 0) {
|
||||
int toIndex = oldFirstSegmentIndex + oldSegmentLength;
|
||||
// Validate range to prevent IndexOutOfBoundsException
|
||||
if (toIndex > oldFirstSegmentIndex) {
|
||||
long usedSegmentsStamp = this.usedSegmentsLock.writeLock();
|
||||
long stamp = this.indexLocks[blobIndex].writeLock();
|
||||
|
||||
try {
|
||||
int oldSegmentLength = 0;
|
||||
int oldFirstSegmentIndex = this.mappedBlobIndexes.getInt(indexPos);
|
||||
if (oldFirstSegmentIndex != 0) {
|
||||
try {
|
||||
this.usedSegments.clear(oldFirstSegmentIndex, toIndex);
|
||||
} finally {
|
||||
this.usedSegmentsLock.unlockWrite(usedSegmentsStamp);
|
||||
ByteBuffer blobHeaderBuffer = this.readBlobHeader(oldFirstSegmentIndex);
|
||||
int oldCompressedLength = blobHeaderBuffer.getInt(COMPRESSED_LENGTH_OFFSET);
|
||||
// Validate to prevent corruption from causing invalid segment ranges
|
||||
if (oldCompressedLength >= 0) {
|
||||
oldSegmentLength = this.requiredSegments(BLOB_HEADER_LENGTH + oldCompressedLength);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// Old blob header is corrupted/unreadable - skip clearing old segments
|
||||
// This leaks segments but prevents corruption from propagating
|
||||
oldSegmentLength = 0;
|
||||
}
|
||||
}
|
||||
|
||||
int firstSegmentIndex = this.writeSegments(dest);
|
||||
if (this.flushOnWrite) {
|
||||
this.fileChannel.force(false);
|
||||
}
|
||||
|
||||
this.mappedBlobIndexes.putInt(indexPos, firstSegmentIndex);
|
||||
if (this.flushOnWrite) {
|
||||
this.mappedBlobIndexes.force(indexPos, 4);
|
||||
}
|
||||
|
||||
if (oldSegmentLength > 0 && oldFirstSegmentIndex > 0) {
|
||||
int toIndex = oldFirstSegmentIndex + oldSegmentLength;
|
||||
// Validate range to prevent IndexOutOfBoundsException
|
||||
if (toIndex > oldFirstSegmentIndex) {
|
||||
long usedSegmentsStamp = this.usedSegmentsLock.writeLock();
|
||||
|
||||
try {
|
||||
this.usedSegments.clear(oldFirstSegmentIndex, toIndex);
|
||||
} finally {
|
||||
this.usedSegmentsLock.unlockWrite(usedSegmentsStamp);
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
this.indexLocks[blobIndex].unlockWrite(stamp);
|
||||
}
|
||||
} finally {
|
||||
this.indexLocks[blobIndex].unlockWrite(stamp);
|
||||
}
|
||||
} else {
|
||||
throw new IndexOutOfBoundsException("Index out of range: " + blobIndex + " blobCount: " + this.blobCount);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user