Client API Threading

    The BTS API provides two types of calls: RPCs (remote procedure calls), which accept an argument and return a value, and streams of tabular data, which we call subscriptions. All communication to and from the server happens on a connection thread owned by the ServiceContext object, so when a call completes its data must be marshaled back to the thread that needs it.

    RPC

    C#

    All API calls provide both a synchronous and an asynchronous version. If you are calling the API from a WPF or Windows Forms application, always prefer the async version of the method: it makes the call and resumes execution on the correct thread without blocking the UI or making it lag. The synchronous calls are intended for console applications, where nothing else needs the thread until the data is available.

    The async version of the method returns a Result<T>, which signals whether an error occurred during the call. Test Result.success to confirm the call succeeded before using Result.value.

        var s = context.locate<api.user.AuthService>();
        var result = await s.get_users_async(new GetUsersRequest());
        if (result.success) {
            Console.WriteLine(result.value);
        }
        else {
            Console.WriteLine($"Error {result.error_message}");
        }
    

    C++

    All C++ API calls also provide both a blocking synchronous version and a callback-based asynchronous version. In the asynchronous case the callback executes on the ServiceContext thread, and you are responsible for marshaling the data to the appropriate thread. Just as in C#, the callback receives a Result<T> that indicates success or failure. Result.value returns a std::shared_ptr, so you control the lifetime of the value.
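    The hand-off from the ServiceContext thread to your own thread can be sketched as follows. This is a minimal illustration, not the BTS API: Result<T> here is a stand-in that models only the success flag, shared_ptr value, and error message described above, and TaskQueue, fake_get_users_async, and fetch_users_on_calling_thread are hypothetical names. The callback does nothing on the connection thread except post a task to a queue that the consuming thread drains.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>

// Hypothetical stand-in for the API's Result<T>; not the real type.
template <typename T>
struct Result {
    bool success = false;
    std::shared_ptr<T> value;
    std::string error_message;
};

// Minimal thread-safe queue of tasks that the consuming thread drains.
class TaskQueue {
public:
    void post(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
        }
        ready_.notify_one();
    }

    // Blocks until a task is available, then runs it on the calling thread.
    void run_one() {
        std::function<void()> task;
        {
            std::unique_lock<std::mutex> lock(mutex_);
            ready_.wait(lock, [this] { return !tasks_.empty(); });
            task = std::move(tasks_.front());
            tasks_.pop();
        }
        task();
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<std::function<void()>> tasks_;
};

// Hypothetical async call: invokes the callback on a separate thread,
// imitating a callback that arrives on the ServiceContext thread.
std::thread fake_get_users_async(std::function<void(Result<std::string>)> callback) {
    return std::thread([callback] {
        Result<std::string> r;
        r.success = true;
        r.value = std::make_shared<std::string>("alice,bob");
        callback(r);
    });
}

// Issues the call and consumes the result on the calling thread.
std::string fetch_users_on_calling_thread() {
    TaskQueue queue;
    std::string users;
    const auto caller = std::this_thread::get_id();

    std::thread connection = fake_get_users_async([&](Result<std::string> r) {
        // Runs on the connection thread: only re-post, do no real work here.
        queue.post([&users, r, caller] {
            // Runs on the thread that calls run_one().
            assert(std::this_thread::get_id() == caller);
            users = r.success ? *r.value : "Error " + r.error_message;
        });
    });

    queue.run_one();    // executes the posted task on this thread
    connection.join();
    return users;
}
```

    Because the task captures the Result by value, the shared_ptr keeps the payload alive until the consuming thread has finished with it, regardless of what the connection thread does next.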

    Subscriptions

    BTS provides streaming updates for some types of data. These updates are incremental: a new subscription first receives a snapshot of the current state, and later updates carry only changes. Streaming updates use the ReactiveX library to manage subscribing and unsubscribing. This library provides a rich set of operators that you can use to slice and dice the data streams to fit your application's requirements.
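    One way to consume a snapshot-then-increments stream is to fold every update into a local cache, sketched below under stated assumptions. The OrderRecord and OrdersUpdate structs are hypothetical stand-ins for the real protobuf messages; only an added collection appears in this document, so the removed field and the rule that a repeated id replaces the earlier record are assumptions made for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical record shape; the real messages are protobuf types.
struct OrderRecord {
    int id;
    std::string status;
};

// Hypothetical update shape. "removed" is an assumed field.
struct OrdersUpdate {
    std::vector<OrderRecord> added;
    std::vector<int> removed;
};

// Local view of the stream. The first update applied is the snapshot;
// every later update is treated as a delta against the current contents.
class OrderCache {
public:
    void apply(const OrdersUpdate& update) {
        for (const auto& order : update.added) {
            orders_[order.id] = order;   // assumed: same id replaces the old record
        }
        for (int id : update.removed) {
            orders_.erase(id);
        }
    }

    std::size_t size() const { return orders_.size(); }
    const std::string& status_of(int id) const { return orders_.at(id).status; }

private:
    std::map<int, OrderRecord> orders_;
};

// Replays a snapshot followed by one incremental update and
// returns how many orders the cache holds afterwards.
std::size_t replay_example() {
    OrderCache cache;

    OrdersUpdate snapshot;
    snapshot.added.push_back(OrderRecord{1, "New"});
    snapshot.added.push_back(OrderRecord{2, "Error"});
    cache.apply(snapshot);

    OrdersUpdate delta;
    delta.added.push_back(OrderRecord{3, "New"});
    delta.removed.push_back(1);
    cache.apply(delta);

    return cache.size();
}
```

    The same apply step works for the snapshot and for every later update, so the subscriber does not need to distinguish the two cases.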

    C#

    The subscription callback arrives on the ServiceContext thread, but it can easily be marshaled to the thread of your choice using the ObserveOn operator. For a WPF or Windows Forms application the most common form is ObserveOnDispatcher, which executes the callback in the context of the original calling thread. Where you place the ObserveOnDispatcher call in your reactive call chain determines whether the other operators execute before or after the thread switch.

    In this example the SelectMany and Where operators execute on the ServiceContext connection thread, and only the resulting matches are marshaled across to the calling thread. It is safe to unsubscribe by disposing sub from any thread.

        var orders = context.locate<OrderService>();
        var orders_with_errors = new ObservableCollection<OrderRecord>();
        IDisposable sub =
            orders
            .subscribe_orders()
            .SelectMany(update => update.added)
            .Where(o => o.State.Status == OrderStatus.Error)
            .ObserveOnDispatcher()
            .Subscribe(o => {
                orders_with_errors.Add(o);
            });
    

    C++

    The C++ subscription callbacks use the RxCpp library which implements the ReactiveX operators. The subscribe call will return an rxcpp::composite_subscription which implements RAII and will unsubscribe when it is destroyed.

        #include "lib/api/public/ServiceContext.h"
        #include "lib/api/services/OrderService.client.pb.h"
        #include <rxcpp/rx.hpp>
        #include <google/protobuf/util/json_util.h>
        #include <iostream>
        #include <string>
        //...
        bts::api::ServiceContext ctx;
        auto client = ctx.locate<bts::api::orderman::OrderService>();
        // Keep sub alive for as long as updates are wanted.
        auto sub =
            client.subscribe_orders().subscribe([=](const bts::api::orderman::OrdersUpdate& update) {
                for(const auto& order : update.added()) {
                    // MessageToJsonString appends to its output, so use a fresh string per order.
                    std::string result;
                    auto status = google::protobuf::util::MessageToJsonString(*order, &result, {});
                    if (status.ok()) {
                        std::cout << "New order: " << result << std::endl;
                    }
                }
            });