Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
8c46a5b
feat(notifications): add expired orders notifications
shoom3301 Sep 3, 2025
4750e8b
fix: consider order as expired only when it is not filled
shoom3301 Sep 3, 2025
a2a157a
fix: support eth-flow orders for expired order notifications
shoom3301 Sep 3, 2025
7258aa7
chore: update POLLING_INTERVAL
shoom3301 Sep 3, 2025
e588291
chore: update sdk
shoom3301 Sep 4, 2025
07bcf87
feat: add OrdersAppDataRepository to get app-data
shoom3301 Sep 4, 2025
7125ccc
fix: do not send trade notifications of bridging orders
shoom3301 Sep 4, 2025
d3f4f22
chore: fix comments
shoom3301 Sep 4, 2025
0390cea
chore: improve types
shoom3301 Sep 4, 2025
d43e678
chore: fix using set
shoom3301 Sep 4, 2025
5cdca68
chore: fix using erc20Repository
shoom3301 Sep 4, 2025
c570630
chore: fix notification id
shoom3301 Sep 4, 2025
70c97be
chore: add threshold for expired orders
shoom3301 Sep 4, 2025
6297039
fix: optimize expired orders query
shoom3301 Sep 4, 2025
6aa9af6
chore: fix ORDER_EXPIRATION_THRESHOLD
shoom3301 Sep 4, 2025
f22ce22
chore: adjust expired orders query
shoom3301 Sep 4, 2025
0899701
chore: fix sign
shoom3301 Sep 4, 2025
89cd92b
chore: add threshold for partially-fillable orders
shoom3301 Sep 4, 2025
d5080f0
Merge branch 'feat/expired-orders-ntofications' of https://github.com…
shoom3301 Sep 4, 2025
6b3814e
chore: fix LatestAppDataDocVersion
shoom3301 Sep 4, 2025
11cfaa9
chore: npm -> yarn
shoom3301 Sep 4, 2025
8a3bff9
chore: consider multiple presign events
shoom3301 Sep 4, 2025
ac2e321
fix: warn instead of error when orm analytics env vars are missing
alfetopito Sep 5, 2025
ed5e264
feat: add OrdersRepository
alfetopito Sep 5, 2025
e3b25b6
feat: add order type info to the trade notification msg
alfetopito Sep 5, 2025
7139d02
refactor: chunk orders
alfetopito Sep 5, 2025
97d9d93
fix: add colomn to separate title from summary
alfetopito Sep 5, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions apps/api/src/app/plugins/orm-analytics.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import 'reflect-metadata';
import { isDbEnabled } from '@cowprotocol/repositories';
import { FastifyInstance } from 'fastify';
import typeORMPlugin from 'typeorm-fastify-plugin';
import fp from 'fastify-plugin';
import 'reflect-metadata';
import typeORMPlugin from 'typeorm-fastify-plugin';
import { PoolInfo } from '../data/poolInfo';
import { isDbEnabled } from '@cowprotocol/repositories';

