内容纲要
概要说明
本案例用于实现 python3中通过elasticsearch连接search,通过api对数据库进行增删改查。
详细说明
1. 确认环境的python版本为3.3+,本次试验环境python版本为3.6.8:
python -V
2. 确认环境的Search中对应的elasticsearch版本:
# 在search server的节点执行以下命令,其中version字段中的number即为elasticsearch版本
[root@tdh-01/tmp/log]# curl -XGET localhost:9200
{
"name" : "tdh-01",
"cluster_name" : "cluster",
"cluster_uuid" : "nuaHilBRSp2zd_87rZWQ7g",
"version" : {
"number" : "5.4.1",
"build_hash" : "Unknown",
"build_date" : "Unknown",
"build_snapshot" : true,
"lucene_version" : "6.5.1"
},
"tagline" : "You Know, for Search"
}
2. 安装相应的依赖包:
pip install elasticsearch==5.4.0
elasticsearch库的版本需要和search中elasticsearch大版本保持一致,否则无法正常使用。
3. 测试search是否可以连通:
from elasticsearch import Elasticsearch
# 无安全认证的集群
es_client = Elasticsearch(host + ':' + str(port), maxsize=15)
# 输出为true,则表可以连通,可以进行下一步
print(es_client.ping())
4. 准备search表
注意:第一列映射为Transwarp Search中相映射的Index的 _id ,必须是 STRING 类型的。
CREATE TABLE esdrive_test(
search_id STRING,
content STRING,
tint INT,
tbool BOOLEAN
)STORED AS ES
with shard number 10
replication 1;
创建成功后可以执行以下命令,查看索引基本信息,其中mappings字段中的default_type_即为该index的type,后续脚本中会需要用到
[root@tdh-01/tmp/log]# curl -XGET localhost:9200/default.esdrive_test?pretty
{
"default.esdrive_test" : {
"aliases" : { },
"mappings" : {
"default_type_" : {
"properties" : {
"content" : {
"type" : "keyword"
},
"tbool" : {
"type" : "boolean"
},
"tint" : {
"type" : "integer"
}
}
}
},
"settings" : {
"index" : {
"creation_date" : "1641094067672",
"number_of_shards" : "10",
"number_of_replicas" : "1",
"uuid" : "a8scA68uTvyR3cRch-zQHw",
"version" : {
"created" : "5040199"
},
"provided_name" : "default.esdrive_test"
}
}
}
}
5. demo脚本
from elasticsearch import Elasticsearch
from time import sleep
class SearchApi():
def __init__(self, host, port, user=None, pwd=None):
if user and pwd:
self.es_client = Elasticsearch(host + ':' + str(port), http_auth=(user, pwd), maxsize=15) # 有安全认证的ES集群
else:
self.es_client = Elasticsearch(host + ':' + str(port), maxsize=15) # 无安全认证的集群
# 查询数据
def get_index_data(self, index_name, id=None, doc_type='default_type_'):
# 根据id查询单条数据
if id:
index_info = self.es_client.get(index=index_name, doc_type=doc_type, id=id)
# 根据查询条件获取所有数据
else:
print('start get all data')
body = {
"query": {
"match_all": {}
}
}
index_info = self.es_client.search(index=index_name, doc_type=doc_type, body=body)
return index_info
# 单条插入
def single_insert(self, index_name, request_data, doc_type="default_type_"):
# data_id对应SEACH表中的第一个字段,document的唯一标识
data_id = request_data['id']
data = request_data['insert_data']
# 插入单条数据
self.es_client.index(index=index_name, doc_type=doc_type, id=data_id, body=data)
# 根据_id单条编辑
def update_single_data(self, index_name, request_data, doc_type="default_type_"):
# data_id对应SEACH表中的第一个字段,document的唯一标识
data_id = request_data['id']
data = request_data['update_data']
# 根据id编辑
self.es_client.update(index=index_name, id=data_id, doc_type=doc_type, body=data)
# 根据_id单条删除
def delete_single_data(self, index_name, id=None, doc_type="default_type_"):
if id:
self.es_client.delete(index=index_name, id=id, doc_type=doc_type,)
if __name__ == "__main__":
es_client = SearchApi('172.22.33.1', 9200, user='admin', pwd='123456')
# 其中id字段对应search表中第一个字段search_id
insert_data = {
"id": 1,
"insert_data": {'content': '1', 'tint': 1, 'tbool': True}
}
# index_name可以再search的head页面查看
es_client.single_insert('default.esdrive_test', insert_data)
# 编辑数据
update_data = {
"id": 1,
"update_data": {"doc": {'content': '1', 'tint': 1, 'tbool': False}}
}
es_client.update_single_data('default.esdrive_test', request_data=update_data)
sleep(2) # api操作存在一定的延时
all_data = es_client.get_index_data('default.esdrive_test')
print(all_data)
# 删除数据
es_client.delete_single_data('default.esdrive_test', id=1)
sleep(2) # api操作存在一定的延时
all_data = es_client.get_index_data('default.esdrive_test')
print(all_data)