2017-02-23 60 views
1

背景

我目前正在为使用gatling的压力测试工具开发能力分析集。在Gatling场景中产生多个进一步操作的动作

这部分涉及使用滚动查询加载elasticsearch,然后加载更新API调用。

我想实现

第1步是什么:运行滚动发起并保存在那里可以通过进一步滚动使用的_scroll_id查询

步骤2:运行滚动查询上重复一遍,作为每个滚动查询的一部分,对返回的每个匹配进行修改,并将其重新索引到elasticsearch中,从一次滚动查询操作中有效产生多达1000个操作,并对结果进行采样。

第1步很容易。第2步不是很多。

我已经试过

我目前正试图通过一个分析JSON格式结果的ResponseTransformer实现这一目标,使修改每一个和火灾关闭一个线程试图另一个exec(http(...).post(...) etc)到索引中的每一个这些变化回到elasticsearch。

基本上,我认为我会用错误的方式去解决它。索引线程永远不会运行,更不用说采样了。

这里是我的滚动查询行为的主体:

... 

    val pool = Executors.newFixedThreadPool(parallelism) 

    val query = exec(http("Scroll Query") 
    .get(s"/_search/scroll") 
    .body(ElFileBody("queries/scrollquery.json")).asJSON // Do the scroll query 
    .check(jsonPath("$._scroll_id").saveAs("scroll_id")) // Get the scroll ID from the response 
    .transformResponse { case response if response.isReceived => 
     new ResponseWrapper(response) { 
     val responseJson = JSON.parseFull(response.body.string) 
     // Get the hits and 
     val hits = responseJson.get.asInstanceOf[Map[String, Any]]("hits").asInstanceOf[Map[String,Any]]("hits").asInstanceOf[List[Map[String, Any]]] 
     for (hit <- hits) { 
      val id = hit.get("_id").get.asInstanceOf[String] 
      val immutableSource = hit.get("_source").get.asInstanceOf[Map[String, Any]] 
      val source = collection.mutable.Map(immutableSource.toSeq: _*) // Make the map mutable 
      source("newfield") = "testvalue" // Make a modification 
      Thread.sleep(pause) // Pause to simulate topology throughput 
      pool.execute(new DocumentIndexer(index, doctype, id, source)) // Create a new thread that executes the index request 
     } 
     } 
    }) // Make some mods and re-index into elasticsearch 

    ... 

DocumentIndexer看起来是这样的:

class DocumentIndexer(index: String, doctype: String, id: String, source: scala.collection.mutable.Map[String, Any]) extends Runnable { 

    ... 

    val httpConf = http 
    .baseURL(s"http://$host:$port/${index}/${doctype}/${id}") 
    .acceptHeader("application/json") 
    .doNotTrackHeader("1") 
    .disableWarmUp 

    override def run() { 

    val json = new ObjectMapper().writeValueAsString(source) 

    exec(http(s"Index ${id}") 
     .post("/_update") 
     .body(StringBody(json)).asJSON) 

    } 

} 

问题

  1. 使用加特林这甚至可能吗?
  2. 我该如何达到我想达到的目标?

感谢您的任何帮助/建议!

回答

1

可以通过使用jsonPath来提取JSON命中数组并将元素保存到会话中,然后使用操作链中的foreach和循环中的索引任务执行索引因此。

即:
ScrollQuery

... 
    val query = exec(http("Scroll Query") 
    .get(s"/_search/scroll") 
    .body(ElFileBody("queries/scrollquery.json")).asJSON // Do the scroll query 
    .check(jsonPath("$._scroll_id").saveAs("scroll_id")) // Get the scroll ID from the response 
    .check(jsonPath("$.hits.hits[*]").ofType[Map[String,Any]].findAll.saveAs("hitsJson")) // Save a List of hit Maps into the session 
) 
... 

模拟

... 
    val scrollQueries = scenario("Enrichment Topologies").exec(ScrollQueryInitiator.query, repeat(numberOfPagesToScrollThrough, "scrollQueryCounter"){ 
     exec(ScrollQuery.query, pause(10 seconds).foreach("${hitsJson}", "hit"){ exec(HitProcessor.query) }) 
    }) 
... 

HitProcessor

... 
    def getBody(session: Session): String = { 
    val hit = session("hit").as[Map[String,Any]] 
    val id = hit("_id").asInstanceOf[String] 
    val source = mapAsScalaMap(hit("_source").asInstanceOf[java.util.LinkedHashMap[String,Any]]) 
    source.put("newfield", "testvalue") 
    val sourceJson = new ObjectMapper().writeValueAsString(mapAsJavaMap(source)) 
    val json = s"""{"doc":${sourceJson}}""" 
    json 
    } 

    def getId(session: Session): String = { 
    val hit = session("hit").as[Map[String,Any]] 
    val id = URLEncoder.encode(hit("_id").asInstanceOf[String], "UTF-8") 
    val uri = s"/${index}/${doctype}/${id}/_update" 
    uri 
    } 

    val query = exec(http(s"Index Item") 
    .post(session => getId(session)) 
    .body(StringBody(session => getBody(session))).asJSON) 
... 

声明:此代码仍然需要优化!而且我还没有实际学到很多Scala。随意用更好的解决方案发表评论

做完了这个之后,我现在真正想要实现的是并行化给定数量的索引任务。即:我得到1000个命中,我想为每个单独的命中执行一个索引任务,但不是迭代它们并且一个接一个地执行它们,我想一次同时执行10个命中任务。

但是,我认为这是一个单独的问题,真的,所以我会这样介绍它。