我在我的 AWS 账户中获得了这个真实的 DynamoDB 实例,它已经填充了表和行。
我需要创建一个将在本地运行 DynamoDB 的测试场景,因此我为此目的使用 Docker。
我导出了一个包含一些 Real DynamoDB 行的表。我想要实现的是运行传递 .csv 文件或类似文件的 Docker 实例,这样我的 DynamoDb Local 在启动时不会为空。
那可能吗?
我在我的 AWS 账户中获得了这个真实的 DynamoDB 实例,它已经填充了表和行。
我需要创建一个将在本地运行 DynamoDB 的测试场景,因此我为此目的使用 Docker。
我导出了一个包含一些 Real DynamoDB 行的表。我想要实现的是运行传递 .csv 文件或类似文件的 Docker 实例,这样我的 DynamoDb Local 在启动时不会为空。
那可能吗?
我不确定官方 DynamoDB-Local 是否默认支持此功能,而且您似乎需要一个将 csv 导入 dynamodb 的自定义脚本,但是您可以根据需要创建自定义图像。因此,例如,您可以从一个简单的Dockerfile
用途开始amazon/dynamodb-local
,然后添加一个负责导入 csv 文件的 python 脚本,因此步骤可能如下:
/docker-entrypoint-initdb.d
/docker-entrypoint-initdb.d
以便将其传递给 python 脚本 -下面添加的代码- 例如,它将在 dynamodb 中加载它如下(取自mongodb 的入口点): for f in /docker-entrypoint-initdb.d/*; do
case "$f" in
# if csv file pass it to the python script
*.csv) echo "$0: running $f"; . "$f" ;;
*) echo "$0: ignoring $f" ;;
esac
echo
done
以下代码未经我测试或创建,我只是从以下链接中复制了它Import a CSV file into a DynamoDB table using boto (Python package),您可以根据需要对其进行修改或构建自己的脚本或者如果您有更好的建议让我知道以更新答案。
boto 支持 dynamodb-local,如以下答案中所述,这意味着您可以修改脚本以使其与 dynamodb local 一起使用
import boto
MY_ACCESS_KEY_ID = 'copy your access key ID here'
MY_SECRET_ACCESS_KEY = 'copy your secrete access key here'
def do_batch_write(items, table_name, dynamodb_table, dynamodb_conn):
'''
From https://gist.github.com/griggheo/2698152#file-gistfile1-py-L31
'''
batch_list = dynamodb_conn.new_batch_write_list()
batch_list.add_batch(dynamodb_table, puts=items)
while True:
response = dynamodb_conn.batch_write_item(batch_list)
unprocessed = response.get('UnprocessedItems', None)
if not unprocessed:
break
batch_list = dynamodb_conn.new_batch_write_list()
unprocessed_list = unprocessed[table_name]
items = []
for u in unprocessed_list:
item_attr = u['PutRequest']['Item']
item = dynamodb_table.new_item(
attrs=item_attr
)
items.append(item)
batch_list.add_batch(dynamodb_table, puts=items)
def import_csv_to_dynamodb(table_name, csv_file_name, colunm_names, column_types):
'''
Import a CSV file to a DynamoDB table
'''
dynamodb_conn = boto.connect_dynamodb(aws_access_key_id=MY_ACCESS_KEY_ID, aws_secret_access_key=MY_SECRET_ACCESS_KEY)
dynamodb_table = dynamodb_conn.get_table(table_name)
BATCH_COUNT = 2 # 25 is the maximum batch size for Amazon DynamoDB
items = []
count = 0
csv_file = open(csv_file_name, 'r')
for cur_line in csv_file:
count += 1
cur_line = cur_line.strip().split(',')
row = {}
for colunm_number, colunm_name in enumerate(colunm_names):
row[colunm_name] = column_types[colunm_number](cur_line[colunm_number])
item = dynamodb_table.new_item(
attrs=row
)
items.append(item)
if count % BATCH_COUNT == 0:
print 'batch write start ... ',
do_batch_write(items, table_name, dynamodb_table, dynamodb_conn)
items = []
print 'batch done! (row number: ' + str(count) + ')'
# flush remaining items, if any
if len(items) > 0:
do_batch_write(items, table_name, dynamodb_table, dynamodb_conn)
csv_file.close()
def main():
'''
Demonstration of the use of import_csv_to_dynamodb()
We assume the existence of a table named `test_persons`, with
- Last_name as primary hash key (type: string)
- First_name as primary range key (type: string)
'''
colunm_names = 'Last_name First_name'.split()
table_name = 'test_persons'
csv_file_name = 'test.csv'
column_types = [str, str]
import_csv_to_dynamodb(table_name, csv_file_name, colunm_names, column_types)
if __name__ == "__main__":
main()
#cProfile.run('main()') # if you want to do some profiling
test.csv 的内容(必须与 Python 脚本位于同一文件夹中):
John,Doe
Bob,Smith
Alice,Lee
Foo,Bar
a,b
c,d
e,f
g,h
i,j
j,l