Source code for merit.org

import os
import logging
import datetime

import requests

from .merit import Merit
from . import exceptions


# Set the log line format on the root logger and create this module's logger.
# NOTE(review): logging.basicConfig() runs as an import-time side effect and
# only has an effect when the root logger has no handlers configured yet --
# confirm that configuring logging from a library module is intended.
logging.basicConfig(format='[Merit %(asctime)s %(levelname)s]: %(message)s')
logger = logging.getLogger(__name__)


class Org(Merit):
    """Merit Organizations.

    To properly create an instance, begin with the
    :func:`merit.Merit.link_with_merit` flow, retrieve an org_access_token,
    and exchange it for an org_id.

    :param org_id: ID for the Merit Organization; when omitted, the
        ``MERIT_ORG_ID`` environment variable is read at construction time
    :param app_id: ID for the app, given during creation
    :param app_secret: secret for the app, given during creation
    :param production: specify the environment for your API calls
    :return: `merit.Org` object
    """

    def __init__(self,
                 org_id: str = None,
                 app_id: str = None,
                 app_secret: str = None,
                 production: bool = True
                 ):
        if app_id and app_secret:
            super(Org, self).__init__(app_id, app_secret, production)
        else:
            # NOTE(review): `production` is not forwarded on this path, so the
            # parent's own default applies -- confirm this is intended.
            super(Org, self).__init__()

        # Resolve the environment fallback here rather than in the signature:
        # a default of os.getenv(...) would be evaluated only once, when the
        # module is imported.
        if org_id is None:
            org_id = os.getenv("MERIT_ORG_ID")

        self.org_id = org_id
        self.auth_timeout = 3600  # seconds
        self.authenticated_at = None
        # Requests a token from the API right away (see get_org_access_token).
        self.org_access_token = self.get_org_access_token()
