Querying data in Cassandra through Spark in a Java Maven project

2023-12-07

I am trying to write a simple piece of code in which I create a schema, insert some tables, and then pull some information out and print it. However, I am getting an error. I am using the DataStax Cassandra Spark connector. I have been using these two examples to help me try to accomplish this:

https://gist.github.com/jacek-lewandowski/278bfc936ca990bee35a

http://www.datastax.com/documentation/developer/java-driver/1.0/java-driver/quick_start/qsSimpleClientAddSession_t.html

However, the second example does not use the Cassandra Spark connector, or Spark at all.

Here is my code:

package com.angel.testspark.test;


import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;
import com.google.common.base.Optional;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;

import scala.Tuple2;

import java.io.Serializable;
import java.math.BigDecimal;
import java.text.MessageFormat;
import java.util.*;

import static com.datastax.spark.connector.CassandraJavaUtil.*;


public class App 
{
    private transient SparkConf conf;

    private App(SparkConf conf) {
        this.conf = conf;
    }
    private void run() {
        JavaSparkContext sc = new JavaSparkContext(conf);
        createSchema(sc);

        sc.stop();
    }

    private void createSchema(JavaSparkContext sc) {
        CassandraConnector connector = CassandraConnector.apply(sc.getConf());

        // Prepare the schema
        try (Session session = connector.openSession()) {
            session.execute("DROP KEYSPACE IF EXISTS tester");
            session.execute("CREATE KEYSPACE tester WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}");
            session.execute("CREATE TABLE tester.emp (id INT PRIMARY KEY, fname TEXT, lname TEXT, role TEXT)");
            session.execute("CREATE TABLE tester.dept (id INT PRIMARY KEY, dname TEXT)");       

            session.execute(
                      "INSERT INTO tester.emp (id, fname, lname, role) " +
                      "VALUES (" +
                          "0001," +
                          "'Angel'," +
                          "'Pay'," +
                          "'IT Engineer'" +
                          ");");
            session.execute(
                      "INSERT INTO tester.emp (id, fname, lname, role) " +
                      "VALUES (" +
                          "0002," +
                          "'John'," +
                          "'Doe'," +
                          "'IT Engineer'" +
                          ");");
            session.execute(
                      "INSERT INTO tester.emp (id, fname, lname, role) " +
                      "VALUES (" +
                          "0003," +
                          "'Jane'," +
                          "'Doe'," +
                          "'IT Analyst'" +
                          ");");
            session.execute(
                      "INSERT INTO tester.dept (id, dname) " +
                      "VALUES (" +
                          "1553," +
                          "'Commerce'" +
                          ");");

            // Query by a non-key column -- this SELECT is what triggers the error below
            ResultSet results = session.execute("SELECT * FROM tester.emp " +
                    "WHERE role = 'IT Engineer';");
            for (Row row : results) {
                System.out.print(row.getString("fname"));
                System.out.print(" ");
                System.out.print(row.getString("lname"));
                System.out.println();
            }
            System.out.println();
        }
    }

    public static void main( String[] args )
    {
        if (args.length != 2) {
            System.err.println("Syntax: com.datastax.spark.demo.JavaDemo <Spark Master URL> <Cassandra contact point>");
            System.exit(1);
        }

        SparkConf conf = new SparkConf();
        conf.setAppName("Java API demo");
        conf.setMaster(args[0]);
        conf.set("spark.cassandra.connection.host", args[1]);

        App app = new App(conf);
        app.run();
    }
}

Here is my error:

14/09/18 11:22:18 WARN util.Utils: Your hostname, APAY-M-R03K resolves to a loopback address: 127.0.0.1; using 10.150.79.164 instead (on interface en0)
14/09/18 11:22:18 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
14/09/18 11:22:18 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/09/18 11:22:18 INFO Remoting: Starting remoting
14/09/18 11:22:18 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@10.150.79.164:50506]
14/09/18 11:22:18 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@10.150.79.164:50506]
14/09/18 11:22:18 INFO spark.SparkEnv: Registering BlockManagerMaster
14/09/18 11:22:18 INFO storage.DiskBlockManager: Created local directory at /var/folders/57/8s5fx3ks06bd2rzkq7yg1xs40000gn/T/spark-local-20140918112218-2c8d
14/09/18 11:22:18 INFO storage.MemoryStore: MemoryStore started with capacity 2.1 GB.
14/09/18 11:22:18 INFO network.ConnectionManager: Bound socket to port 50507 with id = ConnectionManagerId(10.150.79.164,50507)
14/09/18 11:22:18 INFO storage.BlockManagerMaster: Trying to register BlockManager
14/09/18 11:22:18 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager 10.150.79.164:50507 with 2.1 GB RAM
14/09/18 11:22:18 INFO storage.BlockManagerMaster: Registered BlockManager
14/09/18 11:22:18 INFO spark.HttpServer: Starting HTTP Server
14/09/18 11:22:18 INFO server.Server: jetty-7.6.8.v20121106
14/09/18 11:22:18 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:50508
14/09/18 11:22:18 INFO broadcast.HttpBroadcast: Broadcast server started at http://10.150.79.164:50508
14/09/18 11:22:19 INFO spark.SparkEnv: Registering MapOutputTracker
14/09/18 11:22:19 INFO spark.HttpFileServer: HTTP File server directory is /var/folders/57/8s5fx3ks06bd2rzkq7yg1xs40000gn/T/spark-a0dc4491-1901-4a7a-86f4-4adc181fe45c
14/09/18 11:22:19 INFO spark.HttpServer: Starting HTTP Server
14/09/18 11:22:19 INFO server.Server: jetty-7.6.8.v20121106
14/09/18 11:22:19 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:50509
14/09/18 11:22:19 INFO server.Server: jetty-7.6.8.v20121106
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage/rdd,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/stage,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/pool,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/environment,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/executors,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/json,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/,null}
14/09/18 11:22:19 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
14/09/18 11:22:19 INFO ui.SparkUI: Started Spark Web UI at http://10.150.79.164:4040
14/09/18 11:22:19 WARN core.FrameCompressor: Cannot find LZ4 class, you should make sure the LZ4 library is in the classpath if you intend to use it. LZ4 compression will not be available for the protocol.
14/09/18 11:22:19 INFO core.Cluster: New Cassandra host /127.0.0.1:9042 added
14/09/18 11:22:19 INFO cql.CassandraConnector: Connected to Cassandra cluster: Test Cluster
Exception in thread "main" com.datastax.driver.core.exceptions.InvalidQueryException: No indexed columns present in by-columns clause with Equal operator
    at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:35)
    at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:256)
    at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:172)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:52)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:36)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:33)
    at com.sun.proxy.$Proxy6.execute(Unknown Source)
    at com.angel.testspark.test.App.createSchema(App.java:85)
    at com.angel.testspark.test.App.run(App.java:38)
    at com.angel.testspark.test.App.main(App.java:109)
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: No indexed columns present in by-columns clause with Equal operator
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:97)
    at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:108)
    at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:235)
    at com.datastax.driver.core.RequestHandler.onSet(RequestHandler.java:367)
    at com.datastax.driver.core.Connection$Dispatcher.messageReceived(Connection.java:584)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
14/09/18 11:22:20 INFO cql.CassandraConnector: Disconnected from Cassandra cluster: Test Cluster

I believe this may just be a syntax error; I am just not sure where it is or what it is.

Any help would be great, thanks. I have searched the internet, but I have not found a simple example of inserting and extracting data in Java using Cassandra and Spark.

****** EDIT: @BryceAtNetwork23 and @mikea were right about my syntax error, so I edited the question and fixed it. I am now getting a new error, so I have pasted in the new error and updated the code.


Try running your CQL via cqlsh, and you should get the same/similar error:

aploetz@cqlsh:stackoverflow> CREATE TABLE dept (id INT PRIMARY KEY, dname TEXT);
aploetz@cqlsh:stackoverflow> INSERT INTO dept (id, dname) VALUES (1553,Commerce);
<ErrorMessage code=2000 [Syntax error in CQL query] message="line 1:50 no viable alternative at
input ')' (... dname) VALUES (1553,Commerce[)]...)">

Put single quotes around 'Commerce', and it should work:

session.execute(
                  "INSERT INTO tester.dept (id, dname) " +
                  "VALUES (" +
                      "1553," +
                      "'Commerce'" +
                      ");");

But now I am getting a new error...

Try running that from within cqlsh as well.

aploetz@cqlsh:stackoverflow> SELECT * FROM emp WHERE role = 'IT Engineer';
code=2200 [Invalid query] message="No indexed columns present in by-columns clause with Equal operator"

That is happening because role is not defined as your primary key. Cassandra does not allow you to query by arbitrary column values. The best way to solve this is to create an additional query table called empByRole, with role as the partition key. Like this:

CREATE TABLE empByRole 
    (id INT, fname TEXT, lname TEXT, role TEXT,
    PRIMARY KEY (role,id)
);

aploetz@cqlsh:stackoverflow> INSERT INTO empByRole (id, fname, lname, role) VALUES (0001,'Angel','Pay','IT Engineer');
aploetz@cqlsh:stackoverflow> SELECT * FROM empByRole WHERE role = 'IT Engineer';

 role        | id | fname | lname
-------------+----+-------+-------
 IT Engineer |  1 | Angel |   Pay

(1 rows)
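
Since the question is ultimately about going through Spark rather than a bare driver Session, it is also worth noting that the connector can read the table as an RDD and filter it on the Spark side, with no query table or secondary index required. Below is a rough, untested sketch against the connector 1.0 Java API that App.java already statically imports; the filter/map pipeline is my illustration, not code from the question, and any class enclosing these anonymous functions must implement Serializable (as in the linked gist) for Spark to ship them to executors:

// Hypothetical sketch: read tester.emp via the connector and filter in Spark.
// CassandraRow comes from com.datastax.spark.connector; javaFunctions is the
// static import already present at the top of App.java.
JavaRDD<CassandraRow> emps = javaFunctions(sc).cassandraTable("tester", "emp");
JavaRDD<String> engineers = emps
        .filter(new Function<CassandraRow, Boolean>() {
            public Boolean call(CassandraRow row) {
                return "IT Engineer".equals(row.getString("role"));
            }
        })
        .map(new Function<CassandraRow, String>() {
            public String call(CassandraRow row) {
                return row.getString("fname") + " " + row.getString("lname");
            }
        });
for (String name : engineers.collect()) {
    System.out.println(name);
}

Note that this scans the whole table and filters client-side in Spark; for frequent lookups by role, the empByRole query table above is still the right Cassandra data model.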