diff options
author | shmel1k <shmel1k@ydb.tech> | 2023-11-26 18:16:14 +0300 |
---|---|---|
committer | shmel1k <shmel1k@ydb.tech> | 2023-11-26 18:43:30 +0300 |
commit | b8cf9e88f4c5c64d9406af533d8948deb050d695 (patch) | |
tree | 218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py2/twisted/news | |
parent | 523f645a83a0ec97a0332dbc3863bb354c92a328 (diff) | |
download | ydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz |
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py2/twisted/news')
-rw-r--r-- | contrib/python/Twisted/py2/twisted/news/__init__.py | 6 | ||||
-rw-r--r-- | contrib/python/Twisted/py2/twisted/news/database.py | 1046 | ||||
-rw-r--r-- | contrib/python/Twisted/py2/twisted/news/news.py | 92 | ||||
-rw-r--r-- | contrib/python/Twisted/py2/twisted/news/nntp.py | 1050 | ||||
-rw-r--r-- | contrib/python/Twisted/py2/twisted/news/tap.py | 143 |
5 files changed, 2337 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py2/twisted/news/__init__.py b/contrib/python/Twisted/py2/twisted/news/__init__.py new file mode 100644 index 0000000000..b31924820a --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/news/__init__.py @@ -0,0 +1,6 @@ +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Twisted News: A NNTP-based news service. +""" diff --git a/contrib/python/Twisted/py2/twisted/news/database.py b/contrib/python/Twisted/py2/twisted/news/database.py new file mode 100644 index 0000000000..5a25dd37b8 --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/news/database.py @@ -0,0 +1,1046 @@ +# -*- test-case-name: twisted.news.test -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +News server backend implementations. +""" + +import getpass, pickle, time, socket +import os +import StringIO +from hashlib import md5 +from email.Message import Message +from email.Generator import Generator +from zope.interface import implementer, Interface + +from twisted.news.nntp import NNTPError +from twisted.mail import smtp +from twisted.internet import defer +from twisted.enterprise import adbapi +from twisted.persisted import dirdbm + + + +ERR_NOGROUP, ERR_NOARTICLE = range(2, 4) # XXX - put NNTP values here (I guess?) + +OVERVIEW_FMT = [ + 'Subject', 'From', 'Date', 'Message-ID', 'References', + 'Bytes', 'Lines', 'Xref' +] + +def hexdigest(md5): #XXX: argh. 1.5.2 doesn't have this. + return ''.join(map(lambda x: hex(ord(x))[2:], md5.digest())) + +class Article: + def __init__(self, head, body): + self.body = body + self.headers = {} + header = None + for line in head.split('\r\n'): + if line[0] in ' \t': + i = list(self.headers[header]) + i[1] += '\r\n' + line + else: + i = line.split(': ', 1) + header = i[0].lower() + self.headers[header] = tuple(i) + + if not self.getHeader('Message-ID'): + s = str(time.time()) + self.body + id = hexdigest(md5(s)) + '@' + socket.gethostname() + self.putHeader('Message-ID', '<%s>' % id) + + if not self.getHeader('Bytes'): + self.putHeader('Bytes', str(len(self.body))) + + if not self.getHeader('Lines'): + self.putHeader('Lines', str(self.body.count('\n'))) + + if not self.getHeader('Date'): + self.putHeader('Date', time.ctime(time.time())) + + + def getHeader(self, header): + h = header.lower() + if h in self.headers: + return self.headers[h][1] + else: + return '' + + + def putHeader(self, header, value): + self.headers[header.lower()] = (header, value) + + + def textHeaders(self): + headers = [] + for i in self.headers.values(): + headers.append('%s: %s' % i) + return '\r\n'.join(headers) + '\r\n' + + def overview(self): + xover = [] + for i in OVERVIEW_FMT: + xover.append(self.getHeader(i)) + return xover + + +class NewsServerError(Exception): + pass + + +class INewsStorage(Interface): + """ + An interface for storing and requesting news articles + """ + + def listRequest(): + """ + Returns a deferred whose callback will be passed a list of 4-tuples + containing (name, max index, min index, flags) for each news group + """ + + + def subscriptionRequest(): + """ + Returns a deferred whose callback will be passed the list of + recommended subscription groups for new server users + """ + + + def postRequest(message): + """ + Returns a deferred whose callback will be invoked if 'message' + is successfully posted to one or more specified groups and + whose errback will be invoked otherwise. + """ + + + def overviewRequest(): + """ + Returns a deferred whose callback will be passed the a list of + headers describing this server's overview format. + """ + + + def xoverRequest(group, low, high): + """ + Returns a deferred whose callback will be passed a list of xover + headers for the given group over the given range. If low is None, + the range starts at the first article. If high is None, the range + ends at the last article. + """ + + + def xhdrRequest(group, low, high, header): + """ + Returns a deferred whose callback will be passed a list of XHDR data + for the given group over the given range. If low is None, + the range starts at the first article. If high is None, the range + ends at the last article. + """ + + + def listGroupRequest(group): + """ + Returns a deferred whose callback will be passed a two-tuple of + (group name, [article indices]) + """ + + + def groupRequest(group): + """ + Returns a deferred whose callback will be passed a five-tuple of + (group name, article count, highest index, lowest index, group flags) + """ + + + def articleExistsRequest(id): + """ + Returns a deferred whose callback will be passed with a true value + if a message with the specified Message-ID exists in the database + and with a false value otherwise. + """ + + + def articleRequest(group, index, id = None): + """ + Returns a deferred whose callback will be passed a file-like object + containing the full article text (headers and body) for the article + of the specified index in the specified group, and whose errback + will be invoked if the article or group does not exist. If id is + not None, index is ignored and the article with the given Message-ID + will be returned instead, along with its index in the specified + group. + """ + + + def headRequest(group, index): + """ + Returns a deferred whose callback will be passed the header for + the article of the specified index in the specified group, and + whose errback will be invoked if the article or group does not + exist. + """ + + + def bodyRequest(group, index): + """ + Returns a deferred whose callback will be passed the body for + the article of the specified index in the specified group, and + whose errback will be invoked if the article or group does not + exist. + """ + +class NewsStorage: + """ + Backwards compatibility class -- There is no reason to inherit from this, + just implement INewsStorage instead. + """ + def listRequest(self): + raise NotImplementedError() + def subscriptionRequest(self): + raise NotImplementedError() + def postRequest(self, message): + raise NotImplementedError() + def overviewRequest(self): + return defer.succeed(OVERVIEW_FMT) + def xoverRequest(self, group, low, high): + raise NotImplementedError() + def xhdrRequest(self, group, low, high, header): + raise NotImplementedError() + def listGroupRequest(self, group): + raise NotImplementedError() + def groupRequest(self, group): + raise NotImplementedError() + def articleExistsRequest(self, id): + raise NotImplementedError() + def articleRequest(self, group, index, id = None): + raise NotImplementedError() + def headRequest(self, group, index): + raise NotImplementedError() + def bodyRequest(self, group, index): + raise NotImplementedError() + + + +class _ModerationMixin: + """ + Storage implementations can inherit from this class to get the easy-to-use + C{notifyModerators} method which will take care of sending messages which + require moderation to a list of moderators. + """ + sendmail = staticmethod(smtp.sendmail) + + def notifyModerators(self, moderators, article): + """ + Send an article to a list of group moderators to be moderated. + + @param moderators: A C{list} of C{str} giving RFC 2821 addresses of + group moderators to notify. + + @param article: The article requiring moderation. + @type article: L{Article} + + @return: A L{Deferred} which fires with the result of sending the email. + """ + # Moderated postings go through as long as they have an Approved + # header, regardless of what the value is + group = article.getHeader('Newsgroups') + subject = article.getHeader('Subject') + + if self._sender is None: + # This case should really go away. This isn't a good default. + sender = 'twisted-news@' + socket.gethostname() + else: + sender = self._sender + + msg = Message() + msg['Message-ID'] = smtp.messageid() + msg['From'] = sender + msg['To'] = ', '.join(moderators) + msg['Subject'] = 'Moderate new %s message: %s' % (group, subject) + msg['Content-Type'] = 'message/rfc822' + + payload = Message() + for header, value in article.headers.values(): + payload.add_header(header, value) + payload.set_payload(article.body) + + msg.attach(payload) + + out = StringIO.StringIO() + gen = Generator(out, False) + gen.flatten(msg) + msg = out.getvalue() + + return self.sendmail(self._mailhost, sender, moderators, msg) + + + +@implementer(INewsStorage) +class PickleStorage(_ModerationMixin): + """ + A trivial NewsStorage implementation using pickles + + Contains numerous flaws and is generally unsuitable for any + real applications. Consider yourself warned! + """ + sharedDBs = {} + + def __init__(self, filename, groups=None, moderators=(), + mailhost=None, sender=None): + """ + @param mailhost: A C{str} giving the mail exchange host which will + accept moderation emails from this server. Must accept emails + destined for any address specified as a moderator. + + @param sender: A C{str} giving the address which will be used as the + sender of any moderation email generated by this server. + """ + self.datafile = filename + self.load(filename, groups, moderators) + self._mailhost = mailhost + self._sender = sender + + + def getModerators(self, groups): + # first see if any groups are moderated. if so, nothing gets posted, + # but the whole messages gets forwarded to the moderator address + moderators = [] + for group in groups: + moderators.extend(self.db['moderators'].get(group, None)) + return filter(None, moderators) + + + def listRequest(self): + "Returns a list of 4-tuples: (name, max index, min index, flags)" + l = self.db['groups'] + r = [] + for i in l: + if len(self.db[i].keys()): + low = min(self.db[i].keys()) + high = max(self.db[i].keys()) + 1 + else: + low = high = 0 + if i in self.db['moderators']: + flags = 'm' + else: + flags = 'y' + r.append((i, high, low, flags)) + return defer.succeed(r) + + def subscriptionRequest(self): + return defer.succeed(['alt.test']) + + def postRequest(self, message): + cleave = message.find('\r\n\r\n') + headers, article = message[:cleave], message[cleave + 4:] + + a = Article(headers, article) + groups = a.getHeader('Newsgroups').split() + xref = [] + + # Check moderated status + moderators = self.getModerators(groups) + if moderators and not a.getHeader('Approved'): + return self.notifyModerators(moderators, a) + + for group in groups: + if group in self.db: + if len(self.db[group].keys()): + index = max(self.db[group].keys()) + 1 + else: + index = 1 + xref.append((group, str(index))) + self.db[group][index] = a + + if len(xref) == 0: + return defer.fail(None) + + a.putHeader('Xref', '%s %s' % ( + socket.gethostname().split()[0], + ''.join(map(lambda x: ':'.join(x), xref)) + )) + + self.flush() + return defer.succeed(None) + + + def overviewRequest(self): + return defer.succeed(OVERVIEW_FMT) + + + def xoverRequest(self, group, low, high): + if group not in self.db: + return defer.succeed([]) + r = [] + for i in self.db[group].keys(): + if (low is None or i >= low) and (high is None or i <= high): + r.append([str(i)] + self.db[group][i].overview()) + return defer.succeed(r) + + + def xhdrRequest(self, group, low, high, header): + if group not in self.db: + return defer.succeed([]) + r = [] + for i in self.db[group].keys(): + if low is None or i >= low and high is None or i <= high: + r.append((i, self.db[group][i].getHeader(header))) + return defer.succeed(r) + + + def listGroupRequest(self, group): + if group in self.db: + return defer.succeed((group, self.db[group].keys())) + else: + return defer.fail(None) + + def groupRequest(self, group): + if group in self.db: + if len(self.db[group].keys()): + num = len(self.db[group].keys()) + low = min(self.db[group].keys()) + high = max(self.db[group].keys()) + else: + num = low = high = 0 + flags = 'y' + return defer.succeed((group, num, high, low, flags)) + else: + return defer.fail(ERR_NOGROUP) + + + def articleExistsRequest(self, id): + for group in self.db['groups']: + for a in self.db[group].values(): + if a.getHeader('Message-ID') == id: + return defer.succeed(1) + return defer.succeed(0) + + + def articleRequest(self, group, index, id = None): + if id is not None: + raise NotImplementedError + + if group in self.db: + if index in self.db[group]: + a = self.db[group][index] + return defer.succeed(( + index, + a.getHeader('Message-ID'), + StringIO.StringIO(a.textHeaders() + '\r\n' + a.body) + )) + else: + return defer.fail(ERR_NOARTICLE) + else: + return defer.fail(ERR_NOGROUP) + + + def headRequest(self, group, index): + if group in self.db: + if index in self.db[group]: + a = self.db[group][index] + return defer.succeed((index, a.getHeader('Message-ID'), a.textHeaders())) + else: + return defer.fail(ERR_NOARTICLE) + else: + return defer.fail(ERR_NOGROUP) + + + def bodyRequest(self, group, index): + if group in self.db: + if index in self.db[group]: + a = self.db[group][index] + return defer.succeed((index, a.getHeader('Message-ID'), StringIO.StringIO(a.body))) + else: + return defer.fail(ERR_NOARTICLE) + else: + return defer.fail(ERR_NOGROUP) + + + def flush(self): + with open(self.datafile, 'w') as f: + pickle.dump(self.db, f) + + + def load(self, filename, groups = None, moderators = ()): + if filename in PickleStorage.sharedDBs: + self.db = PickleStorage.sharedDBs[filename] + else: + try: + with open(filename) as f: + self.db = pickle.load(f) + PickleStorage.sharedDBs[filename] = self.db + except IOError: + self.db = PickleStorage.sharedDBs[filename] = {} + self.db['groups'] = groups + if groups is not None: + for i in groups: + self.db[i] = {} + self.db['moderators'] = dict(moderators) + self.flush() + + +class Group: + name = None + flags = '' + minArticle = 1 + maxArticle = 0 + articles = None + + def __init__(self, name, flags = 'y'): + self.name = name + self.flags = flags + self.articles = {} + + +@implementer(INewsStorage) +class NewsShelf(_ModerationMixin): + """ + A NewStorage implementation using Twisted's dirdbm persistence module. + """ + def __init__(self, mailhost, path, sender=None): + """ + @param mailhost: A C{str} giving the mail exchange host which will + accept moderation emails from this server. Must accept emails + destined for any address specified as a moderator. + + @param sender: A C{str} giving the address which will be used as the + sender of any moderation email generated by this server. + """ + self.path = path + self._mailhost = self.mailhost = mailhost + self._sender = sender + + if not os.path.exists(path): + os.mkdir(path) + + self.dbm = dirdbm.Shelf(os.path.join(path, "newsshelf")) + if not len(self.dbm.keys()): + self.initialize() + + + def initialize(self): + # A dictionary of group name/Group instance items + self.dbm['groups'] = dirdbm.Shelf(os.path.join(self.path, 'groups')) + + # A dictionary of group name/email address + self.dbm['moderators'] = dirdbm.Shelf(os.path.join(self.path, 'moderators')) + + # A list of group names + self.dbm['subscriptions'] = [] + + # A dictionary of MessageID strings/xref lists + self.dbm['Message-IDs'] = dirdbm.Shelf(os.path.join(self.path, 'Message-IDs')) + + + def addGroup(self, name, flags): + self.dbm['groups'][name] = Group(name, flags) + + + def addSubscription(self, name): + self.dbm['subscriptions'] = self.dbm['subscriptions'] + [name] + + + def addModerator(self, group, email): + self.dbm['moderators'][group] = email + + + def listRequest(self): + result = [] + for g in self.dbm['groups'].values(): + result.append((g.name, g.maxArticle, g.minArticle, g.flags)) + return defer.succeed(result) + + + def subscriptionRequest(self): + return defer.succeed(self.dbm['subscriptions']) + + + def getModerator(self, groups): + # first see if any groups are moderated. if so, nothing gets posted, + # but the whole messages gets forwarded to the moderator address + for group in groups: + try: + return self.dbm['moderators'][group] + except KeyError: + pass + return None + + + def notifyModerator(self, moderator, article): + """ + Notify a single moderator about an article requiring moderation. + + C{notifyModerators} should be preferred. + """ + return self.notifyModerators([moderator], article) + + + def postRequest(self, message): + cleave = message.find('\r\n\r\n') + headers, article = message[:cleave], message[cleave + 4:] + + article = Article(headers, article) + groups = article.getHeader('Newsgroups').split() + xref = [] + + # Check for moderated status + moderator = self.getModerator(groups) + if moderator and not article.getHeader('Approved'): + return self.notifyModerators([moderator], article) + + + for group in groups: + try: + g = self.dbm['groups'][group] + except KeyError: + pass + else: + index = g.maxArticle + 1 + g.maxArticle += 1 + g.articles[index] = article + xref.append((group, str(index))) + self.dbm['groups'][group] = g + + if not xref: + return defer.fail(NewsServerError("No groups carried: " + ' '.join(groups))) + + article.putHeader('Xref', '%s %s' % (socket.gethostname().split()[0], ' '.join(map(lambda x: ':'.join(x), xref)))) + self.dbm['Message-IDs'][article.getHeader('Message-ID')] = xref + return defer.succeed(None) + + + def overviewRequest(self): + return defer.succeed(OVERVIEW_FMT) + + + def xoverRequest(self, group, low, high): + if group not in self.dbm['groups']: + return defer.succeed([]) + + if low is None: + low = 0 + if high is None: + high = self.dbm['groups'][group].maxArticle + r = [] + for i in range(low, high + 1): + if i in self.dbm['groups'][group].articles: + r.append([str(i)] + self.dbm['groups'][group].articles[i].overview()) + return defer.succeed(r) + + + def xhdrRequest(self, group, low, high, header): + if group not in self.dbm['groups']: + return defer.succeed([]) + + if low is None: + low = 0 + if high is None: + high = self.dbm['groups'][group].maxArticle + r = [] + for i in range(low, high + 1): + if i in self.dbm['groups'][group].articles: + r.append((i, self.dbm['groups'][group].articles[i].getHeader(header))) + return defer.succeed(r) + + + def listGroupRequest(self, group): + if group in self.dbm['groups']: + return defer.succeed((group, self.dbm['groups'][group].articles.keys())) + return defer.fail(NewsServerError("No such group: " + group)) + + + def groupRequest(self, group): + try: + g = self.dbm['groups'][group] + except KeyError: + return defer.fail(NewsServerError("No such group: " + group)) + else: + flags = g.flags + low = g.minArticle + high = g.maxArticle + num = high - low + 1 + return defer.succeed((group, num, high, low, flags)) + + + def articleExistsRequest(self, id): + return defer.succeed(id in self.dbm['Message-IDs']) + + + def articleRequest(self, group, index, id = None): + if id is not None: + try: + xref = self.dbm['Message-IDs'][id] + except KeyError: + return defer.fail(NewsServerError("No such article: " + id)) + else: + group, index = xref[0] + index = int(index) + + try: + a = self.dbm['groups'][group].articles[index] + except KeyError: + return defer.fail(NewsServerError("No such group: " + group)) + else: + return defer.succeed(( + index, + a.getHeader('Message-ID'), + StringIO.StringIO(a.textHeaders() + '\r\n' + a.body) + )) + + + def headRequest(self, group, index, id = None): + if id is not None: + try: + xref = self.dbm['Message-IDs'][id] + except KeyError: + return defer.fail(NewsServerError("No such article: " + id)) + else: + group, index = xref[0] + index = int(index) + + try: + a = self.dbm['groups'][group].articles[index] + except KeyError: + return defer.fail(NewsServerError("No such group: " + group)) + else: + return defer.succeed((index, a.getHeader('Message-ID'), a.textHeaders())) + + + def bodyRequest(self, group, index, id = None): + if id is not None: + try: + xref = self.dbm['Message-IDs'][id] + except KeyError: + return defer.fail(NewsServerError("No such article: " + id)) + else: + group, index = xref[0] + index = int(index) + + try: + a = self.dbm['groups'][group].articles[index] + except KeyError: + return defer.fail(NewsServerError("No such group: " + group)) + else: + return defer.succeed((index, a.getHeader('Message-ID'), StringIO.StringIO(a.body))) + + +@implementer(INewsStorage) +class NewsStorageAugmentation: + """ + A NewsStorage implementation using Twisted's asynchronous DB-API + """ + schema = """ + + CREATE TABLE groups ( + group_id SERIAL, + name VARCHAR(80) NOT NULL, + + flags INTEGER DEFAULT 0 NOT NULL + ); + + CREATE UNIQUE INDEX group_id_index ON groups (group_id); + CREATE UNIQUE INDEX name_id_index ON groups (name); + + CREATE TABLE articles ( + article_id SERIAL, + message_id TEXT, + + header TEXT, + body TEXT + ); + + CREATE UNIQUE INDEX article_id_index ON articles (article_id); + CREATE UNIQUE INDEX article_message_index ON articles (message_id); + + CREATE TABLE postings ( + group_id INTEGER, + article_id INTEGER, + article_index INTEGER NOT NULL + ); + + CREATE UNIQUE INDEX posting_article_index ON postings (article_id); + + CREATE TABLE subscriptions ( + group_id INTEGER + ); + + CREATE TABLE overview ( + header TEXT + ); + """ + + def __init__(self, info): + self.info = info + self.dbpool = adbapi.ConnectionPool(**self.info) + + + def __setstate__(self, state): + self.__dict__ = state + self.info['password'] = getpass.getpass('Database password for %s: ' % (self.info['user'],)) + self.dbpool = adbapi.ConnectionPool(**self.info) + del self.info['password'] + + + def listRequest(self): + # COALESCE may not be totally portable + # it is shorthand for + # CASE WHEN (first parameter) IS NOT NULL then (first parameter) ELSE (second parameter) END + sql = """ + SELECT groups.name, + COALESCE(MAX(postings.article_index), 0), + COALESCE(MIN(postings.article_index), 0), + groups.flags + FROM groups LEFT OUTER JOIN postings + ON postings.group_id = groups.group_id + GROUP BY groups.name, groups.flags + ORDER BY groups.name + """ + return self.dbpool.runQuery(sql) + + + def subscriptionRequest(self): + sql = """ + SELECT groups.name FROM groups,subscriptions WHERE groups.group_id = subscriptions.group_id + """ + return self.dbpool.runQuery(sql) + + + def postRequest(self, message): + cleave = message.find('\r\n\r\n') + headers, article = message[:cleave], message[cleave + 4:] + article = Article(headers, article) + return self.dbpool.runInteraction(self._doPost, article) + + + def _doPost(self, transaction, article): + # Get the group ids + groups = article.getHeader('Newsgroups').split() + if not len(groups): + raise NNTPError('Missing Newsgroups header') + + sql = """ + SELECT name, group_id FROM groups + WHERE name IN (%s) + """ % (', '.join([("'%s'" % (adbapi.safe(group),)) for group in groups]),) + + transaction.execute(sql) + result = transaction.fetchall() + + # No relevant groups, bye bye! + if not len(result): + raise NNTPError('None of groups in Newsgroup header carried') + + # Got some groups, now find the indices this article will have in each + sql = """ + SELECT groups.group_id, COALESCE(MAX(postings.article_index), 0) + 1 + FROM groups LEFT OUTER JOIN postings + ON postings.group_id = groups.group_id + WHERE groups.group_id IN (%s) + GROUP BY groups.group_id + """ % (', '.join([("%d" % (id,)) for (group, id) in result]),) + + transaction.execute(sql) + indices = transaction.fetchall() + + if not len(indices): + raise NNTPError('Internal server error - no indices found') + + # Associate indices with group names + gidToName = dict([(b, a) for (a, b) in result]) + gidToIndex = dict(indices) + + nameIndex = [] + for i in gidToName: + nameIndex.append((gidToName[i], gidToIndex[i])) + + # Build xrefs + xrefs = socket.gethostname().split()[0] + xrefs = xrefs + ' ' + ' '.join([('%s:%d' % (group, id)) for (group, id) in nameIndex]) + article.putHeader('Xref', xrefs) + + # Hey! The article is ready to be posted! God damn f'in finally. + sql = """ + INSERT INTO articles (message_id, header, body) + VALUES ('%s', '%s', '%s') + """ % ( + adbapi.safe(article.getHeader('Message-ID')), + adbapi.safe(article.textHeaders()), + adbapi.safe(article.body) + ) + + transaction.execute(sql) + + # Now update the posting to reflect the groups to which this belongs + for gid in gidToName: + sql = """ + INSERT INTO postings (group_id, article_id, article_index) + VALUES (%d, (SELECT last_value FROM articles_article_id_seq), %d) + """ % (gid, gidToIndex[gid]) + transaction.execute(sql) + + return len(nameIndex) + + + def overviewRequest(self): + sql = """ + SELECT header FROM overview + """ + return self.dbpool.runQuery(sql).addCallback(lambda result: [header[0] for header in result]) + + + def xoverRequest(self, group, low, high): + sql = """ + SELECT postings.article_index, articles.header + FROM articles,postings,groups + WHERE postings.group_id = groups.group_id + AND groups.name = '%s' + AND postings.article_id = articles.article_id + %s + %s + """ % ( + adbapi.safe(group), + low is not None and "AND postings.article_index >= %d" % (low,) or "", + high is not None and "AND postings.article_index <= %d" % (high,) or "" + ) + + return self.dbpool.runQuery(sql).addCallback( + lambda results: [ + [id] + Article(header, None).overview() for (id, header) in results + ] + ) + + + def xhdrRequest(self, group, low, high, header): + sql = """ + SELECT articles.header + FROM groups,postings,articles + WHERE groups.name = '%s' AND postings.group_id = groups.group_id + AND postings.article_index >= %d + AND postings.article_index <= %d + """ % (adbapi.safe(group), low, high) + + return self.dbpool.runQuery(sql).addCallback( + lambda results: [ + (i, Article(h, None).getHeader(h)) for (i, h) in results + ] + ) + + + def listGroupRequest(self, group): + sql = """ + SELECT postings.article_index FROM postings,groups + WHERE postings.group_id = groups.group_id + AND groups.name = '%s' + """ % (adbapi.safe(group),) + + return self.dbpool.runQuery(sql).addCallback( + lambda results, group = group: (group, [res[0] for res in results]) + ) + + + def groupRequest(self, group): + sql = """ + SELECT groups.name, + COUNT(postings.article_index), + COALESCE(MAX(postings.article_index), 0), + COALESCE(MIN(postings.article_index), 0), + groups.flags + FROM groups LEFT OUTER JOIN postings + ON postings.group_id = groups.group_id + WHERE groups.name = '%s' + GROUP BY groups.name, groups.flags + """ % (adbapi.safe(group),) + + return self.dbpool.runQuery(sql).addCallback( + lambda results: tuple(results[0]) + ) + + + def articleExistsRequest(self, id): + sql = """ + SELECT COUNT(message_id) FROM articles + WHERE message_id = '%s' + """ % (adbapi.safe(id),) + + return self.dbpool.runQuery(sql).addCallback( + lambda result: bool(result[0][0]) + ) + + + def articleRequest(self, group, index, id = None): + if id is not None: + sql = """ + SELECT postings.article_index, articles.message_id, articles.header, articles.body + FROM groups,postings LEFT OUTER JOIN articles + ON articles.message_id = '%s' + WHERE groups.name = '%s' + AND groups.group_id = postings.group_id + """ % (adbapi.safe(id), adbapi.safe(group)) + else: + sql = """ + SELECT postings.article_index, articles.message_id, articles.header, articles.body + FROM groups,articles LEFT OUTER JOIN postings + ON postings.article_id = articles.article_id + WHERE postings.article_index = %d + AND postings.group_id = groups.group_id + AND groups.name = '%s' + """ % (index, adbapi.safe(group)) + + return self.dbpool.runQuery(sql).addCallback( + lambda result: ( + result[0][0], + result[0][1], + StringIO.StringIO(result[0][2] + '\r\n' + result[0][3]) + ) + ) + + + def headRequest(self, group, index): + sql = """ + SELECT postings.article_index, articles.message_id, articles.header + FROM groups,articles LEFT OUTER JOIN postings + ON postings.article_id = articles.article_id + WHERE postings.article_index = %d + AND postings.group_id = groups.group_id + AND groups.name = '%s' + """ % (index, adbapi.safe(group)) + + return self.dbpool.runQuery(sql).addCallback(lambda result: result[0]) + + + def bodyRequest(self, group, index): + sql = """ + SELECT postings.article_index, articles.message_id, articles.body + FROM groups,articles LEFT OUTER JOIN postings + ON postings.article_id = articles.article_id + WHERE postings.article_index = %d + AND postings.group_id = groups.group_id + AND groups.name = '%s' + """ % (index, adbapi.safe(group)) + + return self.dbpool.runQuery(sql).addCallback( + lambda result: result[0] + ).addCallback( + # result is a tuple of (index, id, body) + lambda result: (result[0], result[1], StringIO.StringIO(result[2])) + ) + +#### +#### XXX - make these static methods some day +#### +def makeGroupSQL(groups): + res = '' + for g in groups: + res = res + """\n INSERT INTO groups (name) VALUES ('%s');\n""" % (adbapi.safe(g),) + return res + + +def makeOverviewSQL(): + res = '' + for o in OVERVIEW_FMT: + res = res + """\n INSERT INTO overview (header) VALUES ('%s');\n""" % (adbapi.safe(o),) + return res diff --git a/contrib/python/Twisted/py2/twisted/news/news.py b/contrib/python/Twisted/py2/twisted/news/news.py new file mode 100644 index 0000000000..6f1da245a7 --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/news/news.py @@ -0,0 +1,92 @@ +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + + +""" +Maintainer: Jp Calderone +""" + +from __future__ import print_function + +from twisted.news import nntp +from twisted.internet import protocol, reactor + +import time + +class NNTPFactory(protocol.ServerFactory): + """A factory for NNTP server protocols.""" + + protocol = nntp.NNTPServer + + def __init__(self, backend): + self.backend = backend + + def buildProtocol(self, connection): + p = self.protocol() + p.factory = self + return p + + +class UsenetClientFactory(protocol.ClientFactory): + def __init__(self, groups, storage): + self.lastChecks = {} + self.groups = groups + self.storage = storage + + + def clientConnectionLost(self, connector, reason): + pass + + + def clientConnectionFailed(self, connector, reason): + print('Connection failed: ', reason) + + + def updateChecks(self, addr): + self.lastChecks[addr] = time.mktime(time.gmtime()) + + + def buildProtocol(self, addr): + last = self.lastChecks.setdefault(addr, time.mktime(time.gmtime()) - (60 * 60 * 24 * 7)) + p = nntp.UsenetClientProtocol(self.groups, last, self.storage) + p.factory = self + return p + + +# XXX - Maybe this inheritance doesn't make so much sense? +class UsenetServerFactory(NNTPFactory): + """A factory for NNTP Usenet server protocols.""" + + protocol = nntp.NNTPServer + + def __init__(self, backend, remoteHosts = None, updatePeriod = 60): + NNTPFactory.__init__(self, backend) + self.updatePeriod = updatePeriod + self.remoteHosts = remoteHosts or [] + self.clientFactory = UsenetClientFactory(self.remoteHosts, self.backend) + + + def startFactory(self): + self._updateCall = reactor.callLater(0, self.syncWithRemotes) + + + def stopFactory(self): + if self._updateCall: + self._updateCall.cancel() + self._updateCall = None + + + def buildProtocol(self, connection): + p = self.protocol() + p.factory = self + return p + + + def syncWithRemotes(self): + for remote in self.remoteHosts: + reactor.connectTCP(remote, 119, self.clientFactory) + self._updateCall = reactor.callLater(self.updatePeriod, self.syncWithRemotes) + + +# backwards compatibility +Factory = UsenetServerFactory diff --git a/contrib/python/Twisted/py2/twisted/news/nntp.py b/contrib/python/Twisted/py2/twisted/news/nntp.py new file mode 100644 index 0000000000..6c5d12b277 --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/news/nntp.py @@ -0,0 +1,1050 @@ +# -*- test-case-name: twisted.news.test.test_nntp -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + + +""" +NNTP protocol support. + +The following protocol commands are currently understood:: + + LIST LISTGROUP XOVER XHDR + POST GROUP ARTICLE STAT HEAD + BODY NEXT MODE STREAM MODE READER SLAVE + LAST QUIT HELP IHAVE XPATH + XINDEX XROVER TAKETHIS CHECK + +The following protocol commands require implementation:: + + NEWNEWS + XGTITLE XPAT + XTHREAD AUTHINFO NEWGROUPS + + +Other desired features: + + - A real backend + - More robust client input handling + - A control protocol +""" + +from __future__ import print_function + +import time + +from twisted.protocols import basic +from twisted.python import log + +def parseRange(text): + articles = text.split('-') + if len(articles) == 1: + try: + a = int(articles[0]) + return a, a + except ValueError: + return None, None + elif len(articles) == 2: + try: + if len(articles[0]): + l = int(articles[0]) + else: + l = None + if len(articles[1]): + h = int(articles[1]) + else: + h = None + except ValueError: + return None, None + return l, h + + +def extractCode(line): + line = line.split(' ', 1) + if len(line) != 2: + return None + try: + return int(line[0]), line[1] + except ValueError: + return None + + +class NNTPError(Exception): + def __init__(self, string): + self.string = string + + def __str__(self): + return 'NNTPError: %s' % self.string + + +class NNTPClient(basic.LineReceiver): + MAX_COMMAND_LENGTH = 510 + + def __init__(self): + self.currentGroup = None + + self._state = [] + self._error = [] + self._inputBuffers = [] + self._responseCodes = [] + self._responseHandlers = [] + + self._postText = [] + + self._newState(self._statePassive, None, self._headerInitial) + + + def gotAllGroups(self, groups): + "Override for notification when fetchGroups() action is completed" + + + def getAllGroupsFailed(self, error): + "Override for notification when fetchGroups() action fails" + + + def gotOverview(self, overview): + "Override for notification when fetchOverview() action is completed" + + + def getOverviewFailed(self, error): + "Override for notification when fetchOverview() action fails" + + + def gotSubscriptions(self, subscriptions): + "Override for notification when fetchSubscriptions() action is completed" + + + def getSubscriptionsFailed(self, error): + "Override for notification when fetchSubscriptions() action fails" + + + def gotGroup(self, group): + "Override for notification when fetchGroup() action is completed" + + + def getGroupFailed(self, error): + "Override for notification when fetchGroup() action fails" + + + def gotArticle(self, article): + "Override for notification when fetchArticle() action is completed" + + + def getArticleFailed(self, error): + "Override for notification when fetchArticle() action fails" + + + def gotHead(self, head): + "Override for notification when fetchHead() action is completed" + + + def getHeadFailed(self, error): + "Override for notification when fetchHead() action fails" + + + def gotBody(self, info): + "Override for notification when fetchBody() action is completed" + + + def getBodyFailed(self, body): + "Override for notification when fetchBody() action fails" + + + def postedOk(self): + "Override for notification when postArticle() action is successful" + + + def postFailed(self, error): + "Override for notification when postArticle() action fails" + + + def gotXHeader(self, headers): + "Override for notification when getXHeader() action is successful" + + + def getXHeaderFailed(self, error): + "Override for notification when getXHeader() action fails" + + + def gotNewNews(self, news): + "Override for notification when getNewNews() action is successful" + + + def getNewNewsFailed(self, error): + "Override for notification when getNewNews() action fails" + + + def gotNewGroups(self, groups): + "Override for notification when getNewGroups() action is successful" + + + def getNewGroupsFailed(self, error): + "Override for notification when getNewGroups() action fails" + + + def setStreamSuccess(self): + "Override for notification when setStream() action is successful" + + + def setStreamFailed(self, error): + "Override for notification when setStream() action fails" + + + def fetchGroups(self): + """ + Request a list of all news groups from the server. gotAllGroups() + is called on success, getGroupsFailed() on failure + """ + self.sendLine('LIST') + self._newState(self._stateList, self.getAllGroupsFailed) + + + def fetchOverview(self): + """ + Request the overview format from the server. gotOverview() is called + on success, getOverviewFailed() on failure + """ + self.sendLine('LIST OVERVIEW.FMT') + self._newState(self._stateOverview, self.getOverviewFailed) + + + def fetchSubscriptions(self): + """ + Request a list of the groups it is recommended a new user subscribe to. + gotSubscriptions() is called on success, getSubscriptionsFailed() on + failure + """ + self.sendLine('LIST SUBSCRIPTIONS') + self._newState(self._stateSubscriptions, self.getSubscriptionsFailed) + + + def fetchGroup(self, group): + """ + Get group information for the specified group from the server. gotGroup() + is called on success, getGroupFailed() on failure. + """ + self.sendLine('GROUP %s' % (group,)) + self._newState(None, self.getGroupFailed, self._headerGroup) + + + def fetchHead(self, index = ''): + """ + Get the header for the specified article (or the currently selected + article if index is '') from the server. gotHead() is called on + success, getHeadFailed() on failure + """ + self.sendLine('HEAD %s' % (index,)) + self._newState(self._stateHead, self.getHeadFailed) + + + def fetchBody(self, index = ''): + """ + Get the body for the specified article (or the currently selected + article if index is '') from the server. gotBody() is called on + success, getBodyFailed() on failure + """ + self.sendLine('BODY %s' % (index,)) + self._newState(self._stateBody, self.getBodyFailed) + + + def fetchArticle(self, index = ''): + """ + Get the complete article with the specified index (or the currently + selected article if index is '') or Message-ID from the server. + gotArticle() is called on success, getArticleFailed() on failure. + """ + self.sendLine('ARTICLE %s' % (index,)) + self._newState(self._stateArticle, self.getArticleFailed) + + + def postArticle(self, text): + """ + Attempt to post an article with the specified text to the server. 'text' + must consist of both head and body data, as specified by RFC 850. If the + article is posted successfully, postedOk() is called, otherwise postFailed() + is called. + """ + self.sendLine('POST') + self._newState(None, self.postFailed, self._headerPost) + self._postText.append(text) + + + def fetchNewNews(self, groups, date, distributions = ''): + """ + Get the Message-IDs for all new news posted to any of the given + groups since the specified date - in seconds since the epoch, GMT - + optionally restricted to the given distributions. gotNewNews() is + called on success, getNewNewsFailed() on failure. + + One invocation of this function may result in multiple invocations + of gotNewNews()/getNewNewsFailed(). + """ + date, timeStr = time.strftime('%y%m%d %H%M%S', time.gmtime(date)).split() + line = 'NEWNEWS %%s %s %s %s' % (date, timeStr, distributions) + groupPart = '' + while len(groups) and len(line) + len(groupPart) + len(groups[-1]) + 1 < NNTPClient.MAX_COMMAND_LENGTH: + group = groups.pop() + groupPart = groupPart + ',' + group + + self.sendLine(line % (groupPart,)) + self._newState(self._stateNewNews, self.getNewNewsFailed) + + if len(groups): + self.fetchNewNews(groups, date, distributions) + + + def fetchNewGroups(self, date, distributions): + """ + Get the names of all new groups created/added to the server since + the specified date - in seconds since the ecpoh, GMT - optionally + restricted to the given distributions. gotNewGroups() is called + on success, getNewGroupsFailed() on failure. + """ + date, timeStr = time.strftime('%y%m%d %H%M%S', time.gmtime(date)).split() + self.sendLine('NEWGROUPS %s %s %s' % (date, timeStr, distributions)) + self._newState(self._stateNewGroups, self.getNewGroupsFailed) + + + def fetchXHeader(self, header, low = None, high = None, id = None): + """ + Request a specific header from the server for an article or range + of articles. If 'id' is not None, a header for only the article + with that Message-ID will be requested. If both low and high are + None, a header for the currently selected article will be selected; + If both low and high are zero-length strings, headers for all articles + in the currently selected group will be requested; Otherwise, high + and low will be used as bounds - if one is None the first or last + article index will be substituted, as appropriate. + """ + if id is not None: + r = header + ' <%s>' % (id,) + elif low is high is None: + r = header + elif high is None: + r = header + ' %d-' % (low,) + elif low is None: + r = header + ' -%d' % (high,) + else: + r = header + ' %d-%d' % (low, high) + self.sendLine('XHDR ' + r) + self._newState(self._stateXHDR, self.getXHeaderFailed) + + + def setStream(self): + """ + Set the mode to STREAM, suspending the normal "lock-step" mode of + communications. setStreamSuccess() is called on success, + setStreamFailed() on failure. + """ + self.sendLine('MODE STREAM') + self._newState(None, self.setStreamFailed, self._headerMode) + + + def quit(self): + self.sendLine('QUIT') + self.transport.loseConnection() + + + def _newState(self, method, error, responseHandler = None): + self._inputBuffers.append([]) + self._responseCodes.append(None) + self._state.append(method) + self._error.append(error) + self._responseHandlers.append(responseHandler) + + + def _endState(self): + buf = self._inputBuffers[0] + del self._responseCodes[0] + del self._inputBuffers[0] + del self._state[0] + del self._error[0] + del self._responseHandlers[0] + return buf + + + def _newLine(self, line, check = 1): + if check and line and line[0] == '.': + line = line[1:] + self._inputBuffers[0].append(line) + + + def _setResponseCode(self, code): + self._responseCodes[0] = code + + + def _getResponseCode(self): + return self._responseCodes[0] + + + def lineReceived(self, line): + if not len(self._state): + self._statePassive(line) + elif self._getResponseCode() is None: + code = extractCode(line) + if code is None or not (200 <= code[0] < 400): # An error! + self._error[0](line) + self._endState() + else: + self._setResponseCode(code) + if self._responseHandlers[0]: + self._responseHandlers[0](code) + else: + self._state[0](line) + + + def _statePassive(self, line): + log.msg('Server said: %s' % line) + + + def _passiveError(self, error): + log.err('Passive Error: %s' % (error,)) + + + def _headerInitial(self, response): + (code, message) = response + if code == 200: + self.canPost = 1 + else: + self.canPost = 0 + self._endState() + + + def _stateList(self, line): + if line != '.': + data = filter(None, line.strip().split()) + self._newLine((data[0], int(data[1]), int(data[2]), data[3]), 0) + else: + self.gotAllGroups(self._endState()) + + + def _stateOverview(self, line): + if line != '.': + self._newLine(filter(None, line.strip().split()), 0) + else: + self.gotOverview(self._endState()) + + + def _stateSubscriptions(self, line): + if line != '.': + self._newLine(line.strip(), 0) + else: + self.gotSubscriptions(self._endState()) + + + def _headerGroup(self, response): + (code, line) = response + self.gotGroup(tuple(line.split())) + self._endState() + + + def _stateArticle(self, line): + if line != '.': + if line.startswith('.'): + line = line[1:] + self._newLine(line, 0) + else: + self.gotArticle('\n'.join(self._endState())+'\n') + + + def _stateHead(self, line): + if line != '.': + self._newLine(line, 0) + else: + self.gotHead('\n'.join(self._endState())) + + + def _stateBody(self, line): + if line != '.': + if line.startswith('.'): + line = line[1:] + self._newLine(line, 0) + else: + self.gotBody('\n'.join(self._endState())+'\n') + + + def _headerPost(self, response): + (code, message) = response + if code == 340: + self.transport.write(self._postText[0].replace('\n', '\r\n').replace('\r\n.', '\r\n..')) + if self._postText[0][-1:] != '\n': + self.sendLine('') + self.sendLine('.') + del self._postText[0] + self._newState(None, self.postFailed, self._headerPosted) + else: + self.postFailed('%d %s' % (code, message)) + self._endState() + + + def _headerPosted(self, response): + (code, message) = response + if code == 240: + self.postedOk() + else: + self.postFailed('%d %s' % (code, message)) + self._endState() + + + def _stateXHDR(self, line): + if line != '.': + self._newLine(line.split(), 0) + else: + self._gotXHeader(self._endState()) + + + def _stateNewNews(self, line): + if line != '.': + self._newLine(line, 0) + else: + self.gotNewNews(self._endState()) + + + def _stateNewGroups(self, line): + if line != '.': + self._newLine(line, 0) + else: + self.gotNewGroups(self._endState()) + + + def _headerMode(self, response): + (code, message) = response + if code == 203: + self.setStreamSuccess() + else: + self.setStreamFailed((code, message)) + self._endState() + + +class NNTPServer(basic.LineReceiver): + COMMANDS = [ + 'LIST', 'GROUP', 'ARTICLE', 'STAT', 'MODE', 'LISTGROUP', 'XOVER', + 'XHDR', 'HEAD', 'BODY', 'NEXT', 'LAST', 'POST', 'QUIT', 'IHAVE', + 'HELP', 'SLAVE', 'XPATH', 'XINDEX', 'XROVER', 'TAKETHIS', 'CHECK' + ] + + def __init__(self): + self.servingSlave = 0 + + + def connectionMade(self): + self.inputHandler = None + self.currentGroup = None + self.currentIndex = None + self.sendLine('200 server ready - posting allowed') + + def lineReceived(self, line): + if self.inputHandler is not None: + self.inputHandler(line) + else: + parts = line.strip().split() + if len(parts): + cmd, parts = parts[0].upper(), parts[1:] + if cmd in NNTPServer.COMMANDS: + func = getattr(self, 'do_%s' % cmd) + try: + func(*parts) + except TypeError: + self.sendLine('501 command syntax error') + log.msg("501 command syntax error") + log.msg("command was", line) + log.deferr() + except: + self.sendLine('503 program fault - command not performed') + log.msg("503 program fault") + log.msg("command was", line) + log.deferr() + else: + self.sendLine('500 command not recognized') + + + def do_LIST(self, subcmd = '', *dummy): + subcmd = subcmd.strip().lower() + if subcmd == 'newsgroups': + # XXX - this could use a real implementation, eh? + self.sendLine('215 Descriptions in form "group description"') + self.sendLine('.') + elif subcmd == 'overview.fmt': + defer = self.factory.backend.overviewRequest() + defer.addCallbacks(self._gotOverview, self._errOverview) + log.msg('overview') + elif subcmd == 'subscriptions': + defer = self.factory.backend.subscriptionRequest() + defer.addCallbacks(self._gotSubscription, self._errSubscription) + log.msg('subscriptions') + elif subcmd == '': + defer = self.factory.backend.listRequest() + defer.addCallbacks(self._gotList, self._errList) + else: + self.sendLine('500 command not recognized') + + + def _gotList(self, list): + self.sendLine('215 newsgroups in form "group high low flags"') + for i in list: + self.sendLine('%s %d %d %s' % tuple(i)) + self.sendLine('.') + + + def _errList(self, failure): + print('LIST failed: ', failure) + self.sendLine('503 program fault - command not performed') + + + def _gotSubscription(self, parts): + self.sendLine('215 information follows') + for i in parts: + self.sendLine(i) + self.sendLine('.') + + + def _errSubscription(self, failure): + print('SUBSCRIPTIONS failed: ', failure) + self.sendLine('503 program fault - comand not performed') + + + def _gotOverview(self, parts): + self.sendLine('215 Order of fields in overview database.') + for i in parts: + self.sendLine(i + ':') + self.sendLine('.') + + + def _errOverview(self, failure): + print('LIST OVERVIEW.FMT failed: ', failure) + self.sendLine('503 program fault - command not performed') + + + def do_LISTGROUP(self, group = None): + group = group or self.currentGroup + if group is None: + self.sendLine('412 Not currently in newsgroup') + else: + defer = self.factory.backend.listGroupRequest(group) + defer.addCallbacks(self._gotListGroup, self._errListGroup) + + + def _gotListGroup(self, result): + (group, articles) = result + self.currentGroup = group + if len(articles): + self.currentIndex = int(articles[0]) + else: + self.currentIndex = None + + self.sendLine('211 list of article numbers follow') + for i in articles: + self.sendLine(str(i)) + self.sendLine('.') + + + def _errListGroup(self, failure): + print('LISTGROUP failed: ', failure) + self.sendLine('502 no permission') + + + def do_XOVER(self, range): + if self.currentGroup is None: + self.sendLine('412 No news group currently selected') + else: + l, h = parseRange(range) + defer = self.factory.backend.xoverRequest(self.currentGroup, l, h) + defer.addCallbacks(self._gotXOver, self._errXOver) + + + def _gotXOver(self, parts): + self.sendLine('224 Overview information follows') + for i in parts: + self.sendLine('\t'.join(map(str, i))) + self.sendLine('.') + + + def _errXOver(self, failure): + print('XOVER failed: ', failure) + self.sendLine('420 No article(s) selected') + + + def xhdrWork(self, header, range): + if self.currentGroup is None: + self.sendLine('412 No news group currently selected') + else: + if range is None: + if self.currentIndex is None: + self.sendLine('420 No current article selected') + return + else: + l = h = self.currentIndex + else: + # FIXME: articles may be a message-id + l, h = parseRange(range) + + if l is h is None: + self.sendLine('430 no such article') + else: + return self.factory.backend.xhdrRequest(self.currentGroup, l, h, header) + + + def do_XHDR(self, header, range = None): + d = self.xhdrWork(header, range) + if d: + d.addCallbacks(self._gotXHDR, self._errXHDR) + + + def _gotXHDR(self, parts): + self.sendLine('221 Header follows') + for i in parts: + self.sendLine('%d %s' % i) + self.sendLine('.') + + def _errXHDR(self, failure): + print('XHDR failed: ', failure) + self.sendLine('502 no permission') + + + def do_POST(self): + self.inputHandler = self._doingPost + self.message = '' + self.sendLine('340 send article to be posted. End with <CR-LF>.<CR-LF>') + + + def _doingPost(self, line): + if line == '.': + self.inputHandler = None + article = self.message + self.message = '' + + defer = self.factory.backend.postRequest(article) + defer.addCallbacks(self._gotPost, self._errPost) + else: + self.message = self.message + line + '\r\n' + + + def _gotPost(self, parts): + self.sendLine('240 article posted ok') + + + def _errPost(self, failure): + print('POST failed: ', failure) + self.sendLine('441 posting failed') + + + def do_CHECK(self, id): + d = self.factory.backend.articleExistsRequest(id) + d.addCallbacks(self._gotCheck, self._errCheck) + + + def _gotCheck(self, result): + if result: + self.sendLine("438 already have it, please don't send it to me") + else: + self.sendLine('238 no such article found, please send it to me') + + + def _errCheck(self, failure): + print('CHECK failed: ', failure) + self.sendLine('431 try sending it again later') + + + def do_TAKETHIS(self, id): + self.inputHandler = self._doingTakeThis + self.message = '' + + + def _doingTakeThis(self, line): + if line == '.': + self.inputHandler = None + article = self.message + self.message = '' + d = self.factory.backend.postRequest(article) + d.addCallbacks(self._didTakeThis, self._errTakeThis) + else: + self.message = self.message + line + '\r\n' + + + def _didTakeThis(self, result): + self.sendLine('239 article transferred ok') + + + def _errTakeThis(self, failure): + print('TAKETHIS failed: ', failure) + self.sendLine('439 article transfer failed') + + + def do_GROUP(self, group): + defer = self.factory.backend.groupRequest(group) + defer.addCallbacks(self._gotGroup, self._errGroup) + + + def _gotGroup(self, result): + (name, num, high, low, flags) = result + self.currentGroup = name + self.currentIndex = low + self.sendLine('211 %d %d %d %s group selected' % (num, low, high, name)) + + + def _errGroup(self, failure): + print('GROUP failed: ', failure) + self.sendLine('411 no such group') + + + def articleWork(self, article, cmd, func): + if self.currentGroup is None: + self.sendLine('412 no newsgroup has been selected') + else: + if not article: + if self.currentIndex is None: + self.sendLine('420 no current article has been selected') + else: + article = self.currentIndex + else: + if article[0] == '<': + return func(self.currentGroup, index = None, id = article) + else: + try: + article = int(article) + return func(self.currentGroup, article) + except ValueError: + self.sendLine('501 command syntax error') + + + def do_ARTICLE(self, article = None): + defer = self.articleWork(article, 'ARTICLE', self.factory.backend.articleRequest) + if defer: + defer.addCallbacks(self._gotArticle, self._errArticle) + + + def _gotArticle(self, result): + (index, id, article) = result + self.currentIndex = index + self.sendLine('220 %d %s article' % (index, id)) + s = basic.FileSender() + d = s.beginFileTransfer(article, self.transport) + d.addCallback(self.finishedFileTransfer) + + ## + ## Helper for FileSender + ## + def finishedFileTransfer(self, lastsent): + if lastsent != '\n': + line = '\r\n.' + else: + line = '.' + self.sendLine(line) + ## + + def _errArticle(self, failure): + print('ARTICLE failed: ', failure) + self.sendLine('423 bad article number') + + + def do_STAT(self, article = None): + defer = self.articleWork(article, 'STAT', self.factory.backend.articleRequest) + if defer: + defer.addCallbacks(self._gotStat, self._errStat) + + + def _gotStat(self, result): + (index, id, article) = result + self.currentIndex = index + self.sendLine('223 %d %s article retreived - request text separately' % (index, id)) + + + def _errStat(self, failure): + print('STAT failed: ', failure) + self.sendLine('423 bad article number') + + + def do_HEAD(self, article = None): + defer = self.articleWork(article, 'HEAD', self.factory.backend.headRequest) + if defer: + defer.addCallbacks(self._gotHead, self._errHead) + + + def _gotHead(self, result): + (index, id, head) = result + self.currentIndex = index + self.sendLine('221 %d %s article retrieved' % (index, id)) + self.transport.write(head + '\r\n') + self.sendLine('.') + + + def _errHead(self, failure): + print('HEAD failed: ', failure) + self.sendLine('423 no such article number in this group') + + + def do_BODY(self, article): + defer = self.articleWork(article, 'BODY', self.factory.backend.bodyRequest) + if defer: + defer.addCallbacks(self._gotBody, self._errBody) + + + def _gotBody(self, result): + (index, id, body) = result + self.currentIndex = index + self.sendLine('221 %d %s article retrieved' % (index, id)) + self.lastsent = '' + s = basic.FileSender() + d = s.beginFileTransfer(body, self.transport) + d.addCallback(self.finishedFileTransfer) + + def _errBody(self, failure): + print('BODY failed: ', failure) + self.sendLine('423 no such article number in this group') + + + # NEXT and LAST are just STATs that increment currentIndex first. + # Accordingly, use the STAT callbacks. + def do_NEXT(self): + i = self.currentIndex + 1 + defer = self.factory.backend.articleRequest(self.currentGroup, i) + defer.addCallbacks(self._gotStat, self._errStat) + + + def do_LAST(self): + i = self.currentIndex - 1 + defer = self.factory.backend.articleRequest(self.currentGroup, i) + defer.addCallbacks(self._gotStat, self._errStat) + + + def do_MODE(self, cmd): + cmd = cmd.strip().upper() + if cmd == 'READER': + self.servingSlave = 0 + self.sendLine('200 Hello, you can post') + elif cmd == 'STREAM': + self.sendLine('500 Command not understood') + else: + # This is not a mistake + self.sendLine('500 Command not understood') + + + def do_QUIT(self): + self.sendLine('205 goodbye') + self.transport.loseConnection() + + + def do_HELP(self): + self.sendLine('100 help text follows') + self.sendLine('Read the RFC.') + self.sendLine('.') + + + def do_SLAVE(self): + self.sendLine('202 slave status noted') + self.servingeSlave = 1 + + + def do_XPATH(self, article): + # XPATH is a silly thing to have. No client has the right to ask + # for this piece of information from me, and so that is what I'll + # tell them. + self.sendLine('502 access restriction or permission denied') + + + def do_XINDEX(self, article): + # XINDEX is another silly command. The RFC suggests it be relegated + # to the history books, and who am I to disagree? + self.sendLine('502 access restriction or permission denied') + + + def do_XROVER(self, range=None): + """ + Handle a request for references of all messages in the currently + selected group. + + This generates the same response a I{XHDR References} request would + generate. + """ + self.do_XHDR('References', range) + + + def do_IHAVE(self, id): + self.factory.backend.articleExistsRequest(id).addCallback(self._foundArticle) + + + def _foundArticle(self, result): + if result: + self.sendLine('437 article rejected - do not try again') + else: + self.sendLine('335 send article to be transferred. End with <CR-LF>.<CR-LF>') + self.inputHandler = self._handleIHAVE + self.message = '' + + + def _handleIHAVE(self, line): + if line == '.': + self.inputHandler = None + self.factory.backend.postRequest( + self.message + ).addCallbacks(self._gotIHAVE, self._errIHAVE) + + self.message = '' + else: + self.message = self.message + line + '\r\n' + + + def _gotIHAVE(self, result): + self.sendLine('235 article transferred ok') + + + def _errIHAVE(self, failure): + print('IHAVE failed: ', failure) + self.sendLine('436 transfer failed - try again later') + + +class UsenetClientProtocol(NNTPClient): + """ + A client that connects to an NNTP server and asks for articles new + since a certain time. + """ + + def __init__(self, groups, date, storage): + """ + Fetch all new articles from the given groups since the + given date and dump them into the given storage. groups + is a list of group names. date is an integer or floating + point representing seconds since the epoch (GMT). storage is + any object that implements the NewsStorage interface. + """ + NNTPClient.__init__(self) + self.groups, self.date, self.storage = groups, date, storage + + + def connectionMade(self): + NNTPClient.connectionMade(self) + log.msg("Initiating update with remote host: " + str(self.transport.getPeer())) + self.setStream() + self.fetchNewNews(self.groups, self.date, '') + + + def articleExists(self, exists, article): + if exists: + self.fetchArticle(article) + else: + self.count = self.count - 1 + self.disregard = self.disregard + 1 + + + def gotNewNews(self, news): + self.disregard = 0 + self.count = len(news) + log.msg("Transferring " + str(self.count) + + " articles from remote host: " + str(self.transport.getPeer())) + for i in news: + self.storage.articleExistsRequest(i).addCallback(self.articleExists, i) + + + def getNewNewsFailed(self, reason): + log.msg("Updated failed (" + reason + ") with remote host: " + str(self.transport.getPeer())) + self.quit() + + + def gotArticle(self, article): + self.storage.postRequest(article) + self.count = self.count - 1 + if not self.count: + log.msg("Completed update with remote host: " + str(self.transport.getPeer())) + if self.disregard: + log.msg("Disregarded %d articles." % (self.disregard,)) + self.factory.updateChecks(self.transport.getPeer()) + self.quit() diff --git a/contrib/python/Twisted/py2/twisted/news/tap.py b/contrib/python/Twisted/py2/twisted/news/tap.py new file mode 100644 index 0000000000..d6b8293414 --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/news/tap.py @@ -0,0 +1,143 @@ +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + + +from __future__ import print_function + +from twisted.news import news, database +from twisted.application import strports +from twisted.python import usage, log + +class DBOptions(usage.Options): + optParameters = [ + ['module', None, 'pyPgSQL.PgSQL', "DB-API 2.0 module to use"], + ['dbhost', None, 'localhost', "Host where database manager is listening"], + ['dbuser', None, 'news', "Username with which to connect to database"], + ['database', None, 'news', "Database name to use"], + ['schema', None, 'schema.sql', "File to which to write SQL schema initialisation"], + + # XXX - Hrm. + ["groups", "g", "groups.list", "File containing group list"], + ["servers", "s", "servers.list", "File containing server list"] + ] + + def postOptions(self): + # XXX - Hmmm. + with open(self['groups']) as f: + self['groups'] = [g.strip() for g in f.readlines() if not g.startswith('#')] + with open(self['servers']) as f: + self['servers'] = [s.strip() for s in f.readlines() if not s.startswith('#')] + + try: + __import__(self['module']) + except ImportError: + log.msg("Warning: Cannot import %s" % (self['module'],)) + + with open(self['schema'], 'w') as f: + f.write( + database.NewsStorageAugmentation.schema + '\n' + + database.makeGroupSQL(self['groups']) + '\n' + + database.makeOverviewSQL() + ) + + info = { + 'host': self['dbhost'], 'user': self['dbuser'], + 'database': self['database'], 'dbapiName': self['module'] + } + self.db = database.NewsStorageAugmentation(info) + + +class PickleOptions(usage.Options): + optParameters = [ + ['file', None, 'news.pickle', "File to which to save pickle"], + + # XXX - Hrm. + ["groups", "g", "groups.list", "File containing group list"], + ["servers", "s", "servers.list", "File containing server list"], + ["moderators", "m", "moderators.list", + "File containing moderators list"], + ] + + subCommands = None + + def postOptions(self): + # XXX - Hmmm. + filename = self['file'] + with open(self['groups']) as f: + self['groups'] = [g.strip() for g in f.readlines() + if not g.startswith('#')] + with open(self['servers']) as f: + self['servers'] = [s.strip() for s in f.readlines() + if not s.startswith('#')] + with open(self['moderators']) as f: + self['moderators'] = [s.split() for s in f.readlines() + if not s.startswith('#')] + self.db = database.PickleStorage(filename, self['groups'], + self['moderators']) + + +class Options(usage.Options): + synopsis = "[options]" + + groups = None + servers = None + subscriptions = None + + optParameters = [ + ["port", "p", "119", "Listen port"], + ["interface", "i", "", "Interface to which to bind"], + ["datadir", "d", "news.db", "Root data storage path"], + ["mailhost", "m", "localhost", "Host of SMTP server to use"] + ] + compData = usage.Completions( + optActions={"datadir" : usage.CompleteDirs(), + "mailhost" : usage.CompleteHostnames(), + "interface" : usage.CompleteNetInterfaces()} + ) + + def __init__(self): + usage.Options.__init__(self) + self.groups = [] + self.servers = [] + self.subscriptions = [] + + + def opt_group(self, group): + """The name of a newsgroup to carry.""" + self.groups.append([group, None]) + + + def opt_moderator(self, moderator): + """The email of the moderator for the most recently passed group.""" + self.groups[-1][1] = moderator + + + def opt_subscription(self, group): + """A newsgroup to list as a recommended subscription.""" + self.subscriptions.append(group) + + + def opt_server(self, server): + """The address of a Usenet server to pass messages to and receive messages from.""" + self.servers.append(server) + + +def makeService(config): + if not len(config.groups): + raise usage.UsageError("No newsgroups specified") + + db = database.NewsShelf(config['mailhost'], config['datadir']) + for (g, m) in config.groups: + if m: + db.addGroup(g, 'm') + db.addModerator(g, m) + else: + db.addGroup(g, 'y') + for s in config.subscriptions: + print(s) + db.addSubscription(s) + s = config['port'] + if config['interface']: + # Add a warning here + s += ':interface='+config['interface'] + return strports.service(s, news.UsenetServerFactory(db, config.servers)) |