Source code for GeoHealthCheck.plugins.probe.wfs3
from owslib.ogcapi.features import Features
from openapi_spec_validator import openapi_v3_spec_validator
from GeoHealthCheck.probe import Probe
from GeoHealthCheck.result import Result, push_result
[docs]class WFS3Caps(Probe):
"""Probe for OGC WFS3 API (OAFeat) main endpoint url"""
NAME = 'OGC API Features (OAFeat) Capabilities'
DESCRIPTION = 'Validate OGC API Features (OAFeat) ' \
'endpoint landing page'
RESOURCE_TYPE = 'OGC:WFS3'
REQUEST_METHOD = 'GET'
REQUEST_HEADERS = {'Accept': 'application/json'}
# e.g. https://demo.pygeoapi.io/master
REQUEST_TEMPLATE = ''
def __init__(self):
Probe.__init__(self)
CHECKS_AVAIL = {
'GeoHealthCheck.plugins.check.checks.HttpStatusNoError': {
'default': True
},
'GeoHealthCheck.plugins.check.checks.JsonParse': {
'default': True
},
'GeoHealthCheck.plugins.check.checks.ContainsStrings': {
'set_params': {
'strings': {
'name': 'Contains required strings',
'value': ['/conformance', '/collections',
'service', 'links']
}
},
'default': True
},
}
"""Validate OGC API Features (OAFeat) endpoint landing page"""
def type_for_link(links, rel):
content_type = 'application/json'
for link in links:
if link['rel'] == rel:
content_type = link.get('type', content_type)
# We only want JSON content types (e.g. items)
# for OWSLib for now.
if 'json' in content_type:
break
return content_type
def set_accept_header(oa_feat, content_type):
oa_feat.headers['Accept'] = content_type
[docs]class WFS3Drilldown(Probe):
"""
Probe for OGC API Features (OAFeat) endpoint "drilldown" or
"crawl": starting with top endpoint: get Collections and fetch
Features on them etc. Uses the OWSLib owslib.ogcapi package.
TODO: class needs renaming: WFS3 is now OAFeat.
"""
NAME = 'OGC API Features (OAFeat) Drilldown'
DESCRIPTION = 'Traverses an OGC API Features (OAFeat) API ' \
'endpoint by drilling down'
RESOURCE_TYPE = 'OGC:WFS3'
REQUEST_METHOD = 'GET'
REQUEST_HEADERS = {'Accept': 'application/json'}
PARAM_DEFS = {
'drilldown_level': {
'type': 'string',
'description': 'How thorough the drilldown should be.\
basic: test presence endpoints, \
full: go through collections, fetch Features',
'default': 'basic',
'required': True,
'range': ['basic', 'full']
}
}
"""Param defs"""
def __init__(self):
Probe.__init__(self)
[docs] def perform_request(self):
"""
Perform the drilldown.
See https://github.com/geopython/OWSLib/blob/
master/tests/doctests/wfs3_GeoServerCapabilities.txt
"""
oa_feat = None
collections = None
# 1.1 Test Landing Page
result = Result(True, 'Test Landing Page')
result.start()
try:
oa_feat = Features(self._resource.url,
headers=self.get_request_headers())
except Exception as err:
result.set(False, '%s:%s' % (result.message, str(err)))
result.stop()
self.result.add_result(result)
# 1.2 Test top endpoints existence: /conformance
result = Result(True, 'conformance endpoint exists')
result.start()
try:
set_accept_header(oa_feat, type_for_link(
oa_feat.links, 'conformance'))
oa_feat.conformance()
except Exception as err:
result.set(False, str(err))
result.stop()
self.result.add_result(result)
# 1.3 Test top endpoints existence: /collections
result = Result(True, 'Get collections')
result.start()
try:
set_accept_header(oa_feat, type_for_link(
oa_feat.links, 'data'))
collections = oa_feat.collections()['collections']
except Exception as err:
result.set(False, '%s:%s' % (result.message, str(err)))
result.stop()
self.result.add_result(result)
# 1.4 Test top endpoints existence: OpenAPI doc
result = Result(True, 'Test OpenAPI Doc')
result.start()
try:
# OWSLib 0.20.0+ has call to '/api now.
set_accept_header(oa_feat, type_for_link(
oa_feat.links, 'service-desc'))
api_doc = oa_feat.api()
for attr in ['components', 'paths', 'openapi']:
val = api_doc.get(attr, None)
if val is None:
msg = 'missing attr: %s' % attr
result = push_result(
self, result, False, msg, 'Test OpenAPI doc')
continue
except Exception as err:
result.set(False, '%s:%s' % (result.message, str(err)))
result.stop()
self.result.add_result(result)
if self._parameters['drilldown_level'] == 'basic':
return
# ASSERTION: will do full drilldown, level 2, from here
# 2. Test layers
# TODO: use parameters to work on less/more drilling
# "full" could be all layers.
result = Result(True, 'Test Collections')
result.start()
coll_id = ''
try:
for collection in collections:
coll_id = collection['id']
coll_id = coll_id
try:
set_accept_header(oa_feat, type_for_link(
collection['links'], 'self'))
coll = oa_feat.collection(coll_id)
# TODO: Maybe also add crs
for attr in ['id', 'links']:
val = coll.get(attr, None)
if val is None:
msg = '%s: missing attr: %s' \
% (coll_id, attr)
result = push_result(
self, result, False, msg, 'Test Collection')
continue
except Exception as e:
msg = 'GetCollection %s: OWSLib err: %s ' \
% (str(e), coll_id)
result = push_result(
self, result, False, msg, 'Test GetCollection')
continue
try:
set_accept_header(oa_feat, 'application/geo+json')
items = oa_feat.collection_items(coll_id, limit=1)
except Exception as e:
msg = 'GetItems %s: OWSLib err: %s ' % (str(e), coll_id)
result = push_result(
self, result, False, msg, 'Test GetItems')
continue
features = items.get('features', None)
if features is None:
msg = 'GetItems %s: No features attr' % coll_id
result = push_result(
self, result, False, msg, 'Test GetItems')
continue
type = items.get('type', '')
if type != 'FeatureCollection':
msg = '%s:%s type not FeatureCollection: %s' \
% (coll_id, type, val)
result = push_result(
self, result, False, msg, 'Test GetItems')
continue
if len(items['features']) > 0:
fid = items['features'][0]['id']
try:
item = oa_feat.collection_item(coll_id, fid)
except Exception as e:
msg = 'GetItem %s: OWSLib err: %s' \
% (str(e), coll_id)
result = push_result(
self, result, False, msg, 'Test GetItem')
continue
for attr in \
['id', 'links', 'properties', 'geometry', 'type']:
val = item.get(attr, None)
if val is None:
msg = '%s:%s missing attr: %s' \
% (coll_id, str(fid), attr)
result = push_result(
self, result, False, msg, 'Test GetItem')
continue
if attr == 'type' and val != 'Feature':
msg = '%s:%s type not Feature: %s' \
% (coll_id, str(fid), val)
result = push_result(
self, result, False, msg, 'Test GetItem')
continue
except Exception as err:
result.set(False, 'Collection err: %s : e=%s'
% (coll_id, str(err)))
result.stop()
# Add to overall Probe result
self.result.add_result(result)
[docs]class WFS3OpenAPIValidator(Probe):
"""
Probe for OGC API Features (OAFeat) OpenAPI Document Validation.
Uses https://pypi.org/project/openapi-spec-validator/.
"""
NAME = 'OGC API Features (OAFeat) OpenAPI Validator'
DESCRIPTION = 'Validates OGC API Features (OAFeat) api endpoint for ' \
'OpenAPI compliance'
RESOURCE_TYPE = 'OGC:WFS3'
REQUEST_HEADERS = {'Accept': 'application/json'}
REQUEST_METHOD = 'GET'
"""Param defs"""
def __init__(self):
Probe.__init__(self)
[docs] def perform_request(self):
"""
Perform the validation.
Uses https://github.com/p1c2u/openapi-spec-validator on
the specfile (dict) returned from the OpenAPI endpoint.
"""
# Step 1 basic sanity check
result = Result(True, 'OpenAPI Sanity Check')
result.start()
api_doc = None
try:
oa_feat = Features(self._resource.url,
headers=self.get_request_headers())
set_accept_header(oa_feat, type_for_link(
oa_feat.links, 'service-desc'))
api_doc = oa_feat.api()
# Basic sanity check
for attr in ['components', 'paths', 'openapi']:
val = api_doc.get(attr, None)
if val is None:
msg = 'OpenAPI doc: missing attr: %s' % attr
result.set(False, msg)
break
except Exception as err:
result.set(False, '%s:%s' % (result.message, str(err)))
result.stop()
self.result.add_result(result)
# No use to proceed if OpenAPI basics not complied
if api_doc is None or result.success is False:
return
# ASSERTION: OpenAPI doc exists, next OpenAPI Validation
# Step 2 detailed OpenAPI Compliance test
result = Result(True, 'Validate OpenAPI Compliance')
result.start()
try:
# Call the openapi-spec-validator and iterate through errors
errors_iterator = openapi_v3_spec_validator.iter_errors(api_doc)
for error in errors_iterator:
# Add each validation error as separate Result object
result = push_result(
self, result, False,
str(error), 'OpenAPI Compliance Result')
except Exception as err:
result.set(False, '%s:%s' % (result.message, str(err)))
result.stop()
# Add to overall Probe result
self.result.add_result(result)
# class OGCAPIFeaturesCollection(Probe):
# """Probe for OGC API Features (OAFeat) - Features Collection endpoint"""
#
# NAME = 'OGC API Features Collection'
# DESCRIPTION = 'Validate an OGC API Features Collection'
# RESOURCE_TYPE = 'OGC:WFS3'
#
# REQUEST_METHOD = 'GET'
#
# def __init__(self):
# Probe.__init__(self)
#
# CHECKS_AVAIL = {
# 'GeoHealthCheck.plugins.check.checks.HttpStatusNoError': {
# 'default': True
# },
# 'GeoHealthCheck.plugins.check.checks.JsonParse': {
# 'default': True
# },
# 'GeoHealthCheck.plugins.check.checks.ContainsStrings': {
# 'default': True,
# 'set_params': {
# 'strings': {
# 'name': 'Must contain links to at least Feature'
# 'Collections, Conformance and OpenAPI
# endpoint',
# 'value': ['links', 'href', '/collections',
# '/conformance', '/api']
# }
# }
# },
# }
# """
# Checks avail for all specific Caps checks.
# Optionally override Check.PARAM_DEFS using set_params
# e.g. with specific `value` or even `name`.
# """
#
# # Overridden: expand param-ranges from WMS metadata
# def expand_params(self, resource):
#
# # Use WMS Capabilities doc to get metadata for
# # PARAM_DEFS ranges/defaults
# try:
# wms = self.get_metadata_cached(resource, version='1.1.1')
# layers = wms.contents
# self.layer_count = len(layers)
#
# # Layers to select
# self.PARAM_DEFS['layers']['range'] = list(layers.keys())
#
# # Image Format
# for oper in wms.operations:
# if oper.name == 'GetMap':
# self.PARAM_DEFS['format']['range'] = \
# oper.formatOptions
# break
#
# # Take random layer to determine generic attrs
# for layer_name in layers:
# layer_entry = layers[layer_name]
# break
#
# # SRS
# srs_range = layer_entry.crsOptions
# self.PARAM_DEFS['srs']['range'] = srs_range
#
# # bbox list: 0-3 is bbox, 4 is SRS
# bbox = layer_entry.boundingBox
# bbox_srs = bbox[4]
# self.PARAM_DEFS['srs']['default'] = bbox_srs
# # if it is not EPSG:4326 we need to transform bbox
# # if bbox_srs != 'EPSG:4326':
# # bbox = transform_bbox('EPSG:4326', bbox_srs, bbox[:-1])
#
# self.PARAM_DEFS['bbox']['default'] = \
# [str(x) for x in bbox[:-1]]
#
# self.PARAM_DEFS['exceptions']['range'] = wms.exceptions
# except Exception as err:
# raise err