0

我正在使用给定的插入语句创建一个 VoltDB 表

CREATE TABLE EMPLOYEE (
    ID VARCHAR(4) NOT NULL,
    CODE VARCHAR(4) NOT NULL,
    FIRST_NAME VARCHAR(30) NOT NULL,
    LAST_NAME VARCHAR(30) NOT NULL,
    PRIMARY KEY (ID, CODE)
);

并用

PARTITION TABLE EMPLOYEE ON COLUMN ID;

我已经编写了一个 Spark 作业来将数据插入 VoltDB,我正在使用下面的 scala 代码将记录插入 VoltDB,如果我们不对表进行分区,代码效果很好

import org.voltdb._;
import org.voltdb.client._;
import scala.collection.JavaConverters._

val voltClient:Client = ClientFactory.createClient();
voltClient.createConnection("IP:PORT");

val empDf = spark.read.format("csv")
          .option("inferSchema", "true")
          .option("header", "true")
          .option("sep", ",")
          .load("/FileStore/tables/employee.csv")

// Code to convert scala seq to java varargs
def callProcedure(procName: String, parameters: Any*): ClientResponse =
    voltClient.callProcedure(procName, paramsToJavaObjects(parameters: _*): _*)

def paramsToJavaObjects(params: Any*) = params.map { param ⇒
    val value = param match {
      case None    ⇒ null
      case Some(v) ⇒ v
      case _       ⇒ param
    }
    value.asInstanceOf[AnyRef]
}

empDf.collect().foreach { row =>
  callProcedure("EMPLOYEE.insert", row.toSeq:_*);
}

但是如果我对表进行分区,我会得到以下错误

Mispartitioned tuple in single-partition insert statement.
Constraint Type PARTITIONING, Table CatalogId EMPLOYEE
Relevant Tuples:
ID  CODE  FIRST_NAME  LAST_NAME 
--- ----- ----------- ----------
1   CD01  Naresh       "Joshi"
    at org.voltdb.client.ClientImpl.internalSyncCallProcedure(ClientImpl.java:485)
    at org.voltdb.client.ClientImpl.callProcedureWithClientTimeout(ClientImpl.java:324)
    at org.voltdb.client.ClientImpl.callProcedure(ClientImpl.java:260)
    at line4c569b049a9d4e51a3e8fda7cbb043de32.$read$$iw$$iw$$iw$$iw$$iw$$iw.callProcedure(command-3986740264398828:9)
    at line4c569b049a9d4e51a3e8fda7cbb043de40.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(command-3986740264399793:8)
    at line4c569b049a9d4e51a3e8fda7cbb043de40.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(command-3986740264399793:7)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)

我找到了一个关于该问题的链接(https://forum.voltdb.com/forum/voltdb-discussions/building-voltdb-applications/1182-mispartitioned-tuple-in-single-partition-insert-statement)并试图使用以下查询对过程进行分区

PARTITION PROCEDURE EMPLOYEE.insert ON TABLE EMPLOYEE COLUMN ID;

PARTITION PROCEDURE EMPLOYEE.insert ON TABLE EMPLOYEE COLUMN ID [PARAMETER 0];

但是我[Ad Hoc DDL Input]: VoltDB DDL Error: "Partition references an undefined procedure "EMPLOYEE.insert""在执行这些语句时遇到错误。

但是,我可以使用存储过程插入数据,但是对于上述使用存储过程将数据插入分区表@AdHoc的场景,我无法找出问题或解决方案。EMPLOYEE.insert

4

1 回答 1

1

过程“EMPLOYEE.insert”是所谓的“默认”过程,它在您创建表 EMPLOYEE 时由 VoltDB 自动生成。它已经根据表的分区自动分区,因此您不能调用“PARTITION PROCEDURE EMPLOYEE.insert ...”来覆盖它。

我认为正在发生的事情是该过程由 ID 列分区,该列在 EMPLOYEE 表中是一个 VARCHAR。因此输入参数应该是一个字符串。但是,我认为您的代码以某种方式读取 CSV 文件并将第一列作为 int 值传递。

java 客户端 callProcedure(String procedureName, Object... params) 方法接受参数的可变参数。这可以是任何 Object[]。在服务器的某处进行检查,其中参数的 # 必须与过程预期的 # 匹配,否则过程调用将作为拒绝返回,并且它永远不会被执行。但是,我认为在您的情况下,参数 # 是可以的,因此它会尝试执行该过程。它对对应于 ID 的第一个参数值进行哈希处理,然后确定应该去哪个分区。调用被路由到该分区执行。当它执行时,它会尝试插入这些值,但是会再次检查该分区的分区键值是否正确,但这是失败的。

我认为如果该值作为 int 传入,它会被散列到错误的分区。然后在该分区中,它尝试将值插入到列中,这是一个 VARCHAR,因此它可能隐式地将 int 转换为字符串,但它不在正确的分区中,因此插入失败并出现此错误“Mispartitioned tuple in single-分区插入语句。” 如果您编写了一个 java 存储过程并将错误的列配置为分区键,您会遇到同样的错误。

披露:我在 VoltDB 工作。

于 2019-01-24T19:02:39.973 回答