Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
c03436e
Mute broken test
serbel324 Oct 9, 2025
fc1a7ff
Merge branch 'ydb-platform:main' into main
serbel324 Oct 10, 2025
57845e2
Merge branch 'ydb-platform:main' into main
serbel324 Oct 10, 2025
85f212d
Merge branch 'ydb-platform:main' into main
serbel324 Oct 14, 2025
c88e723
Merge branch 'ydb-platform:main' into main
serbel324 Oct 21, 2025
c8dae01
Merge branch 'ydb-platform:main' into main
serbel324 Oct 22, 2025
28bb43b
Merge branch 'ydb-platform:main' into main
serbel324 Oct 30, 2025
aaaf09a
Merge branch 'ydb-platform:main' into main
serbel324 Nov 5, 2025
d6a05be
Merge branch 'ydb-platform:main' into main
serbel324 Nov 11, 2025
41aceb1
Merge branch 'ydb-platform:main' into main
serbel324 Nov 12, 2025
379c3e1
Merge branch 'ydb-platform:main' into main
serbel324 Nov 15, 2025
4415f7e
Merge branch 'ydb-platform:main' into main
serbel324 Nov 17, 2025
6e9b82c
Merge branch 'ydb-platform:main' into main
serbel324 Nov 18, 2025
838aded
Merge branch 'ydb-platform:main' into main
serbel324 Nov 26, 2025
3635fa6
Merge branch 'ydb-platform:main' into main
serbel324 Nov 27, 2025
356a16d
Merge branch 'ydb-platform:main' into main
serbel324 Nov 27, 2025
857e2e5
Merge branch 'ydb-platform:main' into main
serbel324 Dec 3, 2025
9185bc3
Merge branch 'ydb-platform:main' into main
serbel324 Dec 3, 2025
50130fc
Merge branch 'ydb-platform:main' into main
serbel324 Dec 4, 2025
91ffdc1
Merge branch 'ydb-platform:main' into main
serbel324 Dec 8, 2025
94e2813
Merge branch 'ydb-platform:main' into main
serbel324 Dec 8, 2025
9e849d1
Merge branch 'ydb-platform:main' into main
serbel324 Dec 9, 2025
f1f9742
Merge branch 'ydb-platform:main' into main
serbel324 Dec 9, 2025
f94a180
Merge branch 'ydb-platform:main' into main
serbel324 Dec 10, 2025
c868ac0
Merge branch 'ydb-platform:main' into main
serbel324 Dec 11, 2025
e0b25c7
Merge branch 'ydb-platform:main' into main
serbel324 Dec 11, 2025
0db239e
Merge branch 'ydb-platform:main' into main
serbel324 Dec 12, 2025
02888e9
Add TParameters constructor for TEvPut
serbel324 Dec 12, 2025
218ab2e
Fix UT build
serbel324 Dec 12, 2025
f5fb607
Cancel request on actor death
serbel324 Dec 15, 2025
cf7a029
Fix BlobId reference
serbel324 Dec 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 70 additions & 36 deletions ydb/core/base/blobstorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "defs.h"

#include "blobstorage_pdisk_category.h"
#include "blobstorage_relevance.h"
#include "boot_type.h"
#include "events.h"
#include "tablet_types.h"
Expand Down Expand Up @@ -39,10 +40,6 @@ static constexpr ui64 MaxCollectGarbageFlagsPerMessage = 10000;
static constexpr TDuration VDiskCooldownTimeout = TDuration::Seconds(15);
static constexpr TDuration VDiskCooldownTimeoutOnProxy = TDuration::Seconds(12);

struct TMessageRelevanceTracker {};
using TMessageRelevanceOwner = std::shared_ptr<TMessageRelevanceTracker>;
using TMessageRelevanceWatcher = std::weak_ptr<TMessageRelevanceTracker>;