[docs] def get_org_access_token(self) -> str: """Call Merit API for Org Access Token.""" access_url = f"{self.domain}/orgs/{self.org_id}/access" logger.info(f"Calling {access_url}") response = requests.post(access_url, auth=(self.app_id, self.app_secret)) if response.status_code == 200: self.authenticated_at = datetime.datetime.now() self.org_access_token = response.json().get("orgAccessToken") return self.org_access_token else: logger.error(f"{response.text}") return None
[docs] def authenticate(self): """Get or refresh org_access_token.""" if not self.org_access_token: self.get_org_access_token() if (self.authenticated_at < (datetime.datetime.now() - datetime.timedelta(seconds=self.auth_timeout))): self.get_org_access_token()
[docs] def get_api(self, path: str, params: dict = None) -> requests.Response: """Unified endpoint for all GET calls out to API. :param path: the relative path, appended to `domain`, be sure to include a leading slash :param params: a `dict` of query params to include :return: `requests.response` object """ # authenticate self.authenticate() # call api url = f"{self.domain}{path}" headers = {"Authorization": f"Bearer {self.org_access_token}"} logger.info(f"Calling {url}") response = requests.get(url, headers=headers, params=params) if response.status_code == 200: return response else: logger.error(f"({response.status_code}) {response.text}") return response
[docs] def post_api(self, path: str, data: dict = None) -> requests.Response: """Unified endpoint for all POST calls out to API. :param path: the relative path, appended to `domain`, be sure to include a leading slash :param data: a `dict` of json data to send along with the POST :return: a `requests.response` object """ # authenticate self.authenticate() # call api url = f"{self.domain}{path}" headers = {"Authorization": f"Bearer {self.org_access_token}"} logger.info(f"Calling {url}") response = requests.post(url, json=data, headers=headers) if response.status_code == 200: return response else: logger.error(f"({response.status_code}) {response.text}") return response
[docs] def login_with_merit(self, success_url: str, failure_url: str, permissions: list = ["CanViewPublicProfile"], org_ids: list = None) -> str: """Initiate process to Login with Merit for a member. :param success_url: relative path where Merit will redirect the user after successful authentication :param failure_url: relative path where Merit will redirect the user after unsuccessful authentication :param permissions: a list of permissions you wish to request from the member :param org_ids: a list of org_ids for which you wish to request permission from the member. Must be included if `CanViewAllStandardMeritsFromOrg` is included in `permissions`. :return: URL to redirect user to to begin link_with_merit flow (https://app.merits.com/link-app/?token=5aa5a3992bfa4e0006c47cdf) """ path = f"/orgs/{self.org_id}/request_loginwithmerit_url" # validate params valid_permissions = ["CanViewPublicProfile", "CanViewAllStandardMeritsFromOrg", "CanViewAllStandardMerits"] requested_permissions = [] if type(permissions) != list: raise TypeError("Permission variable must be a list of strings.") for permission in permissions: if type(permission) != str: raise TypeError("Permission variable must be a list of strings.") if permission not in valid_permissions: raise exceptions.RequestedPermissionException(f"Requested Permissions ({permissions}) is not valid. 
Valid Permissions are: ({valid_permissions})") # add to list requested_permissions.append({"permissionType": permission}) # org_ids do not get validated if "CanViewAllStandardMeritsFromOrg" in permissions: if not org_ids: raise exceptions.RequestedPermissionException("Permission CanViewAllStandardMeritsFromOrg requested without specifiying org_ids.") if type(org_ids) != list: raise TypeError("org_ids variable must be a list of strings.") for org_id in org_ids: if type(org_id) != str: raise TypeError("org_ids variable must be a list of strings.") # add to list requested_permissions.append({"permissionType": "CanViewAllStandardMeritsFromOrg", "orgId": org_id }) data = { "requestedPermissions": requested_permissions, "successUrl": success_url, "failureUrl": failure_url, "state": f"<3-from-merit-{datetime.datetime.now():%d-%m-%Y-%H-%M-%S}", } response = self.post_api(path, data=data) # { # "request_loginwithmerit_url": "https://sig.ma/login-with-merit/?token=5aa5a3992bfa4e0006c47cdf" # "expiration": "2019-01-31T18:48:51.000Z", # "state": "initiated-from-merit-registration-%d-%m-%Y-%H-%M-%S" # } if response.status_code == 200: url = response.json().get("request_loginwithmerit_url") if url: return url logger.error(response.text) return None
[docs] def get_member_id_from_token(self, member_id_token: str) -> str: """Exchange member_id_token from login_with_merit for permanent member_id. :param member_id_token: the token to exchange :return: member_id """ # validate member_id_token if type(member_id_token) != str: raise TypeError(f"member_id_token {member_id_token} is not a string.") response = self.get_api("/member_id", {"member_id_token": member_id_token}) if response.status_code == 200: data = response.json() if "memberId" in data: return data.get("memberId") logger.error(response.text) return None
[docs] def get_member_info(self, member_id: str) -> dict: """Get Merit information about Member. :param member_id: the ID of the Member you wish to retreive :return: {"id": "573564e698ae3b96668fd517","name": {"firstName": "Omer","lastName":"Zach"}} """ response = self.get_api(f"/members/{member_id}") if response.status_code == 200: data = response.json() if "id" in data: return data logger.error(response.text) return None
[docs] def get_member_access_merit(self, member_id: str) -> dict: """Get Member's Access merit for this app, which returns more details than `get_member_info` :param member_id: the ID of the Member you wish to retreive :return: a `Merit` dict """ response = self.get_api(f"/members/{member_id}/access_merit") if response.status_code == 200: data = response.json() if "id" in data: return data logger.error(response.text) return None
[docs] def get_member_merits(self, member_id: str, template_id: str = None, limit: int = 100) -> list: """Get all Member merits by specifications. :param member_id: the ID of the Member you wish to retreive :type member_id: str :param template_id: the ID of the MeritTemplate you wish to retreive :type template_id: str, optional :param limit: the number of results you wish to retreive :type limit: int, optional :return: a list of merits issued owned by the Member, filtered as specified :rtype: list """ # validate params params = {} if type(member_id) != str: raise TypeError(f"member_id ({member_id}) must be a string.") if type(limit) != int: raise TypeError(f"limit ({limit}) must be an integer.") params["limit"] = limit if template_id: if type(template_id) != str: raise TypeError(f"template_id ({template_id}) must be a string.") params["merittemplate_id"] = template_id # init returned list merit_list = [] next_page = True while next_page: # call api, parse response response = self.get_api(f"/members/{member_id}/merits", params) if response.status_code == 200: data = response.json() else: logger.error(response.text) return merit_list # add merits into list merit_list.extend(data.get("merits", [])) # stop at user provided limit if len(merit_list) >= limit: print(f"stopping at limit {len(merit_list)}") return merit_list # loop again if more pages next_page = data.get("paging", {}).get("pageInfo", {}).get("hasNextPage", False) params["starting_after"] = data.get("paging", {}).get("cursors", {}).get("after", "") return merit_list
[docs] def member_has_active_merit(self, member_id: str, template_id: str) -> bool: """Check whether Member has an active merit from the Template specified. :param member_id: the ID of the Member you wish to check :param template_id: the ID of the MeritTemplate you wish to check :return: a boolean whether the Member passes the test :rtype: bool """ # look for first active merit for merit in self.get_member_merits(member_id, template_id): if merit.get("active") is True: return True return False
[docs] def get_org_info(self) -> dict: """Get Merit information about Organization. :return: {"id": "5b442b02b85f223fffe9e851","title": "Millbrae CERT","description": "This is an example Org","website": "http://www.example.com","address": "1001 Broadway, Millbrae, CA, USA","phone": "+1 650-296-9525","email": "admin@example.com","logoUrl": "https://images.sig.ma/5c4f598f774d570006465f9e?rect=0,0,150,150"} """ response = self.get_api(f"/orgs/{self.org_id}") if response.status_code == 200: data = response.json() if "id" in data: return data logger.error(response.text) return None
[docs] def search_orgs(self, query: str) -> list: """Search for Organization by name based on provided query. :param query: the name you wish to search for :type query: str :return: a `list` of `dicts` of all Organizations with a matching name. :rtype: list """ if len(query) < 3: raise exceptions.SearchQueryException("Search query must be longer than 3 characters.") response = self.get_api("/orgs/search", {"limit": 10, "search_string": query}) if response.status_code == 200: data = response.json() if "results" in data: return data.get("results") logger.error(response.text) return []
[docs] def get_field(self, field_id: str) -> dict: """Return details of specified Field. :param field_id: the ID of the field you wish to retreive :return: a `dict` of the Field's info """ response = self.get_api(f"/fields/{field_id}") if response.status_code == 200: return response.json() else: logger.error(response.text) return None
[docs] def get_all_org_merit_templates(self, limit: int = 100, org_id: str = None) -> list: """Return a list of all MeritTemplates owned by the Org.""" if not org_id: org_id = self.org_id response = self.get_api(f"/orgs/{org_id}/merittemplates?limit={limit}") if response.status_code == 200: return response.json().get("merittemplates") else: logger.error(response.text) return []
[docs] def get_org_merit_template_choices(self, include_none: bool = True) -> list: """Return a formatted tuple of form choices of available MeritTemplates.""" choices = [] for template in self.get_all_org_merit_templates(): choices.append((template.get("id"), template.get("title"))) choices = sorted(choices, key = lambda x: x[1]) if include_none: choices.insert(0, (None, "-----")) return choices
[docs] def get_merit_template(self, template_id: str) -> dict: """Return details of specified MeritTemplate. :param template_id: the ID of the MeritTemplate you wish to retreive :return: all details about that MeritTemplate """ response = self.get_api(f"/merittemplates/{template_id}") if response.status_code == 200: return response.json() else: logger.error(response.text) return None
[docs] def get_template_field_choices(self, template_id: str) -> list: """Return list of fields used in specified Template. :param template_id: the ID of the MeritTemplate you wish to retreive :return: a list of Field dicts used in that MeritTemplate """ return [self.get_field(field.get("fieldId")) for field in self.get_merit_template(template_id).get("enabledFieldSettings")]
[docs] def get_merit(self, merit_id: str) -> dict: """Return details of specified Merit. :param merit_id: the ID of the Merit you wish to retreive :return: all details about that Merit """ response = self.get_api(f"/orgs/{self.org_id}/merits/{merit_id}") if response.status_code == 200: return response.json() logger.error(response.text) return None
[docs] def get_all_merits(self, template_id: str = None, merit_status: str = None, email: str = None, limit: int = 100) -> list: """Get all Org merits by specifications. :param template_id: the ID of the MeritTemplate you wish to retreive :type template_id: str, optional :param merit_status: the status you wish to filter by :type merit_status: str, optional :param email: a member's email you wish to filter by :type email: str, optional :param limit: the number of results you wish to retreive :type limit: int, optional :return: a list of merits issued by the Org, filtered as specified :rtype: list """ # valid merit_status values are: valid_statuses = ["Accepted", "Forfeited", "Pending", "Rejected", "Reported", "Revoked", "Transferred", "TransferredUnverified", "Unapproved", "UnapprovedUnverified", "Unverified"] # TODO: allow user to provide a list of statuses if merit_status and merit_status not in valid_statuses: raise exceptions.MeritStatusException(f"Merit Status ({merit_status}) is not valid. Valid statuses are: ({valid_statuses})") params = {"limit": limit,} if merit_status: params["merit_status"] = merit_status # TODO: allow user to provide a list of template_ids if template_id: params["merittemplate_id"] = template_id # TODO: allow user to provide a list of template_ids if email: params["recipient_email"] = email # init returned list merit_list = [] next_page = True while next_page: # call api, parse response response = self.get_api(f"/orgs/{self.org_id}/merits", params) if response.status_code == 200: data = response.json() else: logger.error(response.text) return merit_list # add merits into list merit_list.extend(data.get("merits", [])) # stop at user provided limit if len(merit_list) >= limit: return merit_list # loop again if more pages next_page = data.get("paging", {}).get("pageInfo", {}).get("hasNextPage", False) params["starting_after"] = data.get("paging", {}).get("cursors", {}).get("after", "") return merit_list
[docs] def get_template_pending_merits(self, template_id: str) -> list: """Return all proposed merits from site MeritTemplate. :param template_id: the ID of the MeritTemplate you wish to retreive :return: a `list` of all merits matching criteria """ return self.get_all_org_merits(template_id, "Unapproved")
[docs] def propose_merit(self, data: dict) -> str: """Propose a merit as specified. :param data: a full `dict` ready to be proposed as a merit :return: the ID of the proposed merit """ response = self.post_api("/merits/propose", data) if response.status_code == 200: id = response.json().get("id") if id: return id else: logger.error(response.text) return None
[docs] def send_merit(self, data: dict) -> str: """Send merit as specified. :param data: a full `dict` ready to be sent as a merit :return: the ID of the sent merit """ response = self.post_api("/merits/send", data) if response.status_code == 200: id = response.json().get("id") if id: return id else: logger.error(response.text) return None
[docs] def edit_merit(self, merit_id: str, data: dict) -> bool: """Edit specified merit. :param merit_id: the ID of the merit to edit :param data: a `dict` with the edits you wish to make :return: a `bool` indicating the status of the edit """ response = self.post_api(f"/merits/{merit_id}", data) if response.status_code == 200: return True else: logger.error(response.text) return False
[docs] def revoke_merit(self, merit_id: str, reason: str) -> bool: """Revoke specified merit. :param merit_id: the ID of the merit to edit :param reason: the reason for revoking the merit :return: a `bool` indicating the status of the revocation """ response = self.post_api(f"/merits/{merit_id}/revoke", {"revocationReason": reason}) if response.status_code == 200: return True else: logger.error(response.text) return False
[docs] def uuid_translation(self, merit_id: str, email: str) -> str: """Translated a given Member's email into a static QR URL :param merit_id: the ID of the merit to translate :param email: the email of the Member to translate :return: a `URL` of the static lookup link """ response = self.post_api(f"/uuidTranslation/merit/{merit_id}/email/{email}") if response.status_code == 200: return response.json().get("translationUrl") else: logger.error(response.text) return None
[docs] def update_email(self, merit_id: str, email: str) -> str: """Transfer merit to new email address. :param merit_id: the ID of the merit to transfer :param email: the new email for the Member :return: the ID of the new merit """ response = self.post_api(f"/merits/{merit_id}/transfer", {"newRecipientEmail": email}) if response.status_code == 200: new_merit = response.json().get("newMerit") if new_merit: return new_merit.get("id") return None else: logger.error(response.text) return None