# -*- coding: utf-8 -*-
"""Sample client that pushes documents to an OpenSearch V3 application table.

The module builds the signed request headers (``Authorization``,
``Content-MD5``, ``Date``, ``X-Opensearch-Nonce``) and POSTs
``V3Api.body_json`` to ``/v3/openapi/apps/<app>/<table>/actions/bulk``.
It runs on both Python 2 and Python 3.
"""
import base64
import copy
import hashlib
import hmac
import random
import time
import urllib
from hashlib import sha1

try:  # Python 2
    import httplib
except ImportError:  # Python 3
    import http.client as httplib

try:  # Python 2
    from urllib import quote as _quote
except ImportError:  # Python 3
    from urllib.parse import quote as _quote

try:
    # Legacy Python 2 module; unused here but kept so existing importers of
    # this file that rely on the name keep working.
    import md5
except ImportError:
    md5 = None


def _to_bytes(value):
    """Return *value* as bytes, encoding text as UTF-8.

    Raises:
        TypeError: if *value* is neither bytes nor text.
    """
    if isinstance(value, bytes):
        return value
    try:
        return value.encode('utf-8')
    except AttributeError:
        raise TypeError('expected bytes or text, got %r' % type(value))


class V3Api(object):
    """Builds and sends a signed bulk-push request to an OpenSearch V3 app."""

    URI_PREFIX = '/v3/openapi/apps/'
    OS_PREFIX = 'OPENSEARCH'
    VERB = 'POST'
    # Name of the application table the data is pushed to; replace it with
    # the table name of your own application.
    TABLE_NAME = 'tab'
    # Payload to upload; replace it with your own data.
    body_json = u'[{\"cmd\":\"ADD\",\"fields\":{\"id\":1,\"name\":\"测试推送数据\"}}]'

    def __init__(self, address = '', port = ''):
        """Remember the host *address* and *port* used by ``runPost``."""
        self.address = address
        self.port = port

    def runPost(self, app_name = None, access_key = None, secret = None,
                http_header = None, http_params = None):
        """Sign and send ``body_json`` to the bulk endpoint.

        Args:
            app_name: application name inserted into the request path.
            access_key: access key id placed in the Authorization header.
            secret: access key secret used to sign the request.
            http_header: optional dict of headers that override the
                generated ones; it is not modified.
            http_params: optional dict of query parameters used for signing.

        Returns:
            A ``(status, headers, body)`` tuple taken from the HTTP response.
        """
        query, header = self.buildQuery(app_name = app_name,
                                        access_key = access_key,
                                        secret = secret,
                                        http_header = http_header,
                                        http_params = http_params)
        conn = httplib.HTTPConnection(self.address, self.port)
        try:
            conn.request(self.VERB, url = query,
                         body = self.body_json.encode('utf-8'),
                         headers = header)
            response = conn.getresponse()
            # The body is read before the connection is closed below.
            return response.status, response.getheaders(), response.read()
        finally:
            conn.close()

    def buildQuery(self, app_name = None, access_key = None, secret = None,
                   http_header = None, http_params = None):
        """Return ``(uri, request_header)`` for the bulk-push request.

        ``None`` for *http_header* or *http_params* means "no extra entries".
        """
        http_header = {} if http_header is None else http_header
        http_params = {} if http_params is None else http_params

        uri = self.URI_PREFIX
        if app_name is not None:
            uri += app_name
        uri += '/{TABLE_NAME}/actions/bulk'.format(TABLE_NAME=self.TABLE_NAME)

        request_header = self.buildRequestHeader(uri = uri,
                                                 access_key = access_key,
                                                 secret = secret,
                                                 http_params = http_params,
                                                 http_header = http_header)
        return uri, request_header

    def buildAuthorization(self, uri, access_key, secret, http_params, request_header):
        """Return the ``Authorization`` header value for the request.

        The string to sign is the verb, Content-MD5, Content-Type and Date
        (each followed by a newline), then the canonicalized
        ``X-Opensearch-*`` headers and the canonicalized resource.  It is
        signed with HMAC-SHA1 using *secret* and base64-encoded.

        Raises:
            TypeError: if *secret* is neither bytes nor text.
        """
        canonicalized = self.VERB + '\n' \
            + self.__getHeader(request_header, 'Content-MD5', self.__contentMd5()) + '\n' \
            + self.__getHeader(request_header, 'Content-Type', '') + '\n' \
            + self.__getHeader(request_header, 'Date', '') + '\n' \
            + self.__canonicalizedHeaders(request_header) \
            + self.__canonicalizedResource(uri, http_params)

        # hmac needs bytes on Python 3; on Python 2 byte strings pass through.
        digest = hmac.new(_to_bytes(secret), _to_bytes(canonicalized), sha1).digest()
        signature = base64.b64encode(digest)
        if not isinstance(signature, str):
            # Python 3: b64encode returns bytes, header values must be text.
            signature = signature.decode('ascii')
        return '%s %s%s%s' % (self.OS_PREFIX, access_key, ':', signature)

    def __contentMd5(self):
        """Return the hex MD5 digest of the UTF-8 encoded request body."""
        return hashlib.md5(self.body_json.encode('utf-8')).hexdigest()

    def __getHeader(self, header, key, default_value = None):
        """Return ``header[key]``, or *default_value* if missing or None."""
        value = header.get(key)
        if value is None:
            return default_value
        return value

    def __canonicalizedHeaders(self, request_header):
        """Return the ``X-Opensearch-*`` headers in canonical signing form.

        Keys and values are stripped of spaces and tabs, entries with an
        empty value are dropped, the rest are sorted by key and rendered as
        ``lowercased-key:value`` lines, each terminated by a newline.
        """
        selected = {}
        for key, value in request_header.items():
            if key is None or value is None:
                continue
            name = key.strip(' \t')
            text = value.strip(' \t')
            if name.startswith('X-Opensearch-') and len(text) > 0:
                selected[name] = text

        # Sorting uses the original-case key; lowercasing happens on output.
        ordered = sorted(selected.items(), key=lambda item: item[0])
        return ''.join(name.lower() + ':' + text + '\n' for name, text in ordered)

    def __canonicalizedResource(self, uri, http_params):
        """Return the quoted *uri* followed by the sorted, quoted parameters.

        Parameters whose value is None or empty are skipped.
        """
        # NOTE(review): parameters are appended directly to the path with no
        # '?' separator, and buildQuery does not add them to the request URL.
        # Confirm against the service's signing specification before passing
        # a non-empty http_params.
        canonicalized = _quote(uri).replace('%2F', '/')
        if http_params is None:
            return canonicalized

        params = []
        for key, value in sorted(http_params.items(), key=lambda item: item[0]):
            if value is None or len(value) == 0:
                continue
            params.append(_quote(key) + '=' + _quote(value))
        return canonicalized + '&'.join(params)

    def generateDate(self, format = "%Y-%m-%dT%H:%M:%SZ", timestamp = None):
        """Format *timestamp* (a time tuple) or, if None, the current UTC time."""
        if timestamp is None:
            timestamp = time.gmtime()
        return time.strftime(format, timestamp)

    def generateNonce(self):
        """Return a numeric nonce: centisecond timestamp plus 4 random digits."""
        return str(int(time.time() * 100)) + str(random.randint(1000, 9999))

    def buildRequestHeader(self, uri, access_key, secret, http_params, http_header):
        """Return a copy of *http_header* completed with the signing headers.

        Content-MD5, Content-Type, Date, X-Opensearch-Nonce and Authorization
        are added only when the caller did not supply them.  Entries whose
        value is None are removed from the result.  *http_header* itself is
        never modified; None is treated as an empty dict.
        """
        request_header = {} if http_header is None else copy.deepcopy(http_header)

        if 'Content-MD5' not in request_header:
            request_header['Content-MD5'] = self.__contentMd5()
        if 'Content-Type' not in request_header:
            request_header['Content-Type'] = 'application/json'
        if 'Date' not in request_header:
            request_header['Date'] = self.generateDate()
        if 'X-Opensearch-Nonce' not in request_header:
            request_header['X-Opensearch-Nonce'] = self.generateNonce()
        # Authorization goes last because it signs the headers set above.
        if 'Authorization' not in request_header:
            request_header['Authorization'] = self.buildAuthorization(
                uri, access_key, secret, http_params, request_header)

        # Collect first, delete afterwards: a dict must not change size
        # while it is being iterated.
        empty_keys = [key for key, value in request_header.items() if value is None]
        for key in empty_keys:
            del request_header[key]
        return request_header


if __name__ == '__main__':
    accesskey_id = '替换为AccessKeyId'
    accesskey_secret = '替换为secret'
    # Host below: replace it with the API endpoint of the region hosting
    # your application, e.g. the East China 1 region.
    internet_host = 'opensearch-cn-hangzhou.aliyuncs.com'
    appname = '替换为应用名'
    api = V3Api(address = internet_host, port = '80')
    # Single-argument print(...) behaves the same on Python 2 and Python 3.
    print(api.runPost(app_name = appname, access_key=accesskey_id,
                      secret=accesskey_secret, http_params={}, http_header={}))