Java:使用异步编程优化应用程序

我必须修改dropwizard应用程序以改善其运行时间。 基本上,该应用程序每天接收大约300万个URL,并下载并解析它们以检测恶意内容。 问题是该应用程序只能处理100万个URL。 当我查看应用程序时,我发现它正在进行大量的顺序调用。 我想要一些关于如何通过使其成为异步或其他技术来改进应用程序的建议。

所需代码如下: –

/* Scheduler */ private long triggerDetection(String startDate, String endDate) { for (UrlRequest request : urlRequests) { if (!validateRequests.isWhitelisted(request)) { ContentDetectionClient.detectContent(request); } } } /* Client */ public void detectContent(UrlRequest urlRequest){ Client client = new Client(); URI uri = buildUrl(); /* It returns the URL of this dropwizard application's resource method provided below */ ClientResponse response = client.resource(uri) .type(MediaType.APPLICATION_JSON_TYPE) .post(ClientResponse.class, urlRequest); Integer status = response.getStatus(); if (status >= 200 && status < 300) { log.info("Completed request for url: {}", urlRequest.getUrl()); }else{ log.error("request failed for url: {}", urlRequest.getUrl()); } } private URI buildUrl() { return UriBuilder .fromPath(uriConfiguration.getUrl()) .build(); } /* Resource Method */ @POST @Path("/pageDetection") @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) /** * Receives the url of the publisher, crawls the content of that url, applies a detector to check if the content is malicious. * @returns returns the probability of the page being malicious * @throws throws exception if the crawl call failed **/ public DetectionScore detectContent(UrlRequest urlRequest) throws Exception { return contentAnalysisOrchestrator.detectContentPage(urlRequest); } /* Orchestrator */ public DetectionScore detectContentPage(UrlRequest urlRequest) { try { Pair response = crawler.rawLoad(urlRequest.getUrl()); String content = response.getValue().text(); DetectionScore detectionScore = detector.getProbability(urlRequest.getUrl(), content); contentDetectionResultDao.insert(urlRequest.getAffiliateId(), urlRequest.getUrl(),detectionScore.getProbability()*1000, detectionScore.getRecommendation(), urlRequest.getRequestsPerUrl(), -1, urlRequest.getCreatedAt() ); return detectionScore; } catch (IOException e) { log.info("Error while analyzing the url : {}", e); throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR); } } 

我在考虑以下方法: –

  • 我没有通过POST调用dropwizard资源方法,而是直接从调度程序调用orchestrator.detectContent(urlRequest)

  • 协调器可以返回detectionScore,我将所有的detectScores存储在一个map / table中,并执行批量数据库插入,而不是像在当前代码中那样单独插入。

我想对上述方法以及可能的其他技术进行一些评论,以便我可以改善运行时间。 另外,我刚刚阅读了Java异步编程,但似乎无法理解如何在上面的代码中使用它,所以也希望对此有所帮助。

谢谢。

编辑:我可以想到两个瓶颈:

  • 下载网页
  • 将结果插入数据库(数据库位于另一个系统中)
  • 似乎每次执行1个URL处理

系统有8 GB的内存,其中4 GB似乎是免费的

 $ free -m total used free shared buffers cached Mem: 7843 4496 3346 0 193 2339 -/+ buffers/cache: 1964 5879 Swap: 1952 489 1463 

CPU使用率也很小:

 top - 13:31:19 up 19 days, 15:39, 3 users, load average: 0.00, 0.00, 0.00 Tasks: 215 total, 1 running, 214 sleeping, 0 stopped, 0 zombie Cpu(s): 0.5%us, 0.0%sy, 0.0%ni, 99.4%id, 0.1%wa, 0.0%hi, 0.0%si, 0.0%st Mem: 8031412k total, 4605196k used, 3426216k free, 198040k buffers Swap: 1999868k total, 501020k used, 1498848k free, 2395344k cached 

受Davide(伟大的)答案的启发这里有一个例子,简单的方法来使用简单反应 (我写的I库)来并行化。 请注意,使用客户端驱动服务器上的并发性略有不同。

 LazyReact streamBuilder = new LazyReact(15,15); streamBuilder.fromIterable(urlRequests) .filter(urlReq->!validateRequests.isWhitelisted(urlReq)) .forEach(request -> { ContentDetectionClient.detectContent(request); }); 

说明

看起来你可以从客户端驱动并发。 这意味着您可以在服务器端跨线程分发工作,而无需额外的工作。 在此示例中,我们发出了15个并发请求,但您可以将其设置为接近服务器可以处理的最大值。 您的应用程序是IO Bound,因此您可以使用大量线程来提高性能。

简单反应是一种期货流。 所以我们在这里为ContentDetection客户端的每次调用创建一个Async任务。 我们有15个线程可用,因此可以一次向服务器进行15次调用。

Java 7

Java 7的JDK 8function的后端端口称为StreamSupport ,您还可以通过RetroLambda反向移植Lambda Expressions。

要使用CompletableFutures实现相同的解决方案,我们可以为每个符合条件的URL创建一个Future Task。 更新我认为我们不需要批量处理它们,我们可以使用Executor来限制活跃期货的数量。 我们只需要在最后加入它们。

  Executor exec = Executors.newFixedThreadPool(maxActive);//15 threads List> futures= new ArrayList<>(); for (UrlRequest request : urlRequests) { if (!validateRequests.isWhitelisted(request)) { futures.add(CompletableFuture.runAsync(()->ContentDetectionClient.detectContent(request), exec)); } } CompletableFuture.allOf(futures.toArray()) .join(); 

首先检查你大部分时间都在哪里。

我想大部分时间都没有下载url了。

如果下载url的时间超过90%,可能无法改善您的应用程序,因为瓶颈不是Java,而是您的网络。


仅当下载时间在网络function下时才考虑以下内容

如果下载时间不是那么高,你可能会尝试提高你的表现。 标准方法是使用生产者消费者链。 详情请见此处 。

基本上你可以将你的工作拆分如下:

 Downloading --> Parsing --> Saving 

下载是生产者,解析是下载过程的消费者,生成器用于保存过程和保存是消费者。

每个步骤可以由不同数量的线程执行。 例如,您可以拥有3个下载线程,5个解析线程和1个保存线程。


评论后编辑

假设瓶颈不是cpu时间,所以对java代码进行干预并不重要。

如果您知道每天下载的GigaBytes有多少可以查看它们是否接近您网络的最大带宽。

如果发生这种情况,有不同的可能性

  • 使用Content-Encoding: gzip请求压缩内容Content-Encoding: gzip (因此减少使用的带宽)
  • 在不同网络上工作的不同节点之间拆分应用程序(因此在不同网络之间划分带宽)
  • 更新你的带宽(所以增加你的网络带宽)
  • 请务必仅下载所请求的内容(如果没有请求,则不要使用javascript,图片,CSS等)(以尽量减少带宽的使用)
  • 以前的解决方案的组合