本文共 2993 字,大约阅读时间需要 9 分钟。
过去,在Cassandra中批量加载数据一直很困难。尽管Cassandra从一开始就具有BinaryMemtable接口,但是BinaryMemtable难以使用,并且与普通客户端写入相比,吞吐量有了较小的提高。
Cassandra 0.8.1引入了解决此问题的新工具: sstableloader有关最新信息,请参见 。
sstableloader 是一种稳定的数据文件处理工具,将已经生成好的数据流式传输到整个群集。它不是简单地将sstables复制到每一个节点,按照集群复制策略将文件分发给不同节点。从生产经验来看,此工具使用主要两个主要步骤:
如下加载csv文件(也可以是其他类型文件),解析后生成SSTable文件,注意:生成的文件路径必须按照一定规范存放
:…/keyspace/tableName/各种元数据文件。注意:生成表数据的主键列不允许重复
,以就是说相同主键的数据不允许重复addRow()。
// Prepare SSTable writer CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder();// set output directory builder.inDirectory(outputDir) // set target schema .forTable(SCHEMA) // set CQL statement to put data .using(INSERT_STMT) // set partitioner if needed // default is Murmur3Partitioner so set if you use different one. .withPartitioner(new Murmur3Partitioner());CQLSSTableWriter writer = builder.build();// ...snip... while ((line = csvReader.read()) != null){ // We use Java types here based on // https://www.datastax.com/drivers/java/2.0/com/datastax/driver/core/DataType.Name.html#asJavaClass%28%29 writer.addRow(ticker, DATE_FORMAT.parse(line.get(0)), new BigDecimal(line.get(1)), new BigDecimal(line.get(2)), new BigDecimal(line.get(3)), new BigDecimal(line.get(4)), Long.parseLong(line.get(5)), new BigDecimal(line.get(6)));}writer.close();
可在,生成SSTable之后,您只需使用sstableloader以集群为目标,如前所述。cqlstablewriter仍然有一些限制,比如不能并行使用,或者还不支持用户定义的类型。版本不断更新迭代,请继续关注
进入cassandra的bin目录,找到sstableloader工具:
bin/sstableloader <dir_path> 详细的选项可以的介绍,一般常见的选项有: -d, –nodes 目标集群的nodes -u, –username 用户名 -pw, –password 密码 -t, –throttle 限速,单位Mbits/s (默认不限制) -cph, –connections-per-host 和每个节点建立多少连接调用:main(String[] args){ cmdExecute("/home/appuser/apache-cassandra-3.6.x/bin/sstableloader -d 192.168.0.1,192.168.0.2,192.168.0.3 -u cassuser -pw upassword -t 100 -cph 100 /home/output/keyspace/tableName");}private String cmdExecute(String commandLine){ Process process; try { ?process=Runtime.getRuntime().exec(commandLine); BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream())); String line; while ((line = br.readLine()) != null) { log.info(line); } int exitCode = process.waitFor(); log.info("exitCode = "+ exitCode); process.destoryForcibly(); }catch(Exception ex){ log. error("异常", ex); } return result; }
代码方式执行命令
方式进行调用。转载地址:http://mccpi.baihongyu.com/