Dive into secure and efficient coding practices with our curated list of the top 10 examples showcasing 'geoip2' in functional components in Python. Our advanced machine learning engine meticulously scans each line of code, cross-referencing millions of open source libraries to ensure your implementation is not just functional, but also robust and secure. Elevate your React applications to new heights by mastering the art of handling side effects, API calls, and asynchronous operations with confidence and precision.
def reader_manager(self, action=None):
if action == 'open':
try:
self.reader = Reader(self.dbfile)
except FileNotFoundError:
self.logger.error("Could not find GeoLite2 DB! Downloading!")
result_status = self.download()
if result_status:
self.logger.error("Could not download GeoLite2 DB!!!, You may need to manually install it.")
exit(1)
else:
self.reader = Reader(self.dbfile)
else:
self.reader.close()
exit(-1)
import geoip2.database
import matplotlib as mpl
mpl.use("TkAgg")
import matplotlib.pyplot as plt
# Used to sort by country
cidr_dict = {}
# Locations for graphing
latlong = {"longitude": [], "latitude": []}
try:
asn_reader = geoip2.database.Reader("./data/GeoLite2-ASN.mmdb")
city_reader = geoip2.database.Reader("./data/GeoLite2-City.mmdb")
except:
# geoip files do not exist. Tell the user.
print(
"ERROR: You must download the geoip files GeoLite2-ASN.mmdb and GeoLite2-City.mmdb"
)
print(
"from https://dev.maxmind.com/geoip/geoip2/geolite2/ and put them in ./data/"
)
print("\nSteps:")
print("mkdir -p data; cd data")
print("\n# Get city data")
print(
"curl http://geolite.maxmind.com/download/geoip/database/GeoLite2-City.tar.gz --output GeoLite2-City.tar.gz"
)
print("tar -zxvf GeoLite2-City.tar.gz")
#!/usr/bin/env python
#coding: utf-8
import geoip2.database
import sys
from collections import defaultdict
reader = geoip2.database.Reader('./GeoLite2-Country.mmdb')
try:
with sys.stdin as file:
for rec in file:
try:
parts = rec.strip().split('|')
ip = parts[0]
from_country = None
try:
from_country = reader.country(ip).country.name
except geoip2.errors.AddressNotFoundError:
from_country = 'Unknown'
print '{}|{}'.format(from_country,'|'.join(parts))
# print '{} | {} {} {} | {} | {} | {}'.format(from_country, date[0], date[1], date[2][:4], ip, parts[1][1:13], parts[1][parts[1].find(':')+1:-1])
except:
def __init__(self, column, operator):
import lore # This is crazy, why is this statement necessary?
require(lore.dependencies.GEOIP)
import geoip2.database
if GeoIP.reader is None:
import lore.io
import glob
file = lore.io.download(
'http://geolite.maxmind.com/download/geoip/database/GeoLite2-City.tar.gz',
cache=True,
extract=True
)
path = [file for file in glob.glob(file.split('.')[0] + '*') if os.path.isdir(file)][0]
GeoIP.reader = geoip2.database.Reader(os.path.join(path, 'GeoLite2-City.mmdb'))
super(GeoIP, self).__init__(column)
self.operator = operator
self.name += '_' + self.operator
def check_geoip_files(storagedir):
try:
gi1 = geoip2.database.Reader(os.path.join(storagedir, 'GeoLite2-City.mmdb'))
gi2 = geoip2.database.Reader(os.path.join(storagedir, 'GeoLite2-ASN.mmdb'))
except IOError:
sys.stderr.write('ERROR: Required GeoIP files not present. Use "-d" flag to put them into %s.\n' % (storagedir))
sys.exit(2)
return (gi1, gi2)
def get_geo_country(self, ip):
country = None
err = None
if ip is not None and self._geoip:
for candidate in str(ip).split(","):
candidate = candidate.strip()
if candidate == "":
continue
try:
country = self._geoip.country(candidate).country.iso_code
except AddressNotFoundError, e:
pass
except Exception, e:
err = e
# Return the first known country.
if country is not None:
return country
if err is not None:
raise err
return country
import geoip2.database
import sys
from collections import defaultdict
reader = geoip2.database.Reader('./GeoLite2-Country.mmdb')
try:
with sys.stdin as file:
for rec in file:
try:
parts = rec.strip().split('|')
ip = parts[0]
from_country = None
try:
from_country = reader.country(ip).country.name
except geoip2.errors.AddressNotFoundError:
from_country = 'Unknown'
print '{}|{}'.format(from_country,'|'.join(parts))
# print '{} | {} {} {} | {} | {} | {}'.format(from_country, date[0], date[1], date[2][:4], ip, parts[1][1:13], parts[1][parts[1].find(':')+1:-1])
except:
pass # ignore all errors for one string
except KeyboardInterrupt:
exit(0)
except:
raise
lines = f.readlines()
for lineWithEnding in lines:
line = lineWithEnding.rstrip('\n').rstrip('\r')
parts = re.split(r'\t', line)
ip = parts[ipColumnFromZero]
try:
if len(ip) == 0:
print(line + "\t")
continue
response = reader.city(ip)
city = response.city.name
if city != None:
print(line + "\t" + city)
else:
print(line + "\t")
except geoip2.errors.AddressNotFoundError:
print(line + "\t")
country_db = path / (country or GEOIP_SETTINGS['GEOIP_COUNTRY'])
if country_db.is_file():
self._country = geoip2.database.Reader(str(country_db), mode=cache)
self._country_file = country_db
city_db = path / (city or GEOIP_SETTINGS['GEOIP_CITY'])
if city_db.is_file():
self._city = geoip2.database.Reader(str(city_db), mode=cache)
self._city_file = city_db
if not self._reader:
raise GeoIP2Exception('Could not load a database from %s.' % path)
elif path.is_file():
# Otherwise, some detective work will be needed to figure out
# whether the given database path is for the GeoIP country or city
# databases.
reader = geoip2.database.Reader(str(path), mode=cache)
db_type = reader.metadata().database_type
if db_type.endswith('City'):
# GeoLite City database detected.
self._city = reader
self._city_file = path
elif db_type.endswith('Country'):
# GeoIP Country database detected.
self._country = reader
self._country_file = path
else:
raise GeoIP2Exception('Unable to recognize database edition: %s' % db_type)
else:
raise GeoIP2Exception('GeoIP path must be a valid file or directory.')
from zentral.conf import settings
from zentral.core.probes.conf import all_probes
from zentral.core.incidents.events import build_incident_events
from zentral.core.incidents.utils import update_or_create_open_incident, update_or_create_open_machine_incident
logger = logging.getLogger('zentral.core.events.pipeline')
city_db_reader = None
try:
city_db_path = settings["events"]["geoip2_city_db"]
except KeyError:
pass
else:
try:
city_db_reader = geoip2.database.Reader(city_db_path)
except Exception:
logger.info("Could not open Geolite2 city database")
def get_city(ip):
try:
return city_db_reader.city(ip)
except Exception:
pass
def enrich_event(event):
if isinstance(event, dict):
event = event_from_event_d(event)
if event.metadata.request and event.metadata.request.ip and not event.metadata.request.geo and city_db_reader:
city = get_city(event.metadata.request.ip)