用python3对search中的数据进行增删改查

  其他常见问题
内容纲要

概要说明

本案例用于实现 python3中通过elasticsearch连接search,通过api对数据库进行增删改查。


详细说明

1. 确认环境的python版本为3.3+,本次试验环境python版本为3.6.8:

python -V

2. 确认环境的Search中对应的elasticsearch版本:

# 在search server的节点执行以下命令,其中version字段中的number即为elasticsearch版本
[root@tdh-01/tmp/log]# curl -XGET localhost:9200
{
  "name" : "tdh-01",
  "cluster_name" : "cluster",
  "cluster_uuid" : "nuaHilBRSp2zd_87rZWQ7g",
  "version" : {
    "number" : "5.4.1",
    "build_hash" : "Unknown",
    "build_date" : "Unknown",
    "build_snapshot" : true,
    "lucene_version" : "6.5.1"
  },
  "tagline" : "You Know, for Search"
}

2. 安装相应的依赖包:

pip install elasticsearch==5.4.0

elasticsearch库的版本需要和search中elasticsearch大版本保持一致,否则无法正常使用。

3. 测试search是否可以连通:

from elasticsearch import Elasticsearch

# 无安全认证的集群
es_client = Elasticsearch(host + ':' + str(port), maxsize=15)
# 输出为true,则表可以连通,可以进行下一步
print(es_client.ping())

4. 准备search表

注意:第一列映射为Transwarp Search中相映射的Index的 _id ,必须是 STRING 类型的。

CREATE TABLE esdrive_test(
  search_id STRING,
  content STRING,
  tint INT,
  tbool BOOLEAN
)STORED AS ES
with shard number 10
replication 1;

创建成功后可以执行以下命令,查看索引基本信息,其中mappings字段中的default_type_即为该index的type,后续脚本中会需要用到

[root@tdh-01/tmp/log]# curl -XGET localhost:9200/default.esdrive_test?pretty
{
  "default.esdrive_test" : {
    "aliases" : { },
    "mappings" : {
      "default_type_" : {
        "properties" : {
          "content" : {
            "type" : "keyword"
          },
          "tbool" : {
            "type" : "boolean"
          },
          "tint" : {
            "type" : "integer"
          }
        }
      }
    },
    "settings" : {
      "index" : {
        "creation_date" : "1641094067672",
        "number_of_shards" : "10",
        "number_of_replicas" : "1",
        "uuid" : "a8scA68uTvyR3cRch-zQHw",
        "version" : {
          "created" : "5040199"
        },
        "provided_name" : "default.esdrive_test"
      }
    }
  }
}

5. demo脚本

from elasticsearch import Elasticsearch
from time import sleep

class SearchApi():
    def __init__(self, host, port, user=None, pwd=None):
        if user and pwd:
            self.es_client = Elasticsearch(host + ':' + str(port), http_auth=(user, pwd), maxsize=15)  # 有安全认证的ES集群
        else:
            self.es_client = Elasticsearch(host + ':' + str(port), maxsize=15)  # 无安全认证的集群

    # 查询数据
    def get_index_data(self, index_name, id=None, doc_type='default_type_'):
        # 根据id查询单条数据
        if id:
            index_info = self.es_client.get(index=index_name, doc_type=doc_type, id=id)

        # 根据查询条件获取所有数据
        else:
            print('start get all data')
            body = {
                "query": {
                    "match_all": {}
                }
            }
            index_info = self.es_client.search(index=index_name, doc_type=doc_type, body=body)
        return index_info

     # 单条插入
    def single_insert(self, index_name, request_data, doc_type="default_type_"):
        # data_id对应SEACH表中的第一个字段,document的唯一标识
        data_id = request_data['id']
        data = request_data['insert_data']

        # 插入单条数据
        self.es_client.index(index=index_name, doc_type=doc_type, id=data_id, body=data)

    # 根据_id单条编辑
    def update_single_data(self, index_name, request_data, doc_type="default_type_"):
        # data_id对应SEACH表中的第一个字段,document的唯一标识
        data_id = request_data['id']
        data = request_data['update_data']

        # 根据id编辑
        self.es_client.update(index=index_name, id=data_id, doc_type=doc_type, body=data)

    # 根据_id单条删除
    def delete_single_data(self, index_name, id=None, doc_type="default_type_"):
        if id:
            self.es_client.delete(index=index_name, id=id, doc_type=doc_type,)

if __name__ == "__main__":
    es_client = SearchApi('172.22.33.1', 9200, user='admin', pwd='123456')
    # 其中id字段对应search表中第一个字段search_id
    insert_data = {
        "id": 1,
        "insert_data": {'content': '1', 'tint': 1, 'tbool': True}
    }
    # index_name可以再search的head页面查看
    es_client.single_insert('default.esdrive_test', insert_data)

    # 编辑数据
    update_data = {
        "id": 1,
        "update_data": {"doc": {'content': '1', 'tint': 1, 'tbool': False}}
    }
    es_client.update_single_data('default.esdrive_test', request_data=update_data)
    sleep(2)  # api操作存在一定的延时
    all_data = es_client.get_index_data('default.esdrive_test')
    print(all_data)

    # 删除数据
    es_client.delete_single_data('default.esdrive_test', id=1)
    sleep(2)  # api操作存在一定的延时
    all_data = es_client.get_index_data('default.esdrive_test')
    print(all_data)

这篇文章对您有帮助吗?

平均评分 0 / 5. 次数: 0

尚无评价,您可以第一个评哦!

非常抱歉,这篇文章对您没有帮助.

烦请您告诉我们您的建议与意见,以便我们改进,谢谢您。