Source code for GeoHealthCheck.plugins.probe.esrifs
from GeoHealthCheck.probe import Probe
from GeoHealthCheck.result import Result, push_result
[docs]class ESRIFSDrilldown(Probe):
    """
    Probe for ESRI FeatureServer endpoint "drilldown": starting
    with top /FeatureServer endpoint: get Layers and get Features on these.
    Test e.g. from https://sampleserver6.arcgisonline.com/arcgis/rest/services
    (at least sampleserver6 is ArcGIS 10.6.1 supporting Paging).
    """
    NAME = 'ESRIFS Drilldown'
    DESCRIPTION = 'Traverses an ESRI FeatureServer ' \
                  '(REST) API endpoint by drilling down'
    RESOURCE_TYPE = 'ESRI:FS'
    REQUEST_METHOD = 'GET'
    PARAM_DEFS = {
        'drilldown_level': {
            'type': 'string',
            'description': 'How heavy the drilldown should be.\
                            basic: test presence of Capabilities, \
                            full: go through Layers, get Features',
            'default': 'basic',
            'required': True,
            'range': ['basic', 'full']
        }
    }
    """Param defs"""
    def __init__(self):
        Probe.__init__(self)
    def get_request_headers(self):
        headers = Probe.get_request_headers(self)
        # Clear possibly dangling ESRI header
        # https://github.com/geopython/GeoHealthCheck/issues/293
        if 'X-Esri-Authorization' in headers:
            del headers['X-Esri-Authorization']
        if 'Authorization' in headers:
            # https://enterprise.arcgis.com/en/server/latest/
            #     administer/linux/about-arcgis-tokens.htm
            auth_val = headers['Authorization']
            if 'Bearer' in auth_val:
                headers['X-Esri-Authorization'] = headers['Authorization']
        return headers
    def perform_esrifs_get_request(self, url):
        response = self.perform_get_request(url).json()
        error_msg = 'code=%d message=%s'
        # May have error like:
        # {
        #   "error" :
        #   {
        #     "code" : 499,
        #     "message" : "Token Required",
        #     "messageCode" : "GWM_0003",
        #     "details" : [
        #       "Token Required"
        #     ]
        #   }
        # }
        if 'error' in response:
            err = response['error']
            raise Exception(error_msg % (err['code'], err['message']))
        return response
[docs]    def perform_request(self):
        """
        Perform the drilldown.
        """
        # Be sure to use bare root URL http://.../FeatureServer
        fs_url = self._resource.url.split('?')[0]
        # Assemble request templates with root FS URL
        req_tpl = {
            'fs_caps': fs_url + '?f=json',
            'layer_caps': fs_url + '/%d?f=json',
            'get_features': fs_url +
            '/%d/query?where=1=1'
            '&outFields=*&resultOffset=0&'
            'resultRecordCount=1&f=json',
            'get_feature_by_id': fs_url +
            '/%d/query?where=%s=%s&outFields=*&f=json'
        }
        # 1. Test top Service endpoint existence
        result = Result(True, 'Test Service Endpoint')
        result.start()
        layers = []
        try:
            fs_caps = self.perform_esrifs_get_request(req_tpl['fs_caps'])
            for attr in ['currentVersion', 'layers']:
                val = fs_caps.get(attr, None)
                if val is None:
                    msg = 'Service: missing attr: %s' % attr
                    result = push_result(
                        self, result, False, msg, 'Test Layer:')
                    continue
            layers = fs_caps.get('layers', [])
        except Exception as err:
            result.set(False, str(err))
        result.stop()
        self.result.add_result(result)
        if len(layers) == 0:
            return
        # 2. Test each Layer Capabilities
        result = Result(True, 'Test Layer Capabilities')
        result.start()
        layer_ids = []
        layer_caps = []
        try:
            for layer in layers:
                layer_ids.append(layer['id'])
            for layer_id in layer_ids:
                layer_caps.append(self.perform_esrifs_get_request(
                    req_tpl['layer_caps'] % layer_id))
        except Exception as err:
            result.set(False, str(err))
        result.stop()
        self.result.add_result(result)
        if self._parameters['drilldown_level'] == 'basic':
            return
        # ASSERTION: will do full drilldown from here
        # 3. Test getting Features from Layers
        result = Result(True, 'Test Layers')
        result.start()
        layer_id = 0
        try:
            for layer_id in layer_ids:
                try:
                    features = self.perform_esrifs_get_request(
                        req_tpl['get_features'] % layer_id)
                    obj_id_field_name = features['objectIdFieldName']
                    features = features['features']
                    if len(features) == 0:
                        continue
                    # At least one Feature: use first and try to get by id
                    object_id = features[0]['attributes'][obj_id_field_name]
                    feature = self.perform_get_request(
                        req_tpl['get_feature_by_id'] % (
                            layer_id, obj_id_field_name,
                            str(object_id))).json()
                    feature = feature['features']
                    if len(feature) == 0:
                        msg = 'layer: %d: missing Feature - id: %s' \
                              % (layer_id, str(object_id))
                        result = push_result(
                            self, result, False, msg,
                            'Test Layer: %d' % layer_id)
                except Exception as e:
                    msg = 'GetLayer: id=%d: err=%s ' \
                          % (layer_id, str(e))
                    result = push_result(
                        self, result, False, msg, 'Test Get Features:')
                    continue
        except Exception as err:
            result.set(False, 'Layer: id=%d : err=%s'
                       % (layer_id, str(err)))
        result.stop()
        # Add to overall Probe result
        self.result.add_result(result)