Source code for GeoHealthCheck.plugins.probe.esrifs
from GeoHealthCheck.probe import Probe
from GeoHealthCheck.result import Result, push_result
[docs]class ESRIFSDrilldown(Probe):
"""
Probe for ESRI FeatureServer endpoint "drilldown": starting
with top /FeatureServer endpoint: get Layers and get Features on these.
Test e.g. from https://sampleserver6.arcgisonline.com/arcgis/rest/services
(at least sampleserver6 is ArcGIS 10.6.1 supporting Paging).
"""
NAME = 'ESRIFS Drilldown'
DESCRIPTION = 'Traverses an ESRI FeatureServer ' \
'(REST) API endpoint by drilling down'
RESOURCE_TYPE = 'ESRI:FS'
REQUEST_METHOD = 'GET'
PARAM_DEFS = {
'drilldown_level': {
'type': 'string',
'description': 'How heavy the drilldown should be.\
basic: test presence of Capabilities, \
full: go through Layers, get Features',
'default': 'basic',
'required': True,
'range': ['basic', 'full']
}
}
"""Param defs"""
def __init__(self):
Probe.__init__(self)
def get_request_headers(self):
headers = Probe.get_request_headers(self)
# Clear possibly dangling ESRI header
# https://github.com/geopython/GeoHealthCheck/issues/293
if 'X-Esri-Authorization' in headers:
del headers['X-Esri-Authorization']
if 'Authorization' in headers:
# https://enterprise.arcgis.com/en/server/latest/
# administer/linux/about-arcgis-tokens.htm
auth_val = headers['Authorization']
if 'Bearer' in auth_val:
headers['X-Esri-Authorization'] = headers['Authorization']
return headers
def perform_esrifs_get_request(self, url):
response = self.perform_get_request(url).json()
error_msg = 'code=%d message=%s'
# May have error like:
# {
# "error" :
# {
# "code" : 499,
# "message" : "Token Required",
# "messageCode" : "GWM_0003",
# "details" : [
# "Token Required"
# ]
# }
# }
if 'error' in response:
err = response['error']
raise Exception(error_msg % (err['code'], err['message']))
return response
[docs] def perform_request(self):
"""
Perform the drilldown.
"""
# Be sure to use bare root URL http://.../FeatureServer
fs_url = self._resource.url.split('?')[0]
# Assemble request templates with root FS URL
req_tpl = {
'fs_caps': fs_url + '?f=json',
'layer_caps': fs_url + '/%d?f=json',
'get_features': fs_url +
'/%d/query?where=1=1'
'&outFields=*&resultOffset=0&'
'resultRecordCount=1&f=json',
'get_feature_by_id': fs_url +
'/%d/query?where=%s=%s&outFields=*&f=json'
}
# 1. Test top Service endpoint existence
result = Result(True, 'Test Service Endpoint')
result.start()
layers = []
try:
fs_caps = self.perform_esrifs_get_request(req_tpl['fs_caps'])
for attr in ['currentVersion', 'layers']:
val = fs_caps.get(attr, None)
if val is None:
msg = 'Service: missing attr: %s' % attr
result = push_result(
self, result, False, msg, 'Test Layer:')
continue
layers = fs_caps.get('layers', [])
except Exception as err:
result.set(False, str(err))
result.stop()
self.result.add_result(result)
if len(layers) == 0:
return
# 2. Test each Layer Capabilities
result = Result(True, 'Test Layer Capabilities')
result.start()
layer_ids = []
layer_caps = []
try:
for layer in layers:
layer_ids.append(layer['id'])
for layer_id in layer_ids:
layer_caps.append(self.perform_esrifs_get_request(
req_tpl['layer_caps'] % layer_id))
except Exception as err:
result.set(False, str(err))
result.stop()
self.result.add_result(result)
if self._parameters['drilldown_level'] == 'basic':
return
# ASSERTION: will do full drilldown from here
# 3. Test getting Features from Layers
result = Result(True, 'Test Layers')
result.start()
layer_id = 0
try:
for layer_id in layer_ids:
try:
features = self.perform_esrifs_get_request(
req_tpl['get_features'] % layer_id)
obj_id_field_name = features['objectIdFieldName']
features = features['features']
if len(features) == 0:
continue
# At least one Feature: use first and try to get by id
object_id = features[0]['attributes'][obj_id_field_name]
feature = self.perform_get_request(
req_tpl['get_feature_by_id'] % (
layer_id, obj_id_field_name,
str(object_id))).json()
feature = feature['features']
if len(feature) == 0:
msg = 'layer: %d: missing Feature - id: %s' \
% (layer_id, str(object_id))
result = push_result(
self, result, False, msg,
'Test Layer: %d' % layer_id)
except Exception as e:
msg = 'GetLayer: id=%d: err=%s ' \
% (layer_id, str(e))
result = push_result(
self, result, False, msg, 'Test Get Features:')
continue
except Exception as err:
result.set(False, 'Layer: id=%d : err=%s'
% (layer_id, str(err)))
result.stop()
# Add to overall Probe result
self.result.add_result(result)