我正在制作一个分布式排序系统,其中每台机器(客户端)负责存储特定取值范围的数据(分区)。数据的发送和接收都没有问题,我的问题在于同步:某个客户端想要的分区必须先由其他客户端发送到服务器,然后该客户端才能从服务器取回它。
我已经尝试过 `delay` 和其他各种 RPC 调用来同步客户端和服务器,但客户端的运行仍然不同步,程序会中断。我想我必须结合使用 `Future` 或 `Promise`,但我不确定该怎么做。
每个客户端都有一个唯一的 ID(从 0 开始),并将根据该编号存储分区。一行数据如下所示:AsfAGHM5om 00000000000000000000000000000000 0000222200002222000022220000222200002222000000001111
编辑:更新后的示例代码
客户端
// Number of clients taking part; partition indices run from 0 until numberOfClients.
val numberOfClients = 3
var sentPartitionsCounter = 0
// Visit every partition index exactly once. Partitions that do not belong to this
// client (index != id) are uploaded to the server; after each step the client asks
// the server for whatever is currently stored for its own partition.
while (sentPartitionsCounter < numberOfClients) {
  if (sentPartitionsCounter != id) {
    sendUnwantedPartition("src/main/scala/testFile." + sentPartitionsCounter.toString) // alternative name: "partition." + client.sentPartitionsCounter
  }
  client.getWantedPartition()
  // Advance unconditionally. Incrementing only inside the `if` above left the
  // counter stuck as soon as it equalled this client's own id, so the loop never
  // terminated (immediately so for the client with id 0).
  sentPartitionsCounter += 1
}
/** Reads a partition file and uploads its records to the server.
  *
  * Each line is split at the first space into a key and a value. The partition ID
  * sent with the request is taken from the run of digits at the end of `filename`.
  *
  * @param filename path of the partition file; must end in the decimal partition ID
  * @throws java.io.FileNotFoundException if `filename` cannot be opened
  * @throws NumberFormatException if `filename` does not end in a parsable integer
  * @throws ArrayIndexOutOfBoundsException if a line contains no space
  */
def sendUnwantedPartition(filename: String): Unit = {
  val source = Source.fromFile(filename)
  // Read everything eagerly and release the file handle even if reading fails.
  val dataList = try source.getLines().toList finally source.close()
  val dataSeq = dataList.map { dataLine =>
    val dataValues = dataLine.split(" ", 2)
    Data(key = dataValues(0), value = dataValues(1))
  }
  // Use all trailing digits, not just the last character, so that partition IDs
  // of 10 and above are not truncated to their final digit.
  val partitionID = filename.reverse.takeWhile(_.isDigit).reverse
  val request = Dataset(data = dataSeq, partitionID = partitionID.toInt)
  blockingStub.getUnwantedPartitions(request)
  print("sent partitions")
}
/** Requests this client's partition from the server and appends the returned
  * records to `clientFiles/wantedPartitions<id>.txt`, one "key value" pair per line.
  *
  * An empty response simply appends nothing.
  */
def getWantedPartition(): Unit = {
  val request = ID(id = id)
  val response = blockingStub.sendWantedPartitions(request)
  val wantedPartitions = new File("clientFiles/wantedPartitions" + id.toString + ".txt")
  // FileWriter creates the file itself but not its directory, so make sure the
  // directory exists; the old exists()/createNewFile() check ran after the file
  // had already been created and could never trigger.
  Option(wantedPartitions.getParentFile).foreach(_.mkdirs())
  val printWriter: PrintWriter = new PrintWriter(new FileWriter(wantedPartitions, true))
  try {
    response.data.foreach { data =>
      printWriter.append(data.key + " " + data.value + "\n")
    }
  } finally {
    // Always flush and release the file handle, even if iterating the response fails.
    printWriter.close()
  }
}
服务器
/** Appends the records in `req` to `serverPartitions/partition<partitionID>.txt`,
  * one "key value" pair per line, and replies with a fixed acknowledgement text.
  *
  * @param req records to store together with the ID of the partition they belong to
  * @return an already-completed future holding the acknowledgement
  */
override def getUnwantedPartitions(req: Dataset) = {
  val filename = "serverPartitions/partition" + req.partitionID + ".txt"
  val partition = new File(filename)
  // FileWriter creates the file itself but not its directory, so make sure the
  // directory exists; the old exists()/createNewFile() check ran after the file
  // had already been created and could never trigger.
  Option(partition.getParentFile).foreach(_.mkdirs())
  // NOTE(review): if this handler can run concurrently for the same partition,
  // appends from different requests may interleave — confirm the server's threading model.
  val printWriter: PrintWriter = new PrintWriter(new FileWriter(partition, true))
  try {
    req.data.foreach { data =>
      printWriter.append(data.key + " " + data.value + "\n")
    }
  } finally {
    // Always flush and release the file handle, even if iterating the request fails.
    printWriter.close()
  }
  Future.successful(Text("Got unwanted data"))
}
/** Returns the records stored in `serverPartitions/partition<id>.txt` and then
  * deletes that file.
  *
  * Each line is split at the first space into a key and a value. If the file
  * cannot be opened, an empty `Dataset` is returned instead.
  *
  * @param req ID of the partition being requested
  * @return an already-completed future holding the stored records, or an empty
  *         `Dataset` when the partition file is not there
  */
override def sendWantedPartitions(req: ID) = {
  val filename = "serverPartitions/partition" + req.id + ".txt"
  try {
    val source = Source.fromFile(filename)
    // Close the handle before deleting: an open handle leaks a descriptor and can
    // make delete() fail on platforms that lock open files.
    val dataList = try source.getLines().toList finally source.close()
    val dataSeq = dataList.map { dataLine =>
      val dataValues = dataLine.split(" ", 2)
      Data(key = dataValues(0), value = dataValues(1))
    }
    val reply = Dataset(data = dataSeq, partitionID = req.id)
    // NOTE(review): records appended to this file between the read above and this
    // delete would be lost — confirm whether uploads can overlap with this call.
    new File(filename).delete()
    Future.successful(reply)
  } catch {
    // Partition hasn't been received yet
    case _: FileNotFoundException => Future.successful(Dataset())
  }
}
Proto 定义
syntax = "proto3";

package protoPackage;

// One record, carried as a key string and a value string.
message Data {
  string key = 1;
  string value = 2;
}

// A batch of records plus the ID of the partition they belong to.
message Dataset {
  repeated Data data = 1;
  int32 partitionID = 2;
}

// Wrapper around a single integer ID.
message ID {
  int32 id = 1;
}

// Wrapper around a single text string.
message Text {
  string text = 1;
}

// RPCs exchanged between the sorting clients and the server.
service DistrSorting {
  // Takes a batch of records and answers with a text message.
  rpc getUnwantedPartitions(Dataset) returns (Text);
  // Takes an ID and answers with a batch of records.
  rpc sendWantedPartitions(ID) returns (Dataset);
}