diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
index c334a2d842ed..e4462e8829aa 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
@@ -398,17 +398,10 @@ public Statistics getStatistics() {
   }
 
   /**
-   * Increase the number of bytes write into the container.
-   * Also decrement committed bytes against the bytes written.
-   * @param bytes the number of bytes write into the container.
+   * Calculate how much committedBytes should be decremented for a write.
+   * This is used both for decrementing on write and for restoring on failure.
    */
-  private void incrWriteBytes(long bytes) {
-    /*
-      Increase the cached Used Space in VolumeInfo as it
-      maybe not updated, DU or DedicatedDiskSpaceUsage runs
-      periodically to update the Used Space in VolumeInfo.
-    */
-    this.getVolume().incrementUsedSpace(bytes);
+  private long getCommittedBytesDecrement(long bytes) {
     // Calculate bytes used before this write operation.
     // Note that getBytesUsed() already includes the 'bytes' from the current write.
     long bytesUsedBeforeWrite = getBytesUsed() - bytes;
@@ -417,8 +410,31 @@ private void incrWriteBytes(long bytes) {
     if (committedSpace && availableSpaceBeforeWrite > 0) {
       // Decrement committed space only by the portion of the write that fits within the originally committed space,
       // up to maxSize
-      long decrement = Math.min(bytes, availableSpaceBeforeWrite);
-      this.getVolume().incCommittedBytes(-decrement);
+      return Math.min(bytes, availableSpaceBeforeWrite);
+    }
+    return 0;
+  }
+
+  /**
+   * Increase the number of bytes written into the container.
+   * Also decrement committed bytes against the bytes written.
+   * @param bytes the number of bytes written into the container.
+   */
+  private void incrWriteBytes(long bytes) {
+    long committedBytesDecrement = getCommittedBytesDecrement(bytes);
+    if (committedBytesDecrement > 0) {
+      this.getVolume().incCommittedBytes(-committedBytesDecrement);
+    }
+  }
+
+  /**
+   * Restore committedBytes when a write operation fails after writeChunk succeeded.
+   * This undoes the committedBytes decrement done in incrWriteBytes().
+   */
+  public void restoreCommittedBytesOnWriteFailure(long bytes) {
+    long committedBytesDecrement = getCommittedBytesDecrement(bytes);
+    if (committedBytesDecrement > 0) {
+      this.getVolume().incCommittedBytes(committedBytesDecrement);
     }
   }
 
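The clamping above is easiest to see with concrete numbers. A minimal, runnable sketch of the arithmetic in getCommittedBytesDecrement() follows; it assumes availableSpaceBeforeWrite is computed as getMaxSize() - bytesUsedBeforeWrite, which happens in unchanged code between the two hunks and is not shown in this patch:

public class CommittedBytesDecrementExample {
  public static void main(String[] args) {
    long maxSize = 5L << 30;                   // container max size: 5 GiB
    long bytes = 64L << 20;                    // current write: 64 MiB
    long bytesUsed = (5L << 30) + (32L << 20); // getBytesUsed(), already includes this write

    long bytesUsedBeforeWrite = bytesUsed - bytes;                   // maxSize - 32 MiB
    long availableSpaceBeforeWrite = maxSize - bytesUsedBeforeWrite; // 32 MiB
    // Only the portion of the write that still fits inside the originally
    // committed space is decremented: 32 MiB here, not the full 64 MiB write.
    long decrement = Math.min(bytes, availableSpaceBeforeWrite);
    System.out.println("committedBytes decrement = " + decrement);   // 33554432
  }
}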
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
index f331db7defc3..80bcc9d831a5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
@@ -93,6 +93,7 @@ public class HddsVolume extends StorageVolume {
   private ContainerController controller;
 
   private final AtomicLong committedBytes = new AtomicLong(); // till Open containers become full
+  private final AtomicLong spaceReservedForWrites = new AtomicLong(); // for in-flight writes
 
   private Function<Long, Long> gatherContainerUsages = (K) -> 0L;
   private final ConcurrentSkipListSet<Long> containerIds = new ConcurrentSkipListSet<>();
@@ -410,6 +411,33 @@ public long getCommittedBytes() {
     return committedBytes.get();
   }
 
+  /**
+   * Reserve space for an in-flight write operation.
+   *
+   * @param bytes bytes to reserve
+   */
+  public void reserveSpaceForWrite(long bytes) {
+    spaceReservedForWrites.addAndGet(bytes);
+  }
+
+  /**
+   * Release space reserved for a write when the write completes or fails.
+   *
+   * @param bytes bytes to release
+   */
+  public void releaseReservedSpaceForWrite(long bytes) {
+    spaceReservedForWrites.addAndGet(-bytes);
+  }
+
+  /**
+   * Get the space reserved for in-flight writes.
+   *
+   * @return bytes reserved for in-flight writes
+   */
+  public long getSpaceReservedForWrites() {
+    return spaceReservedForWrites.get();
+  }
+
   public long getFreeSpaceToSpare(long volumeCapacity) {
     return getDatanodeConfig().getMinFreeSpace(volumeCapacity);
   }
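This patch adds the spaceReservedForWrites counter and its getter, but shows no consumer for getSpaceReservedForWrites(); the volume-side space check that reads it is presumably wired up elsewhere. A hypothetical sketch of how such a check could fold in-flight reservations into the free-space math (all names illustrative, not the actual HddsVolume API):

final class VolumeSpaceCheckSketch {
  // Treat committed (open-container) space and in-flight write reservations
  // as already spoken for before admitting a new write of requestedBytes.
  static boolean hasEnoughSpace(long capacity, long usedSpace, long committedBytes,
      long spaceReservedForWrites, long requestedBytes, long minFreeSpace) {
    long available = capacity - usedSpace - committedBytes - spaceReservedForWrites;
    return available - requestedBytes >= minFreeSpace;
  }
}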
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 13bfa834dbc6..b4dc879535ca 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -1037,6 +1037,11 @@ ContainerCommandResponseProto handleWriteChunk(
     }
 
     ContainerProtos.BlockData blockDataProto = null;
+    HddsVolume volume = kvContainer.getContainerData().getVolume();
+    long bytesToWrite = 0;
+    boolean spaceReserved = false;
+    boolean writeChunkSucceeded = false;
+
     try {
       checkContainerOpen(kvContainer);
@@ -1057,9 +1062,17 @@ ContainerCommandResponseProto handleWriteChunk(
             ChunkBuffer.wrap(writeChunk.getData().asReadOnlyByteBufferList());
         // TODO: Can improve checksum validation here. Make this one-shot after protocol change.
         validateChunkChecksumData(data, chunkInfo);
+        bytesToWrite = chunkInfo.getLen();
+
+        // Reserve space before writing
+        if (volume != null && bytesToWrite > 0) {
+          volume.reserveSpaceForWrite(bytesToWrite);
+          spaceReserved = true;
+        }
       }
       chunkManager
           .writeChunk(kvContainer, blockID, chunkInfo, data, dispatcherContext);
+      writeChunkSucceeded = true;
 
       final boolean isCommit = dispatcherContext.getStage().isCommit();
       if (isCommit && writeChunk.hasBlock()) {
@@ -1086,14 +1099,19 @@ ContainerCommandResponseProto handleWriteChunk(
         metrics.incContainerOpsLatencies(Type.PutBlock, Time.monotonicNowNanos() - startTime);
       }
 
+      if (spaceReserved) {
+        commitSpaceReservedForWrite(volume, bytesToWrite);
+      }
       // We should increment stats after writeChunk
       if (isWrite) {
        metrics.incContainerBytesStats(Type.WriteChunk, writeChunk
            .getChunkData().getLen());
      }
    } catch (StorageContainerException ex) {
+      releaseSpaceReservedForWrite(volume, spaceReserved, writeChunkSucceeded, bytesToWrite, kvContainer);
      return ContainerUtils.logAndReturnError(LOG, ex, request);
    } catch (IOException ex) {
+      releaseSpaceReservedForWrite(volume, spaceReserved, writeChunkSucceeded, bytesToWrite, kvContainer);
      return ContainerUtils.logAndReturnError(LOG,
          new StorageContainerException("Write Chunk failed", ex, IO_EXCEPTION),
          request);
@@ -1102,6 +1120,29 @@
     return getWriteChunkResponseSuccess(request, blockDataProto);
   }
 
+  /**
+   * Commit space reserved for a write to usedSpace when the write operation succeeds.
+   */
+  private void commitSpaceReservedForWrite(HddsVolume volume, long bytes) {
+    volume.incrementUsedSpace(bytes);
+    volume.releaseReservedSpaceForWrite(bytes);
+  }
+
+  /**
+   * Release space reserved for a write when the write operation fails.
+   * Also restores committedBytes if it was decremented during the write.
+   */
+  private void releaseSpaceReservedForWrite(HddsVolume volume, boolean spaceReserved,
+      boolean writeChunkSucceeded, long bytes, KeyValueContainer kvContainer) {
+    if (spaceReserved) {
+      volume.releaseReservedSpaceForWrite(bytes);
+      // Only restore committedBytes if the chunk write itself succeeded
+      if (writeChunkSucceeded) {
+        kvContainer.getContainerData().restoreCommittedBytesOnWriteFailure(bytes);
+      }
+    }
+  }
+
   /**
    * Handle Write Chunk operation for closed container. Calls ChunkManager to process the request.
   */
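The handler changes above maintain a simple invariant: a reservation is released exactly once on every path, and committedBytes is restored only if the chunk write itself had already decremented it. A self-contained toy model of the three counters (plain Java, illustrative only, not the real Ozone classes):

import java.util.concurrent.atomic.AtomicLong;

public class WriteAccountingSketch {
  final AtomicLong usedSpace = new AtomicLong();
  final AtomicLong committedBytes = new AtomicLong(4L << 20); // 4 MiB committed
  final AtomicLong reservedForWrites = new AtomicLong();

  void write(long bytes, boolean failInWriteChunk, boolean failAfterWriteChunk) {
    reservedForWrites.addAndGet(bytes);          // reserveSpaceForWrite()
    boolean writeChunkSucceeded = false;
    try {
      if (failInWriteChunk) {
        throw new RuntimeException("simulated disk failure");
      }
      committedBytes.addAndGet(-bytes);          // incrWriteBytes() via the write path
      writeChunkSucceeded = true;
      if (failAfterWriteChunk) {
        throw new RuntimeException("simulated putBlock failure");
      }
      usedSpace.addAndGet(bytes);                // commitSpaceReservedForWrite()
      reservedForWrites.addAndGet(-bytes);
    } catch (RuntimeException e) {
      reservedForWrites.addAndGet(-bytes);       // releaseSpaceReservedForWrite()
      if (writeChunkSucceeded) {
        committedBytes.addAndGet(bytes);         // restoreCommittedBytesOnWriteFailure()
      }
    }
  }

  public static void main(String[] args) {
    WriteAccountingSketch v = new WriteAccountingSketch();
    v.write(1L << 20, false, false); // success: usedSpace +1 MiB, committedBytes -1 MiB
    v.write(1L << 20, true, false);  // writeChunk fails: every counter unchanged
    v.write(1L << 20, false, true);  // later step fails: committedBytes restored
    System.out.println(v.usedSpace + " " + v.committedBytes + " " + v.reservedForWrites);
  }
}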
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
index 6afee1c5d77f..960008c3609d 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
@@ -53,6 +53,7 @@
 import com.google.common.collect.Sets;
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.Clock;
@@ -68,6 +69,7 @@
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
@@ -75,15 +77,19 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.security.token.TokenVerifier;
+import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
 import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeWriter;
 import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
 import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils;
 import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
@@ -94,18 +100,21 @@
 import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration;
 import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerScanner;
 import org.apache.hadoop.util.Time;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ozone.test.GenericTestUtils.LogCapturer;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -898,6 +907,159 @@ public void testICRsOnContainerClose(ContainerLayoutVersion containerLayoutVersi
     }
   }
 
+  /**
+   * Test that space tracking (usedSpace and committedBytes) is correctly
+   * managed during successful write operations.
+   */
+  @Test
+  public void testWriteChunkSpaceTrackingSuccess() throws Exception {
+    final long containerID = 1L;
+    final String testDir = tempDir.toString();
+    final String clusterId = UUID.randomUUID().toString();
+    final String datanodeId = UUID.randomUUID().toString();
+    OzoneConfiguration testConf = new OzoneConfiguration();
+    final ContainerSet containerSet = newContainerSet();
+    final MutableVolumeSet volumeSet = mock(MutableVolumeSet.class);
+
+    HddsVolume hddsVolume = new HddsVolume.Builder(testDir).conf(testConf)
+        .clusterID(clusterId).datanodeUuid(datanodeId)
+        .volumeSet(volumeSet)
+        .build();
+    hddsVolume.format(clusterId);
+    hddsVolume.createWorkingDir(clusterId, null);
+    hddsVolume.createTmpDirs(clusterId);
+
+    when(volumeSet.getVolumesList())
+        .thenReturn(Collections.singletonList(hddsVolume));
+
+    final ContainerMetrics metrics = ContainerMetrics.create(testConf);
+    final KeyValueHandler kvHandler = new KeyValueHandler(testConf,
+        datanodeId, containerSet, volumeSet, metrics,
+        c -> { }, new ContainerChecksumTreeManager(testConf));
+    kvHandler.setClusterID(clusterId);
+
+    final ContainerCommandRequestProto createContainer =
+        createContainerRequest(datanodeId, containerID);
+    kvHandler.handleCreateContainer(createContainer, null);
+
+    KeyValueContainer container = (KeyValueContainer) containerSet.getContainer(containerID);
+    assertNotNull(container);
+
+    long initialUsedSpace = hddsVolume.getCurrentUsage().getUsedSpace();
+    long initialCommittedBytes = hddsVolume.getCommittedBytes();
+    long initialReservedSpace = hddsVolume.getSpaceReservedForWrites();
+
+    long chunkSize = 1024 * 1024; // 1MB
+    ContainerCommandRequestProto writeRequest =
+        createWriteChunkRequest(datanodeId, chunkSize);
+    ContainerProtos.ContainerCommandResponseProto response =
+        kvHandler.handleWriteChunk(writeRequest, container, null);
+    assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
+
+    long finalUsedSpace = hddsVolume.getCurrentUsage().getUsedSpace();
+    long finalCommittedBytes = hddsVolume.getCommittedBytes();
+    long finalReservedSpace = hddsVolume.getSpaceReservedForWrites();
+
+    assertEquals(initialUsedSpace + chunkSize, finalUsedSpace,
+        "usedSpace should be incremented by chunk size after successful write");
+    assertTrue(finalCommittedBytes < initialCommittedBytes,
+        "committedBytes should be decremented after successful write");
+    assertEquals(initialReservedSpace, finalReservedSpace,
+        "spaceReservedForWrites should be back to initial value after successful write");
+  }
+
+  /**
+   * Test that space tracking is correctly rolled back when a write operation fails.
+   * This test uses reflection to mock the ChunkManager and inject a failure during
+   * writeChunk(), which happens AFTER space is reserved. This verifies that:
+   * 1. usedSpace remains unchanged (never incremented on failure)
+   * 2. spaceReservedForWrites is released (incremented then decremented back)
+   * 3. committedBytes is restored (decremented by writeChunk, then incremented back on rollback)
+   */
+  @Test
+  public void testWriteChunkSpaceTrackingFailure() throws Exception {
+    final long containerID = 1L;
+    final String testDir = tempDir.toString();
+    final String clusterId = UUID.randomUUID().toString();
+    final String datanodeId = UUID.randomUUID().toString();
+    OzoneConfiguration testConf = new OzoneConfiguration();
+    final ContainerSet containerSet = newContainerSet();
+    final MutableVolumeSet volumeSet = mock(MutableVolumeSet.class);
+
+    HddsVolume hddsVolume = new HddsVolume.Builder(testDir).conf(testConf)
+        .clusterID(clusterId).datanodeUuid(datanodeId)
+        .volumeSet(volumeSet)
+        .build();
+    hddsVolume.format(clusterId);
+    hddsVolume.createWorkingDir(clusterId, null);
+    hddsVolume.createTmpDirs(clusterId);
+
+    when(volumeSet.getVolumesList())
+        .thenReturn(Collections.singletonList(hddsVolume));
+
+    final ContainerMetrics metrics = ContainerMetrics.create(testConf);
+    final KeyValueHandler kvHandler = new KeyValueHandler(testConf,
+        datanodeId, containerSet, volumeSet, metrics,
+        c -> { }, new ContainerChecksumTreeManager(testConf));
+    kvHandler.setClusterID(clusterId);
+
+    final ContainerCommandRequestProto createContainer =
+        createContainerRequest(datanodeId, containerID);
+    kvHandler.handleCreateContainer(createContainer, null);
+
+    KeyValueContainer container = (KeyValueContainer) containerSet.getContainer(containerID);
+    assertNotNull(container);
+
+    long initialUsedSpace = hddsVolume.getCurrentUsage().getUsedSpace();
+    long initialCommittedBytes = hddsVolume.getCommittedBytes();
+    long initialReservedSpace = hddsVolume.getSpaceReservedForWrites();
+
+    // Use reflection to replace the chunkManager in the handler with a spy,
+    // so we can inject a failure during writeChunk()
+    Field chunkManagerField = KeyValueHandler.class.getDeclaredField("chunkManager");
+    chunkManagerField.setAccessible(true);
+    ChunkManager originalChunkManager = (ChunkManager) chunkManagerField.get(kvHandler);
+    ChunkManager spyChunkManager = spy(originalChunkManager);
+
+    // Configure the spy to throw an IOException on writeChunk call
+    doAnswer(invocation -> {
+      throw new IOException("Simulated disk write failure");
+    }).when(spyChunkManager).writeChunk(
+        any(Container.class),
+        any(BlockID.class),
+        any(ChunkInfo.class),
+        any(ChunkBuffer.class),
+        any(DispatcherContext.class));
+
+    chunkManagerField.set(kvHandler, spyChunkManager);
+
+    try {
+      // Attempt to write a chunk - should fail during chunkManager.writeChunk()
+      // but AFTER space has been reserved
+      long chunkSize = 1024 * 1024; // 1MB
+      ContainerCommandRequestProto writeRequest =
+          createWriteChunkRequest(datanodeId, chunkSize);
+      ContainerProtos.ContainerCommandResponseProto response =
+          kvHandler.handleWriteChunk(writeRequest, container, null);
+
+      assertNotEquals(ContainerProtos.Result.SUCCESS, response.getResult(),
+          "Write should fail due to injected IOException");
+
+      long finalUsedSpace = hddsVolume.getCurrentUsage().getUsedSpace();
+      long finalCommittedBytes = hddsVolume.getCommittedBytes();
+      long finalReservedSpace = hddsVolume.getSpaceReservedForWrites();
+
+      assertEquals(initialUsedSpace, finalUsedSpace,
+          "usedSpace should remain unchanged after failed write");
+      assertEquals(initialCommittedBytes, finalCommittedBytes,
          "committedBytes should remain unchanged after failed write (decremented then restored)");
+      assertEquals(initialReservedSpace, finalReservedSpace,
+          "spaceReservedForWrites should be back to initial value after failed write");
+    } finally {
+      chunkManagerField.set(kvHandler, originalChunkManager);
+    }
+  }
+
   private static ContainerCommandRequestProto createContainerRequest(
       String datanodeId, long containerID) {
     return ContainerCommandRequestProto.newBuilder()
@@ -938,4 +1100,33 @@ private KeyValueHandler createKeyValueHandler(Path path) throws IOException {
     return kvHandler;
   }
+
+  /**
+   * Helper method to create a WriteChunk request for testing.
+   */
+  private ContainerCommandRequestProto createWriteChunkRequest(
+      String datanodeId, long chunkSize) {
+    final long containerID = 1L;
+    final long localID = 1L;
+    ByteString data = ByteString.copyFrom(new byte[(int) chunkSize]);
+
+    ContainerProtos.ChunkInfo chunk = ContainerProtos.ChunkInfo.newBuilder()
+        .setChunkName(localID + "_chunk_1")
+        .setOffset(0)
+        .setLen(data.size())
+        .setChecksumData(Checksum.getNoChecksumDataProto())
+        .build();
+
+    WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto.newBuilder()
+        .setBlockID(new BlockID(containerID, localID).getDatanodeBlockIDProtobuf())
+        .setChunkData(chunk)
+        .setData(data);
+
+    return ContainerCommandRequestProto.newBuilder()
+        .setContainerID(containerID)
+        .setCmdType(ContainerProtos.Type.WriteChunk)
+        .setDatanodeUuid(datanodeId)
+        .setWriteChunk(writeChunkRequest)
+        .build();
+  }
 }
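With the patch applied, the new tests can be run on their own through Maven Surefire's test filter (standard Maven flags; the module path is taken from the diff headers above):

mvn -pl hadoop-hdds/container-service test -Dtest=TestKeyValueHandler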