Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public String toString() {

public final static class FateLockEntry {

private static final String DELIMITER = "_";
private static final String DELIMITER = "~";

final LockType lockType;
final FateId fateId;
Expand All @@ -87,7 +87,8 @@ private FateLockEntry(LockType lockType, FateId fateId, LockRange range) {
}

private FateLockEntry(String entry) {
var fields = entry.split(DELIMITER, 4);
var fields = entry.split(DELIMITER);
Preconditions.checkArgument(fields.length == 4, entry);
this.lockType = LockType.valueOf(fields[0]);
this.fateId = FateId.from(fields[1]);
this.range = LockRange.of(decodeRow(fields[2]), decodeRow(fields[3]));
Expand All @@ -109,23 +110,29 @@ private String encodeRow(Text row) {
if (row == null) {
return "N";
} else {
return "P" + Base64.getEncoder().encodeToString(TextUtil.getBytes(row));
return "P" + Base64.getUrlEncoder().encodeToString(TextUtil.getBytes(row));
}
}

private Text decodeRow(String enc) {
if (enc.charAt(0) == 'P') {
return new Text(Base64.getDecoder().decode(enc.substring(1)));
return new Text(Base64.getUrlDecoder().decode(enc.substring(1)));
} else if (enc.charAt(0) == 'N') {
return null;
} else {
throw new IllegalArgumentException("Unexpected prefix " + enc);
}
}

private String checkForDelimiter(String s) {
Preconditions.checkArgument(!s.contains(DELIMITER), s);
return s;
}

public String serialize() {
return lockType.name() + DELIMITER + fateId.canonical() + DELIMITER
+ encodeRow(range.getStartRow()) + DELIMITER + encodeRow(range.getEndRow());
return checkForDelimiter(lockType.name()) + DELIMITER + checkForDelimiter(fateId.canonical())
+ DELIMITER + checkForDelimiter(encodeRow(range.getStartRow())) + DELIMITER
+ checkForDelimiter(encodeRow(range.getEndRow()));
}

public static FateLockEntry from(LockType lockType, FateId fateId, LockRange range) {
Expand Down Expand Up @@ -183,19 +190,21 @@ public final static class NodeName {
public long addEntry(FateLockEntry entry) {

String dataString = entry.serialize();
Preconditions.checkState(!dataString.contains("#"));
Preconditions.checkState(!dataString.contains("#") && !dataString.contains("/"), dataString);

String newPath;
try {
while (true) {
try {
newPath =
zoo.putPersistentSequential(path + "/" + PREFIX + dataString + "#", new byte[0]);
var prefix = path + "/" + PREFIX + dataString + "#";
log.trace("Attempting to create {} in zookeeper", prefix);
newPath = zoo.putPersistentSequential(prefix, new byte[0]);
String[] parts = newPath.split("/");
String last = parts[parts.length - 1];
return new NodeName(last).sequence;
} catch (NoNodeException nne) {
// the parent does not exist so try to create it
log.trace("Attempting to create parent {}", path.toString(), nne);
zoo.putPersistentData(path.toString(), new byte[] {}, NodeExistsPolicy.SKIP);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class FateLockTest {
public void testParsing() {
var fateId = FateId.from(FateInstanceType.USER, UUID.randomUUID());
// ZooKeeper docs state that sequence numbers are formatted using %010d
String lockData = "WRITE_" + fateId.canonical() + "_N_N";
String lockData = "WRITE~" + fateId.canonical() + "~N~N";
var lockNode =
new FateLock.NodeName(FateLock.PREFIX + lockData + "#" + String.format("%010d", 40));
assertEquals(40, lockNode.sequence);
Expand Down