Examples¶
このページでは pybotters のサンプルコードを紹介します。
注釈
ここのサンプルコードでは rich ライブラリの print を利用しています。
rich はターミナル上に美しい形式のテキストを表示してくれるライブラリです。
API で取得した JSON を rich.print すると、人間に分かりやすく綺麗に表示してくれるのでとても役に立ちます。
別途インストールすることをおすすめします。
pip install rich
Trading bot¶
pybotters で WebSocket API と DataStore を利用した bitFlyer トレード bot の基礎的な構成例です。
説明はコードの下にあります。
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "pybotters",
# "rich",
# ]
# ///
import asyncio
import os
from contextlib import suppress
from dataclasses import dataclass
from typing import Literal
import pybotters
with suppress(ImportError):
from rich import print
@dataclass
class OrderCondition:
"""Order condition"""
is_execute: bool
side: Literal["BUY", "SELL"] | None
price: float | None
size: float | None
def your_awesome_algorithm(childorders, positions, board, ticker, executions):
"""Calculate the ordering conditions with your algorithm
You should do the following:
1. Calculate limit price, ordrer side and size from board, ticker and executions
2. Judge whether or not to execute from childorders and positions
"""
...
# Mock algorithm: Buy at best bid - 1000 if there are no existing orders
price = round(ticker["best_bid"] - 1000.0)
is_execute = True if not len(childorders) else False
return OrderCondition(is_execute=is_execute, side="BUY", price=price, size=0.01)
async def main():
"""Main function"""
product_code = os.getenv("PRODUCT_CODE", "FX_BTC_JPY")
apis = {
"bitflyer": [os.getenv("BITFLYER_API_KEY"), os.getenv("BITFLYER_API_SECRET")]
}
base_url = "https://api.bitflyer.com"
async with pybotters.Client(apis=apis, base_url=base_url) as client:
store = pybotters.bitFlyerDataStore()
print("Initializing DataStore from HTTP ...")
await store.initialize(
client.get("/v1/me/getchildorders", params={"product_code": product_code}),
client.get("/v1/me/getpositions", params={"product_code": product_code}),
)
print("Connecting to WebSocket ...")
await client.ws_connect(
"wss://ws.lightstream.bitflyer.com/json-rpc",
send_json=[
{
"method": "subscribe",
"params": {"channel": "child_order_events"},
"id": 1,
},
{
"method": "subscribe",
"params": {"channel": f"lightning_board_snapshot_{product_code}"},
"id": 2,
},
{
"method": "subscribe",
"params": {"channel": f"lightning_board_{product_code}"},
"id": 3,
},
{
"method": "subscribe",
"params": {"channel": f"lightning_ticker_{product_code}"},
"id": 4,
},
{
"method": "subscribe",
"params": {"channel": f"lightning_executions_{product_code}"},
"id": 5,
},
],
hdlr_json=store.onmessage,
)
print("Waiting snapshot data from WebSocket ...")
while not (len(store.board) and len(store.ticker)):
await store.wait()
print("Start main loop")
while True:
# Retrieve data from DataStore
childorders = store.childorders.find({"product_code": product_code})
positions = store.positions.find({"product_code": product_code})
board = store.board.sorted({"product_code": product_code})
ticker = store.ticker.get({"product_code": product_code})
executions = store.executions.find({"product_code": product_code})
# Calculate the ordering conditions with your algorithm
condition = your_awesome_algorithm(
childorders, positions, board, ticker, executions
)
print(condition)
"""Example:
OrderCondition(
is_execute=True,
side="BUY",
price=1234567.0,
size=0.01,
)
"""
if condition.is_execute:
# Execute order
with store.childorderevents.watch() as stream:
print("Sending order ...")
r = await client.fetch(
"POST",
"/v1/me/sendchildorder",
data={
"product_code": product_code,
"child_order_type": "LIMIT",
"side": condition.side,
"price": condition.price,
"size": condition.size,
},
)
print(f"<Reponse [{r.response.status} {r.response.reason}]>")
print(r.data)
if r.response.ok and "child_order_acceptance_id" in r.data:
print("Waiting for DataStore updates ...")
child_order_acceptance_id = r.data["child_order_acceptance_id"]
async for change in stream:
if (
change.data["child_order_acceptance_id"]
== child_order_acceptance_id
):
break
print("Waiting for next loop ...")
await asyncio.sleep(1.0)
if __name__ == "__main__":
with suppress(KeyboardInterrupt):
asyncio.run(main())
Workflow¶
このサンプルコードは以下のような処理フローとなっています。
環境変数から API 認証情報などを取得する
DataStore を生成して HTTP API で初期化する
WebSocket に接続する
WebSocket からの初期データを待機する
メインループを開始する
メインループは、以下のような処理フローとなっています。
DataStore から各種データを取得する
- アルゴリズム関数にデータを入力して、注文条件を計算する
注釈
アルゴリズムはトレード bot の損益を左右する最も重要な部分です。 あなたのトレード戦略を実装しましょう!
サンプルコードでは「既存の注文がない場合、ベスト Bid - 1000 に買い指値を置く」というアルゴリズムになっています。
- 注文の執行条件が真の場合、注文を執行する
注文イベントの変更ストリームを開く
HTTP API で注文を送信する
- HTTP API レスポンスが正常なら、DataStore のデータを同期するため待機します
注文イベントの変更ストリームを開始して、データを取得するまで待機する
取得したデータの注文受付 ID が HTTP API と同じ ID ならストリームを抜ける
注釈
DataStore の注文やポジションを利用してアルゴリズムで注文条件を管理している場合、この待機処理は重要です 。
HTTP API で注文した結果は WebSocket の注文イベントとして流れてきます。 その注文イベントによって DataStore のデータが更新されます。 しかしその WebSocket の注文イベントはいつ流れてくるか分かりません。 このサンプルコードでは 1 秒後に次のループに移ります。 bitFlyer の取引マッチングエンジンが混雑している場合、 1 秒より後に流れてくるかもしれません。 そうすると、DataStore が同期されていないので次のループでは 既存の注文がない ことになります。 サンプルコードでのアルゴリズムは、既存の注文がない場合指値注文をするので 重複注文が発生することになります 。
WebSocket を利用しつつ注文やポジションを厳密に管理するアプローチでは、このようにして待機処理を行います。 もちろん、このように厳密に管理せず小口で注文するようなアプローチも考えられるので、あなたのトレード戦略にあったアプローチを検討してください。
1 秒待機して次のループに移る
Place Order¶
Hyperliquid¶
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "pybotters>=1.7.0",
# "rich",
# ]
# ///
import asyncio
import os
from decimal import Decimal
from pprint import pprint
import pybotters
# Your private key: 0x00000...
HYPERLIQUID_TESTNET_PRIVATE_KEY = os.environ["HYPERLIQUID_TESTNET_PRIVATE_KEY"]
def normalize_number(number: str) -> str:
"""Normalize a number.
e.g. "3300.0" -> "3300"
Hyperliquid API expects normalized numbers. Otherwise, `L1 error` will occur.
"""
return format(Decimal(number).normalize(), "f")
async def main() -> None:
"""Place a limit order in Hyperliquid Testnet.
[!WARNING]
Please change the private key, target asset, and limit price, etc.
Default: ETH, buy, 3300, 0.1
"""
apis = {
# Mainnet is "hyperliquid"
"hyperliquid_testnet": [HYPERLIQUID_TESTNET_PRIVATE_KEY],
}
base_url = "https://api.hyperliquid-testnet.xyz"
async with pybotters.Client(apis=apis, base_url=base_url) as client:
# Retrieve perpetuals metadata
# https://hyperliquid.gitbook.io/hyperliquid-docs/for-developers/api/info-endpoint/perpetuals#retrieve-perpetuals-metadata
r = await client.fetch(
"POST",
"/info",
data={"type": "meta"},
)
if not r.data:
raise r.data.error
# Construct asset index
asset_index: dict[str, int] = {
x["name"]: i for i, x in enumerate(r.data["universe"])
}
"""e.g.
{"BTC": 3, "ETH": 4, ...: ...}
"""
# Place an order
# https://hyperliquid.gitbook.io/hyperliquid-docs/for-developers/api/exchange-endpoint#place-an-order
r = await client.fetch(
"POST",
"/exchange",
# [!IMPORTANT]
# You can omit "nonce" and "signature". They are automatically populated by pybotters 🪄
data={
"action": {
"type": "order",
"orders": [
{
"a": asset_index["ETH"], # asset
"b": True, # is_buy
"p": normalize_number("3300.0"), # limit_px
"s": normalize_number("0.1"), # size
"r": False, # reduce_only
"t": {"limit": {"tif": "Gtc"}}, # order_type
}
],
"grouping": "na",
},
},
)
pprint(r.data)
if __name__ == "__main__":
asyncio.run(main())
Order Book watching¶
対応取引所の 取引所固有の DataStore を利用した板情報を監視するサンプルコードです。
このサンプルコードでは以下について理解できるでしょう。
それぞれの取引所に WebSocket 接続する
WebSocket でチャンネルを購読する
Order Book 系ストアの扱い方
Order Book 系ストアのデータ構造
bitFlyer¶
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "pybotters",
# "rich",
# ]
# ///
import asyncio
from contextlib import suppress
import pybotters
with suppress(ImportError):
from rich import print
async def main():
async with pybotters.Client() as client:
store = pybotters.bitFlyerDataStore()
await client.ws_connect(
"wss://ws.lightstream.bitflyer.com/json-rpc",
send_json=[
{
"method": "subscribe",
"params": {"channel": "lightning_board_snapshot_FX_BTC_JPY"},
"id": 1,
},
{
"method": "subscribe",
"params": {"channel": "lightning_board_FX_BTC_JPY"},
"id": 2,
},
],
hdlr_json=store.onmessage,
)
while True:
orderbook = store.board.sorted(limit=2)
print(orderbook)
await store.board.wait()
if __name__ == "__main__":
with suppress(KeyboardInterrupt):
asyncio.run(main())
GMO Coin¶
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "pybotters",
# "rich",
# ]
# ///
import asyncio
from contextlib import suppress
import pybotters
with suppress(ImportError):
from rich import print
async def main():
async with pybotters.Client() as client:
store = pybotters.GMOCoinDataStore()
await client.ws_connect(
"wss://api.coin.z.com/ws/public/v1",
send_json={
"command": "subscribe",
"channel": "orderbooks",
"symbol": "BTC_JPY",
},
hdlr_json=store.onmessage,
)
while True:
orderbook = store.orderbooks.sorted(limit=2)
print(orderbook)
await store.orderbooks.wait()
if __name__ == "__main__":
with suppress(KeyboardInterrupt):
asyncio.run(main())
bitbank¶
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "pybotters",
# "rich",
# ]
# ///
import asyncio
from contextlib import suppress
import pybotters
with suppress(ImportError):
from rich import print
async def main():
async with pybotters.Client() as client:
store = pybotters.bitbankDataStore()
await client.ws_connect(
"wss://stream.bitbank.cc/socket.io/?EIO=3&transport=websocket",
send_str=[
'42["join-room","depth_whole_btc_jpy"]',
'42["join-room","depth_diff_btc_jpy"]',
],
hdlr_str=store.onmessage,
)
while True:
orderbook = store.depth.sorted(limit=2)
print(orderbook)
await store.depth.wait()
if __name__ == "__main__":
with suppress(KeyboardInterrupt):
asyncio.run(main())
Coincheck¶
Coincheck の orderbook example は、ドキュメントで説明されていない
GET /api/order_books?version=1.0 と
{"type": "subscribe", "channel": "[pair]-limit-range-orderbook"}
を利用しています。これらの payload には sequence_number が含まれるため、
REST snapshot と WebSocket 更新を整合させて扱えます。
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "pybotters",
# "rich",
# ]
# ///
import asyncio
from contextlib import suppress
import pybotters
with suppress(ImportError):
from rich import print
async def main():
async with pybotters.Client(base_url="https://coincheck.com") as client:
store = pybotters.CoincheckDataStore()
await client.ws_connect(
"wss://ws-api.coincheck.com/",
send_json={"type": "subscribe", "channel": "btc_jpy-limit-range-orderbook"},
hdlr_json=store.onmessage,
)
await store.initialize(
client.get("/api/order_books", params={"pair": "btc_jpy", "version": "1.0"})
)
while True:
orderbook = store.orderbook.sorted(limit=2)
print(orderbook)
await store.orderbook.wait()
if __name__ == "__main__":
with suppress(KeyboardInterrupt):
asyncio.run(main())
Bybit¶
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "pybotters",
# "rich",
# ]
# ///
import asyncio
from contextlib import suppress
import pybotters
with suppress(ImportError):
from rich import print
async def main():
async with pybotters.Client() as client:
store = pybotters.BybitDataStore()
await client.ws_connect(
"wss://stream.bybit.com/v5/public/linear",
send_json={"op": "subscribe", "args": ["orderbook.50.BTCUSDT"]},
hdlr_json=store.onmessage,
)
while True:
orderbook = store.orderbook.sorted(limit=2)
print(orderbook)
await store.orderbook.wait()
if __name__ == "__main__":
with suppress(KeyboardInterrupt):
asyncio.run(main())
Binance¶
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "pybotters",
# "rich",
# ]
# ///
import asyncio
from contextlib import suppress
import pybotters
with suppress(ImportError):
from rich import print
async def main():
async with pybotters.Client(base_url="https://fapi.binance.com") as client:
store = pybotters.BinanceUSDSMDataStore()
await client.ws_connect(
"wss://fstream.binance.com/stream",
send_json={
"method": "SUBSCRIBE",
"params": ["btcusdt@depth"],
"id": 1,
},
hdlr_json=store.onmessage,
)
await store.initialize(
client.get("/fapi/v1/depth", params={"symbol": "BTCUSDT", "limit": "1000"})
)
while True:
orderbook = store.orderbook.sorted(limit=2)
print(orderbook)
await store.orderbook.wait()
if __name__ == "__main__":
with suppress(KeyboardInterrupt):
asyncio.run(main())
OKX¶
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "pybotters",
# "rich",
# ]
# ///
import asyncio
from contextlib import suppress
import pybotters
with suppress(ImportError):
from rich import print
async def main():
async with pybotters.Client() as client:
store = pybotters.OKXDataStore()
await client.ws_connect(
"wss://ws.okx.com:8443/ws/v5/public",
send_json={
"op": "subscribe",
"args": [{"channel": "books", "instId": "BTC-USDT"}],
},
hdlr_json=store.onmessage,
)
while True:
orderbook = store.books.sorted(limit=2)
print(orderbook)
await store.books.wait()
if __name__ == "__main__":
with suppress(KeyboardInterrupt):
asyncio.run(main())
Phemex¶
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "pybotters",
# "rich",
# ]
# ///
import asyncio
from contextlib import suppress
import pybotters
with suppress(ImportError):
from rich import print
async def main():
async with pybotters.Client() as client:
store = pybotters.PhemexDataStore()
await client.ws_connect(
"wss://ws.phemex.com",
send_json={
"id": 1234,
"method": "orderbook_p.subscribe",
"params": ["BTCUSDT"],
},
hdlr_json=store.onmessage,
)
while True:
orderbook = store.orderbook.sorted(limit=2)
print(orderbook)
await store.orderbook.wait()
if __name__ == "__main__":
with suppress(KeyboardInterrupt):
asyncio.run(main())
Bitget¶
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "pybotters",
# "rich",
# ]
# ///
import asyncio
from contextlib import suppress
import pybotters
with suppress(ImportError):
from rich import print
async def main():
async with pybotters.Client() as client:
store = pybotters.BitgetV2DataStore()
await client.ws_connect(
"wss://ws.bitget.com/v2/ws/public",
send_json={
"op": "subscribe",
"args": [
{
"instType": "USDT-FUTURES",
"channel": "books",
"instId": "BTCUSDT",
}
],
},
hdlr_json=store.onmessage,
)
while True:
orderbook = store.book.sorted(limit=2)
print(orderbook)
await store.book.wait()
if __name__ == "__main__":
with suppress(KeyboardInterrupt):
asyncio.run(main())
KuCoin¶
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "pybotters",
# "rich",
# ]
# ///
import asyncio
from contextlib import suppress
import pybotters
with suppress(ImportError):
from rich import print
async def main():
async with pybotters.Client(base_url="https://api-futures.kucoin.com") as client:
store = pybotters.KuCoinDataStore()
await store.initialize(client.post("/api/v1/bullet-public"))
await client.ws_connect(
store.endpoint,
send_json={
"id": 1545910660739,
"type": "subscribe",
"topic": "/contractMarket/level2Depth50:XBTUSDTM",
"privateChannel": False,
"response": True,
},
hdlr_json=store.onmessage,
)
while True:
orderbook = store.orderbook50.sorted(limit=2)
print(orderbook)
await store.orderbook50.wait()
if __name__ == "__main__":
with suppress(KeyboardInterrupt):
asyncio.run(main())
BitMEX¶
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "pybotters",
# "rich",
# ]
# ///
import asyncio
from contextlib import suppress
import pybotters
with suppress(ImportError):
from rich import print
async def main():
async with pybotters.Client() as client:
store = pybotters.BitMEXDataStore()
await client.ws_connect(
"wss://ws.bitmex.com/realtime",
send_json={
"op": "subscribe",
"args": ["orderBookL2_25:XBTUSD"],
},
hdlr_json=store.onmessage,
)
while True:
orderbook = store.orderbook.sorted(limit=2)
print(orderbook)
await store.orderbook.wait()
if __name__ == "__main__":
with suppress(KeyboardInterrupt):
asyncio.run(main())
Hyperliquid¶
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "pybotters",
# "rich",
# ]
# ///
import asyncio
from contextlib import suppress
import pybotters
with suppress(ImportError):
from rich import print
async def main():
async with pybotters.Client() as client:
store = pybotters.HyperliquidDataStore()
await client.ws_connect(
"wss://api.hyperliquid.xyz/ws",
send_json={
"method": "subscribe",
"subscription": {"type": "l2Book", "coin": "ETH"},
},
hdlr_json=store.onmessage,
)
while True:
orderbook = store.l2_book.sorted(limit=2)
print(orderbook)
await store.l2_book.wait()
if __name__ == "__main__":
with suppress(KeyboardInterrupt):
asyncio.run(main())
Open orders¶
Coincheck¶
Coincheck の WebSocket は新規発注のイベントが配信されない仕様となっています。
次のように専用のメソッド feed_response() を使って REST API のレスポンスを DataStore に取り込みます。
警告
このサンプルは実際に少額の注文を発注します。
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "pybotters>=1.10",
# ]
# ///
import asyncio
import contextlib
import os
import pybotters
COINCHECK_API_KEY = os.environ["COINCHECK_API_KEY"]
COINCHECK_API_SECRET = os.environ["COINCHECK_API_SECRET"]
async def main() -> None:
apis = {
"coincheck": [COINCHECK_API_KEY, COINCHECK_API_SECRET],
}
base_url = "https://coincheck.com"
async with pybotters.Client(apis=apis, base_url=base_url) as client:
store = pybotters.CoincheckPrivateDataStore()
# Connect to WebSocket
await client.ws_connect(
"wss://stream.coincheck.com",
send_json={"type": "subscribe", "channels": ["order-events"]},
hdlr_json=store.onmessage,
)
# Initialize open orders
await store.initialize(
client.get("/api/exchange/orders/opens"),
)
# Fetch ticker
r = await client.fetch(
"GET",
"/api/ticker",
params={"pair": "btc_jpy"},
)
assert r.data
ask = r.data["ask"]
bid = r.data["bid"]
print(f"ask: {ask}, bid: {bid}")
# Place new order on the bid side for btc_jpy
r = await client.fetch(
"POST",
"/api/exchange/orders",
data={
"pair": "btc_jpy",
"order_type": "buy",
"rate": str(bid),
"amount": "0.001",
},
)
assert r.data
print(r.data)
# > [!IMPORTANT]
# Feed order response
store.order.feed_response(r.data)
# Watch for order changes
with store.order.watch() as stream:
async for change in stream:
result = store.order.find()
if len(result) == 0:
print("Executed orders!", change)
break
if __name__ == "__main__":
with contextlib.suppress(KeyboardInterrupt):
asyncio.run(main())
Helpers¶
GMO Coin¶
helpers.GMOCoinHelper を利用したサンプルコードです。
manage_ws_token() を利用することで、Private WebSocket のアクセストークン を管理します。
デフォルトでは 5 分ごとにアクセストークンを延長します。 延長が失敗した場合は、アクセストークンを新しく作成します。
このメソッドは無限ループとなっているので、 asyncio.create_task() でタスクとしてスケジュールしてください。
以下は適当なチャンネルを購読して、アクセストークン管理ヘルパーのタスクをスケジュールするサンプルコードです。
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "pybotters",
# "rich",
# ]
# ///
import asyncio
import os
from contextlib import suppress
import pybotters
from pybotters.helpers import GMOCoinHelper
with suppress(ImportError):
from rich import print
async def main():
apis = {
"gmocoin": [
os.getenv("GMOCOIN_API_KEY", ""),
os.getenv("GMOCOIN_API_SECRET", ""),
],
}
async with pybotters.Client(apis=apis) as client:
store = pybotters.GMOCoinDataStore()
# Create a helper instance for GMOCoin.
gmohelper = GMOCoinHelper(client)
# Alias for POST /private/v1/ws-auth .
token = await gmohelper.create_access_token()
ws = client.ws_connect(
# Build the Private WebSocket URL.
f"wss://api.coin.z.com/ws/private/v1/{token}",
send_json={
"command": "subscribe",
"channel": "positionSummaryEvents",
"option": "PERIODIC",
},
hdlr_json=store.onmessage,
)
# Create a task to manage WebSocket URL and access token.
asyncio.create_task(
gmohelper.manage_ws_token(ws, token),
)
with store.position_summary.watch() as stream:
async for change in stream:
print(change.data)
if __name__ == "__main__":
with suppress(KeyboardInterrupt):
asyncio.run(main())
bitbank¶
bitbankPrivateDataStore と組み込みの PubNub クライアント pybotters.helpers.bitbank を利用したサンプルコードです。
このサンプルコードでは、まず REST API を利用してアクティブオーダーを初期化します。 その後組み込みの PubNub クライアントを利用して Private Stream API を非同期でサブスクライブします。 そして、DataStore の watch 機能を利用してアクティブオーダーの変更を監視します。
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "pybotters>=1.8",
# ]
# ///
import asyncio
import os
from contextlib import suppress
import pybotters
from pybotters.helpers.bitbank import subscribe_with_callback
BITBANK_API_KEY = os.environ["BITBANK_API_KEY"]
BITBANK_API_SECRET = os.environ["BITBANK_API_SECRET"]
async def main():
apis = apis = {"bitbank": [BITBANK_API_KEY, BITBANK_API_SECRET]}
base_url = "https://api.bitbank.cc/v1"
async with pybotters.Client(apis=apis, base_url=base_url) as client:
store = pybotters.bitbankPrivateDataStore()
await store.initialize(client.get("/user/spot/active_orders"))
task = asyncio.create_task(subscribe_with_callback(client, store.onmessage))
try:
# Example: Watch active orders ...
with store.spot_order.watch() as stream:
async for _ in stream:
print(store.spot_order.find())
finally:
task.cancel()
await asyncio.gather(task, return_exceptions=True)
if __name__ == "__main__":
with suppress(KeyboardInterrupt):
asyncio.run(main())