/** * Copyright (c) 2012 YCSB contributors. All rights reserved. *

* Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at *

* http://www.apache.org/licenses/LICENSE-2.0 *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. See accompanying * LICENSE file. *

* Redis client binding for YCSB. *

* All YCSB records are mapped to a Redis *hash field*. For scanning * operations, all keys are saved (by an arbitrary hash) in a sorted set. *

* Redis client binding for YCSB. *

* All YCSB records are mapped to a Redis *hash field*. For scanning * operations, all keys are saved (by an arbitrary hash) in a sorted set. */ /** * Redis client binding for YCSB. * * All YCSB records are mapped to a Redis *hash field*. For scanning * operations, all keys are saved (by an arbitrary hash) in a sorted set. */ package site.ycsb.db; import redis.clients.jedis.Tuple; import site.ycsb.ByteArrayByteIterator; import site.ycsb.ByteIterator; import site.ycsb.DB; import site.ycsb.DBException; import site.ycsb.Status; import redis.clients.jedis.BasicCommands; import redis.clients.jedis.HostAndPort; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisCluster; import redis.clients.jedis.JedisCommands; import redis.clients.jedis.Protocol; import site.ycsb.StringByteIterator; import java.io.ByteArrayOutputStream; import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.HashSet; import java.util.Properties; import java.util.Set; import java.util.Vector; import static java.nio.charset.StandardCharsets.UTF_8; /** * YCSB binding for Redis. * * See {@code redis/README.md} for details. */ public class RedisClient extends DB { public static final String HOST_PROPERTY = "redis.host"; public static final String PORT_PROPERTY = "redis.port"; public static final String PASSWORD_PROPERTY = "redis.password"; public static final String CLUSTER_PROPERTY = "redis.cluster"; public static final String COMMAND_PROPERTY = "redis.command_group"; public static final String TIMEOUT_PROPERTY = "redis.timeout"; public static final String DEBUG_PROPERTY = "redis.debug"; public static final String RANGE_PROPERTY = "redis.range"; public static final String FIELD_COUNT = "fieldcount"; public static final String FIELD_LENGTH = "fieldlength"; /** the default command group is "string", it can be {string, hash}. */ public enum CommandType { COMMAND_STRING, COMMAND_HASH, COMMAND_LIST, COMMAND_SET, COMMAND_ZSET } private static final String STRING_COMMAND = "string"; private static final String HASH_COMMAND = "hash"; private static final String LIST_COMMAD = "list"; private static final String SET_COMMAND = "set"; private static final String ZSET_COMMAND = "zset"; private static final String RETURN_OK = "OK"; private static final Set COMMAND_TABLE = new HashSet() { { add(STRING_COMMAND); add(HASH_COMMAND); add(LIST_COMMAD); add(SET_COMMAND); add(ZSET_COMMAND); } }; private JedisCommands jedis; private String commandGroup = STRING_COMMAND; private CommandType commandType = CommandType.COMMAND_STRING; private int fieldCount = 0; private int fieldLength = 0; private int timeout = 2000; private Boolean debugMode = false; private int[] range = null; private static CommandType convertToCommandType(String command) throws DBException { if (command.equals(STRING_COMMAND)) { return CommandType.COMMAND_STRING; } else if (command.equals(HASH_COMMAND)) { return CommandType.COMMAND_HASH; } else if (command.equals(LIST_COMMAD)) { return CommandType.COMMAND_LIST; } else if (command.equals(SET_COMMAND)) { return CommandType.COMMAND_SET; } else if (command.equals(ZSET_COMMAND)) { return CommandType.COMMAND_ZSET; } else { throw new DBException(String.format("command %s is invalid", command)); } } @Override public void init() throws DBException { Properties props = getProperties(); int port; String debugModeString = props.getProperty(DEBUG_PROPERTY); if (debugModeString != null) { debugMode = Boolean.parseBoolean(debugModeString); } String rangeString = props.getProperty(RANGE_PROPERTY); if (rangeString != null) { range = new int[2]; String[] r = rangeString.split(","); assert r.length == 2; range[0] = Integer.parseInt(r[0]); range[1] = Integer.parseInt(r[1]); } String fieldCountString = props.getProperty(FIELD_COUNT); if (fieldCountString != null) { fieldCount = Integer.parseInt(fieldCountString); } String fieldLengthString = props.getProperty(FIELD_LENGTH); if (fieldLengthString != null) { fieldLength = Integer.parseInt(fieldLengthString); } String portString = props.getProperty(PORT_PROPERTY); if (portString != null) { port = Integer.parseInt(portString); } else { port = Protocol.DEFAULT_PORT; } String host = props.getProperty(HOST_PROPERTY); String timeoutString = props.getProperty(TIMEOUT_PROPERTY); if (timeoutString != null) { timeout = Integer.parseInt(timeoutString); } boolean clusterEnabled = Boolean.parseBoolean(props.getProperty(CLUSTER_PROPERTY)); if (clusterEnabled) { Set jedisClusterNodes = new HashSet<>(); jedisClusterNodes.add(new HostAndPort(host, port)); jedis = new JedisCluster(jedisClusterNodes, timeout); } else { jedis = new Jedis(host, port, timeout); ((Jedis) jedis).connect(); } String password = props.getProperty(PASSWORD_PROPERTY); if (password != null) { ((BasicCommands) jedis).auth(password); } commandGroup = props.getProperty(COMMAND_PROPERTY, STRING_COMMAND); if (commandGroup == null) { throw new DBException(String.format("the command group is illegal")); } if (!COMMAND_TABLE.contains(commandGroup)) { System.out.println("the command group should be in " + COMMAND_TABLE); throw new DBException(String.format("the command group is invalid, commandGroup: %s", commandGroup)); } commandType = convertToCommandType(commandGroup); if (debugMode) { System.out.println(String.format("Properties: %s", props.toString())); } } @Override public void cleanup() throws DBException { try { ((Closeable) jedis).close(); } catch (IOException e) { throw new DBException("Closing connection failed."); } } // XXX jedis.select(int index) to switch to `table` @Override public Status read(String table, String key, Set fields, Map result) { assert fields == null; switch (commandType) { case COMMAND_STRING: if (fields == null) { String fieldValue = jedis.get(key); if (debugMode) { System.out.println(String.format("get key:%s fields:%s, vals:%s", key, fields, fieldValue)); } if (fieldValue != null && !fieldValue.isEmpty()) { return Status.OK; } return Status.NOT_FOUND; } break; case COMMAND_HASH: if (fields == null) { // hgetAll Map values = jedis.hgetAll(key); if (debugMode) { System.out.println(String.format("hgetAll key:%s, vals:%s", key, values)); } if (values != null && !values.isEmpty()) { if (values.size() == fieldCount) { return Status.OK; } else { return Status.ERROR; } } return Status.NOT_FOUND; } break; case COMMAND_LIST: if (fields == null) { // lrange int startIdx = 0; int endIdx = -1; if (range != null) { startIdx = range[0]; endIdx = range[1]; } List values = jedis.lrange(key, startIdx, endIdx); if (debugMode) { System.out.println(String.format("lrange key:%s start:%d end:%d, vals:%s", key, startIdx, endIdx, values)); } if (values != null && !values.isEmpty()) { return Status.OK; } return Status.NOT_FOUND; } break; case COMMAND_SET: if (fields == null) { // smembers Set members = jedis.smembers(key); if (debugMode) { System.out.println(String.format("smembers key:%s, vals:%s", key, members)); } if (members != null && !members.isEmpty()) { return Status.OK; } return Status.NOT_FOUND; } break; case COMMAND_ZSET: if (fields == null) { // zrangeWithScores int startIdx = 0; int endIdx = -1; if (range != null) { startIdx = range[0]; endIdx = range[1]; } Set members = jedis.zrangeWithScores(key, startIdx, endIdx); if (debugMode) { System.out.println(String.format("zrangeWithScores key:%s start:%d end:%d, vals:%s", key, startIdx, endIdx, members)); } if (members != null && !members.isEmpty()) { return Status.OK; } return Status.NOT_FOUND; } break; default: } return Status.ERROR; } public Status insertHelper(String table, String key, Map values, boolean update) { switch (commandType) { case COMMAND_STRING: assert values.size() == 1; // set Map.Entry field0 = values.entrySet().iterator().next(); String field0Value = field0.getValue().toString(); if (debugMode) { System.out.println(String.format("set key:%s val:%s", key, field0Value)); } String stringRet = jedis.set(key, field0Value); if (stringRet.equals(RETURN_OK)) { return Status.OK; } break; case COMMAND_HASH: // hmset Map fieldValues = StringByteIterator.getStringMap(values); if (debugMode) { System.out.println(String.format("hmset key:%s fields:%s", key, fieldValues)); } if (update) { assert values.size() == 1; } String hashRet = jedis.hmset(key, fieldValues); if (hashRet.equals(RETURN_OK)) { return Status.OK; } break; case COMMAND_LIST: // lpush String[] listValues = getStringArrayFromMapValues(values); if (debugMode) { System.out.println(String.format("lpush key:%s val:%s", key, Arrays.toString(listValues))); } Long listRet = jedis.lpush(key, listValues); if (update) { assert(values.size() == 1); String popValue = jedis.rpop(key); if (popValue == null) { System.out.println("rpop error"); return Status.ERROR; } } return Status.OK; case COMMAND_SET: // sadd String[] members = getStringArrayFromMapValues(values); if (debugMode) { System.out.println(String.format("sadd key:%s members:%s", key, Arrays.toString(members))); } long addNumSet = jedis.sadd(key, members); if (update) { assert members.length == 1; if (addNumSet == 1) { String popMember = jedis.spop(key); if (popMember == null) { return Status.ERROR; } } else if (addNumSet > 1) { System.err.println("Unexpect error"); return Status.ERROR; } } return Status.OK; case COMMAND_ZSET: // zadd Map scoreMembers = getStringScoreMapFromMapValues(values); if (debugMode) { System.out.println(String.format("zadd key:%s scoreMembers:%s", key, scoreMembers)); } long addNumZset = jedis.zadd(key, scoreMembers); if (update) { assert scoreMembers.size() == 1; if (addNumZset == 1) { long removeNum = jedis.zremrangeByRank(key, fieldCount, fieldCount); if (removeNum != 1) { return Status.ERROR; } } else if (addNumZset > 1) { System.err.println("Unexpect error"); return Status.ERROR; } } return Status.OK; default: } return Status.ERROR; } @Override public Status insert(String table, String key, Map values) { return insertHelper(table, key, values, false); } @Override public Status delete(String table, String key) { return jedis.del(key) == 0 ? Status.ERROR : Status.OK; } @Override public Status update(String table, String key, Map values) { return insertHelper(table, key, values, true); } @Override public Status scan(String table, String startkey, int recordcount, Set fields, Vector> result) { return Status.NOT_IMPLEMENTED; } /* * Calculate a hash for a key to store it in an index. The actual return value * of this function is not interesting -- it primarily needs to be fast and * scattered along the whole space of doubles. In a real world scenario one * would probably use the ASCII values of the keys. */ private static int hash32(String key) { return key.hashCode(); } private static String[] getStringArrayFromMapValues(Map m) { String[] ret = new String[m.size()]; int i = 0; for (Map.Entry entry : m.entrySet()) { ret[i] = entry.getValue().toString(); ++i; } return ret; } private static Map getStringScoreMapFromMapValues(Map m) { Map ret = new HashMap<>(m.size()); int i = 0; for (Map.Entry entry : m.entrySet()) { String member = entry.getValue().toString(); Double score = Math.random(); ret.put(member, score); } return ret; } private String serializeValues(final Map values) throws IOException { try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { final ByteBuffer buf = ByteBuffer.allocate(4); for (final Map.Entry value : values.entrySet()) { // map kv to record[key_len(4) + key + value_len(4) + value] final byte[] keyBytes = value.getKey().getBytes(UTF_8); final byte[] valueBytes = value.getValue().toArray(); buf.putInt(keyBytes.length); baos.write(buf.array()); baos.write(keyBytes); buf.clear(); buf.putInt(valueBytes.length); baos.write(buf.array()); baos.write(valueBytes); buf.clear(); } return new String(buf.array(), UTF_8); } } private Map deserializeValues(final String valueString, final Set fields, final Map result) { byte[] values = valueString.getBytes(UTF_8); final ByteBuffer buf = ByteBuffer.allocate(4); int offset = 0; while (offset < values.length) { buf.put(values, offset, 4); buf.flip(); final int keyLen = buf.getInt(); buf.clear(); offset += 4; final String key = new String(values, offset, keyLen, UTF_8); offset += keyLen; buf.put(values, offset, 4); buf.flip(); final int valueLen = buf.getInt(); buf.clear(); offset += 4; if (fields == null || fields.contains(key)) { result.put(key, new ByteArrayByteIterator(values, offset, valueLen)); } offset += valueLen; } return result; } }