Pistache —— Promise

时间:2021-7-21 作者:qvyue

前言

Promise/Future是一种异步编程机制,第一次见是在openwhisk中,由scala语言提供的语言特性,加上对scala本身就不熟悉,可给我恶心坏了。后来发现原来C++中也有Promise的实现,功能上比较简单的。

而本文中的Promise,并非C++标准中的std::promise,而是Pistache为了进行异步编程,对 Promises/A+ 标准的实现,在功能上类似与JavaScript的Promise,关于JS的Promise,可以参考:Using Promises – JavaScript | MDN

本文的题目和之前写的不同,缺了“源码分析”四个字,因为的确是不好分析,Pistache实现这一部分用了将近1700行的代码,而且几乎都是复杂的模板类,对于我这样的C++小白,可读性太差了。好容易才从功能上摸的差不多,所以源码实现就不分析了,只求搞清楚怎么用,怎么个异步就行了。

定义

为了使用Promise,我们需要定一个Promise对象:

    Async::Promise p([success](Async::Resolver& resolve,
                                    Async::Rejection& reject) {
        doSomeAsync(success, resolve, reject);
        printf("2n");
    });

或者

    Async::Promise p1([success](Async::Deferred deferred) {
        doSomeAsync(success, deferred);
        printf("2n");
    });

根据我对源码的分析,两者其实并没有太的区别,Async::Deferred仅仅就是把Async::Resolver和Async::Rejection封装到了一个类里面。

可以看到,这是一个模板类:template Async::Promise 而模板的类型,就是Promise给我们的“保证”的值的类型。异步操作有没有完成的标志就是,是否填充了此值。而填充的方法就是使用Async::Resolver提供的接口。

定义Promise对象时,需要传入一个处理函数,此函数将执行异步操作,函数的参数是Async::Resolver和Async::Rejection,这是Promise提供的进行数据填充或者发生异常时调用的接口。

异步处理

我们用一个函数在 表示 异步操作,他们分别对应了两类接口定义:

void doSomeAsync(bool success, Async::Resolver& resolver, Async::Rejection& rejection)
{
    if (success)
        resolver(3);
    else
        rejection("3");
}

void doSomeAsync(const bool success, Async::Deferred& deferred)
{
    if (success)
        deferred.resolve(3);
    else
        deferred.reject("3");
}

成功时我们使用resolver(3)进行数据填充,当发生异常时,使用 rejection("3")发出异常信息,注意,在这里rejection接口,其实是调用了std::make_exception_ptr()进行了异常的封装,这部分可以参考我写的《学习 C++ 的异常处理》

回调函数

我们使用then()函数添加回调:

  • 如果添加回调时,数据已经填充或者异常,那么将直接执行回调
  • 填充数据或者产生异常时,如果存在回调,那么将直接执行回调

then()函数使用如下,需要传入两个函数,第一个用于处理数据成功填充时的回调,第二个处理发生异常时的回调,他们的参数分别是resolver填充的数据或者rejection抛出的异常,注意,异常使用了exception_ptr来进行传递。

p.then(
        [](int a) {
            printf("%dn", a);
        },
        [](std::exception_ptr& eptr) {
            try
            {
                std::rethrow_exception(eptr);
            }
            catch (const char* e)
            {
                printf("%sn", e);
            }
        });

执行顺序

下面是完整的代码,结果将按数字顺序(1->2->3->4)输出,这是为啥呢?

  • 函数main()开始执行,打印 “1”
  • 创建Promise对象,进入到异步处理流程,首先调用了doSomeAsync
    • 显然doSomeAsync()并不是异步操作,因此会直接执行resolver(3);
    • 此时并没有设置回调,因为then()还没开始执行呢,但是数据填充已经完成
  • doSomeAsync返回后,继续执行,打印 “2”
  • Promise对象p创建完毕
  • 添加回调函数then(),由于此时数据已经填充,那么将直接执行回调,打印 “3”
  • then() 执行结束,继续执行打印 “4”

那我们假设doSomeAsync()就是异步执行的,顺序会是怎么样的呢?(假设异步执行需要一定的时间才会填充数据):

  • 函数main()开始执行,打印 “1”
  • 创建Promise对象,进入到异步处理流程,首先调用了doSomeAsync
    • 因为doSomeAsync()是异步的,因此立刻返回
    • 线程继续执行,打印 “2”
  • Promise对象p创建完毕
  • 添加回调函数then(),因为数据没有填充,因此回调函数不会执行
  • then() 执行结束,继续执行打印 “4”
  • doSomeAsync() 经过一段时间的运行,终于填充了数据,此时回调函数已经设置,因此调用回调函数,打印 “3”

顺序为:1->2->4->3

完整代码:

void doSomeAsync(bool success, Async::Resolver& resolver, Async::Rejection& rejection)
{
    if (success)
        resolver(3);
    else
        rejection("3");
}

void doSomeAsync(const bool success, Async::Deferred &deferred)
{
    if (success)
        deferred.resolve(3);
    else
        deferred.reject("3");
}

int main()
{
    printf("1n");
    bool success = true;
    Async::Promise p([success](Async::Resolver& resolve,
                                    Async::Rejection& reject) {
        doSomeAsync(success, resolve, reject);
        printf("2n");
    });
    Async::Promise p1([success](Async::Deferred deferred) {
        doSomeAsync(success, deferred);
    });

    p.then(
        [](int a) {
            printf("%dn", a);
        },
        [](std::exception_ptr& eptr) {
            try
            {
                std::rethrow_exception(eptr);
            }
            catch (const char* e)
            {
                printf("%sn", e);
            }
        });
    printf("4");
}

实现异步

那如何实现异步的呢,我们可以使用多线程,或者是基于epoll和eventfd,后者就是Pistache实现异步写的方法,我们在Pistache源码分析 —— Server的初始化和请求处理中的peerQueue中提到过。

1、多线程

最简单的方法就是使用多线程,我们修改doSomeAsync()如下:

void doSomeAsync(bool success, Async::Resolver& resolver, Async::Rejection& rejection)
{
    if (success)
        std::thread([&resolver]() {
            sleep(1);
            resolver(3);
        }).detach();
    else
        std::thread ([&rejection]() {
            sleep(1);
            rejection("3");
        }).detach();
}
2、epoll+eventfd

见《Pistache源码分析 ——PollableQueue类》和《Pistache源码分析 —— 异步写机制》

声明:本文内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:qvyue@qq.com 进行举报,并提供相关证据,工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。