pybotters.helpers.bitbank

Helper functions for bitbank Private Stream API and PubNub.

async pybotters.helpers.bitbank.subscribe_with_callback(client: pybotters.Client, callback: Callable[[Any], object], /, channel: str | None = None, token: str | None = None, tt: str | None = None, tr: str | None = None, *, bitbank_server: str = 'https://api.bitbank.cc', pubnub_server: str = 'https://ps.pndsn.com', subscribe_key: str = 'sub-c-ecebae8e-dd60-11e6-b6b1-02ee2ddab7fe') None

Subscribe to bitbank Private Stream API and handle a callback.

サンプル

import asyncio
import os
from contextlib import suppress

import pybotters
from pybotters.helpers.bitbank import subscribe_with_callback

BITBANK_API_KEY = os.environ["BITBANK_API_KEY"]
BITBANK_API_SECRET = os.environ["BITBANK_API_SECRET"]

apis = {"bitbank": [BITBANK_API_KEY, BITBANK_API_SECRET]}

async def main():
    async with pybotters.Client(apis=apis) as client:
        store = pybotters.bitbankPrivateDataStore()
        task = asyncio.create_task(subscribe_with_callback(client, store.onmessage))
        try:
            # Example: Watch active orders ...
            with store.spot_order.watch() as stream:
                async for _ in stream:
                    print(store.spot_order.find())
        finally:
            task.cancel()
            await asyncio.gather(task, return_exceptions=True)


if __name__ == "__main__":
    with suppress(KeyboardInterrupt):
        asyncio.run(main())
async pybotters.helpers.bitbank.subscribe(client: pybotters.Client, /, channel: str | None = None, token: str | None = None, tt: str | None = None, tr: str | None = None, *, bitbank_server: str = 'https://api.bitbank.cc', pubnub_server: str = 'https://ps.pndsn.com', subscribe_key: str = 'sub-c-ecebae8e-dd60-11e6-b6b1-02ee2ddab7fe') AsyncGenerator[SubscribeResponse]

Subscribe to bitbank Private Stream API.

This function yields SubscribeResponse.

It automatically fetches PubNub credentials (channel and token) from bitbank if not provided. It also refetches the token if it expires.

サンプル

import asyncio
import os
from contextlib import suppress

import pybotters
from pybotters.helpers.bitbank import subscribe

BITBANK_API_KEY = os.environ["BITBANK_API_KEY"]
BITBANK_API_SECRET = os.environ["BITBANK_API_SECRET"]

apis = {"bitbank": [BITBANK_API_KEY, BITBANK_API_SECRET]}

async def main():
    async with pybotters.Client(apis=apis) as client:
        async for message in subscribe(client):
            print(message)

if __name__ == "__main__":
    with suppress(KeyboardInterrupt):
        asyncio.run(main())
async pybotters.helpers.bitbank.fetch_channel_and_token(client: pybotters.Client, /, *, bitbank_server: str = 'https://api.bitbank.cc') BaseResponse

Fetch channel and token from bitbank.

GET /user/subscribe

https://github.com/bitbankinc/bitbank-api-docs/blob/master/rest-api.md#private-stream

async pybotters.helpers.bitbank.fetch_subscribe(clinet: pybotters.Client, /, channel: str, token: str | None = None, tt: str | None = None, tr: str | None = None, *, pubnub_server: str = 'https://ps.pndsn.com', subscribe_key: str = 'sub-c-ecebae8e-dd60-11e6-b6b1-02ee2ddab7fe') SubscribeResponse

Fetch bitbank subscription messages from PubNub.

GET /v2/subscribe/:sub_key/:channel/:callback

https://www.pubnub.com/docs/sdks/rest-api/subscribe-v-2

Functions

fetch_channel_and_token(client, /, *[, ...])

Fetch channel and token from bitbank.

fetch_subscribe(clinet, /, channel[, token, ...])

Fetch bitbank subscription messages from PubNub.

subscribe(client, /[, channel, token, tt, ...])

Subscribe to bitbank Private Stream API.

subscribe_with_callback(client, callback, /)

Subscribe to bitbank Private Stream API and handle a callback.

Classes

BaseResponse

bitbank base response.

ChannelAndToken

bitbank response for /v1/user/subscribe.

SubscribeMessage

The "m" field in SubscribeResponse.

SubscribeResponse

PubNub Subscribe response.

TimeToken

The "t" field for SubscribeResponse.