Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions db/clickhouse/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ CREATE TABLE umami.website_event
event_name String,
tag String,
distinct_id String,
visitor_id String,
created_at DateTime('UTC'),
job_id Nullable(UUID)
)
Expand Down Expand Up @@ -123,6 +124,7 @@ CREATE TABLE umami.website_event_stats_hourly
max_time SimpleAggregateFunction(max, DateTime('UTC')),
tag SimpleAggregateFunction(groupArrayArray, Array(String)),
distinct_id String,
visitor_id String,
created_at Datetime('UTC')
)
ENGINE = AggregatingMergeTree
Expand Down Expand Up @@ -176,6 +178,7 @@ SELECT
max_time,
tag,
distinct_id,
visitor_id,
timestamp as created_at
FROM (SELECT
website_id,
Expand Down Expand Up @@ -214,6 +217,7 @@ FROM (SELECT
max(created_at) max_time,
arrayFilter(x -> x != '', groupArray(tag)) tag,
distinct_id,
visitor_id,
toStartOfHour(created_at) timestamp
FROM umami.website_event
GROUP BY website_id,
Expand All @@ -230,6 +234,7 @@ GROUP BY website_id,
city,
event_type,
distinct_id,
visitor_id,
timestamp);

-- projections
Expand Down Expand Up @@ -281,3 +286,16 @@ JOIN (SELECT event_id, string_value as currency
WHERE positionCaseInsensitive(data_key, 'currency') > 0) c
ON c.event_id = ed.event_id
WHERE positionCaseInsensitive(data_key, 'revenue') > 0;

-- identity linking
CREATE TABLE umami.identity_link
(
website_id UUID,
visitor_id String,
distinct_id String,
created_at DateTime('UTC'),
linked_at DateTime('UTC')
)
ENGINE = ReplacingMergeTree(linked_at)
ORDER BY (website_id, visitor_id, distinct_id)
SETTINGS index_granularity = 8192;
35 changes: 27 additions & 8 deletions prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ model Session {
region String? @db.VarChar(20)
city String? @db.VarChar(50)
distinctId String? @map("distinct_id") @db.VarChar(50)
visitorId String? @map("visitor_id") @db.VarChar(50)
createdAt DateTime? @default(now()) @map("created_at") @db.Timestamptz(6)

websiteEvents WebsiteEvent[]
Expand All @@ -60,6 +61,7 @@ model Session {
@@index([websiteId, createdAt, country])
@@index([websiteId, createdAt, region])
@@index([websiteId, createdAt, city])
@@index([websiteId, visitorId])
@@map("session")
}

Expand All @@ -76,14 +78,15 @@ model Website {
updatedAt DateTime? @updatedAt @map("updated_at") @db.Timestamptz(6)
deletedAt DateTime? @map("deleted_at") @db.Timestamptz(6)

user User? @relation("user", fields: [userId], references: [id])
createUser User? @relation("createUser", fields: [createdBy], references: [id])
team Team? @relation(fields: [teamId], references: [id])
eventData EventData[]
reports Report[]
revenue Revenue[]
segments Segment[]
sessionData SessionData[]
user User? @relation("user", fields: [userId], references: [id])
createUser User? @relation("createUser", fields: [createdBy], references: [id])
team Team? @relation(fields: [teamId], references: [id])
eventData EventData[]
reports Report[]
revenue Revenue[]
segments Segment[]
sessionData SessionData[]
identityLinks IdentityLink[]

@@index([userId])
@@index([teamId])
Expand Down Expand Up @@ -316,3 +319,19 @@ model Pixel {
@@index([createdAt])
@@map("pixel")
}

model IdentityLink {
id String @id @unique @map("identity_link_id") @db.Uuid
websiteId String @map("website_id") @db.Uuid
visitorId String @map("visitor_id") @db.VarChar(50)
distinctId String @map("distinct_id") @db.VarChar(50)
createdAt DateTime @default(now()) @map("created_at") @db.Timestamptz(6)
linkedAt DateTime @default(now()) @updatedAt @map("linked_at") @db.Timestamptz(6)

website Website @relation(fields: [websiteId], references: [id], onDelete: Cascade)

@@unique([websiteId, visitorId, distinctId])
@@index([websiteId, distinctId])
@@index([websiteId, visitorId])
@@map("identity_link")
}
19 changes: 18 additions & 1 deletion src/app/api/send/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { secret, uuid, hash } from '@/lib/crypto';
import { COLLECTION_TYPE, EVENT_TYPE } from '@/lib/constants';
import { anyObjectParam, urlOrPathParam } from '@/lib/schema';
import { safeDecodeURI, safeDecodeURIComponent } from '@/lib/url';
import { createSession, saveEvent, saveSessionData } from '@/queries/sql';
import { createSession, saveEvent, saveSessionData, createIdentityLink } from '@/queries/sql';
import { serializeError } from 'serialize-error';

interface Cache {
Expand Down Expand Up @@ -41,6 +41,7 @@ const schema = z.object({
userAgent: z.string().optional(),
timestamp: z.coerce.number().int().optional(),
id: z.string().optional(),
vid: z.string().max(50).optional(),
})
.refine(
data => {
Expand Down Expand Up @@ -80,6 +81,7 @@ export async function POST(request: Request) {
tag,
timestamp,
id,
vid: visitorId,
} = payload;

const sourceId = websiteId || pixelId || linkId;
Expand Down Expand Up @@ -146,6 +148,7 @@ export async function POST(request: Request) {
region,
city,
distinctId: id,
visitorId,
createdAt,
});
}
Expand Down Expand Up @@ -226,6 +229,7 @@ export async function POST(request: Request) {

// Session
distinctId: id,
visitorId,
browser,
os,
device,
Expand Down Expand Up @@ -265,6 +269,19 @@ export async function POST(request: Request) {
createdAt,
});
}

// Create identity link when both visitorId and distinctId are present
// Fire-and-forget to avoid adding latency to the tracking endpoint
if (visitorId && id && websiteId) {
createIdentityLink({
websiteId,
visitorId,
distinctId: id,
}).catch(e => {
// eslint-disable-next-line no-console
console.error('Failed to create identity link:', e);
});
}
}

const token = createToken({ websiteId, sessionId, visitId, iat }, secret());
Expand Down
3 changes: 3 additions & 0 deletions src/queries/sql/events/saveEvent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export interface SaveEventArgs {

// Session
distinctId?: string;
visitorId?: string;
browser?: string;
os?: string;
device?: string;
Expand Down Expand Up @@ -164,6 +165,7 @@ async function clickhouseQuery({
referrerQuery,
referrerDomain,
distinctId,
visitorId,
browser,
os,
device,
Expand Down Expand Up @@ -220,6 +222,7 @@ async function clickhouseQuery({
event_name: eventName ? eventName?.substring(0, EVENT_NAME_LENGTH) : null,
tag: tag,
distinct_id: distinctId,
visitor_id: visitorId,
created_at: getUTCString(createdAt),
browser,
os,
Expand Down
59 changes: 35 additions & 24 deletions src/queries/sql/getWebsiteStats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,30 @@ async function relationalQuery(
`
select
cast(coalesce(sum(t.c), 0) as bigint) as "pageviews",
count(distinct t.session_id) as "visitors",
count(distinct coalesce(t.resolved_identity, t.session_id::text)) as "visitors",
count(distinct t.visit_id) as "visits",
coalesce(sum(case when t.c = 1 then 1 else 0 end), 0) as "bounces",
cast(coalesce(sum(${getTimestampDiffSQL('t.min_time', 't.max_time')}), 0) as bigint) as "totaltime"
from (
select
website_event.session_id,
website_event.visit_id,
il.distinct_id as "resolved_identity",
count(*) as "c",
min(website_event.created_at) as "min_time",
max(website_event.created_at) as "max_time"
from website_event
${cohortQuery}
${joinSessionQuery}
${joinSessionQuery}
left join session on session.session_id = website_event.session_id
and session.website_id = website_event.website_id
left join identity_link il on il.visitor_id = session.visitor_id
and il.website_id = session.website_id
where website_event.website_id = {{websiteId::uuid}}
and website_event.created_at between {{startDate}} and {{endDate}}
and website_event.event_type != 2
${filterQuery}
group by 1, 2
group by 1, 2, 3
) as t
`,
queryParams,
Expand All @@ -79,47 +84,53 @@ async function clickhouseQuery(
sql = `
select
sum(t.c) as "pageviews",
uniq(t.session_id) as "visitors",
uniq(coalesce(t.resolved_identity, toString(t.session_id))) as "visitors",
uniq(t.visit_id) as "visits",
sum(if(t.c = 1, 1, 0)) as "bounces",
sum(max_time-min_time) as "totaltime"
from (
select
session_id,
visit_id,
we.session_id,
we.visit_id,
il.distinct_id as resolved_identity,
count(*) c,
min(created_at) min_time,
max(created_at) max_time
from website_event
min(we.created_at) min_time,
max(we.created_at) max_time
from website_event we
${cohortQuery}
where website_id = {websiteId:UUID}
and created_at between {startDate:DateTime64} and {endDate:DateTime64}
and event_type != 2
left join identity_link final il on il.visitor_id = we.visitor_id
and il.website_id = we.website_id
where we.website_id = {websiteId:UUID}
and we.created_at between {startDate:DateTime64} and {endDate:DateTime64}
and we.event_type != 2
${filterQuery}
group by session_id, visit_id
group by we.session_id, we.visit_id, il.distinct_id
) as t;
`;
} else {
sql = `
select
sum(t.c) as "pageviews",
uniq(session_id) as "visitors",
uniq(coalesce(resolved_identity, toString(session_id))) as "visitors",
uniq(visit_id) as "visits",
sumIf(1, t.c = 1) as "bounces",
sum(max_time-min_time) as "totaltime"
from (select
session_id,
visit_id,
sum(views) c,
min(min_time) min_time,
max(max_time) max_time
from website_event_stats_hourly "website_event"
we.session_id,
we.visit_id,
il.distinct_id as resolved_identity,
sum(we.views) c,
min(we.min_time) min_time,
max(we.max_time) max_time
from website_event_stats_hourly we
${cohortQuery}
where website_id = {websiteId:UUID}
and created_at between {startDate:DateTime64} and {endDate:DateTime64}
and event_type != 2
left join identity_link final il on il.visitor_id = we.visitor_id
and il.website_id = we.website_id
where we.website_id = {websiteId:UUID}
and we.created_at between {startDate:DateTime64} and {endDate:DateTime64}
and we.event_type != 2
${filterQuery}
group by session_id, visit_id
group by we.session_id, we.visit_id, il.distinct_id
) as t;
`;
}
Expand Down
76 changes: 76 additions & 0 deletions src/queries/sql/identity/createIdentityLink.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/**
* Identity Stitching - Links anonymous browser sessions to authenticated user identities
*
* Design decisions:
* - One visitor can link to multiple distinct_ids (user logs into different accounts)
* - One distinct_id can link to multiple visitors (user on multiple devices/browsers)
* - Links are additive and never invalidated (preserves historical journey)
* - Uses ReplacingMergeTree in ClickHouse with linked_at for deduplication
* - Upsert pattern ensures idempotency for repeated identify() calls
*
* Edge cases handled:
* - Safari private browsing: visitorId will be undefined, no link created
* - localStorage cleared: new visitorId generated, creates new link
* - Multiple tabs: same visitorId shared via localStorage
*/
import { uuid } from '@/lib/crypto';
import prisma from '@/lib/prisma';
import clickhouse from '@/lib/clickhouse';
import kafka from '@/lib/kafka';
import { CLICKHOUSE, PRISMA, runQuery } from '@/lib/db';

export interface CreateIdentityLinkArgs {
websiteId: string;
visitorId: string;
distinctId: string;
}

export async function createIdentityLink(data: CreateIdentityLinkArgs) {
return runQuery({
[PRISMA]: () => relationalQuery(data),
[CLICKHOUSE]: () => clickhouseQuery(data),
});
}

async function relationalQuery({ websiteId, visitorId, distinctId }: CreateIdentityLinkArgs) {
const { client } = prisma;

return client.identityLink.upsert({
where: {
websiteId_visitorId_distinctId: {
websiteId,
visitorId,
distinctId,
},
},
update: {
linkedAt: new Date(),
},
create: {
id: uuid(),
websiteId,
visitorId,
distinctId,
},
});
}

async function clickhouseQuery({ websiteId, visitorId, distinctId }: CreateIdentityLinkArgs) {
const { insert, getUTCString } = clickhouse;
const { sendMessage } = kafka;

const now = getUTCString(new Date());
const message = {
website_id: websiteId,
visitor_id: visitorId,
distinct_id: distinctId,
created_at: now,
linked_at: now,
};

if (kafka.enabled) {
await sendMessage('identity_link', message);
} else {
await insert('identity_link', [message]);
}
}
Loading