加入收藏 | 设为首页 | 会员中心 | 我要投稿 宜春站长网 (https://www.0795zz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

Reactive-MongoDB异步Java Driver解读

发布时间:2021-12-22 16:49:37 所属栏目:大数据 来源:互联网
导读:从3.0 版本开始,MongoDB 开始提供异步方式的驱动(Java Async Driver),这为应用提供了一种更高性能的选择。 但实质上,使用同步驱动(Java Sync Driver)的项目也不在少数,或许是因为先入为主的原因(同步Driver的文档说明更加的完善),又或者是为了兼容旧的 M
从3.0 版本开始,MongoDB 开始提供异步方式的驱动(Java Async Driver),这为应用提供了一种更高性能的选择。
 
但实质上,使用同步驱动(Java Sync Driver)的项目也不在少数,或许是因为先入为主的原因(同步Driver的文档说明更加的完善),又或者是为了兼容旧的 MongoDB 版本。
 
无论如何,由于 Reactive 的发展,未来使用异步驱动应该是一个趋势。
 
在使用 Async Driver 之前,需要对 Reactive 的概念有一些熟悉。
 
理解 Reactive (响应式)
 
响应式(Reactive)是一种异步的、面向数据流的开发方式,最早是来自于.NET 平台上的 Reactive Extensions 库,随后被扩展为各种编程语言的实现。
 
在著名的 Reactive Manifesto(响应式宣言) 中,对 Reactive 定义了四个特征:
 
 
 
及时响应(Responsive):系统能及时的响应请求。
 
有韧性(Resilient):系统在出现异常时仍然可以响应,即支持容错。
有弹性(Elastic):在不同的负载下,系统可弹性伸缩来保证运行。
消息驱动(Message Driven):不同组件之间使用异步消息传递来进行交互,并确保松耦合及相互隔离。
在响应式宣言的所定义的这些系统特征中,无一不与响应式的流有若干的关系,于是乎就有了 2013年发起的 响应式流规范(Reactive Stream Specification)。
 
https://www.reactive-streams.org/
 
其中,对于响应式流的处理环节又做了如下定义:
 
具有处理无限数量的元素的能力,即允许流永不结束
按序处理
异步地传递元素
实现非阻塞的负压(back-pressure)
Java 平台则是在 JDK 9 版本上发布了对 Reactive Streams 的支持。
 
下面介绍响应式流的几个关键接口:
 
Publisher
Publisher 是数据的发布者。Publisher 接口只有一个方法 subscribe,用于添加数据的订阅者,也就是 Subscriber。
 
Subscriber
Subscriber 是数据的订阅者。Subscriber 接口有4个方法,都是作为不同事件的处理器。在订阅者成功订阅到发布者之后,其 onSubscribe(Subscription s) 方法会被调用。
 
Subscription 表示的是当前的订阅关系。
 
当订阅成功后,可以使用 Subscription 的 request(long n) 方法来请求发布者发布 n 条数据。发布者可能产生3种不同的消息通知,分别对应 Subscriber 的另外3个回调方法。
 
数据通知:对应 onNext 方法,表示发布者产生的数据。
 
错误通知:对应 onError 方法,表示发布者产生了错误。
 
结束通知:对应 onComplete 方法,表示发布者已经完成了所有数据的发布。
 
在上述3种通知中,错误通知和结束通知都是终结通知,也就是在终结通知之后,不会再有其他通知产生。
 
Subscription
Subscription 表示的是一个订阅关系。除了之前提到的 request 方法之外,还有 cancel 方法用来取消订阅。需要注意的是,在 cancel 方法调用之后,发布者仍然有可能继续发布通知。但订阅最终会被取消。

 
MongoDB 的异步驱动为 mongo-java-driver-reactivestreams 组件,其实现了 Reactive Stream 的上述接口。
 
> 除了 reactivestream 之外,MongoDB 的异步驱动还包含 RxJava 等风格的版本,有兴趣的读者可以进一步了解
 
http://mongodb.github.io/mongo-java-driver-reactivestreams/1.11/getting-started/quick-tour-primer/
 
使用示例
 
接下来,通过一个简单的例子来演示一下 Reactive 方式的代码风格:
 
A. 引入依赖
 
org.mongodb
   mongodb-driver-reactivestreams
   1.11.0
> 引入mongodb-driver-reactivestreams 将会自动添加 reactive-streams, bson, mongodb-driver-async组件
 
B. 连接数据库
 
//服务器实例表List servers =newArrayList();
servers.add(newServerAddress("localhost",27018));//配置构建器MongoClientSettings.Builder settingsBuilder =MongoClientSettings.builder();//传入服务器实例
settingsBuilder.applyToClusterSettings(
        builder -> builder.hosts(servers));//构建 Client 实例MongoClient mongoClient =MongoClients.create(settingsBuilder.build());
C. 实现文档查询
 
//获得数据库对象MongoDatabase database = client.getDatabase(databaseName);//获得集合MongoCollection collection = database.getCollection(collectionName);//异步返回PublisherFindPublisher publisher = collection.find();//订阅实现
publisher.subscribe(newSubscriber(){
    @Override
    publicvoid onSubscribe(Subscription s){
        System.out.println("start...");
        //执行请求
        s.request(Integer.MAX_VALUE);
 
    }
    @Override
    publicvoid onNext(Document document){
        //获得文档
        System.out.println("Document:"+ document.toJson());
    }
 
    @Override
    publicvoid onError(Throwable t){
        System.out.println("error occurs.");
    }
 
    @Override
    publicvoid onComplete(){
        System.out.println("finished.");
    }});
注意到,与使用同步驱动不同的是,collection.find()方法返回的不是 Cursor,而是一个 FindPublisher对象,这是Publisher接口的一层扩展。
 
而且,在返回 Publisher 对象时,此时并没有产生真正的数据库IO请求。真正发起请求需要通过调用 Subscription.request()方法。
 
在上面的代码中,为了读取由 Publisher 产生的结果,通过自定义一个Subscriber,在onSubscribe 事件触发时就执行 数据库的请求,之后分别对 onNext、onError、onComplete进行处理。
 
尽管这种实现方式是纯异步的,但在使用上比较繁琐。试想如果对于每个数据库操作都要完成一个Subscriber 逻辑,那么开发的工作量是巨大的。
 
为了尽可能复用重复的逻辑,可以对Subscriber的逻辑做一层封装,包含如下功能:
 
使用 List 容器对请求结果进行缓存
实现阻塞等待结果的方法,可指定超时时间
捕获异常,在等待结果时抛出
代码如下:
 
publicclassObservableSubscriberimplementsSubscriber{
 
    //响应数据
    privatefinalList received;
    //错误信息
    privatefinalList errors;
    //等待对象
    privatefinalCountDownLatch latch;
    //订阅器
    privatevolatileSubscription subscription;
    //是否完成
    privatevolatileboolean completed;
 
    publicObservableSubscriber(){
        this.received =newArrayList();
        this.errors =newArrayList();
        this.latch =newCountDownLatch(1);
    }
 
    @Override
    publicvoid onSubscribe(finalSubscription s){
        subscription = s;
    }
 
    @Override
    publicvoid onNext(final T t){
        received.add(t);
    }
 
    @Override
    publicvoid onError(finalThrowable t){
        errors.add(t);
        onComplete();
    }
 
    @Override
    publicvoid onComplete(){
        completed =true;
        latch.countDown();
    }
 
    publicSubscription getSubscription(){
        return subscription;
    }
 
    publicList getReceived(){
        return received;
    }
 
    publicThrowable getError(){
        if(errors.size()>0){
            return errors.get(0);
        }
        returnnull;
    }
 
    publicboolean isCompleted(){
        return completed;
    }
 
    /**
     * 阻塞一定时间等待结果
     *
     * @param timeout
     * @param unit
     * @return
     * @throws Throwable
     */
    publicListget(finallong timeout,finalTimeUnit unit)throwsThrowable{
        return await(timeout, unit).getReceived();
    }
 
    /**
     * 一直阻塞等待请求完成
     *
     * @return
     * @throws Throwable
     */
    publicObservableSubscriber await()throwsThrowable{
        return await(Long.MAX_VALUE,TimeUnit.MILLISECONDS);
    }
 
    /**
     * 阻塞一定时间等待完成
     *
     * @param timeout
     * @param unit
     * @return
     * @throws Throwable
     */
    publicObservableSubscriber await(finallong timeout,finalTimeUnit unit)throwsThrowable{
        subscription.request(Integer.MAX_VALUE);
        if(!latch.await(timeout, unit)){
            thrownewMongoTimeoutException("Publisher onComplete timed out");
        }
        if(!errors.isEmpty()){
            throw errors.get(0);
        }
        returnthis;
    }}
借助这个基础的工具类,我们对于文档的异步操作就变得简单多了。
 
比如对于文档查询的操作可以改造如下:
 
ObservableSubscriber subscriber =newObservableSubscriber();
collection.find().subscribe(subscriber);//结果处理
subscriber.get(15,TimeUnit.SECONDS).forEach( d ->{
    System.out.println("Document:"+ d.toJson());});
当然,这个例子还有可以继续完善,比如使用 List 作为缓存,则要考虑数据量的问题,避免将全部(或超量) 的文档一次性转入内存。

(编辑:宜春站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    热点阅读