src: update IndexedStorageChunkStorageProvider
This commit is contained in:
@@ -1,6 +1,7 @@
|
|||||||
package com.hypixel.hytale.server.core.universe.world.storage.provider;
|
package com.hypixel.hytale.server.core.universe.world.storage.provider;
|
||||||
|
|
||||||
import com.hypixel.fastutil.longs.Long2ObjectConcurrentHashMap;
|
import com.hypixel.fastutil.longs.Long2ObjectConcurrentHashMap;
|
||||||
|
import com.hypixel.hytale.logger.HytaleLogger;
|
||||||
import com.hypixel.hytale.codec.Codec;
|
import com.hypixel.hytale.codec.Codec;
|
||||||
import com.hypixel.hytale.codec.KeyedCodec;
|
import com.hypixel.hytale.codec.KeyedCodec;
|
||||||
import com.hypixel.hytale.codec.builder.BuilderCodec;
|
import com.hypixel.hytale.codec.builder.BuilderCodec;
|
||||||
@@ -42,7 +43,14 @@ import java.nio.file.Path;
|
|||||||
import java.nio.file.StandardCopyOption;
|
import java.nio.file.StandardCopyOption;
|
||||||
import java.nio.file.StandardOpenOption;
|
import java.nio.file.StandardOpenOption;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.logging.Level;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
public class IndexedStorageChunkStorageProvider implements IChunkStorageProvider {
|
public class IndexedStorageChunkStorageProvider implements IChunkStorageProvider {
|
||||||
@@ -118,11 +126,178 @@ public class IndexedStorageChunkStorageProvider implements IChunkStorageProvider
|
|||||||
IndexedStorageChunkStorageProvider.IndexedStorageCache.CacheEntryMetricData[]::new
|
IndexedStorageChunkStorageProvider.IndexedStorageCache.CacheEntryMetricData[]::new
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
private static final HytaleLogger LOGGER = HytaleLogger.forEnclosingClass();
|
||||||
|
|
||||||
|
/** How long a region file can be idle before being closed (in milliseconds) */
|
||||||
|
private static final long IDLE_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(5);
|
||||||
|
/** How often to check for idle region files (in milliseconds) */
|
||||||
|
private static final long CLEANUP_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrapper for IndexedStorageFile that tracks usage for async-safe cleanup.
|
||||||
|
*/
|
||||||
|
private static class CachedFile {
|
||||||
|
final IndexedStorageFile file;
|
||||||
|
final AtomicInteger activeOps = new AtomicInteger(0);
|
||||||
|
volatile long lastAccessTime;
|
||||||
|
volatile boolean markedForClose = false;
|
||||||
|
|
||||||
|
CachedFile(IndexedStorageFile file) {
|
||||||
|
this.file = file;
|
||||||
|
this.lastAccessTime = System.currentTimeMillis();
|
||||||
|
}
|
||||||
|
|
||||||
|
void recordAccess() {
|
||||||
|
this.lastAccessTime = System.currentTimeMillis();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Acquire a reference for an operation. Returns false if file is marked for close.
|
||||||
|
*/
|
||||||
|
boolean acquire() {
|
||||||
|
if (markedForClose) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
activeOps.incrementAndGet();
|
||||||
|
// Double-check after increment
|
||||||
|
if (markedForClose) {
|
||||||
|
activeOps.decrementAndGet();
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
recordAccess();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void release() {
|
||||||
|
activeOps.decrementAndGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean isIdle(long threshold) {
|
||||||
|
return lastAccessTime < threshold && activeOps.get() == 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private final Long2ObjectConcurrentHashMap<IndexedStorageFile> cache = new Long2ObjectConcurrentHashMap<>(true, ChunkUtil.NOT_FOUND);
|
private final Long2ObjectConcurrentHashMap<IndexedStorageFile> cache = new Long2ObjectConcurrentHashMap<>(true, ChunkUtil.NOT_FOUND);
|
||||||
|
private final Map<Long, CachedFile> trackedFiles = new ConcurrentHashMap<>();
|
||||||
private Path path;
|
private Path path;
|
||||||
|
private ScheduledExecutorService cleanupExecutor;
|
||||||
|
|
||||||
public IndexedStorageCache() {
|
public IndexedStorageCache() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void startCleanupTask() {
|
||||||
|
if (cleanupExecutor != null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
LOGGER.at(Level.INFO).log("Starting region file cleanup task (idle timeout: %ds, interval: %ds)",
|
||||||
|
IDLE_TIMEOUT_MS / 1000, CLEANUP_INTERVAL_MS / 1000);
|
||||||
|
cleanupExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
|
||||||
|
Thread t = new Thread(r, "IndexedStorageCache-Cleanup");
|
||||||
|
t.setDaemon(true);
|
||||||
|
return t;
|
||||||
|
});
|
||||||
|
cleanupExecutor.scheduleAtFixedRate(() -> {
|
||||||
|
try {
|
||||||
|
cleanupIdleFiles();
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOGGER.at(Level.SEVERE).withCause(e).log("Region cleanup task failed");
|
||||||
|
}
|
||||||
|
}, CLEANUP_INTERVAL_MS, CLEANUP_INTERVAL_MS, TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void cleanupIdleFiles() {
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
long threshold = now - IDLE_TIMEOUT_MS;
|
||||||
|
int totalFiles = trackedFiles.size();
|
||||||
|
int idleCount = 0;
|
||||||
|
|
||||||
|
LOGGER.at(Level.INFO).log("Running region cleanup check: %d tracked files", totalFiles);
|
||||||
|
|
||||||
|
for (Map.Entry<Long, CachedFile> entry : trackedFiles.entrySet()) {
|
||||||
|
long regionKey = entry.getKey();
|
||||||
|
CachedFile cached = entry.getValue();
|
||||||
|
|
||||||
|
long idleTimeMs = now - cached.lastAccessTime;
|
||||||
|
int activeOps = cached.activeOps.get();
|
||||||
|
|
||||||
|
if (cached.isIdle(threshold)) {
|
||||||
|
idleCount++;
|
||||||
|
// Mark for close first - prevents new acquisitions
|
||||||
|
cached.markedForClose = true;
|
||||||
|
|
||||||
|
// Double-check no operations started between isIdle check and marking
|
||||||
|
if (cached.activeOps.get() > 0) {
|
||||||
|
cached.markedForClose = false;
|
||||||
|
LOGGER.at(Level.INFO).log("Region cleanup skipped - ops started during close");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Safe to close now
|
||||||
|
cache.remove(regionKey);
|
||||||
|
trackedFiles.remove(regionKey);
|
||||||
|
|
||||||
|
int regionX = ChunkUtil.xOfChunkIndex(regionKey);
|
||||||
|
int regionZ = ChunkUtil.zOfChunkIndex(regionKey);
|
||||||
|
try {
|
||||||
|
cached.file.close();
|
||||||
|
LOGGER.at(Level.INFO).log("%d,%d region unloaded", regionX, regionZ);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOGGER.at(Level.WARNING).withCause(e).log("Failed to close idle region file %d.%d", regionX, regionZ);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
LOGGER.at(Level.INFO).log("Region %d not idle: lastAccess=%dms ago, activeOps=%d, threshold=%dms",
|
||||||
|
regionKey, idleTimeMs, activeOps, IDLE_TIMEOUT_MS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (totalFiles > 0) {
|
||||||
|
LOGGER.at(Level.INFO).log("Region cleanup complete: %d/%d files were idle", idleCount, totalFiles);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void trackFile(long regionKey, IndexedStorageFile file) {
|
||||||
|
trackedFiles.computeIfAbsent(regionKey, k -> new CachedFile(file));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void recordAccess(long regionKey) {
|
||||||
|
CachedFile cached = trackedFiles.get(regionKey);
|
||||||
|
if (cached != null) {
|
||||||
|
cached.recordAccess();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Acquire a file for use. Must call releaseFile() when done.
|
||||||
|
* Returns null if file doesn't exist or is being closed.
|
||||||
|
*/
|
||||||
|
@Nullable
|
||||||
|
public IndexedStorageFile acquireFile(int regionX, int regionZ, boolean flushOnWrite) {
|
||||||
|
long regionKey = ChunkUtil.indexChunk(regionX, regionZ);
|
||||||
|
IndexedStorageFile file = getOrTryOpen(regionX, regionZ, flushOnWrite);
|
||||||
|
if (file == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
CachedFile cached = trackedFiles.get(regionKey);
|
||||||
|
if (cached != null && cached.acquire()) {
|
||||||
|
return file;
|
||||||
|
}
|
||||||
|
|
||||||
|
// File is being closed, retry to get a fresh one
|
||||||
|
return getOrTryOpen(regionX, regionZ, flushOnWrite);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Release a file after use.
|
||||||
|
*/
|
||||||
|
public void releaseFile(int regionX, int regionZ) {
|
||||||
|
long regionKey = ChunkUtil.indexChunk(regionX, regionZ);
|
||||||
|
CachedFile cached = trackedFiles.get(regionKey);
|
||||||
|
if (cached != null) {
|
||||||
|
cached.release();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static ResourceType<ChunkStore, IndexedStorageChunkStorageProvider.IndexedStorageCache> getResourceType() {
|
public static ResourceType<ChunkStore, IndexedStorageChunkStorageProvider.IndexedStorageCache> getResourceType() {
|
||||||
return Universe.get().getIndexedStorageCacheResourceType();
|
return Universe.get().getIndexedStorageCacheResourceType();
|
||||||
@@ -135,6 +310,17 @@ public class IndexedStorageChunkStorageProvider implements IChunkStorageProvider
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
|
// Shutdown cleanup task first
|
||||||
|
if (cleanupExecutor != null) {
|
||||||
|
cleanupExecutor.shutdown();
|
||||||
|
try {
|
||||||
|
cleanupExecutor.awaitTermination(5, TimeUnit.SECONDS);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
|
cleanupExecutor = null;
|
||||||
|
}
|
||||||
|
|
||||||
IOException exception = null;
|
IOException exception = null;
|
||||||
Iterator<IndexedStorageFile> iterator = this.cache.values().iterator();
|
Iterator<IndexedStorageFile> iterator = this.cache.values().iterator();
|
||||||
|
|
||||||
@@ -150,6 +336,8 @@ public class IndexedStorageChunkStorageProvider implements IChunkStorageProvider
|
|||||||
exception.addSuppressed(var4);
|
exception.addSuppressed(var4);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
trackedFiles.clear();
|
||||||
|
|
||||||
if (exception != null) {
|
if (exception != null) {
|
||||||
throw exception;
|
throw exception;
|
||||||
@@ -158,7 +346,9 @@ public class IndexedStorageChunkStorageProvider implements IChunkStorageProvider
|
|||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
public IndexedStorageFile getOrTryOpen(int regionX, int regionZ, boolean flushOnWrite) {
|
public IndexedStorageFile getOrTryOpen(int regionX, int regionZ, boolean flushOnWrite) {
|
||||||
return this.cache.computeIfAbsent(ChunkUtil.indexChunk(regionX, regionZ), k -> {
|
long regionKey = ChunkUtil.indexChunk(regionX, regionZ);
|
||||||
|
IndexedStorageFile file = this.cache.computeIfAbsent(regionKey, k -> {
|
||||||
|
startCleanupTask();
|
||||||
Path regionFile = this.path.resolve(IndexedStorageChunkStorageProvider.toFileName(regionX, regionZ));
|
Path regionFile = this.path.resolve(IndexedStorageChunkStorageProvider.toFileName(regionX, regionZ));
|
||||||
if (!Files.exists(regionFile)) {
|
if (!Files.exists(regionFile)) {
|
||||||
return null;
|
return null;
|
||||||
@@ -171,18 +361,23 @@ public class IndexedStorageChunkStorageProvider implements IChunkStorageProvider
|
|||||||
return null;
|
return null;
|
||||||
} catch (Exception var9) {
|
} catch (Exception var9) {
|
||||||
// Corruption detected - rename file and return null to trigger regeneration
|
// Corruption detected - rename file and return null to trigger regeneration
|
||||||
System.err.println("[IndexedStorageCache] Corrupted region file detected: " + regionFile + " - " + var9.getMessage());
|
LOGGER.at(Level.SEVERE).withCause(var9).log("Corrupted region file detected: %s", regionFile);
|
||||||
try {
|
try {
|
||||||
Path corruptedPath = regionFile.resolveSibling(regionFile.getFileName() + ".corrupted");
|
Path corruptedPath = regionFile.resolveSibling(regionFile.getFileName() + ".corrupted");
|
||||||
Files.move(regionFile, corruptedPath, StandardCopyOption.REPLACE_EXISTING);
|
Files.move(regionFile, corruptedPath, StandardCopyOption.REPLACE_EXISTING);
|
||||||
System.err.println("[IndexedStorageCache] Renamed to: " + corruptedPath);
|
LOGGER.at(Level.WARNING).log("Renamed corrupted file to: %s", corruptedPath);
|
||||||
} catch (IOException moveErr) {
|
} catch (IOException moveErr) {
|
||||||
System.err.println("[IndexedStorageCache] Failed to rename corrupted file: " + moveErr.getMessage());
|
LOGGER.at(Level.SEVERE).withCause(moveErr).log("Failed to rename corrupted file: %s", regionFile);
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
if (file != null) {
|
||||||
|
trackFile(regionKey, file);
|
||||||
|
recordAccess(regionKey);
|
||||||
|
}
|
||||||
|
return file;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -192,6 +387,10 @@ public class IndexedStorageChunkStorageProvider implements IChunkStorageProvider
|
|||||||
public void markRegionCorrupted(int regionX, int regionZ) {
|
public void markRegionCorrupted(int regionX, int regionZ) {
|
||||||
long key = ChunkUtil.indexChunk(regionX, regionZ);
|
long key = ChunkUtil.indexChunk(regionX, regionZ);
|
||||||
IndexedStorageFile file = this.cache.remove(key);
|
IndexedStorageFile file = this.cache.remove(key);
|
||||||
|
CachedFile cached = trackedFiles.remove(key);
|
||||||
|
if (cached != null) {
|
||||||
|
cached.markedForClose = true;
|
||||||
|
}
|
||||||
if (file != null) {
|
if (file != null) {
|
||||||
try {
|
try {
|
||||||
file.close();
|
file.close();
|
||||||
@@ -204,16 +403,18 @@ public class IndexedStorageChunkStorageProvider implements IChunkStorageProvider
|
|||||||
try {
|
try {
|
||||||
Path corruptedPath = regionFile.resolveSibling(regionFile.getFileName() + ".corrupted");
|
Path corruptedPath = regionFile.resolveSibling(regionFile.getFileName() + ".corrupted");
|
||||||
Files.move(regionFile, corruptedPath, StandardCopyOption.REPLACE_EXISTING);
|
Files.move(regionFile, corruptedPath, StandardCopyOption.REPLACE_EXISTING);
|
||||||
System.err.println("[IndexedStorageCache] Marked region as corrupted: " + corruptedPath);
|
LOGGER.at(Level.WARNING).log("Marked region %d.%d as corrupted: %s", regionX, regionZ, corruptedPath);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
System.err.println("[IndexedStorageCache] Failed to mark region as corrupted: " + e.getMessage());
|
LOGGER.at(Level.SEVERE).withCause(e).log("Failed to mark region %d.%d as corrupted", regionX, regionZ);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
public IndexedStorageFile getOrCreate(int regionX, int regionZ, boolean flushOnWrite) {
|
public IndexedStorageFile getOrCreate(int regionX, int regionZ, boolean flushOnWrite) {
|
||||||
return this.cache.computeIfAbsent(ChunkUtil.indexChunk(regionX, regionZ), k -> {
|
long regionKey = ChunkUtil.indexChunk(regionX, regionZ);
|
||||||
|
IndexedStorageFile file = this.cache.computeIfAbsent(regionKey, k -> {
|
||||||
|
startCleanupTask();
|
||||||
try {
|
try {
|
||||||
if (!Files.exists(this.path)) {
|
if (!Files.exists(this.path)) {
|
||||||
try {
|
try {
|
||||||
@@ -230,6 +431,9 @@ public class IndexedStorageChunkStorageProvider implements IChunkStorageProvider
|
|||||||
throw SneakyThrow.sneakyThrow(var9);
|
throw SneakyThrow.sneakyThrow(var9);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
trackFile(regionKey, file);
|
||||||
|
recordAccess(regionKey);
|
||||||
|
return file;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
|
|||||||
Reference in New Issue
Block a user