struct TStorageStatusFlags {
ui32 Raw = 0;

Expand Down Expand Up @@ -1076,6 +1073,17 @@ struct TEvBlobStorage {
std::vector<std::pair<ui64, ui32>> ExtraBlockChecks; // (TabletId, Generation) pairs
std::optional<TMessageRelevanceWatcher> ExternalRelevanceWatcher;

struct TParameters {
TLogoBlobID BlobId;
TRope Buffer;
TInstant Deadline;
NKikimrBlobStorage::EPutHandleClass HandleClass = NKikimrBlobStorage::TabletLog;
ETactic Tactic = TacticDefault;
bool IssueKeepFlag = false;
bool IgnoreBlock = false;
std::optional<TMessageRelevanceWatcher> ExternalRelevanceWatcher = std::nullopt;
};

TEvPut(TCloneEventPolicy, const TEvPut& origin)
: Id(origin.Id)
, Buffer(origin.Buffer)
Expand All @@ -1088,58 +1096,84 @@ struct TEvBlobStorage {
, ExternalRelevanceWatcher(origin.ExternalRelevanceWatcher)
{}

TEvPut(const TLogoBlobID &id, TRope &&buffer, TInstant deadline,
NKikimrBlobStorage::EPutHandleClass handleClass = NKikimrBlobStorage::TabletLog,
ETactic tactic = TacticDefault, bool issueKeepFlag = false, bool ignoreBlock = false,
std::optional<TMessageRelevanceWatcher> externalRelevanceWatcher = std::nullopt)
: Id(id)
, Buffer(std::move(buffer))
, Deadline(deadline)
, HandleClass(handleClass)
, Tactic(tactic)
, IssueKeepFlag(issueKeepFlag)
, IgnoreBlock(ignoreBlock)
, ExternalRelevanceWatcher(externalRelevanceWatcher)
TEvPut(TParameters parameters)
: Id(parameters.BlobId)
, Buffer(std::move(parameters.Buffer))
, Deadline(parameters.Deadline)
, HandleClass(parameters.HandleClass)
, Tactic(parameters.Tactic)
, IssueKeepFlag(parameters.IssueKeepFlag)
, IgnoreBlock(parameters.IgnoreBlock)
, ExternalRelevanceWatcher(std::move(parameters.ExternalRelevanceWatcher))
{
Y_ABORT_UNLESS(Id, "EvPut invalid: LogoBlobId must have non-zero tablet field, id# %s", Id.ToString().c_str());
Y_ABORT_UNLESS(Buffer.size() < (40 * 1024 * 1024),
"EvPut invalid: LogoBlobId# %s buffer.Size# %zu",
id.ToString().data(), Buffer.size());
Y_ABORT_UNLESS(Buffer.size() == id.BlobSize(),
Id.ToString().data(), Buffer.size());
Y_ABORT_UNLESS(Buffer.size() == Id.BlobSize(),
"EvPut invalid: LogoBlobId# %s buffer.Size# %zu",
id.ToString().data(), Buffer.size());
REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(&id, sizeof(id));
Id.ToString().data(), Buffer.size());
REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(&Id, sizeof(Id));
REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(Buffer.GetContiguousSpan().Data(), Buffer.size());
REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(&deadline, sizeof(deadline));
REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(&handleClass, sizeof(handleClass));
REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(&tactic, sizeof(tactic));
REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(&Deadline, sizeof(Deadline));
REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(&HandleClass, sizeof(HandleClass));
REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(&Tactic, sizeof(Tactic));
}


TEvPut(const TLogoBlobID &id, TRope &&buffer, TInstant deadline,
NKikimrBlobStorage::EPutHandleClass handleClass = NKikimrBlobStorage::TabletLog,
ETactic tactic = TacticDefault, bool issueKeepFlag = false, bool ignoreBlock = false)
: TEvPut(TParameters{
.BlobId = id,
.Buffer = std::move(buffer),
.Deadline = deadline,
.HandleClass = handleClass,
.Tactic = tactic,
.IssueKeepFlag = issueKeepFlag,
.IgnoreBlock = ignoreBlock,
})
{}

TEvPut(const TLogoBlobID &id, TRcBuf &&buffer, TInstant deadline,
NKikimrBlobStorage::EPutHandleClass handleClass = NKikimrBlobStorage::TabletLog,
ETactic tactic = TacticDefault, bool issueKeepFlag = false,
std::optional<TMessageRelevanceWatcher> externalRelevanceWatcher = std::nullopt)
: TEvPut(id, TRope(std::move(buffer)), deadline, handleClass, tactic, issueKeepFlag,
/*ignoreBlock=*/false, std::move(externalRelevanceWatcher))
ETactic tactic = TacticDefault, bool issueKeepFlag = false)
: TEvPut(TParameters{
.BlobId = id,
.Buffer = TRope(std::move(buffer)),
.Deadline = deadline,
.HandleClass = handleClass,
.Tactic = tactic,
.IssueKeepFlag = issueKeepFlag,
})
{}

TEvPut(const TLogoBlobID &id, const TString &buffer, TInstant deadline,
NKikimrBlobStorage::EPutHandleClass handleClass = NKikimrBlobStorage::TabletLog,
ETactic tactic = TacticDefault, bool issueKeepFlag = false,
std::optional<TMessageRelevanceWatcher> externalRelevanceWatcher = std::nullopt)
: TEvPut(id, TRope(buffer), deadline, handleClass, tactic, issueKeepFlag,
/*ignoreBlock=*/false, std::move(externalRelevanceWatcher))
ETactic tactic = TacticDefault, bool issueKeepFlag = false)
: TEvPut(TParameters{
.BlobId = id,
.Buffer = TRope(buffer),
.Deadline = deadline,
.HandleClass = handleClass,
.Tactic = tactic,
.IssueKeepFlag = issueKeepFlag,
})
{}


TEvPut(const TLogoBlobID &id, const TSharedData &buffer, TInstant deadline,
NKikimrBlobStorage::EPutHandleClass handleClass = NKikimrBlobStorage::TabletLog,
ETactic tactic = TacticDefault, bool issueKeepFlag = false,
std::optional<TMessageRelevanceWatcher> externalRelevanceWatcher = std::nullopt)
: TEvPut(id, TRope(buffer), deadline, handleClass, tactic, issueKeepFlag,
/*ignoreBlock=*/false, std::move(externalRelevanceWatcher))
ETactic tactic = TacticDefault, bool issueKeepFlag = false)
: TEvPut(TParameters{
.BlobId = id,
.Buffer = TRope(buffer),
.Deadline = deadline,
.HandleClass = handleClass,
.Tactic = tactic,
.IssueKeepFlag = issueKeepFlag,
})
{}

TString Print(bool isFull) const {
TStringStream str;
str << "TEvPut {Id# " << Id.ToString();
Expand Down
15 changes: 15 additions & 0 deletions ydb/core/base/blobstorage_relevance.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#include "blobstorage_relevance.h"

namespace NKikimr {

TMessageRelevance::TMessageRelevance(const TMessageRelevanceOwner& onwer,
std::optional<TMessageRelevanceWatcher> external)
: InternalWatcher(onwer)
, ExternalWatcher(external)
{}

bool TMessageRelevance::IsRelevant() const {
return !InternalWatcher.expired() && (!ExternalWatcher || !ExternalWatcher->expired());
}

} // namespace NKikimr
26 changes: 26 additions & 0 deletions ydb/core/base/blobstorage_relevance.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#pragma once

#include <memory>
#include <optional>

namespace NKikimr {

struct TMessageRelevanceTracker {};
using TMessageRelevanceOwner = std::shared_ptr<TMessageRelevanceTracker>;
using TMessageRelevanceWatcher = std::weak_ptr<TMessageRelevanceTracker>;

class TMessageRelevance {
public:
TMessageRelevance() = default;
TMessageRelevance(const TMessageRelevanceOwner& owner,
std::optional<TMessageRelevanceWatcher> external = std::nullopt);
bool IsRelevant() const;

private:
// tracks request actor state and cancels request when actor dies
TMessageRelevanceWatcher InternalWatcher;
// can be passed as request parameter to cancel request on demand
std::optional<TMessageRelevanceWatcher> ExternalWatcher;
};

} // namespace NKikimr
1 change: 1 addition & 0 deletions ydb/core/base/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ SRCS(
blobstorage.h
blobstorage.cpp
blobstorage_grouptype.cpp
blobstorage_relevance.cpp
boot_type.h
boot_type.cpp
channel_profiles.h
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/blobstorage/backpressure/event.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class TEventHolder {
TIntrusivePtr<TEventSerializedData> Buffer;
TBSProxyContextPtr BSProxyCtx;
std::unique_ptr<IEventBase> LocalEvent;
std::optional<std::weak_ptr<TMessageRelevanceTracker>> Tracker;
std::optional<TMessageRelevance> Tracker;

public:
TEventHolder()
Expand Down Expand Up @@ -93,7 +93,7 @@ class TEventHolder {
}

bool Relevant() const {
return !Tracker || !Tracker->expired();
return !Tracker || Tracker->IsRelevant();
}

ui32 GetByteSize() const {
Expand Down
11 changes: 4 additions & 7 deletions ydb/core/blobstorage/dsproxy/dsproxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ class TBlobStorageGroupRequestActor : public TActor<TBlobStorageGroupRequestActo
, RequestStartTime(params.Common.Now)
, Source(params.Common.Source)
, Cookie(params.Common.Cookie)
, RelevanceOwner(std::make_shared<TMessageRelevanceTracker>())
, ExternalRelevanceWatcher(std::move(params.Common.ExternalRelevanceWatcher))
, LatencyQueueKind(params.Common.LatencyQueueKind)
, RacingDomains(&Info->GetTopology())
, ExecutionRelay(std::move(params.Common.ExecutionRelay))
Expand All @@ -240,12 +242,6 @@ class TBlobStorageGroupRequestActor : public TActor<TBlobStorageGroupRequestActo
}

Y_ABORT_UNLESS(CostModel);

if (params.Common.ExternalRelevanceWatcher) {
Relevance = std::move(*params.Common.ExternalRelevanceWatcher);
} else {
Relevance = std::make_shared<TMessageRelevanceTracker>();
}
}

virtual ~TBlobStorageGroupRequestActor() = default;
Expand Down Expand Up @@ -326,7 +322,8 @@ class TBlobStorageGroupRequestActor : public TActor<TBlobStorageGroupRequestActo
private:
const TActorId Source;
const ui64 Cookie;
std::variant<TMessageRelevanceOwner, TMessageRelevanceWatcher> Relevance;
TMessageRelevanceOwner RelevanceOwner;
std::optional<TMessageRelevanceWatcher> ExternalRelevanceWatcher;
ui32 RequestsInFlight = 0;
std::unique_ptr<IEventBase> Response;
const TMaybe<TGroupStat::EKind> LatencyQueueKind;
Expand Down
15 changes: 4 additions & 11 deletions ydb/core/blobstorage/dsproxy/dsproxy_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1066,8 +1066,7 @@ namespace NKikimr {

if constexpr (!std::is_same_v<T, TEvBlobStorage::TEvVStatus> &&
!std::is_same_v<T, TEvBlobStorage::TEvVAssimilate>) {
std::visit([&](const auto& relevance) { ev.MessageRelevanceTracker = relevance; },
Relevance);
ev.MessageRelevanceTracker = TMessageRelevance(RelevanceOwner, ExternalRelevanceWatcher);
ui64 cost;
if constexpr (std::is_same_v<T, TEvBlobStorage::TEvVMultiPut>) {
bool internalQueue;
Expand Down Expand Up @@ -1132,17 +1131,11 @@ namespace NKikimr {
}

bool TBlobStorageGroupRequestActor::CheckForExternalCancellation() {
bool cancelled = false;
std::visit(
TOverloaded{
[&](const TMessageRelevanceOwner&) { cancelled = false; },
[&](const TMessageRelevanceWatcher& watcher) { cancelled = watcher.expired(); }
}, Relevance);

if (cancelled) {
if (ExternalRelevanceWatcher && ExternalRelevanceWatcher->expired()) {
ReplyAndDie(NKikimrProto::ERROR);
return true;
}
return cancelled;
return false;
}

void TBlobStorageGroupProxy::Handle(TEvGetQueuesInfo::TPtr ev) {
Expand Down
11 changes: 8 additions & 3 deletions ydb/core/blobstorage/ut_blobstorage/cancellation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,14 @@ Y_UNIT_TEST_SUITE(Cancellation) {
{
TString data = MakeData(10);
TLogoBlobID blobId(1, 1, 1, 1, data.size(), 1);
TEvBlobStorage::TEvPut* ev = new TEvBlobStorage::TEvPut(blobId, data, TInstant::Max(),
NKikimrBlobStorage::TabletLog, TEvBlobStorage::TEvPut::TacticDefault,
false, owner);
TEvBlobStorage::TEvPut* ev = new TEvBlobStorage::TEvPut(
TEvBlobStorage::TEvPut::TParameters{
.BlobId = blobId,
.Buffer = TRope(data),
.Deadline = TInstant::Max(),
.ExternalRelevanceWatcher = owner,
}
);
owner.reset();
ctx.Env->Runtime->WrapInActorContext(ctx.Edge, [&] {
SendToBSProxy(ctx.Edge, ctx.GroupId, ev);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/blobstorage/vdisk/common/vdisk_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ namespace NKikimr {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

struct TEventWithRelevanceTracker {
std::optional<TMessageRelevanceWatcher> MessageRelevanceTracker;
std::optional<TMessageRelevance> MessageRelevanceTracker;
};

struct TEvBlobStorage::TEvVPut
Expand Down
Loading