# -*- coding: utf-8 -*-
"""Small client that signs and sends GET requests to an OpenSearch V3 API."""
import base64
import collections
import copy
import hmac
import http.client
import random
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from hashlib import sha1


class V3Api:
    """Build, sign and send GET requests under ``/v3/openapi/apps/``.

    The Authorization header has the form
    ``OPENSEARCH <access_key>:<signature>`` where the signature is the
    base64-encoded HMAC-SHA1 of a canonical request string
    (see ``buildAuthorization``).
    """

    # Path prefix; the application name is appended directly to it.
    URI_PREFIX = '/v3/openapi/apps/'
    # Scheme token that starts the Authorization header value.
    OS_PREFIX = 'OPENSEARCH'
    # HTTP method, used both for the request and as the first signed line.
    VERB = 'GET'

    def __init__(self, address='', port=''):
        """Store the host and port later passed to ``HTTPConnection``."""
        self.address = address
        self.port = port

    def runQuery(self, app_name=None, access_key=None, secret=None,
                 http_header=None, http_params=None):
        """Send the signed request and return the raw response.

        Args:
            app_name: Appended to ``URI_PREFIX`` to form the request path.
            access_key: Key id placed in the Authorization header.
            secret: Secret used as the HMAC key.
            http_header: Optional dict of extra/override request headers.
            http_params: Optional dict of query parameters (string values).

        Returns:
            A ``(status, headers, body)`` tuple: the integer status code,
            the list of ``(name, value)`` header pairs, and the body bytes.
        """
        query, header = self.buildQuery(app_name=app_name,
                                        access_key=access_key,
                                        secret=secret,
                                        http_header=http_header,
                                        http_params=http_params)
        conn = http.client.HTTPConnection(self.address, self.port)
        try:
            conn.request(self.VERB, url=query, headers=header)
            response = conn.getresponse()
            # Read the body before closing; the socket is released below
            # even when the request or the read raises.
            return response.status, response.getheaders(), response.read()
        finally:
            conn.close()

    def buildQuery(self, app_name=None, access_key=None, secret=None,
                   http_header=None, http_params=None):
        """Return ``(request_target, request_headers)`` for one request.

        Parameters whose value is ``None`` are treated as unset and left
        out of the query string, matching how the signing code and the
        header builder treat ``None``.
        """
        http_header = {} if http_header is None else http_header
        http_params = {} if http_params is None else http_params

        uri = self.URI_PREFIX
        if app_name is not None:
            uri += app_name

        query = '&'.join(
            urllib.parse.quote(key) + '=' + urllib.parse.quote(value)
            for key, value in http_params.items()
            if value is not None
        )
        request_header = self.buildRequestHeader(uri=uri,
                                                 access_key=access_key,
                                                 secret=secret,
                                                 http_params=http_params,
                                                 http_header=http_header)
        # NOTE(review): path and query are joined without a '?' separator,
        # here and in the signed resource. Left unchanged because the URL and
        # the signature must agree with what the server expects -- TODO
        # confirm against the service's signing rules before changing.
        return uri + query, request_header

    # Signature implementation.
    def buildAuthorization(self, uri, access_key, secret, http_params,
                           request_header):
        """Return the Authorization header value for the request.

        The string to sign is the HTTP verb, Content-MD5, Content-Type and
        Date (one per line, empty when absent), followed by the canonical
        ``X-Opensearch-*`` headers and the canonical resource. It is signed
        with HMAC-SHA1 keyed by ``secret`` and base64-encoded.
        """
        canonicalized = '\n'.join([
            self.VERB,
            self.__getHeader(request_header, 'Content-MD5', ''),
            self.__getHeader(request_header, 'Content-Type', ''),
            self.__getHeader(request_header, 'Date', ''),
            self.__canonicalizedHeaders(request_header)
            + self.__canonicalizedResource(uri, http_params),
        ])
        digest = hmac.new(secret.encode('utf-8'),
                          canonicalized.encode('utf-8'),
                          sha1).digest()
        signature = base64.b64encode(digest).decode('utf-8')
        return '%s %s:%s' % (self.OS_PREFIX, access_key, signature)

    def __getHeader(self, header, key, default_value=None):
        """Return ``header[key]``, or ``default_value`` if missing or None."""
        value = header.get(key)
        return default_value if value is None else value

    def __canonicalizedResource(self, uri, http_params):
        """Return the quoted path followed by the sorted, quoted parameters.

        Parameters with a ``None`` or empty value are not signed.
        """
        canonicalized = urllib.parse.quote(uri).replace('%2F', '/')
        sorted_params = sorted((http_params or {}).items(),
                               key=lambda item: item[0])
        params = [
            urllib.parse.quote(key) + '=' + urllib.parse.quote(value)
            for key, value in sorted_params
            if value
        ]
        return canonicalized + '&'.join(params)

    def generateDate(self, format="%Y-%m-%dT%H:%M:%SZ", timestamp=None):
        """Format ``timestamp`` (a time tuple) or the current UTC time."""
        if timestamp is None:
            timestamp = time.gmtime()
        return time.strftime(format, timestamp)

    def generateNonce(self):
        """Return the time in 1/100 s followed by four random digits."""
        return str(int(time.time() * 100)) + str(random.randint(1000, 9999))

    def __canonicalizedHeaders(self, request_header):
        """Return the signed form of the ``X-Opensearch-*`` headers.

        Names and values are stripped of spaces and tabs; headers with an
        empty value are dropped. The rest are sorted by name and emitted
        as ``lowercased-name:value`` lines.
        """
        header = {}
        for key, value in request_header.items():
            if key is None or value is None:
                continue
            name = key.strip(' \t')
            content = value.strip(' \t')
            if name.startswith('X-Opensearch-') and content:
                header[name] = content

        return ''.join(
            name.lower() + ':' + content + '\n'
            for name, content in sorted(header.items(),
                                        key=lambda item: item[0])
        )

    # Build the request header.
    def buildRequestHeader(self, uri, access_key, secret, http_params,
                           http_header):
        """Return a copy of ``http_header`` completed with signing headers.

        Content-MD5, Content-Type, Date, X-Opensearch-Nonce and
        Authorization are filled in only when the caller did not supply
        them. Headers whose value is ``None`` are removed from the result.
        The caller's dict is never modified.
        """
        if http_header is None:
            request_header = {}
        else:
            request_header = copy.deepcopy(http_header)

        request_header.setdefault('Content-MD5', '')
        request_header.setdefault('Content-Type', 'application/json')
        if 'Date' not in request_header:
            request_header['Date'] = self.generateDate()
        if 'X-Opensearch-Nonce' not in request_header:
            request_header['X-Opensearch-Nonce'] = self.generateNonce()
        # Must come last: the signature covers the headers set above.
        if 'Authorization' not in request_header:
            request_header['Authorization'] = self.buildAuthorization(
                uri, access_key, secret, http_params, request_header)

        unset_keys = [key for key, value in request_header.items()
                      if value is None]
        for key in unset_keys:
            del request_header[key]
        return request_header


if __name__ == '__main__':
    accesskey_id = '替换为AccessKeyId'
    accesskey_secret = '替换为secret'
    # Host below: replace with the API address of the target application,
    # e.g. the East China 1 region.
    internet_host = 'opensearch-cn-hangzhou.aliyuncs.com'
    appname = '替换为应用ID 或 应用名'
    api = V3Api(address=internet_host, port='80')
    # Query settings go here.
    query_subsentences = {}
    print(api.runQuery(app_name=appname,
                       access_key=accesskey_id,
                       secret=accesskey_secret,
                       http_params=query_subsentences,
                       http_header={}))