1

在 ORM 中,我希望能够做到这一点:

session.add(Lake("hello",Polygon([(0,0),(1,0),(1,1)])))
lake = session.get(Lake).first()
assert isinstance(lake.geometry, Polygon)
assert isinstance(lake.geometry.get_exterior_ring().get_points()[0], Point)
print(lake.geometry.get_exterior_ring().get_points()[0].x)

相反,我看到访问我的湖点的唯一方法是通过一个相当复杂的代码怪物:

ring = func.ST_ExteriorRing(func.ST_GeomFromWKB(Lake.geometry))
node_count = func.ST_NPoints(ring)
node_series = func.generate_series(1, node_count)
node_n = func.ST_PointN(ring, node_series)
node_n_x = func.ST_X(node_n)
node_n_y = func.ST_Y(node_n)
rows = session.query(Lake, node_n_x, node_n_y).all()
lake_coasts = {}
for row in rows:
    lake = row[0]
    if not lake in lake_coasts:
        lake_coasts[lake] = []
    lake_coast = lake_coasts[lake]
    lake_coast.append((row[1], row[2]))
for lake in lake_coasts:
    lake_coast = lake_coasts[lake]
    print("Lake #{0}: \"{1}\" has its coasts at {2}"
          .format(lake.id, lake.name, lake_coast))

虽然这个循环让我得到了我想要的坐标,但我不知道如何实现一些返回这个实例的海岸的 Lake.get_coast()。

我也放弃了对带有 MULTIPOLYGONs 的 ""Lake""s 实施相同的方法,因为嵌套到要点对于 postgis 来说太多了(至少我是这样阅读错误消息的)

我是 postgis、gis、python 和 sqla 的新手,但在两天的谷歌搜索中,我在 SQL-Alchemy 2 中找不到任何看起来像 ORM 的东西,但只有一些 SQL 辅助函数(postgis)来解析 WKB,但只能在数据库。我需要一些额外的框架吗?我看到了 gdal、ogr、fiona,但我觉得看错了方向。是否有一些使用结构良好的 SQL-Alchemy2 的开源项目?实体、DAO 等等?除了这个极简主义的例子,你如何实际使用这个怪物?

4

1 回答 1

0

您可以使用GeoAlchemy2Shapely

    from geoalchemy2 import Geography
    from geoalchemy2.shape import from_shape
    from shapely.geometry import Polygon, Point, shape
    from sqlalchemy import create_engine, Column, Integer, func
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import scoped_session
    from sqlalchemy.orm import sessionmaker

    Base = declarative_base()
    db_session = scoped_session(sessionmaker())
    db_session.configure(bind=create_engine(connection_string))

    """
    create table polygon (
        id serial not null primary key,
        polygon geography(POLYGON) not null
    );
    """

    class TablePolygon(Base):
        __tablename__ = 'polygon'

        id = Column(Integer, primary_key=True)
        polygon = Column(Geography('POLYGON'), nullable=False)

    db_session.add(
        TablePolygon(
            id=1,
            polygon=from_shape(
                Polygon([
                    [-74.0133476, 40.7013069], [-73.9776421, 40.7121134], [-74.012661, 40.7230482],
                    [-74.0133476, 40.7013069]
                ])
            )
        )
    )

    geojson = """{"coordinates": [[[-73.9597893,40.8024021],[-73.9846802,40.7668997],[-73.9726639,40.7623468],[-73.9460564,40.7969415],[-73.9597893,40.8024021]]],"type": "Polygon"}"""

    db_session.add(
        TablePolygon(
            id=2,
            polygon=from_shape(
                shape(json.loads(geojson))
            )
        )
    )

    def find_polygon_id(point: Point):
        return db_session.query(
            TablePolygon.id
        ).filter(
            func.ST_Intersects(TablePolygon.polygon, from_shape(point))
        ).scalar()

    print(find_polygon_id(Point(-73.9822769, 40.7564925)))  # prints "None"
    print(find_polygon_id(Point(-73.9625359, 40.7858887)))  # prints "2"
    print(find_polygon_id(Point(-74.0059662, 40.7138058)))  # prints "1"
于 2022-02-12T09:38:14.997 回答