A Few Notes After First Working With Elasticsearch

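Our project recently needed Elasticsearch to speed up data searches. I looked for tutorials online and found it was not as simple as expected. Installation alone took a lot of effort.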

Topics: Chinese documentation, installing the head plugin, errors when indexing a document

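First, install Elasticsearch on Windows. A Java environment is required. Download the archive from the official site, unzip it, configure the environment variables, run elasticsearch.bat, and visit http://localhost:9200 to check whether it started.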

Startup may fail with an error, roughly saying that the JVM memory allocation is unsuitable, as shown in the screenshot.

Edit config/jvm.options and set the heap size (replacing any existing -Xms/-Xmx lines):

-Xms512m
-Xmx1g

 

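Next is the head plugin. From version 5.x on, head can no longer be installed from the command line with bin/elasticsearch-plugin.bat. You have to clone the project from GitHub and install it yourself.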

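After installing, visit http://localhost:9100 to test. The page may report that the cluster is not connected, because the browser blocks cross-origin requests. Open config/elasticsearch.yml and add the following two lines.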

http.cors.enabled: true
http.cors.allow-origin: "*"

Once everything was ready and I started using it, another problem came up when indexing a document with the following command:

curl -X PUT "http://localhost:9200/megacorp/employee/1?pretty" -d "{"name":"laokiea","age":20}"

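Running it fails with failed_to_parse, meaning the JSON body could not be parsed. After some searching I learned that in the Windows command prompt, double quotes inside a double-quoted argument must be escaped with a backslash, so the body becomes -d "{\"name\":\"laokiea\",\"age\":20}".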
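Escaping by hand gets tedious for larger documents, so the request body can also be generated. The following is a minimal sketch, assuming PHP is available on the command line; `windowsCurlBody` is a hypothetical helper name, not part of any library. It only handles the simple case shown here: values that themselves contain quotes or backslashes would need more careful escaping.

```php
<?php
// Hypothetical helper (not from any library): turn a PHP array into a
// double-quoted JSON argument for curl in the Windows command prompt.
function windowsCurlBody(array $doc): string
{
    // JSON_UNESCAPED_UNICODE keeps non-ASCII characters readable.
    $json = json_encode($doc, JSON_UNESCAPED_UNICODE);
    // Inside a double-quoted argument, each " must be written as \".
    return '"' . str_replace('"', '\\"', $json) . '"';
}

echo 'curl -X PUT "http://localhost:9200/megacorp/employee/1?pretty" -d '
    . windowsCurlBody(['name' => 'laokiea', 'age' => 20]) . PHP_EOL;
```

The printed line can be pasted into the command prompt as-is.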

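Elasticsearch has a PHP API. After looking into it, creating indices and indexing documents is easy, but it has to be installed through Composer. Download and install Composer, then write composer.json: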

{
    "require": {
        "elasticsearch/elasticsearch": "~5.0"
    }
}

Then run composer install, which creates the vendor directory in the project folder.

As an example, here is the code (using the ThinkPHP framework) that indexes an entire product table of 100,000 rows:


require "././././vendor/autoload.php";
public function createIndexAndBatchIndex()
    {
        $batchSize = 5000;
        $params['index'] = "sys_config";
        $client = \Elasticsearch\ClientBuilder::create()->build();
        // Recreate the index from scratch; deleting a missing index would throw
        if ($client->indices()->exists($params)) {
            $client->indices()->delete($params);
        }
        $client->indices()->create($params);

        // Index the documents in batches
        $Model = M("ranzhi_100001.product","yd_");
        $total = $Model->field(["count(*)"=>"total"])->find()["total"];
        for($i=0;$i<ceil($total/$batchSize);$i++){
            $begin = $batchSize*$i;
            // limit() takes (offset, length), not (begin, end)
            $products = $Model->field(["id","sname","name"])->limit($begin,$batchSize)->select();
            $documents = ['body'=> []];
            $j = 1;
            foreach ($products as $product) {
                // One action line followed by one document line per product
                $documents['body'][] = ["index" => ["_index" => "sys_config", "_type" => 'config', "_id" => $begin+$j]];
                $documents['body'][] = ["sname"=>$product['sname'],"name" => $product['name']];
                $j++;
            }
            $client->bulk($documents);
            unset($documents);
        }
        echo "all documents have been indexed";
    }

The result is shown in the screenshot.

All 100,000+ rows were indexed.
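The body of a bulk request alternates one action line and one document line, which is easy to get wrong. Below is a minimal sketch of that structure, independent of the framework and of a running cluster. `buildBulkBody` is a hypothetical helper, and unlike the code above it assumes the row's own `id` column is used as the document `_id`.

```php
<?php
// Hypothetical helper: build the parameter array expected by the PHP
// client's bulk() call from plain database rows.
function buildBulkBody(array $rows, string $index, string $type): array
{
    $body = [];
    foreach ($rows as $row) {
        // Action line: what to do and where the document goes.
        $body[] = ['index' => ['_index' => $index, '_type' => $type, '_id' => $row['id']]];
        // Document line: the fields to store, without the id column.
        unset($row['id']);
        $body[] = $row;
    }
    return ['body' => $body];
}

// Sample rows made up for illustration.
$rows = [
    ['id' => 7, 'sname' => 'a', 'name' => 'Product A'],
    ['id' => 8, 'sname' => 'b', 'name' => 'Product B'],
];
$params = buildBulkBody($rows, 'sys_config', 'config');
echo count($params['body']), PHP_EOL; // two entries per row
```

The returned array could then be passed to $client->bulk($params).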


03.01 Summary:

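The usual way to use Elasticsearch in a project is to copy the data that needs to be searchable into Elasticsearch and run the search queries against it. Concurrent operations can cause problems, though. Suppose there are 100 units of a product and two people, A and B, each sell one unit at the same time.

Both may read a stock of 100 and write back 99, so each believes only one unit was deducted, while in fact two units were sold. This is similar to the locking problem in databases.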

There are two ways to solve this:

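The first: every document carries a version number. For example, creating a document returns _version 1. Running the same request again is now an update, and _version increases to 2. If we then send the request once more with ?version=1 appended to the URL, meaning the operation should only apply to the document at version 1, it returns a 409 error. Back to the problem above: when A or B updates, they first fetch the latest document and then send the update together with the _version from that response. If someone else has modified the document in the meantime, its _version has changed and the request fails, so the caller can re-read the document and retry.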

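The second is similar and still relies on version numbers, but instead of the system-assigned version you supply your own, for example some value in your system (such as a timestamp) that can serve as one. A custom version must be an integer greater than 0. Append ?version=<your version>&version_type=external to the request URL. With external versioning, the write succeeds only if the supplied version is greater than the stored one.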
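The version check in the first approach can be illustrated without a running cluster. The sketch below is an in-memory simulation, not the Elasticsearch API: `VersionedStore` and its methods are hypothetical names, and the thrown exception stands in for the HTTP 409 response. It replays the A/B scenario and shows that the stale write is rejected and retried.

```php
<?php
// In-memory stand-in for a versioned document store (NOT the Elasticsearch API).
class VersionedStore
{
    private $docs = [];

    // Create or overwrite without a version check; returns the new version.
    public function put(string $id, array $source): int
    {
        $version = isset($this->docs[$id]) ? $this->docs[$id]['_version'] + 1 : 1;
        $this->docs[$id] = ['_version' => $version, '_source' => $source];
        return $version;
    }

    public function get(string $id): array
    {
        return $this->docs[$id];
    }

    // Update only if the caller saw the current version; otherwise
    // mimic the 409 conflict by throwing.
    public function update(string $id, array $source, int $expectedVersion): int
    {
        if ($this->docs[$id]['_version'] !== $expectedVersion) {
            throw new RuntimeException('409 version conflict');
        }
        return $this->put($id, $source);
    }
}

$store = new VersionedStore();
$store->put('1', ['stock' => 100]);

// A and B both read version 1 with a stock of 100.
$a = $store->get('1');
$b = $store->get('1');

// A sells one unit first; the version becomes 2.
$store->update('1', ['stock' => $a['_source']['stock'] - 1], $a['_version']);

// B's write carries the stale version 1 and is rejected.
try {
    $store->update('1', ['stock' => $b['_source']['stock'] - 1], $b['_version']);
} catch (RuntimeException $e) {
    // B re-reads the document and retries against the fresh version.
    $fresh = $store->get('1');
    $store->update('1', ['stock' => $fresh['_source']['stock'] - 1], $fresh['_version']);
}

echo $store->get('1')['_source']['stock'], PHP_EOL;
```

After the retry the stock reflects both sales, which is the outcome the version check is meant to guarantee.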


3.6 Summary:

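Today I started writing code. The first issue is tokenization. Elasticsearch ships with several analyzers, including a chinese analyzer for Chinese text, but its results are not very good, so I use the ik analyzer, which handles Chinese word segmentation well.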

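First I cloned the project from GitHub and built it with Maven (mvn package) into a jar file, put it under plugins in the es directory, and restarted es. It failed with an error saying the ik version did not match the es version. So I switched to downloading a release archive directly from https://github.com/medcl/elasticsearch-analysis-ik/releases, picking the version that matches es and unpacking it into plugins/ik under the es directory. No packaging is needed.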

Then test:

curl -X GET "http://localhost:9200/_analyze?analyzer=ik_max_word&text=你好啊&pretty"

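The output was garbled and the text was not segmented as Chinese. The reason is that on Windows, curl sends the text in GBK by default, while Elasticsearch needs UTF-8. So I wrote a bit of code to convert the text to the %xx format: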

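<?php
$str = "你好啊";
$utf = '';
// iconv() takes (from, to, string); this assumes the input is GB2312-encoded
$str = iconv("GB2312", "UTF-8", $str);
for($i=0;$i<strlen($str);$i++){
// %% prints a literal %, followed by the byte as two hex digits
$utf .= sprintf("%%%02X",ord(substr($str,$i,1)));
}
echo $utf;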

Testing again with the encoded text succeeded, which shows that Chinese word segmentation works.
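PHP's built-in rawurlencode() produces the same %xx form, so the loop is not strictly needed. A minimal sketch, assuming the text is already UTF-8 (for example because the script file is saved as UTF-8); the URL is the one from the test above with the text parameter moved to the end:

```php
<?php
// Assumes $text is already UTF-8 encoded.
$text = "你好啊";

// rawurlencode() percent-encodes every byte outside the unreserved set.
$url = "http://localhost:9200/_analyze?analyzer=ik_max_word&pretty&text="
    . rawurlencode($text);

echo $url, PHP_EOL;
```

The resulting URL contains only ASCII characters, so the console code page no longer matters when it is passed to curl.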


3.7 Summary:

Here is the script that indexes the data:

#!/usr/bin/env php
<?php

$v_autoload =  "../../vendor/autoload.php";
require $v_autoload;

// Everything below is test data
$user = "root";
$pass = "";
$dbname = "ranzhi_100001";
$host = "127.0.0.1";
$batchNum = 5000;
$es_hosts = ["localhost:9200"];

$options = [
PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
PDO::ATTR_PERSISTENT => false,
PDO::MYSQL_ATTR_INIT_COMMAND => "set names utf8mb4",
];
try{$pdo = new PDO("mysql:dbname=".$dbname.";host=".$host,$user,$pass,$options);}
catch(PDOException $e){
     die("PDO init fail : ".$e->getMessage());
}

$countSql = "SELECT count(*) as count from yd_stock";
$count = $pdo->query($countSql)->fetch(PDO::FETCH_OBJ)->count;

try{$client = \Elasticsearch\ClientBuilder::create()->setHosts($es_hosts)->build();}
catch(Exception $e){
     die("Elasticsearch init fail : ".$e->getMessage());
}
// Create the index for stock products
$index = [
     "index" => '1pei',
     "body" => [
          "settings" => [
               "number_of_shards" => 5,   // Spread the index data over five primary shards
               "number_of_replicas" => 2, // Each primary shard has two replica shards
          ],
          "mappings" => [
               // Mapping for the stockProduct type (only properties that may be searched are indexed)
               "stockProduct" => [
                    // The body of a retrieved document is stored as JSON in the _source field
                    "_source" => ["enabled" => true],

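                    // Original intent: documents indexed under this type take their id from the sid field.
                    // The _id path setting was removed in Elasticsearch 2.0, so it is left commented out here;
                    // the bulk request further down sets _id from sid explicitly.
                    // "_id" => ["path" => "sid"],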

                    // Disable _all for searching; if enabled it acts like a separate field analyzed with ik
                    "_all" => [
                         "enabled" => false,
                         "analyzer" => 'ik_max_word',
                    ],
                    // With strict, indexing a document that adds a new field to this type throws an exception; inner objects can still set dynamic to true
                    "dynamic" => "strict",

                    // Dynamic templates for fields that follow a naming pattern within this type but may differ in data type.
                    // Elasticsearch expects a list of single-key objects here, hence the extra array level.
                    "dynamic_templates" => [
                         ["es" => [
                              "match" => "*_es",
                              // Only apply to fields detected as strings
                              "match_mapping_type" => "string",
                              "mapping" => [
                                   "type" => "string",
                                   "index" => "analyzed",
                                   "analyzer" => "ik_max_word",
                              ],
                         ]],
                         ["es_path" => [
                              // For inner objects
                              "path_match" => "info.name",
                              "match_mapping_type" => "string",
                              "mapping" => [
                                   "type" => "string",
                                   "index" => "analyzed",
                                   "analyzer" => "ik_max_word",
                              ],
                         ]],
                    ],

                    "properties" => [
                         "sid" => ["type" => "long"],
                         "pid" => ["type" => "long"],
                         "partid" => ["type" => "long"],
                         "productName" => ["type" => "text","index" => "analyzed","analyzer" => 'ik_max_word'],
                         "sname" => ["type" => "text","index" => "analyzed","analyzer" => 'ik_max_word'],
                         "pn" => ["type" => "text"],
                         "oem" => ["type" => "text"],
                         "manufacturer" => ["type" => "text","index" => "analyzed","analyzer" => 'ik_max_word'],
                         "madein" => ["type" => "text","index" => "analyzed","analyzer" => 'ik_max_word'],
                         "dwgno" => ["type" => "text"],
                         "materialNumber" => ["type" => "text"],
                         "inStockNo" => ["type" => "text"],
                         "standard" => ["type" => "text","index" => "analyzed","analyzer" => 'ik_max_word'],
                         "unit" => ["type" => "text","index" => "analyzed","analyzer" => 'ik_max_word'],
                         "store" => ["type" => "long"],
                         "shelf" => ["type" => "long"],
                         "count" => ["type" => "long"],
                         "dynacount" => ["type" => "long"],
                         "chainID" => ["type" => "long"],
                         "provider" => ["type" => "long"],
                         "providerName" => ["type" => "text","index" => "analyzed","analyzer" => 'ik_max_word'],
                         "storeName" => ["type" => "text","index" => "analyzed","analyzer" => 'ik_max_word'],
                         "shelfName" => ["type" => "text","index" => "analyzed","analyzer" => 'ik_max_word'],
                         "partGroupName" => ["type" => "text","index" => "analyzed","analyzer" => 'ik_max_word'],
                    ],
               ],
          ],
     ],
];
try{if($client->indices()->getSettings(["index"=>"1pei"])) goto _index;}
catch(Exception $e){}
$client->indices()->create($index);

_index:
// Index the data
$allDataSql = "SELECT t4.name as partGroupName, s.id as sid, p.id as pid, p.partid, p.name as productName, p.sname,p.pn,p.oem, p.manufacturer, p.madein, p.dwgno, p.materialNumber, p.standard, p.unit,s.store, store.name as storeName,shelf.name as shelfName,s.shelf,s.count,s.dynacount, s.inStockNo, s.provider,c.name as providerName,s.chainID FROM yd_stock s LEFT JOIN yd_product p on s.pid = p.id LEFT JOIN ranzhico.scm_parts t3 on p.partid = t3.id LEFT JOIN ranzhico.scm_parts t4 on t4.id = t3.parent LEFT JOIN crm_customer c on s.provider = c.id LEFT JOIN yd_store store on s.store = store.id LEFT JOIN yd_shelf shelf on shelf.id = s.shelf RIGHT JOIN (SELECT id FROM yd_stock limit %d,%d) b on b.id = s.id";
$properties = array_keys($index['body']['mappings']['stockProduct']['properties']);

for($i=0;$i < ceil($count/$batchNum); $i++){
     $beginTime = time();
     $begin = $batchNum * $i;
     $end = $batchNum * ($i+1);
     $stmt = $pdo->prepare(sprintf($allDataSql, $begin, $batchNum));
     $stmt->execute();
     $result =  $stmt->fetchAll(PDO::FETCH_OBJ);
     $documents = ['body'=> []];
     foreach ($result as $id => $stock) {
          $documents['body'][] = ["index" => ["_index" => "1pei", "_type" => 'stockProduct', "_id" => $stock->sid,"timestamp" => time(),]];
          // Start from an empty document so nothing carries over from the previous row
          $document = [];
          foreach($properties as $property){
               $document[$property] = $stock->$property;
          }
          $documents['body'][] = $document;
     }
     $client->bulk($documents);
     unset($documents,$result);
     $currentMemory = memory_get_usage()/1024/1024;
     echo "[$i] time success from [$begin] to [$end], spent [".(time()-$beginTime)."]s, current memory is [".$currentMemory."M]".PHP_EOL;
}
echo "data indexed completely".PHP_EOL;

 

Running the test script: the data was indexed successfully.

Querying the data: Chinese word segmentation works.
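The batching loop in the script pages through the table with LIMIT offset, count, where the second number is a row count rather than an end position. A minimal sketch of that arithmetic; `batchWindows` is a hypothetical helper and the totals are made up for illustration:

```php
<?php
// Hypothetical helper: list the (offset, count) pairs needed to read
// $total rows in batches of at most $batchSize rows.
function batchWindows(int $total, int $batchSize): array
{
    $windows = [];
    for ($offset = 0; $offset < $total; $offset += $batchSize) {
        // The last batch may be smaller than $batchSize.
        $windows[] = [$offset, min($batchSize, $total - $offset)];
    }
    return $windows;
}

foreach (batchWindows(12000, 5000) as [$offset, $count]) {
    echo "LIMIT $offset,$count", PHP_EOL;
}
```

For 12000 rows and a batch size of 5000 this yields three windows, the last one covering the remaining 2000 rows.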

I studied the 5.0 API documentation. One feature, future mode (concurrent requests), is worth trying:

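$params = [
    'index' => '1pei',
    'type' => 'stockProduct',
    'id' => 433138,
    'client' => [
        'future' => 'lazy'
    ]
];
$future = $client->get($params);
// At this point the return value is not the response body but a future object.
// Accessing it like an array blocks until the response has arrived.
$doc = $future['_source'];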

Several requests can be issued at once, and all the results collected in a single array.


3.8 Summary:

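I kept studying the PHP API. Near the end, the documentation points to a library that helps translate the query DSL used with curl into PHP client code, which is quite convenient. It is installed through Composer: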

composer require ongr/elasticsearch-dsl

For example, to build the following query:


        $boolQuery = new \ONGR\ElasticsearchDSL\Query\Compound\BoolQuery();
        // Exact match on store, used as a filter (does not affect scoring)
        $termQuery = new \ONGR\ElasticsearchDSL\Query\TermLevel\TermQuery("store",2);
        // Full-text match on sname
        $matchQuery = new \ONGR\ElasticsearchDSL\Query\FullText\MatchQuery("sname","离合器");
        $boolQuery->add($termQuery,\ONGR\ElasticsearchDSL\Query\Compound\BoolQuery::FILTER);
        $boolQuery->add($matchQuery, \ONGR\ElasticsearchDSL\Query\Compound\BoolQuery::MUST);

        $search = new \ONGR\ElasticsearchDSL\Search();
        $search->addQuery($boolQuery);

        $params = [
            "index" => "1pei",
            "type" => "stockProduct",
            "body" => $search->toArray(),
            // "client" => [
            //     "verbose" => true,
            // ]
        ];

The generated query looks roughly like this:

{
    "query": {
        "bool": {
            "must": [
                {
                    "match": {
                        "sname" : "离合器"
                    }
                }
            ],
            "filter": [
                {
                    "term": {
                        "store": "2"
                    }
                }
            ]
        }
    }
}
This saves the trouble of writing the DSL by hand, which is also error-prone.
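For comparison, the same query can be written by hand as a nested PHP array, which is what the PHP client accepts as the request body. This is a minimal sketch that runs without the library or a cluster; the nesting depth is exactly where hand-written queries tend to go wrong:

```php
<?php
// Hand-written equivalent of the bool query above.
$body = [
    'query' => [
        'bool' => [
            // must: contributes to the relevance score
            'must'   => [['match' => ['sname' => '离合器']]],
            // filter: restricts the results without scoring
            'filter' => [['term' => ['store' => 2]]],
        ],
    ],
];

echo json_encode($body, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE), PHP_EOL;
```

Printing the array as JSON makes it easy to compare against a query that already works with curl.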