Spark与Cassandra输入/输出

想象一下以下场景:Spark应用程序(Java实现)正在使用Cassandra数据库加载,转换为RDD并处理数据。 该应用程序还从数据库中蒸出新数据,这些数据也由自定义接收器处理。 流处理的输出存储在数据库中。 该实现使用Spring Data Cassandra与数据库集成。

CassandraConfig:

@Configuration @ComponentScan(basePackages = {"org.foo"}) @PropertySource(value = { "classpath:cassandra.properties" }) public class CassandraConfig { @Autowired private Environment env; @Bean public CassandraClusterFactoryBean cluster() { CassandraClusterFactoryBean cluster = new CassandraClusterFactoryBean(); cluster.setContactPoints(env.getProperty("cassandra.contactpoints")); cluster.setPort(Integer.parseInt(env.getProperty("cassandra.port"))); return cluster; } @Bean public CassandraMappingContext mappingContext() { return new BasicCassandraMappingContext(); } @Bean public CassandraConverter converter() { return new MappingCassandraConverter(mappingContext()); } @Bean public CassandraSessionFactoryBean session() throws Exception { CassandraSessionFactoryBean session = new CassandraSessionFactoryBean(); session.setCluster(cluster().getObject()); session.setKeyspaceName(env.getProperty("cassandra.keyspace")); session.setConverter(converter()); session.setSchemaAction(SchemaAction.NONE); return session; } @Bean public CassandraOperations cassandraTemplate() throws Exception { return new CassandraTemplate(session().getObject()); } } 

DataProcessor.main方法:

 // Initialize spring application context ApplicationContext applicationContext = new AnnotationConfigApplicationContext(CassandraConfig.class); ApplicationContextHolder.setApplicationContext(applicationContext); CassandraOperations cassandraOperations = applicationContext.getBean(CassandraOperations.class); // Initialize spark context SparkConf conf = new SparkConf().setAppName("test-spark").setMaster("local[2]"); JavaSparkContext sc = new JavaSparkContext(conf); // Load data pages List pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class); // Parallelize the first page JavaRDD rddBuffer = sc.parallelize(pagingResults); while(pagingResults != null && !pagingResults.isEmpty()) { Event lastEvent = pagingResults.get(pagingResults.size() - 1); pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' and creation_time < " + lastEvent.getPk().getCreationTime() + " order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class); // Parallelize page and add to the existing rddBuffer = rddBuffer.union(sc.parallelize(pagingResults)); } // data processing ... 

预计初始加载会有大量数据。 因此,数据在rddBuffer中进行分页,加载和分发。

还有以下选项:

  1. Spark-Cassandra示例( https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala ),尽管文档数量最少这个例子。
  2. Calliope项目( http://tuplejump.github.io/calliope/ )

我想知道Spark与Cassandra集成的最佳实践是什么。 在我的实施中,最好的选择是什么?

Apache Spark 1.0.0,Apache Cassandra 2.0.8

使用Cassandra和Spark的最简单方法是使用DataStax开发的Spark官方开源Cassandra驱动程序: https : //github.com/datastax/spark-cassandra-connector

这个驱动程序是在Cassandra Java Driver之上构建的,它提供了Cassandra和Spark之间的直接桥梁。 与Calliope不同,它不使用Hadoop接口。 此外,它还提供以下独特function:

  • 支持所有Cassandra数据类型,包括集合,开箱即用
  • 将Cassandra行轻量映射到自定义类或元组,而无需在Scala中使用任何implicits或其他高级function
  • 将任何RDD保存到Cassandra
  • 完全支持Cassandra虚拟节点
  • 能够在服务器端过滤/选择,例如利用Cassandra集群列或二级索引

上面代码中的方法是一种经典的集中式算法,只有在一个节点中执行时才能工作。 Cassandra和Spark都是分布式系统,因此有必要对流程进行建模,使其可以分布在多个节点中。

可能的方法很少:如果您知道要获取的行的键,您可以执行以下简单操作:(使用DataStax Java驱动程序)

 val data = sparkContext.parallelize(keys).map{key => val cluster = val cluster = Cluster.builder.addContactPoint(host).build() val session = cluster.connect(keyspace) val statement = session.prepare("...cql...);") val boundStatement = new BoundStatement(sttmt) session.execute(session.execute(boundStatement.bind(...data...) } 

这将有效地分配跨Spark集群的密钥提取。 请注意如何在闭包内完成与C *的连接,因为这可确保在每个单独的分布式工作程序上执行任务时建立连接。

鉴于您的示例使用通配符(即密钥未知),使用Cassandra的Hadoop接口是一个不错的选择。 问题中链接的Spark-Cassandra示例说明了在Cassandra上使用此Hadoop接口。

Calliope是一个通过提供访问该function的简单API来封装使用Hadoop接口的复杂性的库。 它仅在Scala中可用,因为它使用特定的Scalafunction(如即将发布的版本中的implicits和宏)。使用Calliope,您基本上可以声明如何将RDD [type]转换为行键和行值,并且Calliope负责配置hadoop接口到作业。 我们发现Calliope(以及底层的hadoop接口)比使用驱动程序与Cassandra交互快2-4倍。

结论:我将离开Spring-Data配置来访问Cassandra,因为这将限制您到单个节点。 如果可能,请考虑使用简单的并行访问,或者在Scala中使用Calliope进行探索。