Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
package info.bitrich.xchangestream.binance;

import static info.bitrich.xchangestream.binance.dto.BaseBinanceWebSocketTransaction.BinanceWebSocketTypes.*;
import static org.knowm.xchange.binance.BinanceResilience.*;
import static info.bitrich.xchangestream.binance.dto.BaseBinanceWebSocketTransaction.BinanceWebSocketTypes.EXECUTION_REPORT;
import static info.bitrich.xchangestream.binance.dto.BaseBinanceWebSocketTransaction.BinanceWebSocketTypes.ORDER_TRADE_UPDATE;
import static info.bitrich.xchangestream.binance.dto.BaseBinanceWebSocketTransaction.BinanceWebSocketTypes.TRADE_LITE;
import static org.knowm.xchange.binance.BinanceResilience.ORDERS_PER_10_SECONDS_RATE_LIMITER;
import static org.knowm.xchange.binance.BinanceResilience.ORDERS_PER_DAY_RATE_LIMITER;
import static org.knowm.xchange.binance.BinanceResilience.ORDERS_PER_MINUTE_RATE_LIMITER;
import static org.knowm.xchange.binance.BinanceResilience.REQUEST_WEIGHT_RATE_LIMITER;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import info.bitrich.xchangestream.binance.dto.BaseBinanceWebSocketTransaction.BinanceWebSocketTypes;
import info.bitrich.xchangestream.binance.dto.account.AccountUpdateBinanceWebSocketTransaction;
import info.bitrich.xchangestream.binance.dto.trade.*;
import info.bitrich.xchangestream.binance.dto.trade.BinanceWebsocketOrderCancelAndReplaceResponse;
import info.bitrich.xchangestream.binance.dto.trade.BinanceWebsocketOrderResponse;
import info.bitrich.xchangestream.binance.dto.trade.ExecutionReportBinanceUserTransaction;
import info.bitrich.xchangestream.binance.dto.trade.ExecutionReportBinanceUserTransaction.ExecutionType;
import info.bitrich.xchangestream.binance.dto.trade.OrderTradeUpdateBinanceWebSocketTransaction;
import info.bitrich.xchangestream.binance.dto.trade.TradeLiteBinanceWebsocketTransaction;
import info.bitrich.xchangestream.core.StreamingTradeService;
import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;
import io.github.resilience4j.rxjava3.ratelimiter.operator.RateLimiterOperator;
Expand Down Expand Up @@ -61,7 +70,8 @@ public class BinanceStreamingTradeService implements StreamingTradeService {
private final BinanceExchange exchange;
private final ResilienceRegistries resilienceRegistries;
private volatile BinanceUserDataStreamingService binanceUserDataStreamingService;
@Setter private volatile BinanceUserTradeStreamingService binanceUserTradeStreamingService;
@Setter
private volatile BinanceUserTradeStreamingService binanceUserTradeStreamingService;

private final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();

Expand Down Expand Up @@ -175,12 +185,14 @@ public Observable<OpenPosition> getPositionChanges(Instrument instrument) {
}
}

