Examples

このページでは pybotters のサンプルコードを紹介します。

注釈

ここのサンプルコードでは rich ライブラリの print を利用しています。

rich はターミナル上に美しい形式のテキストを表示してくれるライブラリです。 API で取得した JSON を rich.print すると、人間に分かりやすく綺麗に表示してくれるのでとても役に立ちます。 別途インストールすることをおすすめします。

pip install rich

Trading bot

pybottersWebSocket APIDataStore を利用した bitFlyer トレード bot の基礎的な構成例です。 説明はコードの下にあります。

import asyncio
import os
from dataclasses import dataclass
from typing import Literal, Optional

import pybotters

try:
    from rich import print
except ImportError:
    pass


@dataclass
class OrderCondition:
    """Order condition"""

    is_execute: bool
    side: Optional[Literal["BUY", "SELL"]]
    price: Optional[float]
    size: Optional[float]


def your_awesome_algorithm(childorders, positions, board, ticker, executions):
    """Calculate the ordering conditions with your algorithm

    You should do the following:

    1. Calculate limit price, ordrer side and size from board, ticker and executions
    2. Judge whether or not to execute from childorders and positions
    """

    ...

    # Mock algorithm: Buy at best bid - 1000 if there are no existing orders
    price = round(ticker["best_bid"] - 1000.0)
    is_execute = True if not len(childorders) else False

    return OrderCondition(is_execute=is_execute, side="BUY", price=price, size=0.01)


async def main():
    """Main function"""

    product_code = os.getenv("PRODUCT_CODE", "FX_BTC_JPY")

    apis = {
        "bitflyer": [os.getenv("BITFLYER_API_KEY"), os.getenv("BITFLYER_API_SECRET")]
    }
    base_url = "https://api.bitflyer.com"
    async with pybotters.Client(apis=apis, base_url=base_url) as client:
        store = pybotters.bitFlyerDataStore()

        print("Initializing DataStore from HTTP ...")
        await store.initialize(
            client.get("/v1/me/getchildorders", params={"product_code": product_code}),
            client.get("/v1/me/getpositions", params={"product_code": product_code}),
        )

        print("Connecting to WebSocket ...")
        await client.ws_connect(
            "wss://ws.lightstream.bitflyer.com/json-rpc",
            send_json=[
                {
                    "method": "subscribe",
                    "params": {"channel": "child_order_events"},
                    "id": 1,
                },
                {
                    "method": "subscribe",
                    "params": {"channel": f"lightning_board_snapshot_{product_code}"},
                    "id": 2,
                },
                {
                    "method": "subscribe",
                    "params": {"channel": f"lightning_board_{product_code}"},
                    "id": 3,
                },
                {
                    "method": "subscribe",
                    "params": {"channel": f"lightning_ticker_{product_code}"},
                    "id": 4,
                },
                {
                    "method": "subscribe",
                    "params": {"channel": f"lightning_executions_{product_code}"},
                    "id": 5,
                },
            ],
            hdlr_json=store.onmessage,
        )

        print("Waiting snapshot data from WebSocket ...")
        while not (len(store.board) and len(store.ticker)):
            await store.wait()

        print("Start main loop")
        while True:
            # Retrieve data from DataStore
            childorders = store.childorders.find({"product_code": product_code})
            positions = store.positions.find({"product_code": product_code})
            board = store.board.sorted({"product_code": product_code})
            ticker = store.ticker.get({"product_code": product_code})
            executions = store.executions.find({"product_code": product_code})

            # Calculate the ordering conditions with your algorithm
            condition = your_awesome_algorithm(
                childorders, positions, board, ticker, executions
            )

            print(condition)
            """Example:
            OrderCondition(
                is_execute=True,
                side="BUY",
                price=1234567.0,
                size=0.01,
            )
            """

            if condition.is_execute:
                # Execute order
                with store.childorderevents.watch() as stream:
                    print("Sending order ...")
                    r = await client.fetch(
                        "POST",
                        "/v1/me/sendchildorder",
                        data={
                            "product_code": product_code,
                            "child_order_type": "LIMIT",
                            "side": condition.side,
                            "price": condition.price,
                            "size": condition.size,
                        },
                    )
                    print(f"<Reponse [{r.response.status} {r.response.reason}]>")
                    print(r.data)

                    if r.response.ok and "child_order_acceptance_id" in r.data:
                        print("Waiting for DataStore updates ...")
                        child_order_acceptance_id = r.data["child_order_acceptance_id"]
                        async for change in stream:
                            if (
                                change.data["child_order_acceptance_id"]
                                == child_order_acceptance_id
                            ):
                                break

            print("Waiting for next loop ...")
            await asyncio.sleep(1.0)


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass

Workflow

このサンプルコードは以下のような処理フローとなっています。

  1. 環境変数から API 認証情報などを取得する

  2. DataStore を生成して HTTP API で初期化する

  3. WebSocket に接続する

  4. WebSocket からの初期データを待機する

  5. メインループを開始する

メインループは、以下のような処理フローとなっています。

  1. DataStore から各種データを取得する

  2. アルゴリズム関数にデータを入力して、注文条件を計算する

    注釈

    アルゴリズムはトレード bot の損益を左右する最も重要な部分です。 あなたのトレード戦略を実装しましょう!

    サンプルコードでは「既存の注文がない場合、ベスト Bid - 1000 に買い指値を置く」というアルゴリズムになっています。

  3. 注文の執行条件が真の場合、注文を執行する
    1. 注文イベントの変更ストリームを開く

    2. HTTP API で注文を送信する

    3. HTTP API レスポンスが正常なら、DataStore のデータを同期するため待機します
      1. 注文イベントの変更ストリームを開始して、データを取得するまで待機する

      2. 取得したデータの注文受付 ID が HTTP API と同じ ID ならストリームを抜ける

      注釈

      DataStore の注文やポジションを利用してアルゴリズムで注文条件を管理している場合、この待機処理は重要です

      HTTP API で注文した結果は WebSocket の注文イベントとして流れてきます。 その注文イベントによって DataStore のデータが更新されます。 しかしその WebSocket の注文イベントはいつ流れてくるか分かりません。 このサンプルコードでは 1 秒後に次のループに移ります。 bitFlyer の取引マッチングエンジンが混雑している場合、 1 秒より後に流れてくるかもしれません。 そうすると、DataStore が同期されていないので次のループでは 既存の注文がない ことになります。 サンプルコードでのアルゴリズムは、既存の注文がない場合指値注文をするので 重複注文が発生することになります

      WebSocket を利用しつつ注文やポジションを厳密に管理するアプローチでは、このようにして待機処理を行います。 もちろん、このように厳密に管理せず小口で注文するようなアプローチも考えられるので、あなたのトレード戦略にあったアプローチを検討してください。

  4. 1 秒待機して次のループに移る

Order Book watching

対応取引所の 取引所固有の DataStore を利用した板情報を監視するサンプルコードです。

このサンプルコードでは以下について理解できるでしょう。

  1. それぞれの取引所に WebSocket 接続する

  2. WebSocket でチャンネルを購読する

  3. Order Book 系ストアの扱い方

  4. Order Book 系ストアのデータ構造

bitFlyer

import asyncio

import pybotters

try:
    from rich import print
except ImportError:
    pass


async def main():
    async with pybotters.Client() as client:
        store = pybotters.bitFlyerDataStore()

        await client.ws_connect(
            "wss://ws.lightstream.bitflyer.com/json-rpc",
            send_json=[
                {
                    "method": "subscribe",
                    "params": {"channel": "lightning_board_snapshot_FX_BTC_JPY"},
                    "id": 1,
                },
                {
                    "method": "subscribe",
                    "params": {"channel": "lightning_board_FX_BTC_JPY"},
                    "id": 2,
                },
            ],
            hdlr_json=store.onmessage,
        )

        while True:
            orderbook = store.board.sorted(limit=2)
            print(orderbook)

            await store.board.wait()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass

GMO Coin

import asyncio

import pybotters

try:
    from rich import print
except ImportError:
    pass


async def main():
    async with pybotters.Client() as client:
        store = pybotters.GMOCoinDataStore()

        await client.ws_connect(
            "wss://api.coin.z.com/ws/public/v1",
            send_json={
                "command": "subscribe",
                "channel": "orderbooks",
                "symbol": "BTC_JPY",
            },
            hdlr_json=store.onmessage,
        )

        while True:
            orderbook = store.orderbooks.sorted(limit=2)
            print(orderbook)

            await store.orderbooks.wait()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass

bitbank

import asyncio

import pybotters

try:
    from rich import print
except ImportError:
    pass


async def main():
    async with pybotters.Client() as client:
        store = pybotters.bitbankDataStore()

        await client.ws_connect(
            "wss://stream.bitbank.cc/socket.io/?EIO=3&transport=websocket",
            send_str=[
                '42["join-room","depth_whole_btc_jpy"]',
                '42["join-room","depth_diff_btc_jpy"]',
            ],
            hdlr_str=store.onmessage,
        )

        while True:
            orderbook = store.depth.sorted(limit=2)
            print(orderbook)

            await store.depth.wait()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass

Coincheck

import asyncio

import pybotters

try:
    from rich import print
except ImportError:
    pass


async def main():
    async with pybotters.Client(base_url="https://coincheck.com") as client:
        store = pybotters.CoincheckDataStore()

        await client.ws_connect(
            "wss://ws-api.coincheck.com/",
            send_json={"type": "subscribe", "channel": "btc_jpy-orderbook"},
            hdlr_json=store.onmessage,
        )

        await store.initialize(
            client.get("/api/order_books", params={"pair": "btc_jpy"})
        )

        while True:
            orderbook = store.orderbook.sorted(limit=2)
            print(orderbook)

            await store.orderbook.wait()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass

Bybit

import asyncio

import pybotters

try:
    from rich import print
except ImportError:
    pass


async def main():
    async with pybotters.Client() as client:
        store = pybotters.BybitDataStore()

        await client.ws_connect(
            "wss://stream.bybit.com/v5/public/linear",
            send_json={"op": "subscribe", "args": ["orderbook.50.BTCUSDT"]},
            hdlr_json=store.onmessage,
        )

        while True:
            orderbook = store.orderbook.sorted(limit=2)
            print(orderbook)

            await store.orderbook.wait()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass

Binance

import asyncio

import pybotters

try:
    from rich import print
except ImportError:
    pass


async def main():
    async with pybotters.Client(base_url="https://fapi.binance.com") as client:
        store = pybotters.BinanceUSDSMDataStore()

        await client.ws_connect(
            "wss://fstream.binance.com/stream",
            send_json={
                "method": "SUBSCRIBE",
                "params": ["btcusdt@depth"],
                "id": 1,
            },
            hdlr_json=store.onmessage,
        )

        await store.initialize(
            client.get("/fapi/v1/depth", params={"symbol": "BTCUSDT", "limit": "1000"})
        )

        while True:
            orderbook = store.orderbook.sorted(limit=2)
            print(orderbook)

            await store.orderbook.wait()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass

OKX

import asyncio

import pybotters

try:
    from rich import print
except ImportError:
    pass


async def main():
    async with pybotters.Client() as client:
        store = pybotters.OKXDataStore()

        await client.ws_connect(
            "wss://ws.okx.com:8443/ws/v5/public",
            send_json={
                "op": "subscribe",
                "args": [{"channel": "books", "instId": "BTC-USDT"}],
            },
            hdlr_json=store.onmessage,
        )

        while True:
            orderbook = store.books.sorted(limit=2)
            print(orderbook)

            await store.books.wait()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass

Phemex

import asyncio

import pybotters

try:
    from rich import print
except ImportError:
    pass


async def main():
    async with pybotters.Client() as client:
        store = pybotters.PhemexDataStore()

        await client.ws_connect(
            "wss://ws.phemex.com",
            send_json={
                "id": 1234,
                "method": "orderbook_p.subscribe",
                "params": ["BTCUSDT"],
            },
            hdlr_json=store.onmessage,
        )

        while True:
            orderbook = store.orderbook.sorted(limit=2)
            print(orderbook)

            await store.orderbook.wait()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass

Bitget

import asyncio

import pybotters

try:
    from rich import print
except ImportError:
    pass


async def main():
    async with pybotters.Client() as client:
        store = pybotters.BitgetDataStore()

        await client.ws_connect(
            "wss://ws.bitget.com/mix/v1/stream",
            send_json={
                "op": "subscribe",
                "args": [{"instType": "mc", "channel": "books", "instId": "BTCUSDT"}],
            },
            hdlr_json=store.onmessage,
        )

        while True:
            orderbook = store.orderbook.sorted(limit=2)
            print(orderbook)

            await store.orderbook.wait()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass

KuCoin

import asyncio

import pybotters

try:
    from rich import print
except ImportError:
    pass


async def main():
    async with pybotters.Client(base_url="https://api-futures.kucoin.com") as client:
        store = pybotters.KuCoinDataStore()

        await store.initialize(client.post("/api/v1/bullet-public"))

        await client.ws_connect(
            store.endpoint,
            send_json={
                "id": 1545910660739,
                "type": "subscribe",
                "topic": "/contractMarket/level2Depth50:XBTUSDTM",
                "privateChannel": False,
                "response": True,
            },
            hdlr_json=store.onmessage,
        )

        while True:
            orderbook = store.orderbook50.sorted(limit=2)
            print(orderbook)

            await store.orderbook50.wait()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass