Tag: bigdata

Neo4j关系指数 – 搜索关系属性

我有一个具有以下结构的neo4j图。 (账户)— [交易] —(账户) Transaction是一个neo4j关系,Account是一个节点。 每个交易都设置了各种属性,例如交易ID,金额,日期和各种其他银行信息。 我可以通过帐户ID运行搜索,并返回正常。 但是,当我按事务ID搜索时,neo4J搜索整个图形而不是使用索引,搜索失败。 我使用org.neo4j.unsafe.batchinsert.BatchInserterImpl.createDeferredSchemaIndex()为Account.number和Transaction.txid创建了索引。 该索引似乎适用于帐户(节点)搜索,但不适用于事务(关系)搜索。 (我也为节点和关系启用了自动索引,但它没有改变的东西) 我认为不支持关系属性的索引,因此考虑使中间节点保存属性信息。 但是,如果可能的话,我更愿意坚持我原来的设计。 知道怎么办吗?

在Map中跳过.csv的第一行会减少java

由于映射器函数针对每一行运行,我是否可以知道如何跳过第一行。 对于某些文件,它包含我想忽略的列标题

如何在Java Spark RDD上执行标准偏差和平均操作?

我有一个看起来像这样的JavaRDD。 [ [A,8] [B,3] [C,5] [A,2] [B,8] … … ] 我希望我的结果是卑鄙的 [ [A,5] [B,5.5] [C,5] ] 如何仅使用Java RDD执行此操作。 PS:我想避免groupBy操作,所以我没有使用DataFrames。

Flink Streaming:如何根据数据将一个数据流输出到不同的输出?