export default fp(async function (fastify: FastifyInstance) {
if (!isDbEnabled) {
Expand All @@ -26,7 +26,7 @@ export default fp(async function (fastify: FastifyInstance) {
);

if (dbParamsAreInvalid) {
console.error(
console.warn(
'Invalid CoW Analytics database parameters, please check COW_ANALYTICS_* env vars'
);
return;
Expand Down
2 changes: 1 addition & 1 deletion apps/notification-producer/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ RUN chown -R notification-producer:notification-producer .

# You can remove this install step if you build with `--bundle` option.
# The bundled output will include external dependencies.
RUN npm --prefix notification-producer --omit=dev -f install
RUN yarn --cwd notification-producer install --production --force

CMD [ "node", "notification-producer" ]
28 changes: 23 additions & 5 deletions apps/notification-producer/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,23 @@ import 'reflect-metadata';

import {
getCacheRepository,
getOnChainPlacedOrdersRepository,
getErc20Repository,
getExpiredOrdersRepository,
getIndexerStateRepository,
getOnChainPlacedOrdersRepository,
getOrdersAppDataRepository,
getOrdersRepository,
getPushNotificationsRepository,
getPushSubscriptionsRepository
getPushSubscriptionsRepository,
} from '@cowprotocol/services';

import { Runnable } from '../types';
import { TradeNotificationProducer } from './producers/trade/TradeNotificationProducer';
import { ALL_SUPPORTED_CHAIN_IDS } from '@cowprotocol/cow-sdk';
import { logger } from '@cowprotocol/shared';
import ms from 'ms';
import { Runnable } from '../types';
import { CmsNotificationProducer } from './producers/cms/CmsNotificationProducer';
import { logger } from '@cowprotocol/shared';
import { ExpiredOrdersNotificationProducer } from './producers/expired-orders/ExpiredOrdersNotificationProducer';
import { TradeNotificationProducer } from './producers/trade/TradeNotificationProducer';

const TIMEOUT_STOP_PRODUCERS = ms(`30s`);

Expand All @@ -36,13 +40,18 @@ async function mainLoop() {
const pushSubscriptionsRepository = getPushSubscriptionsRepository();
const indexerStateRepository = getIndexerStateRepository();
const onChainPlacedOrdersRepository = getOnChainPlacedOrdersRepository();
const expiredOrdersRepository = getExpiredOrdersRepository();
const ordersAppDataRepository = getOrdersAppDataRepository();
const ordersRepository = getOrdersRepository();

const repositories = {
pushNotificationsRepository,
pushSubscriptionsRepository,
indexerStateRepository,
erc20Repository,
onChainPlacedOrdersRepository,
ordersAppDataRepository,
ordersRepository,
};

// Create all producers
Expand All @@ -57,6 +66,15 @@ async function mainLoop() {
chainId,
});
}),

// Expired order producer
...chainIds.map((chainId) => {
return new ExpiredOrdersNotificationProducer({
chainId,
...repositories,
expiredOrdersRepository,
});
}),
];

// Run all producers in the background
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
import { BARN_ETH_FLOW_ADDRESSES, ETH_FLOW_ADDRESSES, SupportedChainId } from '@cowprotocol/cow-sdk';
import {
Erc20Repository,
ExpiredOrdersRepository,
IndexerStateRepository,
IndexerStateValue,
OnChainPlacedOrdersRepository,
PushNotificationsRepository,
PushSubscriptionsRepository
} from '@cowprotocol/repositories';

import { Runnable } from '../../../types';
import { doForever, logger } from '@cowprotocol/shared';
import { getExpiredOrderNotification } from './getExpiredOrderNotification';
import { isTruthy } from '../../utils/commonUtils';

async function wait(time: number) {
return new Promise((res) => setTimeout(res, time))
}

const WAIT_TIME = 10_000;
const POLLING_INTERVAL = 120_000; // 2 minutes
const PRODUCER_NAME = 'expired_orders_notification_producer';

export type ExpiredOrdersNotificationProducerProps = {
chainId: SupportedChainId;
erc20Repository: Erc20Repository;
indexerStateRepository: IndexerStateRepository;
pushSubscriptionsRepository: PushSubscriptionsRepository;
expiredOrdersRepository: ExpiredOrdersRepository;
pushNotificationsRepository: PushNotificationsRepository;
onChainPlacedOrdersRepository: OnChainPlacedOrdersRepository;
};

export interface ExpiredOrdersNotificationProducerState extends IndexerStateValue {
lastCheckTimestamp: string;
}

export class ExpiredOrdersNotificationProducer implements Runnable {
isStopping = false;
prefix: string;

constructor(private props: ExpiredOrdersNotificationProducerProps) {
this.prefix = '[ExpiredOrdersNotificationProducer:' + this.props.chainId + ']';
}

/**
* Main loop: Run the Expired orders notification producer. This method runs indefinitely,
* fetching notifications and sending them to the queue.
*
* The method should not throw or finish.
*/
async start(): Promise<void> {
await doForever({
name: 'ExpiredOrdersNotificationProducer:' + this.props.chainId,
callback: async (stop) => {
if (this.isStopping) {
stop();
return;
}
await this.processExpiredOrders();
},
waitTimeMilliseconds: WAIT_TIME,
logger
});
}

async stop(): Promise<void> {
this.isStopping = true;
}

async processExpiredOrders(): Promise<void> {
return this.pollExpiredOrders().then(() => {
return wait(POLLING_INTERVAL);
}).then(() => {
if (this.isStopping) return

return this.processExpiredOrders();
});
}

async pollExpiredOrders() {
const {
chainId,
erc20Repository,
indexerStateRepository,
pushSubscriptionsRepository,
expiredOrdersRepository,
pushNotificationsRepository,
onChainPlacedOrdersRepository
} = this.props;

const nowTimestamp = Math.ceil(Date.now() / 1000);

const stateRegistry =
await indexerStateRepository.get<ExpiredOrdersNotificationProducerState>(
PRODUCER_NAME,
chainId
);

const lastCheckTimestampRaw = stateRegistry?.state.lastCheckTimestamp;

if (lastCheckTimestampRaw) {
const lastCheckTimestamp = Number(lastCheckTimestampRaw);

const ethFlowAddresses = [ETH_FLOW_ADDRESSES[chainId], BARN_ETH_FLOW_ADDRESSES[chainId]].map(t => t.toLowerCase());

const accounts =
await pushSubscriptionsRepository.getAllSubscribedAccounts();

const expiredOrders = await expiredOrdersRepository.fetchExpiredOrdersForAccounts({
chainId,
accounts: [...accounts, ...ethFlowAddresses],
lastCheckTimestamp,
nowTimestamp
});

const ethFlowOrderOwners = expiredOrders.length
? await onChainPlacedOrdersRepository.getAccountsForOrders(chainId, expiredOrders.map(o => o.uid))
: {};

logger.debug(
`${this.prefix} got ${expiredOrders.length} expired orders of ${accounts.length} accounts, lastCheckTimestamp=${lastCheckTimestamp}`
);

const notifications = await Promise.all(expiredOrders.map(order => {
const isEthFlowOrder = ethFlowAddresses.includes(order.owner.toLowerCase());

const orderOwner = isEthFlowOrder
? Object.keys(ethFlowOrderOwners).find(key => {
const orderUids = ethFlowOrderOwners[key];

return orderUids.includes(order.uid.toLowerCase());
})
: order.owner.toLowerCase();

if (!orderOwner) return Promise.resolve(undefined);

return getExpiredOrderNotification(order, {
chainId,
nowTimestamp,
lastCheckTimestamp,
isEthFlowOrder,
owner: orderOwner,
erc20Repository
});
}));

if (notifications.length > 0) {
logger.info(
`${this.prefix} Sending ${notifications.length} notifications`,
JSON.stringify(notifications, null, 2)
);

// Post notifications to queue
pushNotificationsRepository.send(notifications.filter(isTruthy));
}
}

await indexerStateRepository.upsert<ExpiredOrdersNotificationProducerState>(
PRODUCER_NAME,
{ lastCheckTimestamp: nowTimestamp.toString() },
chainId
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { PushNotification } from '@cowprotocol/notifications';
import { Erc20Repository, ParsedExpiredOrder } from '@cowprotocol/repositories';
import { getExplorerUrl } from '@cowprotocol/shared';
import { type SupportedChainId } from '@cowprotocol/cow-sdk';
import { getNotificationSummary } from '../../utils/getNotificationSummary';

export interface ExpiredOrderNotificationContext {
chainId: SupportedChainId;
nowTimestamp: number;
lastCheckTimestamp: number;
isEthFlowOrder: boolean;
owner: string;
erc20Repository: Erc20Repository;
}

export async function getExpiredOrderNotification(
expiredOrder: ParsedExpiredOrder,
notificationContext: ExpiredOrderNotificationContext
): Promise<PushNotification> {
const { chainId, lastCheckTimestamp, nowTimestamp, isEthFlowOrder, owner, erc20Repository } = notificationContext;

const summary = await getNotificationSummary({
chainId,
isEthFlowOrder,
erc20Repository,
sellAmount: expiredOrder.sellAmount,
buyAmount: expiredOrder.buyAmount,
sellTokenAddress: expiredOrder.sellTokenAddress,
buyTokenAddress: expiredOrder.buyTokenAddress
});

const title = `🕐 Order ${summary} has expired`;
const message = `
Expiration time: ${new Date(expiredOrder.validTo * 1000).toISOString()}.
Account: ${owner}.
`.trim();

const url = getExplorerUrl(chainId, expiredOrder.uid);

return {
id: 'OrderExpired-' + expiredOrder.uid + '-' + expiredOrder.validTo + '-' + lastCheckTimestamp,
account: expiredOrder.owner.toLowerCase(),
title,
message,
url,
context: {
chainId: chainId.toString(),
nowTimestamp: nowTimestamp.toString()
}
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,18 @@ import {
Erc20Repository,
getViemClients,
IndexerStateValue,
PushNotificationsRepository,
OnChainPlacedOrdersRepository,
OrdersAppDataRepository,
OrdersRepository,
PushNotificationsRepository,
} from '@cowprotocol/repositories';

import { Runnable } from '../../../types';
import { PushSubscriptionsRepository } from '@cowprotocol/repositories';
import { IndexerStateRepository } from '@cowprotocol/repositories';
import {
IndexerStateRepository,
PushSubscriptionsRepository,
} from '@cowprotocol/repositories';
import { doForever, logger } from '@cowprotocol/shared';
import { Runnable } from '../../../types';
import { getTradeNotifications } from './getTradeNotifications';

const WAIT_TIME = 10000;
Expand All @@ -27,6 +31,8 @@ export type TradeNotificationProducerProps = {
indexerStateRepository: IndexerStateRepository;
erc20Repository: Erc20Repository;
onChainPlacedOrdersRepository: OnChainPlacedOrdersRepository;
ordersAppDataRepository: OrdersAppDataRepository;
ordersRepository: OrdersRepository;
};

export interface TradeNotificationProducerState extends IndexerStateValue {
Expand All @@ -44,7 +50,7 @@ export class TradeNotificationProducer implements Runnable {
}

/**
* Main loop: Run the CMS notification producer. This method runs indefinitely,
* Main loop: Run the Trade notification producer. This method runs indefinitely,
* fetching notifications and sending them to the queue.
*
* The method should not throw or finish.
Expand Down Expand Up @@ -174,7 +180,9 @@ export class TradeNotificationProducer implements Runnable {
pushSubscriptionsRepository,
indexerStateRepository,
erc20Repository,
onChainPlacedOrdersRepository
onChainPlacedOrdersRepository,
ordersAppDataRepository,
ordersRepository,
} = this.props;

// Get all accounts subscribed to PUSH notifications
Expand All @@ -189,6 +197,8 @@ export class TradeNotificationProducer implements Runnable {
chainId,
erc20Repository,
onChainPlacedOrdersRepository,
ordersAppDataRepository,
ordersRepository,
prefix: this.prefix,
});

Expand All @@ -204,10 +214,10 @@ export class TradeNotificationProducer implements Runnable {
`${this.prefix} Sending ${notifications.length} notifications`,
JSON.stringify(notifications, null, 2)
);
}

// Post notifications to queue
this.props.pushNotificationsRepository.send(notifications);
// Post notifications to queue
this.props.pushNotificationsRepository.send(notifications);
}

// Update state
await indexerStateRepository.upsert<TradeNotificationProducerState>(
Expand Down
Loading