Examples¶
このページでは pybotters
のサンプルコードを紹介します。
注釈
ここのサンプルコードでは rich ライブラリの print を利用しています。
rich
はターミナル上に美しい形式のテキストを表示してくれるライブラリです。
API で取得した JSON を rich.print
すると、人間に分かりやすく綺麗に表示してくれるのでとても役に立ちます。
別途インストールすることをおすすめします。
pip install rich
Trading bot¶
pybotters
で WebSocket API と DataStore を利用した bitFlyer トレード bot の基礎的な構成例です。
説明はコードの下にあります。
import asyncio
import os
from dataclasses import dataclass
from typing import Literal, Optional
import pybotters
try:
from rich import print
except ImportError:
pass
@dataclass
class OrderCondition:
"""Order condition"""
is_execute: bool
side: Optional[Literal["BUY", "SELL"]]
price: Optional[float]
size: Optional[float]
def your_awesome_algorithm(childorders, positions, board, ticker, executions):
"""Calculate the ordering conditions with your algorithm
You should do the following:
1. Calculate limit price, ordrer side and size from board, ticker and executions
2. Judge whether or not to execute from childorders and positions
"""
...
# Mock algorithm: Buy at best bid - 1000 if there are no existing orders
price = round(ticker["best_bid"] - 1000.0)
is_execute = True if not len(childorders) else False
return OrderCondition(is_execute=is_execute, side="BUY", price=price, size=0.01)
async def main():
"""Main function"""
product_code = os.getenv("PRODUCT_CODE", "FX_BTC_JPY")
apis = {
"bitflyer": [os.getenv("BITFLYER_API_KEY"), os.getenv("BITFLYER_API_SECRET")]
}
base_url = "https://api.bitflyer.com"
async with pybotters.Client(apis=apis, base_url=base_url) as client:
store = pybotters.bitFlyerDataStore()
print("Initializing DataStore from HTTP ...")
await store.initialize(
client.get("/v1/me/getchildorders", params={"product_code": product_code}),
client.get("/v1/me/getpositions", params={"product_code": product_code}),
)
print("Connecting to WebSocket ...")
await client.ws_connect(
"wss://ws.lightstream.bitflyer.com/json-rpc",
send_json=[
{
"method": "subscribe",
"params": {"channel": "child_order_events"},
"id": 1,
},
{
"method": "subscribe",
"params": {"channel": f"lightning_board_snapshot_{product_code}"},
"id": 2,
},
{
"method": "subscribe",
"params": {"channel": f"lightning_board_{product_code}"},
"id": 3,
},
{
"method": "subscribe",
"params": {"channel": f"lightning_ticker_{product_code}"},
"id": 4,
},
{
"method": "subscribe",
"params": {"channel": f"lightning_executions_{product_code}"},
"id": 5,
},
],
hdlr_json=store.onmessage,
)
print("Waiting snapshot data from WebSocket ...")
while not (len(store.board) and len(store.ticker)):
await store.wait()
print("Start main loop")
while True:
# Retrieve data from DataStore
childorders = store.childorders.find({"product_code": product_code})
positions = store.positions.find({"product_code": product_code})
board = store.board.sorted({"product_code": product_code})
ticker = store.ticker.get({"product_code": product_code})
executions = store.executions.find({"product_code": product_code})
# Calculate the ordering conditions with your algorithm
condition = your_awesome_algorithm(
childorders, positions, board, ticker, executions
)
print(condition)
"""Example:
OrderCondition(
is_execute=True,
side="BUY",
price=1234567.0,
size=0.01,
)
"""
if condition.is_execute:
# Execute order
with store.childorderevents.watch() as stream:
print("Sending order ...")
r = await client.fetch(
"POST",
"/v1/me/sendchildorder",
data={
"product_code": product_code,
"child_order_type": "LIMIT",
"side": condition.side,
"price": condition.price,
"size": condition.size,
},
)
print(f"<Reponse [{r.response.status} {r.response.reason}]>")
print(r.data)
if r.response.ok and "child_order_acceptance_id" in r.data:
print("Waiting for DataStore updates ...")
child_order_acceptance_id = r.data["child_order_acceptance_id"]
async for change in stream:
if (
change.data["child_order_acceptance_id"]
== child_order_acceptance_id
):
break
print("Waiting for next loop ...")
await asyncio.sleep(1.0)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
Workflow¶
このサンプルコードは以下のような処理フローとなっています。
環境変数から API 認証情報などを取得する
DataStore を生成して HTTP API で初期化する
WebSocket に接続する
WebSocket からの初期データを待機する
メインループを開始する
メインループは、以下のような処理フローとなっています。
DataStore から各種データを取得する
- アルゴリズム関数にデータを入力して、注文条件を計算する
注釈
アルゴリズムはトレード bot の損益を左右する最も重要な部分です。 あなたのトレード戦略を実装しましょう!
サンプルコードでは「既存の注文がない場合、ベスト Bid - 1000 に買い指値を置く」というアルゴリズムになっています。
- 注文の執行条件が真の場合、注文を執行する
注文イベントの変更ストリームを開く
HTTP API で注文を送信する
- HTTP API レスポンスが正常なら、DataStore のデータを同期するため待機します
注文イベントの変更ストリームを開始して、データを取得するまで待機する
取得したデータの注文受付 ID が HTTP API と同じ ID ならストリームを抜ける
注釈
DataStore の注文やポジションを利用してアルゴリズムで注文条件を管理している場合、この待機処理は重要です 。
HTTP API で注文した結果は WebSocket の注文イベントとして流れてきます。 その注文イベントによって DataStore のデータが更新されます。 しかしその WebSocket の注文イベントはいつ流れてくるか分かりません。 このサンプルコードでは 1 秒後に次のループに移ります。 bitFlyer の取引マッチングエンジンが混雑している場合、 1 秒より後に流れてくるかもしれません。 そうすると、DataStore が同期されていないので次のループでは 既存の注文がない ことになります。 サンプルコードでのアルゴリズムは、既存の注文がない場合指値注文をするので 重複注文が発生することになります 。
WebSocket を利用しつつ注文やポジションを厳密に管理するアプローチでは、このようにして待機処理を行います。 もちろん、このように厳密に管理せず小口で注文するようなアプローチも考えられるので、あなたのトレード戦略にあったアプローチを検討してください。
1 秒待機して次のループに移る
Order Book watching¶
対応取引所の 取引所固有の DataStore を利用した板情報を監視するサンプルコードです。
このサンプルコードでは以下について理解できるでしょう。
それぞれの取引所に WebSocket 接続する
WebSocket でチャンネルを購読する
Order Book 系ストアの扱い方
Order Book 系ストアのデータ構造
bitFlyer¶
import asyncio
import pybotters
try:
from rich import print
except ImportError:
pass
async def main():
async with pybotters.Client() as client:
store = pybotters.bitFlyerDataStore()
await client.ws_connect(
"wss://ws.lightstream.bitflyer.com/json-rpc",
send_json=[
{
"method": "subscribe",
"params": {"channel": "lightning_board_snapshot_FX_BTC_JPY"},
"id": 1,
},
{
"method": "subscribe",
"params": {"channel": "lightning_board_FX_BTC_JPY"},
"id": 2,
},
],
hdlr_json=store.onmessage,
)
while True:
orderbook = store.board.sorted(limit=2)
print(orderbook)
await store.board.wait()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
GMO Coin¶
import asyncio
import pybotters
try:
from rich import print
except ImportError:
pass
async def main():
async with pybotters.Client() as client:
store = pybotters.GMOCoinDataStore()
await client.ws_connect(
"wss://api.coin.z.com/ws/public/v1",
send_json={
"command": "subscribe",
"channel": "orderbooks",
"symbol": "BTC_JPY",
},
hdlr_json=store.onmessage,
)
while True:
orderbook = store.orderbooks.sorted(limit=2)
print(orderbook)
await store.orderbooks.wait()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
bitbank¶
import asyncio
import pybotters
try:
from rich import print
except ImportError:
pass
async def main():
async with pybotters.Client() as client:
store = pybotters.bitbankDataStore()
await client.ws_connect(
"wss://stream.bitbank.cc/socket.io/?EIO=3&transport=websocket",
send_str=[
'42["join-room","depth_whole_btc_jpy"]',
'42["join-room","depth_diff_btc_jpy"]',
],
hdlr_str=store.onmessage,
)
while True:
orderbook = store.depth.sorted(limit=2)
print(orderbook)
await store.depth.wait()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
Coincheck¶
import asyncio
import pybotters
try:
from rich import print
except ImportError:
pass
async def main():
async with pybotters.Client(base_url="https://coincheck.com") as client:
store = pybotters.CoincheckDataStore()
await client.ws_connect(
"wss://ws-api.coincheck.com/",
send_json={"type": "subscribe", "channel": "btc_jpy-orderbook"},
hdlr_json=store.onmessage,
)
await store.initialize(
client.get("/api/order_books", params={"pair": "btc_jpy"})
)
while True:
orderbook = store.orderbook.sorted(limit=2)
print(orderbook)
await store.orderbook.wait()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
Bybit¶
import asyncio
import pybotters
try:
from rich import print
except ImportError:
pass
async def main():
async with pybotters.Client() as client:
store = pybotters.BybitDataStore()
await client.ws_connect(
"wss://stream.bybit.com/v5/public/linear",
send_json={"op": "subscribe", "args": ["orderbook.50.BTCUSDT"]},
hdlr_json=store.onmessage,
)
while True:
orderbook = store.orderbook.sorted(limit=2)
print(orderbook)
await store.orderbook.wait()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
Binance¶
import asyncio
import pybotters
try:
from rich import print
except ImportError:
pass
async def main():
async with pybotters.Client(base_url="https://fapi.binance.com") as client:
store = pybotters.BinanceUSDSMDataStore()
await client.ws_connect(
"wss://fstream.binance.com/stream",
send_json={
"method": "SUBSCRIBE",
"params": ["btcusdt@depth"],
"id": 1,
},
hdlr_json=store.onmessage,
)
await store.initialize(
client.get("/fapi/v1/depth", params={"symbol": "BTCUSDT", "limit": "1000"})
)
while True:
orderbook = store.orderbook.sorted(limit=2)
print(orderbook)
await store.orderbook.wait()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
OKX¶
import asyncio
import pybotters
try:
from rich import print
except ImportError:
pass
async def main():
async with pybotters.Client() as client:
store = pybotters.OKXDataStore()
await client.ws_connect(
"wss://ws.okx.com:8443/ws/v5/public",
send_json={
"op": "subscribe",
"args": [{"channel": "books", "instId": "BTC-USDT"}],
},
hdlr_json=store.onmessage,
)
while True:
orderbook = store.books.sorted(limit=2)
print(orderbook)
await store.books.wait()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
Phemex¶
import asyncio
import pybotters
try:
from rich import print
except ImportError:
pass
async def main():
async with pybotters.Client() as client:
store = pybotters.PhemexDataStore()
await client.ws_connect(
"wss://ws.phemex.com",
send_json={
"id": 1234,
"method": "orderbook_p.subscribe",
"params": ["BTCUSDT"],
},
hdlr_json=store.onmessage,
)
while True:
orderbook = store.orderbook.sorted(limit=2)
print(orderbook)
await store.orderbook.wait()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
Bitget¶
import asyncio
import pybotters
try:
from rich import print
except ImportError:
pass
async def main():
async with pybotters.Client() as client:
store = pybotters.BitgetDataStore()
await client.ws_connect(
"wss://ws.bitget.com/mix/v1/stream",
send_json={
"op": "subscribe",
"args": [{"instType": "mc", "channel": "books", "instId": "BTCUSDT"}],
},
hdlr_json=store.onmessage,
)
while True:
orderbook = store.orderbook.sorted(limit=2)
print(orderbook)
await store.orderbook.wait()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
KuCoin¶
import asyncio
import pybotters
try:
from rich import print
except ImportError:
pass
async def main():
async with pybotters.Client(base_url="https://api-futures.kucoin.com") as client:
store = pybotters.KuCoinDataStore()
await store.initialize(client.post("/api/v1/bullet-public"))
await client.ws_connect(
store.endpoint,
send_json={
"id": 1545910660739,
"type": "subscribe",
"topic": "/contractMarket/level2Depth50:XBTUSDTM",
"privateChannel": False,
"response": True,
},
hdlr_json=store.onmessage,
)
while True:
orderbook = store.orderbook50.sorted(limit=2)
print(orderbook)
await store.orderbook50.wait()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass