# Test script (测试)
# -*- coding: utf-8 -*-
"""
Created on 2019-6-13 10:19:25
@author: chenlin3
"""
import esSdk
class EsSdkTest:
    """Manual smoke test that sends a small sample table to esSdk.importData."""

    def test(self):
        """Build four column descriptors and three sample rows, then import them.

        Prints a start banner and the sample rows before calling
        ``esSdk.importData``. Nothing is asserted or returned.
        """
        name = EsSdkTest.__name__
        print('begin, %s' % name)

        # Column descriptors: field name, numeric type code and display label.
        columns = [
            {'colName': 'id', 'colType': 1, 'showName': 'ID'},
            {'colName': 'name', 'colType': 1, 'showName': '姓名'},
            {'colName': 'age', 'colType': 2, 'showName': '年龄'},
            {'colName': 'birth', 'colType': 3, 'showName': '生日'},
        ]

        # Sample rows. The 'age' values deliberately include thousands
        # separators and the last row has an empty 'birth' value.
        # The third row reuses id '2', exactly as in the original data.
        rows = [
            {'id': '1', 'name': '地点', 'age': '20', 'birth': '2019/01/01'},
            {
                'id': '2',
                'name': '地点22',
                'age': '4,057,934,769.4598',
                'birth': '2019/02/01',
            },
            {'id': '2', 'name': '地点11', 'age': '60,435,766', 'birth': ''},
        ]

        print(rows)
        esSdk.importData('sdk_index_2', 'sdktest', columns, rows)
if __name__ == '__main__':
    # Run the smoke test only when this file is executed as a script.
    EsSdkTest().test()
# Import logic (逻辑)
# -*- coding: utf-8 -*-
"""
Created on 2019-6-13 10:19:25
@author: chenlin3
"""
from elasticsearch import Elasticsearch
import datetime
from elasticsearch import helpers
# Alternative (hostname-based) client setup, kept for reference only:
# from elasticsearch.helpers import bulk
# es = Elasticsearch(
# ['elsearch.com'],
# timeout=3600
# )
# Module-level client used by bulk_data() and createIndex() in this file.
# NOTE(review): host, port and timeout are hard-coded; the timeout unit is
# presumably seconds -- confirm against the client library documentation.
es = Elasticsearch([{'host':'10.10.8.6','port':9200,'timeout':6000}])
def importData(tableName, projectId, columnList, dataList):
    """Create the index for a table/project pair and bulk-load its rows.

    The index name is ``"scene_" + tableName + "_" + projectId``. Rows are
    loaded only when ``createIndex`` returns a truthy value; otherwise the
    function returns without doing anything further.

    Args:
        tableName: Table name used as the middle part of the index name.
        projectId: Project identifier used as the last part of the index name.
        columnList: Column descriptors passed to ``createIndex`` and
            ``bulk_data``.
        dataList: Row dicts passed to ``bulk_data``.

    NOTE(review): the original indentation was lost, so placing the
    "success" message inside the created-index branch is an inference --
    confirm against the original file.
    """
    target_index = "_".join(("scene", tableName, projectId))
    if not createIndex(target_index, columnList):
        return
    bulk_data(target_index, columnList, dataList)
    print("success")
def bulk_data(indexName, columnList, dataList):
    """Convert rows into bulk index actions and send them with ``helpers.bulk``.

    For every row a document is built from the columns in ``columnList``:
    values of columns with ``colType`` 2 are stringified with commas removed,
    empty-string values of columns with ``colType`` 3 become ``None``, and a
    ``runtime`` field holding the current local time (``%Y/%m/%d %H:%M:%S``)
    is added. Each document is wrapped in an action whose ``_index`` and
    ``_type`` are both ``indexName``.

    Args:
        indexName: Target index name (also used as the document type).
        columnList: Dicts with at least ``colName`` and ``colType`` keys.
        dataList: Row dicts keyed by column name.

    Raises:
        KeyError: If a row lacks a key named by a column's ``colName``.
    """
    actions = []
    for data in dataList:
        element = {}
        for column in columnList:
            name = column["colName"]
            col_type = column["colType"]
            val = data[name]
            if col_type == 2 and val is not None:
                # Numeric input may carry thousands separators ("60,435,766").
                # None is left alone so it is not turned into the text "None".
                val = str(val).replace(",", "")
            if col_type == 3 and val == "":
                # An empty value in this column is sent as null.
                val = None
            element[name] = val
        element["runtime"] = datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S')
        # Build a fresh action dict for every row. Reusing one shared dict
        # makes every entry in `actions` point at the same object, so all
        # actions would carry the last row's document.
        actions.append({
            "_index": indexName,
            "_type": indexName,
            "_source": element,
        })
    helpers.bulk(es, actions)
def createIndex(indexName, columnList):
    """Create ``indexName`` with a mapping derived from ``columnList``.

    Nothing is created when the index already exists. Otherwise one mapping
    property is built per column (``colType`` 1 -> "text", 2 -> "double",
    3 -> "date"), a ``runtime`` date property is added, and the index is
    created with the settings below.

    Args:
        indexName: Name of the index; also used as the mapping type name.
        columnList: Dicts with at least ``colName`` and ``colType`` keys.

    Returns:
        True if the index was created by this call. False if it already
        existed or if creation failed; in the failure case the exception is
        printed and not propagated.

    Note:
        The original ended with ``finally: return created``, which discarded
        the exception re-raised in the ``except`` branch and also swallowed
        KeyboardInterrupt/SystemExit. The observable contract (print and
        return False on errors) is kept for ``Exception`` only.
    """
    date_format = "yyyy/MM/dd||yyyy/MM/dd HH:mm:ss"
    try:
        # Truthiness plus the keyword argument avoids depending on the client
        # returning the literal True or accepting a positional index name.
        if es.indices.exists(index=indexName):
            return False

        properties = {}
        for col in columnList:
            col_type = col["colType"]
            prop = {}
            if col_type == 1:
                prop["type"] = "text"
                prop["index"] = "not_analyzed"
            if col_type == 2:
                prop["type"] = "double"
                prop["index"] = "not_analyzed"
            if col_type == 3:
                prop["type"] = "date"
                prop["format"] = date_format
                prop["index"] = "not_analyzed"
            # Unknown colType values yield an empty property dict, as before.
            properties[col["colName"]] = prop

        # Timestamp field that bulk_data() fills in for every document.
        properties["runtime"] = {
            "type": "date",
            "format": date_format,
            "index": "not_analyzed",
        }

        # NOTE(review): "index": "not_analyzed" together with type "text", and
        # the "all" entry under "mappings" (possibly meant to be "_all"), may
        # not match the target Elasticsearch version -- confirm before changing.
        body = {
            "settings": {
                "index.refresh_interval": "30s",
                "index.translog.sync_interval": "30s",
                "index.translog.flush_threshold_size": "1gb",
                "index.merge.scheduler.max_thread_count": 1
            },
            "mappings": {
                "all": {
                    "enabled": False
                },
                indexName: {
                    "properties": properties
                }
            }
        }
        es.indices.create(index=indexName, body=body)
        return True
    except Exception as ex:
        # Best-effort: report the failure and tell the caller nothing was created.
        print(ex)
        return False