在Apache Flink中,我有一组元组。 让我们假设一个非常简单的Tuple1 。 元组可以在其值字段中具有任意值(例如,’P1’,’P2’等)。 可能值的集合是有限的,但我事先并不知道全集(因此可能存在’P362’)。 我想根据元组内部的值将该元组写入某个输出位置。 所以我希望有以下文件结构: /output/P1 /output/P2 在文档中我只发现了写入我事先知道的位置的可能性(例如stream.writeCsv(“/output/somewhere”) ),但没有办法让数据的内容决定数据实际结束的位置。 我在文档中读到了关于输出拆分的内容,但这似乎没有提供一种方法将输出重定向到我希望拥有它的方式(或者我只是不明白这是如何工作的)。 可以使用Flink API完成,如果是这样,怎么做? 如果没有,是否可能有第三方图书馆可以做到这一点,还是我必须自己构建这样的东西? 更新 按照Matthias的建议,我想出了一个筛选接收函数,它确定输出路径,然后在序列化后将元组写入相应的文件。 我把它放在这里供参考,也许对其他人有用: public class SiftingSinkFunction extends RichSinkFunction { private final OutputSelector outputSelector; private final MapFunction serializationFunction; private final String basePath; Map<String, TextOutputFormat> formats = new HashMap(); /** * @param outputSelector the selector which determines into which output(s) a […]

反向排序减速键

将Map Output键以相反的顺序输入减速器的最佳方法是什么? 默认情况下,reducer按键的升序接收所有键。 任何帮助或评论广泛赞赏。 简单来说,在正常情况下,如果地图发出密钥1,4,3,5,2,则减速器接收与1,2,3,4,5相同的密钥。 我希望减速机能够获得5,4,3,2,1 。

为什么Kafka消费者表现缓慢?

我有一个简单的主题,一个简单的Kafka使用者和生产者,使用默认配置。 程序很简单,我有两个线程。 在生产者中,它不断发送16个字节的数据。 在消费者方面,它不断接收。 我发现生产者的吞吐量大约是10MB / s,这很好。 但消费者的吞吐量仅为0.2MB / s。 我已经禁用了所有的调试日志,但这并没有让它变得更好。 测试在本地计算机上运行。 什么机构都知道出了什么问题? 谢谢! 我使用的代码如下:制片人: KafkaProducer producer = new KafkaProducer(props); int size = 16; byte[] payload = new byte[size]; String key = “key”; Arrays.fill(payload, (byte) 1); ProducerRecord record = new ProducerRecord(“test”,0,key.getBytes(),payload); while(true){ producer.send(record); } 消费者: Properties consumerProps = new Properties(); consumerProps.put(“zookeeper.connect”, “localhost:2181”); consumerProps.put(“group.id”, “test”); ConsumerConnector […]

如何生成海量数据?

我正在用nutch和hadoop做一些测试,我需要大量的数据。 我想从20GB开始,到100GB,500GB,最终达到1-2TB。 问题是我没有这么多的数据,所以我正在考虑如何制作它。 数据本身可以是任何类型。 一个想法是获取一组初始数据并复制它。 但它不够好,因为需要彼此不同的文件(相同的文件被忽略)。 另一个想法是编写一个程序,用于创建具有虚拟数据的文件。 还有其他想法吗?

如何使用spark处理一系列hbase行?

我正在尝试使用HBase作为spark的数据源。 因此,第一步是从HBase表创建RDD。 由于Spark使用hadoop输入格式,我可以通过创建rdd找到一种使用所有行的方法http://www.vidyasource.com/blog/Programming/Scala/Java/Data/Hadoop/Analytics/2014/01/25 / lighting-a-spark-with-hbase但我们如何为范围扫描创建RDD? 欢迎所有建议。

如何在Kafka中使用多个消费者?

我是一名学习卡夫卡的新学生,我遇到了一些基本问题,理解多个消费者,文章,文档等对目前来说都没有太大的帮助。 我试图做的一件事是编写我自己的高级Kafka生产者和消费者并同时运行它们,向主题发布100条简单消息并让我的消费者检索它们。 我已经成功地做到了这一点,但是当我尝试引入第二个消费者来消费刚刚发布消息的同一主题时,它不会收到任何消息。 我的理解是,对于每个主题,您可以拥有来自不同消费者群体的消费者,并且每个消费者群体都可以获得针对某个主题生成的消息的完整副本。 它是否正确? 如果没有,那么建立多个消费者的正确方法是什么? 这是我到目前为止写的消费者类: public class AlternateConsumer extends Thread { private final KafkaConsumer consumer; private final String topic; private final Boolean isAsync = false; public AlternateConsumer(String topic, String consumerGroup) { Properties properties = new Properties(); properties.put(“bootstrap.servers”, “localhost:9092”); properties.put(“group.id”, consumerGroup); properties.put(“partition.assignment.strategy”, “roundrobin”); properties.put(“enable.auto.commit”, “true”); properties.put(“auto.commit.interval.ms”, “1000”); properties.put(“session.timeout.ms”, “30000”); properties.put(“key.deserializer”, “org.apache.kafka.common.serialization.IntegerDeserializer”); properties.put(“value.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”); consumer […]

如何在hadoop中序列化对象(在HDFS中)

我有一个HashMap <String,ArrayList >。 我想将我的HashMap对象(hmap)序列化为HDFS位置,然后在Mapper和Reducers中对其进行反序列化以便使用它。 为了在HDFS上序列化我的HashMap对象,我使用了普通的java对象序列化代码,如下所示但是出错了(权限被拒绝) try { FileOutputStream fileOut =new FileOutputStream(“hashmap.ser”); ObjectOutputStream out = new ObjectOutputStream(fileOut); out.writeObject(hm); out.close(); } catch(Exception e) { e.printStackTrace(); } 我得到以下exception java.io.FileNotFoundException: hashmap.ser (Permission denied) at java.io.FileOutputStream.open(Native Method) at java.io.FileOutputStream.(FileOutputStream.java:221) at java.io.FileOutputStream.(FileOutputStream.java:110) at KMerIndex.createIndex(KMerIndex.java:121) at MyDriverClass.formRefIndex(MyDriverClass.java:717) at MyDriverClass.main(MyDriverClass.java:768) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.util.RunJar.run(RunJar.java:221) at org.apache.hadoop.util.RunJar.main(RunJar.java:136) […]