本文共 10075 字,大约阅读时间需要 33 分钟。
前提说明:
提交一段内容到ES,ES内部会存储成两个数据库:
文档库
结构如下:
_index _type _id _version _source _id _version _source _id _version _source ...
反向索引库
结构如下:
term词项 _id 词频、位置term词项 _id 词频、位置term词项 _id 词频、位置...
因此,在我们查询的时候,一般使用term词项先去反向索引库中查询到文档_id,然后根据_id去文档库中找到最初上传上去的所有文档原始内容_source,当然,前提是上传的时候有保存原始数据
新增文档,自动生成文档id:
POST twitter/_doc/{ "id": 1, "user" : "kimchy", "post_date" : "2009-11-15T14:12:12", "message" : "trying out Elasticsearch"}
新增文档,指定文档id(id若存在则为修改):
PUT twitter/_doc/1{ "id": 1, "user" : "kimchy", "post_date" : "2009-11-15T14:12:12", "message" : "trying out Elasticsearch"}
返回结果:
{ "_index": "twitter", #所属索引 "_type": "_doc", #所属mapping type "_id": "p-D3ymMBl4RK_V6aWu_V", #文档id "_version": 1, #文档版本 "result": "created", #创建成功 "_shards": { "total": 3, #所在分片有三个副本 "successful": 1, #只有一个副本成功写入,可能节点机器只有一台 "failed": 0 #失败副本数 }, "_seq_no": 0, #第几次操作该文档 "_primary_term": 3 #词项数}
HEAD twitter/_doc/1 #查看是否存储,返回200表示已存储GET twitter/_doc/1 #返回源数据的查询GET twitter/_doc/1?_source #同上GET twitter/_doc/1?_source=false #不返回源数据的查询GET twitter1/_doc/1?stored_fields=tags,counter #查询存储的字段,即字段中store=false的字段不会被查询出来
存储和不存储的字段用例如下:
PUT twitter11{ "mappings": { "_doc": { "properties": { "counter": { "type": "integer", "store": false }, "tags": { "type": "keyword", "store": true } } } }}PUT twitter11/_doc/1{ "counter" : 1, "tags" : ["red"]}GET twitter11/_doc/1?stored_fields=tags,counter
使用_mget命令
GET /_mget{ "docs" : [ { "_index" : "twitter", "_type" : "_doc", "_id" : "1" }, { "_index" : "twitter", "_type" : "_doc", "_id" : "2" "stored_fields" : ["field3", "field4"] } ]}
GET /twitter/_mget{ "docs" : [ { "_type" : "_doc", "_id" : "1" }, { "_type" : "_doc", "_id" : "2" } ]}
GET /twitter/_doc/_mget{ "docs" : [ { "_id" : "1" }, { "_id" : "2" } ]}
GET /twitter/_doc/_mget{ "ids" : ["1", "2"]}
根据文档id删除:
DELETE twitter/_doc/1 #指定文档id进行删除DELETE twitter/_doc/1?version=1 #指定文档id和版本号进行删除
查询删除,会先查询然后在删除,可能耗时会比较长:
POST twitter/_delete_by_query{ "query": { "match": { "message": "some message" } }}
查询删除耗时较长,可能会出现文档冲突的可能(此时有一个修改操作的话),那么可以使用conflicts=proceed参数解决冲突问题(当有文档存在版本冲突时,不放弃删除操作,将会记录冲突的文档,然后继续删除其他复合查询的文档):
POST twitter/_doc/_delete_by_query?conflicts=proceed{ "query": { "match_all": { } }}
查看"查询删除"的任务,因为查询删除是一个耗时的过程:
GET _tasks?detailed=true&actions=*/delete/byquery #查看查询删除的任务GET /_tasks/taskId:1 #查看具体任务的状态POST _tasks/task_id:1/_cancel #取消任务,取消并不会回滚
PUT twitter/_doc/1{ "id": 1, "user" : "kimchy", "post_date" : "2009-11-15T14:12:12", "message" : "trying out Elasticsearch"}
指定版本号更新,类似于数据库的乐观锁,这里的版本号也是类似乐观锁控制更新
PUT twitter/_doc/1?version=1{ "id": 1, "user" : "kimchy", "post_date" : "2009-11-15T14:12:12", "message" : "trying out Elasticsearch"}
painless是es内置的一种脚本语言,ctx执行上下文对象(通过它还可访问_index, _type, _id, _version, _routing and _now (the current timestamp) ),params是参数集合。
脚本更新要求索引的_source 字段是启用的。
更新流程如下:
- 获取到原文档
- 通过_source字段的原始数据,执行脚本修改
- 删除原索引文档
- 索引修改后的文档
使用脚本可以降低了一些网络往返,并减少了get和索引之间版本冲突的可能性。
准备一个文档:
PUT uptest/_doc/1{ "counter" : 1, "tags" : ["red"]}
对文档1的counter+4:
POST uptest/_doc/1/_update{ "script" : { "source": "ctx._source.counter += params.count", "lang": "painless", "params" : { "count" : 4 } }}
往数组中加入元素:
POST uptest/_doc/1/_update{ "script" : { "source": "ctx._source.tags.add(params.tag)", "lang": "painless", "params" : { "tag" : "blue" } }}
添加一个字段:
POST uptest/_doc/1/_update{ "script" : "ctx._source.new_field = 'value_of_new_field'"}
移除一个文档:
POST uptest/_doc/1/_update{ "script" : "ctx._source.remove('new_field')"}
判断删除或do nothing
POST uptest/_doc/1/_update{ "script" : { "source": "if (ctx._source.tags.contains(params.tag)) { ctx.op = 'delete' } else { ctx.op = 'none' }", "lang": "painless", "params" : { "tag" : "green" } }}
连续两次更新相同的内容,第二次会检测不需要更新,返回noop
POST uptest/_doc/1/_update{ "doc" : { "name" : "new_name" }}
如果希望第二次也更新,则设置不做noop检测
POST uptest/_doc/1/_update{ "doc" : { "name" : "new_name" }, "detect_noop": false}
upsert 操作:如果要更新的文档存在,则执行脚本进行更新,如不存在,则把 upsert中的内容作为一个新文档写入
POST uptest/_doc/1/_update{ "script" : { "source": "ctx._source.counter += params.count", "lang": "painless", "params" : { "count" : 4 } }, "upsert" : { "counter" : 1 }}
通过条件查询来更新文档
POST twitter/_update_by_query{ "script": { "source": "ctx._source.likes++", "lang": "painless" }, "query": { "term": { "user": "kimchy" } }}
批量操作API /_bulk 让我们可以在一次调用中执行多个索引、删除操作。
这可以大大提高索引数据的速度。批量操作内容体需按如下以新行分割的json结构格式给出:
action_and_meta_data\noptional_source\naction_and_meta_data\noptional_source\n....action_and_meta_data\noptional_source\n
- 一行命令,一行数据,如此反复
- action_and_meta_data:action表示命令,可以是index、create、delete、update中的一种,meta_data表示元数据,一般指_index、_type、_id 比如下面的例子:
POST _bulk{ "index" : { "_index" : "test", "_type" : "_doc", "_id" : "1" } }{ "field1" : "value1" }{ "delete" : { "_index" : "test", "_type" : "_doc", "_id" : "2" } }{ "create" : { "_index" : "test", "_type" : "_doc", "_id" : "3" } }{ "field1" : "value3" }{ "update" : { "_id" : "1", "_type" : "_doc", "_index" : "test"} }{ "doc" : { "field2" : "value2"} }
- 请求端点可以是:/_bulk, /{index}/_bulk, {index}/{type}/_bulk
- 如果请求端点是{index}/{type}/_bulk,则meta_data中的_index和_type可以不写
curl + json 文件 批量索引多个文档:
curl -H "Content-Type: application/json" -XPOST "localhost:9200/bank/_doc/_bulk?pretty&refresh" --data-binary "@accounts.json"
accounts.json要放在执行命令的当前目录,json数据如下:
{ "index":{ "_id":"1"}}{ "account_number":1,"balance":39225,"firstname":"Amber","lastname":"Duke","age":32,"gender":"M","address":"880 Holmes Lane","employer":"Pyrami","email":"amberduke@pyrami.com","city":"Brogan","state":"IL"}{ "index":{ "_id":"6"}}{ "account_number":6,"balance":5686,"firstname":"Hattie","lastname":"Bond","age":36,"gender":"M","address":"671 Bristol Street","employer":"Netagy","email":"hattiebond@netagy.com","city":"Dante","state":"TN"}{ "index":{ "_id":"13"}}{ "account_number":13,"balance":32838,"firstname":"Nanette","lastname":"Bates","age":28,"gender":"F","address":"789 Madison Street","employer":"Quility","email":"nanettebates@quility.com","city":"Nogal","state":"VA"}{ "index":{ "_id":"18"}}{ "account_number":18,"balance":4180,"firstname":"Dale","lastname":"Adams","age":33,"gender":"M","address":"467 Hutchinson Court","employer":"Boink","email":"daleadams@boink.com","city":"Orick","state":"MD"}
Reindex API /_reindex 让我们可以将一个索引中的数据重索引到另一个索引中(拷贝),要求源索引的_source 是开启的,目标索引的setting 、mapping 信息与源索引无关。
POST _reindex{ "source": { "index": "twitter" }, "dest": { "index": "new_twitter" }}
重索引要考虑的一个问题:目标索引中存在源索引中的数据,这些数据的version如何处理。
情形一:如果没有指定version_type 或指定为 internal,则会采用目标索引中的版本,重索引过程中,执行的就是新增操作。
POST _reindex{ "source": { "index": "twitter" }, "dest": { "index": "new_twitter", "version_type": "internal" }}
情形二:如果想使用源索引中的版本来进行版本控制更新,则设置 version_type 为extenal。重索引操作将写入不存在的,更新旧版本的数据。
POST _reindex{ "source": { "index": "twitter" }, "dest": { "index": "new_twitter", "version_type": "external" }}
情形三:如果你只想从源索引中复制目标索引中不存在的文档数据,可以指定 op_type 为 create 。此时存在的文档将触发 版本冲突(会导致放弃操作),可设置“conflicts”: “proceed“,跳过继续
POST _reindex{ "conflicts": "proceed", "source": { "index": "twitter" }, "dest": { "index": "new_twitter", "op_type": "create" }}
情形四:也可以只索引源索引的一部分数据,通过 type 或 查询来指定你需要的数据
POST _reindex{ "source": { "index": "twitter", "type": "_doc", "query": { "term": { "user": "kimchy" } } }, "dest": { "index": "new_twitter" }}
情形五:从多个源获取数据
POST _reindex{ "source": { "index": ["twitter", "blog"], "type": ["_doc", "post"] }, "dest": { "index": "all_together" }}
情形六:可以限定文档数量
POST _reindex{ "size": 10000, "source": { "index": "twitter", "sort": { "date": "desc" } }, "dest": { "index": "new_twitter" }}
情形七:可以选择复制源文档的那些字段
POST _reindex{ "source": { "index": "twitter", "_source": ["user", "_doc"] }, "dest": { "index": "new_twitter" }}
情形八:可以用scropt脚本来改变文档
POST _reindex{ "source": { "index": "twitter" }, "dest": { "index": "new_twitter", "version_type": "external" }, "script": { "source": "if (ctx._source.foo == 'bar') {ctx._version++; ctx._source.remove('foo')}", "lang": "painless" }}
情形九:可以指定路由值
POST _reindex{ "source": { "index": "source", "query": { "match": { "company": "cat" } } }, "dest": { "index": "dest", "routing": "=cat" }}
情形十:可以从远程数据源进行复制
POST _reindex{ "source": { "remote": { "host": "http://otherhost:9200", "username": "user", "password": "pass" }, "index": "source", "query": { "match": { "test": "data" } } }, "dest": { "index": "dest" }}
通过task来查询执行状态
GET _tasks?detailed=true&actions=*reindex
对于索引、更新、删除操作如果想操作完后立马重刷新可见,可带上refresh参数。
PUT /test/_doc/1?refresh{ "test": "test"}PUT /test/_doc/2?refresh=true{ "test": "test"}
refresh参数说明:
- 未给值或=true,则立马会重刷新读索引
- =false ,相当于没带refresh 参数,遵循内部的定时刷新
- =wait_for ,登记等待刷新,当登记的请求数达到index.max_refresh_listeners 参数设定的值时(defaults to 1000),将触发重刷新
转载地址:http://egsxi.baihongyu.com/