public Single<Integer> placeMarketOrder(MarketOrder marketOrder) {
return placeOrder(marketOrder);
@Override
public Single<Integer> placeMarketOrder(MarketOrder order, Object... args) {
return placeOrder(order);
}

public Single<Integer> placeLimitOrder(LimitOrder limitOrder) {
return placeOrder(limitOrder);
@Override
public Single<Integer> placeLimitOrder(LimitOrder order, Object... args) {
return placeOrder(order);
}

public Single<Integer> placeOrder(Order order) {
Expand Down Expand Up @@ -215,7 +227,9 @@ public Single<Integer> placeOrder(Order order) {
resilienceRegistries
.rateLimiters()
.rateLimiter(REQUEST_WEIGHT_RATE_LIMITER)));
} else throw new UnsupportedOperationException("Only spot and futures supported");
} else {
throw new UnsupportedOperationException("Only spot and futures supported");
}
}
} else {
throw new UnsupportedOperationException("binanceUserTradeStreamingService not authorized");
Expand All @@ -228,7 +242,8 @@ private Observable<Integer> placeOrderInternal(Order order) {
.flatMap(
node -> {
TypeReference<BinanceWebsocketOrderResponse<BinanceNewOrder>> typeReference =
new TypeReference<>() {};
new TypeReference<>() {
};
BinanceWebsocketOrderResponse<BinanceNewOrder> response =
mapper.treeToValue(node, typeReference);
if (response.getStatus() == 200) {
Expand All @@ -240,15 +255,17 @@ private Observable<Integer> placeOrderInternal(Order order) {
});
}

public Single<Integer> changeOrder(LimitOrder limitOrder, CancelOrderParams... orderParams) {
@Override
public Single<Integer> changeOrder(LimitOrder limitOrder, Object... args) {
if (binanceUserTradeStreamingService.isAuthorized()) {
if (exchange.isFuturesEnabled()) {
return binanceUserTradeStreamingService
.subscribeChannel(String.valueOf(System.nanoTime()), "order.modify", limitOrder)
.flatMap(
node -> {
TypeReference<BinanceWebsocketOrderResponse<BinanceNewOrder>> typeReference =
new TypeReference<>() {};
new TypeReference<>() {
};
BinanceWebsocketOrderResponse<BinanceNewOrder> response =
mapper.treeToValue(node, typeReference);
if (response.getStatus() == 200) {
Expand All @@ -275,14 +292,14 @@ public Single<Integer> changeOrder(LimitOrder limitOrder, CancelOrderParams... o
.subscribeChannel(
String.valueOf(System.nanoTime()),
"order.cancelReplace",
limitOrder,
orderParams[0])
limitOrder)
.flatMap(
node -> {
TypeReference<
BinanceWebsocketOrderResponse<
BinanceWebsocketOrderCancelAndReplaceResponse>>
typeReference = new TypeReference<>() {};
BinanceWebsocketOrderResponse<
BinanceWebsocketOrderCancelAndReplaceResponse>>
typeReference = new TypeReference<>() {
};
BinanceWebsocketOrderResponse<BinanceWebsocketOrderCancelAndReplaceResponse>
response = mapper.treeToValue(node, typeReference);
if (response.getStatus() == 200) {
Expand All @@ -304,14 +321,16 @@ public Single<Integer> changeOrder(LimitOrder limitOrder, CancelOrderParams... o
.compose(
RateLimiterOperator.of(
resilienceRegistries.rateLimiters().rateLimiter(REQUEST_WEIGHT_RATE_LIMITER)));
} else throw new UnsupportedOperationException("Only spot and futures supported");

} else {
throw new UnsupportedOperationException("Only spot and futures supported");
}
} else {
throw new UnsupportedOperationException("binanceUserTradeStreamingService not authorized");
}
}

public Single<Integer> cancelOrder(CancelOrderParams orderParams) {
@Override
public Single<Integer> cancelOrder(CancelOrderParams orderParams, Object... args) {
if (binanceUserTradeStreamingService.isAuthorized()) {
if (exchange.isFuturesEnabled() || exchange.isSpotEnabled()) {
Observable<Integer> observable =
Expand All @@ -320,7 +339,8 @@ public Single<Integer> cancelOrder(CancelOrderParams orderParams) {
.flatMap(
node -> {
TypeReference<BinanceWebsocketOrderResponse<BinanceNewOrder>> typeReference =
new TypeReference<>() {};
new TypeReference<>() {
};
BinanceWebsocketOrderResponse<BinanceNewOrder> response =
mapper.treeToValue(node, typeReference);
if (response.getStatus() == 200) {
Expand All @@ -343,7 +363,9 @@ public Single<Integer> cancelOrder(CancelOrderParams orderParams) {
}
}

/** Registers subsriptions with the streaming service for the given products. */
/**
* Registers subsriptions with the streaming service for the given products.
*/
public void openSubscriptions() {
if (binanceUserDataStreamingService != null) {
executionReports =
Expand Down Expand Up @@ -372,9 +394,7 @@ public void openSubscriptions() {
}

/**
* User data subscriptions may have to persist across multiple socket connections to different
* URLs and therefore must act in a publisher fashion so that subscribers get an uninterrupted
* stream.
* User data subscriptions may have to persist across multiple socket connections to different URLs and therefore must act in a publisher fashion so that subscribers get an uninterrupted stream.
*/
void setUserDataStreamingService(
BinanceUserDataStreamingService binanceUserDataStreamingService) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ public String getSubscribeMessage(String channelName, Object... args) throws IOE
case "order.cancelReplace":
{
LimitOrder limitOrder = (LimitOrder) args[1];
BinanceCancelOrderParams params = (BinanceCancelOrderParams) args[2];
BinanceCancelOrderParams params =
new BinanceCancelOrderParams(limitOrder.getInstrument(), limitOrder.getId(), limitOrder.getUserReference());
Long cancelOrderId = null;
if (params.getOrderId() != null && !params.getOrderId().isEmpty()) {
cancelOrderId = Long.valueOf(params.getOrderId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ public void websocketTrade() throws InterruptedException, IOException {
while (!exchange.isAlive()) {
Thread.sleep(100L);
}
BinanceStreamingTradeService binanceStreamingTradeService =
((BinanceStreamingTradeService) exchange.getStreamingTradeService());
BigDecimal minAmount =
exchange.getExchangeMetaData().getInstruments().get(instrument2).getMinimumAmount();
Ticker ticker = exchange.getMarketDataService().getTicker(instrument2);
Expand All @@ -84,7 +82,7 @@ public void websocketTrade() throws InterruptedException, IOException {
.userReference(limitOrderUserId)
.build();
Disposable limitOrderDisposable =
binanceStreamingTradeService
exchange.getStreamingTradeService()
.placeLimitOrder(limitOrder)
.subscribe(
result -> {
Expand All @@ -102,7 +100,7 @@ public void websocketTrade() throws InterruptedException, IOException {
.userReference(limitOrderUserId)
.build();
Disposable changeOrderDisposable =
binanceStreamingTradeService
exchange.getStreamingTradeService()
.changeOrder(changeOrder)
.subscribe(
result -> {
Expand All @@ -115,7 +113,7 @@ public void websocketTrade() throws InterruptedException, IOException {
LOG.info("changeOrder disposed: {}", changeOrderDisposable.isDisposed());

Disposable cancelOrderDisposable =
binanceStreamingTradeService
exchange.getStreamingTradeService()
.cancelOrder(new BinanceCancelOrderParams(instrument2, null, limitOrderUserId))
.subscribe(
result -> {
Expand All @@ -133,7 +131,7 @@ public void websocketTrade() throws InterruptedException, IOException {
.userReference(marketOrderUserId)
.build();
Disposable marketOrderDisposable =
binanceStreamingTradeService
exchange.getStreamingTradeService()
.placeMarketOrder(marketOrder)
.doOnError(error -> LOG.error("placeMarketOrder error", error))
.subscribe(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ public void websocketTrade() throws InterruptedException, IOException {
while (!exchange.isAlive()) {
Thread.sleep(100L);
}
BinanceStreamingTradeService binanceStreamingTradeService =
((BinanceStreamingTradeService) exchange.getStreamingTradeService());
BigDecimal minAmount =
exchange.getExchangeMetaData().getInstruments().get(instrument2).getMinimumAmount();
Ticker ticker = exchange.getMarketDataService().getTicker(instrument2);
Expand All @@ -84,7 +82,7 @@ public void websocketTrade() throws InterruptedException, IOException {
.userReference(limitOrderUserId)
.build();
Disposable limitOrderDisposable =
binanceStreamingTradeService
exchange.getStreamingTradeService()
.placeLimitOrder(limitOrder)
.subscribe(
result -> {
Expand All @@ -103,8 +101,8 @@ public void websocketTrade() throws InterruptedException, IOException {
.userReference(limitOrderUserId)
.build();
Disposable changeOrderDisposable =
binanceStreamingTradeService
.changeOrder(changeOrder, cancelOrderParams)
exchange.getStreamingTradeService()
.changeOrder(changeOrder)
.subscribe(
result -> {
if (logOutput) {
Expand All @@ -116,7 +114,7 @@ public void websocketTrade() throws InterruptedException, IOException {
LOG.info("changeOrder disposed: {}", changeOrderDisposable.isDisposed());

Disposable cancelOrderDisposable =
binanceStreamingTradeService
exchange.getStreamingTradeService()
.cancelOrder(cancelOrderParams)
.subscribe(
result -> {
Expand All @@ -134,7 +132,7 @@ public void websocketTrade() throws InterruptedException, IOException {
.userReference(marketOrderUserId)
.build();
Disposable marketOrderDisposable =
binanceStreamingTradeService
exchange.getStreamingTradeService()
.placeMarketOrder(marketOrder)
.doOnError(error -> LOG.error("placeMarketOrder error", error))
.subscribe(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public BybitStreamingTradeService(
this.userTradeService = userTradeService;
}

public Single<Integer> placeMarketOrder(MarketOrder order) {
@Override
public Single<Integer> placeMarketOrder(MarketOrder order, Object... args) {
BybitCategory category = BybitAdapters.getCategory(order.getInstrument());
Observable<Integer> observable =
userTradeService
Expand All @@ -74,7 +75,8 @@ public Single<Integer> placeMarketOrder(MarketOrder order) {
.toSingle();
}

public Single<Integer> placeLimitOrder(LimitOrder order) {
@Override
public Single<Integer> placeLimitOrder(LimitOrder order, Object... args) {
BybitCategory category = BybitAdapters.getCategory(order.getInstrument());
Observable<Integer> observable =
userTradeService
Expand All @@ -96,7 +98,8 @@ public Single<Integer> placeLimitOrder(LimitOrder order) {
.toSingle();
}

public Single<Integer> changeOrder(LimitOrder order) {
@Override
public Single<Integer> changeOrder(LimitOrder order, Object... args) {
BybitCategory category = BybitAdapters.getCategory(order.getInstrument());
Observable<Integer> observable =
userTradeService
Expand Down Expand Up @@ -152,7 +155,8 @@ public Single<List<Integer>> batchChangeOrder(List<LimitOrder> orders) {
}
}

public Single<Integer> cancelOrder(CancelOrderParams params) {
@Override
public Single<Integer> cancelOrder(CancelOrderParams params, Object... args) {
BybitCancelOrderParams bybitParams = (BybitCancelOrderParams) params;
BybitCategory category = BybitAdapters.getCategory(bybitParams.getInstrument());
Observable<Integer> observable =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static info.bitrich.xchangestream.bybit.example.BaseBybitExchange.connectMainApi;

import info.bitrich.xchangestream.bybit.BybitStreamingExchange;
import info.bitrich.xchangestream.core.StreamingExchange;
import io.reactivex.rxjava3.disposables.CompositeDisposable;
import io.reactivex.rxjava3.disposables.Disposable;
import java.io.IOException;
Expand All @@ -28,19 +29,21 @@ public class BybitStreamSpotWebsocketTradeExample {
private static final Logger LOG =
LoggerFactory.getLogger(BybitStreamSpotWebsocketTradeExample.class);
static Instrument instrument = new CurrencyPair("XRP/USDT");
static BybitStreamingExchange exchange;
static BybitStreamingExchange bybitExchange;
static StreamingExchange exchange;

public static void main(String[] args) throws IOException {
exchange = (BybitStreamingExchange) connectMainApi(BybitCategory.SPOT, true);
exchange = connectMainApi(BybitCategory.SPOT, true);
bybitExchange = (BybitStreamingExchange) exchange;
try {
while (!exchange.isAlive()) {
while (!bybitExchange.isAlive()) {
TimeUnit.MILLISECONDS.sleep(100);
}
// main(not demo) api only
websocketTradeExample();
Thread.sleep(1000);
websocketBatchTradeExample();
exchange.disconnect().blockingAwait();
bybitExchange.disconnect().blockingAwait();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (IOException e) {
Expand All @@ -52,7 +55,7 @@ private static void websocketBatchTradeExample() throws IOException, Interrupted
CompositeDisposable compositeDisposable = new CompositeDisposable();
BigDecimal minAmount =
exchange.getExchangeMetaData().getInstruments().get(instrument).getMinimumAmount();
Ticker ticker = exchange.getMarketDataService().getTicker(instrument);
Ticker ticker = bybitExchange.getMarketDataService().getTicker(instrument);
minAmount =
getMinAmount(
new BigDecimal("12"),
Expand Down Expand Up @@ -104,7 +107,7 @@ private static void websocketBatchTradeExample() throws IOException, Interrupted
.userReference(limitOrder2UserId)
.build();
compositeDisposable.add(
exchange
bybitExchange
.getStreamingTradeService()
.batchChangeOrder(List.of(changeOrder1, changeOrder2))
.subscribe(
Expand All @@ -117,7 +120,7 @@ private static void websocketBatchTradeExample() throws IOException, Interrupted
ordersToCancel.add(new BybitCancelOrderParams(instrument, "", limitOrder1UserId));
ordersToCancel.add(new BybitCancelOrderParams(instrument, "", limitOrder2UserId));
compositeDisposable.add(
exchange
bybitExchange
.getStreamingTradeService()
.batchCancelOrder(ordersToCancel)
.subscribe(
Expand All @@ -132,7 +135,7 @@ private static void websocketBatchTradeExample() throws IOException, Interrupted
private static void websocketTradeExample() throws IOException, InterruptedException {
BigDecimal minAmount =
exchange.getExchangeMetaData().getInstruments().get(instrument).getMinimumAmount();
Ticker ticker = exchange.getMarketDataService().getTicker(instrument);
Ticker ticker = bybitExchange.getMarketDataService().getTicker(instrument);
minAmount =
getMinAmount(
new BigDecimal("12"),
Expand Down
Loading
Loading