import geopandas
import dask_geopandas
import pandas as pd
import planetary_computer
import pystac_client
import shapely.geometry
geom = {
    "type": "Polygon",
    "coordinates": [
        [
            [36.61127547076927, 37.04043521970452],
            [36.61127547076927, 37.00072876242824],
            [36.655897425034425, 37.00072876242824],
            [36.655897425034425, 37.04043521970452],
            [36.61127547076927, 37.04043521970452],
        ]
    ],
}
catalog = pystac_client.Client.open(
    "https://planetarycomputer.microsoft.com/api/stac/v1",
    modifier=planetary_computer.sign_inplace,
)

items = catalog.search(
    intersects=geom,
    collections=["ms-buildings"],
    query={"msbuildings:region": {"eq": "Turkey"}},
)
item = next(items.items())
list(items.items())
[<Item id=Turkey_2022-07-06>, <Item id=Turkey_2022-06-14>]
dask-geopandas needs to read the Parquet metadata of each partition to get its geospatial footprint. We'll do that in parallel on the cluster to keep things fast.
from distributed import Client

client = Client()
client
[Client / LocalCluster repr: 4 workers with 1 thread each, 32.00 GiB total memory; dashboard at /user/taugspurger@microsoft.com/proxy/8787/status]
import dask.dataframe as dd
import fsspec


def read_parquet(asset):
    fs, token, [root] = fsspec.get_fs_token_paths(
        asset.href, storage_options=asset.extra_fields["table:storage_options"]
    )
    # Get the raw paths (fast enough to do this locally; could be done on the cluster too)
    paths = fs.ls(root)
    paths = [f"az://{p}" for p in paths]

    # Read each partition's metadata on the cluster
    df_futures = client.map(
        dask_geopandas.read_parquet,
        paths,
        storage_options=asset.extra_fields["table:storage_options"],
    )

    # Workaround for https://github.com/geopandas/dask-geopandas/issues/237
    def get_spatial_partitions(x):
        return x.spatial_partitions

    spatial_partitions_futures = client.map(get_spatial_partitions, df_futures)

    # Pull the results back locally. This takes the most time, waiting on computation
    dfs, spatial_partitions = client.gather([df_futures, spatial_partitions_futures])
    for df, sp in zip(dfs, spatial_partitions):
        df.spatial_partitions = None

    full_country = dd.concat(dfs)
    full_country.spatial_partitions = pd.concat(spatial_partitions, ignore_index=True)
    return full_country
asset = item.assets["data"]

%time full_country = read_parquet(asset)
CPU times: user 5.31 s, sys: 568 ms, total: 5.88 s
Wall time: 30.8 s
shape = shapely.geometry.shape(geom)

m = full_country.spatial_partitions.explore(style_kwds={"fill": False})
m
[Interactive map showing the geospatial footprint of each partition]
%%time
subset = full_country.clip(shapely.geometry.shape(geom)).compute()
CPU times: user 77.3 ms, sys: 48.4 ms, total: 126 ms
Wall time: 742 ms
/srv/conda/envs/notebook/lib/python3.10/site-packages/geopandas/tools/clip.py:67: FutureWarning: In a future version, `df.iloc[:, i] = newvals` will attempt to set the values inplace instead of always setting a new array. To retain the old behavior, use either `df[df.columns[i]] = newvals` or, if columns are non-unique, `df.isetitem(i, newvals)`
  clipped.loc[
subset.shape
(4814, 3)
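From here the clipped result is a plain GeoDataFrame, so ordinary geopandas operations apply. As a sketch (using a hypothetical toy GeoDataFrame in place of `subset`, and EPSG:6933, an equal-area CRS picked here for illustration), building footprint areas could be computed after reprojecting out of geographic coordinates:

```python
import geopandas
import shapely.geometry

# Hypothetical stand-in for the clipped `subset` GeoDataFrame from above
subset = geopandas.GeoDataFrame(
    geometry=[shapely.geometry.box(36.62, 37.01, 36.63, 37.02)],
    crs="EPSG:4326",  # geographic coordinates, like the footprints above
)

# Reproject to an equal-area CRS before measuring areas in square meters;
# computing .area directly on EPSG:4326 would give meaningless degree² values
areas_m2 = subset.to_crs("EPSG:6933").geometry.area
print(areas_m2.sum())
```

Summing or binning `areas_m2` would then give building-stock statistics for the area of interest.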