Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -398,17 +398,10 @@ public Statistics getStatistics() {
}

/**
* Increase the number of bytes write into the container.
* Also decrement committed bytes against the bytes written.
* @param bytes the number of bytes write into the container.
* Calculate how much committedBytes should be decremented for write.
* This is used both for decrementing on write and restoring on failure.
*/
private void incrWriteBytes(long bytes) {
/*
Increase the cached Used Space in VolumeInfo as it
maybe not updated, DU or DedicatedDiskSpaceUsage runs
periodically to update the Used Space in VolumeInfo.
*/
this.getVolume().incrementUsedSpace(bytes);
private long getCommittedBytesDecrement(long bytes) {
// Calculate bytes used before this write operation.
// Note that getBytesUsed() already includes the 'bytes' from the current write.
long bytesUsedBeforeWrite = getBytesUsed() - bytes;
Expand All @@ -417,8 +410,31 @@ private void incrWriteBytes(long bytes) {
if (committedSpace && availableSpaceBeforeWrite > 0) {
// Decrement committed space only by the portion of the write that fits within the originally committed space,
// up to maxSize
long decrement = Math.min(bytes, availableSpaceBeforeWrite);
this.getVolume().incCommittedBytes(-decrement);
return Math.min(bytes, availableSpaceBeforeWrite);
}
return 0;
}

/**
* Increase the number of bytes write into the container.
* Also decrement committed bytes against the bytes written.
* @param bytes the number of bytes write into the container.
*/
private void incrWriteBytes(long bytes) {
long committedBytesDecrement = getCommittedBytesDecrement(bytes);
if (committedBytesDecrement > 0) {
this.getVolume().incCommittedBytes(-committedBytesDecrement);
}
}

/**
* Restore committedBytes when a write operation fails after writeChunk succeeded.
* This undoes the committedBytes decrement done in incrWriteBytes().
*/
public void restoreCommittedBytesOnWriteFailure(long bytes) {
long committedBytesDecrement = getCommittedBytesDecrement(bytes);
if (committedBytesDecrement > 0) {
this.getVolume().incCommittedBytes(committedBytesDecrement);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public class HddsVolume extends StorageVolume {
private ContainerController controller;

private final AtomicLong committedBytes = new AtomicLong(); // till Open containers become full
private final AtomicLong spaceReservedForWrites = new AtomicLong(); // for in-flight writes
private Function<HddsVolume, Long> gatherContainerUsages = (K) -> 0L;

private final ConcurrentSkipListSet<Long> containerIds = new ConcurrentSkipListSet<>();
Expand Down Expand Up @@ -410,6 +411,33 @@ public long getCommittedBytes() {
return committedBytes.get();
}

/**
* Reserve space for an in-flight write operation.
*
* @param bytes bytes to reserve
*/
public void reserveSpaceForWrite(long bytes) {
spaceReservedForWrites.addAndGet(bytes);
}

/**
* Release space reserved for write when write completes or fails.
*
* @param bytes bytes to release
*/
public void releaseReservedSpaceForWrite(long bytes) {
spaceReservedForWrites.addAndGet(-bytes);
}

/**
* Get the space reserved for in-flight writes.
*
* @return bytes reserved for in-flight writes
*/
public long getSpaceReservedForWrites() {
return spaceReservedForWrites.get();
}

public long getFreeSpaceToSpare(long volumeCapacity) {
return getDatanodeConfig().getMinFreeSpace(volumeCapacity);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1037,6 +1037,11 @@ ContainerCommandResponseProto handleWriteChunk(
}

ContainerProtos.BlockData blockDataProto = null;
HddsVolume volume = kvContainer.getContainerData().getVolume();
long bytesToWrite = 0;
boolean spaceReserved = false;
boolean writeChunkSucceeded = false;

try {
checkContainerOpen(kvContainer);

Expand All @@ -1057,9 +1062,17 @@ ContainerCommandResponseProto handleWriteChunk(
ChunkBuffer.wrap(writeChunk.getData().asReadOnlyByteBufferList());
// TODO: Can improve checksum validation here. Make this one-shot after protocol change.
validateChunkChecksumData(data, chunkInfo);
bytesToWrite = chunkInfo.getLen();

// Reserve space before writing
if (volume != null && bytesToWrite > 0) {
volume.reserveSpaceForWrite(bytesToWrite);
spaceReserved = true;
}
}
chunkManager
.writeChunk(kvContainer, blockID, chunkInfo, data, dispatcherContext);
writeChunkSucceeded = true;

final boolean isCommit = dispatcherContext.getStage().isCommit();
if (isCommit && writeChunk.hasBlock()) {
Expand All @@ -1086,14 +1099,19 @@ ContainerCommandResponseProto handleWriteChunk(
metrics.incContainerOpsLatencies(Type.PutBlock, Time.monotonicNowNanos() - startTime);
}

if (spaceReserved) {
commitSpaceReservedForWrite(volume, bytesToWrite);
}
// We should increment stats after writeChunk
if (isWrite) {
metrics.incContainerBytesStats(Type.WriteChunk, writeChunk
.getChunkData().getLen());
}
} catch (StorageContainerException ex) {
releaseSpaceReservedForWrite(volume, spaceReserved, writeChunkSucceeded, bytesToWrite, kvContainer);
return ContainerUtils.logAndReturnError(LOG, ex, request);
} catch (IOException ex) {
releaseSpaceReservedForWrite(volume, spaceReserved, writeChunkSucceeded, bytesToWrite, kvContainer);
return ContainerUtils.logAndReturnError(LOG,
new StorageContainerException("Write Chunk failed", ex, IO_EXCEPTION),
request);
Expand All @@ -1102,6 +1120,29 @@ ContainerCommandResponseProto handleWriteChunk(
return getWriteChunkResponseSuccess(request, blockDataProto);
}

/**
* Commit space reserved for write to usedSpace when write operation succeeds.
*/
private void commitSpaceReservedForWrite(HddsVolume volume, long bytes) {
volume.incrementUsedSpace(bytes);
volume.releaseReservedSpaceForWrite(bytes);
}

/**
* Release space reserved for write when write operation fails.
* Also restores committedBytes if it was decremented during write.
*/
private void releaseSpaceReservedForWrite(HddsVolume volume, boolean spaceReserved,
boolean writeChunkSucceeded, long bytes, KeyValueContainer kvContainer) {
if (spaceReserved) {
volume.releaseReservedSpaceForWrite(bytes);
// Only restore committedBytes if write chunk succeeded
if (writeChunkSucceeded) {
kvContainer.getContainerData().restoreCommittedBytesOnWriteFailure(bytes);
}
}
}

/**
* Handle Write Chunk operation for closed container. Calls ChunkManager to process the request.
*/
Expand Down
Loading