# -*- coding: utf-8 -*-

# PLEASE DO NOT EDIT THIS FILE, IT IS GENERATED AND WILL BE OVERWRITTEN:
# https://github.com/ccxt/ccxt/blob/master/CONTRIBUTING.md#how-to-contribute-code

import ccxt.async_support
from ccxt.async_support.base.ws.cache import ArrayCache, ArrayCacheBySymbolById, ArrayCacheBySymbolBySide, ArrayCacheByTimestamp
import hashlib
from ccxt.base.types import Balances, Int, Num, Order, OrderBook, OrderSide, OrderType, Position, Str, Strings, Ticker, Tickers, Trade
from ccxt.async_support.base.ws.client import Client
from typing import List
from ccxt.base.errors import AuthenticationError
from ccxt.base.errors import ArgumentsRequired
from ccxt.base.errors import BadRequest
from ccxt.base.errors import InvalidNonce


class okx(ccxt.async_support.okx):

    def describe(self):
        return self.deep_extend(super(okx, self).describe(), {
            'has': {
                'ws': True,
                'watchTicker': True,
                'watchTickers': True,
                'watchOrderBook': True,
                'watchTrades': True,
                'watchTradesForSymbols': True,
                'watchOrderBookForSymbols': True,
                'watchBalance': True,
                'watchOHLCV': True,
                'watchOHLCVForSymbols': True,
                'watchOrders': True,
                'watchMyTrades': True,
                'watchPositions': True,
                'createOrderWs': True,
                'editOrderWs': True,
                'cancelOrderWs': True,
                'cancelOrdersWs': True,
                'cancelAllOrdersWs': True,
            },
            'urls': {
                'api': {
                    'ws': 'wss://ws.okx.com:8443/ws/v5',
                },
                'test': {
                    'ws': 'wss://wspap.okx.com:8443/ws/v5',
                },
            },
            'options': {
                'watchOrderBook': {
                    #
                    # bbo-tbt
                    # 1. Newly added channel that sends tick-by-tick Level 1 data
                    # 2. All API users can subscribe
                    # 3. Public depth channel, verification not required
                    #
                    # books-l2-tbt
                    # 1. Only users who're VIP5 and above can subscribe
                    # 2. Identity verification required before subscription
                    #
                    # books50-l2-tbt
                    # 1. Only users who're VIP4 and above can subscribe
                    # 2. Identity verification required before subscription
                    #
                    # books
                    # 1. All API users can subscribe
                    # 2. Public depth channel, verification not required
                    #
                    # books5
                    # 1. All API users can subscribe
                    # 2. Public depth channel, verification not required
                    # 3. Data feeds will be delivered every 100ms(vs. every 200ms now)
                    #
                    'depth': 'books',
                },
                'watchBalance': 'spot',  # margin, futures, swap
                'watchTicker': {
                    'channel': 'tickers',  # tickers, sprd-tickers, index-tickers, block-tickers
                },
                'watchTickers': {
                    'channel': 'tickers',  # tickers, sprd-tickers, index-tickers, block-tickers
                },
                'watchOrders': {
                    'type': 'ANY',  # SPOT, MARGIN, SWAP, FUTURES, OPTION, ANY
                },
                'watchMyTrades': {
                    'type': 'ANY',  # SPOT, MARGIN, SWAP, FUTURES, OPTION, ANY
                },
                'createOrderWs': {
                    'op': 'batch-orders',  # order, batch-orders
                },
                'editOrderWs': {
                    'op': 'amend-order',  # amend-order, batch-amend-orders
                },
                'ws': {
                    # 'inflate': True,
                },
                'checksum': True,
            },
            'streaming': {
                # okex does not support built-in ws protocol-level ping-pong
                # instead it requires a custom text-based ping-pong
                'ping': self.ping,
                'keepAlive': 20000,
            },
        })

    def get_url(self, channel: str, access='public'):
        # for context: https://www.okx.com/help-center/changes-to-v5-api-websocket-subscription-parameter-and-url
        isSandbox = self.options['sandboxMode']
        sandboxSuffix = '?brokerId=9999' if isSandbox else ''
        isBusiness = (access == 'business')
        isPublic = (access == 'public')
        url = self.urls['api']['ws']
        if isBusiness or (channel.find('candle') > -1) or (channel == 'orders-algo'):
            return url + '/business' + sandboxSuffix
        elif isPublic:
            return url + '/public' + sandboxSuffix
        return url + '/private' + sandboxSuffix

    async def subscribe_multiple(self, access, channel, symbols: Strings = None, params={}):
        await self.load_markets()
        if symbols is None:
            symbols = self.symbols
        symbols = self.market_symbols(symbols)
        url = self.get_url(channel, access)
        messageHash = channel
        args = []
        messageHash += '::' + ','.join(symbols)
        for i in range(0, len(symbols)):
            marketId = self.market_id(symbols[i])
            arg = {
                'channel': channel,
                'instId': marketId,
            }
            args.append(self.extend(arg, params))
        request = {
            'op': 'subscribe',
            'args': args,
        }
        return await self.watch(url, messageHash, request, messageHash)

    async def subscribe(self, access, messageHash, channel, symbol, params={}):
        await self.load_markets()
        url = self.get_url(channel, access)
        firstArgument = {
            'channel': channel,
        }
        if symbol is not None:
            market = self.market(symbol)
            messageHash += ':' + market['id']
            firstArgument['instId'] = market['id']
        request = {
            'op': 'subscribe',
            'args': [
                self.deep_extend(firstArgument, params),
            ],
        }
        return await self.watch(url, messageHash, request, messageHash)

    async def watch_trades(self, symbol: str, since: Int = None, limit: Int = None, params={}) -> List[Trade]:
        """
        get the list of most recent trades for a particular symbol
        :param str symbol: unified symbol of the market to fetch trades for
        :param int [since]: timestamp in ms of the earliest trade to fetch
        :param int [limit]: the maximum amount of trades to fetch
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict[]: a list of `trade structures <https://docs.ccxt.com/#/?id=public-trades>`
        """
        return await self.watch_trades_for_symbols([symbol], since, limit, params)

    async def watch_trades_for_symbols(self, symbols: List[str], since: Int = None, limit: Int = None, params={}) -> List[Trade]:
        """
        get the list of most recent trades for a particular symbol
        :param str symbol: unified symbol of the market to fetch trades for
        :param int [since]: timestamp in ms of the earliest trade to fetch
        :param int [limit]: the maximum amount of trades to fetch
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict[]: a list of `trade structures <https://docs.ccxt.com/#/?id=public-trades>`
        """
        symbolsLength = len(symbols)
        if symbolsLength == 0:
            raise ArgumentsRequired(self.id + ' watchTradesForSymbols() requires a non-empty array of symbols')
        await self.load_markets()
        symbols = self.market_symbols(symbols)
        channel = 'trades'
        topics = []
        messageHashes = []
        for i in range(0, len(symbols)):
            symbol = symbols[i]
            messageHashes.append(channel + ':' + symbol)
            marketId = self.market_id(symbol)
            topic = {
                'channel': channel,
                'instId': marketId,
            }
            topics.append(topic)
        request = {
            'op': 'subscribe',
            'args': topics,
        }
        url = self.get_url(channel, 'public')
        trades = await self.watch_multiple(url, messageHashes, request, messageHashes)
        if self.newUpdates:
            first = self.safe_value(trades, 0)
            tradeSymbol = self.safe_string(first, 'symbol')
            limit = trades.getLimit(tradeSymbol, limit)
        return self.filter_by_since_limit(trades, since, limit, 'timestamp', True)

    def handle_trades(self, client: Client, message):
        #
        #     {
        #         "arg": {channel: "trades", instId: "BTC-USDT"},
        #         "data": [
        #             {
        #                 "instId": "BTC-USDT",
        #                 "tradeId": "216970876",
        #                 "px": "31684.5",
        #                 "sz": "0.00001186",
        #                 "side": "buy",
        #                 "ts": "1626531038288"
        #             }
        #         ]
        #     }
        #
        arg = self.safe_value(message, 'arg', {})
        channel = self.safe_string(arg, 'channel')
        marketId = self.safe_string(arg, 'instId')
        symbol = self.safe_symbol(marketId)
        data = self.safe_value(message, 'data', [])
        tradesLimit = self.safe_integer(self.options, 'tradesLimit', 1000)
        for i in range(0, len(data)):
            trade = self.parse_trade(data[i])
            messageHash = channel + ':' + symbol
            stored = self.safe_value(self.trades, symbol)
            if stored is None:
                stored = ArrayCache(tradesLimit)
                self.trades[symbol] = stored
            stored.append(trade)
            client.resolve(stored, messageHash)

    async def watch_ticker(self, symbol: str, params={}) -> Ticker:
        """
        :see: https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-tickers-channel
        watches a price ticker, a statistical calculation with the information calculated over the past 24 hours for a specific market
        :param str symbol: unified symbol of the market to fetch the ticker for
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :param str [params.channel]: the channel to subscribe to, tickers by default. Can be tickers, sprd-tickers, index-tickers, block-tickers
        :returns dict: a `ticker structure <https://docs.ccxt.com/#/?id=ticker-structure>`
        """
        channel = None
        channel, params = self.handle_option_and_params(params, 'watchTicker', 'channel', 'tickers')
        params['channel'] = channel
        ticker = await self.watch_tickers([symbol], params)
        return self.safe_value(ticker, symbol)

    async def watch_tickers(self, symbols: Strings = None, params={}) -> Tickers:
        """
        :see: https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-tickers-channel
        watches a price ticker, a statistical calculation with the information calculated over the past 24 hours for all markets of a specific list
        :param str[] [symbols]: unified symbol of the market to fetch the ticker for
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :param str [params.channel]: the channel to subscribe to, tickers by default. Can be tickers, sprd-tickers, index-tickers, block-tickers
        :returns dict: a `ticker structure <https://docs.ccxt.com/#/?id=ticker-structure>`
        """
        if self.is_empty(symbols):
            raise ArgumentsRequired(self.id + ' watchTickers requires a list of symbols')
        channel = None
        channel, params = self.handle_option_and_params(params, 'watchTickers', 'channel', 'tickers')
        newTickers = await self.subscribe_multiple('public', channel, symbols, params)
        if self.newUpdates:
            return newTickers
        return self.filter_by_array(self.tickers, 'symbol', symbols)

    def handle_ticker(self, client: Client, message):
        #
        #     {
        #         "arg": {channel: "tickers", instId: "BTC-USDT"},
        #         "data": [
        #             {
        #                 "instType": "SPOT",
        #                 "instId": "BTC-USDT",
        #                 "last": "31500.1",
        #                 "lastSz": "0.00001754",
        #                 "askPx": "31500.1",
        #                 "askSz": "0.00998144",
        #                 "bidPx": "31500",
        #                 "bidSz": "3.05652439",
        #                 "open24h": "31697",
        #                 "high24h": "32248",
        #                 "low24h": "31165.6",
        #                 "sodUtc0": "31385.5",
        #                 "sodUtc8": "32134.9",
        #                 "volCcy24h": "503403597.38138519",
        #                 "vol24h": "15937.10781721",
        #                 "ts": "1626526618762"
        #             }
        #         ]
        #     }
        #
        arg = self.safe_value(message, 'arg', {})
        channel = self.safe_string(arg, 'channel')
        data = self.safe_value(message, 'data', [])
        newTickers = []
        for i in range(0, len(data)):
            ticker = self.parse_ticker(data[i])
            symbol = ticker['symbol']
            self.tickers[symbol] = ticker
            newTickers.append(ticker)
        messageHashes = self.find_message_hashes(client, channel + '::')
        for i in range(0, len(messageHashes)):
            messageHash = messageHashes[i]
            parts = messageHash.split('::')
            symbolsString = parts[1]
            symbols = symbolsString.split(',')
            tickers = self.filter_by_array(newTickers, 'symbol', symbols)
            tickersSymbols = list(tickers.keys())
            numTickers = len(tickersSymbols)
            if numTickers > 0:
                client.resolve(tickers, messageHash)
        return message

    async def watch_ohlcv(self, symbol: str, timeframe='1m', since: Int = None, limit: Int = None, params={}) -> List[list]:
        """
        watches historical candlestick data containing the open, high, low, and close price, and the volume of a market
        :param str symbol: unified symbol of the market to fetch OHLCV data for
        :param str timeframe: the length of time each candle represents
        :param int [since]: timestamp in ms of the earliest candle to fetch
        :param int [limit]: the maximum amount of candles to fetch
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns int[][]: A list of candles ordered, open, high, low, close, volume
        """
        await self.load_markets()
        symbol = self.symbol(symbol)
        interval = self.safe_string(self.timeframes, timeframe, timeframe)
        name = 'candle' + interval
        ohlcv = await self.subscribe('public', name, name, symbol, params)
        if self.newUpdates:
            limit = ohlcv.getLimit(symbol, limit)
        return self.filter_by_since_limit(ohlcv, since, limit, 0, True)

    async def watch_ohlcv_for_symbols(self, symbolsAndTimeframes: List[List[str]], since: Int = None, limit: Int = None, params={}):
        """
        watches historical candlestick data containing the open, high, low, and close price, and the volume of a market
        :param str[][] symbolsAndTimeframes: array of arrays containing unified symbols and timeframes to fetch OHLCV data for, example [['BTC/USDT', '1m'], ['LTC/USDT', '5m']]
        :param int [since]: timestamp in ms of the earliest candle to fetch
        :param int [limit]: the maximum amount of candles to fetch
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns int[][]: A list of candles ordered, open, high, low, close, volume
        """
        symbolsLength = len(symbolsAndTimeframes)
        if symbolsLength == 0 or not isinstance(symbolsAndTimeframes[0], list):
            raise ArgumentsRequired(self.id + " watchOHLCVForSymbols() requires a an array of symbols and timeframes, like  [['BTC/USDT', '1m'], ['LTC/USDT', '5m']]")
        await self.load_markets()
        topics = []
        messageHashes = []
        for i in range(0, len(symbolsAndTimeframes)):
            symbolAndTimeframe = symbolsAndTimeframes[i]
            sym = symbolAndTimeframe[0]
            tf = symbolAndTimeframe[1]
            marketId = self.market_id(sym)
            interval = self.safe_string(self.timeframes, tf, tf)
            channel = 'candle' + interval
            topic = {
                'channel': channel,
                'instId': marketId,
            }
            topics.append(topic)
            messageHashes.append('multi:' + channel + ':' + sym)
        request = {
            'op': 'subscribe',
            'args': topics,
        }
        url = self.get_url('candle', 'public')
        symbol, timeframe, candles = await self.watch_multiple(url, messageHashes, request, messageHashes)
        if self.newUpdates:
            limit = candles.getLimit(symbol, limit)
        filtered = self.filter_by_since_limit(candles, since, limit, 0, True)
        return self.create_ohlcv_object(symbol, timeframe, filtered)

    def handle_ohlcv(self, client: Client, message):
        #
        #     {
        #         "arg": {channel: "candle1m", instId: "BTC-USDT"},
        #         "data": [
        #             [
        #                 "1626690720000",
        #                 "31334",
        #                 "31334",
        #                 "31334",
        #                 "31334",
        #                 "0.0077",
        #                 "241.2718"
        #             ]
        #         ]
        #     }
        #
        arg = self.safe_value(message, 'arg', {})
        channel = self.safe_string(arg, 'channel')
        data = self.safe_value(message, 'data', [])
        marketId = self.safe_string(arg, 'instId')
        market = self.safe_market(marketId)
        symbol = market['symbol']
        interval = channel.replace('candle', '')
        # use a reverse lookup in a static map instead
        timeframe = self.find_timeframe(interval)
        for i in range(0, len(data)):
            parsed = self.parse_ohlcv(data[i], market)
            self.ohlcvs[symbol] = self.safe_value(self.ohlcvs, symbol, {})
            stored = self.safe_value(self.ohlcvs[symbol], timeframe)
            if stored is None:
                limit = self.safe_integer(self.options, 'OHLCVLimit', 1000)
                stored = ArrayCacheByTimestamp(limit)
                self.ohlcvs[symbol][timeframe] = stored
            stored.append(parsed)
            messageHash = channel + ':' + market['id']
            client.resolve(stored, messageHash)
            # for multiOHLCV we need special object, to other "multi"
            # methods, because OHLCV response item does not contain symbol
            # or timeframe, thus otherwise it would be unrecognizable
            messageHashForMulti = 'multi:' + channel + ':' + symbol
            client.resolve([symbol, timeframe, stored], messageHashForMulti)

    async def watch_order_book(self, symbol: str, limit: Int = None, params={}) -> OrderBook:
        """
        watches information on open orders with bid(buy) and ask(sell) prices, volumes and other data
        :param str symbol: unified symbol of the market to fetch the order book for
        :param int [limit]: the maximum amount of order book entries to return
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict: A dictionary of `order book structures <https://docs.ccxt.com/#/?id=order-book-structure>` indexed by market symbols
        """
        #
        # bbo-tbt
        # 1. Newly added channel that sends tick-by-tick Level 1 data
        # 2. All API users can subscribe
        # 3. Public depth channel, verification not required
        #
        # books-l2-tbt
        # 1. Only users who're VIP5 and above can subscribe
        # 2. Identity verification required before subscription
        #
        # books50-l2-tbt
        # 1. Only users who're VIP4 and above can subscribe
        # 2. Identity verification required before subscription
        #
        # books
        # 1. All API users can subscribe
        # 2. Public depth channel, verification not required
        #
        # books5
        # 1. All API users can subscribe
        # 2. Public depth channel, verification not required
        # 3. Data feeds will be delivered every 100ms(vs. every 200ms now)
        #
        return await self.watch_order_book_for_symbols([symbol], limit, params)

    async def watch_order_book_for_symbols(self, symbols: List[str], limit: Int = None, params={}) -> OrderBook:
        """
        watches information on open orders with bid(buy) and ask(sell) prices, volumes and other data
        :param str[] symbols: unified array of symbols
        :param int [limit]: 1,5, 400, 50(l2-tbt, vip4+) or 40000(vip5+) the maximum amount of order book entries to return
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict: A dictionary of `order book structures <https://docs.ccxt.com/#/?id=order-book-structure>` indexed by market symbols
        """
        await self.load_markets()
        symbols = self.market_symbols(symbols)
        options = self.safe_value(self.options, 'watchOrderBook', {})
        depth = self.safe_string(options, 'depth', 'books')
        if limit is not None:
            if limit == 1:
                depth = 'bbo-tbt'
            elif limit > 1 and limit <= 5:
                depth = 'books5'
            elif limit == 400:
                depth = 'books'
            elif limit == 50:
                depth = 'books50-l2-tbt'  # Make sure you have VIP4 and above
            elif limit == 4000:
                depth = 'books-l2-tbt'  # Make sure you have VIP5 and above
        if (depth == 'books-l2-tbt') or (depth == 'books50-l2-tbt'):
            await self.authenticate({'access': 'public'})
        topics = []
        messageHashes = []
        for i in range(0, len(symbols)):
            symbol = symbols[i]
            messageHashes.append(depth + ':' + symbol)
            marketId = self.market_id(symbol)
            topic = {
                'channel': depth,
                'instId': marketId,
            }
            topics.append(topic)
        request = {
            'op': 'subscribe',
            'args': topics,
        }
        url = self.get_url(depth, 'public')
        orderbook = await self.watch_multiple(url, messageHashes, request, messageHashes)
        return orderbook.limit()

    def handle_delta(self, bookside, delta):
        #
        #     [
        #         "31685",  # price
        #         "0.78069158",  # amount
        #         "0",  # liquidated orders
        #         "17"  # orders
        #     ]
        #
        price = self.safe_float(delta, 0)
        amount = self.safe_float(delta, 1)
        bookside.store(price, amount)

    def handle_deltas(self, bookside, deltas):
        for i in range(0, len(deltas)):
            self.handle_delta(bookside, deltas[i])

    def handle_order_book_message(self, client: Client, message, orderbook, messageHash):
        #
        #     {
        #         "asks": [
        #             ['31738.3', '0.05973179', "0", "3"],
        #             ['31738.5', '0.11035404', "0", "2"],
        #             ['31739.6', '0.01', "0", "1"],
        #         ],
        #         "bids": [
        #             ['31738.2', '0.67557666', "0", "9"],
        #             ['31738', '0.02466947', "0", "2"],
        #             ['31736.3', '0.01705046', "0", "2"],
        #         ],
        #         "instId": "BTC-USDT",
        #         "ts": "1626537446491"
        #     }
        #
        asks = self.safe_value(message, 'asks', [])
        bids = self.safe_value(message, 'bids', [])
        storedAsks = orderbook['asks']
        storedBids = orderbook['bids']
        self.handle_deltas(storedAsks, asks)
        self.handle_deltas(storedBids, bids)
        checksum = self.safe_bool(self.options, 'checksum', True)
        if checksum:
            asksLength = len(storedAsks)
            bidsLength = len(storedBids)
            payloadArray = []
            for i in range(0, 25):
                if i < bidsLength:
                    payloadArray.append(self.number_to_string(storedBids[i][0]))
                    payloadArray.append(self.number_to_string(storedBids[i][1]))
                if i < asksLength:
                    payloadArray.append(self.number_to_string(storedAsks[i][0]))
                    payloadArray.append(self.number_to_string(storedAsks[i][1]))
            payload = ':'.join(payloadArray)
            responseChecksum = self.safe_integer(message, 'checksum')
            localChecksum = self.crc32(payload, True)
            if responseChecksum != localChecksum:
                error = InvalidNonce(self.id + ' invalid checksum')
                client.reject(error, messageHash)
        timestamp = self.safe_integer(message, 'ts')
        orderbook['timestamp'] = timestamp
        orderbook['datetime'] = self.iso8601(timestamp)
        return orderbook

    def handle_order_book(self, client: Client, message):
        #
        # snapshot
        #
        #     {
        #         "arg": {channel: 'books-l2-tbt', instId: "BTC-USDT"},
        #         "action": "snapshot",
        #         "data": [
        #             {
        #                 "asks": [
        #                     ['31685', '0.78069158', "0", "17"],
        #                     ['31685.1', '0.0001', "0", "1"],
        #                     ['31685.6', '0.04543165', "0", "1"],
        #                 ],
        #                 "bids": [
        #                     ['31684.9', '0.01', "0", "1"],
        #                     ['31682.9', '0.0001', "0", "1"],
        #                     ['31680.7', '0.01', "0", "1"],
        #                 ],
        #                 "ts": "1626532416403",
        #                 "checksum": -1023440116
        #             }
        #         ]
        #     }
        #
        # update
        #
        #     {
        #         "arg": {channel: 'books-l2-tbt', instId: "BTC-USDT"},
        #         "action": "update",
        #         "data": [
        #             {
        #                 "asks": [
        #                     ['31657.7', '0', "0", "0"],
        #                     ['31659.7', '0.01', "0", "1"],
        #                     ['31987.3', '0.01', "0", "1"]
        #                 ],
        #                 "bids": [
        #                     ['31642.9', '0.50296385', "0", "4"],
        #                     ['31639.9', '0', "0", "0"],
        #                     ['31638.7', '0.01', "0", "1"],
        #                 ],
        #                 "ts": "1626535709008",
        #                 "checksum": 830931827
        #             }
        #         ]
        #     }
        #
        # books5
        #
        #     {
        #         "arg": {channel: "books5", instId: "BTC-USDT"},
        #         "data": [
        #             {
        #                 "asks": [
        #                     ['31738.3', '0.05973179', "0", "3"],
        #                     ['31738.5', '0.11035404', "0", "2"],
        #                     ['31739.6', '0.01', "0", "1"],
        #                 ],
        #                 "bids": [
        #                     ['31738.2', '0.67557666', "0", "9"],
        #                     ['31738', '0.02466947', "0", "2"],
        #                     ['31736.3', '0.01705046', "0", "2"],
        #                 ],
        #                 "instId": "BTC-USDT",
        #                 "ts": "1626537446491"
        #             }
        #         ]
        #     }
        #
        # bbo-tbt
        #
        #     {
        #         "arg":{
        #             "channel":"bbo-tbt",
        #             "instId":"BTC-USDT"
        #         },
        #         "data":[
        #             {
        #                 "asks":[["36232.2","1.8826134","0","17"]],
        #                 "bids":[["36232.1","0.00572212","0","2"]],
        #                 "ts":"1651826598363"
        #             }
        #         ]
        #     }
        #
        arg = self.safe_value(message, 'arg', {})
        channel = self.safe_string(arg, 'channel')
        action = self.safe_string(message, 'action')
        data = self.safe_value(message, 'data', [])
        marketId = self.safe_string(arg, 'instId')
        market = self.safe_market(marketId)
        symbol = market['symbol']
        depths = {
            'bbo-tbt': 1,
            'books': 400,
            'books5': 5,
            'books-l2-tbt': 400,
            'books50-l2-tbt': 50,
        }
        limit = self.safe_integer(depths, channel)
        messageHash = channel + ':' + symbol
        if action == 'snapshot':
            for i in range(0, len(data)):
                update = data[i]
                orderbook = self.order_book({}, limit)
                self.orderbooks[symbol] = orderbook
                orderbook['symbol'] = symbol
                self.handle_order_book_message(client, update, orderbook, messageHash)
                client.resolve(orderbook, messageHash)
        elif action == 'update':
            if symbol in self.orderbooks:
                orderbook = self.orderbooks[symbol]
                for i in range(0, len(data)):
                    update = data[i]
                    self.handle_order_book_message(client, update, orderbook, messageHash)
                    client.resolve(orderbook, messageHash)
        elif (channel == 'books5') or (channel == 'bbo-tbt'):
            orderbook = self.safe_value(self.orderbooks, symbol)
            if orderbook is None:
                orderbook = self.order_book({}, limit)
            self.orderbooks[symbol] = orderbook
            for i in range(0, len(data)):
                update = data[i]
                timestamp = self.safe_integer(update, 'ts')
                snapshot = self.parse_order_book(update, symbol, timestamp, 'bids', 'asks', 0, 1)
                orderbook.reset(snapshot)
                client.resolve(orderbook, messageHash)
        return message

    async def authenticate(self, params={}):
        self.check_required_credentials()
        access = self.safe_string(params, 'access', 'private')
        params = self.omit(params, ['access'])
        url = self.get_url('users', access)
        messageHash = 'authenticated'
        client = self.client(url)
        future = client.future(messageHash)
        authenticated = self.safe_value(client.subscriptions, messageHash)
        if authenticated is None:
            timestamp = str(self.seconds())
            method = 'GET'
            path = '/users/self/verify'
            auth = timestamp + method + path
            signature = self.hmac(self.encode(auth), self.encode(self.secret), hashlib.sha256, 'base64')
            operation = 'login'
            request = {
                'op': operation,
                'args': [
                    {
                        'apiKey': self.apiKey,
                        'passphrase': self.password,
                        'timestamp': timestamp,
                        'sign': signature,
                    },
                ],
            }
            message = self.extend(request, params)
            self.watch(url, messageHash, message, messageHash)
        return await future

    async def watch_balance(self, params={}) -> Balances:
        """
        watch balance and get the amount of funds available for trading or funds locked in orders
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict: a `balance structure <https://docs.ccxt.com/#/?id=balance-structure>`
        """
        await self.load_markets()
        await self.authenticate()
        return await self.subscribe('private', 'account', 'account', None, params)

    def handle_balance(self, client: Client, message):
        #
        #     {
        #         "arg": {channel: "account"},
        #         "data": [
        #             {
        #                 "adjEq": '',
        #                 "details": [
        #                     {
        #                         "availBal": '',
        #                         "availEq": "8.21009913",
        #                         "cashBal": "8.21009913",
        #                         "ccy": "USDT",
        #                         "coinUsdPrice": "0.99994",
        #                         "crossLiab": '',
        #                         "disEq": "8.2096065240522",
        #                         "eq": "8.21009913",
        #                         "eqUsd": "8.2096065240522",
        #                         "frozenBal": "0",
        #                         "interest": '',
        #                         "isoEq": "0",
        #                         "isoLiab": '',
        #                         "liab": '',
        #                         "maxLoan": '',
        #                         "mgnRatio": '',
        #                         "notionalLever": "0",
        #                         "ordFrozen": "0",
        #                         "twap": "0",
        #                         "uTime": "1621927314996",
        #                         "upl": "0"
        #                     },
        #                 ],
        #                 "imr": '',
        #                 "isoEq": "0",
        #                 "mgnRatio": '',
        #                 "mmr": '',
        #                 "notionalUsd": '',
        #                 "ordFroz": '',
        #                 "totalEq": "22.1930992296832",
        #                 "uTime": "1626692120916"
        #             }
        #         ]
        #     }
        #
        arg = self.safe_value(message, 'arg', {})
        channel = self.safe_string(arg, 'channel')
        type = 'spot'
        balance = self.parseTradingBalance(message)
        oldBalance = self.safe_value(self.balance, type, {})
        newBalance = self.deep_extend(oldBalance, balance)
        self.balance[type] = self.safe_balance(newBalance)
        client.resolve(self.balance[type], channel)

    def order_to_trade(self, order, market=None):
        info = self.safe_value(order, 'info', {})
        timestamp = self.safe_integer(info, 'fillTime')
        feeMarketId = self.safe_string(info, 'fillFeeCcy')
        isTaker = self.safe_string(info, 'execType', '') == 'T'
        return self.safe_trade({
            'info': info,
            'timestamp': timestamp,
            'datetime': self.iso8601(timestamp),
            'symbol': self.safe_string(order, 'symbol'),
            'id': self.safe_string(info, 'tradeId'),
            'order': self.safe_string(order, 'id'),
            'type': self.safe_string(order, 'type'),
            'takerOrMaker': 'taker' if (isTaker) else 'maker',
            'side': self.safe_string(order, 'side'),
            'price': self.safe_number(info, 'fillPx'),
            'amount': self.safe_number(info, 'fillSz'),
            'cost': self.safe_number(order, 'cost'),
            'fee': {
                'cost': self.safe_number(info, 'fillFee'),
                'currency': self.safe_currency_code(feeMarketId),
            },
        }, market)

    async def watch_my_trades(self, symbol: Str = None, since: Int = None, limit: Int = None, params={}) -> List[Trade]:
        """
        watches information on multiple trades made by the user
        :see: https://www.okx.com/docs-v5/en/#order-book-trading-trade-ws-order-channel
        :param str [symbol]: unified market symbol of the market trades were made in
        :param int [since]: the earliest time in ms to fetch trades for
        :param int [limit]: the maximum number of trade structures to retrieve
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :param bool [params.stop]: True if fetching trigger or conditional trades
        :param str [params.type]: 'spot', 'swap', 'future', 'option', 'ANY', 'SPOT', 'MARGIN', 'SWAP', 'FUTURES' or 'OPTION'
        :param str [params.marginMode]: 'cross' or 'isolated', for automatically setting the type to spot margin
        :returns dict[]: a list of [trade structures]{@link https://docs.ccxt.com/#/?id=trade-structure
        """
        # By default, receive order updates from any instrument type
        type = None
        type, params = self.handle_option_and_params(params, 'watchMyTrades', 'type', 'ANY')
        isStop = self.safe_bool(params, 'stop', False)
        params = self.omit(params, ['stop'])
        await self.load_markets()
        await self.authenticate({'access': 'business' if isStop else 'private'})
        channel = 'orders-algo' if isStop else 'orders'
        messageHash = channel + '::myTrades'
        market = None
        if symbol is not None:
            market = self.market(symbol)
            symbol = market['symbol']
            type = market['type']
            messageHash = messageHash + '::' + symbol
        if type == 'future':
            type = 'futures'
        uppercaseType = type.upper()
        marginMode = None
        marginMode, params = self.handle_margin_mode_and_params('watchMyTrades', params)
        if uppercaseType == 'SPOT':
            if marginMode is not None:
                uppercaseType = 'MARGIN'
        request = {
            'instType': uppercaseType,
        }
        orders = await self.subscribe('private', messageHash, channel, None, self.extend(request, params))
        if self.newUpdates:
            limit = orders.getLimit(symbol, limit)
        return self.filter_by_symbol_since_limit(orders, symbol, since, limit, True)

    async def watch_positions(self, symbols: Strings = None, since: Int = None, limit: Int = None, params={}) -> List[Position]:
        """
        :see: https://www.okx.com/docs-v5/en/#trading-account-websocket-positions-channel
        watch all open positions
        :param str[]|None symbols: list of unified market symbols
        :param dict params: extra parameters specific to the exchange API endpoint
        :returns dict[]: a list of `position structure <https://docs.ccxt.com/en/latest/manual.html#position-structure>`
        """
        await self.load_markets()
        await self.authenticate(params)
        symbols = self.market_symbols(symbols)
        request = {
            'instType': 'ANY',
        }
        channel = 'positions'
        newPositions = None
        if symbols is None:
            arg = {
                'channel': 'positions',
                'instType': 'ANY',
            }
            args = [arg]
            nonSymbolRequest = {
                'op': 'subscribe',
                'args': args,
            }
            url = self.get_url(channel, 'private')
            newPositions = await self.watch(url, channel, nonSymbolRequest, channel)
        else:
            newPositions = await self.subscribe_multiple('private', channel, symbols, self.extend(request, params))
        if self.newUpdates:
            return newPositions
        return self.filter_by_symbols_since_limit(self.positions, symbols, since, limit, True)

    def handle_positions(self, client, message):
        #
        #    {
        #        arg: {
        #            channel: 'positions',
        #            instType: 'ANY',
        #            instId: 'XRP-USDT-SWAP',
        #            uid: '464737184507959869'
        #        },
        #        data: [{
        #            adl: '1',
        #            availPos: '',
        #            avgPx: '0.52668',
        #            baseBal: '',
        #            baseBorrowed: '',
        #            baseInterest: '',
        #            bizRefId: '',
        #            bizRefType: '',
        #            cTime: '1693151444408',
        #            ccy: 'USDT',
        #            closeOrderAlgo: [],
        #            deltaBS: '',
        #            deltaPA: '',
        #            gammaBS: '',
        #            gammaPA: '',
        #            idxPx: '0.52683',
        #            imr: '17.564000000000004',
        #            instId: 'XRP-USDT-SWAP',
        #            instType: 'SWAP',
        #            interest: '',
        #            last: '0.52691',
        #            lever: '3',
        #            liab: '',
        #            liabCcy: '',
        #            liqPx: '0.3287514731020614',
        #            margin: '',
        #            markPx: '0.52692',
        #            mgnMode: 'cross',
        #            mgnRatio: '69.00363001456147',
        #            mmr: '0.26346',
        #            notionalUsd: '52.68620388000001',
        #            optVal: '',
        #            pTime: '1693151906023',
        #            pendingCloseOrdLiabVal: '',
        #            pos: '1',
        #            posCcy: '',
        #            posId: '616057041198907393',
        #            posSide: 'net',
        #            quoteBal: '',
        #            quoteBorrowed: '',
        #            quoteInterest: '',
        #            spotInUseAmt: '',
        #            spotInUseCcy: '',
        #            thetaBS: '',
        #            thetaPA: '',
        #            tradeId: '138745402',
        #            uTime: '1693151444408',
        #            upl: '0.0240000000000018',
        #            uplLastPx: '0.0229999999999952',
        #            uplRatio: '0.0013670539986328',
        #            uplRatioLastPx: '0.001310093415356',
        #            usdPx: '',
        #            vegaBS: '',
        #            vegaPA: ''
        #        }]
        #    }
        #
        arg = self.safe_value(message, 'arg', {})
        channel = self.safe_string(arg, 'channel', '')
        data = self.safe_value(message, 'data', [])
        if self.positions is None:
            self.positions = ArrayCacheBySymbolBySide()
        cache = self.positions
        newPositions = []
        for i in range(0, len(data)):
            rawPosition = data[i]
            position = self.parse_position(rawPosition)
            newPositions.append(position)
            cache.append(position)
        messageHashes = self.find_message_hashes(client, channel + '::')
        for i in range(0, len(messageHashes)):
            messageHash = messageHashes[i]
            parts = messageHash.split('::')
            symbolsString = parts[1]
            symbols = symbolsString.split(',')
            positions = self.filter_by_array(newPositions, 'symbol', symbols, False)
            if not self.is_empty(positions):
                client.resolve(positions, messageHash)
        client.resolve(newPositions, channel)

    async def watch_orders(self, symbol: Str = None, since: Int = None, limit: Int = None, params={}) -> List[Order]:
        """
        watches information on multiple orders made by the user
        :see: https://www.okx.com/docs-v5/en/#order-book-trading-trade-ws-order-channel
        :param str [symbol]: unified market symbol of the market the orders were made in
        :param int [since]: the earliest time in ms to fetch orders for
        :param int [limit]: the maximum number of order structures to retrieve
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :param bool [params.stop]: True if fetching trigger or conditional orders
        :param str [params.type]: 'spot', 'swap', 'future', 'option', 'ANY', 'SPOT', 'MARGIN', 'SWAP', 'FUTURES' or 'OPTION'
        :param str [params.marginMode]: 'cross' or 'isolated', for automatically setting the type to spot margin
        :returns dict[]: a list of `order structures <https://docs.ccxt.com/#/?id=order-structure>`
        """
        type = None
        # By default, receive order updates from any instrument type
        type, params = self.handle_option_and_params(params, 'watchOrders', 'type', 'ANY')
        isStop = self.safe_value_2(params, 'stop', 'trigger', False)
        params = self.omit(params, ['stop', 'trigger'])
        await self.load_markets()
        await self.authenticate({'access': 'business' if isStop else 'private'})
        market = None
        if symbol is not None:
            market = self.market(symbol)
            symbol = market['symbol']
            type = market['type']
        if type == 'future':
            type = 'futures'
        uppercaseType = type.upper()
        marginMode = None
        marginMode, params = self.handle_margin_mode_and_params('watchOrders', params)
        if uppercaseType == 'SPOT':
            if marginMode is not None:
                uppercaseType = 'MARGIN'
        request = {
            'instType': uppercaseType,
        }
        channel = 'orders-algo' if isStop else 'orders'
        orders = await self.subscribe('private', channel, channel, symbol, self.extend(request, params))
        if self.newUpdates:
            limit = orders.getLimit(symbol, limit)
        return self.filter_by_symbol_since_limit(orders, symbol, since, limit, True)

    def handle_orders(self, client: Client, message, subscription=None):
        #
        #     {
        #         "arg":{
        #             "channel":"orders",
        #             "instType":"SPOT"
        #         },
        #         "data":[
        #             {
        #                 "accFillSz":"0",
        #                 "amendResult":"",
        #                 "avgPx":"",
        #                 "cTime":"1634548275191",
        #                 "category":"normal",
        #                 "ccy":"",
        #                 "clOrdId":"e847386590ce4dBC330547db94a08ba0",
        #                 "code":"0",
        #                 "execType":"",
        #                 "fee":"0",
        #                 "feeCcy":"USDT",
        #                 "fillFee":"0",
        #                 "fillFeeCcy":"",
        #                 "fillNotionalUsd":"",
        #                 "fillPx":"",
        #                 "fillSz":"0",
        #                 "fillTime":"",
        #                 "instId":"ETH-USDT",
        #                 "instType":"SPOT",
        #                 "lever":"",
        #                 "msg":"",
        #                 "notionalUsd":"451.4516256",
        #                 "ordId":"370257534141235201",
        #                 "ordType":"limit",
        #                 "pnl":"0",
        #                 "posSide":"",
        #                 "px":"60000",
        #                 "rebate":"0",
        #                 "rebateCcy":"ETH",
        #                 "reqId":"",
        #                 "side":"sell",
        #                 "slOrdPx":"",
        #                 "slTriggerPx":"",
        #                 "state":"live",
        #                 "sz":"0.007526",
        #                 "tag":"",
        #                 "tdMode":"cash",
        #                 "tgtCcy":"",
        #                 "tpOrdPx":"",
        #                 "tpTriggerPx":"",
        #                 "tradeId":"",
        #                 "uTime":"1634548275191"
        #             }
        #         ]
        #     }
        #
        self.handle_my_trades(client, message)
        arg = self.safe_value(message, 'arg', {})
        channel = self.safe_string(arg, 'channel')
        orders = self.safe_value(message, 'data', [])
        ordersLength = len(orders)
        if ordersLength > 0:
            limit = self.safe_integer(self.options, 'ordersLimit', 1000)
            if self.orders is None:
                self.orders = ArrayCacheBySymbolById(limit)
                self.triggerOrders = ArrayCacheBySymbolById(limit)
            stored = self.triggerOrders if (channel == 'orders-algo') else self.orders
            marketIds = []
            parsed = self.parse_orders(orders)
            for i in range(0, len(parsed)):
                order = parsed[i]
                stored.append(order)
                symbol = order['symbol']
                market = self.market(symbol)
                marketIds.append(market['id'])
            client.resolve(stored, channel)
            for i in range(0, len(marketIds)):
                messageHash = channel + ':' + marketIds[i]
                client.resolve(stored, messageHash)

    def handle_my_trades(self, client: Client, message):
        #
        #     {
        #         "arg":{
        #             "channel":"orders",
        #             "instType":"SPOT"
        #         },
        #         "data":[
        #             {
        #                 "accFillSz":"0",
        #                 "amendResult":"",
        #                 "avgPx":"",
        #                 "cTime":"1634548275191",
        #                 "category":"normal",
        #                 "ccy":"",
        #                 "clOrdId":"e847386590ce4dBC330547db94a08ba0",
        #                 "code":"0",
        #                 "execType":"",
        #                 "fee":"0",
        #                 "feeCcy":"USDT",
        #                 "fillFee":"0",
        #                 "fillFeeCcy":"",
        #                 "fillNotionalUsd":"",
        #                 "fillPx":"",
        #                 "fillSz":"0",
        #                 "fillTime":"",
        #                 "instId":"ETH-USDT",
        #                 "instType":"SPOT",
        #                 "lever":"",
        #                 "msg":"",
        #                 "notionalUsd":"451.4516256",
        #                 "ordId":"370257534141235201",
        #                 "ordType":"limit",
        #                 "pnl":"0",
        #                 "posSide":"",
        #                 "px":"60000",
        #                 "rebate":"0",
        #                 "rebateCcy":"ETH",
        #                 "reqId":"",
        #                 "side":"sell",
        #                 "slOrdPx":"",
        #                 "slTriggerPx":"",
        #                 "state":"live",
        #                 "sz":"0.007526",
        #                 "tag":"",
        #                 "tdMode":"cash",
        #                 "tgtCcy":"",
        #                 "tpOrdPx":"",
        #                 "tpTriggerPx":"",
        #                 "tradeId":"",
        #                 "uTime":"1634548275191"
        #             }
        #         ]
        #     }
        #
        arg = self.safe_value(message, 'arg', {})
        channel = self.safe_string(arg, 'channel')
        rawOrders = self.safe_value(message, 'data', [])
        filteredOrders = []
        # filter orders with no last trade id
        for i in range(0, len(rawOrders)):
            rawOrder = rawOrders[i]
            tradeId = self.safe_string(rawOrder, 'tradeId', '')
            if len(tradeId) > 0:
                order = self.parse_order(rawOrder)
                filteredOrders.append(order)
        tradesLength = len(filteredOrders)
        if tradesLength == 0:
            return
        if self.myTrades is None:
            limit = self.safe_integer(self.options, 'tradesLimit', 1000)
            self.myTrades = ArrayCacheBySymbolById(limit)
        myTrades = self.myTrades
        symbols = {}
        for i in range(0, len(filteredOrders)):
            rawTrade = filteredOrders[i]
            trade = self.order_to_trade(rawTrade)
            myTrades.append(trade)
            symbol = trade['symbol']
            symbols[symbol] = True
        messageHash = channel + '::myTrades'
        client.resolve(self.myTrades, messageHash)
        tradeSymbols = list(symbols.keys())
        for i in range(0, len(tradeSymbols)):
            symbolMessageHash = messageHash + '::' + tradeSymbols[i]
            client.resolve(self.orders, symbolMessageHash)

    async def create_order_ws(self, symbol: str, type: OrderType, side: OrderSide, amount: float, price: Num = None, params={}) -> Order:
        """
        :see: https://www.okx.com/docs-v5/en/#websocket-api-trade-place-order
        create a trade order
        :param str symbol: unified symbol of the market to create an order in
        :param str type: 'market' or 'limit'
        :param str side: 'buy' or 'sell'
        :param float amount: how much of currency you want to trade in units of base currency
        :param float|None [price]: the price at which the order is to be fullfilled, in units of the quote currency, ignored in market orders
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :param boolean params['test']: test order, default False
        :returns dict: an `order structure <https://docs.ccxt.com/#/?id=order-structure>`
        """
        await self.load_markets()
        await self.authenticate()
        url = self.get_url('private', 'private')
        messageHash = str(self.nonce())
        op = None
        op, params = self.handle_option_and_params(params, 'createOrderWs', 'op', 'batch-orders')
        args = self.create_order_request(symbol, type, side, amount, price, params)
        ordType = self.safe_string(args, 'ordType')
        if (ordType == 'trigger') or (ordType == 'conditional') or (type == 'oco') or (type == 'move_order_stop') or (type == 'iceberg') or (type == 'twap'):
            raise BadRequest(self.id + ' createOrderWs() does not support algo trading. self.options["createOrderWs"]["op"] must be either order or batch-order')
        if (op != 'order') and (op != 'batch-orders'):
            raise BadRequest(self.id + ' createOrderWs() does not support algo trading. self.options["createOrderWs"]["op"] must be either order or privatePostTradeOrder or privatePostTradeOrderAlgo')
        request = {
            'id': messageHash,
            'op': op,
            'args': [args],
        }
        return await self.watch(url, messageHash, request, messageHash)

    def handle_place_orders(self, client: Client, message):
        #
        #  batch-orders/order/cancel-order
        #    {
        #        "id": "1689281055",
        #        "op": "batch-orders",
        #        "code": "0",
        #        "msg": '',
        #        "data": [{
        #            "tag": "e847386590ce4dBC",
        #            "ordId": "599823446566084608",
        #            "clOrdId": "e847386590ce4dBCb939511604f394b0",
        #            "sCode": "0",
        #            "sMsg": "Order successfully placed."
        #        },
        #        ...
        #        ]
        #    }
        #
        messageHash = self.safe_string(message, 'id')
        args = self.safe_value(message, 'data', [])
        # filter out partial errors
        args = self.filter_by(args, 'sCode', '0')
        # if empty means request failed and handle error
        if self.is_empty(args):
            method = self.safe_string(message, 'op')
            stringMsg = self.json(message)
            self.handle_errors(None, None, client.url, method, None, stringMsg, stringMsg, None, None)
        orders = self.parse_orders(args, None, None, None)
        first = self.safe_dict(orders, 0, {})
        client.resolve(first, messageHash)

    async def edit_order_ws(self, id: str, symbol: str, type: OrderType, side: OrderSide, amount: float, price: Num = None, params={}) -> Order:
        """
        edit a trade order
        :see: https://www.okx.com/docs-v5/en/#order-book-trading-trade-ws-amend-order
        :see: https://www.okx.com/docs-v5/en/#order-book-trading-trade-ws-amend-multiple-orders
        :param str id: order id
        :param str symbol: unified symbol of the market to create an order in
        :param str type: 'market' or 'limit'
        :param str side: 'buy' or 'sell'
        :param float amount: how much of the currency you want to trade in units of the base currency
        :param float|None [price]: the price at which the order is to be fullfilled, in units of the quote currency, ignored in market orders
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict: an `order structure <https://docs.ccxt.com/#/?id=order-structure>`
        """
        await self.load_markets()
        await self.authenticate()
        url = self.get_url('private', 'private')
        messageHash = str(self.nonce())
        op = None
        op, params = self.handle_option_and_params(params, 'editOrderWs', 'op', 'amend-order')
        args = self.edit_order_request(id, symbol, type, side, amount, price, params)
        request = {
            'id': messageHash,
            'op': op,
            'args': [args],
        }
        return await self.watch(url, messageHash, self.extend(request, params), messageHash)

    async def cancel_order_ws(self, id: str, symbol: Str = None, params={}) -> Order:
        """
        :see: https://okx-docs.github.io/apidocs/websocket_api/en/#cancel-order-trade
        cancel multiple orders
        :param str id: order id
        :param str symbol: unified market symbol, default is None
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :param str [params.clOrdId]: client order id
        :returns dict: an list of `order structures <https://docs.ccxt.com/#/?id=order-structure>`
        """
        if symbol is None:
            raise BadRequest(self.id + ' cancelOrderWs() requires a symbol argument')
        await self.load_markets()
        await self.authenticate()
        url = self.get_url('private', 'private')
        messageHash = str(self.nonce())
        clientOrderId = self.safe_string_2(params, 'clOrdId', 'clientOrderId')
        params = self.omit(params, ['clientOrderId', 'clOrdId'])
        arg = {
            'instId': self.market_id(symbol),
        }
        if clientOrderId is not None:
            arg['clOrdId'] = clientOrderId
        else:
            arg['ordId'] = id
        request = {
            'id': messageHash,
            'op': 'cancel-order',
            'args': [self.extend(arg, params)],
        }
        return await self.watch(url, messageHash, request, messageHash)

    async def cancel_orders_ws(self, ids: List[str], symbol: Str = None, params={}):
        """
        :see: https://www.okx.com/docs-v5/en/#order-book-trading-trade-ws-mass-cancel-order
        cancel multiple orders
        :param str[] ids: order ids
        :param str symbol: unified market symbol, default is None
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict: an list of `order structures <https://docs.ccxt.com/#/?id=order-structure>`
        """
        idsLength = len(ids)
        if idsLength > 20:
            raise BadRequest(self.id + ' cancelOrdersWs() accepts up to 20 ids at a time')
        if symbol is None:
            raise BadRequest(self.id + ' cancelOrdersWs() requires a symbol argument')
        await self.load_markets()
        await self.authenticate()
        url = self.get_url('private', 'private')
        messageHash = str(self.nonce())
        args = []
        for i in range(0, idsLength):
            arg = {
                'instId': self.market_id(symbol),
                'ordId': ids[i],
            }
            args.append(arg)
        request = {
            'id': messageHash,
            'op': 'batch-cancel-orders',
            'args': args,
        }
        return await self.watch(url, messageHash, self.deep_extend(request, params), messageHash)

    async def cancel_all_orders_ws(self, symbol: Str = None, params={}):
        """
        :see: https://docs.okx.com/websockets/#message-cancelAll
        cancel all open orders of a type. Only applicable to Option in Portfolio Margin mode, and MMP privilege is required.
        :param str symbol: unified market symbol, only orders in the market of self symbol are cancelled when symbol is not None
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict[]: a list of `order structures <https://docs.ccxt.com/#/?id=order-structure>`
        """
        if symbol is None:
            raise BadRequest(self.id + ' cancelAllOrdersWs() requires a symbol argument')
        await self.load_markets()
        await self.authenticate()
        market = self.market(symbol)
        if market['type'] != 'option':
            raise BadRequest(self.id + 'cancelAllOrdersWs is only applicable to Option in Portfolio Margin mode, and MMP privilege is required.')
        url = self.get_url('private', 'private')
        messageHash = str(self.nonce())
        request = {
            'id': messageHash,
            'op': 'mass-cancel',
            'args': [self.extend({
                'instType': 'OPTION',
                'instFamily': market['id'],
            }, params)],
        }
        return await self.watch(url, messageHash, request, messageHash)

    def handle_cancel_all_orders(self, client: Client, message):
        #
        #    {
        #        "id": "1512",
        #        "op": "mass-cancel",
        #        "data": [
        #            {
        #                "result": True
        #            }
        #        ],
        #        "code": "0",
        #        "msg": ""
        #    }
        #
        messageHash = self.safe_string(message, 'id')
        data = self.safe_value(message, 'data', [])
        client.resolve(data, messageHash)

    def handle_subscription_status(self, client: Client, message):
        #
        #     {event: 'subscribe', arg: {channel: "tickers", instId: "BTC-USDT"}}
        #
        # channel = self.safe_string(message, "channel")
        # client.subscriptions[channel] = message
        return message

    def handle_authenticate(self, client: Client, message):
        #
        #     {event: "login", success: True}
        #
        future = self.safe_value(client.futures, 'authenticated')
        future.resolve(True)

    def ping(self, client):
        # okex does not support built-in ws protocol-level ping-pong
        # instead it requires custom text-based ping-pong
        return 'ping'

    def handle_pong(self, client: Client, message):
        client.lastPong = self.milliseconds()
        return message

    def handle_error_message(self, client: Client, message):
        #
        #     {event: 'error', msg: "Illegal request: {"op":"subscribe","args":["spot/ticker:BTC-USDT"]}", code: "60012"}
        #     {event: 'error", msg: "channel:ticker,instId:BTC-USDT doesn"t exist", code: "60018"}
        #
        errorCode = self.safe_integer(message, 'code')
        try:
            if errorCode:
                feedback = self.id + ' ' + self.json(message)
                self.throw_exactly_matched_exception(self.exceptions['exact'], errorCode, feedback)
                messageString = self.safe_value(message, 'msg')
                if messageString is not None:
                    self.throw_broadly_matched_exception(self.exceptions['broad'], messageString, feedback)
        except Exception as e:
            if isinstance(e, AuthenticationError):
                messageHash = 'authenticated'
                client.reject(e, messageHash)
                if messageHash in client.subscriptions:
                    del client.subscriptions[messageHash]
                return False
            else:
                client.reject(e)
        return message

    def handle_message(self, client: Client, message):
        if not self.handle_error_message(client, message):
            return
        #
        #     {event: 'subscribe', arg: {channel: "tickers", instId: "BTC-USDT"}}
        #     {event: 'login", msg: '", code: "0"}
        #
        #     {
        #         "arg": {channel: "tickers", instId: "BTC-USDT"},
        #         "data": [
        #             {
        #                 "instType": "SPOT",
        #                 "instId": "BTC-USDT",
        #                 "last": "31500.1",
        #                 "lastSz": "0.00001754",
        #                 "askPx": "31500.1",
        #                 "askSz": "0.00998144",
        #                 "bidPx": "31500",
        #                 "bidSz": "3.05652439",
        #                 "open24h": "31697",
        #                 "high24h": "32248",
        #                 "low24h": "31165.6",
        #                 "sodUtc0": "31385.5",
        #                 "sodUtc8": "32134.9",
        #                 "volCcy24h": "503403597.38138519",
        #                 "vol24h": "15937.10781721",
        #                 "ts": "1626526618762"
        #             }
        #         ]
        #     }
        #
        #     {event: 'error', msg: "Illegal request: {"op":"subscribe","args":["spot/ticker:BTC-USDT"]}", code: "60012"}
        #     {event: 'error", msg: "channel:ticker,instId:BTC-USDT doesn"t exist", code: "60018"}
        #     {event: 'error', msg: "Invalid OK_ACCESS_KEY", code: "60005"}
        #     {
        #         "event": "error",
        #         "msg": "Illegal request: {"op":"login","args":["de89b035-b233-44b2-9a13-0ccdd00bda0e","7KUcc8YzQhnxBE3K","1626691289","H57N99mBt5NvW8U19FITrPdOxycAERFMaapQWRqLaSE="]}",
        #         "code": "60012"
        #     }
        #
        #
        #
        if message == 'pong':
            self.handle_pong(client, message)
            return
        # table = self.safe_string(message, 'table')
        # if table is None:
        event = self.safe_string_2(message, 'event', 'op')
        if event is not None:
            methods = {
                # 'info': self.handleSystemStatus,
                # 'book': 'handleOrderBook',
                'login': self.handle_authenticate,
                'subscribe': self.handle_subscription_status,
                'order': self.handle_place_orders,
                'batch-orders': self.handle_place_orders,
                'amend-order': self.handle_place_orders,
                'batch-amend-orders': self.handle_place_orders,
                'cancel-order': self.handle_place_orders,
                'mass-cancel': self.handle_cancel_all_orders,
            }
            method = self.safe_value(methods, event)
            if method is not None:
                method(client, message)
        else:
            arg = self.safe_value(message, 'arg', {})
            channel = self.safe_string(arg, 'channel')
            methods = {
                'bbo-tbt': self.handle_order_book,  # newly added channel that sends tick-by-tick Level 1 data, all API users can subscribe, public depth channel, verification not required
                'books': self.handle_order_book,  # all API users can subscribe, public depth channel, verification not required
                'books5': self.handle_order_book,  # all API users can subscribe, public depth channel, verification not required, data feeds will be delivered every 100ms(vs. every 200ms now)
                'books50-l2-tbt': self.handle_order_book,  # only users who're VIP4 and above can subscribe, identity verification required before subscription
                'books-l2-tbt': self.handle_order_book,  # only users who're VIP5 and above can subscribe, identity verification required before subscription
                'tickers': self.handle_ticker,
                'positions': self.handle_positions,
                'index-tickers': self.handle_ticker,
                'sprd-tickers': self.handle_ticker,
                'block-tickers': self.handle_ticker,
                'trades': self.handle_trades,
                'account': self.handle_balance,
                # 'margin_account': self.handle_balance,
                'orders': self.handle_orders,
                'orders-algo': self.handle_orders,
            }
            method = self.safe_value(methods, channel)
            if method is None:
                if channel.find('candle') == 0:
                    self.handle_ohlcv(client, message)
            else:
                method(client, message)
